Listing 2. KafkaConsumer
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class Consumer {
    private static Scanner in;

    public static void main(String[] argv) throws Exception {
        if (argv.length != 2) {
            System.err.printf("Usage: %s <topicName> <groupId>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);
        String topicName = argv[0];
        String groupId = argv[1];

        ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
        consumerRunnable.start();
        String line = "";
        while (!line.equals("exit")) {
            line = in.next();
        }
        consumerRunnable.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerRunnable.join();
    }

    private static class ConsumerThread extends Thread {
        private String topicName;
        private String groupId;
        private KafkaConsumer<String, String> kafkaConsumer;

        public ConsumerThread(String topicName, String groupId) {
            this.topicName = topicName;
            this.groupId = groupId;
        }

        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

            // Create the consumer and subscribe to the topic
            kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName));

            // Start processing messages
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.println(record.value());
                }
            } catch (WakeupException ex) {
                // Thrown when wakeup() is called from the main thread; used to exit the poll loop
                System.out.println("Exception caught " + ex.getMessage());
            } finally {
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }

        public KafkaConsumer<String, String> getKafkaConsumer() {
            return this.kafkaConsumer;
        }
    }
}
Consumer and ConsumerThread
Writing the consumer code in Listing 2 in two parts ensures that we close the KafkaConsumer object cleanly before exiting. I'll describe each class in turn. First, ConsumerThread is an inner class that takes a topic name and a group ID as its constructor arguments. In its run() method it creates a KafkaConsumer object with the appropriate properties, then subscribes to the topic that was passed to the constructor by calling kafkaConsumer.subscribe(). It then polls the Kafka broker in a loop, waiting up to 100 milliseconds on each poll() call for new messages, iterates through the batch of records returned, and prints each message's value to the console. Each record also carries useful metadata, as shown in the sketch below.
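For reference, each ConsumerRecord exposes more than just the message value. A minimal variation of the poll loop (assuming the same subscribed kafkaConsumer as in Listing 2) that also prints the topic, partition, offset, and key might look like this:

// Sketch: the same poll loop as Listing 2, but printing record metadata too
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
    }
}

The partition and offset fields will become important in Part 2, where we look at partitions and offset management.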
In the Consumer class we create a new ConsumerThread object and start it as a separate thread. The ConsumerThread runs an infinite loop, continually polling the topic for new messages. Meanwhile, the main thread in the Consumer class waits for the user to enter exit on the console. Once the user does, the main thread calls the KafkaConsumer.wakeup() method, which causes the KafkaConsumer to stop polling for new messages and throw a WakeupException. We can then close the KafkaConsumer gracefully by calling its close() method in the finally block.
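The same wakeup() mechanism works for shutdown signals other than console input. As a minimal sketch (not part of the sample code), a JVM shutdown hook registered in main() could call wakeup() so the consumer also closes cleanly when the process is interrupted with Ctrl-C:

// Sketch only: register a shutdown hook in main() so the consumer also
// shuts down cleanly on Ctrl-C. Assumes consumerRunnable is the started
// ConsumerThread from Listing 2 (effectively final in main()).
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        consumerRunnable.getKafkaConsumer().wakeup(); // interrupts the blocking poll()
        try {
            consumerRunnable.join(); // wait for the consumer thread to close the KafkaConsumer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});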
Run the application
To test this application you can run the code in Listings 1 and 2 from your IDE, or you can follow these steps:
- Download the sample code, KafkaAPIClient, by executing the command:
git clone https://github.com/sdpatil/KafkaAPIClient.git
- Compile the code and create a fat JAR with the command:
mvn clean compile assembly:single
- Start the consumer:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1
- Start the producer:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test
- Enter a message in the producer console and check whether it appears in the consumer. Try a few messages. (You can also verify delivery with Kafka's console consumer, as shown after this list.)
- Type exit in the consumer and producer consoles to close them.
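If a message doesn't show up, you can verify independently that it reached the topic by using the console consumer that ships with Kafka. The exact flags vary by Kafka version; this form assumes a broker at localhost:9092 and the topic name test used above:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning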
Figure 2. A Kafka producer/consumer application
Conclusion to Part 1
In the first half of this tutorial you've learned the basics of big data messaging with Apache Kafka, including a conceptual overview of Kafka, setup instructions, and how to configure a producer/consumer messaging system with Kafka.
As you've seen, Kafka's architecture is both simple and efficient, designed for performance and throughput. In Part 2 I'll introduce some more advanced techniques for distributed messaging with Kafka, starting with using partitions to subdivide topics. I'll also demonstrate how to manage message offsets in order to support different use cases.