标签:spark
String host = args[0];
int port = Integer.parseInt(args[1]);
String host1 = args[2];
int port1 = Integer.parseInt(args[3]);
InetSocketAddress address1 = new InetSocketAddress(host,port);
InetSocketAddress address2 = new InetSocketAddress(host1,port1);
InetSocketAddress[] InetSocketAddressArray = {address1,address2};
JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf().setAppName("JavaFlumeEventHandle_1"), Durations.seconds(2));
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(jssc, InetSocketAddressArray, StorageLevel.MEMORY_AND_DISK_SER_2());
String host = args[0];
int port = Integer.parseInt(args[1]);
String host1 = args[2];
int port1 = Integer.parseInt(args[3]);
InetSocketAddress address1 = new InetSocketAddress(host,port);
InetSocketAddress address2 = new InetSocketAddress(host1,port1);
InetSocketAddress[] InetSocketAddressArray = {address1,address2};
JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf().setAppName("JavaFlumeEventHandle_1"), Durations.seconds(2));
// JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(jssc, InetSocketAddressArray, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(jssc,host,port);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream1 = FlumeUtils.createPollingStream(jssc, host1, port1);
JavaDStream<SparkFlumeEvent> union = flumeStream.union(flumeStream1);
一个spark receiver 或多个spark receiver 接收 多个flume agent
标签:spark
原文地址:http://blog.csdn.net/kntao/article/details/44943267