adds support for streaming multi tables #40
Signed-off-by: Bruno Calza <brunoangelicalza@gmail.com>
	return exists, nil
}

// DatabaseStreamSetup setups the database for streaming.
type DatabaseStreamSetup struct {
created this object to make it easier to set up the database for replication. it can:
- create the publication in Postgres
- fetch the schemas of the tables in Postgres so that we can create a similar table in DuckDB
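A minimal sketch of the first step above, building the publication statement for several tables at once. The helper names `quoteIdent` and `publicationSQL` and the publication name are hypothetical and are not taken from this PR; only the `CREATE PUBLICATION ... FOR TABLE` syntax is standard Postgres.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent double-quotes a Postgres identifier, escaping embedded quotes.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// publicationSQL builds a CREATE PUBLICATION statement covering all tables.
// Hypothetical helper: the PR's DatabaseStreamSetup may build it differently.
func publicationSQL(pubName string, tables []string) (string, error) {
	if len(tables) == 0 {
		return "", fmt.Errorf("at least one table is required")
	}
	quoted := make([]string, len(tables))
	for i, t := range tables {
		quoted[i] = quoteIdent(t)
	}
	return fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s",
		quoteIdent(pubName), strings.Join(quoted, ", ")), nil
}

func main() {
	stmt, err := publicationSQL("pub_demo", []string{"t1", "t2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(stmt)
}
```

Quoting every identifier keeps table names with unusual characters from breaking the statement.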
@@ -32,8 +32,8 @@ import (
var vaultNameRx = regexp.MustCompile(`^([a-zA-Z_][a-zA-Z0-9_]*)[.]([a-zA-Z_][a-zA-Z0-9_]*$)`)
lots of changes in this file, but in essence:
- moved the flags from `create` to `stream`
- we're now passing multiple tables and schemas to the database manager
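As a rough illustration of turning the comma-separated `tables` value (e.g. `t1,t2`, per the PR description) into a list, here is a sketch. `parseTables` is a hypothetical helper, and rejecting empty or duplicate names is an assumption rather than confirmed behavior of the CLI.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTables splits a comma-separated flag value such as "t1,t2" into table
// names, trimming whitespace and rejecting empty or duplicate entries.
// Hypothetical helper: the validation rules are assumptions.
func parseTables(flagValue string) ([]string, error) {
	seen := map[string]bool{}
	tables := []string{}
	for _, part := range strings.Split(flagValue, ",") {
		name := strings.TrimSpace(part)
		if name == "" {
			return nil, fmt.Errorf("empty table name in %q", flagValue)
		}
		if seen[name] {
			return nil, fmt.Errorf("duplicate table %q", name)
		}
		seen[name] = true
		tables = append(tables, name)
	}
	return tables, nil
}

func main() {
	tables, err := parseTables("t1, t2")
	if err != nil {
		panic(err)
	}
	fmt.Println(tables)
}
```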
 // NewDBManager creates a new DBManager.
 func NewDBManager(
-	dbDir, table string, cols []Column, windowInterval time.Duration, uploader *VaultsUploader,
+	dbDir string, schemas []TableSchema, windowInterval time.Duration, uploader *VaultsUploader,
passing multiple tables and schemas now
		return true, nil
	}

	exportedFiles := []string{}
	for _, schema := range dbm.schemas {
for each schema we check whether the table has rows; if it has none, we don't export it
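The skip-empty-tables logic could look roughly like the sketch below. The `TableSchema` stand-in, the `rowCount` callback and the per-table file naming are all assumptions made for illustration; the PR's `Export` method talks to DuckDB directly and may name files differently.

```go
package main

import "fmt"

// TableSchema is a simplified stand-in for the PR's type; only the table
// name is modeled here.
type TableSchema struct {
	Table string
}

// exportNonEmpty returns the paths of the files that would be exported,
// skipping tables without rows. rowCount and the "<prefix>.<table>.parquet"
// naming are hypothetical.
func exportNonEmpty(
	schemas []TableSchema, rowCount func(table string) (int, error), prefix string,
) ([]string, error) {
	exported := []string{}
	for _, s := range schemas {
		n, err := rowCount(s.Table)
		if err != nil {
			return nil, fmt.Errorf("counting rows of %s: %w", s.Table, err)
		}
		if n == 0 {
			continue // nothing to export for this table
		}
		exported = append(exported, fmt.Sprintf("%s.%s.parquet", prefix, s.Table))
	}
	return exported, nil
}

func main() {
	counts := map[string]int{"t1": 3, "t2": 0}
	files, err := exportNonEmpty(
		[]TableSchema{{Table: "t1"}, {Table: "t2"}},
		func(table string) (int, error) { return counts[table], nil },
		"window",
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(files)
}
```

Returning the list of exported files, instead of a single `isEmpty` flag, lets the caller upload exactly the files that exist.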
@@ -214,13 +225,13 @@ func (dbm *DBManager) UploadAll(ctx context.Context) error {
 		if re.MatchString(fname) {
 			dbPath := path.Join(dbm.dbDir, fname)
 			exportAt := dbPath + ".parquet"
-			isEmpty, err := dbm.Export(ctx, exportAt)
+			files, err := dbm.Export(ctx, exportAt)
only exported files will be returned

	recordVals := []string{}
	// build an insert stmt for each record inside tx
	stmts := []string{}
	for _, r := range tx.Records {
a tx can have records of different tables now, we gotta consider this
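A sketch of what building one insert statement per record might look like when the records in a transaction span several tables. The `Record` type here is a hypothetical simplification (values are assumed to be already rendered as SQL literals); the PR's actual record type and escaping are not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is a hypothetical, simplified stand-in for one replicated row change.
type Record struct {
	Table   string   // table the row belongs to
	Columns []string // column names, in order
	Values  []string // values already rendered as SQL literals (assumption)
}

// insertStmts builds one INSERT statement per record, so a transaction that
// touches several tables yields statements targeting each of them.
func insertStmts(records []Record) ([]string, error) {
	stmts := make([]string, 0, len(records))
	for _, r := range records {
		if len(r.Columns) != len(r.Values) {
			return nil, fmt.Errorf("table %s: %d columns but %d values",
				r.Table, len(r.Columns), len(r.Values))
		}
		stmts = append(stmts, fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
			r.Table, strings.Join(r.Columns, ", "), strings.Join(r.Values, ", ")))
	}
	return stmts, nil
}

func main() {
	stmts, err := insertStmts([]Record{
		{Table: "t1", Columns: []string{"id", "name"}, Values: []string{"1", "'a'"}},
		{Table: "t2", Columns: []string{"id"}, Values: []string{"7"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(strings.Join(stmts, ";\n"))
}
```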
		if column.IsPrimary {
			pks = column.Name
	stmts := []string{}
	for _, schema := range dbm.schemas {
we gotta create multiple tables now, one for each schema
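One way to generate a `CREATE TABLE` statement per schema is sketched below. `Column.Name` and `Column.IsPrimary` appear in the diff; the `Typ` field, the `TableSchema` layout and the `createTableStmts` helper are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// Column mirrors the fields visible in the diff (Name, IsPrimary); Typ is an
// assumed field holding the column's SQL type.
type Column struct {
	Name, Typ string
	IsPrimary bool
}

// TableSchema is a hypothetical layout: a table name plus its columns.
type TableSchema struct {
	Table   string
	Columns []Column
}

// createTableStmts returns one CREATE TABLE statement per schema, adding a
// PRIMARY KEY clause when at least one column is marked as primary.
func createTableStmts(schemas []TableSchema) []string {
	stmts := make([]string, 0, len(schemas))
	for _, s := range schemas {
		defs := []string{}
		pks := []string{}
		for _, c := range s.Columns {
			defs = append(defs, c.Name+" "+c.Typ)
			if c.IsPrimary {
				pks = append(pks, c.Name)
			}
		}
		if len(pks) > 0 {
			defs = append(defs, "PRIMARY KEY ("+strings.Join(pks, ", ")+")")
		}
		stmts = append(stmts, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)",
			s.Table, strings.Join(defs, ", ")))
	}
	return stmts
}

func main() {
	stmts := createTableStmts([]TableSchema{
		{Table: "t1", Columns: []Column{{"id", "integer", true}, {"name", "varchar", false}}},
		{Table: "t2", Columns: []Column{{"ts", "timestamp", false}}},
	})
	fmt.Println(strings.Join(stmts, ";\n"))
}
```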
lgtm
 ) *DBManager {
 	return &DBManager{
 		dbDir: dbDir,
-		table: table,
-		cols: cols,
+		schemas: schemas,
👍
@@ -156,7 +133,8 @@ func newVaultCreateCommand() *cli.Command {
 }

 func newStreamCommand() *cli.Command {
-	var privateKey string
+	var privateKey, dburi, tables string
+	var winSize int64
should `winSize` be configurable per table?
do you think there's a good reason for it? we could, but we would need an export goroutine per table, which sounds more complicated.
yes, def sounds complicated for now. maybe @dtbuchholz can tell if people have asked for it or are likely to ask for it?
agreed—i think a single window size makes sense as a first feature. when i stream multiple tables, I'm essentially just trying to replicate the full db, so one window size is sufficient.
multiple windows could be useful, i suppose, but that's something we can wait on to hear directly from a dev. personally, i don't think i'd need it unless i was trying to really optimize for something.
Looks good!
Summary
This PR adds support for streaming changes from multiple Postgres tables at the same time.

Context
When you created a vault and started streaming database changes to that vault, only the changes from one table could be streamed, because the table was picked from the vault's name. For example, if the vault was called `demo.test`, the CLI assumed that `test` was the table. That meant that if you wanted to stream multiple tables, you had to start multiple CLI processes, which is not ideal.

This PR changes this behavior to allow multiple tables to be picked and their changes streamed to a particular vault, all in a simple `vaults stream` call.

Changes
- When you create a `vault`, it simply creates the vault in the provider and smart contract. That means the `vaults create` command doesn't accept the `dburi` flag anymore, because there's no need to connect to the database at this point.
- Added the `dburi` and `tables` flags to the `vaults stream` command. The `tables` value is a list of tables separated by commas (e.g. `t1,t2`). You can also provide the `window-size` flag.

Usage
Creating a vault

Streaming changes
Suppose you want to stream tables `t1` and `t2` to vault `demo.test`, this is the command:

Note: There's one caveat with the current implementation. Once a `stream` command is called for a particular `vault` with a particular `dburi` and `tables`, there's no way of changing that. For example, if you later want to add another table, that won't work. I wonder what the ideal flow would be here.