Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
c4a450e
wip: broken
Eshanatnight Feb 5, 2024
392d704
ran cargo fmt
Eshanatnight Feb 5, 2024
daa701b
made stuff public on crate level
Eshanatnight Feb 5, 2024
d5977cb
Staring with a Somewhat clean slate
Eshanatnight Feb 5, 2024
f5d859c
create server.rs + impl ParseableServer Trait
Eshanatnight Feb 5, 2024
15b1909
create ssl_acceptor.rs
Eshanatnight Feb 5, 2024
b32c539
General Implementation
Eshanatnight Feb 5, 2024
d601538
chore: move cli handler to a seperate file
Eshanatnight Feb 5, 2024
0f2ec9b
chore: changes to function and cli struct name, to avoid collision
Eshanatnight Feb 5, 2024
eb6d427
fix imports
Eshanatnight Feb 5, 2024
d8e28d1
chore: add LICENSE
Eshanatnight Feb 5, 2024
e40d2d4
chore: move the constants storage.rs
Eshanatnight Feb 5, 2024
0f9c680
chore: replaced sentinals with the constants
Eshanatnight Feb 5, 2024
9c96195
fix: forgot default
Eshanatnight Feb 5, 2024
abd702a
fix: make name more discriptive
Eshanatnight Feb 5, 2024
d7ca187
fix: from_iter not from
Eshanatnight Feb 5, 2024
0a50e84
using the modal servers
Eshanatnight Feb 5, 2024
e03e639
update struct name
Eshanatnight Feb 5, 2024
a2ab7c1
move server module from modal to http
Eshanatnight Feb 5, 2024
a56268c
update imports
Eshanatnight Feb 5, 2024
fc84446
fix changed mode to storage_mode for better clarity
Eshanatnight Feb 5, 2024
7852a52
revert back to module name modal
Eshanatnight Feb 5, 2024
c2dbb66
fix: remove call to ::default
Eshanatnight Feb 5, 2024
d56ddf3
run cargofmt
Eshanatnight Feb 5, 2024
cf19487
add constant for default version
Eshanatnight Feb 5, 2024
2047c9a
add new trait function get_bucket_name
Eshanatnight Feb 5, 2024
773a4bc
chore: move variables where it makes more sence
Eshanatnight Feb 5, 2024
3495ce1
add struct IngestorMetadata
Eshanatnight Feb 5, 2024
1079d71
add function to get ingestor address
Eshanatnight Feb 5, 2024
43698dc
WIP: ingestor config put_object
Eshanatnight Feb 5, 2024
5bced8e
ran cargo fmt
Eshanatnight Feb 6, 2024
dcb1f01
add some comments
Eshanatnight Feb 6, 2024
00fb597
move ingestor metadata struct
Eshanatnight Feb 6, 2024
0d7b7fa
clean up
Eshanatnight Feb 6, 2024
177cac1
fix: mod.rs mistake
Eshanatnight Feb 6, 2024
3dfc0c3
chore: rename function to something more discripive
Eshanatnight Feb 6, 2024
bbfbb7b
commiting for sanity
Eshanatnight Feb 6, 2024
8712cbb
misc
Eshanatnight Feb 7, 2024
afb8c7c
add root param to s3 struct
Eshanatnight Feb 7, 2024
dff79d7
does my life have any meaning?
Eshanatnight Feb 7, 2024
b08804b
remove upload interval from cli (rebase on main #fb3fd21)
Eshanatnight Feb 7, 2024
b93e63e
misc comment
Eshanatnight Feb 7, 2024
f6755ae
make deepsource happy
Eshanatnight Feb 8, 2024
f47b4fb
move data sync logic to a different file
Eshanatnight Feb 8, 2024
f6e351e
misc
Eshanatnight Feb 8, 2024
cb8ab46
add comments
Eshanatnight Feb 8, 2024
b5c1bd2
temp
Eshanatnight Feb 9, 2024
2073cb8
change origin to domain name in IngesterMetadata struct
Eshanatnight Feb 12, 2024
7901801
init trait method added
Eshanatnight Feb 12, 2024
3b5a8b0
misc
Eshanatnight Feb 12, 2024
0d650e9
rename function
Eshanatnight Feb 13, 2024
cf9d769
update trait ParseableServer
Eshanatnight Feb 13, 2024
851d070
update to IngesterMetadata Struct
Eshanatnight Feb 13, 2024
74dd2e4
make the routed inline
Eshanatnight Feb 13, 2024
03f1632
add custom metrics api points
Eshanatnight Feb 13, 2024
0cbf1e6
fix: User Api on Default Mode
Eshanatnight Feb 13, 2024
bde8bdd
misc: add comment
Eshanatnight Feb 13, 2024
e6e9965
fix: Metadata value mode
Eshanatnight Feb 13, 2024
7549449
remove redundant else statement
Eshanatnight Feb 13, 2024
94a45c6
fix: Check for Query Server Deployment
Eshanatnight Feb 14, 2024
65cee8b
feat: impl to_str for Mode Enum
Eshanatnight Feb 14, 2024
0a228c1
fix: Banner Changes
Eshanatnight Feb 14, 2024
8964cf1
chore: add unit test
Eshanatnight Feb 14, 2024
329c727
fix: ingester file name change
Eshanatnight Feb 14, 2024
a66f7b1
chore: refactor get_ingestor_info function
Eshanatnight Feb 14, 2024
fc88c69
fix: banner server status
Eshanatnight Feb 14, 2024
b4d6d23
fix: server behaviour when new ingest server is set up
Eshanatnight Feb 14, 2024
04788ce
fix: Ingest Server Not Ingesting when attatched to an old data store
Eshanatnight Feb 15, 2024
691dc7d
remove INGESTOR_FILE_EXTENSION constant
Eshanatnight Feb 15, 2024
ab17f5f
chore: remove commented out code
Eshanatnight Feb 15, 2024
0449fcc
Trait update
Eshanatnight Feb 15, 2024
676c976
rm rwlock
Eshanatnight Feb 15, 2024
a9676b4
chore: clean up
Eshanatnight Feb 15, 2024
a333c21
chore: clean up
Eshanatnight Feb 15, 2024
bb7a78a
debug out the written buff size
Eshanatnight Feb 15, 2024
08ab3a9
add sync tokio task to sync ingestor info
Eshanatnight Feb 15, 2024
4d4cb30
fix: bug where query.json was not being created properly
Eshanatnight Feb 16, 2024
093fbd0
update: change parquet file names to include port
Eshanatnight Feb 16, 2024
311ae8b
create a util func to get the sock_addr
Eshanatnight Feb 21, 2024
26a1170
manifest file name change
Eshanatnight Feb 21, 2024
755994a
debug: for working
Eshanatnight Feb 23, 2024
7f73462
fix: manifest not being created in multi mode
Eshanatnight Feb 23, 2024
72186b4
chore: remove debug macros
Eshanatnight Feb 23, 2024
a802a71
clean up
Eshanatnight Feb 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/src/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ pub fn print_about(
eprint!(
"
{}
Version: \"v{}\"",
Version:\t\t\t\t\t\"v{}\"",
"About:".to_string().bold(),
current_version,
);
); // " " " "

if let Some(latest_release) = latest_release {
if latest_release.version > current_version {
Expand All @@ -103,8 +103,8 @@ pub fn print_about(

eprintln!(
"
Commit: \"{commit_hash}\"
Docs: \"https://logg.ing/docs\""
Commit:\t\t\t\t\t\t\"{commit_hash}\"
Docs:\t\t\t\t\t\t\"https://logg.ing/docs\""
);
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Report {
cpu_count,
memory_total_bytes: mem_total,
platform: platform().to_string(),
mode: CONFIG.mode_string().to_string(),
mode: CONFIG.get_storage_mode_string().to_string(),
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
metrics: build_metrics(),
Expand Down
32 changes: 17 additions & 15 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ pub async fn print(config: &Config, meta: &StorageMetadata) {

fn print_ascii_art() {
let ascii_name = r#"
`7MM"""Mq. *MM `7MM
MM `MM. MM MM
MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya
MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb
MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M""""""
MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. ,
.JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd'
`7MM"""Mq. *MM `7MM
MM `MM. MM MM
MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya
MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb
MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M""""""
MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. ,
.JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd'
"#;

eprint!("{ascii_name}");
Expand Down Expand Up @@ -77,12 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
eprintln!(
"
{}
Address: {}
Credentials: {}
LLM Status: \"{}\"",
Address:\t\t\t\t\t{}
Credentials:\t\t\t\t\t{}
Server Mode:\t\t\t\t\t\"{}\"
LLM Status:\t\t\t\t\t\"{}\"",
"Server:".to_string().bold(),
address,
credentials,
config.parseable.mode.to_str(),
llm_status
);
}
Expand All @@ -99,10 +101,10 @@ async fn storage_info(config: &Config) {
eprintln!(
"
{}
Mode: \"{}\"
Staging: \"{}\"",
Storage Mode:\t\t\t\t\t\"{}\"
Staging Path:\t\t\t\t\t\"{}\"",
"Storage:".to_string().bold(),
config.mode_string(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
);

Expand All @@ -114,7 +116,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Cache: \"{}\", (size: {})",
{:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
"",
path.display(),
size
Expand All @@ -123,7 +125,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Store: \"{}\", (latency: {:?})",
{:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
Expand Down
72 changes: 61 additions & 11 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use relative_path::RelativePathBuf;
use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
utils::get_address,
};

use self::{column::Column, snapshot::ManifestItem};
Expand Down Expand Up @@ -105,20 +106,67 @@ pub async fn update_snapshot(
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});

// if the mode in I.S. manifest needs to be created but it is not getting created because
// there is already a pos, to index into stream.json

// We update the manifest referenced by this position
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;

let mut ch = false;
for m in manifests.iter() {
let s = get_address();
let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
if m.manifest_path.contains(&p) {
ch = true;
}
}
if ch {
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.unwrap(),
)
.and_utc();

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta).await?;
}
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
Expand All @@ -137,7 +185,9 @@ pub async fn update_snapshot(
..Manifest::default()
};

let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
Expand Down
Loading