Compare commits
No commits in common. "3481c4e314b3d881b75d815ad43096f2a5c705fe" and "43d89119bf9dfe2e48b8a6c7b58f3c1a4e143f76" have entirely different histories.
3481c4e314
...
43d89119bf
|
@ -1,11 +1,8 @@
|
|||
package cif
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -54,13 +51,6 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
|
|||
|
||||
deleteQueries = append(deleteQueries, query)
|
||||
}
|
||||
|
||||
err := dbAccess.DeleteCifEntries(deleteQueries)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error deleting documents", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -71,28 +61,8 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
|
|||
createDocuments := make([]database.Service, 0)
|
||||
for _, item := range creations {
|
||||
document := database.Service{}
|
||||
// Do type conversion here - REMOVE THIS LOG LINE, IT WILL CAUSE 10000s of log entries
|
||||
log.Msg.Debug("item", zap.Any("item", item))
|
||||
|
||||
createDocuments = append(createDocuments, document)
|
||||
}
|
||||
|
||||
err := dbAccess.CreateCifEntries(createDocuments)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error creating documents", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
|
||||
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
|
||||
newMetadata := dbAccess.CifMetadata{
|
||||
Doctype: dbAccess.Doctype,
|
||||
LastTimestamp: header.Timestamp,
|
||||
LastUpdate: time.Now().In(londonTimezone),
|
||||
LastSequence: header.Metadata.Sequence,
|
||||
}
|
||||
|
||||
return &newMetadata
|
||||
}
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
package cif
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||
)
|
||||
|
||||
func TestGenerateMetadata(t *testing.T) {
|
||||
header := &upstreamApi.JsonTimetableV1{
|
||||
Classification: "public",
|
||||
Timestamp: 1711227636,
|
||||
Owner: "Network Rail",
|
||||
Sender: upstreamApi.TimetableSender{
|
||||
Organisation: "Rockshore",
|
||||
Application: "NTROD",
|
||||
Component: "SCHEDULE",
|
||||
},
|
||||
Metadata: upstreamApi.TimetableMetadata{
|
||||
Type: "update",
|
||||
Sequence: 4307,
|
||||
},
|
||||
}
|
||||
|
||||
expected := &dbAccess.CifMetadata{
|
||||
Doctype: dbAccess.Doctype,
|
||||
LastTimestamp: header.Timestamp,
|
||||
LastSequence: header.Metadata.Sequence,
|
||||
LastUpdate: time.Now().In(londonTimezone),
|
||||
}
|
||||
|
||||
result := generateMetadata(header)
|
||||
|
||||
if result == nil {
|
||||
t.Errorf("generateMetadata returned nil pointer")
|
||||
}
|
||||
|
||||
if result.Doctype != expected.Doctype {
|
||||
t.Errorf("Doctype: expected %s, got %s", expected.Doctype, result.Doctype)
|
||||
}
|
||||
|
||||
if result.LastTimestamp != expected.LastTimestamp {
|
||||
t.Errorf("LastTimestamp: expected %d, got %d", expected.LastTimestamp, result.LastTimestamp)
|
||||
}
|
||||
|
||||
if result.LastSequence != expected.LastSequence {
|
||||
t.Errorf("LastSequence: expected %d, got %d", expected.LastSequence, result.LastSequence)
|
||||
}
|
||||
|
||||
tolerance := time.Second
|
||||
if !result.LastUpdate.Before(expected.LastUpdate.Add(tolerance)) ||
|
||||
!result.LastUpdate.After(expected.LastUpdate.Add(-tolerance)) {
|
||||
t.Errorf("LastUpdate: expected %s, got %s", expected.LastUpdate, result.LastUpdate)
|
||||
}
|
||||
}
|
|
@ -60,7 +60,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
|
|||
return err
|
||||
}
|
||||
// Parse CIF file
|
||||
parsed, err := parseCifData(data)
|
||||
parsed, err = parseCifData(data)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error parsing CIF data", zap.Error(err))
|
||||
return err
|
||||
|
@ -68,14 +68,10 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
|
|||
|
||||
// Check metadata sequence - Handle a metadata sequence error. Probably by deleting metadata so next update triggers full download
|
||||
//// I need to check what the sequence looks like in a full download first.
|
||||
//// Regarding metadata, I will need to replace 'metadata *dbAccess.CifMetadata' with the new metadata to ensure it is checked correctly in each iteration.
|
||||
// Process CIF file
|
||||
|
||||
metadata = generateMetadata(&parsed.header)
|
||||
// Generate & Write metadata
|
||||
}
|
||||
|
||||
// Write metadata
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -64,10 +64,7 @@ func PutCifMetadata(metadata CifMetadata) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// Handles 'Delete' tasks from CIF Schedule updates, accepts DeleteQuery types and batches deletions.
|
||||
func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||
|
||||
// Prepare deletion tasks
|
||||
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
|
||||
|
||||
|
@ -81,39 +78,16 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
|||
}
|
||||
|
||||
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions)))
|
||||
|
||||
result, err := collection.BulkWrite(context.Background(), bulkDeletions)
|
||||
for i := 0; i < len(bulkDeletions); i += batchsize {
|
||||
end := i + batchsize
|
||||
if end > len(bulkDeletions) {
|
||||
end = len(bulkDeletions)
|
||||
}
|
||||
_, err := MongoClient.Database(databaseName).Collection(TimetableCollection).BulkWrite(context.Background(), bulkDeletions[i:end])
|
||||
if err != nil {
|
||||
log.Msg.Error("Error deleting documents", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handles 'Create' tasks for CIF Schedule updates, accepts Service structs and batches their creation.
|
||||
func CreateCifEntries(schedules []database.Service) error {
|
||||
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||
|
||||
models := make([]mongo.WriteModel, 0, len(schedules))
|
||||
|
||||
for _, s := range schedules {
|
||||
model := mongo.NewInsertOneModel().SetDocument(s)
|
||||
models = append(models, model)
|
||||
}
|
||||
|
||||
bulkWriteOptions := options.BulkWrite().SetOrdered(false)
|
||||
|
||||
log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules)))
|
||||
result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error inserting documents", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue