Basic Usage of MapReduce
Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
        <scope>test</scope>
    </dependency>
</dependencies>
WordCount computation
The classic WordCount job counts how many times each word appears in the following text.
vi mapReduce.txt

MapReduce is a programming
paradigm that enables
massive scalability across
hundreds or thousands of
servers in a Hadoop cluster.
As the processing component,
MapReduce is the heart of Apache Hadoop.
The term "MapReduce" refers to two separate
and distinct tasks that Hadoop programs perform.
Create the directory on HDFS
hdfs dfs -mkdir /mapReduce/input
Upload the file to HDFS
hdfs dfs -put mapReduce.txt /mapReduce/input
Define the Mapper
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN    : type of K1, the byte offset of the line   LongWritable
 * VALUEIN  : type of V1, one line of text              Text
 * KEYOUT   : type of K2, a single word                 Text
 * VALUEOUT : type of V2, the fixed value 1             LongWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * The input is split into key-value pairs (K1/V1); the custom map logic
     * turns them into new key-value pairs (K2/V2) and writes them out.
     *
     *  K1   V1                        K2      V2
     *  0    hello world        ===>   hello   1
     *  11   hello mapReduce           world   1
     *
     * @param key     K1
     * @param value   V1
     * @param context MapReduce context object
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // Split each line into words
        String row = value.toString();
        String[] words = row.split(" ");
        // Emit <word, 1> for every word
        for (String word : words) {
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}
Define the Reducer
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN    : type of K2, a single word                        Text
 * VALUEIN  : type of V2, the element type of the value list   LongWritable
 * KEYOUT   : type of K3, a single word                        Text
 * VALUEOUT : type of V3, the number of times the word occurs  LongWritable
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Convert the grouped K2/V2 pairs into K3/V3.
     *
     *  new K2   new V2          K3      V3
     *  hello    <1,1>    ===>   hello   2
     *  world    <1,1,1>         world   3
     *
     * @param key     K2
     * @param values  V2
     * @param context MapReduce context object
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        LongWritable longWritable = new LongWritable();
        // Accumulate the occurrence count for each word
        for (LongWritable value : values) {
            count += value.get();
        }
        // Write the result to the MapReduce context
        longWritable.set(count);
        context.write(key, longWritable);
    }
}
Define the Job
Approach 1:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(super.getConf(), "mapreduce-test");
        // When packaged and run on the cluster, this must be enabled so the main class can be located
        // job.setJarByClass(WordCountJob.class);

        // Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node001:9000/mapReduce/input"));

        // Mapper class and the K2/V2 output types of the map phase
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // The shuffle phase uses the defaults; set the Reducer class and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node001:9000/mapReduce/output"));

        // Submit the job and wait for completion
        boolean waitForCompletion = job.waitForCompletion(true);
        return waitForCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new WordCountJob(), args);
        // A non-zero exit code indicates abnormal termination
        System.exit(run);
    }
}
Approach 2:
Copy core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml, and the other Hadoop configuration files into the project's resources directory.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Load the configuration files from the classpath
        Configuration configuration = new Configuration(true);
        // Run in local mode
        configuration.set("mapreduce.framework.name", "local");

        // Create the job
        Job job = Job.getInstance(configuration);
        // Set the job's main class
        job.setJarByClass(WordCountJob.class);
        // Set the job name
        job.setJobName("wordcount-" + System.currentTimeMillis());
        // Number of reduce tasks
        job.setNumReduceTasks(2);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("/mapReduce/input"));
        FileOutputFormat.setOutputPath(job, new Path("/mapReduce/output_" + System.currentTimeMillis()));

        // K2/V2 output types of the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // K3/V3 output types of the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}
How to run a MapReduce job
Running on a Linux server
1. Package the project into a JAR, e.g. wordcount.jar, and upload it to the Linux server.
2. On the Linux server, run: hadoop jar wordcount.jar xx.xx.xx.WordCountJob
Running locally on Windows
1. Copy core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml, and the other Hadoop configuration files into the project's resources directory.
2. Call configuration.set("mapreduce.framework.name", "local");
Running on the Linux server
Package the JAR and run it
hadoop jar wordcount.jar cn.ybzy.mapreduce.WordCountJob
bash-4.1# hadoop jar mapreduce.jar cn.ybzy.mapreduce.WordCountJob
22/02/27 09:05:54 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
22/02/27 09:05:54 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
22/02/27 09:05:55 WARN mapreduce.JobResourceUploader: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
22/02/27 09:05:55 INFO input.FileInputFormat: Total input paths to process : 1
22/02/27 09:05:55 INFO mapreduce.JobSubmitter: number of splits:1
22/02/27 09:05:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1736835367_0001
22/02/27 09:05:56 INFO mapreduce.Job: The url to track the job: http://node001:8080/
22/02/27 09:05:56 INFO mapreduce.Job: Running job: job_local1736835367_0001
22/02/27 09:05:56 INFO mapred.LocalJobRunner: OutputCommitter set in config null
22/02/27 09:05:56 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/02/27 09:05:56 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
22/02/27 09:05:56 INFO mapred.LocalJobRunner: Waiting for map tasks
22/02/27 09:05:56 INFO mapred.LocalJobRunner: Starting task: attempt_local1736835367_0001_m_000000_0
22/02/27 09:05:56 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/02/27 09:05:56 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
22/02/27 09:05:56 INFO mapred.MapTask: Processing split: hdfs://node001:9000/mapReduce/input/mapReduce.txt:0+291
22/02/27 09:05:56 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
22/02/27 09:05:56 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
22/02/27 09:05:56 INFO mapred.MapTask: soft limit at 83886080
22/02/27 09:05:56 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
22/02/27 09:05:56 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
22/02/27 09:05:57 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
22/02/27 09:05:57 INFO mapred.LocalJobRunner:
22/02/27 09:05:57 INFO mapred.MapTask: Starting flush of map output
22/02/27 09:05:57 INFO mapred.MapTask: Spilling map output
22/02/27 09:05:57 INFO mapred.MapTask: bufstart = 0; bufend = 643; bufvoid = 104857600
22/02/27 09:05:57 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214224(104856896); length = 173/6553600
22/02/27 09:05:57 INFO mapred.MapTask: Finished spill 0
22/02/27 09:05:57 INFO mapred.Task: Task:attempt_local1736835367_0001_m_000000_0 is done. And is in the process of committing
22/02/27 09:05:57 INFO mapred.LocalJobRunner: map
22/02/27 09:05:57 INFO mapred.Task: Task 'attempt_local1736835367_0001_m_000000_0' done.
22/02/27 09:05:57 INFO mapred.LocalJobRunner: Finishing task: attempt_local1736835367_0001_m_000000_0
22/02/27 09:05:57 INFO mapred.LocalJobRunner: map task executor complete.
22/02/27 09:05:57 INFO mapred.LocalJobRunner: Waiting for reduce tasks
22/02/27 09:05:57 INFO mapred.LocalJobRunner: Starting task: attempt_local1736835367_0001_r_000000_0
22/02/27 09:05:57 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/02/27 09:05:57 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
22/02/27 09:05:57 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@16e20f40
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=333971456, maxSingleShuffleLimit=83492864, mergeThreshold=220421168, ioSortFactor=10, memToMemMergeOutputsThreshold=10
22/02/27 09:05:57 INFO reduce.EventFetcher: attempt_local1736835367_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
22/02/27 09:05:57 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1736835367_0001_m_000000_0 decomp: 733 len: 737 to MEMORY
22/02/27 09:05:57 INFO reduce.InMemoryMapOutput: Read 733 bytes from map-output for attempt_local1736835367_0001_m_000000_0
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 733, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->733
22/02/27 09:05:57 INFO mapreduce.Job: Job job_local1736835367_0001 running in uber mode : false
22/02/27 09:05:57 INFO mapreduce.Job: map 100% reduce 0%
22/02/27 09:05:57 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
22/02/27 09:05:57 INFO mapred.LocalJobRunner: 1 / 1 copied.
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
22/02/27 09:05:57 INFO mapred.Merger: Merging 1 sorted segments
22/02/27 09:05:57 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 719 bytes
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: Merged 1 segments, 733 bytes to disk to satisfy reduce memory limit
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: Merging 1 files, 737 bytes from disk
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
22/02/27 09:05:57 INFO mapred.Merger: Merging 1 sorted segments
22/02/27 09:05:57 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 719 bytes
22/02/27 09:05:57 INFO mapred.LocalJobRunner: 1 / 1 copied.
22/02/27 09:05:57 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
22/02/27 09:05:58 INFO mapred.Task: Task:attempt_local1736835367_0001_r_000000_0 is done. And is in the process of committing
22/02/27 09:05:58 INFO mapred.LocalJobRunner: 1 / 1 copied.
22/02/27 09:05:58 INFO mapred.Task: Task attempt_local1736835367_0001_r_000000_0 is allowed to commit now
22/02/27 09:05:58 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1736835367_0001_r_000000_0' to hdfs://node001:9000/mapReduce/output/_temporary/0/task_local1736835367_0001_r_000000
22/02/27 09:05:58 INFO mapred.LocalJobRunner: reduce > reduce
22/02/27 09:05:58 INFO mapred.Task: Task 'attempt_local1736835367_0001_r_000000_0' done.
22/02/27 09:05:58 INFO mapred.LocalJobRunner: Finishing task: attempt_local1736835367_0001_r_000000_0
22/02/27 09:05:58 INFO mapred.LocalJobRunner: reduce task executor complete.
22/02/27 09:05:58 INFO mapreduce.Job: map 100% reduce 100%
22/02/27 09:05:58 INFO mapreduce.Job: Job job_local1736835367_0001 completed successfully
22/02/27 09:05:58 INFO mapreduce.Job: Counters: 35
	File System Counters
		FILE: Number of bytes read=1864
		FILE: Number of bytes written=551289
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=582
		HDFS: Number of bytes written=343
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=9
		Map output records=44
		Map output bytes=643
		Map output materialized bytes=737
		Input split bytes=120
		Combine input records=0
		Combine output records=0
		Reduce input groups=38
		Reduce shuffle bytes=737
		Reduce input records=44
		Reduce output records=38
		Spilled Records=88
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=22
		Total committed heap usage (bytes)=488112128
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=291
	File Output Format Counters
		Bytes Written=343
bash-4.1#
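After the job completes, the result can be read back from the output directory set in the driver, for example:

hdfs dfs -cat /mapReduce/output/part-r-00000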
Fixing a job that hangs during execution
Option 1
Most advice online says the memory managed by YARN is insufficient; edit yarn-site.xml and raise the resource settings (a sketch of typical settings follows).
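A sketch of the kind of yarn-site.xml settings usually suggested for this; the values are illustrative only and should be tuned to the actual machine:

<!-- Total memory the NodeManager may hand out to containers -->
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>4096</value>
</property>
<!-- Smallest and largest container allocations the scheduler will grant -->
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
</property>
<property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>4096</value>
</property>
<!-- Disable the virtual-memory check that often kills containers on small VMs -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>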
Option 2
Modify mapred-site.xml and change the relevant property value (one commonly cited form of the change is sketched below).
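The exact snippet varies; one commonly cited form of this fix, assuming the intent is to stop submitting to YARN and run the job with the local runner instead, switches mapreduce.framework.name:

<!-- Before: jobs are submitted to YARN -->
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>

<!-- After: jobs run with the local runner -->
<property>
    <name>mapreduce.framework.name</name>
    <value>local</value>
</property>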
Partitioning with a Partitioner
Partition the word counts for the mapReduce.txt file above across multiple reduce tasks.
Define the Partitioner
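A minimal sketch of a custom Partitioner for the WordCount K2/V2 types; the class name and the partitioning rule (first letter a-m goes to partition 0, everything else to partition 1) are illustrative assumptions:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes each word (K2) to one of two partitions based on its first letter.
 */
public class WordCountPartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String word = key.toString().toLowerCase();
        // Words starting with a-m go to partition 0, the rest to partition 1
        if (!word.isEmpty() && word.charAt(0) >= 'a' && word.charAt(0) <= 'm') {
            return 0;
        }
        return numPartitions > 1 ? 1 : 0;
    }
}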
Use the Partitioner
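A sketch of the driver changes, assuming the WordCount job from above and the WordCountPartitioner class named in the previous sketch:

// Register the custom partitioner and run one reduce task per partition,
// so each partition is written to its own output file
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);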
Test
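A sketch of how the test might be run, assuming the job is packaged as wordcount.jar and writes to /mapReduce/output; with two reduce tasks, each partition ends up in its own part file:

hadoop jar wordcount.jar cn.ybzy.mapreduce.WordCountJob
hdfs dfs -cat /mapReduce/output/part-r-00000
hdfs dfs -cat /mapReduce/output/part-r-00001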
Serialization and sorting
Sort the data (letters and numbers) in a text file.
Prepare the data
Prepare sort.txt
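A minimal sort.txt, assuming the two-column layout (a letter followed by a number, separated by a single space) that the sketches below rely on:

a 1
a 9
b 3
a 7
b 5
b 10

Upload it the same way as before, for example:

hdfs dfs -mkdir -p /mapReduce/sort/input
hdfs dfs -put sort.txt /mapReduce/sort/input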
Define MyPairWritable
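A minimal sketch of the composite key, assuming the two-column sort.txt format above; the field names and the sort order (letter ascending, then number ascending) are assumptions. Implementing WritableComparable makes the type both serializable by Hadoop and sortable in the shuffle phase:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyPairWritable implements WritableComparable<MyPairWritable> {

    private String first;
    private int second;

    // No-arg constructor required for Hadoop deserialization
    public MyPairWritable() {
    }

    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the two fields in a fixed order
        out.writeUTF(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize in the same order as write()
        first = in.readUTF();
        second = in.readInt();
    }

    @Override
    public int compareTo(MyPairWritable other) {
        // Sort by letter first, then by number, both ascending
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : Integer.compare(second, other.second);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}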
Define SortMapper
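A sketch of the mapper, assuming the space-separated sort.txt format: each line becomes a MyPairWritable key so the shuffle phase sorts the records, and NullWritable is used because no value is needed:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, MyPairWritable, NullWritable> {

    private final MyPairWritable pair = new MyPairWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split "letter number" and wrap both fields into the composite key
        String[] fields = value.toString().split(" ");
        pair.set(fields[0], Integer.parseInt(fields[1]));
        context.write(pair, NullWritable.get());
    }
}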
Define SortReduce
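A sketch of the reducer: the keys arrive already sorted, so it simply writes them out, iterating the values so that duplicate lines sharing the same key are preserved:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReduce extends Reducer<MyPairWritable, NullWritable, MyPairWritable, NullWritable> {

    @Override
    protected void reduce(MyPairWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit the key once per occurrence to keep duplicates
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}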
Define the Job
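A sketch of the driver, following the same pattern as the WordCount driver above; the class name, job name, and paths are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortJob {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        Job job = Job.getInstance(configuration, "sort-" + System.currentTimeMillis());
        job.setJarByClass(SortJob.class);

        // Input and output paths on HDFS
        FileInputFormat.setInputPaths(job, new Path("/mapReduce/sort/input"));
        FileOutputFormat.setOutputPath(job, new Path("/mapReduce/sort/output_" + System.currentTimeMillis()));

        // Map phase: composite key, no value
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(MyPairWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce phase: write the sorted keys back out
        job.setReducerClass(SortReduce.class);
        job.setOutputKeyClass(MyPairWritable.class);
        job.setOutputValueClass(NullWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}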
Test
Package the code, upload it, and run it.
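A sketch of the run, assuming the packaged jar is named sort.jar and the paths from the SortJob sketch above:

hadoop jar sort.jar cn.ybzy.mapreduce.SortJob
hdfs dfs -cat /mapReduce/sort/output_*/part-r-00000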
Counters
Hadoop's built-in counters
Via the Context object
Define a counter and obtain it from the context object to record values.
Use a counter obtained from the context object to count how many records the Map phase reads.
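A minimal sketch of the idea, reusing the WordCountMapper from above; the counter group and name (MR_COUNTER / MAP_INPUT_RECORDS) are arbitrary illustrative strings:

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fetch (or lazily create) the counter from the context and bump it once per input record
        context.getCounter("MR_COUNTER", "MAP_INPUT_RECORDS").increment(1L);

        // ... the word-splitting logic from the WordCountMapper above stays unchanged ...
    }
}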
Via an Enum
Define counters with an enum type.
Use an enum-based counter to count how many records the Reduce phase reads.
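A minimal sketch, reusing the WordCountReducer from above; the enum and constant names are illustrative assumptions:

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Counter defined as an enum; Hadoop groups it under the enum's class name
    public enum Counters {
        REDUCE_INPUT_RECORDS
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            // One increment per value read in the reduce phase
            context.getCounter(Counters.REDUCE_INPUT_RECORDS).increment(1L);
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}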
The Combiner
A Combiner performs a local merge of each map task's output before the shuffle, reducing the amount of data transferred between the map and reduce nodes and improving network I/O performance.
Prepare the data
Count the occurrences of each word in the sample text.
Define a custom Combiner
Define a custom Combiner by extending Reducer and overriding the reduce method.
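A minimal sketch for the word-count job; the class name MyCombiner is an assumption. The logic matches WordCountReducer because summing is associative and commutative, which is what makes it safe to apply on the map side:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by this map task locally before the shuffle
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}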
Set the Combiner on the Job
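A sketch of the driver change, assuming the MyCombiner class from the previous sketch:

// Register the combiner alongside the mapper and reducer settings
job.setCombinerClass(MyCombiner.class);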
Test
Word count without the Combiner (in the job counters, Combine input records and Combine output records stay at 0).
Word count with the Combiner (the Combine counters become non-zero and the shuffled data volume drops).
Original article: https://juejin.cn/post/7097121027850764319