Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion vm/devices/vmbus/vmbus_channel/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl OpenRequest {
}
}

#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Protobuf)]
#[derive(Debug, Default, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Protobuf, Inspect)]
/// The identifying IDs for a channel offer.
#[mesh(package = "vmbus")]
pub struct OfferKey {
Expand All @@ -255,6 +255,16 @@ impl Display for OfferKey {
}
}

impl From<&protocol::OfferChannel> for OfferKey {
fn from(offer: &protocol::OfferChannel) -> Self {
Self {
interface_id: offer.interface_id,
instance_id: offer.instance_id,
subchannel_index: offer.subchannel_index,
}
}
}

/// Channel offer parameters.
#[derive(Debug, Clone, Default, mesh::MeshPayload)]
pub struct OfferParams {
Expand Down
47 changes: 37 additions & 10 deletions vm/devices/vmbus/vmbus_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use vmbus_async::async_dgram::AsyncRecv;
use vmbus_async::async_dgram::AsyncRecvExt;
use vmbus_channel::bus::GpadlRequest;
use vmbus_channel::bus::ModifyRequest;
use vmbus_channel::bus::OfferKey;
use vmbus_channel::bus::OpenData;
use vmbus_channel::gpadl::GpadlId;
use vmbus_core::HvsockConnectRequest;
Expand Down Expand Up @@ -810,9 +811,13 @@ impl ClientTask {
}

fn handle_rescind(&mut self, rescind: protocol::RescindChannelOffer) -> TriedRelease {
tracing::info!(state = %self.state, channel_id = rescind.channel_id.0, "received rescind");

let mut channel = self.channels.get_mut(rescind.channel_id);
tracing::info!(
state = %self.state,
channel_id = rescind.channel_id.0,
key = %OfferKey::from(&channel.offer),
"received rescind"
);
let event_flag = match std::mem::replace(&mut channel.state, ChannelState::Revoked) {
ChannelState::Offered => None,
ChannelState::Opening {
Expand Down Expand Up @@ -898,14 +903,14 @@ impl ClientTask {
}

fn handle_open_result(&mut self, result: protocol::OpenResult) {
let mut channel = self.channels.get_mut(result.channel_id);
tracing::debug!(
channel_id = result.channel_id.0,
key = %OfferKey::from(&channel.offer),
result = result.status,
"received open result"
);

let mut channel = self.channels.get_mut(result.channel_id);

let channel_opened = result.status == protocol::STATUS_SUCCESS as u32;
let old_state = std::mem::replace(&mut channel.state, ChannelState::Offered);
let ChannelState::Opening {
Expand All @@ -915,6 +920,7 @@ impl ClientTask {
} = old_state
else {
tracing::warn!(
key = %OfferKey::from(&channel.offer),
old_state = ?channel.state,
channel_opened,
"invalid state for open result"
Expand Down Expand Up @@ -946,13 +952,14 @@ impl ClientTask {
panic!("gpadl {:#x} not in teardown list", request.gpadl_id.0);
};

let mut channel = self.channels.get_mut(channel_id);
tracing::debug!(
gpadl_id = request.gpadl_id.0,
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
"Received GpadlTorndown"
);

let mut channel = self.channels.get_mut(channel_id);
let gpadl_state = channel
.gpadls
.remove(&request.gpadl_id)
Expand Down Expand Up @@ -1108,7 +1115,12 @@ impl ClientTask {
}
}

tracing::info!(channel_id = channel_id.0, "opening channel on host");
tracing::info!(
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
"opening channel on host"
);

let (request, rpc) = rpc.split();
let open_data = &request.open_data;

Expand Down Expand Up @@ -1241,6 +1253,7 @@ impl ClientTask {

tracing::trace!(
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
gpadl_id = request.id.0,
count = request.count,
len = request.buf.len(),
Expand Down Expand Up @@ -1286,6 +1299,7 @@ impl ClientTask {
tracing::warn!(
gpadl_id = gpadl_id.0,
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
"Gpadl teardown for unknown gpadl or revoked channel"
);
return;
Expand All @@ -1296,6 +1310,7 @@ impl ClientTask {
tracing::warn!(
gpadl_id = gpadl_id.0,
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
"gpadl teardown for offered gpadl"
);
}
Expand Down Expand Up @@ -1388,6 +1403,7 @@ impl ClientTask {
if let ChannelState::Opened { .. } = channel.state {
tracing::warn!(
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
"Channel dropped without closing first"
);
self.inner.close_channel(channel_id, &mut channel);
Expand Down Expand Up @@ -1576,13 +1592,19 @@ impl ClientTaskInner {
if let Some(flag) = redirected_event_flag {
self.synic.free_event_flag(flag);
}
tracing::info!(channel_id = channel_id.0, "closing channel on host");
tracing::info!(
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
"closing channel on host"
);

self.messages.send(&protocol::CloseChannel { channel_id });
channel.state = ChannelState::Offered;
channel.connection_id.store(0, Ordering::Release);
} else {
tracing::warn!(
id = %channel_id.0,
channel_id = channel_id.0,
key = %OfferKey::from(&channel.offer),
channel_state = %channel.state,
"invalid channel state for close channel"
);
Expand Down Expand Up @@ -1744,15 +1766,20 @@ struct TriedRelease(());

impl ChannelRef<'_> {
/// If the channel has been fully released (revoked, released by the client,
/// no pending requests), notifes the server and removes this channel from
/// no pending requests), notifies the server and removes this channel from
/// the map.
fn try_release(self, messages: &mut OutgoingMessages) -> TriedRelease {
if self.is_client_released
&& matches!(self.state, ChannelState::Revoked)
&& self.pending_request().is_none()
{
let channel_id = *self.0.key();
tracelimit::info_ratelimited!(channel_id = channel_id.0, "releasing channel");
tracelimit::info_ratelimited!(
channel_id = channel_id.0,
key = %OfferKey::from(&self.offer),
"releasing channel"
);

messages.send(&protocol::RelIdReleased { channel_id });
self.0.remove();
}
Expand Down
23 changes: 20 additions & 3 deletions vm/devices/vmbus/vmbus_relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use vmbus_channel::bus::ChannelRequest;
use vmbus_channel::bus::ChannelServerRequest;
use vmbus_channel::bus::GpadlRequest;
use vmbus_channel::bus::ModifyRequest;
use vmbus_channel::bus::OfferKey;
use vmbus_channel::bus::OpenRequest;
use vmbus_client as client;
use vmbus_core::HvsockConnectRequest;
Expand Down Expand Up @@ -230,6 +231,8 @@ impl RelayChannelInfo {
struct RelayChannel {
/// The Channel Id given to us by the client
channel_id: ChannelId,
/// The identifying key for this channel.
key: OfferKey,
/// Receives requests from the relay.
#[inspect(skip)]
relay_request_recv: mesh::Receiver<RelayChannelRequest>,
Expand Down Expand Up @@ -331,7 +334,7 @@ impl RelayChannelTask {

fn handle_gpadl_teardown(&mut self, rpc: Rpc<GpadlId, ()>) {
let (gpadl_id, rpc) = rpc.split();
tracing::trace!(gpadl_id = gpadl_id.0, "Tearing down GPADL");
tracing::trace!(gpadl_id = gpadl_id.0, key = %self.channel.key, "Tearing down GPADL");

let call = self
.channel
Expand All @@ -342,9 +345,11 @@ impl RelayChannelTask {
// message immediately, for example if the channel is still open and the host device still
// has the gpadl mapped. We should not block further requests while waiting for the
// response.
let key = self.channel.key;
self.channel.gpadls_tearing_down.push(Box::pin(async move {
if let Err(err) = call.await {
tracing::warn!(
%key,
error = &err as &dyn std::error::Error,
"failed to send gpadl teardown"
);
Expand All @@ -365,7 +370,7 @@ impl RelayChannelTask {

/// Dispatch requests sent by VTL0
async fn handle_server_request(&mut self, request: ChannelRequest) -> Result<()> {
tracing::trace!(request = ?request, "received channel request");
tracing::trace!(key = %self.channel.key, request = ?request, "received channel request");
match request {
ChannelRequest::Open(rpc) => {
rpc.handle(async |open_request| {
Expand All @@ -374,6 +379,7 @@ impl RelayChannelTask {
.inspect_err(|err| {
tracelimit::error_ratelimited!(
err = err.as_ref() as &dyn std::error::Error,
key = %self.channel.key,
channel_id = self.channel.channel_id.0,
"failed to open channel"
);
Expand All @@ -390,6 +396,7 @@ impl RelayChannelTask {
.inspect_err(|err| {
tracelimit::error_ratelimited!(
err = err.as_ref() as &dyn std::error::Error,
key = %self.channel.key,
channel_id = self.channel.channel_id.0,
gpadl_id = id.0,
"failed to create gpadl"
Expand Down Expand Up @@ -418,6 +425,7 @@ impl RelayChannelTask {
async fn handle_relay_request(&mut self, request: RelayChannelRequest) {
tracing::trace!(
channel_id = self.channel.channel_id.0,
key = %self.channel.key,
?request,
"received relay request"
);
Expand Down Expand Up @@ -493,7 +501,11 @@ impl RelayChannelTask {
// that the channel has been revoked.
while let Some(()) = self.channel.gpadls_tearing_down.next().await {}

tracing::debug!(channel_id = %self.channel.channel_id.0, "dropped channel");
tracing::debug!(
channel_id = %self.channel.channel_id.0,
key = %self.channel.key,
"dropped channel"
);

// Dropping the channel would revoke it, but since that's not synchronized there's a chance
// we reoffer the channel before the server receives the revoke. Using the request ensures
Expand All @@ -506,6 +518,7 @@ impl RelayChannelTask {
{
tracing::warn!(
channel_id = self.channel.channel_id.0,
key = %self.channel.key,
err = &err as &dyn std::error::Error,
"failed to send revoke request"
);
Expand Down Expand Up @@ -706,6 +719,7 @@ impl RelayTask {
driver: Arc::clone(&self.spawner),
channel: RelayChannel {
channel_id: ChannelId(channel_id),
key,
relay_request_recv,
request_send: offer.request_send,
revoke_recv: offer.revoke_recv,
Expand Down Expand Up @@ -808,6 +822,7 @@ impl RelayTask {
Err(err) => {
tracing::error!(
error = err.as_ref() as &dyn std::error::Error,
?request,
"failed add hvsock offer"
);
false
Expand All @@ -822,9 +837,11 @@ impl RelayTask {
}

async fn handle_offer_request(&mut self, request: client::OfferInfo) -> Result<()> {
let offer = request.offer;
if let Err(err) = self.handle_offer(request).await {
tracing::error!(
error = err.as_ref() as &dyn std::error::Error,
?offer,
"failed to hot add offer"
);
}
Expand Down
Loading