Today we will take a look at basic Kafka usage and configuration. There are things in this world that can be undone and things that cannot; the passing of time is one of those that cannot.
The installation and the examples here are all done on Windows. Running Kafka requires a Java environment and a running ZooKeeper instance.
Kafka needs a Java runtime; Java can be downloaded from http://www.oracle.com/technetwork/java/javase/downloads/index.html.
ZooKeeper can be downloaded from http://zookeeper.apache.org/releases.html; just extract the archive and it is ready to use.
For detailed installation steps, see https://www.w3cschool.cn/apache_kafka/apache_kafka_installation_steps.html.
Now we will write a producer and a consumer in Java to walk through the Kafka message flow. Our installation directory is D:\Apache\apache-kafka\kafka_2.11-0.11.0.1.
The data directory under it was created by hand and is used to hold the log files Kafka generates. We also need to edit the server.properties file under config, changing it as follows:
log.dirs=D:/Apache/apache-kafka/kafka_2.11-0.11.0.1/data
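For reference, after this change the relevant part of server.properties looks roughly like the sketch below. Only log.dirs actually needs editing; broker.id and zookeeper.connect are the 0.11.0.1 defaults (they also show up in the broker startup log further down), and the listeners line stays commented out so the broker listens on the default port 9092.

broker.id=0
# listeners=PLAINTEXT://:9092   (left at the default, port 9092)
log.dirs=D:/Apache/apache-kafka/kafka_2.11-0.11.0.1/data
zookeeper.connect=localhost:2181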
Open a new window and change to the directory: cd D:\Apache\apache-kafka\kafka_2.11-0.11.0.1\bin\windows.
Run zookeeper-server-start.bat ../../config/zookeeper.properties to start ZooKeeper. The log output:
[2017-11-03 23:48:54,899] INFO Reading configuration from: ..\..\config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-11-03 23:48:54,925] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-11-03 23:48:54,927] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-11-03 23:48:54,928] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-11-03 23:48:54,930] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-11-03 23:48:54,970] INFO Reading configuration from: ..\..\config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-11-03 23:48:54,973] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-11-03 23:48:55,005] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-11-03 23:48:55,009] INFO Server environment:host.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2017-11-03 23:48:55,012] INFO Server environment:java.version=1.8.0_152 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-11-03 23:48:55,019] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2017-11-03 23:48:55,023] INFO Server environment:java.home=D:\Java\jdk\jdk1.8.0_152\jre (org.apache.zookeeper.server.ZooKeeperServer)
........
Open another window and change to the same directory: cd D:\Apache\apache-kafka\kafka_2.11-0.11.0.1\bin\windows.
Run kafka-server-start.bat ../../config/server.properties to start Kafka. The log output:
[2017-11-03 23:54:16,362] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    alter.config.policy.class.name = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
    create.topic.policy.class.name = null
    default.replication.factor = 1
    delete.records.purgatory.purge.interval.requests = 1
    delete.topic.enable = false
    fetch.purgatory.purge.interval.requests = 1000
    group.initial.rebalance.delay.ms = 0
    group.max.session.timeout.ms = 300000
    group.min.session.timeout.ms = 6000
    host.name =
    inter.broker.listener.name = null
    inter.broker.protocol.version = 0.11.0-IV2
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
    listeners = null
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = D:/Apache/apache-kafka/kafka_2.11-0.11.0.1/data
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.flush.start.offset.checkpoint.interval.ms = 60000
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.message.format.version = 0.11.0-IV2
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides =
    message.max.bytes = 1000012
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 1
    num.recovery.threads.per.data.dir = 1
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 1440
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 1
    offsets.topic.segment.bytes = 104857600
    port = 9092
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    producer.purgatory.purge.interval.requests = 1000
    queued.max.requests = 500
    quota.consumer.default = 9223372036854775807
    quota.producer.default = 9223372036854775807
    quota.window.num = 11
    quota.window.size.seconds = 1
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 10000
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.enabled.mechanisms = [GSSAPI]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism.inter.broker.protocol = GSSAPI
    security.inter.broker.protocol = PLAINTEXT
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
    transaction.max.timeout.ms = 900000
    transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
    transaction.state.log.load.buffer.size = 5242880
    transaction.state.log.min.isr = 1
    transaction.state.log.num.partitions = 50
    transaction.state.log.replication.factor = 1
    transaction.state.log.segment.bytes = 104857600
    transactional.id.expiration.ms = 604800000
    unclean.leader.election.enable = false
    zookeeper.connect = localhost:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2017-11-03 23:54:16,480] INFO starting (kafka.server.KafkaServer)
[2017-11-03 23:54:16,483] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2017-11-03 23:54:16,497] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
.......
Open another window and change to the same directory: cd D:\Apache\apache-kafka\kafka_2.11-0.11.0.1\bin\windows.
Run kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test to create a topic named test:
Created topic "test".
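To double-check that the topic exists, the same kafka-topics.bat script can list and describe it (run from the same bin\windows directory):

kafka-topics.bat --list --zookeeper localhost:2181
kafka-topics.bat --describe --zookeeper localhost:2181 --topic test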
The Maven dependency we use:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>
package com.linux.huhx.firstdemo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Author: huhx
 * @Date: 2017-11-03 4:41 PM
 */
public class HelloProducer {
    public static void main(String[] args) {
        String topicName = "test";

        Properties props = new Properties();
        // Broker(s) to bootstrap from
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // If a request fails, the producer can automatically retry this many times
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Wait up to 1 ms so records can be batched into fewer requests
        props.put("linger.ms", 1);
        // buffer.memory controls the total memory available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}
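If you want to confirm that each record actually reached the broker, send() also accepts a callback that fires when the broker acknowledges (or rejects) the record. A minimal sketch of the same loop, assuming the topicName, props and producer from the class above:

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i));
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // The send failed after any configured retries
            exception.printStackTrace();
        } else {
            // Partition and offset assigned by the broker
            System.out.println("sent: partition=" + metadata.partition() + ", offset=" + metadata.offset());
        }
    });
}
producer.flush(); // block until all buffered records have been acknowledged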
package com.linux.huhx.firstdemo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @Author: huhx
 * @Date: 2017-11-03 5:52 PM
 */
public class HelloConsumer {
    public static void main(String[] args) {
        String topicName = "test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group id; offsets are committed per group
        props.put("group.id", "test");
        // Commit offsets automatically every second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            // Poll the broker for new records, waiting up to 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
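As a quick sanity check that is independent of the Java code, the console consumer shipped with Kafka can dump the topic (run from bin\windows); --from-beginning makes it print every message in the topic rather than only new ones:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning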
Run the main methods in the order HelloProducer --> HelloConsumer --> HelloProducer. Over the whole run, HelloProducer published 20 messages in total, but HelloConsumer only received the later 10. HelloConsumer's output:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9
The reason is that a Kafka topic is publish/subscribe rather than a queue: the consumer only sees messages published after it has subscribed. Since the group has no previously committed offsets and the default auto.offset.reset is latest, consumption starts at offset 10, and the first 10 messages (offsets 0-9) are never delivered to it.
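If you do want HelloConsumer to replay the earlier messages as well, one option (a sketch, using a hypothetical new group id) is to start it with a group.id that has no committed offsets and set auto.offset.reset to earliest, so consumption starts from offset 0 and all 20 messages are printed:

props.put("group.id", "test-from-beginning"); // hypothetical group with no committed offsets
props.put("auto.offset.reset", "earliest");   // start from the earliest available offset
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));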
Original post: http://www.cnblogs.com/huhx/p/baseusekafkalearn1.html