Skip to content
This repository has been archived by the owner on Feb 3, 2025. It is now read-only.

Commit

Permalink
Nhse o32 orkv.i58 backport (#60)
Browse files Browse the repository at this point in the history
* Optimisation for riak stats

Optimisations:

- Take the cache timestamp after get_stats/0 has returned, so that if get_stats/0 takes > 1s any requests queued for riak_kv_http_cache will still use the cache.

- refactor riak_kv_status:aliases/0 to use simple lists rather than orddict.

- Remove the sys_monitor_count stat altogether; it is simply too expensive. It remains available as a riak_kv_util module function for the experienced operator.

* Wait forever for call to respond

It takes more than 5s on some systems at the moment, and this then presents an unhelpful crash dump to the user. Use a longer default timeout, allow the operator to pass the timeout, and return a more operator-friendly error when a timeout occurs.

* Fix meck dep

* Update following review
  • Loading branch information
martinsumner authored Jan 24, 2025
1 parent 9cb8d00 commit fe9d3ab
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 83 deletions.
3 changes: 1 addition & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@
{provider_hooks, [
{pre, [{compile, {protobuf, compile}}]}
]}.

{profiles, [
{test, [{deps, [meck]}]},
{test, [{deps, [{meck, {git, "https://github.com/OpenRiak/meck.git", {branch, "openriak-3.2"}}}]}]},
{gha, [{erl_opts, [{d, 'GITHUBEXCLUDE'}]}]}
]}.

Expand Down
36 changes: 21 additions & 15 deletions src/riak_kv_http_cache.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(riak_kv_http_cache).

-export([start_link/0,
get_stats/0]).
get_stats/1]).

-export([init/1,
handle_call/3,
Expand All @@ -10,15 +10,20 @@
terminate/2,
code_change/3]).

-define(SERVER, ?MODULE).
-record(st,
{
ts :: undefined|erlang:timestamp(),
stats = []
}
).

-record(st, {ts, stats = []}).

%% @doc Start the stats cache gen_server and register it locally under
%% the module name, so callers can reach it without knowing its pid.
start_link() ->
    Registration = {local, ?MODULE},
    gen_server:start_link(Registration, ?MODULE, [], []).

get_stats() ->
gen_server:call(?MODULE, get_stats).
%% @doc Fetch the stats held by the cache server.
%% Timeout (milliseconds) is handed to gen_server:call/3 unchanged; when
%% the server does not answer in time the call exits in the caller, so a
%% caller wanting a friendlier failure has to catch that exit itself.
-spec get_stats(Milliseconds :: pos_integer()) -> list({atom(), term()}).
get_stats(Timeout) ->
    Request = get_stats,
    gen_server:call(?MODULE, Request, Timeout).

%% gen_server callback: start from the record defaults, i.e. with no
%% timestamp yet and an empty stats list. The argument is ignored.
init(_Args) ->
    EmptyCache = #st{},
    {ok, EmptyCache}.
Expand All @@ -40,15 +45,16 @@ code_change(_, S, _) ->
{ok, S}.

check_cache(#st{ts = undefined} = S) ->
S#st{ts = os:timestamp(), stats = do_get_stats()};
Stats = riak_kv_status:get_stats(web),
S#st{ts = os:timestamp(), stats = Stats};
check_cache(#st{ts = Then} = S) ->
CacheTime =
application:get_env(riak_kv, http_stats_cache_milliseconds, 1000),
Now = os:timestamp(),
case timer:now_diff(Now, Then) < 1000000 of
true ->
S;
false ->
S#st{ts = Now, stats = do_get_stats()}
end.

do_get_stats() ->
riak_kv_wm_stats:get_stats().
case timer:now_diff(Now, Then) < (CacheTime * 1000) of
true ->
S;
false ->
Stats = riak_kv_status:get_stats(web),
S#st{ts = os:timestamp(), stats = Stats}
end.
15 changes: 2 additions & 13 deletions src/riak_kv_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@
%% Update each stat with the exported function update/1. Add
%% a new stat to the internal stats/0 func to register a new stat with
%% folsom.
%%
%% Get the latest aggregation of stats with the exported function
%% get_stats/0. Or use folsom_metrics:get_metric_value/1,
%% or riak_core_stat_q:get_stats/1.
%%

-module(riak_kv_stat).

Expand All @@ -41,7 +36,7 @@
-endif.

%% API
-export([start_link/0, get_stats/0,
-export([start_link/0,
update/1, perform_update/1, register_stats/0, unregister_vnode_stats/1, produce_stats/0,
leveldb_read_block_errors/0, stat_update_error/3, stop/0]).
-export([track_bucket/1, untrack_bucket/1]).
Expand Down Expand Up @@ -71,11 +66,6 @@ unregister_vnode_stats(Index) ->
unregister_per_index(heads, Index),
unregister_per_index(puts, Index).

%% @spec get_stats() -> proplist()
%% @doc Get the current aggregation of stats.
get_stats() ->
riak_kv_wm_stats:get_stats().


%% Creation of a dynamic stat _must_ be serialized.
register_stat(Name, Type) ->
Expand Down Expand Up @@ -969,7 +959,6 @@ bc_stats(Pfx) ->
{sys_global_heaps_size, ?MODULE, value, [deprecated]},
{sys_heap_type, erlang, system_info, [heap_type]},
{sys_logical_processors, erlang, system_info, [logical_processors]},
{sys_monitor_count, riak_kv_stat_bc, sys_monitor_count, []},
{sys_otp_release, riak_kv_stat_bc, otp_release, []},
{sys_port_count, erlang, system_info, [port_count]},
{sys_process_count, erlang, system_info, [process_count]},
Expand Down Expand Up @@ -1127,7 +1116,7 @@ create_or_update_histogram_test() ->
Metric = [riak_kv,put_fsm,counter,time],
ok = repeat_create_or_update(Metric, 1, histogram, 100),
?assertNotEqual(exometer:get_value(Metric), 0),
Stats = get_stats(),
Stats = riak_kv_status:get_stats(web),
?LOG_INFO("stats prop list ~s", [Stats]),
?assertNotEqual(proplists:get_value({node_put_fsm_counter_time_mean}, Stats), 0)
after
Expand Down
12 changes: 0 additions & 12 deletions src/riak_kv_stat_bc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,6 @@ system_version() ->
%% @doc The emulator's system architecture string, returned as a binary.
system_architecture() ->
    Architecture = erlang:system_info(system_architecture),
    list_to_binary(Architecture).

%% Count up all monitors, unfortunately has to obtain process_info
%% from all processes to work it out.
sys_monitor_count() ->
lists:foldl(fun(Pid, Count) ->
case erlang:process_info(Pid, monitors) of
{monitors, Mons} ->
Count + length(Mons);
_ ->
Count
end
end, 0, processes()).

%% @doc Build one `{<app>_version, <<"Vsn">>}' pair for every application
%% reported by application:which_applications/0.
app_stats() ->
    VersionStat =
        fun(App, Vsn) ->
            StatName = list_to_atom(atom_to_list(App) ++ "_version"),
            {StatName, list_to_binary(Vsn)}
        end,
    [VersionStat(App, Vsn)
     || {App, _Description, Vsn} <- application:which_applications()].
Expand Down
79 changes: 53 additions & 26 deletions src/riak_kv_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,60 @@ get_stats(console) ->
++ riak_kv_stat_bc:disk_stats()
++ riak_kv_stat_bc:app_stats().

aliases() ->
Grouped = exometer_alias:prefix_foldl(
<<>>,
fun(Alias, Entry, DP, Acc) ->
orddict:append(Entry, {DP, Alias}, Acc)
end, orddict:new()),
lists:keysort(
1,
lists:foldl(
fun({K, DPs}, Acc) ->
case exometer:get_value(K, [D || {D,_} <- DPs]) of
{ok, Vs} when is_list(Vs) ->
lists:foldr(fun({D,V}, Acc1) ->
{_,N} = lists:keyfind(D,1,DPs),
[{N,V}|Acc1]
end, Acc, Vs);
Other ->
Val = case Other of
{ok, disabled} -> undefined;
_ -> 0
end,
lists:foldr(fun({_,N}, Acc1) ->
[{N,Val}|Acc1]
end, Acc, DPs)
end
end, [], orddict:to_list(Grouped))).

%% @doc Resolve every registered exometer alias to its current value.
%%
%% Returns a list of `{Alias, Value}' pairs sorted by alias. The values
%% are looked up through get_exometer_values/2, one call per exometer
%% entry, asking for all of that entry's aliased datapoints at once.
aliases() ->
    %% Group the aliases by the entry they point at:
    %%   #{Entry => #{DataPoint => Alias}}
    %% Grouping in a map means each entry is queried exactly once no
    %% matter what order the aliases are folded in, and an empty alias
    %% table needs no special case.
    Grouped =
        exometer_alias:prefix_foldl(
            <<>>,
            fun(Alias, Entry, DP, Acc) ->
                DPmap = maps:get(Entry, Acc, #{}),
                %% maps:merge/2 prefers the second map, so an alias that
                %% was already recorded for this datapoint is kept.
                Acc#{Entry => maps:merge(#{DP => Alias}, DPmap)}
            end,
            #{}
        ),
    AliasVals =
        maps:fold(
            fun(Entry, DPmap, Acc) ->
                get_exometer_values(Entry, DPmap) ++ Acc
            end,
            [],
            Grouped
        ),
    lists:keysort(1, AliasVals).

%% Look up the datapoints of one exometer entry and report them under
%% their alias names.
%%
%% DPmap maps each wanted datapoint to its alias. When exometer returns a
%% list of {DataPoint, Value} pairs, each pair is renamed to its alias.
%% Otherwise every alias of the entry gets a placeholder: `disabled' when
%% exometer answers {ok, disabled}, and 0 for any other answer.
get_exometer_values(Entry, DPmap) ->
    DataPoints = maps:keys(DPmap),
    case exometer:get_value(Entry, DataPoints) of
        {ok, Values} when is_list(Values) ->
            ToAlias =
                fun({DataPoint, Value}) ->
                    {maps:get(DataPoint, DPmap), Value}
                end,
            lists:map(ToAlias, Values);
        {ok, disabled} ->
            [{Alias, disabled} || Alias <- maps:values(DPmap)];
        _NoValues ->
            [{Alias, 0} || Alias <- maps:values(DPmap)]
    end.

expand_disk_stats([{disk, Stats}]) ->
[{disk, [{struct, [{id, list_to_binary(Id)}, {size, Size}, {used, Used}]}
Expand Down
22 changes: 20 additions & 2 deletions src/riak_kv_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
is_modfun_allowed/2,
shuffle_list/1,
kv_ready/0,
ngr_initial_timeout/0
ngr_initial_timeout/0,
sys_monitor_count/0
]).
-export([report_hashtree_tokens/0, reset_hashtree_tokens/2]).

Expand Down Expand Up @@ -714,7 +715,24 @@ get_initial_call(P) ->
_ ->
undefined
end.


%% @doc sys_monitor_count/0
%% Total number of monitors held across all processes on this node.
%% There is no cheaper way to get this than asking every process for its
%% monitor list, so the cost grows with the process count. Processes that
%% have exited by the time they are asked contribute nothing.
sys_monitor_count() ->
    lists:sum(
        [length(Monitors)
         || Pid <- processes(),
            {monitors, Monitors} <- [erlang:process_info(Pid, monitors)]]
    ).


%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand Down
76 changes: 63 additions & 13 deletions src/riak_kv_wm_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
content_types_provided/2,
service_available/2,
forbidden/2,
malformed_request/2,
produce_body/2,
pretty_print/2
]).
-export([get_stats/0]).

-define(TIMEOUT, 30000). %% In milliseconds

-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").

-record(ctx, {}).
-record(ctx, {timeout = ?TIMEOUT :: non_neg_integer()}).

%% Webmachine callback: every request starts with a fresh context record
%% carrying its declared defaults. The argument is ignored.
init(_Args) ->
    InitialCtx = #ctx{},
    {ok, InitialCtx}.
Expand Down Expand Up @@ -66,25 +69,72 @@ content_types_provided(ReqData, Context) ->
{"text/plain", pretty_print}],
ReqData, Context}.


%% Webmachine callback: the stats resource always reports itself as
%% available; request data and context are passed through untouched.
service_available(ReqData, Ctx) ->
    Available = true,
    {Available, ReqData, Ctx}.

%% Webmachine callback: validate the optional "timeout" query parameter.
%%
%% No parameter: the request is well formed and the context keeps its
%% default timeout. A strictly positive integer (milliseconds) is stored
%% in the context. Anything else marks the request as malformed and sets
%% a plain-text explanation as the response body.
malformed_request(RD, Ctx) ->
    case wrq:get_qs_value("timeout", RD) of
        undefined ->
            {false, RD, Ctx};
        TimeoutStr ->
            %% Only the string conversion is protected, and only against
            %% the badarg it raises for non-integer input. Non-positive
            %% values are rejected by an explicit clause rather than by
            %% catching an incidental case_clause.
            try list_to_integer(TimeoutStr) of
                Timeout when Timeout > 0 ->
                    {false, RD, Ctx#ctx{timeout = Timeout}};
                _NonPositive ->
                    {true, bad_timeout_response(TimeoutStr, RD), Ctx}
            catch
                error:badarg ->
                    {true, bad_timeout_response(TimeoutStr, RD), Ctx}
            end
    end.

%% Attach the plain-text "bad timeout" explanation to the request data.
bad_timeout_response(TimeoutStr, RD) ->
    wrq:append_to_resp_body(
        io_lib:format(
            "Bad timeout value ~0p "
            "expected milliseconds > 0",
            [TimeoutStr]
        ),
        wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)
    ).

%% Webmachine callback: defer the access decision to
%% riak_kv_wm_utils:is_forbidden/1 and hand back the request data and
%% context unchanged.
forbidden(RD, Ctx) ->
    IsForbidden = riak_kv_wm_utils:is_forbidden(RD),
    {IsForbidden, RD, Ctx}.

produce_body(ReqData, Ctx) ->
Stats= riak_kv_http_cache:get_stats(),
Body = mochijson2:encode({struct, Stats}),
{Body, ReqData, Ctx}.
%% Webmachine body producer: return the cached stats as a JSON object.
%%
%% The stats are requested from riak_kv_http_cache with the timeout held
%% in the context. If that call times out, the request is halted with a
%% 503 and a plain-text message instead of letting the exit propagate.
produce_body(RD, Ctx) ->
    Timeout = Ctx#ctx.timeout,
    %% Only the cache call is protected; the JSON encoding runs in the
    %% `of' section so a failure there is never mistaken for a timeout.
    try riak_kv_http_cache:get_stats(Timeout) of
        Stats ->
            Body = mochijson2:encode({struct, Stats}),
            {Body, RD, Ctx}
    catch
        exit:{timeout, _} ->
            TimedOutRD =
                wrq:set_resp_header(
                    ?HEAD_CTYPE,
                    "text/plain",
                    wrq:append_to_response_body(
                        io_lib:format(
                            "Request timed out after ~w ms",
                            [Timeout]
                        ),
                        RD
                    )
                ),
            {{halt, 503}, TimedOutRD, Ctx}
    end.

%% @spec pretty_print(webmachine:wrq(), context()) ->
%% {string(), webmachine:wrq(), context()}
%% @doc Format the response JSON object in a "pretty-printed" style.
pretty_print(RD1, C1=#ctx{}) ->
{Json, RD2, C2} = produce_body(RD1, C1),
{json_pp:print(binary_to_list(list_to_binary(Json))), RD2, C2}.

%% Webmachine body producer for text/plain: the same JSON document as
%% produce_body/2, run through json_pp:print/1. A halt returned by
%% produce_body/2 is passed on unchanged.
pretty_print(RD, Ctx) ->
    case produce_body(RD, Ctx) of
        {{halt, _ResponseCode}, _HaltRD, _HaltCtx} = Halted ->
            Halted;
        {Json, BodyRD, BodyCtx} ->
            %% Flatten the encoder output into a plain string before
            %% passing it to the pretty-printer.
            FlatJson = binary_to_list(list_to_binary(Json)),
            {json_pp:print(FlatJson), BodyRD, BodyCtx}
    end.

get_stats() ->
riak_kv_status:get_stats(web).

0 comments on commit fe9d3ab

Please sign in to comment.