diff --git a/.travis.yml b/.travis.yml index 95f42b1275..599c9d7dc6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,9 @@ dist: xenial +branches: + only: + - "master" + os: linux language: c diff --git a/src/ngx_http_lua_bodyfilterby.c b/src/ngx_http_lua_bodyfilterby.c index 75608695e1..a85de31689 100644 --- a/src/ngx_http_lua_bodyfilterby.c +++ b/src/ngx_http_lua_bodyfilterby.c @@ -235,18 +235,15 @@ ngx_http_lua_body_filter(ngx_http_request_t *r, ngx_chain_t *in) uint16_t old_context; ngx_http_cleanup_t *cln; ngx_chain_t *out; + ngx_chain_t *cl, *ln; ngx_http_lua_main_conf_t *lmcf; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua body filter for user lua code, uri \"%V\"", &r->uri); - if (in == NULL) { - return ngx_http_next_body_filter(r, in); - } - llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); - if (llcf->body_filter_handler == NULL) { + if (llcf->body_filter_handler == NULL || r->header_only) { dd("no body filter handler found"); return ngx_http_next_body_filter(r, in); } @@ -269,7 +266,48 @@ ngx_http_lua_body_filter(ngx_http_request_t *r, ngx_chain_t *in) in->buf->file_pos = in->buf->file_last; } - return NGX_OK; + in = NULL; + + /* continue to call ngx_http_next_body_filter to process cached data */ + } + + if (in != NULL + && ngx_chain_add_copy(r->pool, &ctx->filter_in_bufs, in) != NGX_OK) { + return NGX_ERROR; + } + + if (ctx->filter_busy_bufs != NULL + && (r->connection->buffered + & (NGX_HTTP_LOWLEVEL_BUFFERED | NGX_LOWLEVEL_BUFFERED))) + { + /* Socket write buffer was full on last write. + * Try to write the remain data, if still can not write + * do not execute body_filter_by_lua otherwise the `in` chain will be + * replaced by new content from lua and buf of `in` mark as consumed. + * And then ngx_output_chain will call the filter chain again which + * make all the data cached in the memory and long ngx_chain_t link + * cause CPU 100%. + */ + rc = ngx_http_next_body_filter(r, NULL); + + if (rc == NGX_ERROR) { + return rc; + } + + out = NULL; + ngx_chain_update_chains(r->pool, + &ctx->free_bufs, &ctx->filter_busy_bufs, &out, + (ngx_buf_tag_t) &ngx_http_lua_module); + if (rc != NGX_OK + && ctx->filter_busy_bufs != NULL + && (r->connection->buffered + & (NGX_HTTP_LOWLEVEL_BUFFERED | NGX_LOWLEVEL_BUFFERED))) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "waiting body filter busy buffer to be sent"); + return NGX_AGAIN; + } + + /* continue to process bufs in ctx->filter_in_bufs */ } if (ctx->cleanup == NULL) { @@ -286,38 +324,57 @@ ngx_http_lua_body_filter(ngx_http_request_t *r, ngx_chain_t *in) old_context = ctx->context; ctx->context = NGX_HTTP_LUA_CONTEXT_BODY_FILTER; - dd("calling body filter handler"); - rc = llcf->body_filter_handler(r, in); + in = ctx->filter_in_bufs; + ctx->filter_in_bufs = NULL; - dd("calling body filter handler returned %d", (int) rc); + if (in != NULL) { + dd("calling body filter handler"); + rc = llcf->body_filter_handler(r, in); - ctx->context = old_context; + dd("calling body filter handler returned %d", (int) rc); - if (rc != NGX_OK) { - return NGX_ERROR; - } + ctx->context = old_context; - lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); - out = lmcf->body_filter_chain; + if (rc != NGX_OK) { + return NGX_ERROR; + } - if (in == out) { - return ngx_http_next_body_filter(r, in); - } + lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); + + /* lmcf->body_filter_chain is the new buffer chain if + * body_filter_by_lua set new body content via ngx.arg[1] = new_content + * otherwise it is the original `in` buffer chain. + */ + out = lmcf->body_filter_chain; + + if (in != out) { + /* content of body was replaced in + * ngx_http_lua_body_filter_param_set and the buffers was marked + * as consumed. + */ + for (cl = in; cl != NULL; cl = ln) { + ln = cl->next; + ngx_free_chain(r->pool, cl); + } - if (out == NULL) { - /* do not forward NULL to the next filters because the input is - * not NULL */ - return NGX_OK; + if (out == NULL) { + /* do not forward NULL to the next filters because the input is + * not NULL */ + return NGX_OK; + } + } + + } else { + out = NULL; } - /* in != out */ rc = ngx_http_next_body_filter(r, out); if (rc == NGX_ERROR) { return NGX_ERROR; } ngx_chain_update_chains(r->pool, - &ctx->free_bufs, &ctx->busy_bufs, &out, + &ctx->free_bufs, &ctx->filter_busy_bufs, &out, (ngx_buf_tag_t) &ngx_http_lua_module); return rc; diff --git a/src/ngx_http_lua_common.h b/src/ngx_http_lua_common.h index 027ff71044..b3b6f59a0f 100644 --- a/src/ngx_http_lua_common.h +++ b/src/ngx_http_lua_common.h @@ -548,6 +548,9 @@ typedef struct ngx_http_lua_ctx_s { ngx_chain_t *busy_bufs; ngx_chain_t *free_recv_bufs; + ngx_chain_t *filter_in_bufs; /* for the body filter */ + ngx_chain_t *filter_busy_bufs; /* for the body filter */ + ngx_http_cleanup_pt *cleanup; ngx_http_cleanup_t *free_cleanup; /* free list of cleanup records */ diff --git a/src/ngx_http_lua_output.c b/src/ngx_http_lua_output.c index c0aadffb4f..8d8c71d89e 100644 --- a/src/ngx_http_lua_output.c +++ b/src/ngx_http_lua_output.c @@ -244,6 +244,11 @@ ngx_http_lua_ngx_echo(lua_State *L, unsigned newline) return 2; } + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "%s has %sbusy bufs", + newline ? "lua say response" : "lua print response", + ctx->busy_bufs != NULL ? "" : "no "); + dd("downstream write: %d, buf len: %d", (int) rc, (int) (b->last - b->pos)); diff --git a/t/082-body-filter-2.t b/t/082-body-filter-2.t new file mode 100644 index 0000000000..27baf400c2 --- /dev/null +++ b/t/082-body-filter-2.t @@ -0,0 +1,55 @@ +# vim:set ft= ts=4 sw=4 et fdm=marker: + +BEGIN { + $ENV{TEST_NGINX_POSTPONE_OUTPUT} = 1; + $ENV{TEST_NGINX_EVENT_TYPE} = 'poll'; + $ENV{MOCKEAGAIN}='w' +} + +use Test::Nginx::Socket::Lua; + +#worker_connections(1014); +#master_process_enabled(1); +#log_level('warn'); + +log_level('debug'); + +repeat_each(2); + +plan tests => repeat_each() * (blocks() * 5); + +#no_diff(); +no_long_string(); + +run_tests(); + +__DATA__ + +=== TEST 1: check ctx->busy_bufs +--- config + location /t { + postpone_output 1; + content_by_lua_block { + for i = 1, 5 do + ngx.say(i, ": Hello World!") + end + } + + body_filter_by_lua_block { + ngx.arg[1] = ngx.arg[1] + } + } +--- request +GET /t +--- response_body +1: Hello World! +2: Hello World! +3: Hello World! +4: Hello World! +5: Hello World! + +--- error_log +waiting body filter busy buffer to be sent +lua say response has busy bufs +--- no_error_log +[error]