与Spark集成
Kafka是潜在的Spark流消息传递和集成平台。Kafka充当实时数据流的中心枢纽,并在Spark Streaming中使用复杂的算法进行处理。处理完数据后,Spark Streaming可能会将结果发布到另一个Kafka主题中,或存储在HDFS,数据库或仪表板中。下图描述了概念流程。
现在,让我们详细了解Kafka-Spark API。
SparkConf API
它代表Spark应用程序的配置。用于将各种Spark参数设置为键值对。
SparkConf类具有以下方法-
- set(string key, string value) - 设置配置变量。
- remove(string key) - 从配置中删除key。
- setAppName(string name) - 为您的应用程序设置应用程序名称。
- get(string key) - 获取键
StreamingContext API
这是Spark功能的主要切入点。SparkContext表示与Spark集群的连接,可用于在集群上创建RDD,累加器和广播变量。签名定义如下。
- master - 簇URL连接到(例如mesos://主机:端口,spark://主机:端口,local[4])。
- appName - 您的工作的名称,显示在群集Web UI上
- batchDuration - 将流数据分为几批的时间间隔
- conf - Spark参数
- batchDuration - 将流数据分为几批的时间间隔
KafkaUtils API
KafkaUtils API用于将Kafka集群连接到Spark流。该API具有如下定义的有意义的方法createStream签名。
- ssc - StreamingContext对象。
- zkQuorum - Zookeeper仲裁。
- groupId - 该使用者的组ID。
- topics - 返回要使用的主题图。
- storageLevel - 用于存储接收到的对象的存储级别。
KafkaUtils API还有另一个方法createDirectStream,该方法用于创建输入流,该输入流直接从Kafka Brokers提取消息,而无需使用任何接收器。该流可以确保将来自Kafka的每个消息准确地包含在转换中一次。
示例应用程序在Scala中完成。要编译该应用程序,请下载并安装sbt,scala构建工具(类似于maven)。主要应用代码如下所示。
构建脚本
spark-kafka集成取决于spark,spark stream和spark Kafka集成jar。创建一个新文件build.sbt并指定应用程序详细信息及其依赖性。该SBT在编译和打包应用程序会下载必要的jar。
编译/打包
运行以下命令来编译和打包应用程序的jar文件。我们需要将jar文件提交到spark控制台以运行该应用程序。
提交给Spark
启动Kafka Producer CLI(在上一章中进行了说明),创建一个名为my-first-topic的新主题,并提供一些示例消息,如下所示。
运行以下命令以将应用程序提交到Spark控制台。
该应用程序的示例输出如下所示。