Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering/sync): improve error handling #14196

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,16 @@ local function is_rpc_ready()
end


local function do_sync()
local do_sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this forward declaration should be removed.



local function error_handler(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have removed some code in this function, I don't think it is necessary now (only two lines).,

ngx_log(ngx_ERR, err)
return nil, err
end


function do_sync()
ADD-SP marked this conversation as resolved.
Show resolved Hide resolved
if not is_rpc_ready() then
return nil, "rpc is not ready"
end
Expand All @@ -195,13 +204,14 @@ local function do_sync()

local ns_delta = ns_deltas.default
if not ns_delta then
return nil, "default namespace does not exist inside params"
return error_handler("default namespace does not exist inside params")
end

local deltas = ns_delta.deltas
local is_full_sync = ns_delta.wipe

if not deltas then
return nil, "sync get_delta error: deltas is null"
return error_handler("sync get_delta error: deltas is null")
end

if isempty(deltas) then
Expand All @@ -225,8 +235,7 @@ local function do_sync()

local t = txn.begin(512)

local wipe = ns_delta.wipe
if wipe then
if is_full_sync then
ngx_log(ngx_INFO, "[kong.sync.v2] full sync begins")

t:db_drop(false)
Expand Down Expand Up @@ -257,20 +266,20 @@ local function do_sync()
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
return error_handler(err)
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
if old_entity and not is_full_sync then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
return error_handler(err)
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
return error_handler(err)
end

ngx_log(ngx_DEBUG,
Expand All @@ -279,22 +288,22 @@ local function do_sync()
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
if not is_full_sync then
ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end

else
-- delete the entity, opts for getting correct lmdb key
local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key
if err then
return nil, err
return error_handler(err)
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
if old_entity and not is_full_sync then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
return error_handler(err)
end
end

Expand All @@ -304,13 +313,13 @@ local function do_sync()
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
if not is_full_sync then
ev = { delta_type, "delete", old_entity, }
end
end -- if delta_entity ~= nil and delta_entity ~= ngx_null

-- wipe the whole lmdb, should not have events
if not wipe then
if not is_full_sync then
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
end
Expand All @@ -329,16 +338,16 @@ local function do_sync()
-- record the default workspace into LMDB for any of the following case:
-- * wipe is false, but the default workspace has been changed
-- * wipe is true (full sync)
if default_ws_changed or wipe then
if default_ws_changed or is_full_sync then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
end

local ok, err = t:commit()
if not ok then
return nil, err
return error_handler("failure on commiting to LMDB: " .. err)
end

if wipe then
if is_full_sync then
ngx_log(ngx_INFO, "[kong.sync.v2] full sync ends")

kong.core_cache:purge()
Expand All @@ -351,7 +360,7 @@ local function do_sync()
local reconfigure_data = { kong.default_workspace, nil, nil, nil, }
local ok, err = events.declarative_reconfigure_notify(reconfigure_data)
if not ok then
return nil, err
return error_handler(err)
end

else
Expand Down
Loading