记一次线上安全测试中误用父类属性导致数据污染的解决方案

简介: 在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports 的属性来记录这些开放的端口。在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports 中,以便在下次遇到相同端口时无需重复检测。然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports 理解为共享变量,这导致当两个不同的测试环境同

在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports 的属性来记录这些开放的端口。


在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports 中,以便在下次遇到相同端口时无需重复检测。


然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports 理解为共享变量,这导致当两个不同的测试环境同时进行安全测试时,数据相互污染,从而影响最终测试结果的准确性。


为了解决这个问题,需要重新设计变量 all_open_ports 的存储和访问方式,以确保在多线程环境下数据的独立性和一致性,接下来由博主为各位读者进行仔细讲解。

博文中的所有代码全部收集在博主的 GitHub 仓库中:http://github.com/sid10t/project-actual-code/tree/main

场景复现

先创建一个父类 Parent,定义一个类属性 all_open_ports 用来记录已经开放的端口,并创建一个方法 check_port() 来模拟端口检测,代码如下所示:

class Parent:
    all_open_ports = set()
    def __init__(self, args):
        self.all_open_ports.update(args.get("open_ports", []))
    def check_port(self, port):
        # 忽略端口扫描...
        if port not in self.all_open_ports:
            self.all_open_ports.add(port)
            print(f"{port} in all_open_ports, {self.all_open_ports}")
            pass

再创建一个子类 Child 继承父类,构造 scan() 方法来模拟安全测试过程,代码如下所示:

import threading
from parent import Parent
class Child(Parent):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")
    def scan(self):
        print(threading.current_thread().name, self.all_open_ports)
        self.check_port(self.port)
        pass

最后创建一个测试用例,实例化两个 Child 对象,并以多线程的方式运行对象方法 scan() 来进行场景复现,代码如下所示:

def test_thread():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    t1 = threading.Thread(target=c1.scan, name="Child_1")
    t1.start()
    t1.join()
    c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]})
    t2 = threading.Thread(target=c2.scan, name="Child_2")
    t2.start()
    t2.join()
    print("All tasks have finished!")

运行结果:

1719388492564.jpg

根因分析

造成上述问题的根本原因就是在多线程中 all_open_ports 可被当成共享变量使用,致使数据相互污染,从而影响最终测试结果的准确性。


因为 all_open_ports 是在父类中定义的一个类属性,这意味着它是类 Parent 的一部分,它被所有派生类(子类)所共享。通过这种方式,父类的所有子类都可以访问和更新 all_open_ports 属性。


每当子类的实例创建时,如果传递了 open_ports 参数,那么这些端口将被添加到 all_open_ports 集合中,并且在父类中的 check_port 方法中,判断给定端口 port 是否存在于 all_open_ports 集合中,如果不存在,则将端口添加到集合中。这样,所有子类实例都可以共享和更新这个属性。


现在我们修改部分代码,在打印时输出 all_open_ports 的地址来判断是否使用了同一变量,代码如下所示:

def scan(self):
    print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports))
    self.check_port(self.port)
    pass

运行结果:

1719388526729.jpg

那么有什么方法能解决当前的问题呢?

  • 重新初始化 all_open_ports;
  • 上下文管理 contextvar;
  • 线程本地变量 thread.local;

重新初始化 all_open_ports

重新初始化 all_open_ports 的方法是最快捷的,但是会有一个问题,重新初始化 all_open_ports 会使得每个 Child 对象都有自己独立的 all_open_ports 集合,而不会共享相同的集合,这会发生重复检测端口的情况,也就违背了一开始的设计初衷。


创建一个测试用例来观察一下当前的 all_open_ports 集合使用情况,代码如下所示:

def test_init_set():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]})
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c1.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c2.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)

运行结果:

1719388608447.jpg

根据运行结果可以发现, all_open_ports 集合在当前情况下可以被看做是共享变量,哪怕在不同的线程中,个 Child 对象都能共享 all_open_ports 集合。

这时候,修改父类 Parent 中的 __init__ 代码,使得 all_open_ports 集合在 __init__ 时重新初始化,代码如下所示:

def __init__(self, args):
    self.all_open_ports = set()
    self.all_open_ports.update(args.get("open_ports", []))

运行结果:

1719388628617.jpg

根据运行结果可以发现,c1 和 c2 中的 all_open_ports 是完全独立的集合,c1 向 all_open_ports 集合中的增加操作不会影响到 c2,这虽然避免了数据污染,但是会导致在 c1 检测过的端口还需要在 c2 重新进行检测,这与我们一开始设计 all_open_ports 集合来提升效率的想法背道而驰了。

上下文管理 contextvar

先说结论,好像不行,不知道是不是思路有问题,希望各位大神指点一下!

运行结果:

1719388659944.jpg

代码如下所示:

class ParentContext:
    all_open_ports = contextvars.ContextVar("all_open_ports", default=set())
    def __init__(self, args):
        open_ports = self.all_open_ports.get()
        open_ports.update(args.get("open_ports", []))
        self.all_open_ports.set(open_ports)
    def check_port(self, port):
        all_open_ports_ = self.all_open_ports.get()
        if port not in all_open_ports_:
            all_open_ports_.add(port)
            self.all_open_ports.set(all_open_ports_)
            print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")
class ChildContext(ParentContext):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")
    def scan(self, port=None):
        self.check_port(port or self.port)
        pass
def test_contextvars(open_ports, port):
    c1 = ChildContext({"port": port, "open_ports": open_ports})
    c1.scan()
if __name__ == '__main__':
    t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

线程本地变量 thread.local

threading.local() 是 Python 标准库中的一个类,它提供了一种在多线程环境下创建线程本地存储的机制。它允许每个线程都有自己独立的变量副本,这些变量在不同线程之间是相互隔离的,不会相互干扰。


当多个线程同时执行时,它们可以访问和修改各自的线程本地变量,而不会影响其他线程的变量。这对于需要在线程之间共享数据,但又需要保持数据独立性的情况非常有用。

接下来,我们创建父类 ParentLocal,并使用 threading.local() 来存储集合 all_open_ports,代码如下所示:

class ParentLocal:
    local = threading.local()
    def __init__(self, args):
        self.local.all_open_ports = getattr(self.local, "all_open_ports", set())
        self.local.all_open_ports.update(args.get("open_ports", []))
    def check_port(self, port):
        if port not in self.local.all_open_ports:
            self.local.all_open_ports.add(port)
            print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}")
    def print_prefix(self):
        return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"

在上述代码中,ParentLocal 类定义了初始化方法 __init__,通过 getattr() 函数来获取 self.local.all_open_ports 的值。如果 self.local.all_open_ports 不存在,则使用 set() 创建一个空的集合,并将其赋值给 self.local.all_open_ports。然后,我们使用 update() 方法将 args.get("open_ports", []) 中的端口添加到 self.local.all_open_ports 中。


通过使用 ParentLocal 类,我们可以在多线程环境中创建多个实例,并且每个实例都有自己独立的 all_open_ports 变量。这样,不同线程的实例之间的数据不会相互干扰。


而 Child 类与之前基本保持不变,代码如下所示:

class ChildLocal(ParentLocal):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")
    def scan(self, port=None):
        self.check_port(port or self.port)
        pass

在上述代码中,ChildLocal 类是继承自 ParentLocal 类的子类,通过继承关系它可以访问父类的 self.local.all_open_ports 集合。这使得 ChildLocal 实例可以在同一线程下共享数据,同时不会受到其他线程中的 ChildLocal 实例的影响。


编写测试代码如下所示:

def tset_local(open_ports, port):
    c1 = ChildLocal({"port": port, "open_ports": open_ports})
    c1.scan()
    args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)}
    print(threading.current_thread().name, args)
    c2 = ChildLocal(args)
    c2.scan()
    time.sleep(3)
    c1.scan(random.randint(8000, 9999))
if __name__ == '__main__':
    t1 = threading.Thread(target=tset_local, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=tset_local, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

运行结果:

1719388703087.jpg

如我们所料,Child1 和 Child2 线程中的 ChildLocal 实例相互之间共享 all_open_ports 集合的数据,但是不同线程之间的 ChildLocal 实例不能相互共享数据。


需要注意的是,threading.local() 对象在不同的线程中具有相同的 id 值,这是因为它们实际上是同一个对象的不同实例。每个线程都有自己独立的 threading.local() 对象,但它们共享相同的类定义。


当在不同的线程中创建 threading.local() 对象时,每个线程都会创建一个新的实例,但这些实例的类定义是相同的。因此,它们的 id 值是相同的。

后记

幸好我们及时发现了这个问题,并没有造成安全事故。现在我们将这次经历分享出来,希望能给其他开发团队带来启发,共同提高系统的安全性和稳定性。

作者:sidiot

链接:https://juejin.cn/post/7384326861964476425

相关文章
|
16天前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
13天前
|
数据挖掘 测试技术 项目管理
2025年测试用例管理看这一篇就够了 ----Codes 开源免费、全面的测试管理解决方案
Codes 是国内首款重新定义 SaaS 模式的开源项目管理平台,支持云端认证、本地部署、全部功能开放,并且对 30 人以下团队免费。它通过整合迭代、看板、度量和自动化等功能,简化测试协同工作,使敏捷测试更易于实施。并提供低成本的敏捷测试解决方案,如同步在线离线测试用例、流程化管理缺陷、低代码接口自动化测试和 CI/CD,以及基于迭代的测试管理和测试用时的成本计算等,践行敏捷测试。
2025年测试用例管理看这一篇就够了 ----Codes 开源免费、全面的测试管理解决方案
|
22天前
|
开发框架 .NET Java
C#集合数据去重的5种方式及其性能对比测试分析
C#集合数据去重的5种方式及其性能对比测试分析
34 11
|
23天前
|
开发框架 .NET Java
C#集合数据去重的5种方式及其性能对比测试分析
C#集合数据去重的5种方式及其性能对比测试分析
49 10
|
2月前
|
机器学习/深度学习 算法 UED
在数据驱动时代,A/B 测试成为评估机器学习项目不同方案效果的重要方法
在数据驱动时代,A/B 测试成为评估机器学习项目不同方案效果的重要方法。本文介绍 A/B 测试的基本概念、步骤及其在模型评估、算法改进、特征选择和用户体验优化中的应用,同时提供 Python 实现示例,强调其在确保项目性能和用户体验方面的关键作用。
50 6
|
2月前
|
机器学习/深度学习 算法 UED
在数据驱动时代,A/B 测试成为评估机器学习项目效果的重要手段
在数据驱动时代,A/B 测试成为评估机器学习项目效果的重要手段。本文介绍了 A/B 测试的基本概念、步骤及其在模型评估、算法改进、特征选择和用户体验优化中的应用,强调了样本量、随机性和时间因素的重要性,并展示了 Python 在 A/B 测试中的具体应用实例。
40 1
|
3月前
|
存储 测试技术 数据库
数据驱动测试和关键词驱动测试的区别
数据驱动测试 数据驱动测试或 DDT 也被称为参数化测试。
49 1
|
3月前
|
机器学习/深度学习 监控 计算机视觉
目标检测实战(八): 使用YOLOv7完成对图像的目标检测任务(从数据准备到训练测试部署的完整流程)
本文介绍了如何使用YOLOv7进行目标检测,包括环境搭建、数据集准备、模型训练、验证、测试以及常见错误的解决方法。YOLOv7以其高效性能和准确率在目标检测领域受到关注,适用于自动驾驶、安防监控等场景。文中提供了源码和论文链接,以及详细的步骤说明,适合深度学习实践者参考。
810 0
目标检测实战(八): 使用YOLOv7完成对图像的目标检测任务(从数据准备到训练测试部署的完整流程)
|
3月前
|
机器学习/深度学习 并行计算 数据可视化
目标分类笔记(二): 利用PaddleClas的框架来完成多标签分类任务(从数据准备到训练测试部署的完整流程)
这篇文章介绍了如何使用PaddleClas框架完成多标签分类任务,包括数据准备、环境搭建、模型训练、预测、评估等完整流程。
223 0
|
3月前
|
机器学习/深度学习 数据采集 算法
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
这篇博客文章介绍了如何使用包含多个网络和多种训练策略的框架来完成多目标分类任务,涵盖了从数据准备到训练、测试和部署的完整流程,并提供了相关代码和配置文件。
86 0
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)

热门文章

最新文章