Kafka 与Spark集成

  • 与Spark集成。

    在本章中,我们将讨论如何将Apache Kafka与Spark Streaming API集成。
  • 关于Spark

    Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(例如Kafka,Flume,Twitter等)中提取,并可以使用复杂的算法(例如高级功能(如map,reduce,join和window))进行处理。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。弹性分布式数据集(RDD)是Spark的基本数据结构。它是对象的不可变分布式集合。RDD中的每个数据集都分为逻辑分区,可以在群集的不同节点上进行计算。
  • 与Spark集成

    Kafka是潜在的Spark流消息传递和集成平台。Kafka充当实时数据流的中心枢纽,并在Spark Streaming中使用复杂的算法进行处理。处理完数据后,Spark Streaming可能会将结果发布到另一个Kafka主题中,或存储在HDFS,数据库或仪表板中。下图描述了概念流程。
    现在,让我们详细了解Kafka-Spark API。
    SparkConf API
    它代表Spark应用程序的配置。用于将各种Spark参数设置为键值对。
    SparkConf类具有以下方法-
    • set(string key, string value) - 设置配置变量。
    • remove(string key) - 从配置中删除key。
    • setAppName(string name) - 为您的应用程序设置应用程序名称。
    • get(string key) - 获取键
    StreamingContext API
    这是Spark功能的主要切入点。SparkContext表示与Spark集群的连接,可用于在集群上创建RDD,累加器和广播变量。签名定义如下。
    
    public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
    
    • master - 簇URL连接到(例如mesos://主机:端口,spark://主机:端口,local[4])。
    • appName - 您的工作的名称,显示在群集Web UI上
    • batchDuration - 将流数据分为几批的时间间隔
    
    public StreamingContext(SparkConf conf, Duration batchDuration)
    
    • conf - Spark参数
    • batchDuration - 将流数据分为几批的时间间隔
    KafkaUtils API
    KafkaUtils API用于将Kafka集群连接到Spark流。该API具有如下定义的有意义的方法createStream签名。
    
    public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
       StreamingContext ssc, String zkQuorum, String groupId,
       scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
    
    • ssc - StreamingContext对象。
    • zkQuorum - Zookeeper仲裁。
    • groupId - 该使用者的组ID。
    • topics - 返回要使用的主题图。
    • storageLevel - 用于存储接收到的对象的存储级别。
    KafkaUtils API还有另一个方法createDirectStream,该方法用于创建输入流,该输入流直接从Kafka Brokers提取消息,而无需使用任何接收器。该流可以确保将来自Kafka的每个消息准确地包含在转换中一次。
    示例应用程序在Scala中完成。要编译该应用程序,请下载并安装sbt,scala构建工具(类似于maven)。主要应用代码如下所示。
    
    import java.util.HashMap
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    object KafkaWordCount {
       def main(args: Array[String]) {
          if (args.length < 4) {
             System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
             System.exit(1)
          }
    
          val Array(zkQuorum, group, topics, numThreads) = args
          val sparkConf = new SparkConf().setAppName("KafkaWordCount")
          val ssc = new StreamingContext(sparkConf, Seconds(2))
          ssc.checkpoint("checkpoint")
    
          val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
          val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(x => (x, 1L))
             .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
          wordCounts.print()
    
          ssc.start()
          ssc.awaitTermination()
       }
    }
    
    构建脚本
    spark-kafka集成取决于spark,spark stream和spark Kafka集成jar。创建一个新文件build.sbt并指定应用程序详细信息及其依赖性。该SBT在编译和打包应用程序会下载必要的jar。
    
    name := "Spark Kafka Project"
    version := "1.0"
    scalaVersion := "2.10.5"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
    
    编译/打包
    运行以下命令来编译和打包应用程序的jar文件。我们需要将jar文件提交到spark控制台以运行该应用程序。
    
    sbt package
    
    提交给Spark
    启动Kafka Producer CLI(在上一章中进行了说明),创建一个名为my-first-topic的新主题,并提供一些示例消息,如下所示。
    
    Another spark test message
    
    运行以下命令以将应用程序提交到Spark控制台。
    
    /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
    -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
    -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
    
    该应用程序的示例输出如下所示。
    
    spark console messages ..
    (Test,1)
    (spark,1)
    (another,1)
    (message,1)
    spark console message ..