Skip to content

Commit

Permalink
Add get_range_in_chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanconn committed Sep 25, 2024
1 parent 2bbbaf2 commit 843da65
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 0 deletions.
1 change: 1 addition & 0 deletions cxotime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .convert import * # noqa: F401, F403
from .cxotime import * # noqa: F401, F403
from .utils import get_range_in_chunks

__version__ = ska_helpers.get_version(__package__)

Expand Down
87 changes: 87 additions & 0 deletions cxotime/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import astropy.units as u
import numpy as np
import pytest

from cxotime import CxoTime, get_range_in_chunks


@pytest.mark.parametrize(
"start, stop, dt_max, expected_len, expected_values",
[
("2000:001", "2000:001", 1 * u.day, 2, ["2000:001", "2000:001"]),
(
"2000:001",
"2000:005",
24 * u.hour,
5,
["2000:001", "2000:002", "2000:003", "2000:004", "2000:005"],
),
(
"2000:001",
"2000:005",
2880 * u.minute,
3,
["2000:001", "2000:003", "2000:005"],
),
("2000:001", "2000:005", 10 * 86400 * u.second, 2, ["2000:001", "2000:005"]),
],
)
def test_get_range_in_chunks(start, stop, dt_max, expected_len, expected_values):
start = CxoTime(start)
stop = CxoTime(stop)
result = get_range_in_chunks(start, stop, dt_max)

# Confirm that the time intervals are uniform if there is any interval
if len(result) > 2:
assert all(
np.isclose((result[1:] - result[:-1]).sec, dt_max.to(u.second).value)
)

# Confirm that the time range is covered
assert result[0] == start
assert result[-1] == stop

# And confirm that the result is as expected
assert len(result) == expected_len
assert all(a == CxoTime(b) for a, b in zip(result, expected_values))


# Add a test of a negative time range
def test_get_range_in_chunks_negative():
start = CxoTime("2000:005")
stop = CxoTime("2000:001")
dt_max = 24 * u.hour
result = get_range_in_chunks(start, stop, dt_max)
assert len(result) == 5
expected_values = ["2000:005", "2000:004", "2000:003", "2000:002", "2000:001"]
assert all(a == CxoTime(b) for a, b in zip(result, expected_values))


# Add a test that shows that the time range is covered even if the time range is less than dt_max
def test_get_range_in_chunks_small():
start = CxoTime("2020:001")
stop = CxoTime("2020:005")
dt_max = 30 * u.day
result = get_range_in_chunks(start, stop, dt_max)
assert len(result) == 2
assert all(a == CxoTime(b) for a, b in zip(result, ["2020:001", "2020:005"]))


# Add a test that shows we get an error if dt_max is zero
def test_get_range_in_chunks_zero():
start = CxoTime("2020:001")
stop = CxoTime("2020:005")
dt_max = 0 * u.day
with pytest.raises(ValueError) as excinfo:
get_range_in_chunks(start, stop, dt_max)
assert "dt_max must be positive nonzero" in str(excinfo.value)


# Add a test that shows we get an error if dt_max is negative
def test_get_range_in_chunks_negative_dt():
start = CxoTime("2020:001")
stop = CxoTime("2020:005")
dt_max = -1 * u.day
with pytest.raises(ValueError) as excinfo:
get_range_in_chunks(start, stop, dt_max)
assert "dt_max must be positive nonzero" in str(excinfo.value)
44 changes: 44 additions & 0 deletions cxotime/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import astropy.units as u
import numpy as np

from cxotime import CxoTime, CxoTimeLike


def get_range_in_chunks(start: CxoTimeLike, stop: CxoTimeLike, dt_max: u.Quantity):
"""
Get uniform time chunks for a given time range.
Output times are spaced uniformly spaced by up to ``dt_max`` and cover the time
range from ``start`` to ``stop``.
Parameters
----------
start : CxoTime
Start time of the time range.
stop : CxoTime
Stop time of the time range.
dt_max : u.Quantity (timelike)
Maximum time interval for each chunk.
Returns
-------
CxoTime
CxoTime with time bin edges for each chunk.
"""
start = CxoTime(start)
stop = CxoTime(stop)

# Require that dt_max is a positive nonzero quantity
if dt_max <= 0 * u.s:
raise ValueError("dt_max must be positive nonzero")

# Let this work if start > stop, but flip the sign of dt_max
if start > stop:
dt_max = -dt_max

# Calculate chunks to cover time range, handling edge case of start == stop
n_chunk = max(np.ceil(float((stop - start) / dt_max)), 1)
dt = (stop - start) / n_chunk
times = start + np.arange(n_chunk + 1) * dt
return times

0 comments on commit 843da65

Please sign in to comment.