From d1dfb05fbb3bcc59cc2622b6b2d02ebadf1cf33f Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Tue, 3 Oct 2023 03:35:23 -0700 Subject: [PATCH] feat: add dynamic tables to sdk (#2074) * add dynamic tables to sdk * update dynamic tables * minor fixes * update * fix int error * update validations * go fmt --- .vscode/launch.json | 7 + pkg/sdk/client.go | 2 + pkg/sdk/database_role_test.go | 2 +- pkg/sdk/dynamic_table.go | 227 ++++++++++++++++++++++ pkg/sdk/dynamic_table_dto.go | 51 +++++ pkg/sdk/dynamic_table_dto_builders.go | 107 ++++++++++ pkg/sdk/dynamic_table_impl.go | 113 +++++++++++ pkg/sdk/dynamic_table_integration_test.go | 156 +++++++++++++++ pkg/sdk/dynamic_table_test.go | 201 +++++++++++++++++++ pkg/sdk/dynamic_table_validations.go | 115 +++++++++++ pkg/sdk/helper_test.go | 58 ++++++ pkg/sdk/poc/main.go | 2 +- 12 files changed, 1039 insertions(+), 2 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 pkg/sdk/dynamic_table.go create mode 100644 pkg/sdk/dynamic_table_dto.go create mode 100644 pkg/sdk/dynamic_table_dto_builders.go create mode 100644 pkg/sdk/dynamic_table_impl.go create mode 100644 pkg/sdk/dynamic_table_integration_test.go create mode 100644 pkg/sdk/dynamic_table_test.go create mode 100644 pkg/sdk/dynamic_table_validations.go diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000000..5c7247b40a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,7 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [] +} \ No newline at end of file diff --git a/pkg/sdk/client.go b/pkg/sdk/client.go index 37ea9ef7a4..bf291ac64a 100644 --- a/pkg/sdk/client.go +++ b/pkg/sdk/client.go @@ -31,6 +31,7 @@ type Client struct { Comments Comments Databases Databases DatabaseRoles DatabaseRoles + DynamicTables DynamicTables ExternalTables ExternalTables FailoverGroups FailoverGroups FileFormats FileFormats @@ -130,6 +131,7 @@ func (c *Client) initialize() { c.ConversionFunctions = &conversionFunctions{client: c} c.Databases = &databases{client: c} c.DatabaseRoles = &databaseRoles{client: c} + c.DynamicTables = &dynamicTables{client: c} c.ExternalTables = &externalTables{client: c} c.FailoverGroups = &failoverGroups{client: c} c.FileFormats = &fileFormats{client: c} diff --git a/pkg/sdk/database_role_test.go b/pkg/sdk/database_role_test.go index 446068d4ba..d26d734789 100644 --- a/pkg/sdk/database_role_test.go +++ b/pkg/sdk/database_role_test.go @@ -195,7 +195,7 @@ func TestDatabaseRolesShow(t *testing.T) { } t.Run("validation: nil options", func(t *testing.T) { - var opts *ShowPipeOptions = nil + var opts *showDatabaseRoleOptions = nil assertOptsInvalidJoinedErrors(t, opts, errNilOptions) }) diff --git a/pkg/sdk/dynamic_table.go b/pkg/sdk/dynamic_table.go new file mode 100644 index 0000000000..89f4abca68 --- /dev/null +++ b/pkg/sdk/dynamic_table.go @@ -0,0 +1,227 @@ +package sdk + +import ( + "context" + "database/sql" + "time" +) + +type DynamicTables interface { + Create(ctx context.Context, request *CreateDynamicTableRequest) error + Alter(ctx context.Context, request *AlterDynamicTableRequest) error + Describe(ctx context.Context, request *DescribeDynamicTableRequest) (*DynamicTableDetails, error) + Drop(ctx context.Context, request *DropDynamicTableRequest) error + Show(ctx context.Context, opts *ShowDynamicTableRequest) ([]DynamicTable, error) + ShowByID(ctx context.Context, id AccountObjectIdentifier) (*DynamicTable, error) +} + +// createDynamicTableOptions is based on https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table +type createDynamicTableOptions struct { + create bool `ddl:"static" sql:"CREATE"` + OrReplace *bool `ddl:"keyword" sql:"OR REPLACE"` + dynamicTable bool `ddl:"static" sql:"DYNAMIC TABLE"` + name SchemaObjectIdentifier `ddl:"identifier"` + targetLag TargetLag `ddl:"parameter,no_quotes" sql:"TARGET_LAG"` + warehouse AccountObjectIdentifier `ddl:"identifier,equals" sql:"WAREHOUSE"` + Comment *string `ddl:"parameter,single_quotes" sql:"COMMENT"` + query string `ddl:"parameter,no_equals,no_quotes" sql:"AS"` +} + +type TargetLag struct { + Lagtime *string `ddl:"keyword,single_quotes"` + Downstream *bool `ddl:"keyword" sql:"DOWNSTREAM"` +} + +type DynamicTableSet struct { + TargetLag *TargetLag `ddl:"parameter,no_quotes" sql:"TARGET_LAG"` + Warehouse *AccountObjectIdentifier `ddl:"identifier,equals" sql:"WAREHOUSE"` +} + +// alterDynamicTableOptions is based on https://docs.snowflake.com/en/sql-reference/sql/alter-dynamic-table +type alterDynamicTableOptions struct { + alter bool `ddl:"static" sql:"ALTER"` + dynamicTable bool `ddl:"static" sql:"DYNAMIC TABLE"` + name SchemaObjectIdentifier `ddl:"identifier"` + + Suspend *bool `ddl:"keyword" sql:"SUSPEND"` + Resume *bool `ddl:"keyword" sql:"RESUME"` + Refresh *bool `ddl:"keyword" sql:"REFRESH"` + Set *DynamicTableSet `ddl:"keyword" sql:"SET"` +} + +// dropDynamicTableOptions is based on https://docs.snowflake.com/en/sql-reference/sql/drop-dynamic-table +type dropDynamicTableOptions struct { + drop bool `ddl:"static" sql:"DROP"` + dynamicTable bool `ddl:"static" sql:"DYNAMIC TABLE"` + name SchemaObjectIdentifier `ddl:"identifier"` +} + +// showDynamicTableOptions is based on https://docs.snowflake.com/en/sql-reference/sql/show-dynamic-tables +type showDynamicTableOptions struct { + show bool `ddl:"static" sql:"SHOW"` + dynamicTable bool `ddl:"static" sql:"DYNAMIC TABLES"` + Like *Like `ddl:"keyword" sql:"LIKE"` + In *In `ddl:"keyword" sql:"IN"` + StartsWith *string `ddl:"parameter,single_quotes,no_equals" sql:"STARTS WITH"` + Limit *LimitFrom `ddl:"keyword" sql:"LIMIT"` +} + +type DynamicTableRefreshMode string + +const ( + DynamicTableRefreshModeIncremental DynamicTableRefreshMode = "INCREMENTAL" + DynamicTableRefreshModeFull DynamicTableRefreshMode = "FULL" +) + +type DynamicTableSchedulingState string + +const ( + DynamicTableSchedulingStateRunning DynamicTableSchedulingState = "RUNNING" + DynamicTableSchedulingStateSuspended DynamicTableSchedulingState = "SUSPENDED" +) + +type DynamicTable struct { + CreatedOn time.Time + Name string + Reserved string + DatabaseName string + SchemaName string + ClusterBy string + Rows int + Bytes int + Owner string + TargetLag string + RefreshMode DynamicTableRefreshMode + RefreshModeReason string + Warehouse string + Comment string + Text string + AutomaticClustering bool + SchedulingState DynamicTableSchedulingState + LastSuspendedOn time.Time + IsClone bool + IsReplica bool + DataTimestamp time.Time +} + +func (dt *DynamicTable) ID() SchemaObjectIdentifier { + return NewSchemaObjectIdentifier(dt.DatabaseName, dt.SchemaName, dt.Name) +} + +type dynamicTableRow struct { + CreatedOn time.Time `db:"created_on"` + Name string `db:"name"` + Reserved string `db:"reserved"` + DatabaseName string `db:"database_name"` + SchemaName string `db:"schema_name"` + ClusterBy string `db:"cluster_by"` + Rows int `db:"rows"` + Bytes int `db:"bytes"` + Owner string `db:"owner"` + TargetLag string `db:"target_lag"` + RefreshMode string `db:"refresh_mode"` + RefreshModeReason sql.NullString `db:"refresh_mode_reason"` + Warehouse string `db:"warehouse"` + Comment string `db:"comment"` + Text string `db:"text"` + AutomaticClustering string `db:"automatic_clustering"` + SchedulingState string `db:"scheduling_state"` + LastSuspendedOn sql.NullTime `db:"last_suspended_on"` + IsClone bool `db:"is_clone"` + IsReplica bool `db:"is_replica"` + DataTimestamp time.Time `db:"data_timestamp"` +} + +func (dtr dynamicTableRow) convert() *DynamicTable { + dt := &DynamicTable{ + CreatedOn: dtr.CreatedOn, + Name: dtr.Name, + Reserved: dtr.Reserved, + DatabaseName: dtr.DatabaseName, + SchemaName: dtr.SchemaName, + ClusterBy: dtr.ClusterBy, + Rows: dtr.Rows, + Bytes: dtr.Bytes, + Owner: dtr.Owner, + TargetLag: dtr.TargetLag, + RefreshMode: DynamicTableRefreshMode(dtr.RefreshMode), + Warehouse: dtr.Warehouse, + Comment: dtr.Comment, + Text: dtr.Text, + AutomaticClustering: dtr.AutomaticClustering == "ON", // "ON" or "OFF + SchedulingState: DynamicTableSchedulingState(dtr.SchedulingState), + IsClone: dtr.IsClone, + IsReplica: dtr.IsReplica, + DataTimestamp: dtr.DataTimestamp, + } + if dtr.RefreshModeReason.Valid { + dt.RefreshModeReason = dtr.RefreshModeReason.String + } + if dtr.LastSuspendedOn.Valid { + dt.LastSuspendedOn = dtr.LastSuspendedOn.Time + } + return dt +} + +// describeDynamicTableOptions is based on https://docs.snowflake.com/en/sql-reference/sql/desc-dynamic-table +type describeDynamicTableOptions struct { + describe bool `ddl:"static" sql:"DESCRIBE"` + dynamicTable bool `ddl:"static" sql:"DYNAMIC TABLE"` + name SchemaObjectIdentifier `ddl:"identifier"` +} + +type DynamicTableDetails struct { + Name string + Type DataType + Kind string + IsNull bool + Default string + PrimaryKey string + UniqueKey string + Check string + Expression string + Comment string + PolicyName string +} + +type dynamicTableDetailsRow struct { + Name string `db:"name"` + Type string `db:"type"` + Kind string `db:"kind"` + IsNull string `db:"null?"` + Default sql.NullString `db:"default"` + PrimaryKey string `db:"primary key"` + UniqueKey string `db:"unique key"` + Check sql.NullString `db:"check"` + Expression sql.NullString `db:"expression"` + Comment sql.NullString `db:"comment"` + PolicyName sql.NullString `db:"policy name"` +} + +func (row dynamicTableDetailsRow) convert() *DynamicTableDetails { + typ, _ := ToDataType(row.Type) + dtd := &DynamicTableDetails{ + Name: row.Name, + Type: typ, + Kind: row.Kind, + IsNull: row.IsNull == "Y", + PrimaryKey: row.PrimaryKey, + UniqueKey: row.UniqueKey, + } + if row.Default.Valid { + dtd.Default = row.Default.String + } + if row.Check.Valid { + dtd.Check = row.Check.String + } + if row.Expression.Valid { + dtd.Expression = row.Expression.String + } + if row.Comment.Valid { + dtd.Comment = row.Comment.String + } + if row.PolicyName.Valid { + dtd.PolicyName = row.PolicyName.String + } + return dtd +} diff --git a/pkg/sdk/dynamic_table_dto.go b/pkg/sdk/dynamic_table_dto.go new file mode 100644 index 0000000000..f629a9c46f --- /dev/null +++ b/pkg/sdk/dynamic_table_dto.go @@ -0,0 +1,51 @@ +package sdk + +//go:generate go run ./dto-builder-generator/main.go + +var ( + _ optionsProvider[createDynamicTableOptions] = new(CreateDynamicTableRequest) + _ optionsProvider[alterDynamicTableOptions] = new(AlterDynamicTableRequest) + _ optionsProvider[dropDynamicTableOptions] = new(DropDynamicTableRequest) + _ optionsProvider[showDynamicTableOptions] = new(ShowDynamicTableRequest) +) + +type CreateDynamicTableRequest struct { + orReplace bool + + name SchemaObjectIdentifier // required + warehouse AccountObjectIdentifier // required + targetLag TargetLag // required + query string // required + + comment *string +} + +type AlterDynamicTableRequest struct { + name SchemaObjectIdentifier // required + + // One of + suspend *bool + resume *bool + refresh *bool + set *DynamicTableSetRequest +} + +type DynamicTableSetRequest struct { + targetLag *TargetLag + warehourse *AccountObjectIdentifier +} + +type DropDynamicTableRequest struct { + name SchemaObjectIdentifier // required +} + +type DescribeDynamicTableRequest struct { + name SchemaObjectIdentifier // required +} + +type ShowDynamicTableRequest struct { + like *Like + in *In + startsWith *string + limit *LimitFrom +} diff --git a/pkg/sdk/dynamic_table_dto_builders.go b/pkg/sdk/dynamic_table_dto_builders.go new file mode 100644 index 0000000000..6b10db233a --- /dev/null +++ b/pkg/sdk/dynamic_table_dto_builders.go @@ -0,0 +1,107 @@ +package sdk + +func NewCreateDynamicTableRequest( + name SchemaObjectIdentifier, + warehouse AccountObjectIdentifier, + targetLag TargetLag, + query string, +) *CreateDynamicTableRequest { + s := CreateDynamicTableRequest{} + s.name = name + s.warehouse = warehouse + s.targetLag = targetLag + s.query = query + return &s +} + +func (s *CreateDynamicTableRequest) WithOrReplace(orReplace bool) *CreateDynamicTableRequest { + s.orReplace = orReplace + return s +} + +func (s *CreateDynamicTableRequest) WithComment(comment *string) *CreateDynamicTableRequest { + s.comment = comment + return s +} + +func NewAlterDynamicTableRequest( + name SchemaObjectIdentifier, +) *AlterDynamicTableRequest { + s := AlterDynamicTableRequest{} + s.name = name + return &s +} + +func (s *AlterDynamicTableRequest) WithSuspend(suspend *bool) *AlterDynamicTableRequest { + s.suspend = suspend + return s +} + +func (s *AlterDynamicTableRequest) WithResume(resume *bool) *AlterDynamicTableRequest { + s.resume = resume + return s +} + +func (s *AlterDynamicTableRequest) WithRefresh(refresh *bool) *AlterDynamicTableRequest { + s.refresh = refresh + return s +} + +func (s *AlterDynamicTableRequest) WithSet(set *DynamicTableSetRequest) *AlterDynamicTableRequest { + s.set = set + return s +} + +func NewDynamicTableSetRequest() *DynamicTableSetRequest { + return &DynamicTableSetRequest{} +} + +func (s *DynamicTableSetRequest) WithTargetLag(targetLag *TargetLag) *DynamicTableSetRequest { + s.targetLag = targetLag + return s +} + +func (s *DynamicTableSetRequest) WithWarehourse(warehourse *AccountObjectIdentifier) *DynamicTableSetRequest { + s.warehourse = warehourse + return s +} + +func NewDropDynamicTableRequest( + name SchemaObjectIdentifier, +) *DropDynamicTableRequest { + s := DropDynamicTableRequest{} + s.name = name + return &s +} + +func NewDescribeDynamicTableRequest( + name SchemaObjectIdentifier, +) *DescribeDynamicTableRequest { + s := DescribeDynamicTableRequest{} + s.name = name + return &s +} + +func NewShowDynamicTableRequest() *ShowDynamicTableRequest { + return &ShowDynamicTableRequest{} +} + +func (s *ShowDynamicTableRequest) WithLike(like *Like) *ShowDynamicTableRequest { + s.like = like + return s +} + +func (s *ShowDynamicTableRequest) WithIn(in *In) *ShowDynamicTableRequest { + s.in = in + return s +} + +func (s *ShowDynamicTableRequest) WithStartsWith(startsWith *string) *ShowDynamicTableRequest { + s.startsWith = startsWith + return s +} + +func (s *ShowDynamicTableRequest) WithLimit(limit *LimitFrom) *ShowDynamicTableRequest { + s.limit = limit + return s +} diff --git a/pkg/sdk/dynamic_table_impl.go b/pkg/sdk/dynamic_table_impl.go new file mode 100644 index 0000000000..3743c06898 --- /dev/null +++ b/pkg/sdk/dynamic_table_impl.go @@ -0,0 +1,113 @@ +package sdk + +import ( + "context" +) + +var _ DynamicTables = (*dynamicTables)(nil) + +type dynamicTables struct { + client *Client +} + +func (v *dynamicTables) Create(ctx context.Context, request *CreateDynamicTableRequest) error { + opts := request.toOpts() + return validateAndExec(v.client, ctx, opts) +} + +func (v *dynamicTables) Alter(ctx context.Context, request *AlterDynamicTableRequest) error { + opts := request.toOpts() + return validateAndExec(v.client, ctx, opts) +} + +func (v *dynamicTables) Drop(ctx context.Context, request *DropDynamicTableRequest) error { + opts := request.toOpts() + return validateAndExec(v.client, ctx, opts) +} + +func (v *dynamicTables) Describe(ctx context.Context, request *DescribeDynamicTableRequest) (*DynamicTableDetails, error) { + opts := request.toOpts() + row, err := validateAndQueryOne[dynamicTableDetailsRow](v.client, ctx, opts) + if err != nil { + return nil, err + } + return row.convert(), nil +} + +func (v *dynamicTables) Show(ctx context.Context, request *ShowDynamicTableRequest) ([]DynamicTable, error) { + opts := request.toOpts() + rows, err := validateAndQuery[dynamicTableRow](v.client, ctx, opts) + if err != nil { + return nil, err + } + result := convertRows[dynamicTableRow, DynamicTable](rows) + return result, nil +} + +func (v *dynamicTables) ShowByID(ctx context.Context, id AccountObjectIdentifier) (*DynamicTable, error) { + request := NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(id.Name())}) + dynamicTables, err := v.Show(ctx, request) + if err != nil { + return nil, err + } + return findOne(dynamicTables, func(r DynamicTable) bool { return r.Name == id.Name() }) +} + +func (s *CreateDynamicTableRequest) toOpts() *createDynamicTableOptions { + return &createDynamicTableOptions{ + OrReplace: Bool(s.orReplace), + name: s.name, + warehouse: s.warehouse, + targetLag: s.targetLag, + query: s.query, + Comment: s.comment, + } +} + +func (s *AlterDynamicTableRequest) toOpts() *alterDynamicTableOptions { + opts := alterDynamicTableOptions{ + name: s.name, + } + if s.suspend != nil { + opts.Suspend = s.suspend + } + if s.resume != nil { + opts.Resume = s.resume + } + if s.refresh != nil { + opts.Refresh = s.refresh + } + if s.set != nil { + opts.Set = &DynamicTableSet{s.set.targetLag, s.set.warehourse} + } + return &opts +} + +func (s *DropDynamicTableRequest) toOpts() *dropDynamicTableOptions { + return &dropDynamicTableOptions{ + name: s.name, + } +} + +func (s *DescribeDynamicTableRequest) toOpts() *describeDynamicTableOptions { + return &describeDynamicTableOptions{ + name: s.name, + } +} + +func (s *ShowDynamicTableRequest) toOpts() *showDynamicTableOptions { + opts := showDynamicTableOptions{} + if s.like != nil { + opts.Like = s.like + } + if s.in != nil { + opts.In = s.in + } + if s.startsWith != nil { + opts.StartsWith = s.startsWith + } + if s.limit != nil { + opts.Limit = s.limit + } + return &opts +} diff --git a/pkg/sdk/dynamic_table_integration_test.go b/pkg/sdk/dynamic_table_integration_test.go new file mode 100644 index 0000000000..efcaa6a760 --- /dev/null +++ b/pkg/sdk/dynamic_table_integration_test.go @@ -0,0 +1,156 @@ +package sdk + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInt_DynamicTableCreateAndDrop(t *testing.T) { + client := testClient(t) + + warehouseTest, warehouseCleanup := createWarehouse(t, client) + t.Cleanup(warehouseCleanup) + databaseTest, databaseCleanup := createDatabase(t, client) + t.Cleanup(databaseCleanup) + schemaTest, schemaCleanup := createSchema(t, client, databaseTest) + t.Cleanup(schemaCleanup) + tableTest, tableCleanup := createTable(t, client, databaseTest, schemaTest) + t.Cleanup(tableCleanup) + + ctx := context.Background() + t.Run("test complete", func(t *testing.T) { + name := NewSchemaObjectIdentifier(databaseTest.Name, schemaTest.Name, randomString(t)) + targetLag := TargetLag{ + Lagtime: String("2 minutes"), + } + query := "select id from " + tableTest.ID().FullyQualifiedName() + comment := randomComment(t) + err := client.DynamicTables.Create(ctx, NewCreateDynamicTableRequest(name, warehouseTest.ID(), targetLag, query).WithOrReplace(true).WithComment(&comment)) + require.NoError(t, err) + t.Cleanup(func() { + err = client.DynamicTables.Drop(ctx, NewDropDynamicTableRequest(name)) + require.NoError(t, err) + }) + entities, err := client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(name.Name())})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + + entity := entities[0] + require.Equal(t, name.Name(), entity.Name) + require.Equal(t, warehouseTest.ID().Name(), entity.Warehouse) + require.Equal(t, *targetLag.Lagtime, entity.TargetLag) + }) + + t.Run("test complete with target lag", func(t *testing.T) { + name := NewSchemaObjectIdentifier(databaseTest.Name, schemaTest.Name, randomString(t)) + targetLag := TargetLag{ + Downstream: Bool(true), + } + query := "select id from " + tableTest.ID().FullyQualifiedName() + comment := randomComment(t) + err := client.DynamicTables.Create(ctx, NewCreateDynamicTableRequest(name, warehouseTest.ID(), targetLag, query).WithOrReplace(true).WithComment(&comment)) + require.NoError(t, err) + t.Cleanup(func() { + err = client.DynamicTables.Drop(ctx, NewDropDynamicTableRequest(name)) + require.NoError(t, err) + }) + entities, err := client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(name.Name())})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + + entity := entities[0] + require.Equal(t, name.Name(), entity.Name) + require.Equal(t, warehouseTest.ID().Name(), entity.Warehouse) + require.Equal(t, "DOWNSTREAM", entity.TargetLag) + }) +} + +func TestInt_DynamicTableDescribe(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + dynamicTable, dynamicTableCleanup := createDynamicTable(t, client) + t.Cleanup(dynamicTableCleanup) + + t.Run("when dynamic table exists", func(t *testing.T) { + _, err := client.DynamicTables.Describe(ctx, NewDescribeDynamicTableRequest(dynamicTable.ID())) + require.NoError(t, err) + }) + + t.Run("when dynamic table does not exist", func(t *testing.T) { + name := NewSchemaObjectIdentifier("my_db", "my_schema", "does_not_exist") + _, err := client.DynamicTables.Describe(ctx, NewDescribeDynamicTableRequest(name)) + assert.ErrorIs(t, err, errObjectNotExistOrAuthorized) + }) +} + +func TestInt_DynamicTableAlter(t *testing.T) { + client := testClient(t) + ctx := context.Background() + + t.Run("alter with suspend or resume", func(t *testing.T) { + dynamicTable, dynamicTableCleanup := createDynamicTable(t, client) + t.Cleanup(dynamicTableCleanup) + + entities, err := client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(dynamicTable.Name)})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + require.Equal(t, DynamicTableSchedulingStateRunning, entities[0].SchedulingState) + + err = client.DynamicTables.Alter(ctx, NewAlterDynamicTableRequest(dynamicTable.ID()).WithSuspend(Bool(true))) + require.NoError(t, err) + + entities, err = client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(dynamicTable.Name)})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + require.Equal(t, DynamicTableSchedulingStateSuspended, entities[0].SchedulingState) + + err = client.DynamicTables.Alter(ctx, NewAlterDynamicTableRequest(dynamicTable.ID()).WithResume(Bool(true))) + require.NoError(t, err) + + entities, err = client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(dynamicTable.Name)})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + require.Equal(t, DynamicTableSchedulingStateRunning, entities[0].SchedulingState) + }) + + t.Run("alter with refresh", func(t *testing.T) { + dynamicTable, dynamicTableCleanup := createDynamicTable(t, client) + t.Cleanup(dynamicTableCleanup) + + err := client.DynamicTables.Alter(ctx, NewAlterDynamicTableRequest(dynamicTable.ID()).WithRefresh(Bool(true))) + require.NoError(t, err) + + entities, err := client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(dynamicTable.Name)})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + }) + + t.Run("alter with suspend and resume", func(t *testing.T) { + dynamicTable, dynamicTableCleanup := createDynamicTable(t, client) + t.Cleanup(dynamicTableCleanup) + + err := client.DynamicTables.Alter(ctx, NewAlterDynamicTableRequest(dynamicTable.ID()).WithSuspend(Bool(true)).WithResume(Bool(true))) + require.Error(t, err) + expected := "alter statement needs exactly one action from: set, unset, refresh" + require.Equal(t, expected, err.Error()) + }) + + t.Run("alter with set", func(t *testing.T) { + dynamicTable, dynamicTableCleanup := createDynamicTable(t, client) + t.Cleanup(dynamicTableCleanup) + + targetLagCases := []string{"10 minutes", "DOWNSTREAM"} + for _, value := range targetLagCases { + err := client.DynamicTables.Alter(ctx, NewAlterDynamicTableRequest(dynamicTable.ID()).WithSet(NewDynamicTableSetRequest().WithTargetLag(&TargetLag{Lagtime: String(value)}))) + require.NoError(t, err) + entities, err := client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(dynamicTable.Name)})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + require.Equal(t, value, entities[0].TargetLag) + } + }) +} diff --git a/pkg/sdk/dynamic_table_test.go b/pkg/sdk/dynamic_table_test.go new file mode 100644 index 0000000000..b8cd043397 --- /dev/null +++ b/pkg/sdk/dynamic_table_test.go @@ -0,0 +1,201 @@ +package sdk + +import ( + "testing" +) + +func TestDynamicTableCreate(t *testing.T) { + id := randomSchemaObjectIdentifier(t) + defaultOpts := func() *createDynamicTableOptions { + return &createDynamicTableOptions{ + name: id, + targetLag: TargetLag{ + Lagtime: String("1 minutes"), + }, + warehouse: AccountObjectIdentifier{ + name: "warehouse_name", + }, + query: "SELECT product_id, product_name FROM staging_table", + } + } + t.Run("validation: nil options", func(t *testing.T) { + var opts *createDynamicTableOptions = nil + assertOptsInvalidJoinedErrors(t, opts, errNilOptions) + }) + + t.Run("validation: incorrect identifier", func(t *testing.T) { + opts := defaultOpts() + opts.name = NewSchemaObjectIdentifier("", "", "") + assertOptsInvalidJoinedErrors(t, opts, errInvalidObjectIdentifier) + }) + + t.Run("basic", func(t *testing.T) { + opts := defaultOpts() + opts.OrReplace = Bool(true) + assertOptsValidAndSQLEquals(t, opts, `CREATE OR REPLACE DYNAMIC TABLE %s TARGET_LAG = '1 minutes' WAREHOUSE = "warehouse_name" AS SELECT product_id, product_name FROM staging_table`, id.FullyQualifiedName()) + }) + + t.Run("all optional", func(t *testing.T) { + opts := defaultOpts() + opts.OrReplace = Bool(true) + opts.Comment = String("comment") + assertOptsValidAndSQLEquals(t, opts, `CREATE OR REPLACE DYNAMIC TABLE %s TARGET_LAG = '1 minutes' WAREHOUSE = "warehouse_name" COMMENT = 'comment' AS SELECT product_id, product_name FROM staging_table`, id.FullyQualifiedName()) + }) +} + +func TestDynamicTableAlter(t *testing.T) { + id := randomSchemaObjectIdentifier(t) + defaultOpts := func() *alterDynamicTableOptions { + return &alterDynamicTableOptions{ + name: id, + } + } + + t.Run("validation: nil options", func(t *testing.T) { + var opts *alterDynamicTableOptions = nil + assertOptsInvalidJoinedErrors(t, opts, errNilOptions) + }) + + t.Run("validation: incorrect identifier", func(t *testing.T) { + opts := defaultOpts() + opts.name = NewSchemaObjectIdentifier("", "", "") + assertOptsInvalidJoinedErrors(t, opts, errInvalidObjectIdentifier) + }) + + t.Run("validation: no alter action", func(t *testing.T) { + opts := defaultOpts() + assertOptsInvalidJoinedErrors(t, opts, errAlterNeedsExactlyOneAction) + }) + + t.Run("validation: multiple alter actions", func(t *testing.T) { + opts := defaultOpts() + opts.Resume = Bool(true) + opts.Suspend = Bool(true) + assertOptsInvalidJoinedErrors(t, opts, errAlterNeedsExactlyOneAction) + }) + + t.Run("validation: no property to unset", func(t *testing.T) { + opts := defaultOpts() + assertOptsInvalidJoinedErrors(t, opts, errAlterNeedsAtLeastOneProperty) + }) + + t.Run("suspend", func(t *testing.T) { + opts := defaultOpts() + opts.Suspend = Bool(true) + assertOptsValidAndSQLEquals(t, opts, `ALTER DYNAMIC TABLE %s SUSPEND`, id.FullyQualifiedName()) + }) + + t.Run("resume", func(t *testing.T) { + opts := defaultOpts() + opts.Resume = Bool(true) + assertOptsValidAndSQLEquals(t, opts, `ALTER DYNAMIC TABLE %s RESUME`, id.FullyQualifiedName()) + }) + + t.Run("set", func(t *testing.T) { + opts := defaultOpts() + opts.Set = &DynamicTableSet{ + TargetLag: &TargetLag{ + Lagtime: String("1 minutes"), + }, + Warehouse: &AccountObjectIdentifier{ + name: "warehouse_name", + }, + } + assertOptsValidAndSQLEquals(t, opts, `ALTER DYNAMIC TABLE %s SET TARGET_LAG = '1 minutes' WAREHOUSE = "warehouse_name"`, id.FullyQualifiedName()) + }) +} + +func TestDynamicTableDrop(t *testing.T) { + id := randomSchemaObjectIdentifier(t) + defaultOpts := func() *dropDynamicTableOptions { + return &dropDynamicTableOptions{ + name: id, + } + } + + t.Run("validation: nil options", func(t *testing.T) { + var opts *dropDynamicTableOptions = nil + assertOptsInvalidJoinedErrors(t, opts, errNilOptions) + }) + + t.Run("validation: incorrect identifier", func(t *testing.T) { + opts := defaultOpts() + opts.name = NewSchemaObjectIdentifier("", "", "") + assertOptsInvalidJoinedErrors(t, opts, errInvalidObjectIdentifier) + }) + + t.Run("empty options", func(t *testing.T) { + opts := defaultOpts() + assertOptsValidAndSQLEquals(t, opts, `DROP DYNAMIC TABLE %s`, id.FullyQualifiedName()) + }) +} + +func TestDynamicTableShow(t *testing.T) { + id := randomSchemaObjectIdentifier(t) + defaultOpts := func() *showDynamicTableOptions { + return &showDynamicTableOptions{} + } + + t.Run("validation: nil options", func(t *testing.T) { + var opts *showDynamicTableOptions = nil + assertOptsInvalidJoinedErrors(t, opts, errNilOptions) + }) + + t.Run("validation: empty like", func(t *testing.T) { + opts := defaultOpts() + opts.Like = &Like{} + assertOptsInvalidJoinedErrors(t, opts, errPatternRequiredForLikeKeyword) + }) + + t.Run("show with in", func(t *testing.T) { + opts := defaultOpts() + opts.In = &In{ + Database: NewAccountObjectIdentifier("database"), + } + assertOptsValidAndSQLEquals(t, opts, `SHOW DYNAMIC TABLES IN DATABASE "database"`) + }) + + t.Run("show with like", func(t *testing.T) { + opts := defaultOpts() + opts.Like = &Like{ + Pattern: String(id.Name()), + } + assertOptsValidAndSQLEquals(t, opts, `SHOW DYNAMIC TABLES LIKE '%s'`, id.Name()) + }) + + t.Run("show with like and in", func(t *testing.T) { + opts := defaultOpts() + opts.Like = &Like{ + Pattern: String(id.Name()), + } + opts.In = &In{ + Database: NewAccountObjectIdentifier("database"), + } + assertOptsValidAndSQLEquals(t, opts, `SHOW DYNAMIC TABLES LIKE '%s' IN DATABASE "database"`, id.Name()) + }) +} + +func TestDynamicTableDescribe(t *testing.T) { + id := randomSchemaObjectIdentifier(t) + defaultOpts := func() *describeDynamicTableOptions { + return &describeDynamicTableOptions{ + name: id, + } + } + + t.Run("validation: nil options", func(t *testing.T) { + var opts *describeDynamicTableOptions = nil + assertOptsInvalidJoinedErrors(t, opts, errNilOptions) + }) + + t.Run("validation: incorrect identifier", func(t *testing.T) { + opts := defaultOpts() + opts.name = NewSchemaObjectIdentifier("", "", "") + assertOptsInvalidJoinedErrors(t, opts, errInvalidObjectIdentifier) + }) + + t.Run("describe", func(t *testing.T) { + opts := defaultOpts() + assertOptsValidAndSQLEquals(t, opts, `DESCRIBE DYNAMIC TABLE %s`, id.FullyQualifiedName()) + }) +} diff --git a/pkg/sdk/dynamic_table_validations.go b/pkg/sdk/dynamic_table_validations.go new file mode 100644 index 0000000000..80fd2e0e36 --- /dev/null +++ b/pkg/sdk/dynamic_table_validations.go @@ -0,0 +1,115 @@ +package sdk + +import ( + "errors" +) + +var ( + _ validatable = new(createDynamicTableOptions) + _ validatable = new(alterDynamicTableOptions) + _ validatable = new(dropDynamicTableOptions) + _ validatable = new(showDynamicTableOptions) + _ validatable = new(describeDynamicTableOptions) + _ validatable = new(DynamicTableSet) +) + +func (tl *TargetLag) validate() error { + if tl == nil { + return errors.Join(errNilOptions) + } + var errs []error + if everyValueSet(tl.Lagtime, tl.Downstream) { + errs = append(errs, errOneOf("Lagtime", "Downstream")) + } + return errors.Join(errs...) +} + +func (opts *createDynamicTableOptions) validate() error { + if opts == nil { + return errors.Join(errNilOptions) + } + var errs []error + if !validObjectidentifier(opts.name) { + errs = append(errs, errInvalidObjectIdentifier) + } + if !validObjectidentifier(opts.warehouse) { + errs = append(errs, errInvalidObjectIdentifier) + } + return errors.Join(errs...) +} + +func (dts *DynamicTableSet) validate() error { + var errs []error + if valueSet(dts.TargetLag) { + errs = append(errs, dts.TargetLag.validate()) + } + + if valueSet(dts.Warehouse) { + if !validObjectidentifier(*dts.Warehouse) { + errs = append(errs, errInvalidObjectIdentifier) + } + } + return errors.Join(errs...) +} + +func (opts *alterDynamicTableOptions) validate() error { + if opts == nil { + return errors.Join(errNilOptions) + } + var errs []error + if !validObjectidentifier(opts.name) { + errs = append(errs, errInvalidObjectIdentifier) + } + if ok := exactlyOneValueSet( + opts.Suspend, + opts.Resume, + opts.Refresh, + opts.Set, + ); !ok { + errs = append(errs, errAlterNeedsExactlyOneAction) + } + if !anyValueSet(opts.Suspend, opts.Resume, opts.Refresh, opts.Set) { + errs = append(errs, errAlterNeedsAtLeastOneProperty) + } + if valueSet(opts.Set) && valueSet(opts.Set.TargetLag) { + errs = append(errs, opts.Set.TargetLag.validate()) + } + return errors.Join(errs...) +} + +func (opts *showDynamicTableOptions) validate() error { + if opts == nil { + return errors.Join(errNilOptions) + } + var errs []error + if valueSet(opts.Like) && !valueSet(opts.Like.Pattern) { + errs = append(errs, errPatternRequiredForLikeKeyword) + } + if valueSet(opts.In) && !exactlyOneValueSet(opts.In.Account, opts.In.Database, opts.In.Schema) { + errs = append(errs, errScopeRequiredForInKeyword) + } + return errors.Join(errs...) +} + +func (opts *dropDynamicTableOptions) validate() error { + if opts == nil { + return errors.Join(errNilOptions) + } + var errs []error + + if !validObjectidentifier(opts.name) { + errs = append(errs, errInvalidObjectIdentifier) + } + return errors.Join(errs...) +} + +func (opts *describeDynamicTableOptions) validate() error { + if opts == nil { + return errors.Join(errNilOptions) + } + var errs []error + if !validObjectidentifier(opts.name) { + errs = append(errs, errInvalidObjectIdentifier) + } + return errors.Join(errs...) +} diff --git a/pkg/sdk/helper_test.go b/pkg/sdk/helper_test.go index 31283962cc..548bee1741 100644 --- a/pkg/sdk/helper_test.go +++ b/pkg/sdk/helper_test.go @@ -347,6 +347,9 @@ func createDatabaseWithOptions(t *testing.T, client *Client, id AccountObjectIde require.NoError(t, err) return database, func() { err := client.Databases.Drop(ctx, id, nil) + if err == errObjectNotExistOrAuthorized { + return + } require.NoError(t, err) } } @@ -366,6 +369,9 @@ func createSchemaWithIdentifier(t *testing.T, client *Client, database *Database require.NoError(t, err) return schema, func() { err := client.Schemas.Drop(ctx, schemaID, nil) + if err == errObjectNotExistOrAuthorized { + return + } require.NoError(t, err) } } @@ -711,6 +717,58 @@ func createStage(t *testing.T, client *Client, database *Database, schema *Schem }, stageCleanup } +func createDynamicTable(t *testing.T, client *Client) (*DynamicTable, func()) { + t.Helper() + return createDynamicTableWithOptions(t, client, nil, nil, nil, nil) +} + +func createDynamicTableWithOptions(t *testing.T, client *Client, warehouse *Warehouse, database *Database, schema *Schema, table *Table) (*DynamicTable, func()) { + t.Helper() + var warehouseCleanup func() + if warehouse == nil { + warehouse, warehouseCleanup = createWarehouse(t, client) + } + var databaseCleanup func() + if database == nil { + database, databaseCleanup = createDatabase(t, client) + } + var schemaCleanup func() + if schema == nil { + schema, schemaCleanup = createSchema(t, client, database) + } + var tableCleanup func() + if table == nil { + table, tableCleanup = createTable(t, client, database, schema) + } + name := NewSchemaObjectIdentifier(schema.DatabaseName, schema.Name, randomString(t)) + targetLag := TargetLag{ + Lagtime: String("2 minutes"), + } + query := "select id from " + table.ID().FullyQualifiedName() + comment := randomComment(t) + ctx := context.Background() + err := client.DynamicTables.Create(ctx, NewCreateDynamicTableRequest(name, warehouse.ID(), targetLag, query).WithOrReplace(true).WithComment(&comment)) + require.NoError(t, err) + entities, err := client.DynamicTables.Show(ctx, NewShowDynamicTableRequest().WithLike(&Like{Pattern: String(name.Name())}).WithIn(&In{Schema: schema.ID()})) + require.NoError(t, err) + require.Equal(t, 1, len(entities)) + return &entities[0], func() { + require.NoError(t, client.DynamicTables.Drop(ctx, NewDropDynamicTableRequest(name))) + if tableCleanup != nil { + tableCleanup() + } + if schemaCleanup != nil { + schemaCleanup() + } + if databaseCleanup != nil { + databaseCleanup() + } + if warehouseCleanup != nil { + warehouseCleanup() + } + } +} + func createStageWithURL(t *testing.T, client *Client, name AccountObjectIdentifier, url string) (*Stage, func()) { t.Helper() ctx := context.Background() diff --git a/pkg/sdk/poc/main.go b/pkg/sdk/poc/main.go index ca4b251c6c..b999983dab 100644 --- a/pkg/sdk/poc/main.go +++ b/pkg/sdk/poc/main.go @@ -26,7 +26,7 @@ func main() { fmt.Printf("Running generator on %s with args %#v\n", file, os.Args[1:]) definition := getDefinition(file) - //runAllTemplatesToStdOut(definition) + // runAllTemplatesToStdOut(definition) runAllTemplatesAndSave(definition, file) }