MPTCP_DEBUG_FIELDS = TCP_DEBUG_FIELDS + ['mptcpdest']


-def _convert_role(x):
-    """
-    Workaround https://github.com/pandas-dev/pandas/pull/20826
-    """
-    log.log(mp.TRACE, "converting [%r] into role" % x)
-
-    # else throw
-    return ConnectionRoles.from_string(x)
-    # return ConnectionRoles[x] if x else np.nan
+# def _convert_role(x):
+#     """
+#     Workaround https://github.com/pandas-dev/pandas/pull/20826
+#     """
+#     log.log(mp.TRACE, "converting [%r] into role" % x)
+#     return ConnectionRoles.from_string(x)

def ignore(f1, f2):
    return 0
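
For context on the converter being disabled here: `_convert_role` existed to work around a pandas limitation when converting role strings at load time (the linked PR). The same conversion can be done after loading instead, which avoids `read_csv` converter quirks altogether. A minimal sketch of that post-load approach, assuming an enum along the lines of mptcpanalyzer's `ConnectionRoles` (the `tcpdest` data below is purely illustrative):

```python
from enum import Enum

import pandas as pd


class ConnectionRoles(Enum):
    # stand-in for mptcpanalyzer's real enum
    Client = 0
    Server = 1


# load the column as plain strings first...
df = pd.DataFrame({"tcpdest": ["Client", "Server", None]})

# ...then map names to enum members and declare the column categorical,
# sidestepping the read_csv converter entirely
df["tcpdest"] = pd.Categorical(
    df["tcpdest"].map(lambda x: ConnectionRoles[x] if pd.notna(x) else x),
    categories=list(ConnectionRoles),
)
print(df["tcpdest"].dtype)  # category
```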
@@ -97,14 +94,14 @@ def getrealpath(input_file):
    On top of Tshark fields, we also describe fields generated by mptcpanalyzer
    """
per_pcap_artificial_fields = {
-    # TODO use dtype_role as type
    "mptcpdest": Field("mptcpdest", dtype_role, "MPTCP destination", False, None),
    "tcpdest": Field("tcpdest", dtype_role, "TCP destination", False, None),
+    # TODO use int? as type
    "hash": Field("hash", str, "Hash of fields", False, None),

    # TODO rename ?
    # TODO should be a CategoryDataType !
-    "merge": Field("_merge", None, "How many packets were merged", False, None)
+    # "merge": Field("_merge", None, "How many packets were merged", False, None)
}

# merged_per_pcap_artificial_fields = {
@@ -149,8 +146,8 @@ def load_merged_streams_into_pandas(
    """
    Arguments:
        protocol: mptcp or tcp
-
        mapping_mode: Only HASH works for now
+        clock_offset: untested

    Returns
        a dataframe with columns... owd ?
@@ -247,12 +244,10 @@ def _gen_dtypes(fields) -> Dict[str, Any]:
    dtypes.update({_name(f.fullname): f.type for f in per_pcap_artificial_fields.values()})

    # these are overrides from the generated dtypes
-    dtypes.update({
-        # during the merge, we join even unmapped packets so some entries
-        # may be empty => float64
-        _first("packetid"): tshark_config.fields["packetid"].type,
-        _second("packetid"): tshark_config.fields["packetid"].type,
-    })
+    # dtypes.update({
+    #     _first("packetid"): tshark_config.fields["packetid"].type,
+    #     _second("packetid"): tshark_config.fields["packetid"].type,
+    # })

    return dtypes
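
The override removed above carried a useful reminder in its deleted comment: after the outer join, unmapped packets leave NaN holes, and pandas silently promotes integer columns such as `packetid` to float64. A toy illustration of that coercion, with pandas' nullable `Int64` extension dtype shown as one way to keep the columns integral (an aside, not something this commit does):

```python
import pandas as pd

left = pd.DataFrame({"hash": [1, 2], "packetid": [10, 11]})
right = pd.DataFrame({"hash": [2, 3], "packetid": [20, 21]})

# the outer join keeps unmapped packets, so missing cells become NaN and
# the int64 packetid columns decay to float64
merged = pd.merge(left, right, on="hash", how="outer", suffixes=("_h1", "_h2"))
print(merged.dtypes)  # packetid_h1, packetid_h2: float64

# the nullable integer dtype holds missing values without losing integer-ness
merged = merged.astype({"packetid_h1": "Int64", "packetid_h2": "Int64"})
print(merged.dtypes)  # packetid_h1, packetid_h2: Int64
```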
@@ -321,6 +316,9 @@ def _gen_converters() -> Dict[str, Callable]:
    # don't do it here else we might repeat it
    # data["abstime"] += clock_offset

+    debug_dataframe(res, "checking merge", usecols=["merge_status"])
+    print("%d nan values" % res.merge_status.isna().sum())
+
    # log.debug("Column names: %s", res.columns)
    # log.debug("Dtypes after load:%s\n" % dict(res.dtypes))
    # print(res["mptcpdest"].dtype)
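
A note on the nan count in the added debug lines: comparing a column against `np.nan` with `==` always yields False, because NaN compares unequal to everything including itself, so `isna()` is the reliable way to count missing merge statuses (hence the form used above):

```python
import numpy as np
import pandas as pd

s = pd.Series(["both", np.nan, "left_only"])
print((s == np.nan).sum())  # 0 -- NaN never equals anything, even itself
print(s.isna().sum())       # 1 -- counts the actual missing value
```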
@@ -545,14 +543,10 @@ def tcpdest_from_connections(df, con: TcpConnection) -> pd.DataFrame:

def convert_to_sender_receiver(
    df
-    # def tcp_compute_owd(
-    # already merged df
-    # con1: Tuple[pd.DataFrame, TcpConnection],
-    # con2: Tuple[pd.DataFrame, TcpConnection]
-    # tcp_sender_df,
-    # tcp_receiver_df
):
    """
+    Convert dataframe from X_HOST1 | X_HOST2 to X_SENDER | X_RECEIVER
+
    each packet has a destination marker
    Assume clocks are fine here!
    """
@@ -616,7 +610,9 @@ def _rename_column(col_name, suffixes) -> str:
        log.log(mp.TRACE, "renaming inplace")

        tdf.rename(columns=rename_func, inplace=True)
+        debug_dataframe(tdf, "temporary dataframe")
        total = pd.concat([total, tdf], ignore_index=True, sort=False)
+        print("total df size = %d" % len(total))

    # subdf[_first("tcpdest") == ConnectionRole.Client].rename(columns=_rename_cols, inplace=True)
    # print(subdf.columns)
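
An aside on the loop above: concatenating into `total` on every iteration re-copies all rows accumulated so far, so the overall cost grows quadratically with the number of pieces. The usual pandas idiom is to gather the pieces in a list and concatenate once; a toy sketch:

```python
import pandas as pd

# stand-ins for the per-direction temporary dataframes built in the loop
tdfs = [pd.DataFrame({"abstime": [1.0]}), pd.DataFrame({"abstime": [2.0]})]

chunks = []
for tdf in tdfs:
    chunks.append(tdf)  # O(1) per iteration instead of a full concat

total = pd.concat(chunks, ignore_index=True, sort=False)
print(len(total))  # 2
```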
@@ -645,6 +641,8 @@ def merge_tcp_dataframes_known_streams(
    2/ identify which dataframe is server's/client's
    2/

+    Adds a merge_status column
+
    Args:
        con1: Tuple dataframe/tcpstream id
        con2: same
@@ -707,7 +705,7 @@ def merge_tcp_dataframes_known_streams(
    log.info("Resulting merged tcp dataframe of size {} ({} mapped packets vs {} unmapped) "
             "with input dataframes of size {} and {}.".format(
        len(total),
-        len(total[total._merge == "both"]), len(total[total._merge != "both"]),
+        len(total[total.merge_status == "both"]), len(total[total.merge_status != "both"]),
        len(h1_df), len(h2_df)
    ))
@@ -895,21 +893,23 @@ def map_tcp_packets(
    mode="hash"
    # con1: TcpConnection, con2: TcpConnection
) -> pd.DataFrame:
+    '''
+    '''
    if mode == "hash":
        res = map_tcp_packets_via_hash(sender_df, receiver_df, explain)
    else:
        res = map_tcp_packets_score_based(sender_df, receiver_df, explain)

-    log.info("Merged packets. Resulting dataframe of size {} generated from {} and {}".format(
+    log.info("Merged dataframe of size {} generated from {} and {} sources.".format(
        len(res), len(sender_df), len(receiver_df)
    ))
    log.info("{} unmapped packets.".format(
-        len(res[res._merge == "left_only"]) + len(res[res._merge == "right_only"])
+        len(res[res.merge_status == "left_only"]) + len(res[res.merge_status == "right_only"])
    ))

    def _show_unmapped_pkts():
-        print(res[res._merge == "left_only"])
-        print(res[res._merge == "right_only"])
+        print(res[res.merge_status == "left_only"])
+        print(res[res.merge_status == "right_only"])

    _show_unmapped_pkts()
@@ -942,9 +942,9 @@ def map_tcp_packets_via_hash(
        # suffixes=(SENDER_SUFFIX, RECEIVER_SUFFIX),  # columns suffixes (sender/receiver)
        suffixes=(HOST1_SUFFIX, HOST2_SUFFIX),  # columns suffixes (sender/receiver)
        how="outer",  # we want to keep packets from both
-        # we want to know how many packets were not mapped correctly, adds the _merge column
+        # we want to know how many packets were not mapped correctly, adds the merge column
        # can take values "left_only"/"right_only" or both
-        indicator=True,
+        indicator="merge_status",
        # TODO reestablish
        validate="one_to_one",  # can slow process
    )
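
Renaming the indicator is plain pandas: `indicator=True` adds the status column under the default name `_merge`, while passing a string creates it under that name instead. A quick demonstration of the three values it can take:

```python
import pandas as pd

a = pd.DataFrame({"hash": ["x", "y"]})
b = pd.DataFrame({"hash": ["y", "z"]})

res = pd.merge(a, b, on="hash", how="outer", indicator="merge_status")
print(res)
#   hash merge_status
# 0    x    left_only
# 1    y         both
# 2    z   right_only
```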
@@ -954,7 +954,8 @@ def map_tcp_packets_via_hash(
    ## print(receiver_df[['hash', 'packetid']].head(20))

    log.debug("Just after hash")
-    log.debug(res.columns)
+    debug_dataframe(res, "Just after hash")
+    # log.debug(res.columns)
    # print(res[TCP_DEBUG_FIELDS].head(20))
    return res
@@ -1153,7 +1154,7 @@ def classify_reinjections(df_all: pd.DataFrame) -> pd.DataFrame:
    df_all["reinj_delta"] = np.nan

    # rename to df_both ?
-    df = df_all[df_all._merge == "both"]
+    df = df_all[df_all.merge_status == "both"]

    # print(df_all[pd.notnull(df_all[_sender("reinjection_of")])][
    #     _sender(["reinjection_of", "reinjected_in", "packetid", "reltime"]) +
@@ -1192,7 +1193,7 @@ def classify_reinjections(df_all: pd.DataFrame) -> pd.DataFrame:
        # if it was correctly mapped
        # TODO why reinjection._merge doesn't exist ?
-        if reinjection._1 != "both":
+        if reinjection.merge_status != "both":
            # TODO count missed classifications ?
            log.debug("reinjection %d could not be mapped, giving up..." % (reinjection.packetid))
            continue
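
The TODO above has a concrete answer, and it explains the whole `_merge` to `merge_status` rename: `DataFrame.itertuples()` builds namedtuples, whose field names may not start with an underscore, so a `_merge` column is silently mangled to a positional name such as `_1`. Renaming the indicator column to a valid identifier makes attribute access work again:

```python
import pandas as pd

df = pd.DataFrame({"_merge": ["both"], "merge_status": ["both"]})
row = next(df.itertuples())
print(row)               # Pandas(Index=0, _1='both', merge_status='both')
print(row.merge_status)  # 'both' -- a valid identifier survives untouched
```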
@@ -1203,7 +1204,7 @@ def classify_reinjections(df_all: pd.DataFrame) -> pd.DataFrame:

        original_packet = df_all.loc[df_all.packetid == initial_packetid].iloc[0]

-        if original_packet._merge != "both":
+        if original_packet.merge_status != "both":
            # TODO count missed classifications ?
            logging.debug("Original packet %d could not be mapped, giving up..." % (original_packet.packetid))
            continue