【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

本文涉及的产品
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

2.5 tunnel upload 扫描文件


t_tunnel建表参考:

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

欲将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv上传至t_tunnel,可以在上传前先做个预扫描

tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel --scan=only;

2.6 tunnel upload 行、列分隔符


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):


                   drop table if exists t_tunnel;
                   create table t_tunnel (id int, name string);

将文件E:\ODPS_DEMO\resources\02-DataTransfer\people_delimeter.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\people_delimiter.csv t_tunnel -fd || -rd &;
read t_tunnel;

指定数据文件的列分隔符 –fd || , 行分隔符 –rd &


2.7 tunnel upload多线程


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

使用多个线程将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel -threads 8;
read t_tunnel;
count t_tunnel;


当数据文件较大地,可以通过-threads N 指定N个线程同时进行装载,加快速度


2.8 tunnel download 非分区表


将表 t_tunnel 下载到本地文件 t_tunnel.csv


tunnel download t_tunnel t_tunnel.csv;


数据下载当前路径下,可以指定导出文件的具体路径,指定path\t_tunnel.csv 即可 path 根据自己的机器情况指定


2.9 tunnel download 分区表


将表 t_tunnel_p 下载到本地文件 t_tunnel_p.csv


tunnel download t_tunnel_p/gender='male' t_tunnel_p.csv;


第 3 章:tunnel JAVA SDK 定制开发数


3.1 安装配置eclipse开发环境


首先,解压下载的eclipse-java-luna-SR2-win32-x86_64.zip到E:\ODPS_DEMO,然后执行以下操作:


(1) 将E:\ODPS_DEMO\InstallMedia\odps-eclipse-plugin-bundle-0.16.0.jar 拷贝至目录E:\ODPS_DEMO\eclipse\plugins

20200711134452720.png


(2) 执行E:\ODPS_DEMO\eclipse\eclipse.exe,打开 eclipse,点击 New -> Other

(3) 配置 odps


20200711134525781.png

20200711134538594.png


(4) 指定 JavaProject Name:

20200711134555362.png


3.2 单线程上传文件


此部分建表语句参考:

drop table if exists t_tunnel_sdk;

create table t_tunnel_sdk (

id    int,
name string
)
partitioned by (gender string);

本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\UploadSample.java


(1) 新增 Java 类:

(2) 类名为UploadSample,包名为 DTSample

(3) 设计该类实现功能为将单个文件上传至ODPS的表中,需要的输入参数为:


-f <file_name>
-t <table_name>
-c <config_file>
[-p ]
[-fd <field_delimiter>]

编写方法 printUsage,提示调用语法:

private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }

编写获取、解析输入参数的方法 :

private static String accessId;
  private static String accessKey;  
  private static String OdpsEndpoint;
  private static String TunnelEndpoint;
  private static String project;
  private static String table;
  private static String partition;
  private static String fieldDelimeter;
  private static String file;
  private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }
  private static void parseArgument(String[] args) {
    for (int i = 0; i < args.length; i++) {
      if ("-f".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("source file not specified in -f");
        }
        file = args[i];
      }
      else if ("-t".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("ODPS table not specified in -t");
        }
        table = args[i];
      } 
      else if ("-c".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "ODPS configuration file not specified in -c");
        }
        try {
          InputStream is = new FileInputStream(args[i]);
          Properties props = new Properties();
          props.load(is);
          accessId = props.getProperty("access_id");
          accessKey = props.getProperty("access_key");
          project = props.getProperty("project_name");
          OdpsEndpoint = props.getProperty("end_point");
          TunnelEndpoint = props.getProperty("tunnel_endpoint");
        } catch (IOException e) {
          throw new IllegalArgumentException(
              "Error reading ODPS config file '" + args[i] + "'.");
        }
      }
      else if ("-p".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "odps table partition not specified in -p");
        }
        partition = args[i];
      } 
      else if ("-fd".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
             "odps table partition not specified in -p");
        }
        fieldDelimeter = args[i];
      } 
    } 
    if(file == null) {
      throw new IllegalArgumentException(
          "Missing argument -f file");
    }
    if (table == null) {
      throw new IllegalArgumentException(
          "Missing argument -t table");
    }  
    if (accessId == null || accessKey == null || 
        project == null || OdpsEndpoint == null || TunnelEndpoint == null) {
      throw new IllegalArgumentException(
          "ODPS conf not set, please check -c odps.conf");
      }
  }

(4) 编写方法,从文件中读出记录,同时将这些记录格式化

 读出记录,逐列处理
  import java.text.DecimalFormat;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.TimeZone;
  import com.aliyun.odps.Column;
  import com.aliyun.odps.OdpsType;
  import com.aliyun.odps.TableSchema;
  import com.aliyun.odps.data.ArrayRecord;
  import com.aliyun.odps.data.Record;
class RecordConverter {
  private TableSchema schema;
  private String nullTag;
  private SimpleDateFormat dateFormater;
  private DecimalFormat doubleFormat;
  private String DEFAULT_DATE_FORMAT_PATTERN = "yyyyMMddHHmmss";
  public RecordConverter(TableSchema schema, String nullTag, String dateFormat,
      String tz) {
    this.schema = schema;
    this.nullTag = nullTag;
    if (dateFormat == null) {
      this.dateFormater = new SimpleDateFormat(DEFAULT_DATE_FORMAT_PATTERN);
    } else {
      dateFormater = new SimpleDateFormat(dateFormat);
    }
    dateFormater.setLenient(false);
    dateFormater.setTimeZone(TimeZone.getTimeZone(tz == null ? "GMT" : tz));
    doubleFormat = new DecimalFormat();
    doubleFormat.setMinimumFractionDigits(0);
    doubleFormat.setMaximumFractionDigits(20);
  }
  /**
   * record to String array
   * */
  public String[] format(Record r) {
    int cols = schema.getColumns().size();
    String[] line = new String[cols];
    String colValue = null;
    for (int i = 0; i < cols; i++) {
      Column column = schema.getColumn(i);
      OdpsType t = column.getType();
      switch (t) {
      case BIGINT: {
        Long v = r.getBigint(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case DOUBLE: {
        Double v = r.getDouble(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = doubleFormat.format(v).replaceAll(",", "");
        }
        break;
      }
      case DATETIME: {
        Date v = r.getDatetime(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = dateFormater.format(v);
        }
        break;
      }
      case BOOLEAN: {
        Boolean v = r.getBoolean(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case STRING: {
        String v = r.getString(i);
        colValue = (v == null ? null : v.toString());
        break;
      }
      default:
        throw new RuntimeException("Unknown column type: " + t);
      }
      if (colValue == null) {
        line[i] = nullTag;
      } else {
        line[i] = colValue;
      }
    }
    return line;
  }
  /**
   * String array to record
   * @throws ParseException 
   * */
  public Record parse(String[] line){
    if (line == null) {
      return null;
    }
    int columnCnt = schema.getColumns().size();
    Column[] cols = new Column[columnCnt];
    for (int i = 0; i < columnCnt; ++i) {
      Column c = new Column(schema.getColumn(i).getName(), 
          schema.getColumn(i).getType());          
      cols[i] = c;
    }
    ArrayRecord r = new ArrayRecord(cols);
    int i = 0;
    for (String v : line) {
      if (v.equals(nullTag)) {
        i++;
        continue;
      }
      if (i >= columnCnt) {
        break;
      }
      OdpsType type = schema.getColumn(i).getType();
      switch (type) {
      case BIGINT:
        r.setBigint(i, Long.valueOf(v));
        break;
      case DOUBLE:
        r.setDouble(i, Double.valueOf(v));
        break;
      case DATETIME:
        try {
          r.setDatetime(i, dateFormater.parse(v));
        } catch (ParseException e) {
          throw new RuntimeException(e.getMessage());
        }
        break;
      case BOOLEAN:
        v = v.trim().toLowerCase();
        if (v.equals("true") || v.equals("false")) {
          r.setBoolean(i, v.equals("true") ? true : false);
        } else if (v.equals("0") || v.equals("1")) {
          r.setBoolean(i, v.equals("1") ? true : false);
        } else {
          throw new RuntimeException(
              "Invalid boolean type, expect: true|false|0|1");
        }
        break;
      case STRING:
        r.setString(i, v);
        break;
      default:
        throw new RuntimeException("Unknown column type");
      }
      i++;
    }
    return r;
  }
}

(5) 编写主方法,读取文件,上传到odps表中去:

public static void main(String args[]) {
    try {
       parseArgument(args);
     } catch (IllegalArgumentException e) {
       printUsage(e.getMessage());
       System.exit(2);
     }    
     Account account = new AliyunAccount(accessId, accessKey);
     Odps odps = new Odps(account);
     odps.setDefaultProject(project);
     odps.setEndpoint(OdpsEndpoint);  
     BufferedReader br = null;
     try {      
       TableTunnel tunnel = new TableTunnel(odps);
       tunnel.setEndpoint(TunnelEndpoint);
       TableTunnel.UploadSession uploadSession = null;
       if(partition != null) {
         PartitionSpec spec = new PartitionSpec(partition);
         System.out.println(spec.toString());
         uploadSession=tunnel.createUploadSession(project, table,spec);
         System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }
       else
       {
         uploadSession= tunnel.createUploadSession(project, table);
         //System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }        
       Long blockid = (long) 0;
       RecordWriter recordWriter = uploadSession.openRecordWriter(blockid, true);
       Record record = uploadSession.newRecord();
       TableSchema schema = uploadSession.getSchema();      
       RecordConverter converter = new RecordConverter(schema, "NULL", null, null);      
       br = new BufferedReader(new FileReader(file));
       Pattern pattern = Pattern.compile(fieldDelimeter);
       String line = null;
       while ((line = br.readLine()) != null) {
         String[] items=pattern.split(line,0);
         record = converter.parse(items);
         recordWriter.write(record);
       }       
       recordWriter.close();      
       Long[] blocks = {blockid};
       uploadSession.commit(blocks);   
     System.out.println("Upload succeed!");
     } catch (TunnelException e) {
       e.printStackTrace();
     } catch (IOException e) {
       e.printStackTrace();
     }finally {
       try {
         if (br != null)
           br.close();
       } catch (IOException ex) {
         ex.printStackTrace();
       }
     }
  } 

(6) 在 ODPS 中建表:

odpscmd -f E:\ODPS_DEMO\resources\02-DataTransfer\crt_tbl.sql

(7) 在 eclipse 中设置测试运行参数:

2020071113481976.png

将下列参数填入program parameter:


-f E:\ODPS_DEMO\resources\02-DataTransfer\uploadDataSet.csv -t t_tunnel_sdk -p ‘gender=“Male”’ -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini


20200711134839315.png


通过console查看输出信息:

去ODPS project里检查上传结果:

read t_tunnel_sdk;


20200711134903229.png


3.3 单线程下载文件


本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\DownloadSample.java


(1) 在已有的名称为 DataTransfer 的 Java project中的 DTSample 包下新增 Java 类:

(2) 编写方法 printUsage,提示调用该程序的输入参数;


(3) 编写方法 parseArgument,获取并解析输入参数;


(4) 编写类RecordConverter用来格式化数据,生成 Record 记录;


(5) 编写方法 main 方法,实现单线程数据下载:


//根据输入参数中的配置文件,配置阿里云账号
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setDefaultProject(project);
  odps.setEndpoint(OdpsEndpoint);
//基于上述云账号,创建服务入口类
  TableTunnel tunnel = new TableTunnel(odps);
  tunnel.setEndpoint(TunnelEndpoint);
//创建从上述odps服务通道中下载数据的会话,分为分区的表和非分区表两种:
  TableTunnel.DownloadSession session;
  if(partition != null) {
    PartitionSpec spec = new PartitionSpec(partition);
    session= tunnel.createDownloadSession(project, table, spec);
  }else{
    session= tunnel.createDownloadSession(project, table);
  }
//从odps表中读出记录,格式化后,写入到本地文件:
  RecordReader reader = session.openRecordReader(0L, session.getRecordCount(),
 true);
  TableSchema schema = session.getSchema();
  Record record;
  RecordConverter converter = new RecordConverter(schema, "NULL", null, null);
  String[] items = new String[schema.getColumns().size()];
  while ((record = reader.read()) != null) {
    items = converter.format(record);
    for(int i=0; i<items.length; ++i) {
      if(i>0) out.write(fieldDelimeter.getBytes());
      out.write(items[i].getBytes());
    }
    out.write(lineDelimiter.getBytes());
  }
  reader.close();
  out.close();

(6) 在 eclipse 中设置测试运行参数:

将下列参数填入 program parameter


-f E:\ODPS_DEMO\resources\02-DataTransfer\downloadMaleDataSet.csv -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini -t t_tunnel_sdk -p ‘gender=“Male”’

20200711135003353.png


去查看下载文件E:\ODPS_DEMO\resources\02-DataTransfer\downloadMale

DataSet.csv,并和ODPS表 t_tunnel_sdk 中的数据对比。


第 4 章:实验总结


4.1 实验总结


MaxCompute提供的数据传输功能,tunnel命令集方便我们上传本地数据到单表、分区表,并支持上传时自定义设计列分隔符、行分隔符、及数据容错等能力,数据量较大,还可以指定线程数据来加速数据的上传,实验详细介绍了日常工作中常用的功能。


另外,MaxCompute还提供了tunnel JAVA SDK 方便我们进行程序开发时使用数据传输功能

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
3月前
|
人工智能 分布式计算 架构师
大数据及AI典型场景实践问题之基于MaxCompute构建Noxmobi全球化精准营销系统如何解决
大数据及AI典型场景实践问题之基于MaxCompute构建Noxmobi全球化精准营销系统如何解决
|
3月前
|
SQL 存储 分布式计算
MaxCompute 入门:大数据处理的第一步
【8月更文第31天】在当今数字化转型的时代,企业和组织每天都在产生大量的数据。有效地管理和分析这些数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个用于处理海量数据的大规模分布式计算服务。它提供了强大的存储能力以及丰富的数据处理功能,让开发者能够快速构建数据仓库、实时报表系统、数据挖掘等应用。本文将介绍 MaxCompute 的基本概念、架构,并演示如何开始使用这一大数据处理平台。
553 0
|
3月前
|
SQL 分布式计算 大数据
"大数据计算难题揭秘:MaxCompute中hash join内存超限,究竟该如何破解?"
【8月更文挑战第20天】在大数据处理领域,阿里云的MaxCompute以高效稳定著称,但复杂的hash join操作常导致内存超限。本文通过一个实例解析此问题:数据分析师小王需对两个共计300GB的大表进行join,却遭遇内存不足。经分析发现,单个mapper任务内存默认为2GB,不足以支持大型hash表的构建。为此,提出三种解决方案:1) 提升mapper任务内存;2) 利用map join优化小表连接;3) 实施分而治之策略,将大表分割后逐一处理再合并结果。这些方法有助于提升大数据处理效率及稳定性。
86 0
|
3月前
|
SQL 分布式计算 大数据
"揭秘MaxCompute大数据秘术:如何用切片技术在数据海洋中精准打捞?"
【8月更文挑战第20天】在大数据领域,MaxCompute(曾名ODPS)作为阿里集团自主研发的服务,提供强大、可靠且易用的大数据处理平台。数据切片是其提升处理效率的关键技术之一,它通过将数据集分割为小块来优化处理流程。使用MaxCompute进行切片可显著提高查询性能、支持并行处理、简化数据管理并增强灵活性。例如,可通过SQL按时间或其他维度对数据进行切片。此外,MaxCompute还支持高级切片技术如分区表和分桶表等,进一步加速数据处理速度。掌握这些技术有助于高效应对大数据挑战。
116 0
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之运行DDL任务时出现异常,具体错误是ODPS-0110061,该如何处理
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
108 3
|
4月前
|
分布式计算 大数据 BI
MaxCompute操作报错合集之返回错误代码ODPS-0110999,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
分布式计算 运维 大数据
混合云模式下 MaxCompute + Hadoop 混搭大数据架构实践。
除了资源效率和成本的优势外,混合云模式还为斗鱼带来了可量化的成本、增值服务以及额外的专业服务。阿里云的专业团队可以为斗鱼提供技术咨询和解决方案,帮助斗鱼解决业务难题。此外,计算资源的可量化也使得斗鱼能够清晰地了解资源使用情况,为业务决策提供依据。
|
4月前
|
存储 SQL 机器学习/深度学习
阿里云数加大数据计算服务MaxCompute学习路线图:从入门到精通
将所学知识应用于实际工作中并不断进行实践和创新是提升技术能力的关键所在。用户可以结合业务需求和技术发展趋势积极探索新的应用场景和解决方案,并在实践中不断总结经验和教训以提升自己的技术水平和实践能力。
|
4月前
|
SQL 分布式计算 数据处理
SQL 能力问题之MaxCompute(ODPS)SQL有哪些特点
SQL 能力问题之MaxCompute(ODPS)SQL有哪些特点

热门文章

最新文章

下一篇
无影云桌面