Skip to content

Commit

Permalink
Remove future from operation statistics (#439)
Browse files Browse the repository at this point in the history
Remove the future from the operation statistics functions so it instead returns the object directly.
  • Loading branch information
TwistedTwigleg authored Jan 27, 2023
1 parent 082ebfe commit 172cbbe
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 76 deletions.
20 changes: 3 additions & 17 deletions awscrt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,25 +724,11 @@ def get_stats(self):
"""Queries the connection's internal statistics for incomplete operations.
Returns:
A future with a (:class:`OperationStatisticsData`)
The (:class:`OperationStatisticsData`) containing the statistics
"""

future = Future()

def get_stats_result(
incomplete_operation_count,
incomplete_operation_size,
unacked_operation_count,
unacked_operation_size):
operation_statistics_data = OperationStatisticsData(
incomplete_operation_count,
incomplete_operation_size,
unacked_operation_count,
unacked_operation_size)
future.set_result(operation_statistics_data)

_awscrt.mqtt_client_connection_get_stats(self._binding, get_stats_result)
return future
result = _awscrt.mqtt_client_connection_get_stats(self._binding)
return OperationStatisticsData(result[0], result[1], result[2], result[3])


class WebsocketHandshakeTransformArgs:
Expand Down
21 changes: 3 additions & 18 deletions awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -1848,23 +1848,8 @@ def get_stats(self):
"""Queries the client's internal statistics for incomplete operations.
Returns:
A future with a (:class:`OperationStatisticsData`)
The (:class:`OperationStatisticsData`) containing the statistics
"""

future = Future()

def get_stats_result(
incomplete_operation_count,
incomplete_operation_size,
unacked_operation_count,
unacked_operation_size):
operation_statistics_data = OperationStatisticsData(
incomplete_operation_count,
incomplete_operation_size,
unacked_operation_count,
unacked_operation_size)
future.set_result(operation_statistics_data)

_awscrt.mqtt5_client_get_stats(self._binding, get_stats_result)

return future
result = _awscrt.mqtt5_client_get_stats(self._binding)
return OperationStatisticsData(result[0], result[1], result[2], result[3])
42 changes: 26 additions & 16 deletions source/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2007,9 +2007,8 @@ PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args) {
bool success = false;

PyObject *impl_capsule;
PyObject *get_stats_callback_fn_py;

if (!PyArg_ParseTuple(args, "OO", &impl_capsule, &get_stats_callback_fn_py)) {
if (!PyArg_ParseTuple(args, "O", &impl_capsule)) {
return NULL;
}

Expand All @@ -2018,34 +2017,45 @@ PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args) {
return NULL;
}

/* These must be DECREF'd when function ends */
/* These must be DECREF'd when function ends on error */
PyObject *result = NULL;

struct aws_mqtt5_client_operation_statistics stats;
AWS_ZERO_STRUCT(stats);

aws_mqtt5_client_get_stats(client->native, &stats);

result = PyObject_CallFunction(
get_stats_callback_fn_py,
"(KKKK)",
/* K */ (unsigned long long)stats.incomplete_operation_count,
/* K */ (unsigned long long)stats.incomplete_operation_size,
/* K */ (unsigned long long)stats.unacked_operation_count,
/* K */ (unsigned long long)stats.unacked_operation_size);
result = PyTuple_New(4);
if (!result) {
PyErr_WriteUnraisable(PyErr_Occurred());
goto done;
}

success = true;
PyTuple_SET_ITEM(result, 0, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_count));
if (PyTuple_GET_ITEM(result, 0) == NULL) {
goto done;
}

done:
PyTuple_SET_ITEM(result, 1, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_size));
if (PyTuple_GET_ITEM(result, 1) == NULL) {
goto done;
}

Py_XDECREF(result);
PyTuple_SET_ITEM(result, 2, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_count));
if (PyTuple_GET_ITEM(result, 2) == NULL) {
goto done;
}

PyTuple_SET_ITEM(result, 3, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_size));
if (PyTuple_GET_ITEM(result, 3) == NULL) {
goto done;
}

success = true;

done:
if (success) {
Py_RETURN_NONE;
return result;
}
Py_XDECREF(result);
return NULL;
}
}
40 changes: 25 additions & 15 deletions source/mqtt_client_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1178,9 +1178,8 @@ PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args
bool success = false;

PyObject *impl_capsule;
PyObject *get_stats_callback_fn_py;

if (!PyArg_ParseTuple(args, "OO", &impl_capsule, &get_stats_callback_fn_py)) {
if (!PyArg_ParseTuple(args, "O", &impl_capsule)) {
return NULL;
}

Expand All @@ -1190,34 +1189,45 @@ PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args
return NULL;
}

/* These must be DECREF'd when function ends */
/* These must be DECREF'd when function ends on error */
PyObject *result = NULL;

struct aws_mqtt_connection_operation_statistics stats;
AWS_ZERO_STRUCT(stats);

aws_mqtt_client_connection_get_stats(connection->native, &stats);

result = PyObject_CallFunction(
get_stats_callback_fn_py,
"(KKKK)",
/* K */ (unsigned long long)stats.incomplete_operation_count,
/* K */ (unsigned long long)stats.incomplete_operation_size,
/* K */ (unsigned long long)stats.unacked_operation_count,
/* K */ (unsigned long long)stats.unacked_operation_size);
result = PyTuple_New(4);
if (!result) {
PyErr_WriteUnraisable(PyErr_Occurred());
goto done;
}

success = true;
PyTuple_SET_ITEM(result, 0, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_count));
if (PyTuple_GET_ITEM(result, 0) == NULL) {
goto done;
}

done:
PyTuple_SET_ITEM(result, 1, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_size));
if (PyTuple_GET_ITEM(result, 1) == NULL) {
goto done;
}

Py_XDECREF(result);
PyTuple_SET_ITEM(result, 2, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_count));
if (PyTuple_GET_ITEM(result, 2) == NULL) {
goto done;
}

PyTuple_SET_ITEM(result, 3, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_size));
if (PyTuple_GET_ITEM(result, 3) == NULL) {
goto done;
}

success = true;

done:
if (success) {
Py_RETURN_NONE;
return result;
}
Py_XDECREF(result);
return NULL;
}
18 changes: 10 additions & 8 deletions test/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def test_connect_publish_wait_statistics_disconnect(self):
connection.connect().result(TIMEOUT)

# check operation statistics
statistics = connection.get_stats().result(TIMEOUT)
statistics = connection.get_stats()
self.assertEqual(statistics.incomplete_operation_count, 0)
self.assertEqual(statistics.incomplete_operation_size, 0)
self.assertEqual(statistics.unacked_operation_count, 0)
Expand All @@ -254,7 +254,7 @@ def test_connect_publish_wait_statistics_disconnect(self):
self.assertEqual(packet_id, puback['packet_id'])

# check operation statistics
statistics = connection.get_stats().result(TIMEOUT)
statistics = connection.get_stats()
self.assertEqual(statistics.incomplete_operation_count, 0)
self.assertEqual(statistics.incomplete_operation_size, 0)
self.assertEqual(statistics.unacked_operation_count, 0)
Expand All @@ -273,20 +273,22 @@ def test_connect_publish_statistics_wait_disconnect(self):
expected_size = len(self.TEST_TOPIC) + len(self.TEST_MSG) + 4

# check operation statistics
statistics = connection.get_stats().result(TIMEOUT)
statistics = connection.get_stats()
self.assertEqual(statistics.incomplete_operation_count, 1)
self.assertEqual(statistics.incomplete_operation_size, expected_size)
# NOTE: Unacked will be zero because we have not invoked the future yet
# and so it has not had time to move to the socket
self.assertEqual(statistics.unacked_operation_count, 0)
self.assertEqual(statistics.unacked_operation_size, 0)
# NOTE: Unacked MAY be zero because we have not invoked the future yet
# and so it has not had time to move to the socket. With Python especially, it seems to heavily depend on how fast
# the test is executed, which makes it sometimes rarely get into the unacked operation and then it is non-zero.
# To fix this, we just make sure it is within expected bounds (0 or within packet size).
self.assertTrue(statistics.unacked_operation_count <= 1)
self.assertTrue(statistics.unacked_operation_count <= expected_size)

# wait for PubAck
puback = published.result(TIMEOUT)
self.assertEqual(packet_id, puback['packet_id'])

# check operation statistics
statistics = connection.get_stats().result(TIMEOUT)
statistics = connection.get_stats()
self.assertEqual(statistics.incomplete_operation_count, 0)
self.assertEqual(statistics.incomplete_operation_size, 0)
self.assertEqual(statistics.unacked_operation_count, 0)
Expand Down
4 changes: 2 additions & 2 deletions test/test_mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ def test_operation_statistics_uc1(self):
client1, callbacks = self._test_connect(auth_type=AuthType.DIRECT_MUTUAL_TLS, client_options=client_options)

# Make sure the operation statistics are empty
statistics = client1.get_stats().result(TIMEOUT)
statistics = client1.get_stats()
self.assertEqual(statistics.incomplete_operation_count, 0)
self.assertEqual(statistics.incomplete_operation_size, 0)
self.assertEqual(statistics.unacked_operation_count, 0)
Expand All @@ -1257,7 +1257,7 @@ def test_operation_statistics_uc1(self):
publish_future.result(TIMEOUT)

# Make sure the operation statistics are empty
statistics = client1.get_stats().result(TIMEOUT)
statistics = client1.get_stats()
self.assertEqual(statistics.incomplete_operation_count, 0)
self.assertEqual(statistics.incomplete_operation_size, 0)
self.assertEqual(statistics.unacked_operation_count, 0)
Expand Down

0 comments on commit 172cbbe

Please sign in to comment.