半个小时搭建自己的实时监控系统

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 基于Flume、Kafka、Flink的实时监控

首先给直观的看看监控效果图:
image.png

数据流架构如下所示,通过Flume采集日志数据,并写入到kafka中,Flink读取kafka数据经过处理后再次放入到kafka中,监控页面通过websocket监听kafka中数据实现实时的数据显示。
实时监控顶层数据流架构

整体技术框架基于ruoyi单机版本搭建
新增加的文件如下:

第一步先启动Flume,Flume监听文件,我这里通过tail命令监听文件新写入的内容

./flume-ng agent -c /Users/dbq/Documents/middleware/flume/master/apache-flume-1.9.0-bin/conf -f /Users/dbq/Documents/middleware/flume/master/apache-flume-1.9.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console

配置文件如下,Flume实时监控文件数据,并写入到kafka test 主题中

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /Users/d/Documents/middleware/flume/data/log.00
a1.sources.ri.shell = /bin/sh -c
#对于sink的配置描述 使用kafka做数据的消费
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9092,127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 5
#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

第二步:系统启动的时候监听kafka topic,并通过Flink进行流式计算,Sink负责将处理后的数据输出到外部系统中。

@Component
public class Runner implements CommandLineRunner {
    @Override
    public void run(String... args) throws Exception {
        System.out.println("--------------");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), getProperties());
        DataStream<String> dataStream = env.addSource(consumer);

        //模拟业务过程流式处理
        DataStream<String> after = dataStream.map((MapFunction<String, String>) s -> {
            MonitorObject mo = getMonitorObject(s);
            return JSON.toJSONString(mo);
        });
        after.addSink(new MySink());
        env.execute("spring flink demo");
    }

    /**
     * 模拟二次处理
     */
    private static MonitorObject getMonitorObject(String s) {
        MonitorObject mo = JSON.toJavaObject(JSON.parseObject(s), MonitorObject.class);
        mo.setOil(mo.getOil() % 2);
        mo.setSpeed(mo.getSpeed()%2);
        return mo;
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:3181");
        properties.setProperty("group.id", "flink-group");
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;

    }
}

第三步:sink中讲数据重新写入到kafka中,这里重写写入到kafka目的是起到平滑推送数据到前端页面的效果,也方便以广播的方式推送到其他业务系统,其他业务系统只需要订阅test_after主题,就可以获得Flink处理之后的数据

@Slf4j
@Component
public class MySink extends RichSinkFunction<String> {
    private AnnotationConfigApplicationContext ctx;

    private final static String topic = "test_after";

    public MySink() {
        log.info("mySink new");
    }

    @Override
    public void open(Configuration paramters) {
        this.ctx = new AnnotationConfigApplicationContext(Config.class);
        log.info("my sink open");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("[flink监控kafka数据]:{}", value);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> data = new ArrayList<>();
        data.add(value);

        DataStreamSource<String> source = env.fromCollection(data);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                topic,
                (KafkaSerializationSchema<String>) (element, timestamp) -> new ProducerRecord<>(
                        topic,
                        element.getBytes()
                ),
                getProperties(),
                FlinkKafkaProducer.Semantic.NONE
        );
        //重新写入到kafka
        source.addSink(producer);
        env.execute();
    }

    @Override
    public void close() {
        ctx.close();
        log.info("my sink close");
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:3181");
        properties.setProperty("group.id", "flink-group");
        properties.setProperty("auto.offset.reset", "latest");
        return properties;

    }

第四步:监听kafka,并通过websocket推送到前端页面

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(groupId = "3",topics = "test_after")
    public void listen(String msg){
        System.out.println("====================> " + msg);
        MonitorObject mo = JSON.toJavaObject(JSON.parseObject(msg), MonitorObject.class);
        WebSocketUsers.sendMessageToUsersByText(mo);
    }
}

其他代码
WebSocketConfig.java

@Configuration
public class WebSocketConfig
{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

WebSocketServer.java

@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer
{
    /**
     * WebSocketServer 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 默认最多允许同时在线人数100
     */
    public static int socketMaxOnlineCount = 100;

    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) throws Exception
    {
        boolean semaphoreFlag = false;
        // 尝试获取信号量
        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
        if (!semaphoreFlag)
        {
            // 未获取到信号量
            LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
//            WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
            session.close();
        }
        else
        {
            // 添加用户
            WebSocketUsers.put(session.getId(), session);
            LOGGER.info("\n 建立连接 - {}", session);
            LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
//            WebSocketUsers.sendMessageToUserByText(session, "连接成功");
        }
    }

    /**
     * 连接关闭时处理
     */
    @OnClose
    public void onClose(Session session)
    {
        LOGGER.info("\n 关闭连接 - {}", session);
        // 移除用户
        WebSocketUsers.remove(session.getId());
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * 抛出异常时处理
     */
    @OnError
    public void onError(Session session, Throwable exception) throws Exception
    {
        if (session.isOpen())
        {
            // 关闭连接
            session.close();
        }
        String sessionId = session.getId();
        LOGGER.info("\n 连接异常 - {}", sessionId);
        LOGGER.info("\n 异常信息 - {}", exception);
        // 移出用户
        WebSocketUsers.remove(sessionId);
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    /**
     * 服务器接收到客户端消息时调用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session)
    {
        String msg = message.replace("你", "我").replace("吗", "");
        WebSocketUsers.sendMessageToUserByText(session, msg);
    }
}

WebSocketUsers.java

/**
 * websocket 客户端用户集
 * 
 * @author ruoyi
 */
public class WebSocketUsers
{
    /**
     * WebSocketUsers 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);

    /**
     * 用户集
     */
    private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    /**
     * 存储用户
     *
     * @param key 唯一键
     * @param session 用户信息
     */
    public static void put(String key, Session session)
    {
        USERS.put(key, session);
    }

    /**
     * 移除用户
     *
     * @param session 用户信息
     *
     * @return 移除结果
     */
    public static boolean remove(Session session)
    {
        String key = null;
        boolean flag = USERS.containsValue(session);
        if (flag)
        {
            Set<Map.Entry<String, Session>> entries = USERS.entrySet();
            for (Map.Entry<String, Session> entry : entries)
            {
                Session value = entry.getValue();
                if (value.equals(session))
                {
                    key = entry.getKey();
                    break;
                }
            }
        }
        else
        {
            return true;
        }
        return remove(key);
    }

    /**
     * 移出用户
     *
     * @param key 键
     */
    public static boolean remove(String key)
    {
        LOGGER.info("\n 正在移出用户 - {}", key);
        Session remove = USERS.remove(key);
        if (remove != null)
        {
            boolean containsValue = USERS.containsValue(remove);
            LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
            return containsValue;
        }
        else
        {
            return true;
        }
    }

    /**
     * 获取在线用户列表
     *
     * @return 返回用户集合
     */
    public static Map<String, Session> getUsers()
    {
        return USERS;
    }

    /**
     * 群发消息文本消息
     *
     * @param message 消息内容
     */
    public static void sendMessageToUsersByText(Object message)
    {
        Collection<Session> values = USERS.values();
        for (Session value : values)
        {
            sendMessageToUserByText(value, message);
        }
    }

    /**
     * 发送文本消息
     *
     * @param session 自己的用户名
     * @param message 消息内容
     */
    public static void sendMessageToUserByText(Session session, Object message)
    {
        if (session != null)
        {
            try
            {
                session.getBasicRemote().sendText(JSON.toJSONString(message));
            }
            catch (IOException e)
            {
                LOGGER.error("\n[发送消息异常]", e);
            }
        }
        else
        {
            LOGGER.info("\n[你已离线]");
        }
    }
}

前端代码

var url = "ws://127.0.0.1:80/websocket/message";
        var ws = new WebSocket(url);
        ws.onopen = function() {
            $('#text_content').append('已经打开连接!' + '\n');
        }
        ws.onmessage = function(event) {
            console.log(event.data)
            var obj = JSON.parse(event.data);
            gaugeChart.setOption({
                series : [
                    {
                        name: '速度',
                        data: [{value: obj.speed}]
                    },
                    {
                        name: '转速',
                        data: [{value: obj.rotate_speed}]
                    },
                    {
                        name: '油耗',
                        data: [{value: obj.oil}]
                    }
                ]
            })
        }

模拟数据的持续生成,这里每秒钟生成一条数据,以json格式写入到日志文件中

while true
do
 echo "{\"speed\":$((RANDOM %220)),\"rotate_speed\":$((RANDOM %7)),\"oil\":$((RANDOM %3))}" >> log.00
 sleep 1
done
相关文章
|
存储 分布式计算 监控
Hologres产品介绍与技术揭秘
近年来,随着数据实时化的诉求加剧,催生了一系列的实时数仓架构,Lambda架构也应运而生,但是随着场景的复杂度和业务多维需求,Lambda架构的痛点也越来越明显。HSAP的理念则是服务分析一体化,在本文中,来自阿里巴巴的资深技术专家将会深度剖析HSAP技术实现Hologres的设计原理,解读其产品典型场景。
13347 0
Hologres产品介绍与技术揭秘
|
2月前
|
数据采集 开发框架 .NET
告别爬取困境:用Playwright完美抓取复杂动态网页
Playwright:动态网页爬虫新利器。跨浏览器支持、智能等待、网络拦截,轻松应对异步加载与反爬机制。实战案例+高效技巧,解锁复杂页面数据抓取。
257 0
|
调度
关于定时任务,看着一系列就够了——2.Cron表达式
软件开发定时任务基础—— Cron 表达式介绍
1826 0
关于定时任务,看着一系列就够了——2.Cron表达式
|
3月前
|
消息中间件 数据采集 NoSQL
秒级行情推送系统实战:从触发、采集到入库的端到端架构
本文设计了一套秒级实时行情推送系统,涵盖触发、采集、缓冲、入库与推送五层架构,结合动态代理IP、Kafka/Redis缓冲及WebSocket推送,实现金融数据低延迟、高并发处理,适用于股票、数字货币等实时行情场景。
333 3
秒级行情推送系统实战:从触发、采集到入库的端到端架构
|
3月前
|
JSON API 数据安全/隐私保护
Python采集淘宝评论API接口及JSON数据返回全流程指南
Python采集淘宝评论API接口及JSON数据返回全流程指南
|
存储 SQL JSON
信创迁移适配实战-MySQL到达梦数据库DM8的数据迁移
信创迁移适配实战-MySQL到达梦数据库DM8的数据迁移
6921 0
信创迁移适配实战-MySQL到达梦数据库DM8的数据迁移
|
9月前
|
人工智能 API 语音技术
EmotiVoice:网易开源AI语音合成黑科技,2000+音色情感可控
EmotiVoice是网易有道开源的多语言语音合成系统,支持中英文2000多种音色,通过提示词控制情感输出,提供Web界面和API接口,具备语音克隆等先进功能。
1079 43
EmotiVoice:网易开源AI语音合成黑科技,2000+音色情感可控
|
12月前
|
人工智能 搜索推荐 开发者
ClotheDreamer:上海大学联合腾讯等高校推出的3D服装生成技术
ClotheDreamer是由上海大学、上海交通大学、复旦大学和腾讯优图实验室联合推出的3D服装生成技术,能够根据文本描述生成高保真、可穿戴的3D服装资产,适用于虚拟试穿和物理精确动画。
258 6
ClotheDreamer:上海大学联合腾讯等高校推出的3D服装生成技术
|
数据采集 DataWorks 测试技术
DataWorks产品使用合集之如何通过REST API进行数据采集,并且自定义传入API的参数
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
422 26
|
数据采集 存储 监控
使用Java构建实时监控和警报系统的最佳实践
使用Java构建实时监控和警报系统的最佳实践