Skip to content

Commit

Permalink
fix mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkho committed Nov 7, 2024
1 parent 7337555 commit b5eec07
Showing 1 changed file with 19 additions and 27 deletions.
46 changes: 19 additions & 27 deletions torchdata/nodes/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ def __next__(self) -> T:
if not isinstance(item, StartupExceptionWrapper):
self._sem.release()
item.reraise()
else:
self._steps_since_snapshot += 1
self._sem.release()
self._maybe_update_snapshot(idx)
return item

self._steps_since_snapshot += 1
self._sem.release()
self._maybe_update_snapshot(idx)
return item

def get_state(self) -> Dict[str, Any]:
return {
Expand Down Expand Up @@ -388,44 +388,36 @@ def __init__(
daemon=True,
)
self._thread.start()
self._stopped = False
for _ in range(fast_forward):
next(self)

def __iter__(self) -> Iterator[T]:
return self

def __next__(self) -> T:
if self._stopped:
raise StopIteration()

while True:
if self._stop_event.is_set():
self._stopped = True
raise StopIteration()
try:
item, idx = self._q.get(block=True, timeout=QUEUE_TIMEOUT)
break
except queue.Empty:
continue

if isinstance(item, StopIteration):
self._stopped = True
self._sem.release()
self._stop_event.set()
raise item
elif isinstance(item, ExceptionWrapper):
self._stopped = True
if not isinstance(item, StartupExceptionWrapper):
# We don't need to release for startup exceptions
if isinstance(item, StopIteration):
self._sem.release()
self._stop_event.set()
item.reraise()
else:
self._sem.release()
self._steps_since_snapshot += 1
self._maybe_update_snapshot(idx)
return item
self._stop_event.set()
raise item
elif isinstance(item, ExceptionWrapper):
if not isinstance(item, StartupExceptionWrapper):
# We don't need to release for startup exceptions
self._sem.release()
self._stop_event.set()
item.reraise()
else:
self._sem.release()
self._steps_since_snapshot += 1
self._maybe_update_snapshot(idx)
return item

def get_state(self) -> Dict[str, Any]:
return {
Expand Down

0 comments on commit b5eec07

Please sign in to comment.