Akka documentation contains a basic example of a simple WebSocket server, but it just sends back a message it has received. Building on top of it to have an Actor handling user messages and sending data back to the user is not obvious.
Let’s build a simple WebSocket server with a /clock
endpoint. After connecting, a client can subscribe to receive time in the requested timezones. It is not a very useful application, but the concept is the same if you would like to build a subscription for chat, real-time game, or stock price updates.
We will be using Akka Streams with Akka HTTP. I will assume, at the very least, you read an introduction for each of these.
First, we need an Actor that will hold a state of the individual connection: a list of timezones the client has subscribed to receive. Additionally, it will have a Timer set to send an update to the user every second. We will call it Clock
.
Possible commands that we expect are:
import akka.http.scaladsl.model.ws.Message
object Clock {
sealed trait Command
object Command {
case class UserRequest(msg: Message) extends Command
case class Connection(actorRef: ActorRef[Clock.Outgoing]) extends Command
case class ConnectionFailure(ex: Throwable) extends Command
case object Complete extends Command
case object Tick extends Command
}
}
-
UserRequest
- Raw message from client is represented usingakka.http.scaladsl.model.ws.Message
in Akka HTTP. We will wrap it in this case class before passing it to Clock Actor. -
Connection
- We will have 2 Actors that will be sending messages to each other. The first is Clock Actor, 2nd Actor is on the other end; it is the user connection. We will provideActorRef
of that connection Actor in this Command. -
ConnectionFailure
- If there is some problem in theFlow
and it emits an exception, we will receive it wrapped in this Command. -
Complete
- We will receive this Command when the client closes the connection. -
Tick
- this a Command sent by our timer every second. We will send a message to the user on each Tick.
Connection Actor will handle the following outgoing messages:
import akka.http.scaladsl.model.ws.Message
object Clock {
sealed trait Command
/* ... */
sealed trait Outgoing
object Outgoing {
case class MessageToClient(msg: Message) extends Outgoing
case object Completed extends Outgoing
case class Failure(ex: Exception) extends Outgoing
}
}
MessageToClient
is a message that we send to the client.Completed
We can send this message to terminate the streamFailure
this message will close the stream with an exception.
Technically speaking, we will not define the Behavior
of the connection Actor in a typical way. Instead, we will create it using ActorSource.actorRef
. That is why I keep the Outgoing
type inside the Clock
Actor.
We have incoming and outgoing message types defined. Let’s define behaviour.
We will start our Clock
in a pending state, awaiting for Connection
message. In this state, we may receive Complete
and ConnectionFailure
messages too, for which we will stop the Actor. For any other message, we do nothing. When we receive the Connection
command, we will start a timer sending a Tick
every second and update Actor’s behaviour to the connected
state.
import java.time.{ ZoneId, ZonedDateTime }
import java.util.concurrent.TimeUnit
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, Behavior }
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success, Try }
object Clock {
sealed trait Command
/* ... */
sealed trait Outgoing
/* ... */
def pending(): Behavior[Command] = Behaviors.receive { (ctx, msg) =>
msg match {
case Command.Connection(ref) =>
Behaviors.withTimers { timers =>
timers.startTimerAtFixedRate(Command.Tick, Duration.apply(1, TimeUnit.SECONDS))
connected(ref, Seq.empty)
}
case Command.ConnectionFailure(ex) =>
ctx.log.warn("WebSocket failed", ex)
Behaviors.stopped
case Command.Complete =>
ctx.log.info("User closed connection")
Behaviors.stopped
case _ => Behaviors.same
}
}
def connected(actorRef: ActorRef[Clock.Outgoing], timeZones: Seq[ZoneId]): Behavior[Command] =
Behaviors.receive { (ctx, msg) =>
msg match {
case Command.Connection(_) => // shouldn't happen at this point
Behaviors.same
case Command.UserRequest(TextMessage.Strict(txt)) =>
Try {
ZoneId.of(txt)
} match {
case Success(tz) =>
actorRef ! Outgoing.MessageToClient(TextMessage(s"Subscribed to $tz"))
connected(actorRef, timeZones :+ tz)
case Failure(ex) =>
actorRef ! Outgoing.MessageToClient(TextMessage(ex.getMessage))
Behaviors.same
}
case Command.UserRequest(uk) =>
actorRef ! Outgoing.MessageToClient(TextMessage(s"Received unknown: $uk"))
Behaviors.same
case Command.Tick =>
actorRef ! Outgoing.MessageToClient(
TextMessage(timeZones.map(tz => tz.toString -> ZonedDateTime.now(tz)).toString())
)
Behaviors.same
case Command.ConnectionFailure(ex) =>
ctx.log.warn("WebSocket failed", ex)
Behaviors.stopped
case Command.Complete =>
ctx.log.info("User closed connection")
Behaviors.stopped
}
}
}
In this state, we need to handle all Commands
. I think most of the code is self-explanatory. Notice the actorRef ! Outgoing.MessageToClient
calls. This is how we send a message back to the client.
With Clock Actor defined, let’s build the Flow
. Akka HTTP WebSocket handler requires Flow[Message, Message, Any]
.
That means we need a Sink[Message, NotUsed]
and Source[Message, Unit]
Here is the incoming sink, taking Message
from a client, converting it to Clock.Command.UserRequest
and forwarding it to Clock actor.
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.typed.scaladsl.{ ActorSink, ActorSource }
object ClockWebSocketFlow {
def apply(clockActor: ActorRef[Clock.Command]): Flow[Message, Message, NotUsed] = {
val incoming: Sink[Message, NotUsed] = Flow[Message]
.map(Clock.Command.UserRequest)
.to(
ActorSink.actorRef[Clock.Command](
clockActor,
onCompleteMessage = Clock.Command.Complete,
onFailureMessage = { case ex => Clock.Command.ConnectionFailure(ex) }
)
)
}
}
ActorSink.actorRef
requires:
ref
- ActorRef
of the Actor to be used.
onCompleteMessage
- a message sent to the Actor when the stream completes.
onFailureMessage
- a function from a Throwable
to a type that the target Actor can accept. Sent when the stram throws an Exception
.
object ClockWebSocketFlow {
def apply(clockActor: ActorRef[Clock.Command]): Flow[Message, Message, NotUsed] = {
val incoming: Sink[Message, NotUsed] = ???
val outgoing: Source[Message, Unit] = ActorSource
.actorRef[Clock.Outgoing](
completionMatcher = { case Clock.Outgoing.Completed => },
failureMatcher = { case Clock.Outgoing.Failure(ex) => ex },
bufferSize = 10,
OverflowStrategy.dropHead
)
.mapMaterializedValue(client => clockActor ! Clock.Command.Connection(client))
.map {
case Clock.Outgoing.MessageToClient(msg) => msg
// These are already handled by completionMatcher and failureMatcher so should never happen
// added them just to silence exhaustiveness warning
case Clock.Outgoing.Completed | Clock.Outgoing.Failure(_) => TextMessage.Strict("")
}
}
}
Next, we define the outgoing Source. That is the Connection Actor to which we will be sending Clock.Outgoing
messages. The most important part is the mapMaterializedValue
call. This is how we will pass ActorRef
of Connection Actor to Clock Actor.
Next step is to build the flow:
object ClockWebSocketFlow {
def apply(clockActor: ActorRef[Clock.Command]): Flow[Message, Message, NotUsed] = {
val incoming: Sink[Message, NotUsed] = ???
val outgoing: Source[Message, Unit] = ???
Flow.fromSinkAndSource(incoming, outgoing)
}
}
All that is left is creating the HTTP client to handle our Flow which we will take care of in RootActor
. Our real root/top-level actor will handle SpawnProtocol.Command
, allowing us to create actors outside of the ActorSystem context. We can’t access actor context within Akka HTTP route definition and use it to spawn actors.
import java.util.concurrent.TimeUnit
import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props, SpawnProtocol }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{ handleWebSocketMessages, path }
import akka.http.scaladsl.server.Route
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
object RootActor {
def apply(system: ActorSystem[SpawnProtocol.Command]): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
implicit val timeout = Timeout(3, TimeUnit.SECONDS)
implicit val scheduler = system.scheduler
val websocketRoute =
path("clock") {
val clientF = system.ask[ActorRef[Clock.Command]] { ref =>
Spawn[Clock.Command](
Clock.pending(),
"client",
props = Props.empty,
replyTo = ref
)
}
val client = Await.result(clientF, Duration.Inf)
handleWebSocketMessages(
ClockWebSocketFlow(client)
)
}
startHttpServer(websocketRoute)(system)
Behaviors.empty
}
private def startHttpServer(routes: Route)(implicit system: ActorSystem[_]): Unit = {
import system.executionContext
val futureBinding = Http().newServerAt("localhost", 8080).bind(routes)
futureBinding.onComplete {
case Success(binding) =>
val address = binding.localAddress
system.log.info("Server online at http://{}:{}/", address.getHostString, address.getPort)
case Failure(ex) =>
system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
system.terminate()
}
}
}
The important part here is the implementation of path("clock")
We use the Ask pattern with SpawnProtocol
to create an instance of ClockActor
. We have to Await
for it and use the result to create a Flow that will handle the WebSocket connection.
The last step is the Main
class.
import java.util.concurrent.TimeUnit
import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed._
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.util.Timeout
object Main extends App {
val system = ActorSystem[SpawnProtocol.Command](SpawnProtocol(), "WebSocketServer")
implicit val timeout = Timeout(3, TimeUnit.SECONDS)
implicit val scheduler = system.scheduler
system.ask[ActorRef[Nothing]](ref => Spawn[Nothing](RootActor(system), "root", Props.empty, ref))
}
We create ActorSystem and use SpawnProtocol
to create a RootActor
.
That is it. You can now run it and use the WebSocket client to test it.
References:
- Full code on GitHub
- Akka Streams Introduction
- Akka HTTP Introduction
- Akka HTTP WebSocket Support
- Chat With Akka HTTP Websockets
- Building a Reactive, Distributed Messaging Server in Scala and Akka with WebSockets