Skip to content

Add support for async GRAPH module #2273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5e48aac
Add support for async graph
dvora-h Jul 12, 2022
f5ee75e
linters
dvora-h Jul 12, 2022
db0b650
Merge branch 'master' into async-graph
chayim Jul 17, 2022
224778f
fix docstring
dvora-h Jul 19, 2022
fb2c74b
Use retry mechanism in async version of Connection objects (#2271)
szumka Jul 21, 2022
439f2ea
fix is_connected (#2278)
dvora-h Jul 21, 2022
d7e4ea1
fix: workaround asyncio bug on connection reset by peer (#2259)
sileht Jul 24, 2022
13941b8
Fix crash: key expire while search (#2270)
dvora-h Jul 24, 2022
e993e2c
docs: Fix a few typos (#2274)
timgates42 Jul 24, 2022
efe7c7a
async_cluster: fix concurrent pipeline (#2280)
utkarshgupta137 Jul 24, 2022
b12b025
Add support for TIMESERIES 1.8 (#2296)
dvora-h Jul 24, 2022
e9f8447
Remove verbose logging from `redis-py/redis/cluster.py` (#2238)
nialdaly Jul 24, 2022
8529170
redis stream example (#2269)
pedrofrazao Jul 24, 2022
8e2ea28
Fix: `start_id` type for `XAUTOCLAIM` (#2257)
GaMeRaM Jul 24, 2022
20fa54f
Doc add timeseries example (#2267)
Iglesys347 Jul 25, 2022
a44b952
Fix warnings and resource usage problems in asyncio unittests (#2258)
kristjanvalur Jul 26, 2022
4a83d67
Graph - add counters for removed labels and properties (#2292)
DvirDukhan Jul 26, 2022
115c259
cleaning up the readme and moving docs into readthedocs (#2291)
chayim Jul 27, 2022
fe93f41
async_cluster: fix max_connections/ssl & improve args (#2217)
utkarshgupta137 Jul 27, 2022
34c5177
fix review comments
dvora-h Jul 27, 2022
24413b1
Merge branch 'master' into async-graph
dvora-h Jul 27, 2022
e60ad1c
fix
dvora-h Jul 27, 2022
2e9cab8
fix review comments
dvora-h Jul 28, 2022
85732f4
fix review comments
dvora-h Jul 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion redis/commands/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ..helpers import quote_string, random_string, stringify_param_value
from .commands import GraphCommands
from .commands import AsyncGraphCommands, GraphCommands
from .edge import Edge # noqa
from .node import Node # noqa
from .path import Path # noqa
Expand Down Expand Up @@ -160,3 +160,102 @@ def relationship_types(self):

def property_keys(self):
return self.call_procedure("db.propertyKeys", read_only=True).result_set


class AsyncGraph(Graph, AsyncGraphCommands):
"""Async version for Graph"""

async def _refresh_labels(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstrings... every function please

lbls = await self.labels()

# Unpack data.
self._labels = [None] * len(lbls)
for i, l in enumerate(lbls):
self._labels[i] = l[0]

async def _refresh_attributes(self):
props = await self.property_keys()

# Unpack data.
self._properties = [None] * len(props)
for i, p in enumerate(props):
self._properties[i] = p[0]

async def _refresh_relations(self):
rels = await self.relationship_types()

# Unpack data.
self._relationship_types = [None] * len(rels)
for i, r in enumerate(rels):
self._relationship_types[i] = r[0]

async def get_label(self, idx):
"""
Returns a label by it's index

Args:

idx:
The index of the label
"""
try:
label = self._labels[idx]
except IndexError:
# Refresh labels.
await self._refresh_labels()
label = self._labels[idx]
return label

async def get_property(self, idx):
"""
Returns a property by it's index

Args:

idx:
The index of the property
"""
try:
propertie = self._properties[idx]
except IndexError:
# Refresh properties.
await self._refresh_attributes()
propertie = self._properties[idx]
return propertie

async def get_relation(self, idx):
"""
Returns a relationship type by it's index

Args:

idx:
The index of the relation
"""
try:
relationship_type = self._relationship_types[idx]
except IndexError:
# Refresh relationship types.
await self._refresh_relations()
relationship_type = self._relationship_types[idx]
return relationship_type

async def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
args = [quote_string(arg) for arg in args]
q = f"CALL {procedure}({','.join(args)})"

y = kwagrs.get("y", None)
if y:
q += f" YIELD {','.join(y)}"
return await self.query(q, read_only=read_only)

async def labels(self):
return ((await self.call_procedure("db.labels", read_only=True))).result_set

async def property_keys(self):
return (await self.call_procedure("db.propertyKeys", read_only=True)).result_set

async def relationship_types(self):
return (
await self.call_procedure("db.relationshipTypes", read_only=True)
).result_set
107 changes: 106 additions & 1 deletion redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
from .query_result import QueryResult
from .query_result import AsyncQueryResult, QueryResult


class GraphCommands:
Expand Down Expand Up @@ -211,3 +211,108 @@ def explain(self, query, params=None):

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return ExecutionPlan(plan)


class AsyncGraphCommands(GraphCommands):
async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type hints

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""
Executes a query against the graph.
For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa

Args:

q : str
The query.
params : dict
Query parameters.
timeout : int
Maximum runtime for read queries in milliseconds.
read_only : bool
Executes a readonly query if set to True.
profile : bool
Return details on results produced by and time
spent in each operation.
"""

# maintain original 'q'
query = q

# handle query parameters
if params is not None:
query = self._build_params_header(params) + query

# construct query command
# ask for compact result-set format
# specify known graph version
if profile:
cmd = "GRAPH.PROFILE"
else:
cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
command = [cmd, self.name, query, "--compact"]

# include timeout is specified
if timeout:
if not isinstance(timeout, int):
raise Exception("Timeout argument must be a positive integer")
command += ["timeout", timeout]

# issue query
try:
response = await self.execute_command(*command)
return await AsyncQueryResult().initialize(self, response, profile)
except ResponseError as e:
if "wrong number of arguments" in str(e):
print(
"Note: RedisGraph Python requires server version 2.2.8 or above"
) # noqa
if "unknown command" in str(e) and read_only:
# `GRAPH.RO_QUERY` is unavailable in older versions.
return await self.query(q, params, timeout, read_only=False)
raise e
except VersionMismatchException as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pattern for possible client side caching in the future... just a thought?

# client view over the graph schema is out of sync
# set client version and refresh local schema
self.version = e.version
self._refresh_schema()
# re-issue query
return await self.query(q, params, timeout, read_only)

async def execution_plan(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns an array of operations.

Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = await self.execute_command("GRAPH.EXPLAIN", self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)

async def explain(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns ExecutionPlan object.

Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = await self.execute_command("GRAPH.EXPLAIN", self.name, query)
return ExecutionPlan(plan)

async def flush(self):
"""
Commit the graph and reset the edges and the nodes to zero length.
"""
await self.commit()
self.nodes = {}
self.edges = []
163 changes: 163 additions & 0 deletions redis/commands/graph/query_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,166 @@ def cached_execution(self):
@property
def run_time_ms(self):
return self._get_stat(INTERNAL_EXECUTION_TIME)


class AsyncQueryResult(QueryResult):
def __init__(self):
pass

async def initialize(self, graph, response, profile=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstring + type hints

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type hints will be added in a separate PR.

self.graph = graph
self.header = []
self.result_set = []

# in case of an error an exception will be raised
self._check_for_errors(response)

if len(response) == 1:
self.parse_statistics(response[0])
elif profile:
self.parse_profile(response)
else:
# start by parsing statistics, matches the one we have
self.parse_statistics(response[-1]) # Last element.
await self.parse_results(response)

return self

async def parse_node(self, cell):
# Node ID (integer),
# [label string offset (integer)],
# [[name, value type, value] X N]

node_id = int(cell[0])
labels = None
if len(cell[1]) > 0:
labels = []
for inner_label in cell[1]:
labels.append(await self.graph.get_label(inner_label))
properties = await self.parse_entity_properties(cell[2])
return Node(node_id=node_id, label=labels, properties=properties)

async def parse_scalar(self, cell):
scalar_type = int(cell[0])
value = cell[1]
scalar = None

if scalar_type == ResultSetScalarTypes.VALUE_NULL:
scalar = None

elif scalar_type == ResultSetScalarTypes.VALUE_STRING:
scalar = self.parse_string(value)

elif scalar_type == ResultSetScalarTypes.VALUE_INTEGER:
scalar = int(value)

elif scalar_type == ResultSetScalarTypes.VALUE_BOOLEAN:
value = value.decode() if isinstance(value, bytes) else value
if value == "true":
scalar = True
elif value == "false":
scalar = False
else:
print("Unknown boolean type\n")

elif scalar_type == ResultSetScalarTypes.VALUE_DOUBLE:
scalar = float(value)

elif scalar_type == ResultSetScalarTypes.VALUE_ARRAY:
# array variable is introduced only for readability
scalar = array = value
for i in range(len(array)):
scalar[i] = await self.parse_scalar(array[i])

elif scalar_type == ResultSetScalarTypes.VALUE_NODE:
scalar = await self.parse_node(value)

elif scalar_type == ResultSetScalarTypes.VALUE_EDGE:
scalar = await self.parse_edge(value)

elif scalar_type == ResultSetScalarTypes.VALUE_PATH:
scalar = await self.parse_path(value)

elif scalar_type == ResultSetScalarTypes.VALUE_MAP:
scalar = await self.parse_map(value)

elif scalar_type == ResultSetScalarTypes.VALUE_POINT:
scalar = self.parse_point(value)

elif scalar_type == ResultSetScalarTypes.VALUE_UNKNOWN:
print("Unknown scalar type\n")

return scalar

async def parse_records(self, raw_result_set):
records = []
result_set = raw_result_set[1]
for row in result_set:
record = []
for idx, cell in enumerate(row):
if self.header[idx][0] == ResultSetColumnTypes.COLUMN_SCALAR: # noqa
record.append(await self.parse_scalar(cell))
elif self.header[idx][0] == ResultSetColumnTypes.COLUMN_NODE: # noqa
record.append(await self.parse_node(cell))
elif (
self.header[idx][0] == ResultSetColumnTypes.COLUMN_RELATION
): # noqa
record.append(await self.parse_edge(cell))
else:
print("Unknown column type.\n")
records.append(record)

return records

async def parse_results(self, raw_result_set):
self.header = self.parse_header(raw_result_set)

# Empty header.
if len(self.header) == 0:
return

self.result_set = await self.parse_records(raw_result_set)

async def parse_entity_properties(self, props):
# [[name, value type, value] X N]
properties = {}
for prop in props:
prop_name = await self.graph.get_property(prop[0])
prop_value = await self.parse_scalar(prop[1:])
properties[prop_name] = prop_value

return properties

async def parse_edge(self, cell):
# Edge ID (integer),
# reltype string offset (integer),
# src node ID offset (integer),
# dest node ID offset (integer),
# [[name, value, value type] X N]

edge_id = int(cell[0])
relation = await self.graph.get_relation(cell[1])
src_node_id = int(cell[2])
dest_node_id = int(cell[3])
properties = await self.parse_entity_properties(cell[4])
return Edge(
src_node_id, relation, dest_node_id, edge_id=edge_id, properties=properties
)

async def parse_path(self, cell):
nodes = await self.parse_scalar(cell[0])
edges = await self.parse_scalar(cell[1])
return Path(nodes, edges)

async def parse_map(self, cell):
m = OrderedDict()
n_entries = len(cell)

# A map is an array of key value pairs.
# 1. key (string)
# 2. array: (value type, value)
for i in range(0, n_entries, 2):
key = self.parse_string(cell[i])
m[key] = await self.parse_scalar(cell[i + 1])

return m
Loading