K8S-Kafka-Strimzi-Kubedge实验
本文最后更新于 2024年5月28日 凌晨
实验八、容器编排与通信
实验目标
使用docker cri做运行时容器,k8s做弹性容器管理系统,在此之上用kafka做中间件,实现一个简单的Producer-Consumer机制的demo。
最后利用kubedge框架将代理节点(broker节点)变为云端,剩下3个节点变成边缘节点
实验过程
按照顺序,在华为云申请4个ECS节点,使用kubeadm部署k8s节点。先建立自己的docker image用于部署kafka application,最后部署kafka中间件、kubedge框架。
k8s部署
申请节点
在华为云上申请4个ecs服务器,并带上弹性公网ip
-
系统:CentOS 7
-
规格:2vCPUs | 4GiB | s7.large.2
-
安全组设置:k8s
已保存在云端,可以下载导入:k8s-security
kubeadm搭建k8s集群
参考:https://www.cnblogs.com/xuweiweiwoaini/p/13884112.html
共同步骤:该部分操作需要在所有节点上执行
-
关闭防火墙, 禁止防火墙开机自启
1
2systemctl stop firewalld
systemctl disable firewalld -
永久关闭swap分区
1
2sed -ri 's/.*swap.*/#&/' /etc/fstab
reboot这里会重启主机,等待即可
-
设置主机名
注意这里要设置成ecs服务器的内网ip
1
2
3
4
5
6cat >> /etc/hosts << EOF
your-master-node-ip k8s-master
your-node1-ip k8s-01
your-node2-ip k8s-02
your-node3-ip k8s-03
EOF -
桥接的IPv4流量传递到iptables的链
1
2
3
4
5
6
7
8
9cat > /etc/sysctl.d/k8s.conf << EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
net.ipv4.ip_forward = 1
vm.swappiness = 0
EOF
modprobe br_netfilter
sysctl --system -
时间同步
1
2yum install ntpdate -y
ntpdate time.windows.com -
开启ipvs
1
2
3
4
5
6
7
8
9
10
11
12yum -y install ipset ipvsadm
cat > /etc/sysconfig/modules/ipvs.modules <<EOF
#!/bin/bash
modprobe -- ip_vs
modprobe -- ip_vs_rr
modprobe -- ip_vs_wrr
modprobe -- ip_vs_sh
modprobe -- nf_conntrack_ipv4
EOF
chmod 755 /etc/sysconfig/modules/ipvs.modules && bash /etc/sysconfig/modules/ipvs.modules && lsmod | grep -e ip_vs -e nf_conntrack_ipv4 -
安装Docker
1
2
3
4
5wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
yum -y install docker-ce-18.06.3.ce-3.el7
systemctl enable docker && systemctl start docker
docker version -
设置Docker镜像加速器
1
2
3
4
5
6
7
8
9
10
11sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"exec-opts": ["native.cgroupdriver=systemd"],
"registry-mirrors": ["https://b9pmyelo.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker -
添加阿里云的YUM软件源
1
2
3
4
5
6
7
8
9cat > /etc/yum.repos.d/kubernetes.repo << EOF
[kubernetes]
name=Kubernetes
baseurl=https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=0
repo_gpgcheck=0
gpgkey=https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg https://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
EOF -
安装kubeadm、kubelet和kubectl
这里指定版本号为 k8s 1.18,可以自行安装更高版本的
1
yum install -y kubelet-1.18.0 kubeadm-1.18.0 kubectl-1.18.0
- 修改
/etc/sysconfig/kubelet
1
2
3
4vim /etc/sysconfig/kubelet
# 修改
KUBELET_EXTRA_ARGS="--cgroup-driver=systemd"-
设置开机自启动
1
systemctl enable kubelet
- 修改
单独部署
-
Master节点部署
1
2
3
4
5
6
7
8
9
10
11kubeadm init \
--apiserver-advertise-address=192.168.217.100 \
--image-repository registry.aliyuncs.com/google_containers \
--kubernetes-version v1.18.0 \
--service-cidr=10.96.0.0/12 \
--pod-network-cidr=10.244.0.0/16
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
-
Master节点获取加入命令
1
kubeadm token create --ttl 0 --print-join-command
-
其他节点加入集群
复制 加入命令,shell执行即可
-
Master节点使用kubectl工具查看节点状态
1
kubectl get nodes
-
Master节点部署CNI网络插件
1
2
3
4wget https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml
kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml
创建docker image
-
如果你没有docker账号,请先注册docker账号,并创建一个repo
-
或者使用我准备好的docker image,那么你可以跳过整个2.2
1
shallowdream2/mykafka:latest
-
为了模拟kafka中的生产者-消费者模型,我们需要4个文件,分别是producer.py , consumer1.py, consumer2.py, Dockerfile
-
Dockerfile
1
2
3
4
5
6
7
8
9
10
11
12
13# 使用官方的Python基础镜像
FROM python:3.9
# 安装Kafka Python客户端
RUN pip install kafka-python
# 将生产者和消费者脚本复制到容器中
COPY producer.py /app/producer.py
COPY consumer1.py /app/consumer1.py
COPY consumer2.py /app/consumer2.py
WORKDIR /app -
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from kafka import KafkaProducer
import time
import json
# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 生成消息并发送到Kafka
while True:
message1 = {'message1': 'Hello Kafka, send to 1', 'timestamp': time.time()}
message2 = {'message2': 'Hello Kafka, send to 2', 'timestamp': time.time()}
producer.send('test-topic', value=message1)
producer.send('test-topic-2', value=message2)
print(f'Sent: {message1}')
print(f'Sent: {message2}')
time.sleep(5) -
1
2
3
4
5
6
7
8
9
10
11
12from kafka import KafkaConsumer
import json
# 创建Kafka消费者
consumer = KafkaConsumer('test-topic',
bootstrap_servers='kafka:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
# 处理消息
for message in consumer:
print(f'Consumer1 received: {message.value}') -
1
2
3
4
5
6
7
8
9
10
11
12from kafka import KafkaConsumer
import json
# 创建Kafka消费者
consumer = KafkaConsumer('test-topic-2',
bootstrap_servers='kafka:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
# 处理消息
for message in consumer:
print(f'Consumer2 received: {message.value}') -
创建docker镜像并push到docker hub
1
2
3
4# 假设上述文件在文件夹mykafka下
cd mykafka
docker build -t <your-docker-name/your-repo:your-tag>
docker push <your-docker-name/your-repo:your-tag>
部署 kafka
以下配置只需要在master节点执行,默认在~目录下。我只给出kafka-deployment.yaml的详情作为示例,其余见仓库
配置文件
详见:https://github.com/shallowdream2/k8s-kafka-experiment
-
创建kafka-deployment.yaml
注意
metadata
中的name
字段不可改为kafka
,会导致宏定义冲突使得port被错误设置为非INT参考issue: https://github.com/wurstmeister/kafka-docker/issues/122
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-cluster
labels:
app: kafka
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
tolerations:
- key: "node-role.kubernetes.io/master"
operator: "Exists"
effect: "NoSchedule"
nodeSelector:
kafka-broker: "true"
containers:
- name: kafka
image: wurstmeister/kafka:latest
ports:
- containerPort: 9092
env:
- name: KAFKA_LISTENERS
value: PLAINTEXT://0.0.0.0:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka:9092
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181
- name: KAFKA_PORT
value: "9092"
---
apiVersion: v1
kind: Service
metadata:
name: kafka
labels:
app: kafka
spec:
ports:
- port: 9092
protocol: TCP
targetPort: 9092
selector:
app: kafka
type: ClusterIP -
创建zookeeper-deployment.yaml
-
创建kafka-topic.yaml
-
创建producer-deployment.yaml
-
创建consumer1-deployment.yaml
-
创建consumer2-deployment.yaml
将上述文件上传到master节点,执行以下命令:
- 查看nodes是否都是Ready状态
1 |
|
-
部署deployment && service && job
1
2
3
4
5
6
7
8
9
10
11kubectl apply -f kafka-deployment.yaml
kubectl apply -f zookeeper-deployment.yaml
kubectl apply -f consumer1-deployment.yaml
kubectl apply -f consumer2-deployment.yaml
kubectl apply -f producer-deployment.yaml
kubectl apply -f kafka-topic.yaml -
查看运行状态
等待一会儿,如果都是Running,则运行成功!
1
kubectl get pods
-
打印节点的输出
pod-name可以在
kubectl get pods
中查看,-f
参数意为一直跟踪1
kubectl logs -f <pod-name>
部署Kubedge
参考: https://blog.csdn.net/zhangshihao11/article/details/130672867
在master节点部署cloudcore,在其余节点部署edgecore
请前往kubedge 查看兼容性,选择兼容的版本,下面的instruction基于k8s 1.18
-
使用Keadm进行部署(all nodes)
1
2
3
4wget https://github.com/kubeedge/kubeedge/releases/download/v1.12.1/keadm-v1.12.1-linux-amd64.tar.gz
tar -xvf keadm-v1.12.1-linux-amd64.tar.gz
cd keadm-v1.12.1-linux-amd64/keadm
mv ./keadm /usr/bin/ -
Master-cloudcore
1
keadm init --advertise-address=<your-master-ip> --kubeedge-version=1.12.1
-
获取token
1
keadm gettoken
-
-
Otherwise-edgecore
1
keadm join --cloudcore-ipport=<your-master-node-ip>:10000 --kubeedge-version=1.12.1 --token= <master-provide-token>
使用strimzi部署用到的指令
1 |
|