Michael Schleichardt https://github.com/schleichardt
You create threads to parallelize work.
How does a created thread report an exception to the caller?
If a lock is frequently hit, frequently computing time is wasted to suspend, check and resume the threads.
Wouldn't it be more useful for Threads to do something useful for the application instead of being suspended?
Imagine, you did it right, no deadlocks, performant code, readability.
Can you scale out?
ActorSystem system = ActorSystem.create("Bank");
ActorRef account = system.actorOf(
Props.create(BankAccountActor.class), "account");
ActorRef customer = system.actorOf(Props.create(CustomerActor.class), "you");
account.tell(new Deposit(5000), customer);
account.tell(new Withdraw(3000), customer);
account.tell(new Withdraw(3000), customer);
//no blocking, here may 0 to 3 messages processed by the BankAccountActor
public class BankAccountActor extends UntypedActor {
private final BankAccount account = new BankAccount();
//never use blocking code here, like waiting for future results
public void onReceive(Object message) throws Exception {
if (message instanceof Deposit) {
account.deposit(((Deposit) message).centAmount());
sender().tell(new Ack(), self());
} else if (message instanceof Withdraw) {
final long centAmount = ((Deposit) message).centAmount();
if (centAmount <= account.getBalanceInCents()) {
account.withdraw(centAmount);
sender().tell(new Ack(), self());
} else
sender().tell(new IllegalArgumentException(
"Insufficient balance"), self());A higher throughput, please.
Wouldn't it be great to see not only the current state but previous states and how their are achieved?
For dev, test and prod you can use different databases by changing values in Typesafe Config/Play Configuration
#application.conf
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.contrib.persistence.mongodb.mongo.urls =
["192.168.0.1:27017","192.168.0.2:27017"]
akka.contrib.persistence.mongodb.mongo.journal-write-concern =
"Acknowledged"
#reference.conf in MongoDB plugin
akka-contrib-mongodb-persistence-journal.
class = "akka.contrib.persistence.mongodb.MongoJournal"
//actually it is a Scala class
abstract class UntypedEventsourcedProcessor {
//[...]
//the "new" name for onReceive
public abstract void onReceiveCommand(Object message);
//the "new" name for onReceive at the complete rebuild
public abstract void onReceiveRecover(Object message);
}@Override public void onReceiveCommand(Object message) {
if (message instanceof DepositCommand) //[...]
else if (message instanceof WithdrawCommand) {
final long centAmount = ((DepositCommand) message).getCentAmount();
if (centAmount <= account.getBalanceInCents()) {
final ActorRef sender = sender();
persist(new WithdrawEvent(centAmount), new Procedure<WithdrawEvent>() {
@Override
public void apply(WithdrawEvent event) throws Exception {
updateState(event);
sender.tell(new Ack(), self());
}
});
} else sender().tell(new IllegalArgumentException("No sufficient balance"), self());
} else if (message.equals("snap")) saveSnapshot(account.copy());@Override
public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer)//binary snapshot
account = (BankAccount)((SnapshotOffer)message).snapshot();
else
updateState(message);
}
private BankAccount account = new BankAccount();
private void updateState(Object event) {
if (event instanceof DepositEvent) {
account.deposit(((DepositEvent)event).getCentAmount());
} else if (event instanceof WithdrawEvent) {
account.withdraw(((WithdrawEvent)event).getCentAmount());
}
}What is the difference between this and the previous slide?
@Override
public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer)
amount = (Integer)((SnapshotOffer)message).snapshot();
else
updateState(message);
}
private int amount = 0;
private void updateState(Object event) {
if (event instanceof DepositEvent) {
amount += ((DepositEvent)event).getCentAmount();
} else if (event instanceof WithdrawEvent) {
amount -= ((WithdrawEvent)event).getCentAmount();
}
}In Event Sourcing it is hard to change events, but easy to change the in-memory model.
class PastView extends UntypedView {
@Override
public String processorId() {
return "some-processor-id";
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) {
// set state, persistent message contains event as payload
}
}
//since nethod returns 5 it only replays to events sequence number 5
@Override
public long autoUpdateReplayMax(){
return 5;//TODO as constructor parameter
}And how do I do it on multiple servers for HA and updates?
The sequence number of the persisted events is generated inside the EventSourcedProcessor, not in the database (see Persistence.scala in Akka code base).
So in one cluster one Processor can only exist once.
With Akka Cluster Singleton you can deploy it on one machine, but they all need to fit into the RAM.
So what do I do if the they won't fit all in the RAM?
//setup
ClusterSharding.get(system).start("accounts",
Props.create(BankAccountActor.class), messageExtractor);
public interface MessageExtractor {
String entryId(Object message);//bankAccountId
Object entryMessage(Object message);//payload, new DepositCommand(500)
String shardId(Object message);//bankAccount.getId().hashCode % 5013
};
//send message
ActorRef region = ClusterSharding.get(system).shardRegion("accounts");
region.tell(new Evelope("bankAccountId", new DepositCommand(500)), getSelf());from http://doc.akka.io/docs/akka/2.3.2/contrib/distributed-pub-sub.html
public class Publisher extends UntypedActor {
// activate the extension
ActorRef mediator =
DistributedPubSubExtension.get(getContext().system()).mediator();
public void onReceive(Object msg) {
if (msg instanceof String) {
String in = (String) msg;
String out = in.toUpperCase();
String topic = "content";
mediator.tell(new DistributedPubSubMediator.Publish(topic, out),
getSelf());
} else {
unhandled(msg);
}
}
}from http://doc.akka.io/docs/akka/2.3.2/contrib/distributed-pub-sub.html
public class Subscriber extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public Subscriber() {
ActorRef mediator =
DistributedPubSubExtension.get(getContext().system()).mediator();
// subscribe to the topic named "content"
mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()),
getSelf());
}
public void onReceive(Object msg) {
if (msg instanceof String)
log.info("Got: {}", msg);
else if (msg instanceof DistributedPubSubMediator.SubscribeAck)
log.info("subscribing");
else
unhandled(msg);
}
}use WebSockets to publish events to the browser
http://www.playframework.com/documentation/2.3.x/JavaWebSockets
import akka.actor.*;
public class MyWebSocketActor extends UntypedActor {
public static Props props(ActorRef out) {
return Props.create(MyWebSocketActor.class, out);
}
private final ActorRef out;
public MyWebSocketActor(ActorRef out) {
this.out = out;
}
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
out.tell("I received your message: " + message, self());
}
}
}use Server-Sent Events to publish events to the browser
def onStart(channel: PlayChannel[JsValue]) {
actorRef ! SubscriberActor.SetChannel(channel)
}
val enumerator: Enumerator[JsValue] = Concurrent.unicast(onStart,
actorRef ! PoisonPill, (_, _) => actorRef ! PoisonPill)
Ok.feed(enumerator &> EventSource()).as("text/event-stream")
class SubscriberActor(topic: String) extends Actor {
def ready(channel: PlayChannel[JsValue]): Actor.Receive = {
case e: Any => channel.push(JsString(e.toString))
}I hope your next green field application is event sourced.