Spring Boot集成Kafka


声明:本文转载自https://my.oschina.net/LaravelShao/blog/1788005,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

系统环境

使用远程服务器上搭建的kafka服务

  • Ubuntu 16.04 LTS
  • kafka_2.12-0.11.0.0.tgz
  • zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>      <groupId>com.laravelshao.springboot</groupId>     <artifactId>spring-boot-integration-kafka</artifactId>     <version>0.0.1-SNAPSHOT</version>     <packaging>jar</packaging>      <name>spring-boot-integration-kafka</name>     <description>Demo project for Spring Boot</description>      <parent>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-parent</artifactId>         <version>2.0.0.RELEASE</version>         <relativePath/> <!-- lookup parent from repository -->     </parent>      <properties>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>         <java.version>1.8</java.version>     </properties>      <dependencies>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter</artifactId>         </dependency>         <!--kafka-->         <dependency>             <groupId>org.springframework.kafka</groupId>             <artifactId>spring-kafka</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-json</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-test</artifactId>             <scope>test</scope>         </dependency>     </dependencies>      <build>         <plugins>             <plugin>                 <groupId>org.springframework.boot</groupId>                 <artifactId>spring-boot-maven-plugin</artifactId>             </plugin>         </plugins>     </build> </project> 

2.添加配置信息,这里使用yml文件

spring:   kafka:     bootstrap-servers:X.X.X.X:9092     producer:       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer     consumer:       group-id: test       auto-offset-reset: earliest       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer       properties:         spring:           json:             trusted:               packages: com.laravelshao.springboot.kafka 

3.创建消息对象

public class Message {     private Integer id;     private String msg;      public Message() {     }      public Message(Integer id, String msg) {         this.id = id;         this.msg = msg;     }      public Integer getId() {         return id;     }      public void setId(Integer id) {         this.id = id;     }      public String getMsg() {         return msg;     }      public void setMsg(String msg) {         this.msg = msg;     }      @Override     public String toString() {         return "Message{" +                 "id=" + id +                 ", msg='" + msg + '\'' +                 '}';     } } 

4.创建生产者

package com.laravelshao.springboot.kafka;  import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component;  /**  * Created by shaoqinghua on 2018/3/23.  */ @Component public class Producer {     private static Logger log = LoggerFactory.getLogger(Producer.class);      @Autowired     private KafkaTemplate kafkaTemplate;      public void send(String topic, Message message) {         kafkaTemplate.send(topic, message);         log.info("Producer->topic:{}, message:{}", topic, message);     }  } 

5.创建消费者,使用@ KafkaListener注解监听主题

package com.laravelshao.springboot.kafka;  import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;  /**  * Created by shaoqinghua on 2018/3/23.  */ @Component public class Consumer {     private static Logger log = LoggerFactory.getLogger(Consumer.class);      @KafkaListener(topics = "test_topic")     public void receive(ConsumerRecord<String, Message> consumerRecord) {         log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());     }  } 

6.发送消费测试

package com.laravelshao.springboot;  import com.laravelshao.springboot.kafka.Message; import com.laravelshao.springboot.kafka.Producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext;  @SpringBootApplication public class IntegrationKafkaApplication {      public static void main(String[] args) throws InterruptedException {         ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);         Producer producer = context.getBean(Producer.class);          for (int i = 1; i < 10; i++) {             producer.send("test_topic", new Message(i, "test topic message " + i));             Thread.sleep(2000);         }     }  } 

可以依次看到发送消息,消费消息

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption. Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*). 	at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) 	at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) 	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191) 	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) 	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) 	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100) 	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949) 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570) 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531) 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) 	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667) 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 	at java.util.concurrent.FutureTask.run(FutureTask.java:266) 	at java.lang.Thread.run(Thread.java:745) 

解决方法:将当前包添加到kafka信任的包路径下

spring:   kafka:     consumer:       properties:         spring:           json:             trusted:               packages: com.laravelshao.springboot.kafka

 

本文发表于2018年04月01日 22:38
(c)注:本文转载自https://my.oschina.net/LaravelShao/blog/1788005,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 3041 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1