Built for realtime: Big data messaging with Apache Kafka, Part 2

Find out for yourself how Apache Kafka's partitions, message offsets, and consumer groups handle up to millions of messages per day


Listing 4. Adding a third argument to the consumer


  private static class ConsumerThread extends Thread {
    private String topicName;
    private String groupId;
    private long startingOffset;
    private KafkaConsumer<String,String> kafkaConsumer;

    public ConsumerThread(String topicName, String groupId, long startingOffset) {
        this.topicName = topicName;
        this.groupId = groupId;
        this.startingOffset = startingOffset;
    }
    @Override
    public void run() {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
        //Disable auto-commit so this consumer controls its own position in the topic
        configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        //Figure out where to start processing messages from
        kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
        kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                while(topicPartitionIterator.hasNext()){
                    TopicPartition topicPartition = topicPartitionIterator.next();
                    System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
                    if (startingOffset == 0) {
                        System.out.println("Setting offset to beginning");
                        kafkaConsumer.seekToBeginning(topicPartition);
                    } else if (startingOffset == -1) {
                        System.out.println("Setting offset to the end");
                        kafkaConsumer.seekToEnd(topicPartition);
                    } else {
                        System.out.println("Resetting offset to " + startingOffset);
                        kafkaConsumer.seek(topicPartition, startingOffset);
                    }
                }
            }
        });
        //Start processing messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }

            }
        } catch (WakeupException ex) {
            //Thrown when another thread calls kafkaConsumer.wakeup() to stop this consumer
            System.out.println("Exception caught " + ex.getMessage());
        } finally {
            kafkaConsumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
    public KafkaConsumer<String,String> getKafkaConsumer(){
        return this.kafkaConsumer;
    }
}

Once your code is ready, you can test it by executing the following command:


        java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0
    

The Kafka client should print all of the messages from offset 0; change the value of the last argument to jump around in the message queue.
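
The driver class for this example, com.spnotes.kafka.offset.Consumer, isn't reproduced on this page. As a rough sketch, its entry point might look something like the following, assuming it parses the three arguments, starts the ConsumerThread, and uses wakeup() to shut the consumer down when the user presses Enter (the usage check and the shutdown trigger are my assumptions, not the article's exact code):


    public class Consumer {
        public static void main(String[] args) throws Exception {
            if (args.length != 3) {
                System.err.println("Usage: Consumer <topicName> <groupId> <startingOffset>");
                return;
            }
            //0 = start from the beginning, -1 = start from the end,
            //any other value = seek to that specific offset
            ConsumerThread consumerThread =
                    new ConsumerThread(args[0], args[1], Long.parseLong(args[2]));
            consumerThread.start();

            //Block until the user presses Enter, then interrupt the poll loop;
            //wakeup() makes the blocked poll() throw the WakeupException caught in run()
            System.in.read();
            consumerThread.getKafkaConsumer().wakeup();
            consumerThread.join();
        }
    }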

Consumer groups in Apache Kafka

Traditional messaging use cases can be divided into two main types: point-to-point and publish-subscribe. In a point-to-point scenario, one consumer consumes one message. When a message relays a bank transaction, only one consumer should respond by updating the bank account. In a publish-subscribe scenario, multiple consumers consume a single message but respond to it differently. When a web server goes down, you want the alert to go to consumers programmed to respond in different ways.

Queue refers to a point-to-point scenario, where a message is consumed by only one consumer. Topic refers to a publish-subscribe scenario, where a message is consumed by every consumer. Kafka doesn't define a separate API for the queue and topic use cases; instead, when you start your consumer you need to specify the ConsumerConfig.GROUP_ID_CONFIG property.

If you use the same GROUP_ID_CONFIG for more than one consumer, Kafka will assume they are all part of a single group, and it will deliver messages to only one of the consumers. If you start the two consumers with different group.id values, Kafka will assume they are not related, so each consumer will get its own copy of every message.
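
To make that concrete, here is a short sketch showing that the group.id setting alone selects queue or topic semantics (the GroupDemo class and createConsumer helper are illustrative names of my own, not code from the article):


    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class GroupDemo {
        static KafkaConsumer<String, String> createConsumer(String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            return new KafkaConsumer<>(props);
        }

        public static void main(String[] args) {
            //Same group.id: Kafka treats these consumers as one group and
            //delivers each message to only one of them (queue semantics)
            KafkaConsumer<String, String> first = createConsumer("group1");
            KafkaConsumer<String, String> second = createConsumer("group1");

            //Different group.id: this consumer is unrelated to the group1 pair,
            //so it receives its own copy of every message (publish-subscribe semantics)
            KafkaConsumer<String, String> third = createConsumer("group2");

            first.close();
            second.close();
            third.close();
        }
    }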

Recall that the partitioned consumer in Listing 3 takes groupId as its second parameter. Now we'll use the groupId parameter to implement both queue and topic use cases for the consumer.

  1. Create a topic named group-test with two partitions:
    
      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
    
  2. Start a producer that publishes messages to the group-test topic you just created:
    
      java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
    
  3. Start three consumers that listen for messages published to the group-test topic. Use group1 for the value of your group ID. This will give you three consumers in group1 (a minimal sketch of this simple consumer appears after Figure 3):
    
      java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
    
  4. Start a fourth consumer, but this time change the value of the group ID to group2. This will give you three consumers in group1 and a single consumer in group2:
    
      java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
    
  5. Return to the producer console and start typing messages. Every new message you publish should appear once in the group2 consumer window and once in one of the three group1 consumer windows, as shown in Figure 3.
Figure 3. Consumer group output
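
The com.spnotes.kafka.simple.Consumer class invoked in steps 3 and 4 isn't shown on this page. A minimal sketch of such a consumer, assuming it does nothing more than subscribe with the given group ID and print every message value, might look like this:


    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class Consumer {
        public static void main(String[] args) {
            String topicName = args[0];
            String groupId = args[1];

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            try {
                //Join the group and print every message delivered to this consumer
                kafkaConsumer.subscribe(Arrays.asList(topicName));
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            } finally {
                kafkaConsumer.close();
            }
        }
    }

One detail worth noting: because group-test has only two partitions, Kafka can assign partitions to at most two of the three group1 consumers at a time; the third sits idle until a rebalance hands it a partition.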

Conclusion to Part 2

Early use cases for big data message systems called for batch processing, such as running a nightly ETL process or moving data from the RDBMS to a NoSQL datastore at regular intervals. In the past few years the demand for realtime processing has increased, especially for fraud detection and emergency response systems. Apache Kafka was built for just these types of realtime scenarios.

Apache Kafka is a great open source product, but it does have some limitations; for instance, you can't query data from inside a topic before it reaches its destination, or replicate data across multiple geographically distributed clusters. You could combine MapR Streams (a commercial product) with the Kafka API for these and other more complex publish-subscribe scenarios.
