Commit 7e0ba04

Hacked a fix to work around a pandas limitation

The limitation involves to_csv/read_csv and pd.api.types.CategoricalDtype: with pd.api.types.CategoricalDtype(categories=ConnectionRoles, ordered=True), for instance, to_csv writes the column correctly but read_csv then fails because it expects a string (pandas-dev/pandas#20826). I ended up writing my own converter.
1 parent a3ae624 commit 7e0ba04
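To illustrate the limitation described above, here is a minimal, self-contained sketch of the CSV round trip and of the converter-based workaround this commit adopts. The ConnectionRoles enum below is only a stand-in for mptcpanalyzer's own enum; the pandas calls are the part meant to mirror the diff.

import enum
import io

import numpy as np
import pandas as pd


class ConnectionRoles(enum.Enum):
    # stand-in for mptcpanalyzer's ConnectionRoles
    Client = 0
    Server = 1


# Workaround: build the categorical over the member *names* (plain strings)
# rather than over the enum members themselves...
dtype_role = pd.api.types.CategoricalDtype(
    categories=[x.name for x in ConnectionRoles], ordered=True)

df = pd.DataFrame({"mptcpdest": [role.name for role in ConnectionRoles]})
df["mptcpdest"] = df["mptcpdest"].astype(dtype_role)

buf = io.StringIO()
df.to_csv(buf, index=False)   # writes the names, e.g. "Client"
buf.seek(0)

# ...and map the names back onto the enum with a converter on read,
# instead of handing the categorical dtype straight to read_csv.
restored = pd.read_csv(
    buf,
    converters={"mptcpdest": lambda x: ConnectionRoles[x] if x else np.nan})
print(restored["mptcpdest"].tolist())   # [ConnectionRoles.Client, ConnectionRoles.Server]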

2 files changed: +62, -43 lines


mptcpanalyzer/cli.py (+27, -23)
@@ -580,9 +580,17 @@ def do_qualify_reinjections(self, line):
# keep only those that matched both for now

df_all["redundant"] = False
+
df = df_all[ df_all._merge == "both" ]
+print("MATT %d df packets" % len(df))
+

-df.to_excel("temp.xls")
+print(df_all[ pd.notnull(df_all[_sender("reinjection_of")])] [
+_sender(["reinjection_of", "reinjected_in", "packetid", "reltime"]) +
+_receiver(["packetid", "reltime"])
+])
+# to help debug
+# df.to_excel("temp.xls")

def _print_reinjection_comparison(original_packet, reinj):
"""
@@ -607,18 +615,9 @@ def _print_reinjection_comparison(original_packet, reinj):
if getattr(row, _receiver("abstime")) > original_packet[ _receiver("abstime") ]:
print("BUG: this is not a valid reinjection after all ?")

-print("debugging ")
+# print("debugging ")
print("dataframe size = %d" % len(df))

-# print(df.columns)
-# print(df[['owd']].head())
-# print("MERGED_DF", merged_df[TCP_DEBUG_FIELDS].head(20))
-# print(df[mpdata.MPTCP_DEBUG_FIELDS].head(20))
-
-# TODO for debug
-# todo we need to add
-# res['mptcpdest'] = dest.name
-
# TODO keep only the ones with "merge_" : "both" ?

# reinjections = df[['tcpstream', "reinjection_of"]].dropna(axis=0, )
@@ -638,12 +637,16 @@ def _print_reinjection_comparison(original_packet, reinj):
for destination in ConnectionRoles:
self.poutput("looking for reinjections towards mptcp %s" % destination)

+print(df["mptcpdest"])
sender_df = df[df.mptcpdest == destination]

# print(sender_df[ sender_df.reinjected_in.notna() ][["packetid", "reinjected_in"]])
# print("successful reinjections" % len(reinjected_in))

# select only packets that have been reinjected
+
+print("%d sender_df packets" % len(sender_df))
+print(sender_df["reinjection_of"])
reinjected_packets = sender_df.dropna(axis='index', subset=[ _sender("reinjection_of") ])

print("%d reinjected packets" % len(reinjected_packets))
@@ -653,24 +656,24 @@ def _print_reinjection_comparison(original_packet, reinj):
print(reinjected_packets[["packetid", "packetid_receiver", *_receiver(["reinjected_in", "reinjection_of"])]].head())


-for row in reinjected_packets.itertuples():
+for reinjection in reinjected_packets.itertuples():
# here we look at all the reinjected packets

-# print("full row %r" % (row,))
+# print("full reinjection %r" % (reinjection,))

# if there are packets in _receiver(reinjected_in), it means the reinjections
# arrived before other similar segments and thus these segments are useless
# it should work because
-# useless_reinjections = getattr(row, _receiver("reinjected_in"), [])
+# useless_reinjections = getattr(reinjection, _receiver("reinjected_in"), [])

# if it was correctly mapped
-# row._merge doesn't exist ?
-if row._1 != "both":
+# reinjection._merge doesn't exist ?
+if reinjection._1 != "both":
# TODO count missed classifications ?
-log.debug("reinjection %d could not be mapped, giving up..." % (row.packetid))
+log.debug("reinjection %d could not be mapped, giving up..." % (reinjection.packetid))
continue

-initial_packetid = row.reinjection_of[0]
+initial_packetid = reinjection.reinjection_of[0]
# print("initial_packetid = %r %s" % (initial_packetid, type(initial_packetid)))

#
@@ -683,7 +686,7 @@ def _print_reinjection_comparison(original_packet, reinj):


orig_arrival = getattr(original_packet, _receiver("reltime"))
-reinj_arrival = getattr(row, _receiver("reltime"))
+reinj_arrival = getattr(reinjection, _receiver("reltime"))


# print("useless_reinjections listing %r" % (useless_reinjections,))
@@ -694,7 +697,8 @@ def _print_reinjection_comparison(original_packet, reinj):

if orig_arrival < reinj_arrival:
print("GOT A MATCH")
-sender_df.loc[ sender_df[ _sender("packetid")] == row.packetid, "redundant"] = True
+sender_df.loc[ sender_df[ _sender("packetid")] == reinjection.packetid, "redundant"] = True
+print("is this where it's wrong ?")


print("results: ", df[ df.redundant == True] )
@@ -718,15 +722,15 @@ def _print_reinjection_comparison(original_packet, reinj):
# df[ df.redundant == False] && df["reinjected_in" + RECEIVER_SUFFIX])

for row in successful_reinjections.itertuples(index=False):
-print("full row %r" % (row,))
+# print("full row %r" % (row,))

# loc ? this is an array, sort it and take the first one ?
# initial_packetid = getattr(row, _sender("reinjection_of")),
initial_packetid = row.reinjection_of[0]
-print("initial_packetid = %r %s" % (initial_packetid, type(initial_packetid)))
+# print("initial_packetid = %r %s" % (initial_packetid, type(initial_packetid)))

original_packet = df_all.loc[ df_all.packetid == initial_packetid ].iloc[0]
-print("original packet = %r %s" % (original_packet, type(original_packet)))
+# print("original packet = %r %s" % (original_packet, type(original_packet)))

_print_reinjection_comparison(original_packet, row)
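The loop above marks a reinjection as redundant whenever the original transmission reached the receiver first (orig_arrival < reinj_arrival). A condensed sketch of that itertuples-plus-.loc pattern, with invented column names standing in for the suffixed sender/receiver ones:

import pandas as pd

# toy frame: one row per reinjected packet, with arrival times of the
# original transmission and of the reinjection at the receiver
df = pd.DataFrame({
    "packetid": [1, 2, 3],
    "orig_arrival": [0.10, 0.25, 0.40],
    "reinj_arrival": [0.15, 0.20, 0.55],
    "redundant": False,
})

for row in df.itertuples():
    # the reinjection is useless if the original data arrived first
    if row.orig_arrival < row.reinj_arrival:
        df.loc[df.packetid == row.packetid, "redundant"] = True

print(df[df.redundant == True])   # packets 1 and 3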

mptcpanalyzer/data.py (+35, -20)
@@ -19,9 +19,8 @@

pp = pprint.PrettyPrinter(indent=4)

-# ['b', 'a']
-dtype_role = pd.api.types.CategoricalDtype(categories=ConnectionRoles, ordered=True)
-# df1['mptcpdest'] = pd.Categorical(np.nan, ordered=False, categories=ConnectionRoles) ;
+# dtype_role = pd.api.types.CategoricalDtype(categories=ConnectionRoles, ordered=True)
+dtype_role = pd.api.types.CategoricalDtype(categories=[ x.name for x in ConnectionRoles], ordered=True)


# columns we usually display to debug dataframes
@@ -64,6 +63,7 @@ def _convert_flags(x):

def _convert_to_list(x, field="pass a field to debug"):
"""
+Loads x of the form "1,2,5" or None
for instance functools.partial(_convert_to_list, field="reinjectionOf"),
returns np.nan instead of [] to allow for faster filtering
"""
@@ -235,10 +235,11 @@ def load_merged_streams_into_pandas(
# columns=columns,
index=False,
header=True,
-
sep=tshark_config.delimiter,
)

+print("MATT=", dict(merged_df.dtypes))
+
# print("MERGED_DF", merged_df[TCP_DEBUG_FIELDS].head(20))


@@ -247,8 +248,6 @@ def load_merged_streams_into_pandas(
csv_fields = tshark_config.get_fields("name", "type")
# dtypes = {k: v for k, v in temp.items() if v is not None or k not in ["tcpflags"]}
def _gen_dtypes(fields):
-
-
dtypes = {} # type: ignore
for suffix in [ SENDER_SUFFIX, RECEIVER_SUFFIX]:

@@ -257,24 +256,40 @@ def _gen_dtypes(fields):
dtypes.setdefault(suffix_fields(suffix, k), v)

dtypes.update({
+# during the merge, we join even unmapped packets so some entries
+# may be empty => float64
+_sender("packetid"): np.float64,
+_receiver("packetid"): np.float64,
+# there is a bug currently
+# https://github.com/pandas-dev/pandas/pull/20826
'mptcpdest': dtype_role,
'tcpdest': dtype_role,
+# '_merge':
})
return dtypes

+def _load_list(x, field="set field to debug"):
+"""
+Contrary to _convert_to_list
+"""
+res = ast.literal_eval(x) if (x is not None and x != '') else np.nan
+return res
+
with open(cachename) as fd:
import ast
dtypes = _gen_dtypes(csv_fields)
-pd.set_option('display.max_rows', 200)
-pd.set_option('display.max_colwidth', -1)
+
+# more recent versions can do without it
+# pd.set_option('display.max_rows', 200)
+# pd.set_option('display.max_colwidth', -1)
print("dtypes=", dict(dtypes))
merged_df = pd.read_csv(
fd,
-# skip_blank_lines=True,
+skip_blank_lines=True,
# hum not needed with comment='#'
comment='#',
# we don't need 'header' when metadata is with comment
-header=0, # read column names from row 2 (before, it's metadata)
+# header=0, # read column names from row 2 (before, it's metadata)
# skiprows
sep=tshark_config.delimiter,
# converters={
@@ -288,10 +303,15 @@ def _gen_dtypes(fields):
converters={
_sender("tcpflags"): _convert_flags,
# reinjections, converts to list of integers
-_sender("reinjection_of"): ast.literal_eval,
-_sender("reinjected_in"): ast.literal_eval,
-_receiver("reinjection_of"): ast.literal_eval,
-_receiver("reinjected_in"): ast.literal_eval,
+_sender("reinjection_of"): functools.partial(_load_list, field="reinjectedOfSender"),
+_sender("reinjected_in"): functools.partial(_load_list, field="reinjectedInSender"),
+_receiver("reinjection_of"): functools.partial(_load_list, field="reinjectedInReceiver"),
+_receiver("reinjected_in"): functools.partial(_load_list, field="reinjectedInReceiver"),
+
+# there is a bug in pandas see https://github.com/pandas-dev/pandas/pull/20826
+# where the
+"mptcpdest": lambda x: ConnectionRoles[x] if x else np.nan,
+
# "mptcp.reinjection_of": functools.partial(_convert_to_list, field="reinjectionOf"),
# "mptcp.reinjection_listing": functools.partial(_convert_to_list, field="reinjectedIn"),
# "mptcp.reinjected_in": functools.partial(_convert_to_list, field="reinjectedIn"),
@@ -374,8 +394,6 @@ def load_into_pandas(
try:
with open(csv_filename) as fd:

-
-# TODO use packetid as Index
data = pd.read_csv(
fd,
comment='#',
@@ -388,14 +406,13 @@ def load_into_pandas(
"tcp.flags": _convert_flags,
# reinjections, converts to list of integers
"mptcp.reinjection_of": functools.partial(_convert_to_list, field="reinjectionOf"),
-# "mptcp.reinjection_listing": functools.partial(_convert_to_list, field="reinjectedIn"),
"mptcp.reinjected_in": functools.partial(_convert_to_list, field="reinjectedIn"),
-# "mptcp.duplicated_dsn": lambda x: list(map(int, x.split(','))) if x is not None else np.nan,
},
# nrows=10, # useful for debugging purpose
)
data.rename(inplace=True, columns=config.get_fields("fullname", "name"))
# we want packetid column to survive merges/dataframe transformation so keepit as a column
+# TODO remove ? let other functions do it ?
data.set_index("packetid", drop=False, inplace=True)
log.debug("Column names: %s", data.columns)

@@ -887,8 +904,6 @@ def map_tcp_packets_via_hash(
res = pd.merge(
sender_df, receiver_df,
on="hash",
-# right_index=True,
-# TODO en fait suffit d'inverser les suffixes, h1, h2
suffixes=(SENDER_SUFFIX, RECEIVER_SUFFIX), # columns suffixes (sender/receiver)
how="outer", # we want to keep packets from both
# we want to know how many packets were not mapped correctly, adds the _merge column
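For reference, a hedged sketch of the converter-based loading that data.py now leans on: list-valued columns are rebuilt with ast.literal_eval (as the new _load_list helper does) and the role column is mapped back onto the enum by name. The CSV content, column names and enum below are illustrative stand-ins, not mptcpanalyzer's actual capture format.

import ast
import enum
import functools
import io

import numpy as np
import pandas as pd


class ConnectionRoles(enum.Enum):
    # stand-in for mptcpanalyzer's ConnectionRoles
    Client = 0
    Server = 1


def _load_list(x, field="set field to debug"):
    # same idea as the helper added above: parse "[3, 4]" back into a list,
    # return NaN for empty cells so dropna()-style filtering stays cheap
    return ast.literal_eval(x) if (x is not None and x != '') else np.nan


csv_text = "packetid,reinjection_of,mptcpdest\n1,\"[3, 4]\",Client\n2,,Server\n"

df = pd.read_csv(
    io.StringIO(csv_text),
    converters={
        "reinjection_of": functools.partial(_load_list, field="reinjection_of"),
        "mptcpdest": lambda x: ConnectionRoles[x] if x else np.nan,
    },
)
print(df.loc[0, "reinjection_of"])   # [3, 4]
print(df.loc[1, "mptcpdest"])        # ConnectionRoles.Server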
