timetable-extension #1
@ -11,10 +11,12 @@ import (
|
||||
|
||||
const frequency = 3 * time.Second // Figure out a sensible frequency!
|
||||
|
||||
// Starts a background ticker to run background tasks. Uses the frequency configured in the background/ticker.go file
|
||||
func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
||||
go runTicker(cfg, stop)
|
||||
}
|
||||
|
||||
// Runs the ticker and handles tick events
|
||||
func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
||||
log.Msg.Sugar().Infof("Starting background ticker, runs every %s", frequency)
|
||||
ticker := time.NewTicker(frequency)
|
||||
|
@ -1,4 +1,7 @@
|
||||
package cif
|
||||
|
||||
// The URL required for a daily update of the CIF Data - The 'day string' must be appended
|
||||
const dailyUpdateUrl = "https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_ALL_UPDATE_DAILY&day=toc-update-"
|
||||
|
||||
// The URL required for a full fetch of the CIF Data
|
||||
const fullUpdateUrl = "https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_ALL_FULL_DAILY&day=toc-full"
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Fetches the day string for TODAYs update. Needs adjusting to be able to accept a time.Time type and return the day string for that day
|
||||
func getDayString() string {
|
||||
london, err := time.LoadLocation("Europe/London")
|
||||
if err != nil {
|
||||
@ -22,6 +23,7 @@ func getDayString() string {
|
||||
return dayStrings[day]
|
||||
}
|
||||
|
||||
// Simply returns the correct URL for either a 'daily' or 'full' update.
|
||||
func getUpdateUrl(updateType string) (string, error) {
|
||||
if updateType == "daily" {
|
||||
return dailyUpdateUrl + getDayString(), nil
|
||||
|
@ -1,5 +1,6 @@
|
||||
package cif
|
||||
|
||||
// I believe that this is not used, instead I opted for the CifMetadata type.
|
||||
type CIFUpdate struct {
|
||||
Timestamp int64
|
||||
Sequence int64
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Checks if the CORPUS Data needs updating, and carrys out the process if needed
|
||||
func CheckCorpus(cfg *helpers.Configuration) {
|
||||
log.Msg.Debug("Checking age of CORPUS Data")
|
||||
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
const url string = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"
|
||||
|
||||
// Fetches CORPUS Data using the nrod.NrodDownload() function and returns the byte array
|
||||
func fetchCorpus(cfg *helpers.Configuration) (*[]byte, error) {
|
||||
log.Msg.Info("Fetching CORPUS Data")
|
||||
data, err := nrod.NrodDownload(url, cfg)
|
||||
@ -23,6 +24,7 @@ func fetchCorpus(cfg *helpers.Configuration) (*[]byte, error) {
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
// Accepts CORPUS data as a byte array and formats it ready for database insertion
|
||||
func parseCorpusData(jsonData *[]byte) ([]database.CorpusEntry, error) {
|
||||
log.Msg.Info("Unmarshalling CORPUS Data")
|
||||
|
||||
@ -67,6 +69,7 @@ func parseCorpusData(jsonData *[]byte) ([]database.CorpusEntry, error) {
|
||||
return corpusEntries, nil
|
||||
}
|
||||
|
||||
// Removes empty fields from CORPUS entries
|
||||
func pruneCorpusEntries(corpusEntries []database.CorpusEntry) []database.CorpusEntry {
|
||||
for i := range corpusEntries {
|
||||
if corpusEntries[i].CRS == " " {
|
||||
|
@ -2,6 +2,7 @@ package corpus
|
||||
|
||||
import "git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
|
||||
// Removes non-station entities from the CORPUS Data, ready for insertion to the database (stations collection)
|
||||
func createStationEntries(corpusData []database.CorpusEntry) []database.StationEntry {
|
||||
var stationEntries []database.StationEntry
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Runs all stages of the CORPUS Update process
|
||||
func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
||||
resp, err := fetchCorpus(cfg)
|
||||
if err != nil {
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
const timetableCollection string = "timetable"
|
||||
|
||||
// CAUTION: Drops the collection named in collectionName
|
||||
func DropCollection(collectionName string) error {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(collectionName)
|
||||
@ -27,6 +28,7 @@ func DropCollection(collectionName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checks the update time (unix timestamp) of the collection named in collectionName, uses 'type: collection' entries in the meta collection
|
||||
func CheckUpdateTime(collectionName string) (int64, error) {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(metaCollection)
|
||||
@ -44,6 +46,7 @@ func CheckUpdateTime(collectionName string) (int64, error) {
|
||||
return result.Updated, nil
|
||||
}
|
||||
|
||||
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
|
||||
func SetUpdateTime(collectionName string) error {
|
||||
log.Msg.Info("Setting update time", zap.String("collection", collectionName))
|
||||
database := MongoClient.Database(databaseName)
|
||||
@ -70,6 +73,8 @@ func SetUpdateTime(collectionName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pushes the current version of this application to the data base 'versions' collection.
|
||||
// Currently uses the old name of mq-client
|
||||
func PushVersionToDb() {
|
||||
version := database.Version{
|
||||
Target: "mq-client",
|
||||
@ -90,6 +95,7 @@ func PushVersionToDb() {
|
||||
}
|
||||
}
|
||||
|
||||
// Puts one item of the type `database.Service` to the database, used by the VSTP package which receives services one at a time
|
||||
func PutOneService(data database.Service) bool {
|
||||
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||
_, err := coll.InsertOne(context.TODO(), data)
|
||||
@ -100,6 +106,7 @@ func PutOneService(data database.Service) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Deletes one service from the database.
|
||||
func DeleteOneService(data database.DeleteQuery) bool {
|
||||
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||
var filter = bson.D{
|
||||
|
@ -14,6 +14,8 @@ import (
|
||||
|
||||
const Doctype = "CifMetadata"
|
||||
|
||||
// The type describing the CifMetadata 'type' in the database.
|
||||
// This type will be moved to owlboard/go-types
|
||||
type CifMetadata struct {
|
||||
Doctype string `json:"type"`
|
||||
LastUpdate time.Time `json:"lastUpdate"`
|
||||
@ -21,6 +23,7 @@ type CifMetadata struct {
|
||||
LastSequence int64 `json:"lastSequence"`
|
||||
}
|
||||
|
||||
// Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example.
|
||||
func GetCifMetadata() (*CifMetadata, error) {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(metaCollection)
|
||||
@ -38,6 +41,7 @@ func GetCifMetadata() (*CifMetadata, error) {
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// Uses upsert to Insert/Update the CifMetadata in the database
|
||||
func PutCifMetadata(metadata CifMetadata) bool {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(metaCollection)
|
||||
|
@ -16,12 +16,12 @@ const databaseName string = "owlboard"
|
||||
const CorpusCollection string = "corpus"
|
||||
const StationsCollection string = "stations"
|
||||
const metaCollection string = "meta"
|
||||
const cifMetaCollection string = "cifMeta"
|
||||
const TimetableCollection string = "timetable"
|
||||
|
||||
// Provide the DB Connection to other functions
|
||||
var MongoClient (*mongo.Client)
|
||||
|
||||
// Builds the DB URI based on the loaded configuration parameters
|
||||
func getDbUri(cfg *helpers.Configuration) string {
|
||||
var uri = "mongodb://" + cfg.DbUser + ":" + cfg.DbPass + "@" + cfg.DbHost + ":" + cfg.DbPort
|
||||
return uri
|
||||
@ -47,6 +47,7 @@ func InitDataAccess(cfg *helpers.Configuration) {
|
||||
MongoClient = client
|
||||
}
|
||||
|
||||
// Closes the connection to the database - used for cleanup functions
|
||||
func CloseMongoClient() {
|
||||
if MongoClient != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
)
|
||||
|
||||
// Puts an array of Corpus Documents into the database
|
||||
func PutManyCorpus(corpusData []database.CorpusEntry) error {
|
||||
collection := MongoClient.Database(databaseName).Collection(CorpusCollection)
|
||||
|
||||
@ -20,6 +21,7 @@ func PutManyCorpus(corpusData []database.CorpusEntry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Puts an array of Stations documents into the database
|
||||
func PutManyStations(stationsData []database.StationEntry) error {
|
||||
collection := MongoClient.Database(databaseName).Collection(StationsCollection)
|
||||
|
||||
@ -34,6 +36,7 @@ func PutManyStations(stationsData []database.StationEntry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Converts []database.CorpusEntry types into interface slices required to put them into the database
|
||||
func convertCorpusToInterfaceSlice(corpusData []database.CorpusEntry) []interface{} {
|
||||
interfaceSlice := make([]interface{}, len(corpusData))
|
||||
for i, doc := range corpusData {
|
||||
@ -42,6 +45,7 @@ func convertCorpusToInterfaceSlice(corpusData []database.CorpusEntry) []interfac
|
||||
return interfaceSlice
|
||||
}
|
||||
|
||||
// Converts []database.StationEntry types into interface slices required to put them into the database
|
||||
func convertStationsToInterfaceSlice(stationsData []database.StationEntry) []interface{} {
|
||||
interfaceSlice := make([]interface{}, len(stationsData))
|
||||
for i, doc := range stationsData {
|
||||
|
@ -2,6 +2,7 @@ package helpers
|
||||
|
||||
import "encoding/base64"
|
||||
|
||||
// Provides a BasicAuth string
|
||||
func BasicAuth(username, password string) string {
|
||||
authString := username + ":" + password
|
||||
return base64.StdEncoding.EncodeToString([]byte(authString))
|
||||
|
@ -92,6 +92,7 @@ func LoadConfig() (*Configuration, error) {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// Applies configuration strings to the configuration struct
|
||||
func (c *Configuration) setConfigValue(key, value string) {
|
||||
value = strings.TrimSpace(value)
|
||||
switch key {
|
||||
@ -116,6 +117,7 @@ func (c *Configuration) setConfigValue(key, value string) {
|
||||
}
|
||||
}
|
||||
|
||||
// Provides a method to print the configuration struct. Only when the DEBUG env is set to true
|
||||
func (c *Configuration) PrintConfig() {
|
||||
if os.Getenv("DEBUG") == "true" {
|
||||
fmt.Println("Configuration:")
|
||||
|
@ -2,7 +2,6 @@ package helpers
|
||||
|
||||
// An error with the VSTP messages is that speed values are shown incorrectly, but not for all services
|
||||
// This maps the displayed speed to the correct speed.
|
||||
|
||||
var SpeedMap = map[string]string{
|
||||
"22": "10",
|
||||
"34": "15",
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
var Msg *zap.Logger
|
||||
|
||||
// Initialises the logger
|
||||
func init() {
|
||||
var err error
|
||||
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
var Client *stomp.Conn
|
||||
|
||||
// Initialises the connection to the STOMP server
|
||||
func StompInit(cfg *helpers.Configuration) {
|
||||
Client = dial(cfg.NrodUser, cfg.NrodPass)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user