Further work on the unmarshaling process
This commit is contained in:
parent
a84b9fc46a
commit
5ee737f3f8
@ -3,7 +3,7 @@ module git.fjla.uk/owlboard/mq-client
|
|||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b
|
git.fjla.uk/owlboard/go-types v0.0.0-20230720095555-ad24002a6acc
|
||||||
github.com/go-stomp/stomp/v3 v3.0.5
|
github.com/go-stomp/stomp/v3 v3.0.5
|
||||||
go.mongodb.org/mongo-driver v1.12.0
|
go.mongodb.org/mongo-driver v1.12.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
|
@ -10,6 +10,8 @@ git.fjla.uk/owlboard/go-types v0.0.0-20230719185315-e492e8ad6ae7 h1:g+ovnwGpx24K
|
|||||||
git.fjla.uk/owlboard/go-types v0.0.0-20230719185315-e492e8ad6ae7/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
git.fjla.uk/owlboard/go-types v0.0.0-20230719185315-e492e8ad6ae7/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
||||||
git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b h1:QKHrP3kjvypPO/YVrHdTNV68w6I7MNYyEy5zcD2Jk/A=
|
git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b h1:QKHrP3kjvypPO/YVrHdTNV68w6I7MNYyEy5zcD2Jk/A=
|
||||||
git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
git.fjla.uk/owlboard/go-types v0.0.0-20230719192727-845a41f8572b/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
||||||
|
git.fjla.uk/owlboard/go-types v0.0.0-20230720095555-ad24002a6acc h1:7s+svu/FOU/qwLLThtwvOV2CqID8sZmVyd15UldgBR8=
|
||||||
|
git.fjla.uk/owlboard/go-types v0.0.0-20230720095555-ad24002a6acc/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
|
||||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
//"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||||
"git.fjla.uk/owlboard/mq-client/log"
|
"git.fjla.uk/owlboard/mq-client/log"
|
||||||
"github.com/go-stomp/stomp/v3"
|
"github.com/go-stomp/stomp/v3"
|
||||||
)
|
)
|
||||||
@ -16,13 +16,13 @@ var count uint64 = 0
|
|||||||
func handle(msg *stomp.Message) {
|
func handle(msg *stomp.Message) {
|
||||||
count++
|
count++
|
||||||
log.Msg.Info("Message count: " + fmt.Sprint(count))
|
log.Msg.Info("Message count: " + fmt.Sprint(count))
|
||||||
schedule := unmarshalData(msg.Body)
|
schedule := unmarshalData(string(msg.Body))
|
||||||
saveToFile(schedule)
|
saveToFile(schedule, "transformed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func saveToFile(msg database.Service) {
|
func saveToFile(msg any, suffix string) {
|
||||||
timestamp := time.Now().Format("2006-01-02T15:04:05")
|
timestamp := time.Now().Format("2006-01-02T15:04:05")
|
||||||
path := fmt.Sprintf("message-logs/%s-msg.json", timestamp)
|
path := fmt.Sprintf("message-logs/%s-%s.json", timestamp, suffix)
|
||||||
|
|
||||||
prettyJSON, err := json.MarshalIndent(msg, "", " ")
|
prettyJSON, err := json.MarshalIndent(msg, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2,6 +2,7 @@ package vstp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -12,14 +13,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
|
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
|
||||||
func unmarshalData(jsonData []byte) database.Service {
|
func unmarshalData(jsonData string) database.Service {
|
||||||
var schedule upstreamApi.Schedule
|
log.Msg.Debug("Unmarshalling message body")
|
||||||
err := json.Unmarshal(jsonData, &schedule)
|
fmt.Println(jsonData)
|
||||||
|
log.Msg.Debug("Converting to byte array")
|
||||||
|
jsonDataBytes := []byte(jsonData)
|
||||||
|
fmt.Println(jsonDataBytes)
|
||||||
|
var schedule upstreamApi.VstpMsg
|
||||||
|
err := json.Unmarshal(jsonDataBytes, &schedule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
|
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
|
||||||
//return err
|
//return err
|
||||||
}
|
}
|
||||||
return formatData(schedule)
|
log.Msg.Debug("Unmarshalling Complete")
|
||||||
|
fmt.Println(schedule)
|
||||||
|
saveToFile(schedule, "unmarshalled")
|
||||||
|
return formatData(schedule.CIFMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transforms the upstreamApi.Schedule type into a database.Service type
|
// Transforms the upstreamApi.Schedule type into a database.Service type
|
||||||
@ -98,10 +107,10 @@ func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop {
|
|||||||
|
|
||||||
for _, loc := range inputStops {
|
for _, loc := range inputStops {
|
||||||
stop := database.Stop{
|
stop := database.Stop{
|
||||||
PublicDeparture: &loc.PublicDepartureTime,
|
PublicDeparture: getStringPointer(loc.PublicDepartureTime),
|
||||||
WttDeparture: &loc.ScheduledDepartureTime,
|
WttDeparture: getStringPointer(loc.ScheduledDepartureTime),
|
||||||
PublicArrival: &loc.PublicArrivalTime,
|
PublicArrival: getStringPointer(loc.PublicArrivalTime),
|
||||||
WttArrival: &loc.ScheduledArrivalTime,
|
WttArrival: getStringPointer(loc.ScheduledArrivalTime),
|
||||||
IsPublic: loc.PublicDepartureTime != "" || loc.PublicArrivalTime != "",
|
IsPublic: loc.PublicDepartureTime != "" || loc.PublicArrivalTime != "",
|
||||||
Tiploc: loc.Location.Tiploc.TiplocID,
|
Tiploc: loc.Location.Tiploc.TiplocID,
|
||||||
}
|
}
|
||||||
@ -111,3 +120,10 @@ func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop {
|
|||||||
|
|
||||||
return stops
|
return stops
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getStringPointer(s string) *string {
|
||||||
|
if s == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &s
|
||||||
|
}
|
||||||
|
@ -22,6 +22,7 @@ func Subscribe() {
|
|||||||
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
|
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Msg.Debug("STOMP Message Received")
|
log.Msg.Debug("STOMP Message Received")
|
||||||
|
saveToFile(string(msg.Body), "msgBody")
|
||||||
handle(msg)
|
handle(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user