package cif import ( "time" "git.fjla.uk/owlboard/go-types/pkg/database" "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" "git.fjla.uk/owlboard/timetable-mgr/dbAccess" "git.fjla.uk/owlboard/timetable-mgr/log" "go.uber.org/zap" ) // Processes parsed CIF data and applies the data to the database func processParsedCif(data *parsedData) error { log.Msg.Debug("Starting CIF Processing") // Batch size for processing batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB // Process deletions in batches for i := 0; i < len(data.sched); i += batchSize { end := i + batchSize if end > len(data.sched) { end = len(data.sched) } deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0) for _, item := range data.sched[i:end] { if item.TransactionType == "Delete" { deleteItem := item deleteBatch = append(deleteBatch, &deleteItem) } } if len(deleteBatch) > 0 { err := doDeletions(deleteBatch) if err != nil { log.Msg.Error("Error deleting CIF Entries", zap.Error(err)) return err } } } // Process creations in batches for i := 0; i < len(data.sched); i += batchSize { end := i + batchSize if end > len(data.sched) { end = len(data.sched) } createBatch := make([]*upstreamApi.JsonScheduleV1, 0) for _, item := range data.sched[i:end] { if item.TransactionType == "Create" { createItem := item createBatch = append(createBatch, &createItem) } } if len(createBatch) > 0 { err := doCreations(createBatch) if err != nil { log.Msg.Error("Error creating CIF Entries", zap.Error(err)) return err } } } log.Msg.Debug("CIF Processing complete") return nil } // Create delete query types and pass to the function which batches the deletions func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error { deleteQueries := make([]database.DeleteQuery, 0) for _, item := range deletions { query := database.DeleteQuery{ ScheduleStartDate: ParseCifDate(&item.ScheduleStartDate, "start"), StpIndicator: item.CifStpIndicator, TrainUid: item.CifTrainUid, } deleteQueries = append(deleteQueries, query) } err := dbAccess.DeleteCifEntries(deleteQueries) if err != nil { log.Msg.Error("Error deleting documents", zap.Error(err)) return err } return nil } // Convert to the correct struct for the database and pass to the function which batches insertions func doCreations(creations []*upstreamApi.JsonScheduleV1) error { createDocuments := make([]database.Service, 0) for _, item := range creations { document, err := ConvertServiceType(item, false) if err != nil { log.Msg.Error("Error converting JsonSchedule to Service type", zap.Error(err)) } createDocuments = append(createDocuments, *document) } err := dbAccess.CreateCifEntries(createDocuments) if err != nil { log.Msg.Error("Error creating documents", zap.Error(err)) return err } return nil } // Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct. func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata { newMetadata := dbAccess.CifMetadata{ Doctype: dbAccess.Doctype, LastTimestamp: header.Timestamp, LastUpdate: time.Now().In(londonTimezone), LastSequence: header.Metadata.Sequence, } return &newMetadata }