### API REQUESTS HERE ### AUTHENTICATION MUST BE COMPLETED, REGISTERING FOR THE API IF NECCESSARY ### THIS NEGATES THE ABILITY TO USE LOCAL MODE - MAILBOX MODE ONLY AS ### MAILBOX ACCESS IS NEEDED FOR REGISTRATION... DON'T BE A FUCKING IDIOT... I CAN JUST PASS ### A UUID IN TO THE PROGRAM!! TWAT. import requests, os OB_PIS_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/" OB_TRN_BASE_URL = "https://owlboard.info/api/v2/timetable/train/" OB_TIP_BASE_URL = "https://owlboard.info/api/v2/ref/locationCode/tiploc/" #OB_PIS_BASE_URL = "http://localhost:8460/api/v2/pis/byCode/" #OB_TRN_BASE_URL = "http://localhost:8460/api/v2/timetable/train/" #OB_TIP_BASE_URL = "http://localhost:8460/api/v2/ref/locationCode/tiploc/" OB_TEST_URL = OB_PIS_BASE_URL + "5001" UUID = os.environ.get('DGP_OB_UUID') HEADERS = { 'user-agent': 'owlboard-diagram-parser', 'uuid': UUID } def check_connection(): if not UUID: print("'DGP_OB_UUID' must be set in the environment") return False res = requests.get(OB_TEST_URL, headers=HEADERS) if res.status_code == 401: print("Error - Unauthorised. The UUID is not valid. STATUS: ", res.status_code, "UUID: ", UUID) return False elif res.status_code != 200: print("Error - Unable to reach OwlBoard. STATUS: ", res.status_code) return False return True def get_services(headcode, date): print("Finding GWR service: ", headcode, ", ", date) results = [] url = OB_TRN_BASE_URL + f"{date.strftime('%Y-%m-%d')}/headcode/{headcode.lower()}" print(url) res = requests.get(url, headers=HEADERS) if res.status_code == 200: json_res = res.json() for item in json_res: if item['operator'] == 'GW': results.append(item) print(f"Found {len(results)} valid GWR Service") return results def get_service_detail(trainUid, date): try: print("Getting GWR service details: ", trainUid, ", ", date) url = OB_TRN_BASE_URL + f"{date.isoformat()}/byTrainUid/{trainUid}" print(url) res = requests.get(url, headers=HEADERS) if res.status_code == 200: json_res = res.json() if json_res: svc_detail = { 'stops': json_res['stops'], 'vstp': json_res.get('vstp', False) } organised = organise_svc(svc_detail) #print(res.text) #print(organised) print("Service Details Found") return organised else: print("Service Not Found") sys.exit() except Exception as e: print(e) sys.exit() def organise_svc(input): stop_tiplocs = [] vstp = input['vstp'] for stop in input['stops']: if stop['isPublic']: stop_tiplocs.append(stop['tiploc']) existingPis = False if 'pis' in input and input['pis'].get('skipCount', 0) == 0: existingPis = True return {'stops': stop_tiplocs, 'vstp': vstp} def convert_tiploc_to_crs(tiploc): if tiploc == 'RDNG4AB': return 'rdg' res = requests.get(OB_TIP_BASE_URL + tiploc.upper(), headers=HEADERS) if res.status_code == 200: json_res = res.json() if json_res: crs = json_res[0]['3ALPHA'] return crs.lower() else: return "NO_CRS"