【云计算与大数据技术】流计算讲解及集群日志文件实时分析实战(附源码)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 【云计算与大数据技术】流计算讲解及集群日志文件实时分析实战(附源码)

需要源码请点赞关注收藏后评论区留言私信~~

一、流计算概述

在传统的数据处理流程中总是先收集数据,然后将数据放到 DB中。当人们需要的 时候通过DB对数据做query,得到答案或进行相关的处理。这样看起来虽然非常合理,采用类似于 MapReduce方式的离线处理并不能很好地解决问题,结果却不理想,尤其是对一些实时搜索应用环境中的某些具体问题,这就引出了一种新的数据计算结构--流计算方式

流计算可以很好地对大规模流动数据在不断变化的运动过程中实时地 进行分析,捕捉到可能有用的信息,并把结果发送到下一计算节点

流计算包括早期的IBM System S,当前比较流行的流式计算框架Storm、Kafka

二、流计算与批处理系统对比

流计算侧重于实时计算方面,而批处理系统侧重于离线数据处理方面,一个追求的是低延迟,另外一个追求的是高吞吐量,处理的数据也不同,流计算处理的数据经常不断变化,而离线处理的数据是静态数据,输出形式也不同,总体来讲,两者的区别体现在以下几点

系统的输入包括两类数据,即实时的流式数据和静态的离线数据

系统的输出也包括流式数据和离线数据

业务的计算结果输出方式是通过两个条件决定的

三、Storm流计算系统

Storm 是一个 Twitter开源的分布式、高容错的实时计算系统

Storm 经常用于实时分析、在线机器学习、持续计算 、分布式远程调用和ETL等领域

Storm主要分为 Nimbus 和 Supervisor两种组件

下图是是Storm集群架构

Storm中每个实时计算任务表示称一个topology

四、Samza流计算系统

Apache Samza是一个分布式流处理框架

它使用 Apache Kafka用于消息发送,采 用 Apache Hadoop YARN 来提供容错、处理器隔离、安全性和资源管理,专用于实时数据的处理

Samza由以下3层构成  

数据流层(A streaming layer)  

执行层(An execution layer)  

处理层(A progressing layer)

五、阿里云流计算

Aliyun Stream Compute(阿里云流计算 )是运行在阿里云平台上的流式大数据分析平台,给用户提供在云上进行流式数据实时化分析的工具

阿里云流计算提供类标准的StreamSQL语义协助用户简单、轻松地完成流式计算逻辑的处理

六、集群日志文件的实时分析

目前分布式系统在各大生产 系统中广泛使用,监控这些分布式系统产生的日志,判断集群运行是否正常,采用流计算框架实时分析分布式系统产生的日志

以分析HDFS集群运行状态来简单说明流式计算框架的使用。当 NameNode 出现故障的时候需要及时报警,从而最大程度地减少损失

利用Flink做简单的日志文件单词统计分析,分析一个时间段内 NameNode产 生的单词统计

运行效果如下

可以根据Flink的Web界面查看SocketTextStream任务,找到对应的Flink文本统计计算节点

代码如下

package alibook.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * This example shows an implementation of WordCount with data from a text
 * socket. To run the example make sure that the service providing the text data
 * is already up and running.
 * <p>
 * To start an example socket text stream on your local machine run netcat from
 * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
 * port number.
 * </p>
 * <p>
 * Usage:
 * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
 * </p>
 * <p>
 * This example shows how to:
 * <ul>
 * <li>use StreamExecutionEnvironment.socketTextStream
 * <li>write a simple Flink program,
 * <li>write and use user-defined functions.
 * </ul>
 *
 */
public class SocketTextStream {
  public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
      return;
    }
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();
    // get input data
    DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .sum(1);
    if (fileOutput) {
      counts.writeAsText(outputPath, WriteMode.NO_OVERWRITE);
    } else {
      counts.print();
    }
    // execute program
    env.execute("WordCount from SocketTextStream Example");
  }
  // *************************************************************************
  // UTIL METHODS
  // *************************************************************************
  private static boolean fileOutput = false;
  private static String hostName;
  private static int port;
  private static String outputPath;
  private static boolean parseParameters(String[] args) {
    // parse input arguments
    if (args.length == 3) {
      fileOutput = true;
      hostName = args[0];
      port = Integer.valueOf(args[1]);
      outputPath = args[2];
    } else if (args.length == 2) {
      hostName = args[0];
      port = Integer.valueOf(args[1]);
    } else {
      System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
      return false;
    }
    return true;
  }
  /**
   * Implements the string tokenizer that splits sentences into words as a
   * user-defined FlatMapFunction. The function takes a line (String) and
   * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
   * Integer>}).
   */
  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
        throws Exception {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");
      // emit the pairs
      for (String token : tokens) {
        if (token.length() > 0) {
          out.collect(new Tuple2<String, Integer>(token, 1));
        }
      }
    }
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
1月前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
146 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
20天前
|
存储 数据采集 监控
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
115 9
|
1月前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的控制文件与归档日志文件
本文介绍了Oracle数据库中的控制文件和归档日志文件。控制文件记录了数据库的物理结构信息,如数据库名、数据文件和联机日志文件的位置等。为了保护数据库,通常会进行控制文件的多路复用。归档日志文件是联机重做日志文件的副本,用于记录数据库的变更历史。文章还提供了相关SQL语句,帮助查看和设置数据库的日志模式。
【赵渝强老师】Oracle的控制文件与归档日志文件
|
1月前
|
SQL 关系型数据库 MySQL
【赵渝强老师】MySQL的全量日志文件
MySQL全量日志记录所有操作的SQL语句,默认禁用。启用后,可通过`show variables like %general_log%检查状态,使用`set global general_log=ON`临时开启,执行查询并查看日志文件以追踪SQL执行详情。
|
1月前
|
Oracle 关系型数据库 数据库
【赵渝强老师】Oracle的参数文件与告警日志文件
本文介绍了Oracle数据库的参数文件和告警日志文件。参数文件分为初始化参数文件(PFile)和服务器端参数文件(SPFile),在数据库启动时读取并分配资源。告警日志文件记录了数据库的重要活动、错误和警告信息,帮助诊断问题。文中还提供了相关视频讲解和示例代码。
|
2月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
335 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
50 2
|
1月前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
89 1
|
28天前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
68 4