Skip to content

Commit

Permalink
feat: add configurable schema name for tables
Browse files Browse the repository at this point in the history
* Upgraded dependencies versions, Go version, added support for schema in tables definitions

* Updated documentation

* Reverted Go and dependencies versions, used fmt.Sprintf to concatenate strings, added default "public" value for table schema name in code and documentation

* Reverted Go and dependencies versions, used fmt.Sprintf to concatenate strings, added default "public" value for table schema name in code and documentation

* Fixed setting default schema name for tables.

* Fixed TableID variable naming

* An attempt to fix error: cannot call pointer method DSN on "github.com/Trendyol/go-pq-cdc/config".Config
  • Loading branch information
lospejos authored Jan 29, 2025
1 parent a1b4ab1 commit 363437e
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ This setup ensures continuous data synchronization and minimal downtime in captu
| `publication.tables` | []Table | yes | - | Set tables which are tracked by data change capture | Define multiple tables as needed. |
| `publication.tables[i].name` | string | yes | - | Set the data change captured table name | Must be a valid table name in the specified database. |
| `publication.tables[i].replicaIdentity` | string | yes | - | Set the data change captured table replica identity [`FULL`, `DEFAULT`] | **FULL:** Captures all columns when a row is updated or deleted. <br> **DEFAULT:** Captures only the primary key when a row is updated or deleted. |
| `publication.tables[i].schema` | string | no | public | Set the data change captured table schema name | Must be a valid table name in the specified database. |
| `slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | |
| `slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
| `slot.slotActivityCheckerInterval` | int | no | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
Expand Down
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type LoggerConfig struct {
LogLevel slog.Level `json:"level" yaml:"level"` // if custom logger is nil, set the slog log level
}

func (c Config) DSN() string {
func (c *Config) DSN() string {
return fmt.Sprintf("postgres://%s:%s@%s/%s?replication=database", c.Username, c.Password, c.Host, c.Database)
}

Expand All @@ -53,6 +53,13 @@ func (c *Config) SetDefault() {
if c.Logger.Logger == nil {
c.Logger.Logger = logger.NewSlog(c.Logger.LogLevel)
}

// Set default schema names for tables
for tableID, table := range c.Publication.Tables {
if table.Schema == "" {
c.Publication.Tables[tableID].Schema = "public"
}
}
}

func (c *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions example/postgresql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {
Tables: publication.Tables{publication.Table{
Name: "users",
ReplicaIdentity: publication.ReplicaIdentityDefault,
Schema: "public",
}},
},
Slot: slot.Config{
Expand Down
4 changes: 3 additions & 1 deletion example/simple-file-config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ publication:
tables:
- name: users
replicaIdentity: FULL
schema: public
slot:
createIfNotExists: true
name: cdc_slot
slotActivityCheckerInterval: 2000
metric:
port: 8083
logger:
logLevel: INFO
level: DEBUG
debugMode: true
1 change: 1 addition & 0 deletions example/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func main() {
Tables: publication.Tables{publication.Table{
Name: "users",
ReplicaIdentity: publication.ReplicaIdentityFull,
Schema: "public",
}},
},
Slot: slot.Config{
Expand Down
9 changes: 8 additions & 1 deletion integration_test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ func containerRequest(cfg config.Config) (testcontainers.GenericContainerRequest
}

func newPostgresConn() (pq.Connection, error) {
return pq.NewConnection(context.TODO(), config.Config{Host: Config.Host, Username: "postgres", Password: "postgres", Database: Config.Database}.DSN())
c := config.Config{
Host: Config.Host,
Username: "postgres",
Password: "postgres",
Database: Config.Database,
}

return pq.NewConnection(context.TODO(), c.DSN())
}

func SetupTestDB(ctx context.Context, conn pq.Connection, cfg config.Config) error {
Expand Down
13 changes: 10 additions & 3 deletions pq/publication/replica_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
)

var (
ErrorTablesNotExists = goerrors.New("table is not exists")
ErrorTablesNotExists = goerrors.New("table does not exists")
ReplicaIdentityOptions = []string{ReplicaIdentityDefault, ReplicaIdentityFull}
ReplicaIdentityMap = map[string]string{
"d": ReplicaIdentityDefault, // primary key on old value
Expand Down Expand Up @@ -66,10 +66,17 @@ func (c *Publication) GetReplicaIdentities(ctx context.Context) ([]Table, error)
tableNames := make([]string, len(c.cfg.Tables))

for i, t := range c.cfg.Tables {
tableNames[i] = "'" + t.Name + "'"
if t.Schema == "" && !strings.Contains(t.Name, ".") {
tableNames[i] = "'" + t.Name + "'"
} else {
tableNames[i] = "'" + t.Schema + "." + t.Name + "'"
}

}

query := "SELECT relname AS table_name, relreplident AS replica_identity FROM pg_class WHERE relname IN (" + strings.Join(tableNames, ", ") + ")"
query := fmt.Sprintf("SELECT relname AS table_name, n.nspname AS schema_name, relreplident AS replica_identity FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE concat(n.nspname, '.', c.relname) IN (%s)", strings.Join(tableNames, ", "))

logger.Debug("executing query: ", query)

resultReader := c.conn.Exec(ctx, query)
results, err := resultReader.ReadAll()
Expand Down
1 change: 1 addition & 0 deletions pq/publication/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Table struct {
Name string `json:"name" yaml:"name"`
ReplicaIdentity string `json:"replicaIdentity" yaml:"replicaIdentity"`
Schema string `json:"schema,omitempty" yaml:"schema,omitempty"`
}

func (tc Table) Validate() error {
Expand Down

0 comments on commit 363437e

Please sign in to comment.