diff --git a/docker-compose.yml b/docker-compose.yml index 7b5fea135..e5f5e809a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,7 @@ services: - "44920:44920/udp" - "44930:44930/udp" - "44940:44940/udp" + - "44990:44990/udp" - "5555:5555/udp" - "6666:6666/udp" environment: diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java index de04905da..061b30700 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java @@ -165,6 +165,10 @@ public class OdeProperties implements EnvironmentAware { private String kafkaTopicOdePsmJson = "topic.OdePsmJson"; private int psmReceiverPort = 44940; private int psmBufferSize = 500; + + // Generic Receiver + private int genericReceiverPort = 44990; + private int genericBufferSize = 2000; // DriverAlerts private String kafkaTopicDriverAlertJson = "topic.OdeDriverAlertJson"; @@ -470,6 +474,22 @@ public void setPsmBufferSize(int psmBufferSize) { this.psmBufferSize = psmBufferSize; } + public int getGenericReceiverPort() { + return genericReceiverPort; + } + + public void setGenericReceiverPort(int genericReceiverPort) { + this.genericReceiverPort = genericReceiverPort; + } + + public int getGenericBufferSize() { + return genericBufferSize; + } + + public void setGenericBufferSize(int psmBufferSize) { + this.genericBufferSize = genericBufferSize; + } + public void setUploadLocationRoot(String uploadLocationRoot) { this.uploadLocationRoot = uploadLocationRoot; } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java index 17b9e42c4..9420748f2 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java @@ -164,19 +164,25 @@ private void publishList(XmlUtils xmlUtils, List dataList) throws JsonP } else { // Determine the message type (MAP, TIM, SSM, SRM, or PSM) String messageType = UperUtil.determineMessageType(msgPayload); - if (messageType == "MAP") { + if (messageType.equals("MAP")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); - } else if (messageType == "TIM") { + } else if(messageType.equals("SPAT")){ + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + } else if (messageType.equals("TIM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); - } else if (messageType == "SSM") { + } else if (messageType.equals("BSM")) { + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + } else if (messageType.equals("SSM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); - } else if (messageType == "SRM") { + } else if (messageType.equals("SRM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); - } else if (messageType == "PSM") { + } else if (messageType.equals("PSM")) { publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java index 96e373ce6..f4ed3b121 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java @@ -1,17 +1,13 @@ package us.dot.its.jpo.ode.udp; import java.net.DatagramSocket; -import java.net.DatagramPacket; import java.net.SocketException; -import org.apache.tomcat.util.buf.HexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.OdeProperties; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.uper.UperUtil; public abstract class AbstractUdpReceiverPublisher implements Runnable { @@ -57,24 +53,6 @@ public AbstractUdpReceiverPublisher(OdeProperties odeProps, int port, int buffer } } - public OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { - String startFlag = UperUtil.getStartFlag(msgType); - // extract the actual packet from the buffer - byte[] payload = packet.getData(); - if (payload == null) - return null; - // convert bytes to hex string and verify identity - String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); - if (payloadHexString.indexOf(startFlag) == -1) - return null; - - logger.debug("Full {} packet: {}", msgType, payloadHexString); - payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag); - logger.debug("Stripped {} packet: {}", msgType, payloadHexString); - - OdeAsn1Payload timPayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); - - return timPayload; - } + } \ No newline at end of file diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java new file mode 100644 index 000000000..bd62c7d0f --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java @@ -0,0 +1,238 @@ +package us.dot.its.jpo.ode.udp; + +import java.net.DatagramPacket; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import us.dot.its.jpo.ode.model.OdeAsn1Data; +import us.dot.its.jpo.ode.model.OdeAsn1Payload; +import us.dot.its.jpo.ode.model.OdeBsmMetadata; +import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource; +import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; +import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; +import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation; +import us.dot.its.jpo.ode.model.OdeMapMetadata; +import us.dot.its.jpo.ode.model.OdeMapMetadata.MapSource; +import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; +import us.dot.its.jpo.ode.model.OdePsmMetadata; +import us.dot.its.jpo.ode.model.OdePsmMetadata.PsmSource; +import us.dot.its.jpo.ode.model.OdeSpatMetadata; +import us.dot.its.jpo.ode.model.OdeSpatMetadata.SpatSource; +import us.dot.its.jpo.ode.model.OdeSrmMetadata; +import us.dot.its.jpo.ode.model.OdeSrmMetadata.SrmSource; +import us.dot.its.jpo.ode.model.OdeSsmMetadata; +import us.dot.its.jpo.ode.model.OdeSsmMetadata.SsmSource; +import us.dot.its.jpo.ode.model.OdeTimMetadata; +import us.dot.its.jpo.ode.model.ReceivedMessageDetails; +import us.dot.its.jpo.ode.model.RxSource; +import us.dot.its.jpo.ode.uper.UperUtil; +import us.dot.its.jpo.ode.util.JsonUtils; + +public class UdpHexDecoder { + + private static Logger logger = LoggerFactory.getLogger(UdpHexDecoder.class); + + public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) { + String startFlag = UperUtil.getStartFlag(msgType); + // extract the actual packet from the buffer + byte[] payload = packet.getData(); + if (payload == null) + return null; + // convert bytes to hex string and verify identity + String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); + if (payloadHexString.indexOf(startFlag) == -1) + return null; + + logger.debug("Full {} packet: {}", msgType, payloadHexString); + + payloadHexString = UperUtil.stripTrailingZeros(UperUtil.stripDot3Header(payloadHexString, startFlag)).toLowerCase(); + logger.debug("Stripped {} packet: {}", msgType, payloadHexString); + + OdeAsn1Payload odePayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); + + return odePayload; + } + + public static String buildJsonMapFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload mapPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP); + if (mapPayload == null) + return null; + OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); + + // Add header data for the decoding process + mapMetadata.setOdeReceivedAt(getUtcTimeString()); + + mapMetadata.setOriginIp(senderIp); + mapMetadata.setMapSource(MapSource.RSU); + mapMetadata.setRecordType(RecordType.mapTx); + mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + mapMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false); + } + + public static String buildJsonSpatFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload spatPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT); + if (spatPayload == null) + return null; + OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); + + // Add header data for the decoding process + spatMetadata.setOdeReceivedAt(getUtcTimeString()); + + spatMetadata.setOriginIp(senderIp); + spatMetadata.setSpatSource(SpatSource.RSU); + spatMetadata.setRecordType(RecordType.spatTx); + spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + spatMetadata.setSecurityResultCode(SecurityResultCode.success); + + + return JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false); + } + + public static String buildJsonTimFromPacket(DatagramPacket packet){ + + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload timPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM); + if (timPayload == null) + return null; + OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); + + // Add header data for the decoding process + timMetadata.setOdeReceivedAt(getUtcTimeString()); + + timMetadata.setOriginIp(senderIp); + timMetadata.setRecordType(RecordType.timMsg); + timMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + timMetadata.setSecurityResultCode(SecurityResultCode.success); + return JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false); + } + + public static String buildJsonBsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + OdeAsn1Payload bsmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM); + if (bsmPayload == null) + return null; + OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); + + // Set BSM Metadata values that can be assumed from the UDP endpoint + bsmMetadata.setOdeReceivedAt(getUtcTimeString()); + + ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); + OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( + "unavailable", + "unavailable", + "unavailable", + "unavailable", + "unavailable"); + receivedMessageDetails.setRxSource(RxSource.RSU); + receivedMessageDetails.setLocationData(locationData); + bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); + + bsmMetadata.setOriginIp(senderIp); + bsmMetadata.setBsmSource(BsmSource.EV); + bsmMetadata.setRecordType(RecordType.bsmTx); + bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); + bsmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false); + } + + public static String buildJsonSsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload ssmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM); + if (ssmPayload == null) + return null; + OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); + + // Add header data for the decoding process + ssmMetadata.setOdeReceivedAt(getUtcTimeString()); + + ssmMetadata.setOriginIp(senderIp); + ssmMetadata.setSsmSource(SsmSource.RSU); + ssmMetadata.setRecordType(RecordType.ssmTx); + ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU); + ssmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false); + } + + public static String buildJsonSrmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload srmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM); + if (srmPayload == null) + return null; + OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); + + // Add header data for the decoding process + srmMetadata.setOdeReceivedAt(getUtcTimeString()); + + srmMetadata.setOriginIp(senderIp); + srmMetadata.setSrmSource(SrmSource.RSU); + srmMetadata.setRecordType(RecordType.srmTx); + srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); + srmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false); + } + + public static String buildJsonPsmFromPacket(DatagramPacket packet){ + String senderIp = packet.getAddress().getHostAddress(); + int senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload psmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM); + if (psmPayload == null) + return null; + OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); + // Add header data for the decoding process + psmMetadata.setOdeReceivedAt(getUtcTimeString()); + + psmMetadata.setOriginIp(senderIp); + psmMetadata.setPsmSource(PsmSource.RSU); + psmMetadata.setRecordType(RecordType.psmTx); + psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN); + psmMetadata.setSecurityResultCode(SecurityResultCode.success); + + return JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false); + } + + public static String getUtcTimeString(){ + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); + return timestamp; + } + + +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java index b8b252f48..f135593a1 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java @@ -1,29 +1,14 @@ package us.dot.its.jpo.ode.udp.bsm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeBsmMetadata; -import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.ReceivedMessageDetails; -import us.dot.its.jpo.ode.model.RxSource; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class BsmReceiver extends AbstractUdpReceiverPublisher { @@ -58,45 +43,19 @@ public void run() { logger.debug("Waiting for UDP BSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload bsmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM); - if (bsmPayload == null) - continue; - OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); - - // Set BSM Metadata values that can be assumed from the UDP endpoint - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - bsmMetadata.setOdeReceivedAt(timestamp); - - ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); - OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( - "unavailable", - "unavailable", - "unavailable", - "unavailable", - "unavailable"); - receivedMessageDetails.setRxSource(RxSource.RSU); - receivedMessageDetails.setLocationData(locationData); - bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); - - bsmMetadata.setOriginIp(senderIp); - bsmMetadata.setBsmSource(BsmSource.EV); - bsmMetadata.setRecordType(RecordType.bsmTx); - bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); - bsmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - bsmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false), - bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); + + if(bsmJson != null){ + bsmPublisher.publish(bsmJson, bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + } + + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java index f16c657bd..c27352a9b 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java @@ -7,6 +7,7 @@ import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.bsm.BsmReceiver; +import us.dot.its.jpo.ode.udp.generic.GenericReceiver; import us.dot.its.jpo.ode.udp.tim.TimReceiver; import us.dot.its.jpo.ode.udp.ssm.SsmReceiver; import us.dot.its.jpo.ode.udp.srm.SrmReceiver; @@ -53,6 +54,9 @@ public UdpServicesController(OdeProperties odeProps) { // PSM internal port rm.submit(new PsmReceiver(odeProps)); + // Generic Receiver internal port + rm.submit(new GenericReceiver(odeProps)); + logger.debug("UDP receiver services started."); } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java new file mode 100644 index 000000000..90a8042d7 --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java @@ -0,0 +1,115 @@ +package us.dot.its.jpo.ode.udp.generic; + +import java.net.DatagramPacket; +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.coder.StringPublisher; +import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; +import us.dot.its.jpo.ode.uper.UperUtil; + +public class GenericReceiver extends AbstractUdpReceiverPublisher { + + private static Logger logger = LoggerFactory.getLogger(GenericReceiver.class); + + private StringPublisher publisher; + + + + @Autowired + public GenericReceiver(OdeProperties odeProps) { + this(odeProps, odeProps.getGenericReceiverPort(), odeProps.getGenericBufferSize()); + + this.publisher = new StringPublisher(odeProps); + } + + public GenericReceiver(OdeProperties odeProps, int port, int bufferSize) { + super(odeProps, port, bufferSize); + + this.publisher = new StringPublisher(odeProps); + + } + + @Override + public void run() { + + logger.debug("Generic UDP Receiver Service started."); + + byte[] buffer = new byte[bufferSize]; + + + + do { + buffer = new byte[bufferSize]; + // packet should be recreated on each loop to prevent latent data in buffer + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + logger.debug("Waiting for Generic UDP packets..."); + socket.receive(packet); + if (packet.getLength() > 0) { + senderIp = packet.getAddress().getHostAddress(); + senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + byte[] payload = packet.getData(); + if (payload == null){ + logger.debug("Skipping Null Payload"); + continue; + } + String payloadHexString = HexUtils.toHexString(payload).toLowerCase(); + logger.debug("Raw Payload" + payloadHexString); + + String messageType = UperUtil.determineHexPacketType(payloadHexString); + + logger.debug("Detected Message Type {}", messageType); + + if (messageType.equals("MAP")) { + String mapJson = UdpHexDecoder.buildJsonMapFromPacket(packet); + logger.debug("Sending Data to Topic" + mapJson); + if(mapJson != null){ + publisher.publish(mapJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + } + } else if(messageType.equals("SPAT")) { + String spatJson = UdpHexDecoder.buildJsonSpatFromPacket(packet); + if(spatJson != null){ + publisher.publish(spatJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + } + } else if (messageType.equals("TIM")) { + String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet); + if(timJson != null){ + publisher.publish(timJson, publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + } + } else if (messageType.equals("BSM")) { + String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); + if(bsmJson!=null){ + publisher.publish(bsmJson, this.odeProperties.getKafkaTopicOdeRawEncodedBSMJson()); + } + } else if (messageType.equals("SSM")) { + String ssmJson = UdpHexDecoder.buildJsonSsmFromPacket(packet); + if(ssmJson!=null){ + publisher.publish(ssmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSSMJson()); + } + } else if (messageType.equals("SRM")) { + String srmJson = UdpHexDecoder.buildJsonSrmFromPacket(packet); + if(srmJson!=null){ + publisher.publish(srmJson, this.odeProperties.getKafkaTopicOdeRawEncodedSRMJson()); + } + } else if (messageType.equals("PSM")) { + String psmJson = UdpHexDecoder.buildJsonPsmFromPacket(packet); + if(psmJson!=null){ + publisher.publish(psmJson, this.odeProperties.getKafkaTopicOdeRawEncodedPSMJson()); + } + }else{ + logger.debug("Unknown Message Type"); + } + } + } catch (Exception e) { + logger.error("Error receiving packet", e); + } + } while (!isStopped()); + } +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java index d77b15cf2..53a4dcd55 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.map; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMapMetadata.MapSource; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeMapMetadata; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class MapReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(MapReceiver.class); @@ -54,34 +42,19 @@ public void run() { logger.debug("Waiting for UDP Map packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload mapPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP); - if (mapPayload == null) - continue; - OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - mapMetadata.setOdeReceivedAt(timestamp); - - mapMetadata.setOriginIp(senderIp); - mapMetadata.setMapSource(MapSource.RSU); - mapMetadata.setRecordType(RecordType.mapTx); - mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - mapMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - mapPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false), - mapPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + + String mapJson = UdpHexDecoder.buildJsonMapFromPacket(packet); + if(mapJson != null){ + mapPublisher.publish(mapJson, mapPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + } + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java index 7bd56c95c..82bb93b41 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.psm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdePsmMetadata.PsmSource; -import us.dot.its.jpo.ode.model.OdePsmMetadata; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class PsmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(PsmReceiver.class); @@ -54,34 +42,16 @@ public void run() { logger.debug("Waiting for UDP PSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload psmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM); - if (psmPayload == null) - continue; - OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - psmMetadata.setOdeReceivedAt(timestamp); - - psmMetadata.setOriginIp(senderIp); - psmMetadata.setPsmSource(PsmSource.RSU); - psmMetadata.setRecordType(RecordType.psmTx); - psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN); - psmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - psmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false), - psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); + String psmJson = UdpHexDecoder.buildJsonPsmFromPacket(packet); + if(psmJson != null){ + psmPublisher.publish(psmJson, psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); + } } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java index 5bb7bd8ef..d67706515 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.spat; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeSpatMetadata; -import us.dot.its.jpo.ode.model.OdeSpatMetadata.SpatSource; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class SpatReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(SpatReceiver.class); @@ -54,34 +42,17 @@ public void run() { logger.debug("Waiting for UDP SPaT packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload spatPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT); - if (spatPayload == null) - continue; - OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - spatMetadata.setOdeReceivedAt(timestamp); - - spatMetadata.setOriginIp(senderIp); - spatMetadata.setSpatSource(SpatSource.RSU); - spatMetadata.setRecordType(RecordType.spatTx); - spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - spatMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - spatPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false), - spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + String spatJson = UdpHexDecoder.buildJsonSpatFromPacket(packet); + if(spatJson != null){ + spatPublisher.publish(spatJson,spatPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); + } } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java index b9d9121bd..0c1361407 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/srm/SrmReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.srm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeSrmMetadata; -import us.dot.its.jpo.ode.model.OdeSrmMetadata.SrmSource; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class SrmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(SrmReceiver.class); @@ -54,34 +42,17 @@ public void run() { logger.debug("Waiting for UDP SRM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload srmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM); - if (srmPayload == null) - continue; - OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - srmMetadata.setOdeReceivedAt(timestamp); - - srmMetadata.setOriginIp(senderIp); - srmMetadata.setSrmSource(SrmSource.RSU); - srmMetadata.setRecordType(RecordType.srmTx); - srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU); - srmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - srmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false), - srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); + + String srmJson = UdpHexDecoder.buildJsonSrmFromPacket(packet); + if(srmJson != null){ + srmPublisher.publish(srmJson, srmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson()); + } } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java index 27108eec7..cd28f5fd0 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/ssm/SsmReceiver.java @@ -1,26 +1,14 @@ package us.dot.its.jpo.ode.udp.ssm; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeSsmMetadata; -import us.dot.its.jpo.ode.model.OdeSsmMetadata.SsmSource; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class SsmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(SsmReceiver.class); @@ -54,34 +42,19 @@ public void run() { logger.debug("Waiting for UDP SSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload ssmPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM); - if (ssmPayload == null) - continue; - OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - ssmMetadata.setOdeReceivedAt(timestamp); + + String ssmJson = UdpHexDecoder.buildJsonSsmFromPacket(packet); - ssmMetadata.setOriginIp(senderIp); - ssmMetadata.setSsmSource(SsmSource.RSU); - ssmMetadata.setRecordType(RecordType.ssmTx); - ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - ssmMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - ssmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false), - ssmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); + if(ssmJson!=null){ + ssmPublisher.publish(ssmJson, ssmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson()); + } + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java index 5524463fe..cff647a00 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java @@ -1,25 +1,14 @@ package us.dot.its.jpo.ode.udp.tim; import java.net.DatagramPacket; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import us.dot.its.jpo.ode.coder.StringPublisher; -import us.dot.its.jpo.ode.model.OdeAsn1Data; -import us.dot.its.jpo.ode.model.OdeAsn1Payload; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy; -import us.dot.its.jpo.ode.model.OdeTimMetadata; import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; -import us.dot.its.jpo.ode.uper.UperUtil; -import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.udp.UdpHexDecoder; public class TimReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(TimReceiver.class); @@ -52,33 +41,18 @@ public void run() { logger.debug("Waiting for UDP TIM packets..."); socket.receive(packet); if (packet.getLength() > 0) { - senderIp = packet.getAddress().getHostAddress(); - senderPort = packet.getPort(); - logger.debug("Packet received from {}:{}", senderIp, senderPort); - - // Create OdeMsgPayload and OdeLogMetadata objects and populate them - OdeAsn1Payload timPayload = super.getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM); - if (timPayload == null) - continue; - OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")); - timMetadata.setOdeReceivedAt(timestamp); - - timMetadata.setOriginIp(senderIp); - timMetadata.setRecordType(RecordType.timMsg); - timMetadata.setRecordGeneratedBy(GeneratedBy.RSU); - timMetadata.setSecurityResultCode(SecurityResultCode.success); - - // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - timPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false), - timPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + + String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet); + if(timJson != null){ + timPublisher.publish(timJson, timPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); + } + } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } + + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java index 6261744c8..04cb4cc46 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/uper/UperUtil.java @@ -30,7 +30,7 @@ public enum SupportedMessageTypes { // Strips the IEEE 1609.2 security header (if it exists) and returns the payload public static String stripDot2Header(String hexString, String payload_start_flag) { hexString = hexString.toLowerCase(); - int startIndex = hexString.indexOf(payload_start_flag); + int startIndex = findValidStartFlagLocation(hexString, payload_start_flag); if (startIndex == -1) return "BAD DATA"; String strippedPayload = stripTrailingZeros(hexString.substring(startIndex, hexString.length())); @@ -43,15 +43,16 @@ public static String stripDot2Header(String hexString, String payload_start_flag * Otherwise, returns just the payload. */ public static byte[] stripDot3Header(byte[] packet, HashMap msgStartFlags) { + String hexString = HexUtils.toHexString(packet); String hexPacketParsed = ""; - + for (String start_flag : msgStartFlags.values()) { - int payloadStartIndex = hexString.indexOf(start_flag); - - if (payloadStartIndex == -1) + int payloadStartIndex = findValidStartFlagLocation(hexString, start_flag); + if (payloadStartIndex == -1){ continue; - + } + String headers = hexString.substring(0, payloadStartIndex); String payload = hexString.substring(payloadStartIndex, hexString.length()); @@ -81,7 +82,7 @@ public static byte[] stripDot3Header(byte[] packet, HashMap msgS * Otherwise, returns just the payload. */ public static String stripDot3Header(String hexString, String payload_start_flag) { - int payloadStartIndex = hexString.indexOf(payload_start_flag); + int payloadStartIndex = findValidStartFlagLocation(hexString,payload_start_flag); String headers = hexString.substring(0, payloadStartIndex); String payload = hexString.substring(payloadStartIndex, hexString.length()); logger.debug("Base payload: " + payload); @@ -95,41 +96,72 @@ public static String stripDot3Header(String hexString, String payload_start_flag return headers.substring(signedDot2StartIndex, headers.length()) + strippedPayload; } - /** - * Determines the message type based off the most likely start flag - * - * @param payload The OdeMsgPayload to check the content of. - */ + /** + * Determines the message type based off the most likely start flag + * + * @param payload The OdeMsgPayload to check the content of. + */ public static String determineMessageType(OdeMsgPayload payload) { String messageType = ""; try { JSONObject payloadJson = JsonUtils.toJSONObject(payload.getData().toJson()); String hexString = payloadJson.getString("bytes").toLowerCase(); + hexString = stripTrailingZeros(hexString); + messageType = determineHexPacketType(hexString); - HashMap flagIndexes = new HashMap(); - flagIndexes.put("MAP", hexString.indexOf(MAP_START_FLAG)); - flagIndexes.put("TIM", hexString.indexOf(TIM_START_FLAG)); - flagIndexes.put("SSM", hexString.indexOf(SSM_START_FLAG)); - flagIndexes.put("PSM", hexString.indexOf(PSM_START_FLAG)); - flagIndexes.put("SRM", hexString.indexOf(SRM_START_FLAG)); - - int lowestIndex = Integer.MAX_VALUE; - for (String key : flagIndexes.keySet()) { - if (flagIndexes.get(key) == -1) { - logger.debug("This message is not of type " + key); - continue; - } - if (flagIndexes.get(key) < lowestIndex) { - messageType = key; - lowestIndex = flagIndexes.get(key); - } - } } catch (JsonUtilsException e) { logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString()); } return messageType; } + public static String determineHexPacketType(String hexString){ + + String messageType = ""; + HashMap flagIndexes = new HashMap(); + + flagIndexes.put("MAP", findValidStartFlagLocation(hexString, MAP_START_FLAG)); + flagIndexes.put("SPAT", findValidStartFlagLocation(hexString, SPAT_START_FLAG)); + flagIndexes.put("TIM", findValidStartFlagLocation(hexString, TIM_START_FLAG)); + flagIndexes.put("BSM", findValidStartFlagLocation(hexString, BSM_START_FLAG)); + flagIndexes.put("SSM", findValidStartFlagLocation(hexString, SSM_START_FLAG)); + flagIndexes.put("PSM", findValidStartFlagLocation(hexString, PSM_START_FLAG)); + flagIndexes.put("SRM", findValidStartFlagLocation(hexString, SRM_START_FLAG)); + + int lowestIndex = Integer.MAX_VALUE; + for (String key : flagIndexes.keySet()) { + if (flagIndexes.get(key) == -1) { + logger.debug("This message is not of type " + key); + continue; + } + if (flagIndexes.get(key) < lowestIndex) { + messageType = key; + lowestIndex = flagIndexes.get(key); + } + } + return messageType; + } + + public static int findValidStartFlagLocation(String hexString, String startFlag){ + int index = hexString.indexOf(startFlag); + + // If the message has a header, make sure not to missidentify the message by the header + + if(index == 0 || index == -1){ + return index; + } + else{ + index = hexString.indexOf(startFlag,4); + } + + // Make sure start flag is on an even numbered byte + while(index != -1 && index %2 != 0){ + index = hexString.indexOf(startFlag, index+1); + } + return index; + } + + /** * Trims extra `00` bytes off of the end of an ASN1 payload string * This is remove the padded bytes added to the payload when receiving ASN1 payloads diff --git a/scripts/tests/udpsender_generic.py b/scripts/tests/udpsender_generic.py new file mode 100644 index 000000000..80dd0a823 --- /dev/null +++ b/scripts/tests/udpsender_generic.py @@ -0,0 +1,33 @@ +import socket +import time +import os + +BSM_MESSAGE = "0022e12d18466c65c1493800000e00e4616183e85a8f0100c000038081bc001480b8494c4c950cd8cde6e9651116579f22a424dd78fffff00761e4fd7eb7d07f7fff80005f11d1020214c1c0ffc7c016aff4017a0ff65403b0fd204c20ffccc04f8fe40c420ffe6404cefe60e9a10133408fcfde1438103ab4138f00e1eec1048ec160103e237410445c171104e26bc103dc4154305c2c84103b1c1c8f0a82f42103f34262d1123198103dac25fb12034ce10381c259f12038ca103574251b10e3b2210324c23ad0f23d8efffe0000209340d10000004264bf00" +MAP_MESSAGE = "001283c138003000205e9c014d3eab092ca624b5518202dc3658042800000400023622c60ca009f66d48abfaf81388d8ad18070027d9b2ffcfe9804f13667b1ffd009ec2c76e3ffc82c4e0001004b00c5000000800066c4574101813ecd8b757fae027d9b30e6ff5604ec363561fe7809ec6cd69bfec813c4d8a617fc9027d9b2147008604fb163666000016250000802580228000001000096229e1309b51a6fe4204dd361cf1fe5009f6018e1000096020a00000080004d88a57f84027d9b3827002804ec36087600a009f62c289407282c310001c0440188800000006c46dbe02813ec5816d800710052200000001b11b6fad404fb16054a0000401c8800000006c47b3d24813ec5816d801b100c4200000000af890f12c580007e87100d4200000008af4c0f12c580077e7a2c0004000160002001cb028d000000800052c160bc40b5fffd8a9409d86bfebb5b40141457fef53b76c008b467014145800080002bffcbffc82c6a0001804b024d000000800036c2213c3b013ecd80096d64027d9affd8cdfc04f635ff7983bc09f66c0082aa2014280b1b80006012c0b3400000100004b02bcf0f6d7fe065d602788b0138eb900b1240001012c083400000080009b0c2af0b804fb15fe6de171afff6c63e04ec15fe1de670060e40002581ea8000004000135da6df0180a0a6adc2c00d0143cd51897fda028c8abb25001a0b0680008012c105400000200009aedbefae005053540ee003c0a326a9cf3fed8143c5667780010582c0004009608aa00000080004d76de7ee402829aba88ffdc050f354525fff80a322bcf23fa602c690000c04b0395000000200016bb4fbd4e01414d3215800802940ab108fff2030d2000110126200000001aee5103be050a15f6f1ffc8404d8800000006bb97c18e0142857dfa800010146200000001aee89099a050a15f8720000b05dd000000800046be3743b781428d80e1b00002879b00514b4404f63600827d8c09e22c000400015ffe6007016190000402582ce8000004000135ecee1de80a146c02e54758143cd8059ad3e027b1b00613dd004f102c360000804b055d000000200046bcc7c3c781428d80108c6e02829b002b2ece050a16019a4b29b00ab5c3604f136004e410409ec018a10000960c3a00000080004d7de9878602851b003923cc05053601623b440a0a6bfb8c3a5014140b0640005012c197400000100005afe570ef2050a36003a47c80a0a6bfd2c45f014140b054000501101a8200000001b05a90edc050535ffe605800a0a101b8200000001b08a30ec0050535ffe605300a0a101c8200000005b0c6f0ea4050515ffca0568b0001000e" +SPAT_MESSAGE = "001338000817a780000089680500204642b342b34802021a15a955a940181190acd0acd20100868555c555c00104342aae2aae002821a155715570" +PSM_MESSAGE = "011d0000201a0000021bd86891de75f84da101c13f042e2214141fff00022c2000270000000163b2cc7986010000" +SSM_MESSAGE = "001e120000000005e9c04071a26614c06000040ba0" +SRM_MESSAGE = "001d2130000010090bd341080d00855c6c0c6899853000a534f7c24cb29897694759b7c0" +TIM_MESSAGE = "005f498718cca69ec1a04600000100105d9b46ec5be401003a0103810040038081d4001f80d07016da410000000000000bbc2b0f775d9b0309c271431fa166ee0a27fff93f136b8205a0a107fb2ef979f4c5bfaeec97e4ad70c2fb36cd9730becdb355cc2fd2a7556b160b98b46ab98ae62c185fa55efb468d5b4000000004e2863f42cddc144ff7980040401262cdd7b809c509f5c62cdd35519c507b9062cdcee129c505cf262cdca5ff9c50432c62cdc5d3d9c502e3e62cdc13e79c501e9262cdbca2d9c5013ee62cdb80359c500e6a62cdb36299c500bc862cdaec1d9c50093c62cdaa2109c5006ea1080203091a859eeebb36006001830001aad27f4ff7580001aad355e39b5880a30029d6585009ef808332d8d9f80c3855151b38c772f765007967ec1170bcb7937f5cb880a25a52863493bcb87570dbcb5abc6bfb2faec606cfa34eb95a24790b2017366d3aabe7729e" +BAD_MESSAGE = "0000badc0de0000000000000000000000000000000" + +# Currently set to oim-dev environment's ODE +UDP_IP = os.getenv('DOCKER_HOST_IP') +UDP_PORT = 44990 +print("UDP target IP:", UDP_IP) +print("UDP target port:", UDP_PORT) +#print("message:", MESSAGE) + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP + +while True: + time.sleep(5) + print("sending Messages every 5 second") + sock.sendto(bytes.fromhex(BSM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(MAP_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(SPAT_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(PSM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(SSM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(SRM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(TIM_MESSAGE), (UDP_IP, UDP_PORT)) + sock.sendto(bytes.fromhex(BAD_MESSAGE), (UDP_IP, UDP_PORT))