
Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction

CDAP data pipelines currently support only time-based triggering. Relying solely on time-based triggers has a few drawbacks: it does not allow a large workflow to be logically separated into many smaller pipelines that depend on each other. Even if a pipeline can be separated into a set of smaller pipelines that are triggered periodically, the first step in each downstream pipeline has to check whether the rest of the pipeline needs to be executed at all. There is a need to trigger pipelines based on the state of a different pipeline.

Use Cases

FTP Download use-case

  1. As an ETL developer, I would like to process Bloomberg and Reuters feeds. These feeds are downloaded from an FTP site and are already being processed by another team and stored in a staging area. I do not wish to download the feeds again, but would like to run my pipeline as soon as the feed data is available in the staging area. To run my pipeline, I would like to know the list of files that were freshly downloaded.

  2. As an ETL developer consuming the Bloomberg and Reuters feeds, I do not wish to reprocess the same feed multiple times. I would like to process the feed only if the upstream pipeline has successfully completed.

Gold dataset consumption use-case

  1. As an ETL developer, I would like to read a golden dataset that is created by another team. The golden data is read from a database incrementally, then cleansed, anonymized, and standardized by our IT ops team via an ETL process. I wish to consume the data generated by the IT ops team and do not wish to rerun the standardization process myself, since that process might change. I only wish to consume the golden dataset that is created for my purposes as an input for my pipeline. I would like to know the directory location of the golden dataset, and I would like to run my pipeline as soon as a new dataset is available.

  2. As an ETL developer, I would like to know what feeds were responsible for creating the golden dataset, since I have to look up metadata for some feeds in my pipeline. I have configured the feeds that are present in the golden data set as a runtime argument.

  3. As an ETL developer, I would like my pipeline that consumes the golden dataset to be automatically re-run up to two times if it has failed. If reprocessing has failed more than twice, I do not wish to reprocess further.

High Level Requirements

Platform

  1. Platform should allow event notification for all program states

  2. Event notification (EN) should be published to a system topic in TMS

  3. Event notification should have a well defined event structure

  4. Event notifications should be published for all types of programs on completion, on failure, and when killed by YARN

  5. Event notification should have a payload to pass in information from one program to another

  6. It should be possible to configure a pipeline to be triggered based on an event from another program

  7. Pipelines should be triggered based on either

    1. Another Pipeline state

    2. Another Pipeline state and payload

      1. Conditions can be specified on whether a certain key exists, whether a key equals a value, or whether a key matches a condition (ex: runcount < 2)
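A condition like "runcount < 2" could be evaluated against the event payload roughly as follows. This is a self-contained sketch with hypothetical names, not an existing CDAP API; it only assumes the payload arrives as a flat string key-value map:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class PayloadConditionSketch {
  // True when the key exists in the payload and its value satisfies the condition.
  static boolean matches(Map<String, String> payload, String key, Predicate<String> condition) {
    String value = payload.get(key);
    return value != null && condition.test(value);
  }

  public static void main(String[] args) {
    Map<String, String> payload = new HashMap<>();
    payload.put("runcount", "1");

    // "key exists" form of the condition
    System.out.println(payload.containsKey("runcount"));                            // true
    // "key matches a condition" form, e.g. runcount < 2
    System.out.println(matches(payload, "runcount", v -> Integer.parseInt(v) < 2)); // true
  }
}
```

The "key equals a value" form is then just the predicate `v -> v.equals(expected)`.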

Data pipeline

  1. Users should be able to configure which parameters are emitted in the event payload from a pipeline. Hence, each plugin must expose which parameters it can emit. Example: file-path, files-processed

  2. Users configuring a pipeline (Golden Feed Consumer) should be able to select fields from the upstream pipeline (Gold Feed Generator) and set the runtime arguments needed for the current pipeline (Golden Feed Consumer)

Plugin

  1. Plugins should declare which parameters they can expose in the event notification

  2. The parameters that can be exposed are defined at pipeline configuration time (pipeline studio), not at runtime

UI

  1. Capabilities to add event based triggers in studio mode

  2. Capabilities to specify condition for triggering pipeline (ex: files.processed from Gold Feed Generator should not be empty)

  3. Capabilities to select what fields are published in event notification

Design

Assumptions

  • When we say "program" this refers to any program that can be scheduled. While the primary use case may be starting other pipelines after one pipeline finishes, MapReduce and Spark jobs can also be started.
  • We will only allow programs to be triggered with any combination of the following program statuses: ProgramStatus.COMPLETED, ProgramStatus.FAILED, and ProgramStatus.KILLED.
    • There is no known use case for triggering a program when another program is ProgramStatus.INITIALIZING or ProgramStatus.RUNNING, so this will be restricted to prevent unexpected behavior.
  • All combinations of existing constraints are possible.

Approach

  • Implementing Program Status Based Scheduling is a two step process:
    1. We need to determine when the program status has changed.
    2. Based on the program status change, the scheduler must schedule the program.

Both of these steps will be covered below.

1. Determining a Program State Change:

  • Whenever a program status changes, we can send program status based notifications to the Transactional Messaging System (TMS).
  • These messages can be emitted to topic program.status.event.topic within TMS.
  • The approach here will be to send a notification whenever the program status changes are persisted to the store. This occurs in each program runner.

Concerns:

  • What happens if a YARN container crashes?
    • Response: We already monitor running programs and mark the status as failed when it crashes.

2. Scheduling Based on the Program Status Change:

  • The scheduler will subscribe to the program status notifications on the topic program.status.event.topic.

  • Based on the notification received, it will add a new job to the job queue.
  • The existing scheduler implementation will handle data persistence and starting the job when all of its constraints have been satisfied, so the rest of the scheduler implementation remains unchanged and will work with the proposed changes.

Program Status Notification Payload Design:

Every time a program's status changes, the following fields will be sent in a payload through TMS (this notification is part of the design itself and is emitted whether or not it is used for scheduling):

  • programRunId: <the specific run of a program>
  • twillRunId: <only in distributed mode>
  • programRunStatus: <the updated run status of the program>
  • stateChangeTime: <when the program has transitioned to starting, running, or a terminal state>
  • userArguments and systemArguments: <only when the program has transitioned to starting>
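As an illustration only (the values are made up and this is not the actual implementation), the payload above can be thought of as a flat string map:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NotificationPayloadSketch {
  // Assembles the fields listed above for a hypothetical completed run.
  static Map<String, String> buildPayload() {
    Map<String, String> payload = new LinkedHashMap<>();
    payload.put("programRunId", "default.MyApp.workflow.DataPipelineWorkflow.run-0001");
    payload.put("programRunStatus", "COMPLETED");
    payload.put("stateChangeTime", Long.toString(System.currentTimeMillis()));
    // twillRunId is added only in distributed mode; userArguments and
    // systemArguments are added only on the transition to starting.
    return payload;
  }

  public static void main(String[] args) {
    System.out.println(buildPayload().keySet());
  }
}
```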

Program Status Scheduling Notification Payload Design:

The program status notifications received from TMS will be forwarded to the scheduler if and only if that program status notification can trigger another program. The following parameters will be passed in this notification between a triggering program and a triggered program:

  • all userArguments (runtimeArguments) of the triggering program with no renaming (renaming to be done later)
  • all USER-scoped workflow token key-value pairs of the triggering program with no renaming (renaming to be done later)
  • triggering program run properties (programRunId, startTs, runTs, stopTs)
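The forwarding step above amounts to flattening three sources into one map for the triggered program. A hedged sketch of that merge (helper names are hypothetical; since the design defers renaming, the collision behavior shown is an assumption):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TriggerPayloadSketch {
  // Merges the triggering program's user arguments, its USER-scoped workflow
  // token pairs, and its run properties into one map (no renaming).
  static Map<String, String> buildTriggerPayload(Map<String, String> userArgs,
                                                 Map<String, String> userTokens,
                                                 String programRunId) {
    Map<String, String> payload = new LinkedHashMap<>();
    payload.putAll(userArgs);
    payload.putAll(userTokens); // assumption: token values win on key collision
    payload.put("programRunId", programRunId);
    return payload;
  }
}
```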

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Triggers
public class ProgramStatusTrigger extends Trigger {
  private final ProgramId programId;
  private final String key;
  private final Predicate<Value> condition;
  private final Set<ProgramStatus> statuses; // Use Set since it removes duplicates.

  public ProgramStatusTrigger(ProgramId programId, @Nullable String key,
                              @Nullable Predicate<Value> condition, ProgramStatus... statuses) {
    this.programId = programId;
    this.key = key;
    this.condition = condition;
    this.statuses = new HashSet<>(Arrays.asList(statuses));
  }
}
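Because the constructor collapses the vararg statuses into a HashSet, duplicate statuses are deduplicated silently. A minimal self-contained illustration of that behavior (using a stand-in enum, not the CDAP ProgramStatus):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TriggerStatusSketch {
  enum ProgramStatus { COMPLETED, FAILED, KILLED }

  // Mirrors the constructor above: varargs collapse into a Set.
  static Set<ProgramStatus> toStatusSet(ProgramStatus... statuses) {
    return new HashSet<>(Arrays.asList(statuses));
  }

  public static void main(String[] args) {
    // Passing COMPLETED twice still yields a set of size 2.
    System.out.println(toStatusSet(ProgramStatus.COMPLETED, ProgramStatus.COMPLETED,
                                   ProgramStatus.FAILED).size()); // 2
  }
}
```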

External Methods (in ScheduleBuilder)

Trigger Definition
public class ScheduleBuilder {
  ...

  public ScheduleCreationBuilder triggerOnProgramStatus(String programNamespace, String application,
                                                        String applicationVersion, ProgramType programType,
                                                        String program, ProgramStatus... programStatuses);

  // Assume same application version as the starting program
  public ScheduleCreationBuilder triggerOnProgramStatus(String programNamespace, String application,
                                                        ProgramType programType, String program,
                                                        ProgramStatus... programStatuses);

  // Assume same application name and version as the starting program
  public ScheduleCreationBuilder triggerOnProgramStatus(String programNamespace, ProgramType programType,
                                                        String program, ProgramStatus... programStatuses);

  // Assume same namespace, application, and application version as the starting program
  public ScheduleCreationBuilder triggerOnProgramStatus(ProgramType programType, String program,
                                                        ProgramStatus... programStatuses);
}
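To show how the shortest overload is meant to read, here is a sketch with minimal stand-in types (these are not the real CDAP classes; the real builder returns a ScheduleCreationBuilder and does much more):

```java
import java.util.Arrays;
import java.util.EnumSet;

public class TriggerBuilderSketch {
  enum ProgramType { WORKFLOW, MAPREDUCE, SPARK }
  enum ProgramStatus { COMPLETED, FAILED, KILLED }

  // Stand-in for ScheduleBuilder that only records the trigger definition.
  static class ScheduleBuilder {
    String triggeringProgram;
    EnumSet<ProgramStatus> statuses;

    // Same-namespace/application/version overload from the design above.
    ScheduleBuilder triggerOnProgramStatus(ProgramType type, String program,
                                           ProgramStatus... statuses) {
      this.triggeringProgram = program;
      this.statuses = EnumSet.copyOf(Arrays.asList(statuses));
      return this;
    }
  }

  public static void main(String[] args) {
    // Run the consumer whenever GoldFeedGenerator completes or is killed.
    ScheduleBuilder b = new ScheduleBuilder()
        .triggerOnProgramStatus(ProgramType.WORKFLOW, "GoldFeedGenerator",
                                ProgramStatus.COMPLETED, ProgramStatus.KILLED);
    System.out.println(b.statuses); // [COMPLETED, KILLED]
  }
}
```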

 

There needs to be a way to restrict certain constraints so that they apply only to schedules triggered by a program status (program-status-specific constraints).

Deprecated Programmatic APIs

There are none.

New REST APIs

Path: /v3/namespaces/<namespace-id>/apps/<app-id>/schedules/<schedule-id>

Method: PUT

Description: Adds a schedule for a program to an application

Request Body: The same as the existing, already documented endpoint, except that the trigger specified in the body is different.

Request
{
  "name": "<name of the schedule>",
  "description": "<schedule description>",
  "program": {
    "programName": "<name of the program>",
    "programType": "WORKFLOW"
  },
  "properties": {
    ...
  },
  "constraints": [
    {
      ...
    },
    ...
  ],
  "trigger": {
    "type": "PROGRAM_STATUS",
    "statuses": ["<statuses of the program; derived from ProgramStatus; only COMPLETED, FAILED, KILLED>"],
    "program": {
      "namespace": "<namespace of the program>",
      "application": "<application of the program>",
      "applicationVersion": "<application version of the program>",
      "programType": "<type of the program>",
      "programName": "<name of the program>"
    }
  }
}

Responses:

  • 200 - On success
  • 404 - When the application is not available
  • 409 - A schedule with the same name already exists
  • 500 - Any internal errors

 

 

      

Deprecated REST API

  • There are no API routes to be deprecated.

CLI Impact or Changes

  • add program schedule <schedule-name> for <program-type> <program> [version <version>] [description "optional"] after program <program-name> type <program-type> status <program-status> [or <program-status>] [namespace <program-namespace>] [application <program-application>] [version <program-application-version>] [concurrency <concurrency>] [properties "optional map of properties"]
    +-> PUT /v3/namespaces/default/apps/<app-id>/versions/<app-version>/schedules/<schedule-id>
    update program schedule <schedule-name> for <program-type> <program> [version <version>] [description "optional"] after program <program-name> type <program-type> status <program-status> [or <program-status>] [namespace <program-namespace>] [application <program-application>] [version <program-application-version>] [concurrency <concurrency>] [properties "optional map of properties"]
    +-> POST /v3/namespaces/default/apps/<app-id>/versions/<app-version>/schedules/<schedule-id>/update

     

  • The namespace, application, and applicationVersion for the triggering program can all be optional in the CLI because they will inherit from the program that is being scheduled if they are not included.

UI Impact or Changes

  • Add ProgramSchedule options to the "Schedule" panel, along with a dropdown for pipeline selection and a dropdown for program status selection

Potential Mockup:

Note that the program statuses in the screenshot should be checkboxes, allowing multiple statuses to be selected.

Security Impact 

  • Can a program A be triggered by a program B if the user running program A does not have read access to program B?
  • If we allow cross namespace triggers, how will we make this secure?

Impact on Infrastructure Outages

  • Since this change does not change the existing Scheduler architecture, the outage impact is the same as what we have today.
  • Any authorization added to TMS will also affect this feature.

Test Scenarios

  1. Configure a schedule to run every 5 minutes and always complete successfully. Configure a second schedule to run if the first schedule has succeeded.
     Expected: Both schedules run and finish successfully.
  2. Configure a schedule to fail. Configure a second schedule to run only if the first schedule has finished and has written the key-value pair "key", "value" in the workflow token.
     Expected: The first schedule should run. The second schedule should run and see "key", "value" in the workflow token.
  3. Configure a schedule to run for 10 seconds before it completes. Configure a second schedule to run only if the first schedule has succeeded. 500 milliseconds before the first schedule completes, delete the second schedule.
     Expected: The first schedule should run and finish successfully.
  4. Configure a schedule to run once, starting at 9:50 PM and finishing in less than 10 minutes. Configure a second schedule to run only between 10:00 PM and 11:00 PM when the first schedule has completed successfully.
     Expected: The first schedule should run and finish before 10:00 PM. The second schedule should not run.

Open Questions

  1. Is there any meaning to manually starting a program (i.e., by a user) that depends on other pipelines/programs? How should this behavior be handled?

Releases

Release 4.3.0

Related Work

Future work

 

 


40 Comments

  1. A program cannot trigger itself to start based on its own status

    Why? So a program cannot schedule itself to reprocess if it failed?

    1. Right, it introduces unnecessary complications as it could enter a cycle. There are other constraints and time triggers that are better suited to this sort of thing, especially when we support composite triggers.

      1. What are the complications?? You can have two programs to create a cycle as well.

        1. That's true. I was thinking that if a program had a runtime error and was continually restarting itself because it was failing, then it might cause some unexpected issues on our end.

          If that's not the case, then it may not be worth the extra logic to enforce this.

  2. Since DefaultStore is a central place where all program runners update their status, we can send a notification to TMS from the DefaultStore when it persists the program status.

    This is not good. We should be using TMS as the place to trigger all actions that need to be taken when a program state changes (e.g. updating the run record, creating a new job in the scheduler, etc.).

    1. If we don't send notification from DefaultStore but from program runners, then it's possible that the status change is not persisted but the schedule is triggered, and user will see an inconsistent state?

      1. The calling code should send the notification in the same transaction that the status is persisted.
        Both calls are transactional, so either both will happen or neither, right?

      2. What do you need from the store when the schedule is triggered?

  3.  

    • namespace: <namespace of the program>
    • application: <application of the program>
    • applicationVersion: <applicationVersion of the program>
    • programType: <type of the program>
    • programName: <name of the program>

     

    So basically this is the ProgramId

    1. Yes. That is how it will be implemented when we actually send the notification. We were just listing out all of the attributes that would be needed.

  4. Triggerable Program Status (Internal)

    This is already a ProgramStatus in cdap-api. Why the need to define yet another one? The mapping could simply be logic inside the scheduler's TMS subscriber??

    1. I thought it would be a good idea since we have SchedulableProgramType and ProgramType to represent the program types that can be scheduled and all supported program types, respectively.

      I was modelling that behavior with TriggerableProgramStatus and ProgramStatus as well since we only wanted to allow certain program statuses to be triggered.

      1. The SchedulableProgramType was a mistake and that's something we want to get rid of. Please check with Ali Anwar as well.

        1. +1
          Use ProgramType class instead.

        2. Ali Anwar Terence Yim: I see how SchedulableProgramType may have been a mistake. However, TriggerableProgramStatus may be needed because ProgramStatus is defined as:

          public enum ProgramStatus {
            INITIALIZING,
            RUNNING,
            COMPLETED,
            FAILED,
            KILLED
          }

          We only want programs to be triggered from a success (completed), failure (failed or killed), and finished (completed, failure, or killed). If we tell users to use ProgramStatus, then they will have access to the other statuses, and we will need extra logic to ensure that only those statuses are used. Furthermore, ProgramStatus does not have a state to represent complete or failure or killed. If a user wanted to trigger program B if program A completed successfully or was killed, we wouldn't be able to do that now. The trigger function definitions would have to change to accept a list of ProgramStatus objects instead of a single ProgramStatus to allow users to trigger programs if it completes or fails. I think that a TriggerableProgramStatus would be clearer for the user and easier to implement as it serves a functional difference from ProgramStatus.

           

          What do you guys think?

          1. Do we also want to be able to trigger upon a RUNNING status?

            1. No, there aren't really any use cases for this.

          2. So the actual change is the validation at configure time to make sure the status passed in is not RUNNING, right? Because to provide multiple states, we just need to make the state parameter a vararg, and it becomes more explicit if the user wants to trigger on FINISHED, FAILED, or KILLED (or any combination of them) instead of having to look up what COMPLETED actually means (also, how do we expand the API to support combinations of them?)

            1. The API would just change to take in a set of ProgramStatuses rather than just one. Then during trigger creation, just as we limit schedules to just workflows, we can throw an argument exception if program status triggers try to trigger based on ProgramStatus.INITIALIZING or ProgramStatus.RUNNING.

              Edit: By API, I meant the program status trigger definition internally. See my comment below for what I mean.

              1. The API can't just use vararg so that user doesn't need to pass in a set?

                1. The user won't pass in a set. Internally, the trigger would look at the argument `ProgramStatus... statuses` and store it like: `Set<ProgramStatus> programStatuses = new HashSet<>(Arrays.asList(statuses))`

                  This way, the user does not know that it is stored as a set. They can just pass in any number of ProgramStatuses, like you mentioned.

  5. For the ProgramStatusTrigger class, it should be extending from Trigger, right?

    1. Yes, will make that change.

  6. For the triggerOnProgramStatus methods, there should be a variant that doesn't take application and version, for triggering from program within the same application.

    1. What about the ability to trigger based upon program status, that are in other namespaces?
      Today, we have the ability to trigger based upon new partitions in a PartitionedFileSet dataset in another namespace.

      1. Agreed. So all together the "variants" would look something like this:

         

        /**
         * Create a schedule which is triggered based upon the specified program in a namespace, application,
         * application version, and a specific program status.
         *
         * @param programNamespace the namespace where this program is defined
         * @param application the name of the application where this program is defined
         * @param applicationVersion the version of the application
         * @param programType the type of the program, as supported by the system
         * @param program the name of the program
         * @param programStatus the status of the program to trigger the schedule
         * @return this {@link ScheduleBuilder}
         */
        ScheduleCreationSpec triggerOnProgramStatus(String programNamespace, String application, String applicationVersion,
                                                    ProgramType programType, String program, ProgramStatus programStatus);

        /**
         * Creates a schedule which is triggered by a program in the same application version.
         *
         * @see ScheduleBuilder#triggerOnProgramStatus(String, String, ProgramType, String, ProgramStatus)
         */
        ScheduleCreationSpec triggerOnProgramStatus(String programNamespace, String application, ProgramType programType,
                                                    String program, ProgramStatus programStatus);

        /**
         * Creates a schedule which is triggered by a program in the same application and application version.
         *
         * @see ScheduleBuilder#triggerOnProgramStatus(String, String, ProgramType, String, ProgramStatus)
         */
        ScheduleCreationSpec triggerOnProgramStatus(String programNamespace, ProgramType programType,
                                                    String program, ProgramStatus programStatus);

        /**
         * Creates a schedule which is triggered by a program in the same namespace, application, and application version.
         *
         * @see ScheduleBuilder#triggerOnProgramStatus(String, String, ProgramType, String, ProgramStatus)
         */
        ScheduleCreationSpec triggerOnProgramStatus(ProgramType programType, String program, ProgramStatus programStatus);
        1. I am not sure about the namespace one. Do we want to allow cross namespace trigger?? How are security / authorization comes into play?

  7. The "trigger"."type" in the REST API should be PROGRAM_STATUS instead, as it is the name of an enum, which can't have hyphen ( - ). 

  8. We should have a way to pass in run-time arguments/workflow tokens to the triggering program

    1. We already do this when launching the program. A schedule creates a Job, which has Notifications. A notification contains a mapping of run-time arguments.

      1. Are these runtime args static? For example if pipeline-A writes a file that should be the input to pipeline-B. How would that work?

        1. I think Sree's suggesting that more information about the triggering program should be passed to the newly launched program: for instance, the runtime args of the triggering program.

          1. Ah, yes I misunderstood. Yes, this is something that could be done. We would need to have ProgramOptions's runtime arguments from the program runner's context sent in the notification to TMS.

            I'm not sure specifically if we have information like generated output files of a pipeline, but if it's accessible, then that information could also be sent in a similar manner.

  9. Failed: A program ends with ProgramStatus.FAILED

    So KILLED does not count as Failure? Is KILLED equivalent to "killed through CDAP"? Or do programs killed by Yarn due to memory consumption also get status KILLED?

    1. Sorry, what you quoted is a typo. That should say: "Failed: A program ends with ProgramStatus.FAILED or ProgramStatus.KILLED", as it does in the enum definition for TriggerableProgramStatus. Our intention was for TriggerableProgramStatus.FAILED to represent not success, killed through CDAP, and killed by Yarn.

  10. Should capture the source of how the program got triggered (preferably in metadata store), will be useful to have this information in production. Users would like to know if a program is triggered by a time based schedule, program based schedule or an event based schedule. 

    1. Yes, I'll open a separate PR for this

  11. What happens when a program is triggered by a time based and program state based trigger. Will we run both? In this case, perhaps we should document the assumptions clearly on what will happen in this scenario.

    1. This depends on whether concurrent runs constraint is set for a schedule. If it's not set, then the program can be launched concurrently

  12. For the workflow token constraint, you can refer to ConstraintProgramScheduleBuilder class. Likewise, create another interface extending ScheduleBuilder with methods {{withWorkflowKeyPresent}} and etc. Let triggerOnProgramStatus return this new schedule builder class so that the workflow token constraint can only be applied on schedules with program status trigger. However, I think instead of making workflow token conditions as constraints, they can also be part of the trigger. Only when both the program status and the workflow token conditions are met, the schedule should be triggered.