|
7 | 7 | // modified, or distributed except according to those terms. |
8 | 8 |
|
9 | 9 | use std::{ |
10 | | - future::Future, |
11 | | - pin::Pin, |
| 10 | + future::poll_fn, |
| 11 | + sync::atomic, |
12 | 12 | task::{Context, Poll}, |
13 | 13 | }; |
14 | 14 |
|
15 | | -use futures_core::ready; |
16 | | -use tokio::sync::mpsc::UnboundedSender; |
17 | | - |
18 | 15 | use crate::{ |
19 | | - conn::pool::{Inner, Pool, QUEUE_END_ID}, |
| 16 | + conn::pool::{Pool, QUEUE_END_ID}, |
20 | 17 | error::Error, |
21 | | - Conn, |
22 | 18 | }; |
23 | 19 |
|
24 | | -use std::sync::{atomic, Arc}; |
25 | | - |
26 | | -/// Future that disconnects this pool from a server and resolves to `()`. |
| 20 | +/// Disconnect this pool from a server and resolves to `()`. |
27 | 21 | /// |
28 | | -/// **Note:** This Future won't resolve until all active connections, taken from it, |
| 22 | +/// **Note:** This won't resolve until all active connections, taken from the poll, |
29 | 23 | /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. |
30 | | -#[derive(Debug)] |
31 | | -#[must_use = "futures do nothing unless you `.await` or poll them"] |
32 | | -struct DisconnectPool { |
33 | | - pool_inner: Arc<Inner>, |
34 | | - drop: Option<UnboundedSender<Option<Conn>>>, |
35 | | -} |
36 | | - |
37 | | -impl DisconnectPool { |
38 | | - fn new(pool: Pool) -> Self { |
39 | | - Self { |
40 | | - pool_inner: pool.inner, |
41 | | - drop: Some(pool.drop), |
42 | | - } |
43 | | - } |
44 | | -} |
| 24 | +pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> { |
| 25 | + let inner = pool.inner; |
| 26 | + let drop = pool.drop; |
45 | 27 |
|
46 | | -impl Future for DisconnectPool { |
47 | | - type Output = Result<(), Error>; |
| 28 | + inner.close.store(true, atomic::Ordering::Release); |
48 | 29 |
|
49 | | - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
50 | | - self.pool_inner.close.store(true, atomic::Ordering::Release); |
51 | | - let mut exchange = self.pool_inner.exchange.lock().unwrap(); |
52 | | - exchange.spawn_futures_if_needed(&self.pool_inner); |
| 30 | + let f = |cx: &mut Context| { |
| 31 | + let mut exchange = inner.exchange.lock().unwrap(); |
| 32 | + exchange.spawn_futures_if_needed(&inner); |
53 | 33 | exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID); |
54 | | - drop(exchange); |
55 | | - |
56 | | - if self.pool_inner.closed.load(atomic::Ordering::Acquire) { |
57 | | - Poll::Ready(Ok(())) |
58 | | - } else { |
59 | | - match self.drop.take() { |
60 | | - Some(drop) => match drop.send(None) { |
61 | | - Ok(_) => { |
62 | | - // Recycler is alive. Waiting for it to finish. |
63 | | - ready!(Box::pin(drop.closed()).as_mut().poll(cx)); |
64 | | - Poll::Ready(Ok(())) |
65 | | - } |
66 | | - Err(_) => { |
67 | | - // Recycler seem dead. No one will wake us. |
68 | | - Poll::Ready(Ok(())) |
69 | | - } |
70 | | - }, |
71 | | - None => Poll::Pending, |
| 34 | + Poll::Ready(()) |
| 35 | + }; |
| 36 | + poll_fn(f).await; |
| 37 | + |
| 38 | + if inner.closed.load(atomic::Ordering::Acquire) { |
| 39 | + Ok(()) |
| 40 | + } else { |
| 41 | + match drop.send(None) { |
| 42 | + Ok(_) => { |
| 43 | + // Recycler is alive. Waiting for it to finish. |
| 44 | + drop.closed().await; |
| 45 | + Ok(()) |
| 46 | + } |
| 47 | + Err(_) => { |
| 48 | + // Recycler seem dead. No one will wake us. |
| 49 | + Ok(()) |
72 | 50 | } |
73 | 51 | } |
74 | 52 | } |
75 | 53 | } |
76 | | - |
77 | | -pub(crate) async fn disconnect_pool(pool: Pool) -> Result<(), Error> { |
78 | | - DisconnectPool::new(pool).await |
79 | | -} |
0 commit comments