diff --git a/docs/examples/test.py b/docs/examples/test.py new file mode 100644 index 0000000000..9cb9a3af1d --- /dev/null +++ b/docs/examples/test.py @@ -0,0 +1,27 @@ +import redis.asyncio as redis +import asyncio +from redis.commands.json.path import Path as Jsonpath + +host = "127.0.0.1" +port = 46379 +tls = False +ttl = 300 + + +async def main(): + r = await redis.RedisCluster(host=host, port=port) + print(f"ping: {await r.ping()}") + + async with r.pipeline() as pipe: + set_json, set_expire = await ( + pipe + .json().set('test:test6', Jsonpath.root_path(), {'test': 'works'}, nx=True) # nx: if key not exists + .expire('test:test6', ttl) + .execute() + ) + assert set_json, "setting key failed" + assert set_expire, "setting expire failed" + print(f"get result: {await r.json().get('test:test6')}") + await r.close() + +asyncio.run(main()) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 8d34b9ad21..8f5b0f48ec 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -10,6 +10,7 @@ Generator, List, Mapping, + MutableMapping, Optional, Type, TypeVar, @@ -25,7 +26,7 @@ parse_url, ) from redis.asyncio.parser import CommandsParser -from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis +from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis, CaseInsensitiveDict from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, PRIMARY, @@ -37,7 +38,7 @@ get_node_name, parse_cluster_slots, ) -from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands +from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands, AsyncRedisModuleCommands from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( AskError, @@ -78,7 +79,7 @@ class ClusterParser(DefaultParser): ) -class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands): +class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands, AsyncRedisModuleCommands): """ Create a new RedisCluster client. @@ -152,6 +153,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand - none of the `host`/`port` & `startup_nodes` were provided """ + response_callbacks: MutableMapping[Union[str, bytes], ResponseCallbackT] @classmethod def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": @@ -298,7 +300,10 @@ def __init__( # Call our on_connect function to configure READONLY mode kwargs["redis_connect_func"] = self.on_connect - kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() + kwargs["cluster_response_callbacks"] = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS) + self.cluster_response_callbacks = CaseInsensitiveDict( + self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS + ) self.connection_kwargs = kwargs if startup_nodes: @@ -324,7 +329,7 @@ def __init__( self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy() self.command_flags = self.__class__.COMMAND_FLAGS.copy() - self.response_callbacks = kwargs["response_callbacks"] + self.cluster_response_callbacks = kwargs["cluster_response_callbacks"] self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy() self.result_callbacks[ "CLUSTER SLOTS" @@ -479,7 +484,7 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]: def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: """Set a custom response callback.""" - self.response_callbacks[command] = callback + self.cluster_response_callbacks[command] = callback async def _determine_nodes( self, command: str, *args: Any, node_flag: Optional[str] = None @@ -809,7 +814,7 @@ def __init__( self.max_connections = max_connections self.connection_class = connection_class self.connection_kwargs = connection_kwargs - self.response_callbacks = connection_kwargs.pop("response_callbacks", {}) + self.response_callbacks = connection_kwargs.pop("cluster_response_callbacks", {}) self._connections: List[Connection] = [] self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) @@ -1206,7 +1211,7 @@ async def close(self, attr: str = "nodes_cache") -> None: ) -class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands): +class ClusterPipeline(RedisCluster): """ Create a new ClusterPipeline object. @@ -1245,9 +1250,21 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm __slots__ = ("_command_stack", "_client") - def __init__(self, client: RedisCluster) -> None: + def __init__( + self, + client: RedisCluster, + nodes_manager=None, + commands_parser=None, + result_callbacks=None, + startup_nodes=None, + read_from_replicas=False, + cluster_error_retry_attempts=5, + reinitialize_steps=10, + lock=None, + **kwargs, + ) -> None: self._client = client - + self.cluster_response_callbacks = self.RESPONSE_CALLBACKS self._command_stack: List["PipelineCommand"] = [] async def initialize(self) -> "ClusterPipeline": diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index 7d55023e1e..2c75dd72c8 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -1,9 +1,11 @@ from json import JSONDecodeError, JSONDecoder, JSONEncoder import redis - +from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster +from ...asyncio.client import Pipeline as AsyncioPipeline +from ...asyncio.cluster import ClusterPipeline as AsyncioClusterPipeline from ..helpers import nativestr -from .commands import JSONCommands +from .commands import JSONCommands, AsyncJSONCommands from .decoders import bulk_of_jsons, decode_list @@ -131,3 +133,75 @@ class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline): class Pipeline(JSONCommands, redis.client.Pipeline): """Pipeline for the module.""" + + +class AsyncJSON(JSON, AsyncJSONCommands): + """ + Create a client for talking to json. + :param decoder: + :type json.JSONDecoder: An instance of json.JSONDecoder + :param encoder: + :type json.JSONEncoder: An instance of json.JSONEncoder + """ + + def _decode(self, obj): + if obj is None: + return obj + + try: + x = self.__decoder__.decode(obj) + if x is None: + raise TypeError + return x + except TypeError: + try: + return self.__decoder__.decode(obj.decode()) + except AttributeError: + return decode_list(obj) + except (AttributeError, JSONDecodeError): + return decode_list(obj) + + def _encode(self, obj): + return self.__encoder__.encode(obj) + + def pipeline(self, transaction=True, shard_hint=None): + """Creates a pipeline for the JSON module, that can be used for executing + JSON commands, as well as classic core commands. + Usage example: + r = redis.Redis() + pipe = r.json().pipeline() + pipe.jsonset('foo', '.', {'hello!': 'world'}) + pipe.jsonget('foo') + pipe.jsonget('notakey') + """ + if isinstance(self.client, AsyncRedisCluster): + p = AsyncioClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + lock=self.client._lock, + ) + + else: + p = AsyncioPipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + p._encode = self._encode + p._decode = self._decode + return p + + +class AsyncClusterPipeline(AsyncJSONCommands, AsyncioClusterPipeline): + """Cluster pipeline for the module.""" + + +class AsyncPipeline(AsyncJSONCommands, AsyncioPipeline): + """Pipeline for the module.""" diff --git a/redis/commands/json/commands.py b/redis/commands/json/commands.py index 9391c2a2c1..2da5a9b326 100644 --- a/redis/commands/json/commands.py +++ b/redis/commands/json/commands.py @@ -385,3 +385,348 @@ def jsonmget(self, *args, **kwargs): ) def jsonset(self, *args, **kwargs): return self.set(*args, **kwargs) + +class AsyncJSONCommands: + """json commands.""" + + async def arrappend( + self, name: str, path: Optional[str] = Path.root_path(), *args: List[JsonType] + ) -> List[Union[int, None]]: + """Append the objects ``args`` to the array under the + ``path` in key ``name``. + For more information see `JSON.ARRAPPEND `_.. + """ # noqa + pieces = [name, str(path)] + for o in args: + pieces.append(self._encode(o)) + return await self.execute_command("JSON.ARRAPPEND", *pieces) + + async def arrindex( + self, + name: str, + path: str, + scalar: int, + start: Optional[int] = 0, + stop: Optional[int] = -1, + ) -> List[Union[int, None]]: + """ + Return the index of ``scalar`` in the JSON array under ``path`` at key + ``name``. + The search can be limited using the optional inclusive ``start`` + and exclusive ``stop`` indices. + For more information see `JSON.ARRINDEX `_. + """ # noqa + return await self.execute_command( + "JSON.ARRINDEX", name, str(path), self._encode(scalar), start, stop + ) + + async def arrinsert( + self, name: str, path: str, index: int, *args: List[JsonType] + ) -> List[Union[int, None]]: + """Insert the objects ``args`` to the array at index ``index`` + under the ``path` in key ``name``. + For more information see `JSON.ARRINSERT `_. + """ # noqa + pieces = [name, str(path), index] + for o in args: + pieces.append(self._encode(o)) + return await self.execute_command("JSON.ARRINSERT", *pieces) + + async def arrlen( + self, name: str, path: Optional[str] = Path.root_path() + ) -> List[Union[int, None]]: + """Return the length of the array JSON value under ``path`` + at key``name``. + For more information see `JSON.ARRLEN `_. + """ # noqa + return await self.execute_command("JSON.ARRLEN", name, str(path)) + + async def arrpop( + self, + name: str, + path: Optional[str] = Path.root_path(), + index: Optional[int] = -1, + ) -> List[Union[str, None]]: + + """Pop the element at ``index`` in the array JSON value under + ``path`` at key ``name``. + For more information see `JSON.ARRPOP `_. + """ # noqa + return await self.execute_command("JSON.ARRPOP", name, str(path), index) + + async def arrtrim( + self, name: str, path: str, start: int, stop: int + ) -> List[Union[int, None]]: + """Trim the array JSON value under ``path`` at key ``name`` to the + inclusive range given by ``start`` and ``stop``. + For more information see `JSON.ARRTRIM `_. + """ # noqa + return await self.execute_command("JSON.ARRTRIM", name, str(path), start, stop) + + async def type(self, name: str, path: Optional[str] = Path.root_path()) -> List[str]: + """Get the type of the JSON value under ``path`` from key ``name``. + For more information see `JSON.TYPE `_. + """ # noqa + return await self.execute_command("JSON.TYPE", name, str(path)) + + async def resp(self, name: str, path: Optional[str] = Path.root_path()) -> List: + """Return the JSON value under ``path`` at key ``name``. + For more information see `JSON.RESP `_. + """ # noqa + return await self.execute_command("JSON.RESP", name, str(path)) + + async def objkeys( + self, name: str, path: Optional[str] = Path.root_path() + ) -> List[Union[List[str], None]]: + """Return the key names in the dictionary JSON value under ``path`` at + key ``name``. + For more information see `JSON.OBJKEYS `_. + """ # noqa + return await self.execute_command("JSON.OBJKEYS", name, str(path)) + + async def objlen(self, name: str, path: Optional[str] = Path.root_path()) -> int: + """Return the length of the dictionary JSON value under ``path`` at key + ``name``. + For more information see `JSON.OBJLEN `_. + """ # noqa + return await self.execute_command("JSON.OBJLEN", name, str(path)) + + async def numincrby(self, name: str, path: str, number: int) -> str: + """Increment the numeric (integer or floating point) JSON value under + ``path`` at key ``name`` by the provided ``number``. + For more information see `JSON.NUMINCRBY `_. + """ # noqa + return await self.execute_command( + "JSON.NUMINCRBY", name, str(path), self._encode(number) + ) + + @deprecated(version="4.0.0", reason="deprecated since redisjson 1.0.0") + async def nummultby(self, name: str, path: str, number: int) -> str: + """Multiply the numeric (integer or floating point) JSON value under + ``path`` at key ``name`` with the provided ``number``. + For more information see `JSON.NUMMULTBY `_. + """ # noqa + return await self.execute_command( + "JSON.NUMMULTBY", name, str(path), self._encode(number) + ) + + async def clear(self, name: str, path: Optional[str] = Path.root_path()) -> int: + """Empty arrays and objects (to have zero slots/keys without deleting the + array/object). + Return the count of cleared paths (ignoring non-array and non-objects + paths). + For more information see `JSON.CLEAR `_. + """ # noqa + return await self.execute_command("JSON.CLEAR", name, str(path)) + + async def delete(self, key: str, path: Optional[str] = Path.root_path()) -> int: + """Delete the JSON value stored at key ``key`` under ``path``. + For more information see `JSON.DEL `_. + """ + return await self.execute_command("JSON.DEL", key, str(path)) + + # forget is an alias for delete + forget = delete + + async def get( + self, name: str, *args, no_escape: Optional[bool] = False + ) -> List[JsonType]: + """ + Get the object stored as a JSON value at key ``name``. + ``args`` is zero or more paths, and defaults to root path + ```no_escape`` is a boolean flag to add no_escape option to get + non-ascii characters + For more information see `JSON.GET `_. + """ # noqa + pieces = [name] + if no_escape: + pieces.append("noescape") + + if len(args) == 0: + pieces.append(Path.root_path()) + + else: + for p in args: + pieces.append(str(p)) + + # Handle case where key doesn't exist. The JSONDecoder would raise a + # TypeError exception since it can't decode None + try: + return await self.execute_command("JSON.GET", *pieces) + except TypeError: + return None + + async def mget(self, keys: List[str], path: str) -> List[JsonType]: + """ + Get the objects stored as a JSON values under ``path``. ``keys`` + is a list of one or more keys. + For more information see `JSON.MGET `_. + """ # noqa + pieces = [] + pieces += keys + pieces.append(str(path)) + return await self.execute_command("JSON.MGET", *pieces) + + async def set( + self, + name: str, + path: str, + obj: JsonType, + nx: Optional[bool] = False, + xx: Optional[bool] = False, + decode_keys: Optional[bool] = False, + ) -> Optional[str]: + """ + Set the JSON value at key ``name`` under the ``path`` to ``obj``. + ``nx`` if set to True, set ``value`` only if it does not exist. + ``xx`` if set to True, set ``value`` only if it exists. + ``decode_keys`` If set to True, the keys of ``obj`` will be decoded + with utf-8. + For the purpose of using this within a pipeline, this command is also + aliased to JSON.SET. + For more information see `JSON.SET `_. + """ + if decode_keys: + obj = decode_dict_keys(obj) + + pieces = [name, str(path), self._encode(obj)] + + # Handle existential modifiers + if nx and xx: + raise Exception( + "nx and xx are mutually exclusive: use one, the " + "other or neither - but not both" + ) + elif nx: + pieces.append("NX") + elif xx: + pieces.append("XX") + return await self.execute_command("JSON.SET", *pieces) + + async def set_file( + self, + name: str, + path: str, + file_name: str, + nx: Optional[bool] = False, + xx: Optional[bool] = False, + decode_keys: Optional[bool] = False, + ) -> Optional[str]: + """ + Set the JSON value at key ``name`` under the ``path`` to the content + of the json file ``file_name``. + ``nx`` if set to True, set ``value`` only if it does not exist. + ``xx`` if set to True, set ``value`` only if it exists. + ``decode_keys`` If set to True, the keys of ``obj`` will be decoded + with utf-8. + """ + + with open(file_name, "r") as fp: + file_content = loads(fp.read()) + + return await self.set(name, path, file_content, nx=nx, xx=xx, decode_keys=decode_keys) + + async def set_path( + self, + json_path: str, + root_folder: str, + nx: Optional[bool] = False, + xx: Optional[bool] = False, + decode_keys: Optional[bool] = False, + ) -> List[Dict[str, bool]]: + """ + Iterate over ``root_folder`` and set each JSON file to a value + under ``json_path`` with the file name as the key. + ``nx`` if set to True, set ``value`` only if it does not exist. + ``xx`` if set to True, set ``value`` only if it exists. + ``decode_keys`` If set to True, the keys of ``obj`` will be decoded + with utf-8. + """ + set_files_result = {} + for root, dirs, files in os.walk(root_folder): + for file in files: + file_path = os.path.join(root, file) + try: + file_name = file_path.rsplit(".")[0] + await self.set_file( + file_name, + json_path, + file_path, + nx=nx, + xx=xx, + decode_keys=decode_keys, + ) + set_files_result[file_path] = True + except JSONDecodeError: + set_files_result[file_path] = False + + return set_files_result + + async def strlen(self, name: str, path: Optional[str] = None) -> List[Union[int, None]]: + """Return the length of the string JSON value under ``path`` at key + ``name``. + For more information see `JSON.STRLEN `_. + """ # noqa + pieces = [name] + if path is not None: + pieces.append(str(path)) + return await self.execute_command("JSON.STRLEN", *pieces) + + async def toggle( + self, name: str, path: Optional[str] = Path.root_path() + ) -> Union[bool, List[Optional[int]]]: + """Toggle boolean value under ``path`` at key ``name``. + returning the new value. + For more information see `JSON.TOGGLE `_. + """ # noqa + return await self.execute_command("JSON.TOGGLE", name, str(path)) + + async def strappend( + self, name: str, value: str, path: Optional[int] = Path.root_path() + ) -> Union[int, List[Optional[int]]]: + """Append to the string JSON value. If two options are specified after + the key name, the path is determined to be the first. If a single + option is passed, then the root_path (i.e Path.root_path()) is used. + For more information see `JSON.STRAPPEND `_. + """ # noqa + pieces = [name, str(path), self._encode(value)] + return await self.execute_command("JSON.STRAPPEND", *pieces) + + async def debug( + self, + subcommand: str, + key: Optional[str] = None, + path: Optional[str] = Path.root_path(), + ) -> Union[int, List[str]]: + """Return the memory usage in bytes of a value under ``path`` from + key ``name``. + For more information see `JSON.DEBUG `_. + """ # noqa + valid_subcommands = ["MEMORY", "HELP"] + if subcommand not in valid_subcommands: + raise DataError("The only valid subcommands are ", str(valid_subcommands)) + pieces = [subcommand] + if subcommand == "MEMORY": + if key is None: + raise DataError("No key specified") + pieces.append(key) + pieces.append(str(path)) + return await self.execute_command("JSON.DEBUG", *pieces) + + @deprecated( + version="4.0.0", reason="redisjson-py supported this, call get directly." + ) + async def jsonget(self, *args, **kwargs): + return await self.get(*args, **kwargs) + + @deprecated( + version="4.0.0", reason="redisjson-py supported this, call get directly." + ) + async def jsonmget(self, *args, **kwargs): + return await self.mget(*args, **kwargs) + + @deprecated( + version="4.0.0", reason="redisjson-py supported this, call get directly." + ) + async def jsonset(self, *args, **kwargs): + return await self.set(*args, **kwargs) \ No newline at end of file diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py index 7e2045a722..3c023ab8db 100644 --- a/redis/commands/redismodules.py +++ b/redis/commands/redismodules.py @@ -101,3 +101,11 @@ def graph(self, index_name="idx"): g = AsyncGraph(client=self, name=index_name) return g + + def json(self, encoder=JSONEncoder(), decoder=JSONDecoder()): + """Access the json namespace, providing support for redis json.""" + + from .json import AsyncJSON + + jj = AsyncJSON(client=self, encoder=encoder, decoder=decoder) + return jj