Skip to content

Commit

Permalink
enhance: Add default $ps & $timestamp env for scan expr
Browse files Browse the repository at this point in the history
Previously, scan-binlog command need to manually provide outputFields
for pk column even the pk column & ts column is always fetched.

This PR unifies `scan-binlog` & `scan-deltalog` command to have the
default `$pk` & `$timestamp` env value for filter expression.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jan 22, 2025
1 parent f65dfb2 commit 6bb1e91
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
4 changes: 3 additions & 1 deletion states/scan_binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara

err = s.scanBinlogs(pkObject, fieldObjects, func(pk storage.PrimaryKey, offset int, values map[int64]any) error {
pkv := pk.GetValue()
ts := values[1].(int64)
if !p.IgnoreDelete {
ts := values[1].(int64)
if deletedRecords[pkv] > uint64(ts) {
return nil
}
Expand All @@ -188,6 +188,8 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara
env := lo.MapKeys(values, func(_ any, fid int64) string {
return fields[fid].Name
})
env["$pk"] = pkv
env["$timestamp"] = ts
program, err := expr.Compile(p.Expr, expr.Env(env))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions states/scan_deltalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ func (s *InstanceState) ScanDeltalogCommand(ctx context.Context, p *ScanDeltalog
}()
if len(p.Expr) != 0 {
env := map[string]any{
"pk": pk.GetValue(),
"ts": ts,
"$pk": pk.GetValue(),
"$timestamp": ts,
}
program, err := expr.Compile(p.Expr, expr.Env(env))
if err != nil {
Expand Down

0 comments on commit 6bb1e91

Please sign in to comment.