-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathlinux-entra-sso.py
executable file
·347 lines (307 loc) · 11.5 KB
/
linux-entra-sso.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#!/usr/bin/env python3
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: Copyright 2024 Siemens AG
# pylint: disable=missing-docstring,invalid-name
# Renable invalid-name check, it should only cover the module name
# pylint: enable=invalid-name
import argparse
import sys
import json
import struct
import uuid
import ctypes
import time
from signal import SIGINT
from threading import Thread, RLock
from gi.repository import GLib, Gio
from pydbus import SessionBus
# version is replaced on installation
LINUX_ENTRA_SSO_VERSION = "0.0.0-dev"
# the ssoUrl is a mandatory parameter when requesting a PRT SSO
# Cookie, but the correct value is not checked as of 30.05.2024
# by the authorization backend. By that, a static (fallback)
# value can be used, if no real value is provided.
SSO_URL_DEFAULT = "https://login.microsoftonline.com/"
EDGE_BROWSER_CLIENT_ID = "d7b530a4-7680-4c23-a8bf-c52c121d2e87"
BROKER_START_TIMEOUT = 5
# dbus start service reply codes
START_REPLY_SUCCESS = 1
START_REPLY_ALREADY_RUNNING = 2
# prctl constants
PR_SET_PDEATHSIG = 1
class NativeMessaging:
@staticmethod
def get_message():
"""
Read a message from stdin and decode it.
"""
raw_length = sys.stdin.buffer.read(4)
if not raw_length:
sys.exit(0)
message_length = struct.unpack("@I", raw_length)[0]
message = sys.stdin.buffer.read(message_length).decode("utf-8")
return json.loads(message)
@staticmethod
def encode_message(message_content):
"""
Encode a message for transmission, given its content
"""
encoded_content = json.dumps(message_content, separators=(",", ":")).encode(
"utf-8"
)
encoded_length = struct.pack("@I", len(encoded_content))
return {"length": encoded_length, "content": encoded_content}
@staticmethod
def send_message(encoded_message):
"""
Send an encoded message to stdout
"""
sys.stdout.buffer.write(encoded_message["length"])
sys.stdout.buffer.write(encoded_message["content"])
sys.stdout.buffer.flush()
class SsoMib:
BROKER_NAME = "com.microsoft.identity.broker1"
BROKER_PATH = "/com/microsoft/identity/broker1"
GRAPH_SCOPES = ["https://graph.microsoft.com/.default"]
def __init__(self, daemon=False):
self._bus = SessionBus()
self.broker = None
self.session_id = uuid.uuid4()
self._state_changed_cb = None
self._last_state_reported = False
if daemon:
self._introspect_broker(fail_on_error=False)
self._monitor_bus()
def _introspect_broker(self, fail_on_error=True):
timeout = time.time() + BROKER_START_TIMEOUT
while not self.broker and time.time() < timeout:
try:
self.broker = self._bus.get(self.BROKER_NAME, self.BROKER_PATH)
self._report_state_change()
return
except GLib.Error as err:
# GDBus.Error:org.freedesktop.dbus.errors.UnknownObject:
# Introspecting on non-existant object
# See https://github.com/siemens/linux-entra-sso/issues/33
if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.DBUS_ERROR):
time.sleep(0.1)
continue
if fail_on_error:
raise RuntimeError("Could not start broker")
def _monitor_bus(self):
self._bus.subscribe(
sender="org.freedesktop.DBus",
object="/org/freedesktop/DBus",
signal="NameOwnerChanged",
arg0=self.BROKER_NAME,
signal_fired=self._broker_state_changed,
)
def _broker_state_changed(
self, sender, object, iface, signal, params
): # pylint: disable=redefined-builtin,too-many-arguments
_ = (sender, object, iface, signal)
# params = (name, old_owner, new_owner)
new_owner = params[2]
if new_owner:
self._introspect_broker()
else:
# we need to ensure that the next dbus call will
# wait until the broker is fully initialized again
self.broker = None
self._report_state_change()
def _report_state_change(self):
current_state = bool(self.broker)
if self._state_changed_cb and self._last_state_reported != current_state:
self._state_changed_cb(current_state)
self._last_state_reported = current_state
def on_broker_state_changed(self, callback):
"""
Register a callback to be called when the broker state changes.
The callback should accept a single boolean argument, indicating
if the broker is online or not.
"""
self._state_changed_cb = callback
@staticmethod
def _get_auth_parameters(account, scopes):
return {
"account": account,
"additionalQueryParametersForAuthorization": {},
"authority": "https://login.microsoftonline.com/common",
"authorizationType": 8, # OAUTH2
"clientId": EDGE_BROWSER_CLIENT_ID,
"redirectUri": "https://login.microsoftonline.com"
"/common/oauth2/nativeclient",
"requestedScopes": scopes,
"username": account["username"],
}
def get_accounts(self):
self._introspect_broker()
context = {
"clientId": EDGE_BROWSER_CLIENT_ID,
"redirectUri": str(self.session_id),
}
resp = self.broker.getAccounts("0.0", str(self.session_id), json.dumps(context))
return json.loads(resp)
def acquire_prt_sso_cookie(
self, account, sso_url, scopes=GRAPH_SCOPES
): # pylint: disable=dangerous-default-value
self._introspect_broker()
request = {
"account": account,
"authParameters": SsoMib._get_auth_parameters(account, scopes),
"ssoUrl": sso_url,
}
token = json.loads(
self.broker.acquirePrtSsoCookie(
"0.0", str(self.session_id), json.dumps(request)
)
)
return token
def acquire_token_silently(
self, account, scopes=GRAPH_SCOPES
): # pylint: disable=dangerous-default-value
self._introspect_broker()
request = {
"account": account,
"authParameters": SsoMib._get_auth_parameters(account, scopes),
}
token = json.loads(
self.broker.acquireTokenSilently(
"0.0", str(self.session_id), json.dumps(request)
)
)
return token
def get_broker_version(self):
self._introspect_broker()
params = json.dumps({"msalCppVersion": LINUX_ENTRA_SSO_VERSION})
resp = json.loads(
self.broker.getLinuxBrokerVersion("0.0", str(self.session_id), params)
)
resp["native"] = LINUX_ENTRA_SSO_VERSION
return resp
def run_as_native_messaging():
iomutex = RLock()
def respond(command, message):
NativeMessaging.send_message(
NativeMessaging.encode_message({"command": command, "message": message})
)
def notify_state_change(online):
with iomutex:
respond("brokerStateChanged", "online" if online else "offline")
def handle_command(cmd, received_message):
if cmd == "acquirePrtSsoCookie":
account = received_message["account"]
sso_url = received_message["ssoUrl"] or SSO_URL_DEFAULT
token = ssomib.acquire_prt_sso_cookie(account, sso_url)
respond(cmd, token)
elif cmd == "acquireTokenSilently":
account = received_message["account"]
scopes = received_message.get("scopes") or ssomib.GRAPH_SCOPES
token = ssomib.acquire_token_silently(account, scopes)
respond(cmd, token)
elif cmd == "getAccounts":
respond(cmd, ssomib.get_accounts())
elif cmd == "getVersion":
respond(cmd, ssomib.get_broker_version())
def run_dbus_monitor():
# inform other side about initial state
notify_state_change(bool(ssomib.broker))
loop = GLib.MainLoop()
loop.run()
def register_terminate_with_parent():
libc = ctypes.CDLL("libc.so.6")
libc.prctl(PR_SET_PDEATHSIG, SIGINT, 0, 0, 0)
print("Running as native messaging instance.", file=sys.stderr)
print("For interactive mode, start with --interactive", file=sys.stderr)
# on chrome and chromium, the parent process does not reliably
# terminate the process when the parent process is killed.
register_terminate_with_parent()
ssomib = SsoMib(daemon=True)
ssomib.on_broker_state_changed(notify_state_change)
monitor = Thread(target=run_dbus_monitor)
monitor.start()
while True:
received_message = NativeMessaging.get_message()
with iomutex:
cmd = received_message["command"]
try:
handle_command(cmd, received_message)
except Exception as exp: # pylint: disable=broad-except
err = {"error": f"Failure during request processing: {str(exp)}"}
respond(cmd, err)
def run_interactive():
def _get_account(accounts, idx):
try:
return accounts["accounts"][idx]
except IndexError:
json.dump(
{"error": f"invalid account index {idx}"},
indent=2,
fp=sys.stdout,
)
print()
sys.exit(1)
parser = argparse.ArgumentParser()
parser.add_argument(
"-i",
"--interactive",
action="store_true",
help="run in interactive mode",
)
parser.add_argument(
"-a",
"--account",
type=int,
default=0,
help="account index to use for operations",
)
parser.add_argument(
"-s",
"--ssoUrl",
default=SSO_URL_DEFAULT,
help="ssoUrl part of SSO PRT cookie request",
)
parser.add_argument(
"command",
choices=[
"getAccounts",
"getVersion",
"acquirePrtSsoCookie",
"acquireTokenSilently",
"monitor",
],
)
args = parser.parse_args()
monitor_mode = args.command == "monitor"
ssomib = SsoMib(daemon=monitor_mode)
if monitor_mode:
print("Monitoring D-Bus for broker availability.")
ssomib.on_broker_state_changed(
lambda online: print(
f"{ssomib.BROKER_NAME} is now " f"{'online' if online else 'offline'}."
)
)
GLib.MainLoop().run()
return
accounts = ssomib.get_accounts()
if len(accounts["accounts"]) == 0:
print("warning: no accounts registered.", file=sys.stderr)
if args.command == "getAccounts":
json.dump(accounts, indent=2, fp=sys.stdout)
elif args.command == "getVersion":
json.dump(ssomib.get_broker_version(), indent=2, fp=sys.stdout)
elif args.command == "acquirePrtSsoCookie":
account = _get_account(accounts, args.account)
cookie = ssomib.acquire_prt_sso_cookie(account, args.ssoUrl)
json.dump(cookie, indent=2, fp=sys.stdout)
elif args.command == "acquireTokenSilently":
account = _get_account(accounts, args.account)
token = ssomib.acquire_token_silently(account)
json.dump(token, indent=2, fp=sys.stdout)
# add newline
print()
if __name__ == "__main__":
if "--interactive" in sys.argv or "-i" in sys.argv:
run_interactive()
else:
run_as_native_messaging()