From 7acae49812087323cebf752b0bc71a3310459c42 Mon Sep 17 00:00:00 2001
From: Fred Boniface
Date: Tue, 9 Apr 2024 22:39:35 +0100
Subject: [PATCH] Re-implement processParsedCif() to reduce memory use by 10%.
 Further reductions are necessary

---
 cif/process.go  | 68 +++++++++++++++++++++++++++++++------------------
 cif/update.go   | 10 --------
 dbAccess/cif.go | 11 ++------
 3 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/cif/process.go b/cif/process.go
index 3d73208..643244f 100644
--- a/cif/process.go
+++ b/cif/process.go
@@ -13,40 +13,60 @@ import (
 // Processes parsed CIF data and applies the data to the database
 func processParsedCif(data *parsedData) error {
 	log.Msg.Debug("Starting CIF Processing")
 
-	createTasks := make([]*upstreamApi.JsonScheduleV1, 0)
-	deleteTasks := make([]*upstreamApi.JsonScheduleV1, 0)
-	for _, item := range data.sched {
-		switch item.TransactionType {
-		case "Delete":
-			deleteItem := item // Create new variable to ensure repetition of pointers
-			deleteTasks = append(deleteTasks, &deleteItem)
-		case "Create":
-			createItem := item // Create new variable to ensure repetition of pointers
-			createTasks = append(createTasks, &createItem)
-		default:
-			log.Msg.Error("Unknown transaction type in CIF Schedule", zap.String("TransactionType", item.TransactionType))
+	// Batch size for processing
+	batchSize := 750 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
+
+	// Process deletions in batches
+	for i := 0; i < len(data.sched); i += batchSize {
+		end := i + batchSize
+		if end > len(data.sched) {
+			end = len(data.sched)
+		}
+		deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
+		for _, item := range data.sched[i:end] {
+			if item.TransactionType == "Delete" {
+				deleteItem := item
+				deleteBatch = append(deleteBatch, &deleteItem)
+			}
+		}
+		if len(deleteBatch) > 0 {
+			err := doDeletions(deleteBatch)
+			if err != nil {
+				log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
+				return err
+			}
 		}
 	}
 
-	err := doDeletions(deleteTasks)
-	if err != nil {
-		log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
-		return err
-	}
-	err = doCreations(createTasks)
-	if err != nil {
-		log.Msg.Error("Error creating CIF Entries", zap.Error(err))
-		return err
+	// Process creations in batches
+	for i := 0; i < len(data.sched); i += batchSize {
+		end := i + batchSize
+		if end > len(data.sched) {
+			end = len(data.sched)
+		}
+		createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
+		for _, item := range data.sched[i:end] {
+			if item.TransactionType == "Create" {
+				createItem := item
+				createBatch = append(createBatch, &createItem)
+			}
+		}
+		if len(createBatch) > 0 {
+			err := doCreations(createBatch)
+			if err != nil {
+				log.Msg.Error("Error creating CIF Entries", zap.Error(err))
+				return err
+			}
+		}
 	}
 
+	log.Msg.Debug("CIF Processing complete")
 	return nil
 }
 
 // Create delete query types and pass to the function which batches the deletions
 func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
-	log.Msg.Info("Preparing CIF update Delete tasks", zap.Int("Delete task count", len(deletions)))
-
 	deleteQueries := make([]database.DeleteQuery, 0)
 	for _, item := range deletions {
 		query := database.DeleteQuery{
@@ -69,8 +89,6 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
 
 // Convert to the correct struct for the database and pass to the function which batches insertions
 func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
-	log.Msg.Info("Preparing CIF update Create tasks", zap.Int("Create task count", len(creations)))
-
 	createDocuments := make([]database.Service, 0)
 	for _, item := range creations {
 		document, err := ConvertServiceType(item, false)
diff --git a/cif/update.go b/cif/update.go
index 9ffb947..6ed091d 100644
--- a/cif/update.go
+++ b/cif/update.go
@@ -43,11 +43,6 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
 	// Drop timetable collection
 	dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
 
-	// If debug mode is on, call debugWriteFile
-	if helpers.Runtime == "debug" {
-		debugWriteFile(&parsed.header, &parsed.sched)
-	}
-
 	// Process CIF file
 	err = processParsedCif(parsed)
 	if err != nil {
@@ -93,11 +88,6 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
 	// Make `data` a nil pointer as it is no longer required
 	data = nil
 
-	// If debug mode is on, call debugWriteFile
-	if helpers.Runtime == "debug" {
-		debugWriteFile(&parsed.header, &parsed.sched)
-	}
-
 	log.Msg.Debug("Starting metadata checks")
 	// Check CIF Sequence
 	// Skip if LastSequence is >= to this sequence
diff --git a/dbAccess/cif.go b/dbAccess/cif.go
index dabeb62..a44274e 100644
--- a/dbAccess/cif.go
+++ b/dbAccess/cif.go
@@ -91,16 +91,12 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
 		bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter))
 	}
 
-	log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions)))
-
-	result, err := collection.BulkWrite(context.Background(), bulkDeletions)
+	_, err := collection.BulkWrite(context.Background(), bulkDeletions)
 	if err != nil {
 		log.Msg.Error("Error deleting documents", zap.Error(err))
 		return err
 	}
 
-	log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount))
-
 	return nil
 }
@@ -123,14 +119,11 @@ func CreateCifEntries(schedules []database.Service) error {
 
 	bulkWriteOptions := options.BulkWrite().SetOrdered(false)
 
-	log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules)))
-	result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
+	_, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
 	if err != nil {
 		log.Msg.Error("Error inserting documents", zap.Error(err))
 		return err
 	}
 
-	log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount))
-
 	return nil
 }
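
The change above comes down to one idiom: walk data.sched in fixed-size windows and flush each window to the database before building the next one, so only one window's worth of pointers is held alongside the parsed data at any time. Below is a minimal sketch of that batching idiom only; it is not part of this patch. forEachBatch is a hypothetical helper name, and the callback stands in for doDeletions/doCreations.

package main

import "fmt"

// forEachBatch walks items in windows of at most batchSize elements and hands
// each window to fn, stopping at the first error. Hypothetical helper shown
// for illustration; the patch keeps the equivalent loops inline in
// processParsedCif().
func forEachBatch[T any](items []T, batchSize int, fn func(batch []T) error) error {
	for i := 0; i < len(items); i += batchSize {
		end := i + batchSize
		if end > len(items) {
			end = len(items)
		}
		if err := fn(items[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	nums := []int{1, 2, 3, 4, 5, 6, 7}
	_ = forEachBatch(nums, 3, func(batch []int) error {
		fmt.Println(batch) // prints [1 2 3], then [4 5 6], then [7]
		return nil
	})
}

Whether factoring the loops out like this is worthwhile depends on whether batchSize stays hard-coded at 750 or becomes configurable; the patch keeps the two passes inline and separate, preserving the original ordering in which all deletions are applied before any creations.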