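Design and Development of a Flink Real-Time Alerting System
Background
The real-time monitoring system has to raise alerts on data from several sources. To keep the system extensible and flexible, alert rules are configured dynamically, so one alerting system can serve multiple data sources and multiple alert rules in real time. Concretely, it must detect in real time whether in-vehicle and cloud signals and event-tracking data are abnormal, and whether a vehicle is operating abnormally.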
1. Data Sources
2. System Architecture Design
1. Layered System Architecture
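Following the principle of high cohesion and low coupling, the alerting system organizes its functional modules into layers:
1. The Flink DataStream layer defines the overall DAG along which data flows inside Flink, e.g. addSource, connect, process, addSink;
2. The Flink Function layer contains the concrete function implementations, e.g. AlertManagerSinkFunction, CustomMysqlSourceFunction, RuleMatchBroadCastProcessFunction;
3. The Service layer contains the business logic, e.g. AlertManagerService, which sends data to AlertManager, and RulesService, which synchronizes, updates, maintains, converts and matches rules.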
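To make the layering concrete, here is a minimal plain-Java sketch under simplifying assumptions: SimpleSendService, RecordingSendService and AlertSinkSketch are hypothetical names, the payload is a String instead of AlertManagerData, and the Function-layer class would extend a Flink sink function in the real job rather than being a plain class. It only illustrates the dependency direction: the Function layer contains no business logic and delegates to the Service layer.

```java
import java.util.ArrayList;
import java.util.List;

// Service layer: business logic only, no Flink types.
// Hypothetical, simplified stand-in for ISendService (String payload instead of AlertManagerData).
interface SimpleSendService {
    void send(String alert);
}

// Service-layer implementation that records alerts instead of POSTing them to AlertManager.
class RecordingSendService implements SimpleSendService {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String alert) {
        sent.add(alert);
    }
}

// Function layer: a thin adapter. In the real job it would extend a Flink sink function;
// it is a plain class here so the sketch runs without Flink on the classpath.
class AlertSinkSketch {
    private final SimpleSendService service;

    AlertSinkSketch(SimpleSendService service) {
        this.service = service;
    }

    // Plays the role of SinkFunction#invoke: hand every record to the Service layer.
    void invoke(String value) {
        service.send(value);
    }
}
```

Because the Service layer has no Flink dependency, it can be unit-tested on its own, as the recording implementation suggests.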
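2. Business Module Design
Note: on the business side there are currently four data sources that need alerting: remote logs, cloud microservice logs, in-vehicle event tracking, and Sentry crash reports. Sentry data, after being filtered by the alert rules, must be sent to Kafka for real-time monitoring. In the design, the class parameter of the Driver first routes the job to either the generic JSON alerting module or the Sentry crash module; app.type then selects which Kafka data source is consumed.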
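The routing step could look roughly like the sketch below. The --class argument name and the job keys are assumptions for illustration; the article only states that the Driver routes on class, and the real entry points (for example JsonAlertJobDriver.main) are replaced here by registered callbacks. DriverSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the Driver's routing: "--class <key>" selects which job to start.
class DriverSketch {
    // job key -> entry point (in the real project e.g. JsonAlertJobDriver::main)
    private final Map<String, Consumer<String[]>> jobs = new HashMap<>();

    void register(String key, Consumer<String[]> entry) {
        jobs.put(key, entry);
    }

    // Finds the value following "--class" and passes all arguments to that job,
    // which would then read its own settings such as app.type.
    String route(String[] args) {
        String cls = null;
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--class".equals(args[i])) {
                cls = args[i + 1];
            }
        }
        Consumer<String[]> entry = cls == null ? null : jobs.get(cls);
        if (entry == null) {
            throw new IllegalArgumentException("unknown job class: " + cls);
        }
        entry.accept(args);
        return cls;
    }
}
```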
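3. Flink DataStream Processing Flow
Note: the DataStream flow diagram shows how data consumed from Kafka moves through the Flink functions. The Driver launches the Flink program and, based on class, routes to either the generic JSON alerting module or the Sentry crash module. The internal logic of both is similar:
1. The configuration stored in MySQL is turned into a configuration (rule) stream by the custom source module;
2. The Kafka topic is turned into a data stream; through a broadcast connect, the rule stream is broadcast to every parallel instance (subtask) that processes the data stream;
3. The rule-matching module matches the data stream against the rules;
4. Matched records are sent to AlertManager for real-time alerting and to MySQL for alert statistics; Sentry records are additionally sent to Kafka for real-time monitoring.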
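The effect of connect(ruleStream.broadcast()) can be simulated without Flink as below, assuming a fixed parallelism, round-robin distribution of log records, and a substring check in place of Aviator (all simplifications; BroadcastSimulation is a hypothetical name). Every rule is replicated to every subtask, while each log record is handled by exactly one subtask, so each subtask keeps its own full copy of the rules.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of connecting a data stream with a broadcast rule stream.
class BroadcastSimulation {
    // One parallel subtask with its own rule table, like the HashMap inside RulesService.
    static class Subtask {
        final Map<Integer, String> rules = new HashMap<>();
        final List<String> alerts = new ArrayList<>();

        void onRule(int id, String keyword) {
            rules.put(id, keyword);
        }

        // Substring check stands in for Aviator expression evaluation.
        void onData(String record) {
            for (Map.Entry<Integer, String> r : rules.entrySet()) {
                if (record.contains(r.getValue())) {
                    alerts.add("rule " + r.getKey() + " matched " + record);
                }
            }
        }
    }

    final List<Subtask> subtasks = new ArrayList<>();
    private int next = 0;

    BroadcastSimulation(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
    }

    // broadcast(): the rule is delivered to all subtasks.
    void broadcastRule(int id, String keyword) {
        for (Subtask s : subtasks) {
            s.onRule(id, keyword);
        }
    }

    // Data stream: round-robin, each record is processed by exactly one subtask.
    void sendData(String record) {
        subtasks.get(next).onData(record);
        next = (next + 1) % subtasks.size();
    }

    int totalAlerts() {
        int n = 0;
        for (Subtask s : subtasks) {
            n += s.alerts.size();
        }
        return n;
    }
}
```

As in RulesService, the rule table here is an ordinary field rather than Flink managed state, so in this design the rules are not part of a checkpoint and have to be re-delivered by the rule source after a restart.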
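4. Rule Engine
Aviator is a high-performance, lightweight expression evaluation engine written in Java, used for dynamically evaluating all kinds of expressions. Aviator compiles expressions directly into Java bytecode, which is then executed by the JVM.
The rule-matching module evaluates Aviator rule expressions. The inputs to a match are: 1. the JSON record from the data stream, converted into a map with flattenAsMap; 2. the rule expression taken from each valid Rule in the rule stream.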
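Flattening is what lets a rule refer to nested fields with dotted names. A sketch of the idea on an already-parsed JSON object (nested Maps), assuming nested keys are joined with '.' and arrays are kept as-is; the project's own BeanToMap.jsonFlatten / flattenAsMap may treat arrays and special characters differently. JsonFlattenSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Flattens {"common_data":{"appPackage":"x"}} into {"common_data.appPackage":"x"}.
class JsonFlattenSketch {
    static Map<String, Object> flatten(Map<String, ?> json) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", json, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, Object> out) {
        for (Map.Entry<String, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object v = e.getValue();
            if (v instanceof Map) {
                // nested JSON object: recurse with the extended prefix
                flattenInto(key, (Map<String, ?>) v, out);
            } else {
                // leaf value (arrays are not expanded in this sketch)
                out.put(key, v);
            }
        }
    }
}
```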
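5. Rule Design
Rules are stored in MySQL so they are easy to manage and modify; with Flink CDC, changes to the rules are synchronized to the running job dynamically.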
CREATE TABLE IF NOT EXISTS `flink`.`flink-alert-rule123` (
  `id` int(16) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `job_id` int(16) NOT NULL COMMENT 'requirement id',
  `rule_keys` varchar(255) DEFAULT NULL COMMENT 'keys the rule refers to',
  `name` varchar(255) DEFAULT NULL COMMENT 'rule name',
  `exp` varchar(1020) DEFAULT NULL COMMENT 'rule expression',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'time the rule was last updated',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'time the rule was created',
  `is_valid` int(1) NOT NULL COMMENT 'whether the rule is active; inactive rules never alert',
  `app_type` varchar(255) NOT NULL COMMENT 'application type the rule applies to; must be an AppType enum value, case-insensitive',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
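The is_valid and app_type columns decide whether a row becomes an active rule. A minimal sketch of applying rule rows to an in-memory table, assuming a simplified Rule class and example app types SENTRY and CLOUD (hypothetical values, not the real AppType enum): active rows are upserted, and rows that became inactive are removed, so setting is_valid = 0 really stops alerts. RuleTableSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

class RuleTableSketch {
    // Simplified stand-in for the project's Rule POJO (only the columns used here).
    static class Rule {
        final int id;
        final int isValid;
        final String appType;
        final String exp;

        Rule(int id, int isValid, String appType, String exp) {
            this.id = id;
            this.isValid = isValid;
            this.appType = appType;
            this.exp = exp;
        }
    }

    private final String jobAppType; // the job's app.type, e.g. "SENTRY" (hypothetical value)
    final Map<Integer, Rule> rules = new HashMap<>();

    RuleTableSketch(String jobAppType) {
        this.jobAppType = jobAppType;
    }

    // Same condition as RulesService#isValid: active, and app_type matches ignoring case.
    boolean isValid(Rule r) {
        return r.isValid == 1 && jobAppType.equalsIgnoreCase(r.appType);
    }

    // Upsert active rules; remove rules that were disabled or moved to another app_type.
    void apply(Rule r) {
        if (isValid(r)) {
            rules.put(r.id, r);
        } else {
            rules.remove(r.id);
        }
    }
}
```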
Example JSON record:
{
  "common_data": {
    "appPackage": "ltd.qisi.sotasupportapp",
    "appVersion": "3.03.01.000",
    "collectedTime": 1625240289781,
    "behaviorId": "50026003",
    "qisiuiVersion": "0.2.02",
    "uid": "1924427992",
    "regionCode": "659001",
    "eventName": "mock",
    "vin": "MOCK1TELWMOMZRQAWO",
    "hardwareVersion": "3.03.01.000",
    "carseries": "E115",
    "pdsn": "47556519116431",
    "displayId": "0"
  },
  "gather_data": {
    "key1": "value5",
    "key2": "69",
    "key3": "0"
  }
}
The alert rule common_data.appPackage == 'ltd.qisi.sotasupportapp' matches records whose common_data.appPackage field equals 'ltd.qisi.sotasupportapp'.
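A toy evaluator helps show how such a rule is read against the flattened record. It is not Aviator: it supports only key == 'literal' terms joined by &&, with no escaping, and it assumes (as the original code appears to intend by disabling Aviator's ENABLE_PROPERTY_SYNTAX_SUGAR option) that a dotted name is looked up as one key of the flattened map. EqualsRuleSketch is a hypothetical name.

```java
import java.util.Map;

// Toy matcher for rules of the form  a.b == 'x' && c.d == 'y'  (not a real expression engine).
class EqualsRuleSketch {
    static boolean matches(String exp, Map<String, ?> flat) {
        for (String term : exp.split("&&")) {
            String[] parts = term.split("==", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("unsupported term: " + term);
            }
            String key = parts[0].trim();
            String literal = parts[1].trim();
            if (literal.length() < 2 || !literal.startsWith("'") || !literal.endsWith("'")) {
                throw new IllegalArgumentException("expected a quoted literal: " + literal);
            }
            // The whole dotted name is a single key of the flattened map.
            Object actual = flat.get(key);
            String expected = literal.substring(1, literal.length() - 1);
            if (actual == null || !actual.toString().equals(expected)) {
                return false;
            }
        }
        return true;
    }
}
```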
6. Output Alert Data Formats
1. In-vehicle alert statistics format
CREATE TABLE `flink`.`flink-alert-data` (
  `app_package` varchar(255) comment 'app package name',
  `collected_time` bigint(16) comment 'collection time (epoch ms)',
  `behavior_id` varchar(255) comment '',
  `qisiui_version` varchar(255) comment '',
  `uid` varchar(255) comment 'user id',
  `region_code` varchar(255) comment '',
  `os_version` varchar(255) comment '',
  `event_name` varchar(255) comment '',
  `vin` varchar(255) comment 'VIN',
  `hardware_version` varchar(255) comment '',
  `carseries` varchar(255) comment '',
  `pdsn` varchar(255) comment '',
  `display_id` varchar(255) comment 'screen id [0: main display; 1: front passenger display; 2: rear-left display; 3: rear-right display; -1: unknown]',
  `rule_name` varchar(255) comment '',
  `rule_id` varchar(255) comment '',
  `rule_exp` varchar(255) comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
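The column names above look like the snake_case form of the common_data field names (collectedTime becomes collected_time). Assuming that mapping, which is inferred from the DDL and not taken from the article's MySQL sink, a parameterised INSERT could be derived as in this sketch; rule_name, rule_id and rule_exp would be appended from the matched rule. AlertRowSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

class AlertRowSketch {
    // appPackage -> app_package; already-lowercase names such as carseries stay unchanged.
    static String camelToSnake(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (Character.isUpperCase(c)) {
                sb.append('_').append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds "INSERT INTO `table` (`col1`, `col2`) VALUES (?, ?)" for use with a PreparedStatement.
    static String insertSql(String table, List<String> jsonFields) {
        List<String> cols = new ArrayList<>();
        for (String f : jsonFields) {
            cols.add("`" + camelToSnake(f) + "`");
        }
        StringJoiner placeholders = new StringJoiner(", ");
        for (int i = 0; i < cols.size(); i++) {
            placeholders.add("?");
        }
        return "INSERT INTO `" + table + "` (" + String.join(", ", cols) + ") VALUES (" + placeholders + ")";
    }
}
```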
2. Cloud alert statistics format
CREATE TABLE `flink`.`flink-cloud-alert-data` (
  `microservice` varchar(255) comment '',
  `reqPath` varchar(255) comment '',
  `clicnetIP` varchar(255) comment '',
  `resultCode` varchar(255) comment '',
  `createDate` varchar(255) comment '',
  `ctime` varchar(255) comment '',
  `rule_name` varchar(255) comment '',
  `rule_id` varchar(255) comment '',
  `rule_exp` varchar(255) comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
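7. AlertManager Alerting Module
The module that talks to AlertManager is an implementation of ISendService. Data is sent by calling void send(AlertManagerData data); the implementation issues an HTTP POST with a JSON body to AlertManager. The basic request format is: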
curl -XPOST http://localhost:9093/api/v1/alerts -H 'Content-Type: application/json' -d '
[
  {
    "labels": {
      "alertname": "DiskRunningFull",
      "dev": "sda1",
      "instance": "中文测试",
      "route": "WEBHOOK"
    },
    "annotations": {
      "info": "The disk sda1 is running full",
      "summary": "please check the instance example1"
    },
    "generatorURL": "http://www.baidu.com"
  }
]'
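For reference, the same kind of request body can be built in plain Java. This sketch hand-escapes JSON strings instead of using fastjson and emits only labels and annotations; AlertPayloadSketch and its methods are hypothetical names.

```java
import java.util.Map;

class AlertPayloadSketch {
    // Quotes a string as a JSON string literal, escaping the characters JSON requires.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Serializes a flat String->String map as a JSON object.
    static String object(Map<String, String> m) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : m.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append(escape(e.getKey())).append(':').append(escape(e.getValue()));
        }
        return sb.append('}').toString();
    }

    // AlertManager accepts a JSON array of alerts; this builds a one-element array.
    static String singleAlert(Map<String, String> labels, Map<String, String> annotations) {
        return "[{\"labels\":" + object(labels) + ",\"annotations\":" + object(annotations) + "}]";
    }
}
```

Non-ASCII label values such as 中文测试 are left unescaped, which is valid JSON as long as the body is sent as UTF-8.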
8. Core Code
package com.qisi.driver;

import com.qisi.functions.AlertManagerSinkFunction;
import com.qisi.functions.RuleMatchBroadCastProcessFunction;
import com.qisi.functions.SinkFunctions;
import com.qisi.functions.SourceFunctions;
import com.qisi.pojo.KafkaAlertLog;
import com.qisi.pojo.Rule;
import com.qisi.utils.Config;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Alerting on in-vehicle event-tracking logs; invoked by {@link Driver}
 */
public class JsonAlertJobDriver {
    public static void main(String[] args) throws Exception {
        // Global configuration
        Configuration config = Config.initApplicationConfig(args, JsonAlertJobDriver.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(2);
        // Restart at most 4 times, 3 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 3000L));
        //env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoint/cp/flink-real-time-alert/cp"));
        env.setStateBackend(new FsStateBackend("file:///qisi/gitlab/flink-real-time-alert/cp"));
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // Checkpointing is disabled, so no checkpoints are written to the directory above
        env.getCheckpointConfig().disableCheckpointing();

        // Rule stream captured from MySQL
        SingleOutputStreamOperator<Rule> mysqlRuleStream = env.addSource(SourceFunctions.getCustomMysqlSource(config)).name("mysqlRuleStream");
        // Kafka log stream
        SingleOutputStreamOperator<String> kafkaSource = env.addSource(SourceFunctions.getKafkaEventSource()).name("kafkaSourceStream");
        // Connect the log stream with the broadcast rule stream
        ConnectedStreams<String, Rule> connectStream = kafkaSource.connect(mysqlRuleStream.broadcast());
        // Match the rules and emit the records that trigger an alert
        SingleOutputStreamOperator<KafkaAlertLog> alertStream = connectStream.process(new RuleMatchBroadCastProcessFunction(config));

        // Send alerts to AlertManager
        alertStream.addSink(new AlertManagerSinkFunction(config));
        // Write alerts to MySQL for alert statistics
        alertStream.addSink(SinkFunctions.getMysqlAlertSinkFunction(config));
        // Send Sentry alerts to Kafka for monitoring
        alertStream.filter(KafkaAlertLog::isSentryAlertData).map(KafkaAlertLog::getJson).addSink(SinkFunctions.getFlinkKafkaProducer());
        alertStream.print();
        env.execute(JsonAlertJobDriver.class.getSimpleName());
    }
}
RulesService (the rule management service)
package com.qisi.services;

import com.alibaba.fastjson.JSONObject;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
import com.googlecode.aviator.Options;
import com.qisi.pojo.AppType;
import com.qisi.pojo.Rule;
import com.qisi.utils.BeanToMap;
import com.qisi.utils.MysqlConnectionUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Rule management service: updates and maintains the rules
 */
@Slf4j
@Data
public class RulesService implements Serializable {
    private AppType appType;
    private String mysqlRuleSql;
    private String mysqlUrl;
    private String url;
    private String user;
    private String pswd;
    /**
     * All currently valid rules, keyed by rule id
     */
    private HashMap<Integer, Rule> rules = new HashMap<>();

    public RulesService(Configuration config) {
        ConfigOption<AppType> appTypeOption = ConfigOptions.key("app.type").enumType(AppType.class).noDefaultValue();
        ConfigOption<String> ruleSqlOption = ConfigOptions.key("rule.sql").stringType().noDefaultValue();
        ConfigOption<String> urlOption = ConfigOptions.key("mysql.url").stringType().noDefaultValue();
        ConfigOption<String> userOption = ConfigOptions.key("user").stringType().noDefaultValue();
        ConfigOption<String> pswdOption = ConfigOptions.key("pswd").stringType().noDefaultValue();
        appType = config.get(appTypeOption);
        mysqlRuleSql = config.get(ruleSqlOption);
        url = config.get(urlOption);
        mysqlUrl = url;
        user = config.get(userOption);
        pswd = config.get(pswdOption);
    }

    /**
     * Fully syncs the MySQL rules into {@link #rules}. Used for:
     * 1. the initial load when syncing incrementally
     * 2. full synchronization
     */
    public void allSyncRule() {
        try {
            MysqlConnectionUtil connUtil = new MysqlConnectionUtil(url, user, pswd);
            ArrayList<Rule> loaded = connUtil.onceSelectToJavaBean(mysqlRuleSql, Rule.class);
            // Refill the same map instance so rules deleted in MySQL disappear too;
            // callers may hold a reference to this map
            this.rules.clear();
            for (Rule rule : loaded) {
                if (isValid(rule)) {
                    this.rules.put(rule.getId(), rule);
                }
            }
        } catch (SQLException e) {
            log.error("all sync mysql rule error, url: {}, sql: {}", mysqlUrl, mysqlRuleSql, e);
        }
    }

    /**
     * Incrementally syncs one rule into this.rules
     * @param value binlog content
     */
    public void incrementSyncRule(String value) {
        incrementSyncRule(RulesService.toRule(value));
    }

    /**
     * Incrementally syncs one rule into this.rules
     * @param rule Rule object
     */
    public void incrementSyncRule(Rule rule) {
        if (isValid(rule)) {
            rules.put(rule.getId(), rule);
        } else {
            // A rule that was disabled (or moved to another app_type) must stop alerting
            rules.remove(rule.getId());
        }
    }

    private boolean isValid(Rule rule) {
        return rule.getIsValid() == 1 && appType.name().equalsIgnoreCase(rule.getAppType());
    }

    private static Rule toRule(String binlogJson) {
        try {
            // Take the row image after the change
            String after = JSONObject.parseObject(binlogJson).getJSONObject("after").toJSONString();
            // Parse it into a Rule
            return JSONObject.parseObject(after, Rule.class);
        } catch (Exception e) {
            log.error("parse MySQL rules error:{}", binlogJson, e);
        }
        return new Rule(0);
    }

    /**
     * Returns every rule whose expression evaluates to true for the given JSON record
     */
    public List<Rule> mathRules(String json) {
        List<Rule> res = new ArrayList<>();
        Map<String, Object> map = BeanToMap.jsonFlatten(json);
        // Treat dotted names such as common_data.appPackage as plain variable names,
        // so they resolve against the keys of the flattened map
        AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
        instance.setOption(Options.ENABLE_PROPERTY_SYNTAX_SUGAR, false);
        for (Rule rule : this.getRules().values()) {
            try {
                // cached = true: compile each rule expression once and reuse it
                Object execute = instance.execute(rule.getExp(), map, true);
                if (execute instanceof Boolean) {
                    if ((Boolean) execute) {
                        // Rule matched: raise an alert
                        log.info("Send Alert Data to AlertManager: | {} | <--> | {} |", rule, json);
                        res.add(rule);
                    } else {
                        log.debug("No Matched: | {} | -- | {} |", rule, json);
                    }
                } else {
                    log.error("rule set error, please modify rule:{}", rule);
                }
            } catch (Exception e) {
                log.error("AviatorEvaluator expression match error: | {} | -- | {} |", rule, json, e);
            }
        }
        return res;
    }
}
Flink CDC
private static SourceFunction<String> getBinlogSourceFunction(String table) {
    // Debezium properties
    Properties properties = new Properties();
    properties.put("offset.flush.interval.ms", "10000");
    properties.put("offset.flush.timeout.ms", "40000");
    properties.put("autocommit", "false");
    return MySQLSource.<String>builder()
            .hostname(Config.MYSQL_HOST)
            .port(Config.MYSQL_PORT)
            .databaseList(Config.MYSQL_DB)
            // table names must be qualified with the database name, e.g. "flink.flink-alert-rule123"
            .tableList(table)
            // the default startup mode takes an initial snapshot and then follows the binlog
            //.startupOptions(StartupOptions.latest())
            .username(Config.MYSQL_USER)
            .password(Config.MYSQL_PSWD)
            .deserializer(new MysqlCDCDeserializationSchema())
            .debeziumProperties(properties)
            .build();
}
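One gap worth noting: for a delete event the after image is absent, so parsing only after (as RulesService.toRule does) cannot remove a deleted rule. The sketch below handles the Debezium operation codes c (insert), u (update), d (delete) and r (snapshot read), assuming the deserializer output has already been parsed into Maps with op, before and after keys; whether MysqlCDCDeserializationSchema actually emits an op field is an assumption, and CdcRuleApplierSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Applies a Debezium-style change event (op + before/after row images) to an in-memory rule table.
class CdcRuleApplierSketch {
    final Map<Integer, Map<String, Object>> rules = new HashMap<>();

    @SuppressWarnings("unchecked")
    void apply(Map<String, Object> event) {
        String op = (String) event.get("op");
        Map<String, Object> before = (Map<String, Object>) event.get("before");
        Map<String, Object> after = (Map<String, Object>) event.get("after");
        if ("d".equals(op)) {
            // delete: there is no "after" image, so the id must come from "before"
            if (before != null) {
                rules.remove(((Number) before.get("id")).intValue());
            }
        } else if (after != null) {
            // c = insert, u = update, r = snapshot read
            int id = ((Number) after.get("id")).intValue();
            Object valid = after.get("is_valid");
            if (valid instanceof Number && ((Number) valid).intValue() == 1) {
                rules.put(id, after);
            } else {
                rules.remove(id); // disabled rules stop alerting
            }
        }
    }
}
```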
Core rule-matching class
package com.qisi.functions;

import com.qisi.pojo.KafkaAlertLog;
import com.qisi.pojo.Rule;
import com.qisi.pojo.alertManager.Route;
import com.qisi.services.RulesService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.SQLException;
import java.util.List;

/**
 * Core rule-matching function
 */
@Slf4j
@Data
public class RuleMatchBroadCastProcessFunction extends CoProcessFunction<String, Rule, KafkaAlertLog> {
    private RulesService rulesService;
    private Route route;
    private static final String alertName = "Sentry Alert";

    public RuleMatchBroadCastProcessFunction(Configuration config) throws SQLException {
        ConfigOption<Route> routeOption = ConfigOptions.key("am.route").enumType(Route.class).defaultValue(Route.WEBHOOK);
        route = config.get(routeOption);
        rulesService = new RulesService(config);
        // The initial full load is disabled; rules arrive through the rule stream (processElement2)
        // rulesService.allSyncRule();
        log.info("init load Mysql rule completed, total count: {} ", rulesService.getRules().size());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    // Input 1: log records; emit one KafkaAlertLog per matched rule
    @Override
    public void processElement1(String value, Context ctx, Collector<KafkaAlertLog> out) {
        log.debug("accept a record: {}", value);
        List<Rule> rules = rulesService.mathRules(value);
        for (Rule rule : rules) {
            out.collect(new KafkaAlertLog(value, rule));
        }
    }

    // Input 2: broadcast rule updates; refresh this subtask's rule table
    @Override
    public void processElement2(Rule value, Context ctx, Collector<KafkaAlertLog> out) {
        log.debug("rule refresh :{}", value);
        rulesService.incrementSyncRule(value);
    }
}
AlertMessageFilterFunction
package com.qisi.functions;

import com.googlecode.aviator.AviatorEvaluator;
import com.qisi.pojo.LogMessage;
import com.qisi.pojo.Rule;
import com.qisi.services.RulesService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/**
 * RulesService periodically queries MySQL to refresh the rules; matching LogMessage records are emitted
 */
@Deprecated
public class AlertMessageFilterFunction extends ProcessFunction<LogMessage, LogMessage> {
    private RulesService rulesService;
    private HashMap<Integer, Rule> rules;
    private long time = 0;
    private long period;

    public AlertMessageFilterFunction(long period) {
        this.period = period;
    }

    @Override
    public void open(Configuration parameters) throws SQLException {
        // Note: in the DataStream API the Configuration passed to open() is usually empty,
        // so the rule settings would have to be passed in through the constructor instead
        rulesService = new RulesService(parameters);
        rules = rulesService.getRules();
    }

    @Override
    public void processElement(LogMessage value, Context ctx, Collector<LogMessage> out) throws Exception {
        long current = ctx.timerService().currentProcessingTime();
        // Reload all rules from MySQL at most once per `period` ms
        if (current - time > this.period) {
            rulesService.allSyncRule();
            time = current;
        }
        for (Map.Entry<Integer, Rule> entry : rules.entrySet()) {
            String ruleExp = entry.getValue().getExp();
            if ((boolean) AviatorEvaluator.execute(ruleExp, value.toMap())) {
                out.collect(value);
                // Emit each record once, even if several rules match
                break;
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
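The throttling in AlertMessageFilterFunction can be isolated into a small class. This is a sketch assuming the caller passes the current processing time in milliseconds; PeriodicRefreshSketch is a hypothetical name. Because the check runs only when a record arrives, rules are never refreshed while the input is idle, a limitation the CDC-driven rule stream used by RuleMatchBroadCastProcessFunction does not have.

```java
// Decides lazily, on each incoming record, whether a full rule reload is due.
class PeriodicRefreshSketch {
    private final long periodMs;
    private long lastSync = Long.MIN_VALUE; // sentinel: forces a load on the first record
    int syncCount = 0;

    PeriodicRefreshSketch(long periodMs) {
        this.periodMs = periodMs;
    }

    // Returns true when a reload (e.g. RulesService#allSyncRule) should run now.
    boolean maybeSync(long nowMs) {
        if (lastSync == Long.MIN_VALUE || nowMs - lastSync > periodMs) {
            lastSync = nowMs;
            syncCount++;
            return true;
        }
        return false;
    }
}
```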
Sending alerts
package com.qisi.services;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qisi.pojo.alertManager.AlertManagerData;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Future;

/**
 * AlertManager alerting service: sends AlertManagerData over HTTP
 */
@Slf4j
public class AlertManagerService implements ISendService, Serializable {
    // Static because CloseableHttpClient is not serializable; shared by all instances in the JVM
    private static CloseableHttpClient httpClient = HttpClientBuilder.create().build();
    //private CloseableHttpAsyncClient asyncHttpClient = HttpAsyncClientBuilder.create().build();
    private String uri;

    public AlertManagerService(String amHost, int amPort) {
        // v1 API as in the original; it is deprecated in favour of /api/v2/alerts
        uri = String.format("http://%s:%d/api/v1/alerts", amHost, amPort);
        // asyncHttpClient.start();
    }

    public void send(AlertManagerData data) {
        JSONArray arr = new JSONArray();
        int statusCode = 0;
        try {
            arr.add(data);
            log.info("uri:{}", uri);
            HttpPost httpPost = new HttpPost(uri);
            // application/json with UTF-8; Content-Encoding is for compression, so it is not set
            StringEntity se = new StringEntity(JSONObject.toJSONString(arr), ContentType.APPLICATION_JSON);
            httpPost.setHeader("Accept", "application/json");
            httpPost.setEntity(se);
            log.debug("http client execute starting ...");
            // Consume and close the response so the connection is released
            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
                statusCode = response.getStatusLine().getStatusCode();
                EntityUtils.consume(response.getEntity());
            }
            // Future<HttpResponse> execute = asyncHttpClient.execute(httpPost, new Callbak(arr));
            // statusCode = execute.get().getStatusLine().getStatusCode();
            if (statusCode >= 200 && statusCode < 300) {
                log.info("Code: {},Send Alert Data Succeed: {}", statusCode, arr.toString());
            } else {
                log.error("Code: {},Send Alert Data Failed: {}", statusCode, arr.toString());
            }
        } catch (Exception e) {
            log.error("Code: {}, Send Alert Data Failed: {}", statusCode, arr.toString(), e);
        }
    }

    public void close() throws IOException {
        httpClient.close();
    }

    public static class Callbak implements FutureCallback<HttpResponse> {
        private JSONArray arr;

        private Callbak(JSONArray arr) {
            this.arr = arr;
        }

        @Override
        public void completed(HttpResponse httpResponse) {
            int statusCode = httpResponse.getStatusLine().getStatusCode();
            if (statusCode == 200) {
                log.info("Code: {},Send Alert Data Succeed: {}", statusCode, arr.toString());
            } else {
                log.error("Code: {},Send Alert Data Failed: {}", statusCode, arr.toString());
            }
        }

        @Override
        public void failed(Exception e) {
            log.error(e.getMessage(), e);
        }

        @Override
        public void cancelled() {
            log.info("Callbak cancelled");
        }
    }
}