
Enabling Kafka Streams in Mule 4 applications

Apache Kafka is used for a variety of real-time, event-based use cases, such as tracking website activity, managing operational metrics, aggregating logs from different sources, and processing streaming data.

MuleSoft's Anypoint Platform provides an out-of-the-box Kafka connector for streaming messages in and out of a Kafka broker, allowing users both to ingest real-time data from Apache Kafka and to publish data to it.

Problem Statement

MuleSoft's Kafka connector provides rich functionality for consuming from and publishing to Kafka topics. However, in use cases that require stateful operations such as joins, reduces, and aggregations, the logic inside the Mule consumer application becomes highly complex.

Solution

This complexity can be resolved by using the Kafka Streams API, a client library that can be embedded in the Mule runtime.

The Kafka Streams API is a simple, lightweight client library that can be easily embedded in any Java application (including the Mule runtime). It provides a robust stream-processing framework covering both stateless and stateful operations (joins, aggregations, etc.).

Implementation

Below are the steps required to configure a Mule 4 application to use the Kafka Streams API:

  • Create an empty Mule 4 application and add the Kafka Streams dependency to its pom.xml, as in the snippet below.
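A sketch of the dependency entry; the version shown here is an example and should be aligned with your Kafka broker:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.7.0</version>
    </dependency>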
  • Create a Java class that implements org.springframework.beans.factory.InitializingBean.
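A minimal skeleton, with placeholder class and package names; Spring invokes afterPropertiesSet() once the bean has been created, which is where the streams topology is built in the next step:

    package com.example.streams; // placeholder package

    import org.springframework.beans.factory.InitializingBean;

    public class KafkaStreamsStarter implements InitializingBean {

        @Override
        public void afterPropertiesSet() throws Exception {
            // Build and start the Kafka Streams topology here (next step).
        }
    }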
  • The Java class should use the Kafka Streams DSL to consume data from a Kafka topic. Below is a sketch of the class performing a simple reduce operation, which is inherently stateful. There can be more complex cases, such as joins across multiple Kafka topics, but for now we have kept it to a simple reduce use case.
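A sketch of the complete class, filling in afterPropertiesSet() from the skeleton above. The topic names (payments-in, payments-out), application id, and broker address are placeholders, and the amounts are assumed to arrive as string-encoded numbers keyed by account id:

    package com.example.streams; // placeholder package

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.springframework.beans.factory.InitializingBean;

    public class KafkaStreamsStarter implements InitializingBean {

        @Override
        public void afterPropertiesSet() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mule-kafka-streams-reduce");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();

            // Payments arrive keyed by account id, with the amount as the message value.
            KStream<String, String> payments = builder.stream("payments-in");

            // Stateful reduce: maintain a running total of the amounts per account id.
            KTable<String, String> totals = payments
                    .groupByKey()
                    .reduce((total, amount) ->
                            String.valueOf(Long.parseLong(total) + Long.parseLong(amount)));

            // Emit every updated total to the output topic.
            totals.toStream().to("payments-out");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Close the streams client cleanly when the JVM shuts down.
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }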
  • Create a Spring bean configuration file under src/main/resources. In this case we have named it beans.xml.
  • Add the startup listener bean to beans.xml, referring to the Java class created above.
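A sketch of beans.xml, assuming the class and package names from the example above:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd">

        <!-- Instantiated at startup; Spring then calls afterPropertiesSet(),
             which builds and starts the Kafka Streams topology. -->
        <bean id="kafkaStreamsStarter" class="com.example.streams.KafkaStreamsStarter"/>

    </beans>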
  • Finally, add a Spring config entry to the Mule configuration file and point it at the beans.xml created above.
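With Mule 4's Spring module added to the project, the Mule configuration file can load the bean file as follows (a sketch; the config name is arbitrary):

    <mule xmlns="http://www.mulesoft.org/schema/mule/core"
          xmlns:spring="http://www.mulesoft.org/schema/mule/spring"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="
              http://www.mulesoft.org/schema/mule/core
              http://www.mulesoft.org/schema/mule/core/current/mule.xsd
              http://www.mulesoft.org/schema/mule/spring
              http://www.mulesoft.org/schema/mule/spring/current/mule-spring.xsd">

        <!-- Load the Spring beans defined in src/main/resources/beans.xml -->
        <spring:config name="springConfig" files="beans.xml"/>

    </mule>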
  • The Mule application is now ready to be started/deployed.
  • When the Mule application starts, it initializes the Kafka Streams Java class and processes the stream according to the reduce logic defined in that class.
    The Mule application can also be deployed to CloudHub or Runtime Fabric (RTF); we just need to ensure that the CloudHub or Runtime Fabric instances have network connectivity to the Kafka broker.
  • For simplicity, the Mule application is deployed from Anypoint Studio, and we then test it by sending some messages to the input Kafka topic.
    Since we have implemented a reduce operation, the streams application should output the running sum of the amounts for a particular key; a small test producer is sketched after this list.
  • Sending the first payment object to the input Kafka topic for account id ‘xyz’ with an amount of 100, the Mule application outputs a reduced total of 100.
  • Sending a second payment object to the input topic for the same account id ‘xyz’ with an amount of 200, the output should now be 300, as it is a reduce operation.
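To reproduce the test, a minimal Java producer (a sketch, reusing the placeholder topic and broker address from above) can send the two payment messages:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class PaymentTestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // First payment: the reduce emits a total of 100 for key "xyz".
                producer.send(new ProducerRecord<>("payments-in", "xyz", "100"));
                // Second payment for the same key: the reduce now emits 300.
                producer.send(new ProducerRecord<>("payments-in", "xyz", "200"));
            }
        }
    }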

To find out more about the Kafka Streams API and how you can connect MuleSoft and Apache Kafka, give us a call or email us at Salesforce@coforge.com.

Other useful links:

Our MuleSoft Services

MuleSoft Connect 2021: Key Takeaways

Confluent and Apache Kafka support
