01 引言
Flink项目目前是没有实现protobuf格式的,目前Flink相关的开发者正在开发,详情可以参考:https://github.com/maosuhan/flink/tree/feature/flink-pb-format
本文来讲讲如何实现Flink的protobuf。
02 protobuf概述
概念:protobuf (protocol buffer) 是谷歌内部的混合语言数据标准。通过将结构化的数据进行序列化(串行化),用于通讯协议、数据存储等领域和语言无关、平台无关、可扩展的序列化结构数据格式。
protobuf github 地址:https://github.com/protocolbuffers/protobuf
从上图可以看到protobuf很火,因此很有必要去学习它。
2.1 protobuf优缺点
既然知道protbuf很火,那它有什么优势呢?简单概括如下:
性能方面
- 序列化后,数据大小可缩小3倍
- 序列化速度快
- 传输速度快
使用方面
- 使用简单:proto编译器自动进行序列化和反序列化
- 维护成本低:多平台只需要维护一套对象协议文件,即.proto文件
- 可扩展性好:不必破坏旧的数据格式,就能对数据结构进行更新
- 加密性好:http传输内容抓包只能抓到字节数据
使用范围
- 跨平台、跨语言、可扩展性强
既然有优点,那么是否有缺点呢?当然有的,先来看看怎么使用protobuf。
2.2 protobuf使用
首先需要下载protobuf客户端,直接在github仓库下载即可:https://github.com/protocolbuffers/protobuf/releases/tag/v3.20.1
下载完后,我们看看使用方式,示例命令如下:
./protoc --java_out=生成java文件的目录 protobuf文件路径
可以看到使用protoc命令,需要传两个命令,一个是生成java文件的目录(或者cpp文件路径),一个是proto文件的路径。
也就是说,如果使用protbuf进行通信,发送方和接收方必须定义一个protobuf的彼此公用认同的消息模板(类似于实体类),然后双方都要基于这个消息模板(proto模板文件)去生成自己语言的模板。即:
- A端根据proto模板生成c++实体类文件
- A端写入内容到这个实体类,序列化实体类(二进制),然后发送给B端
- B端接收到二进制后,先根据proto模板生成对应的java实体类文件,
- 然后反序列化二进制。
从上述的流程,可以看到消息是通过二进制传输的,速度肯定很快,因为共同定义了protobuf模板,使得不受平台的限制,做到跨语言通信。
那么,有什么缺陷呢?
就是太依赖protobuf程序了,只有使用这个程序才可以生成对应平台语言的实体类文件。目前是无法解决的,protobuf也没有提供工具类去生成(比如:使用工具类把proto消息模板文件转换成java实体类文件)
解决方案就只有把protobuf执行程序顺带打进我们的jar包了。
03 Flink使用protobuf
我们可以把protobuf打成一个format,其主流思路与自定义connector思路大致一样,之前写过博客,可以参考:
- 《Flink自定义Connector》
- 《flink kafka connector源码解读(超详细)》
原理图如下:
3.1 Flink如何使用protobuf
自定义format
的流程不是难点,难点是如何合理优雅的嵌入protobuf
程序到format
?
我们希望连接参数传入proto消息模板,然后直接调用这个format
就可以使用了,如下:
CREATE TABLE table_sink_kafka ( name STRING, id INT ) WITH ( 'connector' = 'kafka', 'format' = 'protobuf', 'protobuf.class-name' = 'org.apache.flink.pb.proto.SimpleTest', 'protobuf.protobuf-tpl' = 'syntax = "proto2"; package org.apache.flink.pb.other; option java_package = "org.apache.flink.pb.proto"; option java_multiple_files = false; message SimpleTest { optional string name = 1; optional int32 id = 2; optional Data data = 3; } message Data { optional int32 uid = 1; optional string username = 2; } ', 'scan.startup.mode' = 'latest-offset' )
其实是可以做到的,说说实现思路。
3.2 Flink实现protobuf思路
思路如下:
- 首先format集成protobuf程序进jar包;
- 根据传入的模板消息定义,当前目录生成proto文件;
- 调用protobuf程序去生成对应的java实体类文件到当前目录;
- 使用类加载器去加载对应的实体类文件并编译加载进入jvm;
- 后续就可以使用这个java实体类对象了。
主要有几个痛点:
- Java如何使用shell命令生成java文件?
- 生成的java文件如何类加载进jvm?
下面我提供下这两个工具类:
shell工具类:
/** * shell工具类 * * @author : YangLinWei * @createTime: 2022/5/13 2:09 下午 * @version: 1.0.0 */ @Slf4j public class ShellUtil { /** * @see #runShellCommandSync(String, String[], Charset, String) */ public static int runShellCommandSync(String baseShellDir, String[] cmd, Charset outputCharset) throws IOException { return runShellCommandSync(baseShellDir, cmd, outputCharset, null); } /** * 真正运行shell命令 * * @param baseShellDir 运行命令所在目录(先切换到该目录后再运行命令) * @param cmd 命令数组 * @param outputCharset 日志输出字符集,一般windows为GBK, linux为utf8 * @param logFilePath 日志输出文件路径, 为空则直接输出到当前应用日志中,否则写入该文件 * @return 进程退出码, 0: 成功, 其他:失败 * @throws IOException 执行异常时抛出 */ public static int runShellCommandSync(String baseShellDir, String[] cmd, Charset outputCharset, String logFilePath) throws IOException { long startTime = System.currentTimeMillis(); boolean needReadProcessOutLogStreamByHand = false; log.info("【cli】receive new Command. baseDir: {}, cmd: {}, logFile:{}", baseShellDir, String.join(" ", cmd), logFilePath); ProcessBuilder pb = new ProcessBuilder(cmd); pb.directory(new File(baseShellDir)); initErrorLogHolder(logFilePath, outputCharset); int exitCode = 0; try { if (logFilePath != null) { ensureFilePathExists(logFilePath); // String redirectLogInfoAndErrCmd = " > " + logFilePath + " 2>&1 "; // cmd = mergeTwoArr(cmd, redirectLogInfoAndErrCmd.split("\\s+")); pb.redirectErrorStream(true); pb.redirectOutput(new File(logFilePath)); needReadProcessOutLogStreamByHand = false; } Process p = pb.start(); if (needReadProcessOutLogStreamByHand) { readProcessOutLogStream(p, outputCharset); } try { p.waitFor(); } catch (InterruptedException e) { log.error("进程被中断", e); setProcessLastError("中断异常:" + e.getMessage()); } finally { exitCode = p.exitValue(); log.info("【cli】process costTime:{}ms, exitCode:{}", System.currentTimeMillis() - startTime, exitCode); } if (exitCode != 0) { throw new RuntimeException( "进程返回异常信息, returnCode:" + exitCode + ", lastError:" + getProcessLastError()); } return exitCode; } finally { removeErrorLogHolder(); } } /** * 使用 Runtime.exec() 运行shell */ public static int runShellWithRuntime(String baseShellDir, String[] cmd, Charset outputCharset) throws IOException { long startTime = System.currentTimeMillis(); initErrorLogHolder(null, outputCharset); Process p = Runtime.getRuntime().exec(cmd, null, new File(baseShellDir)); //readProcessOutLogStream(p, outputCharset); int exitCode; try { p.waitFor(); } catch (InterruptedException e) { log.error("进程被中断", e); setProcessLastError("中断异常:" + e.getMessage()); } catch (Throwable e) { log.error("其他异常", e); setProcessLastError(e.getMessage()); } finally { exitCode = p.exitValue(); log.info("【cli】process costTime:{}ms, exitCode:{}", System.currentTimeMillis() - startTime, exitCode); } if (exitCode != 0) { throw new RuntimeException("进程返回异常信息, returnCode:" + exitCode + ", lastError:" + getProcessLastError()); } return exitCode; } /** * 确保文件夹存在 * * @param filePath 文件路径 * @throws IOException 创建文件夹异常抛出 */ public static void ensureFilePathExists(String filePath) throws IOException { File path = new File(filePath); if (path.exists()) { return; } File p = path.getParentFile(); if (p.mkdirs()) { log.info("为文件创建目录: {} 成功", p.getPath()); return; } log.warn("创建目录:{} 失败", p.getPath()); } /** * 合并两个数组数据 * * @param arrFirst 左边数组 * @param arrAppend 要添加的数组 * @return 合并后的数组 */ public static String[] mergeTwoArr(String[] arrFirst, String[] arrAppend) { String[] merged = new String[arrFirst.length + arrAppend.length]; System.arraycopy(arrFirst, 0, merged, 0, arrFirst.length); System.arraycopy(arrAppend, 0, merged, arrFirst.length, arrAppend.length); return merged; } /** * 删除以某字符结尾的字符 * * @param originalStr 原始字符 * @param toTrimChar 要检测的字 * @return 裁剪后的字符串 */ public static String trimEndsWith(String originalStr, char toTrimChar) { char[] value = originalStr.toCharArray(); int i = value.length - 1; while (i > 0 && value[i] == toTrimChar) { i--; } return new String(value, 0, i + 1); } /** * 错误日志读取线程池(不设上限) */ private static final ExecutorService errReadThreadPool = Executors.newCachedThreadPool( new NamedThreadFactory("ReadProcessErrOut")); /** * 最后一次异常信息 */ private static final Map<Thread, ProcessErrorLogDescriptor> lastErrorHolder = new ConcurrentHashMap<>(); /** * 主动读取进程的标准输出信息日志 * * @param process 进程实体 * @param outputCharset 日志字符集 * @throws IOException 读取异常时抛出 */ private static void readProcessOutLogStream(Process process, Charset outputCharset) throws IOException { try (BufferedReader stdInput = new BufferedReader(new InputStreamReader( process.getInputStream(), outputCharset))) { Thread parentThread = Thread.currentThread(); // 另起一个线程读取错误消息,必须先启该线程 errReadThreadPool.submit(() -> { try { try (BufferedReader stdError = new BufferedReader( new InputStreamReader(process.getErrorStream(), outputCharset))) { String err; while ((err = stdError.readLine()) != null) { log.error("【cli】{}", err); setProcessLastError(parentThread, err); } } } catch (IOException e) { log.error("读取进程错误日志输出时发生了异常", e); setProcessLastError(parentThread, e.getMessage()); } }); // 外部线程读取标准输出消息 String stdOut; while ((stdOut = stdInput.readLine()) != null) { log.info("【cli】{}", stdOut); } } } /** * 新建一个进程错误信息容器 * * @param logFilePath 日志文件路径,如无则为 null */ private static void initErrorLogHolder(String logFilePath, Charset outputCharset) { lastErrorHolder.put(Thread.currentThread(), new ProcessErrorLogDescriptor(logFilePath, outputCharset)); } /** * 移除错误日志监听 */ private static void removeErrorLogHolder() { lastErrorHolder.remove(Thread.currentThread()); } /** * 获取进程的最后错误信息 * <p> * 注意: 该方法只会在父线程中调用 */ private static String getProcessLastError() { Thread thread = Thread.currentThread(); return lastErrorHolder.get(thread).getLastError(); } /** * 设置最后一个错误信息描述 * <p> * 使用当前线程或自定义 */ private static void setProcessLastError(String lastError) { lastErrorHolder.get(Thread.currentThread()).setLastError(lastError); } private static void setProcessLastError(Thread thread, String lastError) { lastErrorHolder.get(thread).setLastError(lastError); } /** * 判断当前系统是否是 windows */ public static boolean isWinOs() { return System.getProperty("os.name").toLowerCase() .startsWith("win"); } /** * 判断当前系统是否是 Mac */ public static boolean isMacOs() { return System.getProperty("os.name").toLowerCase() .startsWith("mac"); } public static String getSystemType() { return System.getProperty("os.name"); } /** * 进程错误信息描述封装类 */ private static class ProcessErrorLogDescriptor { /** * 错误信息记录文件 */ private String logFile; /** * 最后一行错误信息 */ private String lastError; private Charset charset; ProcessErrorLogDescriptor(String logFile, Charset outputCharset) { this.logFile = logFile; charset = outputCharset; } String getLastError() { if (lastError != null) { return lastError; } try { if (logFile == null) { return null; } List<String> lines = FileUtils.readLines( new File(logFile), charset); StringBuilder sb = new StringBuilder(); for (int i = lines.size() - 1; i >= 0; i--) { sb.insert(0, lines.get(i) + "\n"); if (sb.length() > 200) { break; } } return sb.toString(); } catch (Exception e) { log.error("【cli】读取最后一次错误信息失败", e); } return null; } void setLastError(String err) { if (lastError == null) { lastError = err; return; } lastError = lastError + "\n" + err; if (lastError.length() > 200) { lastError = lastError.substring(lastError.length() - 200); } } } public static void main(String[] args) throws IOException, ClassNotFoundException { String messageClassName = "org.apache.flink.pb.proto.SimpleTestOuterClass"; String meesagePbTpl = "syntax = \"proto2\";\n" + "package org.apache.flink.pb.other;\n" + "option java_package = \"org.apache.flink.pb.proto\";\n" + "option java_multiple_files = false;\n" + "\n" + "message SimpleTest {\n" + " optional int32 a = 1;\n" + " optional int64 b = 2;\n" + " optional bool c = 3;\n" + " optional float d = 4;\n" + " optional double e = 5;\n" + " optional string f = 6;\n" + " optional bytes g = 7;\n" + " optional Corpus h = 8;\n" + "\n" + " enum Corpus {\n" + " UNIVERSAL = 0;\n" + " WEB = 1;\n" + " IMAGES = 2;\n" + " LOCAL = 3;\n" + " NEWS = 4;\n" + " PRODUCTS = 5;\n" + " VIDEO = 7;\n" + " }\n" + "\n" + "}\n" + "\n" + "\n"; String resPath = ShellUtil.class.getClassLoader().getResource("").getPath(); String baseDir = resPath + "/temp"; String protoFileName = StrUtil.humpToUnderline(StrUtil.getSplitLast(messageClassName, "\\.")) + ".proto"; String fileOutPath = String.format("%s/%s" , baseDir , StringUtils.join(StrUtil.splitExpectLast(messageClassName, "\\."), "/")); String protoFilePath = String.format("%s/%s", fileOutPath, protoFileName); FileUtils.write(new File(protoFilePath), meesagePbTpl, Charset.forName("UTF-8")); String[] protocCmd = new String[]{ "protoc", "--java_out=" + baseDir, "--proto_path=" + fileOutPath, protoFileName }; String exePath = resPath + "/protobuf/linux/bin"; if (isMacOs()) { exePath = resPath + "/protobuf/osx/bin"; } //InputStream protocIS = ShellUtil.class.getClassLoader().getResourceAsStream("/protobuf/osx/bin"); int exitCode = runShellCommandSync(exePath, protocCmd, Charset.forName("UTF-8")); Class<?> pbEntity = JarLoader.compile(messageClassName, FileUtils.readFileToString( new File(fileOutPath + "/" + StrUtil.getSplitLast(messageClassName, "\\.") + ".java"), Charset.forName("UTF-8")) ); System.out.println("-->" + pbEntity); } }
类加载器代码:
/** * 类加载器 * * @author : YangLinWei * @createTime: 2022/5/13 3:33 下午 * @version: 1.0.0 */ public class JarLoader extends URLClassLoader { public JarLoader(String[] paths) { this(paths, JarLoader.class.getClassLoader()); } public JarLoader(String[] paths, ClassLoader parent) { super(getURLs(paths), parent); } private static URL[] getURLs(String[] paths) { Validate.isTrue(null != paths && 0 != paths.length, "jar包路径不能为空."); List<String> dirs = new ArrayList<String>(); for (String path : paths) { dirs.add(path); JarLoader.collectDirs(path, dirs); } List<URL> urls = new ArrayList<URL>(); for (String path : dirs) { urls.addAll(doGetURLs(path)); } return urls.toArray(new URL[0]); } private static void collectDirs(String path, List<String> collector) { if (null == path || StringUtils.isBlank(path)) { return; } File current = new File(path); if (!current.exists() || !current.isDirectory()) { return; } for (File child : current.listFiles()) { if (!child.isDirectory()) { continue; } collector.add(child.getAbsolutePath()); collectDirs(child.getAbsolutePath(), collector); } } private static List<URL> doGetURLs(final String path) { Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空."); File jarPath = new File(path); Validate.isTrue(jarPath.exists() && jarPath.isDirectory(), "jar包路径必须存在且为目录."); /* set filter */ FileFilter jarFilter = new FileFilter() { @Override public boolean accept(File pathname) { return pathname.getName().endsWith(".jar"); } }; /* iterate all jar */ File[] allJars = new File(path).listFiles(jarFilter); List<URL> jarURLs = new ArrayList<URL>(allJars.length); for (int i = 0; i < allJars.length; i++) { try { jarURLs.add(allJars[i].toURI().toURL()); } catch (Exception e) { throw new RuntimeException("系统加载jar包出错", e); } } return jarURLs; } /** * 装载字符串成为java可执行文件 * * @param className className * @param javaCodes javaCodes * @return Class */ public static Class<?> compile(String className, String javaCodes) { JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null); StrSrcJavaObject srcObject = new StrSrcJavaObject(className, javaCodes); Iterable<? extends JavaFileObject> fileObjects = Arrays.asList(srcObject); String flag = "-d"; String outDir = ""; try { File classPath = new File(Thread.currentThread().getContextClassLoader().getResource("").toURI()); outDir = classPath.getAbsolutePath() + File.separator; } catch (URISyntaxException e1) { e1.printStackTrace(); } Iterable<String> options = Arrays.asList(flag, outDir); JavaCompiler.CompilationTask task = compiler.getTask(null, fileManager, null, options, null, fileObjects); boolean result = task.call(); if (result == true) { try { return Class.forName(className); } catch (ClassNotFoundException e) { e.printStackTrace(); } } return null; } private static class StrSrcJavaObject extends SimpleJavaFileObject { private String content; public StrSrcJavaObject(String name, String content) { super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE); this.content = content; } public CharSequence getCharContent(boolean ignoreEncodingErrors) { return content; } } }
ok,以上两个工具类解决了我们以上痛点了。
3.3 可能会出现的问题
当我们正式部署上服务器可能会出现一些问题:
- 为何读取不了jar包里面的程序,特别是Flink on Yarn的模式;
- 为何加载后的类会丢失(Flink也有自己的类加载器FlinkUserClassLoader)
这些问题大家可以私聊我,本人已经实现了Flink使用protobuf 格式的插件了,已上线使用。
04 文末
本文主要讲解了Protobuf的一些概念,以及如何在Flink里使用Protobuf,希望能帮助到大家,谢谢大家的阅读!