标签:serial ace batch cto message path park sts 解析
定制avro schema:
{ "type": "record", "name": "userlog", "fields": [ {"name": "ip","type": "string"}, {"name": "identity","type":"string"}, {"name": "userid","type":"int"}, {"name": "time","type": "string"}, {"name": "requestinfo","type": "string"}, {"name": "state","type": "int"}, {"name": "responce","type": "string"}, {"name": "referer","type": "string"}, {"name": "useragent","type": "string"} ] }
创建producer发送对象:
private static Producer<String, String> createProducer() { Properties props = new Properties(); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 声明kafka broker props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092,192.168.0.123:9092"); Producer<String, String> procuder = new KafkaProducer<String, String>(props); return procuder; }
读取schema文件为Schema对象:
解析schema文件
private static Schema getSchema(final Configuration hadoopConf, final String avroFilePath) { Schema schema = null; try { Path pt = new Path(avroFilePath); FileSystem fs = FileSystem.get(hadoopConf); if (fs.exists(pt)) { FSDataInputStream inputStream = fs.open(pt); Schema.Parser parser = new Schema.Parser(); schema = parser.parse(inputStream); } } catch (IOException e) { e.printStackTrace(); } return schema; }
使用Schema对象生成record存储器,并对存储进行序列化:
protected static byte[] serializeEvent(GenericRecord record) throws Exception { ByteArrayOutputStream bos = null; try { bos = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null); GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema()); writer.write(record, encoder); encoder.flush(); byte[] serializedValue = bos.toByteArray(); return serializedValue; } catch (Exception ex) { throw ex; } finally { if (bos != null) { try { bos.close(); } catch (Exception e) { bos = null; } } } }
通过producer发送数据到topic:
package com.dx.streaming.producer; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Random; import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; public class TestProducer { private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc"; // "/user/dx/conf/avro/userlog.avsc"; private static final String topic = "t-my"; public static void main(String[] args) throws InterruptedException { int size = 0; String appName = "Test Avro"; SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(appName); SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate(); Configuration hadoopConf = sparkSession.sparkContext().hadoopConfiguration(); Producer<String, String> procuder = createProducer(); while (true) { Random random = new Random(); String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255); String identity = UUID.randomUUID().toString(); int userid = random.nextInt(100); String time = "2018-07-03 " + random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60); String requestInfo = "...."; int state = random.nextInt(600); String responce = "..."; String referer = "..."; String useragent = "..."; Schema schema = getSchema(hadoopConf, avroFilePath); GenericRecord record = new GenericData.Record(schema); record.put("ip", ip); record.put("identity", identity); record.put("userid", userid); record.put("time", time); record.put("requestinfo", requestInfo); record.put("state", state); record.put("responce", responce); record.put("referer", referer); record.put("useragent", useragent); System.out.println(ip + "\r\n" + identity + "\r\n" + userid + "\r\n" + time); try { byte[] serializedValue = serializeEvent(record); ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, serializedValue.toString()); procuder.send(msg); } catch (Exception e) { e.printStackTrace(); } size++; if (size % 100 == 0) { size = 0; Thread.sleep(10000); if (size > 10000) { break; } } } // 列出topic的相关信息 List<PartitionInfo> partitions = new ArrayList<PartitionInfo>(); partitions = procuder.partitionsFor(topic); for (PartitionInfo p : partitions) { System.out.println(p); } System.out.println("send message over."); procuder.close(100, java.util.concurrent.TimeUnit.MILLISECONDS); } .... }
打印结果:
192:49:185:13 1b87f3ee-cdad-46c6-91e5-64e4f2711faa 59 2018-07-03 11:41:28 25:128:123:27 115235b7-771f-42b0-94e8-2d8fba60d1d3 21 2018-07-03 7:56:53
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十一)定制一个arvo格式文件发送到kafka的topic,通过sparkstreaming读取kafka的数据
标签:serial ace batch cto message path park sts 解析
原文地址:https://www.cnblogs.com/yy3b2007com/p/9261205.html