一、需求分析
1. 维度信息（活动名称）需要关联 MySQL 数据库查询获得。
2. 输入数据格式如下（字段依次为：用户 ID、活动编号 aid、时间、事件类型、省份）：
user001,A1,2020-09-23 10:10:10,2,北京市
user002,A3,2020-09-23 10:10:10,1,上海市
user003,A2,2020-09-23 10:10:10,2,苏州市
user002,A3,2020-09-23 10:10:10,1,辽宁市
user001,A2,2020-09-23 10:10:10,2,北京市
user002,A2,2020-09-23 10:10:10,1,上海市
user003,A1,2020-09-23 10:10:10,1,北京市
经 Flink 实时计算处理后（根据活动编号 aid 查询 MySQL 中的 activities 表，将其替换为活动名称），数据变为如下格式：
user001,新人礼物,2020-09-23 10:10:10,2,北京市
user002,年终礼物,2020-09-23 10:10:10,1,上海市
user003,月末礼物,2020-09-23 10:10:10,2,苏州市
二、环境要求
1.zookeeper
2.kafka
3. 创建 topic（名称需与程序中使用的 activity 一致）
bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic activity
三、flink程序
3.1 Activity实体类
/**
 * One activity event after enrichment: the raw input fields plus the activity name resolved
 * from the database.
 *
 * <p>Fields are public and a public no-arg constructor is provided, so the class keeps the
 * plain-POJO shape that Flink's POJO serializer works with.
 */
public class ActivityBean {

    /** User id (first field of the input line). */
    public String uid;
    /** Activity id, used as the lookup key for {@link #activityName}. */
    public String aid;
    /** Human-readable activity name; may be {@code null} when no lookup result exists. */
    public String activityName;
    /** Event time, kept as the raw string from the input. */
    public String time;
    /** Event type code. */
    public int eventType;
    /** Province / location string from the input. */
    public String province;

    /** No-arg constructor; leaves every field at its default value. */
    public ActivityBean() {
    }

    /** Creates a bean with every field populated. */
    public ActivityBean(String uid, String aid, String activityName, String time, int eventType,
            String province) {
        this.uid = uid;
        this.aid = aid;
        this.activityName = activityName;
        this.time = time;
        this.eventType = eventType;
        this.province = province;
    }

    /** Static factory equivalent to the all-args constructor. */
    public static ActivityBean of(String uid, String aid, String activityName, String time,
            int eventType, String province) {
        return new ActivityBean(uid, aid, activityName, time, eventType, province);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("ActivityBean{");
        sb.append("uid='").append(uid).append('\'');
        sb.append(", aid='").append(aid).append('\'');
        sb.append(", activityName='").append(activityName).append('\'');
        sb.append(", time='").append(time).append('\'');
        sb.append(", eventType=").append(eventType);
        sb.append(", province='").append(province).append('\'');
        return sb.append('}').toString();
    }
}
3.2 flink实时计算程序
public class ActivityCount { public static void main(String[] args) throws Exception { //1.获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.kafka配置 String topic = "activity"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "192.168.52.200:9092");//多个的话可以指定 prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.setProperty("auto.offset.reset", "earliest"); prop.setProperty("group.id", "consumer1"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop); //3.获取数据 DataStream<String> lines = env.addSource(myConsumer); SingleOutputStreamOperator<ActivityBean> beans = lines.map(new RichMapFunction<String, ActivityBean>() { private Connection connection = null; // 4.连接数据库 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink?characterEncoding=UTF-8","root","123456"); } @Override public ActivityBean map(String line) throws Exception { String[] fields = line.split(","); String uid = fields[0]; String aid = fields[1]; // 5.查询条件为aid 活动标号,查出活动的名称 PreparedStatement preparedStatement = connection.prepareStatement("select name from activities where id = ?"); preparedStatement.setString(1, aid); ResultSet resultSet = preparedStatement.executeQuery(); String name = null; while (resultSet.next()) { name = resultSet.getString(1); } preparedStatement.close(); String time = fields[2]; int eventType = Integer.parseInt(fields[3]); String province = fields[4]; return ActivityBean.of(uid, aid, name, time, eventType, province); } // 6.关闭数据库连接 @Override public void close() throws Exception { super.close(); connection.close(); } }); beans.print(); //7.执行 env.execute("StreamingActivity"); } }
四、测试
1.开启kafka生产者
bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,192.168.52.201:9092,192.168.52.202:9092 --topic activity
2.运行flink程序
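3. 运行结果：控制台会打印补全了活动名称的 ActivityBean，形如：
ActivityBean{uid='user001', aid='A1', activityName='新人礼物', time='2020-09-23 10:10:10', eventType=2, province='北京市'}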