33
33
AsyncRLock ,
34
34
)
35
35
from ..._async_compat .network import AsyncNetworkUtil
36
- from ..._async_compat .util import AsyncUtil
37
36
from ..._conf import (
38
37
PoolConfig ,
39
38
WorkspaceConfig ,
40
39
)
41
40
from ..._deadline import (
42
41
connection_deadline ,
43
42
Deadline ,
44
- merge_deadlines ,
45
- merge_deadlines_and_timeouts ,
46
43
)
47
44
from ..._exceptions import BoltError
48
45
from ..._routing import RoutingTable
@@ -222,18 +219,18 @@ async def health_check(connection_, deadline_):
222
219
223
220
@abc .abstractmethod
224
221
async def acquire (
225
- self , access_mode , timeout , acquisition_timeout ,
226
- database , bookmarks , liveness_check_timeout
222
+ self , access_mode , timeout , database , bookmarks , liveness_check_timeout
227
223
):
228
224
""" Acquire a connection to a server that can satisfy a set of parameters.
229
225
230
226
:param access_mode:
231
- :param timeout: total timeout (including potential preparation)
232
- :param acquisition_timeout: timeout for actually acquiring a connection
227
+ :param timeout: timeout for the core acquisition
228
+ (excluding potential preparation like fetching routing tables).
233
229
:param database:
234
230
:param bookmarks:
235
231
:param liveness_check_timeout:
236
232
"""
233
+ ...
237
234
238
235
def kill_and_release (self , * connections ):
239
236
""" Release connections back into the pool after closing them.
@@ -397,12 +394,11 @@ def __repr__(self):
397
394
self .address )
398
395
399
396
async def acquire (
400
- self , access_mode , timeout , acquisition_timeout ,
401
- database , bookmarks , liveness_check_timeout
397
+ self , access_mode , timeout , database , bookmarks , liveness_check_timeout
402
398
):
403
399
# The access_mode and database is not needed for a direct connection,
404
400
# it's just there for consistency.
405
- deadline = merge_deadlines_and_timeouts (timeout , acquisition_timeout )
401
+ deadline = Deadline . from_timeout_or_deadline (timeout )
406
402
return await self ._acquire (
407
403
self .address , deadline , liveness_check_timeout
408
404
)
@@ -464,22 +460,6 @@ def __repr__(self):
464
460
"""
465
461
return "<{} addresses={!r}>" .format (self .__class__ .__name__ , self .get_default_database_initial_router_addresses ())
466
462
467
- @asynccontextmanager
468
- async def _refresh_lock_deadline (self , deadline ):
469
- timeout = deadline .to_timeout ()
470
- if timeout == float ("inf" ):
471
- timeout = - 1
472
- if not await self .refresh_lock .acquire (timeout = timeout ):
473
- raise ClientError (
474
- "pool failed to update routing table within {!r}s (timeout)"
475
- .format (deadline .original_timeout )
476
- )
477
-
478
- try :
479
- yield
480
- finally :
481
- self .refresh_lock .release ()
482
-
483
463
@property
484
464
def first_initial_routing_address (self ):
485
465
return self .get_default_database_initial_router_addresses ()[0 ]
@@ -513,7 +493,7 @@ async def get_or_create_routing_table(self, database):
513
493
return self .routing_tables [database ]
514
494
515
495
async def fetch_routing_info (
516
- self , address , database , imp_user , bookmarks , deadline
496
+ self , address , database , imp_user , bookmarks , timeout
517
497
):
518
498
""" Fetch raw routing info from a given router address.
519
499
@@ -524,32 +504,32 @@ async def fetch_routing_info(
524
504
:type imp_user: str or None
525
505
:param bookmarks: iterable of bookmark values after which the routing
526
506
info should be fetched
527
- :param deadline : connection acquisition deadline
507
+ :param timeout : connection acquisition timeout
528
508
529
509
:return: list of routing records, or None if no connection
530
510
could be established or if no readers or writers are present
531
511
:raise ServiceUnavailable: if the server does not support
532
512
routing, or if routing support is broken or outdated
533
513
"""
514
+ deadline = Deadline .from_timeout_or_deadline (timeout )
534
515
cx = await self ._acquire (address , deadline , None )
535
516
try :
536
- with connection_deadline (cx , deadline ):
537
- routing_table = await cx .route (
538
- database or self .workspace_config .database ,
539
- imp_user or self .workspace_config .impersonated_user ,
540
- bookmarks
541
- )
517
+ routing_table = await cx .route (
518
+ database or self .workspace_config .database ,
519
+ imp_user or self .workspace_config .impersonated_user ,
520
+ bookmarks
521
+ )
542
522
finally :
543
523
await self .release (cx )
544
524
return routing_table
545
525
546
526
async def fetch_routing_table (
547
- self , * , address , deadline , database , imp_user , bookmarks
527
+ self , * , address , timeout , database , imp_user , bookmarks
548
528
):
549
529
""" Fetch a routing table from a given router address.
550
530
551
531
:param address: router address
552
- :param deadline: deadline
532
+ :param timeout: connection acquisition timeout
553
533
:param database: the database name
554
534
:type: str
555
535
:param imp_user: the user to impersonate while fetching the routing
@@ -563,7 +543,7 @@ async def fetch_routing_table(
563
543
new_routing_info = None
564
544
try :
565
545
new_routing_info = await self .fetch_routing_info (
566
- address , database , imp_user , bookmarks , deadline
546
+ address , database , imp_user , bookmarks , timeout
567
547
)
568
548
except Neo4jError as e :
569
549
# checks if the code is an error that is caused by the client. In
@@ -606,7 +586,7 @@ async def fetch_routing_table(
606
586
return new_routing_table
607
587
608
588
async def _update_routing_table_from (
609
- self , * routers , database , imp_user , bookmarks , deadline ,
589
+ self , * routers , database , imp_user , bookmarks , timeout ,
610
590
database_callback
611
591
):
612
592
""" Try to update routing tables with the given routers.
@@ -621,12 +601,9 @@ async def _update_routing_table_from(
621
601
async for address in AsyncNetworkUtil .resolve_address (
622
602
router , resolver = self .pool_config .resolver
623
603
):
624
- if deadline .expired ():
625
- return False
626
604
new_routing_table = await self .fetch_routing_table (
627
- address = address ,
628
- deadline = deadline ,
629
- database = database , imp_user = imp_user , bookmarks = bookmarks
605
+ address = address , timeout = timeout , database = database ,
606
+ imp_user = imp_user , bookmarks = bookmarks
630
607
)
631
608
if new_routing_table is not None :
632
609
new_database = new_routing_table .database
@@ -656,7 +633,7 @@ async def update_routing_table(
656
633
table
657
634
:type imp_user: str or None
658
635
:param bookmarks: bookmarks used when fetching routing table
659
- :param timeout: timeout in seconds for how long to try updating
636
+ :param timeout: connection acquisition timeout
660
637
:param database_callback: A callback function that will be called with
661
638
the database name as only argument when a new routing table has been
662
639
acquired. This database name might different from `database` if that
@@ -665,10 +642,7 @@ async def update_routing_table(
665
642
666
643
:raise neo4j.exceptions.ServiceUnavailable:
667
644
"""
668
- deadline = merge_deadlines_and_timeouts (
669
- timeout , self .pool_config .update_routing_table_timeout
670
- )
671
- async with self ._refresh_lock_deadline (deadline ):
645
+ async with self .refresh_lock :
672
646
routing_table = await self .get_or_create_routing_table (database )
673
647
# copied because it can be modified
674
648
existing_routers = set (routing_table .routers )
@@ -681,23 +655,22 @@ async def update_routing_table(
681
655
if await self ._update_routing_table_from (
682
656
self .first_initial_routing_address , database = database ,
683
657
imp_user = imp_user , bookmarks = bookmarks ,
684
- deadline = deadline , database_callback = database_callback
658
+ timeout = timeout , database_callback = database_callback
685
659
):
686
660
# Why is only the first initial routing address used?
687
661
return
688
662
if await self ._update_routing_table_from (
689
663
* (existing_routers - {self .first_initial_routing_address }),
690
664
database = database , imp_user = imp_user , bookmarks = bookmarks ,
691
- deadline = deadline , database_callback = database_callback
665
+ timeout = timeout , database_callback = database_callback
692
666
):
693
667
return
694
668
695
669
if not prefer_initial_routing_address :
696
670
if await self ._update_routing_table_from (
697
671
self .first_initial_routing_address , database = database ,
698
672
imp_user = imp_user , bookmarks = bookmarks ,
699
- deadline = deadline ,
700
- database_callback = database_callback
673
+ timeout = timeout , database_callback = database_callback
701
674
):
702
675
# Why is only the first initial routing address used?
703
676
return
@@ -714,7 +687,7 @@ async def update_connection_pool(self, *, database):
714
687
await super (AsyncNeo4jPool , self ).deactivate (address )
715
688
716
689
async def ensure_routing_table_is_fresh (
717
- self , * , access_mode , database , imp_user , bookmarks , deadline = None ,
690
+ self , * , access_mode , database , imp_user , bookmarks , timeout = None ,
718
691
database_callback = None
719
692
):
720
693
""" Update the routing table if stale.
@@ -730,15 +703,15 @@ async def ensure_routing_table_is_fresh(
730
703
:return: `True` if an update was required, `False` otherwise.
731
704
"""
732
705
from neo4j .api import READ_ACCESS
733
- async with self ._refresh_lock_deadline ( deadline ) :
706
+ async with self .refresh_lock :
734
707
routing_table = await self .get_or_create_routing_table (database )
735
708
if routing_table .is_fresh (readonly = (access_mode == READ_ACCESS )):
736
709
# Readers are fresh.
737
710
return False
738
711
739
712
await self .update_routing_table (
740
713
database = database , imp_user = imp_user , bookmarks = bookmarks ,
741
- timeout = deadline , database_callback = database_callback
714
+ timeout = timeout , database_callback = database_callback
742
715
)
743
716
await self .update_connection_pool (database = database )
744
717
@@ -778,34 +751,24 @@ async def _select_address(self, *, access_mode, database):
778
751
return choice (addresses_by_usage [min (addresses_by_usage )])
779
752
780
753
async def acquire (
781
- self , access_mode , timeout , acquisition_timeout ,
782
- database , bookmarks , liveness_check_timeout
754
+ self , access_mode , timeout , database , bookmarks , liveness_check_timeout
783
755
):
784
756
if access_mode not in (WRITE_ACCESS , READ_ACCESS ):
785
757
raise ClientError ("Non valid 'access_mode'; {}" .format (access_mode ))
786
758
if not timeout :
787
759
raise ClientError ("'timeout' must be a float larger than 0; {}"
788
760
.format (timeout ))
789
- if not acquisition_timeout :
790
- raise ClientError ("'acquisition_timeout' must be a float larger "
791
- "than 0; {}" .format (acquisition_timeout ))
792
- deadline = Deadline .from_timeout_or_deadline (timeout )
793
761
794
762
from neo4j .api import check_access_mode
795
763
access_mode = check_access_mode (access_mode )
796
- async with self ._refresh_lock_deadline ( deadline ) :
764
+ async with self .refresh_lock :
797
765
log .debug ("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r" ,
798
766
self .routing_tables )
799
767
await self .ensure_routing_table_is_fresh (
800
768
access_mode = access_mode , database = database , imp_user = None ,
801
- bookmarks = bookmarks , deadline = deadline
769
+ bookmarks = bookmarks , timeout = timeout
802
770
)
803
771
804
- # Making sure the routing table is fresh is not considered part of the
805
- # connection acquisition. Hence, the acquisition_timeout starts now!
806
- deadline = merge_deadlines (
807
- deadline , Deadline .from_timeout_or_deadline (acquisition_timeout )
808
- )
809
772
while True :
810
773
try :
811
774
# Get an address for a connection that have the fewest in-use
@@ -817,6 +780,7 @@ async def acquire(
817
780
raise SessionExpired ("Failed to obtain connection towards '%s' server." % access_mode ) from err
818
781
try :
819
782
log .debug ("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r" , database , address )
783
+ deadline = Deadline .from_timeout_or_deadline (timeout )
820
784
# should always be a resolved address
821
785
connection = await self ._acquire (
822
786
address , deadline , liveness_check_timeout
0 commit comments