Compare commits
30 Commits
main
...
Meating-it
Author | SHA1 | Date | |
---|---|---|---|
7585453ff0 | |||
69ec6d2732 | |||
31f1495833 | |||
87cbd484ce | |||
452ce699ee | |||
259c5bc9b7 | |||
bcac814800 | |||
fdb6f73f26 | |||
9439a4e251 | |||
38053cf161 | |||
0a494ad81f | |||
8e6bb25471 | |||
fddda2063e | |||
c23baffa36 | |||
99fd2e3e8d | |||
676beab6b3 | |||
f5d0877151 | |||
a98e069b88 | |||
d1728770c3 | |||
1b658209ad | |||
e9a6fcfb66 | |||
3faed4a41c | |||
b4fb7211f3 | |||
4d3f7ce342 | |||
d5d7b6626b | |||
ef8b8f1fd2 | |||
82f885466e | |||
de482074e6 | |||
bb15cf492a | |||
59f6439872 |
9
.gitignore
vendored
9
.gitignore
vendored
@ -1,3 +1,10 @@
|
|||||||
|
env_conf
|
||||||
|
include
|
||||||
|
*.docx
|
||||||
|
*.pdf
|
||||||
|
git
|
||||||
|
run.sh
|
||||||
|
|
||||||
# ---> Python
|
# ---> Python
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
__pycache__/
|
__pycache__/
|
||||||
@ -17,6 +24,8 @@ eggs/
|
|||||||
.eggs/
|
.eggs/
|
||||||
lib/
|
lib/
|
||||||
lib64/
|
lib64/
|
||||||
|
lib64
|
||||||
|
bin
|
||||||
parts/
|
parts/
|
||||||
sdist/
|
sdist/
|
||||||
var/
|
var/
|
||||||
|
29
README.md
29
README.md
@ -1,14 +1,10 @@
|
|||||||
# DEPRECATED - DIAGRAMS ARE NOW IN PDF FORMAT.
|
|
||||||
|
|
||||||
https://git.fjla.uk/owlboard/dgp2 supports new PDF format schedule cards and offers some automated validation of codes. This project will not be maintained.
|
|
||||||
|
|
||||||
# diagram-parser
|
# diagram-parser
|
||||||
|
|
||||||
This is an experimental project and is not yet used as part of the OwlBoard stack.
|
This is an experimental project and is not yet used as part of the OwlBoard stack.
|
||||||
|
|
||||||
## Language
|
## Language
|
||||||
|
|
||||||
It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which could be helpful.
|
It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which could be helpful.
|
||||||
|
|
||||||
## File formats
|
## File formats
|
||||||
|
|
||||||
@ -18,24 +14,25 @@ Diagrams are received in DOCX format, however can be easily be converted to ODT,
|
|||||||
|
|
||||||
The aim of diagram-parser is to simplify the addition of PIS codes that are not yet in the OwlBoard data source. The planned implementation is as follows:
|
The aim of diagram-parser is to simplify the addition of PIS codes that are not yet in the OwlBoard data source. The planned implementation is as follows:
|
||||||
|
|
||||||
- diagram-parser is subscribed to an email inbox (IMAP/POP3)
|
- diagram-parser is subscribed to an email inbox (IMAP/POP3)
|
||||||
- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
|
- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
|
||||||
- List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
|
- List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
|
||||||
- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
|
- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
|
||||||
- Once the program has run and extracted only the relevant details, the email is deleted and the file is closed and not stored.
|
- Once the program has run and extracted only the relevant details, the email is deleted and the file is closed and not stored.
|
||||||
- The eventual aim is to avoid any manual searching of the files.
|
- The eventual aim is to avoid any manual searching of the files.
|
||||||
|
|
||||||
The current process of adding new codes involves being made aware of them face to face, or finding them myself and manually finding and adding them to the data source.
|
The current process of adding new codes involves being made aware of them face to face, or finding them myself and manually finding and adding them to the data source.
|
||||||
|
|
||||||
## Points to Remember
|
## Points to Remember
|
||||||
|
|
||||||
- Emails received should be verified.
|
- Emails received should be verified.
|
||||||
- A pre-authorised key in the subject field, any emails not matching the key should be discarded.
|
- A pre-authorised key in the subject field, any emails not matching the key should be discarded.
|
||||||
- Attachment formats may vary slightly.
|
- Attachment formats may vary slightly.
|
||||||
- The format of the attachment should be checked and any errors handled gracefully.
|
- The format of the attachment should be checked and any errors handled gracefully.
|
||||||
- Avoid duplicate issues
|
- Avoid duplicate issues
|
||||||
- Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues.
|
- Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues.
|
||||||
|
|
||||||
## Main external dependencies (Expected)
|
## Main external dependencies (Expected)
|
||||||
- imaplib
|
|
||||||
- email
|
- imaplib
|
||||||
|
- email
|
5
pyvenv.cfg
Normal file
5
pyvenv.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
home = /usr/bin
|
||||||
|
include-system-site-packages = false
|
||||||
|
version = 3.12.3
|
||||||
|
executable = /usr/bin/python3.12
|
||||||
|
command = /usr/bin/python -m venv /home/fred.boniface/Desktop/diagrams-to-parse/diagram-parser
|
13
requirements.txt
Normal file
13
requirements.txt
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
certifi==2024.2.2
|
||||||
|
cffi==1.16.0
|
||||||
|
charset-normalizer==3.3.2
|
||||||
|
gitdb==4.0.11
|
||||||
|
GitPython==3.1.42
|
||||||
|
idna==3.6
|
||||||
|
lxml==5.1.0
|
||||||
|
pycparser==2.21
|
||||||
|
python-docx-2023==0.2.17
|
||||||
|
PyYAML==6.0.1
|
||||||
|
requests==2.31.0
|
||||||
|
smmap==5.0.1
|
||||||
|
urllib3==2.2.1
|
63
src/config.py
Normal file
63
src/config.py
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
# Load configuration from file/env variables
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
def load():
    """Build the runtime configuration dictionary.

    For each known setting the value is resolved in order of preference:
      1. the contents of the setting's ``filepath`` (whitespace-stripped),
      2. the environment variable named ``envname``,
      3. the entry's ``default`` (when one is declared).

    Returns:
        dict: mapping of environment-variable name -> resolved value
        (``None`` when no source provided a value).
    """
    cfg = {}
    # Each entry describes one setting: the env var that may hold it, the
    # mounted config/secret file that may hold it, and an optional default.
    to_load = [
        {"envname": "DG_IMAP_HOST", "filepath": "/owlboard/dgp/imap/host"},
        {"envname": "DG_IMAP_PORT", "filepath": "/owlboard/dgp/imap/port", "default": "unk"},
        {"envname": "DG_IMAP_USER", "filepath": "/owlboard/dgp/imap/user"},
        {"envname": "DG_IMAP_PASS", "filepath": "/owlboard/dgp/imap/pass"},
        {"envname": "DG_OWL_UUID", "filepath": "/owlboard/dgp/api/uuid"},
        {"envname": "DG_GITEA_KEY", "filepath": "/owlboard/dgp/gitea/key"},
        {"envname": "DG_GITEA_HOST", "filepath": "/owlboard/dgp/gitea/host"},
        {"envname": "DG_GITEA_SSHPORT", "filepath": "/owlboard/dgp/gitea/sshport"},
    ]

    for item in to_load:
        envname = item["envname"]
        default = item.get("default")

        # Prefer the file; fall back to the environment on *any* OS-level
        # read failure (missing file, permission denied, ...) rather than
        # only FileNotFoundError, which previously let a permission error
        # crash startup.
        try:
            with open(item["filepath"], "r") as file:
                value = file.read().strip()
        except OSError:
            value = os.environ.get(envname)

        # Apply the declared default only when nothing else supplied a value.
        if value is None and default is not None:
            value = default

        cfg[envname] = value

    return cfg
|
@ -1,34 +0,0 @@
|
|||||||
### This uses the 'python-docx-2023' module
|
|
||||||
from docx import Document
|
|
||||||
|
|
||||||
def extract_table(file_path):
|
|
||||||
document = Document(file_path)
|
|
||||||
|
|
||||||
table = document.tables[4]
|
|
||||||
print(document.tables[1])
|
|
||||||
print(document.tables[2])
|
|
||||||
print(document.tables[3])
|
|
||||||
print(document.tables[4])
|
|
||||||
print(document.tables[5])
|
|
||||||
|
|
||||||
data = []
|
|
||||||
keys = None
|
|
||||||
for i, row in enumerate(table.rows):
|
|
||||||
text = (cell.text for cell in row.cells)
|
|
||||||
if i == 0:
|
|
||||||
keys = tuple(text)
|
|
||||||
continue
|
|
||||||
row_data = dict(zip(keys, text))
|
|
||||||
data.append(row_data)
|
|
||||||
|
|
||||||
print(data)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
extract_table("./file.docx")
|
|
||||||
|
|
||||||
### This can parse each table. What needs to happen next
|
|
||||||
### is to parse all tables, then check for a PIS code.
|
|
||||||
### If PIS code exists, then find the associated headcode,
|
|
||||||
### Then an API request can be made to OwlBoard to try
|
|
||||||
### and find a service with valid stopping pattern,
|
|
||||||
### then the PIS codes can be generated for review.
|
|
@ -1,87 +0,0 @@
|
|||||||
import os, sys, json, subprocess, re, yaml, requests
|
|
||||||
|
|
||||||
report_file_path = "./report.txt"
|
|
||||||
code_store_list_url = "https://git.fjla.uk/OwlBoard/data/raw/branch/main/pis/gw.yaml"
|
|
||||||
|
|
||||||
def is_pdfgrep_installed():
|
|
||||||
try:
|
|
||||||
subprocess.check_call(["pdfgrep", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
||||||
return True
|
|
||||||
except subprocess.CalledProcessError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def fetch_and_parse_yaml(url):
|
|
||||||
try:
|
|
||||||
response = requests.get(url)
|
|
||||||
response.raise_for_status()
|
|
||||||
existing_codes = yaml.safe_load(response.text)
|
|
||||||
return existing_codes
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error downloading and parsing codes: {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def main():
|
|
||||||
|
|
||||||
if len(sys.argv) != 2:
|
|
||||||
print("Usage: python pdf_code_extraction.py <directory_path>")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
pdf_directory = sys.argv[1]
|
|
||||||
|
|
||||||
if not os.path.isdir(pdf_directory):
|
|
||||||
print(f"'{pdf_directory}' is not a valid directory.")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if not is_pdfgrep_installed():
|
|
||||||
print("pdfgrep is not installed on your system.")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
code_list = []
|
|
||||||
|
|
||||||
pdfgrep_cmd = f"pdfgrep -Prno 'code\\s*:\\s*\\d{{4}}' {pdf_directory}"
|
|
||||||
pdfgrep_output = subprocess.getoutput(pdfgrep_cmd)
|
|
||||||
|
|
||||||
|
|
||||||
for line in pdfgrep_output.splitlines():
|
|
||||||
match = re.search(r"^(.*?):\s*code\s*:\s*(\d{4})", line)
|
|
||||||
if match:
|
|
||||||
filename, code = match.groups()
|
|
||||||
code_list.append({"file":filename, "code":str(code)})
|
|
||||||
|
|
||||||
existing_codes = fetch_and_parse_yaml(code_store_list_url)['pis']
|
|
||||||
existing_set = set()
|
|
||||||
for item in existing_codes:
|
|
||||||
code = item['code']
|
|
||||||
existing_set.add(str(code))
|
|
||||||
|
|
||||||
unique_codes = set()
|
|
||||||
unique_code_list = []
|
|
||||||
missing_codes = []
|
|
||||||
for item in code_list:
|
|
||||||
code = item['code']
|
|
||||||
if code not in unique_codes:
|
|
||||||
unique_codes.add(code)
|
|
||||||
unique_code_list.append(item)
|
|
||||||
if code not in existing_set:
|
|
||||||
missing_codes.append(item)
|
|
||||||
|
|
||||||
#print(missing_codes)
|
|
||||||
|
|
||||||
report = f"""
|
|
||||||
Number of missing codes found: {len(missing_codes)}
|
|
||||||
|
|
||||||
Missing Codes:
|
|
||||||
"""
|
|
||||||
|
|
||||||
for item in missing_codes:
|
|
||||||
report += f"\n - code: {item['code']}\n stops: (File: {item['file']})"
|
|
||||||
|
|
||||||
|
|
||||||
print(f"Saving report to {report_file_path}")
|
|
||||||
with open(report_file_path, 'w') as report_file:
|
|
||||||
report_file.write(report)
|
|
||||||
|
|
||||||
print(report)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
3
src/find_service.py
Normal file
3
src/find_service.py
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
## Uses the HEADCODE to guess at the service the PIS code matches
|
||||||
|
## Where there are multiple matches both are prepared and
|
||||||
|
## await human review.
|
34
src/formatter.py
Normal file
34
src/formatter.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
import owlboard_connector
|
||||||
|
import sys
|
||||||
|
|
||||||
|
def humanYaml(pis_list):
    """Render discovered PIS codes as human-reviewable YAML fragments.

    Entries whose code matched exactly one service are emitted ready to
    merge; entries with several candidate services are emitted under a
    manual-verification banner, one stop list per candidate.

    Each item of *pis_list* is expected to carry the keys 'pis',
    'headcode', 'date', 'diagram_file' and 'services' (each service a
    dict with a 'stops' list of TIPLOCs) — as built by local_mode.
    """
    additional_pis = ''   # YAML for unambiguous (single-service) codes
    manual_review = ''    # YAML for ambiguous (multi-service) codes
    for pis in pis_list:
        if len(pis['services']) == 1:
            # Single candidate: convert its calling points to CRS codes
            # and emit a complete YAML entry.
            crs = []
            try:
                if (len(pis['services'][0]['stops']) > 0) :
                    for stop in pis['services'][0]['stops']:
                        crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
                    additional_pis += f' - code: "{pis["pis"]}"\n'
                    additional_pis += f' #headcode: {pis["headcode"]}\n'
                    additional_pis += f' #date: {pis["date"]}\n'
                    additional_pis += f' #source_file: {pis["diagram_file"]}\n'
                    additional_pis += f' stops: [{",".join(crs)}]\n'
            except Exception as err:
                # Best-effort: a malformed service entry should not abort
                # the whole report; log and keep going.
                print(err)
        elif len(pis['services']) > 1:
            # Several candidates: emit every stopping pattern so a human
            # can choose the correct service for this code.
            manual_review += f'## THIS CODE REQUIRES MANUAL VERIFICATION\n'
            manual_review += f' - code: "{pis["pis"]}"\n'
            manual_review += f' #headcode: {pis["headcode"]}\n'
            manual_review += f' #date: {pis["date"]}\n'
            manual_review += f' #source_file: {pis["diagram_file"]}\n'
            for service in pis["services"]:
                crs = []
                if service and service['stops']:
                    for stop in service['stops']:
                        crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
                manual_review += f' stops: [{",".join(crs)}]\n'

    return "FOR REVIEW\n" + additional_pis + manual_review
|
42
src/gitea_connector.py
Normal file
42
src/gitea_connector.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
import requests, os, git
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
BASE_URL = "https://git.fjla.uk/"
|
||||||
|
REPO_URL = f"{BASE_URL}owlboard/data"
|
||||||
|
REPO_PATH = "./git/clone/data"
|
||||||
|
USER = 'owlbot'
|
||||||
|
TOKEN = os.environ.get('DGP_GITEA_TOK')
|
||||||
|
HEADERS = {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'accept': 'application/json',
|
||||||
|
}
|
||||||
|
BRANCH_NAME = 'auto-' + datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||||
|
FILE_NAME = 'dg_parser_' + datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
I need a way here to get the original file from the 'main' branch and
|
||||||
|
append the generated PIS codes. Then push to a new generated branch.
|
||||||
|
|
||||||
|
Then a pull request should be created but can probably be done with actions.
|
||||||
|
In reality this program should just take in DOCX files and spit out formatted
|
||||||
|
PIS data to the repo, everything else can be handled at the repo level??
|
||||||
|
|
||||||
|
None of this currently works...
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
def clone_repository():
    """Clone the OwlBoard data repository into REPO_PATH.

    NOTE(review): GitPython's clone_from appears to fail if REPO_PATH
    already exists — confirm, and clean up between runs if so.
    """
    git.Repo.clone_from(REPO_URL, REPO_PATH)
|
||||||
|
|
||||||
|
def commit_and_push_changes(text_to_append, commit_message):
    """Write *text_to_append* to a new timestamped YAML file under pis/,
    commit it on a fresh branch and push that branch to origin.

    Args:
        text_to_append: full YAML text for pis/<FILE_NAME>.yaml.
        commit_message: message for the single commit created.
    """
    repo = git.Repo(REPO_PATH)
    # One new branch per run — BRANCH_NAME embeds a timestamp.
    repo.git.checkout("-b", BRANCH_NAME)
    with open(REPO_PATH + f"/pis/{FILE_NAME}.yaml", 'w') as file:
        file.write(text_to_append)
    repo.index.add([f"pis/{FILE_NAME}.yaml"])
    repo.index.commit(commit_message)
    origin = repo.remote(name='origin')
    # NOTE(review): embedding USER:TOKEN in the remote URL persists the
    # credential in .git/config on disk — consider a credential helper.
    origin_url_credentials = REPO_URL.replace('https://', f'https://{USER}:{TOKEN}@')
    origin.set_url(origin_url_credentials)
    origin.push(refspec=BRANCH_NAME)
|
44
src/imap_connector.py
Normal file
44
src/imap_connector.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
import imaplib, email, os
|
||||||
|
|
||||||
|
class IMAPConnector:
    """Singleton wrapper around a logged-in imaplib.IMAP4_SSL connection.

    Connection details come from the DGP_EML_HOST / DGP_EML_USER /
    DGP_EML_PASS environment variables; INBOX is selected on first
    construction and the same connection is reused by every later
    ``IMAPConnector()`` call.
    """
    # The single shared instance (None until first construction).
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Classic singleton: every IMAPConnector() yields the same object.
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every IMAPConnector() call, so only connect the
        # first time — i.e. when no connection attribute exists yet.
        if not hasattr(self, 'imap_connection'):
            IMAP_SERVER = os.environ.get('DGP_EML_HOST')
            IMAP_USER = os.environ.get('DGP_EML_USER')
            IMAP_PASS = os.environ.get('DGP_EML_PASS')

            if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
                raise ValueError("Please ensure DGP_EML_HOST, DGP_EML_USER and DGP_EML_PASS are defined in the environment")

            self.imap_connection = imaplib.IMAP4_SSL(IMAP_SERVER)
            self.imap_connection.login(IMAP_USER, IMAP_PASS)
            self.imap_connection.select('INBOX')

    def fetch_filtered_emails(self, sender_email):
        """Return every INBOX message from *sender_email* as parsed
        email.message.Message objects (empty list when none match)."""
        filtered_emails = []
        result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
        if result == 'OK':
            for num in data[0].split():
                result, email_data = self.imap_connection.fetch(num, '(RFC822)')
                if result == 'OK':
                    raw_email = email_data[0][1]
                    email_message = email.message_from_bytes(raw_email)
                    filtered_emails.append(email_message)
        return filtered_emails

    def delete_emails_from_sender(self, sender_email):
        """Flag every message from *sender_email* as \\Deleted, then
        expunge them from the mailbox."""
        result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
        if result == 'OK':
            for num in data[0].split():
                self.imap_connection.store(num, '+FLAGS', '\\Deleted')
            self.imap_connection.expunge()
            print(f"All messages from {sender_email} deleted successfully")
|
||||||
|
|
76
src/local_mode.py
Normal file
76
src/local_mode.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
import parse_docx, pis_find, owlboard_connector, formatter, gitea_connector
|
||||||
|
import os, sys
|
||||||
|
|
||||||
|
def start():
    """Run the parser over every DOCX file in the current directory.

    Pipeline: verify OwlBoard API connectivity -> parse each *.docx in
    the working directory for PIS codes -> keep only codes missing from
    the OwlBoard data -> fetch candidate services and their stop detail
    -> format as YAML, append to ./pis_output, and push a branch to the
    data repository via gitea_connector.
    """
    print("Running OwlBoard Diagram Parser in local mode")
    if not owlboard_connector.check_connection():
        print("Exiting")
        sys.exit(1)
    else:
        print("OwlBoard connection successful")
    working_directory = os.getcwd()
    print("Working directory: ", working_directory)

    ## Get all files in directory
    files = [f for f in os.listdir(working_directory) if os.path.isfile(os.path.join(working_directory, f))]
    docx_files = [f for f in files if f.endswith(".docx")]

    results = []

    if docx_files:
        print(f"Found {len(docx_files)} DOCX files in directory")
        for file in docx_files:
            print(file)
            items = parse_docx.extract_tables(file)
            results.extend(items)
    else:
        print("No DOCX files found")

    print(f"Found {len(results)} PIS Codes in documents")
    missing_pis = pis_find.run(results)
    print(missing_pis)
    get_detail = []
    for code in missing_pis:
        print(f"Fetching services with code: {code}")
        services = owlboard_connector.get_services(code['headcode'], code['date'])
        get_detail.append({
            'pis': code['pis'],
            'services': services,
            'diagram_file': code['source_file'],
            'date': code['date'],
            'headcode': code['headcode'],
        })

    details = []
    for item in get_detail:
        detail = {
            'pis': item['pis'],
            'headcode': item['headcode'],
            'date': item['date'],
            'services': [],
            'diagram_file': item['diagram_file']
        }
        # Resolve each candidate service to its stop/VSTP detail.
        for service in item['services']:
            service_detail = owlboard_connector.get_service_detail(service['trainUid'], item['date'])
            detail['services'].append(service_detail)

        details.append(detail)

    formatted_additions = formatter.humanYaml(details)
    print(formatted_additions)
    # BUG FIX: the handle was opened as `out` but written/closed through
    # an undefined name `f`, raising NameError at runtime.  A context
    # manager guarantees the file is closed either way.
    with open("pis_output", "a") as out:
        out.write('\n---\n')
        out.write(formatted_additions)

    gitea_connector.clone_repository()
    gitea_connector.commit_and_push_changes(formatted_additions, "From owlbot diagram-parser")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# This module is driven by main.py; running it directly only prints usage.
if __name__ == "__main__":
    print("To use local mode, please call `main.py local`")
|
2
src/mailbox_mode.py
Normal file
2
src/mailbox_mode.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
import imaplib
|
||||||
|
import email
|
16
src/main.py
Normal file
16
src/main.py
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
|
def main():
    """Entry point: dispatch to the mode named by the first CLI argument
    (defaults to 'local' when no argument is supplied)."""
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    else:
        mode = "local"

    if mode == "mailbox":
        print("MailBox mode not available yet")
        return
    if mode != "local":
        print("Invalid mode. Please specify 'local' or 'mailbox'")
        return
    # Imported lazily so the other branches never pay the import cost.
    import local_mode
    local_mode.start()


if __name__ == "__main__":
    main()
|
104
src/owlboard_connector.py
Normal file
104
src/owlboard_connector.py
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
### API REQUESTS HERE
|
||||||
|
|
||||||
|
### AUTHENTICATION MUST BE COMPLETED, REGISTERING FOR THE API IF NECCESSARY
|
||||||
|
### THIS NEGATES THE ABILITY TO USE LOCAL MODE - MAILBOX MODE ONLY AS
|
||||||
|
### MAILBOX ACCESS IS NEEDED FOR REGISTRATION... NOTE: A UUID CAN SIMPLY BE PASSED
|
||||||
|
### IN TO THE PROGRAM INSTEAD.
|
||||||
|
|
||||||
|
import requests, os
|
||||||
|
|
||||||
|
OB_PIS_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/"
|
||||||
|
OB_TRN_BASE_URL = "https://owlboard.info/api/v2/timetable/train/"
|
||||||
|
OB_TIP_BASE_URL = "https://owlboard.info/api/v2/ref/locationCode/tiploc/"
|
||||||
|
#OB_PIS_BASE_URL = "http://localhost:8460/api/v2/pis/byCode/"
|
||||||
|
#OB_TRN_BASE_URL = "http://localhost:8460/api/v2/timetable/train/"
|
||||||
|
#OB_TIP_BASE_URL = "http://localhost:8460/api/v2/ref/locationCode/tiploc/"
|
||||||
|
OB_TEST_URL = OB_PIS_BASE_URL + "5001"
|
||||||
|
UUID = os.environ.get('DGP_OB_UUID')
|
||||||
|
HEADERS = {
|
||||||
|
'user-agent': 'owlboard-diagram-parser',
|
||||||
|
'uuid': UUID
|
||||||
|
}
|
||||||
|
|
||||||
|
def check_connection():
    """Return True when the OwlBoard API accepts our UUID, else False.

    Probes a known PIS endpoint.  Auth rejection, any non-200 status or
    a network failure is reported and treated as 'not connected' rather
    than raising.
    """
    if not UUID:
        print("'DGP_OB_UUID' must be set in the environment")
        return False

    # BUG FIX: an unreachable host previously raised an uncaught
    # requests exception here instead of returning False.
    try:
        res = requests.get(OB_TEST_URL, headers=HEADERS, timeout=10)
    except requests.RequestException as err:
        print("Error - Unable to reach OwlBoard:", err)
        return False
    if res.status_code == 401:
        print("Error - Unauthorised. The UUID is not valid. STATUS: ", res.status_code, "UUID: ", UUID)
        return False
    elif res.status_code != 200:
        print("Error - Unable to reach OwlBoard. STATUS: ", res.status_code)
        return False
    return True
|
||||||
|
|
||||||
|
def get_services(headcode, date):
    """Return GWR (operator 'GW') services matching *headcode* on *date*.

    Args:
        headcode: train headcode, e.g. '1A23' (lower-cased for the API).
        date: datetime/date of the diagram.

    Returns:
        list of matching service dicts; empty on a non-200 response
        (previously this path implicitly returned None, which crashed
        callers that iterate the result).
    """
    print("Finding GWR service: ", headcode, ", ", date)
    results = []
    url = OB_TRN_BASE_URL + f"{date.strftime('%Y-%m-%d')}/headcode/{headcode.lower()}"
    print(url)
    try:
        res = requests.get(url, headers=HEADERS)
        if res.status_code == 200:
            json_res = res.json()
            for item in json_res:
                if item['operator'] == 'GW':
                    results.append(item)
            print(f"Found {len(results)} valid GWR Service")
        return results
    except Exception as e:
        print(e)
        # BUG FIX: `sys.exit()` raised NameError because this module never
        # imports `sys`; raising SystemExit directly is equivalent.
        raise SystemExit(1)
|
||||||
|
|
||||||
|
def get_service_detail(trainUid, date):
    """Fetch and condense the stop/VSTP detail for one train UID.

    Returns:
        dict from organise_svc: {'stops': [tiploc, ...], 'vstp': bool}.

    Exits the program (SystemExit) when the service cannot be found or
    the request fails — preserving the original fail-fast intent.
    """
    try:
        print("Getting GWR service details: ", trainUid, ", ", date)
        url = OB_TRN_BASE_URL + f"{date.isoformat()}/byTrainUid/{trainUid}"
        print(url)
        res = requests.get(url, headers=HEADERS)
        if res.status_code == 200:
            json_res = res.json()
            if json_res:
                svc_detail = {
                    'stops': json_res['stops'],
                    'vstp': json_res.get('vstp', False)
                }
                organised = organise_svc(svc_detail)
                print("Service Details Found")
                return organised
        # Non-200 status or empty body: treat as not found.
        print("Service Not Found")
        raise SystemExit(1)
    except SystemExit:
        raise
    except Exception as e:
        print(e)
        # BUG FIX: `sys.exit()` raised NameError (no `import sys` in this
        # module); raise SystemExit directly instead.
        raise SystemExit(1)
|
||||||
|
|
||||||
|
def organise_svc(input):
    """Condense a service detail dict to its public stops and VSTP flag.

    Args:
        input: dict with 'stops' (a list of dicts each holding 'isPublic'
            and 'tiploc') and a 'vstp' flag.  (The parameter name shadows
            the builtin ``input``; kept for interface compatibility.)

    Returns:
        {'stops': [tiploc, ...] for public calling points only,
         'vstp': the input's vstp flag}
    """
    # Keep only the tiplocs of public calling points.
    stop_tiplocs = [stop['tiploc'] for stop in input['stops'] if stop['isPublic']]
    # Dead code removed: an 'existingPis' flag was computed from
    # input['pis'] but never returned or used anywhere.
    return {'stops': stop_tiplocs, 'vstp': input['vstp']}
|
||||||
|
|
||||||
|
def convert_tiploc_to_crs(tiploc):
    """Convert a TIPLOC to its lower-case 3-alpha CRS code via OwlBoard.

    Returns 'NO_CRS' when the location has no CRS code or the lookup
    does not succeed.
    """
    # Special case kept from the original: map Reading's tiploc directly.
    if tiploc == 'RDNG4AB':
        return 'rdg'
    res = requests.get(OB_TIP_BASE_URL + tiploc.upper(), headers=HEADERS)
    if res.status_code == 200:
        json_res = res.json()
        if json_res:
            crs = json_res[0]['3ALPHA']
            return crs.lower()
    # BUG FIX: a non-200 response previously fell off the end of the
    # function and returned None, which crashed ",".join(crs) in the
    # formatter.  Always return the sentinel instead.
    return "NO_CRS"
|
70
src/parse_docx.py
Normal file
70
src/parse_docx.py
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
### This uses the 'python-docx-2023' module
|
||||||
|
from docx import Document
|
||||||
|
from datetime import datetime
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
|
||||||
|
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')
|
||||||
|
|
||||||
|
def extract_tables(file_path):
    """Extract PIS/headcode pairs from every table in a DOCX diagram.

    Args:
        file_path: path to the .docx file.  NOTE(review):
            ``file_path.split()[0]`` takes the text before the first
            space and prepends the current year, so filenames are assumed
            to begin with an MMDD token (e.g. '0314 diagram.docx') —
            confirm against real diagram filenames.

    Returns:
        list of dicts with keys 'job_head', 'headcode', 'pis',
        'source_file' and 'date' (a datetime).
    """
    document = Document(file_path)
    print(f"Reading {len(document.tables)} tables from {file_path}")

    pis_info = []

    for table in document.tables:
        data = []
        # Row 0 supplies the column keys; every later row becomes a dict
        # keyed by those headings.
        for i, row in enumerate(table.rows):
            text = (cell.text for cell in row.cells)
            if i == 0:
                keys = tuple(text)
                continue
            row_data = dict(zip(keys, text))
            data.append(row_data)
        pis_and_headcode = match_pis_and_headcode(data)
        if pis_and_headcode:
            pis_and_headcode['source_file'] = file_path
            current_year = datetime.now().year
            date_string_with_year = f"{current_year}{file_path.split()[0]}"
            pis_and_headcode['date'] = datetime.strptime(date_string_with_year, "%Y%m%d")
            pis_info.append(pis_and_headcode)

    return(pis_info)
|
||||||
|
|
||||||
|
|
||||||
|
def match_pis_and_headcode(table_data):
    """Find the first PIS code and the first headcode in parsed table
    rows and pair them up.

    A PIS code is searched for in cell *values* (its column heading is
    kept as 'job_head'); a headcode is searched for in column *keys*.

    Returns:
        {'job_head': ..., 'headcode': ..., 'pis': ...} or None when
        either value is absent from the table.
    """
    # Pass 1: first PIS code anywhere in the cell values, with the
    # column heading it appeared under.
    pis_hit = next(
        ((column_key.strip(), found.group(1))
         for row in table_data
         for column_key, cell_value in row.items()
         if (found := PIS_PATTERN.search(cell_value))),
        None,
    )
    if pis_hit is None:
        return None
    job_head, pis_code = pis_hit

    # Pass 2: first headcode appearing in any column key.
    headcode = next(
        (found.group()
         for row in table_data
         for column_key in row
         if (found := HEADCODE_PATTERN.search(column_key))),
        None,
    )
    if headcode is None:
        return None

    return {'job_head': job_head, 'headcode': headcode, 'pis': pis_code}
|
||||||
|
|
||||||
|
def solo_run():
    # Ad-hoc manual test: parse a fixed local file and print the result.
    print(extract_tables("./file.docx"))

if __name__ == "__main__":
    solo_run()
|
58
src/pis_fetch.py
Normal file
58
src/pis_fetch.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
## This module downloads and compiles a list of all PIS codes across all branches of the OwlBoard/Data repo.
|
||||||
|
## The function load_existing_pis() is expected to be used outside of the module.
|
||||||
|
|
||||||
|
import os, requests, yaml
|
||||||
|
|
||||||
|
## TESTING
|
||||||
|
GIT_URL = 'https://git.fjla.uk'
|
||||||
|
|
||||||
|
GIT_API = GIT_URL + '/api/v1'
|
||||||
|
|
||||||
|
def load_existing_pis():
    """Download PIS data from every branch of OwlBoard/data and merge it.

    Returns:
        list of {'code': str, 'stops': ...} dicts, deduplicated by code
        (the first branch to define a code wins).
    """
    all_pis_data = []
    branches = get_branch_list()
    for branch in branches:
        branch_pis_data = get_branch_pis(branch)
        if branch_pis_data is not None:
            all_pis_data.append(branch_pis_data)
            print(f"Branch: {branch}, PIS Codes: {len(branch_pis_data['pis'])}")

    # Merging data and removing duplicates based on 'code' key.
    # BUG FIX: YAML may parse a code as int on one branch and str on
    # another, so the old dict keyed on the raw value let duplicates
    # through — normalise to str before comparing.
    merged_pis_data = {}
    for branch_data in all_pis_data:
        for item in branch_data['pis']:
            code_key = str(item['code'])
            # Only keep the first occurrence of each 'code'
            if code_key not in merged_pis_data:
                merged_pis_data[code_key] = item

    # Convert the dictionary back to a list of dictionaries
    merged_pis_list = [{'code': code, 'stops': value['stops']} for code, value in merged_pis_data.items()]

    print(f"Total unique codes: {len(merged_pis_list)}")
    return merged_pis_list
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_branch_list():
    """Return the name of every branch of the owlboard/data repository."""
    endpoint = GIT_API + '/repos/owlboard/data/branches'
    response = requests.get(endpoint)
    # One name per branch object returned by the Gitea API.
    branch_names = [entry['name'] for entry in response.json()]
    print(branch_names)
    return branch_names
|
||||||
|
|
||||||
|
def get_branch_pis(branch_name):
    """Fetch pis/gw.yaml from one branch and return the parsed YAML
    (None/whatever safe_load yields when the file is absent)."""
    raw_url = GIT_API + f'/repos/owlboard/data/raw/%2Fpis%2Fgw.yaml?ref={branch_name}'
    response = requests.get(raw_url)
    print(response.status_code)
    return yaml.safe_load(response.text)
|
||||||
|
|
||||||
|
# Manual smoke test: fetch and print the merged PIS list.
if __name__ == "__main__":
    print(load_existing_pis())
|
35
src/pis_find.py
Normal file
35
src/pis_find.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
## This module compares discovered PIS codes with existing PIS codes obtained by calling pis_fetch
|
||||||
|
|
||||||
|
import pis_fetch
|
||||||
|
import sys
|
||||||
|
|
||||||
|
def run(data_list):
    """Deduplicate discovered PIS entries and return the ones whose code
    is missing from the OwlBoard data set."""
    deduplicated_data = dedup(data_list)
    removed_count = len(data_list) - len(deduplicated_data)
    print(f"Removed {removed_count} duplicate codes")
    print(f"Searching for {len(deduplicated_data)} PIS codes")
    missing_data = find_missing(deduplicated_data)
    print(f"{len(missing_data)} missing PIS codes in OwlBoard data")
    return missing_data
|
||||||
|
|
||||||
|
def dedup(data_list):
    """Drop duplicate entries sharing a 'pis' value.

    The last entry seen for each code wins, and first-seen order of the
    codes is preserved (dicts keep insertion order).
    """
    entries_by_code = {}
    for entry in data_list:
        entries_by_code[entry['pis']] = entry
    return list(entries_by_code.values())
|
||||||
|
|
||||||
|
|
||||||
|
def find_missing(data_list):
    """Return entries from *data_list* whose 'pis' code is absent from
    the existing OwlBoard PIS data.

    Entries without a truthy 'pis' key are ignored, matching the
    previous behaviour.
    """
    existing_pis_list = pis_fetch.load_existing_pis()
    # Build the lookup set once: O(n + m) instead of the previous
    # O(n * m) scan of the existing list for every discovered code.
    # Codes are compared as strings, as before (str(existing['code'])).
    existing_codes = {str(existing['code']) for existing in existing_pis_list}

    missing_data = []
    for item in data_list:
        pis_code = item.get('pis')
        if pis_code and pis_code not in existing_codes:
            missing_data.append(item)

    return missing_data
|
Reference in New Issue
Block a user