timetable-extension #1
@ -64,7 +64,10 @@ func PutCifMetadata(metadata CifMetadata) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handles 'Delete' tasks from CIF Schedule updates, accepts DeleteQuery types and batches deletions.
|
||||||
func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||||
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
|
||||||
// Prepare deletion tasks
|
// Prepare deletion tasks
|
||||||
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
|
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
|
||||||
|
|
||||||
@ -78,16 +81,39 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions)))
|
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions)))
|
||||||
for i := 0; i < len(bulkDeletions); i += batchsize {
|
|
||||||
end := i + batchsize
|
result, err := collection.BulkWrite(context.Background(), bulkDeletions)
|
||||||
if end > len(bulkDeletions) {
|
if err != nil {
|
||||||
end = len(bulkDeletions)
|
log.Msg.Error("Error deleting documents", zap.Error(err))
|
||||||
}
|
return err
|
||||||
_, err := MongoClient.Database(databaseName).Collection(TimetableCollection).BulkWrite(context.Background(), bulkDeletions[i:end])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handles 'Create' tasks for CIF Schedule updates, accepts Service structs and batches their creation.
|
||||||
|
func CreateCifEntries(schedules []database.Service) error {
|
||||||
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
|
||||||
|
models := make([]mongo.WriteModel, 0, len(schedules))
|
||||||
|
|
||||||
|
for _, s := range schedules {
|
||||||
|
model := mongo.NewInsertOneModel().SetDocument(s)
|
||||||
|
models = append(models, model)
|
||||||
|
}
|
||||||
|
|
||||||
|
bulkWriteOptions := options.BulkWrite().SetOrdered(false)
|
||||||
|
|
||||||
|
log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules)))
|
||||||
|
result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
|
||||||
|
if err != nil {
|
||||||
|
log.Msg.Error("Error inserting documents", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user