标签:rds res spl input output double lis highlight textfile
//清理格式不匹配的数据
//此代码可以实现自动滤除掉无法转化为double类型的数据
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple7;
import java.util.ArrayList;
import java.util.List;
public class Filter {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("filter");
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.setLogLevel("Error");
String inputPath = "./data/hqf.csv";
JavaRDD<String> inputs = jsc.textFile(inputPath);
//ambient,coolant,u_d,u_q,motor_speed,torque,i_d,i_q,pm,stator_yoke,stator_tooth,stator_winding,profile_id
// 0 1 2 3 4 5 6 7 8 9 10 11 12
JavaRDD<ArrayList<Tuple7<String, String, String, String, String, String, String>>> mapRDD = inputs.map(new Function<String, ArrayList<Tuple7<String, String, String, String, String, String, String>>>() {
@Override
public ArrayList<Tuple7<String, String, String, String, String, String, String>> call(String one) throws Exception {
String[] words = one.split(",");
String others = "0.000000";
int result = 0;
int offset = 0;
String[] tpres = new String[28];
//ArrayList<Tuple7<String, String, String, String, String, String, String>> transList = new ArrayList<>();
for (String word : words) {
result = isNumDouble(word);
if (result == -1) {
tpres[offset] = "error";
offset++;
}else {
tpres[offset] = word;
offset++;
}
}
ArrayList<Tuple7<String, String, String, String, String, String, String>> list = new ArrayList<>();
Tuple7<String, String, String, String, String, String, String> tp1 = new Tuple7<>(tpres[0], tpres[1], tpres[2], tpres[3], tpres[4], tpres[5], tpres[6]);
// Tuple7<String, String, String, String, String, String, String> tp2 = new Tuple7<>(tpres[7], tpres[8], tpres[9], tpres[10], tpres[11], tpres[12], tpres[13]);
// Tuple7<String, String, String, String, String, String, String> tp3 = new Tuple7<>(tpres[14], tpres[15], tpres[16], tpres[17], tpres[18], tpres[20], tpres[21]);
// Tuple7<String, String, String, String, String, String, String> tp4 = new Tuple7<>(tpres[22], tpres[23], tpres[24], tpres[25], others, others, others);
list.add(tp1);
// list.add(tp2);
// list.add(tp3);
// list.add(tp4);
return list;
}
});
JavaRDD<ArrayList<Tuple7<String, String, String, String, String, String, String>>> filterRDD = mapRDD.filter(new Function<ArrayList<Tuple7<String, String, String, String, String, String, String>>, Boolean>() {
@Override
public Boolean call(ArrayList<Tuple7<String, String, String, String, String, String, String>> lines) throws Exception {
for (Tuple7<String, String, String, String, String, String, String> one : lines) {
if ("error".equals(one._1())) {
return false;
} else if ("error".equals(one._2())) {
return false;
} else if ("error".equals(one._3())) {
return false;
} else if ("error".equals(one._4())) {
return false;
} else if ("error".equals(one._5())) {
return false;
} else if ("error".equals(one._6())) {
return false;
} else if ("error".equals(one._7())) {
return false;
}
}
return true;
}
});
String outputPath = "./data/result.csv";
List<ArrayList<Tuple7<String, String, String, String, String, String, String>>> take = filterRDD.take(100);
for (ArrayList<Tuple7<String, String, String, String, String, String, String>> ls:take){
for (Tuple7<String, String, String, String, String, String, String> elem : ls){
System.err.println(elem._1()+"\t" +
elem._2()+"\t" +
elem._3()+"\t" +
elem._4()+"\t" +
elem._5()+"\t" +
elem._6()+"\t" +
elem._7());
}
}
filterRDD.foreach(new VoidFunction<ArrayList<Tuple7<String, String, String, String, String, String, String>>>() {
@Override
public void call(ArrayList<Tuple7<String, String, String, String, String, String, String>> lines) throws Exception {
for (Tuple7<String, String, String, String, String, String, String> elem:lines){
System.err.println(elem._1()+"\t" +
elem._2()+"\t" +
elem._3()+"\t" +
elem._4()+"\t" +
elem._5()+"\t" +
elem._6()+"\t" +
elem._7());
}
}
});
filterRDD.saveAsTextFile(outputPath);
}
public static int isNumDouble(String word){
try {
Double.parseDouble(word);
}catch (Exception e){
return -1;
}
return 0;
}
}
标签:rds res spl input output double lis highlight textfile
原文地址:https://www.cnblogs.com/walxt/p/12781954.html