-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Add support for async GRAPH module #2273
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
5e48aac
f5ee75e
db0b650
224778f
fb2c74b
439f2ea
d7e4ea1
13941b8
e993e2c
efe7c7a
b12b025
e9f8447
8529170
8e2ea28
20fa54f
a44b952
4a83d67
115c259
fe93f41
34c5177
24413b1
e60ad1c
2e9cab8
85732f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
|
||
from .exceptions import VersionMismatchException | ||
from .execution_plan import ExecutionPlan | ||
from .query_result import QueryResult | ||
from .query_result import AsyncQueryResult, QueryResult | ||
|
||
|
||
class GraphCommands: | ||
|
@@ -211,3 +211,108 @@ def explain(self, query, params=None): | |
|
||
plan = self.execute_command("GRAPH.EXPLAIN", self.name, query) | ||
return ExecutionPlan(plan) | ||
|
||
|
||
class AsyncGraphCommands(GraphCommands): | ||
async def query(self, q, params=None, timeout=None, read_only=False, profile=False): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. type hints There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
""" | ||
Executes a query against the graph. | ||
For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa | ||
|
||
Args: | ||
|
||
q : str | ||
The query. | ||
params : dict | ||
Query parameters. | ||
timeout : int | ||
Maximum runtime for read queries in milliseconds. | ||
read_only : bool | ||
Executes a readonly query if set to True. | ||
profile : bool | ||
Return details on results produced by and time | ||
spent in each operation. | ||
""" | ||
|
||
# maintain original 'q' | ||
query = q | ||
|
||
# handle query parameters | ||
if params is not None: | ||
query = self._build_params_header(params) + query | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# construct query command | ||
# ask for compact result-set format | ||
# specify known graph version | ||
if profile: | ||
cmd = "GRAPH.PROFILE" | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY" | ||
command = [cmd, self.name, query, "--compact"] | ||
|
||
# include timeout is specified | ||
if timeout: | ||
if not isinstance(timeout, int): | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise Exception("Timeout argument must be a positive integer") | ||
command += ["timeout", timeout] | ||
|
||
# issue query | ||
try: | ||
response = await self.execute_command(*command) | ||
return await AsyncQueryResult().initialize(self, response, profile) | ||
except ResponseError as e: | ||
if "wrong number of arguments" in str(e): | ||
print( | ||
"Note: RedisGraph Python requires server version 2.2.8 or above" | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) # noqa | ||
if "unknown command" in str(e) and read_only: | ||
# `GRAPH.RO_QUERY` is unavailable in older versions. | ||
return await self.query(q, params, timeout, read_only=False) | ||
raise e | ||
except VersionMismatchException as e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pattern for possible client side caching in the future... just a thought? |
||
# client view over the graph schema is out of sync | ||
# set client version and refresh local schema | ||
self.version = e.version | ||
self._refresh_schema() | ||
# re-issue query | ||
return await self.query(q, params, timeout, read_only) | ||
|
||
async def execution_plan(self, query, params=None): | ||
""" | ||
Get the execution plan for given query, | ||
GRAPH.EXPLAIN returns an array of operations. | ||
|
||
Args: | ||
query: the query that will be executed | ||
params: query parameters | ||
""" | ||
if params is not None: | ||
query = self._build_params_header(params) + query | ||
|
||
plan = await self.execute_command("GRAPH.EXPLAIN", self.name, query) | ||
if isinstance(plan[0], bytes): | ||
plan = [b.decode() for b in plan] | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return "\n".join(plan) | ||
|
||
async def explain(self, query, params=None): | ||
""" | ||
Get the execution plan for given query, | ||
GRAPH.EXPLAIN returns ExecutionPlan object. | ||
|
||
Args: | ||
query: the query that will be executed | ||
params: query parameters | ||
""" | ||
if params is not None: | ||
query = self._build_params_header(params) + query | ||
|
||
plan = await self.execute_command("GRAPH.EXPLAIN", self.name, query) | ||
return ExecutionPlan(plan) | ||
|
||
async def flush(self): | ||
""" | ||
Commit the graph and reset the edges and the nodes to zero length. | ||
""" | ||
await self.commit() | ||
self.nodes = {} | ||
self.edges = [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -360,3 +360,166 @@ def cached_execution(self): | |
@property | ||
def run_time_ms(self): | ||
return self._get_stat(INTERNAL_EXECUTION_TIME) | ||
chayim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class AsyncQueryResult(QueryResult): | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def __init__(self): | ||
pass | ||
|
||
async def initialize(self, graph, response, profile=False): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docstring + type hints There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type hints will be added in a separate PR. |
||
self.graph = graph | ||
self.header = [] | ||
self.result_set = [] | ||
|
||
# in case of an error an exception will be raised | ||
self._check_for_errors(response) | ||
|
||
if len(response) == 1: | ||
self.parse_statistics(response[0]) | ||
elif profile: | ||
chayim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.parse_profile(response) | ||
else: | ||
# start by parsing statistics, matches the one we have | ||
self.parse_statistics(response[-1]) # Last element. | ||
await self.parse_results(response) | ||
|
||
return self | ||
|
||
async def parse_node(self, cell): | ||
# Node ID (integer), | ||
# [label string offset (integer)], | ||
# [[name, value type, value] X N] | ||
|
||
node_id = int(cell[0]) | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
labels = None | ||
if len(cell[1]) > 0: | ||
labels = [] | ||
for inner_label in cell[1]: | ||
labels.append(await self.graph.get_label(inner_label)) | ||
properties = await self.parse_entity_properties(cell[2]) | ||
return Node(node_id=node_id, label=labels, properties=properties) | ||
|
||
async def parse_scalar(self, cell): | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
scalar_type = int(cell[0]) | ||
value = cell[1] | ||
scalar = None | ||
|
||
if scalar_type == ResultSetScalarTypes.VALUE_NULL: | ||
scalar = None | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_STRING: | ||
scalar = self.parse_string(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_INTEGER: | ||
scalar = int(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_BOOLEAN: | ||
value = value.decode() if isinstance(value, bytes) else value | ||
if value == "true": | ||
scalar = True | ||
elif value == "false": | ||
scalar = False | ||
else: | ||
print("Unknown boolean type\n") | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_DOUBLE: | ||
scalar = float(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_ARRAY: | ||
# array variable is introduced only for readability | ||
scalar = array = value | ||
for i in range(len(array)): | ||
scalar[i] = await self.parse_scalar(array[i]) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_NODE: | ||
scalar = await self.parse_node(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_EDGE: | ||
scalar = await self.parse_edge(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_PATH: | ||
scalar = await self.parse_path(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_MAP: | ||
scalar = await self.parse_map(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_POINT: | ||
scalar = self.parse_point(value) | ||
|
||
elif scalar_type == ResultSetScalarTypes.VALUE_UNKNOWN: | ||
print("Unknown scalar type\n") | ||
|
||
return scalar | ||
|
||
async def parse_records(self, raw_result_set): | ||
records = [] | ||
result_set = raw_result_set[1] | ||
for row in result_set: | ||
chayim marked this conversation as resolved.
Show resolved
Hide resolved
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
record = [] | ||
for idx, cell in enumerate(row): | ||
if self.header[idx][0] == ResultSetColumnTypes.COLUMN_SCALAR: # noqa | ||
record.append(await self.parse_scalar(cell)) | ||
elif self.header[idx][0] == ResultSetColumnTypes.COLUMN_NODE: # noqa | ||
record.append(await self.parse_node(cell)) | ||
elif ( | ||
self.header[idx][0] == ResultSetColumnTypes.COLUMN_RELATION | ||
): # noqa | ||
record.append(await self.parse_edge(cell)) | ||
else: | ||
print("Unknown column type.\n") | ||
records.append(record) | ||
|
||
return records | ||
|
||
async def parse_results(self, raw_result_set): | ||
self.header = self.parse_header(raw_result_set) | ||
|
||
# Empty header. | ||
if len(self.header) == 0: | ||
return | ||
|
||
self.result_set = await self.parse_records(raw_result_set) | ||
|
||
async def parse_entity_properties(self, props): | ||
# [[name, value type, value] X N] | ||
properties = {} | ||
for prop in props: | ||
prop_name = await self.graph.get_property(prop[0]) | ||
prop_value = await self.parse_scalar(prop[1:]) | ||
properties[prop_name] = prop_value | ||
|
||
return properties | ||
|
||
async def parse_edge(self, cell): | ||
# Edge ID (integer), | ||
# reltype string offset (integer), | ||
# src node ID offset (integer), | ||
# dest node ID offset (integer), | ||
# [[name, value, value type] X N] | ||
|
||
edge_id = int(cell[0]) | ||
dvora-h marked this conversation as resolved.
Show resolved
Hide resolved
|
||
relation = await self.graph.get_relation(cell[1]) | ||
src_node_id = int(cell[2]) | ||
dest_node_id = int(cell[3]) | ||
properties = await self.parse_entity_properties(cell[4]) | ||
return Edge( | ||
src_node_id, relation, dest_node_id, edge_id=edge_id, properties=properties | ||
) | ||
|
||
async def parse_path(self, cell): | ||
nodes = await self.parse_scalar(cell[0]) | ||
edges = await self.parse_scalar(cell[1]) | ||
return Path(nodes, edges) | ||
|
||
async def parse_map(self, cell): | ||
m = OrderedDict() | ||
n_entries = len(cell) | ||
|
||
# A map is an array of key value pairs. | ||
# 1. key (string) | ||
# 2. array: (value type, value) | ||
for i in range(0, n_entries, 2): | ||
key = self.parse_string(cell[i]) | ||
m[key] = await self.parse_scalar(cell[i + 1]) | ||
|
||
return m |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docstrings... every function please