
MaxCompute Quick Start: Importing Data



MaxCompute provides several ways to import and export data, as listed below:


  • Run Tunnel commands directly in the client.

  • Write your own Java tools with the SDK provided by Tunnel.

  • Import data through the Flume and Fluentd plugins.

  • Import and export data through the Big Data Development Suite (大数据开发套件); for details, see Data Integration Overview. To export data, see the Download commands in Tunnel Command Operations.


Importing Data with Tunnel Commands



Prepare the Data


Suppose you have prepared a local file wc_example.txt, stored locally at D:\odps\odps\bin, with the following content:
  I LOVE CHINA!
  MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!


Create a MaxCompute Table


You need to import the data above into a MaxCompute table, so first create one:
  CREATE TABLE wc_in (word string);


Run the Tunnel Command


After the input table is created successfully, you can enter a Tunnel command in the MaxCompute client to import the data, as follows:
  tunnel upload D:\odps\odps\bin\wc_example.txt wc_in;

After the command succeeds, view the records in table wc_in, as shown below.
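A minimal check, assuming the upload completed without errors, is to query the table from the MaxCompute client:
  select * from wc_in;
Each line of wc_example.txt should come back as one row in the word column.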

Note

  • For more details on Tunnel commands, such as how to import data into a partitioned table, see Tunnel Operations.

  • When the table has multiple columns, you can specify the column delimiter with the -fd parameter, as in the sketch below.
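For instance, a hedged sketch (the file and table names here are hypothetical, not from this tutorial): to load a comma-separated file into a table with two columns, the command would look like:
  tunnel upload D:\odps\data.csv my_two_col_table -fd ",";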


Tunnel SDK


The following walks through an example scenario to show how to upload data with the Tunnel SDK.

Scenario


Upload data to MaxCompute, where the project is odps_public_dev, the table is tunnel_sample_test, and the partition is pt=20150801,dt=hangzhou.

Procedure


Create the table and add the partition. The SQL statements are as follows:
  CREATE TABLE IF NOT EXISTS tunnel_sample_test(
      id STRING,
      name STRING)
  PARTITIONED BY (pt STRING, dt STRING); -- create the table
  ALTER TABLE tunnel_sample_test
      ADD IF NOT EXISTS PARTITION (pt='20150801', dt='hangzhou'); -- add the partition
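To confirm that the partition is in place before uploading (an optional check, not part of the original steps), you can run:
  show partitions tunnel_sample_test;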

Create the project directory structure for UploadSample, as follows:
  |---pom.xml
  |---src
      |---main
          |---java
              |---com
                  |---aliyun
                      |---odps
                          |---tunnel
                              |---example
                                  |---UploadSample.java

Directory description:

  • pom.xml: the Maven project file.

  • UploadSample.java: the Tunnel example source file.

Write the UploadSample program. The code is as follows:
  package com.aliyun.odps.tunnel.example;

  import java.io.IOException;
  import java.util.Date;

  import com.aliyun.odps.Column;
  import com.aliyun.odps.Odps;
  import com.aliyun.odps.PartitionSpec;
  import com.aliyun.odps.TableSchema;
  import com.aliyun.odps.account.Account;
  import com.aliyun.odps.account.AliyunAccount;
  import com.aliyun.odps.data.Record;
  import com.aliyun.odps.data.RecordWriter;
  import com.aliyun.odps.tunnel.TableTunnel;
  import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  import com.aliyun.odps.tunnel.TunnelException;

  public class UploadSample {
      private static String accessId = "####";
      private static String accessKey = "####";
      private static String tunnelUrl = "http://dt.odps.aliyun.com";
      private static String odpsUrl = "http://service.odps.aliyun.com/api";
      private static String project = "odps_public_dev";
      private static String table = "tunnel_sample_test";
      private static String partition = "pt=20150801,dt=hangzhou";

      public static void main(String[] args) {
          // Authenticate and point the client at the MaxCompute service.
          Account account = new AliyunAccount(accessId, accessKey);
          Odps odps = new Odps(account);
          odps.setEndpoint(odpsUrl);
          odps.setDefaultProject(project);
          try {
              // Open an upload session against the target table and partition.
              TableTunnel tunnel = new TableTunnel(odps);
              tunnel.setEndpoint(tunnelUrl);
              PartitionSpec partitionSpec = new PartitionSpec(partition);
              UploadSession uploadSession = tunnel.createUploadSession(project,
                      table, partitionSpec);
              System.out.println("Session Status is : "
                      + uploadSession.getStatus().toString());
              // Build one record, filling every column with a sample value of its type.
              TableSchema schema = uploadSession.getSchema();
              RecordWriter recordWriter = uploadSession.openRecordWriter(0);
              Record record = uploadSession.newRecord();
              for (int i = 0; i < schema.getColumns().size(); i++) {
                  Column column = schema.getColumn(i);
                  switch (column.getType()) {
                  case BIGINT:
                      record.setBigint(i, 1L);
                      break;
                  case BOOLEAN:
                      record.setBoolean(i, true);
                      break;
                  case DATETIME:
                      record.setDatetime(i, new Date());
                      break;
                  case DOUBLE:
                      record.setDouble(i, 0.0);
                      break;
                  case STRING:
                      record.setString(i, "sample");
                      break;
                  default:
                      throw new RuntimeException("Unknown column type: "
                              + column.getType());
                  }
              }
              // Write the same record ten times into block 0, then commit that block.
              for (int i = 0; i < 10; i++) {
                  recordWriter.write(record);
              }
              recordWriter.close();
              uploadSession.commit(new Long[]{0L});
              System.out.println("upload success!");
          } catch (TunnelException e) {
              e.printStackTrace();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }

Note
The accessId and accessKey values above are placeholders; replace them with your own accessId and accessKey before running the program.
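A safer pattern, shown here only as a sketch (the environment variable names are assumptions, not part of the original sample), is to read the credentials from the environment instead of hard-coding them:
  // Hypothetical: supply ALIYUN_ACCESS_ID / ALIYUN_ACCESS_KEY via environment variables.
  private static String accessId = System.getenv("ALIYUN_ACCESS_ID");
  private static String accessKey = System.getenv("ALIYUN_ACCESS_KEY");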

Configure the pom.xml file, as follows:
  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun.odps.tunnel.example</groupId>
    <artifactId>UploadSample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>0.20.7-public</version>
      </dependency>
    </dependencies>
    <repositories>
      <repository>
        <id>alibaba</id>
        <name>alibaba Repository</name>
        <url>http://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
      </repository>
    </repositories>
  </project>

Compile and run.
Compile the UploadSample project:
  mvn package
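If you would rather run the program from the command line than from Eclipse, one possible sketch (the exec-maven-plugin is not declared in the pom above; this relies on Maven resolving the exec: prefix on the fly) is:
  mvn compile exec:java -Dexec.mainClass="com.aliyun.odps.tunnel.example.UploadSample"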

Run the UploadSample program. Here, Eclipse is used to import the Maven project:

  1. Right-click the Java project and choose Import > Maven > Existing Maven Projects, then complete the import wizard.

  2. Right-click UploadSample.java and choose Run As > Run Configurations.

  3. Click Run. On success, the console shows:
  Session Status is : NORMAL
  upload success!

View the result.
Enter the following statement in the client to view the result:
  select * from tunnel_sample_test;

The result is as follows:
  +----+------+----+----+
  | id | name | pt | dt |
  +----+------+----+----+
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  | sample | sample | 20150801 | hangzhou |
  +----+------+----+----+

Note

  • Tunnel is an independent service within MaxCompute and has its own dedicated access endpoints. When you are in the Alibaba Cloud intranet environment and download data over a Tunnel intranet connection, MaxCompute does not count the traffic generated by the operation toward billing. In addition, the intranet address is valid only for cloud products in the Shanghai region.

  • MaxCompute Alibaba Cloud intranet address: http://odps-ext.aliyun-inc.com/api

  • MaxCompute public network address: http://service.odps.aliyun.com/api
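As a sketch of how these endpoints map onto the UploadSample code above (both URLs are taken from this note; pick the one matching your network environment):
  // Intranet endpoint, Shanghai region; traffic over it is not billed:
  private static String odpsUrl = "http://odps-ext.aliyun-inc.com/api";
  // Public endpoint:
  // private static String odpsUrl = "http://service.odps.aliyun.com/api";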


Other Import Methods


Besides the client and the Tunnel Java SDK, you can also import data into MaxCompute with Alibaba Cloud Data Integration (数加数据集成) and open-source tools such as Sqoop, Fluentd, Flume, and Logstash. For details, see Data Upload and Download - Tool Overview.

行者武松 2017-10-23 15:16:45