@@ -1202,9 +1202,42 @@ def _set_encoding(self) -> None:
1202
1202
else :
1203
1203
self ._encoding = "utf-8"
1204
1204
1205
+ def _read_int8 (self ) -> int :
1206
+ return struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ]
1207
+
1208
+ def _read_uint8 (self ) -> int :
1209
+ return struct .unpack ("B" , self .path_or_buf .read (1 ))[0 ]
1210
+
1211
+ def _read_uint16 (self ) -> int :
1212
+ return struct .unpack (f"{ self .byteorder } H" , self .path_or_buf .read (2 ))[0 ]
1213
+
1214
+ def _read_uint32 (self ) -> int :
1215
+ return struct .unpack (f"{ self .byteorder } I" , self .path_or_buf .read (4 ))[0 ]
1216
+
1217
+ def _read_uint64 (self ) -> int :
1218
+ return struct .unpack (f"{ self .byteorder } Q" , self .path_or_buf .read (8 ))[0 ]
1219
+
1220
+ def _read_int16 (self ) -> int :
1221
+ return struct .unpack (f"{ self .byteorder } h" , self .path_or_buf .read (2 ))[0 ]
1222
+
1223
+ def _read_int32 (self ) -> int :
1224
+ return struct .unpack (f"{ self .byteorder } i" , self .path_or_buf .read (4 ))[0 ]
1225
+
1226
+ def _read_int64 (self ) -> int :
1227
+ return struct .unpack (f"{ self .byteorder } q" , self .path_or_buf .read (8 ))[0 ]
1228
+
1229
+ def _read_char8 (self ) -> bytes :
1230
+ return struct .unpack ("c" , self .path_or_buf .read (1 ))[0 ]
1231
+
1232
+ def _read_int16_count (self , count : int ) -> tuple [int , ...]:
1233
+ return struct .unpack (
1234
+ f"{ self .byteorder } { 'h' * count } " ,
1235
+ self .path_or_buf .read (2 * count ),
1236
+ )
1237
+
1205
1238
def _read_header (self ) -> None :
1206
- first_char = self .path_or_buf . read ( 1 )
1207
- if struct . unpack ( "c" , first_char )[ 0 ] == b"<" :
1239
+ first_char = self ._read_char8 ( )
1240
+ if first_char == b"<" :
1208
1241
self ._read_new_header ()
1209
1242
else :
1210
1243
self ._read_old_header (first_char )
@@ -1224,11 +1257,10 @@ def _read_new_header(self) -> None:
1224
1257
self .path_or_buf .read (21 ) # </release><byteorder>
1225
1258
self .byteorder = self .path_or_buf .read (3 ) == b"MSF" and ">" or "<"
1226
1259
self .path_or_buf .read (15 ) # </byteorder><K>
1227
- nvar_type = "H" if self .format_version <= 118 else "I"
1228
- nvar_size = 2 if self .format_version <= 118 else 4
1229
- self .nvar = struct .unpack (
1230
- self .byteorder + nvar_type , self .path_or_buf .read (nvar_size )
1231
- )[0 ]
1260
+ if self .format_version <= 118 :
1261
+ self .nvar = self ._read_uint16 ()
1262
+ else :
1263
+ self .nvar = self ._read_uint32 ()
1232
1264
self .path_or_buf .read (7 ) # </K><N>
1233
1265
1234
1266
self .nobs = self ._get_nobs ()
@@ -1240,46 +1272,27 @@ def _read_new_header(self) -> None:
1240
1272
self .path_or_buf .read (8 ) # 0x0000000000000000
1241
1273
self .path_or_buf .read (8 ) # position of <map>
1242
1274
1243
- self ._seek_vartypes = (
1244
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 16
1245
- )
1246
- self ._seek_varnames = (
1247
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 10
1248
- )
1249
- self ._seek_sortlist = (
1250
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 10
1251
- )
1252
- self ._seek_formats = (
1253
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 9
1254
- )
1255
- self ._seek_value_label_names = (
1256
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 19
1257
- )
1275
+ self ._seek_vartypes = self ._read_int64 () + 16
1276
+ self ._seek_varnames = self ._read_int64 () + 10
1277
+ self ._seek_sortlist = self ._read_int64 () + 10
1278
+ self ._seek_formats = self ._read_int64 () + 9
1279
+ self ._seek_value_label_names = self ._read_int64 () + 19
1258
1280
1259
1281
# Requires version-specific treatment
1260
1282
self ._seek_variable_labels = self ._get_seek_variable_labels ()
1261
1283
1262
1284
self .path_or_buf .read (8 ) # <characteristics>
1263
- self .data_location = (
1264
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 6
1265
- )
1266
- self .seek_strls = (
1267
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 7
1268
- )
1269
- self .seek_value_labels = (
1270
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 14
1271
- )
1285
+ self .data_location = self ._read_int64 () + 6
1286
+ self .seek_strls = self ._read_int64 () + 7
1287
+ self .seek_value_labels = self ._read_int64 () + 14
1272
1288
1273
1289
self .typlist , self .dtyplist = self ._get_dtypes (self ._seek_vartypes )
1274
1290
1275
1291
self .path_or_buf .seek (self ._seek_varnames )
1276
1292
self .varlist = self ._get_varlist ()
1277
1293
1278
1294
self .path_or_buf .seek (self ._seek_sortlist )
1279
- self .srtlist = struct .unpack (
1280
- self .byteorder + ("h" * (self .nvar + 1 )),
1281
- self .path_or_buf .read (2 * (self .nvar + 1 )),
1282
- )[:- 1 ]
1295
+ self .srtlist = self ._read_int16_count (self .nvar + 1 )[:- 1 ]
1283
1296
1284
1297
self .path_or_buf .seek (self ._seek_formats )
1285
1298
self .fmtlist = self ._get_fmtlist ()
@@ -1296,10 +1309,7 @@ def _get_dtypes(
1296
1309
) -> tuple [list [int | str ], list [str | np .dtype ]]:
1297
1310
1298
1311
self .path_or_buf .seek (seek_vartypes )
1299
- raw_typlist = [
1300
- struct .unpack (self .byteorder + "H" , self .path_or_buf .read (2 ))[0 ]
1301
- for _ in range (self .nvar )
1302
- ]
1312
+ raw_typlist = [self ._read_uint16 () for _ in range (self .nvar )]
1303
1313
1304
1314
def f (typ : int ) -> int | str :
1305
1315
if typ <= 2045 :
@@ -1368,16 +1378,16 @@ def _get_variable_labels(self) -> list[str]:
1368
1378
1369
1379
def _get_nobs (self ) -> int :
1370
1380
if self .format_version >= 118 :
1371
- return struct . unpack ( self .byteorder + "Q" , self . path_or_buf . read ( 8 ))[ 0 ]
1381
+ return self ._read_uint64 ()
1372
1382
else :
1373
- return struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1383
+ return self ._read_uint32 ()
1374
1384
1375
1385
def _get_data_label (self ) -> str :
1376
1386
if self .format_version >= 118 :
1377
- strlen = struct . unpack ( self .byteorder + "H" , self . path_or_buf . read ( 2 ))[ 0 ]
1387
+ strlen = self ._read_uint16 ()
1378
1388
return self ._decode (self .path_or_buf .read (strlen ))
1379
1389
elif self .format_version == 117 :
1380
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1390
+ strlen = self ._read_int8 ()
1381
1391
return self ._decode (self .path_or_buf .read (strlen ))
1382
1392
elif self .format_version > 105 :
1383
1393
return self ._decode (self .path_or_buf .read (81 ))
@@ -1386,10 +1396,10 @@ def _get_data_label(self) -> str:
1386
1396
1387
1397
def _get_time_stamp (self ) -> str :
1388
1398
if self .format_version >= 118 :
1389
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1399
+ strlen = self ._read_int8 ()
1390
1400
return self .path_or_buf .read (strlen ).decode ("utf-8" )
1391
1401
elif self .format_version == 117 :
1392
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1402
+ strlen = self ._read_int8 ()
1393
1403
return self ._decode (self .path_or_buf .read (strlen ))
1394
1404
elif self .format_version > 104 :
1395
1405
return self ._decode (self .path_or_buf .read (18 ))
@@ -1404,22 +1414,20 @@ def _get_seek_variable_labels(self) -> int:
1404
1414
# variable, 20 for the closing tag and 17 for the opening tag
1405
1415
return self ._seek_value_label_names + (33 * self .nvar ) + 20 + 17
1406
1416
elif self .format_version >= 118 :
1407
- return struct . unpack ( self .byteorder + "q" , self . path_or_buf . read ( 8 ))[ 0 ] + 17
1417
+ return self ._read_int64 () + 17
1408
1418
else :
1409
1419
raise ValueError ()
1410
1420
1411
1421
def _read_old_header (self , first_char : bytes ) -> None :
1412
- self .format_version = struct . unpack ( "b" , first_char ) [0 ]
1422
+ self .format_version = int ( first_char [0 ])
1413
1423
if self .format_version not in [104 , 105 , 108 , 111 , 113 , 114 , 115 ]:
1414
1424
raise ValueError (_version_error .format (version = self .format_version ))
1415
1425
self ._set_encoding ()
1416
- self .byteorder = (
1417
- struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ] == 0x1 and ">" or "<"
1418
- )
1419
- self .filetype = struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ]
1426
+ self .byteorder = self ._read_int8 () == 0x1 and ">" or "<"
1427
+ self .filetype = self ._read_int8 ()
1420
1428
self .path_or_buf .read (1 ) # unused
1421
1429
1422
- self .nvar = struct . unpack ( self .byteorder + "H" , self . path_or_buf . read ( 2 ))[ 0 ]
1430
+ self .nvar = self ._read_uint16 ()
1423
1431
self .nobs = self ._get_nobs ()
1424
1432
1425
1433
self ._data_label = self ._get_data_label ()
@@ -1428,7 +1436,7 @@ def _read_old_header(self, first_char: bytes) -> None:
1428
1436
1429
1437
# descriptors
1430
1438
if self .format_version > 108 :
1431
- typlist = [ord ( self . path_or_buf . read ( 1 )) for _ in range (self .nvar )]
1439
+ typlist = [int ( c ) for c in self . path_or_buf . read (self .nvar )]
1432
1440
else :
1433
1441
buf = self .path_or_buf .read (self .nvar )
1434
1442
typlistb = np .frombuffer (buf , dtype = np .uint8 )
@@ -1458,10 +1466,7 @@ def _read_old_header(self, first_char: bytes) -> None:
1458
1466
self .varlist = [
1459
1467
self ._decode (self .path_or_buf .read (9 )) for _ in range (self .nvar )
1460
1468
]
1461
- self .srtlist = struct .unpack (
1462
- self .byteorder + ("h" * (self .nvar + 1 )),
1463
- self .path_or_buf .read (2 * (self .nvar + 1 )),
1464
- )[:- 1 ]
1469
+ self .srtlist = self ._read_int16_count (self .nvar + 1 )[:- 1 ]
1465
1470
1466
1471
self .fmtlist = self ._get_fmtlist ()
1467
1472
@@ -1476,17 +1481,11 @@ def _read_old_header(self, first_char: bytes) -> None:
1476
1481
1477
1482
if self .format_version > 104 :
1478
1483
while True :
1479
- data_type = struct .unpack (
1480
- self .byteorder + "b" , self .path_or_buf .read (1 )
1481
- )[0 ]
1484
+ data_type = self ._read_int8 ()
1482
1485
if self .format_version > 108 :
1483
- data_len = struct .unpack (
1484
- self .byteorder + "i" , self .path_or_buf .read (4 )
1485
- )[0 ]
1486
+ data_len = self ._read_int32 ()
1486
1487
else :
1487
- data_len = struct .unpack (
1488
- self .byteorder + "h" , self .path_or_buf .read (2 )
1489
- )[0 ]
1488
+ data_len = self ._read_int16 ()
1490
1489
if data_type == 0 :
1491
1490
break
1492
1491
self .path_or_buf .read (data_len )
@@ -1570,8 +1569,8 @@ def _read_value_labels(self) -> None:
1570
1569
labname = self ._decode (self .path_or_buf .read (129 ))
1571
1570
self .path_or_buf .read (3 ) # padding
1572
1571
1573
- n = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1574
- txtlen = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1572
+ n = self ._read_uint32 ()
1573
+ txtlen = self ._read_uint32 ()
1575
1574
off = np .frombuffer (
1576
1575
self .path_or_buf .read (4 * n ), dtype = self .byteorder + "i4" , count = n
1577
1576
)
@@ -1599,7 +1598,7 @@ def _read_strls(self) -> None:
1599
1598
break
1600
1599
1601
1600
if self .format_version == 117 :
1602
- v_o = struct . unpack ( self .byteorder + "Q" , self . path_or_buf . read ( 8 ))[ 0 ]
1601
+ v_o = self ._read_uint64 ()
1603
1602
else :
1604
1603
buf = self .path_or_buf .read (12 )
1605
1604
# Only tested on little endian file on little endian machine.
@@ -1610,8 +1609,8 @@ def _read_strls(self) -> None:
1610
1609
# This path may not be correct, impossible to test
1611
1610
buf = buf [0 :v_size ] + buf [(4 + v_size ) :]
1612
1611
v_o = struct .unpack ("Q" , buf )[0 ]
1613
- typ = struct . unpack ( "B" , self .path_or_buf . read ( 1 ))[ 0 ]
1614
- length = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1612
+ typ = self ._read_uint8 ()
1613
+ length = self ._read_uint32 ()
1615
1614
va = self .path_or_buf .read (length )
1616
1615
if typ == 130 :
1617
1616
decoded_va = va [0 :- 1 ].decode (self ._encoding )
0 commit comments