Flink output to ES, Redis, MySQL, Kafka, and file
Table of Contents
Configure the pom file
Common entity class
KafkaSink
ElasticsearchSink(EsSink)
RedisSink
MysqlSink(JdbcSink)
FileSink
Prepare the relevant environment (Kafka, Elasticsearch, Redis, MySQL) yourself before starting.
Configure the pom file
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <!-- Redis connector (Bahir) -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- Elasticsearch connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- MySQL / JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Common entity class
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@ToString
@AllArgsConstructor
public class UserEvent {
    private String userName;
    private String url;
    private Long timestemp;
}
KafkaSink
Write the data to Kafka. Start a Kafka console consumer first, then run the program.
import com.event.UserEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration (not used by the producer sink below;
        // kept here in case you also want to read the topic back)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop01:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.fromCollection(Arrays.asList(
                "xiaoming,www.baidu.com,1287538716253",
                "Mr Li,www.baidu.com,1287538710000",
                "Mr Zhang,www.baidu.com,1287538710900"
        ));

        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Parse each CSV line into a UserEvent and emit its toString()
                String[] split = value.split(",");
                return new UserEvent(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
            }
        });

        // Command to start a console consumer:
        // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
        result.addSink(new FlinkKafkaProducer<String>(
                // Kafka broker address
                "hadoop01:9092",
                // target topic
                "events",
                new SimpleStringSchema()
        ));

        env.execute();
    }
}
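The consumer properties above are not actually used by the producer sink. If you want to verify the output from inside Flink rather than with the console consumer, a minimal read-back sketch (assuming the same broker and the events topic, and an extra import of FlinkKafkaConsumer) could look like this:

// Minimal sketch: read the topic back to verify the sink (assumption: same broker/topic as above)
DataStreamSource<String> fromKafka = env.addSource(
        new FlinkKafkaConsumer<String>("events", new SimpleStringSchema(), properties));
fromKafka.print();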
Running result
ElasticsearchSink(EsSink)
Write the data to Elasticsearch.
Example code
import com.event.UserEvent;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class EsSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<UserEvent> userEventDataStreamSource = env.fromCollection(
                Arrays.asList(
                        new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                        new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                        new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                        new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                        new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                        new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                ));

        // List of Elasticsearch hosts
        List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop01", 9200));

        // Define the ElasticsearchSinkFunction: build one IndexRequest per record
        ElasticsearchSinkFunction<UserEvent> elasticsearchSinkFunction = new ElasticsearchSinkFunction<UserEvent>() {
            @Override
            public void process(UserEvent userEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                IndexRequest indexRequest = Requests.indexRequest()
                        .index("events")
                        .type("type")
                        .source(new HashMap<String, String>() {{
                            put(userEvent.getUserName(), userEvent.getUrl());
                        }});
                requestIndexer.add(indexRequest);
            }
        };

        // Write to Elasticsearch
        userEventDataStreamSource.addSink(new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}
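The sink buffers index requests and flushes them in bulk, so small test data may not appear immediately. If you want every record visible right away while experimenting, you can tune the builder before calling build(); a minimal sketch (flushing after every single action is only for demonstration, not a production setting):

ElasticsearchSink.Builder<UserEvent> builder =
        new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction);
// Flush after every single action so test records show up immediately
builder.setBulkFlushMaxActions(1);
userEventDataStreamSource.addSink(builder.build());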
Query commands
GET _cat/indices
GET _cat/indices/events
GET events/_search
Running result
RedisSink
Write the data to Redis.
Example code
import com.event.UserEvent;
import my.test.source.CustomSouce;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<UserEvent> streamSource = env.addSource(new CustomSouce());

        // Create the Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("master")
                .setTimeout(10000)
                .setPort(6379)
                .build();

        // Write to Redis
        streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<UserEvent> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Write with HSET; the additional key "events" is the name of the Redis hash to store into
            return new RedisCommandDescription(RedisCommand.HSET, "events");
        }

        @Override
        public String getKeyFromData(UserEvent userEvent) {
            return userEvent.getUserName();
        }

        @Override
        public String getValueFromData(UserEvent userEvent) {
            return userEvent.getUrl();
        }
    }
}
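With HSET, every record becomes a field of the single hash named events, which you can inspect with HGETALL events in redis-cli. If you would rather store each user under its own top-level string key, a minimal mapper sketch using the SET command (same key/value extraction as above, shown only as an alternative) could look like this:

public static class MyRedisStringMapper implements RedisMapper<UserEvent> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // SET writes plain string keys, so no additional (hash) key is needed
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(UserEvent userEvent) {
        return userEvent.getUserName();
    }

    @Override
    public String getValueFromData(UserEvent userEvent) {
        return userEvent.getUrl();
    }
}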
Custom source
import com.event.UserEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class CustomSouce implements SourceFunction<UserEvent> {
    // Flag that controls whether data generation keeps running
    private Boolean running = true;

    @Override
    public void run(SourceContext<UserEvent> ctx) throws Exception {
        Random random = new Random();
        // Pick random values from fixed data sets
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new UserEvent(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // Emit one click event per second to make the output easy to observe
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
Running result
Because the source above is an unbounded stream, the data keeps changing.
MysqlSink(JdbcSink)
Write the data to MySQL.
Table structure
create table events (
    user_name varchar(20)  not null,
    url       varchar(100) not null
);
Example code
import com.event.UserEvent;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;

public class MysqlSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A bounded collection of test data
        DataStreamSource<UserEvent> userEventDataStreamSource = env.fromCollection(
                Arrays.asList(
                        new UserEvent("zhangsan", "/path?test123", System.currentTimeMillis() - 2000L),
                        new UserEvent("zhangsan", "/path?test", System.currentTimeMillis() + 2000L),
                        new UserEvent("lisi", "/path?checkParam", System.currentTimeMillis()),
                        new UserEvent("bob", "/path?test", System.currentTimeMillis() + 2000L),
                        new UserEvent("mary", "/path?checkParam", System.currentTimeMillis()),
                        new UserEvent("lisi", "/path?checkParam123", System.currentTimeMillis() - 2000L)
                ));

        userEventDataStreamSource.addSink(JdbcSink.sink(
                // SQL statement to execute
                "INSERT INTO events (user_name, url) VALUES (?, ?)",
                new JdbcStatementBuilder<UserEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserEvent userEvent) throws SQLException {
                        // Fill in the SQL placeholders
                        preparedStatement.setString(1, userEvent.getUserName());
                        preparedStatement.setString(2, userEvent.getUrl());
                    }
                },
                // JDBC connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()
        ));

        env.execute();
    }
}
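JdbcSink buffers rows and writes them in batches; with a bounded input like the collection above, everything is flushed by the time the job finishes. If you want explicit control over batching and retries, JdbcSink.sink also has an overload that takes JdbcExecutionOptions (import org.apache.flink.connector.jdbc.JdbcExecutionOptions). A minimal sketch, with illustrative values only:

// Sketch of the overload that also accepts execution options (values are illustrative, not recommendations)
userEventDataStreamSource.addSink(JdbcSink.sink(
        "INSERT INTO events (user_name, url) VALUES (?, ?)",
        (JdbcStatementBuilder<UserEvent>) (ps, event) -> {
            ps.setString(1, event.getUserName());
            ps.setString(2, event.getUrl());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(100)          // flush every 100 rows
                .withBatchIntervalMs(200)    // or every 200 ms, whichever comes first
                .withMaxRetries(3)           // retry a failed batch up to 3 times
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                .withUsername("root")
                .withPassword("123456")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()
));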
After the program finishes, you can see the new rows in the MySQL events table.
FileSink
Write the data to files (the sink can write partitioned/bucketed output).
import com.event.UserEvent;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class FileSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<UserEvent> userEventDataStreamSource = env.fromCollection(
                Arrays.asList(
                        new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                        new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                        new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                        new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                        new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                        new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                ));

        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // roll the part file once it reaches 1 GB
                                .withMaxPartSize(1024 * 1024 * 1024)
                                // roll the part file every 15 minutes
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                // roll the part file after 5 minutes of inactivity
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .build()
                ).build();

        userEventDataStreamSource.map(data -> data.getUserName()).addSink(streamingFileSink);

        env.execute();
    }
}
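Note that StreamingFileSink relies on checkpoints to move part files out of the in-progress/pending state, so without checkpointing the output files may stay in-progress. A minimal sketch of enabling checkpointing before building the sink (the 1-second interval is for experimentation only):

// Enable checkpointing so StreamingFileSink can finalize its part files
// (interval value is illustrative only)
env.enableCheckpointing(1000L);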
After the run finishes, new part files appear under the output directory.