Skip to content

Commit c94821c

Browse files
dvora-hchayimszumkasilehttimgates42
authored
Add support for async GRAPH module (#2273)
* Add support for async graph * linters * fix docstring * Use retry mechanism in async version of Connection objects (#2271) * fix is_connected (#2278) * fix: workaround asyncio bug on connection reset by peer (#2259) Fixes #2237 * Fix crash: key expire while search (#2270) * fix expire while search * sleep * docs: Fix a few typos (#2274) * docs: Fix a few typos There are small typos in: - redis/cluster.py - redis/commands/core.py - redis/ocsp.py - tests/test_cluster.py Fixes: - Should read `validity` rather than `valididy`. - Should read `reinitialize` rather than `reinitilize`. - Should read `farthest` rather than `farest`. - Should read `commands` rather than `comamnds`. * Update core.py * async_cluster: fix concurrent pipeline (#2280) - each pipeline should create separate stacks for each node * Add support for TIMESERIES 1.8 (#2296) * Add support for timeseries 1.8 * fix info * linters * linters * fix info test * type hints * linters * Remove verbose logging from `redis-py/redis/cluster.py` (#2238) * removed the logging module and its corresponding methods * updated CHANGES * except block for RedisClusterException and BusyLoadingError removed * removed unused import (redis.exceptions.BusyLoadingError) * empty commit to re-trigger Actions workflow * replaced BaseException with Exception * empty commit to re-trigger Actions workflow * empty commit to re-trigger Actions workflow * redundant logic removed * re-trigger pipeline * reverted changes * re-trigger pipeline * except logic changed * redis stream example (#2269) * redis stream example * redis stream example on docs/examples.rst Co-authored-by: pedro.frazao <[email protected]> * Fix: `start_id` type for `XAUTOCLAIM` (#2257) * Changed start_id type for xautoclaim * Added to changes Co-authored-by: dvora-h <[email protected]> * Doc add timeseries example (#2267) * DOC add timeseries example * DOC add timeseries examples * Apply suggestions * Fix typo Detention period => Retention period Co-authored-by: Gauthier Imbert <gauthier@PC17> * Fix warnings and resource usage problems in asyncio unittests (#2258) * Use pytest-asyncio in auto mode Remove overly genereric `pytestmark=pytest.mark.asyncio` causing lots of warning noise * Use "Factories as Fixtures" test pattern for the `create_redis` fixture this fixture is now async, avoiding teardown problems with missing event loops. * Fix sporadic error on fast event loops, such as `--uvloop` * Close connection, even if "username" was in kwargs This fixes a resource usage warning in the async unittests. * Do async cleanup of acl passwords via a fixture * Remove unused import, fix whitespace * Fix test with missing "await" * Close pubsub objects after use in unittest Use a simple fixture where possible, otherwise manually call pubsub.close() * re-introduce `pytestmark=pytest.mark.asyncio` for python 3.6 * Use context manager to clean up connections in connection pool for unit tests * Provide asynccontextmanager for python 3.6 * make `test_late_subscribe()` more robuste * Catch a couple of additional leaked resources * Graph - add counters for removed labels and properties (#2292) * grpah - add counters for removed labels and properties * added mock graph result set statistics * docstrings for graph result set statistics * format * isort * moved docstrings into functions * cleaning up the readme and moving docs into readthedocs (#2291) * cleaning up the readme and moving docs into readthedocs * examples at the end as per pr comments * async_cluster: fix max_connections/ssl & improve args (#2217) * async_cluster: fix max_connections/ssl & improve args - set proper connection_class if ssl = True - pass max_connections/connection_class to ClusterNode - recreate startup_nodes to properly initialize - pass parser_class to Connection instead of changing it in on_connect - only pass redis_connect_func if read_from_replicas = True - add connection_error_retry_attempts parameter - skip is_connected check in acquire_connection as it is already checked in send_packed_command BREAKING: - RedisCluster args except host & port are kw-only now - RedisCluster will no longer accept unknown arguments - RedisCluster will no longer accept url as an argument. Use RedisCluster.from_url - RedisCluster.require_full_coverage defaults to True - ClusterNode args except host, port, & server_type are kw-only now * async_cluster: remove kw-only requirement from client Co-authored-by: dvora-h <[email protected]> * fix review comments * fix * fix review comments * fix review comments Co-authored-by: Chayim <[email protected]> Co-authored-by: szumka <[email protected]> Co-authored-by: Mehdi ABAAKOUK <[email protected]> Co-authored-by: Tim Gates <[email protected]> Co-authored-by: Utkarsh Gupta <[email protected]> Co-authored-by: Nial Daly <[email protected]> Co-authored-by: pedrofrazao <[email protected]> Co-authored-by: pedro.frazao <[email protected]> Co-authored-by: Антон Безденежных <[email protected]> Co-authored-by: Iglesys <[email protected]> Co-authored-by: Gauthier Imbert <gauthier@PC17> Co-authored-by: Kristján Valur Jónsson <[email protected]> Co-authored-by: DvirDukhan <[email protected]>
1 parent f9f9d06 commit c94821c

File tree

5 files changed

+1028
-135
lines changed

5 files changed

+1028
-135
lines changed

redis/commands/graph/__init__.py

Lines changed: 111 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
from ..helpers import quote_string, random_string, stringify_param_value
2-
from .commands import GraphCommands
2+
from .commands import AsyncGraphCommands, GraphCommands
33
from .edge import Edge # noqa
44
from .node import Node # noqa
55
from .path import Path # noqa
66

7+
DB_LABELS = "DB.LABELS"
8+
DB_RAELATIONSHIPTYPES = "DB.RELATIONSHIPTYPES"
9+
DB_PROPERTYKEYS = "DB.PROPERTYKEYS"
10+
711

812
class Graph(GraphCommands):
913
"""
@@ -44,25 +48,19 @@ def _refresh_labels(self):
4448
lbls = self.labels()
4549

4650
# Unpack data.
47-
self._labels = [None] * len(lbls)
48-
for i, l in enumerate(lbls):
49-
self._labels[i] = l[0]
51+
self._labels = [l[0] for _, l in enumerate(lbls)]
5052

5153
def _refresh_relations(self):
5254
rels = self.relationship_types()
5355

5456
# Unpack data.
55-
self._relationship_types = [None] * len(rels)
56-
for i, r in enumerate(rels):
57-
self._relationship_types[i] = r[0]
57+
self._relationship_types = [r[0] for _, r in enumerate(rels)]
5858

5959
def _refresh_attributes(self):
6060
props = self.property_keys()
6161

6262
# Unpack data.
63-
self._properties = [None] * len(props)
64-
for i, p in enumerate(props):
65-
self._properties[i] = p[0]
63+
self._properties = [p[0] for _, p in enumerate(props)]
6664

6765
def get_label(self, idx):
6866
"""
@@ -108,12 +106,12 @@ def get_property(self, idx):
108106
The index of the property
109107
"""
110108
try:
111-
propertie = self._properties[idx]
109+
p = self._properties[idx]
112110
except IndexError:
113111
# Refresh properties.
114112
self._refresh_attributes()
115-
propertie = self._properties[idx]
116-
return propertie
113+
p = self._properties[idx]
114+
return p
117115

118116
def add_node(self, node):
119117
"""
@@ -133,6 +131,8 @@ def add_edge(self, edge):
133131
self.edges.append(edge)
134132

135133
def _build_params_header(self, params):
134+
if params is None:
135+
return ""
136136
if not isinstance(params, dict):
137137
raise TypeError("'params' must be a dict")
138138
# Header starts with "CYPHER"
@@ -147,16 +147,109 @@ def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
147147
q = f"CALL {procedure}({','.join(args)})"
148148

149149
y = kwagrs.get("y", None)
150-
if y:
151-
q += f" YIELD {','.join(y)}"
150+
if y is not None:
151+
q += f"YIELD {','.join(y)}"
152152

153153
return self.query(q, read_only=read_only)
154154

155155
def labels(self):
156-
return self.call_procedure("db.labels", read_only=True).result_set
156+
return self.call_procedure(DB_LABELS, read_only=True).result_set
157157

158158
def relationship_types(self):
159-
return self.call_procedure("db.relationshipTypes", read_only=True).result_set
159+
return self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True).result_set
160160

161161
def property_keys(self):
162-
return self.call_procedure("db.propertyKeys", read_only=True).result_set
162+
return self.call_procedure(DB_PROPERTYKEYS, read_only=True).result_set
163+
164+
165+
class AsyncGraph(Graph, AsyncGraphCommands):
166+
"""Async version for Graph"""
167+
168+
async def _refresh_labels(self):
169+
lbls = await self.labels()
170+
171+
# Unpack data.
172+
self._labels = [l[0] for _, l in enumerate(lbls)]
173+
174+
async def _refresh_attributes(self):
175+
props = await self.property_keys()
176+
177+
# Unpack data.
178+
self._properties = [p[0] for _, p in enumerate(props)]
179+
180+
async def _refresh_relations(self):
181+
rels = await self.relationship_types()
182+
183+
# Unpack data.
184+
self._relationship_types = [r[0] for _, r in enumerate(rels)]
185+
186+
async def get_label(self, idx):
187+
"""
188+
Returns a label by it's index
189+
190+
Args:
191+
192+
idx:
193+
The index of the label
194+
"""
195+
try:
196+
label = self._labels[idx]
197+
except IndexError:
198+
# Refresh labels.
199+
await self._refresh_labels()
200+
label = self._labels[idx]
201+
return label
202+
203+
async def get_property(self, idx):
204+
"""
205+
Returns a property by it's index
206+
207+
Args:
208+
209+
idx:
210+
The index of the property
211+
"""
212+
try:
213+
p = self._properties[idx]
214+
except IndexError:
215+
# Refresh properties.
216+
await self._refresh_attributes()
217+
p = self._properties[idx]
218+
return p
219+
220+
async def get_relation(self, idx):
221+
"""
222+
Returns a relationship type by it's index
223+
224+
Args:
225+
226+
idx:
227+
The index of the relation
228+
"""
229+
try:
230+
relationship_type = self._relationship_types[idx]
231+
except IndexError:
232+
# Refresh relationship types.
233+
await self._refresh_relations()
234+
relationship_type = self._relationship_types[idx]
235+
return relationship_type
236+
237+
async def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
238+
args = [quote_string(arg) for arg in args]
239+
q = f"CALL {procedure}({','.join(args)})"
240+
241+
y = kwagrs.get("y", None)
242+
if y is not None:
243+
f"YIELD {','.join(y)}"
244+
return await self.query(q, read_only=read_only)
245+
246+
async def labels(self):
247+
return ((await self.call_procedure(DB_LABELS, read_only=True))).result_set
248+
249+
async def property_keys(self):
250+
return (await self.call_procedure(DB_PROPERTYKEYS, read_only=True)).result_set
251+
252+
async def relationship_types(self):
253+
return (
254+
await self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True)
255+
).result_set

redis/commands/graph/commands.py

Lines changed: 123 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,16 @@
33

44
from .exceptions import VersionMismatchException
55
from .execution_plan import ExecutionPlan
6-
from .query_result import QueryResult
6+
from .query_result import AsyncQueryResult, QueryResult
7+
8+
PROFILE_CMD = "GRAPH.PROFILE"
9+
RO_QUERY_CMD = "GRAPH.RO_QUERY"
10+
QUERY_CMD = "GRAPH.QUERY"
11+
DELETE_CMD = "GRAPH.DELETE"
12+
SLOWLOG_CMD = "GRAPH.SLOWLOG"
13+
CONFIG_CMD = "GRAPH.CONFIG"
14+
LIST_CMD = "GRAPH.LIST"
15+
EXPLAIN_CMD = "GRAPH.EXPLAIN"
716

817

918
class GraphCommands:
@@ -52,33 +61,28 @@ def query(self, q, params=None, timeout=None, read_only=False, profile=False):
5261
query = q
5362

5463
# handle query parameters
55-
if params is not None:
56-
query = self._build_params_header(params) + query
64+
query = self._build_params_header(params) + query
5765

5866
# construct query command
5967
# ask for compact result-set format
6068
# specify known graph version
6169
if profile:
62-
cmd = "GRAPH.PROFILE"
70+
cmd = PROFILE_CMD
6371
else:
64-
cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
72+
cmd = RO_QUERY_CMD if read_only else QUERY_CMD
6573
command = [cmd, self.name, query, "--compact"]
6674

6775
# include timeout is specified
68-
if timeout:
69-
if not isinstance(timeout, int):
70-
raise Exception("Timeout argument must be a positive integer")
71-
command += ["timeout", timeout]
76+
if isinstance(timeout, int):
77+
command.extend(["timeout", timeout])
78+
elif timeout is not None:
79+
raise Exception("Timeout argument must be a positive integer")
7280

7381
# issue query
7482
try:
7583
response = self.execute_command(*command)
7684
return QueryResult(self, response, profile)
7785
except ResponseError as e:
78-
if "wrong number of arguments" in str(e):
79-
print(
80-
"Note: RedisGraph Python requires server version 2.2.8 or above"
81-
) # noqa
8286
if "unknown command" in str(e) and read_only:
8387
# `GRAPH.RO_QUERY` is unavailable in older versions.
8488
return self.query(q, params, timeout, read_only=False)
@@ -106,7 +110,7 @@ def delete(self):
106110
For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa
107111
"""
108112
self._clear_schema()
109-
return self.execute_command("GRAPH.DELETE", self.name)
113+
return self.execute_command(DELETE_CMD, self.name)
110114

111115
# declared here, to override the built in redis.db.flush()
112116
def flush(self):
@@ -146,7 +150,7 @@ def slowlog(self):
146150
3. The issued query.
147151
4. The amount of time needed for its execution, in milliseconds.
148152
"""
149-
return self.execute_command("GRAPH.SLOWLOG", self.name)
153+
return self.execute_command(SLOWLOG_CMD, self.name)
150154

151155
def config(self, name, value=None, set=False):
152156
"""
@@ -170,14 +174,14 @@ def config(self, name, value=None, set=False):
170174
raise DataError(
171175
"``value`` can be provided only when ``set`` is True"
172176
) # noqa
173-
return self.execute_command("GRAPH.CONFIG", *params)
177+
return self.execute_command(CONFIG_CMD, *params)
174178

175179
def list_keys(self):
176180
"""
177181
Lists all graph keys in the keyspace.
178182
For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa
179183
"""
180-
return self.execute_command("GRAPH.LIST")
184+
return self.execute_command(LIST_CMD)
181185

182186
def execution_plan(self, query, params=None):
183187
"""
@@ -188,10 +192,9 @@ def execution_plan(self, query, params=None):
188192
query: the query that will be executed
189193
params: query parameters
190194
"""
191-
if params is not None:
192-
query = self._build_params_header(params) + query
195+
query = self._build_params_header(params) + query
193196

194-
plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
197+
plan = self.execute_command(EXPLAIN_CMD, self.name, query)
195198
if isinstance(plan[0], bytes):
196199
plan = [b.decode() for b in plan]
197200
return "\n".join(plan)
@@ -206,8 +209,105 @@ def explain(self, query, params=None):
206209
query: the query that will be executed
207210
params: query parameters
208211
"""
209-
if params is not None:
210-
query = self._build_params_header(params) + query
212+
query = self._build_params_header(params) + query
213+
214+
plan = self.execute_command(EXPLAIN_CMD, self.name, query)
215+
return ExecutionPlan(plan)
216+
217+
218+
class AsyncGraphCommands(GraphCommands):
219+
async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
220+
"""
221+
Executes a query against the graph.
222+
For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa
223+
224+
Args:
225+
226+
q : str
227+
The query.
228+
params : dict
229+
Query parameters.
230+
timeout : int
231+
Maximum runtime for read queries in milliseconds.
232+
read_only : bool
233+
Executes a readonly query if set to True.
234+
profile : bool
235+
Return details on results produced by and time
236+
spent in each operation.
237+
"""
238+
239+
# maintain original 'q'
240+
query = q
241+
242+
# handle query parameters
243+
query = self._build_params_header(params) + query
244+
245+
# construct query command
246+
# ask for compact result-set format
247+
# specify known graph version
248+
if profile:
249+
cmd = PROFILE_CMD
250+
else:
251+
cmd = RO_QUERY_CMD if read_only else QUERY_CMD
252+
command = [cmd, self.name, query, "--compact"]
253+
254+
# include timeout is specified
255+
if isinstance(timeout, int):
256+
command.extend(["timeout", timeout])
257+
elif timeout is not None:
258+
raise Exception("Timeout argument must be a positive integer")
259+
260+
# issue query
261+
try:
262+
response = await self.execute_command(*command)
263+
return await AsyncQueryResult().initialize(self, response, profile)
264+
except ResponseError as e:
265+
if "unknown command" in str(e) and read_only:
266+
# `GRAPH.RO_QUERY` is unavailable in older versions.
267+
return await self.query(q, params, timeout, read_only=False)
268+
raise e
269+
except VersionMismatchException as e:
270+
# client view over the graph schema is out of sync
271+
# set client version and refresh local schema
272+
self.version = e.version
273+
self._refresh_schema()
274+
# re-issue query
275+
return await self.query(q, params, timeout, read_only)
276+
277+
async def execution_plan(self, query, params=None):
278+
"""
279+
Get the execution plan for given query,
280+
GRAPH.EXPLAIN returns an array of operations.
281+
282+
Args:
283+
query: the query that will be executed
284+
params: query parameters
285+
"""
286+
query = self._build_params_header(params) + query
211287

212-
plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
288+
plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
289+
if isinstance(plan[0], bytes):
290+
plan = [b.decode() for b in plan]
291+
return "\n".join(plan)
292+
293+
async def explain(self, query, params=None):
294+
"""
295+
Get the execution plan for given query,
296+
GRAPH.EXPLAIN returns ExecutionPlan object.
297+
298+
Args:
299+
query: the query that will be executed
300+
params: query parameters
301+
"""
302+
query = self._build_params_header(params) + query
303+
304+
plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
213305
return ExecutionPlan(plan)
306+
307+
async def flush(self):
308+
"""
309+
Commit the graph and reset the edges and the nodes to zero length.
310+
"""
311+
await self.commit()
312+
self.nodes = {}
313+
self.edges = []

0 commit comments

Comments
 (0)