diff --git a/pkg/connector/client/confluence.go b/pkg/connector/client/confluence.go index f7f25fb..79efd3b 100644 --- a/pkg/connector/client/confluence.go +++ b/pkg/connector/client/confluence.go @@ -8,27 +8,19 @@ import ( "io" "net/http" "net/url" + "slices" "strconv" - "time" + "strings" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/helpers" "github.com/conductorone/baton-sdk/pkg/uhttp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( - maxResults = 50 - minRatelimitWait = 1 * time.Second // Minimum time to wait after a request was ratelimited before trying again - maxRatelimitWait = (2 * time.Minute) + (30 * time.Second) // Maximum time to wait after a request was ratelimited before erroring - - currentUserUrlPath = "/wiki/rest/api/user/current" - GroupsListUrlPath = "/wiki/rest/api/group" - getUsersByGroupIdUrlPath = "/wiki/rest/api/group/%s/membersByGroupId" - addUsersToGroupUrlPath = "/wiki/rest/api/group/userByGroupId?groupId=%s" - removeUsersFromGroupUrlPath = "/wiki/rest/api/group/userByGroupId?groupId=%s&accountId=%s" - spacePermissionsCreateUrlPath = "/wiki/rest/api/space/%s/permissions" - spacePermissionsUpdateUrlPath = "/wiki/rest/api/space/%s/permissions/%s" - SpacesListUrlPath = "/wiki/api/v2/spaces" - spacesGetUrlPath = "/wiki/api/v2/spaces/%s" - SpacePermissionsListUrlPath = "/wiki/api/v2/spaces/%s/permissions" + maxResults = 50 ) type RequestError struct { @@ -42,10 +34,10 @@ func (r *RequestError) Error() string { } type ConfluenceClient struct { - httpClient *http.Client - user string - apiKey string - apiBase *url.URL + user string + apiKey string + apiBase *url.URL + wrapper *uhttp.BaseHttpClient } // fallBackToHTTPS checks to domain and tacks on "https://" if no scheme is @@ -66,7 +58,7 @@ func fallBackToHTTPS(domain string) (*url.URL, error) { } func NewConfluenceClient(ctx context.Context, user, apiKey, domain string) (*ConfluenceClient, error) { - apiBase, err := fallBackToHTTPS(fmt.Sprintf("%s/wiki/rest/api/", domain)) + apiBase, err := fallBackToHTTPS(domain) if err != nil { return nil, err } @@ -77,25 +69,26 @@ func NewConfluenceClient(ctx context.Context, user, apiKey, domain string) (*Con } return &ConfluenceClient{ - httpClient: httpClient, - apiBase: apiBase, - user: user, - apiKey: apiKey, + apiBase: apiBase, + apiKey: apiKey, + user: user, + wrapper: uhttp.NewBaseHttpClient(httpClient), }, nil } func (c *ConfluenceClient) Verify(ctx context.Context) error { - u, err := c.genURLNonPaginated("user/current") + currentUserUrl, err := c.genURLNonPaginated(CurrentUserUrlPath) if err != nil { return err } - var resp *ConfluenceUser - if err := c.get(ctx, u, &resp); err != nil { + var response *ConfluenceUser + _, err = c.get(ctx, currentUserUrl, &response) + if err != nil { return err } - currentUser := resp.AccountId + currentUser := response.AccountId if currentUser == "" { return errors.New("failed to find new user") } @@ -103,224 +96,240 @@ func (c *ConfluenceClient) Verify(ctx context.Context) error { return nil } -func (c *ConfluenceClient) GetUsers(ctx context.Context, pageToken string, pageSize int) ([]ConfluenceUser, string, error) { - // There is no api to get all users, so get all groups then all members of each group. - // We also have to internally handle paging, due to the multiple layers of requests. - - groups := make([]ConfluenceGroup, 0) - - // Get all groups - for { - u, err := c.genURL(pageToken, pageSize, "group") - if err != nil { - return nil, "", err - } - - var resp *confluenceGroupList - if err := c.get(ctx, u, &resp); err != nil { - return nil, "", err - } +func isThereAnotherPage(links ConfluenceLink) bool { + return links.Next != "" +} - groupPage := resp.Results - if len(groupPage) == 0 { - break - } - groups = append(groups, groupPage...) - pageToken = incToken(pageToken, len(groupPage)) +func (c *ConfluenceClient) GetGroups( + ctx context.Context, + pageToken string, + pageSize int, +) ( + []ConfluenceGroup, + string, + *v2.RateLimitDescription, + error, +) { + groupsUrl, err := c.genURL(pageToken, pageSize, GroupsListUrlPath) + if err != nil { + return nil, "", nil, err } - if len(groups) == 0 { - return []ConfluenceUser{}, "", nil + var response *confluenceGroupList + ratelimitData, err := c.get(ctx, groupsUrl, &response) + if err != nil { + return nil, "", ratelimitData, err } - userMap := make(map[string]ConfluenceUser) + groups := response.Results - // Get members of each group - for _, group := range groups { - users := make([]ConfluenceUser, 0) - pageToken = "" - for { - u, err := c.genURL(pageToken, pageSize, "group/member?name="+group.Name) - if err != nil { - return nil, "", err - } - - var resp *confluenceUserList - if err := c.get(ctx, u, &resp); err != nil { - return nil, "", err - } - - userPage := resp.Results - if len(userPage) == 0 { - break - } - - users = append(users, userPage...) - - pageToken = incToken(pageToken, len(userPage)) - } - - // De-dupe users accross groups - for _, user := range users { - if _, ok := userMap[user.AccountId]; !ok { - userMap[user.AccountId] = user - } - } + if !isThereAnotherPage(response.Links) { + return groups, "", ratelimitData, nil } - allUsers := make([]ConfluenceUser, 0) - for _, user := range userMap { - allUsers = append(allUsers, user) - } + token := incToken(pageToken, len(groups)) - return allUsers, "", nil + return groups, token, ratelimitData, nil } -func (c *ConfluenceClient) GetGroups(ctx context.Context, pageToken string, pageSize int) ([]ConfluenceGroup, string, error) { - u, err := c.genURL(pageToken, pageSize, "group") +func (c *ConfluenceClient) GetGroupMembers( + ctx context.Context, + pageToken string, + pageSize int, + groupId string, +) ( + []ConfluenceUser, + string, + *v2.RateLimitDescription, + error, +) { + getUsersUrl, err := c.parse( + fmt.Sprintf(getUsersByGroupIdUrlPath, groupId), + withLimitAndOffset(pageToken, pageSize), + withQueryParameters(map[string]interface{}{ + "expand": "operations", + }), + ) if err != nil { - return nil, "", err + return nil, "", nil, err } - var resp *confluenceGroupList - if err := c.get(ctx, u, &resp); err != nil { - return nil, "", err + var response *confluenceUserList + ratelimitData, err := c.get(ctx, getUsersUrl, &response) + if err != nil { + return nil, "", ratelimitData, err } - groups := resp.Results + users := response.Results - if len(groups) == 0 { - return groups, "", nil + if !isThereAnotherPage(response.Links) { + return users, "", ratelimitData, nil } - token := incToken(pageToken, len(groups)) + token := incToken(pageToken, len(users)) - return groups, token, nil + return users, token, ratelimitData, nil } -func (c *ConfluenceClient) GetGroupMembers(ctx context.Context, pageToken string, pageSize int, group string) ([]ConfluenceUser, string, error) { - u, err := c.genURL(pageToken, pageSize, "group/member?name="+group) +func (c *ConfluenceClient) AddUserToGroup( + ctx context.Context, + accountID string, + groupId string, +) (*v2.RateLimitDescription, error) { + getUsersUrl, err := c.genURLNonPaginated( + fmt.Sprintf( + addUsersToGroupUrlPath, + groupId, + ), + ) if err != nil { - return nil, "", err + return nil, err } - var resp *confluenceUserList - if err := c.get(ctx, u, &resp); err != nil { - return nil, "", err + bodyBytes, err := json.Marshal( + AddUserToGroupRequestBody{ + AccountId: accountID, + }, + ) + if err != nil { + return nil, err } - users := resp.Results + body := strings.NewReader(string(bodyBytes)) + ratelimitData, err := c.post(ctx, getUsersUrl, nil, body) + if err != nil { + return ratelimitData, err + } + return ratelimitData, nil +} - if len(users) == 0 { - return users, "", nil +func (c *ConfluenceClient) RemoveUserFromGroup( + ctx context.Context, + accountID string, + groupId string, +) (*v2.RateLimitDescription, error) { + getUsersUrl, err := c.genURLNonPaginated( + fmt.Sprintf( + removeUsersFromGroupUrlPath, + groupId, + accountID, + ), + ) + if err != nil { + return nil, err } - token := incToken(pageToken, len(users)) + ratelimitData, err := c.delete(ctx, getUsersUrl, nil) + if err != nil { + return ratelimitData, err + } + return ratelimitData, nil +} + +func (c *ConfluenceClient) get( + ctx context.Context, + getUrl *url.URL, + target interface{}, +) (*v2.RateLimitDescription, error) { + return c.makeRequest(ctx, getUrl, target, http.MethodGet, nil) +} + +func (c *ConfluenceClient) post( + ctx context.Context, + postUrl *url.URL, + target interface{}, + requestBody io.Reader, +) (*v2.RateLimitDescription, error) { + return c.makeRequest(ctx, postUrl, target, http.MethodPost, requestBody) +} - return users, token, nil +func (c *ConfluenceClient) delete( + ctx context.Context, + deleteUrl *url.URL, + target interface{}, +) (*v2.RateLimitDescription, error) { + return c.makeRequest(ctx, deleteUrl, target, http.MethodDelete, nil) } -func (c *ConfluenceClient) get(ctx context.Context, u *url.URL, target interface{}) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) +func (c *ConfluenceClient) makeRequest( + ctx context.Context, + url *url.URL, + target interface{}, + method string, + requestBody io.Reader, +) (*v2.RateLimitDescription, error) { + req, err := http.NewRequestWithContext(ctx, method, url.String(), requestBody) if err != nil { - return err + return nil, err } req.SetBasicAuth(c.user, c.apiKey) - for { - resp, err := c.httpClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - retryAfter := strToInt(resp.Header.Get("Retry-After")) - - switch resp.StatusCode { - case http.StatusOK: - if err := json.NewDecoder(resp.Body).Decode(target); err != nil { - return fmt.Errorf("failed to decode response body for '%s': %w", u, err) - } - return nil - case http.StatusTooManyRequests: - if err := wait(ctx, retryAfter); err != nil { - return fmt.Errorf("confluence-connector: failed to wait for retry on '%s': %w", u, err) - } - continue - case http.StatusServiceUnavailable: - // Per the docs: transient 5XX errors should be treated as 429/too-many-requests if they have a retry header. - // 503 errors were the only ones explicitly called out, but I guess it's possible for others too. - // https://developer.atlassian.com/cloud/confluence/rate-limiting/ - if retryAfter != 0 { - if err := wait(ctx, retryAfter); err != nil { - return fmt.Errorf("confluence-connector: failed to wait for retry on '%s': %w", u, err) - } - continue - } - } + ratelimitData := v2.RateLimitDescription{} - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading non-200 response body: %w", err) - } + response, err := c.wrapper.Do( + req, + WithConfluenceRatelimitData(&ratelimitData), + uhttp.WithJSONResponse(target), + ) + if err == nil { + return &ratelimitData, nil + } + if response == nil { + return nil, err + } + defer response.Body.Close() - return &RequestError{ - URL: u, - Status: resp.StatusCode, - Body: string(body), - } + // If we get ratelimit data back (e.g. the "Retry-After" header) or a + // "ratelimit-like" status code, then return a recoverable gRPC code. + if isRatelimited(ratelimitData.Status, response.StatusCode) { + return &ratelimitData, status.Error(codes.Unavailable, response.Status) + } + + // If it's some other error, it is unrecoverable. + responseBody, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + return nil, &RequestError{ + URL: url, + Status: response.StatusCode, + Body: string(responseBody), } } +// genURLNonPaginated adds the given URL path to the API base URL. func (c *ConfluenceClient) genURLNonPaginated(path string) (*url.URL, error) { parsed, err := url.Parse(path) if err != nil { return nil, fmt.Errorf("failed to parse request path '%s': %w", path, err) } - u := c.apiBase.ResolveReference(parsed) - return u, nil + parsedUrl := c.apiBase.ResolveReference(parsed) + return parsedUrl, nil } +// genURL adds `start` and `limit` query parameters to a URL. This pagination +// parameter is only used by the v1 REST API. func (c *ConfluenceClient) genURL(pageToken string, pageSize int, path string) (*url.URL, error) { parsed, err := url.Parse(path) if err != nil { return nil, fmt.Errorf("failed to parse request path '%s': %w", path, err) } - u := c.apiBase.ResolveReference(parsed) + parsedUrl := c.apiBase.ResolveReference(parsed) - max := pageSize - if max == 0 || max > maxResults { - max = maxResults + maximum := pageSize + if maximum == 0 || maximum > maxResults { + maximum = maxResults } - q := u.Query() - q.Set("start", pageToken) - q.Set("limit", strconv.Itoa(max)) - u.RawQuery = q.Encode() + query := parsedUrl.Query() + query.Set("start", pageToken) + query.Set("limit", strconv.Itoa(maximum)) + parsedUrl.RawQuery = query.Encode() - return u, nil -} - -func wait(ctx context.Context, retryAfter int) error { - now := time.Now() - resetAt := now.Add(time.Duration(retryAfter) * time.Second) - - // Wait must be within min/max window - d := resetAt.Sub(now) - if d < minRatelimitWait { - d = minRatelimitWait - } else if d > maxRatelimitWait { - d = maxRatelimitWait - } - - select { - case <-time.After(d): - return nil - case <-ctx.Done(): - return ctx.Err() - } + return parsedUrl, nil } func incToken(pageToken string, count int) string { @@ -341,3 +350,86 @@ func strToInt(s string) int { } return i } + +// WithConfluenceRatelimitData Per the docs: transient 5XX errors should be +// treated as 429/too-many-requests if they have a retry header. 503 errors were +// the only ones explicitly called out, but I guess it's possible for others too +// https://developer.atlassian.com/cloud/confluence/rate-limiting/ +func WithConfluenceRatelimitData(resource *v2.RateLimitDescription) uhttp.DoOption { + return func(response *uhttp.WrapperResponse) error { + rateLimitData, err := helpers.ExtractRateLimitData(response.StatusCode, &response.Header) + if err != nil { + return err + } + resource = rateLimitData + return nil + } +} + +func isRatelimited( + ratelimitStatus v2.RateLimitDescription_Status, + statusCode int, +) bool { + return slices.Contains( + []v2.RateLimitDescription_Status{ + v2.RateLimitDescription_STATUS_OVERLIMIT, + v2.RateLimitDescription_STATUS_ERROR, + }, + ratelimitStatus, + ) || slices.Contains( + []int{ + http.StatusTooManyRequests, + http.StatusGatewayTimeout, + http.StatusServiceUnavailable, + }, + statusCode, + ) +} + +// GetUsersFromSearch There are no official, documented ways to get lists of +// users in Confluence. One way to get users is to issue a CQL search query with +// no conditions. The documentation mentions that queries return "up to 10k" +// users. So that may end up being a limitation of this approach. +func (c *ConfluenceClient) GetUsersFromSearch( + ctx context.Context, + pageToken string, + pageSize int, +) ( + []ConfluenceUser, + string, + *v2.RateLimitDescription, + error, +) { + getUsersUrl, err := c.parse( + SearchUrlPath, + withLimitAndOffset(pageToken, pageSize), + withQueryParameters(map[string]interface{}{ + "cql": "type=user", + "expand": "operations", + }), + ) + if err != nil { + return nil, "", nil, err + } + + var response *ConfluenceSearchList + ratelimitData, err := c.get(ctx, getUsersUrl, &response) + if err != nil { + return nil, "", ratelimitData, err + } + + users := make([]ConfluenceUser, 0) + for _, user := range response.Results { + users = append(users, user.User) + } + + // The only way we can tell that we've hit the end of the list is if we get + // back fewer results than we asked for. If we get the last page but there + // are `pageSize`, then `.List()` still has to fetch the blank next page. + if len(users) < pageSize { + return users, "", ratelimitData, nil + } + + token := incToken(pageToken, len(users)) + return users, token, ratelimitData, nil +} diff --git a/pkg/connector/client/models.go b/pkg/connector/client/models.go index fbd96ac..a14d827 100644 --- a/pkg/connector/client/models.go +++ b/pkg/connector/client/models.go @@ -1,17 +1,44 @@ package client +type ConfluenceLink struct { + Base string `json:"base"` + Next string `json:"next,omitempty"` +} + type ConfluenceUser struct { - AccountId string - AccountType string - DisplayName string - Email string + AccountId string `json:"accountId"` + AccountType string `json:"accountType"` + DisplayName string `json:"displayName"` + Email string `json:"email,omitempty"` + Operations []ConfluenceOperation `json:"operations,omitempty"` +} + +type ConfluenceOperation struct { + Operation string `json:"operation"` + TargetType string `json:"targetType"` } type confluenceUserList struct { - Start int - Limit int - Size int - Results []ConfluenceUser + Start int `json:"start"` + Limit int `json:"limit"` + Size int `json:"size"` + Links ConfluenceLink `json:"_links"` + Results []ConfluenceUser `json:"results"` +} + +type ConfluenceSearch struct { + EntityType string `json:"entityType"` + Score float64 `json:"score"` + Title string `json:"title"` + User ConfluenceUser `json:"user"` +} + +type ConfluenceSearchList struct { + Start int `json:"start"` + Limit int `json:"limit"` + TotalSize int `json:"totalSize"` + Size int `json:"size"` + Results []ConfluenceSearch `json:"results"` } type ConfluenceGroup struct { @@ -21,8 +48,13 @@ type ConfluenceGroup struct { } type confluenceGroupList struct { - Start int - Limit int - Size int - Results []ConfluenceGroup + Start int `json:"start"` + Limit int `json:"limit"` + Size int `json:"size"` + Links ConfluenceLink `json:"_links"` + Results []ConfluenceGroup `json:"results"` +} + +type AddUserToGroupRequestBody struct { + AccountId string `json:"accountId"` } diff --git a/pkg/connector/client/path.go b/pkg/connector/client/path.go new file mode 100644 index 0000000..f8ef2f7 --- /dev/null +++ b/pkg/connector/client/path.go @@ -0,0 +1,92 @@ +package client + +import ( + "fmt" + "net/url" + "strconv" +) + +const ( + CurrentUserUrlPath = "/wiki/rest/api/user/current" + GroupsListUrlPath = "/wiki/rest/api/group" + getUsersByGroupIdUrlPath = "/wiki/rest/api/group/%s/membersByGroupId" + groupBaseUrlPath = "/wiki/rest/api/group/userByGroupId" + spacePermissionsCreateUrlPath = "/wiki/rest/api/space/%s/permissions" + spacePermissionsUpdateUrlPath = "/wiki/rest/api/space/%s/permissions/%s" + SpacesListUrlPath = "/wiki/api/v2/spaces" + spacesGetUrlPath = "/wiki/api/v2/spaces/%s" + SpacePermissionsListUrlPath = "/wiki/api/v2/spaces/%s/permissions" + SearchUrlPath = "/wiki/rest/api/search/user" + addUsersToGroupUrlPath = "/wiki/rest/api/group/userByGroupId?groupId=%s" + removeUsersFromGroupUrlPath = "/wiki/rest/api/group/userByGroupId?groupId=%s&accountId=%s" +) + +type Option = func(*url.URL) (*url.URL, error) + +func withQueryParameters(parameters map[string]interface{}) Option { + return func(url *url.URL) (*url.URL, error) { + query := url.Query() + for key, interfaceValue := range parameters { + var stringValue string + switch actualValue := interfaceValue.(type) { + case string: + stringValue = actualValue + case int: + stringValue = strconv.Itoa(actualValue) + case bool: + if actualValue { + stringValue = "1" + } else { + stringValue = "0" + } + default: + return nil, fmt.Errorf("invalid query parameter type %s", actualValue) + } + query.Set(key, stringValue) + } + url.RawQuery = query.Encode() + return url, nil + } +} + +// withLimitAndOffset adds `start` and `limit` query parameters to a URL. This +// pagination parameter is only used by the v1 REST API. +func withLimitAndOffset(pageToken string, pageSize int) Option { + return withQueryParameters(map[string]interface{}{ + "limit": pageSize, + "start": pageToken, + }) +} + +// WithPaginationCursor uses Confluence Cloud's REST API v2 pagination scheme. +func WithPaginationCursor( + pageSize int, + paginationCursor string, +) Option { + parameters := map[string]interface{}{ + "limit": pageSize, + } + if paginationCursor != "" { + parameters["cursor"] = paginationCursor + } + + return withQueryParameters(parameters) +} + +func (c *ConfluenceClient) parse( + path string, + options ...Option, +) (*url.URL, error) { + parsed, err := url.Parse(path) + if err != nil { + return nil, fmt.Errorf("failed to parse request path '%s': %w", path, err) + } + parsedUrl := c.apiBase.ResolveReference(parsed) + for _, option := range options { + parsedUrl, err = option(parsedUrl) + if err != nil { + return nil, err + } + } + return parsedUrl, nil +} diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index e7a5616..88f071d 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -13,6 +13,7 @@ import ( const ( accountTypeAtlassian = "atlassian" // user account type + accountTypeApp = "app" // bot account type ) var ( diff --git a/pkg/connector/group.go b/pkg/connector/group.go index 36487f3..901d67c 100644 --- a/pkg/connector/group.go +++ b/pkg/connector/group.go @@ -8,11 +8,9 @@ import ( v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" "github.com/conductorone/baton-sdk/pkg/annotations" "github.com/conductorone/baton-sdk/pkg/pagination" - ent "github.com/conductorone/baton-sdk/pkg/types/entitlement" - grant "github.com/conductorone/baton-sdk/pkg/types/grant" - res "github.com/conductorone/baton-sdk/pkg/types/resource" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "go.uber.org/zap" + "github.com/conductorone/baton-sdk/pkg/types/entitlement" + "github.com/conductorone/baton-sdk/pkg/types/grant" + "github.com/conductorone/baton-sdk/pkg/types/resource" ) const ( @@ -35,9 +33,9 @@ func groupResource(ctx context.Context, group *client.ConfluenceGroup) (*v2.Reso "group_type": group.Type, } - groupTraitOptions := []res.GroupTraitOption{res.WithGroupProfile(profile)} + groupTraitOptions := []resource.GroupTraitOption{resource.WithGroupProfile(profile)} - resource, err := res.NewGroupResource( + newGroupResource, err := resource.NewGroupResource( group.Name, resourceTypeGroup, group.Id, @@ -47,10 +45,19 @@ func groupResource(ctx context.Context, group *client.ConfluenceGroup) (*v2.Reso return nil, err } - return resource, nil + return newGroupResource, nil } -func (o *groupResourceType) List(ctx context.Context, resourceId *v2.ResourceId, pt *pagination.Token) ([]*v2.Resource, string, annotations.Annotations, error) { +func (o *groupResourceType) List( + ctx context.Context, + resourceId *v2.ResourceId, + pt *pagination.Token, +) ( + []*v2.Resource, + string, + annotations.Annotations, + error, +) { bag := &pagination.Bag{} err := bag.Unmarshal(pt.Token) if err != nil { @@ -61,9 +68,10 @@ func (o *groupResourceType) List(ctx context.Context, resourceId *v2.ResourceId, ResourceTypeID: resourceTypeGroup.Id, }) } - groups, token, err := o.client.GetGroups(ctx, bag.PageToken(), ResourcesPageSize) + groups, token, ratelimitData, err := o.client.GetGroups(ctx, bag.PageToken(), ResourcesPageSize) + outputAnnotations := WithRateLimitAnnotations(ratelimitData) if err != nil { - return nil, "", nil, err + return nil, "", outputAnnotations, err } rv := make([]*v2.Resource, 0, len(groups)) @@ -72,7 +80,7 @@ func (o *groupResourceType) List(ctx context.Context, resourceId *v2.ResourceId, gr, err := groupResource(ctx, &groupCopy) if err != nil { - return nil, "", nil, err + return nil, "", outputAnnotations, err } rv = append(rv, gr) @@ -80,22 +88,31 @@ func (o *groupResourceType) List(ctx context.Context, resourceId *v2.ResourceId, nextPage, err := bag.NextToken(token) if err != nil { - return nil, "", nil, err + return nil, "", outputAnnotations, err } - return rv, nextPage, nil, nil + return rv, nextPage, outputAnnotations, nil } -func (o *groupResourceType) Entitlements(ctx context.Context, resource *v2.Resource, _ *pagination.Token) ([]*v2.Entitlement, string, annotations.Annotations, error) { +func (o *groupResourceType) Entitlements( + ctx context.Context, + resource *v2.Resource, + _ *pagination.Token, +) ( + []*v2.Entitlement, + string, + annotations.Annotations, + error, +) { var rv []*v2.Entitlement - assignmentOptions := []ent.EntitlementOption{ - ent.WithGrantableTo(resourceTypeUser), - ent.WithDisplayName(fmt.Sprintf("%s Group Member", resource.DisplayName)), - ent.WithDescription(fmt.Sprintf("Is member of the %s group in Confluence", resource.DisplayName)), + assignmentOptions := []entitlement.EntitlementOption{ + entitlement.WithGrantableTo(resourceTypeUser), + entitlement.WithDisplayName(fmt.Sprintf("%s Group Member", resource.DisplayName)), + entitlement.WithDescription(fmt.Sprintf("Is member of the %s group in Confluence", resource.DisplayName)), } - rv = append(rv, ent.NewAssignmentEntitlement( + rv = append(rv, entitlement.NewAssignmentEntitlement( resource, groupMemberEntitlement, assignmentOptions..., @@ -104,8 +121,16 @@ func (o *groupResourceType) Entitlements(ctx context.Context, resource *v2.Resou return rv, "", nil, nil } -func (o *groupResourceType) Grants(ctx context.Context, resource *v2.Resource, pt *pagination.Token) ([]*v2.Grant, string, annotations.Annotations, error) { - l := ctxzap.Extract(ctx) +func (o *groupResourceType) Grants( + ctx context.Context, + resource *v2.Resource, + pt *pagination.Token, +) ( + []*v2.Grant, + string, + annotations.Annotations, + error, +) { bag := &pagination.Bag{} err := bag.Unmarshal(pt.Token) if err != nil { @@ -117,15 +142,20 @@ func (o *groupResourceType) Grants(ctx context.Context, resource *v2.Resource, p }) } - users, token, err := o.client.GetGroupMembers(ctx, bag.PageToken(), ResourcesPageSize, resource.DisplayName) + users, token, ratelimitData, err := o.client.GetGroupMembers( + ctx, + bag.PageToken(), + ResourcesPageSize, + resource.Id.Resource, + ) + outputAnnotations := WithRateLimitAnnotations(ratelimitData) if err != nil { - return nil, "", nil, err + return nil, "", outputAnnotations, err } var rv []*v2.Grant for _, user := range users { - if user.AccountType != accountTypeAtlassian { - l.Debug("confluence: user is not of type atlassian", zap.Any("user", user)) + if !shouldIncludeUser(ctx, user) { continue } @@ -141,9 +171,36 @@ func (o *groupResourceType) Grants(ctx context.Context, resource *v2.Resource, p nextPage, err := bag.NextToken(token) if err != nil { - return nil, "", nil, err + return nil, "", outputAnnotations, err } - return rv, nextPage, nil, nil + return rv, nextPage, outputAnnotations, nil +} + +func (o *groupResourceType) Grant( + ctx context.Context, + principal *v2.Resource, + entitlement *v2.Entitlement, +) (annotations.Annotations, error) { + ratelimitData, err := o.client.AddUserToGroup( + ctx, + entitlement.Resource.Id.Resource, + principal.Id.Resource, + ) + outputAnnotations := WithRateLimitAnnotations(ratelimitData) + return outputAnnotations, err +} + +func (o *groupResourceType) Revoke( + ctx context.Context, + grant *v2.Grant, +) (annotations.Annotations, error) { + ratelimitData, err := o.client.RemoveUserFromGroup( + ctx, + grant.Entitlement.Resource.Id.Resource, + grant.Principal.Id.Resource, + ) + outputAnnotations := WithRateLimitAnnotations(ratelimitData) + return outputAnnotations, err } func groupBuilder(client *client.ConfluenceClient) *groupResourceType { diff --git a/pkg/connector/helpers.go b/pkg/connector/helpers.go index da1ed38..c058d6c 100644 --- a/pkg/connector/helpers.go +++ b/pkg/connector/helpers.go @@ -1,8 +1,13 @@ package connector import ( + "context" + + "github.com/conductorone/baton-confluence/pkg/connector/client" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" "github.com/conductorone/baton-sdk/pkg/annotations" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.uber.org/zap" ) const ResourcesPageSize = 100 @@ -12,3 +17,28 @@ func annotationsForUserResourceType() annotations.Annotations { annos.Update(&v2.SkipEntitlementsAndGrants{}) return annos } + +func WithRateLimitAnnotations( + ratelimitDescriptionAnnotations ...*v2.RateLimitDescription, +) annotations.Annotations { + outputAnnotations := annotations.Annotations{} + for _, annotation := range ratelimitDescriptionAnnotations { + outputAnnotations.Append(annotation) + } + + return outputAnnotations +} + +// shouldIncludeUser only include extant, human users. +func shouldIncludeUser(ctx context.Context, user client.ConfluenceUser) bool { + logger := ctxzap.Extract(ctx) + if user.AccountType != accountTypeAtlassian { + logger.Debug("confluence: user is not of type atlassian", zap.Any("user", user)) + return false + } + if len(user.Operations) == 0 { + logger.Debug("confluence: user is deactivated (Unlicensed)", zap.Any("user", user)) + return false + } + return true +} diff --git a/pkg/connector/user.go b/pkg/connector/user.go index ffea8ab..ce682a7 100644 --- a/pkg/connector/user.go +++ b/pkg/connector/user.go @@ -2,21 +2,35 @@ package connector import ( "context" + "fmt" "github.com/conductorone/baton-confluence/pkg/connector/client" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" "github.com/conductorone/baton-sdk/pkg/annotations" "github.com/conductorone/baton-sdk/pkg/pagination" - resource "github.com/conductorone/baton-sdk/pkg/types/resource" + "github.com/conductorone/baton-sdk/pkg/types/resource" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" "go.uber.org/zap" ) +const ( + GroupPageSizeMaximum = 25 +) + type userResourceType struct { resourceType *v2.ResourceType client *client.ConfluenceClient } +// limitPageSizeForGroups - Enforcing an arbitrarily small page size limit for +// groups so that we can handle the 2D pagination scheme with a comfortable margin. +func limitPageSizeForGroups(pageSize int) int { + if pageSize <= 0 || pageSize > GroupPageSizeMaximum { + return GroupPageSizeMaximum + } + return pageSize +} + func (o *userResourceType) ResourceType(_ context.Context) *v2.ResourceType { return o.resourceType } @@ -35,7 +49,7 @@ func userResource(ctx context.Context, user *client.ConfluenceUser) (*v2.Resourc resource.WithStatus(v2.UserTrait_Status_STATUS_ENABLED), } - resource, err := resource.NewUserResource( + newUserResource, err := resource.NewUserResource( user.DisplayName, resourceTypeUser, user.AccountId, @@ -45,40 +59,231 @@ func userResource(ctx context.Context, user *client.ConfluenceUser) (*v2.Resourc return nil, err } - return resource, nil + return newUserResource, nil +} + +// parsePageToken given a marshalled pageToken as a string, return the pageToken +// bag and the current page number. +func parsePageToken( + pToken *pagination.Token, + resourceID *v2.ResourceId, +) ( + *pagination.Bag, + string, + int, + error, +) { + b := &pagination.Bag{} + err := b.Unmarshal(pToken.Token) + if err != nil { + return nil, "0", 0, err + } + + if b.Current() == nil { + b.Push( + pagination.PageState{ + ResourceTypeID: resourceID.ResourceType, + ResourceID: resourceID.Resource, + }, + ) + } + + page := b.PageToken() + size := pToken.Size + if size == 0 { + size = ResourcesPageSize + } + if page == "" { + page = "0" + } + return b, page, size, nil } -func (o *userResourceType) List(ctx context.Context, _ *v2.ResourceId, pt *pagination.Token) ([]*v2.Resource, string, annotations.Annotations, error) { - l := ctxzap.Extract(ctx) +func (o *userResourceType) List( + ctx context.Context, + _ *v2.ResourceId, + pToken *pagination.Token, +) ( + []*v2.Resource, + string, + annotations.Annotations, + error, +) { + logger := ctxzap.Extract(ctx) + logger.Debug("Starting Users List", zap.String("token", pToken.Token)) + + // Unfortunately, there is no Confluence Cloud REST API to get all users. We + // try to get all the users in a roundabout away by using three other APIs. + // First get all groups in Confluence and then for each group get all + // members. Finally, we use User Search to try and find any users that were + // not a part of any groups. - users, _, err := o.client.GetUsers(ctx, "", ResourcesPageSize) + bag, page, size, err := parsePageToken(pToken, &v2.ResourceId{}) if err != nil { return nil, "", nil, err } - rv := make([]*v2.Resource, 0) - for _, user := range users { - if user.AccountType != accountTypeAtlassian { - l.Debug("confluence: user is not of type atlassian", zap.Any("user", user)) - continue + + outputResources := make([]*v2.Resource, 0) + var outputAnnotations annotations.Annotations + switch bag.ResourceTypeID() { + case "": + users, nextToken, ratelimitData, err := o.client.GetUsersFromSearch(ctx, page, size) + outputAnnotations := WithRateLimitAnnotations(ratelimitData) + if err != nil { + return nil, "", outputAnnotations, err + } + for _, user := range users { + if !shouldIncludeUser(ctx, user) { + continue + } + + userCopy := user + newUserResource, err := userResource(ctx, &userCopy) + if err != nil { + return nil, "", nil, err + } + + outputResources = append(outputResources, newUserResource) } - userCopy := user - ur, err := userResource(ctx, &userCopy) + err = bag.Next(nextToken) if err != nil { return nil, "", nil, err } - rv = append(rv, ur) + if bag.Current() == nil { + logger.Debug("Finished User Search, moving on to 2D query") + bag.Push( + pagination.PageState{ + // Using "user" here as a placeholder so the for loop know to start again. + ResourceTypeID: resourceTypeUser.Id, + }, + ) + } + + case resourceTypeUser.Id: + logger.Debug("Got the placeholder from the bag", zap.String("page", page)) + if page == "" { + page = "0" + } + + size = limitPageSizeForGroups(size) + + // Add a new page of groups + groups, nextToken, ratelimitData, err := o.client.GetGroups( + ctx, + page, + size, + ) + logger.Debug( + "Got groups", + zap.Int("len", len(groups)), + zap.String("nextToken", nextToken), + ) + outputAnnotations = WithRateLimitAnnotations(ratelimitData) + if err != nil { + return nil, "", outputAnnotations, err + } + + // Push next page to stack. (Short-circuits if token is "".) + err = bag.Next(nextToken) + if err != nil { + return nil, "", outputAnnotations, err + } + + for _, group := range groups { + logger.Debug( + "adding a group to the bag", + zap.String("id", group.Id), + zap.String("name", group.Name), + ) + bag.Push( + pagination.PageState{ + ResourceTypeID: resourceTypeGroup.Id, + ResourceID: group.Id, + }, + ) + } + case resourceTypeGroup.Id: + currentState := bag.Current() + + start := currentState.Token + if start == "" { + start = "0" + } + logger.Debug( + "Got a group from the bag", + zap.String("start", start), + zap.String("group_id", currentState.ResourceID), + ) + + // Get users for this group. + users, nextToken, ratelimitData, err := o.client.GetGroupMembers( + ctx, + start, + size, + currentState.ResourceID, + ) + outputAnnotations = WithRateLimitAnnotations(ratelimitData) + if err != nil { + return nil, "", outputAnnotations, err + } + + // Push next page to stack. (Short-circuits if token is "".) + err = bag.Next(nextToken) + if err != nil { + return nil, "", outputAnnotations, err + } + + // Add users to output resources. There will be duplicates across groups. + for _, user := range users { + if !shouldIncludeUser(ctx, user) { + continue + } + + userCopy := user + newUserResource, err := userResource(ctx, &userCopy) + if err != nil { + return nil, "", nil, err + } + + outputResources = append(outputResources, newUserResource) + } + default: + return nil, "", nil, fmt.Errorf("unexpected resource type while fetching list of users") + } + + pageToken, err := bag.Marshal() + if err != nil { + return nil, "", nil, err } - return rv, "", nil, nil + return outputResources, pageToken, outputAnnotations, nil } -func (o *userResourceType) Entitlements(_ context.Context, _ *v2.Resource, _ *pagination.Token) ([]*v2.Entitlement, string, annotations.Annotations, error) { +func (o *userResourceType) Entitlements( + _ context.Context, + _ *v2.Resource, + _ *pagination.Token, +) ( + []*v2.Entitlement, + string, + annotations.Annotations, + error, +) { return nil, "", nil, nil } -func (o *userResourceType) Grants(_ context.Context, _ *v2.Resource, _ *pagination.Token) ([]*v2.Grant, string, annotations.Annotations, error) { +func (o *userResourceType) Grants( + _ context.Context, + _ *v2.Resource, + _ *pagination.Token, +) ( + []*v2.Grant, + string, + annotations.Annotations, + error, +) { return nil, "", nil, nil } diff --git a/pkg/connector/user_test.go b/pkg/connector/user_test.go index bb5fad2..a095c76 100644 --- a/pkg/connector/user_test.go +++ b/pkg/connector/user_test.go @@ -8,13 +8,14 @@ import ( "github.com/conductorone/baton-confluence/test" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" "github.com/conductorone/baton-sdk/pkg/pagination" + mapset "github.com/deckarep/golang-set/v2" "github.com/stretchr/testify/require" ) func TestUsersList(t *testing.T) { ctx := context.Background() - t.Run("should get users, using pagination, ignoring robots", func(t *testing.T) { + t.Run("should get users, using pagination, ignoring robots & deactivated", func(t *testing.T) { server := test.FixturesServer() defer server.Close() @@ -32,7 +33,7 @@ func TestUsersList(t *testing.T) { resources := make([]*v2.Resource, 0) bag := &pagination.Bag{} for { - pToken := pagination.Token{} + pToken := pagination.Token{Size: 2} state := bag.Current() if state != nil { token, _ := bag.Marshal() @@ -55,9 +56,15 @@ func TestUsersList(t *testing.T) { } require.NotNil(t, resources) - // We expect there to be duplicates. - // require.Len(t, resources, 3) - require.Len(t, resources, 2) + // We expect there to be duplicates from users being in multiple groups + // and then showing up in User Search. + require.Len(t, resources, 6) require.NotEmpty(t, resources[0].Id) + + allIDs := mapset.NewSet[string]() + for _, resource := range resources { + allIDs.Add(resource.Id.Resource) + } + require.Equal(t, allIDs.Cardinality(), 2) }) } diff --git a/test/fixtures/users2.json b/test/fixtures/blank.json similarity index 100% rename from test/fixtures/users2.json rename to test/fixtures/blank.json diff --git a/test/fixtures/search0.json b/test/fixtures/search0.json new file mode 100644 index 0000000..06fe736 --- /dev/null +++ b/test/fixtures/search0.json @@ -0,0 +1,94 @@ +{ + "results": [ + { + "user": { + "type": "known", + "accountId": "234", + "accountType": "atlassian", + "email": "marcos.gaeta@conductorone.com", + "publicName": "Marcos Gaeta", + "profilePicture": { + "path": "/wiki/aa-avatar/234", + "width": 48, + "height": 48, + "isDefault": false + }, + "displayName": "Marcos Gaeta", + "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], + "_expandable": { + "personalSpace": "" + }, + "_links": { + "self": "https://conductorone.atlassian.net/wiki/rest/api/user?accountId=234" + } + } + }, + { + "user": { + "type": "known", + "accountId": "345", + "accountType": "app", + "email": "robot@conductorone.com", + "publicName": "Robot", + "profilePicture": { + "path": "/wiki/aa-avatar/345", + "width": 48, + "height": 48, + "isDefault": false + }, + "displayName": "Robot", + "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], + "_expandable": { + "personalSpace": "" + }, + "_links": { + "self": "https://conductorone.atlassian.net/wiki/rest/api/user?accountId=345" + } + } + }, + { + "user": { + "type": "known", + "accountId": "321", + "accountType": "atlassian", + "email": "deactivated@conductorone.com", + "publicName": "Deactivated (Unlicensed)", + "profilePicture": { + "path": "/wiki/aa-avatar/321", + "width": 48, + "height": 48, + "isDefault": false + }, + "displayName": "Deactivated", + "isExternalCollaborator": false, + "operations": [], + "_expandable": { + "personalSpace": "" + }, + "_links": { + "self": "https://conductorone.atlassian.net/wiki/rest/api/user?accountId=321" + } + } + } + ], + "start": 0, + "limit": 200, + "size": 2, + "_links": { + "base": "https://conductorone.atlassian.net/wiki", + "context": "/wiki", + "self": "https://conductorone.atlassian.net/wiki/rest/api/group/123/membersByGroupId" + } +} \ No newline at end of file diff --git a/test/fixtures/search1.json b/test/fixtures/search1.json new file mode 100644 index 0000000..ae4ce64 --- /dev/null +++ b/test/fixtures/search1.json @@ -0,0 +1,70 @@ +{ + "results": [ + { + "user": { + "type": "known", + "accountId": "234", + "accountType": "atlassian", + "email": "marcos.gaeta@conductorone.com", + "publicName": "Marcos Gaeta", + "profilePicture": { + "path": "/wiki/aa-avatar/234", + "width": 48, + "height": 48, + "isDefault": false + }, + "displayName": "Marcos Gaeta", + "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], + "_expandable": { + "personalSpace": "" + }, + "_links": { + "self": "https://conductorone.atlassian.net/wiki/rest/api/user?accountId=234" + } + } + }, + { + "user": { + "type": "known", + "accountId": "789", + "accountType": "atlassian", + "email": "other.user@conductorone.com", + "publicName": "Other User", + "profilePicture": { + "path": "/wiki/aa-avatar/789", + "width": 48, + "height": 48, + "isDefault": false + }, + "displayName": "Other User", + "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], + "_expandable": { + "personalSpace": "" + }, + "_links": { + "self": "https://conductorone.atlassian.net/wiki/rest/api/user?accountId=789" + } + } + } + ], + "start": 0, + "limit": 200, + "size": 2, + "_links": { + "base": "https://conductorone.atlassian.net/wiki", + "context": "/wiki", + "self": "https://conductorone.atlassian.net/wiki/rest/api/group/456/membersByGroupId" + } +} \ No newline at end of file diff --git a/test/fixtures/users0.json b/test/fixtures/users0.json index b73b73b..b2409ca 100644 --- a/test/fixtures/users0.json +++ b/test/fixtures/users0.json @@ -14,8 +14,13 @@ }, "displayName": "Marcos Gaeta", "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], "_expandable": { - "operations": "", "personalSpace": "" }, "_links": { @@ -36,8 +41,13 @@ }, "displayName": "Robot", "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], "_expandable": { - "operations": "", "personalSpace": "" }, "_links": { diff --git a/test/fixtures/users1.json b/test/fixtures/users1.json index 673ae28..3b35158 100644 --- a/test/fixtures/users1.json +++ b/test/fixtures/users1.json @@ -14,8 +14,13 @@ }, "displayName": "Marcos Gaeta", "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], "_expandable": { - "operations": "", "personalSpace": "" }, "_links": { @@ -36,8 +41,13 @@ }, "displayName": "Other User", "isExternalCollaborator": false, + "operations": [ + { + "targetType": "application", + "operation": "use" + } + ], "_expandable": { - "operations": "", "personalSpace": "" }, "_links": { diff --git a/test/testhelpers.go b/test/testhelpers.go index 3ef0520..784591c 100644 --- a/test/testhelpers.go +++ b/test/testhelpers.go @@ -1,6 +1,7 @@ package test import ( + "fmt" "net/http" "net/http/httptest" "os" @@ -49,8 +50,9 @@ func FixturesServer() *httptest.Server { var filename string routeUrl := request.URL.String() switch { - case strings.Contains(routeUrl, "group/member") && strings.Contains(routeUrl, "start=2"): - filename = "../../test/fixtures/users2.json" + case strings.Contains(routeUrl, "group/member") && strings.Contains(routeUrl, "start=2") || + (strings.Contains(routeUrl, client.SearchUrlPath) && strings.Contains(routeUrl, "start=5")): + filename = "../../test/fixtures/blank.json" case (strings.Contains(routeUrl, "group/member") && strings.Contains(routeUrl, "confluence-users")) || (strings.Contains(routeUrl, client.GroupsListUrlPath) && strings.Contains(routeUrl, "123")): filename = "../../test/fixtures/users0.json" @@ -67,9 +69,13 @@ func FixturesServer() *httptest.Server { filename = "../../test/fixtures/spaces1.json" case strings.Contains(routeUrl, client.SpacesListUrlPath) && !strings.Contains(routeUrl, "cursor"): filename = "../../test/fixtures/spaces0.json" + case strings.Contains(routeUrl, client.SearchUrlPath) && strings.Contains(routeUrl, "start=0"): + filename = "../../test/fixtures/search0.json" + case strings.Contains(routeUrl, client.SearchUrlPath) && strings.Contains(routeUrl, "start=3"): + filename = "../../test/fixtures/search1.json" default: // This should never happen in tests. - return + panic(fmt.Errorf("bad url: %s", routeUrl)) } data, _ := os.ReadFile(filename) _, err := writer.Write(data)