Spark部署
Spark應用程序使用spark-submit(shell命令)來部署在集羣中的Spark應用程序。 它通過一個統一的接口採用全各自的集羣管理器。 因此,您不必每一個應用程序配置。
示例
讓我們同樣以計算字數爲例子,在使用之前,使用shell命令。 在這裏,我們考慮同樣 spark 應用程序的例子。
簡單輸入
下面的文字是輸入數據,並命名該文件爲 in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
請看下面的程序 −
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/\* local = master URL; Word Count = application name; \*/
/\* /usr/local/spark = Spark Home; Nil = jars; Map = environment \*/
/\* Map = variables to work nodes \*/
/\*creating an inputRDD to read text file (in.txt) through Spark context\*/
val input = sc.textFile("in.txt")
/\* Transform the inputRDD into countRDD \*/
valcount = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(\_ + \_)
/\* saveAsTextFile method is an action that effects on the RDD \*/
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
保存上述程序到指定的文件 SparkWordCount.scala 並將其放置在一個用戶定義的目錄名爲 spark-application.
注 − 雖然轉化 inputRDD 成 countRDD 我們使用 flatMap() 用於標記化(從文本文件),行成單詞, map() 方法統計詞頻和 reduceByKey() 方法計算每個單詞的重複。
使用以下步驟來提交應用程序。通過終端在 spark-application目錄中執行所有步驟。
第1步:下載 Spark Ja
Spark需要核心 jar 來編譯,因此,從下面的鏈接下載spark-core_2.10-1.3.0.jar 移動下載 jar 的文件到 spark-application 應用程序目錄。
第2步:編譯程序
使用下面給出的命令編譯上述程序。這個命令應該在spark-application應用程序目錄下執行。這裏,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar ,Spark 採用了 Hadoop 的 jar 支持程序。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
第3步:創建 JAR
使用以下 spark 命令應用程序創建 jar 文件。這裏,wordcount 爲 jar 文件的文件名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
第4步:提交spark應用
使用以下命令提交 spark 應用 −
spark-submit --class SparkWordCount --master local wordcount.jar
如果成功執行,那麼會發現有下面給出的輸出。在下面輸出的正常用戶識別,這是程序的最後一行。如果仔細閱讀下面的輸出,會發現不同的東西,比如 −
在端口 42954 成功啓動服務 「sparkDriver」
MemoryStore 啓動使用容量267.3 MB
啓動SparkUI在 http://192.168.1.217:4040
添加JAR文件:/home/hadoop/piapplication/count.jar
ResultStage 1 (saveAsTextFile 在 SparkPi.scala:11) finished in 0.566 s
停止 Spark web用戶界面在 http://192.168.1.217:4040
MemoryStore 清理
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
第5步:檢查輸出
成功執行程序後,會發現一個名爲outfile在spark-application應用程序目錄。
下面的命令用於在outfile目錄中打開和檢查文件列表。
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
part-00000 文件檢查輸出命令 −
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
part-00001文件查看輸出命令 −
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
通過下面的部分更多地瞭解「spark-submit」命令。
Spark-submit 語法
spark-submit [options] <app jar | python file> [app arguments]
選項
下面給出描述選項列表 −
S.No
選項
描述
1
--master
spark://host:port, mesos://host:port, yarn, 或 local.
2
--deploy-mode
無論是在本地啓動驅動程序(client),或在工作人員的機器中的一個集羣內 ("cluster") (默認: client)
3
--class
應用程序的主類(適用於 Java/Scala 的應用程序)
4
--name
應用程序的名稱
5
--jars
以逗號分隔本地 jar 列表包括驅動器和執行者類路徑
6
--packages
逗號分隔 jar 的 Maven 座標系列表,包括驅動器和執行者類路徑
7
--repositories
逗號分隔額外遠程存儲庫列表搜索Maven給定的座標,使用 --packages
8
--py-files
用逗號分隔 .zip,.egg 或.py文件的列表放在Python路徑中的 Python 應用程序
9
--files
逗號分隔放置在每一個執行者的工作目錄中的文件的列表
10
--conf (prop=val)
任意 Spark 配置屬性
11
--properties-file
路徑從一個文件來加載額外屬性。如果沒有指定,這將在 conf/spark-defaults 尋找默認值
12
--driver-memory
存儲驅動程序 (e.g. 1000M, 2G) (默認: 512M)
13
--driver-java-options
額外的Java選項傳遞給驅動程序
14
--driver-library-path
額外的庫路徑條目傳遞給驅動程序
15
--driver-class-path
額外的類路徑條目傳遞給驅動程序
需要注意的是使用 --jars 添加 jar 會自動包含在類路徑中
16
--executor-memory
每個執行者的內存(e.g. 1000M, 2G) (默認: 1G)
17
--proxy-user
用戶在提交申請時模仿
18
--help, -h
顯示此幫助信息並退出
19
--verbose, -v
打印額外的調試輸出
20
--version
打印當前 Spark 版本
21
--driver-cores NUM
核心驅動程序(默認值:1)
22
--supervise
如果給定,重新啓動對故障的驅動程序
23
--kill
如果給定,殺死指定的驅動程序
24
--status
如果給定,請求指定的驅動程序的狀態
25
--total-executor-cores
爲所有執行者的核心總數
26
--executor-cores
每執行者內核的數量。 (默認值:1是YARN模式,或在獨立模式下,工人利用多內核)