Skip to content

Commit

Permalink
Add default implementation of events, snapshots, plb printers
Browse files Browse the repository at this point in the history
Add dummy printers
Add heartbeat printing to the samples
Add quiet mode to the samples
  • Loading branch information
AnatolyKalin committed Nov 20, 2023
1 parent a7efd06 commit e86421d
Show file tree
Hide file tree
Showing 9 changed files with 656 additions and 460 deletions.
513 changes: 507 additions & 6 deletions dxf_api/src/extras/EventPrinter.cs

Large diffs are not rendered by default.

168 changes: 30 additions & 138 deletions dxf_client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,131 +21,6 @@

namespace dxf_client
{
public class SnapshotPrinter :
IDxOrderSnapshotListener,
IDxCandleSnapshotListener,
IDxTimeAndSaleSnapshotListener,
IDxSpreadOrderSnapshotListener,
IDxGreeksSnapshotListener,
IDxSeriesSnapshotListener
{
private readonly int recordsPrintLimit;

public SnapshotPrinter(int recordsPrintLimit)
{
this.recordsPrintLimit = recordsPrintLimit;
}

#region Implementation of IDxCandleSnapshotListener

/// <summary>
/// On Candle snapshot event received.
/// </summary>
/// <typeparam name="TB">Event buffer type.</typeparam>
/// <typeparam name="TE">Event type.</typeparam>
/// <param name="buf">Event buffer object.</param>
public void OnCandleSnapshot<TB, TE>(TB buf)
where TB : IDxEventBuf<TE>
where TE : IDxCandle
{
PrintSnapshot(buf);
}

#endregion //IDxCandleSnapshotListener

#region Implementation of IDxGreeksSnapshotListener

public void OnGreeksSnapshot<TB, TE>(TB buf)
where TB : IDxEventBuf<TE>
where TE : IDxGreeks
{
PrintSnapshot(buf);
}

#endregion

#region Implementation of IDxOrderSnapshotListener

/// <summary>
/// On Order snapshot event received.
/// </summary>
/// <typeparam name="TB">Event buffer type.</typeparam>
/// <typeparam name="TE">Event type.</typeparam>
/// <param name="buf">Event buffer object.</param>
public void OnOrderSnapshot<TB, TE>(TB buf)
where TB : IDxEventBuf<TE>
where TE : IDxOrder
{
PrintSnapshot(buf);
}

#endregion //IDxOrderSnapshotListener

#region Implementation of IDxSeriesSnapshotListener

public void OnSeriesSnapshot<TB, TE>(TB buf)
where TB : IDxEventBuf<TE>
where TE : IDxSeries
{
PrintSnapshot(buf);
}

#endregion

#region Implementation of IDxSpreadOrderSnapshotListener

/// <summary>
/// On SpreadOrder snapshot event received.
/// </summary>
/// <typeparam name="TB">Event buffer type.</typeparam>
/// <typeparam name="TE">Event type.</typeparam>
/// <param name="buf">Event buffer object.</param>
public void OnSpreadOrderSnapshot<TB, TE>(TB buf)
where TB : IDxEventBuf<TE>
where TE : IDxSpreadOrder
{
PrintSnapshot(buf);
}

#endregion

#region Implementation of IDxTimeAndSaleSnapshotListener

/// <summary>
/// On TimeAndSale snapshot event received.
/// </summary>
/// <typeparam name="TB">Event buffer type.</typeparam>
/// <typeparam name="TE">Event type.</typeparam>
/// <param name="buf">Event buffer object.</param>
public void OnTimeAndSaleSnapshot<TB, TE>(TB buf)
where TB : IDxEventBuf<TE>
where TE : IDxTimeAndSale
{
PrintSnapshot(buf);
}

#endregion //IDxTimeAndSaleSnapshotListener

private void PrintSnapshot<TE>(IDxEventBuf<TE> buf)
{
Console.WriteLine(string.Format(CultureInfo.InvariantCulture,
"Snapshot {0} {{Symbol: '{1}', RecordsCount: {2}}}",
buf.EventType, buf.Symbol, buf.Size));

var count = 0;
foreach (var o in buf)
{
Console.WriteLine($" {{ {o} }}");

if (++count < recordsPrintLimit || recordsPrintLimit == 0) continue;

Console.WriteLine($" {{ ... {buf.Size - count} records left ...}}");

break;
}
}
}

internal class InputParam<T>
{
private T value;
Expand Down Expand Up @@ -248,10 +123,10 @@ private static bool TryParseTaggedStringParam(string tag, string paramTagString,

private static void Main(string[] args)
{
if (args.Length < 3 || args.Length > 13)
if (args.Length < 3 || (args.Length > 0 && args[0].Equals("-h")))
{
Console.WriteLine(
"Usage: dxf_client <host:port>|<path> <event> <symbol> [<date>] [<source>] [snapshot] [-l <records_print_limit>] [-T <token>] [-s <subscr_data>] [-p] [-b]\n" +
"Usage: dxf_client <host:port>|<path> <event> <symbol> [<date>] [<source>] [snapshot] [-l <records_print_limit>] [-T <token>] [-s <subscr_data>] [-p] [-b] [-q]\n" +
"where\n" +
" host:port - The address of dxfeed server (demo.dxfeed.com:7300)\n" +
" path - The path to file with candle data (non zipped Candle Web Service output) or `tape` file\n" +
Expand Down Expand Up @@ -279,7 +154,8 @@ private static void Main(string[] args)
" -T <token> - The authorization token\n\n" +
" -s <subscr_data> - The subscription data: ticker|TICKER, stream|STREAM, history|HISTORY\n" +
" -p - Enables the data transfer logging\n" +
" -b - Enables the server's heartbeat logging to console\n\n" +
" -b - Enables the server's heartbeat logging to console\n" +
" -q - Quiet mode (do not print events and snapshots)\n\n" +
"examples:\n" +
" events: dxf_client demo.dxfeed.com:7300 Quote,Trade MSFT.TEST,IBM.TEST\n" +
" events: dxf_client demo.dxfeed.com:7300 Quote,Trade MSFT.TEST,IBM.TEST -s stream\n" +
Expand Down Expand Up @@ -318,6 +194,7 @@ private static void Main(string[] args)
var subscriptionData = new InputParam<EventSubscriptionFlag>(EventSubscriptionFlag.Default);
var logDataTransferFlag = false;
var logServerHeartbeatsFlag = false;
var quiteMode = false;

for (var i = SymbolIndex + 1; i < args.Length; i++)
{
Expand Down Expand Up @@ -363,6 +240,13 @@ private static void Main(string[] args)
continue;
}

if (quiteMode == false && args[i].Equals("-q"))
{
quiteMode = true;

continue;
}

if (!sources.IsSet)
TryParseSourcesParam(args[i], sources);
}
Expand All @@ -377,37 +261,45 @@ private static void Main(string[] args)
NativeTools.LoadConfigFromString("logger.level = \"debug\"\n");
NativeTools.InitializeLogging("dxf_client.log", true, true, logDataTransferFlag);

var listener = new EventPrinter();
var eventPrinter = quiteMode ? (IEventPrinter) new DummyEventPrinter() : new EventPrinter();

using (var con = token.IsSet
? new NativeConnection(address, token.Value, DisconnectHandler, ConnectionStatusChangeHandler)
: new NativeConnection(address, DisconnectHandler, ConnectionStatusChangeHandler))
? new NativeConnection(address, token.Value, DisconnectHandler, ConnectionStatusChangeHandler)
: new NativeConnection(address, DisconnectHandler, ConnectionStatusChangeHandler))
{
if (logServerHeartbeatsFlag)
{
con.SetOnServerHeartbeatHandler((connection, time, lagMark, rtt) =>
{
Console.Error.WriteLine(
$"##### Server time (UTC) = {time}, Server lag = {lagMark} us, RTT = {rtt} us #####");
});
}

IDxSubscription s = null;
try
{
if (dateTime.IsSet && (events & (EventType.Order | EventType.SpreadOrder)) != 0)
{
Console.Error.WriteLine("Date and event type Order or SpreadOrder cannot be used for subscription");
Console.Error.WriteLine(
"Date and event type Order or SpreadOrder cannot be used for subscription");
}

if (isSnapshot.Value)
{
s = con.CreateSnapshotSubscription(events, dateTime.Value,
new SnapshotPrinter(recordsPrintLimit.Value));
quiteMode
? (ISnapshotPrinter) new DummySnapshotPrinter()
: new SnapshotPrinter(recordsPrintLimit.Value));
}
else if (dateTime.IsSet)
s = subscriptionData.IsSet
? con.CreateSubscription(events, dateTime.Value, subscriptionData.Value, listener)
: con.CreateSubscription(events, dateTime.Value, listener);
? con.CreateSubscription(events, dateTime.Value, subscriptionData.Value, eventPrinter)
: con.CreateSubscription(events, dateTime.Value, eventPrinter);
else
s = subscriptionData.IsSet
? con.CreateSubscription(events, subscriptionData.Value, listener)
: con.CreateSubscription(events, listener);
? con.CreateSubscription(events, subscriptionData.Value, eventPrinter)
: con.CreateSubscription(events, eventPrinter);

if (events.HasFlag(EventType.Order) && sources.Value.Length > 0)
s.SetSource(sources.Value);
Expand Down
45 changes: 37 additions & 8 deletions samples/dxf_inc_order_snapshot_sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System;
using com.dxfeed.api;
using com.dxfeed.api.events;
using com.dxfeed.api.extras;
using com.dxfeed.native;

namespace dxf_inc_order_snapshot_sample
Expand Down Expand Up @@ -66,10 +67,10 @@ private static bool TryParseTaggedStringParam(string tag, string paramTagString,

private static void Main(string[] args)
{
if (args.Length < 2 || args.Length > 8)
if (args.Length < 2 || (args.Length > 0 && args[0].Equals("-h")))
{
Console.WriteLine(
"Usage: dxf_inc_order_snapshot_sample <host:port> <symbol> [<source>] [-l <records_print_limit>] [-T <token>] [-p]\n" +
"Usage: dxf_inc_order_snapshot_sample <host:port> <symbol> [<source>] [-l <records_print_limit>] [-T <token>] [-p] [-b] [-q]\n" +
"where\n" +
" host:port - The address of dxfeed server (demo.dxfeed.com:7300)\n" +
" symbol - The symbol string, it is allowed to use only one symbol (IBM, MSFT, etc)\n" +
Expand All @@ -82,8 +83,10 @@ private static void Main(string[] args)
" If source is not specified MarketMaker snapshot will be\n" +
" subscribed by default.\n\n" +
$" -l <records_print_limit> - The number of displayed records (0 - unlimited, default: {DefaultRecordsPrintLimit})\n" +
" -T <token> - The authorization token\n" +
" -p - Enables the data transfer logging\n\n" +
" -T <token> - The authorization token\n" +
" -p - Enables the data transfer logging\n" +
" -b - Enables the server's heartbeat logging to console\n" +
" -q - Quiet mode (do not print snapshots)\n\n" +
"order example: dxf_inc_order_snapshot_sample demo.dxfeed.com:7300 AAPL NTV\n" +
"market maker example:\n" +
" dxf_inc_order_snapshot_sample demo.dxfeed.com:7300 AAPL AGGREGATE_BID\n" +
Expand All @@ -100,6 +103,8 @@ private static void Main(string[] args)
var recordsPrintLimit = new InputParam<int>(DefaultRecordsPrintLimit);
var token = new InputParam<string>(null);
var logDataTransferFlag = false;
var logServerHeartbeatsFlag = false;
var quiteMode = false;

for (var i = SymbolIndex + 1; i < args.Length; i++)
{
Expand All @@ -122,7 +127,20 @@ private static void Main(string[] args)
if (logDataTransferFlag == false && args[i].Equals("-p"))
{
logDataTransferFlag = true;
i++;

continue;
}

if (logServerHeartbeatsFlag == false && args[i].Equals("-b"))
{
logServerHeartbeatsFlag = true;

continue;
}

if (quiteMode == false && args[i].Equals("-q"))
{
quiteMode = true;

continue;
}
Expand All @@ -140,11 +158,22 @@ private static void Main(string[] args)
{
NativeTools.InitializeLogging("dxf_inc_order_snapshot_sample.log", true, true, logDataTransferFlag);
using (var con = token.IsSet
? new NativeConnection(address, token.Value, DisconnectHandler)
: new NativeConnection(address, DisconnectHandler))
? new NativeConnection(address, token.Value, DisconnectHandler)
: new NativeConnection(address, DisconnectHandler))
{
if (logServerHeartbeatsFlag)
{
con.SetOnServerHeartbeatHandler((connection, time, lagMark, rtt) =>
{
Console.Error.WriteLine(
$"##### Server time (UTC) = {time}, Server lag = {lagMark} us, RTT = {rtt} us #####");
});
}

using (var s =
con.CreateIncOrderSnapshotSubscription(new SnapshotListener(recordsPrintLimit.Value)))
con.CreateIncOrderSnapshotSubscription(quiteMode
? (IDxIncOrderSnapshotListener) new DummyIncOrderSnapshotPrinter()
: new IncOrderSnapshotPrinter(recordsPrintLimit.Value)))
{
s.AddSource(source.Value);
s.AddSymbol(symbol);
Expand Down
66 changes: 0 additions & 66 deletions samples/dxf_inc_order_snapshot_sample/SnapshotListener.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SnapshotListener.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\dxf_api\dxf_api.csproj">
Expand Down
Loading

0 comments on commit e86421d

Please sign in to comment.