【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

2.5 tunnel upload 扫描文件


t_tunnel建表参考:

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

欲将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv上传至t_tunnel,可以在上传前先做个预扫描

tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel --scan=only;

2.6 tunnel upload 行、列分隔符


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):


                   drop table if exists t_tunnel;
                   create table t_tunnel (id int, name string);

将文件E:\ODPS_DEMO\resources\02-DataTransfer\people_delimeter.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\people_delimiter.csv t_tunnel -fd || -rd &;
read t_tunnel;

指定数据文件的列分隔符 –fd || , 行分隔符 –rd &


2.7 tunnel upload多线程


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

使用多个线程将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel -threads 8;
read t_tunnel;
count t_tunnel;


当数据文件较大地,可以通过-threads N 指定N个线程同时进行装载,加快速度


2.8 tunnel download 非分区表


将表 t_tunnel 下载到本地文件 t_tunnel.csv


tunnel download t_tunnel t_tunnel.csv;


数据下载当前路径下,可以指定导出文件的具体路径,指定path\t_tunnel.csv 即可 path 根据自己的机器情况指定


2.9 tunnel download 分区表


将表 t_tunnel_p 下载到本地文件 t_tunnel_p.csv


tunnel download t_tunnel_p/gender='male' t_tunnel_p.csv;


第 3 章:tunnel JAVA SDK 定制开发数


3.1 安装配置eclipse开发环境


首先,解压下载的eclipse-java-luna-SR2-win32-x86_64.zip到E:\ODPS_DEMO,然后执行以下操作:


(1) 将E:\ODPS_DEMO\InstallMedia\odps-eclipse-plugin-bundle-0.16.0.jar 拷贝至目录E:\ODPS_DEMO\eclipse\plugins

20200711134452720.png


(2) 执行E:\ODPS_DEMO\eclipse\eclipse.exe,打开 eclipse,点击 New -> Other

(3) 配置 odps


20200711134525781.png

20200711134538594.png


(4) 指定 JavaProject Name:

20200711134555362.png


3.2 单线程上传文件


此部分建表语句参考:

drop table if exists t_tunnel_sdk;

create table t_tunnel_sdk (

id    int,
name string
)
partitioned by (gender string);

本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\UploadSample.java


(1) 新增 Java 类:

(2) 类名为UploadSample,包名为 DTSample

(3) 设计该类实现功能为将单个文件上传至ODPS的表中,需要的输入参数为:


-f <file_name>
-t <table_name>
-c <config_file>
[-p ]
[-fd <field_delimiter>]

编写方法 printUsage,提示调用语法:

private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }

编写获取、解析输入参数的方法 :

private static String accessId;
  private static String accessKey;  
  private static String OdpsEndpoint;
  private static String TunnelEndpoint;
  private static String project;
  private static String table;
  private static String partition;
  private static String fieldDelimeter;
  private static String file;
  private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }
  private static void parseArgument(String[] args) {
    for (int i = 0; i < args.length; i++) {
      if ("-f".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("source file not specified in -f");
        }
        file = args[i];
      }
      else if ("-t".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("ODPS table not specified in -t");
        }
        table = args[i];
      } 
      else if ("-c".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "ODPS configuration file not specified in -c");
        }
        try {
          InputStream is = new FileInputStream(args[i]);
          Properties props = new Properties();
          props.load(is);
          accessId = props.getProperty("access_id");
          accessKey = props.getProperty("access_key");
          project = props.getProperty("project_name");
          OdpsEndpoint = props.getProperty("end_point");
          TunnelEndpoint = props.getProperty("tunnel_endpoint");
        } catch (IOException e) {
          throw new IllegalArgumentException(
              "Error reading ODPS config file '" + args[i] + "'.");
        }
      }
      else if ("-p".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "odps table partition not specified in -p");
        }
        partition = args[i];
      } 
      else if ("-fd".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
             "odps table partition not specified in -p");
        }
        fieldDelimeter = args[i];
      } 
    } 
    if(file == null) {
      throw new IllegalArgumentException(
          "Missing argument -f file");
    }
    if (table == null) {
      throw new IllegalArgumentException(
          "Missing argument -t table");
    }  
    if (accessId == null || accessKey == null || 
        project == null || OdpsEndpoint == null || TunnelEndpoint == null) {
      throw new IllegalArgumentException(
          "ODPS conf not set, please check -c odps.conf");
      }
  }

(4) 编写方法,从文件中读出记录,同时将这些记录格式化

 读出记录,逐列处理
  import java.text.DecimalFormat;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.TimeZone;
  import com.aliyun.odps.Column;
  import com.aliyun.odps.OdpsType;
  import com.aliyun.odps.TableSchema;
  import com.aliyun.odps.data.ArrayRecord;
  import com.aliyun.odps.data.Record;
class RecordConverter {
  private TableSchema schema;
  private String nullTag;
  private SimpleDateFormat dateFormater;
  private DecimalFormat doubleFormat;
  private String DEFAULT_DATE_FORMAT_PATTERN = "yyyyMMddHHmmss";
  public RecordConverter(TableSchema schema, String nullTag, String dateFormat,
      String tz) {
    this.schema = schema;
    this.nullTag = nullTag;
    if (dateFormat == null) {
      this.dateFormater = new SimpleDateFormat(DEFAULT_DATE_FORMAT_PATTERN);
    } else {
      dateFormater = new SimpleDateFormat(dateFormat);
    }
    dateFormater.setLenient(false);
    dateFormater.setTimeZone(TimeZone.getTimeZone(tz == null ? "GMT" : tz));
    doubleFormat = new DecimalFormat();
    doubleFormat.setMinimumFractionDigits(0);
    doubleFormat.setMaximumFractionDigits(20);
  }
  /**
   * record to String array
   * */
  public String[] format(Record r) {
    int cols = schema.getColumns().size();
    String[] line = new String[cols];
    String colValue = null;
    for (int i = 0; i < cols; i++) {
      Column column = schema.getColumn(i);
      OdpsType t = column.getType();
      switch (t) {
      case BIGINT: {
        Long v = r.getBigint(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case DOUBLE: {
        Double v = r.getDouble(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = doubleFormat.format(v).replaceAll(",", "");
        }
        break;
      }
      case DATETIME: {
        Date v = r.getDatetime(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = dateFormater.format(v);
        }
        break;
      }
      case BOOLEAN: {
        Boolean v = r.getBoolean(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case STRING: {
        String v = r.getString(i);
        colValue = (v == null ? null : v.toString());
        break;
      }
      default:
        throw new RuntimeException("Unknown column type: " + t);
      }
      if (colValue == null) {
        line[i] = nullTag;
      } else {
        line[i] = colValue;
      }
    }
    return line;
  }
  /**
   * String array to record
   * @throws ParseException 
   * */
  public Record parse(String[] line){
    if (line == null) {
      return null;
    }
    int columnCnt = schema.getColumns().size();
    Column[] cols = new Column[columnCnt];
    for (int i = 0; i < columnCnt; ++i) {
      Column c = new Column(schema.getColumn(i).getName(), 
          schema.getColumn(i).getType());          
      cols[i] = c;
    }
    ArrayRecord r = new ArrayRecord(cols);
    int i = 0;
    for (String v : line) {
      if (v.equals(nullTag)) {
        i++;
        continue;
      }
      if (i >= columnCnt) {
        break;
      }
      OdpsType type = schema.getColumn(i).getType();
      switch (type) {
      case BIGINT:
        r.setBigint(i, Long.valueOf(v));
        break;
      case DOUBLE:
        r.setDouble(i, Double.valueOf(v));
        break;
      case DATETIME:
        try {
          r.setDatetime(i, dateFormater.parse(v));
        } catch (ParseException e) {
          throw new RuntimeException(e.getMessage());
        }
        break;
      case BOOLEAN:
        v = v.trim().toLowerCase();
        if (v.equals("true") || v.equals("false")) {
          r.setBoolean(i, v.equals("true") ? true : false);
        } else if (v.equals("0") || v.equals("1")) {
          r.setBoolean(i, v.equals("1") ? true : false);
        } else {
          throw new RuntimeException(
              "Invalid boolean type, expect: true|false|0|1");
        }
        break;
      case STRING:
        r.setString(i, v);
        break;
      default:
        throw new RuntimeException("Unknown column type");
      }
      i++;
    }
    return r;
  }
}

(5) 编写主方法,读取文件,上传到odps表中去:

public static void main(String args[]) {
    try {
       parseArgument(args);
     } catch (IllegalArgumentException e) {
       printUsage(e.getMessage());
       System.exit(2);
     }    
     Account account = new AliyunAccount(accessId, accessKey);
     Odps odps = new Odps(account);
     odps.setDefaultProject(project);
     odps.setEndpoint(OdpsEndpoint);  
     BufferedReader br = null;
     try {      
       TableTunnel tunnel = new TableTunnel(odps);
       tunnel.setEndpoint(TunnelEndpoint);
       TableTunnel.UploadSession uploadSession = null;
       if(partition != null) {
         PartitionSpec spec = new PartitionSpec(partition);
         System.out.println(spec.toString());
         uploadSession=tunnel.createUploadSession(project, table,spec);
         System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }
       else
       {
         uploadSession= tunnel.createUploadSession(project, table);
         //System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }        
       Long blockid = (long) 0;
       RecordWriter recordWriter = uploadSession.openRecordWriter(blockid, true);
       Record record = uploadSession.newRecord();
       TableSchema schema = uploadSession.getSchema();      
       RecordConverter converter = new RecordConverter(schema, "NULL", null, null);      
       br = new BufferedReader(new FileReader(file));
       Pattern pattern = Pattern.compile(fieldDelimeter);
       String line = null;
       while ((line = br.readLine()) != null) {
         String[] items=pattern.split(line,0);
         record = converter.parse(items);
         recordWriter.write(record);
       }       
       recordWriter.close();      
       Long[] blocks = {blockid};
       uploadSession.commit(blocks);   
     System.out.println("Upload succeed!");
     } catch (TunnelException e) {
       e.printStackTrace();
     } catch (IOException e) {
       e.printStackTrace();
     }finally {
       try {
         if (br != null)
           br.close();
       } catch (IOException ex) {
         ex.printStackTrace();
       }
     }
  } 

(6) 在 ODPS 中建表:

odpscmd -f E:\ODPS_DEMO\resources\02-DataTransfer\crt_tbl.sql

(7) 在 eclipse 中设置测试运行参数:

2020071113481976.png

将下列参数填入program parameter:


-f E:\ODPS_DEMO\resources\02-DataTransfer\uploadDataSet.csv -t t_tunnel_sdk -p ‘gender=“Male”’ -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini


20200711134839315.png


通过console查看输出信息:

去ODPS project里检查上传结果:

read t_tunnel_sdk;


20200711134903229.png


3.3 单线程下载文件


本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\DownloadSample.java


(1) 在已有的名称为 DataTransfer 的 Java project中的 DTSample 包下新增 Java 类:

(2) 编写方法 printUsage,提示调用该程序的输入参数;


(3) 编写方法 parseArgument,获取并解析输入参数;


(4) 编写类RecordConverter用来格式化数据,生成 Record 记录;


(5) 编写方法 main 方法,实现单线程数据下载:


//根据输入参数中的配置文件,配置阿里云账号
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setDefaultProject(project);
  odps.setEndpoint(OdpsEndpoint);
//基于上述云账号,创建服务入口类
  TableTunnel tunnel = new TableTunnel(odps);
  tunnel.setEndpoint(TunnelEndpoint);
//创建从上述odps服务通道中下载数据的会话,分为分区的表和非分区表两种:
  TableTunnel.DownloadSession session;
  if(partition != null) {
    PartitionSpec spec = new PartitionSpec(partition);
    session= tunnel.createDownloadSession(project, table, spec);
  }else{
    session= tunnel.createDownloadSession(project, table);
  }
//从odps表中读出记录,格式化后,写入到本地文件:
  RecordReader reader = session.openRecordReader(0L, session.getRecordCount(),
 true);
  TableSchema schema = session.getSchema();
  Record record;
  RecordConverter converter = new RecordConverter(schema, "NULL", null, null);
  String[] items = new String[schema.getColumns().size()];
  while ((record = reader.read()) != null) {
    items = converter.format(record);
    for(int i=0; i<items.length; ++i) {
      if(i>0) out.write(fieldDelimeter.getBytes());
      out.write(items[i].getBytes());
    }
    out.write(lineDelimiter.getBytes());
  }
  reader.close();
  out.close();

(6) 在 eclipse 中设置测试运行参数:

将下列参数填入 program parameter


-f E:\ODPS_DEMO\resources\02-DataTransfer\downloadMaleDataSet.csv -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini -t t_tunnel_sdk -p ‘gender=“Male”’

20200711135003353.png


去查看下载文件E:\ODPS_DEMO\resources\02-DataTransfer\downloadMale

DataSet.csv,并和ODPS表 t_tunnel_sdk 中的数据对比。


第 4 章:实验总结


4.1 实验总结


MaxCompute提供的数据传输功能,tunnel命令集方便我们上传本地数据到单表、分区表,并支持上传时自定义设计列分隔符、行分隔符、及数据容错等能力,数据量较大,还可以指定线程数据来加速数据的上传,实验详细介绍了日常工作中常用的功能。


另外,MaxCompute还提供了tunnel JAVA SDK 方便我们进行程序开发时使用数据传输功能

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
8天前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
29 1
|
2月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
2月前
|
人工智能 分布式计算 架构师
大数据及AI典型场景实践问题之基于MaxCompute构建Noxmobi全球化精准营销系统如何解决
大数据及AI典型场景实践问题之基于MaxCompute构建Noxmobi全球化精准营销系统如何解决
|
2月前
|
机器学习/深度学习 搜索推荐 算法
飞天大数据平台产品问题之AIRec在阿里巴巴飞天大数据平台中的功能如何解决
飞天大数据平台产品问题之AIRec在阿里巴巴飞天大数据平台中的功能如何解决
|
2月前
|
存储 机器学习/深度学习 数据采集
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
|
2月前
|
SQL 存储 分布式计算
MaxCompute 入门:大数据处理的第一步
【8月更文第31天】在当今数字化转型的时代,企业和组织每天都在产生大量的数据。有效地管理和分析这些数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个用于处理海量数据的大规模分布式计算服务。它提供了强大的存储能力以及丰富的数据处理功能,让开发者能够快速构建数据仓库、实时报表系统、数据挖掘等应用。本文将介绍 MaxCompute 的基本概念、架构,并演示如何开始使用这一大数据处理平台。
374 0
|
3月前
|
分布式计算 安全 大数据
HAS插件式Kerberos认证框架:构建安全可靠的大数据生态系统
在教育和科研领域,研究人员需要共享大量数据以促进合作。HAS框架可以提供一个安全的数据共享平台,确保数据的安全性和合规性。
|
2月前
|
SQL 分布式计算 大数据
"大数据计算难题揭秘:MaxCompute中hash join内存超限,究竟该如何破解?"
【8月更文挑战第20天】在大数据处理领域,阿里云的MaxCompute以高效稳定著称,但复杂的hash join操作常导致内存超限。本文通过一个实例解析此问题:数据分析师小王需对两个共计300GB的大表进行join,却遭遇内存不足。经分析发现,单个mapper任务内存默认为2GB,不足以支持大型hash表的构建。为此,提出三种解决方案:1) 提升mapper任务内存;2) 利用map join优化小表连接;3) 实施分而治之策略,将大表分割后逐一处理再合并结果。这些方法有助于提升大数据处理效率及稳定性。
60 0
|
2月前
|
SQL 分布式计算 大数据
"揭秘MaxCompute大数据秘术:如何用切片技术在数据海洋中精准打捞?"
【8月更文挑战第20天】在大数据领域,MaxCompute(曾名ODPS)作为阿里集团自主研发的服务,提供强大、可靠且易用的大数据处理平台。数据切片是其提升处理效率的关键技术之一,它通过将数据集分割为小块来优化处理流程。使用MaxCompute进行切片可显著提高查询性能、支持并行处理、简化数据管理并增强灵活性。例如,可通过SQL按时间或其他维度对数据进行切片。此外,MaxCompute还支持高级切片技术如分区表和分桶表等,进一步加速数据处理速度。掌握这些技术有助于高效应对大数据挑战。
86 0
|
2月前
|
SQL 开发框架 大数据
【数据挖掘】顺丰科技2022年秋招大数据挖掘与分析工程师笔试题
顺丰科技2022年秋招大数据挖掘与分析工程师笔试题解析,涵盖了多领域选择题和编程题,包括动态规划、数据库封锁协议、概率论、SQL、排序算法等知识点。
80 0