From c415b8f48c8b7f4dc558140d0c42a16e13bd7652 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Thu, 27 Jun 2024 17:13:56 -0600 Subject: [PATCH 1/9] Adding Endpoint for Receiving All message Types --- docker-compose.yml | 1 + .../us/dot/its/jpo/ode/OdeProperties.java | 20 +++ .../stream/LogFileToAsn1CodecPublisher.java | 6 + .../ode/udp/AbstractUdpReceiverPublisher.java | 2 +- .../dot/its/jpo/ode/udp/bsm/BsmReceiver.java | 77 +++++++----- .../udp/controller/UdpServicesController.java | 4 + .../jpo/ode/udp/generic/GenericReceiver.java | 118 ++++++++++++++++++ .../dot/its/jpo/ode/udp/map/MapReceiver.java | 58 +++++---- .../dot/its/jpo/ode/udp/psm/PsmReceiver.java | 55 ++++---- .../its/jpo/ode/udp/spat/SpatReceiver.java | 54 ++++---- .../dot/its/jpo/ode/udp/srm/SrmReceiver.java | 52 ++++---- .../dot/its/jpo/ode/udp/ssm/SsmReceiver.java | 57 +++++---- .../dot/its/jpo/ode/udp/tim/TimReceiver.java | 56 +++++---- .../us/dot/its/jpo/ode/uper/UperUtil.java | 68 ++++++---- scripts/tests/udpsender_generic.py | 33 +++++ 15 files changed, 462 insertions(+), 199 deletions(-) create mode 100644 jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java create mode 100644 scripts/tests/udpsender_generic.py diff --git a/docker-compose.yml b/docker-compose.yml index f61839bff..59c6defb3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,6 +48,7 @@ services: - "44920:44920/udp" - "44930:44930/udp" - "44940:44940/udp" + - "44990:44990/udp" - "5555:5555/udp" - "6666:6666/udp" environment: diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java index 7b06ec783..e8988453b 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java @@ -179,6 +179,10 @@ public class OdeProperties implements EnvironmentAware { private String kafkaTopicOdePsmJson = "topic.OdePsmJson"; private int psmReceiverPort = 44940; private int psmBufferSize = 500; + + // Generic Receiver + private int genericReceiverPort = 44990; + private int genericBufferSize = 2000; // DriverAlerts private String kafkaTopicDriverAlertJson = "topic.OdeDriverAlertJson"; @@ -492,6 +496,22 @@ public void setPsmBufferSize(int psmBufferSize) { this.psmBufferSize = psmBufferSize; } + public int getGenericReceiverPort() { + return genericReceiverPort; + } + + public void setGenericReceiverPort(int genericReceiverPort) { + this.genericReceiverPort = genericReceiverPort; + } + + public int getGenericBufferSize() { + return genericBufferSize; + } + + public void setGenericBufferSize(int psmBufferSize) { + this.genericBufferSize = genericBufferSize; + } + public String getDdsCasUrl() { return ddsCasUrl; } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java index 17b9e42c4..0454d8ecc 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java @@ -167,9 +167,15 @@ private void publishList(XmlUtils xmlUtils, List dataList) throws JsonP if (messageType == "MAP") { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + } else if(messageType == "SPAT"){ + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); } else if (messageType == "TIM") { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + } else if (messageType == "BSM") { + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeBsmJson()); } else if (messageType == "SSM") { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java index 96e373ce6..9ae73b255 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java @@ -57,7 +57,7 @@ public AbstractUdpReceiverPublisher(OdeProperties odeProps, int port, int buffer } } - public OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { + public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { String startFlag = UperUtil.getStartFlag(msgType); // extract the actual packet from the buffer byte[] payload = packet.getData(); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java index b8b252f48..7dcd35e22 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java @@ -58,45 +58,54 @@ public void run() { logger.debug("Waiting for UDP BSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload bsmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM); - if (bsmPayload == null) - continue; - OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); - - // Set BSM Metadata values that can be assumed from the UDP endpoint - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - bsmMetadata.setOdeReceivedAt(timestamp); - - ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); - OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( - "unavailable", - "unavailable", - "unavailable", - "unavailable", - "unavailable"); - receivedMessageDetails.setRxSource(RxSource.RSU); - receivedMessageDetails.setLocationData(locationData); - bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); - - bsmMetadata.setOriginIp(senderIp); - bsmMetadata.setBsmSource(BsmSource.EV); - bsmMetadata.setRecordType(RecordType.bsmTx); - bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); - bsmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - bsmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false), - bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + String bsmJson = buildJsonBsmFromPacket(packet); + + if(bsmJson != null){ + // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic + bsmPublisher.publish(bsmJson, bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + } + + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + public static String buildJsonBsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + OdeAsn1Payload bsmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM); + if (bsmPayload == null) + return null; + OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); + + // Set BSM Metadata values that can be assumed from the UDP endpoint + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + bsmMetadata.setOdeReceivedAt(timestamp); + + ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); + OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( + "unavailable", + "unavailable", + "unavailable", + "unavailable", + "unavailable"); + receivedMessageDetails.setRxSource(RxSource.RSU); + receivedMessageDetails.setLocationData(locationData); + bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); + + bsmMetadata.setOriginIp(senderIp); + bsmMetadata.setBsmSource(BsmSource.EV); + bsmMetadata.setRecordType(RecordType.bsmTx); + bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); + bsmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false); + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java index f16c657bd..c27352a9b 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java @@ -7,6 +7,7 @@ import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.bsm.BsmReceiver; +import us.dot.its.jpo.ode.udp.generic.GenericReceiver; import us.dot.its.jpo.ode.udp.tim.TimReceiver; import us.dot.its.jpo.ode.udp.ssm.SsmReceiver; import us.dot.its.jpo.ode.udp.srm.SrmReceiver; @@ -53,6 +54,9 @@ public UdpServicesController(OdeProperties odeProps) { // PSM internal port rm.submit(new PsmReceiver(odeProps)); + // Generic Receiver internal port + rm.submit(new GenericReceiver(odeProps)); + logger.debug("UDP receiver services started."); } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java new file mode 100644 index 000000000..6c9b8297a --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java @@ -0,0 +1,118 @@ +package us.dot.its.jpo.ode.udp.generic; + +import java.net.DatagramPacket; +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.coder.StringPublisher; +import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; +import us.dot.its.jpo.ode.udp.bsm.BsmReceiver; +import us.dot.its.jpo.ode.udp.map.MapReceiver; +import us.dot.its.jpo.ode.udp.psm.PsmReceiver; +import us.dot.its.jpo.ode.udp.spat.SpatReceiver; +import us.dot.its.jpo.ode.udp.srm.SrmReceiver; +import us.dot.its.jpo.ode.udp.ssm.SsmReceiver; +import us.dot.its.jpo.ode.udp.tim.TimReceiver; +import us.dot.its.jpo.ode.uper.UperUtil; + +public class GenericReceiver extends AbstractUdpReceiverPublisher { + + private static Logger logger = LoggerFactory.getLogger(GenericReceiver.class); + + private StringPublisher publisher; + + + + @Autowired + public GenericReceiver(OdeProperties odeProps) { + this(odeProps, odeProps.getGenericReceiverPort(), odeProps.getGenericBufferSize()); + + this.publisher = new StringPublisher(odeProps); + } + + public GenericReceiver(OdeProperties odeProps, int port, int bufferSize) { + super(odeProps, port, bufferSize); + + this.publisher = new StringPublisher(odeProps); + + } + + @Override + public void run() { + + logger.debug("Generic UDP Receiver Service started."); + + byte[] buffer = new byte[bufferSize]; + + + + do { + + // packet should be recreated on each loop to prevent latent data in buffer + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + logger.debug("Waiting for Generic UDP packets..."); + socket.receive(packet); + if (packet.getLength() > 0) { + senderIp = packet.getAddress().getHostAddress(); + senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + byte[] payload = packet.getData(); + if (payload == null){ + logger.debug("Skipping Null Payload"); + continue; + } + String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); + String messageType = UperUtil.determineHexPacketType(payloadHexString); + + logger.debug("Detected Message Type {}", messageType); + + if (messageType == "MAP") { + String mapJson = MapReceiver.buildJsonMapFromPacket(packet); + if(mapJson != null){ + publisher.publish(mapJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + } + } else if(messageType == "SPAT") { + String spatJson = SpatReceiver.buildJsonSpatFromPacket(packet); + if(spatJson != null){ + publisher.publish(spatJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + } + } else if (messageType == "TIM") { + String timJson = TimReceiver.buildJsonTimFromPacket(packet); + if(timJson != null){ + publisher.publish(timJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + } + } else if (messageType == "BSM") { + String bsmJson = BsmReceiver.buildJsonBsmFromPacket(packet); + if(bsmJson!=null){ + publisher.publish(bsmJson, this.odeProperties.getKafkaTopicOdeRawEncodedBSMJson()); + } + } else if (messageType == "SSM") { + String ssmJson = SsmReceiver.buildJsonSsmFromPacket(packet); + if(ssmJson!=null){ + publisher.publish(ssmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSSMJson()); + } + } else if (messageType == "SRM") { + String srmJson = SrmReceiver.buildJsonSrmFromPacket(packet); + if(srmJson!=null){ + publisher.publish(srmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSRMJson()); + } + } else if (messageType == "PSM") { + String psmJson = PsmReceiver.buildJsonPsmFromPacket(packet); + if(psmJson!=null){ + publisher.publish(psmJson, this.odeProperties.getKafkaTopicOdeRawEncodedPSMJson()); + } + }else{ + logger.debug("Unknown Message Type"); + } + } + } catch (Exception e) { + logger.error("Error receiving packet", e); + } + } while (!isStopped()); + } +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java index d77b15cf2..05f73f98c 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java @@ -54,34 +54,44 @@ public void run() { logger.debug("Waiting for UDP Map packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload mapPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP); - if (mapPayload == null) - continue; - OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - mapMetadata.setOdeReceivedAt(timestamp); - - mapMetadata.setOriginIp(senderIp); - mapMetadata.setMapSource(MapSource.RSU); - mapMetadata.setRecordType(RecordType.mapTx); - mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - mapMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - mapPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false), - mapPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + + String mapJson = buildJsonMapFromPacket(packet); + if(mapJson != null){ + // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic + mapPublisher.publish(mapJson, mapPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + } + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + + public static String buildJsonMapFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload mapPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP); + if (mapPayload == null) + return null; + OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + mapMetadata.setOdeReceivedAt(timestamp); + + mapMetadata.setOriginIp(senderIp); + mapMetadata.setMapSource(MapSource.RSU); + mapMetadata.setRecordType(RecordType.mapTx); + mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + mapMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false); + + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java index 7bd56c95c..9acd23776 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java @@ -54,34 +54,41 @@ public void run() { logger.debug("Waiting for UDP PSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload psmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM); - if (psmPayload == null) - continue; - OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - psmMetadata.setOdeReceivedAt(timestamp); - - psmMetadata.setOriginIp(senderIp); - psmMetadata.setPsmSource(PsmSource.RSU); - psmMetadata.setRecordType(RecordType.psmTx); - psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN); - psmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - psmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false), - psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); + String psmJson = buildJsonPsmFromPacket(packet); + if(psmJson != null){ + // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic + psmPublisher.publish(psmJson, psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); + } } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + public static String buildJsonPsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload psmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM); + if (psmPayload == null) + return null; + OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + psmMetadata.setOdeReceivedAt(timestamp); + + psmMetadata.setOriginIp(senderIp); + psmMetadata.setPsmSource(PsmSource.RSU); + psmMetadata.setRecordType(RecordType.psmTx); + psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN); + psmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false); + + + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java index 5bb7bd8ef..9ecb993d3 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java @@ -54,34 +54,42 @@ public void run() { logger.debug("Waiting for UDP SPaT packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload spatPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT); - if (spatPayload == null) - continue; - OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - spatMetadata.setOdeReceivedAt(timestamp); - - spatMetadata.setOriginIp(senderIp); - spatMetadata.setSpatSource(SpatSource.RSU); - spatMetadata.setRecordType(RecordType.spatTx); - spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - spatMetadata.setSecurityResultCode(SecurityResultCode.success); - + + String spatJson = buildJsonSpatFromPacket(packet); // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - spatPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false), - spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + spatPublisher.publish(spatJson,spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + + public static String buildJsonSpatFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload spatPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT); + if (spatPayload == null) + return null; + OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + spatMetadata.setOdeReceivedAt(timestamp); + + spatMetadata.setOriginIp(senderIp); + spatMetadata.setSpatSource(SpatSource.RSU); + spatMetadata.setRecordType(RecordType.spatTx); + spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + spatMetadata.setSecurityResultCode(SecurityResultCode.success); + + + return JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false); + + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java index b9d9121bd..edbbf9da5 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java @@ -54,34 +54,40 @@ public void run() { logger.debug("Waiting for UDP SRM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload srmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM); - if (srmPayload == null) - continue; - OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - srmMetadata.setOdeReceivedAt(timestamp); - - srmMetadata.setOriginIp(senderIp); - srmMetadata.setSrmSource(SrmSource.RSU); - srmMetadata.setRecordType(RecordType.srmTx); - srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); - srmMetadata.setSecurityResultCode(SecurityResultCode.success); - + + String srmJson = buildJsonSrmFromPacket(packet); // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - srmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false), - srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); + srmPublisher.publish(srmJson, srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + public static String buildJsonSrmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload srmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM); + if (srmPayload == null) + return null; + OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + srmMetadata.setOdeReceivedAt(timestamp); + + srmMetadata.setOriginIp(senderIp); + srmMetadata.setSrmSource(SrmSource.RSU); + srmMetadata.setRecordType(RecordType.srmTx); + srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); + srmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false); + + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java index 27108eec7..c12aeae4a 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java @@ -54,34 +54,43 @@ public void run() { logger.debug("Waiting for UDP SSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload ssmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM); - if (ssmPayload == null) - continue; - OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - ssmMetadata.setOdeReceivedAt(timestamp); - - ssmMetadata.setOriginIp(senderIp); - ssmMetadata.setSsmSource(SsmSource.RSU); - ssmMetadata.setRecordType(RecordType.ssmTx); - ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - ssmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - ssmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false), - ssmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); + + String ssmJson = buildJsonSsmFromPacket(packet); + + if(ssmJson!=null){ + // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic + ssmPublisher.publish(ssmJson, ssmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); + } + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + public static String buildJsonSsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload ssmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM); + if (ssmPayload == null) + return null; + OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + ssmMetadata.setOdeReceivedAt(timestamp); + + ssmMetadata.setOriginIp(senderIp); + ssmMetadata.setSsmSource(SsmSource.RSU); + ssmMetadata.setRecordType(RecordType.ssmTx); + ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + ssmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false); + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java index 5524463fe..3fb8d1cc2 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java @@ -52,33 +52,43 @@ public void run() { logger.debug("Waiting for UDP TIM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload timPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM); - if (timPayload == null) - continue; - OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - timMetadata.setOdeReceivedAt(timestamp); - - timMetadata.setOriginIp(senderIp); - timMetadata.setRecordType(RecordType.timMsg); - timMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - timMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - timPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false), - timPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + + String timJson = buildJsonTimFromPacket(packet); + if(timJson != null){ + // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic + timPublisher.publish(timJson, timPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + } + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + public static String buildJsonTimFromPacket(DatagramPacket packet){ + + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload timPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM); + if (timPayload == null) + return null; + OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + timMetadata.setOdeReceivedAt(timestamp); + + timMetadata.setOriginIp(senderIp); + timMetadata.setRecordType(RecordType.timMsg); + timMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + timMetadata.setSecurityResultCode(SecurityResultCode.success); + return JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false); + + + } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java index 06d0eb360..486c7b1eb 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java @@ -88,41 +88,63 @@ public static String stripDot3Header(String hexString, String payload_start_flag return headers.substring(signedDot2StartIndex, headers.length()) + payload; } - /** - * Determines the message type based off the most likely start flag - * - * @param payload The OdeMsgPayload to check the content of. - */ + /** + * Determines the message type based off the most likely start flag + * + * @param payload The OdeMsgPayload to check the content of. + */ public static String determineMessageType(OdeMsgPayload payload) { String messageType = ""; try { JSONObject payloadJson = JsonUtils.toJSONObject(payload.getData().toJson()); String hexString = payloadJson.getString("bytes").toLowerCase(); + messageType = determineHexPacketType(hexString); - HashMap flagIndexes = new HashMap(); - flagIndexes.put("MAP", hexString.indexOf(MAP_START_FLAG)); - flagIndexes.put("TIM", hexString.indexOf(TIM_START_FLAG)); - flagIndexes.put("SSM", hexString.indexOf(SSM_START_FLAG)); - flagIndexes.put("PSM", hexString.indexOf(PSM_START_FLAG)); - flagIndexes.put("SRM", hexString.indexOf(SRM_START_FLAG)); - - int lowestIndex = Integer.MAX_VALUE; - for (String key : flagIndexes.keySet()) { - if (flagIndexes.get(key) == -1) { - logger.debug("This message is not of type " + key); - continue; - } - if (flagIndexes.get(key) < lowestIndex) { - messageType = key; - lowestIndex = flagIndexes.get(key); - } - } } catch (JsonUtilsException e) { logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString()); } return messageType; } + public static String determineHexPacketType(String hexString){ + + String messageType = ""; + HashMap flagIndexes = new HashMap(); + + flagIndexes.put("MAP", findValidStartFlagLocation(hexString, MAP_START_FLAG)); + flagIndexes.put("SPAT", findValidStartFlagLocation(hexString, SPAT_START_FLAG)); + flagIndexes.put("TIM", findValidStartFlagLocation(hexString, TIM_START_FLAG)); + flagIndexes.put("BSM", findValidStartFlagLocation(hexString, BSM_START_FLAG)); + flagIndexes.put("SSM", findValidStartFlagLocation(hexString, SSM_START_FLAG)); + flagIndexes.put("PSM", findValidStartFlagLocation(hexString, PSM_START_FLAG)); + flagIndexes.put("SRM", findValidStartFlagLocation(hexString, SRM_START_FLAG)); + + int lowestIndex = Integer.MAX_VALUE; + for (String key : flagIndexes.keySet()) { + if (flagIndexes.get(key) == -1) { + logger.debug("This message is not of type " + key); + continue; + } + if (flagIndexes.get(key) < lowestIndex) { + messageType = key; + lowestIndex = flagIndexes.get(key); + } + } + return messageType; + } + + public static int findValidStartFlagLocation(String hexString, String startFlag){ + int index = hexString.indexOf(startFlag); + + // Make sure start flag is on an even numbered byte + while(index != -1 && index %2 != 0){ + index = hexString.indexOf(startFlag, index+1); + } + return index; + } + + + // Get methods for message start flags public static String getBsmStartFlag() { return BSM_START_FLAG; diff --git a/scripts/tests/udpsender_generic.py b/scripts/tests/udpsender_generic.py new file mode 100644 index 000000000..36d02aaa4 --- /dev/null +++ b/scripts/tests/udpsender_generic.py @@ -0,0 +1,33 @@ +import socket +import time +import os + +BSM_MESSAGE = "0022e12d18466c65c1493800000e00e4616183e85a8f0100c000038081bc001480b8494c4c950cd8cde6e9651116579f22a424dd78fffff00761e4fd7eb7d07f7fff80005f11d1020214c1c0ffc7c016aff4017a0ff65403b0fd204c20ffccc04f8fe40c420ffe6404cefe60e9a10133408fcfde1438103ab4138f00e1eec1048ec160103e237410445c171104e26bc103dc4154305c2c84103b1c1c8f0a82f42103f34262d1123198103dac25fb12034ce10381c259f12038ca103574251b10e3b2210324c23ad0f23d8efffe0000209340d10000004264bf00" +MAP_MESSAGE = "001283c138003000205e9c014d3eab092ca624b5518202dc3658042800000400023622c60ca009f66d48abfaf81388d8ad18070027d9b2ffcfe9804f13667b1ffd009ec2c76e3ffc82c4e0001004b00c5000000800066c4574101813ecd8b757fae027d9b30e6ff5604ec363561fe7809ec6cd69bfec813c4d8a617fc9027d9b2147008604fb163666000016250000802580228000001000096229e1309b51a6fe4204dd361cf1fe5009f6018e1000096020a00000080004d88a57f84027d9b3827002804ec36087600a009f62c289407282c310001c0440188800000006c46dbe02813ec5816d800710052200000001b11b6fad404fb16054a0000401c8800000006c47b3d24813ec5816d801b100c4200000000af890f12c580007e87100d4200000008af4c0f12c580077e7a2c0004000160002001cb028d000000800052c160bc40b5fffd8a9409d86bfebb5b40141457fef53b76c008b467014145800080002bffcbffc82c6a0001804b024d000000800036c2213c3b013ecd80096d64027d9affd8cdfc04f635ff7983bc09f66c0082aa2014280b1b80006012c0b3400000100004b02bcf0f6d7fe065d602788b0138eb900b1240001012c083400000080009b0c2af0b804fb15fe6de171afff6c63e04ec15fe1de670060e40002581ea8000004000135da6df0180a0a6adc2c00d0143cd51897fda028c8abb25001a0b0680008012c105400000200009aedbefae005053540ee003c0a326a9cf3fed8143c5667780010582c0004009608aa00000080004d76de7ee402829aba88ffdc050f354525fff80a322bcf23fa602c690000c04b0395000000200016bb4fbd4e01414d3215800802940ab108fff2030d2000110126200000001aee5103be050a15f6f1ffc8404d8800000006bb97c18e0142857dfa800010146200000001aee89099a050a15f8720000b05dd000000800046be3743b781428d80e1b00002879b00514b4404f63600827d8c09e22c000400015ffe6007016190000402582ce8000004000135ecee1de80a146c02e54758143cd8059ad3e027b1b00613dd004f102c360000804b055d000000200046bcc7c3c781428d80108c6e02829b002b2ece050a16019a4b29b00ab5c3604f136004e410409ec018a10000960c3a00000080004d7de9878602851b003923cc05053601623b440a0a6bfb8c3a5014140b0640005012c197400000100005afe570ef2050a36003a47c80a0a6bfd2c45f014140b054000501101a8200000001b05a90edc050535ffe605800a0a101b8200000001b08a30ec0050535ffe605300a0a101c8200000005b0c6f0ea4050515ffca0568b0001000e" +SPAT_MESSAGE = "001338000817a780000089680500204642b342b34802021a15a955a940181190acd0acd20100868555c555c00104342aae2aae002821a155715570" +PSM_MESSAGE = "011d0000201a0000021bd86891de75f84da101c13f042e2214141fff00022c2000270000000163b2cc7986010000" +SSM_MESSAGE = "001e120000000005e9c04071a26614c06000040ba0" +SRM_MESSAGE = "001d2130000010090bd341080d00855c6c0c6899853000a534f7c24cb29897694759b7c0" +TIM_MESSAGE = "005f498718cca69ec1a04600000100105d9b46ec5be401003a0103810040038081d4001f80d07016da410000000000000bbc2b0f775d9b0309c271431fa166ee0a27fff93f136b8205a0a107fb2ef979f4c5bfaeec97e4ad70c2fb36cd9730becdb355cc2fd2a7556b160b98b46ab98ae62c185fa55efb468d5b4000000004e2863f42cddc144ff7980040401262cdd7b809c509f5c62cdd35519c507b9062cdcee129c505cf262cdca5ff9c50432c62cdc5d3d9c502e3e62cdc13e79c501e9262cdbca2d9c5013ee62cdb80359c500e6a62cdb36299c500bc862cdaec1d9c50093c62cdaa2109c5006ea1080203091a859eeebb36006001830001aad27f4ff7580001aad355e39b5880a30029d6585009ef808332d8d9f80c3855151b38c772f765007967ec1170bcb7937f5cb880a25a52863493bcb87570dbcb5abc6bfb2faec606cfa34eb95a24790b2017366d3aabe7729e" +BAD_MESSAGE = "0000badc0de0000000000000000000000000000000" + +# Currently set to oim-dev environment's ODE +UDP_IP = os.getenv('DOCKER_HOST_IP') +UDP_PORT = 44990 +print("UDP target IP:", UDP_IP) +print("UDP target port:", UDP_PORT) +#print("message:", MESSAGE) + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP + +while True: + time.sleep(5) + print("sending Messages every 5 second") + sock.sendto(bytes.fromhex(BSM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(MAP_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(SPAT_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(PSM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(SSM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(SRM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(TIM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(JUNK_MESSAGE), (UDP_IP, UDP_PORT)) From cba44e2c1fe83dca732cb1a82da5eed0fc61b838 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 28 Jun 2024 08:56:42 -0600 Subject: [PATCH 2/9] Refactored Static methods into a single static class --- .../ode/udp/AbstractUdpReceiverPublisher.java | 24 +- .../us/dot/its/jpo/ode/udp/UdpHexDecoder.java | 245 ++++++++++++++++++ .../dot/its/jpo/ode/udp/bsm/BsmReceiver.java | 38 +-- .../jpo/ode/udp/generic/GenericReceiver.java | 22 +- .../dot/its/jpo/ode/udp/map/MapReceiver.java | 42 +-- .../dot/its/jpo/ode/udp/psm/PsmReceiver.java | 42 +-- .../its/jpo/ode/udp/spat/SpatReceiver.java | 43 +-- .../dot/its/jpo/ode/udp/srm/SrmReceiver.java | 42 +-- .../dot/its/jpo/ode/udp/ssm/SsmReceiver.java | 41 +-- .../dot/its/jpo/ode/udp/tim/TimReceiver.java | 41 +-- 10 files changed, 275 insertions(+), 305 deletions(-) create mode 100644 jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java index 9ae73b255..f4ed3b121 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java @@ -1,17 +1,13 @@ package us.dot.its.jpo.ode.udp; import java.net.DatagramSocket; -import java.net.DatagramPacket; import java.net.SocketException; -import org.apache.tomcat.util.buf.HexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.OdeProperties; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.uper.UperUtil; public abstract class AbstractUdpReceiverPublisher implements Runnable { @@ -57,24 +53,6 @@ public AbstractUdpReceiverPublisher(OdeProperties odeProps, int port, int buffer } } - public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { - String startFlag = UperUtil.getStartFlag(msgType); - // extract the actual packet from the buffer - byte[] payload = packet.getData(); - if (payload == null) - return null; - // convert bytes to hex string and verify identity - String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); - if (payloadHexString.indexOf(startFlag) == -1) - return null; - - logger.debug("Full {} packet: {}", msgType, payloadHexString); - payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag); - logger.debug("Stripped {} packet: {}", msgType, payloadHexString); - - OdeAsn1Payload timPayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); - - return timPayload; - } + } \ No newline at end of file diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java new file mode 100644 index 000000000..1535f6745 --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java @@ -0,0 +1,245 @@ +package us.dot.its.jpo.ode.udp; + +import java.net.DatagramPacket; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import us.dot.its.jpo.ode.model.OdeAsn1Data; +import us.dot.its.jpo.ode.model.OdeAsn1Payload; +import us.dot.its.jpo.ode.model.OdeBsmMetadata; +import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource; +import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; +import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; +import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation; +import us.dot.its.jpo.ode.model.OdeMapMetadata; +import us.dot.its.jpo.ode.model.OdeMapMetadata.MapSource; +import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; +import us.dot.its.jpo.ode.model.OdePsmMetadata; +import us.dot.its.jpo.ode.model.OdePsmMetadata.PsmSource; +import us.dot.its.jpo.ode.model.OdeSpatMetadata; +import us.dot.its.jpo.ode.model.OdeSpatMetadata.SpatSource; +import us.dot.its.jpo.ode.model.OdeSrmMetadata; +import us.dot.its.jpo.ode.model.OdeSrmMetadata.SrmSource; +import us.dot.its.jpo.ode.model.OdeSsmMetadata; +import us.dot.its.jpo.ode.model.OdeSsmMetadata.SsmSource; +import us.dot.its.jpo.ode.model.OdeTimMetadata; +import us.dot.its.jpo.ode.model.ReceivedMessageDetails; +import us.dot.its.jpo.ode.model.RxSource; +import us.dot.its.jpo.ode.uper.UperUtil; +import us.dot.its.jpo.ode.util.JsonUtils; + +public class UdpHexDecoder { + + private static Logger logger = LoggerFactory.getLogger(AbstractUdpReceiverPublisher.class); + + public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { + String startFlag = UperUtil.getStartFlag(msgType); + // extract the actual packet from the buffer + byte[] payload = packet.getData(); + if (payload == null) + return null; + // convert bytes to hex string and verify identity + String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); + if (payloadHexString.indexOf(startFlag) == -1) + return null; + + logger.debug("Full {} packet: {}", msgType, payloadHexString); + payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag); + logger.debug("Stripped {} packet: {}", msgType, payloadHexString); + + OdeAsn1Payload timPayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); + + return timPayload; + } + + public static String buildJsonMapFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload mapPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP); + if (mapPayload == null) + return null; + OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + mapMetadata.setOdeReceivedAt(timestamp); + + mapMetadata.setOriginIp(senderIp); + mapMetadata.setMapSource(MapSource.RSU); + mapMetadata.setRecordType(RecordType.mapTx); + mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + mapMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false); + } + + public static String buildJsonSpatFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload spatPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT); + if (spatPayload == null) + return null; + OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + spatMetadata.setOdeReceivedAt(timestamp); + + spatMetadata.setOriginIp(senderIp); + spatMetadata.setSpatSource(SpatSource.RSU); + spatMetadata.setRecordType(RecordType.spatTx); + spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + spatMetadata.setSecurityResultCode(SecurityResultCode.success); + + + return JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false); + } + + public static String buildJsonTimFromPacket(DatagramPacket packet){ + + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload timPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM); + if (timPayload == null) + return null; + OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + timMetadata.setOdeReceivedAt(timestamp); + + timMetadata.setOriginIp(senderIp); + timMetadata.setRecordType(RecordType.timMsg); + timMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + timMetadata.setSecurityResultCode(SecurityResultCode.success); + return JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false); + } + + public static String buildJsonBsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + OdeAsn1Payload bsmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM); + if (bsmPayload == null) + return null; + OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); + + // Set BSM Metadata values that can be assumed from the UDP endpoint + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + bsmMetadata.setOdeReceivedAt(timestamp); + + ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); + OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( + "unavailable", + "unavailable", + "unavailable", + "unavailable", + "unavailable"); + receivedMessageDetails.setRxSource(RxSource.RSU); + receivedMessageDetails.setLocationData(locationData); + bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); + + bsmMetadata.setOriginIp(senderIp); + bsmMetadata.setBsmSource(BsmSource.EV); + bsmMetadata.setRecordType(RecordType.bsmTx); + bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); + bsmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false); + } + + public static String buildJsonSsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload ssmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM); + if (ssmPayload == null) + return null; + OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + ssmMetadata.setOdeReceivedAt(timestamp); + + ssmMetadata.setOriginIp(senderIp); + ssmMetadata.setSsmSource(SsmSource.RSU); + ssmMetadata.setRecordType(RecordType.ssmTx); + ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + ssmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false); + } + + public static String buildJsonSrmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload srmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM); + if (srmPayload == null) + return null; + OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + srmMetadata.setOdeReceivedAt(timestamp); + + srmMetadata.setOriginIp(senderIp); + srmMetadata.setSrmSource(SrmSource.RSU); + srmMetadata.setRecordType(RecordType.srmTx); + srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); + srmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false); + } + + public static String buildJsonPsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload psmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM); + if (psmPayload == null) + return null; + OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + psmMetadata.setOdeReceivedAt(timestamp); + + psmMetadata.setOriginIp(senderIp); + psmMetadata.setPsmSource(PsmSource.RSU); + psmMetadata.setRecordType(RecordType.psmTx); + psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN); + psmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false); + } + + +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java index 7dcd35e22..bdf4cbeeb 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java @@ -22,6 +22,7 @@ import us.dot.its.jpo.ode.model.ReceivedMessageDetails; import us.dot.its.jpo.ode.model.RxSource; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; import us.dot.its.jpo.ode.uper.UperUtil; import us.dot.its.jpo.ode.util.JsonUtils; @@ -59,7 +60,7 @@ public void run() { socket.receive(packet); if (packet.getLength() > 0) { // Create OdeMsgPayload and OdeLogMetadata objects and populate them - String bsmJson = buildJsonBsmFromPacket(packet); + String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); if(bsmJson != null){ // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic @@ -74,38 +75,5 @@ public void run() { } while (!isStopped()); } - public static String buildJsonBsmFromPacket(DatagramPacket packet){ - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - OdeAsn1Payload bsmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM); - if (bsmPayload == null) - return null; - OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); - - // Set BSM Metadata values that can be assumed from the UDP endpoint - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - bsmMetadata.setOdeReceivedAt(timestamp); - - ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); - OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( - "unavailable", - "unavailable", - "unavailable", - "unavailable", - "unavailable"); - receivedMessageDetails.setRxSource(RxSource.RSU); - receivedMessageDetails.setLocationData(locationData); - bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); - - bsmMetadata.setOriginIp(senderIp); - bsmMetadata.setBsmSource(BsmSource.EV); - bsmMetadata.setRecordType(RecordType.bsmTx); - bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); - bsmMetadata.setSecurityResultCode(SecurityResultCode.success); - - return JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false); - } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java index 6c9b8297a..2746a7941 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java @@ -9,13 +9,7 @@ import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.coder.StringPublisher; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.udp.bsm.BsmReceiver; -import us.dot.its.jpo.ode.udp.map.MapReceiver; -import us.dot.its.jpo.ode.udp.psm.PsmReceiver; -import us.dot.its.jpo.ode.udp.spat.SpatReceiver; -import us.dot.its.jpo.ode.udp.srm.SrmReceiver; -import us.dot.its.jpo.ode.udp.ssm.SsmReceiver; -import us.dot.its.jpo.ode.udp.tim.TimReceiver; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; import us.dot.its.jpo.ode.uper.UperUtil; public class GenericReceiver extends AbstractUdpReceiverPublisher { @@ -72,37 +66,37 @@ public void run() { logger.debug("Detected Message Type {}", messageType); if (messageType == "MAP") { - String mapJson = MapReceiver.buildJsonMapFromPacket(packet); + String mapJson = UdpHexDecoder.buildJsonMapFromPacket(packet); if(mapJson != null){ publisher.publish(mapJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); } } else if(messageType == "SPAT") { - String spatJson = SpatReceiver.buildJsonSpatFromPacket(packet); + String spatJson = UdpHexDecoder.buildJsonSpatFromPacket(packet); if(spatJson != null){ publisher.publish(spatJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); } } else if (messageType == "TIM") { - String timJson = TimReceiver.buildJsonTimFromPacket(packet); + String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet); if(timJson != null){ publisher.publish(timJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); } } else if (messageType == "BSM") { - String bsmJson = BsmReceiver.buildJsonBsmFromPacket(packet); + String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); if(bsmJson!=null){ publisher.publish(bsmJson, this.odeProperties.getKafkaTopicOdeRawEncodedBSMJson()); } } else if (messageType == "SSM") { - String ssmJson = SsmReceiver.buildJsonSsmFromPacket(packet); + String ssmJson = UdpHexDecoder.buildJsonSsmFromPacket(packet); if(ssmJson!=null){ publisher.publish(ssmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSSMJson()); } } else if (messageType == "SRM") { - String srmJson = SrmReceiver.buildJsonSrmFromPacket(packet); + String srmJson = UdpHexDecoder.buildJsonSrmFromPacket(packet); if(srmJson!=null){ publisher.publish(srmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSRMJson()); } } else if (messageType == "PSM") { - String psmJson = PsmReceiver.buildJsonPsmFromPacket(packet); + String psmJson = UdpHexDecoder.buildJsonPsmFromPacket(packet); if(psmJson!=null){ publisher.publish(psmJson, this.odeProperties.getKafkaTopicOdeRawEncodedPSMJson()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java index 05f73f98c..9fb88cc11 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.map; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMapMetadata.MapSource; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeMapMetadata; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class MapReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(MapReceiver.class); @@ -55,7 +43,7 @@ public void run() { socket.receive(packet); if (packet.getLength() > 0) { - String mapJson = buildJsonMapFromPacket(packet); + String mapJson = UdpHexDecoder.buildJsonMapFromPacket(packet); if(mapJson != null){ // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic mapPublisher.publish(mapJson, mapPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); @@ -69,29 +57,5 @@ public void run() { } - public static String buildJsonMapFromPacket(DatagramPacket packet){ - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload mapPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP); - if (mapPayload == null) - return null; - OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - mapMetadata.setOdeReceivedAt(timestamp); - - mapMetadata.setOriginIp(senderIp); - mapMetadata.setMapSource(MapSource.RSU); - mapMetadata.setRecordType(RecordType.mapTx); - mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - mapMetadata.setSecurityResultCode(SecurityResultCode.success); - - return JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false); - - } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java index 9acd23776..9e6f0d060 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.psm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdePsmMetadata.PsmSource; -import us.dot.its.jpo.ode.model.OdePsmMetadata; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class PsmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(PsmReceiver.class); @@ -54,7 +42,7 @@ public void run() { logger.debug("Waiting for UDP PSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - String psmJson = buildJsonPsmFromPacket(packet); + String psmJson = UdpHexDecoder.buildJsonPsmFromPacket(packet); if(psmJson != null){ // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic psmPublisher.publish(psmJson, psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); @@ -66,29 +54,5 @@ public void run() { } while (!isStopped()); } - public static String buildJsonPsmFromPacket(DatagramPacket packet){ - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload psmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM); - if (psmPayload == null) - return null; - OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - psmMetadata.setOdeReceivedAt(timestamp); - - psmMetadata.setOriginIp(senderIp); - psmMetadata.setPsmSource(PsmSource.RSU); - psmMetadata.setRecordType(RecordType.psmTx); - psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN); - psmMetadata.setSecurityResultCode(SecurityResultCode.success); - - return JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false); - - - } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java index 9ecb993d3..44ccdba29 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.spat; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeSpatMetadata; -import us.dot.its.jpo.ode.model.OdeSpatMetadata.SpatSource; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class SpatReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(SpatReceiver.class); @@ -55,7 +43,7 @@ public void run() { socket.receive(packet); if (packet.getLength() > 0) { - String spatJson = buildJsonSpatFromPacket(packet); + String spatJson = UdpHexDecoder.buildJsonSpatFromPacket(packet); // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic spatPublisher.publish(spatJson,spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); } @@ -66,30 +54,5 @@ public void run() { } - public static String buildJsonSpatFromPacket(DatagramPacket packet){ - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload spatPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT); - if (spatPayload == null) - return null; - OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - spatMetadata.setOdeReceivedAt(timestamp); - - spatMetadata.setOriginIp(senderIp); - spatMetadata.setSpatSource(SpatSource.RSU); - spatMetadata.setRecordType(RecordType.spatTx); - spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - spatMetadata.setSecurityResultCode(SecurityResultCode.success); - - - return JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false); - - } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java index edbbf9da5..89091fad2 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.srm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeSrmMetadata; -import us.dot.its.jpo.ode.model.OdeSrmMetadata.SrmSource; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class SrmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(SrmReceiver.class); @@ -55,7 +43,7 @@ public void run() { socket.receive(packet); if (packet.getLength() > 0) { - String srmJson = buildJsonSrmFromPacket(packet); + String srmJson = UdpHexDecoder.buildJsonSrmFromPacket(packet); // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic srmPublisher.publish(srmJson, srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); } @@ -65,29 +53,5 @@ public void run() { } while (!isStopped()); } - public static String buildJsonSrmFromPacket(DatagramPacket packet){ - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload srmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM); - if (srmPayload == null) - return null; - OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - srmMetadata.setOdeReceivedAt(timestamp); - - srmMetadata.setOriginIp(senderIp); - srmMetadata.setSrmSource(SrmSource.RSU); - srmMetadata.setRecordType(RecordType.srmTx); - srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); - srmMetadata.setSecurityResultCode(SecurityResultCode.success); - - return JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false); - - } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java index c12aeae4a..ddb789f85 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.ssm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeSsmMetadata; -import us.dot.its.jpo.ode.model.OdeSsmMetadata.SsmSource; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class SsmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(SsmReceiver.class); @@ -55,7 +43,7 @@ public void run() { socket.receive(packet); if (packet.getLength() > 0) { - String ssmJson = buildJsonSsmFromPacket(packet); + String ssmJson = UdpHexDecoder.buildJsonSsmFromPacket(packet); if(ssmJson!=null){ // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic @@ -69,28 +57,5 @@ public void run() { } while (!isStopped()); } - public static String buildJsonSsmFromPacket(DatagramPacket packet){ - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload ssmPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM); - if (ssmPayload == null) - return null; - OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - ssmMetadata.setOdeReceivedAt(timestamp); - - ssmMetadata.setOriginIp(senderIp); - ssmMetadata.setSsmSource(SsmSource.RSU); - ssmMetadata.setRecordType(RecordType.ssmTx); - ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - ssmMetadata.setSecurityResultCode(SecurityResultCode.success); - - return JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false); - } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java index 3fb8d1cc2..14a8518a8 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java @@ -1,25 +1,14 @@ package us.dot.its.jpo.ode.udp.tim; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeTimMetadata; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class TimReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(TimReceiver.class); @@ -53,7 +42,7 @@ public void run() { socket.receive(packet); if (packet.getLength() > 0) { - String timJson = buildJsonTimFromPacket(packet); + String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet); if(timJson != null){ // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic timPublisher.publish(timJson, timPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); @@ -66,29 +55,5 @@ public void run() { } while (!isStopped()); } - public static String buildJsonTimFromPacket(DatagramPacket packet){ - - String senderIp = packet.getAddress().getHostAddress(); - int senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload timPayload = AbstractUdpReceiverPublisher.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM); - if (timPayload == null) - return null; - OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - timMetadata.setOdeReceivedAt(timestamp); - - timMetadata.setOriginIp(senderIp); - timMetadata.setRecordType(RecordType.timMsg); - timMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - timMetadata.setSecurityResultCode(SecurityResultCode.success); - return JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false); - - - } + } From 0ceaa6bb0e9f5be761393e46fb87da395a69b1bd Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 28 Jun 2024 08:57:08 -0600 Subject: [PATCH 3/9] Fixed bug in generic receiver script --- scripts/tests/udpsender_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/tests/udpsender_generic.py b/scripts/tests/udpsender_generic.py index 36d02aaa4..80dd0a823 100644 --- a/scripts/tests/udpsender_generic.py +++ b/scripts/tests/udpsender_generic.py @@ -30,4 +30,4 @@ sock.sendto(bytes.fromhex(SSM_MESSAGE), (UDP_IP, UDP_PORT)) sock.sendto(bytes.fromhex(SRM_MESSAGE), (UDP_IP, UDP_PORT)) sock.sendto(bytes.fromhex(TIM_MESSAGE), (UDP_IP, UDP_PORT)) - sock.sendto(bytes.fromhex(JUNK_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(BAD_MESSAGE), (UDP_IP, UDP_PORT)) From 478c836e9f8cfa85804cf1279dbf48eca8666781 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 28 Jun 2024 09:17:07 -0600 Subject: [PATCH 4/9] Removing Unused imports from BSM Receiver --- .../us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java index bdf4cbeeb..baf7d1126 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java @@ -1,30 +1,14 @@ package us.dot.its.jpo.ode.udp.bsm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeBsmMetadata; -import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.ReceivedMessageDetails; -import us.dot.its.jpo.ode.model.RxSource; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; import us.dot.its.jpo.ode.udp.UdpHexDecoder; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; public class BsmReceiver extends AbstractUdpReceiverPublisher { From f6cf582e558235fbdb5c943aa675734e1443d117 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Wed, 24 Jul 2024 17:17:32 -0600 Subject: [PATCH 5/9] Changed Tim Variable Name --- .../src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java index 1535f6745..8530e6652 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java @@ -52,9 +52,9 @@ public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag); logger.debug("Stripped {} packet: {}", msgType, payloadHexString); - OdeAsn1Payload timPayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); + OdeAsn1Payload odePayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); - return timPayload; + return odePayload; } public static String buildJsonMapFromPacket(DatagramPacket packet){ From aac46af6b8f48e22d078fbe2a2f01774d01c7ccb Mon Sep 17 00:00:00 2001 From: john-wiens Date: Wed, 24 Jul 2024 17:17:52 -0600 Subject: [PATCH 6/9] Added Port definition to confluent cloud docker-compose --- docker-compose-confluent-cloud.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml index 6d729451d..2b354c5f5 100644 --- a/docker-compose-confluent-cloud.yml +++ b/docker-compose-confluent-cloud.yml @@ -13,6 +13,7 @@ services: - "44910:44910/udp" - "44920:44920/udp" - "44930:44930/udp" + - "44990:44990/udp" - "5555:5555/udp" - "6666:6666/udp" environment: From 7b76880cfb7e600d40254d0b674c9b0e69f4738a Mon Sep 17 00:00:00 2001 From: John-Wiens Date: Tue, 13 Aug 2024 10:30:19 -0700 Subject: [PATCH 7/9] Fixed bug with decoding message headers --- .../us/dot/its/jpo/ode/udp/UdpHexDecoder.java | 2 +- .../jpo/ode/udp/generic/GenericReceiver.java | 21 +++++++++++-------- .../us/dot/its/jpo/ode/uper/UperUtil.java | 15 +++++++++---- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java index 8530e6652..d6e5344ee 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java @@ -49,7 +49,7 @@ public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil return null; logger.debug("Full {} packet: {}", msgType, payloadHexString); - payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag); + payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag).toLowerCase(); logger.debug("Stripped {} packet: {}", msgType, payloadHexString); OdeAsn1Payload odePayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java index 2746a7941..90a8042d7 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java @@ -44,7 +44,7 @@ public void run() { do { - + buffer = new byte[bufferSize]; // packet should be recreated on each loop to prevent latent data in buffer DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { @@ -61,41 +61,44 @@ public void run() { continue; } String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); - String messageType = UperUtil.determineHexPacketType(payloadHexString); + logger.debug("Raw Payload" + payloadHexString); + + String messageType = UperUtil.determineHexPacketType(payloadHexString); logger.debug("Detected Message Type {}", messageType); - if (messageType == "MAP") { + if (messageType.equals("MAP")) { String mapJson = UdpHexDecoder.buildJsonMapFromPacket(packet); + logger.debug("Sending Data to Topic" + mapJson); if(mapJson != null){ publisher.publish(mapJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); } - } else if(messageType == "SPAT") { + } else if(messageType.equals("SPAT")) { String spatJson = UdpHexDecoder.buildJsonSpatFromPacket(packet); if(spatJson != null){ publisher.publish(spatJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); } - } else if (messageType == "TIM") { + } else if (messageType.equals("TIM")) { String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet); if(timJson != null){ publisher.publish(timJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); } - } else if (messageType == "BSM") { + } else if (messageType.equals("BSM")) { String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); if(bsmJson!=null){ publisher.publish(bsmJson, this.odeProperties.getKafkaTopicOdeRawEncodedBSMJson()); } - } else if (messageType == "SSM") { + } else if (messageType.equals("SSM")) { String ssmJson = UdpHexDecoder.buildJsonSsmFromPacket(packet); if(ssmJson!=null){ publisher.publish(ssmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSSMJson()); } - } else if (messageType == "SRM") { + } else if (messageType.equals("SRM")) { String srmJson = UdpHexDecoder.buildJsonSrmFromPacket(packet); if(srmJson!=null){ publisher.publish(srmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSRMJson()); } - } else if (messageType == "PSM") { + } else if (messageType.equals("PSM")) { String psmJson = UdpHexDecoder.buildJsonPsmFromPacket(packet); if(psmJson!=null){ publisher.publish(psmJson, this.odeProperties.getKafkaTopicOdeRawEncodedPSMJson()); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java index 486c7b1eb..235f1bd7f 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java @@ -30,7 +30,7 @@ public enum SupportedMessageTypes { // Strips the IEEE 1609.2 security header (if it exists) and returns the payload public static String stripDot2Header(String hexString, String payload_start_flag) { hexString = hexString.toLowerCase(); - int startIndex = hexString.indexOf(payload_start_flag); + int startIndex = findValidStartFlagLocation(hexString, payload_start_flag); if (startIndex == -1) return "BAD DATA"; return hexString.substring(startIndex, hexString.length()); @@ -46,7 +46,7 @@ public static byte[] stripDot3Header(byte[] packet, HashMap msgS String hexPacketParsed = ""; for (String start_flag : msgStartFlags.values()) { - int payloadStartIndex = hexString.indexOf(start_flag); + int payloadStartIndex = findValidStartFlagLocation(hexString,start_flag); if (payloadStartIndex == -1) continue; @@ -77,7 +77,7 @@ public static byte[] stripDot3Header(byte[] packet, HashMap msgS * Otherwise, returns just the payload. */ public static String stripDot3Header(String hexString, String payload_start_flag) { - int payloadStartIndex = hexString.indexOf(payload_start_flag); + int payloadStartIndex = findValidStartFlagLocation(hexString,payload_start_flag); String headers = hexString.substring(0, payloadStartIndex); String payload = hexString.substring(payloadStartIndex, hexString.length()); // Look for the index of the start flag of a signed 1609.2 header @@ -113,7 +113,7 @@ public static String determineHexPacketType(String hexString){ flagIndexes.put("MAP", findValidStartFlagLocation(hexString, MAP_START_FLAG)); flagIndexes.put("SPAT", findValidStartFlagLocation(hexString, SPAT_START_FLAG)); - flagIndexes.put("TIM", findValidStartFlagLocation(hexString, TIM_START_FLAG)); + flagIndexes.put("TIM", findValidStartFlagLocation(hexString, TIM_START_FLAG)); flagIndexes.put("BSM", findValidStartFlagLocation(hexString, BSM_START_FLAG)); flagIndexes.put("SSM", findValidStartFlagLocation(hexString, SSM_START_FLAG)); flagIndexes.put("PSM", findValidStartFlagLocation(hexString, PSM_START_FLAG)); @@ -136,6 +136,13 @@ public static String determineHexPacketType(String hexString){ public static int findValidStartFlagLocation(String hexString, String startFlag){ int index = hexString.indexOf(startFlag); + // If the message has a header, make sure not to missidentify the message by the header + // Maximum Header Length is 17 Bytes: https://www.researchgate.net/figure/WAVE-Short-Message-format-Reproduced-by-permission_fig6_224242297 + // At 2 Hex Chars per byte that is a maximum length of 38 + if (index != 0){ + index = hexString.indexOf(startFlag, 38); + } + // Make sure start flag is on an even numbered byte while(index != -1 && index %2 != 0){ index = hexString.indexOf(startFlag, index+1); From bc1397bcc38e8caeac78a00b395f93405a3a8bec Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 20 Sep 2024 11:37:59 -0600 Subject: [PATCH 8/9] Updating to make receivers more conformal. --- .../stream/LogFileToAsn1CodecPublisher.java | 16 ++++----- .../us/dot/its/jpo/ode/udp/UdpHexDecoder.java | 36 ++++++++----------- .../dot/its/jpo/ode/udp/bsm/BsmReceiver.java | 2 -- .../dot/its/jpo/ode/udp/map/MapReceiver.java | 1 - .../dot/its/jpo/ode/udp/psm/PsmReceiver.java | 1 - .../its/jpo/ode/udp/spat/SpatReceiver.java | 6 ++-- .../dot/its/jpo/ode/udp/srm/SrmReceiver.java | 5 +-- .../dot/its/jpo/ode/udp/ssm/SsmReceiver.java | 1 - .../dot/its/jpo/ode/udp/tim/TimReceiver.java | 1 - 9 files changed, 28 insertions(+), 41 deletions(-) diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java index 0454d8ecc..9420748f2 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java @@ -164,25 +164,25 @@ private void publishList(XmlUtils xmlUtils, List dataList) throws JsonP } else { // Determine the message type (MAP, TIM, SSM, SRM, or PSM) String messageType = UperUtil.determineMessageType(msgPayload); - if (messageType == "MAP") { + if (messageType.equals("MAP")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); - } else if(messageType == "SPAT"){ + } else if(messageType.equals("SPAT")){ publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); - } else if (messageType == "TIM") { + } else if (messageType.equals("TIM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); - } else if (messageType == "BSM") { + } else if (messageType.equals("BSM")) { publisher.publish(JsonUtils.toJson(odeData, false), - publisher.getOdeProperties().getKafkaTopicOdeBsmJson()); - } else if (messageType == "SSM") { + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + } else if (messageType.equals("SSM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); - } else if (messageType == "SRM") { + } else if (messageType.equals("SRM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); - } else if (messageType == "PSM") { + } else if (messageType.equals("PSM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java index d6e5344ee..eacc43809 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java @@ -35,7 +35,7 @@ public class UdpHexDecoder { - private static Logger logger = LoggerFactory.getLogger(AbstractUdpReceiverPublisher.class); + private static Logger logger = LoggerFactory.getLogger(UdpHexDecoder.class); public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { String startFlag = UperUtil.getStartFlag(msgType); @@ -69,9 +69,7 @@ public static String buildJsonMapFromPacket(DatagramPacket packet){ OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - mapMetadata.setOdeReceivedAt(timestamp); + mapMetadata.setOdeReceivedAt(getUtcTimeString()); mapMetadata.setOriginIp(senderIp); mapMetadata.setMapSource(MapSource.RSU); @@ -94,9 +92,7 @@ public static String buildJsonSpatFromPacket(DatagramPacket packet){ OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - spatMetadata.setOdeReceivedAt(timestamp); + spatMetadata.setOdeReceivedAt(getUtcTimeString()); spatMetadata.setOriginIp(senderIp); spatMetadata.setSpatSource(SpatSource.RSU); @@ -121,9 +117,7 @@ public static String buildJsonTimFromPacket(DatagramPacket packet){ OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - timMetadata.setOdeReceivedAt(timestamp); + timMetadata.setOdeReceivedAt(getUtcTimeString()); timMetadata.setOriginIp(senderIp); timMetadata.setRecordType(RecordType.timMsg); @@ -143,9 +137,7 @@ public static String buildJsonBsmFromPacket(DatagramPacket packet){ OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); // Set BSM Metadata values that can be assumed from the UDP endpoint - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - bsmMetadata.setOdeReceivedAt(timestamp); + bsmMetadata.setOdeReceivedAt(getUtcTimeString()); ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( @@ -179,9 +171,7 @@ public static String buildJsonSsmFromPacket(DatagramPacket packet){ OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - ssmMetadata.setOdeReceivedAt(timestamp); + ssmMetadata.setOdeReceivedAt(getUtcTimeString()); ssmMetadata.setOriginIp(senderIp); ssmMetadata.setSsmSource(SsmSource.RSU); @@ -204,9 +194,7 @@ public static String buildJsonSrmFromPacket(DatagramPacket packet){ OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - srmMetadata.setOdeReceivedAt(timestamp); + srmMetadata.setOdeReceivedAt(getUtcTimeString()); srmMetadata.setOriginIp(senderIp); srmMetadata.setSrmSource(SrmSource.RSU); @@ -228,9 +216,7 @@ public static String buildJsonPsmFromPacket(DatagramPacket packet){ return null; OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - psmMetadata.setOdeReceivedAt(timestamp); + psmMetadata.setOdeReceivedAt(getUtcTimeString()); psmMetadata.setOriginIp(senderIp); psmMetadata.setPsmSource(PsmSource.RSU); @@ -241,5 +227,11 @@ public static String buildJsonPsmFromPacket(DatagramPacket packet){ return JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false); } + public static String getUtcTimeString(){ + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + return timestamp; + } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java index baf7d1126..f135593a1 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java @@ -43,11 +43,9 @@ public void run() { logger.debug("Waiting for UDP BSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - // Create OdeMsgPayload and OdeLogMetadata objects and populate them String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); if(bsmJson != null){ - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic bsmPublisher.publish(bsmJson, bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java index 9fb88cc11..53a4dcd55 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java @@ -45,7 +45,6 @@ public void run() { String mapJson = UdpHexDecoder.buildJsonMapFromPacket(packet); if(mapJson != null){ - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic mapPublisher.publish(mapJson, mapPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java index 9e6f0d060..82bb93b41 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java @@ -44,7 +44,6 @@ public void run() { if (packet.getLength() > 0) { String psmJson = UdpHexDecoder.buildJsonPsmFromPacket(packet); if(psmJson != null){ - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic psmPublisher.publish(psmJson, psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java index 44ccdba29..d67706515 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java @@ -42,10 +42,10 @@ public void run() { logger.debug("Waiting for UDP SPaT packets..."); socket.receive(packet); if (packet.getLength() > 0) { - String spatJson = UdpHexDecoder.buildJsonSpatFromPacket(packet); - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - spatPublisher.publish(spatJson,spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + if(spatJson != null){ + spatPublisher.publish(spatJson,spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + } } } catch (Exception e) { logger.error("Error receiving packet", e); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java index 89091fad2..0c1361407 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java @@ -44,8 +44,9 @@ public void run() { if (packet.getLength() > 0) { String srmJson = UdpHexDecoder.buildJsonSrmFromPacket(packet); - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - srmPublisher.publish(srmJson, srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); + if(srmJson != null){ + srmPublisher.publish(srmJson, srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); + } } } catch (Exception e) { logger.error("Error receiving packet", e); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java index ddb789f85..cd28f5fd0 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java @@ -46,7 +46,6 @@ public void run() { String ssmJson = UdpHexDecoder.buildJsonSsmFromPacket(packet); if(ssmJson!=null){ - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic ssmPublisher.publish(ssmJson, ssmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java index 14a8518a8..cff647a00 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java @@ -44,7 +44,6 @@ public void run() { String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet); if(timJson != null){ - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic timPublisher.publish(timJson, timPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); } From 89b819c9fc1654bd65a341d1d0267e763a8202e6 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Tue, 24 Sep 2024 16:30:24 -0600 Subject: [PATCH 9/9] Change allowed number of skip bits --- .../src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java index 1136ab357..04cb4cc46 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java @@ -43,6 +43,7 @@ public static String stripDot2Header(String hexString, String payload_start_flag * Otherwise, returns just the payload. */ public static byte[] stripDot3Header(byte[] packet, HashMap msgStartFlags) { + String hexString = HexUtils.toHexString(packet); String hexPacketParsed = ""; @@ -145,11 +146,9 @@ public static int findValidStartFlagLocation(String hexString, String startFlag) int index = hexString.indexOf(startFlag); // If the message has a header, make sure not to missidentify the message by the header - // Maximum Header Length is 17 Bytes: https://www.researchgate.net/figure/WAVE-Short-Message-format-Reproduced-by-permission_fig6_224242297 - // At 2 Hex Chars per byte that is a maximum length of 38 if(index == 0 || index == -1){ - return -1; + return index; } else{ index = hexString.indexOf(startFlag,4);