timetable-mgr/cif/process.go

123 lines
3.4 KiB
Go

package cif
import (
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// processParsedCif applies parsed CIF schedule data to the database.
//
// The schedules are walked twice in fixed-size windows. The first walk
// collects every entry whose TransactionType is "Delete" and hands each
// window's matches to doDeletions; the second does the same for "Create"
// entries via doCreations. All deletions are therefore issued before any
// creation. The first error returned by either helper is logged and returned,
// and the remaining windows are not processed.
func processParsedCif(data *parsedData) error {
	log.Msg.Debug("Starting CIF Processing")
	log.Msg.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))

	// Window size for each pass. Needs optimising for better RAM use.
	// 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
	const batchSize = 250
	total := len(data.sched)

	// First pass: deletions.
	for start := 0; start < total; start += batchSize {
		stop := start + batchSize
		if stop > total {
			stop = total
		}

		var pending []*upstreamApi.JsonScheduleV1
		for idx := start; idx < stop; idx++ {
			if data.sched[idx].TransactionType != "Delete" {
				continue
			}
			// Take a copy so the batch holds its own value rather than a
			// pointer into data.sched.
			entry := data.sched[idx]
			pending = append(pending, &entry)
		}

		if len(pending) == 0 {
			continue
		}
		if err := doDeletions(pending); err != nil {
			log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
			return err
		}
	}

	// Second pass: creations.
	for start := 0; start < total; start += batchSize {
		stop := start + batchSize
		if stop > total {
			stop = total
		}

		var pending []*upstreamApi.JsonScheduleV1
		for idx := start; idx < stop; idx++ {
			if data.sched[idx].TransactionType != "Create" {
				continue
			}
			// Take a copy so the batch holds its own value rather than a
			// pointer into data.sched.
			entry := data.sched[idx]
			pending = append(pending, &entry)
		}

		if len(pending) == 0 {
			continue
		}
		if err := doCreations(pending); err != nil {
			log.Msg.Error("Error creating CIF Entries", zap.Error(err))
			return err
		}
	}

	log.Msg.Debug("CIF Processing complete")
	return nil
}
// doDeletions builds one database.DeleteQuery per supplied schedule (start
// date, STP indicator and train UID) and passes the full set to
// dbAccess.DeleteCifEntries. Any error from that call is logged and returned.
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
	queries := make([]database.DeleteQuery, len(deletions))
	for idx, sched := range deletions {
		queries[idx] = database.DeleteQuery{
			ScheduleStartDate: ParseCifDate(&sched.ScheduleStartDate, "start"),
			StpIndicator:      sched.CifStpIndicator,
			TrainUid:          sched.CifTrainUid,
		}
	}

	if err := dbAccess.DeleteCifEntries(queries); err != nil {
		log.Msg.Error("Error deleting documents", zap.Error(err))
		return err
	}
	return nil
}
// doCreations converts each schedule to a database.Service with
// ConvertServiceType and passes the converted documents to
// dbAccess.CreateCifEntries.
//
// A schedule that fails conversion is logged and skipped so that one bad
// entry does not abort the rest of the batch. Previously the failed result
// was still dereferenced, which panics when the converter returns a nil
// document alongside its error.
//
// Returns the error from dbAccess.CreateCifEntries, or nil on success or
// when no schedule could be converted.
func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
	createDocuments := make([]database.Service, 0, len(creations))
	for _, item := range creations {
		document, err := ConvertServiceType(item, false)
		if err != nil {
			log.Msg.Error("Error converting JsonSchedule to Service type", zap.Error(err))
			continue
		}
		if document == nil {
			// Defensive: never dereference a nil result, even without an error.
			continue
		}
		createDocuments = append(createDocuments, *document)
	}

	// Nothing was converted, so there is nothing to insert.
	if len(createDocuments) == 0 {
		return nil
	}

	if err := dbAccess.CreateCifEntries(createDocuments); err != nil {
		log.Msg.Error("Error creating documents", zap.Error(err))
		return err
	}
	return nil
}
// generateMetadata builds the dbAccess.CifMetadata record for a CIF file from
// its JsonTimetableV1 header. The header's timestamp and sequence number are
// copied across, and LastUpdate is set to the current time expressed in
// londonTimezone.
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
	updatedAt := time.Now().In(londonTimezone)

	return &dbAccess.CifMetadata{
		Doctype:       dbAccess.Doctype,
		LastTimestamp: header.Timestamp,
		LastUpdate:    updatedAt,
		LastSequence:  header.Metadata.Sequence,
	}
}