Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PostgreSQL case sensitivity in listen #530

Merged
merged 14 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/giant-spiders-beam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite': patch
---

listen and unlisten case sensitivity behaviour aligned to default PostgreSQL behaviour'
5 changes: 4 additions & 1 deletion docs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ const unsub = await pg.listen('test', (payload) => {
await pg.query("NOTIFY test, 'Hello, world!'")
```

Channel names are case sensitive if double-quoted (`pg.listen('"TeST"')`). Otherwise channel name will be lower cased (`pg.listen('TeStiNG')` == `pg.listen('testing')`).

### unlisten

`.unlisten(channel: string, callback?: (payload: string) => void): Promise<void>`
Expand Down Expand Up @@ -362,9 +364,10 @@ await pg.describeQuery('SELECT * FROM test WHERE name = $1', ['test'])
```

### clone

`.clone(): Promise<PGlite>`

Clones the current instance. This is useful when a series of operations, like unit or integration test, need to be run on the same database without having to recreate the database each time, or for each test.
Clones the current instance. This is useful when a series of operations, like unit or integration test, need to be run on the same database without having to recreate the database each time, or for each test.

## Properties

Expand Down
39 changes: 27 additions & 12 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import type {
PGliteOptions,
} from './interface.js'
import PostgresModFactory, { type PostgresMod } from './postgresMod.js'
import { getFsBundle, instantiateWasm, startWasmDownload } from './utils.js'
import {
getFsBundle,
instantiateWasm,
startWasmDownload,
toPostgresName,
} from './utils.js'

// Importing the source as the built version is not ESM compatible
import { Parser as ProtocolParser, serialize } from '@electric-sql/pg-protocol'
Expand Down Expand Up @@ -716,13 +721,22 @@ export class PGlite
* @param callback The callback to call when a notification is received
*/
async listen(channel: string, callback: (payload: string) => void) {
if (!this.#notifyListeners.has(channel)) {
this.#notifyListeners.set(channel, new Set())
const pgChannel = toPostgresName(channel)
if (!this.#notifyListeners.has(pgChannel)) {
this.#notifyListeners.set(pgChannel, new Set())
}
this.#notifyListeners.get(pgChannel)!.add(callback)
try {
await this.exec(`LISTEN ${channel}`)
} catch (e) {
this.#notifyListeners.get(pgChannel)!.delete(callback)
if (this.#notifyListeners.get(pgChannel)?.size === 0) {
this.#notifyListeners.delete(pgChannel)
}
throw e
}
this.#notifyListeners.get(channel)!.add(callback)
await this.exec(`LISTEN "${channel}"`)
return async () => {
await this.unlisten(channel, callback)
await this.unlisten(pgChannel, callback)
}
}

Expand All @@ -732,15 +746,16 @@ export class PGlite
* @param callback The callback to remove
*/
async unlisten(channel: string, callback?: (payload: string) => void) {
const pgChannel = toPostgresName(channel)
if (callback) {
this.#notifyListeners.get(channel)?.delete(callback)
if (this.#notifyListeners.get(channel)?.size === 0) {
await this.exec(`UNLISTEN "${channel}"`)
this.#notifyListeners.delete(channel)
this.#notifyListeners.get(pgChannel)?.delete(callback)
if (this.#notifyListeners.get(pgChannel)?.size === 0) {
await this.exec(`UNLISTEN ${channel}`)
this.#notifyListeners.delete(pgChannel)
}
} else {
await this.exec(`UNLISTEN "${channel}"`)
this.#notifyListeners.delete(channel)
await this.exec(`UNLISTEN ${channel}`)
this.#notifyListeners.delete(pgChannel)
}
}

Expand Down
17 changes: 17 additions & 0 deletions packages/pglite/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,20 @@ export function debounceMutex<A extends any[], R>(
return promise
}
}

/**
* Postgresql handles quoted names as CaseSensitive and unquoted as lower case.
* If input is quoted, returns an unquoted string (same casing)
* If input is unquoted, returns a lower-case string
*/
export function toPostgresName(input: string): string {
let output
if (input.startsWith('"') && input.endsWith('"')) {
// Postgres sensitive case
output = input.substring(1, input.length - 1)
} else {
// Postgres case insensitive - all to lower
output = input.toLowerCase()
}
return output
}
Comment on lines +235 to +245
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fear this breaks channel names with spaces.

You spotted in my original implementation I made a mistake, on the same thread I quoted the channel name, with the worker it was unquoted.

I think it's now apparent we need to decide between two options:

  1. Always quote the channel name - this forces it to be case sensitive, but means that channel names with spaces will always work: pg.listen('Channel With Spaces', ...)

  2. Never use quotes and user has to provide them to get the case sensitivity or use spaces:

  • pg.listen('"Channel With Spaces"', ...) works
  • pg.listen('Channel With Spaces', ...) throws an error

I'm not sure what the best option is.

For a reference point, Electric requires the user to provide the quotes to be case sensitive, and defaults to case insensitive.

Thinking about this now, I fear we may have a similar problem in the live.incrementalQuery api with the key column argument...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gonna take a moment to have a look at the possibilities. Will let you know.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A reason to throw if no quotes provided:

ERROR:  syntax error at or near "channel"
LINE 1: LISTEN my channel;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I guess it's not only about spaces, but other characters as well - in an unquoted string.

ERROR:  syntax error at or near "&"
LINE 1: LISTEN my&channel;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think we should take the opportunity to consider what the correct pattern is here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More issues surfaced: if the provided channel name causes the db engine to throw for whatever reason, there's no cleanup of the #notifyListeners made in the frontend. This might be the case in other parts of the code as well.

13 changes: 7 additions & 6 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
} from '../interface.js'
import type { PGlite } from '../pglite.js'
import { BasePGlite } from '../base.js'
import { uuid } from '../utils.js'
import { toPostgresName, uuid } from '../utils.js'

export type PGliteWorkerOptions<E extends Extensions = Extensions> =
PGliteOptions<E> & {
Expand Down Expand Up @@ -359,14 +359,15 @@ export class PGliteWorker
channel: string,
callback: (payload: string) => void,
): Promise<() => Promise<void>> {
await this.waitReady
if (!this.#notifyListeners.has(channel)) {
this.#notifyListeners.set(channel, new Set())
const pgChannel = toPostgresName(channel)

if (!this.#notifyListeners.has(pgChannel)) {
this.#notifyListeners.set(pgChannel, new Set())
}
this.#notifyListeners.get(channel)?.add(callback)
this.#notifyListeners.get(pgChannel)!.add(callback)
await this.exec(`LISTEN ${channel}`)
return async () => {
await this.unlisten(channel, callback)
await this.unlisten(pgChannel, callback)
}
}

Expand Down
127 changes: 126 additions & 1 deletion packages/pglite/tests/notify.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, it, expect } from 'vitest'
import { describe, it, expect, vi } from 'vitest'
import { PGlite } from '../dist/index.js'
import { expectToThrowAsync } from './test-utils.js'

describe('notify API', () => {
it('notify', async () => {
Expand Down Expand Up @@ -41,4 +42,128 @@ describe('notify API', () => {

await new Promise((resolve) => setTimeout(resolve, 1000))
})

it('check notify case sensitivity + special chars as Postgresql', async () => {
const pg = new PGlite()

const allLower1 = vi.fn()
await pg.listen('postgresdefaultlower', allLower1)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)

const autoLowerTest1 = vi.fn()
await pg.listen('PostgresDefaultLower', autoLowerTest1)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)

const autoLowerTest2 = vi.fn()
await pg.listen('PostgresDefaultLower', autoLowerTest2)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)

const autoLowerTest3 = vi.fn()
await pg.listen('postgresdefaultlower', autoLowerTest3)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)

const caseSensitive1 = vi.fn()
await pg.listen('"tesT2"', caseSensitive1)
await pg.query(`NOTIFY "tesT2", 'paYloAd2'`)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add a tests with spaces in the channel name:

    const quotedWithSpaces = vi.fn()
    await pg.listen('"Quoted Channel With Spaces"', quotedWithSpaces)
    await pg.query(`NOTIFY ""Quoted Channel With Spaces", 'payload1'`)


    const unquotedWithSpaces = vi.fn()
    await pg.listen('Unquoted Channel With Spaces', unquotedWithSpaces)
    // This one may need to throw?
    await pg.query(`NOTIFY "Unquoted Channel With Spaces", 'payload1'`)

const caseSensitive2 = vi.fn()
await pg.listen('"tesT3"', caseSensitive2)
await pg.query(`NOTIFY tesT3, 'paYloAd2'`)

const caseSensitive3 = vi.fn()
await pg.listen('testNotCalled2', caseSensitive3)
await pg.query(`NOTIFY "testNotCalled2", 'paYloAd2'`)

const quotedWithSpaces = vi.fn()
await pg.listen('"Quoted Channel With Spaces"', quotedWithSpaces)
await pg.query(`NOTIFY "Quoted Channel With Spaces", 'payload1'`)

const unquotedWithSpaces = vi.fn()
await expectToThrowAsync(
pg.listen('Unquoted Channel With Spaces', unquotedWithSpaces),
)
await expectToThrowAsync(
pg.query(`NOTIFY Unquoted Channel With Spaces, 'payload1'`),
)

const otherCharsWithQuotes = vi.fn()
await pg.listen('"test&me"', otherCharsWithQuotes)
await pg.query(`NOTIFY "test&me", 'paYloAd2'`)

const otherChars = vi.fn()
await expectToThrowAsync(pg.listen('test&me', otherChars))
await expectToThrowAsync(pg.query(`NOTIFY test&me, 'payload1'`))

expect(allLower1).toHaveBeenCalledTimes(4)
expect(autoLowerTest1).toHaveBeenCalledTimes(3)
expect(autoLowerTest2).toHaveBeenCalledTimes(2)
expect(autoLowerTest3).toHaveBeenCalledTimes(1)
expect(caseSensitive1).toHaveBeenCalledOnce()
expect(caseSensitive2).not.toHaveBeenCalled()
expect(caseSensitive3).not.toHaveBeenCalled()
expect(otherCharsWithQuotes).toHaveBeenCalledOnce()
expect(quotedWithSpaces).toHaveBeenCalledOnce()
expect(unquotedWithSpaces).not.toHaveBeenCalled()
})

it('check unlisten case sensitivity + special chars as Postgresql', async () => {
const pg = new PGlite()

const allLower1 = vi.fn()
{
const unsub1 = await pg.listen('postgresdefaultlower', allLower1)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)
await unsub1()
}

const autoLowerTest1 = vi.fn()
{
const unsub2 = await pg.listen('PostgresDefaultLower', autoLowerTest1)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)
await unsub2()
}

const autoLowerTest2 = vi.fn()
{
const unsub3 = await pg.listen('PostgresDefaultLower', autoLowerTest2)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)
await unsub3()
}

const autoLowerTest3 = vi.fn()
{
const unsub4 = await pg.listen('postgresdefaultlower', autoLowerTest3)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)
await unsub4()
}

const caseSensitive1 = vi.fn()
{
await pg.listen('"CaSESEnsiTIvE"', caseSensitive1)
await pg.query(`NOTIFY "CaSESEnsiTIvE", 'payload1'`)
await pg.unlisten('"CaSESEnsiTIvE"')
await pg.query(`NOTIFY "CaSESEnsiTIvE", 'payload1'`)
}

const quotedWithSpaces = vi.fn()
{
await pg.listen('"Quoted Channel With Spaces"', quotedWithSpaces)
await pg.query(`NOTIFY "Quoted Channel With Spaces", 'payload1'`)
await pg.unlisten('"Quoted Channel With Spaces"')
}

const otherCharsWithQuotes = vi.fn()
{
await pg.listen('"test&me"', otherCharsWithQuotes)
await pg.query(`NOTIFY "test&me", 'payload'`)
await pg.unlisten('"test&me"')
}

expect(allLower1).toHaveBeenCalledOnce()
expect(autoLowerTest1).toHaveBeenCalledOnce()
expect(autoLowerTest2).toHaveBeenCalledOnce()
expect(autoLowerTest3).toHaveBeenCalledOnce()
expect(caseSensitive1).toHaveBeenCalledOnce()
expect(otherCharsWithQuotes).toHaveBeenCalledOnce()
})
})