Compare commits

...

4 Commits

4 changed files with 129 additions and 12 deletions

View File

@ -1,8 +1,11 @@
package cif
import (
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
@ -51,6 +54,13 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
deleteQueries = append(deleteQueries, query)
}
err := dbAccess.DeleteCifEntries(deleteQueries)
if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err))
return err
}
return nil
}
@ -61,8 +71,28 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
createDocuments := make([]database.Service, 0)
for _, item := range creations {
document := database.Service{}
// Do type conversion here - REMOVE THIS LOG LINE, IT WILL CAUSE 10000s of log entries
log.Msg.Debug("item", zap.Any("item", item))
createDocuments = append(createDocuments, document)
}
err := dbAccess.CreateCifEntries(createDocuments)
if err != nil {
log.Msg.Error("Error creating documents", zap.Error(err))
return err
}
return nil
}
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
newMetadata := dbAccess.CifMetadata{
Doctype: dbAccess.Doctype,
LastTimestamp: header.Timestamp,
LastUpdate: time.Now().In(londonTimezone),
LastSequence: header.Metadata.Sequence,
}
return &newMetadata
}

57
cif/process_test.go Normal file
View File

@ -0,0 +1,57 @@
package cif
import (
"testing"
"time"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
)
func TestGenerateMetadata(t *testing.T) {
header := &upstreamApi.JsonTimetableV1{
Classification: "public",
Timestamp: 1711227636,
Owner: "Network Rail",
Sender: upstreamApi.TimetableSender{
Organisation: "Rockshore",
Application: "NTROD",
Component: "SCHEDULE",
},
Metadata: upstreamApi.TimetableMetadata{
Type: "update",
Sequence: 4307,
},
}
expected := &dbAccess.CifMetadata{
Doctype: dbAccess.Doctype,
LastTimestamp: header.Timestamp,
LastSequence: header.Metadata.Sequence,
LastUpdate: time.Now().In(londonTimezone),
}
result := generateMetadata(header)
if result == nil {
t.Errorf("generateMetadata returned nil pointer")
}
if result.Doctype != expected.Doctype {
t.Errorf("Doctype: expected %s, got %s", expected.Doctype, result.Doctype)
}
if result.LastTimestamp != expected.LastTimestamp {
t.Errorf("LastTimestamp: expected %d, got %d", expected.LastTimestamp, result.LastTimestamp)
}
if result.LastSequence != expected.LastSequence {
t.Errorf("LastSequence: expected %d, got %d", expected.LastSequence, result.LastSequence)
}
tolerance := time.Second
if !result.LastUpdate.Before(expected.LastUpdate.Add(tolerance)) ||
!result.LastUpdate.After(expected.LastUpdate.Add(-tolerance)) {
t.Errorf("LastUpdate: expected %s, got %s", expected.LastUpdate, result.LastUpdate)
}
}

View File

@ -60,7 +60,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
return err
}
// Parse CIF file
parsed, err = parseCifData(data)
parsed, err := parseCifData(data)
if err != nil {
log.Msg.Error("Error parsing CIF data", zap.Error(err))
return err
@ -68,10 +68,14 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
// Check metadata sequence - Handle a metadata sequence error. Probably by deleting metadata so next update triggers full download
//// I need to check what the sequence looks like in a full download first.
//// Regarding metadata, I will need to replace 'metadata *dbAccess.CifMetadata' with the new metadata to ensure it is checked correctly in each iteration.
// Process CIF file
// Generate & Write metadata
metadata = generateMetadata(&parsed.header)
}
// Write metadata
return nil
}

View File

@ -64,7 +64,10 @@ func PutCifMetadata(metadata CifMetadata) bool {
return true
}
// Handles 'Delete' tasks from CIF Schedule updates, accepts DeleteQuery types and batches deletions.
func DeleteCifEntries(deletions []database.DeleteQuery) error {
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
// Prepare deletion tasks
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
@ -78,16 +81,39 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
}
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions)))
for i := 0; i < len(bulkDeletions); i += batchsize {
end := i + batchsize
if end > len(bulkDeletions) {
end = len(bulkDeletions)
}
_, err := MongoClient.Database(databaseName).Collection(TimetableCollection).BulkWrite(context.Background(), bulkDeletions[i:end])
if err != nil {
return err
}
result, err := collection.BulkWrite(context.Background(), bulkDeletions)
if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err))
return err
}
log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount))
return nil
}
// Handles 'Create' tasks for CIF Schedule updates, accepts Service structs and batches their creation.
func CreateCifEntries(schedules []database.Service) error {
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
models := make([]mongo.WriteModel, 0, len(schedules))
for _, s := range schedules {
model := mongo.NewInsertOneModel().SetDocument(s)
models = append(models, model)
}
bulkWriteOptions := options.BulkWrite().SetOrdered(false)
log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules)))
result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
if err != nil {
log.Msg.Error("Error inserting documents", zap.Error(err))
return err
}
log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount))
return nil
}