XRootD
XrdTpcTPC.hh
Go to the documentation of this file.
1 
2 #include <memory>
3 #include <string>
4 #include <vector>
5 #include <sys/time.h>
6 
8 
10 #include "XrdHttp/XrdHttpUtils.hh"
11 
12 #include "XrdTls/XrdTlsTempCA.hh"
13 #include "XrdTpcPMarkManager.hh"
14 
15 #include <curl/curl.h>
16 
// Forward declarations: within this header these types are used only through
// references or pointers, so their full definitions are not required here.
17 class XrdOucErrInfo;
18 class XrdOucStream;
19 class XrdSfsFile;
20 class XrdSfsFileSystem;
21 class XrdXrootdTpcMon;
// NOTE(review): <curl/curl.h> is already included above and presumably
// declares CURL itself, which would make this a repeat declaration -- confirm.
22 typedef void CURL;
23 
24 namespace TPC {
// Forward declaration: State is used below only as TPC::State& and State*.
25 class State;
26 
// Log-level mask values. Each named level occupies its own bit (0x01..0x08);
// All (0xff) has every bit of the low byte set and therefore covers all four.
27 enum LogMask {
28  Debug = 0x01,
29  Info = 0x02,
30  Warning = 0x04,
31  Error = 0x08,
32  All = 0xff
33 };
34 
// Kind of third-party-copy request: Pull or Push. A value is passed to the
// TPCLogRecord constructor, which stores it and forwards it to PMarkManager.
35 enum class TpcType {
36  Pull,
37  Push
38 };
39 
// Deleter functor for CURL handles; it is the deleter type of
// ManagedCurlHandle. operator() is only declared here (defined out of line).
40 struct CurlDeleter {
41  void operator()(CURL *curl);
42 };
// Owning smart pointer for a CURL handle; CurlDeleter is invoked on destruction.
43 using ManagedCurlHandle = std::unique_ptr<CURL, CurlDeleter>;
44 
45 
46 class TPCHandler : public XrdHttpExtHandler {
47 public:
48  TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv);
49  virtual ~TPCHandler();
50 
51  virtual bool MatchesPath(const char *verb, const char *path);
52  virtual int ProcessReq(XrdHttpExtReq &req);
53  // Abstract method in the base class, but does not seem to be used
54  virtual int Init(const char *cfgfile) {return 0;}
55  static constexpr std::string_view OSS_TASK_OPAQUE = "oss.task=httptpc";
56 private:
57 
58  static int sockopt_callback(void * clientp, curl_socket_t curlfd, curlsocktype purpose);
59  static int opensocket_callback(void *clientp,
60  curlsocktype purpose,
61  struct curl_sockaddr *address);
62 
63  static int closesocket_callback(void *clientp, curl_socket_t fd);
64 
65  struct TPCLogRecord {
66 
67  TPCLogRecord(XrdHttpExtReq & req, const TpcType tpcType) : bytes_transferred( -1 ), status( -1 ),
68  tpc_status(-1), streams( 1 ), isIPv6(false), mReq(req), pmarkManager(mReq,tpcType), mTpcType(tpcType)
69  {
70  gettimeofday(&begT, 0); // Set effective start time
71  }
72  ~TPCLogRecord();
73 
74  std::string log_prefix;
75  std::string local;
76  std::string remote;
77  std::string name;
78  std::string clID;
79  static XrdXrootdTpcMon* tpcMonitor;
80  timeval begT;
81  off_t bytes_transferred;
82  int status;
83  int tpc_status;
84  unsigned int streams;
85  bool isIPv6;
86  XrdHttpExtReq & mReq;
87  XrdTpc::PMarkManager pmarkManager;
88  XrdSysError * m_log;
89  TpcType mTpcType;
90  };
91 
92  int ProcessOptionsReq(XrdHttpExtReq &req);
93 
94  static std::string GetAuthz(XrdHttpExtReq &req);
95 
96  // Configure curl handle's CA settings. The CA files present here should
97  // be valid for the lifetime of the process.
98  void ConfigureCurlCA(CURL *curl);
99 
100  // Redirect the transfer according to the contents of an XrdOucErrInfo object.
101  int RedirectTransfer(CURL *curl, const std::string &redirect_resource, XrdHttpExtReq &req,
102  XrdOucErrInfo &error, TPCLogRecord &);
103 
104  int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode,
105  int openMode, const XrdSecEntity &sec,
106  const std::string &authz);
107 
108  int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
109  bool &success, TPCLogRecord &, bool shouldReturnErrorToClient = true);
110 
111  int GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t & contentLength, bool & success, TPCLogRecord &rec);
112 
113  // Send a 'performance marker' back to the TPC client, informing it of our
114  // progress. The TPC client will use this information to determine whether
115  // the transfer is making sufficient progress and/or other monitoring info
116  // (such as whether the transfer is happening over IPv4, IPv6, or both).
117  int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state);
118  int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
119  off_t bytes_transferred);
120 
121  // Perform the libcurl transfer, periodically sending back chunked updates.
122  int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
123  TPCLogRecord &rec);
124 
125  // Experimental multi-stream version of RunCurlWithUpdates
126  int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state,
127  size_t streams, TPCLogRecord &rec);
128  int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state,
129  size_t streams, std::vector<TPC::State*> &streams_handles,
130  std::vector<ManagedCurlHandle> &curl_handles,
131  TPCLogRecord &rec);
132 
133  int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req);
134  int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req);
135 
136  bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt,
137  std::string &path2, bool &path2_alt);
138  bool Configure(const char *configfn, XrdOucEnv *myEnv);
139  bool ConfigureLogger(XrdOucStream &Config);
140 
141  // Generate a consistently-formatted log message.
142  void logTransferEvent(LogMask lvl, const TPCLogRecord &record,
143  const std::string &event, const std::string &message="");
144 
145  std::string generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode = CURLcode::CURLE_OK);
146 
147  std::string prepareURL(XrdHttpExtReq &req);
148 
149  static int m_marker_period;
150  static size_t m_block_size;
151  static size_t m_small_block_size;
152  bool m_desthttps;
153  bool m_fixed_route; // If 'true' the Destination IP in an HTTP-TPC is forced to be the same as the IP used to contact the server
154  // when 'false' any IP available can be selected
155  int m_timeout; // the 'timeout interval'; if no bytes have been received during this time period, abort the transfer.
156  int m_first_timeout; // the 'first timeout interval'; the amount of time we're willing to wait to get the first byte.
157  // Unless explicitly specified, this is 2x the timeout interval.
158  std::string m_cadir; // The directory to use for CAs.
159  std::string m_cafile; // The file to use for CAs in libcurl
160  static XrdSysMutex m_monid_mutex;
161  static uint64_t m_monid;
162  XrdSysError m_log;
163  XrdSfsFileSystem *m_sfs;
164  std::shared_ptr<XrdTlsTempCA> m_ca_file;
165 
166  // 16 blocks in flight at 16 MB each, meaning that there will be up to 256MB
167  // in flight; this is equal to the bandwidth delay product of a 200ms transcontinental
168  // connection at 10Gbps.
169 #ifdef USE_PIPELINING
170  static const int m_pipelining_multiplier = 16;
171 #else
172  static const int m_pipelining_multiplier = 1;
173 #endif
174 
175  bool usingEC; // indicate if XrdEC is used
176 
177  // Time to connect the curl socket to the remote server uses the linux's default value
178  // of 60 seconds
179  static const long CONNECT_TIMEOUT = 60;
180 
181  // hdr2cgimap
182  std::map<std::string,std::string> hdr2cgimap;
183 };
184 }
Utility functions for XrdHTTP.
void CURL
Definition: XrdTpcState.hh:14
void CURL
Definition: XrdTpcTPC.hh:21
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
Definition: XrdTpcTPC.cc:289
virtual int ProcessReq(XrdHttpExtReq &req)
Definition: XrdTpcTPC.cc:253
virtual ~TPCHandler()
Definition: XrdTpcTPC.cc:281
static constexpr std::string_view OSS_TASK_OPAQUE
Definition: XrdTpcTPC.hh:55
virtual int Init(const char *cfgfile)
Initializes the external request handler.
Definition: XrdTpcTPC.hh:54
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
Definition: XrdTpcTPC.cc:234
TpcType
Definition: XrdTpcTPC.hh:35
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
Definition: XrdTpcTPC.hh:43
LogMask
Definition: XrdTpcTPC.hh:27
@ All
Definition: XrdTpcTPC.hh:32
@ Info
Definition: XrdTpcTPC.hh:29
@ Error
Definition: XrdTpcTPC.hh:31
@ Debug
Definition: XrdTpcTPC.hh:28
@ Warning
Definition: XrdTpcTPC.hh:30
XrdCmsConfig Config
void operator()(CURL *curl)
Definition: XrdTpcTPC.cc:84