diff --git a/awscrt/mqtt.py b/awscrt/mqtt.py index 2f67c8dc3..affbbd0fa 100644 --- a/awscrt/mqtt.py +++ b/awscrt/mqtt.py @@ -724,25 +724,11 @@ def get_stats(self): """Queries the connection's internal statistics for incomplete operations. Returns: - A future with a (:class:`OperationStatisticsData`) + The (:class:`OperationStatisticsData`) containing the statistics """ - future = Future() - - def get_stats_result( - incomplete_operation_count, - incomplete_operation_size, - unacked_operation_count, - unacked_operation_size): - operation_statistics_data = OperationStatisticsData( - incomplete_operation_count, - incomplete_operation_size, - unacked_operation_count, - unacked_operation_size) - future.set_result(operation_statistics_data) - - _awscrt.mqtt_client_connection_get_stats(self._binding, get_stats_result) - return future + result = _awscrt.mqtt_client_connection_get_stats(self._binding) + return OperationStatisticsData(result[0], result[1], result[2], result[3]) class WebsocketHandshakeTransformArgs: diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 9c37755cd..1de9d2ef7 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1848,23 +1848,8 @@ def get_stats(self): """Queries the client's internal statistics for incomplete operations. Returns: - A future with a (:class:`OperationStatisticsData`) + The (:class:`OperationStatisticsData`) containing the statistics """ - future = Future() - - def get_stats_result( - incomplete_operation_count, - incomplete_operation_size, - unacked_operation_count, - unacked_operation_size): - operation_statistics_data = OperationStatisticsData( - incomplete_operation_count, - incomplete_operation_size, - unacked_operation_count, - unacked_operation_size) - future.set_result(operation_statistics_data) - - _awscrt.mqtt5_client_get_stats(self._binding, get_stats_result) - - return future + result = _awscrt.mqtt5_client_get_stats(self._binding) + return OperationStatisticsData(result[0], result[1], result[2], result[3]) diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index d034709a8..99b0549e1 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -2007,9 +2007,8 @@ PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args) { bool success = false; PyObject *impl_capsule; - PyObject *get_stats_callback_fn_py; - if (!PyArg_ParseTuple(args, "OO", &impl_capsule, &get_stats_callback_fn_py)) { + if (!PyArg_ParseTuple(args, "O", &impl_capsule)) { return NULL; } @@ -2018,7 +2017,7 @@ PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args) { return NULL; } - /* These must be DECREF'd when function ends */ + /* These must be DECREF'd when function ends on error */ PyObject *result = NULL; struct aws_mqtt5_client_operation_statistics stats; @@ -2026,26 +2025,37 @@ PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args) { aws_mqtt5_client_get_stats(client->native, &stats); - result = PyObject_CallFunction( - get_stats_callback_fn_py, - "(KKKK)", - /* K */ (unsigned long long)stats.incomplete_operation_count, - /* K */ (unsigned long long)stats.incomplete_operation_size, - /* K */ (unsigned long long)stats.unacked_operation_count, - /* K */ (unsigned long long)stats.unacked_operation_size); + result = PyTuple_New(4); if (!result) { - PyErr_WriteUnraisable(PyErr_Occurred()); goto done; } - success = true; + PyTuple_SET_ITEM(result, 0, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_count)); + if (PyTuple_GET_ITEM(result, 0) == NULL) { + goto done; + } -done: + PyTuple_SET_ITEM(result, 1, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_size)); + if (PyTuple_GET_ITEM(result, 1) == NULL) { + goto done; + } - Py_XDECREF(result); + PyTuple_SET_ITEM(result, 2, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_count)); + if (PyTuple_GET_ITEM(result, 2) == NULL) { + goto done; + } + + PyTuple_SET_ITEM(result, 3, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_size)); + if (PyTuple_GET_ITEM(result, 3) == NULL) { + goto done; + } + success = true; + +done: if (success) { - Py_RETURN_NONE; + return result; } + Py_XDECREF(result); return NULL; -} \ No newline at end of file +} diff --git a/source/mqtt_client_connection.c b/source/mqtt_client_connection.c index 4e7e6445b..bd06d290e 100644 --- a/source/mqtt_client_connection.c +++ b/source/mqtt_client_connection.c @@ -1178,9 +1178,8 @@ PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args bool success = false; PyObject *impl_capsule; - PyObject *get_stats_callback_fn_py; - if (!PyArg_ParseTuple(args, "OO", &impl_capsule, &get_stats_callback_fn_py)) { + if (!PyArg_ParseTuple(args, "O", &impl_capsule)) { return NULL; } @@ -1190,7 +1189,7 @@ PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args return NULL; } - /* These must be DECREF'd when function ends */ + /* These must be DECREF'd when function ends on error */ PyObject *result = NULL; struct aws_mqtt_connection_operation_statistics stats; @@ -1198,26 +1197,37 @@ PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args aws_mqtt_client_connection_get_stats(connection->native, &stats); - result = PyObject_CallFunction( - get_stats_callback_fn_py, - "(KKKK)", - /* K */ (unsigned long long)stats.incomplete_operation_count, - /* K */ (unsigned long long)stats.incomplete_operation_size, - /* K */ (unsigned long long)stats.unacked_operation_count, - /* K */ (unsigned long long)stats.unacked_operation_size); + result = PyTuple_New(4); if (!result) { - PyErr_WriteUnraisable(PyErr_Occurred()); goto done; } - success = true; + PyTuple_SET_ITEM(result, 0, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_count)); + if (PyTuple_GET_ITEM(result, 0) == NULL) { + goto done; + } -done: + PyTuple_SET_ITEM(result, 1, PyLong_FromUnsignedLongLong((unsigned long long)stats.incomplete_operation_size)); + if (PyTuple_GET_ITEM(result, 1) == NULL) { + goto done; + } - Py_XDECREF(result); + PyTuple_SET_ITEM(result, 2, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_count)); + if (PyTuple_GET_ITEM(result, 2) == NULL) { + goto done; + } + PyTuple_SET_ITEM(result, 3, PyLong_FromUnsignedLongLong((unsigned long long)stats.unacked_operation_size)); + if (PyTuple_GET_ITEM(result, 3) == NULL) { + goto done; + } + + success = true; + +done: if (success) { - Py_RETURN_NONE; + return result; } + Py_XDECREF(result); return NULL; } diff --git a/test/test_mqtt.py b/test/test_mqtt.py index 35100e58b..8ac756205 100644 --- a/test/test_mqtt.py +++ b/test/test_mqtt.py @@ -242,7 +242,7 @@ def test_connect_publish_wait_statistics_disconnect(self): connection.connect().result(TIMEOUT) # check operation statistics - statistics = connection.get_stats().result(TIMEOUT) + statistics = connection.get_stats() self.assertEqual(statistics.incomplete_operation_count, 0) self.assertEqual(statistics.incomplete_operation_size, 0) self.assertEqual(statistics.unacked_operation_count, 0) @@ -254,7 +254,7 @@ def test_connect_publish_wait_statistics_disconnect(self): self.assertEqual(packet_id, puback['packet_id']) # check operation statistics - statistics = connection.get_stats().result(TIMEOUT) + statistics = connection.get_stats() self.assertEqual(statistics.incomplete_operation_count, 0) self.assertEqual(statistics.incomplete_operation_size, 0) self.assertEqual(statistics.unacked_operation_count, 0) @@ -273,20 +273,22 @@ def test_connect_publish_statistics_wait_disconnect(self): expected_size = len(self.TEST_TOPIC) + len(self.TEST_MSG) + 4 # check operation statistics - statistics = connection.get_stats().result(TIMEOUT) + statistics = connection.get_stats() self.assertEqual(statistics.incomplete_operation_count, 1) self.assertEqual(statistics.incomplete_operation_size, expected_size) - # NOTE: Unacked will be zero because we have not invoked the future yet - # and so it has not had time to move to the socket - self.assertEqual(statistics.unacked_operation_count, 0) - self.assertEqual(statistics.unacked_operation_size, 0) + # NOTE: Unacked MAY be zero because we have not invoked the future yet + # and so it has not had time to move to the socket. With Python especially, it seems to heavily depend on how fast + # the test is executed, which makes it sometimes rarely get into the unacked operation and then it is non-zero. + # To fix this, we just make sure it is within expected bounds (0 or within packet size). + self.assertTrue(statistics.unacked_operation_count <= 1) + self.assertTrue(statistics.unacked_operation_count <= expected_size) # wait for PubAck puback = published.result(TIMEOUT) self.assertEqual(packet_id, puback['packet_id']) # check operation statistics - statistics = connection.get_stats().result(TIMEOUT) + statistics = connection.get_stats() self.assertEqual(statistics.incomplete_operation_count, 0) self.assertEqual(statistics.incomplete_operation_size, 0) self.assertEqual(statistics.unacked_operation_count, 0) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index b8487be6c..ef0f38a4e 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1240,7 +1240,7 @@ def test_operation_statistics_uc1(self): client1, callbacks = self._test_connect(auth_type=AuthType.DIRECT_MUTUAL_TLS, client_options=client_options) # Make sure the operation statistics are empty - statistics = client1.get_stats().result(TIMEOUT) + statistics = client1.get_stats() self.assertEqual(statistics.incomplete_operation_count, 0) self.assertEqual(statistics.incomplete_operation_size, 0) self.assertEqual(statistics.unacked_operation_count, 0) @@ -1257,7 +1257,7 @@ def test_operation_statistics_uc1(self): publish_future.result(TIMEOUT) # Make sure the operation statistics are empty - statistics = client1.get_stats().result(TIMEOUT) + statistics = client1.get_stats() self.assertEqual(statistics.incomplete_operation_count, 0) self.assertEqual(statistics.incomplete_operation_size, 0) self.assertEqual(statistics.unacked_operation_count, 0)