flink实现protobuf format(超详细)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: flink实现protobuf format(超详细)

01 引言

Flink项目目前是没有实现protobuf格式的,目前Flink相关的开发者正在开发,详情可以参考:https://github.com/maosuhan/flink/tree/feature/flink-pb-format

本文来讲讲如何实现Flinkprotobuf

02 protobuf概述

概念protobuf (protocol buffer) 是谷歌内部的混合语言数据标准。通过将结构化的数据进行序列化(串行化),用于通讯协议、数据存储等领域和语言无关、平台无关、可扩展的序列化结构数据格式。

protobuf github 地址https://github.com/protocolbuffers/protobuf

从上图可以看到protobuf很火,因此很有必要去学习它。

2.1 protobuf优缺点

既然知道protbuf很火,那它有什么优势呢?简单概括如下:

性能方面

  • 序列化后,数据大小可缩小3倍
  • 序列化速度快
  • 传输速度快

使用方面

  • 使用简单:proto编译器自动进行序列化和反序列化
  • 维护成本低:多平台只需要维护一套对象协议文件,即.proto文件
  • 可扩展性好:不必破坏旧的数据格式,就能对数据结构进行更新
  • 加密性好:http传输内容抓包只能抓到字节数据

使用范围

  • 跨平台、跨语言、可扩展性强

既然有优点,那么是否有缺点呢?当然有的,先来看看怎么使用protobuf

2.2 protobuf使用

首先需要下载protobuf客户端,直接在github仓库下载即可:https://github.com/protocolbuffers/protobuf/releases/tag/v3.20.1

下载完后,我们看看使用方式,示例命令如下:

./protoc  --java_out=生成java文件的目录 protobuf文件路径
• 1

可以看到使用protoc命令,需要传两个命令,一个是生成java文件的目录(或者cpp文件路径),一个是proto文件的路径。

也就是说,如果使用protbuf进行通信,发送方和接收方必须定义一个protobuf的彼此公用认同的消息模板(类似于实体类),然后双方都要基于这个消息模板(proto模板文件)去生成自己语言的模板。即:

  • A端根据proto模板生成c++实体类文件
  • A端写入内容到这个实体类,序列化实体类(二进制),然后发送给B端
  • B端接收到二进制后,先根据proto模板生成对应的java实体类文件,
  • 然后反序列化二进制。

从上述的流程,可以看到消息是通过二进制传输的,速度肯定很快,因为共同定义了protobuf模板,使得不受平台的限制,做到跨语言通信。

那么,有什么缺陷呢?

就是太依赖protobuf程序了,只有使用这个程序才可以生成对应平台语言的实体类文件目前是无法解决的,protobuf也没有提供工具类去生成(比如:使用工具类把proto消息模板文件转换成java实体类文件)

解决方案就只有把protobuf执行程序顺带打进我们的jar包了。

03 Flink使用protobuf

我们可以把protobuf打成一个format,其主流思路与自定义connector思路大致一样,之前写过博客,可以参考:

原理图如下:

3.1 Flink如何使用protobuf

自定义format的流程不是难点,难点是如何合理优雅的嵌入protobuf程序到format

我们希望连接参数传入proto消息模板,然后直接调用这个format就可以使用了,如下:

CREATE TABLE table_sink_kafka (
      name STRING,
        id INT
)
WITH (
    'connector' = 'kafka',
    'format' = 'protobuf',
    'protobuf.class-name' = 'org.apache.flink.pb.proto.SimpleTest',
    'protobuf.protobuf-tpl' = 'syntax = "proto2";
    package org.apache.flink.pb.other;
    option java_package = "org.apache.flink.pb.proto";
    option java_multiple_files = false;
    message SimpleTest {
        optional string name = 1;
        optional int32 id = 2;
        optional Data data = 3;
    }
    message Data {
               optional int32 uid = 1;
               optional string username = 2;
           }
  ',
    'scan.startup.mode' = 'latest-offset'
)

其实是可以做到的,说说实现思路。

3.2 Flink实现protobuf思路

思路如下:

  • 首先format集成protobuf程序进jar包;
  • 根据传入的模板消息定义,当前目录生成proto文件;
  • 调用protobuf程序去生成对应的java实体类文件到当前目录;
  • 使用类加载器去加载对应的实体类文件并编译加载进入jvm
  • 后续就可以使用这个java实体类对象了。

主要有几个痛点:

  • Java如何使用shell命令生成java文件?
  • 生成的java文件如何类加载进jvm

下面我提供下这两个工具类:

shell工具类:

/**
 * shell工具类
 *
 * @author : YangLinWei
 * @createTime: 2022/5/13 2:09 下午
 * @version: 1.0.0
 */
@Slf4j
public class ShellUtil {
    /**
     * @see #runShellCommandSync(String, String[], Charset, String)
     */
    public static int runShellCommandSync(String baseShellDir, String[] cmd,
                                          Charset outputCharset) throws IOException {
        return runShellCommandSync(baseShellDir, cmd, outputCharset, null);
    }
    /**
     * 真正运行shell命令
     *
     * @param baseShellDir  运行命令所在目录(先切换到该目录后再运行命令)
     * @param cmd           命令数组
     * @param outputCharset 日志输出字符集,一般windows为GBK, linux为utf8
     * @param logFilePath   日志输出文件路径, 为空则直接输出到当前应用日志中,否则写入该文件
     * @return 进程退出码, 0: 成功, 其他:失败
     * @throws IOException 执行异常时抛出
     */
    public static int runShellCommandSync(String baseShellDir, String[] cmd,
                                          Charset outputCharset, String logFilePath)
            throws IOException {
        long startTime = System.currentTimeMillis();
        boolean needReadProcessOutLogStreamByHand = false;
        log.info("【cli】receive new Command. baseDir: {}, cmd: {}, logFile:{}",
                baseShellDir, String.join(" ", cmd), logFilePath);
        ProcessBuilder pb = new ProcessBuilder(cmd);
        pb.directory(new File(baseShellDir));
        initErrorLogHolder(logFilePath, outputCharset);
        int exitCode = 0;
        try {
            if (logFilePath != null) {
                ensureFilePathExists(logFilePath);
//            String redirectLogInfoAndErrCmd = " > " + logFilePath + " 2>&1 ";
//            cmd = mergeTwoArr(cmd, redirectLogInfoAndErrCmd.split("\\s+"));
                pb.redirectErrorStream(true);
                pb.redirectOutput(new File(logFilePath));
                needReadProcessOutLogStreamByHand = false;
            }
            Process p = pb.start();
            if (needReadProcessOutLogStreamByHand) {
                readProcessOutLogStream(p, outputCharset);
            }
            try {
                p.waitFor();
            } catch (InterruptedException e) {
                log.error("进程被中断", e);
                setProcessLastError("中断异常:" + e.getMessage());
            } finally {
                exitCode = p.exitValue();
                log.info("【cli】process costTime:{}ms, exitCode:{}",
                        System.currentTimeMillis() - startTime, exitCode);
            }
            if (exitCode != 0) {
                throw new RuntimeException(
                        "进程返回异常信息, returnCode:" + exitCode
                                + ", lastError:" + getProcessLastError());
            }
            return exitCode;
        } finally {
            removeErrorLogHolder();
        }
    }
    /**
     * 使用 Runtime.exec() 运行shell
     */
    public static int runShellWithRuntime(String baseShellDir,
                                          String[] cmd,
                                          Charset outputCharset) throws IOException {
        long startTime = System.currentTimeMillis();
        initErrorLogHolder(null, outputCharset);
        Process p = Runtime.getRuntime().exec(cmd, null, new File(baseShellDir));
        //readProcessOutLogStream(p, outputCharset);
        int exitCode;
        try {
            p.waitFor();
        } catch (InterruptedException e) {
            log.error("进程被中断", e);
            setProcessLastError("中断异常:" + e.getMessage());
        } catch (Throwable e) {
            log.error("其他异常", e);
            setProcessLastError(e.getMessage());
        } finally {
            exitCode = p.exitValue();
            log.info("【cli】process costTime:{}ms, exitCode:{}",
                    System.currentTimeMillis() - startTime, exitCode);
        }
        if (exitCode != 0) {
            throw new RuntimeException("进程返回异常信息, returnCode:" + exitCode
                    + ", lastError:" + getProcessLastError());
        }
        return exitCode;
    }
    /**
     * 确保文件夹存在
     *
     * @param filePath 文件路径
     * @throws IOException 创建文件夹异常抛出
     */
    public static void ensureFilePathExists(String filePath) throws IOException {
        File path = new File(filePath);
        if (path.exists()) {
            return;
        }
        File p = path.getParentFile();
        if (p.mkdirs()) {
            log.info("为文件创建目录: {} 成功", p.getPath());
            return;
        }
        log.warn("创建目录:{} 失败", p.getPath());
    }
    /**
     * 合并两个数组数据
     *
     * @param arrFirst  左边数组
     * @param arrAppend 要添加的数组
     * @return 合并后的数组
     */
    public static String[] mergeTwoArr(String[] arrFirst, String[] arrAppend) {
        String[] merged = new String[arrFirst.length + arrAppend.length];
        System.arraycopy(arrFirst, 0,
                merged, 0, arrFirst.length);
        System.arraycopy(arrAppend, 0,
                merged, arrFirst.length, arrAppend.length);
        return merged;
    }
    /**
     * 删除以某字符结尾的字符
     *
     * @param originalStr 原始字符
     * @param toTrimChar  要检测的字
     * @return 裁剪后的字符串
     */
    public static String trimEndsWith(String originalStr, char toTrimChar) {
        char[] value = originalStr.toCharArray();
        int i = value.length - 1;
        while (i > 0 && value[i] == toTrimChar) {
            i--;
        }
        return new String(value, 0, i + 1);
    }
    /**
     * 错误日志读取线程池(不设上限)
     */
    private static final ExecutorService errReadThreadPool = Executors.newCachedThreadPool(
            new NamedThreadFactory("ReadProcessErrOut"));
    /**
     * 最后一次异常信息
     */
    private static final Map<Thread, ProcessErrorLogDescriptor>
            lastErrorHolder = new ConcurrentHashMap<>();
    /**
     * 主动读取进程的标准输出信息日志
     *
     * @param process       进程实体
     * @param outputCharset 日志字符集
     * @throws IOException 读取异常时抛出
     */
    private static void readProcessOutLogStream(Process process,
                                                Charset outputCharset) throws IOException {
        try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(
                process.getInputStream(), outputCharset))) {
            Thread parentThread = Thread.currentThread();
            // 另起一个线程读取错误消息,必须先启该线程
            errReadThreadPool.submit(() -> {
                try {
                    try (BufferedReader stdError = new BufferedReader(
                            new InputStreamReader(process.getErrorStream(), outputCharset))) {
                        String err;
                        while ((err = stdError.readLine()) != null) {
                            log.error("【cli】{}", err);
                            setProcessLastError(parentThread, err);
                        }
                    }
                } catch (IOException e) {
                    log.error("读取进程错误日志输出时发生了异常", e);
                    setProcessLastError(parentThread, e.getMessage());
                }
            });
            // 外部线程读取标准输出消息
            String stdOut;
            while ((stdOut = stdInput.readLine()) != null) {
                log.info("【cli】{}", stdOut);
            }
        }
    }
    /**
     * 新建一个进程错误信息容器
     *
     * @param logFilePath 日志文件路径,如无则为 null
     */
    private static void initErrorLogHolder(String logFilePath, Charset outputCharset) {
        lastErrorHolder.put(Thread.currentThread(),
                new ProcessErrorLogDescriptor(logFilePath, outputCharset));
    }
    /**
     * 移除错误日志监听
     */
    private static void removeErrorLogHolder() {
        lastErrorHolder.remove(Thread.currentThread());
    }
    /**
     * 获取进程的最后错误信息
     * <p>
     * 注意: 该方法只会在父线程中调用
     */
    private static String getProcessLastError() {
        Thread thread = Thread.currentThread();
        return lastErrorHolder.get(thread).getLastError();
    }
    /**
     * 设置最后一个错误信息描述
     * <p>
     * 使用当前线程或自定义
     */
    private static void setProcessLastError(String lastError) {
        lastErrorHolder.get(Thread.currentThread()).setLastError(lastError);
    }
    private static void setProcessLastError(Thread thread, String lastError) {
        lastErrorHolder.get(thread).setLastError(lastError);
    }
    /**
     * 判断当前系统是否是 windows
     */
    public static boolean isWinOs() {
        return System.getProperty("os.name").toLowerCase()
                .startsWith("win");
    }
    /**
     * 判断当前系统是否是 Mac
     */
    public static boolean isMacOs() {
        return System.getProperty("os.name").toLowerCase()
                .startsWith("mac");
    }
    public static String getSystemType() {
        return System.getProperty("os.name");
    }
    /**
     * 进程错误信息描述封装类
     */
    private static class ProcessErrorLogDescriptor {
        /**
         * 错误信息记录文件
         */
        private String logFile;
        /**
         * 最后一行错误信息
         */
        private String lastError;
        private Charset charset;
        ProcessErrorLogDescriptor(String logFile, Charset outputCharset) {
            this.logFile = logFile;
            charset = outputCharset;
        }
        String getLastError() {
            if (lastError != null) {
                return lastError;
            }
            try {
                if (logFile == null) {
                    return null;
                }
                List<String> lines = FileUtils.readLines(
                        new File(logFile), charset);
                StringBuilder sb = new StringBuilder();
                for (int i = lines.size() - 1; i >= 0; i--) {
                    sb.insert(0, lines.get(i) + "\n");
                    if (sb.length() > 200) {
                        break;
                    }
                }
                return sb.toString();
            } catch (Exception e) {
                log.error("【cli】读取最后一次错误信息失败", e);
            }
            return null;
        }
        void setLastError(String err) {
            if (lastError == null) {
                lastError = err;
                return;
            }
            lastError = lastError + "\n" + err;
            if (lastError.length() > 200) {
                lastError = lastError.substring(lastError.length() - 200);
            }
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        String messageClassName = "org.apache.flink.pb.proto.SimpleTestOuterClass";
        String meesagePbTpl = "syntax = \"proto2\";\n" +
                "package org.apache.flink.pb.other;\n" +
                "option java_package = \"org.apache.flink.pb.proto\";\n" +
                "option java_multiple_files = false;\n" +
                "\n" +
                "message SimpleTest {\n" +
                "    optional int32 a = 1;\n" +
                "    optional int64 b = 2;\n" +
                "    optional bool c = 3;\n" +
                "    optional float d = 4;\n" +
                "    optional double e = 5;\n" +
                "    optional string f = 6;\n" +
                "    optional bytes g = 7;\n" +
                "    optional Corpus h = 8;\n" +
                "\n" +
                "    enum Corpus {\n" +
                "        UNIVERSAL = 0;\n" +
                "        WEB = 1;\n" +
                "        IMAGES = 2;\n" +
                "        LOCAL = 3;\n" +
                "        NEWS = 4;\n" +
                "        PRODUCTS = 5;\n" +
                "        VIDEO = 7;\n" +
                "      }\n" +
                "\n" +
                "}\n" +
                "\n" +
                "\n";
        String resPath = ShellUtil.class.getClassLoader().getResource("").getPath();
        String baseDir = resPath + "/temp";
        String protoFileName = StrUtil.humpToUnderline(StrUtil.getSplitLast(messageClassName, "\\.")) + ".proto";
        String fileOutPath = String.format("%s/%s"
                , baseDir
                , StringUtils.join(StrUtil.splitExpectLast(messageClassName, "\\."), "/"));
        String protoFilePath = String.format("%s/%s", fileOutPath, protoFileName);
        FileUtils.write(new File(protoFilePath), meesagePbTpl, Charset.forName("UTF-8"));
        String[] protocCmd = new String[]{
                "protoc",
                "--java_out=" + baseDir,
                "--proto_path=" + fileOutPath,
                protoFileName
        };
        String exePath = resPath + "/protobuf/linux/bin";
        if (isMacOs()) {
            exePath = resPath + "/protobuf/osx/bin";
        }
        //InputStream protocIS = ShellUtil.class.getClassLoader().getResourceAsStream("/protobuf/osx/bin");
        int exitCode = runShellCommandSync(exePath, protocCmd, Charset.forName("UTF-8"));
        Class<?> pbEntity = JarLoader.compile(messageClassName, FileUtils.readFileToString(
                new File(fileOutPath + "/" + StrUtil.getSplitLast(messageClassName, "\\.") + ".java"), Charset.forName("UTF-8"))
        );
        System.out.println("-->" + pbEntity);
    }
}

类加载器代码:

/**
 * 类加载器
 *
 * @author : YangLinWei
 * @createTime: 2022/5/13 3:33 下午
 * @version: 1.0.0
 */
public class JarLoader extends URLClassLoader {
    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }
    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }
    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length,
                "jar包路径不能为空.");
        List<String> dirs = new ArrayList<String>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }
        List<URL> urls = new ArrayList<URL>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }
        return urls.toArray(new URL[0]);
    }
    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }
        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }
        for (File child : current.listFiles()) {
            if (!child.isDirectory()) {
                continue;
            }
            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }
    private static List<URL> doGetURLs(final String path) {
        Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");
        File jarPath = new File(path);
        Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
                "jar包路径必须存在且为目录.");
        /* set filter */
        FileFilter jarFilter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.getName().endsWith(".jar");
            }
        };
        /* iterate all jar */
        File[] allJars = new File(path).listFiles(jarFilter);
        List<URL> jarURLs = new ArrayList<URL>(allJars.length);
        for (int i = 0; i < allJars.length; i++) {
            try {
                jarURLs.add(allJars[i].toURI().toURL());
            } catch (Exception e) {
                throw new RuntimeException("系统加载jar包出错", e);
            }
        }
        return jarURLs;
    }
    /**
     * 装载字符串成为java可执行文件
     *
     * @param className className
     * @param javaCodes javaCodes
     * @return Class
     */
    public static Class<?> compile(String className, String javaCodes) {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
        StrSrcJavaObject srcObject = new StrSrcJavaObject(className, javaCodes);
        Iterable<? extends JavaFileObject> fileObjects = Arrays.asList(srcObject);
        String flag = "-d";
        String outDir = "";
        try {
            File classPath = new File(Thread.currentThread().getContextClassLoader().getResource("").toURI());
            outDir = classPath.getAbsolutePath() + File.separator;
        } catch (URISyntaxException e1) {
            e1.printStackTrace();
        }
        Iterable<String> options = Arrays.asList(flag, outDir);
        JavaCompiler.CompilationTask task = compiler.getTask(null, fileManager, null, options, null, fileObjects);
        boolean result = task.call();
        if (result == true) {
            try {
                return Class.forName(className);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
    private static class StrSrcJavaObject extends SimpleJavaFileObject {
        private String content;
        public StrSrcJavaObject(String name, String content) {
            super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE);
            this.content = content;
        }
        public CharSequence getCharContent(boolean ignoreEncodingErrors) {
            return content;
        }
    }
}

ok,以上两个工具类解决了我们以上痛点了。

3.3 可能会出现的问题

当我们正式部署上服务器可能会出现一些问题:

  • 为何读取不了jar包里面的程序,特别是Flink on Yarn的模式;
  • 为何加载后的类会丢失(Flink也有自己的类加载器FlinkUserClassLoader

这些问题大家可以私聊我,本人已经实现了Flink使用protobuf 格式的插件了,已上线使用。

04 文末

本文主要讲解了Protobuf的一些概念,以及如何在Flink里使用Protobuf,希望能帮助到大家,谢谢大家的阅读!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
147 1
|
8月前
|
消息中间件 关系型数据库 网络安全
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
256 1
|
8月前
|
消息中间件 SQL Java
阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)
阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)
1425 3
|
8月前
|
SQL 消息中间件 Java
Flink报错问题之使用debezium-json format报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
8月前
|
SQL Java 关系型数据库
flink问题之使用debezium-json format报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
170 0
|
SQL 消息中间件 canal
Flink cdc自定义format格式数据源
变更数据捕获 (CDC) 已成为一种流行的模式,本文介绍如何通过自定义format来获取不同格式cdc数据源
Flink cdc自定义format格式数据源
|
SQL 消息中间件 JSON
Flink SQL JSON Format 源码解析
用 Flink SQL 解析 JSON 格式的数据是非常简单的,只需要在 DDL 语句中设置 Format 为 json 即可,像下面这样: CREATE TABLE kafka_source ( funcName STRING, data ROW<snapshots ARRAY<ROW<content_type STRING,url STRING>>,audio ARRAY<ROW<content_type STRING,url STRING>>>, resultMap ROW<`result` MAP<STRING,STRING>,isSuccess BOOLEAN
|
消息中间件 SQL canal
Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据
Flink 1.11 最重要的 Feature —— Hive Streaming 之前已经和大家分享过了,今天就和大家来聊一聊另一个特别重要的功能 —— CDC。
Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1467 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎