记一次线上安全测试中误用父类属性导致数据污染的解决方案

简介: 在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports 的属性来记录这些开放的端口。在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports 中,以便在下次遇到相同端口时无需重复检测。然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports 理解为共享变量,这导致当两个不同的测试环境同

在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports 的属性来记录这些开放的端口。


在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports 中,以便在下次遇到相同端口时无需重复检测。


然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports 理解为共享变量,这导致当两个不同的测试环境同时进行安全测试时,数据相互污染,从而影响最终测试结果的准确性。


为了解决这个问题,需要重新设计变量 all_open_ports 的存储和访问方式,以确保在多线程环境下数据的独立性和一致性,接下来由博主为各位读者进行仔细讲解。

博文中的所有代码全部收集在博主的 GitHub 仓库中:http://github.com/sid10t/project-actual-code/tree/main

场景复现

先创建一个父类 Parent,定义一个类属性 all_open_ports 用来记录已经开放的端口,并创建一个方法 check_port() 来模拟端口检测,代码如下所示:

class Parent:
    all_open_ports = set()
    def __init__(self, args):
        self.all_open_ports.update(args.get("open_ports", []))
    def check_port(self, port):
        # 忽略端口扫描...
        if port not in self.all_open_ports:
            self.all_open_ports.add(port)
            print(f"{port} in all_open_ports, {self.all_open_ports}")
            pass

再创建一个子类 Child 继承父类,构造 scan() 方法来模拟安全测试过程,代码如下所示:

import threading
from parent import Parent
class Child(Parent):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")
    def scan(self):
        print(threading.current_thread().name, self.all_open_ports)
        self.check_port(self.port)
        pass

最后创建一个测试用例,实例化两个 Child 对象,并以多线程的方式运行对象方法 scan() 来进行场景复现,代码如下所示:

def test_thread():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    t1 = threading.Thread(target=c1.scan, name="Child_1")
    t1.start()
    t1.join()
    c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]})
    t2 = threading.Thread(target=c2.scan, name="Child_2")
    t2.start()
    t2.join()
    print("All tasks have finished!")

运行结果:

1719388492564.jpg

根因分析

造成上述问题的根本原因就是在多线程中 all_open_ports 可被当成共享变量使用,致使数据相互污染,从而影响最终测试结果的准确性。


因为 all_open_ports 是在父类中定义的一个类属性,这意味着它是类 Parent 的一部分,它被所有派生类(子类)所共享。通过这种方式,父类的所有子类都可以访问和更新 all_open_ports 属性。


每当子类的实例创建时,如果传递了 open_ports 参数,那么这些端口将被添加到 all_open_ports 集合中,并且在父类中的 check_port 方法中,判断给定端口 port 是否存在于 all_open_ports 集合中,如果不存在,则将端口添加到集合中。这样,所有子类实例都可以共享和更新这个属性。


现在我们修改部分代码,在打印时输出 all_open_ports 的地址来判断是否使用了同一变量,代码如下所示:

def scan(self):
    print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports))
    self.check_port(self.port)
    pass

运行结果:

1719388526729.jpg

那么有什么方法能解决当前的问题呢?

  • 重新初始化 all_open_ports;
  • 上下文管理 contextvar;
  • 线程本地变量 thread.local;

重新初始化 all_open_ports

重新初始化 all_open_ports 的方法是最快捷的,但是会有一个问题,重新初始化 all_open_ports 会使得每个 Child 对象都有自己独立的 all_open_ports 集合,而不会共享相同的集合,这会发生重复检测端口的情况,也就违背了一开始的设计初衷。


创建一个测试用例来观察一下当前的 all_open_ports 集合使用情况,代码如下所示:

def test_init_set():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]})
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c1.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c2.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)

运行结果:

1719388608447.jpg

根据运行结果可以发现, all_open_ports 集合在当前情况下可以被看做是共享变量,哪怕在不同的线程中,个 Child 对象都能共享 all_open_ports 集合。

这时候,修改父类 Parent 中的 __init__ 代码,使得 all_open_ports 集合在 __init__ 时重新初始化,代码如下所示:

def __init__(self, args):
    self.all_open_ports = set()
    self.all_open_ports.update(args.get("open_ports", []))

运行结果:

1719388628617.jpg

根据运行结果可以发现,c1 和 c2 中的 all_open_ports 是完全独立的集合,c1 向 all_open_ports 集合中的增加操作不会影响到 c2,这虽然避免了数据污染,但是会导致在 c1 检测过的端口还需要在 c2 重新进行检测,这与我们一开始设计 all_open_ports 集合来提升效率的想法背道而驰了。

上下文管理 contextvar

先说结论,好像不行,不知道是不是思路有问题,希望各位大神指点一下!

运行结果:

1719388659944.jpg

代码如下所示:

class ParentContext:
    all_open_ports = contextvars.ContextVar("all_open_ports", default=set())
    def __init__(self, args):
        open_ports = self.all_open_ports.get()
        open_ports.update(args.get("open_ports", []))
        self.all_open_ports.set(open_ports)
    def check_port(self, port):
        all_open_ports_ = self.all_open_ports.get()
        if port not in all_open_ports_:
            all_open_ports_.add(port)
            self.all_open_ports.set(all_open_ports_)
            print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")
class ChildContext(ParentContext):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")
    def scan(self, port=None):
        self.check_port(port or self.port)
        pass
def test_contextvars(open_ports, port):
    c1 = ChildContext({"port": port, "open_ports": open_ports})
    c1.scan()
if __name__ == '__main__':
    t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

线程本地变量 thread.local

threading.local() 是 Python 标准库中的一个类,它提供了一种在多线程环境下创建线程本地存储的机制。它允许每个线程都有自己独立的变量副本,这些变量在不同线程之间是相互隔离的,不会相互干扰。


当多个线程同时执行时,它们可以访问和修改各自的线程本地变量,而不会影响其他线程的变量。这对于需要在线程之间共享数据,但又需要保持数据独立性的情况非常有用。

接下来,我们创建父类 ParentLocal,并使用 threading.local() 来存储集合 all_open_ports,代码如下所示:

class ParentLocal:
    local = threading.local()
    def __init__(self, args):
        self.local.all_open_ports = getattr(self.local, "all_open_ports", set())
        self.local.all_open_ports.update(args.get("open_ports", []))
    def check_port(self, port):
        if port not in self.local.all_open_ports:
            self.local.all_open_ports.add(port)
            print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}")
    def print_prefix(self):
        return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"

在上述代码中,ParentLocal 类定义了初始化方法 __init__,通过 getattr() 函数来获取 self.local.all_open_ports 的值。如果 self.local.all_open_ports 不存在,则使用 set() 创建一个空的集合,并将其赋值给 self.local.all_open_ports。然后,我们使用 update() 方法将 args.get("open_ports", []) 中的端口添加到 self.local.all_open_ports 中。


通过使用 ParentLocal 类,我们可以在多线程环境中创建多个实例,并且每个实例都有自己独立的 all_open_ports 变量。这样,不同线程的实例之间的数据不会相互干扰。


而 Child 类与之前基本保持不变,代码如下所示:

class ChildLocal(ParentLocal):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")
    def scan(self, port=None):
        self.check_port(port or self.port)
        pass

在上述代码中,ChildLocal 类是继承自 ParentLocal 类的子类,通过继承关系它可以访问父类的 self.local.all_open_ports 集合。这使得 ChildLocal 实例可以在同一线程下共享数据,同时不会受到其他线程中的 ChildLocal 实例的影响。


编写测试代码如下所示:

def tset_local(open_ports, port):
    c1 = ChildLocal({"port": port, "open_ports": open_ports})
    c1.scan()
    args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)}
    print(threading.current_thread().name, args)
    c2 = ChildLocal(args)
    c2.scan()
    time.sleep(3)
    c1.scan(random.randint(8000, 9999))
if __name__ == '__main__':
    t1 = threading.Thread(target=tset_local, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=tset_local, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

运行结果:

1719388703087.jpg

如我们所料,Child1 和 Child2 线程中的 ChildLocal 实例相互之间共享 all_open_ports 集合的数据,但是不同线程之间的 ChildLocal 实例不能相互共享数据。


需要注意的是,threading.local() 对象在不同的线程中具有相同的 id 值,这是因为它们实际上是同一个对象的不同实例。每个线程都有自己独立的 threading.local() 对象,但它们共享相同的类定义。


当在不同的线程中创建 threading.local() 对象时,每个线程都会创建一个新的实例,但这些实例的类定义是相同的。因此,它们的 id 值是相同的。

后记

幸好我们及时发现了这个问题,并没有造成安全事故。现在我们将这次经历分享出来,希望能给其他开发团队带来启发,共同提高系统的安全性和稳定性。

作者:sidiot

链接:https://juejin.cn/post/7384326861964476425

相关文章
|
3月前
|
测试技术 API C#
C#使用Bogus生成测试数据
C#使用Bogus生成测试数据
54 1
|
6天前
|
JavaScript 测试技术 API
常见的 Vue 3 单元测试问题及解决方案
常见的 Vue 3 单元测试问题及解决方案
20 2
|
30天前
|
存储 测试技术 数据库
数据驱动测试和关键词驱动测试的区别
数据驱动测试 数据驱动测试或 DDT 也被称为参数化测试。
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
38 4
|
1月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
50 1
|
1月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
75 1
|
1月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
33 2
|
1月前
|
存储 监控 网络安全
内网渗透测试基础——敏感数据的防护
内网渗透测试基础——敏感数据的防护
|
1月前
|
SQL 关系型数据库 MySQL
SQL批量插入测试数据的几种方法?
SQL批量插入测试数据的几种方法?
89 1
|
1月前
|
存储 SQL 分布式计算
大数据-135 - ClickHouse 集群 - 数据类型 实际测试
大数据-135 - ClickHouse 集群 - 数据类型 实际测试
35 0