Lots more work on CIF processing
This commit is contained in:
parent
2fdb840644
commit
adf745aa61
@ -9,54 +9,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// REWRITING CifCheck into CheckCif (at bottom of file), CifCheck and parseMetadata become redundant
|
||||
|
||||
// Loads CifMetadata and passes it to parseMetadata, this function is what you should call to initiate the CifUpdate process.
|
||||
func CifCheck(cfg *helpers.Configuration) error {
|
||||
log.Msg.Debug("Checking age of CIF Data")
|
||||
metadata, err := dbAccess.GetCifMetadata()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = parseMetadata(metadata, cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error updating CIF Data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Requests a full update if no metadata exists, or a daily update if metadata does exist.
|
||||
// The daily update function does further metadata parsing to determine what exactly needs downloading.
|
||||
func parseMetadata(metadata *dbAccess.CifMetadata, cfg *helpers.Configuration) error {
|
||||
if metadata == nil {
|
||||
log.Msg.Info("No metadata, creating Timetable data")
|
||||
newMeta, err := runFullUpdate(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ok := dbAccess.PutCifMetadata(*newMeta)
|
||||
if !ok {
|
||||
log.Msg.Error("CIF Data updated but Metadata Update failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Msg.Debug("Requesting CIF Data Update")
|
||||
// When testing done, this function returns newMetadata which needs putting into DB
|
||||
_, err := runUpdate(metadata, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//ok := dbAccess.PutCifMetadata(*newMeta)
|
||||
//if !ok {
|
||||
// log.Msg.Error("CIF Data updated but Metadata Update failed")
|
||||
//}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checks if the CIF Data needs updating, and what type of update is needed (Full/Partial) and if partial
|
||||
// what days data needs updating, then calls an update function to handle the update.
|
||||
func CheckCif(cfg *helpers.Configuration) {
|
||||
@ -75,12 +27,12 @@ func CheckCif(cfg *helpers.Configuration) {
|
||||
return
|
||||
}
|
||||
|
||||
// If no metadata is found in DB, presume no CIF data exists
|
||||
if metadata == nil {
|
||||
log.Msg.Info("Full CIF download required")
|
||||
err := runCifFullDownload(cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to run full CIF Update")
|
||||
return
|
||||
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -91,8 +43,25 @@ func CheckCif(cfg *helpers.Configuration) {
|
||||
return
|
||||
}
|
||||
|
||||
//
|
||||
// Check how many days since last update, if more than 5, run full update, else run update
|
||||
daysSinceLastUpdate := howManyDaysAgo(metadata.LastUpdate)
|
||||
if daysSinceLastUpdate > 5 {
|
||||
log.Msg.Info("Full CIF download required")
|
||||
err := runCifFullDownload(cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Here I need to determine which days I need to update CIF data for, then pass to an appropriate function:
|
||||
// newMetadata, err := runCifUpdate(days []time.Time, cfg)
|
||||
daysToUpdate := generateUpdateDays(daysSinceLastUpdate)
|
||||
|
||||
// Run the update
|
||||
log.Msg.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
|
||||
err = runCifUpdateDownload(cfg, metadata, daysToUpdate)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to run CIF update", zap.Error(err))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -51,3 +51,20 @@ func howManyDaysAgo(t time.Time) int {
|
||||
days := int(diff.Hours() / 24)
|
||||
return days
|
||||
}
|
||||
|
||||
// Generates a slice of time.Time values representing which days files need downloading
|
||||
func generateUpdateDays(days int) []time.Time {
|
||||
var updateDays []time.Time
|
||||
|
||||
for i := 0; i < days; i++ {
|
||||
day := time.Now().Add(-time.Duration(i) * 24 * time.Hour)
|
||||
updateDays = append(updateDays, day)
|
||||
}
|
||||
|
||||
// Reverse slice to ensure chronological order
|
||||
for i, j := 0, len(updateDays)-1; i < j; i, j = i+1, j-1 {
|
||||
updateDays[i], updateDays[j] = updateDays[j], updateDays[i]
|
||||
}
|
||||
|
||||
return updateDays
|
||||
}
|
||||
|
@ -17,3 +17,76 @@ func TestIsSameDay(t *testing.T) {
|
||||
t.Errorf("Error in isSameDay(notToday). Expected false, got true.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHowManyDaysAgo(t *testing.T) {
|
||||
testCases := []struct {
|
||||
input time.Time
|
||||
expected int
|
||||
}{
|
||||
{time.Now(), 0}, // Today
|
||||
{time.Now().Add(-24 * time.Hour), 1}, // Yesterday
|
||||
{time.Now().Add(-48 * time.Hour), 2}, // Ereyesterday
|
||||
{time.Now().Add(24 * time.Hour), -1}, // Tomorrow
|
||||
{time.Now().Add(48 * time.Hour), -2}, // Overmorrow
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
result := howManyDaysAgo(tc.input)
|
||||
if result != tc.expected {
|
||||
t.Errorf("For input %v, expected %d but got %d", tc.input, tc.expected, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetDayString(t *testing.T) {
|
||||
testCases := []struct {
|
||||
input time.Time
|
||||
expected string
|
||||
}{ // Note that the test times are in UTC, but days are checked in Europe/London
|
||||
{time.Date(2024, time.April, 7, 0, 0, 0, 0, time.UTC), "sun"},
|
||||
{time.Date(2024, time.April, 4, 21, 0, 0, 0, time.UTC), "thu"},
|
||||
{time.Date(2001, time.September, 11, 12, 46, 0, 0, time.UTC), "tue"},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
result := getDayString(tc.input)
|
||||
if result != tc.expected {
|
||||
t.Errorf("For input %v, expected %s, but got %s", tc.input, tc.expected, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGenerateUpdateDays(t *testing.T) {
|
||||
testCases := []struct {
|
||||
days int
|
||||
expected []time.Time
|
||||
}{
|
||||
{1, []time.Time{time.Now()}},
|
||||
{2, []time.Time{time.Now().Add(-24 * time.Hour), time.Now()}},
|
||||
{4, []time.Time{time.Now().Add(-72 * time.Hour),
|
||||
time.Now().Add(-48 * time.Hour),
|
||||
time.Now().Add(-24 * time.Hour),
|
||||
time.Now(),
|
||||
}},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
result := generateUpdateDays(tc.days)
|
||||
|
||||
if len(result) != len(tc.expected) {
|
||||
t.Errorf("For %d days, expected %v, but got %v", tc.days, tc.expected, result)
|
||||
continue
|
||||
}
|
||||
|
||||
for i := range result {
|
||||
if !isSameDate(result[i], tc.expected[i]) {
|
||||
t.Errorf("For %d days, expected %v, but got %v", tc.days, tc.expected, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Checks if two time values have the same year, month and day.
|
||||
func isSameDate(t1, t2 time.Time) bool {
|
||||
return t1.Year() == t2.Year() && t1.Month() == t2.Month() && t1.Day() == t2.Day()
|
||||
}
|
||||
|
105
src/cif/parse_test.go
Normal file
105
src/cif/parse_test.go
Normal file
@ -0,0 +1,105 @@
|
||||
package cif
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||
)
|
||||
|
||||
func TestParseCifData(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
input []byte
|
||||
expected *parsedData
|
||||
}
|
||||
}
|
||||
|
||||
func returnParsedData() *parsedData {
|
||||
var parsed = {
|
||||
name: "Valid JSON Data 1",
|
||||
input: []byte(`
|
||||
{"JsonTimetableV1":{"classification":"public","timestamp":1711227636,"owner":"Network Rail","Sender":{"organisation":"Rockshore","application":"NTROD","component":"SCHEDULE"},"Metadata":{"type":"update","sequence":4307}}}
|
||||
{"JsonAssociationV1":{"transaction_type":"Create","main_train_uid":"L65283","assoc_train_uid":"L65245","assoc_start_date":"2023-12-11T00:00:00Z","assoc_end_date":"2024-03-28T00:00:00Z","assoc_days":"1111000","category":" ","date_indicator":" ","location":"RDNGSTN","base_location_suffix":null,"assoc_location_suffix":null,"diagram_type":"T","CIF_stp_indicator":"C"}}
|
||||
{"JsonAssociationV1":{"transaction_type":"Delete","main_train_uid":"L65283","assoc_train_uid":"L65245","assoc_start_date":"2024-03-18T00:00:00Z","location":"RDNGSTN","base_location_suffix":null,"diagram_type":"T","CIF_stp_indicator":"C"}}
|
||||
{"JsonScheduleV1":{"CIF_stp_indicator":"O","CIF_train_uid":"C25513","schedule_start_date":"2024-03-23","transaction_type":"Delete"}}
|
||||
{"JsonScheduleV1":{"CIF_bank_holiday_running":null,"CIF_stp_indicator":"O","CIF_train_uid":"Y04345","applicable_timetable":"Y","atoc_code":"SW","new_schedule_segment":{"traction_class":"","uic_code":""},"schedule_days_runs":"0000001","schedule_end_date":"2024-05-19","schedule_segment":{"signalling_id":"5A20","CIF_train_category":"EE","CIF_headcode":"00","CIF_course_indicator":1,"CIF_train_service_code":"24676004","CIF_business_sector":"??","CIF_power_type":"EMU","CIF_timing_load":null,"CIF_speed":"100","CIF_operating_characteristics":"D","CIF_train_class":null,"CIF_sleepers":null,"CIF_reservations":null,"CIF_connection_indicator":null,"CIF_catering_code":null,"CIF_service_branding":"","schedule_location":[{"location_type":"LO","record_identity":"LO","tiploc_code":"FARNHMD","tiploc_instance":null,"departure":"0721","public_departure":null,"platform":null,"line":null,"engineering_allowance":null,"pathing_allowance":null,"performance_allowance":null},{"location_type":"LT","record_identity":"LT","tiploc_code":"FARNHAM","tiploc_instance":null,"platform":"1","arrival":"0729","public_arrival":null,"path":null}]},"schedule_start_date":"2024-05-19","train_status":"T","transaction_type":"Create"}}
|
||||
{"EOF":true}
|
||||
`),
|
||||
expected: &parsedData{
|
||||
header: upstreamApi.JsonTimetableV1{
|
||||
Classification: "public",
|
||||
Timestamp: 1711227636,
|
||||
Owner: "Network Rail",
|
||||
Sender: upstreamApi.TimetableSender{
|
||||
Organisation: "Rockshore",
|
||||
Application: "NTROD",
|
||||
Component: "SCHEDULE",
|
||||
},
|
||||
Metadata: upstreamApi.TimetableMetadata{
|
||||
Type: "update",
|
||||
Sequence: 4307,
|
||||
},
|
||||
},
|
||||
assoc: []upstreamApi.JsonAssociationV1{
|
||||
TransactionType: "Create",
|
||||
MainTrainUid: "L65283",
|
||||
AssocTrainUid: "L65245",
|
||||
AssocStartDate: time.Date(2023, time.December, 11, 0, 0, 0, 0, time.UTC),
|
||||
AssocEndDate: time.Date(2024, time.March, 28, 0, 0, 0, 0, time.UTC),
|
||||
AssocDays: "1111000",
|
||||
Location: "RDNGSTN",
|
||||
DiagramType: "T",
|
||||
CifStpIndicator: "C",
|
||||
}, {
|
||||
TransactionType: "Delete",
|
||||
MainTrainUid: "L65283",
|
||||
AssocTrainUid: "L65245",
|
||||
AssocStartDate: time.Date(2024, time.March, 18, 0, 0, 0, 0, time.UTC),
|
||||
Location: "RDNGSTN",
|
||||
DiagramType: "T",
|
||||
CifStpIndicator: "C",
|
||||
},
|
||||
sched: []upstreamApi.JsonScheduleV1{
|
||||
CifStpIndicator: "O",
|
||||
CifTrainUid: "C25513",
|
||||
ScheduleStartDate: "2024-03-23",
|
||||
TransactionType: "Delete",
|
||||
}, {
|
||||
CifStpIndicator: "O",
|
||||
CifTrainUid: "Y04345",
|
||||
ApplicableTimetable: "Y",
|
||||
AtocCode: "SW",
|
||||
NewScheduleSegment: upstreamApi.CifNewScheduleSegment{},
|
||||
ScheduleDaysRun: "0000001",
|
||||
ScheduleEndDate: "2024-05-19",
|
||||
ScheduleSegment: upstreamApi.CifScheduleSegment{
|
||||
SignallingId: "5A20",
|
||||
CifTrainCategory: "EE",
|
||||
CifHeadcode: "OO",
|
||||
CifCourseIndicator: 1,
|
||||
CifTrainServiceCode: "24676004",
|
||||
CifBusinessSector: "??",
|
||||
CifPowerType: "EMU",
|
||||
CifSpeed: "100",
|
||||
CifOperatingCharacteristics: "D",
|
||||
ScheduleLocation: []upstreamApi.CifScheduleLocation{
|
||||
LocationType: "LO",
|
||||
RecordIdentity: "LO",
|
||||
TiplocCode: "FARNHMD",
|
||||
Departure: "0721",
|
||||
}, {
|
||||
LocationType: "LT",
|
||||
RecordIdentity: "LT",
|
||||
TiplocCode: "FARNHAM",
|
||||
Platform: "1",
|
||||
Arrival: "0729",
|
||||
},
|
||||
},
|
||||
ScheduleStartDate: "2024-05-19",
|
||||
TrainStatus: "T",
|
||||
TransactionType: "Create",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
@ -1,21 +1,6 @@
|
||||
package cif
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||
)
|
||||
|
||||
// Processes all data in the schedule segment of 'data', interacts with the database and returns new metadata
|
||||
func processCifData(metadata *dbAccess.CifMetadata, data *parsedData) (*dbAccess.CifMetadata, error) {
|
||||
if data == nil {
|
||||
err := errors.New("data is not defined")
|
||||
return nil, err
|
||||
}
|
||||
if metadata == nil {
|
||||
err := errors.New("metadata is not defined")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
// Processes parsed CIF data and applies the data to the database
|
||||
func processParsedCif(data *parsedData) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package cif
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||
@ -11,105 +10,79 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Runs a full update of the CIF Data, discarding any existing data and returns a new metadata struct
|
||||
func runFullUpdate(cfg *helpers.Configuration) (*dbAccess.CifMetadata, error) {
|
||||
log.Msg.Warn("All existing timetable data will be deleted")
|
||||
// Replaces all existing CIF Data with a new download
|
||||
func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||
log.Msg.Info("Downloading all CIF Data")
|
||||
|
||||
// Download CIF Data file
|
||||
url, err := getUpdateUrl("full")
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to get update URL", zap.Error(err))
|
||||
return nil, err
|
||||
log.Msg.Error("Error getting download URL", zap.Error(err))
|
||||
}
|
||||
|
||||
fullCifData, err := nrod.NrodDownload(url, cfg)
|
||||
data, err := nrod.NrodDownload(url, cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to get CIF Data", zap.Error(err))
|
||||
return nil, err
|
||||
log.Msg.Error("Error downloading CIF data", zap.Error(err))
|
||||
}
|
||||
|
||||
log.Msg.Debug("CIF Data Downloaded", zap.ByteString("CIF Data", fullCifData))
|
||||
// Parse CIF file
|
||||
parsed, err := parseCifData(data)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error parsing CIF data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// I now need to define a processing function and ensure a valid type exists, then I can pass that type to a CIF Put Full function
|
||||
// which will handle placing the data into the database
|
||||
// Drop timetable collection
|
||||
dbAccess.DropCollection(dbAccess.TimetableCollection)
|
||||
|
||||
return nil, nil
|
||||
// Process CIF file
|
||||
err = processParsedCif(parsed)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error processing CIF data", zap.Error(err))
|
||||
}
|
||||
|
||||
// Generate & Write metadata
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Runs the daily update for CIF Data, can handle up to five days updates at once.
|
||||
func runUpdate(metadata *dbAccess.CifMetadata, cfg *helpers.Configuration) (*dbAccess.CifMetadata, error) {
|
||||
// Do not run update if last update was on same day
|
||||
if isSameToday(metadata.LastUpdate) {
|
||||
log.Msg.Info("No CIF Update Required", zap.Time("Last update", metadata.LastUpdate))
|
||||
return nil, nil
|
||||
}
|
||||
// Runs a CIF Update for up to five days
|
||||
func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMetadata, days []time.Time) error {
|
||||
log.Msg.Info("Downloading CIF Updates")
|
||||
|
||||
// Do not run update before 0600 as todays data will not be available
|
||||
if time.Now().Hour() < 6 {
|
||||
log.Msg.Info("Too early to update CIF Data")
|
||||
return nil, nil
|
||||
}
|
||||
// Loop over dates
|
||||
for _, time := range days {
|
||||
log.Msg.Info("Downloading CIF File", zap.Time("CIF Data from", time))
|
||||
|
||||
// Check how many days ago last update was
|
||||
lastUpateDays := howManyDaysAgo(metadata.LastUpdate)
|
||||
if lastUpateDays > 5 {
|
||||
log.Msg.Warn("CIF Data is more than five days old. Running Full Update")
|
||||
newMeta, err := runFullUpdate(cfg)
|
||||
// Download CIF data file
|
||||
data, err := fetchUpdate(time, cfg)
|
||||
if err != nil {
|
||||
log.Msg.Error("CIF Update failed", zap.Error(err))
|
||||
return nil, err
|
||||
log.Msg.Error("Error fetching CIF update", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return newMeta, nil
|
||||
}
|
||||
|
||||
// Create a slice containing which dates need updating
|
||||
firstUpdate := time.Now().In(time.UTC).AddDate(0, 0, -lastUpateDays)
|
||||
finalUpdate := time.Now().In(time.UTC)
|
||||
var dates []time.Time
|
||||
|
||||
for d := firstUpdate; d.Before(finalUpdate) || d.Equal(finalUpdate); d = d.AddDate(0, 0, 1) {
|
||||
dates = append(dates, d)
|
||||
}
|
||||
|
||||
sort.Slice(dates, func(i, j int) bool {
|
||||
return dates[i].Before(dates[j])
|
||||
})
|
||||
|
||||
log.Msg.Info("Updating CIF Data", zap.Any("dates to update", dates))
|
||||
|
||||
// Iterate over each date, fetching then parsing the data
|
||||
for _, date := range dates {
|
||||
data, err := fetchUpdate(date, cfg)
|
||||
// Parse CIF file
|
||||
parsed, err := parseCifData(data)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error fetching data", zap.Time("date", date))
|
||||
continue
|
||||
} // parseCifData function needs writing
|
||||
parsedData, err := parseCifData(data)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error parsing data", zap.Time("date", date))
|
||||
log.Msg.Error("Error parsing CIF data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if parsedData == nil { // TEMPORARY DEVELOPMENT CHECK
|
||||
log.Msg.Error("No data parsed")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Process data (manages the database interactions and >>returns the new metadata)
|
||||
_, err = processCifData(metadata, parsedData)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error processing CIF Data", zap.Error(err))
|
||||
}
|
||||
|
||||
log.Msg.Info("CIF Data updated", zap.Time("date", date))
|
||||
// Check metadata sequence - Handle a metadata sequence error. Probably by deleting metadata so next update triggers full download
|
||||
//// I need to check what the sequence looks like in a full download first.
|
||||
// Process CIF file
|
||||
// Generate & Write metadata
|
||||
}
|
||||
return nil, nil //TEMPORARY
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fetches CIF Updates for a given day
|
||||
// Wraps nrod.NrodDownload() into a function which can handle downloading data for a given day
|
||||
func fetchUpdate(t time.Time, cfg *helpers.Configuration) ([]byte, error) {
|
||||
url, err := getUpdateUrl("daily")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Append day string to URL
|
||||
url = url + getDayString(t)
|
||||
|
||||
downloadedData, err := nrod.NrodDownload(url, cfg)
|
||||
|
@ -12,12 +12,6 @@ import (
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
const databaseName string = "owlboard"
|
||||
const CorpusCollection string = "corpus"
|
||||
const StationsCollection string = "stations"
|
||||
const metaCollection string = "meta"
|
||||
const TimetableCollection string = "timetable"
|
||||
|
||||
// Provide the DB Connection to other functions
|
||||
var MongoClient (*mongo.Client)
|
||||
|
||||
|
7
src/dbAccess/contants.go
Normal file
7
src/dbAccess/contants.go
Normal file
@ -0,0 +1,7 @@
|
||||
package dbAccess
|
||||
|
||||
const databaseName string = "owlboard"
|
||||
const CorpusCollection string = "corpus"
|
||||
const StationsCollection string = "stations"
|
||||
const metaCollection string = "meta"
|
||||
const TimetableCollection string = "timetable"
|
Loading…
Reference in New Issue
Block a user