自己写的数据交换工具——从Oracle到Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

先说说需求的背景,由于业务数据都在Oracle数据库中,想要对它进行数据的分析会非常非常慢,用传统的数据仓库-->数据集市这种方式,集市层表会非常大,查询的时候如果再做一些group的操作,一个访问需要一分钟甚至更久才能响应。

为了解决这个问题,就想把业务库的数据迁移到Elasticsearch中,然后针对es再去做聚合查询。

问题来了,数据库中的数据量很大,如何导入到ES中呢?

Logstash JDBC

Logstash提供了一款JDBC的插件,可以在里面写sql语句,自动查询然后导入到ES中。这种方式比较简单,需要注意的就是需要用户自己下载jdbc的驱动jar包。

input {
    jdbc {
        jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test"
        jdbc_user => "test"
        jdbc_password => "test123"
        schedule => "* * * * *"
        statement => "select * from TARGET_TABLE"
        add_field => ["type","a"]
    }
}
output{
    elasticsearch {
        hosts =>["10.10.1.205:9200"]
        index => "product"
        document_type => "%{type}"
    }
}

不过,它的性能实在是太差了!我导了一天,才导了两百多万的数据。

因此,就考虑自己来导。

自己的数据交换工具

思路:

最后使用发现,自己写的导入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!

遇到的问题

  • 1 JDBC需要采用分页的方式读取全量数据
  • 2 要模仿bulk文件进行存储
  • 3 由于bulk文件过大,导致curl内存溢出

程序开源

下面的代码需要注意的就是

public class JDBCUtil {
    private static Connection conn = null;
    private static PreparedStatement sta=null;
    static{
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver");
            conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        System.out.println("Database connection established");
    }
    /**
    * 把查到的数据格式化写入到文件
    *
    * @param list 需要存储的数据
    * @param index 索引的名称
    * @param type 类型的名称
    * @param path 文件存储的路径
    **/
    public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException {
        System.out.println("开始写文件");
        File file = new File(path);
        int count = 0;
        int size = list.size();
        for(Map map : list){
            FileUtils.write(file,  "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true);
            FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true);
//            System.out.println("写入了" + ((count++)+1) + "[" + size + "]");
        }
        System.out.println("写入完成");
    }

    /**
     * 读取数据
     * @param sql
     * @return
     * @throws SQLException
     */
    public static List<Map> readTable(String tablename,int start,int end) throws SQLException {
        System.out.println("开始读数据库");
        //执行查询
        sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end);
        ResultSet rs = sta.executeQuery();

        //获取数据列表
        List<Map> data = new ArrayList();
        List<String> columnLabels = getColumnLabels(rs);

        Map<String, Object> map = null;
        while(rs.next()){
            map = new HashMap<String, Object>();

            for (String columnLabel : columnLabels) {
                Object value = rs.getObject(columnLabel);
                map.put(columnLabel.toLowerCase(), value);
            }
            data.add(map);
        }
        sta.close();
        System.out.println("数据读取完毕");
        return data;
    }
    /**
     * 获得列名
     * @param resultSet
     * @return
     * @throws SQLException
     */
    private static List<String> getColumnLabels(ResultSet resultSet)
            throws SQLException {
        List<String> labels = new ArrayList<String>();

        ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData();
        for (int i = 0; i < rsmd.getColumnCount(); i++) {
            labels.add(rsmd.getColumnLabel(i + 1));
        }

        return labels;
    }
    /**
    * 获得数据库表的总数,方便进行分页
    *
    * @param tablename 表名
    */
    public static int count(String tablename) throws SQLException {
        int count = 0;
        Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
        ResultSet rs = stmt.executeQuery("select count(1) from "+tablename);
        while (rs.next()) {
            count = rs.getInt(1);
        }
        System.out.println("Total Size = " + count);
        rs.close();
        stmt.close();
        return count;
    }
    /**
     * 执行查询,并持久化文件
     * 
     * @param tablename 导出的表明
     * @param page 分页的大小
     * @param path 文件的路径
     * @param index 索引的名称
     * @param type 类型的名称
     * @return
     * @throws SQLException
     */
    public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException {
        int count = count(tablename);
        int i =0;
        for(i =0;i<count;){
            List<Map> map = JDBCUtil.readTable(tablename,i,i+page);
            JDBCUtil.writeTable(map,index,type,path);
            i+=page;
        }
    }
}

在main方法中传入必要的参数即可:

public class Main {
    public static void main(String[] args) {
        try {
            JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type");
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

这样得到bulk的数据后,就可以运行脚本分批导入了。

下面脚本的思路,就是每100000行左右的数据导入到一个目标文件,使用bulk命令导入到es中。注意一个细节就是不能随意的切分文件,因为bulk的文件是两行为一条数据的。

#!/bin/bash

count=0
rm target.json
touch target.json


while read line;do

((count++))

{
        echo $line >> target.json

        if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then
                count=0
                curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
                rm target.json
                touch target.json
        fi

}

done < $1
echo 'last submit'
curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null

最后执行脚本:

sh auto_bulk.sh data.json

自己测试最后要比logstasj jdbc快5-6倍。

本文转自博客园xingoo的博客,原文链接:自己写的数据交换工具——从Oracle到Elasticsearch,如需转载请自行联系原博主。
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
5月前
|
SQL 运维 Oracle
【迁移秘籍揭晓】ADB如何助你一臂之力,轻松玩转Oracle至ADB的数据大转移?
【8月更文挑战第27天】ADB(Autonomous Database)是由甲骨文公司推出的自动化的数据库服务,它极大简化了数据库的运维工作。在从传统Oracle数据库升级至ADB的过程中,数据迁移至关重要。
87 0
|
2月前
|
存储 Oracle 关系型数据库
【赵渝强老师】Oracle的还原数据
Oracle数据库中的还原数据(也称为undo数据或撤销数据)存储在还原表空间中,主要用于支持查询的一致性读取、实现闪回技术和恢复失败的事务。文章通过示例详细介绍了还原数据的工作原理和应用场景。
【赵渝强老师】Oracle的还原数据
|
5月前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的联机重做日志文件与数据写入过程
在Oracle数据库中,联机重做日志文件记录了数据库的变化,用于实例恢复。每个数据库有多组联机重做日志,每组建议至少有两个成员。通过SQL语句可查看日志文件信息。视频讲解和示意图进一步解释了这一过程。
|
2月前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的数据文件
在Oracle数据库中,数据库由多个表空间组成,每个表空间包含多个数据文件。数据文件存储实际的数据库数据。查询时,如果内存中没有所需数据,Oracle会从数据文件中读取并加载到内存。可通过SQL语句查看和管理数据文件。附有视频讲解及示例。
|
3月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
263 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
3月前
|
Oracle 关系型数据库 数据库
oracle数据创建同义词
oracle数据创建同义词
61 0
|
3月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
234 0
|
5月前
|
存储 缓存 监控
|
5月前
|
自然语言处理 索引
ElasticSearch 实现分词全文检索 - 测试数据准备
ElasticSearch 实现分词全文检索 - 测试数据准备
63 1

推荐镜像

更多
下一篇
开通oss服务