【云计算与大数据技术】流计算讲解及集群日志文件实时分析实战(附源码)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【云计算与大数据技术】流计算讲解及集群日志文件实时分析实战(附源码)

需要源码请点赞关注收藏后评论区留言私信~~

一、流计算概述

在传统的数据处理流程中总是先收集数据,然后将数据放到 DB中。当人们需要的 时候通过DB对数据做query,得到答案或进行相关的处理。这样看起来虽然非常合理,采用类似于 MapReduce方式的离线处理并不能很好地解决问题,结果却不理想,尤其是对一些实时搜索应用环境中的某些具体问题,这就引出了一种新的数据计算结构--流计算方式

流计算可以很好地对大规模流动数据在不断变化的运动过程中实时地 进行分析,捕捉到可能有用的信息,并把结果发送到下一计算节点

流计算包括早期的IBM System S,当前比较流行的流式计算框架Storm、Kafka

二、流计算与批处理系统对比

流计算侧重于实时计算方面,而批处理系统侧重于离线数据处理方面,一个追求的是低延迟,另外一个追求的是高吞吐量,处理的数据也不同,流计算处理的数据经常不断变化,而离线处理的数据是静态数据,输出形式也不同,总体来讲,两者的区别体现在以下几点

系统的输入包括两类数据,即实时的流式数据和静态的离线数据

系统的输出也包括流式数据和离线数据

业务的计算结果输出方式是通过两个条件决定的

三、Storm流计算系统

Storm 是一个 Twitter开源的分布式、高容错的实时计算系统

Storm 经常用于实时分析、在线机器学习、持续计算 、分布式远程调用和ETL等领域

Storm主要分为 Nimbus 和 Supervisor两种组件

下图是是Storm集群架构

Storm中每个实时计算任务表示称一个topology

四、Samza流计算系统

Apache Samza是一个分布式流处理框架

它使用 Apache Kafka用于消息发送,采 用 Apache Hadoop YARN 来提供容错、处理器隔离、安全性和资源管理,专用于实时数据的处理

Samza由以下3层构成  

数据流层(A streaming layer)  

执行层(An execution layer)  

处理层(A progressing layer)

五、阿里云流计算

Aliyun Stream Compute(阿里云流计算 )是运行在阿里云平台上的流式大数据分析平台,给用户提供在云上进行流式数据实时化分析的工具

阿里云流计算提供类标准的StreamSQL语义协助用户简单、轻松地完成流式计算逻辑的处理

六、集群日志文件的实时分析

目前分布式系统在各大生产 系统中广泛使用,监控这些分布式系统产生的日志,判断集群运行是否正常,采用流计算框架实时分析分布式系统产生的日志

以分析HDFS集群运行状态来简单说明流式计算框架的使用。当 NameNode 出现故障的时候需要及时报警,从而最大程度地减少损失

利用Flink做简单的日志文件单词统计分析,分析一个时间段内 NameNode产 生的单词统计

运行效果如下

可以根据Flink的Web界面查看SocketTextStream任务,找到对应的Flink文本统计计算节点

代码如下

package alibook.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * This example shows an implementation of WordCount with data from a text
 * socket. To run the example make sure that the service providing the text data
 * is already up and running.
 * <p>
 * To start an example socket text stream on your local machine run netcat from
 * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
 * port number.
 * </p>
 * <p>
 * Usage:
 * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
 * </p>
 * <p>
 * This example shows how to:
 * <ul>
 * <li>use StreamExecutionEnvironment.socketTextStream
 * <li>write a simple Flink program,
 * <li>write and use user-defined functions.
 * </ul>
 *
 */
public class SocketTextStream {
  public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
      return;
    }
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();
    // get input data
    DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .sum(1);
    if (fileOutput) {
      counts.writeAsText(outputPath, WriteMode.NO_OVERWRITE);
    } else {
      counts.print();
    }
    // execute program
    env.execute("WordCount from SocketTextStream Example");
  }
  // *************************************************************************
  // UTIL METHODS
  // *************************************************************************
  private static boolean fileOutput = false;
  private static String hostName;
  private static int port;
  private static String outputPath;
  private static boolean parseParameters(String[] args) {
    // parse input arguments
    if (args.length == 3) {
      fileOutput = true;
      hostName = args[0];
      port = Integer.valueOf(args[1]);
      outputPath = args[2];
    } else if (args.length == 2) {
      hostName = args[0];
      port = Integer.valueOf(args[1]);
    } else {
      System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
      return false;
    }
    return true;
  }
  /**
   * Implements the string tokenizer that splits sentences into words as a
   * user-defined FlatMapFunction. The function takes a line (String) and
   * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
   * Integer>}).
   */
  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
        throws Exception {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");
      // emit the pairs
      for (String token : tokens) {
        if (token.length() > 0) {
          out.collect(new Tuple2<String, Integer>(token, 1));
        }
      }
    }
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
6月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
7月前
|
负载均衡 算法 关系型数据库
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
|
8月前
|
存储 弹性计算 分布式计算
云端智链:挖掘云计算中的大数据潜能
云端智链:挖掘云计算中的大数据潜能
197 21
|
8月前
|
安全 大数据 虚拟化
随着云计算和大数据技术的发展,Hyper-V在虚拟化领域的地位日益凸显
随着云计算和大数据技术的发展,Hyper-V在虚拟化领域的地位日益凸显。作为Windows Server的核心组件,Hyper-V具备卓越的技术性能,支持高可用性、动态迁移等功能,确保虚拟机稳定高效运行。它与Windows深度集成,管理便捷,支持远程管理和自动化部署,降低管理成本。内置防火墙、RBAC等安全功能,提供全方位安全保障。作为内置组件,Hyper-V无需额外购买软件,降低成本。其广泛的生态系统支持和持续增长的市场需求,使其成为企业虚拟化解决方案的首选。
|
9月前
|
人工智能 大数据
阿里云云计算ACA、大数据ACA、人工智能ACA三门认证升级调整公告
阿里云云计算ACA、大数据ACA、人工智能ACA三门认证升级调整公告
|
9月前
|
存储 分布式计算 大数据
大数据与云计算:无缝结合,开启数据新纪元
大数据与云计算:无缝结合,开启数据新纪元
677 11
|
8月前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
208 0
|
6月前
|
监控 容灾 算法
阿里云 SLS 多云日志接入最佳实践:链路、成本与高可用性优化
本文探讨了如何高效、经济且可靠地将海外应用与基础设施日志统一采集至阿里云日志服务(SLS),解决全球化业务扩展中的关键挑战。重点介绍了高性能日志采集Agent(iLogtail/LoongCollector)在海外场景的应用,推荐使用LoongCollector以获得更优的稳定性和网络容错能力。同时分析了多种网络接入方案,包括公网直连、全球加速优化、阿里云内网及专线/CEN/VPN接入等,并提供了成本优化策略和多目标发送配置指导,帮助企业构建稳定、低成本、高可用的全球日志系统。
784 54
|
11月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
334 9