
Common Spark Operations (Part 2)

Date: 2020-05-08 18:23:41


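// Read data with Spark
// DataFrameReader.textFile accepts only paths and returns Dataset<String>, one element per line
Dataset<String> lines = spark.read().textFile(currentSrcPath);
Dataset<Row> jsonDF = spark.read().json(path);
Dataset<Row> orcDF = spark.read().orc(path);
Dataset<Row> parquet = spark.read().parquet(path);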

// Write data with Spark (df is any Dataset<Row>)
df.write().mode("overwrite").text(outputPath); // text() requires a single string column
df.write().mode("overwrite").parquet(outputPath);
df.write().mode("overwrite").orc(outputPath);

// Convert an RDD of Row to Dataset<Row> using an explicit schema
Dataset<Row> df = spark.createDataFrame(rowRDD, AdjustSchema.row);

// Convert a List to a Dataset
Dataset<String> dataset = spark.createDataset(Collections.singletonList(Long.toString(startTime)), Encoders.STRING());

 

// Get the Hadoop FileSystem from Spark (FileSystem.get throws IOException)
FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());

 

// Build a schema
public static StructType row = DataTypes.createStructType(
        Arrays.asList(
                DataTypes.createStructField("phone_name", DataTypes.StringType, true),
                DataTypes.createStructField("app_id", DataTypes.StringType, true)
...
));

 

// Convert an RDD/JavaRDD to a DataFrame (Dataset<Row>)
Dataset<Row> personDF = spark.createDataFrame(personRDD, Person.class);
// The schema overload expects an RDD<Row> or JavaRDD<Row> together with a StructType
spark.createDataFrame(personRDD, PersonSchema);
personDF = spark.createDataFrame(personJavaRDD, Person.class);

// Convert an RDD to a typed Dataset
Encoder<Person> personEncoder = Encoders.bean(Person.class);
personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);

// Build a Dataset<Row> directly from a List
Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);

// Convert JavaRDD<Row> to Dataset<Row>; the value order in RowFactory.create must match the schema
JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);

//Dataset<Person> -> JavaRDD<Person>
personJavaRDD = personDS.toJavaRDD();

//Dataset<Row> -> JavaRDD<Person>
personJavaRDD = personDF.toJavaRDD().map(row -> {
    String name = row.getAs("name");
    int age = row.getAs("age");
    return new Person(name, age);
});

//Dataset<Person> -> Dataset<Row>
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
Dataset<Row> personDF_fromDS = personDS.map(
        // values are listed in the same order as the fields of rowSchema (name, age)
        (MapFunction<Person, Row>) person -> RowFactory.create(person.name, person.age),
        rowEncoder
);

//Dataset<Row> -> Dataset<Person>
personDS = personDF.map(new MapFunction<Row, Person>() {
    @Override
    public Person call(Row value) throws Exception {
        return new Person(value.getAs("name"), value.getAs("age"));
    }
}, personEncoder);
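
The conversion snippets above rely on a Person class that the post never shows. The following is a minimal sketch of what it might look like, not the author's actual class. It assumes two properties, name and age, a (name, age) constructor, and directly readable fields, since the snippets use person.name and person.age. Encoders.bean and createDataFrame(..., Person.class) work on JavaBean properties, so the sketch also provides a no-argument constructor and getter/setter pairs.

```java
import java.io.Serializable;

// Hypothetical Person bean assumed by the snippets in this post.
// For use with Spark, declare it public in its own Person.java so that
// Spark's bean encoder can access it from other packages.
class Person implements Serializable {
    // Public fields, because the snippets read person.name and person.age directly.
    public String name;
    public int age;

    // No-argument constructor, needed when a bean is instantiated reflectively.
    public Person() {}

    // Convenience constructor matching new Person(name, age) in the snippets.
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    // Getter/setter pairs define the bean properties "name" and "age".
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
```

Implementing Serializable matters because Person instances travel inside JavaRDD<Person> and are shipped between the driver and executors.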

 


Original article: https://www.cnblogs.com/wangbin2188/p/12851952.html
