91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i < pipelines.size(); i++ )
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
146 policy.reset(
new AllPolicy() );
147 return std::move( *
this );
158 policy.reset(
new AnyPolicy( pipelines.size() ) );
159 return std::move( *
this );
170 policy.reset(
new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *
this );
183 policy.reset(
new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *
this );
201 if( status.
IsOK() )
return false;
206 XRootDStatus Result()
220 struct AnyPolicy :
public PolicyExecutor
222 AnyPolicy(
size_t size) : cnt( size )
226 bool Examine(
const XrdCl::XRootDStatus &status )
231 size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
233 if( status.
IsOK() )
return true;
235 if( nb == 1 )
return true;
240 XRootDStatus Result()
246 std::atomic<size_t> cnt;
256 struct SomePolicy : PolicyExecutor
258 SomePolicy(
size_t size,
size_t threshold ) : failed( 0 ), succeeded( 0 ),
259 threshold( threshold ), size( size )
263 bool Examine(
const XrdCl::XRootDStatus &status )
269 size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270 if( s + 1 == threshold )
return true;
274 size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
276 if( f == size - threshold )
return true;
281 XRootDStatus Result()
287 std::atomic<size_t> failed;
288 std::atomic<size_t> succeeded;
289 const size_t threshold;
301 struct AtLeastPolicy : PolicyExecutor
303 AtLeastPolicy(
size_t size,
size_t threshold ) : pending_cnt( size ),
305 failed_threshold( size - threshold )
314 bool Examine(
const XrdCl::XRootDStatus &status )
316 if (!status.
IsOK()) {
317 if (failed_cnt.fetch_add(1, std::memory_order_relaxed) == failed_threshold) {
323 return pending_cnt.fetch_sub(1, std::memory_order_relaxed) == 1;
326 XRootDStatus Result()
332 std::atomic<size_t> pending_cnt;
333 std::atomic<size_t> failed_cnt;
334 const size_t failed_threshold;
343 barrier_t() : on( true ) { }
347 std::unique_lock<std::mutex> lck( mtx );
348 if( on ) cv.wait( lck );
353 std::unique_lock<std::mutex> lck( mtx );
359 std::condition_variable cv;
377 Ctx(
PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
387 Handle( XRootDStatus() );
396 inline void Examine(
const XRootDStatus &st )
398 if( policy->Examine( st ) )
399 Handle( policy->Result() );
408 inline void Handle(
const XRootDStatus &st )
410 PipelineHandler* hdlr = handler.exchange(
nullptr, std::memory_order_relaxed );
414 hdlr->HandleResponse(
new XRootDStatus( st ),
nullptr );
421 std::atomic<PipelineHandler*> handler;
426 std::unique_ptr<PolicyExecutor> policy;
438 struct PipelineEnd :
public Job
443 PipelineEnd( std::shared_ptr<Ctx> &ctx,
444 const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
458 std::shared_ptr<Ctx> ctx;
459 XrdCl::XRootDStatus st;
466 void Schedule( std::shared_ptr<Ctx> &ctx,
const XrdCl::XRootDStatus &st)
469 PipelineEnd *end =
new PipelineEnd( ctx, st );
483 if( !policy ) policy.reset(
new AllPolicy() );
485 std::shared_ptr<Ctx> ctx =
486 std::make_shared<Ctx>(
handler, policy.release() );
489 pipelineTimeout : this->
timeout;
491 for(
size_t i = 0; i < pipelines.size(); ++i )
493 if( !pipelines[i] )
continue;
495 [ctx](
const XRootDStatus &st )
mutable { Schedule( ctx, st ); } );
499 return XRootDStatus();
502 std::vector<Pipeline> pipelines;
503 std::unique_ptr<PolicyExecutor> policy;
JobManager * GetJobManager()
Get the job manager object user by the post master.