XRootD
XrdNetPMarkFF.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d N e t P M a r k F F . c c */
4 /* */
5 /* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdint>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <time.h>
36 #include <unistd.h>
37 #include <sys/socket.h>
38 #include <sys/time.h>
39 #include <sys/types.h>
40 
41 #include "Xrd/XrdScheduler.hh"
42 #include "XrdNet/XrdNetAddrInfo.hh"
43 #include "XrdNet/XrdNetMsg.hh"
44 #include "XrdNet/XrdNetPMarkFF.hh"
45 #include "XrdNet/XrdNetUtils.hh"
46 #include "XrdSys/XrdSysError.hh"
47 #include "XrdSys/XrdSysTrace.hh"
48 
49 /******************************************************************************/
50 /* L o c a l M a c r o s */
51 /******************************************************************************/
52 
// Emit a trace record when tracing is enabled. The expansion relies on the
// names doTrace, Trace, tident and epName being visible at the point of use.
// The do/while(0) wrapper makes the macro a single statement so that it can be
// followed by an "else" without silently capturing it.
//
#define TRACE(txt) \
        do {if (doTrace) {SYSTRACE(Trace->, tident, epName, 0, txt);}} while(0)

// Same as TRACE() but gated by doDebug instead of doTrace.
//
#define DEBUG(txt) \
        do {if (doDebug) {SYSTRACE(Trace->, tident, epName, 0, txt);}} while(0)

// Declare the entry point name that TRACE() and DEBUG() pick up as epName.
//
#define EPName(ep) const char *epName = ep
58 
59 /******************************************************************************/
60 /* F i r e f l y P a c k e t T e m p l a t e */
61 /******************************************************************************/
62 
namespace
{
// The firefly packet is assembled from three printf-style segments. Segment 0
// is formatted twice: Start() fills the single-% fields (host name and start
// time) and keeps the result as the per-flow header; the doubled-% fields
// survive that pass as ordinary conversions which Emit() fills on every send.

// Segment 0: abbreviated RFC5424 syslog header followed by the lifecycle,
//            usage and netlink objects.
//            Start() pass: %s host name, %s start-time
//            Emit()  pass: %s state (start | ongoing | end),
//                          %s current-time (yyyy-mm-ddThh:mm:ss.uuuuuu+00:00),
//                          %s optional ,"end-time":"<date-time>" member,
//                          %llu received, %llu sent, %u.%.03u rtt
//
const char* const ffFmt0 =
      "<134>1 - %s xrootd - firefly-json - "
      "{\"version\":1,"
      "\"flow-lifecycle\":"
         "{\"state\":\"%%s\","
         "\"current-time\":\"%%s\","
         "\"start-time\":\"%s\"%%s},"
      "\"usage\":"
         "{\"received\":%%llu,"
         "\"sent\":%%llu},"
      "\"netlink\":"
         "{\"rtt\":%%u.%%.03u},";

// Segment 1: the context object. The trailing %s takes either an empty string
//            or the preformatted application member (see ffApp).
//
const char* const ffFmt1 =
      "\"context\":"
         "{\"experiment-id\":%d,"
         "\"activity-id\":%d%s"
         "},";

// Segment 2: the flow-id object plus the brace that closes the whole packet.
//            %c is the address family digit (ipv4 | ipv6); the source is the
//            client for put requests and the server otherwise.
//
const char* const ffFmt2 =
      "\"flow-id\":"
         "{\"afi\":\"ipv%c\","
         "\"src-ip\":\"%s\","
         "\"dst-ip\":\"%s\","
         "\"protocol\":\"tcp\","
         "\"src-port\":%d,"
         "\"dst-port\":%d}"
      "}";

// Optional application member; takes a precision and the application name.
//
const char* const ffApp = ",\"application\":\"%.*s\"";

// Optional end-time member; takes the formatted date-time.
//
const char* const ffEnd = ",\"end-time\":\"%s\"";
}
100 
101 /******************************************************************************/
102 /* s t a t i c O b j e c t s */
103 /******************************************************************************/
104 
105 namespace XrdNetPMarkConfig
106 {
107 
108 // Other configuration values
109 //
110 extern XrdSysError *eDest;
111 extern XrdNetMsg *netMsg;
112 extern XrdNetMsg *netOrg;
113 extern XrdScheduler *Sched;
114 extern XrdSysTrace *Trace;
115 
116 extern char *ffDest;
117 extern int ffPortO;
118 extern int ffEcho;
119 extern bool doDebug;
120 extern bool doTrace;
121 
122 extern const char *myHostName;
123 }
124 using namespace XrdNetPMarkConfig;
125 
126 /******************************************************************************/
127 /* T h r e a d I n t e r f a c e s */
128 /******************************************************************************/
129 /*
130 namespace
131 {
132 void *Refresh(void *carg)
133  {int intvl = *(int *)carg;
134  while(true) {XrdSysTimer::Snooze(intvl); XrdNetPMarkCfg::Ping();}
135  }
136 XrdSysMutex ffMutex;
137 }
138 */
139 
140 /******************************************************************************/
141 /* Private: E m i t */
142 /******************************************************************************/
143 
144 bool XrdNetPMarkFF::Emit(const char *state, const char *cT, const char *eT)
145 {
146  EPName("Emit");
147  struct sockStats ss;
148  char msgBuff[1024];
149 
150  SockStats(ss);
151 
152 // Note that the supplier of the data is the source. Hence, for put requests
153 // the client is designated as the source o/w it is the server. So, on a
154 // put request the number of bytes we recived is the number of bytes the
155 // source (i.e. client) sent. Note the temlate is "usage: recv sent".
156 //
157  int n;
158  if (appName && !strcmp(appName, "http-put"))
159  {n = snprintf(msgBuff, sizeof(msgBuff), ffHdr, state, cT, eT,
160  ss.bSent, ss.bRecv, ss.msRTT, ss.usRTT);
161  } else {
162  n = snprintf(msgBuff, sizeof(msgBuff), ffHdr, state, cT, eT,
163  ss.bRecv, ss.bSent, ss.msRTT, ss.usRTT);
164  }
165 
166  if (n + ffTailsz >= (int)sizeof(msgBuff))
167  {eDest->Emsg("PMarkFF", "invalid json; msgBuff truncated.");
168  fdOK = odOK = false;
169  return false;
170  }
171 
172  memcpy(msgBuff+n, ffTail, ffTailsz+1);
173 
174  if (fdOK)
175  {DEBUG("Sending pmark s-msg: " <<msgBuff);
176  if (netMsg->Send(msgBuff, n+ffTailsz) < 0)
177  {fdOK = false;
178  return false;
179  }
180  }
181 
182  if (odOK)
183  {DEBUG("Sending pmark o-msg: " <<(netMsg ? "=s-msg" : msgBuff));
184  if (netOrg->Send(oDest, *mySad, msgBuff, n+ffTailsz) < 0)
185  {odOK = false;
186  return false;
187  }
188  }
189 
190  return true;
191 }
192 
193 /******************************************************************************/
194 /* Private: g e t U T C */
195 /******************************************************************************/
196 
197 const char *XrdNetPMarkFF::getUTC(char *utcBuff, int utcBLen)
198 {
199  struct timeval tod;
200  struct tm utcDT;
201  char *bP;
202 
203 // Get the current time in UTC
204 //
205  gettimeofday(&tod, 0);
206  gmtime_r(&tod.tv_sec, &utcDT);
207 
208 // Format this ISO-style
209 //
210  size_t n = strftime(utcBuff, utcBLen, "%FT%T", &utcDT);
211  bP = utcBuff + n; utcBLen -= n;
212  snprintf(bP, utcBLen, ".%06u+00:00", static_cast<unsigned int>(tod.tv_usec));
213 
214 // Return result
215 //
216  return utcBuff;
217 }
218 
219 /******************************************************************************/
220 /* P i n g */
221 /******************************************************************************/
222 /*
223 void XrdNetPMarkCfg::Ping()
224 {
225 // Tell every registered task to send out a continuation
226 //
227  ffMutex.Lock();
228  for (std::set<XdNetPMarkFF*> it = ffTasks.begin(); it!= ffTasks.end(); it++)
229 ???
230  ffMutex.UnLock();
231 }
232 */
233 /******************************************************************************/
234 /* R e g i s t r y */
235 /******************************************************************************/
236 /*
237 XrdNetMsg *XrdNetPMarkCfg::netMsg = 0;
238 std::set<XrdNetPMarkFF*> XrdNetPMarkCfg::ffTasks;
239 
240 void XrdNetPMarkCfg::Registry(XrdNetPMarkFF *ffobj, bool doadd)
241 {
242 // Add or delete item from task list
243 //
244  ffMutex.Lock();
245  if (doadd) ffTasks.insert(ffObj);
246  else ffTasks.erase(ffObj);
247  ffMutex.UnLock();
248 }
249 
250 // This is firefly so we must get a netmsg object
251 //
252  bool aOK;
253  netMsg = new XrdNetMsg(eLog, ffDest, aOK);
254  if (!aOK)
255  {eLog->Emsg("Config", "Unable to create UDP tunnel to", ffDest);
256  return 0;
257  }
258 
259 // If there is an interval, start a thread to handle continuations
260 //
261  if (ffIntvl && XrdSysThread::Run(&tid,Refresh,(void *)&ffIntvl,0,"pmark")
262  {eDest->Emsg(epname, errno, "start pmark refresh timer");
263  return 0;
264  }
265 */
266 
267 /******************************************************************************/
268 /* D e s t r u c t o r */
269 /******************************************************************************/
270 
272 {
273 // If all is well, emit the closing message
274 //
275  if (fdOK || odOK)
276  {char utcBuff[40], endBuff[80];
277  snprintf(endBuff, sizeof(endBuff), ffEnd,
278  getUTC(utcBuff, sizeof(utcBuff)));
279  Emit("end", utcBuff, endBuff);
280  }
281 
282 // Cleanup
283 //
284  if (mySad) delete(mySad);
285  if (oDest) free(oDest);
286  if (ffHdr) free(ffHdr);
287  if (ffTail) free(ffTail);
288  if (xtraFH) delete xtraFH;
289 };
290 
291 /******************************************************************************/
292 /* S o c k S t a t s */
293 /******************************************************************************/
294 
295 #ifdef __linux__
296 #include <linux/tcp.h>
297 #endif
298 
299 void XrdNetPMarkFF::SockStats(struct sockStats &ss)
300 {
301 #ifndef __linux__
302  memset(&ss, 0, sizeof(struct sockStats));
303 #else
304  EPName("SockStats");
305  struct tcp_info tcpInfo;
306  socklen_t tiLen = sizeof(tcpInfo);
307 
308 // The data returned is from the server's perspective. This must be
309 // resolved by the caller as to which perspective should be presented.
310 // Note that for put requests the source is the client.
311 //
312  if (getsockopt(sockFD, IPPROTO_TCP, TCP_INFO, (void *)&tcpInfo, &tiLen) == 0)
313  {ss.bRecv = static_cast<uint64_t>(tcpInfo.tcpi_bytes_received);
314  ss.bSent = static_cast<uint64_t>(tcpInfo.tcpi_bytes_acked);
315  ss.msRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt/1000);
316  ss.usRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt%1000);
317  } else {
318  memset(&ss, 0, sizeof(struct sockStats));
319  DEBUG("Unable to get TCP information errno=" << strerror(errno));
320  }
321 #endif
322 }
323 
324 /******************************************************************************/
325 /* S t a r t */
326 /******************************************************************************/
327 
329 {
330  char appInfo[128], clIP[INET6_ADDRSTRLEN+2], svIP[INET6_ADDRSTRLEN+2];
331  int clPort, svPort;
332  char clType, svType;
333  bool fdok = false, odok = false;
334 
335 // Preform app if we need to
336 //
337  if (!appName) *appInfo = 0;
338  else snprintf(appInfo,sizeof(appInfo),ffApp,sizeof(appInfo)-20,appName);
339 
340 // Get the file descriptor for the socket
341 //
342  sockFD = addr.SockFD();
343 
344 // Obtain connectivity information about the peer and ourselves. We really
345 // should obtain our external address and use that but the issue is that
346 // we may have multiple external addresses and the client determines which
347 // one actually gets used. So, it's complicated. A TODO.
348 //
349  clPort = XrdNetUtils::GetSokInfo( sockFD, clIP, sizeof(clIP), clType);
350  if (clPort < 0)
351  {eDest->Emsg("PMarkFF", clPort, "get peer information.");
352  return false;
353  }
354 
355  svPort = XrdNetUtils::GetSokInfo(-sockFD, svIP, sizeof(svIP), svType);
356  if (svPort < 0)
357  {eDest->Emsg("PMarkFF", clPort, "get self information.");
358  return false;
359  }
360 
361 // If there is no special collector, indicate so
362 //
363  if (netMsg) fdok = true;
364 
365 // If the messages need to flow to the origin, get the destination information
366 //
367  if (netOrg)
368  {const XrdNetSockAddr *urSad = addr.NetAddr();
369  if (!urSad) eDest->Emsg("PMarkFF", "unable to get origin address.");
370  else {char buff[1024];
371  mySad = new XrdNetSockAddr;
372  memcpy(mySad, urSad, sizeof(XrdNetSockAddr));
373  mySad->v4.sin_port = htons(static_cast<uint16_t>(ffPortO));
374  snprintf(buff, sizeof(buff), "%s:%d", clIP, ffPortO);
375  oDest = strdup(buff);
376  odok = true;
377  }
378  }
379 
380 // If we cannot report anywhere then indicate we failed
381 //
382  if (!fdok && !odok) return false;
383 
384 // Format the base firefly template. Note that the client determines the
385 // address family that is being used.
386 //
387  char utcBuff[40], bseg0[512];
388  int len0 = snprintf(bseg0, sizeof(bseg0), ffFmt0, myHostName,
389  getUTC(utcBuff, sizeof(utcBuff)));
390  if (len0 >= (int)sizeof(bseg0))
391  {eDest->Emsg("PMarkFF", "invalid json; bseg0 truncated.");
392  return false;
393  }
394 
395  ffHdr = strdup(bseg0);
396 
397  char bseg1[256];
398  int len1 = snprintf(bseg1, sizeof(bseg1), ffFmt1, eCode, aCode, appInfo);
399  if (len1 >= (int)sizeof(bseg1))
400  {eDest->Emsg("PMarkFF", "invalid json; bseg1 truncated.");
401  return false;
402  }
403 
404 // Note that by convention FF packets the supplier of the data is designated
405 // as the source. We only know this at this point for http requests and even
406 // then it's hardly accurate. So, for put requests the src if the client.
407 // Ottherwise, we designate the server as the source.
408 //
409  char bseg2[256];
410  int len2;
411  if (appName && !strcmp(appName, "http-put"))
412  {len2 = snprintf(bseg2, sizeof(bseg2), ffFmt2,
413  clType, clIP, svIP, clPort, svPort);
414  } else {
415  len2 = snprintf(bseg2, sizeof(bseg2), ffFmt2,
416  clType, svIP, clIP, svPort, clPort);
417  }
418  if (len2 >= (int)sizeof(bseg2))
419  {eDest->Emsg("PMarkFF", "invalid json; cl bseg2 truncated.");
420  return false;
421  }
422 
423  ffTailsz = len1 + len2;
424  ffTail = (char *)malloc(ffTailsz + 1);
425  strcpy(ffTail, bseg1);
426  strcpy(ffTail+len1, bseg2);
427 
428 // OK, we now can emit the starting packet
429 //
430  fdOK = fdok;
431  odOK = odok;
432  return Emit("start", utcBuff, "");
433 }
#define DEBUG(txt)
#define EPName(ep)
#define IPPROTO_TCP
Definition: XrdNetUtils.cc:800
const XrdNetSockAddr * NetAddr()
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition: XrdNetMsg.cc:70
bool Start(XrdNetAddrInfo &addr)
virtual ~XrdNetPMarkFF()
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
Definition: XrdNetUtils.cc:498
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysTrace * Trace
XrdScheduler * Sched
XrdSysError * eDest
const char * myHostName