Compare commits

..

5 Commits

8 changed files with 160 additions and 28 deletions

View File

@ -12,7 +12,7 @@ import (
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) { func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
output := database.Service{ output := database.Service{
TransactionType: input.TransactionType, //TransactionType: input.TransactionType,
StpIndicator: input.CifStpIndicator, StpIndicator: input.CifStpIndicator,
Operator: input.AtocCode, Operator: input.AtocCode,
TrainUid: input.CifTrainUid, TrainUid: input.CifTrainUid,

2
go.mod
View File

@ -3,7 +3,7 @@ module git.fjla.uk/owlboard/timetable-mgr
go 1.21 go 1.21
require ( require (
git.fjla.uk/owlboard/go-types v1.1.0 git.fjla.uk/owlboard/go-types v1.1.5
github.com/go-stomp/stomp/v3 v3.1.0 github.com/go-stomp/stomp/v3 v3.1.0
go.mongodb.org/mongo-driver v1.15.0 go.mongodb.org/mongo-driver v1.15.0
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0

4
go.sum
View File

@ -1,5 +1,5 @@
git.fjla.uk/owlboard/go-types v1.1.0 h1:3o8My2O3KMOtSjXApYyI3VBS03PPdk+NGt7QonoFkl0= git.fjla.uk/owlboard/go-types v1.1.5 h1:QSCmO0tL4PLGBhpjnvcwULyaTzJQbCD+YK7I3WUkb5k=
git.fjla.uk/owlboard/go-types v1.1.0/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY= git.fjla.uk/owlboard/go-types v1.1.5/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-stomp/stomp/v3 v3.1.0 h1:JnvRJuua/fX2Lq5Ie5DXzrOL18dnzIUenCZXM6rr8/0= github.com/go-stomp/stomp/v3 v3.1.0 h1:JnvRJuua/fX2Lq5Ie5DXzrOL18dnzIUenCZXM6rr8/0=

View File

@ -1,6 +1,8 @@
package messaging package messaging
import ( import (
"time"
"git.fjla.uk/owlboard/timetable-mgr/helpers" "git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
"github.com/go-stomp/stomp/v3" "github.com/go-stomp/stomp/v3"
@ -17,7 +19,7 @@ func StompInit(cfg *helpers.Configuration) {
func dial(user, pass string) *stomp.Conn { func dial(user, pass string) *stomp.Conn {
conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618", conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618",
stomp.ConnOpt.Login(user, pass), stomp.ConnOpt.Login(user, pass),
stomp.ConnOpt.HeartBeat(15000, 15000), stomp.ConnOpt.HeartBeat(15*time.Second, 15*time.Second),
stomp.ConnOpt.Header("client-id", user+"-mq-client"), stomp.ConnOpt.Header("client-id", user+"-mq-client"),
) )
if err != nil { if err != nil {

99
vstp/convert.go Normal file
View File

@ -0,0 +1,99 @@
package vstp
import (
"errors"
"strings"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/log"
)
// Simple type conversion
func convertCifType(in *upstreamApi.VSTPSchedule) (error, *upstreamApi.JsonScheduleV1) {
if in == nil {
return errors.New("input is nil"), nil
}
out := &upstreamApi.JsonScheduleV1{
TransactionType: in.TransactionType,
CifBankHolidayRunning: in.CifBankHolidayRunning,
CifStpIndicator: in.CifStpIndicator,
CifTrainUid: in.CifTrainUid,
ApplicableTimetable: in.ApplicableTimetable,
ScheduleDaysRun: in.ScheduleDaysRun,
ScheduleStartDate: in.ScheduleStartDate,
ScheduleEndDate: in.ScheduleEndDate,
}
if len(in.ScheduleSegment) > 0 {
if len(in.ScheduleSegment) > 1 {
log.Warn("More than one element in schedule segment")
}
out.ScheduleSegment = convertSchedule(&in.ScheduleSegment[0])
return nil, out
} else {
log.Warn("VSTP Schedule Segment empty")
return errors.New("schedule segment empty"), nil
}
}
func convertSchedule(in *upstreamApi.VSTPScheduleSegment) upstreamApi.CifScheduleSegment {
out := upstreamApi.CifScheduleSegment{
SignallingId: in.SignallingId,
CifTrainCategory: in.CifTrainCategory,
CifHeadcode: in.CifHeadcode,
CifTrainServiceCode: in.CifTrainServiceCode,
CifBusinessSector: in.CifBusinessSector,
CifPowerType: in.CifPowerType,
CifTimingLoad: in.CifTimingLoad,
CifSpeed: in.CifSpeed,
CifOperatingCharacteristics: in.CifOperatingCharacteristics,
CifTrainClass: in.CifTrainClass,
CifSleepers: in.CifSleepers,
CifReservations: in.CifReservations,
CifCateringCode: in.CifCateringCode,
ScheduleLocation: *convertLocations(&in.ScheduleLocation),
}
return out
}
func convertLocations(in *[]upstreamApi.VSTPScheduleLocation) *[]upstreamApi.CifScheduleLocation {
if in == nil {
log.Error("Input is nil")
return nil
}
cifLocations := make([]upstreamApi.CifScheduleLocation, len(*in))
for i, loc := range *in {
cifLoc := upstreamApi.CifScheduleLocation{
TiplocCode: trim(loc.Location.Tiploc.TiplocId),
Arrival: convertTime(trim(loc.Arrival)),
PublicArrival: convertTime(trim(loc.PublicArrival)),
Departure: convertTime(trim(loc.Departure)),
PublicDeparture: convertTime(trim(loc.PublicDeparture)),
Pass: convertTime(trim(loc.Pass)),
Path: trim(loc.Path),
Platform: trim(loc.Platform),
EngineeringAllowance: trim(loc.EngineeringAllowance),
PathingAllowance: trim(loc.PathingAllowance),
PerformanceAllowance: trim(loc.PerformanceAllowance),
}
cifLocations[i] = cifLoc
}
return &cifLocations
}
func convertTime(in string) string {
if len(in) < 4 {
return in
}
return in[:4]
}
func trim(s string) string {
return strings.TrimSpace(s)
}

View File

@ -14,11 +14,16 @@ func handle(msg *stomp.Message) {
start := time.Now() start := time.Now()
count++ count++
log.Info("Message received", zap.Uint64("total since startup", count)) log.Info("Message received", zap.Uint64("total since startup", count))
schedule, err := unmarshalData(string(msg.Body)) schedule, err := unmarshalData(msg.Body)
if err != nil { if err != nil {
log.Error("Error unmarshalling VSTP Message", zap.Error(err)) log.Error("Error unmarshalling VSTP Message", zap.Error(err))
} }
err = processCifData(schedule) err, convertedType := convertCifType(schedule)
if err != nil {
log.Error("Error converting VSTP to CIF", zap.Error(err))
return
}
err = processCifData(convertedType)
if err != nil { if err != nil {
log.Error("Error processing VSTP Schedule", zap.Error(err)) log.Error("Error processing VSTP Schedule", zap.Error(err))
} }

View File

@ -1,37 +1,63 @@
package vstp package vstp
import ( import (
"fmt"
"time" "time"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/messaging" "git.fjla.uk/owlboard/timetable-mgr/messaging"
"github.com/go-stomp/stomp/v3" "github.com/go-stomp/stomp/v3"
"go.uber.org/zap"
) )
func Subscribe() { func Subscribe() {
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto) var sub *stomp.Subscription
if err != nil { var err error
log.Fatal("Unable to start subscription: " + err.Error()) retryCount := 0
maxRetries := 5
for retryCount < maxRetries {
sub, err = messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
if err != nil {
log.Warn("Unable to start subscription", zap.Error(err))
time.Sleep(10 * time.Second)
retryCount++
continue
}
break
} }
if sub == nil {
log.Fatal("Failed to subscribe to VSTP topic", zap.Int("attempts", maxRetries))
}
log.Info("Subscription to VSTP topic successful, listening") log.Info("Subscription to VSTP topic successful, listening")
go func() { go func() {
log.Debug("GOROUTINE: VSTP Message Handler Started")
defer func() {
if r := recover(); r != nil {
log.Warn("GOROUTINE: VSTP Message Handler Stopped")
time.Sleep(time.Second * 10)
log.Fatal("GOROUTINE: VSTP Message Handler Failed")
}
}()
for { for {
msg := <-sub.C func() {
if msg.Err != nil { defer func() {
log.Error("STOMP Message Error: " + msg.Err.Error()) if r := recover(); r != nil {
} else { log.Warn("VSTP Message Handler Stopped, waiting for recovery")
log.Info("STOMP Message Received") time.Sleep(time.Second * 10)
handle(msg) }
} }()
log.Info("VSTP Message handler started")
for {
msg := <-sub.C
if msg.Err != nil {
log.Error("STOMP Message Error", zap.Error(msg.Err))
} else {
if msg != nil {
log.Info("STOMP Message Received")
fmt.Println(string(msg.Body))
handle(msg)
} else {
log.Info("STOMP Message Empty")
}
}
}
}()
} }
}() }()

View File

@ -7,10 +7,10 @@ import (
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
) )
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct // Unmarshals the JSON data and returns the schedule data
func unmarshalData(jsonData string) (*upstreamApi.JsonScheduleV1, error) { func unmarshalData(jsonData []byte) (*upstreamApi.VSTPSchedule, error) {
var schedule upstreamApi.MsgData var schedule upstreamApi.MsgData
err := json.Unmarshal([]byte(jsonData), &schedule) err := json.Unmarshal(jsonData, &schedule)
if err != nil { if err != nil {
log.Error("Unable to unmarshal message body: " + err.Error()) log.Error("Unable to unmarshal message body: " + err.Error())
return nil, err return nil, err