逸言

基于Akka的REST框架Spray

| Comments

基于Akka的REST框架Spray,由于采用Akka提供的Actor模型,写出来的代码与通常的REST有很大的区别。从Spray-Can接收Http请求,到处理具体的HTTP动词以实现业务逻辑,都是通过传递消息的方式。这些消息再交由Akka Actor接收处理。消息皆定义为Scala提供的样例类(Case Class),从而保证消息为immutable。既然如此,当我们在运用Spray时,就需要转换思想,从传统的面向对象中解放出来,充分理解Event、Command,及其传递的Message。这近似于事件驱动(Event Driven),因而在对领域建模时,也需要将Event看做是领域模型的一等公民,并将领域逻辑建模为一种状态机。

我们可以首先根据Http请求,确定需要哪些消息。这样的Request消息几乎与Http动词以及Resource对应,例如:

sealed trait RequestMessage

case class GetCustomer(id: Long) extends RequestMessage
case class DeleteCustomer(id: Long) extends RequestMessage
case class UpdateCustomer(customer: Customer) extends RequestMessage
case class CreateCustomer(dueDate: Date, text: String) extends RequestMessage

现在可以定义一个Actor来响应客户端请求。该Actor要求派生自Akka Actor,同时还要实现Spray提供的HttpService trait。若要支持Json格式,还需实现Spray-Json提供的Json4sSupport。例如:

class CustomerServiceActor extends Actor with HttpService with CustomerRequestCreator with Json4sSupport {
  implicit def actorRefFactory = context

  val json4sFormats = DefaultFormats

  def receive = runRoute(customerRoute)

  val customerRoute =
    path("customers" / LongNumber) {
      id: Long =>
        get {
          rejectEmptyResponse {
            handleRequest {
              GetCustomer(id)
            }
          }
        } ~ put {
          entity(as[Customer]) {
            customer =>
              handleRequest {
                UpdateCustomer(new Customer(id, customer.birthDate, customer.name))
              }
          }
        } ~ delete {
          handleRequest {
            DeleteCustomer(id)
          }
        }
    } ~ path("customers") {
      get {
        handleRequest {
          AllCustomers
        }
      }
    } ~ post {
      entity(as[Customer]) {
        customer =>
          handleRequest {
            CreateCustomer(customer.birthDate, customer.name)
          }
      }
    }

  def handleRequest(message: RequestMessage): Route =
    ctx => customerRequest(ctx, Props[CustomerActor], message)
}

该Actor与其他Akka Actor的不同之处在于它的receive方法调用了Spray提供的runRoute()方法。传入的参数customerRoute是Spray提供的DSL格式的Route。Route中对应支持Http动词。其中,get先调用了Spray提供的rejectEmptyResponse来过滤掉空的响应消息;而post方法则调用entity将url中的消息转换为Customer消息:

case class Customer(id: Long, birthDate: Date, name: String)

在Route中,可以定义多个Path,不同的Path支持不同的Http动词。在接受到请求后,通过handleRequest()方法来处理请求。这里的实现是将RequestMessage消息再转交到了另一个Actor。我会在后面介绍。

不过,这里的CustomerServiceActor事实上是将支持Route的CustomerService与Actor合并在了一起,职责显得不够单一。因此,更好地做法是为CustomerService单独定义trait,并使其派生自HttpService,从而将实现代码从Actor中分开。分开的这个CustomerService,更类似于一个Controller。例如:

class CustomerServiceActor extends Actor with CustomerService with CustomerRequestCreator {
  implicit def actorRefFactory = context

  def receive = runRoute(customerRoute)

  def handleRequest(message: RequestMessage): Route =
    ctx => customerRequest(ctx, Props[CustomerActor], message)
}

trait CustomerService extends HttpService with Json4sSupport {
  val json4sFormats = DefaultFormats

  val customerRoute =
    path("customers" / LongNumber) {
      id: Long =>
        get {
          rejectEmptyResponse {
            handleRequest {
              GetCustomer(id)
            }
          }
        } ~ put {
          entity(as[Customer]) {
            customer =>
              handleRequest {
                UpdateCustomer(new Customer(id, customer.birthDate, customer.name))
              }
          }
        } ~ delete {
          handleRequest {
            DeleteCustomer(id)
          }
        }
    } ~ path("customers") {
      get {
        handleRequest {
          AllCustomers
        }
      }
    } ~ post {
      entity(as[Customer]) {
        customer =>
          handleRequest {
            CreateCustomer(customer.birthDate, customer.name)
          }
      }
    }

  def handleRequest(message: RequestMessage): Route
}

这样分离的好处还在于可以定义多个包含不同Route的Controller,然后在Actor中用~ combinator操作符将它们合并起来。例如:

def receive = handleTimeouts orElse runRoute(
  new CustomerService1.customerService1 ~  new CustomerService2.customerService2)

Spray使用了Akka IO来支持Request、Reponse流的输入输出。IO实质上也是一个Akka Actor。所以在程序的主入口,既需要创建提供了REST服务的Actor,又需要通过Akka IO发出绑定Http的消息。与Akka一样,我们可以定义一个类派生自App:

object Boot extends App {
  implicit val system = ActorSystem("spray-demo")
  val service = system.actorOf(Props[CustomerServiceActor], "customer-service")
  IO(Http) ! Http.Bind(service, interface = "localhost", port = 8080)
}

要启动ActorSystem,需要配置Akka,而要绑定Spray服务容器,也需要配置Spray-Can。一个简单的配置文件application.conf为:

akka {
  loglevel = INFO
}

spray.can.server {
  request-timeout = 1s
}

现在再来看CustomerService中的handlRequest()方法的实现。该方法负责将对应的RequestMessage,例如GetCustomer、UpdateCustomer等转交给其下的Actor,即CustomerActor。它相当于是从RequestMessage到Domain Event的一个协调器(或控制器,也可以看做是Command)。同样,还是将Actor与具体的业务分离,因此定义了CustomerActor与CustomerOperation。

trait CustomerOperations {
  def getById(id: Long) = {
    OneCustomer(new Customer(id, new Date(1000), "item1"))
  }

  def all() =  {
    try{
      ListCustomers(List(new Customer(100, new Date(1000), "item1")))
    } catch{
      case e:Exception => {
        println(e.getMessage())
        List()
      }
    }
  }

  def delete(id: Long) = {
    Success("deleted successfully")
  }

  def create (dueDate: Date, text: String) =  {

    Created("")
  }

  def update (customer: Customer) = {
    getById(customer.id)
  }
}

class CustomerActor extends Actor with CustomerOperations{
  val log = Logging(context.system, this)
  def receive = {
    case GetCustomer(id) => sender ! getById(id)
    case UpdateCustomer(item) => sender ! update(item)
    case DeleteCustomer(id) => sender ! delete(id)
    case CreateCustomer(dueDate, text) => sender ! create(dueDate, text)
    case AllCustomers => sender ! all()
  }
}

CustomerActor会作为CustomerRequest(同样是一个Actor)内部的Actor,即CustomerRequest中的target。CustomerActor实例是在CustomerRequest伴生对象中创建的:

trait CustomerRequest extends Actor with Json4sSupport {
  def requestContext: RequestContext

  def target: ActorRef

  def message: RequestMessage

  import context._

  target ! message

  def receive = {
    case Created(location) => complete(spray.http.StatusCodes.Created, location)
    case OneCustomer(customer) => complete(OK, customer)
    case ListCustomers(customers) => complete(OK, customers)
    case Success(message) => complete(OK, message)
    case Error(message) => complete(BadRequest, message)
    case ReceiveTimeout => complete(GatewayTimeout, "Request Timeout")
  }

  def complete[T <: AnyRef](status: StatusCode, obj: T) = {
    requestContext.complete(status, obj)
    stop(self)
  }

  override val supervisorStrategy =
    OneForOneStrategy() {
      case e => {
        complete(InternalServerError, Error(e.getMessage))
        Stop
      }
    }
}

object CustomerRequest {

  case class WithProps(requestContext: RequestContext, props: Props, message: RequestMessage) extends CustomerRequest {
    lazy val target = context.actorOf(props)

    implicit def json4sFormats = DefaultFormats
  }

}

由于target在CustomerRequest中为lazy变量,所以只有在需要的时候才会创建CustomerActor。CustomerRequest定义了工厂:

trait CustomerRequestCreator {
  this: Actor =>

  def customerRequest(requestContext: RequestContext, props: Props, message: RequestMessage) =
    context.actorOf(Props(new WithProps(requestContext, props, message)))
}

而在CustomerRequest的定义中,会首先通过target发送message,这个message即CustomerService的Route中传递过来的RequestMessage。此时的target是CustomerActor,所以CustomerActor会接收到这些RequestMessage,然后调用CustomerOperation的相关操作,并由CustomerActor的sender发送消息。之后CustomerRequest会收到这些消息。

这种Actor的模型不同于传统的编程模型,但遵循的设计原则是一脉相承的,同样需要遵循单一职责原则。在编写Spray的代码时,需要事先分析清楚消息的传递路径,并分辨出承担这些消息传递与消息处理的Actor。同时,还应该尽量保证Actor与REST服务及操作分离,以保证REST服务与Actor的单一性。

示例代码在github上可以下载

Comments