Building Kafka 2.13-2.6.0 and Zookeeper 3.6.2 clusters on K8S

1, Service version information:

  • Kafka: v2.13-2.6.0
  • Zookeeper: v3.6.2
  • Kubernetes: v1.18.4

2, Create Zookeeper image

Zookeeper uses the official image provided on Docker Hub, which can be pulled directly with the following command:

docker pull zookeeper:3.6.2

Since the startup script used in the official image is not suitable for internal use in our company, the docker-entrypoint.sh script and the Dockerfile have been modified.

1. Modify the docker-entrypoint.sh script

The modified docker-entrypoint.sh script is as follows (the original script can be found at https://github.com/31z4/zookeeper-docker/tree/2373492c6f8e74d3c1167726b19babe8ac7055dd/3.6.2):

#!/bin/bash

set -e

HOST=$(hostname -s)
DOMAIN=$(hostname -d)
CLIENT_PORT=2181
SERVER_PORT=2888
ELECTION_PORT=3888

function createConfig(){
    if [[ ! -f "$ZOO_CONF_DIR/${HOST}/zoo.cfg" ]]; then
        
        # Create a directory based on the passed in variables
        mkdir -p $ZOO_CONF_DIR/${HOST}
        mkdir -p $ZOO_DATA_DIR/${HOST}
        mkdir -p $ZOO_DATA_LOG_DIR/${HOST}
        
        # Write the necessary configuration items into zoo.cfg. These variables are defined in the Dockerfile; to change them, define env entries in the yaml file
        CONFIG="$ZOO_CONF_DIR/${HOST}/zoo.cfg"
        {
            echo "dataDir=$ZOO_DATA_DIR/${HOST}"
            echo "dataLogDir=$ZOO_DATA_LOG_DIR/${HOST}"

            echo "tickTime=$ZOO_TICK_TIME"
            echo "initLimit=$ZOO_INIT_LIMIT"
            echo "syncLimit=$ZOO_SYNC_LIMIT"

            echo "autopurge.snapRetainCount=$ZOO_AUTOPURGE_SNAPRETAINCOUNT"
            echo "autopurge.purgeInterval=$ZOO_AUTOPURGE_PURGEINTERVAL"
            echo "maxClientCnxns=$ZOO_MAX_CLIENT_CNXNS"
            echo "standaloneEnabled=$ZOO_STANDALONE_ENABLED"
            echo "admin.enableServer=$ZOO_ADMINSERVER_ENABLED"
        } >> ${CONFIG}
        
        if [[ -n $ZOO_4LW_COMMANDS_WHITELIST ]]; then
            echo "4lw.commands.whitelist=$ZOO_4LW_COMMANDS_WHITELIST" >> ${CONFIG}
        fi

        # If you need to add other configuration items, set the ZOO_CFG_EXTRA variable in the env section of the yaml file and put the additional entries there
        # Note that additional entries must use the exact option names Zookeeper recognizes, because no format conversion is done below
        for cfg_extra_entry in $ZOO_CFG_EXTRA; do
            echo "$cfg_extra_entry" >> ${CONFIG}
        done
    fi
}

# A StatefulSet names its Pods in the format "<service name>-<ordinal>", so the following extracts the numeric ordinal and the service name from the host name
function getHostNum(){
    if [[ $HOST =~ (.*)-([0-9]+)$ ]]; then
        NAME=${BASH_REMATCH[1]}
        ORD=${BASH_REMATCH[2]}
    else
        echo "Fialed to parse name and ordinal of Pod"
        exit 1
    fi
}

# Create the myid of the Zookeeper cluster to ensure that the generated myid is unique and incremental
function createID(){
    ID_FILE="$ZOO_DATA_DIR/${HOST}/myid"
    MY_ID=$((ORD+1))
    echo $MY_ID > $ID_FILE
}

# Write the information of each node into the configuration file so the cluster takes effect. Note that the SERVERS variable must be passed into the container and its value must match the number of replicas
# Therefore, to scale out later you only need to change the number of replicas and the value of the SERVERS variable
function addServer(){
    for (( i=1; i<=$SERVERS; i++ ))
    do
        s="server.$i=$NAME-$((i-1)).$DOMAIN:$SERVER_PORT:$ELECTION_PORT;$CLIENT_PORT"
        [[ $(grep "$s" $ZOO_CONF_DIR/${HOST}/zoo.cfg) ]] || echo $s >> $ZOO_CONF_DIR/${HOST}/zoo.cfg
    done
}

# Change ownership of the working and data directories so Zookeeper can be started as the zookeeper user via gosu
function userPerm(){
    if [[ "$1" = 'zkServer.sh' && "$(id -u)" = '0' ]]; then
        chown -R zookeeper "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" "$ZOO_CONF_DIR"
        exec gosu zookeeper "$0" "$@"
    fi
}

# Start Zookeeper. Since the configuration file path has been changed, the --config option must be used here
# The default configuration directory is ZOO_CONF_DIR=/conf (defined in the Dockerfile), so --config can be omitted if the default path is not changed
function startZK(){
    /apache-zookeeper-3.6.2-bin/bin/zkServer.sh --config "$ZOO_CONF_DIR/$(hostname -s)" start-foreground
}

createConfig
getHostNum
createID
addServer
userPerm
startZK
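
For reference, with SERVERS=3 and the env values used in the StatefulSet later in this post (ZOO_CONF_DIR=/opt/conf, ZOO_DATA_DIR=/opt/data, ZOO_DATA_LOG_DIR=/opt/data_log), the zoo.cfg this script generates for Pod zk-0 would look roughly like this:

dataDir=/opt/data/zk-0
dataLogDir=/opt/data_log/zk-0
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
server.1=zk-0.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
server.2=zk-1.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
server.3=zk-2.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181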

2. Modify Dockerfile

I made only small changes to the Dockerfile: the original ENTRYPOINT is commented out and CMD is changed to start docker-entrypoint.sh:

FROM openjdk:11-jre-slim

ENV ZOO_CONF_DIR=/conf \
    ZOO_DATA_DIR=/data \
    ZOO_DATA_LOG_DIR=/datalog \
    ZOO_LOG_DIR=/logs \
    ZOO_TICK_TIME=2000 \
    ZOO_INIT_LIMIT=5 \
    ZOO_SYNC_LIMIT=2 \
    ZOO_AUTOPURGE_PURGEINTERVAL=0 \
    ZOO_AUTOPURGE_SNAPRETAINCOUNT=3 \
    ZOO_MAX_CLIENT_CNXNS=60 \
    ZOO_STANDALONE_ENABLED=true \
    ZOO_ADMINSERVER_ENABLED=true

# Add a user with an explicit UID/GID and create necessary directories
RUN set -eux; \
    groupadd -r zookeeper --gid=1000; \
    useradd -r -g zookeeper --uid=1000 zookeeper; \
    mkdir -p "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"; \
    chown zookeeper:zookeeper "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"

# Install required packages
RUN set -eux; \
    apt-get update; \
    DEBIAN_FRONTEND=noninteractive \
    apt-get install -y --no-install-recommends \
        ca-certificates \
        dirmngr \
        gosu \
        gnupg \
        netcat \
        wget; \
    rm -rf /var/lib/apt/lists/*; \
# Verify that gosu binary works
    gosu nobody true

ARG GPG_KEY=BBE7232D7991050B54C8EA0ADC08637CA615D22C
ARG SHORT_DISTRO_NAME=zookeeper-3.6.2
ARG DISTRO_NAME=apache-zookeeper-3.6.2-bin

# Download Apache Zookeeper, verify its PGP signature, untar and clean up
RUN set -eux; \
    ddist() { \
        local f="$1"; shift; \
        local distFile="$1"; shift; \
        local success=; \
        local distUrl=; \
        for distUrl in \
            'https://www.apache.org/dyn/closer.cgi?action=download&filename=' \
            https://www-us.apache.org/dist/ \
            https://www.apache.org/dist/ \
            https://archive.apache.org/dist/ \
        ; do \
            if wget -q -O "$f" "$distUrl$distFile" && [ -s "$f" ]; then \
                success=1; \
                break; \
            fi; \
        done; \
        [ -n "$success" ]; \
    }; \
    ddist "$DISTRO_NAME.tar.gz" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz"; \
    ddist "$DISTRO_NAME.tar.gz.asc" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz.asc"; \
    export GNUPGHOME="$(mktemp -d)"; \
    gpg --keyserver ha.pool.sks-keyservers.net --recv-key "$GPG_KEY" || \
    gpg --keyserver pgp.mit.edu --recv-keys "$GPG_KEY" || \
    gpg --keyserver keyserver.pgp.com --recv-keys "$GPG_KEY"; \
    gpg --batch --verify "$DISTRO_NAME.tar.gz.asc" "$DISTRO_NAME.tar.gz"; \
    tar -zxf "$DISTRO_NAME.tar.gz"; \
    mv "$DISTRO_NAME/conf/"* "$ZOO_CONF_DIR"; \
    rm -rf "$GNUPGHOME" "$DISTRO_NAME.tar.gz" "$DISTRO_NAME.tar.gz.asc"; \
    chown -R zookeeper:zookeeper "/$DISTRO_NAME"

WORKDIR $DISTRO_NAME
VOLUME ["$ZOO_DATA_DIR", "$ZOO_DATA_LOG_DIR", "$ZOO_LOG_DIR"]

EXPOSE 2181 2888 3888 8080

ENV PATH=$PATH:/$DISTRO_NAME/bin \
    ZOOCFGDIR=$ZOO_CONF_DIR

COPY docker-entrypoint.sh /

# Comment out the original ENTRYPOINT
# ENTRYPOINT ["/docker-entrypoint.sh"]

# Comment out the original CMD and add the following configuration
# CMD ["zkServer.sh", "start-foreground"]
CMD ["/docker-entrypoint.sh"]

3. Build the image and push it to the private registry

In the directory containing the Dockerfile, build and tag the image with the following command:

docker build --tag 10.16.12.204/ops/zookeeper:custom-v3.6.2 -f Dockerfile .

Push it to the image registry:

docker push 10.16.12.204/ops/zookeeper:custom-v3.6.2

3, Create Kafka image

The Kafka image is based on the wurstmeister image on Docker Hub. The original image can be pulled with the following command:

docker pull wurstmeister/kafka:2.13-2.6.0

This image uses the start-kafka.sh script to initialize the Kafka configuration and start the broker, but some of its content does not meet the requirements for deployment in K8S, so the script is modified.

1. Modify the start-kafka.sh script

The original start-kafka.sh script can be viewed at https://github.com/wurstmeister/kafka-docker. The revised content is as follows:

#!/bin/bash -e

# Allow specific kafka versions to perform any unique bootstrap operations
OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh"
if [[ -x "$OVERRIDE_FILE" ]]; then
    echo "Executing override file $OVERRIDE_FILE"
    eval "$OVERRIDE_FILE"
fi

# Store original IFS config, so we can restore it at various stages
ORIG_IFS=$IFS

# Set the Zookeeper connection address. If this variable is not specified, an error is reported
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi

# Set the port of kafka. If no port is specified, the default port will be used
if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi

# Automatically create topics after Kafka starts. If KAFKA_CREATE_TOPICS is not specified, no topics are created automatically
create-topics.sh &
unset KAFKA_CREATE_TOPICS

# If KAFKA_BROKER_ID is not specified directly, the broker id is generated by the command contained in the BROKER_ID_COMMAND variable, which keeps broker ids unique and incremental
if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        export KAFKA_BROKER_ID=-1
    fi
fi

# If the Kafka log directory is not specified, the default path is used; the default directory name contains the current host name
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi

# If KAFKA_HEAP_OPTS is specified, write it into the kafka-server-start.sh script
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi

# If you want the container to use the result of a specified command as its host name after startup, assign that command to HOSTNAME_COMMAND
# eval then executes the command in the variable and the result is assigned to the HOSTNAME_VALUE variable
if [[ -n "$HOSTNAME_COMMAND" ]]; then
    HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")

    # Replace any occurrences of _{HOSTNAME_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
            eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

# If you want the container to use the result of a specified command as its port number after startup, assign that command to PORT_COMMAND
# eval then executes the command in the variable and the result is assigned to the PORT_VALUE variable
if [[ -n "$PORT_COMMAND" ]]; then
    PORT_VALUE=$(eval "$PORT_COMMAND")

    # Replace any occurrences of _{PORT_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
	    eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
    KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
    export KAFKA_BROKER_RACK
fi

# Check whether the KAFKA_LISTENERS variable is set; its value is typically PLAINTEXT://:9092
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi

    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi

#Issue newline to config file in case there is not one already
echo "" >> "$KAFKA_HOME/config/server.properties"

(
    function updateConfig() {
        key=$1
        value=$2
        file=$3

        # Omit $value here, in case there is sensitive information
        echo "[Configuring] '$key' in '$file'"

        # If config exists in file, replace it. Otherwise, append to file.
        if grep -E -q "^#?$key=" "$file"; then
            sed -r -i "s@^#?$key=.*@$key=$value@g" "$file" #note that no config values may contain an '@' char
        else
            echo "$key=$value" >> "$file"
        fi
    }

    # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-0_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'

    # Define the environment variables to exclude from the broker configuration: version, path, JVM and logging
    # settings that must not be written into server.properties
    EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"

    IFS=$'\n'
    for VAR in $(env)
    do
        env_var=$(echo "$VAR" | cut -d= -f1)
        if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then
            echo "Excluding $env_var from broker config"
            continue
        fi

        if [[ $env_var =~ ^KAFKA_ ]]; then
            kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
            updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties"
        fi

        if [[ $env_var =~ ^LOG4J_ ]]; then
            log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)
            updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties"
        fi
    done

    # This block was added here: it splices BOOTSTRAP_SERVERS together according to the value of SERVERS and writes the result into the consumer and producer configuration files
    PODNAME=$(hostname -s | awk -F'-' 'OFS="-"{$NF="";print}' |sed 's/-$//g')
    for ((i=0;i<$SERVERS;i++))
    do
        BOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT},"
    done
    BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?}
    echo ${BOOTSTRAP_SERVERS} > /opt/log.txt
    sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.properties
    sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties
)

# If there are other initialization configuration scripts defined, execute
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
  eval "$CUSTOM_INIT_SCRIPT"
fi

exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"

2. Modify Dockerfile

The Dockerfile is barely modified; it just adds the modified start-kafka.sh script to the image and executes the script with bash (otherwise some commands cannot be executed):

FROM wurstmeister/kafka:2.13-2.6.0

ADD start-kafka.sh /

CMD ["bash","start-kafka.sh"]

3. Build the image and push it to the private registry

Build and tag the image with the following command:

docker build --tag 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 -f Dockerfile .

Push the image to the image registry:

docker push 10.16.12.204/ops/kafka:custom-v2.13-2.6.0

4, Create namespace

The entire Kafka and Zookeeper clusters must be in the same namespace, so use the following yaml file to create the ns-kafka namespace:

---
apiVersion: v1
kind: Namespace
metadata:
  name: ns-kafka
  labels:
    name: ns-kafka
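
Assuming the manifest above is saved as kafka-namespace.yaml (the filename is arbitrary), it can be applied and checked with:

kubectl apply -f kafka-namespace.yaml
kubectl get ns ns-kafka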

5, Create Secret

The kubelet needs credentials to pull images from the registry, so create a Secret for authenticating against the Harbor registry:

kubectl create secret docker-registry harbor-secret --namespace=ns-kafka --docker-server=http://10.16.12.204 --docker-username=admin --docker-password=Harbor12345
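
You can confirm that the Secret was created with:

kubectl get secret harbor-secret -n ns-kafka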

6, Create PV and PVC

In this build, the Kafka cluster and the Zookeeper cluster are planned to share the same PV. As seen in the Pod initialization scripts above, the data and log directories of both Kafka and Zookeeper are placed under directories named after their own host names, so the directories can still be distinguished even when the same PV is used. Create the yaml file for the PV and PVC with the following content:

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-data-pv
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 500Gi
  local:
    path: /opt/ops_ceph_data/kafka_data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kafka-cluster
          operator: In
          values:
          - "true"
  persistentVolumeReclaimPolicy: Retain
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kafka-data-pvc
  namespace: ns-kafka
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 500Gi

Note that the storage I currently use is CephFS, which is mounted on the /opt/ops_ceph_data directory of every K8S node, so the storage type used when creating the PV is local.
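
Assuming the manifest is saved as kafka-pv-pvc.yaml (the filename is arbitrary), apply it and check the PV and PVC status:

kubectl apply -f kafka-pv-pvc.yaml
kubectl get pv kafka-data-pv
kubectl get pvc kafka-data-pvc -n ns-kafka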

7, Create Labels

Since the storage type specified when creating the PV above is local, this PV can only be scheduled onto nodes that carry the specified label, so add a label to all nodes in the cluster:

for i in 1 2 3 4 5; do kubectl label nodes k8s-node${i} kafka-cluster=true; done
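
You can verify that the label has been applied with:

kubectl get nodes -l kafka-cluster=true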

8, Create Zookeeper cluster

1. Create Service

Create Services for communication between Zookeeper nodes and for client access. The contents of the yaml file are as follows:

---
apiVersion: v1
kind: Service
metadata:
  name: zk-inner-service
  namespace: ns-kafka
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
  - name: server
    port: 2888
  - name: leader-election
    port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-client-service
  namespace: ns-kafka
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
  - name: client
    port: 2181
    nodePort: 31811
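
Assuming this manifest is saved as zk-service.yaml (arbitrary name), apply it and list the Services:

kubectl apply -f zk-service.yaml
kubectl get svc -n ns-kafka -l app=zk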

2. Create StatefulSet

Zookeeper is a stateful service, so it should be deployed with a StatefulSet. The contents of the yaml file are as follows:

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: ns-kafka
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: "zk-inner-service"
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
      - name: zk
        imagePullPolicy: Always
        image: 10.16.12.204/ops/zookeeper:custom-v3.6.2
        resources:
          requests:
            memory: "500Mi"
            cpu: "0.5"
        ports:
          - containerPort: 2181
            name: client
          - containerPort: 2888
            name: server
          - containerPort: 3888
            name: leader-election
        env:
          - name: SERVERS              # Set the SERVERS variable; its value must match the number of replicas
            value: "3"
          - name: ZOO_CONF_DIR         # Set the directory of the configuration file
            value: /opt/conf
          - name: ZOO_DATA_DIR         # Set the directory of the data file
            value: /opt/data
          - name: ZOO_DATA_LOG_DIR     # Set the directory of the data log file
            value: /opt/data_log
        volumeMounts:                  # Set the directory where data needs to be persisted
        - name: zookeeper-data
          mountPath: /opt/data
          subPath: zookeeper-cluster-data/data
        - name: zookeeper-data
          mountPath: /opt/data_log
          subPath: zookeeper-cluster-data/data_log
        - name: data-conf
          mountPath: /etc/localtime
      imagePullSecrets:
      - name: harbor-secret
      volumes:
      - name: zookeeper-data
        persistentVolumeClaim:
          claimName: kafka-data-pvc
      - name: data-conf
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
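
Assuming this manifest is saved as zk-statefulset.yaml (arbitrary name), apply it and wait for the three Pods to become Ready:

kubectl apply -f zk-statefulset.yaml
kubectl get pods -n ns-kafka -l app=zk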

3. Verify cluster status

After the cluster is up, check the current status of each Zookeeper node with the following command:

[@k8s-master1 /]# for i in  0 1 2; do kubectl exec -it zk-$i  -n ns-kafka -- zkServer.sh --config /opt/conf/zk-$i status; done
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-0/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-1/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-2/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

You can see that the current cluster has one leader and two followers. Next, verify data synchronization between the cluster nodes. First, create a znode on the zk-0 node:

[@k8s-master1 /]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] create /testMessage Hello
Created /testMessage

View this znode on the other two nodes:

[@k8s-master1 /]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello

[@k8s-master1 /]# kubectl exec -it zk-2 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello

The data is visible on both nodes, which means the cluster is running normally.

9, Create Kafka cluster

1. Create Service

Create a Service for Kafka communication. The contents of yaml file are as follows:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: ns-kafka
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: server
  clusterIP: None
  selector:
    app: kafka
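
Assuming this manifest is saved as kafka-service.yaml (arbitrary name), apply it with:

kubectl apply -f kafka-service.yaml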

2. Create StatefulSet

Kafka is a stateful service, so it should be deployed with a StatefulSet. The contents of the yaml file are as follows:

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: ns-kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "kafka-service"
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: kafka
    spec:
      imagePullSecrets:
      - name: harbor-secret
      containers:
      - name: kafka
        imagePullPolicy: Always
        image: 10.16.12.204/ops/kafka:custom-v2.13-2.6.0
        resources:
          requests:
            memory: "500Mi"
            cpu: "0.5"
        env:
          - name: SERVERS                      # Make sure the value of SERVERS matches the number of replicas
            value: "3"
          - name: KAFKA_LISTENERS
            value: "PLAINTEXT://:9092"
          - name: KAFKA_ZOOKEEPER_CONNECT      # Set Zookeeper connection address
            value: "zk-inner-service.ns-kafka.svc.cluster.local:2181"
          - name: KAFKA_PORT
            value: "9092"
          - name: KAFKA_MESSAGE_MAX_BYTES
            value: "20000000"
          - name: BROKER_ID_COMMAND            # This variable is used to generate a broker id inside the container
            value: "hostname | awk -F'-' '{print $NF}'"
        volumeMounts:
          - name: kafka-log                    # You only need to persist kafka's log directory
            mountPath: /kafka
            subPath: kafka-cluster-log
          - name: data-conf
            mountPath: /etc/localtime
      volumes:
      - name: kafka-log
        persistentVolumeClaim:
          claimName: kafka-data-pvc
      - name: data-conf
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
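
Assuming this manifest is saved as kafka-statefulset.yaml (arbitrary name), apply it and check that the three Kafka Pods are running:

kubectl apply -f kafka-statefulset.yaml
kubectl get pods -n ns-kafka -l app=kafka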

3. Verify cluster status

3.1 View the brokers in Zookeeper

[@k8s-master1 ~]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181

[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]

[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]

[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"}

[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-1.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074079"}

[zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-2.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074009"}

You can see that all three brokers have registered in Zookeeper.

3.2 Create a Topic

Create a topic named Message on the kafka-0 node, with 3 partitions and 3 replicas:

[@k8s-master1 ~]# kubectl exec -it kafka-0 -n ns-kafka -- /bin/bash
bash-4.4# kafka-topics.sh --create --topic Message --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181 --partitions 3 --replication-factor 3
Created topic Message.

Check whether this Topic exists in zk-1 node:

[@k8s-master1 ~]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[Message]

This Topic already exists in Zookeeper.

3.3 Simulate producers and consumers

First, simulate a producer writing messages to the Message topic on kafka-1:

[@k8s-master1 ~]# kubectl exec -it kafka-1 -n ns-kafka -- /bin/bash
bash-4.4# kafka-console-producer.sh --topic Message --broker-list kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092
>This is a test message
>Welcome to Kafka

Then simulate a consumer reading these messages on kafka-2:

[@k8s-master1 ~]# kubectl exec -it kafka-2 -n ns-kafka -- /bin/bash
bash-4.4# kafka-console-consumer.sh --topic Message --bootstrap-server kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092 --from-beginning

This is a test message
Welcome to Kafka

Messages can be produced and consumed normally, which means the Kafka cluster is running correctly.

10, FAQ

1. How to specify Topics to be created in the yaml file

Specify the following env in the yaml file to automatically create Topics when the Pod starts:

env:
  - name: KAFKA_CREATE_TOPICS
    value: "Topic1:1:3,Topic2:1:1:compact"   

The above means that Topic1 will have 1 partition and 3 replicas, and Topic2 will have 1 partition and 1 replica with its cleanup.policy set to compact.

Automatic Topic creation requires the KAFKA_CREATE_TOPICS variable to be set; the create-topics.sh script (already present in the image) then creates the Topics according to the variable's content.

2. The compaction set for Topic does not take effect

See: https://github.com/wurstmeister/kafka-docker/wiki#topic-compaction-does-not-work
