From 13dd2a4deeded70e09c779178f4d08e215db6bfe Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 7 Sep 2024 03:05:01 +0200 Subject: [PATCH 1/6] Optimize ssl performance by invoke SSLProtocol.get_buffer directly --- uvloop/handles/stream.pyx | 28 ++++++++++++++++++++++------ uvloop/sslproto.pxd | 3 ++- uvloop/sslproto.pyx | 23 +++++++++++++++-------- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 9fbc5a51..a5778431 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -232,7 +232,10 @@ cdef class UVStream(UVBaseTransport): UVBaseTransport._set_protocol(self, protocol) - if (hasattr(protocol, 'get_buffer') and + if isinstance(protocol, SSLProtocol): + self._protocol_buffer_updated = protocol.buffer_updated + self.__buffered = 1 + elif (hasattr(protocol, 'get_buffer') and not isinstance(protocol, aio_Protocol)): try: self._protocol_get_buffer = protocol.get_buffer @@ -924,9 +927,21 @@ cdef void __uv_stream_buffered_alloc( "UVStream alloc buffer callback") == 0: return + cdef UVStream sc = stream.data + + # Fast pass for our own SSLProtocol + # avoid python calls, memoryviews, etc + if isinstance(sc._protocol, SSLProtocol): + try: + (sc._protocol).get_buffer_c( + suggested_size, &uvbuf.base, &uvbuf.len) + return + except BaseException as exc: + uvbuf.len = 0 + uvbuf.base = NULL + return + cdef: - UVStream sc = stream.data - Loop loop = sc._loop Py_buffer* pybuf = &sc._read_pybuf int got_buf = 0 @@ -987,7 +1002,7 @@ cdef void __uv_stream_buffered_on_read( return try: - if nread > 0 and not sc._read_pybuf_acquired: + if nread > 0 and not isinstance(sc._protocol, SSLProtocol) and not sc._read_pybuf_acquired: # From libuv docs: # nread is > 0 if there is data available or < 0 on error. When # we’ve reached EOF, nread will be set to UV_EOF. When @@ -1015,5 +1030,6 @@ cdef void __uv_stream_buffered_on_read( sc._fatal_error(exc, False) finally: - sc._read_pybuf_acquired = 0 - PyBuffer_Release(pybuf) + if sc._read_pybuf_acquired: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 3da10f00..5f1a4ee9 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -59,7 +59,6 @@ cdef class SSLProtocol: object _outgoing_read char* _ssl_buffer size_t _ssl_buffer_len - object _ssl_buffer_view SSLProtocolState _state size_t _conn_lost AppProtocolState _app_state @@ -84,6 +83,8 @@ cdef class SSLProtocol: object _handshake_timeout_handle object _shutdown_timeout_handle + cdef inline get_buffer_c(self, size_t n, char** buf, size_t* buf_size) + cdef _set_app_protocol(self, app_protocol) cdef _wakeup_waiter(self, exc=*) cdef _get_extra_info(self, name, default=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 42bb7644..885e71fc 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -204,11 +204,8 @@ cdef class SSLProtocol: self._ssl_buffer = PyMem_RawMalloc(self._ssl_buffer_len) if not self._ssl_buffer: raise MemoryError() - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE) def __dealloc__(self): - self._ssl_buffer_view = None PyMem_RawFree(self._ssl_buffer) self._ssl_buffer = NULL self._ssl_buffer_len = 0 @@ -358,7 +355,7 @@ cdef class SSLProtocol: self._handshake_timeout_handle.cancel() self._handshake_timeout_handle = None - def get_buffer(self, n): + cdef get_buffer_c(self, size_t n, char** buf, size_t* buf_size): cdef size_t want = n if want > SSL_READ_MAX_SIZE: want = SSL_READ_MAX_SIZE @@ -367,11 +364,21 @@ cdef class SSLProtocol: if not self._ssl_buffer: raise MemoryError() self._ssl_buffer_len = want - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, want, PyBUF_WRITE) - return self._ssl_buffer_view - def buffer_updated(self, nbytes): + buf[0] = self._ssl_buffer + buf_size[0] = self._ssl_buffer_len + + def get_buffer(self, size_t n): + # This pure python call is still used by some very peculiar test cases + + cdef: + char* buf + size_t buf_size + + self.get_buffer_c(n, &buf, &buf_size) + return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) + + def buffer_updated(self, size_t nbytes): self._incoming_write(PyMemoryView_FromMemory( self._ssl_buffer, nbytes, PyBUF_WRITE)) From d0280206e55941f8ed349a13df6bde89a1a9ef5f Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 7 Sep 2024 03:11:56 +0200 Subject: [PATCH 2/6] Add inline directive to SSLProtocol private methods --- uvloop/sslproto.pxd | 62 ++++++++++++++++++++++----------------------- uvloop/sslproto.pyx | 2 +- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 5f1a4ee9..1de4bc1e 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -85,55 +85,55 @@ cdef class SSLProtocol: cdef inline get_buffer_c(self, size_t n, char** buf, size_t* buf_size) - cdef _set_app_protocol(self, app_protocol) - cdef _wakeup_waiter(self, exc=*) - cdef _get_extra_info(self, name, default=*) - cdef _set_state(self, SSLProtocolState new_state) + cdef inline _set_app_protocol(self, app_protocol) + cdef inline _wakeup_waiter(self, exc=*) + cdef inline _get_extra_info(self, name, default=*) + cdef inline _set_state(self, SSLProtocolState new_state) # Handshake flow - cdef _start_handshake(self) - cdef _check_handshake_timeout(self) - cdef _do_handshake(self) - cdef _on_handshake_complete(self, handshake_exc) + cdef inline _start_handshake(self) + cdef inline _check_handshake_timeout(self) + cdef inline _do_handshake(self) + cdef inline _on_handshake_complete(self, handshake_exc) # Shutdown flow - cdef _start_shutdown(self, object context=*) - cdef _check_shutdown_timeout(self) - cdef _do_read_into_void(self, object context) - cdef _do_flush(self, object context=*) - cdef _do_shutdown(self, object context=*) - cdef _on_shutdown_complete(self, shutdown_exc) - cdef _abort(self, exc) + cdef inline _start_shutdown(self, object context=*) + cdef inline _check_shutdown_timeout(self) + cdef inline _do_read_into_void(self, object context) + cdef inline _do_flush(self, object context=*) + cdef inline _do_shutdown(self, object context=*) + cdef inline _on_shutdown_complete(self, shutdown_exc) + cdef inline _abort(self, exc) # Outgoing flow - cdef _write_appdata(self, list_of_data, object context) - cdef _do_write(self) - cdef _process_outgoing(self) + cdef inline _write_appdata(self, list_of_data, object context) + cdef inline _do_write(self) + cdef inline _process_outgoing(self) # Incoming flow - cdef _do_read(self) - cdef _do_read__buffered(self) - cdef _do_read__copied(self) - cdef _call_eof_received(self, object context=*) + cdef inline _do_read(self) + cdef inline _do_read__buffered(self) + cdef inline _do_read__copied(self) + cdef inline _call_eof_received(self, object context=*) # Flow control for writes from APP socket - cdef _control_app_writing(self, object context=*) - cdef size_t _get_write_buffer_size(self) - cdef _set_write_buffer_limits(self, high=*, low=*) + cdef inline _control_app_writing(self, object context=*) + cdef inline size_t _get_write_buffer_size(self) + cdef inline _set_write_buffer_limits(self, high=*, low=*) # Flow control for reads to APP socket - cdef _pause_reading(self) - cdef _resume_reading(self, object context) + cdef inline _pause_reading(self) + cdef inline _resume_reading(self, object context) # Flow control for reads from SSL socket - cdef _control_ssl_reading(self) - cdef _set_read_buffer_limits(self, high=*, low=*) - cdef size_t _get_read_buffer_size(self) - cdef _fatal_error(self, exc, message=*) + cdef inline _control_ssl_reading(self) + cdef inline _set_read_buffer_limits(self, high=*, low=*) + cdef inline size_t _get_read_buffer_size(self) + cdef inline _fatal_error(self, exc, message=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 885e71fc..9aca825e 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -370,7 +370,7 @@ cdef class SSLProtocol: def get_buffer(self, size_t n): # This pure python call is still used by some very peculiar test cases - + cdef: char* buf size_t buf_size From 6a1a1644d0d95ac0d24b07a594e665449fc153b3 Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 7 Sep 2024 17:53:23 +0200 Subject: [PATCH 3/6] Fix tests --- uvloop/handles/stream.pyx | 17 +++++++++++---- uvloop/sslproto.pxd | 6 +++++- uvloop/sslproto.pyx | 44 ++++++++++++++++++++++----------------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index a5778431..b60b264c 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -233,7 +233,6 @@ cdef class UVStream(UVBaseTransport): UVBaseTransport._set_protocol(self, protocol) if isinstance(protocol, SSLProtocol): - self._protocol_buffer_updated = protocol.buffer_updated self.__buffered = 1 elif (hasattr(protocol, 'get_buffer') and not isinstance(protocol, aio_Protocol)): @@ -930,13 +929,16 @@ cdef void __uv_stream_buffered_alloc( cdef UVStream sc = stream.data # Fast pass for our own SSLProtocol - # avoid python calls, memoryviews, etc + # avoid python calls, memoryviews, context enter/exit, etc if isinstance(sc._protocol, SSLProtocol): try: - (sc._protocol).get_buffer_c( + (sc._protocol).get_buffer_impl( suggested_size, &uvbuf.base, &uvbuf.len) return except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. uvbuf.len = 0 uvbuf.base = NULL return @@ -1023,7 +1025,14 @@ cdef void __uv_stream_buffered_on_read( if UVLOOP_DEBUG: loop._debug_stream_read_cb_total += 1 - run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + if isinstance(sc._protocol, SSLProtocol): + Context_Enter(sc.context) + try: + (sc._protocol).buffer_updated_impl(nread) + finally: + Context_Exit(sc.context) + else: + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) except BaseException as exc: if UVLOOP_DEBUG: loop._debug_stream_read_cb_errors_total += 1 diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 1de4bc1e..cf3205d1 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -83,7 +83,11 @@ cdef class SSLProtocol: object _handshake_timeout_handle object _shutdown_timeout_handle - cdef inline get_buffer_c(self, size_t n, char** buf, size_t* buf_size) + # Instead of doing python calls, c methods *_impl are called directly + # from stream.pyx + + cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size) + cdef inline buffer_updated_impl(self, size_t nbytes) cdef inline _set_app_protocol(self, app_protocol) cdef inline _wakeup_waiter(self, exc=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 9aca825e..1deb3b83 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -355,7 +355,7 @@ cdef class SSLProtocol: self._handshake_timeout_handle.cancel() self._handshake_timeout_handle = None - cdef get_buffer_c(self, size_t n, char** buf, size_t* buf_size): + cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size): cdef size_t want = n if want > SSL_READ_MAX_SIZE: want = SSL_READ_MAX_SIZE @@ -368,17 +368,7 @@ cdef class SSLProtocol: buf[0] = self._ssl_buffer buf_size[0] = self._ssl_buffer_len - def get_buffer(self, size_t n): - # This pure python call is still used by some very peculiar test cases - - cdef: - char* buf - size_t buf_size - - self.get_buffer_c(n, &buf, &buf_size) - return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) - - def buffer_updated(self, size_t nbytes): + cdef buffer_updated_impl(self, size_t nbytes): self._incoming_write(PyMemoryView_FromMemory( self._ssl_buffer, nbytes, PyBUF_WRITE)) @@ -394,6 +384,18 @@ cdef class SSLProtocol: elif self._state == SHUTDOWN: self._do_shutdown() + def get_buffer(self, size_t n): + # This pure python call is still used by some very peculiar test cases + cdef: + char* buf + size_t buf_size + + self.get_buffer_impl(n, &buf, &buf_size) + return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) + + def buffer_updated(self, size_t nbytes): + self.buffer_updated_impl(nbytes) + def eof_received(self): """Called when the other end of the low-level stream is half-closed. @@ -546,6 +548,7 @@ cdef class SSLProtocol: if self._app_state == STATE_INIT: self._app_state = STATE_CON_MADE self._app_protocol.connection_made(self._get_app_transport()) + self._wakeup_waiter() # We should wakeup user code before sending the first data below. In @@ -558,7 +561,7 @@ cdef class SSLProtocol: new_MethodHandle(self._loop, "SSLProtocol._do_read", self._do_read, - None, # current context is good + None, self)) # Shutdown flow @@ -758,7 +761,7 @@ cdef class SSLProtocol: new_MethodHandle(self._loop, "SSLProtocol._do_read", self._do_read, - None, # current context is good + None, self)) except ssl_SSLAgainErrors as exc: pass @@ -794,10 +797,12 @@ cdef class SSLProtocol: data.append(chunk) except ssl_SSLAgainErrors as exc: pass + if one: self._app_protocol.data_received(first) elif not zero: self._app_protocol.data_received(b''.join(data)) + if not chunk: # close_notify self._call_eof_received() @@ -887,11 +892,12 @@ cdef class SSLProtocol: self._app_reading_paused = False if self._state == WRAPPED: self._loop._call_soon_handle( - new_MethodHandle(self._loop, - "SSLProtocol._do_read", - self._do_read, - context, - self)) + new_MethodHandle1(self._loop, + "SSLProtocol._do_read", + self._do_read, + context, + self, + context)) # Flow control for reads from SSL socket From fc432d2c7dab5d615ae861c60bf6b0d6adb4e803 Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 7 Sep 2024 17:53:23 +0200 Subject: [PATCH 4/6] Fix tests --- uvloop/sslproto.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 1deb3b83..5dc66796 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -561,7 +561,7 @@ cdef class SSLProtocol: new_MethodHandle(self._loop, "SSLProtocol._do_read", self._do_read, - None, + None, # current context is good self)) # Shutdown flow @@ -761,7 +761,7 @@ cdef class SSLProtocol: new_MethodHandle(self._loop, "SSLProtocol._do_read", self._do_read, - None, + None, # current context is good self)) except ssl_SSLAgainErrors as exc: pass From ab16cc347ee7eaae973d792579ab3dded1c0a722 Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 7 Sep 2024 18:07:56 +0200 Subject: [PATCH 5/6] Undo some changes --- uvloop/sslproto.pyx | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 5dc66796..9015ba57 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -548,7 +548,6 @@ cdef class SSLProtocol: if self._app_state == STATE_INIT: self._app_state = STATE_CON_MADE self._app_protocol.connection_made(self._get_app_transport()) - self._wakeup_waiter() # We should wakeup user code before sending the first data below. In @@ -561,7 +560,7 @@ cdef class SSLProtocol: new_MethodHandle(self._loop, "SSLProtocol._do_read", self._do_read, - None, # current context is good + None, # current context is good self)) # Shutdown flow @@ -761,7 +760,7 @@ cdef class SSLProtocol: new_MethodHandle(self._loop, "SSLProtocol._do_read", self._do_read, - None, # current context is good + None, # current context is good self)) except ssl_SSLAgainErrors as exc: pass @@ -797,12 +796,10 @@ cdef class SSLProtocol: data.append(chunk) except ssl_SSLAgainErrors as exc: pass - if one: self._app_protocol.data_received(first) elif not zero: self._app_protocol.data_received(b''.join(data)) - if not chunk: # close_notify self._call_eof_received() @@ -892,12 +889,11 @@ cdef class SSLProtocol: self._app_reading_paused = False if self._state == WRAPPED: self._loop._call_soon_handle( - new_MethodHandle1(self._loop, - "SSLProtocol._do_read", - self._do_read, - context, - self, - context)) + new_MethodHandle(self._loop, + "SSLProtocol._do_read", + self._do_read, + context, + self)) # Flow control for reads from SSL socket From afe7e60ac233128bb635b71b8c68ed7e04a78397 Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 10 Sep 2024 00:54:52 +0200 Subject: [PATCH 6/6] Call UVStream.write directly from SSLProtocol --- uvloop/handles/stream.pxd | 2 ++ uvloop/handles/stream.pyx | 2 +- uvloop/sslproto.pyx | 5 ++++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/uvloop/handles/stream.pxd b/uvloop/handles/stream.pxd index 8ca87437..df2e3615 100644 --- a/uvloop/handles/stream.pxd +++ b/uvloop/handles/stream.pxd @@ -16,6 +16,8 @@ cdef class UVStream(UVBaseTransport): Py_buffer _read_pybuf bint _read_pybuf_acquired + cpdef write(self, object buf) + # All "inline" methods are final cdef inline _init(self, Loop loop, object protocol, Server server, diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index b60b264c..30f83979 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -676,7 +676,7 @@ cdef class UVStream(UVBaseTransport): self.__reading, id(self)) - def write(self, object buf): + cpdef write(self, object buf): self._ensure_alive() if self._eof: diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 9015ba57..5442b5d0 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -705,7 +705,10 @@ cdef class SSLProtocol: if not self._ssl_writing_paused: data = self._outgoing_read() if len(data): - self._transport.write(data) + if isinstance(self._transport, UVStream): + (self._transport).write(data) + else: + self._transport.write(data) # Incoming flow