Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
derive_builder = "0.20.2"

# Async
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
futures-util = "0.3.5"
Expand Down
14 changes: 11 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{cdn::CdnKind, storage::StorageKind};
use anyhow::{Context, Result, anyhow, bail};
use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration};
use tracing::trace;
use url::Url;

Expand Down Expand Up @@ -209,10 +209,10 @@ impl Config {
.cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?))
.cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?)
.cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?)
.local_archive_cache_path(env(
.local_archive_cache_path(ensure_absolute_path(env(
"DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
prefix.join("archive_cache"),
)?)
)?)?)
.compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
.temp_dir(temp_dir)
.rustwide_workspace(env(
Expand All @@ -235,6 +235,14 @@ impl Config {
}
}

/// Makes sure the given path is absolute.
///
/// A path that is already absolute is handed back untouched. Any other path is
/// resolved with [`std::path::absolute`].
///
/// # Errors
///
/// Propagates the `io::Error` reported by [`std::path::absolute`].
fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
    if !path.is_absolute() {
        // `path::absolute` already yields `io::Result<PathBuf>`, so it can be returned as is.
        return path::absolute(&path);
    }

    Ok(path)
}

fn env<T>(var: &str, default: T) -> Result<T>
where
T: FromStr,
Expand Down
118 changes: 66 additions & 52 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
};
use anyhow::{anyhow, bail};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use fn_error_context::context;
use futures_util::{TryStreamExt as _, stream::BoxStream};
use mime::Mime;
Expand All @@ -30,24 +31,27 @@ use std::{
fmt,
fs::{self, File},
io::{self, BufReader},
iter,
num::ParseIntError,
ops::RangeInclusive,
path::{Path, PathBuf},
str::FromStr,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use std::{iter, str::FromStr};
use tokio::{
io::{AsyncRead, AsyncWriteExt},
runtime,
sync::RwLock,
};
use tracing::{error, info, info_span, instrument, trace, warn};
use tracing_futures::Instrument as _;
use walkdir::WalkDir;

const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];
const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";

type FileRange = RangeInclusive<u64>;

Expand Down Expand Up @@ -186,6 +190,8 @@ enum StorageBackend {
pub struct AsyncStorage {
// The `StorageBackend` variant this storage was built with.
backend: StorageBackend,
// Shared configuration; `local_archive_cache_path` is read from it when
// building the paths of locally cached archive index files.
config: Arc<Config>,
/// Locks to synchronize access to the locally cached archive index files.
///
/// Keyed by the path of the local index file; every entry is a shared
/// `RwLock<()>` that guards the file on disk rather than any in-memory data.
locks: DashMap<PathBuf, Arc<RwLock<()>>>,
}

impl AsyncStorage {
Expand All @@ -204,6 +210,7 @@ impl AsyncStorage {
}
},
config,
locks: DashMap::new(),
})
}

Expand Down Expand Up @@ -318,12 +325,10 @@ impl AsyncStorage {
path: &str,
) -> Result<bool> {
match self
.download_archive_index(archive_path, latest_build_id)
.find_in_archive_index(archive_path, latest_build_id, path)
.await
{
Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
.await?
.is_some()),
Ok(file_info) => Ok(file_info.is_some()),
Err(err) => {
if err.downcast_ref::<PathNotFoundError>().is_some() {
Ok(false)
Expand Down Expand Up @@ -384,41 +389,67 @@ impl AsyncStorage {
Ok(blob.decompress())
}

/// Returns the lock that guards the locally cached index file at `local_index_path`.
///
/// The path is used as the key into `self.locks`. If no entry exists for it yet, a
/// fresh `RwLock` is inserted first. The caller receives a clone of the stored `Arc`,
/// so every caller asking for the same path shares one lock.
fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
    let key = local_index_path.as_ref().to_path_buf();

    // Insert on first use, then turn the mutable map entry into a read-only one
    // before cloning the `Arc` out of it.
    let entry = self
        .locks
        .entry(key)
        .or_insert_with(|| Arc::new(RwLock::new(())))
        .downgrade();

    Arc::clone(&*entry)
}

// NOTE(review): this span comes from a rendered diff, so the removed
// `download_archive_index` lines and the added `find_in_archive_index` lines
// appear interleaved without +/- markers. The notes below describe the added lines.
//
// The added version looks up `path_in_archive` in the locally cached archive index
// and fetches that index from storage first when no cached copy exists yet.
#[instrument]
pub(super) async fn download_archive_index(
async fn find_in_archive_index(
&self,
archive_path: &str,
latest_build_id: Option<BuildId>,
) -> Result<PathBuf> {
// remote/folder/and/x.zip.index
let remote_index_path = format!("{archive_path}.index");
path_in_archive: &str,
) -> Result<Option<archive_index::FileInfo>> {
// We know that config.local_archive_cache_path is an absolute path, not a relative one,
// so the joined path is usable as a key in the DashMap of locks.
let local_index_path = self.config.local_archive_cache_path.join(format!(
"{archive_path}.{}.index",
"{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
latest_build_id.map(|id| id.0).unwrap_or(0)
));

if !local_index_path.exists() {
let index_content = self.get(&remote_index_path, usize::MAX).await?.content;
let rwlock = self.local_index_cache_lock(&local_index_path);

tokio::fs::create_dir_all(
local_index_path
.parent()
.ok_or_else(|| anyhow!("index path without parent"))?,
)
.await?;
// Acquire the read-lock right away, so the existence check below is already
// protected.
let mut _read_guard = rwlock.read().await;

if !tokio::fs::try_exists(&local_index_path).await? {
// Swap the read-lock for a write-lock for downloading & storing the index.
// The read-lock is released first, so there is a window without any lock.
drop(_read_guard);
let _write_guard = rwlock.write().await;

// When there is no locally cached index and many requests arrive in parallel,
// the same archive index might be downloaded multiple times here.
// So the content is stored in a temporary file before being renamed
// into its final location.
let temp_path = tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
.into_temp_path();
let mut file = tokio::fs::File::create(&temp_path).await?;
file.write_all(&index_content).await?;
tokio::fs::rename(temp_path, &local_index_path).await?;
// Check existence again: another task may have stored the file between dropping
// the read-lock and acquiring the write-lock (TOCTOU).
if !tokio::fs::try_exists(&local_index_path).await? {
// remote/folder/and/x.zip.index
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");

tokio::fs::create_dir_all(
local_index_path
.parent()
.ok_or_else(|| anyhow!("index path without parent"))?,
)
.await?;

// NOTE(review): the index is streamed straight into its final location. If the
// copy below fails, a partial file may be left behind and would pass the
// `try_exists` checks on a later call. Confirm whether a temp file + rename
// is still needed here.
{
let mut file = tokio::fs::File::create(&local_index_path).await?;
let mut stream = self.get_stream(&remote_index_path).await?.content;

tokio::io::copy(&mut stream, &mut file).await?;

file.flush().await?;
}
}

// Keep holding the lock in read mode while the index file is queried below.
_read_guard = _write_guard.downgrade();
}

Ok(local_index_path)
archive_index::find_in_file(local_index_path, path_in_archive).await
}

#[instrument]
Expand All @@ -429,11 +460,8 @@ impl AsyncStorage {
path: &str,
max_size: usize,
) -> Result<Blob> {
let index_filename = self
.download_archive_index(archive_path, latest_build_id)
.await?;

let info = archive_index::find_in_file(index_filename, path)
let info = self
.find_in_archive_index(archive_path, latest_build_id, path)
.await?
.ok_or(PathNotFoundError)?;

Expand Down Expand Up @@ -463,11 +491,8 @@ impl AsyncStorage {
latest_build_id: Option<BuildId>,
path: &str,
) -> Result<StreamingBlob> {
let index_filename = self
.download_archive_index(archive_path, latest_build_id)
.await?;

let info = archive_index::find_in_file(index_filename, path)
let info = self
.find_in_archive_index(archive_path, latest_build_id, path)
.await?
.ok_or(PathNotFoundError)?;

Expand Down Expand Up @@ -540,7 +565,7 @@ impl AsyncStorage {
.await?;

let alg = CompressionAlgorithm::default();
let remote_index_path = format!("{}.index", &archive_path);
let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path);
let compressed_index_content = {
let _span = info_span!("create_archive_index", %remote_index_path).entered();

Expand Down Expand Up @@ -994,17 +1019,6 @@ impl Storage {
.block_on(self.inner.get_range(path, max_size, range, compression))
}

/// Blocking counterpart of the inner storage's `download_archive_index`.
///
/// Builds the download future for `archive_path` / `latest_build_id` and runs it to
/// completion on `self.runtime`, returning whatever the async call returns.
pub(super) fn download_index(
    &self,
    archive_path: &str,
    latest_build_id: Option<BuildId>,
) -> Result<PathBuf> {
    let download = self
        .inner
        .download_archive_index(archive_path, latest_build_id);

    self.runtime.block_on(download)
}

pub(crate) fn get_from_archive(
&self,
archive_path: &str,
Expand Down Expand Up @@ -1801,12 +1815,12 @@ mod backend_tests {
.inner
.config
.local_archive_cache_path
.join("folder/test.zip.0.index");
.join(format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}"));

let (stored_files, compression_alg) =
storage.store_all_in_archive("folder/test.zip", dir.path())?;

assert!(storage.exists("folder/test.zip.index")?);
assert!(storage.exists(&format!("folder/test.zip.{ARCHIVE_INDEX_FILE_EXTENSION}"))?);

assert_eq!(compression_alg, CompressionAlgorithm::Bzip2);
assert_eq!(stored_files.len(), files.len());
Expand Down
Loading