Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.9</version>
</dependency>
<dependency>
<groupId>io.github.ci-cmg</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.transit.realtime.GtfsRealtime;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
Expand All @@ -30,7 +34,7 @@
import org.slf4j.LoggerFactory;

/**
* This class starts an Paho MQTT client which opens a connection to a GTFS-RT data source. A
* This class starts a hive MQTT client which opens a connection to a GTFS-RT data source. A
* callback is registered which handles incoming GTFS-RT messages as they stream in by placing a
* GTFS-RT decoder Runnable task in the single-threaded executor for handling.
* <p>
Expand All @@ -56,16 +60,14 @@ public class MqttGtfsRealtimeUpdater implements GraphUpdater {
private final int qos;
private final ForwardsDelayPropagationType forwardsDelayPropagationType;
private final BackwardsDelayPropagationType backwardsDelayPropagationType;
private final String clientId = "OpenTripPlanner-" + MqttClient.generateClientId();
private final String configRef;
private final MemoryPersistence persistence = new MemoryPersistence();
private final GtfsRealTimeTripUpdateAdapter adapter;
private final Consumer<UpdateResult> recordMetrics;
private WriteToGraphCallback saveResultOnGraph;

private final boolean fuzzyTripMatching;

private MqttClient client;
private Mqtt5AsyncClient client;

public MqttGtfsRealtimeUpdater(
MqttGtfsRealtimeUpdaterParameters parameters,
Expand All @@ -92,106 +94,117 @@ public void setup(WriteToGraphCallback writeToGraphCallback) {

@Override
public void run() throws Exception {
client = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(true);
client = connectAndSubscribeToClient();
}

private Mqtt5AsyncClient connectAndSubscribeToClient() throws URISyntaxException {
URI parsedUrl = new URI(url);
Mqtt5SimpleAuth auth = createAuthFromUrl(parsedUrl);

Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
.identifier("OpenTripPlanner-" + UUID.randomUUID())
.serverHost(parsedUrl.getHost())
.simpleAuth(auth)
.automaticReconnectWithDefaultConfig()
.addConnectedListener(ctx -> onConnect())
.addDisconnectedListener(this::onDisconnect);

if (parsedUrl.getPort() != -1) {
mqtt5ClientBuilder = mqtt5ClientBuilder.serverPort(parsedUrl.getPort());
}

Mqtt5AsyncClient asyncClient = mqtt5ClientBuilder.buildAsync();

asyncClient.connectWith().keepAlive(30).cleanStart(true).send().join();

asyncClient
.subscribeWith()
.topicFilter(topic)
.qos(Optional.ofNullable(MqttQos.fromCode(qos)).orElse(MqttQos.AT_MOST_ONCE))
.callback(this::onMessage)
.send()
.join();

return asyncClient;
}

private Mqtt5SimpleAuth createAuthFromUrl(URI parsedUrl) {
if (parsedUrl.getUserInfo() != null) {
String[] userinfo = parsedUrl.getUserInfo().split(":");
connOpts.setUserName(userinfo[0]);
connOpts.setPassword(userinfo[1].toCharArray());
return Mqtt5SimpleAuth.builder()
.username(userinfo[0])
.password(userinfo[1].getBytes(StandardCharsets.UTF_8))
.build();
}
client.setCallback(new Callback());
return null;
}

private void onDisconnect(MqttClientDisconnectedContext ctx) {
LOG.info("Disconnected client from MQTT broker: {}", url, ctx.getCause());
}

LOG.debug("Connecting to broker: {}", url);
client.connect(connOpts);
private void onConnect() {
LOG.info("Connected client to MQTT broker: {} with qos: {}", url, qos);
}

@Override
public void teardown() {
try {
client.disconnect();
} catch (MqttException e) {
LOG.error("Error disconnecting", e);
}
client.disconnect();
}

@Override
public String getConfigRef() {
return configRef;
}

private class Callback implements MqttCallbackExtended {

@Override
public void connectComplete(boolean reconnect, String serverURI) {
try {
LOG.debug("Connected");
client.subscribe(topic, qos);
} catch (MqttException e) {
LOG.warn("Could not subscribe to: {}", topic);
private void onMessage(Mqtt5Publish message) {
List<GtfsRealtime.TripUpdate> updates = null;
UpdateIncrementality updateIncrementality = FULL_DATASET;
try {
// Decode message
GtfsRealtime.FeedMessage feedMessage = GtfsRealtime.FeedMessage.parseFrom(
message.getPayloadAsBytes()
);
List<GtfsRealtime.FeedEntity> feedEntityList = feedMessage.getEntityList();

// Change fullDataset value if this is an incremental update
if (
feedMessage.hasHeader() &&
feedMessage.getHeader().hasIncrementality() &&
feedMessage
.getHeader()
.getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)
) {
updateIncrementality = DIFFERENTIAL;
}
}

@Override
public void connectionLost(Throwable cause) {
LOG.debug("Disconnected");
}

@Override
public void messageArrived(String topic, MqttMessage message) {
List<GtfsRealtime.TripUpdate> updates = null;
UpdateIncrementality updateIncrementality = FULL_DATASET;
try {
// Decode message
GtfsRealtime.FeedMessage feedMessage = GtfsRealtime.FeedMessage.parseFrom(
message.getPayload()
);
List<GtfsRealtime.FeedEntity> feedEntityList = feedMessage.getEntityList();

// Change fullDataset value if this is an incremental update
if (
feedMessage.hasHeader() &&
feedMessage.getHeader().hasIncrementality() &&
feedMessage
.getHeader()
.getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)
) {
updateIncrementality = DIFFERENTIAL;
}

// Create List of TripUpdates
updates = new ArrayList<>(feedEntityList.size());
for (GtfsRealtime.FeedEntity feedEntity : feedEntityList) {
if (feedEntity.hasTripUpdate()) {
updates.add(feedEntity.getTripUpdate());
}
// Create List of TripUpdates
updates = new ArrayList<>(feedEntityList.size());
for (GtfsRealtime.FeedEntity feedEntity : feedEntityList) {
if (feedEntity.hasTripUpdate()) {
updates.add(feedEntity.getTripUpdate());
}
} catch (InvalidProtocolBufferException e) {
LOG.error("Could not decode gtfs-rt message:", e);
}

if (updates != null) {
// Handle trip updates via graph writer runnable
saveResultOnGraph.execute(
new TripUpdateGraphWriterRunnable(
adapter,
fuzzyTripMatching,
forwardsDelayPropagationType,
backwardsDelayPropagationType,
updateIncrementality,
updates,
feedId,
recordMetrics
)
);
}
} catch (InvalidProtocolBufferException e) {
LOG.error("Could not decode gtfs-rt message:", e);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {}
if (updates != null) {
// Handle trip updates via graph writer runnable
saveResultOnGraph.execute(
new TripUpdateGraphWriterRunnable(
adapter,
fuzzyTripMatching,
forwardsDelayPropagationType,
backwardsDelayPropagationType,
updateIncrementality,
updates,
feedId,
recordMetrics
)
);
}
}

@Override
Expand Down
Loading