51 #include "XrdVersion.hh"
53 #include <arpa/inet.h>
54 #include <sys/types.h>
88 std::pair< std::set<std::string>::iterator,
bool > ret =
protocols.insert( protocol );
145 strmqueues.resize( size - 1, 0 );
153 strmqueues.resize( size - 1, 0);
161 uint16_t
Select(
const std::vector<bool> &connected )
164 size_t minval = std::numeric_limits<size_t>::max();
166 for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
168 if( !connected[i] )
continue;
170 if( strmqueues[i] < minval )
173 minval = strmqueues[i];
187 --strmqueues[substrm - 1];
192 std::vector<size_t> strmqueues;
198 bindprefs( std::move( bindprefs ) ), next( 0 )
202 inline const std::string&
Get()
204 std::string &ret = bindprefs[next];
206 if( next >= bindprefs.size() )
212 std::vector<std::string> bindprefs;
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
320 size_t leftToBeRead = 8 - message.
GetCursor();
321 while( leftToBeRead )
325 leftToBeRead, bytesRead );
329 leftToBeRead -= bytesRead;
334 uint32_t bodySize = *(uint32_t*)(message.
GetBuffer(4));
337 "body", &message, bodySize );
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
355 bodySize = rsphdr->
dlen;
357 if( message.
GetSize() < bodySize + 8 )
360 leftToBeRead = bodySize-(message.
GetCursor()-8);
361 while( leftToBeRead )
369 leftToBeRead -= bytesRead;
392 uint32_t bodySize = rsphdr->
dlen;
395 "kXR_status: invalid message size." );
400 if( message.
GetSize() < bodySize + 8 )
403 size_t leftToBeRead = bodySize-(message.
GetCursor()-8);
404 while( leftToBeRead )
412 leftToBeRead -= bytesRead;
444 channelData.
Set( info );
448 env->
GetInt(
"SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->
stream.resize( streams );
471 channelData.
Get( info );
478 "[%s] Internal error: not enough substreams",
486 return HandShakeMain( handShakeData, channelData );
488 return HandShakeParallel( handShakeData, channelData );
498 channelData.
Get( info );
507 handShakeData->
out = GenerateInitialHSProtocol( handShakeData, info,
518 XRootDStatus st = ProcessServerHS( handShakeData, info );
532 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
542 handShakeData->
out = GenerateProtocol( handShakeData, info,
548 handShakeData->
out = GenerateLogIn( handShakeData, info );
559 XRootDStatus st = ProcessLogInResp( handShakeData, info );
567 if( st.IsOK() && st.code ==
suDone )
576 handShakeData->
out = GenerateEndSession( handShakeData, info );
586 st = DoAuthentication( handShakeData, info );
599 XRootDStatus st = DoAuthentication( handShakeData, info );
607 if( st.IsOK() && st.code ==
suDone )
614 handShakeData->
out = GenerateEndSession( handShakeData, info );
632 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
634 if( st.IsOK() && st.code ==
suDone )
638 else if( !st.IsOK() )
652 XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
653 AnyObject &channelData )
655 XRootDChannelInfo *info = 0;
656 channelData.Get( info );
658 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
666 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
678 XRootDStatus st = ProcessServerHS( handShakeData, info );
692 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
700 handShakeData->out = GenerateBind( handShakeData, info );
710 XRootDStatus st = ProcessBindResp( handShakeData, info );
718 return XRootDStatus();
720 return XRootDStatus();
731 channelData.
Get( info );
743 channelData.
Get( info );
754 env->
GetInt(
"DataServerTTL", ttl );
759 env->
GetInt(
"LoadBalancerTTL", ttl );
766 uint16_t allocatedSIDs = info->
sidManager->GetNumberOfAllocatedSIDs();
768 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769 info->
streamName.c_str(), (
long long) inactiveTime, ttl, allocatedSIDs,
772 if( info->
openFiles != 0 && info->
finstcnt.load( std::memory_order_relaxed ) != 0 )
775 if( !allocatedSIDs && inactiveTime > ttl )
789 channelData.
Get( info );
794 env->
GetInt(
"StreamTimeout", streamTimeout );
798 const time_t now = time(0);
800 info->
sidManager->IsAnySIDOldAs( now - streamTimeout );
803 "stream timeout: %d, any SID: %d, wait barrier: %s",
804 info->
streamName.c_str(), (
long long) inactiveTime, streamTimeout,
807 if( inactiveTime < streamTimeout )
810 if( now < info->waitBarrier )
835 channelData.
Get( info );
849 uint16_t upStream = 0;
850 uint16_t downStream = 0;
855 downStream = hint->
down;
860 std::vector<bool> connected;
861 connected.reserve( info->
stream.size() - 1 );
862 size_t nbConnected = 0;
863 for(
size_t i = 1; i < info->
stream.size(); ++i )
866 connected.push_back(
true );
870 connected.push_back(
false );
872 if( nbConnected == 0 )
878 if( upStream >= info->
stream.size() )
881 "[%s] Up link stream %d does not exist, using 0",
886 if( downStream >= info->
stream.size() )
889 "[%s] Down link stream %d does not exist, using 0",
913 memset( newBuf, 0, 8 );
987 return PathID( upStream, downStream );
998 channelData.
Get( info );
1011 uint16_t ret = info->
stream.size();
1015 env->
GetInt(
"TlsNoData", nodata );
1028 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1035 if( ret == 1 ) ++ret;
1038 if( ret > info->
stream.size() )
1040 info->
stream.resize( ret );
1139 uint16_t numChunks = (req->
readv.
dlen)/16;
1141 for(
size_t i = 0; i < numChunks; ++i )
1143 dataChunk[i].
rlen = htonl( dataChunk[i].rlen );
1144 dataChunk[i].
offset = htonll( dataChunk[i].offset );
1157 for(
size_t i = 0; i < numChunks; ++i )
1159 wrtList[i].
wlen = htonl( wrtList[i].wlen );
1160 wrtList[i].
offset = htonll( wrtList[i].offset );
1244 m->
body.protocol.pval = ntohl( m->
body.protocol.pval );
1245 m->
body.protocol.flags = ntohl( m->
body.protocol.flags );
1256 m->
body.error.errnum = ntohl( m->
body.error.errnum );
1266 m->
body.wait.seconds = htonl( m->
body.wait.seconds );
1276 m->
body.redirect.port = htonl( m->
body.redirect.port );
1286 m->
body.waitresp.seconds = htonl( m->
body.waitresp.seconds );
1296 m->
body.attn.actnum = htonl( m->
body.attn.actnum );
1332 "kXR_status: invalid message size." );
1360 "corrupted (crc32c integrity check failed)." );
1367 "(stream ID mismatch)." );
1375 "(request ID mismatch)." );
1399 "kXR_status: invalid message size." );
1413 if( crcval != cse->
cseCRC )
1416 "corrupted (crc32c integrity check failed)." );
1427 for(
size_t i = 0; i < pgcnt; ++i )
1428 pgoffs[i] = ntohll( pgoffs[i] );
1448 header->
dlen = ntohl( header->
dlen );
1458 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
1459 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
1461 rsp->
body.error.errnum, errmsg );
1471 channelData.
Get( info );
1474 uint16_t nbConnected = 0;
1475 for(
size_t i = 1; i < info->
stream.size(); ++i )
1486 uint16_t subStreamId )
1489 channelData.
Get( info );
1492 CleanUpProtection( info );
1494 if( !info->
stream.empty() )
1500 if( subStreamId == 0 )
1518 channelData.
Get( info );
1527 result.
Set( (
const char*)
"XRootD",
false );
1566 channelData.
Get( info );
1588 "response that we're no longer interested in (timed out)",
1600 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1601 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1613 uint32_t seconds = 0;
1615 seconds = ntohl( rsp->
body.wait.seconds ) + 5;
1619 seconds = ntohl( rsp->
body.waitresp.seconds );
1621 log->
Dump(
XRootDMsg,
"[%s] Got kXR_waitresp response of %u seconds, "
1622 "setting up wait barrier.",
1627 time_t barrier = time(0) + seconds;
1635 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1636 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1645 info->
finstcnt.fetch_add( 1, std::memory_order_relaxed );
1681 channelData.
Get( info );
1706 channelData.
Get( info );
1734 sign->
Grab(
reinterpret_cast<char*
>( newreq ), rc );
1746 channelData.
Get( info );
1747 if( info->
finstcnt.load( std::memory_order_relaxed ) > 0 )
1748 info->
finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1757 pSecUnloadHandler->
unloaded =
true;
1767 channelData.
Get( info );
1771 env->
GetInt(
"NoTlsOK", notlsok );
1845 channelData.
Get( info );
1862 "[%s] Sending out the initial hand shake + kXR_protocol",
1872 init->
fifth = htonl(2012);
1875 InitProtocolReq( proto, info, expect );
1883 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1884 XRootDChannelInfo *info,
1889 "[%s] Sending out the kXR_protocol",
1890 hsData->streamName.c_str() );
1892 Message *msg =
new Message();
1897 InitProtocolReq( proto, info, expect );
1906 XRootDChannelInfo *info,
1919 env->
GetInt(
"NoTlsOK", notlsok );
1922 env->
GetInt(
"TlsNoData", tlsnodata );
1924 if (info->encrypted ||
InitTLS())
1927 if (info->encrypted && !(notlsok || tlsnodata))
1930 request->
expect = expect;
1943 XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
1944 XRootDChannelInfo *info )
1948 Message *msg = hsData->in;
1955 hsData->streamName.c_str() );
1960 info->protocolVersion = ntohl(hs->
protover);
1966 "[%s] Got the server hand shake response (%s, protocol "
1968 hsData->streamName.c_str(),
1969 ServerFlagsToStr( info->serverFlags ).c_str(),
1970 info->protocolVersion );
1978 XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
1979 XRootDChannelInfo *info )
1993 hsData->streamName.c_str() );
2000 env->
GetInt(
"NoTlsOK", notlsok );
2008 if( !notlsok )
return XRootDStatus(
stFatal,
errTlsError, ENOTSUP,
"TLS not supported" );
2015 "[%s] Falling back to unencrypted transmission, server does "
2016 "not support TLS encryption.",
2017 hsData->streamName.c_str() );
2018 info->encrypted =
false;
2021 if( rsp->
body.protocol.pval >= 0x297 )
2022 info->serverFlags = rsp->
body.protocol.flags;
2027 info->protRespBody->flags = rsp->
body.protocol.flags;
2028 info->protRespBody->pval = rsp->
body.protocol.pval;
2030 char* bodybuff =
reinterpret_cast<char*
>( &rsp->
body.protocol.secreq );
2031 size_t bodysize = rsp->
hdr.
dlen - 8;
2032 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2038 "[%s] kXR_protocol successful (%s, protocol version %x)",
2039 hsData->streamName.c_str(),
2040 ServerFlagsToStr( info->serverFlags ).c_str(),
2041 info->protocolVersion );
2043 if( !( info->serverFlags &
kXR_haveTLS ) && info->encrypted )
2050 "Server was not configured to support encryption." );
2058 env->
GetInt(
"WantTlsOnNoPgrw", tlsOnNoPgrw );
2059 if( !( info->serverFlags &
kXR_suppgrw ) && tlsOnNoPgrw )
2065 if( info->encrypted )
2068 "[%s] Server does not support PgRead/PgWrite and"
2069 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2070 hsData->streamName.c_str() );
2080 info->encrypted =
true;
2088 XRootDStatus XRootDTransport::ProcessProtocolBody(
char *bodybuff,
2090 XRootDChannelInfo *info )
2101 if( bodysize < bifreq->bifILen )
2103 "protocol response." );
2104 std::string bindprefs_str( bodybuff, bifreq->
bifILen );
2105 std::vector<std::string> bindprefs;
2107 info->bindSelector.reset(
new BindPrefSelector( std::move( bindprefs ) ) );
2115 if( bodysize >= 6 && secreq->
theTag ==
'S' )
2117 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2118 info->protRespSize = bodysize + 8 ;
2121 return XRootDStatus();
2127 Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2128 XRootDChannelInfo *info )
2133 "[%s] Sending out the bind request",
2134 hsData->streamName.c_str() );
2141 memcpy( bindReq->
sessid, info->sessionId, 16 );
2150 XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2151 XRootDChannelInfo *info )
2164 hsData->streamName.c_str() );
2168 info->stream[hsData->subStreamId].pathId = rsp->
body.bind.substreamid;
2170 hsData->streamName.c_str() );
2172 return XRootDStatus();
2178 Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2179 XRootDChannelInfo *info )
2190 char *cgiBuffer =
new char[1024 + info->logintoken.size()];
2191 std::string appName;
2192 std::string monInfo;
2193 env->GetString(
"AppName", appName );
2194 env->GetString(
"MonInfo", monInfo );
2195 if( info->logintoken.empty() )
2197 snprintf( cgiBuffer, 1024,
2198 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2199 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2200 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2204 snprintf( cgiBuffer, 1024,
2205 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2206 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2207 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2209 uint16_t cgiLen = strlen( cgiBuffer );
2219 loginReq->
pid = ::getpid();
2221 loginReq->
dlen = cgiLen;
2227 int multiProtocol = 0;
2228 env->GetInt(
"MultiProtocol", multiProtocol );
2236 bool dualStack =
false;
2237 bool privateIPv6 =
false;
2238 bool privateIPv4 =
false;
2262 if( !dualStack && hsData->serverAddr )
2275 std::string buffer( 8, 0 );
2276 if( hsData->url->GetUserName().length() )
2277 buffer = hsData->url->GetUserName();
2280 char *name =
new char[1024];
2287 buffer.resize( 8, 0 );
2288 std::copy( buffer.begin(), buffer.end(), (
char*)loginReq->
username );
2290 msg->Append( cgiBuffer, cgiLen, 24 );
2293 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2294 "private IPv6: %s", hsData->streamName.c_str(),
2295 loginReq->
username, cgiBuffer, dualStack ?
"true" :
"false",
2296 privateIPv4 ?
"true" :
"false",
2297 privateIPv6 ?
"true" :
"false" );
2299 delete [] cgiBuffer;
2307 XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2308 XRootDChannelInfo *info )
2321 hsData->streamName.c_str() );
2325 if( !info->firstLogIn )
2326 memcpy( info->oldSessionId, info->sessionId, 16 );
2328 if( rsp->
hdr.
dlen == 0 && info->protocolVersion <= 0x289 )
2335 memset( info->sessionId, 0, 16 );
2337 "[%s] Logged in, accepting empty login response.",
2338 hsData->streamName.c_str() );
2339 return XRootDStatus();
2345 memcpy( info->sessionId, rsp->
body.login.sessid, 16 );
2350 hsData->streamName.c_str(), sessId.c_str() );
2357 size_t len = rsp->
hdr.
dlen-16;
2358 info->authBuffer =
new char[len+1];
2359 info->authBuffer[len] = 0;
2360 memcpy( info->authBuffer, rsp->
body.login.sec, len );
2362 hsData->streamName.c_str(), info->authBuffer );
2367 return XRootDStatus();
2373 XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2374 XRootDChannelInfo *info )
2380 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2382 std::string protocolName;
2390 hsData->streamName.c_str() );
2396 info->authEnv->Put(
"sockname", hsData->clientName.c_str() );
2397 info->authEnv->Put(
"username", hsData->url->GetUserName().c_str() );
2398 info->authEnv->Put(
"password", hsData->url->GetPassword().c_str() );
2401 URL::ParamsMap::const_iterator it;
2402 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2404 if( it->first.compare( 0, 4,
"xrd." ) == 0 ||
2405 it->first.compare( 0, 6,
"xrdcl." ) == 0 )
2406 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2412 size_t authBuffLen = strlen( info->authBuffer );
2413 char *pars = (
char *)malloc( authBuffLen + 1 );
2414 memcpy( pars, info->authBuffer, authBuffLen );
2417 delete [] info->authBuffer;
2418 info->authBuffer = 0;
2423 XRootDStatus st = GetCredentials( credentials, hsData, info );
2426 CleanUpAuthentication( info );
2429 protocolName = info->authProtocol->Entity.prot;
2438 protocolName = info->authProtocol->Entity.prot;
2446 "[%s] Sending more authentication data for %s",
2447 hsData->streamName.c_str(), protocolName.c_str() );
2450 char *secTokenData = (
char*)malloc( len );
2451 memcpy( secTokenData, rsp->
body.authmore.data, len );
2454 credentials = info->authProtocol->getCredentials( secToken, &ei );
2463 "[%s] Auth protocol handler for %s refuses to give "
2464 "us more credentials %s",
2465 hsData->streamName.c_str(), protocolName.c_str(),
2467 CleanUpAuthentication( info );
2477 info->authProtocolName = info->authProtocol->Entity.prot;
2482 if( info->protRespBody )
2484 int rc =
XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2488 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2493 "[%s] XrdSecProtect: no protection needed.",
2494 hsData->streamName.c_str() );
2499 "[%s] Failed to load XrdSecProtect: %s",
2500 hsData->streamName.c_str(),
XrdSysE2T( -rc ) );
2501 CleanUpAuthentication( info );
2507 if( !info->protection )
2508 CleanUpAuthentication( info );
2510 pSecUnloadHandler->
Register( info->authProtocolName );
2513 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2514 protocolName.c_str() );
2522 return XRootDStatus();
2529 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
2530 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
2532 "[%s] Authentication with %s failed: %s",
2533 hsData->streamName.c_str(), protocolName.c_str(),
2537 info->authProtocol->Delete();
2538 info->authProtocol = 0;
2543 XRootDStatus st = GetCredentials( credentials, hsData, info );
2546 CleanUpAuthentication( info );
2549 protocolName = info->authProtocol->Entity.prot;
2556 info->authProtocolName = info->authProtocol->Entity.prot;
2557 CleanUpAuthentication( info );
2560 "[%s] Authentication with %s failed: unexpected answer",
2561 hsData->streamName.c_str(), protocolName.c_str() );
2577 protocolName.length() > 4 ? 4 : protocolName.length() );
2579 memcpy( reqBuffer, credentials->
buffer, credentials->
size );
2597 HandShakeData *hsData,
2598 XRootDChannelInfo *info )
2614 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secuid") : 0;
2615 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secgid") : 0;
2620 if(secuidc) secuid = atoi(secuidc);
2621 if(secgidc) secgid = atoi(secgidc);
2624 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2625 if(!uidSetter.IsOk()) {
2626 log->Error(
XRootDTransportMsg,
"[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2627 hsData->streamName.c_str(), secuid, secgid );
2631 if(secuid >= 0 || secgid >= 0) {
2632 log->Error(
XRootDTransportMsg,
"[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2633 hsData->streamName.c_str() );
2635 " only supported on Linux" );
2644 srvAddrInfo.
SetTLS( info->encrypted );
2650 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2654 if( !info->authProtocol )
2657 hsData->streamName.c_str() );
2661 std::string protocolName = info->authProtocol->Entity.prot;
2663 hsData->streamName.c_str(), protocolName.c_str() );
2668 credentials = info->authProtocol->getCredentials( 0, &ei );
2672 "[%s] Cannot get credentials for protocol %s: %s",
2673 hsData->streamName.c_str(), protocolName.c_str(),
2675 info->authProtocol->Delete();
2685 Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2687 if( info->authProtocol )
2688 info->authProtocol->Delete();
2689 delete info->authParams;
2690 delete info->authEnv;
2691 info->authProtocol = 0;
2692 info->authParams = 0;
2701 Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2706 if( info->protection )
2708 info->protection->Delete();
2709 info->protection = 0;
2711 CleanUpAuthentication( info );
2714 if( info->protRespBody )
2716 delete info->protRespBody;
2717 info->protRespBody = 0;
2718 info->protRespSize = 0;
2730 char errorBuff[1024];
2735 auto ret = authHandler.load( std::memory_order_relaxed );
2736 if( ret )
return ret;
2745 ret = authHandler.load( std::memory_order_relaxed );
2746 if( ret )
return ret;
2750 authHandler.store( ret, std::memory_order_relaxed );
2755 "Unable to get the security framework: %s", errorBuff );
2764 Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2765 XRootDChannelInfo *info )
2776 memcpy( endsessReq->
sessid, info->oldSessionId, 16 );
2780 " %s", hsData->streamName.c_str(), sessId.c_str() );
2791 uint32_t size = sign->GetSize();
2792 sign->ReAllocate( size + msg->GetSize() );
2793 char* buffer = sign->GetBuffer( size );
2794 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2795 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2804 Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2805 XRootDChannelInfo *info )
2827 std::string errorMsg( rsp->
body.error.errmsg, rsp->
hdr.
dlen - 4 );
2829 "kXR_endsess: %s", hsData->streamName.c_str(),
2837 std::string msg( rsp->
body.wait.infomsg, rsp->
hdr.
dlen - 4 );
2839 "kXR_endsess: %s", hsData->streamName.c_str(),
2841 hsData->out = GenerateEndSession( hsData, info );
2852 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2854 std::string repr =
"type: ";
2878 repr.erase( repr.length()-1, 1 );
2889 char *GetDataAsString(
char *msg )
2892 char *fn =
new char[req->
dlen+1];
2893 memcpy( fn, msg + 24, req->
dlen );
2920 char *fn = GetDataAsString( msg );
2921 o <<
"file: " << fn <<
", ";
2923 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
2924 o << std::setbase(10);
2941 o <<
"kXR_open_apnd ";
2943 o <<
"kXR_open_read ";
2945 o <<
"kXR_open_updt ";
2949 o <<
"kXR_refresh ";
2951 o <<
"kXR_replica ";
2957 o <<
"kXR_retstat ";
2970 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
2984 char *fn = GetDataAsString( msg );;
2985 o <<
"path: " << fn <<
", ";
2990 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3012 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3013 o << std::setbase(10);
3015 o <<
"offset: " << sreq->
offset <<
", ";
3016 o <<
"size: " << sreq->
rlen <<
")";
3026 o <<
"kXR_pgread (";
3027 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3028 o << std::setbase(10);
3030 o <<
"offset: " << sreq->
offset <<
", ";
3031 o <<
"size: " << sreq->
rlen <<
")";
3042 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3043 o << std::setbase(10);
3045 o <<
"offset: " << sreq->
offset <<
", ";
3046 o <<
"size: " << sreq->
dlen <<
")";
3056 o <<
"kXR_pgwrite (";
3057 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3058 o << std::setbase(10);
3060 o <<
"offset: " << sreq->
offset <<
", ";
3061 o <<
"size: " << sreq->
dlen <<
")";
3072 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3083 o <<
"kXR_truncate (";
3085 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3088 char *fn = GetDataAsString( msg );
3089 o <<
"file: " << fn;
3092 o << std::setbase(10);
3094 o <<
"offset: " << sreq->
offset;
3104 unsigned char *fhandle = 0;
3109 fhandle = dataChunk[0].
fhandle;
3111 o << FileHandleToStr( fhandle );
3115 o << std::setbase(10);
3120 size += dataChunk[i].
rlen;
3121 o <<
"(offset: " << dataChunk[i].
offset;
3122 o <<
", size: " << dataChunk[i].
rlen <<
"); ";
3125 o <<
"total size: " << size <<
")";
3134 unsigned char *fhandle = 0;
3135 o <<
"kXR_writev (";
3140 uint32_t numChunks = 0;
3144 size += wrtList[i].
wlen;
3149 o << FileHandleToStr( fhandle );
3153 o << std::setbase(10);
3154 o <<
"chunks: " << numChunks <<
", ";
3155 o <<
"total size: " << size <<
")";
3165 char *fn = GetDataAsString( msg );;
3166 o <<
"kXR_locate (";
3167 o <<
"path: " << fn <<
", ";
3175 o <<
"kXR_refresh ";
3177 o <<
"kXR_prefname ";
3183 o <<
"kXR_compress ";
3199 o <<
"destination: ";
3221 case kXR_QPrep: o <<
"kXR_QPrep";
break;
3224 case kXR_Qvisa: o <<
"kXR_Qvisa";
break;
3226 default: o << sreq->
infotype;
break;
3232 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3236 o <<
"arg length: " << sreq->
dlen <<
")";
3246 char *fn = GetDataAsString( msg );;
3247 o <<
"path: " << fn <<
")";
3259 char *fn = GetDataAsString( msg );
3260 o <<
"path: " << fn <<
", ";
3262 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
3263 o << std::setbase(10);
3270 o <<
"kXR_mkdirpath";
3282 char *fn = GetDataAsString( msg );
3283 o <<
"path: " << fn <<
")";
3295 char *fn = GetDataAsString( msg );
3296 o <<
"path: " << fn <<
", ";
3298 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
")";
3317 o <<
"kXR_protocol (";
3318 o <<
"clientpv: 0x" << std::setbase(16) << sreq->
clientpv <<
")";
3327 o <<
"kXR_dirlist (";
3328 char *fn = GetDataAsString( msg );;
3329 o <<
"path: " << fn <<
")";
3340 char *fn = GetDataAsString( msg );;
3341 o <<
"data: " << fn <<
")";
3352 o <<
"kXR_prepare (";
3369 o <<
", priority: " << (int) sreq->
prty <<
", ";
3371 char *fn = GetDataAsString( msg );
3373 for( cursor = fn; *cursor; ++cursor )
3374 if( *cursor ==
'\n' ) *cursor =
' ';
3376 o <<
"paths: " << fn <<
")";
3384 o <<
"kXR_chkpoint (";
3392 o <<
"kXR_ckpXeq) ";
3406 o <<
"kXR_unknown (length: " << req->
dlen <<
")";
3415 std::string XRootDTransport::FileHandleToStr(
const unsigned char handle[4] )
3417 std::ostringstream o;
3419 for( uint8_t i = 0; i < 4; ++i )
3421 o << std::setbase(16) << std::setfill(
'0') << std::setw(2);
3422 o << (int)handle[i];
static const int kXR_ckpRollback
struct ClientTruncateRequest truncate
union ServerResponse::@0 body
ServerResponseStatus status
struct ClientPgReadRequest pgread
struct ClientMkdirRequest mkdir
struct ClientAuthRequest auth
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientReadVRequest readv
struct ClientOpenRequest open
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
struct ClientWriteVRequest writev
struct ClientLoginRequest login
struct ClientChmodRequest chmod
struct ClientQueryRequest query
struct ClientReadRequest read
struct ClientMvRequest mv
struct ClientChkPointRequest chkpoint
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
static const int kXR_ckpCommit
struct ClientPrepareRequest prepare
static const int kXR_ckpQuery
struct ClientWriteRequest write
#define kXR_PROTTLSVERSION
struct ClientProtocolRequest protocol
struct ClientLocateRequest locate
static const int kXR_ckpBegin
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::map< std::string, std::string > ParamsMap
bool IsSecure() const
Does the protocol indicate encryption.
bool IsTPC() const
Is the URL used in TPC context.
std::string GetLoginToken() const
Get the login token if present in the opaque info.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
XRootDTransport()
Constructor.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
~XRootDTransport()
Destructor.
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return the number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suContinue
const int DefaultTlsNoData
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
struct ServerResponseBifs_Protocol bifReqs
BindPrefSelector(std::vector< std::string > &&bindprefs)
const std::string & Get()
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
static void UnloadHandler()
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects the least-loaded stream for read operations over multiple streams.
void AdjustQueues(uint16_t size)
StreamSelector(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
XrdSecParameters * authParams
XrdSecProtocol * authProtocol
XrdSecProtect * protection
std::unique_ptr< BindPrefSelector > bindSelector
std::string authProtocolName
std::atomic< uint32_t > finstcnt
unsigned int protRespSize
ServerResponseBody_Protocol * protRespBody
XRootDChannelInfo(const URL &url)
std::set< uint16_t > sentOpens
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
Generic structure to pass security information back and forth.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.