package dbAccess

import (
	"context"
	"errors"
	"time"

	"git.fjla.uk/owlboard/go-types/pkg/database"
	"git.fjla.uk/owlboard/timetable-mgr/log"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
)

// Doctype is the value stored in the 'type' field of the CIF metadata
// document; it is used both when writing the document and when filtering for it.
const Doctype = "CifMetadata"

// CifMetadata describes the CifMetadata 'type' document in the database.
// This type will be moved to owlboard/go-types.
type CifMetadata struct {
	Doctype        string    `bson:"type"`
	LastUpdate     time.Time `bson:"lastUpdate"`
	LastTimestamp  int64     `bson:"lastTimestamp"`
	LastSequence   int64     `bson:"lastSequence"`
	LastUpdateType string    `bson:"lastUpdateType"`
}

// GetCifMetadata fetches the CifMetadata from the database.
//
// It returns (nil, nil) if no metadata document exists - before first
// initialisation for example - and (nil, err) for any other lookup or
// decoding failure.
func GetCifMetadata() (*CifMetadata, error) {
	// Named db rather than database so the imported database package is not shadowed.
	db := MongoClient.Database(DatabaseName)
	collection := db.Collection(MetaCollection)
	filter := bson.M{"type": Doctype}

	var result CifMetadata
	err := collection.FindOne(context.Background(), filter).Decode(&result)
	if err != nil {
		// A missing document is an expected state, not an error.
		if errors.Is(err, mongo.ErrNoDocuments) {
			return nil, nil
		}
		return nil, err
	}

	log.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
	return &result, nil
}

// PutCifMetadata uses an upsert to insert or update the CifMetadata document
// in the database.
//
// The stored 'lastUpdateType' is taken from the lastUpdateType argument, not
// from metadata.LastUpdateType. It returns true when the write succeeded and
// false (after logging the error) when metadata is nil or the write failed.
func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
	// Guard against a nil pointer instead of panicking on the field reads below.
	if metadata == nil {
		log.Error("Error updating CIF Metadata", zap.Error(errors.New("metadata is nil")))
		return false
	}

	db := MongoClient.Database(DatabaseName)
	collection := db.Collection(MetaCollection)
	// Named so that the imported options package is not shadowed.
	updateOptions := options.Update().SetUpsert(true)
	filter := bson.M{"type": Doctype}
	update := bson.M{
		"$set": bson.M{
			"type":           Doctype,
			"lastUpdate":     metadata.LastUpdate,
			"lastTimestamp":  metadata.LastTimestamp,
			"lastSequence":   metadata.LastSequence,
			"lastUpdateType": lastUpdateType,
		},
	}

	_, err := collection.UpdateOne(context.Background(), filter, update, updateOptions)
	if err != nil {
		log.Error("Error updating CIF Metadata", zap.Error(err))
		return false
	}

	log.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
	return true
}

// DeleteCifEntries handles 'Delete' tasks from CIF Schedule updates. It accepts
// DeleteQuery values and submits them to the timetable collection as a single
// unordered bulk write of delete-many operations, each matching on trainUid,
// scheduleStartDate and stpIndicator.
//
// It returns nil when deletions is empty, otherwise the error (if any) from
// the bulk write.
func DeleteCifEntries(deletions []database.DeleteQuery) error {
	// Any panic raised below is recovered and handed to log.Panic.
	// NOTE(review): presumably log.Panic re-panics after logging (zap semantics) - confirm.
	defer func() {
		if r := recover(); r != nil {
			log.Panic("Panic:", zap.Any("panic", r))
		}
	}()

	// Skip if deletions is empty
	if len(deletions) == 0 {
		log.Info("No deletions required")
		return nil
	}
	log.Debug("Running deletions against database", zap.Int("count", len(deletions)))

	// Prepare deletion tasks
	collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
	bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
	for _, deleteQuery := range deletions {
		filter := bson.M{
			"trainUid":          deleteQuery.TrainUid,
			"scheduleStartDate": deleteQuery.ScheduleStartDate,
			"stpIndicator":      deleteQuery.StpIndicator,
		}
		bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter))
	}

	bulkWriteOptions := options.BulkWrite().SetOrdered(false)
	_, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions)
	return err
}

// PurgeNonVstp clears all non-vstp services (serviceDetail.vstp == false) from
// the timetable collection. Used when a CIF full download is required.
//
// It returns the number of documents deleted and any error from the delete.
// When the driver supplies no result, the count is 0.
func PurgeNonVstp() (int64, error) {
	coll := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
	filter := bson.M{"serviceDetail.vstp": false}

	result, err := coll.DeleteMany(context.Background(), filter)
	if result == nil {
		// The driver can return a nil result alongside an error; reading
		// DeletedCount from it would be a nil pointer dereference.
		return 0, err
	}
	return result.DeletedCount, err
}

// CreateCifEntries handles 'Create' tasks for CIF Schedule updates: it accepts Service structs and batches their creation.
func CreateCifEntries(schedules []database.Service) error { // Skip if deletions is empty if len(schedules) == 0 { log.Info("No creations required") return nil } log.Debug("Running creations against database", zap.Int("count", len(schedules))) collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection) models := make([]mongo.WriteModel, 0, len(schedules)) for _, s := range schedules { model := mongo.NewInsertOneModel().SetDocument(s) models = append(models, model) } bulkWriteOptions := options.BulkWrite().SetOrdered(false) _, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions) if err != nil { return err } return nil } // Removes any schedules which ended before 'cutoff' func RemoveOutdatedServices(cutoff time.Time) (count int64, err error) { // Define filter filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}} collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection) res, err := collection.DeleteMany(context.Background(), filter) if err != nil { return // Automatically returns named values } count = res.DeletedCount return // Automatically returns names values } // Creates indexes on the Timetable collection... Index suitability needs checking. 
func CreateTimetableIndexes() error { log.Info("Creating timetable indexes") coll := MongoClient.Database(DatabaseName).Collection(TimetableCollection) indexModels := []mongo.IndexModel{ { Keys: bson.D{ {Key: "trainUid", Value: 1}, {Key: "stpIndicator", Value: 1}, {Key: "scheduleStartDate", Value: 1}, }, Options: options.Index().SetName("delete_query"), }, // The find by UID Query can make use of the delete_query index { Keys: bson.D{ {Key: "headcode", Value: 1}, {Key: "daysRun", Value: 1}, {Key: "scheduleStartDate", Value: 1}, {Key: "scheduleEndDate", Value: 1}, }, Options: options.Index().SetName("find_by_headcode"), }, { Keys: bson.D{ {Key: "serviceDetail.vstp", Value: 1}, }, Options: options.Index().SetName("vstp"), }, } _, err := coll.Indexes().CreateMany(context.Background(), indexModels) if err != nil { return err } return nil }