package dbAccess import ( "context" "errors" "time" "git.fjla.uk/owlboard/go-types/pkg/database" "git.fjla.uk/owlboard/timetable-mgr/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) const Doctype = "CifMetadata" // The type describing the CifMetadata 'type' in the database. // This type will be moved to owlboard/go-types type CifMetadata struct { Doctype string `bson:"type"` LastUpdate time.Time `bson:"lastUpdate"` LastTimestamp int64 `bson:"lastTimestamp"` LastSequence int64 `bson:"lastSequence"` LastUpdateType string `bson:"lastUpdateType"` } // Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example. func GetCifMetadata() (*CifMetadata, error) { database := MongoClient.Database(databaseName) collection := database.Collection(metaCollection) filter := bson.M{"type": Doctype} var result CifMetadata err := collection.FindOne(context.Background(), filter).Decode(&result) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, nil } log.Msg.Error("Error fetching CIF Metadata") return nil, err } log.Msg.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result)) return &result, nil } // Uses upsert to Insert/Update the CifMetadata in the database func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool { database := MongoClient.Database(databaseName) collection := database.Collection(metaCollection) options := options.Update().SetUpsert(true) filter := bson.M{"type": Doctype} update := bson.M{ "$set": bson.M{ "type": Doctype, "lastUpdate": metadata.LastUpdate, "lastTimestamp": metadata.LastTimestamp, "lastSequence": metadata.LastSequence, "lastUpdateType": lastUpdateType, }, } _, err := collection.UpdateOne(context.Background(), filter, update, options) if err != nil { log.Msg.Error("Error updating CIF Metadata", zap.Error(err)) return false } log.Msg.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate)) return true } // Handles 'Delete' tasks from CIF Schedule updates, accepts DeleteQuery types and batches deletions. func DeleteCifEntries(deletions []database.DeleteQuery) error { // Skip if deletions is empty if len(deletions) == 0 { log.Msg.Info("No deletions required") return nil } // Prepare deletion tasks collection := MongoClient.Database(databaseName).Collection(timetableCollection) bulkDeletions := make([]mongo.WriteModel, 0, len(deletions)) for _, deleteQuery := range deletions { filter := bson.M{ "trainUid": deleteQuery.TrainUid, "scheduleStartDate": deleteQuery.ScheduleStartDate, "stpIndicator": deleteQuery.StpIndicator, } bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter)) } bulkWriteOptions := options.BulkWrite().SetOrdered(false) _, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions) if err != nil { log.Msg.Error("Error deleting documents", zap.Error(err)) return err } return nil } // Handles 'Create' tasks for CIF Schedule updates, accepts Service structs and batches their creation. func CreateCifEntries(schedules []database.Service) error { // Skip if deletions is empty if len(schedules) == 0 { log.Msg.Info("No creations required") return nil } collection := MongoClient.Database(databaseName).Collection(timetableCollection) models := make([]mongo.WriteModel, 0, len(schedules)) for _, s := range schedules { model := mongo.NewInsertOneModel().SetDocument(s) models = append(models, model) } bulkWriteOptions := options.BulkWrite().SetOrdered(false) _, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions) if err != nil { log.Msg.Error("Error inserting documents", zap.Error(err)) return err } return nil }