前言
本文是一篇笔记,原文作者是@pry0cc(已经50多岁了),本文内容主要是对原文相关内容做的笔记,出于易读性考虑,对部分字句有所删改。
本文所涉及的技术/工具包括: ProjectDiscovery 的工具, shell以及python的flask,MongoDB,Redis,代码见文末
整体思路
利用ProjectDiscovery团队开发的工具搭建一个攻击面监控系统,主要有扫描器、调度器、导入器、队列和API
- 扫描器: 扫描目标并将结果以JSON格式输出到一个目录。
- 导入器:解析并导入扫描器产生的JSON输出文件.
- 队列:由一个Redis列表组成,当新的任务被添加到队列中时,任务将触发扫描脚本。
- API:将任务添加到队列中并开始扫描,同时从MongoDB数据库中查询导入的数据。
整个流程如下图所示:
扫描器
#!/bin/bash echo "Scanning $1" target_id="$1" ppath="$(pwd)" scan_id="$target_id-$(date +%s)" scan_path="$ppath/scans/$scan_id" raw_path="$ppath/rawdata/$target_id/" threads=13 notify="slack" mkdir -p "$scan_path" mkdir -p "$raw_path" cd "$scan_path" cp "$ppath/scope/$1" "$scan_path/scope.txt" echo "$ppath" cat scope.txt | subfinder -json -o subs.json | jq --unbuffered -r '.host' | dnsx -json -o dnsx.json | jq --unbuffered -r '.host' | httpx -json -o http.json | jq --unbuffered -r '.url' | nuclei -o nuclei.json -json -severity low -t ~/nuclei-templates --stats | jq -c --unbuffered 'del(.timestamp) | del(."curl-command")' | anew "$raw_path/nuclei.json" | notify -pc "$ppath/config/notify.yaml" -mf "New vuln found! {{data}}" cat dnsx.json | jq -r ".host" | tlsx -json -o tls.json find "$scan_path" -type f -name "*.json" -exec "$ppath/bin/import.py" {} "$scan_id" "$target_id" \; cat subs.json | jq -r '.host' | anew "$raw_path/hosts.txt" > "$raw_path/hosts.txt.new" notify -bulk -i "$raw_path/hosts.txt.new" -pc "$ppath/config/notify.yaml" -mf "New Hostnames Found! {{data}}" cat http.json | jq -r '.url' | anew "$raw_path/urls.txt" > "$raw_path/urls.txt.new" notify -bulk -i "$raw_path/urls.txt.new" -pc "$ppath/config/notify.yaml" -mf "New URLs found! {{data}}" cat dns.json | jq -r '.host' | anew "$raw_path/resolved.txt" cat dns.json | jq -r '.a?[]?' | anew "$raw_path/ips.txt"
脚本运行的时候需要接受一个参数target_id,下面具体来看看:
echo "Scanning $1" target_id="$1" ppath="$(pwd)" scan_id="$target_id-$(date +%s)" scan_path="$ppath/scans/$scan_id" raw_path="$ppath/rawdata/$target_id/" threads=13 notify="slack" mkdir -p "$scan_path" mkdir -p "$raw_path" cd "$scan_path" cp "$ppath/scope/$1" "$scan_path/scope.txt" echo "$ppath"
上面这部分主要是设置基本变量,包含scan_id和scan_path;同时将范围信息复制到扫描目录中。
后面是扫描部分,主要是运行一些工具,使用import.py脚本(后面会介绍)导入到MongoDB,并使用Anew检查是否有新域名,如果有则会通知。
下面会仔细分析后面的脚本:
cat scope.txt | subfinder -json -o subs.json
获取根域名,利用subfinder
进行子域名暴力破解,将结果以JSON格式输出;
jq --unbuffered -r '.host'
从JSON中提取主机名,unbuffered这个参数是说每个JSON对象都会被输出(使用场景:利用管道将数据输送到jq,并把jq的输出输送到其他地方,这种场景由于数据量比较大,故过程可能会比较慢);
dnsx -json -o dnsx.json
DNSx执行DNS查找并以JSON格式输出
jq --unbuffered -r '.host'
提取被DNSx解析的主机名
httpx -json -o http.json
对已解析的主机名利用HTTPx进行枚举,将发现的网络服务器以JSON形式输出。
jq —unbuffered -r ‘.url’
从上面输出的JSON中提取出url
nuclei -o nuclei.json -json -severity low -t --stats
对上面发现的url,用Nuclei进行扫描(注意:这里最好用自定义的模板,否则会和别人撞车)
jq -c --unbuffered 'del(.timestamp) | del(."curl-command")'
使用jq,将上面输出的数据(nuclei的扫描结果)中的timestamp(时间戳)和curl-command删除,这样做的目的是为了和后面的anew配合过滤掉重复的漏洞
anew "$raw_path/nuclei.json"
将新的记录(结果)追加到nuclei.json中
notify -pc "$ppath/config/notify.yaml" -mf "New vuln found! {{data}}"
提醒我们Nuclei有新发现的漏洞
find "$scan_path" -type f -name "*.json" -exec "$ppath/bin/import.py" {} "$scan_id" "$target_id" \;
在扫描目录中找到所有的JSON文件,并使用import.py脚本导入它们,后面会讲解
cat subs.json | jq -r '.host' | anew "$raw_path/hosts.txt" > "$raw_path/hosts.txt.new" | notify -bulk -i "$raw_path/hosts.txt.new" -pc "$ppath/config/notify.yaml" -mf "New Hostnames Found! {{data}}"
主要逻辑就是使用Anew发现subs.json(这个文件是动态变化的)文件中的新主机,并提醒我们
数据库和导入器/解析器(Importer/Parser)
使用Docker开启mongo和redis服务,主要是要运行MongoDB数据库和一个Redis队列
#!/bin/bash sudo docker run -p 127.0.0.1:27017:27017 --name mongo -d mongo sudo docker run -p 127.0.0.1:6379:6379 --name redis -d redis
下面看看导入器(import.py):
运行这个脚本需要输入三个参数: filename,scan_id,target_id
#!/usr/bin/env python3 import sys import json from pymongo import MongoClient client = MongoClient("mongodb://mongo:27017") db = client.asm filename = sys.argv[1] scan_id = sys.argv[2] collection_name = filename.split('.')[0].split("/")[-1] collection = db[collection_name] target_id = sys.argv[3] scan_meta = {'scan_id':scan_id, 'target_id':target_id} def jsonf_to_lines(filename): parsed_lines = [] with open(filename, 'r') as reader: for line in reader.read().split('\n'): try: parsed = json.loads(line) parsed["scan_id"] = scan_id parsed["target_id"] = target_id parsed_lines.append(parsed) except Exception as err: print("Whoops %s", err) return parsed_lines collection.insert_many(jsonf_to_lines(filename)) if collection_name == "subs": db.scans.insert_one(scan_meta)
使用这个脚本,我们可以导入由前面工具生成的JSON文件,包括子域、httpx输出、dnsx输出、nuclei输出和tlsx输出。
其逻辑为:迭代每个文件中的每一行,解析JSON,然后将其批量导入MongoDB。在本文的后面,将建立一个API,使我们能够从数据库中查询这些信息;
这里导入器的主要作用是将前面工具生成的数据做一个解析,而后导入至MongoDB数据库中。
队列和worker(任务对象)
下面是队列和worker脚本。队列是用Redis的列表实现的。
worker.py
#!/usr/bin/env python3 import redis import time import subprocess import threading r = redis.Redis(host='redis', port=6379, db=0) def scan(target_id): print(target_id) subprocess.call(['sh', 'bin/scanner.sh', target_id]) while True: res = r.rpop('queue') if res != None: x = threading.Thread(target=scan, args=(res,)) x.start() time.sleep(1)
该脚本使用RPOP方法查询名为queue
的Redis队列,RPOP将队列中的最后一个项目删除(队列特点)并返回;
这意味着理论上,我们可以在多个系统上运行多个工作节点 我们可以使用像axiom (https://github.com/pry0cc/axiom)这样的解决方案;
此时,推送到队列中的是target_id这个变量所指代的,就是前边扫描脚本中scan_path那里的,简单来说,就是在scope/
目录中添加一个文件,文件的名称为target_id
这里举个例子,在我们当前的目录中,有几个scope文件,包括AirBnb或AT&T。
这里worker.py的作用是:使得扫描过程可以在多个系统上面运行
调度器
调度器作用是定期运行扫描,可以利用一个调度脚本,如下所示:
#!/usr/bin/env python3 from datetime import datetime import redis from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger r = redis.Redis(host='redis', port=6379, db=0) scheduler = BlockingScheduler() @scheduler.scheduled_job(IntervalTrigger(hours=1)) def queue_job(): print('queuing!') r.rpush('queue','airbnb') scheduler.start()
在这个脚本中,使用apscheduler
python包,可以定义时间间隔来安排使用Redis队列手动添加一个作业到队列中!
这里定义它每小时推送一次(时间可以自己定义)
用API把所有的东西整合起来
注意:这里最重要的阶段!
#!/usr/bin/env python3 from pymongo import MongoClient import redis from flask import Flask from flask import request from flask import jsonify app = Flask(__name__) r = redis.Redis(host='redis', port=6379, db=0) client = MongoClient("mongodb://mongo:27017") db = client.asm @app.route("/api/<target>/<datatype>") def get_subdomains(target, datatype): scan_id = request.args.get("scan_id") query = {'target_id':target} if scan_id != None: query['scan_id'] = scan_id collection = db[datatype] res = collection.find(query) data = [] for row in res: row.pop('_id') data.append(row) return jsonify(data) @app.route("/start_scan") def start_scan(target): req = request.args.get('target') r.rpush('queue', req) data = {"message":"Scan launched!"} return jsonify(data)
这里的脚本主要是用的Flask框架写的;
注意看get_subdomains函数:
主要逻辑为:
将target_id和数据类型转为Mongo查询,使用request.args.get()函数保证这里的结果为一次的扫描而不是所有的结果;
执行查询后,将从数据库返回的结果进行迭代,并删除_id字段(这些字段无意义)
最后,从数据库返回的结果为JSON对象,而后将这些对象返回给用户
使用flask run来运行服务器:
一旦服务器开启,就可以用curl查询,如何开始扫描呢?---->如果想添加一个目标,只需要添加一个文件: scope/<target_id>
。然后使用这个target ID,也可以通过HTTP开始扫描;
curl -s "http://127.0.0.1:5000/start_scan?target=airbnb"
下面将开启扫描,worker也将开始工作,在scanner.sh脚本中设置的导入的JSON文件的名称为url中的路径((本例中为/subs
),
在该目录下生成的任何其他JSON文件现在都会被导入,除此以外,还有以下路由和数据源供我们使用:
/api/<target>/subs /api/<target>/http /api/<target>/dnsx /api/<target>/tls /api/<target>/scans
此外,还可以在任何URL上添加?scan_id=<scan_id>
,并限制结果:
curl -s ”http://127.0.0.1:5000/api/airbnb/subs?scan_id=airbnb-1657556975“
可以使用以下命令来解析数据:
curl -s "http://127.0.0.1:5000/api/army/scans" curl -s "http://127.0.0.1:5000/api/army/subs" | jq -r '.[].host' | anew curl -s "http://127.0.0.1:5000/api/army/subs" | jq -r '.[].source' | sort | uniq -c | sort -rh
curl -s "http://127.0.0.1:5000/api/army/dnsx" | jq -r '.[].a?[]?' | anew curl -s "http://127.0.0.1:5000/api/army/dnsx" | jq -r '.[].a?[]?' | anew | ipinfo bulk curl -s "http://127.0.0.1:5000/api/army/dnsx" | jq -r '.[].a?[]?' | anew | ipinfo summarize curl -s "http://127.0.0.1:5000/api/army/dnsx" | jq -r ".[].cname?[]?" curl -s "http://127.0.0.1:5000/api/army/dnsx?scan_id=army-1657627777" | jq -r '.[].host' curl -s "http://127.0.0.1:5000/api/army/http" | jq -r '.[].url' curl -s "http://127.0.0.1:5000/api/army/http" | jq -r '.[] | [.url,."status-code",.title] | @csv' | csvtomd | batcat
curl -s "http://127.0.0.1:5000/api/army/nuclei" | jq '.[] | [.info.name, .template, .info.severity] | @csv' -r | anew
curl -s "http://127.0.0.1:5000/api/army/tls" | jq -r '.[]."subject-dn"' curl -s "http://127.0.0.1:5000/api/army/tls" | jq -r '.[]."subject-cn"'
结论
这个作者使用这个系统发现了一堆漏洞。效果还是可以的;
当然,我们主要是学习这个作者的思路:
通过API(主模块)开启flask服务之后,通过与用户交互开启多进程队列,开始在多系统中开启扫描, 扫描出的结果被导入器解析进而被导入至MongoDB数据库中,主模块(API)可以获取数据库中查询结果;
此外,还有一些地方值得我们思考,比如:
- 如何写出自定义的nuclei模板
- 扫描过程还可以怎么优化(笔者这里给出几点可以思考的方向,主要还是信息搜集方面)
- 本文作者在扫描过程中采用的是主动信息搜集
- 本文作者有没考虑泛解析等问题
参考
https://github.com/pry0cc/pdiscovery-bot
https://blog.projectdiscovery.io/asm-platform-using-projectdiscovery-tools/
https://github.com/projectdiscovery