1212 KVConnectorFactory )
1313from vllm .distributed .kv_transfer .kv_connector .v1 .shared_storage_connector import ( # noqa
1414 SharedStorageConnector )
15+ from vllm .v1 .core .kv_cache_manager import KVCacheBlocks
1516
1617MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct"
1718
@@ -32,7 +33,7 @@ def __init__(self, config: VllmConfig, role):
3233 self .call_record : dict [str , int ] = defaultdict (int )
3334 # Use a unique temp file per connector
3435 self ._event_file = tempfile .gettempdir (
35- ) + f"/connector_{ self .name } _events.log"
36+ ) + f"/connector_{ self .name } - { self . role . name } _events.log"
3637 # Start with an empty file
3738 with open (self ._event_file , "w" ) as _ :
3839 pass
@@ -52,10 +53,19 @@ def __getattribute__(self, name):
5253
5354 def wrapper (* args , ** kwargs ):
5455 self .call_record [name ] += 1
56+
57+ # Include args that we're interested in
58+ to_log = [name ]
59+ for arg in args :
60+ if isinstance (arg , int ):
61+ to_log .append (str (arg ))
62+ elif isinstance (arg , KVCacheBlocks ):
63+ to_log .append (f"num_blocks={ len (arg .blocks )} " )
64+
5565 # Log the event as a line to the file
5666 try :
5767 with open (self ._event_file , "a" ) as f :
58- f .write (name + "\n " )
68+ f .write (' ' . join ( to_log ) + "\n " )
5969 except Exception as e :
6070 print (f"[ERROR] Could not log event { name } "
6171 f"for { self .name } : { e } " )
@@ -162,15 +172,23 @@ def test_multi_shared_storage_connector_consistency():
162172 f"{ storage_1_path } and { storage_2_path } " )
163173
164174 events = get_connector_events ()
165- # get_num_new_matched_tokens will be called on each connector in turn.
166- # neither of them have hits so update_state_after_alloc won't be called.
167- assert events ["storage1" ][:3 ] == [
168- 'get_num_new_matched_tokens' , 'build_connector_meta' ,
169- 'bind_connector_metadata'
175+ # get_num_new_matched_tokens and update_state_after_alloc will be called
176+ # on each connector in turn.
177+ assert events ["storage1-SCHEDULER" ][:3 ] == [
178+ 'get_num_new_matched_tokens 0' ,
179+ 'update_state_after_alloc num_blocks=0 0' , 'build_connector_meta'
180+ ]
181+ assert events ["storage1-WORKER" ][:5 ] == [
182+ 'register_kv_caches' , 'bind_connector_metadata' , 'start_load_kv' ,
183+ 'wait_for_layer_load' , 'save_kv_layer'
184+ ]
185+ assert events ["storage2-SCHEDULER" ][:3 ] == [
186+ 'get_num_new_matched_tokens 0' ,
187+ 'update_state_after_alloc num_blocks=0 0' , 'build_connector_meta'
170188 ]
171- assert events ["storage2" ][:3 ] == [
172- 'get_num_new_matched_tokens ' , 'build_connector_meta ' ,
173- 'bind_connector_metadata '
189+ assert events ["storage2-WORKER " ][:5 ] == [
190+ 'register_kv_caches ' , 'bind_connector_metadata' , 'start_load_kv ' ,
191+ 'wait_for_layer_load' , 'save_kv_layer '
174192 ]
175193
176194 # Reset prefix cache or else we'll just get the tokens back from there.
@@ -182,16 +200,16 @@ def test_multi_shared_storage_connector_consistency():
182200
183201 events = get_connector_events ()
184202 # get_num_new_matched_tokens will return new tokens from the first
185- # connector so update_state_after_alloc will be called once blocks
186- # are allocated for the first connector.
187- # get_num_new_matched_tokens *won't* be called on the second connector
188- # in this case.
189- assert events ["storage1" ][:4 ] == [
190- 'get_num_new_matched_tokens' , 'update_state_after_alloc' ,
191- 'build_connector_meta' , 'bind_connector_metadata'
203+ # connector so update_state_after_alloc will be with allocated blocks
204+ # on that one but with zero blocks for others (first nonzero match is
205+ # chosen).
206+ assert events ["storage1-SCHEDULER" ][:3 ] == [
207+ 'get_num_new_matched_tokens 0' ,
208+ 'update_state_after_alloc num_blocks=7 96' , 'build_connector_meta'
192209 ]
193- assert events ["storage2" ][:2 ] == [
194- 'build_connector_meta' , 'bind_connector_metadata'
210+ assert events ["storage2-SCHEDULER" ][:3 ] == [
211+ 'get_num_new_matched_tokens 0' ,
212+ 'update_state_after_alloc num_blocks=0 0' , 'build_connector_meta'
195213 ]
196214
197215 # Delete storage1 connector state
@@ -205,17 +223,17 @@ def test_multi_shared_storage_connector_consistency():
205223 _ = llm .generate (PROMPTS , SAMPLING_PARAMS )
206224
207225 events = get_connector_events ()
208- # get_num_new_matched_tokens will be called for the first connector but it
209- # won't have a hit so update_state_after_alloc won't be called.
210- # get_num_new_matched_tokens will also be called on the second connector,
211- # but it should have a hit so update_state_after_alloc will be called .
212- assert events ["storage1" ][:3 ] == [
213- 'get_num_new_matched_tokens' , 'build_connector_meta ' ,
214- 'bind_connector_metadata '
226+ # get_num_new_matched_tokens will be called for both connectors but will
227+ # return 0 from the first connector, but the second connector should have
228+ # a hit, so update_state_after_alloc will only be called with allocated
229+ # blocks for the second connector .
230+ assert events ["storage1-SCHEDULER " ][:3 ] == [
231+ 'get_num_new_matched_tokens 0 ' ,
232+ 'update_state_after_alloc num_blocks=0 0' , 'build_connector_meta '
215233 ]
216- assert events ["storage2" ][:4 ] == [
217- 'get_num_new_matched_tokens' , 'update_state_after_alloc ' ,
218- 'build_connector_meta ' , 'bind_connector_metadata '
234+ assert events ["storage2-SCHEDULER " ][:3 ] == [
235+ 'get_num_new_matched_tokens 0 ' ,
236+ 'update_state_after_alloc num_blocks=7 96 ' , 'build_connector_meta '
219237 ]
220238
221239 # Clean up
0 commit comments