Livy Java API error: org.apache.livy.shaded.kryo.kryo.KryoException: Unable to find class: com.xxx.wordcount.WordCountJavaSpark

Posted: 2019-02-05 22:17:50

Livy Java API

Dependencies

<dependency>
  <groupId>org.apache.livy</groupId>
  <artifactId>livy-client-http</artifactId>
  <version>0.5.0-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.2</version>
</dependency>

Job class

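import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class WordCountJavaSpark implements Job<Object> {
    /**
     * call() holds the job's execution logic.
     * @param jobContext gives access to the JavaSparkContext
     * @return a map from each word to its count
     * @throws Exception
     */
    @Override
    public Object call(JobContext jobContext) throws Exception {
        JavaSparkContext sc = jobContext.sc();
        Map<String, Integer> mp = new HashMap<String, Integer>();

        // To use an HDFS HA path here, HADOOP_CONF_DIR must be set in Livy's livy-env.sh
        JavaRDD<String> javaRDD = sc.textFile("hdfs://myha/data/livy/zookeeper.out");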
        JavaRDD<String> flatMapedRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        JavaPairRDD<String,Integer> mapedRDD = flatMapedRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String,Integer>(s,1);
            }
        });

        JavaPairRDD<String, Integer> reduceJavaPariRDD = mapedRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer1, Integer integer2) throws Exception {
                return integer1 + integer2;
            }
        });


        reduceJavaPariRDD.collect().forEach((tuple2)->{
            System.out.println(tuple2._1+"<===>"+tuple2._2);
            mp.put(tuple2._1,tuple2._2);
        });

        return mp;
    }
}

Launcher program

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;

import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

public class StartApp {
    private static LivyClient client = null;

    public static void main(String[] args) {

        String livyURI = "http://192.168.128.100:8998";

        // Location of the jar that contains the job class
        String file = "/Users/chenzhuanglan/WorkSpace/livyjavatest/out/artifacts/livyWC/WordCountJavaSpark.jar";

        try {
            client = new LivyClientBuilder().setURI(new URI(livyURI)).build();

            System.err.printf("Uploading %s to the Spark context...\n", file);
            // Upload the Spark job jar to the server so it lands on the application's classpath
            client.uploadJar(new File(file)).get();

            System.err.printf("Running WordCountJavaSpark ...\n");
            // Submit the job and block until the result comes back
            HashMap<String, Integer> map = (HashMap<String, Integer>) client.submit(new WordCountJavaSpark()).get();

            map.forEach((k, v) -> {
                System.out.println(k + "===" + v);
            });

        } catch (IOException | URISyntaxException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Close the client and shut down the remote Spark context
            if (client != null) {
                client.stop(true);
            }
        }
    }
}
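
Since the job class has to travel inside the jar that the launcher uploads, a quick local check that the jar really contains that class can rule out one possible cause of class-loading failures before anything is sent to Livy. The following is only a minimal sketch using the JDK's `java.util.jar` API; the class name `JarPreflightCheck` and the method `jarContainsClass` are hypothetical helpers introduced here, not part of Livy.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not part of the Livy API.
public class JarPreflightCheck {

    // Returns true if the jar has an entry for the given fully qualified class name.
    public static boolean jarContainsClass(File jar, String className) throws IOException {
        // com.example.Foo is stored in a jar as com/example/Foo.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarPreflightCheck <jar> <fully.qualified.ClassName>");
            return;
        }
        boolean found = jarContainsClass(new File(args[0]), args[1]);
        System.out.println(found ? "found" : "missing");
    }
}
```

In the launcher, such a check could run on `file` just before the upload call. It only inspects the jar's contents; it says nothing about whether the jar ends up on the Spark application's classpath.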

Important: watch out for the following error!

log4j:WARN No appenders could be found for logger (org.apache.livy.shaded.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Uploading /Users/chenzhuanglan/WorkSpace/testwcjava/target/testwcjava-1.0-SNAPSHOT.jar to the Spark context...
Running WordCountJavaSpark ...
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException: Unable to find class: com.czlan.wordcount.WordCountJavaSpark
org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
org.apache.livy.shaded.kryo.kryo.Kryo.readClass(Kryo.java:656)
org.apache.livy.shaded.kryo.kryo.Kryo.readClassAndObject(Kryo.java:767)
org.apache.livy.client.common.Serializer.deserialize(Serializer.java:63)
org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:39)
org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:57)
org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:42)
org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
    at org.apache.livy.client.http.JobHandleImpl.get(JobHandleImpl.java:198)
    at org.apache.livy.client.http.JobHandleImpl.get(JobHandleImpl.java:88)
    at com.czlan.wordcount.StartApp2.main(StartApp2.java:32)

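The error above occurs because
client.uploadJar(new File(file)).get(); was mistakenly written as client.uploadFile(new File(file)).get();
uploadJar uploads a jar that is added to the Spark application's classpath.
uploadFile uploads a file that is passed to the Spark application.
With uploadFile the jar is shipped as a plain file and never reaches the classpath, so Kryo cannot find the job class when it deserializes the submitted job on the driver.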
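
The difference between `uploadJar` and `uploadFile` comes down to whether the job class can be looked up by name on the remote side. The sketch below reproduces that lookup locally with the JDK only: Kryo's `DefaultClassResolver` resolves a class by name through a class loader, and the helper here does the same with `Class.forName`. The class `ClassResolutionSketch` and the method `canResolve` are hypothetical names for illustration, and the empty `URLClassLoader` merely stands in for a driver that never received the job jar on its classpath.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical illustration, not Livy or Kryo code.
public class ClassResolutionSketch {

    // Returns true if the loader can find the class by its fully qualified name.
    public static boolean canResolve(String className, ClassLoader loader) {
        try {
            // Look the class up by name without initializing it
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A loader with no extra jars: stands in for a classpath that lacks the job jar
        ClassLoader withoutJobJar =
                new URLClassLoader(new URL[0], ClassLoader.getSystemClassLoader());

        // A JDK class is always resolvable
        System.out.println(canResolve("java.util.HashMap", withoutJobJar));
        // The job class is resolvable only if its jar is on the classpath
        System.out.println(canResolve("com.czlan.wordcount.WordCountJavaSpark", withoutJobJar));
    }
}
```

When the second lookup fails inside Livy's driver, it surfaces as the `KryoException: Unable to find class` shown in the stack trace.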

Original article: https://www.cnblogs.com/czlan91/p/10353139.html
