MaxCompute Graph开发-eclipse.sql

操作步骤
1.运行MaxCompute客户端。
MaxCompute Graph开发-eclipse.sql

2.执行如下命令创建输入表sssp_in和输出表sssp_out。

CREATE TABLE sssp_in (v bigint, es string);
CREATE TABLE sssp_out (v bigint, l bigint);

3.上传数据至表sssp_in中。
示例数据如下,建议您创建sssp.txt文件将数据保存至本地。假设保存在本地路径为D:\IntelliJ\data\sssp.txt

1 2:2,3:1,4:4
2 1:2,3:2,4:1
3 1:1,2:2,5:1
4 1:4,2:1,5:1
5 3:1,4:1

4.执行Tunnel命令上传数据至表sssp_in中, 以空格键做两列的分隔符。

tunnel u -fd " " D:\IntelliJ\data\sssp.txt sssp_in;
odps@ yunbee>select * from sssp_in;

MaxCompute Graph开发-eclipse.sql

5.在eclipse上创建odps object(graph小写)
File -> New -> Other -> ODPS -> ODPS Project -> next -> Packge ->Project name (graph) ->
Config ODPS console installation path->Brower->C:\odpscmd\yunbee

MaxCompute Graph开发-eclipse.sql

MaxCompute Graph开发-eclipse.sql

6.创建Java class (SSSP大写)
Graph(右键)->new -> Class ->Name(例如命名成 SSSP)->Finish
MaxCompute Graph开发-eclipse.sql

7.编写SSSP示例。
本地编译、调试SSSP算法示例,假设代码被打包为名为odps-graph-example-sssp.jar的文件。
说明 仅需要将SSSP代码打包即可,不需要同时将SDK打包入odps-graph-example-sssp.jar中。

import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;

public class SSSP {
  public static final String START_VERTEX = "sssp.start.vertex.id";
  public static class SSSPVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
    private static long startVertexId = -1;
    public SSSPVertex() {
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    public boolean isStartVertex(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }

      if (minDist < this.getValue().get()) {
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
                + e.getValue().get()));
          }
        }
      } else {
        voteToHalt();
      }
    }

    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }

  public static class MinLongCombiner extends
      Combiner<LongWritable, LongWritable> {

    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }

  }

  public static class SSSPVertexReader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));
      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        String[] ss = edges[i].split(":");
        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
            new LongWritable(Long.parseLong(ss[1])));
      }

      context.addVertexRequest(vertex);
    }

  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: <startnode> <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPVertexReader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);

    job.set(START_VERTEX, args[0]);
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.addOutput(TableInfo.builder().tableName(args[2]).build());

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}

MaxCompute Graph开发-eclipse.sql

8.在eclipse上配置参数并运行

run -> run configuations -> ODPS Mapreduce(双击) -> main class (自动生成:graph.SSSP)
->ODPS config (yunbee)->Arguments -> apply -> run
传参数:1 sssp_in sssp_out

    3 sssp_in sssp_out
    5 sssp_in sssp_out

MaxCompute Graph开发-eclipse.sql

9.查看数据
D:\eclipse\eclipse-workspace\graph\temp
D:\eclipse\eclipse-workspace\graph\warehouse\yunbee\_tables_\sssp_out
MaxCompute Graph开发-eclipse.sql

10.maven打jar包
点maxcompute->右键 -> Run AS -> 3.Maven build
#Goal:clean package -DskipTests
MaxCompute Graph开发-eclipse.sql

11.检查jar包是否正常
D:\eclipse\eclipse-workspace\maxcompute\target
maxcompute-0.0.1-SNAPSHOT.jar

MaxCompute Graph开发-eclipse.sql

12.在maxcompute上添加Jar资源
odps@ yunbee>add jar D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar -f;
MaxCompute Graph开发-eclipse.sql

13.运行SSSP
jar -libjars maxcompute-0.0.1-SNAPSHOT.jar -classpath D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar graph.SSSP 1 sssp_in sssp_out;
MaxCompute Graph开发-eclipse.sql
MaxCompute Graph开发-eclipse.sql

14.查看数据

odps@ yunbee>select * from sssp_out;

MaxCompute Graph开发-eclipse.sql

在maxcomupte跑的结果和本地eclipse跑的结果完成吻合。

上一篇:启用快速增量备份


下一篇:Oracle 11g加密备份