请问Flink,从迭代器创建datastream,这里的类型怎么填呢?
在Flink中,为了指定数据类型,我们需要使用TypeInformation类。针对您的问题,我们可以按照下面的方式设置类型信息:
// 获取Long类型的TypeInformation
TypeInformation<Long> typeInfo = TypeInformation.of(Long.class);
// 使用迭代器创建数据流源,并指定类型信息
DataStreamSource<Long> longDataStreamSource =
env.fromCollection(longIt, typeInfo);
在这个例子中,我们首先通过 TypeInformation.of(Class) 获取到 Long 类型的TypeInformation 实例,然后我们在env.fromCollection()方法中提供了这个类型信息。这样就可以告诉Flkink我们要从长整数类型的迭代器中读取数据,并且要把它当作Long类型来进行处理。
在Flink中,我们可以利用TypeInformation类来指定DataStream源的元素类型。当你从Java集合(如ArrayList)创建DataStream时,可以通过调用getExecutionEnvironment().fromCollection()方法并将该集合作为参数来实现这一点。
在这段代码中,我们正在创建一个Longs类型的Arraylist,然后从中获取一个iterator对象。接着我们将这个iterator对象用于env.fromCollection(longIt),从而生成一个DataStream。
由于我们在创建这个DataStream的过程中并没有明确指定它的元素类型,所以在打印流(StreamExecutionEnvironment#print())时,Flink会试图推断出这个流的实际类型。在这种情况下,它会识别出这是一个long[],即一个长整数数组。
如果我们希望显式指定这个流的类型,我们应该这样做:DataStream<Long> longDataStreamSource = env.fromCollection((Iterator<Long>) longIt);
. 这样一来,我们就清楚地告诉Flink我们的流是由Longs构成的。
在实际编程过程中,最好总是尽可能清晰明了地标记出每种数据类型,避免出现混淆不清的地方。
在 Flink 中,从迭代器创建 DataStream 时,您需要提供迭代器的类型信息以及输出类型。这里的类型应该填写为 Typelnformation,其中 OUT 是您期望的输出类型。
以下是一个示例,从 List 创建一个 DataStream:
import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
public class Main {
public static void main(String[] args) {
List data = Arrays.asList("Hello", "World");
DataStream dataStream = DataStream.fromIterable(data);
}
}
CopyCopy
在这个例子中,我们使用 DataStream.fromIterable(data) 从 List 创建一个 DataStream。这里的类型信息已经由 DataStream 指定,表示输出类型为 String。
如果您提供的迭代器是泛型类型,例如 Iterator,您可以使用以下方式创建 DataStream:
import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.Iterator;
import org.apache.flink.streaming.api.datastream.DataStream;
public class Main {
public static void main(String[] args) {
Iterator data = Arrays.asList("Hello", "World").iterator();
DataStream dataStream = DataStream.fromIterable(data, String.class);
}
}
CopyCopy
在这个例子中,我们使用 DataStream.fromIterable(data, String.class) 从泛型迭代器创建一个 DataStream。这里的类型信息由 String.class 指定,表示输出类型为 String。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。