The primary use case for this feature is writing records to multiple partitions, where the partition can vary based upon the data in the key/value.
For instance, the records being processed may encode an event time of that record, which needs to be one of the partition keys.
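To make the event-time use case concrete, here is a self-contained sketch (not CDAP API: the class, the helper name, and the choice of year/month/day dimensions are illustrative assumptions) of how an event timestamp carried in a record could be broken into partition dimension values:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class EventTimeDimensions {

  // Hypothetical helper, not part of the CDAP API: derives partition
  // dimension values (year, month, day) from an event timestamp that is
  // embedded in the record's key or value.
  static long[] dimensionsFor(long eventTimeMillis) {
    ZonedDateTime t = Instant.ofEpochMilli(eventTimeMillis).atZone(ZoneOffset.UTC);
    return new long[] { t.getYear(), t.getMonthValue(), t.getDayOfMonth() };
  }
}
```

A DynamicPartitioner could then add these values as long fields of the PartitionKey it returns for each record.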
Related JIRA issue: CDAP-2757

API

Users will need to extend DynamicPartitioner and implement its getPartitionKey method, which defines the PartitionKey to use for each record.
For each distinct PartitionKey that the DynamicPartitioner returns, a partition will be created in the output PartitionedFileSet dataset.

 

public static final class CustomDynamicPartitioner extends DynamicPartitioner<Long, Long> {
  private long logicalStartTime;

  @Override
  public void initialize(MapReduceTaskContext<Long, Long> mapReduceTaskContext) {
    this.logicalStartTime = mapReduceTaskContext.getLogicalStartTime();
  }

  @Override
  public PartitionKey getPartitionKey(Long key, Long value) {
    // partition by the run's logical start time and by a value taken from the record's key
    return PartitionKey.builder().addLongField("time", logicalStartTime).addLongField("other", key).build();
  }
}


In the beforeSubmit() method of the user's MapReduce job, set the PartitionedFileSet dataset as output, specifying the user's DynamicPartitioner in the output arguments:

public void beforeSubmit(MapReduceContext context) throws Exception {
  // define input and other setup
  // ...

  Map<String, String> outputArgs = new HashMap<>();
  // as an alternative to setting a single output PartitionKey, set a DynamicPartitioner class
  PartitionedFileSetArguments.setDynamicPartitioner(outputArgs, CustomDynamicPartitioner.class);
  context.addOutput("outputLines", outputArgs);
}

High-level Implementation Design

Currently, when using a PartitionedFileSet dataset as the output of a MapReduce job, a single partition must be set as the output partition.
This means that the MapReduce job will write to a single output directory and register a single partition in the PartitionedFileSet's metadata table.
Dynamic partitioning will allow users to derive a PartitionKey from each record being processed by the MapReduce job.

If this single output PartitionKey is missing, we will look for a specified DynamicPartitioner, which maps each record to a PartitionKey.
We will also have to implement our own DynamicPartitioningOutputFormat, which is responsible for writing to multiple paths, depending on the PartitionKey.
We will also need to implement an OutputCommitter, which is responsible for creating a partition for each of the partition keys written to.
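As a rough sketch of the path layout such an output format might produce (the map-based model of a PartitionKey and the name=value path convention are assumptions for illustration, not the actual CDAP implementation), each distinct PartitionKey would map to its own relative directory under the dataset's base path:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DynamicPartitionPaths {

  // Sketch only: models a PartitionKey as an ordered map of field name -> value.
  // The output format would keep one RecordWriter open per distinct relative
  // path, and the OutputCommitter would later register one partition per path.
  static String relativePath(Map<String, Object> partitionKeyFields) {
    return partitionKeyFields.entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining("/"));
  }
}
```

Under this sketch, a record whose partitioner returns the key {time=1000, other=5} would land under the relative path time=1000/other=5.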

DynamicPartitioner.java
/**
 * Responsible for dynamically determining a {@link PartitionKey}.
 * For each key-value pair, the getPartitionKey(K, V) method is called to determine a {@link PartitionKey}.
 *
 * @param <K> the type of the key
 * @param <V> the type of the value
 */
public abstract class DynamicPartitioner<K, V> {

  /**
   *  Initializes a DynamicPartitioner.
   *  <p>
   *    This method will be called only once per {@link DynamicPartitioner} instance. It is the first method call
   *    on that instance.
   *  </p>
   *  @param mapReduceTaskContext the mapReduceTaskContext for the task that this DynamicPartitioner is running in.
   *
   */
  public void initialize(MapReduceTaskContext<K, V> mapReduceTaskContext) {
    // do nothing by default
  }

  /**
   *  Destroys a DynamicPartitioner.
   *  <p>
   *    This method will be called only once per {@link DynamicPartitioner} instance. It is the last method call
   *    on that instance.
   *  </p>
   */
  public void destroy() {
    // do nothing by default
  }

  /**
   * Determines the {@link PartitionKey} to which the given key-value pair will be written.
   *
   * @param key the key to be written
   * @param value the value to be written
   * @return the {@link PartitionKey} to which the key-value pair will be written
   */
  public abstract PartitionKey getPartitionKey(K key, V value);
}
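To make the lifecycle contract above concrete, here is a self-contained sketch (the SketchPartitioner stand-in and its String return type are simplifications for illustration; the real class takes a MapReduceTaskContext and returns a PartitionKey) of how a task drives the partitioner: initialize() once, getPartitionKey() once per record, and destroy() once at the end:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionerLifecycle {

  // Simplified stand-in for the abstract class above; not the CDAP API.
  abstract static class SketchPartitioner<K, V> {
    public void initialize() { }      // the real signature takes a MapReduceTaskContext<K, V>
    public void destroy() { }
    public abstract String getPartitionKey(K key, V value);  // really returns a PartitionKey
  }

  // Mirrors how a single task would drive the partitioner over its records.
  static <K, V> List<String> run(SketchPartitioner<K, V> p, List<K> keys, List<V> values) {
    p.initialize();
    List<String> partitionKeys = new ArrayList<>();
    for (int i = 0; i < keys.size(); i++) {
      partitionKeys.add(p.getPartitionKey(keys.get(i), values.get(i)));
    }
    p.destroy();
    return partitionKeys;
  }
}
```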

6 Comments

  1. Generally, whenever you have an initialize method, you should have a destroy method as well.

  2. I like passing in the context rather than the logicalStartTime.  This makes it more generic, as partition dimensions could be many other things besides the logical start time.

  3. I added methods to the DynamicPartitioner interface that transform the key and value.
    The purpose is to allow writing a different key/value (for instance, writing null as the key if the key is already used in the output partition key).

    Terence Yim Albert Shau 
    What do you guys think? 

  4. Not sure I understand the use case for the transform methods.

  5. The key and value can be used in determining the PartitionKey being written to.
    Because of this, the user might want to omit the key from being written in that partition, so they might transform the key to null. 

  6. Never mind; there's no concrete use case or need for the transform methods, so I removed them.