Skip to Content Skip to Search

class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Public class methods

new()

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 51
# Sets up the lock, the (initially empty) subscriber registries and the
# per-name lookup caches.
def initialize
  # Taken by subscribe/unsubscribe and when all_listeners_for fills its cache.
  @mutex = Mutex.new

  # Exact-name subscribers, keyed by event name. Looking up a missing name
  # stores and returns a fresh empty Array for it.
  @string_subscribers = Concurrent::Map.new do |map, event_name|
    map.compute_if_absent(event_name) { [] }
  end

  # Subscribers registered with a nil or Regexp pattern.
  @other_subscribers = []

  # Per-name cache populated by all_listeners_for.
  @all_listeners_for = Concurrent::Map.new

  # Additional per-name maps; the methods visible in this excerpt do not
  # read or write them.
  @groups_for = Concurrent::Map.new
  @silenceable_groups_for = Concurrent::Map.new
end

Public instance methods

all_listeners_for(name)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 298
# Returns every subscriber interested in +name+: the exact-name subscribers
# followed by those pattern subscribers whose subscribed_to?(name) is truthy.
# The result is cached per name in @all_listeners_for.
def all_listeners_for(name)
  # Lock-free fast path. This is correctly done double-checked locking
  # (Concurrent::Map's lookups have volatile semantics).
  cached = @all_listeners_for[name]
  return cached if cached

  # Slow path: the subscriber collections are only read under the mutex, and
  # the cache is re-checked in case another thread filled it meanwhile.
  @mutex.synchronize do
    @all_listeners_for[name] ||= begin
      exact_matches = @string_subscribers[name]
      pattern_matches = @other_subscribers.select { |subscriber| subscriber.subscribed_to?(name) }
      exact_matches + pattern_matches
    end
  end
end

build_handle(name, id, payload)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 273
# Creates a Handle bound to this notifier for the given event name, id and
# payload, and returns it. The handle is not started here.
def build_handle(name, id, payload)
  notifier = self
  Handle.new(notifier, name, id, payload)
end

finish(name, id, payload, listeners = nil)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 284
# Pops the most recently pushed handle off the current execution state's
# :_fanout_handle_stack and finishes it with the supplied values. Returns
# whatever the handle's finish_with_values returns.
#
# +listeners+ is accepted but not used by this method.
def finish(name, id, payload, listeners = nil)
  IsolatedExecutionState[:_fanout_handle_stack]
    .pop
    .finish_with_values(name, id, payload)
end

listeners_for(name)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 307
# Returns a new Array holding the listeners for +name+ that do not report
# themselves as silenced for that name.
def listeners_for(name)
  candidates = all_listeners_for(name)
  candidates.select { |listener| !listener.silenced?(name) }
end

listening?(name)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 311
# True when at least one listener for +name+ is not silenced for it; false
# when there are no listeners or every one of them is silenced.
def listening?(name)
  everyone_silenced = all_listeners_for(name).all? { |listener| listener.silenced?(name) }
  !everyone_silenced
end

publish(name, *args)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 290
# Calls publish(name, *args) on each non-silenced listener for +name+,
# driving the iteration through iterate_guarding_exceptions and returning
# its result.
def publish(name, *args)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |listener|
    listener.publish(name, *args)
  end
end

publish_event(event)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 294
# Hands +event+ to publish_event on each non-silenced listener registered
# for event.name, iterating via iterate_guarding_exceptions and returning
# its result.
def publish_event(event)
  recipients = listeners_for(event.name)
  iterate_guarding_exceptions(recipients) do |listener|
    listener.publish_event(event)
  end
end

start(name, id, payload)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 277
# Builds a handle for the event, pushes it onto the current execution
# state's :_fanout_handle_stack (creating the stack on first use) and starts
# it. Returns whatever the handle's start returns.
def start(name, id, payload)
  # The stack is set up before the handle is built.
  stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
  new_handle = build_handle(name, id, payload)
  stack.push(new_handle)
  new_handle.start
end

subscribe(pattern = nil, callable = nil, monotonic: false, &block)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 65
# Registers a new subscriber and returns it.
#
# The subscriber is built by Subscribers.new from +pattern+, the listener
# (+callable+ if given, otherwise the block) and the +monotonic+ flag.
#
# * String pattern      - stored under that exact name in @string_subscribers.
# * nil / Regexp pattern - appended to @other_subscribers.
# * anything else        - raises ArgumentError (the subscriber object has
#   already been constructed at that point but is not registered).
#
# clear_cache is called afterwards: with the name for String patterns, with
# no argument otherwise (presumably dropping the cached listener lists).
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  listener = callable || block
  subscriber = Subscribers.new(pattern, listener, monotonic)

  # Registry mutation and cache clearing happen under the mutex.
  @mutex.synchronize do
    if String === pattern
      @string_subscribers[pattern] << subscriber
      clear_cache(pattern)
    elsif NilClass === pattern || Regexp === pattern
      @other_subscribers << subscriber
      clear_cache
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end

  subscriber
end

unsubscribe(subscriber_or_name)

Permalink
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 82
# Removes subscriptions, given either an event name or a subscriber object.
#
# * String name - empties the exact-name subscriber list for that name,
#   calls clear_cache(name), then calls unsubscribe!(name) on every pattern
#   subscriber.
# * subscriber whose pattern is a String - removed from that name's list,
#   followed by clear_cache(pattern).
# * any other object - removed from @other_subscribers, followed by
#   clear_cache with no argument.
#
# All of this runs under the mutex.
def unsubscribe(subscriber_or_name)
  @mutex.synchronize do
    if String === subscriber_or_name
      event_name = subscriber_or_name
      @string_subscribers[event_name].clear
      clear_cache(event_name)
      @other_subscribers.each { |subscriber| subscriber.unsubscribe!(event_name) }
    elsif String === (pattern = subscriber_or_name.try(:pattern))
      @string_subscribers[pattern].delete(subscriber_or_name)
      clear_cache(pattern)
    else
      @other_subscribers.delete(subscriber_or_name)
      clear_cache
    end
  end
end

wait()

Permalink

This is a sync queue, so there is no waiting.

Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 316
# Intentionally does nothing: this queue is synchronous, so there is never
# anything to wait for. Always returns nil.
def wait
  nil
end

Namespace

Definition files