开发者社区> 问答> 正文

MaxCompute用户指南:数据上传下载:批量数据通道SDK示例:多线程上传示例


  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import com.aliyun.odps.Column;
  8. import com.aliyun.odps.Odps;
  9. import com.aliyun.odps.PartitionSpec;
  10. import com.aliyun.odps.TableSchema;
  11. import com.aliyun.odps.account.Account;
  12. import com.aliyun.odps.account.AliyunAccount;
  13. import com.aliyun.odps.data.Record;
  14. import com.aliyun.odps.data.RecordWriter;
  15. import com.aliyun.odps.tunnel.TableTunnel;
  16. import com.aliyun.odps.tunnel.TunnelException;
  17. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  18. class UploadThread implements Callable<Boolean> {
  19.          private long id;
  20.          private RecordWriter recordWriter;
  21.          private Record record;
  22.          private TableSchema tableSchema;
  23.          public UploadThread(long id, RecordWriter recordWriter, Record record,
  24.                          TableSchema tableSchema) {
  25.                  this.id = id;
  26.                  this.recordWriter = recordWriter;
  27.                  this.record = record;
  28.                  this.tableSchema = tableSchema;
  29.          }
  30.          @Override
  31.          public Boolean call() {
  32.                  for (int i = 0; i < tableSchema.getColumns().size(); i++) {
  33.                          Column column = tableSchema.getColumn(i);
  34.                          switch (column.getType()) {
  35.                          case BIGINT:
  36.                                  record.setBigint(i, 1L);
  37.                                  break;
  38.                          case BOOLEAN:
  39.                                  record.setBoolean(i, true);
  40.                                  break;
  41.                          case DATETIME:
  42.                                  record.setDatetime(i, new Date());
  43.                                  break;
  44.                          case DOUBLE:
  45.                                  record.setDouble(i, 0.0);
  46.                                  break;
  47.                          case STRING:
  48.                                  record.setString(i, "sample");
  49.                                  break;
  50.                          default:
  51.                                  throw new RuntimeException("Unknown column type: "
  52.                                                  + column.getType());
  53.                          }
  54.                  }
  55.                  for (int i = 0; i < 10; i++) {
  56.                          try {
  57.                                  recordWriter.write(record);
  58.                          } catch (IOException e) {
  59.                                  recordWriter.close();
  60.                                  e.printStackTrace();
  61.                                  return false;
  62.                          }
  63.                  }
  64.                  recordWriter.close();
  65.                  return true;
  66.          }
  67. }
  68. public class UploadThreadSample {
  69.          private static String accessId = "<your access id>";
  70.          private static String accessKey = "<your access Key>";
  71.          private static String odpsUrl = "<http://service.odps.aliyun.com/api>";
  72.          private static String project = "<your project>";
  73.          private static String table = "<your table name>";
  74.          private static String partition = "<your partition spec>";
  75.          private static int threadNum = 10;
  76.          public static void main(String args[]) {
  77.                  Account account = new AliyunAccount(accessId, accessKey);
  78.                  Odps odps = new Odps(account);
  79.                  odps.setEndpoint(odpsUrl);
  80.                  odps.setDefaultProject(project);
  81.                  try {
  82.                          TableTunnel tunnel = new TableTunnel(odps);
  83.                          PartitionSpec partitionSpec = new PartitionSpec(partition);
  84.                          UploadSession uploadSession = tunnel.createUploadSession(project,
  85.                                          table, partitionSpec);
  86.                          System.out.println("Session Status is : "
  87.                                          + uploadSession.getStatus().toString());
  88.                          ExecutorService pool = Executors.newFixedThreadPool(threadNum);
  89.                          ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
  90.                          for (int i = 0; i < threadNum; i++) {
  91.                                  RecordWriter recordWriter = uploadSession.openRecordWriter(i);
  92.                                  Record record = uploadSession.newRecord();
  93.                                  callers.add(new UploadThread(i, recordWriter, record,
  94.                                                  uploadSession.getSchema()));
  95.                          }
  96.                          pool.invokeAll(callers);
  97.                          pool.shutdown();
  98.                          Long[] blockList = new Long[threadNum];
  99.                          for (int i = 0; i < threadNum; i++)
  100.                                  blockList = Long.valueOf(i);
  101.                          uploadSession.commit(blockList);
  102.                          System.out.println("upload success!");
  103.                  } catch (TunnelException e) {
  104.                          e.printStackTrace();
  105.                  } catch (IOException e) {
  106.                          e.printStackTrace();
  107.                  } catch (InterruptedException e) {
  108.                          e.printStackTrace();
  109.                  }
  110.          }
  111. }

注意:
对于 Tunnel Endpoint,支持指定或者不指定。

  • 如果指定,按照指定的 Endpoint 路由。

  • 如果不指定,支持自动路由。

展开
收起
行者武松 2017-10-23 15:35:11 1715 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
从 SDK 到编解码:视频直播架构解析 立即下载
跨平台的云服务SDK需要什么 立即下载
一个跨平台的云服务SDK需要什么 立即下载