@@ -1198,9 +1198,42 @@ def _set_encoding(self) -> None:
1198
1198
else :
1199
1199
self ._encoding = "utf-8"
1200
1200
1201
+ def _read_int8 (self ) -> int :
1202
+ return struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ]
1203
+
1204
+ def _read_uint8 (self ) -> int :
1205
+ return struct .unpack ("B" , self .path_or_buf .read (1 ))[0 ]
1206
+
1207
+ def _read_uint16 (self ) -> int :
1208
+ return struct .unpack (f"{ self .byteorder } H" , self .path_or_buf .read (2 ))[0 ]
1209
+
1210
+ def _read_uint32 (self ) -> int :
1211
+ return struct .unpack (f"{ self .byteorder } I" , self .path_or_buf .read (4 ))[0 ]
1212
+
1213
+ def _read_uint64 (self ) -> int :
1214
+ return struct .unpack (f"{ self .byteorder } Q" , self .path_or_buf .read (8 ))[0 ]
1215
+
1216
+ def _read_int16 (self ) -> int :
1217
+ return struct .unpack (f"{ self .byteorder } h" , self .path_or_buf .read (2 ))[0 ]
1218
+
1219
+ def _read_int32 (self ) -> int :
1220
+ return struct .unpack (f"{ self .byteorder } i" , self .path_or_buf .read (4 ))[0 ]
1221
+
1222
+ def _read_int64 (self ) -> int :
1223
+ return struct .unpack (f"{ self .byteorder } q" , self .path_or_buf .read (8 ))[0 ]
1224
+
1225
+ def _read_char8 (self ) -> bytes :
1226
+ return struct .unpack ("c" , self .path_or_buf .read (1 ))[0 ]
1227
+
1228
+ def _read_int16_count (self , count : int ) -> tuple [int , ...]:
1229
+ return struct .unpack (
1230
+ f"{ self .byteorder } { 'h' * count } " ,
1231
+ self .path_or_buf .read (2 * count ),
1232
+ )
1233
+
1201
1234
def _read_header (self ) -> None :
1202
- first_char = self .path_or_buf . read ( 1 )
1203
- if struct . unpack ( "c" , first_char )[ 0 ] == b"<" :
1235
+ first_char = self ._read_char8 ( )
1236
+ if first_char == b"<" :
1204
1237
self ._read_new_header ()
1205
1238
else :
1206
1239
self ._read_old_header (first_char )
@@ -1220,11 +1253,9 @@ def _read_new_header(self) -> None:
1220
1253
self .path_or_buf .read (21 ) # </release><byteorder>
1221
1254
self .byteorder = ">" if self .path_or_buf .read (3 ) == b"MSF" else "<"
1222
1255
self .path_or_buf .read (15 ) # </byteorder><K>
1223
- nvar_type = "H" if self .format_version <= 118 else "I"
1224
- nvar_size = 2 if self .format_version <= 118 else 4
1225
- self .nvar = struct .unpack (
1226
- self .byteorder + nvar_type , self .path_or_buf .read (nvar_size )
1227
- )[0 ]
1256
+ self .nvar = (
1257
+ self ._read_uint16 () if self .format_version <= 118 else self ._read_uint32 ()
1258
+ )
1228
1259
self .path_or_buf .read (7 ) # </K><N>
1229
1260
1230
1261
self .nobs = self ._get_nobs ()
@@ -1236,46 +1267,27 @@ def _read_new_header(self) -> None:
1236
1267
self .path_or_buf .read (8 ) # 0x0000000000000000
1237
1268
self .path_or_buf .read (8 ) # position of <map>
1238
1269
1239
- self ._seek_vartypes = (
1240
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 16
1241
- )
1242
- self ._seek_varnames = (
1243
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 10
1244
- )
1245
- self ._seek_sortlist = (
1246
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 10
1247
- )
1248
- self ._seek_formats = (
1249
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 9
1250
- )
1251
- self ._seek_value_label_names = (
1252
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 19
1253
- )
1270
+ self ._seek_vartypes = self ._read_int64 () + 16
1271
+ self ._seek_varnames = self ._read_int64 () + 10
1272
+ self ._seek_sortlist = self ._read_int64 () + 10
1273
+ self ._seek_formats = self ._read_int64 () + 9
1274
+ self ._seek_value_label_names = self ._read_int64 () + 19
1254
1275
1255
1276
# Requires version-specific treatment
1256
1277
self ._seek_variable_labels = self ._get_seek_variable_labels ()
1257
1278
1258
1279
self .path_or_buf .read (8 ) # <characteristics>
1259
- self .data_location = (
1260
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 6
1261
- )
1262
- self .seek_strls = (
1263
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 7
1264
- )
1265
- self .seek_value_labels = (
1266
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 14
1267
- )
1280
+ self .data_location = self ._read_int64 () + 6
1281
+ self .seek_strls = self ._read_int64 () + 7
1282
+ self .seek_value_labels = self ._read_int64 () + 14
1268
1283
1269
1284
self .typlist , self .dtyplist = self ._get_dtypes (self ._seek_vartypes )
1270
1285
1271
1286
self .path_or_buf .seek (self ._seek_varnames )
1272
1287
self .varlist = self ._get_varlist ()
1273
1288
1274
1289
self .path_or_buf .seek (self ._seek_sortlist )
1275
- self .srtlist = struct .unpack (
1276
- self .byteorder + ("h" * (self .nvar + 1 )),
1277
- self .path_or_buf .read (2 * (self .nvar + 1 )),
1278
- )[:- 1 ]
1290
+ self .srtlist = self ._read_int16_count (self .nvar + 1 )[:- 1 ]
1279
1291
1280
1292
self .path_or_buf .seek (self ._seek_formats )
1281
1293
self .fmtlist = self ._get_fmtlist ()
@@ -1291,10 +1303,7 @@ def _get_dtypes(
1291
1303
self , seek_vartypes : int
1292
1304
) -> tuple [list [int | str ], list [str | np .dtype ]]:
1293
1305
self .path_or_buf .seek (seek_vartypes )
1294
- raw_typlist = [
1295
- struct .unpack (self .byteorder + "H" , self .path_or_buf .read (2 ))[0 ]
1296
- for _ in range (self .nvar )
1297
- ]
1306
+ raw_typlist = [self ._read_uint16 () for _ in range (self .nvar )]
1298
1307
1299
1308
def f (typ : int ) -> int | str :
1300
1309
if typ <= 2045 :
@@ -1363,16 +1372,16 @@ def _get_variable_labels(self) -> list[str]:
1363
1372
1364
1373
def _get_nobs (self ) -> int :
1365
1374
if self .format_version >= 118 :
1366
- return struct . unpack ( self .byteorder + "Q" , self . path_or_buf . read ( 8 ))[ 0 ]
1375
+ return self ._read_uint64 ()
1367
1376
else :
1368
- return struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1377
+ return self ._read_uint32 ()
1369
1378
1370
1379
def _get_data_label (self ) -> str :
1371
1380
if self .format_version >= 118 :
1372
- strlen = struct . unpack ( self .byteorder + "H" , self . path_or_buf . read ( 2 ))[ 0 ]
1381
+ strlen = self ._read_uint16 ()
1373
1382
return self ._decode (self .path_or_buf .read (strlen ))
1374
1383
elif self .format_version == 117 :
1375
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1384
+ strlen = self ._read_int8 ()
1376
1385
return self ._decode (self .path_or_buf .read (strlen ))
1377
1386
elif self .format_version > 105 :
1378
1387
return self ._decode (self .path_or_buf .read (81 ))
@@ -1381,10 +1390,10 @@ def _get_data_label(self) -> str:
1381
1390
1382
1391
def _get_time_stamp (self ) -> str :
1383
1392
if self .format_version >= 118 :
1384
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1393
+ strlen = self ._read_int8 ()
1385
1394
return self .path_or_buf .read (strlen ).decode ("utf-8" )
1386
1395
elif self .format_version == 117 :
1387
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1396
+ strlen = self ._read_int8 ()
1388
1397
return self ._decode (self .path_or_buf .read (strlen ))
1389
1398
elif self .format_version > 104 :
1390
1399
return self ._decode (self .path_or_buf .read (18 ))
@@ -1399,22 +1408,20 @@ def _get_seek_variable_labels(self) -> int:
1399
1408
# variable, 20 for the closing tag and 17 for the opening tag
1400
1409
return self ._seek_value_label_names + (33 * self .nvar ) + 20 + 17
1401
1410
elif self .format_version >= 118 :
1402
- return struct . unpack ( self .byteorder + "q" , self . path_or_buf . read ( 8 ))[ 0 ] + 17
1411
+ return self ._read_int64 () + 17
1403
1412
else :
1404
1413
raise ValueError ()
1405
1414
1406
1415
def _read_old_header (self , first_char : bytes ) -> None :
1407
- self .format_version = struct . unpack ( "b" , first_char ) [0 ]
1416
+ self .format_version = int ( first_char [0 ])
1408
1417
if self .format_version not in [104 , 105 , 108 , 111 , 113 , 114 , 115 ]:
1409
1418
raise ValueError (_version_error .format (version = self .format_version ))
1410
1419
self ._set_encoding ()
1411
- self .byteorder = (
1412
- ">" if struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ] == 0x1 else "<"
1413
- )
1414
- self .filetype = struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ]
1420
+ self .byteorder = (">" if self ._read_int8 () == 0x1 else "<" )
1421
+ self .filetype = self ._read_int8 ()
1415
1422
self .path_or_buf .read (1 ) # unused
1416
1423
1417
- self .nvar = struct . unpack ( self .byteorder + "H" , self . path_or_buf . read ( 2 ))[ 0 ]
1424
+ self .nvar = self ._read_uint16 ()
1418
1425
self .nobs = self ._get_nobs ()
1419
1426
1420
1427
self ._data_label = self ._get_data_label ()
@@ -1423,7 +1430,7 @@ def _read_old_header(self, first_char: bytes) -> None:
1423
1430
1424
1431
# descriptors
1425
1432
if self .format_version > 108 :
1426
- typlist = [ord ( self . path_or_buf . read ( 1 )) for _ in range (self .nvar )]
1433
+ typlist = [int ( c ) for c in self . path_or_buf . read (self .nvar )]
1427
1434
else :
1428
1435
buf = self .path_or_buf .read (self .nvar )
1429
1436
typlistb = np .frombuffer (buf , dtype = np .uint8 )
@@ -1453,10 +1460,7 @@ def _read_old_header(self, first_char: bytes) -> None:
1453
1460
self .varlist = [
1454
1461
self ._decode (self .path_or_buf .read (9 )) for _ in range (self .nvar )
1455
1462
]
1456
- self .srtlist = struct .unpack (
1457
- self .byteorder + ("h" * (self .nvar + 1 )),
1458
- self .path_or_buf .read (2 * (self .nvar + 1 )),
1459
- )[:- 1 ]
1463
+ self .srtlist = self ._read_int16_count (self .nvar + 1 )[:- 1 ]
1460
1464
1461
1465
self .fmtlist = self ._get_fmtlist ()
1462
1466
@@ -1471,17 +1475,11 @@ def _read_old_header(self, first_char: bytes) -> None:
1471
1475
1472
1476
if self .format_version > 104 :
1473
1477
while True :
1474
- data_type = struct .unpack (
1475
- self .byteorder + "b" , self .path_or_buf .read (1 )
1476
- )[0 ]
1478
+ data_type = self ._read_int8 ()
1477
1479
if self .format_version > 108 :
1478
- data_len = struct .unpack (
1479
- self .byteorder + "i" , self .path_or_buf .read (4 )
1480
- )[0 ]
1480
+ data_len = self ._read_int32 ()
1481
1481
else :
1482
- data_len = struct .unpack (
1483
- self .byteorder + "h" , self .path_or_buf .read (2 )
1484
- )[0 ]
1482
+ data_len = self ._read_int16 ()
1485
1483
if data_type == 0 :
1486
1484
break
1487
1485
self .path_or_buf .read (data_len )
@@ -1565,8 +1563,8 @@ def _read_value_labels(self) -> None:
1565
1563
labname = self ._decode (self .path_or_buf .read (129 ))
1566
1564
self .path_or_buf .read (3 ) # padding
1567
1565
1568
- n = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1569
- txtlen = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1566
+ n = self ._read_uint32 ()
1567
+ txtlen = self ._read_uint32 ()
1570
1568
off = np .frombuffer (
1571
1569
self .path_or_buf .read (4 * n ), dtype = self .byteorder + "i4" , count = n
1572
1570
)
@@ -1594,7 +1592,7 @@ def _read_strls(self) -> None:
1594
1592
break
1595
1593
1596
1594
if self .format_version == 117 :
1597
- v_o = struct . unpack ( self .byteorder + "Q" , self . path_or_buf . read ( 8 ))[ 0 ]
1595
+ v_o = self ._read_uint64 ()
1598
1596
else :
1599
1597
buf = self .path_or_buf .read (12 )
1600
1598
# Only tested on little endian file on little endian machine.
@@ -1605,8 +1603,8 @@ def _read_strls(self) -> None:
1605
1603
# This path may not be correct, impossible to test
1606
1604
buf = buf [0 :v_size ] + buf [(4 + v_size ) :]
1607
1605
v_o = struct .unpack ("Q" , buf )[0 ]
1608
- typ = struct . unpack ( "B" , self .path_or_buf . read ( 1 ))[ 0 ]
1609
- length = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1606
+ typ = self ._read_uint8 ()
1607
+ length = self ._read_uint32 ()
1610
1608
va = self .path_or_buf .read (length )
1611
1609
if typ == 130 :
1612
1610
decoded_va = va [0 :- 1 ].decode (self ._encoding )
0 commit comments