Further work on CIF handling

This commit is contained in:
Fred Boniface 2024-04-03 22:25:27 +01:00
parent 2374a3ca37
commit 3ebe7bfe11
15 changed files with 160 additions and 53 deletions

View File

@ -9,7 +9,7 @@ import (
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
) )
const frequency = 3 * time.Second // Figure out a sensible frequency! const frequency = 600 * time.Hour // Figure out a sensible frequency!
// Starts a background ticker to run background tasks. Uses the frequency configured in the background/ticker.go file // Starts a background ticker to run background tasks. Uses the frequency configured in the background/ticker.go file
func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) { func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
@ -27,7 +27,7 @@ func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
case <-stop: case <-stop:
return return
case <-ticker.C: case <-ticker.C:
go cif.CifCheck(cfg) go cif.CheckCif(cfg)
go corpus.CheckCorpus(cfg) go corpus.CheckCorpus(cfg)
} }
} }

View File

@ -41,13 +41,45 @@ func parseMetadata(metadata *dbAccess.CifMetadata, cfg *helpers.Configuration) e
} }
log.Msg.Debug("Requesting CIF Data Update") log.Msg.Debug("Requesting CIF Data Update")
newMeta, err := runUpdate(metadata, cfg) // When testing done, this function returns newMetadata which needs putting into DB
_, err := runUpdate(metadata, cfg)
if err != nil { if err != nil {
return err return err
} }
ok := dbAccess.PutCifMetadata(*newMeta) //ok := dbAccess.PutCifMetadata(*newMeta)
if !ok { //if !ok {
log.Msg.Error("CIF Data updated but Metadata Update failed") // log.Msg.Error("CIF Data updated but Metadata Update failed")
} //}
return nil return nil
} }
// CheckCif reads the stored CIF metadata and decides what kind of update is
// needed. When the metadata cannot be read the check is abandoned; when
// GetCifMetadata returns nil metadata a full CIF download is run. The partial
// (per-day) update path has not been written yet.
func CheckCif(cfg *helpers.Configuration) {
	// TODO: skip the update when the current time is before 0600 (not written yet).

	log.Msg.Info("Checking age of CIF Data")

	// Fetch the metadata describing the last CIF update from the database.
	lastUpdate, readErr := dbAccess.GetCifMetadata()
	switch {
	case readErr != nil:
		log.Msg.Error("Unable to read last update time")
		return
	case lastUpdate == nil:
		// Nil metadata is handled by running a full download.
		log.Msg.Info("Full CIF download required")
		if dlErr := runCifFullDownload(cfg); dlErr != nil {
			log.Msg.Error("Unable to run full CIF Update")
		}
		return
	}

	// TODO: check whether the last update was before today, work out which
	// days need CIF data, then hand them to an appropriate function, e.g.:
	// newMetadata, err := runCifUpdate(days []time.Time, cfg)
}

64
src/cif/parse.go Normal file
View File

@ -0,0 +1,64 @@
package cif
import (
"bytes"
"encoding/json"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// parseCifData decodes newline-delimited JSON CIF data into a parsedData value.
//
// Every non-blank line is decoded as a JSON object whose keys name the record
// type it carries: "JsonTimetableV1" (stored as the header),
// "JsonAssociationV1" (appended to assoc) or "JsonScheduleV1" (appended to
// sched). Any other key is ignored. Lines and records that fail to decode are
// logged and skipped rather than aborting the parse, so the returned error is
// currently always nil.
func parseCifData(data []byte) (*parsedData, error) {
	// Start with empty, non-nil slices for the repeated record types.
	result := &parsedData{
		assoc: []upstreamApi.JsonAssociationV1{},
		sched: []upstreamApi.JsonScheduleV1{},
	}

	newline := []byte("\n")
	remaining := data
	for more := true; more; {
		// Peel one line off the front of the remaining input.
		var record []byte
		record, remaining, more = bytes.Cut(remaining, newline)

		// Blank lines hold no record; skip them so they are not logged as errors.
		if len(bytes.TrimSpace(record)) == 0 {
			continue
		}

		// Decode only the outer object here; each value is decoded by type below.
		var fields map[string]json.RawMessage
		if err := json.Unmarshal(record, &fields); err != nil {
			log.Msg.Error("Error decoding line", zap.String("line", string(record)), zap.Error(err))
			continue
		}

		for name, raw := range fields {
			switch name {
			case "JsonTimetableV1":
				var header upstreamApi.JsonTimetableV1
				if err := json.Unmarshal(raw, &header); err == nil {
					result.header = header
				} else {
					log.Msg.Error("Unable to parse JSON Timetable", zap.Error(err), zap.String("line", string(raw)))
				}
			case "JsonAssociationV1":
				var assoc upstreamApi.JsonAssociationV1
				if err := json.Unmarshal(raw, &assoc); err == nil {
					result.assoc = append(result.assoc, assoc)
				} else {
					log.Msg.Error("Error decoding JSON Association", zap.Error(err))
				}
			case "JsonScheduleV1":
				var sched upstreamApi.JsonScheduleV1
				if err := json.Unmarshal(raw, &sched); err == nil {
					result.sched = append(result.sched, sched)
				} else {
					log.Msg.Error("Error decoding JSON Schedule", zap.Error(err))
				}
			}
		}
	}

	return result, nil
}

View File

@ -1,17 +1,21 @@
package cif package cif
import ( import (
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi" "errors"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess" "git.fjla.uk/owlboard/timetable-mgr/dbAccess"
) )
// Probably looking to return some kind of array, containing the database actions. Associations aren't needed at the moment. // Processes all data in the schedule segment of 'data', interacts with the database and returns new metadata
// however, maybe I need to process them too. I need to plan how to return the data properly. func processCifData(metadata *dbAccess.CifMetadata, data *parsedData) (*dbAccess.CifMetadata, error) {
func parseCifData(data []byte, metadata *dbAccess.CifMetadata) (*[]upstreamApi.CifScheduleSegment, error) { if data == nil {
err := errors.New("data is not defined")
return nil, err
}
if metadata == nil {
err := errors.New("metadata is not defined")
return nil, err
}
return nil, nil return nil, nil
} }
// Handles documents from CIF and VSTP Feeds.
// Takes in individual documents, and returns them in the correct format for the Database
// Uses types declared in owlboard/go-types/db
func DocumentHandler() {}

View File

@ -1,7 +1,10 @@
package cif package cif
// I believe that this is not used, instead I opted for the CifMetadata type. import "git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
type CIFUpdate struct {
Timestamp int64 // Holds parsed data for processing
Sequence int64 type parsedData struct {
header upstreamApi.JsonTimetableV1
assoc []upstreamApi.JsonAssociationV1
sched []upstreamApi.JsonScheduleV1
} }

View File

@ -82,21 +82,25 @@ func runUpdate(metadata *dbAccess.CifMetadata, cfg *helpers.Configuration) (*dbA
log.Msg.Error("Error fetching data", zap.Time("date", date)) log.Msg.Error("Error fetching data", zap.Time("date", date))
continue continue
} // parseCifData function needs writing } // parseCifData function needs writing
parsedData, err := parseCifData(data, metadata) parsedData, err := parseCifData(data)
if err != nil { if err != nil {
log.Msg.Error("Error parsing data", zap.Time("date", date)) log.Msg.Error("Error parsing data", zap.Time("date", date))
} }
// Apply data to Database
log.Msg.Info("CIF Data updated", zap.Time("date", date)) if parsedData == nil { // TEMPORARY DEVELOPMENT CHECK
log.Msg.Error("No data parsed")
return nil, nil
} }
// Use the values in metadata to determine which day to attempt to update. // Process data (manages the database interactions and >>returns the new metadata)
// First check if the last update was today, if so, I can return nil, nil - No update required _, err = processCifData(metadata, parsedData)
////// If the update was on the previous day, download todays data and check the sequence number and timestamp indicate that todays data is the next file that I need. if err != nil {
////// If the sequence number and timestamp indicate I have missed a day, download that days data first, then todays. log.Msg.Error("Error processing CIF Data", zap.Error(err))
}
// Write a parsing function that can handle VSTP as well as SCHEDULE data log.Msg.Info("CIF Data updated", zap.Time("date", date))
// Handle database management }
return nil, nil //TEMPORARY
} }
// Fetches CIF Updates for a given day // Fetches CIF Updates for a given day

View File

@ -9,7 +9,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// Checks if the CORPUS Data needs updating, and carrys out the process if needed // Checks if the CORPUS Data needs updating, and calls an updater function if needed.
func CheckCorpus(cfg *helpers.Configuration) { func CheckCorpus(cfg *helpers.Configuration) {
log.Msg.Debug("Checking age of CORPUS Data") log.Msg.Debug("Checking age of CORPUS Data")
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection) utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
@ -29,9 +29,11 @@ func CheckCorpus(cfg *helpers.Configuration) {
err := RunCorpusUpdate(cfg) err := RunCorpusUpdate(cfg)
if err != nil { if err != nil {
log.Msg.Warn("CORPUS Update did not run") log.Msg.Warn("CORPUS Update did not run")
} else {
log.Msg.Info("CORPUS data has been updated")
} }
} else { } else {
log.Msg.Info("CORPUS Data is less than two weeks old") log.Msg.Info("CORPUS Data is less than two weeks old, not updating")
} }
} }

4
src/corpus/constants.go Normal file
View File

@ -0,0 +1,4 @@
package corpus
// url is the Network Rail public data feeds endpoint from which the CORPUS
// data file is downloaded.
const url string = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"

View File

@ -5,25 +5,10 @@ import (
"errors" "errors"
"git.fjla.uk/owlboard/go-types/pkg/database" "git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/nrod"
"go.uber.org/zap" "go.uber.org/zap"
) )
const url string = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"
// Fetches CORPUS Data using the nrod.NrodDownload() function and returns the byte array
func fetchCorpus(cfg *helpers.Configuration) (*[]byte, error) {
log.Msg.Info("Fetching CORPUS Data")
data, err := nrod.NrodDownload(url, cfg)
if err != nil {
log.Msg.Error("Corpus update failed")
return nil, err
}
return &data, nil
}
// Accepts CORPUS data as a byte array and formats it ready for database insertion // Accepts CORPUS data as a byte array and formats it ready for database insertion
func parseCorpusData(jsonData *[]byte) ([]database.CorpusEntry, error) { func parseCorpusData(jsonData *[]byte) ([]database.CorpusEntry, error) {
log.Msg.Info("Unmarshalling CORPUS Data") log.Msg.Info("Unmarshalling CORPUS Data")

View File

@ -4,18 +4,19 @@ import (
"git.fjla.uk/owlboard/timetable-mgr/dbAccess" "git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers" "git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/nrod"
"go.uber.org/zap" "go.uber.org/zap"
) )
// Runs all stages of the CORPUS Update process // Runs all stages of the CORPUS Update process
func RunCorpusUpdate(cfg *helpers.Configuration) error { func RunCorpusUpdate(cfg *helpers.Configuration) error {
resp, err := fetchCorpus(cfg) resp, err := nrod.NrodDownload(url, cfg)
if err != nil { if err != nil {
log.Msg.Error("Failed to update Corpus data", zap.Error(err)) log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
return err return err
} }
unsortedCorpusData, err := parseCorpusData(resp) unsortedCorpusData, err := parseCorpusData(&resp)
if err != nil { if err != nil {
log.Msg.Error("Error parsing Corpus data", zap.Error(err)) log.Msg.Error("Error parsing Corpus data", zap.Error(err))
return err return err
@ -27,19 +28,23 @@ func RunCorpusUpdate(cfg *helpers.Configuration) error {
if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil { if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil {
log.Msg.Warn("CORPUS data may be incomplete") log.Msg.Warn("CORPUS data may be incomplete")
log.Msg.Error("Error dropping CORPUS Data", zap.Error(err)) log.Msg.Error("Error dropping CORPUS Data", zap.Error(err))
return err
} }
if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil { if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil {
log.Msg.Warn("Stations data may be incomplete") log.Msg.Warn("Stations data may be incomplete")
log.Msg.Error("Error dropping stations Data", zap.Error(err)) log.Msg.Error("Error dropping stations Data", zap.Error(err))
return err
} }
if err := dbAccess.PutManyCorpus(corpusData); err != nil { if err := dbAccess.PutManyCorpus(corpusData); err != nil {
log.Msg.Warn("CORPUS data may be incomplete") log.Msg.Warn("CORPUS data may be incomplete")
log.Msg.Error("Error inserting CORPUS Data", zap.Error(err)) log.Msg.Error("Error inserting CORPUS Data", zap.Error(err))
return err
} }
if err := dbAccess.PutManyStations(stationData); err != nil { if err := dbAccess.PutManyStations(stationData); err != nil {
log.Msg.Warn("Stations data may be incomplete") log.Msg.Warn("Stations data may be incomplete")
log.Msg.Error("Error inserting stations data", zap.Error(err)) log.Msg.Error("Error inserting stations data", zap.Error(err))
return err
} }
return nil return nil

View File

@ -16,6 +16,7 @@ const timetableCollection string = "timetable"
// CAUTION: Drops the collection named in collectionName // CAUTION: Drops the collection named in collectionName
func DropCollection(collectionName string) error { func DropCollection(collectionName string) error {
log.Msg.Info("Dropping collection", zap.String("Collection Name", collectionName))
database := MongoClient.Database(databaseName) database := MongoClient.Database(databaseName)
collection := database.Collection(collectionName) collection := database.Collection(collectionName)

View File

@ -3,7 +3,7 @@ module git.fjla.uk/owlboard/timetable-mgr
go 1.21 go 1.21
require ( require (
git.fjla.uk/owlboard/go-types v0.0.0-20240331204922-8f8899eb6072 git.fjla.uk/owlboard/go-types v0.0.0-20240403200521-41796e25b6c3
github.com/go-stomp/stomp/v3 v3.0.5 github.com/go-stomp/stomp/v3 v3.0.5
go.mongodb.org/mongo-driver v1.12.0 go.mongodb.org/mongo-driver v1.12.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0

View File

@ -2,6 +2,8 @@ git.fjla.uk/owlboard/go-types v0.0.0-20240326154559-f85646ac1a58 h1:8r1oGpD1yG4J
git.fjla.uk/owlboard/go-types v0.0.0-20240326154559-f85646ac1a58/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= git.fjla.uk/owlboard/go-types v0.0.0-20240326154559-f85646ac1a58/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
git.fjla.uk/owlboard/go-types v0.0.0-20240331204922-8f8899eb6072 h1:QjaTVm4bpnXZPrYmJUQhBaaWFVHwpDzKb78wX0xcZu0= git.fjla.uk/owlboard/go-types v0.0.0-20240331204922-8f8899eb6072 h1:QjaTVm4bpnXZPrYmJUQhBaaWFVHwpDzKb78wX0xcZu0=
git.fjla.uk/owlboard/go-types v0.0.0-20240331204922-8f8899eb6072/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= git.fjla.uk/owlboard/go-types v0.0.0-20240331204922-8f8899eb6072/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
git.fjla.uk/owlboard/go-types v0.0.0-20240403200521-41796e25b6c3 h1:veGmL8GeWsgGCeTgPPSDw5tbUr1pUz8F6DBgFMf6IGc=
git.fjla.uk/owlboard/go-types v0.0.0-20240403200521-41796e25b6c3/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -9,6 +9,7 @@ import (
_ "time/tzdata" _ "time/tzdata"
"git.fjla.uk/owlboard/timetable-mgr/background" "git.fjla.uk/owlboard/timetable-mgr/background"
"git.fjla.uk/owlboard/timetable-mgr/corpus"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess" "git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers" "git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
@ -39,8 +40,8 @@ func main() {
// Start CIF Task Ticker // Start CIF Task Ticker
background.InitTicker(cfg, stop) background.InitTicker(cfg, stop)
// Test Corpus Fetching // Test CORPUS Fetching
//corpus.RunCorpusUpdate(cfg) go corpus.CheckCorpus(cfg)
if cfg.VstpOn { if cfg.VstpOn {
messaging.StompInit(cfg) messaging.StompInit(cfg)

Binary file not shown.