2
2
import os
3
3
import pandas as pd
4
4
import numpy as np
5
- from mptcpanalyzer .tshark import TsharkConfig
5
+ from mptcpanalyzer .tshark import TsharkConfig , Field
6
6
from mptcpanalyzer .connection import MpTcpSubflow , MpTcpConnection , TcpConnection , MpTcpMapping , TcpMapping
7
7
import mptcpanalyzer as mp
8
8
from mptcpanalyzer import RECEIVER_SUFFIX , SENDER_SUFFIX , _receiver , _sender , suffix_fields
12
12
import tempfile
13
13
import pprint
14
14
import functools
15
- from enum import Enum
15
+ from enum import Enum , auto
16
16
17
17
log = logging .getLogger (__name__ )
18
18
slog = logging .getLogger (__name__ )
35
35
MPTCP_DEBUG_FIELDS = TCP_DEBUG_FIELDS + [ 'mptcpdest' ]
36
36
37
37
38
def _convert_role(x):
    """
    Translate a raw role name into its ConnectionRoles member.

    Workaround https://github.com/pandas-dev/pandas/pull/20826

    Any falsy input (e.g. an empty string) yields np.nan; every other value
    is looked up by subscript in ConnectionRoles.
    NOTE(review): ConnectionRoles is defined outside this view -- presumably
    an Enum indexed by member name; confirm.
    """
    if not x:
        return np.nan
    return ConnectionRoles[x]
43
+
38
44
39
45
def ignore(f1, f2):
    """Return 0 regardless of the two values passed in."""
    # Neither argument is inspected; the result is the constant 0.
    return 0
@@ -133,6 +139,16 @@ def _convert_list2str(serie):
133
139
"tcplen"
134
140
]
135
141
142
+
143
+ """
144
+ On top of Tshark fields, we also describe fields generated by mptcpanalyzer
145
+ """
146
+ artificial_fields = [
147
+ # TODO use dtype_role as type
148
+ Field ("mptcpdest" , "mptcpdest" , dtype_role , "MPTCP destination" ),
149
+ Field ("tcpdest" , "tcpdest" , dtype_role , "TCP destination" )
150
+ ]
151
+
136
152
class PacketMappingMode (Enum ):
137
153
"""
138
154
How to map packets from one stream to another
@@ -142,9 +158,8 @@ class PacketMappingMode(Enum):
142
158
143
159
The hash based is more straightforward
144
160
"""
145
- HASH = 1
146
- SCORE = 2
147
-
161
+ HASH = auto ()
162
+ SCORE = auto ()
148
163
149
164
150
165
def load_merged_streams_into_pandas (
@@ -276,11 +291,6 @@ def _load_list(x, field="set field to debug"):
276
291
res = ast .literal_eval (x ) if (x is not None and x != '' ) else np .nan
277
292
return res
278
293
279
- def _convert_role (x ):
280
- """
281
- Workaround https://github.com/pandas-dev/pandas/pull/20826
282
- """
283
- return ConnectionRoles [x ] if x else np .nan
284
294
285
295
with open (cachename ) as fd :
286
296
import ast
@@ -370,10 +380,20 @@ def load_into_pandas(
370
380
filename = getrealpath (input_file )
371
381
cache = mp .get_cache ()
372
382
383
+ fields = config .get_fields ("fullname" , "type" )
384
+ tshark_dtypes = {k : v for k , v in fields .items () if v is not None or k not in ["tcpflags" ]}
385
+
386
+ artifical_dtypes = { field .fullname : field .type for field in artificial_fields }
387
+ print ("artifical_dtypes" , artifical_dtypes )
388
+ dtypes = dict (tshark_dtypes , ** artifical_dtypes )
389
+
390
+
391
+ # TODO add artificial_fields hash
392
+ pseudohash = hash (config ) + hash (frozenset (dtypes .items ()))
373
393
uid = cache .cacheuid (
374
394
'' , # prefix (might want to shorten it a bit)
375
395
[ filename ], # dependencies
376
- str (config . hash () ) + '.csv'
396
+ str (pseudohash ) + '.csv'
377
397
)
378
398
379
399
is_cache_valid , csv_filename = cache .get (uid )
@@ -395,8 +415,9 @@ def load_into_pandas(
395
415
else :
396
416
raise Exception (stderr )
397
417
398
- temp = config .get_fields ("fullname" , "type" )
399
- dtypes = {k : v for k , v in temp .items () if v is not None or k not in ["tcpflags" ]}
418
+ print ("ARTIFICAL_DTYPES:" , artifical_dtypes )
419
+
420
+
400
421
log .debug ("Loading a csv file %s" % csv_filename )
401
422
402
423
try :
@@ -409,12 +430,15 @@ def load_into_pandas(
409
430
# having both a converter and a dtype for a field generates warnings
410
431
# so we pop tcp.flags
411
432
# dtype=dtypes.pop("tcp.flags"),
412
- dtype = dtypes , # poping still generates
433
+ dtype = dtypes ,
413
434
converters = {
414
435
"tcp.flags" : _convert_flags ,
415
436
# reinjections, converts to list of integers
416
437
"mptcp.reinjection_of" : functools .partial (_convert_to_list , field = "reinjectionOf" ),
417
438
"mptcp.reinjected_in" : functools .partial (_convert_to_list , field = "reinjectedIn" ),
439
+
440
+ "mptcpdest" : _convert_role ,
441
+ "tcpdest" : _convert_role ,
418
442
},
419
443
# nrows=10, # useful for debugging purpose
420
444
)
@@ -444,6 +468,32 @@ def load_into_pandas(
444
468
raise e
445
469
446
470
log .info ("Finished loading dataframe for %s. Size=%d" % (input_file , len (data )))
471
+
472
+ names = set ([ field .name for field in artificial_fields ])
473
+ print ("NAMES" , names )
474
+ column_names = set (data .columns )
475
+ print ("column_names" , column_names )
476
+
477
+
478
+ # TODO here I should assign the type
479
+ new = pd .DataFrame (dtype = {
480
+ "tcpdest" : dtype_role
481
+ })
482
+ data = pd .concat ([ data , new ],
483
+ # ignore_index=False,
484
+ # copy=False,
485
+ )
486
+
487
+ # for missing_field in names - column_names:
488
+ # print("missing field", missing_field)
489
+ # data[missing_field] = np.nan
490
+
491
+ # data.astype({ })
492
+ # data.assign( { missing_field: np.nan for missing_field in (names - column_names) } )
493
+
494
+ print ("FINAL_DTYPES" )
495
+ print (data .dtypes )
496
+ print (data .tcpdest .head (10 ))
447
497
return data
448
498
449
499
@@ -519,6 +569,7 @@ def tcpdest_from_connections(df, con: TcpConnection):
519
569
log .debug ("Looking at destination %s" % dest )
520
570
q = con .generate_direction_query (dest )
521
571
df_dest = df .query (q )
572
+ print ("tcpdest %r" % dest )
522
573
df .loc [df_dest .index , 'tcpdest' ] = dest
523
574
524
575
# print("df",
@@ -604,13 +655,6 @@ def merge_tcp_dataframes_known_streams(
604
655
605
656
# TODO move elsewhere, to outer function
606
657
# total = total.reindex(columns=firstcols + list(filter(lambda x: x not in firstcols, total.columns.tolist())))
607
- # total.to_csv(
608
- # cachename, # output
609
- # # columns=self.columns,
610
- # index=False,
611
- # header=True,
612
- # # sep=main.config["DEFAULT"]["delimiter"],
613
- # )
614
658
log .info ("Resulting merged tcp dataframe of size {} (to compare with {} and {})" .format (
615
659
len (total ), len (h1_df ), len (h2_df )
616
660
))
0 commit comments