a 、 Flink采用一种成为任务链的优化技术 可以在指定条件下减少本地通信开销 为了满足任务链的要求 必须将两个或多个算子设为相同的并行度 并通过本地转发的方式连接(local forward) b、 相同并行度one to one操作 Flink这样相连的算子链接在一起形成一个task 原来的算子成为里面的subtask c、 并行度相同、并且是One-to-One操作 两个条件缺一不可 d、 将算子链接成task是非常有效的优化 它能减少线程之间的切换和基于缓存区的数据交换 在减少时延的同时提升吞吐量 链接的行为可以在编程API中进行指定
上面是纯理论 下面实践下 才能对理论理解的更加透彻
Flink 流处理API
Environment
- getExecutionEnvironment
创建一个执行环境 表示当前执行程序的上下文 如果程序是独立调用的 则此方法返回本地执行环境 如果从命令行调用程序以提交到集群 则此方法返回集群的执行环境
- 代码
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment 或 val env = StreamExecutionEnvironment.getExecutionEnvironment a 返回本地执行环境 需要在调用时指定默认的并行度 val env = StreamExecutionEnvironment.createLocalEnvironment(1) b 返回集群环境 将jar包提交到远程服务器 需要在调用时指定 JobManager的IP和端口号 并指定要在集群中运行的jar包 val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
- 并行度
如果没有设置并行度 会以flink-conf.yaml中配置为准 默认是1
Source
- 从集合读取数据
// 定义样例类,传感器 id ,时间戳,温度 case class SensorReading(id: String, timestamp: Long, temperature: Double) object Sensor { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env .fromCollection(List( SensorReading("sensor_1", 1547718199, 35.80018327300259), SensorReading("sensor_6", 1547718201, 15.402984393403084), SensorReading("sensor_7", 1547718202, 6.720945201171228), SensorReading("sensor_10", 1547718205, 38.101067604893444) )) stream1.print("stream1:").setParallelism(1) env.execute() } }
demo程序是用的kafka版本是 kafka-0.11_2.11
目前虚拟机上安装的版本是2.10-0.8.2.1 所以为了跑demo程序 所以安装下kafka-0.11_2.11版本
- 安装包下载路径
https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz
- 唯一的区别
/opt/kafka/版本号/config/server.properties 这个配置文件 配置zk集群的配置项名称 kafka_2.10-0.8.2.1: zookeeper.contact kafka_2.12-0.11.0.1: zookeeper.connect
- 生成一条消息
./kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic test
- 从文件读取数据
val stream2 = env.readTextFile("YOUR_FILE_PATH")
- 以kafka消息队列的数据作为来源
需要引入kafka连接器的依赖 pom: <dependency> <groupId>org.apache.flink</ groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactI d> <version>1.7.2</version> </dependency> 代码: val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
- 先测试下kafka发送消息 从zk中消费消息是否可以
cd /opt/kafka/kafka_2.10-0.8.2.1/bin 生产消息 ./kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic test 消费消息 ./kafka-console-consumer.sh --zookeeper 192.168.84.128:2181 --topic test --from-beginning
- 通过Flink Kafka来消费
代码
https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/co