Skip to content

Commit

Permalink
Add tests for sensitive cmd token invocation IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
wleightond committed Jun 2, 2024
1 parent c1ac32b commit a030953
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 13 deletions.
82 changes: 82 additions & 0 deletions tests/integration/test_cmd_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import random
from canarytokens.constants import CANARYTOKEN_ALPHABET, INVOCATION_ID_LENGTH
import pytest

from canarytokens.models import (
CMDTokenHistory,
CMDTokenRequest,
CMDTokenResponse,
TokenAlertDetailGeneric,
)

from tests.utils import (
clear_stats_on_webhook,
trigger_cmd_token,
create_token,
get_stats_from_webhook,
get_token_history,
run_or_skip,
v3,
)


@pytest.mark.parametrize("version", [v3])
@pytest.mark.parametrize(
"use_invocation_id, expected_hits",
[
(True, 1),
(False, 2),
],
)
def test_cmd_token_fires(
use_invocation_id: bool,
expected_hits: int,
webhook_receiver,
version,
runv2,
runv3,
):
"""
Tests the sensitive command token.
"""
run_or_skip(version, runv2=runv2, runv3=runv3)
# Create a CMD token request
memo = "Test stuff break stuff test stuff sometimes build stuff"

token_request = CMDTokenRequest(
webhook_url=webhook_receiver, memo=memo, cmd_process="klist.exe"
)
resp = create_token(token_request, version=version)

# Check dns token has correct attributes
token_info = CMDTokenResponse(**resp)
# assert dns_token_info.webhook_url == token_request.webhook_url
assert token_info.token in token_info.hostname.split(".")

clear_stats_on_webhook(webhook_receiver, token=token_info.token)
# Trigger CMD token twice, make sure the invocation ID limits it to one hit
invocation_id = (
"".join(
random.choice(CANARYTOKEN_ALPHABET) for _ in range(INVOCATION_ID_LENGTH)
)
if use_invocation_id
else None
)
_ = trigger_cmd_token(token_info, version=version, invocation_id=invocation_id)
_ = trigger_cmd_token(token_info, version=version, invocation_id=invocation_id)

stats = get_stats_from_webhook(webhook_receiver, token=token_info.token)
if stats is not None:
# Check that what was sent to the webhook is consistent.
assert len(stats) == expected_hits
assert stats[0]["memo"] == memo
_ = TokenAlertDetailGeneric(**stats[0])

# Check that the returned history has a single hit.
resp = get_token_history(token_info=token_info, version=version)

token_history = CMDTokenHistory(**resp)
# TODO: what other fields do we want to assert on.
# note: making them TokenHistory have stronger validators is
# the better option.
assert len(token_history.hits) == expected_hits
31 changes: 18 additions & 13 deletions tests/units/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,27 @@ def test_log4_shell_pattern(query, computer_name, should_match):


@pytest.mark.parametrize(
"query, cmd_computer_name,cmd_user_name,should_match,",
"query, cmd_computer_name,cmd_user_name,cmd_invocation_id",
[
("cbrokenpc.UN.ubrokenuser.CMD.sometoken.com", "brokenpc", "brokenuser", True),
("c.UN.ubrokenuser.CMD.sometoken.com", "(not obtained)", "brokenuser", True),
# ("xbrokenpc.L4.sometoken.com", "brokenpc", False),
(
"cbrokenpc.UN.ubrokenuser.CMD.someid78.sometoken.com",
"brokenpc",
"brokenuser",
"someid78",
),
("cbrokenpc.UN.ubrokenuser.CMD.sometoken.com", "brokenpc", "brokenuser", None),
("c.UN.ubrokenuser.CMD.sometoken.com", "(not obtained)", "brokenuser", None),
("cbrokenpc.UN.u.CMD.sometoken.com", "brokenpc", "(not obtained)", None),
],
)
def test_cmd_process_pattern(query, cmd_computer_name, cmd_user_name, should_match):
if (m := t.cmd_process_pattern.match(query)) and m is not None:
data = t.Canarytoken._cmd_process(m)
assert should_match
assert data["src_data"]["cmd_computer_name"] == cmd_computer_name
assert data["src_data"]["cmd_user_name"] == cmd_user_name

else:
assert not should_match
def test_cmd_process_pattern(
query, cmd_computer_name, cmd_user_name, cmd_invocation_id
):
m = t.cmd_process_pattern.match(query)
data = t.Canarytoken._cmd_process(m)
assert data["src_data"]["cmd_computer_name"] == cmd_computer_name
assert data["src_data"]["cmd_user_name"] == cmd_user_name
assert data["src_data"].get("cmd_invocation_id") == cmd_invocation_id


def test_canarytoken_create_and_fetch():
Expand Down
29 changes: 29 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AWSKeyTokenResponse,
AzureIDTokenResponse,
AzureIDAdditionalInfo,
CMDTokenResponse,
CustomBinaryTokenRequest,
CustomBinaryTokenResponse,
CustomImageTokenRequest,
Expand Down Expand Up @@ -172,6 +173,34 @@ def windows_directory_fire_token(
return target


def trigger_cmd_token(
token_info: CMDTokenResponse,
version: Union[V2, V3],
invocation_id: Optional[str],
computername: str = "comp",
username: str = "user",
) -> str:
"""
Triggers a CMD token by making a dns query with the expected parameters as Windows would produce. Older tokens don't have an invocation ID, so it reconstructs the domain based on its presence or absence.
"""
domain_template = token_info.reg_file.split("-Name")[1][5:-9]
components = (
domain_template.replace("$u", f"u{username}")
.replace("$c", f"c{computername}")
.split(".")
)
if invocation_id:
components[components.index("$id")] = invocation_id
elif "$id" in components:
components.remove("$id")

target = ".".join(components)

resolver = grab_resolver(version=version)
resolver.resolve(target, "A")
return target


def retry_on_failure(
retry_when_raised: tuple[Exception, ...],
retry_intervals: tuple[float, ...] = (3.0, 3.0, 5.0, 5.0),
Expand Down

0 comments on commit a030953

Please sign in to comment.