Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
CDAP data pipelines currently support only time-based triggering. Relying solely on time-based triggers to start pipelines has a few drawbacks: it does not allow a large pipeline to be logically separated into, and operated as, a set of smaller pipelines that depend on each other. Even if a pipeline can be separated into a set of smaller pipelines that are triggered periodically, the first step of each downstream pipeline has to check whether the rest of the pipeline needs to be executed. There is a need to trigger pipelines based on the state of a different pipeline.
Use Cases
FTP Download use-case
As an ETL developer, I would like to process Bloomberg and Reuters feeds. Each feed is downloaded from an FTP site and is already being processed by another team and stored in a staging area. I do not wish to download the feed again, but would like to run my pipeline as soon as the feed data is available in the staging area. To run my pipeline, I would like to know the list of files that were freshly downloaded.
As an ETL developer consuming the Bloomberg and Reuters feeds, I do not wish to reprocess the same feed multiple times. I would like to process the feed only if the upstream pipeline has completed successfully.
Gold dataset consumption use-case
As an ETL developer, I would like to read a golden dataset that is created by another team. The golden data is read from a database incrementally, then cleansed, anonymized, and standardized by our IT ops team via an ETL process. I wish to consume the data generated by the IT ops team and do not wish to rerun the standardization process myself, since that process might change. I only wish to consume the golden dataset that is created for my purposes as an input for my pipeline. I would like to know the directory location of the golden dataset, and I would like to run my pipeline as soon as a new dataset is available.
As an ETL developer, I would like to know which feeds were responsible for creating the golden dataset, since I have to look up metadata for some feeds in my pipeline. I have configured the feeds that are present in the golden dataset as a runtime argument.
As an ETL developer, I would like my pipeline that consumes the golden dataset to be reprocessed automatically up to two times if it has failed. If reprocessing has failed more than twice, I do not wish to reprocess further.
High Level Requirements
Platform
- The platform should allow event notifications for all program states
- Event notifications (EN) should be published to a system topic in TMS
- Event notifications should have a well-defined event structure
- Event notifications should be published for all types of programs on completion, on failure, and when killed by YARN
- Event notifications should carry a payload to pass information from one program to another
- It should be possible to configure a pipeline to be triggered by an event from another program
- Pipelines should be triggered based on either:
  - another pipeline's state
  - another pipeline's state and payload
- Conditions can be specified on whether a certain key exists, whether a key equals a value, or whether a key matches a condition (ex: runcount < 2), as sketched below
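A minimal sketch of how these three condition styles could be evaluated against an event payload; the `PayloadConditions` helper and its method names are hypothetical, not part of CDAP:

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper showing the three condition styles named above.
public final class PayloadConditions {

  // True if the key is present in the event payload.
  public static Predicate<Map<String, String>> keyExists(final String key) {
    return payload -> payload.containsKey(key);
  }

  // True if the key is present and equals the expected value.
  public static Predicate<Map<String, String>> keyEquals(final String key, final String expected) {
    return payload -> expected.equals(payload.get(key));
  }

  // True if the key is present and its value matches a condition, ex: runcount < 2.
  public static Predicate<Map<String, String>> keyMatches(final String key,
                                                          final Predicate<String> condition) {
    return payload -> payload.containsKey(key) && condition.test(payload.get(key));
  }

  private PayloadConditions() { }
}
```

For example, the `runcount < 2` condition above would be expressed as `keyMatches("runcount", v -> Integer.parseInt(v) < 2)`.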
Data pipeline
Users should be able to configure which parameters are emitted in the event payload from a pipeline. Hence, a plugin must expose which parameters it emits. Example: file-path, files-processed
Users configuring a pipeline (Golden Feed Consumer) should be able to select fields from the upstream pipeline (Gold Feed Generator) and set the runtime arguments needed for the current pipeline (Golden Feed Consumer)
Plugin
Plugins should declare which parameters they can expose in the event notification
The parameters that can be exposed are defined at pipeline configuration time (pipeline studio), not at runtime
UI
Capability to add event-based triggers in studio mode
Capability to specify a condition for triggering a pipeline (ex: files.processed from Gold Feed Generator should not be empty)
Capability to select which fields are published in the event notification
Design
Assumptions
- When we say "program" this refers to any program that can be scheduled. While the primary use case may be starting other pipelines after one pipeline finishes, MapReduce and Spark jobs can also be started.
- We will only allow programs to be triggered with any combination of the following program statuses: ProgramStatus.COMPLETED, ProgramStatus.FAILED, and ProgramStatus.KILLED.
- There is no known use case for triggering a program when another program is ProgramStatus.INITIALIZING or ProgramStatus.RUNNING, so this will be restricted to prevent unexpected behavior.
- All combinations of existing constraints are possible.
Approach
- Implementing Program Status Based Scheduling is a two-step process:
1. We need to determine when a program's status has changed.
2. Based on the program status change, the scheduler must schedule the dependent program.
Both of these steps are covered below.
1. Determining a Program State Change:
- Whenever a program status changes, we can send program status notifications to the Transactional Messaging System (TMS).
- These messages will be emitted to the topic `program.status.event.topic` within TMS.
- The approach here is to send a notification whenever a program status change is persisted to the store. This occurs in each program runner. A sketch of this publishing step follows.
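A minimal sketch of the publishing step, assuming a hypothetical `MessagePublisher` handle; the actual TMS client API may differ:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a program runner emits a status-change notification to
// the TMS topic at the point where it persists the new state to the store.
public class ProgramStatusNotifier {

  // Assumed TMS client handle; not the actual CDAP messaging API.
  interface MessagePublisher {
    void publish(String topic, String message);
  }

  private static final String TOPIC = "program.status.event.topic";
  private final MessagePublisher publisher;

  ProgramStatusNotifier(MessagePublisher publisher) {
    this.publisher = publisher;
  }

  void notifyStateChange(String programRunId, String programRunStatus, long stateChangeTime) {
    Map<String, String> payload = new LinkedHashMap<>();
    payload.put("programRunId", programRunId);
    payload.put("programRunStatus", programRunStatus);
    payload.put("stateChangeTime", Long.toString(stateChangeTime));
    // Ideally published in the same transaction that persists the status,
    // so the store and the notification cannot diverge.
    publisher.publish(TOPIC, payload.toString()); // serialization format elided
  }
}
```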
Concerns:
- What happens if a YARN container crashes?
- Response: We already monitor running programs and mark the status as failed when it crashes.
2. Scheduling Based on the Program Status Change:
The scheduler will subscribe to the program status notifications on the topic `program.status.event.topic`.
- Based on the notification received, it will add a new job to the job queue.
- The existing scheduler implementation will handle data persistence and starting the job when all of its constraints have been satisfied, so the rest of the scheduler implementation remains unchanged and will work with the proposed changes. A sketch of the subscription loop follows.
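A minimal sketch of this subscription loop, under assumed interfaces for the TMS consumer, job queue, and trigger lookup (none of these names are actual CDAP scheduler classes):

```java
import java.util.List;

// Illustrative sketch of step 2: consume status notifications from TMS and
// enqueue a job for every schedule whose trigger matches the (program, status) pair.
public class StatusEventSubscriber implements Runnable {

  interface MessageConsumer { List<StatusNotification> poll(String topic); }

  interface JobQueue { void addJob(String scheduleName, StatusNotification notification); }

  interface TriggerIndex { List<String> findSchedules(String program, String status); }

  static class StatusNotification {
    final String program;  // the triggering program
    final String status;   // COMPLETED, FAILED, or KILLED

    StatusNotification(String program, String status) {
      this.program = program;
      this.status = status;
    }
  }

  private static final String TOPIC = "program.status.event.topic";

  private final MessageConsumer consumer;
  private final JobQueue jobQueue;
  private final TriggerIndex triggerIndex;

  StatusEventSubscriber(MessageConsumer consumer, JobQueue jobQueue, TriggerIndex triggerIndex) {
    this.consumer = consumer;
    this.jobQueue = jobQueue;
    this.triggerIndex = triggerIndex;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      for (StatusNotification n : consumer.poll(TOPIC)) {
        for (String schedule : triggerIndex.findSchedules(n.program, n.status)) {
          // The existing scheduler handles persistence and constraint
          // checking once the job is in the queue (see above).
          jobQueue.addJob(schedule, n);
        }
      }
    }
  }
}
```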
Program Status Notification Payload Design:
Every time a program's status changes, the following payload will be sent through TMS. This payload is emitted on every status change as part of the core design, not only for changes used in scheduling:
- programRunId: <the specific run of a program>
- twillRunId: <only in distributed mode>
- programRunStatus: <the updated run status of the program>
- stateChangeTime: <when the program transitioned to a starting, running, or terminal state>
- userArguments and systemArguments: <only when the program has transitioned to starting>
Program Status Scheduling Notification Payload Design:
The program status notifications received from TMS will be forwarded to the scheduler if and only if the notification can trigger another program. The following parameters will be passed in this notification from the triggering program to the triggered program (see the sketch after this list):
- all userArguments (runtimeArguments) of the triggering program with no renaming (renaming to be done later)
- all USER-scoped workflow token key-value pairs of the triggering program with no renaming (renaming to be done later)
- triggering program run properties (programRunId, startTs, runTs, stopTs)
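A minimal sketch of how these forwarded properties might be merged into the triggered program's runtime arguments; the helper and key handling here are assumptions (renaming is deferred, as noted above):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical composition of the triggered program's runtime arguments from
// a forwarded scheduling notification; key names are illustrative only.
public final class TriggerArguments {

  static Map<String, String> compose(Map<String, String> triggeringUserArgs,
                                     Map<String, String> triggeringUserTokens,
                                     Map<String, String> triggeringRunProperties) {
    Map<String, String> args = new HashMap<>();
    // No renaming yet: keys are forwarded as-is, so later sources win on conflict.
    args.putAll(triggeringUserArgs);       // runtimeArguments of the triggering run
    args.putAll(triggeringUserTokens);     // USER-scoped workflow token key-value pairs
    args.putAll(triggeringRunProperties);  // programRunId, startTs, runTs, stopTs
    return args;
  }

  private TriggerArguments() { }
}
```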
API changes
New Programmatic APIs
New Java APIs introduced (both user facing and internal)
External Methods (in ScheduleBuilder)
```java
public class ScheduleBuilder {
  ...

  public ScheduleCreationBuilder triggerOnProgramStatus(
      String programNamespace, String application, String applicationVersion,
      ProgramType programType, String program, ProgramStatus... programStatuses);

  // Assume same application version as starting program
  public ScheduleCreationBuilder triggerOnProgramStatus(
      String programNamespace, String application,
      ProgramType programType, String program, ProgramStatus... programStatuses);

  // Assume same application name and version as starting program
  public ScheduleCreationBuilder triggerOnProgramStatus(
      String programNamespace,
      ProgramType programType, String program, ProgramStatus... programStatuses);

  // Assume same namespace, application, and application version as starting program
  public ScheduleCreationBuilder triggerOnProgramStatus(
      ProgramType programType, String program, ProgramStatus... programStatuses);
}
```
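For illustration, an application might declare such a schedule roughly as follows. This assumes a `buildSchedule`/`schedule` flow on `AbstractApplication` and illustrative workflow names; it is a sketch of the proposed API, not finalized code:

```java
// Sketch only; imports elided. Triggers DownstreamWorkflow whenever
// UpstreamWorkflow in the same application completes or fails.
public class ExampleApp extends AbstractApplication {
  @Override
  public void configure() {
    schedule(
        buildSchedule("onUpstreamDone", ProgramType.WORKFLOW, "DownstreamWorkflow")
            .triggerOnProgramStatus(ProgramType.WORKFLOW, "UpstreamWorkflow",
                                    ProgramStatus.COMPLETED, ProgramStatus.FAILED));
  }
}
```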
There needs to be a way to restrict certain defined constraints so that they apply only to schedules triggered by a program status (program-status-specific constraints).
Deprecated Programmatic APIs
There are none.
New REST APIs
Path | Method | Description | Response Codes |
---|---|---|---|
/v3/namespaces/&lt;namespace&gt;/apps/&lt;app-id&gt;/versions/&lt;app-version&gt;/schedules/&lt;schedule-id&gt; | PUT | Adds a schedule for a program to an application. The request body is the same as for the existing schedule API endpoint already documented, except that the trigger specified in the body is different (see below). | 200 - On success; 404 - When the application is not available; 409 - A schedule with the same name already exists; 500 - Any internal errors |

Request body:

```json
{
  "name": "<name of the schedule>",
  "description": "<schedule description>",
  "program": {
    "programName": "<name of the program>",
    "programType": "WORKFLOW"
  },
  "properties": { ... },
  "constraints": [ { ... }, ... ],
  "trigger": {
    "type": "PROGRAM_STATUS",
    "statuses": ["<status of the program; derived from ProgramStatus; only COMPLETED, FAILED, KILLED>"],
    "program": {
      "namespace": "<namespace of the program>",
      "application": "<application of the program>",
      "applicationVersion": "<application version of the program>",
      "programType": "<type of the program>",
      "programName": "<name of the program>"
    }
  }
}
```
Deprecated REST API
- There are no API routes to be deprecated.
CLI Impact or Changes
add program schedule <schedule-name> for <program-type> <program> [version <version>] [description "optional"] after program <program-name> type <program-type> status <program-status> [or <program-status>] [namespace <program-namespace>] [application <program-application>] [version <program-application-version>] [concurrency <concurrency>] [properties "optional map of properties"]
+-> PUT /v3/namespaces/default/apps/<app-id>/versions/<app-version>/schedules/<schedule-id>
update program schedule <schedule-name> for <program-type> <program> [version <version>] [description "optional"] after program <program-name> type <program-type> status <program-status> [or <program-status>] [namespace <program-namespace>] [application <program-application>] [version <program-application-version>] [concurrency <concurrency>] [properties "optional map of properties"]
+-> POST /v3/namespaces/default/apps/<app-id>/versions/<app-version>/schedules/<schedule-id>/update
- The namespace, application, and applicationVersion for the triggering program can all be optional in the CLI because they will inherit from the program that is being scheduled if they are not included.
UI Impact or Changes
- Add ProgramSchedule options to the "Schedule" panel, along with a dropdown for pipeline selection and a dropdown for program status selection
Potential mockup (screenshot not reproduced here): the program statuses in the mockup should be a checkbox, allowing multiple statuses to be selected.
Security Impact
- Can a program A be triggered by a program B if the user running program A does not have read access to program B?
- If we allow cross namespace triggers, how will we make this secure?
Impact on Infrastructure Outages
- Since this change does not change the existing Scheduler architecture, the outage impact is the same as what we have today.
Any authorization added to TMS will also affect this feature.
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
1 | Configure a schedule to run every 5 minutes and always complete successfully. Configure a second schedule to run if the first schedule has succeeded. | Both schedules run and finish successfully. |
2 | Configure a schedule to fail. Configure a second schedule to run only if the first schedule has finished and has written the key-value pair "key", "value" in the workflow token. | The first schedule should run. The second schedule should run and see "key", "value" in the workflow token. |
3 | Configure a schedule to run for 10 seconds before it completes. Configure a second schedule to run only if the first schedule has succeeded. 500 milliseconds before the first schedule completes, delete the second schedule. | The first schedule should run and finish successfully. |
4 | Configure a schedule to run once, starting at 9:50 PM; it should finish in less than 10 minutes. Configure a second schedule to run only between 10:00 PM and 11:00 PM when the first schedule has completed successfully. | The first schedule should run and finish before 10:00 PM. The second schedule should not run. |
Open Questions
- Is there any meaning to manually starting (i.e., as a user) a program that depends on other pipelines / programs? How should this behavior be handled?
Releases
Release 4.3.0
Related Work
Future work
Comments
Terence Yim
Why? So a program cannot schedule itself to reprocess if it failed?
Sameet Sapra
Right, it introduces unnecessary complications, as it could enter a cycle. There are other constraints and time triggers that are better suited to this sort of thing, especially once we support composite triggers.
Terence Yim
What are the complications?? You can have two programs create a cycle as well.
Sameet Sapra
That's true. I was thinking that if a program had a runtime error and was continually restarting itself because it was failing, then it might cause some unexpected issues on our end.
If that's not the case, then it may not be worth the extra logic to enforce this.
Terence Yim
This is not good. We should be using TMS as the place to trigger all actions that need to be taken when a program state changes (e.g. update the run record, create a new job in the scheduler, etc.).
Chengfeng Mao
If we don't send the notification from DefaultStore but from the program runners, then it's possible that the status change is not persisted but the schedule is triggered, and the user will see an inconsistent state?
Ali Anwar
The calling code should send the notification in the same transaction that the status is persisted.
Both calls are transactional, so either both will happen or neither, right?
Terence Yim
What do you need from the store when the schedule is triggered?
Terence Yim
So basically this is the `ProgramId`.
Sameet Sapra
Yes. That is how it will be implemented when we actually send the notification. We were just listing out all of the attributes that would be needed.
Terence Yim
This is already a `ProgramStatus` in cdap-api. Why do we need to define yet another one? The mapping could simply be logic inside the scheduler's TMS subscriber??
Sameet Sapra
I thought it would be a good idea, since we have SchedulableProgramType and ProgramType to represent the program types that can be scheduled and all supported program types, respectively.
I was modelling that behavior with TriggerableProgramStatus and ProgramStatus as well, since we only wanted to allow certain program statuses to be used as triggers.
Terence Yim
The `SchedulableProgramType` was a mistake and is something we want to get rid of. Please check with Ali Anwar as well.
Ali Anwar
+1
Use ProgramType class instead.
Sameet Sapra
Ali Anwar Terence Yim: I see how SchedulableProgramType may have been a mistake. However, TriggerableProgramStatus may be needed because ProgramStatus is defined as:
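(The enum snippet quoted here is not preserved. Judging from the statuses discussed throughout this document, `ProgramStatus` presumably contains the following values; this is a reconstruction, not a verbatim copy:)

```java
// Presumed contents of the cdap-api ProgramStatus enum, reconstructed from
// the statuses discussed in this document.
public enum ProgramStatus {
  INITIALIZING,
  RUNNING,
  COMPLETED,
  FAILED,
  KILLED
}
```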
We only want programs to be triggered on success (completed), failure (failed or killed), or finished (completed, failed, or killed). If we tell users to use ProgramStatus, then they will have access to the other statuses, and we will need extra logic to ensure that only those statuses are used. Furthermore, ProgramStatus does not have a state to represent completed-or-failed-or-killed. If a user wanted to trigger program B when program A completed successfully or was killed, we wouldn't be able to express that now. The trigger function definitions would have to change to accept a list of ProgramStatus objects instead of a single ProgramStatus to allow users to trigger programs on completion or failure. I think that a TriggerableProgramStatus would be clearer for the user and easier to implement, as it serves a functional difference from ProgramStatus.
What do you guys think?
Ali Anwar
Do we also want to be able to trigger upon a RUNNING status?
Sameet Sapra
No, there aren't really any use cases for this.
Terence Yim
So the actual overhead is the validation at configure time to make sure the status passed in is not RUNNING, right? Because to provide multiple states, we just need to make the state parameter a vararg, and it becomes more explicit if the user wants to trigger on FINISHED, FAILED, or KILLED (or any combination of them) instead of having to look up what COMPLETED actually means (also, how do we expand the API to support combinations of them?)
Sameet Sapra
The API would just change to take in a set of ProgramStatuses rather than just one. Then during trigger creation, just as we limit schedules to workflows, we can throw an argument exception if a program status trigger tries to trigger based on ProgramStatus.INITIALIZING or ProgramStatus.RUNNING.
Edit: By API, I meant the program status trigger definition internally. See my comment below for what I mean.
Terence Yim
Can't the API just use a vararg so that the user doesn't need to pass in a set?
Sameet Sapra
The user won't pass in a set. Internally, the trigger would look at the argument `ProgramStatus... statuses` and store it like: `Set<ProgramStatus> programStatuses = new HashSet<>(Arrays.asList(statuses))`
This way, the user does not know that it is stored as a set. They can just pass in any number of ProgramStatuses, like you mentioned.
Terence Yim
For the `ProgramStatusTrigger` class, it should extend `Trigger`, right?
Sameet Sapra
Yes, will make that change.
Terence Yim
For the `triggerOnProgramStatus` methods, there should be a variant that doesn't take an application and version, for triggering from a program within the same application.
Ali Anwar
What about the ability to trigger based upon program status, that are in other namespaces?
Today, we have the ability to trigger based upon new partitions in a PartitionedFileSet dataset in another namespace.
Sameet Sapra
Agreed. So altogether the "variants" would look something like this:
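(The code snippet is not preserved here; presumably it listed the four overloads from the API section above, along the lines of:)

```java
// Presumed variants, matching the ScheduleBuilder methods shown in the API section.
triggerOnProgramStatus(namespace, application, applicationVersion, programType, program, statuses);
triggerOnProgramStatus(namespace, application, programType, program, statuses); // same app version
triggerOnProgramStatus(namespace, programType, program, statuses);              // same app name and version
triggerOnProgramStatus(programType, program, statuses);                         // same namespace, app, and version
```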
Terence Yim
I am not sure about the namespace one. Do we want to allow cross-namespace triggers?? How do security / authorization come into play?
Terence Yim
The "trigger"."type" in the REST API should be
PROGRAM_STATUS
instead, as it is the name of an enum, which can't have hyphen (-
).Sreevatsan Raman
We should have a way to pass in run-time arguments/workflow tokens to the triggering program
Sameet Sapra
We already do this when launching the program. A schedule creates a Job, which has Notifications. A notification contains a mapping of run-time arguments.
Sreevatsan Raman
Are these runtime args static? For example if pipeline-A writes a file that should be the input to pipeline-B. How would that work?
Ali Anwar
I think Sree's suggesting that more information about the triggering program should be passed to the newly launched program: for instance, the runtime args of the triggering program.
Sameet Sapra
Ah, yes, I misunderstood. Yes, this is something that could be done. We would need to have ProgramOptions's runtime arguments from the program runner's context sent in the notification to TMS.
I'm not sure specifically if we have information like generated output files of a pipeline, but if it's accessible, then that information could also be sent in a similar manner.
Andreas Neumann
So KILLED does not count as Failure? Is KILLED equivalent to "killed through CDAP"? Or do programs killed by Yarn due to memory consumption also get status KILLED?
Sameet Sapra
Sorry, what you quoted is a typo. That should say: "Failed: A program ends with ProgramStatus.FAILED or ProgramStatus.KILLED", as it does in the enum definition for TriggerableProgramStatus. Our intention was for TriggerableProgramStatus.FAILED to represent not success, killed through CDAP, and killed by Yarn.
Sreevatsan Raman
We should capture the source of how the program got triggered (preferably in the metadata store); it will be useful to have this information in production. Users would like to know whether a program was triggered by a time-based schedule, a program-based schedule, or an event-based schedule.
Chengfeng Mao
Yes, I'll open a separate PR for this
Sreevatsan Raman
What happens when a program is triggered by both a time-based and a program-state-based trigger? Will we run both? In this case, perhaps we should clearly document the assumptions about what will happen in this scenario.
Chengfeng Mao
This depends on whether the concurrent runs constraint is set for the schedule. If it's not set, then the program can be launched concurrently.
Chengfeng Mao
For the workflow token constraint, you can refer to the `ConstraintProgramScheduleBuilder` class. Likewise, create another interface extending `ScheduleBuilder` with methods such as `withWorkflowKeyPresent`, etc. Let `triggerOnProgramStatus` return this new schedule builder class so that the workflow token constraint can only be applied to schedules with a program status trigger. However, I think that instead of making workflow token conditions constraints, they could also be part of the trigger: the schedule should be triggered only when both the program status and the workflow token conditions are met. A sketch of this builder shape follows.
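A sketch of the builder shape described here; all names are illustrative, mirroring the suggestion rather than actual CDAP classes (`withWorkflowKeyEquals` is extrapolated from the "etc."):

```java
// Illustrative only: a builder returned by triggerOnProgramStatus so that
// workflow-token conditions can be attached only to program-status schedules.
interface ScheduleCreationBuilder {
  // Finalizes the schedule definition (details elided in this sketch).
}

interface ProgramStatusScheduleBuilder extends ScheduleCreationBuilder {
  // Fire only if the triggering run wrote this key to its USER-scoped workflow token.
  ProgramStatusScheduleBuilder withWorkflowKeyPresent(String key);

  // Hypothetical extension: fire only if the key was written with exactly this value.
  ProgramStatusScheduleBuilder withWorkflowKeyEquals(String key, String value);
}
```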