在 k8s 上搭建测试用 Kafka
一、安装单节点 Kafka(已亲自测试通过)
1、创建zookeeper服务
zookeeper-service.yaml内容如下:
# Service exposing the ZooKeeper pod on port 2181.
# Pods are matched through the label name=kafka-zookeeper-pod.
apiVersion: v1
kind: Service
metadata:
  name: kafka-zookeeper-service
  labels:
    name: zookeeper-service
spec:
  selector:
    name: kafka-zookeeper-pod
  type: NodePort
  ports:
    - name: "zookeeper"
      port: 2181
      targetPort: 2181
zookeeper-deploy.yaml内容如下:
# Deployment running a single ZooKeeper replica.
# The pod label name=kafka-zookeeper-pod is what kafka-zookeeper-service selects on.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-zookeeper-deploy
  labels:
    name: zookeeper-deploy-label
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-zookeeper-pod
  template:
    metadata:
      labels:
        name: kafka-zookeeper-pod
    spec:
      # Give the application 30 seconds to shut down gracefully.
      terminationGracePeriodSeconds: 30
      # Only schedule onto nodes labelled kafka=true.
      nodeSelector:
        kafka: "true"
      containers:
        - name: "kafka-zookeeper"
          image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 2181
          volumeMounts:
            - name: zk-data
              readOnly: false
              mountPath: /opt/zookeeper-3.4.13/data
      volumes:
        # ZooKeeper data is kept in a directory on the host node.
        - name: zk-data
          hostPath:
            path: /home/k8s-1.19.2/paas-basic/kafka/zookeeper_data
kafka-service.yaml的内容如下:
# Service exposing the Kafka pod: port 9092 inside the cluster,
# node port 30092 on every node. Pods are matched through name=kafka-pod.
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    name: kafka-service
spec:
  selector:
    name: kafka-pod
  type: NodePort
  ports:
    - name: "kafka"
      port: 9092
      targetPort: 9092
      nodePort: 30092
kafka-deploy.yaml的内容如下:
# Deployment running a single Kafka broker.
# The pod label name=kafka-pod is what kafka-service selects on.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deploy
  labels:
    name: kafka-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-pod
  template:
    metadata:
      labels:
        name: kafka-pod
    spec:
      # Give the application 30 seconds to shut down gracefully.
      terminationGracePeriodSeconds: 30
      # Only schedule onto nodes labelled kafka=true.
      nodeSelector:
        kafka: "true"
      # Sets the pod's hostname.
      hostname: kafka-hostname
      containers:
        - name: "kafka"
          image: wurstmeister/kafka:2.12-2.3.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          volumeMounts:
            - name: kafka-volume
              mountPath: /kafka
          env:
            - name: KAFKA_ADVERTISED_PORT
              value: "9092"
            # Host name handed to clients; replace with an address that
            # resolves to your own node.
            - name: KAFKA_ADVERTISED_HOST_NAME
              value: 192.168.2.100.nip.io
            # Points at the ZooKeeper Service by name and port.
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: kafka-zookeeper-service:2181
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
      volumes:
        # Broker data is kept in a directory on the host node.
        - name: kafka-volume
          hostPath:
            path: /root/k3s/kafka/volume
使用kafka tool连接测试:

接收测试代码:
/**
 * Kafka listener component that writes every received message to the log.
 *
 * <p>The subscribed topics are taken from the comma-separated property
 * {@code kafka.topic.subscribe}; the consumer group comes from {@code kafka.group.id}.
 */
@Log4j2
@Component
public class KafkaConsumerListener {

    // Alternative with hard-coded topic names instead of the property lookup:
    // @KafkaListener(topics = {"CDP_WECOM_TASK_STATUS_PREPROD","CDP_WECOM_TASK_TA_PREPROD"}, groupId = "#{'${kafka.group.id}'}")

    /**
     * Logs a banner line followed by the message payload.
     *
     * @param msg the payload delivered by the listener container
     */
    @KafkaListener(
            topics = "#{'${kafka.topic.subscribe}'.split(',')}",
            groupId = "#{'${kafka.group.id}'}")
    public void onMessage(@Payload final String msg) {
        log.info("================receive msg:");
        log.info(msg);
    }
}
发送测试代码:
@Log4j2
@RestController
@RequestMapping("/kafka")
public class kafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.topic.send}")
private String topic;
@GetMapping("/send")
public R send(@RequestParam(required = false) String id) {
String msg = new Date().toString();
try {
kafkaTemplate.send(topic, "{name:" + id + ", time:\"" + new Date() + "\"}");
log.info("send success");
} catch (Exception e) {
e.printStackTrace();
}
return R.ok(msg);
}
}
kafka配置代码(支持SSL):
package com.artefact.guccitaskapi.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.ResourceUtils;
import java.io.FileNotFoundException;
import java.util.Map;
import java.util.HashMap;
import com.artefact.guccitaskapi.utils.FileUtil;
/**
 * Spring configuration that wires the Kafka producer template and the listener
 * container factory, including the SSL key store / trust store settings.
 *
 * <p>All connection values are injected from {@code kafka.*} properties.
 */
@Configuration
public class KafkaInitialConfiguration {

    @Value("${kafka.servers}")
    private String servers;

    @Value("${kafka.security.protocol}")
    private String protocol;

    @Value("${kafka.client.id}")
    private String clientId;

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${kafka.ssl.keystore.location}")
    private String keystore;

    @Value("${kafka.ssl.truststore.location}")
    private String truststore;

    /** Used as key store password, key password and trust store password alike. */
    @Value("${kafka.ssl.key.password}")
    private String password;

    /**
     * Builds the {@link KafkaTemplate} bean from {@link #producerProperties()} and attaches
     * a consumer factory built from {@link #consumerProperties()}.
     *
     * @return the configured template
     * @throws FileNotFoundException if a key store or trust store resource cannot be located
     */
    @Bean
    KafkaTemplate<String, String> myKafkaTemplate() throws FileNotFoundException {
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProperties());
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties());
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        template.setConsumerFactory(consumerFactory);
        return template;
    }

    /**
     * Assembles the producer configuration: bootstrap servers, String serializers,
     * client id and the shared SSL settings.
     *
     * @return a new mutable map of producer properties
     * @throws FileNotFoundException if a key store or trust store resource cannot be located
     */
    public Map<String, Object> producerProperties() throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // group.id is a consumer-only setting, so it is not set for the producer.
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        putSslProperties(properties);
        return properties;
    }

    /**
     * Assembles the consumer configuration: bootstrap servers, String deserializers,
     * group id, client id, auto-commit switched off, and the shared SSL settings.
     *
     * @return a new mutable map of consumer properties
     * @throws FileNotFoundException if a key store or trust store resource cannot be located
     */
    public Map<String, Object> consumerProperties() throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        putSslProperties(properties);
        return properties;
    }

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * @return a factory with concurrency 4, batch listening enabled and a poll timeout of 3000
     * @throws FileNotFoundException if a key store or trust store resource cannot be located
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory()
            throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Per the original note: when a listener gives no group id, the default one
        // (the group.id from consumerProperties()) is used.
        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties());
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(4);
        // Batch consumption; the batch size is governed by the Kafka setting
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG, which is left at its default here.
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /** Adds the security protocol and the JKS key store / trust store settings shared by producer and consumer. */
    private void putSslProperties(Map<String, Object> properties) throws FileNotFoundException {
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        // Resolve the client key store from the application's resources.
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(keystore).getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        // NOTE(review): an empty string disables server host name verification;
        // confirm this is acceptable outside of test environments.
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        // Resolve the client trust store from the application's resources.
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(truststore).getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
    }
}
============ 欢迎各位老板打赏~ ===========
与本文相关的文章
- · Kafka 消费者poll配置
- · 利用k8s ingress访问非POD服务
- · windows下利用wsl+sshpass 自动发布脚本
- · 单台服务器应用不中断服务热部署滚动更新方案
- · git tracked remote 关联远程分支
- · 软件安全:OWASP top 10详解
- · 解决Linux实例磁盘空间满问题
- · nginx主动健康检查负载均衡模块
- · Linux安装NFS
- · k3s+kuboard快速搭建K8s生产环境集群
- · filebeat按docker容器名创建不同的索引
- · nginx上传文件超出默认大小限制,提示:413 Request Entity Too Large
