Introduce incoming gossip query handling and forwarding #9542
base: master
Conversation
Force-pushed from 6161d0f to 8d972b1
@f321x oops, I see I got in your way a little with my imports/whitespace cleanup, sorry bout that :)
thanks for the hint @accumulator :D
Force-pushed from bd00a56 to 0bd23d1
electrum/lnpeer.py
Outdated
new_gossip, last_lngossip_refresh_ts = await lngossip.get_forwarding_gossip()
if (not last_lngossip_refresh_ts > last_gossip_batch_ts
        and self.gossip_timestamp_filter.only_forwarding):
    continue  # no new batch available, and no special request
last_gossip_batch_ts = last_lngossip_refresh_ts

async with self.network.lngossip.gossip_request_semaphore:
    if not self.gossip_timestamp_filter.only_forwarding:
        requested_gossip = chan_db.get_gossip_in_timespan(self.gossip_timestamp_filter)
        new_gossip = requested_gossip + new_gossip
        self.gossip_timestamp_filter.only_forwarding = True
    try:
        await self.send_gossip_list(new_gossip)
    except Exception as e:
        self.logger.debug(f"failed to forward gossip: {e}")
what if this takes longer than ~60 seconds? i.e. if this loop never sees some batches because lngossip._maintain_forwarding_gossip already had some iterations while we were sending an older batch to this peer (due to a slow outgoing link to this peer, or because we might be waiting on gossip_request_semaphore).
If that is ok, at least there should be some comment about it around here.
I think it is not too bad if a peer misses a batch, as gossip has no guarantees to be complete and you should fetch from multiple peers to collect a more complete view, but I see that this could be solved more elegantly. In 60ea8ef I put the send_gossip_list in the Peer's OldTaskGroup.
When testing, this seems to work well: now even if there is a huge send_gossip_list call that takes minutes (when getting a 0 timestamp filter), it concurrently forwards the new batches every minute.
electrum/lnpeer.py
Outdated
if not self.gossip_timestamp_filter.only_forwarding:
    requested_gossip = chan_db.get_gossip_in_timespan(self.gossip_timestamp_filter)
    new_gossip = requested_gossip + new_gossip
    self.gossip_timestamp_filter.only_forwarding = True
await self.taskgroup.spawn(self.send_gossip_list(new_gossip))

async def send_gossip_list(self, messages: List[GossipForwardingMessage]):
    amount_sent = 0
    async with self.network.lngossip.gossip_request_semaphore:
If a peer sends us gossip_timestamp_filter(first_timestamp=0), how big is the data structure returned by chan_db.get_gossip_in_timespan()? Probably on the order of 100 MB?
And we keep that in memory until gossip_request_semaphore can be taken.
And this data is duplicated per peer.
It looks easy to OOM-kill a node then: just spin up a couple hundred peers, connect to the node around the same time, and send gossip_timestamp_filter(first_timestamp=0) with each.
Yeah, the gossip on mainnet from timestamp 0 is around 140k messages, so this can be problematic. I refactored the gossip_timestamp_filter handling in 392e867 into two separate parts: the forwarding part continues to create a task every 60 seconds to forward new gossip from the queue, and if the peer requested historical gossip (ts < now) we spawn a separate task that satisfies the request and waits for the semaphore before doing so. The forwarding gossip is small, so there should be no memory problem, and the historical gossip is now loaded into memory a maximum of 5 times (if we get 5 different requests with ts 0 simultaneously).
Additionally, I changed the process_gossip loop so it spawns tasks for the requests instead of awaiting them, so we continue to process incoming gossip while satisfying an outgoing request.
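For illustration only, a self-contained sketch of that split (numbers and names are placeholders, not the code from 392e867): historical requests acquire a semaphore before loading anything, bounding how many large message lists coexist in memory, while a separate loop keeps forwarding the small per-minute batches regardless of any slow request.

import asyncio

async def serve_historical_request(peer_id: int, sem: asyncio.Semaphore) -> None:
    # wait for the semaphore *before* loading the big batch, so at most
    # sem._value copies of the historical gossip exist in memory at once
    async with sem:
        big_batch = [f"msg-{i}" for i in range(1000)]  # stand-in for get_gossip_in_timespan()
        await asyncio.sleep(0.1)                        # stand-in for the slow send to the peer
        print(f"peer {peer_id}: sent {len(big_batch)} historical messages")

async def forward_new_gossip(queue: asyncio.Queue) -> None:
    # independent loop forwarding the small batch of newly received gossip
    while True:
        batch = [queue.get_nowait() for _ in range(queue.qsize())]
        if batch:
            print(f"forwarded batch of {len(batch)} new messages")
        await asyncio.sleep(0.2)  # 60 s in the real loop

async def main() -> None:
    sem = asyncio.Semaphore(5)  # at most 5 big loads held in memory at once
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(20):
        queue.put_nowait(f"new-gossip-{i}")
    forwarder = asyncio.create_task(forward_new_gossip(queue))
    # ten peers asking for historical gossip at the same time
    await asyncio.gather(*(serve_historical_request(i, sem) for i in range(10)))
    forwarder.cancel()

asyncio.run(main())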
Force-pushed from 29805b1 to 3f11589
electrum/lnpeer.py
Outdated
self.transport.send_bytes(msg.msg)
amount_sent += 1
if amount_sent % 250 == 0:
    # this can be a lot of messages, completely blocking the event loop
    await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
I've just realised we never call transport.writer.drain() anywhere.
See https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter.drain
Without that, we keep filling the OS send buffer, without backpressure. As long as we don't send too much traffic, it's probably fine, but now that we want to forward gossip, we should probably await drain() somewhere.
nice blog post about backpressure btw: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#bug-1-backpressure
Interesting read, thanks for sharing. In ef87a8a I added a method send_bytes_and_drain to LNTransport that drains after writing, and use it in the methods that forward gossip, as they dump a lot of data on the StreamWriter. It would probably be nice to have all sending functions use it, but refactoring everything to an async context seems risky to me. As the other sending functions only send single messages, I think it's ok not to call drain in them.
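The essence of such a method is simply pairing the write with StreamWriter.drain(); a minimal sketch assuming an asyncio StreamWriter-backed transport (not the exact code from ef87a8a):

import asyncio

class TransportSketch:
    """Minimal stand-in for a StreamWriter-backed transport (illustration only)."""

    def __init__(self, writer: asyncio.StreamWriter):
        self.writer = writer

    def send_bytes(self, data: bytes) -> None:
        # fire-and-forget: bytes are queued in the send buffer without backpressure
        self.writer.write(data)

    async def send_bytes_and_drain(self, data: bytes) -> None:
        # same write, but wait until the buffer drops below the high-water mark,
        # so bulk senders (like gossip forwarding) cannot grow it without bound
        self.writer.write(data)
        await self.writer.drain()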
Force-pushed from c1ef315 to 6867545
This PR implements handling of the query messages defined in BOLT-7.
Only LNWallet peers forward gossip messages and handle queries. This allows peers to find us (LNWallet publishes a node_announcement), and prevents peers from querying both LNGossip and LNWallet for the same data. All of this functionality is only active if payment forwarding is enabled too.
Queries
Queries are handled directly in the Peer: when receiving a query, the Peer fetches the requested messages from ChannelDB and forwards them. Simultaneous queries are disallowed and dropped with a warning.
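For illustration, a minimal self-contained sketch of that "one query at a time, answered straight from the database" behaviour; the class, flag, and lookup names here are placeholders, not the actual lnpeer.py/ChannelDB API:

import logging
from typing import Dict, List

class PeerQuerySketch:
    """Placeholder peer that answers at most one gossip query at a time."""

    def __init__(self, database: Dict[str, List[bytes]]):
        self.database = database                      # stand-in for ChannelDB
        self.logger = logging.getLogger("peer-sketch")
        self._query_in_progress = False
        self.sent: List[bytes] = []                   # stand-in for the transport

    def on_gossip_query(self, query_key: str) -> None:
        if self._query_in_progress:
            self.logger.warning("dropping simultaneous gossip query from peer")
            return
        self._query_in_progress = True
        try:
            for msg in self.database.get(query_key, []):  # fetch the requested messages
                self.sent.append(msg)                      # forward them to the peer
        finally:
            self._query_in_progress = False

peer = PeerQuerySketch({"channel_range": [b"chan_ann_1", b"chan_upd_1"]})
peer.on_gossip_query("channel_range")
print(f"{len(peer.sent)} messages answered")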
Forwarding
When receiving a gossip_timestamp_filter message, it is stored as a non-persisted variable of the Peer.
When we receive gossip, it is added to separate queues in ChannelDB after validation. LNGossip fetches these queues via the method provided by ChannelDB at an interval of 60 seconds and exposes a single list of all queued messages that the individual peers can access. Every Peer occasionally checks this list for changes (an updated timestamp) and forwards it to the peer if it has been refreshed.
In case our peer requested additional gossip (before the current timestamp), the Peer will query it directly from the ChannelDB and send it together with the gossip to be forwarded.
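As an illustration of that shared-batch mechanism, a small self-contained sketch (class and method names are placeholders, not the actual LNGossip/ChannelDB API): validated gossip is queued, drained into one list every 60 seconds, and handed out together with a refresh timestamp so each peer can tell whether it has already forwarded that batch.

import asyncio
import time
from typing import List, Tuple

class ForwardingGossipSketch:
    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()  # validated gossip waiting to be batched
        self._batch: List[bytes] = []                # the single shared list all peers read
        self._refresh_ts = 0.0

    def add_validated_gossip(self, msg: bytes) -> None:
        self.queue.put_nowait(msg)

    async def maintain_forwarding_gossip(self) -> None:
        while True:
            await asyncio.sleep(60)  # refresh interval
            self._batch = [self.queue.get_nowait() for _ in range(self.queue.qsize())]
            self._refresh_ts = time.time()

    async def get_forwarding_gossip(self) -> Tuple[List[bytes], float]:
        # each Peer compares the returned timestamp with the one it last forwarded
        return self._batch, self._refresh_ts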
Additional changes
- Made send_warning and send_error sync, because there is no reason for them to be async.
- Send a gossip_timestamp_filter with time=now to LNGossip peers for them to forward new gossip messages to us.
- In ChannelDB, when calculating progress in get_sync_progress_estimate(), prevent us from jumping to 100% while not having received a single node_announcement; this can happen when peers send node_announcements last in order. This makes is_synced() more reliable.