开发者社区 问答 正文

CDN 实时日志分析 Connector 定义是什么?

已解决

CDN 实时日志分析 Connector 定义是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:44:53 481 分享 版权
1 条回答
写回答
取消 提交回答
  • 推荐回答

    我们完成了需求分析和 UDF 的定义,我们开始进行作业的开发了,按照通用的作业结构,需要定义 Source connector 来读取 Kafka 数据,定义 Sink connector 来将计算结果存储到 MySQL。最后是编写统计逻辑。

    在这特别说明一下,在 PyFlink 中也支持 SQL DDL 的编写,我们用一个简单的 DDL 描述,就完成了 Source Connector的开发。其中 connector.type 填写 kafka。SinkConnector 也一样,用一行DDL描述即可,其中 connector.type 填写 jdbc。描述 connector 的逻辑非常简单,我们再看看核心统计逻辑是否也一样简单:)

    kafka_source_ddl = """
    CREATE TABLE cdn_access_log (
     uuid VARCHAR,
     client_ip VARCHAR,
     request_time BIGINT,
     response_size BIGINT,
     uri VARCHAR
    ) WITH (
     'connector.type' = 'kafka',
     'connector.version' = 'universal',
     'connector.topic' = 'access_log',
     'connector.properties.zookeeper.connect' = 'localhost:2181',
     'connector.properties.bootstrap.servers' = 'localhost:9092',
     'format.type' = 'csv',
     'format.ignore-parse-errors' = 'true'
    )
    """
    
    mysql_sink_ddl = """
    CREATE TABLE cdn_access_statistic (
     province VARCHAR,
     access_count BIGINT,
     total_download BIGINT,
     download_speed DOUBLE
    ) WITH (
     'connector.type' = 'jdbc',
     'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
     'connector.table' = 'access_statistic',
     'connector.username' = 'root',
     'connector.password' = 'root',
     'connector.write.flush.interval' = '1s'
    )
    """
    
    2021-12-07 15:45:13
    赞同 展开评论