
JDBC sink crashes under heavy concurrent writes?

In a test I ran 10 threads, each writing 1,000 times, for 10,000 records in total. The job fails after a few thousand records have been written:

2020-10-29 12:04:55,573 WARN org.apache.flink.runtime.taskmanager.Task [] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID, PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO, EXTERNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0, CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY, CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID, PAYER_CUSTOMER_REQUEST_NO, OPERATOR_NAME, CARD_HOLDER_NAME, ID1, CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO, MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id, ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date, PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS external_no, STATUS0 AS pay_status, STATUS AS order_status, PAY_AMOUNT AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way, GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS productdesc, GOODS_DESC AS goods_desc, CUSTOMER_BIZ_REQUEST_NO AS customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo, EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code, CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operator, CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS payerupdatetime, CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS coupon_amount]) -> Sink: Sink(table=[default_catalog.default_database.wide_table_1], fields=[id, op_id, order_date, complete_date, payer_customer_request_no, pay_time, customer_request_no, external_no, pay_status, order_status, pay_amount, payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc, customer_biz_request_no, goods_ext_info, memo, extend_info, channel_ret_code, channel_ret_msg, operator, customer_no, customer_name, extend, payercreatetime, payerupdatetime, card_no, card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1) (14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING to FAILED.
java.io.IOException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86) ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.output(StreamingJoinOperator.java:305) ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:278) ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115) ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[flink-table_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:310) ~[flink-table_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.getPrimaryKey(JdbcDynamicOutputFormatBuilder.java:216) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createRowKeyExtractor$7(JdbcDynamicOutputFormatBuilder.java:193) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcDynamicOutputFormatBuilder.java:128) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.internal.executor.BufferReduceStatementExecutor.executeBatch(BufferReduceStatementExecutor.java:99) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:154) ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
    ... 32 more

*From the Flink material compiled by volunteers

毛毛虫雨 2021-12-05 21:31:45
1 answer
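  • This looks like the bug below, which has been fixed in 1.11.3. Until then you can build the release-1.11 branch yourself. https://issues.apache.org/jira/browse/FLINK-19423 *From the Flink material compiled by volunteers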

    2021-12-05 22:29:01
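
    For readers wondering why an index can go out of bounds here: the "Caused by" frames show a primary-key extractor running inside the keyed (delete) executor during a batch flush. One plausible reading, offered as an assumption rather than a description of Flink's actual code, is that key positions computed against the full row get applied to a row that has already been reduced to its key columns. The plain-Java sketch below reproduces only that pattern; the class name, the `project` helper, the three-column row, and the choice of position 1 as the key are all hypothetical.

```java
import java.util.Arrays;

public class KeyProjectionSketch {

    // Hypothetical helper: copies the fields at the given positions of `row`
    // into a new array. Throws ArrayIndexOutOfBoundsException if a position
    // is not valid for `row`.
    static Object[] project(Object[] row, int[] positions) {
        Object[] out = new Object[positions.length];
        for (int i = 0; i < positions.length; i++) {
            out[i] = row[positions[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed full row layout: (id, op_id, order_date).
        Object[] fullRow = {"1001", "op-7", "2020-10-29"};
        // Assumed key: the column at position 1 of the FULL row.
        int[] keyPositions = {1};

        // First projection is fine: the key row has a single field.
        Object[] keyRow = project(fullRow, keyPositions);
        System.out.println("key row: " + Arrays.toString(keyRow));

        try {
            // Applying full-row positions to the already-projected key row:
            // position 1 does not exist in a one-field row.
            project(keyRow, keyPositions);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("second projection failed: " + e);
        }
    }
}
```

    In this sketch the failure only appears when a key column sits at a position that is not valid in the shorter key row, which would be consistent with a job that runs for a while and fails only once a delete is flushed. Whether that is exactly what happens in 1.11.2 should be checked against the linked issue.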