Strimzi, a CNCF sandbox project, provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations. In this post we will take a look at how to get this running on VMware Tanzu Kubernetes Grid Integrated Edition (TKGI) and consume the Kafka cluster from a Spring Boot application. I am using TKGI, but a K8s cluster is all you need to follow along, so any cluster you have access to, such as GKE, AKS, or EKS, will work.
1. Installing Strimzi is pretty straightforward, so we can do that as follows. I am using the namespace "kafka", which needs to be created prior to running this command.

$ kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
2. Verify that the operator was installed correctly and that we have a running pod, as shown below.

$ kubectl get pods -n kafka
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-6c9d899778-4mdtg   1/1     Running   0          6d22h
3. Next, let's ensure we have a default storage class for the cluster, as shown below.

$ kubectl get storageclass
NAME             PROVISIONER                    AGE
fast (default)   kubernetes.io/vsphere-volume   47d
4. Now at this point we are ready to create a Kafka cluster. For this example we will create a 3-node cluster defined in YAML as follows.

kafka-persistent-MULTI_NODE.yaml

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: apples-kafka-cluster
spec:
  kafka:
    version: 2.5.0
    replicas: 3
    listeners:
      external:
        type: loadbalancer
        tls: false
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.5"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
A few things to note:
- We have enabled external access to the cluster using a listener of type loadbalancer, which means your K8s cluster needs to support that service type
- The brokers use dynamically provisioned persistent volume claims, so ensure the default storage class from #3 above is in place
- We have disabled TLS given this is a demo
5. Create the Kafka cluster as shown below, ensuring we target the namespace "kafka".

$ kubectl apply -f kafka-persistent-MULTI_NODE.yaml -n kafka
6. Now we can view the status/creation of our cluster in one of two ways, as shown below. You will need to wait a few minutes for everything to start up.

Option 1:

$ kubectl get Kafka -n kafka
NAME                   DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
apples-kafka-cluster   3                        3

Option 2:

$ kubectl get all -n kafka
NAME                                                        READY   STATUS    RESTARTS   AGE
pod/apples-kafka-cluster-entity-operator-58685b8fbd-r4wxc   3/3     Running   0          6d21h
pod/apples-kafka-cluster-kafka-0                            2/2     Running   0          6d21h
pod/apples-kafka-cluster-kafka-1                            2/2     Running   0          6d21h
pod/apples-kafka-cluster-kafka-2                            2/2     Running   0          6d21h
pod/apples-kafka-cluster-zookeeper-0                        1/1     Running   0          6d21h
pod/apples-kafka-cluster-zookeeper-1                        1/1     Running   0          6d21h
pod/apples-kafka-cluster-zookeeper-2                        1/1     Running   0          6d21h
pod/strimzi-cluster-operator-6c9d899778-4mdtg               1/1     Running   0          6d23h

NAME                                                    TYPE           CLUSTER-IP       EXTERNAL-IP     PORT(S)                      AGE
service/apples-kafka-cluster-kafka-0                    LoadBalancer   10.100.200.90    10.195.93.200   9094:30362/TCP               6d21h
service/apples-kafka-cluster-kafka-1                    LoadBalancer   10.100.200.179   10.195.93.197   9094:32022/TCP               6d21h
service/apples-kafka-cluster-kafka-2                    LoadBalancer   10.100.200.155   10.195.93.201   9094:32277/TCP               6d21h
service/apples-kafka-cluster-kafka-bootstrap            ClusterIP      10.100.200.77    <none>          9091/TCP,9092/TCP,9093/TCP   6d21h
service/apples-kafka-cluster-kafka-brokers              ClusterIP      None             <none>          9091/TCP,9092/TCP,9093/TCP   6d21h
service/apples-kafka-cluster-kafka-external-bootstrap   LoadBalancer   10.100.200.58    10.195.93.196   9094:30735/TCP               6d21h
service/apples-kafka-cluster-zookeeper-client           ClusterIP      10.100.200.22    <none>          2181/TCP                     6d21h
service/apples-kafka-cluster-zookeeper-nodes            ClusterIP      None             <none>          2181/TCP,2888/TCP,3888/TCP   6d21h

NAME                                                   READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/apples-kafka-cluster-entity-operator   1/1     1            1           6d21h
deployment.apps/strimzi-cluster-operator               1/1     1            1           6d23h

NAME                                                              DESIRED   CURRENT   READY   AGE
replicaset.apps/apples-kafka-cluster-entity-operator-58685b8fbd   1         1         1       6d21h
replicaset.apps/strimzi-cluster-operator-6c9d899778               1         1         1       6d23h

NAME                                              READY   AGE
statefulset.apps/apples-kafka-cluster-kafka       3/3     6d21h
statefulset.apps/apples-kafka-cluster-zookeeper   3/3     6d21h
7. Our entry point into the cluster is the external bootstrap service of type LoadBalancer, which we asked for in our Kafka cluster YAML config. To find its IP address, we can run a command as follows using the cluster name from above.

$ kubectl get service -n kafka apples-kafka-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'
10.195.93.196

Note: Make a note of this IP address as we will need it shortly.
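To sanity-check the external listener outside of Spring, a plain Kafka client would point its bootstrap.servers at that LoadBalancer IP on port 9094, the port exposed by the external listener in our cluster YAML. Below is a minimal sketch of building such client properties in plain Java; the helper name is my own, and the IP is the example address from above.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical helper: builds client properties for Strimzi's external
    // listener. Port 9094 is the external listener port from the cluster YAML.
    static Properties externalClientProps(String loadBalancerIp) {
        Properties props = new Properties();
        props.put("bootstrap.servers", loadBalancerIp + ":9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Example IP is the external bootstrap address retrieved in step 7.
        Properties props = externalClientProps("10.195.93.196");
        System.out.println(props.getProperty("bootstrap.servers"));
        // prints: 10.195.93.196:9094
    }
}
```

These are the same properties that Spring Boot derives for us from application.yml later in this post; this sketch just makes the wiring explicit.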
8. Let's create a Kafka topic using YAML as follows. In this YAML we explicitly set the namespace "kafka" in the metadata.

create-kafka-topic.yaml

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: apples-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: apples-kafka-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
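As a quick sanity check on the two config values above: retention.ms of 7200000 works out to two hours of message retention, and segment.bytes of 1073741824 is exactly 1 GiB per log segment. The arithmetic, using the numbers from the YAML:

```java
public class TopicConfigCheck {
    public static void main(String[] args) {
        long retentionMs = 7_200_000L;       // retention.ms from the topic YAML
        long segmentBytes = 1_073_741_824L;  // segment.bytes from the topic YAML

        // Convert milliseconds to hours and bytes to GiB.
        System.out.println(retentionMs / (60 * 60 * 1000) + " hours");      // prints: 2 hours
        System.out.println(segmentBytes / (1024 * 1024 * 1024) + " GiB");   // prints: 1 GiB
    }
}
```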
9. Create a Kafka topic as shown below.
$ kubectl apply -f create-kafka-topic.yaml
10. We can view the Kafka topics as shown below.

$ kubectl get KafkaTopic -n kafka
NAME           PARTITIONS   REPLICATION FACTOR
apples-topic   1            1
11. At this point we are ready to send some messages to our topic "apples-topic", as well as consume them. To do that we are going to use two Spring Boot applications that exist on GitHub. Download or clone those onto your file system.
12. With both downloaded, you will need to set spring.kafka.bootstrap-servers to the IP address we retrieved in #7 above. That needs to be done in both downloaded/cloned repos, in the following file.

File: src/main/resources/application.yml

Example:

spring:
  kafka:
    bootstrap-servers: IP-ADDRESS:9094

Note: Make sure you do this in the application.yml files of both downloaded repos.
13. Now let's run the producer and consumer Spring Boot applications using the following command in separate terminal windows. One will use port 8080 while the other uses port 8081.

$ ./mvnw spring-boot:run
Producer:

papicella@papicella:~/pivotal/DemoProjects/spring-starter/pivotal/KAFKA/demo-kafka-producer$ ./mvnw spring-boot:run
...
2020-08-03 11:41:46.742  INFO 34025 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-03 11:41:46.754  INFO 34025 --- [           main] a.a.t.k.DemoKafkaProducerApplication     : Started DemoKafkaProducerApplication in 1.775 seconds (JVM running for 2.102)

Consumer:

papicella@papicella:~/pivotal/DemoProjects/spring-starter/pivotal/KAFKA/demo-kafka-consumer$ ./mvnw spring-boot:run
...
2020-08-03 11:43:53.423  INFO 34056 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2020-08-03 11:43:53.440  INFO 34056 --- [           main] a.a.t.k.DemoKafkaConsumerApplication     : Started DemoKafkaConsumerApplication in 1.666 seconds (JVM running for 1.936)
14. Start by opening up the Producer UI by navigating to http://localhost:8080/
15. Without adding any messages yet, also open up the Consumer UI by navigating to http://localhost:8081/

Note: This application will automatically refresh the page every 2 seconds to show which messages have been sent to the Kafka topic.
16. Return to the Producer UI at http://localhost:8080/ and add two messages using whatever text you like.
17. Return to the Consumer UI at http://localhost:8081/ to verify that the two messages sent to the Kafka topic have been consumed.
18. Both of these Spring Boot applications use "Spring for Apache Kafka", and each uses an application.yml to bootstrap access to the Kafka cluster. The producer application uses a KafkaTemplate to send messages to our Kafka topic, as shown below.
@Controller
@Slf4j
public class TopicMessageController {

    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public TopicMessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    final private String topicName = "apples-topic";

    @GetMapping("/")
    public String indexPage(Model model) {
        model.addAttribute("topicMessageAddSuccess", "N");
        return "home";
    }

    @PostMapping("/addentry")
    public String addNewTopicMessage(@RequestParam(value = "message") String message, Model model) {
        kafkaTemplate.send(topicName, message);
        log.info("Sent single message: " + message);
        model.addAttribute("message", message);
        model.addAttribute("topicMessageAddSuccess", "Y");
        return "home";
    }
}
The consumer Spring Boot application is configured with a KafkaListener, as shown below.
@Controller
@Slf4j
public class TopicConsumerController {

    private static ArrayList<String> topicMessages = new ArrayList<String>();

    @GetMapping("/")
    public String indexPage(Model model) {
        model.addAttribute("topicMessages", topicMessages);
        model.addAttribute("topicMessagesCount", topicMessages.size());
        return "home";
    }

    @KafkaListener(topics = "apples-topic")
    public void listen(String message) {
        log.info("Received Message: " + message);
        topicMessages.add(message);
    }
}
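One caveat worth noting for anyone adapting this demo: the @KafkaListener callback runs on a listener container thread while the @GetMapping handler runs on a web request thread, so a plain ArrayList is not thread-safe here. Below is a framework-free sketch of the same store-and-snapshot pattern using a concurrent collection; the class and method names are my own, not from the repos.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the controller's message list.
public class TopicMessageStore {

    // CopyOnWriteArrayList is safe for concurrent appends from the Kafka
    // listener thread and iteration from web request threads.
    private final List<String> messages = new CopyOnWriteArrayList<>();

    public void onMessage(String message) {   // would be called from @KafkaListener
        messages.add(message);
    }

    public List<String> snapshot() {          // would back the @GetMapping view
        return List.copyOf(messages);
    }

    public int count() {
        return messages.size();
    }

    public static void main(String[] args) {
        TopicMessageStore store = new TopicMessageStore();
        store.onMessage("message 1");
        store.onMessage("message 2");
        System.out.println(store.count() + " messages: " + store.snapshot());
        // prints: 2 messages: [message 1, message 2]
    }
}
```

For a throwaway demo the ArrayList is fine; for anything longer-lived, a concurrent collection like this avoids races without changing the controller's shape.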
In this post we did not set up any client authentication against the cluster for the producer or consumer, given this was just a demo.
More Information
Spring for Apache Kafka
CNCF Sandbox projects
Strimzi