This repository has been archived on 2024-11-02. You can view files and clone it, but cannot push or open issues or pull requests.
db-manager/src/timetable.py

198 lines
7.9 KiB
Python

# db-manager - Builds and manages an OwlBoard database instance - To be run on a
# cron schedule
# Copyright (C) 2023 Frederick Boniface
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
REBUILD :bool = False ## Set to true to rebuild database
#Imports
import os
import requests
import logger as log
import zlib
import json
import mongo
import time
from datetime import datetime, timedelta
# This module downloads a single TOCs Schedule data
now = datetime.now()
yesterday = now - timedelta(days=1)
yesterdayDay = yesterday.strftime("%a").lower()
TOC_Code = "EF" # Business code for GWR
fullDataUrl = f"https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_{TOC_Code}_TOC_FULL_DAILY&day=toc-full"
updateDataUrl = f"https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_{TOC_Code}_TOC_UPDATE_DAILY&day=toc-update-{yesterdayDay}"
CORPUS_USER = os.getenv('OWL_LDB_CORPUSUSER')
CORPUS_PASS = os.getenv('OWL_LDB_CORPUSPASS')
log.out("timetable.py: Timetable module loaded", "DBUG")
# Determine state of current Timetable Database
def isUpdateRequired():
timetableLength = mongo.getLength("timetable")
log.out(f"timetable.isUpdateRequired: timetable collection contains {timetableLength} documents", "DBUG")
timetableUpdateDate = mongo.metaCheckTime("timetable")
log.out(f"timetable.isUpdateRequired: Timetable last updated at {timetableUpdateDate}", "INFO")
if (not timetableLength or int(time.time()) > timetableUpdateDate + 172800 or REBUILD):
log.out(f"timetable.isUpdateRequired: timetable collection requires rebuild", "INFO")
return "full"
if (int(time.time()) > (timetableUpdateDate + 86400)):
log.out(f"timetable.isUpdateRequired: timetable collection requires update", "INFO")
return "update"
return False
def getTimetable(full :bool = False):
downloadUrl :str = fullDataUrl if full else updateDataUrl
response = requests.get(downloadUrl, auth=(CORPUS_USER, CORPUS_PASS))
mongo.incrementCounter("schedule_api")
log.out(f"timetable.getTimetable: Fetch (Full:{full}) response: {response.status_code}", "DBUG")
decompressed = zlib.decompress(response.content, 16+zlib.MAX_WBITS)
#print(decompressed)
return decompressed
def loopTimetable(data):
listify = data.splitlines()
documents :list = []
for item in listify:
dic = json.loads(item)
if ('JsonTimetableV1' in dic):
timestamp = dic['JsonTimetableV1']['timestamp']
sequence = dic['JsonTimetableV1']['Metadata']['sequence']
detail = {timestamp: timestamp, sequence: sequence}
# Do something with this data here
# Check if timestamp and sequence are correct, if not trigger a full download
elif ('TiplocV1' in dic):
pass
# Not used as TIPLOCs etc. are sourced from CORPUS
elif ('JsonAssociationV1' in dic):
pass
# Associates trains with eachother - not planning to use yet.
elif ('JsonScheduleV1' in dic):
document = insertSchedule(dic)
documents.append(document)
return documents
def runUpdate():
required = isUpdateRequired()
if (required == "full"):
log.out("timetable.runUpdate: Fetching full timetable data", "INFO")
data = getTimetable(full = True)
elif (required == "update"):
log.out("timetable.runUpdate: Fetching todays timetable update", "INFO")
data = getTimetable()
else:
log.out("timetable.runUpdate: timetable update is not needed", "INFO")
return "done"
parsed = loopTimetable(data)
status = _insertToDb(parsed, required)
if (status):
mongo.metaUpdateTime("timetable")
_removeOutdatedServices()
## Check what happens if there is no update
def insertSchedule(sch_record):
schedule = sch_record['JsonScheduleV1']
scheduleId = schedule['CIF_train_uid']
transactionType = schedule['transaction_type']
if ('schedule_start_date' in schedule):
scheduleStart = _helpParseDate(schedule['schedule_start_date'])
else:
now = datetime.now()
scheduleStart = now.replace(hour=0,minute=0,second=0,microsecond=0)
if ('schedule_end_date' in schedule):
scheduleEnd = _helpParseDate(schedule['schedule_end_date'], "end")
else:
scheduleEnd = "null"
document = {
'transactionType': schedule.get('transaction_type'),
'stpIndicator': schedule.get('CIF_stp_indicator'),
'trainUid': scheduleId,
'headcode': schedule.get('schedule_segment', {}).get('signalling_id'),
'powerType': schedule.get('schedule_segment', {}).get('CIF_power_type'),
'planSpeed': schedule.get('schedule_segment', {}).get('CIF_speed'),
'scheduleStartDate': scheduleStart,
'scheduleEndDate': scheduleEnd,
'daysRun': _helpParseDays(schedule.get('schedule_days_runs', '0000000'))
}
if ('schedule_location' in schedule.get('schedule_segment', {})):
stops = _helpParseStops(schedule['schedule_segment']['schedule_location'])
else:
stops = []
document['stops'] = stops
return document
def _insertToDb(data :list, type :str):
if type == "full":
mongo.dropCollection("timetable")
mongo.putTimetable(data)
mongo.createSingleIndex("timetable", "headcode")
elif type == "update":
for item in data:
if item['transactionType'] == "Create":
singleList = [item]
mongo.putTimetable(singleList)
elif item['transactionType'] == "Delete":
mongo.deleteTimetableData({'trainUid': item['trainUid']})
return True #If Successful else False
def _helpParseStops(schedule_segment):
stops = []
for i in schedule_segment:
timing_point = {}
public_departure = i.get("public_departure")
wtt_departure = i.get("departure")
public_arrival = i.get("public_arrival")
wtt_arrival = i.get("arrival")
tiploc_code = i.get("tiploc_code")
isPublic = False
if public_departure and len(public_departure) == 4 and public_departure.isdigit():
isPublic = True
timing_point['publicDeparture'] = public_departure
if public_arrival and len(public_arrival) == 4 and public_arrival.isdigit():
isPublic = True
timing_point['publicArrival'] = public_arrival
if wtt_departure:
timing_point['wttDeparture'] = wtt_departure
if wtt_arrival:
timing_point['wttArrival'] = wtt_arrival
timing_point['isPublic'] = isPublic
timing_point['tiploc'] = tiploc_code
stops.append(timing_point)
return stops
def _helpParseDays(string):
# Incoming string contains seven numbers, each number from 0-6 representing days Mon-Sun
daysList = ["m", "t", "w", "th", "f", "s", "su"]
selectedDays = [daysList[i] for i, value in enumerate(string) if value == "1"]
return selectedDays
def _helpParseDate(string :str, time :str = "false"):
# Incoming string contains date in format %Y-%m-%d, if the time signified end of schedule,
# append 23:59:59 to the time, else append 00:00:00 to the string.
if time == "end":
string += " 235959"
else:
string += " 000000"
return datetime.strptime(string, "%Y-%m-%d %H%M%S")
def _removeOutdatedServices():
log.out("timetable._removeOutdatedServices: Removing out of date schedules", "INFO")
preCount = mongo.getLength("timetable")
query = {
"schedule_end_date": {
"$lte": datetime.now()
}
}
postCount = mongo.getLength("timetable")
log.out(f"timetable._removeOutdatedServices: Removed {preCount - postCount} out of date services", "DBUG")