13 #define XRD_TRACE m_trace->
22 #if defined(__linux__)
25 unsigned XrdThrottleManager::GetTimerListHash() {
26 int cpu = sched_getcpu();
30 return cpu % m_timer_list_size;
35 unsigned XrdThrottleManager::GetTimerListHash() {
42 XrdThrottleManager::TraceID =
"ThrottleManager";
47 m_interval_length_seconds(1.0),
48 m_bytes_per_second(-1),
50 m_concurrency_limit(-1),
51 m_last_round_allocation(100*1024),
54 m_loadshed_frequency(0)
79 if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
82 SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
89 TRACE(
DEBUG,
"Initializing the throttle manager.");
91 m_primary_bytes_shares.resize(m_max_users);
92 m_secondary_bytes_shares.resize(m_max_users);
93 m_primary_ops_shares.resize(m_max_users);
94 m_secondary_ops_shares.resize(m_max_users);
95 for (
auto & waiter : m_waiter_info) {
96 waiter.m_manager =
this;
100 for (
int i=0; i<m_max_users; i++)
102 m_primary_bytes_shares[i] = m_last_round_allocation;
103 m_secondary_bytes_shares[i] = 0;
104 m_primary_ops_shares[i] = 10;
105 m_secondary_ops_shares[i] = 0;
110 if ((rc =
XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap,
static_cast<void *
>(
this), 0,
"Buffer Manager throttle")))
111 m_log->
Emsg(
"ThrottleManager", rc,
"create throttle thread");
115 std::tuple<std::string, uint16_t>
121 if (client->
eaAPI && client->
eaAPI->
Get(
"token.subject", user)) {
122 if (client->
vorg) user = std::string(client->
vorg) +
":" + user;
123 }
else if (client->
eaAPI) {
124 std::string request_name;
125 if (client->
eaAPI->
Get(
"request.name", request_name) && !request_name.empty()) user = request_name;
127 if (user.empty()) {user = client->
name ? client->
name :
"nobody";}
128 uint16_t uid = GetUid(user.c_str());
129 return std::make_tuple(user, uid);
137 XrdThrottleManager::GetShares(
int &shares,
int &request)
143 request -= (remaining < request) ? remaining : request;
152 XrdThrottleManager::StealShares(
int uid,
int &reqsize,
int &reqops)
154 if (!reqsize && !reqops)
return;
155 TRACE(BANDWIDTH,
"Stealing shares to fill request of " << reqsize <<
" bytes");
156 TRACE(IOPS,
"Stealing shares to fill request of " << reqops <<
" ops.");
158 for (
int i=uid+1; i % m_max_users == uid; i++)
160 if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
161 if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
164 TRACE(BANDWIDTH,
"After stealing shares, " << reqsize <<
" of request bytes remain.");
165 TRACE(IOPS,
"After stealing shares, " << reqops <<
" of request ops remain.");
176 if (m_max_open == 0 && m_max_conns == 0)
return true;
178 const std::lock_guard<std::mutex> lock(m_file_mutex);
179 auto iter = m_file_counters.find(entity);
180 unsigned long cur_open_files = 0, cur_open_conns;
182 if (iter == m_file_counters.end()) {
183 m_file_counters[entity] = 1;
184 TRACE(FILES,
"User " << entity <<
" has opened their first file");
186 }
else if (iter->second < m_max_open) {
188 cur_open_files = iter->second;
190 std::stringstream ss;
191 ss <<
"User " << entity <<
" has hit the limit of " << m_max_open <<
" open files";
192 TRACE(FILES, ss.str());
193 error_message = ss.str();
200 auto conn_iter = m_active_conns.find(entity);
201 auto conn_count_iter = m_conn_counters.find(entity);
202 if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
203 (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
206 if (m_max_open) iter->second--;
207 std::stringstream ss;
208 ss <<
"User " << entity <<
" has hit the limit of " << m_max_conns <<
210 TRACE(CONNS, ss.str());
211 error_message = ss.str();
214 if (conn_iter == m_active_conns.end()) {
215 std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
216 new std::unordered_map<pid_t, unsigned long>());
217 (*conn_map)[pid] = 1;
218 m_active_conns[entity] = std::move(conn_map);
219 if (conn_count_iter == m_conn_counters.end()) {
220 m_conn_counters[entity] = 1;
223 m_conn_counters[entity] ++;
224 cur_open_conns = m_conn_counters[entity];
227 auto pid_iter = conn_iter->second->find(pid);
228 if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
229 (*(conn_iter->second))[pid] = 1;
230 conn_count_iter->second++;
231 cur_open_conns = conn_count_iter->second;
233 (*(conn_iter->second))[pid] ++;
234 cur_open_conns = conn_count_iter->second;
237 TRACE(CONNS,
"User " << entity <<
" has " << cur_open_conns <<
" open connections");
239 if (m_max_open)
TRACE(FILES,
"User " << entity <<
" has " << cur_open_files <<
" open files");
253 if (m_max_open == 0 && m_max_conns == 0)
return true;
256 const std::lock_guard<std::mutex> lock(m_file_mutex);
258 auto iter = m_file_counters.find(entity);
259 if (iter == m_file_counters.end()) {
260 TRACE(FILES,
"WARNING: User " << entity <<
" closed a file but throttle plugin never saw an open file");
262 }
else if (iter->second == 0) {
263 TRACE(FILES,
"WARNING: User " << entity <<
" closed a file but throttle plugin thinks all files were already closed");
268 if (result)
TRACE(FILES,
"User " << entity <<
" closed a file; " << iter->second <<
274 auto conn_iter = m_active_conns.find(entity);
275 auto conn_count_iter = m_conn_counters.find(entity);
276 if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
277 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file on a connection we are not"
281 auto pid_iter = conn_iter->second->find(pid);
282 if (pid_iter == conn_iter->second->end()) {
283 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file on a connection we are not"
287 if (pid_iter->second == 0) {
288 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file on connection the throttle"
289 " plugin thinks was idle");
293 if (conn_count_iter == m_conn_counters.end()) {
294 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file but the throttle plugin never"
295 " observed an open file");
296 }
else if (pid_iter->second == 0) {
297 if (conn_count_iter->second == 0) {
298 TRACE(CONNS,
"WARNING: User " << entity <<
" had a connection go idle but the "
299 " throttle plugin already thought all connections were idle");
301 conn_count_iter->second--;
302 TRACE(CONNS,
"User " << entity <<
" had connection on thread " << pid <<
" go idle; "
303 << conn_count_iter->second <<
" active connections remain");
319 if (m_bytes_per_second < 0)
321 if (m_ops_per_second < 0)
323 while (reqsize || reqops)
327 GetShares(m_primary_bytes_shares[uid], reqsize);
330 TRACE(BANDWIDTH,
"Using secondary shares; request has " << reqsize <<
" bytes left.");
331 GetShares(m_secondary_bytes_shares[uid], reqsize);
332 TRACE(BANDWIDTH,
"Finished with secondary shares; request has " << reqsize <<
" bytes left.");
336 TRACE(BANDWIDTH,
"Filled byte shares out of primary; " << m_primary_bytes_shares[uid] <<
" left.");
338 GetShares(m_primary_ops_shares[uid], reqops);
341 GetShares(m_secondary_ops_shares[uid], reqops);
343 StealShares(uid, reqsize, reqops);
346 if (reqsize || reqops)
348 if (reqsize)
TRACE(BANDWIDTH,
"Sleeping to wait for throttle fairshare.");
349 if (reqops)
TRACE(IOPS,
"Sleeping to wait for throttle fairshare.");
350 m_compute_var.
Wait();
351 m_loadshed_limit_hit++;
358 XrdThrottleManager::UserIOAccounting()
360 std::chrono::steady_clock::duration::rep total_active_time = 0;
361 for (
size_t idx = 0; idx < m_timer_list.size(); idx++) {
362 auto &timerList = m_timer_list[idx];
363 std::unique_lock<std::mutex> lock(timerList.m_mutex);
364 auto timer = timerList.m_first;
366 auto next = timer->m_next;
367 auto uid = timer->m_owner;
368 auto &waiter = m_waiter_info[uid];
369 auto recent_duration = timer->Reset();
370 waiter.m_io_time += recent_duration.count();
372 total_active_time += recent_duration.count();
376 m_io_active_time += total_active_time;
380 XrdThrottleManager::ComputeWaiterOrder()
387 auto now = std::chrono::steady_clock::now();
388 auto elapsed = now - m_last_waiter_recompute_time;
389 m_last_waiter_recompute_time = now;
390 std::chrono::duration<double> elapsed_secs = elapsed;
397 auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);
399 std::vector<double> share;
400 share.resize(m_max_users);
401 size_t users_with_waiters = 0;
404 for (
int i = 0; i < m_max_users; i++)
406 auto &waiter = m_waiter_info[i];
407 auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
408 std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
409 std::chrono::duration<double> io_duration_secs = io_duration;
410 auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
411 float new_concurrency = waiter.m_concurrency;
413 new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
414 waiter.m_concurrency = new_concurrency;
415 if (new_concurrency > 0) {
416 TRACE(
DEBUG,
"User " << i <<
" has concurrency of " << new_concurrency);
420 std::lock_guard<std::mutex> lock(waiter.m_mutex);
421 waiting = waiter.m_waiting;
425 share[i] = new_concurrency;
426 TRACE(
DEBUG,
"User " << i <<
" has concurrency of " << share[i] <<
" and is waiting for " << waiting);
432 users_with_waiters++;
439 auto fair_share =
static_cast<double>(m_concurrency_limit) /
static_cast<double>(users_with_waiters);
440 std::vector<uint16_t> waiter_order;
441 waiter_order.resize(m_max_users);
446 double shares_sum = 0;
447 for (
int idx = 0; idx < m_max_users; idx++)
450 shares_sum += fair_share / share[idx];
458 auto scale_factor = (
static_cast<double>(m_max_users) -
static_cast<double>(users_with_waiters)) / shares_sum;
460 for (
int uid = 0; uid < m_max_users; uid++) {
461 if (share[uid] > 0) {
462 auto shares =
static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
463 TRACE(
DEBUG,
"User " << uid <<
" has " << shares <<
" shares");
464 for (
unsigned idx = 0; idx < shares; idx++)
466 waiter_order[offset % m_max_users] = uid;
471 if (offset < m_max_users) {
472 for (
size_t idx = offset; idx < m_max_users; idx++) {
473 waiter_order[idx] = -1;
477 std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());
481 auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
482 std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());
486 m_wake_order_active = (m_wake_order_active + 1) % 2;
493 if (users_with_waiters) {
494 m_waiting_users = users_with_waiters;
495 auto io_active = m_io_active.load(std::memory_order_acquire);
496 for (
size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
503 XrdThrottleManager::RecomputeBootstrap(
void *instance)
506 manager->Recompute();
511 XrdThrottleManager::Recompute()
518 if (m_max_open || m_max_conns) {
519 const std::lock_guard<std::mutex> lock(m_file_mutex);
520 for (
auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
522 auto & conn_count = *iter;
523 if (!conn_count.second) {
524 iter = m_active_conns.erase(iter);
527 for (
auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
528 if (iter2->second == 0) {
529 iter2 = conn_count.second->erase(iter2);
534 if (!conn_count.second->size()) {
535 iter = m_active_conns.erase(iter);
540 for (
auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
542 iter = m_conn_counters.erase(iter);
547 for (
auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
549 iter = m_file_counters.erase(iter);
556 TRACE(
DEBUG,
"Recomputing fairshares for throttle.");
558 ComputeWaiterOrder();
559 TRACE(
DEBUG,
"Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds <<
" seconds.");
582 XrdThrottleManager::RecomputeInternal()
585 float intervals_per_second = 1.0/m_interval_length_seconds;
586 float total_bytes_shares = m_bytes_per_second / intervals_per_second;
587 float total_ops_shares = m_ops_per_second / intervals_per_second;
592 float active_users = 0;
594 for (
int i=0; i<m_max_users; i++)
596 int primary =
AtomicFAZ(m_primary_bytes_shares[i]);
597 if (primary != m_last_round_allocation)
601 m_secondary_bytes_shares[i] = primary;
602 primary =
AtomicFAZ(m_primary_ops_shares[i]);
604 m_secondary_ops_shares[i] = primary;
605 bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
609 if (active_users == 0)
617 m_last_round_allocation =
static_cast<int>(total_bytes_shares / active_users);
618 int ops_shares =
static_cast<int>(total_ops_shares / active_users);
619 TRACE(BANDWIDTH,
"Round byte allocation " << m_last_round_allocation <<
" ; last round used " << bytes_used <<
".");
620 TRACE(IOPS,
"Round ops allocation " << ops_shares);
621 for (
int i=0; i<m_max_users; i++)
623 m_primary_bytes_shares[i] = m_last_round_allocation;
624 m_primary_ops_shares[i] = ops_shares;
630 int limit_hit = m_loadshed_limit_hit.
exchange(0);
631 TRACE(
DEBUG,
"Throttle limit hit " << limit_hit <<
" times during last interval.");
634 m_compute_var.
Lock();
635 m_stable_io_active = m_io_active.load(std::memory_order_acquire);
636 auto io_active = m_stable_io_active;
637 m_stable_io_total = m_io_total;
638 auto io_total = m_stable_io_total;
639 auto io_wait_rep = m_io_active_time.
exchange(std::chrono::steady_clock::duration(0).count());
640 m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
644 auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
645 TRACE(IOLOAD,
"Current IO counter is " << io_active <<
"; total IO active time is " << io_wait_ms <<
"ms.");
649 auto len = snprintf(buf, 128,
650 R
"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
651 static_cast<double>(io_wait_ms) / 1000.0, io_active,
static_cast<long long unsigned>(io_total));
652 auto suc = (len < 128) ? m_gstream->
Insert(buf, len + 1) :
false;
655 TRACE(IOLOAD,
"Failed g-stream insertion of throttle_update record (len=" << len <<
"): " << buf);
665 XrdThrottleManager::GetUid(
const std::string &username)
667 std::hash<std::string> hash_fn;
668 auto hash = hash_fn(username);
669 auto uid =
static_cast<uint16_t
>(hash % m_max_users);
670 TRACE(
DEBUG,
"Mapping user " << username <<
" to UID " << uid);
678 XrdThrottleManager::NotifyOne()
680 auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;
682 for (
size_t idx = 0; idx < m_max_users; ++idx)
684 auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
685 int16_t uid = wake_order[offset % m_max_users];
690 auto &waiter_info = m_waiter_info[uid];
691 std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
692 if (waiter_info.m_waiting) {
693 waiter_info.NotifyOne(std::move(lock));
705 int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
708 while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
713 if (m_waiter_info[uid].m_concurrency < 1)
717 m_loadshed_limit_hit++;
718 m_io_active.fetch_sub(1, std::memory_order_acq_rel);
719 TRACE(
DEBUG,
"ThrottleManager (user=" << uid <<
"): IO concurrency limit hit; waiting for other IOs to finish.");
720 ok = m_waiter_info[uid].Wait();
722 TRACE(
DEBUG,
"ThrottleManager (user=" << uid <<
"): timed out waiting for other IOs to finish.");
725 cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
738 m_io_active_time += event_duration.count();
739 auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
740 m_waiter_info[uid].m_io_time += event_duration.count();
741 if (old_active ==
static_cast<unsigned>(m_concurrency_limit))
746 unsigned waiting_users = m_waiting_users;
747 if (waiting_users == 0) waiting_users = 1;
748 if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
750 std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
751 if (m_waiter_info[uid].m_waiting > 0)
753 m_waiter_info[uid].NotifyOne(std::move(lock));
771 if (m_loadshed_port == 0)
775 if (m_loadshed_limit_hit == 0)
779 if (
static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
793 if (m_loadshed_port == 0)
797 if (opaque && opaque[0])
801 if (env.
Get(
"throttle.shed") != 0)
806 lsOpaque +=
"&throttle.shed=1";
810 lsOpaque =
"throttle.shed=1";
817 host = m_loadshed_host;
820 port = m_loadshed_port;
824 XrdThrottleManager::Waiter::Wait()
826 auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
828 std::unique_lock<std::mutex> lock(m_mutex);
830 m_cv.wait_until(lock, timeout,
831 [&] {
return m_manager->m_io_active.load(std::memory_order_acquire) <
static_cast<unsigned>(m_manager->m_concurrency_limit) || std::chrono::steady_clock::now() >= timeout; });
834 if (std::chrono::steady_clock::now() > timeout) {
#define AtomicFSub(w, x, y)
char * Get(const char *varname)
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
XrdSecEntityAttr * eaAPI
non-const API to attributes
char * name
Entity's name.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
T exchange(T v, std::memory_order mo=std::memory_order_relaxed) noexcept
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetMaxWait() const
long long GetMaxConn() const
long long GetThrottleConcurrency() const
long long GetMaxOpen() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const
int GetTraceLevels() const
bool Insert(const char *data, int dlen)