diff --git a/pom.xml b/pom.xml index 4710c59..6394185 100644 --- a/pom.xml +++ b/pom.xml @@ -3,13 +3,13 @@ fi.hsl.transitdata transitdata-vehicleposition-processor - 1.1.1 + 2.0.0 UTF-8 11 11 - 1.4.4 + 2.0.1 @@ -28,7 +28,7 @@ com.github.ben-manes.caffeine caffeine - 3.1.5 + 3.1.6 junit @@ -39,7 +39,7 @@ org.mockito mockito-core - 5.2.0 + 5.3.1 test diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java index 59bb9ad..a7364f4 100644 --- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java @@ -1,6 +1,7 @@ package fi.hsl.transitdata.vehicleposition.application; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.transit.realtime.GtfsRealtime; import com.typesafe.config.Config; import fi.hsl.common.gtfsrt.FeedMessageFactory; @@ -99,84 +100,121 @@ private static String getUniqueVehicleId(int oper, int veh) { public void handleMessage(Message message) { try { if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.PassengerCount)) { - PassengerCount.Data data = PassengerCount.Data.parseFrom(message.getData()); - - final String uniqueVehicleId = getUniqueVehicleId(data.getPayload().getOper(), data.getPayload().getVeh()); - - passengerCountCache.updatePassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir(), data.getPayload()); - } else if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData)) { - Hfp.Data data = Hfp.Data.parseFrom(message.getData()); - - //Ignore HFP messages that are not sent from vehicles on a journey - if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.journey) { - return; + PassengerCount.Data data = null; + try { + data = PassengerCount.Data.parseFrom(message.getData()); + } catch (InvalidProtocolBufferException e) { + log.error("Failed to parse passenger count data", e); + throw new Exception(e); } - - //Ignore HFP messages that are not sent from vehicles on an ongoing journey - if (data.getTopic().getTemporalType() != Hfp.Topic.TemporalType.ongoing) { - return; - } - - //Ignore events that are not relevant to calculating stop status - if (data.getTopic().getEventType() != Hfp.Topic.EventType.VP && - data.getTopic().getEventType() != Hfp.Topic.EventType.DUE && - data.getTopic().getEventType() != Hfp.Topic.EventType.PAS && - data.getTopic().getEventType() != Hfp.Topic.EventType.ARS && - data.getTopic().getEventType() != Hfp.Topic.EventType.PDE) { - log.debug("Ignoring HFP message with event type {}", data.getTopic().getEventType()); - return; + + try { + final String uniqueVehicleId = getUniqueVehicleId(data.getPayload().getOper(), data.getPayload().getVeh()); + + passengerCountCache.updatePassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir(), data.getPayload()); + } catch (Exception x) { + log.error("Failed to get unique vehicleId and update passenger count"); + throw x; } - - final boolean tripAlreadyTaken = !tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir()); - - if (tripAlreadyTaken && !addedTripsEnabledModes.contains(data.getTopic().getTransportMode())) { - //If some other vehicle was registered for the trip and the vehicle is not a bus, do not produce vehicle position - log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId()); - return; + } else if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData)) { + Hfp.Data data = null; + try { + data = Hfp.Data.parseFrom(message.getData()); + } catch (InvalidProtocolBufferException e) { + log.error("Failed to parse HfpData", e); + throw new Exception(e); } - - if (!vehicleTimestampValidator.validateTimestamp(data, message.getEventTime())) { - //Vehicle had invalid timestamp.. - return; + + try { + //Ignore HFP messages that are not sent from vehicles on a journey + if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.journey) { + // This log statement is commented out because it produces a big amount of log items + //log.info("Ignore HFP messages that are not sent from vehicles on a journey"); + return; + } + + //Ignore HFP messages that are not sent from vehicles on an ongoing journey + if (data.getTopic().getTemporalType() != Hfp.Topic.TemporalType.ongoing) { + // This log statement is commented out because it produces a big amount of log items + //log.info("Ignored message since vehicle wasn't on a journey"); + return; + } + + //Ignore events that are not relevant to calculating stop status + if (data.getTopic().getEventType() != Hfp.Topic.EventType.VP && + data.getTopic().getEventType() != Hfp.Topic.EventType.DUE && + data.getTopic().getEventType() != Hfp.Topic.EventType.PAS && + data.getTopic().getEventType() != Hfp.Topic.EventType.ARS && + data.getTopic().getEventType() != Hfp.Topic.EventType.PDE) { + log.debug("Ignoring HFP message with event type {}", data.getTopic().getEventType()); + return; + } + } catch (Exception x) { + log.error("Topic related checks failed"); + throw x; } - if (!vehicleDelayValidator.validateDelay(data)) { - // Vehicle was delayed too much - return; + final boolean tripAlreadyTaken = !tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir()); + + try { + if (tripAlreadyTaken && !addedTripsEnabledModes.contains(data.getTopic().getTransportMode())) { + //If some other vehicle was registered for the trip and the vehicle is not a bus, do not produce vehicle position + log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId()); + return; + } + } catch (Exception x) { + log.error("tripAlreadyTaken check failed"); + throw x; } - - StopStatusProcessor.StopStatus stopStatus = stopStatusProcessor.getStopStatus(data); - - String uniqueVehicleId = getUniqueVehicleId(data.getTopic().getOperatorId(), data.getTopic().getVehicleNumber()); - PassengerCount.Payload passengerCount = passengerCountCache.getPassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir()); - if (!isValidPassengerCountData(passengerCount)) { - if (passengerCount != null) { - log.debug("Passenger count for vehicle {} was invalid (vehicle load: {}, vehicle load ratio: {})", - uniqueVehicleId, - passengerCount.getVehicleCounts().getVehicleLoad(), - passengerCount.getVehicleCounts().getVehicleLoadRatio()); + + try { + if (!vehicleTimestampValidator.validateTimestamp(data, message.getEventTime())) { + //Vehicle had invalid timestamp.. + return; } - - //Don't use invalid data - passengerCount = null; + + if (!vehicleDelayValidator.validateDelay(data)) { + // Vehicle was delayed too much + return; + } + } catch (Exception x) { + log.error("Validations failed"); + throw x; } - - Optional maybeOccupancyStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(data.getPayload(), passengerCount); - - Optional optionalVehiclePosition = GtfsRtGenerator.generateVehiclePosition(data, tripAlreadyTaken ? GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED : GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED, stopStatus, maybeOccupancyStatus); - - if (optionalVehiclePosition.isPresent()) { - final GtfsRealtime.VehiclePosition vehiclePosition = optionalVehiclePosition.get(); - - final String topicSuffix = getTopicSuffix(vehiclePosition); - - final GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(generateEntityId(data), vehiclePosition, data.getPayload().getTsi()); - - if (Duration.ofMillis(System.currentTimeMillis() - (data.getPayload().getTsi() * 1000)).compareTo(DELAYED_MESSAGE_THRESHOLD) >= 0) { - messagesDelayed++; + + try { + StopStatusProcessor.StopStatus stopStatus = stopStatusProcessor.getStopStatus(data); + String uniqueVehicleId = getUniqueVehicleId(data.getTopic().getOperatorId(), data.getTopic().getVehicleNumber()); + PassengerCount.Payload passengerCount = passengerCountCache.getPassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir()); + + if (!isValidPassengerCountData(passengerCount)) { + if (passengerCount != null) { + log.debug("Passenger count for vehicle {} was invalid (vehicle load: {}, vehicle load ratio: {})", + uniqueVehicleId, + passengerCount.getVehicleCounts().getVehicleLoad(), + passengerCount.getVehicleCounts().getVehicleLoadRatio()); + } + + //Don't use invalid data + passengerCount = null; } - - sendPulsarMessage(data.getTopic().getUniqueVehicleId(), topicSuffix, feedMessage, data.getPayload().getTsi()); + + Optional maybeOccupancyStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(data.getPayload(), passengerCount); + Optional optionalVehiclePosition = GtfsRtGenerator.generateVehiclePosition(data, tripAlreadyTaken ? GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED : GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED, stopStatus, maybeOccupancyStatus); + + if (optionalVehiclePosition.isPresent()) { + final GtfsRealtime.VehiclePosition vehiclePosition = optionalVehiclePosition.get(); + final String topicSuffix = getTopicSuffix(vehiclePosition); + final GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(generateEntityId(data), vehiclePosition, data.getPayload().getTsi()); + + if (Duration.ofMillis(System.currentTimeMillis() - (data.getPayload().getTsi() * 1000)).compareTo(DELAYED_MESSAGE_THRESHOLD) >= 0) { + messagesDelayed++; + } + sendPulsarMessage(data.getTopic().getUniqueVehicleId(), topicSuffix, feedMessage, data.getPayload().getTsi()); + } + } catch (Exception x) { + log.error("Preparing or sending pulsar message failed.", x); + throw x; } } else { log.warn("Invalid protobuf schema, expecting HfpData"); diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java index c90eb8e..646df08 100644 --- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelper.java @@ -43,22 +43,35 @@ public GtfsRtOccupancyStatusHelper(NavigableMap getOccupancyStatus(Hfp.Payload hfpPayload, PassengerCount.Payload passengerCountPayload) { if (passengerCountEnabledVehicles == null || passengerCountEnabledVehicles.contains(hfpPayload.getOper() + "/" + hfpPayload.getVeh())) { //If occu field is 100, the driver has marked the vehicle as full - if (hfpPayload.getOccu() == 100) { + if (hfpPayload.getOccu() >= 100) { return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.FULL); } if (passengerCountPayload != null) { - if (passengerCountPayload.getVehicleCounts().hasVehicleLoad() && passengerCountPayload.getVehicleCounts().getVehicleLoad() == 0) { + if (passengerCountPayload.getVehicleCounts().hasVehicleLoad() && passengerCountPayload.getVehicleCounts().getVehicleLoad() <= 0) { //If vehicle load is zero, vehicle load ratio is unavailable and the vehicle is empty return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY); } - + + if (passengerCountPayload.getVehicleCounts().getVehicleLoadRatio() <= 0.0) { + return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY); + } + return Optional.of(loadRatioToOccupancyStatus.lowerEntry(passengerCountPayload.getVehicleCounts().getVehicleLoadRatio()).getValue()); } - - //If passenger count from APC message is not available, but occu contains value other than 0, use that - //Currently occu is only available for Suomenlinna ferries - if (hfpPayload.getOccu() != 0) { + + //If passenger count from APC message is not available, but occu + //contains value other than 0, use that. + // + //Currently occu values larger than 0 but smaller than 100 are only + //available for Suomenlinna ferries. + // + //Many vehicles send '"occu":0' as part of the HFP MQTT payload, + //probably to mean "not full". Do not use occu == 0 even for + //ferries as it might be just a broken device or a default value. We + //would rather not publish information than publish false + //information in case of broken devices or implementation. + if (hfpPayload.getOccu() > 0) { return Optional.of(occuToOccupancyStatus.lowerEntry(hfpPayload.getOccu()).getValue()); } } diff --git a/src/main/resources/environment.conf b/src/main/resources/environment.conf index bcc3631..33d17ce 100644 --- a/src/main/resources/environment.conf +++ b/src/main/resources/environment.conf @@ -4,6 +4,8 @@ pulsar { consumer { multipleTopics=true multipleTopics=${?PULSAR_CONSUMER_ENABLE_MULTIPLE_TOPICS} + topics=["hfp-data, passenger-count"] + topics=${?PULSAR_CONSUMER_TOPICS} topicsPattern="persistent://public/default/(hfp-data|passenger-count)" topicsPattern=${?PULSAR_CONSUMER_MULTIPLE_TOPICS_PATTERN} subscription="transitdata-vehicleposition-processor-subscription" diff --git a/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java b/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java index 31be5de..efe5607 100644 --- a/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java +++ b/src/test/java/fi/hsl/transitdata/vehicleposition/application/gtfsrt/GtfsRtOccupancyStatusHelperTest.java @@ -2,6 +2,8 @@ import com.google.transit.realtime.GtfsRealtime; import fi.hsl.common.hfp.proto.Hfp; +import fi.hsl.common.passengercount.proto.PassengerCount; +import org.jetbrains.annotations.NotNull; import org.junit.Before; import org.junit.Test; @@ -34,12 +36,91 @@ public void setup() { gtfsRtOccupancyStatusHelper = new GtfsRtOccupancyStatusHelper(occuToOccupancyStatus, loadRatioToOccypancyStatus); } + + @NotNull + private static Hfp.Payload getHfpPayload(int occu) { + return Hfp.Payload.newBuilder(). + setSchemaVersion(1).setTsi(0).setTst("").setOccu(occu).build(); + } + + @NotNull + private static PassengerCount.Payload getPassengerCountPayload(int vehicleLoad, double vehicleLoadRatio) { + return PassengerCount.Payload.newBuilder().setVehicleCounts(PassengerCount.VehicleCounts.newBuilder() + .setCountQuality("").setVehicleLoad(vehicleLoad).setVehicleLoadRatio(vehicleLoadRatio).build()).build(); + } @Test - public void testOccupancyStatus() { - Optional occuStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(Hfp.Payload.newBuilder().setSchemaVersion(1).setTsi(0).setTst("").setOccu(55).build(), null); + public void testHfpOccupancyStatus() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(55), + null); assertTrue(occuStatus.isPresent()); assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.STANDING_ROOM_ONLY, occuStatus.get()); } + + @Test + public void testHfpOccupancyStatusEmpty() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(0), + null); + + assertTrue(occuStatus.isEmpty()); + } + + @Test + public void testHfpOccupancyStatusNegative() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(-1), + null); + + assertTrue(occuStatus.isEmpty()); + } + + @Test + public void testPassengerCountOccupancyStatus() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(0), + getPassengerCountPayload(10, 0.55)); + + assertTrue(occuStatus.isPresent()); + assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.STANDING_ROOM_ONLY, occuStatus.get()); + } + + @Test + public void testPassengerCountOccupancyStatusEmpty() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(0), + getPassengerCountPayload(0, 0.0)); + + assertTrue(occuStatus.isPresent()); + assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY, occuStatus.get()); + } + + @Test + public void testPassengerCountOccupancyStatusWithVehicleLoadZeroWithVehicleLoadRatioNegative() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(0), + getPassengerCountPayload(0, -1.0)); + + assertTrue(occuStatus.isPresent()); + assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY, occuStatus.get()); + } + + @Test + public void testPassengerCountOccupancyStatusWithVehicleLoadPositiveWithVehicleLoadRatioNegative() { + Optional occuStatus = + gtfsRtOccupancyStatusHelper.getOccupancyStatus( + getHfpPayload(0), + getPassengerCountPayload(1, -1.0)); + + assertTrue(occuStatus.isPresent()); + assertEquals(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY, occuStatus.get()); + } }