Exploit template enhancement

This commit is contained in:
2025-12-14 12:51:41 +01:00
parent 75b845a74f
commit 91dcd50350
2 changed files with 140 additions and 48 deletions

20
sqli.py
View File

@@ -157,6 +157,25 @@ class ReflectedSQLi(SQLi, ABC):
return rows
@classmethod
def guess_reflected_columns(cls, callback):
data = None
column_count = 1
while data is None:
query_columns = list(map(lambda c: f"'column-{c}-sqli'", range(column_count)))
query_str = cls.build_query(query_columns)
data = callback(query_str) # should return some kind of text for a given query
if not data:
column_count += 1
continue
reflected_columns = []
for c in range(column_count):
column_name = f"'column-{c}-sqli'"
reflected_columns.append(str if column_name in data else None) # how to guess the type (str/int)?
return reflected_columns
# todo: extract_multiple with columns as dict (name -> type), e.g. extract_multiple({"id": int, "name": str})
class BlindSQLi(SQLi, ABC):
@@ -238,7 +257,6 @@ class BlindSQLi(SQLi, ABC):
return cur_str
class PostgreSQLi(SQLi, ABC):
def get_database_version(self, verbose=False):
return self.extract_string("VERSION()", verbose=verbose)

View File

@@ -1,23 +1,40 @@
#!/usr/bin/env python
import re
import sys
import json
import argparse
import urllib.parse
def generate_template(base_url, features):
ip_address = "util.get_address()"
for feature in features:
if feature.lower().startswith("ip_address="):
ip_address = "'" + feature.split("=")[1] + "'"
# we could all need that
imports = [
"os", "io", "re", "sys",
"json", "time", "base64", "requests",
"subprocess", "urllib.parse"
]
partial_imports = {
"bs4": ["BeautifulSoup"],
"hackingscripts": ["util", "rev_shell"],
"urllib3.exceptions": ["InsecureRequestWarning"]
}
main_code = []
methods = []
ip_address_arg = next(filter(lambda f: re.match(r"ip_address=(.*)", f), features), None)
ip_address = "util.get_address()" if not ip_address_arg else "'" + ip_address_arg[1] + "'"
variables = {
"IP_ADDRESS": ip_address,
"BASE_URL": f'"{base_url}" if "LOCAL" not in sys.argv else "http://127.0.0.1:1337"',
"PROXIES": json.dumps({"http":"http://127.0.0.1:8080", "https":"http://127.0.0.1:8080"})
"BASE_URL": f'"{base_url}" if "LOCAL" not in sys.argv else "http://127.0.0.1:1337"'
}
if "proxies" in features or "burp" in features:
proxy_arg = next(filter(lambda f: re.match(r"proxy=(.*)", f), features), None)
if proxy_arg or "burp" in features:
proxy_url = "http://127.0.0.1:8080" if not proxy_arg else proxy_arg[1]
variables["PROXIES"] = json.dumps({"http": proxy_url, "https": proxy_url})
proxy = """
if \"proxies\" not in kwargs:
kwargs[\"proxies\"] = PROXIES
@@ -34,8 +51,8 @@ def generate_template(base_url, features):
else:
vhost_param = ""
full_url = "BASE_URL + uri"
request_method = f"""def request(method, uri{vhost_param}, **kwargs):
methods.insert(0, f"""def request(method, uri{vhost_param}, **kwargs):
if not uri.startswith("/") and uri != "":
uri = "/" + uri
@@ -52,25 +69,12 @@ def generate_template(base_url, features):
{proxy}
url = {full_url}
return client.request(method, url, **kwargs)
"""
methods = [request_method]
if "login" in features or "account" in features:
variables["USERNAME"] = '"Blindhero"'
variables["PASSWORD"] = '"test1234"'
methods.append("""
def login(username, password):
session = requests.Session()
res = request("POST", "/login", data={"username": username, "password": password}, session=session)
if res.status_code != 200:
print("[-] Error logging in")
exit()
return session
""")
if "register" in features or "account" in features:
main_code.append("""if not register(USERNAME, PASSWORD):
exit(1)
""")
variables["USERNAME"] = '"Blindhero"'
variables["PASSWORD"] = '"test1234"'
methods.append("""
@@ -83,11 +87,71 @@ def register(username, password):
return True
""")
main = """
if "login" in features or "account" in features:
main_code.append("""session = login(USERNAME, PASSWORD)
if not session:
exit(1)
""")
variables["USERNAME"] = '"username"'
variables["PASSWORD"] = '"password"'
methods.append("""
def login(username, password):
session = requests.Session()
res = request("POST", "/login", data={"username": username, "password": password}, session=session)
if res.status_code != 200:
print("[-] Error logging in")
exit()
return session
""")
if "sqli" in features:
partial_imports["hackingscripts.sqli"] = ["MySQLi", "PostgreSQLi", "BlindSQLi", "ReflectedSQLi"]
methods.append("""
class ReflectedSQLiPoC(MySQLi, ReflectedSQLi):
def __init__(self):
# TODO: specify reflected columns with their types
super().__init__([None, str, int])
def reflected_sqli(self, columns: list, table=None, condition=None, offset=None, verbose=False):
# TODO: build query and extract columns from response
return None
""")
methods.append("""
class BlindSQLiPoC(MySQLi, BlindSQLi):
def blind_sqli(self, condition: str, verbose=False) -> bool:
# TODO: build query and evaluate condition
return False
""")
main_code.append("""poc = ReflectedSQLiPoC()
print(poc.get_current_user())
""")
if "http-server" in features or "file-server" in features:
partial_imports["hackingscripts.fileserver"] = ["HttpFileServer"]
main_code.append("""file_server = HttpFileServer("0.0.0.0", 3000)
file_server.enableLogging()
file_server.addRoute("/dynamic", on_request)
file_server.addFile("/static", b"static-content")
file_server.startBackground()
""")
methods.append("""
def on_request(req):
# TODO: auto generated method stub
return 200, b"", { "X-Custom-Header": "1" }
""")
if len(main_code) == 0:
main_code = ["pass"]
main = f"""
if __name__ == "__main__":
pass
{'\n '.join(main_code)}
"""
imports = "\n".join(f"import {i}" for i in sorted(imports, key=len))
imports += "\n" + "\n".join(sorted(list(f"from {p} import {', '.join(i)}" for p, i in partial_imports.items()), key=len))
variables = "\n".join(f"{k} = {v}" for k, v in variables.items())
header = f"""#!/usr/bin/env python
@@ -96,21 +160,7 @@ if __name__ == "__main__":
# For more information, visit: https://git.romanh.de/Roman/HackingScripts
#
import os
import io
import re
import sys
import json
import time
import base64
import requests
import subprocess
import urllib.parse
from bs4 import BeautifulSoup
from hackingscripts import util, rev_shell
from hackingscripts.fileserver import HttpFileServer
from hackingscripts.sqli import MySQLi, PostgreSQLi, BlindSQLi, ReflectedSQLi
from urllib3.exceptions import InsecureRequestWarning
{imports}
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
{variables}
@@ -121,14 +171,38 @@ requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: %s <URL> [features]" % sys.argv[0])
exit()
parser = argparse.ArgumentParser(
description="Exploit Template for web attacks",
formatter_class=argparse.RawTextHelpFormatter
)
url = sys.argv[1]
available_features = [
"ip_address=[...]: Local IP-Address for reverse connections",
"burp|proxy=[...]: Tunnel traffic through a given proxy or Burp defaults",
"subdomain|vhost: Allow to specify a subdomain for outgoing requests",
"register|account: Generate an account registration method stub",
"login|account: Generate an account login method stub",
"sqli: Generate an template SQL-Injection class",
"http-server|file-server: Generate code for starting an in-memory http server"
]
parser.add_argument("url", type=str, help="Target URL")
parser.add_argument(
"-f",
"--features",
nargs="*",
type=str,
default=[],
help="Optional list of features:\n- " + "\n- ".join(available_features)
)
args = parser.parse_args()
url = args.url
if "://" not in url:
url = "http://" + url
features = [] if len(sys.argv) < 3 else sys.argv[2].split(",")
features = args.features
template = generate_template(url, features)
print(template)