2024-03-25 12:21:59 +00:00
|
|
|
package cif
|
2024-03-29 13:45:58 +00:00
|
|
|
|
2024-04-05 21:42:00 +01:00
|
|
|
import (
|
2024-04-06 22:28:26 +01:00
|
|
|
"time"
|
|
|
|
|
2024-04-05 21:42:00 +01:00
|
|
|
"git.fjla.uk/owlboard/go-types/pkg/database"
|
|
|
|
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
2024-04-06 22:28:26 +01:00
|
|
|
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
2024-04-05 21:42:00 +01:00
|
|
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2024-04-04 22:39:09 +01:00
|
|
|
// Processes parsed CIF data and applies the data to the database
|
|
|
|
func processParsedCif(data *parsedData) error {
|
2024-04-09 21:26:56 +01:00
|
|
|
log.Msg.Debug("Starting CIF Processing")
|
2024-04-12 20:43:03 +01:00
|
|
|
log.Msg.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
|
2024-04-09 22:39:35 +01:00
|
|
|
|
|
|
|
// Batch size for processing
|
2024-04-11 20:50:13 +01:00
|
|
|
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
2024-04-09 22:39:35 +01:00
|
|
|
|
|
|
|
// Process deletions in batches
|
|
|
|
for i := 0; i < len(data.sched); i += batchSize {
|
|
|
|
end := i + batchSize
|
|
|
|
if end > len(data.sched) {
|
|
|
|
end = len(data.sched)
|
|
|
|
}
|
|
|
|
deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
|
|
|
for _, item := range data.sched[i:end] {
|
|
|
|
if item.TransactionType == "Delete" {
|
|
|
|
deleteItem := item
|
|
|
|
deleteBatch = append(deleteBatch, &deleteItem)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(deleteBatch) > 0 {
|
|
|
|
err := doDeletions(deleteBatch)
|
|
|
|
if err != nil {
|
|
|
|
log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-04-05 21:42:00 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-09 22:39:35 +01:00
|
|
|
// Process creations in batches
|
|
|
|
for i := 0; i < len(data.sched); i += batchSize {
|
|
|
|
end := i + batchSize
|
|
|
|
if end > len(data.sched) {
|
|
|
|
end = len(data.sched)
|
|
|
|
}
|
|
|
|
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
|
|
|
for _, item := range data.sched[i:end] {
|
|
|
|
if item.TransactionType == "Create" {
|
|
|
|
createItem := item
|
|
|
|
createBatch = append(createBatch, &createItem)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(createBatch) > 0 {
|
|
|
|
err := doCreations(createBatch)
|
|
|
|
if err != nil {
|
|
|
|
log.Msg.Error("Error creating CIF Entries", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2024-04-05 21:42:00 +01:00
|
|
|
}
|
|
|
|
|
2024-04-09 22:39:35 +01:00
|
|
|
log.Msg.Debug("CIF Processing complete")
|
2024-04-05 21:42:00 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create delete query types and pass to the function which batches the deletions
|
|
|
|
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
|
2024-04-13 21:45:24 +01:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
log.Msg.Panic("Panic:", zap.Any("panic", r))
|
|
|
|
}
|
|
|
|
}()
|
2024-04-05 21:42:00 +01:00
|
|
|
deleteQueries := make([]database.DeleteQuery, 0)
|
|
|
|
for _, item := range deletions {
|
|
|
|
query := database.DeleteQuery{
|
2024-04-07 20:59:41 +01:00
|
|
|
ScheduleStartDate: ParseCifDate(&item.ScheduleStartDate, "start"),
|
2024-04-05 21:42:00 +01:00
|
|
|
StpIndicator: item.CifStpIndicator,
|
|
|
|
TrainUid: item.CifTrainUid,
|
|
|
|
}
|
|
|
|
|
|
|
|
deleteQueries = append(deleteQueries, query)
|
|
|
|
}
|
2024-04-06 22:28:26 +01:00
|
|
|
|
|
|
|
err := dbAccess.DeleteCifEntries(deleteQueries)
|
|
|
|
if err != nil {
|
|
|
|
log.Msg.Error("Error deleting documents", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-04-05 21:42:00 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Convert to the correct struct for the database and pass to the function which batches insertions
|
|
|
|
func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
|
|
|
|
createDocuments := make([]database.Service, 0)
|
|
|
|
for _, item := range creations {
|
2024-04-07 20:59:41 +01:00
|
|
|
document, err := ConvertServiceType(item, false)
|
|
|
|
if err != nil {
|
|
|
|
log.Msg.Error("Error converting JsonSchedule to Service type", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
createDocuments = append(createDocuments, *document)
|
2024-04-05 21:42:00 +01:00
|
|
|
}
|
2024-04-06 22:28:26 +01:00
|
|
|
|
|
|
|
err := dbAccess.CreateCifEntries(createDocuments)
|
|
|
|
if err != nil {
|
|
|
|
log.Msg.Error("Error creating documents", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-04-04 22:39:09 +01:00
|
|
|
return nil
|
2024-04-02 21:07:01 +01:00
|
|
|
}
|
2024-04-06 22:28:26 +01:00
|
|
|
|
|
|
|
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
|
|
|
|
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
|
|
|
|
newMetadata := dbAccess.CifMetadata{
|
|
|
|
Doctype: dbAccess.Doctype,
|
|
|
|
LastTimestamp: header.Timestamp,
|
|
|
|
LastUpdate: time.Now().In(londonTimezone),
|
|
|
|
LastSequence: header.Metadata.Sequence,
|
|
|
|
}
|
|
|
|
|
|
|
|
return &newMetadata
|
|
|
|
}
|