pgmq-go

A Go (Golang) client for Postgres Message Queue (PGMQ). Based loosely on the Rust client.

pgmq-go works with pgx. The second argument of most functions only needs to satisfy the DB interface, which means it can take, among others, a *pgx.Conn, *pgxpool.Pool, or pgx.Tx.

Usage

Start a Postgres instance with the PGMQ extension installed:

docker run -d --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 quay.io/tembo/pgmq-pg:latest

Then connect, create a queue, and send, read, and archive a message:

package main

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/craigpastro/pgmq-go"
)

func main() {
    ctx := context.Background()

    pool, err := pgmq.NewPgxPool(ctx, "postgres://postgres:password@localhost:5432/postgres")
    if err != nil {
        panic(err)
    }

    err = pgmq.CreatePGMQExtension(ctx, pool)
    if err != nil {
        panic(err)
    }

    err = pgmq.CreateQueue(ctx, pool, "my_queue")
    if err != nil {
        panic(err)
    }

    // We can perform various queue operations using a transaction.
    tx, err := pool.Begin(ctx)
    if err != nil {
        panic(err)
    }

    id, err := pgmq.Send(ctx, tx, "my_queue", json.RawMessage(`{"foo": "bar"}`))
    if err != nil {
        panic(err)
    }

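    // Read one message and hide it from other readers for 30 seconds
    // (the visibility timeout).
    msg, err := pgmq.Read(ctx, tx, "my_queue", 30)
    if err != nil {
        panic(err)
    }
    fmt.Printf("read message %d: %s\n", msg.MsgID, msg.Message)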

    // Archive the message by moving it to the "pgmq.a_<queue_name>" table.
    // Alternatively, you can `Delete` the message, or read and delete in one
    // call by using `Pop`.
    _, err = pgmq.Archive(ctx, tx, "my_queue", id)
    if err != nil {
        panic(err)
    }

    // Commit the transaction.
    err = tx.Commit(ctx)
    if err != nil {
        panic(err)
    }

    // Close the connection pool.
    pool.Close()
}

Contributions

We ❤️ contributions.

See also