37 #include <sys/socket.h>
39 #include <sys/types.h>
// Forward `txt` to SYSTRACE only when `doTrace` is set. The expansion refers
// to `tident` and `epName`, so both must be visible at the point of use.
// NOTE(review): this expands to a bare `if` without braces; an `else` written
// directly after an invocation would bind to this hidden `if`.
53 #define TRACE(txt) if (doTrace) SYSTRACE(Trace->, tident, epName, 0, txt)
// Same as TRACE, but gated on `doDebug` instead of `doTrace`.
55 #define DEBUG(txt) if (doDebug) SYSTRACE(Trace->, tident, epName, 0, txt)
// Declare the local `epName` string that TRACE and DEBUG pass to SYSTRACE.
57 #define EPName(ep) const char *epName = ep
66 "<134>1 - %s xrootd - firefly-json - "
69 "\"flow-lifecycle\":{"
71 "\"current-time\":\"%%s\","
72 "\"start-time\":\"%s\""
75 "\"usage\":{\"received\":%%llu,\"sent\":%%llu},"
76 "\"netlink\":{\"rtt\":%%u.%%.03u},";
80 "\"experiment-id\":%d,"
90 "\"protocol\":\"tcp\","
// printf-style fragment for the "application" field. It consumes two
// arguments: an int precision (for the `.*`) and then the string to print.
96 const char *ffApp =
",\"application\":\"%.*s\"";
// printf-style fragment for the "end-time" field. It consumes one string.
98 const char *ffEnd =
",\"end-time\":\"%s\"";
144 bool XrdNetPMarkFF::Emit(
const char *state,
const char *cT,
const char *eT)
158 if (appName && !strcmp(appName,
"http-put"))
159 {n = snprintf(msgBuff,
sizeof(msgBuff), ffHdr, state, cT, eT,
160 ss.bSent, ss.bRecv, ss.msRTT, ss.usRTT);
162 n = snprintf(msgBuff,
sizeof(msgBuff), ffHdr, state, cT, eT,
163 ss.bRecv, ss.bSent, ss.msRTT, ss.usRTT);
166 if (n + ffTailsz >= (
int)
sizeof(msgBuff))
167 {
eDest->
Emsg(
"PMarkFF",
"invalid json; msgBuff truncated.");
172 memcpy(msgBuff+n, ffTail, ffTailsz+1);
175 {
DEBUG(
"Sending pmark s-msg: " <<msgBuff);
183 {
DEBUG(
"Sending pmark o-msg: " <<(
netMsg ?
"=s-msg" : msgBuff));
184 if (
netOrg->
Send(oDest, *mySad, msgBuff, n+ffTailsz) < 0)
197 const char *XrdNetPMarkFF::getUTC(
char *utcBuff,
int utcBLen)
205 gettimeofday(&tod, 0);
206 gmtime_r(&tod.tv_sec, &utcDT);
210 size_t n = strftime(utcBuff, utcBLen,
"%FT%T", &utcDT);
211 bP = utcBuff + n; utcBLen -= n;
212 snprintf(bP, utcBLen,
".%06u+00:00",
static_cast<unsigned int>(tod.tv_usec));
276 {
char utcBuff[40], endBuff[80];
277 snprintf(endBuff,
sizeof(endBuff), ffEnd,
278 getUTC(utcBuff,
sizeof(utcBuff)));
279 Emit(
"end", utcBuff, endBuff);
284 if (mySad)
delete(mySad);
285 if (oDest) free(oDest);
286 if (ffHdr) free(ffHdr);
287 if (ffTail) free(ffTail);
288 if (xtraFH)
delete xtraFH;
296 #include <linux/tcp.h>
299 void XrdNetPMarkFF::SockStats(
struct sockStats &ss)
302 memset(&ss, 0,
sizeof(
struct sockStats));
305 struct tcp_info tcpInfo;
306 socklen_t tiLen =
sizeof(tcpInfo);
312 if (getsockopt(sockFD,
IPPROTO_TCP, TCP_INFO, (
void *)&tcpInfo, &tiLen) == 0)
313 {ss.bRecv =
static_cast<uint64_t
>(tcpInfo.tcpi_bytes_received);
314 ss.bSent =
static_cast<uint64_t
>(tcpInfo.tcpi_bytes_acked);
315 ss.msRTT =
static_cast<uint32_t
>(tcpInfo.tcpi_rtt/1000);
316 ss.usRTT =
static_cast<uint32_t
>(tcpInfo.tcpi_rtt%1000);
318 memset(&ss, 0,
sizeof(
struct sockStats));
319 DEBUG(
"Unable to get TCP information errno=" << strerror(errno));
330 char appInfo[128], clIP[INET6_ADDRSTRLEN+2], svIP[INET6_ADDRSTRLEN+2];
333 bool fdok =
false, odok =
false;
337 if (!appName) *appInfo = 0;
338 else snprintf(appInfo,
sizeof(appInfo),ffApp,
sizeof(appInfo)-20,appName);
351 {
eDest->
Emsg(
"PMarkFF", clPort,
"get peer information.");
357 {
eDest->
Emsg(
"PMarkFF", clPort,
"get self information.");
369 if (!urSad)
eDest->
Emsg(
"PMarkFF",
"unable to get origin address.");
370 else {
char buff[1024];
373 mySad->v4.sin_port = htons(
static_cast<uint16_t
>(
ffPortO));
374 snprintf(buff,
sizeof(buff),
"%s:%d", clIP,
ffPortO);
375 oDest = strdup(buff);
382 if (!fdok && !odok)
return false;
387 char utcBuff[40], bseg0[512];
388 int len0 = snprintf(bseg0,
sizeof(bseg0), ffFmt0,
myHostName,
389 getUTC(utcBuff,
sizeof(utcBuff)));
390 if (len0 >= (
int)
sizeof(bseg0))
391 {
eDest->
Emsg(
"PMarkFF",
"invalid json; bseg0 truncated.");
395 ffHdr = strdup(bseg0);
398 int len1 = snprintf(bseg1,
sizeof(bseg1), ffFmt1, eCode, aCode, appInfo);
399 if (len1 >= (
int)
sizeof(bseg1))
400 {
eDest->
Emsg(
"PMarkFF",
"invalid json; bseg1 truncated.");
411 if (appName && !strcmp(appName,
"http-put"))
412 {len2 = snprintf(bseg2,
sizeof(bseg2), ffFmt2,
413 clType, clIP, svIP, clPort, svPort);
415 len2 = snprintf(bseg2,
sizeof(bseg2), ffFmt2,
416 clType, svIP, clIP, svPort, clPort);
418 if (len2 >= (
int)
sizeof(bseg2))
419 {
eDest->
Emsg(
"PMarkFF",
"invalid json; cl bseg2 truncated.");
423 ffTailsz = len1 + len2;
424 ffTail = (
char *)malloc(ffTailsz + 1);
425 strcpy(ffTail, bseg1);
426 strcpy(ffTail+len1, bseg2);
432 return Emit(
"start", utcBuff,
"");
const XrdNetSockAddr * NetAddr()
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
bool Start(XrdNetAddrInfo &addr)
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)