
Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

The current scheduler can set triggers based on time and data partitions. We wish to add a new trigger based on program statuses.

Goals

  • Triggers: Pipelines should be started based on the program status of another pipeline.
  • Constraints: Existing scheduling constraints should work with this new program status trigger.

User Stories

  1. As a user, I would like to be able to start an ML workflow after an ingestion workflow has succeeded.
  2. As a user, I would like to be able to start an error handling workflow when an analytics workflow has failed.
  3. As a user, I would like to be able to start a compaction workflow if and only if a workflow has finished, with a time window constraint for the compaction workflow.
  4. As a user, I would like to be able to start workflow B if and only if workflow A successfully completed and dataset C has new partitions. (Stretch)
  5. As a user, I would like to be able to start workflow B if and only if workflow A successfully completed and dataset C has 100 MB of new data. (Stretch)
  6. As a user, I would like to start workflow B if workflow A resulted in a lot of errors. (Stretch)

Design

Assumptions

  • Since the scheduler can only schedule workflows, this trigger will likewise only start workflows. If that changes in the future, this design can be adapted accordingly.
  • Let us define the following program statuses to be used for triggers: Successful, Failed, Finished, defined as:
    • Successful:  A program ends with ProgramStatus.COMPLETED
    • Failed: A program ends with ProgramStatus.FAILED or ProgramStatus.KILLED
    • Finished: A program ends with ProgramStatus.COMPLETED, ProgramStatus.FAILED, or ProgramStatus.KILLED
  • All combinations of existing constraints are possible.
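
The status definitions above can be sketched as a small lookup. This is only an illustration: the `ProgramStatus` enum here is a stand-in that lists just the three terminal states named above, and the `matches` helper is a hypothetical name, not part of the proposed API.

```java
import java.util.EnumSet;
import java.util.Set;

// Stand-in for the platform's ProgramStatus; only the terminal states named above are listed.
enum ProgramStatus { COMPLETED, FAILED, KILLED }

// Mirrors the three trigger statuses defined in the assumptions.
enum TriggerableProgramStatus {
  SUCCESSFUL(EnumSet.of(ProgramStatus.COMPLETED)),
  FAILED(EnumSet.of(ProgramStatus.FAILED, ProgramStatus.KILLED)),
  FINISHED(EnumSet.of(ProgramStatus.COMPLETED, ProgramStatus.FAILED, ProgramStatus.KILLED));

  private final Set<ProgramStatus> terminalStatuses;

  TriggerableProgramStatus(Set<ProgramStatus> terminalStatuses) {
    this.terminalStatuses = terminalStatuses;
  }

  // Hypothetical helper: true if a program ending in `status` satisfies this trigger status.
  boolean matches(ProgramStatus status) {
    return terminalStatuses.contains(status);
  }
}

class TriggerStatusDemo {
  public static void main(String[] args) {
    for (ProgramStatus status : ProgramStatus.values()) {
      System.out.println(status + " satisfies FAILED trigger: "
          + TriggerableProgramStatus.FAILED.matches(status));
    }
  }
}
```

Note that under these definitions a killed program counts as Failed, so an error-handling workflow triggered on Failed would also start after a manual kill.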

Approach

  • Implementing Program Status Based Scheduling is a two-step process:
    1. We need to determine when the program status has changed.
    2. Based on the program status change, the scheduler must schedule the program.

Both of these steps will be covered below.

1. Determining a Program Status Change:

  • Whenever a program status changes, we can send program status based notifications to the Transactional Messaging System (TMS).
  • These messages can be emitted to topic program.status.event.topic in TMS.
  • The approach here will be to send a notification whenever a program status change is persisted.
  • Since DefaultStore is a central place where all program runners update their status, we can send a notification to TMS from the DefaultStore when it persists the program status.
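
A minimal sketch of this idea follows. `StatusPublisher` and `NotifyingStore` are hypothetical stand-ins for the TMS client and for DefaultStore, the `|`-separated payload is a placeholder format, and the sketch ignores transactions, which a real implementation would need so that the persisted status and the emitted message stay consistent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a TMS publisher; the real messaging API is not shown here.
interface StatusPublisher {
  void publish(String topic, String payload);
}

// Simplified stand-in for DefaultStore: persists a run's status, then emits a notification.
class NotifyingStore {
  static final String TOPIC = "program.status.event.topic";

  private final Map<String, String> statusByRunId = new HashMap<>();
  private final StatusPublisher publisher;

  NotifyingStore(StatusPublisher publisher) {
    this.publisher = publisher;
  }

  void setStatus(String programId, String runId, String status) {
    // "Persist" the status in memory and remember what was stored before.
    String previous = statusByRunId.put(runId, status);
    if (!status.equals(previous)) {
      // Notify only when the persisted status actually changed.
      publisher.publish(TOPIC, programId + "|" + runId + "|" + status);
    }
  }

  String getStatus(String runId) {
    return statusByRunId.get(runId);
  }
}

class NotifyingStoreDemo {
  public static void main(String[] args) {
    List<String> sent = new ArrayList<>();
    NotifyingStore store = new NotifyingStore((topic, payload) -> sent.add(topic + " <- " + payload));
    store.setStatus("ingestWorkflow", "run-1", "RUNNING");
    store.setStatus("ingestWorkflow", "run-1", "COMPLETED");
    sent.forEach(System.out::println);
  }
}
```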

Concerns:

  • What happens if a YARN container crashes?
    • Response: We already monitor running programs and mark a program as failed when it crashes.

2. Scheduling Based on the Program Status Change:

  • The scheduler will subscribe to the program status notifications with topic program.status.event.topic.

  • Based on the notification received, it will add a new job to the job queue.
  • The existing scheduler implementation will handle data persistence and starting the job when all of its constraints have been satisfied, so the rest of the scheduler implementation remains unchanged and will work with the proposed changes.
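
The subscription step can be sketched as a handler that matches each notification against the registered schedules and enqueues a job for every match. All class and method names below are hypothetical, and statuses are plain strings to keep the sketch short; constraint checking is deliberately left out because the existing scheduler is expected to handle it once the job is in the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified schedule: start `workflowToStart` when `triggerProgramId`
// ends in one of `acceptedStatuses`.
class StatusSchedule {
  final String workflowToStart;
  final String triggerProgramId;
  final Set<String> acceptedStatuses;

  StatusSchedule(String workflowToStart, String triggerProgramId, String... acceptedStatuses) {
    this.workflowToStart = workflowToStart;
    this.triggerProgramId = triggerProgramId;
    this.acceptedStatuses = new HashSet<>(Arrays.asList(acceptedStatuses));
  }
}

class StatusNotificationHandler {
  private final List<StatusSchedule> schedules = new ArrayList<>();
  private final Queue<String> jobQueue = new ArrayDeque<>();

  void addSchedule(StatusSchedule schedule) {
    schedules.add(schedule);
  }

  // Called for every message read from the program status topic.
  void onNotification(String programId, String programStatus) {
    for (StatusSchedule schedule : schedules) {
      if (schedule.triggerProgramId.equals(programId)
          && schedule.acceptedStatuses.contains(programStatus)) {
        // Only enqueue; constraints are evaluated later by the existing scheduler.
        jobQueue.add(schedule.workflowToStart);
      }
    }
  }

  // Snapshot of the queued jobs, in insertion order.
  List<String> pendingJobs() {
    return new ArrayList<>(jobQueue);
  }
}

class StatusNotificationDemo {
  public static void main(String[] args) {
    StatusNotificationHandler handler = new StatusNotificationHandler();
    handler.addSchedule(new StatusSchedule("mlWorkflow", "ingestWorkflow", "COMPLETED"));
    handler.addSchedule(new StatusSchedule("errorHandler", "analyticsWorkflow", "FAILED", "KILLED"));
    handler.onNotification("ingestWorkflow", "COMPLETED");
    handler.onNotification("analyticsWorkflow", "COMPLETED");
    System.out.println(handler.pendingJobs());
  }
}
```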

TMS Message Design:

For a Program Status message, the notification will contain:

  • programId: <the program id>
  • programStatus: <the program status>
  • programRunId: <the specific run of a program>
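
As an illustration of the message shape, the three fields could be carried as a small value class that flattens to key/value properties. The class name and the properties-map representation are assumptions made for this sketch; the actual wire format is not specified above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value class; the field names follow the message design above.
final class ProgramStatusNotification {
  final String programId;
  final String programStatus;
  final String programRunId;

  ProgramStatusNotification(String programId, String programStatus, String programRunId) {
    this.programId = programId;
    this.programStatus = programStatus;
    this.programRunId = programRunId;
  }

  // Flattens the notification into key/value properties, e.g. for serialization.
  Map<String, String> toProperties() {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put("programId", programId);
    properties.put("programStatus", programStatus);
    properties.put("programRunId", programRunId);
    return properties;
  }

  // Rebuilds a notification on the subscriber side from the same properties.
  static ProgramStatusNotification fromProperties(Map<String, String> properties) {
    return new ProgramStatusNotification(
        properties.get("programId"),
        properties.get("programStatus"),
        properties.get("programRunId"));
  }
}

class ProgramStatusNotificationDemo {
  public static void main(String[] args) {
    ProgramStatusNotification notification =
        new ProgramStatusNotification("ingestWorkflow", "COMPLETED", "run-1");
    System.out.println(notification.toProperties());
  }
}
```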

API changes

New Programmatic APIs

New Java APIs introduced (both user-facing and internal)

Triggerable Program Status (Internal)

TriggerableProgramStatus
public enum TriggerableProgramStatus {
  SUCCESSFUL, // A program ends with ProgramStatus.COMPLETED
  FAILED,     // A program ends with ProgramStatus.FAILED or ProgramStatus.KILLED
  FINISHED    // A program ends with ProgramStatus.COMPLETED, ProgramStatus.FAILED, or ProgramStatus.KILLED
}
Triggers
public class ProgramStatusTrigger extends Trigger {
  private final ProgramId programId;
  private final TriggerableProgramStatus status;

  public ProgramStatusTrigger(ProgramId programId, TriggerableProgramStatus status) {
    this.programId = programId;
    this.status = status;
  }
}

External Methods (in ScheduleBuilder)

Trigger Definition
public class ScheduleBuilder {
  ...

  public ScheduleCreationBuilder triggerOnProgramStatus(String programNamespace, String application,
                                                        String applicationVersion, ProgramType programType,
                                                        String program, TriggerableProgramStatus programStatus);

  public ScheduleCreationBuilder triggerOnProgramStatus(String programNamespace, String application,
                                                        ProgramType programType, String program,
                                                        TriggerableProgramStatus programStatus);

  public ScheduleCreationBuilder triggerOnProgramStatus(String programNamespace, ProgramType programType,
                                                        String program, TriggerableProgramStatus programStatus);

  public ScheduleCreationBuilder triggerOnProgramStatus(ProgramType programType, String program,
                                                        TriggerableProgramStatus programStatus);
}
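
One way to read the four overloads is that each shorter form fills in the omitted identifiers and delegates to the fully qualified form. The sketch below assumes the omitted namespace, application, and application version default to those of the program being scheduled, as described for the CLI later on this page. `SketchScheduleBuilder`, its constructor, and `describeTrigger` are hypothetical, and the two enums are cut-down stand-ins.

```java
// Cut-down stand-ins for the types used in the signatures above.
enum ProgramType { WORKFLOW }
enum TriggerableProgramStatus { SUCCESSFUL, FAILED, FINISHED }

// Hypothetical builder that only records the fully qualified trigger it was given.
class SketchScheduleBuilder {
  // Identity of the program being scheduled; used as defaults (an assumption).
  private final String namespace;
  private final String application;
  private final String applicationVersion;
  private String trigger;

  SketchScheduleBuilder(String namespace, String application, String applicationVersion) {
    this.namespace = namespace;
    this.application = application;
    this.applicationVersion = applicationVersion;
  }

  SketchScheduleBuilder triggerOnProgramStatus(String triggerNamespace, String triggerApplication,
                                               String triggerVersion, ProgramType programType,
                                               String program, TriggerableProgramStatus programStatus) {
    this.trigger = String.join(":", triggerNamespace, triggerApplication, triggerVersion,
        programType.name(), program, programStatus.name());
    return this;
  }

  SketchScheduleBuilder triggerOnProgramStatus(String triggerNamespace, String triggerApplication,
                                               ProgramType programType, String program,
                                               TriggerableProgramStatus programStatus) {
    // Version omitted: fall back to the scheduled program's application version.
    return triggerOnProgramStatus(triggerNamespace, triggerApplication, applicationVersion,
        programType, program, programStatus);
  }

  SketchScheduleBuilder triggerOnProgramStatus(String triggerNamespace, ProgramType programType,
                                               String program, TriggerableProgramStatus programStatus) {
    // Application omitted: fall back to the scheduled program's application.
    return triggerOnProgramStatus(triggerNamespace, application, programType, program, programStatus);
  }

  SketchScheduleBuilder triggerOnProgramStatus(ProgramType programType, String program,
                                               TriggerableProgramStatus programStatus) {
    // Namespace omitted: fall back to the scheduled program's namespace.
    return triggerOnProgramStatus(namespace, programType, program, programStatus);
  }

  String describeTrigger() {
    return trigger;
  }
}

class SketchScheduleBuilderDemo {
  public static void main(String[] args) {
    SketchScheduleBuilder builder = new SketchScheduleBuilder("default", "PurchaseApp", "1.0");
    builder.triggerOnProgramStatus(ProgramType.WORKFLOW, "ingestWorkflow",
        TriggerableProgramStatus.SUCCESSFUL);
    System.out.println(builder.describeTrigger());
  }
}
```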

 

Deprecated Programmatic APIs

There are none.

New REST APIs

Path: /v3/namespaces/<namespace-id>/apps/<app-id>/schedules/<schedule-id>

Method: PUT

Description: Adds a schedule for a program to an application.

Request Body: The body is the same as for the existing, already documented API endpoint; only the trigger specified in the body differs.

Request
{
  "name": "<name of the schedule>",
  "description": "<schedule description>",
  "program": {
    "programName": "<name of the program>",
    "programType": "WORKFLOW"
  },
  "properties": {
    ...
  },
  "constraints": [
    {
      ...
    },
    ...
  ],
  "trigger": {
    "type": "PROGRAM_STATUS",
    "status": "<status of the program; one of SUCCESSFUL, FAILED, or FINISHED>",
    "program": {
      "namespace": "<namespace of the program>",
      "application": "<application of the program>",
      "applicationVersion": "<application version of the program>",
      "programType": "<type of the program; only WORKFLOW is supported for now>",
      "programName": "<name of the program>"
    }
  }
}

Response Code:

  • 200 - On success
  • 404 - When the application is not available
  • 409 - Schedule with the same name already exists
  • 500 - Any internal errors
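
For a concrete feel of the request body, the following sketch assembles one for a PROGRAM_STATUS trigger. The class name, method name, and all sample values are hypothetical. It assumes the supplied values need no JSON escaping, and it sends empty description, properties, and constraints; whether those may be empty or omitted is not stated on this page.

```java
// Hypothetical helper that builds the PUT body for a program status schedule.
class ProgramStatusScheduleRequest {

  // Assumes none of the arguments contain characters that require JSON escaping.
  static String body(String scheduleName, String workflowToStart, String triggerNamespace,
                     String triggerApp, String triggerAppVersion, String triggerWorkflow,
                     String status) {
    return String.join("\n",
        "{",
        "  \"name\": \"" + scheduleName + "\",",
        "  \"description\": \"\",",
        "  \"program\": { \"programName\": \"" + workflowToStart + "\", \"programType\": \"WORKFLOW\" },",
        "  \"properties\": {},",
        "  \"constraints\": [],",
        "  \"trigger\": {",
        "    \"type\": \"PROGRAM_STATUS\",",
        "    \"status\": \"" + status + "\",",
        "    \"program\": {",
        "      \"namespace\": \"" + triggerNamespace + "\",",
        "      \"application\": \"" + triggerApp + "\",",
        "      \"applicationVersion\": \"" + triggerAppVersion + "\",",
        "      \"programType\": \"WORKFLOW\",",
        "      \"programName\": \"" + triggerWorkflow + "\"",
        "    }",
        "  }",
        "}");
  }

  public static void main(String[] args) {
    // Sample values only: start mlWorkflow after ingestWorkflow succeeds.
    System.out.println(body("runMlAfterIngest", "mlWorkflow", "default",
        "IngestApp", "1.0", "ingestWorkflow", "SUCCESSFUL"));
  }
}
```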

 

 

      

Deprecated REST API

  • There are no API routes to be deprecated.

CLI Impact or Changes

  • add program schedule <schedule-name> for <program-type> <program> [version <version>] [description "optional"] after program <program-name> type <program-type> [namespace <program-namespace>] [application <program-application>] [version <program-application-version>] status <program-status> [concurrency <concurrency>] [properties "optional map of properties"]
    +-> PUT /v3/namespaces/default/apps/<app-id>/versions/<app-version>/schedules/<schedule-id>
  • update program schedule <schedule-name> for <program-type> <program> [version <version>] [description "optional"] after program <program-name> type <program-type> [namespace <program-namespace>] [application <program-application>] [version <program-application-version>] status <program-status> [concurrency <concurrency>] [properties "optional map of properties"]
    +-> POST /v3/namespaces/default/apps/<app-id>/versions/<app-version>/schedules/<schedule-id>/update

     

  • The namespace, application, and applicationVersion of the triggering program are optional in the CLI. When omitted, they default to those of the program being scheduled.

UI Impact or Changes

  • Add ProgramSchedule options to the "Schedule" panel, along with a dropdown for pipeline selection and a dropdown for program status selection

Potential Mockup:

Security Impact 

  • There is no notification security currently. 
  • Can program A be triggered by program B if the user running program A does not have read access to program B?
  • If we allow cross namespace triggers, how will we make this secure?

Impact on Infrastructure Outages

  • Since this feature does not change the existing Scheduler architecture, the outage impact is the same as it is today.

Test Scenarios

Test ID | Test Description | Expected Results
1 | Configure a schedule to run every 5 minutes and always complete successfully. Configure a second schedule to run if the first schedule has succeeded. | Both schedules run and finish successfully.
2 | Configure a schedule to fail. Configure a second schedule to run only if the first schedule has succeeded. | The first schedule should run. The second schedule should not run.
3 | Configure a schedule to run for 10 seconds before it completes. Configure a second schedule to run only if the first schedule has succeeded. 500 milliseconds before the first schedule completes, delete the second schedule. | The first schedule should run and finish successfully.
4 | Configure a schedule to run once and start at 9:50 PM. This schedule should finish in less than 10 minutes. Configure a second schedule to run only between 10:00 PM and 11:00 PM when the first schedule has been completed successfully. | The first schedule should run and finish before 10:00 PM. The second schedule should not run.

Open Questions

  1. Is there a use case for configuring a schedule to start program B when program A has "Started"?
    1. Resolved: No, there is no such use case.
  2. Should we have a user-accessible class to encapsulate a program, rather than passing in namespace, application, applicationVersion, programType, and programName separately?

Work Items

  • Notifications

    • Name topic in TMS for ProgramSchedulerStatuses

      • At central location for all programs for all ProgramSchedulerStatuses
    • Send program status notifications through TMS

  • API Specification
    • Parse new ProgramStatusTrigger from a request
  • Scheduler
    • Add new thread to subscribe to program status notifications from TMS and create schedule with new trigger
    • Add to ProgramScheduleStore and ensure atomicity of events
    • Ensure event keys are still unique for program status triggers
  • Tests
    • Need a long-running test that exercises a series of pipelines, each started by the status of another
    • Plus other integration tests
  • UI
    • Fetch all TriggerableProgramStatuses
    • Fetch all programs for user to choose from in a pipeline
    • Design multiple tabs to pick between different triggers
  • Documentation
    • Add examples for how to build the trigger

Releases

Release 4.3.0

  • Program status based scheduling of workflows is a fundamental feature of this release. Refactoring the store to subscribe to TMS notifications to learn when to persist program status changes, rather than making HTTP requests, may be deferred to a later release.

Related Work

Future work

 

 
