Compare commits
No commits in common. "790d293bc48ef7efa28f2b30f054a5c7f9d35aec" and "edbfbac23c1b7c30aacd21a225717cc69da3ced7" have entirely different histories.
790d293bc4
...
edbfbac23c
12
cif/parse.go
12
cif/parse.go
|
@ -11,16 +11,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Accepts the CIF data as a stream and outputs parsed data
|
// Accepts the CIF data as a stream and outputs parsed data
|
||||||
func parseCifDataStream(dataStream io.ReadCloser) (*ParsedData, error) {
|
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
defer dataStream.Close()
|
defer dataStream.Close()
|
||||||
log.Debug("Starting CIF Datastream parsing")
|
log.Debug("Starting CIF Datastream parsing")
|
||||||
if dataStream == nil {
|
if dataStream == nil {
|
||||||
return nil, errors.New("unable to parse nil pointer")
|
return nil, errors.New("unable to parse nil pointer")
|
||||||
}
|
}
|
||||||
|
|
||||||
var parsed ParsedData
|
var parsed parsedData
|
||||||
parsed.Assoc = make([]upstreamApi.JsonAssociationV1, 0)
|
parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0)
|
||||||
parsed.Sched = make([]upstreamApi.JsonScheduleV1, 0)
|
parsed.sched = make([]upstreamApi.JsonScheduleV1, 0)
|
||||||
|
|
||||||
// Create JSON Decoder
|
// Create JSON Decoder
|
||||||
decoder := json.NewDecoder(dataStream)
|
decoder := json.NewDecoder(dataStream)
|
||||||
|
@ -42,7 +42,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*ParsedData, error) {
|
||||||
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
|
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parsed.Header = timetable
|
parsed.header = timetable
|
||||||
case "TiplocV1":
|
case "TiplocV1":
|
||||||
// This data is not used and is sourced from CORPUS
|
// This data is not used and is sourced from CORPUS
|
||||||
continue
|
continue
|
||||||
|
@ -56,7 +56,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*ParsedData, error) {
|
||||||
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
|
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parsed.Sched = append(parsed.Sched, schedule)
|
parsed.sched = append(parsed.sched, schedule)
|
||||||
case "EOF":
|
case "EOF":
|
||||||
log.Debug("Reached EOF")
|
log.Debug("Reached EOF")
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -9,21 +9,21 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Processes parsed CIF data and applies the data to the database
|
// Processes parsed CIF data and applies the data to the database
|
||||||
func ProcessParsedCif(data *ParsedData) error {
|
func processParsedCif(data *parsedData) error {
|
||||||
log.Debug("Starting CIF Processing")
|
log.Debug("Starting CIF Processing")
|
||||||
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.Sched)))
|
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
|
||||||
|
|
||||||
// Batch size for processing
|
// Batch size for processing
|
||||||
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
||||||
|
|
||||||
// Process deletions in batches
|
// Process deletions in batches
|
||||||
for i := 0; i < len(data.Sched); i += batchSize {
|
for i := 0; i < len(data.sched); i += batchSize {
|
||||||
end := i + batchSize
|
end := i + batchSize
|
||||||
if end > len(data.Sched) {
|
if end > len(data.sched) {
|
||||||
end = len(data.Sched)
|
end = len(data.sched)
|
||||||
}
|
}
|
||||||
deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
||||||
for _, item := range data.Sched[i:end] {
|
for _, item := range data.sched[i:end] {
|
||||||
if item.TransactionType == "Delete" {
|
if item.TransactionType == "Delete" {
|
||||||
deleteItem := item
|
deleteItem := item
|
||||||
deleteBatch = append(deleteBatch, &deleteItem)
|
deleteBatch = append(deleteBatch, &deleteItem)
|
||||||
|
@ -39,13 +39,13 @@ func ProcessParsedCif(data *ParsedData) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process creations in batches
|
// Process creations in batches
|
||||||
for i := 0; i < len(data.Sched); i += batchSize {
|
for i := 0; i < len(data.sched); i += batchSize {
|
||||||
end := i + batchSize
|
end := i + batchSize
|
||||||
if end > len(data.Sched) {
|
if end > len(data.sched) {
|
||||||
end = len(data.Sched)
|
end = len(data.sched)
|
||||||
}
|
}
|
||||||
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
|
||||||
for _, item := range data.Sched[i:end] {
|
for _, item := range data.sched[i:end] {
|
||||||
if item.TransactionType == "Create" {
|
if item.TransactionType == "Create" {
|
||||||
createItem := item
|
createItem := item
|
||||||
createBatch = append(createBatch, &createItem)
|
createBatch = append(createBatch, &createItem)
|
||||||
|
|
|
@ -6,8 +6,8 @@ import "git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||||
// database or external API resources should be defined in git.fjla.uk/owlboard/go-types
|
// database or external API resources should be defined in git.fjla.uk/owlboard/go-types
|
||||||
|
|
||||||
// Holds parsed data for processing
|
// Holds parsed data for processing
|
||||||
type ParsedData struct {
|
type parsedData struct {
|
||||||
Header upstreamApi.JsonTimetableV1
|
header upstreamApi.JsonTimetableV1
|
||||||
Assoc []upstreamApi.JsonAssociationV1
|
assoc []upstreamApi.JsonAssociationV1
|
||||||
Sched []upstreamApi.JsonScheduleV1
|
sched []upstreamApi.JsonScheduleV1
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,6 @@ import (
|
||||||
|
|
||||||
// Replaces all existing CIF Data with a new download
|
// Replaces all existing CIF Data with a new download
|
||||||
func runCifFullDownload(cfg *helpers.Configuration) error {
|
func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
preTime := time.Now()
|
|
||||||
log.Info("Downloading all CIF Data")
|
log.Info("Downloading all CIF Data")
|
||||||
|
|
||||||
// Download CIF Data file
|
// Download CIF Data file
|
||||||
|
@ -37,12 +36,12 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
|
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
|
||||||
|
|
||||||
// Process CIF file
|
// Process CIF file
|
||||||
err = ProcessParsedCif(parsed)
|
err = processParsedCif(parsed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error processing CIF data", zap.Error(err))
|
log.Error("Error processing CIF data", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
newMeta := generateMetadata(&parsed.Header)
|
newMeta := generateMetadata(&parsed.header)
|
||||||
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
|
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn("CIF Data updated, but metadata write failed")
|
log.Warn("CIF Data updated, but metadata write failed")
|
||||||
|
@ -61,9 +60,6 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
log.Info("Out of date services removed", zap.Int64("removal count", count))
|
log.Info("Out of date services removed", zap.Int64("removal count", count))
|
||||||
}
|
}
|
||||||
|
|
||||||
postTime := time.Now()
|
|
||||||
updateDuration := postTime.Sub(preTime)
|
|
||||||
log.Info("Execution time", zap.Duration("duration", updateDuration))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +88,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
|
||||||
// Check CIF Metadata
|
// Check CIF Metadata
|
||||||
log.Debug("Starting metadata checks")
|
log.Debug("Starting metadata checks")
|
||||||
|
|
||||||
reason, update := checkMetadata(metadata, &parsed.Header)
|
reason, update := checkMetadata(metadata, &parsed.header)
|
||||||
if !update {
|
if !update {
|
||||||
log.Warn("Update file not processed", zap.String("reason", reason))
|
log.Warn("Update file not processed", zap.String("reason", reason))
|
||||||
continue
|
continue
|
||||||
|
@ -101,12 +97,12 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
|
||||||
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
|
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
|
||||||
|
|
||||||
// Process CIF file
|
// Process CIF file
|
||||||
err = ProcessParsedCif(parsed)
|
err = processParsedCif(parsed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error processing CIF data", zap.Error(err))
|
log.Error("Error processing CIF data", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata = generateMetadata(&parsed.Header)
|
metadata = generateMetadata(&parsed.header)
|
||||||
parsed = nil
|
parsed = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,10 @@ import (
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const timetableCollection string = "timetable"
|
||||||
|
|
||||||
// Pushes the current version of this application to the data base 'versions' collection.
|
// Pushes the current version of this application to the data base 'versions' collection.
|
||||||
|
// Currently uses the old name of mq-client
|
||||||
func PushVersionToDb() {
|
func PushVersionToDb() {
|
||||||
version := database.Version{
|
version := database.Version{
|
||||||
Target: "timetable-mgr",
|
Target: "timetable-mgr",
|
||||||
|
@ -30,3 +33,30 @@ func PushVersionToDb() {
|
||||||
log.Debug("Version up to date in Database")
|
log.Debug("Version up to date in Database")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Puts one item of the type `database.Service` to the database, used by the VSTP package which receives services one at a time
|
||||||
|
func PutOneService(data database.Service) bool {
|
||||||
|
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
_, err := coll.InsertOne(context.TODO(), data)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to insert to database: " + err.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deletes one service from the database.
|
||||||
|
func DeleteOneService(data database.DeleteQuery) bool {
|
||||||
|
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
var filter = bson.D{
|
||||||
|
{Key: "trainUid", Value: data.TrainUid},
|
||||||
|
{Key: "stpIndicator", Value: data.StpIndicator},
|
||||||
|
{Key: "scheduleStartDate", Value: data.ScheduleStartDate},
|
||||||
|
}
|
||||||
|
_, err := coll.DeleteOne(context.TODO(), filter)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to delete service: " + err.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
|
@ -27,8 +27,8 @@ type CifMetadata struct {
|
||||||
|
|
||||||
// Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example.
|
// Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example.
|
||||||
func GetCifMetadata() (*CifMetadata, error) {
|
func GetCifMetadata() (*CifMetadata, error) {
|
||||||
database := MongoClient.Database(DatabaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection(MetaCollection)
|
collection := database.Collection(metaCollection)
|
||||||
filter := bson.M{"type": Doctype}
|
filter := bson.M{"type": Doctype}
|
||||||
var result CifMetadata
|
var result CifMetadata
|
||||||
err := collection.FindOne(context.Background(), filter).Decode(&result)
|
err := collection.FindOne(context.Background(), filter).Decode(&result)
|
||||||
|
@ -46,8 +46,8 @@ func GetCifMetadata() (*CifMetadata, error) {
|
||||||
|
|
||||||
// Uses upsert to Insert/Update the CifMetadata in the database
|
// Uses upsert to Insert/Update the CifMetadata in the database
|
||||||
func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
|
func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
|
||||||
database := MongoClient.Database(DatabaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection(MetaCollection)
|
collection := database.Collection(metaCollection)
|
||||||
options := options.Update().SetUpsert(true)
|
options := options.Update().SetUpsert(true)
|
||||||
filter := bson.M{"type": Doctype}
|
filter := bson.M{"type": Doctype}
|
||||||
update := bson.M{
|
update := bson.M{
|
||||||
|
@ -86,7 +86,7 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||||
log.Debug("Running deletions against database", zap.Int("count", len(deletions)))
|
log.Debug("Running deletions against database", zap.Int("count", len(deletions)))
|
||||||
|
|
||||||
// Prepare deletion tasks
|
// Prepare deletion tasks
|
||||||
collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
|
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
|
||||||
|
|
||||||
for _, deleteQuery := range deletions {
|
for _, deleteQuery := range deletions {
|
||||||
|
@ -117,7 +117,7 @@ func CreateCifEntries(schedules []database.Service) error {
|
||||||
}
|
}
|
||||||
log.Debug("Running creations against database", zap.Int("count", len(schedules)))
|
log.Debug("Running creations against database", zap.Int("count", len(schedules)))
|
||||||
|
|
||||||
collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
|
||||||
models := make([]mongo.WriteModel, 0, len(schedules))
|
models := make([]mongo.WriteModel, 0, len(schedules))
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ func RemoveOutdatedServices(cutoff time.Time) (count int64, err error) {
|
||||||
// Define filter
|
// Define filter
|
||||||
filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}}
|
filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}}
|
||||||
|
|
||||||
collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
|
||||||
res, err := collection.DeleteMany(context.Background(), filter)
|
res, err := collection.DeleteMany(context.Background(), filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
// CAUTION: Drops the collection named in collectionName
|
// CAUTION: Drops the collection named in collectionName
|
||||||
func DropCollection(collectionName string) error {
|
func DropCollection(collectionName string) error {
|
||||||
log.Info("Dropping collection", zap.String("Collection Name", collectionName))
|
log.Info("Dropping collection", zap.String("Collection Name", collectionName))
|
||||||
database := MongoClient.Database(DatabaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection(collectionName)
|
collection := database.Collection(collectionName)
|
||||||
|
|
||||||
err := collection.Drop(context.Background())
|
err := collection.Drop(context.Background())
|
||||||
|
@ -27,8 +27,8 @@ func DropCollection(collectionName string) error {
|
||||||
|
|
||||||
// Checks the update time (unix timestamp) of the collection named in collectionName, uses 'type: collection' entries in the meta collection
|
// Checks the update time (unix timestamp) of the collection named in collectionName, uses 'type: collection' entries in the meta collection
|
||||||
func CheckUpdateTime(collectionName string) (int64, error) {
|
func CheckUpdateTime(collectionName string) (int64, error) {
|
||||||
database := MongoClient.Database(DatabaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection(MetaCollection)
|
collection := database.Collection(metaCollection)
|
||||||
filter := bson.D{
|
filter := bson.D{
|
||||||
{Key: "target", Value: collectionName},
|
{Key: "target", Value: collectionName},
|
||||||
{Key: "type", Value: "collection"},
|
{Key: "type", Value: "collection"},
|
||||||
|
@ -46,7 +46,7 @@ func CheckUpdateTime(collectionName string) (int64, error) {
|
||||||
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
|
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
|
||||||
func SetUpdateTime(collectionName string) error {
|
func SetUpdateTime(collectionName string) error {
|
||||||
log.Info("Setting update time", zap.String("collection", collectionName))
|
log.Info("Setting update time", zap.String("collection", collectionName))
|
||||||
database := MongoClient.Database(DatabaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection("meta")
|
collection := database.Collection("meta")
|
||||||
options := options.Update().SetUpsert(true)
|
options := options.Update().SetUpsert(true)
|
||||||
updateTime := time.Now().Unix()
|
updateTime := time.Now().Unix()
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package dbAccess
|
package dbAccess
|
||||||
|
|
||||||
const DatabaseName string = "owlboard"
|
const databaseName string = "owlboard"
|
||||||
const CorpusCollection string = "corpus"
|
const CorpusCollection string = "corpus"
|
||||||
const StationsCollection string = "stations"
|
const StationsCollection string = "stations"
|
||||||
const MetaCollection string = "meta"
|
const metaCollection string = "meta"
|
||||||
const TimetableCollection string = "timetable"
|
const TimetableCollection string = "timetable"
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
|
|
||||||
// Puts an array of Corpus Documents into the database
|
// Puts an array of Corpus Documents into the database
|
||||||
func PutManyCorpus(corpusData *[]database.CorpusEntry) error {
|
func PutManyCorpus(corpusData *[]database.CorpusEntry) error {
|
||||||
collection := MongoClient.Database(DatabaseName).Collection(CorpusCollection)
|
collection := MongoClient.Database(databaseName).Collection(CorpusCollection)
|
||||||
|
|
||||||
documents := convertCorpusToInterfaceSlice(corpusData)
|
documents := convertCorpusToInterfaceSlice(corpusData)
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ func PutManyCorpus(corpusData *[]database.CorpusEntry) error {
|
||||||
|
|
||||||
// Puts an array of Stations documents into the database
|
// Puts an array of Stations documents into the database
|
||||||
func PutManyStations(stationsData *[]database.StationEntry) error {
|
func PutManyStations(stationsData *[]database.StationEntry) error {
|
||||||
collection := MongoClient.Database(DatabaseName).Collection(StationsCollection)
|
collection := MongoClient.Database(databaseName).Collection(StationsCollection)
|
||||||
|
|
||||||
documents := convertStationsToInterfaceSlice(stationsData)
|
documents := convertStationsToInterfaceSlice(stationsData)
|
||||||
|
|
||||||
|
|
7
main.go
7
main.go
|
@ -74,9 +74,10 @@ func handleSignals(cfg *helpers.Configuration, stop chan<- struct{}) {
|
||||||
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
|
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
|
||||||
log.Debug("Cleaning up open connections")
|
log.Debug("Cleaning up open connections")
|
||||||
if cfg.VstpOn {
|
if cfg.VstpOn {
|
||||||
log.Info("Closing STOMP Client")
|
if messaging.Client != nil {
|
||||||
messaging.Disconnect(messaging.Client)
|
log.Info("Closing STOMP Client")
|
||||||
|
messaging.Disconnect(messaging.Client)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if dbAccess.MongoClient != nil {
|
if dbAccess.MongoClient != nil {
|
||||||
log.Info("Closing MongoDB Client")
|
log.Info("Closing MongoDB Client")
|
||||||
|
|
|
@ -22,7 +22,7 @@ func dial(user, pass string) *stomp.Conn {
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Unable to connect to STOMP Client: " + err.Error())
|
log.Fatal("Unable to connect to STOMP Client: " + err.Error())
|
||||||
conn.Disconnect()
|
conn.MustDisconnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Initialised STOMP Client")
|
log.Info("Initialised STOMP Client")
|
||||||
|
@ -42,11 +42,6 @@ func Disconnect(conn *stomp.Conn) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Error("STOMP Disconnect failed, next connection attempt may fail")
|
log.Error("STOMP Disconnect failed, next connection attempt may fail")
|
||||||
err := Client.Disconnect()
|
|
||||||
if err != nil {
|
|
||||||
Client.MustDisconnect()
|
|
||||||
log.Warn("STOMP Disconnect failed, forced disconnection")
|
|
||||||
}
|
|
||||||
log.Info("STOMP Client disconnected")
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register against the MQ Server and log each message for testing purposes
|
||||||
|
|
|
@ -4,41 +4,52 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/cif"
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||||
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Converts to the correct struct for database insertion, then processes accordingly
|
// Decide, based on the DB Formatted message type, what action needs taking
|
||||||
func processCifData(s *upstreamApi.JsonScheduleV1) error {
|
// then either insert, or delete from the database as required
|
||||||
|
func processEntryType(entry database.Service) {
|
||||||
|
|
||||||
if s.TransactionType == "Create" {
|
switch entry.TransactionType {
|
||||||
service, err := cif.ConvertServiceType(s, true)
|
case "Create":
|
||||||
if err != nil {
|
createEntry(entry)
|
||||||
return err
|
case "Update":
|
||||||
}
|
updateEntry(entry)
|
||||||
// Create slice as required by CreateCifEntries()
|
case "Delete":
|
||||||
services := []database.Service{*service}
|
deleteEntry(entry)
|
||||||
err = dbAccess.CreateCifEntries(services)
|
default:
|
||||||
if err != nil {
|
log.Warn("Unknown transaction type: " + entry.TransactionType)
|
||||||
return err
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
func createEntry(entry database.Service) {
|
||||||
} else if s.TransactionType == "Delete" {
|
log.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
|
||||||
query := database.DeleteQuery{
|
status := dbAccess.PutOneService(entry)
|
||||||
TrainUid: s.CifTrainUid,
|
if status {
|
||||||
ScheduleStartDate: cif.ParseCifDate(&s.ScheduleStartDate, "start"),
|
log.Info("Database entry created")
|
||||||
StpIndicator: s.CifStpIndicator,
|
} else {
|
||||||
}
|
log.Error("Database entry failed, skipped service")
|
||||||
// Create slice as required by DeleteCifEntries()
|
}
|
||||||
queries := []database.DeleteQuery{query}
|
}
|
||||||
err := dbAccess.DeleteCifEntries(queries)
|
|
||||||
if err != nil {
|
func updateEntry(entry database.Service) {
|
||||||
return err
|
log.Warn("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
} else {
|
func deleteEntry(entry database.Service) {
|
||||||
return fmt.Errorf("unknown transaction type: %s", s.TransactionType)
|
log.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
|
||||||
|
var deletionQuery = database.DeleteQuery{
|
||||||
|
TrainUid: entry.TrainUid,
|
||||||
|
ScheduleStartDate: entry.ScheduleStartDate,
|
||||||
|
StpIndicator: entry.StpIndicator,
|
||||||
|
}
|
||||||
|
status := dbAccess.DeleteOneService(deletionQuery)
|
||||||
|
if status {
|
||||||
|
log.Info("Database entry deleted")
|
||||||
|
} else {
|
||||||
|
log.Error("Database deletion failed, skipped deletion")
|
||||||
|
fmt.Printf("%+v\n", deletionQuery)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
package vstp
|
package vstp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"github.com/go-stomp/stomp/v3"
|
"github.com/go-stomp/stomp/v3"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -11,18 +9,8 @@ import (
|
||||||
var count uint64 = 0
|
var count uint64 = 0
|
||||||
|
|
||||||
func handle(msg *stomp.Message) {
|
func handle(msg *stomp.Message) {
|
||||||
start := time.Now()
|
|
||||||
count++
|
count++
|
||||||
log.Info("Message received", zap.Uint64("total since startup", count))
|
log.Info("Message received", zap.Uint64("total since startup", count))
|
||||||
schedule, err := unmarshalData(string(msg.Body))
|
schedule := unmarshalData(string(msg.Body))
|
||||||
if err != nil {
|
processEntryType(schedule)
|
||||||
log.Error("Error unmarshalling VSTP Message", zap.Error(err))
|
|
||||||
}
|
|
||||||
err = processCifData(schedule)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error processing VSTP Schedule", zap.Error(err))
|
|
||||||
}
|
|
||||||
end := time.Now()
|
|
||||||
duration := start.Sub(end)
|
|
||||||
log.Info("Message processed", zap.Duration("processing-time", duration))
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
package vstp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||||
|
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||||
|
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||||
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
|
||||||
|
func unmarshalData(jsonData string) database.Service {
|
||||||
|
var schedule upstreamApi.MsgData
|
||||||
|
err := json.Unmarshal([]byte(jsonData), &schedule)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to unmarshal message body: " + err.Error())
|
||||||
|
//return err
|
||||||
|
}
|
||||||
|
log.Debug("Unmarshalling Complete")
|
||||||
|
|
||||||
|
if schedule.Data.CIFMsg.ScheduleSegment == nil {
|
||||||
|
log.Warn("ScheduleSegment is nil")
|
||||||
|
} else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 {
|
||||||
|
log.Warn("ScheduleSegment is empty")
|
||||||
|
}
|
||||||
|
return formatData(&schedule.Data.CIFMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transforms the upstreamApi.Schedule type into a database.Service type
|
||||||
|
func formatData(dataInput *upstreamApi.Schedule) database.Service {
|
||||||
|
log.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
|
||||||
|
|
||||||
|
var operator, headcode, powerType string
|
||||||
|
var planSpeed int32
|
||||||
|
var stops []database.Stop
|
||||||
|
|
||||||
|
// Check that the ScheduleSegment contains data, 'Delete' messages have no ScheduleSegment
|
||||||
|
if len(dataInput.ScheduleSegment) > 0 {
|
||||||
|
operator = dataInput.ScheduleSegment[0].ATOCCode
|
||||||
|
headcode = dataInput.ScheduleSegment[0].SignallingID
|
||||||
|
powerType = dataInput.ScheduleSegment[0].CIFPowerType
|
||||||
|
planSpeed = parseSpeed(dataInput.ScheduleSegment[0].CIFSpeed)
|
||||||
|
stops = parseStops(dataInput.ScheduleSegment[0].ScheduleLocation)
|
||||||
|
}
|
||||||
|
if operator == "" {
|
||||||
|
operator = "UK"
|
||||||
|
}
|
||||||
|
service := database.Service{
|
||||||
|
TransactionType: dataInput.TransactionType,
|
||||||
|
StpIndicator: dataInput.CIFSTPIndicator,
|
||||||
|
Vstp: true,
|
||||||
|
Operator: operator,
|
||||||
|
TrainUid: dataInput.CIFTrainUID,
|
||||||
|
Headcode: headcode,
|
||||||
|
PowerType: powerType,
|
||||||
|
PlanSpeed: planSpeed,
|
||||||
|
ScheduleStartDate: parseDate(dataInput.ScheduleStartDate, false),
|
||||||
|
ScheduleEndDate: parseDate(dataInput.ScheduleEndDate, true),
|
||||||
|
DaysRun: parseDaysRun(dataInput.ScheduleDaysRun),
|
||||||
|
Stops: stops,
|
||||||
|
}
|
||||||
|
return service
|
||||||
|
}
|
||||||
|
|
||||||
|
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
|
||||||
|
func parseSpeed(CIFSpeed string) int32 {
|
||||||
|
log.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
|
||||||
|
if CIFSpeed == "" {
|
||||||
|
log.Debug("Speed data not provided")
|
||||||
|
return int32(0)
|
||||||
|
}
|
||||||
|
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
|
||||||
|
if !exists {
|
||||||
|
actualSpeed = CIFSpeed
|
||||||
|
}
|
||||||
|
log.Debug("Corrected Speed: " + actualSpeed)
|
||||||
|
|
||||||
|
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
|
||||||
|
return int32(0)
|
||||||
|
}
|
||||||
|
return int32(speed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Converts the date string provided from the upstream API into a proper Date type and adds a time
|
||||||
|
func parseDate(dateString string, end bool) time.Time {
|
||||||
|
log.Debug("Date Input: " + dateString)
|
||||||
|
date, err := time.Parse("2006-01-02", dateString)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to parse date: " + dateString)
|
||||||
|
return time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var hour, minute, second, nanosecond int
|
||||||
|
location := time.UTC
|
||||||
|
if end {
|
||||||
|
hour, minute, second, nanosecond = 23, 59, 59, 0
|
||||||
|
} else {
|
||||||
|
hour, minute, second, nanosecond = 0, 0, 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
|
||||||
|
log.Debug("Parsed date: " + dateWithTime.String())
|
||||||
|
return dateWithTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Converts the binary stype 'daysRun' field into an array of short days
|
||||||
|
func parseDaysRun(daysBinary string) []string {
|
||||||
|
log.Debug("daysRun Input: " + daysBinary)
|
||||||
|
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
|
||||||
|
var result []string
|
||||||
|
for i, digit := range daysBinary {
|
||||||
|
if digit == '1' {
|
||||||
|
result = append(result, shortDays[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Converts an array if upstreamApi.ScheduleLocation types to an array of database.Stop types
|
||||||
|
func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop {
|
||||||
|
var stops []database.Stop
|
||||||
|
|
||||||
|
for _, loc := range inputStops {
|
||||||
|
stop := database.Stop{
|
||||||
|
PublicDeparture: parseTimeStrings(loc.PublicDepartureTime),
|
||||||
|
WttDeparture: parseTimeStrings(loc.ScheduledDepartureTime),
|
||||||
|
PublicArrival: parseTimeStrings(loc.PublicArrivalTime),
|
||||||
|
WttArrival: parseTimeStrings(loc.ScheduledArrivalTime),
|
||||||
|
IsPublic: strings.TrimSpace(loc.PublicDepartureTime) != "" || strings.TrimSpace(loc.PublicArrivalTime) != "",
|
||||||
|
Tiploc: loc.Tiploc.Tiploc.TiplocId,
|
||||||
|
}
|
||||||
|
|
||||||
|
stops = append(stops, stop)
|
||||||
|
}
|
||||||
|
|
||||||
|
return stops
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTimeStrings(t string) string {
|
||||||
|
if t == "" {
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
strippedT := strings.TrimSpace(t)
|
||||||
|
if strippedT == "" {
|
||||||
|
return ""
|
||||||
|
} else {
|
||||||
|
return strippedT[:4]
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,20 +0,0 @@
|
||||||
package vstp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
|
|
||||||
func unmarshalData(jsonData string) (*upstreamApi.JsonScheduleV1, error) {
|
|
||||||
var schedule upstreamApi.MsgData
|
|
||||||
err := json.Unmarshal([]byte(jsonData), &schedule)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Unable to unmarshal message body: " + err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
log.Debug("Unmarshalling Complete")
|
|
||||||
return &schedule.Data.CIFMsg, nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue