标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final
相关依赖
<!-- Kafka 依赖包 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
一个简单的Kafka生产者一般步骤如下:
创建 Properties 对象,设置生产者级别配置。以下3个配置是必须指定的。
(1)
bootstrap.servers 配置连接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址,当 连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能 够成功连上 Kafka 集群 在多代理集群的情况下建议至少配置两个代理。
key.serializer :配置用于序列化消息 Key 的类。
value.serializer :配置用于序列化消息实际数据的类。
(2)根据 Properties 对象实例化一个 KafkaProducer 对象。
(3)实例化 ProducerRecord 对象, 每条消息对应一个 ProducerRecord 对象。
(4)调用 KafkaProducer 发送消息的方法将 ProducerRecord 发送到 Kafka 相应节点。 Kafka提供了两个发送消息的方法,即 send(ProducerRecord <String,String> record 方法和sendσroducerRecord<string,string> record,Callback callback)方法,带有回调函数的 send() 方法要实现 org.apache kafka.clients.producer Callback 接口。如果消息发送发生异常, Callback 接口的 onCompletion会捕获到相应异常。 KafkaProducer 默认是异步发送消息, 会将消息缓存到消息缓冲区中,当消息 在消息缓冲区中累计到一定数量后作为一个 RecordBatch 再发迭。生产者发送消息实质分两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是 Sender 线程负责将缓冲区的消息发送到代理,执行真正的I/O操作,而在第一阶段执行完后就返回一个Future 象,根据对Future对象处理方式的不同,KafkaProducer 支持两种发送消息方式。
package com.kafka.action.chapter6.producer; import java.text.DecimalFormat; import java.util.Properties; import java.util.Random; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import com.kafka.action.chapter6.dto.StockQuotationinfo; /** * * @Title: QuotationProducer.java * @Package com.kafka.action.chapter6.producer * @Description: 单线程生产者 * @author licl * @date 2018年9月9日 */ public class QuotationProducer { // 设置实例生产消息的总数 private static final int MSG_SIZE = 100; // 主题名称 private static final String TOPIC = "test"; // kafka集群 private static final String BROKER_LIST = "192.168.1.106:9092"; private static KafkaProducer<String, String> producer = null; static { /* * I I 1. 构造用于实例化 Kaf kaProducer Properties 信息 */ Properties configs = initConfig(); // II 2. 初始化一个 KafkaProducer producer = new KafkaProducer<String, String>(configs); } /* * 初始化 Kafka 配置 */ private static Properties initConfig() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return properties; } // 生产股票行情信息 private static StockQuotationinfo createQuotationinfo() { StockQuotationinfo quotationinfo = new StockQuotationinfo(); // 随机产生 1-10 之间的整数,然后与 600100 相加组成股票代码 Random r = new Random(); Integer stockCode = 600100 + r.nextInt(10); // /随机产生一个 0-1之间的浮点数 float random = (float) Math.random(); // 设置涨跌规则 if (random / 2 < 0.5) { random = -random; } // 设置保存两位有效数字 DecimalFormat decimalFormat = new DecimalFormat(".00"); // 设置最新价在 11元浮动 quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat .format(11 + random))); // 设置昨日收盘价为固定值 quotationinfo.setPreClosePrice(11.80f); // 设置开盘价 quotationinfo.setOpenPrice(11.5f); // 设置最低价,并不考虑 10% 限制,/以及当前价是否已是最低价 quotationinfo.setLowPrice(10.5f); // 设置最高价,并不考虑 10 %限制/以及当前价是否已是最高价 quotationinfo.setHighPrice(12.5f); quotationinfo.setStockCode(stockCode.toString()); quotationinfo.setTradeTime(System.currentTimeMillis()); quotationinfo.setStockName(" 股票- + stockCode"); return quotationinfo; } public static void main(String[] args) { ProducerRecord<String, String> record = null; StockQuotationinfo quotationinfo = null; try { int num = 0; for (int i = 0; i < MSG_SIZE; i++) { quotationinfo = createQuotationinfo(); record = new ProducerRecord<String, String>(TOPIC, null, quotationinfo.getTradeTime(), quotationinfo.getStockCode(), quotationinfo.toString()); // 异步发送消息 // 1.正常发送 //producer.send(record); // 2.指定回调实现逻辑 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.out.println("Send message occurs exception"); exception.printStackTrace(); } if(exception == null){ System.out.println(String.format("offset:%s,partition:%s", metadata.offset(),metadata.partition())); } } }); if (num++ % 10 == 0) { // 休眠 2s Thread.sleep(2000L); } } } catch (Exception e) { e.printStackTrace(); }finally{ producer.close(); } } }
package com.kafka.action.chapter6.producer; import java.text.DecimalFormat; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; /** * * @Title: QuotationProducer.java * @Package com.kafka.action.chapter6.producer * @Description: 多线程生产者 * @date 2018年9月9日 */ import com.kafka.action.chapter6.dto.StockQuotationinfo; public class KafkaProducerThread implements Runnable { // 设置实例生产消息的总数 private static final int MSG_SIZE = 100; private static final String TOPIC = "test"; private KafkaProducer<String, String> producer = null; private ProducerRecord<String, String> record = null; StockQuotationinfo quotationinfo = null; ExecutorService executor = Executors.newFixedThreadPool(10); long current = System.currentTimeMillis(); private static StockQuotationinfo createQuotationinfo() { StockQuotationinfo quotationinfo = new StockQuotationinfo(); // 随机产生 1-10 之间的整数,然后与 600100 相加组成股票代码 Random r = new Random(); Integer stockCode = 600100 + r.nextInt(10); // /随机产生一个 0-1之间的浮点数 float random = (float) Math.random(); // 设置涨跌规则 if (random / 2 < 0.5) { random = -random; } // 设置保存两位有效数字 DecimalFormat decimalFormat = new DecimalFormat(".00"); // 设置最新价在 11元浮动 quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat .format(11 + random))); // 设置昨日收盘价为固定值 quotationinfo.setPreClosePrice(11.80f); // 设置开盘价 quotationinfo.setOpenPrice(11.5f); // 设置最低价,并不考虑 10% 限制,/以及当前价是否已是最低价 quotationinfo.setLowPrice(10.5f); // 设置最高价,并不考虑 10 %限制/以及当前价是否已是最高价 quotationinfo.setHighPrice(12.5f); quotationinfo.setStockCode(stockCode.toString()); quotationinfo.setTradeTime(System.currentTimeMillis()); quotationinfo.setStockName(" 股票- + stockCode"); return quotationinfo; } @Override public void run() { // 1.线程池 try { for (int i = 0; i < MSG_SIZE; i++) { quotationinfo = createQuotationinfo(); record = new ProducerRecord<String, String>(TOPIC, null, quotationinfo.getTradeTime(), quotationinfo.getStockCode(), quotationinfo.toString()); executor.submit(new KafkaProducerThread(producer, record)); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); executor.shutdown(); } } public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) { this.producer = producer; this.record = record; } }
package com.kafka.action.chapter6.dto; import java.io.Serializable; public class StockQuotationinfo implements Serializable{ /** * */ private static final long serialVersionUID = 1L; public StockQuotationinfo() { super(); } //股票代码 private String stockCode ; //股票名称 private String stockName ; @Override public String toString() { return "StockQuotationinfo [stockCode=" + stockCode + ", stockName=" + stockName + ", tradeTime=" + tradeTime + ", preClosePrice=" + preClosePrice + ", openPrice=" + openPrice + ", currentPrice=" + currentPrice + ", highPrice=" + highPrice + ", lowPrice=" + lowPrice + "]"; } //交易时间 private long tradeTime; //昨日收盘价 private float preClosePrice; //开盘价 private float openPrice ; //当前价,收盘时即为当日收盘价 private float currentPrice ; //今日最高 private float highPrice; //今日最低 private float lowPrice; public StockQuotationinfo(String stockCode, String stockName, long tradeTime, float preClosePrice, float openPrice, float currentPrice, float highPrice, float lowPrice) { super(); this.stockCode = stockCode; this.stockName = stockName; this.tradeTime = tradeTime; this.preClosePrice = preClosePrice; this.openPrice = openPrice; this.currentPrice = currentPrice; this.highPrice = highPrice; this.lowPrice = lowPrice; } public String getStockCode() { return stockCode; } public void setStockCode(String stockCode) { this.stockCode = stockCode; } public String getStockName() { return stockName; } public void setStockName(String stockName) { this.stockName = stockName; } public long getTradeTime() { return tradeTime; } public void setTradeTime(long tradeTime) { this.tradeTime = tradeTime; } public float getPreClosePrice() { return preClosePrice; } public void setPreClosePrice(float preClosePrice) { this.preClosePrice = preClosePrice; } public float getOpenPrice() { return openPrice; } public void setOpenPrice(float openPrice) { this.openPrice = openPrice; } public float getCurrentPrice() { return currentPrice; } public void setCurrentPrice(float currentPrice) { this.currentPrice = currentPrice; } public float getHighPrice() { return highPrice; } public void setHighPrice(float highPrice) { this.highPrice = highPrice; } public float getLowPrice() { return lowPrice; } public void setLowPrice(float lowPrice) { this.lowPrice = lowPrice; } public static long getSerialversionuid() { return serialVersionUID; } }
消费者
package demo2; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.Test; public class MyKafkaConsumer { /** * 自动提交offset */ @Test public void comsumeMsgAutoCommit() { Properties props = new Properties(); props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT); props.put("group.id", Constants.GROUP_ID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(Constants.MY_TOPIC)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } sleep(1); } } /** * 手动提交offset */ @Test public void consumerMsgManualCommit() { Properties props = new Properties(); props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT); props.put("group.id", Constants.GROUP_ID); props.put("max.poll.records", 10); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(Constants.MY_TOPIC)); final int minBatchSize = 100; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); buffer.add(record); System.out.println(buffer.size()); } if (buffer.size() >= minBatchSize) { System.out.println("进入手动提交offset"); insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } } } private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) { for (ConsumerRecord<String, String> record : buffer) { System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } private void sleep(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } }
package demo2; import java.net.InetAddress; public class Constants { final static String GROUP_ID = "test_group"; final static String MY_TOPIC = "test"; final static String KAFKA_SERVER_ADRESS = "192.168.1.106"; final static int KAFKA_SERVER_PORT = 9092; }
标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final
原文地址:https://www.cnblogs.com/liclBlog/p/9613421.html