package dbAccess import ( "context" "errors" "time" "git.fjla.uk/owlboard/go-types/pkg/database" "git.fjla.uk/owlboard/timetable-mgr/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) const Doctype = "CifMetadata" // The type describing the CifMetadata 'type' in the database. // This type will be moved to owlboard/go-types type CifMetadata struct { Doctype string `json:"type"` LastUpdate time.Time `json:"lastUpdate"` LastTimestamp int64 `json:"lastTimestamp"` LastSequence int64 `json:"lastSequence"` } // Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example. func GetCifMetadata() (*CifMetadata, error) { database := MongoClient.Database(databaseName) collection := database.Collection(metaCollection) filter := bson.M{"type": Doctype} var result CifMetadata err := collection.FindOne(context.Background(), filter).Decode(&result) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, nil } log.Msg.Error("Error fetching CIF Metadata") return nil, err } return &result, nil } // Uses upsert to Insert/Update the CifMetadata in the database func PutCifMetadata(metadata CifMetadata) bool { database := MongoClient.Database(databaseName) collection := database.Collection(metaCollection) options := options.Update().SetUpsert(true) filter := bson.M{"type": Doctype} update := bson.M{ "type": Doctype, "LastUpdate": metadata.LastUpdate, "LastTimestamp": metadata.LastTimestamp, "LastSequence": metadata.LastSequence, } _, err := collection.UpdateOne(context.Background(), filter, update, options) if err != nil { log.Msg.Error("Error updating CIF Metadata", zap.Error(err)) return false } return true } // Handles 'Delete' tasks from CIF Schedule updates, accepts DeleteQuery types and batches deletions. func DeleteCifEntries(deletions []database.DeleteQuery) error { collection := MongoClient.Database(databaseName).Collection(timetableCollection) // Prepare deletion tasks bulkDeletions := make([]mongo.WriteModel, 0, len(deletions)) for _, deleteQuery := range deletions { filter := bson.M{ "trainUid": deleteQuery.TrainUid, "scheduleStartDate": deleteQuery.ScheduleStartDate, "stpIndicator": deleteQuery.StpIndicator, } bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter)) } log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions))) result, err := collection.BulkWrite(context.Background(), bulkDeletions) if err != nil { log.Msg.Error("Error deleting documents", zap.Error(err)) return err } log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount)) return nil } // Handles 'Create' tasks for CIF Schedule updates, accepts Service structs and batches their creation. func CreateCifEntries(schedules []database.Service) error { collection := MongoClient.Database(databaseName).Collection(timetableCollection) models := make([]mongo.WriteModel, 0, len(schedules)) for _, s := range schedules { model := mongo.NewInsertOneModel().SetDocument(s) models = append(models, model) } bulkWriteOptions := options.BulkWrite().SetOrdered(false) log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules))) result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions) if err != nil { log.Msg.Error("Error inserting documents", zap.Error(err)) return err } log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount)) return nil }