51 #include "XrdVersion.hh"
53 #include <arpa/inet.h>
54 #include <sys/types.h>
88 std::pair< std::set<std::string>::iterator,
bool > ret =
protocols.insert( protocol );
145 strmqueues.resize( size - 1, 0 );
153 strmqueues.resize( size - 1, 0);
161 uint16_t
Select(
const std::vector<bool> &connected )
164 size_t minval = std::numeric_limits<size_t>::max();
166 for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
168 if( !connected[i] )
continue;
170 if( strmqueues[i] < minval )
173 minval = strmqueues[i];
187 --strmqueues[substrm - 1];
192 std::vector<size_t> strmqueues;
198 bindprefs( std::move( bindprefs ) ), next( 0 )
202 inline const std::string&
Get()
204 std::string &ret = bindprefs[next];
206 if( next >= bindprefs.size() )
212 std::vector<std::string> bindprefs;
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
320 size_t leftToBeRead = 8 - message.
GetCursor();
321 while( leftToBeRead )
325 leftToBeRead, bytesRead );
329 leftToBeRead -= bytesRead;
334 uint32_t bodySize = *(uint32_t*)(message.
GetBuffer(4));
337 "body", (
void*)&message, bodySize );
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
355 bodySize = rsphdr->
dlen;
357 if( message.
GetSize() < bodySize + 8 )
360 leftToBeRead = bodySize-(message.
GetCursor()-8);
361 while( leftToBeRead )
369 leftToBeRead -= bytesRead;
392 uint32_t bodySize = rsphdr->
dlen;
395 "kXR_status: invalid message size." );
400 if( message.
GetSize() < bodySize + 8 )
403 size_t leftToBeRead = bodySize-(message.
GetCursor()-8);
404 while( leftToBeRead )
412 leftToBeRead -= bytesRead;
444 channelData.
Set( info );
448 env->
GetInt(
"SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->
stream.resize( streams );
471 channelData.
Get( info );
482 "[%s] Internal error: not enough substreams",
490 return HandShakeMain( handShakeData, channelData );
492 return HandShakeParallel( handShakeData, channelData );
502 channelData.
Get( info );
506 "[%s] Internal error: no channel info",
519 handShakeData->
out = GenerateInitialHSProtocol( handShakeData, info,
530 XRootDStatus st = ProcessServerHS( handShakeData, info );
544 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
554 handShakeData->
out = GenerateProtocol( handShakeData, info,
560 handShakeData->
out = GenerateLogIn( handShakeData, info );
571 XRootDStatus st = ProcessLogInResp( handShakeData, info );
579 if( st.IsOK() && st.code ==
suDone )
588 handShakeData->
out = GenerateEndSession( handShakeData, info );
598 st = DoAuthentication( handShakeData, info );
611 XRootDStatus st = DoAuthentication( handShakeData, info );
619 if( st.IsOK() && st.code ==
suDone )
626 handShakeData->
out = GenerateEndSession( handShakeData, info );
644 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
646 if( st.IsOK() && st.code ==
suDone )
650 else if( !st.IsOK() )
664 XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
665 AnyObject &channelData )
667 XRootDChannelInfo *info = 0;
668 channelData.Get( info );
672 "[%s] Internal error: no channel info",
673 handShakeData->streamName.c_str());
677 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
685 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
697 XRootDStatus st = ProcessServerHS( handShakeData, info );
711 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
719 handShakeData->out = GenerateBind( handShakeData, info );
729 XRootDStatus st = ProcessBindResp( handShakeData, info );
737 return XRootDStatus();
739 return XRootDStatus();
750 channelData.
Get( info );
754 "[%s] Internal error: no channel info",
770 channelData.
Get( info );
777 "Internal error: no channel info, behaving as if TTL has elapsed");
788 env->
GetInt(
"DataServerTTL", ttl );
793 env->
GetInt(
"LoadBalancerTTL", ttl );
800 uint16_t allocatedSIDs = info->
sidManager->GetNumberOfAllocatedSIDs();
802 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
803 info->
streamName.c_str(), (
long long) inactiveTime, ttl, allocatedSIDs,
806 if( info->
openFiles != 0 && info->
finstcnt.load( std::memory_order_relaxed ) != 0 )
809 if( !allocatedSIDs && inactiveTime > ttl )
823 channelData.
Get( info );
829 "Internal error: no channel info, behaving as if stream is broken");
834 env->
GetInt(
"StreamTimeout", streamTimeout );
838 const time_t now = time(0);
840 info->
sidManager->IsAnySIDOldAs( now - streamTimeout );
843 "stream timeout: %d, any SID: %d, wait barrier: %s",
844 info->
streamName.c_str(), (
long long) inactiveTime, streamTimeout,
847 if( inactiveTime < streamTimeout )
850 if( now < info->waitBarrier )
875 channelData.
Get( info );
879 "Internal error: no channel info, cannot multiplex");
896 uint16_t upStream = 0;
897 uint16_t downStream = 0;
902 downStream = hint->
down;
907 std::vector<bool> connected;
908 connected.reserve( info->
stream.size() - 1 );
909 size_t nbConnected = 0;
910 for(
size_t i = 1; i < info->
stream.size(); ++i )
913 connected.push_back(
true );
917 connected.push_back(
false );
919 if( nbConnected == 0 )
925 if( upStream >= info->
stream.size() )
928 "[%s] Up link stream %d does not exist, using 0",
933 if( downStream >= info->
stream.size() )
936 "[%s] Down link stream %d does not exist, using 0",
960 memset( newBuf, 0, 8 );
1034 return PathID( upStream, downStream );
1045 channelData.
Get( info );
1064 uint16_t ret = info->
stream.size();
1068 env->
GetInt(
"TlsNoData", nodata );
1081 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1082 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1088 if( ret == 1 ) ++ret;
1091 if( ret > info->
stream.size() )
1093 info->
stream.resize( ret );
1192 uint16_t numChunks = (req->
readv.
dlen)/16;
1194 for(
size_t i = 0; i < numChunks; ++i )
1196 dataChunk[i].
rlen = htonl( dataChunk[i].rlen );
1197 dataChunk[i].
offset = htonll( dataChunk[i].offset );
1210 for(
size_t i = 0; i < numChunks; ++i )
1212 wrtList[i].
wlen = htonl( wrtList[i].wlen );
1213 wrtList[i].
offset = htonll( wrtList[i].offset );
1297 m->
body.protocol.pval = ntohl( m->
body.protocol.pval );
1298 m->
body.protocol.flags = ntohl( m->
body.protocol.flags );
1309 m->
body.error.errnum = ntohl( m->
body.error.errnum );
1319 m->
body.wait.seconds = htonl( m->
body.wait.seconds );
1329 m->
body.redirect.port = htonl( m->
body.redirect.port );
1339 m->
body.waitresp.seconds = htonl( m->
body.waitresp.seconds );
1349 m->
body.attn.actnum = htonl( m->
body.attn.actnum );
1385 "kXR_status: invalid message size." );
1413 "corrupted (crc32c integrity check failed)." );
1420 "(stream ID mismatch)." );
1428 "(request ID mismatch)." );
1452 "kXR_status: invalid message size." );
1466 if( crcval != cse->
cseCRC )
1469 "corrupted (crc32c integrity check failed)." );
1480 for(
size_t i = 0; i < pgcnt; ++i )
1481 pgoffs[i] = ntohll( pgoffs[i] );
1501 header->
dlen = ntohl( header->
dlen );
1511 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
1512 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
1514 rsp->
body.error.errnum, errmsg );
1524 channelData.
Get( info );
1533 uint16_t nbConnected = 0;
1534 for(
size_t i = 1; i < info->
stream.size(); ++i )
1545 uint16_t subStreamId )
1548 channelData.
Get( info );
1557 CleanUpProtection( info );
1559 if( !info->
stream.empty() )
1565 if( subStreamId == 0 )
1583 channelData.
Get( info );
1596 result.
Set( (
const char*)
"XRootD",
false );
1635 channelData.
Get( info );
1657 "response that we're no longer interested in (timed out)",
1669 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1670 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1682 uint32_t seconds = 0;
1684 seconds = ntohl( rsp->
body.wait.seconds ) + 5;
1688 seconds = ntohl( rsp->
body.waitresp.seconds );
1690 log->
Dump(
XRootDMsg,
"[%s] Got kXR_waitresp response of %u seconds, "
1691 "setting up wait barrier.",
1696 time_t barrier = time(0) + seconds;
1704 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1705 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1714 info->
finstcnt.fetch_add( 1, std::memory_order_relaxed );
1750 channelData.
Get( info );
1775 channelData.
Get( info );
1803 sign->
Grab(
reinterpret_cast<char*
>( newreq ), rc );
1815 channelData.
Get( info );
1816 if( info->
finstcnt.load( std::memory_order_relaxed ) > 0 )
1817 info->
finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1826 pSecUnloadHandler->
unloaded =
true;
1836 channelData.
Get( info );
1840 env->
GetInt(
"NoTlsOK", notlsok );
1915 channelData.
Get( info );
1933 "[%s] Sending out the initial hand shake + kXR_protocol",
1943 init->
fifth = htonl(2012);
1946 InitProtocolReq( proto, info, expect );
1954 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1955 XRootDChannelInfo *info,
1960 "[%s] Sending out the kXR_protocol",
1961 hsData->streamName.c_str() );
1963 Message *msg =
new Message();
1968 InitProtocolReq( proto, info, expect );
1977 XRootDChannelInfo *info,
1990 env->
GetInt(
"NoTlsOK", notlsok );
1993 env->
GetInt(
"TlsNoData", tlsnodata );
1995 if (info->encrypted ||
InitTLS())
1998 if (info->encrypted && !(notlsok || tlsnodata))
2001 request->
expect = expect;
2014 XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
2015 XRootDChannelInfo *info )
2019 Message *msg = hsData->in;
2026 hsData->streamName.c_str() );
2031 info->protocolVersion = ntohl(hs->
protover);
2037 "[%s] Got the server hand shake response (%s, protocol "
2039 hsData->streamName.c_str(),
2040 ServerFlagsToStr( info->serverFlags ).c_str(),
2041 info->protocolVersion );
2049 XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
2050 XRootDChannelInfo *info )
2064 hsData->streamName.c_str() );
2071 env->
GetInt(
"NoTlsOK", notlsok );
2079 if( !notlsok )
return XRootDStatus(
stFatal,
errTlsError, ENOTSUP,
"TLS not supported" );
2086 "[%s] Falling back to unencrypted transmission, server does "
2087 "not support TLS encryption.",
2088 hsData->streamName.c_str() );
2089 info->encrypted =
false;
2092 if( rsp->
body.protocol.pval >= 0x297 )
2093 info->serverFlags = rsp->
body.protocol.flags;
2098 info->protRespBody->flags = rsp->
body.protocol.flags;
2099 info->protRespBody->pval = rsp->
body.protocol.pval;
2101 char* bodybuff =
reinterpret_cast<char*
>( &rsp->
body.protocol.secreq );
2102 size_t bodysize = rsp->
hdr.
dlen - 8;
2103 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2109 "[%s] kXR_protocol successful (%s, protocol version %x)",
2110 hsData->streamName.c_str(),
2111 ServerFlagsToStr( info->serverFlags ).c_str(),
2112 info->protocolVersion );
2114 if( !( info->serverFlags &
kXR_haveTLS ) && info->encrypted )
2121 "Server was not configured to support encryption." );
2129 env->
GetInt(
"WantTlsOnNoPgrw", tlsOnNoPgrw );
2130 if( !( info->serverFlags &
kXR_suppgrw ) && tlsOnNoPgrw )
2136 if( info->encrypted )
2139 "[%s] Server does not support PgRead/PgWrite and"
2140 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2141 hsData->streamName.c_str() );
2151 info->encrypted =
true;
2159 XRootDStatus XRootDTransport::ProcessProtocolBody(
char *bodybuff,
2161 XRootDChannelInfo *info )
2172 if( bodysize < bifreq->bifILen )
2174 "protocol response." );
2175 std::string bindprefs_str( bodybuff, bifreq->
bifILen );
2176 std::vector<std::string> bindprefs;
2178 info->bindSelector.reset(
new BindPrefSelector( std::move( bindprefs ) ) );
2186 if( bodysize >= 6 && secreq->
theTag ==
'S' )
2188 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2189 info->protRespSize = bodysize + 8 ;
2192 return XRootDStatus();
2198 Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2199 XRootDChannelInfo *info )
2204 "[%s] Sending out the bind request",
2205 hsData->streamName.c_str() );
2212 memcpy( bindReq->
sessid, info->sessionId, 16 );
2221 XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2222 XRootDChannelInfo *info )
2235 hsData->streamName.c_str() );
2239 info->stream[hsData->subStreamId].pathId = rsp->
body.bind.substreamid;
2241 hsData->streamName.c_str() );
2243 return XRootDStatus();
2249 Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2250 XRootDChannelInfo *info )
2261 char *cgiBuffer =
new char[1024 + info->logintoken.size()];
2262 std::string appName;
2263 std::string monInfo;
2264 env->GetString(
"AppName", appName );
2265 env->GetString(
"MonInfo", monInfo );
2266 if( info->logintoken.empty() )
2268 snprintf( cgiBuffer, 1024,
2269 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2270 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2271 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2275 snprintf( cgiBuffer, 1024,
2276 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2277 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2278 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2280 uint16_t cgiLen = strlen( cgiBuffer );
2290 loginReq->
pid = ::getpid();
2292 loginReq->
dlen = cgiLen;
2298 int multiProtocol = 0;
2299 env->GetInt(
"MultiProtocol", multiProtocol );
2307 bool dualStack =
false;
2308 bool privateIPv6 =
false;
2309 bool privateIPv4 =
false;
2333 if( !dualStack && hsData->serverAddr )
2346 std::string buffer( 8, 0 );
2347 if( hsData->url->GetUserName().length() )
2348 buffer = hsData->url->GetUserName();
2351 char *name =
new char[1024];
2358 buffer.resize( 8, 0 );
2359 std::copy( buffer.begin(), buffer.end(), (
char*)loginReq->
username );
2361 msg->Append( cgiBuffer, cgiLen, 24 );
2364 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2365 "private IPv6: %s", hsData->streamName.c_str(),
2366 loginReq->
username, cgiBuffer, dualStack ?
"true" :
"false",
2367 privateIPv4 ?
"true" :
"false",
2368 privateIPv6 ?
"true" :
"false" );
2370 delete [] cgiBuffer;
2378 XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2379 XRootDChannelInfo *info )
2392 hsData->streamName.c_str() );
2396 if( !info->firstLogIn )
2397 memcpy( info->oldSessionId, info->sessionId, 16 );
2399 if( rsp->
hdr.
dlen == 0 && info->protocolVersion <= 0x289 )
2406 memset( info->sessionId, 0, 16 );
2408 "[%s] Logged in, accepting empty login response.",
2409 hsData->streamName.c_str() );
2410 return XRootDStatus();
2416 memcpy( info->sessionId, rsp->
body.login.sessid, 16 );
2421 hsData->streamName.c_str(), sessId.c_str() );
2428 size_t len = rsp->
hdr.
dlen-16;
2429 info->authBuffer =
new char[len+1];
2430 info->authBuffer[len] = 0;
2431 memcpy( info->authBuffer, rsp->
body.login.sec, len );
2433 hsData->streamName.c_str(), info->authBuffer );
2438 return XRootDStatus();
2444 XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2445 XRootDChannelInfo *info )
2451 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2453 std::string protocolName;
2461 hsData->streamName.c_str() );
2467 info->authEnv->Put(
"sockname", hsData->clientName.c_str() );
2468 info->authEnv->Put(
"username", hsData->url->GetUserName().c_str() );
2469 info->authEnv->Put(
"password", hsData->url->GetPassword().c_str() );
2472 URL::ParamsMap::const_iterator it;
2473 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2475 if( it->first.compare( 0, 4,
"xrd." ) == 0 ||
2476 it->first.compare( 0, 6,
"xrdcl." ) == 0 )
2477 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2483 size_t authBuffLen = strlen( info->authBuffer );
2484 char *pars = (
char *)malloc( authBuffLen + 1 );
2485 memcpy( pars, info->authBuffer, authBuffLen );
2488 delete [] info->authBuffer;
2489 info->authBuffer = 0;
2494 XRootDStatus st = GetCredentials( credentials, hsData, info );
2497 CleanUpAuthentication( info );
2500 protocolName = info->authProtocol->Entity.prot;
2509 protocolName = info->authProtocol->Entity.prot;
2517 "[%s] Sending more authentication data for %s",
2518 hsData->streamName.c_str(), protocolName.c_str() );
2521 char *secTokenData = (
char*)malloc( len );
2522 memcpy( secTokenData, rsp->
body.authmore.data, len );
2525 credentials = info->authProtocol->getCredentials( secToken, &ei );
2534 "[%s] Auth protocol handler for %s refuses to give "
2535 "us more credentials %s",
2536 hsData->streamName.c_str(), protocolName.c_str(),
2538 CleanUpAuthentication( info );
2548 info->authProtocolName = info->authProtocol->Entity.prot;
2553 if( info->protRespBody )
2555 int rc =
XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2559 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2564 "[%s] XrdSecProtect: no protection needed.",
2565 hsData->streamName.c_str() );
2570 "[%s] Failed to load XrdSecProtect: %s",
2571 hsData->streamName.c_str(),
XrdSysE2T( -rc ) );
2572 CleanUpAuthentication( info );
2578 if( !info->protection )
2579 CleanUpAuthentication( info );
2581 pSecUnloadHandler->
Register( info->authProtocolName );
2584 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2585 protocolName.c_str() );
2593 return XRootDStatus();
2600 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
2601 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
2603 "[%s] Authentication with %s failed: %s",
2604 hsData->streamName.c_str(), protocolName.c_str(),
2608 info->authProtocol->Delete();
2609 info->authProtocol = 0;
2614 XRootDStatus st = GetCredentials( credentials, hsData, info );
2617 CleanUpAuthentication( info );
2620 protocolName = info->authProtocol->Entity.prot;
2627 info->authProtocolName = info->authProtocol->Entity.prot;
2628 CleanUpAuthentication( info );
2631 "[%s] Authentication with %s failed: unexpected answer",
2632 hsData->streamName.c_str(), protocolName.c_str() );
2648 protocolName.length() > 4 ? 4 : protocolName.length() );
2650 memcpy( reqBuffer, credentials->
buffer, credentials->
size );
2668 HandShakeData *hsData,
2669 XRootDChannelInfo *info )
2685 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secuid") : 0;
2686 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secgid") : 0;
2691 if(secuidc) secuid = atoi(secuidc);
2692 if(secgidc) secgid = atoi(secgidc);
2695 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2696 if(!uidSetter.IsOk()) {
2697 log->Error(
XRootDTransportMsg,
"[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2698 hsData->streamName.c_str(), secuid, secgid );
2702 if(secuid >= 0 || secgid >= 0) {
2703 log->Error(
XRootDTransportMsg,
"[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2704 hsData->streamName.c_str() );
2706 " only supported on Linux" );
2715 srvAddrInfo.
SetTLS( info->encrypted );
2721 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2725 if( !info->authProtocol )
2728 hsData->streamName.c_str() );
2732 std::string protocolName = info->authProtocol->Entity.prot;
2734 hsData->streamName.c_str(), protocolName.c_str() );
2739 credentials = info->authProtocol->getCredentials( 0, &ei );
2743 "[%s] Cannot get credentials for protocol %s: %s",
2744 hsData->streamName.c_str(), protocolName.c_str(),
2746 info->authProtocol->Delete();
2756 Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2758 if( info->authProtocol )
2759 info->authProtocol->Delete();
2760 delete info->authParams;
2761 delete info->authEnv;
2762 info->authProtocol = 0;
2763 info->authParams = 0;
2772 Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2777 if( info->protection )
2779 info->protection->Delete();
2780 info->protection = 0;
2782 CleanUpAuthentication( info );
2785 if( info->protRespBody )
2787 delete info->protRespBody;
2788 info->protRespBody = 0;
2789 info->protRespSize = 0;
2801 char errorBuff[1024];
2806 auto ret = authHandler.load( std::memory_order_relaxed );
2807 if( ret )
return ret;
2816 ret = authHandler.load( std::memory_order_relaxed );
2817 if( ret )
return ret;
2821 authHandler.store( ret, std::memory_order_relaxed );
2826 "Unable to get the security framework: %s", errorBuff );
2835 Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2836 XRootDChannelInfo *info )
2847 memcpy( endsessReq->
sessid, info->oldSessionId, 16 );
2851 " %s", hsData->streamName.c_str(), sessId.c_str() );
2862 uint32_t size = sign->GetSize();
2863 sign->ReAllocate( size + msg->GetSize() );
2864 char* buffer = sign->GetBuffer( size );
2865 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2866 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2875 Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2876 XRootDChannelInfo *info )
2898 std::string errorMsg( rsp->
body.error.errmsg, rsp->
hdr.
dlen - 4 );
2900 "kXR_endsess: %s", hsData->streamName.c_str(),
2908 std::string msg( rsp->
body.wait.infomsg, rsp->
hdr.
dlen - 4 );
2910 "kXR_endsess: %s", hsData->streamName.c_str(),
2912 hsData->out = GenerateEndSession( hsData, info );
2923 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2925 std::string repr =
"type: ";
2949 repr.erase( repr.length()-1, 1 );
2960 char *GetDataAsString(
char *msg )
2963 char *fn =
new char[req->
dlen+1];
2964 memcpy( fn, msg + 24, req->
dlen );
2991 char *fn = GetDataAsString( msg );
2992 o <<
"file: " << fn <<
", ";
2994 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
2995 o << std::setbase(10);
3002 o <<
"kXR_compress ";
3014 o <<
"kXR_open_apnd ";
3016 o <<
"kXR_open_read ";
3018 o <<
"kXR_open_updt ";
3020 o <<
"kXR_open_wrto ";
3024 o <<
"kXR_prefname ";
3026 o <<
"kXR_refresh ";
3028 o <<
"kXR_4dirlist ";
3030 o <<
"kXR_replica ";
3036 o <<
"kXR_retstat ";
3049 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3063 char *fn = GetDataAsString( msg );;
3064 o <<
"path: " << fn <<
", ";
3069 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3091 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3092 o << std::setbase(10);
3094 o <<
"offset: " << sreq->
offset <<
", ";
3095 o <<
"size: " << sreq->
rlen <<
")";
3105 o <<
"kXR_pgread (";
3106 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3107 o << std::setbase(10);
3109 o <<
"offset: " << sreq->
offset <<
", ";
3110 o <<
"size: " << sreq->
rlen <<
")";
3121 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3122 o << std::setbase(10);
3124 o <<
"offset: " << sreq->
offset <<
", ";
3125 o <<
"size: " << sreq->
dlen <<
")";
3135 o <<
"kXR_pgwrite (";
3136 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3137 o << std::setbase(10);
3139 o <<
"offset: " << sreq->
offset <<
", ";
3140 o <<
"size: " << sreq->
dlen <<
")";
3167 o <<
" unknown subcode: " << sreq->
subcode;
3170 o <<
" (handle: " << FileHandleToStr( sreq->
fhandle );
3171 o << std::setbase(10);
3173 o <<
", numattr: " << nattr;
3181 o <<
", total size: " << req->
dlen <<
")";
3192 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3203 o <<
"kXR_truncate (";
3205 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3208 char *fn = GetDataAsString( msg );
3209 o <<
"file: " << fn;
3212 o << std::setbase(10);
3214 o <<
"offset: " << sreq->
offset;
3224 unsigned char *fhandle = 0;
3229 fhandle = dataChunk[0].
fhandle;
3231 o << FileHandleToStr( fhandle );
3235 o << std::setbase(10);
3240 size += dataChunk[i].
rlen;
3241 o <<
"(offset: " << dataChunk[i].
offset;
3242 o <<
", size: " << dataChunk[i].
rlen <<
"); ";
3245 o <<
"total size: " << size <<
")";
3254 unsigned char *fhandle = 0;
3255 o <<
"kXR_writev (";
3260 uint32_t numChunks = 0;
3264 size += wrtList[i].
wlen;
3269 o << FileHandleToStr( fhandle );
3273 o << std::setbase(10);
3274 o <<
"chunks: " << numChunks <<
", ";
3275 o <<
"total size: " << size <<
")";
3285 char *fn = GetDataAsString( msg );;
3286 o <<
"kXR_locate (";
3287 o <<
"path: " << fn <<
", ";
3295 o <<
"kXR_refresh ";
3297 o <<
"kXR_prefname ";
3303 o <<
"kXR_compress ";
3319 o <<
"destination: ";
3341 case kXR_QPrep: o <<
"kXR_QPrep";
break;
3344 case kXR_Qvisa: o <<
"kXR_Qvisa";
break;
3346 default: o << sreq->
infotype;
break;
3352 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3356 o <<
"arg length: " << sreq->
dlen <<
")";
3366 char *fn = GetDataAsString( msg );;
3367 o <<
"path: " << fn <<
")";
3379 char *fn = GetDataAsString( msg );
3380 o <<
"path: " << fn <<
", ";
3382 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
3383 o << std::setbase(10);
3390 o <<
"kXR_mkdirpath";
3402 char *fn = GetDataAsString( msg );
3403 o <<
"path: " << fn <<
")";
3415 char *fn = GetDataAsString( msg );
3416 o <<
"path: " << fn <<
", ";
3418 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
")";
3437 o <<
"kXR_protocol (";
3438 o <<
"clientpv: 0x" << std::setbase(16) << sreq->
clientpv <<
")";
3447 o <<
"kXR_dirlist (";
3448 char *fn = GetDataAsString( msg );;
3449 o <<
"path: " << fn <<
")";
3460 char *fn = GetDataAsString( msg );;
3461 o <<
"data: " << fn <<
")";
3472 o <<
"kXR_prepare (";
3489 o <<
", priority: " << (int) sreq->
prty <<
", ";
3491 char *fn = GetDataAsString( msg );
3493 for( cursor = fn; *cursor; ++cursor )
3494 if( *cursor ==
'\n' ) *cursor =
' ';
3496 o <<
"paths: " << fn <<
")";
3504 o <<
"kXR_chkpoint (";
3512 o <<
"kXR_ckpXeq) ";
3526 o <<
"kXR_unknown (length: " << req->
dlen <<
")";
3535 std::string XRootDTransport::FileHandleToStr(
const unsigned char handle[4] )
3537 std::ostringstream o;
3539 for( uint8_t i = 0; i < 4; ++i )
3541 o << std::setbase(16) << std::setfill(
'0') << std::setw(2);
3542 o << (int)handle[i];
static const int kXR_ckpRollback
struct ClientTruncateRequest truncate
union ServerResponse::@0 body
ServerResponseStatus status
struct ClientPgReadRequest pgread
struct ClientMkdirRequest mkdir
struct ClientAuthRequest auth
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientReadVRequest readv
struct ClientOpenRequest open
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
struct ClientWriteVRequest writev
struct ClientLoginRequest login
struct ClientChmodRequest chmod
struct ClientQueryRequest query
struct ClientReadRequest read
struct ClientMvRequest mv
struct ClientChkPointRequest chkpoint
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
static const int kXR_ckpCommit
struct ClientPrepareRequest prepare
static const int kXR_ckpQuery
struct ClientWriteRequest write
#define kXR_PROTTLSVERSION
struct ClientProtocolRequest protocol
struct ClientLocateRequest locate
static const int kXR_ckpBegin
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::map< std::string, std::string > ParamsMap
bool IsSecure() const
Does the protocol indicate encryption.
bool IsTPC() const
Is the URL used in TPC context.
std::string GetLoginToken() const
Get the login token if present in the opaque info.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
XRootDTransport()
Constructor.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
~XRootDTransport()
Destructor.
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return a number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suContinue
const int DefaultTlsNoData
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
struct ServerResponseBifs_Protocol bifReqs
BindPrefSelector(std::vector< std::string > &&bindprefs)
const std::string & Get()
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
static void UnloadHandler()
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects less loaded stream for read operation over multiple streams.
void AdjustQueues(uint16_t size)
StreamSelector(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Transport name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
XrdSecParameters * authParams
XrdSecProtocol * authProtocol
XrdSecProtect * protection
std::unique_ptr< BindPrefSelector > bindSelector
std::string authProtocolName
std::atomic< uint32_t > finstcnt
unsigned int protRespSize
ServerResponseBody_Protocol * protRespBody
XRootDChannelInfo(const URL &url)
std::set< uint16_t > sentOpens
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
Generic structure to pass security information back and forth.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.