Big data messaging with Kafka, Part 2

Use Kafka's partitions, message offsets, and consumer groups to handle up to millions of messages per day

Say you're creating a new topic with three partitions. When you start the first consumer for the new topic, Kafka will assign all three partitions to that consumer. If you then start a second consumer, Kafka will reassign the partitions, giving one partition to the first consumer and the remaining two to the second. If you add a third consumer, Kafka will rebalance again so that each consumer owns exactly one partition. If you then start fourth and fifth consumers, three of the consumers will each have an assigned partition, but the other two won't receive any messages. If one of the first three consumers goes down, Kafka will use the same assignment logic to hand that consumer's partition to one of the idle consumers.

We'll use automatic assignment for the example application. Most of the consumer code will be the same as it was for the simple consumer in Part 1. The only difference is that we'll pass an instance of ConsumerRebalanceListener as a second argument to the KafkaConsumer.subscribe() method. Kafka will call this listener's methods every time it assigns partitions to, or revokes partitions from, this consumer. We'll override ConsumerRebalanceListener's onPartitionsRevoked() and onPartitionsAssigned() methods to print the list of partitions assigned to or revoked from this subscriber.

Listing 3. A partitioned consumer

   private static class ConsumerThread extends Thread {
       private String topicName;
       private String groupId;
       private KafkaConsumer<String, String> kafkaConsumer;

       public ConsumerThread(String topicName, String groupId) {
           this.topicName = topicName;
           this.groupId = groupId;
       }

       public void run() {
           Properties configProperties = new Properties();
           configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
           configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
           configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

           kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
           //Subscribe with a rebalance listener so we can watch partitions
           //being assigned and revoked as consumers join or leave the group
           kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
               public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                   System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
               }
               public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                   System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
               }
           });
           //Start processing messages
           try {
               while (true) {
                   ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                   for (ConsumerRecord<String, String> record : records)
                       System.out.println(record.value());
               }
           } catch (WakeupException ex) {
               //Thrown when another thread calls kafkaConsumer.wakeup();
               //used here as the signal to shut this consumer down cleanly
               System.out.println("Exception caught " + ex.getMessage());
           } finally {
               kafkaConsumer.close();
               System.out.println("After closing KafkaConsumer");
           }
       }

       public KafkaConsumer<String, String> getKafkaConsumer() {
           return this.kafkaConsumer;
       }
   }

Test the application

We're ready to run and test the current iteration of our producer/consumer application. As you've done previously, you can use the code in Listings 1 through 3, or download the complete source code on GitHub.

  1. Compile and create a fat JAR by invoking: mvn compile assembly:single.
  2. Create a topic named part-demo with three partitions and a replication factor of one:
    
        <KAFKA_HOME>/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo
    
  3. Start a producer:
    
                java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
    
    
  4. Start three consumers, then watch the console to see how your partitions are assigned and revoked every time you start a new instance of the consumer:
    
                java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
    
    
  5. Type some messages into your producer console and verify whether the messages are routed to the correct consumer:
    
                USA: First order
                India: First order
                USA: Second order
                France: First order
    
    

Figure 2 shows producer/consumer output in the partitioned topic.

Figure 2. Producer/consumer output
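
If you're wondering how a message like USA: First order consistently reaches the same consumer, the mechanism is the record key: Kafka's default partitioner hashes the key to choose a partition, and each partition belongs to exactly one consumer in the group. Here's a minimal sketch of a keyed send; treating the country prefix as the key is an illustrative assumption, not necessarily how the downloadable Producer class works:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

            String line = "USA: First order";
            //Everything before the colon becomes the record key; the default
            //partitioner hashes the key, so all "USA" messages land on the
            //same partition (illustrative assumption, see above)
            String key = line.substring(0, line.indexOf(':'));
            producer.send(new ProducerRecord<String, String>("part-demo", key, line));
            producer.close();
        }
    }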

Being able to partition a single topic into multiple parts is essential to Kafka's scalability. Partitioning lets you scale your messaging infrastructure horizontally while maintaining order within each partition. Next we'll look at how Kafka uses message offsets to track and manage complex messaging scenarios.

Managing message offsets

I mentioned in Part 1 that whenever a producer publishes a message, the Kafka server assigns an offset to that message. A consumer is able to control which messages it wants to consume by setting or resetting the message offset. When developing a consumer you have two options for managing the offset: automatic and manual.

Two types of offset

When you start a consumer in the Kafka client, it will read the value of your ConsumerConfig.AUTO_OFFSET_RESET_CONFIG (auto.offset.reset) configuration. If that config is set to earliest, the consumer will start with the smallest offset available for the topic. In its first request to Kafka, the consumer will effectively say: give me all the messages in this partition with an offset greater than the smallest one available. It will also specify a batch size. The Kafka server will return all the matching messages in batches of the specified size.
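
For reference, here's where that property fits into the consumer configuration from Listing 3. This is just a sketch of Listing 3's setup with one extra line; the auto.offset.reset setting is the only new piece:

    Properties configProperties = new Properties();
    configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    //Deserializer settings omitted here; same as Listing 3
    //"earliest" starts from the smallest available offset when this group
    //has no committed offset yet; the default, "latest", starts at the end
    configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");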

The consumer keeps track of the offset of the last message it has processed, so it will always request messages with an offset higher than that. This setup works while a consumer is functioning normally, but what happens if the consumer crashes, or you want to stop it for maintenance? In that case you'd want the consumer to remember the offset of the last message processed, so that it can resume with the first unprocessed message.

To make that possible, Kafka uses two types of offset. The current offset tracks the consumer's position while it is working normally. The committed offset is that same position, but persisted: the consumer sends it to the Kafka server for durable storage.

If the consumer goes down or is taken down for maintenance, it can query the Kafka server for the last committed offset and resume message consumption as if no time had been lost. For its part, the Kafka broker stores this information in a topic called __consumer_offsets. This data is replicated to multiple brokers so the offsets survive a broker failure.
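
You can also inspect a group's committed offset from the client. Here's a minimal sketch, assuming a kafkaConsumer configured as in Listing 3, partition 0 of the part-demo topic, and the usual imports (org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata):

    //Ask the broker for this group's last committed offset on one partition
    TopicPartition partition = new TopicPartition("part-demo", 0);
    OffsetAndMetadata committed = kafkaConsumer.committed(partition);
    if (committed != null) {
        System.out.println("Last committed offset: " + committed.offset());
    } else {
        System.out.println("This group has not committed an offset for this partition yet");
    }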

Committing offset data

You have a choice about how often to commit offset data. Committing frequently costs some performance, but if the consumer goes down you'll have fewer messages to reprocess. Committing less frequently improves performance, at the price of reprocessing more messages after a failure. In either case, the consumer has two options for committing the offset:

  1. Auto commits: You can set enable.auto.commit to true and set the auto.commit.interval.ms property with a value in milliseconds. Once you've enabled this, the consumer will automatically commit the offset of the last messages returned by poll(); the commit is performed during subsequent poll() calls, once the configured interval has elapsed.
  2. Manual commits: You can call commitSync() or commitAsync() on the KafkaConsumer at any time. When you issue the call, the consumer will take the offset of the last message received during a poll() and commit it to the Kafka server (see the sketch after this list).
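
Here's a minimal sketch of the manual approach, adapted from the poll loop in Listing 3; disabling auto commit and adding the commitSync() call are the only changes:

    //Disable auto commit; the rest of the configuration is as in Listing 3
    configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(record.value());
        //Commit the offsets of the messages just returned by poll().
        //commitSync() blocks until the broker acknowledges the commit;
        //commitAsync() would return immediately instead.
        if (!records.isEmpty())
            kafkaConsumer.commitSync();
    }

Committing after each poll like this trades a little throughput for at-least-once delivery with minimal reprocessing after a failure.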

Three use cases for manual offsets

Let's consider three use cases where you wouldn't want to use Kafka's default offset management infrastructure. Instead, you'll manually decide which message to start from.

  1. Start from the beginning: In this use case, you are capturing database changes in Kafka. The first record is the full record; thereafter you only get the columns whose values have changed (a delta). In this case you always need to read all the messages in a topic from the beginning, in order to reconstruct the full state of the record. To handle this scenario, you can configure the consumer to read from the beginning by calling the kafkaConsumer.seekToBeginning(topicPartition) method. Remember that by default Kafka will delete messages more than seven days old, so you need to configure log.retention.hours to a higher value for this use case.
  2. Go to the end: Now let's say you're building a stock recommendation application by analyzing trades in real time. The worst case happens and your consumer application goes down. In this case, you can use kafkaConsumer.seekToEnd(topicPartition) to skip the messages posted during the downtime. Instead, the consumer will begin processing the trades that happen from the instant it restarts.
  3. Start at a given offset: Finally, say that you just released a new version of the producer in your production environment. After watching it produce a few messages, you realize that it is generating bad messages. You fix the producer and start it again. You don't want your consumer to consume those bad messages, so you manually set the offset to the first good message produced, by calling kafkaConsumer.seek(topicPartition, startingOffset).

Manual offsets in the consumer app

The consumer code we've developed so far auto-commits offsets every five seconds (the default auto.commit.interval.ms). Now let's update the consumer to take a third argument that manually controls where offset consumption starts.

If the value of the last argument is 0, the consumer will assume you want to start from the beginning, so it will call kafkaConsumer.seekToBeginning() for each of its partitions. If you pass -1, it will assume you want to ignore the existing messages and consume only messages published after the consumer restarts, so it will call kafkaConsumer.seekToEnd() on each of its partitions. Finally, any value other than 0 or -1 is taken as the offset the consumer should start from; for example, if you pass 5, then on restart the consumer will consume messages with an offset greater than 5. For this it calls kafkaConsumer.seek(<topicPartition>, <startingOffset>). A sketch of this logic follows.
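
Because seeks only take effect on partitions the consumer actually owns, a natural place for these calls is the onPartitionsAssigned() callback from Listing 3. Here's a minimal sketch, assuming startingOffset holds the parsed third argument; note that the 0.9-era client used in this series accepts partitions as varargs in seekToBeginning() and seekToEnd(), while newer clients take a Collection:

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            if (startingOffset == 0) {
                //Replay the topic from the first available message
                kafkaConsumer.seekToBeginning(partition);
            } else if (startingOffset == -1) {
                //Skip everything published while the consumer was down
                kafkaConsumer.seekToEnd(partition);
            } else if (startingOffset > 0) {
                //Resume from an explicit, known-good offset
                kafkaConsumer.seek(partition, startingOffset);
            }
        }
    }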
