A single unified programming model for
Now - if we replace
Everything will still hold for both local and distributed Actors
// Message Protocol
/**
 * Immutable greeting message; carries the name of whoever should be greeted.
 */
public static class Greeting implements Serializable {
  /** Name to greet; assigned once in the constructor. */
  public final String who;

  /**
   * @param who the name to greet
   */
  public Greeting(String who) {
    this.who = who;
  }
}
public class GreetingActor extends UntypedActor {
// Put mutable state in here..
public void onReceive(Object message) {
if(message instanceof Greeting)
System.out.println("Hello " + ((Greeting) message).who);
}
}
// Message Protocol
/** Greeting message; `who` is the name to greet. Final so the message type cannot be subclassed. */
final case class Greeting(who: String)
/** Actor that prints a greeting for each `Greeting` message it receives. */
class GreetingActor extends Actor {
  // Put mutable state in here..

  // The behavior is the partial function returned here.
  def receive: Receive = {
    case Greeting(who) =>
      println("Hello " + who)
  }
}
// Create the actor system, named "MySystem".
final ActorSystem system = ActorSystem.create("MySystem");
// Actor configuration: Props built from the GreetingActor class.
final Props greeterProps = new Props(GreetingActor.class);
// Create the actor from that configuration under the name "greeter".
final ActorRef greeter = system.actorOf(greeterProps, "greeter");
Asynchronous and Non-blocking (Fire-and-forget)
Attention: Messages must be immutable!
Java
// Fire-and-forget send of a new Greeting to greeter; ActorRef.noSender() is passed as the sender argument.
greeter.tell(new Greeting("Charlie"), ActorRef.noSender());
Scala
// Sends a Greeting to greeter with the ! operator.
greeter ! Greeting("Charlie")
class SomeActor extends UntypedActor {
public void onReceive(Object msg) {
if(msg instanceof User) {
User user = (User) msg;
// Reply to sender
getSender().tell("Hi " + user.name, getSelf());
}
}
}
/** Actor that answers each `User` message with a "Hi <name>" reply. */
class SomeActor extends Actor {
  def receive: Receive = {
    case User(name) =>
      // The reply goes back to whoever sent the message.
      sender ! s"Hi $name"
  }
}
akka {
  actor {
    # Configure Remote Provider
    provider = akka.remote.RemoteActorRefProvider
    deployment {
      /greeter {
        # Define Remote Path
        # Quoted on purpose: an unquoted HOCON value may not contain ':' or '@',
        # and '//' would start a comment.
        remote = "akka://MySystem@machine1:2552"
      }
    }
  }
}
// Behavior object named happy; its apply method reacts to HappyMessage instances.
Procedure<Object> happy = new Procedure<Object>() {
  @Override
  public void apply(Object msg) {
    if (msg instanceof HappyMessage) {
      HappyMessage happyMsg = (HappyMessage) msg;
      ...
    }
  }
};
/**
 * Switches this actor to the happy behavior when a HappyMessage arrives.
 *
 * @param msg the received message
 */
@Override
public void onReceive(Object msg) {
  if (msg instanceof HappyMessage) {
    // getContext() is a method call in Java, so the parentheses are required.
    getContext().become(happy);
  } else {
    // Hand anything else back to Akka instead of dropping it silently.
    unhandled(msg);
  }
}
// Behavior named `happy`, declared with the Receive type.
// Only the HappyMessage case is shown; its body is elided here.
def happy: Receive = {
case HappyMessage => ...
}
// Initial behavior: on HappyMessage, switch this actor over to the `happy` behavior.
def receive: Receive = {
  case HappyMessage =>
    context.become(happy)
}
Programmatically
// Instance count handed to the round-robin router.
int nrOfInstances = 5;
// Props takes the actor's Class object, hence SomeActor.class.
ActorRef router =
    system.actorOf(new Props(SomeActor.class)
        .withRouter(new RoundRobinRouter(nrOfInstances)));
Configuration
# Deployment settings under akka.actor.deployment.
akka.actor.deployment {
# Settings for the path /myRouter.
/myRouter {
# Router type.
router = round-robin
# Number of instances.
nr-of-instances = 5
}
}
Java
ActorRef router =
system.actorOf(new Props(SomeActor)
.withRouter(new FromConfig(), "myRouter");
// Bounds handed to the resizer.
int lowerBound = 2;
int upperBound = 15;
DefaultResizer resizer = new DefaultResizer(lowerBound, upperBound);
// Round-robin router built from the resizer, then attached to the actor's Props.
RoundRobinRouter resizingRouter = new RoundRobinRouter(resizer);
Props resizingProps = new Props(SomeActor.class).withRouter(resizingRouter);
ActorRef routerWithResizer = system.actorOf(resizingProps);
A resizer can be defined in the configuration file as well
Every Actor has a default supervisor strategy
Strategy can be overridden
/**
 * Actor that overrides supervisorStrategy() to return its own strategy.
 */
public class Supervisor extends UntypedActor {

  // Strategy built with the arguments 10 and "1 minute"; the function below
  // picks a directive based on the type of the Throwable.
  private static SupervisorStrategy strategy =
      new OneForOneStrategy(10, Duration.create("1 minute"),
          new Function<Throwable, Directive>() {
            @Override
            public Directive apply(Throwable t) {
              if (t instanceof ArithmeticException) {
                return resume();
              }
              if (t instanceof NullPointerException) {
                return restart();
              }
              // Every other Throwable is escalated.
              return escalate();
            }
          });

  ...
  ...

  /** @return the strategy defined above */
  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }
}
/** Actor that overrides `supervisorStrategy` with a decider keyed on exception type. */
class Supervisor extends Actor {
  // maxNrOfRetries = 10 within a one-minute time range; the cases below map an
  // exception type to the directive to apply.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException  => Resume
      case _: NullPointerException => Restart
      case _: Exception            => Escalate
    }
}