开发者社区> 问答> 正文

flink1.10.1 SQL 作业 netty报错如何解决?

flink1.10.1 写的 SQL 作业, 开始运行3个小时正常, checkpoint也正常. 然后,checkpoint失败了, 作业一直卡在RESTARTING 状态不动.

TaskManager 日志: 2020-06-1620:38:16,640INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discoveredgroup coordinator 172.16.30.165:9092 (id: 2147483645 rack: null) 2020-06-1623:27:46,026INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING0SUBSTRING10) CONCAT _UTF-16LE':'CONCAT _UTF-16LE'rt:trace'CONCAT _UTF-16LE':'CONCAT _UTF-16LE'item_ids_ordered'CONCAT _UTF-16LE':'CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1). 2020-06-1623:27:46,027INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING0SUBSTRING10) CONCAT _UTF-16LE':'CONCAT _UTF-16LE'rt:trace'CONCAT _UTF-16LE':'CONCAT _UTF-16LE'item_ids_ordered'CONCAT _UTF-16LE':'CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (2/2) (ed41475641eb3c58f7504d4a16a9c19b). 2020-06-1623:27:46,034INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a). 2020-06-1623:27:46,036INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a) switched fromRUNNING to FAILED. java.lang.Exception: Couldnot perform checkpoint 329for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(UnknownSource) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:745) Causedby: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0 at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1464) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1448) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(UnknownSource) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776) ... 12 more 2020-06-1623:27:46,045INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[order_id, shop_id, item_id, edit_time], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> LookupJoin(table=[JDBCTableSource(id, category_id_first, brand, category)], joinType=[InnerJoin], async=[false], lookup=[id=item_id], where=[brand ISNOTNULL], select=[order_id, shop_id, item_id, edit_time, id, brand]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, brand, order_id]) (1/2) (0527ac291f32f14675f9d5bec8ef5369). 2020-06-1623:27:46,038INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, item_id, 1AS $f3], where=[((_change_column jsonHasKey _UTF-16LE'"pay_time"') AND shop_id ISNOTNULLAND item_id ISNOTNULL)]) (2/2) (2d984d0e7df4ed2a6fa1d8892fcccefc). 2020-06-1623:27:46,038INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, item_id, 1AS $f3], where=[((_change_column jsonHasKey _UTF-16LE'"pay_time"') AND shop_id ISNOTNULLAND item_id ISNOTNULL)]) (1/2) (32b2422a4619f4900c5a668dfb466ec9). 2020-06-1623:27:46,037INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09). 2020-06-1623:27:46,050INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09) switched fromRUNNING to FAILED. java.lang.Exception: Couldnot perform checkpoint 329for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(UnknownSource) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:745) Causedby: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(0): Buffer1 (freed) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1451) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(UnknownSource) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776) ... 12 more 2020-06-1623:27:46,051INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09). 2020-06-1623:27:46,036INFO org.apache.flink.runtime.taskman

展开
收起
游客nnqbtnagn7h6s 2021-12-06 19:56:19 488 0
1 条回答
写回答
取消 提交回答
  • 看起来是一个已知问题: https://issues.apache.org/jira/browse/FLINK-17479

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:24:30
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载