forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_odfreader.py
282 lines (232 loc) · 9.04 KB
/
_odfreader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
from __future__ import annotations
import datetime
import re
from typing import (
TYPE_CHECKING,
cast,
)
import numpy as np
from pandas._typing import (
FilePath,
ReadBuffer,
Scalar,
StorageOptions,
)
from pandas.compat._optional import import_optional_dependency
from pandas.util._decorators import doc
import pandas as pd
from pandas.core.shared_docs import _shared_docs
from pandas.io.excel._base import BaseExcelReader
if TYPE_CHECKING:
from odf.opendocument import OpenDocument
from pandas._libs.tslibs.nattype import NaTType
# Matches the ODF flavour of an ISO 8601 duration used for time cells,
# i.e. "PThhhHmmMss.sssS" (hours may exceed two digits, the fractional
# seconds are optional).
# See https://www.w3.org/TR/xmlschema-2/#duration for the underlying format.
# Groups: 1 = hours, 2 = minutes, 3 = whole seconds,
#         4 = fractional part including the dot, 5 = fractional digits only.
ODF_ISOTIME_PATTERN = re.compile(
    r"^\s*PT"
    r"\s*(\d+)\s*H"  # hours
    r"\s*(\d+)\s*M"  # minutes
    r"\s*(\d+)(\.(\d+))?\s*S$"  # seconds, optionally with a fraction
)
@doc(storage_options=_shared_docs["storage_options"])
class ODFReader(BaseExcelReader["OpenDocument"]):
def __init__(
self,
filepath_or_buffer: FilePath | ReadBuffer[bytes],
storage_options: StorageOptions | None = None,
engine_kwargs: dict | None = None,
) -> None:
"""
Read tables out of OpenDocument formatted files.
Parameters
----------
filepath_or_buffer : str, path to be parsed or
an open readable stream.
{storage_options}
engine_kwargs : dict, optional
Arbitrary keyword arguments passed to excel engine.
"""
import_optional_dependency("odf")
super().__init__(
filepath_or_buffer,
storage_options=storage_options,
engine_kwargs=engine_kwargs,
)
@property
def _workbook_class(self) -> type[OpenDocument]:
from odf.opendocument import OpenDocument
return OpenDocument
def load_workbook(
self, filepath_or_buffer: FilePath | ReadBuffer[bytes], engine_kwargs
) -> OpenDocument:
from odf.opendocument import load
return load(filepath_or_buffer, **engine_kwargs)
@property
def empty_value(self) -> str:
"""Property for compat with other readers."""
return ""
@property
def sheet_names(self) -> list[str]:
"""Return a list of sheet names present in the document"""
from odf.table import Table
tables = self.book.getElementsByType(Table)
return [t.getAttribute("name") for t in tables]
def get_sheet_by_index(self, index: int):
from odf.table import Table
self.raise_if_bad_sheet_by_index(index)
tables = self.book.getElementsByType(Table)
return tables[index]
def get_sheet_by_name(self, name: str):
from odf.table import Table
self.raise_if_bad_sheet_by_name(name)
tables = self.book.getElementsByType(Table)
for table in tables:
if table.getAttribute("name") == name:
return table
self.close()
raise ValueError(f"sheet {name} not found")
def get_sheet_data(
self, sheet, file_rows_needed: int | None = None
) -> list[list[Scalar | NaTType]]:
"""
Parse an ODF Table into a list of lists
"""
from odf.table import (
CoveredTableCell,
TableCell,
TableRow,
)
covered_cell_name = CoveredTableCell().qname
table_cell_name = TableCell().qname
cell_names = {covered_cell_name, table_cell_name}
sheet_rows = sheet.getElementsByType(TableRow)
empty_rows = 0
max_row_len = 0
table: list[list[Scalar | NaTType]] = []
for sheet_row in sheet_rows:
sheet_cells = [
x
for x in sheet_row.childNodes
if hasattr(x, "qname") and x.qname in cell_names
]
empty_cells = 0
table_row: list[Scalar | NaTType] = []
for sheet_cell in sheet_cells:
if sheet_cell.qname == table_cell_name:
value = self._get_cell_value(sheet_cell)
else:
value = self.empty_value
column_repeat = self._get_column_repeat(sheet_cell)
# Queue up empty values, writing only if content succeeds them
if value == self.empty_value:
empty_cells += column_repeat
else:
table_row.extend([self.empty_value] * empty_cells)
empty_cells = 0
table_row.extend([value] * column_repeat)
if max_row_len < len(table_row):
max_row_len = len(table_row)
row_repeat = self._get_row_repeat(sheet_row)
if len(table_row) == 0:
empty_rows += row_repeat
else:
# add blank rows to our table
table.extend([[self.empty_value]] * empty_rows)
empty_rows = 0
table.extend(table_row for _ in range(row_repeat))
if file_rows_needed is not None and len(table) >= file_rows_needed:
break
# Make our table square
for row in table:
if len(row) < max_row_len:
row.extend([self.empty_value] * (max_row_len - len(row)))
return table
def _get_row_repeat(self, row) -> int:
"""
Return number of times this row was repeated
Repeating an empty row appeared to be a common way
of representing sparse rows in the table.
"""
from odf.namespaces import TABLENS
return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))
def _get_column_repeat(self, cell) -> int:
from odf.namespaces import TABLENS
return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))
def _get_cell_value(self, cell) -> Scalar | NaTType:
from odf.namespaces import OFFICENS
if str(cell) == "#N/A":
return np.nan
cell_type = cell.attributes.get((OFFICENS, "value-type"))
if cell_type == "boolean":
if str(cell) == "TRUE":
return True
return False
if cell_type is None:
return self.empty_value
elif cell_type == "float":
# GH5394
cell_value = float(cell.attributes.get((OFFICENS, "value")))
val = int(cell_value)
if val == cell_value:
return val
return cell_value
elif cell_type == "percentage":
cell_value = cell.attributes.get((OFFICENS, "value"))
return float(cell_value)
elif cell_type == "string":
return self._get_cell_string_value(cell)
elif cell_type == "currency":
cell_value = cell.attributes.get((OFFICENS, "value"))
return float(cell_value)
elif cell_type == "date":
cell_value = cell.attributes.get((OFFICENS, "date-value"))
return pd.Timestamp(cell_value)
elif cell_type == "time":
stamp = self._get_cell_time_value(cell)
# cast needed here because Scalar doesn't include datetime.time
return cast(Scalar, stamp)
else:
self.close()
raise ValueError(f"Unrecognized type {cell_type}")
def _get_cell_string_value(self, cell) -> str:
"""
Find and decode OpenDocument text:s tags that represent
a run length encoded sequence of space characters.
"""
from odf.element import Element
from odf.namespaces import TEXTNS
from odf.text import S
text_s = S().qname
value = []
for fragment in cell.childNodes:
if isinstance(fragment, Element):
if fragment.qname == text_s:
spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
value.append(" " * spaces)
else:
# recursive impl needed in case of nested fragments
# with multiple spaces
# https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704
value.append(self._get_cell_string_value(fragment))
else:
value.append(str(fragment).strip("\n"))
return "".join(value)
def _get_cell_time_value(self, cell) -> datetime.time:
"""
This helper function parses ODF time value
"""
from odf.namespaces import OFFICENS
value = cell.attributes.get((OFFICENS, "time-value"))
parts = ODF_ISOTIME_PATTERN.match(value)
if parts is None:
raise ValueError(f"Failed to parse ODF time value: {value}")
hours, minutes, seconds, _, second_part = parts.group(*range(1, 6))
if second_part is None:
microseconds = 0
else:
microseconds = int(int(second_part) * pow(10, 6 - len(second_part)))
return datetime.time(
# ignore date part from some representations
# and datetime.time restrict hour values to 0..23
hour=int(hours) % 24,
minute=int(minutes),
second=int(seconds),
microsecond=microseconds,
)