
Conversation


@jessicaKoehnke jessicaKoehnke commented Sep 9, 2025

Summary

This PR adds support for importing SIRI-ET realtime updates via MQTT. It is implemented as a sandbox feature: when no SIRI MQTT updater is configured in router-config.json, the sandbox code is not executed.

Issue

Closes #6639

Unit tests

Without an MQTT broker this is hard to test. All existing tests run successfully, and the changes to non-sandbox code are minimal.

Documentation

Documentation has been updated.

Changelog

Added to changelog

Bumping the serialization version id

Not necessary.

jessicaKoehnke and others added 30 commits May 16, 2025 18:10
@t2gran t2gran added this to the 2.9 (next release) milestone Sep 10, 2025
@jessicaKoehnke jessicaKoehnke added the !Improvement (a functional improvement or micro feature), +Sandbox (implemented as a Sandbox feature), +Real-Time (related to realtime updates), and HBT (Hamburg roadmap) labels Sep 24, 2025
@jessicaKoehnke
Contributor Author

I've done some performance tests.
Max receiving rate (Paho): 1900 messages per second
Max receiving rate (HiveMQ): 4500 messages per second

The bottleneck, however, is the XML parsing, at about 1000 messages per second on my local machine and 420 messages per second on our cloud machine.

The reason to switch to HiveMQ would be that it's a newer library that still receives updates. It is also faster, but right now we would not be able to benefit from that.
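
For context, a rough sketch of how a parse-throughput figure like the ones above could be measured. It assumes the SiriXml helper from Entur's siri-java-model library (which OTP uses for SIRI parsing); the sample file name and iteration count are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import org.rutebanken.siri20.util.SiriXml;

public class SiriParseBenchmark {
  public static void main(String[] args) throws Exception {
    String xml = Files.readString(Path.of("sample-et-message.xml"), StandardCharsets.UTF_8);
    int n = 10_000;
    long start = System.nanoTime();
    for (int i = 0; i < n; i++) {
      SiriXml.parseXml(xml); // unmarshal one SIRI-ET message
    }
    double perSecond = n / ((System.nanoTime() - start) / 1e9);
    System.out.printf("Parsed %.0f messages/s%n", perSecond);
  }
}
```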

@jessicaKoehnke
Contributor Author

We decided in the dev meeting to go forward with the HiveMQ library. I will check whether all current MQTT implementations work with HiveMQ, and then substitute Paho with HiveMQ.

@lmmhbt lmmhbt mentioned this pull request Sep 26, 2025

codecov bot commented Oct 16, 2025

Codecov Report

❌ Patch coverage is 22.46835% with 245 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.03%. Comparing base (1278304) to head (243923e).
⚠️ Report is 220 commits behind head on dev-2.x.
✅ All tests successful. No failed tests found.

Files with missing lines                                Patch %   Lines
...iri/updater/mqtt/MqttEstimatedTimetableSource.java     0.00%   211 Missing ⚠️
...anner/ext/siri/updater/mqtt/SiriETMqttUpdater.java     0.00%   17 Missing ⚠️
...siri/updater/mqtt/MqttSiriETUpdaterParameters.java    52.00%   12 Missing ⚠️
...planner/updater/configure/UpdaterConfigurator.java     0.00%   2 Missing and 1 partial ⚠️
...routerconfig/updaters/SiriETMqttUpdaterConfig.java    98.21%   1 Missing ⚠️
...ip/siri/updater/AsyncEstimatedTimetableSource.java     0.00%   1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             dev-2.x    #6851      +/-   ##
=============================================
- Coverage      72.14%   72.03%   -0.12%     
- Complexity     19772    19918     +146     
=============================================
  Files           2151     2166      +15     
  Lines          79955    80535     +580     
  Branches        8058     8111      +53     
=============================================
+ Hits           57687    58015     +328     
- Misses         19423    19662     +239     
- Partials        2845     2858      +13     


@jessicaKoehnke jessicaKoehnke marked this pull request as ready for review October 16, 2025 09:34
@jessicaKoehnke jessicaKoehnke requested a review from a team as a code owner October 16, 2025 09:34
@leonardehrenfried leonardehrenfried changed the title Add Siri via MQTT Realtime Updater Add SIRI-ET via MQTT updater Oct 16, 2025
{
"updaters" : [
{
"type" : "siri-et-mqtt-updater",
Member

@leonardehrenfried leonardehrenfried Oct 16, 2025

I'm not a reviewer but I just want to drop this: I find the suffix -updater in these type values strange because they are all updaters. In the ones I have added I didn't use it.

@leonardehrenfried leonardehrenfried changed the title Add SIRI-ET via MQTT updater SIRI-ET updater via MQTT Oct 16, 2025
Contributor

I added some comments you might want to consider.

Comment on lines 142 to 145
parameters.user() == null ||
parameters.user().isBlank() ||
parameters.password() == null ||
parameters.password().isBlank()


There is a utility method for this at StringUtils.hasValue() that you can use if you want.
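
A minimal sketch of that refactor, assuming StringUtils.hasValue() returns true only for non-null, non-blank strings:

```java
// Equivalent to the four-clause null/blank check above.
if (!StringUtils.hasValue(parameters.user()) || !StringUtils.hasValue(parameters.password())) {
  // no credentials configured
}
```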

primingFutures.add(f);
}
LOG.info("Started {} priming workers", parameters.numberOfPrimingWorkers());
liveExecutor.submit(new LiveRunner());


Do you want to run this in parallel with your priming? This could cause a live ET message to be overwritten by a retained message. In our implementation we apply the full history before we start consuming live messages.
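
A hedged sketch of that ordering, reusing the names from the snippet above: start the LiveRunner only after every priming worker has completed, so a retained message can never overwrite a newer live one.

```java
// Defer live consumption until all priming futures are done.
CompletableFuture
  .allOf(primingFutures.toArray(new CompletableFuture[0]))
  .thenRun(() -> liveExecutor.submit(new LiveRunner()));
```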

Contributor Author

Good point, thanks!

private void onMessage(Mqtt5Publish message) {
boolean offer;
if (message.isRetain() && !primed) {
offer = primingMessageQueue.offer(message.getPayloadAsBytes());


I think there could be a race condition here if a message is put on the primingMessageQueue at the same time as the last RetainRunner times out. Then this message won't be processed. That might not be a catastrophe, but it's worth considering.

Contributor Author

When a new client connects to the broker, only old messages are marked as retained for that client. Messages that are processed immediately after they are sent to the broker never have the retained flag. So the idea is that the fixed set of retained messages gets processed, and once the runners have idled long enough (maxPrimingIdleTime), it is assumed that all retained messages have been processed and the runners can be shut down.

Contributor

@habrahamsson-skanetrafiken habrahamsson-skanetrafiken Oct 22, 2025

Thanks for the explanation! If there is network congestion or some other unusual circumstance, I guess this could still happen in theory. In practice it will be uncommon and won't have very bad consequences. If you really wanted to protect against this eventuality, you could consume any remaining messages from the primingMessageQueue after you set primed = true. But it's up to you whether you think it's worth it, since it's your updater.
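
A minimal sketch of that safeguard, assuming the field names from the snippets above and a hypothetical processPayload() helper:

```java
// After declaring the source primed, drain any stragglers still sitting on the
// priming queue so a late retained message is not lost.
primed = true;
List<byte[]> leftovers = new ArrayList<>();
primingMessageQueue.drainTo(leftovers);
leftovers.forEach(this::processPayload); // hypothetical helper
```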

List<CompletableFuture<Void>> primingFutures = new ArrayList<>();

for (int i = 0; i < parameters.numberOfPrimingWorkers(); i++) {
CompletableFuture<Void> f = CompletableFuture.runAsync(new RetainRunner(i), primingExecutor);


The thing to consider about parallelizing your ET processing is that you might apply your ET messages out of order. If you have multiple messages for the same trip (for example a time update followed by a cancellation), then you will get a different state depending on the order in which they are applied. If you don't have duplicate messages for the same trip in your retained messages, this won't be a problem, I think.

Contributor Author

We only have one message per trip, so the order shouldn't matter:

  • One MQTT topic per trip; each topic always holds exactly one message
  • The only way to get two messages for a trip is if a new live message comes in (which is exactly the problem you mentioned in your other comment about the LiveRunner starting too early)


@Override
public void teardown() {
client.disconnect();
Contributor

Should you also shut down the executors while tearing down?

@Override
public void teardown() {
  liveExecutor.shutdownNow();
  primingExecutor.shutdownNow();
  client.disconnect();
}


continue;
}
var serviceDelivery = optionalServiceDelivery.get();
serviceDeliveryConsumer.apply(serviceDelivery);
Contributor

Here the service delivery is sent to the graph writer thread without waiting for it to be applied.
In the priming logic of the GooglePubSub updater, we have a blocking wait: future.get();
https://github.com/OpenTripPlanner/OpenTripPlanner/blob/5198c5ffef3d2db4f78f36778501333d02ec8444/application/src/main/java/org/opentripplanner/updater/GraphUpdaterManager.java#L79
so that we can claim that the updater is primed when all initial data is applied to the transit model.

If you want to make sure that all priming runners are done AND all priming messages are applied to the transit model, you would have to collect all these futures and wait for them to be completed.
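
A hedged sketch of that blocking variant, assuming the names from the snippet above plus a hypothetical parseServiceDelivery() helper returning an Optional; the surrounding method is assumed to declare the checked exceptions thrown by Future.get().

```java
// Keep the Future returned for each priming delivery and wait for all of them
// before declaring the updater primed.
List<Future<?>> applied = new ArrayList<>();
byte[] payload;
while ((payload = primingMessageQueue.poll()) != null) {
  parseServiceDelivery(payload)
    .ifPresent(sd -> applied.add(serviceDeliveryConsumer.apply(sd)));
}
for (Future<?> f : applied) {
  f.get(); // block until this delivery is applied to the transit model
}
```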

.addDisconnectedListener(this::onDisconnect)
.buildAsync();

client.connectWith().keepAlive(30).cleanStart(false).send().join();
Contributor

Just out of curiosity: how does cleanStart=false work together with a random client ID and automatic reconnect?

Contributor Author

As far as I understand it, the client reconnects with the same client ID as before and then only receives the retained messages that it hasn't yet got. So you avoid processing the same message twice.
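
Relatedly, session resumption depends on a stable client identifier and a non-zero session expiry: the HiveMQ client keeps its randomly generated ID across automatic reconnects, but a restart of the process would get a new ID and thus a fresh session. A hedged sketch (host, ID, and expiry values are placeholders):

```java
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;

Mqtt5AsyncClient client = MqttClient.builder()
  .useMqttVersion5()
  .identifier("otp-siri-et-updater")      // stable ID, not random
  .serverHost("broker.example.com")
  .automaticReconnectWithDefaultConfig()
  .buildAsync();

client.connectWith()
  .keepAlive(30)
  .cleanStart(false)                      // resume the previous session
  .sessionExpiryInterval(3600)            // keep session state for an hour
  .send()
  .join();
```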

private Mqtt5AsyncClient client;
private Function<ServiceDelivery, Future<?>> serviceDeliveryConsumer;

private final BlockingQueue<byte[]> liveMessageQueue = new LinkedBlockingQueue<>();
Contributor

The queues are unbounded; is there a risk of OOM?

Contributor Author

For the priming queue it could happen if there are more retained messages in the broker than can be held in memory. I think this number is very specific to each deployment. I could make the upper bound configurable, something like maxNumberOfRetainedMessages?

For the live queue, the rate of incoming messages would need to be higher than the processing rate. At the moment we are a factor of 10 to 20 away from that, even at absolute peak times. I could set an upper limit and then log warnings when the limit is reached. However, the queue needs to be able to hold all live messages that come in during priming, so again, it's very deployment-specific. I don't want to end up in configuration hell, so I am not quite sure. What do you think?

Contributor

I'm not sure if this is a real problem. You could just release a first version of this updater without any additional configuration, and if needed come back to it later to put in place some back-pressure/throttling.

Contributor Author

Ok great. We will have monitoring on this, so I don't think it will be an issue for us. If it does become one, we can come back to this.
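
For reference, a minimal sketch of what such a limit could look like later (the capacity is an arbitrary placeholder, not a recommendation):

```java
// A bounded live queue that drops and logs instead of growing without limit.
private final BlockingQueue<byte[]> liveMessageQueue = new LinkedBlockingQueue<>(100_000);

private void enqueueLive(byte[] payload) {
  if (!liveMessageQueue.offer(payload)) {
    LOG.warn("Live MQTT queue full, dropping a SIRI-ET update");
  }
}
```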

private void onMessage(Mqtt5Publish message) {
boolean offer;
if (message.isRetain() && !primed) {
offer = primingMessageQueue.offer(message.getPayloadAsBytes());
Contributor

offer is always true on unbounded queues.

Contributor Author

True. I mainly did that to get rid of the IntelliJ warning about an unused return value, to be honest. It would become relevant if the queue gets bounded in the future, however.

primingFutures.add(f);
}
LOG.info("Started {} priming workers", parameters.numberOfPrimingWorkers());
liveExecutor.submit(new LiveRunner());
Contributor

You submit a single task that runs "forever" and catches only InterruptedException. If that task throws an exception, the single thread will die and the pool will create a new one. But no task will be submitted automatically to this new thread and the updater will stop processing updates.
You should probably make LiveRunner.run() resilient to other types of exception.

Contributor Author

Good point, thanks!
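
A hedged sketch of such a resilient loop, with parseAndApply() as a hypothetical stand-in for the real message handling:

```java
// Only an interrupt ends the loop; any other exception is logged and the next
// message is processed.
@Override
public void run() {
  while (!Thread.currentThread().isInterrupted()) {
    try {
      byte[] payload = liveMessageQueue.take();
      parseAndApply(payload);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag and exit
    } catch (Exception e) {
      LOG.error("Error processing live SIRI-ET message; continuing", e);
    }
  }
}
```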


Labels

HBT (Hamburg roadmap) · !Improvement (a functional improvement or micro feature) · +Real-Time (related to realtime updates) · +Sandbox (implemented as a Sandbox feature)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Siri Realtime Updater via MQTT

6 participants