Compare commits: main...Meating-it (30 commits)
Commits:

7585453ff0, 69ec6d2732, 31f1495833, 87cbd484ce, 452ce699ee, 259c5bc9b7,
bcac814800, fdb6f73f26, 9439a4e251, 38053cf161, 0a494ad81f, 8e6bb25471,
fddda2063e, c23baffa36, 99fd2e3e8d, 676beab6b3, f5d0877151, a98e069b88,
d1728770c3, 1b658209ad, e9a6fcfb66, 3faed4a41c, b4fb7211f3, 4d3f7ce342,
d5d7b6626b, ef8b8f1fd2, 82f885466e, de482074e6, bb15cf492a, 59f6439872
.gitignore (vendored; 9 lines changed)

```diff
@@ -1,3 +1,10 @@
+env_conf
+include
+*.docx
+*.pdf
+git
+run.sh
+
 # ---> Python
 # Byte-compiled / optimized / DLL files
 __pycache__/
@@ -17,6 +24,8 @@ eggs/
 .eggs/
 lib/
 lib64/
+lib64
+bin
 parts/
 sdist/
 var/
```
README.md (29 lines changed)

```diff
@@ -1,14 +1,10 @@
-# DEPRECATED - DIAGRAMS ARE NOW IN PDF FORMAT.
-
-https://git.fjla.uk/owlboard/dgp2 supports new PDF format schedule cards and offers some automated validation of codes. This project will not be maintained.
-
 # diagram-parser
 
 This is an experimental project and is not yet used as part of the OwlBoard stack.
 
 ## Language
 
-It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which coule be helpful.
+It is so far undecided which language will be used. Documents for parsing are likely to be a few hundred lines long, so searching may become processor intensive, meaning Go may be a good candidate; however, Python offers an array of libraries which could be helpful.
 
 ## File formats
 
@@ -18,24 +14,25 @@ Diagrams are received in DOCX format, however can easily be converted to ODT,
 
 The aim of diagram-parser is to simplify the addition of PIS codes that are not yet in the OwlBoard data source. The planned implementation is as follows:
 
-- diagram-parser is subscribed to an email inbox (IMAP/POP3)
-- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
-- List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
-- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
-- Once the program has run and extracted only the relavent details, the email is deleted and the file is closed and not stored.
-- The evantual aim is to avoid any manual searching of the files.
+- diagram-parser is subscribed to an email inbox (IMAP/POP3)
+- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
+- A list of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
+- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
+- Once the program has run and extracted only the relevant details, the email is deleted and the file is closed and not stored.
+- The eventual aim is to avoid any manual searching of the files.
 
 The current process of adding new codes involves being made aware of them face to face, or finding them myself and manually finding and adding them to the data source.
 
 ## Points to Remember
 
-- Emails received should be verified.
+- Emails received should be verified.
   - A pre-authorised key in the subject field, any emails not matching the key should be discarded.
-- Attachment formats may vary slightly.
+- Attachment formats may vary slightly.
   - The format of the attachment should be checked and any errors handled gracefully.
-- Avoid duplicate issues
+- Avoid duplicate issues
   - Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues.
 
 ## Main external dependencies (Expected)
-- imaplib
-- email
+
+- imaplib
+- email
```
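The flow the README plans (inbox, parse, compare, Gitea issue) is not implemented in this changeset; src/mailbox_mode.py below is still a stub. A rough sketch of how the pieces added here could be wired together, assuming a subject-key check and an issue-opening helper that do not yet exist:

```python
# Hypothetical mailbox-mode loop; IMAPConnector, parse_docx and pis_find are
# added in this changeset, everything else here is assumed.
import parse_docx, pis_find
from imap_connector import IMAPConnector

def mailbox_run(sender, subject_key):
    connector = IMAPConnector()
    found = []
    for msg in connector.fetch_filtered_emails(sender):
        if subject_key not in (msg["Subject"] or ""):
            continue  # pre-authorised key check: discard non-matching emails
        for part in msg.walk():
            name = part.get_filename()
            if name and name.endswith(".docx"):
                path = "/tmp/" + name
                with open(path, "wb") as fh:
                    fh.write(part.get_payload(decode=True))
                found.extend(parse_docx.extract_tables(path))
    missing = pis_find.run(found)
    # ...open one Gitea issue per missing code, after checking open issues
    # for the code in their title to avoid duplicates (helper not written yet)
    connector.delete_emails_from_sender(sender)  # emails are not stored
    return missing
```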
pyvenv.cfg (new file; 5 lines)

```ini
home = /usr/bin
include-system-site-packages = false
version = 3.12.3
executable = /usr/bin/python3.12
command = /usr/bin/python -m venv /home/fred.boniface/Desktop/diagrams-to-parse/diagram-parser
```
requirements.txt (new file; 13 lines)

```
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
gitdb==4.0.11
GitPython==3.1.42
idna==3.6
lxml==5.1.0
pycparser==2.21
python-docx-2023==0.2.17
PyYAML==6.0.1
requests==2.31.0
smmap==5.0.1
urllib3==2.2.1
```
src/config.py (new file; 63 lines)

```python
# Load configuration from file/env variables

import os

def load():
    cfg = {}
    toLoad = [
        {
            "envname": "DG_IMAP_HOST",
            "filepath": "/owlboard/dgp/imap/host"
        },
        {
            "envname": "DG_IMAP_PORT",
            "filepath": "/owlboard/dgp/imap/port",
            "default": "unk"
        },
        {
            "envname": "DG_IMAP_USER",
            "filepath": "/owlboard/dgp/imap/user",
        },
        {
            "envname": "DG_IMAP_PASS",
            "filepath": "/owlboard/dgp/imap/pass",
        },
        {
            "envname": "DG_OWL_UUID",
            "filepath": "/owlboard/dgp/api/uuid",
        },
        {
            "envname": "DG_GITEA_KEY",
            "filepath": "/owlboard/dgp/gitea/key"
        },
        {
            "envname": "DG_GITEA_HOST",
            "filepath": "/owlboard/dgp/gitea/host"
        },
        {
            "envname": "DG_GITEA_SSHPORT",
            "filepath": "/owlboard/dgp/gitea/sshport"
        }
    ]

    for item in toLoad:
        filepath = item["filepath"]
        envname = item["envname"]
        default = item.get("default")

        # Try to load value from file
        try:
            with open(filepath, "r") as file:
                value = file.read().strip()
        except FileNotFoundError:
            # If file doesn't exist, try to get value from environment variable
            value = os.environ.get(envname)

        # If value is still not found, use the default if provided
        if value is None and default is not None:
            value = default

        # Add the value to the cfg dictionary
        cfg[envname] = value

    return cfg
```
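A minimal usage sketch, assuming the /owlboard/dgp/... secret files are mounted (for example as container secrets) or the corresponding environment variables are set:

```python
import config

cfg = config.load()
imap_host = cfg["DG_IMAP_HOST"]  # None if neither the file nor the env var exists
imap_port = cfg["DG_IMAP_PORT"]  # falls back to the default "unk"
```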
Deleted file (34 lines):

```python
### This uses the 'python-docx-2023' module
from docx import Document

def extract_table(file_path):
    document = Document(file_path)

    table = document.tables[4]
    print(document.tables[1])
    print(document.tables[2])
    print(document.tables[3])
    print(document.tables[4])
    print(document.tables[5])

    data = []
    keys = None
    for i, row in enumerate(table.rows):
        text = (cell.text for cell in row.cells)
        if i == 0:
            keys = tuple(text)
            continue
        row_data = dict(zip(keys, text))
        data.append(row_data)

    print(data)

if __name__ == "__main__":
    extract_table("./file.docx")

### This can parse each table. What needs to happen next
### is to parse all tables, then check for a PIS code.
### If PIS code exists, then find the associated headcode,
### Then an API request can be made to OwlBoard to try
### and find a service with valid stopping pattern,
### then the PIS codes can be generated for review.
```
Deleted file (87 lines):

```python
import os, sys, json, subprocess, re, yaml, requests

report_file_path = "./report.txt"
code_store_list_url = "https://git.fjla.uk/OwlBoard/data/raw/branch/main/pis/gw.yaml"

def is_pdfgrep_installed():
    try:
        subprocess.check_call(["pdfgrep", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return True
    except subprocess.CalledProcessError:
        return False

def fetch_and_parse_yaml(url):
    try:
        response = requests.get(url)
        response.raise_for_status()
        existing_codes = yaml.safe_load(response.text)
        return existing_codes
    except Exception as e:
        print(f"Error downloading and parsing codes: {e}")
        sys.exit(1)

def main():

    if len(sys.argv) != 2:
        print("Usage: python pdf_code_extraction.py <directory_path>")
        sys.exit(1)

    pdf_directory = sys.argv[1]

    if not os.path.isdir(pdf_directory):
        print(f"'{pdf_directory}' is not a valid directory.")
        sys.exit(1)

    if not is_pdfgrep_installed():
        print("pdfgrep is not installed on your system.")
        sys.exit(1)

    code_list = []

    pdfgrep_cmd = f"pdfgrep -Prno 'code\\s*:\\s*\\d{{4}}' {pdf_directory}"
    pdfgrep_output = subprocess.getoutput(pdfgrep_cmd)

    for line in pdfgrep_output.splitlines():
        match = re.search(r"^(.*?):\s*code\s*:\s*(\d{4})", line)
        if match:
            filename, code = match.groups()
            code_list.append({"file": filename, "code": str(code)})

    existing_codes = fetch_and_parse_yaml(code_store_list_url)['pis']
    existing_set = set()
    for item in existing_codes:
        code = item['code']
        existing_set.add(str(code))

    unique_codes = set()
    unique_code_list = []
    missing_codes = []
    for item in code_list:
        code = item['code']
        if code not in unique_codes:
            unique_codes.add(code)
            unique_code_list.append(item)
            if code not in existing_set:
                missing_codes.append(item)

    report = f"""
Number of missing codes found: {len(missing_codes)}

Missing Codes:
"""

    for item in missing_codes:
        report += f"\n - code: {item['code']}\n   stops: (File: {item['file']})"

    print(f"Saving report to {report_file_path}")
    with open(report_file_path, 'w') as report_file:
        report_file.write(report)

    print(report)

if __name__ == "__main__":
    main()
```
src/find_service.py (new file; 3 lines)

```python
## Uses the HEADCODE to guess at the service the PIS code matches
## Where there are multiple matches both are prepared and
## await human review.
```
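The stub above only documents intent; a shape it might take, assuming the lookup behaves like owlboard_connector.get_services and multi-match results are flagged for review rather than committed automatically:

```python
# Hypothetical sketch of the stub's intent; 'lookup' is assumed to behave
# like owlboard_connector.get_services(headcode, date).
def guess_services(headcode, date, lookup):
    matches = lookup(headcode, date)
    needs_review = len(matches) != 1  # multiple (or zero) matches await a human
    return matches, needs_review
```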
src/formatter.py (new file; 34 lines)

```python
import owlboard_connector
import sys

def humanYaml(pis_list):
    additional_pis = ''
    manual_review = ''
    for pis in pis_list:
        if len(pis['services']) == 1:
            crs = []
            try:
                if (len(pis['services'][0]['stops']) > 0):
                    for stop in pis['services'][0]['stops']:
                        crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
                    additional_pis += f'  - code: "{pis["pis"]}"\n'
                    additional_pis += f'    #headcode: {pis["headcode"]}\n'
                    additional_pis += f'    #date: {pis["date"]}\n'
                    additional_pis += f'    #source_file: {pis["diagram_file"]}\n'
                    additional_pis += f'    stops: [{",".join(crs)}]\n'
            except Exception as err:
                print(err)
        elif len(pis['services']) > 1:
            manual_review += f'## THIS CODE REQUIRES MANUAL VERIFICATION\n'
            manual_review += f'  - code: "{pis["pis"]}"\n'
            manual_review += f'    #headcode: {pis["headcode"]}\n'
            manual_review += f'    #date: {pis["date"]}\n'
            manual_review += f'    #source_file: {pis["diagram_file"]}\n'
            for service in pis["services"]:
                crs = []
                if service and service['stops']:
                    for stop in service['stops']:
                        crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
                    manual_review += f'    stops: [{",".join(crs)}]\n'

    return "FOR REVIEW\n" + additional_pis + manual_review
```
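For orientation, the fragment humanYaml() assembles for a single unambiguous code looks roughly like this; the values are invented, the shape is read off the f-strings above:

```python
# Invented example of the returned string for one single-service code:
example = (
    'FOR REVIEW\n'
    '  - code: "1234"\n'
    '    #headcode: 2C04\n'
    '    #date: 2024-03-01 00:00:00\n'
    '    #source_file: 0301 diagrams.docx\n'
    '    stops: [pad,rdg,twy]\n'
)
```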
src/gitea_connector.py (new file; 42 lines)

```python
import requests, os, git
from datetime import datetime

BASE_URL = "https://git.fjla.uk/"
REPO_URL = f"{BASE_URL}owlboard/data"
REPO_PATH = "./git/clone/data"
USER = 'owlbot'
TOKEN = os.environ.get('DGP_GITEA_TOK')
HEADERS = {
    'Content-Type': 'application/json',
    'accept': 'application/json',
}
BRANCH_NAME = 'auto-' + datetime.now().strftime("%Y%m%d-%H%M%S")
FILE_NAME = 'dg_parser_' + datetime.now().strftime("%Y%m%d-%H%M%S")


'''
I need a way here to get the original file from the 'main' branch and
append the generated PIS codes. Then push to a new generated branch.

Then a pull request should be created, but that can probably be done with Actions.
In reality this program should just take in DOCX files and spit out formatted
PIS data to the repo; everything else can be handled at the repo level??

None of this currently works...
'''


def clone_repository():
    git.Repo.clone_from(REPO_URL, REPO_PATH)

def commit_and_push_changes(text_to_append, commit_message):
    repo = git.Repo(REPO_PATH)
    repo.git.checkout("-b", BRANCH_NAME)
    with open(REPO_PATH + f"/pis/{FILE_NAME}.yaml", 'w') as file:
        file.write(text_to_append)
    repo.index.add([f"pis/{FILE_NAME}.yaml"])
    repo.index.commit(commit_message)
    origin = repo.remote(name='origin')
    origin_url_credentials = REPO_URL.replace('https://', f'https://{USER}:{TOKEN}@')
    origin.set_url(origin_url_credentials)
    origin.push(refspec=BRANCH_NAME)
```
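The docstring's missing step, opening the pull request, could be done against Gitea's REST API; a sketch only, assuming the DGP_GITEA_TOK token has rights on owlboard/data:

```python
import os, requests

def open_pull_request(branch_name, title):
    # Sketch: create a PR from the generated branch into main via Gitea's API.
    token = os.environ.get('DGP_GITEA_TOK')
    res = requests.post(
        "https://git.fjla.uk/api/v1/repos/owlboard/data/pulls",
        headers={'Authorization': f'token {token}', 'Content-Type': 'application/json'},
        json={'title': title, 'head': branch_name, 'base': 'main'},
        timeout=10,
    )
    res.raise_for_status()
    return res.json()['number']
```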
src/imap_connector.py (new file; 44 lines)

```python
import imaplib, email, os

class IMAPConnector:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not hasattr(self, 'imap_connection'):
            IMAP_SERVER = os.environ.get('DGP_EML_HOST')
            IMAP_USER = os.environ.get('DGP_EML_USER')
            IMAP_PASS = os.environ.get('DGP_EML_PASS')

            if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
                raise ValueError("Please ensure DGP_EML_HOST, DGP_EML_USER and DGP_EML_PASS are defined in the environment")

            self.imap_connection = imaplib.IMAP4_SSL(IMAP_SERVER)
            self.imap_connection.login(IMAP_USER, IMAP_PASS)
            self.imap_connection.select('INBOX')

    def fetch_filtered_emails(self, sender_email):
        filtered_emails = []
        result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
        if result == 'OK':
            for num in data[0].split():
                result, email_data = self.imap_connection.fetch(num, '(RFC822)')
                if result == 'OK':
                    raw_email = email_data[0][1]
                    email_message = email.message_from_bytes(raw_email)
                    filtered_emails.append(email_message)
        return filtered_emails

    def delete_emails_from_sender(self, sender_email):
        result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
        if result == 'OK':
            for num in data[0].split():
                self.imap_connection.store(num, '+FLAGS', '\\Deleted')
            self.imap_connection.expunge()
            print(f"All messages from {sender_email} deleted successfully")
```
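A usage sketch (the sender address is a placeholder): because of the __new__ override, every IMAPConnector() call returns the same logged-in connection.

```python
connector = IMAPConnector()                    # logs in once
messages = connector.fetch_filtered_emails("diagrams@example.com")
for msg in messages:
    print(msg["Subject"])
assert IMAPConnector() is connector            # singleton behaviour
```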
src/local_mode.py (new file; 76 lines)

```python
import parse_docx, pis_find, owlboard_connector, formatter, gitea_connector
import os, sys

def start():
    print("Running OwlBoard Diagram Parser in local mode")
    if not owlboard_connector.check_connection():
        print("Exiting")
        sys.exit(1)
    else:
        print("OwlBoard connection successful")
    working_directory = os.getcwd()
    print("Working directory: ", working_directory)

    ## Get all files in directory
    files = [f for f in os.listdir(working_directory) if os.path.isfile(os.path.join(working_directory, f))]
    docx_files = [f for f in files if f.endswith(".docx")]

    results = []

    if docx_files:
        print(f"Found {len(docx_files)} DOCX files in directory")
        for file in docx_files:
            print(file)
            items = parse_docx.extract_tables(file)
            results.extend(items)
    else:
        print("No DOCX files found")

    print(f"Found {len(results)} PIS Codes in documents")
    missing_pis = pis_find.run(results)
    print(missing_pis)
    get_detail = []
    for code in missing_pis:
        print(f"Fetching services with code: {code}")
        services = owlboard_connector.get_services(code['headcode'], code['date'])
        get_detail.append({
            'pis': code['pis'],
            'services': services,
            'diagram_file': code['source_file'],
            'date': code['date'],
            'headcode': code['headcode'],
        })

    details = []
    for item in get_detail:
        detail = {
            'pis': item['pis'],
            'headcode': item['headcode'],
            'date': item['date'],
            'services': [],
            'diagram_file': item['diagram_file']
        }
        for service in item['services']:
            service_detail = owlboard_connector.get_service_detail(service['trainUid'], item['date'])
            detail['services'].append(service_detail)

        details.append(detail)

    formatted_additions = formatter.humanYaml(details)
    print(formatted_additions)
    out = open("pis_output", "a")
    out.write('\n---\n')
    out.write(formatted_additions)
    out.close()

    gitea_connector.clone_repository()
    gitea_connector.commit_and_push_changes(formatted_additions, "From owlbot diagram-parser")


if __name__ == "__main__":
    print("To use local mode, please call `main.py local`")
```
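Note what local mode assumes from the environment: DGP_OB_UUID for the OwlBoard API check in owlboard_connector, and DGP_GITEA_TOK for the push in gitea_connector. It scans the current working directory for .docx files rather than taking a path argument.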
src/mailbox_mode.py (new file; 2 lines)

```python
import imaplib
import email
```
src/main.py (new file; 16 lines)

```python
import sys

def main():
    mode = sys.argv[1] if len(sys.argv) > 1 else "local"

    if mode == "local":
        import local_mode
        local_mode.start()
    elif mode == "mailbox":
        print("MailBox mode not available yet")
        pass
    else:
        print("Invalid mode. Please specify 'local' or 'mailbox'")

if __name__ == "__main__":
    main()
```
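Invocation from the repo root is `python src/main.py local`; with no argument the mode defaults to local.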
src/owlboard_connector.py (new file; 104 lines)

```python
### API requests live here.

### Authentication must be completed, registering for the API if necessary.
### At first glance that would rule out local mode (mailbox access is needed
### for registration), but a pre-registered UUID can simply be passed in to
### the program instead.

import requests, os, sys

OB_PIS_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/"
OB_TRN_BASE_URL = "https://owlboard.info/api/v2/timetable/train/"
OB_TIP_BASE_URL = "https://owlboard.info/api/v2/ref/locationCode/tiploc/"
#OB_PIS_BASE_URL = "http://localhost:8460/api/v2/pis/byCode/"
#OB_TRN_BASE_URL = "http://localhost:8460/api/v2/timetable/train/"
#OB_TIP_BASE_URL = "http://localhost:8460/api/v2/ref/locationCode/tiploc/"
OB_TEST_URL = OB_PIS_BASE_URL + "5001"
UUID = os.environ.get('DGP_OB_UUID')
HEADERS = {
    'user-agent': 'owlboard-diagram-parser',
    'uuid': UUID
}

def check_connection():
    if not UUID:
        print("'DGP_OB_UUID' must be set in the environment")
        return False

    res = requests.get(OB_TEST_URL, headers=HEADERS, timeout=10)
    if res.status_code == 401:
        print("Error - Unauthorised. The UUID is not valid. STATUS: ", res.status_code, "UUID: ", UUID)
        return False
    elif res.status_code != 200:
        print("Error - Unable to reach OwlBoard. STATUS: ", res.status_code)
        return False
    return True

def get_services(headcode, date):
    print("Finding GWR service: ", headcode, ", ", date)
    results = []
    url = OB_TRN_BASE_URL + f"{date.strftime('%Y-%m-%d')}/headcode/{headcode.lower()}"
    print(url)
    try:
        res = requests.get(url, headers=HEADERS)
        if res.status_code == 200:
            json_res = res.json()
            for item in json_res:
                if item['operator'] == 'GW':
                    results.append(item)
        print(f"Found {len(results)} valid GWR Service")
        return results
    except Exception as e:
        print(e)
        sys.exit()

def get_service_detail(trainUid, date):
    try:
        print("Getting GWR service details: ", trainUid, ", ", date)
        url = OB_TRN_BASE_URL + f"{date.isoformat()}/byTrainUid/{trainUid}"
        print(url)
        res = requests.get(url, headers=HEADERS)
        if res.status_code == 200:
            json_res = res.json()
            if json_res:
                svc_detail = {
                    'stops': json_res['stops'],
                    'vstp': json_res.get('vstp', False)
                }
                organised = organise_svc(svc_detail)
                #print(res.text)
                #print(organised)
                print("Service Details Found")
                return organised
            else:
                print("Service Not Found")
                sys.exit()
    except Exception as e:
        print(e)
        sys.exit()

def organise_svc(input):
    stop_tiplocs = []
    vstp = input['vstp']

    for stop in input['stops']:
        if stop['isPublic']:
            stop_tiplocs.append(stop['tiploc'])

    existingPis = False
    if 'pis' in input and input['pis'].get('skipCount', 0) == 0:
        existingPis = True

    return {'stops': stop_tiplocs, 'vstp': vstp}

def convert_tiploc_to_crs(tiploc):
    if tiploc == 'RDNG4AB':
        return 'rdg'
    res = requests.get(OB_TIP_BASE_URL + tiploc.upper(), headers=HEADERS)
    if res.status_code == 200:
        json_res = res.json()
        if json_res:
            crs = json_res[0]['3ALPHA']
            return crs.lower()
        else:
            return "NO_CRS"
```
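A hedged example of driving these functions directly; the headcode and date are invented, not taken from the changeset:

```python
from datetime import date
import owlboard_connector

# '1A99' and the date are illustrative values only.
if owlboard_connector.check_connection():
    when = date(2024, 3, 1)
    for svc in owlboard_connector.get_services("1A99", when):
        print(owlboard_connector.get_service_detail(svc["trainUid"], when))
```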
src/parse_docx.py (new file; 70 lines)

```python
### This uses the 'python-docx-2023' module
from docx import Document
from datetime import datetime
import re


PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')

def extract_tables(file_path):
    document = Document(file_path)
    print(f"Reading {len(document.tables)} tables from {file_path}")

    pis_info = []

    for table in document.tables:
        data = []
        for i, row in enumerate(table.rows):
            text = (cell.text for cell in row.cells)
            if i == 0:
                keys = tuple(text)
                continue
            row_data = dict(zip(keys, text))
            data.append(row_data)
        pis_and_headcode = match_pis_and_headcode(data)
        if pis_and_headcode:
            pis_and_headcode['source_file'] = file_path
            current_year = datetime.now().year
            date_string_with_year = f"{current_year}{file_path.split()[0]}"
            pis_and_headcode['date'] = datetime.strptime(date_string_with_year, "%Y%m%d")
            pis_info.append(pis_and_headcode)

    return pis_info


def match_pis_and_headcode(table_data):
    pis_code = None
    headcode = None
    job_head = None

    for item in table_data:
        for key, value in item.items():
            match = PIS_PATTERN.search(value)
            if match:
                pis_code = match.group(1)
                job_head = key.strip()
                break
        if pis_code:
            break

    if pis_code:
        for item in table_data:
            for key in item:
                match = HEADCODE_PATTERN.search(key)
                if match:
                    headcode = match.group()
                    break
            if headcode:
                break

    if pis_code and headcode:
        return {'job_head': job_head, 'headcode': headcode, 'pis': pis_code}
    else:
        return None

def solo_run():
    print(extract_tables("./file.docx"))

if __name__ == "__main__":
    solo_run()
```
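One assumption worth spelling out: extract_tables() derives the diagram date from the file name, so names must begin with an MMDD token followed by a space. The name below is invented to illustrate:

```python
from datetime import datetime

# "0301 diagrams.docx" -> "20240301" -> 1 March 2024 (invented example)
name = "0301 diagrams.docx"
parsed = datetime.strptime(f"{2024}{name.split()[0]}", "%Y%m%d")
assert parsed == datetime(2024, 3, 1)
```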
src/pis_fetch.py (new file; 58 lines)

```python
## This module downloads and compiles a list of all PIS codes across all branches of the OwlBoard/Data repo.
## The function load_existing_pis() is expected to be used outside of the module.

import os, requests, yaml

## TESTING
GIT_URL = 'https://git.fjla.uk'

GIT_API = GIT_URL + '/api/v1'

def load_existing_pis():
    all_pis_data = []
    branches = get_branch_list()
    for branch in branches:
        branch_pis_data = get_branch_pis(branch)
        if branch_pis_data is not None:
            all_pis_data.append(branch_pis_data)
            print(f"Branch: {branch}, PIS Codes: {len(branch_pis_data['pis'])}")

    # Merging data and removing duplicates based on 'code' key
    merged_pis_data = {}  ### THIS BIT DOESN'T COMPARE PROPERLY... PRINT EACH TYPE TO SEE STRUCTURE
    for branch_data in all_pis_data:
        for item in branch_data['pis']:
            code = item['code']
            # Only keep the first occurrence of each 'code'
            if code not in merged_pis_data:
                merged_pis_data[code] = item

    # Convert the dictionary back to a list of dictionaries
    merged_pis_list = [{'code': code, 'stops': value['stops']} for code, value in merged_pis_data.items()]

    print(f"Total unique codes: {len(merged_pis_list)}")
    return merged_pis_list


def get_branch_list():
    get_branches_endpoint = GIT_API + '/repos/owlboard/data/branches'
    res = requests.get(get_branches_endpoint)
    branches_json = res.json()

    branches = []
    for repo in branches_json:
        branches.append(repo['name'])

    print(branches)
    return branches

def get_branch_pis(branch_name):
    get_file_url = GIT_API + f'/repos/owlboard/data/raw/%2Fpis%2Fgw.yaml?ref={branch_name}'
    res = requests.get(get_file_url)
    print(res.status_code)
    pis_yaml = res.text
    dic = yaml.safe_load(pis_yaml)
    return dic

if __name__ == "__main__":
    print(load_existing_pis())
```
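On the "THIS BIT DOESN'T COMPARE PROPERLY" note: a plausible cause (an assumption, not verified against the data repo) is that some branches store `code` as an int and others as a zero-padded string, so duplicate keys never collide. Normalising before the lookup would make the merge reliable:

```python
# Assumed fix: normalise every code to a 4-character string key.
code = str(item['code']).zfill(4)
if code not in merged_pis_data:
    merged_pis_data[code] = item
```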
src/pis_find.py (new file; 35 lines)

```python
## This module compares discovered PIS codes with existing PIS codes obtained by calling pis_fetch

import pis_fetch
import sys

def run(data_list):
    deduplicated_data = dedup(data_list)
    print(f"Removed {len(data_list) - len(deduplicated_data)} duplicate codes")
    print(f"Searching for {len(deduplicated_data)} PIS codes")
    missing_data = find_missing(deduplicated_data)
    print(f"{len(missing_data)} missing PIS codes in OwlBoard data")
    return missing_data

def dedup(data_list):
    unique_dicts = {d['pis']: d for d in data_list}.values()
    unique_list_of_dicts = list(unique_dicts)
    return unique_list_of_dicts


def find_missing(data_list):
    existing_pis_list = pis_fetch.load_existing_pis()
    missing_data = []

    for item in data_list:
        pis_code = item.get('pis')
        if pis_code:
            code_exists = False
            for existing_pis in existing_pis_list:
                if str(existing_pis['code']) == pis_code:
                    code_exists = True
                    break
            if not code_exists:
                missing_data.append(item)

    return missing_data
```