For: Developers testing custom Duroxide providers
Reference: See sqlite-stress/src/lib.rs for the SQLite stress test implementation
Duroxide provides two types of tests for custom providers:
- Stress tests
  - Measure: throughput, latency, success rate
  - Use the provider-test feature - see the Stress Tests section below
- Provider validation tests
  - Validate: atomicity, locking, error handling, queue semantics, management capabilities
  - Use the provider-test feature (same as stress tests) - see the Provider Validation Tests section below
Recommended Testing Strategy:
- Run validation tests first to verify correctness
- Run stress tests to measure performance
- Both should pass with a 100% success rate
Add Duroxide with the provider-test feature to your repository's Cargo.toml:
[dependencies]
duroxide = { path = "../duroxide", features = ["provider-test"] }
The provider-test feature enables access to the stress testing infrastructure.
Stress tests validate your provider under load, measuring throughput, latency, and success rate with concurrent orchestrations.
Implement the ProviderStressFactory trait to enable stress testing:
use duroxide::provider_stress_tests::parallel_orchestrations::{
ProviderStressFactory, run_parallel_orchestrations_test
};
use duroxide::providers::Provider;
use std::sync::Arc;
struct MyProviderFactory;
#[async_trait::async_trait]
impl ProviderStressFactory for MyProviderFactory {
async fn create_provider(&self) -> Arc<dyn Provider> {
// Create a fresh provider instance for stress testing
Arc::new(MyCustomProvider::new("connection_string").await.unwrap())
}
}
#[tokio::test]
async fn stress_test_my_provider() {
let factory = MyProviderFactory;
let result = run_parallel_orchestrations_test(&factory)
.await
.expect("Stress test failed");
// Validate results
assert!(result.success_rate() > 99.0, "Success rate too low: {:.2}%", result.success_rate());
assert!(result.orch_throughput > 1.0, "Throughput too low: {:.2} orch/sec", result.orch_throughput);
}
The test will:
- Create a fresh provider instance
- Launch orchestrations continuously for the configured duration
- Track completed and failed instances
- Calculate throughput and latency metrics
- Return a detailed StressTestResult with all metrics
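The steps above can be pictured with a small, self-contained model of the launch loop. This is a sketch under simplifying assumptions, not duroxide's implementation. It runs a fixed number of jobs instead of running for duration_secs, and it uses OS threads instead of Tokio tasks. The names run_bounded and Outcome are hypothetical. The loop keeps at most max_concurrent jobs in flight, records each job's latency, and counts a panicking job as a failure.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

/// Result of one simulated orchestration (hypothetical type).
pub struct Outcome {
    pub ok: bool,
    pub latency: Duration,
}

/// Launch `total` jobs while keeping at most `max_concurrent` in flight.
/// A job returns `true` on success; a panicking job is recorded as a failure.
pub fn run_bounded<F>(total: usize, max_concurrent: usize, job: F) -> Vec<Outcome>
where
    F: Fn(usize) -> bool + Send + Sync + 'static,
{
    assert!(max_concurrent > 0, "max_concurrent must be at least 1");
    let job = Arc::new(job);
    let (tx, rx) = mpsc::channel();
    let (mut launched, mut in_flight) = (0, 0);
    let mut outcomes = Vec::with_capacity(total);

    while outcomes.len() < total {
        // Top up the in-flight set until the concurrency limit is reached.
        while in_flight < max_concurrent && launched < total {
            let (tx, job, id) = (tx.clone(), Arc::clone(&job), launched);
            thread::spawn(move || {
                let start = Instant::now();
                let ok = panic::catch_unwind(AssertUnwindSafe(|| job(id))).unwrap_or(false);
                // The receiver lives until every outcome is collected, so this send succeeds.
                let _ = tx.send(Outcome { ok, latency: start.elapsed() });
            });
            launched += 1;
            in_flight += 1;
        }
        // Wait for any job to finish before launching the next one.
        let outcome = rx.recv().expect("main thread still holds a sender");
        in_flight -= 1;
        outcomes.push(outcome);
    }
    outcomes
}
```

Bounding the in-flight count, rather than launching every instance up front, is what lets a setting like max_concurrent control how hard the provider is pushed.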
Override the default configuration by implementing stress_test_config():
use duroxide::provider_stress_tests::StressTestConfig;
#[async_trait::async_trait]
impl ProviderStressFactory for MyProviderFactory {
async fn create_provider(&self) -> Arc<dyn Provider> {
Arc::new(MyCustomProvider::new().await.unwrap())
}
// Optional: customize the stress test configuration
fn stress_test_config(&self) -> StressTestConfig {
StressTestConfig {
max_concurrent: 10, // Max concurrent instances at once
duration_secs: 30, // How long to run the test
tasks_per_instance: 3, // Activities per orchestration
activity_delay_ms: 50, // Simulated activity work time
orch_concurrency: 1, // Orchestration dispatcher threads
worker_concurrency: 1, // Activity worker threads
wait_timeout_secs: 60, // Timeout for wait_for_orchestration
}
}
}
Or pass a custom config directly:
use duroxide::provider_stress_tests::parallel_orchestrations::run_parallel_orchestrations_test_with_config;
let config = StressTestConfig {
max_concurrent: 20,
duration_secs: 10,
tasks_per_instance: 5,
activity_delay_ms: 10,
orch_concurrency: 2,
worker_concurrency: 2,
wait_timeout_secs: 60,
};
let result = run_parallel_orchestrations_test_with_config(&factory, config).await?;
Quick Validation (for CI):
StressTestConfig {
max_concurrent: 5,
duration_secs: 2,
tasks_per_instance: 2,
activity_delay_ms: 5,
orch_concurrency: 1,
worker_concurrency: 1,
wait_timeout_secs: 60,
}
Performance Baseline:
StressTestConfig {
max_concurrent: 20,
duration_secs: 10,
tasks_per_instance: 5,
activity_delay_ms: 10,
orch_concurrency: 1,
worker_concurrency: 1,
wait_timeout_secs: 60,
}
Concurrency Stress Test:
StressTestConfig {
max_concurrent: 20,
duration_secs: 10,
tasks_per_instance: 5,
activity_delay_ms: 10,
orch_concurrency: 2,
worker_concurrency: 2,
wait_timeout_secs: 60,
}
For custom test scenarios, use the lower-level run_stress_test function:
use duroxide::provider_stress_tests::{run_stress_test, create_default_activities, StressTestConfig};
use duroxide::{OrchestrationContext, OrchestrationRegistry, ActivityContext};
use std::sync::Arc;
// Custom orchestration
async fn custom_orchestration(ctx: OrchestrationContext, input: String) -> Result<String, String> {
let tasks = vec!["task1", "task2", "task3"];
let mut results = Vec::new();
for task in tasks {
let result = ctx.schedule_activity("ProcessTask", task.to_string())
.await?;
results.push(result);
}
Ok(format!("completed: {}", results.join(", ")))
}
#[tokio::test]
async fn custom_stress_test() {
let provider = Arc::new(MyCustomProvider::new().await.unwrap());
// Use default activities or create custom ones
let activities = create_default_activities(10);
// Register custom orchestration
let orchestrations = OrchestrationRegistry::builder()
.register("CustomWorkflow", custom_orchestration)
.build();
let config = StressTestConfig::default();
let result = run_stress_test(config, provider, activities, orchestrations)
.await
.expect("Stress test failed");
assert!(result.success_rate() > 99.0);
}
Duroxide provides two built-in stress test scenarios:
Parallel Orchestrations (provider_stress_tests::parallel_orchestrations)
Each orchestration:
- Fans out to N activities in parallel
- Waits for all activities to complete
- Returns a success message
This pattern tests:
- Concurrent activity execution
- Message queue throughput
- Database write concurrency
- Instance-level locking correctness
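The fan-out/fan-in shape of this scenario can be sketched with plain threads standing in for activities. This is an illustration only. The helper fan_out_fan_in is hypothetical and not part of duroxide, and a real orchestration would schedule activities through its OrchestrationContext rather than spawning threads. In this sketch, the orchestration waits for every activity, fails if any activity failed, and otherwise returns a success message.

```rust
use std::thread;

/// Run `n` "activities" concurrently, wait for all of them, and return a
/// success message. The first error (in task order) fails the whole run.
pub fn fan_out_fan_in(
    n: usize,
    activity: fn(usize) -> Result<String, String>,
) -> Result<String, String> {
    // Fan out: start every activity before waiting on any of them.
    let handles: Vec<_> = (0..n).map(|i| thread::spawn(move || activity(i))).collect();

    // Fan in: join every handle so no activity is left running.
    let mut outputs = Vec::with_capacity(n);
    let mut first_error = None;
    for handle in handles {
        match handle.join() {
            Ok(Ok(output)) => outputs.push(output),
            Ok(Err(e)) => {
                first_error.get_or_insert(e);
            }
            Err(_) => {
                first_error.get_or_insert_with(|| "activity panicked".to_string());
            }
        }
    }
    match first_error {
        Some(e) => Err(e),
        None => Ok(format!("completed {} activities", outputs.len())),
    }
}
```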
Use the ProviderStressFactory trait:
use duroxide::provider_stress_tests::parallel_orchestrations::{
ProviderStressFactory, run_parallel_orchestrations_test
};
Large Payloads (provider_stress_tests::large_payload)
Each orchestration:
- Schedules activities with large payloads (10KB, 50KB, 100KB)
- Creates moderate-length histories (~80-100 events)
- Spawns sub-orchestrations with large inputs/outputs
This pattern tests:
- Memory allocation efficiency
- History storage and retrieval with large events
- Event serialization/deserialization overhead
- Provider memory footprint under load
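As a rough estimate of the data volume this scenario pushes through a provider, the sketch below builds payloads of the three sizes listed above and totals their raw bytes. It rests on several assumptions. PayloadSizes is a hypothetical stand-in for the size fields of LargePayloadConfig, "KB" is taken to mean 1024 bytes, and the filler content is arbitrary. The real test's payload format and serialization overhead may differ.

```rust
/// Hypothetical mirror of the large-payload sizes (10, 50 and 100 KB by default).
#[derive(Clone, Copy)]
pub struct PayloadSizes {
    pub small_kb: usize,
    pub medium_kb: usize,
    pub large_kb: usize,
}

impl Default for PayloadSizes {
    fn default() -> Self {
        PayloadSizes { small_kb: 10, medium_kb: 50, large_kb: 100 }
    }
}

/// Build an ASCII payload of exactly `kb` KiB (assumes 1 KB = 1024 bytes).
pub fn make_payload(kb: usize) -> String {
    "x".repeat(kb * 1024)
}

/// Raw payload bytes for one activity of each size, before any
/// serialization overhead added when events are stored in history.
pub fn raw_bytes_per_round(sizes: PayloadSizes) -> usize {
    [sizes.small_kb, sizes.medium_kb, sizes.large_kb]
        .iter()
        .map(|&kb| make_payload(kb).len())
        .sum()
}
```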
Use the same ProviderStressFactory trait as parallel orchestrations:
use duroxide::providers::Provider;
use duroxide::provider_stress_tests::parallel_orchestrations::ProviderStressFactory;
use std::sync::Arc;
use duroxide::provider_stress_tests::large_payload::{
LargePayloadConfig, run_large_payload_test, run_large_payload_test_with_config
};
// Same factory implementation works for both stress tests!
struct MyProviderFactory;
#[async_trait::async_trait]
impl ProviderStressFactory for MyProviderFactory {
async fn create_provider(&self) -> Arc<dyn Provider> {
Arc::new(MyProvider::new().await.unwrap())
}
}
#[tokio::test]
async fn large_payload_stress_test() {
let factory = MyProviderFactory;
// Run with default config
let result = run_large_payload_test(&factory).await.unwrap();
assert!(result.success_rate() > 99.0);
}
#[tokio::test]
async fn large_payload_stress_test_custom_config() {
let factory = MyProviderFactory;
// Or run with custom config
let config = LargePayloadConfig {
small_payload_kb: 5,
medium_payload_kb: 25,
large_payload_kb: 50,
..Default::default()
};
let result = run_large_payload_test_with_config(&factory, config).await.unwrap();
assert!(result.success_rate() > 99.0);
}
To add a new test scenario:
- Define the orchestration:
async fn my_scenario(ctx: OrchestrationContext, input: String) -> Result<String, String> {
// Your test scenario logic (same signature as custom_orchestration above)
Ok("done".to_string())
}
- Register it in the test:
}- Register it in the test:
let orchestrations = OrchestrationRegistry::builder()
.register("MyScenario", my_scenario)
.build();
- Create activities if needed:
let activities = Arc::new(
ActivityRegistry::builder()
.register("MyActivity", my_activity)
.build()
);
- Launch test instances:
for i in 0..num_instances {
let instance_id = format!("test-{}", i);
client.start_orchestration("MyScenario", instance_id, input.clone()).await?;
}
The stress test outputs a results table:
=== Comparison Table ===
Provider Config Completed Failed Success % Orch/sec Activity/sec Avg Latency
------------------------------------------------------------------------------------------------------------------------
In-Memory SQLite 1/1 179 0 100.00 4.63 23.13 216.21 ms
In-Memory SQLite 2/2 278 0 100.00 7.09 35.45 141.04 ms
File SQLite 1/1 167 0 100.00 14.75 73.76 67.78 ms
File SQLite 2/2 281 0 100.00 25.98 129.88 38.49 ms
Completed: Number of orchestrations that finished successfully
Failed: Number of orchestrations that encountered errors
Success %: (completed / (completed + failed)) * 100 - MUST be 100%
Orch/sec: Orchestrations completed per second (throughput)
Activity/sec: Activities executed per second
Avg Latency: Mean time from start to completion of an orchestration
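The metric definitions above translate directly into arithmetic. The sketch below uses hypothetical helper functions, not the StressTestResult API. Returning 0.0 for a run in which nothing finished, instead of dividing by zero, is an assumption about how you might want to report that case.

```rust
use std::time::Duration;

/// Success %: completed / (completed + failed) * 100.
/// Returns 0.0 when nothing finished, to avoid dividing by zero.
pub fn success_rate(completed: u64, failed: u64) -> f64 {
    let total = completed + failed;
    if total == 0 {
        return 0.0;
    }
    completed as f64 * 100.0 / total as f64
}

/// Orch/sec: completed orchestrations divided by wall-clock seconds.
pub fn orch_per_sec(completed: u64, elapsed: Duration) -> f64 {
    let secs = elapsed.as_secs_f64();
    if secs == 0.0 { 0.0 } else { completed as f64 / secs }
}

/// Avg Latency: mean start-to-completion time of the given orchestrations.
pub fn avg_latency(latencies: &[Duration]) -> Option<Duration> {
    if latencies.is_empty() {
        return None;
    }
    let total: Duration = latencies.iter().sum();
    Some(total / latencies.len() as u32)
}
```

For example, the first In-Memory SQLite row (179 completed, 0 failed) gives success_rate(179, 0) == 100.0.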
✅ Success Rate = 100%: All orchestrations complete without errors
✅ Consistent Throughput: Metrics remain stable across runs
✅ Deterministic: Same input produces same output
✅ Scalable: Higher concurrency increases throughput
❌ Success Rate < 100%: Indicates correctness issues (locks, atomicity, etc.)
❌ Throughput = 0: Provider not committing work
❌ Highly Variable Latency: Lock contention or database issues
❌ Decreasing Throughput: Resource exhaustion or contention
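A sketch of turning some of these warning signs into automated checks across runs at increasing concurrency. The names Run and diagnose are hypothetical. Only the checks with an unambiguous rule are encoded: failed orchestrations, zero throughput, and throughput falling as concurrency rises. Detecting "highly variable latency" needs a threshold you would have to choose yourself, so it is left out.

```rust
/// One stress run at a given concurrency level (hypothetical type).
pub struct Run {
    pub concurrency: u32,
    pub completed: u64,
    pub failed: u64,
    pub orch_per_sec: f64,
}

/// Returns one human-readable issue per warning sign found.
pub fn diagnose(runs: &[Run]) -> Vec<String> {
    let mut issues = Vec::new();
    for r in runs {
        if r.failed > 0 {
            issues.push(format!("concurrency {}: {} failed orchestrations", r.concurrency, r.failed));
        }
        if r.orch_per_sec == 0.0 {
            issues.push(format!("concurrency {}: zero throughput", r.concurrency));
        }
    }
    // Compare neighbouring runs in order of increasing concurrency.
    let mut by_concurrency: Vec<&Run> = runs.iter().collect();
    by_concurrency.sort_by_key(|r| r.concurrency);
    for pair in by_concurrency.windows(2) {
        let (lo, hi) = (pair[0], pair[1]);
        if hi.orch_per_sec < lo.orch_per_sec {
            issues.push(format!(
                "throughput fell from {:.2} to {:.2} orch/sec as concurrency rose from {} to {}",
                lo.orch_per_sec, hi.orch_per_sec, lo.concurrency, hi.concurrency
            ));
        }
    }
    issues
}
```

Fed the two In-Memory SQLite rows from the comparison table (4.63 and then 7.09 orch/sec, no failures), diagnose reports no issues.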
Duroxide includes a comprehensive suite of validation tests for provider behavior. These tests verify critical correctness properties such as atomicity, locking, error handling, queue semantics, and management capabilities.
To run validation tests against your custom provider:
- Add duroxide with the provider-test feature to your project
- Implement the ProviderFactory trait
- Run individual test functions for each validation test
Add Duroxide with the provider-test feature:
[dependencies]
duroxide = { path = "../duroxide", features = ["provider-test"] }
Note: The provider-test feature enables both stress tests and validation tests. Enable this single feature to get all provider testing infrastructure.
Run validation tests by calling individual test functions:
use duroxide::providers::Provider;
use duroxide::provider_validations::{
ProviderFactory,
// Atomicity tests
test_atomicity_failure_rollback,
test_multi_operation_atomic_ack,
// Locking tests
test_exclusive_instance_lock,
// Queue tests
test_worker_queue_fifo_ordering,
// Instance creation tests
test_instance_creation_via_metadata,
test_no_instance_creation_on_enqueue,
test_null_version_handling,
test_sub_orchestration_instance_creation,
// Cancellation support tests (execution state)
test_fetch_returns_running_state_for_active_orchestration,
test_fetch_returns_terminal_state_when_orchestration_completed,
test_renew_returns_running_when_orchestration_active,
test_ack_work_item_none_deletes_without_enqueue,
// Lock-stealing activity cancellation tests
test_cancelled_activities_deleted_from_worker_queue,
test_ack_work_item_fails_when_entry_deleted,
test_renew_fails_when_entry_deleted,
test_cancelling_nonexistent_activities_is_idempotent,
test_batch_cancellation_deletes_multiple_activities,
test_same_activity_in_worker_items_and_cancelled_is_noop,
test_orphan_activity_after_instance_force_deletion,
// ... import other tests as needed
};
use std::sync::Arc;
use std::time::Duration;
const TEST_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
struct MyProviderFactory;
#[async_trait::async_trait]
impl ProviderFactory for MyProviderFactory {
async fn create_provider(&self) -> Arc<dyn Provider> {
// Create a fresh provider instance for each test
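// `?` cannot be used here because create_provider returns Arc<dyn Provider>, not a Result
Arc::new(MyCustomProvider::new().await.expect("failed to create provider"))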
}
fn lock_timeout(&self) -> Duration {
// Return the lock timeout configured in your provider
// This must match the timeout used when creating the provider
TEST_LOCK_TIMEOUT
}
// Optional: Override these for deserialization contract tests (Category I).
// Required by tests that inject corrupted history and verify attempt counting.
async fn corrupt_instance_history(&self, instance: &str) {
// Replace stored history event data with undeserializable content.
// The exact mechanism is provider-specific (e.g., raw SQL UPDATE for
// SQL-backed providers, direct key mutation for KV stores).
my_provider_raw_update_history(instance, "NOT_VALID_JSON{{{").await;
}
async fn get_max_attempt_count(&self, instance: &str) -> u32 {
// Return the max attempt_count from the orchestrator queue for this instance.
my_provider_query_max_attempt_count(instance).await
}
}
#[tokio::test]
async fn test_my_provider_atomicity_failure_rollback() {
let factory = MyProviderFactory;
test_atomicity_failure_rollback(&factory).await;
}
#[tokio::test]
async fn test_my_provider_exclusive_instance_lock() {
let factory = MyProviderFactory;
test_exclusive_instance_lock(&factory).await;
}
#[tokio::test]
async fn test_my_provider_worker_queue_fifo_ordering() {
let factory = MyProviderFactory;
test_worker_queue_fifo_ordering(&factory).await;
}
Important: Run each test function individually. This provides better test isolation, clearer failure reporting, and allows parallel execution in CI/CD pipelines. When a test fails, you'll know exactly which behavior is broken.
Note: All provider methods return Result<..., ProviderError> instead of Result<..., String>. Tests that check error messages should access the message field: err.message.contains(...) instead of err.contains(...).
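A minimal illustration of the error-message check, using a stand-in type. The real ProviderError may carry more fields than shown here. Only the message field is taken from the note above, and the helper assert_error_contains is hypothetical.

```rust
/// Stand-in for duroxide's ProviderError; only `message` is modeled.
#[derive(Debug)]
pub struct ProviderError {
    pub message: String,
}

/// Panics unless `result` is an error whose message contains `needle`.
pub fn assert_error_contains<T: std::fmt::Debug>(result: Result<T, ProviderError>, needle: &str) {
    match result {
        Ok(value) => panic!("expected an error containing {needle:?}, got Ok({value:?})"),
        // Inspect err.message rather than the error value itself.
        Err(err) => assert!(
            err.message.contains(needle),
            "error message {:?} does not contain {needle:?}",
            err.message
        ),
    }
}
```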
The validation test suite includes 200 individual test functions organized into 18 categories:
- Atomicity Tests (4 tests)
  - test_atomicity_failure_rollback - All-or-nothing commit semantics, rollback on failure
  - test_multi_operation_atomic_ack - Complex ack succeeds atomically
  - test_lock_released_only_on_successful_ack - Lock only released on success
  - test_concurrent_ack_prevention - Only one ack succeeds with same token
- Error Handling Tests (5 tests)
  - test_invalid_lock_token_on_ack - Invalid lock token rejection
  - test_duplicate_event_id_rejection - Duplicate event ID detection
  - test_missing_instance_metadata - Missing instances handled gracefully
  - test_corrupted_serialization_data - Corrupted data handled gracefully
  - test_lock_expiration_during_ack - Expired locks are rejected
- Instance Locking Tests (11 tests)
  - test_exclusive_instance_lock - Exclusive access to instances
  - test_lock_token_uniqueness - Each fetch generates unique lock token
  - test_invalid_lock_token_rejection - Invalid tokens rejected for ack/abandon
  - test_concurrent_instance_fetching - Concurrent fetches don't duplicate instances
  - test_completions_arriving_during_lock_blocked - New messages blocked during lock
  - test_cross_instance_lock_isolation - Locks don't block other instances
  - test_message_tagging_during_lock - Only fetched messages deleted on ack
  - test_ack_only_affects_locked_messages - Ack only affects locked messages
  - test_multi_threaded_lock_contention - Locks prevent concurrent processing (multi-threaded)
  - test_multi_threaded_no_duplicate_processing - No duplicate processing (multi-threaded)
  - test_multi_threaded_lock_expiration_recovery - Lock expiration recovery (multi-threaded)
- Lock Expiration Tests (13 tests)
  - test_lock_expires_after_timeout - Automatic lock release after timeout
  - test_abandon_releases_lock_immediately - Abandon releases lock immediately
  - test_lock_renewal_on_ack - Successful ack releases lock immediately
  - test_concurrent_lock_attempts_respect_expiration - Concurrent attempts respect expiration
  - test_worker_lock_renewal_success - Worker lock can be renewed with valid token
  - test_worker_lock_renewal_invalid_token - Renewal fails with invalid token
  - test_worker_lock_renewal_after_expiration - Renewal fails after lock expires
  - test_worker_lock_renewal_extends_timeout - Renewal properly extends lock timeout
  - test_worker_lock_renewal_after_ack - Renewal fails after item has been acked
  - test_abandon_work_item_releases_lock - abandon_work_item releases lock immediately
  - test_abandon_work_item_with_delay - abandon_work_item with delay defers visibility
  - test_worker_ack_fails_after_lock_expiry - Worker ack rejected after lock expires
  - test_orchestration_lock_renewal_after_expiration - Orchestration lock renewal fails after expiry
- Multi-Execution Tests (5 tests)
  - test_execution_isolation - Each execution has separate history
  - test_latest_execution_detection - read() returns latest execution
  - test_execution_id_sequencing - Execution IDs increment correctly
  - test_continue_as_new_creates_new_execution - ContinueAsNew creates new execution
  - test_execution_history_persistence - All executions' history persists independently
- Queue Semantics Tests (8 tests)
  - test_worker_queue_fifo_ordering - Worker items dequeued in FIFO order
  - test_worker_peek_lock_semantics - Dequeue doesn't remove item until ack
  - test_worker_ack_atomicity - Ack_worker atomically removes item and enqueues completion
  - test_timer_delayed_visibility - TimerFired items only dequeued when visible
  - test_lost_lock_token_handling - Locked items become available after expiration
  - test_worker_delayed_visibility_skips_future_items - Future-visible worker items skipped
  - test_worker_item_immediate_visibility - Worker items immediately visible by default
  - test_orphan_queue_messages_dropped - QueueMessage for non-existent instance is dropped; QueueMessage for existing instance is kept
- Instance Creation Tests (4 tests)
  - test_instance_creation_via_metadata - Instances created via ack metadata, not on enqueue
  - test_no_instance_creation_on_enqueue - No instance created when enqueueing work items
  - test_null_version_handling - NULL version handled correctly
  - test_sub_orchestration_instance_creation - Sub-orchestrations follow same pattern
- Management Capability Tests (10 tests)
  - test_list_instances - Instance listing returns all instance IDs
  - test_list_instances_by_status - Instance filtering by status works correctly
  - test_list_executions - Execution queries return all execution IDs
  - test_get_instance_info - Instance metadata retrieval
  - test_get_execution_info - Execution metadata retrieval
  - test_get_system_metrics - System metrics are accurate
  - test_get_queue_depths - Queue depth reporting is correct
  - test_get_instance_stats_nonexistent - Returns None for unknown instance
  - test_get_instance_stats_history - History event count and byte size
  - test_get_instance_stats_kv - KV key count and value byte size
- Long Polling Tests (5 tests)
  - test_short_poll_returns_immediately - Short-poll providers return immediately when queue is empty
  - test_short_poll_work_item_returns_immediately - Worker queue short-poll returns immediately
  - test_fetch_respects_timeout_upper_bound - Fetch returns within poll_timeout even if blocking
  - test_long_poll_waits_for_timeout - Long-poll orchestration fetch waits for duration
  - test_long_poll_work_item_waits_for_timeout - Long-poll worker fetch waits for duration
- Poison Message Tests (9 tests)
  - orchestration_attempt_count_starts_at_one - First fetch has attempt_count = 1
  - orchestration_attempt_count_increments_on_refetch - Attempt count increments on abandon/refetch
  - worker_attempt_count_starts_at_one - Worker items start with attempt_count = 1
  - worker_attempt_count_increments_on_lock_expiry - Attempt count increments when lock expires
  - attempt_count_is_per_message - Each message has independent attempt count
  - abandon_work_item_ignore_attempt_decrements - ignore_attempt=true decrements count
  - abandon_orchestration_item_ignore_attempt_decrements - ignore_attempt=true decrements count
  - ignore_attempt_never_goes_negative - Attempt count never goes below 0
  - max_attempt_count_across_message_batch - MAX attempt_count returned for batched messages
- Cancellation Support Tests (16 tests)
  - test_fetch_returns_running_state_for_active_orchestration - Fetching activity for running orchestration proceeds normally
  - test_fetch_returns_terminal_state_when_orchestration_completed - Fetching activity for completed orchestration
  - test_fetch_returns_terminal_state_when_orchestration_failed - Fetching activity for failed orchestration
  - test_fetch_returns_terminal_state_when_orchestration_continued_as_new - Fetching activity for continued-as-new orchestration
  - test_fetch_returns_missing_state_when_instance_deleted - Fetching activity when instance deleted
  - test_renew_returns_running_when_orchestration_active - Lock renewal succeeds for active orchestration
  - test_renew_returns_terminal_when_orchestration_completed - Lock renewal for completed orchestration
  - test_renew_returns_missing_when_instance_deleted - Lock renewal when instance deleted
  - test_ack_work_item_none_deletes_without_enqueue - ack_work_item(None) deletes item without enqueueing completion
  - Lock-Stealing Tests (7 tests):
    - test_cancelled_activities_deleted_from_worker_queue - cancelled_activities in ack_orchestration_item deletes matching worker entries
    - test_ack_work_item_fails_when_entry_deleted - ack_work_item returns a permanent error when the entry was deleted (lock stolen)
    - test_renew_fails_when_entry_deleted - renew_work_item_lock fails when the entry was deleted (lock stolen)
    - test_cancelling_nonexistent_activities_is_idempotent - Cancelling activities that don't exist is silently ignored
    - test_batch_cancellation_deletes_multiple_activities - Multiple activities can be cancelled in a single ack_orchestration_item
    - test_same_activity_in_worker_items_and_cancelled_is_noop - Activity in both worker_items and cancelled_activities results in a no-op (INSERT then DELETE)
    - test_orphan_activity_after_instance_force_deletion - Force-deleting an instance while activities are in the worker queue is handled gracefully
- Deletion Tests (13 tests) - duroxide::provider_validations::deletion
  - test_delete_terminal_instances - Delete completed/failed instances
  - test_delete_running_rejected_force_succeeds - Running instances rejected without force
  - test_delete_nonexistent_instance - Deleting non-existent instance is idempotent
  - test_cascade_delete_hierarchy - Deleting parent cascades to all descendants
  - test_delete_cleans_queues_and_locks - Deletion removes all queue entries and locks
  - test_delete_instances_atomic - Atomic batch deletion of multiple instances
  - test_delete_instances_atomic_force - Force delete multiple running instances
  - test_delete_instances_atomic_orphan_detection - Detect orphan children after deletion
  - test_delete_get_instance_tree - Build instance tree for cascade deletion
  - test_delete_get_parent_id - Get parent instance ID for sub-orchestrations
  - test_list_children - List direct children of an instance
  - test_force_delete_prevents_ack_recreation - Force delete prevents ack from recreating
  - test_stale_activity_after_delete_recreate - Stale activity completion after delete+recreate doesn't corrupt new instance
- Pruning Tests (4 tests) - duroxide::provider_validations::prune
  - test_prune_options_combinations - Verify keep_last and completed_before work together
  - test_prune_safety - Current execution never pruned, including terminal instances
  - test_prune_bulk - Bulk prune across multiple instances
  - test_prune_bulk_includes_running_instances - Prune includes Running instances with CAN history (not just terminal)
- Bulk Deletion Tests (4 tests) - duroxide::provider_validations::bulk_deletion
  - test_delete_instance_bulk_completed_before_filter - Filter by completion timestamp
  - test_delete_instance_bulk_filter_combinations - Combined filter options
  - test_delete_instance_bulk_safety_and_limits - Respect limit and safety constraints
  - test_delete_instance_bulk_cascades_to_children - Bulk delete cascades to sub-orchestrations
- Capability Filtering Tests (20 tests) - duroxide::provider_validations::capability_filtering
  - test_fetch_with_filter_none_returns_any_item - Legacy behavior: filter=None returns any item
  - test_fetch_with_compatible_filter_returns_item - Compatible filter returns matching item
  - test_fetch_with_incompatible_filter_skips_item - Incompatible filter returns Ok(None)
  - test_fetch_filter_skips_incompatible_selects_compatible - Mixed versions: only compatible returned
  - test_fetch_filter_does_not_lock_skipped_instances - Skipped items not locked (fetchable by compatible runtime)
  - test_fetch_filter_null_pinned_version_always_compatible - NULL pinned version = always compatible (pre-migration data)
  - test_fetch_filter_boundary_versions - Boundary correctness at range edges (inclusive min/max)
  - test_pinned_version_stored_via_ack_metadata - Pinned version stored from ExecutionMetadata on ack
  - test_pinned_version_immutable_across_ack_cycles - Pinned version persists across ack cycles
  - test_continue_as_new_execution_gets_own_pinned_version - ContinueAsNew execution gets independent pinned version (also covers non-inheritance from previous execution)
  - test_filter_with_empty_supported_versions_returns_nothing - Empty filter = supports nothing → Ok(None)
  - test_concurrent_filtered_fetch_no_double_lock - Filtering doesn't break instance-lock exclusivity
  - test_ack_stores_pinned_version_via_metadata_update - Backfill path: ack writes pinned version on existing execution
  - test_provider_updates_pinned_version_when_told - Provider overwrites pinned version unconditionally (dumb storage)
  - test_fetch_single_range_only_uses_first_range - Phase 1 limitation: multi-range only uses first range
  - test_fetch_corrupted_history_filtered_vs_unfiltered - Filter excludes corrupted item = no error; unfiltered = Ok(Some(...)) with history_error (requires corrupt_instance_history)
  - test_fetch_deserialization_error_increments_attempt_count - Attempt count increments across deserialization error cycles, returns history_error (requires corrupt_instance_history, get_max_attempt_count)
  - test_fetch_deserialization_error_eventually_reaches_poison - Corrupted history reaches max attempts via history_error + poison path (requires corrupt_instance_history, get_max_attempt_count)
  - test_fetch_filter_applied_before_history_deserialization - Filter applied before history loading (corrupted + excluded = Ok(None)) (requires corrupt_instance_history)
  - test_ack_appends_event_to_corrupted_history - Ack with new event succeeds despite corrupted history rows (append-only contract) (requires corrupt_instance_history)
- Session Routing Tests (33 tests) - duroxide::provider_validations::sessions
  - Basic routing: test_non_session_items_fetchable_by_any_worker, test_session_item_claimable_when_no_session, test_session_affinity_same_worker, test_session_affinity_blocks_other_worker, test_different_sessions_different_workers, test_mixed_session_and_non_session_items
  - Lock/expiration: test_session_claimable_after_lock_expiry, test_none_session_skips_session_items, test_some_session_returns_all_items, test_session_lock_expires_new_owner_gets_redelivery, test_session_lock_expires_same_worker_reacquires
  - Renewal: test_renew_session_lock_active, test_renew_session_lock_skips_idle, test_renew_session_lock_no_sessions
  - Cleanup: test_cleanup_removes_expired_no_items, test_cleanup_keeps_sessions_with_pending_items, test_cleanup_keeps_active_sessions
  - Piggybacking: test_ack_updates_session_last_activity, test_renew_work_item_updates_session_last_activity
  - Edge cases: test_session_items_processed_in_order, test_non_session_items_returned_with_session_config
  - Process-level identity: test_shared_worker_id_any_caller_can_fetch_owned_session
  - Race conditions: test_concurrent_session_claim_only_one_wins, test_session_takeover_after_lock_expiry, test_cleanup_then_new_item_recreates_session, test_abandoned_session_item_retryable, test_abandoned_session_item_ignore_attempt
  - Cross-concern locks: test_renew_session_lock_after_expiry_returns_zero, test_original_worker_reclaims_expired_session, test_activity_lock_expires_session_lock_valid_same_worker_refetches, test_both_locks_expire_different_worker_claims, test_session_lock_expires_activity_lock_valid_ack_succeeds, test_session_lock_renewal_extends_past_original_timeout
- Tag Filtering Tests (10 tests) - duroxide::provider_validations::tag_filtering
  - test_default_only_fetches_untagged - DefaultOnly filter returns only untagged items
  - test_tags_fetches_only_matching - Tags filter returns only matching tagged items
  - test_default_and_fetches_untagged_and_matching - DefaultAnd filter returns untagged + matching
  - test_none_filter_returns_nothing - None filter never returns items
  - test_any_filter_fetches_everything - Any filter returns all items regardless of tag
  - test_multi_tag_filter - Multiple tags in filter work correctly
  - test_tag_round_trip_preservation - Tag value preserved through enqueue → fetch → dequeue
  - test_tag_survives_abandon_and_refetch - Tag preserved after lock → abandon → refetch cycle
  - test_multi_runtime_tag_isolation - Concurrent runtimes with different TagFilters (mutual exclusion, partial overlap, full overlap)
  - test_tag_preserved_through_ack_orchestration_item - Tags on worker items survive the ack_orchestration_item path (enqueue via orchestrator ack → fetch with tag filter)
- KV Store Tests (26 tests) - duroxide::provider_validations::kv_store
  - test_kv_set_and_get - Set a key and retrieve it
  - test_kv_overwrite - Overwriting an existing key returns the new value
  - test_kv_clear_single - Clear a single key
  - test_kv_clear_all - Clear all keys for an instance
  - test_kv_get_nonexistent - Get missing key returns None
  - test_kv_snapshot_in_fetch - Fetch returns KV snapshot (with KvEntry { value, last_updated_at_ms })
  - test_kv_snapshot_after_clear_single - Snapshot omits cleared key
  - test_kv_snapshot_after_clear_all - Snapshot empty after clear-all
  - test_kv_execution_id_tracking - Last-writer-wins for execution_id tracking
  - test_kv_cross_execution_overwrite - Value updated across executions
  - test_kv_cross_execution_remove_readd - Clear in exec 2, re-add in exec 3
  - test_kv_prune_preserves_overwritten - Pruned execution's key survives if overwritten
  - test_kv_prune_preserves_all_keys - KV entries are instance-scoped and survive execution pruning
  - test_kv_instance_isolation - Same key name, different instances, different values
  - test_kv_delete_instance_cascades - Deleting instance removes all KV
  - test_kv_clear_nonexistent_key - Clearing missing key is idempotent
  - test_kv_get_unknown_instance - get_kv_value for nonexistent instance returns None
  - test_kv_set_after_clear - Clear-all then set in same ack
  - test_kv_empty_value - Empty string value is valid (not None)
  - test_kv_large_value - 16KB value stored and retrieved correctly
  - test_kv_special_chars_in_key - Unicode, spaces, dots, slashes in key names
  - test_kv_snapshot_empty - Fresh instance has empty KV snapshot
  - test_kv_snapshot_cross_execution - Keys from multiple executions in snapshot
  - test_kv_prune_current_execution_protected - Single execution's KV survives prune
  - test_kv_delete_instance_with_children - Parent deletion cascades child KV
  - test_kv_clear_isolation - Clearing one instance doesn't affect another
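Many of the locking, lock-expiration, queue-semantics, and poison-message entries above describe one contract:

- A fetch locks an item under a unique token.
- Ack and renewal require a live token.
- An expired lock makes the item fetchable again with a higher attempt count.

The in-memory model below is a hypothetical sketch of that contract, intended only to build intuition. It is not duroxide's Provider trait, and its method names are invented. Time is passed in as an explicit millisecond counter so that lock expiry is deterministic.

```rust
use std::collections::VecDeque;

/// A queued item and its lock state in this hypothetical model.
struct Entry {
    payload: String,
    attempt_count: u32,
    lock: Option<(u64, u64)>, // (lock token, expires_at_ms)
}

/// In-memory peek-lock queue with an explicit clock (`now_ms`).
pub struct PeekLockQueue {
    items: VecDeque<Entry>,
    next_token: u64,
    lock_timeout_ms: u64,
}

impl PeekLockQueue {
    pub fn new(lock_timeout_ms: u64) -> Self {
        PeekLockQueue { items: VecDeque::new(), next_token: 1, lock_timeout_ms }
    }

    pub fn enqueue(&mut self, payload: &str) {
        self.items.push_back(Entry { payload: payload.to_string(), attempt_count: 0, lock: None });
    }

    /// Locks the oldest item that is unlocked or whose lock has expired.
    /// The item stays queued until acked; every fetch issues a new token
    /// and increments attempt_count. Returns (payload, token, attempt_count).
    pub fn fetch(&mut self, now_ms: u64) -> Option<(String, u64, u32)> {
        let entry = self.items.iter_mut().find(|e| match e.lock {
            Some((_, expires_at)) => now_ms >= expires_at,
            None => true,
        })?;
        let token = self.next_token;
        self.next_token += 1;
        entry.attempt_count += 1;
        entry.lock = Some((token, now_ms + self.lock_timeout_ms));
        Some((entry.payload.clone(), token, entry.attempt_count))
    }

    /// Index of the item locked by `token`, if that lock is still live.
    fn live_lock(&self, token: u64, now_ms: u64) -> Option<usize> {
        self.items
            .iter()
            .position(|e| matches!(e.lock, Some((t, exp)) if t == token && now_ms < exp))
    }

    /// Removes the item; rejects unknown, stale, or expired tokens.
    pub fn ack(&mut self, token: u64, now_ms: u64) -> Result<(), String> {
        let idx = self.live_lock(token, now_ms).ok_or_else(|| format!("invalid or expired token {token}"))?;
        self.items.remove(idx);
        Ok(())
    }

    /// Extends a live lock by a full timeout measured from `now_ms`.
    pub fn renew(&mut self, token: u64, now_ms: u64) -> Result<(), String> {
        let idx = self.live_lock(token, now_ms).ok_or_else(|| format!("invalid or expired token {token}"))?;
        self.items[idx].lock = Some((token, now_ms + self.lock_timeout_ms));
        Ok(())
    }

    /// Releases a live lock immediately so the item can be fetched again.
    pub fn abandon(&mut self, token: u64, now_ms: u64) -> Result<(), String> {
        let idx = self.live_lock(token, now_ms).ok_or_else(|| format!("invalid or expired token {token}"))?;
        self.items[idx].lock = None;
        Ok(())
    }
}
```

This model does not cover ignore_attempt, delayed visibility, tags, sessions, or instance-level locking across batched messages. For those behaviors, the test names above are the specification.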
Each validation test should be run individually. This provides:
- Better isolation: Failures are clearly attributed to specific behaviors
- Clearer reporting: Test output shows exactly which test failed
- Parallel execution: CI/CD can run tests in parallel
- Focused debugging: When a test fails, you know exactly which behavior is broken and can fix it without running unrelated tests
use duroxide::provider_validations::{
ProviderFactory,
test_atomicity_failure_rollback,
test_exclusive_instance_lock,
test_worker_queue_fifo_ordering,
};
#[tokio::test]
async fn test_my_provider_atomicity_failure_rollback() {
let factory = MyProviderFactory;
test_atomicity_failure_rollback(&factory).await;
}
#[tokio::test]
async fn test_my_provider_exclusive_instance_lock() {
let factory = MyProviderFactory;
test_exclusive_instance_lock(&factory).await;
}
#[tokio::test]
async fn test_my_provider_worker_queue_fifo_ordering() {
let factory = MyProviderFactory;
test_worker_queue_fifo_ordering(&factory).await;
}
Available test functions: See the duroxide::provider_validations module documentation for the complete list of test functions, or refer to tests/sqlite_provider_validations.rs for a complete example using all tests.
Your factory should create fresh, isolated provider instances for each test. Importantly, you must implement lock_timeout() to return the lock timeout used in validation tests - this ensures validation tests wait for the correct duration when testing lock expiration behavior. Note that in production, lock timeouts are configured via RuntimeOptions (orchestrator_lock_timeout and worker_lock_timeout), not provider options.
use duroxide::providers::Provider;
use duroxide::provider_validations::ProviderFactory;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir; // from the tempfile crate (add it as a dev-dependency)
const TEST_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
struct MyProviderFactory {
// Keep temp directory alive
_temp_dir: TempDir,
}
#[async_trait::async_trait]
impl ProviderFactory for MyProviderFactory {
async fn create_provider(&self) -> Arc<dyn Provider> {
// Create a new provider instance with unique path
// Configure the provider with the same lock timeout
let db_path = self._temp_dir.path().join(format!("test_{}.db",
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()));
std::fs::File::create(&db_path).unwrap();
let options = MyProviderOptions {
lock_timeout: TEST_LOCK_TIMEOUT,
};
Arc::new(MyProvider::new(&format!("sqlite:{}", db_path.display()), Some(options)).await?)
}
fn lock_timeout(&self) -> Duration {
// CRITICAL: This must match the lock_timeout configured in create_provider()
// Validation tests use this value to determine sleep durations when waiting
// for lock expiration. If this doesn't match your provider's timeout,
// tests will fail with timing issues.
TEST_LOCK_TIMEOUT
}
}Important: The lock_timeout() value should match the timeout you pass to fetch_orchestration_item() and fetch_work_item() in your tests. In production, lock timeouts are configured via RuntimeOptions (orchestrator_lock_timeout for orchestrations, worker_lock_timeout for activities) and passed to these methods by the runtime dispatchers.
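Why the value matters: validation tests use lock_timeout() to decide how long to wait before expecting a locked item to become fetchable again. The sketch below models that peek-lock behavior in memory so the timing is easy to see. LockedQueue, fetch and ack are hypothetical names, not the Provider trait, and the current time is passed in explicitly rather than read from the clock so the behavior is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical peek-lock queue, used only to illustrate lock expiration.
struct LockedQueue {
    lock_timeout: Duration,
    items: Vec<u64>,
    // item id -> (lock token, locked until)
    locks: HashMap<u64, (u64, Instant)>,
    next_token: u64,
}

impl LockedQueue {
    fn new(items: Vec<u64>, lock_timeout: Duration) -> Self {
        Self { lock_timeout, items, locks: HashMap::new(), next_token: 0 }
    }

    /// Lock and return (id, token) for the first item that is unlocked
    /// or whose previous lock has expired.
    fn fetch(&mut self, now: Instant) -> Option<(u64, u64)> {
        for &id in &self.items {
            let available = match self.locks.get(&id) {
                None => true,
                Some(&(_, until)) => now >= until,
            };
            if available {
                self.next_token += 1;
                self.locks.insert(id, (self.next_token, now + self.lock_timeout));
                return Some((id, self.next_token));
            }
        }
        None
    }

    /// Ack succeeds only while the caller's lock is the current, unexpired one.
    fn ack(&mut self, id: u64, token: u64, now: Instant) -> bool {
        let valid = matches!(
            self.locks.get(&id),
            Some(&(current, until)) if current == token && now < until
        );
        if valid {
            self.locks.remove(&id);
            self.items.retain(|&x| x != id);
        }
        valid
    }
}
```

If the provider's real timeout were longer than lock_timeout(), a test that refetches after waiting lock_timeout() would still find the item locked and get nothing back, which is the timing failure described above.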
Add validation tests to your CI pipeline:
# .github/workflows/provider-tests.yml
name: Provider Validation Tests
on: [pull_request]
jobs:
  validation-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Rust
        uses: dtolnay/rust-toolchain@stable
      - name: Run validation tests
        run: |
          cargo test --features provider-test
The tests will run individually, providing granular failure reporting. You can also run specific tests:
# Run a specific test
cargo test --features provider-test test_my_provider_atomicity_failure_rollback
# Run all atomicity tests
cargo test --features provider-test test_my_provider_atomicity
# Limit parallelism (cargo test already runs tests in parallel by default)
cargo test --features provider-test -- --test-threads=4
Compare multiple providers or configurations side-by-side:
use duroxide::provider_stress_tests::{
parallel_orchestrations::run_parallel_orchestrations_test_with_config,
print_comparison_table, StressTestConfig
};
#[tokio::test]
async fn compare_providers() {
let mut results = Vec::new();
// Test different concurrency settings
for (orch, worker) in [(1, 1), (2, 2)] {
let config = StressTestConfig {
orch_concurrency: orch,
worker_concurrency: worker,
duration_secs: 5,
..Default::default()
};
let result = run_parallel_orchestrations_test_with_config(
&MyProviderFactory,
config
).await.unwrap();
results.push((
"MyProvider".to_string(),
format!("{}/{}", orch, worker),
result,
));
}
// Print comparison table
print_comparison_table(&results);
}
Output:
Provider Config Completed Failed Infra Config App Success % Orch/sec Activity/sec Avg Latency
------------------------------------------------------------------------------------------------------------------------------------------------------
MyProvider 1/1 57 0 0 0 0 100.00 11.40 57.00 87.72 ms
MyProvider 2/2 81 0 0 0 0 100.00 16.20 81.00 61.73 ms
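The sample rows are internally consistent, and the arithmetic is easy to reproduce. With duration_secs: 5 from the config above, 57 completed orchestrations give 57 / 5 = 11.40 orch/sec. The Avg Latency column then equals run duration divided by completed orchestrations (5000 ms / 57 ≈ 87.72 ms), and an Activity/sec of 57.00 implies 285 activities, five per orchestration. These relationships are inferred from the numbers shown, not taken from duroxide's source, and Row is a hypothetical struct rather than the real result type.

```rust
/// Hypothetical per-run summary; field names are illustrative only.
struct Row {
    completed: u64,
    failed: u64,
    activities: u64,
    duration_secs: f64,
}

/// Success % = completed / (completed + failed) * 100.
fn success_pct(r: &Row) -> f64 {
    let total = r.completed + r.failed;
    if total == 0 {
        0.0
    } else {
        100.0 * r.completed as f64 / total as f64
    }
}

/// Orchestrations completed per second of run time.
fn orch_per_sec(r: &Row) -> f64 {
    r.completed as f64 / r.duration_secs
}

/// Activities completed per second of run time.
fn activity_per_sec(r: &Row) -> f64 {
    r.activities as f64 / r.duration_secs
}

/// Run duration divided by completed orchestrations, in milliseconds
/// (the quantity the sample Avg Latency column matches).
fn avg_latency_ms(r: &Row) -> Option<f64> {
    if r.completed == 0 {
        None
    } else {
        Some(r.duration_secs * 1000.0 / r.completed as f64)
    }
}
```

The same formulas reproduce the 2/2 row: 81 / 5 = 16.20 orch/sec and 5000 / 81 ≈ 61.73 ms.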
Possible causes of failed orchestrations (success rate below 100%):
- Non-atomic commits: Ensure ack_orchestration_item uses a single transaction
- Lock expiration: Provider may be releasing locks prematurely
- Queue semantics: Messages lost or duplicated
- Database errors: Deadlocks, connection failures, or constraint violations
Debug Steps:
- Enable verbose logging: RUST_LOG=debug cargo run --release
- Check provider logs for errors
- Verify transaction boundaries
- Test with lower concurrency first
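Non-atomic commits lead to lost or duplicated work: if ack_orchestration_item appends history but fails before enqueuing the follow-up messages, the instance can stall or replay incorrectly. In a database provider the remedy is one transaction that is rolled back on any error. The in-memory sketch below shows the same all-or-nothing rule by staging every change on a copy and committing with a single assignment; Store and ack are hypothetical, and the empty-event check only simulates a failure partway through.

```rust
use std::collections::VecDeque;

/// Hypothetical provider state for one instance.
#[derive(Clone, Debug, Default, PartialEq)]
struct Store {
    history: Vec<String>,
    queue: VecDeque<String>,
    locked: bool,
}

/// Hypothetical ack: append history, enqueue outgoing work, release the lock.
/// Either all three changes are applied or none are.
fn ack(store: &mut Store, new_events: &[&str], outgoing: &[&str]) -> Result<(), String> {
    // Stage every change on a copy (the analogue of BEGIN)
    let mut staged = store.clone();
    for e in new_events {
        if e.is_empty() {
            // Simulated mid-transaction failure: `store` is left untouched (ROLLBACK)
            return Err("invalid empty event".to_string());
        }
        staged.history.push(e.to_string());
    }
    for m in outgoing {
        staged.queue.push_back(m.to_string());
    }
    staged.locked = false;
    // Single commit point (COMMIT)
    *store = staged;
    Ok(())
}
```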
Possible causes of orchestrations that never complete:
- Work not committed: Provider not writing to queues correctly
- Lock not released: Messages stay locked indefinitely
- Queue not polling: fetch_orchestration_item not returning work
Debug Steps:
- Manually inspect database/queue contents
- Verify fetch_orchestration_item returns items
- Check ack_orchestration_item completes successfully
- Test with a simple unit test first
Possible causes of low throughput or high latency:
- Lock contention: Too many workers competing for same locks
- Database bottleneck: Insufficient connection pool or slow queries
- Missing indexes: Full table scans on large tables
Debug Steps:
- Reduce concurrency to isolate contention
- Profile database queries
- Check for missing indexes
- Verify connection pool size
Possible causes of high memory usage:
- History not cleaned: Old executions accumulate in history table
- Connection leak: Connections not properly pooled or closed
- Large payloads: Work items contain excessive data
Debug Steps:
- Monitor memory usage during test
- Check history table size
- Verify connection pool limits
- Inspect work item sizes
Duroxide provides a shell script to run stress tests with resource monitoring.
Script: run-stress-tests.sh
./run-stress-tests.sh # Run all tests for 10s (default)
./run-stress-tests.sh 60 # Run all tests for 60 seconds
./run-stress-tests.sh --parallel-only # Run only parallel orchestrations
./run-stress-tests.sh --large-payload # Run only large payload test
./run-stress-tests.sh --help # Show all options
To run the same stress tests against your own provider:
- Create a stress test binary similar to sqlite-stress/src/bin/sqlite-stress.rs
- Implement ProviderStressFactory for your provider (works for both parallel orchestrations and large payload tests)
- Run with the same configurations to compare performance
Add stress tests to your CI pipeline:
# .github/workflows/stress-tests.yml
name: Stress Tests
on:
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 0 * * 0' # Weekly
jobs:
  stress-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Rust
        uses: dtolnay/rust-toolchain@stable
      - name: Run stress tests
        run: |
          ./run-stress-tests.sh 10
Use these benchmarks to validate your provider:
Minimum requirements:
- ✅ Success Rate: 100% under all configurations
- ✅ Baseline Throughput: ≥ 10 orch/sec (file-based provider, 1/1 config)
- ✅ Latency: Average < 200 ms per orchestration
- ✅ Scalability: 2/2 config increases throughput by ≥ 30%
Targets for a well-optimized provider:
- Throughput: ≥ 50 orch/sec (file-based provider, 2/2 config)
- Latency: Average < 100 ms per orchestration
- Scalability: Linear throughput increase with concurrency
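The minimum requirements can be checked mechanically after a stress run. The function below is a hypothetical helper (not part of duroxide) that takes the measured values and returns the requirements that were missed; the throughput threshold is the one stated for a file-based provider. Using the sample output earlier in this guide, 1/1 gives 11.40 orch/sec and 2/2 gives 16.20 orch/sec, a (16.20 - 11.40) / 11.40 ≈ 42% increase, so those rows pass the scalability check.

```rust
/// Hypothetical checker for the minimum requirements.
/// Returns the requirements that were not met (an empty Vec means pass).
fn missed_minimums(
    success_pct: f64,
    orch_per_sec_1x1: f64,
    orch_per_sec_2x2: f64,
    worst_avg_latency_ms: f64,
) -> Vec<&'static str> {
    let mut missed = Vec::new();
    if success_pct < 100.0 {
        missed.push("success rate below 100%");
    }
    if orch_per_sec_1x1 < 10.0 {
        missed.push("1/1 throughput below 10 orch/sec");
    }
    if worst_avg_latency_ms >= 200.0 {
        missed.push("average latency not under 200 ms");
    }
    // Scalability: 2/2 throughput must be at least 30% higher than 1/1
    if orch_per_sec_2x2 < orch_per_sec_1x1 * 1.3 {
        missed.push("2/2 throughput less than 30% above 1/1");
    }
    missed
}
```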
For complete control, build your own test runner:
use duroxide::{Runtime, RuntimeOptions, Client};
use duroxide::providers::Provider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let provider = Arc::new(MyCustomProvider::new().await?);
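// Register the orchestrations and activities this runner launches (e.g. "TestOrch");
// ActivityRegistry and OrchestrationRegistry must also be imported (paths depend on your duroxide version)
let activities = ActivityRegistry::builder().build();
let orchestrations = OrchestrationRegistry::builder().build();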
let runtime = Runtime::start_with_options(
provider.clone(),
activities,
orchestrations,
RuntimeOptions {
orchestration_concurrency: 2,
worker_concurrency: 2,
..Default::default()
},
).await;
let client = Client::new(provider.clone()).await?;
// Launch orchestrations
for i in 0..100 {
client.start_orchestration("TestOrch", format!("instance-{}", i), "input".to_string()).await?;
}
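// Wait for completion. A fixed sleep is a crude approximation; polling each
// instance's status until a deadline is more reliable.
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
runtime.shutdown().await;
Ok(())
}
To track results over time (like Duroxide's own tracking):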
# Run with tracking enabled
./sqlite-stress/track-results.sh
This generates stress-test-results.md with:
- Commit hash and timestamp
- Changes since last test
- Performance metrics
- Historical trends
- Test Implementation: src/provider_validation/ (individual test modules)
- Test API: src/provider_validations.rs (test function exports)
- Example Usage: tests/sqlite_provider_validations.rs (complete example with all 202 tests)
- Test Specification: See individual test function documentation
- Provider Guide: docs/provider-implementation-guide.md
- Built-in Providers: src/providers/sqlite.rs
With this guide, you can thoroughly test your custom Duroxide provider! 🎉