88
99import asyncio
1010import unittest
11+ import weakref
1112
1213from uvloop import _testbase as tb
1314
@@ -25,12 +26,11 @@ async def on_request(request):
2526 app = aiohttp .web .Application ()
2627 app .router .add_get ('/' , on_request )
2728
28- f = self .loop .create_server (
29- app .make_handler (),
30- '0.0.0.0' , '0' )
31- srv = self .loop .run_until_complete (f )
32-
33- port = srv .sockets [0 ].getsockname ()[1 ]
29+ runner = aiohttp .web .AppRunner (app )
30+ self .loop .run_until_complete (runner .setup ())
31+ site = aiohttp .web .TCPSite (runner , '0.0.0.0' , '0' )
32+ self .loop .run_until_complete (site .start ())
33+ port = site ._server .sockets [0 ].getsockname ()[1 ]
3434
3535 async def test ():
3636 # Make sure we're using the correct event loop.
@@ -45,11 +45,61 @@ async def test():
4545 self .assertEqual (result , PAYLOAD )
4646
4747 self .loop .run_until_complete (test ())
48- self .loop .run_until_complete (app .shutdown ())
49- self .loop .run_until_complete (app .cleanup ())
48+ self .loop .run_until_complete (runner .cleanup ())
49+
50+ def test_aiohttp_graceful_shutdown (self ):
51+ async def websocket_handler (request ):
52+ ws = aiohttp .web .WebSocketResponse ()
53+ await ws .prepare (request )
54+ request .app ['websockets' ].add (ws )
55+ try :
56+ async for msg in ws :
57+ await ws .send_str (msg .data )
58+ finally :
59+ request .app ['websockets' ].discard (ws )
60+ return ws
61+
62+ async def on_shutdown (app ):
63+ for ws in set (app ['websockets' ]):
64+ await ws .close (
65+ code = aiohttp .WSCloseCode .GOING_AWAY ,
66+ message = 'Server shutdown' )
67+
68+ asyncio .set_event_loop (self .loop )
69+ app = aiohttp .web .Application ()
70+ app .router .add_get ('/' , websocket_handler )
71+ app .on_shutdown .append (on_shutdown )
72+ app ['websockets' ] = weakref .WeakSet ()
73+
74+ runner = aiohttp .web .AppRunner (app )
75+ self .loop .run_until_complete (runner .setup ())
76+ site = aiohttp .web .TCPSite (runner , '0.0.0.0' , '0' )
77+ self .loop .run_until_complete (site .start ())
78+ port = site ._server .sockets [0 ].getsockname ()[1 ]
79+
80+ async def client ():
81+ async with aiohttp .ClientSession () as client :
82+ async with client .ws_connect (
83+ 'http://127.0.0.1:{}' .format (port )) as ws :
84+ await ws .send_str ("hello" )
85+ async for msg in ws :
86+ assert msg .data == "hello"
87+
88+ client_task = asyncio .ensure_future (client ())
89+
90+ async def stop ():
91+ await asyncio .sleep (0.1 )
92+ try :
93+ await asyncio .wait_for (runner .cleanup (), timeout = 0.1 )
94+ finally :
95+ try :
96+ client_task .cancel ()
97+ await client_task
98+ except asyncio .CancelledError :
99+ pass
100+
101+ self .loop .run_until_complete (stop ())
50102
51- srv .close ()
52- self .loop .run_until_complete (srv .wait_closed ())
53103
54104
55105@unittest .skipIf (skip_tests , "no aiohttp module" )
0 commit comments