Skip to content

Commit 8b4554c

Browse files
committed
Add a flag to optionally include or exclude the cudf ID column from CSV output, due to a known issue in cudf & pandas (rapidsai/cudf#11317 & pandas-dev/pandas#37600) this option has no effect on JSON output
1 parent 81d4fd8 commit 8b4554c

File tree

8 files changed

+86
-33
lines changed

8 files changed

+86
-33
lines changed

morpheus/_lib/include/morpheus/io/serializers.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525

2626
namespace morpheus {
2727

28-
std::string df_to_csv(const TableInfo& tbl, bool include_header);
28+
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col = true);
2929

30-
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header);
30+
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col = true);
3131

32-
std::string df_to_json(const TableInfo& tbl);
32+
// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in
33+
// Pandas: https://github.com/pandas-dev/pandas/issues/37600
34+
std::string df_to_json(const TableInfo& tbl, bool include_index_col = true);
3335

34-
void df_to_json(const TableInfo& tbl, std::ostream& out_stream);
36+
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true);
3537

3638
} // namespace morpheus

morpheus/_lib/include/morpheus/stages/write_to_file.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class WriteToFileStage : public srf::pysrf::PythonNode<std::shared_ptr<MessageMe
4949
*/
5050
WriteToFileStage(const std::string &filename,
5151
std::ios::openmode mode = std::ios::out,
52-
FileTypes file_type = FileTypes::Auto);
52+
FileTypes file_type = FileTypes::Auto,
53+
bool include_index_col = true);
5354

5455
private:
5556
/**
@@ -64,6 +65,7 @@ class WriteToFileStage : public srf::pysrf::PythonNode<std::shared_ptr<MessageMe
6465
subscribe_fn_t build_operator();
6566

6667
bool m_is_first;
68+
bool m_include_index_col;
6769
std::ofstream m_fstream;
6870
std::function<void(sink_type_t &)> m_write_func;
6971
};
@@ -81,7 +83,8 @@ struct WriteToFileStageInterfaceProxy
8183
const std::string &name,
8284
const std::string &filename,
8385
const std::string &mode = "w",
84-
FileTypes file_type = FileTypes::Auto);
86+
FileTypes file_type = FileTypes::Auto,
87+
bool include_index_col = true);
8588
};
8689

8790
#pragma GCC visibility pop

morpheus/_lib/src/io/serializers.cpp

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@
1515
* limitations under the License.
1616
*/
1717

18+
#include <cudf/table/table_view.hpp>
19+
#include <cudf/types.hpp>
20+
#include <memory>
1821
#include <morpheus/io/serializers.hpp>
1922

23+
#include <bits/c++config.h>
2024
#include <pybind11/gil.h>
2125
#include <pybind11/pybind11.h>
2226
#include <pybind11/pytypes.h>
2327
#include <cudf/io/csv.hpp>
2428
#include <cudf/io/data_sink.hpp>
2529
#include <rmm/mr/device/per_device_resource.hpp>
2630

31+
#include <numeric>
2732
#include <ostream>
2833
#include <sstream>
2934

@@ -75,38 +80,48 @@ class OStreamSink : public cudf::io::data_sink
7580
size_t m_bytest_written{0};
7681
};
7782

78-
std::string df_to_csv(const TableInfo& tbl, bool include_header)
83+
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
7984
{
8085
// Create an ostringstream and use that with the overload accepting an ostream
8186
std::ostringstream out_stream;
8287

83-
df_to_csv(tbl, out_stream, include_header);
88+
df_to_csv(tbl, out_stream, include_header, include_index_col);
8489

8590
return out_stream.str();
8691
}
8792

88-
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header)
93+
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col)
8994
{
95+
auto column_names = tbl.get_column_names();
96+
cudf::size_type start_col = 1;
97+
if (include_index_col)
98+
{
99+
start_col = 0;
100+
column_names.insert(column_names.begin(), ""s); // insert the id column
101+
}
102+
103+
std::vector<cudf::size_type> col_idexes(column_names.size());
104+
std::iota(col_idexes.begin(), col_idexes.end(), start_col);
105+
auto tbl_view = tbl.get_view().select(col_idexes);
106+
90107
OStreamSink sink(out_stream);
91108
auto destination = cudf::io::sink_info(&sink);
92-
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl.get_view())
109+
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
93110
.include_header(include_header)
94111
.true_value("True"s)
95112
.false_value("False"s);
96113

97114
cudf::io::table_metadata metadata{};
98115
if (include_header)
99116
{
100-
auto column_names = tbl.get_column_names();
101-
column_names.insert(column_names.begin(), ""s); // insert the id column
102117
metadata.column_names = column_names;
103118
options_builder = options_builder.metadata(&metadata);
104119
}
105120

106121
cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource());
107122
}
108123

109-
std::string df_to_json(const TableInfo& tbl)
124+
std::string df_to_json(const TableInfo& tbl, bool include_index_col)
110125
{
111126
std::string results;
112127
// no cpp impl for to_json, instead python module converts to pandas and calls to_json
@@ -116,7 +131,7 @@ std::string df_to_json(const TableInfo& tbl)
116131

117132
auto df = tbl.as_py_object();
118133
auto buffer = StringIO();
119-
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true);
134+
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col);
120135
df.attr("to_json")(buffer, **kwargs);
121136
buffer.attr("seek")(0);
122137

@@ -127,11 +142,11 @@ std::string df_to_json(const TableInfo& tbl)
127142
return results;
128143
}
129144

130-
void df_to_json(const TableInfo& tbl, std::ostream& out_stream)
145+
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col)
131146
{
132147
// Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++ implementation
133148
// of to_json
134-
std::string output = df_to_json(tbl);
149+
std::string output = df_to_json(tbl, include_index_col);
135150

136151
// Now write the contents to the stream
137152
out_stream.write(output.data(), output.size());

morpheus/_lib/src/python_modules/stages.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ PYBIND11_MODULE(stages, m)
167167
py::arg("builder"),
168168
py::arg("name"),
169169
py::arg("filename"),
170-
py::arg("mode") = "w",
171-
py::arg("file_type") = 0); // Setting this to FileTypes::AUTO throws a conversion error at runtime
170+
py::arg("mode") = "w",
171+
py::arg("file_type") = 0, // Setting this to FileTypes::AUTO throws a conversion error at runtime
172+
py::arg("include_index_col") = true);
172173

173174
#ifdef VERSION_INFO
174175
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);

morpheus/_lib/src/stages/write_to_file.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
namespace morpheus {
2828
// Component public implementations
2929
// ************ WriteToFileStage **************************** //
30-
WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmode mode, FileTypes file_type) :
30+
WriteToFileStage::WriteToFileStage(const std::string &filename,
31+
std::ios::openmode mode,
32+
FileTypes file_type,
33+
bool include_index_col) :
3134
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
32-
m_is_first(true)
35+
m_is_first(true),
36+
m_include_index_col(include_index_col)
3337
{
3438
if (file_type == FileTypes::Auto)
3539
{
@@ -59,13 +63,13 @@ WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmo
5963
void WriteToFileStage::write_json(WriteToFileStage::sink_type_t &msg)
6064
{
6165
// Call df_to_json passing our fstream
62-
df_to_json(msg->get_info(), m_fstream);
66+
df_to_json(msg->get_info(), m_fstream, m_include_index_col);
6367
}
6468

6569
void WriteToFileStage::write_csv(WriteToFileStage::sink_type_t &msg)
6670
{
6771
// Call df_to_csv passing our fstream
68-
df_to_csv(msg->get_info(), m_fstream, m_is_first);
72+
df_to_csv(msg->get_info(), m_fstream, m_is_first, m_include_index_col);
6973
}
7074

7175
void WriteToFileStage::close()
@@ -102,7 +106,8 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
102106
const std::string &name,
103107
const std::string &filename,
104108
const std::string &mode,
105-
FileTypes file_type)
109+
FileTypes file_type,
110+
bool include_index_col)
106111
{
107112
std::ios::openmode fsmode = std::ios::out;
108113

@@ -138,7 +143,7 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
138143
throw std::runtime_error(std::string("Unsupported file mode. Must choose either 'w' or 'a'. Mode: ") + mode);
139144
}
140145

141-
auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type);
146+
auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type, include_index_col);
142147

143148
return stage;
144149
}

morpheus/cli.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,12 @@ def validate(ctx: click.Context, **kwargs):
13211321
@click.command(short_help="Write all messages to a file", **command_kwargs)
13221322
@click.option('--filename', type=click.Path(writable=True), required=True, help="The file to write to")
13231323
@click.option('--overwrite', is_flag=True, help="Whether or not to overwrite the target file")
1324+
@click.option('--include-index-col',
1325+
'include_index_col',
1326+
default=True,
1327+
type=bool,
1328+
help=("Includes dataframe's index column in the output "
1329+
"Note: this currently only works for CSV file output"))
13241330
@prepare_command()
13251331
def to_file(ctx: click.Context, **kwargs):
13261332

morpheus/io/serializers.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import cudf
2020

2121

22-
def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> typing.List[str]:
22+
def df_to_csv(df: cudf.DataFrame,
23+
include_header=False,
24+
strip_newline=False,
25+
include_index_col=True) -> typing.List[str]:
2326
"""
2427
Serializes a DataFrame into CSV and returns the serialized output seperated by lines.
2528
@@ -31,13 +34,15 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
3134
Whether or not to include the header, by default False.
3235
strip_newline : bool, optional
3336
Whether or not to strip the newline characters from each string, by default False.
37+
include_index_col: bool, optional
38+
Write out the index as a column, by default True.
3439
3540
Returns
3641
-------
3742
typing.List[str]
3843
List of strings for each line
3944
"""
40-
results = df.to_csv(header=include_header)
45+
results = df.to_csv(header=include_header, index=include_index_col)
4146
if strip_newline:
4247
results = results.split("\n")
4348
else:
@@ -46,7 +51,7 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
4651
return results
4752

4853

49-
def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
54+
def df_to_json(df: cudf.DataFrame, strip_newlines=False, include_index_col=True) -> typing.List[str]:
5055
"""
5156
Serializes a DataFrame into JSON and returns the serialized output seperated by lines.
5257
@@ -56,7 +61,10 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
5661
Input DataFrame to serialize.
5762
strip_newline : bool, optional
5863
Whether or not to strip the newline characters from each string, by default False.
59-
64+
include_index_col: bool, optional
65+
Write out the index as a column, by default True.
66+
Note: This value is currently being ignored due to a known issue in Pandas:
67+
https://github.com/pandas-dev/pandas/issues/37600
6068
Returns
6169
-------
6270
typing.List[str]
@@ -65,7 +73,7 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
6573
str_buf = StringIO()
6674

6775
# Convert to list of json string objects
68-
df.to_json(str_buf, orient="records", lines=True)
76+
df.to_json(str_buf, orient="records", lines=True, index=include_index_col)
6977

7078
# Start from beginning
7179
str_buf.seek(0)

morpheus/stages/output/write_to_file_stage.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ class WriteToFileStage(SinglePortStage):
4949
5050
"""
5151

52-
def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTypes = FileTypes.Auto):
52+
def __init__(self,
53+
c: Config,
54+
filename: str,
55+
overwrite: bool,
56+
file_type: FileTypes = FileTypes.Auto,
57+
include_index_col: bool = True):
5358

5459
super().__init__(c)
5560

@@ -69,6 +74,7 @@ def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTyp
6974
self._file_type = determine_file_type(self._output_file)
7075

7176
self._is_first = True
77+
self._include_index_col = include_index_col
7278

7379
@property
7480
def name(self) -> str:
@@ -91,9 +97,11 @@ def supports_cpp_node(self):
9197

9298
def _convert_to_strings(self, df: typing.Union[pd.DataFrame, cudf.DataFrame]):
9399
if (self._file_type == FileTypes.JSON):
94-
output_strs = serializers.df_to_json(df)
100+
output_strs = serializers.df_to_json(df, include_index_col=self._include_index_col)
95101
elif (self._file_type == FileTypes.CSV):
96-
output_strs = serializers.df_to_csv(df, include_header=self._is_first)
102+
output_strs = serializers.df_to_csv(df,
103+
include_header=self._is_first,
104+
include_index_col=self._include_index_col)
97105
self._is_first = False
98106
else:
99107
raise NotImplementedError("Unknown file type: {}".format(self._file_type))
@@ -110,7 +118,12 @@ def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> Strea
110118

111119
# Sink to file
112120
if (self._build_cpp_node()):
113-
to_file = _stages.WriteToFileStage(builder, self.unique_name, self._output_file, "w", self._file_type)
121+
to_file = _stages.WriteToFileStage(builder,
122+
self.unique_name,
123+
self._output_file,
124+
"w",
125+
self._file_type,
126+
self._include_index_col)
114127
else:
115128

116129
def node_fn(obs: srf.Observable, sub: srf.Subscriber):

0 commit comments

Comments
 (0)