Compare commits

..

No commits in common. "4459d4d31665419863ef14402cee149c34077306" and "fb510e1408b22fadf38e944f9b6b67a709621fd7" have entirely different histories.

6 changed files with 122 additions and 46 deletions

View File

@ -12,8 +12,7 @@ import (
// Accepts the CIF data as a stream and outputs parsed data // Accepts the CIF data as a stream and outputs parsed data
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) { func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
defer dataStream.Close() log.Msg.Debug("STREAM-Starting CIF Datastream parsing")
log.Msg.Debug("Starting CIF Datastream parsing")
if dataStream == nil { if dataStream == nil {
return nil, errors.New("unable to parse nil pointer") return nil, errors.New("unable to parse nil pointer")
} }
@ -57,8 +56,6 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
continue continue
} }
parsed.sched = append(parsed.sched, schedule) parsed.sched = append(parsed.sched, schedule)
case "EOF":
log.Msg.Info("Reached EOF")
default: default:
log.Msg.Warn("Unknown CIF Data type", zap.String("key", key)) log.Msg.Warn("Unknown CIF Data type", zap.String("key", key))
} }

View File

@ -15,7 +15,7 @@ func processParsedCif(data *parsedData) error {
log.Msg.Debug("Starting CIF Processing") log.Msg.Debug("Starting CIF Processing")
// Batch size for processing // Batch size for processing
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB batchSize := 750 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
// Process deletions in batches // Process deletions in batches
for i := 0; i < len(data.sched); i += batchSize { for i := 0; i < len(data.sched); i += batchSize {

View File

@ -26,6 +26,12 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
log.Msg.Error("Error downloading CIF data", zap.Error(err)) log.Msg.Error("Error downloading CIF data", zap.Error(err))
} }
// DOES NOT WORK WITH NEW DOWNLOAD STREAMING
// If debug mode is on, call debugWriteDownload
// if helpers.Runtime == "debug" {
// debugWriteDownload(dataStream)
// }
// Parse CIF file // Parse CIF file
parsed, err := parseCifDataStream(dataStream) parsed, err := parseCifDataStream(dataStream)
if err != nil { if err != nil {
@ -33,6 +39,11 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
return err return err
} }
// Look to stream data onwards to the parsing function
// Make `data` a nil pointer as it is no longer required
dataStream = nil
// Drop timetable collection // Drop timetable collection
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database. dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.

View File

@ -3,7 +3,6 @@ package corpus
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"io"
"git.fjla.uk/owlboard/go-types/pkg/database" "git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
@ -11,54 +10,49 @@ import (
) )
// Accepts CORPUS data as a byte array and formats it ready for database insertion // Accepts CORPUS data as a byte array and formats it ready for database insertion
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) { func parseCorpusData(jsonData *[]byte) (*[]database.CorpusEntry, error) {
defer stream.Close()
log.Msg.Debug("Starting CORPUS Data parsing") log.Msg.Debug("Starting CORPUS Data parsing")
var corpusEntries []database.CorpusEntry // Initialise data structure
decoder := json.NewDecoder(stream) var dataMap map[string]interface{}
// Expect an object at the root of the JSON stream // Create JSON
if _, err := decoder.Token(); err != nil { err := json.Unmarshal(*jsonData, &dataMap)
if err != nil {
log.Msg.Error("Unable to unmarshal CORPUS data", zap.Error(err))
}
corpusDataArrayInterface, ok := dataMap["TIPLOCDATA"]
if !ok {
err := errors.New("corpus Data not in expected format")
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err)) log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err return nil, err
} }
// Search for the "TIPLOCDATA" key corpusDataArray, ok := corpusDataArrayInterface.([]interface{})
for decoder.More() { if !ok {
// Decode the next JSON token err := errors.New("corpus data missing the data array")
if tok, err := decoder.Token(); err != nil { log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err)) return nil, err
return nil, err
} else if tok == "TIPLOCDATA" {
// Found the "TIPLOCDATA" key, expect the associated array
if !decoder.More() {
err := errors.New("missing array after TIPLOCDATA key")
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Start reading the array associated with the "TIPLOCDATA" key
if _, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Iterate over the JSON array
for decoder.More() {
var corpusEntry database.CorpusEntry
if err := decoder.Decode(&corpusEntry); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
corpusEntries = append(corpusEntries, corpusEntry)
}
break // Exit loop after processing "TIPLOCDATA" array
}
} }
log.Msg.Debug("CORPUS parsing complete") var corpusEntries []database.CorpusEntry
for _, item := range corpusDataArray {
jsonItem, err := json.Marshal(item)
if err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
var corpusEntry database.CorpusEntry
err = json.Unmarshal(jsonItem, &corpusEntry)
if err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
corpusEntries = append(corpusEntries, corpusEntry)
}
return &corpusEntries, nil return &corpusEntries, nil
} }

View File

@ -10,7 +10,7 @@ import (
// Runs all stages of the CORPUS Update process // Runs all stages of the CORPUS Update process
func RunCorpusUpdate(cfg *helpers.Configuration) error { func RunCorpusUpdate(cfg *helpers.Configuration) error {
resp, err := nrod.NrodStream(url, cfg) resp, err := nrod.NrodDownload(url, cfg)
if err != nil { if err != nil {
log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err)) log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
return err return err

74
nrod/download.go Normal file
View File

@ -0,0 +1,74 @@
package nrod
import (
"compress/gzip"
"fmt"
"io"
"net/http"
"time"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Downloads NROD Data over HTTP from the given URL, extracted data is returned
func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) {
log.Msg.Debug("Fetching NROD data", zap.String("Request URL", url))
client := http.Client{
Timeout: time.Second * 10,
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Msg.Error("Error creating HTTP Request", zap.String("Request URL", url), zap.Error(err))
return nil, err
}
req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass))
resp, err := client.Do(req)
if err != nil {
log.Msg.Error("Error carrying out HTTP Request", zap.String("Request URL", url), zap.Error(err))
return nil, err
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
log.Msg.Error("Non-Successful status code from http response", zap.String("Request URL", url), zap.Error(err))
return nil, err
}
// Yes, I know `readedData` is not proper English. But readData reads more like a verb action.
readedData, err := nrodExtract(resp)
if err != nil {
log.Msg.Error("Unable to read response data")
return nil, err
}
return readedData, nil
}
// Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array
func nrodExtract(resp *http.Response) (*[]byte, error) {
log.Msg.Debug("Extracting HTTP Response Data")
gzReader, err := gzip.NewReader(resp.Body)
if err != nil {
log.Msg.Warn("Unable to create GZIP Reader, data probably not GZIPPED")
data, err := io.ReadAll(resp.Body)
if err != nil {
log.Msg.Error("Unable to read response body")
return nil, err
}
return &data, nil
}
defer gzReader.Close()
log.Msg.Debug("GZIP Reader Opened")
extractedData, err := io.ReadAll(gzReader)
if err != nil {
log.Msg.Error("Failed to read GZIPped data", zap.Error(err))
}
return &extractedData, nil
}