136 lines
4.2 KiB
Python
136 lines
4.2 KiB
Python
import os
|
|
import sys
|
|
import glob
|
|
import datetime
|
|
import json
|
|
|
|
import parse_pdf
|
|
import train_detail
|
|
import validate
|
|
|
|
# List all PDF files in the given directory
|
|
def list_pdf_files(directory):
|
|
pdf_files = glob.glob(os.path.join(directory, '*.pdf'))
|
|
return pdf_files
|
|
|
|
# Extracts date from PDF filename and returns its content
|
|
def get_schedule_card_data(filepath):
|
|
filename = os.path.basename(filepath)
|
|
date_str = filename[:4]
|
|
date_object = datetime.datetime.now()
|
|
if len(date_str) < 4:
|
|
raise ValueError("Filename must have at least four characters")
|
|
return None
|
|
|
|
month_str = date_str[:2]
|
|
day_str = date_str[2:]
|
|
|
|
try:
|
|
month = int(month_str)
|
|
day = int(day_str)
|
|
|
|
year = datetime.datetime.now().year
|
|
date_object = datetime.datetime(year=year, month=month, day=day)
|
|
except ValueError as e:
|
|
print(f"Error parsing date: {e}")
|
|
return None
|
|
|
|
schedule_card_data = {
|
|
"schedule_date": date_object,
|
|
"schedule_data": parse_pdf.parse_pdf_file(filepath)
|
|
}
|
|
|
|
return schedule_card_data
|
|
|
|
|
|
# Loop through data and remove duplicate codes.
|
|
|
|
# Check for existing codes via OwlBoard API.
|
|
# Validate existing codes and submit issue if not correct.
|
|
|
|
# Use the train list to search for stopping pattern of any absent codes.
|
|
|
|
|
|
# Format file and commit to git
|
|
|
|
def main():
|
|
# Check for arguments
|
|
if len(sys.argv) > 1:
|
|
directory = sys.argv[1]
|
|
else:
|
|
directory = os.getcwd()
|
|
|
|
if not os.path.isdir(directory):
|
|
print(f"Error: '{directory}' is not a valid directory")
|
|
return
|
|
|
|
pdf_files = list_pdf_files(directory)
|
|
if len(pdf_files) == 0:
|
|
print(f"Error: '{directory}' contains no PDF files")
|
|
return
|
|
else:
|
|
print(f"Found {len(pdf_files)} PDF files")
|
|
|
|
|
|
# Find schedule card files
|
|
schedule_cards = []
|
|
for pdf_file in pdf_files:
|
|
schedule_cards.append(get_schedule_card_data(pdf_file))
|
|
|
|
# Iterate over schedule cards and fetch train data
|
|
trains = []
|
|
existing_codes = []
|
|
for schedule_card in schedule_cards:
|
|
date = schedule_card['schedule_date']
|
|
for schedule in schedule_card['schedule_data']:
|
|
|
|
## Skip over schedules which have no PIS Code
|
|
if schedule['pis_code'] is None:
|
|
continue
|
|
|
|
## Check if PIS code already processed, if so skip it. Else add it to 'existing_codes'
|
|
if schedule['pis_code'] in existing_codes:
|
|
continue
|
|
|
|
existing_codes.append(schedule['pis_code'])
|
|
|
|
full_schedule = train_detail.find_gw_trains_by_headcode(schedule['headcode'], date)
|
|
train = {
|
|
'schedule_date': date,
|
|
'schedule_headcode': schedule['headcode'],
|
|
'schedule_location': schedule['location'],
|
|
'schedule_time1': schedule['time0'],
|
|
'schedule_time2': schedule['time1'],
|
|
'schedule_pis_code': schedule['pis_code'],
|
|
'timetable_entry': full_schedule,
|
|
}
|
|
|
|
## Filter out values missing schedule details or with matching PIS Codes
|
|
if not full_schedule:
|
|
continue # Skip if timetable entry was not found
|
|
|
|
if len(train['timetable_entry']) == 1 and train['schedule_pis_code'] == train['timetable_entry'][0].get('pis', {}).get('code'):
|
|
continue # Skip if only one possible timetable entry exists, and PIS code matches OwlBoards suggestion
|
|
|
|
trains.append(train)
|
|
|
|
organised_for_processing = train_detail.organise_trains(trains)
|
|
|
|
auto_matched = validate.filter_timetable_entries(organised_for_processing)
|
|
|
|
validated = validate.check_and_validate_against_owlboard(auto_matched)
|
|
|
|
|
|
# print(trains)
|
|
out = open("organised_for_processing.txt", "w")
|
|
out.write(json.dumps(organised_for_processing, indent=4, default=str))
|
|
out.close()
|
|
out = open("auto_matched.txt", "w")
|
|
out.write(json.dumps(auto_matched, indent=4, default=str))
|
|
out.close()
|
|
out = open("validated.txt", "w")
|
|
out.write(json.dumps(validated, indent=4, default=str))
|
|
out.close()
|
|
|
|
if __name__ == "__main__":
|
|
main() |