
Related JIRA: CDAP-2756

Use Cases:

  • Validator Filter: All records of a transform that are invalid go into one dataset; the remainder go into another.
  • Writing the same output data to two separate outputs, with different formats.


Existing APIs (in MapReduceContext, used in beforeSubmit):

// sets a single Dataset as the output for the MapReduce job
context.setOutput(String datasetName);
context.setOutput(String datasetName, Dataset dataset);


New APIs (in MapReduceContext, used in beforeSubmit):

// specify a Dataset and arguments, to be used as an output for the MapReduce job:
context.addOutput(String datasetName);
context.addOutput(String datasetName, Map<String, String> arguments); 


New APIs - note that these will be custom mapper, reducer, and context classes which override the Hadoop classes, providing the additional functionality of writing to multiple outputs:

// specifies which Dataset to write to and handles the delegation to the appropriate OutputFormat:
context.write(String datasetName, KEY key, VALUE value);


New APIs (in BatchSinkContext, used in prepareRun of the BatchSink):

// specify a Dataset and arguments, to be used as an output for the Adapter job:
context.addOutput(String datasetName);
context.addOutput(String datasetName, Map<String, String> arguments); 

Example Usage:

public void beforeSubmit(MapReduceContext context) throws Exception {
  // ...
}

public static class Counter extends AbstractReducer<Text, IntWritable, byte[], Long> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) {
    // do computation and output to the desired dataset
    if ( ... ) {
      context.write(key.getBytes(), val);
    } else {
      context.write("invalidCounts", key.getBytes(), val);
    }
  }
}


Take an approach similar to org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.
The Datasets to be written to must be defined in advance, in the beforeSubmit of the MapReduce job.
In the mapper/reducer, the user specifies the name of the output Dataset, and our helper class (MultipleOutputs) determines the appropriate OutputFormat and configuration for writing.
The MapperWrapper and ReducerWrapper will be responsible for instantiating the MultipleOutputs class and setting it on the user's mapper/reducer in a similar fashion as Metrics are set. The MapperWrapper and ReducerWrapper will also be responsible for closing the MultipleOutputs object.
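
The delegation described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the CDAP implementation: NamedWriter, NamedOutputs, ListWriter, and the dataset name "counts" are hypothetical names introduced here, and the in-memory writer stands in for whatever OutputFormat-backed writer the real helper would resolve for each Dataset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a per-dataset writer (the role a Hadoop RecordWriter would play).
interface NamedWriter<K, V> {
  void write(K key, V value);
  void close();
}

// Hypothetical helper that routes each write to the writer registered under a dataset name.
class NamedOutputs<K, V> {
  private final Map<String, NamedWriter<K, V>> writers = new HashMap<>();

  // Assumed to be called once per dataset that was declared in beforeSubmit.
  void register(String datasetName, NamedWriter<K, V> writer) {
    writers.put(datasetName, writer);
  }

  // Looks up the writer by name and delegates; undeclared names are rejected.
  void write(String datasetName, K key, V value) {
    NamedWriter<K, V> writer = writers.get(datasetName);
    if (writer == null) {
      throw new IllegalArgumentException("Output dataset was not declared: " + datasetName);
    }
    writer.write(key, value);
  }

  // Closes every registered writer; in the design above the wrappers would call this.
  void close() {
    for (NamedWriter<K, V> writer : writers.values()) {
      writer.close();
    }
  }
}

// Hypothetical in-memory writer used only to make the sketch observable.
class ListWriter implements NamedWriter<String, Long> {
  final List<String> records = new ArrayList<>();
  boolean closed = false;

  @Override
  public void write(String key, Long value) {
    records.add(key + "=" + value);
  }

  @Override
  public void close() {
    closed = true;
  }
}
```

In this sketch the user code only ever names the output; registration and closing are left to the framework side, which mirrors the role proposed for MapperWrapper and ReducerWrapper.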

Deprecate the setting of output dataset from the configure method as it provides no utility over setting it in the beforeSubmit.

New APIs in BatchSinkContext will simply delegate to MapReduceContext's new APIs for having multiple output Datasets.


Naming of the MultipleOutputs class that we expose is up for change.
Should we allow the user to write to non-Dataset files from our MultipleOutputs class? I suggest no, for simplicity. This would rule out writing to both a Dataset and non-Dataset files from the same MapReduce.
Should we restrict users from simply calling context.write(k, v), after having set multiple Datasets as the output? 





  1. What happens if somebody uses both setOutput() and addOutput()?

  2. setOutput(A) will simply set the collection of outputDatasets to a single-length list.
    Calling addOutput(B) after this will simply append the B dataset to the list.
    Calling setOutput(C) at this point will clear the list and set C as the single output dataset.

    Does that seem reasonable to you?

    An alternative behavior could be to disallow the user from calling both methods (set vs add).
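
    The set/add semantics proposed in this comment can be sketched as below. OutputRegistry is a hypothetical stand-in for the bookkeeping inside MapReduceContext, not an existing CDAP class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the output bookkeeping inside MapReduceContext.
class OutputRegistry {
  private final List<String> outputDatasets = new ArrayList<>();

  // Discards anything configured so far and makes datasetName the single output.
  void setOutput(String datasetName) {
    outputDatasets.clear();
    outputDatasets.add(datasetName);
  }

  // Appends datasetName to the outputs configured so far.
  void addOutput(String datasetName) {
    outputDatasets.add(datasetName);
  }

  // Read-only view of the currently configured outputs, in insertion order.
  List<String> getOutputs() {
    return Collections.unmodifiableList(outputDatasets);
  }
}
```

    With this sketch, setOutput("A") followed by addOutput("B") leaves the outputs as [A, B], and a later setOutput("C") resets them to [C].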

    1. seems reasonable to me.

    2. Can we deprecate the set one and replace it with the add one?

      1. Even if the user wants just one output dataset, they should use the addOutput() method?

        1. I think so. Also, it makes more sense if the user doesn't need to use the MultipleOutputs class to write output. Just like in Java, to create a list with a single item, you also call add.

          1. Makes sense. If Albert Shau or anybody else has no objections, I'll use just addOutput() in the new APIs (and deprecate the setOutput methods).

  3. MultipleOutputs will have to be in cdap-api for people to use. One way we could do it is make it an interface in cdap-api, then add a method to MapReduceContext to get it.  Then somebody could have their mapper/reduce extend ProgramLifecycle<MapReduceContext> in order to get the MultipleOutputs at initialize time, where our implementation is hidden in cdap-app-fabric.

    It would also mean MultipleOutputs would need to take in the Context as an additional argument when writing.

    1. Currently, I tried the following approach and it worked. It also seems cleaner from the user's perspective:
      The MapperWrapper and ReducerWrapper will be responsible for instantiating the MultipleOutputs class and setting it on the user's mapper/reducer in a similar fashion as Metrics are set. The MapperWrapper and ReducerWrapper will also be responsible for closing the MultipleOutputs object.

      1. Ah, I see, so they don't actually instantiate it themselves. That is a lot nicer.

      2. So, how does it compare to writing output through the context object, which is how a normal MapReduce job does it?

        1. If the user has added just one output, they will be able to use the context object to write.
          If they have multiple outputs, then they need to use MultipleOutputs.write(namedOutput, k, v);

          1. That seems like quite a confusing API to use.

            1. Well, with more than one output, the user needs to specify which output to write to, and MultipleOutputs.write(dsName, k, v) seemed like the best way to specify it.
              This is a similar API to a Hadoop class:

              1. The confusing part is in the API you proposed. One adds the output through the context object, but then writes the actual key/value through an injected class (whereas in the Hadoop API, both are done through MultipleOutputs, the construction of MultipleOutputs is explicit, and it is clear that the MultipleOutputs class is a wrapper around the MR context object).

                I think it's better to be consistent. You either do it all through the context, or always through a class. The Hadoop API has its own legacy to maintain, which is less of a burden for the CDAP API as of now.

                1. Ok. The reason we didn't have the user instantiate the object directly is that it has Hadoop dependencies and should sit in cdap-api.

                  So, to be consistent, in the beforeSubmit(), the user should specify the multiple output Datasets using the MultipleOutputs class?
                  So, instead of:
                  It would be: 
                  MultipleOutputs.addOutput(mapReduceContext, datasetName);

                  1. I don't like having MultipleOutputs. If we have that, then we need to document how to use multiple outputs for a MapReduce program explicitly. Is there a way we can have the reduce() do something like

                    context.write("datasetName", key.getBytes(), val);

                    I understand "context" is a Hadoop class, but maybe we can do something similar, like have a CDAPReducer that has a method "write(datasetName, KEYOUT, VALUEOUT)". I don't really like the CDAPReducer idea specifically though.

                    Also, in your example, "byte[], Long" is repeated twice, which should be unnecessary.

                    1. I think it's fair to document how to write to multiple datasets.

                      The alternative would be to inject our own context object which allows writing to it and it would have methods such as:

                      context.write(k, v); // allowed only when only one output dataset is specified
                      context.write(dsName, k, v); // allowed only when multiple output datasets are defined

                      ... though I don't like this approach as it makes the user's mapper and reducer more different than hadoop mapper/reducers.

                      I removed the "byte[], Long" from the MultipleOutputs. That's what you were referring to as unnecessary, right?
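
                      The two write variants listed in this comment could behave roughly as sketched below. GuardedContext is a hypothetical class invented for illustration; it records writes in memory instead of delegating to a Hadoop context, and the exception types are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical injected context that enforces the two rules from the comment above.
class GuardedContext<K, V> {
  private final Set<String> outputs;
  // Records "dataset:key=value" per write so the behavior is observable in this sketch.
  final List<String> written = new ArrayList<>();

  GuardedContext(Set<String> declaredOutputs) {
    this.outputs = new LinkedHashSet<>(declaredOutputs);
  }

  // Allowed only when exactly one output dataset is declared.
  void write(K key, V value) {
    if (outputs.size() != 1) {
      throw new IllegalStateException("write(k, v) requires exactly one output dataset");
    }
    written.add(outputs.iterator().next() + ":" + key + "=" + value);
  }

  // Allowed only when multiple output datasets are declared and the name is one of them.
  void write(String datasetName, K key, V value) {
    if (outputs.size() < 2) {
      throw new IllegalStateException("write(dsName, k, v) requires multiple output datasets");
    }
    if (!outputs.contains(datasetName)) {
      throw new IllegalArgumentException("Output dataset was not declared: " + datasetName);
    }
    written.add(datasetName + ":" + key + "=" + value);
  }
}
```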

            2. Terence Yim Alvin Wang
              Any thoughts on an improved API - better than a MultipleOutputs.write(ds, k, v)?
              Should the multiple datasets be added via a MultipleOutputs.addOutput(), which takes in the MapReduceContext instead of the user adding it directly to the MapReduceContext? This will help with the consistency in usage of specifying outputs and writing to the outputs.

  4. Why do we pass both dataset name and dataset instance to context.setOutput(String datasetName, Dataset dataset); ?

    1. The Dataset instance does not contain the name of the dataset.

      1. Why do we need the dataset instance?

        1. The Dataset instance can be an OutputFormatProvider, which will return an OutputFormat and configuration based upon the Dataset's runtime arguments.

          1. I understand the current API behavior, but doesn't it make more sense to have the user pass in the name and properties? Internally we can instantiate the dataset if the class implements the format provider. This can help with managing dataset resources as well.

  5. I agree with Terence. Really don't like the yet-another-magically-injected MultipleOutputs thing you need if you want to write to multiple outputs. Ideally, users should be able to write to multiple outputs directly through Mapper/Reducer/Context interface, so that they don't have to look up an arbitrary class.

    However, we may want to preserve the fact that our Mapper/Reducer interfaces are plain Hadoop. Or alternatively, we can advise users to use our Mapper/Reducer interfaces, but also allow them to use the plain Hadoop interfaces.

    1. Terence Yim    Albert Shau    
      One of the limitations I see with having a custom class (such as a MultiOutputReducer) in our cdap-api is that it wouldn't be able to depend on Hadoop classes. Because of that, it would be difficult to have a context object that matches the functionality offered by the Hadoop context object, which also gives insight into the job progress and other information.
      Do you guys see any way around this? 

      1. A few limitations: it requires a lot of workarounds, as listed below, which complicate the approach of having a custom mapper/reducer class in the API.
        1) Our custom mapper/reducer cannot extend the Hadoop mapper/reducer.

        2) Our custom context cannot extend the Hadoop context. It will simply have to offer a getHadoopContext() method to expose that to the user if they want it.

        3) In the user's beforeSubmit method, job.setMapperClass(ourCustomClass) doesn't work because it doesn't extend the Hadoop class.


  6. One of the pending questions is regarding the API to add datasets in the beforeSubmit of the MapReduce job. One option is to only allow an addOutput method.

    MapReduceContext#addOutput(String datasetName);

    An alternative is to additionally allow a setOutput method, in case the user wants to add only one output dataset.

    MapReduceContext#setOutput(String datasetName);

    Yet another alternative is to allow a builder-style pattern for specifying datasets. For instance:


    Any thoughts on this are welcome.