33
33
AsyncRLock ,
34
34
)
35
35
from ..._async_compat .network import AsyncNetworkUtil
36
- from ..._async_compat .util import AsyncUtil
37
36
from ..._conf import (
38
37
PoolConfig ,
39
38
WorkspaceConfig ,
40
39
)
41
40
from ..._deadline import (
42
41
connection_deadline ,
43
42
Deadline ,
44
- merge_deadlines ,
45
- merge_deadlines_and_timeouts ,
46
43
)
47
44
from ..._exceptions import BoltError
48
45
from ..._routing import RoutingTable
@@ -222,18 +219,18 @@ async def health_check(connection_, deadline_):
222
219
223
220
@abc .abstractmethod
224
221
async def acquire (
225
- self , access_mode , timeout , acquisition_timeout ,
226
- database , bookmarks , liveness_check_timeout
222
+ self , access_mode , timeout , database , bookmarks , liveness_check_timeout
227
223
):
228
224
""" Acquire a connection to a server that can satisfy a set of parameters.
229
225
230
226
:param access_mode:
231
- :param timeout: total timeout (including potential preparation)
232
- :param acquisition_timeout: timeout for actually acquiring a connection
227
+ :param timeout: timeout for the core acquisition
228
+ (excluding potential preparation like fetching routing tables).
233
229
:param database:
234
230
:param bookmarks:
235
231
:param liveness_check_timeout:
236
232
"""
233
+ ...
237
234
238
235
def kill_and_release (self , * connections ):
239
236
""" Release connections back into the pool after closing them.
@@ -397,12 +394,11 @@ def __repr__(self):
397
394
self .address )
398
395
399
396
async def acquire (
400
- self , access_mode , timeout , acquisition_timeout ,
401
- database , bookmarks , liveness_check_timeout
397
+ self , access_mode , timeout , database , bookmarks , liveness_check_timeout
402
398
):
403
399
# The access_mode and database is not needed for a direct connection,
404
400
# it's just there for consistency.
405
- deadline = merge_deadlines_and_timeouts (timeout , acquisition_timeout )
401
+ deadline = Deadline . from_timeout_or_deadline (timeout )
406
402
return await self ._acquire (
407
403
self .address , deadline , liveness_check_timeout
408
404
)
@@ -464,22 +460,6 @@ def __repr__(self):
464
460
"""
465
461
return "<{} addresses={!r}>" .format (self .__class__ .__name__ , self .get_default_database_initial_router_addresses ())
466
462
467
- @asynccontextmanager
468
- async def _refresh_lock_deadline (self , deadline ):
469
- timeout = deadline .to_timeout ()
470
- if timeout == float ("inf" ):
471
- timeout = - 1
472
- if not await self .refresh_lock .acquire (timeout = timeout ):
473
- raise ClientError (
474
- "pool failed to update routing table within {!r}s (timeout)"
475
- .format (deadline .original_timeout )
476
- )
477
-
478
- try :
479
- yield
480
- finally :
481
- self .refresh_lock .release ()
482
-
483
463
@property
484
464
def first_initial_routing_address (self ):
485
465
return self .get_default_database_initial_router_addresses ()[0 ]
@@ -513,7 +493,7 @@ async def get_or_create_routing_table(self, database):
513
493
return self .routing_tables [database ]
514
494
515
495
async def fetch_routing_info (
516
- self , address , database , imp_user , bookmarks , deadline
496
+ self , address , database , imp_user , bookmarks , acquisition_timeout
517
497
):
518
498
""" Fetch raw routing info from a given router address.
519
499
@@ -524,32 +504,32 @@ async def fetch_routing_info(
524
504
:type imp_user: str or None
525
505
:param bookmarks: iterable of bookmark values after which the routing
526
506
info should be fetched
527
- :param deadline : connection acquisition deadline
507
+ :param acquisition_timeout : connection acquisition timeout
528
508
529
509
:return: list of routing records, or None if no connection
530
510
could be established or if no readers or writers are present
531
511
:raise ServiceUnavailable: if the server does not support
532
512
routing, or if routing support is broken or outdated
533
513
"""
514
+ deadline = Deadline .from_timeout_or_deadline (acquisition_timeout )
534
515
cx = await self ._acquire (address , deadline , None )
535
516
try :
536
- with connection_deadline (cx , deadline ):
537
- routing_table = await cx .route (
538
- database or self .workspace_config .database ,
539
- imp_user or self .workspace_config .impersonated_user ,
540
- bookmarks
541
- )
517
+ routing_table = await cx .route (
518
+ database or self .workspace_config .database ,
519
+ imp_user or self .workspace_config .impersonated_user ,
520
+ bookmarks
521
+ )
542
522
finally :
543
523
await self .release (cx )
544
524
return routing_table
545
525
546
526
async def fetch_routing_table (
547
- self , * , address , deadline , database , imp_user , bookmarks
527
+ self , * , address , acquisition_timeout , database , imp_user , bookmarks
548
528
):
549
529
""" Fetch a routing table from a given router address.
550
530
551
531
:param address: router address
552
- :param deadline: deadline
532
+ :param acquisition_timeout: connection acquisition timeout
553
533
:param database: the database name
554
534
:type: str
555
535
:param imp_user: the user to impersonate while fetching the routing
@@ -563,7 +543,7 @@ async def fetch_routing_table(
563
543
new_routing_info = None
564
544
try :
565
545
new_routing_info = await self .fetch_routing_info (
566
- address , database , imp_user , bookmarks , deadline
546
+ address , database , imp_user , bookmarks , acquisition_timeout
567
547
)
568
548
except Neo4jError as e :
569
549
# checks if the code is an error that is caused by the client. In
@@ -606,7 +586,7 @@ async def fetch_routing_table(
606
586
return new_routing_table
607
587
608
588
async def _update_routing_table_from (
609
- self , * routers , database , imp_user , bookmarks , deadline ,
589
+ self , * routers , database , imp_user , bookmarks , acquisition_timeout ,
610
590
database_callback
611
591
):
612
592
""" Try to update routing tables with the given routers.
@@ -621,11 +601,8 @@ async def _update_routing_table_from(
621
601
async for address in AsyncNetworkUtil .resolve_address (
622
602
router , resolver = self .pool_config .resolver
623
603
):
624
- if deadline .expired ():
625
- return False
626
604
new_routing_table = await self .fetch_routing_table (
627
- address = address ,
628
- deadline = deadline ,
605
+ address = address , acquisition_timeout = acquisition_timeout ,
629
606
database = database , imp_user = imp_user , bookmarks = bookmarks
630
607
)
631
608
if new_routing_table is not None :
@@ -645,7 +622,7 @@ async def _update_routing_table_from(
645
622
return False
646
623
647
624
async def update_routing_table (
648
- self , * , database , imp_user , bookmarks , timeout = None ,
625
+ self , * , database , imp_user , bookmarks , acquisition_timeout = None ,
649
626
database_callback = None
650
627
):
651
628
""" Update the routing table from the first router able to provide
@@ -656,7 +633,7 @@ async def update_routing_table(
656
633
table
657
634
:type imp_user: str or None
658
635
:param bookmarks: bookmarks used when fetching routing table
659
- :param timeout: timeout in seconds for how long to try updating
636
+ :param acquisition_timeout: connection acquisition timeout
660
637
:param database_callback: A callback function that will be called with
661
638
the database name as only argument when a new routing table has been
662
639
acquired. This database name might different from `database` if that
@@ -665,10 +642,7 @@ async def update_routing_table(
665
642
666
643
:raise neo4j.exceptions.ServiceUnavailable:
667
644
"""
668
- deadline = merge_deadlines_and_timeouts (
669
- timeout , self .pool_config .update_routing_table_timeout
670
- )
671
- async with self ._refresh_lock_deadline (deadline ):
645
+ async with self .refresh_lock :
672
646
routing_table = await self .get_or_create_routing_table (database )
673
647
# copied because it can be modified
674
648
existing_routers = set (routing_table .routers )
@@ -681,22 +655,24 @@ async def update_routing_table(
681
655
if await self ._update_routing_table_from (
682
656
self .first_initial_routing_address , database = database ,
683
657
imp_user = imp_user , bookmarks = bookmarks ,
684
- deadline = deadline , database_callback = database_callback
658
+ acquisition_timeout = acquisition_timeout ,
659
+ database_callback = database_callback
685
660
):
686
661
# Why is only the first initial routing address used?
687
662
return
688
663
if await self ._update_routing_table_from (
689
664
* (existing_routers - {self .first_initial_routing_address }),
690
665
database = database , imp_user = imp_user , bookmarks = bookmarks ,
691
- deadline = deadline , database_callback = database_callback
666
+ acquisition_timeout = acquisition_timeout ,
667
+ database_callback = database_callback
692
668
):
693
669
return
694
670
695
671
if not prefer_initial_routing_address :
696
672
if await self ._update_routing_table_from (
697
673
self .first_initial_routing_address , database = database ,
698
674
imp_user = imp_user , bookmarks = bookmarks ,
699
- deadline = deadline ,
675
+ acquisition_timeout = acquisition_timeout ,
700
676
database_callback = database_callback
701
677
):
702
678
# Why is only the first initial routing address used?
@@ -714,8 +690,8 @@ async def update_connection_pool(self, *, database):
714
690
await super (AsyncNeo4jPool , self ).deactivate (address )
715
691
716
692
async def ensure_routing_table_is_fresh (
717
- self , * , access_mode , database , imp_user , bookmarks , deadline = None ,
718
- database_callback = None
693
+ self , * , access_mode , database , imp_user , bookmarks ,
694
+ acquisition_timeout = None , database_callback = None
719
695
):
720
696
""" Update the routing table if stale.
721
697
@@ -730,15 +706,16 @@ async def ensure_routing_table_is_fresh(
730
706
:return: `True` if an update was required, `False` otherwise.
731
707
"""
732
708
from neo4j .api import READ_ACCESS
733
- async with self ._refresh_lock_deadline ( deadline ) :
709
+ async with self .refresh_lock :
734
710
routing_table = await self .get_or_create_routing_table (database )
735
711
if routing_table .is_fresh (readonly = (access_mode == READ_ACCESS )):
736
712
# Readers are fresh.
737
713
return False
738
714
739
715
await self .update_routing_table (
740
716
database = database , imp_user = imp_user , bookmarks = bookmarks ,
741
- timeout = deadline , database_callback = database_callback
717
+ acquisition_timeout = acquisition_timeout ,
718
+ database_callback = database_callback
742
719
)
743
720
await self .update_connection_pool (database = database )
744
721
@@ -778,34 +755,24 @@ async def _select_address(self, *, access_mode, database):
778
755
return choice (addresses_by_usage [min (addresses_by_usage )])
779
756
780
757
async def acquire (
781
- self , access_mode , timeout , acquisition_timeout ,
782
- database , bookmarks , liveness_check_timeout
758
+ self , access_mode , timeout , database , bookmarks , liveness_check_timeout
783
759
):
784
760
if access_mode not in (WRITE_ACCESS , READ_ACCESS ):
785
761
raise ClientError ("Non valid 'access_mode'; {}" .format (access_mode ))
786
762
if not timeout :
787
763
raise ClientError ("'timeout' must be a float larger than 0; {}"
788
764
.format (timeout ))
789
- if not acquisition_timeout :
790
- raise ClientError ("'acquisition_timeout' must be a float larger "
791
- "than 0; {}" .format (acquisition_timeout ))
792
- deadline = Deadline .from_timeout_or_deadline (timeout )
793
765
794
766
from neo4j .api import check_access_mode
795
767
access_mode = check_access_mode (access_mode )
796
- async with self ._refresh_lock_deadline ( deadline ) :
768
+ async with self .refresh_lock :
797
769
log .debug ("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r" ,
798
770
self .routing_tables )
799
771
await self .ensure_routing_table_is_fresh (
800
772
access_mode = access_mode , database = database , imp_user = None ,
801
- bookmarks = bookmarks , deadline = deadline
773
+ bookmarks = bookmarks , acquisition_timeout = timeout
802
774
)
803
775
804
- # Making sure the routing table is fresh is not considered part of the
805
- # connection acquisition. Hence, the acquisition_timeout starts now!
806
- deadline = merge_deadlines (
807
- deadline , Deadline .from_timeout_or_deadline (acquisition_timeout )
808
- )
809
776
while True :
810
777
try :
811
778
# Get an address for a connection that have the fewest in-use
@@ -817,6 +784,7 @@ async def acquire(
817
784
raise SessionExpired ("Failed to obtain connection towards '%s' server." % access_mode ) from err
818
785
try :
819
786
log .debug ("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r" , database , address )
787
+ deadline = Deadline .from_timeout_or_deadline (timeout )
820
788
# should always be a resolved address
821
789
connection = await self ._acquire (
822
790
address , deadline , liveness_check_timeout
0 commit comments