[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。

管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。 Oozie工作流是在XML文档中指定的控制依赖性指导非循环图(DAG)中排列的动作集合。虽然Oozie在Hadoop社区中有很多支持,但通过XML属性配置工作流和作业的学习曲线非常陡峭。

Luigi是Spotify创建的Python替代方案,可以构建和配置复杂的批处理作业管道。它处理依赖项解析,工作流管理,可视化等等。它还拥有庞大的社区,并支持许多Hadoop技术。在github上超过1万星。

本章介绍Luigi的安装和工作流程的详细说明。

安装

pip install luigi

工作流

在Luigi中,工作流由一系列操作组成,称为任务。 Luigi任务是非特定的,也就是说,它们可以是任何可以用Python编写的东西。任务的输入和输出数据的位置称为目标(target)。目标通常对应于磁盘上,HDFS上或数据库中的文件位置。除了任务和目标之外,Luigi还利用参数来自定义任务的执行方式。

  • 任务

任务是构成Luigi工作流的操作序列。每个任务都声明其依赖于其他任务创建的目标。这样Luigi能够创建依赖链。

图片.png

  • 目标

目标是任务的输入和输出。最常见的目标是磁盘上的文件,HDFS中的文件或数据库中的记录。 Luigi包装了底层文件系统操作,以确保与目标的交互是原子的。这允许从故障点重放工作流,而不必重放任何已经成功完成的任务。

  • 参数

参数允许通过允许值从命令行,以编程方式或从其他任务传递任务来自定义任务。例如,任务输出的名称可以通过参数传递给任务的日期来确定。

参考资料

工作流本示例

#!/usr/bin/env python
# 项目实战讨论QQ群630011153 144081101
# https://github.com/china-testing/python-api-tesing
import luigi

class InputFile(luigi.Task):
   """
   A task wrapping a Target 
   """
   input_file = luigi.Parameter()

   def output(self):
      """
      Return the target for this task
      """
      return luigi.LocalTarget(self.input_file)

class WordCount(luigi.Task):
   """
   A task that counts the number of words in a file
   """
   input_file = luigi.Parameter()
   output_file = luigi.Parameter(default='/tmp/wordcount')

   def requires(self):
      """
      The task's dependencies:
      """
      return InputFile(self.input_file)

   def output(self):
      """
      The task's output
      """
      return luigi.LocalTarget(self.output_file)

   def run(self):
      """
      The task's logic
      """
      count = {}

      ifp = self.input().open('r')

      for line in ifp:
         for word in line.strip().split():
            count[word] = count.get(word, 0) + 1

      ofp = self.output().open('w')
      for k, v in count.items():
            ofp.write('{}\t{}\n'.format(k, v))
      ofp.close()

if __name__ == '__main__':
   luigi.run()

执行

$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   PENDING
INFO: Informed scheduler that task   InputFile__home_hduser__in_0eced493f7   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running   WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done      WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 InputFile(input_file=/home/hduser_/input2.txt)
* 1 ran successfully:
    - 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
jack    2
be    2
nimble    1
quick    1
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
19小时前
|
存储 算法 Serverless
剖析文件共享工具背后的Python哈希表算法奥秘
在数字化时代,文件共享工具不可或缺。哈希表算法通过将文件名或哈希值映射到存储位置,实现快速检索与高效管理。Python中的哈希表可用于创建简易文件索引,支持快速插入和查找文件路径。哈希表不仅提升了文件定位速度,还优化了存储管理和多节点数据一致性,确保文件共享工具高效运行,满足多用户并发需求,推动文件共享领域向更高效、便捷的方向发展。
|
30天前
|
数据可视化 编译器 Python
Manim:数学可视化的强大工具 | python小知识
Manim(Manim Community Edition)是由3Blue1Brown的Grant Sanderson开发的数学动画引擎,专为数学和科学可视化设计。它结合了Python的灵活性与LaTeX的精确性,支持多领域的内容展示,能生成清晰、精确的数学动画,广泛应用于教育视频制作。安装简单,入门容易,适合教育工作者和编程爱好者使用。
240 7
|
2月前
|
JavaScript 前端开发 开发者
探索 DrissionPage: 强大的Python网页自动化工具
DrissionPage 是一个基于 Python 的网页自动化工具,结合了浏览器自动化的便利性和 requests 库的高效率。它提供三种页面对象:ChromiumPage、WebPage 和 SessionPage,分别适用于不同的使用场景,帮助开发者高效完成网页自动化任务。
208 4
|
2月前
|
开发者 Python
探索Python中的列表推导式:简洁而强大的工具
【10月更文挑战第41天】 在编程的世界中,效率与简洁是永恒的追求。本文将深入探讨Python编程语言中一个独特且强大的特性——列表推导式(List Comprehension)。我们将通过实际代码示例,展示如何利用这一工具简化代码、提升性能,并解决常见编程问题。无论你是初学者还是资深开发者,掌握列表推导式都将使你的Python之旅更加顺畅。
|
2月前
|
数据采集 JavaScript 程序员
探索CSDN博客数据:使用Python爬虫技术
本文介绍了如何利用Python的requests和pyquery库爬取CSDN博客数据,包括环境准备、代码解析及注意事项,适合初学者学习。
95 0
|
2月前
|
C语言 Python
探索Python中的列表推导式:简洁而强大的工具
【10月更文挑战第24天】在Python编程的世界中,追求代码的简洁性和可读性是永恒的主题。列表推导式(List Comprehensions)作为Python语言的一个特色功能,提供了一种优雅且高效的方法来创建和处理列表。本文将深入探讨列表推导式的使用场景、语法结构以及如何通过它简化日常编程任务。
|
1月前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!
|
1月前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
20天前
|
Unix Linux 程序员
[oeasy]python053_学编程为什么从hello_world_开始
视频介绍了“Hello World”程序的由来及其在编程中的重要性。从贝尔实验室诞生的Unix系统和C语言说起,讲述了“Hello World”作为经典示例的起源和流传过程。文章还探讨了C语言对其他编程语言的影响,以及它在系统编程中的地位。最后总结了“Hello World”、print、小括号和双引号等编程概念的来源。
105 80
|
9天前
|
Python
[oeasy]python055_python编程_容易出现的问题_函数名的重新赋值_print_int
本文介绍了Python编程中容易出现的问题,特别是函数名、类名和模块名的重新赋值。通过具体示例展示了将内建函数(如`print`、`int`、`max`)或模块名(如`os`)重新赋值为其他类型后,会导致原有功能失效。例如,将`print`赋值为整数后,无法再用其输出内容;将`int`赋值为整数后,无法再进行类型转换。重新赋值后,这些名称失去了原有的功能,可能导致程序错误。总结指出,已有的函数名、类名和模块名不适合覆盖赋新值,否则会失去原有功能。如果需要使用类似的变量名,建议采用其他命名方式以避免冲突。
30 14