公司最近有一个新需求,需要对报表的查询进行优化,目前查询数据量大的表如下t_parking_record 一个是停车记录表 有1千多万条数据 每天大约有1.5万新增数据, t_charge_record 是缴费记录表,基本上和停车记录差不多,也是千万级的大表,由于索引的优化和sql语句的优化已经对其结果影响甚微,如果查询时间稍微长一点,就会很慢,导致oom或者其他问题。针对目前这个问题,采取的方案是通过canal中间件把mysql的数据导入 ClickHouse ,通过专门的olap工具进行优化。
首先解释一下什么是olap
联机分析处理OLAP(On-Line Analytical Processing) 是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。典型的应用就是复杂的动态的报表系统。
再说说为啥用ClickHouse 简单总结一个字快
Elasticsearch vs ClickHouse
ClickHouse 是一款高性能列式分布式数据库管理系统,我们对 ClickHouse 进行了测试,发现有下列优势:
①ClickHouse 写入吞吐量大
单服务器日志写入量在 50MB 到 200MB/s,每秒写入超过 60w 记录数,是 ES 的 5 倍以上。
在 ES 中比较常见的写 Rejected 导致数据丢失、写入延迟等问题,在 ClickHouse 中不容易发生。
②查询速度快
官方宣称数据在 pagecache 中,单服务器查询速率大约在 2-30GB/s;没在 pagecache 的情况下,查询速度取决于磁盘的读取速率和数据的压缩率。经测试 ClickHouse 的查询速度比 ES 快 5-30 倍以上。
③ClickHouse 比 ES 服务器成本更低
一方面 ClickHouse 的数据压缩比比 ES 高,相同数据占用的磁盘空间只有 ES 的 1/3 到 1/30,节省了磁盘空间的同时,也能有效的减少磁盘 IO,这也是ClickHouse查询效率更高的原因之一。
另一方面 ClickHouse 比 ES 占用更少的内存,消耗更少的 CPU 资源。我们预估用 ClickHouse 处理日志可以将服务器成本降低一半。
多维分析组件选型考察方面
海量:是衡量OLAP最基础的指标,扩展性要好,是否可以支持单集群百台以上服务器,支撑每天百亿+的数据分析计算。
适应性:选择的组件可以覆盖到大部分的分析场景,不需要通过增加其他额外的组件来支持多样化的业务需求,致使架构的复杂性提升。
灵活性:能够在任意维度上进行组合,灵活的调整数据指标,动态增删列,很好的响应业务需求。
时效性:比如做到分钟级/亚秒级端到端数据延时,让相关人员能够即时的看到决策效果,并做响应的调整。
知道了为什么使用ClickHouse后我们来看一个案例
1mysql 开启binlog文件配置
# binlog
server-id=1 #配置从节点id
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall
2重启mysql
win停止mysql
net stop mysql
net start mysql
3查看binlog是否开启
show variables like '%log_bin%'
4创建canal账号
CREATE USER canal IDENTIFIED BY 'canal'; #新建用户
GRANT ALL ON `tender`.* TO 'canal'@'%'; #赋予权限
FLUSH PRIVILEGES;
修改canal的配置文件
vi conf/example/instance.properties
canal.instance.master.address=localhost:3306
canal.instance.dbUsername = canal canal.instance.dbPassword = canal 设置需要同步的表 # table regex canal.instance.filter.regex=stpnew\\.t_parking_record,stpnew\\.t_charge_record _charge_record 设置不需要同步的表 # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* 此处选用mq的方式进行同步 配置 topic canal.mq.topic=binlog_stpnew canal.mq.partition=0 # hash partition config canal.mq.partitionsNum=6 #canal.mq.partitionHash=test.table:id^name,.*\\..* #同一id的数据进入同一个分区,保证消费的顺序性 canal.mq.partitionHash=stpnew.t_parking_record:id, stpnew.t_charge_record:parkingrecordidcord:parkingrecordid vi /usr/local/canal/conf/canal.properties canal.serverMode = kafka kafka.bootstrap.servers = 127.0.0.1:9092 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 其他相关的配置可以参考 https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart 然后启动canal 运行startup.sh win 运行 .bat 即可 kafka创建topic ### 手动创建topic(集群) bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 2 --partitions 6 --topic binlog_stpnew 代码参考
import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.reformer.dataeye.cache.JedisClient; import com.reformer.dataeye.handler.AbstractTableHandler; import com.reformer.dataeye.handler.TableStraegyFactory; import com.reformer.dataeye.util.StringUtil; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("unchecked") @Slf4j @Configuration @Component public class DataConsumer { @Autowired protected JedisClient jedisClient; @Autowired private TableStraegyFactory tableStraegyFactory; @Value("${spring.kafka.listener.concurrency:6}") private int concurrency; protected String RETRY_KEY = "dataeye_retry"; @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory") public void process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) { Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>(); ConsumerRecord<String, String> firstRecord = records.get(0); log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size()); try { //获取 for (ConsumerRecord<String, String> record : records) { Optional<String> kafkaMessage = (Optional<String>) Optional .ofNullable(record.value()); if (!kafkaMessage.isPresent()) { continue; } JSONObject json = JSONObject.parseObject(kafkaMessage.get()); String tableName = json.getString("table"); if (StringUtil.isBlank(tableName) || tableStraegyFactory.getTableStraegy(tableName) == null) { //没有定义的表处理类直接返回 continue; } //数据分组聚合 List<JSONObject> listJson = tableMap.get(tableName); if (listJson == null) { listJson = new ArrayList<JSONObject>(); } if (!listJson.contains(json)) { listJson.add(json); } tableMap.put(tableName, listJson); } if (tableMap.isEmpty() || tableMap.size() == 0) { return; } for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) { List<JSONObject> array = entry.getValue(); log.info("insert:size={},table={}", array.size(), entry.getKey()); AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey()); if (tableHanlder != null && array.size() > 0) { boolean ret = tableHanlder.tableProcess(array); if (!ret) { this.putRetyQueue(entry.getKey(), array); } } } } catch (Exception ex) { //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息 log.error("process_error:message={}" + ex.getMessage(), ex); ex.printStackTrace(); } finally { //提交偏移量 ackgt.acknowledge(); } } /** * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, * 手动调用Acknowledgment.acknowledge()后提交 * * @param consumerFactory * @return */ @Bean("manualListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); //设置超时时间 factory.getContainerProperties().setPollTimeout(5000); //设置提交偏移量的方式 factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); log.info("init_props,consumer={},factory={}", JSON.toJSONString(consumerFactory.getConfigurationProperties()), JSON.toJSONString(factory.getContainerProperties())); factory.setConcurrency(concurrency); factory.setBatchListener(true); return factory; } /** * 数据库插入失败,保存至重试队列,后面进行定时重试 * * @param list */ private synchronized void putRetyQueue(String table, List<JSONObject> array) { String value = jedisClient.hget(RETRY_KEY, table); if (StringUtil.isNotBlank((value))) { array.addAll(JSONObject.parseObject(value, List.class)); } jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array)); log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array); } /** * 批量插入重试任务,每隔2分钟执行一次 * * @throws Exception */ @Scheduled(fixedDelay = 120000, initialDelay = 10000) public void retry() throws Exception { String lockKey = RETRY_KEY + "_lock"; try { if (!jedisClient.tryLock(lockKey, 6000, 6000)) { return; } Map<String, String> map = jedisClient.hgetAll(RETRY_KEY); Set<Map.Entry<String, String>> entrySet = map.entrySet(); for (Map.Entry<String, String> entry : entrySet) { List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class); String table = entry.getKey(); log.info("retry,table={},size={},array={}", table, array.size(), array); AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table); boolean ret = tableHanlder.tableProcess(array); if (ret) { long l = jedisClient.hdel(RETRY_KEY, table); log.info("retry_success,table={},size={},l={}", table, array.size(), l); } } } catch (Exception e) { log.error("-----message=" + e.getMessage(), e); } finally { jedisClient.unlock(lockKey); } } } import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.reformer.dataeye.cache.JedisClient; import com.reformer.dataeye.handler.AbstractTableHandler; import com.reformer.dataeye.handler.TableStraegyFactory; import com.reformer.dataeye.util.StringUtil; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("unchecked") @Slf4j @Configuration @Component public class DataConsumer { @Autowired protected JedisClient jedisClient; @Autowired private TableStraegyFactory tableStraegyFactory; @Value("${spring.kafka.listener.concurrency:6}") private int concurrency; protected String RETRY_KEY = "dataeye_retry"; @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory") public void process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) { Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>(); ConsumerRecord<String, String> firstRecord = records.get(0); log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size()); try { //获取 for (ConsumerRecord<String, String> record : records) { Optional<String> kafkaMessage = (Optional<String>) Optional .ofNullable(record.value()); if (!kafkaMessage.isPresent()) { continue; } JSONObject json = JSONObject.parseObject(kafkaMessage.get()); String tableName = json.getString("table"); if (StringUtil.isBlank(tableName) || tableStraegyFactory.getTableStraegy(tableName) == null) { //没有定义的表处理类直接返回 continue; } //数据分组聚合 List<JSONObject> listJson = tableMap.get(tableName); if (listJson == null) { listJson = new ArrayList<JSONObject>(); } if (!listJson.contains(json)) { listJson.add(json); } tableMap.put(tableName, listJson); } if (tableMap.isEmpty() || tableMap.size() == 0) { return; } for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) { List<JSONObject> array = entry.getValue(); log.info("insert:size={},table={}", array.size(), entry.getKey()); AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey()); if (tableHanlder != null && array.size() > 0) { boolean ret = tableHanlder.tableProcess(array); if (!ret) { this.putRetyQueue(entry.getKey(), array); } } } } catch (Exception ex) { //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息 log.error("process_error:message={}" + ex.getMessage(), ex); ex.printStackTrace(); } finally { //提交偏移量 ackgt.acknowledge(); } } /** * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, * 手动调用Acknowledgment.acknowledge()后提交 * * @param consumerFactory * @return */ @Bean("manualListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); //设置超时时间 factory.getContainerProperties().setPollTimeout(5000); //设置提交偏移量的方式 factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); log.info("init_props,consumer={},factory={}", JSON.toJSONString(consumerFactory.getConfigurationProperties()), JSON.toJSONString(factory.getContainerProperties())); factory.setConcurrency(concurrency); factory.setBatchListener(true); return factory; } /** * 数据库插入失败,保存至重试队列,后面进行定时重试 * * @param list */ private synchronized void putRetyQueue(String table, List<JSONObject> array) { String value = jedisClient.hget(RETRY_KEY, table); if (StringUtil.isNotBlank((value))) { array.addAll(JSONObject.parseObject(value, List.class)); } jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array)); log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array); } /** * 批量插入重试任务,每隔2分钟执行一次 * * @throws Exception */ @Scheduled(fixedDelay = 120000, initialDelay = 10000) public void retry() throws Exception { String lockKey = RETRY_KEY + "_lock"; try { if (!jedisClient.tryLock(lockKey, 6000, 6000)) { return; } Map<String, String> map = jedisClient.hgetAll(RETRY_KEY); Set<Map.Entry<String, String>> entrySet = map.entrySet(); for (Map.Entry<String, String> entry : entrySet) { List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class); String table = entry.getKey(); log.info("retry,table={},size={},array={}", table, array.size(), array); AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table); boolean ret = tableHanlder.tableProcess(array); if (ret) { long l = jedisClient.hdel(RETRY_KEY, table); log.info("retry_success,table={},size={},l={}", table, array.size(), l); } } } catch (Exception e) { log.error("-----message=" + e.getMessage(), e); } finally { jedisClient.unlock(lockKey); } } }