MaxCompute

WordCount示例

1.下载MaxCompute客户端工具

1.下载客户端

http://repo.aliyun.com/download/odpscmd/0.29.2/odpscmd_public.zip

2.配置和使用

1).解压缩后,odpscmd 会在用户目录下新建一个名字叫 .odpscmd 的目录,其中保存了 odpscmd 的配置文件 ~/.odpscmd/odps_config.ini,打开它将看到:

project_name=dataworks_workshop_01

access_id=<accessid>

access_key=<accesskey>

end_point=http://service.odps.aliyun.com/api

tunnel_endpoint=http://dt.odps.aliyun.com

log_view_host=http://logview.odps.aliyun.com

https_check=true

2).在其中填入登录所需要的信息(包括项目名、accessId/Key),保存并退出。

3).在命令行中输入 odpscmd ,回车,将看到这样的提示:

> odpscmd

Aliyun ODPS Command Line Tool

Version 0.29.2

@Copyright 2015 Alibaba Cloud Computing Co., Ltd. All rights reserved.

odps@ proj_name>

4).其中 proj_name 代表你所在的项目名。输入 list tables 可以列出项目下的所有表名。

odps@ proj_name>list tables;

this_is_a_user_name:this_is_a_table_name

...

2.数据环境准备

在 odpscmd 客户端中进行如下操作。

1.创建输入和输出表

create table wc_in_23 (key string, value string);

create table wc_out_23 (key string, cnt bigint);

2. 添加测试资源

----需要替换本机jar包所在的绝对路径

add jar  /Work/Testing/odpscmd_public/lib/mapreduce-examples.jar -f;

3. 使用insert into 插入测试数据

insert into table wc_in_23 select '238',' val_238' ;

或者

insert into wc_in_23 values('hello','maxcompute');

备注,也可以通过tunnel、数据集成等方式从本地或数据库、OSS上导入数据到输入表.

tunnel upload data wc_in_23;

3.执行测试步骤

在 odpscmd 中执行 WordCount,如下所示:

jar -resources mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCount wc_in_23 wc_out_23;

预期结果:

作业成功结束后,输出表 wc_out_23 中的内容,如下所示:

1. +------------+------------+

2. | key        | cnt        |

3. +------------+------------+

4. | hello      | 1          |

5. | odps       | 1          |

6. +------------+------------+

4.代码示例

    package com.aliyun.odps.mapred.open.example;

    import java.io.IOException;

    import java.util.Iterator;

    import com.aliyun.odps.data.Record;

    import com.aliyun.odps.data.TableInfo;

    import com.aliyun.odps.mapred.JobClient;

    import com.aliyun.odps.mapred.MapperBase;

    import com.aliyun.odps.mapred.ReducerBase;

    import com.aliyun.odps.mapred.TaskContext;

    import com.aliyun.odps.mapred.conf.JobConf;

    import com.aliyun.odps.mapred.utils.InputUtils;

    import com.aliyun.odps.mapred.utils.OutputUtils;

    import com.aliyun.odps.mapred.utils.SchemaUtils;

    public class WordCount {

      public static class TokenizerMapper extends MapperBase {

        private Record word;

        private Record one;

        @Override

        public void setup(TaskContext context) throws IOException {

          word = context.createMapOutputKeyRecord();

          one = context.createMapOutputValueRecord();

          one.set(new Object[] { 1L });

          System.out.println("TaskID:" + context.getTaskID().toString());

        }

        @Override

        public void map(long recordNum, Record record, TaskContext context)

            throws IOException {

          for (int i = 0; i < record.getColumnCount(); i++) {

            word.set(new Object[] { record.get(i).toString() });

            context.write(word, one);

          }

        }

      }

      /**

       * A combiner class that combines map output by sum them.

       **/

      public static class SumCombiner extends ReducerBase {

        private Record count;

        @Override

        public void setup(TaskContext context) throws IOException {

          count = context.createMapOutputValueRecord();

        }

        @Override

        public void reduce(Record key, Iterator<Record> values, TaskContext context)

            throws IOException {

          long c = 0;

          while (values.hasNext()) {

            Record val = values.next();

            c += (Long) val.get(0);

          }

          count.set(0, c);

          context.write(key, count);

        }

      }

      /**

       * A reducer class that just emits the sum of the input values.

       **/

      public static class SumReducer extends ReducerBase {

        private Record result = null;

        @Override

        public void setup(TaskContext context) throws IOException {

          result = context.createOutputRecord();

        }

        @Override

        public void reduce(Record key, Iterator<Record> values, TaskContext context)

            throws IOException {

          long count = 0;

          while (values.hasNext()) {

            Record val = values.next();

            count += (Long) val.get(0);

          }

          result.set(0, key.get(0));

          result.set(1, count);

          context.write(result);

        }

      }

      public static void main(String[] args) throws Exception {

        if (args.length != 2) {

          System.err.println("Usage: WordCount <in_table> <out_table>");

          System.exit(2);

        }

        JobConf job = new JobConf();

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(SumCombiner.class);

        job.setReducerClass(SumReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));

        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);

        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);

        JobClient.runJob(job);

      }

    }

 

上一篇:ODPS与Kettle融合之道


下一篇:一款神级API接口神器