forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_odfreader.py
230 lines (184 loc) · 7.49 KB
/
_odfreader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
from typing import List, Optional, Sequence, cast
import numpy as np
from pandas._typing import FilePathOrBuffer, Scalar, StorageOptions, Union
from pandas.compat._optional import import_optional_dependency
import pandas as pd
from pandas.io.excel._base import _BaseExcelReader
class _ODFReader(_BaseExcelReader):
    """
    Read tables out of OpenDocument formatted files.

    Parameters
    ----------
    filepath_or_buffer : string, path to be parsed or
        an open readable stream.
    storage_options : dict, optional
        passed to fsspec for appropriate URLs (see ``get_filepath_or_buffer``)
    """

    def __init__(
        self,
        filepath_or_buffer: FilePathOrBuffer,
        storage_options: StorageOptions = None,
    ):
        # Check for the optional ``odf`` package before the base class runs.
        import_optional_dependency("odf")
        super().__init__(filepath_or_buffer, storage_options=storage_options)

    @property
    def _workbook_class(self):
        """Return the odfpy ``OpenDocument`` class (imported lazily)."""
        from odf.opendocument import OpenDocument

        return OpenDocument

    def load_workbook(self, filepath_or_buffer: FilePathOrBuffer):
        """Load ``filepath_or_buffer`` with ``odf.opendocument.load``."""
        from odf.opendocument import load

        return load(filepath_or_buffer)

    @property
    def empty_value(self) -> str:
        """Property for compat with other readers."""
        return ""

    @property
    def sheet_names(self) -> List[str]:
        """Return a list of sheet names present in the document"""
        from odf.table import Table

        tables = self.book.getElementsByType(Table)
        return [t.getAttribute("name") for t in tables]

    def get_sheet_by_index(self, index: int):
        """Return the table at position ``index`` among the document's tables."""
        from odf.table import Table

        tables = self.book.getElementsByType(Table)
        return tables[index]

    def get_sheet_by_name(self, name: str):
        """
        Return the first table whose ``name`` attribute equals ``name``.

        Raises
        ------
        ValueError
            If no table in the document carries that name.
        """
        from odf.table import Table

        tables = self.book.getElementsByType(Table)

        for table in tables:
            if table.getAttribute("name") == name:
                return table

        raise ValueError(f"sheet {name} not found")

    def get_sheet_data(
        self,
        sheet,
        convert_float: bool,
        header: Optional[Union[int, Sequence[int]]],
        skiprows: Optional[Union[int, Sequence[int]]],
        nrows: Optional[int],
    ) -> List[List[Scalar]]:
        """
        Parse an ODF Table into a list of lists

        Parameters
        ----------
        sheet
            Table element whose ``TableRow`` elements are read.
        convert_float : bool
            Forwarded to ``_get_cell_value``; integral floats become ints.
        header, skiprows, nrows
            Forwarded to ``self.should_skip_row``. When all three are plain
            ints, only the first ``header + skiprows + nrows + 1`` row
            elements are examined.

        Returns
        -------
        list of list
            One list per output row, each padded with ``empty_value`` to the
            length of the longest row. Every row is an independent list.
        """
        from odf.table import CoveredTableCell, TableCell, TableRow

        covered_cell_name = CoveredTableCell().qname
        table_cell_name = TableCell().qname
        cell_names = {covered_cell_name, table_cell_name}

        sheet_rows = sheet.getElementsByType(TableRow)
        empty_rows = 0
        max_row_len = 0

        table: List[List[Scalar]] = []

        if nrows is not None and isinstance(header, int) and isinstance(skiprows, int):
            sheet_rows = sheet_rows[: header + skiprows + nrows + 1]

        for i, sheet_row in enumerate(sheet_rows):
            if self.should_skip_row(i, header, skiprows, nrows):
                # Placeholder keeps row positions; it is padded further down.
                table.append([])
                continue

            # Children without a ``qname`` (e.g. stray text nodes) are not
            # cells; getattr avoids an AttributeError on them.
            sheet_cells = [
                x
                for x in sheet_row.childNodes
                if getattr(x, "qname", None) in cell_names
            ]
            empty_cells = 0
            table_row: List[Scalar] = []

            for sheet_cell in sheet_cells:
                if sheet_cell.qname == table_cell_name:
                    value = self._get_cell_value(sheet_cell, convert_float)
                else:
                    value = self.empty_value

                column_repeat = self._get_column_repeat(sheet_cell)

                # Queue up empty values, writing only if content succeeds them
                if value == self.empty_value:
                    empty_cells += column_repeat
                else:
                    table_row.extend([self.empty_value] * empty_cells)
                    empty_cells = 0
                    table_row.extend([value] * column_repeat)

            if max_row_len < len(table_row):
                max_row_len = len(table_row)

            row_repeat = self._get_row_repeat(sheet_row)
            # A row only counts as blank when it produced no values as well:
            # a cell can carry its value purely in attributes and have no
            # child nodes, and such a row must not be discarded.
            if not table_row and self._is_empty_row(sheet_row):
                empty_rows += row_repeat
            else:
                # add blank rows to our table (one fresh list per row so that
                # rows never share the same list object)
                table.extend([self.empty_value] for _ in range(empty_rows))
                empty_rows = 0
                table.extend(list(table_row) for _ in range(row_repeat))

        # Make our table square
        for row in table:
            if len(row) < max_row_len:
                row.extend([self.empty_value] * (max_row_len - len(row)))

        return table

    def _get_row_repeat(self, row) -> int:
        """
        Return number of times this row was repeated
        Repeating an empty row appeared to be a common way
        of representing sparse rows in the table.
        """
        from odf.namespaces import TABLENS

        return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))

    def _get_column_repeat(self, cell) -> int:
        """
        Return number of times this cell was repeated (1 when the
        ``number-columns-repeated`` attribute is absent).
        """
        from odf.namespaces import TABLENS

        return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))

    def _is_empty_row(self, row) -> bool:
        """
        Helper function to find empty rows

        A row is considered empty when none of its children has child nodes.
        """
        return all(len(column.childNodes) == 0 for column in row.childNodes)

    def _get_cell_value(self, cell, convert_float: bool) -> Scalar:
        """
        Convert a table cell into a Python value based on its value-type.

        Parameters
        ----------
        cell
            Table cell element to convert.
        convert_float : bool
            If True, floats holding an integral value are returned as int.

        Returns
        -------
        scalar
            ``np.nan`` for cells whose text is ``#N/A``, ``empty_value`` for
            cells without a value-type, otherwise the converted value.

        Raises
        ------
        ValueError
            If the cell declares a value-type that is not handled here.
        """
        from odf.namespaces import OFFICENS

        cell_text = str(cell)
        if cell_text == "#N/A":
            return np.nan

        cell_type = cell.attributes.get((OFFICENS, "value-type"))
        if cell_type == "boolean":
            # Prefer the stored boolean attribute: the displayed text depends
            # on the locale the document was saved with. Fall back to the
            # text only when the attribute is missing.
            stored = cell.attributes.get((OFFICENS, "boolean-value"))
            if stored is not None:
                return str(stored).lower() == "true"
            return cell_text == "TRUE"
        if cell_type is None:
            return self.empty_value
        elif cell_type == "float":
            # GH5394
            cell_value = float(cell.attributes.get((OFFICENS, "value")))
            # is_integer() is False for inf/nan, so int() is never asked to
            # convert a non-finite float.
            if convert_float and cell_value.is_integer():
                return int(cell_value)
            return cell_value
        elif cell_type in ("percentage", "currency"):
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "string":
            return self._get_cell_string_value(cell)
        elif cell_type == "date":
            cell_value = cell.attributes.get((OFFICENS, "date-value"))
            return pd.to_datetime(cell_value)
        elif cell_type == "time":
            # NOTE(review): parses the cell's text rather than a stored time
            # attribute -- confirm this is intended.
            result = pd.to_datetime(cell_text)
            result = cast(pd.Timestamp, result)
            return result.time()
        else:
            raise ValueError(f"Unrecognized type {cell_type}")

    def _get_cell_string_value(self, cell) -> str:
        """
        Find and decode OpenDocument text:s tags that represent
        a run length encoded sequence of space characters.

        Every ``text:p`` child of the cell is read; paragraphs are joined
        with a newline. Children that are not paragraphs are ignored, and a
        cell without any paragraph yields an empty string.
        """
        from odf.text import P

        text_p = P().qname

        paragraphs = [
            self._get_element_text(node)
            for node in cell.childNodes
            if getattr(node, "qname", None) == text_p
        ]
        return "\n".join(paragraphs)

    def _get_element_text(self, element) -> str:
        """
        Return the text below ``element``, expanding ``text:s`` space runs
        and descending into nested elements.
        """
        from odf.element import Element, Text
        from odf.namespaces import TEXTNS
        from odf.text import S

        text_s = S().qname

        pieces = []
        for fragment in element.childNodes:
            if isinstance(fragment, Text):
                pieces.append(fragment.data)
            elif isinstance(fragment, Element):
                if fragment.qname == text_s:
                    spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
                    pieces.append(" " * spaces)
                else:
                    # Nested inline elements keep their text in their own
                    # children, so walk into them.
                    pieces.append(self._get_element_text(fragment))
        return "".join(pieces)