How to Connect to an Apache Kafka Topic

Document created by harvey_melfi on Aug 30, 2016

This article describes how to send and receive messages to and from an Apache Kafka messaging cluster via custom scripting. It also includes a basic overview of downloading and running a Kafka server on your local PC for evaluation purposes.

Use Case

You want to send and receive messages to and from an Apache Kafka topic. Apache Kafka is a free, open-source messaging platform that is increasingly popular for Internet of Things scenarios. It is a high-performance, high-throughput, clustered messaging broker.

 

For more information on Kafka and some of its uses, see the Apache Kafka project site at http://kafka.apache.org/.

 

Approach

  1. Install a Kafka server instance locally for evaluation purposes.
  2. Run the Kafka server and create a new topic.
  3. Configure the local Atom with the Kafka client libraries.
  4. Create an AtomSphere integration process to publish messages to the Kafka topic via Groovy custom scripting.
  5. Create an AtomSphere integration process to consume messages from the Kafka topic via Groovy custom scripting.

 

Implementation

 

1. Download and Run Kafka

The instructions below are a quick summary. For complete information about Kafka and detailed instructions, see the Apache Kafka documentation at http://kafka.apache.org/documentation.html.

 

  1. Download the latest version of Kafka from Apache: http://kafka.apache.org/downloads.html
  2. Extract the gzipped archive to a directory. In the examples below, the contents were extracted to C:\Kafka.

 

Start the single-node ZooKeeper instance

Apache ZooKeeper manages configuration and coordination information for the Kafka cluster.

 

In a command prompt:

cd C:\Kafka\kafka_2.11-0.10.0.0\bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

 

Start the Kafka server

In a command prompt:

cd C:\Kafka\kafka_2.11-0.10.0.0\bin\windows

kafka-server-start.bat ../../config/server.properties

 

Create a new topic

Create a topic named "BoomiTopic" with a single partition and one replica.

 

In a command prompt:

cd C:\Kafka\kafka_2.11-0.10.0.0\bin\windows

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BoomiTopic
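
Optionally, verify the topic's settings with the describe command:

kafka-topics.bat --describe --zookeeper localhost:2181 --topic BoomiTopic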

 

List the topics

In a command prompt:

kafka-topics.bat --list --zookeeper localhost:2181
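
If the topic was created successfully, the output includes its name:

BoomiTopic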

 

Send messages to the topic

In a command prompt:

kafka-console-producer.bat --broker-list localhost:9092 --topic BoomiTopic

 

Then type a message and press Enter to publish that line of text as a message. When you are done, press Ctrl-C to end the producer command.
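
For example, typing the following two lines publishes two messages:

Hello Kafka
Hello Boomi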

 

Consume messages from the BoomiTopic topic

In a command prompt:

kafka-console-consumer.bat --zookeeper localhost:2181 --topic BoomiTopic --from-beginning
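
Because of the --from-beginning flag, the consumer replays every message on the topic, including any messages you typed into the producer console above. Press Ctrl-C to stop the consumer.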

 

Stop the Kafka server

In a command prompt:

kafka-server-stop.bat

then

zookeeper-server-stop.bat

 

2. Configure the Local Atom

To make the Kafka client classes available to custom scripts on the Atom:

  1. Copy the Kafka client library, C:\Kafka\kafka_2.11-0.10.0.0\libs\kafka-clients-0.10.0.0.jar, into the Atom's <atom root>\userlib\script directory, creating the directory if it does not already exist.
  2. Restart the Atom.
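
Note: the kafka-clients library logs through SLF4J, so depending on your Atom's runtime you may also need to copy the slf4j-api jar from the Kafka libs directory into userlib\script.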

 

3. Create the Integration Process to Publish Messages to a Kafka Topic

Create a simple integration process with a No Data start shape, a Message shape with static test data, a Data Process shape, and finally a Stop shape.

In the Data Process shape, add a Custom Scripting step with the following code. The script connects to the specified topic and publishes each incoming document as a Kafka message.

 

import java.util.Properties;
import java.io.*;

import org.apache.kafka.clients.producer.*;

// Configure Kafka settings
String TOPIC_NAME = "BoomiTopic";
String SERVER_HOST = "localhost:9092";

// Initialize the Kafka producer
Properties props = new Properties();
props.put("bootstrap.servers", SERVER_HOST);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);

// Loop through the incoming documents and send each one to the Kafka topic
for (int i = 0; i < dataContext.getDataCount(); i++) {
  InputStream is = dataContext.getStream(i);
  Properties docProps = dataContext.getProperties(i); // document properties, available if needed

  // Read the full document into a string
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  byte[] buffer = new byte[1024];
  int length;
  while ((length = is.read(buffer)) != -1) {
    baos.write(buffer, 0, length);
  }
  is.close();

  String docString = baos.toString("UTF-8");

  // Publish the document contents as the message value
  producer.send(new ProducerRecord<String, String>(TOPIC_NAME, docString));
}

// close() flushes any buffered records before shutting down
producer.close();

// Return a simple response document to the process
String responseDocument = "Success";
dataContext.storeStream(new ByteArrayInputStream(responseDocument.getBytes("UTF-8")), new Properties());
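
If you need related messages to always land on the same partition (to preserve ordering), publish with a message key; Kafka hashes the key to choose the partition. A minimal variation of the send call above, where orderId is a hypothetical key value extracted from the document:

// orderId is a hypothetical key extracted from the document data
producer.send(new ProducerRecord<String, String>(TOPIC_NAME, orderId, docString));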

 

4. Create the Integration Process to Consume Messages From a Kafka Topic

Create a simple integration process with a No Data start shape, a Data Process shape, and finally a Stop shape.

In the Data Process shape, add a Custom Scripting step with the following code. The script connects to the specified topic, retrieves the available messages, and outputs a document for each.

 

import java.util.*;
import java.io.*;

import org.apache.kafka.clients.consumer.*;

// Configure Kafka settings
String TOPIC_NAME = "BoomiTopic";
String SERVER_HOST = "localhost:9092";
String GROUP_ID = "Boomi";

// Initialize the Kafka consumer
// (producer-only settings such as acks, retries, and batch.size are not needed here)
Properties props = new Properties();
props.put("bootstrap.servers", SERVER_HOST);
props.put("group.id", GROUP_ID);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

// Subscribe to the topic and poll once for available records (waits up to 1 second)
consumer.subscribe(Arrays.asList(TOPIC_NAME));
ConsumerRecords<String, String> records = consumer.poll(1000);

// Output each record as a separate document for the next process step
for (ConsumerRecord<String, String> record : records) {
  dataContext.storeStream(new ByteArrayInputStream(record.value().getBytes("UTF-8")), new Properties());
}

consumer.close();
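
A single poll() call is not guaranteed to return every message on the topic. If the topic may hold a large backlog, a minimal variation (same assumptions as the script above) is to poll in a loop until no records are returned:

// Keep polling until a poll comes back empty so a large backlog is fully drained
ConsumerRecords<String, String> records = consumer.poll(1000);
while (!records.isEmpty()) {
  for (ConsumerRecord<String, String> record : records) {
    dataContext.storeStream(new ByteArrayInputStream(record.value().getBytes("UTF-8")), new Properties());
  }
  records = consumer.poll(1000);
}
consumer.close();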

 

Considerations

  • Kafka's throughput is impressive: published Kafka benchmarks report roughly 50 MB/sec for producers and 100 MB/sec for consumers. See the Kafka performance page for details.
