Spark編程

Spark 包含兩種不同類型的共享變量 - 一個是廣播變量和第二是累加器

  • 廣播變量 − 採用高效,分發大值

  • 累加器 − 用於聚集特定集合的信息

廣播變量


廣播變量允許程序員保持每臺機器上一個只讀變量緩存,而不是運輸它的一個副本任務。它們可用於,例如,給每一個節點,一個大的輸入數據集的副本,以有效的方式。Spark 也嘗試分發廣播變量來使用高效的廣播算法來降低通信成本。

Spark 操作通過一組階段執行,通過分佈式「洗牌」作業分開。Spark 會自動廣播各階段任務所需的通用數據。

廣播數據緩存到序列化的形式,在運行每個任務之前,反序列化。這意味着顯式地創建廣播變量,當僅是在多個階段的任務需要相同的數據或在反序列化形式緩存數據時非常重要的。

廣播變量從一個變量v通過調用 SparkContext.broadcast(v)來創建。廣播變量是圍繞 v 封裝,其值可以通過調用值的方法來訪問。下面給出的代碼顯示了這一點 -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

輸出 −

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) 

創建廣播變量之後,它應該被用來代替任何函數的值 v 的集羣上運行, v 不運到節點不止一次。此外,對象 v 不應在它的廣播後修飾,以確保所有節點獲得廣播變量的相同的值。

累加器


蓄電池僅是「補充」到通過關聯操作變量,因此可以,可以並行有效的支持。它們可以被用來實現計數器(如在MapReduce)或求和。Spark原生支持累加器的數字類型,程序員可以添加支持新類型。如果累加器使用自定義的一個名稱創建,它將顯示在 Spark 的 UI 中。這對於瞭解運行階段和進度很有用(注 - 這還不支持在Python)。

累加器從初始值v的值是通過調用 SparkContext.accumulator(v) 創建. 在集羣上運行任務可以使用 add 方法或 += 運算符(在 Scala 和Python)來添加它。 然而無法讀取它的值。只有驅動程序可以讀取累加器的值,使用 value 方法。

下面給出的代碼顯示一個累加器,用來相加數組的元素 −

scala> val accum = sc.accumulator(0) scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)

如果想看到的上面的代碼的輸出,可以使用下面的命令 −

scala> accum.value

輸出

res2: Int = 10

數字RDD操作


允許使用預定義的API方法之一做不同數字數據的操作。 Spark 數字運算是與流傳輸算法,允許構建模型,一次一個元素實現。

這些操作被計算,並返回調用一個StatusCounter對象的 status()方法。

下面是在 StatusCounter 可用的數字方法列表。

S.No

方法 & 含義

1

count()

在RDD元素的數量

2

Mean()

在RDD元素的平均值

3

Sum()

在RDD中元素的總和

4

Max()

在RDD中所有元素的最大值

5

Min()

在RDD中所有元素的最小值

6

Variance()

元素的 差異

7

Stdev()

元素的標準差

如果你只想要這些方法之一來使用,可以直接調用 RDD 相應的方法。