Springboot集成kafka
# 工程接入
kafka批量消费
# pom文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2
3
4
# yml文件
server:
  servlet:
    context-path: /imdemosc
  port: 8610
spring:
  application:
    name: imdemosc
  kafka:
    bootstrap-servers: 192.168.213.215:9092,192.168.213.215:9091,192.168.213.215:9093
    consumer:
      enable-auto-commit: true
      group-id: dsaim105
      # Maximum number of records fetched by a single poll (upper bound of one consumed batch)
      max-poll-records: 100
      auto-commit-interval: 100
      auto-offset-reset: latest
      bootstrap-servers: 192.168.213.215:9092,192.168.213.215:9091,192.168.213.215:9093
    producer:
      # Number of send retries
      retries: 3
      # Producer batch size. NOTE: Kafka's batch.size is measured in bytes, not in messages
      batch-size: 100
      # 32 MB of producer buffer memory
      buffer-memory: 33554432
      bootstrap-servers: 192.168.213.215:9092,192.168.213.215:9091,192.168.213.215:9093
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# JAVA文件
# kafka消费者配置文件
KafkaConsumerConfig
package ok96.cn;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String consumerBootstrapServers;
@Value("${spring.kafka.producer.bootstrap-servers}")
private String producerBootstrapServers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.producer.retries}")
private Integer retries;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
/**
* 生产者配置信息
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/**
* 生产者工厂
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生产者模板
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* 消费者配置信息
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 消费者批量工程
*/
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setConcurrency(1);
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1000);
return factory;
}
// @Bean
// public KafkaConsumerListener listener(){
// return new KafkaConsumerListener();
// }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# kafka监听消费入口
package ok96.cn;
import static java.util.concurrent.Executors.newCachedThreadPool;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Entry point for messages consumed from the {@code imtest6} topic.
 */
@Component
public class KafkaBatchListener {

    /**
     * Receives the records delivered through the {@code batchFactory} container factory.
     * For now it only prints a marker line; the records themselves are not inspected.
     *
     * @param records the records handed over for this invocation
     */
    @KafkaListener(containerFactory = "batchFactory", topics = "imtest6")
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        final String marker = "我进来啦";
        System.out.println(marker);
    }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 集成KAFKA权限
SASL_PLAINTEXT
# 在kafka消费者配置文件增加配置
在producerConfigs方法和consumerConfigs方法中增加
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
2
# 在resources下增加文件kafka_client_jaas.conf
文件内容username和password为kafka中设置的权限账号密码
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="ok96"
password="ok96-password";
};
2
3
4
5
# 启动类增加setProperty
/**
 * Application entry point. Registers the JAAS login configuration needed for
 * SASL authentication against Kafka before the Spring context starts.
 */
@SpringBootApplication
public class SpringbootApp {

    /** Name of the JAAS configuration file located under {@code resources}. */
    private static final String JAAS_RESOURCE = "kafka_client_jaas.conf";

    /**
     * Points {@code java.security.auth.login.config} at the JAAS file and starts Spring Boot.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        System.setProperty("java.security.auth.login.config", extractJaasConfig());
        SpringApplication.run(SpringbootApp.class, args);
    }

    /**
     * Copies the JAAS file from the classpath to a temporary file and returns its path.
     *
     * <p>The system property expects a file path or a URL; the Spring-style
     * {@code classpath:} prefix is not understood by JAAS, and a resource packed inside
     * a jar has no plain file path, hence the copy.
     *
     * @return absolute path of the extracted JAAS configuration file
     * @throws IllegalStateException if the resource is missing from the classpath
     * @throws java.io.UncheckedIOException if the file cannot be written
     */
    private static String extractJaasConfig() {
        try (java.io.InputStream in =
                SpringbootApp.class.getClassLoader().getResourceAsStream(JAAS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("JAAS config not found on classpath: " + JAAS_RESOURCE);
            }
            java.nio.file.Path target = java.nio.file.Files.createTempFile("kafka_client_jaas", ".conf");
            // The file holds credentials; remove it when the JVM exits.
            target.toFile().deleteOnExit();
            java.nio.file.Files.copy(in, target, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
            return target.toAbsolutePath().toString();
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to extract " + JAAS_RESOURCE, e);
        }
    }
}
2
3
4
5
6
7
8
# 常用代码
# 发送实体类
也就是先将XXXX实体类转换成String
// Serialize the entity to a JSON string, then publish that string to the topic.
// XXXXX is a placeholder for the entity instance being sent.
ObjectMapper mapper2 = new ObjectMapper();
kafkaTemplate.send(topic, mapper2.writeValueAsString(XXXXX));
2
# 消费实体类
也就是将String转换成XXXX实体类;也可以将getTypeReferenceTemp方法改为静态方法(在方法声明上增加static)
修改KafkaBatchListener
package cn.ok96;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Batch listener that turns each JSON message from the {@code imtest6} topic
 * into an {@code XXXX} entity ({@code XXXX} is a placeholder for the real entity class).
 */
@Component
public class KafkaBatchListener {

    /** Single shared mapper, so that a new one is not built for every record. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Describes the target type used when deserializing a message.
     *
     * @return a type reference for {@code XXXX}
     */
    public TypeReference<XXXX> getTypeReferenceTemp() {
        return new TypeReference<XXXX>() {
        };
    }

    /**
     * Deserializes every non-null record value of the batch into an {@code XXXX} instance.
     * A record that cannot be parsed is reported and skipped so that it does not abort
     * the rest of the batch.
     *
     * @param records the records delivered through the {@code batchFactory} container factory
     */
    @KafkaListener(topics = "imtest6", containerFactory = "batchFactory")
    public void listenPartition1(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            String payload = record.value();
            if (payload == null) {
                continue;
            }
            try {
                // Message received normally: convert the JSON text into the entity.
                XXXX xxxx = MAPPER.readValue(payload, getTypeReferenceTemp());
                // TODO: pass xxxx on to the business logic.
            } catch (IOException e) {
                System.err.println("Failed to parse message at " + record.topic() + "-"
                        + record.partition() + "@" + record.offset() + ": " + e);
            }
        }
    }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 监听topic使用配置文件常量
# yml配置文件
# Custom property holding the topic name; read in Java as ${kafkaSelf.topic}
kafkaSelf:
  topic: test
2
# JAVA代码
@KafkaListener(topics = "#{'${kafkaSelf.topic}'}", containerFactory = "batchFactory")
# 部署KAFKA
服务器IP:192.168.213.200
# Docker-单机
docker-compose.yml,连接地址192.168.213.200:19092
version: '3'
services:
  kafka-zookeeper:
    image: openzipkin/zipkin-kafka:2.11.12
    restart: always
    container_name: kafka-zookeeper
    # Port mappings are quoted so YAML always reads them as strings
    ports:
      - "2181:2181"
      - "9092:9092"
      - "19092:19092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=192.168.213.200
2
3
4
5
6
7
8
9
10
11
12
命令行
docker run -d --restart=always --name kafka-zookeeper -p 2181:2181 -p 9092:9092 -p 19092:19092 --env KAFKA_ADVERTISED_HOST_NAME=192.168.213.200 openzipkin/zipkin-kafka:2.11.12
部署成功测试
1.开启两个终端进入kafka
docker exec -it kafka-zookeeper /bin/sh
2.分别在终端容器内部执行
终端1
unset JMX_PORT;bin/kafka-console-producer.sh --broker-list 192.168.213.200:19092 --topic test9999
终端2
unset JMX_PORT;bin/kafka-console-consumer.sh --bootstrap-server 192.168.213.200:19092 --topic test9999 --from-beginning
2
3
4
5
6
7
# 宿主机-单机
服务器ip:192.168.213.203
连接地址:192.168.213.203:9092
# 上传压缩包并解压
上传kafka_2.12-2.3.0.tgz到/data/目录下
解压
tar -zvxf kafka_2.12-2.3.0.tgz
# 修改配置并启动
修改server.properties
vi /data/kafka_2.12-2.3.0/config/server.properties
增加以下内容,保存并退出
listeners=PLAINTEXT://192.168.213.203:9092
启动
cd /data/kafka_2.12-2.3.0
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
2
3
查看日志
tail -f /data/kafka_2.12-2.3.0/logs/zookeeper.out
tail -f /data/kafka_2.12-2.3.0/logs/kafkaServer.out
2
# 创建topic
创建topic:tpc.test001
/data/kafka_2.12-2.3.0/bin/kafka-topics.sh --create --zookeeper 192.168.213.203:2181 --replication-factor 1 --partitions 1 --topic tpc.test001
# 配置SASL
# 服务端配置
# 修改server.properties
vi /data/kafka_2.12-2.3.0/config/server.properties
修改/增加以下内容,将原来写的listeners用#注释
listeners=SASL_PLAINTEXT://192.168.213.203:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin
2
3
4
5
6
7
# 创建账号
vi /data/kafka_2.12-2.3.0/config/kafka_server_jaas.conf
增加以下内容,保存并退出
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-pwd"
user_admin="admin-pwd"
user_wuxiaoku="xiaoku-lzx22247a";
};
2
3
4
5
6
7
管理员账号:admin 密码 admin-pwd
普通用户账号:wuxiaoku 密码 xiaoku-lzx22247a
# 修改kafka-run-class.sh
添加java.security.auth.login.config环境变量
vi /data/kafka_2.12-2.3.0/bin/kafka-run-class.sh
增加
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/data/kafka_2.12-2.3.0/config/kafka_server_jaas.conf'
然后在脚本末尾启动JVM的两条命令(nohup $JAVA ... 和 exec $JAVA ...)的参数中增加
$KAFKA_SASL_OPTS
# 客户端配置账号
# 新建文件
vi /data/kafka_2.12-2.3.0/config/kafka_client_jaas.conf
新增以下内容
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="wuxiaoku"
password="xiaoku-lzx22247a";
};
2
3
4
5
# 修改客户端脚本
消费者:
vi /data/kafka_2.12-2.3.0/config/consumer.properties
最后一行加上如下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2
生产者:
vi /data/kafka_2.12-2.3.0/config/producer.properties
最后一行加上如下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2
# 增加用户配置
消费者:
vi /data/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh
增加以下内容
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"
生产者:
vi /data/kafka_2.12-2.3.0/bin/kafka-console-producer.sh
增加以下内容
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"
# 重启
关闭命令:
cd /data/kafka_2.12-2.3.0
bin/kafka-server-stop.sh config/server.properties
bin/zookeeper-server-stop.sh config/zookeeper.properties
2
3
启动:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
2
# 设置用户权限
可百度 kafka ACL
赋权:add 移除:remove 读:Read 写:Write
设置白名单:--allow-host 192.168.213.200
设置组权限:--group 设置topic权限--topic
对wuxiaoku用户的组test-consumer-group设置所有权限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.213.203:2181 --add --allow-principal User:wuxiaoku --operation All --group test-consumer-group
对wuxiaoku用户赋权topic读权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --add --allow-principal User:wuxiaoku --allow-host '*' --operation Read --topic tpc.test001
对wuxiaoku用户移除topic读权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --remove --allow-principal User:wuxiaoku --allow-host '*' --operation Read --topic tpc.test001
对wuxiaoku用户赋权topic写权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --add --allow-principal User:wuxiaoku --allow-host '*' --operation Write --topic tpc.test001
对wuxiaoku用户移除topic写权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --remove --allow-principal User:wuxiaoku --allow-host '*' --operation Write --topic tpc.test001
查看权限
bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=192.168.213.203:2181
# 测试
开启两个终端
1终端消费者执行命令:
/data/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.213.203:9092 --topic tpc.test001 --from-beginning --consumer.config /data/kafka_2.12-2.3.0/config/consumer.properties
2终端生产者执行命令:
/data/kafka_2.12-2.3.0/bin/kafka-console-producer.sh --broker-list 192.168.213.203:9092 --topic tpc.test001 --producer.config /data/kafka_2.12-2.3.0/config/producer.properties
在生产者终端发送随机字符串,在消费者终端能接收数据即正常
# 清除Topic下数据
例topic:cardata
1.修改保留时间为10秒
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name cardata --alter --add-config retention.ms=10000
2.查看topic策略
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-type topics --entity-name cardata
3.等待数据清理后,删除策略
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name cardata --alter --delete-config retention.ms
4.再次查看策略(执行与第2步相同的命令),确认retention.ms配置已被移除
2
3
4
5
6
7