标签:
<span style="font-size:12px;"> public class MyProcessor extends Processor { private ProcessorContext context; private KeyValueStore kvStore; @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context) { this.context = context; this.context.schedule(1000); this.kvStore = (KeyValueStore) context.getStateStore("Counts"); } @Override public void process(String dummy, String line) { String[] words = line.toLowerCase().split(" "); for (String word : words) { Integer oldValue = this.kvStore.get(word); if (oldValue == null) { this.kvStore.put(word, 1); } else { this.kvStore.put(word, oldValue + 1); } } } @Override public void punctuate(long timestamp) { KeyValueIterator iter = this.kvStore.all(); while (iter.hasNext()) { KeyValue entry = iter.next(); context.forward(entry.key, entry.value.toString()); } iter.close(); context.commit(); } @Override public void close() { this.kvStore.close(); } };</span>
// Build a processing topology: one source topic feeding PROCESS1, which fans
// out to PROCESS2 and PROCESS3; each processor writes to its own sink topic.
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
// Topology with a state store: the in-memory store "COUNTS" is created for
// PROCESS1 and additionally connected to PROCESS2.
// Fixed from the original: a stray ';' after connectProcessorAndStateStores()
// broke the method chain, and PROCESS2 was wired to MyProcessor3::new instead
// of MyProcessor2::new.
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
    // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
    .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    // connect the state store "COUNTS" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "COUNTS")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
// DSL entry points: a KStream reading from two topics and a KTable
// (changelog view) reading from a third.
KStreamBuilder builder = new KStreamBuilder();
KStream source1 = builder.stream("topic1", "topic2");
KTable source2 = builder.table("topic3");
// written in Java 8+, using lambda expressions KStream mapped = source1.mapValue(record -> record.get("category"));
// written in Java 8+, using lambda expressions KTable, Long> counts = source1.aggregateByKey( () -> 0L, // initial value (aggKey, value, aggregate) -> aggregate + 1L, // aggregating value HoppingWindows.of("counts").with(5000L).every(1000L), // intervals in milliseconds ); KStream joined = source1.leftJoin(source2, (record1, record2) -> record1.get("user") + "-" + record2.get("region"); );
// Write the joined stream back out to a Kafka topic.
joined.to("topic4");
// equivalent to // // joined.to("topic4"); // materialized = builder.stream("topic4"); KStream materialized = joined.through("topic4");
名称
|
描述
|
类型
|
默认值
|
application.id
|
流处理应用的标识,对同一个应用需要一致,因为它是作为消费的group_id的
|
string
|
|
bootstrap.servers
|
host1:port1,host2:port2 这样的列表,是用来发现所有Kafka节点的种子,因此不需要配上所有的Kafka节点
|
list
|
|
client.id
|
应用的一个客户端的逻辑名称,设定后可以区分是哪个客户端在请求
|
string
|
""
|
zookeeper.connect
|
zookeeper连接串
|
string
|
""
|
key.serde
|
键的序列化/反序列化类
|
class
|
class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
|
partition.grouper
|
用于分区组织的类,需要实现PartitionGrouper接口
|
class
|
class org.apache.kafka.streams.processor.DefaultPartitionGrouper
|
replication.factor
|
流处理应用会创建change log topic和repartition topic用于管理内部状态,这个参数设定这些topic的副本数
|
int
|
1
|
state.dir
|
状态仓库的存储路径
|
string
|
/tmp/kafka-streams
|
timestamp.extractor
|
时间戳抽取类,需要实现TimestampExtractor接口
|
class
|
class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
|
value.serde
|
值的序列化/反序列化类
|
class
|
class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
|
buffered.records.per.partition
|
每个分区缓存的最大记录数
|
int
|
1000
|
commit.interval.ms
|
存储处理器当前位置的间隔毫秒数
|
long
|
30000
|
metric.reporters
|
用于性能报告的类列表。需要实现MetricsReporter接口。JmxReporter会永远开启不需要指定
|
list
|
[]
|
metrics.num.samples
|
计算性能需要的采样数
|
int
|
2
|
metrics.sample.window.ms
|
性能采样的时间间隔
|
long
|
30000
|
num.standby.replicas
|
每个任务的后备副本数
|
int
|
0
|
num.stream.threads
|
执行流处理的线程数
|
int
|
1
|
poll.ms
|
等待输入的毫秒数
|
long
|
100
|
state.cleanup.delay.ms
|
一个分区迁移后,在删除状态前等待的毫秒数
|
long
|
60000
|
标签:
原文地址:http://blog.csdn.net/mayp1/article/details/51626643