Skip to content

Commit f39e24c

Browse files
committed
[WIP]: Added cyberpandas support
* Updated example for change in import * Added dependency * refactored dataframe column check to assert dtypes Waiting on pandas-dev/pandas#20556
1 parent c854c57 commit f39e24c

File tree

8 files changed

+51
-24
lines changed

8 files changed

+51
-24
lines changed

conda/meta.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ requirements:
1919
- intake
2020
- python
2121
- pandas
22+
- cyberpandas
2223
- dask
2324
- pcapy
2425

examples/dump-live.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pcapy
44

5-
from intake_pcap import IPPacket
5+
from intake_pcap.packet import IPPacket
66

77

88
if __name__ == '__main__':

intake_pcap/stream.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from collections import namedtuple, OrderedDict
22

33
import pandas as pd
4+
from cyberpandas import to_ipaddress
45

56
import pcapy
67

@@ -22,9 +23,9 @@ def __init__(self, reader, protocol, payload):
2223
def dtype(self):
2324
items = [
2425
('time', 'datetime64[ns]'),
25-
('src_host', 'object'),
26+
('src_host', 'ip'),
2627
('src_port', 'u4'),
27-
('dst_host', 'object'),
28+
('dst_host', 'ip'),
2829
('dst_port', 'u4'),
2930
('protocol', 'str')]
3031

@@ -82,7 +83,15 @@ def decoder(header, data):
8283

8384
columns = FullPacket._fields if self._payload else BasePacket._fields
8485
df = pd.DataFrame(packets, columns=columns)
85-
return df.astype(dtype=self.dtype)
86+
87+
# DataFrame.astype doesn't work with extension types (yet).
88+
# https://github.com/pandas-dev/pandas/issues/20557
89+
known_types = {k: v for k, v in self.dtype.items()
90+
if k not in ('src_host', 'dst_host')}
91+
df = df.astype(known_types)
92+
df['src_host'] = to_ipaddress(df['src_host'])
93+
df['dst_host'] = to_ipaddress(df['dst_host'])
94+
return df
8695

8796

8897
class LiveStream(PacketStream):

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
intake
22
pandas
3+
cyberpandas
34
dask
45
libpcap
56
pcapy

tests/test_catalog.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from intake.catalog import Catalog
66

7-
from .utils import dataframe_has_required_columns
7+
from .utils import assert_dataframe_has_required_columns
88

99

1010
@pytest.fixture
@@ -20,7 +20,7 @@ def test_raw_http(catalog1):
2020
assert metadata['npartitions'] == 1
2121

2222
df = src.read()
23-
assert dataframe_has_required_columns(df, payload=True)
23+
assert_dataframe_has_required_columns(df, payload=True)
2424
assert len(df) == 43
2525

2626
src.close()
@@ -33,7 +33,7 @@ def test_tcp_http(catalog1):
3333
assert metadata['npartitions'] == 1
3434

3535
df = src.read()
36-
assert dataframe_has_required_columns(df, payload=False)
36+
assert_dataframe_has_required_columns(df, payload=False)
3737
assert len(df) == 41
3838

3939
src.close()

tests/test_offline.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,41 @@
1-
from .utils import dataframe_has_required_columns
1+
from .utils import assert_dataframe_has_required_columns
22

33

44
def test_offline_unfiltered(ping_stream):
55
df = ping_stream.to_dataframe()
6-
assert dataframe_has_required_columns(df, payload=False)
6+
assert_dataframe_has_required_columns(df, payload=False)
77
assert len(df) == 96
88

99

1010
def test_offline_filter_tcp(http_stream):
1111
http_stream.set_filter("tcp")
1212
df = http_stream.to_dataframe()
13-
assert dataframe_has_required_columns(df, payload=False)
13+
assert_dataframe_has_required_columns(df, payload=False)
1414
assert len(df) == 41
1515

1616

1717
def test_offline_filter_udp(http_stream):
1818
http_stream.set_filter("udp")
1919
df = http_stream.to_dataframe()
20-
assert dataframe_has_required_columns(df, payload=False)
20+
assert_dataframe_has_required_columns(df, payload=False)
2121
assert len(df) == 2
2222

2323

2424
def test_offline_filter_icmp(http_stream):
2525
http_stream.set_filter("icmp")
2626
df = http_stream.to_dataframe()
27-
assert dataframe_has_required_columns(df, payload=False)
27+
assert_dataframe_has_required_columns(df, payload=False)
2828
assert len(df) == 0
2929

3030

3131
def test_offline_limit(http_stream):
3232
df = http_stream.to_dataframe(n=10)
33-
assert dataframe_has_required_columns(df, payload=False)
33+
assert_dataframe_has_required_columns(df, payload=False)
3434
assert len(df) == 10
3535

3636

3737
def test_offline_filter_vlan(vlan_stream):
3838
vlan_stream.set_filter("tcp")
3939
df = vlan_stream.to_dataframe()
40-
assert dataframe_has_required_columns(df, payload=False)
40+
assert_dataframe_has_required_columns(df, payload=False)
4141
assert len(df) == 18

tests/test_source.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from .utils import dataframe_has_required_columns
1+
from .utils import assert_dataframe_has_required_columns
22

33

44
def test_unfiltered_source(ping_source):
55
metadata = ping_source.discover()
66
assert metadata['npartitions'] == 1
77

88
df = ping_source.read()
9-
assert dataframe_has_required_columns(df, payload=False)
9+
assert_dataframe_has_required_columns(df, payload=False)
1010
assert len(df) == 96
1111

1212
ping_source.close()
@@ -17,7 +17,7 @@ def test_filtered_source(tcp_http_source):
1717
assert metadata['npartitions'] == 1
1818

1919
df = tcp_http_source.read()
20-
assert dataframe_has_required_columns(df, payload=False)
20+
assert_dataframe_has_required_columns(df, payload=False)
2121
assert len(df) == 41
2222

2323
tcp_http_source.close()
@@ -28,7 +28,7 @@ def test_multiple_source(multiple_source):
2828
assert metadata['npartitions'] == 3
2929

3030
df = multiple_source.read()
31-
assert dataframe_has_required_columns(df, payload=False)
31+
assert_dataframe_has_required_columns(df, payload=False)
3232
assert len(df) == 157
3333

3434
multiple_source.close()
@@ -39,11 +39,11 @@ def test_repeated_reads(ping_source):
3939
assert metadata['npartitions'] == 1
4040

4141
df = ping_source.read()
42-
assert dataframe_has_required_columns(df, payload=False)
42+
assert_dataframe_has_required_columns(df, payload=False)
4343
assert len(df) == 96
4444

4545
df = ping_source.read()
46-
assert dataframe_has_required_columns(df, payload=False)
46+
assert_dataframe_has_required_columns(df, payload=False)
4747
assert len(df) == 96
4848

4949
ping_source.close()

tests/utils.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
1-
def dataframe_has_required_columns(df, payload):
2-
columns = ['time', 'src_host', 'src_port', 'dst_host', 'dst_port', 'protocol']
1+
import pandas as pd
2+
import pandas.util.testing as tm
3+
4+
5+
def assert_dataframe_has_required_columns(df, payload):
6+
items = [
7+
('time', 'datetime64[ns]'),
8+
('src_host', 'ip'),
9+
('src_port', 'u4'),
10+
('dst_host', 'ip'),
11+
('dst_port', 'u4'),
12+
('protocol', object)
13+
]
14+
315
if payload:
4-
columns.append("payload")
5-
return set(df.columns) == set(columns)
16+
items.append(("payload", 'object'))
17+
18+
names, types = zip(*items)
19+
20+
expected = pd.Series(types, index=names)
21+
tm.assert_series_equal(df.dtypes, expected)

0 commit comments

Comments
 (0)