25 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
51 #include <arpa/inet.h>
64 class LocalFileHandler;
100 case EntryRedirect:
return "Redirected from: " + fromstr +
" to: "
104 "Falling back to virtual redirector: " + tostr;
108 case EntryWait:
return "Waited at server request. Resending: "
112 return "Failed at: " + fromstr +
", retrying at: " + tostr;
137 std::shared_ptr<SIDManager> sidMgr,
140 pResponseHandler( respHandler ),
142 pEffectiveDataServerUrl( 0 ),
144 pLFileHandler( lFileHandler ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
156 pAsyncChunkIndex( 0 ),
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
162 pOtherRawStarted( false ),
164 pFollowMetalink( false ),
168 pAggregatedWaitTime( 0 ),
173 pTimeoutFence( false ),
175 pDirListStarted( false ),
176 pDirListWithStat( false ),
184 pHasSessionId =
true;
187 log->
Debug(
ExDbgMsg,
"[%s] MsgHandler created: %p (message: %s ).",
196 ntohl( pgrdreq->
rlen ) ) );
212 DumpRedirectTraceBack();
216 delete pEffectiveDataServerUrl;
218 pRequest =
reinterpret_cast<Message*
>( 0xDEADBEEF );
220 pPostMaster =
reinterpret_cast<PostMaster*
>( 0xDEADBEEF );
222 pChunkList =
reinterpret_cast<ChunkList*
>( 0xDEADBEEF );
223 pEffectiveDataServerUrl =
reinterpret_cast<URL*
>( 0xDEADBEEF );
237 virtual uint16_t
Examine( std::shared_ptr<Message> &msg )
override;
256 virtual uint16_t
GetSid()
const override;
263 virtual void Process()
override;
278 uint32_t &bytesRead )
override;
298 virtual bool IsRaw()
const override;
311 uint32_t &bytesWritten )
override;
325 pExpiration = expiration;
342 pRedirectAsAnswer = redirectAsAnswer;
351 pOksofarAsAnswer = oksofarAsAnswer;
369 pLoadBalancer = loadBalancer;
370 pHasLoadBalancer =
true;
378 pHosts.reset( hostList );
386 pChunkList = chunkList;
388 pBodyReader->SetChunkList( chunkList );
390 pChunkStatus.resize( chunkList->size() );
392 pChunkStatus.clear();
397 pCrc32cDigests = std::move( crc32cDigests );
413 pRedirectCounter = redirectCounter;
418 pFollowMetalink = followMetalink;
423 pStateful = stateful;
441 static constexpr
int kSendDone = 0x0001;
442 static constexpr
int kSawResp = 0x0002;
443 static constexpr
int kFinalResp = 0x0004;
458 void HandleResponse();
475 Status ParseXAttrResponse(
char *data,
size_t len,
AnyObject *&response );
481 Status RewriteRequestRedirect(
const URL &newUrl );
486 Status RewriteRequestWait();
491 void UpdateTriedCGI(uint32_t errNo=0);
496 void SwitchOnRefreshFlag();
502 void HandleRspOrQueue();
507 void HandleLocalRedirect(
URL *url );
525 bool OmitWait(
Message &request,
const URL &url );
534 bool RetriableErrorResponse(
const Status &status );
539 void DumpRedirectTraceBack();
549 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, T& result );
559 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, std::string &result );
570 Status ReadFromBuffer(
char *&buffer,
size_t &buflen,
size_t size,
571 std::string &result );
578 ChunkStatus(): sizeError( false ), done( false ) {}
583 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
585 static const size_t CksumSize =
sizeof( uint32_t );
587 static const size_t MaxSslErrRetry = 3;
589 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
599 if( _1stpg + CksumSize > dlen )
600 _1stpg = dlen - CksumSize;
601 dlen -= _1stpg + CksumSize;
603 pgcnt += dlen / PageWithCksum;
604 if( dlen % PageWithCksum )
610 std::shared_ptr<Message> pResponse;
611 std::vector<std::shared_ptr<Message>> pPartialResps;
612 ResponseHandler *pResponseHandler;
614 URL *pEffectiveDataServerUrl;
615 PostMaster *pPostMaster;
616 std::shared_ptr<SIDManager> pSidMgr;
617 LocalFileHandler *pLFileHandler;
618 XRootDStatus pStatus;
621 bool pRedirectAsAnswer;
622 bool pOksofarAsAnswer;
623 std::unique_ptr<HostList> pHosts;
624 bool pHasLoadBalancer;
625 HostInfo pLoadBalancer;
627 std::string pRedirectUrl;
629 std::vector<uint32_t> pCrc32cDigests;
631 std::vector<ChunkStatus> pChunkStatus;
632 uint16_t pRedirectCounter;
633 uint16_t pNotAuthorizedCounter;
635 uint32_t pAsyncOffset;
636 uint32_t pAsyncChunkIndex;
638 std::unique_ptr<AsyncPageReader> pPageReader;
639 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
641 Buffer pPgWrtCksumBuff;
642 uint32_t pPgWrtCurrentPageOffset;
643 uint32_t pPgWrtCurrentPageNb;
645 bool pOtherRawStarted;
647 bool pFollowMetalink;
650 int pAggregatedWaitTime;
652 std::unique_ptr<RedirectEntry> pRdirEntry;
653 RedirectTraceBack pRedirectTraceBack;
656 std::atomic<int> pSendingState;
663 std::atomic<bool> pTimeoutFence;
670 bool pDirListStarted;
671 bool pDirListWithStat;
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
std::string GetLocation() const
Get location (protocol://host:port/path)
bool IsValid() const
Is the url valid.
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
void SetFollowMetalink(bool followMetalink)
const Message * GetRequest() const
Get the request pointer.
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
void OnReadyToSend([[maybe_unused]] Message *msg) override
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
~XRootDMsgHandler()
Destructor.
void WaitDone(time_t now)
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
void SetStateful(bool stateful)
void SetOksofarAsAnswer(bool oksofarAsAnswer)
time_t GetExpiration() override
Get a timestamp after which we give up.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
virtual bool IsRaw() const override
Are we a raw writer or not?
void SetRedirectAsAnswer(bool redirectAsAnswer)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.