26 #ifndef __XRD_CL_OPERATIONS_HH__
27 #define __XRD_CL_OPERATIONS_HH__
50 class PipelineHandler;
59 template<
bool HasHndl>
87 if( !op.valid )
throw std::invalid_argument(
"Cannot construct "
88 "Operation from an invalid Operation!" );
128 std::promise<XRootDStatus> prms,
221 std::promise<XRootDStatus> prms,
254 std::unique_ptr<ResponseHandler> responseHandler;
259 std::unique_ptr<Operation<true>> currentOperation;
264 std::unique_ptr<Operation<true>> nextOperation;
274 std::promise<XRootDStatus> prms;
280 std::function<void(
const XRootDStatus&)>
final;
293 friend std::future<XRootDStatus>
Async(
Pipeline, uint16_t );
309 operation( op->
Move() )
317 operation( op.
Move() )
325 operation( op.
Move() )
330 operation( op->ToHandled() )
338 operation( op.ToHandled() )
346 operation( op.ToHandled() )
351 operation( std::move( pipe.operation ) )
360 operation = std::move( pipe.operation );
369 operation->AddOperation( op.Move() );
378 operation->AddOperation( op.ToHandled() );
389 if( !
bool( operation ) )
throw std::logic_error(
"Invalid pipeline." );
390 return *operation.get();
400 return bool( operation );
440 return operation.get();
449 void Run( Timeout timeout, std::function<
void(
const XRootDStatus&)>
final =
nullptr )
452 throw std::logic_error(
"Pipeline is already running!" );
455 std::promise<XRootDStatus> prms;
456 ftr = prms.get_future();
// Reject an empty pipeline. Previously the exception object was constructed
// and discarded (no `throw`), so the null pointer released below was used.
458 if( !operation ) throw std::logic_error(
"Empty pipeline!" );
460 Operation<true> *opr = operation.release();
463 h->PreparePipelineStart();
465 opr->Run( timeout, std::move( prms ), std::move(
final ) );
471 std::unique_ptr<Operation<true>> operation;
476 std::future<XRootDStatus> ftr;
488 inline std::future<XRootDStatus>
Async(
Pipeline pipeline, uint16_t timeout = 0 )
490 pipeline.Run( timeout );
491 return std::move( pipeline.ftr );
505 return Async( std::move( pipeline ), timeout ).get();
516 template<
template<
bool>
class Derived,
bool HasHndl,
typename HdlrFactory,
typename ... Args>
519 template<
template<
bool>
class, bool,
typename,
typename ...>
532 static_assert( !HasHndl,
"It is only possible to construct operation without handler" );
557 template<
typename Hdlr>
617 this->
handler->Assign( fo.final );
618 return this->
template Transform<true>();
628 Derived<HasHndl> *me =
static_cast<Derived<HasHndl>*
>( this );
629 return new Derived<HasHndl>( std::move( *me ) );
640 Derived<HasHndl> *me =
static_cast<Derived<HasHndl>*
>( this );
641 return new Derived<true>( std::move( *me ) );
650 Derived<HasHndl> *me =
static_cast<Derived<HasHndl>*
>( this );
651 return std::move( *me );
664 Derived<HasHndl> *me =
static_cast<Derived<HasHndl>*
>( this );
665 return Derived<to>( std::move( *me ) );
677 static_assert( !HasHndl,
"Operator >> is available only for operation without handler" );
679 return Transform<true>();
713 me.AddOperation( op.
Move() );
714 return me.template Transform<true>();
731 return me.template Transform<true>();
747 template <
bool HasHndl>
751 static_assert(HasHndl,
"Only an operation that has a handler can be assigned to workflow");
754 handler->Assign(timeout, std::move(prms), std::move(f),
this);
758 st = RunImpl(h, timeout);
763 }
catch (
const std::exception &ex) {
773 template <
bool HasHndl>
777 handler->AddOperation(op);
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
std::tuple< Args... > args
Operation arguments.
Derived< to > Transform()
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
uint16_t timeout
Operation timeout.
ConcreteOperation(Args &&... args)
Operation< HasHndl > * Move()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Operation< true > * ToHandled()
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Derived< true > operator|(Operation< true > &&op)
Derived< true > StreamImpl(ResponseHandler *handler)
Derived< true > operator|(Operation< false > &op)
Derived< true > operator|(Operation< true > &op)
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
friend class ConcreteOperation
Derived< true > operator>>(Hdlr &&hdlr)
Derived< true > operator|(Operation< false > &&op)
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
virtual ~Operation()
Destructor.
friend class PipelineHandler
void AddOperation(Operation< true > *op)
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
bool valid
Flag indicating if it is a valid object.
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
virtual std::string ToString()=0
Name of the operation.
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
virtual Operation< true > * ToHandled()=0
virtual Operation< HasHndl > * Move()=0
Operation(Operation< from > &&op)
Move constructor between template instances.
Pipeline exception, wraps an XRootDStatus.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
~PipelineHandler()
Destructor.
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
Pipeline(Operation< true > *op)
Constructor.
Pipeline(Operation< true > &&op)
Constructor.
static void Repeat()
Repeat current operation.
Pipeline(Operation< true > &op)
Constructor.
friend class PipelineHandler
Pipeline & operator=(Pipeline &&pipe)
Move assignment operator.
Pipeline & operator|=(Operation< false > &&op)
Extend pipeline.
Pipeline(Pipeline &&pipe)
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
Pipeline(Operation< false > *op)
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< false > &&op)
Constructor.
Pipeline & operator|=(Operation< true > &&op)
Extend pipeline.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Pipeline(Operation< false > &op)
Constructor.
Pipeline()
Default constructor.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
std::vector< HostInfo > HostList
const uint16_t errInternal
Internal error.
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
bool IsOK() const
We're fine.