Implement shared logic between CORPUS and CIF packages.
Begin implementation of shared logic between CIF and VSTP packages.
This commit is contained in:
parent
8c231fe4af
commit
2f5868e743
@ -1,63 +1,53 @@
|
||||
package cif
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Break this down in to smaller, simpler functions
|
||||
// Loads CifMetadata and passes it to parseMetadata, this function is what you should call to initiate the CifUpdate process.
|
||||
func CifCheck(cfg *helpers.Configuration) error {
|
||||
log.Msg.Debug("Checking age of CIF Data")
|
||||
|
||||
metadata, err := dbAccess.GetCifMetadata()
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to fetch CifMetadata", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = parseMetadata(metadata, cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error updating CIF Data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Requests a full update if no metadata exists, or a daily update if metadata does exist.
|
||||
// The daily update function does further metadata parsing to determine what exactly needs downloading.
|
||||
func parseMetadata(metadata *dbAccess.CifMetadata, cfg *helpers.Configuration) error {
|
||||
if metadata == nil {
|
||||
log.Msg.Info("No metadata found for last CIF Update, recreating timetable")
|
||||
newMeta, err := runUpdate("full", nil)
|
||||
log.Msg.Info("No metadata, creating Timetable data")
|
||||
newMeta, err := runFullUpdate(cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("CIF Update failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
ok := dbAccess.PutCifMetadata(*newMeta)
|
||||
if !ok {
|
||||
log.Msg.Warn("CIF Update Successful but metadata update failed")
|
||||
return nil
|
||||
log.Msg.Error("CIF Data updated but Metadata Update failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
london, _ := time.LoadLocation("Europe/London")
|
||||
londonTimeNow := time.Now().In(london)
|
||||
day := 12 * time.Hour
|
||||
updateThreshold := londonTimeNow.Add(-day)
|
||||
availableHour := 6
|
||||
|
||||
if londonTimeNow.Hour() >= availableHour {
|
||||
if metadata.LastUpdate.Before(updateThreshold) || metadata.LastUpdate.Equal(updateThreshold) {
|
||||
newMeta, err := runUpdate("full", metadata)
|
||||
if err != nil {
|
||||
log.Msg.Error("CIF Update failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if newMeta == nil {
|
||||
log.Msg.Info("CIF Update requirements not met, will retry")
|
||||
return nil
|
||||
}
|
||||
ok := dbAccess.PutCifMetadata(*newMeta)
|
||||
if !ok {
|
||||
log.Msg.Warn("CIF Update Successful but metadata update failed")
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
log.Msg.Debug("Requesting CIF Data Update")
|
||||
newMeta, err := runUpdate("daily", metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ok := dbAccess.PutCifMetadata(*newMeta)
|
||||
if !ok {
|
||||
log.Msg.Error("CIF Data updated but Metadata Update failed")
|
||||
}
|
||||
log.Msg.Info("CIF Data does not require updating at this time", zap.Time("Last Update", metadata.LastUpdate))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1 +1,6 @@
|
||||
package cif
|
||||
|
||||
// Handles documents from CIF and VSTP Feeds.
|
||||
// Takes in individual documents, and returns them in the correct format for the Database
|
||||
// Uses types declared in owlboard/go-types/db
|
||||
func DocumentHandler() {}
|
||||
|
@ -4,10 +4,37 @@ import (
|
||||
"errors"
|
||||
|
||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/nrod"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Runs a full update of the CIF Data, discarding any existing data and returns a new metadata struct
|
||||
func runFullUpdate(cfg *helpers.Configuration) (*dbAccess.CifMetadata, error) {
|
||||
log.Msg.Warn("All existing timetable data will be deleted")
|
||||
url, err := getUpdateUrl("full")
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to get update URL", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fullCifData, err := nrod.NrodDownload(url, cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to get CIF Data", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Msg.Debug("CIF Data Downloaded", zap.ByteString("CIF Data", fullCifData))
|
||||
|
||||
// I now need to define a processing function and ensure a valid type exists, then I can pass that type to a CIF Put Full function
|
||||
// which will handle placing the data into the database
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Run the specified update type. Update type must be one of 'daily' or 'full'
|
||||
// In the case of daily update, things get complicated as it needs to handle cases where up to five days have been missed.
|
||||
func runUpdate(updateType string, metadata *dbAccess.CifMetadata) (*dbAccess.CifMetadata, error) {
|
||||
url, err := getUpdateUrl(updateType)
|
||||
if err != nil {
|
||||
@ -17,9 +44,11 @@ func runUpdate(updateType string, metadata *dbAccess.CifMetadata) (*dbAccess.Cif
|
||||
log.Msg.Debug("", zap.String("URL", url))
|
||||
return nil, errors.New("function is not yet defined")
|
||||
|
||||
// Fetch Data
|
||||
// Use the values in metadata to determine which day to attempt to update.
|
||||
// Before running any actions on the data, check the sequence number and timestamp againse previous updates
|
||||
// First check if the last update was today, if so, I can return nil, nil - No update required
|
||||
////// If the update was on the previous day, download todays data and check the sequence number and timestamp indicate that todays data is the next file that I need.
|
||||
////// If the sequence number and timestamp indicate I have missed a day, download that days data first, then todays.
|
||||
|
||||
// Write a parsing function that can handle VSTP as well as SCHEDULE data
|
||||
// Handle database management
|
||||
}
|
||||
|
@ -1,76 +1,33 @@
|
||||
package corpus
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/nrod"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const url string = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"
|
||||
|
||||
func fetchCorpus(cfg *helpers.Configuration) (*http.Response, error) {
|
||||
func fetchCorpus(cfg *helpers.Configuration) (*[]byte, error) {
|
||||
log.Msg.Info("Fetching CORPUS Data")
|
||||
client := http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
data, err := nrod.NrodDownload(url, cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Failed to create CORPUS Request", zap.Error(err))
|
||||
log.Msg.Error("Corpus update failed")
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass))
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error requesting Corpus", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
log.Msg.Error("Unexpected status code", zap.Int("status_code", resp.StatusCode))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
func extractCorpusResponse(resp *http.Response) (string, error) {
|
||||
log.Msg.Info("Decompressing CORPUS Data")
|
||||
gzReader, err := gzip.NewReader(resp.Body)
|
||||
if err != nil {
|
||||
log.Msg.Error("Corpus response is not gzipped")
|
||||
return "", errors.New("response not gzipped")
|
||||
}
|
||||
|
||||
defer gzReader.Close()
|
||||
|
||||
log.Msg.Info("Reading CORPUS Data")
|
||||
decompressedData, err := io.ReadAll(gzReader)
|
||||
if err != nil {
|
||||
log.Msg.Error("Failed to read decompressed data", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
|
||||
responseBody := string(decompressedData)
|
||||
return responseBody, nil
|
||||
}
|
||||
|
||||
func parseCorpusData(jsonData string) ([]database.CorpusEntry, error) {
|
||||
func parseCorpusData(jsonData *[]byte) ([]database.CorpusEntry, error) {
|
||||
log.Msg.Info("Unmarshalling CORPUS Data")
|
||||
|
||||
var dataMap map[string]interface{}
|
||||
err := json.Unmarshal([]byte(jsonData), &dataMap)
|
||||
err := json.Unmarshal(*jsonData, &dataMap)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to unmarshal CORPUS data", zap.Error(err))
|
||||
}
|
||||
|
@ -14,13 +14,7 @@ func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
||||
return err
|
||||
}
|
||||
|
||||
datastring, err := extractCorpusResponse(resp)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error extracting Corpus data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
unsortedCorpusData, err := parseCorpusData(datastring)
|
||||
unsortedCorpusData, err := parseCorpusData(resp)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error parsing Corpus data", zap.Error(err))
|
||||
return err
|
||||
|
73
src/nrod/download.go
Normal file
73
src/nrod/download.go
Normal file
@ -0,0 +1,73 @@
|
||||
package nrod
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Downloads NROD Data over HTTP from the given URL, extracted data is returned
|
||||
func NrodDownload(url string, cfg *helpers.Configuration) ([]byte, error) {
|
||||
log.Msg.Debug("Fetching NROD data", zap.String("Request URL", url))
|
||||
client := http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error creating HTTP Request", zap.String("Request URL", url), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass))
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error carrying out HTTP Request", zap.String("Request URL", url), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
log.Msg.Error("Non-Successful status code from http response", zap.String("Request URL", url), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
readedData, err := nrodExtract(*resp)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to read response data")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return readedData, nil
|
||||
}
|
||||
|
||||
// Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array
|
||||
func nrodExtract(resp http.Response) ([]byte, error) {
|
||||
log.Msg.Debug("Extracting HTTP Response Data")
|
||||
gzReader, err := gzip.NewReader(resp.Body)
|
||||
if err != nil {
|
||||
log.Msg.Warn("Unable to create GZIP Reader, data probably not GZIPPED")
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to read response body")
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
defer gzReader.Close()
|
||||
|
||||
log.Msg.Debug("GZIP Reader Opened")
|
||||
extractedData, err := io.ReadAll(gzReader)
|
||||
if err != nil {
|
||||
log.Msg.Error("Failed to read GZIPped data", zap.Error(err))
|
||||
}
|
||||
|
||||
return extractedData, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user