开发者社区> 问答> 正文

flink sql写mysql中文有乱码问题怎么办?

我的flink sql作业如下

SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM(

mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;

运行起来后写入mysql表的数据带有中文乱码 ??????

查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么? 2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task GroupAggregate(groupBy=[product_name, window_start, window_end], select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, _UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS bus_name]) -> Sink: Sink(table=[default_catalog.default_database.all_trans_5m_new], fields=[product_name, window_start, window_end, trans_amt, trans_cnt, insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0. 2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date], select=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) (1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-01 17:28:36 1974 0
1 条回答
写回答
取消 提交回答
  • 你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8,像这样
    CREATE TABLE jdbc_sink(
    id INT COMMENT '订单id',
    goods_name VARCHAR(128) COMMENT '商品名称',
    price DECIMAL(32,2) COMMENT '商品价格',
    user_name VARCHAR(64) COMMENT '用户名称'
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'mysqluser',
    'password' = 'mysqluser',
    'table-name' = 'jdbc_sink'
    )*来自志愿者整理的flink邮件归档

    2021-12-01 18:27:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载

相关镜像