@@ -137,10 +137,11 @@ struct WorkloadContext : public omnilink::WorkloadContext<WorkloadContext, Stora
137137    //  SAFETY: always lock in definition order!
138138    const  std::size_t  max_kv_count = 20 ;
139139
140+     std::shared_mutex connection_m;
141+     WtConnection connection;
142+ 
140143    std::shared_mutex existing_kvs_m;
141144    std::set<uint32_t > existing_kvs;
142-     
143-     WtConnection connection;
144145
145146    std::shared_mutex oldest_timestamp_m;
146147    uint64_t  oldest_timestamp = 0 ;
@@ -192,9 +193,9 @@ struct WorkloadContext : public omnilink::WorkloadContext<WorkloadContext, Stora
192193            }
193194        }
194195
195-         uint64_t  find_timestamp_at (std::initializer_list<uint64_t > constraints ) {
196-             auto  lower_bound = std::max (constraints );
197-             uint64_t  upper_bound = std::numeric_limits< int32_t >:: max ( );
196+         uint64_t  find_timestamp_at (std::initializer_list<uint64_t > lower_constraints, std::initializer_list< uint64_t > upper_constraints = {std::numeric_limits< int32_t >:: max ()} ) {
197+             auto  lower_bound = std::max (lower_constraints );
198+             uint64_t  upper_bound = std::min  (upper_constraints );
198199            if  (lower_bound > upper_bound) {
199200                throw  omnilink::UnsupportedException{};
200201            }
@@ -252,6 +253,7 @@ struct WorkloadContext : public omnilink::WorkloadContext<WorkloadContext, Stora
252253                throw  omnilink::UnsupportedException{};
253254            }
254255            int  currTransactionId = transactionId;
256+             std::shared_lock connection_lck{workload_context.connection_m };
255257            std::shared_lock stable_timestamp_lck{workload_context.stable_timestamp_m };
256258            std::shared_lock active_read_timestamps_lck{workload_context.active_read_timestamps_m };
257259            auto  commit_timestamp = find_timestamp_at ({
@@ -280,6 +282,7 @@ struct WorkloadContext : public omnilink::WorkloadContext<WorkloadContext, Stora
280282            if  (transactionId == -1  || transactionIsPrepared || transactionHasFailed) {
281283                throw  omnilink::UnsupportedException{};
282284            }
285+             std::shared_lock connection_lck{workload_context.connection_m };
283286            std::shared_lock stable_timestamp_lck{workload_context.stable_timestamp_m };
284287            std::shared_lock active_read_timestamps_lck{workload_context.active_read_timestamps_m };
285288            auto  commit_timestamp = find_timestamp_at ({
@@ -305,6 +308,7 @@ struct WorkloadContext : public omnilink::WorkloadContext<WorkloadContext, Stora
305308            if  (transactionId == -1  || transactionIsPrepared || transactionHasFailed) {
306309                throw  omnilink::UnsupportedException{};
307310            }
311+             std::shared_lock connection_lck{workload_context.connection_m };
308312            std::shared_lock stable_timestamp_lck{workload_context.stable_timestamp_m };
309313            std::shared_lock active_read_timestamps_lck{workload_context.active_read_timestamps_m };
310314            auto  prepare_timestamp = find_timestamp_at ({
@@ -329,21 +333,60 @@ struct WorkloadContext : public omnilink::WorkloadContext<WorkloadContext, Stora
329333        }
330334
331335        Storage::RollbackToStable perform_operation (omnilink::Tag<Storage::RollbackToStable>) {
332-             throw  omnilink::UnsupportedException{};
336+             std::unique_lock connection_lck{workload_context.connection_m };
337+             std::unique_lock existing_kvs_lck{workload_context.existing_kvs_m };
338+             std::unique_lock oldest_timestamp_lck{workload_context.oldest_timestamp_m };
339+             std::unique_lock stable_timestamp_lck{workload_context.stable_timestamp_m };
340+             std::unique_lock active_read_timestamps_lck{workload_context.active_read_timestamps_m };
341+             if (max_active_read_timestamp_nolock () != 0 ) {
342+                 throw  omnilink::UnsupportedException{};
343+             }
344+             if (workload_context.stable_timestamp  == 0 ) {
345+                 throw  omnilink::UnsupportedException{};
346+             }
347+             int  ret = workload_context.connection ->rollback_to_stable (workload_context.connection , nullptr );
348+             return  Storage::RollbackToStable{
349+                 .n  = " n" 
350+                 ._did_abort  = ret != 0 ,
351+                 ._meta  = std::map<std::string, int >{{" result_code" 
352+             };
333353        }
334354
335355        Storage::SetOldestTimestamp perform_operation (omnilink::Tag<Storage::SetOldestTimestamp>) {
336-             throw  omnilink::UnsupportedException{};
356+             std::unique_lock connection_lck{workload_context.connection_m };
357+             std::unique_lock existing_kvs_lck{workload_context.existing_kvs_m };
358+             std::unique_lock oldest_timestamp_lck{workload_context.oldest_timestamp_m };
359+             std::unique_lock stable_timestamp_lck{workload_context.stable_timestamp_m };
360+             uint64_t  ts = find_timestamp_at ({workload_context.oldest_timestamp }, {workload_context.stable_timestamp });
361+             int  ret = workload_context.connection ->set_timestamp (workload_context.connection , config{{" oldest_timestamp" hex64 (ts)}});
362+             return  Storage::SetOldestTimestamp{
363+                 .n  = " n" 
364+                 .ts  = ts,
365+                 ._did_abort  = ret != 0 ,
366+                 ._meta  = std::map<std::string, int >{{" result_code" 
367+             };
337368        }
338369
339370        Storage::SetStableTimestamp perform_operation (omnilink::Tag<Storage::SetStableTimestamp>) {
340-             throw  omnilink::UnsupportedException{};
371+             std::unique_lock connection_lck{workload_context.connection_m };
372+             std::unique_lock existing_kvs_lck{workload_context.existing_kvs_m };
373+             std::unique_lock oldest_timestamp_lck{workload_context.oldest_timestamp_m };
374+             std::unique_lock stable_timestamp_lck{workload_context.stable_timestamp_m };
375+             uint64_t  ts = find_timestamp_at ({workload_context.stable_timestamp }, {});
376+             int  ret = workload_context.connection ->set_timestamp (workload_context.connection , config{{" stable_timestamp" hex64 (ts)}});
377+             return  Storage::SetStableTimestamp{
378+                 .n  = " n" 
379+                 .ts  = ts,
380+                 ._did_abort  = ret != 0 ,
381+                 ._meta  = std::map<std::string, int >{{" result_code" 
382+             };
341383        }
342384
343385        Storage::StartTransaction perform_operation (omnilink::Tag<Storage::StartTransaction>) {
344386            if  (transactionId != -1 ) {
345387                throw  omnilink::UnsupportedException{};
346388            }
389+             std::shared_lock connection_lck{workload_context.connection_m };
347390            std::shared_lock oldest_timestamp_lck{workload_context.oldest_timestamp_m };
348391            auto  read_timestamp = find_timestamp_at ({workload_context.oldest_timestamp });
349392            {
0 commit comments