From f90321927602cb1ba49dd861cf91c5a1a32191a5 Mon Sep 17 00:00:00 2001
From: Fred Boniface
Date: Wed, 10 Apr 2024 20:19:16 +0100
Subject: [PATCH] Streaming data into the CIF Parse function reduces memory
 down to 4.1GB - also looking to stream the parsed data directly into the
 database if feasible.

---
Reviewer revision of the original patch: closes the response body on every
path, removes a nil dereference after a failed request, and documents the new
functions. The index lines below still carry the original blob hashes.

 cif/parse.go     | 64 ++++++++++++++++++++++++++++++++++
 cif/update.go    | 12 +++----
 nrod/download.go |  4 +--
 nrod/streams.go  | 91 ++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 163 insertions(+), 8 deletions(-)
 create mode 100644 nrod/streams.go

diff --git a/cif/parse.go b/cif/parse.go
index d339522..faf961c 100644
--- a/cif/parse.go
+++ b/cif/parse.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"errors"
+	"io"
 
 	"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
 	"git.fjla.uk/owlboard/timetable-mgr/log"
@@ -63,3 +64,66 @@ func parseCifData(data *[]byte) (*parsedData, error) {
 	log.Msg.Debug("CIF Parsing completed")
 	return &parsed, nil
 }
+
+// parseCifDataStream decodes a stream of concatenated JSON objects read from
+// dataStream. Each object is keyed by record type: "JsonTimetableV1" sets the
+// header, "JsonAssociationV1" and "JsonScheduleV1" records are appended to the
+// assoc and sched slices, and any other key is ignored. A record that fails
+// to unmarshal is logged and skipped; a decoder error aborts parsing and is
+// returned. A nil dataStream is rejected with an error. The stream is closed
+// before this function returns.
+func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
+	log.Msg.Debug("STREAM-Starting CIF Datastream parsing")
+	if dataStream == nil {
+		return nil, errors.New("unable to parse nil pointer")
+	}
+	// The caller in cif/update.go only drops its reference, so close the
+	// stream here once parsing finishes, whatever the outcome.
+	defer dataStream.Close()
+
+	// Initialise data structures
+	var parsed parsedData
+	parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0)
+	parsed.sched = make([]upstreamApi.JsonScheduleV1, 0)
+
+	// Create JSON Decoder
+	decoder := json.NewDecoder(dataStream)
+
+	// Iterate over JSON Objects using stream decoder
+	for decoder.More() {
+		var obj map[string]json.RawMessage
+		if err := decoder.Decode(&obj); err != nil {
+			log.Msg.Error("Error decoding JSON String", zap.Error(err))
+			return nil, err
+		}
+
+		// Handle parsed data
+		for key, value := range obj {
+			switch key {
+			case "JsonTimetableV1":
+				var timetable upstreamApi.JsonTimetableV1
+				if err := json.Unmarshal(value, &timetable); err != nil {
+					log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
+					continue
+				}
+				parsed.header = timetable
+			case "JsonAssociationV1":
+				var association upstreamApi.JsonAssociationV1
+				if err := json.Unmarshal(value, &association); err != nil {
+					log.Msg.Error("Error decoding JSONAssociationV1 object", zap.Error(err))
+					continue
+				}
+				parsed.assoc = append(parsed.assoc, association)
+			case "JsonScheduleV1":
+				var schedule upstreamApi.JsonScheduleV1
+				if err := json.Unmarshal(value, &schedule); err != nil {
+					log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
+					continue
+				}
+				parsed.sched = append(parsed.sched, schedule)
+			}
+		}
+	}
+	log.Msg.Debug("CIF Parsing completed")
+	return &parsed, nil
+}
diff --git a/cif/update.go b/cif/update.go
index 6ed091d..8f3995c 100644
--- a/cif/update.go
+++ b/cif/update.go
@@ -20,25 +20,25 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
 	if err != nil {
 		log.Msg.Error("Error getting download URL", zap.Error(err))
 	}
-	data, err := nrod.NrodDownload(url, cfg)
+	dataStream, err := nrod.NrodStream(url, cfg)
 	if err != nil {
 		log.Msg.Error("Error downloading CIF data", zap.Error(err))
 	}
 
 	// If debug mode is on, call debugWriteDownload
-	if helpers.Runtime == "debug" {
-		debugWriteDownload(data)
-	}
+	// if helpers.Runtime == "debug" {
+	// 	debugWriteDownload(dataStream)
+	// }
 
 	// Parse CIF file
-	parsed, err := parseCifData(data)
+	parsed, err := parseCifDataStream(dataStream)
 	if err != nil {
 		log.Msg.Error("Error parsing CIF data", zap.Error(err))
 		return err
 	}
 
 	// Make `data` a nil pointer as it is no longer required
-	data = nil
+	dataStream = nil
 
 	// Drop timetable collection
 	dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
diff --git a/nrod/download.go b/nrod/download.go
index 240af66..4a8d697 100644
--- a/nrod/download.go
+++ b/nrod/download.go
@@ -39,7 +39,7 @@ func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) {
 	}
 
 	// Yes, I know `readedData` is not proper English. But readData reads more like a verb action.
-	readedData, err := nrodExtract(*resp)
+	readedData, err := nrodExtract(resp)
 	if err != nil {
 		log.Msg.Error("Unable to read response data")
 		return nil, err
@@ -49,7 +49,7 @@ func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) {
 }
 
 // Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array
-func nrodExtract(resp http.Response) (*[]byte, error) {
+func nrodExtract(resp *http.Response) (*[]byte, error) {
 	log.Msg.Debug("Extracting HTTP Response Data")
 	gzReader, err := gzip.NewReader(resp.Body)
 	if err != nil {
diff --git a/nrod/streams.go b/nrod/streams.go
new file mode 100644
index 0000000..a4d6b61
--- /dev/null
+++ b/nrod/streams.go
@@ -0,0 +1,91 @@
+package nrod
+
+import (
+	"compress/gzip"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	"git.fjla.uk/owlboard/timetable-mgr/helpers"
+	"git.fjla.uk/owlboard/timetable-mgr/log"
+	"go.uber.org/zap"
+)
+
+// gzipReadCloser pairs a gzip reader with the HTTP body it reads from.
+// gzip.Reader.Close does not close the underlying reader, so closing only
+// the gzip reader would leave the response body open.
+type gzipReadCloser struct {
+	*gzip.Reader
+	body io.Closer
+}
+
+// Close closes the gzip reader and then the underlying body, returning the
+// first error encountered.
+func (g *gzipReadCloser) Close() error {
+	gzErr := g.Reader.Close()
+	bodyErr := g.body.Close()
+	if gzErr != nil {
+		return gzErr
+	}
+	return bodyErr
+}
+
+// NrodStream issues an authenticated GET for url and returns the gunzipped
+// response body as a stream. The caller must close the returned stream. On
+// any error the returned stream is nil.
+func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
+	log.Msg.Debug("Fetching NROD data stream", zap.String("Request URL", url))
+
+	// http.Client.Timeout also bounds reading the response body, so the whole
+	// stream has to be consumed within this window.
+	client := http.Client{
+		Timeout: time.Second * 300,
+	}
+
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		log.Msg.Error("Error creating HTTP Request", zap.Error(err))
+		return nil, err
+	}
+
+	req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass))
+
+	resp, err := client.Do(req)
+	if err != nil {
+		// resp is normally nil when Do fails, so it must not be dereferenced.
+		log.Msg.Error("Error carrying out HTTP Request", zap.Error(err))
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		// The body is not handed to the caller on this path, so close it here.
+		resp.Body.Close()
+		err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+		log.Msg.Error("Non-successful status code", zap.Error(err))
+		return nil, err
+	}
+
+	// Hand the response to the extractor and return its stream and error directly.
+	return NrodStreamExtract(resp)
+}
+
+// NrodStreamExtract wraps resp.Body in a gzip reader and returns a stream
+// whose Close releases both the gzip reader and the body. If the body is not
+// valid gzip, the body is closed and a nil stream is returned with the error.
+func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) {
+	log.Msg.Debug("Extracting NROD Download")
+
+	log.Msg.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
+
+	gzReader, err := gzip.NewReader(resp.Body)
+	if err != nil {
+		// gzip.NewReader has already read from the body, so it cannot be
+		// handed back as a usable raw stream; release it instead.
+		log.Msg.Warn("Unable to create GZIP Reader, data probably not gzipped")
+		resp.Body.Close()
+		return nil, err
+	}
+
+	return &gzipReadCloser{Reader: gzReader, body: resp.Body}, nil
+}