不知道大家有没有发现,Tower Service trait 中,要求 ready + call
都接收一个 &mut self
,这也就意味着,我们无法将其很好地扩展到多线程上!
你可能会觉得,针对每一个 service,调用一下 service.clone()
,再传入不就好了吗?但假如一个 Service 包装了很多中间件,clone 的开销也是不可小觑的。
Tower 提供了这样一个中间件:将一个 multiproducer, single consumer channel
和某个服务绑定,客户端可以通过任意 Clone 得到的 Buffer Service 句柄发送请求,由 single consumer
,也就是 Worker 依次处理请求,为我们探测内层 Service 的状态,等到,同时将 Response 通过某种方式发送回请求者。
1. channel + semaphore 模型 进一步分析 Buffer 的定义:需要一个 mspc channel 来存放所有的请求,有多个 service 句柄可以发送请求,单个 worker 处理请求,但同时我们又希望对于该 channel 容量加以控制,更专业的术语叫做 backpressure
,即需要根据有限的服务处理能力 限制请求数量。 不难想到很多异步 runtime 都提供的 bounded mspc channel 非常适合描述这种模型,并且 tokio 等 runtime 都已经提供了完善的实现,可不幸的是,Tokio bounded mspc channel
并没有提供 polling-based 的接口,因此,我们不得不自己造一个 bounded mspc channel 的轮子。
幸运的是,我们也不是从零起步,完全可以借助已有的一些设施:unbounded channel + semaphore
Semaphore 这个之前已经在 ConcurrencyLimit 中使用过的数据结构,没错,又是将服务处理能力抽象为 Semaphore 的计数,每次需要向 Semaphore 获取许可 permit,才能接着往下进行,并且当请求处理结束之后,需要将许可返还给 Semaphore,这样才能循环利用。 有了 Semaphore,就可以对 unbounded channel 容量做出限制,正好符合我们的需求。
因此 Buffer 定义了下面的结构:
tx 是 mpsc channel 的发送端
semaphore 是所有 service 句柄共享的信号量
permit 是当前 service 获取的服务调用许可
handle 是对于服务内部错误的处理
#[derive(Debug)] pub struct Buffer <T, Request>where T: Service<Request>, { tx: mpsc::UnboundedSender<Message<Request, T::Future>>, semaphore: PollSemaphore, permit: Option <OwnedSemaphorePermit>, handle: Handle, }#[derive(Debug)] pub (crate ) struct Handle { inner: Arc<Mutex<Option <ServiceError>>>, }
Buffer 保存了 channel 的发送端,每一个 clone 出来的 buffer,都可以发送请求,与此同时还需要有一个对应的接收端,从 channel 中不断取出请求,这样才能驱动 producer-consumer
模型运转,不难想象我们其实在 Buffer::new 中处理了这件事
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 impl <T, Request> Buffer<T, Request>where T: Service<Request>, T::Error: Into <crate::BoxError>, { pub fn new (service: T, bound: usize ) -> Self where T: Send + 'static , T::Future: Send , T::Error: Send + Sync , Request: Send + 'static , { let (service, worker) = Self::pair(service, bound); tokio::spawn(worker); service } pub fn pair (service: T, bound: usize ) -> (Buffer<T, Request>, Worker<T, Request>) where T: Send + 'static , T::Error: Send + Sync , Request: Send + 'static , { let (tx, rx) = mpsc::unbounded_channel(); let semaphore = Arc::new(Semaphore::new(bound)); let (handle, worker) = Worker::new(service, rx, &semaphore); let buffer = Buffer { tx, handle, semaphore: PollSemaphore::new(semaphore), permit: None , }; (buffer, worker) } fn get_worker_error (&self ) -> crate::BoxError { self .handle.get_error_on_closed() } }
2. 总结 在 Buffer 这个中间件中,我们不仅学习到 mspc 的请求处理模式还需要注意一个坑
如果我们定义了下面的中间件,简单地将内部的服务包装一层,返回一个 Box::pin 类型的 Future,看上去一切 Ok,但是,当内层服务包含了 Buffer 的时候,会出现一个巨大的问题!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 impl <S> Service<hyper::Request<Body>> for MyMiddleware<S>where S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static , S::Future: Send + 'static , { type Response = S::Response; type Error = S::Error; type Future = futures::future::BoxFuture<'static , Result <Self::Response, Self::Error>>; fn poll_ready (&mut self , cx: &mut Context<'_ >) -> Poll<Result <(), Self::Error>> { self .inner.poll_ready(cx) } fn call (&mut self , req: hyper::Request<Body>) -> Self::Future { let inner = self .inner.clone(); Box ::pin(async move { let response = inner.call(req).await ?; Ok (response) }) } }
具体什么问题呢?经过试验,出现了这个错误信息:panicked at 'buffer full; poll_ready must be called first'
what?我们的调用方式明明是 svc.ready().await.unwrap().call(req).await;
,按道理已经调用过 poll_ready()
了 进一步查看 Buffer 的代码中,是这样确保 poll_ready 和 call 之间的调用次序的:在 poll_ready 中获取 permit,在 call 中取出 permit,如果 call 中取出的 permit 为 None,说明错误调用了 poll_ready 和call,问题的关键在于 permit 。
1 2 3 4 5 6 7 8 9 let permit = ready!(self .semaphore.poll_acquire(cx)).ok_or_else(|| self .get_worker_error())?;let _permit = self .permit .take() .expect("buffer full; poll_ready must be called first" );
定位到这一点,接下来的问题就很好找了,在 MyMiddleware::call 中,实际上 Clone 得到的服务,里面已经没有 permit 了,或者说,每当一个 Buffer Clone 出去,该 clone 的服务都无 permit,这一点通过源码也很容易定位,其注释也强调了这一点:Clone 得到的 service,必须要 poll_ready 返回 Ready 才能调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 impl <T, Request> Clone for Buffer<T, Request>where T: Service<Request>, { fn clone (&self ) -> Self { Self { tx: self .tx.clone(), handle: self .handle.clone(), semaphore: self .semaphore.clone(), permit: None , } } }
我们解决的办法很简单,因为只有已经 ready 的服务才能调用,所以,我们 Clone 得到一个未 ready 的服务,然后将其和当前服务置换,这样闭包捕获的 inner 就是已经获取了 Semaphore permit 的服务了,再调用就不会有问题。
1 2 3 4 5 6 7 8 9 10 11 fn call (&mut self , req: hyper::Request<Body>) -> Self::Future { let clone = self .inner.clone(); let mut inner = std::mem::replace(&mut self .inner, clone); Box ::pin(async move { let response = inner.call(req).await ?; Ok (response) }) }