-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrowd.py
173 lines (121 loc) · 5.13 KB
/
crowd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import base64
import requests
REQUEST_TIMEOUT_IN_SECS = 5
class Crowd:
'''A class holds many methods which send HTTP requests to Crowd and return the response.
Args:
base_url (str): Crowd Base URL (normally ends with /crowd)
account (str, str): Crowd Account(ID, PW) which has a permission to use API
Attributes:
_admin_api_base_url (str): Crowd Admin API Base URL (concat with base_url argument)
_session (requests.Session): Authenticaed Crowd Sesison using HTTP Basic Authentication
'''
def __init__(self, base_url: str, account: tuple[str, str]):
self._admin_api_base_url = base_url + '/rest/admin/1.0'
self._session = requests.Session()
self._session.auth = account
@classmethod
def generate_group_id(cls, directory_id: str, group_name: str) -> str:
'''Make Group ID with Directory ID and Group Name
Crowd's Group ID & User ID have a format as follows:
{Directory ID}-Base64Encode({Group Name or User Name})
Args:
directory_id (str): Crowd Directory ID
group_name (str): Crowd Group Name
Returns:
str: Group ID
'''
base64_encoded_group_name = base64.b64encode(group_name.encode('utf-8')).decode('utf-8')
return str(directory_id) + '-' + base64_encoded_group_name
def get_managed_directories(self) -> list:
'''Get all managed directories in Crowd
Returns:
list: Managed Directories
'''
results = []
start_index = 0
while True:
# Fetch all Crowd directories by 100.
response = self._session.get(
f'{self._admin_api_base_url}/directory/managed?limit=100&start={start_index}',
timeout=REQUEST_TIMEOUT_IN_SECS).json()
results.extend(list(response['values']))
if response['isLastPage'] is True:
break
start_index = int(response['start']) + int(response['size'])
return results
def get_active_users_by_directory(self, directory_id: str) -> list[dict]:
'''Get all Active users in the given Directory
Args:
directory_id (str): Directory's ID where users belong
Returns:
list: Users
'''
results = []
start_index = 0
req_data = {
'directoryIds': [
directory_id
],
'active': True
}
while True:
# Fetch all Active users in specific directory by 100
response = self._session.post(
f'{self._admin_api_base_url}/users/search?limit=100&start={start_index}',
json=req_data,
timeout=REQUEST_TIMEOUT_IN_SECS).json()
results.extend(list(response['values']))
if response['isLastPage'] is True:
break
start_index = int(response['start']) + int(response['size'])
return results
def get_users_by_group(self, group_id: str) -> list[dict]:
'''Get all users in the given Group
Args:
group_id (str): Group's ID where users belong
Returns:
list: Users
'''
results = []
start_index = 0
while True:
# Fetch all users in specific group by 100
response = self._session.get(
f'{self._admin_api_base_url}/groups/{group_id}/users?limit=100&start={start_index}',
timeout=REQUEST_TIMEOUT_IN_SECS).json()
results.extend(list(response['values']))
if response['isLastPage'] is True:
break
start_index = int(response['start']) + int(response['size'])
return results
def remove_users_from_group(self, group_id: str, user_ids: list[str]) -> dict[str, list[str]]:
'''Remove Given users from the given Group
Args:
group_id (str): Group's ID where users belong
user_ids (list of str): Users' ID to remove
Returns:
dict: User ID list which successfully removed or failed.
'''
if len(user_ids) == 0:
return { 'successes': [], 'failures': [] }
req_data = { 'ids': user_ids }
return self._session.delete(
f'{self._admin_api_base_url}/groups/{group_id}/users',
json=req_data,
timeout=REQUEST_TIMEOUT_IN_SECS).json()
def add_users_to_group(self, group_id: str, user_ids: list[str]) -> dict[str, list[str]]:
'''Add Given users to the given Group
Args:
group_id (str): Group's ID where users will belong
user_ids (list of str): Users' ID to add
Returns:
dict: User ID list which successfully added or failed.
'''
if len(user_ids) == 0:
return { 'successes': [], 'failures': [] }
req_data = { 'ids': user_ids }
return self._session.post(
f'{self._admin_api_base_url}/groups/{group_id}/users',
json=req_data,
timeout=REQUEST_TIMEOUT_IN_SECS).json()