Compare commits
5 Commits
9bd6bbde25
...
1231371bcf
Author | SHA1 | Date |
---|---|---|
Fred Boniface | 1231371bcf | |
Fred Boniface | b0f9f547dd | |
Fred Boniface | 5b9c444ac5 | |
Fred Boniface | f3745da86e | |
Fred Boniface | aef52be1e8 |
|
@ -12,7 +12,7 @@ import (
|
||||||
|
|
||||||
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
|
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
|
||||||
output := database.Service{
|
output := database.Service{
|
||||||
TransactionType: input.TransactionType,
|
//TransactionType: input.TransactionType,
|
||||||
StpIndicator: input.CifStpIndicator,
|
StpIndicator: input.CifStpIndicator,
|
||||||
Operator: input.AtocCode,
|
Operator: input.AtocCode,
|
||||||
TrainUid: input.CifTrainUid,
|
TrainUid: input.CifTrainUid,
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module git.fjla.uk/owlboard/timetable-mgr
|
||||||
go 1.21
|
go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.fjla.uk/owlboard/go-types v1.1.0
|
git.fjla.uk/owlboard/go-types v1.1.5
|
||||||
github.com/go-stomp/stomp/v3 v3.1.0
|
github.com/go-stomp/stomp/v3 v3.1.0
|
||||||
go.mongodb.org/mongo-driver v1.15.0
|
go.mongodb.org/mongo-driver v1.15.0
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -1,5 +1,5 @@
|
||||||
git.fjla.uk/owlboard/go-types v1.1.0 h1:3o8My2O3KMOtSjXApYyI3VBS03PPdk+NGt7QonoFkl0=
|
git.fjla.uk/owlboard/go-types v1.1.5 h1:QSCmO0tL4PLGBhpjnvcwULyaTzJQbCD+YK7I3WUkb5k=
|
||||||
git.fjla.uk/owlboard/go-types v1.1.0/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
git.fjla.uk/owlboard/go-types v1.1.5/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/go-stomp/stomp/v3 v3.1.0 h1:JnvRJuua/fX2Lq5Ie5DXzrOL18dnzIUenCZXM6rr8/0=
|
github.com/go-stomp/stomp/v3 v3.1.0 h1:JnvRJuua/fX2Lq5Ie5DXzrOL18dnzIUenCZXM6rr8/0=
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package messaging
|
package messaging
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"github.com/go-stomp/stomp/v3"
|
"github.com/go-stomp/stomp/v3"
|
||||||
|
@ -17,7 +19,7 @@ func StompInit(cfg *helpers.Configuration) {
|
||||||
func dial(user, pass string) *stomp.Conn {
|
func dial(user, pass string) *stomp.Conn {
|
||||||
conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618",
|
conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618",
|
||||||
stomp.ConnOpt.Login(user, pass),
|
stomp.ConnOpt.Login(user, pass),
|
||||||
stomp.ConnOpt.HeartBeat(15000, 15000),
|
stomp.ConnOpt.HeartBeat(15*time.Second, 15*time.Second),
|
||||||
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
|
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
package vstp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||||
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Simple type conversion
|
||||||
|
func convertCifType(in *upstreamApi.VSTPSchedule) (error, *upstreamApi.JsonScheduleV1) {
|
||||||
|
if in == nil {
|
||||||
|
return errors.New("input is nil"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
out := &upstreamApi.JsonScheduleV1{
|
||||||
|
TransactionType: in.TransactionType,
|
||||||
|
CifBankHolidayRunning: in.CifBankHolidayRunning,
|
||||||
|
CifStpIndicator: in.CifStpIndicator,
|
||||||
|
CifTrainUid: in.CifTrainUid,
|
||||||
|
ApplicableTimetable: in.ApplicableTimetable,
|
||||||
|
ScheduleDaysRun: in.ScheduleDaysRun,
|
||||||
|
ScheduleStartDate: in.ScheduleStartDate,
|
||||||
|
ScheduleEndDate: in.ScheduleEndDate,
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(in.ScheduleSegment) > 0 {
|
||||||
|
if len(in.ScheduleSegment) > 1 {
|
||||||
|
log.Warn("More than one element in schedule segment")
|
||||||
|
}
|
||||||
|
out.ScheduleSegment = convertSchedule(&in.ScheduleSegment[0])
|
||||||
|
return nil, out
|
||||||
|
} else {
|
||||||
|
log.Warn("VSTP Schedule Segment empty")
|
||||||
|
return errors.New("schedule segment empty"), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertSchedule(in *upstreamApi.VSTPScheduleSegment) upstreamApi.CifScheduleSegment {
|
||||||
|
out := upstreamApi.CifScheduleSegment{
|
||||||
|
SignallingId: in.SignallingId,
|
||||||
|
CifTrainCategory: in.CifTrainCategory,
|
||||||
|
CifHeadcode: in.CifHeadcode,
|
||||||
|
CifTrainServiceCode: in.CifTrainServiceCode,
|
||||||
|
CifBusinessSector: in.CifBusinessSector,
|
||||||
|
CifPowerType: in.CifPowerType,
|
||||||
|
CifTimingLoad: in.CifTimingLoad,
|
||||||
|
CifSpeed: in.CifSpeed,
|
||||||
|
CifOperatingCharacteristics: in.CifOperatingCharacteristics,
|
||||||
|
CifTrainClass: in.CifTrainClass,
|
||||||
|
CifSleepers: in.CifSleepers,
|
||||||
|
CifReservations: in.CifReservations,
|
||||||
|
CifCateringCode: in.CifCateringCode,
|
||||||
|
ScheduleLocation: *convertLocations(&in.ScheduleLocation),
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertLocations(in *[]upstreamApi.VSTPScheduleLocation) *[]upstreamApi.CifScheduleLocation {
|
||||||
|
if in == nil {
|
||||||
|
log.Error("Input is nil")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cifLocations := make([]upstreamApi.CifScheduleLocation, len(*in))
|
||||||
|
|
||||||
|
for i, loc := range *in {
|
||||||
|
cifLoc := upstreamApi.CifScheduleLocation{
|
||||||
|
TiplocCode: trim(loc.Location.Tiploc.TiplocId),
|
||||||
|
Arrival: convertTime(trim(loc.Arrival)),
|
||||||
|
PublicArrival: convertTime(trim(loc.PublicArrival)),
|
||||||
|
Departure: convertTime(trim(loc.Departure)),
|
||||||
|
PublicDeparture: convertTime(trim(loc.PublicDeparture)),
|
||||||
|
Pass: convertTime(trim(loc.Pass)),
|
||||||
|
Path: trim(loc.Path),
|
||||||
|
Platform: trim(loc.Platform),
|
||||||
|
EngineeringAllowance: trim(loc.EngineeringAllowance),
|
||||||
|
PathingAllowance: trim(loc.PathingAllowance),
|
||||||
|
PerformanceAllowance: trim(loc.PerformanceAllowance),
|
||||||
|
}
|
||||||
|
|
||||||
|
cifLocations[i] = cifLoc
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cifLocations
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertTime(in string) string {
|
||||||
|
if len(in) < 4 {
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
return in[:4]
|
||||||
|
}
|
||||||
|
|
||||||
|
func trim(s string) string {
|
||||||
|
return strings.TrimSpace(s)
|
||||||
|
}
|
|
@ -14,11 +14,16 @@ func handle(msg *stomp.Message) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
count++
|
count++
|
||||||
log.Info("Message received", zap.Uint64("total since startup", count))
|
log.Info("Message received", zap.Uint64("total since startup", count))
|
||||||
schedule, err := unmarshalData(string(msg.Body))
|
schedule, err := unmarshalData(msg.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error unmarshalling VSTP Message", zap.Error(err))
|
log.Error("Error unmarshalling VSTP Message", zap.Error(err))
|
||||||
}
|
}
|
||||||
err = processCifData(schedule)
|
err, convertedType := convertCifType(schedule)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error converting VSTP to CIF", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = processCifData(convertedType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error processing VSTP Schedule", zap.Error(err))
|
log.Error("Error processing VSTP Schedule", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,38 +1,64 @@
|
||||||
package vstp
|
package vstp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/messaging"
|
"git.fjla.uk/owlboard/timetable-mgr/messaging"
|
||||||
"github.com/go-stomp/stomp/v3"
|
"github.com/go-stomp/stomp/v3"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Subscribe() {
|
func Subscribe() {
|
||||||
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
|
var sub *stomp.Subscription
|
||||||
|
var err error
|
||||||
|
retryCount := 0
|
||||||
|
maxRetries := 5
|
||||||
|
|
||||||
|
for retryCount < maxRetries {
|
||||||
|
sub, err = messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Unable to start subscription: " + err.Error())
|
log.Warn("Unable to start subscription", zap.Error(err))
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
retryCount++
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if sub == nil {
|
||||||
|
log.Fatal("Failed to subscribe to VSTP topic", zap.Int("attempts", maxRetries))
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("Subscription to VSTP topic successful, listening")
|
log.Info("Subscription to VSTP topic successful, listening")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Debug("GOROUTINE: VSTP Message Handler Started")
|
for {
|
||||||
|
func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Warn("GOROUTINE: VSTP Message Handler Stopped")
|
log.Warn("VSTP Message Handler Stopped, waiting for recovery")
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
log.Fatal("GOROUTINE: VSTP Message Handler Failed")
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
log.Info("VSTP Message handler started")
|
||||||
for {
|
for {
|
||||||
msg := <-sub.C
|
msg := <-sub.C
|
||||||
if msg.Err != nil {
|
if msg.Err != nil {
|
||||||
log.Error("STOMP Message Error: " + msg.Err.Error())
|
log.Error("STOMP Message Error", zap.Error(msg.Err))
|
||||||
} else {
|
} else {
|
||||||
|
if msg != nil {
|
||||||
log.Info("STOMP Message Received")
|
log.Info("STOMP Message Received")
|
||||||
|
fmt.Println(string(msg.Body))
|
||||||
handle(msg)
|
handle(msg)
|
||||||
|
} else {
|
||||||
|
log.Info("STOMP Message Empty")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
|
|
@ -7,10 +7,10 @@ import (
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
|
// Unmarshals the JSON data and returns the schedule data
|
||||||
func unmarshalData(jsonData string) (*upstreamApi.JsonScheduleV1, error) {
|
func unmarshalData(jsonData []byte) (*upstreamApi.VSTPSchedule, error) {
|
||||||
var schedule upstreamApi.MsgData
|
var schedule upstreamApi.MsgData
|
||||||
err := json.Unmarshal([]byte(jsonData), &schedule)
|
err := json.Unmarshal(jsonData, &schedule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to unmarshal message body: " + err.Error())
|
log.Error("Unable to unmarshal message body: " + err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in New Issue