
A Framework for parallel computation in Data Mining

Andrey Tyukin, 2015-07-22

Motivation

Suppose that we have an algorithm: Preprocessing, Core algorithm, Postprocessing

Metaparameter optimization

Multiple variants of each sub-algorithm

  • Different settings of some metaparameters
  • Entirely different versions of a sub-algorithm

Combinatorial explosion (e.g. 5 preprocessing × 4 core × 3 postprocessing variants already yield 60 pipelines)

How do we (usually) evaluate it?

How should we evaluate it?

It's no longer embarrassingly parallel!

Solution

The Scavenger 2.x framework:
  • Allows users to submit modular jobs
  • Plans the execution order
  • Coordinates evaluation on multiple nodes
  • Backup, caching
  • Recovery from node crashes

Why the name?

  • Encourages user to subdivide jobs into smaller steps.
  • Smaller steps need less time and fewer processors
  • => Easier to schedule!
  • Encourages user to back up intermediate results
  • => Easier to re-run failed experiments!

[Figure: big monolithic jobs]
[Figure: Scavenger 2.x worker-node jobs]

Computation[X]

How do we specify the jobs? Jobs are represented by instances of the `Computation[X]` trait:

// Scala code
trait Computation[+X] {
  def identifier: Identifier[X]          // used for caching and back-ups
  def compute(ctx: Context): Future[X]   // `ctx` allows submitting subjobs
  ...
}

  • Identifiers are used for caching and back-ups
  • Context is an interface that allows submitting subjobs
  • Futures represent asynchronous computations

Futures

trait Future[+X] {
  def map[Y](f: X => Y): Future[Y]
  def flatMap[Y](f: X => Future[Y]): Future[Y]
  ...
} 
            
  • Doesn't block
  • Triggers registered callbacks as soon as data is available
  • Implements monadic interface

Futures: Example

// Will eventually return a JSON list of website addresses
val listOfUrls: Future[JSON] =
  myClient.getJson("www.google.com/api/findMePagesWith=JGU")

// Will eventually return the content of the first page
val content: Future[String] = listOfUrls.flatMap { json =>
  val url = json.get(0).get("url")
  myClient.getHtml(url)
}
content.map { c => println(c) }
  • Gets a list of URLs from Google
  • Visits first URL as soon as the list is available
  • Prints page content as soon as it's there
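
The client above is hypothetical; as a runnable point of reference, here is the same non-blocking map/flatMap style with Scala's standard `scala.concurrent.Future` (the URL list and the `fetch` body are placeholders for real network calls):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Two asynchronous steps; the bodies stand in for real network calls.
val urls: Future[List[String]] =
  Future { List("https://www.uni-mainz.de") }

def fetch(url: String): Future[String] =
  Future { s"<html>contents of $url</html>" }

// `fetch` starts as soon as `urls` has completed, without blocking the caller:
val firstPage: Future[String] = urls.flatMap(list => fetch(list.head))
firstPage.map(content => println(content))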

Notion of computation

  • The trait `Computation[X]` builds on top of `Future[X]`
  • Adds the possibility to use `Context` to submit subjobs
  • It's indeed a monad!
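
A minimal sketch of what this buys us, assuming `Computation` exposes `map` and `flatMap` the way `Future` does (the stand-in trait and the jobs `preprocess`/`train` below are placeholders, not Scavenger's actual API):

// Stand-ins so the sketch is self-contained; in practice they come from the framework.
trait Computation[+X] {
  def map[Y](f: X => Y): Computation[Y]
  def flatMap[Y](f: X => Computation[Y]): Computation[Y]
}

// Hypothetical jobs:
def preprocess(raw: Seq[Double]): Computation[Seq[Double]] = ???
def train(clean: Seq[Double]): Computation[String] = ???

// Because Computation is a monad, a pipeline reads like sequential code,
// while the framework remains free to schedule each step on any node:
def pipeline(raw: Seq[Double]): Computation[String] =
  for {
    cleaned <- preprocess(raw)
    model   <- train(cleaned)
  } yield model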

Ways to combine algorithms

  • Identity algorithms
  • Composition of algorithms
  • Pairs of algorithms
  • Partially applied algorithms
  • ...

Similar to Pipes-and-Filters, only better! Jobs are essentially DAGs built from data and algorithms.
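
As a toy sketch of the DAG idea (all types and names below are illustrative, not the framework's API), here are two jobs that share the intermediate result `f(x)`, which therefore needs to be computed only once:

// Toy DAG nodes: data leaves and algorithm applications.
sealed trait Node
case class Data(name: String) extends Node
case class Apply(algorithm: String, input: Node) extends Node
case class Pair(left: Node, right: Node) extends Node

val x  = Data("x")
val y  = Data("y")
val fx = Apply("f", x)                          // shared intermediate result

val jobA = Apply("A", Pair(fx, Apply("g", y)))
val jobB = Apply("B", Pair(fx, Apply("h", y)))
// Both jobs reference the same node `fx`, so a scheduler that caches
// results by identifier evaluates it only once.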

Identification of resources

  • If we back something up, we have to be able to find it again
  • We must be able to identify common components of different `Computation[X]`-instances

We use a separate little language (elements of a free Cartesian closed category) as identifiers. Thus, we can describe function compositions, products, partial applications etc. in a way that the framework can use for planning a good execution order.

FreeCCC: Example

// `o` denotes composition
// [-,-] denotes product of morphisms
a = A o [f,g] o (x, y)
b = B o [f,h] o (x, y)
            
Our identifiers are clever enough to recognize that we need `f(x)` in both `a` and `b`.

FreeCCC: Example 2

// `o` denotes composition
// `Lambda` denotes currying
// `[-,-]` denotes product of morphisms
// `Fst` and `Snd` are canonical projections
a = Lambda(f o [ Snd, Fst ])(y)(x)
b = f((x,y))
            
Our identifiers allow the framework to recognize that `a = b`. This frequently occurs when using partial application.
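
One possible way to represent such identifier terms in Scala (a sketch only; the constructor names are illustrative, not Scavenger's actual ones):

// Terms of a free Cartesian closed category.
sealed trait Term
case class Atom(name: String)                extends Term  // data or algorithm
case class Compose(outer: Term, inner: Term) extends Term  // `o`
case class Product(left: Term, right: Term)  extends Term  // `[-, -]`
case class Curry(body: Term)                 extends Term  // `Lambda`
case class Applied(fun: Term, arg: Term)     extends Term  // application
case object Fst                              extends Term  // first projection
case object Snd                              extends Term  // second projection

// a = Lambda(f o [Snd, Fst])(y)(x)
val a = Applied(Applied(Curry(Compose(Atom("f"), Product(Snd, Fst))), Atom("y")), Atom("x"))
// b = f((x, y))
val b = Applied(Atom("f"), Product(Atom("x"), Atom("y")))
// A normalization pass over such terms can rewrite `a` into `b`,
// which is how equal computations can be detected before scheduling.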

Implementation

The API:

  • Formulate jobs in terms of the `Computation[X]` trait
  • Submit job to a `Context`
  • Receive result packaged in a `Future[X]`

Backend:
  • Built on top of the Akka actor system
  • Master-worker pattern, seed node (a sketch of the pattern follows below)
  • Typical layered architecture in each node
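
A minimal sketch of the master-worker pattern with plain Akka actors (the message types and the "work" are placeholders; this illustrates the pattern, not Scavenger's actual protocol):

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

case class Job(id: String, input: Int)
case class Result(id: String, output: Int)

class Worker extends Actor {
  def receive = {
    case Job(id, input) => sender() ! Result(id, input * input)   // stand-in "work"
  }
}

class Master(worker: ActorRef) extends Actor {
  def receive = {
    case job: Job          => worker ! job                         // distribute
    case Result(id, value) => println(s"job $id finished: $value")
  }
}

object Demo extends App {
  val system = ActorSystem("scavenger-sketch")
  val worker = system.actorOf(Props[Worker], "worker")
  val master = system.actorOf(Props(new Master(worker)), "master")
  master ! Job("square-3", 3)
}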

Seed node

  • Establishes initial connections between other nodes
  • Requires a fixed IP (MOGON login node?)
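
For illustration, this is how an Akka cluster typically pins its seed node to a fixed address (system name, host and port below are made up, not Scavenger's actual configuration):

import com.typesafe.config.ConfigFactory

// The seed node is the entry point that all other nodes contact first.
val config = ConfigFactory.parseString(
  """
  akka.cluster.seed-nodes = ["akka.tcp://scavenger@login-node.example.org:2552"]
  """)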

Master node

  • Transforms separate jobs into one big DAG
  • Plans the order of execution
  • Distributes subjobs across multiple Worker nodes
  • Caches and backs up intermediate results (see the caching sketch below)
  • Responsible for load balancing
  • Handles failure of worker nodes
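
A toy illustration of caching results by identifier (not Scavenger's actual cache; identifier type, serialization and eviction are omitted):

import scala.collection.concurrent.TrieMap

object ResultCache {
  private val cache = TrieMap.empty[String, Any]

  // Computes the value at most once per identifier and reuses it afterwards.
  def getOrCompute[X](identifier: String)(compute: => X): X =
    cache.getOrElseUpdate(identifier, compute).asInstanceOf[X]
}

// The shared node "f(x)" from the earlier examples is evaluated only once:
val first  = ResultCache.getOrCompute("f(x)") { println("computing f(x)"); 42 }
val second = ResultCache.getOrCompute("f(x)") { println("computing f(x)"); 42 }
// prints "computing f(x)" once; `first` and `second` are both 42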

Worker nodes

  • Have their own cache
  • Compute what the master has requested, send results back as soon as possible

Master/Worker

Conclusion

We have implemented a framework that

  • efficiently executes multiple jobs on multiple nodes
  • allows us to break up big computations into small jobs
  • simplifies sharing of intermediate results
  • simplifies generation of back-ups
  • is based on a theoretically sound model

TODOs:

  • APIs for other languages (Java, Python?)
  • At the moment: no authentication mechanisms