Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (client): extensible value type for use with custom parsers #1791

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pink-rivers-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Make parser generic such that it can be parameterized with additional types supported by custom parsers.
2 changes: 1 addition & 1 deletion examples/tanstack-example/src/Example.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export const Example = () => {
const { data: items } = useShape<Item>(itemShape())
const submissions: Item[] = useMutationState({
filters: { status: `pending` },
select: (mutation) => mutation.state.context as Item | undefined,
select: (mutation) => mutation.state.context as Item,
}).filter((item) => item !== undefined)

const { mutateAsync: addItemMut } = useMutation({
Expand Down
2 changes: 1 addition & 1 deletion examples/tanstack-example/src/match-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
isChangeMessage,
} from "@electric-sql/client"

export async function matchStream<T extends Row>({
export async function matchStream<T extends Row<unknown>>({
stream,
operations,
matchFn,
Expand Down
43 changes: 28 additions & 15 deletions packages/react-hooks/src/react-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,32 @@ import {
ShapeStream,
ShapeStreamOptions,
Row,
GetExtensions,
} from '@electric-sql/client'
import React from 'react'
import { useSyncExternalStoreWithSelector } from 'use-sync-external-store/with-selector.js'

const streamCache = new Map<string, ShapeStream>()
const shapeCache = new Map<ShapeStream, Shape>()
type UnknownShape = Shape<Row<unknown>>
type UnknownShapeStream = ShapeStream<Row<unknown>>

export async function preloadShape<T extends Row = Row>(
options: ShapeStreamOptions
const streamCache = new Map<string, UnknownShapeStream>()
const shapeCache = new Map<UnknownShapeStream, UnknownShape>()

export async function preloadShape<T extends Row<unknown> = Row>(
options: ShapeStreamOptions<GetExtensions<T>>
): Promise<Shape<T>> {
const shapeStream = getShapeStream<T>(options)
const shape = getShape<T>(shapeStream)
await shape.value
return shape
}

export function sortedOptionsHash(options: ShapeStreamOptions): string {
export function sortedOptionsHash<T>(options: ShapeStreamOptions<T>): string {
return JSON.stringify(options, Object.keys(options).sort())
}

export function getShapeStream<T extends Row = Row>(
options: ShapeStreamOptions
export function getShapeStream<T extends Row<unknown>>(
options: ShapeStreamOptions<GetExtensions<T>>
): ShapeStream<T> {
const shapeHash = sortedOptionsHash(options)

Expand All @@ -42,7 +46,9 @@ export function getShapeStream<T extends Row = Row>(
}
}

export function getShape<T extends Row>(shapeStream: ShapeStream<T>): Shape<T> {
export function getShape<T extends Row<unknown>>(
shapeStream: ShapeStream<T>
): Shape<T> {
// If the stream is already cached, return
if (shapeCache.has(shapeStream)) {
// Return the ShapeStream
Expand All @@ -57,7 +63,7 @@ export function getShape<T extends Row>(shapeStream: ShapeStream<T>): Shape<T> {
}
}

export interface UseShapeResult<T extends Row = Row> {
export interface UseShapeResult<T extends Row<unknown> = Row> {
/**
* The array of rows that make up the Shape.
* @type {T[]}
Expand All @@ -76,14 +82,19 @@ export interface UseShapeResult<T extends Row = Row> {
isError: boolean
}

function shapeSubscribe<T extends Row>(shape: Shape<T>, callback: () => void) {
function shapeSubscribe<T extends Row<unknown>>(
shape: Shape<T>,
callback: () => void
) {
const unsubscribe = shape.subscribe(callback)
return () => {
unsubscribe()
}
}

function parseShapeData<T extends Row>(shape: Shape<T>): UseShapeResult<T> {
function parseShapeData<T extends Row<unknown>>(
shape: Shape<T>
): UseShapeResult<T> {
return {
data: [...shape.valueSync.values()],
isLoading: shape.isLoading(),
Expand All @@ -98,19 +109,21 @@ function identity<T>(arg: T): T {
return arg
}

interface UseShapeOptions<SourceData extends Row, Selection>
extends ShapeStreamOptions {
interface UseShapeOptions<SourceData extends Row<unknown>, Selection>
extends ShapeStreamOptions<GetExtensions<SourceData>> {
selector?: (value: UseShapeResult<SourceData>) => Selection
}

export function useShape<
SourceData extends Row = Row,
SourceData extends Row<unknown> = Row,
Selection = UseShapeResult<SourceData>,
>({
selector = identity as (arg: UseShapeResult<SourceData>) => Selection,
...options
}: UseShapeOptions<SourceData, Selection>): Selection {
const shapeStream = getShapeStream<SourceData>(options as ShapeStreamOptions)
const shapeStream = getShapeStream<SourceData>(
options as ShapeStreamOptions<GetExtensions<SourceData>>
)
const shape = getShape<SourceData>(shapeStream)

const useShapeData = React.useMemo(() => {
Expand Down
3 changes: 3 additions & 0 deletions packages/react-hooks/test/react-hooks.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ describe(`useShape`, () => {
foo: number
bar: boolean
baz: string
ts: Date
}

it(`should infer correct return type when a selector is provided`, () => {
Expand All @@ -25,6 +26,7 @@ describe(`useShape`, () => {
foo: 5,
bar: true,
baz: `str`,
ts: new Date(),
}
},
})
Expand All @@ -41,6 +43,7 @@ describe(`useShape`, () => {
foo: 5,
bar: true,
baz: `str`,
ts: new Date(),
}
},
})
Expand Down
23 changes: 15 additions & 8 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { Message, Offset, Schema, Row, MaybePromise } from './types'
import {
Message,
Offset,
Schema,
Row,
MaybePromise,
GetExtensions,
} from './types'
import { MessageParser, Parser } from './parser'
import { isUpToDateMessage } from './helpers'
import { FetchError, FetchBackoffAbortError } from './error'
Expand All @@ -21,7 +28,7 @@ import {
/**
* Options for constructing a ShapeStream.
*/
export interface ShapeStreamOptions {
export interface ShapeStreamOptions<T = never> {
/**
* The full URL to where the Shape is hosted. This can either be the Electric server
* directly or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape/foo`
Expand Down Expand Up @@ -53,10 +60,10 @@ export interface ShapeStreamOptions {
subscribe?: boolean
signal?: AbortSignal
fetchClient?: typeof fetch
parser?: Parser
parser?: Parser<T>
}

export interface ShapeStreamInterface<T extends Row = Row> {
export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
subscribe(
callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void
Expand Down Expand Up @@ -108,10 +115,10 @@ export interface ShapeStreamInterface<T extends Row = Row> {
* ```
*/

export class ShapeStream<T extends Row = Row>
export class ShapeStream<T extends Row<unknown> = Row>
implements ShapeStreamInterface<T>
{
readonly options: ShapeStreamOptions
readonly options: ShapeStreamOptions<GetExtensions<T>>

readonly #fetchClient: typeof fetch
readonly #messageParser: MessageParser<T>
Expand All @@ -135,7 +142,7 @@ export class ShapeStream<T extends Row = Row>
#shapeId?: string
#schema?: Schema

constructor(options: ShapeStreamOptions) {
constructor(options: ShapeStreamOptions<GetExtensions<T>>) {
validateOptions(options)
this.options = { subscribe: true, ...options }
this.#lastOffset = this.options.offset ?? `-1`
Expand Down Expand Up @@ -366,7 +373,7 @@ export class ShapeStream<T extends Row = Row>
}
}

function validateOptions(options: Partial<ShapeStreamOptions>): void {
function validateOptions<T>(options: Partial<ShapeStreamOptions<T>>): void {
if (!options.url) {
throw new Error(`Invalid shape option. It must provide the url`)
}
Expand Down
6 changes: 3 additions & 3 deletions packages/typescript-client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { ChangeMessage, ControlMessage, Message, Row } from './types'
* }
* ```
*/
export function isChangeMessage<T extends Row = Row>(
export function isChangeMessage<T extends Row<unknown> = Row>(
message: Message<T>
): message is ChangeMessage<T> {
return `key` in message
Expand All @@ -40,13 +40,13 @@ export function isChangeMessage<T extends Row = Row>(
* }
* ```
*/
export function isControlMessage<T extends Row = Row>(
export function isControlMessage<T extends Row<unknown> = Row>(
message: Message<T>
): message is ControlMessage {
return !isChangeMessage(message)
}

export function isUpToDateMessage<T extends Row = Row>(
export function isUpToDateMessage<T extends Row<unknown> = Row>(
message: Message<T>
): message is ControlMessage & { up_to_date: true } {
return isControlMessage(message) && message.headers.control === `up-to-date`
Expand Down
45 changes: 29 additions & 16 deletions packages/typescript-client/src/parser.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import { ColumnInfo, Message, Row, Schema, Value } from './types'
import { ColumnInfo, GetExtensions, Message, Row, Schema, Value } from './types'

type NullToken = null | `NULL`
type Token = Exclude<string, NullToken>
type NullableToken = Token | NullToken
export type ParseFunction = (
export type ParseFunction<Extensions = never> = (
value: Token,
additionalInfo?: Omit<ColumnInfo, `type` | `dims`>
) => Value
type NullableParseFunction = (
) => Value<Extensions>
type NullableParseFunction<Extensions = never> = (
value: NullableToken,
additionalInfo?: Omit<ColumnInfo, `type` | `dims`>
) => Value
export type Parser = { [key: string]: ParseFunction }
) => Value<Extensions>
/**
* @typeParam Extensions - Additional types that can be parsed by this parser beyond the standard SQL types.
* Defaults to no additional types.
*/
export type Parser<Extensions = never> = {
[key: string]: ParseFunction<Extensions>
}

const parseNumber = (value: string) => Number(value)
const parseBool = (value: string) => value === `true` || value === `t`
Expand All @@ -31,15 +37,18 @@ export const defaultParser: Parser = {
}

// Taken from: https://github.com/electric-sql/pglite/blob/main/packages/pglite/src/types.ts#L233-L279
export function pgArrayParser(value: Token, parser?: ParseFunction): Value {
export function pgArrayParser<Extensions>(
value: Token,
parser?: ParseFunction<Extensions>
): Value<Extensions> {
let i = 0
let char = null
let str = ``
let quoted = false
let last = 0
let p: string | undefined = undefined

function loop(x: string): Value[] {
function loop(x: string): Array<Value<Extensions>> {
const xs = []
for (; i < x.length; i++) {
char = x[i]
Expand Down Expand Up @@ -79,9 +88,9 @@ export function pgArrayParser(value: Token, parser?: ParseFunction): Value {
return loop(value)[0]
}

export class MessageParser<T extends Row> {
private parser: Parser
constructor(parser?: Parser) {
export class MessageParser<T extends Row<unknown>> {
private parser: Parser<GetExtensions<T>>
constructor(parser?: Parser<GetExtensions<T>>) {
// Merge the provided parser with the default parser
// to use the provided parser whenever defined
// and otherwise fall back to the default parser
Expand All @@ -96,7 +105,7 @@ export class MessageParser<T extends Row> {
// But `typeof null === 'object'` so we need to make an explicit check.
if (key === `value` && typeof value === `object` && value !== null) {
// Parse the row values
const row = value as Record<string, Value>
const row = value as Record<string, Value<GetExtensions<T>>>
Object.keys(row).forEach((key) => {
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
})
Expand All @@ -106,7 +115,11 @@ export class MessageParser<T extends Row> {
}

// Parses the message values using the provided parser based on the schema information
private parseRow(key: string, value: NullableToken, schema: Schema): Value {
private parseRow(
key: string,
value: NullableToken,
schema: Schema
): Value<GetExtensions<T>> {
const columnInfo = schema[key]
if (!columnInfo) {
// We don't have information about the value
Expand Down Expand Up @@ -137,11 +150,11 @@ export class MessageParser<T extends Row> {
}
}

function makeNullableParser(
parser: ParseFunction,
function makeNullableParser<Extensions>(
parser: ParseFunction<Extensions>,
columnInfo: ColumnInfo,
columnName?: string
): NullableParseFunction {
): NullableParseFunction<Extensions> {
const isNullable = !(columnInfo.not_null ?? false)
// The sync service contains `null` value for a column whose value is NULL
// but if the column value is an array that contains a NULL value
Expand Down
6 changes: 3 additions & 3 deletions packages/typescript-client/src/shape.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { isChangeMessage, isControlMessage } from './helpers'
import { FetchError } from './error'
import { ShapeStreamInterface } from './client'

export type ShapeData<T extends Row = Row> = Map<string, T>
export type ShapeChangedCallback<T extends Row = Row> = (
export type ShapeData<T extends Row<unknown> = Row> = Map<string, T>
export type ShapeChangedCallback<T extends Row<unknown> = Row> = (
value: ShapeData<T>
) => void

Expand Down Expand Up @@ -39,7 +39,7 @@ export type ShapeChangedCallback<T extends Row = Row> = (
* console.log(shapeData)
* })
*/
export class Shape<T extends Row = Row> {
export class Shape<T extends Row<unknown> = Row> {
readonly #stream: ShapeStreamInterface<T>

readonly #data: ShapeData<T> = new Map()
Expand Down
Loading
Loading