Skip to content

Commit

Permalink
Merge pull request #91 from xataio/allow-to-provide-replication-slot-…
Browse files Browse the repository at this point in the history
…name-on-cli

Allow providing replication slot on the CLI
  • Loading branch information
eminano authored Dec 2, 2024
2 parents 7c03ccb + c30bbf6 commit 9186e3e
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 17 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ This will create the `pgstream` schema in the configured Postgres database, alon
pgstream init --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"
```

If you want to provide the name of the replication slot to be created instead of using the default value (`pgstream_<dbname>_slot`), you can use the `--replication-slot` flag or set the environment variable `PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME`.

```
pgstream init --pgurl "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test
```

If there are any issues or if you want to clean up the pgstream setup, you can run the following.

```
Expand Down Expand Up @@ -121,9 +127,10 @@ Here's a list of all the environment variables that can be used to configure the
<details>
<summary>Postgres Listener</summary>

| Environment Variable | Default | Required | Description |
| ------------------------------ | ------- | -------- | -------------------------------------------------------------------- |
| PGSTREAM_POSTGRES_LISTENER_URL | N/A | Yes | URL of the Postgres database to connect to for replication purposes. |
| Environment Variable | Default | Required | Description |
| --------------------------------------- | -------------------------- | -------- | -------------------------------------------------------------------- |
| PGSTREAM_POSTGRES_LISTENER_URL | N/A | Yes | URL of the Postgres database to connect to for replication purposes. |
| PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME | "pgstream\_<dbname>\_slot" | No | Name of the Postgres replication slot name. |

</details>

Expand Down
11 changes: 10 additions & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func pgURL() string {
return viper.GetString("PGSTREAM_POSTGRES_LISTENER_URL")
}

func replicationSlotName() string {
replicationslot := viper.GetString("replication-slot")
if replicationslot != "" {
return replicationslot
}
return viper.GetString("PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME")
}

func parseStreamConfig() *stream.Config {
return &stream.Config{
Listener: parseListenerConfig(),
Expand All @@ -65,7 +73,8 @@ func parsePostgresListenerConfig() *stream.PostgresListenerConfig {

return &stream.PostgresListenerConfig{
Replication: pgreplication.Config{
PostgresURL: pgURL,
PostgresURL: pgURL,
ReplicationSlotName: replicationSlotName(),
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/init_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var initCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
sp, _ := pterm.DefaultSpinner.WithText("initialising pgstream...").Start()

if err := stream.Init(context.Background(), pgURL()); err != nil {
if err := stream.Init(context.Background(), pgURL(), replicationSlotName()); err != nil {
sp.Fail(err.Error())
return err
}
Expand All @@ -34,7 +34,7 @@ var tearDownCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
sp, _ := pterm.DefaultSpinner.WithText("tearing down pgstream...").Start()

if err := stream.TearDown(context.Background(), pgURL()); err != nil {
if err := stream.TearDown(context.Background(), pgURL(), replicationSlotName()); err != nil {
sp.Fail(err.Error())
return err
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ func init() {
viper.AutomaticEnv()

rootCmd.PersistentFlags().String("pgurl", "postgres://postgres:postgres@localhost?sslmode=disable", "Postgres URL")
rootCmd.PersistentFlags().String("replication-slot", "", "Name of the postgres replication slot to be created")
rootCmd.PersistentFlags().StringP("config", "c", "", ".env config file to use if any")
rootCmd.PersistentFlags().String("log-level", "debug", "log level for the application")

viper.BindPFlag("pgurl", rootCmd.PersistentFlags().Lookup("pgurl"))
viper.BindPFlag("replication-slot", rootCmd.PersistentFlags().Lookup("replication-slot"))
viper.BindPFlag("config", rootCmd.PersistentFlags().Lookup("config"))
viper.BindPFlag("PGSTREAM_LOG_LEVEL", rootCmd.PersistentFlags().Lookup("log-level"))
}
Expand Down
4 changes: 4 additions & 0 deletions internal/postgres/pg_replication_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (c *ReplicationConn) Close(ctx context.Context) error {
return mapError(c.conn.Close(ctx))
}

func DefaultReplicationSlotName(dbName string) string {
return "pgstream_" + dbName + "_slot"
}

type Error struct {
Severity string
Msg string
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/integration/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestMain(m *testing.M) {
}
defer pgcleanup()

if err := stream.Init(ctx, pgurl); err != nil {
if err := stream.Init(ctx, pgurl, ""); err != nil {
log.Fatal(err)
}

Expand Down
23 changes: 14 additions & 9 deletions pkg/stream/stream_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"

pglib "github.com/xataio/pgstream/internal/postgres"
pgmigrations "github.com/xataio/pgstream/migrations/postgres"

"github.com/golang-migrate/migrate/v4"
Expand All @@ -23,7 +24,7 @@ const (

// Init initialises the pgstream state in the postgres database provided, along
// with creating the relevant replication slot.
func Init(ctx context.Context, pgURL string) error {
func Init(ctx context.Context, pgURL, replicationSlotName string) error {
conn, err := newPGConn(ctx, pgURL)
if err != nil {
return err
Expand All @@ -45,9 +46,11 @@ func Init(ctx context.Context, pgURL string) error {
return fmt.Errorf("failed to run internal pgstream migrations: %w", err)
}

replicationSlotName, err := getReplicationSlotName(pgURL)
if err != nil {
return err
if replicationSlotName == "" {
replicationSlotName, err = getReplicationSlotName(pgURL)
if err != nil {
return err
}
}

if err := createReplicationSlot(ctx, conn, replicationSlotName); err != nil {
Expand All @@ -59,16 +62,18 @@ func Init(ctx context.Context, pgURL string) error {

// TearDown removes the pgstream state from the postgres database provided,
// as well as removing the replication slot.
func TearDown(ctx context.Context, pgURL string) error {
func TearDown(ctx context.Context, pgURL, replicationSlotName string) error {
conn, err := newPGConn(ctx, pgURL)
if err != nil {
return err
}
defer conn.Close(ctx)

replicationSlotName, err := getReplicationSlotName(pgURL)
if err != nil {
return err
if replicationSlotName == "" {
replicationSlotName, err = getReplicationSlotName(pgURL)
if err != nil {
return err
}
}

if err := dropReplicationSlot(ctx, conn, replicationSlotName); err != nil {
Expand Down Expand Up @@ -162,5 +167,5 @@ func getReplicationSlotName(pgURL string) (string, error) {
if cfg.Database != "" {
dbName = cfg.Database
}
return fmt.Sprintf("pgstream_%s_slot", dbName), nil
return pglib.DefaultReplicationSlotName(dbName), nil
}
2 changes: 1 addition & 1 deletion pkg/wal/replication/postgres/pg_replication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (h *Handler) StartReplication(ctx context.Context) error {
}

if h.pgReplicationSlotName == "" {
h.pgReplicationSlotName = fmt.Sprintf("pgstream_%s_slot", sysID.DBName)
h.pgReplicationSlotName = pglib.DefaultReplicationSlotName(sysID.DBName)
}

logFields := loglib.Fields{
Expand Down

0 comments on commit 9186e3e

Please sign in to comment.