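ETL goals
Parse the log data we have collected and save the parsed data to HBase. The main reason for choosing HBase as the store is:
HBase's wide-table design suits data that arrives in many formats (each event type has its own set of fields).
During ETL we process the collected data, including IP address resolution, userAgent parsing, and server time parsing.
In this project, IP resolution uses the 纯真 (cz88) IP database; the official site is http://www.cz88.net/
Alternatively, IP addresses can be resolved through the IP interface provided by Taobao:
Address: http://ip.taobao.com/
Interface: http://ip.taobao.com/service/getIpInfo.php?ip=[IP address string]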
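ETL storage
The ETL results are stored in HBase because different events carry different fields. The table uses a single column family, and the row key is generated as the server timestamp followed by a CRC32 checksum of the uuid, member id, and event alias. HBase create command: create 'event_logs', 'info'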
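The row key layout described above can be sketched with the JDK alone. This is a minimal illustration, not the project's mapper: the class name RowKeySketch and the sample uuid are hypothetical, and UTF-8 is assumed for the byte encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Sketch: row key = server time in ms + "_" + CRC32(uuid + memberId + event alias) mod 10^8. */
final class RowKeySketch {

    static String generateRowKey(String uuid, String memberId, String eventAlias, long serverTimeMillis) {
        CRC32 crc32 = new CRC32();
        // Blank uuid or memberId values are skipped, so they do not affect the checksum.
        if (uuid != null && !uuid.trim().isEmpty()) {
            crc32.update(uuid.getBytes(StandardCharsets.UTF_8));
        }
        if (memberId != null && !memberId.trim().isEmpty()) {
            crc32.update(memberId.getBytes(StandardCharsets.UTF_8));
        }
        crc32.update(eventAlias.getBytes(StandardCharsets.UTF_8));
        // Keep at most eight digits of the checksum to bound the key length.
        return serverTimeMillis + "_" + (crc32.getValue() % 100_000_000L);
    }

    public static void main(String[] args) {
        // Hypothetical sample values.
        System.out.println(generateRowKey("uuid-0001", null, "e_pv", 1449410796976L));
    }
}
```

Because the key starts with the timestamp, rows from the same period sit next to each other in HBase's sorted key space, and the checksum suffix separates events that share a millisecond.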
The steps are as follows:
1. Edit the pom file and add the Hadoop and HBase dependencies
<dependencies>
    <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>servlet-api</artifactId>
        <version>2.5</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>cz.mallat.uasparser</groupId>
        <artifactId>uasparser</artifactId>
        <version>0.6.2</version>
    </dependency>
    <!-- hadoop start -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <!-- hadoop end -->
    <!-- hbase start -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.1.3</version>
    </dependency>
    <!-- hbase end -->
    <!-- mysql start -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.18</version>
    </dependency>
    <!-- mysql end -->
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.cloudera.htrace</groupId>
        <artifactId>htrace-core</artifactId>
        <version>2.04</version>
    </dependency>
    <dependency>
        <groupId>org.apache.htrace</groupId>
        <artifactId>htrace-core</artifactId>
        <version>3.1.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.htrace</groupId>
        <artifactId>htrace-core4</artifactId>
        <version>4.0.1-incubating</version>
    </dependency>
</dependencies>
2. Add the LoggerUtils class, which relies on the EventLogConstants constants class and the TimeUtil utility class
LoggerUtils parses a log line and returns a Map
package com.kk.etl.util;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.kk.common.EventLogConstants;
import com.kk.etl.util.IpSeekerExt.RegionInfo;
import com.kk.etl.util.UserAgentUtil.UserAgentInfo;
import com.kk.util.TimeUtil;

public class LoggerUtils {
    private static final Logger logger = Logger.getLogger(LoggerUtils.class);
    private static IpSeekerExt ipSeekerExt = new IpSeekerExt();

    /**
     * Parses the log line logText and returns the result as a map.<br/>
     * If logText is not in the expected format, an empty map is returned.
     *
     * @param logText raw log line
     * @return parsed fields, or an empty map
     */
    public static Map<String, String> handleLog(String logText) {
        Map<String, String> clientInfo = new HashMap<String, String>();
        if (logText != null && !logText.isEmpty()) {
            String[] splits = logText.split(EventLogConstants.LOG_SEPARTIOR);
            if (splits.length == 4) {
                // Log format: ip^A server time^A host^A request parameters
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_IP, splits[0].trim()); // set the ip
                // set the server time
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
                        String.valueOf(TimeUtil.parseNginxServerTime2Long(splits[1].trim())));
                int index = splits[3].indexOf("?");
                if (index > -1) {
                    // the request parameters, i.e. the data we collected
                    String requestBody = splits[3].substring(index + 1);
                    // process the request parameters
                    handleRequestBody(requestBody, clientInfo);
                    // process the userAgent
                    handleUserAgent(clientInfo);
                    // process the ip address
                    handleIp(clientInfo);
                } else {
                    // malformed data
                    clientInfo.clear();
                }
            }
        }
        return clientInfo;
    }

    /**
     * Processes the request parameters.
     *
     * @param requestBody query string after the '?'
     * @param clientInfo  map that receives the decoded parameters
     */
    private static void handleRequestBody(String requestBody, Map<String, String> clientInfo) {
        String[] requestParams = requestBody.split("&");
        for (String param : requestParams) {
            if (param != null && !param.isEmpty()) {
                int index = param.indexOf("=");
                if (index < 0) {
                    logger.warn("Cannot parse parameter: " + param + ", request parameters: " + requestBody);
                    continue;
                }
                try {
                    String p1 = param.substring(0, index);
                    String p2 = URLDecoder.decode(param.substring(index + 1), "utf-8");
                    if (StringUtils.isNotBlank(p1) && StringUtils.isNotBlank(p2)) {
                        clientInfo.put(p1, p2);
                    }
                } catch (UnsupportedEncodingException e) {
                    logger.warn("Exception while decoding", e);
                }
            }
        }
    }

    /**
     * Processes the ip address.
     *
     * @param clientInfo map holding the ip; receives country, province, and city
     */
    private static void handleIp(Map<String, String> clientInfo) {
        if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_IP)) {
            String ip = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_IP);
            RegionInfo info = ipSeekerExt.analyticIp(ip);
            if (info != null) {
                // fall back to a fixed region when nothing could be resolved
                if (info.getCountry().equals("unknown") && info.getCity().equals("unknown")
                        && info.getProvince().equals("unknown")) {
                    info.setCountry("中国");
                    info.setProvince("广东省");
                    info.setCity("广州市");
                }
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity());
            }
        }
    }

    /**
     * Processes the browser's userAgent information.
     *
     * @param clientInfo map holding the userAgent; receives OS and browser fields
     */
    private static void handleUserAgent(Map<String, String> clientInfo) {
        if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)) {
            UserAgentInfo info = UserAgentUtil
                    .analyticUserAgent(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT));
            if (info != null) {
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, info.getOsName());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, info.getOsVersion());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, info.getBrowserName());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, info.getBrowserVersion());
            }
        }
    }
}
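To make the log format concrete, here is a stripped-down sketch of the same parsing idea using only the JDK. It assumes the separator is the two literal characters ^A, as the LOG_SEPARTIOR regex suggests. The class name LogLineSketch, the sample host, and the request path are hypothetical, and unlike LoggerUtils it keeps the raw server time string and skips the IP and userAgent enrichment.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: split "ip^Aserver time^Ahost^Arequest" and decode the query parameters. */
final class LogLineSketch {

    static Map<String, String> parse(String logText) {
        Map<String, String> info = new LinkedHashMap<>();
        if (logText == null || logText.isEmpty()) {
            return info;
        }
        String[] parts = logText.split("\\^A");
        if (parts.length != 4) {
            return info; // not the expected four fields
        }
        int q = parts[3].indexOf('?');
        if (q < 0) {
            return info; // no query string, treat as malformed
        }
        info.put("ip", parts[0].trim());
        info.put("s_time", parts[1].trim()); // raw value; the real code converts it to millis
        for (String param : parts[3].substring(q + 1).split("&")) {
            int eq = param.indexOf('=');
            if (eq <= 0) {
                continue; // no name or no '=' sign
            }
            try {
                String value = URLDecoder.decode(param.substring(eq + 1), "UTF-8");
                if (!value.trim().isEmpty()) {
                    info.put(param.substring(0, eq), value);
                }
            } catch (UnsupportedEncodingException | IllegalArgumentException e) {
                // skip parameters that cannot be decoded
            }
        }
        return info;
    }

    public static void main(String[] args) {
        String line = "114.92.217.149^A1449410796.976^Awww.example.com^A"
                + "/log.gif?en=e_pv&p_url=http%3A%2F%2Fwww.example.com%2Findex.html&u_ud=abc";
        System.out.println(parse(line));
    }
}
```

A line that does not have exactly four fields, or whose request has no query string, yields an empty map, which is the signal the mapper uses to filter the record.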
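EventLogConstants describes the HBase event_logs table (table name, column family name, column names) and the parameter names found in the collected logs. The column names and the parameter names are in fact identical.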
package com.kk.common;

/**
 * Constants for the event_logs table and the log parameter names.
 *
 * @author hzk
 */
public class EventLogConstants {

    /**
     * Event enum; specifies the names of the events.
     *
     * @author gerry
     */
    public static enum EventEnum {
        LAUNCH(1, "launch event", "e_l"), // launch event: the first visit
        PAGEVIEW(2, "page view event", "e_pv"), // page view event
        CHARGEREQUEST(3, "charge request event", "e_crt"), // order creation event
        CHARGESUCCESS(4, "charge success event", "e_cs"), // order payment success event
        CHARGEREFUND(5, "charge refund event", "e_cr"), // order refund event
        EVENT(6, "event duration event", "e_e"); // generic event

        public final int id; // unique id
        public final String name; // name
        public final String alias; // alias, the short form used in data collection

        private EventEnum(int id, String name, String alias) {
            this.id = id;
            this.name = name;
            this.alias = alias;
        }

        /**
         * Returns the event whose alias matches, or null if none matches.
         *
         * @param alias short event name from the log
         * @return the matching event, or null
         */
        public static EventEnum valueOfAlias(String alias) {
            for (EventEnum event : values()) {
                if (event.alias.equals(alias)) {
                    return event;
                }
            }
            return null;
        }
    }

    /** Table name */
    public static final String HBASE_NAME_EVENT_LOGS = "event_logs";
    /** Column family name of the event_logs table */
    public static final String EVENT_LOGS_FAMILY_NAME = "info";
    /** Log separator (a regex matching the literal characters ^A) */
    public static final String LOG_SEPARTIOR = "\\^A";
    /** User ip address */
    public static final String LOG_COLUMN_NAME_IP = "ip";
    /** Server time */
    public static final String LOG_COLUMN_NAME_SERVER_TIME = "s_time";
    /** Event name */
    public static final String LOG_COLUMN_NAME_EVENT_NAME = "en";
    /** Version of the data collection client */
    public static final String LOG_COLUMN_NAME_VERSION = "ver";
    /** Unique user identifier */
    public static final String LOG_COLUMN_NAME_UUID = "u_ud";
    /** Unique member identifier */
    public static final String LOG_COLUMN_NAME_MEMBER_ID = "u_mid";
    /** Session id */
    public static final String LOG_COLUMN_NAME_SESSION_ID = "u_sd";
    /** Client time */
    public static final String LOG_COLUMN_NAME_CLIENT_TIME = "c_time";
    /** Language */
    public static final String LOG_COLUMN_NAME_LANGUAGE = "l";
    /** Browser user agent parameter */
    public static final String LOG_COLUMN_NAME_USER_AGENT = "b_iev";
    /** Browser resolution */
    public static final String LOG_COLUMN_NAME_RESOLUTION = "b_rst";
    /** Current url */
    public static final String LOG_COLUMN_NAME_CURRENT_URL = "p_url";
    /** Url of the previous page */
    public static final String LOG_COLUMN_NAME_REFERRER_URL = "p_ref";
    /** Title of the current page */
    public static final String LOG_COLUMN_NAME_TITLE = "tt";
    /** Order id */
    public static final String LOG_COLUMN_NAME_ORDER_ID = "oid";
    /** Order name */
    public static final String LOG_COLUMN_NAME_ORDER_NAME = "on";
    /** Order amount */
    public static final String LOG_COLUMN_NAME_ORDER_CURRENCY_AMOUNT = "cua";
    /** Order currency type */
    public static final String LOG_COLUMN_NAME_ORDER_CURRENCY_TYPE = "cut";
    /** Order payment type */
    public static final String LOG_COLUMN_NAME_ORDER_PAYMENT_TYPE = "pt";
    /** Category name */
    public static final String LOG_COLUMN_NAME_EVENT_CATEGORY = "ca";
    /** Action name */
    public static final String LOG_COLUMN_NAME_EVENT_ACTION = "ac";
    /** Prefix of kv parameters */
    public static final String LOG_COLUMN_NAME_EVENT_KV_START = "kv_";
    /** Duration */
    public static final String LOG_COLUMN_NAME_EVENT_DURATION = "du";
    /** Operating system name */
    public static final String LOG_COLUMN_NAME_OS_NAME = "os";
    /** Operating system version */
    public static final String LOG_COLUMN_NAME_OS_VERSION = "os_v";
    /** Browser name */
    public static final String LOG_COLUMN_NAME_BROWSER_NAME = "browser";
    /** Browser version */
    public static final String LOG_COLUMN_NAME_BROWSER_VERSION = "browser_v";
    /** Country resolved from the ip address */
    public static final String LOG_COLUMN_NAME_COUNTRY = "country";
    /** Province resolved from the ip address */
    public static final String LOG_COLUMN_NAME_PROVINCE = "province";
    /** City resolved from the ip address */
    public static final String LOG_COLUMN_NAME_CITY = "city";
    /** Platform */
    public static final String LOG_COLUMN_NAME_PLATFORM = "pl";
}
TimeUtil parses the server time and defines the timestamp format used in the row key.
package com.kk.util;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;

import com.kk.common.DateEnum;

public class TimeUtil {
    public static final String DATE_FORMAT = "yyyy-MM-dd";

    /**
     * Converts the nginx server time to a timestamp in milliseconds; returns -1 if parsing fails.
     *
     * @param input nginx server time
     * @return epoch milliseconds, or -1
     */
    public static long parseNginxServerTime2Long(String input) {
        Date date = parseNginxServerTime2Date(input);
        return date == null ? -1L : date.getTime();
    }

    /**
     * Converts the nginx server time to a Date; returns null if parsing fails.
     *
     * @param input format: 1449410796.976
     * @return the parsed date, or null
     */
    public static Date parseNginxServerTime2Date(String input) {
        if (StringUtils.isNotBlank(input)) {
            try {
                // Round rather than truncate: the double product can fall just below the exact millisecond value.
                long timestamp = Math.round(Double.parseDouble(input.trim()) * 1000);
                Calendar calendar = Calendar.getInstance();
                calendar.setTimeInMillis(timestamp);
                return calendar.getTime();
            } catch (Exception e) {
                // ignored: an unparsable value yields null
            }
        }
        return null;
    }

    /**
     * Checks whether the input looks like a yyyy-MM-dd date.
     *
     * @param input candidate date string
     * @return true if the input matches the pattern
     */
    public static boolean isValidateRunningDate(String input) {
        Matcher matcher = null;
        boolean result = false;
        String regex = "[0-9]{4}-[0-9]{2}-[0-9]{2}";
        if (input != null && !input.isEmpty()) {
            Pattern pattern = Pattern.compile(regex);
            matcher = pattern.matcher(input);
        }
        if (matcher != null) {
            result = matcher.matches();
        }
        return result;
    }

    /**
     * Returns yesterday's date as a yyyy-MM-dd string.
     */
    public static String getYesterday() {
        return getYesterday(DATE_FORMAT);
    }

    /**
     * Returns yesterday's date in the given format.
     *
     * @param pattern date format pattern
     */
    public static String getYesterday(String pattern) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DAY_OF_YEAR, -1);
        return sdf.format(calendar.getTime());
    }

    /**
     * Converts a yyyy-MM-dd date string to a timestamp.
     */
    public static long parseString2Long(String input) {
        return parseString2Long(input, DATE_FORMAT);
    }

    /**
     * Converts a date string in the given format to a timestamp.
     */
    public static long parseString2Long(String input, String pattern) {
        Date date = null;
        try {
            date = new SimpleDateFormat(pattern).parse(input);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        return date.getTime();
    }

    /**
     * Converts a timestamp to a yyyy-MM-dd date string.
     */
    public static String parseLong2String(long input) {
        return parseLong2String(input, DATE_FORMAT);
    }

    /**
     * Converts a timestamp to a string in the given format.
     */
    public static String parseLong2String(long input, String pattern) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(input);
        return new SimpleDateFormat(pattern).format(calendar.getTime());
    }

    /**
     * Extracts the requested date field from a timestamp.
     *
     * @param time timestamp
     * @param type field to extract
     * @return the field value; throws an exception if no type matches
     */
    public static int getDateInfo(long time, DateEnum type) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(time);
        if (DateEnum.YEAR.equals(type)) {
            // year
            return calendar.get(Calendar.YEAR);
        } else if (DateEnum.SEASON.equals(type)) {
            // quarter
            int month = calendar.get(Calendar.MONTH) + 1;
            if (month % 3 == 0) {
                return month / 3;
            }
            return month / 3 + 1;
        } else if (DateEnum.MONTH.equals(type)) {
            // month
            return calendar.get(Calendar.MONTH) + 1;
        } else if (DateEnum.WEEK.equals(type)) {
            // week of year
            return calendar.get(Calendar.WEEK_OF_YEAR);
        } else if (DateEnum.DAY.equals(type)) {
            return calendar.get(Calendar.DAY_OF_MONTH);
        } else if (DateEnum.HOUR.equals(type)) {
            return calendar.get(Calendar.HOUR_OF_DAY);
        }
        throw new RuntimeException("No matching date type: " + type);
    }

    /**
     * Returns the timestamp of the first day of the week that contains time.
     */
    public static long getFirstDayOfThisWeek(long time) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(time);
        cal.set(Calendar.DAY_OF_WEEK, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        return cal.getTimeInMillis();
    }
}
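The nginx server time arrives as a decimal string of seconds, such as 1449410796.976. Multiplying a double by 1000 and truncating can end up one millisecond low, so the conversion is worth a closer look. The following is a sketch of an exact alternative based on BigDecimal; the class name NginxTimeSketch is hypothetical and this is not the project's TimeUtil.

```java
import java.math.BigDecimal;

/** Sketch: convert an nginx "seconds.millis" string to epoch milliseconds without floating point. */
final class NginxTimeSketch {

    /** Returns epoch milliseconds, or -1 if the input is blank or not a decimal number. */
    static long toMillis(String input) {
        if (input == null || input.trim().isEmpty()) {
            return -1L;
        }
        try {
            // Shift the decimal point three places right, then drop any remaining fraction.
            return new BigDecimal(input.trim()).movePointRight(3).longValue();
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("1449410796.976")); // prints 1449410796976
    }
}
```

The -1 return value mirrors the convention of parseNginxServerTime2Long, so callers can filter unparsable records the same way.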
3. Write the mapper and runner classes
mapper
package com.kk.etl.util.mr.ald;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.kk.common.EventLogConstants;
import com.kk.common.EventLogConstants.EventEnum;
import com.kk.etl.util.LoggerUtils;

/**
 * Custom map class that parses the log data.
 *
 * @author gerry
 */
public class AnalyserLogDataMapper extends Mapper<Object, Text, NullWritable, Put> {
    private final Logger logger = Logger.getLogger(AnalyserLogDataMapper.class);
    // counters that make it easy to see how much data was filtered
    private int inputRecords, filterRecords, outputRecords;
    private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    private CRC32 crc32 = new CRC32();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        this.inputRecords++;
        this.logger.debug("Analyse data of :" + value);
        try {
            // parse the log line
            Map<String, String> clientInfo = LoggerUtils.handleLog(value.toString());
            // filter out lines that failed to parse
            if (clientInfo.isEmpty()) {
                this.filterRecords++;
                return;
            }
            // look up the event by its alias
            String eventAliasName = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME);
            EventEnum event = EventEnum.valueOfAlias(eventAliasName);
            if (event == null) {
                // unknown alias: switching on null would throw a NullPointerException
                this.filterRecords++;
                this.logger.warn("Cannot parse event, event name: " + eventAliasName);
                return;
            }
            switch (event) {
            case LAUNCH:
            case PAGEVIEW:
            case CHARGEREQUEST:
            case CHARGEREFUND:
            case CHARGESUCCESS:
            case EVENT:
                // process the data
                this.handleData(clientInfo, event, context);
                break;
            default:
                this.filterRecords++;
                this.logger.warn("Cannot parse event, event name: " + eventAliasName);
            }
        } catch (Exception e) {
            this.filterRecords++;
            this.logger.error("Exception while processing data: " + value, e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        logger.info("input records: " + this.inputRecords + "; output records: " + this.outputRecords
                + "; filtered records: " + this.filterRecords);
    }

    /**
     * Writes one parsed record to HBase.
     *
     * @param clientInfo parsed log fields
     * @param event      event type of the record
     * @param context    mapper context
     * @throws IOException
     * @throws InterruptedException
     */
    private void handleData(Map<String, String> clientInfo, EventEnum event, Context context)
            throws IOException, InterruptedException {
        String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
        String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
        if (StringUtils.isNotBlank(serverTime)) {
            // the server time must not be blank
            clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT); // drop the raw userAgent
            // row key: timestamp + (uuid+memberid+event).crc
            String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime);
            Put put = new Put(Bytes.toBytes(rowkey));
            for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
                if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                    // addColumn replaces the deprecated Put.add in HBase 1.x
                    put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
                }
            }
            context.write(NullWritable.get(), put);
            this.outputRecords++;
        } else {
            this.filterRecords++;
        }
    }

    /**
     * Builds the row key from uuid, memberId, event alias, and server time.
     *
     * @param uuid
     * @param memberId
     * @param eventAliasName
     * @param serverTime
     * @return the row key
     */
    private String generateRowKey(String uuid, String memberId, String eventAliasName, String serverTime) {
        StringBuilder sb = new StringBuilder();
        sb.append(serverTime).append("_");
        this.crc32.reset();
        if (StringUtils.isNotBlank(uuid)) {
            this.crc32.update(uuid.getBytes());
        }
        if (StringUtils.isNotBlank(memberId)) {
            this.crc32.update(memberId.getBytes());
        }
        this.crc32.update(eventAliasName.getBytes());
        sb.append(this.crc32.getValue() % 100000000L);
        return sb.toString();
    }
}
runner
package com.kk.etl.util.mr.ald;

import java.io.File;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.kk.common.EventLogConstants;
import com.kk.common.GlobalConstants;
import com.kk.util.EJob;
import com.kk.util.TimeUtil;

public class AnalyserLogDataRunner implements Tool {
    private static final Logger logger = Logger.getLogger(AnalyserLogDataRunner.class);
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new AnalyserLogDataRunner(), args);
        } catch (Exception e) {
            logger.error("Exception while running the log parsing job", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        this.processArgs(conf, args);
        conf.set("fs.defaultFS", "hdfs://hadoop-001:9000");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hadoop-002");
        Job job = Job.getInstance(conf, "analyser_logdata");

        // begin: code needed to submit the job locally and run it on the cluster
        File jarFile = EJob.createTempJar("target/classes");
        JobConf jobConf = (JobConf) job.getConfiguration();
        jobConf.setJar(jarFile.toString());
        // end: code needed to submit the job locally and run it on the cluster

        job.setJarByClass(AnalyserLogDataRunner.class);
        job.setMapperClass(AnalyserLogDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // reducer configuration
        // 1. Running on the cluster from a jar (requires addDependencyJars to be true, which is the default)
        conf.set("addDependencyJars", "true");
        TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
        // 2. Running locally requires addDependencyJars to be false
        // conf.set("addDependencyJars", "false");
        // TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS,
        //         null, job, null, null, null, null, false);
        job.setNumReduceTasks(0);

        // set the input path
        this.setJobInputPaths(job);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    /**
     * Processes the command-line arguments.
     *
     * @param conf
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-d".equals(args[i])) {
                if (i + 1 < args.length) {
                    date = args[++i];
                    break;
                }
            }
        }
        // the date must be in yyyy-MM-dd format
        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // the date is invalid, so default to yesterday
            date = TimeUtil.getYesterday();
        }
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }

    /**
     * Sets the input path of the job.
     *
     * @param job
     */
    private void setJobInputPaths(Job job) {
        Configuration conf = job.getConfiguration();
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
            Path inputPath = new Path(
                    "/logs/nginx/" + TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "MM/dd"));
            if (fs.exists(inputPath)) {
                FileInputFormat.addInputPath(job, inputPath);
            } else {
                throw new RuntimeException("Path does not exist: " + inputPath);
            }
        } catch (IOException e) {
            throw new RuntimeException("Exception while setting the MapReduce input path of the job", e);
        } finally {
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    // ignored
                }
            }
        }
    }
}
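The runner derives its input directory from an optional -d yyyy-MM-dd argument and falls back to yesterday. As a sketch of that logic in isolation, the following uses java.time instead of TimeUtil; the class name RunDateSketch is hypothetical, and its validation is stricter than the runner's regex check because it also rejects impossible dates such as 2015-13-40.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/** Sketch: resolve the run date from "-d yyyy-MM-dd" and map it to /logs/nginx/MM/dd. */
final class RunDateSketch {
    private static final DateTimeFormatter MONTH_DAY = DateTimeFormatter.ofPattern("MM/dd");

    /** Returns the date following the first "-d" flag, or the day before today if it is missing or invalid. */
    static LocalDate resolveRunDate(String[] args, LocalDate today) {
        for (int i = 0; i + 1 < args.length; i++) {
            if ("-d".equals(args[i])) {
                try {
                    return LocalDate.parse(args[i + 1]); // ISO format, i.e. yyyy-MM-dd
                } catch (DateTimeParseException e) {
                    break; // invalid date, fall through to the default
                }
            }
        }
        return today.minusDays(1);
    }

    /** Builds the HDFS input directory for the given date. */
    static String inputPath(LocalDate date) {
        return "/logs/nginx/" + date.format(MONTH_DAY);
    }

    public static void main(String[] args) {
        LocalDate date = resolveRunDate(new String[] {"-d", "2015-12-20"}, LocalDate.now());
        System.out.println(inputPath(date)); // prints /logs/nginx/12/20
    }
}
```

Passing today in as a parameter keeps the fallback deterministic, which makes the default branch easy to check.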
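4. Add the configuration files core-site.xml, hbase-site.xml, and log4j.properties. Depending on how the job is run, modify the relevant source file and add the modified source to the project.
5. Add the build configuration to the pom, compile the code, and test it
Local run test: note that on Windows an exception may be thrown from the access method call; in that case the NativeIO Java source file has to be modified.
If the following exception occurs when using TableMapReduceUtil: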
*****/htrace-core-2.04.jar from hdfs://***/htrace-core-2.04.jar is not a valid DFS filename.
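the addDependencyJars parameter has to be set to false.
Submit the job locally and run it on the cluster:
The local client has to know that the job is to be submitted to the cluster, so two parameters must be specified: mapreduce.framework.name and yarn.resourcemanager.address, with the values yarn and hh:8032 respectively. An exception may still occur; if it does, set the parameter mapreduce.app-submission.cross-platform to true.
Parameter settings: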
mapreduce.framework.name=yarn
yarn.resourcemanager.address=hadoop-001:8032
mapreduce.app-submission.cross-platform=true
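As a rough sketch, the three settings can be gathered in one place before they are applied to the job configuration. The class name SubmitSettingsSketch and the helper method are hypothetical. A plain Map is used so the example runs without Hadoop on the classpath; in the runner each entry would be applied with conf.set(key, value).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: the settings needed to submit a job from a local machine to a YARN cluster. */
final class SubmitSettingsSketch {

    static Map<String, String> remoteSubmitSettings(String resourceManagerAddress) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("mapreduce.framework.name", "yarn");
        settings.put("yarn.resourcemanager.address", resourceManagerAddress);
        // Needed when the submitting machine and the cluster run different operating systems.
        settings.put("mapreduce.app-submission.cross-platform", "true");
        return settings;
    }

    public static void main(String[] args) {
        // With Hadoop available, each entry would be applied as conf.set(key, value).
        remoteSubmitSettings("hadoop-001:8032").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```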
The directory structure is as follows:
[Figure: project directory structure]
Exceptions:
1. Permission denied: user=gerry, access=EXECUTE, inode="/tmp":hadoop:supergroup:drwx------
Solution: run
hdfs dfs -chmod -R 777 /tmp
2. Stack trace: ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control
Solution:
Add mapreduce.app-submission.cross-platform=true
3. ExitCodeException exitCode=1:
Solution:
When HBase is specified as the reducer output, the addDependencyJars parameter must be set to true.
4. Class com.beifeng.etl.mr.ald.AnalyserLogDataMapper not found
Solution:
Add the EJob.java file to the project, then add the following code to the runner class
File jarFile = EJob.createTempJar("target/classes");
((JobConf) job.getConfiguration()).setJar(jarFile.toString());
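The source of EJob is not shown here. As a hypothetical stand-in, the sketch below packs a compiled-classes directory into a temporary jar using only the JDK, which is the kind of file that setJar expects. The class name TempJarSketch and its method are assumptions for illustration, not the actual EJob implementation.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch: pack every regular file under a classes directory into a temporary jar. */
final class TempJarSketch {

    static File createTempJar(Path classesDir) throws IOException {
        File jar = File.createTempFile("job-", ".jar");
        jar.deleteOnExit();

        // Collect the files first so the directory walk is closed before writing.
        List<Path> files;
        try (Stream<Path> walk = Files.walk(classesDir)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }

        try (OutputStream out = Files.newOutputStream(jar.toPath());
             JarOutputStream jos = new JarOutputStream(out)) {
            for (Path file : files) {
                // Jar entry names always use forward slashes, also on Windows.
                String name = classesDir.relativize(file).toString().replace(File.separatorChar, '/');
                jos.putNextEntry(new JarEntry(name));
                Files.copy(file, jos);
                jos.closeEntry();
            }
        }
        return jar;
    }
}
```

Called as createTempJar(Paths.get("target/classes")), it returns a jar whose path can be handed to setJar in the same way as in the two lines above.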
Submit and run the job on the cluster: