

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post


There are a few use cases that we want to support with this feature. The first is geofencing: a pipeline processes user locations and wants to send an alert when a user enters or leaves a geofence. Similar use cases include pipelines reading user biometric data that need to send alerts if a user's heart rate exceeds a certain value, or pipelines reading machine CPU usage that need to send alerts if CPU usage exceeds a certain value. In all of these use cases, some uncommon event occurs that must be acted on by a separate system. The events that trigger these alerts still need to be processed by the rest of the pipeline; the alert is just a side effect that is triggered once they are observed.


The goal is to allow stages in a pipeline to emit alerts, which can be configured to be published to TMS or to Kafka.

User Stories 

  • As a pipeline developer, I want to be able to create a pipeline where alerts are published to Kafka or TMS when some condition is met
  • As a pipeline developer, I want to be able to create some pipelines that publish alerts and some that do not, even when the conditions are met
  • As a pipeline developer, I want to be able to configure which topic alerts are published to
  • As a pipeline developer, I want to be able to tell which plugins can emit alerts and which plugins cannot
  • As a pipeline developer, I want to be able to aggregate alerts before publishing them, to avoid duplicate alerts
  • As a cluster administrator, I want to be able to see how many alerts were published for a pipeline run
  • As a plugin developer, I want to be able to write arbitrary logic to control when to publish alerts
  • As a plugin developer, I want to be able to indicate which plugins can emit alerts
  • As a plugin developer, I want to be able to set a payload for any alert emitted
  • As a plugin developer, I want to be able to write a plugin that publishes alerts
  • As an alert consumer, I want the alert to contain the namespace, pipeline name, stage name, and payload


At a high level, we would like each existing plugin type (except sinks) to be able to emit alerts. An alert is not an arbitrary record, but must conform to a specific schema. Each plugin will indicate whether it can emit alerts or not, which can be reflected in the UI by an additional 'port'. When a pipeline stage is connected to a new 'AlertPublisher' plugin type, any alerts emitted by that stage will be sent to the AlertPublisher for actual publishing.


When the actual alerts are published is left up to the pipeline. If an AlertPublisher plugin is not attached to a stage, any alerts emitted by the stage will be dropped.


Publishing Alerts

Alerts can be published using a new AlertPublisher plugin type:

public abstract class AlertPublisher implements PipelineConfigurable, StageLifecycle<AlertPublisherContext> {
  private AlertPublisherContext context;

  @Override
  public void initialize(AlertPublisherContext context) throws Exception {
    this.context = context;
  }

  public abstract void publish(Iterator<Alert> alerts) throws Exception;

  @Override
  public void destroy() {
    // no-op
  }
}

public interface AlertPublisherContext extends MessagingContext, StageContext {
}

public interface Alert {
  // get the stage the alert was emitted from
  String getStageName();

  // get the payload of the alert
  Map<String, String> getPayload();
}

We'll also need to add getNamespace() and getPipelineName() methods to StageContext, as MessagePublisher requires the namespace, and publishers need access to the pipeline name.

This approach means that the plugin determines the structure of the published alert, so whatever consumes the alert must be aware of the format.
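As a sketch of what a simple publisher might do, the snippet below formats an alert as a JSON object containing the namespace, pipeline name, stage name, and payload. The Alert stub mirrors the proposed interface; JsonAlertFormatter and its hand-rolled JSON building are illustrative only, not part of the proposed API (a real plugin would use a JSON library and the messaging client from its context):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-in for the proposed Alert interface.
interface Alert {
  String getStageName();
  Map<String, String> getPayload();
}

public class JsonAlertFormatter {
  // Formats an alert as a JSON object with the namespace, pipeline name,
  // stage name, and payload. Assumes keys and values need no escaping;
  // a real publisher should use a JSON library.
  static String toJson(String namespace, String pipeline, Alert alert) {
    StringBuilder sb = new StringBuilder();
    sb.append("{\"namespace\":\"").append(namespace)
      .append("\",\"pipeline\":\"").append(pipeline)
      .append("\",\"stage\":\"").append(alert.getStageName())
      .append("\",\"payload\":{");
    boolean first = true;
    for (Map.Entry<String, String> e : alert.getPayload().entrySet()) {
      if (!first) {
        sb.append(",");
      }
      sb.append("\"").append(e.getKey()).append("\":\"").append(e.getValue()).append("\"");
      first = false;
    }
    return sb.append("}}").toString();
  }

  public static void main(String[] args) {
    Map<String, String> payload = new LinkedHashMap<>();
    payload.put("userid", "abc");
    payload.put("heartbeat", "182");
    Alert alert = new Alert() {
      public String getStageName() { return "transform"; }
      public Map<String, String> getPayload() { return payload; }
    };
    System.out.println(toJson("default", "heartbeats", alert));
  }
}
```

A TMS or Kafka publisher would then send each formatted string to its configured topic inside publish().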

Emitting Alerts

The Emitter interface will be enhanced to allow emitting an alert:

public interface Emitter<T> {

  // existing method
  void emit(T value);

  // existing method
  void emitError(InvalidEntry<T> invalidEntry);

  // new method
  void emitAlert(Map<String, String> payload);
}
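To illustrate how a plugin might use this, the sketch below shows a hypothetical heartbeat transform that passes every record through while emitting an alert as a side effect when a threshold is exceeded. The Emitter stub contains only the methods the example needs; HeartbeatTransform and its threshold are invented for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for the proposed Emitter interface.
interface Emitter<T> {
  void emit(T value);
  void emitAlert(Map<String, String> payload);
}

public class HeartbeatTransform {
  private static final double THRESHOLD = 170.0;

  // Emits an alert when the heartbeat exceeds the threshold; the record
  // itself still flows to the rest of the pipeline either way.
  static void transform(Map<String, String> record, Emitter<Map<String, String>> emitter) {
    double heartbeat = Double.parseDouble(record.get("heartbeat"));
    if (heartbeat > THRESHOLD) {
      emitter.emitAlert(new HashMap<>(record));
    }
    emitter.emit(record);
  }

  public static void main(String[] args) {
    List<Map<String, String>> out = new ArrayList<>();
    List<Map<String, String>> alerts = new ArrayList<>();
    Emitter<Map<String, String>> emitter = new Emitter<Map<String, String>>() {
      public void emit(Map<String, String> value) { out.add(value); }
      public void emitAlert(Map<String, String> payload) { alerts.add(payload); }
    };
    Map<String, String> normal = new HashMap<>();
    normal.put("userid", "abc");
    normal.put("heartbeat", "80");
    Map<String, String> high = new HashMap<>();
    high.put("userid", "def");
    high.put("heartbeat", "200");
    transform(normal, emitter);
    transform(high, emitter);
    System.out.println(out.size() + " records, " + alerts.size() + " alerts");
    // prints "2 records, 1 alerts"
  }
}
```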

Plugin Alert Port Indication

The UI should have some way of indicating which plugins can possibly emit alerts and which ones cannot. Since this is purely a UI concern, we can add a property to the widgets JSON config:

  "metadata": {
    "spec-version": "1.3",
    "emitsAlerts": true
  "configuration-groups": [ ],
  "outputs": []

Note that this can also be done for plugins that can emit errors.

Aggregating Alerts

Each stage will be able to specify some aggregations to perform on the alerts it emits. Aggregations are useful when the pipeline receives many records that trigger an alert, but only wants to publish a single alert. For example, when receiving heartbeat data, a transform may alert if the heartbeat exceeds a threshold. However, there may be a heartbeat measurement for each user for each second, and we may only want to publish one alert for each user, not an alert for each second for each user. For example, suppose a transform emits alerts with payloads:

  { "userid": "abc", "name": "alice", "heartbeat": "180" },
  { "userid": "abc", "name": "alice", "heartbeat": "184" },
  { "userid": "abc", "name": "alice", "heartbeat": "182" },
  { "userid": "def", "name": "bob", "heartbeat": "200" },
  { "userid": "def", "name": "bob", "heartbeat": "198" },
  { "userid": "def", "name": "bob", "heartbeat": "202" }

The pipeline developer may want to aggregate these alerts to reduce them to:

  { "userid": "abc", "name": "alice", "avgheartbeat": "182" },
  { "userid": "def", "name": "bob", "avgheartbeat": "200" }

To support this, all stages will be able to specify an alerts section that supports grouping alerts and performing simple aggregations:

  "stages": [
      "name": "transform",
      "plugin": { ... },
      "alerts": {
        "groupBy": [ "userid", "name" ], // optional. If not specified, all alerts will be aggregated into a single alert
        "aggregates": [ 
            "name": "avgheartbeat",
            "function": "mean",
            "field": "heartbeat"

Aggregates will be performed on the alert payloads. To start with, the 'avg', 'sum', 'count', 'min', and 'max' aggregate functions will be supported. When used, payload values will be interpreted as doubles.
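A minimal sketch of the groupBy and 'avg' behavior described above, using plain Java maps in place of the execution engine (AlertAggregation and avgByGroup are illustrative names, not part of the proposed API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AlertAggregation {
  // Groups alert payloads by the given key fields and replaces the aggregated
  // field with its average, mirroring the 'avg' function described above.
  static List<Map<String, String>> avgByGroup(List<Map<String, String>> payloads,
                                              List<String> groupBy,
                                              String field, String outName) {
    Map<List<String>, double[]> groups = new LinkedHashMap<>(); // key -> {sum, count}
    for (Map<String, String> p : payloads) {
      List<String> key = new ArrayList<>();
      for (String g : groupBy) {
        key.add(p.get(g));
      }
      double[] acc = groups.computeIfAbsent(key, k -> new double[2]);
      acc[0] += Double.parseDouble(p.get(field)); // payload values interpreted as doubles
      acc[1] += 1;
    }
    List<Map<String, String>> result = new ArrayList<>();
    for (Map.Entry<List<String>, double[]> e : groups.entrySet()) {
      Map<String, String> out = new LinkedHashMap<>();
      for (int i = 0; i < groupBy.size(); i++) {
        out.put(groupBy.get(i), e.getKey().get(i));
      }
      double avg = e.getValue()[0] / e.getValue()[1];
      // render whole numbers without a trailing ".0"
      out.put(outName, avg == Math.floor(avg) ? String.valueOf((long) avg) : String.valueOf(avg));
      result.add(out);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map<String, String>> alerts = new ArrayList<>();
    String[][] rows = {{"abc", "alice", "180"}, {"abc", "alice", "184"}, {"abc", "alice", "182"},
                       {"def", "bob", "200"}, {"def", "bob", "198"}, {"def", "bob", "202"}};
    for (String[] r : rows) {
      Map<String, String> p = new LinkedHashMap<>();
      p.put("userid", r[0]);
      p.put("name", r[1]);
      p.put("heartbeat", r[2]);
      alerts.add(p);
    }
    System.out.println(avgByGroup(alerts, Arrays.asList("userid", "name"), "heartbeat", "avgheartbeat"));
    // prints [{userid=abc, name=alice, avgheartbeat=182}, {userid=def, name=bob, avgheartbeat=200}]
  }
}
```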


Alerts will be collected in memory in the Spark driver (through the .collect() method) before being sent to an AlertPublisher. This is because we are not able to publish from within a Spark closure, and because the publisher may want to perform dedup or aggregation logic. It also means that users should be educated to avoid emitting a large number of alerts, or alerts with large payloads, as the driver could run out of memory. In MapReduce, tasks are likewise unable to publish to TMS, so the program will need to write all alerts to a local FileSet, then read from the FileSet and publish everything at the end.
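A rough sketch of the MapReduce path, using a local temp file to stand in for the FileSet: tasks append one alert per line, and at the end of the run everything is read back for publishing (FileBackedAlerts and both method names are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class FileBackedAlerts {
  // Called from tasks: append one serialized alert per line. A real
  // implementation would serialize through a proper format, not toString().
  static void writeAlert(Path file, Map<String, String> payload) throws IOException {
    Files.write(file, Collections.singletonList(payload.toString()),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }

  // Called once at the end of the run: read everything back, each line
  // would then be handed to the AlertPublisher.
  static java.util.List<String> readAndPublish(Path file) throws IOException {
    return Files.readAllLines(file);
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("alerts", ".txt");
    Map<String, String> p1 = new LinkedHashMap<>();
    p1.put("userid", "abc");
    writeAlert(file, p1);
    Map<String, String> p2 = new LinkedHashMap<>();
    p2.put("userid", "def");
    writeAlert(file, p2);
    System.out.println(readAndPublish(file).size() + " alerts published");
    // prints "2 alerts published"
    Files.delete(file);
  }
}
```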

API changes

New Programmatic APIs

Emitter.emitAlert and AlertPublisher are new APIs.

Deprecated Programmatic APIs


Modified REST APIs


Deprecated REST API


CLI Impact or Changes

  • None

UI Impact or Changes

  • UI must be able to detect which plugins can emit alerts, and display a corresponding port
  • UI must display metrics for alerts emitted

Security Impact 

This feature will use TMS, so any authorization added to TMS will affect this feature.

Impact on Infrastructure Outages 


Test Scenarios

Test ID | Test Description | Expected Results


Release X.Y.Z


Related Work

  • Work #1
  • Work #2
  • Work #3


Future work



  1. We should capture user stories around the alert events. Can the content be anything, or will it have a fixed form? What content is sent in the alert event, and how is it controlled?

    1. Good points, I have added a couple of user stories around the alert content. The publisher will have access to the pipeline namespace, pipeline name, stage the alert was emitted from, and payload. It will be up to the publisher how to format it, but for simplicity we can start with a couple of plugins that publish that data as a JSON object. 



  2. "When the actual alerts are published is left up to the pipeline."
    What are the different options available for when to publish? How will user convey his choice among those options?

    1. That was unclear wording. I was trying to say that it's left up to the app code, meaning there is no contract for how much time should pass between the code emitting an alert and the publisher actually publishing the alert. Alerts may not (and will not for the first version) be published immediately. If the pipeline fails, alerts may or may not have been published.

  3. pipeline developer may want to aggregate these alerts to reduce them to:

    How often are they aggregated? In a streaming pipeline vs a batch pipeline?

    How is the aggregation happening? If a pipeline emits a lot of alerts (in extreme case, one for each event it processes), will it cause high memory usage?

    1. Aggregation will happen once per batch (once in batch, once every interval in streaming), and will be done through the execution engine, not in memory. In other words, they will be similar to actually placing an aggregator node in the pipeline. 

      As I'm typing this, it's another reminder that Alerts really seem like just a special case of writing to an output port (once Pipeline Splitter Design is done), and having a sink that writes to some queue system. I wonder if the difference is really large enough to warrant differentiating them.

      1. It probably is worth it, because to a user an alert is a very different thing than a data sink, even if the implementation is similar.