Skip to content

Commit

Permalink
-Add BINARY mode. WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzo-gomez-windhover committed Jan 18, 2024
1 parent 2cc1ae2 commit 724c36a
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.windhoverlabs</groupId>
<artifactId>yamcs-gdl90</artifactId>
<version>1.0.0</version>
<version>1.1.0-SNAPSHOT</version>

<packaging>jar</packaging>
<name>YAMCS plugin for GDL90 standard.</name>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/windhoverlabs/yamcs/gdl90/DataSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.windhoverlabs.yamcs.gdl90;

public enum DataSource {
BINARY,
PV
}
227 changes: 184 additions & 43 deletions src/main/java/com/windhoverlabs/yamcs/gdl90/GDL90Link.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@

package com.windhoverlabs.yamcs.gdl90;

import static org.yamcs.StandardTupleDefinitions.GENTIME_COLUMN;
import static org.yamcs.StandardTupleDefinitions.TM_RECTIME_COLUMN;

import com.google.gson.Gson;
import java.io.IOException;
import java.net.DatagramPacket;
Expand All @@ -46,9 +49,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -71,11 +76,13 @@
import org.yamcs.protobuf.Yamcs.NamedObjectId;
import org.yamcs.tctm.AbstractLink;
import org.yamcs.tctm.PacketInputStream;
import org.yamcs.utils.ByteArrayUtils;
import org.yamcs.xtce.Parameter;
import org.yamcs.yarch.ColumnDefinition;
import org.yamcs.yarch.DataType;
import org.yamcs.yarch.FileSystemBucket;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabase;
Expand All @@ -85,7 +92,8 @@ public class GDL90Link extends AbstractLink
implements Runnable,
SystemParametersProducer,
ParameterSubscription.Listener,
ConnectionListener {
ConnectionListener,
StreamSubscriber {

class GDL90Device {
String host;
Expand Down Expand Up @@ -190,6 +198,9 @@ public String toString() {
private String ForeFlightIDStreamName;
private Stream ForeFlightIDStream;

private String _1HZ_MsgsStreamName;
private Stream _1HZ_MsgsStream;

static final String RECTIME_CNAME = "rectime";
static final String MSG_NAME_CNAME = "MSG_NAME_CNAME";
static final String DATA_CNAME = "data";
Expand All @@ -198,6 +209,11 @@ public String toString() {

public int keepAliveConfig = 30; // Seconds

private DataSource source;

Set<Integer> msgIds_1HZ = new HashSet<>();
Set<Integer> msgIds_5HZ = new HashSet<>();

ConcurrentHashMap<String, GDL90Device> gdl90Devices =
new ConcurrentHashMap<String, GDL90Device>();

Expand Down Expand Up @@ -247,6 +263,69 @@ public void init(String yamcsInstance, String serviceName, YConfiguration config
e.printStackTrace();
}

String sourceString = this.getConfig().getString("DataSource", DataSource.BINARY.toString());

source = DataSource.valueOf(sourceString);

switch (source) {
case BINARY:
initBINARYMode();
break;
case PV:
initPVMode();
break;
default:
break;
}

scheduler.scheduleAtFixedRate(
() -> {
if (isRunningAndEnabled()) {
for (GDL90Device d : gdl90Devices.values()) {
Instant now = Instant.now();

Instant end = d.lastBroadcastTime;
Duration timeElapsed = Duration.between(end, now);

if (timeElapsed.toMillis() / 1000 > d.keepAliveSeconds) {
gdl90Devices.remove(d.host);
}
}
}
},
1,
10,
TimeUnit.SECONDS);

initStreams();
}

private void initPVMode() {
initGDL90Timers();
yamcsHost = this.getConfig().getString("yamcsHost", "http://localhost");
yamcsPort = this.getConfig().getInt("yamcsPort", 8090);

pvMap = new ConcurrentHashMap<String, String>(this.config.getMap("pvMap"));

// TODO: This is unnecessarily complicated
yclient =
YamcsClient.newBuilder(yamcsHost + ":" + yamcsPort)
// .withConnectionAttempts(config.getInt("connectionAttempts", 20))
// .withRetryDelay(reconnectionDelay)
// .withVerifyTls(config.getBoolean("verifyTls", true))
.build();
yclient.addConnectionListener(this);

try {
yclient.connectWebSocket();
} catch (ClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

/** Method only relevant when in PV mode */
private void initGDL90Timers() {
scheduler.scheduleAtFixedRate(
() -> {
if (isRunningAndEnabled()) {
Expand Down Expand Up @@ -280,32 +359,6 @@ public void init(String yamcsInstance, String serviceName, YConfiguration config
100,
200,
TimeUnit.MILLISECONDS);

scheduler.scheduleAtFixedRate(
() -> {
if (isRunningAndEnabled()) {
for (GDL90Device d : gdl90Devices.values()) {
Instant now = Instant.now();

Instant end = d.lastBroadcastTime;
Duration timeElapsed = Duration.between(end, now);

if (timeElapsed.toMillis() / 1000 > d.keepAliveSeconds) {
gdl90Devices.remove(d.host);
}
}
}
},
1,
10,
TimeUnit.SECONDS);

yamcsHost = this.getConfig().getString("yamcsHost", "http://localhost");
yamcsPort = this.getConfig().getInt("yamcsPort", 8090);

pvMap = new ConcurrentHashMap<String, String>(this.config.getMap("pvMap"));

initStreams();
}

private void initStreams() {
Expand Down Expand Up @@ -342,6 +395,20 @@ private void initStreams() {
}
}

private void initBINARYMode() {
YarchDatabaseInstance ydb = YarchDatabase.getInstance(this.yamcsInstance);
_1HZ_MsgsStreamName = this.getConfig().getString("_1HZ_MsgsStreamName", "tm_realtime");

if (_1HZ_MsgsStreamName != null) {
this._1HZ_MsgsStream = getMsgStream(ydb, _1HZ_MsgsStreamName);
_1HZ_MsgsStream.addSubscriber(this);
}

for (Object mid : this.getConfig().getList("1HZ_Messages")) {
msgIds_1HZ.add((Integer) mid);
}
}

private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
Stream stream = ydb.getStream(streamName);
if (stream == null) {
Expand All @@ -359,6 +426,21 @@ private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
return stream;
}

/**
* Our Message Streams MUST exist when in Binary mode
*
* @param ydb
* @param streamName
* @return
*/
private static Stream getMsgStream(YarchDatabaseInstance ydb, String streamName) {
Stream stream = ydb.getStream(streamName);
if (stream == null) {
throw new ConfigurationException("Stream " + streamName + " doesn't exist");
}
return stream;
}

@Override
public void doDisable() {
/* If the thread is created, interrupt it. */
Expand Down Expand Up @@ -400,22 +482,6 @@ protected void doStart() {
if (!isDisabled()) {
doEnable();
}

// TODO: This is unnecessarily complicated
yclient =
YamcsClient.newBuilder(yamcsHost + ":" + yamcsPort)
// .withConnectionAttempts(config.getInt("connectionAttempts", 20))
// .withRetryDelay(reconnectionDelay)
// .withVerifyTls(config.getBoolean("verifyTls", true))
.build();
yclient.addConnectionListener(this);

try {
yclient.connectWebSocket();
} catch (ClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
notifyStarted();
}

Expand Down Expand Up @@ -1024,4 +1090,79 @@ public void resetCounters() {
foreFlightIDCount = 0;
AHRSCount = 0;
}

@Override
public void onTuple(Stream stream, Tuple t) {
// TODO Auto-generated method stub

byte[] packet = (byte[]) t.getColumn("packet");

int msgId = ByteArrayUtils.decodeUnsignedShort(packet, 0);

if (msgIds_1HZ.contains(msgId)) {
long rectime = (Long) t.getColumn(TM_RECTIME_COLUMN);
long gentime = (Long) t.getColumn(GENTIME_COLUMN);

try {
processPacket(rectime, gentime, packet);
} catch (Exception e) {
log.warn("Failed to process event packet", e);
}
}
}

private void processPacket(long rectime, long gentime, byte[] packet) {
// ByteBuffer buf = ByteBuffer.wrap(packet);
// // Skip CCSDS Header
// buf.position(12);
//
byte[] GDL90Payload = Arrays.copyOfRange(packet, 12, packet.length);
//
// System.out.println(
// "Binary payload:" + org.yamcs.utils.StringConverter.arrayToHexString(GDL90Payload,
// true));
//
// System.out.println(
// "Binary payload:" + org.yamcs.utils.StringConverter.arrayToHexString(buf.array(),
// true));

for (GDL90Device d : gdl90Devices.values()) {
if (d.alive) {
d.datagram.setData(GDL90Payload);
try {
GDL90Socket.send(d.datagram);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
heartBeatCount++;
if (this.heartbeatStream != null) {
this.heartbeatStream.emitTuple(
new Tuple(
gftdef, Arrays.asList(timeService.getMissionTime(), "Heartbeat", GDL90Payload)));
}
}
}
// String msg = decodeString(buf, eventMsgMax);
//
// EventSeverity evSev;
//
// switch (eventType) {
// case 3:
// evSev = EventSeverity.ERROR;
// break;
// case 4:
// evSev = EventSeverity.CRITICAL;
// break;
// default:
// evSev = EventSeverity.INFO;
// }
//
// Event ev = Event.newBuilder().setGenerationTime(gentime).setReceptionTime(rectime)
// .setSeqNumber(0).setSource("/CFS/CPU" + processorId + "/" +
// app).setSeverity(evSev)
// .setType("EVID" + eventId).setMessage(msg).build();
//
// eventProducer.sendEvent(ev);
}
}

0 comments on commit 724c36a

Please sign in to comment.