我在测试 Queryable State 功能的时候,发现在 dataStream.keyBy(key).process() 这种语法下,简单的状态和复杂的 POJO 都可以查询;但在 studentAnswerDataStream.connect(learningStrategyDataStream).keyBy(val -> val.getCourseId() + "" + val.getTaskId(), val -> val.getCourseId() + "" + val.getTaskId()).process() 这种语法下,简单的状态可以查询,但复杂的 POJO 无法反序列化回来。
错误:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 9. Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Unexpected magic number 48. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) ... 10 more
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41) Caused by: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 9. Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Unexpected magic number 48. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) ... 10 more
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
使用版本1.9.1
代码如下
状态代码 import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector;
public class QueryableStateDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
DataStream studentAnswerDataStream = env.addSource(new SourceFunction () { @Override public void run(SourceContext ctx) throws Exception { StudentAnswer studentAnswer = new StudentAnswer(); studentAnswer.setCourseId(100L); studentAnswer.setTaskId("ug01"); studentAnswer.setUserId(1L); studentAnswer.setAnswer("答案"); ctx.collect(studentAnswer); while (true) { Thread.sleep(1000 * 60); } }
@Override public void cancel() {
} });
DataStream learningStrategyDataStream = env.addSource(new SourceFunction () { @Override public void run(SourceContext ctx) throws Exception { LearningStrategy learningStrategy = new LearningStrategy(); learningStrategy.setCourseId(100L); learningStrategy.setTaskId("ug01"); ctx.collect(learningStrategy); while (true) { Thread.sleep(1000 * 60); } }
@Override public void cancel() {
} });
studentAnswerDataStream.connect(learningStrategyDataStream) .keyBy(val->val.getCourseId()+""+val.getTaskId() , val->val.getCourseId()+""+val.getTaskId()) .process(new KeyedCoProcessFunction<String, StudentAnswer, LearningStrategy, String>() {
private transient ValueState leftBuffer; private transient ValueState rightBuffer;
@Override public void open(Configuration conf) { ValueStateDescriptor leftBufferDescriptor = new ValueStateDescriptor<>( "left_buffer", TypeInformation.of(StudentAnswer.class)); leftBufferDescriptor.setQueryable("left_buffer_query");
ValueStateDescriptor rightBufferDescriptor = new ValueStateDescriptor<>( "right_buffer", TypeInformation.of(LearningStrategy.class)); rightBufferDescriptor.setQueryable("right_buffer_query");
leftBuffer = getRuntimeContext().getState(leftBufferDescriptor); rightBuffer = getRuntimeContext().getState(rightBufferDescriptor); }
@Override public void processElement1(StudentAnswer value, Context context, Collector collector) throws Exception { System.out.println("processElement1:" + value); leftBuffer.update(value); String key = context.getCurrentKey(); collector.collect(key); }
@Override public void processElement2(LearningStrategy value, Context context, Collector collector) throws Exception { System.out.println("processElement2:" + value); rightBuffer.update(value); String key = context.getCurrentKey(); collector.collect(key); } }).print("结果");
env.execute("State"); }
}
客户端代码:
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.concurrent.CompletableFuture;
public class GetQueryableState2 {
public static void main(String[] args) throws Exception {
QueryableStateClient client = new QueryableStateClient("localhost", 9069);
ExecutionConfig executionConfig = new ExecutionConfig(); client.setExecutionConfig(executionConfig);
ValueStateDescriptor leftBufferDescriptor = new ValueStateDescriptor<>( "left_buffer", TypeInformation.of(StudentAnswer.class));
String key = "100_ug01";
JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f");
CompletableFuture<ValueState > resultFuture = client.getKvState(jobId, "left_buffer_query", key , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);
ValueState leftBuffer = resultFuture.get(); System.out.println("结果:"+leftBuffer.value());
// now handle the returned value // resultFuture.thenAccept(response -> // { // try { // Tuple2<String, Long> res = response.value(); // // System.out.println("Queried sum value: " + res); // // } catch (Exception e) // { // e.printStackTrace(); // } // System.out.println("Exiting future ..."); // }); Thread.sleep(1000L*10); }
}
// Domain classes used by the job and the client follow.

/**
 * Common base for the domain objects; carries the {@code bn} and
 * {@code version} attributes shared by all of them.
 */
public class BaseDomain implements Serializable {

    protected String bn = "2019";
    protected String version = "1.0";

    /** Returns the current {@code bn} value. */
    public String getBn() {
        return bn;
    }

    /** Replaces the {@code bn} value. */
    public void setBn(String bn) {
        this.bn = bn;
    }

    /** Returns the current {@code version} value. */
    public String getVersion() {
        return version;
    }

    /** Replaces the {@code version} value. */
    public void setVersion(String version) {
        this.version = version;
    }
}
public class LearningStrategy extends BaseDomain { private Long courseId; private String taskId; private Byte pushOrder = 1;
public Long getCourseId() { return courseId; }
public void setCourseId(Long courseId) { this.courseId = courseId; }
public String getTaskId() { return taskId; }
public void setTaskId(String taskId) { this.taskId = taskId; }
public Byte getPushOrder() { return pushOrder; }
public void setPushOrder(Byte pushOrder) { this.pushOrder = pushOrder; }
@Override public String toString() { return "LearningStrategy{" + "bn='" + bn + ''' + ", version='" + version + ''' + ", courseId=" + courseId + ", taskId='" + taskId + ''' + ", pushOrder=" + pushOrder + '}'; } }
public class StudentAnswer extends BaseDomain{ private Long courseId; private String taskId; private Long userId; private String answer;
public Long getCourseId() { return courseId; }
public void setCourseId(Long courseId) { this.courseId = courseId; }
public String getTaskId() { return taskId; }
public void setTaskId(String taskId) { this.taskId = taskId; }
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public String getAnswer() { return answer; }
public void setAnswer(String answer) { this.answer = answer; }
@Override public String toString() { return "StudentAnswer{" + "bn='" + bn + ''' + ", version='" + version + ''' + ", courseId=" + courseId + ", taskId='" + taskId + ''' + ", userId=" + userId + ", answer='" + answer + ''' + '}'; } }*来自志愿者整理的flink邮件归档
从错误栈来看,应该是 serializer 不一致导致的,可以再检查下相应的 key/namespace serializer。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。