Compare commits
2 Commits
fb510e1408
...
4459d4d316
Author | SHA1 | Date |
---|---|---|
Fred Boniface | 4459d4d316 | |
Fred Boniface | b2f82b0250 |
|
@ -12,7 +12,8 @@ import (
|
||||||
|
|
||||||
// Accepts the CIF data as a stream and outputs parsed data
|
// Accepts the CIF data as a stream and outputs parsed data
|
||||||
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
log.Msg.Debug("STREAM-Starting CIF Datastream parsing")
|
defer dataStream.Close()
|
||||||
|
log.Msg.Debug("Starting CIF Datastream parsing")
|
||||||
if dataStream == nil {
|
if dataStream == nil {
|
||||||
return nil, errors.New("unable to parse nil pointer")
|
return nil, errors.New("unable to parse nil pointer")
|
||||||
}
|
}
|
||||||
|
@ -56,6 +57,8 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parsed.sched = append(parsed.sched, schedule)
|
parsed.sched = append(parsed.sched, schedule)
|
||||||
|
case "EOF":
|
||||||
|
log.Msg.Info("Reached EOF")
|
||||||
default:
|
default:
|
||||||
log.Msg.Warn("Unknown CIF Data type", zap.String("key", key))
|
log.Msg.Warn("Unknown CIF Data type", zap.String("key", key))
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ func processParsedCif(data *parsedData) error {
|
||||||
log.Msg.Debug("Starting CIF Processing")
|
log.Msg.Debug("Starting CIF Processing")
|
||||||
|
|
||||||
// Batch size for processing
|
// Batch size for processing
|
||||||
batchSize := 750 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
||||||
|
|
||||||
// Process deletions in batches
|
// Process deletions in batches
|
||||||
for i := 0; i < len(data.sched); i += batchSize {
|
for i := 0; i < len(data.sched); i += batchSize {
|
||||||
|
|
|
@ -26,12 +26,6 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
log.Msg.Error("Error downloading CIF data", zap.Error(err))
|
log.Msg.Error("Error downloading CIF data", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// DOES NOT WORK WITH NEW DOWNLOAD STREAMING
|
|
||||||
// If debug mode is on, call debugWriteDownload
|
|
||||||
// if helpers.Runtime == "debug" {
|
|
||||||
// debugWriteDownload(dataStream)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Parse CIF file
|
// Parse CIF file
|
||||||
parsed, err := parseCifDataStream(dataStream)
|
parsed, err := parseCifDataStream(dataStream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -39,11 +33,6 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look to stream data onwards to the parsing function
|
|
||||||
|
|
||||||
// Make `data` a nil pointer as it is no longer required
|
|
||||||
dataStream = nil
|
|
||||||
|
|
||||||
// Drop timetable collection
|
// Drop timetable collection
|
||||||
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
|
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package corpus
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"io"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
|
@ -10,50 +11,55 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Accepts CORPUS data as a byte array and formats it ready for database insertion
|
// Accepts CORPUS data as a byte array and formats it ready for database insertion
|
||||||
func parseCorpusData(jsonData *[]byte) (*[]database.CorpusEntry, error) {
|
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
|
||||||
|
defer stream.Close()
|
||||||
|
|
||||||
log.Msg.Debug("Starting CORPUS Data parsing")
|
log.Msg.Debug("Starting CORPUS Data parsing")
|
||||||
|
|
||||||
// Initialise data structure
|
|
||||||
var dataMap map[string]interface{}
|
|
||||||
|
|
||||||
// Create JSON
|
|
||||||
err := json.Unmarshal(*jsonData, &dataMap)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Unable to unmarshal CORPUS data", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
corpusDataArrayInterface, ok := dataMap["TIPLOCDATA"]
|
|
||||||
if !ok {
|
|
||||||
err := errors.New("corpus Data not in expected format")
|
|
||||||
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
corpusDataArray, ok := corpusDataArrayInterface.([]interface{})
|
|
||||||
if !ok {
|
|
||||||
err := errors.New("corpus data missing the data array")
|
|
||||||
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var corpusEntries []database.CorpusEntry
|
var corpusEntries []database.CorpusEntry
|
||||||
for _, item := range corpusDataArray {
|
decoder := json.NewDecoder(stream)
|
||||||
jsonItem, err := json.Marshal(item)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var corpusEntry database.CorpusEntry
|
// Expect an object at the root of the JSON stream
|
||||||
err = json.Unmarshal(jsonItem, &corpusEntry)
|
if _, err := decoder.Token(); err != nil {
|
||||||
if err != nil {
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
return nil, err
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
corpusEntries = append(corpusEntries, corpusEntry)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Search for the "TIPLOCDATA" key
|
||||||
|
for decoder.More() {
|
||||||
|
// Decode the next JSON token
|
||||||
|
if tok, err := decoder.Token(); err != nil {
|
||||||
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
} else if tok == "TIPLOCDATA" {
|
||||||
|
// Found the "TIPLOCDATA" key, expect the associated array
|
||||||
|
if !decoder.More() {
|
||||||
|
err := errors.New("missing array after TIPLOCDATA key")
|
||||||
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start reading the array associated with the "TIPLOCDATA" key
|
||||||
|
if _, err := decoder.Token(); err != nil {
|
||||||
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate over the JSON array
|
||||||
|
for decoder.More() {
|
||||||
|
var corpusEntry database.CorpusEntry
|
||||||
|
if err := decoder.Decode(&corpusEntry); err != nil {
|
||||||
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
corpusEntries = append(corpusEntries, corpusEntry)
|
||||||
|
}
|
||||||
|
break // Exit loop after processing "TIPLOCDATA" array
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Msg.Debug("CORPUS parsing complete")
|
||||||
|
|
||||||
return &corpusEntries, nil
|
return &corpusEntries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
|
|
||||||
// Runs all stages of the CORPUS Update process
|
// Runs all stages of the CORPUS Update process
|
||||||
func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
||||||
resp, err := nrod.NrodDownload(url, cfg)
|
resp, err := nrod.NrodStream(url, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
|
log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1,74 +0,0 @@
|
||||||
package nrod
|
|
||||||
|
|
||||||
import (
|
|
||||||
"compress/gzip"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Downloads NROD Data over HTTP from the given URL, extracted data is returned
|
|
||||||
func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) {
|
|
||||||
log.Msg.Debug("Fetching NROD data", zap.String("Request URL", url))
|
|
||||||
client := http.Client{
|
|
||||||
Timeout: time.Second * 10,
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Error creating HTTP Request", zap.String("Request URL", url), zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass))
|
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Error carrying out HTTP Request", zap.String("Request URL", url), zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
|
||||||
log.Msg.Error("Non-Successful status code from http response", zap.String("Request URL", url), zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Yes, I know `readedData` is not proper English. But readData reads more like a verb action.
|
|
||||||
readedData, err := nrodExtract(resp)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Unable to read response data")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return readedData, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array
|
|
||||||
func nrodExtract(resp *http.Response) (*[]byte, error) {
|
|
||||||
log.Msg.Debug("Extracting HTTP Response Data")
|
|
||||||
gzReader, err := gzip.NewReader(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Warn("Unable to create GZIP Reader, data probably not GZIPPED")
|
|
||||||
data, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Unable to read response body")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &data, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
defer gzReader.Close()
|
|
||||||
|
|
||||||
log.Msg.Debug("GZIP Reader Opened")
|
|
||||||
extractedData, err := io.ReadAll(gzReader)
|
|
||||||
if err != nil {
|
|
||||||
log.Msg.Error("Failed to read GZIPped data", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return &extractedData, nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue