Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the timestamp parser with more structured data types and tim… #204

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12']
python-version: ['3.11', '3.12', '3.13']
os: [ubuntu-latest, windows-latest, macos-latest]

steps:
Expand Down
89 changes: 72 additions & 17 deletions unfurl/parsers/parse_timestamp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Ryan Benson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@

import datetime
import re

from unfurl import utils

timestamp_edge = {
Expand Down Expand Up @@ -50,7 +51,11 @@ def decode_epoch_seconds(seconds):
2030: 1900000000

"""
return datetime.datetime.utcfromtimestamp(float(seconds)), 'Epoch seconds'
return {
'data_type': 'timestamp.epoch-seconds',
'display_type': 'Epoch seconds',
'timestamp_value': str(datetime.datetime.fromtimestamp(float(seconds), datetime.UTC))
}


def decode_epoch_centiseconds(centiseconds):
Expand All @@ -68,9 +73,13 @@ def decode_epoch_centiseconds(centiseconds):
"""
# Trim off the 4 trailing 0s (don't add precision that wasn't in the timestamp)
converted_ts = trim_zero_fractional_seconds(
str(datetime.datetime.utcfromtimestamp(float(centiseconds) / 100)), 4)
return converted_ts, 'Epoch centiseconds'
str(datetime.datetime.fromtimestamp(float(centiseconds) / 100, datetime.UTC)), 4)

return {
'data_type': 'timestamp.epoch-centiseconds',
'display_type': 'Epoch centiseconds',
'timestamp_value': converted_ts
}

def decode_epoch_milliseconds(milliseconds):
"""Decode a numeric timestamp in Epoch milliseconds format to a human-readable timestamp.
Expand All @@ -87,7 +96,12 @@ def decode_epoch_milliseconds(milliseconds):
converted_dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=float(milliseconds))
# Trim off the 3 trailing 0s (don't add precision that wasn't in the timestamp)
converted_ts = trim_zero_fractional_seconds(str(converted_dt), 3)
return converted_ts, 'Epoch milliseconds'

return {
'data_type': 'timestamp.epoch-milliseconds',
'display_type': 'Epoch milliseconds',
'timestamp_value': converted_ts
}


def decode_epoch_ten_microseconds(ten_microseconds):
Expand All @@ -105,9 +119,13 @@ def decode_epoch_ten_microseconds(ten_microseconds):
"""
# Trim off the trailing 0 (don't add precision that wasn't in the timestamp)
converted_ts = trim_zero_fractional_seconds(
str(datetime.datetime.utcfromtimestamp(float(ten_microseconds) / 100000)), 1)
return converted_ts, 'Epoch ten-microsecond increments'
str(datetime.datetime.fromtimestamp(float(ten_microseconds) / 100000, datetime.UTC)), 1)

return {
'data_type': 'timestamp.epoch-ten-microseconds',
'display_type': 'Epoch ten-microsecond increments',
'timestamp_value': converted_ts
}

def decode_epoch_microseconds(microseconds):
"""Decode a numeric timestamp in Epoch microseconds format to a human-readable timestamp.
Expand All @@ -121,8 +139,13 @@ def decode_epoch_microseconds(microseconds):
2030: 1900000000000000

"""
converted_ts = str(datetime.datetime.utcfromtimestamp(float(microseconds) / 1000000))
return converted_ts, 'Epoch microseconds'
converted_ts = datetime.datetime.fromtimestamp(float(microseconds) / 1000000, datetime.UTC)

return {
'data_type': 'timestamp.epoch-microseconds',
'display_type': 'Epoch microseconds',
'timestamp_value': str(converted_ts)
}


def decode_webkit(microseconds):
Expand All @@ -136,8 +159,13 @@ def decode_webkit(microseconds):
2025: 13380163200000000

"""
return datetime.datetime.utcfromtimestamp((float(microseconds) / 1000000) - 11644473600), 'Webkit'
converted_ts = datetime.datetime.fromtimestamp((float(microseconds) / 1000000) - 11644473600, datetime.UTC)

return {
'data_type': 'timestamp.webkit',
'display_type': 'Webkit',
'timestamp_value': str(converted_ts)
}

def decode_windows_filetime(intervals):
"""Decode a numeric timestamp in Windows FileTime format to a human-readable timestamp.
Expand All @@ -152,8 +180,13 @@ def decode_windows_filetime(intervals):
2065: 146424672000000000

"""
return datetime.datetime.utcfromtimestamp((float(intervals) / 10000000) - 11644473600), 'Windows FileTime'
converted_ts = datetime.datetime.fromtimestamp((float(intervals) / 10000000) - 11644473600, datetime.UTC)

return {
'data_type': 'timestamp.windows-filetime',
'display_type': 'Windows FileTime',
'timestamp_value': str(converted_ts)
}

def decode_datetime_ticks(ticks):
"""Decode a numeric timestamp in .Net/C# DateTime ticks format to a human-readable timestamp.
Expand All @@ -175,7 +208,13 @@ def decode_datetime_ticks(ticks):

"""
seconds = (ticks - 621355968000000000) / 10000000
return (datetime.datetime.fromtimestamp(seconds)), 'DateTime ticks'
converted_ts = datetime.datetime.fromtimestamp(seconds)

return {
'data_type': 'timestamp.datetime-ticks',
'display_type': 'DateTime ticks',
'timestamp_value': str(converted_ts)
}


def decode_mac_absolute_time(seconds):
Expand All @@ -194,7 +233,13 @@ def decode_mac_absolute_time(seconds):
2035: 1072915200

"""
return datetime.datetime.utcfromtimestamp(float(seconds)+978307200), 'Mac Absolute Time / Cocoa'
converted_ts = datetime.datetime.fromtimestamp(float(seconds) + 978307200, datetime.UTC)

return {
'data_type': 'timestamp.mac-absolute-time',
'display_type': 'Mac Absolute Time / Cocoa',
'timestamp_value': str(converted_ts)
}


def decode_epoch_hex(seconds):
Expand All @@ -209,7 +254,12 @@ def decode_epoch_hex(seconds):

"""
timestamp, _ = decode_epoch_seconds(int(seconds, 16))
return timestamp, 'Epoch seconds (hex)'

return {
'data_type': 'timestamp.epoch-seconds-hex',
'display_type': 'Epoch seconds (hex)',
'timestamp_value': str(timestamp)
}


def decode_windows_filetime_hex(intervals):
Expand All @@ -227,7 +277,12 @@ def decode_windows_filetime_hex(intervals):
"""
int_right = int(intervals, 16)
timestamp, _ = decode_windows_filetime(int_right)
return timestamp, 'Windows FileTime (hex)'

return {
'data_type': 'timestamp.windows-filetime-hex',
'display_type': 'Windows FileTime (hex)',
'timestamp_value': str(timestamp)
}


def run(unfurl, node):
Expand Down Expand Up @@ -333,6 +388,6 @@ def run(unfurl, node):

if new_timestamp != (None, 'unknown'):
unfurl.add_to_queue(
data_type=new_timestamp[1], key=None, value=new_timestamp[0],
hover=f'Converted as {new_timestamp[1]}', parent_id=node.node_id,
data_type=new_timestamp['data_type'], key=None, value=new_timestamp['timestamp_value'],
hover=f'Converted as {new_timestamp["display_type"]}', parent_id=node.node_id,
incoming_edge_config=timestamp_edge)
5 changes: 3 additions & 2 deletions unfurl/parsers/parse_url.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google LLC
# Copyright 2024 Ryan Benson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,8 @@ def parse_delimited_string(unfurl_instance, node, delimiter, pairs=False) -> Non

def try_url_unquote(unfurl_instance, node) -> bool:
unquoted = urllib.parse.unquote_plus(node.value)
if unquoted != node.value:
# The regex is to avoid erroneously unquoting a timestamp string (ending with +00:00)
if unquoted != node.value and not re.match(r'.*\+\d\d:\d\d$', node.value):
unfurl_instance.add_to_queue(
data_type='string', key=None, value=unquoted,
hover='Unquoted URL (replaced %xx escapes with their single-character equivalent)',
Expand Down
4 changes: 2 additions & 2 deletions unfurl/tests/unit/test_bluesky.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_bluesky_post(self):
self.assertEqual(1732040395098000, test.nodes[12].value)

# embedded timestamp parses correctly
self.assertEqual('2024-11-19 18:19:55.098000', test.nodes[13].value)
self.assertEqual('2024-11-19 18:19:55.098000+00:00', test.nodes[13].value)

def test_bluesky_bare_tid(self):
""" Test parsing a Bluesky/ATProto TID"""
Expand All @@ -46,7 +46,7 @@ def test_bluesky_bare_tid(self):
self.assertEqual(1731543333133695, test.nodes[2].value)

# embedded timestamp parses correctly
self.assertEqual('2024-11-14 00:15:33.133695', test.nodes[3].value)
self.assertEqual('2024-11-14 00:15:33.133695+00:00', test.nodes[3].value)

if __name__ == '__main__':
unittest.main()
15 changes: 1 addition & 14 deletions unfurl/tests/unit/test_jwt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from unfurl.core import Unfurl
import datetime
import unittest


Expand Down Expand Up @@ -32,10 +31,6 @@ def test_jwt_simple(self):
# confirm that the explanation of the standard "typ" parameter was added
self.assertIn('declare the media type', test.nodes[12].label)

# make sure the queue finished empty
self.assertTrue(test.queue.empty())
self.assertEqual(len(test.edges), 0)

def test_jwt_iat_timestamp(self):
"""Parse a sole JWT with an iat field that is parsed as a timestamp.

Expand Down Expand Up @@ -64,11 +59,7 @@ def test_jwt_iat_timestamp(self):
self.assertEqual(1422779638, test.nodes[10].value)

# confirm that the "iat" claim was detected and parsed as a timestamp
self.assertEqual(datetime.datetime(2015, 2, 1, 8, 33, 58), test.nodes[14].value)

# make sure the queue finished empty
self.assertTrue(test.queue.empty())
self.assertEqual(len(test.edges), 0)
self.assertEqual('2015-02-01 08:33:58+00:00', test.nodes[14].value)

def test_jwt_as_url_segment(self):
"""Parse a JWT that is part of the URL.
Expand Down Expand Up @@ -101,10 +92,6 @@ def test_jwt_as_url_segment(self):
# confirm that the header was parsed as JSON
self.assertEqual('alg', test.nodes[19].key)

# make sure the queue finished empty
self.assertTrue(test.queue.empty())
self.assertEqual(len(test.edges), 0)


if __name__ == '__main__':
unittest.main()
Loading