RDD操作
RDD提供兩種類型的操作:
- 轉換
- 行動
轉換
在Spark中,轉換的作用是從現有數據集創建新數據集。轉換是惰性的,因爲它們僅在動作需要將結果返回到驅動程序時才計算。
下面來看看一些常用的RDD轉換。
-
map(func)
- 它返回一個新的分佈式數據集, 該數據集是通過函數func
傳遞源的每個元素而形成的。 -
filter(func)
- 它返回一個新數據集, 該數據集是通過選擇函數func
返回true
的源元素而形成的。 -
flatMap(func)
- 這裏,每個輸入項可以映射到零個或多個輸出項, 因此函數func
應該返回序列而不是單個項。 -
mapPartitions(func)
- 它類似於map,但是在RDD的每個分區(塊)上單獨運行, 因此當在類型T的RDD上運行時,func
必須是Iterator <T> => Iterator <U>
類型。 -
mapPartitionsWithIndex(func)
- 它類似於mapPartitions
,它爲func
提供了一個表示分區索引的整數值,因此當在類型T的RDD上運行時,func
必須是類型(Int,Iterator <T>)=> Iterator <U>
。 -
sample(withReplacement, fraction, seed)
- 它使用給定的隨機數生成器種子對數據的分數部分進行採樣,有或沒有替換。 -
union(otherDataset)
- 它返回一個新數據集,其中包含源數據集和參數中元素的並集。 -
intersection(otherDataset)
- 它返回一個新的RDD,其中包含源數據集和參數中的元素的交集。 -
distinct([numPartitions]))
- 它返回一個新數據集,其中包含源數據集的不同元素。 -
groupByKey([numPartitions])
- 當在(K,V)
對的數據集上調用時,它返回(K,Iterable)
對的數據集。 -
reduceByKey(func, [numPartitions])
- 當調用(K,V)
對的數據集時,返回(K,V)
對的數據集,其中使用給定的reduce
函數func
聚合每個鍵的值,該函數必須是類型(V,V)=>V
。 -
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
- 當調用(K,V)
對的數據集時,返回(K,U)
對的數據集,其中使用給定的組合函數和中性「零」值聚合每個鍵的值。 -
sortByKey([ascending], [numPartitions])
- 它返回按鍵按升序或降序排序的鍵值對的數據集,如在布爾ascending
參數中所指定。 -
join(otherDataset, [numPartitions])
-當調用類型(K,V)
和(K,W)
的數據集時,返回(K,(V,W))
對的數據集以及每個鍵的所有元素對。通過leftOuterJoin
,rightOuterJoin
和fullOuterJoin
支持外連接。 -
cogroup(otherDataset, [numPartitions])
-當調用類型(K,V)
和(K,W)
的數據集時,返回(K,(Iterable,Iterable))
元組的數據集。此操作也稱爲groupWith
。 -
cartesian(otherDataset)
-當調用類型爲T和U的數據集時,返回(T,U)
對的數據集(所有元素對)。 -
pipe(command, [envVars])
-通過shell命令管道RDD的每個分區,例如, 一個Perl或bash腳本。 -
coalesce(numPartitions)
-它將RDD中的分區數減少到numPartitions
。 -
repartition(numPartitions)
-它隨機重新調整RDD中的數據,以創建更多或更少的分區,並在它們之間進行平衡。 -
repartitionAndSortWithinPartitions(partitioner)
- 它根據給定的分區器對RDD進行重新分區,並在每個生成的分區中鍵對記錄進行排序。
操作
在Spark中,操作的作用是在對數據集運行計算後將值返回給驅動程序。
下面來看看一些常用的RDD操作。
操作
描述
reduce(func)
它使用函數func(它接受兩個參數並返回一個)來聚合數據集的元素。該函數應該是可交換的和關聯的,以便可以並行正確計算。
collect()
它將數據集的所有元素作爲數組返回到驅動程序中。在過濾器或其他返回足夠小的數據子集的操作之後,這通常很有用。
count()
它返回數據集中的元素數。
first()
它返回數據集的第一個元素(類似於take(1)
)。
take(n)
它返回一個包含數據集的前n個元素的數組。
takeSample(withReplacement, num, [seed])
它返回一個數組,其中包含數據集的num個元素的隨機樣本,有或沒有替換,可選地預先指定隨機數生成器種子。
takeOrdered(n, [ordering])
它使用自然順序或自定義比較器返回RDD的前n個元素。
saveAsTextFile(path)
它用於將數據集的元素作爲文本文件(或文本文件集)寫入本地文件系統,HDFS或任何其他Hadoop支持的文件系統的給定目錄中。
saveAsSequenceFile(path)
它用於在本地文件系統,HDFS或任何其他Hadoop支持的文件系統中的給定路徑中將數據集的元素編寫爲Hadoop SequenceFile。
saveAsObjectFile(path)
它用於使用Java序列化以簡單格式編寫數據集的元素,然後可以使用SparkContext.objectFile()
加載。
countByKey()
它僅適用於類型(K,V)的RDD。因此,它返回(K,Int)對的散列映射與每個鍵的計數。
foreach(func)
它在數據集的每個元素上運行函數func
以獲得副作用,例如更新累加器或與外部存儲系統交互。