
I'm a PyFlink beginner. I wrote this procedure based on an example, but it fails at runtime. Any advice would be appreciated.

Resolved


import os
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.jdbc import JdbcSink
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, \
    RollingPolicy
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions

from pyflink.table import TableEnvironment, EnvironmentSettings


class AddSuffixProcessFunction:

    @staticmethod
    def process_element(value: str = ''):
        # append a marker suffix and wrap the result as a one-field row
        return [value + ' -->']




# @logger.catch
def process_files(dir_input, out_db_conf):
    # Create the Flink stream execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)

    # Connector jars
    env.add_jars("file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar")
    env.add_jars("file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar")

    # Set the parallelism
    env.set_parallelism(4)

    t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_batch_mode().build())

    # Collect all files in the input directory
    input_files = [os.path.join(dir_input, f) for f in os.listdir(dir_input) if
                   os.path.isfile(os.path.join(dir_input, f))]

    # keep only the first 4 files
    input_files = input_files[:4]

    for input_file in input_files:
        # Define the output path for this file
        base_name = os.path.basename(input_file)

        # Create the file source
        file_source = FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                          input_file).process_static_file_set().build()

        ds = env.from_source(
            source=file_source,
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )

        ds = ds.map(lambda x: AddSuffixProcessFunction.process_element(x), output_type=Types.ROW([Types.STRING()]))

        # Define the JDBC sink
        jdbc_url = out_db_conf['url']
        jdbc_table = out_db_conf['table']
        jdbc_driver = out_db_conf['driver']
        jdbc_user = out_db_conf['user']
        jdbc_password = out_db_conf['password']

        # Build the JdbcConnectionOptions
        jdbc_connection_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
            .with_url(jdbc_url) \
            .with_driver_name(jdbc_driver) \
            .with_user_name(jdbc_user) \
            .with_password(jdbc_password) \
            .build()

        # Build the JdbcExecutionOptions
        jdbc_execution_options = JdbcExecutionOptions.builder() \
            .with_batch_size(1000) \
            .with_batch_interval_ms(200) \
            .with_max_retries(3) \
            .build()

        # SQL statement
        sql_dml_statement = "INSERT INTO " + jdbc_table + " (rowdata) VALUES (?)"

        # Build the JdbcSink
        jdbc_sink = JdbcSink.sink(
            sql_dml_statement,
            Types.ROW([Types.STRING()]),  # specify the output type
            jdbc_execution_options=jdbc_execution_options,
            jdbc_connection_options=jdbc_connection_options,
        )

        # Attach the JDBC sink to the data stream
        ds.sink_to(jdbc_sink)

    # Execute the job
    env.execute("File Processing")


# Example usage
dir_input = '/data/cw/wdproc/_data/testin'
dir_output = '/data/cw/wdproc/_data/testout'

out_db_conf = {
    'url': 'jdbc:postgresql://localhost:5432/etlr1',
    'table': 'td1',
    'driver': 'org.postgresql.Driver',
    'user': 'postgres',
    'password': 'pgtest'
}

if __name__ == "__main__":
    process_files(dir_input, out_db_conf)

The following error occurs:

python p0_flink/test_db.py
Traceback (most recent call last):
  File "/data/cw/wdproc/p0_flink/test_db.py", line 111, in <module>
    process_files(dir_input, out_db_conf)
  File "/data/cw/wdproc/p0_flink/test_db.py", line 84, in process_files
    jdbc_sink = JdbcSink.sink(
  File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/pyflink/datastream/connectors/jdbc.py", line 60, in sink
    j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
  File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o124.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
        at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)

I've heard that the connector and driver versions need to match, but these are the only ones I could find:
env.add_jars("file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar")
env.add_jars("file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar")

Please forgive me, I'm not familiar with Java. I'm using the default JDK 11 and Python 3.9.

1868405305143499 2024-07-18 22:48:21
2 Answers
  • 北京阿里云ACE会长
    Accepted answer

    The PyFlink version is incompatible with your Flink version, or a required dependency is missing.

    Update PyFlink: make sure the PyFlink version you are using matches your Flink version:

    pip install apache-flink==1.19.0  # make sure the version number matches your Flink version
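
    One quick way to verify the match (assuming the wheel ships the pyflink.version module) is to compare the installed version against the -1.19 suffix of the connector jar:

    # check the installed PyFlink version against the connector jar's
    # Flink version suffix (flink-connector-jdbc-3.2.0-1.19.jar -> 1.19.x)
    from pyflink.version import __version__
    print(__version__)  # expect something like 1.19.0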

    Add dependencies: make sure all required jars have been added to the environment, and build the sink with PyFlink's actual JdbcSink API:

    
    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions

    env = StreamExecutionEnvironment.get_execution_environment()

    # add_jars takes each jar path as a separate argument, not a list
    env.add_jars(
        "file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar",
        "file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar"
    )

    # configure the JDBC sink
    jdbc_sink = JdbcSink.sink(
        "INSERT INTO your_table (rowdata) VALUES (?)",
        Types.ROW([Types.STRING()]),  # row type produced by the upstream operator
        jdbc_connection_options=JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url("jdbc:postgresql://your-database-url")
            .with_driver_name("org.postgresql.Driver")
            .with_user_name("your_username")
            .with_password("your_password")
            .build(),
        jdbc_execution_options=JdbcExecutionOptions.builder()
            .with_batch_size(1000)
            .with_batch_interval_ms(200)
            .with_max_retries(3)
            .build(),
    )
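
    Note that env.add_jars expects each jar path as a separate positional argument, and JdbcSink.sink takes the DML statement, the row type produced upstream, and the connection/execution options. This is the same API shape your original script already uses, so once the connector jar actually matches the installed PyFlink version, that code path should work unchanged.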
    
    2024-07-19 08:39:59
    4 upvotes
  • Java开发

    This is a very common problem: the version of the component being called does not match the version the calling code expects. In other words, it is a version mismatch.
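
    If pinning a connector jar that matches the PyFlink wheel proves difficult, a possible workaround is the Table API JDBC connector, which avoids the JdbcOutputFormat reflection that fails in the traceback. A minimal sketch (assuming the jars are registered via pipeline.jars and the target table td1 already exists; td1_sink is just a placeholder name):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

    # make the connector and driver jars visible to the table environment
    t_env.get_config().set(
        "pipeline.jars",
        "file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar;"
        "file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar")

    # declare the PostgreSQL table as a JDBC sink
    t_env.execute_sql("""
        CREATE TABLE td1_sink (
            rowdata STRING
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://localhost:5432/etlr1',
            'table-name' = 'td1',
            'driver' = 'org.postgresql.Driver',
            'username' = 'postgres',
            'password' = 'pgtest'
        )
    """)
    t_env.execute_sql("INSERT INTO td1_sink VALUES ('test line -->')").wait()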

    2024-07-19 08:39:59
    6 upvotes

