Skip to content

Commit

Permalink
feat: add weka_api function
Browse files Browse the repository at this point in the history
  • Loading branch information
rugggger committed Jan 7, 2025
1 parent 90c7be9 commit f44f34e
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 137 deletions.
13 changes: 8 additions & 5 deletions .github/workflows/tf-style-checks.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Terrafrom style check
name: Terraform style check

on:
pull_request:
Expand All @@ -7,7 +7,10 @@ jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- uses: terraform-linters/setup-tflint@v2
- uses: pre-commit/action@v3.0.0
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- uses: hashicorp/setup-terraform@v2
with:
terraform_version: "1.5.7"
- uses: terraform-linters/setup-tflint@v2
- uses: pre-commit/action@v3.0.0
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ The `helper_commands` part in the output provides lambda call that can be used t
| [aws_lambda_function.status_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_function.terminate_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_function.transient_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_function.weka_api](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_permission.invoke_lambda_permission](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_permission) | resource |
| [aws_launch_template.launch_template](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/launch_template) | resource |
| [aws_lb.alb](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lb) | resource |
Expand Down Expand Up @@ -887,8 +888,8 @@ The `helper_commands` part in the output provides lambda call that can be used t
| <a name="input_lambda_iam_role_arn"></a> [lambda\_iam\_role\_arn](#input\_lambda\_iam\_role\_arn) | IAM Role that will be used by AWS Lambdas, if not specified will be created automatically. If pre-created should match policy described in readme | `string` | `""` | no |
| <a name="input_lambdas_custom_s3_bucket"></a> [lambdas\_custom\_s3\_bucket](#input\_lambdas\_custom\_s3\_bucket) | S3 bucket name for lambdas | `string` | `null` | no |
| <a name="input_lambdas_custom_s3_key"></a> [lambdas\_custom\_s3\_key](#input\_lambdas\_custom\_s3\_key) | S3 key for lambdas | `string` | `null` | no |
| <a name="input_lambdas_dist"></a> [lambdas\_dist](#input\_lambdas\_dist) | Lambdas code dist | `string` | `"release"` | no |
| <a name="input_lambdas_version"></a> [lambdas\_version](#input\_lambdas\_version) | Lambdas code version (hash) | `string` | `"871ba1d14342e39e37ce7f0466aecd4c"` | no |
| <a name="input_lambdas_dist"></a> [lambdas\_dist](#input\_lambdas\_dist) | Lambdas code dist | `string` | `"dev"` | no |
| <a name="input_lambdas_version"></a> [lambdas\_version](#input\_lambdas\_version) | Lambdas code version (hash) | `string` | `"3efb652a45b2118f03e32766190d5c73"` | no |
| <a name="input_metadata_http_tokens"></a> [metadata\_http\_tokens](#input\_metadata\_http\_tokens) | Whether or not the metadata service requires session tokens, also referred to as Instance Metadata Service Version 2 (IMDSv2) | `string` | `"required"` | no |
| <a name="input_nat_public_subnet_cidr"></a> [nat\_public\_subnet\_cidr](#input\_nat\_public\_subnet\_cidr) | CIDR block for public subnet | `string` | `"10.0.2.0/24"` | no |
| <a name="input_nfs_capacity_reservation_id"></a> [nfs\_capacity\_reservation\_id](#input\_nfs\_capacity\_reservation\_id) | The ID of the capacity reservation in which to run the nfs clients | `string` | `null` | no |
Expand Down Expand Up @@ -987,5 +988,6 @@ The `helper_commands` part in the output provides lambda call that can be used t
| <a name="output_smb_protocol_gateways_name"></a> [smb\_protocol\_gateways\_name](#output\_smb\_protocol\_gateways\_name) | Name of SMB protocol gateway instances |
| <a name="output_subnet_ids"></a> [subnet\_ids](#output\_subnet\_ids) | Subnet ids of backends |
| <a name="output_vpc_id"></a> [vpc\_id](#output\_vpc\_id) | VPC id |
| <a name="output_weka_api_name"></a> [weka\_api\_name](#output\_weka\_api\_name) | n/a |
| <a name="output_weka_cluster_admin_password_secret_id"></a> [weka\_cluster\_admin\_password\_secret\_id](#output\_weka\_cluster\_admin\_password\_secret\_id) | Secret id of weka admin password |
<!-- END_TF_DOCS -->
36 changes: 35 additions & 1 deletion lambdas.tf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ locals {
s3_key = var.lambdas_custom_s3_key != null ? var.lambdas_custom_s3_key : "${var.lambdas_dist}/${var.lambdas_version}.zip"
functions = toset([
"deploy", "clusterize", "report", "clusterize-finalization", "status", "scale-down", "fetch", "terminate",
"transient", "join-nfs-finalization"
"transient", "join-nfs-finalization", "weka-api"
])
enable_lambda_vpc = var.enable_lambda_vpc_config ? 1 : 0
obs_prefix = lookup(var.custom_prefix, "obs", var.prefix)
Expand Down Expand Up @@ -291,6 +291,7 @@ resource "aws_lambda_function" "status_lambda" {
STATE_KEY = local.state_key
NFS_STATE_KEY = local.nfs_state_key
CLUSTER_NAME = var.cluster_name
WEKA_API_LAMBDA = aws_lambda_function.weka_api.function_name
MANAGEMENT_LAMBDA = aws_lambda_function.management.function_name
USERNAME_ID = aws_secretsmanager_secret.weka_username.id
DEPLOYMENT_PASSWORD_ID = aws_secretsmanager_secret.weka_deployment_password.id
Expand All @@ -302,6 +303,39 @@ resource "aws_lambda_function" "status_lambda" {
depends_on = [aws_cloudwatch_log_group.lambdas_log_group]
}

resource "aws_lambda_function" "weka_api" {
function_name = "${var.prefix}-${var.cluster_name}-weka-api-lambda"
s3_bucket = local.s3_bucket
s3_key = local.s3_key
handler = local.handler_name
role = local.lambda_iam_role_arn
memory_size = 128
timeout = 20
runtime = "provided.al2"
architectures = ["arm64"]
dynamic "vpc_config" {
for_each = range(0, local.enable_lambda_vpc)
content {
security_group_ids = local.sg_ids
subnet_ids = local.subnet_ids
}
}
environment {
variables = {
LAMBDA = "weka-api"
CLUSTER_NAME = var.cluster_name
MANAGEMENT_LAMBDA = aws_lambda_function.management.function_name
USERNAME_ID = aws_secretsmanager_secret.weka_username.id
DEPLOYMENT_PASSWORD_ID = aws_secretsmanager_secret.weka_deployment_password.id
ADMIN_PASSWORD_ID = aws_secretsmanager_secret.weka_password.id
USE_SECRETMANAGER_ENDPOINT = var.secretmanager_use_vpc_endpoint
}
}
tags = var.tags_map
depends_on = [aws_cloudwatch_log_group.lambdas_log_group]
}


resource "aws_lambda_function" "fetch_lambda" {
function_name = "${local.lambda_prefix}-${var.cluster_name}-fetch-lambda"
s3_bucket = local.s3_bucket
Expand Down
63 changes: 63 additions & 0 deletions lambdas/functions/management/management.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package management

import (
"context"
"encoding/json"
"fmt"
"github.com/weka/go-cloud-lib/connectors"
"github.com/weka/go-cloud-lib/lib/jrpc"
"github.com/weka/go-cloud-lib/lib/weka"
"github.com/weka/go-cloud-lib/logging"
)

type WekaApiRequest struct {
Method weka.JrpcMethod `json:"method"`
Params map[string]string `json:"params"`
}

type ManagementRequest struct {
WekaApiRequest

Username string `json:"username"`
Password string `json:"password"`
BackendPrivateIps []string `json:"backend_private_ips"`
}

func CallJRPC(ctx context.Context, request ManagementRequest) (json.RawMessage, error) {
logger := logging.LoggerFromCtx(ctx)
logger.Debug().Msg("CallJRPC > Start")
logger.Info().Msgf("CallJRPC > method %s", request.Method)

var jrpcResponse json.RawMessage

logger.Debug().Msgf("CallJRPC > Username: %s", request.Username)
jrpcBuilder := func(ip string) *jrpc.BaseClient {
return connectors.NewJrpcClient(ctx, ip, weka.ManagementJrpcPort, request.Username, request.Password)
}

ips := request.BackendPrivateIps
if len(ips) == 0 {
return nil, fmt.Errorf("CallJRPC - backend private ips are empty")
}
logger.Debug().Msgf("CallJRPC > BackendPrivateIps: %v", ips)

var params interface{}
if request.Params != nil {
params = request.Params
} else {
params = struct{}{}
}

jpool := &jrpc.Pool{
Ips: ips,
Clients: map[string]*jrpc.BaseClient{},
Active: "",
Builder: jrpcBuilder,
Ctx: ctx,
}

if err := jpool.Call(request.Method, params, &jrpcResponse); err != nil {
return nil, fmt.Errorf("CallJRPC - call [%s] failed > %w", request.Method, err)
}
return jrpcResponse, nil
}
87 changes: 87 additions & 0 deletions lambdas/functions/weka_api/weka_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package weka_api

import (
"context"
"fmt"
"github.com/rs/zerolog/log"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/common"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/management"
"github.com/weka/go-cloud-lib/logging"
"os"
"strconv"
)

func MakeWekaApiRequest[T any](ctx context.Context, wr *management.WekaApiRequest) (response *T, err error) {
logger := logging.LoggerFromCtx(ctx)
logger.Debug().Msg("MakeWekaApiRequest > Start")

r, err := invokeManagementLambda[T](ctx, wr)
if err != nil {
return nil, fmt.Errorf("MakeWekaApiRequest > lambda invocation failed: %v", err)
}
return r, nil
}

func invokeManagementLambda[T any](ctx context.Context, wr *management.WekaApiRequest) (response *T, err error) {
logger := logging.LoggerFromCtx(ctx)
logger.Debug().Msg("invokeManagementLambda > Start")
logger.Info().Msgf("invokeManagementLambda > Params: %s", wr.Params)

managementLambdaName := os.Getenv("MANAGEMENT_LAMBDA")
if managementLambdaName == "" {
return nil, fmt.Errorf("MANAGEMENT_LAMBDA is not set")
}
if wr.Method == "" {
return nil, fmt.Errorf("weka-api method is not set")
}

useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
if err != nil {
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
}
var username, password string
if !useSecretManagerEndpoint {
log.Info().Msg("Secret manager endpoint not in use, sending credentials in body")
usernameId := os.Getenv("USERNAME_ID")
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
if err != nil {
return nil, fmt.Errorf("invokeManagementLambda > GetDeploymentOrAdminUsernameAndPassword: %w", err)
}

username = creds.Username
password = creds.Password
}

clusterName := os.Getenv("CLUSTER_NAME")
if clusterName == "" {
return nil, fmt.Errorf("CLUSTER_NAME is not set")
}
ips, err := common.GetBackendsPrivateIps(clusterName, "backend")
if err != nil {
return nil, fmt.Errorf("invokeManagementLambda > GetBackendsPrivateIps: %w", err)
}
log.Info().Msgf("invokeManagementLambda > Backend private IPs: %v", ips)

logger.Debug().Msgf("invokeManagementLambda > Username: %s", username)

managementRequest := management.ManagementRequest{
WekaApiRequest: management.WekaApiRequest{
Method: wr.Method,
Params: wr.Params,
},
BackendPrivateIps: ips,
Username: username,
Password: password, // empty string is interpreted as no credentials
}

response, err = common.InvokeLambdaFunction[T](managementLambdaName, managementRequest)
if err != nil {
wrappedError := fmt.Errorf("invokeManagementLambda >: %w", err)
log.Error().Err(wrappedError).Send()
}

return response, nil

}
95 changes: 30 additions & 65 deletions lambdas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/weka/go-cloud-lib/lib/weka"
"os"
"strconv"
"strings"
Expand All @@ -13,8 +14,10 @@ import (
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/common"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/connectors"
lambdas "github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/fetch"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/management"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/terminate"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/management"
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/weka_api"

"github.com/weka/go-cloud-lib/logging"
"github.com/weka/go-cloud-lib/scale_down"

Expand Down Expand Up @@ -315,55 +318,17 @@ func getClusterStatus(ctx context.Context, stateTable, stateTableHashKey, stateK
return clusterStatus, nil
}

clusterName := os.Getenv("CLUSTER_NAME")
if clusterName == "" {
return protocol.ClusterStatus{}, fmt.Errorf("CLUSTER_NAME is not set")
}
ips, err := common.GetBackendsPrivateIps(clusterName, "backend")
if err != nil {
return protocol.ClusterStatus{}, fmt.Errorf("getClusterStatus > GetBackendsPrivateIps: %w", err)
}
log.Info().Msgf("GetClusterStatus > Backend private IPs: %v", ips)

managementLambdaName := os.Getenv("MANAGEMENT_LAMBDA")
if managementLambdaName == "" {
return protocol.ClusterStatus{}, fmt.Errorf("MANAGEMENT_LAMBDA is not set")
}

useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
if err != nil {
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
}
var username, password string
if !useSecretManagerEndpoint {
log.Info().Msg("Secret manager endpoint not in use, sending credentials in body")
usernameId := os.Getenv("USERNAME_ID")
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
if err != nil {
return protocol.ClusterStatus{}, fmt.Errorf("getClusterStatus > GetDeploymentOrAdminUsernameAndPassword: %w", err)
}

username = creds.Username
password = creds.Password
}

managementRequest := management.ManagementRequest{
Type: "status",
WekaStatusRequest: management.WekaStatusRequest{
BackendPrivateIps: ips,
Username: username,
Password: password, // empty string is interpreted as no credentials
},
wekaApiRequest := management.WekaApiRequest{
Method: weka.JrpcStatus,
}
var wekaStatus *protocol.WekaStatus
wekaStatus, err = common.InvokeLambdaFunction[protocol.WekaStatus](managementLambdaName, managementRequest)
wekaStatus, err = weka_api.MakeWekaApiRequest[protocol.WekaStatus](ctx, &wekaApiRequest)
if err != nil {
wrappedError := fmt.Errorf("getClusterStatus > InvokeLambdaFunction: %w", err)
wrappedError := fmt.Errorf("getClusterStatus > MakeWekaApiRequest: %w", err)
log.Error().Err(wrappedError).Send()
wekaStatus = &protocol.WekaStatus{}
}

clusterStatus.WekaStatus = *wekaStatus

return clusterStatus, nil
Expand Down Expand Up @@ -423,30 +388,28 @@ func scaleDownHandler(ctx context.Context, info protocol.HostGroupInfoResponse)
return scale_down.ScaleDown(ctx, info)
}

func managementHandler(ctx context.Context, req management.ManagementRequest) (protocol.WekaStatus, error) {
switch req.Type {
case "status":
useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
func managementHandler(ctx context.Context, req management.ManagementRequest) (interface{}, error) {
useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
if err != nil {
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
}
if useSecretManagerEndpoint && req.Password == "" {
usernameId := os.Getenv("USERNAME_ID")
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
if err != nil {
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
log.Error().Err(err).Send()
return nil, err
}
if useSecretManagerEndpoint && req.Password == "" {
usernameId := os.Getenv("USERNAME_ID")
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
if err != nil {
log.Error().Err(err).Send()
return protocol.WekaStatus{}, err
}
req.Username = creds.Username
req.Password = creds.Password
}
return management.GetWekaStatus(ctx, req.WekaStatusRequest)
default:
log.Error().Msgf("Invalid management type: %s", req.Type)
return protocol.WekaStatus{}, fmt.Errorf("invalid management type: %s", req.Type)
req.Username = creds.Username
req.Password = creds.Password
}
return management.CallJRPC(ctx, req)
}

func wekaApiHandler(ctx context.Context, req management.WekaApiRequest) (interface{}, error) {
return weka_api.MakeWekaApiRequest[interface{}](ctx, &req)
}

func main() {
Expand All @@ -473,6 +436,8 @@ func main() {
lambda.Start(transientHandler)
case "management":
lambda.Start(managementHandler)
case "weka-api":
lambda.Start(wekaApiHandler)
default:
lambda.Start(func() error { return fmt.Errorf("unsupported lambda command") })
}
Expand Down
Loading

0 comments on commit f44f34e

Please sign in to comment.