Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ humantime = "2.1.0"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
171 changes: 171 additions & 0 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::sync::Arc;

use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;

use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
};

use self::{column::Column, snapshot::ManifestItem};

pub mod column;
pub mod manifest;
pub mod snapshot;

pub use manifest::create_from_parquet_file;

pub trait Snapshot {
fn manifests(&self, time_predicates: Vec<PartialTimeFilter>) -> Vec<ManifestItem>;
}

pub trait ManifestFile {
fn file_name(&self) -> &str;
fn ingestion_size(&self) -> u64;
fn file_size(&self) -> u64;
fn num_rows(&self) -> u64;
fn columns(&self) -> &[Column];
}

impl ManifestFile for manifest::File {
fn file_name(&self) -> &str {
&self.file_path
}

fn ingestion_size(&self) -> u64 {
self.ingestion_size
}

fn file_size(&self) -> u64 {
self.file_size
}

fn num_rows(&self) -> u64 {
self.num_rows
}

fn columns(&self) -> &[Column] {
self.columns.as_slice()
}
}

pub async fn update_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
change: manifest::File,
) -> Result<(), ObjectStorageError> {
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
}

// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

let (lower_bound, _) = get_file_bounds(&change);
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});

// We update the manifest referenced by this position
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.unwrap(),
)
.and_utc();

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};

let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta).await?;
}

Ok(())
}

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
fn partition_path(
stream: &str,
lower_bound: DateTime<Utc>,
upper_bound: DateTime<Utc>,
) -> RelativePathBuf {
let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
if lower == upper {
RelativePathBuf::from_iter([stream, &format!("date={}", lower)])
} else {
RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)])
}
}
146 changes: 146 additions & 0 deletions server/src/catalog/column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::cmp::{max, min};

use parquet::file::statistics::Statistics;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BoolType {
pub min: bool,
pub max: bool,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Float64Type {
pub min: f64,
pub max: f64,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Int64Type {
pub min: i64,
pub max: i64,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Utf8Type {
pub min: String,
pub max: String,
}

// Typed statistics are typed variant of statistics
// Currently all parquet types are casted down to these 4 types
// Binary types are assumed to be of valid Utf8
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum TypedStatistics {
Bool(BoolType),
Int(Int64Type),
Float(Float64Type),
String(Utf8Type),
}

impl TypedStatistics {
pub fn update(self, other: Self) -> Self {
match (self, other) {
(TypedStatistics::Bool(this), TypedStatistics::Bool(other)) => {
TypedStatistics::Bool(BoolType {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
}
(TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
TypedStatistics::Float(Float64Type {
min: this.min.min(other.min),
max: this.max.max(other.max),
})
}
(TypedStatistics::Int(this), TypedStatistics::Int(other)) => {
TypedStatistics::Int(Int64Type {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
}
(TypedStatistics::String(this), TypedStatistics::String(other)) => {
TypedStatistics::String(Utf8Type {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
}
_ => panic!("Cannot update wrong types"),
}
}
}

/// Column statistics are used to track statistics for a column in a given file.
/// This is similar to and derived from parquet statistics.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Column {
pub name: String,
pub stats: Option<TypedStatistics>,
pub uncompressed_size: u64,
pub compressed_size: u64,
}

impl TryFrom<&Statistics> for TypedStatistics {
type Error = parquet::errors::ParquetError;
fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
if !value.has_min_max_set() {
return Err(parquet::errors::ParquetError::General(
"min max is not set".to_string(),
));
}

let res = match value {
Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
min: *stats.min(),
max: *stats.max(),
}),
Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min() as i64,
max: *stats.max() as i64,
}),
Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min(),
max: *stats.max(),
}),
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
min: stats.min().to_i64(),
max: stats.max().to_i64(),
}),
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min() as f64,
max: *stats.max() as f64,
}),
Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min(),
max: *stats.max(),
}),
Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
}),
Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
}),
};

Ok(res)
}
}
Loading