Kafka Java API实现的简单Producer和Consumer

from:
原文阅读

关键字:Kafka Java API、producer、consumer前面的文章《Kafka安装配置测试》中安装配置了分布式的Kafka集群,并且使用自带的kafka-console-producer.sh和kafka-console-consumer.sh模拟测试了发送消息和消费消息。

本文使用简单的Java API模拟Kafka的producer和consumer,其中,procuder从一个文本文件中逐行读取内容,然后发送到Kafka,consumer则从Kafka中读取内容并在控制台打印。

Java API Producer

  1. package com.lxw1234.kafka;
  2.  
  3. import java.io.BufferedReader;
  4. import java.io.File;
  5. import java.io.FileReader;
  6. import java.io.IOException;
  7. import java.util.Properties;
  8.  
  9. import kafka.javaapi.producer.Producer;
  10. import kafka.producer.KeyedMessage;
  11. import kafka.producer.ProducerConfig;
  12.  
  13. public class MyProducer {
  14.  
  15. public static void main(String[] args) {
  16. Properties props = new Properties();
  17. props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
  18. props.put(“metadata.broker.list”, “172.16.212.17:9091,172.16.212.17:9092,172.16.212.102:9091,172.16.212.102:9092”);
  19. Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
  20. String topic = “lxw1234.com”;
  21. File file = new File(“E:/track-log.txt”);
  22. BufferedReader reader = null;
  23. try {
  24. reader = new BufferedReader(new FileReader(file));
  25. String tempString = null;
  26. int line = 1;
  27. while ((tempString = reader.readLine()) != null) {
  28. producer.send(new KeyedMessage<Integer, String>(topic,line + “—“ + tempString));
  29. System.out.println(“Success send [“ + line + “] message ..”);
  30. line++;
  31. }
  32. reader.close();
  33. System.out.println(“Total send [“ + line + “] messages ..”);
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. } finally {
  37. if (reader != null) {
  38. try {
  39. reader.close();
  40. } catch (IOException e1) {}
  41. }
  42. }
  43. producer.close();
  44. }
  45. }

程序从E:/track-log.txt文件中读取内容,发送至Kafka。

Java API Consumer

  1. package com.lxw1234.kafka;
  2.  
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.Properties;
  7.  
  8. import kafka.consumer.Consumer;
  9. import kafka.consumer.ConsumerConfig;
  10. import kafka.consumer.ConsumerIterator;
  11. import kafka.consumer.KafkaStream;
  12. import kafka.javaapi.consumer.ConsumerConnector;
  13.  
  14. public class MyConsumer {
  15. public static void main(String[] args) {
  16. String topic = “lxw1234.com”;
  17. ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
  18. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  19. topicCountMap.put(topic, new Integer(1));
  20. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  21. KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  22. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  23. while(it.hasNext())
  24. System.out.println(“consume: “ + new String(it.next().message()));
  25. }
  26. private static ConsumerConfig createConsumerConfig() {
  27. Properties props = new Properties();
  28. props.put(“group.id”,“group1”);
  29. props.put(“zookeeper.connect”,“zk1:2181,zk2:2181,zk3:2181”);
  30. props.put(“zookeeper.session.timeout.ms”, “400”);
  31. props.put(“zookeeper.sync.time.ms”, “200”);
  32. props.put(“auto.commit.interval.ms”, “1000”);
  33. return new ConsumerConfig(props);
  34. }
  35. }
  36.  

Consumer从Kafka中消费数据,并在控制台中打印消息内容。

运行和结果

先运行Consumer,之后再运行Producer,运行时候将$KAFKA_HOME/lib/下的所有jar包依赖进去。

Producer运行结果如下:

Kafka Producer

文件中只有50000行记录,因为最后又把行号加了一次,因此最后打印出是50001.

Consumer运行结果如下:

Kafka Consumer

 

Consumer成功获取了5000条数据。

关于Kafka,还有很多疑问,继续尝试和学习吧,enjoy it!

您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Kafka Java API实现的简单Producer和Consumer

此条目发表在kafka分类目录,贴了标签。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已用*标注