From 32a52b081bff813acf69e00001936187a417a108 Mon Sep 17 00:00:00 2001 From: Gal Rotem Date: Thu, 18 Jan 2024 17:00:58 -0800 Subject: [PATCH] unit tests - use skip if not gpu/distributed decorators (#684) Summary: Adopt skip_if_not_distributed, skip_if_not_gpu test decorators across all unit tests Differential Revision: D52893384 --- .../torchrec/tests/torchrec_example_test.py | 10 +-- .../callbacks/test_base_checkpointer.py | 18 +++-- .../callbacks/test_checkpoint_utils.py | 20 +++--- tests/framework/callbacks/test_dcp_saver.py | 21 +++--- .../callbacks/test_torchsnapshot_saver.py | 21 +++--- tests/framework/test_unit_utils.py | 17 ++--- tests/utils/data/test_data_prefetcher.py | 13 +--- tests/utils/data/test_profile_dataloader.py | 5 +- tests/utils/loggers/test_tensorboard.py | 6 +- tests/utils/test_device.py | 60 ++++------------ tests/utils/test_distributed.py | 56 ++++----------- tests/utils/test_early_stop_checker.py | 9 +-- tests/utils/test_memory_snapshot_profiler.py | 6 +- tests/utils/test_oom.py | 14 +--- tests/utils/test_prepare_module.py | 72 ++++++------------- 15 files changed, 105 insertions(+), 243 deletions(-) diff --git a/examples/torchrec/tests/torchrec_example_test.py b/examples/torchrec/tests/torchrec_example_test.py index 352e6fc958..7b9d8a5c1b 100644 --- a/examples/torchrec/tests/torchrec_example_test.py +++ b/examples/torchrec/tests/torchrec_example_test.py @@ -8,19 +8,13 @@ import unittest import torch -from torchtnt.utils.test_utils import skip_if_asan, spawn_multi_process +from torchtnt.utils.test_utils import skip_if_asan, skip_if_not_gpu, spawn_multi_process from ..main import main class TorchrecExampleTest(unittest.TestCase): - - cuda_available: bool = torch.cuda.is_available() - @skip_if_asan - @unittest.skipUnless( - cuda_available, - "Skip when CUDA is not available", - ) + @skip_if_not_gpu def test_torchrec_example(self) -> None: spawn_multi_process(2, "nccl", main, []) diff --git a/tests/framework/callbacks/test_base_checkpointer.py b/tests/framework/callbacks/test_base_checkpointer.py index d5ff191023..a20490f758 100644 --- a/tests/framework/callbacks/test_base_checkpointer.py +++ b/tests/framework/callbacks/test_base_checkpointer.py @@ -39,7 +39,11 @@ from torchtnt.framework.unit import AppStateMixin, TrainUnit, TTrainData from torchtnt.utils.distributed import get_global_rank from torchtnt.utils.env import init_from_env -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) class BaseCheckpointSaver(BaseCheckpointer): @@ -363,9 +367,7 @@ def test_save_on_train_end(self) -> None: ], ) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_directory_sync_collective(self) -> None: spawn_multi_process( 2, @@ -410,12 +412,8 @@ def test_invalid_args(self) -> None: ): BaseCheckpointSaver(temp_dir, save_every_n_epochs=0) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_distributed + @skip_if_not_gpu def test_process_group_plumbing(self) -> None: """ Creates a new process group and verifies that it's passed through correctly diff --git a/tests/framework/callbacks/test_checkpoint_utils.py b/tests/framework/callbacks/test_checkpoint_utils.py index 413b45337b..0cfded7cdc 100644 --- a/tests/framework/callbacks/test_checkpoint_utils.py +++ b/tests/framework/callbacks/test_checkpoint_utils.py @@ -32,14 +32,16 @@ from torchtnt.utils.distributed import get_global_rank, PGWrapper from torchtnt.utils.env import init_from_env from torchtnt.utils.fsspec import get_filesystem -from torchtnt.utils.test_utils import get_pet_launch_config, spawn_multi_process +from torchtnt.utils.test_utils import ( + get_pet_launch_config, + skip_if_not_distributed, + spawn_multi_process, +) METADATA_FNAME: str = ".metadata" class CheckpointUtilsTest(unittest.TestCase): - distributed_available: bool = torch.distributed.is_available() - @staticmethod def _create_snapshot_metadata(output_dir: str) -> None: path = os.path.join(output_dir, METADATA_FNAME) @@ -86,9 +88,7 @@ def test_latest_checkpoint_path(self) -> None: get_latest_checkpoint_path(temp_dir, METADATA_FNAME), path_2 ) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_latest_checkpoint_path_distributed(self) -> None: config = get_pet_launch_config(2) launcher.elastic_launch( @@ -290,9 +290,7 @@ def test_retrieve_checkpoint_dirpaths_with_metrics(self) -> None: {os.path.join(temp_dir, paths[1])}, ) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_distributed_get_checkpoint_dirpaths(self) -> None: spawn_multi_process(2, "gloo", self._distributed_get_checkpoint_dirpaths) @@ -425,9 +423,7 @@ def test_get_app_state(self) -> None: ["module", "optimizer", "loss_fn", "train_progress"], ) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_rank_zero_read_and_broadcast(self) -> None: spawn_multi_process(2, "gloo", self._test_rank_zero_read_and_broadcast) diff --git a/tests/framework/callbacks/test_dcp_saver.py b/tests/framework/callbacks/test_dcp_saver.py index 9372cf13fb..232f49ee1f 100644 --- a/tests/framework/callbacks/test_dcp_saver.py +++ b/tests/framework/callbacks/test_dcp_saver.py @@ -29,13 +29,14 @@ from torchtnt.framework.train import train from torchtnt.utils.distributed import get_global_rank from torchtnt.utils.env import seed -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) class DistributedCheckpointSaverTest(unittest.TestCase): - cuda_available: bool = torch.cuda.is_available() - distributed_available: bool = torch.distributed.is_available() - def test_save_restore(self) -> None: input_dim = 2 dataset_len = 10 @@ -223,12 +224,8 @@ def test_save_restore_no_lr_scheduler_restore( app_state = mock_dist_cp.load_state_dict.call_args.args[0]["app_state"] self.assertIn("lr_scheduler", app_state) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_distributed + @skip_if_not_gpu def test_save_restore_fsdp(self) -> None: spawn_multi_process( 2, @@ -276,9 +273,7 @@ def _save_restore_fsdp() -> None: if get_global_rank() == 0: shutil.rmtree(temp_dir) # delete temp directory - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_save_restore_ddp(self) -> None: spawn_multi_process( 2, diff --git a/tests/framework/callbacks/test_torchsnapshot_saver.py b/tests/framework/callbacks/test_torchsnapshot_saver.py index 190f6f9e3c..e6c372352a 100644 --- a/tests/framework/callbacks/test_torchsnapshot_saver.py +++ b/tests/framework/callbacks/test_torchsnapshot_saver.py @@ -33,13 +33,14 @@ from torchtnt.framework.train import train from torchtnt.utils.distributed import get_global_rank from torchtnt.utils.env import seed -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) class TorchSnapshotSaverTest(unittest.TestCase): - cuda_available: bool = torch.cuda.is_available() - distributed_available: bool = torch.distributed.is_available() - def test_save_restore(self) -> None: input_dim = 2 dataset_len = 10 @@ -227,12 +228,8 @@ def test_save_restore_no_lr_scheduler_restore( app_state = mock_torchsnapshot.Snapshot().restore.call_args.args[0] self.assertIn("lr_scheduler", app_state) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_distributed + @skip_if_not_gpu def test_save_restore_fsdp(self) -> None: spawn_multi_process( 2, @@ -281,9 +278,7 @@ def _save_restore_fsdp() -> None: if get_global_rank() == 0: shutil.rmtree(temp_dir) # delete temp directory - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_save_restore_ddp(self) -> None: spawn_multi_process( 2, diff --git a/tests/framework/test_unit_utils.py b/tests/framework/test_unit_utils.py index 5d5ae5a649..93ca6e24a1 100644 --- a/tests/framework/test_unit_utils.py +++ b/tests/framework/test_unit_utils.py @@ -18,13 +18,14 @@ ) from torchtnt.framework.state import State from torchtnt.utils.env import init_from_env -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) class UnitUtilsTest(unittest.TestCase): - cuda_available: bool = torch.cuda.is_available() - distributed_available: bool = torch.distributed.is_available() - def test_step_func_requires_iterator(self) -> None: class Foo: def bar(self, state: State, data: object) -> object: @@ -56,12 +57,8 @@ def test_find_optimizers_for_module(self) -> None: optim_name, _ = optimizers[0] self.assertEqual(optim_name, "optim2") - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_distributed + @skip_if_not_gpu def test_find_optimizers_for_FSDP_module(self) -> None: spawn_multi_process(2, "nccl", self._find_optimizers_for_FSDP_module) diff --git a/tests/utils/data/test_data_prefetcher.py b/tests/utils/data/test_data_prefetcher.py index fb017ae960..fef74c3d1b 100644 --- a/tests/utils/data/test_data_prefetcher.py +++ b/tests/utils/data/test_data_prefetcher.py @@ -11,15 +11,12 @@ import torch from torch.utils.data.dataset import Dataset, TensorDataset from torchtnt.utils.data.data_prefetcher import CudaDataPrefetcher +from torchtnt.utils.test_utils import skip_if_not_gpu Batch = Tuple[torch.Tensor, torch.Tensor] class DataTest(unittest.TestCase): - - # pyre-fixme[4]: Attribute must be annotated. - cuda_available = torch.cuda.is_available() - def _generate_dataset(self, num_samples: int, input_dim: int) -> Dataset[Batch]: """Returns a dataset of random inputs and labels for binary classification.""" data = torch.randn(num_samples, input_dim) @@ -39,9 +36,7 @@ def test_cpu_device_data_prefetcher(self) -> None: with self.assertRaisesRegex(ValueError, "expects a CUDA device"): _ = CudaDataPrefetcher(dataloader, device, num_prefetch_batches) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_num_prefetch_batches_data_prefetcher(self) -> None: device = torch.device("cuda:0") @@ -65,9 +60,7 @@ def test_num_prefetch_batches_data_prefetcher(self) -> None: _ = CudaDataPrefetcher(dataloader, device, num_prefetch_batches=1) _ = CudaDataPrefetcher(dataloader, device, num_prefetch_batches=2) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_cuda_data_prefetcher(self) -> None: device = torch.device("cuda:0") diff --git a/tests/utils/data/test_profile_dataloader.py b/tests/utils/data/test_profile_dataloader.py index 752af2b4c4..c891894a15 100644 --- a/tests/utils/data/test_profile_dataloader.py +++ b/tests/utils/data/test_profile_dataloader.py @@ -12,6 +12,7 @@ from torch.profiler import ProfilerActivity from torchtnt.utils.data.profile_dataloader import profile_dataloader from torchtnt.utils.env import init_from_env +from torchtnt.utils.test_utils import skip_if_not_gpu class DummyIterable: @@ -46,9 +47,7 @@ def test_profile_dataloader_profiler(self) -> None: timer = profile_dataloader(iterable, p) self.assertEqual(len(timer.recorded_durations["next(iter)"]), max_length) - @unittest.skipUnless( - bool(torch.cuda.is_available()), reason="This test needs a GPU host to run." 
- ) + @skip_if_not_gpu def test_profile_dataloader_device(self) -> None: device = init_from_env() max_length = 10 diff --git a/tests/utils/loggers/test_tensorboard.py b/tests/utils/loggers/test_tensorboard.py index 5ccf8cac7d..beca759cdb 100644 --- a/tests/utils/loggers/test_tensorboard.py +++ b/tests/utils/loggers/test_tensorboard.py @@ -17,7 +17,7 @@ from torch import distributed as dist from torchtnt.utils.loggers.tensorboard import TensorBoardLogger -from torchtnt.utils.test_utils import get_pet_launch_config +from torchtnt.utils.test_utils import get_pet_launch_config, skip_if_not_distributed class TensorBoardLoggerTest(unittest.TestCase): @@ -87,9 +87,7 @@ def _test_distributed() -> None: assert test_path in logger.path assert invalid_path not in logger.path - @unittest.skipUnless( - bool(dist.is_available()), reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_multiple_workers(self: TensorBoardLoggerTest) -> None: config = get_pet_launch_config(2) launcher.elastic_launch(config, entrypoint=self._test_distributed)() diff --git a/tests/utils/test_device.py b/tests/utils/test_device.py index 4f7ed01db8..4b93464e8a 100644 --- a/tests/utils/test_device.py +++ b/tests/utils/test_device.py @@ -23,21 +23,17 @@ record_data_in_stream, set_float32_precision, ) +from torchtnt.utils.test_utils import skip_if_not_gpu class DeviceTest(unittest.TestCase): - - cuda_available: bool = torch.cuda.is_available() - @patch("torch.cuda.is_available", return_value=False) def test_get_cpu_device(self, _) -> None: device = get_device_from_env() self.assertEqual(device.type, "cpu") self.assertEqual(device.index, None) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_get_gpu_device(self) -> None: device_idx = torch.cuda.device_count() - 1 self.assertGreaterEqual(device_idx, 0) @@ -61,9 +57,7 @@ def test_get_gpu_device(self) -> None: self.assertEqual(device.index, 0) self.assertEqual(device.index, torch.cuda.current_device()) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_tensor(self) -> None: cuda_0 = torch.device("cuda:0") a = torch.tensor([1, 2, 3]) @@ -71,9 +65,7 @@ def test_copy_data_to_device_tensor(self) -> None: a = copy_data_to_device(a, cuda_0) self.assertEqual(a.device.type, "cuda") - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_module(self) -> None: cuda_0 = torch.device("cuda:0") model = torch.nn.Linear(1, 1) @@ -83,9 +75,7 @@ def test_copy_data_to_device_module(self) -> None: for param in model.parameters(): self.assertEqual(param.device.type, "cuda") - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_list(self) -> None: cuda_0 = torch.device("cuda:0") b = torch.tensor([1, 2, 3]) @@ -97,9 +87,7 @@ def test_copy_data_to_device_list(self) -> None: for elem in new_list: self.assertEqual(elem.device.type, "cuda") - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_gpu def test_copy_data_to_device_tuple(self) -> None: cuda_0 = torch.device("cuda:0") d = torch.tensor([1, 2, 3]) @@ -111,9 +99,7 @@ def test_copy_data_to_device_tuple(self) -> None: for elem in new_tuple: self.assertEqual(elem.device.type, "cuda") - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_dict(self) -> None: cuda_0 = torch.device("cuda:0") f = torch.tensor([1, 2, 3]) @@ -125,9 +111,7 @@ def test_copy_data_to_device_dict(self) -> None: for key in new_dict.keys(): self.assertEqual(new_dict[key].device.type, "cuda") - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_named_tuple(self) -> None: cuda_0 = torch.device("cuda:0") @@ -146,9 +130,7 @@ def test_copy_data_to_device_named_tuple(self) -> None: self.assertIsNotNone(new_named_tuple.tensor_b) self.assertEqual(type(original_named_tuple), type(new_named_tuple)) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_dataclass(self) -> None: cuda_0 = torch.device("cuda:0") @@ -193,9 +175,7 @@ def __post_init__(self): torch.equal(new_data_class.val, torch.tensor([1, 2], device=cuda_0)) ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_defaultdict(self) -> None: cuda_0 = torch.device("cuda:0") @@ -215,9 +195,7 @@ def test_copy_data_to_device_defaultdict(self) -> None: # make sure the type of new keys is the same self.assertEqual(type(dd[3]), type(new_dd[3])) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_copy_data_to_device_nested(self) -> None: h = torch.tensor([1, 2, 3]) i = torch.tensor([4, 5, 6]) @@ -296,9 +274,7 @@ def test_get_gpu_stats(self) -> None: self.assertGreaterEqual(gpu_stats["temperature_gpu_celsius"], 0) self.assertGreaterEqual(gpu_stats["temperature_memory_celsius"], 0) - @unittest.skipUnless( - condition=(cuda_available), reason="This test must run on a GPU host." - ) + @skip_if_not_gpu def test_record_data_in_stream_dict(self) -> None: curr_stream = torch.cuda.current_stream() a = torch.tensor([1, 2, 3]) @@ -314,9 +290,7 @@ def test_record_data_in_stream_dict(self) -> None: mock_record_stream_a.assert_called_once() mock_record_stream_b.assert_called_once() - @unittest.skipUnless( - condition=(cuda_available), reason="This test must run on a GPU host." - ) + @skip_if_not_gpu def test_record_data_in_stream_tuple(self) -> None: curr_stream = torch.cuda.current_stream() a = torch.tensor([1, 2, 3]) @@ -332,9 +306,7 @@ def test_record_data_in_stream_tuple(self) -> None: mock_record_stream_a.assert_called_once() mock_record_stream_b.assert_called_once() - @unittest.skipUnless( - condition=(cuda_available), reason="This test must run on a GPU host." - ) + @skip_if_not_gpu def test_record_data_in_stream_list(self) -> None: curr_stream = torch.cuda.current_stream() a = torch.tensor([1, 2, 3]) @@ -350,9 +322,7 @@ def test_record_data_in_stream_list(self) -> None: mock_record_stream_a.assert_called_once() mock_record_stream_b.assert_called_once() - @unittest.skipUnless( - condition=(cuda_available), reason="This test must run on a GPU host." 
- ) + @skip_if_not_gpu def test_set_float32_precision(self) -> None: set_float32_precision("highest") self.assertEqual(torch.get_float32_matmul_precision(), "highest") diff --git a/tests/utils/test_distributed.py b/tests/utils/test_distributed.py index c57c67dcc2..394d6a1cb1 100644 --- a/tests/utils/test_distributed.py +++ b/tests/utils/test_distributed.py @@ -31,12 +31,10 @@ revert_sync_batchnorm, sync_bool, ) -from torchtnt.utils.test_utils import get_pet_launch_config +from torchtnt.utils.test_utils import get_pet_launch_config, skip_if_not_distributed class DistributedTest(unittest.TestCase): - distributed_available: bool = torch.distributed.is_available() - def test_get_process_group_backend_cpu(self) -> None: device = torch.device("cpu") pg_backend = get_process_group_backend_from_device(device) @@ -50,9 +48,7 @@ def test_get_process_group_backend_gpu(self) -> None: def test_get_world_size_single(self) -> None: self.assertEqual(get_world_size(), 1) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_get_world_size(self) -> None: world_size = 4 config = get_pet_launch_config(world_size) @@ -67,9 +63,7 @@ def _test_get_world_size(world_size: int) -> None: dist.init_process_group("gloo") assert get_world_size() == dist.get_world_size() - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_get_global_rank(self) -> None: config = get_pet_launch_config(4) launcher.elastic_launch(config, entrypoint=self._test_get_global_rank)() @@ -86,9 +80,7 @@ def test_get_local_rank_single(self) -> None: self.assertEqual(get_local_rank(), 0) self.assertEqual(get_local_world_size(), 1) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_get_local_rank(self) -> None: config = get_pet_launch_config(2) launcher.elastic_launch(config, entrypoint=self._test_get_local_rank)() @@ -105,18 +97,14 @@ def _destroy_process_group() -> None: destroy_process_group() assert not torch.distributed.is_initialized() - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_destroy_process_group(self) -> None: # should be a no-op if dist is not initialized destroy_process_group() config = get_pet_launch_config(2) launcher.elastic_launch(config, entrypoint=self._destroy_process_group)() - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_gather_uneven(self, world_size: Optional[int] = 4) -> None: config = get_pet_launch_config(2) launcher.elastic_launch( @@ -136,9 +124,7 @@ def _test_ddp_gather_uneven_tensors() -> None: assert len(result[idx]) == idx assert (result[idx] == torch.ones_like(result[idx])).all() - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_gather_uneven_multidim(self) -> None: config = get_pet_launch_config(2) launcher.elastic_launch( @@ -222,9 +208,7 @@ def test_sync_bool_single_process(self) -> None: # these should be the same in a single process case self.assertEqual(val, new_val) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_rank_zero(self) -> None: config = get_pet_launch_config(2) result = 
launcher.elastic_launch(config, entrypoint=self._full_sync_worker)( @@ -234,9 +218,7 @@ def test_sync_bool_multi_process_coherence_mode_rank_zero(self) -> None: self.assertTrue(result[0]) self.assertTrue(result[1]) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_any(self) -> None: config = get_pet_launch_config(2) result = launcher.elastic_launch(config, entrypoint=self._full_sync_worker)( @@ -246,9 +228,7 @@ def test_sync_bool_multi_process_coherence_mode_any(self) -> None: self.assertTrue(result[0]) self.assertTrue(result[1]) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_all(self) -> None: config = get_pet_launch_config(2) result = launcher.elastic_launch(config, entrypoint=self._full_sync_worker)( @@ -258,9 +238,7 @@ def test_sync_bool_multi_process_coherence_mode_all(self) -> None: self.assertFalse(result[0]) self.assertFalse(result[1]) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_int_false(self) -> None: config = get_pet_launch_config(2) result = launcher.elastic_launch(config, entrypoint=self._full_sync_worker)(2) @@ -268,9 +246,7 @@ def test_sync_bool_multi_process_coherence_mode_int_false(self) -> None: self.assertFalse(result[0]) self.assertFalse(result[1]) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_int_true(self) -> None: config = get_pet_launch_config(2) result = launcher.elastic_launch(config, entrypoint=self._full_sync_worker)(1) @@ -278,9 +254,7 @@ def test_sync_bool_multi_process_coherence_mode_int_true(self) -> None: self.assertTrue(result[0]) self.assertTrue(result[1]) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_float_true(self) -> None: config = get_pet_launch_config(2) result = launcher.elastic_launch(config, entrypoint=self._full_sync_worker)(0.4) @@ -288,9 +262,7 @@ def test_sync_bool_multi_process_coherence_mode_float_true(self) -> None: self.assertTrue(result[0]) self.assertTrue(result[1]) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_sync_bool_multi_process_coherence_mode_float_false(self) -> None: config = get_pet_launch_config(2) result = launcher.elastic_launch(config, entrypoint=self._full_sync_worker)(1.0) diff --git a/tests/utils/test_early_stop_checker.py b/tests/utils/test_early_stop_checker.py index 3d47eb8cb6..e9b320814a 100644 --- a/tests/utils/test_early_stop_checker.py +++ b/tests/utils/test_early_stop_checker.py @@ -9,13 +9,10 @@ import torch from torchtnt.utils.early_stop_checker import EarlyStopChecker +from torchtnt.utils.test_utils import skip_if_not_gpu class EarlyStopCheckerTest(unittest.TestCase): - - # pyre-fixme[4]: Attribute must be annotated. 
- cuda_available = torch.cuda.is_available() - def test_early_stop_patience(self) -> None: # Loss does not decrease beyond 0.25 losses = [0.4, 0.3, 0.28, 0.25, 0.26, 0.25] @@ -87,9 +84,7 @@ def test_early_stop_min_delta(self) -> None: should_stop = es2.check(torch.tensor(0.26)) self.assertTrue(should_stop) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_early_stop_min_delta_on_gpu(self) -> None: device = torch.device("cuda:0") diff --git a/tests/utils/test_memory_snapshot_profiler.py b/tests/utils/test_memory_snapshot_profiler.py index 918ccb8863..6b8e0239c0 100644 --- a/tests/utils/test_memory_snapshot_profiler.py +++ b/tests/utils/test_memory_snapshot_profiler.py @@ -15,17 +15,15 @@ MemorySnapshotParams, MemorySnapshotProfiler, ) +from torchtnt.utils.test_utils import skip_if_not_gpu from torchtnt.utils.version import is_torch_version_geq_2_0 class MemorySnapshotProfilerTest(unittest.TestCase): - cuda_available: bool = torch.cuda.is_available() torch_version_geq_2_0: bool = is_torch_version_geq_2_0() - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu @unittest.skipUnless( condition=torch_version_geq_2_0, reason="This test needs changes from PyTorch 2.0 to run.", diff --git a/tests/utils/test_oom.py b/tests/utils/test_oom.py index afb4303090..404d04a99e 100644 --- a/tests/utils/test_oom.py +++ b/tests/utils/test_oom.py @@ -18,14 +18,11 @@ is_out_of_memory_error, log_memory_snapshot, ) +from torchtnt.utils.test_utils import skip_if_not_gpu from torchtnt.utils.version import is_torch_version_geq_2_0 class OomTest(unittest.TestCase): - - # pyre-fixme[4]: Attribute must be annotated. - cuda_available = torch.cuda.is_available() - def test_is_out_of_cpu_memory(self) -> None: """Test CPU OOM error detection.""" cpu_oom_error = RuntimeError("DefaultCPUAllocator: can't allocate memory") @@ -57,14 +54,9 @@ def test_is_out_of_memory_error(self) -> None: not_oom_error = RuntimeError("RuntimeError: blah") self.assertFalse(is_out_of_memory_error(not_oom_error)) + @skip_if_not_gpu @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) - # pyre-fixme[56]: Pyre was not able to infer the type of argument - # `torchtnt.utils.version.is_torch_version_geq_2_0()` to decorator factory - # `unittest.skipUnless`. - @unittest.skipUnless( - condition=is_torch_version_geq_2_0(), + condition=bool(is_torch_version_geq_2_0()), reason="This test needs changes from PyTorch 2.0 to run.", ) def test_log_memory_snapshot(self) -> None: diff --git a/tests/utils/test_prepare_module.py b/tests/utils/test_prepare_module.py index d217d3d53e..c7ca215f2c 100644 --- a/tests/utils/test_prepare_module.py +++ b/tests/utils/test_prepare_module.py @@ -23,7 +23,11 @@ prepare_module, TorchCompileParams, ) -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) from torchtnt.utils.version import is_torch_version_geq_1_13, is_torch_version_geq_2_0 COMPILE_AVAIL = False @@ -36,22 +40,14 @@ class PrepareModelTest(unittest.TestCase): - - cuda_available: bool = torch.cuda.is_available() - distributed_available: bool = torch.distributed.is_available() - - @unittest.skipUnless( - condition=(cuda_available), reason="This test should run on a GPU host." 
- ) + @skip_if_not_gpu def test_prepare_no_strategy(self) -> None: module = torch.nn.Linear(2, 2) # initialize on cpu device = init_from_env() # should be cuda device module = prepare_module(module, device, strategy=None) self.assertEqual(next(module.parameters()).device, device) - @unittest.skipUnless( - condition=(cuda_available), reason="This test should run on a GPU host." - ) + @skip_if_not_gpu def test_prepare_noop(self) -> None: module = torch.nn.Linear(2, 2) # initialize on cpu device = init_from_env() # should be cuda device @@ -62,12 +58,8 @@ def test_prepare_noop(self) -> None: module2 = prepare_module(module2, device, strategy="noop") self.assertNotEqual(next(module2.parameters()).device, device) - @unittest.skipUnless( - condition=(cuda_available), reason="This test should run on a GPU host." - ) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_gpu + @skip_if_not_distributed def test_prepare_ddp(self) -> None: spawn_multi_process( 2, @@ -87,12 +79,8 @@ def _test_prepare_ddp() -> None: tc = unittest.TestCase() tc.assertTrue(isinstance(ddp_module, DDP)) - @unittest.skipUnless( - condition=(cuda_available), reason="This test should run on a GPU host." - ) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_gpu + @skip_if_not_distributed def test_prepare_fsdp(self) -> None: spawn_multi_process( 2, @@ -108,12 +96,8 @@ def _test_prepare_fsdp() -> None: tc = unittest.TestCase() tc.assertTrue(isinstance(fsdp_module, FSDP)) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_distributed + @skip_if_not_gpu def test_fsdp_pytorch_version(self) -> None: """ Test that a RuntimeError is thrown when using FSDP, and PyTorch < v1.12 @@ -150,11 +134,9 @@ def _test_is_fsdp_module() -> None: fully_shard(model) assert _is_fsdp_module(model) + @skip_if_not_distributed @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=bool(cuda_available and torch.cuda.device_count() >= 2), + condition=bool(torch.cuda.device_count() >= 2), reason="This test needs 2 GPUs to run.", ) def test_is_fsdp_module(self) -> None: @@ -164,12 +146,8 @@ def test_is_fsdp_module(self) -> None: self._test_is_fsdp_module, ) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_distributed + @skip_if_not_gpu def test_fdsp_precision(self) -> None: spawn_multi_process( 2, @@ -206,12 +184,8 @@ def test_prepare_module_strategy_invalid_str(self) -> None: strategy="foo", ) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_distributed + @skip_if_not_gpu def test_prepare_module_with_fsdp(self) -> None: """ Launch tests of FSDP strategy @@ -257,9 +231,7 @@ def _test_prepare_module_fsdp_string_wrapped_in_fsdp() -> None: tc.assertTrue(isinstance(fsdp_module, FSDP)) - @unittest.skipUnless( - distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_prepare_module_with_ddp(self) -> None: """ Launch tests of DDP strategy @@ -333,9 +305,7 @@ def _test_prepare_module_ddp_throws_with_compile_params_and_static_graph() -> No condition=COMPILE_AVAIL, reason="This test needs PyTorch 1.13 or greater to run.", ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu def test_prepare_module_compile_module_state_dict(self) -> None: device = init_from_env() my_module = torch.nn.Linear(2, 2, device=device)
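Note for reviewers unfamiliar with the new decorators: skip_if_not_gpu and skip_if_not_distributed are imported from torchtnt.utils.test_utils in the hunks above. A minimal sketch of how such skip decorators can be defined on top of unittest.skipUnless is shown below; this is illustrative only, and the actual torchtnt.utils.test_utils definitions may differ.

    # Illustrative sketch only -- not the actual torchtnt implementation.
    import unittest

    import torch

    # Skip the decorated test when no CUDA device is available on the host.
    skip_if_not_gpu = unittest.skipUnless(
        torch.cuda.is_available(), "This test needs a GPU host to run."
    )

    # Skip the decorated test when torch.distributed is not available.
    skip_if_not_distributed = unittest.skipUnless(
        torch.distributed.is_available(), "Torch distributed is needed to run."
    )


    class ExampleTest(unittest.TestCase):
        # Decorators stack, so a test can require both GPU and distributed support,
        # mirroring how the patch replaces pairs of unittest.skipUnless calls.
        @skip_if_not_distributed
        @skip_if_not_gpu
        def test_needs_gpu_and_distributed(self) -> None:
            self.assertTrue(torch.cuda.is_available())
            self.assertTrue(torch.distributed.is_available())


    if __name__ == "__main__":
        unittest.main()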