From a03a49bf04e4d9c4dd8fecca751c16ce9283b101 Mon Sep 17 00:00:00 2001 From: Luis Duarte Date: Fri, 2 Jun 2017 15:28:13 +0100 Subject: [PATCH] Replaced atomic and thread_safe for concurrent_ruby. Fix #17 --- lib/logstash/filters/throttle.rb | 15 ++++++++------- logstash-filter-throttle.gemspec | 5 ++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/logstash/filters/throttle.rb b/lib/logstash/filters/throttle.rb index 36dfefd..f2c8a08 100644 --- a/lib/logstash/filters/throttle.rb +++ b/lib/logstash/filters/throttle.rb @@ -1,7 +1,8 @@ require "logstash/filters/base" require "logstash/namespace" require "thread_safe" -require "atomic" +require "concurrent/atomics" +require "concurrent" # The throttle filter is for throttling the number of events. The filter is # configured with a lower bound, the "before_count", and upper bound, the "after_count", @@ -143,12 +144,12 @@ # Mike Pilone (@mikepilone) # -class ThreadSafe::TimeslotCache < ThreadSafe::Cache +class Concurrent::TimeslotCache < Concurrent::Map attr_reader :created def initialize(epoch, options = nil, &block) @created = epoch - @latest = Atomic.new(epoch) + @latest = Concurrent::AtomicFixnum.new(epoch) super(options, &block) end @@ -214,7 +215,7 @@ class LogStash::Filters::Throttle < LogStash::Filters::Base # performs initialization of the filter public def register - @key_cache = ThreadSafe::Cache.new + @key_cache = Concurrent::Map.new @max_age_orig = @max_age end # def register @@ -228,7 +229,7 @@ def filter(event) while true # initialise timeslot cache (if required) - @key_cache.compute_if_absent(key) { ThreadSafe::TimeslotCache.new(epoch) } + @key_cache.compute_if_absent(key) { Concurrent::TimeslotCache.new(epoch) } timeslot_cache = @key_cache[key] # try to get timeslot cache break unless timeslot_cache.nil? # retry until succesful @@ -243,14 +244,14 @@ def filter(event) while true # initialise timeslot and counter (if required) - timeslot_cache.compute_if_absent(timeslot_key) { Atomic.new(0) } + timeslot_cache.compute_if_absent(timeslot_key) { Concurrent::AtomicFixnum.new(0) } timeslot = timeslot_cache[timeslot_key] # try to get timeslot break unless timeslot.nil? # retry until succesful @logger.warn? and @logger.warn( "filters/#{self.class.name}: timeslot disappeared, increase max_age to prevent this.") end - + timeslot.update { |v| v + 1 } # increment counter count = timeslot.value # get latest counter value diff --git a/logstash-filter-throttle.gemspec b/logstash-filter-throttle.gemspec index 103db3b..6ef6254 100644 --- a/logstash-filter-throttle.gemspec +++ b/logstash-filter-throttle.gemspec @@ -20,9 +20,8 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" } # Gem dependencies - s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency "thread_safe" - s.add_runtime_dependency "atomic" + s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0" + s.add_runtime_dependency "concurrent-ruby", "~> 1.0" s.add_development_dependency 'logstash-devutils' end