kafka数据拉取和发送

news/2025/2/26 9:24:27

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

kafka_2">1、pom文件引入kafka

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumerKafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

@Component
public class TestMsgConsumer implements InitializingBean {
   

    @Value("${test.kafka.address:127.0.0.1:9092}")
    private String kafkaAddress;
    @Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")
    private String msgTopic;
    @Value("${test.consumer.name:yogima}")
    private String consumerGroupId;

    /**
     * 消费开关: true-消费,false-暂停消费
     * 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可
     */
    private Boolean consumeSwitch = true;

    public void consumerMessage(List<String> topic, String groupId) {
   
        LOGGER.info("consumer topic list1:{}",topic.toString());
        Properties props = new Properties();
        /**
         * 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
         * 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
         */
        LOGGER.info("test.kafka.address:{}",kafkaAddress);
        props.put("bootstrap.servers", kafkaAddress);
        /**
         * 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
         * 设置一个有业务意义的名字即可
         */
        props.put("group.id", groupId);
        /**
         * 自动提交位移
         */
        props.put("enable.auto.commit", Boolean.TRUE);
        /**
         * 位移提交超时时间
         */
        props.put("auto.commit.interval.ms", "1000");
        /**
         * 从最早的消息开始消费
         * 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
         * 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
         */
        props.put("auto.offset.reset", "latest");
        /**
         * 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
         * Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
         * org.apache.kafka.common.serialization.ByteArrayDeserializer
         * StringDeserializer
         */
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /**
         * 对消息体进行解序列化,与key解序列化类似
         */
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
        props.put("max.poll.records", "500");
        //fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
        props.put("fetch.message.max.bytes", "300000000");

        KafkaConsumer<String, String> consumer;

        try{
   
            /**
             * 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器
             */
            LOGGER.info("start set consumer,props:{}",props.toString());
            consumer = new KafkaConsumer<>(props);
            LOGGER.info("set consumer finished");
            /**
             * 订阅consumer group需要消费的topic列表
             */
            LOGGER.info("consumer topic list:{}",topic.toString());
            consumer.subscribe(topic);
        }catch (Exception e){
   
            LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);
            return;
        }

        /**
         * 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,
         * 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作
         */
        try {
   
            while (true) {
   
                if (!consumeSwitch) {
   
                    try {
   
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
   
                        LOGGER.error("err msg:" + e.getMessage());
                    }
                }
                /**
                 * 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据
                 * consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                /**
                 * poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法
                 * 返回即认为consumer成功消费了消息
                 */
                for (ConsumerRecord<String, String> record : records) {
   
                    LOGGER.debug("offset = {}, key = {}, value = {}"

http://www.niftyadmin.cn/n/5868471.html

相关文章

【Python爬虫(69)】解锁游戏数据宝藏:Python爬虫实战攻略

【Python爬虫】专栏简介&#xff1a;本专栏是 Python 爬虫领域的集大成之作&#xff0c;共 100 章节。从 Python 基础语法、爬虫入门知识讲起&#xff0c;深入探讨反爬虫、多线程、分布式等进阶技术。以大量实例为支撑&#xff0c;覆盖网页、图片、音频等各类数据爬取&#xff…

01-03基于vs2022的c语言笔记——软件安装,写程序前的准备,初识c语言

目录 前言 1.c语言编程环境的安装 安装vs2022 2.写程序前的准备 ​ 3.初识c语言 3-1第一个程序Hello World 3-2注释的使用 3-3变量 ​3-4-1 字面常量 3-4-2 用const修饰常量 3-4-3用#define来修饰常量 3-5关键字 3-6标识符 前言 本套笔记是基于英雄哪里出来c语言…

Tips :仿真竞争条件 指的是什么?

文章目录 **为什么会出现仿真竞争条件?****典型场景举例****System Verilog 如何解决竞争条件?****1. 使用 `program` 块隔离测试平台****2. 使用 `clocking` 块明确时序关系****3. 非阻塞赋值(`<=`)的合理使用****竞争条件的根本原因****总结****代码结构****1. 设计模…

单目摄像头物体深度计算基础原理

三维空间物体表面点位与其在图像中对应点之间的相互关系&#xff0c;必须建立相机成像的几何模型&#xff0c;这些几何模型参数就是相机参数&#xff0c;而相机参数的求解就是相机标定。 相机的参数矩阵包括内参和外参&#xff1a; 外参&#xff1a;决定现实坐标到摄像机坐标。…

vite react 项目打包报错处理

Could not find a declaration file for module lodash 安装 Lodash 类型声明文件 # 使用 npm npm install --save-dev types/lodash# 使用 yarn yarn add -D types/lodash 打包成功

Vue进阶之AI智能助手项目(四)——ChatGPT的调用和开发

AI智能助手项目 前端接口部分src/api/index.tssrc/utils/request/index.tspost方法httpHttpOptionsrc/utils/request/axios.tsLayout布局页面-viewsexception异常页面src/views/exception/404/index.vuesrc/views/exception/500/index.vueLayout布局页面src/views/chat/layout/…

Android Audio其他——数字音频接口(附)

数字音频接口 DAI,即 Digital Audio Interfaces,顾名思义,DAI 表示在板级或板间传输数字音频信号的方式。相比于模拟接口,数字音频接口抗干扰能力更强,硬件设计简单,DAI 在音频电路设计中得到越来越广泛的应用。 一、音频链路 1、模拟音频信号 可以看到在传统的…

月之暗面改进并开源了 Muon 优化算法,对行业有哪些影响?

互联网各领域资料分享专区(不定期更新): Sheet 正文 月之暗面团队改进并开源的 Muon 优化算法 在深度学习和大模型训练领域引发了广泛关注,其核心创新在于显著降低算力需求(相比 AdamW 减少 48% 的 FLOPs)并提升训练效率,同时通过开源推动技术生态的共建。 1. 显著降低大…