hadoop集群同步实现

简介: #!/usr/bin/env python#coding=utf-8#scribe日志接收存在小集群到大集群之间, distcp 同步失败的情况,需要手动进行补入。#1、如果查询补入的日志量少,则可以之间用脚本处理。

#!/usr/bin/env python

#coding=utf-8

#scribe日志接收存在小集群到大集群之间, distcp 同步失败的情况,需要手动进行补入。

#1、如果查询补入的日志量少,则可以之间用脚本处理。如果量大,则使用 hadoop 提交job。

# hadoop job 提交方式:

#   hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-distcp-2.4.0.jar -m 100 hdfs://scribehadoop/scribelog/common_act/2016/08/02/13/  /file/realtime/distcpv2/scribelog/common_act/2016/08/02/13  --update 

 

#  --update 参数表示如果目标地址目录存在,则更新该目录中的内容。


#手动同步脚本使用方法: python manual_check_sync.py  dst_path  

#脚本完成大集群和小集群之间的目录大小比较,目录文件比较。 输出差异文件列表。最后完成同步入库。




import sys,os,commands,re

import logging,logging.handlers 


Module=sys.argv[1].split("scribelog")[1]



logger1 = logging.getLogger('mylogger1')

logger1.setLevel(logging.DEBUG)

# 创建一个handler,用于写入日志文件

filehandle = logging.FileHandler('log/test.log',mode='a')

# 再创建一个handler,用于输出到控制台

consehandle = logging.StreamHandler()


# 定义handler的输出格式formatter

formatter = logging.Formatter('[%(asctime)s]:%(message)s',datefmt='%F %T')

#formatter = logging.Formatter('[%(asctime)s-line:%(lineno)d-%(levelname)s]:%(message)s',datefmt='%F %T')

filehandle.setFormatter(formatter)

consehandle.setFormatter(formatter)


logger1.addHandler(filehandle)

logger1.addHandler(consehandle)





little_cluster="hdfs://yz632.hadoop.com.cn/scribelog"

large_cluster="hdfs://ns1"


#logger1.info("源目录(小集群):%s%s" % ( little_cluster,Module))


logger1.info("源目录(小集群):%s%s" %(little_cluster,Module))

logger1.info("目标目录(大集群):%s%s" %(large_cluster,sys.argv[1]))




#统计目录大小等情况

du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)

du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])


#获取的值是str类型,所以需要转为list来做比较。


logger1.info("               DIR_COUNT FILECOUNT CONTENTSIZE ")

logger1.info("小集群目录信息:%s" %(du_little))

logger1.info("大集群目录信息:%s" %(du_large))



#Python的str类有split方法,只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以定义多个分隔符。这里可以用单个空格作为分隔即可。

du_little=re.split('         |           | ' ,du_little)

du_large=re.split('         |           | ' ,du_large)



#du_little=du_little.split(" ")

#du_large=du_large.split(" ")



#print du_large,du_little

#print du_large[3],du_little[3]

#print du_large[6],du_little[6]

#print du_large[7],du_little[7]

#

if du_little[3] == du_large[3]  and du_little[6] == du_large[6] and du_little[7] == du_large[7]:

logger1.info("大小集群文件数量、大小一致,不需要同步")

exit()




#如果大小不一致,取出目录下所有文件

little_list=commands.getoutput("hadoop fs -lsr " + little_cluster + Module + "|grep -v \"^d\"" )

large_list =commands.getoutput("hadoop fs -lsr " + large_cluster + sys.argv[1]+"|grep -v \"^d\"")


#logger1.info( "小集群情况:%s " %(little_list))

#logger1.info( "大集群情况:%s ")%(large_list ))




list1=[]

list2=[]

lost_list=[]

for i in  little_list.split("\n"):

list1.append(i.split("scribelog")[1])


for i in  large_list.split("\n"):

list2.append(i.split("scribelog")[1])



#logger1.info("小集群目录文件:",list1

#logger1.info("大集群目录文件" ,list2



logger1.info("对比大小集群文件---》未同步文件列表:")

for i in list1:

if i not in list2:

logger1.info(i) 

lost_list.append(i)


logger1.info(lost_list)



#拉取小集群文件到本地tmp目录中

for i in lost_list:

s=commands.getstatusoutput("hadoop fs -get " + little_cluster + i+" /tmp/")

logger1.info("拉取小集群文件到本地tmp目录中%s" %s )



#入库到大集群

for i in lost_list:

logger1.info(i)

  j=commands.getstatusoutput("sudo su - datacopy -s /bin/bash -c '/usr/local/hadoop-2.4.0/bin/hadoop fs -put  /tmp/" +i.split("/")[-1] + "  /file/realtime/distcpv2/scribelog"+i+" '" )

logger1.info(j)

if j[0] == 0 :

logger1.info(" %s + 同步完成" %(i))



#clear tmp file

for i  in lost_list:

commands.getstatusoutput("rm -f /tmp/"+i.split("/")[-1] )


#当同步目录时,可能会出现put的时候提示没有目录存在。参考赵兵的 put  -  src dest  方法,是否可以避免此类问题有待验证。




du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)

du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])



logger1.info("                  DIR_COUNT       FILECOUNT       CONTENTSIZE ")

logger1.info("小集群目录信息:%s" %(du_little))

logger1.info("大集群目录信息:%s" %(du_large))



#Python的str类有split方法,但是这个split方法只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以用来做这件事情

du_little=re.split('         |           | ' ,du_little)

du_large=re.split('         |           | ' ,du_large)



if du_little[3] == du_large[3]  and du_little[6] == du_large[6] and du_little[7] == du_large[7]:

        logger1.info("小集群文件数量、大小一致,不需要同步")

        exit()


目录
相关文章
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
177 6
|
2月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
76 4
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
34 3
|
2月前
|
分布式计算 Hadoop Shell
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
59 3
|
2月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
67 1
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
43 1
|
2月前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
45 1
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
50 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
51 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
96 0

相关实验场景

更多