Skip to content

Commit e62ad58

Browse files
committed
Improvements
1 parent 4ac3bc6 commit e62ad58

File tree

5 files changed

+72
-54
lines changed

5 files changed

+72
-54
lines changed

.changeset/funny-rockets-speak.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
---
22
'hive-console-sdk-rs': patch
3-
'hive-apollo-router-plugin': patch
3+
'hive-apollo-router-plugin': minor
44
---
55

66
Extract Hive Console integration implementation into a new package `hive-console-sdk` which can
77
be used by any Rust library for Hive Console integration
8+
9+
It also includes a refactor to use less Mutexes like replacing `lru` + `Mutex` with the thread-safe `moka` package.
10+
Only one place that handles queueing uses `Mutex` now.
11+
12+
13+
Fixes a bug when Persisted Operations are enabled by default which should be explicitly enabled

packages/libraries/router/src/persisted_documents.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,14 @@ impl Plugin for PersistedDocumentsPlugin {
6464
}
6565

6666
fn router_service(&self, service: router::BoxService) -> router::BoxService {
67-
let enabled = self.configuration.enabled.unwrap_or(true);
67+
let enabled = self.configuration.enabled.unwrap_or(false);
68+
if enabled {
6869
let allow_arbitrary_documents = self
6970
.configuration
7071
.allow_arbitrary_documents
7172
.unwrap_or(false);
7273
let mgr_ref = self.persisted_documents_manager.clone();
7374

74-
if enabled {
7575
ServiceBuilder::new()
7676
.checkpoint_async(move |req: router::Request| {
7777
let mgr = mgr_ref.clone();

packages/libraries/router/src/usage.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,6 @@ impl Plugin for UsagePlugin {
207207
.clone()
208208
.or_else(|| env::var("HIVE_TARGET_ID").ok());
209209

210-
// In case target ID is specified in configuration, append it to the endpoint
211-
if let Some(target_id) = &target_id {
212-
endpoint.push_str(&format!("/{}", target_id));
213-
}
214-
215210
let default_config = Config::default();
216211
let user_config = init.config;
217212
let enabled = user_config
@@ -251,6 +246,7 @@ impl Plugin for UsagePlugin {
251246
let agent = Arc::new(UsageAgent::new(
252247
token.expect("token is set"),
253248
endpoint,
249+
target_id,
254250
buffer_size,
255251
connect_timeout,
256252
request_timeout,

packages/libraries/sdk-rs/src/agent.rs

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ impl UsageAgent {
141141
pub fn new(
142142
token: String,
143143
endpoint: String,
144+
target_id: Option<String>,
144145
buffer_size: usize,
145146
connect_timeout: u64,
146147
request_timeout: u64,
@@ -164,6 +165,14 @@ impl UsageAgent {
164165
.build();
165166
let buffer = Arc::new(Buffer::new());
166167

168+
let mut endpoint = endpoint;
169+
170+
if token.starts_with("hvo1/") {
171+
if let Some(target_id) = target_id {
172+
endpoint.push_str(&format!("/{}", target_id));
173+
}
174+
}
175+
167176
UsageAgent {
168177
buffer,
169178
processor,
@@ -198,43 +207,51 @@ impl UsageAgent {
198207
);
199208
continue;
200209
}
201-
Ok(operation) => {
202-
let hash = operation.hash;
203-
204-
let client_name = non_empty_string(op.client_name);
205-
let client_version = non_empty_string(op.client_version);
206-
207-
let metadata: Option<Metadata> =
208-
if client_name.is_some() || client_version.is_some() {
209-
Some(Metadata {
210-
client: Some(ClientInfo {
211-
name: client_name,
212-
version: client_version,
213-
}),
214-
})
215-
} else {
216-
None
217-
};
218-
report.operations.push(Operation {
219-
operationMapKey: hash.clone(),
220-
timestamp: op.timestamp,
221-
execution: Execution {
222-
ok: op.ok,
223-
duration: op.duration.as_nanos(),
224-
errorsTotal: op.errors,
225-
},
226-
persistedDocumentHash: op.persisted_document_hash,
227-
metadata,
228-
});
229-
if let std::collections::hash_map::Entry::Vacant(e) = report.map.entry(hash) {
230-
e.insert(OperationMapRecord {
231-
operation: operation.operation,
232-
operationName: non_empty_string(op.operation_name),
233-
fields: operation.coordinates,
210+
Ok(operation) => match operation {
211+
Some(operation) => {
212+
let hash = operation.hash;
213+
214+
let client_name = non_empty_string(op.client_name);
215+
let client_version = non_empty_string(op.client_version);
216+
217+
let metadata: Option<Metadata> =
218+
if client_name.is_some() || client_version.is_some() {
219+
Some(Metadata {
220+
client: Some(ClientInfo {
221+
name: client_name,
222+
version: client_version,
223+
}),
224+
})
225+
} else {
226+
None
227+
};
228+
report.operations.push(Operation {
229+
operationMapKey: hash.clone(),
230+
timestamp: op.timestamp,
231+
execution: Execution {
232+
ok: op.ok,
233+
duration: op.duration.as_nanos(),
234+
errorsTotal: op.errors,
235+
},
236+
persistedDocumentHash: op.persisted_document_hash,
237+
metadata,
234238
});
239+
if let std::collections::hash_map::Entry::Vacant(e) = report.map.entry(hash)
240+
{
241+
e.insert(OperationMapRecord {
242+
operation: operation.operation,
243+
operationName: non_empty_string(op.operation_name),
244+
fields: operation.coordinates,
245+
});
246+
}
247+
report.size += 1;
235248
}
236-
report.size += 1;
237-
}
249+
None => {
250+
tracing::debug!(
251+
"Dropping operation (phase: PROCESSING): probably introspection query"
252+
);
253+
}
254+
},
238255
}
239256
}
240257

packages/libraries/sdk-rs/src/graphql.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ pub struct ProcessedOperation {
849849
}
850850

851851
pub struct OperationProcessor {
852-
cache: Cache<String, ProcessedOperation>,
852+
cache: Cache<String, Option<ProcessedOperation>>,
853853
}
854854

855855
impl Default for OperationProcessor {
@@ -869,18 +869,17 @@ impl OperationProcessor {
869869
&self,
870870
query: &str,
871871
schema: &SchemaDocument<'static, String>,
872-
) -> Result<ProcessedOperation, String> {
873-
let key = query.to_string();
874-
if let Some(cached) = self.cache.get(&key) {
875-
Ok(cached)
872+
) -> Result<Option<ProcessedOperation>, String> {
873+
if self.cache.contains_key(query) {
874+
let entry = self
875+
.cache
876+
.get(query)
877+
.expect("Unable to acquire Cache in OperationProcessor.process");
878+
Ok(entry.clone())
876879
} else {
877880
let result = self.transform(query, schema)?;
878-
if let Some(result) = result {
879-
self.cache.insert(key, result.clone());
880-
Ok(result)
881-
} else {
882-
Err("Unable to acquire Cache in OperationProcessor.process".to_string())
883-
}
881+
self.cache.insert(query.to_string(), result.clone());
882+
Ok(result)
884883
}
885884
}
886885

0 commit comments

Comments
 (0)