Apache Camel K is a lightweight integration framework built from Apache Camel that runs natively on Kubernetes and is specifically designed for serverless and microservice architectures. Often referred to as the Swiss army knife of integration, it can be used to easily integrate many heterogeneous systems and applications, allowing them to share data seamlessly. Camel K users can instantly run integration code written in the Camel DSL on their preferred cloud platform (Kubernetes or OpenShift). To learn more about Camel K and Camel, visit the official Camel site.
Kafka can be easily integrated with the rest of your applications using Camel K. Kafka brokers support client authentication using SASL (Simple Authentication and Security Layer). SASL decouples authentication mechanisms from application protocols, which lets us use any of the authentication mechanisms SASL supports. We will use the SASL-supported OAUTHBEARER mechanism for authentication, while SSL handles the data encryption.
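Under the hood, OAUTHBEARER authentication means the client first obtains a token from an OAuth token endpoint using its service account credentials (a client-credentials grant), then presents that token to the broker. The Strimzi login callback handler we configure later does this for us, but the sketch below shows what such a token request looks like using only the JDK's HTTP client. The endpoint URL, client ID, and secret are placeholder values, and the request is only constructed here, not sent:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class TokenRequestSketch {
    // Placeholder values: substitute your real token endpoint and service account credentials.
    static final String TOKEN_ENDPOINT = "https://sso.example.com/token";
    static final String CLIENT_ID = "my-service-account-id";
    static final String CLIENT_SECRET = "my-service-account-secret";

    // Form-encoded body of an OAuth 2.0 client-credentials grant.
    static String formBody() {
        return "grant_type=client_credentials"
                + "&client_id=" + URLEncoder.encode(CLIENT_ID, StandardCharsets.UTF_8)
                + "&client_secret=" + URLEncoder.encode(CLIENT_SECRET, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Built but not sent: sending it would return a JSON payload containing access_token.
        HttpRequest request = HttpRequest.newBuilder(URI.create(TOKEN_ENDPOINT))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody()))
                .build();
        System.out.println(request.method() + " " + request.uri());
        System.out.println(formBody());
    }
}
```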
Getting started
This demonstration requires access to a Kubernetes or OpenShift cluster.
Installing Camel K and required CLI tools
- **Camel K**: follow the Camel K installation guide for your specific cluster
- **kamel**: the Apache Camel K CLI tool. Can be downloaded from the Camel K releases page
- **kubectl**: the default Kubernetes CLI tool. Can be downloaded from the Kubernetes releases page
Setting up the Kafka instance
Using OpenShift Streams for Apache Kafka, we can easily create a Kafka instance, a service account, and a Kafka topic. Select the SASL OAUTHBEARER authentication method. For this demonstration, we will name the Kafka topic "test". Save the Kafka broker URL, the service account ID, the service account secret, and the token endpoint URL; we will use them in a configuration file.
Using Kafka in a Camel K integration
We will create the following files:

- `application.properties`: configuration file, holds the configuration properties
- `SaslSSLKafkaProducer.java`: producer, contains the integration code that produces to the Kafka topic
- `SaslSSLKafkaConsumer.java`: consumer, contains the integration code that consumes from the Kafka topic
Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration, so we specify the JAAS configuration in the `application.properties` config file. We also have to specify a login callback handler that will retrieve the OAuth bearer token.
application.properties file:
```properties
# Kafka config
camel.component.kafka.brokers = <YOUR KAFKA BROKER URL>
camel.component.kafka.security-protocol = SASL_SSL
camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id='<YOUR SERVICE ACCOUNT ID>' \
  oauth.client.secret='<YOUR SERVICE ACCOUNT SECRET>' \
  oauth.token.endpoint.uri="<TOKEN ENDPOINT URL>" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
consumer.topic=test
producer.topic=test
```
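Note that the trailing backslashes fold the `sasl-jaas-config` value into a single line, exactly as `java.util.Properties` reads it. A quick way to sanity-check the file format is to load it with the JDK and inspect the resolved values; the inline string below is a shortened stand-in for the real `application.properties`:

```java
import java.io.StringReader;
import java.util.Properties;

public class ConfigCheck {
    // Shortened stand-in for application.properties; note the backslash continuations.
    static final String SAMPLE =
            "camel.component.kafka.security-protocol = SASL_SSL\n"
          + "camel.component.kafka.sasl-mechanism = OAUTHBEARER\n"
          + "camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \\\n"
          + "  oauth.client.id='my-id' \\\n"
          + "  oauth.client.secret='my-secret' ;\n"
          + "producer.topic=test\n";

    static Properties load() throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(SAMPLE));
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = load();
        // The three continued lines collapse into one sasl-jaas-config value.
        System.out.println(props.getProperty("camel.component.kafka.sasl-jaas-config"));
        System.out.println(props.getProperty("producer.topic"));
    }
}
```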
In the integration files, we will specify `kafka-oauth-client` as a Maven dependency, because it provides the login callback handler class referenced in the configuration file.
SaslSSLKafkaProducer.java:
```java
// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0

import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaProducer extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        log.info("About to start route: Timer -> Kafka ");
        from("timer:foo")
            .routeId("FromTimer2Kafka")
            .setBody().simple("Message #${exchangeProperty.CamelTimerCounter}")
            .to("kafka:{{producer.topic}}")
            .log("Message correctly sent to the topic!");
    }
}
```
SaslSSLKafkaConsumer.java:
```java
// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.10.0

import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaConsumer extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        log.info("About to start route: Kafka -> Log ");
        from("kafka:{{consumer.topic}}")
            .routeId("FromKafka2Log")
            .log("${body}");
    }
}
```
Bundle configuration properties into a secret:
```shell
kubectl create secret generic kafka-props --from-file application.properties
```
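Kubernetes stores each key of a generic secret base64-encoded; when Camel K mounts the secret into the integration pod, the decoded `application.properties` content becomes available to the routes. A small stdlib illustration of that encoding round-trip (the sample content is a hypothetical two-line properties file):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SecretEncoding {
    // Encode file content the way Kubernetes stores it in the secret's data map.
    static String encode(String content) {
        return Base64.getEncoder().encodeToString(content.getBytes(StandardCharsets.UTF_8));
    }

    // Decode it back, as happens when the secret is mounted into the pod.
    static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String properties = "producer.topic=test\nconsumer.topic=test\n";
        String stored = encode(properties);
        System.out.println(stored);
        System.out.println(decode(stored).equals(properties)); // round-trip is lossless
    }
}
```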
Producing to the Kafka Topic
```shell
kamel run --config secret:kafka-props SaslSSLKafkaProducer.java --dev
...
[2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
```
Consuming from the Kafka Topic
```shell
kamel run --config secret:kafka-props SaslSSLKafkaConsumer.java --dev
...
[1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
[1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
[1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
```
That's it! Hopefully you find this useful, and if you have any questions or comments, feel free to post them in the comments section. Happy coding!