Source code for changes.jobs.signals
from flask import current_app
from changes.queue.task import tracked_task
from changes.utils.imports import import_string
class SuspiciousOperation(Exception):
pass
@tracked_task
[docs]def fire_signal(signal, kwargs):
"""
Tasks fire signals by spawning fire_signal tasks; they grab every
associated listener and spawn run_event_listener tasks for each
"""
for listener, l_signal in current_app.config['EVENT_LISTENERS']:
if l_signal == signal:
run_event_listener.delay(
listener=listener,
signal=signal,
kwargs=kwargs,
)
@tracked_task
[docs]def run_event_listener(listener, signal, kwargs):
"""
Actually run the listener
See fire_signal, which doesn't actually run it
"""
# simple check to make sure this is registered
if not any(l == listener for l, _ in current_app.config['EVENT_LISTENERS']):
raise SuspiciousOperation('%s is not a registered event listener' % (listener,))
func = import_string(listener)
func(**kwargs)