diff --git a/README.md b/README.md index 63f11e3..1cb2d1d 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ This setup ensures continuous data synchronization and minimal downtime in captu | `publication.tables` | []Table | yes | - | Set tables which are tracked by data change capture | Define multiple tables as needed. | | `publication.tables[i].name` | string | yes | - | Set the data change captured table name | Must be a valid table name in the specified database. | | `publication.tables[i].replicaIdentity` | string | yes | - | Set the data change captured table replica identity [`FULL`, `DEFAULT`] | **FULL:** Captures all columns when a row is updated or deleted.
**DEFAULT:** Captures only the primary key when a row is updated or deleted. | +| `publication.tables[i].schema` | string | no | public | Set the data change captured table schema name | Must be a valid table name in the specified database. | | `slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | | | `slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. | | `slot.slotActivityCheckerInterval` | int | no | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). | diff --git a/config/config.go b/config/config.go index 73f2928..0f3513b 100644 --- a/config/config.go +++ b/config/config.go @@ -33,7 +33,7 @@ type LoggerConfig struct { LogLevel slog.Level `json:"level" yaml:"level"` // if custom logger is nil, set the slog log level } -func (c Config) DSN() string { +func (c *Config) DSN() string { return fmt.Sprintf("postgres://%s:%s@%s/%s?replication=database", c.Username, c.Password, c.Host, c.Database) } @@ -53,6 +53,13 @@ func (c *Config) SetDefault() { if c.Logger.Logger == nil { c.Logger.Logger = logger.NewSlog(c.Logger.LogLevel) } + + // Set default schema names for tables + for tableID, table := range c.Publication.Tables { + if table.Schema == "" { + c.Publication.Tables[tableID].Schema = "public" + } + } } func (c *Config) Validate() error { diff --git a/example/postgresql/main.go b/example/postgresql/main.go index 8bcdff5..1fb822b 100644 --- a/example/postgresql/main.go +++ b/example/postgresql/main.go @@ -79,6 +79,7 @@ func main() { Tables: publication.Tables{publication.Table{ Name: "users", ReplicaIdentity: publication.ReplicaIdentityDefault, + Schema: "public", }}, }, Slot: slot.Config{ diff --git a/example/simple-file-config/config.yml b/example/simple-file-config/config.yml index 155e387..5afd9bd 100644 --- a/example/simple-file-config/config.yml +++ b/example/simple-file-config/config.yml @@ -12,6 +12,7 @@ publication: tables: - name: users replicaIdentity: FULL + schema: public slot: createIfNotExists: true name: cdc_slot @@ -19,4 +20,5 @@ slot: metric: port: 8083 logger: - logLevel: INFO \ No newline at end of file + level: DEBUG +debugMode: true \ No newline at end of file diff --git a/example/simple/main.go b/example/simple/main.go index 5a175d3..977ca53 100644 --- a/example/simple/main.go +++ b/example/simple/main.go @@ -47,6 +47,7 @@ func main() { Tables: publication.Tables{publication.Table{ Name: "users", ReplicaIdentity: publication.ReplicaIdentityFull, + Schema: "public", }}, }, Slot: slot.Config{ diff --git a/integration_test/main_test.go b/integration_test/main_test.go index fa6c133..34b7eb9 100644 --- a/integration_test/main_test.go +++ b/integration_test/main_test.go @@ -116,7 +116,14 @@ func containerRequest(cfg config.Config) (testcontainers.GenericContainerRequest } func newPostgresConn() (pq.Connection, error) { - return pq.NewConnection(context.TODO(), config.Config{Host: Config.Host, Username: "postgres", Password: "postgres", Database: Config.Database}.DSN()) + c := config.Config{ + Host: Config.Host, + Username: "postgres", + Password: "postgres", + Database: Config.Database, + } + + return pq.NewConnection(context.TODO(), c.DSN()) } func SetupTestDB(ctx context.Context, conn pq.Connection, cfg config.Config) error { diff --git a/pq/publication/replica_identity.go b/pq/publication/replica_identity.go index 91cfa99..15b7346 100644 --- a/pq/publication/replica_identity.go +++ b/pq/publication/replica_identity.go @@ -17,7 +17,7 @@ const ( ) var ( - ErrorTablesNotExists = goerrors.New("table is not exists") + ErrorTablesNotExists = goerrors.New("table does not exists") ReplicaIdentityOptions = []string{ReplicaIdentityDefault, ReplicaIdentityFull} ReplicaIdentityMap = map[string]string{ "d": ReplicaIdentityDefault, // primary key on old value @@ -66,10 +66,17 @@ func (c *Publication) GetReplicaIdentities(ctx context.Context) ([]Table, error) tableNames := make([]string, len(c.cfg.Tables)) for i, t := range c.cfg.Tables { - tableNames[i] = "'" + t.Name + "'" + if t.Schema == "" && !strings.Contains(t.Name, ".") { + tableNames[i] = "'" + t.Name + "'" + } else { + tableNames[i] = "'" + t.Schema + "." + t.Name + "'" + } + } - query := "SELECT relname AS table_name, relreplident AS replica_identity FROM pg_class WHERE relname IN (" + strings.Join(tableNames, ", ") + ")" + query := fmt.Sprintf("SELECT relname AS table_name, n.nspname AS schema_name, relreplident AS replica_identity FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE concat(n.nspname, '.', c.relname) IN (%s)", strings.Join(tableNames, ", ")) + + logger.Debug("executing query: ", query) resultReader := c.conn.Exec(ctx, query) results, err := resultReader.ReadAll() diff --git a/pq/publication/table.go b/pq/publication/table.go index da9c28b..32b21d9 100644 --- a/pq/publication/table.go +++ b/pq/publication/table.go @@ -10,6 +10,7 @@ import ( type Table struct { Name string `json:"name" yaml:"name"` ReplicaIdentity string `json:"replicaIdentity" yaml:"replicaIdentity"` + Schema string `json:"schema,omitempty" yaml:"schema,omitempty"` } func (tc Table) Validate() error {