第一步、启动zookeeper server和kafka server
启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties
启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
两个kafka server(broker)启动后,会通过zookeeper选举出其中一个作为controller(控制器),另一个作为普通broker;每个分区的副本则分为leader和follower
第二步、创建一个maven项目
这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:
<!-- Import the Spring Boot 2.0.0.RELEASE BOM so managed dependency versions are inherited. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.0.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- Web starter: provides the REST endpoint used to send messages. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>
</dependencies>
第三步、kafka配置
@Configuration@EnableKafkapublicclassKafkaConfig {/* --------------producer configuration-----------------**/@Beanpublic Map<String, Object>producerConfigs() {
Map<String, Object> props =new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG,0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
props.put(ProducerConfig.LINGER_MS_CONFIG,1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;
}@Beanpublic ProducerFactory<String, String>producerFactory() {returnnew DefaultKafkaProducerFactory<>(producerConfigs());
}/* --------------consumer configuration-----------------**/@Beanpublic Map<String, Object>consumerConfigs() {
Map<String, Object> props =new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"0");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;
}@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());return factory;
}@Beanpublic ConsumerFactory<String, String>consumerFactory() {returnnew DefaultKafkaConsumerFactory<>(consumerConfigs());
}@Bean//消息监听器public MyListenermyListener() {returnnew MyListener();
}/* --------------kafka template configuration-----------------**/@Beanpublic KafkaTemplate<String,String>kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate =new KafkaTemplate<>(producerFactory());return kafkaTemplate;
}
}
第四步、topic的配置
自动创建的topic使用broker的默认配置:分区数(num.partitions)默认是1,复制因子(default.replication.factor)默认是1
@Configuration@EnableKafkapublicclassTopicConfig {@Beanpublic KafkaAdminkafkaAdmin() {
Map<String, Object> configs =new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");returnnew KafkaAdmin(configs);
}@Beanpublic NewTopicfoo() {
/第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数//当broker个数为1个时会创建topic失败,//提示:replication factor: 2 larger than available brokers: 1//只有在集群中才能使用kafka的备份功能returnnew NewTopic("foo",10, (short)2);
}@Beanpublic NewTopicbar() {returnnew NewTopic("bar",10, (short)2);
}@Beanpublic NewTopictopic1(){returnnew NewTopic("topic1",10, (short)2);
}@Beanpublic NewTopictopic2(){returnnew NewTopic("topic2",10, (short)2);
}
}
第五步、使用@KafkaListener注解
topicPartitions和topics、topicPattern不能同时使用
publicclass MyListener {
@KafkaListener(id ="myContainer1",//id是消费者监听容器
topicPartitions =//配置topic和分区:监听两个topic,分别为topic1、topic2,topic1只接收分区0,3的消息,//topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为5
{ @TopicPartition(topic ="topic1", partitions = {"0","3" }),
@TopicPartition(topic ="topic2", partitions ="0",
partitionOffsets = @PartitionOffset(partition ="1", initialOffset ="4"))
})publicvoidlisten(ConsumerRecord<?, ?> record) {
System.out.println("topic" + record.topic());
System.out.println("key:" + record.key());
System.out.println("value:"+record.value());
}
@KafkaListener(id ="myContainer2",topics = {"foo","bar"})publicvoidlisten2(ConsumerRecord<?, ?> record){
System.out.println("topic:" + record.topic());
System.out.println("key:" + record.key());
System.out.println("value:"+record.value());
}
}
第六步、创建发送消息的接口
@RestControllerpublicclassKafkaController {privatefinalstatic Logger logger = LoggerFactory.getLogger(KafkaController.class);@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@RequestMapping(value ="/{topic}/send",method = RequestMethod.GET)publicvoidsendMeessageTotopic1(@PathVariable String topic,@RequestParam(value ="partition",defaultValue ="0")int partition) {
logger.info("start send message to {}",topic);
kafkaTemplate.send(topic,partition,"你","好");
}
}
第七步、启动程序、调用接口
消息监听器只监听订阅的topic的特定分区的消息
源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2
声明:本站所有文章,如无特殊说明或标注,均为网络收集发布。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。