K8S-Kafka-Strimzi-KubeEdge Experiment

This article was last updated in the early hours of May 28, 2024.

Experiment 8: Container Orchestration and Communication

Objectives

Use Docker as the container runtime, Kubernetes as the elastic container management system, and Kafka as middleware on top of it, to implement a simple demo of the producer-consumer mechanism.

Finally, use the KubeEdge framework to turn the broker node into the cloud side and the remaining 3 nodes into edge nodes.

Procedure

In order: request 4 ECS nodes on Huawei Cloud and deploy the k8s nodes with kubeadm; then build our own Docker image for deploying the Kafka application; finally deploy the Kafka middleware and the KubeEdge framework.

Deploying k8s

Requesting the nodes

Request 4 ECS servers on Huawei Cloud, each with an elastic public IP (EIP).

  • OS: CentOS 7

  • Flavor: 2 vCPUs | 4 GiB | s7.large.2

  • Security group: k8s

    The security group has been saved in the cloud and can be downloaded and imported: k8s-security

Setting up the k8s cluster with kubeadm

Reference: https://www.cnblogs.com/xuweiweiwoaini/p/13884112.html

Common steps: run the operations in this part on every node

  • Stop the firewall and disable it at boot

    systemctl stop firewalld
    systemctl disable firewalld
  • Permanently disable the swap partition

    sed -ri 's/.*swap.*/#&/' /etc/fstab
    reboot

    This reboots the host; just wait for it to come back.

  • Set up host name resolution (/etc/hosts)

    Note: use the private (internal) IPs of the ECS servers here.

    cat >> /etc/hosts << EOF
    your-master-node-ip k8s-master
    your-node1-ip k8s-01
    your-node2-ip k8s-02
    your-node3-ip k8s-03
    EOF
  • Pass bridged IPv4 traffic to the iptables chains

    cat > /etc/sysctl.d/k8s.conf << EOF
    net.bridge.bridge-nf-call-ip6tables = 1
    net.bridge.bridge-nf-call-iptables = 1
    net.ipv4.ip_forward = 1
    vm.swappiness = 0
    EOF

    modprobe br_netfilter
    sysctl --system
  • Time synchronization

    yum install ntpdate -y
    ntpdate time.windows.com
  • Enable ipvs

    yum -y install ipset ipvsadm

    cat > /etc/sysconfig/modules/ipvs.modules <<EOF
    #!/bin/bash
    modprobe -- ip_vs
    modprobe -- ip_vs_rr
    modprobe -- ip_vs_wrr
    modprobe -- ip_vs_sh
    modprobe -- nf_conntrack_ipv4
    EOF

    chmod 755 /etc/sysconfig/modules/ipvs.modules && bash /etc/sysconfig/modules/ipvs.modules && lsmod | grep -e ip_vs -e nf_conntrack_ipv4
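
    Loading these modules only makes ipvs available to the kernel; kube-proxy still defaults to iptables mode unless you tell it otherwise, e.g. by passing kubeadm a config file containing a fragment like the hypothetical one below, or by editing the kube-proxy ConfigMap after the cluster is up:

    apiVersion: kubeproxy.config.k8s.io/v1alpha1
    kind: KubeProxyConfiguration
    mode: ipvs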
  • Install Docker

    wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo

    yum -y install docker-ce-18.06.3.ce-3.el7
    systemctl enable docker && systemctl start docker
    docker version
  • Configure the Docker registry mirror and cgroup driver

    sudo mkdir -p /etc/docker
    sudo tee /etc/docker/daemon.json <<-'EOF'
    {
    "exec-opts": ["native.cgroupdriver=systemd"],
    "registry-mirrors": ["https://b9pmyelo.mirror.aliyuncs.com"]
    }
    EOF

    sudo systemctl daemon-reload

    sudo systemctl restart docker
  • Add the Alibaba Cloud Kubernetes YUM repository

    cat > /etc/yum.repos.d/kubernetes.repo << EOF
    [kubernetes]
    name=Kubernetes
    baseurl=https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
    enabled=1
    gpgcheck=0
    repo_gpgcheck=0
    gpgkey=https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg https://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
    EOF
  • Install kubeadm, kubelet and kubectl

    The version is pinned to k8s 1.18 here; you may install a newer one if you prefer.

    yum install -y kubelet-1.18.0 kubeadm-1.18.0 kubectl-1.18.0

    • Edit /etc/sysconfig/kubelet

    vim /etc/sysconfig/kubelet

    # Change to:
    KUBELET_EXTRA_ARGS="--cgroup-driver=systemd"

    • Enable kubelet at boot

      systemctl enable kubelet

Node-specific steps
  • Master node deployment

    kubeadm init \
    --apiserver-advertise-address=<your-master-node-ip> \
    --image-repository registry.aliyuncs.com/google_containers \
    --kubernetes-version v1.18.0 \
    --service-cidr=10.96.0.0/12 \
    --pod-network-cidr=10.244.0.0/16


    mkdir -p $HOME/.kube
    sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
    sudo chown $(id -u):$(id -g) $HOME/.kube/config

  • Generate the join command on the master node

    kubeadm token create --ttl 0 --print-join-command
  • Join the other nodes to the cluster

    Copy the join command and run it in a shell on each of the other nodes.

  • Check node status with kubectl on the master node

    kubectl get nodes
  • Deploy the CNI network plugin (flannel) on the master node

    wget https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml

    kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml
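
    Note that --pod-network-cidr=10.244.0.0/16 passed to kubeadm init above matches flannel's default network; if you change one, change the other to match.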

Creating the Docker image

  • If you do not have a Docker Hub account, register one first and create a repository

  • Or use the Docker image I have prepared, in which case you can skip this entire section (2.2)

    shallowdream2/mykafka:latest
  • To simulate Kafka's producer-consumer model we need 4 files: producer.py, consumer1.py, consumer2.py, and a Dockerfile

  • Dockerfile

    # Use the official Python base image
    FROM python:3.9

    # Install the Kafka Python client
    RUN pip install kafka-python

    # Copy the producer and consumer scripts into the container
    COPY producer.py /app/producer.py
    COPY consumer1.py /app/consumer1.py
    COPY consumer2.py /app/consumer2.py

    WORKDIR /app
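
    The Dockerfile deliberately sets no CMD or ENTRYPOINT; the assumption here is that each Kubernetes deployment supplies its own container command (e.g. python producer.py), as in the hedged consumer1-deployment.yaml sketch further below.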

  • producer.py

    from kafka import KafkaProducer
    import time
    import json

    # Create the Kafka producer
    producer = KafkaProducer(bootstrap_servers='kafka:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    # Generate messages and send them to Kafka, one pair every 5 seconds
    while True:
        message1 = {'message1': 'Hello Kafka, send to 1', 'timestamp': time.time()}
        message2 = {'message2': 'Hello Kafka, send to 2', 'timestamp': time.time()}
        producer.send('test-topic', value=message1)
        producer.send('test-topic-2', value=message2)
        print(f'Sent: {message1}')
        print(f'Sent: {message2}')
        time.sleep(5)

  • consumer1.py

    from kafka import KafkaConsumer
    import json

    # Create the Kafka consumer for test-topic
    consumer = KafkaConsumer('test-topic',
                             bootstrap_servers='kafka:9092',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    # Process messages as they arrive
    for message in consumer:
        print(f'Consumer1 received: {message.value}')

  • consumer2.py

    from kafka import KafkaConsumer
    import json

    # Create the Kafka consumer for test-topic-2
    consumer = KafkaConsumer('test-topic-2',
                             bootstrap_servers='kafka:9092',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    # Process messages as they arrive
    for message in consumer:
        print(f'Consumer2 received: {message.value}')

  • Build the Docker image and push it to Docker Hub

    # Assuming the files above are in a folder named mykafka
    cd mykafka
    docker build -t <your-docker-name/your-repo:your-tag> .
    docker push <your-docker-name/your-repo:your-tag>

Deploying Kafka

The following configuration only needs to be applied on the master node; the files are assumed to live in the home directory (~). I only show kafka-deployment.yaml in full as an example; see the repository for the rest.

Configuration files

For details, see: https://github.com/shallowdream2/k8s-kafka-experiment

  • Create kafka-deployment.yaml

    Note: the name field in metadata must not be changed to kafka. If it is, the automatically generated environment variables (e.g. KAFKA_PORT) clash with the image's configuration, and the port ends up set to a non-integer value.

    Related issue: https://github.com/wurstmeister/kafka-docker/issues/122

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-cluster
      labels:
        app: kafka
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka
      template:
        metadata:
          labels:
            app: kafka
        spec:
          tolerations:
          - key: "node-role.kubernetes.io/master"
            operator: "Exists"
            effect: "NoSchedule"
          nodeSelector:
            kafka-broker: "true"
          containers:
          - name: kafka
            image: wurstmeister/kafka:latest
            ports:
            - containerPort: 9092
            env:
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka:9092
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper:2181
            - name: KAFKA_PORT
              value: "9092"
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka
      labels:
        app: kafka
    spec:
      ports:
      - port: 9092
        protocol: TCP
        targetPort: 9092
      selector:
        app: kafka
      type: ClusterIP
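
    The nodeSelector above assumes the broker node has already been labeled with kafka-broker=true (for example via kubectl label node <node-name> kafka-broker=true); the label key comes from this manifest, while the node name depends on your cluster.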

  • Create zookeeper-deployment.yaml

  • Create kafka-topic.yaml

  • Create producer-deployment.yaml

  • Create consumer1-deployment.yaml (a minimal sketch follows this list)

  • Create consumer2-deployment.yaml
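
As a rough reference, a minimal consumer1-deployment.yaml could look like the sketch below. It assumes the prebuilt image shallowdream2/mykafka:latest and that the container command is set to run consumer1.py; the actual manifest in the repository may differ:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: consumer1
      labels:
        app: consumer1
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: consumer1
      template:
        metadata:
          labels:
            app: consumer1
        spec:
          containers:
          - name: consumer1
            image: shallowdream2/mykafka:latest
            # run the consumer script from the image's /app workdir
            command: ["python", "consumer1.py"]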

Upload the files above to the master node and run the following commands:

  • Check that all nodes are in the Ready state

    kubectl get nodes

  • Apply the deployments, services, and job

    kubectl apply -f kafka-deployment.yaml

    kubectl apply -f zookeeper-deployment.yaml

    kubectl apply -f consumer1-deployment.yaml

    kubectl apply -f consumer2-deployment.yaml

    kubectl apply -f producer-deployment.yaml

    kubectl apply -f kafka-topic.yaml
  • Check the running status

    Wait a while; if every pod is Running, the deployment succeeded.

    kubectl get pods
  • Print a pod's log output

    The pod name can be found with kubectl get pods; the -f flag keeps following the log.

    kubectl logs -f <pod-name>
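
    With the scripts above, the producer pod should log a pair of Sent: ... lines roughly every 5 seconds, while consumer1 and consumer2 should print Consumer1 received: ... and Consumer2 received: ... for messages from test-topic and test-topic-2 respectively.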

Deploying KubeEdge

Reference: https://blog.csdn.net/zhangshihao11/article/details/130672867

Deploy cloudcore on the master node and edgecore on the remaining nodes.

Check the KubeEdge compatibility matrix and pick a version compatible with your cluster; the instructions below are based on k8s 1.18.

  • Deploy with keadm (all nodes)

    wget https://github.com/kubeedge/kubeedge/releases/download/v1.12.1/keadm-v1.12.1-linux-amd64.tar.gz
    tar -xvf keadm-v1.12.1-linux-amd64.tar.gz
    cd keadm-v1.12.1-linux-amd64/keadm
    mv ./keadm /usr/bin/
  • Master node: cloudcore

    keadm init --advertise-address=<your-master-ip> --kubeedge-version=1.12.1 
    • Get the token

      keadm gettoken
  • Other nodes: edgecore

    keadm join --cloudcore-ipport=<your-master-node-ip>:10000 --kubeedge-version=1.12.1 --token=<master-provided-token>
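
    Once the join succeeds, the edge nodes should show up in kubectl get nodes on the master after a short while, since KubeEdge registers them with the API server; this is a quick way to verify the cloud-edge connection.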

Commands used for the Strimzi-based deployment
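
Roughly, the commands below first tear down the earlier hand-written Kafka/ZooKeeper deployments and any leftover Strimzi resources (PVCs, PVs, the storage class, and the cluster-operator RBAC), then apply the Strimzi RBAC, local storage, Kafka and application manifests, and finally tail the application logs.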

# kubectl delete deploy kafka

# kubectl delete service kafka

kubectl delete kafka my-kafka-cluster

kubectl delete deploy zookeeper

kubectl delete deploy consumer1

kubectl delete deploy consumer2

kubectl delete deploy producer

kubectl delete pvc data-my-kafka-cluster-zookeeper-0
kubectl delete pvc data-my-kafka-cluster-zookeeper-1
kubectl delete pvc data-my-kafka-cluster-zookeeper-2

kubectl delete pvc data-my-kafka-cluster-kafka-0
kubectl delete pvc data-my-kafka-cluster-kafka-1
kubectl delete pvc data-my-kafka-cluster-kafka-2

kubectl delete storageclass local-storage

kubectl delete clusterrole strimzi-cluster-operator
kubectl delete clusterrolebinding strimzi-cluster-operator

kubectl delete pod -l name=strimzi-cluster-operator

kubectl delete pv local-pv
kubectl delete pv local-pv-zk-0
kubectl delete pv local-pv-zk-1
kubectl delete pv local-pv-zk-2

kubectl delete pv local-pv-kafka-0
kubectl delete pv local-pv-kafka-1
kubectl delete pv local-pv-kafka-2



kubectl apply -f strimzi-rbac.yaml

kubectl apply -f local-storageclass.yaml

kubectl apply -f zookeeper-localPV.yaml

kubectl apply -f zookeeper-localPVC.yaml

kubectl apply -f kafka-localPV.yaml
kubectl apply -f kafka-localPVC.yaml



kubectl apply -f kafka-deployment.yaml

kubectl apply -f consumer1-deployment.yaml

kubectl apply -f consumer2-deployment.yaml

kubectl apply -f producer-deployment.yaml

kubectl apply -f kafka-topic.yaml

# kubectl apply -f strimzi-rbac.yaml

# kubectl apply -f zookeeper-deployment.yaml  # not needed here: the Strimzi Kafka resource manages its own ZooKeeper



kubectl logs -f consumer1-7478f8c64b-5wjdf

kubectl logs -f consumer2-6f57488b9f-s747z

kubectl logs -f producer-7f5564966b-bxnqn





# keadm init --advertise-address="192.168.0.171"

#keadm join --cloudcore-ipport=192.168.0.171:10000 --token=d75c5060c439e4d4559ae9231b773b8d86cf009f25d35af04b896e9bb42675df.eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MTY3MTA4ODB9.6KdE0qsuCebY_4JadXol1Bf7WQXqGbq2hkc13DAiJ5g
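
For reference, the Kafka object created and deleted above (my-kafka-cluster, presumably defined in kafka-deployment.yaml in the Strimzi setup) might look roughly like the sketch below. The replica counts and the local-storage class are inferred from the PVC/PV names in the commands; the storage sizes are guesses, the apiVersion depends on the Strimzi version installed, and the real manifest in the repository may differ:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-kafka-cluster
    spec:
      kafka:
        replicas: 3                  # matches data-my-kafka-cluster-kafka-0..2 above
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
        storage:
          type: persistent-claim
          size: 1Gi                  # guessed size
          class: local-storage
      zookeeper:
        replicas: 3                  # matches data-my-kafka-cluster-zookeeper-0..2 above
        storage:
          type: persistent-claim
          size: 1Gi                  # guessed size
          class: local-storage
      entityOperator:
        topicOperator: {}            # lets kafka-topic.yaml (presumably a KafkaTopic) be reconciled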
