Commit b7d79be

Fixes PG Data Type Arrays on schema init, re-enables integration tests in CI (#3202)

Author: nickzelei
Date: Jan 30, 2025
Parent: b97eaab

Showing 13 changed files with 176 additions and 93 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/go.yml
@@ -131,7 +131,7 @@ jobs:

- name: Run Integration Tests
run: |
go test -race -timeout 1800s -coverprofile=integration-coverage.out -covermode=atomic -run TestIntegrationTestSuite ./...
go test -race -timeout 1800s -coverprofile=integration-coverage.out -covermode=atomic ./...
env:
INTEGRATION_TESTS_ENABLED: 1
S3_INTEGRATION_TESTS_ENABLED: 1
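
Dropping the explicit -run TestIntegrationTestSuite filter means the integration job now runs every test in the module, with integration-only suites gated by the environment variables above rather than by a test-name pattern. A minimal sketch of that gating pattern, assuming the suites skip themselves when INTEGRATION_TESTS_ENABLED is unset (the suite and package names here are placeholders, not code from this PR):

    package example_test

    import (
        "os"
        "testing"

        "github.com/stretchr/testify/suite"
    )

    // IntegrationTestSuite stands in for the real suites touched by this PR.
    type IntegrationTestSuite struct {
        suite.Suite
    }

    func (s *IntegrationTestSuite) TestPlaceholder() {
        s.True(true)
    }

    func TestIntegrationTestSuite(t *testing.T) {
        // Hypothetical guard mirroring the env var exported by the CI step above.
        if os.Getenv("INTEGRATION_TESTS_ENABLED") != "1" {
            t.Skip("integration tests disabled; set INTEGRATION_TESTS_ENABLED=1")
        }
        suite.Run(t, new(IntegrationTestSuite))
    }
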
@@ -206,7 +206,7 @@ jobs:
with:
go-version-file: go.mod
cache-dependency-path: go.sum

- name: Install benchstat
run: go install golang.org/x/perf/cmd/benchstat@latest

@@ -217,7 +217,7 @@ jobs:
path: main-benchmark.txt
key: benchmarks-main
restore-keys: benchmarks-main

- name: Run benchmarks on main if cache miss
if: steps.cache-restore.outputs.cache-hit != 'true'
run: |
@@ -263,7 +263,7 @@ jobs:
echo -e "\n<details>\n<summary>Benchstat results</summary>\n\n\`\`\`\n$(cat benchstat-output.txt)\n\`\`\`\n</details>\n" >> benchstat-results.txt
- name: Comment Benchstat Results on PR
if: github.event_name == 'pull_request'
if: github.event_name == 'pull_request'
uses: thollander/actions-comment-pull-request@v3
with:
file-path: benchstat-results.txt
@@ -277,4 +277,3 @@ jobs:
with:
path: branch-benchmark.txt
key: benchmarks-main

7 changes: 6 additions & 1 deletion backend/gen/go/db/dbschemas/postgresql/system.sql.go

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion backend/pkg/dbschemas/sql/postgresql/queries/system.sql
@@ -442,13 +442,18 @@ WITH custom_types AS (
table_columns AS (
SELECT
c.oid AS table_oid,
a.atttypid AS type_oid
CASE
WHEN t.typtype = 'b' THEN t.typelem -- If it's an array, use the element type
ELSE a.atttypid -- Otherwise use the type directly
END AS type_oid
FROM
pg_catalog.pg_class c
JOIN
pg_catalog.pg_namespace n ON n.oid = c.relnamespace
JOIN
pg_catalog.pg_attribute a ON a.attrelid = c.oid
JOIN
pg_catalog.pg_type t ON t.oid = a.atttypid
WHERE
n.nspname = sqlc.arg('schema')
AND c.relname = ANY(sqlc.arg('tables')::TEXT[])
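
The CASE above is the substance of the fix: when a column's type is an array of a custom type, the query now resolves to the element type's OID, so the custom type is still detected when schema-init statements are generated; previously an array column never matched its custom element type. A standalone sketch of the same pg_catalog lookup, assuming a database/sql connection (the driver, DSN, and the typcategory = 'A' array check are this sketch's choices; the committed query keys off typtype and typelem instead):

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/lib/pq" // driver choice is an assumption for this sketch
    )

    func main() {
        // Placeholder DSN.
        db, err := sql.Open("postgres", "postgres://user:pass@localhost:5432/db?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // For an array column (typcategory = 'A'), pg_type.typelem holds the element
        // type's OID; for scalar columns the attribute's own type OID is kept.
        rows, err := db.Query(`
            SELECT a.attname,
                   (CASE WHEN t.typcategory = 'A' THEN t.typelem ELSE a.atttypid END)::regtype::text AS resolved_type
            FROM pg_catalog.pg_attribute a
            JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
            WHERE a.attrelid = 'custom_table'::regclass -- schema qualification depends on search_path
              AND a.attnum > 0
              AND NOT a.attisdropped`)
        if err != nil {
            log.Fatal(err)
        }
        defer rows.Close()
        for rows.Next() {
            var column, resolvedType string
            if err := rows.Scan(&column, &resolvedType); err != nil {
                log.Fatal(err)
            }
            fmt.Printf("%s -> %s\n", column, resolvedType)
        }
        if err := rows.Err(); err != nil {
            log.Fatal(err)
        }
    }
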
10 changes: 7 additions & 3 deletions backend/pkg/sqlmanager/mssql/mssql-manager_integration_test.go
@@ -52,6 +52,8 @@ func Test_MssqlManager(t *testing.T) {
t.Run("GetDatabaseSchema", func(t *testing.T) {
t.Parallel()
var expectedIdentityGeneration = "IDENTITY(1,1)"
var expectedIdentitySeed = 1
var expectedIdentityIncrement = 1
expectedSubset := []*sqlmanager_shared.DatabaseSchemaRow{
{
TableSchema: "sqlmanagermssql3",
@@ -66,6 +68,9 @@ func Test_MssqlManager(t *testing.T) {
OrdinalPosition: 1,
GeneratedType: nil,
IdentityGeneration: &expectedIdentityGeneration,
ColumnDefaultType: nil,
IdentitySeed: &expectedIdentitySeed,
IdentityIncrement: &expectedIdentityIncrement,
},
}

@@ -347,9 +352,8 @@ func Test_MssqlManager(t *testing.T) {
// }

func containsSubset[T any](t testing.TB, array, subset []T) {
t.Helper()
for _, elem := range subset {
require.Contains(t, array, elem)
for idx, elem := range subset {
require.Contains(t, array, elem, "array does not contain expected subset element at index %d", idx)
}
}

@@ -379,9 +379,14 @@ func Test_PostgresManager(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, statements)

allStmts := []string{}
for _, block := range statements {
allStmts = append(allStmts, block.Statements...)
}
require.NotEmpty(t, allStmts)

for _, block := range statements {
t.Logf("executing %d statements for label %q", len(block.Statements), block.Label)
require.NotEmpty(t, block.Statements)
for _, stmt := range block.Statements {
_, err = target.DB.Exec(ctx, stmt)
require.NoError(t, err, "failed to execute %s statement %q", block.Label, stmt)
22 changes: 20 additions & 2 deletions backend/pkg/sqlmanager/postgres/testdata/setup.sql
@@ -83,6 +83,12 @@ CREATE TYPE custom_type AS (
part1 INTEGER,
part2 TEXT
);

CREATE TYPE custom_type2 AS (
part1 INTEGER,
part2 TEXT
);

CREATE FUNCTION custom_function() RETURNS TRIGGER AS $$
BEGIN
NEW.id := nextval('custom_seq');
@@ -97,11 +103,23 @@ CREATE TYPE custom_enum AS ENUM (
CREATE DOMAIN custom_domain AS TEXT
CHECK (VALUE ~ '^[a-zA-Z]+$'); -- Only allows alphabetic characters

CREATE DOMAIN custom_domain2 AS TEXT
CHECK (VALUE ~ '^[a-zA-Z]+$'); -- Only allows alphabetic characters

CREATE TYPE custom_enum2 AS ENUM (
'value4',
'value5',
'value6'
);

CREATE TABLE custom_table (
id INTEGER NOT NULL DEFAULT nextval('custom_seq'),
name custom_domain NOT NULL,
names custom_domain2[] NOT NULL, -- testing custom domain as array
data custom_type,
datas custom_type2[] NOT NULL, -- testing custom composite type as array
status custom_enum NOT NULL,
statuses custom_enum2[] NOT NULL, -- testing custom data type as array
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW()
);
-- Adding a trigger to use the custom function for setting the 'id' field
@@ -140,7 +158,7 @@ CREATE TABLE IF NOT EXISTS "CaPiTaL"."BadName" (
);

INSERT INTO "CaPiTaL"."BadName" ("NAME")
VALUES
VALUES
('Xk7pQ9nM3v'),
('Rt5wLjH2yB'),
('Zc8fAe4dN6'),
@@ -154,7 +172,7 @@ CREATE TABLE "CaPiTaL"."Bad Name 123!@#" (


INSERT INTO "CaPiTaL"."Bad Name 123!@#" ("NAME")
VALUES
VALUES
('Xk7pQ9nM3v'),
('Rt5wLjH2yB'),
('Zc8fAe4dN6'),
52 changes: 30 additions & 22 deletions backend/pkg/sqlretry/dbtx_retry.go
@@ -24,61 +24,68 @@ type RetryDBTX struct {
var _ sqldbtx.DBTX = (*RetryDBTX)(nil)

type config struct {
retryOpts []backoff.RetryOption
getRetryOpts func() []backoff.RetryOption
}

type Option func(*config)

func NewDefault(dbtx sqldbtx.DBTX, logger *slog.Logger) *RetryDBTX {
backoffStrategy := backoff.NewExponentialBackOff()
backoffStrategy.InitialInterval = 200 * time.Millisecond
backoffStrategy.MaxInterval = 30 * time.Second
backoffStrategy.Multiplier = 2
backoffStrategy.RandomizationFactor = 0.3

return New(dbtx, WithRetryOptions(
backoff.WithBackOff(backoffStrategy),
backoff.WithMaxTries(25),
backoff.WithMaxElapsedTime(5*time.Minute),
backoff.WithNotify(func(err error, d time.Duration) {
logger.Warn(fmt.Sprintf("sql error with retry: %s, retrying in %s", err.Error(), d.String()))
}),
func() []backoff.RetryOption {
backoffStrategy := backoff.NewExponentialBackOff()
backoffStrategy.InitialInterval = 200 * time.Millisecond
backoffStrategy.MaxInterval = 30 * time.Second
backoffStrategy.Multiplier = 2
backoffStrategy.RandomizationFactor = 0.3
return []backoff.RetryOption{
backoff.WithBackOff(backoffStrategy),
backoff.WithMaxTries(25),
backoff.WithMaxElapsedTime(5 * time.Minute),
backoff.WithNotify(func(err error, d time.Duration) {
logger.Warn(fmt.Sprintf("sql error with retry: %s, retrying in %s", err.Error(), d.String()))
}),
}
},
))
}

func noRetryOptions() []backoff.RetryOption {
return []backoff.RetryOption{}
}

func New(dbtx sqldbtx.DBTX, opts ...Option) *RetryDBTX {
cfg := &config{}
cfg := &config{getRetryOpts: noRetryOptions}
for _, opt := range opts {
opt(cfg)
}
return &RetryDBTX{dbtx: dbtx, config: cfg}
}

func WithRetryOptions(opts ...backoff.RetryOption) Option {
func WithRetryOptions(getRetryOpts func() []backoff.RetryOption) Option {
return func(cfg *config) {
cfg.retryOpts = opts
cfg.getRetryOpts = getRetryOpts
}
}

func (r *RetryDBTX) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
operation := func() (sql.Result, error) {
return r.dbtx.ExecContext(ctx, query, args...)
}
return retry(ctx, operation, r.config.retryOpts...)
return retry(ctx, operation, r.config.getRetryOpts)
}

func (r *RetryDBTX) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
operation := func() (*sql.Stmt, error) {
return r.dbtx.PrepareContext(ctx, query)
}
return retry(ctx, operation, r.config.retryOpts...)
return retry(ctx, operation, r.config.getRetryOpts)
}

func (r *RetryDBTX) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
operation := func() (*sql.Rows, error) {
return r.dbtx.QueryContext(ctx, query, args...)
}
return retry(ctx, operation, r.config.retryOpts...)
return retry(ctx, operation, r.config.getRetryOpts)
}

func (r *RetryDBTX) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
Expand All @@ -89,18 +96,19 @@ func (r *RetryDBTX) PingContext(ctx context.Context) error {
operation := func() (any, error) {
return nil, r.dbtx.PingContext(ctx)
}
_, err := retry(ctx, operation, r.config.retryOpts...)
_, err := retry(ctx, operation, r.config.getRetryOpts)
return err
}

func (r *RetryDBTX) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
operation := func() (*sql.Tx, error) {
return r.dbtx.BeginTx(ctx, opts)
}
return retry(ctx, operation, r.config.retryOpts...)
return retry(ctx, operation, r.config.getRetryOpts)
}

func retry[T any](ctx context.Context, fn func() (T, error), opts ...backoff.RetryOption) (T, error) {
func retry[T any](ctx context.Context, fn func() (T, error), getOpts func() []backoff.RetryOption) (T, error) {
opts := getOpts()
return retryUnwrap(backoff.Retry(ctx, retryWrap(fn), opts...))
}
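
The retry options now come from a getRetryOpts factory instead of a fixed slice, so every ExecContext/QueryContext/BeginTx call builds its own ExponentialBackOff. That matters because the backoff strategy is stateful (its current interval grows as it is consulted), so a single shared instance would carry inflated delays from one operation into unrelated later ones. A hedged usage sketch against the new option signature (the module import path, the interval values, and the assumption that *sql.DB satisfies sqldbtx.DBTX are this sketch's, not the PR's):

    package example

    import (
        "database/sql"
        "time"

        "github.com/cenkalti/backoff/v5"
        // Import path inferred from this PR's file layout; the module name is an assumption.
        "github.com/nucleuscloud/neosync/backend/pkg/sqlretry"
    )

    // newRetryingDB wraps a *sql.DB (assumed to satisfy sqldbtx.DBTX, since the
    // wrapper above forwards exactly the database/sql context methods) with
    // per-operation retry options.
    func newRetryingDB(db *sql.DB) *sqlretry.RetryDBTX {
        return sqlretry.New(db, sqlretry.WithRetryOptions(func() []backoff.RetryOption {
            // Build a fresh, stateful strategy for every operation.
            strategy := backoff.NewExponentialBackOff()
            strategy.InitialInterval = 100 * time.Millisecond
            strategy.MaxInterval = 5 * time.Second
            return []backoff.RetryOption{
                backoff.WithBackOff(strategy),
                backoff.WithMaxTries(5),
                backoff.WithMaxElapsedTime(30 * time.Second),
            }
        }))
    }
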

2 changes: 1 addition & 1 deletion cli/internal/cmds/neosync/sync/config.go
@@ -230,7 +230,7 @@ func isConfigValid(cmd *cmdConfig, logger *slog.Logger, sourceConnection *mgmtv1
return fmt.Errorf("truncate cascade is only supported in postgres")
}

if cmd.Destination.OnConflict.DoNothing && cmd.Destination.OnConflict.DoUpdate != nil && cmd.Destination.OnConflict.DoUpdate.Enabled {
if cmd.Destination != nil && cmd.Destination.OnConflict.DoNothing && cmd.Destination.OnConflict.DoUpdate != nil && cmd.Destination.OnConflict.DoUpdate.Enabled {
return errors.New("on-conflict-do-nothing and on-conflict-do-update cannot be used together")
}

42 changes: 26 additions & 16 deletions internal/connectrpc/interceptors/retry/interceptor.go
@@ -18,35 +18,41 @@ type Interceptor struct {
var _ connect.Interceptor = &Interceptor{}

type config struct {
retryOptions []backoff.RetryOption
getRetryOptions func() []backoff.RetryOption
}

type Option func(*config)

func DefaultRetryInterceptor(logger *slog.Logger) *Interceptor {
return New(
WithRetryOptions(
backoff.WithBackOff(backoff.NewExponentialBackOff()),
backoff.WithMaxTries(10),
backoff.WithMaxElapsedTime(1*time.Minute),
backoff.WithNotify(func(err error, d time.Duration) {
logger.Warn(fmt.Sprintf("error with retry: %s, retrying in %s", err.Error(), d.String()))
}),
),
WithRetryOptions(func() []backoff.RetryOption {
return []backoff.RetryOption{
backoff.WithBackOff(backoff.NewExponentialBackOff()),
backoff.WithMaxTries(10),
backoff.WithMaxElapsedTime(1 * time.Minute),
backoff.WithNotify(func(err error, d time.Duration) {
logger.Warn(fmt.Sprintf("error with retry: %s, retrying in %s", err.Error(), d.String()))
}),
}
}),
)
}

func noRetryOptions() []backoff.RetryOption {
return []backoff.RetryOption{}
}

func New(opts ...Option) *Interceptor {
cfg := &config{}
cfg := &config{getRetryOptions: noRetryOptions}
for _, opt := range opts {
opt(cfg)
}
return &Interceptor{config: cfg}
}

func WithRetryOptions(opts ...backoff.RetryOption) Option {
func WithRetryOptions(getRetryOptions func() []backoff.RetryOption) Option {
return func(cfg *config) {
cfg.retryOptions = opts
cfg.getRetryOptions = getRetryOptions
}
}

@@ -60,7 +66,8 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return response, nil
}

res, err := backoff.Retry(ctx, operation, i.config.retryOptions...)
opts := i.config.getRetryOptions()
res, err := backoff.Retry(ctx, operation, opts...)
if err != nil {
return nil, unwrapPermanentError(err)
}
@@ -111,7 +118,8 @@ func (r *retryStreamingClientConn) Send(msg any) error {
return nil, nil
}

_, err := backoff.Retry(r.ctx, operation, r.config.retryOptions...)
opts := r.config.getRetryOptions()
_, err := backoff.Retry(r.ctx, operation, opts...)
return unwrapPermanentError(err)
}

@@ -125,7 +133,8 @@ func (r *retryStreamingClientConn) Receive(msg any) error {
return nil, nil
}

_, err := backoff.Retry(r.ctx, operation, r.config.retryOptions...)
opts := r.config.getRetryOptions()
_, err := backoff.Retry(r.ctx, operation, opts...)
return unwrapPermanentError(err)
}

@@ -139,7 +148,8 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co
return nil, nil
}

_, err := backoff.Retry(ctx, operation, i.config.retryOptions...)
opts := i.config.getRetryOptions()
_, err := backoff.Retry(ctx, operation, opts...)
return unwrapPermanentError(err)
}
}
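
The interceptor gets the same factory treatment as the sqlretry package, for the same reason: each wrapped call should start from a fresh backoff strategy. A hedged sketch of wiring it into a connect-go client via connect.WithInterceptors (the import paths and the generated-client call in the trailing comment are assumptions):

    package example

    import (
        "log/slog"

        "connectrpc.com/connect"

        // Import path inferred from this PR's file layout; the module name is an assumption.
        "github.com/nucleuscloud/neosync/internal/connectrpc/interceptors/retry"
    )

    // clientOptions returns options that attach the retrying interceptor to any
    // connect-generated client.
    func clientOptions() []connect.ClientOption {
        return []connect.ClientOption{
            connect.WithInterceptors(retry.DefaultRetryInterceptor(slog.Default())),
        }
    }

    // Hypothetical wiring with a generated client constructor:
    //   client := somev1connect.NewSomeServiceClient(http.DefaultClient, "https://api.example.com", clientOptions()...)
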