HackingScripts/pcap_file_extract.py
#!/usr/bin/env python3
import argparse
import os
import re
from abc import ABC, abstractmethod
from collections import OrderedDict

from scapy.all import *

from hackingscripts import util
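
# Extracts files transferred over plain HTTP from a pcap capture: TCP packets are
# grouped into connections, each direction's payload is reassembled, and the
# resulting HTTP requests/responses are parsed so their bodies can be listed or
# written to the output directory.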


class HttpPacket(ABC):
    def __init__(self, sock_src, version):
        self.version = version
        self.headers = util.CaseInsensitiveDict()
        self.payload = None
        self.socket = sock_src

    @staticmethod
    def parse(sock_src, data):
        index = data.index(b"\r\n")
        first_line = data[0:index + 2].decode()
        matches_req = re.match(HttpRequest.PATTERN.decode(), first_line)
        matches_res = re.match(HttpResponse.PATTERN.decode(), first_line)
        if matches_req:
            http_packet = HttpRequest(sock_src, *matches_req.groups())
        elif matches_res:
            http_packet = HttpResponse(sock_src, *matches_res.groups())
        else:
            return None

        header_end = data.index(b"\r\n\r\n")
        header_buffer = data[index + 2:header_end + 2].decode()
        http_packet.payload = data[header_end + 4:]
        for line in re.findall(r"([^:]+):\s?(.*)\r\n", header_buffer):
            http_packet.headers[line[0]] = line[1]

        return http_packet

    @abstractmethod
    def get_file_path(self):
        pass
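
# HttpPacket.parse() expects a reassembled byte stream: a request or status line,
# CRLF-separated headers, a blank line, then the payload. Illustrative example
# (values are made up, not taken from a real capture):
#
#   b"GET /files/report.pdf HTTP/1.1\r\nHost: 10.0.0.5\r\n\r\n"
#
# yields an HttpRequest with uri="/files/report.pdf" and an empty payload.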


class HttpRequest(HttpPacket):
    PATTERN = b"([A-Z]+) ([^ ]+) HTTP/([0-9.]+)\r\n"

    def __init__(self, socket, method, uri, version):
        super().__init__(socket, version)
        self.method = method
        self.uri = uri

    def __repr__(self):
        return f"{self.method} {self.uri} HTTP/{self.version}, payload=" + util.human_readable_size(len(self.payload))

    def get_file_path(self):
        return self.uri


class HttpResponse(HttpPacket):
    PATTERN = b"HTTP/([0-9.]+) ([0-9]+) (.*)\r\n"

    def __init__(self, socket, version, status_code, status_text):
        super().__init__(socket, version)
        self.status_code = int(status_code)
        self.status_text = status_text
        self.response_to = None

    def get_file_path(self):
        content_disposition = self.headers.get("Content-Disposition", None)
        if content_disposition:
            # non-greedy match, so a quoted filename does not keep its closing quote
            matches = re.findall(r";\s*filename=\"?(.*?)\"?(;|$)", content_disposition)
            if matches:
                return matches[0][0]

        if self.response_to:
            return self.response_to.get_file_path()

        return None

    def __repr__(self):
        return f"HTTP/{self.version} {self.status_code} {self.status_text}, payload=" + util.human_readable_size(len(self.payload))


class PacketIterator:
    def __init__(self, connection):
        self.connection = connection
        self.index = 0

    def __iter__(self):
        self.index = 0
        return self

    def __next__(self):
        if self.has_more():
            packet = self.connection.packets[self.index]
            self.index += 1
            return packet
        else:
            raise StopIteration

    def peek(self):
        return None if not self.has_more() else self.connection.packets[self.index]

    def pop(self):
        packet = self.peek()
        if packet:
            self.index += 1
        return packet

    def find_packet(self, pattern, sock_src=None):
        for packet in self.connection.packets[self.index:]:
            self.index += 1
            tcp_packet = packet[TCP]
            ip_hdr = packet[IP]
            packet_src = f"{ip_hdr.src}:{tcp_packet.sport}"
            if sock_src is not None and packet_src != sock_src:
                continue

            payload = bytes(tcp_packet.payload)
            match = re.findall(pattern, payload)
            if match:
                return packet, match[0], packet_src

        return None

    def has_more(self):
        return self.index < len(self.connection.packets)
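
# PacketIterator walks a connection's packets in capture order. find_packet() advances
# past every packet it inspects, including the match it returns, so a following
# get_http_packet()/find_packet() call continues with the next packet in the stream.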


class TcpConnection:
    def __init__(self, sock_a, sock_b):
        self.sock_a = sock_a
        self.sock_b = sock_b
        self.packets = []
        self._payload_size = 0

    def add_packet(self, packet):
        self.packets.append(packet)
        self._payload_size += len(packet[TCP].payload)

    def get_key(self):
        return TcpConnections._format_key(self.sock_a, self.sock_b)

    def iterator(self):
        return PacketIterator(self)

    def get_other_sock(self, sock):
        return self.sock_a if sock == self.sock_b else self.sock_b

    def __repr__(self):
        return f"{self.get_key()}: {len(self.packets)} packets, {util.human_readable_size(self._payload_size)}"


class TcpConnections:
    def __init__(self):
        self.connections = OrderedDict()

    def __contains__(self, item: TcpConnection):
        # connections are keyed by TcpConnection.get_key(), not by repr()
        return item.get_key() in self.connections

    def add(self, element: TcpConnection):
        self.connections[element.get_key()] = element

    def __getitem__(self, item: TcpConnection):
        return self.connections[item.get_key()]

    def __iter__(self):
        return iter(self.connections.values())

    @staticmethod
    def _format_key(sock_a, sock_b):
        return f"{sock_a}<->{sock_b}" if sock_a < sock_b else f"{sock_b}<->{sock_a}"

    def get_connection(self, sock_a, sock_b):
        key = self._format_key(sock_a, sock_b)
        return self.connections[key]

    def add_packet(self, sock_src, sock_dst, packet):
        key = self._format_key(sock_src, sock_dst)
        if key not in self.connections:
            self.connections[key] = TcpConnection(sock_src, sock_dst)

        self.connections[key].add_packet(packet)
        return self.connections[key]
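
# Connections are keyed independently of direction, so both directions of a stream
# end up in the same TcpConnection. Illustrative example (made-up endpoints):
#
#   _format_key("10.0.0.2:51000", "10.0.0.5:80")
#   _format_key("10.0.0.5:80", "10.0.0.2:51000")
#
# both yield "10.0.0.2:51000<->10.0.0.5:80".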


class PcapExtractor:
    def __init__(self, pcap_path, output_dir="extracted_files/", filters=None):
        self.pcap_path = pcap_path
        self.output_dir = output_dir
        self.filters = filters if filters is not None else []
        self._packets = None

    def _open_file(self):
        self._packets = rdpcap(self.pcap_path)

    def extract_all(self):
        self._open_file()
        http_packets = self._parse_http_packets()
        filtered_packets = self._apply_filters(http_packets)
        for packet in filtered_packets:
            if len(packet.payload) > 0:
                file_path = packet.get_file_path()
                with open(os.path.join(self.output_dir, file_path.replace("/", "_")), "wb") as f:
                    f.write(packet.payload)
                print(f"[+] Extracted: {file_path} {util.human_readable_size(len(packet.payload))} Bytes")

    def __iter__(self):
        self._open_file()
        http_packets = self._parse_http_packets()
        # keep an iterator, not a list, so __next__ works on this object as well
        self.iter_filtered_packets = iter(self._apply_filters(http_packets))
        return self

    def __next__(self):
        return next(self.iter_filtered_packets)

    def _apply_filters(self, packets):
        filtered_packets = packets
        for f in self.filters:
            filtered_packets = filter(f, filtered_packets)
        return list(filtered_packets)

    def list(self):
        self._open_file()
        http_packets = self._parse_http_packets()
        filtered_packets = self._apply_filters(http_packets)
        for packet in filtered_packets:
            print(packet)

    def get_http_packet(self, packet_iterator, sock_src, initial_packet):
        http_buffer = raw(initial_packet[TCP].payload)
        prev_seq = initial_packet[TCP].seq
        buff = None
        while packet_iterator.has_more():
            next_packet = packet_iterator.peek()
            if sock_src == f"{next_packet[IP].src}:{next_packet[TCP].sport}":
                next_packet = packet_iterator.pop()
                if buff is not None:
                    # if there is a buffered packet and its seq. number was not reused,
                    # append its data to the output
                    if buff[0] != next_packet[TCP].seq:
                        http_buffer += buff[1]
                    buff = None

                payload_len = len(next_packet[TCP].payload)
                if next_packet[TCP].seq - prev_seq != payload_len and payload_len == 1:
                    # potential TCP ZeroWindowProbe: hold the 1-byte segment back for now
                    buff = (next_packet[TCP].seq, raw(next_packet[TCP].payload))
                    continue

                # TODO: instead of assertions, we should verify that the seq. numbers are ascending
                assert next_packet[TCP].seq > prev_seq
                assert next_packet[IP].frag == 0
                http_buffer += raw(next_packet[TCP].payload)
                prev_seq = next_packet[TCP].seq
            else:
                break

        return HttpPacket.parse(sock_src, http_buffer)
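
    # Note on get_http_packet(): reassembly is intentionally simple. It assumes the
    # segments of one direction appear in ascending sequence order, rejects
    # IP-fragmented packets via assertion, and only appends a held-back 1-byte
    # suspected ZeroWindowProbe segment if the next segment does not reuse its
    # sequence number.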

    def _parse_http_packets(self):
        connections = TcpConnections()
        for packet in self._packets:
            # only IPv4/TCP segments that actually carry data are considered
            if TCP not in packet or IP not in packet:
                continue

            ip_hdr = packet[IP]
            tcp_packet = packet[TCP]
            if len(tcp_packet.payload) == 0:
                continue

            sock_src = f"{ip_hdr.src}:{tcp_packet.sport}"
            sock_dst = f"{ip_hdr.dst}:{tcp_packet.dport}"
            connections.add_packet(sock_src, sock_dst, packet)

        http_packets = []
        for connection in connections:
            packet_iterator = connection.iterator()
            while packet_iterator.has_more():
                request = packet_iterator.find_packet(HttpRequest.PATTERN)
                if not request:
                    continue

                packet, match, sock_src = request
                method = match[0].decode()
                file_name = match[1].decode().rsplit("?")[0]
                http_request_packet = self.get_http_packet(packet_iterator, sock_src, packet)
                if http_request_packet is None:
                    continue
                http_packets.append(http_request_packet)

                other_sock = connection.get_other_sock(sock_src)
                response = packet_iterator.find_packet(HttpResponse.PATTERN, sock_src=other_sock)
                if not response:
                    continue

                packet, match, sock_src = response
                status_code = match[1].decode()
                http_response_packet = self.get_http_packet(packet_iterator, sock_src, packet)
                if http_response_packet is None:
                    continue
                http_response_packet.response_to = http_request_packet
                http_packets.append(http_response_packet)

        return http_packets
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("file", help="Path to pcap file to extract files from")
parser.add_argument("-o", "--output-dir", help="Path to destination directory", default="extracted_files/",
dest="output_dir")
parser.add_argument("-l", "--list", help="List available files only", default=False, action="store_true")
parser.add_argument("-e", "--extract", help="Extract files (default)", default=False, action="store_true")
parser.add_argument("-ec", "--exclude-codes", help="Exclude http status codes, default: 101,304,403,404",
default="101,304,403,404", dest="exclude_codes")
parser.add_argument("-ic", "--include-codes", help="Limit http status codes", type=str,
default="", dest="include_codes")
parser.add_argument("-fe", "--file-extensions", help="File extensions, e.g. txt,exe,pdf", type=str,
default="", dest="file_extensions")
parser.add_argument("-fn", "--file-name", help="File name, e.g. passwords.txt", type=str,
default="", dest="file_name")
parser.add_argument("-fp", "--file-path", help="File path (uri), e.g. /admin/index.html", type=str,
default="", dest="file_path")
# TODO: ports, ip_addresses...
args = parser.parse_args()
filters = [
lambda p: not isinstance(p, HttpResponse) or p.status_code not in [int(x) for x in args.exclude_codes.split(",")],
]
if args.include_codes:
filters.append(lambda p: not isinstance(p, HttpResponse) or p.status_code in [int(x) for x in args.include_codes.split(",")])
if args.file_extensions:
filters.append(lambda p: os.path.splitext(p.file_name)[1] in args.file_extensions.split(","))
if args.file_name:
filters.append(lambda p: os.path.basename(p.get_file_path()) == args.file_name)
if args.file_path:
filters.append(lambda p: p.get_file_path() == args.file_path)
pcap_path = args.file
if not os.path.isfile(pcap_path):
print("[-] File not found or not a file:", pcap_path)
exit(1)
output_dir = args.output_dir
if not os.path.isdir(output_dir):
os.makedirs(output_dir, exist_ok=True)
if not os.path.isdir(output_dir):
print("[-] Output directory is not a directory or does not exist and could not be created:", output_dir)
exit(2)
pcap_extractor = PcapExtractor(pcap_path, output_dir, filters)
if args.list and args.extract:
print("[-] Can only specify one of list or extract, not both")
exit(3)
elif args.list:
pcap_extractor.list()
else:
pcap_extractor.extract_all()
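
# Example invocations (file and directory names are illustrative):
#   ./pcap_file_extract.py capture.pcap -l                      # only list transferred files
#   ./pcap_file_extract.py capture.pcap -o loot/ -fe pdf,zip    # extract only certain extensions
#   ./pcap_file_extract.py capture.pcap -ic 200 -fn passwords.txt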