Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion adapter/src/ethereum/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub enum KeystoreError {
/// `address` key is missing from the keystore file
AddressMissing,
/// The `address` key in the keystore file is not a valid `ValidatorId`
AddressInvalid(primitives::DomainError),
AddressInvalid(primitives::address::Error),
/// reading the keystore file failed
ReadingFile(std::io::Error),
/// Deserializing the keystore file failed
Expand Down
6 changes: 6 additions & 0 deletions primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ impl fmt::Display for DomainError {
}
}

impl From<address::Error> for DomainError {
fn from(error: address::Error) -> Self {
Self::InvalidArgument(error.to_string())
}
}

impl error::Error for DomainError {
fn cause(&self) -> Option<&dyn error::Error> {
None
Expand Down
167 changes: 113 additions & 54 deletions primitives/src/sentry.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,112 @@
use crate::targeting::Rules;
use crate::validator::MessageTypes;
use crate::{BigNum, Channel, ChannelId, ValidatorId};
use crate::{
targeting::Rules,
validator::Type as MessageType,
validator::{ApproveState, Heartbeat, MessageTypes, NewState},
BigNum, Channel, ChannelId, ValidatorId,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::{collections::HashMap, fmt, hash::Hash};

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct LastApproved {
/// NewState can be None if the channel is brand new
pub new_state: Option<NewStateValidatorMessage>,
pub new_state: Option<MessageResponse<NewState>>,
/// ApproveState can be None if the channel is brand new
pub approve_state: Option<ApproveStateValidatorMessage>,
pub approve_state: Option<MessageResponse<ApproveState>>,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct NewStateValidatorMessage {
pub struct MessageResponse<T: MessageType> {
pub from: ValidatorId,
pub received: DateTime<Utc>,
pub msg: MessageTypes,
pub msg: message::Message<T>,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct ApproveStateValidatorMessage {
pub from: ValidatorId,
pub received: DateTime<Utc>,
pub msg: MessageTypes,
}
pub mod message {
use std::{convert::TryFrom, ops::Deref};

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct HeartbeatValidatorMessage {
pub from: ValidatorId,
pub received: DateTime<Utc>,
pub msg: MessageTypes,
use crate::validator::messages::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(try_from = "MessageTypes", into = "MessageTypes")]
pub struct Message<T: Type>(T);

impl<T: Type> Message<T> {
pub fn new(message: T) -> Self {
Self(message)
}

pub fn into_inner(self) -> T {
self.0
}
}

impl<T: Type> Deref for Message<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<T: Type> TryFrom<MessageTypes> for Message<T> {
type Error = MessageTypeError<T>;

fn try_from(value: MessageTypes) -> Result<Self, Self::Error> {
<T as TryFrom<MessageTypes>>::try_from(value).map(Self)
}
}

impl<T: Type> Into<MessageTypes> for Message<T> {
fn into(self) -> MessageTypes {
self.0.into()
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::sentry::MessageResponse;
use chrono::{TimeZone, Utc};
use serde_json::{from_value, json, to_value};

#[test]
fn de_serialization_of_a_message() {
let approve_state_message = json!({
"from":"0x2892f6C41E0718eeeDd49D98D648C789668cA67d",
"msg": {
"type":"ApproveState",
"stateRoot":"4739522efc1e81499541621759dadb331eaf08829d6a3851b4b654dfaddc9935",
"signature":"0x00128a39b715e87475666c3220fc0400bf34a84d24f77571d2b4e1e88b141d52305438156e526ff4fe96b7a13e707ab2f6f3ca00bd928dabc7f516b56cfe6fd61c",
"isHealthy":true,
"exhausted":false
},
"received":"2021-01-05T14:00:48.549Z"
});

let actual_message: MessageResponse<ApproveState> =
from_value(approve_state_message.clone()).expect("Should deserialize");
let expected_message = MessageResponse {
from: "0x2892f6C41E0718eeeDd49D98D648C789668cA67d".parse().expect("Valid ValidatorId"),
received: Utc.ymd(2021, 1, 5).and_hms_milli(14,0,48, 549),
msg: Message::new(ApproveState {
state_root: "4739522efc1e81499541621759dadb331eaf08829d6a3851b4b654dfaddc9935".to_string(),
signature: "0x00128a39b715e87475666c3220fc0400bf34a84d24f77571d2b4e1e88b141d52305438156e526ff4fe96b7a13e707ab2f6f3ca00bd928dabc7f516b56cfe6fd61c".to_string(),
is_healthy: true,
exhausted: false,
}),
};

pretty_assertions::assert_eq!(expected_message, actual_message);
pretty_assertions::assert_eq!(
to_value(expected_message).expect("should serialize"),
approve_state_message
);
}
}
}

#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
Expand Down Expand Up @@ -119,7 +191,7 @@ pub struct LastApprovedResponse {
/// None -> withHeartbeat=true wasn't passed
/// Some(vec![]) (empty vec) or Some(heartbeats) - withHeartbeat=true was passed
#[serde(default, skip_serializing_if = "Option::is_none")]
pub heartbeats: Option<Vec<HeartbeatValidatorMessage>>,
pub heartbeats: Option<Vec<MessageResponse<Heartbeat>>>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -232,16 +304,16 @@ pub mod channel_list {

#[cfg(feature = "postgres")]
mod postgres {
use super::{
ApproveStateValidatorMessage, HeartbeatValidatorMessage, NewStateValidatorMessage,
ValidatorMessage,
use super::{MessageResponse, ValidatorMessage};
use crate::{
sentry::EventAggregate,
validator::{messages::Type as MessageType, MessageTypes},
};
use crate::sentry::EventAggregate;
use crate::validator::MessageTypes;
use bytes::BytesMut;
use postgres_types::{accepts, to_sql_checked, IsNull, Json, ToSql, Type};
use std::error::Error;
use tokio_postgres::Row;
use serde::Deserialize;
use std::convert::TryFrom;
use tokio_postgres::{Error, Row};

impl From<&Row> for EventAggregate {
fn from(row: &Row) -> Self {
Expand All @@ -263,33 +335,20 @@ mod postgres {
}
}

impl From<&Row> for ApproveStateValidatorMessage {
fn from(row: &Row) -> Self {
Self {
from: row.get("from"),
received: row.get("received"),
msg: row.get::<_, Json<MessageTypes>>("msg").0,
}
}
}

impl From<&Row> for NewStateValidatorMessage {
fn from(row: &Row) -> Self {
Self {
from: row.get("from"),
received: row.get("received"),
msg: row.get::<_, Json<MessageTypes>>("msg").0,
}
}
}
impl<T> TryFrom<&Row> for MessageResponse<T>
where
T: MessageType,
for<'de> T: Deserialize<'de>,
{
type Error = Error;

impl From<&Row> for HeartbeatValidatorMessage {
fn from(row: &Row) -> Self {
Self {
fn try_from(row: &Row) -> Result<Self, Self::Error> {
Ok(Self {
from: row.get("from"),
received: row.get("received"),
msg: row.get::<_, Json<MessageTypes>>("msg").0,
}
// guard against mistakes from wrong Queries
msg: row.try_get::<_, Json<_>>("msg")?.0,
})
}
}

Expand All @@ -298,7 +357,7 @@ mod postgres {
&self,
ty: &Type,
w: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
Json(self).to_sql(ty, w)
}

Expand Down
Loading