-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Generic receiver #554
Generic receiver #554
Conversation
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java
Outdated
Show resolved
Hide resolved
@@ -48,6 +48,7 @@ services: | |||
- "44920:44920/udp" | |||
- "44930:44930/udp" | |||
- "44940:44940/udp" | |||
- "44990:44990/udp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this port need to be exposed on any of the other docker-compose files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this port should be opened in the confluent version of the docker-compose as well. Fixed in the latest update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good and worked well for me when run locally - I just had two small questions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me!
// packet should be recreated on each loop to prevent latent data in buffer | ||
DatagramPacket packet = new DatagramPacket(buffer, buffer.length); | ||
try { | ||
logger.debug("Waiting for Generic UDP packets..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UDP packets being waited for here are not generic themselves, so this debug message might be a little misleading.
} else if (messageType == "TIM") { | ||
publisher.publish(JsonUtils.toJson(odeData, false), | ||
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); | ||
} else if (messageType == "BSM") { | ||
publisher.publish(JsonUtils.toJson(odeData, false), | ||
publisher.getOdeProperties().getKafkaTopicOdeBsmJson()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we use raw plain
BSM json topic rather than raw encoded
BSM JSON? I notice other message types are using encoded json topics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good Catch - Fixed!
|
||
public class UdpHexDecoder { | ||
|
||
private static Logger logger = LoggerFactory.getLogger(AbstractUdpReceiverPublisher.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logger class shall be UdpHexDecoder.class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java
Outdated
Show resolved
Hide resolved
return null; | ||
OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload); | ||
// Add header data for the decoding process | ||
ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably use a method for creating this timestamp if duplicated in multiple places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me. I added a static method at the bottom that is shared between all the sources for this.
String bsmJson = UdpHexDecoder.buildJsonBsmFromPacket(packet); | ||
|
||
if(bsmJson != null){ | ||
// Submit JSON to the OdeRawEncodedMessageJson Kafka Topic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment shall be OdeRawEncodedBSM
Json?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/map/MapReceiver.java
Outdated
Show resolved
Hide resolved
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java
Outdated
Show resolved
Hide resolved
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/spat/SpatReceiver.java
Outdated
Show resolved
Hide resolved
@John-Wiens Is it possible to resolve the conflicts? |
Fixed Merge Conflicts |
This PR adds a Generic UDP receiver that can receive the following message types: MAP, SPaT, BSM, TIM, SRM, SSM, TIM.
The generic receiver allows the ODE to receive multiple different message types using a single UDP port. A new UDP port allocation 44990 has been created to act as the generic receiver port.
Generic Message decoding is done by first identifying the message type, than using existing logic to decode the message and send it to the relevant decoder topic. This identification strategy has been in use by Maricopa County since November 2023. This PR formalizes the strategy developed for Maricopa County and integrates it with the new receiver model developed in March and May 2024.
Summary of Changes
Related Issue
Related Issue: #553
Motivation and Context
The purpose of this update is to allow support for RSU units that are able to forward messages via UDP, but unable to forward different messages to different ports.
How Has This Been Tested?
I have created a new python udp sender script (udpsender_generic.py) which sends the following message types to port 44990: MAP, SPaT, BSM, TIM, SRM, SSM, TIM. For each message type, I monitored the corresponding decoded message topic (OdeMapJson, OdeSpatJson, etc) to ensure messages were decoded properly on the output topic. The generic receiver test script also sends a bad invalid message to the receiver to ensure the system handles invalid messages safely.
As this change refactors the existing udp receivers, I have also validated that the existing udpsender scripts function as expected and create decoded output on the corresponding decoder topics.
Types of changes
Checklist:
ODE Contributing Guide