使用python的hdfs包操作分布式文件系统(HDFS)

简介: 转载请注明出处:@http://blog.csdn.net/gamer_gyt,Thinkagmer 撰写博主微博:http://weibo.com/234654758 (欢迎互撩)Github:https://github.

转载请注明出处:@http://blog.csdn.net/gamer_gyt,Thinkagmer 撰写

博主微博:http://weibo.com/234654758 (欢迎互撩)

Github:https://github.com/thinkgamer

=====================================================================================

写在前边的话:

        之前做的hadoop集群,组合了hive,hbase,sqoop,spark等开源工具,现在要对他们做一个Web的可视化操作,由于本小白只懂如何使用python做一个交互的web应用,所以这里就选择了Python的Django

        Django教程参考:Django从manage.py shell 到项目部署

        hadoop集群操作请参考:三台PC服务器部署高可用hadoop集群


言归正传:

       使用python操作hdfs本身并不难,只不过是把对应的shell 功能“翻译”成高级语言,网上大部分使用的是

       pyhdfs:官方文档

       hdfs:官方文档

       libhdfs(比较狗血)

       我这里选用的是hdfs,下边的实例都是基于hdfs包进行的

1:安装

      由于我的是windows环境(linux其实也一样),只要有pip或者setup_install安装起来都是很方便的

     pip install hdfs

2:Client——创建集群连接

>>> from hdfs import *
>>> client = Client("http://127.0.0.1:50070")

       其他参数说明:

       classhdfs.client.Client(urlroot=Noneproxy=Nonetimeout=Nonesession=None)

                    url:ip:端口

                    root:制定的hdfs根目录

                    proxy:制定登陆的用户身份

                    timeout:设置的超时时间

                    seesion:requests.Session instance, used to emit all requests.(不是太懂,应该四用户发出请求)

       这里我们着重看一下proxy这个,首先我们指定root用户连接

>>> client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>> client.list("/")
[u'hbase']
       看起来一切正常的样子,接下来我们指定一个别的用户,比如说gamer再看
>>> client = Client("http://127.0.0.1:50070",root="/",proxy="gamer",timeout=100,session=False)
>>> client.list("/")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 893, in list
    statuses = self._list_status(hdfs_path).json()['FileStatuses']['FileStatus']
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 92, in api_handler
    **self.kwargs
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 181, in _request
    return _on_error(response)
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 44, in _on_error
    raise HdfsError(message)
hdfs.util.HdfsError: Failed to obtain user group information: org.apache.hadoop.security.authorize.AuthorizationException: User: dr.who is not allowed to impersonate gamer
       这时候就抛出异常了

3:dir——查看支持的方法

>>> dir(client)
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', 
'__hash__', '__init__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__registry__',
 '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_append', '_create', '_delete',
 '_get_content_summary', '_get_file_checksum', '_get_file_status', '_get_home_directory', '_list_status', '_mkdirs', '_open',
 '_proxy', '_rename', '_request', '_session', '_set_owner', '_set_permission', '_set_replication', '_set_times', '_timeout', 
'checksum', 'content', 'delete', 'download', 'from_options', 'list', 'makedirs', 'parts', 'read', 'rename', 'resolve', 'root',
 'set_owner', 'set_permission', 'set_replication', 'set_times', 'status', 'upload',
 'url', 'walk', 'write']

4:status——获取路径的具体信息

>>> client.status("/")
{'accessTime': 0, 'pathSuffix': '', 'group': 'supergroup', 'type': 'DIRECTORY', 'owner': 'root', 'childrenNum': 4, 'blockSize': 0,
 'fileId': 16385, 'length': 0, 'replication': 0, 'storagePolicy': 0, 'modificationTime': 1473023149031, 'permission': '777'}

      其他参数:status(hdfs_pathstrict=True)

               hdfs_path:就是hdfs路径

               strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None

>>> client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>> client.status("/gamer",strict=True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 277, in status
    res = self._get_file_status(hdfs_path, strict=strict)
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 92, in api_handler
    **self.kwargs
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 181, in _request
    return _on_error(response)
  File "/usr/local/lib/python2.7/dist-packages/hdfs/client.py", line 44, in _on_error
    raise HdfsError(message)
hdfs.util.HdfsError: File does not exist: /gamer
>>> client.status("/gamer",strict=False)
>>>
      从例子中可以看出,当设置为false时,路径不存在,什么也不输出

5:list——获取指定路径的子目录信息

>>> client.list("/")
['file', 'gyt', 'hbase', 'tmp']

     其他参数:list(hdfs_pathstatus=False)

              status:为True时,也返回子目录的状态信息,默认为Flase

>>> client.list("/")
[u'hbase']
>>> client.list("/",status=False)
[u'hbase']
>>> client.list("/",status=True)
[(u'hbase', {u'group': u'supergroup', u'permission': u'755', u'blockSize': 0, u'accessTime': 0, u'pathSuffix': u'hbase', u'modificationTime': 1472986624167, u'replication': 0, u'length': 0, u'childrenNum': 7, u'owner': u'root', u'storagePolicy': 0, u'type': u'DIRECTORY', u'fileId': 16386})]
>>> 

6:makedirs——创建目录

>>> client.makedirs("/test")
>>> client.list("/")
['file', 'gyt', 'hbase', 'test', 'tmp']
>>> client.status("/test")
{'accessTime': 0, 'pathSuffix': '', 'group': 'supergroup', 'type': 'DIRECTORY', 'owner': 'dr.who', 'childrenNum': 0, 'blockSize': 0,
 'fileId': 16493, 'length': 0, 'replication': 0, 'storagePolicy': 0, 'modificationTime': 1473096896947, 'permission': '755'}

       其他参数:makedirs(hdfs_pathpermission=None)

                permission:设置权限

>>> client.makedirs("/test",permission=777)
>>> client.status("/test")
{u'group': u'supergroup', u'permission': u'777', u'blockSize': 0, u'accessTime': 0, u'pathSuffix': u'', u'modificationTime': 1473175557340, u'replication': 0, u'length': 0, u'childrenNum': 0, u'owner': u'dr.who', u'storagePolicy': 0, u'type': u'DIRECTORY', u'fileId': 16437}
       可以看出该文件夹的权限是777

7:rename—重命名

>>> client.rename("/test","/new_name")
>>> client.list("/")
['file', 'gyt', 'hbase', 'new_name', 'tmp']

       格式说明:rename(hdfs_path, local_path)

8:delete—删除

>>> client.list("/")
['file', 'gyt', 'hbase', 'new_name', 'tmp']
>>> client.delete("/new_name")
True
>>> client.list("/")
['file', 'gyt', 'hbase', 'tmp']

      其他参数:delete(hdfs_pathrecursive=False)

               recursive:删除文件和其子目录,设置为False如果不存在,则会抛出异常,默认为False

>>> client.delete("/test",recursive=True)
True
>>> client.delete("/test",recursive=True)
False
>>> client.delete("/test")
False

9:upload——上传数据

=======================分割线==========================

为什么这里需要分割线?因为在做web平台可视化操作hdfs的时候遇到了问题!错误如下:

requests.exceptions.ConnectionError: HTTPConnectionPool(host='slaver1', port=50075): Max retries exceeded with url:
 /webhdfs/v1/thinkgamer/name.txt?op=OPEN&namenoderpcaddress=master&offset=0 (Caused by NewConnectionError
('<requests.packages.urllib3.connection.HTTPConnection object at 0x00000000043A3FD0>: Failed to establish a new connection:
 [Errno 11004] getaddrinfo failed',))

对错误的理解:看其大意是Http连接太多,没有及时关闭,导致错误 (PS:网上对hdfs操作的资料比较少,大部分都只停留在基础语法层面,但对于错误的记录及解决办法少之又少)

解决办法:暂无

由于我是在windows上操作集群的,而我的集群是在服务器上部署的,所以我考虑是否在服务器上尝试下载和上传数据,果断ok

>>> client.list("/")
[u'hbase', u'test']
>>> client.upload("/test","/opt/bigdata/hadoop/NOTICE.txt")
'/test/NOTICE.txt'
>>> client.list("/")
[u'hbase', u'test']
>>> client.list("/test")
[u'NOTICE.txt']
       其他参数: upload ( hdfs_path local_path overwrite=False n_threads=1 temp_dir=None

                                 chunk_size=65536,progress=Nonecleanup=True**kwargs)

               overwrite:是否是覆盖性上传文件

               n_threads:启动的线程数目

               temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换

               chunk_size:文件上传的大小区间

               progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数

               cleanup:如果在上传任何文件时发生错误,则删除该文件

10:download——下载

>>> client.download("/test/NOTICE.txt","/home")
'/home/NOTICE.txt'
>>> import os
>>> os.system("ls /home")
lost+found  NOTICE.txt	thinkgamer
0
>>> 
      其他参数: download ( hdfs_path local_path overwrite=False n_threads=1 temp_dir=None **kwargs )
              参考上传 upload

11:read——读取文件

    同样在windows客户端上执行依旧报错,在hadoop的节点服务器上执行

 
>>> with client.read("/test/NOTICE.txt") as reader:
...     print reader.read()
... 
This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).

>>>
     其他参数: read ( *args **kwds )

              hdfs_path:hdfs路径

              offset:设置开始的字节位置

              length:读取的长度(字节为单位)

              buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。

              encoding:制定编码

              chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象

              delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。

              progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。


附:在对文件操作时,可能会提示错误

hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x
        解决办法是:在配置文件hdfs-site.xml中加入
<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>
        重启集群即可

基本常用的功能也就这些了,如果需要一些特殊的功能,可以自己执行help(client.method)进行查看


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
Python
下载python所有的包 国内地址
下载python所有的包 国内地址
WK
|
3月前
|
Python
如何在Python中导入包
在 Python 中,包是一种组织代码的方式,通过包含 `__init__.py` 文件(在 Python 3.3 及以上版本可选)的目录实现。包内可以包含多个模块(`.py` 文件)和其他子包。导入包有多种方式:整体导入包、导入特定模块、导入特定函数或类、导入子包等。推荐的做法是明确指定导入内容以提高代码的可读性和可维护性。此外,确保包目录结构正确,并将其添加到 Python 的搜索路径中。对于分发包,使用 setuptools 和 pip 等工具更为便捷。
WK
125 66
WK
|
3月前
|
Python
如何在Python中创建包
在Python中创建包十分简便,主要涉及目录结构的设置及`__init__.py`文件的配置。虽然Python 3.3后空`__init__.py`文件不再强制要求,但在特定场景下保留它有助于保持兼容性或执行包初始化代码。创建包的具体步骤包括:构建目录结构、编写模块代码、(可选)编写初始化代码等。例如,可以创建一个名为`mypackage`的目录,其中包含`__init__.py`及多个模块文件如
WK
115 62
|
29天前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
1月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
28 1
|
1月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
40 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
47 1
|
2月前
|
机器学习/深度学习 搜索推荐 数据可视化
Python量化炒股常用的Matplotlib包
Python量化炒股常用的Matplotlib包
|
2月前
|
数据采集 数据可视化 数据挖掘
Python量化炒股常用的Pandas包
Python量化炒股常用的Pandas包
|
2月前
|
人工智能 算法 数据处理
Python常用的Numpy包
Python常用的Numpy包