Improve CIF Parsing to achieve 60% memory reduction
This commit is contained in:
		
							parent
							
								
									a2c52f7b8b
								
							
						
					
					
						commit
						94e4cd964d
					
				
							
								
								
									
										35
									
								
								cif/parse.go
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								cif/parse.go
									
									
									
									
									
								
							| @ -11,60 +11,55 @@ import ( | ||||
| ) | ||||
| 
 | ||||
| // Unmarshals data into the correct types for processing | ||||
| // This function suffers from extremely high memory usage | ||||
| func parseCifData(data *[]byte) (*parsedData, error) { | ||||
| 	log.Msg.Debug("Starting CIF Data parsing") | ||||
| 	if data == nil { | ||||
| 		err := errors.New("unable to parse nil pointer") | ||||
| 		return nil, err | ||||
| 		return nil, errors.New("unable to parse nil pointer") | ||||
| 	} | ||||
| 	// Split the data into lines | ||||
| 	lines := bytes.Split(*data, []byte("\n")) | ||||
| 
 | ||||
| 	// Initialise variable for the parsed data | ||||
| 	// Initialise data structures | ||||
| 	var parsed parsedData | ||||
| 	parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0) | ||||
| 	parsed.sched = make([]upstreamApi.JsonScheduleV1, 0) | ||||
| 
 | ||||
| 	for _, line := range lines { | ||||
| 	// Create JSON Decoder | ||||
| 	decoder := json.NewDecoder(bytes.NewReader(*data)) | ||||
| 
 | ||||
| 		// Skip empty lines to avoid logging errors when there is no error | ||||
| 		if len(bytes.TrimSpace(line)) == 0 { | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		// Map each line for processing | ||||
| 	// Iterate over JSON Objects using stream decoder | ||||
| 	for decoder.More() { | ||||
| 		var obj map[string]json.RawMessage | ||||
| 		if err := json.Unmarshal(line, &obj); err != nil { | ||||
| 			log.Msg.Error("Error decoding line", zap.String("line", string(line)), zap.Error(err)) | ||||
| 			continue | ||||
| 		if err := decoder.Decode(&obj); err != nil { | ||||
| 			log.Msg.Error("Error decoding JSON String") | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		// Loop through the mapped data and unmarshal to the correct type | ||||
| 		// Handle parsed data | ||||
| 		for key, value := range obj { | ||||
| 			switch key { | ||||
| 			case "JsonTimetableV1": | ||||
| 				var timetable upstreamApi.JsonTimetableV1 | ||||
| 				if err := json.Unmarshal(value, &timetable); err != nil { | ||||
| 					log.Msg.Error("Unable to parse JSON Timetable", zap.Error(err), zap.String("line", string(value))) | ||||
| 					log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err)) | ||||
| 					continue | ||||
| 				} | ||||
| 				parsed.header = timetable | ||||
| 			case "JsonAssociationV1": | ||||
| 				var association upstreamApi.JsonAssociationV1 | ||||
| 				if err := json.Unmarshal(value, &association); err != nil { | ||||
| 					log.Msg.Error("Error decoding JSON Association", zap.Error(err)) | ||||
| 					log.Msg.Error("Error decoding JSONAssociationV1 object", zap.Error(err)) | ||||
| 					continue | ||||
| 				} | ||||
| 				parsed.assoc = append(parsed.assoc, association) | ||||
| 			case "JsonScheduleV1": | ||||
| 				var schedule upstreamApi.JsonScheduleV1 | ||||
| 				if err := json.Unmarshal(value, &schedule); err != nil { | ||||
| 					log.Msg.Error("Error decoding JSON Schedule", zap.Error(err)) | ||||
| 					log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err)) | ||||
| 					continue | ||||
| 				} | ||||
| 				parsed.sched = append(parsed.sched, schedule) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	log.Msg.Debug("CIF Parsing completed") | ||||
| 	return &parsed, nil | ||||
| } | ||||
|  | ||||
| @ -12,6 +12,7 @@ import ( | ||||
| 
 | ||||
| // Processes parsed CIF data and applies the data to the database | ||||
| func processParsedCif(data *parsedData) error { | ||||
| 	log.Msg.Debug("Starting CIF Processing") | ||||
| 	createTasks := make([]*upstreamApi.JsonScheduleV1, 0) | ||||
| 	deleteTasks := make([]*upstreamApi.JsonScheduleV1, 0) | ||||
| 
 | ||||
|  | ||||
| @ -98,6 +98,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta | ||||
| 			debugWriteFile(&parsed.header, &parsed.sched) | ||||
| 		} | ||||
| 
 | ||||
| 		log.Msg.Debug("Starting metadata checks") | ||||
| 		// Check CIF Sequence | ||||
| 		// Skip if LastSequence is >= this sequence | ||||
| 		if metadata.LastSequence >= parsed.header.Metadata.Sequence { | ||||
| @ -111,6 +112,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta | ||||
| 			log.Msg.Error("CIF sequence not as expected", zap.Error(err), zap.Int64("LastSequence", metadata.LastSequence), zap.Int64("New Sequence", parsed.header.Metadata.Sequence)) | ||||
| 			return err | ||||
| 		} | ||||
| 		log.Msg.Debug("Metadata checks complete") | ||||
| 
 | ||||
| 		// Do further sequence checks - parsed.header.Metadata.Sequence MUST = metadata.LastSequence + 1 | ||||
| 
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user