Monadic Bakery with Spray and Scalaz



Monadic Bakery with Spray and Scalaz

0 1


Monadic-bakery-with-spray-and-scalaz

slides for my presentation at MSUG

On Github 4lex1v / Monadic-bakery-with-spray-and-scalaz

Monadic Bakery with Spray and Scalaz

By Ivanov Alexander / @4lex1v

Agenda

  • Spray
  • Cake Pattern
  • Magnet Pattern
  • Scalaz

Spray

webinar: introduction to spray

Spray

webinar: introduction to spray

Spray-Routing

  • internal DSL for working with user requests
  • type-safe, flexible and composable
  • easy to work with
webinar: introduction to spray

Spray-Routing: Route

// A Route is a function that takes a RequestContext and returns Unit.
type Route = RequestContext => Unit

Spray-Routing: Directive

  • Transform request
  • Filter request
  • Extract values from the request
  • Complete request

Spray-Routing: Marshaller

  • Not part of the spray-routing module
  • “Marshalling” is the process of converting a higher-level (object) structure into some kind of lower-level representation, often a “wire format”
  • A Marshaller converts an object of type T into an HttpEntity
/**
  * Type class that marshals a value of type `T`.
  *
  * @tparam T type of the values this marshaller accepts (contravariant)
  */
trait Marshaller[-T] {
  /**
    * Marshals `value` using the given context.
    *
    * @param value the object to marshal
    * @param ctx   the marshalling context the result is handed to
    */
  def apply(value: T, ctx: MarshallingContext): Unit
}

Spray-Routing: Example

// PUT on path "register": runs `register` and completes with `Created`.
lazy val registration: Route =
  (path("register") & put) {
    register(completeAs(Created))
  }
// POST on path "authenticate": the session extracted by `authenticate` is
// set as a cookie and the request is completed with `Accepted`.
lazy val authentication: Route =
  (path("authenticate") & post) {
    authenticate { userSession =>
      setCookie(userSession.asCookie) {
        complete(Accepted)
      }
    }
  }
// Top-level route: the individual module routes chained with `~`.
// The trailing `...` is a slide placeholder for further routes, not Scala syntax.
val modules: Route = registration ~ authentication ~ ...

Cake Pattern

/**
  * Cake-pattern module exposing a user repository whose operations run in `M`.
  *
  * @tparam M effect type wrapping the repository results
  */
trait RepositoryModule[M[+_]] {

  /** Operations every repository implementation has to offer. */
  trait RepositoryLike {
    def getAllUsers(): M[List[Users]]
  }

  /** Concrete repository type, fixed by the implementing module. */
  type Repository <: RepositoryLike

  /** Repository instance, supplied by whoever assembles the modules. */
  val repo: Repository

  /** Monad instance for `M`. */
  implicit val M: Monad[M]
}

// Repository module fixed to `Task` as its effect type.
// The `{ ... }` body is a slide placeholder for the real implementation.
trait RemoteRepositoryModule extends RepositoryModule[Task] {
  // Implements the abstract `Repository` type member of RepositoryModule.
  trait Repository extends RepositoryLike {
    def getAllUsers(): Task[List[Users]] = { ... }
  }
} 

// Repository module fixed to `Id` as its effect type.
// The `{ ... }` body is a slide placeholder for the real implementation.
trait InMemoRepoModule extends RepositoryModule[Id] {
  // Implements the abstract `Repository` type member of RepositoryModule.
  trait Repository extends RepositoryLike {
    def getAllUsers(): Id[List[Users]] = { ... }
  }
}

Cake Pattern

/** Base of the service stack: a repository module parameterised by the effect type `M`. */
trait ServiceStackBase[M[+_]] extends RepositoryModule[M]

/** Service stack assembled with the `Task`-based remote repository module. */
class Service extends ServiceStackBase[Task] with RemoteRepositoryModule {
  object repo extends Repository
}

/** The same stack assembled with the `Id`-based in-memory repository module. */
class ServiceSpec extends ServiceStackBase[Id] with InMemoRepoModule {
  object repo extends Repository
}

Spray-Routing: Example, Revised

/**
  * Activation feature as a single module: storage helpers, request directives
  * and the `activation` route all live side by side in this trait.
  *
  * @tparam M effect type passed on to the mixed-in modules
  */
trait ActivationModule[M[+_]]
  extends ServiceStorageModule[M]
     with MessengerModule[M]
     with SchedulingServiceModule[M]
     with StatisticsESModule
     with LoggingSupport {

  // NOTE(review): presumably consumed implicitly by the storage calls below — confirm.
  private implicit lazy val repo = collections.medikit.activations

  /** Checks the job limit for `program`, wraps it in an `Activation` and stores it. */
  def initActivation(program: Program): D1[(ActivationKey, Activation)] = for {
    checked   <- checkLimit(program)
    activation = Activation(checked)
    result    <- storeActivation(activation)
  } yield result

  /**
    * Looks an activation up by id. Any outcome other than a successful `Some`
    * fails the task with `WrongActivationKeyRejection`.
    */
  def findActivation(activationId: String): D1[Activation] = unwrap {
    findById[Activation](activationId).runLast.attempt >>= {
      case \/-(Some(res)) => Task.now(res)
      case _ => Task.fail(WrongActivationKeyRejection)
    }
  }

  /**
    * Extracts the path segment and provides it when `ObjectId.isValid` accepts
    * it; otherwise rejects with `WrongActivationKeyRejection`.
    */
  def validKey: Directive1[String] = path(Segment) flatMap {
    case key if ObjectId.isValid(key) => provide(key)
    case _ => reject(WrongActivationKeyRejection)
  }

  /** Passes when the submitted code equals the activation's code, rejects otherwise. */
  def validateCode(submission: ConfirmationCode, activation: Activation): D0 = {
    if (activation.code == submission.code) pass else reject(ConfirmationCodeRejection)
  }

  /** Removes the activation and discards the result via `asD0`. */
  def closeActivation(activationId: String): D0 = removeActivation(activationId).asD0

  /** Passes while the number of jobs is below `maxJobs`; rejects otherwise. */
  private def rejectIfExceeded(jobs: List[JobDetail]): D0 = {
    // The flag is true when the limit has NOT been reached yet.
    val withinLimit = jobs.length < maxJobs
    if (withinLimit) pass else reject(ProgramLimitExceededRejection)
  }

  /** Applies the job limit to the jobs found for the program's phone, then provides `program`. */
  private def checkLimit(program: Program): D1[Program] = rejectIfExceeded {
    jobFactory findJobDetails program.phone
  } asD1 program

  /**
    * Stores the activation. An empty result fails the task; a present result is
    * folded into a failed or successful task and its id wrapped in `ActivationKey`.
    */
  def storeActivation(activation: Activation): D1[(ActivationKey, Activation)] = unwrap {
    store[Activation](activation).runLast >>= {
      case None => Task.fail(DOR("Couldn't store activation request"))
      case Some(result) => result.fold(Task.fail(_), Task.now) map {
        case (id, act) => ActivationKey(id) -> act
      }
    }
  }

  /** Removes the activation by id; anything but a successful `Some` fails the task. */
  def removeActivation(id: String): D1[Boolean] = unwrap {
    removeById(id).runLast >>= {
      case Some(\/-(res)) => Task.now(res)
      case _              => Task.fail(DOR("Couldn't remove activation"))
    }
  }

  /**
    * Routes under the "activation" path prefix:
    *  - POST at the prefix itself starts an activation,
    *  - POST with a valid key submits a confirmation code,
    *  - GET with a valid key re-sends the activation code.
    *
    * The key is the `String` extracted by `validKey`; it is what
    * `findActivation` and `closeActivation` take as their argument.
    */
  lazy val activation = {
    pathPrefix("activation") {
      (post & pathEnd) {
        (entityAs[Program] >>= initActivation) {
          case (key: ActivationKey, activation: Activation) =>
            sendMessage(ActivationCode(activation)) {
              complete(Created, key)
            }
        }
      } ~
      (post & validKey) { (aid: String) =>
        (entityAs[ConfirmationCode] & findActivation(aid)) {
          case (submitCode: ConfirmationCode, activation: Activation) =>
            validateCode(submitCode, activation) {
              (closeActivation(aid) & scheduleProgram(activation.program)) {
                (sendMessage(Activated(activation)) & emitEvent(ProgramActivated(activation))) {
                  completeAs(Accepted) { "Program activated" }
                }
              }
            }
        }
      } ~
      (get & validKey) { (aid: String) =>
        findActivation(aid) { (activation: Activation) =>
          sendMessage(ActivationCode(activation)) {
            completeAs(OK) { ActivationKey(activation.code) }
          }
        }
      }
    }
  }

}

SRM: Spray-Routing Module Pattern

Routing component

/**
  * Routing half of the activation module. Only the HTTP routes are declared
  * here; everything they call comes from the enclosing `ActivationModule`
  * through the self-type.
  */
trait ActivationRoute[M[+_]] { module: ActivationModule[M] =>

  // POST at the end of the path: start an activation and send out its code.
  private val activate =
    (post & pathEnd) {
      (entityAs[Program] >>= initActivation) {
        case (activationKey: ActivationKey, pending: Activation) =>
          sendMessage(ActivationCode(pending)) {
            complete(Created, activationKey)
          }
      }
    }

  // POST with a valid key: check the submitted code, then close the
  // activation, schedule the program and notify.
  private val submit =
    (post & validKey) { activationId: String =>
      (entityAs[ConfirmationCode] & findActivation(activationId)) {
        case (submitted: ConfirmationCode, found: Activation) =>
          validateCode(submitted, found) {
            (closeActivation(activationId) & scheduleProgram(found.program)) {
              (sendMessage(Activated(found)) & emitEvent(ProgramActivated(found))) {
                completeAs(Accepted)("Program activated")
              }
            }
          }
      }
    }

  // GET with a valid key: send the activation code again.
  private val repeat =
    (get & validKey) { activationId: String =>
      findActivation(activationId) { found: Activation =>
        sendMessage(ActivationCode(found)) {
          completeAs(OK)(ActivationKey(found.code))
        }
      }
    }

  /** All activation routes, mounted under the "activation" path prefix. */
  lazy val activation = pathPrefix("activation") {
    activate ~ submit ~ repeat
  }
}

SRM: Spray-Routing Module Pattern

Manager component

/**
  * Manager half of the activation module: the storage-backed operations and
  * the directives built from them. Mixed-in services are reached through the
  * `ActivationModule` self-type.
  */
trait ActivationManager[M[+_]] { module: ActivationModule[M] =>
  import RemoteApi._
  import constraints.maxJobs

  // NOTE(review): presumably consumed implicitly by the storage calls below — confirm.
  private implicit lazy val repo = collections.medikit.activations

  /** Checks the job limit for `program`, wraps it in an `Activation` and stores it. */
  def initActivation(program: Program): D1[(ActivationKey, Activation)] = for {
    checked   <- checkLimit(program)
    activation = Activation(checked)
    result    <- storeActivation(activation)
  } yield result

  /**
    * Looks an activation up by id. Any outcome other than a successful `Some`
    * fails the task with `WrongActivationKeyRejection`.
    */
  def findActivation(activationId: String): D1[Activation] = unwrap {
    findById[Activation](activationId).runLast.attempt >>= {
      case \/-(Some(res)) => Task.now(res)
      case _ => Task.fail(WrongActivationKeyRejection)
    }
  }

  /**
    * Extracts the path segment and provides it when `ObjectId.isValid` accepts
    * it; otherwise rejects with `WrongActivationKeyRejection`.
    */
  def validKey: Directive1[String] = path(Segment) flatMap {
    case key if ObjectId.isValid(key) => provide(key)
    case _ => reject(WrongActivationKeyRejection)
  }

  /** Passes when the submitted code equals the activation's code, rejects otherwise. */
  def validateCode(submission: ConfirmationCode, activation: Activation): D0 = {
    if (activation.code == submission.code) pass else reject(ConfirmationCodeRejection)
  }

  /** Removes the activation and discards the result via `asD0`. */
  def closeActivation(activationId: String): D0 = removeActivation(activationId).asD0

  /** Passes while the number of jobs is below `maxJobs`; rejects otherwise. */
  private def rejectIfExceeded(jobs: List[JobDetail]): D0 = {
    // The flag is true when the limit has NOT been reached yet.
    val withinLimit = jobs.length < maxJobs
    if (withinLimit) pass else reject(ProgramLimitExceededRejection)
  }

  /** Applies the job limit to the jobs found for the program's phone, then provides `program`. */
  private def checkLimit(program: Program): D1[Program] = rejectIfExceeded {
    jobFactory findJobDetails program.phone
  } asD1 program

  /**
    * Stores the activation. An empty result fails the task; a present result is
    * folded into a failed or successful task and its id wrapped in `ActivationKey`.
    */
  def storeActivation(activation: Activation): D1[(ActivationKey, Activation)] = unwrap {
    store[Activation](activation).runLast >>= {
      case None => Task.fail(DOR("Couldn't store activation request"))
      case Some(result) => result.fold(Task.fail(_), Task.now) map {
        case (id, act) => ActivationKey(id) -> act
      }
    }
  }

  /** Removes the activation by id; anything but a successful `Some` fails the task. */
  def removeActivation(id: String): D1[Boolean] = unwrap {
    removeById(id).runLast >>= {
      case Some(\/-(res)) => Task.now(res)
      case _              => Task.fail(DOR("Couldn't remove activation"))
    }
  }

}

SRM: Spray-Routing Module Pattern

Module declaration

/**
  * Activation module assembled from its manager and routing components plus
  * the service modules they rely on.
  *
  * @tparam M effect type shared by all mixed-in modules
  */
trait ActivationModule[M[+_]]
  extends ActivationManager[M]
     with ActivationRoute[M]
     with ServiceStorageModule[M]
     with MessengerModule[M]
     with SchedulingServiceModule[M]
     with StatisticsESModule
     with LoggingSupport

Revising Cake Pattern

/**
  * Cake-pattern module exposing a storage whose operations run in `M`.
  *
  * @tparam M effect type wrapping the storage results
  */
trait StorageModule[M[+_]] {
  /** Concrete storage type, fixed by the implementing module. */
  type Storage <: StorageLike
  /** Monad instance for `M`. */
  implicit val M: Monad[M]
  /** Storage instance, supplied by whoever assembles the modules. */
  val storage: Storage
  trait StorageLike {
    /**
      * Stores a value for which a `Serializer` is available.
      *
      * @param value the value to store (`object` is a reserved word and cannot
      *              be used as a parameter name)
      * @return the key, wrapped in `M`
      */
    def store[A: Serializer](value: A): M[Key]
  }
}

Revising Cake Pattern

This won't compile!

// Deliberately broken slide example: the last generator returns M[Key]
// (see the inline comment) rather than a directive.
// NOTE(review): presumably the earlier generators are Directive1 values, so
// the for-comprehension mixes two different types — confirm.
def register = for {
  creds   <- credentials
  _       <- check(creds)
  account <- newAccount(creds)
  userId  <- storage store account // returns M[Key]
} yield userId

Can we substitute M with Directive1

without breaking abstraction?

Magnet pattern

// Entry point of the magnet pattern: runs the magnet to obtain a D1[T].
def unwrap[T](magnet: MonadMagnet[T]): D1[T] = magnet.apply()
/** Magnet that produces a `D1[T]` when applied. */
trait MonadMagnet[T] {
  def apply(): D1[T]
}
object MonadMagnet {
  /**
    * Implicit conversion from any `M[V]` that has a `MonadMagnetHandler[M]`
    * in scope into a magnet; the handler does the actual conversion.
    *
    * The result type is spelled out because this is an implicit definition.
    */
  implicit def apply[M[_], V](monad: M[V])
    (implicit mmh: MonadMagnetHandler[M]): MonadMagnet[V] = {
    new MonadMagnet[V] {
      def apply(): D1[V] = mmh handle monad
    }
  }
}

Magnet pattern

// Handler for Task: hands the task to `onSuccess` to obtain the directive.
implicit val taskMHandler: MonadMagnetHandler[Task] =
  new MonadMagnetHandler[Task] {
    def handle[V](monad: Task[V]): D1[V] = onSuccess(monad)
  }
// Handler for Id: the value is already available, so it is provided as is.
// The parameter is declared as `monad: Id[V]`, matching the name the body
// uses and the `M[V]` parameter shape of the Task handler.
implicit val idMHandler: MonadMagnetHandler[Id] = {
  new MonadMagnetHandler[Id] {
    def handle[V](monad: Id[V]): D1[V] = provide(monad)
  }
}

Now it compiles perfectly

// Registration flow with the storage call wrapped in `unwrap`.
def register = for {
  submitted <- credentials
  _         <- check(submitted)
  created   <- newAccount(submitted)
  key       <- unwrap(storage store created) // returns D1[Key]
} yield key

But there is another way!

/** A `Monad` extended with two operations used to run a computation and observe its outcome. */
trait ExtMonad[M[+_]] extends Monad[M] {

  /** Takes the computation by name and returns an `M[A]`. */
  // NOTE(review): presumably this runs `monad` asynchronously — confirm against implementations.
  def fork[A](monad: => M[A]): M[A]

  /** Runs `monad` and passes either the failure or the result to `callback`. */
  def withResult[A](monad: M[A])(callback: (Throwable \/ A) => Unit): Unit
}

Now it's more general

/**
  * Implicit conversion from any `M[A]` with an `ExtMonad[M]` instance into an
  * unwrapper. The resulting directive forks the computation and, from the
  * callback, either continues the inner route with the result or fails the
  * request context with the exception.
  *
  * The result type is spelled out because this is an implicit definition.
  */
implicit def apply[M[+_], A](monad: M[A])
  (implicit M: ExtMonad[M]): MonadUnwrapper[A] = {
  new MonadUnwrapper[A] {
    def apply(): D1[A] = new D1[A] {
      def happly(f: (A :: HNil) => Route): Route = { ctx =>
        M.withResult(M.fork(monad)) {
          case \/-(result) => f(result :: HNil)(ctx)
          case -\/(ex) => ctx.failWith(ex)
        }
      }
    }
  }
}

Directive.scala

// Excerpt of the Directive class; the trailing `...` stands for the members
// left out on the slide.
abstract class Directive[L <: HList] { self ⇒
  // Builds a Route from a function that receives the extracted values.
  def happly(f: L ⇒ Route): Route
  // Chains a second directive that is computed from this directive's values:
  // this directive's happly runs first, then the happly of the one returned by f.
  def hflatMap[R <: HList](f: L ⇒ Directive[R]) = {
    new Directive[R] {
      def happly(g: R ⇒ Route) = self.happly { 
        values ⇒ f(values).happly(g) 
      }
    }
  }
  ...
}

// Directive0 extracts nothing (HNil); Directive1[A] extracts a single value of type A.
type Directive0    = Directive[HNil]
type Directive1[A] = Directive[A :: HNil]

Simplify:

Directive[HList] to Directive1[A]

// Four form fields are extracted and passed to the inner route as four
// separate arguments.
formFields("firstName", "lastName", "userName", "pwd") { 
  (fn, ln, un, pwd) => // Directive[HList]
    // implementation
}
Extract into a case class:
/** The four registration form fields bundled into one value. */
case class Credentials(
  firstName: String,
  lastName: String,
  userName: String,
  pwd: String
)

// The same four form fields, kept as a directive value...
val data = formFields("firstName", "lastName", "userName", "pwd")

// ...and converted with `as(Credentials)`, so the inner route receives a
// single Credentials value.
data.as(Credentials) { credentials =>
    // Directive1[Credentials]
}

Let the magic begin

// Monad instance for D1: `point` provides the value, `bind` goes through
// hflatMap and passes the single extracted value to `f`.
implicit val d1Monad: Monad[D1] = new Monad[D1] {
  def point[A](a: => A): D1[A] = provide(a)
  def bind[A, B](fa: D1[A])(f: (A) => D1[B]): D1[B] =
    fa.hflatMap { extracted => f(extracted.head) }
}

WARNING!

Don't write this

// Anti-pattern kept on purpose (the slide says "Don't write this"): `bind`
// is implemented with `flatMap` instead of `hflatMap`.
// NOTE(review): presumably `flatMap` on a D1 can resolve back to this very
// instance's `bind`, which would recurse without end — confirm.
implicit val d1Monad: Monad[D1] = new Monad[D1] {
  def point[A](a: => A): D1[A] = provide(a)
  def bind[A, B](fa: D1[A])(f: (A) => D1[B]): D1[B] = {
    fa flatMap f
  }
}

What about Directive0?

              // Sketch of a monad instance for D0. `point` evaluates `a` and
              // continues with HNil; the D0 result type has no slot for A.
              def point[A](a: => A): D0 = new D0 {
  def happly(f: HNil => Route): Route =
    { ctx => a; f(HNil)(ctx) }
}
              // Left unimplemented on the slide: neither A nor B appears in D0.
              def bind[A, B](fa: D0)(f: A => D0): D0 = ???

For-comprehension

              // Credentials are extracted and authenticated, then a session is
              // initialised and returned.
              val withSession: D1[Session] = for {
                login  <- credentials
                _      <- authenticate(login)
                opened <- initSession
              } yield opened

Implicit conversion to the rescue

/**
  * Adds `flatMap` to `Directive0` so that it can take part in a
  * for-comprehension together with `D1` directives.
  */
implicit class Dir0Modifier(dir0: Directive0) {
  /**
    * Runs the wrapped directive first and, if it lets the request through,
    * continues with the directive produced by `func`.
    *
    * Both `dir0` and `func` are used here, so a rejecting `Directive0` (for
    * example an authentication check) stops the chain, and the steps after
    * it are not dropped.
    *
    * @param func continuation producing the next directive
    * @tparam A value extracted by the next directive (`Unit` still works)
    */
  def flatMap[A](func: HNil => D1[A]): D1[A] = dir0.hflatMap(func)
}

So far so good, but ...

What's wrong with this code?

// Deliberate "what's wrong" example: the two generators are chained one
// after the other, although `scheduledTasks` does not use `user`.
def tasksByUserId(id: String) = for {
  user   <- userById(id)
  tasks  <- scheduledTasks
} yield userTasks(user, tasks)

Monads are sequential

userById(id) → scheduledTasks → userTasks(user, tasks)

All Monads are Applicatives!

// Every Monad is also an Applicative: Monad extends Applicative.
trait Monad[F[_]] extends Applicative[F]

Parallelize!

// Applicative version: the two lookups are combined with `|@|` and the
// results are passed to `userTasks`.
def tasksByUserId(id: String) =
  (userById(id) |@| scheduledTasks)(userTasks)
// The same combination written with the `^` syntax.
def tasksByUserId(id: String) =
  ^(userById(id), scheduledTasks)(userTasks)