Added further validation and issue creation
This commit is contained in:
parent
be31e6cfe1
commit
8e78319cbd
4
.gitignore
vendored
4
.gitignore
vendored
@ -1,4 +1,8 @@
|
|||||||
*.pdf
|
*.pdf
|
||||||
|
auto_matched.txt
|
||||||
|
organised_for_processing.txt
|
||||||
|
validated.txt
|
||||||
|
output.txt
|
||||||
output
|
output
|
||||||
|
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
|
96
src/gitea.py
Normal file
96
src/gitea.py
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
import requests
|
||||||
|
import base64
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
|
||||||
|
class GiteaConnector:
|
||||||
|
def __init__(self, repo_url, user, key):
|
||||||
|
self.repo_url = repo_url
|
||||||
|
self.user = user
|
||||||
|
self.key = key
|
||||||
|
|
||||||
|
self.header = {
|
||||||
|
"accept": "application/json",
|
||||||
|
"Authorization": f"token {self.key}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
|
||||||
|
## Create an issue in the repo
|
||||||
|
def create_issue(self,title,content):
|
||||||
|
url_suffix = "issues"
|
||||||
|
url = urljoin(self.repo_url, url_suffix)
|
||||||
|
|
||||||
|
issue_data = {
|
||||||
|
"title": title,
|
||||||
|
"body": content,
|
||||||
|
"assignees": ["fred.boniface"],
|
||||||
|
"labels": ["bug", "urgent"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.post(url, headers=self.header, json=issue_data)
|
||||||
|
if response.status_code == 201:
|
||||||
|
print("Succesfully creates issue: ", response.json())
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print("Failed to create issue: ", response.status_code, response.text)
|
||||||
|
return False
|
||||||
|
|
||||||
|
## Download the 'gw' pis file from the repo
|
||||||
|
def download_pis_file(self):
|
||||||
|
url = urljoin(self.repo_url, 'contents/pis/gw.yaml')
|
||||||
|
response = requests.get(url, headers=self.header)
|
||||||
|
if response.status_code == 200:
|
||||||
|
file_content = base64.b64decode(response.json()['content']).decode('utf-8')
|
||||||
|
print("File downloaded successfully")
|
||||||
|
return file_content
|
||||||
|
else:
|
||||||
|
print("Failed to download file: ", response.status_code, response.text)
|
||||||
|
return ""
|
||||||
|
|
||||||
|
## Create new branch in the repo
|
||||||
|
def create_branch(self, branch_name):
|
||||||
|
BASE_BRANCH = "main"
|
||||||
|
base_branch_url = urljoin(self.repo_url, f"branch/{BASE_BRANCH}")
|
||||||
|
branch_response = requests.get(base_branch_url, headers=self.header)
|
||||||
|
|
||||||
|
if branch_response.status_code == 200:
|
||||||
|
base_sha = branch_response.json()['commit']['id']
|
||||||
|
|
||||||
|
create_branch_url = urljoin(self.repo_url, "git/refs")
|
||||||
|
branch_data = {
|
||||||
|
"ref": f"refs/heads/{branch_name}",
|
||||||
|
"sha": base_sha,
|
||||||
|
}
|
||||||
|
|
||||||
|
create_branch_response = requests.post(create_branch_url, headers=self.header, data=json.dumps(branch_data))
|
||||||
|
if create_branch_response.status_code == 201:
|
||||||
|
print(f"Branch {branch_name} created successfully")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"Failed to create branch: ", branch_name, create_branch_response.status_code, create_branch_response.text)
|
||||||
|
return False
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("Failed to retreive branch SHA: ", branch_response.status_code, branch_response.text)
|
||||||
|
return False
|
||||||
|
|
||||||
|
## Create a new file in the repo
|
||||||
|
def create_pis_file(self, branch, filename, file_content):
|
||||||
|
file_path = f"pis/{filename}"
|
||||||
|
encoded_content = base64.encode(file_content.encode('utf-8'))
|
||||||
|
|
||||||
|
create_file_url = url_multijoin(self.repo_url, f'contents/{file_path}')
|
||||||
|
|
||||||
|
file_data = {
|
||||||
|
"content": encoded_content,
|
||||||
|
"message": "Create new PIS File",
|
||||||
|
"branch": branch,
|
||||||
|
}
|
||||||
|
|
||||||
|
create_file_response = requests.post(create_file_url, headers=self.header, data=json.dumps(file_data))
|
||||||
|
|
||||||
|
if create_file_response.status_code == 201:
|
||||||
|
print("File created successfully")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print("File creation failed.", create_file_response.status_code, create_file_response.text)
|
||||||
|
return False
|
12
src/main.py
12
src/main.py
@ -71,6 +71,8 @@ def main():
|
|||||||
else:
|
else:
|
||||||
print(f"Found {len(pdf_files)} PDF files")
|
print(f"Found {len(pdf_files)} PDF files")
|
||||||
|
|
||||||
|
|
||||||
|
# Find schedule card files
|
||||||
schedule_cards = []
|
schedule_cards = []
|
||||||
for pdf_file in pdf_files:
|
for pdf_file in pdf_files:
|
||||||
schedule_cards.append(get_schedule_card_data(pdf_file))
|
schedule_cards.append(get_schedule_card_data(pdf_file))
|
||||||
@ -116,11 +118,19 @@ def main():
|
|||||||
|
|
||||||
auto_matched = validate.filter_timetable_entries(organised_for_processing)
|
auto_matched = validate.filter_timetable_entries(organised_for_processing)
|
||||||
|
|
||||||
|
validated = validate.check_and_validate_against_owlboard(auto_matched)
|
||||||
|
|
||||||
|
|
||||||
# print(trains)
|
# print(trains)
|
||||||
out = open("output.txt", "w")
|
out = open("organised_for_processing.txt", "w")
|
||||||
|
out.write(json.dumps(organised_for_processing, indent=4, default=str))
|
||||||
|
out.close()
|
||||||
|
out = open("auto_matched.txt", "w")
|
||||||
out.write(json.dumps(auto_matched, indent=4, default=str))
|
out.write(json.dumps(auto_matched, indent=4, default=str))
|
||||||
out.close()
|
out.close()
|
||||||
|
out = open("validated.txt", "w")
|
||||||
|
out.write(json.dumps(validated, indent=4, default=str))
|
||||||
|
out.close()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
@ -1,6 +1,7 @@
|
|||||||
from pyOwlBoard import client
|
from pyOwlBoard import client
|
||||||
|
import os
|
||||||
|
|
||||||
ob_client = client.OwlBoardClient('https://owlboard.info', 'x')
|
ob_client = client.OwlBoardClient('https://owlboard.info', os.getenv("OWLBOARD_KEY"))
|
||||||
|
|
||||||
def find_gw_trains_by_headcode(headcode, date):
|
def find_gw_trains_by_headcode(headcode, date):
|
||||||
train_list = ob_client.get_trains_by_headcode(headcode, date)
|
train_list = ob_client.get_trains_by_headcode(headcode, date)
|
||||||
@ -38,6 +39,7 @@ def organise_trains(trains):
|
|||||||
def rationalise_timetable_entry(timetable_entries):
|
def rationalise_timetable_entry(timetable_entries):
|
||||||
rationalised_entries = []
|
rationalised_entries = []
|
||||||
for timetable_entry in timetable_entries:
|
for timetable_entry in timetable_entries:
|
||||||
|
if timetable_entry['stpIndicator'] == "C": continue
|
||||||
entry = {
|
entry = {
|
||||||
'stpIndicator': timetable_entry['stpIndicator'],
|
'stpIndicator': timetable_entry['stpIndicator'],
|
||||||
'operator': timetable_entry['operator'],
|
'operator': timetable_entry['operator'],
|
||||||
|
@ -1,3 +1,8 @@
|
|||||||
|
from pyOwlBoard import client
|
||||||
|
import os
|
||||||
|
|
||||||
|
from gitea import GiteaConnector
|
||||||
|
|
||||||
## Validates and filters input based on whether diagram time matches schedule start time
|
## Validates and filters input based on whether diagram time matches schedule start time
|
||||||
def filter_timetable_entries(diagram_entries):
|
def filter_timetable_entries(diagram_entries):
|
||||||
for entry in diagram_entries:
|
for entry in diagram_entries:
|
||||||
@ -28,9 +33,45 @@ def filter_timetable_entries(diagram_entries):
|
|||||||
|
|
||||||
## Checks OwlBoard API for existing PIS codes and whether they match
|
## Checks OwlBoard API for existing PIS codes and whether they match
|
||||||
def check_and_validate_against_owlboard(train_entries):
|
def check_and_validate_against_owlboard(train_entries):
|
||||||
### Loop through input list (which is output of above function, currently in output.txt)
|
ob_client = client.OwlBoardClient("https://owlboard.info", os.getenv("OWLBOARD_KEY"))
|
||||||
### check whether code exists in OwlBoard API, if so - does it match.
|
gitea_client = GiteaConnector("https://git.fjla.uk/api/v1/repos/owlboard/data", "fred.boniface", os.getenv("GITEA_PASS"))
|
||||||
### If exists but no match, open an issue.
|
|
||||||
### If does not exist, do nothing.
|
output = []
|
||||||
### If exists and does match, remove from input list.
|
for train_entry in train_entries:
|
||||||
### Return output
|
existing_pis = ob_client.get_stops_by_pis(train_entry['diagram_pis_code'])
|
||||||
|
if existing_pis:
|
||||||
|
database_stops = [stop.upper() for stop in existing_pis[0].get("stops", [])]
|
||||||
|
else:
|
||||||
|
database_stops = []
|
||||||
|
|
||||||
|
## If only one possible entry, and it matches stops in Database, continue
|
||||||
|
if (
|
||||||
|
len(train_entry['timetable_entries']) == 1
|
||||||
|
and database_stops == train_entry['timetable_entries'][0]["stops"]
|
||||||
|
): continue
|
||||||
|
|
||||||
|
## If database stops are empty (not in database) add to output list
|
||||||
|
elif not database_stops:
|
||||||
|
output.append(train_entry)
|
||||||
|
|
||||||
|
# Else if only one possible entry (and previous statements false), open issue
|
||||||
|
elif len(train_entry['timetable_entries']) == 1:
|
||||||
|
issue_title = f"PIS Error | Code: {train_entry['diagram_pis_code']}"
|
||||||
|
issue_content = f"""
|
||||||
|
PIS Code {train_entry['diagram_pis_code']}.
|
||||||
|
Diagram dated {train_entry['diagram_date']} has been parsed and produced a mismatch with the OwlBoard database.
|
||||||
|
|
||||||
|
Diagram Stops: {",".join(train_entry['timetable_entries'][0]["stops"])}
|
||||||
|
|
||||||
|
Database Stops: {",".join(database_stops)}
|
||||||
|
|
||||||
|
This requires a manual check of the PIS file to identify mistakes or changes.
|
||||||
|
"""
|
||||||
|
print(issue_content)
|
||||||
|
gitea_client.create_issue(issue_title, issue_content)
|
||||||
|
continue
|
||||||
|
|
||||||
|
## Else append to output
|
||||||
|
else:
|
||||||
|
output.append(train_entry)
|
||||||
|
return output
|
||||||
|
Loading…
Reference in New Issue
Block a user