Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sock_sendto #628

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ cdef class Loop:

cdef _sock_recv(self, fut, sock, n)
cdef _sock_recv_into(self, fut, sock, buf)
cdef _sock_sendto(self, fut, sock, data, address)
cdef _sock_sendall(self, fut, sock, data)
cdef _sock_accept(self, fut, sock)

Expand Down
90 changes: 89 additions & 1 deletion uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,54 @@ cdef class Loop:
fut.set_result(data)
self._remove_reader(sock)

cdef _sock_sendto(self, fut, sock, data, address):
cdef:
Handle handle
int n

if UVLOOP_DEBUG:
if fut.cancelled():
# Shouldn't happen with _SyncSocketWriterFuture.
raise RuntimeError(
f'_sock_sendto is called on a cancelled Future')

if not self._has_writer(sock):
raise RuntimeError(
f'socket {sock!r} does not have a writer '
f'in the _sock_sendto callback')

try:
n = sock.sendto(data, address)
except (BlockingIOError, InterruptedError):
# Try next time.
return
except (KeyboardInterrupt, SystemExit):
raise
except BaseException as exc:
fut.set_exception(exc)
self._remove_writer(sock)
return

self._remove_writer(sock)

if n == len(data):
fut.set_result(None)
else:
if n:
if not isinstance(data, memoryview):
data = memoryview(data)
data = data[n:]

handle = new_MethodHandle3(
self,
"Loop._sock_sendto",
<method3_t>self._sock_sendto,
None,
self,
fut, sock, data)

self._add_writer(sock, handle)

cdef _sock_sendall(self, fut, sock, data):
cdef:
Handle handle
Expand Down Expand Up @@ -2644,7 +2692,47 @@ cdef class Loop:

@cython.iterable_coroutine
async def sock_sendto(self, sock, data, address):
raise NotImplementedError
cdef:
Handle handle
ssize_t n

if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")

if not data:
return

socket_inc_io_ref(sock)
try:
try:
n = sock.sendto(data, address)
except (BlockingIOError, InterruptedError):
pass
else:
if UVLOOP_DEBUG:
# This can be a partial success, i.e. only part
# of the data was sent
self._sock_try_write_total += 1

if n == len(data):
return
if not isinstance(data, memoryview):
data = memoryview(data)
data = data[n:]

fut = _SyncSocketWriterFuture(sock, self)
handle = new_MethodHandle3(
self,
"Loop._sock_sendto",
<method3_t>self._sock_sendto,
None,
self,
fut, sock, data)

self._add_writer(sock, handle)
return await fut
finally:
socket_dec_io_ref(sock)

@cython.iterable_coroutine
async def connect_accepted_socket(self, protocol_factory, sock, *,
Expand Down
Loading