Skip to content

Commit

Permalink
feat: no schema in live mode (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp authored Aug 1, 2024
1 parent 358e0ab commit 06e843c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changeset/flat-tigers-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@electric-sql/next": patch
"@core/sync-service": patch
---

Only include schema in header of responses to non-live requests.
10 changes: 9 additions & 1 deletion packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ defmodule Electric.Plug.ServeShapePlug do
plug :put_resp_content_type, "application/json"
plug :validate_query_params
plug :load_shape_info
plug :put_schema_header

# We're starting listening as soon as possible to not miss stuff that was added since we've asked for last offset
plug :listen_for_new_changes
Expand Down Expand Up @@ -154,7 +155,6 @@ defmodule Electric.Plug.ServeShapePlug do
|> assign(:last_offset, last_offset)
|> put_resp_header("x-electric-shape-id", shape_id)
|> put_resp_header("x-electric-chunk-last-offset", "#{last_offset}")
|> put_resp_header("x-electric-schema", schema(shape))
end

defp schema(shape) do
Expand All @@ -165,6 +165,14 @@ defmodule Electric.Plug.ServeShapePlug do
|> Jason.encode!()
end

# Only adds schema header when not in live mode
defp put_schema_header(conn, _) when not conn.assigns.live do
shape = conn.assigns.shape_definition
put_resp_header(conn, "x-electric-schema", schema(shape))
end

defp put_schema_header(conn, _), do: conn

# If the offset requested is -1, noop as we can always serve it
def validate_shape_offset(%Conn{assigns: %{offset: @before_all_offset}} = conn, _) do
# noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Plug.Conn.get_resp_header(conn, "pragma") == ["no-cache"]
assert Plug.Conn.get_resp_header(conn, "expires") == ["0"]
assert Plug.Conn.get_resp_header(conn, "x-electric-chunk-last-offset") == [next_offset_str]
assert Plug.Conn.get_resp_header(conn, "x-electric-schema") == []
end

test "handles shape rotation" do
Expand Down
13 changes: 9 additions & 4 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ArgumentsType } from 'vitest'
import { Message, Value, Offset } from './types'
import { Message, Value, Offset, Schema } from './types'
import { MessageParser, Parser } from './parser'

export type ShapeData = Map<string, { [key: string]: Value }>
Expand Down Expand Up @@ -169,6 +169,7 @@ export class ShapeStream {
private options: ShapeStreamOptions
private backoffOptions: BackoffOptions
private fetchClient: typeof fetch
private schema?: Schema

private subscribers = new Map<
number,
Expand Down Expand Up @@ -255,12 +256,15 @@ export class ShapeStream {
this.lastOffset = lastOffset as Offset
}

const schemaHeader = headers.get(`X-Electric-Schema`)!
const schema = schemaHeader ? JSON.parse(schemaHeader) : {}
const getSchema = (): Schema => {
const schemaHeader = headers.get(`X-Electric-Schema`)
return schemaHeader ? JSON.parse(schemaHeader) : {}
}
this.schema = this.schema ?? getSchema()

const messages = status === 204 ? `[]` : await response.text()

const batch = this.messageParser.parse(messages, schema)
const batch = this.messageParser.parse(messages, this.schema)

// Update isUpToDate
if (batch.length > 0) {
Expand Down Expand Up @@ -346,6 +350,7 @@ export class ShapeStream {
this.lastOffset = `-1`
this.shapeId = shapeId
this.isUpToDate = false
this.schema = undefined
}

private validateOptions(options: ShapeStreamOptions): void {
Expand Down

0 comments on commit 06e843c

Please sign in to comment.