[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流

简介: 管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。

管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。 Oozie工作流是在XML文档中指定的控制依赖性指导非循环图(DAG)中排列的动作集合。虽然Oozie在Hadoop社区中有很多支持,但通过XML属性配置工作流和作业的学习曲线非常陡峭。

Luigi是Spotify创建的Python替代方案,可以构建和配置复杂的批处理作业管道。它处理依赖项解析,工作流管理,可视化等等。它还拥有庞大的社区,并支持许多Hadoop技术。在github上超过1万星。

本章介绍Luigi的安装和工作流程的详细说明。

安装

pip install luigi

工作流

在Luigi中,工作流由一系列操作组成,称为任务。 Luigi任务是非特定的,也就是说,它们可以是任何可以用Python编写的东西。任务的输入和输出数据的位置称为目标(target)。目标通常对应于磁盘上,HDFS上或数据库中的文件位置。除了任务和目标之外,Luigi还利用参数来自定义任务的执行方式。

  • 任务

任务是构成Luigi工作流的操作序列。每个任务都声明其依赖于其他任务创建的目标。这样Luigi能够创建依赖链。

图片.png

  • 目标

目标是任务的输入和输出。最常见的目标是磁盘上的文件,HDFS中的文件或数据库中的记录。 Luigi包装了底层文件系统操作,以确保与目标的交互是原子的。这允许从故障点重放工作流,而不必重放任何已经成功完成的任务。

  • 参数

参数允许通过允许值从命令行,以编程方式或从其他任务传递任务来自定义任务。例如,任务输出的名称可以通过参数传递给任务的日期来确定。

参考资料

工作流本示例

#!/usr/bin/env python
# 项目实战讨论QQ群630011153 144081101
# https://github.com/china-testing/python-api-tesing
import luigi

class InputFile(luigi.Task):
   """
   A task wrapping a Target 
   """
   input_file = luigi.Parameter()

   def output(self):
      """
      Return the target for this task
      """
      return luigi.LocalTarget(self.input_file)

class WordCount(luigi.Task):
   """
   A task that counts the number of words in a file
   """
   input_file = luigi.Parameter()
   output_file = luigi.Parameter(default='/tmp/wordcount')

   def requires(self):
      """
      The task's dependencies:
      """
      return InputFile(self.input_file)

   def output(self):
      """
      The task's output
      """
      return luigi.LocalTarget(self.output_file)

   def run(self):
      """
      The task's logic
      """
      count = {}

      ifp = self.input().open('r')

      for line in ifp:
         for word in line.strip().split():
            count[word] = count.get(word, 0) + 1

      ofp = self.output().open('w')
      for k, v in count.items():
            ofp.write('{}\t{}\n'.format(k, v))
      ofp.close()

if __name__ == '__main__':
   luigi.run()

执行

$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   PENDING
INFO: Informed scheduler that task   InputFile__home_hduser__in_0eced493f7   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running   WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done      WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 InputFile(input_file=/home/hduser_/input2.txt)
* 1 ran successfully:
    - 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
jack    2
be    2
nimble    1
quick    1
相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
7月前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
7月前
|
数据可视化 关系型数据库 MySQL
基于python大数据的的海洋气象数据可视化平台
针对海洋气象数据量大、维度多的挑战,设计基于ECharts的可视化平台,结合Python、Django与MySQL,实现数据高效展示与交互分析,提升科研与决策效率。
|
7月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的台风灾害分析及预测系统
针对台风灾害预警滞后、精度不足等问题,本研究基于Python与大数据技术,构建多源数据融合的台风预测系统。利用机器学习提升路径与强度预测准确率,结合Django框架实现动态可视化与实时预警,为防灾决策提供科学支持,显著提高应急响应效率,具有重要社会经济价值。
|
7月前
|
数据可视化 大数据 关系型数据库
基于python大数据技术的医疗数据分析与研究
在数字化时代,医疗数据呈爆炸式增长,涵盖患者信息、检查指标、生活方式等。大数据技术助力疾病预测、资源优化与智慧医疗发展,结合Python、MySQL与B/S架构,推动医疗系统高效实现。
|
7月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
8月前
|
数据采集 存储 XML
Python爬虫技术:从基础到实战的完整教程
最后强调: 父母法律法规限制下进行网络抓取活动; 不得侵犯他人版权隐私利益; 同时也要注意个人安全防止泄露敏感信息.
1001 19

推荐镜像

更多