diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8bd3a05 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/target/ +/.settings/ +/.classpath +/.project diff --git a/README.md b/README.md index b289983..724b307 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,11 @@ # azure-eventhub-javaexample Java application to send and receive JSON message from Azure Event Hub + + +The Project has 2 sample code + +1. Sample code to send JSON Message to the eventhub +2. Sample code to listen message from the eventhub using eventhub processor host (EPH) libaray. + +Happy Learning. + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..63d6b10 --- /dev/null +++ b/pom.xml @@ -0,0 +1,40 @@ + + 4.0.0 + azure-eventhub-javaexample + azure-eventhub-javaexample + 0.0.1-SNAPSHOT + + + + com.microsoft.azure + azure-eventhubs + 1.2.1 + + + + com.microsoft.azure + azure-eventhubs-eph + 2.0.1 + + + + + com.google.code.gson + gson + 2.8.5 + + + + + + + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + + \ No newline at end of file diff --git a/src/main/java/edu/azure/constants/Constants.java b/src/main/java/edu/azure/constants/Constants.java new file mode 100644 index 0000000..f0b03e5 --- /dev/null +++ b/src/main/java/edu/azure/constants/Constants.java @@ -0,0 +1,37 @@ +package edu.azure.constants; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; + +public interface Constants { + + // Eventhub Namespace + String EH_NAMESPACE="<--Evenhub Name Space-->"; + String EH_NAME = "<-- Event hub Name -->"; + + String SENDER_SAS_KEY_NAME = "<-- Sender SAS Key Name -->"; + String SENDER_SAS_KEY="<-- Sender SAS KEY (Primary Key) -->"; + String SAMPLE_FILE_NAME = "sample.json"; + + String LISTEN_SAS_KEY_NAME = "<-- Listener SAS Key Name -->"; + String LISTEN_SAS_KEY="<-- Listener SAS KEY (Primary Key) -->"; + String CONSUMER_GROUP="<-- consumer group (required for listerner) -->"; + + // Azure Storage information required for Event hub listener to do checkpoint + String AZURE_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=;AccountKey=;EndpointSuffix=core.windows.net"; + String AZURE_STORAGE_CONTAINER_NAME = "<-- container name -->"; + + + + final ConnectionStringBuilder SENDER_CONNECTION_STRING = new ConnectionStringBuilder() + .setNamespaceName(EH_NAMESPACE) + .setEventHubName(EH_NAME) + .setSasKeyName(SENDER_SAS_KEY_NAME) + .setSasKey(SENDER_SAS_KEY); + + final ConnectionStringBuilder LISTENER_CONNECTION_STRING = new ConnectionStringBuilder() + .setNamespaceName(EH_NAMESPACE) + .setEventHubName(EH_NAME) + .setSasKeyName(LISTEN_SAS_KEY_NAME) + .setSasKey(LISTEN_SAS_KEY); + +} diff --git a/src/main/java/edu/azure/eventhub/receiver/eph/ErrorNotificationHandler.java b/src/main/java/edu/azure/eventhub/receiver/eph/ErrorNotificationHandler.java new file mode 100644 index 0000000..79ef545 --- /dev/null +++ b/src/main/java/edu/azure/eventhub/receiver/eph/ErrorNotificationHandler.java @@ -0,0 +1,13 @@ +package edu.azure.eventhub.receiver.eph; + +import java.util.function.Consumer; +import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs; + +public class ErrorNotificationHandler implements Consumer +{ + @Override + public void accept(ExceptionReceivedEventArgs t) + { + System.out.println("Event Hub Reader: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString()); + } +} diff --git a/src/main/java/edu/azure/eventhub/receiver/eph/EventProcessor.java b/src/main/java/edu/azure/eventhub/receiver/eph/EventProcessor.java new file mode 100644 index 0000000..073533b --- /dev/null +++ b/src/main/java/edu/azure/eventhub/receiver/eph/EventProcessor.java @@ -0,0 +1,79 @@ +package edu.azure.eventhub.receiver.eph; + +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventprocessorhost.CloseReason; +import com.microsoft.azure.eventprocessorhost.IEventProcessor; +import com.microsoft.azure.eventprocessorhost.PartitionContext; + +/** + * + * @author hramc + * + * Listener Class of Event hub + * + * It will get execute for each partition & messages. + * + */ +public class EventProcessor implements IEventProcessor +{ + + private long checkpointBatchingCount = 0; + + // OnOpen is called when a new event processor instance is created by the host. + @Override + public void onOpen(PartitionContext context) throws Exception + { + System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " is opening"); + } + + // OnClose is called when an event processor instance is being shut down. + @Override + public void onClose(PartitionContext context, CloseReason reason) throws Exception + { + System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString()); + } + + // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure. + @Override + public void onError(PartitionContext context, Throwable error) + { + System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " onError: " + error.toString()); + } + + // onEvents is called when events are received on this partition of the Event Hub. + @Override + public void onEvents(PartitionContext context, Iterable events) throws Exception + { + System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " got event batch"); + int eventCount = 0; + for (EventData data : events) + { + try + { + System.out.println("Event Hub Reader (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," + + data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8")); + eventCount++; + + + + // Checkpointing persists the current position in the event stream for this partition and means that the next + // time any host opens an event processor on this event hub+consumer group+partition combination, it will start + // receiving at the event after this one. + this.checkpointBatchingCount++; + if ((checkpointBatchingCount % 5) == 0) + { + + // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing + // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering. + context.checkpoint(data).get(); + + } + } + catch (Exception e) + { + System.out.println("Processing failed for an event: " + e.toString()); + } + } + System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner()); + } +} diff --git a/src/main/java/edu/azure/eventhub/receiver/eph/EventProcessorReciever.java b/src/main/java/edu/azure/eventhub/receiver/eph/EventProcessorReciever.java new file mode 100644 index 0000000..7871f1b --- /dev/null +++ b/src/main/java/edu/azure/eventhub/receiver/eph/EventProcessorReciever.java @@ -0,0 +1,94 @@ +package edu.azure.eventhub.receiver.eph; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutionException; + +import com.microsoft.azure.eventprocessorhost.EventProcessorHost; +import com.microsoft.azure.eventprocessorhost.EventProcessorOptions; + +import edu.azure.constants.Constants; + +/** + * + * @author hramc + * + * Class to receive data from Event hub. It uses Eventhub processor Host azure library. + * + */ +public class EventProcessorReciever +{ + public static void main(String args[]) throws InterruptedException, ExecutionException, UnknownHostException + { + // Get the Host name for our reference + String HOST_NAME = InetAddress.getLocalHost().getHostName(); + + // Initialize the Eventhub processor host + final EventProcessorHost eventHubProcessorHost = new EventProcessorHost( + EventProcessorHost.createHostName(HOST_NAME), + Constants.EH_NAME, + Constants.CONSUMER_GROUP, + Constants.LISTENER_CONNECTION_STRING.toString(), + Constants.AZURE_STORAGE_CONNECTION_STRING, + Constants.AZURE_STORAGE_CONTAINER_NAME); + + + System.out.println("Registering host named " + eventHubProcessorHost.getHostName()); + + // Event hub options + EventProcessorOptions options = new EventProcessorOptions(); + options.setExceptionNotification(new ErrorNotificationHandler()); + options.setMaxBatchSize(1); + + /** + * whenComplete - code will get execute after register the event processor listener. + * if any exception in initializing in listener, we will get to know. + * + * thenAccept - this code will be executed after registering the listener. + * here we are waiting in a loop. Till it get satisify, listener will read the data from the eventhub + * + * thenCompose - it will be last step of unregistering the listener. + * + */ + eventHubProcessorHost.registerEventProcessor(EventProcessor.class, options) + .whenComplete((unused, e) -> + { + if (e != null) + { + System.out.println("Failure while registering: " + e.toString()); + if (e.getCause() != null) + { + System.out.println("Inner exception: " + e.getCause().toString()); + } + } + }) + .thenAccept((unused) -> + { + System.out.println("Press any key to Stop"); + try + { + System.in.read(); + } + catch (Exception e) + { + System.out.println("Keyboard read failed: " + e.toString()); + } + }) + .thenCompose((unused) -> + { + return eventHubProcessorHost.unregisterEventProcessor(); + }) + .exceptionally((e) -> + { + System.out.println("Failure while unregistering: " + e.toString()); + if (e.getCause() != null) + { + System.out.println("Inner exception: " + e.getCause().toString()); + } + return null; + }) + .get(); // Wait for everything to finish before exiting main! + + System.out.println("End of Reading Data"); + } +} diff --git a/src/main/java/edu/azure/eventhub/sender/Sender.java b/src/main/java/edu/azure/eventhub/sender/Sender.java new file mode 100644 index 0000000..619b63f --- /dev/null +++ b/src/main/java/edu/azure/eventhub/sender/Sender.java @@ -0,0 +1,51 @@ +package edu.azure.eventhub.sender; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; + +import edu.azure.constants.Constants; + +/** + * + * @author hramc + * + * Class to send a message into event hub + * + */ +public class Sender { + + public static void main(String[] args) + throws EventHubException, ExecutionException, InterruptedException, IOException { + + // Executor Service required by the Eventhub client + ExecutorService executorService = Executors.newFixedThreadPool(1); + + // Eventhub client + final EventHubClient ehClient = EventHubClient.createSync(Constants.SENDER_CONNECTION_STRING.toString(), executorService); + + // Send JSON Data Data + ehClient.sendSync(EventData.create(new Gson().fromJson(new FileReader( + new File(Sender.class.getClassLoader().getResource(Constants.SAMPLE_FILE_NAME).getFile())), + JsonObject.class). + toString().getBytes(Charset.defaultCharset()))); + + // close the client at the end of your program + ehClient.closeSync(); + + // Shutdown the executor service + executorService.shutdown(); + + } + + } diff --git a/src/main/resources/sample.json b/src/main/resources/sample.json new file mode 100644 index 0000000..b9fe243 --- /dev/null +++ b/src/main/resources/sample.json @@ -0,0 +1,3 @@ +{ + "name" : "hrmac" +} \ No newline at end of file