timetable-extension #1

Open
fred.boniface wants to merge 160 commits from timetable-extension into main
67 changed files with 3832 additions and 631 deletions

View File

@ -9,6 +9,9 @@
*.dylib
message-logs
cif_debug_data
EXAMPLE_CIF_FILES
# Test binary, built with `go test -c`
*.test

View File

@ -0,0 +1,23 @@
name: Go Test
on:
push:
jobs:
test:
env:
RUNNER_TOOL_CACHE: /toolcache
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: ./go.mod
cache: true
- name: Run tests
run: go test ./...

7
.gitignore vendored
View File

@ -9,6 +9,13 @@
*.dylib
message-logs
# Manually built binaries
main
timetable-mgr
# Debug data
cif_debug_data
# Test binary, built with `go test -c`
*.test

View File

@ -1,8 +1,9 @@
FROM golang:alpine as builder
WORKDIR /source
COPY ./src .
COPY . .
RUN go build .
FROM scratch
COPY --from=builder /source/mq-client /bin/mq-client
CMD [ "/bin/mq-client" ]
FROM alpine:latest
COPY --from=builder /source/timetable-mgr /bin/timetable-mgr
USER 20400
CMD [ "/bin/timetable-mgr" ]

View File

@ -1,5 +1,41 @@
# mq-client
# timetable-mgr
Client for these message queue feeds:
- VSTP Schedule data
timetable-mgr is the new name for mq-client.
It has been extended to manage all timetable management aspects within OwlBoard:
- Periodic fetch of CORPUS Location Data
- Periodic fetch of Knowledgebase Stations data
- Daily fetch of Network Rail timetable files
- Subscribe to VSTP Messages from the MQ Feed
- Update the database with new timetable data
- Clean old services from the database
The configuration options have also been changed to support simple files, this allows support for Docker Secrets as well as the mounting of secrets within a Kubernetes cluster in addition to the existing method of configuring via environment variables.
## Configuration
The application requires the following configuration values to be set.
The preferred method of configuration in Kubernetes is mapping secrets to environment variables, and in Docker is to mount the secrets to a file - one value per file. This is the simplest method and provides an acceptable level of security - provided you have configured secret management within Kubernetes.
Docker Swarm has secret support built-in, for Docker Standalone/Podman you will need to manage secrets yourself.
See the table below for the environment variable name, the configuration file path. Some values have defaults which will be used if the variable is not set and the file path does not exist. In cases where the file path exists and the environment variable also exists, the environment variable will be used. All values without a default are required,timetable-mgr will exit if it cannot load a value.
| Variable Name | File Path | Default Value | Description |
| :------------ | :------------------ | :------------ | :---------- |
| OWL_VSTP_ON | /owl/conf/vstp/on | on | Enable/Disable VSTP Updates <sup>1</sup> |
| OWL_NROD_USER | /owl/conf/nrod/user | | Network Rail NROD Username |
| OWL_NROD_PASS | /owl/conf/nrod/pass | | Network Rail NROD Password |
| OWL_DB_HOST | /owl/conf/db/host | localhost | MongoDB Host |
| OWL_DB_PORT | /owl/conf/db/port | 27017 | MongoDB Port |
| OWL_DB_USER | /owl/conf/db/user | | MongoDB Username |
| OWL_DB_PASS | /owl/conf/db/pass | | MondoDB Password |
<sup>1</sup> Set to 'off' to disable VSTP Updates. Useful in testing as you cannot have two clients connected.
## Logging
Logging is at INFO level. The environment variable 'runtime' is checked. If it is set to `debug`, then the log level will be set to DEBUG.
If the environment variable 'perflog' is set to `on`, then the number of running goroutines and heap memory allocation will be printed to the console every two seconds. Of course this is off by default.

75
background/ticker.go Normal file
View File

@ -0,0 +1,75 @@
package background
import (
"math"
"os"
"runtime"
"time"
"git.fjla.uk/owlboard/timetable-mgr/cif"
"git.fjla.uk/owlboard/timetable-mgr/corpus"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/pis"
"git.fjla.uk/owlboard/timetable-mgr/stations"
"go.uber.org/zap"
)
const frequency = 2 * time.Hour // Figure out a sensible frequency!
// Starts a background ticker to run background tasks. Uses the frequency configured in the background/ticker.go file
func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
go runTicker(cfg, stop)
// Run goroutine logging ticker if env "perflog" is set to "on"
if os.Getenv("perflog") == "on" {
go goroutineTicker(stop)
}
}
// Runs the ticker and handles tick events
func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
log.Info("Starting background task ticker", zap.Duration("frequency", frequency))
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-stop:
log.Debug("Stopping background task ticker")
return
case <-ticker.C:
log.Debug("Running background tasks")
go cif.CheckCif(cfg)
go corpus.CheckCorpus(cfg)
go stations.Check()
go pis.Check()
}
}
}
// Starts a ticker that logs how many goroutines are running every two seconds
func goroutineTicker(stop <-chan struct{}) {
log.Debug("Starting goroutine resource logging ticker")
ticker := time.NewTicker(1000 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
debugLog()
}
}
}
func debugLog() {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
goroutines := runtime.NumGoroutine()
heapMem := float64(memStats.HeapAlloc) / (1024 * 1024)
heapMemRound := math.Round(heapMem*100) / 100
log.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
}

66
cif/check.go Normal file
View File

@ -0,0 +1,66 @@
package cif
import (
"time"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Checks if the CIF Data needs updating, and what type of update is needed (Full/Partial) and if partial
// what days data needs updating, then calls an update function to handle the update.
func CheckCif(cfg *helpers.Configuration) {
// Check that it is after 0600, if not then skip update
if time.Now().In(londonTimezone).Hour() <= dataAvailable {
log.Info("Too early to update CIF data, not published until 0600")
return
}
log.Debug("Checking age of CIF Data")
// Load and read metadata from database
metadata, err := dbAccess.GetCifMetadata()
if err != nil {
log.Error("Unable to read last update time", zap.Error(err))
return
}
// If no metadata is found in DB, presume no CIF data exists
if metadata == nil {
log.Info("Full CIF download required")
err := runCifFullDownload(cfg)
if err != nil {
log.Warn("Unable to run full CIF Update", zap.Error(err))
}
return
}
// Check if last update was today
if isSameToday(metadata.LastUpdate) {
log.Info("CIF Data has already been updated today, skipping")
return
}
// Check how many days since last update, if more than 5, run full update, else run update
daysSinceLastUpdate := howManyDaysAgo(metadata.LastUpdate)
if daysSinceLastUpdate > 5 {
log.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
log.Info("Full CIF download required")
err := runCifFullDownload(cfg)
if err != nil {
log.Error("Full CIF update failed", zap.Error(err))
}
return
}
daysToUpdate := generateUpdateDays(daysSinceLastUpdate)
// Run the update
log.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
err = runCifUpdateDownload(cfg, metadata, daysToUpdate)
if err != nil {
log.Warn("Daily CIF update failed", zap.Error(err))
}
}

19
cif/constants.go Normal file
View File

@ -0,0 +1,19 @@
package cif
import "time"
// The URL required for a daily update of the CIF Data - The 'day string' must be appended
const dailyUpdateUrl = "https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_ALL_UPDATE_DAILY&day=toc-update-"
// The URL required for a full fetch of the CIF Data
const fullUpdateUrl = "https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_ALL_FULL_DAILY&day=toc-full"
// The time at which CIF Data is expected to be available for download (full hour)
const dataAvailable = 6
// Define update type strings to pass into metadata
const fullUpdateType = "full"
const dailyUpdateType = "daily"
// An object representing the Europe/London timezone
var londonTimezone, _ = time.LoadLocation("Europe/London")

128
cif/convert.go Normal file
View File

@ -0,0 +1,128 @@
package cif
import (
"strconv"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
output := database.Service{
//TransactionType: input.TransactionType,
StpIndicator: input.CifStpIndicator,
Operator: input.AtocCode,
TrainUid: input.CifTrainUid,
Headcode: input.ScheduleSegment.SignallingId,
PowerType: input.ScheduleSegment.CifPowerType,
PlanSpeed: parseSpeed(&input.ScheduleSegment.CifSpeed),
ScheduleStartDate: ParseCifDate(&input.ScheduleStartDate, "start"),
ScheduleEndDate: ParseCifDate(&input.ScheduleEndDate, "end"),
ServiceDetail: generateServiceDetail(&input.ScheduleSegment.CifTrainClass, &input.ScheduleSegment.SignallingId, &input.ScheduleSegment.CifCateringCode, &input.ScheduleSegment.CifSleepers, vstp),
DaysRun: parseDaysRun(&input.ScheduleDaysRun),
Stops: parseStops(&input.ScheduleSegment.ScheduleLocation),
}
return &output, nil
}
// Converts CifSpeed input string to an int32, automatically corrects VSTP speeds which are not actual speed values
func parseSpeed(CIFSpeed *string) int32 {
if CIFSpeed == nil {
return 0
}
if *CIFSpeed == "" {
return int32(0)
}
actualSpeed, exists := helpers.SpeedMap[*CIFSpeed]
if !exists {
actualSpeed = *CIFSpeed
}
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
if err != nil {
log.Warn("Unable to parse speed", zap.String("input-value", *CIFSpeed))
return int32(0)
}
return int32(speed)
}
// Not tested
func parseStops(input *[]upstreamApi.CifScheduleLocation) []database.Stop {
output := make([]database.Stop, 0, len(*input))
for _, item := range *input {
stop := database.Stop{
PublicDeparture: item.PublicDeparture,
PublicArrival: item.PublicArrival,
WttDeparture: item.Departure,
WttArrival: item.Arrival,
Pass: item.Pass,
Platform: item.Platform,
ArrLine: item.Path,
DepLine: item.Line,
Tiploc: item.TiplocCode,
IsPublic: isPublic(&item),
}
output = append(output, stop)
}
return output
}
// Ascertains whether a given location is a public stop or not
func isPublic(input *upstreamApi.CifScheduleLocation) bool {
if input.PublicArrival == "" && input.PublicDeparture == "" {
return false
}
return true
}
// Generates a ServiceDetail struct based on the input
func generateServiceDetail(
cifTrainClass, signallingId,
cateringCode, sleepers *string,
vstp bool,
) database.ServiceDetail {
return database.ServiceDetail{
FirstClass: hasFirstClass(cifTrainClass, signallingId),
Catering: hasCatering(cateringCode),
Sleeper: hasSleeper(sleepers),
Vstp: vstp,
}
}
// Ascertains whether the service offers first class
func hasFirstClass(input, signallingId *string) bool {
if input == nil || signallingId == nil {
return false
}
// Handle non passenger headcodes and ensure first class is not shown as available
firstChar := (*signallingId)[0]
if firstChar == '3' || firstChar == '4' || firstChar == '5' || firstChar == '6' || firstChar == '7' || firstChar == '8' || firstChar == '0' {
return false
}
return *input != "S"
}
// Ascertains whether the service offers catering
func hasCatering(input *string) bool {
if input == nil {
return false
}
return *input != ""
}
// Ascertains whether the service offers sleeping berths
func hasSleeper(input *string) bool {
if input == nil {
return false
}
return *input != ""
}

222
cif/convert_test.go Normal file
View File

@ -0,0 +1,222 @@
package cif
import (
"reflect"
"testing"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
)
func TestParseSpeed(t *testing.T) {
testCases := []struct {
input string
expected int32
}{
{"075", 75},
{"125", 125},
{"40", 40},
{"040", 40},
{"134", 60},
{"179", 80},
{"186", 186},
{"417", 186},
}
for _, tc := range testCases {
result := parseSpeed(&tc.input)
if result != tc.expected {
t.Errorf("For speed: %s, expected: %d, but got: %d", tc.input, tc.expected, result)
}
}
nilResult := parseSpeed(nil)
if nilResult != 0 {
t.Errorf("parseSpeed failed to handle nil pointer, expected %d, but got %d", 0, nilResult)
}
}
func TestIsPublic(t *testing.T) {
testCases := []struct {
input upstreamApi.CifScheduleLocation
expect bool
}{
{upstreamApi.CifScheduleLocation{
LocationType: "LO",
RecordIdentity: "",
TiplocCode: "BATHSPA",
TiplocInstance: "0",
Arrival: "0422H",
Departure: "0445",
PublicDeparture: "0445",
Pass: "",
Path: "UFR",
Platform: "1",
Line: "DFR",
EngineeringAllowance: "",
PathingAllowance: "",
PerformanceAllowance: "",
}, true},
{upstreamApi.CifScheduleLocation{
LocationType: "LO",
RecordIdentity: "",
TiplocCode: "BATHSPA",
TiplocInstance: "0",
Arrival: "0422H",
Departure: "0445",
PublicDeparture: "",
Pass: "",
Path: "UFR",
Platform: "1",
Line: "DFR",
EngineeringAllowance: "",
PathingAllowance: "",
PerformanceAllowance: "",
}, false},
}
for num, tc := range testCases {
result := isPublic(&tc.input)
if result != tc.expect {
t.Errorf("For testCase %d, expected %t, but got %t", num, tc.expect, result)
}
}
}
func TestHasFirstClass(t *testing.T) {
testCases := []struct {
input string
headcode string
expect bool
}{
{"", "1A00", true},
{"B", "2A05", true},
{"S", "1C99", false},
{"", "3C23", false},
{"", "5Q21", false},
{"", "5D32", false},
{"", "9O12", true},
{"B", "9D32", true},
{"", "7R43", false},
{"B", "6Y77", false},
{"", "8P98", false},
{"S", "4O89", false},
{"", "4E43", false},
}
for _, tc := range testCases {
result := hasFirstClass(&tc.input, &tc.headcode)
if result != tc.expect {
t.Errorf("For %s & headcode %s, expected %t, but got %t", tc.input, tc.headcode, tc.expect, result)
}
}
nilResult := hasFirstClass(nil, nil)
if nilResult {
t.Errorf("hasFirstClass failed to handle nil pointer, expected %t, got %t", false, nilResult)
}
}
func TestHasCatering(t *testing.T) {
testCases := []struct {
input string
expect bool
}{
{"", false},
{"CF", true},
{"HT", true},
{"MR", true},
{"RP", true},
{"T", true},
{"F", true},
}
for _, tc := range testCases {
result := hasCatering(&tc.input)
if result != tc.expect {
t.Errorf("For %s, expected %t, but got %t", tc.input, tc.expect, result)
}
}
nilResult := hasCatering(nil)
if nilResult {
t.Errorf("hasCatering failed to handle nil pointer, expected %t, but got %t", false, nilResult)
}
}
func TestHasSleeper(t *testing.T) {
testCases := []struct {
input string
expect bool
}{
{"B", true},
{"F", true},
{"S", true},
{"", false},
}
for _, tc := range testCases {
result := hasSleeper(&tc.input)
if result != tc.expect {
t.Errorf("For %s, expected %t, but got %t", tc.input, tc.expect, result)
}
}
nilResult := hasSleeper(nil)
if nilResult {
t.Errorf("hasSleeper failed to handle nil pointer, expected %t, but got %t", false, nilResult)
}
}
func TestParseStops(t *testing.T) {
testCases := []struct {
input []upstreamApi.CifScheduleLocation
expected []database.Stop
}{
{
input: []upstreamApi.CifScheduleLocation{
{
LocationType: "LO",
RecordIdentity: "Yes",
TiplocCode: "BATHSPA",
TiplocInstance: "0",
Arrival: "0445",
Departure: "0449",
PublicDeparture: "0449",
Pass: "",
Platform: "1",
Line: "DM",
Path: "DM",
EngineeringAllowance: "",
PathingAllowance: "",
PerformanceAllowance: "",
}},
expected: []database.Stop{
{
PublicDeparture: "0449",
WttDeparture: "0449",
PublicArrival: "",
WttArrival: "0445",
IsPublic: true,
Tiploc: "BATHSPA",
Pass: "",
Platform: "1",
ArrLine: "DM",
DepLine: "DM",
},
},
},
}
for _, tc := range testCases {
result := parseStops(&tc.input)
if !reflect.DeepEqual(result, tc.expected) {
t.Errorf("Test case failed. Input: %v, Expected: %v, Got: %v", tc.input, tc.expected, result)
}
}
}

99
cif/helpers.go Normal file
View File

@ -0,0 +1,99 @@
package cif
import (
"errors"
"time"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Fetches the day string for the provided date.
func getDayString(t time.Time) string {
time := t.In(londonTimezone)
day := time.Weekday()
dayStrings := [...]string{"sun", "mon", "tue", "wed", "thu", "fri", "sat"}
return dayStrings[day]
}
// Simply returns the correct URL for either a 'daily' or 'full' update.
func getUpdateUrl(updateType string) (string, error) {
if updateType == "daily" {
return dailyUpdateUrl, nil
} else if updateType == "full" {
return fullUpdateUrl, nil
}
err := errors.New("invalid update type provided, must be one of 'daily' or 'full'")
return "", err
}
// Takes a time.Time as input and returns True if it is
// the same day as now, or false if it is not the same day as now
func isSameToday(t time.Time) bool {
test := t.In(time.UTC)
today := time.Now().In(time.UTC)
return test.Year() == today.Year() && test.Month() == today.Month() && test.Day() == today.Day()
}
// Returns how many days ago `t` was compared to today
func howManyDaysAgo(t time.Time) int {
// Truncate both times to midnight in UTC timezone
today := time.Now().UTC().Truncate(24 * time.Hour)
input := t.UTC().Truncate(24 * time.Hour)
diff := today.Sub(input)
days := int(diff / (24 * time.Hour))
return days
}
// Generates a slice of time.Time values representing which days files need downloading
func generateUpdateDays(days int) []time.Time {
var updateDays []time.Time
for i := 0; i < days; i++ {
day := time.Now().Add(-time.Duration(i) * 24 * time.Hour)
updateDays = append(updateDays, day)
}
// Reverse slice to ensure chronological order
for i, j := 0, len(updateDays)-1; i < j; i, j = i+1, j-1 {
updateDays[i], updateDays[j] = updateDays[j], updateDays[i]
}
return updateDays
}
// Parses CIF Schedule Start/End Dates (YYYY-MM-DD) into time.Time types (00:00:00 for start, 23:59:59 for end)
func ParseCifDate(input *string, startOrEnd string) time.Time {
layout := "2006-01-02" // Layout of input
t, err := time.ParseInLocation(layout, *input, londonTimezone)
if err != nil {
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
return time.Time{}
}
if startOrEnd == "start" {
t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, londonTimezone)
} else if startOrEnd == "end" {
t = time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, londonTimezone)
} else {
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
return time.Time{}
}
return t.UTC()
}
// Parses CIF days_run field and converts to array of day strings
func parseDaysRun(daysBinary *string) []string {
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
var result []string
for i, digit := range *daysBinary {
if digit == '1' {
result = append(result, shortDays[i])
}
}
return result
}

137
cif/helpers_test.go Normal file
View File

@ -0,0 +1,137 @@
package cif
import (
"reflect"
"testing"
"time"
)
func TestIsSameDay(t *testing.T) {
today := time.Now()
notToday := time.Date(2024, 01, 23, 23, 01, 3, 421, time.Local)
if !isSameToday(today) {
t.Errorf("Error in isSameDay(today). Expected true, got false.")
}
if isSameToday(notToday) {
t.Errorf("Error in isSameDay(notToday). Expected false, got true.")
}
}
func TestHowManyDaysAgo(t *testing.T) {
testCases := []struct {
input time.Time
expected int
}{
{time.Now().In(time.UTC), 0}, // Today
{time.Now().In(time.UTC).Add(-24 * time.Hour), 1}, // Yesterday
{time.Now().In(time.UTC).Add(-48 * time.Hour), 2}, // Ereyesterday
{time.Now().In(time.UTC).Add(24 * time.Hour), -1}, // Tomorrow
{time.Now().In(time.UTC).Add(48 * time.Hour), -2}, // Overmorrow
}
for _, tc := range testCases {
result := howManyDaysAgo(tc.input)
if result != tc.expected {
t.Errorf("For input %v, expected %d but got %d", tc.input, tc.expected, result)
}
}
}
func TestGetDayString(t *testing.T) {
testCases := []struct {
input time.Time
expected string
}{ // Note that the test times are in UTC, but days are checked in Europe/London
{time.Date(2024, time.April, 7, 0, 0, 0, 0, time.UTC), "sun"},
{time.Date(2024, time.April, 4, 21, 0, 0, 0, time.UTC), "thu"},
{time.Date(2001, time.September, 11, 12, 46, 0, 0, time.UTC), "tue"},
}
for _, tc := range testCases {
result := getDayString(tc.input)
if result != tc.expected {
t.Errorf("For input %v, expected %s, but got %s", tc.input, tc.expected, result)
}
}
}
func TestGenerateUpdateDays(t *testing.T) {
testCases := []struct {
days int
expected []time.Time
}{
{1, []time.Time{time.Now()}},
{2, []time.Time{time.Now().Add(-24 * time.Hour), time.Now()}},
{4, []time.Time{time.Now().Add(-72 * time.Hour),
time.Now().Add(-48 * time.Hour),
time.Now().Add(-24 * time.Hour),
time.Now(),
}},
}
for _, tc := range testCases {
result := generateUpdateDays(tc.days)
if len(result) != len(tc.expected) {
t.Errorf("For %d days, expected %v, but got %v", tc.days, tc.expected, result)
continue
}
for i := range result {
if !isSameDate(result[i], tc.expected[i]) {
t.Errorf("For %d days, expected %v, but got %v", tc.days, tc.expected, result)
}
}
}
}
func TestParseCifDate(t *testing.T) {
testCases := []struct {
dateString string
startOrEnd string
expect time.Time
}{
{"2024-04-05", "start", time.Date(2024, time.April, 5, 0, 0, 0, 0, londonTimezone)},
{"2022-01-01", "start", time.Date(2022, time.January, 1, 0, 0, 0, 0, londonTimezone)},
{"2015-09-26", "end", time.Date(2015, time.September, 26, 23, 59, 59, 0, londonTimezone)},
{"2018-03-13", "end", time.Date(2018, time.March, 13, 23, 59, 59, 0, londonTimezone)},
}
layout := "2006-01-02 15:04:05" // Layout for printing times in error cases.
for _, tc := range testCases {
result := ParseCifDate(&tc.dateString, tc.startOrEnd)
result = result.In(londonTimezone)
//fmt.Println(tc.dateString, "|UTC: ", result.In(time.UTC), "|EU/Lon: ", result)
if result != tc.expect {
t.Errorf("For datestring %s, startOrEnd %s, expected %s, but got %s", tc.dateString, tc.startOrEnd, tc.expect.Format(layout), result.Format(layout))
}
}
}
func TestParseDaysRun(t *testing.T) {
testCases := []struct {
input string
expect []string
}{
{"1111111", []string{"m", "t", "w", "th", "f", "s", "su"}},
{"0000001", []string{"su"}},
{"1000000", []string{"m"}},
{"0000100", []string{"f"}},
{"0111000", []string{"t", "w", "th"}},
}
for _, tc := range testCases {
result := parseDaysRun(&tc.input)
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("For input %s, expected %v, but got %v", tc.input, tc.expect, result)
}
}
}
// Checks if two time values have the same year, month and day.
func isSameDate(t1, t2 time.Time) bool {
return t1.Year() == t2.Year() && t1.Month() == t2.Month() && t1.Day() == t2.Day()
}

69
cif/metadata.go Normal file
View File

@ -0,0 +1,69 @@
package cif
import (
"fmt"
"time"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Evaluates last and new metadata and determines whether an update is required.
// // If last update == "full" && lastupdate after yesterday at 0600, run update.
// // Beyond that condition, comparisons are made between old and new metadata.
func checkMetadata(oldMeta *dbAccess.CifMetadata, newMeta *upstreamApi.JsonTimetableV1) (reason string, updateRequired bool) {
// Handle nil pointer - although this should be resolved in the calling function in ideal situations
if oldMeta == nil || newMeta == nil {
log.Debug("oldMeta or newMeta is a nil pointer.")
return "nil-pointer", false
}
if !isTimestampWithinLastFiveDays(newMeta.Timestamp) {
log.Debug("Downloaded CIF File not produced in last five days", zap.Time("file_timestamp", time.Unix(newMeta.Timestamp, 0)))
return "downloaded-data-is-too-old", false
}
// Handle non-matching sequence numbers between full and daily download types
// if isAfterYesterdayAt0600(oldMeta.LastUpdate) {
// return "last-update-full", true
// }
// Check that the new metadata sequence is as expected.
if newMeta.Metadata.Sequence == oldMeta.LastSequence+1 {
log.Debug("Sequence numbers", zap.Int64("New", newMeta.Metadata.Sequence), zap.Int64("Old", oldMeta.LastSequence))
return "correct-sequence", true
} else {
s := fmt.Sprintf("incorrect sequence, Old: %d, New: %d, Expected New: %d", oldMeta.LastSequence, newMeta.Metadata.Sequence, oldMeta.LastSequence+1)
return s, false
}
}
// Evaluates whether the given time is after yesterday at 0600
func isAfterYesterdayAt0600(t time.Time) bool {
yesterday0600 := time.Now().In(londonTimezone).AddDate(0, 0, -1)
yesterday0600 = time.Date(yesterday0600.Year(), yesterday0600.Month(), yesterday0600.Day(), 6, 0, 0, 0, time.UTC)
return t.After(yesterday0600)
}
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
newMetadata := dbAccess.CifMetadata{
Doctype: dbAccess.Doctype,
LastTimestamp: header.Timestamp,
LastUpdate: time.Now().In(londonTimezone),
LastSequence: header.Metadata.Sequence,
}
return &newMetadata
}
// Accepts a unix timestamp and returns true if the timestamp is within the last five days, else false
func isTimestampWithinLastFiveDays(ts int64) bool {
timestamp := time.Unix(ts, 0)
timeNow := time.Now()
fiveDaysAgo := timeNow.AddDate(0, 0, -5)
return !timestamp.Before(fiveDaysAgo)
}

101
cif/metadata_test.go Normal file
View File

@ -0,0 +1,101 @@
package cif
import (
"testing"
"time"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
)
func TestGenerateMetadata(t *testing.T) {
header := &upstreamApi.JsonTimetableV1{
Classification: "public",
Timestamp: 1711227636,
Owner: "Network Rail",
Sender: upstreamApi.TimetableSender{
Organisation: "Rockshore",
Application: "NTROD",
Component: "SCHEDULE",
},
Metadata: upstreamApi.TimetableMetadata{
Type: "update",
Sequence: 4307,
},
}
expected := &dbAccess.CifMetadata{
Doctype: dbAccess.Doctype,
LastTimestamp: header.Timestamp,
LastSequence: header.Metadata.Sequence,
LastUpdate: time.Now().In(londonTimezone),
}
result := generateMetadata(header)
if result == nil {
t.Errorf("generateMetadata returned nil pointer")
return // Static type checking likes this return to be here, even if it is redundant in reality.
}
if result.Doctype != expected.Doctype {
t.Errorf("Doctype: expected %s, got %s", expected.Doctype, result.Doctype)
}
if result.LastTimestamp != expected.LastTimestamp {
t.Errorf("LastTimestamp: expected %d, got %d", expected.LastTimestamp, result.LastTimestamp)
}
if result.LastSequence != expected.LastSequence {
t.Errorf("LastSequence: expected %d, got %d", expected.LastSequence, result.LastSequence)
}
tolerance := time.Second
if !result.LastUpdate.Before(expected.LastUpdate.Add(tolerance)) ||
!result.LastUpdate.After(expected.LastUpdate.Add(-tolerance)) {
t.Errorf("LastUpdate: expected %s, got %s", expected.LastUpdate, result.LastUpdate)
}
}
func TestIsAfterYesterdayAt0600(t *testing.T) {
yesterday0600 := time.Now().In(londonTimezone).AddDate(0, 0, -1).Truncate(24 * time.Hour).Add(6 * time.Hour)
testCases := []struct {
input time.Time
expect bool
}{
{yesterday0600.Add(-1 * time.Hour), false},
{yesterday0600.Add(-12 * time.Hour), false},
{yesterday0600.Add(-24 * time.Hour), false},
{yesterday0600.Add(1 * time.Microsecond), true},
{yesterday0600.Add(1 * time.Hour), true},
{yesterday0600.Add(12 * time.Hour), true},
{yesterday0600.Add(24 * time.Hour), true},
}
for _, tc := range testCases {
result := isAfterYesterdayAt0600(tc.input)
if result != tc.expect {
t.Errorf("For input %v, expected %t, but got %t", tc.input, tc.expect, result)
}
}
}
func TestIsTimestampWithinLastFiveDays(t *testing.T) {
testCases := []struct {
name string
ts int64
expect bool
}{
{"WithinLastFiveDays", time.Now().AddDate(0, 0, -3).Unix(), true},
{"ExactlyFiveDaysAgo", time.Now().AddDate(0, 0, -5).Unix(), false}, //False due to elapsed time during test
{"MoreThanFiveDaysAgo", time.Now().AddDate(0, 0, -7).Unix(), false},
}
for _, tc := range testCases {
result := isTimestampWithinLastFiveDays(tc.ts)
if result != tc.expect {
t.Errorf("For %s (%d), expected %t, but got %t", tc.name, tc.ts, tc.expect, result)
}
}
}

69
cif/parse.go Normal file
View File

@ -0,0 +1,69 @@
package cif
import (
"encoding/json"
"errors"
"io"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Accepts the CIF data as a stream and outputs parsed data
func parseCifDataStream(dataStream io.ReadCloser) (*ParsedData, error) {
defer dataStream.Close()
log.Debug("Starting CIF Datastream parsing")
if dataStream == nil {
return nil, errors.New("unable to parse nil pointer")
}
var parsed ParsedData
parsed.Assoc = make([]upstreamApi.JsonAssociationV1, 0)
parsed.Sched = make([]upstreamApi.JsonScheduleV1, 0)
// Create JSON Decoder
decoder := json.NewDecoder(dataStream)
// Iterate over JSON Objects using stream decoder
for decoder.More() {
var obj map[string]json.RawMessage
if err := decoder.Decode(&obj); err != nil {
log.Error("Error decoding JSON String")
return nil, err
}
// Handle parsed data
for key, value := range obj {
switch key {
case "JsonTimetableV1":
var timetable upstreamApi.JsonTimetableV1
if err := json.Unmarshal(value, &timetable); err != nil {
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
continue
}
parsed.Header = timetable
case "TiplocV1":
// This data is not used and is sourced from CORPUS
continue
case "JsonAssociationV1":
// Association data is not currently used
// but may be used in the future
continue
case "JsonScheduleV1":
var schedule upstreamApi.JsonScheduleV1
if err := json.Unmarshal(value, &schedule); err != nil {
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
continue
}
parsed.Sched = append(parsed.Sched, schedule)
case "EOF":
log.Debug("Reached EOF")
default:
log.Warn("Unknown CIF Data type", zap.String("key", key))
}
}
}
log.Debug("CIF Parsing completed")
return &parsed, nil
}

114
cif/process.go Normal file
View File

@ -0,0 +1,114 @@
package cif
import (
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Processes parsed CIF data and applies the data to the database
func ProcessParsedCif(data *ParsedData) error {
log.Debug("Starting CIF Processing")
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.Sched)))
// Batch size for processing
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
// Process deletions in batches
for i := 0; i < len(data.Sched); i += batchSize {
end := i + batchSize
if end > len(data.Sched) {
end = len(data.Sched)
}
deleteBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.Sched[i:end] {
if item.TransactionType == "Delete" {
deleteItem := item
deleteBatch = append(deleteBatch, &deleteItem)
}
}
if len(deleteBatch) > 0 {
err := doDeletions(deleteBatch)
if err != nil {
log.Error("Error deleting CIF Entries", zap.Error(err))
return err
}
}
}
// Process creations in batches
for i := 0; i < len(data.Sched); i += batchSize {
end := i + batchSize
if end > len(data.Sched) {
end = len(data.Sched)
}
createBatch := make([]*upstreamApi.JsonScheduleV1, 0)
for _, item := range data.Sched[i:end] {
if item.TransactionType == "Create" {
createItem := item
createBatch = append(createBatch, &createItem)
}
}
if len(createBatch) > 0 {
err := doCreations(createBatch)
if err != nil {
log.Error("Error creating CIF Entries", zap.Error(err))
return err
}
}
}
log.Debug("CIF Processing complete")
return nil
}
// Create delete query types and pass to the function which batches the deletions
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
defer func() {
if r := recover(); r != nil {
log.Panic("Panic:", zap.Any("panic", r))
}
}()
deleteQueries := make([]database.DeleteQuery, 0)
for _, item := range deletions {
query := database.DeleteQuery{
ScheduleStartDate: ParseCifDate(&item.ScheduleStartDate, "start"),
StpIndicator: item.CifStpIndicator,
TrainUid: item.CifTrainUid,
}
deleteQueries = append(deleteQueries, query)
}
err := dbAccess.DeleteCifEntries(deleteQueries)
if err != nil {
log.Error("Error deleting documents", zap.Error(err))
return err
}
return nil
}
// Convert to the correct struct for the database and pass to the function which batches insertions
func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
createDocuments := make([]database.Service, 0)
for _, item := range creations {
document, err := ConvertServiceType(item, false)
if err != nil {
log.Error("Error converting JsonSchedule to Service type", zap.Error(err))
continue
}
createDocuments = append(createDocuments, *document)
}
err := dbAccess.CreateCifEntries(createDocuments)
if err != nil {
log.Error("Error creating documents", zap.Error(err))
return err
}
return nil
}

11
cif/readme.md Normal file
View File

@ -0,0 +1,11 @@
# package cif
This package follows a similar pattern to `package corpus`.
First, CheckCorpus() retreived cif metadata from the database and determines whether an update is required and what type of update.
Then, one of the update functions is called which run through the update process. There are two update types, 'full' and 'update'. A 'full' update drops the entire timetable collection and rebuilds with a full CIF download. 'update' downloads CIF updates from specified days and applies updates.
Downloads are handled by `package nrod` which returns an io.ReadCloser which is passed to the parsing function.
Currently the parsing function returns a parsedCif pointer, however this is using significant memory due to the size of a full CIF download (Often around 4.5GB). The intention is to instead use a worker pool to handle the data.

13
cif/types.go Normal file
View File

@ -0,0 +1,13 @@
package cif
import "git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
// This file is for internal types to `package cif`, any types which represent
// database or external API resources should be defined in git.fjla.uk/owlboard/go-types
// Holds parsed data for processing
type ParsedData struct {
Header upstreamApi.JsonTimetableV1
Assoc []upstreamApi.JsonAssociationV1
Sched []upstreamApi.JsonScheduleV1
}

159
cif/update.go Normal file
View File

@ -0,0 +1,159 @@
package cif
import (
"io"
"time"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/nrod"
"go.uber.org/zap"
)
// Replaces all existing CIF Data with a new download
func runCifFullDownload(cfg *helpers.Configuration) error {
preTime := time.Now()
log.Info("Downloading all CIF Data")
// Download CIF Data file
url, err := getUpdateUrl("full")
if err != nil {
log.Error("Error getting download URL", zap.Error(err))
}
dataStream, err := nrod.NrodStream(url, cfg)
if err != nil {
log.Error("Error downloading CIF data", zap.Error(err))
return err
}
// Parse CIF file
parsed, err := parseCifDataStream(dataStream)
if err != nil {
log.Error("Error parsing CIF data", zap.Error(err))
return err
}
// Try to remove all non-vstp entries, else give up and delete collection
count, err := dbAccess.PurgeNonVstp()
if err != nil {
log.Warn("Error purging non-vstp schedules, dropping collection", zap.Error(err))
dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database.
} else {
log.Info("Removed non-vstp services", zap.Int64("deletion count", count))
}
// Process CIF file
err = ProcessParsedCif(parsed)
if err != nil {
log.Error("Error processing CIF data", zap.Error(err))
}
newMeta := generateMetadata(&parsed.Header)
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
if !ok {
log.Warn("CIF Data updated, but metadata write failed")
}
// Set parsed to nil to encourage garbage collection
parsed = nil
// Create Indexes
err = dbAccess.CreateTimetableIndexes()
if err != nil {
log.Warn("Error creating timetable indexes, degraded performance", zap.Error(err))
}
postTime := time.Now()
updateDuration := postTime.Sub(preTime)
log.Info("Execution time", zap.Duration("duration", updateDuration))
return nil
}
// Runs a CIF Update for up to five days
func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMetadata, days []time.Time) error {
startTime := time.Now()
log.Info("Downloading CIF Updates")
// Loop over dates
for _, time := range days {
log.Info("Downloading CIF File", zap.Time("CIF Data from", time))
// Download CIF data file
data, err := fetchUpdate(time, cfg)
if err != nil {
log.Error("Error fetching CIF update", zap.Error(err))
return err
}
// Parse CIF file
parsed, err := parseCifDataStream(data)
if err != nil {
log.Error("Error parsing CIF data", zap.Error(err))
return err
}
// Check CIF Metadata
log.Debug("Starting metadata checks")
reason, update := checkMetadata(metadata, &parsed.Header)
if !update {
log.Warn("Update file not processed", zap.String("reason", reason))
continue
}
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
// Process CIF file
err = ProcessParsedCif(parsed)
if err != nil {
log.Error("Error processing CIF data", zap.Error(err))
}
metadata = generateMetadata(&parsed.Header)
parsed = nil
}
// Write metadata to database
ok := dbAccess.PutCifMetadata(metadata, dailyUpdateType)
if !ok {
log.Warn("CIF Data updated, but metadata write failed.")
}
// Clear out of date schedules
cutoff := time.Now().Add(-time.Hour * 24 * 7)
log.Debug("Attempting to remove outdated services", zap.Time("scheduleEnd before", cutoff))
count, err := dbAccess.RemoveOutdatedServices(cutoff)
if err != nil {
log.Warn("Out of date services not removed", zap.Error(err))
} else {
log.Info("Out of date services removed", zap.Int64("removal count", count))
}
endTime := time.Now()
duration := endTime.Sub(startTime)
log.Info("CIF Update process ended", zap.Duration("duration", duration))
return nil
}
// Wraps nrod.NrodDownload() into a function which can handle downloading data for a given day.
// Note that the previous days update is the latest and is downloaded.
func fetchUpdate(t time.Time, cfg *helpers.Configuration) (io.ReadCloser, error) {
url, err := getUpdateUrl("daily")
if err != nil {
return nil, err
}
// Calcuates the day yesterday which is the file that needs downloading
updateDay := t.Add(-24 * time.Hour)
// Append day string to URL
url = url + getDayString(updateDay)
log.Debug("Fetching CIF Data", zap.Time("Update_File_Produced", updateDay))
dataStream, err := nrod.NrodStream(url, cfg)
if err != nil {
return nil, err
}
return dataStream, nil
}

38
corpus/check.go Normal file
View File

@ -0,0 +1,38 @@
package corpus
import (
"time"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Checks if the CORPUS Data needs updating, and calls an updater function if needed.
func CheckCorpus(cfg *helpers.Configuration) {
log.Debug("Checking age of CORPUS Data")
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
if err != nil {
log.Error("Error checking last CORPUS update", zap.Error(err))
}
lastUpdate := time.Unix(utime, 0)
currentTime := time.Now()
dataAge := currentTime.Sub(lastUpdate)
fortnight := 14 * 24 * time.Hour
log.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
if dataAge >= fortnight {
log.Info("CORPUS update required")
err := RunCorpusUpdate(cfg)
if err != nil {
log.Warn("CORPUS Update did not run")
} else {
log.Info("CORPUS data has been updated")
}
} else {
log.Info("CORPUS Data not stale, skipping updating")
}
}

4
corpus/constants.go Normal file
View File

@ -0,0 +1,4 @@
package corpus
// The download URL for CORPUS data
const url string = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"

87
corpus/parse.go Normal file
View File

@ -0,0 +1,87 @@
package corpus
import (
"encoding/json"
"errors"
"io"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Accepts CORPUS data as a byte array and formats it ready for database insertion
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
defer stream.Close()
log.Debug("Starting CORPUS Data parsing")
var corpusEntries []database.CorpusEntry
decoder := json.NewDecoder(stream)
// Expect an object at the root of the JSON stream
if _, err := decoder.Token(); err != nil {
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Search for the "TIPLOCDATA" key
for decoder.More() {
// Decode the next JSON token
if tok, err := decoder.Token(); err != nil {
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
} else if tok == "TIPLOCDATA" {
// Found the "TIPLOCDATA" key, expect the associated array
if !decoder.More() {
err := errors.New("missing array after TIPLOCDATA key")
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Start reading the array associated with the "TIPLOCDATA" key
if _, err := decoder.Token(); err != nil {
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Iterate over the JSON array
for decoder.More() {
var corpusEntry database.CorpusEntry
if err := decoder.Decode(&corpusEntry); err != nil {
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
corpusEntries = append(corpusEntries, corpusEntry)
}
break // Exit loop after processing "TIPLOCDATA" array
}
}
log.Debug("CORPUS parsing complete")
return &corpusEntries, nil
}
// Removes empty fields from CORPUS entries
func pruneCorpusEntries(corpusEntries []database.CorpusEntry) *[]database.CorpusEntry {
for i := range corpusEntries {
if corpusEntries[i].CRS == " " {
corpusEntries[i].CRS = ""
}
if corpusEntries[i].TIPLOC == " " {
corpusEntries[i].TIPLOC = ""
}
if corpusEntries[i].NLCDESC16 == " " {
corpusEntries[i].NLCDESC16 = ""
}
if corpusEntries[i].STANOX == " " {
corpusEntries[i].STANOX = ""
}
if corpusEntries[i].UIC == " " {
corpusEntries[i].UIC = ""
}
}
return &corpusEntries
}

23
corpus/stations.go Normal file
View File

@ -0,0 +1,23 @@
package corpus
import "git.fjla.uk/owlboard/go-types/pkg/database"
// Removes non-station entities from the CORPUS Data, ready for insertion to the database (stations collection)
func createStationEntries(corpusData *[]database.CorpusEntry) *[]database.StationEntry {
var stationEntries []database.StationEntry
for _, entry := range *corpusData {
if entry.CRS != "" {
stationEntry := database.StationEntry{
CRS: entry.CRS,
TIPLOC: entry.TIPLOC,
NLCDESC: entry.NLCDESC,
STANOX: entry.STANOX,
}
stationEntries = append(stationEntries, stationEntry)
}
}
return &stationEntries
}

55
corpus/update.go Normal file
View File

@ -0,0 +1,55 @@
package corpus
import (
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/nrod"
"go.uber.org/zap"
)
// Runs all stages of the CORPUS Update process
func RunCorpusUpdate(cfg *helpers.Configuration) error {
resp, err := nrod.NrodStream(url, cfg)
if err != nil {
log.Error("Failed to fetch CORPUS data", zap.Error(err))
return err
}
unsortedCorpusData, err := parseCorpusData(resp)
if err != nil {
log.Error("Error parsing Corpus data", zap.Error(err))
return err
}
corpusData := pruneCorpusEntries(*unsortedCorpusData)
//stationData := createStationEntries(corpusData)
if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil {
log.Warn("CORPUS data may be incomplete")
log.Error("Error dropping CORPUS Data", zap.Error(err))
return err
}
//if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil {
// log.Warn("Stations data may be incomplete")
// log.Error("Error dropping stations Data", zap.Error(err))
// return err
//}
if err := dbAccess.PutManyCorpus(corpusData); err != nil {
log.Warn("CORPUS data may be incomplete")
log.Error("Error inserting CORPUS Data", zap.Error(err))
return err
}
//if err := dbAccess.PutManyStations(stationData); err != nil {
// log.Warn("Stations data may be incomplete")
// log.Error("Error inserting stations data", zap.Error(err))
// return err
//}
if err := dbAccess.CreateCorpusIndexes(); err != nil {
log.Error("Corpus Indexes creation failed, application speed will be reduced", zap.Error(err))
}
return nil
}

32
dbAccess/access.go Normal file
View File

@ -0,0 +1,32 @@
package dbAccess
import (
"context"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Pushes the current version of this application to the data base 'versions' collection.
func PushVersionToDb() {
version := database.Version{
Target: "timetable-mgr",
Component: "timetable-mgr",
Version: helpers.Version,
}
versionSelector := database.VersionSelector{
Target: "timetable-mgr",
Component: "timetable-mgr",
}
opts := options.Update().SetUpsert(true)
coll := MongoClient.Database("owlboard").Collection("versions")
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
if err != nil {
log.Warn("Unable to push version to database: " + err.Error())
} else {
log.Debug("Version up to date in Database")
}
}

204
dbAccess/cif.go Normal file
View File

@ -0,0 +1,204 @@
package dbAccess
import (
"context"
"errors"
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
const Doctype = "CifMetadata"
// The type describing the CifMetadata 'type' in the database.
// This type will be moved to owlboard/go-types
type CifMetadata struct {
Doctype string `bson:"type"`
LastUpdate time.Time `bson:"lastUpdate"`
LastTimestamp int64 `bson:"lastTimestamp"`
LastSequence int64 `bson:"lastSequence"`
LastUpdateType string `bson:"lastUpdateType"`
}
// Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example.
func GetCifMetadata() (*CifMetadata, error) {
database := MongoClient.Database(DatabaseName)
collection := database.Collection(MetaCollection)
filter := bson.M{"type": Doctype}
var result CifMetadata
err := collection.FindOne(context.Background(), filter).Decode(&result)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
}
return nil, err
}
log.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
return &result, nil
}
// Uses upsert to Insert/Update the CifMetadata in the database
func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
database := MongoClient.Database(DatabaseName)
collection := database.Collection(MetaCollection)
options := options.Update().SetUpsert(true)
filter := bson.M{"type": Doctype}
update := bson.M{
"$set": bson.M{
"type": Doctype,
"lastUpdate": metadata.LastUpdate,
"lastTimestamp": metadata.LastTimestamp,
"lastSequence": metadata.LastSequence,
"lastUpdateType": lastUpdateType,
},
}
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Error("Error updating CIF Metadata", zap.Error(err))
return false
}
log.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
return true
}
// Handles 'Delete' tasks from CIF Schedule updates, accepts DeleteQuery types and batches deletions.
func DeleteCifEntries(deletions []database.DeleteQuery) error {
defer func() {
if r := recover(); r != nil {
log.Panic("Panic:", zap.Any("panic", r))
}
}()
// Skip if deletions is empty
if len(deletions) == 0 {
log.Info("No deletions required")
return nil
}
log.Debug("Running deletions against database", zap.Int("count", len(deletions)))
// Prepare deletion tasks
collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
for _, deleteQuery := range deletions {
filter := bson.M{
"trainUid": deleteQuery.TrainUid,
"scheduleStartDate": deleteQuery.ScheduleStartDate,
"stpIndicator": deleteQuery.StpIndicator,
}
bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter))
}
bulkWriteOptions := options.BulkWrite().SetOrdered(false)
_, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions)
if err != nil {
return err
}
return nil
}
// Clears all non-vstp services from the database. Used when a CIF full download is required.
func PurgeNonVstp() (int64, error) {
coll := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
filter := bson.M{"serviceDetail.vstp": false}
result, err := coll.DeleteMany(context.Background(), filter)
if err != nil {
return result.DeletedCount, err
}
return result.DeletedCount, nil
}
// Handles 'Create' tasks for CIF Schedule updates, accepts Service structs and batches their creation.
func CreateCifEntries(schedules []database.Service) error {
// Skip if deletions is empty
if len(schedules) == 0 {
log.Info("No creations required")
return nil
}
log.Debug("Running creations against database", zap.Int("count", len(schedules)))
collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
models := make([]mongo.WriteModel, 0, len(schedules))
for _, s := range schedules {
model := mongo.NewInsertOneModel().SetDocument(s)
models = append(models, model)
}
bulkWriteOptions := options.BulkWrite().SetOrdered(false)
_, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
if err != nil {
return err
}
return nil
}
// Removes any schedules which ended before 'cutoff'
func RemoveOutdatedServices(cutoff time.Time) (count int64, err error) {
// Define filter
filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}}
collection := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
res, err := collection.DeleteMany(context.Background(), filter)
if err != nil {
return // Automatically returns named values
}
count = res.DeletedCount
return // Automatically returns names values
}
// Creates indexes on the Timetable collection... Index suitability needs checking.
func CreateTimetableIndexes() error {
log.Info("Creating timetable indexes")
coll := MongoClient.Database(DatabaseName).Collection(TimetableCollection)
indexModels := []mongo.IndexModel{
{
Keys: bson.D{
{Key: "trainUid", Value: 1},
{Key: "stpIndicator", Value: 1},
{Key: "scheduleStartDate", Value: 1},
},
Options: options.Index().SetName("delete_query"),
}, // The find by UID Query can make use of the delete_query index
{
Keys: bson.D{
{Key: "headcode", Value: 1},
{Key: "daysRun", Value: 1},
{Key: "scheduleStartDate", Value: 1},
{Key: "scheduleEndDate", Value: 1},
},
Options: options.Index().SetName("find_by_headcode"),
},
{
Keys: bson.D{
{Key: "serviceDetail.vstp", Value: 1},
},
Options: options.Index().SetName("vstp"),
},
}
_, err := coll.Indexes().CreateMany(context.Background(), indexModels)
if err != nil {
return err
}
return nil
}

79
dbAccess/client.go Normal file
View File

@ -0,0 +1,79 @@
package dbAccess
import (
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Provide the DB Connection to other functions
var MongoClient (*mongo.Client)
// Builds the DB URI based on the loaded configuration parameters
func getDbUri(cfg *helpers.Configuration) string {
var uri = "mongodb://" + cfg.DbUser + ":" + cfg.DbPass + "@" + cfg.DbHost + ":" + cfg.DbPort
return uri
}
// Configure bsonOpts
var bsonOpts = &options.BSONOptions{
UseJSONStructTags: true,
}
func InitDataAccess(cfg *helpers.Configuration) {
log.Debug("Starting database connection")
url := getDbUri(cfg)
const maxRetries = 8
for attempt := 1; attempt <= maxRetries; attempt++ {
log.Info("Attempting to connect to database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(url).SetBSONOptions(bsonOpts))
if err != nil {
log.Warn("Error connecting to database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
cancel()
if attempt != maxRetries {
helpers.BackoffDelay(attempt)
}
continue
}
err = client.Ping(ctx, nil)
if err != nil {
log.Warn("Error pinging database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
cancel()
if attempt != maxRetries {
helpers.BackoffDelay(attempt)
}
continue
}
MongoClient = client
log.Info("Database connection successful")
return
}
log.Fatal("Failed to connect to database on multiple attempts", zap.Int("attempts", maxRetries))
}
// Closes the connection to the database - used for cleanup functions
func CloseMongoClient() {
if MongoClient != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := MongoClient.Disconnect(ctx); err != nil {
log.Warn("Error disconnecting MongoDB client: " + err.Error())
} else {
log.Info("MongoDB client disconnected.")
}
}
}

72
dbAccess/common.go Normal file
View File

@ -0,0 +1,72 @@
package dbAccess
import (
"context"
"time"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// CAUTION: Drops the collection named in collectionName
func DropCollection(collectionName string) error {
log.Info("Dropping collection", zap.String("Collection Name", collectionName))
database := MongoClient.Database(DatabaseName)
collection := database.Collection(collectionName)
err := collection.Drop(context.Background())
if err != nil {
log.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
return err
}
return nil
}
// Checks the update time (unix timestamp) of the collection named in collectionName, uses 'type: collection' entries in the meta collection
func CheckUpdateTime(collectionName string) (int64, error) {
database := MongoClient.Database(DatabaseName)
collection := database.Collection(MetaCollection)
filter := bson.D{
{Key: "target", Value: collectionName},
{Key: "type", Value: "collection"},
}
var result struct {
Updated int64 `bson:"updated"`
}
err := collection.FindOne(context.Background(), filter).Decode(&result)
if err != nil {
return 0, err
}
return result.Updated, nil
}
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
func SetUpdateTime(collectionName string) error {
log.Info("Setting update time", zap.String("collection", collectionName))
database := MongoClient.Database(DatabaseName)
collection := database.Collection("meta")
options := options.Update().SetUpsert(true)
updateTime := time.Now().Unix()
filter := bson.M{
"target": collectionName,
"type": "collection",
}
update := bson.M{
"$set": bson.M{
"updated": updateTime,
"updated_time": time.Now().In(time.UTC),
"target": collectionName,
"type": "collection",
},
}
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
return err
}
return nil
}

8
dbAccess/contants.go Normal file
View File

@ -0,0 +1,8 @@
package dbAccess
const DatabaseName string = "owlboard"
const CorpusCollection string = "corpus"
const StationsCollection string = "stations"
const MetaCollection string = "meta"
const TimetableCollection string = "timetable"
const PisCollection string = "pis"

167
dbAccess/corpus.go Normal file
View File

@ -0,0 +1,167 @@
package dbAccess
import (
"context"
"strings"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Puts an array of Corpus Documents into the database
func PutManyCorpus(corpusData *[]database.CorpusEntry) error {
collection := MongoClient.Database(DatabaseName).Collection(CorpusCollection)
documents := convertCorpusToInterfaceSlice(corpusData)
_, err := collection.InsertMany(context.Background(), *documents)
if err != nil {
return err
}
SetUpdateTime(CorpusCollection)
return nil
}
// Puts an array of Stations documents into the database
func PutManyStations(stationsData *[]database.StationEntry) error {
collection := MongoClient.Database(DatabaseName).Collection(StationsCollection)
documents := convertStationsToInterfaceSlice(stationsData)
_, err := collection.InsertMany(context.Background(), *documents)
if err != nil {
return err
}
SetUpdateTime(StationsCollection)
return nil
}
// Converts []database.CorpusEntry types into interface slices required to put them into the database
func convertCorpusToInterfaceSlice(corpusData *[]database.CorpusEntry) *[]interface{} {
interfaceSlice := make([]interface{}, len(*corpusData))
for i, doc := range *corpusData {
interfaceSlice[i] = doc
}
return &interfaceSlice
}
// Converts []database.StationEntry types into interface slices required to put them into the database
func convertStationsToInterfaceSlice(stationsData *[]database.StationEntry) *[]interface{} {
interfaceSlice := make([]interface{}, len(*stationsData))
for i, doc := range *stationsData {
interfaceSlice[i] = doc
}
return &interfaceSlice
}
func CreateCorpusIndexes() error {
coll := MongoClient.Database(DatabaseName).Collection(CorpusCollection)
indexModels := []mongo.IndexModel{
{
Keys: bson.M{
"tiploc": 1,
},
Options: options.Index().SetName("tiploc"),
},
{
Keys: bson.M{
"3alpha": 1,
},
Options: options.Index().SetName("3alpha"),
},
}
_, err := coll.Indexes().CreateMany(context.Background(), indexModels)
if err != nil {
return err
}
return nil
}
func GetTiplocFromCrs(crs string) (tiploc string, err error) {
// Return TIPLOC from CRS code
// TEMP FIX: WPH does not return a tiploc
if crs == "WPH" {
return "WORCPHL", nil
}
crs = strings.ToUpper(crs)
// PIPELINE:
pipeline := bson.A{
bson.D{{"$match", bson.D{{"3ALPHA", crs}}}},
bson.D{
{"$project",
bson.D{
{"TIPLOC", 1},
{"_id", 0},
},
},
},
}
coll := MongoClient.Database(DatabaseName).Collection(StationsCollection)
cursor, err := coll.Aggregate(context.Background(), pipeline)
if err != nil {
return "", err
}
defer cursor.Close(context.Background())
var result struct {
TIPLOC string `bson:"TIPLOC"`
}
if cursor.Next(context.Background()) {
if err := cursor.Decode(&result); err != nil {
return "", err
}
return result.TIPLOC, nil
}
log.Warn("No TIPLOC Found", zap.String("CRS", crs))
return "", nil
}
func GetStanoxFromCrs(crs string) (stanox string, err error) {
// Return STANOX from CRS code
crs = strings.ToUpper(crs)
// PIPELINE:
pipeline := bson.A{
bson.D{{"$match", bson.D{{"3ALPHA", crs}}}},
bson.D{
{"$project",
bson.D{
{"STANOX", 1},
{"_id", 0},
},
},
},
}
coll := MongoClient.Database(DatabaseName).Collection(StationsCollection)
cursor, err := coll.Aggregate(context.Background(), pipeline)
if err != nil {
return "", err
}
defer cursor.Close(context.Background())
var result struct {
STANOX string `bson:"STANOX"`
}
if cursor.Next(context.Background()) {
if err := cursor.Decode(&result); err != nil {
return "", err
}
return result.STANOX, nil
}
log.Warn("No STANOX Found", zap.String("CRS", crs))
return "", nil
}

88
dbAccess/pis.go Normal file
View File

@ -0,0 +1,88 @@
package dbAccess
import (
"context"
"errors"
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
func GetPisMetadata() (*database.PisMetadata, error) {
coll := MongoClient.Database(DatabaseName).Collection(MetaCollection)
filter := bson.M{"type": "PisMetadata"}
var result database.PisMetadata
err := coll.FindOne(context.Background(), filter).Decode(&result)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
}
return nil, err
}
return &result, nil
}
func PutPisMetadata(version string) error {
coll := MongoClient.Database(DatabaseName).Collection(MetaCollection)
options := options.Update().SetUpsert(true)
filter := bson.M{"type": "PisMetadata"}
update := bson.M{
"$set": bson.M{
"type": "PisMetadata",
"lastUpdate": time.Now().Format(time.RFC3339),
"lastVersion": version,
},
}
_, err := coll.UpdateOne(context.Background(), filter, update, options)
if err != nil {
return err
}
log.Info("New Stations Metadata written", zap.String("version", version))
return nil
}
// Puts complete PIS dataset to database
func PutPisData(pis *[]database.PIS) (int64, error) {
coll := MongoClient.Database(DatabaseName).Collection("pis_testing")
var docs []interface{}
for _, entry := range *pis {
docs = append(docs, entry)
}
res, err := coll.InsertMany(context.TODO(), docs)
if err != nil {
return 0, err
}
return int64(len(res.InsertedIDs)), nil
}
func CreatePisIndeces() error {
coll := MongoClient.Database(DatabaseName).Collection("pis_testing")
crsIndex := mongo.IndexModel{
Keys: bson.D{{"stops", 1}},
}
tiplocIndex := mongo.IndexModel{
Keys: bson.D{{"tiplocs", 1}},
}
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{crsIndex, tiplocIndex})
if err != nil {
return err
}
return nil
}

110
dbAccess/stations.go Normal file
View File

@ -0,0 +1,110 @@
package dbAccess
import (
"context"
"errors"
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
const StationsMetaDoctype string = "StationsMetadata"
type StationsMetadata struct {
Doctype string
LastUpdate time.Time
}
// Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example.
func GetStationsMetadata() (*StationsMetadata, error) {
database := MongoClient.Database(DatabaseName)
collection := database.Collection(MetaCollection)
filter := bson.M{"type": StationsMetaDoctype}
var result StationsMetadata
err := collection.FindOne(context.Background(), filter).Decode(&result)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
}
return nil, err
}
log.Debug("Fetched Stations Metadata from database", zap.Any("Metadata", result))
return &result, nil
}
func SetStationsMetadata(time time.Time) bool {
database := MongoClient.Database(DatabaseName)
collection := database.Collection(MetaCollection)
options := options.Update().SetUpsert(true)
filter := bson.M{"type": StationsMetaDoctype}
update := bson.M{
"$set": bson.M{
"type": StationsMetaDoctype,
"lastUpdate": time,
},
}
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Error("Error updating Stations Metadata", zap.Error(err))
return false
}
log.Info("New Stations Metadata written", zap.Time("Update time", time))
return true
}
// Puts an array of Stations documents into the database
func PutManyNewStations(stationsData *[]database.Station) error {
collection := MongoClient.Database(DatabaseName).Collection(StationsCollection)
documents := convertNewStationsToInterfaceSlice(stationsData)
_, err := collection.InsertMany(context.Background(), *documents)
if err != nil {
return err
}
return nil
}
// Converts []database.Station types into interface slices required to put them into the database
func convertNewStationsToInterfaceSlice(stationsData *[]database.Station) *[]interface{} {
interfaceSlice := make([]interface{}, len(*stationsData))
for i, doc := range *stationsData {
interfaceSlice[i] = doc
}
return &interfaceSlice
}
func CreateStationIndeces() error {
coll := MongoClient.Database(DatabaseName).Collection(StationsCollection)
locationIndex := mongo.IndexModel{
Keys: bson.D{{"location", "2dsphere"}},
Options: nil,
}
crsIndex := mongo.IndexModel{
Keys: bson.D{{"3ALPHA", 1}},
}
tiplocIndex := mongo.IndexModel{
Keys: bson.D{{"TIPLOC", 1}},
}
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{locationIndex, crsIndex, tiplocIndex})
if err != nil {
return err
}
return nil
}

6
dbAccess/types.go Normal file
View File

@ -0,0 +1,6 @@
package dbAccess
// This file should define types used within dbAccess.
// Any types representing database or upstream API resources should
// instead be defined in git.fjla.uk/owlboard/go-types and also be
// reflected in git.fjla.uk/owlboard/ts-types

27
go.mod Normal file
View File

@ -0,0 +1,27 @@
module git.fjla.uk/owlboard/timetable-mgr
go 1.22
toolchain go1.22.4
require (
git.fjla.uk/owlboard/go-types v1.1.11
github.com/go-stomp/stomp/v3 v3.1.2
go.mongodb.org/mongo-driver v1.17.1
go.uber.org/zap v1.27.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/text v0.19.0 // indirect
)

View File

@ -1,77 +1,68 @@
git.fjla.uk/owlboard/go-types v0.0.0-20230721082911-9a574276d572 h1:shnlNyIV1jG+xQsg5zCt2fEjiDzCQQeDTjTFuKZa97c=
git.fjla.uk/owlboard/go-types v0.0.0-20230721082911-9a574276d572/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
git.fjla.uk/owlboard/go-types v0.0.0-20230727191457-d15ddc556312 h1:IolAJJTttdcmykOI73Zjfh3V8Gd01l9TrM+OmliM4h0=
git.fjla.uk/owlboard/go-types v0.0.0-20230727191457-d15ddc556312/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
git.fjla.uk/owlboard/go-types v0.0.0-20230727192011-171bd3eafd83 h1:q+I66M4YVRnKwdyYqcwou7TTniM1uwUSh3Bpa8SDLuM=
git.fjla.uk/owlboard/go-types v0.0.0-20230727192011-171bd3eafd83/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
git.fjla.uk/owlboard/go-types v1.1.11 h1:EKIPcSHymmiTBTj5/NmSoe7ycv9YnUK2hjriRmWJl7Y=
git.fjla.uk/owlboard/go-types v1.1.11/go.mod h1:kG+BX9UF+yJaAVnln/QSKlTdrtKRRReezMeSk1ZLMzY=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-stomp/stomp/v3 v3.0.5 h1:yOORvXLqSu0qF4loJjfWrcVE1o0+9cFudclcP0an36Y=
github.com/go-stomp/stomp/v3 v3.0.5/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10=
github.com/go-stomp/stomp/v3 v3.1.2 h1:kmrNek021BsFUO8rxDhbkOYslRomKO/JIrUCIqyL0r8=
github.com/go-stomp/stomp/v3 v3.1.2/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.12.0 h1:aPx33jmn/rQuJXPQLZQ8NtfPQG8CaqgLThFtqRb0PiE=
go.mongodb.org/mongo-driver v1.12.0/go.mod h1:AZkxhPnFJUoH7kZlFkVKucV20K387miPfm7oimrSmK0=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM=
go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -80,20 +71,19 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

18
helpers/backoff.go Normal file
View File

@ -0,0 +1,18 @@
package helpers
import (
"math"
"time"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Implements an exponential backoff strategy and sleeps for a duration calculated as 1 second to the power of (attempt - 1).
// The backoff time doubles with each attempt, starting from 1 second for the first attempt.
func BackoffDelay(attempt int) {
base := time.Second
backoff := base * time.Duration(math.Pow(2, float64(attempt-1)))
log.Info("Retry backoff", zap.Duration("delay", backoff))
time.Sleep(backoff)
}

9
helpers/basicAuth.go Normal file
View File

@ -0,0 +1,9 @@
package helpers
import "encoding/base64"
// Provides a BasicAuth string
func BasicAuth(username, password string) string {
authString := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(authString))
}

View File

@ -5,9 +5,9 @@ import (
)
// Version Constants
const versionNum string = "2023.7.1"
const versionSuffix string = "beta"
const Version string = versionNum + "-" + versionSuffix
const versionNum string = "2024.11.4"
const versionSuffix string = ""
const Version string = versionNum + versionSuffix
// Environment Variables
var Runtime string = getRuntime()

132
helpers/config_loader.go Normal file
View File

@ -0,0 +1,132 @@
package helpers
import (
"errors"
"fmt"
"os"
"strings"
)
type ConfigParameter struct {
EnvVarName string `json:"env_var_name"`
ConfFilePath string `json:"conf_file_path"`
DefaultValue string `json:"default_value"`
FailIfAbsent bool `json:"fail_if_absent"`
}
type Configuration struct {
VstpOn bool `json:"vstp_on"`
NrodUser string `json:"nrod_user"`
NrodPass string `json:"nrod_pass"`
DbHost string `json:"db_host"`
DbPass string `json:"db_pass"`
DbUser string `json:"db_user"`
DbPort string `json:"db_port"`
}
func LoadConfig() (*Configuration, error) {
configParams := map[string]ConfigParameter{
"vstp_on": {
EnvVarName: "OWL_VSTP_ON",
ConfFilePath: "/owl/conf/vstp/on",
DefaultValue: "on",
FailIfAbsent: false,
},
"nrod_user": {
EnvVarName: "OWL_NROD_USER",
ConfFilePath: "/owl/conf/nrod/user",
FailIfAbsent: true,
},
"nrod_pass": {
EnvVarName: "OWL_NROD_PASS",
ConfFilePath: "/owl/conf/nrod/pass",
FailIfAbsent: true,
},
"db_host": {
EnvVarName: "OWL_DB_HOST",
ConfFilePath: "/owl/conf/db/host",
DefaultValue: "localhost",
FailIfAbsent: false,
},
"db_port": {
EnvVarName: "OWL_DB_PORT",
ConfFilePath: "/owl/conf/db/port",
DefaultValue: "27017",
FailIfAbsent: false,
},
"db_user": {
EnvVarName: "OWL_DB_USER",
ConfFilePath: "/owl/conf/db/user",
FailIfAbsent: true,
},
"db_pass": {
EnvVarName: "OWL_DB_PASS",
ConfFilePath: "/owl/conf/db/pass",
FailIfAbsent: true,
},
}
config := &Configuration{}
for key, param := range configParams {
if val, ok := os.LookupEnv(param.EnvVarName); ok {
config.setConfigValue(key, val)
continue
}
if data, err := os.ReadFile(param.ConfFilePath); err == nil {
config.setConfigValue(key, string(data))
continue
}
if param.DefaultValue != "" {
config.setConfigValue(key, param.DefaultValue)
continue
}
if param.FailIfAbsent {
return nil, errors.New("Failed to load configuration: " + key + " is required but not set")
}
}
return config, nil
}
// Applies configuration strings to the configuration struct
func (c *Configuration) setConfigValue(key, value string) {
value = strings.TrimSpace(value)
switch key {
case "nrod_user":
c.NrodUser = value
case "nrod_pass":
c.NrodPass = value
case "db_host":
c.DbHost = value
case "db_port":
c.DbPort = value
case "db_user":
c.DbUser = value
case "db_pass":
c.DbPass = value
case "vstp_on":
if value == "on" {
c.VstpOn = true
} else {
c.VstpOn = false
}
}
}
// Provides a method to print the configuration struct. Only when the DEBUG env is set to true
func (c *Configuration) PrintConfig() {
if os.Getenv("debug") == "true" {
fmt.Println("Configuration:")
fmt.Println("VstpOn: ", c.VstpOn)
fmt.Println("NrodUser: ", c.NrodUser)
fmt.Println("NrodPass: ", c.NrodPass)
fmt.Println("DbHost: ", c.DbHost)
fmt.Println("DbUser: ", c.DbUser)
fmt.Println("DbPass: ", c.DbPass)
fmt.Println("DbPort: ", c.DbPort)
}
}

View File

@ -2,7 +2,6 @@ package helpers
// An error with the VSTP messages is that speed values are shown incorrectly, but not for all services
// This maps the displayed speed to the correct speed.
var SpeedMap = map[string]string{
"22": "10",
"34": "15",

77
log/log.go Normal file
View File

@ -0,0 +1,77 @@
package log
import (
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Use outside `package log`
// should be avoided.
var post *zap.Logger
// Initialises the logger
func InitLogger() {
var err error
mode := os.Getenv("runtime")
if mode == "" {
mode = "prod"
}
var level zapcore.Level
if mode == "debug" {
level = zap.DebugLevel
} else {
level = zap.InfoLevel
}
config := zap.NewDevelopmentConfig()
config.DisableStacktrace = true
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.Level = zap.NewAtomicLevelAt(level)
post, err = config.Build(zap.AddCallerSkip(1))
if err != nil {
panic("Failed to initialize logger: " + err.Error())
}
defer post.Sync()
Info("Logger initialised", zap.String("level", level.String()), zap.String("runtime", mode))
}
// Logs message at info level
func Info(msg string, fields ...zap.Field) {
post.Info(msg, fields...)
}
// Logs message at debug level
func Debug(msg string, fields ...zap.Field) {
post.Debug(msg, fields...)
}
// Logs message at warn level
func Warn(msg string, fields ...zap.Field) {
post.Warn(msg, fields...)
}
// Logs message at error level
func Error(msg string, fields ...zap.Field) {
post.Error(msg, fields...)
}
// Logs message at fatal level then call os.exit(1)
func Fatal(msg string, fields ...zap.Field) {
post.Fatal(msg, fields...)
}
// Logs message at panic level the panics
func Panic(msg string, fields ...zap.Field) {
post.Panic(msg, fields...)
}
// Flushes log messages
func Cleanup() {
Info("Flushing log messages")
post.Sync()
}

150
main.go Normal file
View File

@ -0,0 +1,150 @@
package main
import (
"fmt"
"os"
"os/signal"
"os/user"
"syscall"
"time"
_ "time/tzdata"
"git.fjla.uk/owlboard/timetable-mgr/background"
"git.fjla.uk/owlboard/timetable-mgr/cif"
"git.fjla.uk/owlboard/timetable-mgr/corpus"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/messaging"
"git.fjla.uk/owlboard/timetable-mgr/pis"
"git.fjla.uk/owlboard/timetable-mgr/stations"
"git.fjla.uk/owlboard/timetable-mgr/vstp"
"go.uber.org/zap"
)
const (
bold = "\033[1m"
redB = "\033[1;31m"
blue = "\033[32m" //Actually green!
cyan = "\033[36m"
reset = "\033[0m"
)
func init() {
printStartupBanner()
fmt.Printf("%sVersion %s \n\n%s", bold+blue, helpers.Version, reset)
// Exits is being run as root
// not necessary and not secure
checkRunAsRoot()
}
func main() {
log.InitLogger()
defer log.Cleanup()
log.Info("Initialising OwlBoard timetable-mgr", zap.String("version", helpers.Version))
cfg, err := helpers.LoadConfig()
if err != nil {
log.Fatal("Unable to load configuration", zap.Error(err))
return
}
cfg.PrintConfig()
dbAccess.InitDataAccess(cfg)
dbAccess.PushVersionToDb()
// Initialise a `stop` channel to signal goroutines to cleanup
stop := make(chan struct{})
// Start CIF Task Ticker
background.InitTicker(cfg, stop)
// Handle signals from the OS
go handleSignals(cfg, stop)
// Manually call CIF and CORPUS checks to ensure that they are
// not delayed until the first ticker event.
go cif.CheckCif(cfg)
go corpus.CheckCorpus(cfg)
go stations.Check()
go pis.Check()
if cfg.VstpOn {
messaging.StompInit(cfg)
vstp.Subscribe()
}
<-stop
}
// Traps SIGINT and SIGTERM signals and ensures cleanup() is run
func handleSignals(cfg *helpers.Configuration, stop chan<- struct{}) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Warn("Signal received: " + sig.String())
cleanup(cfg, stop)
}
// Cleans up open connections ready for a clean exit of the program
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
log.Debug("Cleaning up open connections")
if cfg.VstpOn {
log.Info("Closing STOMP Client")
messaging.Disconnect(messaging.Client)
}
if dbAccess.MongoClient != nil {
log.Info("Closing MongoDB Client")
dbAccess.CloseMongoClient()
}
log.Info("Signalling to other goroutines")
close(stop)
log.Info("Program ready to exit")
time.Sleep(500 * time.Millisecond)
log.Cleanup()
os.Exit(0)
}
func printStartupBanner() {
art := `
___ _ ____ _
/ _ \__ _| | __ ) ___ __ _ _ __ __| |
| | | \ \ /\ / / | _ \ / _ \ / _' | '__/ _' |
| |_| |\ V V /| | |_) | (_) | (_| | | | (_| |
\___/ \_/\_/ |_|____/ \___/ \__,_|_| \__,_|
_ _ _ _ _
| |_(_)_ __ ___ ___| |_ __ _| |__ | | ___ _ __ ___ __ _ _ __
| __| | '_ ' _ \ / _ \ __/ _' | '_ \| |/ _ \_____| '_ ' _ \ / _' | '__|
| |_| | | | | | | __/ || (_| | |_) | | __/_____| | | | | | (_| | |
\__|_|_| |_| |_|\___|\__\__,_|_.__/|_|\___| |_| |_| |_|\__, |_|
|___/
`
fmt.Println(cyan + art + reset)
}
func checkRunAsRoot() {
uid := os.Getuid()
if uid != 0 {
currUser, err := user.Current()
var msg string
if err != nil {
msg = "Unable to determine which user is running the application."
} else {
msg = fmt.Sprintf("Running as user: %s, %s", currUser.Uid, currUser.Username)
}
fmt.Println(blue + msg + "\nRunning as non-root user" + reset)
return
}
fmt.Println(redB + "This program must not be run as the root user" + reset)
fmt.Println("")
os.Exit(1)
}

54
messaging/client.go Normal file
View File

@ -0,0 +1,54 @@
package messaging
import (
"time"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"github.com/go-stomp/stomp/v3"
)
var Client *stomp.Conn
// Initialises the connection to the STOMP server
func StompInit(cfg *helpers.Configuration) {
Client = dial(cfg.NrodUser, cfg.NrodPass)
}
// Connects the STOMP file to the Network Rail MQ Server
func dial(user, pass string) *stomp.Conn {
conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618",
stomp.ConnOpt.Login(user, pass),
stomp.ConnOpt.HeartBeat(15*time.Second, 15*time.Second),
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
)
if err != nil {
log.Fatal("Unable to connect to STOMP Client: " + err.Error())
conn.Disconnect()
}
log.Info("Initialised STOMP Client")
return conn
}
// Handles graceful disconnection of the STOMP client, falls back to
// a force disconnect if this fails.
func Disconnect(conn *stomp.Conn) {
if conn != nil {
err := conn.Disconnect()
log.Warn("Disconnected STOMP Client")
if err != nil {
conn.MustDisconnect()
log.Error("STOMP Disconnection failed, forced disconnect")
}
return
}
log.Error("STOMP Disconnect failed, next connection attempt may fail")
err := Client.Disconnect()
if err != nil {
Client.MustDisconnect()
log.Warn("STOMP Disconnect failed, forced disconnection")
}
log.Info("STOMP Client disconnected")
}

64
nrod/streams.go Normal file
View File

@ -0,0 +1,64 @@
package nrod
import (
"compress/gzip"
"errors"
"fmt"
"io"
"net/http"
"time"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Downloads NROD Data and extracts if GZIP, returns a io.Reader
func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
log.Debug("Fetching NROD data stream", zap.String("Request URL", url))
client := http.Client{
Timeout: time.Second * 300,
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass))
resp, err := client.Do(req)
if err != nil {
log.Error("Error carrying out HTTP Request", zap.Error(err))
return nil, err
}
if resp == nil {
err = errors.New("http response error - response = nil")
return nil, err
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
return nil, err
}
// Run the data through the extractor function and return io.ReadCloser, error from
// that function directly
return NrodStreamExtract(resp)
}
func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) {
log.Debug("Extracting NROD Download")
log.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
gzReader, err := gzip.NewReader(resp.Body)
if err != nil {
log.Warn("Unable to create GZIP Reader, data probably not gzipped")
return resp.Body, err
}
return gzReader, nil
}

70
pis/check.go Normal file
View File

@ -0,0 +1,70 @@
package pis
import (
"encoding/json"
"fmt"
"io"
"net/http"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Checks the Gitea API for the latest release and compares to metadata in the database
// to determine if a PIS update is required.
func Check() {
log.Info("PIS Update Check started")
repoName := "data"
repoOwner := "owlboard"
baseUrl := "https://git.fjla.uk"
apiUrl := fmt.Sprintf("%s/api/v1/repos/%s/%s/releases/latest", baseUrl, repoOwner, repoName)
log.Info("Initiating PIS Check")
resp, err := http.Get(apiUrl)
if err != nil {
log.Error("Error GETting Gitea API", zap.Error(err))
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Error("Errot GETting Gitea API", zap.Error(fmt.Errorf("response status code %d", resp.StatusCode)))
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Error("Error reading API Response", zap.Error(err))
return
}
var apiResp GiteaReleaseData
err = json.Unmarshal(body, &apiResp)
if err != nil {
log.Error("Error unmarshalling API response", zap.Error(err))
return
}
oldMetadata, err := dbAccess.GetPisMetadata()
if err != nil {
log.Error("Error reading PIS Metadata from database")
return
}
if oldMetadata != nil {
if oldMetadata.LastVersion != apiResp.Name {
log.Info("PIS Data is up to date")
return
}
}
log.Info("PIS Data being updated")
err = runUpdate(apiResp.TarballUrl)
if err != nil {
log.Error("Error updating PIS Data", zap.Error(err))
}
}

100
pis/data.go Normal file
View File

@ -0,0 +1,100 @@
package pis
import (
"fmt"
"sort"
"strings"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
// Process the YAML data to a struct
func processYaml(yamlStr string) (*[]database.PIS, error) {
// Define 'container' struct
var pisData struct {
Pis []PisData `yaml:"pis"`
}
// Unmarshal the YAML data into the structure
err := yaml.Unmarshal([]byte(yamlStr), &pisData)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal YAML: %v", err)
}
/* DISABLE DEDUPLICATION - Add 'dupe' flag and have backend handle duplicates?
// Perform deduplication on the entire pis slice
err = deduplicateCodes(&pisData.Pis)
if err != nil {
return nil, err
}
*/
documents, err := convertPisForDatabase(&pisData.Pis)
if err != nil {
return nil, err
}
return documents, nil
}
// Deduplicate data in place and return error if failed
func deduplicateCodes(pis *[]PisData) error {
uniqueStops := make(map[string]bool)
var dedupedPis []PisData
for _, data := range *pis {
stopsKey := stopsToString(data.Stops)
// If stopsKey does not exist, add to map
if _, exists := uniqueStops[stopsKey]; !exists {
uniqueStops[stopsKey] = true
dedupedPis = append(dedupedPis, data)
}
}
*pis = dedupedPis
return nil
}
// Join slice elements to a string for comparison
func stopsToString(stops []string) string {
sort.Strings(stops)
return strings.Join(stops, ",")
}
// Convert PisData to database.PIS
func convertPisForDatabase(in *[]PisData) (*[]database.PIS, error) {
var out []database.PIS
for _, code := range *in {
var document database.PIS
document.Code = code.Code
document.Operator = code.Toc
document.Stops = code.Stops
document.Tiplocs = GetTiplocsFromCrs(code.Stops)
out = append(out, document)
}
return &out, nil
}
// Return a list of TIPLOCS from a list of CRS using Database Lookups
func GetTiplocsFromCrs(crsList []string) (TiplocList []string) {
for _, crs := range crsList {
alpha := strings.ToUpper(crs)
tiploc, err := dbAccess.GetTiplocFromCrs(alpha)
if err != nil {
log.Warn("Unable to find matching TIPLOC for 3ALPHA", zap.String("3ALPHA", alpha), zap.Error(err))
return
}
TiplocList = append(TiplocList, strings.ToUpper(tiploc))
}
return
}

15
pis/types.go Normal file
View File

@ -0,0 +1,15 @@
package pis
// Omits data which is not required
type GiteaReleaseData struct {
Name string `json:"name"`
TarballUrl string `json:"tarball_url"`
Draft bool `json:"draft"`
Prerelease bool `json:"prerelease"`
}
type PisData struct {
Code string `json:"code"`
Stops []string `json:"stops"`
Toc string `json:"toc"`
}

221
pis/update.go Normal file
View File

@ -0,0 +1,221 @@
package pis
import (
"archive/tar"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
const (
destPath = "/tmp/pis.tar.gz"
extractPath = "/tmp/extract"
)
// Downloads the release tarball, extracts then applies to database
func runUpdate(tarballUrl string) error {
log.Info("PIS Update Process started")
err := downloadFile(tarballUrl, destPath)
if err != nil {
return err
}
// Extract to disk
file, err := os.Open(destPath)
if err != nil {
return err
}
log.Info("Saved PIS Release to disk")
defer file.Close()
if err := extractFiles(file, extractPath); err != nil {
return err
}
log.Info("Extracted PIS Release")
// Load YAML to string
pisData, err := extractYamlData(extractPath)
if err != nil {
return err
}
pisSlice, err := processYaml(pisData)
if err != nil {
return err
}
log.Info("Loaded PIS Files to Slice")
// Temporarily use "pis_testing" collection to ensure all works as expected
err = dbAccess.DropCollection("pis_testing")
if err != nil {
return err
}
count, err := dbAccess.PutPisData(pisSlice)
if err != nil {
return err
}
log.Info("Insterted new PIS Data", zap.Int64("PIS Codes", count))
err = dbAccess.CreatePisIndeces()
if err != nil {
log.Error("Failed to create PIS Indeces, poor performance expected", zap.Error(err))
}
// Cleanup files
cleanupFiles(destPath, extractPath)
return nil
}
// Download the tarball to disk
func downloadFile(url, filepath string) error {
resp, err := http.Get(url)
if err != nil {
return err
}
log.Info("PIS Release downloaded")
defer resp.Body.Close()
out, err := os.Create(filepath)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, resp.Body)
return err
}
// Extract tarball to disk
func extractFiles(gzipStream io.Reader, dest string) error {
uncompressedStream, err := gzip.NewReader(gzipStream)
if err != nil {
return err
}
defer uncompressedStream.Close()
log.Info("Extracting PIS File")
tarReader := tar.NewReader(uncompressedStream)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Handle pax_global_header or other unsupported types
if header.Typeflag == tar.TypeXGlobalHeader || header.Name == "pax_global_header" {
// Skip this special header file
continue
}
filePath := filepath.Join(dest, header.Name)
switch header.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(filepath.Join(dest, header.Name), 0755); err != nil {
return err
}
case tar.TypeReg:
outFile, err := os.Create(filePath)
if err != nil {
return err
}
if _, err := io.Copy(outFile, tarReader); err != nil {
return err
}
outFile.Close()
default:
log.Warn("Unable to handle filetype", zap.String("Typeflag", string(header.Typeflag)), zap.String("Filename", header.Name))
}
}
return nil
}
// Return YAML PIS files as a string
func extractYamlData(dir string) (string, error) {
var allContent strings.Builder // Using a string builder to accumulate content
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err // Only returning error since Walk callback doesn't accept string
}
// Check if the path contains 'pis' and has a .yaml or .yml extension
if strings.Contains(path, "/pis/") && !info.IsDir() &&
(strings.HasSuffix(info.Name(), ".yaml") || strings.HasSuffix(info.Name(), ".yml")) {
log.Debug("Processing YAML", zap.String("directory", path))
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open YAML file %s: %v", path, err)
}
defer file.Close()
content, err := io.ReadAll(file)
if err != nil {
return fmt.Errorf("failed to read YAML file %s: %v", path, err)
}
// Accumulate content from each YAML file in 'pis' directory
allContent.Write(content)
allContent.WriteString("\n") // Add a newline between file contents
}
return nil
})
if err != nil {
return "", err
}
// Return the accumulated content as a single string
return allContent.String(), nil
}
// Cleans up downloaded and extracted files
func cleanupFiles(paths ...string) {
for _, path := range paths {
err := os.RemoveAll(path)
if err != nil {
log.Warn("Error removing file", zap.String("path", path), zap.Error(err))
}
}
log.Info("Removed PIS Release files")
}
// Saves pisSlice to file
func DEBUG_saveSliceToFile(pisSlice interface{}, filename string) error {
data, err := json.MarshalIndent(pisSlice, "", " ")
if err != nil {
return fmt.Errorf("error marshalling slice to JSON: %v", err)
}
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("error creating file: %v", err)
}
defer file.Close()
_, err = file.Write(data)
if err != nil {
return fmt.Errorf("error writing JSON to file: %v", err)
}
return nil
}

View File

@ -1,59 +0,0 @@
package dbAccess
import (
"context"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/mq-client/helpers"
"git.fjla.uk/owlboard/mq-client/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)
const timetableCollection string = "timetable"
const databaseName string = "owlboard"
func init() {
version := database.Version{
Target: "mq-client",
Component: "mq-client",
Version: helpers.Version,
}
versionSelector := database.VersionSelector{
Target: "mq-client",
Component: "mq-client",
}
opts := options.Update().SetUpsert(true)
coll := MongoClient.Database("owlboard").Collection("versions")
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
if err != nil {
log.Msg.Warn("Unable to push version to database: " + err.Error())
} else {
log.Msg.Debug("Version up to date in Database")
}
}
func PutOneService(data database.Service) bool {
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
_, err := coll.InsertOne(context.TODO(), data)
if err != nil {
log.Msg.Error("Unable to insert to database: " + err.Error())
return false
}
return true
}
func DeleteOneService(data database.DeleteQuery) bool {
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
var filter = bson.D{
{Key: "trainUid", Value: data.TrainUid},
{Key: "stpIndicator", Value: data.StpIndicator},
{Key: "scheduleStartDate", Value: data.ScheduleStartDate},
}
_, err := coll.DeleteOne(context.TODO(), filter)
if err != nil {
log.Msg.Error("Unable to delete service: " + err.Error())
return false
}
return true
}

View File

@ -1,72 +0,0 @@
package dbAccess
import (
"git.fjla.uk/owlboard/mq-client/log"
"context"
"fmt"
"os"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Generate DB Url
var dbUri string = getDbUri()
func getDbUri() string {
log.Msg.Debug("Fetching DB Access details")
var dbHost string = os.Getenv("OWL_DB_HOST")
if dbHost == "" {
dbHost = "localhost"
}
var dbPort string = os.Getenv("OWL_DB_PORT")
if dbPort == "" {
dbPort = "27017"
}
var dbUser string = os.Getenv("OWL_DB_USER")
if dbUser == "" {
dbUser = "owl"
}
var dbPass string = os.Getenv("OWL_DB_PASS")
if dbPass == "" {
dbPass = "twittwoo"
}
var uri = "mongodb://" + dbUser + ":" + dbPass + "@" + dbHost + ":" + dbPort
return uri
}
// Provide the DB Connection to other functions
var MongoClient (*mongo.Client) = initDataAccess()
// Configure bsonOpts
var bsonOpts = &options.BSONOptions{
UseJSONStructTags: true,
}
// Initialise the DB Connection
func initDataAccess() *mongo.Client {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(dbUri).SetBSONOptions(bsonOpts))
if err != nil {
fmt.Println(err)
log.Msg.Fatal("Error connecting to database: " + err.Error())
} else {
log.Msg.Info("Database connection successful")
}
return client
}
func CloseMongoClient() {
if MongoClient != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := MongoClient.Disconnect(ctx); err != nil {
log.Msg.Warn("Error disconnecting MongoDB client: " + err.Error())
} else {
log.Msg.Info("MongoDB client disconnected.")
}
}
}

View File

@ -1,25 +0,0 @@
module git.fjla.uk/owlboard/mq-client
go 1.19
require (
git.fjla.uk/owlboard/go-types v0.0.0-20230727192011-171bd3eafd83
github.com/go-stomp/stomp/v3 v3.0.5
go.mongodb.org/mongo-driver v1.12.0
go.uber.org/zap v1.24.0
)
require (
github.com/golang/snappy v0.0.1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/text v0.7.0 // indirect
)

View File

@ -1,35 +0,0 @@
package log
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"git.fjla.uk/owlboard/mq-client/helpers"
)
var Msg *zap.Logger
func init() {
var err error
// Create a custom configuration with a human-readable "Console" encoder
config := zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder // Adds color to log levels
// Determine the log level based on the runtime mode
logLevel := zapcore.DebugLevel
if helpers.Runtime == "production" {
logLevel = zapcore.InfoLevel
}
// Set the log level
config.Level = zap.NewAtomicLevelAt(logLevel)
Msg, err = config.Build() // Potential source of the error
if err != nil {
panic("Failed to initialize logger: " + err.Error())
}
// Log the selected log level (optional, can be helpful for debugging)
Msg.Info("Log level set to: " + logLevel.String())
}

View File

@ -1,51 +0,0 @@
package main
import (
"os"
"os/signal"
"syscall"
"git.fjla.uk/owlboard/mq-client/dbAccess"
"git.fjla.uk/owlboard/mq-client/helpers"
"git.fjla.uk/owlboard/mq-client/log"
"git.fjla.uk/owlboard/mq-client/messaging"
"git.fjla.uk/owlboard/mq-client/vstp"
)
func main() {
log.Msg.Info("Initialised OwlBoard MQ Client " + helpers.Version)
defer cleanup()
go handleSignals()
vstp.Subscribe()
}
// Traps SIGINT and SIGTERM signals and ensures cleanup() is run
func handleSignals() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Msg.Warn("Signal received: " + sig.String())
cleanup()
os.Exit(1)
}
// Cleans up open connections ready for a clean exit of the program
func cleanup() {
log.Msg.Debug("Cleaning up open connections")
if messaging.Client != nil {
log.Msg.Info("Closing STOMP Client")
messaging.Disconnect(messaging.Client)
}
if dbAccess.MongoClient != nil {
log.Msg.Info("Closing MongoDB Client")
dbAccess.CloseMongoClient()
}
log.Msg.Info("Program ready to exit")
if log.Msg != nil {
log.Msg.Sync()
}
}

View File

@ -1,64 +0,0 @@
package messaging
import (
"os"
"git.fjla.uk/owlboard/mq-client/log"
"github.com/go-stomp/stomp/v3"
)
type nrodCredential struct {
user string
pass string
}
// Fetches credentials from environment variables and exits if none provided.
func getCredentials() nrodCredential {
var nrod_user string = os.Getenv("OWL_LDB_CORPUSUSER")
var nrod_pass string = os.Getenv("OWL_LDB_CORPUSPASS")
if nrod_user == "" || nrod_pass == "" {
log.Msg.Fatal("No NROD Credentials provided")
}
log.Msg.Debug("NROD Credentials loaded for user: " + nrod_user)
return nrodCredential{
user: nrod_user,
pass: nrod_pass,
}
}
var Client = dial()
// Connects the STOMP file to the Network Rail MQ Server
func dial() *stomp.Conn {
var credentials nrodCredential = getCredentials()
conn, err := stomp.Dial("tcp", "publicdatafeeds.networkrail.co.uk:61618",
stomp.ConnOpt.Login(credentials.user, credentials.pass),
stomp.ConnOpt.HeartBeat(15000, 15000),
stomp.ConnOpt.Header("client-id", credentials.user+"-mq-client"),
)
if err != nil {
log.Msg.Fatal("Unable to connect to STOMP Client: " + err.Error())
conn.MustDisconnect()
}
log.Msg.Info("Initialised STOMP Client")
return conn
}
// Handles graceful disconnection of the STOMP client, falls back to
// a force disconnect if this fails.
func Disconnect(conn *stomp.Conn) {
if conn != nil {
err := conn.Disconnect()
log.Msg.Warn("Disconnected STOMP Client")
if err != nil {
conn.MustDisconnect()
log.Msg.Error("STOMP Disconnection failed, forced disconnect")
}
return
}
log.Msg.Error("STOMP Disconnect failed, next connection attempt may fail")
}
// Register against the MQ Server and log each message for testing purposes

View File

@ -1,55 +0,0 @@
package vstp
import (
"fmt"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/mq-client/dbAccess"
"git.fjla.uk/owlboard/mq-client/log"
)
// Decide, based on the DB Formatted message type, what action needs taking
// then either insert, or delete from the database as required
func processEntryType(entry database.Service) {
switch entry.TransactionType {
case "Create":
createEntry(entry)
case "Update":
updateEntry(entry)
case "Delete":
deleteEntry(entry)
default:
log.Msg.Error("Unknown transaction type: " + entry.TransactionType)
}
}
func createEntry(entry database.Service) {
log.Msg.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
status := dbAccess.PutOneService(entry)
if status {
log.Msg.Info("Database entry created")
} else {
log.Msg.Error("Database entry failed, skipped service")
}
}
func updateEntry(entry database.Service) {
log.Msg.Info("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
}
func deleteEntry(entry database.Service) {
log.Msg.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
var deletionQuery = database.DeleteQuery{
TrainUid: entry.TrainUid,
ScheduleStartDate: entry.ScheduleStartDate,
StpIndicator: entry.StpIndicator,
}
status := dbAccess.DeleteOneService(deletionQuery)
if status {
log.Msg.Info("Database entry deleted")
} else {
log.Msg.Error("Database deletion failed, skipped deletion")
fmt.Printf("%+v\n", deletionQuery)
}
}

View File

@ -1,17 +0,0 @@
package vstp
import (
"fmt"
"git.fjla.uk/owlboard/mq-client/log"
"github.com/go-stomp/stomp/v3"
)
var count uint64 = 0
func handle(msg *stomp.Message) {
count++
log.Msg.Info("Messages since started: " + fmt.Sprint(count))
schedule := unmarshalData(string(msg.Body))
processEntryType(schedule)
}

View File

@ -1,158 +0,0 @@
package vstp
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/mq-client/helpers"
"git.fjla.uk/owlboard/mq-client/log"
)
// Unmarshals the JSON data and runs it through the formatData() function and returns the data in a DB ready Struct
func unmarshalData(jsonData string) database.Service {
var schedule upstreamApi.MsgData
err := json.Unmarshal([]byte(jsonData), &schedule)
if err != nil {
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
//return err
}
log.Msg.Debug("Unmarshalling Complete")
if schedule.Data.CIFMsg.ScheduleSegment == nil {
log.Msg.Warn("ScheduleSegment is nil")
} else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 {
log.Msg.Warn("ScheduleSegment is empty")
}
return formatData(&schedule.Data.CIFMsg)
}
// Transforms the upstreamApi.Schedule type into a database.Service type
func formatData(dataInput *upstreamApi.Schedule) database.Service {
log.Msg.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
log.Msg.Debug("Printing dataInput to console:")
var operator, headcode, powerType string
var planSpeed int32
var stops []database.Stop
// Check that the ScheduleSegment contains data, 'Delete' messages have no ScheduleSegment
if len(dataInput.ScheduleSegment) > 0 {
operator = dataInput.ScheduleSegment[0].ATOCCode
headcode = dataInput.ScheduleSegment[0].SignallingID
powerType = dataInput.ScheduleSegment[0].CIFPowerType
planSpeed = parseSpeed(dataInput.ScheduleSegment[0].CIFSpeed)
stops = parseStops(dataInput.ScheduleSegment[0].ScheduleLocation)
}
if operator == "" {
operator = "UK"
}
service := database.Service{
TransactionType: dataInput.TransactionType,
StpIndicator: dataInput.CIFSTPIndicator,
Vstp: true,
Operator: operator,
TrainUid: dataInput.CIFTrainUID,
Headcode: headcode,
PowerType: powerType,
PlanSpeed: planSpeed,
ScheduleStartDate: parseDate(dataInput.ScheduleStartDate, false),
ScheduleEndDate: parseDate(dataInput.ScheduleEndDate, true),
DaysRun: parseDaysRun(dataInput.ScheduleDaysRun),
Stops: stops,
}
return service
}
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
func parseSpeed(CIFSpeed string) int32 {
log.Msg.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
if CIFSpeed == "" {
log.Msg.Debug("Speed data not provided")
return int32(0)
}
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
if !exists {
actualSpeed = CIFSpeed
}
log.Msg.Debug("Corrected Speed: " + actualSpeed)
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
if err != nil {
log.Msg.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
return int32(0)
}
return int32(speed)
}
// Converts the date string provided from the upstream API into a proper Date type and adds a time
func parseDate(dateString string, end bool) time.Time {
log.Msg.Debug("Date Input: " + dateString)
date, err := time.Parse("2006-01-02", dateString)
if err != nil {
log.Msg.Error("Unable to parse date: " + dateString)
return time.Time{}
}
var hour, minute, second, nanosecond int
location := time.UTC
if end {
hour, minute, second, nanosecond = 23, 59, 59, 999999999
} else {
hour, minute, second, nanosecond = 0, 0, 0, 0
}
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
log.Msg.Debug("Parsed date: " + dateWithTime.String())
return dateWithTime
}
// Converts the binary stype 'daysRun' field into an array of short days
func parseDaysRun(daysBinary string) []string {
log.Msg.Debug("daysRun Input: " + daysBinary)
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
var result []string
for i, digit := range daysBinary {
if digit == '1' {
result = append(result, shortDays[i])
}
}
return result
}
// Converts an array if upstreamApi.ScheduleLocation types to an array of database.Stop types
func parseStops(inputStops []upstreamApi.ScheduleLocation) []database.Stop {
var stops []database.Stop
for _, loc := range inputStops {
stop := database.Stop{
PublicDeparture: parseTimeStrings(loc.PublicDepartureTime),
WttDeparture: parseTimeStrings(loc.ScheduledDepartureTime),
PublicArrival: parseTimeStrings(loc.PublicArrivalTime),
WttArrival: parseTimeStrings(loc.ScheduledArrivalTime),
IsPublic: strings.TrimSpace(loc.PublicDepartureTime) != "" || strings.TrimSpace(loc.PublicArrivalTime) != "",
Tiploc: loc.Tiploc.Tiploc.TiplocId,
}
stops = append(stops, stop)
}
return stops
}
func parseTimeStrings(t string) string {
if t == "" {
return t
}
strippedT := strings.TrimSpace(t)
if strippedT == "" {
return ""
} else {
return strippedT[:4]
}
}

View File

@ -1,39 +0,0 @@
package vstp
import (
"time"
"git.fjla.uk/owlboard/mq-client/log"
"git.fjla.uk/owlboard/mq-client/messaging"
"github.com/go-stomp/stomp/v3"
)
func Subscribe() {
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
if err != nil {
log.Msg.Fatal("Unable to start subscription: " + err.Error())
}
log.Msg.Info("Subscription to VSTP topic successful, listening")
go func() {
log.Msg.Debug("GOROUTINE: VSTP Message Handler Started")
defer func() {
if r := recover(); r != nil {
log.Msg.Warn("GOROUTINE: VSTP Message Handler Stopped")
time.Sleep(time.Second * 10)
log.Msg.Fatal("GOROUTINE: VSTP Message Handler Failed")
}
}()
for {
msg := <-sub.C
if msg.Err != nil {
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
} else {
log.Msg.Info("STOMP Message Received")
handle(msg)
}
}
}()
select {}
}

72
stations/check.go Normal file
View File

@ -0,0 +1,72 @@
package stations
import (
"time"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
func Check() {
oldMetadata, err := dbAccess.GetStationsMetadata()
if err != nil {
log.Error("Error reading Stations metadata", zap.Error(err))
}
if oldMetadata == nil {
log.Info("No old metadata for stations, rebuilding")
} else {
timeSinceLastUpdate := time.Since(oldMetadata.LastUpdate)
if timeSinceLastUpdate <= time.Hour*24*7 {
log.Info("Stations update not required")
return
}
}
ok := run()
if !ok {
log.Error("Error updating Stations data")
}
}
func run() bool {
// Download
data, data2, err := download()
log.Info("Downloaded station data from two sources")
if err != nil {
log.Error("error downloading station data", zap.Error(err))
return false
}
// Parse
stations, err := parseData(data, data2)
log.Info("Parsed station data")
if err != nil {
log.Error("error parsing station data", zap.Error(err))
return false
}
// Drop
err = dbAccess.DropCollection("stations")
if err != nil {
log.Error("Error dropping stations collection", zap.Error(err))
}
// Push
err = dbAccess.PutManyNewStations(&stations)
if err != nil {
log.Error("Error putting new station data", zap.Error(err))
}
err = dbAccess.CreateStationIndeces()
if err != nil {
log.Error("Error creating station indeces", zap.Error(err))
}
ok := dbAccess.SetStationsMetadata(time.Now())
if !ok {
log.Warn("Error setting new metadata for Stations")
}
return true
}

81
stations/parse.go Normal file
View File

@ -0,0 +1,81 @@
package stations
import (
"bytes"
"encoding/xml"
"fmt"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
)
// Parses n number of XML byte arrays
func parseData(data ...[]byte) ([]database.Station, error) {
var stations []upstreamApi.Station
for _, d := range data {
parsedStations, err := parseXML(d)
if err != nil {
return nil, err
}
stations = append(stations, parsedStations...)
}
var output []database.Station
for _, s := range stations {
outputStation, err := convertApiToDatabase(s)
if err != nil {
return nil, err
}
output = append(output, outputStation)
}
return output, nil
}
// Parses XML and converts to struct
func parseXML(data []byte) ([]upstreamApi.Station, error) {
var stationList upstreamApi.StationList
reader := bytes.NewReader(data)
decoder := xml.NewDecoder(reader)
err := decoder.Decode(&stationList)
if err != nil {
return nil, fmt.Errorf("error parsing XML: %v", err)
}
return stationList.Stations, nil
}
// Convert API type to Database type ready for insertion
func convertApiToDatabase(data upstreamApi.Station) (database.Station, error) {
if data.CrsCode == "" {
return database.Station{}, fmt.Errorf("CRS code is required but missing")
}
tiploc, err := dbAccess.GetTiplocFromCrs(data.CrsCode)
if err != nil {
return database.Station{}, err
}
stanox, err := dbAccess.GetStanoxFromCrs(data.CrsCode)
if err != nil {
return database.Station{}, err
}
output := database.Station{
CRS: data.CrsCode,
TIPLOC: tiploc,
STANOX: stanox,
NLCDESC: data.Name,
Location: database.GeoJson{
Type: "Point",
Coordinates: []float64{
data.Longitude,
data.Latitude,
},
},
Operator: data.StationOperator,
}
return output, nil
}

93
stations/run.go Normal file
View File

@ -0,0 +1,93 @@
package stations
import (
"archive/zip"
"bytes"
"fmt"
"io"
"net/http"
)
const (
// URL to RDG XML file
url string = "https://internal.nationalrail.co.uk/4.0/stations.zip"
// URL to additional XML file
add string = "https://git.fjla.uk/OwlBoard/data/raw/branch/main/knowledgebase/additional.xml"
)
func download() ([]byte, []byte, error) {
data1, err := downloadAndExtractZip(url)
if err != nil {
return nil, nil, err
}
data2, err := downloadUrl(add)
if err != nil {
return nil, nil, err
}
return data1, data2, nil
}
func downloadUrl(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to download from %s: status code: %d", url, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}
func downloadAndExtractZip(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to download from %s: status code %d", url, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Read zip
reader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
return nil, err
}
// Check zip not empty
if len(reader.File) == 0 {
return nil, fmt.Errorf("no files found in the zip archive")
}
// Read first file
file := reader.File[0]
rc, err := file.Open()
if err != nil {
return nil, err
}
defer rc.Close()
extractedData, err := io.ReadAll(rc)
if err != nil {
return nil, err
}
return extractedData, nil
}

14
test-db.compose.yaml Normal file
View File

@ -0,0 +1,14 @@
version: '3.8'
services:
mongodb:
image: mongo:7
ports:
- '27017:27017'
volumes:
- dbdata7:/data/db
environment:
- MONGO_INITDB_ROOT_USERNAME=owl
- MONGO_INITDB_ROOT_PASSWORD=twittwoo
restart: unless-stopped
volumes:
dbdata7:

44
vstp/actions.go Normal file
View File

@ -0,0 +1,44 @@
package vstp
import (
"fmt"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/cif"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
)
// Converts to the correct struct for database insertion, then processes accordingly
func processCifData(s *upstreamApi.JsonScheduleV1) error {
if s.TransactionType == "Create" {
service, err := cif.ConvertServiceType(s, true)
if err != nil {
return err
}
// Create slice as required by CreateCifEntries()
services := []database.Service{*service}
err = dbAccess.CreateCifEntries(services)
if err != nil {
return err
}
return nil
} else if s.TransactionType == "Delete" {
query := database.DeleteQuery{
TrainUid: s.CifTrainUid,
ScheduleStartDate: cif.ParseCifDate(&s.ScheduleStartDate, "start"),
StpIndicator: s.CifStpIndicator,
}
// Create slice as required by DeleteCifEntries()
queries := []database.DeleteQuery{query}
err := dbAccess.DeleteCifEntries(queries)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("unknown transaction type: %s", s.TransactionType)
}
}

107
vstp/convert.go Normal file
View File

@ -0,0 +1,107 @@
package vstp
import (
"errors"
"strings"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/log"
)
// Simple type conversion
func convertCifType(in *upstreamApi.VSTPSchedule) (error, *upstreamApi.JsonScheduleV1) {
if in == nil {
return errors.New("input is nil"), nil
}
// Delete Message Example - Does not process as is.
// {"VSTPCIFMsgV1":{"schedule":{"transaction_type":"Delete","train_status":" ","schedule_start_date":"2024-04-29","schedule_end_date":"2024-04-29","schedule_days_runs":"1000000","CIF_train_uid":" 68613","CIF_stp_indicator":"N","CIF_bank_holiday_running":" "},"Sender":{"organisation":"Network Rail","application":"TSIA","component":"TSIA","userID":"#QJP0070","sessionID":"CT02000"},"classification":"industry","timestamp":"1714372444000","owner":"Network Rail","originMsgId":"2024-04-29T06:34:04-00:00@vstp.networkrail.co.uk"}}
out := &upstreamApi.JsonScheduleV1{
TransactionType: in.TransactionType,
CifBankHolidayRunning: in.CifBankHolidayRunning,
CifStpIndicator: in.CifStpIndicator,
CifTrainUid: in.CifTrainUid,
ApplicableTimetable: in.ApplicableTimetable,
ScheduleDaysRun: in.ScheduleDaysRun,
ScheduleStartDate: in.ScheduleStartDate,
ScheduleEndDate: in.ScheduleEndDate,
}
if len(in.ScheduleSegment) > 0 {
if len(in.ScheduleSegment) > 1 {
log.Warn("More than one element in schedule segment")
}
out.ScheduleSegment = convertSchedule(&in.ScheduleSegment[0])
if trim(in.ScheduleSegment[0].AtocCode) != "" {
out.AtocCode = in.ScheduleSegment[0].AtocCode
} else {
out.AtocCode = "UK"
}
return nil, out
} else {
log.Warn("VSTP Schedule Segment empty")
return errors.New("schedule segment empty"), nil
}
}
func convertSchedule(in *upstreamApi.VSTPScheduleSegment) upstreamApi.CifScheduleSegment {
out := upstreamApi.CifScheduleSegment{
SignallingId: in.SignallingId,
CifTrainCategory: in.CifTrainCategory,
CifHeadcode: in.CifHeadcode,
CifTrainServiceCode: in.CifTrainServiceCode,
CifBusinessSector: in.CifBusinessSector,
CifPowerType: in.CifPowerType,
CifTimingLoad: in.CifTimingLoad,
CifSpeed: in.CifSpeed,
CifOperatingCharacteristics: in.CifOperatingCharacteristics,
CifTrainClass: in.CifTrainClass,
CifSleepers: in.CifSleepers,
CifReservations: in.CifReservations,
CifCateringCode: in.CifCateringCode,
ScheduleLocation: *convertLocations(&in.ScheduleLocation),
}
return out
}
func convertLocations(in *[]upstreamApi.VSTPScheduleLocation) *[]upstreamApi.CifScheduleLocation {
if in == nil {
log.Error("Input is nil")
return nil
}
cifLocations := make([]upstreamApi.CifScheduleLocation, len(*in))
for i, loc := range *in {
cifLoc := upstreamApi.CifScheduleLocation{
TiplocCode: trim(loc.Location.Tiploc.TiplocId),
Arrival: convertTime(trim(loc.Arrival)),
PublicArrival: convertTime(trim(loc.PublicArrival)),
Departure: convertTime(trim(loc.Departure)),
PublicDeparture: convertTime(trim(loc.PublicDeparture)),
Pass: convertTime(trim(loc.Pass)),
Path: trim(loc.Path),
Platform: trim(loc.Platform),
EngineeringAllowance: trim(loc.EngineeringAllowance),
PathingAllowance: trim(loc.PathingAllowance),
PerformanceAllowance: trim(loc.PerformanceAllowance),
}
cifLocations[i] = cifLoc
}
return &cifLocations
}
func convertTime(in string) string {
if len(in) < 4 {
return in
}
return in[:4]
}
func trim(s string) string {
return strings.TrimSpace(s)
}

31
vstp/handler.go Normal file
View File

@ -0,0 +1,31 @@
package vstp
import (
"fmt"
"time"
"git.fjla.uk/owlboard/timetable-mgr/log"
"github.com/go-stomp/stomp/v3"
"go.uber.org/zap"
)
func handle(msg *stomp.Message) {
start := time.Now()
schedule, err := unmarshalData(msg.Body)
if err != nil {
log.Error("Error unmarshalling VSTP Message", zap.Error(err))
}
err, convertedType := convertCifType(schedule)
if err != nil {
log.Error("Error converting VSTP to CIF", zap.Error(err))
fmt.Println(string(msg.Body))
return
}
err = processCifData(convertedType)
if err != nil {
log.Error("Error processing VSTP Schedule", zap.Error(err))
}
end := time.Now()
duration := end.Sub(start)
log.Info("Message processed", zap.Duration("processing-time", duration))
}

63
vstp/subscribe.go Normal file
View File

@ -0,0 +1,63 @@
package vstp
import (
"time"
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/messaging"
"github.com/go-stomp/stomp/v3"
"go.uber.org/zap"
)
func Subscribe() {
var sub *stomp.Subscription
var err error
retryCount := 0
maxRetries := 5
for retryCount < maxRetries {
sub, err = messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
if err != nil {
log.Warn("Unable to start subscription", zap.Error(err))
time.Sleep(10 * time.Second)
retryCount++
continue
}
break
}
if sub == nil {
log.Fatal("Failed to subscribe to VSTP topic", zap.Int("attempts", maxRetries))
}
log.Info("Subscription to VSTP topic successful, listening")
go func() {
for {
func() {
defer func() {
if r := recover(); r != nil {
log.Warn("VSTP Message Handler Stopped, waiting for recovery")
time.Sleep(time.Second * 10)
}
}()
log.Info("VSTP Message handler started")
for {
msg := <-sub.C
if msg.Err != nil {
log.Error("VSTP Message Error", zap.Error(msg.Err))
} else {
if msg != nil {
log.Debug("VSTP Message Received")
handle(msg)
} else {
log.Info("VSTP Message Empty")
}
}
}
}()
}
}()
select {}
}

20
vstp/unmarshaller.go Normal file
View File

@ -0,0 +1,20 @@
package vstp
import (
"encoding/json"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/log"
)
// Unmarshals the JSON data and returns the schedule data
func unmarshalData(jsonData []byte) (*upstreamApi.VSTPSchedule, error) {
var schedule upstreamApi.MsgData
err := json.Unmarshal(jsonData, &schedule)
if err != nil {
log.Error("Unable to unmarshal message body: " + err.Error())
return nil, err
}
log.Debug("Unmarshalling Complete")
return &schedule.Data.CIFMsg, nil
}