SIRI-ET updater via MQTT #6851
base: dev-2.x
Conversation
…ogging the password

# Conflicts:
#	application/src/main/java/org/opentripplanner/updater/trip/siri/ModifiedTripBuilder.java
I've done some performance tests. The bottleneck, however, is the XML parsing: about 1000 messages per second on my local machine and 420 messages per second on our cloud machine. The reason to change the library to HiveMQ would be that it's a newer library that still receives updates. It is faster, but right now we would not be able to profit from that.

We decided in the dev meeting to go forward with the HiveMQ library. I will check if all current MQTT implementations work with HiveMQ, and then substitute Paho with HiveMQ.
Codecov Report

@@             Coverage Diff             @@
##            dev-2.x    #6851     +/-  ##
=============================================
- Coverage      72.14%   72.03%   -0.12%
- Complexity     19772    19918     +146
=============================================
  Files           2151     2166      +15
  Lines          79955    80535     +580
  Branches        8058     8111      +53
=============================================
+ Hits           57687    58015     +328
- Misses         19423    19662     +239
- Partials        2845     2858      +13
{
  "updaters" : [
    {
      "type" : "siri-et-mqtt-updater",
I'm not a reviewer but I just want to drop this: I find the suffix -updater in these type values strange because they are all updaters. In the ones I have added I didn't use it.
I added some comments you might want to consider.
parameters.user() == null ||
parameters.user().isBlank() ||
parameters.password() == null ||
parameters.password().isBlank()
There is a utility method for this at StringUtils.hasValue() that you can use if you want.
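Applied to the snippet under review, the suggestion could look like the sketch below. The `hasValue` helper here is a local stand-in for OTP's `StringUtils.hasValue()` so the example is self-contained; the method name `credentialsMissing` is illustrative.

```java
public class CredentialsCheck {

  // Local stand-in for OTP's StringUtils.hasValue():
  // true when the string is non-null and not blank.
  static boolean hasValue(String s) {
    return s != null && !s.isBlank();
  }

  // Equivalent of the null/isBlank chain in the snippet under review.
  static boolean credentialsMissing(String user, String password) {
    return !hasValue(user) || !hasValue(password);
  }

  public static void main(String[] args) {
    System.out.println(credentialsMissing("otp", "secret")); // false
    System.out.println(credentialsMissing("otp", "   "));    // true
  }
}
```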
primingFutures.add(f);
}
LOG.info("Started {} priming workers", parameters.numberOfPrimingWorkers());
liveExecutor.submit(new LiveRunner());
Do you want to run this in parallel with your priming? This could cause a live ET message to be overwritten by a retained message. In our implementation we apply the full history before we start consuming live messages.
Good point, thanks!
private void onMessage(Mqtt5Publish message) {
  boolean offer;
  if (message.isRetain() && !primed) {
    offer = primingMessageQueue.offer(message.getPayloadAsBytes());
I think there could be a race condition here if a message is put on the primingMessageQueue at the same time as the last RetainRunner times out. Then this message won't be processed. That might not be a catastrophe, but it's worth considering.
When a new client connects to the broker, only old messages will be marked as retained for that client. All messages that are processed immediately after they are sent to the broker will never have the retained flag. So the idea is that the fixed amount of retained messages get processed, and when the runners idle long enough (maxPrimingIdleTime), then it is assumed that all retained messages are processed so the runners can get shut down.
Thanks for the explanation! If there is network congestion or some other circumstances I guess this could still happen in theory. In practice it will be uncommon and won't have very bad consequences. If you really wanted to protect against this eventuality you could consume any remaining messages from the primingMessageQueue after you set primed = true. But it is up to you if you think it's worth it since it's your updater.
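A minimal sketch of that last suggestion, assuming the priming queue is a `BlockingQueue<byte[]>` as in the snippets above; the queue and method names are illustrative, not the updater's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PrimingDrain {

  // After setting primed = true, drain whatever is still on the priming
  // queue so a message enqueued concurrently with the last RetainRunner's
  // timeout is not silently dropped.
  static List<byte[]> drainRemaining(BlockingQueue<byte[]> primingMessageQueue) {
    List<byte[]> leftovers = new ArrayList<>();
    primingMessageQueue.drainTo(leftovers);
    return leftovers;
  }

  public static void main(String[] args) {
    BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    queue.offer(new byte[] { 1 });
    queue.offer(new byte[] { 2 });
    // primed = true would be set here; then process the stragglers.
    List<byte[]> leftovers = drainRemaining(queue);
    System.out.println(leftovers.size()); // 2
    System.out.println(queue.size());     // 0
  }
}
```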
List<CompletableFuture<Void>> primingFutures = new ArrayList<>();

for (int i = 0; i < parameters.numberOfPrimingWorkers(); i++) {
  CompletableFuture<Void> f = CompletableFuture.runAsync(new RetainRunner(i), primingExecutor);
The thing to consider about parallelizing your ET processing is that you might apply your ET messages out of order. If you have multiple messages for the same trip (for example a time update followed by a cancellation), then you will get a different state depending on the order in which these are applied. If you don't have duplicate messages for the same trip in your retained messages, this won't be a problem, I think.
We only have one message per trip, so the order shouldn't matter:
- one MQTT topic per trip, and every topic always holds only one message
- the only way to get two messages for a trip is if a new live message comes in (which is exactly the problem you mentioned in your other comment about the LiveRunner starting too early)
@Override
public void teardown() {
  client.disconnect();
Should you also shut down the executors while tearing down?

@Override
public void teardown() {
  liveExecutor.shutdownNow();
  primingExecutor.shutdownNow();
  client.disconnect();
}
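A variant of this teardown that also waits briefly for the workers to actually exit; the 5-second timeout is an arbitrary illustrative choice, not something from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TeardownSketch {

  // shutdownNow() only requests interruption; awaitTermination() gives the
  // workers a bounded window to finish before teardown returns.
  static boolean shutdown(ExecutorService executor) {
    executor.shutdownNow();
    try {
      return executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService liveExecutor = Executors.newSingleThreadExecutor();
    liveExecutor.submit(() -> {});
    System.out.println(shutdown(liveExecutor)); // true
  }
}
```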
public MqttEstimatedTimetableSource(MqttSiriETUpdaterParameters parameters) {
  this.parameters = parameters;
  this.primingExecutor = Executors.newFixedThreadPool(parameters.numberOfPrimingWorkers());
Configuring the naming of the threads may help when debugging, see
https://github.com/OpenTripPlanner/OpenTripPlanner/blob/5198c5ffef3d2db4f78f36778501333d02ec8444/application/src/main/java/org/opentripplanner/updater/GraphUpdaterManager.java#L79
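For example, a named thread factory could be passed to the pools created in the constructor; the name prefix below is just an illustration, not the name OTP uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreads {

  // Each worker gets a descriptive name, which makes thread dumps and
  // debugger sessions much easier to read than "pool-1-thread-1".
  static ThreadFactory namedFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService primingExecutor =
      Executors.newFixedThreadPool(2, namedFactory("siri-et-mqtt-priming"));
    primingExecutor.submit(() -> System.out.println(Thread.currentThread().getName()));
    primingExecutor.shutdown();
    primingExecutor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```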
  continue;
}
var serviceDelivery = optionalServiceDelivery.get();
serviceDeliveryConsumer.apply(serviceDelivery);
Here the service delivery is sent to the graph writer thread without waiting for it to be applied.
In the priming logic of the GooglePubSub updater, we have a blocking wait: future.get();
https://github.com/OpenTripPlanner/OpenTripPlanner/blob/5198c5ffef3d2db4f78f36778501333d02ec8444/application/src/main/java/org/opentripplanner/updater/GraphUpdaterManager.java#L79
so that we can claim that the updater is primed when all initial data is applied to the transit model.
If you want to make sure that all priming runners are done AND all priming messages are applied to the transit model, you would have to collect all these futures and wait for them to be completed.
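A sketch of that last option; plain CompletableFutures stand in here for both the priming workers and the graph writer futures, since the real updater classes aren't reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PrimingBarrier {

  // Block until every collected future (priming workers and, if also
  // collected, graph writer tasks) has completed.
  static void awaitAll(List<CompletableFuture<Void>> futures) {
    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
  }

  public static void main(String[] args) {
    ExecutorService primingExecutor = Executors.newFixedThreadPool(2);
    List<CompletableFuture<Void>> primingFutures = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      primingFutures.add(CompletableFuture.runAsync(() -> {}, primingExecutor));
    }
    awaitAll(primingFutures); // returns only when every worker is done
    System.out.println("primed");
    primingExecutor.shutdown();
  }
}
```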
.addDisconnectedListener(this::onDisconnect)
.buildAsync();

client.connectWith().keepAlive(30).cleanStart(false).send().join();
Just out of curiosity: how does cleanStart=false work together with a random client ID and automatic reconnect?
As far as I understand it, the client will reconnect with the same client ID as before and then only receive the retained messages that it didn't yet get. So you avoid processing the same message twice.
private Mqtt5AsyncClient client;
private Function<ServiceDelivery, Future<?>> serviceDeliveryConsumer;

private final BlockingQueue<byte[]> liveMessageQueue = new LinkedBlockingQueue<>();
The queues are unbounded, is there a risk of OOM?
For the priming queue it could happen if there are more retained messages in the broker than can be held in memory. I think this is a very deployment-specific number. I could make the upper bound configurable, something like maxNumberOfRetainedMessages?

For the live queue, the rate of incoming messages would need to be higher than the processing rate. At the moment we are a factor of 10 to 20 away from that, even at absolute peak times. I could set an upper limit and then log warnings when the limit is reached. The queue needs to be able to hold all live messages that come in during priming, however, so again it's very deployment-specific. I don't want to end up in configuration hell, so I am not quite sure. What do you think?
I'm not sure if this is a real problem. You could just release a first version of this updater without any additional configuration, and if needed come back to it later to put in place some back-pressure/throttling.
Ok great. We will have monitoring on this, so I don't think it will be an issue for us. If it does become one, we can come back to this.
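If a bound is ever wanted, a capacity-limited queue would make the existing offer() call meaningful: it returns false when full, and the updater could log a warning and drop the message instead of risking OOM. The maxQueueSize parameter and drop counter below are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedLiveQueue {

  // Bounded live queue: offer() returns false once the hypothetical
  // maxQueueSize capacity is reached.
  private final BlockingQueue<byte[]> liveMessageQueue;
  int dropped = 0;

  BoundedLiveQueue(int maxQueueSize) {
    this.liveMessageQueue = new LinkedBlockingQueue<>(maxQueueSize);
  }

  void onMessage(byte[] payload) {
    if (!liveMessageQueue.offer(payload)) {
      dropped++; // in the updater this would be a LOG.warn
    }
  }

  public static void main(String[] args) {
    BoundedLiveQueue q = new BoundedLiveQueue(2);
    q.onMessage(new byte[] { 1 });
    q.onMessage(new byte[] { 2 });
    q.onMessage(new byte[] { 3 }); // over capacity, dropped
    System.out.println(q.dropped); // 1
  }
}
```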
private void onMessage(Mqtt5Publish message) {
  boolean offer;
  if (message.isRetain() && !primed) {
    offer = primingMessageQueue.offer(message.getPayloadAsBytes());
offer() always returns true on unbounded queues.
True. I mainly did that to get rid of the IntelliJ warning about an unused return value, to be honest. It would become relevant if the queue gets bounded in the future, however.
primingFutures.add(f);
}
LOG.info("Started {} priming workers", parameters.numberOfPrimingWorkers());
liveExecutor.submit(new LiveRunner());
You submit a single task that runs "forever" and catches only InterruptedException. If that task throws any other exception, the single thread will die and the pool will create a new one, but no task will be submitted automatically to this new thread, and the updater will stop processing updates.
You should probably make LiveRunner.run() resilient to other types of exceptions.
Good point, thanks!
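One way to make the loop survive unexpected exceptions, sketched with plain Java; the handle() body is a placeholder for the real XML parsing and graph writer hand-off.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ResilientRunner implements Runnable {

  final BlockingQueue<byte[]> liveMessageQueue = new LinkedBlockingQueue<>();
  volatile boolean running = true;
  int processed = 0;

  @Override
  public void run() {
    // Only interruption (shutdown) ends the loop; any other exception
    // from message handling is logged and the loop continues.
    while (running && processOnce()) {}
  }

  boolean processOnce() {
    try {
      byte[] payload = liveMessageQueue.poll(100, TimeUnit.MILLISECONDS);
      if (payload != null) {
        handle(payload);
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false; // shutdown requested
    } catch (Exception e) {
      // In the updater this would be a LOG.error; a single bad message
      // must not kill the runner thread.
      System.err.println("Failed to process message: " + e);
      return true;
    }
  }

  // Placeholder for the real SIRI-ET handling; throws on a bad payload.
  void handle(byte[] payload) {
    if (payload.length == 0) {
      throw new IllegalArgumentException("empty payload");
    }
    processed++;
  }

  public static void main(String[] args) {
    ResilientRunner runner = new ResilientRunner();
    runner.liveMessageQueue.offer(new byte[0]);      // bad message, survived
    runner.liveMessageQueue.offer(new byte[] { 1 });
    runner.processOnce();
    runner.processOnce();
    System.out.println(runner.processed); // 1
  }
}
```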
Summary
This PR adds support for importing SIRI-ET realtime updates via MQTT. It's implemented as a sandbox feature. When no SIRI MQTT updater is configured in the router-config.json, the sandbox code is not executed.

Issue
Closes #6639
Unit tests
Without an MQTT broker this is hard to test. Existing tests all run successfully, and changes to non-sandbox code are minimal.
Documentation
Documentation has been updated.
Changelog
Added to changelog
Bumping the serialization version id
Not necessary.