diff --git a/formats/avro/README.md b/formats/avro/README.md new file mode 100644 index 000000000..d15345650 --- /dev/null +++ b/formats/avro/README.md @@ -0,0 +1,4 @@ +# CloudEvents Avro Format + +This project provides functionality for the Java SDK to handle the +[avro format](https://github.com/cloudevents/spec/blob/v1.0.1/avro-format.md). diff --git a/formats/avro/pom.xml b/formats/avro/pom.xml new file mode 100644 index 000000000..bd730296e --- /dev/null +++ b/formats/avro/pom.xml @@ -0,0 +1,121 @@ + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 2.3.0-SNAPSHOT + ../../pom.xml + + + cloudevents-avro + CloudEvents - Avro + jar + + + 1.9.2 + 3.2.0 + io.cloudevents.formats.avro + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/avro/ + ${project.build.directory}/generated-sources/java/ + String + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build.helper.version} + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/java/ + + + + + + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + org.apache.avro + avro + ${avro.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + + diff --git a/formats/avro/src/main/avro/spec.avsc b/formats/avro/src/main/avro/spec.avsc new file mode 100644 index 000000000..8b4a76c6c --- /dev/null +++ b/formats/avro/src/main/avro/spec.avsc @@ -0,0 +1,63 @@ +{ + "namespace":"io.cloudevents.avro", + "type":"record", + "name":"AvroCloudEvent", + "version":"1.0", + "doc":"Avro Event Format for CloudEvents", + "fields":[ + { + "name":"attribute", + "type":{ + "type":"map", + "values":[ + "null", + "boolean", + "int", + "string", + "bytes" + ] + } + }, + { + "name": "data", + "type": [ + "bytes", + "null", + "boolean", + { + "type": "map", + "values": [ + "null", + "boolean", + { + "type": "record", + "name": "AvroCloudEventData", + "doc": "Representation of a JSON Value", + "fields": [ + { + "name": "value", + "type": { + "type": "map", + "values": [ + "null", + "boolean", + { "type": "map", "values": "AvroCloudEventData" }, + { "type": "array", "items": "AvroCloudEventData" }, + "double", + "string" + ] + } + } + ] + }, + "double", + "string" + ] + }, + { "type": "array", "items": "AvroCloudEventData" }, + "double", + "string" + ] + } + ] +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java new file mode 100644 index 000000000..5dec5ae57 --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.avro; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import io.cloudevents.avro.AvroCloudEventData; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.format.EventDeserializationException; + +/** + * Encode JSON style cloudevent data into Avro format. + * + */ +public class AvroCloudEventDataWrapper implements CloudEventData { + + private final AvroCloudEventData avroCloudEventData; + + /** + * Wraps a JSON object-like data structure. + */ + public AvroCloudEventDataWrapper(Map data) { + avroCloudEventData = new AvroCloudEventData(); + avroCloudEventData.setValue(Objects.requireNonNull(data)); + } + + @Override + public byte[] toBytes() { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream()) { + AvroCloudEventData.getEncoder().encode(this.avroCloudEventData, bytes); + return bytes.toByteArray(); + } catch (IOException e) { + throw new EventDeserializationException(e); + } + } +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java new file mode 100644 index 000000000..d8690d001 --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -0,0 +1,83 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.avro; + +import java.util.Map; +import java.net.URI; +import java.nio.ByteBuffer; +import java.time.OffsetDateTime; + +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.avro.AvroCloudEvent; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventReader; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.rw.CloudEventWriter; +import io.cloudevents.rw.CloudEventWriterFactory; + +class AvroDeserializer implements CloudEventReader { + + private final AvroCloudEvent avroCloudEvent; + + public AvroDeserializer(AvroCloudEvent avroCloudEvent) { + this.avroCloudEvent = avroCloudEvent; + } + + @Override + public , R> R read(CloudEventWriterFactory writerFactory, + CloudEventDataMapper mapper) throws CloudEventRWException { + Map avroCloudEventAttrs = this.avroCloudEvent.getAttribute(); + SpecVersion specVersion = SpecVersion.parse((String)avroCloudEventAttrs.get(CloudEventV1.SPECVERSION)); + final CloudEventWriter writer = writerFactory.create(specVersion); + + for (Map.Entry entry: avroCloudEventAttrs.entrySet()) { + String key = entry.getKey().toString(); + + switch(key) { + case CloudEventV1.SPECVERSION: + continue; + case CloudEventV1.TIME: { + // OffsetDateTime + OffsetDateTime value = OffsetDateTime.parse((String) entry.getValue()); + writer.withContextAttribute(key, value); + }; + case CloudEventV1.DATASCHEMA: { + // URI + URI value = URI.create((String) entry.getValue()); + writer.withContextAttribute(key, value); + }; + default: + writer.withContextAttribute(key, (String) entry.getValue()); + } + } + + ByteBuffer buffer = (ByteBuffer) this.avroCloudEvent.getData(); + + if (buffer != null) { + byte[] data = new byte[buffer.remaining()]; + buffer.get(data); + return writer.end(mapper.map(BytesCloudEventData.wrap(data))); + } else { + return writer.end(); + } + } + +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java new file mode 100644 index 000000000..39fc774a9 --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java @@ -0,0 +1,65 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.avro; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.avro.AvroCloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventDeserializationException; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.rw.CloudEventDataMapper; + +public class AvroFormat implements EventFormat { + + public static final String AVRO_CONTENT_TYPE = "application/avro"; + + @Override + public byte[] serialize(CloudEvent event) throws EventSerializationException { + AvroCloudEvent avroCloudEvent = AvroSerializer.toAvro(event); + + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + AvroCloudEvent.getEncoder().encode(avroCloudEvent, output); + return output.toByteArray(); + } catch (IOException e) { + throw new EventSerializationException(e); + } + } + + @Override + public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) + throws EventDeserializationException { + try (ByteArrayInputStream input = new ByteArrayInputStream(bytes)) { + AvroCloudEvent avroCloudEvent = AvroCloudEvent.getDecoder().decode(input); + + return new AvroDeserializer(avroCloudEvent).read(CloudEventBuilder::fromSpecVersion, mapper); + } catch (IOException e) { + throw new EventDeserializationException(e); + } + } + + @Override + public String serializedContentType() { + return AVRO_CONTENT_TYPE; + } +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java new file mode 100644 index 000000000..67d34234e --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.avro; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.avro.AvroCloudEvent; + +class AvroSerializer { + + static final AvroCloudEvent toAvro(CloudEvent e) { + AvroCloudEvent avroCloudEvent = new AvroCloudEvent(); + + Map attrs = new HashMap<>(); + + attrs.put(CloudEventV1.SPECVERSION, e.getSpecVersion().toString()); + attrs.put(CloudEventV1.TYPE, e.getType()); + attrs.put(CloudEventV1.ID, e.getId()); + attrs.put(CloudEventV1.SOURCE, e.getSource()); + + if (e.getTime() != null) { + // convert to string + attrs.put(CloudEventV1.TIME, e.getTime().toString()); + } + + if (e.getDataSchema() != null) { + // convert + attrs.put(CloudEventV1.DATASCHEMA, e.getDataSchema().toString()); + } + + attrs.put(CloudEventV1.SUBJECT, e.getSubject()); + attrs.put(CloudEventV1.DATACONTENTTYPE, e.getDataContentType()); + + avroCloudEvent.setAttribute(attrs); + + // check datacontenttype + CloudEventData cloudEventData = e.getData(); + if (cloudEventData != null) { + avroCloudEvent.setData(ByteBuffer.wrap(cloudEventData.toBytes())); + } + + return avroCloudEvent; + } +} diff --git a/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat b/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat new file mode 100644 index 000000000..31cb85d6d --- /dev/null +++ b/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat @@ -0,0 +1 @@ +io.cloudevents.avro.AvroFormat diff --git a/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java new file mode 100644 index 000000000..2e7de183b --- /dev/null +++ b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java @@ -0,0 +1,98 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.avro; + +import java.util.Map; +import java.util.HashMap; +import java.net.URI; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; + +public class AvroFormatTest { + + public static Map testData = new HashMap<>(); + + static { + testData.put("name", "Ning"); + testData.put("age", 22.0); + } + + @Test + public void testSerde() { + EventFormat avroFormat = new AvroFormat(); + CloudEventData cloudEventData = new AvroCloudEventDataWrapper(testData); + + assertThat(cloudEventData).isNotNull(); + assertThat(cloudEventData.toBytes()).isNotNull(); + + CloudEvent cloudEvent = CloudEventBuilder.v1() + .withId("1") + .withType("testdata") + .withSource(URI.create("http://localhost/test")) + .withData("application/avro", cloudEventData) + .build(); + assertThat(cloudEvent).isNotNull(); + assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1); + + byte[] bytes = avroFormat.serialize(cloudEvent); + + assertThat(bytes).isNotNull(); + assertThat(bytes).hasSizeGreaterThan(0); + + CloudEvent cloudEvent2 = avroFormat.deserialize(bytes); + + assertThat(cloudEvent2).isNotNull(); + assertThat(cloudEvent2.getId()).isEqualTo("1"); + assertThat(cloudEvent2.getType()).isEqualTo("testdata"); + } + + @Test + public void testV03Serde() { + EventFormat avroFormat = new AvroFormat(); + CloudEventData cloudEventData = new AvroCloudEventDataWrapper(testData); + + assertThat(cloudEventData).isNotNull(); + assertThat(cloudEventData.toBytes()).isNotNull(); + + CloudEvent cloudEvent = CloudEventBuilder.v03() + .withId("1") + .withType("testdata") + .withSource(URI.create("http://localhost/test")) + .withData("application/avro", cloudEventData) + .build(); + assertThat(cloudEvent).isNotNull(); + assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V03); + + byte[] bytes = avroFormat.serialize(cloudEvent); + + assertThat(bytes).isNotNull(); + assertThat(bytes).hasSizeGreaterThan(0); + + CloudEvent cloudEvent2 = avroFormat.deserialize(bytes); + + assertThat(cloudEvent2).isNotNull(); + assertThat(cloudEvent2.getId()).isEqualTo("1"); + assertThat(cloudEvent2.getType()).isEqualTo("testdata"); + } + +} diff --git a/pom.xml b/pom.xml index a124a66aa..4e8f961ed 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ core formats/json-jackson formats/protobuf + formats/avro amqp http/basic http/vertx