From ba8e4e4c72b8ed429cf03995c1c3917427b58603 Mon Sep 17 00:00:00 2001 From: Fred Boniface Date: Mon, 15 Apr 2024 20:03:48 +0100 Subject: [PATCH] Update VSTP package to use processing from CIF package --- cif/parse.go | 12 ++-- cif/process.go | 20 +++--- cif/types.go | 8 +-- cif/update.go | 14 ++-- dbAccess/access.go | 30 --------- dbAccess/cif.go | 14 ++-- dbAccess/common.go | 8 +-- dbAccess/contants.go | 4 +- dbAccess/corpus.go | 4 +- vstp/actions.go | 72 +++++++++----------- vstp/handler.go | 10 ++- vstp/parser.go | 157 ------------------------------------------- vstp/unmarshaller.go | 20 ++++++ 13 files changed, 103 insertions(+), 270 deletions(-) delete mode 100644 vstp/parser.go create mode 100644 vstp/unmarshaller.go diff --git a/cif/parse.go b/cif/parse.go index 9bb1990..f3c6737 100644 --- a/cif/parse.go +++ b/cif/parse.go @@ -11,16 +11,16 @@ import ( ) // Accepts the CIF data as a stream and outputs parsed data -func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) { +func parseCifDataStream(dataStream io.ReadCloser) (*ParsedData, error) { defer dataStream.Close() log.Debug("Starting CIF Datastream parsing") if dataStream == nil { return nil, errors.New("unable to parse nil pointer") } - var parsed parsedData - parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0) - parsed.sched = make([]upstreamApi.JsonScheduleV1, 0) + var parsed ParsedData + parsed.Assoc = make([]upstreamApi.JsonAssociationV1, 0) + parsed.Sched = make([]upstreamApi.JsonScheduleV1, 0) // Create JSON Decoder decoder := json.NewDecoder(dataStream) @@ -42,7 +42,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) { log.Error("Error decoding JSONTimetableV1 object", zap.Error(err)) continue } - parsed.header = timetable + parsed.Header = timetable case "TiplocV1": // This data is not used and is sourced from CORPUS continue @@ -56,7 +56,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) { log.Error("Error decoding JSONScheduleV1 object", zap.Error(err)) continue } - parsed.sched = append(parsed.sched, schedule) + parsed.Sched = append(parsed.Sched, schedule) case "EOF": log.Debug("Reached EOF") default: diff --git a/cif/process.go b/cif/process.go index ee10b05..0b7dada 100644 --- a/cif/process.go +++ b/cif/process.go @@ -9,21 +9,21 @@ import ( ) // Processes parsed CIF data and applies the data to the database -func processParsedCif(data *parsedData) error { +func ProcessParsedCif(data *ParsedData) error { log.Debug("Starting CIF Processing") - log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched))) + log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.Sched))) // Batch size for processing batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB // Process deletions in batches - for i := 0; i < len(data.sched); i += batchSize { + for i := 0; i < len(data.Sched); i += batchSize { end := i + batchSize - if end > len(data.sched) { - end = len(data.sched) + if end > len(data.Sched) { + end = len(data.Sched) } deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0) - for _, item := range data.sched[i:end] { + for _, item := range data.Sched[i:end] { if item.TransactionType == "Delete" { deleteItem := item deleteBatch = append(deleteBatch, &deleteItem) @@ -39,13 +39,13 @@ func processParsedCif(data *parsedData) error { } // Process creations in batches - for i := 0; i < len(data.sched); i += batchSize { + for i := 0; i < len(data.Sched); i += batchSize { end := i + batchSize - if end > len(data.sched) { - end = len(data.sched) + if end > len(data.Sched) { + end = len(data.Sched) } createBatch := make([]*upstreamApi.JsonScheduleV1, 0) - for _, item := range data.sched[i:end] { + for _, item := range data.Sched[i:end] { if item.TransactionType == "Create" { createItem := item createBatch = append(createBatch, &createItem) diff --git a/cif/types.go b/cif/types.go index ddc2839..2b74cc8 100644 --- a/cif/types.go +++ b/cif/types.go @@ -6,8 +6,8 @@ import "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" // database or external API resources should be defined in git.fjla.uk/owlboard/go-types // Holds parsed data for processing -type parsedData struct { - header upstreamApi.JsonTimetableV1 - assoc []upstreamApi.JsonAssociationV1 - sched []upstreamApi.JsonScheduleV1 +type ParsedData struct { + Header upstreamApi.JsonTimetableV1 + Assoc []upstreamApi.JsonAssociationV1 + Sched []upstreamApi.JsonScheduleV1 } diff --git a/cif/update.go b/cif/update.go index 8995ad2..353e637 100644 --- a/cif/update.go +++ b/cif/update.go @@ -13,6 +13,7 @@ import ( // Replaces all existing CIF Data with a new download func runCifFullDownload(cfg *helpers.Configuration) error { + preTime := time.Now() log.Info("Downloading all CIF Data") // Download CIF Data file @@ -36,12 +37,12 @@ func runCifFullDownload(cfg *helpers.Configuration) error { dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database. // Process CIF file - err = processParsedCif(parsed) + err = ProcessParsedCif(parsed) if err != nil { log.Error("Error processing CIF data", zap.Error(err)) } - newMeta := generateMetadata(&parsed.header) + newMeta := generateMetadata(&parsed.Header) ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType) if !ok { log.Warn("CIF Data updated, but metadata write failed") @@ -60,6 +61,9 @@ func runCifFullDownload(cfg *helpers.Configuration) error { log.Info("Out of date services removed", zap.Int64("removal count", count)) } + postTime := time.Now() + updateDuration := postTime.Sub(preTime) + log.Info("Execution time", zap.Duration("duration", updateDuration)) return nil } @@ -88,7 +92,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta // Check CIF Metadata log.Debug("Starting metadata checks") - reason, update := checkMetadata(metadata, &parsed.header) + reason, update := checkMetadata(metadata, &parsed.Header) if !update { log.Warn("Update file not processed", zap.String("reason", reason)) continue @@ -97,12 +101,12 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta log.Info("CIF Data is suitable for processing", zap.String("reason", reason)) // Process CIF file - err = processParsedCif(parsed) + err = ProcessParsedCif(parsed) if err != nil { log.Error("Error processing CIF data", zap.Error(err)) } - metadata = generateMetadata(&parsed.header) + metadata = generateMetadata(&parsed.Header) parsed = nil } diff --git a/dbAccess/access.go b/dbAccess/access.go index 51afaee..ca8d3f0 100644 --- a/dbAccess/access.go +++ b/dbAccess/access.go @@ -10,10 +10,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -const timetableCollection string = "timetable" - // Pushes the current version of this application to the data base 'versions' collection. -// Currently uses the old name of mq-client func PushVersionToDb() { version := database.Version{ Target: "timetable-mgr", @@ -33,30 +30,3 @@ func PushVersionToDb() { log.Debug("Version up to date in Database") } } - -// Puts one item of the type `database.Service` to the database, used by the VSTP package which receives services one at a time -func PutOneService(data database.Service) bool { - coll := MongoClient.Database(databaseName).Collection(timetableCollection) - _, err := coll.InsertOne(context.TODO(), data) - if err != nil { - log.Error("Unable to insert to database: " + err.Error()) - return false - } - return true -} - -// Deletes one service from the database. -func DeleteOneService(data database.DeleteQuery) bool { - coll := MongoClient.Database(databaseName).Collection(timetableCollection) - var filter = bson.D{ - {Key: "trainUid", Value: data.TrainUid}, - {Key: "stpIndicator", Value: data.StpIndicator}, - {Key: "scheduleStartDate", Value: data.ScheduleStartDate}, - } - _, err := coll.DeleteOne(context.TODO(), filter) - if err != nil { - log.Error("Unable to delete service: " + err.Error()) - return false - } - return true -} diff --git a/dbAccess/cif.go b/dbAccess/cif.go index 77db12b..f879ef8 100644 --- a/dbAccess/cif.go +++ b/dbAccess/cif.go @@ -27,8 +27,8 @@ type CifMetadata struct { // Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example. func GetCifMetadata() (*CifMetadata, error) { - database := MongoClient.Database(databaseName) - collection := database.Collection(metaCollection) + database := MongoClient.Database(DatabaseName) + collection := database.Collection(MetaCollection) filter := bson.M{"type": Doctype} var result CifMetadata err := collection.FindOne(context.Background(), filter).Decode(&result) @@ -46,8 +46,8 @@ func GetCifMetadata() (*CifMetadata, error) { // Uses upsert to Insert/Update the CifMetadata in the database func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool { - database := MongoClient.Database(databaseName) - collection := database.Collection(metaCollection) + database := MongoClient.Database(DatabaseName) + collection := database.Collection(MetaCollection) options := options.Update().SetUpsert(true) filter := bson.M{"type": Doctype} update := bson.M{ @@ -86,7 +86,7 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error { log.Debug("Running deletions against database", zap.Int("count", len(deletions))) // Prepare deletion tasks - collection := MongoClient.Database(databaseName).Collection(timetableCollection) + collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection) bulkDeletions := make([]mongo.WriteModel, 0, len(deletions)) for _, deleteQuery := range deletions { @@ -117,7 +117,7 @@ func CreateCifEntries(schedules []database.Service) error { } log.Debug("Running creations against database", zap.Int("count", len(schedules))) - collection := MongoClient.Database(databaseName).Collection(timetableCollection) + collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection) models := make([]mongo.WriteModel, 0, len(schedules)) @@ -141,7 +141,7 @@ func RemoveOutdatedServices(cutoff time.Time) (count int64, err error) { // Define filter filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}} - collection := MongoClient.Database(databaseName).Collection(timetableCollection) + collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection) res, err := collection.DeleteMany(context.Background(), filter) if err != nil { diff --git a/dbAccess/common.go b/dbAccess/common.go index 3b5a365..e87df04 100644 --- a/dbAccess/common.go +++ b/dbAccess/common.go @@ -13,7 +13,7 @@ import ( // CAUTION: Drops the collection named in collectionName func DropCollection(collectionName string) error { log.Info("Dropping collection", zap.String("Collection Name", collectionName)) - database := MongoClient.Database(databaseName) + database := MongoClient.Database(DatabaseName) collection := database.Collection(collectionName) err := collection.Drop(context.Background()) @@ -27,8 +27,8 @@ func DropCollection(collectionName string) error { // Checks the update time (unix timestamp) of the collection named in collectionName, uses 'type: collection' entries in the meta collection func CheckUpdateTime(collectionName string) (int64, error) { - database := MongoClient.Database(databaseName) - collection := database.Collection(metaCollection) + database := MongoClient.Database(DatabaseName) + collection := database.Collection(MetaCollection) filter := bson.D{ {Key: "target", Value: collectionName}, {Key: "type", Value: "collection"}, @@ -46,7 +46,7 @@ func CheckUpdateTime(collectionName string) (int64, error) { // Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function. func SetUpdateTime(collectionName string) error { log.Info("Setting update time", zap.String("collection", collectionName)) - database := MongoClient.Database(databaseName) + database := MongoClient.Database(DatabaseName) collection := database.Collection("meta") options := options.Update().SetUpsert(true) updateTime := time.Now().Unix() diff --git a/dbAccess/contants.go b/dbAccess/contants.go index 612f334..d0bd67d 100644 --- a/dbAccess/contants.go +++ b/dbAccess/contants.go @@ -1,7 +1,7 @@ package dbAccess -const databaseName string = "owlboard" +const DatabaseName string = "owlboard" const CorpusCollection string = "corpus" const StationsCollection string = "stations" -const metaCollection string = "meta" +const MetaCollection string = "meta" const TimetableCollection string = "timetable" diff --git a/dbAccess/corpus.go b/dbAccess/corpus.go index 46b1db5..8466bdc 100644 --- a/dbAccess/corpus.go +++ b/dbAccess/corpus.go @@ -8,7 +8,7 @@ import ( // Puts an array of Corpus Documents into the database func PutManyCorpus(corpusData *[]database.CorpusEntry) error { - collection := MongoClient.Database(databaseName).Collection(CorpusCollection) + collection := MongoClient.Database(DatabaseName).Collection(CorpusCollection) documents := convertCorpusToInterfaceSlice(corpusData) @@ -23,7 +23,7 @@ func PutManyCorpus(corpusData *[]database.CorpusEntry) error { // Puts an array of Stations documents into the database func PutManyStations(stationsData *[]database.StationEntry) error { - collection := MongoClient.Database(databaseName).Collection(StationsCollection) + collection := MongoClient.Database(DatabaseName).Collection(StationsCollection) documents := convertStationsToInterfaceSlice(stationsData) diff --git a/vstp/actions.go b/vstp/actions.go index 33df0de..4fc148d 100644 --- a/vstp/actions.go +++ b/vstp/actions.go @@ -4,52 +4,42 @@ import ( "fmt" "git.fjla.uk/owlboard/go-types/pkg/database" + "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" + "git.fjla.uk/owlboard/timetable-mgr/cif" "git.fjla.uk/owlboard/timetable-mgr/dbAccess" - "git.fjla.uk/owlboard/timetable-mgr/log" ) -// Decide, based on the DB Formatted message type, what action needs taking -// then either insert, or delete from the database as required -func processEntryType(entry database.Service) { +// Converts to the correct struct for database insertion, then processes accordingly +func processCifData(s *upstreamApi.JsonScheduleV1) error { - switch entry.TransactionType { - case "Create": - createEntry(entry) - case "Update": - updateEntry(entry) - case "Delete": - deleteEntry(entry) - default: - log.Warn("Unknown transaction type: " + entry.TransactionType) - } -} + if s.TransactionType == "Create" { + service, err := cif.ConvertServiceType(s, true) + if err != nil { + return err + } + // Create slice as required by CreateCifEntries() + services := []database.Service{*service} + err = dbAccess.CreateCifEntries(services) + if err != nil { + return err + } + return nil -func createEntry(entry database.Service) { - log.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator) - status := dbAccess.PutOneService(entry) - if status { - log.Info("Database entry created") + } else if s.TransactionType == "Delete" { + query := database.DeleteQuery{ + TrainUid: s.CifTrainUid, + ScheduleStartDate: cif.ParseCifDate(&s.ScheduleStartDate, "start"), + StpIndicator: s.CifStpIndicator, + } + // Create slice as required by DeleteCifEntries() + queries := []database.DeleteQuery{query} + err := dbAccess.DeleteCifEntries(queries) + if err != nil { + return err + } + return nil } else { - log.Error("Database entry failed, skipped service") - } -} - -func updateEntry(entry database.Service) { - log.Warn("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator) -} - -func deleteEntry(entry database.Service) { - log.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode) - var deletionQuery = database.DeleteQuery{ - TrainUid: entry.TrainUid, - ScheduleStartDate: entry.ScheduleStartDate, - StpIndicator: entry.StpIndicator, - } - status := dbAccess.DeleteOneService(deletionQuery) - if status { - log.Info("Database entry deleted") - } else { - log.Error("Database deletion failed, skipped deletion") - fmt.Printf("%+v\n", deletionQuery) + err := fmt.Errorf("unknown transaction type: %s", s.TransactionType) + return err } } diff --git a/vstp/handler.go b/vstp/handler.go index 417ad49..9d175fd 100644 --- a/vstp/handler.go +++ b/vstp/handler.go @@ -11,6 +11,12 @@ var count uint64 = 0 func handle(msg *stomp.Message) { count++ log.Info("Message received", zap.Uint64("total since startup", count)) - schedule := unmarshalData(string(msg.Body)) - processEntryType(schedule) + schedule, err := unmarshalData(string(msg.Body)) + if err != nil { + log.Error("Error unmarshalling VSTP Message", zap.Error(err)) + } + err = processCifData(schedule) + if err != nil { + log.Error("Error processing VSTP Schedule", zap.Error(err)) + } } diff --git a/vstp/parser.go b/vstp/parser.go deleted file mode 100644 index de0e146..0000000 --- a/vstp/parser.go +++ /dev/null @@ -1,157 +0,0 @@ -package vstp - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" - "time" - - "git.fjla.uk/owlboard/go-types/pkg/database" - "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" - "git.fjla.uk/owlboard/timetable-mgr/helpers" - "git.fjla.uk/owlboard/timetable-mgr/log" -) - -// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct -func unmarshalData(jsonData string) database.Service { - var schedule upstreamApi.MsgData - err := json.Unmarshal([]byte(jsonData), &schedule) - if err != nil { - log.Error("Unable to unmarshal message body: " + err.Error()) - //return err - } - log.Debug("Unmarshalling Complete") - - if schedule.Data.CIFMsg.ScheduleSegment == nil { - log.Warn("ScheduleSegment is nil") - } else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 { - log.Warn("ScheduleSegment is empty") - } - return formatData(&schedule.Data.CIFMsg) -} - -// Transforms the upstreamApi.Schedule type into a database.Service type -func formatData(dataInput *upstreamApi.Schedule) database.Service { - log.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment))) - - var operator, headcode, powerType string - var planSpeed int32 - var stops []database.Stop - - // Check that the ScheduleSegment contains data, 'Delete' messages have no ScheduleSegment - if len(dataInput.ScheduleSegment) > 0 { - operator = dataInput.ScheduleSegment[0].ATOCCode - headcode = dataInput.ScheduleSegment[0].SignallingID - powerType = dataInput.ScheduleSegment[0].CIFPowerType - planSpeed = parseSpeed(dataInput.ScheduleSegment[0].CIFSpeed) - stops = parseStops(dataInput.ScheduleSegment[0].ScheduleLocation) - } - if operator == "" { - operator = "UK" - } - service := database.Service{ - TransactionType: dataInput.TransactionType, - StpIndicator: dataInput.CIFSTPIndicator, - Vstp: true, - Operator: operator, - TrainUid: dataInput.CIFTrainUID, - Headcode: headcode, - PowerType: powerType, - PlanSpeed: planSpeed, - ScheduleStartDate: parseDate(dataInput.ScheduleStartDate, false), - ScheduleEndDate: parseDate(dataInput.ScheduleEndDate, true), - DaysRun: parseDaysRun(dataInput.ScheduleDaysRun), - Stops: stops, - } - return service -} - -// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent -func parseSpeed(CIFSpeed string) int32 { - log.Debug("CIFSpeed Input: '" + CIFSpeed + "'") - if CIFSpeed == "" { - log.Debug("Speed data not provided") - return int32(0) - } - actualSpeed, exists := helpers.SpeedMap[CIFSpeed] - if !exists { - actualSpeed = CIFSpeed - } - log.Debug("Corrected Speed: " + actualSpeed) - - speed, err := strconv.ParseInt(actualSpeed, 10, 32) - if err != nil { - log.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0") - return int32(0) - } - return int32(speed) -} - -// Converts the date string provided from the upstream API into a proper Date type and adds a time -func parseDate(dateString string, end bool) time.Time { - log.Debug("Date Input: " + dateString) - date, err := time.Parse("2006-01-02", dateString) - if err != nil { - log.Error("Unable to parse date: " + dateString) - return time.Time{} - } - - var hour, minute, second, nanosecond int - location := time.UTC - if end { - hour, minute, second, nanosecond = 23, 59, 59, 0 - } else { - hour, minute, second, nanosecond = 0, 0, 0, 0 - } - - dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location) - log.Debug("Parsed date: " + dateWithTime.String()) - return dateWithTime -} - -// Converts the binary stype 'daysRun' field into an array of short days -func parseDaysRun(daysBinary string) []string { - log.Debug("daysRun Input: " + daysBinary) - shortDays := []string{"m", "t", "w", "th", "f", "s", "su"} - var result []string - for i, digit := range daysBinary { - if digit == '1' { - result = append(result, shortDays[i]) - } - } - return result -} - -// Converts an array if upstreamApi.ScheduleLocation types to an array of database.Stop types -func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop { - var stops []database.Stop - - for _, loc := range inputStops { - stop := database.Stop{ - PublicDeparture: parseTimeStrings(loc.PublicDepartureTime), - WttDeparture: parseTimeStrings(loc.ScheduledDepartureTime), - PublicArrival: parseTimeStrings(loc.PublicArrivalTime), - WttArrival: parseTimeStrings(loc.ScheduledArrivalTime), - IsPublic: strings.TrimSpace(loc.PublicDepartureTime) != "" || strings.TrimSpace(loc.PublicArrivalTime) != "", - Tiploc: loc.Tiploc.Tiploc.TiplocId, - } - - stops = append(stops, stop) - } - - return stops -} - -func parseTimeStrings(t string) string { - if t == "" { - return t - } - - strippedT := strings.TrimSpace(t) - if strippedT == "" { - return "" - } else { - return strippedT[:4] - } -} diff --git a/vstp/unmarshaller.go b/vstp/unmarshaller.go new file mode 100644 index 0000000..559fec3 --- /dev/null +++ b/vstp/unmarshaller.go @@ -0,0 +1,20 @@ +package vstp + +import ( + "encoding/json" + + "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" + "git.fjla.uk/owlboard/timetable-mgr/log" +) + +// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct +func unmarshalData(jsonData string) (*upstreamApi.JsonScheduleV1, error) { + var schedule upstreamApi.MsgData + err := json.Unmarshal([]byte(jsonData), &schedule) + if err != nil { + log.Error("Unable to unmarshal message body: " + err.Error()) + return nil, err + } + log.Debug("Unmarshalling Complete") + return &schedule.Data.CIFMsg, nil +}