Intro to Spark MLLIB

By: Octavian Geagla

Link to slides: octaviangeagla.com/spark/mllib_slides.html

whoami

  • Computational science/data analytics background.
  • Contributions to Spark MLLIB 1.3 and 1.4.
  • Currently a software engineer at Tendril in Boulder, CO.
  • ...We are hiring!

Motivation

  • Spark is growing fast.
  • MLLIB has a huge number of experimental/alpha features, and its central components are maturing into a cohesive pipeline API.
  • MLLIB facilitates lots of interesting problem-solving techniques.

Quick Intro to Spark

  • RDD: Distributed data.
  • RDD => RDD: map, join, filter, group, reduce, agg, sort, etc. (quick sketch below)
  • Builds smart execution graph of ops => fast!
  • Streaming, SQL, GraphX, MLLIB, Cassandra, Mesos, etc.
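
A minimal sketch of the RDD API (assumes a SparkContext named sc and a local text file; the file name is just an illustration):

val lines = sc.textFile("data.txt")            // RDD[String]
val lengths = lines.map(_.length)              // lazy transformation: RDD[Int]
val longLines = lines.filter(_.length > 80)    // lazy transformation: RDD[String]
val totalChars = lengths.reduce(_ + _)         // action: triggers the execution graph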

What is MLLIB?

  • Scalable machine learning, statistics, math lib.
    • Huge breadth and depth of algorithms.
  • Some streaming algo support.
  • Breeze types are a breeze!
    • vectors, matrices, sparse/dense, etc
  • BLAS on the JVM, native without JNI.
    • ATLAS, LAPACK

Why use it?

  • Easy to use API

// top-20 singular value decomposition of a distributed RowMatrix
val svd = matrix.computeSVD(20, computeU = true)
val U: RowMatrix = svd.U
val s: Vector = svd.s
val V: Matrix = svd.V
                    
  • Fast prototyping

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTermFreq, logisticRegression))
                    
  • Testing is easy

assert(model.intercept ~== interceptR relTol 1E-3)
                    

Types

  • Sparse vs dense. Under the hood:
    • Sparse requires a coordinate 'lookup' list.
    • Dense data can be contiguous in memory, so ops can be optimized/vectorized (BLAS Level 1).
  • Vectors (local)

    val denseVec: Vector = Vectors.dense(1.0, 0.0, 3.0)
    val sparseVec: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

  • Labeled Point (local): useful for supervised learning

    val denseLP = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    val sparseLP = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

Types Continued

  • Matrices (local)

// 3x2 dense matrix, stored column-major: columns are (1,3,5) and (2,4,6)
val denseMat: Matrix = Matrices.dense(3, 2,
  Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// 3x2 sparse matrix in CSC format (colPtrs, rowIndices, values):
// entries 2.3 at (0,0) and -1.0 at (1,1)
val sparseMat: Matrix = Matrices.sparse(3, 2,
  Array(0, 1, 2), Array(0, 1), Array(2.3, -1.0))
                    

Types Continued (Distributed Matrices)

Structure, RDD element type, and typical uses (see the construction sketch after this list):

  • BlockMatrix, RDD of ((Int, Int), Matrix): intuitively like parallelizing with MPI; phase-space multi-grid discretization, iterative approximations.
  • RowMatrix, RDD of Vector: when row order doesn't matter and rows need no index, just a countable collection of Vectors.
  • IndexedRowMatrix, RDD of (Long, Vector): when row order does matter, e.g., multivariate time-series aggregation.
  • CoordinateMatrix, RDD of (Long, Long, Double): high-dimensional, sparse data.
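
A minimal sketch of building a few of these from RDDs (assumes a SparkContext named sc):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry}

val rows = sc.parallelize(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0)))
val rowMat = new RowMatrix(rows)                               // unordered rows

val indexedRows = rows.zipWithIndex.map { case (v, i) => IndexedRow(i, v) }
val indexedMat = new IndexedRowMatrix(indexedRows)             // rows keep an index

val entries = sc.parallelize(Seq(MatrixEntry(0L, 1L, 2.5), MatrixEntry(7L, 3L, -1.0)))
val coordMat = new CoordinateMatrix(entries)                   // sparse (i, j, value) entries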

Native BLAS

  • MLLIB can be used with natively-compiled optimized numerical libraries (disabled by default due to licensing issues).
  • Netlib/JBlas link at runtime to ATLAS/OpenBLAS/MKL/cuBLAS if configured via ldconfig.
  • The JVM JIT can match or even beat native C/Fortran, particularly for small matrices (reference link in slides).
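
A quick way to check which BLAS backend actually loaded (a minimal sketch; assumes the netlib-java classes are on the classpath, e.g. via the com.github.fommil.netlib:all artifact or a Spark build with -Pnetlib-lgpl):

// prints e.g. com.github.fommil.netlib.NativeSystemBLAS when a native library
// was found, or ...F2jBLAS when falling back to the pure-JVM implementation
println(com.github.fommil.netlib.BLAS.getInstance().getClass.getName)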

ML Pipelines

  • Uniform set of APIs for creating and tuning data processing/machine learning pipelines (alpha).
  • Core concepts:
    • DataFrame: an RDD with named columns. Supports SQL-like syntax and the other core RDD operations.
    • Transformer: DataFrame => DataFrame. Eg., features to predictions.
    • Estimator: DataFrame => Transformer. Eg., supervised learning algo.
    • Param: map of params.
    • Pipeline: Chain of Transformers and Estimators. Specifies the data flow.

ML Pipeline Example

  • Split text into words => convert words to numerical features => generate a prediction model (reference link in slides).

// assumes case class Document(id: Long, text: String), a labeled `training`
// set, and import sqlContext.implicits._ for .toDF (as in the Spark ML docs example)
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training.toDF)
val test = sc.parallelize(Seq(
  Document(4L, "spark i j k"),
  Document(5L, "l m n"),
  Document(6L, "mapreduce spark"),
  Document(7L, "apache hadoop")))
val predictions = model.transform(test.toDF)
                    

Cons of ML Pipelines

(opinion alert, mostly nitpicks)
  • No compile-time type checking like you get with RDDs of case classes; DataFrame/column errors surface only at runtime.
  • SQL-like syntax for operating on DataFrames further decouples you from the case class (which I personally like to use when working with Cassandra).
  • From a developer's perspective, two APIs of your MLLIB component must be maintained, allowing two modes of use.

A Production Use Case

Problem:

  • Have some vector data, and a known number of clusters.
  • Some components of the vector are more important than others.
  • Find the cluster centers and which cluster each vector belongs to!

What's a solution look like?

  • Standardize: StandardScaler/StandardScalerModel
  • Weight: VectorTransformer (ElementwiseProduct in 1.4)
  • Train: KMeans
  • Predict: KMeansModel
  • Persist Stuff: The scaling means/stddevs, weighting vector, kmeans centers.

Something like...


import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.{ElementwiseProduct, StandardScaler}
import org.apache.spark.mllib.linalg.Vectors

// standardize each component to zero mean / unit variance
val standardizer = new StandardScaler(withMean = true, withStd = true)
val scalerModel = standardizer.fit(dataRDD)
val standardizedDataRDD = scalerModel.transform(dataRDD)

// weight components by importance (ElementwiseProduct is new in Spark 1.4)
val weightVec = Vectors.dense(2.0, 0.5, 0.0, 0.25)
val weightTransformer = new ElementwiseProduct(weightVec)
val weightedDataRDD = weightTransformer.transform(standardizedDataRDD)

// train k-means with k = 2, maxIterations = 2, runs = 1, then predict
val kModel = KMeans.train(weightedDataRDD, 2, 2, 1, KMeans.K_MEANS_PARALLEL)
val predictions = kModel.predict(weightedDataRDD)

// persist these for reuse (see the reuse sketch below):
val means = scalerModel.mean
val stds = scalerModel.std
// weightVec is already defined above
val centers = kModel.clusterCenters
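
Later, the persisted values can be turned back into transformers without re-fitting. A minimal sketch, assuming Spark 1.4's public constructors and that means, stds, weightVec, and centers were reloaded from storage:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.feature.{ElementwiseProduct, StandardScalerModel}

// rebuild the transformers/model from the persisted vectors
val reloadedScaler = new StandardScalerModel(stds, means)
val reloadedWeighter = new ElementwiseProduct(weightVec)
val reloadedKMeans = new KMeansModel(centers)

// score a brand-new observation
val newVec = Vectors.dense(1.5, -0.2, 3.1, 0.7)
val cluster = reloadedKMeans.predict(
  reloadedWeighter.transform(reloadedScaler.transform(newVec)))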
                    

Current experiments

  • Multi-grid iterative algorithms in Spark.

Questions?