Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions mapillary_tools/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def extract_blackvue_info(fp: T.BinaryIO) -> BlackVueInfo | None:
if gps_data is None:
return None

points = list(_parse_gps_box(gps_data))
points = _parse_gps_box(gps_data)
points.sort(key=lambda p: p.time)

if points:
Expand Down Expand Up @@ -114,7 +114,7 @@ def _extract_camera_model_from_cprt(cprt_bytes: bytes) -> str:
return ""


def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
def _parse_gps_box(gps_data: bytes) -> list[geo.Point]:
"""
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
[Point(time=1623057074211, lat=51.150436666666664, lon=-114.03067833333333, alt=1097.36, angle=None)]
Expand All @@ -131,6 +131,8 @@ def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
[]
"""
points_by_sentence_type: dict[str, list[geo.Point]] = {}

for line_bytes in gps_data.splitlines():
match = NMEA_LINE_REGEX.match(line_bytes)
if match is None:
Expand Down Expand Up @@ -159,20 +161,35 @@ def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
if message.sentence_type in ["GGA"]:
if not message.is_valid:
continue
yield geo.Point(
point = geo.Point(
time=epoch_ms,
lat=message.latitude,
lon=message.longitude,
alt=message.altitude,
angle=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)

elif message.sentence_type in ["RMC", "GLL"]:
if not message.is_valid:
continue
yield geo.Point(
point = geo.Point(
time=epoch_ms,
lat=message.latitude,
lon=message.longitude,
alt=None,
angle=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)

# This is the extraction order in exiftool
if "RMC" in points_by_sentence_type:
return points_by_sentence_type["RMC"]

if "GGA" in points_by_sentence_type:
return points_by_sentence_type["GGA"]

if "GLL" in points_by_sentence_type:
return points_by_sentence_type["GLL"]

return []
56 changes: 43 additions & 13 deletions mapillary_tools/exiftool_read_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,32 @@ def _extract_alternative_fields(
return None


def _same_gps_point(left: GPSPoint, right: GPSPoint) -> bool:
"""
>>> left = GPSPoint(time=56.0, lat=36.741385, lon=29.021274, alt=141.6, angle=1.54, epoch_time=None, fix=None, precision=None, ground_speed=None)
>>> right = GPSPoint(time=56.0, lat=36.741385, lon=29.021274, alt=142.4, angle=1.54, epoch_time=None, fix=None, precision=None, ground_speed=None)
>>> _same_gps_point(left, right)
True
"""
return (
left.time == right.time
and left.lon == right.lon
and left.lat == right.lat
and left.epoch_time == right.epoch_time
and left.angle == right.angle
)


def _deduplicate_gps_points(
track: list[GPSPoint], same_gps_point: T.Callable[[GPSPoint, GPSPoint], bool]
) -> list[GPSPoint]:
deduplicated_track: list[GPSPoint] = []
for point in track:
if not deduplicated_track or not same_gps_point(deduplicated_track[-1], point):
deduplicated_track.append(point)
return deduplicated_track


def _aggregate_gps_track(
texts_by_tag: dict[str, list[str]],
time_tag: str | None,
Expand Down Expand Up @@ -174,7 +200,7 @@ def _aggregate_float_values_same_length(
epoch_time = geo.as_unix_time(dt)

# build track
track = []
track: list[GPSPoint] = []
for timestamp, lon, lat, alt, direction, ground_speed in zip(
timestamps,
lons,
Expand All @@ -185,22 +211,26 @@ def _aggregate_float_values_same_length(
):
if timestamp is None or lon is None or lat is None:
continue
track.append(
GPSPoint(
time=timestamp,
lon=lon,
lat=lat,
alt=alt,
angle=direction,
epoch_time=epoch_time,
fix=None,
precision=None,
ground_speed=ground_speed,
)

point = GPSPoint(
time=timestamp,
lon=lon,
lat=lat,
alt=alt,
angle=direction,
epoch_time=epoch_time,
fix=None,
precision=None,
ground_speed=ground_speed,
)

if not track or not _same_gps_point(track[-1], point):
track.append(point)

track.sort(key=lambda point: point.time)

track = _deduplicate_gps_points(track, same_gps_point=_same_gps_point)

if time_tag is not None:
if track:
first_time = track[0].time
Expand Down
27 changes: 3 additions & 24 deletions mapillary_tools/exiftool_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import annotations

import platform
import shutil
import subprocess
import typing as T
from pathlib import Path
Expand All @@ -12,32 +10,13 @@ class ExiftoolRunner:
Wrapper around ExifTool to run it in a subprocess
"""

def __init__(self, exiftool_path: str | None = None, recursive: bool = False):
if exiftool_path is None:
exiftool_path = self._search_preferred_exiftool_path()
self.exiftool_path = exiftool_path
def __init__(self, exiftool_executable: str = "exiftool", recursive: bool = False):
self.exiftool_executable = exiftool_executable
self.recursive = recursive

def _search_preferred_exiftool_path(self) -> str:
system = platform.system()

if system and system.lower() == "windows":
exiftool_paths = ["exiftool.exe", "exiftool"]
else:
exiftool_paths = ["exiftool", "exiftool.exe"]

for path in exiftool_paths:
full_path = shutil.which(path)
if full_path:
return path

# Always return the prefered one, even if it is not found,
# and let the subprocess.run figure out the error later
return exiftool_paths[0]

def _build_args_read_stdin(self) -> list[str]:
args: list[str] = [
self.exiftool_path,
self.exiftool_executable,
"-fast",
"-q",
"-n", # Disable print conversion
Expand Down
Empty file.
Loading