Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
c02be78
Update version to 1.1.1
mjaakko Apr 20, 2023
ce4489d
Bump caffeine from 3.1.5 to 3.1.6
dependabot[bot] May 1, 2023
6dfebc3
Bump mockito-core from 5.2.0 to 5.3.1
dependabot[bot] May 1, 2023
c643415
Merge pull request #57 from HSLdevcom/dependabot/maven/develop/org.mo…
mjaakko May 3, 2023
c6f8d60
Merge pull request #56 from HSLdevcom/dependabot/maven/develop/com.gi…
mjaakko May 3, 2023
ea0d416
fix: Add logging for debugging purposes (#61)
thjarvin Oct 19, 2023
f8ea20d
fix: Divide try-catch block into smaller ones
thjarvin Oct 19, 2023
37f307c
Merge pull request #62 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Oct 19, 2023
ea975cf
fix: Merge two try-catch blocks
thjarvin Oct 19, 2023
52b469f
Merge pull request #63 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Oct 19, 2023
68d50ac
fix: Remove temporary logging
thjarvin Oct 19, 2023
a100376
Merge pull request #64 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Oct 19, 2023
6834f30
fix: Add temporarily more info to log.error message
thjarvin Oct 20, 2023
8fad937
Merge pull request #65 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Oct 20, 2023
4f3fd30
fix: Add temporarily even more info to log.error message
thjarvin Oct 20, 2023
89b3fe5
Merge pull request #66 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Oct 20, 2023
9b9bc6a
fix: Add temporary info about passenger count payload to log.error me…
thjarvin Oct 20, 2023
93ebc71
Merge pull request #67 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Oct 20, 2023
090f723
fix: HFP occupancy status is set EMPTY when occu is equal to or below…
thjarvin Nov 2, 2023
4f76aff
fix: Add temporarily more info to log.error message
thjarvin Nov 2, 2023
8764910
Merge pull request #69 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 2, 2023
d1c588f
fix: Removed temporary info from log.error message
thjarvin Nov 2, 2023
8310167
Merge pull request #71 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 2, 2023
07dbfde
fix: Add logging around method gtfsRtOccupancyStatusHelper.getOccupan…
thjarvin Nov 2, 2023
919a9a7
Merge pull request #72 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 2, 2023
a29cbb8
fix: Add logging inside method GtfsRtOccupancyStatusHelper.getOccupan…
thjarvin Nov 2, 2023
8d2674d
Merge pull request #73 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 2, 2023
1235d5e
fix: Add more logging inside GtfsRtOccupancyStatusHelper.getOccupancy…
thjarvin Nov 2, 2023
eaebb34
Merge pull request #74 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 2, 2023
6835f5b
fix: Add more logging inside GtfsRtOccupancyStatusHelper.getOccupancy…
thjarvin Nov 2, 2023
14df645
Merge pull request #75 from HSLdevcom/fix/exception-while-handling-me…
thjarvin Nov 2, 2023
3784729
fix: Add more logging inside GtfsRtOccupancyStatusHelper.getOccupancy…
thjarvin Nov 2, 2023
977f0f0
Merge pull request #76 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 2, 2023
7df9170
fix: Add more logging inside GtfsRtOccupancyStatusHelper.getOccupancy…
thjarvin Nov 3, 2023
83df082
Merge pull request #77 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 3, 2023
145562a
fix: Add more logging inside GtfsRtOccupancyStatusHelper.getOccupancy…
thjarvin Nov 6, 2023
0930abc
Merge pull request #78 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 6, 2023
b688079
fix: Add more logging to GtfsRtOccupancyStatusHelper.getOccupancyStatus
thjarvin Nov 6, 2023
409666e
Merge pull request #79 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 6, 2023
8f901c8
fix: Add more logging to GtfsRtOccupancyStatusHelper.getOccupancyStatus
thjarvin Nov 6, 2023
8680c4c
Merge pull request #80 from HSLdevcom/fix/preparing_or_sending_pulsar…
thjarvin Nov 6, 2023
332f79e
fix: Remove debug logging
thjarvin Nov 6, 2023
3f94187
fix: Refactor tests
thjarvin Nov 6, 2023
378f6fe
fix: Occupancy status is set EMPTY when vehicle load ratio is equal t…
thjarvin Nov 6, 2023
7483ab0
fix: '==' updated to '>=' and '<='
thjarvin Nov 7, 2023
976e020
Merge pull request #81 from HSLdevcom/fix/vehicle-load-ratio-is-zero
thjarvin Nov 7, 2023
f40f5b8
fix: Occu value zero is not used
thjarvin Nov 16, 2023
883fb44
Merge pull request #83 from HSLdevcom/fix/dont-use-zero-occu
thjarvin Nov 17, 2023
bd7d889
Add environment variable pulsar.consumer.topics
thjarvin Oct 18, 2024
e52a530
Update dependency common to version 1.6.3
thjarvin Oct 18, 2024
60a6fa2
Update dependency common to version 1.6.4
thjarvin Oct 18, 2024
951c3b5
Update dependency common to version 1.6.5-RC
thjarvin Oct 18, 2024
777554f
Update dependency common to version 1.6.6-RC
thjarvin Oct 21, 2024
5370033
Update transitdata-common to 2.0.1
thjarvin Nov 22, 2024
c96182b
Update version to 2.0.0
thjarvin Nov 22, 2024
b1abd16
Merge branch 'master' into develop
thjarvin Nov 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

<groupId>fi.hsl.transitdata</groupId>
<artifactId>transitdata-vehicleposition-processor</artifactId>
<version>1.1.1</version>
<version>2.0.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<common.version>1.4.4</common.version>
<common.version>2.0.1</common.version>
</properties>

<repositories>
Expand All @@ -28,7 +28,7 @@
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.5</version>
<version>3.1.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.2.0</version>
<version>5.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fi.hsl.transitdata.vehicleposition.application;


import com.google.protobuf.InvalidProtocolBufferException;
import com.google.transit.realtime.GtfsRealtime;
import com.typesafe.config.Config;
import fi.hsl.common.gtfsrt.FeedMessageFactory;
Expand Down Expand Up @@ -99,84 +100,121 @@ private static String getUniqueVehicleId(int oper, int veh) {
public void handleMessage(Message message) {
try {
if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.PassengerCount)) {
PassengerCount.Data data = PassengerCount.Data.parseFrom(message.getData());

final String uniqueVehicleId = getUniqueVehicleId(data.getPayload().getOper(), data.getPayload().getVeh());

passengerCountCache.updatePassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir(), data.getPayload());
} else if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData)) {
Hfp.Data data = Hfp.Data.parseFrom(message.getData());

//Ignore HFP messages that are not sent from vehicles on a journey
if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.journey) {
return;
PassengerCount.Data data = null;
try {
data = PassengerCount.Data.parseFrom(message.getData());
} catch (InvalidProtocolBufferException e) {
log.error("Failed to parse passenger count data", e);
throw new Exception(e);
}

//Ignore HFP messages that are not sent from vehicles on an ongoing journey
if (data.getTopic().getTemporalType() != Hfp.Topic.TemporalType.ongoing) {
return;
}

//Ignore events that are not relevant to calculating stop status
if (data.getTopic().getEventType() != Hfp.Topic.EventType.VP &&
data.getTopic().getEventType() != Hfp.Topic.EventType.DUE &&
data.getTopic().getEventType() != Hfp.Topic.EventType.PAS &&
data.getTopic().getEventType() != Hfp.Topic.EventType.ARS &&
data.getTopic().getEventType() != Hfp.Topic.EventType.PDE) {
log.debug("Ignoring HFP message with event type {}", data.getTopic().getEventType());
return;

try {
final String uniqueVehicleId = getUniqueVehicleId(data.getPayload().getOper(), data.getPayload().getVeh());

passengerCountCache.updatePassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir(), data.getPayload());
} catch (Exception x) {
log.error("Failed to get unique vehicleId and update passenger count");
throw x;
}

final boolean tripAlreadyTaken = !tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir());

if (tripAlreadyTaken && !addedTripsEnabledModes.contains(data.getTopic().getTransportMode())) {
//If some other vehicle was registered for the trip and the vehicle is not a bus, do not produce vehicle position
log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId());
return;
} else if (TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.HfpData)) {
Hfp.Data data = null;
try {
data = Hfp.Data.parseFrom(message.getData());
} catch (InvalidProtocolBufferException e) {
log.error("Failed to parse HfpData", e);
throw new Exception(e);
}

if (!vehicleTimestampValidator.validateTimestamp(data, message.getEventTime())) {
//Vehicle had invalid timestamp..
return;

try {
//Ignore HFP messages that are not sent from vehicles on a journey
if (data.getTopic().getJourneyType() != Hfp.Topic.JourneyType.journey) {
// This log statement is commented out because it produces a big amount of log items
//log.info("Ignore HFP messages that are not sent from vehicles on a journey");
return;
}

//Ignore HFP messages that are not sent from vehicles on an ongoing journey
if (data.getTopic().getTemporalType() != Hfp.Topic.TemporalType.ongoing) {
// This log statement is commented out because it produces a big amount of log items
//log.info("Ignored message since vehicle wasn't on a journey");
return;
}

//Ignore events that are not relevant to calculating stop status
if (data.getTopic().getEventType() != Hfp.Topic.EventType.VP &&
data.getTopic().getEventType() != Hfp.Topic.EventType.DUE &&
data.getTopic().getEventType() != Hfp.Topic.EventType.PAS &&
data.getTopic().getEventType() != Hfp.Topic.EventType.ARS &&
data.getTopic().getEventType() != Hfp.Topic.EventType.PDE) {
log.debug("Ignoring HFP message with event type {}", data.getTopic().getEventType());
return;
}
} catch (Exception x) {
log.error("Topic related checks failed");
throw x;
}

if (!vehicleDelayValidator.validateDelay(data)) {
// Vehicle was delayed too much
return;
final boolean tripAlreadyTaken = !tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir());

try {
if (tripAlreadyTaken && !addedTripsEnabledModes.contains(data.getTopic().getTransportMode())) {
//If some other vehicle was registered for the trip and the vehicle is not a bus, do not produce vehicle position
log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId());
return;
}
} catch (Exception x) {
log.error("tripAlreadyTaken check failed");
throw x;
}

StopStatusProcessor.StopStatus stopStatus = stopStatusProcessor.getStopStatus(data);

String uniqueVehicleId = getUniqueVehicleId(data.getTopic().getOperatorId(), data.getTopic().getVehicleNumber());
PassengerCount.Payload passengerCount = passengerCountCache.getPassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir());
if (!isValidPassengerCountData(passengerCount)) {
if (passengerCount != null) {
log.debug("Passenger count for vehicle {} was invalid (vehicle load: {}, vehicle load ratio: {})",
uniqueVehicleId,
passengerCount.getVehicleCounts().getVehicleLoad(),
passengerCount.getVehicleCounts().getVehicleLoadRatio());

try {
if (!vehicleTimestampValidator.validateTimestamp(data, message.getEventTime())) {
//Vehicle had invalid timestamp..
return;
}

//Don't use invalid data
passengerCount = null;

if (!vehicleDelayValidator.validateDelay(data)) {
// Vehicle was delayed too much
return;
}
} catch (Exception x) {
log.error("Validations failed");
throw x;
}

Optional<GtfsRealtime.VehiclePosition.OccupancyStatus> maybeOccupancyStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(data.getPayload(), passengerCount);

Optional<GtfsRealtime.VehiclePosition> optionalVehiclePosition = GtfsRtGenerator.generateVehiclePosition(data, tripAlreadyTaken ? GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED : GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED, stopStatus, maybeOccupancyStatus);

if (optionalVehiclePosition.isPresent()) {
final GtfsRealtime.VehiclePosition vehiclePosition = optionalVehiclePosition.get();

final String topicSuffix = getTopicSuffix(vehiclePosition);

final GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(generateEntityId(data), vehiclePosition, data.getPayload().getTsi());

if (Duration.ofMillis(System.currentTimeMillis() - (data.getPayload().getTsi() * 1000)).compareTo(DELAYED_MESSAGE_THRESHOLD) >= 0) {
messagesDelayed++;

try {
StopStatusProcessor.StopStatus stopStatus = stopStatusProcessor.getStopStatus(data);
String uniqueVehicleId = getUniqueVehicleId(data.getTopic().getOperatorId(), data.getTopic().getVehicleNumber());
PassengerCount.Payload passengerCount = passengerCountCache.getPassengerCount(uniqueVehicleId, data.getPayload().getRoute(), data.getPayload().getOday(), data.getPayload().getStart(), data.getPayload().getDir());

if (!isValidPassengerCountData(passengerCount)) {
if (passengerCount != null) {
log.debug("Passenger count for vehicle {} was invalid (vehicle load: {}, vehicle load ratio: {})",
uniqueVehicleId,
passengerCount.getVehicleCounts().getVehicleLoad(),
passengerCount.getVehicleCounts().getVehicleLoadRatio());
}

//Don't use invalid data
passengerCount = null;
}

sendPulsarMessage(data.getTopic().getUniqueVehicleId(), topicSuffix, feedMessage, data.getPayload().getTsi());

Optional<GtfsRealtime.VehiclePosition.OccupancyStatus> maybeOccupancyStatus = gtfsRtOccupancyStatusHelper.getOccupancyStatus(data.getPayload(), passengerCount);
Optional<GtfsRealtime.VehiclePosition> optionalVehiclePosition = GtfsRtGenerator.generateVehiclePosition(data, tripAlreadyTaken ? GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED : GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED, stopStatus, maybeOccupancyStatus);

if (optionalVehiclePosition.isPresent()) {
final GtfsRealtime.VehiclePosition vehiclePosition = optionalVehiclePosition.get();
final String topicSuffix = getTopicSuffix(vehiclePosition);
final GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(generateEntityId(data), vehiclePosition, data.getPayload().getTsi());

if (Duration.ofMillis(System.currentTimeMillis() - (data.getPayload().getTsi() * 1000)).compareTo(DELAYED_MESSAGE_THRESHOLD) >= 0) {
messagesDelayed++;
}
sendPulsarMessage(data.getTopic().getUniqueVehicleId(), topicSuffix, feedMessage, data.getPayload().getTsi());
}
} catch (Exception x) {
log.error("Preparing or sending pulsar message failed.", x);
throw x;
}
} else {
log.warn("Invalid protobuf schema, expecting HfpData");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,35 @@ public GtfsRtOccupancyStatusHelper(NavigableMap<Integer, GtfsRealtime.VehiclePos
public Optional<GtfsRealtime.VehiclePosition.OccupancyStatus> getOccupancyStatus(Hfp.Payload hfpPayload, PassengerCount.Payload passengerCountPayload) {
if (passengerCountEnabledVehicles == null || passengerCountEnabledVehicles.contains(hfpPayload.getOper() + "/" + hfpPayload.getVeh())) {
//If occu field is 100, the driver has marked the vehicle as full
if (hfpPayload.getOccu() == 100) {
if (hfpPayload.getOccu() >= 100) {
return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.FULL);
}

if (passengerCountPayload != null) {
if (passengerCountPayload.getVehicleCounts().hasVehicleLoad() && passengerCountPayload.getVehicleCounts().getVehicleLoad() == 0) {
if (passengerCountPayload.getVehicleCounts().hasVehicleLoad() && passengerCountPayload.getVehicleCounts().getVehicleLoad() <= 0) {
//If vehicle load is zero, vehicle load ratio is unavailable and the vehicle is empty
return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY);
}


if (passengerCountPayload.getVehicleCounts().getVehicleLoadRatio() <= 0.0) {
return Optional.of(GtfsRealtime.VehiclePosition.OccupancyStatus.EMPTY);
}

return Optional.of(loadRatioToOccupancyStatus.lowerEntry(passengerCountPayload.getVehicleCounts().getVehicleLoadRatio()).getValue());
}

//If passenger count from APC message is not available, but occu contains value other than 0, use that
//Currently occu is only available for Suomenlinna ferries
if (hfpPayload.getOccu() != 0) {

//If passenger count from APC message is not available, but occu
//contains value other than 0, use that.
//
//Currently occu values larger than 0 but smaller than 100 are only
//available for Suomenlinna ferries.
//
//Many vehicles send '"occu":0' as part of the HFP MQTT payload,
//probably to mean "not full". Do not use occu == 0 even for
//ferries as it might be just a broken device or a default value. We
//would rather not publish information than publish false
//information in case of broken devices or implementation.
if (hfpPayload.getOccu() > 0) {
return Optional.of(occuToOccupancyStatus.lowerEntry(hfpPayload.getOccu()).getValue());
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/environment.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pulsar {
consumer {
multipleTopics=true
multipleTopics=${?PULSAR_CONSUMER_ENABLE_MULTIPLE_TOPICS}
topics=["hfp-data, passenger-count"]
topics=${?PULSAR_CONSUMER_TOPICS}
topicsPattern="persistent://public/default/(hfp-data|passenger-count)"
topicsPattern=${?PULSAR_CONSUMER_MULTIPLE_TOPICS_PATTERN}
subscription="transitdata-vehicleposition-processor-subscription"
Expand Down
Loading
Loading