《500 Lines or Less》(2)A Continuous Integration System(持续集成系统)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 《500 Lines or Less》(2)A Continuous Integration System(持续集成系统)

什么是持续集成系统?

在软件开发过程中,我们通过测试验证代码是否按照预期工作。

持续集成系统(CI)用于测试新代码。提交代码后,CI将验证新提交能否通过测试。

因此,CI系统需要获取新更改、允许测试、生成报告。本项目将演示一个小型的、分布式的CI,该系统具有可扩展性。


介绍

CI具有三个组件:


观察者(observer):监视代码存储库。如果代码更改了,就通知调度器。

调度 (dispatcher):寻找合适的测试执行程序。

测试执行(test runner):执行测试。

这三个组件可以通过很多方式结合,例如,同时在一个/多个机器上的一个进程/多个进程。

这个项目中,这些组件中的每一个都是自己的过程。这将使每个进程独立于其他进程,并让我们运行每个进程的多个实例。

此外,进程间还通过套接字进行通信,这将使我们能够在单独的网络机器上运行每个进程。为每个组件分配一个唯一的主机/端口地址,每个进程都可以通过在分配的地址上发布消息来与其他进程进行通信。

初始设置

CI系统通过检测代码存储库中的更改来运行测试,因此首先,我们需要设置 CI 系统将监控的存储库test_repo

mkdir test_repo 
cd test_repo 
git init

test_repo是主存储库,开发人员进行修改的地方。我们的CI系统将从此存储库中进行拉取和检查,运行测试。

此外,我们还需要两个额外的存储库用于观察器和测试运行。

git clone /path/to/test_repo test_repo_clone_obs
git clone /path/to/test_repo test_repo_clone_runner

组件

观察者(repo_observer.py)

观察者用于监视存储库,发现新的提交时通知调度程序。

具体来说,观察者将定期轮询存储库,发现更改时,告诉调度程序需要进行测试的最新提交ID。

为了能通知调度程序,我们需要知道调度程序的地址和端口:

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server", 
                        help="Dispatcher host:port, "\
                             "default is localhost:8888",
                             default="localhost:8888",
                             action="store")
    parser.add_argument("repo", metavar="REPO",type=str, 
                        help="The path of Repository to observe")
    return  parser.parse_args()

观察者的主函数时poll()。此函数会执行无限循环,一直检查存储库的更改。

def poll():
    args = parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

    while True:
        # 调用update_repo.sh 脚本
        # 更新仓库,检查是否有新的提交。
        # 如果有新的提交,写入包含最新提交的id到 .commit_id 文件
        try:
            subprocess.check_call(["./update_repo.sh", args.repo]) 
        except subprocess.CalledProcessError as e:
            raise Exception(f"Error updating repository: {e.output}" )

update_repo.sh 文件用于标识任何新的提交。由于笔者不熟悉bash语法,此处略过。原文有对该文件的具体说明。

当发现有新的提交,就通知调度程序:

        # 有新的提交。通知调度(分发测试)。
        if os.path.isfile(".commit_id"):
            try:
                status_response = helpers.communicate(dispatcher_host, int(dispatcher_port), 
                                              "status")
            except socket.error as e:
                raise Exception(f"Error communicating with dispatcher: {e}")
            
            if status_response == 'OK':
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                commit_response = helpers.communicate(dispatcher_host, int(dispatcher_port), 
                                                f"dispatch:{commit}")
                if commit_response != 'OK':
                    raise Exception(f"Could not  dispatcher the test: {commit_response}")
                print("Test dispatched!")
            else:
                raise Exception(f"Dispatcher is not ready: {status_response}")
            
            time.sleep(5)

调度程序(dispatcher.py)

调度程序用于委派测试任务。它侦听来自测试运行程序和观察者的请求。


测试程序可以在此注册自己。

观察者可以提交ID到这里。

当有待测试ID时,调度程序将分配给某个测试程序运行测试。


启动调度程序服务器和另外两个线程。一个线程运行 runner_checker 函数,另一个线程运行函数 redistribute 。

runner_checker 检查注册的测试运行程序的状态,确定它们有响应,否则将它们删除。redistribute 检查待测试的提交pending_commits,并进行分配。

def serve():
    args = parse_args()
    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print(f"serving on {args.host}:{int(args.port)}")  

    def runner_checker(server):
        '''通过ping 检查测试运行程序是否在线'''
        ...

    def redistribute(server):
        '''重新分配测试'''
        ...  

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # serve forever  
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()

dispatch_tests 函数用于从已注册的运行器池中查找可用的测试运行器。如果可用,它将向其发送带有提交 ID 的运行测试消息。否则,等待两秒钟并重试。调度后,它会记录 dispatched_commits 变量中的哪个测试运行程序正在测试哪个提交 ID。如果提交 ID 在pending_commits 中,则会将其删除,因为 dispatch_tests 因为它已成功重新调度。

为了让调度程序服务器处理并发连接,ThreadingTCPServer使用 Mixin类,将线程处理功能添加到默认 SocketServer

class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
  ...

调度程序服务器通过为每个请求定义处理程序来工作。这由DispatcherHandler类定义,该类继承自SocketServer的BaseRequestHandler。这个基类只需要我们定义handle函数,每当请求连接时就会调用它。

class DispatcherHandler(socketserver.BaseRequestHandler):
    """
    调度程序的RequestHandler类。
    根据传入的提交分派测试运行器,并处理它们的请求和测试结果(self.request)。
    """
    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024
    def handle(self):
        self.data = self.request.recv(self.BUF_SIZE).decode('utf-8').strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command".encode("utf-8"))
            return
        command = command_groups.group(1)
        if command == "status":  
            print("in status")
            self.request.sendall("OK".encode("utf-8"))
        ...

测试运行程序(test_runner.py)

测试运行程序负责针对给定的提交 ID 运行测试并报告结果。它仅与调度程序服务器通信,该服务器负责为其提供要运行的提交 ID,并将接收测试结果。

dispatcher_checker 函数每 5 秒对调度程序服务器执行一次 ping 操作,以确保调度仍处于启动和运行状态。

测试运行程序也有一个ThreadingTCPServer ,用来接收调度的消息。

测试运行程序服务器响应来自调度程序的两条消息。第一个是 ping ,调度程序服务器使用它来验证运行器是否仍处于活动状态。

class TestHandler(SocketServer.BaseRequestHandler):
    ...
    def handle(self):
        ....
        if command == "ping":
            print("pinged")
            self.server.last_communication = time.time()
            self.request.sendall("pong")

第二个是 runtest ,它接受 形式的 runtest:<commit ID> 消息,并用于启动对给定提交的测试。调用 runtest 时,测试运行程序将检查它是否已在运行测试,如果是,它将向调度程序返回 BUSY 响应。如果可用,它将通过消息 OK。

        elif command == "runtest":
            print("got runtest command: am I busy? %s" % self.server.busy)
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print("running")
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False

run_tests函数调用 shell 脚本 ,脚本test_runner_script.sh 将存储库更新为给定的提交 ID。脚本返回后,如果成功更新存储库,我们将使用 unittest 运行测试并将结果收集到文件中。测试完成运行后,测试运行程序将读取结果文件,并在结果消息中将其发送到调度程序。

流程图

运行代码

我们可以在本地运行这个简单的 CI 系统,为每个进程使用三个不同的终端 shell。我们首先启动调度程序,在端口 8888 上运行:

python dispatcher.py

在一个新的 shell 中,我们启动测试运行程序:

python test_runner.py <path/to/test_repo_clone_runner>

最后,在另一个新 shell 中,让我们启动 repo 观察器:

python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>

现在一切都设置好了,让我们触发一些测试吧!为此,我们需要进行新的提交。转到主存储库并进行任意更改:

$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m"new file" new_file

然后 repo_observer.py 会意识到有一个新的提交并通知调度程序。您可以在它们各自的 shell 中看到输出,以便监视它们。调度程序收到测试结果后,会使用提交 ID 作为文件名,将它们存储在此代码库的 test_results/ 文件夹中。

错误处理

此 CI 系统包括一些简单的错误处理。

如果终止进程 test_runner.py , dispatcher.py 则会发现运行器不再可用,并将其从池中删除。

您还可以终止测试运行程序,以模拟计算机崩溃或网络故障。如果这样做,调度程序将意识到运行器已关闭,并将向另一个测试运行程序(如果池中有一个可用)提供作业,或者将等待新的测试运行程序在池中注册自己。


如果你杀死了调度程序,存储库观察者会发现它已经关闭,并会抛出一个异常。测试运行器也会注意到并关闭。

结论

通过关注点分离,我们构建了分布式CI系统。通过套接字通信,我们能将系统分布在多个机器上,增加了系统的可扩展性。

CI 系统现在非常简单,您可以自己扩展它以使其功能更强大。以下是一些改进建议:

每次提交测试运行:当前系统将定期检查是否运行了新提交,并将运行最近的提交。这应该得到改进,以测试每个提交。为此,您可以修改定期检查程序,以在上次测试和最新提交之间的日志中调度每个提交的测试运行。

更智能的测试运行程序:如果测试运行程序检测到调度程序无响应,它将停止运行。即使测试运行程序正在运行测试,也会发生这种情况!如果测试运行程序等待一段时间(或者无限期,如果您不关心资源管理),以便调度程序重新联机,那就更好了。在这种情况下,如果调度程序在测试运行程序主动运行测试时关闭,则它不会关闭,而是完成测试并等待调度程序重新联机,并将结果报告给它。这将确保我们不会浪费测试运行者所做的任何努力,并且我们每次提交只会运行一次测试。

真实报告:在实际的 CI 系统中,您将将测试结果报告给报告服务,该服务将收集结果,将它们发布到某个地方供人们查看,并在发生故障或其他显着事件时通知相关方列表。您可以通过创建一个新流程来获取报告的结果,而不是由调度程序收集结果来扩展我们的简单 CI 系统。这个新过程可以是一个 Web 服务器(或者可以连接到 Web 服务器),它可以在线发布结果,并且可以使用邮件服务器来提醒订阅者任何测试失败。

测试运行程序管理器:现在,您必须手动启动 test_runner.py 该文件才能启动测试运行程序。相反,您可以创建一个测试运行程序管理器进程,该进程将评估来自调度程序的测试请求的当前负载,并相应地缩放活动测试运行程序的数量。此进程将接收运行测试消息,并将为每个请求启动测试运行程序进程,并在负载减少时终止未使用的进程。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
安全 测试技术 C#
C#.手术麻醉系统源码 手麻系统如何与医院信息系统进行集成?
C#手术麻醉系统源码集成到医院信息系统涉及标准数据接口、患者信息同步、医嘱报告共享、实时监测数据传输及安全权限管理。通过这些步骤,确保信息的准确、及时和安全流动,提升医疗效率和服务质量。系统集成后需测试优化,以保证稳定性。图片展示了集成的不同方面。
25 0
|
1月前
|
存储 传感器 编解码
LabVIEW风力涡轮机的雷电流测量系统中集成高速摄像机
LabVIEW风力涡轮机的雷电流测量系统中集成高速摄像机
20 1
|
5天前
|
人工智能 移动开发 IDE
安利几款与钉钉平台无缝集成打通账号认证的企业文档管理系统
钉钉是很多中小企业都爱用的产品,开通账号就能直接使用了,应用生态非常丰富,尤其是AI技术的应用,走在行业前列。但仍有很多企业对于全面拥抱SaaS服务充满了顾虑,尤其在内部资料的管理这块,即使钉钉在线文档已经提供了非常优秀的协作体验,不少客户仍更偏爱私有部署在局域网里面的企业文档管理系统。那么能将企业内部部署的文档管理系统集成到钉钉平台上面,和钉钉文档并行使用呢?市面上又有哪些企业文档管理系统软件支持与钉钉的集成呢?这也是很多企业客户的疑问。
安利几款与钉钉平台无缝集成打通账号认证的企业文档管理系统
|
7天前
|
消息中间件 监控 Java
Java一分钟之-Spring Integration:企业级集成
【6月更文挑战第11天】Spring Integration是Spring框架的一部分,用于简化企业应用的集成,基于EIP设计,采用消息传递连接不同服务。核心概念包括通道(Channel)、端点(Endpoint)和适配器(Adapter)。常见问题涉及过度设计、消息丢失与重复处理、性能瓶颈。解决策略包括遵循YAGNI原则、使用幂等性和事务管理、优化线程配置。通过添加依赖并创建简单消息处理链,可以开始使用Spring Integration。注意实践中要关注消息可靠性、系统性能,逐步探索高级特性以提升集成解决方案的质量和可维护性。
32 3
Java一分钟之-Spring Integration:企业级集成
|
19天前
|
人工智能 自然语言处理 安全
构建未来:AI驱动的自适应网络安全防御系统提升软件测试效率:自动化与持续集成的实践之路
【5月更文挑战第30天】 在数字化时代,网络安全已成为维护信息完整性、保障用户隐私和企业持续运营的关键。传统的安全防御手段,如防火墙和入侵检测系统,面对日益复杂的网络攻击已显得力不从心。本文提出了一种基于人工智能(AI)技术的自适应网络安全防御系统,该系统能够实时分析网络流量,自动识别潜在威胁,并动态调整防御策略以应对未知攻击。通过深度学习算法和自然语言处理技术的结合,系统不仅能够提高检测速度和准确性,还能自主学习和适应新型攻击模式,从而显著提升网络安全防御的效率和智能化水平。 【5月更文挑战第30天】 在快速迭代的软件开发周期中,传统的手动测试方法已不再适应现代高效交付的要求。本文探讨了如
|
19天前
|
Dart 前端开发 测试技术
移动应用开发的未来:跨平台框架与原生系统的融合深入理解软件测试中的持续集成与持续部署(CI/CD)
【5月更文挑战第30天】 在本文中,我们将深入探讨移动应用开发领域的最新趋势:跨平台开发框架与原生操作系统的融合。随着移动设备成为日常生活的核心,高效、灵活且性能卓越的应用程序需求日益增长。文章分析了当前主流的跨平台工具如React Native和Flutter,并探讨了它们如何与iOS和Android等原生系统相互作用,以及这种融合对开发者、用户和整个移动生态系统意味着什么。我们还将预测未来可能的技术发展,并提出相应的策略建议。
|
21天前
|
监控 测试技术 持续交付
构建高效持续集成系统的策略与实践
【5月更文挑战第28天】 在快速迭代的软件开发过程中,持续集成(CI)系统是确保代码质量和加速交付的关键。本文将探讨构建一个高效、可靠的CI系统的关键策略,并通过实际案例分析如何实现这些策略。我们将讨论自动化测试、容器化部署、监控和日志记录等主题,以及它们如何共同作用以提升开发流程的效率和稳定性。通过实施这些策略,团队可以显著减少集成问题,并缩短从开发到部署的时间。
26 2
|
21天前
|
运维 Kubernetes jenkins
构建高效自动化运维系统:基于容器技术的持续集成与持续部署实践
【5月更文挑战第28天】 在现代软件工程实践中,持续集成(CI)和持续部署(CD)已成为提升开发效率、确保产品质量的关键环节。本文旨在探讨如何利用容器技术构建一套高效、可靠的自动化运维系统,以支持敏捷开发流程和微服务架构。通过对Docker容器及Kubernetes集群管理工具的深入分析,我们提出了一种结合Jenkins实现自动化测试、构建与部署的完整解决方案,并讨论了其在现实业务中的应用效果和面临的挑战。
|
22天前
|
运维 监控 Linux
提升系统稳定性:Linux服务器性能监控与故障排查实践深入理解与实践:持续集成在软件测试中的应用
【5月更文挑战第27天】在互联网服务日益增长的今天,保障Linux服务器的性能和稳定性对于企业运维至关重要。本文将详细探讨Linux服务器性能监控的工具选择、故障排查流程以及优化策略,旨在帮助运维人员快速定位问题并提升系统的整体运行效率。通过实际案例分析,我们将展示如何利用系统资源监控、日志分析和性能调优等手段,有效预防和解决服务器性能瓶颈。
|
22天前
|
敏捷开发 监控 Devops
构建高效持续集成系统的关键要素
【5月更文挑战第27天】 在现代软件开发过程中,持续集成(CI)已成为确保代码质量和加快交付速度的重要实践。本文将探讨构建一个高效持续集成系统的关键要素,涵盖自动化构建、测试、部署以及监控等方面。我们将通过分析各要素的作用和实施策略,提供一套全面的指南,帮助开发团队优化其CI流程,提升软件项目的可靠性和效率。

热门文章

最新文章