Java Web 项目中 Kafka 的简单使用

一.依赖类库

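spring-kafka、Kafka 与 Spring Boot 的版本需要相互对应(兼容),一般都使用各自的最新版本即可。本文使用的版本为:Spring Boot 2.2.1.RELEASE、spring-kafka 2.3.3.RELEASE、Kafka 2.3.1。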

<!-- Spring Boot parent POM; fixes the Spring Boot version used by the project. -->
<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.2.1.RELEASE</version>
</parent>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<!-- Spring for Apache Kafka; supplies the @KafkaListener annotation used by the consumer class.
     NOTE(review): the Spring Boot parent may already manage this version; confirm
     whether the explicit <version> is required. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<!-- Apache Kafka artifact (the "_2.11" suffix presumably denotes the Scala build version).
     NOTE(review): the Java code in this document only uses producer client classes
     (KafkaProducer, ProducerRecord); confirm whether this artifact is needed in
     addition to the client library that spring-kafka depends on. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

二.生产者

/** Name of the Kafka topic that request log messages are published to. */
public static final String TOPIC="smartdm_log";
/**
 * Serializes the given request log entry to JSON and publishes it to the
 * {@code TOPIC} topic, using the entry's model ({@code getModel()}) as the record key.
 *
 * <p>A producer is created for this single send and closed before the method
 * returns, so its network connections, background thread and buffers are released
 * and the buffered record is flushed. The send result is not inspected, so a
 * delivery failure is not reported to the caller.
 *
 * <p>NOTE(review): creating a producer per message is expensive. If this method is
 * called frequently, consider holding one shared producer for the lifetime of the
 * application instead.
 *
 * @param requestLogEntity the log entry to publish; must not be {@code null}
 */
private void sendMessageToKafka(RequestLogEntity requestLogEntity){
    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "192.168.9.11:9092");
    kafkaProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

    // Build the record before opening the producer so that a serialization
    // failure never leaves a producer behind.
    String data = sglobalGson.toJson(requestLogEntity);
    ProducerRecord<String, String> record =
            new ProducerRecord<>(TOPIC, requestLogEntity.getModel(), data);

    // try-with-resources closes the producer on every path; closing waits for
    // the pending send to complete before releasing resources.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
        producer.send(record);
    }
}

 

三.消费者

在 Spring Boot 的 yml 配置文件中添加如下消费者配置:

# Spring Boot settings for the Kafka consumer.
spring:
  kafka:
    consumer:
      # Let the Kafka client commit consumed offsets automatically.
      enable-auto-commit: true
      # Consumer group the listener joins.
      group-id: smartdm_log
      # When the group has no committed offset, start from the newest records.
      auto-offset-reset: latest
      # Broker address (host:port) the consumer connects to.
      bootstrap-servers: 192.168.9.11:9092
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Spring-managed Kafka consumer that listens on the {@code smartdm_log} topic
 * and prints every received message to standard output.
 */
@Component
public class KafkaLogConsumer {

    /** Text written in front of each consumed message. */
    private static final String CONSUMED_PREFIX = "app_log--消费消息:";

    /**
     * Handles a single message delivered from the {@code smartdm_log} topic.
     *
     * @param message the message payload as a string
     */
    @KafkaListener(topics = "smartdm_log")
    public void receive(String message) {
        final String line = CONSUMED_PREFIX + message;
        System.out.println(line);
    }
}