在 k8s 上搭建测试用 Kafka
一、安装单节点kafka(自己测试过)
1、创建zookeeper服务
zookeeper-service.yaml内容如下:
# Service: publishes the ZooKeeper pod (label name=kafka-zookeeper-pod) on port 2181.
# No nodePort is pinned here, so the cluster assigns one.
apiVersion: v1
kind: Service
metadata:
  name: kafka-zookeeper-service
  labels:
    name: zookeeper-service
spec:
  type: NodePort
  selector:
    name: kafka-zookeeper-pod
  ports:
    - name: "zookeeper"
      port: 2181
      targetPort: 2181
zookeeper-deploy.yaml内容如下:
# Deployment: single-replica ZooKeeper used by the test Kafka broker.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-zookeeper-deploy
  labels:
    name: zookeeper-deploy-label
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-zookeeper-pod
  template:
    metadata:
      labels:
        name: kafka-zookeeper-pod
    spec:
      # Give the pod up to 30 seconds to shut down gracefully.
      terminationGracePeriodSeconds: 30
      # Only schedule onto nodes labelled kafka=true.
      nodeSelector:
        kafka: "true"
      containers:
        - name: "kafka-zookeeper"
          # NOTE(review): the image is untagged while the mount path below names
          # version 3.4.13 - confirm the pulled image still uses that directory.
          image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 2181
          volumeMounts:
            - name: zk-data
              readOnly: false
              mountPath: /opt/zookeeper-3.4.13/data
      volumes:
        # ZooKeeper data is kept in a directory on the host node.
        - name: zk-data
          hostPath:
            path: /home/k8s-1.19.2/paas-basic/kafka/zookeeper_data
kafka-service.yaml的内容如下:
# Service: publishes the Kafka pod (label name=kafka-pod) on port 9092,
# reachable from outside the cluster through node port 30092.
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    name: kafka-service
spec:
  type: NodePort
  selector:
    name: kafka-pod
  ports:
    - name: "kafka"
      port: 9092
      targetPort: 9092
      nodePort: 30092
kafka-deploy.yaml的内容如下:
# Deployment: single-replica Kafka broker for testing.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deploy
  labels:
    name: kafka-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-pod
  template:
    metadata:
      labels:
        name: kafka-pod
    spec:
      # Give the pod up to 30 seconds to shut down gracefully.
      terminationGracePeriodSeconds: 30
      # Only schedule onto nodes labelled kafka=true.
      nodeSelector:
        kafka: "true"
      # Sets the pod's hostname.
      hostname: kafka-hostname
      containers:
        - name: "kafka"
          image: wurstmeister/kafka:2.12-2.3.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          volumeMounts:
            - name: kafka-volume
              mountPath: /kafka
          env:
            # NOTE(review): the broker advertises port 9092, while kafka-service
            # exposes it externally on nodePort 30092 - confirm that clients outside
            # the cluster can actually reach the advertised host:port.
            - name: KAFKA_ADVERTISED_PORT
              value: "9092"
            - name: KAFKA_ADVERTISED_HOST_NAME
              value: 192.168.2.100.nip.io
            # ZooKeeper is reached through its in-cluster Service name.
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: kafka-zookeeper-service:2181
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
      volumes:
        # Broker data is kept in a directory on the host node.
        - name: kafka-volume
          hostPath:
            path: /root/k3s/kafka/volume
使用kafka tool连接测试:
接收测试代码:
@Log4j2 @Component public class KafkaConsumerListener { @KafkaListener(topics = "#{'${kafka.topic.subscribe}'.split(',')}", groupId = "#{'${kafka.group.id}'}") //@KafkaListener(topics = {"CDP_WECOM_TASK_STATUS_PREPROD","CDP_WECOM_TASK_TA_PREPROD"}, groupId = "#{'${kafka.group.id}'}") public void onMessage(@Payload String msg) { log.info("================receive msg:"); log.info(msg); } }
发送测试代码:
@Log4j2 @RestController @RequestMapping("/kafka") public class kafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.topic.send}") private String topic; @GetMapping("/send") public R send(@RequestParam(required = false) String id) { String msg = new Date().toString(); try { kafkaTemplate.send(topic, "{name:" + id + ", time:\"" + new Date() + "\"}"); log.info("send success"); } catch (Exception e) { e.printStackTrace(); } return R.ok(msg); } }
kafka配置代码(支持SSL):
package com.artefact.guccitaskapi.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.ResourceUtils;
import java.io.FileNotFoundException;
import java.util.Map;
import java.util.HashMap;
import com.artefact.guccitaskapi.utils.FileUtil;

/**
 * Kafka client configuration built from the {@code kafka.*} application properties.
 *
 * <p>Provides a {@link KafkaTemplate} for sending and the listener container factory
 * used by {@code @KafkaListener} methods. Producer and consumer share the same
 * security-protocol and SSL keystore/truststore settings.
 */
@Configuration
public class KafkaInitialConfiguration {

    @Value("${kafka.servers}")
    private String servers;

    @Value("${kafka.security.protocol}")
    private String protocol;

    @Value("${kafka.client.id}")
    private String clientId;

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${kafka.ssl.keystore.location}")
    private String keystore;

    @Value("${kafka.ssl.truststore.location}")
    private String truststore;

    /** Single password used for the key, the keystore and the truststore. */
    @Value("${kafka.ssl.key.password}")
    private String password;

    /**
     * Creates the {@link KafkaTemplate} bean, backed by a producer factory and also
     * given a consumer factory built from {@link #consumerProperties()}.
     *
     * @return the configured template
     * @throws FileNotFoundException if a keystore/truststore resource cannot be resolved
     */
    @Bean
    KafkaTemplate<String, String> myKafkaTemplate() throws FileNotFoundException {
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProperties());
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties());

        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        template.setConsumerFactory(consumerFactory);
        return template;
    }

    /**
     * Builds the producer configuration: bootstrap servers, String serializers,
     * client id and the shared security settings.
     *
     * <p>{@code group.id} is intentionally not set here: it is a consumer setting,
     * not a producer one.
     *
     * @return a new mutable map of producer properties
     * @throws FileNotFoundException if a keystore/truststore resource cannot be resolved
     */
    public Map<String, Object> producerProperties() throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        properties.putAll(securityProperties());
        return properties;
    }

    /**
     * Builds the consumer configuration: bootstrap servers, String deserializers,
     * auto-commit disabled, group id, client id and the shared security settings.
     *
     * @return a new mutable map of consumer properties
     * @throws FileNotFoundException if a keystore/truststore resource cannot be resolved
     */
    public Map<String, Object> consumerProperties() throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        properties.putAll(securityProperties());
        return properties;
    }

    /**
     * Creates the container factory for {@code @KafkaListener} methods: concurrency 4,
     * batch listening enabled, poll timeout 3000 ms.
     *
     * @return the configured listener container factory
     * @throws FileNotFoundException if a keystore/truststore resource cannot be resolved
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory()
            throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // When a listener does not specify its own group id, the default one from
        // the consumer properties is used.
        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties());
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(4);
        // Batch consumption; the batch size is governed by
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG, which this class does not set.
        // NOTE(review): confirm that listeners using this factory expect batches.
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * Security protocol plus SSL encryption/authentication settings shared by the
     * producer and the consumer.
     */
    private Map<String, Object> securityProperties() throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        // Resolve client.keystore.jks from the application resources.
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(keystore).getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        // An empty string disables server hostname verification.
        // NOTE(review): confirm this is acceptable outside of test environments.
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        // Resolve client.truststore.jks from the application resources.
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(truststore).getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
        return properties;
    }
}
============ 欢迎各位老板打赏~ ===========
与本文相关的文章
- · Kafka 消费者poll配置
- · 软件安全:OWASP top 10详解
- · 解决Linux实例磁盘空间满问题
- · nginx主动健康检查负载均衡模块
- · Linux安装NFS
- · k3s+kuboard快速搭建K8s生产环境集群
- · filebeat按docker容器名创建不同的索引
- · nginx上传文件超出默认大小限制,提示:413 Request Entity Too Large
- · 使用traefik做为docker网关(负载均衡/滚动更新)
- · 部署docker+k3s+rancher2
- · 部署k3s+KubeSphere
- · es+filebeat+elastalert2实现异常邮件提醒