Event Sourcing – with Akka and Play




On GitHub: schleichardt/play-event-sourcing-talk-19052014

Event Sourcing

with Akka and Play

Michael Schleichardt https://github.com/schleichardt

Table of Contents

  • Java API for Concurrency
  • Akka
  • Actor Model
  • Event Sourcing
  • Akka Persistence
  • Akka Cluster Sharding
  • Code Example

Java API for Concurrency

Toolbox

  • synchronized
  • ReentrantLock
  • AtomicInteger
  • ExecutorService
  • callbacks
  • java.util.concurrent.Future
  • ...

Exceptions

You create threads to parallelize work.

How does a created thread report an exception to the caller?
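
One answer from the toolbox above, sketched under assumptions: if the work is submitted to an ExecutorService, an exception thrown in the worker is stored and rethrown to the caller, wrapped in an ExecutionException, when Future.get() is called. The class and method names below are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
final class ExceptionReportingSketch {

    // Runs a failing task on a worker thread and returns the message of the
    // exception as seen by the calling thread.
    static String runFailingTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    throw new IllegalStateException("boom in worker thread");
                }
            });
            result.get(); // blocks the caller until the task has finished
            return "no exception";
        } catch (ExecutionException e) {
            // the worker's exception travels to the caller as the cause
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask());
    }
}
```

Note that the caller only learns about the failure while it blocks in get(); with a raw Thread and no Future, the exception would not reach the caller at all.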

Locks

If a lock is heavily contended, CPU time is wasted on suspending, checking and resuming threads.

Wouldn't it be better if the threads did useful work for the application instead of being suspended?

Scalability

Imagine you did everything right: no deadlocks, performant and readable code.

Can you scale out?

Akka

Akka

  • toolkit for concurrent applications
  • API for Java and Scala
  • Play 2 is built on top of it
    • Play 2.2 uses Akka 2.2
    • Play 2.3 uses Akka 2.3
  • uses Actor Model
  • location transparent code for working on multiple machines up to a cluster

Actor Model

Actor Model Basics

  • mathematical model of concurrent computation
  • there are actors and messages
  • actors can contain state, but state is never exposed directly
  • actor has messages in a mailbox (by default in-memory)
  • actors are decoupled from each other, from outside they look all alike
  • a man in the middle is a feature: it makes actors testable

Actor Model Messages

  • you can only talk with actors via (immutable) messages
  • message processing is asynchronous
  • one actor processes one message at a time, so no explicit locking is necessary
  • do not wait or block, use async tools that produce messages
  • use a scheduler to send messages in the future
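
The "one message at a time" rule can be illustrated in plain Java. This is a minimal sketch, not how Akka implements mailboxes: a single-threaded executor plays the role of the mailbox, and CounterActorSketch is a hypothetical name. Four threads send messages concurrently, yet the state needs no lock because only the mailbox thread touches it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java illustration of "one message at a time"; not Akka's implementation.
final class CounterActorSketch {
    // A single worker thread drains the queue, so messages are handled one by one.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private long count = 0; // only ever touched by the mailbox thread, no lock needed

    // Fire-and-forget: enqueue an "increment" message and return immediately.
    void tell(final long increment) {
        mailbox.execute(new Runnable() {
            @Override
            public void run() {
                count += increment;
            }
        });
    }

    // Enqueues a "get" message behind all earlier ones and waits for the reply.
    // Blocking here is only for the demo; actor code itself should not block.
    long askCount() {
        try {
            return mailbox.submit(new Callable<Long>() {
                @Override
                public Long call() {
                    return count;
                }
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void stop() {
        mailbox.shutdown();
    }

    // Four threads send 1000 messages each; returns the final count.
    static long demo() {
        final CounterActorSketch actor = new CounterActorSketch();
        Thread[] senders = new Thread[4];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int n = 0; n < 1000; n++) {
                        actor.tell(1);
                    }
                }
            });
            senders[i].start();
        }
        try {
            for (Thread sender : senders) {
                sender.join();
            }
            return actor.askCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            actor.stop();
        }
    }
}
```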

Actor Model

  • is not deterministic: message order is undefined, and (transparent) parallel processing is possible via multiple actors
  • the model is not deterministic => tests can be non-deterministic, too
  • harder to create deadlocks
  • maintaining the mailbox can cause a lot of CPU overhead
  • lmax disruptor or just Scala Futures can be alternatives

Back to Akka

ActorSystem system = ActorSystem.create("Bank");
ActorRef account = system.actorOf(
    Props.create(BankAccountActor.class), "account");
ActorRef customer = system.actorOf(Props.create(CustomerActor.class), "you");
account.tell(new Deposit(5000), customer);
account.tell(new Withdraw(3000), customer);
account.tell(new Withdraw(3000), customer);
//no blocking: at this point the BankAccountActor may have processed 0 to 3 of the messages
public class BankAccountActor extends UntypedActor {
        private final BankAccount account = new BankAccount();

        //never use blocking code here, like waiting for future results
        public void onReceive(Object message) throws Exception {
            if (message instanceof Deposit) {
                account.deposit(((Deposit) message).centAmount());
                sender().tell(new Ack(), self());
            } else if (message instanceof Withdraw) {
                //cast to Withdraw, casting to Deposit here would throw a ClassCastException
                final long centAmount = ((Withdraw) message).centAmount();
                if (centAmount <= account.getBalanceInCents()) {
                    account.withdraw(centAmount);
                    sender().tell(new Ack(), self());
                } else {
                    sender().tell(new IllegalArgumentException(
                                        "Insufficient balance"), self());
                }
            } else {
                unhandled(message);
            }
        }
}

Before Event Sourcing

  • CRUD
  • Database contains full model state and is single source of truth
  • newState = f(proposedState) = proposedState
  • locking is necessary when objects are overwritten
  • ORM, Ebean & Hibernate => object/relational impedance mismatch, but the data should be in focus, not the code
  • changes to the model class require database migrations, which are tough
  • previous states not available

A higher throughput, please.

Wouldn't it be great to see not only the current state but also previous states and how they were reached?

Event Sourcing

Event Sourcing Basics

  • we don't save the application state directly but the sequence of events that created the state
  • Events are records of the past, they can't be changed, even cancellation of an order is just a new event
    • Examples for events:
    • cart created, item added to cart, item removed from cart, cart ordered, order paid, order returned, order refunded
  • events != commands: a command is a request to do something and may fail, e.g., an order cart command fails if no shipping address is provided; a cart ordered event is the record of a successfully created order
  • our source of truth does not contain the result but the road to it
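
The difference between a command and an event can be sketched as a small validation step. All names here (OrderCartCommand, CartOrderedEvent, handle) are hypothetical and only illustrate the idea: a rejected command leaves no trace in the event log, and only a successful one yields an event to persist.

```java
import java.util.Optional;

// Hypothetical names throughout; a sketch of "command in, event out (or rejection)".
final class OrderCartSketch {

    // A command is a request and may be invalid.
    static final class OrderCartCommand {
        final String cartId;
        final String shippingAddress; // may be null or blank

        OrderCartCommand(String cartId, String shippingAddress) {
            this.cartId = cartId;
            this.shippingAddress = shippingAddress;
        }
    }

    // An event records something that has already happened.
    static final class CartOrderedEvent {
        final String cartId;
        final String shippingAddress;

        CartOrderedEvent(String cartId, String shippingAddress) {
            this.cartId = cartId;
            this.shippingAddress = shippingAddress;
        }
    }

    // Validates the command; only a successful command yields an event to persist.
    static Optional<CartOrderedEvent> handle(OrderCartCommand command) {
        String address = command.shippingAddress;
        if (address == null || address.trim().isEmpty()) {
            return Optional.empty(); // rejected: nothing is recorded
        }
        return Optional.of(new CartOrderedEvent(command.cartId, address));
    }
}
```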

Event Sourcing is functional

  • newState = f(previousState, event)
  • currentBalance = previousBalance + depositAmount
  • state3 = f(f(f(f(initialState, ev0), ev1), ev2), ev3)
  • aah, a fold left:
    • val eventList = List(ev0, ev1, ev2, ev3)
    • val currentState = eventList.foldLeft(initialState)(f)
  • in my programs the initial state is the first event that is recorded
  • my flavor: events recorded per entity
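
For Java readers, the same fold can be written by hand in Java 8. This is a sketch under simplifying assumptions: events are reduced to signed cent amounts (deposits positive, withdrawals negative), and foldLeft is a helper defined here, not a JDK method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Java 8 counterpart of a Scala foldLeft; names are illustrative.
final class FoldLeftSketch {

    // newState = f(previousState, event), applied to every event in order
    static <S, E> S foldLeft(List<E> events, S initialState, BiFunction<S, E, S> f) {
        S state = initialState;
        for (E event : events) {
            state = f.apply(state, event);
        }
        return state;
    }

    // Events are simplified to signed cent amounts: deposits > 0, withdrawals < 0.
    static long currentBalance(List<Long> events) {
        return foldLeft(events, 0L, (balance, amount) -> balance + amount);
    }

    public static void main(String[] args) {
        System.out.println(currentBalance(Arrays.asList(5000L, -3000L, 1200L)));
    }
}
```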

Event Sourcing Vocabulary

  • Memory Model/MemoryImage: representation of current state as object in RAM
  • Complete Rebuild: the memory model may be dropped and rebuilt by rerunning all events for the entity
  • Temporal Query: retrieve previous state
    • current: state3 = f(f(f(f(initialState, ev0), ev1), ev2), ev3)
    • previous: state2 = f(f(f(initialState, ev0), ev1), ev2)
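
A temporal query then amounts to replaying only a prefix of the event list. A minimal sketch, again assuming events are signed cent amounts and following the naming above, where stateN is the state after ev0 to evN have been applied; stateAt is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch: events are signed cent amounts, the state is a balance.
final class TemporalQuerySketch {

    // State after applying ev0 .. ev<lastEventIndex>, mirroring "stateN".
    static long stateAt(List<Long> events, int lastEventIndex) {
        long balance = 0; // initialState
        for (long amount : events.subList(0, lastEventIndex + 1)) {
            balance += amount; // f(previousState, event)
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(5000L, -3000L, 1200L, -200L); // ev0..ev3
        System.out.println("state3 = " + stateAt(events, 3));
        System.out.println("state2 = " + stateAt(events, 2));
    }
}
```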

(Binary) Snapshots

  • without snapshot
    • val eventList = List(ev0, ev1, ev2, ev3, ev4, ..., ev100)
    • eventList.size == 101
    • val currentState = eventList.foldLeft(initialState)(f)
  • with snapshot
    • state90 is the snapshot taken after ev90 was applied
    • val eventList = List(ev91, ev92, ev93, ev94, ..., ev100)
    • eventList.size == 10
    • val currentState = eventList.foldLeft(state90)(f)
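
That both routes lead to the same state can be checked with a small Java sketch. The data is made up (101 deposits of 1 to 101 cents), and the snapshot is assumed to be taken once the first 91 events (ev0 to ev90) have been applied, so only the remaining 10 events are replayed on top of it.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: a snapshot is the folded state after some prefix of events.
final class SnapshotSketch {

    static long replay(long startState, List<Long> events) {
        long state = startState;
        for (long amount : events) {
            state += amount;
        }
        return state;
    }

    // 101 deposit events ev0..ev100 with amounts 1..101 (made-up data)
    static List<Long> allEvents() {
        List<Long> events = new ArrayList<Long>();
        for (long amount = 1; amount <= 101; amount++) {
            events.add(amount);
        }
        return events;
    }

    static long fullReplay() {
        return replay(0L, allEvents());
    }

    static long replayFromSnapshot() {
        List<Long> events = allEvents();
        // snapshot taken once ev0..ev90 had been applied (91 events)
        long snapshot = replay(0L, events.subList(0, 91));
        // only the 10 events after the snapshot need to be replayed
        return replay(snapshot, events.subList(91, events.size()));
    }
}
```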

(Queryable/Searchable) Snapshots

  • the event based nature of the write model is not suitable to search the current state
    • e.g., search orders for one customer
  • so events should be used to fill a (document-oriented) database like MongoDB or ElasticSearch
  • the separation of the models you read from and you write to is called CQRS http://martinfowler.com/bliki/CQRS.html
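
A read model of this kind can be sketched as a projection that is updated by every event. In this illustration an in-memory map stands in for MongoDB or ElasticSearch, and OrderPlaced and OrdersByCustomerView are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a read model; the in-memory map stands in for a document database.
final class OrdersByCustomerView {

    // Hypothetical event type coming from the write model.
    static final class OrderPlaced {
        final String orderId;
        final String customerId;

        OrderPlaced(String orderId, String customerId) {
            this.orderId = orderId;
            this.customerId = customerId;
        }
    }

    private final Map<String, List<String>> orderIdsByCustomer =
            new HashMap<String, List<String>>();

    // Each event from the write model updates the queryable representation.
    void apply(OrderPlaced event) {
        List<String> orderIds = orderIdsByCustomer.get(event.customerId);
        if (orderIds == null) {
            orderIds = new ArrayList<String>();
            orderIdsByCustomer.put(event.customerId, orderIds);
        }
        orderIds.add(event.orderId);
    }

    // Read side: a direct lookup instead of scanning the event log.
    List<String> ordersOf(String customerId) {
        List<String> orderIds = orderIdsByCustomer.get(customerId);
        return orderIds == null
                ? Collections.<String>emptyList()
                : Collections.unmodifiableList(orderIds);
    }
}
```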

Event Sourcing

  • you store all the events, binary snapshots and queryable snapshots => more disc space is needed
  • events are immutable and you just append them to a store => less locking in the write model, higher throughput possible
  • even if you avoid DELETE in CRUD SQL, UPDATE command can also delete data, in ES the data is still available

Event Sourcing

  • Error in production? Crappy logging? But you stored events? You will love it!
  • higher cognitive load to work with ES
  • eventual consistency: in CRUD there is a single source of truth; in ES the write model is up to date while views & snapshots take additional time to update, but if you wait long enough (and don't change anything) everything will be consistent

Command Sourcing

  • persists the commands, which may fail, instead of the events
  • Fowler describes a mixture of ES and Command Sourcing on http://martinfowler.com/eaaDev/EventSourcing.html
  • external interaction needs gateways
    • otherwise commands are executed multiple times, e.g., payments
    • queries may respond differently, e.g., exchange rates for currencies
  • better for a deeper analysis of what failed
  • the order in which commands happened may be corrected afterwards

Akka Persistence

ES to Akka Persistence

  • Events are immutable messages
  • state is internal and can change at any time, can be dropped
  • newState = f(previousState, event) is side effect free and functions are in the foreground
  • that sounds like functional programming and actors
  • Akka with Scala or Akka with Java 8 seem to be a good fit to implement Event Sourcing
  • One tool to implement ES can be the Akka Persistence module

Akka Persistence

  • for Java and Scala
  • since Akka 2.3, so not by default compatible with Play 2.2 but with Play 2.3
  • enables stateful actors by persisting messages and optional binary snapshots in an append only way
  • it also provides channels to provide at-least-once message delivery semantics
    • If you don't acknowledge messages, they will be sent again
  • Eligotech developed the Akka extension eventsourced
  • Typesafe and Eligotech partnered to create Akka Persistence and used the ideas from eventsourced
  • experimental => no guarantee that the module stays binary or source compatible in the next version
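
The at-least-once idea can be sketched without Akka. The following is a concept illustration only and not the Akka Persistence channel API; AtLeastOnceSketch and its methods are hypothetical. The sender remembers every message until it is acknowledged and resends whatever is still open.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Concept sketch only; this is not the Akka Persistence channel API.
final class AtLeastOnceSketch {
    private final Map<Long, String> unconfirmed = new LinkedHashMap<Long, String>();
    // every transmission in order, resends included
    private final List<String> wire = new ArrayList<String>();
    private long nextDeliveryId = 1;

    // Sends a payload and remembers it until it is acknowledged.
    long send(String payload) {
        long deliveryId = nextDeliveryId++;
        unconfirmed.put(deliveryId, payload);
        wire.add(payload);
        return deliveryId;
    }

    // The receiver acknowledged the message; stop resending it.
    void confirm(long deliveryId) {
        unconfirmed.remove(deliveryId);
    }

    // Would be triggered by a timer: resend whatever is still unacknowledged.
    void redeliverUnconfirmed() {
        wire.addAll(unconfirmed.values());
    }

    List<String> transmissions() {
        return new ArrayList<String>(wire);
    }
}
```

Because a message can arrive more than once, the receiving side has to tolerate duplicates, for example by ignoring delivery ids it has already processed.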

Key concepts: Journal

  • persistent store of persistent messages
  • operations: append, read, delete
  • different plugins for databases
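
The three operations can be pictured with an in-memory stand-in. This sketch is not the journal plugin interface of Akka Persistence; the class name, the method names and the choice to start sequence numbers at 1 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory sketch of the three journal operations; real plugins talk to a database.
final class InMemoryJournalSketch {
    // sequence number -> event, kept sorted so replays come back in order
    private final TreeMap<Long, Object> entries = new TreeMap<Long, Object>();
    private long lastSequenceNr = 0;

    // append: assigns the next sequence number (starting at 1 in this sketch)
    long append(Object event) {
        entries.put(++lastSequenceNr, event);
        return lastSequenceNr;
    }

    // read: all events with sequence number >= fromSequenceNr, in order
    List<Object> read(long fromSequenceNr) {
        return new ArrayList<Object>(entries.tailMap(fromSequenceNr, true).values());
    }

    // delete: drops all events up to and including toSequenceNr
    void deleteUpTo(long toSequenceNr) {
        entries.headMap(toSequenceNr, true).clear();
    }
}
```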

Persistence in Akka Persistence

  • 2 kinds of storage: journal for events, snapshot store for binary snapshots
  • different plugins available (most of them only for journals)
    • LevelDB (local)
    • Cassandra (async driver)
    • DynamoDB
    • HBase
    • MapDB
    • MongoDB (via Casbah, a sync driver)

For dev, test and prod you can use different databases by changing values in Typesafe Config/Play Configuration

#application.conf
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"

akka.contrib.persistence.mongodb.mongo.urls =
    ["192.168.0.1:27017","192.168.0.2:27017"]
akka.contrib.persistence.mongodb.mongo.journal-write-concern =
    "Acknowledged"
#reference.conf in MongoDB plugin
akka-contrib-mongodb-persistence-journal.class = "akka.contrib.persistence.mongodb.MongoJournal"

Key concepts: EventsourcedProcessor

  • receives non-persistent commands, validates them, generates events and persists them; only after that does it apply the events to its internal state
    • be careful to not persist events that cause exceptions, but you can delete these events
  • not every message needs to be persisted
  • stateful actor, contains the memory model and f
  • has a processorId, String as ID, should be stable, override it!
  • if reinstantiated, automatically receives the events to replay
//actually it is a Scala class
abstract class UntypedEventsourcedProcessor {
    //[...]

    //the "new" name for onReceive
    public abstract void onReceiveCommand(Object message);

    //the "new" name for onReceive at the complete rebuild
    public abstract void onReceiveRecover(Object message);
}
@Override
public void onReceiveCommand(Object message) {
    if (message instanceof DepositCommand) {
        //[...]
    } else if (message instanceof WithdrawCommand) {
        //cast to WithdrawCommand, casting to DepositCommand would throw a ClassCastException
        final long centAmount = ((WithdrawCommand) message).getCentAmount();
        if (centAmount <= account.getBalanceInCents()) {
            final ActorRef sender = sender();
            //the handler runs after the event has been written to the journal
            persist(new WithdrawEvent(centAmount), new Procedure<WithdrawEvent>() {
                @Override
                public void apply(WithdrawEvent event) throws Exception {
                    updateState(event);
                    sender.tell(new Ack(), self());
                }
            });
        } else {
            sender().tell(new IllegalArgumentException("Insufficient balance"), self());
        }
    } else if (message.equals("snap")) {
        saveSnapshot(account.copy());
    }
}
@Override
public void onReceiveRecover(Object message) {
    if (message instanceof SnapshotOffer)//binary snapshot
        account = (BankAccount)((SnapshotOffer)message).snapshot();
    else
        updateState(message);
}

private BankAccount account = new BankAccount();

private void updateState(Object event) {
    if (event instanceof DepositEvent) {
        account.deposit(((DepositEvent)event).getCentAmount());
    } else if (event instanceof WithdrawEvent) {
        account.withdraw(((WithdrawEvent)event).getCentAmount());
    }
}

What is the difference between this and the previous slide?

@Override
public void onReceiveRecover(Object message) {
    if (message instanceof SnapshotOffer)
        amount = (Integer)((SnapshotOffer)message).snapshot();
    else
        updateState(message);
}

private int amount = 0;

private void updateState(Object event) {
    if (event instanceof DepositEvent) {
        amount += ((DepositEvent)event).getCentAmount();
    } else if (event instanceof WithdrawEvent) {
        amount -= ((WithdrawEvent)event).getCentAmount();
    }
}

In Event Sourcing it is hard to change events, but easy to change the in-memory model.

Key concepts: View

  • mirrors the state of another persistent actor
  • read only
  • useful to extract the events, for example if a new database representation should be created
  • useful for temporal queries
  • can be updated automatically when new events arrive at the processor => very useful to provide a feed to the browser via Server-Sent Events or WebSockets
class PastView extends UntypedView {
    @Override
    public String processorId() {
        return "some-processor-id";
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Persistent) {
        // set state, persistent message contains event as payload
        }
    }

    //since the method returns 5, it only replays events up to sequence number 5
    @Override
    public long autoUpdateReplayMax(){
        return 5;//TODO as constructor parameter
    }
}

And how do I do it on multiple servers for HA and updates?

The sequence number of the persisted events is generated inside the EventsourcedProcessor, not in the database (see Persistence.scala in the Akka code base).

So within one cluster, each processor may exist only once.

With Akka Cluster Singleton you can deploy the processors on one machine, but then they all need to fit into that machine's RAM.

So what do I do if they won't all fit into the RAM?

Akka Cluster Sharding

Akka Cluster Sharding

  • distribute actors ("entries") across several nodes in the cluster
  • messaging with entries is location transparent, but you need to give hints
  • automatic distribution
  • automatic rebalancing, be careful with stateful actors which do not use event sourcing
  • contrib module
//setup
ClusterSharding.get(system).start("accounts",
    Props.create(BankAccountActor.class), messageExtractor);

public interface MessageExtractor {
    String entryId(Object message);//bankAccountId
    Object entryMessage(Object message);//payload, new DepositCommand(500)
    String shardId(Object message);//bankAccount.getId().hashCode % 5013
};

//send message
ActorRef region = ClusterSharding.get(system).shardRegion("accounts");
region.tell(new Envelope("bankAccountId", new DepositCommand(500)), getSelf());
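
A sketch of how a shard id could be derived from the bank account id, following the comment on shardId above. ShardIdSketch is a hypothetical helper and not part of Akka. One detail worth noting: String.hashCode() can be negative, and the % operator would then return a negative remainder, so this sketch uses Math.floorMod to keep the result in the range 0 to numberOfShards - 1.

```java
// Sketch of a shardId computation; the shard count 5013 is taken from the
// comment in the MessageExtractor outline, everything else is illustrative.
final class ShardIdSketch {

    // Math.floorMod always returns a value in [0, numberOfShards),
    // even when hashCode() is negative.
    static String shardId(String bankAccountId, int numberOfShards) {
        return String.valueOf(Math.floorMod(bankAccountId.hashCode(), numberOfShards));
    }

    public static void main(String[] args) {
        System.out.println(shardId("bankAccountId", 5013));
    }
}
```

The same id always maps to the same shard, which is what lets the shard region route a message without knowing where the entry currently lives.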

PubSub in Cluster

from http://doc.akka.io/docs/akka/2.3.2/contrib/distributed-pub-sub.html

public class Publisher extends UntypedActor {

  // activate the extension
  ActorRef mediator =
    DistributedPubSubExtension.get(getContext().system()).mediator();

  public void onReceive(Object msg) {
    if (msg instanceof String) {
      String in = (String) msg;
      String out = in.toUpperCase();
      String topic = "content";
      mediator.tell(new DistributedPubSubMediator.Publish(topic, out),
        getSelf());
    } else {
      unhandled(msg);
    }
  }
}

PubSub in Cluster

from http://doc.akka.io/docs/akka/2.3.2/contrib/distributed-pub-sub.html

public class Subscriber extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public Subscriber() {
    ActorRef mediator =
      DistributedPubSubExtension.get(getContext().system()).mediator();
    // subscribe to the topic named "content"
    mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()),
      getSelf());
  }

  public void onReceive(Object msg) {
    if (msg instanceof String)
      log.info("Got: {}", msg);
    else if (msg instanceof DistributedPubSubMediator.SubscribeAck)
      log.info("subscribing");
    else
      unhandled(msg);
  }
}

use WebSockets to publish events to the browser

Actors for WebSockets

http://www.playframework.com/documentation/2.3.x/JavaWebSockets

import akka.actor.*;

public class MyWebSocketActor extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MyWebSocketActor.class, out);
    }

    private final ActorRef out;

    public MyWebSocketActor(ActorRef out) {
        this.out = out;
    }

    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell("I received your message: " + message, self());
        }
    }
}

use Server-Sent Events to publish events to the browser

Server-Sent Events

def onStart(channel: PlayChannel[JsValue]) {
  actorRef ! SubscriberActor.SetChannel(channel)
}
val enumerator: Enumerator[JsValue] = Concurrent.unicast(onStart,
    actorRef ! PoisonPill, (_, _) => actorRef ! PoisonPill)
Ok.feed(enumerator &> EventSource()).as("text/event-stream")
class SubscriberActor(topic: String) extends Actor {
    def ready(channel: PlayChannel[JsValue]): Actor.Receive = {
        case e: Any => channel.push(JsString(e.toString))
    }

Akka Cluster

  • startup may take several seconds, so testing is time-consuming
  • deployment: use consistent host names, even mixing 127.0.0.1 and localhost may cause dead letters
  • if you work with one node make sure it is a seed node or joins itself to form a cluster
  • scaling up to 2400 nodes
  • heartbeat messages cause network traffic even when nothing happens

I hope your next green field application is event sourced.

Demo
