IDEA常用插件 Mybatis源码分析(1) - Mybatis包目录简介 tair 1:tair学习 dataflow论文阅读笔记 Polysh的安装使用 SnappyData排序函数比较 Squirrel-sql客户端连接SnappyData手册 在虚拟机里显示Hello World spark学习博客推荐 SnappyData学习博客和官网文章 Docker常用命令 MyBatis支持的OGNL语法 mysql性能优化 mysql性能优化-优化Sql语句 java各版本新特性 mac上命令行操作 explain输出格式 从文件中读取zk配置——ZooKeeper编程技能(1) git进阶经验-从项目中删除移除的目录 Mongodb 学习之shell命令操作(3) mysql命令 git进阶经验-从多模块项目中分理子模块 从零学hadoop-搭建基础(单点)的Hdfs环境 ZooKeeper集群操作脚本 Vue安装使用 2 初学JVM之问答式记住类加载机制 2 初学JVM之问答式记住虚拟机性能监控与故障处理工具 2 初学JVM之问答式记住垃圾收集器 log4j2 按天生成日志文件 1 初学JVM之问答式记住java自动内存管理 MapReduce学习心得之MapReduce初识 log4j2 日志发送到kafka配置实战 log4j2 日志配置实战 Mongodb 学习之shell命令操作(二) Mongodb 学习之linux版本安装(一) Dubbo的初级使用 ServiceLoader内部实现分析 ServiceLoader 初级应用 log4j日志发送邮件配置实战 红黑树笔记 IDEA首次使用之前的配置 java源码学习之Enum java源码学习之String 自定义Spring tag标签 编写一键发布脚本 记一次Spring Scheduler莫名不执行的一次堆栈分析 kafka的基本操作 nginx 5:Nginx内部变量 nginx 4:Nginx日志管理 提高hadoop配置效率的shell脚本 Hive编程指南之一 Hive的安装 Ambari服务器安装 Ambari服务器管理集群 HBase分布式安装 windows下Eclipse远程调试运行MR程序 基于MapReduce新的API的编程Demo-wordCount window下Eclipse远程只读HDFS上的文件 YARN上节点标签 编写第一个MapReduce的wordcount程序 NodeManager的重启 搭建JStorm集群 YARN上的web应用代理 YARN上的ResourceManager的高可用方案 配置vmware中的虚拟机使用宿主机的共享网络 YARN架构简述 HDFS 架构 Spring的统一异常处理机制 Tomcat 配置服务 HDFS的viewfs指南 HDFS的Federation之路 HDFS基于QJM的HA之路 nginx 3:Nginx反向代理 mybatis操作主体流程 1.正则表达式学习-基础篇 log4j日志配置详解 mysql的时间函数 nginx 2:Nginx模块配置理论及实战 HashMap相关解析和测试文章 工作一年后的面试 用私有构造器或枚举类型强化Singleton属性 java中比较重要的图 mybatis处理枚举类 mybatis集成进spring Spring比较重要的几个截图 21.hadoop-2.7.2官网文档翻译-使用NFS的HDFS高可用性 20.hadoop-2.7.2官网文档翻译-使用仲裁日志管理器的HDFS高可用性 markdown在jekyll中支持的一些操作 Spring项目中配置sl4j和log4j的日志配置 19.hadoop-2.7.2官网文档翻译-HDFS命令指南 Spring的profile机制介绍 mybatis-generator反向生成 18.hadoop-2.7.2官网文档翻译-HDFS用户指南 17.hadoop-2.7.2官网文档翻译-实现Hadoop中Dapper-like追踪 16.hadoop-2.7.2官网文档翻译-Hadoop的KMS(key 管理服务器)-文档集 15.hadoop-2.7.2官网文档翻译-Hadoop的http web认证 14.hadoop-2.7.2官网文档翻译-服务级别的授权指南 13.hadoop-2.7.2官网文档翻译-安全模式中的Hadoop 09.hadoop-2.7.2官网文档翻译-Hadoop命令行微型集群 12.hadoop-2.7.2官网文档翻译-机架感知 11.hadoop-2.7.2官网文档翻译-代理用户-超级用户对其他用户的代表 10.hadoop-2.7.2官网文档翻译-原生库指南 08.hadoop-2.7.2官网文档翻译-文件系统规范 07.hadoop-2.7.2官网文档翻译-Hadoop接口类别 (转)浅析 Decorator 模式,兼谈 CDI Decorator 注解 06.hadoop-2.7.2官网文档翻译-Hadoop的兼容性 05.hadoop-2.7.2官网文档翻译-文件系统命令 04.hadoop-2.7.2官网文档翻译-Hadoop命令指南 03.hadoop-2.7.2官网文档翻译-集群安装 02.hadoop-2.7.2官网文档翻译-单节点集群安装 01.hadoop-2.7.2官网文档翻译-概述 Http 协议相应状态码大全及常用状态码 IDEA快捷键 JDBC Type与Java Type redis 12:redis 操作集合 mybatis-generator错误集合 redis 11:redis 错误集合 nginx 1:nginx的安装 redis 10:redis cluster命令操作 redis 9:redis实例集群安装 java设计模式 hadoop集群学习笔记(1) Apache Shiro 简介 vim编辑神器的进阶命令 Eclipse配置 Eclipse快捷键 Linux 测试题 Linux脚本学习(1) Linux启动简要过程 Centos7上安装Mysql hadoop集群学习笔记(1) (转)分布式发布订阅消息系统 Kafka 架构设计 maven 命令 Kafka集群安装 Kafka初步使用 redis 8:redis server 和 scripting命令操作 redis 7:redis transaction 和 connection命令操作 redis 6:redis hash 命令操作 redis 5:redis sorted_set 命令操作 搭建本地Jekyll+Markdown+Github的开发环境 Spring源码阅读笔记(2) redis 4:redis set命令操作 Spring添加任务调度配置 redis 3:Redis list命令操作 redis 2:redis 一般命令操作 redis 1:redis单机安装笔记 redis 0:redis配置属性描述 Spring源码阅读笔记(1) spark 错误集锦 spark集群安装 Linux 基本命令操作 Hadoop错误信息处理 Hadoop代码拾忆 从零开始搭建spring-springmvc-mybatis-mysql和dubbo项目 java知识点札记 java排错 Google Java Style 中文版 git进阶经验 github使用经验 MongoDB用户角色授权与AUTH启用 MongoDB 命令 MongoDB 特定规范 Spring MVC实现跳转的几种方式 史上最全最强SpringMVC详细示例实战教程 Spring 零星笔记 js中(function(){…})()立即执行函数写法理解 如何解决跨域问题 创建ajax简单过程 前端定位 设置MYSQL允许通过IP访问 mybatis异常 :元素内容必须由格式正确的字符数据或标记组成 如何为 WordPress 绑定多个域名的方法s WordPress工作原理之程序文件执行顺序(传说中的架构源码分析) Spring源码导入Eclipse中 基于PHPnow搭建Eclipse开发环境 解决wordpress首页文章内容截断处理的几种方法 ZooKeeper理论知识 ZooKeeper集群安装配置 Git常用命令速查表 Linux 4:磁盘与文件系统管理 Linux 3:文件与目录管理 Linux 2:文件权限与目录配置 Markdown输入LaTeX数学公式
从零学hadoop-搭建基础(单点)的Hdfs环境 MapReduce学习心得之MapReduce初识 Ambari服务器安装 Ambari服务器管理集群 windows下Eclipse远程调试运行MR程序 基于MapReduce新的API的编程Demo-wordCount window下Eclipse远程只读HDFS上的文件 YARN上节点标签 编写第一个MapReduce的wordcount程序 NodeManager的重启 YARN上的web应用代理 YARN上的ResourceManager的高可用方案 YARN架构简述 HDFS 架构 HDFS的viewfs指南 HDFS的Federation之路 HDFS基于QJM的HA之路 21.hadoop-2.7.2官网文档翻译-使用NFS的HDFS高可用性 20.hadoop-2.7.2官网文档翻译-使用仲裁日志管理器的HDFS高可用性 19.hadoop-2.7.2官网文档翻译-HDFS命令指南 18.hadoop-2.7.2官网文档翻译-HDFS用户指南 17.hadoop-2.7.2官网文档翻译-实现Hadoop中Dapper-like追踪 16.hadoop-2.7.2官网文档翻译-Hadoop的KMS(key 管理服务器)-文档集 15.hadoop-2.7.2官网文档翻译-Hadoop的http web认证 14.hadoop-2.7.2官网文档翻译-服务级别的授权指南 13.hadoop-2.7.2官网文档翻译-安全模式中的Hadoop 09.hadoop-2.7.2官网文档翻译-Hadoop命令行微型集群 12.hadoop-2.7.2官网文档翻译-机架感知 11.hadoop-2.7.2官网文档翻译-代理用户-超级用户对其他用户的代表 10.hadoop-2.7.2官网文档翻译-原生库指南 08.hadoop-2.7.2官网文档翻译-文件系统规范 07.hadoop-2.7.2官网文档翻译-Hadoop接口类别 06.hadoop-2.7.2官网文档翻译-Hadoop的兼容性 05.hadoop-2.7.2官网文档翻译-文件系统命令 04.hadoop-2.7.2官网文档翻译-Hadoop命令指南 03.hadoop-2.7.2官网文档翻译-集群安装 02.hadoop-2.7.2官网文档翻译-单节点集群安装 01.hadoop-2.7.2官网文档翻译-概述 hadoop集群学习笔记(1) hadoop集群学习笔记(1) Hadoop错误信息处理 Hadoop代码拾忆

基于MapReduce新的API的编程Demo-wordCount

2016年12月14日
摘要:该MapReduce是基于MR的新API开发,基于hadoop2.7环境运行

概述

进行MapReduce开发暂且认为分为8步,依次为:

  1. 获取输入输出路径参数
  2. 删除已经存在输出目录
  3. 根据系统类型进行configuration的配置
  4. 获取Job实例
  5. 为Job配置Jar,MapperClass,CombinerClas,ReducerClass
  6. 为Job配置输出的key和value类型
  7. 设置文件格式化
  8. 提交Job并等待完成

准备环境

  • hadoop2.7.2 集群环境(三个节点,h2m1,h2s1,h2s2)
  • jdk 1.7.0_75版本
  • centos6.5系统

该MR代码支持输入源为多个文件或多个目录,不可以文件和目录混合作为输入源

搭建程序

使用eclipse新建maven程序,开发在window环境,运行在linux环境

在maven的pom.xml文件中配置

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <artifactId>brief-hadoop-demo</artifactId>
  <properties>
  	<hadoop.version>2.7.2</hadoop.version>
  </properties>
  <dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- 测试 -->
    <dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
	</dependency>
	
    <!-- 日志 -->
	 <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
   	</dependency>
   	<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
   	</dependency>
   	<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
   	</dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>
  </dependencies>
</project>

新建三个类:WordCountV2.java,WordCountMapperV2.java,WordCountReduceV2.java,HDFSOper.java

四个类的内容:

WordCountV2.java


package cn.followtry.hadoop.demo.v2.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import cn.followtry.hadoop.demo.hdfs.HDFSOper;

/**
 * 
 *  brief-hadoop-demo/cn.followtry.hadoop.demo.v2.mr.WordCountV2
 * @author 
 *		jingzz 
 * @since 
 *		2016年12月14日 上午10:03:48
 */
public class WordCountV2 {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs == null || otherArgs.length < 2) {
			System.out.println("用法:\n"
					+ "     至少需要两个参数,最后一个为输出目录,其他为输入文件路径");
			System.exit(-1);
		}
		StringBuilder inputPaths = new StringBuilder();
		String outpathDir;
		int len = otherArgs.length - 1;
		for (int i = 0; i < len; i++) {
			inputPaths.append(otherArgs[i]);
			if (i < len - 1) {
				inputPaths.append(",");
			}
		}
		outpathDir = otherArgs[len];
		//检查输出目录是否存在,存在则直接删除目录
		HDFSOper.rmExistsOutputDir(outpathDir);
		
		Job job = Job.getInstance(conf, "wordCount v2 demo");
		
		job.setJarByClass(WordCountV2.class);
		
		job.setMapperClass(WordCountMapV2.class);
		job.setCombinerClass(WordCountReduceV2.class);
		job.setReducerClass(WordCountReduceV2.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.setInputPaths(job, inputPaths.toString());
		FileOutputFormat.setOutputPath(job, new Path(outpathDir));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

WordCountMapperV2.java文件

package cn.followtry.hadoop.demo.v2.mr;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 
 *  brief-hadoop-demo/cn.followtry.hadoop.demo.v2.mr.WordCountMapV2
 * @author 
 *		jingzz 
 * @since 
 *		2016年12月14日 上午10:25:07
 */
public class WordCountMapV2 extends Mapper<LongWritable, Text, Text, IntWritable>{

	private static final int ONE = 1;
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (StringUtils.isNotEmpty(line)) {
			String[] words = line.split(" ");
			for (String word : words) {
				context.write(new Text(word), new IntWritable(ONE));
			}
		}
	}
}

WordCountReduceV2.java文件

package cn.followtry.hadoop.demo.v2.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 *  brief-hadoop-demo/cn.followtry.hadoop.demo.v2.mr.WordCountReduceV2
 * @author 
 *		jingzz 
 * @since 
 *		2016年12月14日 上午10:25:28
 */
public class WordCountReduceV2 extends Reducer<Text, IntWritable, Text, IntWritable> {

	private static final Logger LOGGER = LoggerFactory.getLogger(WordCountReduceV2.class);
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		int count = 0;
		for (IntWritable intValue : values) {
			count += intValue.get();
		}
		LOGGER.info("统计{}的次数为{}", key, count);
		context.write(key, new IntWritable(count));
	}

}

HDFSOper.java文件

package cn.followtry.hadoop.demo.hdfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 *  brief-hadoop-demo/cn.followtry.hadoop.demo.hdfs.HDFSOper
 * @author 
 *		jingzz 
 * @since 
 *		2016年12月14日 上午10:26:23
 */
public class HDFSOper {

	private static final Logger LOGGER = LoggerFactory.getLogger(HDFSOper.class);

	/**
	 * 删除指定的目录
	 * @author jingzz
	 * @param outpathDir 指定要删除的目录路径
	 * @return 删除返回true,不存在或没有删除返回false
	 * @throws FileNotFoundException 文件未找到异常
	 * @throws IOException IO异常
	 */
	public static boolean rmExistsOutputDir(String outpathDir) throws FileNotFoundException, IOException {
		boolean hasDel = false;
		// 将本地文件上传到hdfs。
		Configuration config = new Configuration();
		FileSystem fs = FileSystem.get(URI.create("webhdfs://h2m1:50070"), config);
		Path output = new Path(outpathDir);
		if (hasDel = fs.exists(output)) {
			LOGGER.info("目录{}已经存在,正在删除...", outpathDir);
			System.out.println("目录" + outpathDir + "已经存在,正在删除...");
			if (hasDel = fs.delete(output, true)) {
				System.out.println("目录" + outpathDir + "已经删除");
				LOGGER.info("目录{}已经删除", outpathDir);
			} else {
				System.out.println("目录" + outpathDir + "删除失败");
				LOGGER.info("目录{}删除失败", outpathDir);

			}
		} else {
			System.out.println("目录" + outpathDir + "不存在");
			LOGGER.info("目录{}不存在", outpathDir);
		}
		return hasDel;
	}
}

打包发布

打包

项目(右键) –> Export –> java(jar file) –> next –> jar file(browse,指定输出位置) –> finish。

上传到hadoop linux服务器

创建并将输入文件上传到hdfs

比如:

输入文件file1.txt内容如下:

hello world
hello world
hello world2
hello world2
hello world3
hello world4
hello world5
hello world5
hello world5
hello world6
hello world7
hello world8
hello world8

执行hdfs dfs -put -f file1.txt /user/root/input/file1.txt命令,上传输入文件

执行

hadoop jar wordcount.jar cn.followtry.hadoop.demo.v2.mr.WordCountV2 /user/root/input/file1.txt /user/root/output/ 或者 hadoop jar wordcount.jar cn.followtry.hadoop.demo.v2.mr.WordCountV2 viewfs://hadoop-cluster-jingzz/user/root/input/file1.txt /user/root/output/

输入为全路径,hadoop-cluster-jingzz为RM的集群名称。

部分执行日志显示:

16/12/13 18:10:07 INFO hdfs.HDFSOper: 目录/user/root/output已经存在,正在删除...
目录/user/root/output已经存在,正在删除...
目录/user/root/output已经删除
16/12/13 18:10:07 INFO hdfs.HDFSOper: 目录/user/root/output已经删除
16/12/13 18:10:08 INFO input.FileInputFormat: Total input paths to process : 2
16/12/13 18:10:08 INFO mapreduce.JobSubmitter: number of splits:2
16/12/13 18:10:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1481615539888_0016
16/12/13 18:10:09 INFO impl.YarnClientImpl: Submitted application application_1481615539888_0016
16/12/13 18:10:09 INFO mapreduce.Job: The url to track the job: http://h2m1:8088/proxy/application_1481615539888_0016/
16/12/13 18:10:09 INFO mapreduce.Job: Running job: job_1481615539888_0016
16/12/13 18:10:18 INFO mapreduce.Job: Job job_1481615539888_0016 running in uber mode : false
16/12/13 18:10:18 INFO mapreduce.Job:  map 0% reduce 0%
16/12/13 18:10:28 INFO mapreduce.Job:  map 100% reduce 0%
16/12/13 18:10:36 INFO mapreduce.Job:  map 100% reduce 100%
16/12/13 18:10:36 INFO mapreduce.Job: Job job_1481615539888_0016 completed successfully
16/12/13 18:10:36 INFO mapreduce.Job: Counters: 54
        File System Counters
                FILE: Number of bytes read=160
                FILE: Number of bytes written=356056
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=436
                HDFS: Number of bytes written=99
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
                VIEWFS: Number of bytes read=0
                VIEWFS: Number of bytes written=0
                VIEWFS: Number of read operations=0
                VIEWFS: Number of large read operations=0
                VIEWFS: Number of write operations=0
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=13091
                Total time spent by all reduces in occupied slots (ms)=5876
                Total time spent by all map tasks (ms)=13091
                Total time spent by all reduce tasks (ms)=5876
                Total vcore-seconds taken by all map tasks=13091
                Total vcore-seconds taken by all reduce tasks=5876
                Total megabyte-seconds taken by all map tasks=13405184
                Total megabyte-seconds taken by all reduce tasks=6017024
        Map-Reduce Framework
                Map input records=15
                Map output records=30
                Map output bytes=314
                Map output materialized bytes=166
                Input split bytes=242
                Combine input records=30
                Combine output records=12
                Reduce input groups=11
                Reduce shuffle bytes=166
                Reduce input records=12
                Reduce output records=11
                Spilled Records=24
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=268
                CPU time spent (ms)=2090
                Physical memory (bytes) snapshot=520069120
                Virtual memory (bytes) snapshot=2547580928
                Total committed heap usage (bytes)=281157632
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=0
        File Output Format Counters 
                Bytes Written=0

执行命令:hf -cat /user/root/output/part-r-00000

显示执行结果:

hello   13
world   2
world2  2
world3  1
world4  1
world5  3
world6  1
world7  1
world8  2