基于Akka的REST框架Spray,由于采用Akka提供的Actor模型,写出来的代码与通常的REST有很大的区别。从Spray-Can接收Http请求,到处理具体的HTTP动词以实现业务逻辑,都是通过传递消息的方式。这些消息再交由Akka Actor接收处理。消息皆定义为Scala提供的样例类(Case Class),从而保证消息为immutable。既然如此,当我们在运用Spray时,就需要转换思想,从传统的面向对象中解放出来,充分理解Event、Command,及其传递的Message。这近似于事件驱动(Event Driven),因而在对领域建模时,也需要将Event看做是领域模型的一等公民,并将领域逻辑建模为一种状态机。
我们可以首先根据Http请求,确定需要哪些消息。这样的Request消息几乎与Http动词以及Resource对应,例如:
sealed trait RequestMessage
case class GetCustomer(id: Long) extends RequestMessage
case class DeleteCustomer(id: Long) extends RequestMessage
case class UpdateCustomer(customer: Customer) extends RequestMessage
case class CreateCustomer(dueDate: Date, text: String) extends RequestMessage
|
现在可以定义一个Actor来响应客户端请求。该Actor要求派生自Akka Actor,同时还要实现Spray提供的HttpService trait。若要支持Json格式,还需实现Spray-Json提供的Json4sSupport。例如:
class CustomerServiceActor extends Actor with HttpService with CustomerRequestCreator with Json4sSupport {
implicit def actorRefFactory = context
val json4sFormats = DefaultFormats
def receive = runRoute(customerRoute)
val customerRoute =
path("customers" / LongNumber) {
id: Long =>
get {
rejectEmptyResponse {
handleRequest {
GetCustomer(id)
}
}
} ~ put {
entity(as[Customer]) {
customer =>
handleRequest {
UpdateCustomer(new Customer(id, customer.birthDate, customer.name))
}
}
} ~ delete {
handleRequest {
DeleteCustomer(id)
}
}
} ~ path("customers") {
get {
handleRequest {
AllCustomers
}
}
} ~ post {
entity(as[Customer]) {
customer =>
handleRequest {
CreateCustomer(customer.birthDate, customer.name)
}
}
}
def handleRequest(message: RequestMessage): Route =
ctx => customerRequest(ctx, Props[CustomerActor], message)
}
|
该Actor与其他Akka Actor的不同之处在于它的receive方法调用了Spray提供的runRoute()方法。传入的参数customerRoute是Spray提供的DSL格式的Route。Route中对应支持Http动词。其中,get先调用了Spray提供的rejectEmptyResponse来过滤掉空的响应消息;而post方法则调用entity将url中的消息转换为Customer消息:
case class Customer(id: Long, birthDate: Date, name: String)
|
在Route中,可以定义多个Path,不同的Path支持不同的Http动词。在接受到请求后,通过handleRequest()方法来处理请求。这里的实现是将RequestMessage消息再转交到了另一个Actor。我会在后面介绍。
不过,这里的CustomerServiceActor事实上是将支持Route的CustomerService与Actor合并在了一起,职责显得不够单一。因此,更好地做法是为CustomerService单独定义trait,并使其派生自HttpService,从而将实现代码从Actor中分开。分开的这个CustomerService,更类似于一个Controller。例如:
class CustomerServiceActor extends Actor with CustomerService with CustomerRequestCreator {
implicit def actorRefFactory = context
def receive = runRoute(customerRoute)
def handleRequest(message: RequestMessage): Route =
ctx => customerRequest(ctx, Props[CustomerActor], message)
}
trait CustomerService extends HttpService with Json4sSupport {
val json4sFormats = DefaultFormats
val customerRoute =
path("customers" / LongNumber) {
id: Long =>
get {
rejectEmptyResponse {
handleRequest {
GetCustomer(id)
}
}
} ~ put {
entity(as[Customer]) {
customer =>
handleRequest {
UpdateCustomer(new Customer(id, customer.birthDate, customer.name))
}
}
} ~ delete {
handleRequest {
DeleteCustomer(id)
}
}
} ~ path("customers") {
get {
handleRequest {
AllCustomers
}
}
} ~ post {
entity(as[Customer]) {
customer =>
handleRequest {
CreateCustomer(customer.birthDate, customer.name)
}
}
}
def handleRequest(message: RequestMessage): Route
}
|
这样分离的好处还在于可以定义多个包含不同Route的Controller,然后在Actor中用~ combinator操作符将它们合并起来。例如:
def receive = handleTimeouts orElse runRoute(
new CustomerService1.customerService1 ~ new CustomerService2.customerService2)
|
Spray使用了Akka IO来支持Request、Reponse流的输入输出。IO实质上也是一个Akka Actor。所以在程序的主入口,既需要创建提供了REST服务的Actor,又需要通过Akka IO发出绑定Http的消息。与Akka一样,我们可以定义一个类派生自App:
object Boot extends App {
implicit val system = ActorSystem("spray-demo")
val service = system.actorOf(Props[CustomerServiceActor], "customer-service")
IO(Http) ! Http.Bind(service, interface = "localhost", port = 8080)
}
|
要启动ActorSystem,需要配置Akka,而要绑定Spray服务容器,也需要配置Spray-Can。一个简单的配置文件application.conf为:
akka {
loglevel = INFO
}
spray.can.server {
request-timeout = 1s
}
|
现在再来看CustomerService中的handlRequest()方法的实现。该方法负责将对应的RequestMessage,例如GetCustomer、UpdateCustomer等转交给其下的Actor,即CustomerActor。它相当于是从RequestMessage到Domain Event的一个协调器(或控制器,也可以看做是Command)。同样,还是将Actor与具体的业务分离,因此定义了CustomerActor与CustomerOperation。
trait CustomerOperations {
def getById(id: Long) = {
OneCustomer(new Customer(id, new Date(1000), "item1"))
}
def all() = {
try{
ListCustomers(List(new Customer(100, new Date(1000), "item1")))
} catch{
case e:Exception => {
println(e.getMessage())
List()
}
}
}
def delete(id: Long) = {
Success("deleted successfully")
}
def create (dueDate: Date, text: String) = {
Created("")
}
def update (customer: Customer) = {
getById(customer.id)
}
}
class CustomerActor extends Actor with CustomerOperations{
val log = Logging(context.system, this)
def receive = {
case GetCustomer(id) => sender ! getById(id)
case UpdateCustomer(item) => sender ! update(item)
case DeleteCustomer(id) => sender ! delete(id)
case CreateCustomer(dueDate, text) => sender ! create(dueDate, text)
case AllCustomers => sender ! all()
}
}
|
CustomerActor会作为CustomerRequest(同样是一个Actor)内部的Actor,即CustomerRequest中的target。CustomerActor实例是在CustomerRequest伴生对象中创建的:
trait CustomerRequest extends Actor with Json4sSupport {
def requestContext: RequestContext
def target: ActorRef
def message: RequestMessage
import context._
target ! message
def receive = {
case Created(location) => complete(spray.http.StatusCodes.Created, location)
case OneCustomer(customer) => complete(OK, customer)
case ListCustomers(customers) => complete(OK, customers)
case Success(message) => complete(OK, message)
case Error(message) => complete(BadRequest, message)
case ReceiveTimeout => complete(GatewayTimeout, "Request Timeout")
}
def complete[T <: AnyRef](status: StatusCode, obj: T) = {
requestContext.complete(status, obj)
stop(self)
}
override val supervisorStrategy =
OneForOneStrategy() {
case e => {
complete(InternalServerError, Error(e.getMessage))
Stop
}
}
}
object CustomerRequest {
case class WithProps(requestContext: RequestContext, props: Props, message: RequestMessage) extends CustomerRequest {
lazy val target = context.actorOf(props)
implicit def json4sFormats = DefaultFormats
}
}
|
由于target在CustomerRequest中为lazy变量,所以只有在需要的时候才会创建CustomerActor。CustomerRequest定义了工厂:
trait CustomerRequestCreator {
this: Actor =>
def customerRequest(requestContext: RequestContext, props: Props, message: RequestMessage) =
context.actorOf(Props(new WithProps(requestContext, props, message)))
}
|
而在CustomerRequest的定义中,会首先通过target发送message,这个message即CustomerService的Route中传递过来的RequestMessage。此时的target是CustomerActor,所以CustomerActor会接收到这些RequestMessage,然后调用CustomerOperation的相关操作,并由CustomerActor的sender发送消息。之后CustomerRequest会收到这些消息。
这种Actor的模型不同于传统的编程模型,但遵循的设计原则是一脉相承的,同样需要遵循单一职责原则。在编写Spray的代码时,需要事先分析清楚消息的传递路径,并分辨出承担这些消息传递与消息处理的Actor。同时,还应该尽量保证Actor与REST服务及操作分离,以保证REST服务与Actor的单一性。
示例代码在github上可以下载。