本文源码码云地址:https://gitee.com/MaxBill/hadoop

在上篇《hadoop(02)、使用JavaApi对HDFS进行基本操作》中,通过JavaApi连接HDFS系统进行了基本的操作实践,本文将使用Hadoop的Map/Reduce框架进行简单的实践操作。

一、MapReduce框架

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

一个Map/Reduce 作业(job)通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。

Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

应用程序至少应该指明输入输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

注:以上Hadoop Map/Reduce摘自hadoop官方介绍,地址:http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html

二、环境准备

1.windows下hadoop开发环境:参见《hadoop(01)、Windows平台下Hadoop环境搭建

2.IDEA 开发编辑器

3.下载一个部小说(本文使用著名小说:三国演义)

4.上一篇中的项目基础,码云地址:本文源码码云地址:https://gitee.com/MaxBill/hadoop

三、开发编码

1.启动hdfs服务

2.编写WordCount程序(它可以计算出指定数据集中指定单词出现的次数)

官方的例子是统计单词的,比较简单,本文则使用分词器对三国演义的指定词频进行统计。在上篇《hadoop(02)、使用JavaApi对HDFS进行基本操作》的基础上,打开项目,在pom文件中添加分词的依赖:

1
2
3
4
5
<dependency>
         <groupId>cn.bestwu</groupId>
        <artifactId>ik-analyzers</artifactId>
       <version>5.1.0</version>
</dependency>

3.开始词频统计编码
主要有以下几个步骤:
<1>.上传三国演义小说(分词数据集)到HDFS中

<2>.编写统计词频代码

<3>.添加分词器

<4>.统计指定的词频
词频统计及测试主类代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.maxbill.hadoop.reduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import java.util.UUID;

/**
* @功能
* @作者 MaxBill
* @日期 2017/11/17
* @时间 14:39
* @备注 WordCountV1
*/
public class WordCountV1 {

private final static String userPath = "/user/Administrator/";

/**
* @功能 单词统计任务
* @作者 MaxBill
* @日期 2017/11/16
* @时间 12:12
*/
public static void wordCount(String jobName, String inputPath, String outputPath) throws Exception {
JobConf jobConf = JobsUtils.getJobsConf(jobName);
FileInputFormat.setInputPaths(jobConf, new Path(inputPath));
FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
JobClient.runJob(jobConf);
}

/**
* @功能 主类测试
* @作者 MaxBill
* @日期 2017/11/17
*/
public static void main(String[] args) throws Exception {
String inputPath = userPath + "input/";
String outputPath = userPath + "output/" + UUID.randomUUID().toString().toUpperCase();
//1.创建输入输出目录
//HdfsUtils.mkdir(inputPath);
//2.上传三国演义到Administrator目录下
//HdfsUtils.uploadFile("D:\\\sgyy.txt", inputPath);
//3.调用统计任务
wordCount("wordCountV1", inputPath, outputPath);
}

}

Hadoop Map/Reduce操作工具类及作业配置代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.maxbill.hadoop.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
* @功能 Hadoop Map/Reduce操作工具类
* @作者 MaxBill
* @日期 2017/11/16
* @时间 12:12
*/
public class JobsUtils {

private final static String hdfsPath = "hdfs://127.0.0.1:10000";
private final static String jobsPath = "hdfs://127.0.0.1:20000";

/**
* @功能 获取HDFS的配置信息
* @作者 MaxBill
* @日期 2017/11/16
* @时间 12:12
*/
public static Configuration getConfig() {
Configuration config = new Configuration();
config.set("fs.default.name", hdfsPath);
config.set("mapred.job.tracker", jobsPath);
return config;
}

/**
* @功能 获取HDFS的job配置信息
* @作者 MaxBill
* @日期 2017/11/16
* @时间 12:12
*/
public static JobConf getJobsConf(String jobName) {
JobConf jobConf = new JobConf(getConfig());
jobConf.setJobName(jobName);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(MyMap.class);
jobConf.setCombinerClass(MyReduce.class);
jobConf.setReducerClass(MyReduce.class);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
return jobConf;
}

}

MAP中使用分词器代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.maxbill.hadoop.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.*;

/**
* @功能
* @作者 MaxBill
* @日期 2017/11/17
*/
public class MyMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

/**
* @作者 MaxBill
* @日期 2017/11/17
* @时间 14:46
*/
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
//未使用分词器
//String line = value.toString();
//StringTokenizer tokenizer = new StringTokenizer(line);
// hile (tokenizer.hasMoreTokens()) {
//word.set(tokenizer.nextToken());
//output.collect(word, one);
//}
//使用分词器
byte\[\] btValue = value.getBytes();
InputStream ip = new ByteArrayInputStream(btValue);
Reader reader = new InputStreamReader(ip);
IKSegmenter iks = new IKSegmenter(reader, true);
Lexeme lexeme;
while ((lexeme = iks.next()) != null) {
//打印全部分词
//System.err.println(lexeme.getLexemeText());
word.set(lexeme.getLexemeText());
output.collect(word, one);
}
}

}

过滤指定词频代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.maxbill.hadoop.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
* @功能
* @作者 MaxBill
* @日期 2017/11/17
* @时间 14:46
* @备注 Reduce
*/
public class MyReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

List<String> textList = null;

public MyReduce() {
textList = new ArrayList<>();
textList.add("孙权");
textList.add("姜维");
}

/**
* @作者 MaxBill
* @日期 2017/11/17
* @时间 14:46
*/
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
String keyStr = new String(key.toString());
boolean isHas = textList.contains(keyStr);
if (isHas) {
System.out.println(">>>>>" + keyStr + " [" + sum + "]");
}
}

}

4.运行测试主类

从运行测试记过可以看到,已经统计出孙权和姜维在文中出现的次数。

四、本文总结

本文通过使用一些Map/Reduce框架提供的功能,实现的是的利用hadoop hdfs存储一个数据集,然后使用Hadoop Map/Reduce框架对数据进行简单的分析处理的一个小实践。在实践过程中也遇到了许多的问题,在查阅官方文档和网上资料中都一一解决了,对于Map/Reduce还有很多的内容,将在以后的内容中慢慢的学习补充。如有遗漏或者错误,欢迎提出!