Update VSTP package to use processing from CIF package

This commit is contained in:
Fred Boniface
2024-04-15 20:03:48 +01:00
parent edbfbac23c
commit ba8e4e4c72
13 changed files with 103 additions and 270 deletions

View File

@@ -11,16 +11,16 @@ import (
)
// Accepts the CIF data as a stream and outputs parsed data
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
func parseCifDataStream(dataStream io.ReadCloser) (*ParsedData, error) {
defer dataStream.Close()
log.Debug("Starting CIF Datastream parsing")
if dataStream == nil {
return nil, errors.New("unable to parse nil pointer")
}
var parsed parsedData
parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0)
parsed.sched = make([]upstreamApi.JsonScheduleV1, 0)
var parsed ParsedData
parsed.Assoc = make([]upstreamApi.JsonAssociationV1, 0)
parsed.Sched = make([]upstreamApi.JsonScheduleV1, 0)
// Create JSON Decoder
decoder := json.NewDecoder(dataStream)
@@ -42,7 +42,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
continue
}
parsed.header = timetable
parsed.Header = timetable
case "TiplocV1":
// This data is not used and is sourced from CORPUS
continue
@@ -56,7 +56,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
continue
}
parsed.sched = append(parsed.sched, schedule)
parsed.Sched = append(parsed.Sched, schedule)
case "EOF":
log.Debug("Reached EOF")
default:

View File

@@ -9,21 +9,21 @@ import (
)
// Processes parsed CIF data and applies the data to the database
func processParsedCif(data *parsedData) error {
func ProcessParsedCif(data *ParsedData) error {
log.Debug("Starting CIF Processing")
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.Sched)))
// Batch size for processing
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
// Process deletions in batches
for i := 0; i < len(data.sched); i += batchSize {
for i := 0; i < len(data.Sched); i += batchSize {
end := i + batchSize
if end > len(data.sched) {
end = len(data.sched)
if end > len(data.Sched) {
end = len(data.Sched)
}
deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched[i:end] {
for _, item := range data.Sched[i:end] {
if item.TransactionType == "Delete" {
deleteItem := item
deleteBatch = append(deleteBatch, &deleteItem)
@@ -39,13 +39,13 @@ func processParsedCif(data *parsedData) error {
}
// Process creations in batches
for i := 0; i < len(data.sched); i += batchSize {
for i := 0; i < len(data.Sched); i += batchSize {
end := i + batchSize
if end > len(data.sched) {
end = len(data.sched)
if end > len(data.Sched) {
end = len(data.Sched)
}
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched[i:end] {
for _, item := range data.Sched[i:end] {
if item.TransactionType == "Create" {
createItem := item
createBatch = append(createBatch, &createItem)

View File

@@ -6,8 +6,8 @@ import "git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
// database or external API resources should be defined in git.fjla.uk/owlboard/go-types
// Holds parsed data for processing
type parsedData struct {
header upstreamApi.JsonTimetableV1
assoc []upstreamApi.JsonAssociationV1
sched []upstreamApi.JsonScheduleV1
type ParsedData struct {
Header upstreamApi.JsonTimetableV1
Assoc []upstreamApi.JsonAssociationV1
Sched []upstreamApi.JsonScheduleV1
}

View File

@@ -13,6 +13,7 @@ import (
// Replaces all existing CIF Data with a new download
func runCifFullDownload(cfg *helpers.Configuration) error {
preTime := time.Now()
log.Info("Downloading all CIF Data")
// Download CIF Data file
@@ -36,12 +37,12 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
// Process CIF file
err = processParsedCif(parsed)
err = ProcessParsedCif(parsed)
if err != nil {
log.Error("Error processing CIF data", zap.Error(err))
}
newMeta := generateMetadata(&parsed.header)
newMeta := generateMetadata(&parsed.Header)
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
if !ok {
log.Warn("CIF Data updated, but metadata write failed")
@@ -60,6 +61,9 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
log.Info("Out of date services removed", zap.Int64("removal count", count))
}
postTime := time.Now()
updateDuration := postTime.Sub(preTime)
log.Info("Execution time", zap.Duration("duration", updateDuration))
return nil
}
@@ -88,7 +92,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
// Check CIF Metadata
log.Debug("Starting metadata checks")
reason, update := checkMetadata(metadata, &parsed.header)
reason, update := checkMetadata(metadata, &parsed.Header)
if !update {
log.Warn("Update file not processed", zap.String("reason", reason))
continue
@@ -97,12 +101,12 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
// Process CIF file
err = processParsedCif(parsed)
err = ProcessParsedCif(parsed)
if err != nil {
log.Error("Error processing CIF data", zap.Error(err))
}
metadata = generateMetadata(&parsed.header)
metadata = generateMetadata(&parsed.Header)
parsed = nil
}