Skip to content

support json() +with pipeline in asyncio.cluster mode #2423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/examples/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import redis.asyncio as redis
import asyncio
from redis.commands.json.path import Path as Jsonpath

host = "127.0.0.1"
port = 46379
tls = False
ttl = 300


async def main():
r = await redis.RedisCluster(host=host, port=port)
print(f"ping: {await r.ping()}")

async with r.pipeline() as pipe:
set_json, set_expire = await (
pipe
.json().set('test:test6', Jsonpath.root_path(), {'test': 'works'}, nx=True) # nx: if key not exists
.expire('test:test6', ttl)
.execute()
)
assert set_json, "setting key failed"
assert set_expire, "setting expire failed"
print(f"get result: {await r.json().get('test:test6')}")
await r.close()

asyncio.run(main())
37 changes: 27 additions & 10 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Generator,
List,
Mapping,
MutableMapping,
Optional,
Type,
TypeVar,
Expand All @@ -25,7 +26,7 @@
parse_url,
)
from redis.asyncio.parser import CommandsParser
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis, CaseInsensitiveDict
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
PRIMARY,
Expand All @@ -37,7 +38,7 @@
get_node_name,
parse_cluster_slots,
)
from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands, AsyncRedisModuleCommands
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
Expand Down Expand Up @@ -78,7 +79,7 @@ class ClusterParser(DefaultParser):
)


class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands, AsyncRedisModuleCommands):
"""
Create a new RedisCluster client.

Expand Down Expand Up @@ -152,6 +153,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
- none of the `host`/`port` & `startup_nodes` were provided

"""
response_callbacks: MutableMapping[Union[str, bytes], ResponseCallbackT]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is defined but never initialized?


@classmethod
def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
Expand Down Expand Up @@ -298,7 +300,10 @@ def __init__(
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect

kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
kwargs["cluster_response_callbacks"] = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of renaming this variable? Also, what is the advantage of using CaseInsensitiveDict? It is significantly slower that a normal dict due to the case conversion for each call.

self.cluster_response_callbacks = CaseInsensitiveDict(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will always get overwritten on line 332.

self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
)
self.connection_kwargs = kwargs

if startup_nodes:
Expand All @@ -324,7 +329,7 @@ def __init__(
self.commands_parser = CommandsParser()
self.node_flags = self.__class__.NODE_FLAGS.copy()
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
self.response_callbacks = kwargs["response_callbacks"]
self.cluster_response_callbacks = kwargs["cluster_response_callbacks"]
self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy()
self.result_callbacks[
"CLUSTER SLOTS"
Expand Down Expand Up @@ -479,7 +484,7 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:

def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
"""Set a custom response callback."""
self.response_callbacks[command] = callback
self.cluster_response_callbacks[command] = callback

async def _determine_nodes(
self, command: str, *args: Any, node_flag: Optional[str] = None
Expand Down Expand Up @@ -809,7 +814,7 @@ def __init__(
self.max_connections = max_connections
self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.response_callbacks = connection_kwargs.pop("response_callbacks", {})
self.response_callbacks = connection_kwargs.pop("cluster_response_callbacks", {})

self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
Expand Down Expand Up @@ -1206,7 +1211,7 @@ async def close(self, attr: str = "nodes_cache") -> None:
)


class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
class ClusterPipeline(RedisCluster):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the bases for this class changed? Inheriting directly from the RedisCluster class could lead to collisions in the namespace

"""
Create a new ClusterPipeline object.

Expand Down Expand Up @@ -1245,9 +1250,21 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm

__slots__ = ("_command_stack", "_client")

def __init__(self, client: RedisCluster) -> None:
def __init__(
self,
client: RedisCluster,
nodes_manager=None,
Copy link
Contributor

@utkarshgupta137 utkarshgupta137 Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new kwargs are not used anywhere?

commands_parser=None,
result_callbacks=None,
startup_nodes=None,
read_from_replicas=False,
cluster_error_retry_attempts=5,
reinitialize_steps=10,
lock=None,
**kwargs,
) -> None:
self._client = client

self.cluster_response_callbacks = self.RESPONSE_CALLBACKS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used anywhere?

self._command_stack: List["PipelineCommand"] = []

async def initialize(self) -> "ClusterPipeline":
Expand Down
78 changes: 76 additions & 2 deletions redis/commands/json/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from json import JSONDecodeError, JSONDecoder, JSONEncoder

import redis

from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
from ...asyncio.client import Pipeline as AsyncioPipeline
from ...asyncio.cluster import ClusterPipeline as AsyncioClusterPipeline
from ..helpers import nativestr
from .commands import JSONCommands
from .commands import JSONCommands, AsyncJSONCommands
from .decoders import bulk_of_jsons, decode_list


Expand Down Expand Up @@ -131,3 +133,75 @@ class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline):

class Pipeline(JSONCommands, redis.client.Pipeline):
"""Pipeline for the module."""


class AsyncJSON(JSON, AsyncJSONCommands):
"""
Create a client for talking to json.
:param decoder:
:type json.JSONDecoder: An instance of json.JSONDecoder
:param encoder:
:type json.JSONEncoder: An instance of json.JSONEncoder
"""

def _decode(self, obj):
if obj is None:
return obj

try:
x = self.__decoder__.decode(obj)
if x is None:
raise TypeError
return x
except TypeError:
try:
return self.__decoder__.decode(obj.decode())
except AttributeError:
return decode_list(obj)
except (AttributeError, JSONDecodeError):
return decode_list(obj)

def _encode(self, obj):
return self.__encoder__.encode(obj)

def pipeline(self, transaction=True, shard_hint=None):
"""Creates a pipeline for the JSON module, that can be used for executing
JSON commands, as well as classic core commands.
Usage example:
r = redis.Redis()
pipe = r.json().pipeline()
pipe.jsonset('foo', '.', {'hello!': 'world'})
pipe.jsonget('foo')
pipe.jsonget('notakey')
"""
if isinstance(self.client, AsyncRedisCluster):
p = AsyncioClusterPipeline(
nodes_manager=self.client.nodes_manager,
commands_parser=self.client.commands_parser,
startup_nodes=self.client.nodes_manager.startup_nodes,
result_callbacks=self.client.result_callbacks,
cluster_response_callbacks=self.client.cluster_response_callbacks,
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
read_from_replicas=self.client.read_from_replicas,
reinitialize_steps=self.client.reinitialize_steps,
lock=self.client._lock,
)

else:
p = AsyncioPipeline(
connection_pool=self.client.connection_pool,
response_callbacks=self.MODULE_CALLBACKS,
transaction=transaction,
shard_hint=shard_hint,
)
p._encode = self._encode
p._decode = self._decode
return p


class AsyncClusterPipeline(AsyncJSONCommands, AsyncioClusterPipeline):
"""Cluster pipeline for the module."""


class AsyncPipeline(AsyncJSONCommands, AsyncioPipeline):
"""Pipeline for the module."""
Loading