package cif import ( "git.fjla.uk/owlboard/go-types/pkg/database" "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" "git.fjla.uk/owlboard/timetable-mgr/dbAccess" "git.fjla.uk/owlboard/timetable-mgr/log" "go.uber.org/zap" ) // Processes parsed CIF data and applies the data to the database func ProcessParsedCif(data *ParsedData) error { log.Debug("Starting CIF Processing") log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.Sched))) // Batch size for processing batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB // Process deletions in batches for i := 0; i < len(data.Sched); i += batchSize { end := i + batchSize if end > len(data.Sched) { end = len(data.Sched) } deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0) for _, item := range data.Sched[i:end] { if item.TransactionType == "Delete" { deleteItem := item deleteBatch = append(deleteBatch, &deleteItem) } } if len(deleteBatch) > 0 { err := doDeletions(deleteBatch) if err != nil { log.Error("Error deleting CIF Entries", zap.Error(err)) return err } } } // Process creations in batches for i := 0; i < len(data.Sched); i += batchSize { end := i + batchSize if end > len(data.Sched) { end = len(data.Sched) } createBatch := make([]*upstreamApi.JsonScheduleV1, 0) for _, item := range data.Sched[i:end] { if item.TransactionType == "Create" { createItem := item createBatch = append(createBatch, &createItem) } } if len(createBatch) > 0 { err := doCreations(createBatch) if err != nil { log.Error("Error creating CIF Entries", zap.Error(err)) return err } } } log.Debug("CIF Processing complete") return nil } // Create delete query types and pass to the function which batches the deletions func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error { defer func() { if r := recover(); r != nil { log.Panic("Panic:", zap.Any("panic", r)) } }() deleteQueries := make([]database.DeleteQuery, 0) for _, item := range deletions { query := database.DeleteQuery{ ScheduleStartDate: ParseCifDate(&item.ScheduleStartDate, "start"), StpIndicator: item.CifStpIndicator, TrainUid: item.CifTrainUid, } deleteQueries = append(deleteQueries, query) } err := dbAccess.DeleteCifEntries(deleteQueries) if err != nil { log.Error("Error deleting documents", zap.Error(err)) return err } return nil } // Convert to the correct struct for the database and pass to the function which batches insertions func doCreations(creations []*upstreamApi.JsonScheduleV1) error { createDocuments := make([]database.Service, 0) for _, item := range creations { document, err := ConvertServiceType(item, false) if err != nil { log.Error("Error converting JsonSchedule to Service type", zap.Error(err)) continue } createDocuments = append(createDocuments, *document) } err := dbAccess.CreateCifEntries(createDocuments) if err != nil { log.Error("Error creating documents", zap.Error(err)) return err } return nil }