From a84b9fc46ade53ab4c09c3841797072f7ffd10db Mon Sep 17 00:00:00 2001 From: Fred Boniface Date: Wed, 19 Jul 2023 21:31:00 +0100 Subject: [PATCH] Add parsing for the VSTP messages --- src/dbAccess/access.go | 23 +++++++- src/go.mod | 2 +- src/go.sum | 10 ++++ src/helpers/vstp.go | 28 ++++++++++ src/main.go | 3 +- src/messaging/client.go | 2 +- src/messaging/vstp.go | 68 ------------------------ src/vstp/actions.go | 4 ++ src/vstp/handler.go | 40 ++++++++++++++ src/vstp/parser.go | 113 ++++++++++++++++++++++++++++++++++++++++ src/vstp/subscribe.go | 31 +++++++++++ 11 files changed, 251 insertions(+), 73 deletions(-) create mode 100644 src/helpers/vstp.go delete mode 100644 src/messaging/vstp.go create mode 100644 src/vstp/actions.go create mode 100644 src/vstp/handler.go create mode 100644 src/vstp/parser.go create mode 100644 src/vstp/subscribe.go diff --git a/src/dbAccess/access.go b/src/dbAccess/access.go index 89106c5..d68a5a5 100644 --- a/src/dbAccess/access.go +++ b/src/dbAccess/access.go @@ -1,15 +1,34 @@ package dbAccess import ( + "context" "fmt" "git.fjla.uk/owlboard/go-types/pkg/database" + "git.fjla.uk/owlboard/mq-client/helpers" "git.fjla.uk/owlboard/mq-client/log" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" ) func init() { - log.Msg.Info("Pushing mq-client version to database") - fmt.Println("ACTUALLY DO THIS HERE!") + version := database.Version{ + Target: "mq-client", + Component: "mq-client", + Version: helpers.Version, + } + versionSelector := database.VersionSelector{ + Target: "mq-client", + Component: "mq-client", + } + opts := options.Update().SetUpsert(true) + coll := MongoClient.Database("owlboard").Collection("versions") + _, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts) + if err != nil { + log.Msg.Warn("Unable to push version to database: " + err.Error()) + } else { + log.Msg.Debug("Version up to date in Database") + } } func PutManyServices(collection string, data []database.Service) bool { diff --git a/src/go.mod b/src/go.mod index 279956a..ea3d762 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,7 +3,7 @@ module git.fjla.uk/owlboard/mq-client go 1.19 require ( - git.fjla.uk/owlboard/go-types v0.0.0-20230717113641-a1216e6e5106 + git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b github.com/go-stomp/stomp/v3 v3.0.5 go.mongodb.org/mongo-driver v1.12.0 go.uber.org/zap v1.24.0 diff --git a/src/go.sum b/src/go.sum index 27c53e0..85e3e6e 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,5 +1,15 @@ git.fjla.uk/owlboard/go-types v0.0.0-20230717113641-a1216e6e5106 h1:1sPFYr4/gEdWGWJhB4/Y7jJYemBSZBXcWtBm7XiZqZk= git.fjla.uk/owlboard/go-types v0.0.0-20230717113641-a1216e6e5106/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= +git.fjla.uk/owlboard/go-types v0.0.0-20230718113936-e8f6c7115eb9 h1:UHdSkwt7zSwU5cA8bCNW8Z+eyNHAXYZUgbzyOhvUKqQ= +git.fjla.uk/owlboard/go-types v0.0.0-20230718113936-e8f6c7115eb9/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= +git.fjla.uk/owlboard/go-types v0.0.0-20230719183133-0c31803e4c27 h1:1tA4ikudlUPo4MAmzBzU59um9OsiUbPbPxHBJ2FJkDw= +git.fjla.uk/owlboard/go-types v0.0.0-20230719183133-0c31803e4c27/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= +git.fjla.uk/owlboard/go-types v0.0.0-20230719183907-ef710419e906 h1:eB0Ed0BrJ2w6akyK3Jf/ZAFAdlLM/E1jXHVC8QNPtu4= +git.fjla.uk/owlboard/go-types v0.0.0-20230719183907-ef710419e906/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= +git.fjla.uk/owlboard/go-types v0.0.0-20230719185315-e492e8ad6ae7 h1:g+ovnwGpx24KqMzrFA7L6s+JKR2MWzcO51iMXdq7N50= +git.fjla.uk/owlboard/go-types v0.0.0-20230719185315-e492e8ad6ae7/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= +git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b h1:QKHrP3kjvypPO/YVrHdTNV68w6I7MNYyEy5zcD2Jk/A= +git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/src/helpers/vstp.go b/src/helpers/vstp.go new file mode 100644 index 0000000..73a59c3 --- /dev/null +++ b/src/helpers/vstp.go @@ -0,0 +1,28 @@ +package helpers + +// An error with the VSTP messages is that speed values are shown incorrectly, but not for all services +// This maps the displayed speed to the correct speed. + +var SpeedMap = map[string]string{ + "22": "10", + "34": "15", + "56": "20", + "67": "30", + "78": "35", + "89": "40", + "101": "45", + "112": "50", + "123": "55", + "134": "60", + "157": "70", + "168": "75", + "179": "80", + "195": "87", + "201": "90", + "213": "95", + "224": "100", + "246": "110", + "280": "125", + "314": "140", + "417": "186", +} diff --git a/src/main.go b/src/main.go index 16dc3f4..daaf0a8 100644 --- a/src/main.go +++ b/src/main.go @@ -5,11 +5,12 @@ import ( "git.fjla.uk/owlboard/mq-client/helpers" "git.fjla.uk/owlboard/mq-client/log" "git.fjla.uk/owlboard/mq-client/messaging" + "git.fjla.uk/owlboard/mq-client/vstp" ) func main() { log.Msg.Info("Initialised OwlBoard MQ Client " + helpers.Version) dbAccess.PrintFromDbPackage() defer messaging.Disconnect(messaging.Client) - messaging.Listen() + vstp.Subscribe() } diff --git a/src/messaging/client.go b/src/messaging/client.go index f1801e6..af45ecd 100644 --- a/src/messaging/client.go +++ b/src/messaging/client.go @@ -36,7 +36,7 @@ func dial() *stomp.Conn { conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618", stomp.ConnOpt.Login(credentials.user, credentials.pass), stomp.ConnOpt.HeartBeat(15000, 15000), - stomp.ConnOpt.Header("client-id", credentials.user), + stomp.ConnOpt.Header("client-id", credentials.user+"-mq-client"), ) if err != nil { log.Msg.Fatal("Unable to connect to STOMP Client", zap.String("err", err.Error())) diff --git a/src/messaging/vstp.go b/src/messaging/vstp.go deleted file mode 100644 index 772bf6a..0000000 --- a/src/messaging/vstp.go +++ /dev/null @@ -1,68 +0,0 @@ -package messaging - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "time" - - "git.fjla.uk/owlboard/mq-client/log" - - "github.com/go-stomp/stomp/v3" -) - -func Listen() { - sub, err := Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto) - if err != nil { - log.Msg.Fatal("Unable to start subscription: " + err.Error()) - } - log.Msg.Info("Subscription to VSTP topic successful") - - go func() { - log.Msg.Debug("Message handler routine started") - for { - msg := <-sub.C - if msg.Err != nil { - fmt.Println(msg.Err) - } - handle(msg) - } - }() - - select {} -} - -var count uint64 = 0 - -func handle(msg *stomp.Message) { - log.Msg.Info("STOMP message received") - count++ - log.Msg.Info("Message count: " + fmt.Sprint(count)) - fmt.Println(string(msg.Body)) - saveToFile(string(msg.Body)) -} - -func saveToFile(msg string) { - timestamp := time.Now().Format("2006-01-02T15:04:05") - path := fmt.Sprintf("message-logs/%s-msg.txt", timestamp) - - var decodedData interface{} - if err := json.Unmarshal([]byte(msg), &decodedData); err != nil { - log.Msg.Error("Error decoding JSON: " + err.Error()) - return - } - - prettyJSON, err := json.MarshalIndent(decodedData, "", " ") - if err != nil { - log.Msg.Error("Error marshaling data to JSON: " + err.Error()) - return - } - - err = ioutil.WriteFile(path, prettyJSON, 0644) - if err != nil { - log.Msg.Error("Error saving message: " + err.Error()) - return - } - - log.Msg.Info("Saved message to: " + path) -} diff --git a/src/vstp/actions.go b/src/vstp/actions.go new file mode 100644 index 0000000..8e78241 --- /dev/null +++ b/src/vstp/actions.go @@ -0,0 +1,4 @@ +package vstp + +// Decide, based on the DB Formatted message type, what action needs taking +// then either insert, or delete from the database as required diff --git a/src/vstp/handler.go b/src/vstp/handler.go new file mode 100644 index 0000000..209be37 --- /dev/null +++ b/src/vstp/handler.go @@ -0,0 +1,40 @@ +package vstp + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "time" + + "git.fjla.uk/owlboard/go-types/pkg/database" + "git.fjla.uk/owlboard/mq-client/log" + "github.com/go-stomp/stomp/v3" +) + +var count uint64 = 0 + +func handle(msg *stomp.Message) { + count++ + log.Msg.Info("Message count: " + fmt.Sprint(count)) + schedule := unmarshalData(msg.Body) + saveToFile(schedule) +} + +func saveToFile(msg database.Service) { + timestamp := time.Now().Format("2006-01-02T15:04:05") + path := fmt.Sprintf("message-logs/%s-msg.json", timestamp) + + prettyJSON, err := json.MarshalIndent(msg, "", " ") + if err != nil { + log.Msg.Error("Error marshaling data to JSON: " + err.Error()) + return + } + + err = ioutil.WriteFile(path, prettyJSON, 0644) + if err != nil { + log.Msg.Error("Error saving message: " + err.Error()) + return + } + + log.Msg.Info("Saved message to: " + path) +} diff --git a/src/vstp/parser.go b/src/vstp/parser.go new file mode 100644 index 0000000..80c6ea9 --- /dev/null +++ b/src/vstp/parser.go @@ -0,0 +1,113 @@ +package vstp + +import ( + "encoding/json" + "strconv" + "time" + + "git.fjla.uk/owlboard/go-types/pkg/database" + "git.fjla.uk/owlboard/go-types/pkg/upstreamApi" + "git.fjla.uk/owlboard/mq-client/helpers" + "git.fjla.uk/owlboard/mq-client/log" +) + +// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct +func unmarshalData(jsonData []byte) database.Service { + var schedule upstreamApi.Schedule + err := json.Unmarshal(jsonData, &schedule) + if err != nil { + log.Msg.Error("Unable to unmarshal message body: " + err.Error()) + //return err + } + return formatData(schedule) +} + +// Transforms the upstreamApi.Schedule type into a database.Service type +func formatData(dataInput upstreamApi.Schedule) database.Service { + service := database.Service{ + TransactionType: dataInput.TransactionType, + StpIndicator: dataInput.CIFSTPIndicator, + Operator: dataInput.ScheduleSegment.ATOCCode, + TrainUid: dataInput.CIFTrainUID, + Headcode: dataInput.ScheduleSegment.SignallingID, + PowerType: dataInput.ScheduleSegment.CIFPowerType, + PlanSpeed: parseSpeed(dataInput.ScheduleSegment.CIFSpeed), + ScheduleStartDate: parseDate(dataInput.ScheduleStartDate, false), + ScheduleEndDate: parseDate(dataInput.ScheduleEndDate, true), + DaysRun: parseDaysRun(dataInput.ScheduleDaysRun), + Stops: parseStops(dataInput.ScheduleSegment.ScheduleLocation), + } + return service +} + +// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent +func parseSpeed(CIFSpeed string) int32 { + log.Msg.Debug("CIFSpeed Input: " + CIFSpeed) + actualSpeed, exists := helpers.SpeedMap[CIFSpeed] + if !exists { + actualSpeed = CIFSpeed + } + log.Msg.Debug("Corrected Speed: " + actualSpeed) + + speed, err := strconv.ParseInt(actualSpeed, 10, 32) + if err != nil { + log.Msg.Warn("Unable to parse speed: " + CIFSpeed + " " + actualSpeed) + return 0 + } + return int32(speed) +} + +// Converts the date string provided from the upstream API into a proper Date type and adds a time +func parseDate(dateString string, end bool) time.Time { + log.Msg.Debug("Date Input: " + dateString) + date, err := time.Parse("2006-01-02", dateString) + if err != nil { + log.Msg.Error("Unable to parse date: " + dateString) + return time.Time{} + } + log.Msg.Debug("Parsed date: " + date.String()) + + var hour, minute, second, nanosecond int + location := time.UTC + if end { + hour, minute, second, nanosecond = 23, 59, 59, 999999999 + } else { + hour, minute, second, nanosecond = 0, 0, 0, 0 + } + + dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location) + return dateWithTime +} + +// Converts the binary stype 'daysRun' field into an array of short days +func parseDaysRun(daysBinary string) []string { + log.Msg.Debug("daysRun Input: " + daysBinary) + shortDays := []string{"m", "t", "w", "th", "f", "s", "su"} + var result []string + for i, digit := range daysBinary { + if digit == '1' { + result = append(result, shortDays[i]) + } + } + return result +} + +// Converts an array if upstreamApi.ScheduleLocation types to an array of database.Stop types +func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop { + var stops []database.Stop + + for _, loc := range inputStops { + stop := database.Stop{ + PublicDeparture: &loc.PublicDepartureTime, + WttDeparture: &loc.ScheduledDepartureTime, + PublicArrival: &loc.PublicArrivalTime, + WttArrival: &loc.ScheduledArrivalTime, + IsPublic: loc.PublicDepartureTime != "" || loc.PublicArrivalTime != "", + Tiploc: loc.Location.Tiploc.TiplocID, + } + + stops = append(stops, stop) + } + + return stops +} diff --git a/src/vstp/subscribe.go b/src/vstp/subscribe.go new file mode 100644 index 0000000..2d4e729 --- /dev/null +++ b/src/vstp/subscribe.go @@ -0,0 +1,31 @@ +package vstp + +import ( + "git.fjla.uk/owlboard/mq-client/log" + "git.fjla.uk/owlboard/mq-client/messaging" + "github.com/go-stomp/stomp/v3" +) + +func Subscribe() { + sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto) + if err != nil { + log.Msg.Fatal("Unable to start subscription: " + err.Error()) + } + log.Msg.Info("Subscription to VSTP topic successful, listening") + + go func() { + log.Msg.Debug("GOROUTINE: VSTP Message Handler Started") + defer log.Msg.Warn("GOROUTINE: VSTP Message Handler Stopped") + for { + msg := <-sub.C + if msg.Err != nil { + log.Msg.Error("STOMP Message Error: " + msg.Err.Error()) + } else { + log.Msg.Debug("STOMP Message Received") + handle(msg) + } + } + }() + + select {} +}