
Problem using Streaming File Sink?

I am using Streaming File Sink to bulk-write Parquet files in Avro format. The code is as follows:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class StreamFileSinkConnector {
    // Avro schema as JSON; the inner quotes must be escaped inside a Java string literal.
    private static final String SCHEMA_JSON = "{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},"
            + "{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
        env.setStateBackend(new FsStateBackend(new Path("file:///g:/checkpoint")));
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

        final DataStreamSource<GenericRecord> source = env.addSource(new RichSourceFunction<GenericRecord>() {
            private transient Schema schema1;        // parsed on the task side in open()
            private volatile boolean running = true; // cleared by cancel() so run() can exit

            @Override
            public void open(Configuration parameters) throws Exception {
                schema1 = new Schema.Parser().parse(SCHEMA_JSON);
            }

            @Override
            public void run(SourceContext<GenericRecord> ctx) throws Exception {
                while (running) {
                    Thread.sleep(2000);
                    GenericRecord record = new GenericData.Record(schema1);
                    record.put("name", "zhangsan"); // the nullable fields are left unset (null)
                    ctx.collect(record);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Bulk-encoded sink: part files are rolled on every checkpoint.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///g:/tmp/streamsink"), ParquetAvroWriters.forGenericRecord(schema))
                .build();
        source.addSink(sink);
        env.execute();
    }
}
```

But when the program runs, it fails with the following error. What is the cause?

```text
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at myflink.connector.StreamFileSinkConnector$1.run(StreamFileSinkConnector.java:37)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 22 more
```

*From the Flink mailing list archive, compiled by volunteers
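The trace shows where the failure happens: each record passed from the source to the chained sink is copied with `KryoSerializer.copy`, because Flink serializes `GenericRecord` with Kryo here. Kryo's `CollectionSerializer.read` rebuilds a collection by creating an instance of the same class and calling `add()` for each element, and the serialization trace points at `Schema$Field.reserved`, which Avro appears to hold as an unmodifiable set. The JDK-only sketch below imitates just that refill step; the class and method names (`UnmodifiableRefillDemo`, `refill`, `refillFails`) are hypothetical and are not Kryo's API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UnmodifiableRefillDemo {
    // Hypothetical stand-in for a deserializer's refill loop: add every element to the new target collection.
    static <T> void refill(Collection<T> target, Iterable<T> elements) {
        for (T e : elements) {
            target.add(e);
        }
    }

    // Returns true if refilling the given target throws UnsupportedOperationException.
    static boolean refillFails(Collection<String> target) {
        try {
            refill(target, Arrays.asList("name", "type"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A mutable set accepts the elements, so the "copy" succeeds.
        System.out.println("HashSet fails: " + refillFails(new HashSet<String>()));
        // An unmodifiable wrapper (the kind of set the trace points at) rejects every add().
        Set<String> readOnly = Collections.unmodifiableSet(new HashSet<String>());
        System.out.println("unmodifiableSet fails: " + refillFails(readOnly));
    }
}
```

Running it prints `HashSet fails: false` and then `unmodifiableSet fails: true`, mirroring the `Caused by` line. Since the root problem is the fallback to Kryo for `GenericRecord`, another route (not raised in the original thread) is to declare the Avro type explicitly, for example by passing `new GenericRecordAvroTypeInfo(schema)` from the `flink-avro` module as the second argument of `env.addSource(...)`; records should then be copied with Flink's Avro serializer instead of Kryo.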

玛丽莲梦嘉 2021-12-02 16:29:23
1 answer
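  • Judging from the stack trace, the GenericRecord cannot be serialized here: Flink falls back to its Kryo serializer for GenericRecord, and Kryo fails while copying the Avro Schema embedded in each record. For now, a workaround is to pass the data between operators as a custom data type instead.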

    2021-12-02 17:23:40
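The answer's workaround, a custom data type in place of `GenericRecord`, might look like the minimal sketch below. It assumes a hypothetical `User` class mirroring the Avro schema from the question; shaping it as a Flink POJO (public class, public no-argument constructor, public fields) should let Flink use its POJO serializer rather than Kryo.

```java
import java.util.Objects;

// Hypothetical POJO standing in for GenericRecord, mirroring the question's "User" Avro schema.
// Shaped for Flink's POJO rules: public class, public no-arg constructor, public non-final fields.
public class User {
    // Field names copied from the Avro schema so reflection-based writers keep the same column names.
    public String name;
    public Integer favorite_number; // may be null, like ["int","null"]
    public String favorite_color;   // may be null, like ["string","null"]

    public User() {}

    public User(String name, Integer favoriteNumber, String favoriteColor) {
        this.name = name;
        this.favorite_number = favoriteNumber;
        this.favorite_color = favoriteColor;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof User)) return false;
        User other = (User) o;
        return Objects.equals(name, other.name)
                && Objects.equals(favorite_number, other.favorite_number)
                && Objects.equals(favorite_color, other.favorite_color);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, favorite_number, favorite_color);
    }

    @Override
    public String toString() {
        return "User{name=" + name + ", favorite_number=" + favorite_number
                + ", favorite_color=" + favorite_color + "}";
    }
}
```

In the question's source this would mean a `RichSourceFunction<User>` emitting `new User("zhangsan", null, null)`, with the sink built from `ParquetAvroWriters.forReflectRecord(User.class)` (flink-parquet). Note that Avro reflection treats boxed fields as non-nullable unless they are annotated, for example with `org.apache.avro.reflect.Nullable`, so leaving fields null as above would likely need that annotation.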