Skip to content

Commit 3323079

Browse files
authored
bugfix: handle mixture of read_reply and other commands. (#195)
1 parent 057093e commit 3323079

File tree

2 files changed

+130
-1
lines changed

2 files changed

+130
-1
lines changed

lib/resty/redis.lua

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
local sub = string.sub
55
local byte = string.byte
6+
local tab_insert = table.insert
7+
local tab_remove = table.remove
68
local tcp = ngx.socket.tcp
79
local null = ngx.null
810
local type = type
@@ -306,6 +308,12 @@ local function _gen_req(args)
306308
end
307309

308310

311+
local function _check_msg(self, res)
312+
return rawget(self, "_subscribed") and
313+
type(res) == "table" and res[1] == "message"
314+
end
315+
316+
309317
local function _do_cmd(self, ...)
310318
local args = {...}
311319

@@ -329,7 +337,17 @@ local function _do_cmd(self, ...)
329337
return nil, err
330338
end
331339

332-
return _read_reply(self, sock)
340+
local res, err = _read_reply(self, sock)
341+
while _check_msg(self, res) do
342+
if rawget(self, "_buffered_msg") == nil then
343+
self._buffered_msg = new_tab(1, 0)
344+
end
345+
346+
tab_insert(self._buffered_msg, res)
347+
res, err = _read_reply(self, sock)
348+
end
349+
350+
return res, err
333351
end
334352

335353

@@ -339,6 +357,8 @@ local function _check_subscribed(self, res)
339357
and res[3] == 0
340358
then
341359
self._subscribed = false
360+
-- FIXME: support multiple subscriptions in the next PR
361+
self._buffered_msg = nil
342362
end
343363
end
344364

@@ -353,6 +373,18 @@ function _M.read_reply(self)
353373
return nil, "not subscribed"
354374
end
355375

376+
local buffered_msg = rawget(self, "_buffered_msg")
377+
if buffered_msg then
378+
local msg = buffered_msg[1]
379+
tab_remove(buffered_msg, 1)
380+
381+
if #buffered_msg == 0 then
382+
self._buffered_msg = nil
383+
end
384+
385+
return msg
386+
end
387+
356388
local res, err = _read_reply(self, sock)
357389
_check_subscribed(self, res)
358390

t/pubsub.t

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,3 +556,100 @@ GET /t
556556

557557
--- no_error_log
558558
[error]
559+
560+
561+
562+
=== TEST 8: mix read_reply and other commands
563+
--- http_config eval: $::HttpConfig
564+
--- config
565+
location /t {
566+
lua_socket_log_errors off;
567+
content_by_lua '
568+
local cjson = require "cjson"
569+
local redis = require "resty.redis"
570+
571+
local red = redis:new()
572+
local red2 = redis:new()
573+
574+
red:set_timeout(1000) -- 1 sec
575+
red2:set_timeout(1000) -- 1 sec
576+
577+
local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
578+
if not ok then
579+
ngx.say("1: failed to connect: ", err)
580+
return
581+
end
582+
583+
ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
584+
if not ok then
585+
ngx.say("2: failed to connect: ", err)
586+
return
587+
end
588+
589+
res, err = red:subscribe("dog")
590+
if not res then
591+
ngx.say("1: failed to subscribe: ", err)
592+
return
593+
end
594+
595+
res, err = red2:publish("dog", "Hello")
596+
if not res then
597+
ngx.say("2: failed to publish: ", err)
598+
return
599+
end
600+
601+
res, err = red:ping()
602+
if not res then
603+
ngx.say("1: failed to subscribe: ", err)
604+
return
605+
end
606+
607+
res, err = red2:publish("dog", "World")
608+
if not res then
609+
ngx.say("2: failed to publish: ", err)
610+
return
611+
end
612+
613+
res, err = red:read_reply()
614+
if not res then
615+
ngx.say("1: failed to read reply: ", err)
616+
else
617+
ngx.say("1: receive: ", cjson.encode(res))
618+
end
619+
620+
res, err = red:read_reply()
621+
if not res then
622+
ngx.say("1: failed to read reply: ", err)
623+
else
624+
ngx.say("1: receive: ", cjson.encode(res))
625+
end
626+
627+
res, err = red:unsubscribe()
628+
if not res then
629+
ngx.say("1: failed to unscribe: ", err)
630+
else
631+
ngx.say("1: unsubscribe: ", cjson.encode(res))
632+
end
633+
634+
red:set_timeout(1) -- 1s
635+
res, err = red:read_reply()
636+
if not res then
637+
ngx.say("1: failed to read reply: ", err)
638+
else
639+
ngx.say("1: receive: ", cjson.encode(res))
640+
end
641+
642+
red:close()
643+
red2:close()
644+
';
645+
}
646+
--- request
647+
GET /t
648+
--- response_body_like chop
649+
1: receive: \["message","dog","Hello"\]
650+
1: receive: \["message","dog","World"\]
651+
1: unsubscribe: \["unsubscribe","dog",0\]
652+
1: failed to read reply: not subscribed$
653+
654+
--- no_error_log
655+
[error]

0 commit comments

Comments
 (0)