Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ pub async fn update_snapshot(
let mut ch = false;
for m in manifests.iter() {
let s = get_address();
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
let p = format!(
"{}.{}.{}",
s.domain().unwrap(),
s.port().unwrap_or_default(),
MANIFEST_FILE
);
if m.manifest_path.contains(&p) {
ch = true;
}
Expand Down Expand Up @@ -152,7 +157,12 @@ pub async fn update_snapshot(
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let mainfest_file_name = format!(
"{}.{}.{}",
addr.domain().unwrap(),
addr.port().unwrap_or_default(),
MANIFEST_FILE
);
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
Expand Down Expand Up @@ -186,7 +196,12 @@ pub async fn update_snapshot(
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let mainfest_file_name = format!(
"{}.{}.{}",
addr.domain().unwrap(),
addr.port().unwrap(),
MANIFEST_FILE
);
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
Expand Down
14 changes: 7 additions & 7 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub struct Cli {
pub mode: Mode,

/// public address for the parseable server ingestor
pub ingestor_url: String,
pub ingestor_endpoint: String,
}

impl Cli {
Expand All @@ -115,7 +115,7 @@ impl Cli {
pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
pub const MODE: &'static str = "mode";
pub const INGESTOR_URL: &'static str = "ingestor-url";
pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
pub const DEFAULT_USERNAME: &'static str = "admin";
pub const DEFAULT_PASSWORD: &'static str = "admin";

Expand Down Expand Up @@ -317,9 +317,9 @@ impl Cli {
.help("Mode of operation"),
)
.arg(
Arg::new(Self::INGESTOR_URL)
.long(Self::INGESTOR_URL)
.env("P_INGESTOR_URL")
Arg::new(Self::INGESTOR_ENDPOINT)
.long(Self::INGESTOR_ENDPOINT)
.env("P_INGESTOR_ENDPOINT")
.value_name("URL")
.required(false)
.help("URL to connect to this specific ingestor. Default is the address of the server.")
Expand Down Expand Up @@ -367,8 +367,8 @@ impl FromArgMatches for Cli {
.cloned()
.expect("default value for address");

self.ingestor_url = m
.get_one::<String>(Self::INGESTOR_URL)
self.ingestor_endpoint = m
.get_one::<String>(Self::INGESTOR_ENDPOINT)
.cloned()
.unwrap_or_else(String::default);

Expand Down
15 changes: 12 additions & 3 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ impl IngestServer {
let store = CONFIG.storage().get_object_store();

let sock = get_address();
let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string());
let path = ingestor_metadata_path(
sock.domain().unwrap().to_string(),
sock.port().unwrap_or_default().to_string(),
);

if store.get_object(&path).await.is_ok() {
println!("ingestor metadata already exists");
Expand All @@ -191,13 +194,19 @@ impl IngestServer {

let scheme = CONFIG.parseable.get_scheme();
let resource = IngestorMetadata::new(
sock.port().to_string(),
sock.port().unwrap_or_default().to_string(),
CONFIG
.parseable
.domain_address
.clone()
.unwrap_or_else(|| {
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
Url::parse(&format!(
"{}://{}:{}",
scheme,
sock.domain().unwrap(),
sock.port().unwrap_or_default()
))
.unwrap()
})
.to_string(),
DEFAULT_VERSION.to_string(),
Expand Down
6 changes: 5 additions & 1 deletion server/src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ struct StorageMetrics {
impl Default for Metrics {
fn default() -> Self {
let socket = get_address();
let address = format!("http://{}:{}", socket.ip(), socket.port());
let address = format!(
"http://{}:{}",
socket.domain().unwrap(),
socket.port().unwrap_or_default()
);
Metrics {
address,
parseable_events_ingested: 0.0,
Expand Down
15 changes: 10 additions & 5 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
let addr = get_address();
let file_name = format!(
".ingestor.{}.{}{}",
addr.ip(),
addr.port(),
addr.domain().unwrap(),
addr.port().unwrap_or_default(),
SCHEMA_FILE_NAME
);

Expand All @@ -561,8 +561,8 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
let addr = get_address();
let file_name = format!(
".ingestor.{}.{}{}",
addr.ip(),
addr.port(),
addr.domain().unwrap(),
addr.port().unwrap_or_default(),
STREAM_METADATA_FILE_NAME
);
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
Expand All @@ -589,7 +589,12 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf {
#[inline(always)]
fn manifest_path(prefix: &str) -> RelativePathBuf {
let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let mainfest_file_name = format!(
"{}.{}.{}",
addr.domain().unwrap(),
addr.port().unwrap_or_default(),
MANIFEST_FILE
);
RelativePathBuf::from_iter([prefix, &mainfest_file_name])
}

Expand Down
4 changes: 2 additions & 2 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ impl StorageDir {
+ &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap();
let local_uri = str::replace(&uri, "/", ".");
let sock = get_address();
let ip = sock.ip();
let port = sock.port();
let ip = sock.domain().unwrap();
let port = sock.port().unwrap_or_default();
format!("{local_uri}{ip}.{port}.{extention}")
}

Expand Down
61 changes: 37 additions & 24 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub mod update;
use crate::option::CONFIG;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use std::env;
#[allow(unused_imports)]
use std::net::SocketAddr;
use url::Url;

#[allow(dead_code)]
pub fn hostname() -> Option<String> {
Expand Down Expand Up @@ -224,32 +226,43 @@ impl TimePeriod {
}
}

#[inline(always)]
pub fn get_address() -> SocketAddr {
if CONFIG.parseable.ingestor_url.is_empty() {
CONFIG.parseable.address.parse::<SocketAddr>().unwrap()
} else {
let addr_from_env = CONFIG
.parseable
.ingestor_url
.split(':')
.collect::<Vec<&str>>();

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = get_from_env(&var_hostname);
}
if port.starts_with('$') {
let var_port = port[1..].to_string();
port = get_from_env(&var_port);
}
format!("{}:{}", hostname, port)
.parse::<SocketAddr>()
.unwrap()
pub fn get_address() -> Url {
if CONFIG.parseable.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
CONFIG.parseable.get_scheme(),
CONFIG.parseable.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap();
}
let addr_from_env = CONFIG
.parseable
.ingestor_endpoint
.split(':')
.collect::<Vec<&str>>();

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = get_from_env(&var_hostname);
}
if !hostname.starts_with("http") {
hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname);
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = get_from_env(&var_port);
}
format!("{}:{}", hostname, port).parse::<Url>().unwrap()
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls add a detailed comment here on what are we trying to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

/// util fuction to fetch value from an env var
fn get_from_env(var_to_fetch: &str) -> String {
env::var(var_to_fetch).unwrap_or_else(|_| "".to_string())
}
Expand Down