开发者社区> 问答> 正文

MaxCompute用户指南:数据上传下载:批量数据通道SDK示例:多线程下载示例

  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.List;
  5. import java.util.concurrent.Callable;
  6. import java.util.concurrent.ExecutionException;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.Future;
  10. import com.aliyun.odps.Column;
  11. import com.aliyun.odps.Odps;
  12. import com.aliyun.odps.PartitionSpec;
  13. import com.aliyun.odps.TableSchema;
  14. import com.aliyun.odps.account.Account;
  15. import com.aliyun.odps.account.AliyunAccount;
  16. import com.aliyun.odps.data.Record;
  17. import com.aliyun.odps.data.RecordReader;
  18. import com.aliyun.odps.tunnel.TableTunnel;
  19. import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
  20. import com.aliyun.odps.tunnel.TunnelException;
  21. class DownloadThread implements Callable<Long> {
  22.          private long id;
  23.          private RecordReader recordReader;
  24.          private TableSchema tableSchema;
  25.          public DownloadThread(int id,
  26.                          RecordReader recordReader, TableSchema tableSchema) {
  27.                  this.id = id;
  28.                  this.recordReader = recordReader;
  29.                  this.tableSchema = tableSchema;
  30.          }
  31.          @Override
  32.          public Long call() {
  33.                  Long recordNum = 0L;
  34.                  try {
  35.                          Record record;
  36.                          while ((record = recordReader.read()) != null) {
  37.                                  recordNum++;
  38.                                  System.out.print("Thread " + id + "\t");
  39.                                  consumeRecord(record, tableSchema);
  40.                          }
  41.                          recordReader.close();
  42.                  } catch (IOException e) {
  43.                          e.printStackTrace();
  44.                  }
  45.                  return recordNum;
  46.          }
  47.          private static void consumeRecord(Record record, TableSchema schema) {
  48.                  for (int i = 0; i < schema.getColumns().size(); i++) {
  49.                          Column column = schema.getColumn(i);
  50.                          String colValue = null;
  51.                          switch (column.getType()) {
  52.                          case BIGINT: {
  53.                                  Long v = record.getBigint(i);
  54.                                  colValue = v == null ? null : v.toString();
  55.                                  break;
  56.                          }
  57.                          case BOOLEAN: {
  58.                                  Boolean v = record.getBoolean(i);
  59.                                  colValue = v == null ? null : v.toString();
  60.                                  break;
  61.                          }
  62.                          case DATETIME: {
  63.                                  Date v = record.getDatetime(i);
  64.                                  colValue = v == null ? null : v.toString();
  65.                                  break;
  66.                          }
  67.                          case DOUBLE: {
  68.                                  Double v = record.getDouble(i);
  69.                                  colValue = v == null ? null : v.toString();
  70.                                  break;
  71.                          }
  72.                          case STRING: {
  73.                                  String v = record.getString(i);
  74.                                  colValue = v == null ? null : v.toString();
  75.                                  break;
  76.                          }
  77.                          default:
  78.                                  throw new RuntimeException("Unknown column type: "
  79.                                                  + column.getType());
  80.                          }
  81.                          System.out.print(colValue == null ? "null" : colValue);
  82.                          if (i != schema.getColumns().size())
  83.                                  System.out.print("\t");
  84.                  }
  85.                  System.out.println();
  86.          }
  87. }
  88. public class DownloadThreadSample {
  89.          private static String accessId = "<your access id>";
  90.          private static String accessKey = "<your access Key>";
  91.          private static String odpsUrl = "http://service.odps.aliyun.com/api";
  92.          private static String project = "<your project>";
  93.          private static String table = "<your table name>";
  94.          private static String partition = "<your partition spec>";
  95.          private static int threadNum = 10;
  96.          public static void main(String args[]) {
  97.                  Account account = new AliyunAccount(accessId, accessKey);
  98.                  Odps odps = new Odps(account);
  99.                  odps.setEndpoint(odpsUrl);
  100.                  odps.setDefaultProject(project);
  101.                  TableTunnel tunnel = new TableTunnel(odps);
  102.                  PartitionSpec partitionSpec = new PartitionSpec(partition);
  103.                  DownloadSession downloadSession;
  104.                  try {
  105.                          downloadSession = tunnel.createDownloadSession(project, table,
  106.                                          partitionSpec);
  107.                          System.out.println("Session Status is : "
  108.                                          + downloadSession.getStatus().toString());
  109.                          long count = downloadSession.getRecordCount();
  110.                          System.out.println("RecordCount is: " + count);
  111.                          ExecutorService pool = Executors.newFixedThreadPool(threadNum);
  112.                          ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();
  113.                          long start = 0;
  114.                          long step = count / threadNum;
  115.                          for (int i = 0; i < threadNum - 1; i++) {
  116.                                  RecordReader recordReader = downloadSession.openRecordReader(
  117.                                                  step * i, step);
  118.                                  callers.add(new DownloadThread( i, recordReader, downloadSession.getSchema()));
  119.                          }
  120.                          RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count
  121.                                          - ((threadNum - 1) * step));
  122.                          callers.add(new DownloadThread( threadNum - 1, recordReader, downloadSession.getSchema()));
  123.                          Long downloadNum = 0L;
  124.                          List<Future<Long>> recordNum = pool.invokeAll(callers);
  125.                          for (Future<Long> num : recordNum)
  126.                                  downloadNum += num.get();
  127.                          System.out.println("Record Count is: " + downloadNum);
  128.                          pool.shutdown();
  129.                  } catch (TunnelException e) {
  130.                          e.printStackTrace();
  131.                  } catch (IOException e) {
  132.                          e.printStackTrace();
  133.                  } catch (InterruptedException e) {
  134.                          e.printStackTrace();
  135.                  } catch (ExecutionException e) {
  136.                          e.printStackTrace();
  137.                  }
  138.          }
  139. }

注意:
对于 Tunnel Endpoint,支持指定或者不指定。

  • 如果指定,按照指定的 Endpoint 下载。

  • 如果不指定,支持按照自动路由下载。

展开
收起
行者武松 2017-10-23 15:35:39 1886 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载
大数据&AI实战派 第2期 立即下载