报表查询优化之ClickHouse

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 报表查询优化之ClickHouse

公司最近有一个新需求,需要对报表的查询进行优化,目前查询数据量大的表如下t_parking_record 一个是停车记录表 有1千多万条数据 每天大约有1.5万新增数据, t_charge_record 是缴费记录表,基本上和停车记录差不多,也是千万级的大表,由于索引的优化和sql语句的优化已经对其结果影响甚微,如果查询时间稍微长一点,就会很慢,导致oom或者其他问题。针对目前这个问题,采取的方案是通过canal中间件把mysql的数据导入 ClickHouse ,通过专门的olap工具进行优化。

首先解释一下什么是olap

联机分析处理OLAP(On-Line Analytical Processing) 是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。典型的应用就是复杂的动态的报表系统。

再说说为啥用ClickHouse  简单总结一个字快

Elasticsearch vs ClickHouse

ClickHouse 是一款高性能列式分布式数据库管理系统,我们对 ClickHouse 进行了测试,发现有下列优势:

①ClickHouse 写入吞吐量大

单服务器日志写入量在 50MB 到 200MB/s,每秒写入超过 60w 记录数,是 ES 的 5 倍以上。

在 ES 中比较常见的写 Rejected 导致数据丢失、写入延迟等问题,在 ClickHouse 中不容易发生。

②查询速度快

官方宣称数据在 pagecache 中,单服务器查询速率大约在 2-30GB/s;没在 pagecache 的情况下,查询速度取决于磁盘的读取速率和数据的压缩率。经测试 ClickHouse 的查询速度比 ES 快 5-30 倍以上。

③ClickHouse 比 ES 服务器成本更低

一方面 ClickHouse 的数据压缩比比 ES 高,相同数据占用的磁盘空间只有 ES 的 1/3 到 1/30,节省了磁盘空间的同时,也能有效的减少磁盘 IO,这也是ClickHouse查询效率更高的原因之一。

另一方面 ClickHouse 比 ES 占用更少的内存,消耗更少的 CPU 资源。我们预估用 ClickHouse 处理日志可以将服务器成本降低一半。

多维分析组件选型考察方面

海量:是衡量OLAP最基础的指标,扩展性要好,是否可以支持单集群百台以上服务器,支撑每天百亿+的数据分析计算。

适应性:选择的组件可以覆盖到大部分的分析场景,不需要通过增加其他额外的组件来支持多样化的业务需求,致使架构的复杂性提升。

灵活性:能够在任意维度上进行组合,灵活的调整数据指标,动态增删列,很好的响应业务需求。

时效性:比如做到分钟级/亚秒级端到端数据延时,让相关人员能够即时的看到决策效果,并做响应的调整。

知道了为什么使用ClickHouse后我们来看一个案例

1mysql 开启binlog文件配置

# binlog

server-id=1  #配置从节点id

log-bin=mysql-bin

binlog_format=row

binlog-do-db=gmall

2重启mysql

win停止mysql

net stop mysql

net start mysql

3查看binlog是否开启

show variables like '%log_bin%'

4创建canal账号

CREATE USER canal IDENTIFIED BY 'canal';  #新建用户

GRANT ALL ON `tender`.* TO 'canal'@'%';             #赋予权限

FLUSH PRIVILEGES;  

修改canal的配置文件

vi conf/example/instance.properties

canal.instance.master.address=localhost:3306

canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
设置需要同步的表
# table regex
canal.instance.filter.regex=stpnew\\.t_parking_record,stpnew\\.t_charge_record
_charge_record
设置不需要同步的表
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
此处选用mq的方式进行同步 配置 topic
canal.mq.topic=binlog_stpnew
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#同一id的数据进入同一个分区,保证消费的顺序性
canal.mq.partitionHash=stpnew.t_parking_record:id,
stpnew.t_charge_record:parkingrecordidcord:parkingrecordid
vi /usr/local/canal/conf/canal.properties
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
其他相关的配置可以参考
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
然后启动canal
运行startup.sh  win 运行 .bat 即可
kafka创建topic
### 手动创建topic(集群)
bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 2 --partitions 6 --topic binlog_stpnew
代码参考
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.reformer.dataeye.cache.JedisClient;
import com.reformer.dataeye.handler.AbstractTableHandler;
import com.reformer.dataeye.handler.TableStraegyFactory;
import com.reformer.dataeye.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("unchecked")
@Slf4j
@Configuration
@Component
public class DataConsumer {
    @Autowired
    protected JedisClient       jedisClient;
    @Autowired
    private TableStraegyFactory tableStraegyFactory;
    @Value("${spring.kafka.listener.concurrency:6}")
    private int                 concurrency;
    protected String            RETRY_KEY = "dataeye_retry";
    @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory")
    public void process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) {
        Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>();
        ConsumerRecord<String, String> firstRecord = records.get(0);
        log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size());
        try {
            //获取
            for (ConsumerRecord<String, String> record : records) {
                Optional<String> kafkaMessage = (Optional<String>) Optional
                        .ofNullable(record.value());
                if (!kafkaMessage.isPresent()) {
                    continue;
                }
                JSONObject json = JSONObject.parseObject(kafkaMessage.get());
                String tableName = json.getString("table");
                if (StringUtil.isBlank(tableName)
                        || tableStraegyFactory.getTableStraegy(tableName) == null) {
                    //没有定义的表处理类直接返回
                    continue;
                }
                //数据分组聚合
                List<JSONObject> listJson = tableMap.get(tableName);
                if (listJson == null) {
                    listJson = new ArrayList<JSONObject>();
                }
                if (!listJson.contains(json)) {
                    listJson.add(json);
                }
                tableMap.put(tableName, listJson);
            }
            if (tableMap.isEmpty() || tableMap.size() == 0) {
                return;
            }
            for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) {
                List<JSONObject> array = entry.getValue();
                log.info("insert:size={},table={}", array.size(), entry.getKey());
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey());
                if (tableHanlder != null && array.size() > 0) {
                    boolean ret = tableHanlder.tableProcess(array);
                    if (!ret) {
                        this.putRetyQueue(entry.getKey(), array);
                    }
                }
            }
        } catch (Exception ex) {
            //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息
            log.error("process_error:message={}" + ex.getMessage(), ex);
            ex.printStackTrace();
        } finally {
            //提交偏移量
            ackgt.acknowledge();
        }
    }
    /**
     * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,
     * 手动调用Acknowledgment.acknowledge()后提交
     * 
     * @param consumerFactory
     * @return
     */
    @Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //设置超时时间
        factory.getContainerProperties().setPollTimeout(5000);
        //设置提交偏移量的方式
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        log.info("init_props,consumer={},factory={}",
                JSON.toJSONString(consumerFactory.getConfigurationProperties()),
                JSON.toJSONString(factory.getContainerProperties()));
        factory.setConcurrency(concurrency);
        factory.setBatchListener(true);
        return factory;
    }
    /**
     * 数据库插入失败,保存至重试队列,后面进行定时重试
     * 
     * @param list
     */
    private synchronized void putRetyQueue(String table, List<JSONObject> array) {
        String value = jedisClient.hget(RETRY_KEY, table);
        if (StringUtil.isNotBlank((value))) {
            array.addAll(JSONObject.parseObject(value, List.class));
        }
        jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array));
        log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array);
    }
    /**
     * 批量插入重试任务,每隔2分钟执行一次
     * 
     * @throws Exception
     */
    @Scheduled(fixedDelay = 120000, initialDelay = 10000)
    public void retry() throws Exception {
        String lockKey = RETRY_KEY + "_lock";
        try {
            if (!jedisClient.tryLock(lockKey, 6000, 6000)) {
                return;
            }
            Map<String, String> map = jedisClient.hgetAll(RETRY_KEY);
            Set<Map.Entry<String, String>> entrySet = map.entrySet();
            for (Map.Entry<String, String> entry : entrySet) {
                List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class);
                String table = entry.getKey();
                log.info("retry,table={},size={},array={}", table, array.size(), array);
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table);
                boolean ret = tableHanlder.tableProcess(array);
                if (ret) {
                    long l = jedisClient.hdel(RETRY_KEY, table);
                    log.info("retry_success,table={},size={},l={}", table, array.size(), l);
                }
            }
        } catch (Exception e) {
            log.error("-----message=" + e.getMessage(), e);
        } finally {
            jedisClient.unlock(lockKey);
        }
    }
}
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.reformer.dataeye.cache.JedisClient;
import com.reformer.dataeye.handler.AbstractTableHandler;
import com.reformer.dataeye.handler.TableStraegyFactory;
import com.reformer.dataeye.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("unchecked")
@Slf4j
@Configuration
@Component
public class DataConsumer {
    @Autowired
    protected JedisClient       jedisClient;
    @Autowired
    private TableStraegyFactory tableStraegyFactory;
    @Value("${spring.kafka.listener.concurrency:6}")
    private int                 concurrency;
    protected String            RETRY_KEY = "dataeye_retry";
    @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory")
    public void process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) {
        Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>();
        ConsumerRecord<String, String> firstRecord = records.get(0);
        log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size());
        try {
            //获取
            for (ConsumerRecord<String, String> record : records) {
                Optional<String> kafkaMessage = (Optional<String>) Optional
                        .ofNullable(record.value());
                if (!kafkaMessage.isPresent()) {
                    continue;
                }
                JSONObject json = JSONObject.parseObject(kafkaMessage.get());
                String tableName = json.getString("table");
                if (StringUtil.isBlank(tableName)
                        || tableStraegyFactory.getTableStraegy(tableName) == null) {
                    //没有定义的表处理类直接返回
                    continue;
                }
                //数据分组聚合
                List<JSONObject> listJson = tableMap.get(tableName);
                if (listJson == null) {
                    listJson = new ArrayList<JSONObject>();
                }
                if (!listJson.contains(json)) {
                    listJson.add(json);
                }
                tableMap.put(tableName, listJson);
            }
            if (tableMap.isEmpty() || tableMap.size() == 0) {
                return;
            }
            for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) {
                List<JSONObject> array = entry.getValue();
                log.info("insert:size={},table={}", array.size(), entry.getKey());
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey());
                if (tableHanlder != null && array.size() > 0) {
                    boolean ret = tableHanlder.tableProcess(array);
                    if (!ret) {
                        this.putRetyQueue(entry.getKey(), array);
                    }
                }
            }
        } catch (Exception ex) {
            //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息
            log.error("process_error:message={}" + ex.getMessage(), ex);
            ex.printStackTrace();
        } finally {
            //提交偏移量
            ackgt.acknowledge();
        }
    }
    /**
     * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,
     * 手动调用Acknowledgment.acknowledge()后提交
     * 
     * @param consumerFactory
     * @return
     */
    @Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //设置超时时间
        factory.getContainerProperties().setPollTimeout(5000);
        //设置提交偏移量的方式
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        log.info("init_props,consumer={},factory={}",
                JSON.toJSONString(consumerFactory.getConfigurationProperties()),
                JSON.toJSONString(factory.getContainerProperties()));
        factory.setConcurrency(concurrency);
        factory.setBatchListener(true);
        return factory;
    }
    /**
     * 数据库插入失败,保存至重试队列,后面进行定时重试
     * 
     * @param list
     */
    private synchronized void putRetyQueue(String table, List<JSONObject> array) {
        String value = jedisClient.hget(RETRY_KEY, table);
        if (StringUtil.isNotBlank((value))) {
            array.addAll(JSONObject.parseObject(value, List.class));
        }
        jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array));
        log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array);
    }
    /**
     * 批量插入重试任务,每隔2分钟执行一次
     * 
     * @throws Exception
     */
    @Scheduled(fixedDelay = 120000, initialDelay = 10000)
    public void retry() throws Exception {
        String lockKey = RETRY_KEY + "_lock";
        try {
            if (!jedisClient.tryLock(lockKey, 6000, 6000)) {
                return;
            }
            Map<String, String> map = jedisClient.hgetAll(RETRY_KEY);
            Set<Map.Entry<String, String>> entrySet = map.entrySet();
            for (Map.Entry<String, String> entry : entrySet) {
                List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class);
                String table = entry.getKey();
                log.info("retry,table={},size={},array={}", table, array.size(), array);
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table);
                boolean ret = tableHanlder.tableProcess(array);
                if (ret) {
                    long l = jedisClient.hdel(RETRY_KEY, table);
                    log.info("retry_success,table={},size={},l={}", table, array.size(), l);
                }
            }
        } catch (Exception e) {
            log.error("-----message=" + e.getMessage(), e);
        } finally {
            jedisClient.unlock(lockKey);
        }
    }
}


相关文章
|
存储 Java 关系型数据库
家政服务管理平台
家政服务管理平台
|
存储 搜索推荐 NoSQL
ES 和 clickhouse 对比选型
clickhouse 是列式存储所以无法进行全文检索,所以更适合数据分析的需求。elasticsearch更适合高并发并且查询返回结果较少的全文检索,如搜索引擎。
4295 0
|
SQL 存储 算法
ClickHouse性能优化 3
ClickHouse性能优化
986 0
|
传感器 人工智能 算法
适应多形态多任务,最强开源机器人学习系统八爪鱼诞生
【6月更文挑战第6天】【八爪鱼开源机器人学习系统】由加州大学伯克利分校等机构研发,适用于多形态多任务,已在arXiv上发表。系统基于transformer,预训练于800k机器人轨迹数据集,能快速适应新环境,支持单臂、双机械臂等。特点是多形态适应、多任务处理、快速微调及开源可复现。实验显示其在9个平台有效,但仍需改进传感器处理和语言指令理解。论文链接:https://arxiv.org/pdf/2405.12213
580 1
|
存储 SQL 缓存
优化ClickHouse查询性能:最佳实践与调优技巧
【10月更文挑战第26天】在大数据分析领域,ClickHouse 以其卓越的查询性能和高效的列式存储机制受到了广泛的关注。作为一名已经有一定 ClickHouse 使用经验的开发者,我深知在实际应用中,合理的表设计、索引优化以及查询优化对于提升 ClickHouse 性能的重要性。本文将结合我的实践经验,分享一些有效的优化策略。
1775 3
|
数据采集 存储 分布式计算
ClickHouse大规模数据导入优化:批处理与并行处理
【10月更文挑战第27天】在数据驱动的时代,高效的数据导入和处理能力是企业竞争力的重要组成部分。作为一位数据工程师,我在实际工作中经常遇到需要将大量数据导入ClickHouse的需求。ClickHouse是一款高性能的列式数据库系统,非常适合进行大规模数据的分析和查询。然而,如何优化ClickHouse的数据导入过程,提高导入的效率和速度,是我们面临的一个重要挑战。本文将从我个人的角度出发,详细介绍如何通过批处理、并行处理和数据预处理等技术优化ClickHouse的数据导入过程。
1590 0
|
SQL JavaScript Java
Spring Boot 3 整合 Mybatis-Plus 实现数据权限控制
本文介绍了如何在Spring Boot 3中整合MyBatis-Plus实现数据权限控制,通过使用MyBatis-Plus提供的`DataPermissionInterceptor`插件,在不破坏原有代码结构的基础上实现了细粒度的数据访问控制。文中详细描述了自定义注解`DataScope`的使用方法、`DataPermissionHandler`的具体实现逻辑,以及根据用户的不同角色和部门动态添加SQL片段来限制查询结果。此外,还展示了基于Spring Boot 3和Vue 3构建的前后端分离快速开发框架的实际应用案例,包括项目的核心功能模块如用户管理、角色管理等,并提供Gitee上的开源仓库
2861 11
|
10月前
|
监控 网络协议 视频直播
UDP协议(特点与应用场景)
UDP(用户数据报协议)是传输层的一种无连接协议,具有简单高效、低延迟的特点。其主要特点包括:无连接(无需握手)、不可靠传输(不保证数据完整性)、面向数据报(独立传输)。尽管UDP不如TCP可靠,但在实时通信(如语音通话、视频会议)、在线游戏、多媒体流媒体(如直播、点播)及网络监控等领域广泛应用,满足了对速度和实时性要求较高的需求。
1523 19
|
存储 前端开发 Java
SpringBoot整合Flowable【05】- 使用流程变量传递业务数据
本文介绍了如何使用Flowable的流程变量来管理绩效流程中的自定义数据。首先回顾了之前的简单绩效流程,指出现有流程缺乏分数输入和保存步骤。接着详细解释了流程变量的定义、分类(运行时变量和历史变量)及类型。通过具体代码示例展示了如何在绩效流程中插入全局和局部流程变量,实现各节点打分并维护分数的功能。最后总结了流程变量的使用场景及其在实际业务中的灵活性,并承诺将持续更新Flowable系列文章,帮助读者更好地理解和应用Flowable。 简要来说,本文通过实例讲解了如何利用Flowable的流程变量功能优化绩效评估流程,确保每个环节都能记录和更新分数,同时提供了全局和局部变量的对比和使用方法。
1218 0
SpringBoot整合Flowable【05】- 使用流程变量传递业务数据

热门文章

最新文章