46 changes: 29 additions & 17 deletions baseten-performance-client/scripts/continous_latency.py
@@ -1,43 +1,54 @@
"""leightweight benchmarking script for user-style continuous latency testing"""
"""lightweight benchmarking script for user-style continuous latency testing"""

import asyncio
import time
import random

from baseten_performance_client import PerformanceClient

client = PerformanceClient(
base_url="https://model-yqv4yjjq.api.baseten.co/environments/production/sync"
base_url="https://model-7wl2op73.api.baseten.co/environments/production/sync"
)


async def benchmark_every(
-    interval=0.01,
-    lb_split=128,
-    tokens_per_sentence=500,
-    sentences_per_request=1,
-    n_requests=1000,
+    interval=0.50,  # 1/0.5 * 60 = 120 requests per minute
+    tokens_per_sentence=[500, 1000],
+    max_concurrent_requests_per_user=100,
+    sentences_per_request=100,
+    n_requests=10000,
+    n_users=1,
+    lb_split=256,
):
async def kick_off_task():
"""kicks of a single task to measure latency."""
try:
t = time.time()
-            await client.async_embed(
-                input=["Hello " * tokens_per_sentence] * sentences_per_request,
-                max_concurrent_requests=lb_split,
-                batch_size=1,
-                model="model",
+            result = await client.async_classify(
+                inputs=[
+                    "Hello "  # "Hello " is one token if concatenated with more text
+                    * random.randint(tokens_per_sentence[0], tokens_per_sentence[1])
+                ]
+                * sentences_per_request,
+                max_concurrent_requests=max_concurrent_requests_per_user,
+                batch_size=16,
+                # split into smaller chunks to pack large requests more sparsely
+                max_chars_per_request=5000,
+                # hedge_delay=0.5,
)
-            # TODO: use total time or
-            return [(time.time() - t)]
+            total_time = result.total_time
+            individual_times = result.individual_request_times
+            print(
+                f"total time {total_time}, min {min(individual_times)} max {max(individual_times)} avg {sum(individual_times) / len(individual_times)}"
+            )
+            return [total_time]
except Exception as e:
print(f"Error in task: {e}")
-            return [(time.time() - t)]
+            return [float("inf")]

async def simulate_single_user(launches_blocking=False):
"""user may launch tasks with concurrency=1 (blocking=True)
or without any feedback at x inferval. (blocking=False)"""
all_tasks = []
+        await asyncio.sleep(random.uniform(0, 1))  # Stagger start times slightly
for _ in range(n_requests):
task = asyncio.create_task(kick_off_task())
if launches_blocking:
@@ -58,6 +69,7 @@ async def simulate_single_user(launches_blocking=False):

return all_times

+    await kick_off_task()  # Warm-up call to avoid initial latency spikes
user_times = await asyncio.gather(
*[simulate_single_user(launches_blocking=False) for _ in range(n_users)]
)
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "truss"
version = "0.11.11"
version = "0.11.12"
description = "A seamless bridge from model development to model delivery"
authors = [
{ name = "Pankaj Gupta", email = "[email protected]" },
@@ -43,7 +43,7 @@ dependencies = [
"ruff>=0.4.8",
"tenacity>=8.0.1",
"watchfiles>=0.19.0,<0.20",
"truss-transfer>=0.0.32,<0.0.36",
"truss-transfer>=0.0.32,<0.0.37",
]

[project.urls]
2 changes: 1 addition & 1 deletion truss-transfer/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion truss-transfer/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "truss_transfer"
version = "0.0.34-rc0"
version = "0.0.36"
edition = "2021"

[lib]
29 changes: 22 additions & 7 deletions truss-transfer/src/cache.rs
@@ -340,13 +340,28 @@ pub async fn handle_write_b10cache(download_path: &Path, cache_path: &Path) -> R
"Atomic rename: renaming incomplete cache file {:?} to final cache file {:?}",
incomplete_cache_path, cache_path
);
-    fs::rename(&incomplete_cache_path, cache_path)
-        .await
-        .with_context(|| {
-            format!(
-                "Failed to atomically rename incomplete cache file {incomplete_cache_path:?} to final cache file {cache_path:?}"
-            )
-        })?;
+    // Try atomic rename first, fall back to copy+delete if rename fails
+    match fs::rename(&incomplete_cache_path, cache_path).await {
+        Ok(()) => {
+            // Atomic rename succeeded
+        }
+        Err(rename_err) => {
+            // Rename failed, try copy + delete as fallback
+            if let Err(copy_err) = fs::copy(&incomplete_cache_path, cache_path).await {
+                return Err(anyhow::Error::new(copy_err).context(format!(
+                    "Failed to rename {incomplete_cache_path:?} to {cache_path:?} (rename error: {rename_err}) and copy fallback also failed"
+                )));
+            }
+
+            // Copy succeeded, now remove the incomplete file
+            if let Err(remove_err) = fs::remove_file(&incomplete_cache_path).await {
+                // Log a warning but don't fail - the cache file was successfully created
+                warn!(
+                    "Failed to remove incomplete cache file {incomplete_cache_path:?} after copy: {remove_err}"
+                );
+            }
+        }
+    }

// Delete the local file as its copy is now in the cache.
info!("Deleting local file at {:?}", download_path);
15 changes: 13 additions & 2 deletions truss-transfer/src/cloud_range_download.rs
@@ -33,8 +33,8 @@ const RANGE_DOWNLOAD_CHUNK_MB: u64 = 8 * 1024 * 1024;
pub fn should_use_cloud_range_download(_file_size: u64) -> bool {
use crate::constants::TRUSS_TRANSFER_USE_RANGE_DOWNLOAD;

-    // Only use range downloads for files larger than threshold and when enabled
-    *TRUSS_TRANSFER_USE_RANGE_DOWNLOAD // && file_size > RANGE_DOWNLOAD_CHUNK_MB
+    // Only use range downloads
+    *TRUSS_TRANSFER_USE_RANGE_DOWNLOAD
}

/// High-concurrency range download using positioned writes for large files
@@ -55,6 +55,17 @@ pub async fn download_cloud_range_streaming(
chunk_size / (1024 * 1024),
max_concurrency
);
+    if file_size == 0 {
+        // Create an empty file, as some object stores do not support 0-byte range requests
+        OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(local_path)
+            .await
+            .context("Failed to create empty file")?;
+        return Ok(());
+    }

let semaphore = Arc::new(Semaphore::new(max_concurrency));
// todo: parallel failures could be improved
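
The zero-byte guard exists because a ranged GET like `Range: bytes=0-0` has no valid byte to serve on an empty object, and some object stores reject it outright; creating the file locally and returning early sidesteps the request entirely. Restated as a self-contained helper with a quick check (the names are illustrative, not this crate's API):

```rust
use anyhow::{Context, Result};
use std::path::Path;
use tokio::fs::OpenOptions;

/// Early-return guard for 0-byte objects: create the empty file locally
/// instead of issuing a range request that some object stores would reject.
async fn ensure_empty_file(local_path: &Path) -> Result<()> {
    OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(local_path)
        .await
        .context("Failed to create empty file")?;
    Ok(())
}

#[tokio::test]
async fn zero_byte_guard_creates_empty_file() {
    // Hypothetical test: the guard should produce a 0-length file on disk.
    let path = std::env::temp_dir().join("truss_transfer_empty_guard.bin");
    ensure_empty_file(&path).await.unwrap();
    assert_eq!(tokio::fs::metadata(&path).await.unwrap().len(), 0);
}
```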
2 changes: 0 additions & 2 deletions truss-transfer/src/constants.rs
@@ -15,7 +15,6 @@ pub static LAZY_DATA_RESOLVER_PATHS: &[&str] = &[
];

/// Cache directory for b10fs
-
pub static CACHE_DIR: Lazy<String> = Lazy::new(|| {
env::var("TRUSS_TRANSFER_CACHE_DIR")
.unwrap_or_else(|_| "/cache/org/artifacts/truss_transfer_managed_v1".to_string())
@@ -50,7 +49,6 @@ pub static TRUSS_TRANSFER_NUM_WORKERS: Lazy<u8> = Lazy::new(|| {
});

/// Environment variable for download directory
-
/// Cleanup hours for b10fs, initialized from the `TRUSS_TRANSFER_B10FS_CLEANUP_HOURS`
/// environment variable, with a default of 96 hours (4 days).
pub static TRUSS_TRANSFER_B10FS_CLEANUP_HOURS: Lazy<u64> = Lazy::new(|| {
4 changes: 2 additions & 2 deletions truss-transfer/src/download_core.rs
@@ -139,7 +139,7 @@ pub async fn download_http_to_path_fast(
let auth_token = if is_hf_url {
get_hf_secret_from_file(runtime_secret_name)
} else {
info!("no hf token, since using {url}");
info!("no hf token, since using {sanitized_url}");
None
};

@@ -164,7 +164,7 @@
}

    // ensure that the file got flushed, without asking each file to flush it
-    for i in (0..1000).rev() {
+    for i in (0..20).rev() {
if check_metadata_size(path, size).await {
break;
}
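
Dropping the flush check from 1000 to 20 iterations bounds how long the downloader polls for the on-disk size to match the expected size. A sketch of the shape of that loop as a standalone helper; the poll interval is an assumption, and in the real code the size comparison is done by the crate's `check_metadata_size`:

```rust
use std::path::Path;
use std::time::Duration;

/// Bounded flush check: poll the file's reported size until it matches
/// `expected`, giving up after `attempts` tries (true if it converged).
async fn wait_for_flush(path: &Path, expected: u64, attempts: u32) -> bool {
    for _ in 0..attempts {
        match tokio::fs::metadata(path).await {
            Ok(meta) if meta.len() == expected => return true,
            // Not flushed yet (or metadata unavailable): wait and retry.
            _ => tokio::time::sleep(Duration::from_millis(50)).await,
        }
    }
    false
}
```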
53 changes: 49 additions & 4 deletions truss-transfer/src/hf_transfer.rs
@@ -1,8 +1,10 @@
// copied from https://github.com/huggingface/hf_transfer/blob/main/src/lib.rs Apache License
// Do not modify.

+use anyhow::{anyhow, Result};
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
+use log::{warn};
use rand::{rng, Rng};
use reqwest::header::{HeaderMap, HeaderValue, ToStrError, AUTHORIZATION, CONTENT_RANGE, RANGE};
use reqwest::Url;
@@ -16,8 +18,6 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::Semaphore;
use tokio::time::sleep;

-use anyhow::{anyhow, Result};
-
const BASE_WAIT_TIME: usize = 300;
const MAX_WAIT_TIME: usize = 10_000;
const CHUNK_SIZE: usize = 10 * 1024 * 1024;
@@ -67,7 +67,53 @@ pub async fn download_async(
.header(RANGE, "bytes=0-0")
.send()
.await
.map_err(|err| anyhow!("Error while downloading: {err}"))?
.map_err(|err| anyhow!("Error while downloading: {err}"))?;

// Check if range request failed - fallback to regular download for any non-206 response
if response.status() != 206 {
warn!(
"Range requests not supported (status: {}), falling back to regular download for url {}",
response.status(),
// sanitize URL to avoid logging tokens
url.split('?').next().unwrap_or(&url)
);

// Simple fallback download without ranges
let response = client
.get(&url)
.headers(headers.clone())
.send()
.await
.map_err(|err| anyhow!("Error while downloading: {err}"))?
.error_for_status()
.map_err(|err| anyhow!(err.to_string()))?;

let mut file = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&filename)
.await
.map_err(|err| anyhow!("Error while downloading: {err}"))?;

let bytes = response
.bytes()
.await
.map_err(|err| anyhow!("Error downloading: {err}"))?;

file.write_all(&bytes)
.await
.map_err(|err| anyhow!("Error writing: {err}"))?;

if let Some(ref callback) = callback {
callback(bytes.len());
}

return Ok(());
}

// Continue with original range-based download logic
let response = response
.error_for_status()
.map_err(|err| anyhow!(err.to_string()))?;

@@ -105,7 +151,6 @@ pub async fn download_async(
.ok_or(anyhow!("Error while downloading: No size was detected"))?
.parse()
.map_err(|err| anyhow!("Error while downloading: {err}"))?;

let mut handles = FuturesUnordered::new();
let semaphore = Arc::new(Semaphore::new(max_files));
let parallel_failures_semaphore = Arc::new(Semaphore::new(parallel_failures));
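
The fallback hinges on the probe request's status code: a server that honors ranges answers `Range: bytes=0-0` with 206 Partial Content, while one that ignores the header typically returns 200 OK with the full body. One caveat of the fallback as written: `response.bytes()` buffers the entire payload in memory, which is acceptable for a rare path but would be worth streaming for very large files. A minimal sketch of the probe on its own (not this crate's API):

```rust
use anyhow::{anyhow, Result};
use reqwest::header::RANGE;
use reqwest::StatusCode;

/// Probe whether `url` honors HTTP range requests: 206 Partial Content for
/// `bytes=0-0` means ranged downloads are safe; anything else (typically
/// 200 OK with the full body) means the server ignored the Range header.
async fn supports_ranges(client: &reqwest::Client, url: &str) -> Result<bool> {
    let resp = client
        .get(url)
        .header(RANGE, "bytes=0-0")
        .send()
        .await
        .map_err(|err| anyhow!("range probe failed: {err}"))?;
    Ok(resp.status() == StatusCode::PARTIAL_CONTENT)
}
```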
8 changes: 5 additions & 3 deletions truss-transfer/tests/test_create_bptr_with_aws.py
@@ -27,7 +27,7 @@ def sort_manifest(manifest):
return sorted(manifest, key=lambda x: x["uid"])


-def test_dolly():
+def test_dolly(pattern="*tokenizer_config.json"):
# fix the below models
models = [
truss_transfer.PyModelRepo(
@@ -36,7 +36,7 @@ def test_dolly():
runtime_secret_name="aws-secret-json",
volume_folder="julien_dummy",
kind="s3",
allow_patterns=["*tokenizer_config.json"],
allow_patterns=[pattern],
ignore_patterns=["*.pth", "*cache*", "original*", "*.lock", "*.metadata"],
)
]
@@ -48,6 +48,8 @@

print("Testing create_basetenpointer_from_models...")
result = truss_transfer.create_basetenpointer_from_models(models)
if pattern == "*":
return result
print("Success! Generated BasetenPointer manifest:")
# Parse and pretty print the JSON
manifest = json.loads(result)["pointers"]
@@ -90,7 +92,7 @@


def test_dolly_with_download():
-    manifest = test_dolly()
+    manifest = test_dolly(pattern="*")
Path("/static-bptr").mkdir(parents=True, exist_ok=True)
shutil.rmtree("/app/model_cache/julien_dummy", ignore_errors=True)
with open("/static-bptr/static-bptr-manifest.json", "w") as f:
2 changes: 2 additions & 0 deletions truss-transfer/tests/test_create_bptr_with_gcs.py
@@ -153,6 +153,7 @@ def sort_manifest(manifest):
return sorted(manifest, key=lambda x: x["uid"])


+@pytest.mark.skip(reason="Skipping GCS download test in CI")
def test_dolly():
# fix the below models
models = [
@@ -216,6 +217,7 @@ def test_dolly():
return result


+@pytest.mark.skip(reason="Skipping GCS download test in CI")
def test_dolly_with_download():
manifest = test_dolly()
Path("/static-bptr").mkdir(parents=True, exist_ok=True)
2 changes: 1 addition & 1 deletion truss/templates/server.Dockerfile.jinja
@@ -69,7 +69,7 @@ COPY --chown={{ default_owner }} ./{{ config.data_dir }} ${APP_HOME}/data

{%- if model_cache_v2 %}
{# v0.0.9, keep synced with server_requirements.txt #}
-RUN curl -sSL --fail --retry 5 --retry-delay 2 -o /usr/local/bin/truss-transfer-cli https://github.com/basetenlabs/truss/releases/download/v0.11.10rc5/truss-transfer-cli-v0.11.10rc5-linux-x86_64-unknown-linux-musl
+RUN curl -sSL --fail --retry 5 --retry-delay 2 -o /usr/local/bin/truss-transfer-cli https://github.com/basetenlabs/truss/releases/download/v0.11.12rc4/truss-transfer-cli-v0.11.12rc4-linux-x86_64-unknown-linux-musl
RUN chmod +x /usr/local/bin/truss-transfer-cli
RUN mkdir /static-bptr
RUN echo "hash {{model_cache_hash}}"
2 changes: 1 addition & 1 deletion truss/templates/server/requirements.txt
@@ -18,7 +18,7 @@ psutil>=5.9.4
python-json-logger>=2.0.2
pyyaml>=6.0.0
requests>=2.31.0
-truss-transfer==0.0.32
+truss-transfer==0.0.36
uvicorn>=0.24.0
uvloop>=0.19.0
websockets>=10.0
4 changes: 2 additions & 2 deletions uv.lock

Some generated files are not rendered by default.
