开发者社区> 问答> 正文

Queryable State 查询反序列化问题

我在测试 Queryable State 功能的时候,发现在 dataStream.keyBy(key).process(); 这种写法下,简单的状态和复杂的 POJO 都可以查询;但在 studentAnswerDataStream.connect(learningStrategyDataStream) .keyBy(val->val.getCourseId()+""+val.getTaskId() , val->val.getCourseId()+""+val.getTaskId()) .process() 这种写法下,简单的状态可以查询,但复杂的 POJO 无法反序列化回来。

错误:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 9. Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Unexpected magic number 48. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) ... 10 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41) Caused by: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 9. Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Unexpected magic number 48. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) ... 10 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

使用版本1.9.1

代码如下

状态代码 import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector;

public class QueryableStateDemo2 {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

DataStream studentAnswerDataStream = env.addSource(new SourceFunction () { @Override public void run(SourceContext ctx) throws Exception { StudentAnswer studentAnswer = new StudentAnswer(); studentAnswer.setCourseId(100L); studentAnswer.setTaskId("ug01"); studentAnswer.setUserId(1L); studentAnswer.setAnswer("答案"); ctx.collect(studentAnswer); while (true) { Thread.sleep(1000 * 60); } }

@Override public void cancel() {

} });

DataStream learningStrategyDataStream = env.addSource(new SourceFunction () { @Override public void run(SourceContext ctx) throws Exception { LearningStrategy learningStrategy = new LearningStrategy(); learningStrategy.setCourseId(100L); learningStrategy.setTaskId("ug01"); ctx.collect(learningStrategy); while (true) { Thread.sleep(1000 * 60); } }

@Override public void cancel() {

} });

studentAnswerDataStream.connect(learningStrategyDataStream) .keyBy(val->val.getCourseId()+""+val.getTaskId() , val->val.getCourseId()+""+val.getTaskId()) .process(new KeyedCoProcessFunction<String, StudentAnswer, LearningStrategy, String>() {

private transient ValueState leftBuffer; private transient ValueState rightBuffer;

@Override public void open(Configuration conf) { ValueStateDescriptor leftBufferDescriptor = new ValueStateDescriptor<>( "left_buffer", TypeInformation.of(StudentAnswer.class)); leftBufferDescriptor.setQueryable("left_buffer_query");

ValueStateDescriptor rightBufferDescriptor = new ValueStateDescriptor<>( "right_buffer", TypeInformation.of(LearningStrategy.class)); rightBufferDescriptor.setQueryable("right_buffer_query");

leftBuffer = getRuntimeContext().getState(leftBufferDescriptor); rightBuffer = getRuntimeContext().getState(rightBufferDescriptor); }

@Override public void processElement1(StudentAnswer value, Context context, Collector collector) throws Exception { System.out.println("processElement1:" + value); leftBuffer.update(value); String key = context.getCurrentKey(); collector.collect(key); }

@Override public void processElement2(LearningStrategy value, Context context, Collector collector) throws Exception { System.out.println("processElement2:" + value); rightBuffer.update(value); String key = context.getCurrentKey(); collector.collect(key); } }).print("结果");

env.execute("State"); }

}

客户端代码:

import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class GetQueryableState2 {

public static void main(String[] args) throws Exception {

QueryableStateClient client = new QueryableStateClient("localhost", 9069);

ExecutionConfig executionConfig = new ExecutionConfig(); client.setExecutionConfig(executionConfig);

ValueStateDescriptor leftBufferDescriptor = new ValueStateDescriptor<>( "left_buffer", TypeInformation.of(StudentAnswer.class));

String key = "100_ug01";

JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f");

CompletableFuture<ValueState > resultFuture = client.getKvState(jobId, "left_buffer_query", key , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);

ValueState leftBuffer = resultFuture.get(); System.out.println("结果:"+leftBuffer.value());

// now handle the returned value // resultFuture.thenAccept(response -> // { // try { // Tuple2<String, Long> res = response.value(); // // System.out.println("Queried sum value: " + res); // // } catch (Exception e) // { // e.printStackTrace(); // } // System.out.println("Exiting future ..."); // }); Thread.sleep(1000L*10); }

}

// Domain classes are as follows.

/**
 * Common base class of the domain objects; carries a batch number ({@code bn}) and a
 * {@code version} string, both with default values.
 */
public class BaseDomain implements Serializable {

    private static final long serialVersionUID = 1L;

    protected String bn = "2019";
    protected String version = "1.0";

    /** Returns the current {@code bn} value ("2019" unless changed). */
    public String getBn() {
        return bn;
    }

    /** Replaces the {@code bn} value. */
    public void setBn(String bn) {
        this.bn = bn;
    }

    /** Returns the current {@code version} value ("1.0" unless changed). */
    public String getVersion() {
        return version;
    }

    /** Replaces the {@code version} value. */
    public void setVersion(String version) {
        this.version = version;
    }
}

public class LearningStrategy extends BaseDomain { private Long courseId; private String taskId; private Byte pushOrder = 1;

public Long getCourseId() { return courseId; }

public void setCourseId(Long courseId) { this.courseId = courseId; }

public String getTaskId() { return taskId; }

public void setTaskId(String taskId) { this.taskId = taskId; }

public Byte getPushOrder() { return pushOrder; }

public void setPushOrder(Byte pushOrder) { this.pushOrder = pushOrder; }

@Override public String toString() { return "LearningStrategy{" + "bn='" + bn + ''' + ", version='" + version + ''' + ", courseId=" + courseId + ", taskId='" + taskId + ''' + ", pushOrder=" + pushOrder + '}'; } }

public class StudentAnswer extends BaseDomain{ private Long courseId; private String taskId; private Long userId; private String answer;

public Long getCourseId() { return courseId; }

public void setCourseId(Long courseId) { this.courseId = courseId; }

public String getTaskId() { return taskId; }

public void setTaskId(String taskId) { this.taskId = taskId; }

public Long getUserId() { return userId; }

public void setUserId(Long userId) { this.userId = userId; }

public String getAnswer() { return answer; }

public void setAnswer(String answer) { this.answer = answer; }

@Override public String toString() { return "StudentAnswer{" + "bn='" + bn + ''' + ", version='" + version + ''' + ", courseId=" + courseId + ", taskId='" + taskId + ''' + ", userId=" + userId + ", answer='" + answer + ''' + '}'; } }*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-08 14:50:25 2301 0
1 条回答
写回答
取消 提交回答
  • 从错误栈来看,应该是 serializer 不一致导致的,可以再检查下相应的 key/namespace serializer。*来自志愿者整理的flink邮件归档

    2021-12-08 14:59:51
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载