diff --git a/cxotime/__init__.py b/cxotime/__init__.py index adf2056..9371067 100644 --- a/cxotime/__init__.py +++ b/cxotime/__init__.py @@ -5,6 +5,7 @@ from .convert import * # noqa: F401, F403 from .cxotime import * # noqa: F401, F403 +from .utils import get_range_in_chunks __version__ = ska_helpers.get_version(__package__) diff --git a/cxotime/tests/test_utils.py b/cxotime/tests/test_utils.py new file mode 100644 index 0000000..e9022a7 --- /dev/null +++ b/cxotime/tests/test_utils.py @@ -0,0 +1,87 @@ +import astropy.units as u +import numpy as np +import pytest + +from cxotime import CxoTime, get_range_in_chunks + + +@pytest.mark.parametrize( + "start, stop, dt_max, expected_len, expected_values", + [ + ("2000:001", "2000:001", 1 * u.day, 2, ["2000:001", "2000:001"]), + ( + "2000:001", + "2000:005", + 24 * u.hour, + 5, + ["2000:001", "2000:002", "2000:003", "2000:004", "2000:005"], + ), + ( + "2000:001", + "2000:005", + 2880 * u.minute, + 3, + ["2000:001", "2000:003", "2000:005"], + ), + ("2000:001", "2000:005", 10 * 86400 * u.second, 2, ["2000:001", "2000:005"]), + ], +) +def test_get_range_in_chunks(start, stop, dt_max, expected_len, expected_values): + start = CxoTime(start) + stop = CxoTime(stop) + result = get_range_in_chunks(start, stop, dt_max) + + # Confirm that the time intervals are uniform if there is any interval + if len(result) > 2: + assert all( + np.isclose((result[1:] - result[:-1]).sec, dt_max.to(u.second).value) + ) + + # Confirm that the time range is covered + assert result[0] == start + assert result[-1] == stop + + # And confirm that the result is as expected + assert len(result) == expected_len + assert all(a == CxoTime(b) for a, b in zip(result, expected_values)) + + +# Add a test of a negative time range +def test_get_range_in_chunks_negative(): + start = CxoTime("2000:005") + stop = CxoTime("2000:001") + dt_max = 24 * u.hour + result = get_range_in_chunks(start, stop, dt_max) + assert len(result) == 5 + expected_values = ["2000:005", "2000:004", "2000:003", "2000:002", "2000:001"] + assert all(a == CxoTime(b) for a, b in zip(result, expected_values)) + + +# Add a test that shows that the time range is covered even if the time range is less than dt_max +def test_get_range_in_chunks_small(): + start = CxoTime("2020:001") + stop = CxoTime("2020:005") + dt_max = 30 * u.day + result = get_range_in_chunks(start, stop, dt_max) + assert len(result) == 2 + assert all(a == CxoTime(b) for a, b in zip(result, ["2020:001", "2020:005"])) + + +# Add a test that shows we get an error if dt_max is zero +def test_get_range_in_chunks_zero(): + start = CxoTime("2020:001") + stop = CxoTime("2020:005") + dt_max = 0 * u.day + with pytest.raises(ValueError) as excinfo: + get_range_in_chunks(start, stop, dt_max) + assert "dt_max must be positive nonzero" in str(excinfo.value) + + +# Add a test that shows we get an error if dt_max is negative +def test_get_range_in_chunks_negative_dt(): + start = CxoTime("2020:001") + stop = CxoTime("2020:005") + dt_max = -1 * u.day + with pytest.raises(ValueError) as excinfo: + get_range_in_chunks(start, stop, dt_max) + assert "dt_max must be positive nonzero" in str(excinfo.value) diff --git a/cxotime/utils.py b/cxotime/utils.py new file mode 100644 index 0000000..4c9b19a --- /dev/null +++ b/cxotime/utils.py @@ -0,0 +1,44 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +import astropy.units as u +import numpy as np + +from cxotime import CxoTime, CxoTimeLike + + +def get_range_in_chunks(start: CxoTimeLike, stop: CxoTimeLike, dt_max: u.Quantity): + """ + Get uniform time chunks for a given time range. + + Output times are spaced uniformly spaced by up to ``dt_max`` and cover the time + range from ``start`` to ``stop``. + + Parameters + ---------- + start : CxoTime + Start time of the time range. + stop : CxoTime + Stop time of the time range. + dt_max : u.Quantity (timelike) + Maximum time interval for each chunk. + + Returns + ------- + CxoTime + CxoTime with time bin edges for each chunk. + """ + start = CxoTime(start) + stop = CxoTime(stop) + + # Require that dt_max is a positive nonzero quantity + if dt_max <= 0 * u.s: + raise ValueError("dt_max must be positive nonzero") + + # Let this work if start > stop, but flip the sign of dt_max + if start > stop: + dt_max = -dt_max + + # Calculate chunks to cover time range, handling edge case of start == stop + n_chunk = max(np.ceil(float((stop - start) / dt_max)), 1) + dt = (stop - start) / n_chunk + times = start + np.arange(n_chunk + 1) * dt + return times