diff --git a/omicron-common/src/collection.rs b/omicron-common/src/collection.rs deleted file mode 100644 index 9a46f57456f..00000000000 --- a/omicron-common/src/collection.rs +++ /dev/null @@ -1,69 +0,0 @@ -/*! - * Functions for iterating pages from a collection - */ - -use crate::api::external::DataPageParams; -use crate::api::external::Error; -use crate::api::external::ListResult; -use crate::api::external::PaginationOrder::Ascending; -use crate::api::external::PaginationOrder::Descending; -use futures::StreamExt; -use std::collections::BTreeMap; -use std::convert::TryFrom; -use std::ops::Bound; -use std::sync::Arc; - -/** - * List a page of items from a collection `search_tree` that maps lookup keys - * directly to the actual objects - */ -pub fn collection_page( - search_tree: &BTreeMap>, - pagparams: &DataPageParams<'_, KeyType>, -) -> ListResult> -where - KeyType: std::cmp::Ord, - ValueType: Send + Sync + 'static, -{ - /* - * We assemble the list of results that we're going to return now. If the - * caller is holding a lock, they'll be able to release it right away. This - * also makes the lifetime of the return value much easier. - */ - let list = collection_page_as_iter(search_tree, pagparams) - .map(|(_, v)| Ok(Arc::clone(v))) - .collect::, Error>>>(); - Ok(futures::stream::iter(list).boxed()) -} - -/** - * Returns a page of items from a collection `search_tree` as an iterator - */ -pub fn collection_page_as_iter<'a, 'b, KeyType, ValueType>( - search_tree: &'a BTreeMap, - pagparams: &'b DataPageParams<'_, KeyType>, -) -> Box + 'a> -where - KeyType: std::cmp::Ord, -{ - /* - * Convert the 32-bit limit to a "usize". This can in principle fail, but - * not in any context in which we ever expect this code to run. 
- */ - let limit = usize::try_from(pagparams.limit.get()).unwrap(); - match (pagparams.direction, &pagparams.marker) { - (Ascending, None) => Box::new(search_tree.iter().take(limit)), - (Descending, None) => Box::new(search_tree.iter().rev().take(limit)), - (Ascending, Some(start_value)) => Box::new( - search_tree - .range((Bound::Excluded(*start_value), Bound::Unbounded)) - .take(limit), - ), - (Descending, Some(start_value)) => Box::new( - search_tree - .range((Bound::Unbounded, Bound::Excluded(*start_value))) - .rev() - .take(limit), - ), - } -} diff --git a/omicron-common/src/lib.rs b/omicron-common/src/lib.rs index bb1e1f1db6f..7ec764e8949 100644 --- a/omicron-common/src/lib.rs +++ b/omicron-common/src/lib.rs @@ -23,7 +23,6 @@ pub mod api; pub mod backoff; pub mod cmd; -pub mod collection; pub mod config; pub mod dev; pub mod http_client; diff --git a/omicron-common/src/sql/dbinit.sql b/omicron-common/src/sql/dbinit.sql index 33097cd34bd..bd0825c7321 100644 --- a/omicron-common/src/sql/dbinit.sql +++ b/omicron-common/src/sql/dbinit.sql @@ -39,6 +39,22 @@ CREATE DATABASE omicron; CREATE USER omicron; GRANT INSERT, SELECT, UPDATE, DELETE ON DATABASE omicron to omicron; +/* + * Sleds + */ + +CREATE TABLE omicron.public.Sled ( + /* Identity metadata */ + id UUID PRIMARY KEY, + time_created TIMESTAMPTZ NOT NULL, + time_modified TIMESTAMPTZ NOT NULL, + /* Indicates that the object has been deleted */ + time_deleted TIMESTAMPTZ, + + ip INET NOT NULL, + port INT4 NOT NULL +); + /* * Projects */ @@ -197,19 +213,6 @@ CREATE INDEX ON omicron.public.Disk ( time_deleted IS NULL AND attach_instance_id IS NOT NULL; -/* - * Sleds - */ - -CREATE TABLE omicron.public.Sled ( - /* Identity metadata -- abbreviated for sleds */ - id UUID PRIMARY KEY, - time_created TIMESTAMPTZ NOT NULL, - time_modified TIMESTAMPTZ NOT NULL, - - sled_agent_ip INET -); - /* * Oximeter collector servers. 
*/ diff --git a/omicron-nexus/src/db/datastore.rs b/omicron-nexus/src/db/datastore.rs index f696598f42a..15da6e36034 100644 --- a/omicron-nexus/src/db/datastore.rs +++ b/omicron-nexus/src/db/datastore.rs @@ -60,6 +60,66 @@ impl DataStore { self.pool.pool() } + /// Stores a new sled in the database. + pub async fn sled_upsert( + &self, + sled: db::model::Sled, + ) -> CreateResult { + use db::schema::sled::dsl; + diesel::insert_into(dsl::sled) + .values(sled.clone()) + .on_conflict(dsl::id) + .do_update() + .set(( + dsl::time_modified.eq(Utc::now()), + dsl::ip.eq(sled.ip), + dsl::port.eq(sled.port), + )) + .get_result_async(self.pool()) + .await + .map_err(|e| { + public_error_from_diesel_pool_create( + e, + ResourceType::Sled, + &sled.id.to_string(), + ) + }) + } + + pub async fn sled_list( + &self, + pagparams: &DataPageParams<'_, Uuid>, + ) -> ListResultVec { + use db::schema::sled::dsl; + paginated(dsl::sled, dsl::id, pagparams) + .filter(dsl::time_deleted.is_null()) + .load_async::(self.pool()) + .await + .map_err(|e| { + public_error_from_diesel_pool( + e, + ResourceType::Sled, + LookupType::Other("Listing All".to_string()), + ) + }) + } + + pub async fn sled_fetch(&self, id: Uuid) -> LookupResult { + use db::schema::sled::dsl; + dsl::sled + .filter(dsl::time_deleted.is_null()) + .filter(dsl::id.eq(id)) + .first_async::(self.pool()) + .await + .map_err(|e| { + public_error_from_diesel_pool( + e, + ResourceType::Sled, + LookupType::ById(id), + ) + }) + } + /// Create a project pub async fn project_create( &self, diff --git a/omicron-nexus/src/db/model.rs b/omicron-nexus/src/db/model.rs index 2e834471392..b7d2d4b520b 100644 --- a/omicron-nexus/src/db/model.rs +++ b/omicron-nexus/src/db/model.rs @@ -1,8 +1,8 @@ //! Structures stored to the database. 
use super::schema::{ - disk, instance, metricproducer, networkinterface, oximeter, project, vpc, - vpcsubnet, + disk, instance, metricproducer, networkinterface, oximeter, project, sled, + vpc, vpcsubnet, }; use chrono::{DateTime, Utc}; use diesel::backend::{Backend, RawValue}; @@ -33,20 +33,60 @@ impl Into for Rack { } } -// NOTE: This object is not currently stored in the database. -// -// However, it likely will be in the future. At the moment, -// Nexus simply reports all the live connections it knows about. +/// Database representation of a Sled. +#[derive(Queryable, Identifiable, Insertable, Debug, Clone)] +#[table_name = "sled"] pub struct Sled { - pub identity: IdentityMetadata, - pub service_address: SocketAddr, + // IdentityMetadata + pub id: Uuid, + pub time_created: DateTime, + pub time_modified: DateTime, + pub time_deleted: Option>, + + // ServiceAddress (Sled Agent). + pub ip: ipnetwork::IpNetwork, + pub port: i32, +} + +impl Sled { + pub fn new( + id: Uuid, + addr: SocketAddr, + params: external::IdentityMetadataCreateParams, + ) -> Self { + let identity = IdentityMetadata::new(id, params); + Self { + id, + time_created: identity.time_created, + time_modified: identity.time_modified, + time_deleted: identity.time_deleted, + ip: addr.ip().into(), + port: addr.port().into(), + } + } + + pub fn id(&self) -> &Uuid { + &self.id + } + + pub fn address(&self) -> SocketAddr { + // TODO: avoid this unwrap + SocketAddr::new(self.ip.ip(), u16::try_from(self.port).unwrap()) + } } impl Into for Sled { fn into(self) -> external::Sled { + let service_address = self.address(); external::Sled { - identity: self.identity.into(), - service_address: self.service_address, + identity: external::IdentityMetadata { + id: self.id, + name: external::Name::try_from("sled").unwrap(), + description: "sled description".to_string(), + time_created: self.time_created, + time_modified: self.time_modified, + }, + service_address, } } } diff --git a/omicron-nexus/src/db/schema.rs 
b/omicron-nexus/src/db/schema.rs index 65ef3dba63e..100c6a15818 100644 --- a/omicron-nexus/src/db/schema.rs +++ b/omicron-nexus/src/db/schema.rs @@ -118,7 +118,10 @@ table! { id -> Uuid, time_created -> Timestamptz, time_modified -> Timestamptz, - sled_agent_ip -> Nullable, + time_deleted -> Nullable, + + ip -> Inet, + port -> Int4, } } diff --git a/omicron-nexus/src/http_entrypoints_external.rs b/omicron-nexus/src/http_entrypoints_external.rs index 163fcee5ee8..facef3f1cb1 100644 --- a/omicron-nexus/src/http_entrypoints_external.rs +++ b/omicron-nexus/src/http_entrypoints_external.rs @@ -998,10 +998,13 @@ async fn hardware_sleds_get( let apictx = rqctx.context(); let nexus = &apictx.nexus; let query = query_params.into_inner(); - let sled_stream = - nexus.sleds_list(&data_page_params_for(&rqctx, &query)?).await?; - let view_list = to_list::(sled_stream).await; - Ok(HttpResponseOk(ScanById::results_page(&query, view_list)?)) + let sleds = nexus + .sleds_list(&data_page_params_for(&rqctx, &query)?) + .await? 
+ .into_iter() + .map(|s| s.into()) + .collect(); + Ok(HttpResponseOk(ScanById::results_page(&query, sleds)?)) } /** diff --git a/omicron-nexus/src/http_entrypoints_internal.rs b/omicron-nexus/src/http_entrypoints_internal.rs index 9123b42ca00..ac62c732d31 100644 --- a/omicron-nexus/src/http_entrypoints_internal.rs +++ b/omicron-nexus/src/http_entrypoints_internal.rs @@ -15,7 +15,6 @@ use omicron_common::api::internal::nexus::InstanceRuntimeState; use omicron_common::api::internal::nexus::OximeterInfo; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::api::internal::nexus::SledAgentStartupInfo; -use omicron_common::SledAgentClient; use schemars::JsonSchema; use serde::Deserialize; use std::sync::Arc; @@ -68,11 +67,7 @@ async fn cpapi_sled_agents_post( let path = path_params.into_inner(); let si = sled_info.into_inner(); let sled_id = &path.sled_id; - let client_log = - apictx.log.new(o!("SledAgent" => sled_id.clone().to_string())); - let client = - Arc::new(SledAgentClient::new(&sled_id, si.sa_address, client_log)); - nexus.upsert_sled_agent(client).await; + nexus.upsert_sled(*sled_id, si.sa_address).await?; Ok(HttpResponseUpdatedNoContent()) } diff --git a/omicron-nexus/src/nexus.rs b/omicron-nexus/src/nexus.rs index d57a632322e..82da1643278 100644 --- a/omicron-nexus/src/nexus.rs +++ b/omicron-nexus/src/nexus.rs @@ -9,7 +9,6 @@ use anyhow::Context; use async_trait::async_trait; use chrono::Utc; use futures::future::ready; -use futures::lock::Mutex; use futures::StreamExt; use omicron_common::api::external; use omicron_common::api::external::CreateResult; @@ -47,13 +46,10 @@ use omicron_common::api::internal::sled_agent::InstanceHardware; use omicron_common::api::internal::sled_agent::InstanceRuntimeStateRequested; use omicron_common::api::internal::sled_agent::InstanceStateRequested; use omicron_common::bail_unless; -use omicron_common::collection::collection_page; use omicron_common::OximeterClient; use 
omicron_common::SledAgentClient; use slog::Logger; -use std::collections::BTreeMap; -use std::convert::TryFrom; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -108,16 +104,6 @@ pub struct Nexus { /** saga execution coordinator */ sec_client: Arc, - - /** - * List of sled agents known by this nexus. - * TODO This ought to have some representation in the data store as well so - * that we don't simply forget about sleds that aren't currently up. We'll - * need to think about the interface between this program and the sleds and - * how we discover them, both when they initially show up and when we come - * up. - */ - sled_agents: Mutex>>, } /* @@ -166,7 +152,6 @@ impl Nexus { }, db_datastore, sec_client: Arc::clone(&sec_client), - sled_agents: Mutex::new(BTreeMap::new()), }; /* @@ -191,11 +176,22 @@ impl Nexus { * TODO-robustness we should have a limit on how many sled agents there can * be (for graceful degradation at large scale). */ - pub async fn upsert_sled_agent(&self, sa: Arc) { - let mut scs = self.sled_agents.lock().await; - info!(self.log, "registered sled agent"; - "sled_uuid" => sa.id.to_string()); - scs.insert(sa.id, sa); + pub async fn upsert_sled( + &self, + id: Uuid, + address: SocketAddr, + ) -> Result<(), Error> { + info!(self.log, "registered sled agent"; "sled_uuid" => id.to_string()); + + // Insert the sled into the database. + let create_params = IdentityMetadataCreateParams { + name: Name::try_from("sled").unwrap(), + description: "Self-Identified Sled".to_string(), + }; + let sled = db::model::Sled::new(id, address, create_params); + self.db_datastore.sled_upsert(sled).await?; + + Ok(()) } /** @@ -544,16 +540,22 @@ impl Nexus { * SagaContext::alloc_server(). */ pub async fn sled_allocate(&self) -> Result { - let sleds = self.sled_agents.lock().await; + // TODO: replace this with a real allocation policy. 
+ // + // This implementation always assigns the first sled (by ID order). + let pagparams = DataPageParams { + marker: None, + direction: dropshot::PaginationOrder::Ascending, + limit: std::num::NonZeroU32::new(1).unwrap(), + }; + let sleds = self.db_datastore.sled_list(&pagparams).await?; - /* TODO replace this with a real allocation policy. */ sleds - .values() - .next() + .first() .ok_or_else(|| Error::ServiceUnavailable { message: String::from("no sleds available for new Instance"), }) - .map(|s| s.id) + .map(|s| *s.id()) } pub async fn project_list_instances( @@ -710,14 +712,19 @@ impl Nexus { pub async fn sled_client( &self, - sled_uuid: &Uuid, + id: &Uuid, ) -> Result, Error> { - let sled_agents = self.sled_agents.lock().await; - Ok(Arc::clone(sled_agents.get(sled_uuid).ok_or_else(|| { - let message = - format!("no sled agent for sled_uuid \"{}\"", sled_uuid); - Error::ServiceUnavailable { message } - })?)) + // TODO: We should consider injecting connection pooling here, + // but for now, connections to sled agents are constructed + // on an "as requested" basis. + // + // Frankly, returning an "Arc" here without a connection pool is a little + // silly; it's not actually used if each client connection exists as a + // one-shot. + let sled = self.sled_lookup(id).await?; + + let log = self.log.new(o!("SledAgent" => id.clone().to_string())); + Ok(Arc::new(SledAgentClient::new(id, sled.address(), log))) } /** @@ -1265,61 +1272,21 @@ impl Nexus { } /* - * Sleds. - * TODO-completeness: Eventually, we'll want sleds to be stored in the - * database, with a controlled process for adopting them, decommissioning - * them, etc. For now, we expose an Sled for each SledAgentClient - * that we've got. + * Sleds */ + pub async fn sleds_list( &self, pagparams: &DataPageParams<'_, Uuid>, - ) -> ListResult { - let sled_agents = self.sled_agents.lock().await; - let sleds = collection_page(&sled_agents, pagparams)? 
- .filter(|maybe_object| ready(maybe_object.is_ok())) - .map(|sa| { - let sa = sa.unwrap(); - Ok(db::model::Sled { - identity: db::model::IdentityMetadata { - /* TODO-correctness cons up real metadata here */ - id: sa.id, - name: Name::try_from(format!("sled-{}", sa.id)) - .unwrap(), - description: String::from(""), - time_created: Utc::now(), - time_modified: Utc::now(), - time_deleted: None, - }, - service_address: sa.service_address, - }) - }) - .collect::>>() - .await; - Ok(futures::stream::iter(sleds).boxed()) + ) -> ListResultVec { + self.db_datastore.sled_list(pagparams).await } pub async fn sled_lookup( &self, sled_id: &Uuid, ) -> LookupResult { - let nexuses = self.sled_agents.lock().await; - let sa = nexuses.get(sled_id).ok_or_else(|| { - Error::not_found_by_id(ResourceType::Sled, sled_id) - })?; - - Ok(db::model::Sled { - identity: db::model::IdentityMetadata { - /* TODO-correctness cons up real metadata here */ - id: sa.id, - name: Name::try_from(format!("sled-{}", sa.id)).unwrap(), - description: String::from(""), - time_created: Utc::now(), - time_modified: Utc::now(), - time_deleted: None, - }, - service_address: sa.service_address, - }) + self.db_datastore.sled_fetch(*sled_id).await } /* diff --git a/omicron-nexus/src/saga_interface.rs b/omicron-nexus/src/saga_interface.rs index c9231308df7..f748f5ed778 100644 --- a/omicron-nexus/src/saga_interface.rs +++ b/omicron-nexus/src/saga_interface.rs @@ -38,9 +38,6 @@ impl SagaContext { * an undo action. The only thing needed at this layer is a way to read and * write to the database, which we already have. * - * For now, sleds aren't in the database. We rely on the fact that Nexus - * knows what sleds exist. - * * Note: the parameters appear here (unused) to make sure callers make sure * to have them available. They're not used now, but they will be in a real * implementation.