Kafka與Spark整合

在本章中,將討論如何將Apache Kafka與Spark Streaming API集成。

Spark是什麼?

Spark Streaming API支持實時數據流的可擴展,高吞吐量,容錯流處理。 數據可以從Kafka,Flume,Twitter等許多來源獲取,並且可以使用複雜算法進行處理,例如:映射,縮小,連接和窗口等高級功能。 最後,處理後的數據可以推送到文件系統,數據庫和現場儀表板上。 彈性分佈式數據集(RDD)是Spark的基礎數據結構。 它是一個不可變的分佈式對象集合。 RDD中的每個數據集都被劃分爲邏輯分區,這些分區可以在集羣的不同節點上進行計算。

與Spark整合

Kafka是Spark流媒體的潛在消息傳遞和集成平臺。 Kafka充當實時數據流的中心樞紐,並使用Spark Streaming中的複雜算法進行處理。 數據處理完成後,Spark Streaming可以將結果發佈到HDFS,數據庫或儀表板中的另一個Kafka主題中。 下圖描述了概念流程。
Kafka與Spark整合

現在,詳細介紹一下Kafka-Spark API。

SparkConf API

它代表Spark應用程序的配置。 用於將各種Spark參數設置爲鍵值對。

SparkConf類具有以下方法 -

  • set(string key, string value) − 設置配置變量。
  • remove(string key) − 從配置中刪除鍵。
  • setAppName(string name) − 爲應用程序設置應用程序名稱。
  • get(string key) − 獲得鍵。

StreamingContext API

這是Spark功能的主要入口點。 SparkContext表示與Spark羣集的連接,並且可用於在羣集上創建RDD,累加器和廣播變量。 簽名的定義如下所示。

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - 要連接的羣集URL(例如,mesos://host:portspark://host:portlocal [4])。
  • appName - 作業的名稱,以顯示在集羣Web UI上。
  • batchDuration - 流數據將被分成批次的時間間隔。
public StreamingContext(SparkConf conf, Duration batchDuration)

通過提供新的SparkContext所需的配置來創建StreamingContext。

  • conf - Spark參數。
  • batchDuration - 流數據將被分成批次的時間間隔。

KafkaUtils API

KafkaUtils API用於將Kafka集羣連接到Spark流。 該API具有如下定義的重要方法createStream簽名。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上面顯示的方法用於創建從Kafka Brokers中提取消息的輸入流。

  • ssc - StreamingContext對象。
  • zkQuorum - Zookeeper仲裁。
  • groupId - 此消費者的組ID。
  • topics - 返回要消費的主題地圖。
  • storageLevel - 用於存儲接收對象的存儲級別。

KafkaUtils API還有另一種方法createDirectStream,它用於創建一個輸入流,直接從Kafka Brokers中提取消息而不使用任何接收器。 此流可以保證來自Kafka的每條消息都只包含在一次轉換中。

示例應用程序在Scala中完成。 要編譯應用程序,請下載並安裝sbt,scala構建工具(與maven類似)。 主應用程序代碼如下所示。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

構建腳本
spark-kafka集成取決於spark,spark流和spark Kafka集成jar。 創建一個新的文件build.sbt並指定應用程序的詳細信息及其依賴關係。 sbt將在編譯和打包應用程序時下載必要的jar。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

編譯/包

運行以下命令來編譯和打包應用程序的jar文件。 需要將jar文件提交到spark控制檯來運行應用程序。

sbt package

提交給Spark

啓動Kafka Producer CLI(在前一章中介紹),創建一個名稱爲my-first-topic的新主題,並提供一些示例消息,如下所示。

Another spark test message

運行以下命令將應用程序提交到 spark 控制檯。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

這個應用程序的輸出示例如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..