Kafka notes


References:

https://www.jianshu.com/p/abbc09ed6703

https://blog.csdn.net/wangshuminjava/article/details/80238161?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-4&spm=1001.2101.3001.4242

Create a topic with a specified number of partitions

    @GetMapping("/createTopic")
    public CreateTopicsResult createTopic() {
        // Topic "createTopic" with 3 partitions and a replication factor of 1
        NewTopic newTopic = new NewTopic("createTopic", 3, (short) 1);
        return kafkaAdminClient.createTopics(Arrays.asList(newTopic));
    }
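createTopics() is asynchronous: the returned CreateTopicsResult only wraps futures, so the request can still fail after the controller method has returned. A minimal sketch of a blocking variant (the /createTopicSync path, the 10-second timeout, and the return strings are illustrative assumptions, not from the original post):

    // Needs java.util.concurrent.TimeUnit / ExecutionException and
    // org.apache.kafka.common.errors.TopicExistsException in addition to the imports above
    @GetMapping("/createTopicSync")
    public String createTopicSync() throws Exception {
        NewTopic newTopic = new NewTopic("createTopic", 3, (short) 1);
        CreateTopicsResult result = kafkaAdminClient.createTopics(Arrays.asList(newTopic));
        try {
            // all() completes only once every requested topic has been created
            result.all().get(10, TimeUnit.SECONDS);
            return "topic created";
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                return "topic already exists";
            }
            throw e;
        }
    }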
Send a message

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    @Resource
    private KafkaAdminClient kafkaAdminClient;

    @RequestMapping("/sendMsg")
    public String sendMsg(@RequestParam String msg) {
        try {
            kafkaTemplate.send("createTopic", msg);
            return "message sent";
        } catch (Exception e) {
            e.printStackTrace();
            return "failed to send message";
        }
    }
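Note that KafkaTemplate.send() is asynchronous, so the try/catch above only catches failures raised before the record is handed to the producer. A minimal sketch of checking the broker's acknowledgment, assuming spring-kafka 2.x where send() returns a ListenableFuture (the /sendMsgWithCallback path is made up for illustration):

    @RequestMapping("/sendMsgWithCallback")
    public String sendMsgWithCallback(@RequestParam String msg) {
        kafkaTemplate.send("createTopic", msg).addCallback(
                // Success: the broker has acknowledged the record and assigned it an offset
                result -> System.out.printf("sent to %s-%d at offset %d%n",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                // Failure: e.g. the broker is unreachable or the topic rejects the write
                ex -> System.err.println("send failed: " + ex.getMessage()));
        return "message queued";
    }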
Consume messages (annotation version)

    // Manual acknowledgment here relies on ack-mode: manual_immediate in the listener config below
    @KafkaListener(topics = "jikeyiTest")
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        System.out.printf("topic is %s, offset is %d, partition is %s, value is %s \n",
                record.topic(), record.offset(), record.partition(), record.value());
        ack.acknowledge();
    }
Consume messages (manual version, assigned to a specific partition)

    public static void startConsume() {
        Properties props = new Properties();
        // Required properties
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "yasuo");

        // assign() pins the consumer to partition 1 of "createTopic" and bypasses consumer-group rebalancing
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition = new TopicPartition("createTopic", 1);
        consumer.assign(Arrays.asList(topicPartition));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record ->
                    System.out.printf("topic is %s, offset is %d, partition is %s, value is %s \n",
                            record.topic(), record.offset(), record.partition(), record.value()));
        }
    }
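The same partition pinning can also be expressed with the annotation listener. A minimal sketch, assuming spring-kafka's topicPartitions attribute (the topic and partition values mirror the code above):

    // Pin the listener to partition 1 of "createTopic"; note this TopicPartition is the
    // org.springframework.kafka.annotation one, not the Kafka client class used above
    @KafkaListener(
            groupId = "yasuo",
            topicPartitions = @org.springframework.kafka.annotation.TopicPartition(
                    topic = "createTopic", partitions = "1"))
    public void listenPartition(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        System.out.printf("topic is %s, offset is %d, partition is %s, value is %s \n",
                record.topic(), record.offset(), record.partition(), record.value());
        ack.acknowledge();
    }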
Configuration file (application.yml)


spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: yasuo
      # offsets are committed manually via Acknowledgment.acknowledge()
      enable-auto-commit: false
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      max-poll-records: 1
    listener:
      # required for the Acknowledgment parameter in the listener methods above
      ack-mode: manual_immediate
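
Spring Boot auto-configures a KafkaAdmin bean but not a KafkaAdminClient, so the @Resource injection of kafkaAdminClient above needs an explicit bean definition somewhere. A minimal sketch, assuming the admin client reuses the same bootstrap server (the configuration class name is made up for illustration):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class KafkaAdminClientConfig {

        @Bean
        public KafkaAdminClient kafkaAdminClient() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // AdminClient.create() currently returns a KafkaAdminClient instance, so the cast works here
            return (KafkaAdminClient) AdminClient.create(configs);
        }
    }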

 


Original post: https://www.cnblogs.com/jikeyi/p/14617212.html
