Monday, 3 August 2020

Using CNCF Sandbox Project Strimzi for Kafka Clusters on VMware Tanzu Kubernetes Grid Integrated Edition (TKGI)

Strimzi, a CNCF sandbox project, provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations. In this post we will take a look at how to get this running on VMware Tanzu Kubernetes Grid Integrated Edition (TKGI) and consume the Kafka cluster from a Spring Boot application.

If you have a K8s cluster, that's all you need to follow along. In this example I am using VMware Tanzu Kubernetes Grid Integrated Edition (TKGI), but you can use any K8s cluster you have, such as GKE, AKS or EKS.

Steps

1. Installing Strimzi is straightforward, so we can do that as follows. I am using the namespace "kafka", which must exist before running this command (for example, create it with kubectl create namespace kafka).

kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

2. Verify that the operator was installed correctly and that we have a running pod as shown below.
  
$ kubectl get pods -n kafka
NAME                                                    READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-6c9d899778-4mdtg               1/1     Running   0          6d22h

3. Next let's ensure we have a default storage class for the cluster as shown below.

$ kubectl get storageclass
NAME             PROVISIONER                    AGE
fast (default)   kubernetes.io/vsphere-volume   47d

4. At this point we are ready to create a Kafka cluster. For this example we will create a 3 node cluster defined in YAML as follows.

kafka-persistent-MULTI_NODE.yaml

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: apples-kafka-cluster
spec:
  kafka:
    version: 2.5.0
    replicas: 3
    listeners:
      external:
        type: loadbalancer
        tls: false
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.5"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
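
The replication settings in the config block above can be sanity-checked with a little arithmetic. The following is a minimal sketch, not anything Strimzi or Kafka ships: the class and method names are hypothetical. It assumes the usual rule that writes needing a minimum number of in-sync replicas keep succeeding as long as that many replicas remain, so a replication factor of 3 with a minimum ISR of 2 should tolerate one unavailable broker for the transaction state log.

```java
// Hypothetical helper, for illustration only; not part of Strimzi or Kafka.
final class ReplicationMath {

    // Number of replicas that can be unavailable while writes that require
    // minIsr in-sync replicas can still succeed.
    static int tolerableFailures(int replicationFactor, int minIsr) {
        if (minIsr < 1 || minIsr > replicationFactor) {
            throw new IllegalArgumentException("minIsr must be between 1 and replicationFactor");
        }
        return replicationFactor - minIsr;
    }

    public static void main(String[] args) {
        // Values taken from the Kafka resource above:
        //   transaction.state.log.replication.factor: 3
        //   transaction.state.log.min.isr: 2
        System.out.println(tolerableFailures(3, 2)); // prints 1
    }
}
```

With only one broker in the cluster these settings would not make sense, which is why the example pairs them with replicas: 3.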

A few things to note:
  • We have enabled external access to the cluster using a listener of type loadbalancer, which means your K8s cluster needs to support services of type LoadBalancer
  • The cluster creates persistent volume claims dynamically, so ensure the default storage class from #3 above is in place
  • We have disabled TLS on the external listener given this is a demo

5. Create the Kafka cluster as shown below, ensuring we target the namespace "kafka".

$ kubectl apply -f kafka-persistent-MULTI_NODE.yaml -n kafka

6. Now we can view the status of our cluster as it is created in one of two ways, as shown below. You will need to wait a few minutes for everything to start up.

Option 1:
  
$ kubectl get Kafka -n kafka
NAME                   DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
apples-kafka-cluster   3                        3

Option 2:
  
$ kubectl get all -n kafka
NAME                                                        READY   STATUS    RESTARTS   AGE
pod/apples-kafka-cluster-entity-operator-58685b8fbd-r4wxc   3/3     Running   0          6d21h
pod/apples-kafka-cluster-kafka-0                            2/2     Running   0          6d21h
pod/apples-kafka-cluster-kafka-1                            2/2     Running   0          6d21h
pod/apples-kafka-cluster-kafka-2                            2/2     Running   0          6d21h
pod/apples-kafka-cluster-zookeeper-0                        1/1     Running   0          6d21h
pod/apples-kafka-cluster-zookeeper-1                        1/1     Running   0          6d21h
pod/apples-kafka-cluster-zookeeper-2                        1/1     Running   0          6d21h
pod/strimzi-cluster-operator-6c9d899778-4mdtg               1/1     Running   0          6d23h

NAME                                                    TYPE           CLUSTER-IP       EXTERNAL-IP     PORT(S)                      AGE
service/apples-kafka-cluster-kafka-0                    LoadBalancer   10.100.200.90    10.195.93.200   9094:30362/TCP               6d21h
service/apples-kafka-cluster-kafka-1                    LoadBalancer   10.100.200.179   10.195.93.197   9094:32022/TCP               6d21h
service/apples-kafka-cluster-kafka-2                    LoadBalancer   10.100.200.155   10.195.93.201   9094:32277/TCP               6d21h
service/apples-kafka-cluster-kafka-bootstrap            ClusterIP      10.100.200.77    <none>          9091/TCP,9092/TCP,9093/TCP   6d21h
service/apples-kafka-cluster-kafka-brokers              ClusterIP      None             <none>          9091/TCP,9092/TCP,9093/TCP   6d21h
service/apples-kafka-cluster-kafka-external-bootstrap   LoadBalancer   10.100.200.58    10.195.93.196   9094:30735/TCP               6d21h
service/apples-kafka-cluster-zookeeper-client           ClusterIP      10.100.200.22    <none>          2181/TCP                     6d21h
service/apples-kafka-cluster-zookeeper-nodes            ClusterIP      None             <none>          2181/TCP,2888/TCP,3888/TCP   6d21h

NAME                                                   READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/apples-kafka-cluster-entity-operator   1/1     1            1           6d21h
deployment.apps/strimzi-cluster-operator               1/1     1            1           6d23h

NAME                                                              DESIRED   CURRENT   READY   AGE
replicaset.apps/apples-kafka-cluster-entity-operator-58685b8fbd   1         1         1       6d21h
replicaset.apps/strimzi-cluster-operator-6c9d899778               1         1         1       6d23h

NAME                                              READY   AGE
statefulset.apps/apples-kafka-cluster-kafka       3/3     6d21h
statefulset.apps/apples-kafka-cluster-zookeeper   3/3     6d21h

7. Our entry point into the cluster is a service of type LoadBalancer, which we asked for in our Kafka cluster YAML config. To find its IP address we can run a command as follows, using the cluster name from above.

$ kubectl get service -n kafka apples-kafka-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'
10.195.93.196

Note: Make a note of this IP address as we will need it shortly.

8. Let's create a Kafka topic using YAML as follows. In this YAML we set the namespace "kafka" explicitly in the metadata.

create-kafka-topic.yaml

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: apples-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: apples-kafka-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
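
The two values under config are easier to read once converted to friendlier units. This is a small sketch with a hypothetical helper class (not part of Kafka or Strimzi) that only does the arithmetic: retention.ms of 7200000 works out to 2 hours, and segment.bytes of 1073741824 is 1 GiB.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only.
final class TopicConfigUnits {

    // Converts a retention.ms value to whole hours.
    static long retentionHours(long retentionMs) {
        return Duration.ofMillis(retentionMs).toHours();
    }

    // Converts a segment.bytes value to GiB (2^30 bytes).
    static double segmentGiB(long segmentBytes) {
        return segmentBytes / (1024.0 * 1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        System.out.println(retentionHours(7_200_000L));   // prints 2
        System.out.println(segmentGiB(1_073_741_824L));   // prints 1.0
    }
}
```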


9. Create a Kafka topic as shown below.

$ kubectl apply -f create-kafka-topic.yaml

10. We can view the Kafka topics as shown below.
  
$ kubectl get KafkaTopic -n kafka
NAME                                                          PARTITIONS   REPLICATION FACTOR
apples-topic                                                  1            1

11. At this point we are ready to send some messages to our topic "apples-topic" and consume them. To do that we are going to use two Spring Boot applications, a producer and a consumer, which exist on GitHub.


Download or clone those onto your file system. 

12. With both downloaded, you will need to set spring.kafka.bootstrap-servers to the IP address we retrieved in #7 above. That needs to be done in both of the downloaded/cloned repos. The file we need to edit in each repo is as follows.

File: src/main/resources/application.yml 

Example:

spring:
  kafka:
    bootstrap-servers: IP-ADDRESS:9094

Note: Make sure you do this for both downloaded repo application.yml files
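
Before starting the applications it can help to confirm that the address you put in application.yml is reachable from your machine. The following is a rough sketch using only the JDK; the class BootstrapCheck and its methods are hypothetical and not part of either demo repo. It splits a host:port string and attempts a plain TCP connection with a timeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, for illustration only; not part of the demo repos.
final class BootstrapCheck {

    // Splits a "host:port" string such as "10.195.93.196:9094".
    static InetSocketAddress parse(String bootstrapServer) {
        int idx = bootstrapServer.lastIndexOf(':');
        if (idx <= 0 || idx == bootstrapServer.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + bootstrapServer);
        }
        String host = bootstrapServer.substring(0, idx);
        int port = Integer.parseInt(bootstrapServer.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection can be opened within the timeout.
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(
                new InetSocketAddress(address.getHostString(), address.getPort()),
                timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass your own address as the first argument; the default is the
        // external bootstrap IP from step 7 of this post.
        String server = args.length > 0 ? args[0] : "10.195.93.196:9094";
        InetSocketAddress address = parse(server);
        System.out.println(server + " reachable: " + isReachable(address, 2000));
    }
}
```

A successful connection only shows that the load balancer port is open. It does not prove that a Kafka client can complete its handshake with the brokers.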

13. Now let's run the producer and consumer Spring Boot applications using the following command in separate terminal windows. The producer uses port 8080 and the consumer uses port 8081.

$ ./mvnw spring-boot:run

Producer:

papicella@papicella:~/pivotal/DemoProjects/spring-starter/pivotal/KAFKA/demo-kafka-producer$ ./mvnw spring-boot:run

...
2020-08-03 11:41:46.742  INFO 34025 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-03 11:41:46.754  INFO 34025 --- [           main] a.a.t.k.DemoKafkaProducerApplication     : Started DemoKafkaProducerApplication in 1.775 seconds (JVM running for 2.102)

Consumer:

papicella@papicella:~/pivotal/DemoProjects/spring-starter/pivotal/KAFKA/demo-kafka-consumer$ ./mvnw spring-boot:run

...
2020-08-03 11:43:53.423  INFO 34056 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2020-08-03 11:43:53.440  INFO 34056 --- [           main] a.a.t.k.DemoKafkaConsumerApplication     : Started DemoKafkaConsumerApplication in 1.666 seconds (JVM running for 1.936)

14. Start by opening up the Producer UI by navigating to http://localhost:8080/



15. Don't add any messages yet. First, open up the Consumer UI by navigating to http://localhost:8081/



Note: This application will automatically refresh the page every 2 seconds to show which messages have been sent to the Kafka Topic

16. Return to the Producer UI http://localhost:8080/ and add two messages using whatever text you like as shown below.


17. Return to the Consumer UI http://localhost:8081/ to verify that the two messages sent to the Kafka topic have been consumed.



18. Both of these Spring Boot applications use "Spring for Apache Kafka".


Both Spring Boot applications use an application.yml to bootstrap access to the Kafka cluster.

The Producer Spring Boot application uses a KafkaTemplate to send messages to our Kafka topic as shown below.
  
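import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@Controller
@Slf4j
public class TopicMessageController {

    // Topic created earlier with create-kafka-topic.yaml
    private final String topicName = "apples-topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public TopicMessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Render the form; "N" means no message has been sent yet
    @GetMapping("/")
    public String indexPage(Model model) {
        model.addAttribute("topicMessageAddSuccess", "N");
        return "home";
    }

    // Send the submitted text to the topic, then re-render the form
    @PostMapping("/addentry")
    public String addNewTopicMessage(@RequestParam(value = "message") String message, Model model) {

        kafkaTemplate.send(topicName, message);

        log.info("Sent single message: {}", message);
        model.addAttribute("message", message);
        model.addAttribute("topicMessageAddSuccess", "Y");

        return "home";
    }
}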

The Consumer Spring Boot application is configured with a KafkaListener as shown below.
  
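import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;

@Controller
@Slf4j
public class TopicConsumerController {

    // The listener thread writes while web request threads read,
    // so a thread-safe list is used here
    private static final List<String> topicMessages = new CopyOnWriteArrayList<>();

    // Show every message received so far
    @GetMapping("/")
    public String indexPage(Model model) {
        model.addAttribute("topicMessages", topicMessages);
        model.addAttribute("topicMessagesCount", topicMessages.size());

        return "home";
    }

    // Called for each record that arrives on the topic
    @KafkaListener(topics = "apples-topic")
    public void listen(String message) {
        log.info("Received Message: {}", message);
        topicMessages.add(message);
    }
}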

In this post we did not set up any client authentication against the cluster for the producer or consumer, given this was just a demo.





More Information

Spring for Apache Kafka

CNCF Sandbox projects

Strimzi
