Setting up a test Kafka on k8s

I. Install a single-node Kafka (personally tested)

1. Create the ZooKeeper service

The content of zookeeper-service.yaml:

#Service
apiVersion: v1
kind: Service
metadata:
    name: kafka-zookeeper-service
    labels:
        name: zookeeper-service
spec:    
    selector:
        name: kafka-zookeeper-pod 
    type: NodePort
    ports:
    - name: "zookeeper"
      port: 2181
      targetPort: 2181
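
One remark on the Service type: Kafka talks to ZooKeeper from inside the cluster via the Service name (kafka-zookeeper-service:2181), so type: NodePort is not strictly required here; the default ClusterIP would be enough unless you also want to reach ZooKeeper from outside the cluster.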

The content of zookeeper-deploy.yaml:

#Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
    name: kafka-zookeeper-deploy 
    labels:
        name: zookeeper-deploy-label
spec:
    replicas: 1
    selector:
      matchLabels:
        name: kafka-zookeeper-pod
    template:
        metadata:
            labels:
                name: kafka-zookeeper-pod
        spec:
            terminationGracePeriodSeconds: 30  # give k8s up to 30 seconds to shut the app down gracefully
            nodeSelector:
              kafka: "true"
            containers:
            - name: "kafka-zookeeper"
              image: wurstmeister/zookeeper
              imagePullPolicy: IfNotPresent
              ports:
              - containerPort: 2181
              volumeMounts:
              - name: zk-data
                readOnly: false
                mountPath: /opt/zookeeper-3.4.13/data
            volumes:
            - name: zk-data
              hostPath:
                path: /home/k8s-1.19.2/paas-basic/kafka/zookeeper_data
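
Both Deployments in this post schedule their Pods only onto nodes labeled kafka=true (the nodeSelector), so label the target node first, e.g. kubectl label node <node-name> kafka=true, and make sure the hostPath directory exists on that node; otherwise the Pod will stay Pending or fail to mount its volume. Then apply the two files with kubectl apply -f zookeeper-service.yaml -f zookeeper-deploy.yaml.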

The content of kafka-service.yaml:

#Service
apiVersion: v1
kind: Service
metadata:
    name: kafka-service 
    labels:
        name: kafka-service
spec:    
    selector:
        name: kafka-pod 
    type: NodePort
    ports:
    - name: "kafka"
      port: 9092
      targetPort: 9092
      nodePort: 30092
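
Because the Service pins nodePort to 30092, the broker is reachable from outside the cluster at <node-IP>:30092; that is the bootstrap address an external client such as Kafka Tool would use.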

The content of kafka-deploy.yaml:

#Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
    name: kafka-deploy 
    labels:
        name: kafka-deploy
spec:
    replicas: 1
    selector:
      matchLabels:
        name: kafka-pod
    template:
        metadata:
            labels:
                name: kafka-pod
        spec:
            terminationGracePeriodSeconds: 30  # give k8s up to 30 seconds to shut the app down gracefully
            nodeSelector:
              kafka: "true"
            hostname: kafka-hostname       # set the Pod's hostname
            containers:
            - name: "kafka"
              image: wurstmeister/kafka:2.12-2.3.0
              imagePullPolicy: IfNotPresent
              ports:
              - containerPort: 9092
              volumeMounts:
              - name: kafka-volume
                mountPath: /kafka
              env:
              - name: KAFKA_ADVERTISED_PORT
                value: "9092"
              - name: KAFKA_ADVERTISED_HOST_NAME
                value: 192.168.2.100.nip.io
              - name: KAFKA_ZOOKEEPER_CONNECT
                value: kafka-zookeeper-service:2181
              - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
                value: "true"
            volumes:
            - name: kafka-volume
              hostPath:
                path: /root/k3s/kafka/volume
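
A note on the environment variables: KAFKA_ADVERTISED_HOST_NAME is set to a nip.io name, which simply resolves 192.168.2.100.nip.io to 192.168.2.100 (presumably the node's IP), so the address the broker advertises is reachable from outside the cluster. Be aware that KAFKA_ADVERTISED_PORT is 9092 while the Service exposes NodePort 30092; if your external clients go through the NodePort, you may need to advertise 30092 instead so the broker metadata points at a port that is actually open. Apply the pair with kubectl apply -f kafka-service.yaml -f kafka-deploy.yaml.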

 

Connect with Kafka Tool to verify the broker is reachable.

Consumer test code:

@Log4j2
@Component
public class KafkaConsumerListener {

    // The kafkaListenerContainerFactory defined in the SSL configuration section
    // enables batch consumption, so the payload arrives as a List of messages.
    @KafkaListener(topics = "#{'${kafka.topic.subscribe}'.split(',')}", groupId = "#{'${kafka.group.id}'}")
    //@KafkaListener(topics = {"CDP_WECOM_TASK_STATUS_PREPROD","CDP_WECOM_TASK_TA_PREPROD"}, groupId = "#{'${kafka.group.id}'}")
    public void onMessage(@Payload List<String> msgs) {

        log.info("================received {} message(s):", msgs.size());
        msgs.forEach(log::info);

    }
}

Producer test code:

@Log4j2
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    @Value("${kafka.topic.send}")
    private String topic;


    @GetMapping("/send")
    public R send(@RequestParam(required = false) String id) {

        String msg = new Date().toString();


        try {
            kafkaTemplate.send(topic, "{name:" + id + ", time:\"" + new Date() + "\"}");
            log.info("send success");
        } catch (Exception e) {
            e.printStackTrace();
        }


        return R.ok(msg);
    }
}
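
The ${kafka.*} placeholders used by the listener, the controller, and the SSL configuration class below all have to be defined in the application configuration. The original post does not show that file, so the following application.yml is only a sketch with assumed placeholder values (topic names, addresses, and passwords are mine, not the author's):

kafka:
  servers: 192.168.2.100.nip.io:30092      # bootstrap servers (placeholder)
  security:
    protocol: SSL                          # use PLAINTEXT against the plain test cluster above
  client:
    id: demo-client
  group:
    id: demo-group
  topic:
    send: TEST_TOPIC
    subscribe: TEST_TOPIC_A,TEST_TOPIC_B   # comma-separated; split by the listener's SpEL expression
  ssl:
    keystore:
      location: client.keystore.jks        # classpath resource, resolved by FileUtil
    truststore:
      location: client.truststore.jks
    key:
      password: changeit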

 

Kafka configuration code (with SSL support):

package com.artefact.guccitaskapi.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.ResourceUtils;

import java.io.FileNotFoundException;
import java.util.Map;
import java.util.HashMap;

import com.artefact.guccitaskapi.utils.FileUtil;

@Configuration
public class KafkaInitialConfiguration {

    @Value("${kafka.servers}")
    private String servers;

    @Value("${kafka.security.protocol}")
    private String protocol;

    @Value("${kafka.client.id}")
    private String client_id;

    @Value("${kafka.group.id}")
    private String group_id;

    @Value("${kafka.ssl.keystore.location}")
    private String keystore;


    @Value("${kafka.ssl.truststore.location}")
    private String truststore;


    @Value("${kafka.ssl.key.password}")
    private String password;

    //register the KafkaTemplate bean
    @Bean
    KafkaTemplate<String, String> myKafkaTemplate() throws FileNotFoundException {
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProperties());
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());

        KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory);
        stringStringKafkaTemplate.setConsumerFactory(consumerFactory);

        return stringStringKafkaTemplate;
    }


    //producer configuration
    public Map<String, Object> producerProperties() throws FileNotFoundException {
        // basic connection parameters such as the broker addresses and client id
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // group.id is a consumer-side setting, so it is not applied to the producer
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, client_id);

        //SSL encryption and authentication settings
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);

        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        //resolve client.keystore.jks from the classpath resources
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(keystore).getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        //an empty string disables server hostname verification
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        //resolve client.truststore.jks from the classpath resources
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(truststore).getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);

        return properties;
    }

    public Map<String, Object> consumerProperties() throws FileNotFoundException {
        // consumer-side Kafka configuration
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, client_id);

        //SSL encryption and authentication settings
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        //resolve client.keystore.jks from the classpath resources
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(keystore).getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        //an empty string disables server hostname verification
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        //resolve client.truststore.jks from the classpath resources
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.getResourceFile(truststore).getPath());

        return properties;
    }


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // if the listener does not specify a groupId, the group.id from these properties is used
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());

        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(4);
        // enable batch consumption; the max records per batch comes from ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

}
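
The configuration above depends on a FileUtil.getResourceFile helper that the post does not include. Kafka's ssl.*.location settings require real filesystem paths, which classpath resources inside a fat jar do not have, so a plausible minimal sketch (an assumption on my part, not the author's original code) copies the resource to a temporary file:

package com.artefact.guccitaskapi.utils;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileUtil {

    // Materialize a classpath resource as a temp file so Kafka can read it by path.
    public static File getResourceFile(String resource) throws FileNotFoundException {
        InputStream in = FileUtil.class.getClassLoader().getResourceAsStream(resource);
        if (in == null) {
            throw new FileNotFoundException("classpath resource not found: " + resource);
        }
        try (InputStream stream = in) {
            Path tmp = Files.createTempFile("kafka-", "-" + new File(resource).getName());
            tmp.toFile().deleteOnExit();
            Files.copy(stream, tmp, StandardCopyOption.REPLACE_EXISTING);
            return tmp.toFile();
        } catch (IOException e) {
            throw new FileNotFoundException("could not copy resource " + resource + ": " + e.getMessage());
        }
    }
}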

 

 

 
