diff --git a/apisix/discovery/nacos/init.lua b/apisix/discovery/nacos/init.lua index d850d06cced8..86e9d4125a83 100644 --- a/apisix/discovery/nacos/init.lua +++ b/apisix/discovery/nacos/init.lua @@ -19,7 +19,6 @@ local require = require local local_conf = require('apisix.core.config_local').local_conf() local http = require('resty.http') local core = require('apisix.core') -local connection_util = require("apisix.utils.connection-util") local ipairs = ipairs local type = type local math = math @@ -90,9 +89,6 @@ local function request(request_uri, path, body, method, basic_auth) body = body, ssl_verify = true, }) - - connection_util.close_http_connection(httpc) - if not res then return nil, err end diff --git a/apisix/plugins/authz-casdoor.lua b/apisix/plugins/authz-casdoor.lua index 2cc3247515f1..c496ab6b1b45 100644 --- a/apisix/plugins/authz-casdoor.lua +++ b/apisix/plugins/authz-casdoor.lua @@ -15,7 +15,6 @@ -- limitations under the License. -- local core = require("apisix.core") -local connection_util = require("apisix.utils.connection-util") local http = require("resty.http") local session = require("resty.session") local ngx = ngx @@ -61,7 +60,7 @@ local function fetch_access_token(code, conf) ["Content-Type"] = "application/x-www-form-urlencoded" } }) - connection_util.close_http_connection(client) + if not res then return nil, nil, err end diff --git a/apisix/plugins/batch-requests.lua b/apisix/plugins/batch-requests.lua index 6a538f3fa4e4..53a7b9f87f2f 100644 --- a/apisix/plugins/batch-requests.lua +++ b/apisix/plugins/batch-requests.lua @@ -17,7 +17,6 @@ local core = require("apisix.core") local http = require("resty.http") local plugin = require("apisix.plugin") -local connection_util = require("apisix.utils.connection-util") local ngx = ngx local ipairs = ipairs local pairs = pairs @@ -250,6 +249,7 @@ local function batch_requests(ctx) httpc:set_timeout(data.timeout) local ok, err = httpc:connect("127.0.0.1", ngx.var.server_port) if not ok then + httpc:close() return 500, {error_msg = "connect to apisix failed: " .. err} end @@ -259,7 +259,7 @@ local function batch_requests(ctx) local responses, err = httpc:request_pipeline(data.pipeline) if not responses then - connection_util.close_http_connection(httpc) + httpc:close() return 400, {error_msg = "request failed: " .. err} end @@ -288,7 +288,7 @@ local function batch_requests(ctx) end core.table.insert(aggregated_resp, sub_resp) end - connection_util.close_http_connection(httpc) + httpc:close() return 200, aggregated_resp end diff --git a/apisix/plugins/cas-auth.lua b/apisix/plugins/cas-auth.lua index 64becb07f0de..d949636f5c75 100644 --- a/apisix/plugins/cas-auth.lua +++ b/apisix/plugins/cas-auth.lua @@ -16,7 +16,6 @@ ---- local core = require("apisix.core") local http = require("resty.http") -local connection_util = require("apisix.utils.connection-util") local ngx = ngx local ngx_re_match = ngx.re.match @@ -131,8 +130,6 @@ local function validate(conf, ctx, ticket) core.log.error("validate ticket failed: status=", (res and res.status), ", has_body=", (res and res.body ~= nil or false), ", err=", err) end - - connection_util.close_http_connection(httpc) return nil end diff --git a/apisix/plugins/elasticsearch-logger.lua b/apisix/plugins/elasticsearch-logger.lua index 3db5a0914546..2b126c434b3e 100644 --- a/apisix/plugins/elasticsearch-logger.lua +++ b/apisix/plugins/elasticsearch-logger.lua @@ -18,7 +18,6 @@ local core = require("apisix.core") local http = require("resty.http") local log_util = require("apisix.utils.log-util") -local connection_util = require("apisix.utils.connection-util") local bp_manager_mod = require("apisix.utils.batch-processor-manager") local ngx = ngx @@ -179,8 +178,6 @@ local function send_to_elasticsearch(conf, entries) headers = headers, body = body }) - - connection_util.close_http_connection(httpc) if not resp then return false, err end diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua index 9917ade5daf0..88eca65ca04e 100644 --- a/apisix/plugins/error-log-logger.lua +++ b/apisix/plugins/error-log-logger.lua @@ -18,7 +18,6 @@ local core = require("apisix.core") local errlog = require("ngx.errlog") local batch_processor = require("apisix.utils.batch-processor") -local connection_util = require("apisix.utils.connection-util") local plugin = require("apisix.plugin") local timers = require("apisix.timers") local http = require("resty.http") @@ -274,7 +273,6 @@ local function send_to_skywalking(log_message) } ) - connection_util.close_http_connection(httpc) if not httpc_res then return false, "error while sending data to skywalking[" .. config.skywalking.endpoint_addr .. "] " .. httpc_err @@ -326,8 +324,6 @@ local function send_to_clickhouse(log_message) } ) - connection_util.close_http_connection(httpc) - if not httpc_res then return false, "error while sending data to clickhouse[" .. config.clickhouse.endpoint_addr .. "] " .. httpc_err diff --git a/apisix/plugins/google-cloud-logging.lua b/apisix/plugins/google-cloud-logging.lua index bd2b9d854b7b..74360e9b3984 100644 --- a/apisix/plugins/google-cloud-logging.lua +++ b/apisix/plugins/google-cloud-logging.lua @@ -19,7 +19,6 @@ local core = require("apisix.core") local tostring = tostring local http = require("resty.http") local log_util = require("apisix.utils.log-util") -local connection_util = require("apisix.utils.connection-util") local bp_manager_mod = require("apisix.utils.batch-processor-manager") local google_oauth = require("apisix.plugins.google-cloud-logging.oauth") @@ -127,7 +126,6 @@ local function send_to_google(oauth, entries) }, }) - connection_util.close_http_connection(http_new) if not res then return nil, "failed to write log to google, " .. err end diff --git a/apisix/plugins/google-cloud-logging/oauth.lua b/apisix/plugins/google-cloud-logging/oauth.lua index 2b0c11feff90..9913f8e0bf2d 100644 --- a/apisix/plugins/google-cloud-logging/oauth.lua +++ b/apisix/plugins/google-cloud-logging/oauth.lua @@ -16,7 +16,6 @@ -- local core = require("apisix.core") -local connection_util = require("apisix.utils.connection-util") local type = type local setmetatable = setmetatable @@ -59,7 +58,6 @@ function _M:refresh_access_token() }, }) - connection_util.close_http_connection(http_new) if not res then core.log.error("failed to refresh google oauth access token, ", err) diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua index f7522e66667c..33f028420c58 100644 --- a/apisix/plugins/http-logger.lua +++ b/apisix/plugins/http-logger.lua @@ -17,7 +17,6 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager") local log_util = require("apisix.utils.log-util") -local connection_util = require("apisix.utils.connection-util") local core = require("apisix.core") local http = require("resty.http") local url = require("net.url") @@ -34,6 +33,9 @@ local schema = { uri = core.schema.uri_def, auth_header = {type = "string"}, timeout = {type = "integer", minimum = 1, default = 3}, + keepalive = {type = "boolean", default = true}, + keepalive_timeout = {type = "integer", minimum = 1, default = 60}, + keepalive_pool = {type = "integer", minimum = 1, default = 5}, log_format = {type = "object"}, include_req_body = {type = "boolean", default = false}, include_req_body_expr = { @@ -134,20 +136,31 @@ local function send_http_data(conf, log_message) content_type = "text/plain" end - local httpc_res, httpc_err = httpc:request({ + local auth_headers = { + ["Host"] = host, + ["Content-Type"] = content_type, + ["Authorization"] = conf.auth_header + } + + local params = { + headers = auth_headers, + keepalive = conf.keepalive, + ssl_verify = conf.ssl_verify, method = "POST", - path = #url_decoded.path ~= 0 and url_decoded.path or "/", query = url_decoded.query, body = log_message, - headers = { - ["Host"] = url_decoded.host, - ["Content-Type"] = content_type, - ["Authorization"] = conf.auth_header - } - }) + } + + local request_uri = url_decoded.scheme .. "://" .. host .. ":" .. tostring(port) .. (#url_decoded.path ~= 0 and url_decoded.path or "/") + + if conf.keepalive then + params.keepalive_timeout = conf.keepalive_timeout * 1000 + params.keepalive_pool = conf.keepalive_pool + end + + local httpc_res, httpc_err = httpc:request_uri(request_uri, params) if not httpc_res then - connection_util.close_http_connection(httpc) return false, "error while sending data to [" .. host .. "] port[" .. tostring(port) .. "] " .. httpc_err end @@ -160,7 +173,6 @@ local function send_http_data(conf, log_message) .. "body[" .. httpc_res:read_body() .. "]" end - connection_util.close_http_connection(httpc) return res, err_msg end diff --git a/apisix/plugins/skywalking-logger.lua b/apisix/plugins/skywalking-logger.lua index 80c1df3c5ce4..6f4a2be35e81 100644 --- a/apisix/plugins/skywalking-logger.lua +++ b/apisix/plugins/skywalking-logger.lua @@ -17,7 +17,6 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager") local log_util = require("apisix.utils.log-util") -local connection_util = require("apisix.utils.connection-util") local core = require("apisix.core") local http = require("resty.http") local url = require("net.url") @@ -39,6 +38,10 @@ local schema = { service_instance_name = {type = "string", default = "APISIX Instance Name"}, log_format = {type = "object"}, timeout = {type = "integer", minimum = 1, default = 3}, + keepalive = {type = "boolean", default = true}, + keepalive_timeout = {type = "integer", minimum = 1, default = 60}, + keepalive_pool = {type = "integer", minimum = 1, default = 5}, + ssl_verify = {type = "boolean", default = false}, include_req_body = {type = "boolean", default = false}, include_req_body_expr = { type = "array", @@ -98,8 +101,34 @@ local function send_http_data(conf, log_message) core.log.info("sending a batch logs to ", conf.endpoint_addr) + if ((not port) and url_decoded.scheme == "https") then + port = 443 + elseif not port then + port = 80 + end + + local request_uri = url_decoded.scheme .. "://" .. host .. ":" .. tostring(port) .. "/v3/logs" + + local auth_headers = { + ["Host"] = host, + ["Content-Type"] = "application/json", + } + + local params = { + headers = auth_headers, + keepalive = conf.keepalive, + method = "POST", + body = log_message, + } + local httpc = http.new() httpc:set_timeout(conf.timeout * 1000) + + if conf.keepalive then + params.keepalive_timeout = conf.keepalive_timeout * 1000 + params.keepalive_pool = conf.keepalive_pool + end + local ok, err = httpc:connect(host, port) if not ok then @@ -107,17 +136,16 @@ local function send_http_data(conf, log_message) .. tostring(port) .. "] " .. err end - local httpc_res, httpc_err = httpc:request({ - method = "POST", - path = "/v3/logs", - body = log_message, - headers = { - ["Host"] = url_decoded.host, - ["Content-Type"] = "application/json", - } - }) + if url_decoded.scheme == "https" then + ok, err = httpc:ssl_handshake(true, host, conf.ssl_verify) + if not ok then + return false, "failed to perform SSL with host[" .. host .. "] " + .. "port[" .. tostring(port) .. "] " .. err + end + end + + local httpc_res, httpc_err = httpc:request_uri(request_uri, params) - connection_util.close_http_connection(httpc) if not httpc_res then return false, "error while sending data to [" .. host .. "] port[" .. tostring(port) .. "] " .. httpc_err