Shuffle Service on REEF

gwsshs22 edited this page Aug 14, 2015 · 2 revisions

Introduction

“Shuffle” is a data movement abstraction where senders (mappers) transfer key/value tuples to receivers (reducers) that are selected by a specific shuffling strategy. Shuffle is used in many distributed frameworks and programming models.

For example, in the MapReduce programming model, tuples created by mappers are sent to reducers by hash-based key grouping in order to guarantee all tuples that have the same key go to the same reducer. In Apache Storm, a distributed realtime streaming engine, a Storm Topology has a DAG for pipelining and processing tuple data. An edge in the DAG represents a shuffle from some processing element to another.
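The hash-based key grouping described above can be sketched in a few lines. This is a minimal illustration and not code from MapReduce or REEF; the class name `HashGrouping` and the method `reducerIndexFor` are hypothetical names introduced here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of MapReduce or REEF:
// picks a reducer for a tuple by hashing its key.
final class HashGrouping {
  private HashGrouping() {}

  // Equal keys have equal hash codes, so they always map to the same reducer.
  // Math.floorMod keeps the index non-negative even when hashCode() is negative.
  static int reducerIndexFor(Object key, int numReducers) {
    if (numReducers <= 0) {
      throw new IllegalArgumentException("numReducers must be positive");
    }
    return Math.floorMod(key.hashCode(), numReducers);
  }

  public static void main(String[] args) {
    int numReducers = 3;
    List<String> keys = Arrays.asList("apple", "banana", "apple", "cherry");
    for (String key : keys) {
      System.out.println(key + " -> reducer " + reducerIndexFor(key, numReducers));
    }
  }
}
```

Both occurrences of "apple" print the same reducer index, which is the guarantee the grouping exists to provide.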

Although the logical structure of a shuffle is almost the same across frameworks, implementations differ considerably. Two major properties that differentiate shuffle implementations are batch/streaming and push/pull.

  1. batch / streaming: Batch shuffle implementations send many tuples at once, whereas streaming shuffles send small chunks of tuples in real time.

  2. pull / push: In push shuffle implementations, senders decide when to push tuples to receivers. In pull shuffle implementations, receivers decide when to receive tuples and fetch the tuple data from senders.

Batch engines like MapReduce tend to use pull-based shuffles, while streaming engines favor push-based shuffles.

General-purpose shuffle APIs that support all types of shuffle implementations are not common, since most frameworks need only one specific implementation of shuffle. To offer shuffle as a service on REEF, we should design such an API.

Design

We believe the base API of Shuffle Service should be push-based, because even the senders in a pull-based shuffle push tuple data once receivers request it. We built the base push-based shuffle API first and will later support other implementations on top of the push API.
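To make this reasoning concrete, here is a minimal sketch of a pull-style sender layered on a push primitive. It is not REEF code: `PullOverPushSender`, `emit`, and `onPullRequest` are hypothetical names, and the push primitive is modeled as a plain `Consumer` instead of a network connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not REEF code: a pull-style sender built on a push primitive.
final class PullOverPushSender<T> {
  private final List<T> buffered = new ArrayList<>();
  // Stand-in for the underlying push API (assumed here to be a simple callback).
  private final Consumer<List<T>> pushChannel;

  PullOverPushSender(Consumer<List<T>> pushChannel) {
    this.pushChannel = pushChannel;
  }

  // Producing a tuple only buffers it; nothing is sent yet.
  void emit(T tuple) {
    buffered.add(tuple);
  }

  // Invoked when a receiver asks for data. The sender answers by pushing
  // everything it has buffered so far, then clears its buffer.
  void onPullRequest() {
    List<T> batch = new ArrayList<>(buffered);
    buffered.clear();
    pushChannel.accept(batch);
  }
}

final class PullOverPushDemo {
  public static void main(String[] args) {
    List<String> received = new ArrayList<>();
    PullOverPushSender<String> sender = new PullOverPushSender<>(received::addAll);
    sender.emit("a");
    sender.emit("b");
    System.out.println("before pull request: " + received);
    sender.onPullRequest();
    System.out.println("after pull request: " + received);
  }
}
```

The only difference from a pure push sender is who decides the timing: here the push happens in response to the receiver's request.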

1. Interfaces

  • Common

    1. ShuffleDescription
    • A ShuffleDescription contains (a) a sender identifier list, (b) a receiver identifier list, (c) key and value types and (d) a shuffling strategy.
    2. ShuffleStrategy
    • The strategy that selects which receivers take a tuple, based on its key. A Shuffle Service user can define a custom shuffle strategy. For example, hash-based key grouping is one possible strategy: it selects one receiver from the shuffle's receiver list based on the hash code of the key.
  • Driver-Side

    1. ShuffleDriver
    • A ShuffleDescription is registered through ShuffleDriver and managed by a ShuffleManager, which handles the control flow of the shuffling procedure. ShuffleDriver instantiates and maintains the ShuffleManagers and provides the user with context and task configurations for a given end point identifier.
    2. ShuffleManager
    • A ShuffleManager communicates with evaluators to handle the whole control flow of shuffles such as initial synchronization or adding/removing end points in the shuffle.
  • Evaluator-Side

    1. ShuffleProvider
    • All serialized Shuffles are automatically instantiated in ShuffleProvider. Users retrieve Shuffles through this class.
    2. Shuffle
    • This provides ShuffleOperators for a specific Shuffle and internally communicates with the Driver-side ShuffleManager.
    3. ShuffleOperator (ShuffleSender, ShuffleReceiver)
    • Users can register callbacks to ShuffleReceiver and ShuffleSender, either to receive or send tuples, respectively.
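As a rough illustration of the Common interfaces, the sketch below models a description that holds (a) sender identifiers, (b) receiver identifiers, (c) key and value types and (d) a strategy. All type names are prefixed with `Sketch` to mark them as stand-ins; the method signature `selectReceivers(key, receiverIds)` is an assumption and may not match the actual REEF interface.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in for ShuffleStrategy; the signature is an assumption.
interface SketchShuffleStrategy<K> {
  List<String> selectReceivers(K key, List<String> receiverIds);
}

// Hash-based key grouping: exactly one receiver is chosen per key.
final class SketchHashStrategy<K> implements SketchShuffleStrategy<K> {
  @Override
  public List<String> selectReceivers(K key, List<String> receiverIds) {
    int index = Math.floorMod(key.hashCode(), receiverIds.size());
    return Collections.singletonList(receiverIds.get(index));
  }
}

// Illustrative stand-in for ShuffleDescription with the four parts listed above.
final class SketchShuffleDescription<K, V> {
  final List<String> senderIds;
  final List<String> receiverIds;
  final Class<K> keyType;
  final Class<V> valueType;
  final SketchShuffleStrategy<K> strategy;

  SketchShuffleDescription(List<String> senderIds, List<String> receiverIds,
                           Class<K> keyType, Class<V> valueType,
                           SketchShuffleStrategy<K> strategy) {
    this.senderIds = senderIds;
    this.receiverIds = receiverIds;
    this.keyType = keyType;
    this.valueType = valueType;
    this.strategy = strategy;
  }

  // Convenience method (hypothetical): apply the strategy to this shuffle's receivers.
  List<String> receiversFor(K key) {
    return strategy.selectReceivers(key, receiverIds);
  }
}

final class SketchDescriptionDemo {
  public static void main(String[] args) {
    SketchShuffleDescription<Integer, String> description = new SketchShuffleDescription<>(
        Arrays.asList("sender-0", "sender-1"),
        Arrays.asList("receiver-0", "receiver-1", "receiver-2"),
        Integer.class, String.class, new SketchHashStrategy<Integer>());
    System.out.println(description.receiversFor(4));
  }
}
```

Returning a list from the strategy leaves room for strategies that pick several receivers for one key, such as a broadcast; that design choice is also an assumption of this sketch.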

2. Workflow

Figure: Shuffle Service Workflow

  1. A user registers a ShuffleDescription. The user can register more than one ShuffleDescription.

  2. ShuffleDriver instantiates a ShuffleManager and returns it.

  3. The user retrieves the shuffle configurations for each task.

  4. The user builds the tasks and/or contexts using those configurations.

  5. Shuffle information is serialized by Tang injection.

  6. ShuffleProvider deserializes the information and creates Shuffle instances.

  7. The user gets a Shuffle instance.

  8. The user gets ShuffleOperators.

  9. The user can now exchange tuples in evaluators through the operators.
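The nine steps can be traced with a small single-process simulation. This is only a sketch under strong assumptions: every class prefixed with `Mini` is a hypothetical stand-in, the "configuration" is a plain string where the real service relies on Tang, the ShuffleManager is omitted, and delivery is a direct method call where the real service would communicate between evaluators.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Driver-side stand-in (hypothetical).
final class MiniShuffleDriver {
  private final Map<String, String> serialized = new HashMap<>();

  // Steps 1-2: register a description (the ShuffleManager is omitted in this sketch).
  void register(String name, List<String> senders, List<String> receivers) {
    serialized.put(name, String.join(",", senders) + "|" + String.join(",", receivers));
  }

  // Steps 3-5: hand out a serialized configuration for a task.
  String configurationFor(String name) {
    return name + "=" + serialized.get(name);
  }
}

// Evaluator-side stand-in for a Shuffle and its operators (hypothetical).
final class MiniShuffle {
  final List<String> senders;
  final List<String> receivers;
  private final Map<String, BiConsumer<String, Integer>> handlers = new HashMap<>();

  MiniShuffle(List<String> senders, List<String> receivers) {
    this.senders = senders;
    this.receivers = receivers;
  }

  // Step 8, receiver side: register a callback for incoming tuples.
  void onReceive(String receiverId, BiConsumer<String, Integer> handler) {
    handlers.put(receiverId, handler);
  }

  // Steps 8-9, sender side: push a tuple to the receiver chosen by key hash.
  void send(String key, int value) {
    String target = receivers.get(Math.floorMod(key.hashCode(), receivers.size()));
    BiConsumer<String, Integer> handler = handlers.get(target);
    if (handler != null) {
      handler.accept(key, value);
    }
  }
}

// Evaluator-side stand-in for ShuffleProvider (hypothetical).
final class MiniShuffleProvider {
  private final Map<String, MiniShuffle> shuffles = new HashMap<>();

  // Step 6: deserialize the configurations and create Shuffle instances.
  MiniShuffleProvider(String... configurations) {
    for (String conf : configurations) {
      String[] nameAndBody = conf.split("=", 2);
      String[] ends = nameAndBody[1].split("\\|", 2);
      shuffles.put(nameAndBody[0],
          new MiniShuffle(Arrays.asList(ends[0].split(",")), Arrays.asList(ends[1].split(","))));
    }
  }

  // Step 7: the user retrieves a Shuffle by name.
  MiniShuffle getShuffle(String name) {
    return shuffles.get(name);
  }
}

final class MiniWorkflowDemo {
  public static void main(String[] args) {
    MiniShuffleDriver driver = new MiniShuffleDriver();
    driver.register("wordcount", Arrays.asList("s0"), Arrays.asList("r0", "r1"));
    MiniShuffleProvider provider = new MiniShuffleProvider(driver.configurationFor("wordcount"));
    MiniShuffle shuffle = provider.getShuffle("wordcount");
    for (String receiverId : shuffle.receivers) {
      shuffle.onReceive(receiverId, (key, value) ->
          System.out.println(receiverId + " got (" + key + ", " + value + ")"));
    }
    shuffle.send("a", 1);
    shuffle.send("b", 2);
    shuffle.send("a", 3);
  }
}
```

In the simulation both tuples with key "a" arrive at the same receiver, which mirrors what a hash-based strategy would guarantee in the real service.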