标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final
相关依赖
<!-- Kafka 依赖包 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
一个简单的Kafka生产者一般步骤如下:
创建 Properties 对象,设置生产者级别配置。以下3个配置是必须指定的。
(1)
bootstrap.servers :配置连接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上 Kafka 集群,在多代理集群的情况下建议至少配置两个代理。
key.serializer :配置用于序列化消息 Key 的类。
value.serializer :配置用于序列化消息实际数据的类。
(2)根据 Properties 对象实例化一个 KafkaProducer 对象。
(3)实例化 ProducerRecord 对象, 每条消息对应一个 ProducerRecord 对象。
(4)调用 KafkaProducer 发送消息的方法将 ProducerRecord 发送到 Kafka 相应节点。Kafka 提供了两个发送消息的方法,即 send(ProducerRecord<String, String> record) 方法和 send(ProducerRecord<String, String> record, Callback callback) 方法,带有回调函数的 send() 方法要实现 org.apache.kafka.clients.producer.Callback 接口。如果消息发送发生异常,Callback 接口的 onCompletion() 会捕获到相应异常。KafkaProducer 默认是异步发送消息,会将消息缓存到消息缓冲区中,当消息在消息缓冲区中累计到一定数量后作为一个 RecordBatch 再发送。生产者发送消息实质分两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是 Sender 线程负责将缓冲区的消息发送到代理,执行真正的 I/O 操作,而在第一阶段执行完后就返回一个 Future 对象,根据对 Future 对象处理方式的不同,KafkaProducer 支持两种发送消息方式。

package com.kafka.action.chapter6.producer;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import com.kafka.action.chapter6.dto.StockQuotationinfo;
/**
 * Single-threaded Kafka producer that publishes randomly generated stock quotations.
 *
 * <p>One shared {@link KafkaProducer} is created when the class is loaded and is closed at the
 * end of {@link #main(String[])}.
 *
 * @author licl
 */
public class QuotationProducer {
    /** Total number of messages produced by one run of {@link #main(String[])}. */
    private static final int MSG_SIZE = 100;
    /** Name of the topic the quotations are sent to. */
    private static final String TOPIC = "test";
    /** Kafka bootstrap server list. */
    private static final String BROKER_LIST = "192.168.1.106:9092";
    /** Shared random source, so a new generator is not allocated for every quotation. */
    private static final Random RANDOM = new Random();
    /** Producer shared by the whole class; built from {@link #initConfig()}. */
    private static final KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(initConfig());

    /**
     * Builds the producer configuration: bootstrap servers plus String serializers for both
     * key and value.
     *
     * @return the properties used to instantiate the producer
     */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        return properties;
    }

    /**
     * Creates one randomly generated stock quotation.
     *
     * @return a quotation whose code is in 600100..600109 and whose current price lies within
     *         one unit of 11, rounded to two decimals
     */
    private static StockQuotationinfo createQuotationinfo() {
        StockQuotationinfo quotationinfo = new StockQuotationinfo();
        // Stock code: 600100 plus a random integer in [0, 9].
        int stockCode = 600100 + RANDOM.nextInt(10);
        // Price change magnitude in [0, 1). The direction is an independent coin flip; the
        // former test "random / 2 < 0.5" was always true, so the price could only ever fall.
        float change = RANDOM.nextFloat();
        if (RANDOM.nextBoolean()) {
            change = -change;
        }
        // Round to two decimals arithmetically instead of formatting and re-parsing text,
        // which fails in locales whose decimal separator is not '.'.
        float currentPrice = Math.round((11 + change) * 100) / 100f;
        quotationinfo.setCurrentPrice(currentPrice);
        // Previous close is a fixed value.
        quotationinfo.setPreClosePrice(11.80f);
        // Opening price.
        quotationinfo.setOpenPrice(11.5f);
        // Day low/high are fixed; the 10% limit and the current price are deliberately ignored.
        quotationinfo.setLowPrice(10.5f);
        quotationinfo.setHighPrice(12.5f);
        quotationinfo.setStockCode(String.valueOf(stockCode));
        quotationinfo.setTradeTime(System.currentTimeMillis());
        // The stock code is appended to the name; previously it sat inside the string literal.
        quotationinfo.setStockName("股票-" + stockCode);
        return quotationinfo;
    }

    /**
     * Sends {@link #MSG_SIZE} quotations asynchronously, pausing two seconds before every
     * tenth message (starting with the first), then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            for (int i = 0; i < MSG_SIZE; i++) {
                StockQuotationinfo quotationinfo = createQuotationinfo();
                // Key is the stock code, value the quotation's text form, timestamp the trade time;
                // the partition is left null so the partitioner picks it.
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                        TOPIC, null, quotationinfo.getTradeTime(),
                        quotationinfo.getStockCode(), quotationinfo.toString());
                // Asynchronous send; the callback reports the outcome of each record.
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.out.println("Send message occurs exception");
                            exception.printStackTrace();
                        } else {
                            System.out.println(String.format("offset:%s,partition:%s",
                                    metadata.offset(), metadata.partition()));
                        }
                    }
                });
                if (i % 10 == 0) {
                    // Sleep for 2 seconds.
                    Thread.sleep(2000L);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
package com.kafka.action.chapter6.producer;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
*
* @Title: KafkaProducerThread.java
* @Package com.kafka.action.chapter6.producer
* @Description: 多线程生产者
* @date 2018年9月9日
*/
import com.kafka.action.chapter6.dto.StockQuotationinfo;
public class KafkaProducerThread implements Runnable {
// 设置实例生产消息的总数
private static final int MSG_SIZE = 100;
private static final String TOPIC = "test";
private KafkaProducer<String, String> producer = null;
private ProducerRecord<String, String> record = null;
StockQuotationinfo quotationinfo = null;
ExecutorService executor = Executors.newFixedThreadPool(10);
long current = System.currentTimeMillis();
private static StockQuotationinfo createQuotationinfo() {
StockQuotationinfo quotationinfo = new StockQuotationinfo();
// 随机产生 1-10 之间的整数,然后与 600100 相加组成股票代码
Random r = new Random();
Integer stockCode = 600100 + r.nextInt(10);
// /随机产生一个 0-1之间的浮点数
float random = (float) Math.random();
// 设置涨跌规则
if (random / 2 < 0.5) {
random = -random;
}
// 设置保存两位有效数字
DecimalFormat decimalFormat = new DecimalFormat(".00");
// 设置最新价在 11元浮动
quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat
.format(11 + random)));
// 设置昨日收盘价为固定值
quotationinfo.setPreClosePrice(11.80f);
// 设置开盘价
quotationinfo.setOpenPrice(11.5f);
// 设置最低价,并不考虑 10% 限制,/以及当前价是否已是最低价
quotationinfo.setLowPrice(10.5f);
// 设置最高价,并不考虑 10 %限制/以及当前价是否已是最高价
quotationinfo.setHighPrice(12.5f);
quotationinfo.setStockCode(stockCode.toString());
quotationinfo.setTradeTime(System.currentTimeMillis());
quotationinfo.setStockName(" 股票- + stockCode");
return quotationinfo;
}
@Override
public void run() {
// 1.线程池
try {
for (int i = 0; i < MSG_SIZE; i++) {
quotationinfo = createQuotationinfo();
record = new ProducerRecord<String, String>(TOPIC, null,
quotationinfo.getTradeTime(),
quotationinfo.getStockCode(), quotationinfo.toString());
executor.submit(new KafkaProducerThread(producer, record));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
executor.shutdown();
}
}
public KafkaProducerThread(KafkaProducer<String, String> producer,
ProducerRecord<String, String> record) {
this.producer = producer;
this.record = record;
}
}
package com.kafka.action.chapter6.dto;
import java.io.Serializable;
/**
 * Mutable data holder describing one stock quotation.
 */
public class StockQuotationinfo implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Stock code. */
    private String stockCode;
    /** Stock name. */
    private String stockName;
    /** Trade time. */
    private long tradeTime;
    /** Previous day's closing price. */
    private float preClosePrice;
    /** Opening price. */
    private float openPrice;
    /** Current price; at market close this is the day's closing price. */
    private float currentPrice;
    /** Highest price of the day. */
    private float highPrice;
    /** Lowest price of the day. */
    private float lowPrice;

    /** Creates a quotation with every field left at its default value. */
    public StockQuotationinfo() {
    }

    /**
     * Creates a fully populated quotation.
     *
     * @param stockCode stock code
     * @param stockName stock name
     * @param tradeTime trade time
     * @param preClosePrice previous day's closing price
     * @param openPrice opening price
     * @param currentPrice current price
     * @param highPrice highest price of the day
     * @param lowPrice lowest price of the day
     */
    public StockQuotationinfo(String stockCode, String stockName,
            long tradeTime, float preClosePrice, float openPrice,
            float currentPrice, float highPrice, float lowPrice) {
        this.stockCode = stockCode;
        this.stockName = stockName;
        this.tradeTime = tradeTime;
        this.preClosePrice = preClosePrice;
        this.openPrice = openPrice;
        this.currentPrice = currentPrice;
        this.highPrice = highPrice;
        this.lowPrice = lowPrice;
    }

    /** @return the class's serialization version id */
    public static long getSerialversionuid() {
        return serialVersionUID;
    }

    /** @return the stock code */
    public String getStockCode() {
        return this.stockCode;
    }

    /** @param stockCode the stock code */
    public void setStockCode(String stockCode) {
        this.stockCode = stockCode;
    }

    /** @return the stock name */
    public String getStockName() {
        return this.stockName;
    }

    /** @param stockName the stock name */
    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    /** @return the trade time */
    public long getTradeTime() {
        return this.tradeTime;
    }

    /** @param tradeTime the trade time */
    public void setTradeTime(long tradeTime) {
        this.tradeTime = tradeTime;
    }

    /** @return the previous day's closing price */
    public float getPreClosePrice() {
        return this.preClosePrice;
    }

    /** @param preClosePrice the previous day's closing price */
    public void setPreClosePrice(float preClosePrice) {
        this.preClosePrice = preClosePrice;
    }

    /** @return the opening price */
    public float getOpenPrice() {
        return this.openPrice;
    }

    /** @param openPrice the opening price */
    public void setOpenPrice(float openPrice) {
        this.openPrice = openPrice;
    }

    /** @return the current price */
    public float getCurrentPrice() {
        return this.currentPrice;
    }

    /** @param currentPrice the current price */
    public void setCurrentPrice(float currentPrice) {
        this.currentPrice = currentPrice;
    }

    /** @return the highest price of the day */
    public float getHighPrice() {
        return this.highPrice;
    }

    /** @param highPrice the highest price of the day */
    public void setHighPrice(float highPrice) {
        this.highPrice = highPrice;
    }

    /** @return the lowest price of the day */
    public float getLowPrice() {
        return this.lowPrice;
    }

    /** @param lowPrice the lowest price of the day */
    public void setLowPrice(float lowPrice) {
        this.lowPrice = lowPrice;
    }

    /**
     * Renders all fields as {@code StockQuotationinfo [name=value, ...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("StockQuotationinfo [stockCode=");
        text.append(this.stockCode);
        text.append(", stockName=").append(this.stockName);
        text.append(", tradeTime=").append(this.tradeTime);
        text.append(", preClosePrice=").append(this.preClosePrice);
        text.append(", openPrice=").append(this.openPrice);
        text.append(", currentPrice=").append(this.currentPrice);
        text.append(", highPrice=").append(this.highPrice);
        text.append(", lowPrice=").append(this.lowPrice);
        text.append("]");
        return text.toString();
    }
}
消费者
package demo2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
/**
 * Demonstrates consuming from Kafka with automatic and with manual offset commits.
 *
 * <p>Both tests poll until the running thread is interrupted and always close the consumer
 * on the way out.
 */
public class MyKafkaConsumer {
    /** Maximum time, in milliseconds, one {@code poll} call waits for records. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /**
     * Consumes with automatic offset commits (every 1000 ms), printing each record and
     * pausing one second between polls.
     */
    @Test
    public void comsumeMsgAutoCommit() {
        Properties props = baseConfig();
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList(Constants.MY_TOPIC));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                sleep(1);
            }
        } finally {
            // Close on every exit path, including exceptions thrown by poll().
            consumer.close();
        }
    }

    /**
     * Consumes with manual offset commits: records are buffered and, once at least 100 have
     * accumulated, handed to {@link #insertIntoDb(List)} before the offsets are committed
     * synchronously.
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties props = baseConfig();
        props.put("max.poll.records", 10);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        final int minBatchSize = 100;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList(Constants.MY_TOPIC));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    buffer.add(record);
                    System.out.println(buffer.size());
                }
                if (buffer.size() >= minBatchSize) {
                    System.out.println("进入手动提交offset");
                    // Commit only after the batch has been handed off.
                    insertIntoDb(buffer);
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        } finally {
            // Records still in the buffer were never committed here.
            consumer.close();
        }
    }

    /**
     * Builds the settings shared by both tests: broker address, consumer group and String
     * deserializers for key and value.
     *
     * @return a fresh, mutable configuration
     */
    private Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Stand-in for a database write: prints every buffered record.
     *
     * @param buffer the records to "store"
     */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> record : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    /**
     * Sleeps for the given number of seconds.
     *
     * @param seconds how long to sleep
     */
    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            // Restore the flag so the polling loops see the interruption and stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
package demo2;
import java.net.InetAddress;
/**
 * Connection settings shared by the consumer examples.
 */
public class Constants {
    /** Host of the Kafka broker. */
    static final String KAFKA_SERVER_ADRESS = "192.168.1.106";
    /** Port of the Kafka broker. */
    static final int KAFKA_SERVER_PORT = 9092;
    /** Topic the examples consume from. */
    static final String MY_TOPIC = "test";
    /** Consumer group id used by the examples. */
    static final String GROUP_ID = "test_group";
}
标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final
原文地址:https://www.cnblogs.com/liclBlog/p/9613421.html