+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package io.cloudevents.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import io.cloudevents.avro.AvroCloudEventData;
+import io.cloudevents.CloudEventData;
+import io.cloudevents.core.format.EventDeserializationException;
+
+/**
+ * Encode JSON style cloudevent data into Avro format.
+ *
+ */
+public class AvroCloudEventDataWrapper implements CloudEventData {
+
+ private final AvroCloudEventData avroCloudEventData;
+
+ /**
+ * Wraps a JSON object-like data structure.
+ */
+ public AvroCloudEventDataWrapper(Map data) {
+ avroCloudEventData = new AvroCloudEventData();
+ avroCloudEventData.setValue(Objects.requireNonNull(data));
+ }
+
+ @Override
+ public byte[] toBytes() {
+ try (ByteArrayOutputStream bytes = new ByteArrayOutputStream()) {
+ AvroCloudEventData.getEncoder().encode(this.avroCloudEventData, bytes);
+ return bytes.toByteArray();
+ } catch (IOException e) {
+ throw new EventDeserializationException(e);
+ }
+ }
+}
diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java
new file mode 100644
index 000000000..d8690d001
--- /dev/null
+++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.avro;
+
+import java.util.Map;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.time.OffsetDateTime;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.avro.AvroCloudEvent;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventReader;
+import io.cloudevents.rw.CloudEventDataMapper;
+import io.cloudevents.rw.CloudEventWriter;
+import io.cloudevents.rw.CloudEventWriterFactory;
+
+class AvroDeserializer implements CloudEventReader {
+
+ private final AvroCloudEvent avroCloudEvent;
+
+ public AvroDeserializer(AvroCloudEvent avroCloudEvent) {
+ this.avroCloudEvent = avroCloudEvent;
+ }
+
+ @Override
+ public , R> R read(CloudEventWriterFactory writerFactory,
+ CloudEventDataMapper extends CloudEventData> mapper) throws CloudEventRWException {
+ Map avroCloudEventAttrs = this.avroCloudEvent.getAttribute();
+ SpecVersion specVersion = SpecVersion.parse((String)avroCloudEventAttrs.get(CloudEventV1.SPECVERSION));
+ final CloudEventWriter writer = writerFactory.create(specVersion);
+
+ for (Map.Entry entry: avroCloudEventAttrs.entrySet()) {
+ String key = entry.getKey().toString();
+
+ switch(key) {
+ case CloudEventV1.SPECVERSION:
+ continue;
+ case CloudEventV1.TIME: {
+ // OffsetDateTime
+ OffsetDateTime value = OffsetDateTime.parse((String) entry.getValue());
+ writer.withContextAttribute(key, value);
+ };
+ case CloudEventV1.DATASCHEMA: {
+ // URI
+ URI value = URI.create((String) entry.getValue());
+ writer.withContextAttribute(key, value);
+ };
+ default:
+ writer.withContextAttribute(key, (String) entry.getValue());
+ }
+ }
+
+ ByteBuffer buffer = (ByteBuffer) this.avroCloudEvent.getData();
+
+ if (buffer != null) {
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data);
+ return writer.end(mapper.map(BytesCloudEventData.wrap(data)));
+ } else {
+ return writer.end();
+ }
+ }
+
+}
diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java
new file mode 100644
index 000000000..39fc774a9
--- /dev/null
+++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventData;
+import io.cloudevents.avro.AvroCloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.EventDeserializationException;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.format.EventSerializationException;
+import io.cloudevents.rw.CloudEventDataMapper;
+
+public class AvroFormat implements EventFormat {
+
+ public static final String AVRO_CONTENT_TYPE = "application/avro";
+
+ @Override
+ public byte[] serialize(CloudEvent event) throws EventSerializationException {
+ AvroCloudEvent avroCloudEvent = AvroSerializer.toAvro(event);
+
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+ AvroCloudEvent.getEncoder().encode(avroCloudEvent, output);
+ return output.toByteArray();
+ } catch (IOException e) {
+ throw new EventSerializationException(e);
+ }
+ }
+
+ @Override
+ public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper extends CloudEventData> mapper)
+ throws EventDeserializationException {
+ try (ByteArrayInputStream input = new ByteArrayInputStream(bytes)) {
+ AvroCloudEvent avroCloudEvent = AvroCloudEvent.getDecoder().decode(input);
+
+ return new AvroDeserializer(avroCloudEvent).read(CloudEventBuilder::fromSpecVersion, mapper);
+ } catch (IOException e) {
+ throw new EventDeserializationException(e);
+ }
+ }
+
+ @Override
+ public String serializedContentType() {
+ return AVRO_CONTENT_TYPE;
+ }
+}
diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java
new file mode 100644
index 000000000..67d34234e
--- /dev/null
+++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.cloudevents.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventData;
+import io.cloudevents.core.v1.CloudEventV1;
+import io.cloudevents.avro.AvroCloudEvent;
+
+class AvroSerializer {
+
+ static final AvroCloudEvent toAvro(CloudEvent e) {
+ AvroCloudEvent avroCloudEvent = new AvroCloudEvent();
+
+ Map attrs = new HashMap<>();
+
+ attrs.put(CloudEventV1.SPECVERSION, e.getSpecVersion().toString());
+ attrs.put(CloudEventV1.TYPE, e.getType());
+ attrs.put(CloudEventV1.ID, e.getId());
+ attrs.put(CloudEventV1.SOURCE, e.getSource());
+
+ if (e.getTime() != null) {
+ // convert to string
+ attrs.put(CloudEventV1.TIME, e.getTime().toString());
+ }
+
+ if (e.getDataSchema() != null) {
+ // convert
+ attrs.put(CloudEventV1.DATASCHEMA, e.getDataSchema().toString());
+ }
+
+ attrs.put(CloudEventV1.SUBJECT, e.getSubject());
+ attrs.put(CloudEventV1.DATACONTENTTYPE, e.getDataContentType());
+
+ avroCloudEvent.setAttribute(attrs);
+
+ // check datacontenttype
+ CloudEventData cloudEventData = e.getData();
+ if (cloudEventData != null) {
+ avroCloudEvent.setData(ByteBuffer.wrap(cloudEventData.toBytes()));
+ }
+
+ return avroCloudEvent;
+ }
+}
diff --git a/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat b/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat
new file mode 100644
index 000000000..31cb85d6d
--- /dev/null
+++ b/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat
@@ -0,0 +1 @@
+io.cloudevents.avro.AvroFormat
diff --git a/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java
new file mode 100644
index 000000000..2e7de183b
--- /dev/null
+++ b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2018-Present The CloudEvents Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *