Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ jobs:
distribution: 'adopt'
java-version: '11'
cache: 'maven'
- name: Run Spotless Apply
run: mvn spotless:apply
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: mvn --file pom.xml clean install
env:
Expand Down
18 changes: 18 additions & 0 deletions eclipse-java-formatter.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<profiles version="13">
<profile kind="CodeFormatterProfile" name="Custom-4spaces-NoCommentSplit" version="13">

<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>

<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>

<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="9999"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="false"/>

</profile>
</profiles>
30 changes: 29 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<common.version>2.0.3-RC6</common.version>
<common.version>2.0.5</common.version>
<testcontainers.version>1.17.6</testcontainers.version>
<spotlessMavenPlugin.version>2.43.0</spotlessMavenPlugin.version>
<googleJavaFormat.version>1.17.0</googleJavaFormat.version>
</properties>
<profiles>
<!-- The Configuration of the unit-test profile -->
Expand Down Expand Up @@ -205,6 +207,32 @@
</executions>
</plugin>

<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotlessMavenPlugin.version}</version>
<configuration>
<java>
<eclipse>
<file>${project.basedir}/eclipse-java-formatter.xml</file>
</eclipse>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>spotless-apply</id>
<goals>
<goal>apply</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,23 @@ private void registerHandlers(PulsarApplicationContext context) {

final boolean filterTrainData = config.getBoolean("validator.filterTrainData");

processors.put(ProtobufSchema.InternalMessagesStopEstimate, new StopEstimateProcessor(tripUpdateProcessor, filterTrainData));
processors.put(ProtobufSchema.InternalMessagesTripCancellation, new TripCancellationProcessor(tripUpdateProcessor, filterTrainData));
processors.put(ProtobufSchema.InternalMessagesStopEstimate,
new StopEstimateProcessor(tripUpdateProcessor, filterTrainData));
processors.put(ProtobufSchema.InternalMessagesTripCancellation,
new TripCancellationProcessor(tripUpdateProcessor, filterTrainData));
}

private List<ITripUpdateValidator> registerTripUpdateValidators() {

List<ITripUpdateValidator> tripUpdateValidators = new ArrayList<>();

tripUpdateValidators.add(new TripUpdateMaxAgeValidator(config.getDuration("validator.tripUpdateMaxAge", TimeUnit.SECONDS)));
tripUpdateValidators.add(new PrematureDeparturesValidator(config.getDuration("validator.tripUpdateMinTimeBeforeDeparture", TimeUnit.SECONDS),
tripUpdateValidators
.add(new TripUpdateMaxAgeValidator(config.getDuration("validator.tripUpdateMaxAge", TimeUnit.SECONDS)));
tripUpdateValidators.add(new PrematureDeparturesValidator(
config.getDuration("validator.tripUpdateMinTimeBeforeDeparture", TimeUnit.SECONDS),
config.getString("validator.timezone")));
tripUpdateValidators.add(new MissingEstimatesValidator(config.getInt("validator.tripUpdateMaxMissingEstimates")));
tripUpdateValidators
.add(new MissingEstimatesValidator(config.getInt("validator.tripUpdateMaxMissingEstimates")));

return tripUpdateValidators;

Expand All @@ -84,7 +89,8 @@ public void handleMessage(Message received) {
if (processor != null) {
if (processor.validateMessage(received.getData())) {

Optional<AbstractMessageProcessor.TripUpdateWithId> maybeTripUpdate = processor.processMessage(received);
Optional<AbstractMessageProcessor.TripUpdateWithId> maybeTripUpdate = processor
.processMessage(received);
if (maybeTripUpdate.isPresent()) {
final AbstractMessageProcessor.TripUpdateWithId pair = maybeTripUpdate.get();
final GtfsRealtime.TripUpdate tripUpdate = pair.getTripUpdate();
Expand All @@ -93,9 +99,13 @@ public void handleMessage(Message received) {
final boolean isValid = validator.validate(tripUpdate);
if (!isValid) {
final GtfsRealtime.TripDescriptor trip = tripUpdate.getTrip();
log.debug("Trip update for {} / {} / {} / {} failed validation when validating with {}", trip.getRouteId(), trip.getDirectionId(), trip.getStartDate(), trip.getStartTime(), validator.getClass().getName());
log.debug(
"Trip update for {} / {} / {} / {} failed validation when validating with {}",
trip.getRouteId(), trip.getDirectionId(), trip.getStartDate(),
trip.getStartTime(), validator.getClass().getName());

messageStats.incrementInvalidTripUpdates("validator-" + validator.getClass().getSimpleName());
messageStats.incrementInvalidTripUpdates(
"validator-" + validator.getClass().getSimpleName());
}
return isValid;
});
Expand All @@ -118,12 +128,11 @@ public void handleMessage(Message received) {
}
});

consumer.acknowledgeAsync(received)
.exceptionally(throwable -> {
log.error("Failed to ack Pulsar message", throwable);
return null;
})
.thenRun(() -> {});
consumer.acknowledgeAsync(received).exceptionally(throwable -> {
log.error("Failed to ack Pulsar message", throwable);
return null;
}).thenRun(() -> {
});
} catch (Exception e) {
log.error("Exception while handling message", e);
}
Expand All @@ -133,22 +142,23 @@ public void handleMessage(Message received) {
}
}

private void sendTripUpdate(final AbstractMessageProcessor.TripUpdateWithId tuIdPair, final long pulsarEventTimestamp) {
private void sendTripUpdate(final AbstractMessageProcessor.TripUpdateWithId tuIdPair,
final long pulsarEventTimestamp) {
messageStats.incrementMessagesSent();

final String tripId = tuIdPair.getTripId();
final GtfsRealtime.TripUpdate tripUpdate = tuIdPair.getTripUpdate();

debouncer.debounce(tripId, () -> {
GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(tripId, tripUpdate, tripUpdate.getTimestamp());
producer.newMessage()
.key(tripId)
.eventTime(pulsarEventTimestamp)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA, TransitdataProperties.ProtobufSchema.GTFS_TripUpdate.toString())
.value(feedMessage.toByteArray())
.sendAsync()
GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createDifferentialFeedMessage(tripId, tripUpdate,
tripUpdate.getTimestamp());
producer.newMessage().key(tripId).eventTime(pulsarEventTimestamp)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA,
TransitdataProperties.ProtobufSchema.GTFS_TripUpdate.toString())
.value(feedMessage.toByteArray()).sendAsync()
.thenRun(() -> log.debug("Sending TripUpdate for tripId {} with {} StopTimeUpdates and status {}",
tripId, tripUpdate.getStopTimeUpdateCount(), tripUpdate.getTrip().getScheduleRelationship()));
tripId, tripUpdate.getStopTimeUpdateCount(),
tripUpdate.getTrip().getScheduleRelationship()));
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package fi.hsl.transitdata.tripupdate.application;


import org.slf4j.Logger;

import java.util.HashMap;
Expand Down Expand Up @@ -71,12 +70,11 @@ public void logAndReset(Logger logger) {

@Override
public String toString() {
final String reasonsText = invalidTripUpdateReasons.entrySet().stream().map(entry -> entry.getKey() + ": " + entry.getValue()).collect(Collectors.joining(", "));
final String reasonsText = invalidTripUpdateReasons.entrySet().stream()
.map(entry -> entry.getKey() + ": " + entry.getValue()).collect(Collectors.joining(", "));

return "Message stats:\n"+
"\tStart time: " + getDurationSecs() + " seconds ago\n" +
"\tMessages received: " + messagesReceived + "\n" +
"\tMessages sent: " + messagesSent + "\n" +
"\tInvalid trip updates: " + invalidTripUpdates + "(" + reasonsText + ")";
return "Message stats:\n" + "\tStart time: " + getDurationSecs() + " seconds ago\n" + "\tMessages received: "
+ messagesReceived + "\n" + "\tMessages sent: " + messagesSent + "\n" + "\tInvalid trip updates: "
+ invalidTripUpdates + "(" + reasonsText + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,34 @@ public static GtfsRealtime.TripUpdate.StopTimeUpdate newStopTimeUpdate(InternalM
}

public static GtfsRealtime.TripUpdate.StopTimeUpdate newStopTimeUpdateFromPrevious(
final InternalMessages.StopEstimate stopEstimate,
GtfsRealtime.TripUpdate.StopTimeUpdate previousUpdate) {
final InternalMessages.StopEstimate stopEstimate, GtfsRealtime.TripUpdate.StopTimeUpdate previousUpdate) {

GtfsRealtime.TripUpdate.StopTimeUpdate.Builder stopTimeUpdateBuilder = null;
if (previousUpdate != null) {
stopTimeUpdateBuilder = previousUpdate.toBuilder();
} else {
String stopId = stopEstimate.getStopId();
int stopSequence = stopEstimate.getStopSequence();
stopTimeUpdateBuilder = GtfsRealtime.TripUpdate.StopTimeUpdate.newBuilder()
.setStopId(stopId)
stopTimeUpdateBuilder = GtfsRealtime.TripUpdate.StopTimeUpdate.newBuilder().setStopId(stopId)
.setStopSequence(stopSequence);
}

switch (stopEstimate.getStatus()) {
case SKIPPED:
stopTimeUpdateBuilder.setScheduleRelationship(GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.SKIPPED);
case SKIPPED :
stopTimeUpdateBuilder
.setScheduleRelationship(GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.SKIPPED);
break;
case SCHEDULED:
stopTimeUpdateBuilder.setScheduleRelationship(GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.SCHEDULED);
case SCHEDULED :
stopTimeUpdateBuilder
.setScheduleRelationship(GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.SCHEDULED);
break;
case NO_DATA:
case NO_DATA :
//If there is no data for current or previous stop time update, set ScheduleRelationship to NO_DATA
if (previousUpdate == null || previousUpdate.getScheduleRelationship() == GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.NO_DATA) {
stopTimeUpdateBuilder.setScheduleRelationship(GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.NO_DATA);
//Otherwise use ScheduleRelationship of previous stop time update
if (previousUpdate == null || previousUpdate
.getScheduleRelationship() == GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.NO_DATA) {
stopTimeUpdateBuilder.setScheduleRelationship(
GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship.NO_DATA);
//Otherwise use ScheduleRelationship of previous stop time update
} else {
stopTimeUpdateBuilder.setScheduleRelationship(previousUpdate.getScheduleRelationship());
}
Expand All @@ -54,8 +56,8 @@ public static GtfsRealtime.TripUpdate.StopTimeUpdate newStopTimeUpdateFromPrevio
// GTFS-RT treats times in seconds
long stopEventTimeInSeconds = stopEstimate.getEstimatedTimeUtcMs() / 1000;

GtfsRealtime.TripUpdate.StopTimeEvent.Builder stopTimeEvent = GtfsRealtime.TripUpdate.StopTimeEvent.newBuilder()
.setTime(stopEventTimeInSeconds);
GtfsRealtime.TripUpdate.StopTimeEvent.Builder stopTimeEvent = GtfsRealtime.TripUpdate.StopTimeEvent
.newBuilder().setTime(stopEventTimeInSeconds);

//Whether the event was observed in real world (i.e. not an estimate)
final boolean observedTime = stopEstimate.hasObservedTime() && stopEstimate.getObservedTime();
Expand All @@ -64,10 +66,10 @@ public static GtfsRealtime.TripUpdate.StopTimeUpdate newStopTimeUpdateFromPrevio
}

switch (stopEstimate.getType()) {
case ARRIVAL:
case ARRIVAL :
stopTimeUpdateBuilder.setArrival(stopTimeEvent);
break;
case DEPARTURE:
case DEPARTURE :
stopTimeUpdateBuilder.setDeparture(stopTimeEvent);
break;
}
Expand All @@ -83,54 +85,50 @@ public static long lastModified(InternalMessages.StopEstimate estimate) {
public static GtfsRealtime.TripUpdate newTripUpdate(InternalMessages.StopEstimate estimate) {
final int direction = PubtransFactory.joreDirectionToGtfsDirection(estimate.getTripInfo().getDirectionId());
String routeId = RouteIdUtils.normalizeRouteId(estimate.getTripInfo().getRouteId());

GtfsRealtime.TripDescriptor.ScheduleRelationship scheduleType = mapInternalScheduleTypeToGtfsRt(estimate.getTripInfo().getScheduleType());


GtfsRealtime.TripDescriptor.ScheduleRelationship scheduleType = mapInternalScheduleTypeToGtfsRt(
estimate.getTripInfo().getScheduleType());

GtfsRealtime.TripDescriptor.Builder tripDescriptor = GtfsRealtime.TripDescriptor.newBuilder()
.setRouteId(routeId)
.setDirectionId(direction)
.setStartDate(estimate.getTripInfo().getOperatingDay()) // Local date as String
.setRouteId(routeId).setDirectionId(direction).setStartDate(estimate.getTripInfo().getOperatingDay()) // Local date as String
.setStartTime(estimate.getTripInfo().getStartTime()) // Local time as String
.setScheduleRelationship(scheduleType);

//Trips outside of static schedule need trip ID to be accepted by OTP
if (scheduleType != GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED) {
tripDescriptor.setTripId(generateTripId(estimate.getTripInfo()));
}

GtfsRealtime.TripUpdate.Builder tripUpdateBuilder = GtfsRealtime.TripUpdate.newBuilder()
.setTrip(tripDescriptor)
GtfsRealtime.TripUpdate.Builder tripUpdateBuilder = GtfsRealtime.TripUpdate.newBuilder().setTrip(tripDescriptor)
.setTimestamp(lastModified(estimate));

return tripUpdateBuilder.build();
}

private static GtfsRealtime.TripDescriptor.ScheduleRelationship mapInternalScheduleTypeToGtfsRt(InternalMessages.TripInfo.ScheduleType scheduleType) {
private static GtfsRealtime.TripDescriptor.ScheduleRelationship mapInternalScheduleTypeToGtfsRt(
InternalMessages.TripInfo.ScheduleType scheduleType) {
switch (scheduleType) {
case ADDED:
case ADDED :
return GtfsRealtime.TripDescriptor.ScheduleRelationship.ADDED;
case UNSCHEDULED:
case UNSCHEDULED :
return GtfsRealtime.TripDescriptor.ScheduleRelationship.UNSCHEDULED;
case SCHEDULED:
default:
case SCHEDULED :
default :
return GtfsRealtime.TripDescriptor.ScheduleRelationship.SCHEDULED;
}
}

public static GtfsRealtime.TripUpdate newTripUpdate(InternalMessages.TripCancellation cancellation, long timestampMs) {
public static GtfsRealtime.TripUpdate newTripUpdate(InternalMessages.TripCancellation cancellation,
long timestampMs) {
final int gtfsRtDirection = PubtransFactory.joreDirectionToGtfsDirection(cancellation.getDirectionId());
String routeId = RouteIdUtils.normalizeRouteId(cancellation.getRouteId());

GtfsRealtime.TripDescriptor tripDescriptor = GtfsRealtime.TripDescriptor.newBuilder()
.setRouteId(routeId)
.setDirectionId(gtfsRtDirection)
.setStartDate(cancellation.getStartDate())
GtfsRealtime.TripDescriptor tripDescriptor = GtfsRealtime.TripDescriptor.newBuilder().setRouteId(routeId)
.setDirectionId(gtfsRtDirection).setStartDate(cancellation.getStartDate())
.setStartTime(cancellation.getStartTime())
.setScheduleRelationship(GtfsRealtime.TripDescriptor.ScheduleRelationship.CANCELED)
.build();
.setScheduleRelationship(GtfsRealtime.TripDescriptor.ScheduleRelationship.CANCELED).build();

GtfsRealtime.TripUpdate.Builder tripUpdateBuilder = GtfsRealtime.TripUpdate.newBuilder()
.setTrip(tripDescriptor)
GtfsRealtime.TripUpdate.Builder tripUpdateBuilder = GtfsRealtime.TripUpdate.newBuilder().setTrip(tripDescriptor)
.setTimestamp(timestampMs / 1000);

return tripUpdateBuilder.build();
Expand All @@ -142,6 +140,7 @@ public static GtfsRealtime.TripUpdate newTripUpdate(InternalMessages.TripCancell
* @return Trip ID
*/
private static String generateTripId(InternalMessages.TripInfo tripInfo) {
return tripInfo.getRouteId()+"_"+tripInfo.getOperatingDay()+"_"+tripInfo.getStartTime()+"_"+tripInfo.getDirectionId();
return tripInfo.getRouteId() + "_" + tripInfo.getOperatingDay() + "_" + tripInfo.getStartTime() + "_"
+ tripInfo.getDirectionId();
}
}
Loading