pyflink 连接kafka 不定字段的json反序列化,有没有比较好的写法?
使用 PyFlink 连接 Kafka 并处理不固定字段的 JSON 数据,有以下几种比较好的写法:
1. 使用 Apache Flink 的 JsonSchema
JsonSchema
是 Apache Flink 提供的一个类,它可以帮助你解析 JSON 数据,即使字段是不固定的。示例代码如下:
from pyflink.datastream import DataStream
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.table import Schema, DataTypes
from pyflink.table.sources import TableSource
class UnboundedJsonSource(TableSource):
def __init__(self, topic, bootstrap_servers, group_id):
self.topic = topic
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
def get_data_stream(self, context):
json_schema = Schema.new_builder() \
.column("name", DataTypes.STRING()) \
.column("age", DataTypes.INT()) \
.column("city", DataTypes.STRING()) \
.column("occupation", DataTypes.STRING()) \
.build()
kafka_consumer = FlinkKafkaConsumer(
topics=self.topic,
deserialization_schema=json_schema,
properties={'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group_id})
return context.create_data_stream(kafka_consumer)
# 创建 TableSource
json_source = UnboundedJsonSource(topic="my-topic", bootstrap_servers="localhost:9092", group_id="my-group")
# 创建 TableEnvironment
t_env = TableEnvironment.create()
# 注册 TableSource
t_env.register_table_source("json_table", json_source)
# 查询 Table
result_table = t_env.scan("json_table")
# 输出结果
result_table.execute().print()
2. 使用第三方库,如 jsonschema
jsonschema
是一个流行的 Python 库,用于验证和解析 JSON 数据。它可以帮助你动态地创建 Python 类,这些类可以根据给定的 JSON 模式解析 JSON 数据。示例代码如下:
import jsonschema
from pyflink.datastream import DataStream
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.table import Schema, DataTypes
from pyflink.table.sources import TableSource
class UnboundedJsonSource(TableSource):
def __init__(self, topic, bootstrap_servers, group_id, json_schema):
self.topic = topic
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.json_schema = json_schema
def get_data_stream(self, context):
json_validator = jsonschema.Draft7Validator(self.json_schema)
def deserialize_json(message):
try:
json_validator.validate(message)
return message
except jsonschema.ValidationError:
return None
kafka_consumer = FlinkKafkaConsumer(
topics=self.topic,
deserialization_schema=deserialize_json,
properties={'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group_id})
return context.create_data_stream(kafka_consumer)
# 创建 JSON 模式
json_schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"},
"city": {"type": "string"},
"occupation": {"type": "string"}
}
}
# 创建 TableSource
json_source = UnboundedJsonSource(topic="my-topic", bootstrap_servers="localhost:9092", group_id="my-group", json_schema=json_schema)
# 创建 TableEnvironment
t_env = TableEnvironment.create()
# 注册 TableSource
t_env.register_table_source("json_table", json_source)
# 查询 Table
result_table = t_env.scan("json_table")
# 输出结果
result_table.execute().print()
3. 使用自定义反序列化函数
你还可以编写自己的自定义反序列化函数来解析不固定字段的 JSON 数据。示例代码如下:
import json
from pyflink.datastream import DataStream
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.table import Schema, DataTypes
from pyflink.table.sources import TableSource
class UnboundedJsonSource(TableSource):
def __init__(self, topic, bootstrap_servers, group_id):
self.topic = topic
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
def get_data_stream(self, context):
def deserialize_json(message):
data = json.loads(message)
return (data.get("name"), data.get("age"), data.get("city"), data.get("occupation"))
kafka_consumer = FlinkKafkaConsumer(
topics=self.topic,
deserialization_schema=deserialize_json,
properties={'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group_id})
return context.create_data_stream(kafka_consumer)
# 创建 TableSource
json_source = UnboundedJsonSource(topic="my-topic", bootstrap_servers="localhost:9092", group_id="my-group")
# 创建 TableEnvironment
t_env = TableEnvironment.create()
# 注册 TableSource
t_env.register_table_source("json_table", json_source)
# 查询 Table
result_table = t_env.scan("json_table")
# 输出结果
result_table.execute().print()
选择哪种方法取决于你的具体需求和偏好。JsonSchema
是 Apache Flink 提供的一个方便的选项,但它可能不适合所有情况。第三方库(如 jsonschema
)提供了更多的灵活性,但需要额外的依赖项。自定义反序列化函数给你最大的控制权,但可能需要更多的编码工作。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。