开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问Flink,从迭代器创建datastream,这里的类型怎么填呢?

请问Flink,从迭代器创建datastream,这里的类型怎么填呢?9d0654d7c18deead3d3569dfce605e25.png

展开
收起
真的很搞笑 2023-08-01 11:42:41 56 0
4 条回答
写回答
取消 提交回答
  • 类型选择Long.class。

    // Create a DataStream from an Iterator
    Iterator<Long> longIt = ...
    DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
    

    ——参考链接

    2024-01-25 18:01:06
    赞同 1 展开评论 打赏
  • 某政企事业单位安全运维工程师,主要从事系统运维及网络安全工作,多次获得阿里云、华为云、腾讯云征文比赛一二等奖;CTF选手,白帽,全国交通行业网络安全大赛二等奖,全国数信杯数据安全大赛银奖,手握多张EDU、CNVD、CNNVD证书。

    在Flink中,为了指定数据类型,我们需要使用TypeInformation类。针对您的问题,我们可以按照下面的方式设置类型信息:

    // 获取Long类型的TypeInformation
    TypeInformation<Long> typeInfo = TypeInformation.of(Long.class);
    
    // 使用迭代器创建数据流源,并指定类型信息
    DataStreamSource<Long> longDataStreamSource = 
        env.fromCollection(longIt, typeInfo);
    

    在这个例子中,我们首先通过 TypeInformation.of(Class) 获取到 Long 类型的TypeInformation 实例,然后我们在env.fromCollection()方法中提供了这个类型信息。这样就可以告诉Flkink我们要从长整数类型的迭代器中读取数据,并且要把它当作Long类型来进行处理。

    2024-01-19 11:03:08
    赞同 展开评论 打赏
  • 在Flink中,我们可以利用TypeInformation类来指定DataStream源的元素类型。当你从Java集合(如ArrayList)创建DataStream时,可以通过调用getExecutionEnvironment().fromCollection()方法并将该集合作为参数来实现这一点。

    在这段代码中,我们正在创建一个Longs类型的Arraylist,然后从中获取一个iterator对象。接着我们将这个iterator对象用于env.fromCollection(longIt),从而生成一个DataStream。

    由于我们在创建这个DataStream的过程中并没有明确指定它的元素类型,所以在打印流(StreamExecutionEnvironment#print())时,Flink会试图推断出这个流的实际类型。在这种情况下,它会识别出这是一个long[],即一个长整数数组。

    如果我们希望显式指定这个流的类型,我们应该这样做:DataStream<Long> longDataStreamSource = env.fromCollection((Iterator<Long>) longIt);. 这样一来,我们就清楚地告诉Flink我们的流是由Longs构成的。
    在实际编程过程中,最好总是尽可能清晰明了地标记出每种数据类型,避免出现混淆不清的地方。
    image.png

    2024-01-15 14:44:24
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    在 Flink 中,从迭代器创建 DataStream 时,您需要提供迭代器的类型信息以及输出类型。这里的类型应该填写为 Typelnformation,其中 OUT 是您期望的输出类型。
    以下是一个示例,从 List 创建一个 DataStream:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    public class Main {
    public static void main(String[] args) {
    List data = Arrays.asList("Hello", "World");
    DataStream dataStream = DataStream.fromIterable(data);
    }
    }
    CopyCopy

    在这个例子中,我们使用 DataStream.fromIterable(data) 从 List 创建一个 DataStream。这里的类型信息已经由 DataStream 指定,表示输出类型为 String。
    如果您提供的迭代器是泛型类型,例如 Iterator,您可以使用以下方式创建 DataStream:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import java.util.Iterator;
    import org.apache.flink.streaming.api.datastream.DataStream;
    public class Main {
    public static void main(String[] args) {
    Iterator data = Arrays.asList("Hello", "World").iterator();
    DataStream dataStream = DataStream.fromIterable(data, String.class);
    }
    }
    CopyCopy

    在这个例子中,我们使用 DataStream.fromIterable(data, String.class) 从泛型迭代器创建一个 DataStream。这里的类型信息由 String.class 指定,表示输出类型为 String。

    2024-01-12 22:08:48
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载