Kafka实战:从RDBMS到Hadoop,七步实现实时传输

简介:

本文是关于Flume成功应用Kafka的研究案例,深入剖析它是如何将RDBMS实时数据流导入到HDFS的Hive表中。

对于那些想要把数据快速摄取到Hadoop中的企业来讲,Kafka是一个很好的选择。Kafka是什么?Kafka是一个分布式、可伸缩、可信赖的消息传递系统,利用发布-订阅模型来集成应用程序/数据流。同时,Kafka还是Hadoop技术堆栈中的关键组件,能够很好地支持实时数据分析或者货币化的物联网数据。

本文服务于技术人群。下面就图解Kafka是如何把数据流从RDBMS(关系数据库管理系统)导入Hive,同时借助一个实时分析用例加以说明。作为参考,本文中使用的组件版本分别为Hive 1.2.1,Flume 1.6 以及 Kafka 0.9。

Kafka所在位置:解决方案的整体结构

下图显示了解决方案的整体结构: Kafka 和 Flume 的结合,再加上Hive的交易功能,RDBMS的交易数据被成功传递到目标 Hive 表中。

Hadoop

七步实现Hadoop实时数据导入

现在让我们深入方案细节,并展示如何在几个步骤内将数据流导入Hadoop。

1.从RDBMS中提取数据

所有关系型数据库都有一个日志文件,用来记录最新的交易。解决方案的第一步就是获取这些交易数据,同时要确保这些数据格式是可以被Hadoop所接受的。

2.设置Kafka生产商

发布Kafka话题消息的过程称为“生产商”。“话题”里有各种Kafka所需要维护的信息类别,RDBMS数据也会被转换成Kafka话题。对于这个示例,要求设置一个服务于整个销售团队的数据库,且该数据库中的交易数据均以Kafka话题形式发布。以下步骤都需要设置Kafka 生产商:


 
 
  1. $cd /usr/hdp/2.4.0.0-169/kafka 
  2.  
  3. $bin/kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions 
  4.  
  5. Created topic "SalesDBTransactions". 
  6.  
  7. $bin/kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181 
  8.  
  9. SalesDBTransactions 

3.设置Hive

接下来将创建一个Hive表,准备接收销售团队的数据库交易数据。这个例子中,我们将创建一个用户数据表:


 
 
  1. [bedrock@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive 
  2.  
  3. 0: jdbc:hive2://> use raj; 
  4.  
  5. create table customers (id string, name string, email string, street_address string, company string) 
  6.  
  7. partitioned by (time string) 
  8.  
  9. clustered by (id) into 5 buckets stored as orc 
  10.  
  11. location '/user/bedrock/salescust' 
  12.  
  13. TBLPROPERTIES ('transactional'='true'); 

为了确保Hive能够有效处理交易数据,以下设置要求在Hive配置中进行:


 
 
  1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 

4.为Kafka到Hive的数据流设置Flume代理

现在来看下如何创建一个Flume代理,用于收集Kafka话题资料并向Hive表发送数据。

在启用Flume代理前,要通过这几个步骤设置运行环境:


 
 
  1. $ pwd 
  2.  
  3. /home/bedrock/streamingdemo 
  4.  
  5. $ mkdir flume/checkpoint 
  6.  
  7. $ mkdir flume/data 
  8.  
  9. $ chmod 777 -R flume 
  10.  
  11. $ export HIVE_HOME=/usr/hdp/current/hive-server2 
  12.  
  13. $ export HCAT_HOME=/usr/hdp/current/hive-webhcat 
  14.  
  15. $ pwd 
  16.  
  17. /home/bedrock/streamingdemo/flume 
  18.  
  19. $ mkdir logs 

再如下所示创建一个log4j属性文件:


 
 
  1. [bedrock@sandbox conf]$ vi log4j.properties 
  2.  
  3. flume.root.logger=INFO,LOGFILE 
  4.  
  5. flume.log.dir=/home/bedrock/streamingdemo/flume/logs 
  6.  
  7. flumeflume.log.file=flume.log 

然后为Flume代理配置以下文件:


 
 
  1. $ vi flumetohive.conf 
  2.  
  3. flumeagent1.sources = source_from_kafka 
  4.  
  5. flumeagent1.channels = mem_channel 
  6.  
  7. flumeagent1.sinks = hive_sink 
  8.  
  9. # Define / Configure source 
  10.  
  11. flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource 
  12.  
  13. flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181 
  14.  
  15. flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions 
  16.  
  17. flumeflumeagent1.sources.source_from_kafka.groupID = flume 
  18.  
  19. flumeagent1.sources.source_from_kafka.channels = mem_channel 
  20.  
  21. flumeagent1.sources.source_from_kafka.interceptors = i1 
  22.  
  23. flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp 
  24.  
  25. flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000 
  26.  
  27. # Hive Sink 
  28.  
  29. flumeagent1.sinks.hive_sink.type = hive 
  30.  
  31. flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083 
  32.  
  33. flumeagent1.sinks.hive_sink.hive.database = raj 
  34.  
  35. flumeagent1.sinks.hive_sink.hive.table = customers 
  36.  
  37. flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2 
  38.  
  39. flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M 
  40.  
  41. flumeagent1.sinks.hive_sink.batchSize = 10 
  42.  
  43. flumeagent1.sinks.hive_sink.serializer = DELIMITED 
  44.  
  45. flumeagent1.sinks.hive_sink.serializer.delimiter = , 
  46.  
  47. flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company 
  48.  
  49. # Use a channel which buffers events in memory 
  50.  
  51. flumeagent1.channels.mem_channel.type = memory 
  52.  
  53. flumeagent1.channels.mem_channel.capacity = 10000 
  54.  
  55. flumeagent1.channels.mem_channel.transactionCapacity = 100 
  56.  
  57. # Bind the source and sink to the channel 
  58.  
  59. flumeagent1.sources.source_from_kafka.channels = mem_channel 
  60.  
  61. flumeagent1.sinks.hive_sink.channel = mem_channel 

5.启用Flume代理

通过以下指令启用Flume代理:


 
 
  1. $ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf 

6.启用Kafka流

作为示例下面是一个模拟交易的消息集,这在实际系统中需要通过源数据库才能生成。例如,以下可能来自Oracle流,在回放被提交到数据库的SQL交易数据,也可能来自GoldenGate。


 
 
  1. $ cd /usr/hdp/2.4.0.0-169/kafka 
  2.  
  3. $ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions 
  4.  
  5. 1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company" 
  6.  
  7. 2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated" 
  8.  
  9. 3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP" 
  10.  
  11. 4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc." 
  12.  
  13. 5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd" 
  14.  
  15. 6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC" 
  16.  
  17. 7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute" 
  18.  
  19. 8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation" 
  20.  
  21. 9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated" 
  22.  
  23. 10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates" 
  24.  
  25. 11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation" 

7.接收Hive数据

如果上面所有的步骤都完成了,那么现在就可以从Kafka发送数据,可以看到数据流在几秒钟内就会被发送到Hive表。


本文作者:佚名

来源:51CTO

相关文章
|
1月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
79 5
|
6月前
|
存储 分布式计算 Hadoop
Hadoop Distributed File System (HDFS): 概念、功能点及实战
【6月更文挑战第12天】Hadoop Distributed File System (HDFS) 是 Hadoop 生态系统中的核心组件之一。它设计用于在大规模集群环境中存储和管理海量数据,提供高吞吐量的数据访问和容错能力。
684 4
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
47 3
|
2月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
46 3
|
2月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
41 2
|
3月前
|
分布式计算 Hadoop Devops
Hadoop集群配置https实战案例
本文提供了一个实战案例,详细介绍了如何在Hadoop集群中配置HTTPS,包括生成私钥和证书文件、配置keystore和truststore、修改hdfs-site.xml和ssl-client.xml文件,以及重启Hadoop集群的步骤,并提供了一些常见问题的故障排除方法。
88 3
Hadoop集群配置https实战案例
|
3月前
|
分布式计算 监控 Hadoop
监控Hadoop集群实战篇
介绍了监控Hadoop集群的方法,包括监控Linux服务器、Hadoop指标、使用Ganglia监控Hadoop集群、Hadoop日志记录、通过Hadoop的Web UI进行监控以及其他Hadoop组件的监控,并提供了相关监控工具和资源的推荐阅读链接。
97 2
|
4月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
104 4
|
4月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
85 8