From 94e4cd964d21a4eda57dc0b651a361e44b67c5f9 Mon Sep 17 00:00:00 2001 From: Fred Boniface Date: Tue, 9 Apr 2024 21:26:56 +0100 Subject: [PATCH] Improve CIF Parsing to acheive 60% memory reduction --- cif/parse.go | 35 +++++++++++++++-------------------- cif/process.go | 1 + cif/update.go | 2 ++ 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/cif/parse.go b/cif/parse.go index 9b5579c..d339522 100644 --- a/cif/parse.go +++ b/cif/parse.go @@ -11,60 +11,55 @@ import ( ) // Unmarshalls data into the correct types for processing -// This function suffers from extremely high memory usage func parseCifData(data *[]byte) (*parsedData, error) { + log.Msg.Debug("Starting CIF Data parsing") if data == nil { - err := errors.New("unable to parse nil pointer") - return nil, err + return nil, errors.New("unable to parse nil pointer") } - // Split the data into lines - lines := bytes.Split(*data, []byte("\n")) - // Initialise variable for the parsed data + // Initialise data structures var parsed parsedData parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0) parsed.sched = make([]upstreamApi.JsonScheduleV1, 0) - for _, line := range lines { + // Create JSON Decoder + decoder := json.NewDecoder(bytes.NewReader(*data)) - // Skip empty lines to avoid logging errors when there is no error - if len(bytes.TrimSpace(line)) == 0 { - continue - } - - // Map each line for processing + // Iterate over JSON Objects using stream decoder + for decoder.More() { var obj map[string]json.RawMessage - if err := json.Unmarshal(line, &obj); err != nil { - log.Msg.Error("Error decoding line", zap.String("line", string(line)), zap.Error(err)) - continue + if err := decoder.Decode(&obj); err != nil { + log.Msg.Error("Error decoding JSON String") + return nil, err } - // Loop through the mapped data and unmarshal to the correct type + // Handle parsed data for key, value := range obj { switch key { case "JsonTimetableV1": var timetable upstreamApi.JsonTimetableV1 if err := json.Unmarshal(value, &timetable); err != nil { - log.Msg.Error("Unable to parse JSON Timetable", zap.Error(err), zap.String("line", string(value))) + log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err)) continue } parsed.header = timetable case "JsonAssociationV1": var association upstreamApi.JsonAssociationV1 if err := json.Unmarshal(value, &association); err != nil { - log.Msg.Error("Error decoding JSON Association", zap.Error(err)) + log.Msg.Error("Error decoding JSONAssociationV1 object", zap.Error(err)) continue } parsed.assoc = append(parsed.assoc, association) case "JsonScheduleV1": var schedule upstreamApi.JsonScheduleV1 if err := json.Unmarshal(value, &schedule); err != nil { - log.Msg.Error("Error decoding JSON Schedule", zap.Error(err)) + log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err)) continue } parsed.sched = append(parsed.sched, schedule) } } } + log.Msg.Debug("CIF Parsing completed") return &parsed, nil } diff --git a/cif/process.go b/cif/process.go index d644fad..3d73208 100644 --- a/cif/process.go +++ b/cif/process.go @@ -12,6 +12,7 @@ import ( // Processes parsed CIF data and applies the data to the database func processParsedCif(data *parsedData) error { + log.Msg.Debug("Starting CIF Processing") createTasks := make([]*upstreamApi.JsonScheduleV1, 0) deleteTasks := make([]*upstreamApi.JsonScheduleV1, 0) diff --git a/cif/update.go b/cif/update.go index bbdac84..9ffb947 100644 --- a/cif/update.go +++ b/cif/update.go @@ -98,6 +98,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta debugWriteFile(&parsed.header, &parsed.sched) } + log.Msg.Debug("Starting metadata checks") // Check CIF Sequence // Skip if LastSequence is >= to this sequence if metadata.LastSequence >= parsed.header.Metadata.Sequence { @@ -111,6 +112,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta log.Msg.Error("CIF sequence not as expected", zap.Error(err), zap.Int64("LastSequence", metadata.LastSequence), zap.Int64("New Sequence", parsed.header.Metadata.Sequence)) return err } + log.Msg.Debug("Metadata checks complete") // Do further sequence checks - parsed.header.Metadata.Sequence MUST = metadata.LastSequence + 1