Hello World with a basic Kafka producer and consumer. It is assumed that you know basic Kafka terminology. There is a lot to learn about Kafka, but this starter is about as simple as it gets: ZooKeeper, a single Kafka broker, and a Java-based producer and consumer.
Download and install Kafka 2.3.0 (the download is named for both the Scala version and the Kafka version, e.g. kafka_2.12-2.3.0). Install in this case just means extracting the archive. Open two console windows in the extracted Kafka directory.
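For example, assuming you grabbed the standard tarball from the Apache downloads page (the exact file name depends on the release you picked):

tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0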
# Start ZooKeeper in one console
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start a Kafka broker in the other (one broker is good enough for our hello world example)
bin/kafka-server-start.sh config/server.properties
Play around with Kafka using the CLI commands below to check that all is good, and create the topic named testtopic with a replication factor of 1 (limited to 1 since we started only one broker) and 2 partitions. In our example the second partition is never used, because the message key is hardcoded; generating a distinct key for each message is left as an exercise (see the sketch after the producer code below).
# Create a topic named testtopic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testtopic

# Put messages into the testtopic topic (type in a message and hit enter to publish)
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

# Consume messages from the testtopic topic (Ctrl+C to quit)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic
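To double-check that the topic was created with the expected settings, kafka-topics.sh also supports a --describe action:

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testtopic

This prints the partition count, replication factor, and the leader broker for each partition.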
And now the Java code to publish and subscribe. I used Eclipse STS, and you can run the producer and consumer from the IDE. In the producer console, type in a string and hit enter; the text will be published to the testtopic topic. The consumer will run forever, picking up any published messages and printing them to the console. Type in “exit” to stop the publisher (or you can kill it). The consumer can be killed when you are done testing.
package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;

/**
 * Kafka console message producer. Takes in text (ending with newline) and sends it to a Kafka topic.
 */
public class KafkaProducerExample {

    public final static String TOPIC_NAME = "testtopic";
    public final static Logger logger = LoggerFactory.getLogger(KafkaProducerExample.class.getName());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // produce a test message
        // if you run this multiple times you will have multiple messages in the
        // testtopic topic (as would be expected)
        Producer<String, String> producer = KafkaProducerExample.createProducer();
        Scanner sc = new Scanner(System.in);
        try {
            String inputText = null;
            while (!"exit".equalsIgnoreCase(inputText)) {
                inputText = sc.nextLine();
                // key is hardcoded to 'key1', which forces all messages to go to a single partition as per Kafka behavior
                ProducerRecord<String, String> recordToSend = new ProducerRecord<>(TOPIC_NAME, "key1", inputText);
                // asynchronous send; the callback fires once the broker acknowledges (or rejects) the record
                producer.send(recordToSend, (recordMetadata, e) -> {
                    if (e == null) {
                        logger.info("Message Sent. topic={}, partition={}, offset={}",
                                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                    } else {
                        logger.error("Error while sending message. ", e);
                    }
                });
            }
        } finally {
            sc.close();
            producer.flush();
            producer.close();
        }
    }

    private static Producer<String, String> createProducer() {
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<String, String>(kafkaProps);
    }
}
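And here is the exercise mentioned earlier, as a minimal sketch. The hardcoded "key1" sends every record to the same partition, because Kafka's default partitioner picks the partition by hashing the key. The counter variable and key scheme below are my own illustration, not part of the original code; the idea is simply to vary the key inside the while loop:

// Declared before the while loop (new local variable for this sketch)
long messageCount = 0;

// Inside the loop, replace the hardcoded "key1" with a key that varies per message.
// Note: Kafka only guarantees that the same key always maps to the same partition;
// two specific keys may still hash to the same one, but with varying keys the
// load spreads across partitions over time.
String key = "key-" + (messageCount++ % 2);
ProducerRecord<String, String> recordToSend = new ProducerRecord<>(TOPIC_NAME, key, inputText);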
package kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka console message consumer.
 */
public class KafkaConsumerExample {

    private final static String TOPIC_NAME = KafkaProducerExample.TOPIC_NAME;
    public final static Logger logger = LoggerFactory.getLogger(KafkaConsumerExample.class.getName());

    public static void main(String[] args) {
        // consume messages
        Consumer<String, String> consumer = KafkaConsumerExample.createConsumer();
        // subscribe to the test topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        try {
            String receivedText = null;
            while (!"exit".equalsIgnoreCase(receivedText)) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    receivedText = record.value();
                    if (receivedText != null) {
                        logger.info(
                                "Message received ==> topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), receivedText);
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static Consumer<String, String> createConsumer() {
        Properties kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<String, String>(kafkaProps);
    }
}
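One thing to be aware of: this consumer relies on Kafka's default enable.auto.commit=true, so offsets are committed in the background on a timer. If you want offsets committed only after your code has actually processed a batch, here is a minimal sketch (my own variation on createConsumer() and the poll loop above, not part of the original example):

// In createConsumer(): turn off background offset commits
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// In the poll loop: commit explicitly once the batch has been processed
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    logger.info("value = {}", record.value());
}
consumer.commitSync(); // offsets for this batch are committed only after processing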
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.26</version>
    </dependency>
</dependencies>
To run the Java producer (after you build the Maven artifact using mvn clean package):
mvn exec:java -Dexec.mainClass="kafka.KafkaProducerExample"
To run the Java consumer:
mvn exec:java -Dexec.mainClass="kafka.KafkaConsumerExample"
Type something into the producer console and it should echo back in the consumer console window.
Code is at https://github.com/thomasma/hellokafka
Note: This is a very basic (hello world) example that uses the Kafka client APIs directly (no Spring). Using Kafka in a real application needs far more thought: cluster architecture, how you assign message keys, the right partitioning strategy, data security, high availability, and so on all have to be thought through.