开发者社区> 问答> 正文

CDN 实时日志分析完整代码是什么?

CDN 实时日志分析完整代码是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:45:56 443 0
1 条回答
写回答
取消 提交回答
  • 我们在整体看一遍完整代码,首先是核心依赖的导入,然后是我们需要创建一个ENV,并设置采用的 planner(目前Flink支持Flink和blink两套 planner)建议大家采用 blink planner。

    接下来将我们刚才描述的 kafka 和 mysql 的 ddl 进行表的注册。再将 Python UDF 进行注册,这里特别提醒一点,UDF所依赖的其他文件也可以在API里面进行制定,这样在job提交时候会一起提交到集群。然后是核心的统计逻辑,最后调用 executre 提交作业。这样一个实际的CDN日志实时分析的作业就开发完成了。我们再看一下实际的统计效果

    import os
    
    from pyFlink.datastream import StreamExecutionEnvironment
    from pyFlink.table import StreamTableEnvironment, EnvironmentSettings
    from enjoyment.cdn.cdn_udf import ip_to_province
    from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl
    
    # 创建Table Environment, 并选择使用的Planner
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(
       env,
       environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
    
    # 创建Kafka数据源表
    t_env.sql_update(kafka_source_ddl)
    # 创建MySql结果表
    t_env.sql_update(mysql_sink_ddl)
    
    # 注册IP转换地区名称的UDF
    t_env.register_function("ip_to_province", ip_to_province)
    
    # 添加依赖的Python文件
    t_env.add_Python_file(
        os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
    t_env.add_Python_file(os.path.dirname(
        os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")
    
    # 核心的统计逻辑
    t_env.from_path("cdn_access_log")\
       .select("uuid, "
               "ip_to_province(client_ip) as province, " # IP 转换为地区名称
               "response_size, request_time")\
       .group_by("province")\
       .select( # 计算访问量
               "province, count(uuid) as access_count, " 
               # 计算下载总量 
               "sum(response_size) as total_download,  " 
               # 计算下载速度
               "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
       .insert_into("cdn_access_statistic")
    
    # 执行作业
    t_env.execute("pyFlink_parse_cdn_log")
    
    2021-12-07 15:46:20
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
PostgresChina2018_赖思超_PostgreSQL10_hash索引的WAL日志修改版final 立即下载
Kubernetes下日志实时采集、存储与计算实践 立即下载
日志数据采集与分析对接 立即下载