115 lines
3.0 KiB
Go
115 lines
3.0 KiB
Go
package cif
|
|
|
|
import (
|
|
"git.fjla.uk/owlboard/go-types/pkg/database"
|
|
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
|
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Processes parsed CIF data and applies the data to the database
|
|
func ProcessParsedCif(data *ParsedData) error {
|
|
log.Debug("Starting CIF Processing")
|
|
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.Sched)))
|
|
|
|
// Batch size for processing
|
|
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
|
|
|
// Process deletions in batches
|
|
for i := 0; i < len(data.Sched); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(data.Sched) {
|
|
end = len(data.Sched)
|
|
}
|
|
deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
|
for _, item := range data.Sched[i:end] {
|
|
if item.TransactionType == "Delete" {
|
|
deleteItem := item
|
|
deleteBatch = append(deleteBatch, &deleteItem)
|
|
}
|
|
}
|
|
if len(deleteBatch) > 0 {
|
|
err := doDeletions(deleteBatch)
|
|
if err != nil {
|
|
log.Error("Error deleting CIF Entries", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process creations in batches
|
|
for i := 0; i < len(data.Sched); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(data.Sched) {
|
|
end = len(data.Sched)
|
|
}
|
|
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
|
for _, item := range data.Sched[i:end] {
|
|
if item.TransactionType == "Create" {
|
|
createItem := item
|
|
createBatch = append(createBatch, &createItem)
|
|
}
|
|
}
|
|
if len(createBatch) > 0 {
|
|
err := doCreations(createBatch)
|
|
if err != nil {
|
|
log.Error("Error creating CIF Entries", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Debug("CIF Processing complete")
|
|
return nil
|
|
}
|
|
|
|
// Create delete query types and pass to the function which batches the deletions
|
|
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Panic("Panic:", zap.Any("panic", r))
|
|
}
|
|
}()
|
|
deleteQueries := make([]database.DeleteQuery, 0)
|
|
for _, item := range deletions {
|
|
query := database.DeleteQuery{
|
|
ScheduleStartDate: ParseCifDate(&item.ScheduleStartDate, "start"),
|
|
StpIndicator: item.CifStpIndicator,
|
|
TrainUid: item.CifTrainUid,
|
|
}
|
|
|
|
deleteQueries = append(deleteQueries, query)
|
|
}
|
|
|
|
err := dbAccess.DeleteCifEntries(deleteQueries)
|
|
if err != nil {
|
|
log.Error("Error deleting documents", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Convert to the correct struct for the database and pass to the function which batches insertions
|
|
func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
|
|
createDocuments := make([]database.Service, 0)
|
|
for _, item := range creations {
|
|
document, err := ConvertServiceType(item, false)
|
|
if err != nil {
|
|
log.Error("Error converting JsonSchedule to Service type", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
createDocuments = append(createDocuments, *document)
|
|
}
|
|
|
|
err := dbAccess.CreateCifEntries(createDocuments)
|
|
if err != nil {
|
|
log.Error("Error creating documents", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|