
Goals

JIRA: CDAP-4075: Error handling for Workflows.

Use Cases

  1. When a Workflow run finishes, the user may want to send an email about its success or failure.
  2. In a Hydrator pipeline, once the run finishes, the user may wish to delete the data from an external source such as Oracle or Teradata.
  3. If the Workflow fails for some reason, the user may want to clean up the files/data written by the nodes in the Workflow.
  4. On failure of the Workflow, the user may wish to keep certain local datasets for further debugging.
  5. A Workflow can have a custom action at its start that writes to a dataset (which acts as a lock). The next node in the Workflow is a MapReduce program that fails for that run of the Workflow. The user would like to be able to clean up the state that the custom action wrote to the dataset.

User Stories

  1. As a developer of a Workflow action, I want the ability to clean up the data written by the action in case the action fails.
  2. As a developer of a Workflow action, I want the ability to clean up the data written by the action in case the Workflow fails.
  3. As a developer of a Workflow, I want the ability to send an email once the Workflow run finishes. In case of failure, I should be able to access the nodes that failed and the cause of each failure.
  4. As a developer of a Workflow, I want the ability to instruct the Workflow system not to delete certain local datasets, for triage purposes.

 Possible approach

  1. Ideally, cleanup should be done by the node in the Workflow that created the data, since that node knows what needs to be cleaned up. MapReduce and Spark programs already have an onFinish method, which can be used to clean up any state when they fail. Custom actions should similarly have an onFinish method to perform cleanup when the custom action fails. Custom actions already have a destroy method; however, it does not know whether the run succeeded or failed. We should deprecate destroy and introduce the new onFinish method to be consistent with the other actions.

    public interface WorkflowAction extends Runnable {
      /**
       * This method is called after the {@link #run} method completes and can be used for resource cleanup.
       * Any exception thrown only gets logged and does not affect execution of the {@link Workflow}.
       */
      @Deprecated
      void destroy();

      /**
       * This method is called after the execution of the action is done.
       * @param succeeded the result of the action execution: true if the action succeeded, false otherwise
       * @throws Exception if there is any error during execution. The exception is only logged as an error,
       *         without affecting the execution of the {@link Workflow}
       */
      void onFinish(boolean succeeded) throws Exception;
    }
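The contract of the proposed onFinish(boolean) for a custom action (user story 1) can be illustrated with a minimal sketch: the action writes a lock row, and if it then fails, onFinish removes the partial write. ProposedWorkflowAction, InMemoryLockTable, AcquireLockAction and ActionRunner are hypothetical stand-ins, not CDAP classes; the in-memory map only stands in for a dataset, and a real action would write through its context inside a transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed WorkflowAction (deprecated destroy() omitted).
interface ProposedWorkflowAction extends Runnable {
  // Called once run() has returned or thrown; succeeded reports which.
  void onFinish(boolean succeeded) throws Exception;
}

// Hypothetical in-memory stand-in for a dataset that acts as a lock.
final class InMemoryLockTable {
  private final Map<String, String> rows = new HashMap<>();

  void put(String key, String value) { rows.put(key, value); }

  void delete(String key) { rows.remove(key); }

  boolean contains(String key) { return rows.containsKey(key); }
}

// Writes the lock in run(); removes it in onFinish() only if the action itself failed.
final class AcquireLockAction implements ProposedWorkflowAction {
  static final String LOCK_KEY = "workflow.lock"; // hypothetical key
  private final InMemoryLockTable table;
  private final boolean simulateFailure;

  AcquireLockAction(InMemoryLockTable table, boolean simulateFailure) {
    this.table = table;
    this.simulateFailure = simulateFailure;
  }

  @Override
  public void run() {
    table.put(LOCK_KEY, "held");
    if (simulateFailure) {
      throw new IllegalStateException("simulated failure after writing the lock");
    }
  }

  @Override
  public void onFinish(boolean succeeded) {
    if (!succeeded) {
      table.delete(LOCK_KEY); // undo the partial write so later runs are not blocked
    }
  }
}

// Hypothetical runner: always calls onFinish with the outcome and only logs its exceptions.
final class ActionRunner {
  static boolean execute(ProposedWorkflowAction action) {
    boolean succeeded;
    try {
      action.run();
      succeeded = true;
    } catch (RuntimeException e) {
      succeeded = false;
    }
    try {
      action.onFinish(succeeded);
    } catch (Exception e) {
      System.err.println("onFinish failed (logged only): " + e);
    }
    return succeeded;
  }
}
```

On success the lock row stays in place for later nodes; releasing it when a later node fails (use case 5) needs the Workflow-level hook rather than this per-action one.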


  2. The Workflow interface will have an onFinish method as well, which will be called when the Workflow finishes, either successfully or on failure.
      

    public interface Workflow {
      /**
       * Called when the Workflow run finishes, either successfully or on failure.
       * @param context the context associated with the Workflow
       * @param state the state of the Workflow
       * @throws Exception if there is an error during this method. This will not affect the status of the Workflow.
       */
      void onFinish(WorkflowContext context, WorkflowState state) throws Exception;
    }
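The calling contract described in this Javadoc (onFinish runs whether the run succeeded or failed, and its exceptions do not change the run status) can be sketched with a hypothetical driver. SimpleState, ProposedWorkflow, WorkflowDriverSketch and NotifyingWorkflow are illustrative names only; the proposal's onFinish also receives a WorkflowContext, and the notification is collected in a list here rather than emailed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for WorkflowState: only the overall outcome.
final class SimpleState {
  final boolean succeeded;

  SimpleState(boolean succeeded) {
    this.succeeded = succeeded;
  }
}

// Hypothetical Workflow hook (the WorkflowContext parameter is omitted).
interface ProposedWorkflow {
  void onFinish(SimpleState state) throws Exception;
}

// Hypothetical driver: runs nodes in order, stops at the first failure, then always calls onFinish.
final class WorkflowDriverSketch {
  static boolean run(List<Runnable> nodes, ProposedWorkflow workflow) {
    boolean succeeded = true;
    for (Runnable node : nodes) {
      try {
        node.run();
      } catch (RuntimeException e) {
        succeeded = false;
        break;
      }
    }
    try {
      workflow.onFinish(new SimpleState(succeeded));
    } catch (Exception e) {
      // Logged only: the status computed above is returned unchanged.
      System.err.println("Workflow onFinish failed: " + e);
    }
    return succeeded;
  }
}

// Example hook that records a notification instead of sending an email.
final class NotifyingWorkflow implements ProposedWorkflow {
  final List<String> sent = new ArrayList<>();

  @Override
  public void onFinish(SimpleState state) {
    sent.add(state.succeeded ? "Workflow run succeeded" : "Workflow run failed");
  }
}
```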

     

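  3. The WorkflowState class contains the state of all nodes in the Workflow.

    import java.util.Map;
    import org.apache.twill.api.RunId;

    public final class WorkflowState {
       private final Map<String, WorkflowNodeState> nodeState;
       private final boolean isSucceeded;

       public WorkflowState(Map<String, WorkflowNodeState> nodeState, boolean isSucceeded) {
          this.nodeState = nodeState;
          this.isSucceeded = isSucceeded;
       }
    }

    public final class WorkflowNodeState {
       private final String nodeId;
       private final NodeStatus nodeStatus;
       private final RunId runId;
       // Cause if the node execution failed, null otherwise
       private final Throwable failureCause;

       public WorkflowNodeState(String nodeId, NodeStatus nodeStatus, RunId runId, Throwable failureCause) {
          this.nodeId = nodeId;
          this.nodeStatus = nodeStatus;
          this.runId = runId;
          this.failureCause = failureCause;
       }
    }

    public enum NodeStatus {
       KILLED,
       FAILED,
       COMPLETED
    }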
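For user story 3, a Workflow's onFinish would walk these node states to report which nodes failed and why. A minimal sketch, assuming the node states are available as a map keyed by node id: NodeState mirrors WorkflowNodeState but uses a String run id to stay self-contained, and FailureReport is a hypothetical helper. Only FAILED nodes are reported, since per the comment above only failed nodes carry a failure cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Mirrors the proposed NodeStatus enum.
enum NodeStatus { KILLED, FAILED, COMPLETED }

// Mirrors the proposed WorkflowNodeState; runId is a String here instead of RunId.
final class NodeState {
  final String nodeId;
  final NodeStatus status;
  final String runId;
  final Throwable failureCause; // null unless the node failed

  NodeState(String nodeId, NodeStatus status, String runId, Throwable failureCause) {
    this.nodeId = nodeId;
    this.status = status;
    this.runId = runId;
    this.failureCause = failureCause;
  }
}

// Hypothetical helper a Workflow's onFinish could call to build an email body.
final class FailureReport {
  // One line per FAILED node, in map iteration order: "<nodeId>: <cause message>".
  static List<String> failedNodes(Map<String, NodeState> nodeStates) {
    List<String> lines = new ArrayList<>();
    for (NodeState state : nodeStates.values()) {
      if (state.status == NodeStatus.FAILED) {
        String cause = state.failureCause == null
            ? "no cause recorded"
            : String.valueOf(state.failureCause.getMessage());
        lines.add(state.nodeId + ": " + cause);
      }
    }
    return lines;
  }
}
```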
  4. The Workflow's onFinish method can also update preferences, for example to change the preferences for local datasets.
    1. This could be done through the WorkflowToken, since the user can get the WorkflowToken through the WorkflowContext. However, since information in the WorkflowToken is stored at the node level, we would have to create an internal node for the onFinish method.
    2. Another approach is to have a Map<String, String> of properties in the WorkflowState instance, which the user can update in the onFinish method.
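Option 2 (a properties map on WorkflowState) could work roughly as follows: user code in onFinish records a request, and the system reads it back before deleting local datasets. This is a hypothetical sketch; StateWithProperties, LocalDatasetCleanup and the keep.local.datasets key are invented for illustration and are not part of the proposal or of CDAP.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical WorkflowState variant carrying a mutable properties map.
final class StateWithProperties {
  final boolean succeeded;
  final Map<String, String> properties = new HashMap<>();

  StateWithProperties(boolean succeeded) {
    this.succeeded = succeeded;
  }
}

final class LocalDatasetCleanup {
  // Hypothetical property key invented for this sketch.
  static final String KEEP_LOCAL_DATASETS = "keep.local.datasets";

  // User code inside onFinish: on failure, ask the system to retain local datasets for triage.
  static void onFinish(StateWithProperties state) {
    if (!state.succeeded) {
      state.properties.put(KEEP_LOCAL_DATASETS, "true");
    }
  }

  // System side, after onFinish returns: the local datasets to delete (none if asked to keep them).
  static List<String> datasetsToDelete(List<String> localDatasets, StateWithProperties state) {
    boolean keep = Boolean.parseBoolean(state.properties.getOrDefault(KEEP_LOCAL_DATASETS, "false"));
    if (keep) {
      return new ArrayList<>();
    }
    return new ArrayList<>(localDatasets);
  }
}
```

Compared with the WorkflowToken option, this avoids the internal node, at the cost of adding mutable state to WorkflowState.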

  5. Similar to MapReduce and Spark, the Workflow's onFinish method will run in a short transaction. Ideally, the user would have control over the kind of transaction that is started.

  6. The Workflow can also have a beforeStart method, which can be used for any initialization or cleanup activity that must happen before the run, so that the user does not have to add a custom action only for initialization purposes. For now, the beforeStart method can run in a short transaction.

    public interface Workflow {
      /**
       * Called before the start of every run of the Workflow.
       * @param context the Workflow context
       * @throws Exception if there is any error executing the code. This will cause the Workflow to fail.
       */
      void beforeStart(WorkflowContext context) throws Exception;
    }
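The failure semantics in the beforeStart Javadoc can be sketched as a driver that calls beforeStart before any node and fails the run if it throws, for example when a lock for use case 5 is already held. This is a hypothetical sketch: WorkflowWithBeforeStart and BeforeStartDriver are illustrative names, the WorkflowContext parameter is omitted, and the short transaction is not modeled. The proposal does not say whether onFinish should still run after a failed beforeStart, so the sketch does not call it.

```java
import java.util.List;

// Hypothetical Workflow with the proposed beforeStart hook (context parameter omitted).
interface WorkflowWithBeforeStart {
  void beforeStart() throws Exception;
}

// Hypothetical driver: beforeStart runs before any node; if it throws, the run fails immediately.
final class BeforeStartDriver {
  static boolean run(WorkflowWithBeforeStart workflow, List<Runnable> nodes) {
    try {
      workflow.beforeStart();
    } catch (Exception e) {
      System.err.println("beforeStart failed, Workflow run fails: " + e);
      return false;
    }
    for (Runnable node : nodes) {
      try {
        node.run();
      } catch (RuntimeException e) {
        return false;
      }
    }
    return true;
  }
}
```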



13 Comments

  1. Should the workflow also have a "beforeStart" method for tasks that need to execute before the workflow runs? It may be a better way to handle use case 5 than using a custom action.

    1. Or call it beforeSubmit()?

      1. No strong opinion on the name. We can call it beforeSubmit() to be consistent with MapReduce and Spark.

  2. Makes sense. Documented beforeStart as well.

  3.  

    • In case of hydrator pipeline, once the run is finish, user may wish to delete the data from the external source such as Oracle/Teradata etc.

     

    So this is not only about error handling, right? Because this would only happen on success...

    1. Yes. It's not only about error handling. Renamed the document to "Workflow hooks".

  4. In a Workflow, user can have a custom action at the start of the workflow that writes to a dataset (which acts as a lock).

    Interesting use case. However, if the tx fails because someone else got the lock, what happens? Will the workflow fail? In this case, we may want to wait and retry instead of failing. So the action or beforeStart method needs explicit control over transactions.

    1. Currently the Workflow will fail. However, there is a JIRA filed for supporting control over transactions in custom actions: CDAP-4648. I think in 3.4 we can implement this consistently with the other hooks, and later add transaction support for all of them.

  5. destroy

    We will still need to keep destroy, as it is a lifecycle method that releases any external resources the action may hold.

    1. Releasing of external resources can be done in the "onFinish" method as well, right? 

  6. specify hook which will be called by the Workflow when it fini

    Not sure if I like this. The MapReduce (and its container) will already be out of scope when the workflow finishes. 

    1. Since an action in the Workflow knows best what sort of cleanup needs to be done, ideally it should be the action's responsibility to perform cleanup once the Workflow finishes. However, the action needs to know when the Workflow is done, which is why we need these hooks. These hooks can be executed in the Workflow driver itself, assuming they clean up HDFS directories/datasets. However, this needs more thought.

  7. I discussed this in person with Andreas Neumann and Terence Yim, and the following is the conclusion:

    1. WorkflowAction should extend from ProgramLifecycle interface to have standard initialize() and destroy() methods.
    2. WorkflowContext should have a getStatus() method, which would return the appropriate status of the action. In the initialize() method it would return STARTING, and in the destroy() method it would return one of COMPLETED, KILLED, or FAILED.
    3. WorkflowContext should also have the getState() method, which would return the instance of the WorkflowState as described in approach 3 above. The state would be stored in the RunRecord of the Workflow.
    4. The MapReduce API should deprecate the beforeSubmit and onFinish methods. Instead, MapReduce should implement the ProgramLifecycle interface to be consistent with the other programs. Filed JIRA CDAP-5279 for this.

    Andreas Neumann, Terence Yim: please add anything I missed. Thanks!
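The agreed direction (initialize()/destroy() lifecycle methods, with the outcome read from the context's getStatus()) can be sketched as below. Lifecycle, ActionContext, ActionStatus, CleanupAction and LifecycleDriver are hypothetical stand-ins rather than the CDAP API; the sketch only shows that destroy(), which takes no arguments, can branch on a status the driver records after run().

```java
import java.util.ArrayList;
import java.util.List;

// Statuses named in the discussion: STARTING in initialize(), a terminal status in destroy().
enum ActionStatus { STARTING, COMPLETED, KILLED, FAILED }

// Hypothetical stand-in for a ProgramLifecycle-style interface.
interface Lifecycle<C> {
  void initialize(C context) throws Exception;

  void destroy();
}

// Hypothetical context exposing getStatus(); a real WorkflowContext would also expose getState().
final class ActionContext {
  private ActionStatus status = ActionStatus.STARTING;

  ActionStatus getStatus() {
    return status;
  }

  // Only the driver updates the status in this sketch.
  void setStatus(ActionStatus status) {
    this.status = status;
  }
}

// Example action that records the status it observes in each lifecycle method.
final class CleanupAction implements Lifecycle<ActionContext>, Runnable {
  final List<String> log = new ArrayList<>();
  private final boolean fail;
  private ActionContext context;

  CleanupAction(boolean fail) {
    this.fail = fail;
  }

  @Override
  public void initialize(ActionContext context) {
    this.context = context;
    log.add("initialize:" + context.getStatus());
  }

  @Override
  public void run() {
    if (fail) {
      throw new IllegalStateException("simulated failure");
    }
  }

  @Override
  public void destroy() {
    // destroy() takes no arguments, so the outcome is read from the context.
    log.add("destroy:" + context.getStatus());
  }
}

// Hypothetical driver: records the outcome in the context before calling destroy().
final class LifecycleDriver {
  static void execute(CleanupAction action) {
    ActionContext context = new ActionContext();
    action.initialize(context);
    try {
      action.run();
      context.setStatus(ActionStatus.COMPLETED);
    } catch (RuntimeException e) {
      context.setStatus(ActionStatus.FAILED);
    } finally {
      action.destroy();
    }
  }
}
```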