Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unambiguous JSON (from/to) #283

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,43 @@ func NewCodecForStandardJSONFull(schemaSpecification string) (*Codec, error) {
})
}

// NewCodecForUnambiguousJSON provides full serialization/deserialization
// for json that is unambiguous in terms of what the field will contain.
// This means that avro Union types containing only a single concrete type
// e.g. ["null", "string"] no longer have to specify their type. Unlike
// NewCodecForStandardJSONFull, ambiguous types ["int", "string"] do still
// need to specify their type as map. See the following examples:
//
// ["null", "string"] => "some string" || null
// ["int", "string"] => {"int": 1} || {"string": "some string"}
// ["null", "int", "string"] => null || {"int": 1} || {"string": "some string"}
//
// this is especially useful when using json.Marshal with structs containing
// optional types:
//
// type Person struct {
// Name *string `json:"name,omitempty"`
// }
//
// or using json.Marshal with structs containing a union:
//
// type Message struct {
// Direction DirectionUnion `json:DirectionUnion"
// }
//
// type DirectionUnion struct { // only one of the fields can be non-nil
//
// Request *string `json:"request,omitempty"`
// Response *string `json:"response,omitempty"`
// }
func NewCodecForUnambiguousJSON(schemaSpecification string) (*Codec, error) {
return NewCodecFrom(schemaSpecification, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceUnambiguousJSON,
})
}

func NewCodecFrom(schemaSpecification string, cb *codecBuilder) (*Codec, error) {
var schema interface{}

Expand Down
153 changes: 139 additions & 14 deletions union.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,38 @@ import (
// codecInfo is a set of quick lookups it holds all the lookup info for the
// all the schemas we need to handle the list of types for this union
type codecInfo struct {
allowedTypes []string
codecFromIndex []*Codec
codecFromName map[string]*Codec
indexFromName map[string]int
allowedTypes []string
codecFromIndex []*Codec
codecFromName map[string]*Codec
indexFromName map[string]int
unambiguousMode bool
}

// isNullable returns if the "null" type is one of the registered types
func (cr codecInfo) isNullable() bool {
_, nullable := cr.indexFromName["null"]
return nullable
}

// numConcreteTypes returns the number of concrete types (not "null") specified to the codec
func (cr codecInfo) numConcreteTypes() int {
_, nullable := cr.indexFromName["null"]
numConcreteTypes := len(cr.allowedTypes)
if nullable {
numConcreteTypes -= 1
}
return numConcreteTypes
}

// firstConcreteTypeCodec returns the first non-null codec
func (cr codecInfo) firstConcreteTypeCodec() *Codec {
for k, v := range cr.codecFromName {
if k == "null" {
continue
}
return v
}
return nil
}

// Union wraps a datum value in a map for encoding as a Union, as required by
Expand Down Expand Up @@ -124,6 +152,13 @@ func unionBinaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([
}
return longBinaryFromNative(buf, index)
case map[string]interface{}:
if cr.unambiguousMode && cr.isNullable() && cr.numConcreteTypes() == 1 {
c := cr.firstConcreteTypeCodec()
index := cr.indexFromName[c.typeName.fullName]
buf, _ = longBinaryFromNative(buf, index)
return c.binaryFromNative(buf, datum)
}

if len(v) != 1 {
return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
Expand All @@ -138,6 +173,14 @@ func unionBinaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([
return c.binaryFromNative(buf, value)
}
}

if cr.unambiguousMode && cr.isNullable() && cr.numConcreteTypes() == 1 {
c := cr.firstConcreteTypeCodec()
index := cr.indexFromName[c.typeName.fullName]
buf, _ = longBinaryFromNative(buf, index)
return c.binaryFromNative(buf, datum)
}

return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
Expand All @@ -163,8 +206,7 @@ func unionTextualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) (
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
_, ok := cr.indexFromName["null"]
if !ok {
if !cr.isNullable() {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
return append(buf, "null"...), nil
Expand All @@ -178,19 +220,24 @@ func unionTextualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) (
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
buf = append(buf, '{')
var err error
buf, err = stringTextualFromNative(buf, key)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
if !cr.unambiguousMode || cr.numConcreteTypes() > 1 {
buf = append(buf, '{')
buf, err = stringTextualFromNative(buf, key)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
}
buf = append(buf, ':')
}
buf = append(buf, ':')
c := cr.codecFromIndex[index]
buf, err = c.textualFromNative(buf, value)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
}
return append(buf, '}'), nil
if !cr.unambiguousMode || cr.numConcreteTypes() > 1 {
buf = append(buf, '}')
}
return buf, nil
}
}
return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
Expand All @@ -200,8 +247,7 @@ func textualJSONFromNativeAvro(cr *codecInfo) func(buf []byte, datum interface{}
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
_, ok := cr.indexFromName["null"]
if !ok {
if !cr.isNullable() {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
return append(buf, "null"...), nil
Expand Down Expand Up @@ -301,6 +347,32 @@ func buildCodecForTypeDescribedBySliceOneWayJSON(st map[string]*Codec, enclosing
}
return rv, nil
}
func buildCodecForTypeDescribedBySliceUnambiguousJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}

cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb)
cr.unambiguousMode = true
if err != nil {
return nil, err
}

rv := &Codec{
// NOTE: To support record field default values, union schema set to the
// type name of first member
// TODO: add/change to schemaCanonical below
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJSON(&cr),
textualFromNative: unionTextualFromNative(&cr),
}
return rv, nil
}

func buildCodecForTypeDescribedBySliceTwoWayJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
Expand Down Expand Up @@ -340,6 +412,11 @@ func checkAll(allowedTypes []string, cr *codecInfo, buf []byte) (interface{}, []
if err != nil {
continue
}

// in unambiguous mode, don't return the type if only a single concrete type is registered
if cr.unambiguousMode && cr.numConcreteTypes() == 1 {
return rv, rb, nil
}
return map[string]interface{}{name: rv}, rb, nil
}
return nil, buf, fmt.Errorf("could not decode any json data in input %v", string(buf))
Expand Down Expand Up @@ -405,11 +482,59 @@ func nativeAvroFromTextualJSON(cr *codecInfo) func(buf []byte) (interface{}, []b
sort.Strings(cr.allowedTypes)

case map[string]interface{}:
if cr.unambiguousMode && cr.numConcreteTypes() > 1 {
asmap, ok := m.(map[string]interface{}) // we know this cast cannot fail
if !ok || len(asmap) != 1 {
return nil, buf, fmt.Errorf("expected map with a single key, got: %v", string(buf))
}

var name string
var value []byte
for _name, _value := range asmap {
name = _name
var err error
value, err = json.Marshal(_value)
if err != nil {
return nil, buf, fmt.Errorf("could not read value of type as []byte: %v", _value)
}
}

index, ok := cr.indexFromName[name]
if !ok {
return nil, buf, fmt.Errorf("invalid type: %v", name)
}

c := cr.codecFromIndex[index]
rv, _, err := c.NativeFromTextual(value)
if err != nil {
return nil, buf, fmt.Errorf("could not decode json data in input: %v: %v", string(buf), err)
}
return map[string]interface{}{name: rv}, buf[dec.InputOffset():], nil
}

// try to decode it as a map
// because a map should fail faster than a record
// if that fails assume record and return it
sort.Strings(cr.allowedTypes)
case interface{}:
// if running in unambiguous mode, allow a nullable (NULL, T) type to be checked
if cr.unambiguousMode && cr.numConcreteTypes() == 2 {
// get T
var index int
for _key, _index := range cr.indexFromName {
if _key != "null" {
index = _index
break
}
}

c := cr.codecFromIndex[index]
rv, _, err := c.NativeFromTextual(buf)
if err != nil {
return nil, buf, fmt.Errorf("could not decode json data in input: %v: %v", string(buf), err)
}
return rv, buf[dec.InputOffset():], nil
}
}

return checkAll(allowedTypes, cr, buf)
Expand Down
Loading