Flink on zeppelin 实时写入hive

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 概述随着Flink1.11.0版本的发布,一个很重要的特性就是支持了流数据直接写入到hive中,用户可以非常方便的用SQL的方式把kafka的数据直接写入到hive里面.这篇文章会给出Flink on zeppelin里面实现流式写入hive的简单示例以及遇到问题的解决方案

概述


随着Flink1.11.0版本的发布,一个很重要的特性就是支持了流数据直接写入到hive中,用户可以非常方便的用SQL的方式把kafka的数据直接写入到hive里面.这篇文章会给出Flink on zeppelin里面实现流式写入hive的简单示例以及遇到问题的解决方案


Streaming Writing


先来看一下官网对hive streaming writing的介绍.



streaming writing是基于Filesystem streaming sink实现了,当然也就支持端到端exactly-once语义了.从1.11开始,用户可以在 Table API/SQL 和 SQL Client 中使用 Hive 语法(HiveQL)来编写 SQL 语句。为了支持这一特性,Flink 引入了一种新的 SQL 方言,用户可以动态的为每一条语句选择使用Flink(default)或Hive(hive)方法。可以通过SET table.sql-dialect=hive;或者SET table.sql-dialect=default; 来设置.


添加依赖



这里还是要强调一下,用不到的jar包不要随便添加,很容易会造成jar包的冲突,报各种乱七八糟的错.只添加需要用的即可.


创建表


1,创建kafka的流表


%flink.ssql
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
    name VARCHAR COMMENT '姓名',
    age int COMMENT '年龄',
  city VARCHAR,
    borth VARCHAR,
    ts BIGINT  COMMENT '时间戳',
    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
    proctime as PROCTIME(),
    WATERMARK FOR t AS t - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'jason_flink',  -- kafka topic
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test',
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json'  -- 数据源格式为 json
);


2,创建hive表


在创建hive表之前需要先把方言设置成hive的,也就是先执行


SET table.sql-dialect=hive; 但是目前zeppelin应该是还不支持直接执SQL语句,还需要写scala的代码. 但是Flink的sql-client里面是可以直接执行的.


%flink
stenv.getConfig().setSqlDialect(SqlDialect.HIVE)


这里我就只创建了一个天级别的分区,分区提交策略使用的是默认的


%flink.ssql
DROP TABLE IF EXISTS fs_table;
CREATE TABLE fs_table (
  name STRING,
  age int,
  dt STRING
) PARTITIONED BY (dt STRING) STORED AS PARQUET TBLPROPERTIES (
  'sink.partition-commit.delay'='1s',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.rolling-policy.check-interval'='1min'
);


hive streaming sink还引入了新的机制:Partition commit。

一个合理的数仓的数据导入,它不止包含数据文件的写入,也包含了 Partition 的可见性提交。当某个 Partition 完成写入时,需要通知 Hive metastore 或者在文件夹内添加 SUCCESS 文件。Flink 1.11 的 Partition commit 机制可以让你:


Trigger:控制Partition提交的时机,可以根据Watermark加上从Partition中提取的时间来判断,也可以通过Processing time来判断。你可以控制:是想先尽快看到没写完的Partition;还是保证写完Partition之后,再让下游看到它。


Policy:提交策略,内置支持SUCCESS文件和Metastore的提交,你也可以扩展提交的实现,比如在提交阶段触发Hive的analysis来生成统计信息,或者进行小文件的合并等等。


这里还有几个参数需要说明一下:


sink.partition-commit.trigger:触发分区提交的时间特征。默认为 processing-time,即处理时间,但是在有延迟的情况下,可能会造成数据分区错乱。所以你可以使用 partition-time,即按照分区时间戳(使用partition中抽取时间,加上watermark决定partiton commit的时机 即分区内数据对应的事件时间)来提交。


partition.time-extractor.timestamp-pattern:分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。显然,Hive 表的分区字段值来自流表中定义好的事件时间,timestamp-pattern 会从你定义的partition 里面推断出完整的timestamp


sink.partition-commit.delay:触发分区提交的延迟。在时间特征设为 partition-time 的情况下,当水印时间戳大于分区创建时间加上此延迟时(当 watermark > partition时间 + 1小时,会commit这个partition),分区才会真正提交。此值最好与分区粒度相同,例如若 Hive 表按1小时分区,此参数可设为 1 h,若按 10 分钟分区,可设为 10 min。


sink.partition-commit.policy.kind:分区提交策略,可以理解为使分区对下游可见的附加操作。metastore 表示更新 Hive Metastore 中的表元数据, success-file 则表示在分区内创建 _SUCCESS 标记文件(先更新metastore(addPartition),再写SUCCESS文件)。只有创建了success文件标识,hive里面的数据才真正可见.


执行插入hive表SQL


%flink.ssql(type=update,parallelism=8)
INSERT INTO fs_table SELECT name, age, FROM_UNIXTIME(ts/1000,'yyyy-MM-dd') FROM kafka_table where name = 'jason';


然后来看下Flink的WEB UI上任务的DAG如下图所示



从UI上可以看到有数据进来了,然后来看一下HDFS对应的路径下是否有文件产生.



这里产生了success文件,就是上面说的分区提交策略,这个时候hive里面的数据就是可见的了.


为了保险起见我们再到hive里面查一下数据验证一下,看看能不能查到数据


hive> show tables;
OK
fs_table
kafka_table
Time taken: 1.648 seconds, Fetched: 2 row(s)
hive> select * from fs_table ;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
jason  1  2020-07-26
jason  1  2020-07-26
jason  1  2020-07-26
jason  1  2020-07-26
jason  1  2020-07-26
Time taken: 3.965 seconds, Fetched: 5 row(s)


数据是没有问题的,我往kafka里面写入了10条数据,但是上面的SQL里面只过滤了name是jason的,所以只有5条.


遇到问题总结


遇到的第一个问题是关于hive的metastore的问题.


Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris
  at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:171) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:157) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
  at flink.hive.FlinkHiveDemo$.main(FlinkHiveDemo.scala:40) ~[?:?]
  at flink.hive.FlinkHiveDemo.main(FlinkHiveDemo.scala) ~[?:?]
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111]
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  ... 13 more


这个报错是因为在Flink1.11.0版本中HiveCatalog已经不允许embedded模式了,所以需要我们自己启动一个独立的metastore server.


下面再简单的说一下hive metastore的配置.


hive server端的配置


<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://master:3306/hive?useSSL=false&amp;createDatabaseIfNotExist=true&amp;useUnicode=true&amp;characterEncoding=UTF-8</value>
        <description>JDBC connect string for a JDBC metastore</description>    
    </property>   
    <property> 
        <name>javax.jdo.option.ConnectionDriverName</name> 
        <value>com.mysql.jdbc.Driver</value> 
        <description>Driver class name for a JDBC metastore</description>     
    </property>
    <property> 
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>****</value>
        <description>username to use against metastore database</description>
    </property>
    <property>  
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>****</value>
        <description>password to use against metastore database</description>  
    </property>          
<property>
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
 </property>
<property>  
  <name>hive.metastore.warehouse.dir</name>  
  <value>/hive/warehouse</value>  
</property>  
<property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
</property>
</configuration>
hive client端的配置
<configuration>
<property>
  <name>hive.metastore.local</name>
  <value>false</value>
</property>
<property>  
  <name>hive.metastore.uris</name>  
  <value>thrift://master:9083</value>  
</property>  
</configuration>


配置完成后启动任务的时候可能还会遇到下面的报错


org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:760)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
  at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
  at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
  at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.zeppelin.interpreter.InterpreterException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
  at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355)
  at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366)
  at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:47)
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
  ... 8 more
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
  at org.apache.flink.table.catalog.hive.client.HiveShimV230.getHiveMetastoreClient(HiveShimV230.java:52)
  at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
  at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
  at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
  at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
  at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
  at org.apache.zeppelin.flink.FlinkScalaInterpreter.registerHiveCatalog(FlinkScalaInterpreter.scala:458)
  at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:133)
  at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:67)
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
  ... 12 more
Caused by: java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.table.catalog.hive.client.HiveShimV230.getHiveMetastoreClient(HiveShimV230.java:50)
  ... 22 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1708)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:89)
  ... 27 more
Caused by: java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1706)
  ... 30 more
Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
  at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:480)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:247)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1706)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:89)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.table.catalog.hive.client.HiveShimV230.getHiveMetastoreClient(HiveShimV230.java:50)
  at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
  at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
  at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
  at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:223)
  at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
  at org.apache.zeppelin.flink.FlinkScalaInterpreter.registerHiveCatalog(FlinkScalaInterpreter.scala:458)
  at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:133)
  at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:67)
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
  at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355)
  at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366)
  at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:47)
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:760)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
  at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
  at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
  at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:589)
  at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
  ... 37 more
)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:529)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:247)
  ... 35 more


这个报错是因为你虽然配置了hive的metastore,但是没有启动服务.在你配置的服务端执行hive --service metastore 启动就可以了.


这篇文章主要是介绍了在Flink on zeppelin里面流式的写入数据到hive里,以及中间遇到问题的解决方法,同样的你也可以在IDEA里面写代码或者用sql-client去提交这样的任务如果你没有装zeppelin的话.Flink还支持了流式读取hive表,这个暂时就先不介绍了.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
70 6
|
5月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
机器学习/深度学习 SQL 算法
Flink on Zeppelin (4) - 机器学习篇
Flink 在机器学习这个领域发力较晚,社区版没有一个完整的机器学习算法库可以用,Alink[1]是目前 Flink 生态圈相对比较完整的机器学习算法库,Alink 也在往 Flink 社区贡献的路上。今天我主要讲的就是如何在 Zeppelin 里使用 Alink。
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1449 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
174 56
|
10天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。