Official documentation: http://gora.apache.org/current/tutorial.html
Project code: https://code.csdn.net/jediael_lu/mygorademo
Environment setup: http://blog.csdn.net/jediael_lu/article/details/43272521
The data below was already stored in HBase by the previous example:
\x00\x00\x00\x00\x00\x00\x00D column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00D column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\x88\xA0
\x00\x00\x00\x00\x00\x00\x00D column=common:url, timestamp=1422529645469, value=/index.php?i=2&a=1__z_nccylulyu&k=238241
\x00\x00\x00\x00\x00\x00\x00D column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00D column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00D column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00D column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=2&a=1__Z_nccYlULyU&k=238241
\x00\x00\x00\x00\x00\x00\x00D column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021
910 Firefox/3.0.7
\x00\x00\x00\x00\x00\x00\x00E column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00E column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\xBFP
\x00\x00\x00\x00\x00\x00\x00E column=common:url, timestamp=1422529645469, value=/index.php?i=7&a=1__yxs0vome9p8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00E column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00E column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00E column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=7&a=1__YxS0VoME9P8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021
910 Firefox/3.0.7
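The HBase shell prints binary cells in escaped form: `\xNN` is one byte, and a byte that happens to be printable ASCII is shown as the character itself (so `D` is 0x44 and `+` is 0x2B). As a rough aid for reading the scan above, the sketch below decodes a few of those cells, assuming they are stored as big-endian long/int values, which is what the Long row key and the long/int schema fields suggest. The class and method names are made up for this illustration and are not part of the project.

```java
import java.nio.ByteBuffer;
import java.time.Instant;

/** Hypothetical helper for reading values copied from the HBase shell output. */
final class HBaseCellDecoder {

    /** Interprets eight byte values (0-255 each) as one big-endian long. */
    static long toLong(int... byteValues) {
        return ByteBuffer.wrap(toBytes(byteValues, Long.BYTES)).getLong();
    }

    /** Interprets four byte values (0-255 each) as one big-endian int. */
    static int toInt(int... byteValues) {
        return ByteBuffer.wrap(toBytes(byteValues, Integer.BYTES)).getInt();
    }

    private static byte[] toBytes(int[] byteValues, int expectedLength) {
        if (byteValues.length != expectedLength) {
            throw new IllegalArgumentException("expected " + expectedLength + " bytes");
        }
        byte[] bytes = new byte[expectedLength];
        for (int i = 0; i < expectedLength; i++) {
            bytes[i] = (byte) byteValues[i];
        }
        return bytes;
    }

    public static void main(String[] args) {
        // Row key \x00\x00\x00\x00\x00\x00\x00D: the shell prints byte 0x44 as 'D'.
        System.out.println("row key      = " + toLong(0, 0, 0, 0, 0, 0, 0, 'D'));
        // common:timestamp \x00\x00\x01\x1F\xF1\xB5\x88\xA0, read as epoch milliseconds.
        long timestamp = toLong(0x00, 0x00, 0x01, 0x1F, 0xF1, 0xB5, 0x88, 0xA0);
        System.out.println("timestamp    = " + timestamp + " (" + Instant.ofEpochMilli(timestamp) + ")");
        // http:httpStatusCode \x00\x00\x00\xC8 and http:responseSize \x00\x00\x00+ ('+' is 0x2B).
        System.out.println("status code  = " + toInt(0, 0, 0, 0xC8));
        System.out.println("responseSize = " + toInt(0, 0, 0, '+'));
    }
}
```

Under that assumption the first row decodes to row key 68, timestamp 1236710820000 (2009-03-10T18:47:00Z), status code 200 and response size 43.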
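This example uses MapReduce (MR) to read the data from HBase and analyze it: for each URL, it counts how many visits the URL received per day. The result is saved back to HBase. The row key of the output table is a String in "url + time" format, and the value has three columns: the URL, the time, and the visit count.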
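Before bringing in Hadoop and Gora, the computation itself can be sketched in a few lines of plain Java. This is only an illustration of the logic (group by URL and day, count, build a "url_day" key); the `Visit` class is a hypothetical stand-in for the generated Pageview class, and the URLs are shortened sample values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DailyPageviewSketch {
    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** Hypothetical stand-in for Pageview: only the two fields the job reads. */
    static final class Visit {
        final String url;
        final long timestamp;

        Visit(String url, long timestamp) {
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    /** Counts visits per "url_day" key, where day is the timestamp rounded down to a whole day. */
    static Map<String, Long> countPerUrlAndDay(List<Visit> visits) {
        Map<String, Long> counts = new TreeMap<>();
        for (Visit visit : visits) {
            long day = (visit.timestamp / DAY_MILLIS) * DAY_MILLIS; // what the mapper emits
            counts.merge(visit.url + "_" + day, 1L, Long::sum);     // what the reducer sums up
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Visit> visits = Arrays.asList(
            new Visit("/index.php?i=2", 1236710820000L),
            new Visit("/index.php?i=2", 1236710834000L),
            new Visit("/index.php?i=7", 1236710834000L));
        countPerUrlAndDay(visits).forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```

In the real job the grouping is done by the MapReduce shuffle rather than by a map in memory, but the key format and the counts follow the same rule.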
0. Create a Java project and a gora.properties file with the following content:
## gora.datastore.default is the default datastore implementation to use
## if it is not passed to the DataStoreFactory#createDataStore() method.
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
## whether to create schema automatically if not exists.
gora.datastore.autocreateschema=true
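One detail worth checking in a properties file is that comments sit on their own lines. The sketch below uses only `java.util.Properties` (not Gora's own loading code) to show how the two keys parse, and what happens when a `##` comment trails a value on the same line. The class name is made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class GoraPropertiesCheck {

    /** Parses properties text the same way a .properties file on disk would be parsed. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String commentsOnOwnLines =
              "gora.datastore.default=org.apache.gora.hbase.store.HBaseStore\n"
            + "## whether to create schema automatically if not exists.\n"
            + "gora.datastore.autocreateschema=true\n";
        String trailingComment =
              "gora.datastore.default=org.apache.gora.hbase.store.HBaseStore ##a comment\n";

        // The comment line is skipped, so the value is just the class name.
        System.out.println(parse(commentsOnOwnLines).getProperty("gora.datastore.default"));
        // A '#' is only a comment marker at the start of a line, so here it stays in the value.
        System.out.println(parse(trailingComment).getProperty("gora.datastore.default"));
    }
}
```

The second value would not be a loadable class name, which is why each comment belongs on a separate line.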
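1. Create the JSON file that describes the input data and generate the corresponding class.
This was already done in the previous example; see pageview.json and Pageview.java.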
{
"type": "record",
"name": "Pageview", "default":null,
"namespace": "org.apache.gora.tutorial.log.generated",
"fields" : [
{"name": "url", "type": ["null","string"], "default":null},
{"name": "timestamp", "type": "long", "default":0},
{"name": "ip", "type": ["null","string"], "default":null},
{"name": "httpMethod", "type": ["null","string"], "default":null},
{"name": "httpStatusCode", "type": "int", "default":0},
{"name": "responseSize", "type": "int", "default":0},
{"name": "referrer", "type": ["null","string"], "default":null},
{"name": "userAgent", "type": ["null","string"], "default":null}
]
}
2. Create the class-to-table mapping file for the input data
<?xml version="1.0" encoding="UTF-8"?>
<!--
Gora Mapping file for HBase Backend
-->
<gora-otd>
<table name="Pageview"> <!-- optional descriptors for tables -->
<family name="common"/> <!-- This can also have params like compression, bloom filters -->
<family name="http"/>
<family name="misc"/>
</table>
<class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
<field name="url" family="common" qualifier="url"/>
<field name="timestamp" family="common" qualifier="timestamp"/>
<field name="ip" family="common" qualifier="ip" />
<field name="httpMethod" family="http" qualifier="httpMethod"/>
<field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
<field name="responseSize" family="http" qualifier="responseSize"/>
<field name="referrer" family="misc" qualifier="referrer"/>
<field name="userAgent" family="misc" qualifier="userAgent"/>
</class>
</gora-otd>
3. Create the JSON file that describes the output data and generate the corresponding class.
{
"type": "record",
"name": "MetricDatum",
"namespace": "org.apache.gora.tutorial.log.generated",
"fields" : [
{"name": "metricDimension", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "metric", "type" : "long"}
]
}
liaoliuqingdeMacBook-Air:MyGoraDemo liaoliuqing$ gora goracompiler avro/metricdatum.json src/
Compiling: /Users/liaoliuqing/99_Project/git/MyGoraDemo/avro/metricdatum.json
Compiled into: /Users/liaoliuqing/99_Project/git/MyGoraDemo/src
Compiler executed SUCCESSFULL.
4. Create the class-to-table mapping for the output data and add it to the file created in step 2.
<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
<field name="metricDimension" family="common" qualifier="metricDimension"/>
<field name="timestamp" family="common" qualifier="ts"/>
<field name="metric" family="common" qualifier="metric"/>
</class>
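To double-check which HBase column each field ends up in, the mapping can be read back with the JDK's DOM parser. This is just a sanity-check sketch, not something Gora requires; `MappingInspector` is a hypothetical name, and the XML string is the mapping from this step wrapped in a `<gora-otd>` root.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

final class MappingInspector {

    static final String SAMPLE =
          "<gora-otd>"
        + "<class name=\"org.apache.gora.tutorial.log.generated.MetricDatum\""
        + " keyClass=\"java.lang.String\" table=\"Metrics\">"
        + "<field name=\"metricDimension\" family=\"common\" qualifier=\"metricDimension\"/>"
        + "<field name=\"timestamp\" family=\"common\" qualifier=\"ts\"/>"
        + "<field name=\"metric\" family=\"common\" qualifier=\"metric\"/>"
        + "</class>"
        + "</gora-otd>";

    /** Returns field name -> "family:qualifier" for every field element in the mapping XML. */
    static Map<String, String> columnsByField(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
            NodeList fields = doc.getElementsByTagName("field");
            Map<String, String> columns = new LinkedHashMap<>();
            for (int i = 0; i < fields.getLength(); i++) {
                Element field = (Element) fields.item(i);
                columns.put(field.getAttribute("name"),
                    field.getAttribute("family") + ":" + field.getAttribute("qualifier"));
            }
            return columns;
        } catch (Exception e) {
            throw new IllegalArgumentException("could not parse mapping XML", e);
        }
    }

    public static void main(String[] args) {
        columnsByField(SAMPLE).forEach((field, column) -> System.out.println(field + " -> " + column));
    }
}
```

Note that the qualifier does not have to match the field name: the `timestamp` field is stored in column `common:ts`.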
5. Write the main class
The key steps of the program:
(1) Obtain the input and output DataStores
if(args.length > 0) {
String dataStoreClass = args[0];
inStore = DataStoreFactory.
getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
if(args.length > 1) {
dataStoreClass = args[1];
}
outStore = DataStoreFactory.
getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}
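The argument handling above boils down to a small rule: with one argument both stores use the same class, with two arguments the second one is used for the output store, and with none the default store is used. A plain-Java sketch of just that rule follows; `StoreArgs` and the `defaultStoreClass` parameter are hypothetical (in the real code the no-argument case is resolved by DataStoreFactory from gora.properties).

```java
final class StoreArgs {

    /** Returns {inputStoreClass, outputStoreClass} following the same branching as the snippet above. */
    static String[] resolve(String[] args, String defaultStoreClass) {
        if (args.length == 0) {
            return new String[] {defaultStoreClass, defaultStoreClass};
        }
        String inputStoreClass = args[0];
        // A second argument, if present, overrides the class used for the output store.
        String outputStoreClass = args.length > 1 ? args[1] : inputStoreClass;
        return new String[] {inputStoreClass, outputStoreClass};
    }

    public static void main(String[] args) {
        String hbase = "org.apache.gora.hbase.store.HBaseStore";
        String[] resolved = resolve(new String[] {hbase}, hbase);
        System.out.println("input  = " + resolved[0]);
        System.out.println("output = " + resolved[1]);
    }
}
```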
(2) Set some basic properties of the job
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());
(3) Specify the job's Map class and the mapper's input and output information.
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
LogAnalyticsMapper.class, true);
(4) Specify the job's Reduce class and the reducer's input and output information.
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
(5) Define the map class
public static class LogAnalyticsMapper
    extends GoraMapper<Long, Pageview, TextLong, LongWritable> {

  private LongWritable one = new LongWritable(1L);
  private TextLong tuple;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    tuple = new TextLong();
    tuple.setKey(new Text());
    tuple.setValue(new LongWritable());
  }

  @Override
  protected void map(Long key, Pageview pageview, Context context)
      throws IOException, InterruptedException {
    CharSequence url = pageview.getUrl();
    long day = getDay(pageview.getTimestamp());

    tuple.getKey().set(url.toString());
    tuple.getValue().set(day);

    context.write(tuple, one);
  }

  /** Rolls up the given timestamp to the day cardinality, so that
   * data can be aggregated daily */
  private long getDay(long timeStamp) {
    return (timeStamp / DAY_MILIS) * DAY_MILIS;
  }
}
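The getDay() rollup relies on integer division: dividing by the number of milliseconds in a day drops the time-of-day part, and multiplying back gives the timestamp of midnight. Since epoch milliseconds are counted in UTC, the day boundaries are UTC midnights. A small worked example, using the two timestamps decoded from the sample rows (the class name is made up for this illustration):

```java
import java.time.Instant;

final class DayRollup {
    // Same value as DAY_MILIS in the tutorial code, written with a long literal.
    static final long DAY_MILLIS = 1000L * 60 * 60 * 24;

    /** Rounds an epoch-millisecond timestamp down to the start of its UTC day. */
    static long getDay(long timestampMillis) {
        return (timestampMillis / DAY_MILLIS) * DAY_MILLIS;
    }

    public static void main(String[] args) {
        long first = 1236710820000L;  // 2009-03-10T18:47:00Z
        long second = 1236710834000L; // 14 seconds later
        System.out.println(getDay(first) + " = " + Instant.ofEpochMilli(getDay(first)));
        System.out.println("same bucket: " + (getDay(first) == getDay(second)));
    }
}
```

Both timestamps fall into the bucket 1236643200000 (2009-03-10T00:00:00Z), so the mapper would emit the same day value for them.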
(6) Define the reduce class
public static class LogAnalyticsReducer
    extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {

  private MetricDatum metricDatum = new MetricDatum();

  @Override
  protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0L; // sum up the values
    for (LongWritable value : values) {
      sum += value.get();
    }

    String dimension = tuple.getKey().toString();
    long timestamp = tuple.getValue().get();

    metricDatum.setMetricDimension(new Utf8(dimension));
    metricDatum.setTimestamp(timestamp);

    String key = metricDatum.getMetricDimension().toString();
    key += "_" + Long.toString(timestamp);
    metricDatum.setMetric(sum);

    context.write(key, metricDatum);
  }
}
(7) Create a job from the input and output DataStores and run it
Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);
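The main differences between a Gora job and an ordinary MR program are:
(1) The map and reduce classes extend GoraMapper/GoraReducer rather than Mapper/Reducer.
(2) GoraMapper.initMapperJob() and GoraReducer.initReducerJob() set the input and output types, and a DataStore object can stand in for the input/output key-value types.
For example, the mapper in this example passes inStore instead of declaring the input key/value types Long and Pageview, and the reducer passes outStore instead of declaring the output types String and MetricDatum.
Compare this with the basic properties required to run a job, as described in http://blog.csdn.net/jediael_lu/article/details/43416751: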
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
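The two statements above cover steps 2, 3, 4 and 5 of that list at once:
Step 2, the Map/Reduce classes: LogAnalyticsMapper.class and LogAnalyticsReducer.class.
Steps 3 and 4, the input format and content, plus the reduce output types from step 5: both input and output go through a DataStore, reading from inStore and writing to outStore.
Step 5, the map output types (TextLong and LongWritable), which are also the reduce input types.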
Full source code:
(1) KeyValueWritable.java
package org.apache.gora.tutorial.log;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A WritableComparable containing a key-value WritableComparable pair.
 * @param <K> the class of key
 * @param <V> the class of value
 */
public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
    implements WritableComparable<KeyValueWritable<K,V>> {

  protected K key = null;
  protected V value = null;

  public KeyValueWritable() {
  }

  public KeyValueWritable(K key, V value) {
    this.key = key;
    this.value = value;
  }

  public K getKey() {
    return key;
  }

  public void setKey(K key) {
    this.key = key;
  }

  public V getValue() {
    return value;
  }

  public void setValue(V value) {
    this.value = value;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    if(key == null) { }
    key.readFields(in);
    value.readFields(in);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    key.write(out);
    value.write(out);
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((key == null) ? 0 : key.hashCode());
    result = prime * result + ((value == null) ? 0 : value.hashCode());
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    KeyValueWritable other = (KeyValueWritable) obj;
    if (key == null) {
      if (other.key != null)
        return false;
    } else if (!key.equals(other.key))
      return false;
    if (value == null) {
      if (other.value != null)
        return false;
    } else if (!value.equals(other.value))
      return false;
    return true;
  }

  @Override
  public int compareTo(KeyValueWritable<K, V> o) {
    int cmp = key.compareTo(o.key);
    if(cmp != 0)
      return cmp;

    return value.compareTo(o.value);
  }
}
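What KeyValueWritable gives the job is a composite map output key that can be serialized (key first, then value) and sorted (by key, then by value). The sketch below mimics that contract with JDK classes only, so it runs without Hadoop. `UrlDayPair` is a hypothetical stand-in for TextLong, and `writeUTF`/`readUTF` replace Hadoop's Text encoding, which uses a different byte layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for TextLong: a (url, day) pair with write/readFields/compareTo. */
final class UrlDayPair implements Comparable<UrlDayPair> {
    String url = ""; // initialized up front, like TextLong does in its constructor
    long day;

    UrlDayPair() {
    }

    UrlDayPair(String url, long day) {
        this.url = url;
        this.day = day;
    }

    /** Writes the key part first, then the value part. */
    void write(DataOutput out) throws IOException {
        out.writeUTF(url);
        out.writeLong(day);
    }

    /** Reads the fields back in exactly the order write() produced them. */
    void readFields(DataInput in) throws IOException {
        url = in.readUTF();
        day = in.readLong();
    }

    /** Orders by url first and by day second, mirroring KeyValueWritable.compareTo(). */
    @Override
    public int compareTo(UrlDayPair other) {
        int cmp = url.compareTo(other.url);
        return cmp != 0 ? cmp : Long.compare(day, other.day);
    }

    /** Serializes the pair to bytes and reads it into a fresh instance. */
    static UrlDayPair roundTrip(UrlDayPair pair) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            pair.write(new DataOutputStream(buffer));
            UrlDayPair copy = new UrlDayPair();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UrlDayPair copy = roundTrip(new UrlDayPair("/index.php", 1236643200000L));
        System.out.println(copy.url + " " + copy.day);
    }
}
```

This also hints at why TextLong creates its Text and LongWritable in the constructor: readFields() in KeyValueWritable calls key.readFields(in) directly, so a pair whose key or value is still null cannot be deserialized.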
(2) TextLong.java
package org.apache.gora.tutorial.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
 * A {@link KeyValueWritable} of {@link Text} keys and
 * {@link LongWritable} values.
 */
public class TextLong extends KeyValueWritable<Text, LongWritable> {

  public TextLong() {
    key = new Text();
    value = new LongWritable();
  }

}
(3) LogAnalytics.java
package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * LogAnalytics is the tutorial class to illustrate Gora MapReduce API.
 * The analytics mapreduce job reads the web access data stored earlier by the
 * {@link LogManager}, and calculates the aggregate daily pageviews. The
 * output of the job is stored in a Gora compatible data store.
 *
 * <p>See the tutorial.html file in docs or go to the
 * <a href="http://incubator.apache.org/gora/docs/current/tutorial.html">
 * web site</a>for more information.</p>
 */
public class LogAnalytics extends Configured implements Tool {

  private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

  /** The number of milliseconds in a day */
  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

  /**
   * The Mapper takes Long keys and Pageview objects, and emits
   * tuples of <url, day> as keys and 1 as values. Input values are
   * read from the input data store.
   * Note that all Hadoop serializable classes can be used as map output key and value.
   */
  // 6. Define the map class
  public static class LogAnalyticsMapper
      extends GoraMapper<Long, Pageview, TextLong, LongWritable> {

    private LongWritable one = new LongWritable(1L);
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      tuple = new TextLong();
      tuple.setKey(new Text());
      tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {
      CharSequence url = pageview.getUrl();
      long day = getDay(pageview.getTimestamp());

      tuple.getKey().set(url.toString());
      tuple.getValue().set(day);

      context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
      return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
  }

  /**
   * The Reducer receives tuples of <url, day> as keys and a list of
   * values corresponding to the keys, and emits a combined keys and
   * {@link MetricDatum} objects. The metric datum objects are stored
   * as job outputs in the output data store.
   */
  // 7. Define the reduce class
  public static class LogAnalyticsReducer
      extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {

    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0L; // sum up the values
      for (LongWritable value : values) {
        sum += value.get();
      }

      String dimension = tuple.getKey().toString();
      long timestamp = tuple.getValue().get();

      metricDatum.setMetricDimension(new Utf8(dimension));
      metricDatum.setTimestamp(timestamp);

      String key = metricDatum.getMetricDimension().toString();
      key += "_" + Long.toString(timestamp);
      metricDatum.setMetric(sum);

      context.write(key, metricDatum);
    }
  }

  /**
   * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
   * @param inStore
   * @param outStore
   * @param numReducer
   * @return
   * @throws IOException
   */
  public Job createJob(DataStore<Long, Pageview> inStore,
      DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
    // 3. Set some basic properties of the job
    Job job = new Job(getConf());
    job.setJobName("Log Analytics");
    log.info("Creating Hadoop Job: " + job.getJobName());
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());

    /* Mappers are initialized with GoraMapper.initMapper() or
     * GoraInputFormat.setInput()*/
    // 4. Specify the job's Map class and the mapper's input and output information.
    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
        LogAnalyticsMapper.class, true);

    // 5. Specify the job's Reduce class and the reducer's input and output information.
    /* Reducers are initialized with GoraReducer#initReducer().
     * If the output is not to be persisted via Gora, any reducer
     * can be used instead. */
    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

    return job;
  }

  @Override
  public int run(String[] args) throws Exception {

    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration conf = new Configuration();

    // 1. Obtain the input and output DataStores.
    if(args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.
          getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
      if(args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.
          getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
    } else {
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }

    // 2. Create a job from the input and output DataStores
    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);

    inStore.close();
    outStore.close();

    log.info("Log completed with " + (success ? "success" : "failure"));

    return success ? 0 : 1;
  }

  private static final String USAGE = "LogAnalytics <input_data_store> <output_data_store>";

  public static void main(String[] args) throws Exception {
    if(args.length < 2) {
      System.err.println(USAGE);
      System.exit(1);
    }
    //run as any other MR job
    int ret = ToolRunner.run(new LogAnalytics(), args);
    System.exit(ret);
  }

}
6. Run the program
(1) Export the program as a runnable JAR file and upload it to the server
(2) Run the program
$ java -jar MyGoraDemo.jar org.apache.gora.hbase.store.HBaseStore org.apache.gora.hbase.store.HBaseStore
(3) Check the results in HBase
hbase(main):001:0> list
TABLE
AccessLog
Jan2814_webpage
Jan2819_webpage
Jan2910_webpage
Jan2920_webpage
Metrics
Passwd
member
8 row(s) in 2.6450 seconds
hbase(main):002:0> scan 'Metrics'