背景及目的
方便和辅助 MaxCompute 开发人员使用 Java / Python SDK 方式进行日常代码的开发工作。
一、MaxCompute Java SDK
1、Java SDK用法介绍
在本文档中,我们仅会对较为常用的MaxCompute核心接口做简短介绍,更多详细信息请参阅SDK Java Doc。
包名 | 描述 |
---|---|
odps-sdk-core | MaxCompute基础功能。封装了基础的MaxCompute概念及其编程接口,包括odps、project、table等,tunnel相关的功能也在此包。 |
odps-sdk-core-internal | MaxCompute扩展功能。封装了一些不常用的MaxCompute概念和操作,例如Event、XFlow等。 |
odps-sdk-commons | MaxCompute基础设施。包含TableSchema、Column、Record、OdpsType等基础设施和一些util的封装。 |
odps-sdk-udf | MaxCompute UDF编程接口。 |
odps-sdk-mapred | MaxCompute MapReduce作业编程接口。 |
1.1、Odps
MaxCompute SDK的入口,用户通过此类来获取项目空间下的所有对象集合,包括:
Projects、Tables、Resources、Functions、Instances。
用户可以通过传入AliyunAccount实例来构造MaxCompute对象。程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
odps.setDefaultProject("my_project");
for (Table t : odps.tables()) {
....
}
1.2、Projects
MaxCompute 中所有项目空间的集合。集合中的元素为Project 。程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
Project p = odps.projects().get("my_exists");
p.reload();
Map<String, String> properties = prj.getProperties();
...
1.3、SQL Task
SQL Task 是SDK直接调用MaxCompute SQL的接口,能很方便得运行SQL并获得其返回结果。
从文档可以看到,SQLTask.getResult(i); 返回的是一个List。用户可以循环迭代这个List,获得完整的SQL计算返回结果。不过这个方法有个缺陷,可以参考这里这里提到的SetProject READ_TABLE_MAX_ROW的功能。
目前Select语句返回给客户端的数据条数最大可以调整到1万。也就是说如果在客户端上(包括SQLTask)直接Select,那相当于查询结果上最后加了个Limit N(如果是CREATE TABLE XX AS SELECT或者用INSERT INTO/OVERWRITE TABLE把结果固化到具体的表里就没关系)。
下面代码用于运行、处理SQL任务的接口。可以通过run接口直接运行SQL。
run接口返回Instance 实例,通过Instance获取SQL的运行状态及运行结果。
程序示例如下,仅供参考:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
Instance instance = SQLTask.run(odps, "my_project", "select ...");
String id = instance.getId();
instance.waitforsuccess();
Set<String> taskNames = instance.getTaskNames();
for (String name : taskNames) {
TaskSummary summary = instance.getTaskSummary(name);
String s = summary.getSummaryText();
}
Map<String, String> results = instance.getTaskResults();
Map<String, TaskStatus> taskStatus = instance.getTaskStatus();
for (Entry<String, TaskStatus> status : taskStatus.entrySet()) {
String result = results.get(status.getKey());
}
说明:如果用户想创建表,需要通过SQLTask接口,而不是Table 接口。用户需要将创建表(CREATE
TABLE)的语句传入SQLTask。
1.4、Instances
MaxCompute中所有实例(Instance)的集合。
集合中的元素为Instance 。程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
odps.setDefaultProject("my_project");
for (Instance i : odps.instances ()) {
....
}
对实例信息的描述,可以通过Instances获取相应的实例。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps (account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
Instance ins = odps.instances().get("instance id");
Date startTime = instance.getStartTime();
Date endTime = instance.getEndTime();
...
Status instanceStatus = instance.getStatus();
String instanceStatusStr = null;
if (instanceStatus == Status.TERMINATED) {
instanceStatusStr = TaskStatus.Status.SUCCESS.toString();
Map<String, TaskStatus> taskStatus = instance.getTaskStatus();
for (Entry<String, TaskStatus> status : taskStatus.entrySet()) {
if (status.getValue().getStatus() != TaskStatus.Status.SUCCESS) {
instanceStatusStr = status.getValue().getStatus().toString();
break;
}
}
} else {
instanceStatusStr = instanceStatus.toString();
}
...
TaskSummary summary = instance.getTaskSummary("instance name");
String s = summary.getSummaryText();
1.5、Tables
MaxCompute中所有表的集合。集合中的元素为Table。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
odps.setDefaultProject("my_project");
for (Table t : odps.tables()) {
....
}
对表信息的描述,可以通过Tables获取相应的表。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
Table t = odps.tables().get("table name");
t.reload();
Partition part = t.getPartition(new PartitionSpec(tableSpec[1]));
part.reload();
...
1.6、Resources
MaxCompute中所有资源的集合。集合中的元素为Resource。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
odps.setDefaultProject("my_project");
for (Resource r : odps.resources()) {
....
}
对资源信息的描述,可以通过Resources获取相应的资源。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
Resource r = odps.resources().get("resource name");
r.reload();
if (r.getType() == Resource.Type.TABLE) {
TableResource tr = new TableResource(r);
String tableSource = tr.getSourceTable().getProject() + "." + tr.
getSourceTable().getName();
if (tr.getSourceTablePartition() != null) {
tableSource += " partition(" + tr.getSourceTablePartition().toString() + ")";
}
....
}
一个创建文件资源的示例:
String projectName = "my_porject";
String source = "my_local_file.txt";
File file = new File(source);
InputStream is = new FileInputStream(file);
FileResource resource = new FileResource();
String name = file.getName();
resource.setName(name);
odps.resources().create(projectName, resource, is);
一个创建表资源的示例:
TableResource resource = new TableResource(tableName, tablePrj,partitionSpec);
resource.setName("table_resource_name");
odps.resources().update(projectName, resource);
1.7、Functions
MaxCompute中所有函数的集合。集合中的元素为Function。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
odps.setDefaultProject("my_project");
for (Function f : odps.functions()) {
....
}
对函数信息的描述,可以通过Functions获取相应的函数。
程序示例如下:
Account account = new AliyunAccount("my_access_id", "my_access_key");
Odps odps = new Odps(account);
String odpsUrl = "<your odps endpoint>";
odps.setEndpoint(odpsUrl);
Function f = odps.functions().get("function name");
List<Resource> resources = f.getResources();
一个创建函数的示例:
String resources = "xxx:xxx";
String classType = "com.aliyun.odps.mapred.open.example.WordCount";
ArrayList<String> resourceList = new ArrayList<String>();
for (String r : resources.split(":")) {
resourceList.add(r);
}
Function func = new Function();
func.setName(name);
func.setClassType(classType);
func.setResources(resourceList);
odps.functions().create(projectName, func);
1.8、Tunnel
如果需要导出的查询结果就是某张表的全部内容(或者是具体的某个分区的全部内容),可以用SDK Tunnel导出。
程序示例如下:
package daniel.sixiang;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.common.collect.Lists;
public class ChimaeraCustomerODPSUtils {
private static Log logger = LogFactory.getLog(ChimaeraCustomerODPSUtils.class);
private String partition;
private static final String ACCESS_ID = "---";
private static final String ACCESS_KEY = "---";
private static final String project = "---";
private static final String table = "---";
private static final String FIELD_admin_mbr_seq = "---";
private static final String FIELD_state = "---";
private static final String FIELD_level = "---";
private static final String FIELD_org_level = "---";
private static final String FIELD_identity = "---";
private static String ODPS_URL = "---";
private static String TUNNEL_URL = "---";
public static void main(String[] args) {
ChimaeraCustomerODPSUtils customerODPSUtils = new ChimaeraCustomerODPSUtils("20190115");
System.out.println(customerODPSUtils.getCustomerFromODPS(1L, 10));
}
public ChimaeraCustomerODPSUtils(String dString) {
this.partition = "ds =" + dString;
}
public List<CustomerOdpsDTO> getCustomerFromODPS(Long start, int pageSize) {
List<CustomerOdpsDTO> custList = Lists.newArrayListWithExpectedSize(pageSize);
Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
Odps odps = new Odps(account);
odps.setEndpoint(ODPS_URL);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(TUNNEL_URL);
RecordReader recordReader = null;
try {
DownloadSession downloadSession;
PartitionSpec partitionSpec = new PartitionSpec(partition);
downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
long count = downloadSession.getRecordCount();
logger.warn("Session Status is : " + downloadSession.getStatus().toString());
if (count != 0) {
recordReader = downloadSession.openRecordReader(start, pageSize);
Record record;
while ((record = recordReader.read()) != null) {
CustomerOdpsDTO cust = buildCustomer(record, downloadSession.getSchema());
if (cust != null) {
custList.add(cust);
}
}
}
} catch (Exception e) {
logger.error("error ", e);
} finally {
if (recordReader != null) {
try {
recordReader.close();
} catch (IOException e) {
logger.error("odps_record_reader close error, e=", e);
}
}
}
return custList;
}
private CustomerOdpsDTO buildCustomer(Record record, TableSchema schema) {
CustomerOdpsDTO cust = new CustomerOdpsDTO();
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
try {
if (StringUtils.equalsIgnoreCase(column.getName(), FIELD_admin_mbr_seq)) {
cust.setAliMemberId(record.getString(i));
} else if (StringUtils.equalsIgnoreCase(column.getName(), FIELD_state)) {
cust.setOper(record.getString(i));
} else if (StringUtils.equalsIgnoreCase(column.getName(), FIELD_level)) {
cust.setNewLevel(record.getString(i));
} else if (StringUtils.equalsIgnoreCase(column.getName(), FIELD_org_level)) {
cust.setOrgLevel(record.getString(i));
} else if (StringUtils.equalsIgnoreCase(column.getName(), FIELD_identity)) {
cust.setIdentity(record.getString(i));
}
} catch (Exception e) {
logger.error("read from column exception.", e);
return null;
}
}
return cust;
}
public Long getCustCountFromODPS() {
Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
Odps odps = new Odps(account);
odps.setEndpoint(ODPS_URL);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(TUNNEL_URL);
DownloadSession downloadSession;
try {
PartitionSpec partitionSpec = new PartitionSpec(partition);
downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
return downloadSession.getRecordCount();
} catch (TunnelException e) {
logger.error("TunnelException ,partition = " + partition, e);
return 0L;
} finally {
}
}
}
package daniel.sixiang;
import java.io.Serializable;
public class CustomerOdpsDTO implements Serializable {
private static final long serialVersionUID = -9075685189945269963L;
private String aliMemberId;
private String oper;
private String newLevel;
private String orgLevel;
private String identity;
public String getAliMemberId() {
return aliMemberId;
}
public void setAliMemberId(String aliMemberId) {
this.aliMemberId = aliMemberId;
}
public String getOper() {
return oper;
}
public void setOper(String oper) {
this.oper = oper;
}
public String getNewLevel() {
return newLevel;
}
public void setNewLevel(String newLevel) {
this.newLevel = newLevel;
}
public String getOrgLevel() {
return orgLevel;
}
public void setOrgLevel(String orgLevel) {
this.orgLevel = orgLevel;
}
public String getIdentity() {
return identity;
}
public void setIdentity(String identity) {
this.identity = identity;
}
@Override
public String toString() {
return "CustomerOdpsDTO [aliMemberId=" + aliMemberId + ", oper=" + oper + ", newLevel=" + newLevel
+ ", orgLevel=" + orgLevel + ", identity=" + identity + "]";
}
}
2、Java SDK实用小技巧
2.1、如何使用 SDK 运行安全相关命令
使用MaxCompute Console的同学,可能都使用过MaxCompute安全相关的命令。官方文档上有详细的MaxCompute安全指南,并给出了安全相关语句汇总。
简而言之,权限管理、列级别访问控制、项目空间安全配置以及跨项目空间的资源分享都属于MaxCompute安全命令相关的范畴。
再直白一点,以下列关键字开头的命令,都是MaxCompute安全相关操作命令:
GRANT/REVOKE ...
SHOW GRANTS/ACL/PACKAGE/LABEL/ROLE/PRINCIPALS
SHOW PRIV/PRIVILEGES
LIST/ADD/REOVE USERS/ROLES/TRUSTEDPROJECTS
DROP/CREATE ROLE
CLEAR EXPIRED GRANTS
DESC/DESCRIBE ROLE/PACKAGE
CREATE/DELETE/DROP PACKAGE
ADD ... TO PACKAGE
REMOVE ... FROM PACKAGE
ALLOW/DISALLOW PROJECT
INSTALL/UNINSTALL PACKAGE
LIST/ADD/REMOVE ACCOUNTPROVIDERS
SET LABLE ...
那么,这些能在 MaxCompute Console 上运行的命令,该如何使用 MaxCompute Java SDK 运行呢?它们是与 SQL 一样通过创建 instance 的方式来运行吗?
答案:不可以,这些命令不是 SQL , 不可以通过 SQL Task 来运行。
需要使用接口 SecurityManager.runQuery() 来运行。详细 SDK Java Doc 戳这里
SecurityManager 类在 odps-sdk-core 中,因此在使用时请添加依赖:
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.29.11-oversea-public</version>
</dependency>
下面通过一个例子来演示如何通过 MaxCompute Java SDK 来设置表 test_label 列的访问级别为 2,也就是运行命令:SET LABEL 2 TO TABLE test_label(key, value);
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.security.SecurityManager;
public class test {
public static void main(String [] args) throws OdpsException {
try {
// init odps
Account account = new AliyunAccount("<your_accessid>", "<your_accesskey>");
Odps odps = new Odps(account);
odps.setEndpoint("http://service-corp.odps.aliyun-inc.com/api");
odps.setDefaultProject("<your_project>");
// create test table
// if u already have a table, skip this
TableSchema schema = new TableSchema();
schema.addColumn(new Column("key", OdpsType.STRING));
schema.addColumn(new Column("value", OdpsType.BIGINT));
odps.tables().create("test_label", schema);
// set label 2 to table columns
SecurityManager securityManager = odps.projects().get().getSecurityManager();
String res = securityManager.runQuery("SET LABEL 2 TO TABLE test_label(key, value);", false);
System.out.println(res);
} catch (OdpsException e) {
e.printStackTrace();
}
}
}
运行结果:
程序运行完成后,在 MaxCompute Console 中运行 ‘desc test_lable;’ 命令,可以看到 set label 已经生效了。
其他安全相关的命令,都可以这样通过 MaxCompute Java SDK 来运行!
2.2、如何使用 SDK 生成 instance Logview
场景
A: “用 MaxCompute Java SDK 跑作业,为什么卡住不动了?”
ME: “有 Logview 吗?发来看下。”
A: “没有,我用的是SDK,没Logview。”
用户 A 的问题在于没有 instance 的 logview,导致无法追踪 instance 的运行过程。
通常用户在创建 instance 后会调用 instance.waitForSuccess() 来等待作业运行完成,一旦作业耗时巨大,程序就卡在这一步了,此时如果有 logview ,就能查看追踪查看作业等待的具体原因了。
【 怎么使用 MaxCompute Java SDK 生成 instance Logview 】
答案很简单, MaxCompute Java SDK 提供了 logview 接口,详情可查看 SDK Java Doc
RunningJob rj = JobClient.runJob(job);
com.aliyun.odps.Instance instance = SessionState.get().getOdps().instances().get(rj.getInstanceID());
String logview = SessionState.get().getOdps().logview().generateLogView(instance, 7 * 24);
System.out.println(logview);
两个参数: instance 对象,logview token 超时时间 (单位:小时)
再次提醒用户,在使用 SDK 的时候,请为每个 instance 记录 Logview,一旦遇到问题可快速追踪。
当然如果改代码很麻烦,那还有一个绝招:在 MaxCompute Console 中使用 wait 命令也可以得到Logview。
2.3、如何使用 SDK 输出错误日志
场景
B :“用 MaxCompute Java SDK 访问 Table,为什么卡住半天没反应?”
ME:“卡在哪一行了?”
B:"就 RestClient retry 然后卡住了。"
用户 B 的问题在于 sdk 的 Restclient 本身有重试机制,从表面来看就是卡住了,没有任何输出。
如果在每次重试的时候都输出错误,就可以快速定位问题节约时间了。我已经遇到好几个公共云用户因为缺包导致一直卡住几分钟才丢出异常,严重影响了工作效率。
【 能不能在每次重试的时候,都把错误输出呢?】
当然可以。MaxCompute Java SDK 提供了抽象类 RetryLogger 详情可查看 SDK Java Doc
public static abstract class RetryLogger {
/**
* 当 RestClent 发生重试前的回调函数
*
* @param e
* 错误异常
* @param retryCount
* 重试计数
* @param retrySleepTime
* 下次需要的重试时间
*/
public abstract void onRetryLog(Throwable e, long retryCount, long retrySleepTime);
}
只需实现一个自己的 RetryLogger 子类,然后在初始化 odps 对象的时候使用 odps.getRestClient().setRetryLogger(new UserRetryLogger()); 就可以将日志输出。
一个典型的实现如下:
// init odps
odps.getRestClient().setRetryLogger(new UserRetryLogger());
// your retry logger
public class UserRetryLogger extends RetryLogger {
@Override
public void onRetryLog(Throwable e, long retryCount, long sleepTime) {
if (e != null && e instanceof OdpsException) {
String requestId = ((OdpsException) e).getRequestId();
if (requestId != null) {
System.err.println(String.format(
"Warning: ODPS request failed, requestID:%s, retryCount:%d, will retry in %d seconds.",
requestId, retryCount, sleepTime));
return;
}
}
System.err.println(String.format(
"Warning: ODPS request failed:%s, retryCount:%d, will retry in %d seconds.", e.getMessage(),retryCount,
sleepTime));
}
}
2.4、如何设置SQL参数
使用 DataWorks 或者 MaxCompute Console 提交SQL 经常需要设置一些flag,比如:
set odps.sql.type.system.odps2=true;
那么用SDK 提交 SQL的时候,要怎么设置呢?
特别注意,当使用SDK提交SQL,把set直接放到sql query中是不生效的,会报错的。
下面以Java SDK为例说明:
// 构造 SQLTask 对象
SQLTask task = new SQLTask();
task.setName("foobar");
task.setQuery("select ...");
// 设置flag
Map<String, String> settings = new HashMap<>();
settings.put("odps.sql.type.system.odps2", "true");
... // set other flags
task.setProperty("settings", new JSONObject(settings).toString()); // 这里是关键:将flags对应的json string设置到settings property中
// 执行
Instance instance = odps.instances().create(task);
2.5、SDK如何提交多条SQL
我们知道DataWorks中一个node可以包含多条SQL,而MaxCompute Console也可以通过odpscmd -f 来提交一个包含多条语句的文件。实际上这些提交方式,都是通过一个预处理,来把语句按照分号来拆分成多条语句,然后再一条一条运行的。
这个拆分的过程是在MaxCompute Console做的,而用户通过SDK提交作业,就需要自己做这个拆分。
一次提交一条SQL,如果一次提交了多句SQL,是会报错的。
MaxCompute是不是支持一次性提交多条SQL来执行呢?答案是肯定的,我们提供了脚本模式的功能。
脚本模式下,所有SQL是一次性提交,一次性执行的。注意用脚本模式提交,相当于让不同语句之间产生了关系,所有SQL作为一个整体执行,这个和MaxCompute Console 一句一句顺序执行的模式是完全不同的。同时也会引入一些限制,比如最多只有一个create table as 语句,settings和ddl必须放在开头等。如果不能满足这个条件,那么还是需要用户自己一条一条地拆分后提交。
脚本模式的正常脚本经常需要涉及到多条语句之间的关联操作(比如 table variable定义 和 table variable 引用是在不同的SQL语句中)。这时候,将SQL一句句拆开执行是不行的,因为定义variable的语句是在前一条语句执行的,和引用variable的语句完全是两个上下文,会报错 "variable xx cannot be resolved" 。 这时候,如果用别的客户端提交查询,也需要以脚本模式的方式提交
- MaxCompute Console 通过 -f 参数提交的查询是会将脚本一句一句拆分开的,如果也想要使用脚本模式来提交查询,那么需要使用-s 参数,如 odpscmd -s foo.sql 。注意目前MaxCompute Console 的交互式模式是不能提交脚本模式的查询的。
- DataWorks里面用"ODPS SQL"节点提交作业,是一句一句顺序执行的。按照脚本模式,需要在创建节点的时候,选择类型为 "ODPS Script"
- MaxCompute Studio 对脚本模式有非常强大的支持,推荐使用。
2.6、如何使用SQLTask+Tunnel实现大数据量导出
SQLTask不能处理超过1万条记录,但是Tunnel刚好可以,两者存在互补。所以可以基于两者完成大数据的导出。
以下用一个代码的例子来实现:
private static final String accessId = "userAccessId";
private static final String accessKey = "userAccessKey";
private static final String endPoint = "http://service.odps.aliyun.com/api";
private static final String project = "userProject";
private static final String sql = "userSQL";
private static final String table = "Tmp_"+UUID.randomUUID().toString().replace("-", "_");//其实也就是随便找了个随机字符串作为临时表的名字
private static final Odps odps = getOdps();
public static void main(String[] args) {
System.out.println(table);
runSql();
tunnel();
}
/*
* 把SQLTask的结果下载过来
* */
private static void tunnel() {
TableTunnel tunnel = new TableTunnel(odps);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(
project, table);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) != null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
/*
* 保存这条数据
* 数据量少的话直接打印后拷贝走也是一种取巧的方法。实际场景可以用Java.io写到本地文件,或者写到远端数据等各种目标保存起来。
* */
private static void consumeRecord(Record record, TableSchema schema) {
System.out.println(record.getString("username")+","+record.getBigint("cnt"));
}
/*
* 运行SQL,把查询结果保存成临时表,方便后面用Tunnel下载
* 这里保存数据的lifecycle为1天,所以哪怕删除步骤出了问题,也不会太浪费存储空间
* */
private static void runSql() {
Instance i;
StringBuilder sb = new StringBuilder("Create Table ").append(table)
.append(" lifecycle 1 as ").append(sql);
try {
System.out.println(sb.toString());
i = SQLTask.run(getOdps(), sb.toString());
i.waitForSuccess();
} catch (OdpsException e) {
e.printStackTrace();
}
}
/*
* 初始化MaxCompute连接信息
* */
private static Odps getOdps() {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
return odps;
}
二、MaxCompute Python SDK
1、Python SDK基本功能
1.1、获取项目空间、表操作
项目空间是MaxCompute的基本组织单元,类似于Database的概念,Table是MaxCompute的数据存储单元。可以通过get_project获取,也可以通过exist_project查看某个项目空间是否存在,具体如下:
project = odps.get_project('my_project') # 取到某个项目
project = odps.get_project() # 取到默认项目
通过调用 list_tables可以列出项目空间下的所有表,如下所示:
for table in odps.list_tables():
通过调用 exist_table可以判断表是否存在,通过调用 get_table可以获取表。
t = odps.get_table('dual')
t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
t.lifecycle
-1
print(t.creation_time)
2019-03-01 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
1.2、创建Schema、Table
通过表的列和可选的分区来初始化:
from odps.models import Schema, Column, Partition
columns = [Column(name='num', type='bigint', comment='the column')]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)
schema.columns
[<column num, type bigint>, <partition pt, type string>]
通过调用Schema.from_lists,虽然调用更加方便,但显然无法直接设置列和分区的注释:
schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
schema.columns
[<column num, type bigint>, <partition pt, type string>]
您可以使用表的Schema来创建表,操作如下所示:
table = odps.create_table('my_new_table', schema)
table = odps.create_table('my_new_table', schema, if_not_exists=True) # 只有不存在表时才创建
table = o.create_table('my_new_table', schema, lifecycle=7) # 设置生命周期
也可以使用逗号连接的字段名 字段类型字符串组合来创建表,操作如下所示:
# 创建非分区表
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
# 创建分区表可传入 (表字段列表, 分区字段列表)
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
在未经设置的情况下,创建表时,只允许使用bigint、double、decimal、string、datetime、boolean、map和array类型。
如果您的服务位于公共云,或者支持tinyint、struct等新类型,可以设置 options.sql.use_odps2_extension = True,以打开这些类型的支持,示例如下:
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')
1.3、获取Table数据
通过调用head获取表数据,但仅限于查看每张表开始的小于1万条的数据,如下所示:
t = odps.get_table('dual')
for record in t.head(3):
print(record[0]) # 取第0个位置的值
print(record['c_double_a']) # 通过字段取值
print(record[0: 3]) # 切片操作
print(record[0, 2, 3]) # 取多个位置的值
print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
通过在table上执行open_reader操作,打开一个reader来读取数据。您可以使用with表达式,也可以不使用。
# 使用with表达式
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
# 处理一条记录
# 不使用with表达式
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10]
# 处理一条记录
通过使用Tunnel API读取表数据,open_reader操作其实也是对Tunnel API的封装。
1.4、写入Table数据
类似于 open_reader,table对象同样可以执行 open_writer来打开writer,并写数据。如下所示:
# 使用 with 表达式
with t.open_writer(partition='pt=test') as writer:
writer.write(records) # 这里records可以是任意可迭代的records,默认写到block 0
with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 这里同是打开两个block
writer.write(0, gen_records(block=0))
writer.write(1, gen_records(block=1)) # 这里两个写操作可以多线程并行,各个block间是独立的
# 不使用 with 表达式
writer = t.open_writer(partition='pt=test', blocks=[0, 1])
writer.write(0, gen_records(block=0))
writer.write(1, gen_records(block=1))
writer.close() # 不要忘记关闭 writer,否则数据可能写入不完全
1.5、执行SQL、Resource
执行SQL
odps.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
instance = odps.run_sql('select * from dual') # 异步的方式执行
instance.wait_for_success() # 阻塞直到完成
读取SQL执行结果
运行SQL的instance能够直接执行 open_reader的操作,一种情况是SQL返回了结构化的数据。
with odps.execute_sql('select * from dual').open_reader() as reader:
for record in reader:
# 处理每一个record
另一种情况是SQL可能执行的比如 desc,这时通过 reader.raw属性取到原始的SQL执行结果。
with odps.execute_sql('desc dual').open_reader() as reader:
print(reader.raw)
资源在MaxCompute上常用在UDF和MapReduce中。
列出所有资源还是可以使用list_resources,判断资源是否存在使用exist_resource。删除资源时,可以调用delete_resource,或者直接对于Resource对象调用drop方法。
在PyODPS中,主要支持两种资源类型,一种是文件,另一种是表。
文件资源包括基础的 file类型、以及 py、 jar和 archive。
创建文件资源
创建文件资源可以通过给定资源名、文件类型、以及一个file-like的对象(或者是字符串对象)来创建,示例如下:
resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的对象
resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串
读取和修改文件资源
对文件资源调用 open方法,或者在MaxCompute入口调用 open_resource都能打开一个资源, 打开后的对象会是file-like的对象。类似于Python内置的 open方法,文件资源也支持打开的模式。示例如下:
with resource.open('r') as fp: # 以读模式打开
content = fp.read() # 读取全部的内容
fp.seek(0) # 回到资源开头
lines = fp.readlines() # 读成多行
fp.write('Hello World') # 报错,读模式下无法写资源
with odps.open_resource('test_file_resource', mode='r+') as fp: # 读写模式打开
fp.read()
fp.tell() # 当前位置
fp.seek(10)
fp.truncate() # 截断后面的内容
fp.writelines(['Hello\n', 'World\n']) # 写入多行
fp.write('Hello World')
fp.flush() # 手动调用会将更新提交到MaxCompute
所有支持的打开类型包括:
r:读模式,只能打开不能写。
w:写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空。
a:追加模式,只能写入内容到文件末尾。
r+:读写模式,能任意读写内容。
w+:类似于r+,但会先清空文件内容。
a+:类似于r+,但写入时只能写入文件末尾。
同时,PyODPS中,文件资源支持以二进制模式打开,打开如说一些压缩文件等等就需要以这种模式, 因此rb就是指以二进制读模式打开文件,r+b是指以二进制读写模式打开。
表资源
创建表资源
odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')
更新表资源
table_resource = odps.get_resource('test_table_resource')
table_resource.update(partition='pt=test2', project_name='my_project2')
1.6、DataFrame
PyODPS提供了DataFrame API,它提供了类似pandas的接口,但是能充分利用MaxCompute的计算能力。完整的DataFrame文档请参见DataFrame。
DataFrame的示例如下:
o = ODPS('**your-access-id**', '**your-secret-access-key**',
project='**your-project**', endpoint='**your-end-point**'))
此处以movielens 100K作为示例,假设已经有三张表,分别是pyodps_ml_100k_movies(电影相关的数据),pyodps_ml_100k_users(用户相关的数据),pyodps_ml_100k_ratings(评分有关的数据)。
只需传入Table对象,便可创建一个DataFrame对象。如下所示:
from odps.df import DataFrame
users = DataFrame(o.get_table('pyodps_ml_100k_users'))
通过dtypes属性来查看这个DataFrame有哪些字段,分别是什么类型,如下所示:
users.dtypes
通过head方法,可以获取前N条数据,方便快速预览数据。如下所示:
users.head(10)
有时候,并不需要都看到所有字段,便可以从中筛选出一部分。如下所示:
users[['user_id', 'age']].head(5)
有时候只是排除个别字段。如下所示:
>>> users.exclude('zip_code', 'age').head(5)
排除掉一些字段的同时,想要通过计算得到一些新的列,比如将sex为M的置为True,否则为False,并取名叫sex_bool。如下所示:
>>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
1.7、Configuration
PyODPS提供了一系列的配置选项,可通过 odps.options获得。可配置的MaxCompute选项,如下所示:
选项 | 说明 | 默认值 |
---|---|---|
end_point | MaxCompute Endpoint | None |
default_project | 默认Project | None |
log_view_host | LogView主机名 | None |
log_view_hours | LogView保持时间(小时) | 24 |
local_timezone | 使用的时区,True表示本地时间,False表示UTC,也可用pytz 的时区 | 1 |
lifecycle | 所有表生命周期 | None |
temp_lifecycle | 临时表生命周期 | 1 |
biz_id | 用户ID | None |
verbose | 是否打印日志 | False |
verbose_log | 日志接收器 | None |
chunk_size | 写入缓冲区大小 | 1496 |
retry_times | 请求重试次数 | 4 |
pool_connections | 缓存在连接池的连接数 10 | |
pool_maxsize | 连接池最大容量 | 10 |
connect_timeout | 连接超时 | 5 |
read_timeout | 读取超时 | 120 |
completion_size | 对象补全列举条数限制 | 10 |
notebook_repr_widget | 使用交互式图表 | True |
sql.settings | ODPS SQL运行全局hints | None |
sql.use_odps2_extension | 启用MaxCompute2.0语言扩展 | False |
数据上传/下载配置
选项 | 说明 | 默认值 |
---|---|---|
tunnel.endpoint | Tunnel Endpoint | None |
tunnel.use_instance_tunnel | 使用Instance Tunnel获取执行结果 | True |
tunnel.limited_instance_tunnel | 限制Instance Tunnel获取结果的条数 | True |
tunnel.string_as_binary | 在string类型中使用bytes而非unicode False |
DataFrame配置
选项 | 说明 | 默认值 |
---|---|---|
interactive | 是否在交互式环境 | 根据检测值 |
df.analyze | 是否启用非ODPS内置函数 | True |
df.optimize | 是否开启DataFrame全部优化 | True |
df.optimizes.pp | 是否开启DataFrame谓词下推优化 | True |
df.optimizes.cp | 是否开启DataFrame列剪裁优化 | True |
df.optimizes.tunnel | 是否开启DataFrame使用tunnel优化执行 | True |
df.quote | ODPS SQL后端是否用来标记字段和表名 | True |
df.libraries | DataFrame运行使用的第三方库(资源名) | None |
PyODPS ML配置
选项 | 说明 | 默认值 |
---|---|---|
ml.xflow_project | 默认Xflow工程名 | algo_public |
ml.use_model_transfer | 是否使用ModelTransfer获取模型PMML | True |
ml.model_volume | 在使用ModelTransfer时使用的Volume名称 | pyodps_volume |