Skip to content

Update UBX Integrity Checker #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion Utils/UBX_Integrity_Checker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Checks the integrity of u-blox binary files

# Written by: Paul Clark
# Last update: February 21st, 2023
# Last update: May 10th, 2023

# Reads a UBX file and checks the integrity of UBX, NMEA and RTCM data
# Will rewind and re-sync if an error is found
# Will create a repaired file if desired
# Will print any GNTXT messages if desired

# SparkFun code, firmware, and software is released under the MIT License (http://opensource.org/licenses/MIT)
#
Expand Down Expand Up @@ -106,6 +107,13 @@ def crc24q(byte, sum):
else:
repairFilename = filename + '.repair'

# Ask user if GNTXT is to be printed
printGNTXT = False
if containsNMEA == True:
response = input('Do you want to print any GNTXT messages found? (y/N): ') # Get the response
if (response == 'Y') or (response == 'y'):
printGNTXT = True

print()
print('Processing',filename)
print()
Expand Down Expand Up @@ -262,11 +270,13 @@ def crc24q(byte, sum):
if (c == 0xB5): # Have we found Sync Char 1 (0xB5) if we were expecting one?
if (ubx_nmea_state == sync_lost):
print("UBX Sync Char 1 (0xB5) found at byte "+str(processed))
print()
ubx_nmea_state = looking_for_62 # Now look for Sync Char 2 (0x62)
message_start_byte = processed # Record the message start byte for resync reporting
elif (c == 0x24) and (containsNMEA == True): # Have we found an NMEA '$' if we were expecting one?
if (ubx_nmea_state == sync_lost):
print("NMEA $ found at byte "+str(processed))
print()
ubx_nmea_state = looking_for_asterix # Now keep going until we receive an asterix
nmea_length = 0 # Reset nmea_length then use it to check for excessive message length
nmea_csum = 0 # Reset the nmea_csum. Update it as each character arrives
Expand All @@ -276,9 +286,13 @@ def crc24q(byte, sum):
nmea_char_4 = 0x30
nmea_char_5 = 0x30
message_start_byte = processed # Record the message start byte for resync reporting
nmea_string = None
if printGNTXT == True:
nmea_string = '$'
elif (c == 0xD3) and (containsRTCM == True): # Have we found 0xD3 if we were expecting one?
if (ubx_nmea_state == sync_lost):
print("RTCM 0xD3 found at byte "+str(processed))
print()
ubx_nmea_state = looking_for_RTCM_len1 # Now keep going until we receive the checksum
rtcm_expected_csum = 0 # Reset the RTCM csum with a seed of 0. Update it as each character arrives
rtcm_expected_csum = crc24q(c, rtcm_expected_csum) # Update expected checksum
Expand All @@ -287,8 +301,10 @@ def crc24q(byte, sum):
#print("Was expecting Sync Char 0xB5, NMEA $ or RTCM 0xD3 but did not receive one!")
if (c == 0x24):
print("Warning: * found at byte "+str(processed)+"! Are you sure this file does not contain NMEA messages?")
print()
if (c == 0xD3):
print("Warning: 0xD3 found at byte "+str(processed)+"! Are you sure this file does not contain RTCM messages?")
print()
sync_lost_at = processed
ubx_nmea_state = sync_lost

Expand All @@ -301,6 +317,7 @@ def crc24q(byte, sum):
else:
print("Panic!! Was expecting Sync Char 2 (0x62) but did not receive one!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
Expand Down Expand Up @@ -344,6 +361,7 @@ def crc24q(byte, sum):
if ((ubx_expected_checksum_A != ubx_checksum_A) or (ubx_expected_checksum_B != ubx_checksum_B)):
print("Panic!! UBX checksum error!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync.")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
Expand Down Expand Up @@ -377,12 +395,16 @@ def crc24q(byte, sum):
# NMEA messages
elif (ubx_nmea_state == looking_for_asterix):
nmea_length = nmea_length + 1 # Increase the message length count
if nmea_string is not None:
nmea_string += chr(c)
if (nmea_length > max_nmea_len): # If the length is greater than max_nmea_len, something bad must have happened (sync_lost)
print("Panic!! Excessive NMEA message length!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
nmea_string = None
continue
# If this is one of the first five characters, store it
if (nmea_length <= 5):
Expand All @@ -400,6 +422,8 @@ def crc24q(byte, sum):
message_type = chr(nmea_char_1) + chr(nmea_char_2) + chr(nmea_char_3) + chr(nmea_char_4) + chr(nmea_char_5) # Record the message type
if (message_type == "PUBX,"): # Remove the comma from PUBX
message_type = "PUBX"
if (message_type != "GNTXT"): # Reset nmea_string if this is not GNTXT
nmea_string = None
# Now check if this is an '*'
if (c == 0x2A):
# Asterix received
Expand All @@ -420,6 +444,8 @@ def crc24q(byte, sum):
elif (ubx_nmea_state == looking_for_csum1):
# Store the first NMEA checksum character
nmea_csum1 = c
if nmea_string is not None:
nmea_string += chr(c)
ubx_nmea_state = looking_for_csum2
elif (ubx_nmea_state == looking_for_csum2):
# Store the second NMEA checksum character
Expand All @@ -429,30 +455,40 @@ def crc24q(byte, sum):
# The checksum does not match so sync_lost
print("Panic!! NMEA checksum error!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
nmea_string = None
else:
# Checksum was valid so wait for the terminators
if nmea_string is not None:
nmea_string += chr(c)
ubx_nmea_state = looking_for_term1
elif (ubx_nmea_state == looking_for_term1):
# Check if this is CR
if (c != 0x0D):
print("Panic!! NMEA CR not found!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
nmea_string = None
else:
if nmea_string is not None:
nmea_string += chr(c)
ubx_nmea_state = looking_for_term2
elif (ubx_nmea_state == looking_for_term2):
# Check if this is LF
if (c != 0x0A):
print("Panic!! NMEA LF not found!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
nmea_string = None
else:
# Valid NMEA message was received. Check if we have seen this message type before
if message_type in messages:
Expand All @@ -461,6 +497,11 @@ def crc24q(byte, sum):
messages[message_type] = 1 # if we have not, set its count to 1
if (nmea_length > longest_NMEA): # Update the longest NMEA message length
longest_NMEA = nmea_length
# Print GNTXT
if nmea_string is not None and printGNTXT == True:
nmea_string += chr(c)
print(nmea_string)
nmea_string = None
# LF was received so go back to looking for B5 or a $
ubx_nmea_state = looking_for_B5_dollar_D3
rewind_in_progress = False # Clear rewind_in_progress
Expand Down Expand Up @@ -542,6 +583,7 @@ def crc24q(byte, sum):
if (rtcm_expected_csum != rtcm_actual_csum):
print("Panic!! RTCM checksum error!")
print("Sync lost at byte "+str(processed)+". Attemting to re-sync.")
print()
sync_lost_at = processed
resync_in_progress = True
ubx_nmea_state = sync_lost
Expand Down Expand Up @@ -587,6 +629,7 @@ def crc24q(byte, sum):
keepGoing = False
else:
print("Sync has been lost. Currently processing byte "+str(processed)+". Rewinding to byte "+str(rewind_to))
print()
fi.seek(rewind_to) # Rewind the file
processed = rewind_to - 1 # Rewind processed too! (-1 is needed as processed is incremented at the start of the loop)
rewind_in_progress = True # Flag that a rewind is in progress
Expand Down