在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports 的属性来记录这些开放的端口。
在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports 中,以便在下次遇到相同端口时无需重复检测。
然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports 理解为共享变量,这导致当两个不同的测试环境同时进行安全测试时,数据相互污染,从而影响最终测试结果的准确性。
为了解决这个问题,需要重新设计变量 all_open_ports 的存储和访问方式,以确保在多线程环境下数据的独立性和一致性,接下来由博主为各位读者进行仔细讲解。
博文中的所有代码全部收集在博主的 GitHub 仓库中:http://github.com/sid10t/project-actual-code/tree/main
场景复现
先创建一个父类 Parent,定义一个类属性 all_open_ports 用来记录已经开放的端口,并创建一个方法 check_port() 来模拟端口检测,代码如下所示:
class Parent: all_open_ports = set() def __init__(self, args): self.all_open_ports.update(args.get("open_ports", [])) def check_port(self, port): # 忽略端口扫描... if port not in self.all_open_ports: self.all_open_ports.add(port) print(f"{port} in all_open_ports, {self.all_open_ports}") pass
再创建一个子类 Child 继承父类,构造 scan() 方法来模拟安全测试过程,代码如下所示:
import threading from parent import Parent class Child(Parent): def __init__(self, args): super().__init__(args) self.port = args.get("port") def scan(self): print(threading.current_thread().name, self.all_open_ports) self.check_port(self.port) pass
最后创建一个测试用例,实例化两个 Child 对象,并以多线程的方式运行对象方法 scan() 来进行场景复现,代码如下所示:
def test_thread(): c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]}) t1 = threading.Thread(target=c1.scan, name="Child_1") t1.start() t1.join() c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]}) t2 = threading.Thread(target=c2.scan, name="Child_2") t2.start() t2.join() print("All tasks have finished!")
运行结果:
根因分析
造成上述问题的根本原因就是在多线程中 all_open_ports 可被当成共享变量使用,致使数据相互污染,从而影响最终测试结果的准确性。
因为 all_open_ports 是在父类中定义的一个类属性,这意味着它是类 Parent 的一部分,它被所有派生类(子类)所共享。通过这种方式,父类的所有子类都可以访问和更新 all_open_ports 属性。
每当子类的实例创建时,如果传递了 open_ports 参数,那么这些端口将被添加到 all_open_ports 集合中,并且在父类中的 check_port 方法中,判断给定端口 port 是否存在于 all_open_ports 集合中,如果不存在,则将端口添加到集合中。这样,所有子类实例都可以共享和更新这个属性。
现在我们修改部分代码,在打印时输出 all_open_ports 的地址来判断是否使用了同一变量,代码如下所示:
def scan(self): print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports)) self.check_port(self.port) pass
运行结果:
那么有什么方法能解决当前的问题呢?
- 重新初始化 all_open_ports;
- 上下文管理 contextvar;
- 线程本地变量 thread.local;
重新初始化 all_open_ports
重新初始化 all_open_ports 的方法是最快捷的,但是会有一个问题,重新初始化 all_open_ports 会使得每个 Child 对象都有自己独立的 all_open_ports 集合,而不会共享相同的集合,这会发生重复检测端口的情况,也就违背了一开始的设计初衷。
创建一个测试用例来观察一下当前的 all_open_ports 集合使用情况,代码如下所示:
def test_init_set(): c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]}) c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]}) print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports) c1.scan() print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports) c2.scan() print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
运行结果:
根据运行结果可以发现, all_open_ports 集合在当前情况下可以被看做是共享变量,哪怕在不同的线程中,个 Child 对象都能共享 all_open_ports 集合。
这时候,修改父类 Parent 中的 __init__ 代码,使得 all_open_ports 集合在 __init__ 时重新初始化,代码如下所示:
def __init__(self, args): self.all_open_ports = set() self.all_open_ports.update(args.get("open_ports", []))
运行结果:
根据运行结果可以发现,c1 和 c2 中的 all_open_ports 是完全独立的集合,c1 向 all_open_ports 集合中的增加操作不会影响到 c2,这虽然避免了数据污染,但是会导致在 c1 检测过的端口还需要在 c2 重新进行检测,这与我们一开始设计 all_open_ports 集合来提升效率的想法背道而驰了。
上下文管理 contextvar
先说结论,好像不行,不知道是不是思路有问题,希望各位大神指点一下!
运行结果:
代码如下所示:
class ParentContext: all_open_ports = contextvars.ContextVar("all_open_ports", default=set()) def __init__(self, args): open_ports = self.all_open_ports.get() open_ports.update(args.get("open_ports", [])) self.all_open_ports.set(open_ports) def check_port(self, port): all_open_ports_ = self.all_open_ports.get() if port not in all_open_ports_: all_open_ports_.add(port) self.all_open_ports.set(all_open_ports_) print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}") class ChildContext(ParentContext): def __init__(self, args): super().__init__(args) self.port = args.get("port") def scan(self, port=None): self.check_port(port or self.port) pass def test_contextvars(open_ports, port): c1 = ChildContext({"port": port, "open_ports": open_ports}) c1.scan() if __name__ == '__main__': t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,)) t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,)) t1.start() t2.start() t1.join() t2.join()
线程本地变量 thread.local
threading.local() 是 Python 标准库中的一个类,它提供了一种在多线程环境下创建线程本地存储的机制。它允许每个线程都有自己独立的变量副本,这些变量在不同线程之间是相互隔离的,不会相互干扰。
当多个线程同时执行时,它们可以访问和修改各自的线程本地变量,而不会影响其他线程的变量。这对于需要在线程之间共享数据,但又需要保持数据独立性的情况非常有用。
接下来,我们创建父类 ParentLocal,并使用 threading.local() 来存储集合 all_open_ports,代码如下所示:
class ParentLocal: local = threading.local() def __init__(self, args): self.local.all_open_ports = getattr(self.local, "all_open_ports", set()) self.local.all_open_ports.update(args.get("open_ports", [])) def check_port(self, port): if port not in self.local.all_open_ports: self.local.all_open_ports.add(port) print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}") def print_prefix(self): return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"
在上述代码中,ParentLocal 类定义了初始化方法 __init__,通过 getattr() 函数来获取 self.local.all_open_ports 的值。如果 self.local.all_open_ports 不存在,则使用 set() 创建一个空的集合,并将其赋值给 self.local.all_open_ports。然后,我们使用 update() 方法将 args.get("open_ports", []) 中的端口添加到 self.local.all_open_ports 中。
通过使用 ParentLocal 类,我们可以在多线程环境中创建多个实例,并且每个实例都有自己独立的 all_open_ports 变量。这样,不同线程的实例之间的数据不会相互干扰。
而 Child 类与之前基本保持不变,代码如下所示:
class ChildLocal(ParentLocal): def __init__(self, args): super().__init__(args) self.port = args.get("port") def scan(self, port=None): self.check_port(port or self.port) pass
在上述代码中,ChildLocal 类是继承自 ParentLocal 类的子类,通过继承关系它可以访问父类的 self.local.all_open_ports 集合。这使得 ChildLocal 实例可以在同一线程下共享数据,同时不会受到其他线程中的 ChildLocal 实例的影响。
编写测试代码如下所示:
def tset_local(open_ports, port): c1 = ChildLocal({"port": port, "open_ports": open_ports}) c1.scan() args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)} print(threading.current_thread().name, args) c2 = ChildLocal(args) c2.scan() time.sleep(3) c1.scan(random.randint(8000, 9999)) if __name__ == '__main__': t1 = threading.Thread(target=tset_local, name="Child_1", args=([80, 3306, 5000], 5001,)) t2 = threading.Thread(target=tset_local, name="Child_2", args=([22, 3306, 6000], 6001,)) t1.start() t2.start() t1.join() t2.join()
运行结果:
如我们所料,Child1 和 Child2 线程中的 ChildLocal 实例相互之间共享 all_open_ports 集合的数据,但是不同线程之间的 ChildLocal 实例不能相互共享数据。
需要注意的是,threading.local() 对象在不同的线程中具有相同的 id 值,这是因为它们实际上是同一个对象的不同实例。每个线程都有自己独立的 threading.local() 对象,但它们共享相同的类定义。
当在不同的线程中创建 threading.local() 对象时,每个线程都会创建一个新的实例,但这些实例的类定义是相同的。因此,它们的 id 值是相同的。
后记
幸好我们及时发现了这个问题,并没有造成安全事故。现在我们将这次经历分享出来,希望能给其他开发团队带来启发,共同提高系统的安全性和稳定性。
作者:sidiot