学习笔记
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。
1. 从集合读
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 从集合读 // DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3)); // 2. 直接填元素 DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4); source.print(); env.execute(); }
2. 从文件读取
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FileSource<String> source = FileSource.forRecordStreamFormat( new TextLineInputFormat(), new Path("input/world.txt")) .build(); env .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource") .print(); env.execute(); }
3. 从 socket 读取
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("localhost", 7777); source.print(); env.execute(); }
可以使用 nc -l 7777
创建一个监听链接的 tcp
4. 从 kafka 读取
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092") .setTopics("topic_1") .setGroupId("atguigu") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source"); stream.print("Kafka"); env.execute(); }
5. 从数据生成器读取数据
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() { @Override public String map(Long value) throws Exception { return "Number:" + value; } }, 10, // 自动生成的数字序列 RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条 Types.STRING // 返回类型 ); env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print(); env.execute(); }