Add parsing for the VSTP messages
This commit is contained in:
4
src/vstp/actions.go
Normal file
4
src/vstp/actions.go
Normal file
@@ -0,0 +1,4 @@
|
||||
package vstp
|
||||
|
||||
// Decide, based on the DB Formatted message type, what action needs taking
|
||||
// then either insert, or delete from the database as required
|
||||
40
src/vstp/handler.go
Normal file
40
src/vstp/handler.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package vstp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
"git.fjla.uk/owlboard/mq-client/log"
|
||||
"github.com/go-stomp/stomp/v3"
|
||||
)
|
||||
|
||||
var count uint64 = 0
|
||||
|
||||
func handle(msg *stomp.Message) {
|
||||
count++
|
||||
log.Msg.Info("Message count: " + fmt.Sprint(count))
|
||||
schedule := unmarshalData(msg.Body)
|
||||
saveToFile(schedule)
|
||||
}
|
||||
|
||||
func saveToFile(msg database.Service) {
|
||||
timestamp := time.Now().Format("2006-01-02T15:04:05")
|
||||
path := fmt.Sprintf("message-logs/%s-msg.json", timestamp)
|
||||
|
||||
prettyJSON, err := json.MarshalIndent(msg, "", " ")
|
||||
if err != nil {
|
||||
log.Msg.Error("Error marshaling data to JSON: " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
err = ioutil.WriteFile(path, prettyJSON, 0644)
|
||||
if err != nil {
|
||||
log.Msg.Error("Error saving message: " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Msg.Info("Saved message to: " + path)
|
||||
}
|
||||
113
src/vstp/parser.go
Normal file
113
src/vstp/parser.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package vstp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||
"git.fjla.uk/owlboard/mq-client/helpers"
|
||||
"git.fjla.uk/owlboard/mq-client/log"
|
||||
)
|
||||
|
||||
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
|
||||
func unmarshalData(jsonData []byte) database.Service {
|
||||
var schedule upstreamApi.Schedule
|
||||
err := json.Unmarshal(jsonData, &schedule)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
|
||||
//return err
|
||||
}
|
||||
return formatData(schedule)
|
||||
}
|
||||
|
||||
// Transforms the upstreamApi.Schedule type into a database.Service type
|
||||
func formatData(dataInput upstreamApi.Schedule) database.Service {
|
||||
service := database.Service{
|
||||
TransactionType: dataInput.TransactionType,
|
||||
StpIndicator: dataInput.CIFSTPIndicator,
|
||||
Operator: dataInput.ScheduleSegment.ATOCCode,
|
||||
TrainUid: dataInput.CIFTrainUID,
|
||||
Headcode: dataInput.ScheduleSegment.SignallingID,
|
||||
PowerType: dataInput.ScheduleSegment.CIFPowerType,
|
||||
PlanSpeed: parseSpeed(dataInput.ScheduleSegment.CIFSpeed),
|
||||
ScheduleStartDate: parseDate(dataInput.ScheduleStartDate, false),
|
||||
ScheduleEndDate: parseDate(dataInput.ScheduleEndDate, true),
|
||||
DaysRun: parseDaysRun(dataInput.ScheduleDaysRun),
|
||||
Stops: parseStops(dataInput.ScheduleSegment.ScheduleLocation),
|
||||
}
|
||||
return service
|
||||
}
|
||||
|
||||
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
|
||||
func parseSpeed(CIFSpeed string) int32 {
|
||||
log.Msg.Debug("CIFSpeed Input: " + CIFSpeed)
|
||||
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
|
||||
if !exists {
|
||||
actualSpeed = CIFSpeed
|
||||
}
|
||||
log.Msg.Debug("Corrected Speed: " + actualSpeed)
|
||||
|
||||
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
|
||||
if err != nil {
|
||||
log.Msg.Warn("Unable to parse speed: " + CIFSpeed + " " + actualSpeed)
|
||||
return 0
|
||||
}
|
||||
return int32(speed)
|
||||
}
|
||||
|
||||
// Converts the date string provided from the upstream API into a proper Date type and adds a time
|
||||
func parseDate(dateString string, end bool) time.Time {
|
||||
log.Msg.Debug("Date Input: " + dateString)
|
||||
date, err := time.Parse("2006-01-02", dateString)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to parse date: " + dateString)
|
||||
return time.Time{}
|
||||
}
|
||||
log.Msg.Debug("Parsed date: " + date.String())
|
||||
|
||||
var hour, minute, second, nanosecond int
|
||||
location := time.UTC
|
||||
if end {
|
||||
hour, minute, second, nanosecond = 23, 59, 59, 999999999
|
||||
} else {
|
||||
hour, minute, second, nanosecond = 0, 0, 0, 0
|
||||
}
|
||||
|
||||
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
|
||||
return dateWithTime
|
||||
}
|
||||
|
||||
// Converts the binary stype 'daysRun' field into an array of short days
|
||||
func parseDaysRun(daysBinary string) []string {
|
||||
log.Msg.Debug("daysRun Input: " + daysBinary)
|
||||
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
|
||||
var result []string
|
||||
for i, digit := range daysBinary {
|
||||
if digit == '1' {
|
||||
result = append(result, shortDays[i])
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Converts an array if upstreamApi.ScheduleLocation types to an array of database.Stop types
|
||||
func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop {
|
||||
var stops []database.Stop
|
||||
|
||||
for _, loc := range inputStops {
|
||||
stop := database.Stop{
|
||||
PublicDeparture: &loc.PublicDepartureTime,
|
||||
WttDeparture: &loc.ScheduledDepartureTime,
|
||||
PublicArrival: &loc.PublicArrivalTime,
|
||||
WttArrival: &loc.ScheduledArrivalTime,
|
||||
IsPublic: loc.PublicDepartureTime != "" || loc.PublicArrivalTime != "",
|
||||
Tiploc: loc.Location.Tiploc.TiplocID,
|
||||
}
|
||||
|
||||
stops = append(stops, stop)
|
||||
}
|
||||
|
||||
return stops
|
||||
}
|
||||
31
src/vstp/subscribe.go
Normal file
31
src/vstp/subscribe.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package vstp
|
||||
|
||||
import (
|
||||
"git.fjla.uk/owlboard/mq-client/log"
|
||||
"git.fjla.uk/owlboard/mq-client/messaging"
|
||||
"github.com/go-stomp/stomp/v3"
|
||||
)
|
||||
|
||||
func Subscribe() {
|
||||
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
|
||||
if err != nil {
|
||||
log.Msg.Fatal("Unable to start subscription: " + err.Error())
|
||||
}
|
||||
log.Msg.Info("Subscription to VSTP topic successful, listening")
|
||||
|
||||
go func() {
|
||||
log.Msg.Debug("GOROUTINE: VSTP Message Handler Started")
|
||||
defer log.Msg.Warn("GOROUTINE: VSTP Message Handler Stopped")
|
||||
for {
|
||||
msg := <-sub.C
|
||||
if msg.Err != nil {
|
||||
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
|
||||
} else {
|
||||
log.Msg.Debug("STOMP Message Received")
|
||||
handle(msg)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {}
|
||||
}
|
||||
Reference in New Issue
Block a user