Skip to content

Commit e1e2f2a

Browse files
author
Devdutt Shenoi
authored
refactor: DRY object_storage (#1147)
New Features:
- Introduced a unified directory listing capability for managing stored content.
- Added new timeout settings and improved utilities for consistent path handling.

Bug Fixes:
- Enhanced the logging format for better readability of error messages.

Refactor:
- Removed legacy methods for fetching detailed user data to simplify functionality.
- Updated file upload procedures for better consistency and maintainability.
- Consolidated internal dependencies to enhance clarity and streamline operations.
- Enhanced the ObjectStorage trait with new asynchronous methods for better data management.
- Improved organization and clarity in the S3 storage implementation.
1 parent cf59e4d commit e1e2f2a

File tree

6 files changed

+236
-446
lines changed

6 files changed

+236
-446
lines changed

src/parseable/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -761,16 +761,14 @@ impl Parseable {
761761
.await
762762
{
763763
error!(
764-
"Failed to update first_event_at in storage for stream {:?}: {err:?}",
765-
stream_name
764+
"Failed to update first_event_at in storage for stream {stream_name:?}: {err:?}"
766765
);
767766
}
768767

769768
match self.get_stream(stream_name) {
770769
Ok(stream) => stream.set_first_event_at(first_event_at),
771770
Err(err) => error!(
772-
"Failed to update first_event_at in stream info for stream {:?}: {err:?}",
773-
stream_name
771+
"Failed to update first_event_at in stream info for stream {stream_name:?}: {err:?}"
774772
),
775773
}
776774

src/storage/azure_blob.rs

Lines changed: 42 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,46 @@
1515
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*
1717
*/
18-
use super::object_storage::parseable_json_path;
19-
use super::{
20-
ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
21-
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
18+
19+
use std::{
20+
collections::{BTreeMap, HashSet},
21+
path::Path,
22+
sync::Arc,
23+
time::{Duration, Instant},
2224
};
25+
2326
use async_trait::async_trait;
2427
use bytes::Bytes;
25-
use datafusion::datasource::listing::ListingTableUrl;
26-
use datafusion::datasource::object_store::{
27-
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
28+
use datafusion::{
29+
datasource::listing::ListingTableUrl,
30+
execution::{
31+
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
32+
runtime_env::RuntimeEnvBuilder,
33+
},
34+
};
35+
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
36+
use object_store::{
37+
azure::{MicrosoftAzure, MicrosoftAzureBuilder},
38+
limit::LimitStore,
39+
path::Path as StorePath,
40+
BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
2841
};
29-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
30-
use futures::stream::FuturesUnordered;
31-
use futures::{StreamExt, TryStreamExt};
32-
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
33-
use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
3442
use relative_path::{RelativePath, RelativePathBuf};
35-
use std::path::Path as StdPath;
3643
use tracing::{error, info};
3744
use url::Url;
3845

39-
use super::metrics_layer::MetricLayer;
40-
use crate::handlers::http::users::USERS_ROOT_DIR;
41-
use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
42-
use crate::metrics::storage::StorageMetrics;
43-
use crate::parseable::LogStream;
44-
use object_store::limit::LimitStore;
45-
use object_store::path::Path as StorePath;
46-
use std::collections::{BTreeMap, HashMap, HashSet};
47-
use std::sync::Arc;
48-
use std::time::{Duration, Instant};
46+
use crate::{
47+
handlers::http::users::USERS_ROOT_DIR,
48+
metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
49+
parseable::LogStream,
50+
};
4951

50-
const CONNECT_TIMEOUT_SECS: u64 = 5;
51-
const REQUEST_TIMEOUT_SECS: u64 = 300;
52+
use super::{
53+
metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
54+
ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
55+
PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
56+
STREAM_ROOT_DIRECTORY,
57+
};
5258

5359
#[derive(Debug, Clone, clap::Args)]
5460
#[command(
@@ -161,7 +167,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
161167
let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
162168
let azure = MetricLayer::new(azure);
163169

164-
let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
170+
let object_store_registry = DefaultObjectStoreRegistry::new();
165171
let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account))
166172
.unwrap();
167173
object_store_registry.register_store(url.as_ref(), Arc::new(azure));
@@ -190,10 +196,6 @@ impl ObjectStorageProvider for AzureBlobConfig {
190196
}
191197
}
192198

193-
pub fn to_object_store_path(path: &RelativePath) -> StorePath {
194-
StorePath::from(path.as_str())
195-
}
196-
197199
// ObjStoreClient is a generic client to enable interactions with different cloud providers'
198200
// object stores, such as S3 and Azure Blob
199201
#[derive(Debug)]
@@ -347,7 +349,7 @@ impl BlobStore {
347349
}
348350
Ok(result_file_list)
349351
}
350-
async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
352+
async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
351353
let instant = Instant::now();
352354

353355
// // TODO: Uncomment this when multipart is fixed
@@ -376,7 +378,7 @@ impl BlobStore {
376378
}
377379

378380
// TODO: introduce parallel, multipart-uploads if required
379-
// async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
381+
// async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
380382
// let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
381383
// let mut file = OpenOptions::new().read(true).open(path).await?;
382384

@@ -623,7 +625,7 @@ impl ObjectStorage for BlobStore {
623625
Ok(files)
624626
}
625627

626-
async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
628+
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
627629
self._upload_file(key, path).await?;
628630

629631
Ok(())
@@ -663,126 +665,21 @@ impl ObjectStorage for BlobStore {
663665
.collect::<Vec<_>>())
664666
}
665667

666-
async fn get_all_dashboards(
667-
&self,
668-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
669-
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
670-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
671-
let resp = self
672-
.client
673-
.list_with_delimiter(Some(&users_root_path))
674-
.await?;
675-
676-
let users = resp
677-
.common_prefixes
678-
.iter()
679-
.flat_map(|path| path.parts())
680-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
681-
.map(|name| name.as_ref().to_string())
682-
.collect::<Vec<_>>();
683-
for user in users {
684-
let user_dashboard_path =
685-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
686-
let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
687-
let dashboard_bytes = self
688-
.get_objects(
689-
Some(&dashboards_path),
690-
Box::new(|file_name| file_name.ends_with(".json")),
691-
)
692-
.await?;
693-
694-
dashboards
695-
.entry(dashboards_path)
696-
.or_default()
697-
.extend(dashboard_bytes);
698-
}
699-
Ok(dashboards)
700-
}
701-
702-
async fn get_all_saved_filters(
668+
async fn list_dirs_relative(
703669
&self,
704-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
705-
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
706-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
707-
let resp = self
708-
.client
709-
.list_with_delimiter(Some(&users_root_path))
710-
.await?;
670+
relative_path: &RelativePath,
671+
) -> Result<Vec<String>, ObjectStorageError> {
672+
let prefix = object_store::path::Path::from(relative_path.as_str());
673+
let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
711674

712-
let users = resp
675+
Ok(resp
713676
.common_prefixes
714677
.iter()
715678
.flat_map(|path| path.parts())
716-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
717679
.map(|name| name.as_ref().to_string())
718-
.collect::<Vec<_>>();
719-
for user in users {
720-
let user_filters_path =
721-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
722-
let resp = self
723-
.client
724-
.list_with_delimiter(Some(&user_filters_path))
725-
.await?;
726-
let streams = resp
727-
.common_prefixes
728-
.iter()
729-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
730-
.map(|name| name.as_ref().to_string())
731-
.collect::<Vec<_>>();
732-
for stream in streams {
733-
let filters_path = RelativePathBuf::from(&stream);
734-
let filter_bytes = self
735-
.get_objects(
736-
Some(&filters_path),
737-
Box::new(|file_name| file_name.ends_with(".json")),
738-
)
739-
.await?;
740-
filters
741-
.entry(filters_path)
742-
.or_default()
743-
.extend(filter_bytes);
744-
}
745-
}
746-
Ok(filters)
680+
.collect::<Vec<_>>())
747681
}
748682

749-
///fetch all correlations uploaded in object store
750-
/// return the correlation file path and all correlation json bytes for each file path
751-
async fn get_all_correlations(
752-
&self,
753-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
754-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
755-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
756-
let resp = self
757-
.client
758-
.list_with_delimiter(Some(&users_root_path))
759-
.await?;
760-
761-
let users = resp
762-
.common_prefixes
763-
.iter()
764-
.flat_map(|path| path.parts())
765-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
766-
.map(|name| name.as_ref().to_string())
767-
.collect::<Vec<_>>();
768-
for user in users {
769-
let user_correlation_path =
770-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations"));
771-
let correlations_path = RelativePathBuf::from(&user_correlation_path);
772-
let correlation_bytes = self
773-
.get_objects(
774-
Some(&correlations_path),
775-
Box::new(|file_name| file_name.ends_with(".json")),
776-
)
777-
.await?;
778-
779-
correlations
780-
.entry(correlations_path)
781-
.or_default()
782-
.extend(correlation_bytes);
783-
}
784-
Ok(correlations)
785-
}
786683
fn get_bucket_name(&self) -> String {
787684
self.container.clone()
788685
}

0 commit comments

Comments
 (0)