理解 MapReducer
本教程中的代碼分爲 3 個部分:
解釋 SalesMapper 類
解釋 SalesCountryReducer 類
解釋 SalesCountryDriver 類
SalesMapper類的說明
在本節中,我們將瞭解 SalesMapper 類的實現。
我們首先指定類的包名稱。 SalesCountry 就是這個示例中使用的包名。請注意編譯的輸出,SalesMapper.class 將進入目錄並命名這個軟件包名稱:SalesCountry.
其次,我們導入庫軟件包。
以下快照顯示實現 SalesMapper 類
代碼解釋:
1. SalesMapper 類定義
public class SalesMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {...}
每一個 mapper 類必須從 MapReduceBase 類進行擴展,它必須實現 Mapper 接口。
2. 定義 'map' 函數
1
2
3
4
publicvoidmap(LongWritable key,
Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throwsIOException
Mapper類的主要部分是接受四個參數的 「map()」 方法。
每次調用 'map()' 方法, 一個鍵值 key-value 對 ('key' 和 'value' 在代碼裏) 被傳遞。
'map()' 方法開始被接受拆分輸入文本作爲一個參數,並使用分詞來拆分這些行成詞。
1
2
String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");
這裏,「,」 被用作分隔符。
在這之後,使用記錄在數組 'SingleCountryData' 中的第七索引,其值爲 '1'.
output.collect(new Text(SingleCountryData[7]), one);
我們在選擇第7索引記錄,因爲我們需要的國家數據,它位於數組 'SingleCountryData' 的第七索引。
請注意,我們輸入的數據是下面的格式 (Country 在索引的位置爲:7, 0 是開始的索引)-
Transaction_date,Product,Price,Payment_Type,Name,City,State,Country,Account_Created,Last_Login,Latitude,Longitude
mapper的輸出使用的是 'OutputCollector' 的 'collect()' 方法的鍵值對.
SalesCountryReducer 類的說明
在本節中,我們將瞭解 SalesCountryReducer 類的實現。
1. 我們首先爲類指定包的名稱。SalesCountry 是包的名稱。請注意編譯的輸出, SalesCountryReducer.class 將進入命名這個軟件包名稱目錄: SalesCountry.
其次,我們導入庫軟件包。
以下快照顯示實現 SalesCountryReducer 類
代碼解釋:
1. SalesCountryReducer 類定義 -
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
此處,前兩個數據類型, 'Text' 和 'IntWritable' 是輸入鍵值的數據類型到reducer。
映射器的輸出的形式<CountryName1, 1>, <CountryName2, 1>.映射器的輸出被輸入到reducer。所以,以配合其數據類型, Text 和 IntWritable 數據在這裏輸入被使用。
最後兩個數據類型,'Text' 和 'IntWritable' 是由 reducer 的鍵 - 值對的形式生成的輸出的數據類型。
每個 reducer 類必須從MapReduceBase類進行擴展,它必須實現 Reducer 接口。
2. Defining 'reduce' function-
1
2
3
publicvoidreduce( Text t_key,
Iterator
Reporter reporter) throwsIOException {
輸入到 reduce() 方法是在具有多個值的列表中選擇一個鍵。
例如,在我們的示例中,這將是 -
<United Arab Emirates, 1>, <United Arab Emirates, 1>, <United Arab Emirates, 1>,<United Arab Emirates, 1>, <United Arab Emirates, 1>, <United Arab Emirates, 1>.
這賦予 reducer 作爲 <United Arab Emirates, {1,1,1,1,1,1}>
因此,接受這種形式參數,前兩個數據類型的使用,即 Text 和 Iterator
接下來的參數的類型是 OutputCollector<Text,IntWritable> 它收集 reducer 階段的輸出。
reduce() 方法開始通過複製鍵值和初始化頻率計數爲0。
Text key = t_key;
int frequencyForCountry = 0;
然後,使用 「while」 循環,我們通過與鍵關聯的值列表循環,並通過總結所有計算的值。
1
2
3
4
5
6
while(values.hasNext()) {
// replace type of value with the actual type of our value
IntWritable value = (IntWritable) values.next();
frequencyForCountry += value.get();
}
現在,結果中的鍵得到的頻率計數輸出到收集器。
下面的代碼執行這個 -
output.collect(key, new IntWritable(frequencyForCountry));
SalesCountryDriver類的說明
在本節中,我們將瞭解 SalesCountryDriver 類實現。
1. 我們首先爲類指定包的名稱。 SalesCountry 是這裏使用的包名。請注意編譯的輸出, SalesCountryDriver.class 將進入命名這個包名稱的目錄: SalesCountry.
這裏一行指定是包名稱後面的代碼是導入庫軟件包。
2. 定義一個用於創建一個新的客戶端工作,配置 Mapper及Reducer 類對象驅動程序類。
該驅動程序類負責設置我們的 MapReduce 作業在 Hadoop 運行。 在這個類中,我們指定作業名稱,輸入/輸出,mapper 和 reducer 類名稱的數據類型。
3. 在下面的代碼片段中,我們設置這是用來輸入數據集消費和生產輸出,分別輸入和輸出目錄。
arg[0] 和 arg[1] 是通過 MapReduce 的實際操作,也就是賦予在命令行參數執行命令,
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. 觸發我們的作業
下面的代碼開始執行 MapReduce 作業
try{
// Run the job
JobClient.runJob(job_conf);
} catch(Exception e) {
e.printStackTrace();
}