Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

The current scheduler can set triggers based on time and data partitions. We wish to add a new trigger based on program states.

Goals

  • Triggers: Programs or pipelines should be started based on the program status of another program or pipeline.
  • Constraints: Existing scheduling constraints should work with this new program status trigger.

User Stories

Note: The "program" denoted below in the user stories could be a pipeline or any custom program.

  1. As a user, I would like to be able to start an ML program after an ingestion program has succeeded.
  2. As a user, I would like to be able to start an error handling program when a program fails.
  3. As a user, I would like to be able to start a compaction program if and only if a program has finished, with a time window constraint for the compaction program.
  4. As a user, I would like to be able to start program B if and only if program A successfully completed and dataset C has new partitions. (Stretch)
  5. As a user, I would like to be able to start program B if and only if program A successfully completed and dataset C has 100 MB of new data. (Stretch)
  6. As a user, I would like to start program B if program A resulted in a lot of errors. (Stretch)

Design

Assumptions

  • The program specified in the user stories can be a pipeline or any custom program.

  • We assume that program-state-based scheduling accepts the following program states: Successful, Failed, Finished.
  • All combinations of existing constraints are possible.

Approach

  • Implementing Program State Based Scheduling is a two-step process:
    1. We need to determine when the program state has changed.
    2. Based on the program state change, the scheduler can schedule the program.

Both of these steps will be covered below.

1. Determining a Program State Change:

  • Whenever a program state changes, we can send program state based notifications to the Transactional Messaging System (TMS).
  • Program states are maintained both in memory and in HBase. The program states become permanent when they are persisted to HBase.
  • The approach here will be to send a notification whenever these program states are persisted.
  • Since DefaultStore is the central place where all program runners update their state, we can modify its setStart and setStop methods, which write to HBase, so that they also send a notification to TMS.
  • These messages can be emitted to topic program.state.topic in TMS.
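The persist-then-notify idea in the list above can be sketched as follows. This is a minimal illustration under stated assumptions, not the DefaultStore implementation: StateStore, MessagePublisher, NotifyingStore, and the simplified setStop signature are hypothetical, and an in-memory list stands in for TMS. Only the topic name program.state.topic and the method name setStop come from this design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the persistent state store (HBase in this design). */
interface StateStore {
  void setStop(String programRunId, String status);
}

/** Hypothetical stand-in for a TMS publisher. */
interface MessagePublisher {
  void publish(String topic, String payload);
}

/** Persists the state first, then emits a notification for the persisted state. */
final class NotifyingStore implements StateStore {
  static final String TOPIC = "program.state.topic";

  private final StateStore delegate;
  private final MessagePublisher publisher;

  NotifyingStore(StateStore delegate, MessagePublisher publisher) {
    this.delegate = delegate;
    this.publisher = publisher;
  }

  @Override
  public void setStop(String programRunId, String status) {
    // If persisting throws, no notification is published.
    delegate.setStop(programRunId, status);
    publisher.publish(TOPIC, programRunId + ":" + status);
  }
}

final class NotifyingStoreDemo {
  /** Records one state change and returns the messages that were published. */
  static List<String> run() {
    Map<String, String> table = new HashMap<>();   // plays the role of HBase
    List<String> messages = new ArrayList<>();     // plays the role of TMS
    StateStore store = new NotifyingStore(
        table::put,
        (topic, payload) -> messages.add(topic + " " + payload));
    store.setStop("run-1", "FAILED");
    return messages;
  }
}
```

Publishing only after the write returns keeps notifications aligned with persisted state. This sketch does not make the write and the publish atomic; a crash between the two calls would lose the notification.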

Concerns:

  • What happens if the YARN container dies?
    • Response: We already monitor running programs and mark their state as failed when they crash. Marking the state as failed goes through the same path, so the notification is sent in this case as well.

2. Scheduling Based on the Program State Change:

  • The scheduler will subscribe to the program state notifications on the topic program.state.topic.

  • Based on the notification received, it will add a new job to the job queue.
  • The existing scheduler implementation will handle processing the job's constraints and starting the job, so the rest of the scheduler implementation remains unchanged.
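The subscription step above can be sketched as a handler that turns each notification into jobs on a queue. This is a hedged illustration only: ProgramStatusJobs, Schedule, and the string program identifiers are hypothetical, and the sketch assumes that a notification reports SUCCESSFUL or FAILED while a FINISHED trigger matches either one. Constraint checking is left to the existing scheduler and is not modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ProgramStatusJobs {
  enum Status { SUCCESSFUL, FAILED, FINISHED }

  /** A schedule that fires when the trigger program reaches the trigger status. */
  static final class Schedule {
    final String name;
    final String triggerProgram;
    final Status triggerStatus;

    Schedule(String name, String triggerProgram, Status triggerStatus) {
      this.name = name;
      this.triggerProgram = triggerProgram;
      this.triggerStatus = triggerStatus;
    }
  }

  private final List<Schedule> schedules = new ArrayList<>();
  private final Queue<String> jobQueue = new ArrayDeque<>();

  void addSchedule(Schedule schedule) {
    schedules.add(schedule);
  }

  /**
   * Called once per program state notification.
   * Assumption: reported is SUCCESSFUL or FAILED, never FINISHED.
   */
  void onNotification(String program, Status reported) {
    for (Schedule s : schedules) {
      boolean sameProgram = s.triggerProgram.equals(program);
      boolean statusMatches =
          s.triggerStatus == reported || s.triggerStatus == Status.FINISHED;
      if (sameProgram && statusMatches) {
        jobQueue.add(s.name);  // constraints are evaluated later by the scheduler
      }
    }
  }

  Queue<String> jobQueue() {
    return jobQueue;
  }
}
```

With schedules registered for SUCCESSFUL, FAILED, and FINISHED on the same program, a FAILED notification enqueues the FAILED and FINISHED schedules and skips the SUCCESSFUL one.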

TMS Message Design:

For a Program State message, the notification will contain:

  • namespace: <namespace of the program>
  • application: <application of the program>
  • applicationVersion: <applicationVersion of the program>
  • programType: <type of the program>
  • programName: <name of the program>
  • programStatus: <of type TriggerableProgramStatus defined below>

  • timestamp: <when the program state change occurred>
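The fields listed above could be assembled into a payload along these lines. This is a sketch under assumptions: the ProgramStateNotification class is hypothetical, the wire encoding is not specified in this design (a string map is used here purely for illustration), and the timestamp is assumed to be epoch milliseconds. The field names are the ones listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ProgramStateNotification {
  private ProgramStateNotification() { }

  /** Builds the notification fields in the order given in the message design. */
  static Map<String, String> of(String namespace, String application,
                                String applicationVersion, String programType,
                                String programName, String programStatus,
                                long timestampMillis) {
    Map<String, String> fields = new LinkedHashMap<>();
    fields.put("namespace", namespace);
    fields.put("application", application);
    fields.put("applicationVersion", applicationVersion);
    fields.put("programType", programType);
    fields.put("programName", programName);
    fields.put("programStatus", programStatus);
    // Assumption: epoch milliseconds; the design only says "when the change occurred".
    fields.put("timestamp", Long.toString(timestampMillis));
    return fields;
  }
}
```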

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Triggerable Program Status (Internal Java API)

TriggerableProgramStatus
public enum TriggerableProgramStatus {
  SUCCESSFUL, // The program has run successfully
  FAILED,     // The program has failed
  FINISHED    // The program has finished (succeeded, failed, or was unexpectedly killed)
}
Triggers
public class ProgramStatusTrigger {
  private final ProgramId programId;
  private final TriggerableProgramStatus status;
 
  public ProgramStatusTrigger(ProgramId programId, TriggerableProgramStatus status) {
	this.programId = programId;
	this.status = status;
  }
}

External Methods (in ScheduleBuilder)

Trigger Definition
public class ScheduleBuilder {
  ...

  public ScheduleCreationSpec triggerOnProgramStatus(String application,
    ProgramType programType, String programName, TriggerableProgramStatus programStatus);

  public ScheduleCreationSpec triggerOnProgramStatus(String application, String applicationVersion,
    ProgramType programType, String programName, TriggerableProgramStatus programStatus);
}

 

Deprecated Programmatic APIs

There are none.

New REST APIs

Path: /v3/namespaces/<namespace-id>/apps/<app-id>/schedules/<schedule-id>

Method: PUT

Description: Adds a schedule for a program to an application.

Request Body: The body is the same as for the existing, already documented endpoint, except that it specifies the new trigger type.

Request
{
  "name": "<name of the schedule>",
  "description": "<schedule description>",
  "program": {
    "programName": "<name of the program>",
    "programType": "WORKFLOW"
  },
  "properties": {
    "<key>": "<value>",
    ...
  },
  "constraints": [
    {
      "type": "<constraint type>",
      "waitUntilMet": <boolean>,
      ...
    },
    ...
  ],
  "trigger": {
    "type": "PROGRAM-STATE",
    "status": "<status of the program; one of SUCCESSFUL, FAILED, or FINISHED>",
    "program": {
      "namespace": "<namespace of the program>",
      "application": "<application of the program>",
      "applicationVersion": "<application version of the program>",
      "programType": "<type of the program>",
      "programName": "<name of the program>"
    }
  }
}

Response Codes:

  • 200 - On success
  • 404 - When the application is not available
  • 409 - When a schedule with the same name already exists
  • 500 - On any internal error
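As an illustration of the endpoint described above, the following sketch builds the PUT request with the standard java.net.http API (Java 11 or later). Everything concrete in it is an assumption made for the example: the base URL, the namespace, application, schedule, and program names, the application version, and the choice to send empty properties and constraints. Only the path layout and the trigger fields come from this design.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class AddScheduleRequest {
  private AddScheduleRequest() { }

  /** Builds (but does not send) the PUT request that adds a schedule. */
  static HttpRequest build(String baseUrl, String namespace, String app,
                           String schedule, String jsonBody) {
    URI uri = URI.create(baseUrl + "/v3/namespaces/" + namespace
        + "/apps/" + app + "/schedules/" + schedule);
    return HttpRequest.newBuilder(uri)
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  /** Example body: start MLWorkflow after IngestWorkflow succeeds (hypothetical names). */
  static String exampleBody() {
    return "{"
        + "\"name\": \"runAfterIngest\","
        + "\"description\": \"Start ML after ingestion succeeds\","
        + "\"program\": {\"programName\": \"MLWorkflow\", \"programType\": \"WORKFLOW\"},"
        + "\"properties\": {},"
        + "\"constraints\": [],"
        + "\"trigger\": {"
        + "\"type\": \"PROGRAM-STATE\","
        + "\"status\": \"SUCCESSFUL\","
        + "\"program\": {"
        + "\"namespace\": \"default\","
        + "\"application\": \"PurchaseApp\","
        + "\"applicationVersion\": \"1.0.0\","
        + "\"programType\": \"WORKFLOW\","
        + "\"programName\": \"IngestWorkflow\""
        + "}}}";
  }
}
```

Sending the request would then be a call such as HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), with the response status checked against the codes listed above.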

 

 

Deprecated REST API

  • There are no API routes to be deprecated.

CLI Impact or Changes

  • Add CLI handlers for adding the new ProgramSchedule. CLI handlers currently exist only for TimeScheduling.

UI Impact or Changes

  • Add ProgramSchedule options to the "Schedule" panel, along with a dropdown for pipeline selection and a dropdown for program status selection.

Potential Mockup:

Security Impact 

  • There is no notification security currently. 
  • Question: Can a program A be triggered by a program B if the user running program A does not have read access to program B?

Impact on Infrastructure Outages

  • Since this change extends the existing Scheduler system, the impact of an infrastructure outage is the same as it is today.

Test Scenarios

Test ID | Test Description | Expected Results
1 | Configure a schedule to run every 5 minutes and always complete successfully. Configure a second schedule to run if the first schedule has succeeded. | Both schedules run and finish successfully.
2 | Configure a schedule to fail. Configure a second schedule to run only if the first schedule has succeeded. | The first schedule should run. The second schedule should not run.
3 | Configure a schedule to run for 10 seconds before it completes. Configure a second schedule to run only if the first schedule has succeeded. 500 milliseconds before the first schedule completes, delete the second schedule. | The first schedule should run and finish successfully.
4 | Add a Spark program (or any other type of program besides another workflow). Configure a schedule to run if the Spark program has succeeded. Run the Spark program. | The Spark program runs, then the schedule starts.
5 | Configure a schedule to run once and start at 9:50 PM. This schedule should finish in less than 10 minutes. Configure a second schedule to run only between 10:00 PM and 11:00 PM when the first schedule has been completed successfully. | The first schedule should run and finish before 10:00 PM. The second schedule should not run.
6 | Configure a schedule to run once. Configure a second schedule to run when there are five partitions and when the first schedule has succeeded. Add three partitions. Start the first schedule. Add two more partitions. | Both schedules should start and finish.

Open Questions

  1. Is there a use case for configuring a schedule to start program B when program A has "Started"?
  2. Should we have a user-accessible class that encapsulates a program, rather than passing in namespace, application, applicationVersion, programType, and programName separately to identify it?

Work Items

  • Notifications

    • Name topic in TMS for ProgramSchedulerStates

    • Send program state notifications through TMS

      • At central location for all programs for all ProgramSchedulerStates
  • API Specification
    • Parse new ProgramStateTrigger from a request
  • Scheduler
    • Add new thread to subscribe to ProgramStateNotifications from TMS and create schedule with new trigger
    • Add to ProgramScheduleStore and ensure atomicity of events
    • Ensure event keys are still unique for Program State Triggers
  • Tests
    • Need long running test to test series of pipelines starting from statuses of each other
    • Plus other integration tests
  • UI
    • Fetch all ProgramScheduleTypes
    • Fetch all programs for user to choose from in a pipeline
    • Design multiple tabs to pick between different triggers
  • Documentation
    • Add examples for how to build the trigger

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

Future work

 

 
