Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/main/java/fi/hsl/transitdata/omm/OmmAlertHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import fi.hsl.common.transitdata.TransitdataProperties;
import fi.hsl.common.transitdata.proto.InternalMessages;
import fi.hsl.transitdata.omm.db.BulletinDAO;
import fi.hsl.transitdata.omm.db.DisruptionDAO;
import fi.hsl.transitdata.omm.db.LineDAO;
import fi.hsl.transitdata.omm.db.OmmDbConnector;
import fi.hsl.transitdata.omm.db.StopPointDAO;
import fi.hsl.transitdata.omm.models.AlertState;
import fi.hsl.transitdata.omm.models.Bulletin;
import fi.hsl.transitdata.omm.models.DisruptionRoute;
import fi.hsl.transitdata.omm.models.Line;
import fi.hsl.transitdata.omm.models.Route;
import fi.hsl.transitdata.omm.models.StopPoint;
Expand Down Expand Up @@ -61,13 +63,16 @@ public void pollAndSend() throws SQLException, PulsarClientException, Exception
//For some reason the connection seem to be flaky, let's reconnect on each request.
ommConnector.connect();

DisruptionDAO disruptionDAO = ommConnector.getDisruptionDAO();
List<DisruptionRoute> disruptionRoutes = disruptionDAO.getActiveDisruptions();

BulletinDAO bulletinDAO = ommConnector.getBulletinDAO();
LineDAO lineDAO = ommConnector.getLineDAO();
StopPointDAO stopPointDAO = ommConnector.getStopPointDAO();

List<Bulletin> bulletins = bulletinDAO.getActiveBulletins();
AlertState latestState = new AlertState(bulletins);

if (!LocalDate.now().equals(linesUpdateDate)) {
lines = lineDAO.getAllLines();
linesUpdateDate = LocalDate.now();
Expand All @@ -81,7 +86,7 @@ public void pollAndSend() throws SQLException, PulsarClientException, Exception
// We want to keep Pulsar internal timestamps as accurate as possible (ms) but GTFS-RT expects milliseconds
final long currentTimestampUtcMs = ZonedDateTime.now(ZoneId.of(timeZone)).toInstant().toEpochMilli();

final InternalMessages.ServiceAlert alert = createServiceAlert(bulletins, lines, stopPoints, timeZone);
final InternalMessages.ServiceAlert alert = createServiceAlert(bulletins, lines, stopPoints, disruptionRoutes, timeZone);
sendPulsarMessage(alert, currentTimestampUtcMs);
} else {
log.info("No changes to current Service Alerts.");
Expand All @@ -93,11 +98,11 @@ public void pollAndSend() throws SQLException, PulsarClientException, Exception
}
}

static InternalMessages.ServiceAlert createServiceAlert(final List<Bulletin> bulletins, final Map<Long, Line> lines, final Map<Long, List<StopPoint>> stopPoints, final String timeZone) {
static InternalMessages.ServiceAlert createServiceAlert(final List<Bulletin> bulletins, final Map<Long, Line> lines, final Map<Long, List<StopPoint>> stopPoints, List<DisruptionRoute> disruptionRoutes, final String timeZone) {
final InternalMessages.ServiceAlert.Builder builder = InternalMessages.ServiceAlert.newBuilder();
builder.setSchemaVersion(builder.getSchemaVersion());
final List<InternalMessages.Bulletin> internalMessageBulletins = bulletins.stream()
.map(bulletin -> createBulletin(bulletin, lines, stopPoints, timeZone))
.map(bulletin -> createBulletin(bulletin, lines, stopPoints, disruptionRoutes, timeZone))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
Expand All @@ -110,7 +115,7 @@ private static long toUtcEpochMs(final LocalDateTime localTimestamp, final Strin
return localTimestamp.atZone(zone).toInstant().toEpochMilli();
}

static Optional<InternalMessages.Bulletin> createBulletin(final Bulletin bulletin, final Map<Long, Line> lines, final Map<Long, List<StopPoint>> stopPoints, final String timezone) {
static Optional<InternalMessages.Bulletin> createBulletin(final Bulletin bulletin, final Map<Long, Line> lines, final Map<Long, List<StopPoint>> stopPoints, List<DisruptionRoute> disruptionRoutes, final String timezone) {
Optional<InternalMessages.Bulletin> maybeBulletin;
try {
final InternalMessages.Bulletin.Builder builder = InternalMessages.Bulletin.newBuilder();
Expand Down Expand Up @@ -144,7 +149,7 @@ static Optional<InternalMessages.Bulletin> createBulletin(final Bulletin bulleti
builder.addAllTitles(bulletin.titles);
builder.addAllDescriptions(bulletin.descriptions);
builder.addAllUrls(bulletin.urls);

builder.addAllAffectedDisruptionRoutes(disruptionRoutes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

builder.addAllAffectedDisruptionRoutes takes a list of strings as argument. Maybe there should be DisruptionRoute message in the protobuf? What are we trying to do here?

List<InternalMessages.Bulletin.AffectedEntity> affectedRoutes = getAffectedRoutes(bulletin, lines);
List<InternalMessages.Bulletin.AffectedEntity> affectedStops = getAffectedStops(bulletin, stopPoints);
if (affectedRoutes.isEmpty() && affectedStops.isEmpty() && !bulletin.affectsAllRoutes && !bulletin.affectsAllStops) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/fi/hsl/transitdata/omm/db/DisruptionDAO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package fi.hsl.transitdata.omm.db;

import fi.hsl.transitdata.omm.models.DisruptionRoute;

import java.sql.SQLException;
import java.util.List;

public interface DisruptionDAO {
List<DisruptionRoute> getActiveDisruptions() throws SQLException;
}
141 changes: 141 additions & 0 deletions src/main/java/fi/hsl/transitdata/omm/db/DisruptionDAOImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package fi.hsl.transitdata.omm.db;


import fi.hsl.common.files.FileUtils;
import fi.hsl.common.transitdata.proto.InternalMessages;
import fi.hsl.transitdata.omm.models.DisruptionRoute;
import fi.hsl.transitdata.omm.models.DisruptionRouteLink;
import fi.hsl.transitdata.omm.models.cancellations.Stop;

import java.sql.Connection;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class DisruptionDAOImpl extends DAOImplBase implements DisruptionDAO {

String queryString;
String queryLinksString;
String timezone;
int pollIntervalInSeconds;
boolean queryAllModifiedAlerts;
Comment on lines +25 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be private final


public DisruptionDAOImpl(Connection connection, String timezone) {
super(connection);
this.timezone = timezone;
this.pollIntervalInSeconds = pollIntervalInSeconds;
this.queryAllModifiedAlerts = queryAllModifiedAlerts;
Comment on lines +34 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These values are not used for anything

queryString = createQuery("/disruption_routes.sql");
queryLinksString = createQuery("/disruption_route_links.sql");
}

private List<DisruptionRoute> parseDisruptionRoutes(ResultSet resultSet, Map<String, Stop> stopsByGid) throws SQLException {
HashMap<String, ArrayList<DisruptionRouteLink>> disruptionLinks = getActiveDisruptionLinks();
List<DisruptionRoute> disruptionRoutes = new ArrayList<>();
log.info("Processing disruptionRoutes resultset");
while (resultSet.next()) {
try {
String disruptionRouteId = resultSet.getString("DISRUPTION_ROUTES_ID");
ArrayList<DisruptionRouteLink> disruptionRouteLinks = new ArrayList<DisruptionRouteLink>();
ArrayList<DisruptionRouteLink> disruptionRouteLinksByRouteId = disruptionLinks.get(disruptionRouteId);
if (disruptionRouteLinksByRouteId != null) {
disruptionRouteLinks = disruptionRouteLinksByRouteId;
}
String startStopGid = resultSet.getString("START_STOP_ID");
String startStopId = stopsByGid.containsKey(startStopGid) ? stopsByGid.get(startStopGid).stopId : "";
String endStopGid = resultSet.getString("END_STOP_ID");
String endStopId = stopsByGid.containsKey(startStopGid) ? stopsByGid.get(endStopGid).stopId : "";

String affectedRoutes = resultSet.getString("AFFECTED_ROUTE_IDS");
List<String> affectedRoutesList = Arrays.stream(affectedRoutes.split(",")).map(String::trim).collect(Collectors.toList());

String validFrom = resultSet.getString("DC_VALID_FROM");
String validTo = resultSet.getString("DC_VALID_TO");

disruptionRoutes.add(new DisruptionRoute(disruptionRouteId, startStopId, endStopId, affectedRoutesList, validFrom, validTo, timezone, disruptionRouteLinks));

String name = resultSet.getString("NAME");
String description = resultSet.getString("DESCRIPTION");
String type = resultSet.getString("DC_TYPE");
log.info("Found disruption route with name: {}, description: {} and type: {}", name, description, type);
} catch (IllegalArgumentException iae) {
log.error("Error while parsing the disruptionRoutes resultset", iae);
}
}
log.info("Found total {} disruption routes", disruptionRoutes.size());
return disruptionRoutes;
}

private String createQuery(String query) {
InputStream stream = getClass().getResourceAsStream(query);
try {
return FileUtils.readFileFromStreamOrThrow(stream);
} catch (Exception e) {
log.error("Error in reading sql from file:", e);
return null;
}
}

public static String localDateAsString(Instant instant, String zoneId) {
return DateTimeFormatter.ofPattern("yyyy-MM-dd").format(instant.atZone(ZoneId.of(zoneId)));
}

public HashMap<String, ArrayList<DisruptionRouteLink>> getActiveDisruptionLinks() throws SQLException {
log.info("Querying disruption routes from database");
List<DisruptionRouteLink> links = new ArrayList<>();
HashMap<String, ArrayList<DisruptionRouteLink>> linksByRouteId = new HashMap<String, ArrayList<DisruptionRouteLink>>();
String preparedString = queryLinksString;
try (PreparedStatement statement = connection.prepareStatement(preparedString)) {
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
String id = resultSet.getString("disruption_routes_id");
String deviationId = resultSet.getString("deviation_case_id");
String startStopId = resultSet.getString("start_stop_id");
String endStopId = resultSet.getString("end_stop_id");
String sequenceNumber = resultSet.getString("link_location_sequence_number");
String latitude = resultSet.getString("latitude");
String longitude = resultSet.getString("longitude");
DisruptionRouteLink disruptionLink = new DisruptionRouteLink(id, deviationId, startStopId, endStopId, sequenceNumber, latitude, longitude);

ArrayList<DisruptionRouteLink> list = linksByRouteId.get(id);
if (list != null) {
list.add(disruptionLink);
linksByRouteId.replace(id, list);
} else {
ArrayList<DisruptionRouteLink> newList = new ArrayList<DisruptionRouteLink>();
newList.add(disruptionLink);
linksByRouteId.put(id, newList);
}
}
}
catch (Exception e) {
log.error("Error while querying and processing messages", e);
throw e;
}
return linksByRouteId;
}

@Override
public List<DisruptionRoute> getActiveDisruptions() throws SQLException {
log.info("Querying disruption route links from database");
String dateFrom = localDateAsString(Instant.now(), timezone);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dateFrom is not used for anything

String preparedString = queryString.replace("VAR_DATE_FROM", "1970-03-07");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the date intentionally hardcoded here?

try (PreparedStatement statement = connection.prepareStatement(preparedString)) {
ResultSet resultSet = statement.executeQuery();
HashMap<String, Stop> stopsByGid = new HashMap<String, Stop>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This HashMap is empty, so stop IDs in DisruptionRoute will be null. Is this a problem?

return parseDisruptionRoutes(resultSet, stopsByGid);
}
catch (Exception e) {
log.error("Error while querying and processing messages", e);
throw e;
}
}
}
8 changes: 8 additions & 0 deletions src/main/java/fi/hsl/transitdata/omm/db/OmmDbConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class OmmDbConnector implements AutoCloseable {
private boolean queryAllModifiedAlerts;

private BulletinDAO bulletinDAO;
private DisruptionDAO disruptionDAO;
private LineDAO lineDAO;
private StopPointDAO stopPointDAO;

Expand All @@ -35,13 +36,15 @@ public OmmDbConnector(Config config, int pollIntervalInSeconds, String jdbcConne
public void connect() throws SQLException {
connection = DriverManager.getConnection(connectionString);
bulletinDAO = new BulletinDAOImpl(connection, timezone, pollIntervalInSeconds, queryAllModifiedAlerts);
disruptionDAO = new DisruptionDAOImpl(connection, timezone);
stopPointDAO = new StopPointDAOImpl(connection, timezone);
lineDAO = new LineDAOImpl(connection);
}

@Override
public void close() throws Exception {
bulletinDAO = null;
disruptionDAO = null;
stopPointDAO = null;
lineDAO = null;
connection.close();
Expand All @@ -52,6 +55,11 @@ public BulletinDAO getBulletinDAO() {
return bulletinDAO;
}


public DisruptionDAO getDisruptionDAO() {
return disruptionDAO;
}

public LineDAO getLineDAO() {
return lineDAO;
}
Expand Down
127 changes: 127 additions & 0 deletions src/main/java/fi/hsl/transitdata/omm/models/DisruptionRoute.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package fi.hsl.transitdata.omm.models;

import fi.hsl.common.transitdata.proto.InternalMessages;
import fi.hsl.transitdata.omm.models.cancellations.Journey;
import fi.hsl.transitdata.omm.models.cancellations.JourneyPattern;
import fi.hsl.transitdata.omm.models.cancellations.JourneyPatternStop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

public class DisruptionRoute {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

private static final Logger log = LoggerFactory.getLogger(DisruptionRoute.class);

public final String disruptionRouteId;
public final String startStopId;
public final String endStopId;
private final Optional<LocalDateTime> validFromDate;
private final Optional<LocalDateTime> validToDate;
private final ZoneId timezoneId;
public final Collection<String> affectedRoutes;
private final Map<String, List<Journey>> affectedJourneysByJourneyPatternId;
private final Map<String, List<String>> affectedStopIdsByJourneyPatternId;
public final ArrayList<DisruptionRouteLink> disruptionRouteLinks;

public DisruptionRoute(String disruptionRouteId, String startStopId, String endStopId, Collection<String> affectedRoutes, String validFromDate, String validToDate, String timezone, ArrayList<DisruptionRouteLink> disruptionRouteLinks) {
this.disruptionRouteId = disruptionRouteId;
this.startStopId = startStopId;
this.endStopId = endStopId;
this.validFromDate = getDateOrEmpty(validFromDate);
this.validToDate = getDateOrEmpty(validToDate);
this.timezoneId = ZoneId.of(timezone);
this.affectedRoutes = affectedRoutes;
this.affectedJourneysByJourneyPatternId = new HashMap<>();
this.affectedStopIdsByJourneyPatternId = new HashMap<>();
this.disruptionRouteLinks = disruptionRouteLinks;
}

public void addAffectedJourneys(List<Journey> journeys) {
for (Journey journey : journeys) {
if (!affectedJourneysByJourneyPatternId.containsKey(journey.journeyPatternId)) {
affectedJourneysByJourneyPatternId.put(journey.journeyPatternId, new ArrayList<>());
}
affectedJourneysByJourneyPatternId.get(journey.journeyPatternId).add(journey);
}
}

public Optional<String> getValidFrom() {
return validFromDate.map(DATE_TIME_FORMATTER::format);
}

public Optional<String> getValidTo() {
return validToDate.map(DATE_TIME_FORMATTER::format);
}

public Collection<String> getAffectedJourneyPatternIds() {
return affectedJourneysByJourneyPatternId.keySet();
}

public void findAddAffectedStops(Map<String, JourneyPattern> journeyPatternsById) {
for (String jpId : affectedJourneysByJourneyPatternId.keySet()) {
JourneyPattern journeyPattern = journeyPatternsById.get(jpId);
Optional<List<JourneyPatternStop>> stopsBetween = journeyPattern.getStopsBetweenTwoStops(startStopId, endStopId);
stopsBetween.ifPresent(stops -> affectedStopIdsByJourneyPatternId.put(jpId, stops.stream().map(stop -> stop.stopId).collect(Collectors.toList())));
}
}

public Optional<LocalDateTime> getDateOrEmpty(String dateStr) {
try {
return Optional.of(LocalDateTime.parse(dateStr.replace(" ", "T")));
} catch (Exception e) {
return Optional.empty();
}
}

public List <InternalMessages.StopCancellations.StopCancellation> getAsStopCancellations() {
// find unique cancelledStopIds from all affected stop ids
Set<String> cancelledStopIds = affectedStopIdsByJourneyPatternId.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());

if (cancelledStopIds.isEmpty()) {
log.info("No stop cancellations were created by disruption route {}", disruptionRouteId);
return Collections.emptyList();
}

validateStopCancellations(cancelledStopIds);
return cancelledStopIds.stream().map(stopId -> {
InternalMessages.StopCancellations.StopCancellation.Builder builder = InternalMessages.StopCancellations.StopCancellation.newBuilder();
builder.setCause(InternalMessages.StopCancellations.Cause.JOURNEY_PATTERN_DETOUR);
builder.setStopId(stopId);
validFromDate.ifPresent(dateTime -> builder.setValidFromUnixS(toUtcEpochSeconds(dateTime)));
validToDate.ifPresent(dateTime -> builder.setValidToUnixS(toUtcEpochSeconds(dateTime)));
builder.addAllAffectedJourneyPatternIds(new ArrayList<>(affectedStopIdsByJourneyPatternId.keySet()));
return builder.build();
}).collect(Collectors.toList());
}

//TODO: what does this do?
private void validateStopCancellations(Set<String> cancelledStopIds) {
// check that there's no journey pattern specific stop cancellations
Set<String> compareStopIds = new HashSet<>(affectedStopIdsByJourneyPatternId.values().iterator().next());
if (!cancelledStopIds.equals(compareStopIds)) {
log.warn("Found journey pattern specific stop cancellations by disruption route");
}
}

public List <InternalMessages.JourneyPattern> getAffectedJourneyPatterns(Map<String, JourneyPattern> journeyPatternById) {
return affectedJourneysByJourneyPatternId.keySet().stream().map(jpId -> {
JourneyPattern affectedJourneyPattern = journeyPatternById.get(jpId).createNewWithSameStops();
affectedJourneyPattern.addAffectedJourneys(affectedJourneysByJourneyPatternId.get(jpId));
return affectedJourneyPattern.getAsProtoBuf();
}).collect(Collectors.toList());
}

private Long toUtcEpochSeconds(LocalDateTime dt) {
// convert dt to Unix time (i.e. Epoch time in seconds)
return dt.atZone(timezoneId).toEpochSecond();
}

}
Loading