开发者社区> 问答> 正文

MaxCompute用户指南:数据上传下载:批量数据通道SDK示例:简单上传示例


  1. import java.io.IOException;
  2. import java.util.Date;
  3. import com.aliyun.odps.Column;
  4. import com.aliyun.odps.Odps;
  5. import com.aliyun.odps.PartitionSpec;
  6. import com.aliyun.odps.TableSchema;
  7. import com.aliyun.odps.account.Account;
  8. import com.aliyun.odps.account.AliyunAccount;
  9. import com.aliyun.odps.data.Record;
  10. import com.aliyun.odps.data.RecordWriter;
  11. import com.aliyun.odps.tunnel.TableTunnel;
  12. import com.aliyun.odps.tunnel.TunnelException;
  13. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  14. public class UploadSample {
  15.          private static String accessId = "<your access id>";
  16.          private static String accessKey = "<your access Key>";
  17.          private static String odpsUrl = "http://service.odps.aliyun.com/api";
  18.          private static String project = "<your project>";
  19.          private static String table = "<your table name>";
  20.          private static String partition = "<your partition spec>";
  21.          public static void main(String args[]) {
  22.                  Account account = new AliyunAccount(accessId, accessKey);
  23.                  Odps odps = new Odps(account);
  24.                  odps.setEndpoint(odpsUrl);
  25.                  odps.setDefaultProject(project);
  26.                  try {
  27.                          TableTunnel tunnel = new TableTunnel(odps);
  28.                          PartitionSpec partitionSpec = new PartitionSpec(partition);
  29.                          UploadSession uploadSession = tunnel.createUploadSession(project,
  30.                                          table, partitionSpec);
  31.                          System.out.println("Session Status is : "
  32.                                          + uploadSession.getStatus().toString());
  33.                          TableSchema schema = uploadSession.getSchema();
  34.                           // 准备数据后打开Writer开始写入数据,准备数据后写入一个Block
  35.                           // 单个Block内写入数据过少会产生大量小文件 严重影响计算性能, 强烈建议每次写入64MB以上数据(100GB以内数据均可写入同一Block)
  36.                           // 可通过数据的平均大小与记录数量大致计算总量即 64MB < 平均记录大小*记录数 < 100GB
  37.                          RecordWriter recordWriter = uploadSession.openRecordWriter(0);
  38.                          Record record = uploadSession.newRecord();
  39.                          for (int i = 0; i < schema.getColumns().size(); i++) {
  40.                                  Column column = schema.getColumn(i);
  41.                                  switch (column.getType()) {
  42.                                  case BIGINT:
  43.                                          record.setBigint(i, 1L);
  44.                                          break;
  45.                                  case BOOLEAN:
  46.                                          record.setBoolean(i, true);
  47.                                          break;
  48.                                  case DATETIME:
  49.                                          record.setDatetime(i, new Date());
  50.                                          break;
  51.                                  case DOUBLE:
  52.                                          record.setDouble(i, 0.0);
  53.                                          break;
  54.                                  case STRING:
  55.                                          record.setString(i, "sample");
  56.                                          break;
  57.                                  default:
  58.                                          throw new RuntimeException("Unknown column type: "
  59.                                                          + column.getType());
  60.                                  }
  61.                          }
  62.                          for (int i = 0; i < 10; i++) {
  63.                                   // Write数据至服务端,每写入8KB数据会进行一次网络传输
  64.                                   // 若120s没有网络传输服务端将会关闭连接,届时该Writer将不可用,需要重新写入
  65.                                  recordWriter.write(record);
  66.                          }
  67.                          recordWriter.close();
  68.                          uploadSession.commit(new Long[]{0L});
  69.                          System.out.println("upload success!");
  70.                  } catch (TunnelException e) {
  71.                          e.printStackTrace();
  72.                  } catch (IOException e) {
  73.                          e.printStackTrace();
  74.                  }
  75.          }
  76. }

构造器举例说明:  
PartitionSpec(String spec):通过字符串构造此类对象。
参数说明:
spec:分区定义字符串,比如:pt='1',ds='2'。
因此程序中应该配置如下:
private static String partition = "pt='XXX',ds='XXX'";

展开
收起
行者武松 2017-10-23 15:33:51 1996 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载
大数据&AI实战派 第2期 立即下载