Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RLink Forwarding Issue #2080

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions crossbar/worker/rlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,26 @@ def on_subscription_create(sub_session, sub_details, details=None):
The handler will then also subscribe on the other router, and when receiving
events, re-publish those on this router.

:param sub_id:
:param sub_session:
:param sub_details:
:param details:
:return:
"""
if sub_details["uri"].startswith("wamp."):
return

if sub_details["id"] in self._subs:
# this should not happen actually, but not sure ..
sub_id = sub_details["id"]

if sub_id in self._subs and self._subs[sub_id]["sub"]:
# This will happen if, partway through the subscription process, the RLink disconnects
self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}',
sub_id=sub_details["id"],
sub_id=sub_id,
method=hltype(BridgeSession._setup_event_forwarding))
return

self._subs[sub_details["id"]] = sub_details
self._subs[sub_details["id"]]["sub"] = None
if sub_id not in self._subs:
self._subs[sub_id] = sub_details
self._subs[sub_id]["sub"] = None

uri = sub_details['uri']
ERR_MSG = [None]
Expand Down Expand Up @@ -162,17 +165,17 @@ def on_event(*args, **kwargs):
uri))
return

if sub_details["id"] not in self._subs:
if sub_id not in self._subs:
self.log.info("subscription already gone: {uri}", uri=sub_details['uri'])
yield sub.unregister()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like copy/paste mistake. Subs don't have unregister() they have unsubscribe() which is called further down

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, indeed, that seems definitely wrong. good catch!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sadly, mypy (which is pretty good at static code analysis) does not catch that:

(cpy311_7) oberstet@intel-nuci7:~/scm/crossbario/crossbar$ git log -n1
commit 507726e70eb329fda87b8f009b6310435dd85b31 (HEAD -> updating_dependencies, origin/updating_dependencies)
Author: Tobias Oberstein <tobias.oberstein@gmail.com>
Date:   Wed Aug 23 19:15:11 2023 +0200

    use latest release of eth-abi published 08.06.23
(cpy311_7) oberstet@intel-nuci7:~/scm/crossbario/crossbar$ python -V
Python 3.11.1
(cpy311_7) oberstet@intel-nuci7:~/scm/crossbario/crossbar$ tox -e mypy
mypy: commands[0]> mypy --exclude '(test_.*\.py)|(syntaxerror\.py)' --ignore-missing-imports crossbar
Success: no issues found in 215 source files
  mypy: OK (0.33=setup[0.05]+cmd[0.28] seconds)
  congratulations :) (0.40 seconds)
(cpy311_7) oberstet@intel-nuci7:~/scm/crossbario/crossbar$ 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, sorry. Good spot.

Now that this PR has been closed and re-opened because of CI problems, I can't make the changes. Is there anything I can do to help?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the reopening mess .. I was aware that it'll be more mess .. but unfort., this is a good example why 1 tiny issue (logs deleted after N days by GH, and so on) leads to more and more things ... which is why all of this is such a time burner

in general:

  1. on the one hand, we need to resolve Move CI fully to hosted GH runners again #2092 to make the hosted CI work again. rip out "self-hosted", replace with GH runners, .. that is, work on GH actions workflow files
  2. until 1. is fixed, you can also work on CI with local testing

I just tried tox -e sphinx,flake8,mypy,yapf,bandit,pytest. and other targets.

now comes the fun;) here is just one that gives another good example of "time burner":

sphinx fails currently. because it is set to "warnings are errors" (conservative). and

Warning, treated as error:
Failed to read intersphinx_mapping[https://docs.python.org/], ignored: SphinxWarning('The pre-Sphinx 1.0 \'intersphinx_mapping\' format is deprecated and will be removed in Sphinx 8. Update to the current format as described in the documentation. Hint: "intersphinx_mapping = {\'<name>\': (\'https://docs.python.org/\', None)}".https://www.sphinx-doc.org/en/master/usage/extensions/intersphinx.html#confval-intersphinx_mapping')

intersphinx is for auto linking our docs to other popular projects' docs.

I don't know what intersphinx config shit changed, and I have zero motivation to find out;) but it needs fixing. I could go on. we've tried hard over the years to arrive at robust, working CI. I gave up. maybe I am just too exhausted, but this kind of stuff needs pretty much constant attention. why can't I write some CI today, make it work perfectly, and then rely on that it will continue to work next week when there are no changes on our side? I don't get it. 2 + 2 is 4 today, and it should be forever, but I can't rely on that tomorrow? wtf.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to mention: if you want, you can work locally or you can create new PRs or branch from mine or whatever .. I don't mind closing "mine" again .. whatever works works for me;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a read of the CI bug you linked (https://github.com/orgs/community/discussions/49242) and think this might work:
If you re-open this PR I can push the change suggested above and I believe it should re-trigger the CI to make new logs.
What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should re-trigger the CI to make new logs.

fwiw, I've reopened this one, but I don't think it'll help, since the self-hosted runner is gone (#2092 )

else:
self._subs[sub_details["id"]]["sub"] = sub
self._subs[sub_id]["sub"] = sub

self.log.debug(
"created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}",
me=self._session_id,
other=other,
sub_id=sub_details["id"],
sub_id=sub_id,
sub_details=sub_details,
details=details,
sub_session=sub_session,
Expand Down Expand Up @@ -222,12 +225,21 @@ def forward_current_subs():
def on_remote_join(_session, _details):
yield forward_current_subs()

def on_remote_leave(_session, _details):
# The remote session has ended, clear subscription records.
# Clearing this dictionary helps avoid the case where
# local procedures are not subscribed on the remote leg
# on reestablishment of remote session.
# See: https://github.com/crossbario/crossbar/issues/1909
self._subs = {}

if self.IS_REMOTE_LEG:
yield forward_current_subs()
else:
# from the local leg, don't try to forward events on the
# remote leg unless the remote session is established.
other.on('join', on_remote_join)
other.on('leave', on_remote_leave)

# listen to when new subscriptions are created on the local router
yield self.subscribe(on_subscription_create,
Expand Down Expand Up @@ -267,15 +279,18 @@ def on_registration_create(reg_session, reg_details, details=None):
if reg_details['uri'].startswith("wamp."):
return

if reg_details['id'] in self._regs:
# this should not happen actually, but not sure ..
reg_id = reg_details["id"]

if reg_id in self._regs and self._regs[reg_id]["reg"]:
# This will happen if, partway through the registration process, the RLink disconnects
self.log.error('on_registration_create: reg ID {reg_id} already in map {method}',
reg_id=reg_details['id'],
reg_id=reg_id,
method=hltype(BridgeSession._setup_invocation_forwarding))
return

self._regs[reg_details['id']] = reg_details
self._regs[reg_details['id']]['reg'] = None
if reg_id not in self._regs:
self._regs[reg_id] = reg_details
Skully17 marked this conversation as resolved.
Show resolved Hide resolved
self._regs[reg_id]['reg'] = None

uri = reg_details['uri']
ERR_MSG = [None]
Expand Down Expand Up @@ -365,17 +380,17 @@ def on_call(*args, **kwargs):
# on the "other" router, *this* router may have already
# un-registered. If that happened, our registration will
# be gone, so we immediately un-register on the other side
if reg_details['id'] not in self._regs:
if reg_id not in self._regs:
self.log.info("registration already gone: {uri}", uri=reg_details['uri'])
yield reg.unregister()
else:
self._regs[reg_details['id']]['reg'] = reg
self._regs[reg_id]['reg'] = reg

self.log.info(
self.log.debug(
"created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}",
me=self._session_id,
other=other._session_id,
reg_id=reg_details['id'],
reg_id=reg_id,
reg_details=reg_details,
details=details,
reg_session=reg_session,
Expand Down