
Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

In Ops Dashboard, when a user selects a time range within the last 7 days, we want to display statistics and information about the program runs that were active within that range. However, the existing program run metadata does not allow active program runs to be queried efficiently, as discussed in detail in https://wiki.cask.co/display/CE/Performance+improvement+for+getting+active+program+runs+within+a+time+range+for+Ops+Dashboard

This page documents the design for writing and reading active program runs efficiently for a given time range. The general idea is that while a program is running, information about the run is published periodically as a heartbeat. A query for the active runs within a time range then retrieves them from the heartbeats that fall within that range. The approach in this document was first proposed here: https://wiki.cask.co/display/CE/Performance+improvement+for+getting+active+program+runs+within+a+time+range+for+Ops+Dashboard#PerformanceimprovementforgettingactiveprogramrunswithinatimerangeforOpsDashboard-Approach3:Periodicallyemitmetricsforrunningprograms(proposedbyTerence)

Goals

This document will focus on defining the information published in active program run heartbeat, the mechanism of publishing and processing heartbeat, the write path and read path of the heartbeat, and the API for reading the heartbeat.

User Stories 

  • A user wants to get all the active program runs from 04/11/2017 5PM to 04/17/2017 5PM in namespaces "ns1", "ns2" and "ns3"

Design





On MessagingProgramStateWriter#run, a scheduled service will be launched to emit heartbeats to TMS periodically. The heartbeat interval is configurable (default: 30 minutes).

On Completed/Killed/Error, we stop the heartbeat thread and also publish a message to the same TMS topic, since the use case needs to support requests such as "get completed runs in a time window".
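
The start/stop behavior described above can be sketched as follows. This is only an illustration under stated assumptions: `HeartbeatEmitter`, its method names, and the message strings are hypothetical, and the `Consumer<String>` stands in for publishing to the TMS topic. It is not the actual MessagingProgramStateWriter code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper that emits periodic heartbeats for a single program run. */
final class HeartbeatEmitter {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "program-heartbeat");
        t.setDaemon(true); // do not keep the JVM alive just for heartbeats
        return t;
      });
  private final String runId;
  private final long intervalMillis;
  private final Consumer<String> publisher; // stand-in for the TMS publish call

  HeartbeatEmitter(String runId, long intervalMillis, Consumer<String> publisher) {
    this.runId = runId;
    this.intervalMillis = intervalMillis;
    this.publisher = publisher;
  }

  /** Called when the run starts running: schedule one heartbeat per interval. */
  void running() {
    executor.scheduleAtFixedRate(
        () -> publisher.accept("HEARTBEAT:" + runId),
        intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  /** Called on Completed/Killed/Error: stop heartbeats, then publish the final status. */
  void stopped(String terminalStatus) {
    executor.shutdownNow();
    publisher.accept(terminalStatus + ":" + runId);
  }

  boolean isStopped() {
    return executor.isShutdown();
  }
}
```

With the default from this design, the emitter would be created with an interval of `TimeUnit.MINUTES.toMillis(30)`. A real implementation would also need to decide how to handle a heartbeat that is in flight when the run stops.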

Why TMS?

Q: Why do we need to send the heartbeat to TMS instead of writing directly to the table?

A: In CDAP 5.0, we have three different CDAP runtimes: Distributed, Cloud and Sandbox. If we wrote from the program runtime to the table directly, we would need three different code paths implementing the same logic, which would be difficult to maintain. Another concern is performance: if we started a thread in CDAP master for each program to write its heartbeat, we could quickly overload the CDAP master with too many threads. By having CDAP programs send heartbeats to TMS from the program runnable, the work is distributed and the heartbeat emission code lives in one unified place, which simplifies implementation and maintenance.

It also handles the case where CDAP is down while programs in Distributed and Cloud keep running: since the messages are sent to TMS, they can be processed by CDAP when it comes back up.

List of fields expected on a time range:

This information has to be encoded as part of the heartbeat message sent to TMS from ProgramStateWriter.

| Field | Description |
|---|---|
| Namespace | Available in the ProgramRunId argument |
| Application Name, Program Name | Available in the ProgramRunId argument |
| Type of application (artifactId) | ProgramDescriptor passed to ProgramStateWriter#start; can be stored as state in the implementation |
| Duration of run | Determined based on startTime. If the program is still running in this time range, duration = time_window_end - startTime. If the program is completed, duration = endTime - startTime. This requires storing the startTime state and propagating it in heartbeat messages |
| Delay (running - startTime) | Delay is initialized when running is called for the first time in MessagingProgramStateWriter |
| User | User information is available as part of the system arguments. System arguments are part of the ProgramOptions passed to ProgramStateWriter#start; store the arguments as state and pass that information in the heartbeat |
| Start Method (scheduled/triggered/manual) | Similar to the above, part of ProgramOptions |
| Program Status | Can be derived from the method |
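
The duration and delay rules in the table can be written out as a small sketch. The class and parameter names are hypothetical, all times are assumed to be in seconds, and treating a run that ends after the window closes as "still running in the window" is an assumption that the design does not spell out.

```java
/** Hypothetical helper for the "duration of run" and "delay" fields. */
final class RunTimings {
  /**
   * Duration of a run as seen from a query window, in seconds.
   * endTimeSecs is null while the run is still active.
   */
  static long durationSecs(long startTimeSecs, Long endTimeSecs, long windowEndSecs) {
    // Assumption: a run that ends after the window closes counts as still running.
    if (endTimeSecs == null || endTimeSecs > windowEndSecs) {
      return windowEndSecs - startTimeSecs; // duration = time_window_end - startTime
    }
    return endTimeSecs - startTimeSecs; // duration = endTime - startTime
  }

  /** Delay between the start request and the first running notification, in seconds. */
  static long delaySecs(long startTimeSecs, long runningTimeSecs) {
    return runningTimeSecs - startTimeSecs;
  }
}
```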



Writing to Table

We will read from the TMS topic in the same way as for the messages published about program status, but heartbeat messages will carry a different type that identifies them as heartbeats.

After the heartbeat messages are read, they will be written to an HBase table.

Table configuration:

Since the longest query time range for the Ops Dashboard is 7 days, the table will have a TTL longer than 7 days, such as one month. The row keys will be salted to avoid hotspotting a region server when multiple records with the same rounded-timestamp prefix are written at the same time.

Row key design:

<message-timestamp>:<program-run-id>

where:

<message-timestamp>: the time when the message is published

<program-run-id>: app+program type+program+run

Value:

Notification : NotificationType, Properties : (Map of <String, String> or object including ProgramRunId, system arguments, ArtifactId, principal, startTime, runningTime, stopTime).
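
One possible byte layout for the salted row key is sketched below. The design only states that the table is salted and that the key is `<message-timestamp>:<program-run-id>`; the one-byte salt prefix, the hash-based bucket choice, and the `HeartbeatRowKey` class are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder for a salted <message-timestamp>:<program-run-id> row key. */
final class HeartbeatRowKey {
  /**
   * Layout (assumed): [1-byte salt][8-byte big-endian timestamp][UTF-8 program run id].
   * saltBuckets is assumed to be between 1 and 127 so the salt fits in a signed byte.
   */
  static byte[] encode(long messageTimestamp, String programRunId, int saltBuckets) {
    byte[] id = programRunId.getBytes(StandardCharsets.UTF_8);
    // The salt spreads rows with the same timestamp prefix across buckets.
    byte salt = (byte) Math.floorMod(programRunId.hashCode(), saltBuckets);
    return ByteBuffer.allocate(1 + Long.BYTES + id.length)
        .put(salt)
        .putLong(messageTimestamp) // big-endian, so non-negative timestamps sort in time order
        .put(id)
        .array();
  }

  /** Reads the message timestamp back out of an encoded row key. */
  static long timestampOf(byte[] rowKey) {
    return ByteBuffer.wrap(rowKey, 1, Long.BYTES).getLong();
  }
}
```

With a layout like this, a time-range read has to issue one scan per salt bucket and merge the results, which is the usual cost of salting.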


Reading from Table

Typical queries scan the table for a time range (1 hour or 12 hours) to find running and completed programs.

The backend scans the rows whose timestamps fall between the provided start and end time and responds to the user.

While scanning, running programs are added to a set keyed by programRunId, since a run can have more than one heartbeat message in a time range.

If a program run is stopped within the time window, it is filtered out of the running programs list and added to the completed programs list; otherwise it stays in the running programs list.

Edge cases, such as a program being suspended and started again within the same time window, will be considered during implementation.

Finally, these two collections are merged and sent as the response to the user.
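
The read-side bookkeeping described above can be sketched as follows: keep the latest heartbeat per run, then split the runs by whether that latest message is a terminal status. The `Heartbeat` class, the status strings, and the result keys are hypothetical and are not part of the CDAP API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical post-processing of the rows returned by a time-range scan. */
final class HeartbeatScan {
  static final class Heartbeat {
    final String runId;
    final long timestamp;
    final String status;

    Heartbeat(String runId, long timestamp, String status) {
      this.runId = runId;
      this.timestamp = timestamp;
      this.status = status;
    }
  }

  // Assumed status names for the Completed/Killed/Error messages.
  private static final Set<String> TERMINAL =
      new HashSet<>(Arrays.asList("COMPLETED", "KILLED", "ERROR"));

  /** Returns run ids under the keys "running" and "completed". */
  static Map<String, List<String>> partition(List<Heartbeat> scanned) {
    // A run may have several heartbeats in the window; keep only the latest one.
    Map<String, Heartbeat> latest = new LinkedHashMap<>();
    for (Heartbeat hb : scanned) {
      latest.merge(hb.runId, hb, (a, b) -> b.timestamp >= a.timestamp ? b : a);
    }
    List<String> running = new ArrayList<>();
    List<String> completed = new ArrayList<>();
    for (Heartbeat hb : latest.values()) {
      (TERMINAL.contains(hb.status) ? completed : running).add(hb.runId);
    }
    Map<String, List<String>> result = new HashMap<>();
    result.put("running", running);
    result.put("completed", completed);
    return result;
  }
}
```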

----------------

Archive

Approach

Sending heartbeat from ProgramRuntimeService

When ProgramRuntimeService starts a program run (or when a program starts running?), a scheduled service will be launched to send messages to TMS containing the ProgramRunId, user overrides, system overrides, ProgramDescriptor, and the time when the message is published. A listener will be attached to the ProgramController so that when the program run is stopped, the scheduled service is also shut down. Before the scheduled service is shut down, a final message will be sent to the heartbeat topic as the final heartbeat of the program run, regardless of the schedule.

Since a heartbeat is sent at the start of the program run and another at the end, heartbeats can be sent as infrequently as possible to minimize the storage space in ActiveProgramRunStore, as long as a heartbeat emitted by the run is still guaranteed to fall within the query time range. Assume that, for a query time range [query_begin_time, query_end_time), the minimum difference (query_end_time - query_begin_time) is T. Then a heartbeat is sent every T/2, following the Nyquist frequency https://en.wikipedia.org/wiki/Nyquist_frequency . In the Ops Dashboard use case, the smallest time range is 12 hours, so heartbeats will be written every 6 hours. As shown in the graph below, sending heartbeats at this frequency guarantees that at least one heartbeat falls within the query time range.

          query_begin_time           query_end_time
               |                       |
------------------------------------------------------------> t
        |          |            |            |       |
run1:  start     hb1           hb2         hb3       stop


            |          |            |      |
run2:      start     hb1           hb2    stop


* hb: heartbeat

Even without such a minimum query time range constraint, we can still get all the active runs when heartbeats are sent every N seconds, by looking back before query_begin_time by [N - (query_end_time - query_begin_time)] when getting the active records, as shown in the graph below:

In case 1, no heartbeat falls within the query time range, but looking back before query_begin_time finds the initial heartbeat (start).

In case 2, a heartbeat is found within the query time range, even though looking back before query_begin_time does not reach any heartbeat.

1.
                              query_begin_time   query_end_time
      |<-------------- delta  ---------|             |
----------------------------------------------------------------------------------------------------> t (s)
            |<---------------       N      -------------------->|            |        
run1:       start                                               hb         stop



2.
                      query_begin_time      query_end_time
               |<--- delta -----|             |
-----------------------------------------------------------------------------------------------------> t (s)
         |<----------     N  --------->|<----------     N  --------->|          |            
run1:    start                         hb1                         hb2         stop

* delta = N - (query_end_time - query_begin_time)
* hb: heartbeat
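
The interval and look-back arithmetic from this section can be written out as a short sketch. The `HeartbeatWindow` class and its method names are hypothetical, and all values are assumed to be in seconds.

```java
/** Hypothetical helper for the heartbeat interval and look-back window. */
final class HeartbeatWindow {
  /** Heartbeat interval for a minimum query range T: one heartbeat every T/2. */
  static long heartbeatIntervalSecs(long minQueryRangeSecs) {
    return minQueryRangeSecs / 2;
  }

  /**
   * Earliest timestamp to scan from so that every run active in
   * [queryBegin, queryEnd) has at least one message in the scanned range.
   * delta = N - (query_end_time - query_begin_time); no look-back is needed
   * when the query range already covers a full heartbeat interval N.
   */
  static long scanStartSecs(long queryBegin, long queryEnd, long heartbeatIntervalSecs) {
    long delta = heartbeatIntervalSecs - (queryEnd - queryBegin);
    return queryBegin - Math.max(0L, delta);
  }
}
```

For the 12-hour minimum range, `heartbeatIntervalSecs(12 * 3600)` gives 21600 seconds, which is 6 hours. Rows found in the look-back part of the scan still need to be checked for a stop time, since a run that stopped before query_begin_time should not be reported as active.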

ActiveProgramRunStore read/write path

Table configuration:

Since the longest query time range for the Ops Dashboard is 7 days, the table will have a TTL longer than 7 days, such as one month. The row keys will be salted to avoid hotspotting a region server when multiple records with the same rounded-timestamp prefix are written at the same time.

Row key design:

<message-timestamp>:<namespace>:<program-run-id>

where:

<message-timestamp>: the time when the message is published

<namespace>: the namespace of the program run

<program-run-id>: app+program type+program+run

Value:

RunRecordMeta including ProgramRunId, system arguments, ArtifactId, principal, stop time if present, etc.


Write Path:
-----------
ActiveProgramRunSubscriber will subscribe to the heartbeat topic and write the heartbeat messages as RunRecordMeta in the ActiveProgramRunStore with the row keys mentioned above.
Read Path:
-----------
When a user queries for "Active runs" for the last 12 hours in namespace "query-namespace", we construct the row keys for the scan, with start key:
<query-namespace>:<user-query-start-ts>
and stop key:
<query-namespace>:<user-query-stop-ts> 
For instance, with query time range 03/17/2017 10:35am - 10:35pm and namespace "ns2", the start key will be:
ns2:<03/17/2017 10:35am in epoch time>
and stop key:
ns2:<03/17/2017 10:35pm in epoch time>
The namespaces in the row keys will be used to get the program runs from the namespaces in the query.

New Programmatic APIs

ActiveProgramRunStore
/**
 * Store for the run records of active program runs in the history.
 *
 * The rowkeys are of the form:
 *
 * messageTimestamp|namespace|app|version|programtype|program|runid
 */
public interface ActiveProgramRunStore { 


  /**
   * Get a list of program runs that were running between given start and end time in the given namespaces
   * 
   * @param startTimeInSecs start time in seconds
   * @param endTimeInSecs end time in seconds
   * @param namespaces the namespaces to get the program runs from 
   * @return program runs that were running between given start and end time.
   */
  List<RunRecordMeta> getRunningInRange(long startTimeInSecs, long endTimeInSecs, Set<NamespaceId> namespaces);
}


Security Impact 

Will authorization be enforced on program level?

Test Scenarios

| Test ID | Test Description | Expected Results |
|---|---|---|
| 1 | Get active runs from 03/17/2017 10:35am - 10:35pm in namespace "ns2" | Only program runs that were running in that time range in namespace "ns2" will be returned |

Releases

Release 5.0.0

Related Work


Future work


6 Comments

    1. How will we deal with situations where the program is running and the heart beat is not sent? 
    2. How will it work in Cloud scenarios, where we read from TMS on cluster, and the Heart beat is published to Cloud TMS
    3. This approach produces more load on TMS where there are lot of concurrent jobs running
    4. What's the trade off we are making? If we don't do this, how slower are the queries compared to this approach?
    5. If there are a lot of concurrent long running programs, then the Table that stores the active programs becomes bigger so what will be the impact of scanning that table? What's the sweet spot for this approach?
      1. This running program won't be included in the ops dashboard. However, this is the same as recording program status: if the notification of a program status fails to be sent, we cannot record the program status change either. I think we can only make a best effort to publish the heartbeat; there's not much that can be done to correct cases where the heartbeat cannot be published.
      2. Not sure about the cloud TMS scenario. Need some help with discussing about this.
      3.  As mentioned in the design, if we limit the minimum time range in the query as 12 hours, we only need to publish heartbeats every 6 hours. The impact on TMS won't be significant.
      4. The tradeoff is between 
        1. the cons of existing solution: slow scanning for run records; heavy reading operations in AppMetadataStore (almost full-table scan); the performance with this existing solution is 10 seconds to scan around 400k records. Experiment results can be referred here: Performance improvement for getting active program runs within a time range for Ops Dashboard#PerformanceExperimentwithApproach1optimizationVS.nooptimization
        2. the cons of this new solution: more load on TMS; extra storage for the heartbeats and duplicated run records in the ActiveProgramRunStore. However the load won't be heavy even if there are a lot of long running programs. In addition, the table has TTL moderately longer than 7 days, the longest time a user can query in Ops dashboard, so the space requirement will not be significant.
      5.  The impact of a lot of concurrent long running programs won't be significant since the heartbeats are sent at a very low frequency and the row keys are designed in a way that irrelevant time range can be efficiently skipped.
      1. If the frequency of writes is not a lot then this approach would work. 

        > Not sure about the cloud TMS scenario. Need some help with discussing about this.

        If pipelines are orchestrated in the cloud, the program controller will publish to the local TMS on the cloud, and those messages need to be fetched to the TMS on the cluster. If the publish frequency is low, it is not a cause for concern.

        > As mentioned in the design, if we limit the minimum time range in the query as 12 hours, we only need to publish heartbeats every 6 hours. The impact on TMS won't be significant.

        What is the product requirement on the range queries? 

        Also, if we publish the message at exactly the same time for all running programs, there will be a huge spike; it would be good to avoid that.


        1. > What is the product requirement on the range queries? 

          From the current ops dashboard design, the requirement for the query time range is 12 hours or 7 days. However, even if there's no such limit, as I mentioned in the updated section of "Sending heartbeat from ProgramRuntimeService", we can still get all the active runs by looking back before the query_begin_time. It will just be a trade-off between how far back we want to look at query time vs. how often we want to emit heartbeats.


          > Also, if we publish the message at exactly the same time for all running programs, there will be a huge spike; it would be good to avoid that.

          This will only happen if all those programs are started at the same time. It will be unlikely unless they are scheduled this way. Will think of a solution for this.


  1. When reading from the table, no need to keep two lists for running and stopped runs. For runs with the same runId, we can just keep the heartbeat with the latest message timestamp, assuming that the value in every heartbeat includes all the necessary info we need.

    1. yeah, it will be a set identified by programRunId.