Skip to content

Commit

Permalink
Add invocation IDs to sensitive command tokens (#460)
Browse files Browse the repository at this point in the history
* Add invocation IDs and handling for them to sensitive command tokens

* Add tests for sensitive cmd token invocation IDs
  • Loading branch information
wleightond authored Jun 4, 2024
1 parent 06d74ad commit a41ab04
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 25 deletions.
15 changes: 14 additions & 1 deletion canarytokens/channel_dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def handle_query_name(query_name: Name) -> Tuple[Canarydrop, Dict[str, str]]:
# V2
src_data = Canarytoken.look_for_source_data(query_name=query_name_decoded)
log.info(
f"Recovered: {repr([o for o in src_data.items()])} for {query_name_decoded}"
f"Recovered: {list(src_data.items())} for {query_name_decoded}".replace(
"{", "{{"
).replace("}", "}}")
)
return canarydrop, src_data

Expand Down Expand Up @@ -238,6 +240,17 @@ def query(self, query: Query, src_ip: str): # noqa C901
):
return defer.succeed(self._do_dynamic_response(name=query.name.name))

if canarydrop.type == TokenTypes.CMD:
invocation_id = src_data["src_data"].get("cmd_invocation_id")
if (invocation_id is not None) and any(
hit.src_data.get("cmd_invocation_id") == invocation_id
for hit in canarydrop.triggered_details.hits
):
log.info(
f"Ignoring hit on token {canarydrop.canarytoken.value()}; {invocation_id=} already seen."
)
return defer.succeed(self._do_dynamic_response(name=query.name.name))

token_hit = Canarytoken.create_token_hit(
token_type=canarydrop.type,
input_channel=self.CHANNEL,
Expand Down
10 changes: 8 additions & 2 deletions canarytokens/msreg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from canarytokens.models import Hostname

INVOCATION_ID_LENGTH = 8

REG_TEMPLATE = r"""Windows Registry Editor Version 5.00
; Sensitive command token generated by Thinkst Canary
; Run the following commands with admin privs on a Windows machine:
Expand All @@ -13,7 +15,7 @@
; magic unique canarytoken that will be fired when this command is executed
[HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\SilentProcessExit\{PROCESS}]
"ReportingMode"=dword:00000001
"MonitorProcess"="cmd.exe /c start /min powershell.exe -windowstyle hidden -command \"$($u=$(\\\"u$env:username\\\" -replace('[^a-zA-Z0-9\\-]+', ''))[0..63] -join '';$c=$(\\\"c$env:computername\\\" -replace('[^a-zA-Z0-9\\-]+', ''))[0..63] -join '';Resolve-DnsName -Name \\\"$c.UN.$u.CMD.{TOKEN_DNS}\\\")\""
"MonitorProcess"="cmd.exe /c start /min powershell.exe -windowstyle hidden -command \"$($u=$(\\\"u$env:username\\\" -replace('[^a-zA-Z0-9\\-]+', ''))[0..63] -join '';$c=$(\\\"c$env:computername\\\" -replace('[^a-zA-Z0-9\\-]+', ''))[0..63] -join ''; $id=\\\"\\\"; 1..{INVOCATION_ID_LENGTH} | foreach-object {{ $id += [Char[]]\\\"abcdefhijklmnonpqrstuvwxyz0123456789\\\" | Get-Random }}; Resolve-DnsName -Name \\\"$c.UN.$u.CMD.$id.{TOKEN_DNS}\\\")\""
"""


Expand All @@ -34,4 +36,8 @@ def make_canary_msreg(token_hostname: Hostname, process_name: str = "klist.exe")
if process_name.find(".exe") == -1:
process_name += ".exe"

return REG_TEMPLATE.format(TOKEN_DNS=token_hostname, PROCESS=process_name)
return REG_TEMPLATE.format(
TOKEN_DNS=token_hostname,
PROCESS=process_name,
INVOCATION_ID_LENGTH=INVOCATION_ID_LENGTH,
)
28 changes: 19 additions & 9 deletions canarytokens/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from canarytokens.settings import SwitchboardSettings

from canarytokens import canarydrop, queries
from canarytokens import canarydrop, msreg, queries
from canarytokens.constants import (
CANARYTOKEN_ALPHABET,
CANARYTOKEN_LENGTH,
Expand Down Expand Up @@ -46,7 +46,12 @@
re.IGNORECASE,
)
log4_shell_pattern = re.compile(r"([A-Za-z0-9.-]*)\.L4J\.", re.IGNORECASE)
cmd_process_pattern = re.compile(r"(.+)\.UN\.(.+)\.CMD\.", re.IGNORECASE)
cmd_process_pattern = re.compile(
r"(.+)\.UN\.(.+)\.CMD\.([A-Z0-9]{{{LEN}}}\.)?".format(
LEN=msreg.INVOCATION_ID_LENGTH
),
re.IGNORECASE,
)

# to validate decoded sql username, not a data extractor:
sql_decoded_username = re.compile(r"[A-Za-z0-9\!\#\'\-\.\\\^\_\~]+")
Expand Down Expand Up @@ -220,15 +225,20 @@ def _generic(matches: Match[AnyStr]) -> dict[str, str]:
@staticmethod
def _cmd_process(matches: Match[AnyStr]) -> dict[str, dict[str, AnyStr]]:
""""""
computer_name = matches.group(1).lower()
user_name = matches.group(2).lower()
data = {}
data["cmd_computer_name"] = "(not obtained)"
data["cmd_user_name"] = "(not obtained)"
computer_name = matches.group(1)
user_name = matches.group(2)
invocation_id = matches.group(3)
data = {
"cmd_computer_name": "(not obtained)",
"cmd_user_name": "(not obtained)",
}
if user_name and user_name != "u":
data["cmd_user_name"] = user_name[1:]
data["cmd_user_name"] = user_name[1:].lower()
if computer_name and computer_name != "c":
data["cmd_computer_name"] = computer_name[1:]
data["cmd_computer_name"] = computer_name[1:].lower()
if invocation_id:
data["cmd_invocation_id"] = invocation_id[:-1].lower()

return {"src_data": data}

@staticmethod
Expand Down
84 changes: 84 additions & 0 deletions tests/integration/test_cmd_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import random
import pytest

from canarytokens.constants import CANARYTOKEN_ALPHABET
from canarytokens import msreg
from canarytokens.models import (
CMDTokenHistory,
CMDTokenRequest,
CMDTokenResponse,
TokenAlertDetailGeneric,
)

from tests.utils import (
clear_stats_on_webhook,
trigger_cmd_token,
create_token,
get_stats_from_webhook,
get_token_history,
run_or_skip,
v3,
)


@pytest.mark.parametrize("version", [v3])
@pytest.mark.parametrize(
"use_invocation_id, expected_hits",
[
(True, 1),
(False, 2),
],
)
def test_cmd_token_fires(
use_invocation_id: bool,
expected_hits: int,
webhook_receiver,
version,
runv2,
runv3,
):
"""
Tests the sensitive command token.
"""
run_or_skip(version, runv2=runv2, runv3=runv3)
# Create a CMD token request
memo = "Test stuff break stuff test stuff sometimes build stuff"

token_request = CMDTokenRequest(
webhook_url=webhook_receiver, memo=memo, cmd_process="klist.exe"
)
resp = create_token(token_request, version=version)

# Check dns token has correct attributes
token_info = CMDTokenResponse(**resp)
# assert dns_token_info.webhook_url == token_request.webhook_url
assert token_info.token in token_info.hostname.split(".")

clear_stats_on_webhook(webhook_receiver, token=token_info.token)
# Trigger CMD token twice, make sure the invocation ID limits it to one hit
invocation_id = (
"".join(
random.choice(CANARYTOKEN_ALPHABET)
for _ in range(msreg.INVOCATION_ID_LENGTH)
)
if use_invocation_id
else None
)
_ = trigger_cmd_token(token_info, version=version, invocation_id=invocation_id)
_ = trigger_cmd_token(token_info, version=version, invocation_id=invocation_id)

stats = get_stats_from_webhook(webhook_receiver, token=token_info.token)
if stats is not None:
# Check that what was sent to the webhook is consistent.
assert len(stats) == expected_hits
assert stats[0]["memo"] == memo
_ = TokenAlertDetailGeneric(**stats[0])

# Check that the returned history has a single hit.
resp = get_token_history(token_info=token_info, version=version)

token_history = CMDTokenHistory(**resp)
# TODO: what other fields do we want to assert on.
# note: making them TokenHistory have stronger validators is
# the better option.
assert len(token_history.hits) == expected_hits
2 changes: 2 additions & 0 deletions tests/units/test_msreg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ def test_make_canary_msreg(process_name, token_hostname):
)
assert ".exe" in reg_file_info
assert process_name in reg_file_info
assert f"1..{msreg.INVOCATION_ID_LENGTH}" in reg_file_info
assert "$c.UN.$u.CMD.$id." in reg_file_info
31 changes: 18 additions & 13 deletions tests/units/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,27 @@ def test_log4_shell_pattern(query, computer_name, should_match):


@pytest.mark.parametrize(
"query, cmd_computer_name,cmd_user_name,should_match,",
"query, cmd_computer_name, cmd_user_name, cmd_invocation_id",
[
("cbrokenpc.UN.ubrokenuser.CMD.sometoken.com", "brokenpc", "brokenuser", True),
("c.UN.ubrokenuser.CMD.sometoken.com", "(not obtained)", "brokenuser", True),
# ("xbrokenpc.L4.sometoken.com", "brokenpc", False),
(
"cbrokenpc.UN.ubrokenuser.CMD.someid78.sometoken.com",
"brokenpc",
"brokenuser",
"someid78",
),
("cbrokenpc.UN.ubrokenuser.CMD.sometoken.com", "brokenpc", "brokenuser", None),
("c.UN.ubrokenuser.CMD.sometoken.com", "(not obtained)", "brokenuser", None),
("cbrokenpc.UN.u.CMD.sometoken.com", "brokenpc", "(not obtained)", None),
],
)
def test_cmd_process_pattern(query, cmd_computer_name, cmd_user_name, should_match):
if (m := t.cmd_process_pattern.match(query)) and m is not None:
data = t.Canarytoken._cmd_process(m)
assert should_match
assert data["src_data"]["cmd_computer_name"] == cmd_computer_name
assert data["src_data"]["cmd_user_name"] == cmd_user_name

else:
assert not should_match
def test_cmd_process_pattern(
query, cmd_computer_name, cmd_user_name, cmd_invocation_id
):
m = t.cmd_process_pattern.match(query)
data = t.Canarytoken._cmd_process(m)
assert data["src_data"]["cmd_computer_name"] == cmd_computer_name
assert data["src_data"]["cmd_user_name"] == cmd_user_name
assert data["src_data"].get("cmd_invocation_id") == cmd_invocation_id


def test_canarytoken_create_and_fetch():
Expand Down
29 changes: 29 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AWSKeyTokenResponse,
AzureIDTokenResponse,
AzureIDAdditionalInfo,
CMDTokenResponse,
CustomBinaryTokenRequest,
CustomBinaryTokenResponse,
CustomImageTokenRequest,
Expand Down Expand Up @@ -172,6 +173,34 @@ def windows_directory_fire_token(
return target


def trigger_cmd_token(
token_info: CMDTokenResponse,
version: Union[V2, V3],
invocation_id: Optional[str],
computername: str = "comp",
username: str = "user",
) -> str:
"""
Triggers a CMD token by making a dns query with the expected parameters as Windows would produce. Older tokens don't have an invocation ID, so it reconstructs the domain based on its presence or absence.
"""
domain_template = token_info.reg_file.split("-Name")[1][5:-9]
components = (
domain_template.replace("$u", f"u{username}")
.replace("$c", f"c{computername}")
.split(".")
)
if invocation_id:
components[components.index("$id")] = invocation_id
elif "$id" in components:
components.remove("$id")

target = ".".join(components)

resolver = grab_resolver(version=version)
resolver.resolve(target, "A")
return target


def retry_on_failure(
retry_when_raised: tuple[Exception, ...],
retry_intervals: tuple[float, ...] = (3.0, 3.0, 5.0, 5.0),
Expand Down

0 comments on commit a41ab04

Please sign in to comment.