Kafka 简单生产者/消费者示例
-
简单生产者示例
让我们创建一个使用Java客户端发布和使用消息的应用程序。Kafka生产者客户端包含以下API。 -
KafkaProducer API
让我们了解本节中最重要的KafkaProducer API集。KafkaProducer API的核心部分是KafkaProducer类。KafkaProducer类提供了使用以下方法连接其构造函数中的Kafka代理的选项。- KafkaProducer类提供了send方法,以将消息异步发送到主题(Topic)。send()的用法如下
- ProducerRecord - 生产者管理等待发送记录的缓冲区。
- callback - 用户提供的回调,在服务器确认记录后执行(空表示没有回调)。
- KafkaProducer类提供了flush方法,以确保所有先前发送的消息均已实际完成。flush方法的语法如下-
- KafkaProducer类提供了partitionFor方法,该方法有助于获取给定主题的分区元数据。这可以用于自定义分区。该方法的如下-
- public void close() - KafkaProducer类提供close方法块,直到所有先前发送的请求完成为止。
-
Producer API
Producer API的核心部分是Producer类。Producer类提供了一种通过以下方法在其构造函数中连接Kafka 代理(Broker)的选项。Producer 类Producer类提供了使用以下方法签名将消息发送到单个或多个主题的send方法。Producer有两种类型:Sync(同步)和Async(异步)。相同的API配置也适用于Sync生产者。它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,首选异步生成器。在像0.8这样的早期版本中,异步生产者没有用于send()的回调来注册错误处理程序。public void close()Producer类提供了close方法,以关闭与所有Kafka 代理的生产者池连接。 -
配置设定
下表列出了Producer API的主要配置设置,以便于更好地理解-配置 说明 client.id 识别生产者申请 producer.type 同步sync或异步async acks Acks配置控制生产者请求下的条件被认为是完整的。 retries 如果生产者请求失败,则自动使用特定值重试。 bootstrap.servers 代理自举列表。 linger.ms 如果要减少请求数量,可以将linger.ms设置为大于某个值的值。 key.serializer key序列化器 value.serializer value序列化器 batch.size 缓存大小 buffer.memory 控制可用于生产者的内存总量。 -
ProducerRecord API
ProducerRecord是一个键/值对,它被发送到Kafka集群。ProducerRecord类的构造函数用于使用以下签名创建具有分区,键值对的记录。- topic -用户定义的主题名称,将附加到记录中。
- partition -分区数
- key - 将包含在记录中的key。
- value -记录内容(值)
ProducerRecord类构造函数用于创建具有键,值对且无分区的记录。- topic -用户定义的主题名称,将附加到记录中。
- key - 将包含在记录中的key。
- value -记录内容(值)
ProducerRecord类创建一个没有分区和键的记录- topic -用户定义的主题名称,将附加到记录中。
- value -记录内容(值)
下表中列出了ProducerRecord类方法-方法 说明 public string topic() 主题将追加到记录中。 public K key() 将包含在记录中的key。如果没有这样的键,则将在此处返回null。 public V value() 记录内容。 partition() 记录的分区数 -
SimpleProducer应用程序
在创建应用程序之前,首先启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建自己的主题。之后,创建一个名为SimpleProducer.java的Java类,并输入以下代码。编译 -可以使用以下命令来编译应用程序。编译 -可以使用以下命令执行应用程序。输出要检查上面的输出,请打开新终端并输入Consumer CLI命令来接收消息。 -
简单的消费者示例
截至目前,我们已经创建了一个生产者,用于将消息发送到Kafka集群。现在让我们创建一个消费者以使用来自Kafka集群的消息。KafkaConsumer API用于消费来自Kafka集群的消息。KafkaConsumer类的构造函数在下面定义。语法:configs - 返回使用者配置图。KafkaConsumer类具有以下重要方法,下表中列出了这些方法。方法 说明 public java.util.Set<TopicPar-tition> assignment() 获取消费者当前分配的一组分区。 public string subscription() 订阅给定的主题列表,以获取动态分配的分区。 public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 订阅给定的主题列表,以获取动态分配的分区。 public void unsubscribe() 从给定的分区列表中退订主题。 public void sub-scribe(java.util.List<java.lang.String> topics) 订阅给定的主题列表,以获取动态分配的分区。 如果给定的主题列表为空,则将其与unsubscribe()相同。 public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 参数模式是指正则表达式格式的订阅模式,并且listener参数从订阅模式获取通知。 public void as-sign(java.util.List<TopicParti-tion> partitions) 手动为客户分配分区列表。 poll() 使用订阅/分配API之一获取指定主题或分区的数据。 如果在轮询数据之前未预订主题,则将返回错误。 public void commitSync() 提交最后一次poll()返回的所有主题和分区订阅列表的偏移量。 相同的操作应用于commitAsyn()。 public void seek(TopicPartition partition, long offset) 获取使用者将在下一个poll()方法上使用的当前偏移值。 public void resume() 恢复暂停的分区。 public void wakeup() 唤醒消费者。 -
ConsumerRecord API
ConsumerRecord API用于接收来自Kafka集群的记录。该API由主题名称,分区号(从中接收记录)以及指向Kafka分区中的记录的偏移量组成。ConsumerRecord类用于创建具有特定主题名称,分区计数和<键,值>对的消费者记录。它具有以下签名。- topic - 从Kafka集群收到的消费者记录的主题名称。
- partition - 主题分区。
- offset - 记录的键,如果不存在键,则返回null。
- value - 记录内容。
-
ConsumerRecords API
ConsumerRecords API充当ConsumerRecord的容器。该API用于保留特定主题的每个分区的ConsumerRecord列表。其构造函数定义如下。- TopicPartition - 返回特定主题的分区图。
- records - ConsumerRecord的返回列表。
ConsumerRecords类具有以下定义的方法。方法 说明 public int count() 所有主题的记录数。 public Set partitions() 此记录集中具有数据的分区集(如果未返回任何数据,则该集为空)。 public Iterator iterator() 迭代器使您可以循环浏览集合,获取或删除元素。 public List records() 获取给定分区的记录列表。 -
配置设定
消费者客户端API主要配置设置的配置设置在下面列出-方法 说明 bootstrap.servers 自举代理列表。 group.id 将单个消费者分配给一个组。 enable.auto.commit 如果值为true,则启用自动提交偏移量,否则不提交。 auto.commit.interval.ms 返回将更新的消耗偏移量写入ZooKeeper的频率。 session.timeout.ms 指示在放弃并继续使用消息之前,Kafka将等待ZooKeeper响应请求(读取或写入)的毫秒数。 -
SimpleConsumer应用程序
生产者应用程序步骤在此保持不变。首先,启动您的ZooKeeper和Kafka经纪人。然后,使用名为SimpleConsumer.java的Java类创建SimpleConsumer应用程序,并键入以下代码。输出编译 -可以使用以下命令来编译应用程序。执行-可以使用以下命令执行应用程序输入 - 打开生产者CLI,并向该主题发送一些消息。例如输入“Hello Consumer”发送。输出 -以下将是输出。