diff --git a/pom.xml b/pom.xml
index 4710c59..6394185 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,13 +3,13 @@
fi.hsl.transitdata
transitdata-vehicleposition-processor
- 1.1.1
+ 2.0.0
UTF-8
11
11
- 1.4.4
+ 2.0.1
@@ -28,7 +28,7 @@
com.github.ben-manes.caffeine
caffeine
- 3.1.5
+ 3.1.6
junit
@@ -39,7 +39,7 @@
org.mockito
mockito-core
- 5.2.0
+ 5.3.1
test
diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java
index 59bb9ad..a7364f4 100644
--- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java
+++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java
@@ -1,6 +1,7 @@
package fi.hsl.transitdata.vehicleposition.application;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.transit.realtime.GtfsRealtime;
import com.typesafe.config.Config;
import fi.hsl.common.gtfsrt.FeedMessageFactory;
@@ -99,84 +100,121 @@ private static String getUniqueVehicleId(int oper, int veh) {
public void handleMessage(Message message) {
try {
if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.PassengerCount)) {
- PassengerCount.Data data = PassengerCount.Data.parseFrom(message.getData());
-
- final String uniqueVehicleId = getUniqueVehicleId(data.getPayload().getOper(), data.getPayload().getVeh());
-
- passengerCountCache.updatePassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir(), data.getPayload());
- } else if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData)) {
- Hfp.Data data = Hfp.Data.parseFrom(message.getData());
-
- //Ignore HFP messages that are not sent from vehicles on a journey
- if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.journey) {
- return;
+ PassengerCount.Data data = null;
+ try {
+ data = PassengerCount.Data.parseFrom(message.getData());
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Failed to parse passenger count data", e);
+ throw new Exception(e);
}
-
- //Ignore HFP messages that are not sent from vehicles on an ongoing journey
- if (data.getTopic().getTemporalType() != Hfp.Topic.TemporalType.ongoing) {
- return;
- }
-
- //Ignore events that are not relevant to calculating stop status
- if (data.getTopic().getEventType() != Hfp.Topic.EventType.VP &&
- data.getTopic().getEventType() != Hfp.Topic.EventType.DUE &&
- data.getTopic().getEventType() != Hfp.Topic.EventType.PAS &&
- data.getTopic().getEventType() != Hfp.Topic.EventType.ARS &&
- data.getTopic().getEventType() != Hfp.Topic.EventType.PDE) {
- log.debug("Ignoring HFP message with event type {}", data.getTopic().getEventType());
- return;
+
+ try {
+ final String uniqueVehicleId = getUniqueVehicleId(data.getPayload().getOper(), data.getPayload().getVeh());
+
+ passengerCountCache.updatePassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir(), data.getPayload());
+ } catch (Exception x) {
+ log.error("Failed to get unique vehicleId and update passenger count");
+ throw x;
}
-
- final boolean tripAlreadyTaken = !tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir());
-
- if (tripAlreadyTaken && !addedTripsEnabledModes.contains(data.getTopic().getTransportMode())) {
- //If some other vehicle was registered for the trip and the vehicle is not a bus, do not produce vehicle position
- log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId());
- return;
+ } else if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData)) {
+ Hfp.Data data = null;
+ try {
+ data = Hfp.Data.parseFrom(message.getData());
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Failed to parse HfpData", e);
+ throw new Exception(e);
}
-
- if (!vehicleTimestampValidator.validateTimestamp(data, message.getEventTime())) {
- //Vehicle had invalid timestamp..
- return;
+
+ try {
+ //Ignore HFP messages that are not sent from vehicles on a journey
+ if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.journey) {
+ // This log statement is commented out because it produces a big amount of log items
+ //log.info("Ignore HFP messages that are not sent from vehicles on a journey");
+ return;
+ }
+
+ //Ignore HFP messages that are not sent from vehicles on an ongoing journey
+ if (data.getTopic().getTemporalType() != Hfp.Topic.TemporalType.ongoing) {
+ // This log statement is commented out because it produces a big amount of log items
+ //log.info("Ignored message since vehicle wasn't on a journey");
+ return;
+ }
+
+ //Ignore events that are not relevant to calculating stop status
+ if (data.getTopic().getEventType() != Hfp.Topic.EventType.VP &&
+ data.getTopic().getEventType() != Hfp.Topic.EventType.DUE &&
+ data.getTopic().getEventType() != Hfp.Topic.EventType.PAS &&
+ data.getTopic().getEventType() != Hfp.Topic.EventType.ARS &&
+ data.getTopic().getEventType() != Hfp.Topic.EventType.PDE) {
+ log.debug("Ignoring HFP message with event type {}", data.getTopic().getEventType());
+ return;
+ }
+ } catch (Exception x) {
+ log.error("Topic related checks failed");
+ throw x;
}
- if (!vehicleDelayValidator.validateDelay(data)) {
- // Vehicle was delayed too much
- return;
+ final boolean tripAlreadyTaken = !tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir());
+
+ try {
+ if (tripAlreadyTaken && !addedTripsEnabledModes.contains(data.getTopic().getTransportMode())) {
+ //If some other vehicle was registered for the trip and the vehicle is not a bus, do not produce vehicle position
+ log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId());
+ return;
+ }
+ } catch (Exception x) {
+ log.error("tripAlreadyTaken check failed");
+ throw x;
}
-
- StopStatusProcessor.StopStatus stopStatus = stopStatusProcessor.getStopStatus(data);
-
- String uniqueVehicleId = getUniqueVehicleId(data.getTopic().getOperatorId(), data.getTopic().getVehicleNumber());
- PassengerCount.Payload passengerCount = passengerCountCache.getPassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir());
- if (!isValidPassengerCountData(passengerCount)) {
- if (passengerCount != null) {
- log.debug("Passenger count for vehicle {} was invalid (vehicle load: {}, vehicle load ratio: {})",
- uniqueVehicleId,
- passengerCount.getVehicleCounts().getVehicleLoad(),
- passengerCount.getVehicleCounts().getVehicleLoadRatio());
+
+ try {
+ if (!vehicleTimestampValidator.validateTimestamp(data, message.getEventTime())) {
+ //Vehicle had invalid timestamp..
+ return;
}
-
- //Don't use invalid data
- passengerCount = null;
+
+ if (!vehicleDelayValidator.validateDelay(data)) {
+ // Vehicle was delayed too much
+ return;
+ }
+ } catch (Exception x) {
+ log.error("Validations failed");
+ throw x;
}
-
- Optional maybeOccupancyStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(data.getPayload(), passengerCount);
-
- Optional optionalVehiclePosition = GtfsRtGenerator.generateVehiclePosition(data, tripAlreadyTaken ? GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED : GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED, stopStatus, maybeOccupancyStatus);
-
- if (optionalVehiclePosition.isPresent()) {
- final GtfsRealtime.VehiclePosition vehiclePosition = optionalVehiclePosition.get();
-
- final String topicSuffix = getTopicSuffix(vehiclePosition);
-
- final GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(generateEntityId(data), vehiclePosition, data.getPayload().getTsi());
-
- if (Duration.ofMillis(System.currentTimeMillis() - (data.getPayload().getTsi() * 1000)).compareTo(DELAYED_MESSAGE_THRESHOLD) >= 0) {
- messagesDelayed++;
+
+ try {
+ StopStatusProcessor.StopStatus stopStatus = stopStatusProcessor.getStopStatus(data);
+ String uniqueVehicleId = getUniqueVehicleId(data.getTopic().getOperatorId(), data.getTopic().getVehicleNumber());
+ PassengerCount.Payload passengerCount = passengerCountCache.getPassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir());
+
+ if (!isValidPassengerCountData(passengerCount)) {
+ if (passengerCount != null) {
+ log.debug("Passenger count for vehicle {} was invalid (vehicle load: {}, vehicle load ratio: {})",
+ uniqueVehicleId,
+ passengerCount.getVehicleCounts().getVehicleLoad(),
+ passengerCount.getVehicleCounts().getVehicleLoadRatio());
+ }
+
+ //Don't use invalid data
+ passengerCount = null;
}
-
- sendPulsarMessage(data.getTopic().getUniqueVehicleId(), topicSuffix, feedMessage, data.getPayload().getTsi());
+
+ Optional maybeOccupancyStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(data.getPayload(), passengerCount);
+ Optional optionalVehiclePosition = GtfsRtGenerator.generateVehiclePosition(data, tripAlreadyTaken ? GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED : GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED, stopStatus, maybeOccupancyStatus);
+
+ if (optionalVehiclePosition.isPresent()) {
+ final GtfsRealtime.VehiclePosition vehiclePosition = optionalVehiclePosition.get();
+ final String topicSuffix = getTopicSuffix(vehiclePosition);
+ final GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(generateEntityId(data), vehiclePosition, data.getPayload().getTsi());
+
+ if (Duration.ofMillis(System.currentTimeMillis() - (data.getPayload().getTsi() * 1000)).compareTo(DELAYED_MESSAGE_THRESHOLD) >= 0) {
+ messagesDelayed++;
+ }
+ sendPulsarMessage(data.getTopic().getUniqueVehicleId(), topicSuffix, feedMessage, data.getPayload().getTsi());
+ }
+ } catch (Exception x) {
+ log.error("Preparing or sending pulsar message failed.", x);
+ throw x;
}
} else {
log.warn("Invalid protobuf schema, expecting HfpData");
diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java
index c90eb8e..646df08 100644
--- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java
+++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java
@@ -43,22 +43,35 @@ public GtfsRtOccupancyStatusHelper(NavigableMap getOccupancyStatus(Hfp.Payload hfpPayload, PassengerCount.Payload passengerCountPayload) {
if (passengerCountEnabledVehicles == null || passengerCountEnabledVehicles.contains(hfpPayload.getOper() + "/" + hfpPayload.getVeh())) {
//If occu field is 100, the driver has marked the vehicle as full
- if (hfpPayload.getOccu() == 100) {
+ if (hfpPayload.getOccu() >= 100) {
return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.FULL);
}
if (passengerCountPayload != null) {
- if (passengerCountPayload.getVehicleCounts().hasVehicleLoad() && passengerCountPayload.getVehicleCounts().getVehicleLoad() == 0) {
+ if (passengerCountPayload.getVehicleCounts().hasVehicleLoad() && passengerCountPayload.getVehicleCounts().getVehicleLoad() <= 0) {
//If vehicle load is zero, vehicle load ratio is unavailable and the vehicle is empty
return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY);
}
-
+
+ if (passengerCountPayload.getVehicleCounts().getVehicleLoadRatio() <= 0.0) {
+ return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY);
+ }
+
return Optional.of(loadRatioToOccupancyStatus.lowerEntry(passengerCountPayload.getVehicleCounts().getVehicleLoadRatio()).getValue());
}
-
- //If passenger count from APC message is not available, but occu contains value other than 0, use that
- //Currently occu is only available for Suomenlinna ferries
- if (hfpPayload.getOccu() != 0) {
+
+ //If passenger count from APC message is not available, but occu
+ //contains value other than 0, use that.
+ //
+ //Currently occu values larger than 0 but smaller than 100 are only
+ //available for Suomenlinna ferries.
+ //
+ //Many vehicles send '"occu":0' as part of the HFP MQTT payload,
+ //probably to mean "not full". Do not use occu == 0 even for
+ //ferries as it might be just a broken device or a default value. We
+ //would rather not publish information than publish false
+ //information in case of broken devices or implementation.
+ if (hfpPayload.getOccu() > 0) {
return Optional.of(occuToOccupancyStatus.lowerEntry(hfpPayload.getOccu()).getValue());
}
}
diff --git a/src/main/resources/environment.conf b/src/main/resources/environment.conf
index bcc3631..33d17ce 100644
--- a/src/main/resources/environment.conf
+++ b/src/main/resources/environment.conf
@@ -4,6 +4,8 @@ pulsar {
consumer {
multipleTopics=true
multipleTopics=${?PULSAR_CONSUMER_ENABLE_MULTIPLE_TOPICS}
+ topics=["hfp-data, passenger-count"]
+ topics=${?PULSAR_CONSUMER_TOPICS}
topicsPattern="persistent://public/default/(hfp-data|passenger-count)"
topicsPattern=${?PULSAR_CONSUMER_MULTIPLE_TOPICS_PATTERN}
subscription="transitdata-vehicleposition-processor-subscription"
diff --git a/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java b/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java
index 31be5de..efe5607 100644
--- a/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java
+++ b/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java
@@ -2,6 +2,8 @@
import com.google.transit.realtime.GtfsRealtime;
import fi.hsl.common.hfp.proto.Hfp;
+import fi.hsl.common.passengercount.proto.PassengerCount;
+import org.jetbrains.annotations.NotNull;
import org.junit.Before;
import org.junit.Test;
@@ -34,12 +36,91 @@ public void setup() {
gtfsRtOccupancyStatusHelper = new GtfsRtOccupancyStatusHelper(occuToOccupancyStatus, loadRatioToOccypancyStatus);
}
+
+ @NotNull
+ private static Hfp.Payload getHfpPayload(int occu) {
+ return Hfp.Payload.newBuilder().
+ setSchemaVersion(1).setTsi(0).setTst("").setOccu(occu).build();
+ }
+
+ @NotNull
+ private static PassengerCount.Payload getPassengerCountPayload(int vehicleLoad, double vehicleLoadRatio) {
+ return PassengerCount.Payload.newBuilder().setVehicleCounts(PassengerCount.VehicleCounts.newBuilder()
+ .setCountQuality("").setVehicleLoad(vehicleLoad).setVehicleLoadRatio(vehicleLoadRatio).build()).build();
+ }
@Test
- public void testOccupancyStatus() {
- Optional occuStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(Hfp.Payload.newBuilder().setSchemaVersion(1).setTsi(0).setTst("").setOccu(55).build(), null);
+ public void testHfpOccupancyStatus() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(55),
+ null);
assertTrue(occuStatus.isPresent());
assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.STANDING_ROOM_ONLY, occuStatus.get());
}
+
+ @Test
+ public void testHfpOccupancyStatusEmpty() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(0),
+ null);
+
+ assertTrue(occuStatus.isEmpty());
+ }
+
+ @Test
+ public void testHfpOccupancyStatusNegative() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(-1),
+ null);
+
+ assertTrue(occuStatus.isEmpty());
+ }
+
+ @Test
+ public void testPassengerCountOccupancyStatus() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(0),
+ getPassengerCountPayload(10, 0.55));
+
+ assertTrue(occuStatus.isPresent());
+ assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.STANDING_ROOM_ONLY, occuStatus.get());
+ }
+
+ @Test
+ public void testPassengerCountOccupancyStatusEmpty() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(0),
+ getPassengerCountPayload(0, 0.0));
+
+ assertTrue(occuStatus.isPresent());
+ assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY, occuStatus.get());
+ }
+
+ @Test
+ public void testPassengerCountOccupancyStatusWithVehicleLoadZeroWithVehicleLoadRatioNegative() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(0),
+ getPassengerCountPayload(0, -1.0));
+
+ assertTrue(occuStatus.isPresent());
+ assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY, occuStatus.get());
+ }
+
+ @Test
+ public void testPassengerCountOccupancyStatusWithVehicleLoadPositiveWithVehicleLoadRatioNegative() {
+ Optional occuStatus =
+ gtfsRtOccupancyStatusHelper.getOccupancyStatus(
+ getHfpPayload(0),
+ getPassengerCountPayload(1, -1.0));
+
+ assertTrue(occuStatus.isPresent());
+ assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY, occuStatus.get());
+ }
}