Skip to content

Commit f260efe

Browse files
committed
resolve conflict
Signed-off-by: wiseaidev <[email protected]>
2 parents 3b07e3f + a00a68b commit f260efe

15 files changed

+495
-709
lines changed

aredis_om/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from .async_redis import redis # isort:skip
12
from .checks import has_redis_json, has_redisearch
23
from .connections import get_redis_connection
34
from .model.migrations.migrator import MigrationError, Migrator

aredis_om/async_redis.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from redis import asyncio as redis

aredis_om/connections.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
import os
22

3-
import aioredis
3+
from . import redis
44

55

66
URL = os.environ.get("REDIS_OM_URL", None)
77

88

9-
def get_redis_connection(**kwargs) -> aioredis.Redis:
9+
def get_redis_connection(**kwargs) -> redis.Redis:
1010
# If someone passed in a 'url' parameter, or specified a REDIS_OM_URL
1111
# environment variable, we'll create the Redis client from the URL.
1212
url = kwargs.pop("url", URL)
1313
if url:
14-
return aioredis.Redis.from_url(url, **kwargs)
14+
return redis.Redis.from_url(url, **kwargs)
1515

1616
# Decode from UTF-8 by default
1717
if "decode_responses" not in kwargs:
1818
kwargs["decode_responses"] = True
19-
return aioredis.Redis(**kwargs)
19+
return redis.Redis(**kwargs)

aredis_om/model/migrations/migrator.py

+20-19
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from enum import Enum
55
from typing import List, Optional
66

7-
from aioredis import Redis, ResponseError
7+
from ... import redis
88

99

1010
log = logging.getLogger(__name__)
@@ -39,18 +39,19 @@ def schema_hash_key(index_name):
3939
return f"{index_name}:hash"
4040

4141

42-
async def create_index(redis: Redis, index_name, schema, current_hash):
43-
db_number = redis.connection_pool.connection_kwargs.get("db")
42+
async def create_index(conn: redis.Redis, index_name, schema, current_hash):
43+
db_number = conn.connection_pool.connection_kwargs.get("db")
4444
if db_number and db_number > 0:
4545
raise MigrationError(
4646
"Creating search indexes is only supported in database 0. "
4747
f"You attempted to create an index in database {db_number}"
4848
)
4949
try:
50-
await redis.execute_command(f"ft.info {index_name}")
51-
except ResponseError:
52-
await redis.execute_command(f"ft.create {index_name} {schema}")
53-
await redis.set(schema_hash_key(index_name), current_hash)
50+
await conn.execute_command(f"ft.info {index_name}")
51+
except redis.ResponseError:
52+
await conn.execute_command(f"ft.create {index_name} {schema}")
53+
# TODO: remove "type: ignore" when type stubs will be fixed
54+
await conn.set(schema_hash_key(index_name), current_hash) # type: ignore
5455
else:
5556
log.info("Index already exists, skipping. Index hash: %s", index_name)
5657

@@ -67,7 +68,7 @@ class IndexMigration:
6768
schema: str
6869
hash: str
6970
action: MigrationAction
70-
redis: Redis
71+
conn: redis.Redis
7172
previous_hash: Optional[str] = None
7273

7374
async def run(self):
@@ -78,14 +79,14 @@ async def run(self):
7879

7980
async def create(self):
8081
try:
81-
await create_index(self.redis, self.index_name, self.schema, self.hash)
82-
except ResponseError:
82+
await create_index(self.conn, self.index_name, self.schema, self.hash)
83+
except redis.ResponseError:
8384
log.info("Index already exists: %s", self.index_name)
8485

8586
async def drop(self):
8687
try:
87-
await self.redis.execute_command(f"FT.DROPINDEX {self.index_name}")
88-
except ResponseError:
88+
await self.conn.execute_command(f"FT.DROPINDEX {self.index_name}")
89+
except redis.ResponseError:
8990
log.info("Index does not exist: %s", self.index_name)
9091

9192

@@ -105,7 +106,7 @@ async def detect_migrations(self):
105106

106107
for name, cls in model_registry.items():
107108
hash_key = schema_hash_key(cls.Meta.index_name)
108-
redis = cls.db()
109+
conn = cls.db()
109110
try:
110111
schema = cls.redisearch_schema()
111112
except NotImplementedError:
@@ -114,21 +115,21 @@ async def detect_migrations(self):
114115
current_hash = hashlib.sha1(schema.encode("utf-8")).hexdigest() # nosec
115116

116117
try:
117-
await redis.execute_command("ft.info", cls.Meta.index_name)
118-
except ResponseError:
118+
await conn.execute_command("ft.info", cls.Meta.index_name)
119+
except redis.ResponseError:
119120
self.migrations.append(
120121
IndexMigration(
121122
name,
122123
cls.Meta.index_name,
123124
schema,
124125
current_hash,
125126
MigrationAction.CREATE,
126-
redis,
127+
conn,
127128
)
128129
)
129130
continue
130131

131-
stored_hash = await redis.get(hash_key)
132+
stored_hash = await conn.get(hash_key)
132133
schema_out_of_date = current_hash != stored_hash
133134

134135
if schema_out_of_date:
@@ -140,7 +141,7 @@ async def detect_migrations(self):
140141
schema,
141142
current_hash,
142143
MigrationAction.DROP,
143-
redis,
144+
conn,
144145
stored_hash,
145146
)
146147
)
@@ -151,7 +152,7 @@ async def detect_migrations(self):
151152
schema,
152153
current_hash,
153154
MigrationAction.CREATE,
154-
redis,
155+
conn,
155156
stored_hash,
156157
)
157158
)

aredis_om/model/model.py

+59-31
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424
no_type_check,
2525
)
2626

27-
import aioredis
28-
from aioredis.client import Pipeline
27+
from more_itertools import ichunked
2928
from pydantic import BaseModel, validator
3029
from pydantic.fields import FieldInfo as PydanticFieldInfo
3130
from pydantic.fields import ModelField, Undefined, UndefinedType
@@ -35,9 +34,10 @@
3534
from typing_extensions import Protocol, get_args, get_origin
3635
from ulid import ULID
3736

37+
from .. import redis
3838
from ..checks import has_redis_json, has_redisearch
3939
from ..connections import get_redis_connection
40-
from ..unasync_util import ASYNC_MODE
40+
from ..util import ASYNC_MODE
4141
from .encoders import jsonable_encoder
4242
from .render_tree import render_tree
4343
from .token_escaper import TokenEscaper
@@ -760,6 +760,9 @@ async def all(self, batch_size=DEFAULT_PAGE_SIZE):
760760
return await query.execute()
761761
return await self.execute()
762762

763+
async def page(self, offset=0, limit=10):
764+
return await self.copy(offset=offset, limit=limit).execute()
765+
763766
def sort_by(self, *fields: str):
764767
if not fields:
765768
return self
@@ -975,7 +978,7 @@ class BaseMeta(Protocol):
975978
global_key_prefix: str
976979
model_key_prefix: str
977980
primary_key_pattern: str
978-
database: aioredis.Redis
981+
database: redis.Redis
979982
primary_key: PrimaryKey
980983
primary_key_creator_cls: Type[PrimaryKeyCreator]
981984
index_name: str
@@ -994,7 +997,7 @@ class DefaultMeta:
994997
global_key_prefix: Optional[str] = None
995998
model_key_prefix: Optional[str] = None
996999
primary_key_pattern: Optional[str] = None
997-
database: Optional[aioredis.Redis] = None
1000+
database: Optional[redis.Redis] = None
9981001
primary_key: Optional[PrimaryKey] = None
9991002
primary_key_creator_cls: Optional[Type[PrimaryKeyCreator]] = None
10001003
index_name: Optional[str] = None
@@ -1115,9 +1118,17 @@ def key(self):
11151118
return self.make_primary_key(pk)
11161119

11171120
@classmethod
1118-
async def delete(cls, pk: Any) -> int:
1121+
async def _delete(cls, db, *pks):
1122+
return await db.delete(*pks)
1123+
1124+
@classmethod
1125+
async def delete(
1126+
cls, pk: Any, pipeline: Optional[redis.client.Pipeline] = None
1127+
) -> int:
11191128
"""Delete data at this key."""
1120-
return await cls.db().delete(cls.make_primary_key(pk))
1129+
db = cls._get_db(pipeline)
1130+
1131+
return await cls._delete(db, cls.make_primary_key(pk))
11211132

11221133
@classmethod
11231134
async def get(cls, pk: Any) -> "RedisModel":
@@ -1127,14 +1138,15 @@ async def update(self, **field_values):
11271138
"""Update this model instance with the specified key-value pairs."""
11281139
raise NotImplementedError
11291140

1130-
async def save(self, pipeline: Optional[Pipeline] = None) -> "RedisModel":
1141+
async def save(
1142+
self, pipeline: Optional[redis.client.Pipeline] = None
1143+
) -> "RedisModel":
11311144
raise NotImplementedError
11321145

1133-
async def expire(self, num_seconds: int, pipeline: Optional[Pipeline] = None):
1134-
if pipeline is None:
1135-
db = self.db()
1136-
else:
1137-
db = pipeline
1146+
async def expire(
1147+
self, num_seconds: int, pipeline: Optional[redis.client.Pipeline] = None
1148+
):
1149+
db = self._get_db(pipeline)
11381150

11391151
# TODO: Wrap any Redis response errors in a custom exception?
11401152
await db.expire(self.make_primary_key(self.pk), num_seconds)
@@ -1223,19 +1235,10 @@ def get_annotations(cls):
12231235
async def add(
12241236
cls,
12251237
models: Sequence["RedisModel"],
1226-
pipeline: Optional[Pipeline] = None,
1238+
pipeline: Optional[redis.client.Pipeline] = None,
12271239
pipeline_verifier: Callable[..., Any] = verify_pipeline_response,
12281240
) -> Sequence["RedisModel"]:
1229-
if pipeline is None:
1230-
# By default, send commands in a pipeline. Saving each model will
1231-
# be atomic, but Redis may process other commands in between
1232-
# these saves.
1233-
db = cls.db().pipeline(transaction=False)
1234-
else:
1235-
# If the user gave us a pipeline, add our commands to that. The user
1236-
# will be responsible for executing the pipeline after they've accumulated
1237-
# the commands they want to send.
1238-
db = pipeline
1241+
db = cls._get_db(pipeline, bulk=True)
12391242

12401243
for model in models:
12411244
# save() just returns the model, we don't need that here.
@@ -1249,6 +1252,31 @@ async def add(
12491252

12501253
return models
12511254

1255+
@classmethod
1256+
def _get_db(
1257+
self, pipeline: Optional[redis.client.Pipeline] = None, bulk: bool = False
1258+
):
1259+
if pipeline is not None:
1260+
return pipeline
1261+
elif bulk:
1262+
return self.db().pipeline(transaction=False)
1263+
else:
1264+
return self.db()
1265+
1266+
@classmethod
1267+
async def delete_many(
1268+
cls,
1269+
models: Sequence["RedisModel"],
1270+
pipeline: Optional[redis.client.Pipeline] = None,
1271+
) -> int:
1272+
db = cls._get_db(pipeline)
1273+
1274+
for chunk in ichunked(models, 100):
1275+
pks = [cls.make_primary_key(model.pk) for model in chunk]
1276+
await cls._delete(db, *pks)
1277+
1278+
return len(models)
1279+
12521280
@classmethod
12531281
def redisearch_schema(cls):
12541282
raise NotImplementedError
@@ -1283,11 +1311,11 @@ def __init_subclass__(cls, **kwargs):
12831311
f"HashModels cannot index dataclass fields. Field: {name}"
12841312
)
12851313

1286-
def dict(self) -> Dict[str, Any]:
1314+
def dict(self) -> Dict[str, Any]: # type: ignore
12871315
# restore none values
12881316
return dict(self)
12891317

1290-
async def save(self, pipeline: Optional[Pipeline] = None) -> "HashModel":
1318+
async def save(self, pipeline: Optional[redis.client.Pipeline] = None) -> "HashModel":
12911319
self.check()
12921320
if pipeline is None:
12931321
db = self.db()
@@ -1461,12 +1489,12 @@ def __init__(self, *args, **kwargs):
14611489
)
14621490
super().__init__(*args, **kwargs)
14631491

1464-
async def save(self, pipeline: Optional[Pipeline] = None) -> "JsonModel":
1492+
async def save(
1493+
self, pipeline: Optional[redis.client.Pipeline] = None
1494+
) -> "JsonModel":
14651495
self.check()
1466-
if pipeline is None:
1467-
db = self.db()
1468-
else:
1469-
db = pipeline
1496+
db = self._get_db(pipeline)
1497+
14701498
# TODO: Wrap response errors in a custom exception?
14711499
await db.execute_command("JSON.SET", self.key(), ".", self.json())
14721500
return self

aredis_om/sync_redis.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
import redis

aredis_om/unasync_util.py

-41
This file was deleted.

aredis_om/util.py

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import inspect
2+
3+
4+
def is_async_mode():
5+
async def f():
6+
        """Unasync transforms async functions into sync functions"""
7+
return None
8+
9+
return inspect.iscoroutinefunction(f)
10+
11+
12+
ASYNC_MODE = is_async_mode()

0 commit comments

Comments
 (0)