Compare commits

...

4 Commits

7 changed files with 98 additions and 101 deletions

View File

@ -1,12 +1,10 @@
package cif package cif
import ( import (
"encoding/json"
"errors" "errors"
"os" "os"
"time" "time"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/helpers" "git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap" "go.uber.org/zap"
@ -108,34 +106,6 @@ func parseDaysRun(daysBinary *string) []string {
return result return result
} }
func debugWriteFile(header *upstreamApi.JsonTimetableV1, schedule *[]upstreamApi.JsonScheduleV1) {
if helpers.Runtime == "debug" {
log.Msg.Debug("Writing CIF Header and Schedule elements to file")
filepath := "./cif_debug_data/"
filename := time.Now().In(londonTimezone).Format("2006-01-02_15:04:05_ParsedCIF")
data, err := json.MarshalIndent(map[string]interface{}{
"header": header,
"schedule": schedule,
}, "", " ")
if err != nil {
log.Msg.Error("Error marshalling data", zap.Error(err))
return
}
err = os.MkdirAll(filepath, 0777)
if err != nil {
log.Msg.Error("Error creating directory", zap.Error(err))
return
}
writeErr := os.WriteFile(filepath+filename+".json", data, 0777)
if writeErr != nil {
log.Msg.Error("Error writing debug file to disk", zap.Error(writeErr))
}
}
}
func debugWriteDownload(input *[]byte) { func debugWriteDownload(input *[]byte) {
if helpers.Runtime == "debug" { if helpers.Runtime == "debug" {
log.Msg.Debug("Writing CIF Download to file") log.Msg.Debug("Writing CIF Download to file")
@ -148,9 +118,25 @@ func debugWriteDownload(input *[]byte) {
return return
} }
err = os.WriteFile(filepath+filename+".jsonl", *input, 0777) file, err := os.Create(filepath + filename + ".jsonl")
if err != nil { if err != nil {
log.Msg.Error("Error writing debug file to disk", zap.Error(err)) log.Msg.Error("Error creating file", zap.Error(err))
return
}
defer file.Close()
// Write data to file in smaller chunks
bufferSize := 4096 // Adjust the buffer size as needed
for i := 0; i < len(*input); i += bufferSize {
end := i + bufferSize
if end > len(*input) {
end = len(*input)
}
_, err := file.Write((*input)[i:end])
if err != nil {
log.Msg.Error("Error writing data to file", zap.Error(err))
return
}
} }
} }
} }

View File

@ -3,6 +3,7 @@ package cif
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi" "git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
@ -10,55 +11,55 @@ import (
) )
// Unmarshalls data into the correct types for processing // Unmarshalls data into the correct types for processing
func parseCifData(data []byte) (*parsedData, error) { func parseCifData(data *[]byte) (*parsedData, error) {
// Split the data into lines log.Msg.Debug("Starting CIF Data parsing")
lines := bytes.Split(data, []byte("\n")) if data == nil {
return nil, errors.New("unable to parse nil pointer")
}
// Initialise variable for the parsed data // Initialise data structures
var parsed parsedData var parsed parsedData
parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0) parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0)
parsed.sched = make([]upstreamApi.JsonScheduleV1, 0) parsed.sched = make([]upstreamApi.JsonScheduleV1, 0)
for _, line := range lines { // Create JSON Decoder
decoder := json.NewDecoder(bytes.NewReader(*data))
// Skip empty lines to avoid logging errors when there is no error // Iterate over JSON Objects using stream decoder
if len(bytes.TrimSpace(line)) == 0 { for decoder.More() {
continue
}
// Map each line for processing
var obj map[string]json.RawMessage var obj map[string]json.RawMessage
if err := json.Unmarshal(line, &obj); err != nil { if err := decoder.Decode(&obj); err != nil {
log.Msg.Error("Error decoding line", zap.String("line", string(line)), zap.Error(err)) log.Msg.Error("Error decoding JSON String")
continue return nil, err
} }
// Loop through the mapped data and unmarshal to the correct type // Handle parsed data
for key, value := range obj { for key, value := range obj {
switch key { switch key {
case "JsonTimetableV1": case "JsonTimetableV1":
var timetable upstreamApi.JsonTimetableV1 var timetable upstreamApi.JsonTimetableV1
if err := json.Unmarshal(value, &timetable); err != nil { if err := json.Unmarshal(value, &timetable); err != nil {
log.Msg.Error("Unable to parse JSON Timetable", zap.Error(err), zap.String("line", string(value))) log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
continue continue
} }
parsed.header = timetable parsed.header = timetable
case "JsonAssociationV1": case "JsonAssociationV1":
var association upstreamApi.JsonAssociationV1 var association upstreamApi.JsonAssociationV1
if err := json.Unmarshal(value, &association); err != nil { if err := json.Unmarshal(value, &association); err != nil {
log.Msg.Error("Error decoding JSON Association", zap.Error(err)) log.Msg.Error("Error decoding JSONAssociationV1 object", zap.Error(err))
continue continue
} }
parsed.assoc = append(parsed.assoc, association) parsed.assoc = append(parsed.assoc, association)
case "JsonScheduleV1": case "JsonScheduleV1":
var schedule upstreamApi.JsonScheduleV1 var schedule upstreamApi.JsonScheduleV1
if err := json.Unmarshal(value, &schedule); err != nil { if err := json.Unmarshal(value, &schedule); err != nil {
log.Msg.Error("Error decoding JSON Schedule", zap.Error(err)) log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
continue continue
} }
parsed.sched = append(parsed.sched, schedule) parsed.sched = append(parsed.sched, schedule)
} }
} }
} }
log.Msg.Debug("CIF Parsing completed")
return &parsed, nil return &parsed, nil
} }

View File

@ -12,40 +12,61 @@ import (
// Processes parsed CIF data and applies the data to the database // Processes parsed CIF data and applies the data to the database
func processParsedCif(data *parsedData) error { func processParsedCif(data *parsedData) error {
createTasks := make([]*upstreamApi.JsonScheduleV1, 0) log.Msg.Debug("Starting CIF Processing")
deleteTasks := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched { // Batch size for processing
switch item.TransactionType { batchSize := 750 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
case "Delete":
deleteItem := item // Create new variable to ensure repetition of pointers // Process deletions in batches
deleteTasks = append(deleteTasks, &deleteItem) for i := 0; i < len(data.sched); i += batchSize {
case "Create": end := i + batchSize
createItem := item // Create new variable to ensure repetition of pointers if end > len(data.sched) {
createTasks = append(createTasks, &createItem) end = len(data.sched)
default: }
log.Msg.Error("Unknown transaction type in CIF Schedule", zap.String("TransactionType", item.TransactionType)) deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.sched[i:end] {
if item.TransactionType == "Delete" {
deleteItem := item
deleteBatch = append(deleteBatch, &deleteItem)
}
}
if len(deleteBatch) > 0 {
err := doDeletions(deleteBatch)
if err != nil {
log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
return err
}
} }
} }
err := doDeletions(deleteTasks) // Process creations in batches
if err != nil { for i := 0; i < len(data.sched); i += batchSize {
log.Msg.Error("Error deleting CIF Entries", zap.Error(err)) end := i + batchSize
return err if end > len(data.sched) {
} end = len(data.sched)
err = doCreations(createTasks) }
if err != nil { createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
log.Msg.Error("Error creating CIF Entries", zap.Error(err)) for _, item := range data.sched[i:end] {
return err if item.TransactionType == "Create" {
createItem := item
createBatch = append(createBatch, &createItem)
}
}
if len(createBatch) > 0 {
err := doCreations(createBatch)
if err != nil {
log.Msg.Error("Error creating CIF Entries", zap.Error(err))
return err
}
}
} }
log.Msg.Debug("CIF Processing complete")
return nil return nil
} }
// Create delete query types and pass to the function which batches the deletions // Create delete query types and pass to the function which batches the deletions
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error { func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
log.Msg.Info("Preparing CIF update Delete tasks", zap.Int("Delete task count", len(deletions)))
deleteQueries := make([]database.DeleteQuery, 0) deleteQueries := make([]database.DeleteQuery, 0)
for _, item := range deletions { for _, item := range deletions {
query := database.DeleteQuery{ query := database.DeleteQuery{
@ -68,8 +89,6 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
// Convert to the correct struct for the database and pass to the function which batches insertions // Convert to the correct struct for the database and pass to the function which batches insertions
func doCreations(creations []*upstreamApi.JsonScheduleV1) error { func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
log.Msg.Info("Preparing CIF update Create tasks", zap.Int("Create task count", len(creations)))
createDocuments := make([]database.Service, 0) createDocuments := make([]database.Service, 0)
for _, item := range creations { for _, item := range creations {
document, err := ConvertServiceType(item, false) document, err := ConvertServiceType(item, false)

View File

@ -27,7 +27,7 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
// If debug mode is on, call debugWriteDownload // If debug mode is on, call debugWriteDownload
if helpers.Runtime == "debug" { if helpers.Runtime == "debug" {
debugWriteDownload(&data) debugWriteDownload(data)
} }
// Parse CIF file // Parse CIF file
@ -37,14 +37,12 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
return err return err
} }
// Make `data` a nil pointer as it is no longer required
data = nil
// Drop timetable collection // Drop timetable collection
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database. dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
// If debug mode is on, call debugWriteFile
if helpers.Runtime == "debug" {
debugWriteFile(&parsed.header, &parsed.sched)
}
// Process CIF file // Process CIF file
err = processParsedCif(parsed) err = processParsedCif(parsed)
if err != nil { if err != nil {
@ -77,7 +75,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
// If debug mode is on, call debugWriteDownload // If debug mode is on, call debugWriteDownload
if helpers.Runtime == "debug" { if helpers.Runtime == "debug" {
debugWriteDownload(&data) debugWriteDownload(data)
} }
// Parse CIF file // Parse CIF file
@ -87,11 +85,10 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
return err return err
} }
// If debug mode is on, call debugWriteFile // Make `data` a nil pointer as it is no longer required
if helpers.Runtime == "debug" { data = nil
debugWriteFile(&parsed.header, &parsed.sched)
}
log.Msg.Debug("Starting metadata checks")
// Check CIF Sequence // Check CIF Sequence
// Skip if LastSequence is >= to this sequence // Skip if LastSequence is >= to this sequence
if metadata.LastSequence >= parsed.header.Metadata.Sequence { if metadata.LastSequence >= parsed.header.Metadata.Sequence {
@ -105,6 +102,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
log.Msg.Error("CIF sequence not as expected", zap.Error(err), zap.Int64("LastSequence", metadata.LastSequence), zap.Int64("New Sequence", parsed.header.Metadata.Sequence)) log.Msg.Error("CIF sequence not as expected", zap.Error(err), zap.Int64("LastSequence", metadata.LastSequence), zap.Int64("New Sequence", parsed.header.Metadata.Sequence))
return err return err
} }
log.Msg.Debug("Metadata checks complete")
// Do further sequence checks - parsed.header.Metadata.Sequence MUST = metadata.LastSequence + 1 // Do further sequence checks - parsed.header.Metadata.Sequence MUST = metadata.LastSequence + 1
@ -123,7 +121,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
} }
// Wraps nrod.NrodDownload() into a function which can handle downloading data for a given day // Wraps nrod.NrodDownload() into a function which can handle downloading data for a given day
func fetchUpdate(t time.Time, cfg *helpers.Configuration) ([]byte, error) { func fetchUpdate(t time.Time, cfg *helpers.Configuration) (*[]byte, error) {
url, err := getUpdateUrl("daily") url, err := getUpdateUrl("daily")
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -16,7 +16,7 @@ func RunCorpusUpdate(cfg *helpers.Configuration) error {
return err return err
} }
unsortedCorpusData, err := parseCorpusData(&resp) unsortedCorpusData, err := parseCorpusData(resp)
if err != nil { if err != nil {
log.Msg.Error("Error parsing Corpus data", zap.Error(err)) log.Msg.Error("Error parsing Corpus data", zap.Error(err))
return err return err

View File

@ -91,16 +91,12 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter)) bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter))
} }
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions))) _, err := collection.BulkWrite(context.Background(), bulkDeletions)
result, err := collection.BulkWrite(context.Background(), bulkDeletions)
if err != nil { if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err)) log.Msg.Error("Error deleting documents", zap.Error(err))
return err return err
} }
log.Msg.Info("Deleted CIF Documents", zap.Int64("Deletion count", result.DeletedCount))
return nil return nil
} }
@ -123,14 +119,11 @@ func CreateCifEntries(schedules []database.Service) error {
bulkWriteOptions := options.BulkWrite().SetOrdered(false) bulkWriteOptions := options.BulkWrite().SetOrdered(false)
log.Msg.Info("Running `Create` tasks from CIF Update", zap.Int("Documents to create", len(schedules))) _, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
result, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
if err != nil { if err != nil {
log.Msg.Error("Error inserting documents", zap.Error(err)) log.Msg.Error("Error inserting documents", zap.Error(err))
return err return err
} }
log.Msg.Info("Inserted CIF Documents", zap.Int64("Insertion count", result.InsertedCount))
return nil return nil
} }

View File

@ -13,7 +13,7 @@ import (
) )
// Downloads NROD Data over HTTP from the given URL, extracted data is returned // Downloads NROD Data over HTTP from the given URL, extracted data is returned
func NrodDownload(url string, cfg *helpers.Configuration) ([]byte, error) { func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) {
log.Msg.Debug("Fetching NROD data", zap.String("Request URL", url)) log.Msg.Debug("Fetching NROD data", zap.String("Request URL", url))
client := http.Client{ client := http.Client{
Timeout: time.Second * 10, Timeout: time.Second * 10,
@ -49,7 +49,7 @@ func NrodDownload(url string, cfg *helpers.Configuration) ([]byte, error) {
} }
// Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array // Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array
func nrodExtract(resp http.Response) ([]byte, error) { func nrodExtract(resp http.Response) (*[]byte, error) {
log.Msg.Debug("Extracting HTTP Response Data") log.Msg.Debug("Extracting HTTP Response Data")
gzReader, err := gzip.NewReader(resp.Body) gzReader, err := gzip.NewReader(resp.Body)
if err != nil { if err != nil {
@ -59,7 +59,7 @@ func nrodExtract(resp http.Response) ([]byte, error) {
log.Msg.Error("Unable to read response body") log.Msg.Error("Unable to read response body")
return nil, err return nil, err
} }
return data, nil return &data, nil
} }
defer gzReader.Close() defer gzReader.Close()
@ -70,5 +70,5 @@ func nrodExtract(resp http.Response) ([]byte, error) {
log.Msg.Error("Failed to read GZIPped data", zap.Error(err)) log.Msg.Error("Failed to read GZIPped data", zap.Error(err))
} }
return extractedData, nil return &extractedData, nil
} }