开发者社区> 问答> 正文

CDN 实时日志分析 UDF 定义是什么?

CDN 实时日志分析 UDF 定义是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:44:12 334 0
1 条回答
写回答
取消 提交回答
  • 这里我们用了刚才提到的 named function 的方式定义一个 ip_to_province() 的UDF,输入是 ip 地址,输出是地区名字字符串。我们这里描述了输入类型是一个字符串,输出类型也是一个字符串。当然这里面的查询服务仅供演示,大家在自己的生产环境要替换为可靠的地域查询服务。

    import re
    import json
    from pyFlink.table import DataTypes
    from pyFlink.table.udf import udf
    from urllib.parse import quote_plus
    from urllib.request import urlopen
    
    @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
    def ip_to_province(ip):
       """
       format:
           {
           'ip': '27.184.139.25',
           'pro': '河北省',
           'proCode': '130000',
           'city': '石家庄市',
           'cityCode': '130100',
           'region': '灵寿县',
           'regionCode': '130126',
           'addr': '河北省石家庄市灵寿县 电信',
           'regionNames': '',
           'err': ''
           }
       """
       try:
           urlobj = urlopen( \
            'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
           data = str(urlobj.read(), "gbk")
           pos = re.search("{[^{}]+\}", data).span()
           geo_data = json.loads(data[pos[0]:pos[1]])
           if geo_data['pro']:
               return geo_data['pro']
           else:
               return geo_data['err']
       except:
           return "UnKnow"
    
    2021-12-07 15:44:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
PostgresChina2018_赖思超_PostgreSQL10_hash索引的WAL日志修改版final 立即下载
Kubernetes下日志实时采集、存储与计算实践 立即下载
日志数据采集与分析对接 立即下载