Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

The current scheduler supports only a limited set of trigger types and constraints. We want to let users chain pipelines and programs based on the states of other programs, and to impose additional scheduler constraints.

Goals

  • Triggers: Other programs or pipelines should be started based on the program status of another program or pipeline.
  • Constraints: Additional scheduling constraints should work with this new program status trigger.

User Stories 

Note: The "program" denoted below in the user stories could be a pipeline or any custom program. See the Assumptions section below for more details.

  1. As a user, I would like to be able to start program B if and only if program A successfully completed.
  2. As a user, I would like to be able to start program C if and only if program A failed.
  3. As a user, I would like to be able to start program B if and only if program A has failed, with a time window constraint for program B.
  4. As a user, I would like to be able to start program B if and only if program A successfully completed and dataset C has new partitions.
  5. As a user, I would like to be able to start program B if and only if program A successfully completed and dataset C has 100 MB of new data. (Stretch)
  6. As a user, I would like to start program B if program A resulted in a lot of errors. (Stretch)

Design

Assumptions

  • The program specified in the user stories can be a pipeline or any custom program.

  • The program has to be in the same namespace.
  • We are assuming that program state based scheduling can take in the following program states: Running, Completed, Failed, Killed.
  • All combinations of existing constraints are possible.

Approach

  • Implementing Program State Based Scheduling is a two-step process:
    1. We need to send program state based notifications to the Transactional Messaging System (TMS) whenever the program status changes.
    2. We need to subscribe to these types of notifications in the scheduler and create a schedule based on this program state change.

Both of these steps will be covered below.

1. Sending ProgramStateNotifications to TMS:

  • Program States are maintained both in memory and in HBase. The program states become permanent when they are persisted to HBase.
  • The best approach here is to send a notification whenever these program states are persisted.
  • Program States are persisted to HBase soon after they are updated in memory, so the time delay between the state update and the message to TMS is minimal.
  • We can modify the DefaultStore's setStart and setStop methods, which write to HBase, so that they also send a notification to TMS (and add our own methods for the other program statuses).
  • This is a good solution because it is a central place where all program executors update their state.
  • These messages can be emitted to a program state topic in TMS.
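The persist-then-notify idea above can be sketched as a thin wrapper around the persistence call. This is an illustration only: `StateStore`, `NotificationPublisher`, `NotifyingStateStore`, and `RecordingPublisher` are hypothetical stand-ins, not the actual DefaultStore or TMS client APIs, and the topic name is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the component that persists program state (e.g. to HBase).
interface StateStore {
  void persist(String programRunId, String status);
}

// Hypothetical stand-in for a TMS client; the real messaging API differs.
interface NotificationPublisher {
  void publish(String topic, String payload);
}

// Decorator that emits a notification only after the state has been persisted.
final class NotifyingStateStore implements StateStore {
  // Assumed topic name; the design leaves the actual name open.
  static final String TOPIC = "program.status";

  private final StateStore delegate;
  private final NotificationPublisher publisher;

  NotifyingStateStore(StateStore delegate, NotificationPublisher publisher) {
    this.delegate = delegate;
    this.publisher = publisher;
  }

  @Override
  public void persist(String programRunId, String status) {
    // Persist first, so a notification is never sent for a state that was not stored.
    delegate.persist(programRunId, status);
    publisher.publish(TOPIC, programRunId + ":" + status);
  }
}

// In-memory publisher used only to demonstrate the call order.
final class RecordingPublisher implements NotificationPublisher {
  final List<String> messages = new ArrayList<>();

  @Override
  public void publish(String topic, String payload) {
    messages.add(topic + " " + payload);
  }
}
```

Because every program executor would go through the same store, wrapping it in one place keeps the notification logic out of the individual executors.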

Addressing Drawbacks:

  • Program execution may take longer than before because of these TMS message updates.
    • Response: Metric logging for program execution is already sent through TMS, so the added overhead of these small notifications should not be noticeable.
  • What happens if the YARN container dies?
    • Response: We already monitor this and update the program state in memory (and persist to HBase). We can simply use this and tag on a notification to TMS.

2. Scheduling a Program:

  • A thread in the Scheduler will subscribe to these program state notifications.

  • The Scheduler needs to create or update a "Job" and maintain a list of jobs. A job is composed of:
    • ProgramSchedule
    • Trigger
    • Notifications
    • JobKey (since a job is persisted)
    • State (pending trigger, pending constraint, pending launch)
  • The scheduler needs to have a constraint checker component that monitors the job queue and determines if any job has all of its constraints satisfied.
  • When a job has satisfied all of its constraints, it is removed from the set of pending jobs (the job queue), the state is updated, and the job is sent to cdap-master to be executed.
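The constraint checker described above can be sketched as a single pass over the pending job queue. All names here (`Job`, `JobState`, `ConstraintChecker`, `pollReadyJobs`) are hypothetical simplifications for illustration; the real job also carries the ProgramSchedule, trigger, notifications, and JobKey, and is persisted rather than held only in memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Job states named after the three states listed in the design.
enum JobState { PENDING_TRIGGER, PENDING_CONSTRAINT, PENDING_LAUNCH }

// Hypothetical, simplified job: a schedule name plus the constraints that must hold before launch.
final class Job {
  final String scheduleName;
  final List<BooleanSupplier> constraints;
  JobState state = JobState.PENDING_CONSTRAINT;

  Job(String scheduleName, List<BooleanSupplier> constraints) {
    this.scheduleName = scheduleName;
    this.constraints = constraints;
  }

  boolean allConstraintsSatisfied() {
    for (BooleanSupplier constraint : constraints) {
      if (!constraint.getAsBoolean()) {
        return false;
      }
    }
    return true;
  }
}

final class ConstraintChecker {
  private final Queue<Job> pending = new ArrayDeque<>();

  void add(Job job) {
    pending.add(job);
  }

  int pendingCount() {
    return pending.size();
  }

  // One pass over the queue: removes and returns every job whose constraints are all satisfied.
  List<Job> pollReadyJobs() {
    List<Job> ready = new ArrayList<>();
    for (Iterator<Job> it = pending.iterator(); it.hasNext(); ) {
      Job job = it.next();
      if (job.allConstraintsSatisfied()) {
        it.remove();
        job.state = JobState.PENDING_LAUNCH;
        ready.add(job);
      }
    }
    return ready;
  }
}
```

A job whose constraints are not yet met simply stays in the queue and is re-evaluated on the next pass, which is how a time window constraint would delay a launch.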

The description above was a high level overview of the architecture. See the parent wiki for the architecture diagram and the full explanation.

TMS Message Design:

For a Program State message, the notification will contain:

  • namespace: <namespace of the program>
  • application: <application of the program>
  • applicationVersion: <applicationVersion of the program>
  • programType: <type of the program>
  • programName: <name of the program>
  • programStatus: <of type SchedulableProgramStatus, defined below>
  • timestamp: <a timestamp of when this message was sent>
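As a rough illustration of the message fields listed above, the notification could be modeled as a small value class. The class name `ProgramStateNotification`, the `toProperties` method, the flat key/value encoding, and the use of epoch milliseconds for the timestamp are all assumptions; the design does not specify a wire format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Mirrors the SchedulableProgramStatus enum proposed in this design.
enum SchedulableProgramStatus { RUNNING, COMPLETED, FAILED, KILLED }

// Hypothetical value class holding the fields of a program state message.
final class ProgramStateNotification {
  private final String namespace;
  private final String application;
  private final String applicationVersion;
  private final String programType;
  private final String programName;
  private final SchedulableProgramStatus programStatus;
  private final long timestamp; // assumed to be epoch milliseconds

  ProgramStateNotification(String namespace, String application, String applicationVersion,
                           String programType, String programName,
                           SchedulableProgramStatus programStatus, long timestamp) {
    this.namespace = namespace;
    this.application = application;
    this.applicationVersion = applicationVersion;
    this.programType = programType;
    this.programName = programName;
    this.programStatus = programStatus;
    this.timestamp = timestamp;
  }

  // Flattens the notification into ordered key/value properties (an assumed encoding).
  Map<String, String> toProperties() {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("namespace", namespace);
    props.put("application", application);
    props.put("applicationVersion", applicationVersion);
    props.put("programType", programType);
    props.put("programName", programName);
    props.put("programStatus", programStatus.name());
    props.put("timestamp", Long.toString(timestamp));
    return props;
  }
}
```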

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Program Status (Internal)

SchedulableProgramStatus
public enum SchedulableProgramStatus {
	RUNNING,
	COMPLETED,
	FAILED,
	KILLED
};
Triggers
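public class ProgramStatusTrigger {
  // The program whose status change fires this trigger.
  private final ProgramId pId;
  // The status the program must reach for the trigger to fire.
  private final SchedulableProgramStatus status;

  public ProgramStatusTrigger(ProgramId pId, SchedulableProgramStatus status) {
    this.pId = pId;
    this.status = status;
  }
}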
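One way the scheduler might test an incoming notification against such a trigger is sketched here. `SimpleProgramId`, `ProgramStatusTriggerSketch`, and `isSatisfiedBy` are hypothetical names introduced only for this example; the real ProgramId carries more structure, and the design does not define a matching method.

```java
import java.util.Objects;

// Mirrors the SchedulableProgramStatus enum proposed in this design.
enum SchedulableProgramStatus { RUNNING, COMPLETED, FAILED, KILLED }

// Simplified stand-in for ProgramId, reduced to three identifying fields.
final class SimpleProgramId {
  final String namespace;
  final String application;
  final String programName;

  SimpleProgramId(String namespace, String application, String programName) {
    this.namespace = namespace;
    this.application = application;
    this.programName = programName;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof SimpleProgramId)) {
      return false;
    }
    SimpleProgramId that = (SimpleProgramId) other;
    return namespace.equals(that.namespace)
        && application.equals(that.application)
        && programName.equals(that.programName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(namespace, application, programName);
  }
}

final class ProgramStatusTriggerSketch {
  private final SimpleProgramId programId;
  private final SchedulableProgramStatus status;

  ProgramStatusTriggerSketch(SimpleProgramId programId, SchedulableProgramStatus status) {
    this.programId = programId;
    this.status = status;
  }

  // Hypothetical helper: true when the notification is for the watched program and status.
  boolean isSatisfiedBy(SimpleProgramId notifiedProgram, SchedulableProgramStatus notifiedStatus) {
    return programId.equals(notifiedProgram) && status == notifiedStatus;
  }
}
```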

External Methods (in ScheduleBuilder)

Trigger Definition
public class ScheduleBuilder {
	...
 
	public ScheduleCreationSpec triggerByProgramCompleted(String application, String applicationVersion, 
		ProgramType programType, String programName);
 
	public ScheduleCreationSpec triggerByProgramFailed(String application, String applicationVersion, 
		ProgramType programType, String programName);
 
	public ScheduleCreationSpec triggerByProgramKilled(String application, String applicationVersion, 
		ProgramType programType, String programName);
 
	public ScheduleCreationSpec triggerByProgramRunning(String application, String applicationVersion, 
		ProgramType programType, String programName); // May not be implemented if there is no explicit use case, see open questions
};

 

Deprecated Programmatic APIs

New REST APIs

Update Existing API Method

  • Path: /v3/namespaces/<namespace-id>/apps/<app-id>/schedules/<schedule-id>
  • Method: PUT
  • Description: Add a schedule for a program to an application.
  • Request Body: The body is entirely the same as for the existing, already documented API endpoint, but the trigger specified in the body is different.

Request
{
  "name": "<name of the schedule>",
  "description": "<schedule description>",
  "program": {
    "programName": "<name of the program>",
    "programType": "WORKFLOW"
  },
  "properties": {
    "<key>": "<value>",
    ...
  },
  "constraints": [
    {
      "type": "<constraint type>",
      "waitUntilMet": <boolean>,
      ...
    },
    ...
  ],
  "trigger": {
    "type": "PROGRAM-STATE",
    "status": "<status of the program; one of RUNNING, COMPLETED, FAILED, KILLED>",
    "program": {
      "namespace": "<namespace of the program>",
      "application": "<application of the program>",
      "applicationVersion": "<application version of the program>",
      "programType": "<type of the program>",
      "programName": "<name of the program>"
    }
  }
}

200 - On success

404 - When application is not available

409 - Schedule with the same name already exists

500 - Any internal errors
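A client call against this endpoint might look like the following sketch, which uses the standard `java.net.http` API (Java 11+) to build the PUT request without sending it. The class name `AddProgramStatusSchedule`, the base URL, and every value in the example body (schedule, application, and program names, version) are hypothetical placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class AddProgramStatusSchedule {
  // Builds, but does not send, the PUT request for a program-state-triggered schedule.
  static HttpRequest buildRequest(String baseUrl, String namespace, String app,
                                  String schedule, String jsonBody) {
    URI uri = URI.create(baseUrl + "/v3/namespaces/" + namespace
        + "/apps/" + app + "/schedules/" + schedule);
    return HttpRequest.newBuilder(uri)
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  // Example body: run cleanupWorkflow once ingestWorkflow has completed (all values made up).
  static String exampleBody() {
    return String.join("\n",
        "{",
        "  \"name\": \"runAfterIngest\",",
        "  \"description\": \"Run cleanup after ingest completes\",",
        "  \"program\": {\"programName\": \"cleanupWorkflow\", \"programType\": \"WORKFLOW\"},",
        "  \"properties\": {},",
        "  \"constraints\": [],",
        "  \"trigger\": {",
        "    \"type\": \"PROGRAM-STATE\",",
        "    \"status\": \"COMPLETED\",",
        "    \"program\": {",
        "      \"namespace\": \"default\",",
        "      \"application\": \"ingestApp\",",
        "      \"applicationVersion\": \"1.0.0\",",
        "      \"programType\": \"WORKFLOW\",",
        "      \"programName\": \"ingestWorkflow\"",
        "    }",
        "  }",
        "}");
  }
}
```

The request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the returned status code compared against the response codes listed above.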

 

 

New API Method

  • Path: /v3/namespaces/<namespace-id>/apps/<app-id>/programs
  • Method: GET
  • Description: Get a list of all programs configured in a certain namespace.
  • Response Code: 200 - On success
Response
[
 {
   "program": {
		"namespace": "<namespace of the program>",
		"application": "<application of the program>",
		"applicationVersion": "<application version of the program>",
		"programType": "<type of the program>",
		"programName": "<name of the program>"
   }
 },
 ...
]

 

 

Deprecated REST API

  • There are no API routes to be deprecated.

CLI Impact or Changes

  • Add CLI handlers for adding a new ProgramSchedule. Such handlers currently exist only for TimeScheduling.

UI Impact or Changes

  • Add ProgramSchedule options to the "Schedule" panel, along with a dropdown for pipeline selection and a dropdown for program status selection

Potential Mockup:

Security Impact 

  • There is no notification security currently. 
  • There may be some security concern regarding access to programs within namespaces.

Impact on Infrastructure Outages

  • Since this change extends the existing Scheduler system, the impact is the same as for any other scheduler improvement. There is no additional impact.

Test Scenarios

Test ID | Test Description | Expected Results
1 | Configure a schedule to run every 5 minutes and always complete successfully. Configure a second schedule to run if the first schedule has succeeded. | Both schedules run and finish successfully.
2 | Configure a schedule to fail. Configure a second schedule to run only if the first schedule has succeeded. | The first schedule should run. The second schedule should not run.
3 | Configure a schedule to run for 10 seconds before it completes. Configure a second schedule to run only if the first schedule has succeeded. 500 milliseconds before the first schedule completes, delete the second schedule. | The first schedule should run and finish successfully.
4 | Add a Spark program (or any other type of program besides another workflow). Configure a schedule to run if the Spark program has succeeded. Run the Spark program. | The Spark program runs, then the schedule starts.
5 | Configure a schedule to run once and start at 9:50 PM. This schedule should finish in less than 10 minutes. Configure a second schedule to run only between 10:00 PM and 11:00 PM when the first schedule has been completed successfully. | The first schedule should run and finish before 10:00 PM. The second schedule should not run.
6 | Configure a schedule to run once. Configure a second schedule to run when there are five partitions and when the first schedule has succeeded. Add three partitions. Start the first schedule. Add two more partitions. | Both schedules should start and finish.

Open Questions

  1. What is the logical start time for the programs launched?
  2. Is there a use case for starting another program when a program has "Started" or "Running"?
  3. Should we have a user-accessible class to encapsulate a program, rather than passing in namespace, application, applicationVersion, programType, and programName to encapsulate a program?
  4. Should we send notifications when we don't have access to a program?

Work Items

  • Notifications
    • Name topic in TMS for ProgramSchedulerStates
    • Send program state notifications through TMS
      • At central location for all programs for all ProgramSchedulerStates
  • API Specification
    • Parse new ProgramStateTrigger from a request
  • Scheduler
    • Add new thread to subscribe to ProgramStateNotifications from TMS and create schedule with new trigger
    • Add to ProgramScheduleStore and ensure atomicity of events
    • Ensure event keys are still unique for Program State Triggers
  • Tests
    • Need long running test to test series of pipelines starting from statuses of each other
    • Plus other integration tests
  • UI
    • Fetch all ProgramScheduleTypes
    • Fetch all programs for user to choose from in a pipeline
    • Design multiple tabs to pick between different triggers
  • Documentation
    • Add examples for how to build the trigger

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

Future work
