From b7d79be4239d0711fc52a3bd54efe77de1f0d8f2 Mon Sep 17 00:00:00 2001 From: Nick Zelei <2420177+nickzelei@users.noreply.github.com> Date: Wed, 29 Jan 2025 16:53:18 -0800 Subject: [PATCH] Fixes PG Data Type Arrays on schema init, re-enables integration tests in CI (#3202) --- .github/workflows/go.yml | 9 +- .../go/db/dbschemas/postgresql/system.sql.go | 7 +- .../sql/postgresql/queries/system.sql | 7 +- .../mssql/mssql-manager_integration_test.go | 10 ++- .../postgres-manager_integration_test.go | 7 +- .../sqlmanager/postgres/testdata/setup.sql | 22 ++++- backend/pkg/sqlretry/dbtx_retry.go | 52 ++++++----- cli/internal/cmds/neosync/sync/config.go | 2 +- .../interceptors/retry/interceptor.go | 42 +++++---- .../interceptors/retry/interceptor_test.go | 90 +++++++++++-------- internal/sshtunnel/dialer_integration_test.go | 15 +++- .../testcontainers/sqlserver/sqlserver.go | 4 +- worker/pkg/integration-test/dynamodb_test.go | 2 +- 13 files changed, 176 insertions(+), 93 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 5ccef21685..0b263661fb 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -131,7 +131,7 @@ jobs: - name: Run Integration Tests run: | - go test -race -timeout 1800s -coverprofile=integration-coverage.out -covermode=atomic -run TestIntegrationTestSuite ./... + go test -race -timeout 1800s -coverprofile=integration-coverage.out -covermode=atomic ./... env: INTEGRATION_TESTS_ENABLED: 1 S3_INTEGRATION_TESTS_ENABLED: 1 @@ -206,7 +206,7 @@ jobs: with: go-version-file: go.mod cache-dependency-path: go.sum - + - name: Install benchstat run: go install golang.org/x/perf/cmd/benchstat@latest @@ -217,7 +217,7 @@ jobs: path: main-benchmark.txt key: benchmarks-main restore-keys: benchmarks-main - + - name: Run benchmarks on main if cache miss if: steps.cache-restore.outputs.cache-hit != 'true' run: | @@ -263,7 +263,7 @@ jobs: echo -e "\n
\nBenchstat results\n\n\`\`\`\n$(cat benchstat-output.txt)\n\`\`\`\n
\n" >> benchstat-results.txt - name: Comment Benchstat Results on PR - if: github.event_name == 'pull_request' + if: github.event_name == 'pull_request' uses: thollander/actions-comment-pull-request@v3 with: file-path: benchstat-results.txt @@ -277,4 +277,3 @@ jobs: with: path: branch-benchmark.txt key: benchmarks-main - diff --git a/backend/gen/go/db/dbschemas/postgresql/system.sql.go b/backend/gen/go/db/dbschemas/postgresql/system.sql.go index 991360e122..14e8ac203a 100644 --- a/backend/gen/go/db/dbschemas/postgresql/system.sql.go +++ b/backend/gen/go/db/dbschemas/postgresql/system.sql.go @@ -282,13 +282,18 @@ WITH custom_types AS ( table_columns AS ( SELECT c.oid AS table_oid, - a.atttypid AS type_oid + CASE + WHEN t.typtype = 'b' THEN t.typelem -- If it's an array, use the element type + ELSE a.atttypid -- Otherwise use the type directly + END AS type_oid FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid + JOIN + pg_catalog.pg_type t ON t.oid = a.atttypid WHERE n.nspname = $1 AND c.relname = ANY($2::TEXT[]) diff --git a/backend/pkg/dbschemas/sql/postgresql/queries/system.sql b/backend/pkg/dbschemas/sql/postgresql/queries/system.sql index c3b5148202..e6debc2b7a 100644 --- a/backend/pkg/dbschemas/sql/postgresql/queries/system.sql +++ b/backend/pkg/dbschemas/sql/postgresql/queries/system.sql @@ -442,13 +442,18 @@ WITH custom_types AS ( table_columns AS ( SELECT c.oid AS table_oid, - a.atttypid AS type_oid + CASE + WHEN t.typtype = 'b' THEN t.typelem -- If it's an array, use the element type + ELSE a.atttypid -- Otherwise use the type directly + END AS type_oid FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid + JOIN + pg_catalog.pg_type t ON t.oid = a.atttypid WHERE n.nspname = sqlc.arg('schema') AND c.relname = ANY(sqlc.arg('tables')::TEXT[]) diff --git a/backend/pkg/sqlmanager/mssql/mssql-manager_integration_test.go b/backend/pkg/sqlmanager/mssql/mssql-manager_integration_test.go index d0c41a71f6..2c8800a92a 100644 --- a/backend/pkg/sqlmanager/mssql/mssql-manager_integration_test.go +++ b/backend/pkg/sqlmanager/mssql/mssql-manager_integration_test.go @@ -52,6 +52,8 @@ func Test_MssqlManager(t *testing.T) { t.Run("GetDatabaseSchema", func(t *testing.T) { t.Parallel() var expectedIdentityGeneration = "IDENTITY(1,1)" + var expectedIdentitySeed = 1 + var expectedIdentityIncrement = 1 expectedSubset := []*sqlmanager_shared.DatabaseSchemaRow{ { TableSchema: "sqlmanagermssql3", @@ -66,6 +68,9 @@ func Test_MssqlManager(t *testing.T) { OrdinalPosition: 1, GeneratedType: nil, IdentityGeneration: &expectedIdentityGeneration, + ColumnDefaultType: nil, + IdentitySeed: &expectedIdentitySeed, + IdentityIncrement: &expectedIdentityIncrement, }, } @@ -347,9 +352,8 @@ func Test_MssqlManager(t *testing.T) { // } func containsSubset[T any](t testing.TB, array, subset []T) { - t.Helper() - for _, elem := range subset { - require.Contains(t, array, elem) + for idx, elem := range subset { + require.Contains(t, array, elem, "array does not contain expected subset element at index %d", idx) } } diff --git a/backend/pkg/sqlmanager/postgres/postgres-manager_integration_test.go b/backend/pkg/sqlmanager/postgres/postgres-manager_integration_test.go index a9a8eb3077..c8d5057681 100644 --- a/backend/pkg/sqlmanager/postgres/postgres-manager_integration_test.go +++ b/backend/pkg/sqlmanager/postgres/postgres-manager_integration_test.go @@ -379,9 +379,14 @@ func Test_PostgresManager(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, statements) + allStmts := []string{} + for _, block := range statements { + allStmts = append(allStmts, block.Statements...) + } + require.NotEmpty(t, allStmts) + for _, block := range statements { t.Logf("executing %d statements for label %q", len(block.Statements), block.Label) - require.NotEmpty(t, block.Statements) for _, stmt := range block.Statements { _, err = target.DB.Exec(ctx, stmt) require.NoError(t, err, "failed to execute %s statement %q", block.Label, stmt) diff --git a/backend/pkg/sqlmanager/postgres/testdata/setup.sql b/backend/pkg/sqlmanager/postgres/testdata/setup.sql index 1c17ead74c..0bc05b405e 100644 --- a/backend/pkg/sqlmanager/postgres/testdata/setup.sql +++ b/backend/pkg/sqlmanager/postgres/testdata/setup.sql @@ -83,6 +83,12 @@ CREATE TYPE custom_type AS ( part1 INTEGER, part2 TEXT ); + +CREATE TYPE custom_type2 AS ( + part1 INTEGER, + part2 TEXT +); + CREATE FUNCTION custom_function() RETURNS TRIGGER AS $$ BEGIN NEW.id := nextval('custom_seq'); @@ -97,11 +103,23 @@ CREATE TYPE custom_enum AS ENUM ( CREATE DOMAIN custom_domain AS TEXT CHECK (VALUE ~ '^[a-zA-Z]+$'); -- Only allows alphabetic characters +CREATE DOMAIN custom_domain2 AS TEXT + CHECK (VALUE ~ '^[a-zA-Z]+$'); -- Only allows alphabetic characters + +CREATE TYPE custom_enum2 AS ENUM ( + 'value4', + 'value5', + 'value6' +); + CREATE TABLE custom_table ( id INTEGER NOT NULL DEFAULT nextval('custom_seq'), name custom_domain NOT NULL, + names custom_domain2[] NOT NULL, -- testing custom domain as array data custom_type, + datas custom_type2[] NOT NULL, -- testing custom composite type as array status custom_enum NOT NULL, + statuses custom_enum2[] NOT NULL, -- testing custom data type as array created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW() ); -- Adding a trigger to use the custom function for setting the 'id' field @@ -140,7 +158,7 @@ CREATE TABLE IF NOT EXISTS "CaPiTaL"."BadName" ( ); INSERT INTO "CaPiTaL"."BadName" ("NAME") -VALUES +VALUES ('Xk7pQ9nM3v'), ('Rt5wLjH2yB'), ('Zc8fAe4dN6'), @@ -154,7 +172,7 @@ CREATE TABLE "CaPiTaL"."Bad Name 123!@#" ( INSERT INTO "CaPiTaL"."Bad Name 123!@#" ("NAME") -VALUES +VALUES ('Xk7pQ9nM3v'), ('Rt5wLjH2yB'), ('Zc8fAe4dN6'), diff --git a/backend/pkg/sqlretry/dbtx_retry.go b/backend/pkg/sqlretry/dbtx_retry.go index db761fd3a3..bb3202d1da 100644 --- a/backend/pkg/sqlretry/dbtx_retry.go +++ b/backend/pkg/sqlretry/dbtx_retry.go @@ -24,39 +24,46 @@ type RetryDBTX struct { var _ sqldbtx.DBTX = (*RetryDBTX)(nil) type config struct { - retryOpts []backoff.RetryOption + getRetryOpts func() []backoff.RetryOption } type Option func(*config) func NewDefault(dbtx sqldbtx.DBTX, logger *slog.Logger) *RetryDBTX { - backoffStrategy := backoff.NewExponentialBackOff() - backoffStrategy.InitialInterval = 200 * time.Millisecond - backoffStrategy.MaxInterval = 30 * time.Second - backoffStrategy.Multiplier = 2 - backoffStrategy.RandomizationFactor = 0.3 - return New(dbtx, WithRetryOptions( - backoff.WithBackOff(backoffStrategy), - backoff.WithMaxTries(25), - backoff.WithMaxElapsedTime(5*time.Minute), - backoff.WithNotify(func(err error, d time.Duration) { - logger.Warn(fmt.Sprintf("sql error with retry: %s, retrying in %s", err.Error(), d.String())) - }), + func() []backoff.RetryOption { + backoffStrategy := backoff.NewExponentialBackOff() + backoffStrategy.InitialInterval = 200 * time.Millisecond + backoffStrategy.MaxInterval = 30 * time.Second + backoffStrategy.Multiplier = 2 + backoffStrategy.RandomizationFactor = 0.3 + return []backoff.RetryOption{ + backoff.WithBackOff(backoffStrategy), + backoff.WithMaxTries(25), + backoff.WithMaxElapsedTime(5 * time.Minute), + backoff.WithNotify(func(err error, d time.Duration) { + logger.Warn(fmt.Sprintf("sql error with retry: %s, retrying in %s", err.Error(), d.String())) + }), + } + }, )) } +func noRetryOptions() []backoff.RetryOption { + return []backoff.RetryOption{} +} + func New(dbtx sqldbtx.DBTX, opts ...Option) *RetryDBTX { - cfg := &config{} + cfg := &config{getRetryOpts: noRetryOptions} for _, opt := range opts { opt(cfg) } return &RetryDBTX{dbtx: dbtx, config: cfg} } -func WithRetryOptions(opts ...backoff.RetryOption) Option { +func WithRetryOptions(getRetryOpts func() []backoff.RetryOption) Option { return func(cfg *config) { - cfg.retryOpts = opts + cfg.getRetryOpts = getRetryOpts } } @@ -64,21 +71,21 @@ func (r *RetryDBTX) ExecContext(ctx context.Context, query string, args ...any) operation := func() (sql.Result, error) { return r.dbtx.ExecContext(ctx, query, args...) } - return retry(ctx, operation, r.config.retryOpts...) + return retry(ctx, operation, r.config.getRetryOpts) } func (r *RetryDBTX) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) { operation := func() (*sql.Stmt, error) { return r.dbtx.PrepareContext(ctx, query) } - return retry(ctx, operation, r.config.retryOpts...) + return retry(ctx, operation, r.config.getRetryOpts) } func (r *RetryDBTX) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { operation := func() (*sql.Rows, error) { return r.dbtx.QueryContext(ctx, query, args...) } - return retry(ctx, operation, r.config.retryOpts...) + return retry(ctx, operation, r.config.getRetryOpts) } func (r *RetryDBTX) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { @@ -89,7 +96,7 @@ func (r *RetryDBTX) PingContext(ctx context.Context) error { operation := func() (any, error) { return nil, r.dbtx.PingContext(ctx) } - _, err := retry(ctx, operation, r.config.retryOpts...) + _, err := retry(ctx, operation, r.config.getRetryOpts) return err } @@ -97,10 +104,11 @@ func (r *RetryDBTX) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, operation := func() (*sql.Tx, error) { return r.dbtx.BeginTx(ctx, opts) } - return retry(ctx, operation, r.config.retryOpts...) + return retry(ctx, operation, r.config.getRetryOpts) } -func retry[T any](ctx context.Context, fn func() (T, error), opts ...backoff.RetryOption) (T, error) { +func retry[T any](ctx context.Context, fn func() (T, error), getOpts func() []backoff.RetryOption) (T, error) { + opts := getOpts() return retryUnwrap(backoff.Retry(ctx, retryWrap(fn), opts...)) } diff --git a/cli/internal/cmds/neosync/sync/config.go b/cli/internal/cmds/neosync/sync/config.go index 6023e2f67e..1639946a52 100644 --- a/cli/internal/cmds/neosync/sync/config.go +++ b/cli/internal/cmds/neosync/sync/config.go @@ -230,7 +230,7 @@ func isConfigValid(cmd *cmdConfig, logger *slog.Logger, sourceConnection *mgmtv1 return fmt.Errorf("truncate cascade is only supported in postgres") } - if cmd.Destination.OnConflict.DoNothing && cmd.Destination.OnConflict.DoUpdate != nil && cmd.Destination.OnConflict.DoUpdate.Enabled { + if cmd.Destination != nil && cmd.Destination.OnConflict.DoNothing && cmd.Destination.OnConflict.DoUpdate != nil && cmd.Destination.OnConflict.DoUpdate.Enabled { return errors.New("on-conflict-do-nothing and on-conflict-do-update cannot be used together") } diff --git a/internal/connectrpc/interceptors/retry/interceptor.go b/internal/connectrpc/interceptors/retry/interceptor.go index c7abd6d8d1..a527957c4f 100644 --- a/internal/connectrpc/interceptors/retry/interceptor.go +++ b/internal/connectrpc/interceptors/retry/interceptor.go @@ -18,35 +18,41 @@ type Interceptor struct { var _ connect.Interceptor = &Interceptor{} type config struct { - retryOptions []backoff.RetryOption + getRetryOptions func() []backoff.RetryOption } type Option func(*config) func DefaultRetryInterceptor(logger *slog.Logger) *Interceptor { return New( - WithRetryOptions( - backoff.WithBackOff(backoff.NewExponentialBackOff()), - backoff.WithMaxTries(10), - backoff.WithMaxElapsedTime(1*time.Minute), - backoff.WithNotify(func(err error, d time.Duration) { - logger.Warn(fmt.Sprintf("error with retry: %s, retrying in %s", err.Error(), d.String())) - }), - ), + WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithBackOff(backoff.NewExponentialBackOff()), + backoff.WithMaxTries(10), + backoff.WithMaxElapsedTime(1 * time.Minute), + backoff.WithNotify(func(err error, d time.Duration) { + logger.Warn(fmt.Sprintf("error with retry: %s, retrying in %s", err.Error(), d.String())) + }), + } + }), ) } +func noRetryOptions() []backoff.RetryOption { + return []backoff.RetryOption{} +} + func New(opts ...Option) *Interceptor { - cfg := &config{} + cfg := &config{getRetryOptions: noRetryOptions} for _, opt := range opts { opt(cfg) } return &Interceptor{config: cfg} } -func WithRetryOptions(opts ...backoff.RetryOption) Option { +func WithRetryOptions(getRetryOptions func() []backoff.RetryOption) Option { return func(cfg *config) { - cfg.retryOptions = opts + cfg.getRetryOptions = getRetryOptions } } @@ -60,7 +66,8 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return response, nil } - res, err := backoff.Retry(ctx, operation, i.config.retryOptions...) + opts := i.config.getRetryOptions() + res, err := backoff.Retry(ctx, operation, opts...) if err != nil { return nil, unwrapPermanentError(err) } @@ -111,7 +118,8 @@ func (r *retryStreamingClientConn) Send(msg any) error { return nil, nil } - _, err := backoff.Retry(r.ctx, operation, r.config.retryOptions...) + opts := r.config.getRetryOptions() + _, err := backoff.Retry(r.ctx, operation, opts...) return unwrapPermanentError(err) } @@ -125,7 +133,8 @@ func (r *retryStreamingClientConn) Receive(msg any) error { return nil, nil } - _, err := backoff.Retry(r.ctx, operation, r.config.retryOptions...) + opts := r.config.getRetryOptions() + _, err := backoff.Retry(r.ctx, operation, opts...) return unwrapPermanentError(err) } @@ -139,7 +148,8 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co return nil, nil } - _, err := backoff.Retry(ctx, operation, i.config.retryOptions...) + opts := i.config.getRetryOptions() + _, err := backoff.Retry(ctx, operation, opts...) return unwrapPermanentError(err) } } diff --git a/internal/connectrpc/interceptors/retry/interceptor_test.go b/internal/connectrpc/interceptors/retry/interceptor_test.go index 0f6391444c..5d444464f3 100644 --- a/internal/connectrpc/interceptors/retry/interceptor_test.go +++ b/internal/connectrpc/interceptors/retry/interceptor_test.go @@ -28,10 +28,12 @@ func TestInterceptor_WrapUnary(t *testing.T) { mock := &mockUnaryFunc{ err: connect.NewError(connect.CodeUnavailable, errors.New("service unavailable")), } - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(2), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(2), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) wrapped := interceptor.WrapUnary(mock.Call) _, err := wrapped(context.Background(), &connect.Request[any]{}) @@ -46,10 +48,12 @@ func TestInterceptor_WrapUnary(t *testing.T) { mock := &mockUnaryFunc{ err: connect.NewError(connect.CodeInvalidArgument, errors.New("invalid argument")), } - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(2), - backoff.WithMaxElapsedTime(10*time.Millisecond), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(2), + backoff.WithMaxElapsedTime(10 * time.Millisecond), + } + })) wrapped := interceptor.WrapUnary(mock.Call) _, err := wrapped(context.Background(), &connect.Request[any]{}) @@ -64,10 +68,12 @@ func TestInterceptor_WrapUnary(t *testing.T) { mock := &mockUnaryFunc{ err: nil, } - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(2), - backoff.WithMaxElapsedTime(10*time.Millisecond), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(2), + backoff.WithMaxElapsedTime(10 * time.Millisecond), + } + })) wrapped := interceptor.WrapUnary(mock.Call) _, err := wrapped(context.Background(), &connect.Request[any]{}) @@ -109,10 +115,12 @@ func TestInterceptor_WrapStreamingClient(t *testing.T) { receiveErr: connect.NewError(connect.CodeUnavailable, errors.New("service unavailable")), } - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(2), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(2), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) wrapped := interceptor.WrapStreamingClient(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { return mock }) @@ -131,10 +139,12 @@ func TestInterceptor_WrapStreamingClient(t *testing.T) { receiveErr: connect.NewError(connect.CodeInvalidArgument, errors.New("invalid argument")), } - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(2), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(2), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) wrapped := interceptor.WrapStreamingClient(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { return mock }) @@ -152,10 +162,12 @@ func TestInterceptor_WrapStreamingClient(t *testing.T) { receiveErr: nil, } - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(2), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(2), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) wrapped := interceptor.WrapStreamingClient(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { return mock }) @@ -174,10 +186,12 @@ func TestInterceptor_WrapStreamingHandler(t *testing.T) { callCount := 0 handlerErr := connect.NewError(connect.CodeUnavailable, errors.New("unavailable")) - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(3), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(3), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) handler := interceptor.WrapStreamingHandler(func(ctx context.Context, conn connect.StreamingHandlerConn) error { callCount++ @@ -196,10 +210,12 @@ func TestInterceptor_WrapStreamingHandler(t *testing.T) { callCount := 0 handlerErr := connect.NewError(connect.CodeInvalidArgument, errors.New("invalid argument")) - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(3), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(3), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) handler := interceptor.WrapStreamingHandler(func(ctx context.Context, conn connect.StreamingHandlerConn) error { callCount++ @@ -215,10 +231,12 @@ func TestInterceptor_WrapStreamingHandler(t *testing.T) { t.Run("should not retry on success", func(t *testing.T) { callCount := 0 - interceptor := New(WithRetryOptions( - backoff.WithMaxTries(3), - backoff.WithMaxElapsedTime(30*time.Second), - )) + interceptor := New(WithRetryOptions(func() []backoff.RetryOption { + return []backoff.RetryOption{ + backoff.WithMaxTries(3), + backoff.WithMaxElapsedTime(30 * time.Second), + } + })) handler := interceptor.WrapStreamingHandler(func(ctx context.Context, conn connect.StreamingHandlerConn) error { callCount++ diff --git a/internal/sshtunnel/dialer_integration_test.go b/internal/sshtunnel/dialer_integration_test.go index f9d1ef8b7f..71ec7659ff 100644 --- a/internal/sshtunnel/dialer_integration_test.go +++ b/internal/sshtunnel/dialer_integration_test.go @@ -47,8 +47,6 @@ func TestDatabaseConnections(t *testing.T) { } dialerConfig := sshtunnel.DefaultSSHDialerConfig() dialerConfig.KeepAliveInterval = 1 * time.Second - dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) - t.Cleanup(func() { dialer.Close() }) t.Run("postgres", func(t *testing.T) { t.Parallel() @@ -69,6 +67,9 @@ func TestDatabaseConnections(t *testing.T) { t.Run("with_dialer", func(t *testing.T) { t.Parallel() + dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) + t.Cleanup(func() { dialer.Close() }) + connector, cleanup, err := postgrestunconnector.New( container.URL, postgrestunconnector.WithDialer(dialer), @@ -102,6 +103,8 @@ func TestDatabaseConnections(t *testing.T) { t.Run("with_dialer", func(t *testing.T) { t.Parallel() + dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) + t.Cleanup(func() { dialer.Close() }) connector, cleanup, err := postgrestunconnector.New( container.URL, postgrestunconnector.WithDialer(dialer), @@ -134,6 +137,8 @@ func TestDatabaseConnections(t *testing.T) { t.Run("with_dialer", func(t *testing.T) { t.Parallel() + dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) + t.Cleanup(func() { dialer.Close() }) connector, cleanup, err := mysqltunconnector.New( container.URL, mysqltunconnector.WithDialer(dialer), @@ -167,6 +172,8 @@ func TestDatabaseConnections(t *testing.T) { t.Run("with_dialer", func(t *testing.T) { t.Parallel() + dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) + t.Cleanup(func() { dialer.Close() }) connector, cleanup, err := mysqltunconnector.New( container.URL, mysqltunconnector.WithDialer(dialer), @@ -199,6 +206,8 @@ func TestDatabaseConnections(t *testing.T) { t.Run("with_dialer", func(t *testing.T) { t.Parallel() + dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) + t.Cleanup(func() { dialer.Close() }) connector, cleanup, err := mssqltunconnector.New( container.URL, mssqltunconnector.WithDialer(dialer), @@ -232,6 +241,8 @@ func TestDatabaseConnections(t *testing.T) { t.Run("with_dialer", func(t *testing.T) { t.Parallel() + dialer := sshtunnel.NewLazySSHDialer(addr, cconfig, dialerConfig, testutil.GetConcurrentTestLogger(t)) + t.Cleanup(func() { dialer.Close() }) connector, cleanup, err := mssqltunconnector.New( container.URL, mssqltunconnector.WithDialer(dialer), diff --git a/internal/testutil/testcontainers/sqlserver/sqlserver.go b/internal/testutil/testcontainers/sqlserver/sqlserver.go index d88d36019a..7ba4c11dbf 100644 --- a/internal/testutil/testcontainers/sqlserver/sqlserver.go +++ b/internal/testutil/testcontainers/sqlserver/sqlserver.go @@ -30,7 +30,7 @@ func NewMssqlTestSyncContainer(ctx context.Context, sourceOpts, destOpts []Optio errgrp.Go(func() error { m, err := NewMssqlTestContainer(ctx, sourceOpts...) if err != nil { - return err + return fmt.Errorf("unable to create source mssql test container: %w", err) } tc.Source = m return nil @@ -39,7 +39,7 @@ func NewMssqlTestSyncContainer(ctx context.Context, sourceOpts, destOpts []Optio errgrp.Go(func() error { m, err := NewMssqlTestContainer(ctx, destOpts...) if err != nil { - return err + return fmt.Errorf("unable to create dest mssql test container: %w", err) } tc.Target = m return nil diff --git a/worker/pkg/integration-test/dynamodb_test.go b/worker/pkg/integration-test/dynamodb_test.go index c7f6dd0a87..81e909c316 100644 --- a/worker/pkg/integration-test/dynamodb_test.go +++ b/worker/pkg/integration-test/dynamodb_test.go @@ -119,7 +119,7 @@ func test_dynamodb_alltypes( AccountId: accountId, SourceConn: sourceConn, DestConn: destConn, - JobName: "all_types", + JobName: "dynamodb_all_types", JobMappings: mappings, }, tableName, tableName)