报表查询优化之ClickHouse

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 报表查询优化之ClickHouse

公司最近有一个新需求,需要对报表的查询进行优化,目前查询数据量大的表如下t_parking_record 一个是停车记录表 有1千多万条数据 每天大约有1.5万新增数据, t_charge_record 是缴费记录表,基本上和停车记录差不多,也是千万级的大表,由于索引的优化和sql语句的优化已经对其结果影响甚微,如果查询时间稍微长一点,就会很慢,导致oom或者其他问题。针对目前这个问题,采取的方案是通过canal中间件把mysql的数据导入 ClickHouse ,通过专门的olap工具进行优化。

首先解释一下什么是olap

联机分析处理OLAP(On-Line Analytical Processing) 是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。典型的应用就是复杂的动态的报表系统。

再说说为啥用ClickHouse  简单总结一个字快

Elasticsearch vs ClickHouse

ClickHouse 是一款高性能列式分布式数据库管理系统,我们对 ClickHouse 进行了测试,发现有下列优势:

①ClickHouse 写入吞吐量大

单服务器日志写入量在 50MB 到 200MB/s,每秒写入超过 60w 记录数,是 ES 的 5 倍以上。

在 ES 中比较常见的写 Rejected 导致数据丢失、写入延迟等问题,在 ClickHouse 中不容易发生。

②查询速度快

官方宣称数据在 pagecache 中,单服务器查询速率大约在 2-30GB/s;没在 pagecache 的情况下,查询速度取决于磁盘的读取速率和数据的压缩率。经测试 ClickHouse 的查询速度比 ES 快 5-30 倍以上。

③ClickHouse 比 ES 服务器成本更低

一方面 ClickHouse 的数据压缩比比 ES 高,相同数据占用的磁盘空间只有 ES 的 1/3 到 1/30,节省了磁盘空间的同时,也能有效的减少磁盘 IO,这也是ClickHouse查询效率更高的原因之一。

另一方面 ClickHouse 比 ES 占用更少的内存,消耗更少的 CPU 资源。我们预估用 ClickHouse 处理日志可以将服务器成本降低一半。

多维分析组件选型考察方面

海量:是衡量OLAP最基础的指标,扩展性要好,是否可以支持单集群百台以上服务器,支撑每天百亿+的数据分析计算。

适应性:选择的组件可以覆盖到大部分的分析场景,不需要通过增加其他额外的组件来支持多样化的业务需求,致使架构的复杂性提升。

灵活性:能够在任意维度上进行组合,灵活的调整数据指标,动态增删列,很好的响应业务需求。

时效性:比如做到分钟级/亚秒级端到端数据延时,让相关人员能够即时的看到决策效果,并做响应的调整。

知道了为什么使用ClickHouse后我们来看一个案例

1mysql 开启binlog文件配置

# binlog

server-id=1  #配置从节点id

log-bin=mysql-bin

binlog_format=row

binlog-do-db=gmall

2重启mysql

win停止mysql

net stop mysql

net start mysql

3查看binlog是否开启

show variables like '%log_bin%'

4创建canal账号

CREATE USER canal IDENTIFIED BY 'canal';  #新建用户

GRANT ALL ON `tender`.* TO 'canal'@'%';             #赋予权限

FLUSH PRIVILEGES;  

修改canal的配置文件

vi conf/example/instance.properties

canal.instance.master.address=localhost:3306

canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
设置需要同步的表
# table regex
canal.instance.filter.regex=stpnew\\.t_parking_record,stpnew\\.t_charge_record
_charge_record
设置不需要同步的表
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
此处选用mq的方式进行同步 配置 topic
canal.mq.topic=binlog_stpnew
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#同一id的数据进入同一个分区,保证消费的顺序性
canal.mq.partitionHash=stpnew.t_parking_record:id,
stpnew.t_charge_record:parkingrecordidcord:parkingrecordid
vi /usr/local/canal/conf/canal.properties
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
其他相关的配置可以参考
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
然后启动canal
运行startup.sh  win 运行 .bat 即可
kafka创建topic
### 手动创建topic(集群)
bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 2 --partitions 6 --topic binlog_stpnew
代码参考
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.reformer.dataeye.cache.JedisClient;
import com.reformer.dataeye.handler.AbstractTableHandler;
import com.reformer.dataeye.handler.TableStraegyFactory;
import com.reformer.dataeye.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("unchecked")
@Slf4j
@Configuration
@Component
public class DataConsumer {
    @Autowired
    protected JedisClient       jedisClient;
    @Autowired
    private TableStraegyFactory tableStraegyFactory;
    @Value("${spring.kafka.listener.concurrency:6}")
    private int                 concurrency;
    protected String            RETRY_KEY = "dataeye_retry";
    @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory")
    public void process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) {
        Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>();
        ConsumerRecord<String, String> firstRecord = records.get(0);
        log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size());
        try {
            //获取
            for (ConsumerRecord<String, String> record : records) {
                Optional<String> kafkaMessage = (Optional<String>) Optional
                        .ofNullable(record.value());
                if (!kafkaMessage.isPresent()) {
                    continue;
                }
                JSONObject json = JSONObject.parseObject(kafkaMessage.get());
                String tableName = json.getString("table");
                if (StringUtil.isBlank(tableName)
                        || tableStraegyFactory.getTableStraegy(tableName) == null) {
                    //没有定义的表处理类直接返回
                    continue;
                }
                //数据分组聚合
                List<JSONObject> listJson = tableMap.get(tableName);
                if (listJson == null) {
                    listJson = new ArrayList<JSONObject>();
                }
                if (!listJson.contains(json)) {
                    listJson.add(json);
                }
                tableMap.put(tableName, listJson);
            }
            if (tableMap.isEmpty() || tableMap.size() == 0) {
                return;
            }
            for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) {
                List<JSONObject> array = entry.getValue();
                log.info("insert:size={},table={}", array.size(), entry.getKey());
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey());
                if (tableHanlder != null && array.size() > 0) {
                    boolean ret = tableHanlder.tableProcess(array);
                    if (!ret) {
                        this.putRetyQueue(entry.getKey(), array);
                    }
                }
            }
        } catch (Exception ex) {
            //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息
            log.error("process_error:message={}" + ex.getMessage(), ex);
            ex.printStackTrace();
        } finally {
            //提交偏移量
            ackgt.acknowledge();
        }
    }
    /**
     * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,
     * 手动调用Acknowledgment.acknowledge()后提交
     * 
     * @param consumerFactory
     * @return
     */
    @Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //设置超时时间
        factory.getContainerProperties().setPollTimeout(5000);
        //设置提交偏移量的方式
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        log.info("init_props,consumer={},factory={}",
                JSON.toJSONString(consumerFactory.getConfigurationProperties()),
                JSON.toJSONString(factory.getContainerProperties()));
        factory.setConcurrency(concurrency);
        factory.setBatchListener(true);
        return factory;
    }
    /**
     * 数据库插入失败,保存至重试队列,后面进行定时重试
     * 
     * @param list
     */
    private synchronized void putRetyQueue(String table, List<JSONObject> array) {
        String value = jedisClient.hget(RETRY_KEY, table);
        if (StringUtil.isNotBlank((value))) {
            array.addAll(JSONObject.parseObject(value, List.class));
        }
        jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array));
        log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array);
    }
    /**
     * 批量插入重试任务,每隔2分钟执行一次
     * 
     * @throws Exception
     */
    @Scheduled(fixedDelay = 120000, initialDelay = 10000)
    public void retry() throws Exception {
        String lockKey = RETRY_KEY + "_lock";
        try {
            if (!jedisClient.tryLock(lockKey, 6000, 6000)) {
                return;
            }
            Map<String, String> map = jedisClient.hgetAll(RETRY_KEY);
            Set<Map.Entry<String, String>> entrySet = map.entrySet();
            for (Map.Entry<String, String> entry : entrySet) {
                List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class);
                String table = entry.getKey();
                log.info("retry,table={},size={},array={}", table, array.size(), array);
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table);
                boolean ret = tableHanlder.tableProcess(array);
                if (ret) {
                    long l = jedisClient.hdel(RETRY_KEY, table);
                    log.info("retry_success,table={},size={},l={}", table, array.size(), l);
                }
            }
        } catch (Exception e) {
            log.error("-----message=" + e.getMessage(), e);
        } finally {
            jedisClient.unlock(lockKey);
        }
    }
}
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.reformer.dataeye.cache.JedisClient;
import com.reformer.dataeye.handler.AbstractTableHandler;
import com.reformer.dataeye.handler.TableStraegyFactory;
import com.reformer.dataeye.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("unchecked")
@Slf4j
@Configuration
@Component
public class DataConsumer {
    @Autowired
    protected JedisClient       jedisClient;
    @Autowired
    private TableStraegyFactory tableStraegyFactory;
    @Value("${spring.kafka.listener.concurrency:6}")
    private int                 concurrency;
    protected String            RETRY_KEY = "dataeye_retry";
    @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory")
    public void process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) {
        Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>();
        ConsumerRecord<String, String> firstRecord = records.get(0);
        log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size());
        try {
            //获取
            for (ConsumerRecord<String, String> record : records) {
                Optional<String> kafkaMessage = (Optional<String>) Optional
                        .ofNullable(record.value());
                if (!kafkaMessage.isPresent()) {
                    continue;
                }
                JSONObject json = JSONObject.parseObject(kafkaMessage.get());
                String tableName = json.getString("table");
                if (StringUtil.isBlank(tableName)
                        || tableStraegyFactory.getTableStraegy(tableName) == null) {
                    //没有定义的表处理类直接返回
                    continue;
                }
                //数据分组聚合
                List<JSONObject> listJson = tableMap.get(tableName);
                if (listJson == null) {
                    listJson = new ArrayList<JSONObject>();
                }
                if (!listJson.contains(json)) {
                    listJson.add(json);
                }
                tableMap.put(tableName, listJson);
            }
            if (tableMap.isEmpty() || tableMap.size() == 0) {
                return;
            }
            for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) {
                List<JSONObject> array = entry.getValue();
                log.info("insert:size={},table={}", array.size(), entry.getKey());
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey());
                if (tableHanlder != null && array.size() > 0) {
                    boolean ret = tableHanlder.tableProcess(array);
                    if (!ret) {
                        this.putRetyQueue(entry.getKey(), array);
                    }
                }
            }
        } catch (Exception ex) {
            //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息
            log.error("process_error:message={}" + ex.getMessage(), ex);
            ex.printStackTrace();
        } finally {
            //提交偏移量
            ackgt.acknowledge();
        }
    }
    /**
     * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,
     * 手动调用Acknowledgment.acknowledge()后提交
     * 
     * @param consumerFactory
     * @return
     */
    @Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //设置超时时间
        factory.getContainerProperties().setPollTimeout(5000);
        //设置提交偏移量的方式
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        log.info("init_props,consumer={},factory={}",
                JSON.toJSONString(consumerFactory.getConfigurationProperties()),
                JSON.toJSONString(factory.getContainerProperties()));
        factory.setConcurrency(concurrency);
        factory.setBatchListener(true);
        return factory;
    }
    /**
     * 数据库插入失败,保存至重试队列,后面进行定时重试
     * 
     * @param list
     */
    private synchronized void putRetyQueue(String table, List<JSONObject> array) {
        String value = jedisClient.hget(RETRY_KEY, table);
        if (StringUtil.isNotBlank((value))) {
            array.addAll(JSONObject.parseObject(value, List.class));
        }
        jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array));
        log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array);
    }
    /**
     * 批量插入重试任务,每隔2分钟执行一次
     * 
     * @throws Exception
     */
    @Scheduled(fixedDelay = 120000, initialDelay = 10000)
    public void retry() throws Exception {
        String lockKey = RETRY_KEY + "_lock";
        try {
            if (!jedisClient.tryLock(lockKey, 6000, 6000)) {
                return;
            }
            Map<String, String> map = jedisClient.hgetAll(RETRY_KEY);
            Set<Map.Entry<String, String>> entrySet = map.entrySet();
            for (Map.Entry<String, String> entry : entrySet) {
                List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class);
                String table = entry.getKey();
                log.info("retry,table={},size={},array={}", table, array.size(), array);
                AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table);
                boolean ret = tableHanlder.tableProcess(array);
                if (ret) {
                    long l = jedisClient.hdel(RETRY_KEY, table);
                    log.info("retry_success,table={},size={},l={}", table, array.size(), l);
                }
            }
        } catch (Exception e) {
            log.error("-----message=" + e.getMessage(), e);
        } finally {
            jedisClient.unlock(lockKey);
        }
    }
}


相关文章
|
存储 监控 OLAP
【ClickHouse 技术系列】- 在 ClickHouse 物化视图中使用 Join
本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB级的数据规模,简单的架构,被国内外公司广泛采用。本系列技术文章,将详细展开介绍 ClickHouse。
【ClickHouse 技术系列】- 在 ClickHouse 物化视图中使用 Join
|
7月前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之如何优化大数据量的查询和处理
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
109 5
|
6月前
|
SQL Oracle 关系型数据库
MySQL单表千万级数据查询优化大家怎么说(评论有亮点)
单表千万级数据是MySQL查询的一个坎,可能还不是天花板。“一个人走的慢,一群人走的快”,通过讨论可以发现MySQL千万数据的全貌大概是怎样的。
237 0
|
8月前
|
SQL 存储 算法
clickhouse SQL优化
clickhouse 是 OLAP 数据库,但其具有独特的索引设计,所以如果拿 MySQL 或者其他 RDB 的优化经验来优化 clickhouse 可能得不到很好的效果,所以特此单独整理一篇文档,用于有 SQL 优化需求的同学,本人接触 clickhouse 时间也不长,难免有不足的地方,如果大家发现错误,还请不吝指正。
83580 3
|
8月前
|
存储 SQL 关系型数据库
ClickHouse(08)ClickHouse表引擎概况
ClickHouse支持四种主要表引擎系列:MergeTree家族,适用于大数据插入并按主键排序;日志引擎系列,适合小数据量写入,如StripeLog、Log和TinyLog;集成表引擎,如ODBC、JDBC,用于与外部系统集成;特殊引擎,包括分布式、内存、随机数生成等,满足特定需求。MergeTree系列提供数据副本和分区,日志系列不支持索引和突变操作。详细解析见相关文章链接。
111 0
|
8月前
|
SQL 缓存 运维
常用ClickHouse问题诊断查询
Clickhouse是一个性能强大的OLAP数据库,在实际使用中会遇到各种各样的问题,同时也有很多可以调优的地方。诊断调优所用到的SQL查询必不可少。本文就是一个ClickHouse日常运维的常用SQL查询手册。这个手册本人就在用,非常实用。
74425 48
|
BI 索引
|
存储 算法 数据挖掘
火山引擎:ClickHouse增强计划之“多表关联查询”
火山引擎:ClickHouse增强计划之“多表关联查询”
|
SQL 关系型数据库 OLAP
数据查询|学习笔记
快速学习数据查询
144 0
数据查询|学习笔记
|
存储 机器学习/深度学习 监控
【ClickHouse 技术系列】- 在 ClickHouse 中处理实时更新
本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB级的数据规模,简单的架构,被国内外公司广泛采用。本系列技术文章,将详细展开介绍 ClickHouse。
【ClickHouse 技术系列】- 在 ClickHouse 中处理实时更新