Scenario: after running SQL through JavaHiveContext, I wanted to get each field name together with its corresponding value, but instead got "Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField". The code is as follows:
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
final StructField[] fields = schema.schema().getFields();
JavaRDD<String> result = schema.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        StringBuffer out = new StringBuffer();
        for (int i = 0; i < row.length(); i++) {
            out.append(fields[i].getName() + "->" + row.get(i) + ";");
        }
        return out.toString();
    }
});
System.out.println(result.collect());
Searching the Spark website for material on serialization, I saw that you can customize how a class is serialized by registering it, so I added the following setting to the conf:
conf.registerKryoClasses(new Class[] { org.apache.spark.sql.api.java.StructField.class });
After running the test again, the same error was still thrown: java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField. I don't know why; if anyone knows, please leave a comment below.
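(For reference, a fuller Kryo setup would also switch spark.serializer to the KryoSerializer; below is a minimal sketch, assuming Spark 1.2+ where registerKryoClasses is available. Note that Kryo registration only affects how data is serialized for shuffling and caching, while the closure passed to map() is still serialized with the default Java serializer, which is likely why registering StructField does not make this particular error go away.)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("kryo-registration-sketch")
        // Use Kryo for data serialization (shuffle, caching).
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Register classes so Kryo can serialize them efficiently; this does not
        // change how task closures themselves are serialized.
        .registerKryoClasses(new Class<?>[] {
                org.apache.spark.sql.api.java.StructField.class
        });
JavaSparkContext sc = new JavaSparkContext(conf);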
Since that approach did not work, I searched the web for other solutions and came across the following article, excerpted further below: http://www.cnblogs.com/zwCHAN/p/4305156.html
Following the first method, I moved the dependent StructField[] variable inside the map function. The code is as follows:
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
final JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
JavaRDD<String> result = schema.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        // The schema is now looked up inside call(), so no StructField[]
        // needs to be captured by the closure.
        StructField[] fields = schema.schema().getFields();
        StringBuffer out = new StringBuffer();
        for (int i = 0; i < row.length(); i++) {
            out.append(fields[i].getName() + "->" + row.get(i) + ";");
        }
        return out.toString();
    }
});
System.out.println(result.collect());
This passed testing, but fetching the schema from the JavaSchemaRDD on every map call is relatively expensive, and inside map only the String field names are actually needed. So I optimized the code on that basis, as follows:
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext sqlContext = new JavaHiveContext(sc);
JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
StructField[] fields = schema.schema().getFields();
final String[] fieldsName = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
    fieldsName[i] = fields[i].getName();
}
JavaRDD<String> result = schema.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(Row row) throws Exception {
        StringBuffer out = new StringBuffer();
        for (int i = 0; i < row.length(); i++) {
            out.append(fieldsName[i] + "->" + row.get(i) + ";");
        }
        return out.toString();
    }
});
System.out.println(result.collect());
The following is excerpted from: http://www.cnblogs.com/zwCHAN/p/4305156.html
The error "org.apache.spark.SparkException: Task not serializable" generally appears because an external variable used in the argument to map, filter, etc. cannot be serialized. In particular, referencing a member function or member variable of some class (often the current class) forces that entire class, with all of its members, to support serialization. The most common fixes are illustrated by the excerpt below (and by the sketch that follows it):
If you see this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
The above error can be triggered when you initialize a variable on the driver (master) but then try to use it on one of the workers. In that case, Spark will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
rdd.map(s -> notSerializable.doSomething(s)).collect();
This will trigger that error. One of the suggested fixes is to instantiate the object inside the closure, so it is created on the workers instead of being shipped from the driver, for example once per partition:
rdd.foreachPartition(iter -> {
NotSerializable notSerializable = new NotSerializable();
// ...Now process iter
});
In addition, this Stack Overflow answer, http://stackoverflow.com/questions/25914057/task-not-serializable-exception-while-running-apache-spark-job , explains it clearly and concisely.
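To tie the excerpt back to the problem above, here is a minimal self-contained sketch (the ClosureExample class and its field are hypothetical, made up for illustration) of the member-variable case: referencing a field of a non-serializable enclosing class inside map() drags the whole object into the closure, while copying just the needed value into a local final variable avoids it, which is the same idea as copying the StructField names into a String[] earlier.

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical driver-side class; note it does NOT implement Serializable.
public class ClosureExample {
    private final String separator = "->";   // ordinary member field

    public void run(JavaSparkContext sc) {
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "b", "c"));

        // BAD: referencing the member field captures "this" (ClosureExample),
        // so Spark tries to serialize the whole non-serializable object:
        // JavaRDD<String> bad = lines.map(s -> s + separator + s.length());

        // GOOD: copy the needed value into a local final variable first, so the
        // closure only captures a serializable String.
        final String sep = separator;
        JavaRDD<String> good = lines.map(s -> s + sep + s.length());

        System.out.println(good.collect());
    }
}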
Original post: http://www.cnblogs.com/Cherise/p/4332584.html