Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Source Postgres #79

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions drivers/postgres/generated.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"definitions": {
"github_com-piyushsingariya-gear5-utils-SSLConfig": {
"properties": {
"client_cert": {
"title": "Client Certificate",
"type": "string"
},
"client_key": {
"title": "Client Certificate Key",
"type": "string"
},
"mode": {
"enum": ["require", "disable", "verify-ca", "verify-full"],
"title": "SSL mode",
"type": "string"
},
"server_ca": {
"title": "CA Certificate",
"type": "string"
}
},
"required": ["mode"],
"title": "SSLConfig is a dto for deserialized SSL configuration for Postgres",
"type": "object",
"x-go-path": "github.com/datazip-inc/olake/utils/SSLConfig"
}
},
"properties": {
"database": {
"title": "Name of the database.",
"type": "string"
},
"host": {
"title": "Hostname of the database.",
"type": "string"
},
"jdbc_url_params": {
"additionalProperties": true,
"description": "Additional properties to pass to the JDBC URL string when connecting to the database. For more information read about https://jdbc.postgresql.org/documentation/head/connect.html",
"title": "JDBC URL Parameters (Advanced)",
"type": "object"
},
"password": {
"title": "password of the user.",
"type": "string"
},
"port": {
"default": 5432,
"maximum": 65536,
"title": "Port of the database.",
"type": "integer"
},
"ssl": {
"$ref": "#/definitions/github_com-piyushsingariya-gear5-utils-SSLConfig",
"title": "SSL configuration of the database connection."
},
"update_method": {
"oneOf": [
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Standard Sync",
"type": "object",
"x-go-path": "github.com/datazip-inc/olake/drivers/postgres/internal/Standard"
},
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"properties": {
"intial_wait_time": {
"default": 0,
"title": "Initial Wait Time for first CDC Log",
"type": "integer"
},
"replication_slot": {
"description": "Read about replication slots.",
"title": "A plugin logical replication slot.",
"type": "string"
}
},
"required": ["replication_slot", "intial_wait_time"],
"title": "Capture Write Ahead Logs",
"type": "object",
"x-go-path": "github.com/datazip-inc/olake/drivers/postgres/internal/CDC"
}
],
"title": "Configures how data is extracted from the database.",
"type": "object"
},
"username": {
"title": "user of the database.",
"type": "string"
}
},
"required": [
"host",
"port",
"database",
"username",
"password",
"ssl",
"update_method"
],
"type": "object",
"x-go-path": "github.com/datazip-inc/olake/drivers/postgres/internal/Config"
}
80 changes: 80 additions & 0 deletions drivers/postgres/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
module github.com/datazip-inc/olake/drivers/postgres

go 1.22

toolchain go1.22.5

require (
github.com/datazip-inc/olake v0.0.0-20230630130252-054496f39abb
github.com/lib/pq v1.10.9
)

require (
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go v1.43.31 // indirect
github.com/felixge/fgprof v0.9.5 // indirect
github.com/fraugster/parquet-go v0.12.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v5 v5.7.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/magiconair/properties v1.8.0 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/parquet-go/parquet-go v0.24.0 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/viper v1.3.2 // indirect
github.com/xitongsys/parquet-go v1.6.2 // indirect
github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.21.0 // indirect
)

require (
github.com/brainicorn/ganno v0.0.0-20220304182003-e638228cd865 // indirect
github.com/brainicorn/goblex v0.0.0-20210908194630-cfe0cfdf87dd // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgx/v4 v4.18.1
github.com/jmoiron/sqlx v1.4.0
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace github.com/datazip-inc/olake => ../../
138 changes: 138 additions & 0 deletions drivers/postgres/internal/cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package driver

import (
"context"
"fmt"
"time"

"github.com/datazip-inc/olake/drivers/base"
"github.com/datazip-inc/olake/pkg/jdbc"
"github.com/datazip-inc/olake/pkg/waljs"
"github.com/datazip-inc/olake/protocol"
"github.com/datazip-inc/olake/types"
"github.com/datazip-inc/olake/utils"
"github.com/jmoiron/sqlx"
)

// prepareWALJSConfig assembles the waljs.Config used to open a replication
// connection for the given streams.
//
// It returns an error when the driver is not flagged as CDC-capable. Every
// stream is registered in Tables; streams whose state cursor is nil are
// additionally registered in FullSyncTables.
func (p *Postgres) prepareWALJSConfig(streams ...protocol.Stream) (*waljs.Config, error) {
	if cdcEnabled := p.Driver.CDCSupport; !cdcEnabled {
		return nil, fmt.Errorf("Invalid call; %s not running in CDC mode", p.Type())
	}

	walConfig := new(waljs.Config)
	walConfig.Connection = *p.config.Connection
	walConfig.ReplicationSlotName = p.cdcConfig.ReplicationSlot
	// The configured wait time is a bare number that is scaled to seconds here.
	walConfig.InitialWaitTime = time.Duration(p.cdcConfig.InitialWaitTime) * time.Second
	walConfig.State = p.cdcState
	walConfig.FullSyncTables = types.NewSet[protocol.Stream]()
	walConfig.Tables = types.NewSet[protocol.Stream]()
	walConfig.BatchSize = p.config.BatchSize

	for idx := range streams {
		current := streams[idx]
		// A stream without a state cursor is also tracked as a full-sync table.
		if cursor := current.Self().GetStateCursor(); cursor == nil {
			walConfig.FullSyncTables.Insert(current)
		}
		walConfig.Tables.Insert(current)
	}

	return walConfig, nil
}

// StateType reports the kind of state this driver maintains; the Postgres
// driver always reports types.MixedType.
func (p *Postgres) StateType() types.StateType {
	var kind types.StateType = types.MixedType
	return kind
}

// func (p *Postgres) GlobalState() any {
// return p.cdcState
// }

// SetupGlobalState stamps state with this driver's state type, installs a
// fresh global state wrapping an empty waljs.WALState on the driver, and then
// delegates to base.ManageGlobalState, returning its result.
func (p *Postgres) SetupGlobalState(state *types.State) error {
	state.Type = p.StateType()

	// Start from an empty WAL state as the raw global state.
	walState := new(waljs.WALState)
	p.cdcState = types.NewGlobalState(walState)

	return base.ManageGlobalState(state, p.cdcState, p)
}

// RunChangeStream performs the write-ahead-log (CDC) sync for the given
// streams.
//
// It builds the waljs configuration, opens a waljs connection, creates one
// writer thread per stream from pool, and forwards every change delivered by
// the connection to the writer thread of the change's stream. Each record is
// enriched with the CDC metadata columns (deleted-at for deletes, LSN when
// present, updated-at always) before it is written.
//
// It returns the first error from configuration, connection setup, writer
// thread creation, or message handling. A change that arrives for a stream
// without a writer thread is reported as an error instead of panicking on a
// nil insert function.
func (p *Postgres) RunChangeStream(pool *protocol.WriterPool, streams ...protocol.Stream) error {
	cdcCtx := context.TODO()
	config, err := p.prepareWALJSConfig(streams...)
	if err != nil {
		return err
	}

	socket, err := waljs.NewConnection(p.client, config)
	if err != nil {
		return err
	}

	insertionMap := make(map[protocol.Stream]protocol.InsertFunction, len(streams))
	for _, stream := range streams {
		insert, err := pool.NewThread(cdcCtx, stream)
		if err != nil {
			return err
		}
		// Deferred to function return on purpose: the writer threads must stay
		// open for as long as OnMessage below keeps delivering changes.
		defer insert.Close()
		insertionMap[stream] = insert.Insert
	}

	return socket.OnMessage(func(message waljs.WalJSChange) (bool, error) {
		// Resolve the writer first so an unknown stream fails loudly rather
		// than calling a nil function.
		insert, ok := insertionMap[message.Stream]
		if !ok {
			return false, fmt.Errorf("received change for stream without a writer thread: %v", message.Stream)
		}

		if message.Kind == "delete" {
			message.Data[jdbc.CDCDeletedAt] = message.Timestamp
		}
		if message.LSN != nil {
			message.Data[jdbc.CDCLSN] = message.LSN
		}
		message.Data[jdbc.CDCUpdatedAt] = message.Timestamp

		// get olake_key_id
		olakeID := utils.GetKeysHash(message.Data, message.Stream.GetStream().SourceDefinedPrimaryKey.Array()...)

		// insert record
		rawRecord := types.CreateRawRecord(olakeID, message.Data, message.Timestamp.UnixMilli())
		exit, err := insert(rawRecord)
		if err != nil {
			return false, err
		}
		// NOTE(review): both the exit and the non-exit path return false, so the
		// writer's exit signal is currently not propagated — confirm against the
		// meaning of OnMessage's boolean result.
		if exit {
			return false, nil
		}

		// TODO: State Management

		return false, nil
	})
}

// doesReplicationSlotExists reports whether a replication slot named slotName
// is listed in pg_replication_slots and, when it is, validates it with
// validateReplicationSlot.
//
// It returns (false, nil) when the slot does not exist, (false, err) when the
// existence query fails, and (true, err) with the validation result when the
// slot exists.
func doesReplicationSlotExists(conn *sqlx.DB, slotName string) (bool, error) {
	var exists bool
	err := conn.QueryRow(
		"SELECT EXISTS(Select 1 from pg_replication_slots where slot_name = $1)",
		slotName,
	).Scan(&exists)
	if err != nil {
		return false, err
	}

	// A missing slot is a valid answer, not a failure. Validating it would run
	// a single-row lookup for a slot that is not there and turn a plain "no"
	// into an error for the caller.
	if !exists {
		return false, nil
	}

	return exists, validateReplicationSlot(conn, slotName)
}

// validateReplicationSlot loads the replication slot named slotName and checks
// that it can be used by this driver: its plugin must be "wal2json" and its
// slot type must be "logical". It returns the lookup error, a descriptive
// error for the first failed check, or nil when the slot is acceptable.
func validateReplicationSlot(conn *sqlx.DB, slotName string) error {
	var slot waljs.ReplicationSlot

	// NOTE(review): slotName is formatted straight into the query text; confirm
	// it only ever comes from trusted configuration.
	query := fmt.Sprintf(waljs.ReplicationSlotTempl, slotName)
	if err := conn.Get(&slot, query); err != nil {
		return err
	}

	switch {
	case slot.Plugin != "wal2json":
		return fmt.Errorf("Plugin not supported[%s]: driver only supports wal2json", slot.Plugin)
	case slot.SlotType != "logical":
		return fmt.Errorf("only logical slots are supported: %s", slot.SlotType)
	default:
		return nil
	}
}
Loading
Loading