Skip to content

Commit

Permalink
engine: update advisory lock handling and add locked transaction supp…
Browse files Browse the repository at this point in the history
…ort (#4152)

* engine: update advisory lock handling and add locked transaction support

* engine/message: update advisory lock query to include global message sending
  • Loading branch information
mastercactapus authored Nov 19, 2024
1 parent f77d3fd commit 45fed57
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
10 changes: 8 additions & 2 deletions engine/message/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewDB(ctx context.Context, db *sql.DB, a *alertlog.Store, pausable lifecycl

sentMessages: make(map[string]Message),

advLock: p.P(`select pg_advisory_lock($1)`),
advLock: p.P(`select pg_try_advisory_lock($1)`),
advLockCleanup: p.P(`
select pg_terminate_backend(lock.pid)
from pg_locks lock
Expand Down Expand Up @@ -528,10 +528,16 @@ func (db *DB) _SendMessages(ctx context.Context, send SendFunc, status StatusFun
}
defer cLock.Close()

_, err = cLock.Exec(execCtx, db.advLock, lock.GlobalMessageSending)
var gotLock bool
err = cLock.WithTx(execCtx, func(tx *sql.Tx) error {
return tx.StmtContext(execCtx, db.advLock).QueryRowContext(execCtx, lock.GlobalMessageSending).Scan(&gotLock)
})
if err != nil {
return errors.Wrap(err, "acquire global sending advisory lock")
}
if !gotLock {
return processinglock.ErrNoLock
}
defer func() {
_, _ = cLock.ExecWithoutLock(log.FromContext(execCtx).BackgroundContext(), `select pg_advisory_unlock(4912)`)
}()
Expand Down
20 changes: 20 additions & 0 deletions engine/processinglock/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"sync"

"github.com/target/goalert/util/sqlutil"
)

// Conn allows using locked transactions over a single connection.
Expand Down Expand Up @@ -41,6 +43,24 @@ func (c *Conn) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error
return c.l._BeginTx(ctx, c.conn, opts)
}

// WithTx will run the given function in a locked transaction.
func (c *Conn) WithTx(ctx context.Context, txFn func(tx *sql.Tx) error) error {
c.mx.Lock()
defer c.mx.Unlock()
tx, err := c.l._BeginTx(ctx, c.conn, nil)
if err != nil {
return err
}
defer sqlutil.Rollback(ctx, "rollback tx", tx)

err = txFn(tx)
if err != nil {
return err
}

return tx.Commit()
}

// Exec will call ExecContext on the statement wrapped in a locked transaction.
func (c *Conn) Exec(ctx context.Context, stmt *sql.Stmt, args ...interface{}) (sql.Result, error) {
c.mx.Lock()
Expand Down

0 comments on commit 45fed57

Please sign in to comment.