timetable-extension #1

Open
fred.boniface wants to merge 160 commits from timetable-extension into main
3 changed files with 45 additions and 44 deletions
Showing only changes of commit 7acae49812 - Show all commits

View File

@ -13,40 +13,60 @@ import (
// Processes parsed CIF data and applies the data to the database // Processes parsed CIF data and applies the data to the database
func processParsedCif(data *parsedData) error { func processParsedCif(data *parsedData) error {
log.Msg.Debug("Starting CIF Processing") log.Msg.Debug("Starting CIF Processing")
createTasks := make([]*upstreamApi.JsonScheduleV1, 0)
deleteTasks := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched { // Batch size for processing
switch item.TransactionType { batchSize := 750 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
case "Delete":
deleteItem := item // Create new variable to ensure repetition of pointers // Process deletions in batches
deleteTasks = append(deleteTasks, &deleteItem) for i := 0; i < len(data.sched); i += batchSize {
case "Create": end := i + batchSize
createItem := item // Create new variable to ensure repetition of pointers if end > len(data.sched) {
createTasks = append(createTasks, &createItem) end = len(data.sched)
default: }
log.Msg.Error("Unknown transaction type in CIF Schedule", zap.String("TransactionType", item.TransactionType)) deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched[i:end] {
if item.TransactionType == "Delete" {
deleteItem := item
deleteBatch = append(deleteBatch, &deleteItem)
} }
} }
if len(deleteBatch) > 0 {
err := doDeletions(deleteTasks) err := doDeletions(deleteBatch)
if err != nil { if err != nil {
log.Msg.Error("Error deleting CIF Entries", zap.Error(err)) log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
return err return err
} }
err = doCreations(createTasks) }
}
// Process creations in batches
for i := 0; i < len(data.sched); i += batchSize {
end := i + batchSize
if end > len(data.sched) {
end = len(data.sched)
}
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched[i:end] {
if item.TransactionType == "Create" {
createItem := item
createBatch = append(createBatch, &createItem)
}
}
if len(createBatch) > 0 {
err := doCreations(createBatch)
if err != nil { if err != nil {
log.Msg.Error("Error creating CIF Entries", zap.Error(err)) log.Msg.Error("Error creating CIF Entries", zap.Error(err))
return err return err
} }
}
}
log.Msg.Debug("CIF Processing complete")
return nil return nil
} }
// Create delete query types and pass to the function which batches the deletions // Create delete query types and pass to the function which batches the deletions
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error { func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
log.Msg.Info("Preparing CIF update Delete tasks", zap.Int("Delete task count", len(deletions)))
deleteQueries := make([]database.DeleteQuery, 0) deleteQueries := make([]database.DeleteQuery, 0)
for _, item := range deletions { for _, item := range deletions {
query := database.DeleteQuery{ query := database.DeleteQuery{
@ -69,8 +89,6 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
// Convert to the correct struct for the database and pass to the function which batches insertions // Convert to the correct struct for the database and pass to the function which batches insertions
func doCreations(creations []*upstreamApi.JsonScheduleV1) error { func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
log.Msg.Info("Preparing CIF update Create tasks", zap.Int("Create task count", len(creations)))
createDocuments := make([]database.Service, 0) createDocuments := make([]database.Service, 0)
for _, item := range creations { for _, item := range creations {
document, err := ConvertServiceType(item, false) document, err := ConvertServiceType(item, false)

View File

@ -43,11 +43,6 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
// Drop timetable collection // Drop timetable collection
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database. dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
// If debug mode is on, call debugWriteFile
if helpers.Runtime == "debug" {
debugWriteFile(&parsed.header, &parsed.sched)
}
// Process CIF file // Process CIF file
err = processParsedCif(parsed) err = processParsedCif(parsed)
if err != nil { if err != nil {
@ -93,11 +88,6 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
// Make `data` a nil pointer as it is no longer required // Make `data` a nil pointer as it is no longer required
data = nil data = nil
// If debug mode is on, call debugWriteFile
if helpers.Runtime == "debug" {
debugWriteFile(&parsed.header, &parsed.sched)
}
log.Msg.Debug("Starting metadata checks") log.Msg.Debug("Starting metadata checks")
// Check CIF Sequence // Check CIF Sequence
// Skip if LastSequence is >= to this sequence // Skip if LastSequence is >= to this sequence

View File

@ -91,16 +91,12 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter)) bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter))
} }
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions))) _, err := collection.BulkWrite(context.Background(), bulkDeletions)
result, err := collection.BulkWrite(context.Background(), bulkDeletions)
if err != nil { if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err)) log.Msg.Error("Error deleting documents", zap.Error(err))
return err return err
} }
log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount))
return nil return nil
} }
@ -123,14 +119,11 @@ func CreateCifEntries(schedules []database.Service) error {
bulkWriteOptions := options.BulkWrite().SetOrdered(false) bulkWriteOptions := options.BulkWrite().SetOrdered(false)
log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules))) _, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
if err != nil { if err != nil {
log.Msg.Error("Error inserting documents", zap.Error(err)) log.Msg.Error("Error inserting documents", zap.Error(err))
return err return err
} }
log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount))
return nil return nil
} }