开发者社区> 问答> 正文

CDN 实时日志分析 Connector 定义是什么?

已解决

CDN 实时日志分析 Connector 定义是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:44:53 457 0
1 条回答
写回答
取消 提交回答
  • 推荐回答

    我们完成了需求分析和 UDF 的定义,我们开始进行作业的开发了,按照通用的作业结构,需要定义 Source connector 来读取 Kafka 数据,定义 Sink connector 来将计算结果存储到 MySQL。最后是编写统计逻辑。

    在这特别说明一下,在 PyFlink 中也支持 SQL DDL 的编写,我们用一个简单的 DDL 描述,就完成了 Source Connector的开发。其中 connector.type 填写 kafka。SinkConnector 也一样,用一行DDL描述即可,其中 connector.type 填写 jdbc。描述 connector 的逻辑非常简单,我们再看看核心统计逻辑是否也一样简单:)

    kafka_source_ddl = """
    CREATE TABLE cdn_access_log (
     uuid VARCHAR,
     client_ip VARCHAR,
     request_time BIGINT,
     response_size BIGINT,
     uri VARCHAR
    ) WITH (
     'connector.type' = 'kafka',
     'connector.version' = 'universal',
     'connector.topic' = 'access_log',
     'connector.properties.zookeeper.connect' = 'localhost:2181',
     'connector.properties.bootstrap.servers' = 'localhost:9092',
     'format.type' = 'csv',
     'format.ignore-parse-errors' = 'true'
    )
    """
    
    mysql_sink_ddl = """
    CREATE TABLE cdn_access_statistic (
     province VARCHAR,
     access_count BIGINT,
     total_download BIGINT,
     download_speed DOUBLE
    ) WITH (
     'connector.type' = 'jdbc',
     'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
     'connector.table' = 'access_statistic',
     'connector.username' = 'root',
     'connector.password' = 'root',
     'connector.write.flush.interval' = '1s'
    )
    """
    
    2021-12-07 15:45:13
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
构建智能化的视频系统 阿里云CDN的进化 立即下载
CDN数据化实践 立即下载
直播CDN-X - 白山直播CDN流传递链路优化 立即下载