Skip to content

Commit

Permalink
Merge branch 'main' into renovate/actions-checkout-4.x
Browse files Browse the repository at this point in the history
  • Loading branch information
sharpener6 authored Oct 8, 2024
2 parents 4390c36 + 116f47e commit 7a18718
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 12 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.8.5"
__version__ = "1.8.6"
69 changes: 61 additions & 8 deletions scaler/client/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, task: Task, is_delayed: bool, group_task_id: Optional[bytes],
self._result_object_id: Optional[bytes] = None
self._result_ready_event = threading.Event()
self._result_request_sent = False
self._result_received = False

self._profiling_info: Optional[ProfileResult] = None

Expand All @@ -42,6 +43,8 @@ def set_result_ready(self, object_id: Optional[bytes], profile_result: Optional[
if self.done():
raise InvalidStateError(f"invalid future state: {self._state}")

self._state = "FINISHED"

if object_id is not None:
self._result_object_id = object_id

Expand All @@ -54,25 +57,74 @@ def set_result_ready(self, object_id: Optional[bytes], profile_result: Optional[

self._result_ready_event.set()

def set_exception(self, exception: Optional[BaseException], profile_result: Optional[ProfileResult] = None) -> None:
def _set_result_or_exception(
self,
result: Optional[Any] = None,
exception: Optional[BaseException] = None,
profiling_info: Optional[ProfileResult] = None
) -> None:
with self._condition: # type: ignore[attr-defined]
if profile_result is not None:
self._profiling_info = profile_result
if self.cancelled():
raise InvalidStateError(f"invalid future state: {self._state}")

if self._result_received:
raise InvalidStateError("future already received object data.")

if profiling_info is not None:
if self._profiling_info is not None:
raise InvalidStateError("cannot set profiling info twice.")

self._profiling_info = profiling_info

self._state = "FINISHED"
self._result_received = True

if exception is not None:
assert result is None
self._exception = exception
for waiter in self._waiters:
waiter.add_exception(self)
else:
self._result = result
for waiter in self._waiters:
waiter.add_result(self)

self._result_ready_event.set()
self._condition.notify_all()

return super().set_exception(exception)
self._invoke_callbacks() # type: ignore[attr-defined]

def result(self, timeout=None):
def set_result(self, result: Any, profiling_info: Optional[ProfileResult] = None) -> None:
self._set_result_or_exception(result=result, profiling_info=profiling_info)

def set_exception(self, exception: Optional[BaseException], profiling_info: Optional[ProfileResult] = None) -> None:
self._set_result_or_exception(exception=exception, profiling_info=profiling_info)

def result(self, timeout: Optional[float] = None) -> Any:
self._result_ready_event.wait(timeout)

with self._condition: # type: ignore[attr-defined]
# if it's delayed future, get the result when future.result() get called
# if it's delayed future, get the result when future.result() gets called
if self._is_delayed:
self._request_result_object()

# wait for
return super().result(timeout)
if not self._result_received:
self._condition.wait(timeout)

return super().result()

def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
self._result_ready_event.wait(timeout)

with self._condition: # type: ignore[attr-defined]
# if it's delayed future, get the result when future.exception() gets called
if self._is_delayed:
self._request_result_object()

if not self._result_received:
self._condition.wait(timeout)

return super().exception()

def cancel(self) -> bool:
with self._condition: # type: ignore[attr-defined]
Expand All @@ -88,6 +140,7 @@ def cancel(self) -> bool:
self._connector.send(TaskCancel.new_msg(self._task_id))

self._state = "CANCELLED"
self._result_received = True

self._result_ready_event.set()
self._condition.notify_all() # type: ignore[attr-defined]
Expand Down
20 changes: 17 additions & 3 deletions tests/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_callback(self):
done_called_event = Event()

def on_done_callback(fut):
self.assertTrue(fut.done())
self.assertAlmostEqual(fut.result(), 4.0)
done_called_event.set()

Expand All @@ -45,29 +46,42 @@ def test_as_completed(self):

def test_state(self):
with Client(address=self.address) as client:
fut = client.submit(noop_sleep, 1.0)
fut = client.submit(noop_sleep, 0.5)
self.assertTrue(fut.running())
self.assertFalse(fut.done())

fut.result()
time.sleep(1.5)

self.assertFalse(fut.running())
self.assertTrue(fut.done())

def test_cancel(self):
with Client(address=self.address) as client:
fut = client.submit(math.sqrt, 100.0)
fut.cancel()
self.assertTrue(fut.cancel())

self.assertTrue(fut.cancelled())
self.assertTrue(fut.done())

with self.assertRaises(CancelledError):
fut.result()

fut = client.submit(math.sqrt, 16)
fut.result()

# cancel() should fail on a completed future.
self.assertFalse(fut.cancel())
self.assertFalse(fut.cancelled())

def test_exception(self):
with Client(address=self.address) as client:
fut = client.submit(math.sqrt, "16")

with self.assertRaises(TypeError):
fut.result()

self.assertTrue(fut.done())

self.assertIsInstance(fut.exception(), TypeError)

def test_client_disconnected(self):
Expand Down

0 comments on commit 7a18718

Please sign in to comment.