From c3b950d0c057320b2f66be80aac7dbf970475be9 Mon Sep 17 00:00:00 2001 From: Luismi Cavalle Date: Sun, 16 Oct 2011 13:24:43 +0200 Subject: [PATCH] Eventwired! (TODO: Review this commit for future improvements on Eventwire) --- Gemfile | 2 + Gemfile.lock | 9 +++ Procfile | 2 +- config/initializers/infrastructure.rb | 14 ++++- lib/infrastructure/domain_repository.rb | 4 +- lib/infrastructure/event_bus.rb | 11 ---- lib/infrastructure/event_bus/amqp.rb | 63 ------------------- lib/infrastructure/event_bus/in_process.rb | 21 ------- lib/infrastructure/event_bus/redis.rb | 73 ---------------------- lib/infrastructure/event_bus/zero.rb | 51 --------------- lib/infrastructure/event_handler.rb | 12 ++-- lib/tasks/event_bus.rake | 5 +- spec/acceptance/acceptance_helper.rb | 11 ++-- spec/acceptance/support/commands.rb | 1 - 14 files changed, 37 insertions(+), 242 deletions(-) delete mode 100644 lib/infrastructure/event_bus.rb delete mode 100644 lib/infrastructure/event_bus/amqp.rb delete mode 100644 lib/infrastructure/event_bus/in_process.rb delete mode 100644 lib/infrastructure/event_bus/redis.rb delete mode 100644 lib/infrastructure/event_bus/zero.rb diff --git a/Gemfile b/Gemfile index ae231c4..630f5f8 100644 --- a/Gemfile +++ b/Gemfile @@ -5,6 +5,8 @@ gem 'rails', '3.0.0' gem 'uuidtools' gem 'ohm' +gem 'eventwire', :git => 'git://github.com/cavalle/eventwire.git' + gem 'em-redis', :require => false gem 'bunny', :require => false diff --git a/Gemfile.lock b/Gemfile.lock index 28ab31a..49137e6 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,10 @@ +GIT + remote: git://github.com/cavalle/eventwire.git + revision: 073cdf9e82f557779ea4e97c646ecae3b895df5a + specs: + eventwire (0.0.1) + json + GEM remote: http://rubygems.org/ specs: @@ -67,6 +74,7 @@ GEM term-ansicolor (~> 1.0.5) thor (>= 0.13.6) i18n (0.4.2) + json (1.6.1) json_pure (1.5.0) launchy (0.3.7) configuration (>= 0.0.5) @@ -167,6 +175,7 @@ DEPENDENCIES bunny capybara em-redis + eventwire! ffi-rzmq foreman launchy diff --git a/Procfile b/Procfile index 1e323fa..837267d 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1,2 @@ web: bundle exec thin start -bus: bundle exec rake event_bus:start \ No newline at end of file +bus: bundle exec rake environment eventwire:work --trace \ No newline at end of file diff --git a/config/initializers/infrastructure.rb b/config/initializers/infrastructure.rb index 81d4f52..ff79780 100644 --- a/config/initializers/infrastructure.rb +++ b/config/initializers/infrastructure.rb @@ -1,5 +1,13 @@ Rails.application.class.configure do - config.event_bus = 'EventBus::Redis' - config.event_subscribers = %w{ClientReport ClientDetailsReport AccountDetailsReport MoneyTransferSaga} - config.to_prepare { EventBus.init } + config.to_prepare do + # Initialize Eventwire before each request so that using the InProcess driver + # in development the event handlers are declared only once + Eventwire.driver = Rails.env.test? ? 'InProcess' : 'Redis' + Eventwire.on_error do |ex| + raise ex + end + ClientReport; ClientDetailsReport; AccountDetailsReport; MoneyTransferSaga + end end + + diff --git a/lib/infrastructure/domain_repository.rb b/lib/infrastructure/domain_repository.rb index 57511e0..4b7c82c 100644 --- a/lib/infrastructure/domain_repository.rb +++ b/lib/infrastructure/domain_repository.rb @@ -1,6 +1,8 @@ module DomainRepository class << self + + include Eventwire::Publisher def aggregates Thread.current[:"DomainRepositoryCurrentStore"] @@ -48,7 +50,7 @@ def save(event) end def publish(event) - EventBus.publish(event) + publish_event(event.name, {:data => event.data}) end end diff --git a/lib/infrastructure/event_bus.rb b/lib/infrastructure/event_bus.rb deleted file mode 100644 index 94fc57f..0000000 --- a/lib/infrastructure/event_bus.rb +++ /dev/null @@ -1,11 +0,0 @@ -module EventBus - class << self - attr_accessor :current - delegate :publish, :subscribe, :wait_for_events, :start, :purge, :stop, :to => :current - - def init - EventBus.current = Rails.configuration.event_bus.constantize.new - Rails.configuration.event_subscribers.each(&:constantize) - end - end -end \ No newline at end of file diff --git a/lib/infrastructure/event_bus/amqp.rb b/lib/infrastructure/event_bus/amqp.rb deleted file mode 100644 index 30d89fc..0000000 --- a/lib/infrastructure/event_bus/amqp.rb +++ /dev/null @@ -1,63 +0,0 @@ -require 'amqp' -require 'bunny' - -class EventBus::AMQP - - def publish(event) - Bunny.run do |mq| - mq.exchange(event.name, :type => :fanout).publish(event.id) - end - end - - def subscribe(event_name, handler_id, &handler) - subscriptions << Subscription.new(event_name, handler_id, handler) - end - - def wait_for_events - next_tick - end - - def next_tick - return unless EM.reactor_running? - t = Thread.current - EM.next_tick { t.wakeup } - Thread.stop - end - - def purge - Bunny.run do |mq| - subscriptions.each { |s| s.purge(mq) } - end - end - - def start - AMQP.start do - subscriptions.each { |s| s.subscribe(MQ) } - end - end - - def stop - AMQP.stop { EM.stop } - end - - private - - def subscriptions - @subscriptions ||= [] - end - - class Subscription < Struct.new(:event, :queue, :handler) - - def subscribe(mq) - mq.queue(queue).bind(mq.fanout(event)).subscribe do |event_id| - handler.call Event[event_id] - end - end - - def purge(mq) - mq.queue(queue).purge - end - - end - -end diff --git a/lib/infrastructure/event_bus/in_process.rb b/lib/infrastructure/event_bus/in_process.rb deleted file mode 100644 index bbc97f1..0000000 --- a/lib/infrastructure/event_bus/in_process.rb +++ /dev/null @@ -1,21 +0,0 @@ -class EventBus::InProcess - def publish(event) - subscriptions(event.name).each do |subscription| - subscription.call(event) - end - end - - def subscriptions(event_name) - @subscriptions ||= Hash.new - @subscriptions[event_name] ||= Set.new - end - - def subscribe(event_name, handler_id, &handler) - subscriptions(event_name) << handler - end - - def wait_for_events; end - def purge; end - def start; end - def stop; end -end \ No newline at end of file diff --git a/lib/infrastructure/event_bus/redis.rb b/lib/infrastructure/event_bus/redis.rb deleted file mode 100644 index 54d61ea..0000000 --- a/lib/infrastructure/event_bus/redis.rb +++ /dev/null @@ -1,73 +0,0 @@ -require 'redis' -require 'em-redis' - -class EventBus::Redis - def publish(event) - Redis.new.rpush event.name, event.id - end - - def initialize - @subscriptions = [] - @handlers = [] - end - - def subscribe(event_name, handler_id, &handler) - @subscriptions << [event_name.to_s, handler_id] - @handlers << [handler_id, handler] - end - - def wait_for_events - 7.times { next_tick } - end - - def next_tick - return unless EM.reactor_running? - t = Thread.current - EM.next_tick { t.wakeup } - Thread.stop - end - - def purge - redis = Redis.new - all_queues.each do |queue| - redis.del queue - end - end - - def all_queues - (@subscriptions + @handlers).map(&:first).uniq - end - - def start - EM.run do - redis = EM::Protocols::Redis.connect - - @subscriptions.group_by(&:first).each do |event, subscriptions| - subscribe_to_queue redis, event do |event_id| - subscriptions.each do |event, queue| - redis.rpush queue, event_id - end - end - end - - @handlers.each do |queue, handler| - subscribe_to_queue redis, queue do |event_id| - handler.call Event[event_id] - end - end - - end - end - - def subscribe_to_queue(redis, queue, &block) - callback = Proc.new do |response| - block.call(response) if response - redis.lpop(queue, &callback) - end - redis.lpop(queue, &callback) - end - - def stop - EM.stop - end -end diff --git a/lib/infrastructure/event_bus/zero.rb b/lib/infrastructure/event_bus/zero.rb deleted file mode 100644 index f7deeaf..0000000 --- a/lib/infrastructure/event_bus/zero.rb +++ /dev/null @@ -1,51 +0,0 @@ -require 'ffi-rzmq' - -class EventBus::Zero - def publish(event) - ctx = ZMQ::Context.new - s = ctx.socket ZMQ::PUSH - s.connect("tcp://127.0.0.1:5560") - s.send_string(event.id.to_s) - s.close - ctx.terminate - end - - def subscriptions(event_name) - @subscriptions ||= Hash.new - @subscriptions[event_name.to_s] ||= Set.new - end - - def subscribe(event_name, handler_id, &handler) - subscriptions(event_name.to_s) << handler - end - - def wait_for_events - sleep(0.15) # next_tick - end - - def purge - end - - def start - ctx = ZMQ::Context.new - s = ctx.socket ZMQ::PULL - s.bind("tcp://127.0.0.1:5560") - @running = true - while @running - event_id = s.recv_string(ZMQ::NOBLOCK) - next unless event_id - event = Event[event_id] - subscriptions(event.name).each do |subscription| - subscription.call(event) - end - end - ensure - s.close - ctx.terminate - end - - def stop - @running = false - end - -end diff --git a/lib/infrastructure/event_handler.rb b/lib/infrastructure/event_handler.rb index af21c6d..9b102c7 100644 --- a/lib/infrastructure/event_handler.rb +++ b/lib/infrastructure/event_handler.rb @@ -1,13 +1,13 @@ module EventHandler + include Eventwire::Subscriber::DSL + def on(*events, &block) events.each do |event_name| - ::EventBus.subscribe(event_name, "#{name}:#{increment_handlers_count}:#{event_name}", &block) + super(event_name) do |event| + event.data = event.data.to_hash.symbolize_keys + block.call(event) + end end end - def increment_handlers_count - @handlers_count ||= 0 - @handlers_count += 1 - end - end diff --git a/lib/tasks/event_bus.rake b/lib/tasks/event_bus.rake index 443c460..c645346 100644 --- a/lib/tasks/event_bus.rake +++ b/lib/tasks/event_bus.rake @@ -1,4 +1 @@ -desc "Start Event Bus" -task "event_bus:start" => :environment do - EventBus.start -end +require 'eventwire/tasks' diff --git a/spec/acceptance/acceptance_helper.rb b/spec/acceptance/acceptance_helper.rb index 174b508..134b956 100644 --- a/spec/acceptance/acceptance_helper.rb +++ b/spec/acceptance/acceptance_helper.rb @@ -8,21 +8,18 @@ config.before(:each) do Ohm.flush - EventBus.purge - @t = Thread.new { EventBus.start } - @t.abort_on_exception = true end config.after(:each) do - EventBus.stop - @t.join(1) - @t.kill Capybara.reset_sessions! end end +Eventwire.on_error do |ex| + raise ex +end + Capybara.app = Proc.new { |env| - EventBus.wait_for_events Rails.application.call(env) } diff --git a/spec/acceptance/support/commands.rb b/spec/acceptance/support/commands.rb index 5952cc7..eda4e76 100644 --- a/spec/acceptance/support/commands.rb +++ b/spec/acceptance/support/commands.rb @@ -28,7 +28,6 @@ def execute_command(*args) DomainRepository.begin result = "#{args.shift}_command_handler".camelize.constantize.new.execute(*args) DomainRepository.commit - EventBus.wait_for_events result end