Skip to content

Commit

Permalink
Data Cleanup
Browse files Browse the repository at this point in the history
Added DataMapper, removed Doubles from code and replaced with Long for
Price and Quantity (decimal offset to be determined)
  • Loading branch information
john authored and john committed Sep 13, 2023
1 parent ac784e0 commit 8dc936b
Show file tree
Hide file tree
Showing 24 changed files with 315 additions and 146 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.alignmentsystems.matching;
/******************************************************************************
*
* Author : John Greenan
* Contact : sales@alignment-systems.com
* Date : 13th September 2023
* Copyright : Alignment Systems Ltd 2023
* Project : Alignment Matching Toy
* Artefact : DataMapper
* Description :
*****************************************************************************/
import java.util.HashMap;

/**
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
*
*/
public class DataMapper {
private HashMap<String, Long> memberFIXSenderCompIdToExchangeIdMap = new HashMap<String, Long>();
private HashMap<String, Long> memberInstrumentIdToExchangeInstrumentIdMap = new HashMap<String, Long>();

protected HashMap<String, Long> getMemberFIXSenderCompIdToExchangeIdMap() {
return memberFIXSenderCompIdToExchangeIdMap;
}

protected HashMap<String, Long> getMemberInstrumentIdToExchangeInstrumentIdMap() {
return memberInstrumentIdToExchangeInstrumentIdMap;
}

public DataMapper() {
Long added = Long.MIN_VALUE;
memberFIXSenderCompIdToExchangeIdMap.put("MEMBER_A", added);
added++;
memberFIXSenderCompIdToExchangeIdMap.put("MEMBER_B", added);
added++;
memberFIXSenderCompIdToExchangeIdMap.put("EXCHANGE", added);


memberInstrumentIdToExchangeInstrumentIdMap.put("Badger.W", Long.MAX_VALUE);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ public void onMessage(com.alignmentsystems.fix44.NewOrderSingle message, Session

AlignmentOrder ao = new AlignmentOrder();

ao.setNewOrderSingle(message, sessionID, MessageDirection.RECEIVED, UUID.randomUUID().toString(),
orderBookSide);
ao.setNewOrderSingle(message, sessionID, UUID.randomUUID(), orderBookSide);

StringBuilder sb = new StringBuilder().append(" OrderID=(").append(ao.getOrderId()).append(" ClOrdID=(")
.append(ao.getClOrdID()).append(" Side=(").append(ao.getOrderBookSide().sideValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*****************************************************************************/

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -24,6 +25,7 @@
import com.alignmentsystems.library.LibraryFunctions;
import com.alignmentsystems.library.LogEncapsulation;
import com.alignmentsystems.library.constants.Constants;
import com.alignmentsystems.library.enumerations.Encodings;
import com.alignmentsystems.library.enumerations.InstanceType;
import com.alignmentsystems.library.interfaces.InterfaceFIXToBinaryProcessor;
import com.alignmentsystems.library.interfaces.InterfaceOrder;
Expand All @@ -35,7 +37,13 @@ public class FIXToBinaryProcessor implements Runnable, InterfaceFIXToBinaryProce
private KafkaProducer<String, byte[]> kafkaProducerB = null;
private AtomicBoolean running = new AtomicBoolean(false);
private final static int MILLISLEEP = 200;
private DataMapper dataMapper = null;
private HashMap<String, Long> memberFIXSenderCompIdToExchangeIdMap;
private HashMap<String, Long> memberInstrumentIdToExchangeInstrumentIdMap;




public FIXToBinaryProcessor() {
// TODO Auto-generated constructor stub
}
Expand All @@ -45,12 +53,19 @@ public boolean initialise(ConcurrentLinkedQueue<InterfaceOrder> inQueue, LogEnca
this.inQueue = inQueue;
this.log = log;

dataMapper = new DataMapper();

memberFIXSenderCompIdToExchangeIdMap = dataMapper.getMemberFIXSenderCompIdToExchangeIdMap();
memberInstrumentIdToExchangeInstrumentIdMap = dataMapper.getMemberInstrumentIdToExchangeInstrumentIdMap();



if (this.kafkaProducerB == null) {
Properties props;
try {
props = LibraryFunctions.getProperties(FIXToBinaryProcessor.class.getClassLoader(), InstanceType.KAFKA.getProperties());
} catch (FileNotFoundException | NullPointerException e) {
log.error(e.getMessage() , e);
this.log.error(e.getMessage() , e);
throw e;
}
this.kafkaProducerB = new KafkaProducer<>(props);
Expand All @@ -60,21 +75,68 @@ public boolean initialise(ConcurrentLinkedQueue<InterfaceOrder> inQueue, LogEnca

}

private Sender getBufferFromOrder(InterfaceOrder inSeq) {

String symbol = inSeq.getSymbol();
String orderId = inSeq.getOrderId().toString();

Long exchangeIdMappedFromSenderCompID = memberFIXSenderCompIdToExchangeIdMap.get(inSeq.getSender());
Long exchangeIdMappedFromTargetCompID = memberFIXSenderCompIdToExchangeIdMap.get(inSeq.getTarget());
Long exchangeInstrumentIdMappedFromSymbol = memberInstrumentIdToExchangeInstrumentIdMap.get(inSeq.getSymbol());


final Encodings encoding = Encodings.FIXSBELITTLEENDIAN;
// TODO Auto-generated method stub
int bufferLength =
Long.BYTES * 2 //ClOrdId
+
Long.BYTES * 2 //OrderId
+
Long.BYTES //exchangeIdMappedFromSenderCompID.BYTES
+
Long.BYTES //exchangeIdMappedFromTargetCompID.BYTES
+
Long.BYTES //this.orderQty
+
Long.BYTES //this.limitPrice
+
Long.BYTES //exchangeInstrumentIdMappedFromSymbol.BYTES
+
Double.BYTES //this.ts
;

ByteBuffer buf = ByteBuffer.allocate(bufferLength).order(encoding.getByteOrder());
buf.putLong(inSeq.getClOrdID().getLeastSignificantBits());
buf.putLong(inSeq.getClOrdID().getMostSignificantBits());
buf.putLong(inSeq.getOrderId().getLeastSignificantBits());
buf.putLong(inSeq.getOrderId().getMostSignificantBits());
buf.putLong(exchangeIdMappedFromSenderCompID);
buf.putLong(exchangeIdMappedFromTargetCompID);
buf.putLong(inSeq.getOrderQty());
buf.putLong(inSeq.getLimitPrice());
buf.putLong(exchangeInstrumentIdMappedFromSymbol);
buf.putLong(Double.doubleToLongBits(inSeq.getTimestamp().toInstant().toEpochMilli()));

Sender sender = new Sender(buf, symbol, orderId);


return sender;
}




@Override
public void run() {
running.set(true);
ByteBuffer bb = null;
String symbol = null;
String orderId = null;

while (running.get()) {

InterfaceOrder inSeq = inQueue.poll();

if (inSeq != null) {
bb = inSeq.getBinaryOrderData();
symbol = inSeq.getSymbol();
orderId = inSeq.getOrderId();
this.send(symbol, orderId, bb);
Sender sender = getBufferFromOrder(inSeq);
this.send(sender.getSymbol(), sender.getOrderId(), sender.getBb());
}
try {
Thread.currentThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public boolean initialise(InstanceType instanceType) {
sb.append(CLASSNAME).append(" Started instance=").append(this.instanceType.type).append(" Started version=")
.append(LibraryFunctions.getVersion(this.getClass()));


Boolean returnValue = Boolean.FALSE;
log.info(sb.toString());

// What do we do here?
Expand All @@ -58,12 +60,22 @@ public boolean initialise(InstanceType instanceType) {
switch (instanceType) {

case FIXMESSAGINGINFRA:
return initialiseFIXMessagingInfrastructure(instanceType);
returnValue = initialiseFIXMessagingInfrastructure(instanceType);
case ORDERBOOK:
return initialiseOrderBook(instanceType) ;
returnValue = initialiseOrderBook(instanceType) ;
default:
return false;
returnValue = false;
}
while (returnValue) {
try {
this.wait(2000);
} catch (InterruptedException e) {
log.error(e.getMessage() , e );
}
}
return returnValue;


}

private Boolean initialiseOrderBook(InstanceType instanceType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
*****************************************************************************/

/**
* The type Abstract class SimpleKafka
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
*
*/
public abstract class KafkaAbstractSimple {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import com.alignmentsystems.library.enumerations.ConfigurationProperties;
import com.alignmentsystems.library.enumerations.InstanceType;


/**
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
*
*/
public class KafkaLibrary {

public static JsonObject getMessageLogEntryJSON(String source, String topic, String key, String message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
* The interface KafkaMessageHandler.
*
* This interface is the template callback functions that can be passed to an
Expand Down
23 changes: 12 additions & 11 deletions exchange/src/main/java/com/alignmentsystems/matching/OrderBook.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -147,10 +148,10 @@ private void runMatch() {
return;
}

final Double topOfBuyBookPrice = topOfBuyBook.getLimitPrice().getValue();
final Double topOfSellBookPrice = topOfSellBook.getLimitPrice().getValue();
Double tradedQuantity = 0d;
Double tradedPrice = 0d;
final Long topOfBuyBookPrice = topOfBuyBook.getLimitPrice();
final Long topOfSellBookPrice = topOfSellBook.getLimitPrice();
Long tradedQuantity = null;
Long tradedPrice = null;
// If buy top of book price is greater than or equal to the sell top of book
// then we have got a buyer
// who has crossed the spread. So we have a trade, yay!
Expand Down Expand Up @@ -178,25 +179,25 @@ private void runMatch() {
aggressor = new Side(Side.BUY);
}

Double topOfBuyBookQty = topOfBuyBook.getOrderQty().getValue();
Double topOfSellBookQty = topOfBuyBook.getOrderQty().getValue();
Long topOfBuyBookQty = topOfBuyBook.getOrderQty();
Long topOfSellBookQty = topOfBuyBook.getOrderQty();

tradedQuantity = Math.min(topOfBuyBookQty, topOfSellBookQty);

switch (aggressor.getValue()) {
case Side.SELL:
tradedPrice = Math.max(topOfBuyBookPrice, topOfSellBookPrice);
tradedPrice = Math. max(topOfBuyBookPrice, topOfSellBookPrice);
break;
case Side.BUY:
tradedPrice = Math.min(topOfBuyBookPrice, topOfSellBookPrice);
break;
}
OffsetDateTime executionTimestamp = OffsetDateTime.now(Constants.HERE);

String buyClOrdID = topOfBuyBook.getClOrdID();
String sellClOrdID = topOfSellBook.getClOrdID();
String buyOrderID = topOfBuyBook.getOrderId();
String sellOrderID = topOfSellBook.getOrderId();
UUID buyClOrdID = topOfBuyBook.getClOrdID();
UUID sellClOrdID = topOfSellBook.getClOrdID();
UUID buyOrderID = topOfBuyBook.getOrderId();
UUID sellOrderID = topOfSellBook.getOrderId();
final boolean isEligibleForMarketData = true;

Match match = new Match(tradedQuantity, tradedPrice, topOfBuyBook, topOfSellBook, aggressor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private void sendExecutionReportAcknowledgementForReceivedOrder(InterfaceOrder n
}

private static ExecutionReport getExecutionReportAcknowledgementForOrder(InterfaceOrder nos) throws FieldNotFound {
OrderID orderId = new OrderID(nos.getOrderId());
OrderID orderId = new OrderID(nos.getOrderId().toString());
ExecID execID = new ExecID(UUID.randomUUID().toString());
ExecType execType = new ExecType(ExecType.NEW);
OrdStatus ordStatus = new OrdStatus(OrdStatus.NEW);
Expand Down
38 changes: 38 additions & 0 deletions exchange/src/main/java/com/alignmentsystems/matching/Sender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.alignmentsystems.matching;
/******************************************************************************
*
* Author : John Greenan
* Contact : sales@alignment-systems.com
* Date : 13th September 2023
* Copyright : Alignment Systems Ltd 2023
* Project : Alignment Matching Toy
* Artefact : Sender
* Description :
*****************************************************************************/
import java.nio.ByteBuffer;

/**
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
*
*/
public class Sender {
public Sender(ByteBuffer bb, String symbol, String orderId) {
super();
this.bb = bb;
this.symbol = symbol;
this.orderId = orderId;
}
ByteBuffer bb = null;
String symbol = null;
String orderId = null;

public ByteBuffer getBb() {
return bb;
}
public String getSymbol() {
return symbol;
}
public String getOrderId() {
return orderId;
}
}
4 changes: 2 additions & 2 deletions exchange/version.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#Wed Sep 13 19:44:30 AEST 2023
VERSION_BUILD=574
#Thu Sep 14 00:32:04 AEST 2023
VERSION_BUILD=589
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ClientInstanceWrapper implements InterfaceInstanceWrapper{
public boolean initialise(InstanceType instanceType) {

this.instanceType = instanceType;

Boolean returnValue = Boolean.FALSE;
StringBuilder sb = new StringBuilder();

sb
Expand All @@ -72,11 +72,18 @@ public boolean initialise(InstanceType instanceType) {
switch(instanceType){
case MEMBERA:
case MEMBERB:
return initialiseMember(instanceType);
returnValue = initialiseMember(instanceType);
default:
return false;
returnValue = false;
}

while (returnValue) {
try {
this.wait(2000);
} catch (InterruptedException e) {
log.error(e.getMessage() , e );
}
}
return returnValue;



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public FIXEngineMember(LogEncapsulation log, InstanceType instanceType) {
public void onCreate(SessionID sessionId) {
// TODO Auto-generated method stub
final String METHODNAME = "onCreate".intern();



StringBuilder sb = new StringBuilder()
.append(sessionId.toString())
;
Expand Down
Loading

0 comments on commit 8dc936b

Please sign in to comment.