|
|
|
@ -8,7 +8,7 @@ import math
|
|
|
|
|
import contextlib as ctx
|
|
|
|
|
import http.client as http
|
|
|
|
|
|
|
|
|
|
from typing import Callable, Any, Optional
|
|
|
|
|
from typing import Callable
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# GTFS reference:
|
|
|
|
@ -21,50 +21,58 @@ from typing import Callable, Any, Optional
|
|
|
|
|
GTFS_URL = "https://eu.ftp.opendatasoft.com/sncf/gtfs/export-ter-gtfs-last.zip"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Cell = int | str | float | None
|
|
|
|
|
Mapper = Callable[[Any], Cell]
|
|
|
|
|
# Dataset datatypes
|
|
|
|
|
_Cell = int | str | float | None
|
|
|
|
|
|
|
|
|
|
# CSV(str) -> SQLite (_Cell) mapper
|
|
|
|
|
_Mapper = Callable[[str], _Cell]
|
|
|
|
|
|
|
|
|
|
# This data set uses large strings as primary/foreign keys that do not carry
|
|
|
|
|
# information. In order to save some space in the database and time during
|
|
|
|
|
# requests, we substitute those large IDs with incrementing integers.
|
|
|
|
|
|
|
|
|
|
# Global register for primary key substitution
|
|
|
|
|
PKS: dict[str, dict[str, int]] = {}
|
|
|
|
|
# Global register for primary keys substitution
|
|
|
|
|
_g_pks: dict[str, dict[str, int]] = {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Primary key substitution requester
|
|
|
|
|
def primary_key(table: str) -> Mapper:
|
|
|
|
|
assert table not in PKS
|
|
|
|
|
PKS[table] = {}
|
|
|
|
|
def _pk(table: str) -> _Mapper:
|
|
|
|
|
# Primary key substitution mapper
|
|
|
|
|
assert table not in _g_pks
|
|
|
|
|
_g_pks[table] = {}
|
|
|
|
|
|
|
|
|
|
def map(v: str):
|
|
|
|
|
PKS[table][v] = len(PKS[table]) + 1
|
|
|
|
|
return len(PKS[table])
|
|
|
|
|
_g_pks[table][v] = len(_g_pks[table]) + 1
|
|
|
|
|
return len(_g_pks[table])
|
|
|
|
|
|
|
|
|
|
return map
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Foreign key lookup
|
|
|
|
|
def foreign_key(table: str) -> Mapper:
|
|
|
|
|
def _fk(table: str) -> _Mapper:
|
|
|
|
|
# Foreign key lookup mapper
|
|
|
|
|
def map(v: str):
|
|
|
|
|
return PKS[table][v]
|
|
|
|
|
return _g_pks[table][v]
|
|
|
|
|
|
|
|
|
|
return map
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# A "can be null" mapper wrapper
|
|
|
|
|
def optional(f: Mapper) -> Mapper:
|
|
|
|
|
def _opt(f: _Mapper) -> _Mapper:
|
|
|
|
|
# A "can be null" mapper wrapper
|
|
|
|
|
def map(v: str):
|
|
|
|
|
return None if v == "" else f(v)
|
|
|
|
|
|
|
|
|
|
return map
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CSV_FIELDS: list[tuple[str, Optional[list[tuple[str, Optional[Mapper]]]]]] = [
|
|
|
|
|
def _discarded(v: str):
|
|
|
|
|
    # No-op mapper for a field intentionally not included in the schema
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GTFS_SPECS: list[tuple[str, list[tuple[str, _Mapper | None]]]] = [
|
|
|
|
|
(
|
|
|
|
|
"agency.txt",
|
|
|
|
|
[
|
|
|
|
|
("agency_id", primary_key("agency")),
|
|
|
|
|
("agency_id", _pk("agency")),
|
|
|
|
|
("agency_name", str),
|
|
|
|
|
("agency_url", str),
|
|
|
|
|
("agency_timezone", str),
|
|
|
|
@ -82,25 +90,25 @@ CSV_FIELDS: list[tuple[str, Optional[list[tuple[str, Optional[Mapper]]]]]] = [
|
|
|
|
|
(
|
|
|
|
|
"routes.txt",
|
|
|
|
|
[
|
|
|
|
|
("route_id", primary_key("routes")),
|
|
|
|
|
("agency_id", foreign_key("agency")),
|
|
|
|
|
("route_id", _pk("routes")),
|
|
|
|
|
("agency_id", _fk("agency")),
|
|
|
|
|
("route_short_name", str),
|
|
|
|
|
("route_long_name", str),
|
|
|
|
|
("route_desc", None),
|
|
|
|
|
("route_type", int),
|
|
|
|
|
("route_url", None),
|
|
|
|
|
("route_color", optional(str)),
|
|
|
|
|
("route_text_color", optional(str)),
|
|
|
|
|
("route_color", _opt(str)),
|
|
|
|
|
("route_text_color", _opt(str)),
|
|
|
|
|
],
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"trips.txt",
|
|
|
|
|
[
|
|
|
|
|
("route_id", foreign_key("routes")),
|
|
|
|
|
("route_id", _fk("routes")),
|
|
|
|
|
("service_id", int),
|
|
|
|
|
("trip_id", primary_key("trips")),
|
|
|
|
|
("trip_id", _pk("trips")),
|
|
|
|
|
("trip_headsign", str),
|
|
|
|
|
("direction_id", optional(int)),
|
|
|
|
|
("direction_id", _opt(int)),
|
|
|
|
|
("block_id", int),
|
|
|
|
|
("shape_id", None),
|
|
|
|
|
],
|
|
|
|
@ -108,7 +116,7 @@ CSV_FIELDS: list[tuple[str, Optional[list[tuple[str, Optional[Mapper]]]]]] = [
|
|
|
|
|
(
|
|
|
|
|
"stops.txt",
|
|
|
|
|
[
|
|
|
|
|
("stop_id", primary_key("stops")),
|
|
|
|
|
("stop_id", _pk("stops")),
|
|
|
|
|
("stop_name", str),
|
|
|
|
|
("stop_desc", None),
|
|
|
|
|
("stop_lat", float),
|
|
|
|
@ -116,16 +124,16 @@ CSV_FIELDS: list[tuple[str, Optional[list[tuple[str, Optional[Mapper]]]]]] = [
|
|
|
|
|
("zone_id", None),
|
|
|
|
|
("stop_url", None),
|
|
|
|
|
("location_type", int),
|
|
|
|
|
("parent_station", optional(foreign_key("stops"))),
|
|
|
|
|
("parent_station", _opt(_fk("stops"))),
|
|
|
|
|
],
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"stop_times.txt",
|
|
|
|
|
[
|
|
|
|
|
("trip_id", foreign_key("trips")),
|
|
|
|
|
("trip_id", _fk("trips")),
|
|
|
|
|
("arrival_time", str),
|
|
|
|
|
("departure_time", str),
|
|
|
|
|
("stop_id", foreign_key("stops")),
|
|
|
|
|
("stop_id", _fk("stops")),
|
|
|
|
|
("stop_sequence", int),
|
|
|
|
|
("stop_headsign", None),
|
|
|
|
|
("pickup_type", int),
|
|
|
|
@ -133,8 +141,33 @@ CSV_FIELDS: list[tuple[str, Optional[list[tuple[str, Optional[Mapper]]]]]] = [
|
|
|
|
|
("shape_dist_traveled", None),
|
|
|
|
|
],
|
|
|
|
|
),
|
|
|
|
|
("feed_info.txt", None),
|
|
|
|
|
("transfers.txt", None),
|
|
|
|
|
(
|
|
|
|
|
"feed_info.txt",
|
|
|
|
|
[
|
|
|
|
|
("feed_id", int),
|
|
|
|
|
("feed_publisher_name", str),
|
|
|
|
|
("feed_publisher_url", str),
|
|
|
|
|
("feed_lang", str),
|
|
|
|
|
("feed_start_date", str),
|
|
|
|
|
("feed_end_date", str),
|
|
|
|
|
("feed_version", str),
|
|
|
|
|
("conv_rev", _discarded), # proprietary, undocumented
|
|
|
|
|
("plan_rev", _discarded), # proprietary, undocumented
|
|
|
|
|
],
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
"transfers.txt",
|
|
|
|
|
[
|
|
|
|
|
# We expect that file to have no records.
|
|
|
|
|
            # All None mappers will fail if that is not the case
|
|
|
|
|
('from_stop_id', None),
|
|
|
|
|
('to_stop_id', None),
|
|
|
|
|
('transfer_type', None),
|
|
|
|
|
('min_transfer_time', None),
|
|
|
|
|
('from_route_id', None),
|
|
|
|
|
('to_route_id', None),
|
|
|
|
|
]
|
|
|
|
|
),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -148,47 +181,33 @@ def _get_file_names(etag: str):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fetch_dataset(response: http.HTTPResponse, dataset_file: str):
|
|
|
|
|
print("Fetching dataset...")
|
|
|
|
|
content_length = int(response.getheader("Content-Length"))
|
|
|
|
|
with open(dataset_file, "wb") as zip_out:
|
|
|
|
|
while True:
|
|
|
|
|
bytes = response.read(102400)
|
|
|
|
|
zip_out.write(bytes)
|
|
|
|
|
if not bytes:
|
|
|
|
|
break
|
|
|
|
|
progress = math.floor(100 * zip_out.tell() / content_length)
|
|
|
|
|
print(f"Fetched: {zip_out.tell()}/{content_length} {progress}%")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_dataset_files(zip_in: zipfile.ZipFile):
|
|
|
|
|
csv_files = list(sorted(zip_in.namelist()))
|
|
|
|
|
expected = list(sorted(csv_file for csv_file, _ in CSV_FIELDS))
|
|
|
|
|
expected = list(sorted(csv_file for csv_file, _ in GTFS_SPECS))
|
|
|
|
|
|
|
|
|
|
assert len(expected) == len(csv_files), csv_files
|
|
|
|
|
assert all(a == b for a, b in zip(csv_files, expected, strict=True)), csv_files
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_csv_headers(csv_headers: list[str], fields: list[tuple[str, Any]]):
|
|
|
|
|
def _check_dataset_file_headers(
|
|
|
|
|
csv_headers: list[str],
|
|
|
|
|
fields: list[tuple[str, _Mapper | None]]
|
|
|
|
|
):
|
|
|
|
|
expected = list(header for header, _mapper in fields)
|
|
|
|
|
|
|
|
|
|
assert len(expected) == len(csv_headers), csv_headers
|
|
|
|
|
assert all(a == b for a, b in zip(csv_headers, expected)), csv_headers
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _create_schema(db: sqlite3.Connection, schema_file: str):
|
|
|
|
|
def _create_database_schema(db: sqlite3.Connection, schema_file: str):
|
|
|
|
|
with db:
|
|
|
|
|
with open(schema_file) as sql_in:
|
|
|
|
|
db.executescript(sql_in.read())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _load_data(zip_in: zipfile.ZipFile, db: sqlite3.Connection):
|
|
|
|
|
for csv_file, fields in CSV_FIELDS:
|
|
|
|
|
if fields is None:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
def _load_dataset(zip_in: zipfile.ZipFile, db: sqlite3.Connection):
|
|
|
|
|
for csv_file, fields in GTFS_SPECS:
|
|
|
|
|
table = csv_file[:-4]
|
|
|
|
|
print(f"Loading table {table!r}")
|
|
|
|
|
|
|
|
|
|
with zip_in.open(csv_file, "r") as csv_in:
|
|
|
|
|
reader = iter(
|
|
|
|
@ -200,62 +219,124 @@ def _load_data(zip_in: zipfile.ZipFile, db: sqlite3.Connection):
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
_check_csv_headers(next(reader), fields)
|
|
|
|
|
headers = next(reader)
|
|
|
|
|
_check_dataset_file_headers(headers, fields)
|
|
|
|
|
|
|
|
|
|
place_holders = ",".join(
|
|
|
|
|
"?" for _field, mapper in fields if mapper is not None
|
|
|
|
|
"?" for _field, mapper in fields if mapper not in (None, _discarded)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not place_holders:
|
|
|
|
|
try:
|
|
|
|
|
first_row = next(reader)
|
|
|
|
|
except StopIteration:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
raise NotImplementedError(list(zip(headers, first_row)))
|
|
|
|
|
|
|
|
|
|
def map_row(row: list[str]):
|
|
|
|
|
assert all(
|
|
|
|
|
not value
|
|
|
|
|
for (_field, mapper), value in zip(fields, row)
|
|
|
|
|
if mapper is None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
mapper(value)
|
|
|
|
|
for (_field, mapper), value in zip(fields, row)
|
|
|
|
|
if mapper not in (None, _discarded)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
with db:
|
|
|
|
|
db.executemany(
|
|
|
|
|
f"INSERT INTO {table} VALUES ({place_holders})",
|
|
|
|
|
(
|
|
|
|
|
[
|
|
|
|
|
mapper(value)
|
|
|
|
|
for (_field, mapper), value in zip(fields, row)
|
|
|
|
|
if mapper is not None
|
|
|
|
|
]
|
|
|
|
|
for row in reader
|
|
|
|
|
),
|
|
|
|
|
(map_row(row) for row in reader),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _load_database(dataset_file: str, database_tempfile: str, schema_file: str):
|
|
|
|
|
print("Loading database...")
|
|
|
|
|
with zipfile.ZipFile(dataset_file) as zip_in:
|
|
|
|
|
def checkout_latest_dataset():
|
|
|
|
|
"""Fetch the ETAG and size of latest dataset."""
|
|
|
|
|
|
|
|
|
|
head_request = request.Request(GTFS_URL, method="HEAD")
|
|
|
|
|
response: http.HTTPResponse = request.urlopen(head_request)
|
|
|
|
|
with ctx.closing(response):
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
raise RuntimeError("Could not fetch the dataset")
|
|
|
|
|
|
|
|
|
|
if not (etag := response.getheader("ETag")):
|
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
|
|
if not (content_length := response.getheader("Content-Length")):
|
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
|
|
return etag[1:-1], int(content_length)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_dataset(dataset: str, chunk_size: int = 102400):
|
|
|
|
|
"""Download dataset to disk.
|
|
|
|
|
|
|
|
|
|
Yield (downloaded_bytes, total_bytes) until completion.
|
|
|
|
|
|
|
|
|
|
Arguments:
|
|
|
|
|
dataset -- path to the zipped dataset file
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
response: http.HTTPResponse = request.urlopen(GTFS_URL)
|
|
|
|
|
with ctx.closing(response):
|
|
|
|
|
if not (content_length := response.getheader("Content-Length")):
|
|
|
|
|
raise ValueError
|
|
|
|
|
content_length = int(content_length)
|
|
|
|
|
|
|
|
|
|
with open(dataset, "wb") as zip_out:
|
|
|
|
|
while True:
|
|
|
|
|
bytes = response.read(chunk_size)
|
|
|
|
|
zip_out.write(bytes)
|
|
|
|
|
if not bytes:
|
|
|
|
|
break
|
|
|
|
|
yield (zip_out.tell(), content_length)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def initialize_database(dataset: str, database: str, schema: str):
|
|
|
|
|
"""Create and initialize the database from the dataset.
|
|
|
|
|
|
|
|
|
|
Arguments:
|
|
|
|
|
dataset -- path to the zipped dataset file
|
|
|
|
|
    database -- path to the SQLite database to create
|
|
|
|
|
schema -- path to the SQL schema definition file
|
|
|
|
|
"""
|
|
|
|
|
with zipfile.ZipFile(dataset) as zip_in:
|
|
|
|
|
_check_dataset_files(zip_in)
|
|
|
|
|
|
|
|
|
|
with ctx.closing(sqlite3.connect(database_tempfile)) as db:
|
|
|
|
|
_create_schema(db, schema_file)
|
|
|
|
|
_load_data(zip_in, db)
|
|
|
|
|
|
|
|
|
|
print("Done")
|
|
|
|
|
with ctx.closing(sqlite3.connect(database)) as db:
|
|
|
|
|
_create_database_schema(db, schema)
|
|
|
|
|
_load_dataset(zip_in, db)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
print("Checking dataset")
|
|
|
|
|
response: http.HTTPResponse = request.urlopen(GTFS_URL)
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
raise RuntimeError("Could not fetch the dataset")
|
|
|
|
|
print("Check dataset")
|
|
|
|
|
etag, size = checkout_latest_dataset()
|
|
|
|
|
|
|
|
|
|
etag = response.getheader("ETag")[1:-1]
|
|
|
|
|
(dataset_file, database_tempfile, schema_file, database_file) = _get_file_names(
|
|
|
|
|
(dataset, temp_database, schema, database) = _get_file_names(
|
|
|
|
|
etag
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if os.path.isfile(dataset_file):
|
|
|
|
|
print("Dataset is up to date")
|
|
|
|
|
response.close()
|
|
|
|
|
if os.path.isfile(dataset):
|
|
|
|
|
print("Dataset is up to date.")
|
|
|
|
|
else:
|
|
|
|
|
_fetch_dataset(response, dataset_file)
|
|
|
|
|
response.close()
|
|
|
|
|
print(f"Download dataset ({size} bytes)...")
|
|
|
|
|
for done, total in download_dataset(dataset):
|
|
|
|
|
progress = math.floor(100 * done / total)
|
|
|
|
|
print(f"Fetched: {done}/{total} {progress}%")
|
|
|
|
|
|
|
|
|
|
if os.path.isfile(database_tempfile):
|
|
|
|
|
os.unlink(database_tempfile)
|
|
|
|
|
_load_database(dataset_file, database_tempfile, schema_file)
|
|
|
|
|
if os.path.isfile(temp_database):
|
|
|
|
|
os.unlink(temp_database)
|
|
|
|
|
|
|
|
|
|
if os.path.isfile(database_file):
|
|
|
|
|
os.unlink(database_file)
|
|
|
|
|
os.rename(database_tempfile, database_file)
|
|
|
|
|
print("Initialize database...")
|
|
|
|
|
initialize_database(dataset, temp_database, schema)
|
|
|
|
|
|
|
|
|
|
if os.path.isfile(database):
|
|
|
|
|
os.unlink(database)
|
|
|
|
|
os.rename(temp_database, database)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|