Compare commits
2 Commits
d5d7b6626b
...
b4fb7211f3
Author | SHA1 | Date |
---|---|---|
Fred Boniface | b4fb7211f3 | |
Fred Boniface | 4d3f7ce342 |
File diff suppressed because one or more lines are too long
|
@ -29,7 +29,33 @@ def start():
|
||||||
print("No DOCX files found")
|
print("No DOCX files found")
|
||||||
|
|
||||||
print(f"Found {len(results)} PIS Codes in documents")
|
print(f"Found {len(results)} PIS Codes in documents")
|
||||||
pis_find.run(results)
|
missing_pis = pis_find.run(results)
|
||||||
|
get_detail = []
|
||||||
|
for code in missing_pis:
|
||||||
|
print(code)
|
||||||
|
services = owlboard_connector.get_services(code['headcode'], code['date'])
|
||||||
|
get_detail.append({
|
||||||
|
'pis': code['pis'],
|
||||||
|
'services': services,
|
||||||
|
'date': code['date']
|
||||||
|
})
|
||||||
|
|
||||||
|
details = []
|
||||||
|
for item in get_detail:
|
||||||
|
detail = {
|
||||||
|
'pis': item['pis'],
|
||||||
|
'services': [],
|
||||||
|
}
|
||||||
|
for service in item['services']:
|
||||||
|
service_detail = owlboard_connector.get_service_detail(service['trainUid'], item['date'])
|
||||||
|
detail['services'].append(service_detail)
|
||||||
|
|
||||||
|
details.append(detail)
|
||||||
|
|
||||||
|
print(details)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -7,8 +7,11 @@
|
||||||
|
|
||||||
import requests, os
|
import requests, os
|
||||||
|
|
||||||
OB_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/"
|
#OB_PIS_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/"
|
||||||
OB_TEST_URL = OB_BASE_URL + "5001"
|
#OB_TRN_BASE_URL = "https://owlboard.info/api/v2/timetable/train/"
|
||||||
|
OB_PIS_BASE_URL = "http://localhost:8460/api/v2/pis/byCode/"
|
||||||
|
OB_TRN_BASE_URL = "http://localhost:8460/api/v2/timetable/train/"
|
||||||
|
OB_TEST_URL = OB_PIS_BASE_URL + "5001"
|
||||||
UUID = os.environ.get('DGP_OB_UUID')
|
UUID = os.environ.get('DGP_OB_UUID')
|
||||||
HEADERS = {
|
HEADERS = {
|
||||||
'user-agent': 'owlboard-diagram-parser',
|
'user-agent': 'owlboard-diagram-parser',
|
||||||
|
@ -29,16 +32,46 @@ def check_connection():
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def find_pis_code(code):
|
def get_services(headcode, date):
    """Return the OwlBoard timetable entries for ``headcode`` operated by GWR.

    Args:
        headcode: Headcode string; it is lower-cased before being placed in
            the request URL.
        date: Object providing ``strftime`` (e.g. ``datetime``); formatted as
            ``YYYY-MM-DD`` in the request URL.

    Returns:
        A list of the response items whose ``operator`` is ``'GW'``. The list
        is empty when the request does not return HTTP 200, when the payload
        is not a list, or when no item matches.
    """
    print("Finding GWR service: ", headcode, ", ", date)
    url = OB_TRN_BASE_URL + f"{date.strftime('%Y-%m-%d')}/headcode/{headcode.lower()}"
    print(url)
    res = requests.get(url, headers=HEADERS)

    results = []
    if res.status_code == 200:
        json_res = res.json()
        # Only a list of service dicts can be filtered; anything else (for
        # example an error object) is treated as "no services".
        if isinstance(json_res, list):
            results = [
                item for item in json_res
                if isinstance(item, dict) and item.get('operator') == 'GW'
            ]
    else:
        # Make a failed lookup visible instead of reporting it as 0 services.
        print("OwlBoard service lookup failed. STATUS: ", res.status_code)

    print(f"Found {len(results)} valid GWR Service")
    return results
|
||||||
print("Unable to reach OwlBoard. STATUS: ", res.status_code)
|
|
||||||
return True
|
def get_service_detail(trainUid, date):
    """Fetch one service by train UID and return it in organised form.

    Args:
        trainUid: Train UID placed in the request URL.
        date: Object providing ``isoformat``; its ISO string is placed in the
            request URL.

    Returns:
        The result of ``organise_svc`` for the fetched service, or ``None``
        when the response status is not 200 or the JSON payload is empty.
    """
    print("Getting GWR service details: ", trainUid, ", ", date)
    url = OB_TRN_BASE_URL + f"{date.isoformat()}/byTrainUid/{trainUid}"
    print(url)

    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        return None

    payload = response.json()
    if not payload:
        return None

    # Keep only the fields organise_svc needs; 'pis' and 'vstp' are optional
    # in the response, 'stops' is required.
    return organise_svc({
        'stops': payload['stops'],
        'existing_pis': payload.get('pis', None),
        'vstp': payload.get('vstp', False),
    })
|
||||||
|
|
||||||
|
def organise_svc(input):
    """Reduce a service detail dict to its public stops and PIS status.

    Args:
        input: Dict with a ``'stops'`` list (each stop has ``'isPublic'`` and
            ``'tiploc'``), a ``'vstp'`` value, and optionally the existing PIS
            record under ``'existing_pis'`` (the key used by
            ``get_service_detail``) or ``'pis'``.

    Returns:
        Dict with ``'stops'`` (TIPLOCs of the public stops, in order),
        ``'existingPis'`` (True when a PIS record is present and its
        ``skipCount`` is 0 or absent) and ``'vstp'`` (passed through).
    """
    stop_tiplocs = [stop['tiploc'] for stop in input['stops'] if stop['isPublic']]

    # The caller stores the record under 'existing_pis'; the previous check
    # only looked for 'pis', so it never matched. Accept both keys.
    pis_record = input.get('existing_pis')
    if pis_record is None:
        pis_record = input.get('pis')

    # A missing record (None) means there is no existing PIS code.
    existing_pis = pis_record is not None and pis_record.get('skipCount', 0) == 0

    return {'stops': stop_tiplocs, 'existingPis': existing_pis, 'vstp': input['vstp']}
|
|
@ -1,20 +1,8 @@
|
||||||
### This uses the 'python-docx-2023' module
|
### This uses the 'python-docx-2023' module
|
||||||
from docx import Document
|
from docx import Document
|
||||||
|
from datetime import datetime
|
||||||
import re
|
import re
|
||||||
|
|
||||||
### This can parse each table. What needs to happen next
|
|
||||||
### is to parse all tables, then check for a PIS code.
|
|
||||||
### If PIS code exists, then find the associated headcode,
|
|
||||||
### Then an API request can be made to OwlBoard to try
|
|
||||||
### and find a service with valid stopping pattern,
|
|
||||||
### then the PIS codes can be generated for review.
|
|
||||||
|
|
||||||
### I think that I need to match each page. I need to search each page for the
|
|
||||||
### days of week the diagram is valid for, eg FSX, FO. Then I need to find the
|
|
||||||
### dates the diagram is valid for. Then use that to search for trains.
|
|
||||||
|
|
||||||
### Alternatively, I could ensure that only daily diagrams are sent and the date
|
|
||||||
### is contained within the body/subject of the email.
|
|
||||||
|
|
||||||
PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
|
PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
|
||||||
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')
|
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')
|
||||||
|
@ -37,6 +25,7 @@ def extract_tables(file_path):
|
||||||
pis_and_headcode = match_pis_and_headcode(data)
|
pis_and_headcode = match_pis_and_headcode(data)
|
||||||
if pis_and_headcode:
|
if pis_and_headcode:
|
||||||
pis_and_headcode['source_file'] = file_path
|
pis_and_headcode['source_file'] = file_path
|
||||||
|
pis_and_headcode['date'] = datetime.strptime(file_path.split('_')[0], "%Y%m%d")
|
||||||
pis_info.append(pis_and_headcode)
|
pis_info.append(pis_and_headcode)
|
||||||
|
|
||||||
return(pis_info)
|
return(pis_info)
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
import os, requests, yaml
|
||||||
|
|
||||||
|
API_KEY = os.environ.get('DGP_GITEA_KEY')
|
||||||
|
|
||||||
|
## TESTING
|
||||||
|
GIT_URL = 'https://git.fjla.uk'
|
||||||
|
|
||||||
|
GIT_API = GIT_URL + '/api/v1'
|
||||||
|
|
||||||
|
def load_existing_pis():
    """Collect the PIS entries of every branch and de-duplicate them by code.

    Returns:
        A list of ``{'code': ..., 'stops': ...}`` dicts, one per distinct
        ``code``; when a code appears more than once the first occurrence
        (in branch order) is kept.
    """
    per_branch = []
    for branch in get_branch_list():
        loaded = get_branch_pis(branch)
        if loaded is None:
            continue
        per_branch.append(loaded)
        print(f"Branch: {branch}, PIS Codes: {len(loaded['pis'])}")

    # TODO: the original author noted that this de-duplication "doesn't
    # compare properly" and suggested printing each type to see the structure.
    first_seen = {}
    for loaded in per_branch:
        for entry in loaded['pis']:
            # setdefault keeps the first entry stored for a given code.
            first_seen.setdefault(entry['code'], entry)

    # Flatten the mapping back into a list of dictionaries.
    merged_pis_list = []
    for code, entry in first_seen.items():
        merged_pis_list.append({'code': code, 'stops': entry['stops']})

    print(f"Total unique codes: {len(merged_pis_list)}")
    return merged_pis_list
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_branch_list():
    """Return the branch names of the ``owlboard/data`` repository.

    Requests the branches endpoint of the Git API, prints the list of names
    and returns it.
    """
    endpoint = GIT_API + '/repos/owlboard/data/branches'
    response = requests.get(endpoint)

    # Each element of the JSON response carries the branch name under 'name'.
    branches = [entry['name'] for entry in response.json()]

    print(branches)
    return branches
|
||||||
|
|
||||||
|
def get_branch_pis(branch_name):
    """Download and parse ``pis/gw.yaml`` from one branch of the data repo.

    Args:
        branch_name: Branch to read the file from; sent as the ``ref`` query
            parameter.

    Returns:
        The parsed YAML document, or ``None`` when the request does not
        return HTTP 200 (so the caller's ``is not None`` check skips the
        branch instead of receiving a parsed error body).
    """
    get_file_url = GIT_API + '/repos/owlboard/data/raw/%2Fpis%2Fgw.yaml'
    # Passing the branch through ``params`` lets requests URL-encode it.
    res = requests.get(get_file_url, params={'ref': branch_name})
    print(res.status_code)

    if res.status_code != 200:
        return None

    return yaml.safe_load(res.text)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print(load_existing_pis())
|
|
@ -1,13 +1,13 @@
|
||||||
import owlboard_connector
|
import pis_fetch
|
||||||
import requests
|
import requests, sys
|
||||||
|
|
||||||
def run(data_list):
|
def run(data_list):
    """De-duplicate ``data_list`` and return the entries missing from OwlBoard.

    Prints how many duplicates were removed, how many codes are searched and
    how many were found missing, then returns the result of ``find_missing``.
    """
    unique_entries = dedup(data_list)
    removed_count = len(data_list) - len(unique_entries)
    print(f"Removed {removed_count} duplicate codes")
    print(f"Searching for {len(unique_entries)} PIS codes")

    missing_data = find_missing(unique_entries)
    missing_count = len(missing_data)
    print(f"{missing_count} missing PIS codes in OwlBoard data")
    return missing_data
|
||||||
|
|
||||||
def dedup(data_list):
|
def dedup(data_list):
|
||||||
unique_dicts = {d['pis']: d for d in data_list}.values()
|
unique_dicts = {d['pis']: d for d in data_list}.values()
|
||||||
|
@ -15,17 +15,20 @@ def dedup(data_list):
|
||||||
return unique_list_of_dicts
|
return unique_list_of_dicts
|
||||||
|
|
||||||
|
|
||||||
## AUTH REQUIRED!!! Move to owlboard_connector.py
|
|
||||||
def find_missing(data_list):
|
def find_missing(data_list):
    """Return the entries of ``data_list`` whose PIS code is not yet known.

    Args:
        data_list: Iterable of dicts; each may carry a PIS code under
            ``'pis'``. Entries without a truthy ``'pis'`` are ignored.

    Returns:
        A list of the entries (the full dicts, not just the codes) whose code
        does not appear in ``pis_fetch.load_existing_pis()``.
    """
    existing_pis_list = pis_fetch.load_existing_pis()

    # Build the lookup once instead of scanning the whole list per code.
    # NOTE(review): codes are compared by their string form because the
    # YAML-loaded codes may be ints while document codes are strings -
    # confirm against the data file (leading zeros would still differ).
    existing_codes = {str(existing_pis['code']) for existing_pis in existing_pis_list}

    missing_data = []
    for item in data_list:
        pis_code = item.get('pis')
        if not pis_code:
            continue
        if str(pis_code) not in existing_codes:
            print("PIS Code", pis_code, "not found in existing data")
            missing_data.append(item)

    return missing_data
|
Loading…
Reference in New Issue