Blink流式计算-Kafka接入demo

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: //定义解析Kakfa message的UDTF CREATE FUNCTION myParse AS 'com.xxxxxx.MyKafkaUDTF'; CREATE FUNCTION myUdf AS 'com.xxxxxxx.MyWaterMarkUDTF'; //注意:kafka源表DDL字段必须与以下例子一致 create table my_input (

//定义解析Kakfa message的UDTF

CREATE FUNCTION myParse AS 'com.xxxxxx.MyKafkaUDTF';

CREATE FUNCTION myUdf AS 'com.xxxxxxx.MyWaterMarkUDTF';

//注意:kafka源表DDL字段必须与以下例子一致

create table my_input (

messageKey VARBINARY,

message VARBINARY,

topic varchar,

partition int,

offset bigint,

ctTime AS TO_TIMESTAMP (myUdf (message)),

//注意计算里的类型必须为timestamp才能在做watermark。

WATERMARK wk FOR ctTime AS WITHOFFSET (ctTime, 2000) --为rowtime定义watermark

) WITH (

type = 'KAFKA08',

topic = 'myTopic',

group.id = 'mGroup',

extraConfig = 'bootstrap.servers=127.0.0.1:9092',

zookeeper.connect = '127.0.0.1:2181',

startupMode = 'EARLISET'

);

-- 滚动窗口 group by prodId

CREATE VIEW input_view01 (

windowStart,

windowEnd,

prodId,

prodName,

prodNumber

) AS

SELECT

HOP_START (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),

HOP_END (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),

T.prodId as prodId,

T.prodName as prodName,

count (*) as prodNumber

from

my_input as S,

LATERAL TABLE (myParse (message)) as T (

id,

prodId,

prodName,

createdAt,

updatedAt

)

Group BY HOP (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE), T.prodId, T.prodName;

CREATE VIEW input_view60 (

id,

prodId,

prodName,

createdAt,

updatedAt

) AS

SELECT

T.id,

T.prodId,

T.prodName,

T.createdAt,

T.updatedAt

from

my_input as S,

LATERAL TABLE (myParse (message)) as T (

id,

goCs,

prodId,

prodName,

createdAt,

updatedAt

);

-- 结果print

create table outprint01(

prodId bigint,

prodName varchar,

prodNumber bigint

)with(

type = 'print'

);

insert into outprint01

select prodId , prodName , prodNumber

from input_view01;

-- 结算结果写入Kafka

create table result_kafka (

messageKey VARBINARY,

message VARBINARY,

PRIMARY KEY (messageKey)

) with (

type = 'KAFKA08',

topic = 'myResultTopic',

extraConfig='bootstrap.servers=127.0.0.1:9092',

zookeeper.connect = '127.0.0.1:2181',

startupMode='EARLISET'

);

//此处的结果输出,可以考虑将结果组装成字符串,中间用|隔开,接收方再解析

INSERT INTO

result_kafka

SELECT

cast(prodId as VARBINARY) as messageKey,

cast(prodName as VARBINARY) as message

FROM

input_view01;

MyKafkaUDTF写法:

package com.xxxxxxxx;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.table.functions.TableFunction;

import org.apache.flink.table.types.DataType;

import org.apache.flink.table.types.DataTypes;

import org.apache.flink.types.Row;

import java.io.UnsupportedEncodingException;

import java.sql.Timestamp;

public class MyKafkaUDTF extends TableFunction {

public void eval(byte[] message) {

try {

String msg = new String(message, "UTF-8");

System.out.println("收到的消息:"+msg);

try {

JSONObject jsonObject = JSONObject.parseObject(msg);

if (jsonObject != null) {

//id

Long id = jsonObject.getLong("id");

//prodId

Long prodId = jsonObject.getLong("prodId");

//prodName

String prodName = jsonObject.getString("prodName ");

Long createAt = jsonObject.getLong("createdAt");

Long updatedAt = jsonObject.getLong("updatedAt");

//创建时间时间戳

Timestamp createAtTimeStamp = new Timestamp(createAt);

Timestamp updatedAtTimeStamp = new Timestamp(updatedAt);

Row row = new Row(8);

row.setField(0, id);

row.setField(1, prodId);

row.setField(2, prodName);

row.setField(3, createAtTimeStamp );

row.setField(4, updatedAtTimeStamp );

System.out.println("message str ==>" + row.toString());

collect(row);

}

} catch (Exception e) {

e.printStackTrace();

System.out.println(" error. Input data " + msg + "is not json string");

}

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

@Override

// 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型

public DataType getResultType(Object[] arguments, Class[] argTypes) {

return DataTypes.createRowType(

DataTypes.LONG,

DataTypes.LONG,

DataTypes.STRING,

DataTypes.TIMESTAMP,

DataTypes.TIMESTAMP);

}

}

package xxxxxxx;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.table.functions.ScalarFunction;

import java.text.SimpleDateFormat;

import java.util.Date;

public class MyWaterMarkUDTF extends ScalarFunction {

public String eval(byte[] message) {

try {

String msg = new String(message, "UTF-8");

JSONObject data = JSONObject.parseObject(msg);

System.out.println("time:"+data.getString("createdAt"));

Long createAtLong = data.getLong("createdAt");

SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

String createTimeStr = parser.format(new Date(createAtLong));

return createTimeStr;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

//可选,close方法可以不写

@Override

public void close() {

}

}

目录
相关文章
ly~
|
11月前
|
消息中间件 存储 监控
如何查看 RocketMQ 消息的重试次数和时间间隔?
RocketMQ消息重试次数和时间间隔可通过查看消费者和Broker日志、使用管理控制台的监控页面和消息查询功能,或通过分析消费者代码和RocketMQ客户端库代码等方式获取。日志中常有消费失败重试的明确记录,控制台可监控消费情况推断重试状态,代码分析则适合技术用户深入了解。
ly~
948 3
|
10月前
|
存储 缓存 JavaScript
如何优化Node.js应用的内存使用以提高性能?
通过以上多种方法的综合运用,可以有效地优化 Node.js 应用的内存使用,提高性能,提升用户体验。同时,不断关注内存管理的最新技术和最佳实践,持续改进应用的性能表现。
509 62
|
10月前
|
消息中间件 canal 分布式计算
类似apache nifi的产品还有哪些?
【10月更文挑战第23天】类似apache nifi的产品还有哪些?
470 3
|
10月前
|
Java API 持续交付
apache nifi 如何进行二次开发?
【10月更文挑战第23天】apache nifi 如何进行二次开发?
530 2
|
虚拟化 Docker Windows
window 10专业版部署docker环境
本文介绍了如何在Windows 10专业版上部署Docker环境,包括安装步骤、配置镜像加速以及可能遇到的错误处理。
605 3
window 10专业版部署docker环境
|
存储 SQL 人工智能
【云栖实录】Hologres3.0全新升级:一体化实时湖仓平台
2024年云栖大会,Hologres 3.0全新升级为一体化实时湖仓平台,通过统一数据平台实现湖仓存储一体、多模式计算一体、分析服务一体、Data+AI 一体,发布 Dynamic Table、External Database、分时弹性、Query Queue、NL2SQL 等众多新的产品能力,实现一份数据、一份计算、一份服务,极大提高数据开发及应用效率。同时,Hologres 的预付费实例年付折扣再降15%,仅需7折,不断帮助企业降低数据管理成本,赋能业务增长。
最新版 MyBatisPlus 分页插件(直接拿来就可以用)
最新版 MyBatisPlus 分页插件(直接拿来就可以用)
724 0
|
消息中间件 存储 Kafka
【Kafka】Kafka 的日志保留期与数据清理策略
【4月更文挑战第13天】【Kafka】Kafka 的日志保留期与数据清理策略
|
负载均衡 网络架构 CDN
阿里云服务器网络不稳定,可能有以下一些原因
阿里云服务器网络不稳定,可能有以下一些原因
2295 1
|
关系型数据库 MySQL 数据库
2023 年Windows MySql 5.7,MySql 8.0 下载安装教程, 附详细图解,亲测可用
2023 年Windows MySql 5.7,MySql 8.0 下载安装教程, 附详细图解,亲测可用