Java Web 中 Kafka 的简单使用
一.依赖类库
spring-kafka、Kafka 客户端与 Spring Boot 的版本需要相互对应; 如无特殊要求, 各自使用当前最新且相互兼容的版本即可.
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.3.RELEASE</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.3.1</version> </dependency>
二.生产者
/** Kafka topic that request-log messages are published to. */
public static final String TOPIC = "smartdm_log";

/**
 * Serializes the given request log entity to JSON and publishes it to {@link #TOPIC},
 * using the value of {@code requestLogEntity.getModel()} as the record key.
 *
 * <p>The producer is created per call and closed before returning, so its network
 * resources are released and the record is handed to the broker (or failed) before
 * this method exits.
 *
 * <p>NOTE(review): building a producer for every message is costly; if this method is
 * on a hot path, consider holding one shared producer for the lifetime of the owner.
 *
 * @param requestLogEntity the log entry to publish
 */
private void sendMessageToKafka(RequestLogEntity requestLogEntity) {
    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "192.168.9.11:9092");
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // send() only enqueues the record; close() (invoked by try-with-resources) blocks
    // until previously sent records complete and then releases threads and sockets.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
        // sglobalGson is defined outside this snippet — presumably a shared Gson instance.
        String data = sglobalGson.toJson(requestLogEntity);
        producer.send(new ProducerRecord<>(TOPIC, requestLogEntity.getModel(), data));
    }
}
三.消费者
在yml文件中添加配置
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: smartdm_log
      auto-offset-reset: latest
      bootstrap-servers: 192.168.9.11:9092
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Spring component that listens on the {@code smartdm_log} Kafka topic and echoes
 * every received message to standard output.
 */
@Component
public class KafkaLogConsumer {

    /** Topic this listener subscribes to. */
    private static final String LOG_TOPIC = "smartdm_log";

    /** Text printed in front of every consumed message. */
    private static final String OUTPUT_PREFIX = "app_log--消费消息:";

    /**
     * Prints the received message, preceded by a fixed prefix, to standard output.
     *
     * @param message the message text delivered to this listener
     */
    @KafkaListener(topics = LOG_TOPIC)
    public void receive(String message) {
        final String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}