【云计算与大数据技术】流计算讲解及集群日志文件实时分析实战(附源码)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【云计算与大数据技术】流计算讲解及集群日志文件实时分析实战(附源码)

需要源码请点赞关注收藏后评论区留言私信~~

一、流计算概述

在传统的数据处理流程中总是先收集数据,然后将数据放到 DB中。当人们需要的 时候通过DB对数据做query,得到答案或进行相关的处理。这样看起来虽然非常合理,采用类似于 MapReduce方式的离线处理并不能很好地解决问题,结果却不理想,尤其是对一些实时搜索应用环境中的某些具体问题,这就引出了一种新的数据计算结构--流计算方式

流计算可以很好地对大规模流动数据在不断变化的运动过程中实时地 进行分析,捕捉到可能有用的信息,并把结果发送到下一计算节点

流计算包括早期的IBM System S,当前比较流行的流式计算框架Storm、Kafka

二、流计算与批处理系统对比

流计算侧重于实时计算方面,而批处理系统侧重于离线数据处理方面,一个追求的是低延迟,另外一个追求的是高吞吐量,处理的数据也不同,流计算处理的数据经常不断变化,而离线处理的数据是静态数据,输出形式也不同,总体来讲,两者的区别体现在以下几点

系统的输入包括两类数据,即实时的流式数据和静态的离线数据

系统的输出也包括流式数据和离线数据

业务的计算结果输出方式是通过两个条件决定的

三、Storm流计算系统

Storm 是一个 Twitter开源的分布式、高容错的实时计算系统

Storm 经常用于实时分析、在线机器学习、持续计算 、分布式远程调用和ETL等领域

Storm主要分为 Nimbus 和 Supervisor两种组件

下图是是Storm集群架构

Storm中每个实时计算任务表示称一个topology

四、Samza流计算系统

Apache Samza是一个分布式流处理框架

它使用 Apache Kafka用于消息发送,采 用 Apache Hadoop YARN 来提供容错、处理器隔离、安全性和资源管理,专用于实时数据的处理

Samza由以下3层构成  

数据流层(A streaming layer)  

执行层(An execution layer)  

处理层(A progressing layer)

五、阿里云流计算

Aliyun Stream Compute(阿里云流计算 )是运行在阿里云平台上的流式大数据分析平台,给用户提供在云上进行流式数据实时化分析的工具

阿里云流计算提供类标准的StreamSQL语义协助用户简单、轻松地完成流式计算逻辑的处理

六、集群日志文件的实时分析

目前分布式系统在各大生产 系统中广泛使用,监控这些分布式系统产生的日志,判断集群运行是否正常,采用流计算框架实时分析分布式系统产生的日志

以分析HDFS集群运行状态来简单说明流式计算框架的使用。当 NameNode 出现故障的时候需要及时报警,从而最大程度地减少损失

利用Flink做简单的日志文件单词统计分析,分析一个时间段内 NameNode产 生的单词统计

运行效果如下

可以根据Flink的Web界面查看SocketTextStream任务,找到对应的Flink文本统计计算节点

代码如下

package alibook.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * This example shows an implementation of WordCount with data from a text
 * socket. To run the example make sure that the service providing the text data
 * is already up and running.
 * <p>
 * To start an example socket text stream on your local machine run netcat from
 * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
 * port number.
 * </p>
 * <p>
 * Usage:
 * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
 * </p>
 * <p>
 * This example shows how to:
 * <ul>
 * <li>use StreamExecutionEnvironment.socketTextStream
 * <li>write a simple Flink program,
 * <li>write and use user-defined functions.
 * </ul>
 *
 */
public class SocketTextStream {
  public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
      return;
    }
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();
    // get input data
    DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .sum(1);
    if (fileOutput) {
      counts.writeAsText(outputPath, WriteMode.NO_OVERWRITE);
    } else {
      counts.print();
    }
    // execute program
    env.execute("WordCount from SocketTextStream Example");
  }
  // *************************************************************************
  // UTIL METHODS
  // *************************************************************************
  private static boolean fileOutput = false;
  private static String hostName;
  private static int port;
  private static String outputPath;
  private static boolean parseParameters(String[] args) {
    // parse input arguments
    if (args.length == 3) {
      fileOutput = true;
      hostName = args[0];
      port = Integer.valueOf(args[1]);
      outputPath = args[2];
    } else if (args.length == 2) {
      hostName = args[0];
      port = Integer.valueOf(args[1]);
    } else {
      System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
      return false;
    }
    return true;
  }
  /**
   * Implements the string tokenizer that splits sentences into words as a
   * user-defined FlatMapFunction. The function takes a line (String) and
   * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
   * Integer>}).
   */
  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
        throws Exception {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");
      // emit the pairs
      for (String token : tokens) {
        if (token.length() > 0) {
          out.collect(new Tuple2<String, Integer>(token, 1));
        }
      }
    }
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
17天前
|
机器学习/深度学习 人工智能 运维
智能日志分析:用AI点亮运维的未来
智能日志分析:用AI点亮运维的未来
132 15
|
1月前
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
|
2月前
|
存储 运维 监控
Linux--深入理与解linux文件系统与日志文件分析
深入理解 Linux 文件系统和日志文件分析,对于系统管理员和运维工程师来说至关重要。文件系统管理涉及到文件的组织、存储和检索,而日志文件则记录了系统和应用的运行状态,是排查故障和维护系统的重要依据。通过掌握文件系统和日志文件的管理和分析技能,可以有效提升系统的稳定性和安全性。
56 7
|
2月前
|
监控 安全 Linux
启用Linux防火墙日志记录和分析功能
为iptables启用日志记录对于监控进出流量至关重要
|
2月前
|
监控 应用服务中间件 定位技术
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
198 3
|
3月前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
3月前
|
存储 弹性计算 分布式计算
云计算在大数据处理中的优势与挑战
云计算在大数据处理中的优势与挑战
|
3月前
|
存储 SQL 监控
|
3月前
|
运维 监控 安全
|
3月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
921 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板