Skip to content

Commit

Permalink
Initial Version
Browse files Browse the repository at this point in the history
  • Loading branch information
hramc committed Dec 11, 2018
1 parent 41a1d9f commit d9f5138
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/target/
/.settings/
/.classpath
/.project
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
# azure-eventhub-javaexample
Java application to send and receive JSON message from Azure Event Hub


The Project has 2 sample code

1. Sample code to send JSON Message to the eventhub
2. Sample code to listen message from the eventhub using eventhub processor host (EPH) libaray.

Happy Learning.

40 changes: 40 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>azure-eventhub-javaexample</groupId>
<artifactId>azure-eventhub-javaexample</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.2.1</version>
</dependency>

<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>2.0.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
37 changes: 37 additions & 0 deletions src/main/java/edu/azure/constants/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package edu.azure.constants;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;

public interface Constants {

// Eventhub Namespace
String EH_NAMESPACE="<--Evenhub Name Space-->";
String EH_NAME = "<-- Event hub Name -->";

String SENDER_SAS_KEY_NAME = "<-- Sender SAS Key Name -->";
String SENDER_SAS_KEY="<-- Sender SAS KEY (Primary Key) -->";
String SAMPLE_FILE_NAME = "sample.json";

String LISTEN_SAS_KEY_NAME = "<-- Listener SAS Key Name -->";
String LISTEN_SAS_KEY="<-- Listener SAS KEY (Primary Key) -->";
String CONSUMER_GROUP="<-- consumer group (required for listerner) -->";

// Azure Storage information required for Event hub listener to do checkpoint
String AZURE_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<primaryKey>;EndpointSuffix=core.windows.net";
String AZURE_STORAGE_CONTAINER_NAME = "<-- container name -->";



final ConnectionStringBuilder SENDER_CONNECTION_STRING = new ConnectionStringBuilder()
.setNamespaceName(EH_NAMESPACE)
.setEventHubName(EH_NAME)
.setSasKeyName(SENDER_SAS_KEY_NAME)
.setSasKey(SENDER_SAS_KEY);

final ConnectionStringBuilder LISTENER_CONNECTION_STRING = new ConnectionStringBuilder()
.setNamespaceName(EH_NAMESPACE)
.setEventHubName(EH_NAME)
.setSasKeyName(LISTEN_SAS_KEY_NAME)
.setSasKey(LISTEN_SAS_KEY);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package edu.azure.eventhub.receiver.eph;

import java.util.function.Consumer;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;

public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
{
@Override
public void accept(ExceptionReceivedEventArgs t)
{
System.out.println("Event Hub Reader: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
}
}
79 changes: 79 additions & 0 deletions src/main/java/edu/azure/eventhub/receiver/eph/EventProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package edu.azure.eventhub.receiver.eph;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

/**
*
* @author hramc
*
* Listener Class of Event hub
*
* It will get execute for each partition & messages.
*
*/
public class EventProcessor implements IEventProcessor
{

private long checkpointBatchingCount = 0;

// OnOpen is called when a new event processor instance is created by the host.
@Override
public void onOpen(PartitionContext context) throws Exception
{
System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " is opening");
}

// OnClose is called when an event processor instance is being shut down.
@Override
public void onClose(PartitionContext context, CloseReason reason) throws Exception
{
System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
}

// onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
@Override
public void onError(PartitionContext context, Throwable error)
{
System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " onError: " + error.toString());
}

// onEvents is called when events are received on this partition of the Event Hub.
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
{
System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " got event batch");
int eventCount = 0;
for (EventData data : events)
{
try
{
System.out.println("Event Hub Reader (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
eventCount++;



// Checkpointing persists the current position in the event stream for this partition and means that the next
// time any host opens an event processor on this event hub+consumer group+partition combination, it will start
// receiving at the event after this one.
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 5) == 0)
{

// Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
// before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
context.checkpoint(data).get();

}
}
catch (Exception e)
{
System.out.println("Processing failed for an event: " + e.toString());
}
}
System.out.println("Event Hub Reader: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package edu.azure.eventhub.receiver.eph;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;

import edu.azure.constants.Constants;

/**
*
* @author hramc
*
* Class to receive data from Event hub. It uses Eventhub processor Host azure library.
*
*/
public class EventProcessorReciever
{
public static void main(String args[]) throws InterruptedException, ExecutionException, UnknownHostException
{
// Get the Host name for our reference
String HOST_NAME = InetAddress.getLocalHost().getHostName();

// Initialize the Eventhub processor host
final EventProcessorHost eventHubProcessorHost = new EventProcessorHost(
EventProcessorHost.createHostName(HOST_NAME),
Constants.EH_NAME,
Constants.CONSUMER_GROUP,
Constants.LISTENER_CONNECTION_STRING.toString(),
Constants.AZURE_STORAGE_CONNECTION_STRING,
Constants.AZURE_STORAGE_CONTAINER_NAME);


System.out.println("Registering host named " + eventHubProcessorHost.getHostName());

// Event hub options
EventProcessorOptions options = new EventProcessorOptions();
options.setExceptionNotification(new ErrorNotificationHandler());
options.setMaxBatchSize(1);

/**
* whenComplete - code will get execute after register the event processor listener.
* if any exception in initializing in listener, we will get to know.
*
* thenAccept - this code will be executed after registering the listener.
* here we are waiting in a loop. Till it get satisify, listener will read the data from the eventhub
*
* thenCompose - it will be last step of unregistering the listener.
*
*/
eventHubProcessorHost.registerEventProcessor(EventProcessor.class, options)
.whenComplete((unused, e) ->
{
if (e != null)
{
System.out.println("Failure while registering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
}
})
.thenAccept((unused) ->
{
System.out.println("Press any key to Stop");
try
{
System.in.read();
}
catch (Exception e)
{
System.out.println("Keyboard read failed: " + e.toString());
}
})
.thenCompose((unused) ->
{
return eventHubProcessorHost.unregisterEventProcessor();
})
.exceptionally((e) ->
{
System.out.println("Failure while unregistering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
return null;
})
.get(); // Wait for everything to finish before exiting main!

System.out.println("End of Reading Data");
}
}
51 changes: 51 additions & 0 deletions src/main/java/edu/azure/eventhub/sender/Sender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package edu.azure.eventhub.sender;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import edu.azure.constants.Constants;

/**
*
* @author hramc
*
* Class to send a message into event hub
*
*/
public class Sender {

public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {

// Executor Service required by the Eventhub client
ExecutorService executorService = Executors.newFixedThreadPool(1);

// Eventhub client
final EventHubClient ehClient = EventHubClient.createSync(Constants.SENDER_CONNECTION_STRING.toString(), executorService);

// Send JSON Data Data
ehClient.sendSync(EventData.create(new Gson().fromJson(new FileReader(
new File(Sender.class.getClassLoader().getResource(Constants.SAMPLE_FILE_NAME).getFile())),
JsonObject.class).
toString().getBytes(Charset.defaultCharset())));

// close the client at the end of your program
ehClient.closeSync();

// Shutdown the executor service
executorService.shutdown();

}

}
3 changes: 3 additions & 0 deletions src/main/resources/sample.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name" : "hrmac"
}

0 comments on commit d9f5138

Please sign in to comment.