MySQL Sink
Writing Flink results to MySQL is not supported out of the box, so you need to add the MySQL JDBC driver dependency:
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
```
Because there is no built-in MySQL sink, you implement a custom sink based on RichSinkFunction.
Code example: consume data from Kafka, count the traffic at each traffic checkpoint (monitor), and store the result in MySQL.
Note: results must be deduplicated, so writes to MySQL need to be idempotent.
```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

object MySQLSink {

  case class CarInfo(monitorId: String, carId: String, eventTime: String, Speed: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val props = new Properties()
    // (For comparison: Spark Streaming + Kafka before 0.10 in receiver mode used a ZooKeeper URL for metadata.)
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    // First argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
      new KafkaDeserializationSchema[(String, String)] {
        // Stop condition; false means the stream never ends
        override def isEndOfStream(t: (String, String)): Boolean = false

        // Deserialize the raw bytes of each Kafka record
        override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
          val key = new String(consumerRecord.key(), "UTF-8")
          val value = new String(consumerRecord.value(), "UTF-8")
          (key, value)
        }

        // Tell Flink the type of the produced elements
        override def getProducedType: TypeInformation[(String, String)] = {
          createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
        }
      }, props))

    stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        // t1: the previously aggregated result, t2: the incoming element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).addSink(new MySQLCustomSink)

    env.execute()
  }

  // Idempotent write to MySQL: try the update first, insert only if no row was updated
  class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
    var conn: Connection = _
    var insertPst: PreparedStatement = _
    var updatePst: PreparedStatement = _

    // Called once for every element
    override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
      println(value)
      updatePst.setInt(1, value._2)
      updatePst.setString(2, value._1)
      updatePst.execute()
      println(updatePst.getUpdateCount)
      if (updatePst.getUpdateCount == 0) {
        println("insert")
        insertPst.setString(1, value._1)
        insertPst.setInt(2, value._2)
        insertPst.execute()
      }
    }

    // Called once when the task thread is initialized
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123123")
      insertPst = conn.prepareStatement("INSERT INTO car_flow(monitorId,count) VALUES(?,?)")
      updatePst = conn.prepareStatement("UPDATE car_flow SET count = ? WHERE monitorId = ?")
    }

    // Called once when the task thread shuts down
    override def close(): Unit = {
      insertPst.close()
      updatePst.close()
      conn.close()
    }
  }
}
```
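If `car_flow` has a PRIMARY KEY or UNIQUE index on `monitorId` (not shown in the example above), the update-then-insert pair can be collapsed into a single upsert with MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`. A minimal sketch under that assumption, reusing the same connection details as above:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Sketch only: assumes car_flow has a PRIMARY KEY / UNIQUE index on monitorId
class UpsertMySQLSink extends RichSinkFunction[(String, Int)] {
  var conn: Connection = _
  var upsertPst: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123123")
    // One statement replaces the update-then-insert pair used above
    upsertPst = conn.prepareStatement(
      "INSERT INTO car_flow(monitorId, count) VALUES(?, ?) " +
        "ON DUPLICATE KEY UPDATE count = VALUES(count)")
  }

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    upsertPst.setString(1, value._1)
    upsertPst.setInt(2, value._2)
    upsertPst.execute()
  }

  override def close(): Unit = {
    upsertPst.close()
    conn.close()
  }
}
```

It is wired in the same way as the original sink: `.addSink(new UpsertMySQLSink)`.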
Socket Sink
To send Flink results to a socket, implement a custom sink based on RichSinkFunction:
```scala
import java.io.PrintStream
import java.net.{InetAddress, Socket}
import java.util.Properties

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Sink to a socket
object SocketSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val props = new Properties()
    // (For comparison: Spark Streaming + Kafka before 0.10 in receiver mode used a ZooKeeper URL for metadata.)
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    // First argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
      new KafkaDeserializationSchema[(String, String)] {
        // Stop condition; false means the stream never ends
        override def isEndOfStream(t: (String, String)): Boolean = false

        // Deserialize the raw bytes of each Kafka record
        override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
          val key = new String(consumerRecord.key(), "UTF-8")
          val value = new String(consumerRecord.value(), "UTF-8")
          (key, value)
        }

        // Tell Flink the type of the produced elements
        override def getProducedType: TypeInformation[(String, String)] = {
          createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
        }
      }, props))

    stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        // t1: the previously aggregated result, t2: the incoming element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).addSink(new SocketCustomSink("node01", 8888))

    env.execute()
  }

  class SocketCustomSink(host: String, port: Int) extends RichSinkFunction[(String, Int)] {
    var socket: Socket = _
    var writer: PrintStream = _

    override def open(parameters: Configuration): Unit = {
      socket = new Socket(InetAddress.getByName(host), port)
      writer = new PrintStream(socket.getOutputStream)
    }

    override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
      writer.println(value._1 + "\t" + value._2)
      writer.flush()
    }

    override def close(): Unit = {
      writer.close()
      socket.close()
    }
  }
}
```
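Note that this sink only connects to node01:8888; something must already be listening on that port, otherwise `open` fails with a connection error. Any TCP listener (netcat, for example) works; purely as a sketch for local testing, a minimal Scala listener could look like this (the object name is made up here):

```scala
import java.net.ServerSocket

import scala.io.Source

// Minimal test listener (sketch): start this on the target host before submitting the job,
// so the sink has something to connect to on port 8888.
object SocketSinkListener {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8888)
    val client = server.accept() // blocks until the sink connects
    Source.fromInputStream(client.getInputStream, "UTF-8")
      .getLines()
      .foreach(println) // each line is "monitorId<TAB>count"
  }
}
```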
File Sink
Flink results can also be saved to files, although this is not a very common pattern.
Bucketed writes are supported: each bucket is a directory, a new bucket is created every hour by default, and each bucket stores one output file per parallel task (thread). You can configure rolling policies (how long a file has been open, file size, and so on) to control how files roll and avoid producing large numbers of small files.
Flink ships this connector; import the filesystem connector dependency:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
```
```scala
import java.util.Properties

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

object FileSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val props = new Properties()
    // (For comparison: Spark Streaming + Kafka before 0.10 in receiver mode used a ZooKeeper URL for metadata.)
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    // First argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
      new KafkaDeserializationSchema[(String, String)] {
        // Stop condition; false means the stream never ends
        override def isEndOfStream(t: (String, String)): Boolean = false

        // Deserialize the raw bytes of each Kafka record
        override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
          val key = new String(consumerRecord.key(), "UTF-8")
          val value = new String(consumerRecord.value(), "UTF-8")
          (key, value)
        }

        // Tell Flink the type of the produced elements
        override def getProducedType: TypeInformation[(String, String)] = {
          createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
        }
      }, props))

    val restStream = stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        // t1: the previously aggregated result, t2: the incoming element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).map(x => x._1 + "\t" + x._2)

    // Configure the file rolling policy
    val rolling: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.create()
      // Roll to a new file when no data has been written for more than 2 s
      .withInactivityInterval(2000)
      // Roll to a new file when the current file has been open for more than 2 s
      .withRolloverInterval(2000)
      // Roll to a new file when the current file exceeds 256 MB
      .withMaxPartSize(256 * 1024 * 1024)
      .build()

    /**
     * Defaults:
     * one bucket (directory) per hour; each thread's results go into a separate file inside the bucket;
     * a new file is rolled when the current one exceeds 128 MB or has been open for more than 60 s.
     */
    val sink: StreamingFileSink[String] = StreamingFileSink.forRowFormat(
      new Path("d:/data/rests"),
      new SimpleStringEncoder[String]("UTF-8"))
      .withBucketCheckInterval(1000)
      .withRollingPolicy(rolling)
      .build()

    //    val sink = StreamingFileSink.forBulkFormat(
    //      new Path("./data/rest"),
    //      ParquetAvroWriters.forSpecificRecord(classOf[String])
    //    ).build()

    restStream.addSink(sink)
    env.execute()
  }
}
```
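The hourly buckets come from the sink's default bucket assigner, which names each bucket directory after the current date and hour. To make that explicit, or to change the granularity, a bucket assigner can be set on the builder. A minimal sketch that could drop in for the `sink` value above; the `yyyy-MM-dd--HH` pattern simply spells out the default hourly layout and is an assumption about the directory naming you want:

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// Sketch only: same output path as above, with the bucket directory pattern spelled out explicitly
val hourlySink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("d:/data/rests"), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyy-MM-dd--HH"))
  .build()
```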
HBase Sink
There are two ways to write the computed results to the HBase sink:
- Write inside a map operator: creates HBase connections frequently.
- Write inside a process function: better suited to writing to HBase in batches (see the batched sketch after the full example below).
Import the HBase dependencies:
```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
</dependency>
```
Read data from Kafka, count the traffic per checkpoint, and save it to HBase:
- Create the corresponding table in HBase:
```
create 'car_flow',{NAME => 'count', VERSIONS => 1}
```
- Implementation code:
```scala
import java.util.{Date, Properties}

import com.msb.stream.util.{DateUtils, HBaseUtil}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringSerializer

object HBaseSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val props = new Properties()
    // (For comparison: Spark Streaming + Kafka before 0.10 in receiver mode used a ZooKeeper URL for metadata.)
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))

    stream.map(row => {
      val arr = row.split("\t")
      (arr(0), 1)
    }).keyBy(_._1)
      .reduce((v1: (String, Int), v2: (String, Int)) => {
        (v1._1, v1._2 + v2._2)
      }).process(new ProcessFunction[(String, Int), (String, Int)] {

        var htab: HTable = _

        override def open(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
          val hbaseName = "car_flow"
          htab = new HTable(conf, hbaseName)
        }

        override def close(): Unit = {
          htab.close()
        }

        override def processElement(value: (String, Int),
                                    ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
                                    out: Collector[(String, Int)]): Unit = {
          // rowkey: monitorId; column qualifier: timestamp (minute); cell value: traffic count
          val min = DateUtils.getMin(new Date())
          val put = new Put(Bytes.toBytes(value._1))
          put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
          htab.put(put)
        }
      })

    env.execute()
  }
}
```
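The `processElement` above still issues one `Put` per element. To actually get the batching that motivated choosing the process function, the puts can be buffered and flushed in groups. A minimal sketch of that idea; the batch size of 100 and the flush in `close()` are arbitrary choices, not part of the original example:

```scala
import java.text.SimpleDateFormat
import java.util.{ArrayList => JArrayList, Date}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// Sketch only: buffers Puts and writes them to HBase in batches
class BatchedHBaseProcess(batchSize: Int = 100)
  extends ProcessFunction[(String, Int), (String, Int)] {

  var htab: HTable = _
  var buffer: JArrayList[Put] = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    htab = new HTable(conf, "car_flow")
    buffer = new JArrayList[Put]()
  }

  override def processElement(value: (String, Int),
                              ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
                              out: Collector[(String, Int)]): Unit = {
    // Same layout as above: rowkey = monitorId, column qualifier = current minute
    val min = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    val put = new Put(Bytes.toBytes(value._1))
    put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
    buffer.add(put)
    if (buffer.size() >= batchSize) { // flush once the buffer is full
      htab.put(buffer)                // HTable.put(java.util.List[Put]) sends the whole batch
      buffer.clear()
    }
  }

  override def close(): Unit = {
    if (buffer != null && !buffer.isEmpty) htab.put(buffer) // flush whatever is left
    if (htab != null) htab.close()
  }
}
```

It is plugged in exactly like the anonymous function above: `.process(new BatchedHBaseProcess())`.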