-
Notifications
You must be signed in to change notification settings - Fork 0
654 disruption routes #35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
a8b9457
656d19f
3cb9558
99e41d3
4f58c5f
1fc49fb
f979b75
9537f49
feac0ad
574065c
8f1e327
9cf3d47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package fi.hsl.transitdata.omm.db; | ||
|
|
||
| import fi.hsl.transitdata.omm.models.DisruptionRoute; | ||
|
|
||
| import java.sql.SQLException; | ||
| import java.util.List; | ||
|
|
||
| public interface DisruptionDAO { | ||
| List<DisruptionRoute> getActiveDisruptions() throws SQLException; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| package fi.hsl.transitdata.omm.db; | ||
|
|
||
|
|
||
| import fi.hsl.common.files.FileUtils; | ||
| import fi.hsl.common.transitdata.proto.InternalMessages; | ||
| import fi.hsl.transitdata.omm.models.DisruptionRoute; | ||
| import fi.hsl.transitdata.omm.models.DisruptionRouteLink; | ||
| import fi.hsl.transitdata.omm.models.cancellations.Stop; | ||
|
|
||
| import java.sql.Connection; | ||
| import java.io.InputStream; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.ResultSet; | ||
| import java.sql.SQLException; | ||
| import java.util.*; | ||
| import java.util.stream.Collectors; | ||
| import java.time.Instant; | ||
| import java.time.ZoneId; | ||
| import java.time.ZonedDateTime; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.time.temporal.ChronoUnit; | ||
|
|
||
| public class DisruptionDAOImpl extends DAOImplBase implements DisruptionDAO { | ||
|
|
||
| String queryString; | ||
| String queryLinksString; | ||
| String timezone; | ||
| int pollIntervalInSeconds; | ||
| boolean queryAllModifiedAlerts; | ||
|
Comment on lines
+25
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These should be |
||
|
|
||
| public DisruptionDAOImpl(Connection connection, String timezone) { | ||
| super(connection); | ||
| this.timezone = timezone; | ||
| this.pollIntervalInSeconds = pollIntervalInSeconds; | ||
| this.queryAllModifiedAlerts = queryAllModifiedAlerts; | ||
|
Comment on lines
+34
to
+35
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These values are not used for anything |
||
| queryString = createQuery("/disruption_routes.sql"); | ||
| queryLinksString = createQuery("/disruption_route_links.sql"); | ||
| } | ||
|
|
||
| private List<DisruptionRoute> parseDisruptionRoutes(ResultSet resultSet, Map<String, Stop> stopsByGid) throws SQLException { | ||
| HashMap<String, ArrayList<DisruptionRouteLink>> disruptionLinks = getActiveDisruptionLinks(); | ||
| List<DisruptionRoute> disruptionRoutes = new ArrayList<>(); | ||
| log.info("Processing disruptionRoutes resultset"); | ||
| while (resultSet.next()) { | ||
| try { | ||
| String disruptionRouteId = resultSet.getString("DISRUPTION_ROUTES_ID"); | ||
| ArrayList<DisruptionRouteLink> disruptionRouteLinks = new ArrayList<DisruptionRouteLink>(); | ||
| ArrayList<DisruptionRouteLink> disruptionRouteLinksByRouteId = disruptionLinks.get(disruptionRouteId); | ||
| if (disruptionRouteLinksByRouteId != null) { | ||
| disruptionRouteLinks = disruptionRouteLinksByRouteId; | ||
| } | ||
| String startStopGid = resultSet.getString("START_STOP_ID"); | ||
| String startStopId = stopsByGid.containsKey(startStopGid) ? stopsByGid.get(startStopGid).stopId : ""; | ||
| String endStopGid = resultSet.getString("END_STOP_ID"); | ||
| String endStopId = stopsByGid.containsKey(startStopGid) ? stopsByGid.get(endStopGid).stopId : ""; | ||
|
|
||
| String affectedRoutes = resultSet.getString("AFFECTED_ROUTE_IDS"); | ||
| List<String> affectedRoutesList = Arrays.stream(affectedRoutes.split(",")).map(String::trim).collect(Collectors.toList()); | ||
|
|
||
| String validFrom = resultSet.getString("DC_VALID_FROM"); | ||
| String validTo = resultSet.getString("DC_VALID_TO"); | ||
|
|
||
| disruptionRoutes.add(new DisruptionRoute(disruptionRouteId, startStopId, endStopId, affectedRoutesList, validFrom, validTo, timezone, disruptionRouteLinks)); | ||
|
|
||
| String name = resultSet.getString("NAME"); | ||
| String description = resultSet.getString("DESCRIPTION"); | ||
| String type = resultSet.getString("DC_TYPE"); | ||
| log.info("Found disruption route with name: {}, description: {} and type: {}", name, description, type); | ||
| } catch (IllegalArgumentException iae) { | ||
| log.error("Error while parsing the disruptionRoutes resultset", iae); | ||
| } | ||
| } | ||
| log.info("Found total {} disruption routes", disruptionRoutes.size()); | ||
| return disruptionRoutes; | ||
| } | ||
|
|
||
| private String createQuery(String query) { | ||
| InputStream stream = getClass().getResourceAsStream(query); | ||
| try { | ||
| return FileUtils.readFileFromStreamOrThrow(stream); | ||
| } catch (Exception e) { | ||
| log.error("Error in reading sql from file:", e); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| public static String localDateAsString(Instant instant, String zoneId) { | ||
| return DateTimeFormatter.ofPattern("yyyy-MM-dd").format(instant.atZone(ZoneId.of(zoneId))); | ||
| } | ||
|
|
||
| public HashMap<String, ArrayList<DisruptionRouteLink>> getActiveDisruptionLinks() throws SQLException { | ||
| log.info("Querying disruption routes from database"); | ||
| List<DisruptionRouteLink> links = new ArrayList<>(); | ||
| HashMap<String, ArrayList<DisruptionRouteLink>> linksByRouteId = new HashMap<String, ArrayList<DisruptionRouteLink>>(); | ||
| String preparedString = queryLinksString; | ||
| try (PreparedStatement statement = connection.prepareStatement(preparedString)) { | ||
| ResultSet resultSet = statement.executeQuery(); | ||
| while (resultSet.next()) { | ||
| String id = resultSet.getString("disruption_routes_id"); | ||
| String deviationId = resultSet.getString("deviation_case_id"); | ||
| String startStopId = resultSet.getString("start_stop_id"); | ||
| String endStopId = resultSet.getString("end_stop_id"); | ||
| String sequenceNumber = resultSet.getString("link_location_sequence_number"); | ||
| String latitude = resultSet.getString("latitude"); | ||
| String longitude = resultSet.getString("longitude"); | ||
| DisruptionRouteLink disruptionLink = new DisruptionRouteLink(id, deviationId, startStopId, endStopId, sequenceNumber, latitude, longitude); | ||
|
|
||
| ArrayList<DisruptionRouteLink> list = linksByRouteId.get(id); | ||
| if (list != null) { | ||
| list.add(disruptionLink); | ||
| linksByRouteId.replace(id, list); | ||
| } else { | ||
| ArrayList<DisruptionRouteLink> newList = new ArrayList<DisruptionRouteLink>(); | ||
| newList.add(disruptionLink); | ||
| linksByRouteId.put(id, newList); | ||
| } | ||
| } | ||
| } | ||
| catch (Exception e) { | ||
| log.error("Error while querying and processing messages", e); | ||
| throw e; | ||
| } | ||
| return linksByRouteId; | ||
| } | ||
|
|
||
| @Override | ||
| public List<DisruptionRoute> getActiveDisruptions() throws SQLException { | ||
| log.info("Querying disruption route links from database"); | ||
| String dateFrom = localDateAsString(Instant.now(), timezone); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| String preparedString = queryString.replace("VAR_DATE_FROM", "1970-03-07"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the date intentionally hardcoded here? |
||
| try (PreparedStatement statement = connection.prepareStatement(preparedString)) { | ||
| ResultSet resultSet = statement.executeQuery(); | ||
| HashMap<String, Stop> stopsByGid = new HashMap<String, Stop>(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This HashMap is empty, so stop IDs in DisruptionRoute will be null. Is this a problem? |
||
| return parseDisruptionRoutes(resultSet, stopsByGid); | ||
| } | ||
| catch (Exception e) { | ||
| log.error("Error while querying and processing messages", e); | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| package fi.hsl.transitdata.omm.models; | ||
|
|
||
| import fi.hsl.common.transitdata.proto.InternalMessages; | ||
| import fi.hsl.transitdata.omm.models.cancellations.Journey; | ||
| import fi.hsl.transitdata.omm.models.cancellations.JourneyPattern; | ||
| import fi.hsl.transitdata.omm.models.cancellations.JourneyPatternStop; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.time.LocalDateTime; | ||
| import java.time.ZoneId; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.util.*; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| public class DisruptionRoute { | ||
| private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(DisruptionRoute.class); | ||
|
|
||
| public final String disruptionRouteId; | ||
| public final String startStopId; | ||
| public final String endStopId; | ||
| private final Optional<LocalDateTime> validFromDate; | ||
| private final Optional<LocalDateTime> validToDate; | ||
| private final ZoneId timezoneId; | ||
| public final Collection<String> affectedRoutes; | ||
| private final Map<String, List<Journey>> affectedJourneysByJourneyPatternId; | ||
| private final Map<String, List<String>> affectedStopIdsByJourneyPatternId; | ||
| public final ArrayList<DisruptionRouteLink> disruptionRouteLinks; | ||
|
|
||
| public DisruptionRoute(String disruptionRouteId, String startStopId, String endStopId, Collection<String> affectedRoutes, String validFromDate, String validToDate, String timezone, ArrayList<DisruptionRouteLink> disruptionRouteLinks) { | ||
| this.disruptionRouteId = disruptionRouteId; | ||
| this.startStopId = startStopId; | ||
| this.endStopId = endStopId; | ||
| this.validFromDate = getDateOrEmpty(validFromDate); | ||
| this.validToDate = getDateOrEmpty(validToDate); | ||
| this.timezoneId = ZoneId.of(timezone); | ||
| this.affectedRoutes = affectedRoutes; | ||
| this.affectedJourneysByJourneyPatternId = new HashMap<>(); | ||
| this.affectedStopIdsByJourneyPatternId = new HashMap<>(); | ||
| this.disruptionRouteLinks = disruptionRouteLinks; | ||
| } | ||
|
|
||
| public void addAffectedJourneys(List<Journey> journeys) { | ||
| for (Journey journey : journeys) { | ||
| if (!affectedJourneysByJourneyPatternId.containsKey(journey.journeyPatternId)) { | ||
| affectedJourneysByJourneyPatternId.put(journey.journeyPatternId, new ArrayList<>()); | ||
| } | ||
| affectedJourneysByJourneyPatternId.get(journey.journeyPatternId).add(journey); | ||
| } | ||
| } | ||
|
|
||
| public Optional<String> getValidFrom() { | ||
| return validFromDate.map(DATE_TIME_FORMATTER::format); | ||
| } | ||
|
|
||
| public Optional<String> getValidTo() { | ||
| return validToDate.map(DATE_TIME_FORMATTER::format); | ||
| } | ||
|
|
||
| public Collection<String> getAffectedJourneyPatternIds() { | ||
| return affectedJourneysByJourneyPatternId.keySet(); | ||
| } | ||
|
|
||
| public void findAddAffectedStops(Map<String, JourneyPattern> journeyPatternsById) { | ||
| for (String jpId : affectedJourneysByJourneyPatternId.keySet()) { | ||
| JourneyPattern journeyPattern = journeyPatternsById.get(jpId); | ||
| Optional<List<JourneyPatternStop>> stopsBetween = journeyPattern.getStopsBetweenTwoStops(startStopId, endStopId); | ||
| stopsBetween.ifPresent(stops -> affectedStopIdsByJourneyPatternId.put(jpId, stops.stream().map(stop -> stop.stopId).collect(Collectors.toList()))); | ||
| } | ||
| } | ||
|
|
||
| public Optional<LocalDateTime> getDateOrEmpty(String dateStr) { | ||
| try { | ||
| return Optional.of(LocalDateTime.parse(dateStr.replace(" ", "T"))); | ||
| } catch (Exception e) { | ||
| return Optional.empty(); | ||
| } | ||
| } | ||
|
|
||
| public List <InternalMessages.StopCancellations.StopCancellation> getAsStopCancellations() { | ||
| // find unique cancelledStopIds from all affected stop ids | ||
| Set<String> cancelledStopIds = affectedStopIdsByJourneyPatternId.values().stream() | ||
| .flatMap(Collection::stream) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| if (cancelledStopIds.isEmpty()) { | ||
| log.info("No stop cancellations were created by disruption route {}", disruptionRouteId); | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| validateStopCancellations(cancelledStopIds); | ||
| return cancelledStopIds.stream().map(stopId -> { | ||
| InternalMessages.StopCancellations.StopCancellation.Builder builder = InternalMessages.StopCancellations.StopCancellation.newBuilder(); | ||
| builder.setCause(InternalMessages.StopCancellations.Cause.JOURNEY_PATTERN_DETOUR); | ||
| builder.setStopId(stopId); | ||
| validFromDate.ifPresent(dateTime -> builder.setValidFromUnixS(toUtcEpochSeconds(dateTime))); | ||
| validToDate.ifPresent(dateTime -> builder.setValidToUnixS(toUtcEpochSeconds(dateTime))); | ||
| builder.addAllAffectedJourneyPatternIds(new ArrayList<>(affectedStopIdsByJourneyPatternId.keySet())); | ||
| return builder.build(); | ||
| }).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| //TODO: what does this do? | ||
| private void validateStopCancellations(Set<String> cancelledStopIds) { | ||
| // check that there's no journey pattern specific stop cancellations | ||
| Set<String> compareStopIds = new HashSet<>(affectedStopIdsByJourneyPatternId.values().iterator().next()); | ||
| if (!cancelledStopIds.equals(compareStopIds)) { | ||
| log.warn("Found journey pattern specific stop cancellations by disruption route"); | ||
| } | ||
| } | ||
|
|
||
| public List <InternalMessages.JourneyPattern> getAffectedJourneyPatterns(Map<String, JourneyPattern> journeyPatternById) { | ||
| return affectedJourneysByJourneyPatternId.keySet().stream().map(jpId -> { | ||
| JourneyPattern affectedJourneyPattern = journeyPatternById.get(jpId).createNewWithSameStops(); | ||
| affectedJourneyPattern.addAffectedJourneys(affectedJourneysByJourneyPatternId.get(jpId)); | ||
| return affectedJourneyPattern.getAsProtoBuf(); | ||
| }).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| private Long toUtcEpochSeconds(LocalDateTime dt) { | ||
| // convert dt to Unix time (i.e. Epoch time in seconds) | ||
| return dt.atZone(timezoneId).toEpochSecond(); | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
builder.addAllAffectedDisruptionRoutestakes a list of strings as argument. Maybe there should be DisruptionRoute message in the protobuf? What are we trying to do here?