xrootd
XrdThrottleManager.hh
Go to the documentation of this file.
1 
2 /*
3  * XrdThrottleManager
4  *
5  * This class provides an implementation of a throttle manager.
6  * The throttle manager purposely pauses if the bandwidth, IOPS
7  * rate, or number of outstanding IO requests is sustained above
8  * a certain level.
9  *
10  * The XrdThrottleManager is user-aware and provides fairshare.
11  *
12  * This works by having a separate thread periodically refilling
13  * each user's shares.
14  *
15  * Note that we do not actually keep close track of users, but rather
16  * put them into a hash. This way, we can pretend there's a constant
17  * number of users and use a lock-free algorithm.
18  */
19 
20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
22 
23 #ifdef __GNUC__
24 #define likely(x) __builtin_expect(!!(x), 1)
25 #define unlikely(x) __builtin_expect(!!(x), 0)
26 #else
27 #define likely(x) x
28 #define unlikely(x) x
29 #endif
30 
31 #include <string>
32 #include <vector>
33 #include <ctime>
34 #include <mutex>
35 #include <unordered_map>
36 #include <memory>
37 
38 #include "XrdSys/XrdSysPthread.hh"
39 
40 class XrdSysError;
41 class XrdOucTrace;
42 class XrdThrottleTimer;
43 
45 {
46 
47 friend class XrdThrottleTimer;
48 
49 public:
50 
51 void Init();
52 
53 bool OpenFile(const std::string &entity, std::string &open_error_message);
54 bool CloseFile(const std::string &entity);
55 
56 void Apply(int reqsize, int reqops, int uid);
57 
// Report whether a rate limit is configured: returns true when either the
// ops-per-second or the bytes-per-second setting is greater than zero.
// The concurrency limit is not consulted by this check.
58 bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
59 
// Store the throttle configuration in the corresponding members:
//   reqbyterate     -> m_bytes_per_second
//   reqoprate       -> m_ops_per_second
//   concurrency     -> m_concurrency_limit
//   interval_length -> m_interval_length_seconds
// The values are stored as given; no validation is performed here.
60 void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
61  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
62  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
63 
// Record the load-shed settings: the host name is copied into
// m_loadshed_host, and port / frequency are stored in m_loadshed_port and
// m_loadshed_frequency. hostname is taken by non-const reference but is only
// read here.
// NOTE(review): the unit and meaning of `frequency` are not visible in this
// header — confirm against the implementation.
64 void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
65  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
66 
// Set m_max_open, the maximum number of open files (the member is
// initialised to 0).
// NOTE(review): presumably 0 means "no limit" — confirm in OpenFile().
67 void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
68 
// Set m_max_conns (the member is initialised to 0).
// NOTE(review): presumably a connection limit where 0 means "no limit" —
// confirm in the implementation.
69 void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
70 
71 //int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
72 
73 static
74 int GetUid(const char *username);
75 
77 
78 void PrepLoadShed(const char *opaque, std::string &lsOpaque);
79 
80 bool CheckLoadShed(const std::string &opaque);
81 
82 void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
83 
85 
// Empty destructor: nothing is released explicitly here.
// NOTE(review): the trailing comment refers to a "buffmanager", which is not
// declared anywhere in this listing — possibly a leftover; confirm and reword.
86  ~XrdThrottleManager() {} // The buffmanager is never deleted
87 
88 protected:
89 
90 void StopIOTimer(struct timespec);
91 
92 private:
93 
94 void Recompute();
95 
96 void RecomputeInternal();
97 
98 static
99 void * RecomputeBootstrap(void *pp);
100 
101 int WaitForShares();
102 
103 void GetShares(int &shares, int &request);
104 
105 void StealShares(int uid, int &reqsize, int &reqops);
106 
109 
111 
112 // Controls for the various rates.
117 
118 // Maintain the shares
119 static const
121 std::vector<int> m_primary_bytes_shares;
122 std::vector<int> m_secondary_bytes_shares;
123 std::vector<int> m_primary_ops_shares;
124 std::vector<int> m_secondary_ops_shares;
126 
127 // Active IO counter
129 struct timespec m_io_wait;
130 // Stable IO counters - must hold m_compute_var lock when reading/writing;
132 struct timespec m_stable_io_wait;
133 
134 // Load shed details
135 std::string m_loadshed_host;
139 
140 // Maximum number of open files
141 unsigned long m_max_open{0};
142 unsigned long m_max_conns{0};
143 std::unordered_map<std::string, unsigned long> m_file_counters;
144 std::unordered_map<std::string, unsigned long> m_conn_counters;
145 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
146 std::mutex m_file_mutex;
147 
148 static const char *TraceID;
149 
150 };
151 
153 {
154 
155 friend class XrdThrottleManager;
156 
157 public:
158 
// Stop this timer and hand the elapsed interval to the owning manager.
//
// The current time is read with clock_gettime(clock_id, ...) on the platforms
// selected by the #if below; on any other platform the read is treated as
// failed. On success the start time held in m_timer is subtracted to obtain
// the elapsed interval; on failure the interval stays {0, 0}. The interval is
// passed to m_manager.StopIOTimer() unless m_timer.tv_nsec is already -1.
// Finally m_timer is reset to {0, -1}, so a repeated call does not report
// again.
//
// NOTE(review): if the start time was zeroed because the initial clock read
// failed, but the read here succeeds, the reported "interval" is the raw
// clock value — confirm whether that combination can occur in practice.
159 void StopTimer()
160 {
161  struct timespec end_timer = {0, 0};
162 #if defined(__linux__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
163  int retval = clock_gettime(clock_id, &end_timer);
164 #else
165  int retval = -1;
166 #endif
167  if (likely(retval == 0))
168  {
     // Turn the absolute end time into "end - start".
169  end_timer.tv_sec -= m_timer.tv_sec;
170  end_timer.tv_nsec -= m_timer.tv_nsec;
     // Borrow one second when the nanosecond difference went negative.
171  if (end_timer.tv_nsec < 0)
172  {
173  end_timer.tv_sec--;
174  end_timer.tv_nsec += 1000000000;
175  }
176  }
     // tv_nsec == -1 marks a timer that was already stopped; skip the report.
177  if (m_timer.tv_nsec != -1)
178  {
179  m_manager.StopIOTimer(end_timer);
180  }
     // Mark the timer as stopped.
181  m_timer.tv_sec = 0;
182  m_timer.tv_nsec = -1;
183 }
184 
186 {
187  if (!((m_timer.tv_sec == 0) && (m_timer.tv_nsec == -1)))
188  {
189  StopTimer();
190  }
191 }
192 
193 protected:
194 
196  m_manager(manager)
197 {
198 #if defined(__linux__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
199  int retval = clock_gettime(clock_id, &m_timer);
200 #else
201  int retval = -1;
202 #endif
203  if (unlikely(retval == -1))
204  {
205  m_timer.tv_sec = 0;
206  m_timer.tv_nsec = 0;
207  }
208 }
209 
210 private:
212 struct timespec m_timer;
213 
214 static int clock_id;
215 };
216 
217 #endif
unsigned long m_max_conns
Definition: XrdThrottleManager.hh:142
~XrdThrottleManager()
Definition: XrdThrottleManager.hh:86
static int clock_id
Definition: XrdThrottleManager.hh:214
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
Definition: XrdThrottleManager.hh:44
static const int m_max_users
Definition: XrdThrottleManager.hh:120
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
static void * RecomputeBootstrap(void *pp)
void SetMaxOpen(unsigned long max_open)
Definition: XrdThrottleManager.hh:67
float m_ops_per_second
Definition: XrdThrottleManager.hh:115
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
float m_interval_length_seconds
Definition: XrdThrottleManager.hh:113
void StopTimer()
Definition: XrdThrottleManager.hh:159
unsigned long m_max_open
Definition: XrdThrottleManager.hh:141
std::vector< int > m_secondary_bytes_shares
Definition: XrdThrottleManager.hh:122
bool CloseFile(const std::string &entity)
Definition: XrdOucTrace.hh:35
Definition: XrdSysError.hh:89
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
Definition: XrdThrottleManager.hh:64
XrdThrottleTimer(XrdThrottleManager &manager)
Definition: XrdThrottleManager.hh:195
Definition: XrdThrottleManager.hh:152
static int GetUid(const char *username)
std::unordered_map< std::string, unsigned long > m_file_counters
Definition: XrdThrottleManager.hh:143
void GetShares(int &shares, int &request)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleManager & m_manager
Definition: XrdThrottleManager.hh:211
XrdOucTrace * m_trace
Definition: XrdThrottleManager.hh:107
struct timespec m_stable_io_wait
Definition: XrdThrottleManager.hh:132
XrdSysCondVar m_compute_var
Definition: XrdThrottleManager.hh:110
Definition: XrdSysPthread.hh:78
std::vector< int > m_primary_bytes_shares
Definition: XrdThrottleManager.hh:121
bool IsThrottling()
Definition: XrdThrottleManager.hh:58
void SetMaxConns(unsigned long max_conns)
Definition: XrdThrottleManager.hh:69
struct timespec m_timer
Definition: XrdThrottleManager.hh:212
std::string m_loadshed_host
Definition: XrdThrottleManager.hh:135
struct timespec m_io_wait
Definition: XrdThrottleManager.hh:129
std::mutex m_file_mutex
Definition: XrdThrottleManager.hh:146
unsigned m_loadshed_port
Definition: XrdThrottleManager.hh:136
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
Definition: XrdThrottleManager.hh:60
std::vector< int > m_primary_ops_shares
Definition: XrdThrottleManager.hh:123
void StealShares(int uid, int &reqsize, int &reqops)
~XrdThrottleTimer()
Definition: XrdThrottleManager.hh:185
std::unordered_map< std::string, std::unique_ptr< std::unordered_map< pid_t, unsigned long > > > m_active_conns
Definition: XrdThrottleManager.hh:145
int m_last_round_allocation
Definition: XrdThrottleManager.hh:125
int m_concurrency_limit
Definition: XrdThrottleManager.hh:116
unsigned m_loadshed_frequency
Definition: XrdThrottleManager.hh:137
std::unordered_map< std::string, unsigned long > m_conn_counters
Definition: XrdThrottleManager.hh:144
int m_loadshed_limit_hit
Definition: XrdThrottleManager.hh:138
float m_bytes_per_second
Definition: XrdThrottleManager.hh:114
int m_io_counter
Definition: XrdThrottleManager.hh:128
#define likely(x)
Definition: XrdThrottleManager.hh:27
#define unlikely(x)
Definition: XrdThrottleManager.hh:28
int m_stable_io_counter
Definition: XrdThrottleManager.hh:131
std::vector< int > m_secondary_ops_shares
Definition: XrdThrottleManager.hh:124
static const char * TraceID
Definition: XrdThrottleManager.hh:148
void Apply(int reqsize, int reqops, int uid)
XrdSysError * m_log
Definition: XrdThrottleManager.hh:108
void StopIOTimer(struct timespec)
XrdThrottleTimer StartIOTimer()