13#define XRD_TRACE m_trace->
// Chooses a timer-list slot for the caller from the CPU it is currently running
// on, reduced modulo m_timer_list_size. Lines 27-29 and the closing brace of
// the original file are not visible in this listing.
25unsigned XrdThrottleManager::GetTimerListHash() {
26 int cpu = sched_getcpu();
// NOTE(review): sched_getcpu() returns -1 on failure; a negative value must be
// filtered out before this modulo/unsigned conversion — confirm the lines
// dropped from this listing do that.
30 return cpu % m_timer_list_size;
35unsigned XrdThrottleManager::GetTimerListHash() {
// Definition of the class-level TraceID string (its type is on a line missing
// from this listing); presumably the tag the TRACE macro attaches to messages —
// confirm against the trace header.
42XrdThrottleManager::TraceID =
"ThrottleManager";
47 m_interval_length_seconds(1.0),
48 m_bytes_per_second(-1),
50 m_concurrency_limit(-1),
51 m_last_round_allocation(100*1024),
54 m_loadshed_frequency(0)
79 if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
82 SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
89 TRACE(
DEBUG,
"Initializing the throttle manager.");
91 m_primary_bytes_shares.resize(m_max_users);
92 m_secondary_bytes_shares.resize(m_max_users);
93 m_primary_ops_shares.resize(m_max_users);
94 m_secondary_ops_shares.resize(m_max_users);
95 for (
auto & waiter : m_waiter_info) {
96 waiter.m_manager =
this;
100 for (
int i=0; i<m_max_users; i++)
102 m_primary_bytes_shares[i] = m_last_round_allocation;
103 m_secondary_bytes_shares[i] = 0;
104 m_primary_ops_shares[i] = 10;
105 m_secondary_ops_shares[i] = 0;
110 if ((rc =
XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap,
static_cast<void *
>(
this), 0,
"Buffer Manager throttle")))
111 m_log->Emsg(
"ThrottleManager", rc,
"create throttle thread");
// Resolves a display name and a numeric slot (via GetUid) for a client.
// Returns a (name, uid) tuple. The function-name line and several
// statements are missing from this listing; the comments below cover only the
// lines that are visible.
115std::tuple<std::string, uint16_t>
// Fallback identity; the condition guarding this return is not visible here
// (presumably a null `client` check — confirm).
119 return std::make_tuple(
"nobody", GetUid(
"nobody"));
// First choice: the "token.subject" extended attribute, prefixed with
// "<vorg>:" when the entity carries a virtual organization.
126 if (client->
eaAPI && client->
eaAPI->
Get(
"token.subject", user)) {
127 if (client->
vorg) user = std::string(client->
vorg) +
":" + user;
128 }
else if (client->
eaAPI) {
// Second choice: a non-empty "request.name" extended attribute.
129 std::string request_name;
130 if (client->
eaAPI->
Get(
"request.name", request_name) && !request_name.empty()) user = request_name;
// Last resort: the entity's own name, or the literal "nobody" when it is null.
132 if (user.empty()) {user = client->
name ? client->
name :
"nobody";}
133 uint16_t uid = GetUid(user.c_str());
134 return std::make_tuple(user, uid);
// Consumes from `shares` on behalf of `request`; both are passed by reference.
// Only the signature and the final adjustment are visible in this listing —
// the lines that compute `remaining` are missing.
142XrdThrottleManager::GetShares(
int &shares,
int &request)
// Lowers the outstanding request by min(remaining, request).
148 request -= (remaining < request) ? remaining : request;
157XrdThrottleManager::StealShares(
int uid,
int &reqsize,
int &reqops)
159 if (!reqsize && !reqops)
return;
160 TRACE(BANDWIDTH,
"Stealing shares to fill request of " << reqsize <<
" bytes");
161 TRACE(IOPS,
"Stealing shares to fill request of " << reqops <<
" ops.");
163 for (
int i=uid+1; i % m_max_users == uid; i++)
165 if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
166 if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
169 TRACE(BANDWIDTH,
"After stealing shares, " << reqsize <<
" of request bytes remain.");
170 TRACE(IOPS,
"After stealing shares, " << reqops <<
" of request ops remain.");
181 if (m_max_open == 0 && m_max_conns == 0)
return true;
183 const std::lock_guard<std::mutex> lock(m_file_mutex);
184 auto iter = m_file_counters.find(entity);
185 unsigned long cur_open_files = 0, cur_open_conns;
187 if (iter == m_file_counters.end()) {
188 m_file_counters[entity] = 1;
189 TRACE(FILES,
"User " << entity <<
" has opened their first file");
191 }
else if (iter->second < m_max_open) {
193 cur_open_files = iter->second;
195 std::stringstream ss;
196 ss <<
"User " << entity <<
" has hit the limit of " << m_max_open <<
" open files";
197 TRACE(FILES, ss.str());
198 error_message = ss.str();
205 auto conn_iter = m_active_conns.find(entity);
206 auto conn_count_iter = m_conn_counters.find(entity);
207 if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
208 (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
211 if (m_max_open) iter->second--;
212 std::stringstream ss;
213 ss <<
"User " << entity <<
" has hit the limit of " << m_max_conns <<
215 TRACE(CONNS, ss.str());
216 error_message = ss.str();
219 if (conn_iter == m_active_conns.end()) {
220 std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
221 new std::unordered_map<pid_t, unsigned long>());
222 (*conn_map)[pid] = 1;
223 m_active_conns[entity] = std::move(conn_map);
224 if (conn_count_iter == m_conn_counters.end()) {
225 m_conn_counters[entity] = 1;
228 m_conn_counters[entity] ++;
229 cur_open_conns = m_conn_counters[entity];
232 auto pid_iter = conn_iter->second->find(pid);
233 if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
234 (*(conn_iter->second))[pid] = 1;
235 conn_count_iter->second++;
236 cur_open_conns = conn_count_iter->second;
238 (*(conn_iter->second))[pid] ++;
239 cur_open_conns = conn_count_iter->second;
242 TRACE(CONNS,
"User " << entity <<
" has " << cur_open_conns <<
" open connections");
244 if (m_max_open)
TRACE(FILES,
"User " << entity <<
" has " << cur_open_files <<
" open files");
258 if (m_max_open == 0 && m_max_conns == 0)
return true;
261 const std::lock_guard<std::mutex> lock(m_file_mutex);
263 auto iter = m_file_counters.find(entity);
264 if (iter == m_file_counters.end()) {
265 TRACE(FILES,
"WARNING: User " << entity <<
" closed a file but throttle plugin never saw an open file");
267 }
else if (iter->second == 0) {
268 TRACE(FILES,
"WARNING: User " << entity <<
" closed a file but throttle plugin thinks all files were already closed");
273 if (result)
TRACE(FILES,
"User " << entity <<
" closed a file; " << iter->second <<
279 auto conn_iter = m_active_conns.find(entity);
280 auto conn_count_iter = m_conn_counters.find(entity);
281 if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
282 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file on a connection we are not"
286 auto pid_iter = conn_iter->second->find(pid);
287 if (pid_iter == conn_iter->second->end()) {
288 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file on a connection we are not"
292 if (pid_iter->second == 0) {
293 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file on connection the throttle"
294 " plugin thinks was idle");
298 if (conn_count_iter == m_conn_counters.end()) {
299 TRACE(CONNS,
"WARNING: User " << entity <<
" closed a file but the throttle plugin never"
300 " observed an open file");
301 }
else if (pid_iter->second == 0) {
302 if (conn_count_iter->second == 0) {
303 TRACE(CONNS,
"WARNING: User " << entity <<
" had a connection go idle but the "
304 " throttle plugin already thought all connections were idle");
306 conn_count_iter->second--;
307 TRACE(CONNS,
"User " << entity <<
" had connection on thread " << pid <<
" go idle; "
308 << conn_count_iter->second <<
" active connections remain");
324 if (m_bytes_per_second < 0)
326 if (m_ops_per_second < 0)
328 while (reqsize || reqops)
332 GetShares(m_primary_bytes_shares[uid], reqsize);
335 TRACE(BANDWIDTH,
"Using secondary shares; request has " << reqsize <<
" bytes left.");
336 GetShares(m_secondary_bytes_shares[uid], reqsize);
337 TRACE(BANDWIDTH,
"Finished with secondary shares; request has " << reqsize <<
" bytes left.");
341 TRACE(BANDWIDTH,
"Filled byte shares out of primary; " << m_primary_bytes_shares[uid] <<
" left.");
343 GetShares(m_primary_ops_shares[uid], reqops);
346 GetShares(m_secondary_ops_shares[uid], reqops);
348 StealShares(uid, reqsize, reqops);
351 if (reqsize || reqops)
353 if (reqsize)
TRACE(BANDWIDTH,
"Sleeping to wait for throttle fairshare.");
354 if (reqops)
TRACE(IOPS,
"Sleeping to wait for throttle fairshare.");
355 m_compute_var.Wait();
356 m_loadshed_limit_hit++;
// Sweeps every timer list, resets each timer found there, and credits the
// elapsed time both to the owning user's waiter record and to the manager-wide
// m_io_active_time counter. The loop that walks `timer` -> `next` is only
// partly visible in this listing.
363XrdThrottleManager::UserIOAccounting()
365 std::chrono::steady_clock::duration::rep total_active_time = 0;
366 for (
size_t idx = 0; idx < m_timer_list.size(); idx++) {
367 auto &timerList = m_timer_list[idx];
// Each list is walked under its own mutex.
368 std::unique_lock<std::mutex> lock(timerList.m_mutex);
369 auto timer = timerList.m_first;
// `next` is captured before the timer is touched.
371 auto next = timer->m_next;
372 auto uid = timer->m_owner;
373 auto &waiter = m_waiter_info[uid];
// Reset() hands back a duration; presumably the time accumulated since the
// previous reset — confirm against XrdThrottleTimer.
374 auto recent_duration = timer->Reset();
375 waiter.m_io_time += recent_duration.count();
377 total_active_time += recent_duration.count();
// Added to the shared counter once, after the sweep.
381 m_io_active_time += total_active_time;
// Rebuilds the order in which waiting users are woken.
// Visible steps: (1) update each user's smoothed concurrency from the I/O time
// accumulated since the last call, (2) give users shares inversely
// proportional to that concurrency, (3) lay the shares out in a vector, shuffle
// it, and publish it through the inactive m_wake_order_* buffer.
// Several lines (declarations of `waiting` and `offset`, some guards, loop
// bodies) are missing from this listing.
385XrdThrottleManager::ComputeWaiterOrder()
392 auto now = std::chrono::steady_clock::now();
393 auto elapsed = now - m_last_waiter_recompute_time;
394 m_last_waiter_recompute_time = now;
395 std::chrono::duration<double> elapsed_secs = elapsed;
// Smoothing weight for the moving average below: 1 - exp(-elapsed / 10 s), so
// longer gaps between calls weight the newest sample more heavily.
402 auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);
404 std::vector<double> share;
405 share.resize(m_max_users);
406 size_t users_with_waiters = 0;
409 for (
int i = 0; i < m_max_users; i++)
411 auto &waiter = m_waiter_info[i];
// Atomically read and zero the user's accumulated I/O time.
412 auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
413 std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
414 std::chrono::duration<double> io_duration_secs = io_duration;
// I/O seconds per wall-clock second over the last period.
415 auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
416 float new_concurrency = waiter.m_concurrency;
// Blend the previous estimate with the latest sample.
418 new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
419 waiter.m_concurrency = new_concurrency;
420 if (new_concurrency > 0) {
421 TRACE(
DEBUG,
"User " << i <<
" has concurrency of " << new_concurrency);
// m_waiting is read under the waiter's mutex.
425 std::lock_guard<std::mutex> lock(waiter.m_mutex);
426 waiting = waiter.m_waiting;
430 share[i] = new_concurrency;
431 TRACE(
DEBUG,
"User " << i <<
" has concurrency of " << share[i] <<
" and is waiting for " << waiting);
437 users_with_waiters++;
// NOTE(review): if users_with_waiters is 0 this is a floating-point division
// by zero; any guard would be on lines missing from this listing — confirm.
444 auto fair_share =
static_cast<double>(m_concurrency_limit) /
static_cast<double>(users_with_waiters);
445 std::vector<uint16_t> waiter_order;
446 waiter_order.resize(m_max_users);
451 double shares_sum = 0;
452 for (
int idx = 0; idx < m_max_users; idx++)
455 shares_sum += fair_share / share[idx];
// Scales the per-user ratios so they fill the slots left after the "+ 1" that
// every user with a positive share receives below.
463 auto scale_factor = (
static_cast<double>(m_max_users) -
static_cast<double>(users_with_waiters)) / shares_sum;
465 for (
int uid = 0; uid < m_max_users; uid++) {
466 if (share[uid] > 0) {
// Lower measured concurrency => more slots in the wake order; never fewer
// than one.
467 auto shares =
static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
468 TRACE(
DEBUG,
"User " << uid <<
" has " << shares <<
" shares");
469 for (
unsigned idx = 0; idx < shares; idx++)
471 waiter_order[offset % m_max_users] = uid;
476 if (offset < m_max_users) {
477 for (
size_t idx = offset; idx < m_max_users; idx++) {
// NOTE(review): the element type is uint16_t, so -1 is stored as 65535.
// NotifyOne reads entries into an int16_t; confirm the round trip is intended.
478 waiter_order[idx] = -1;
// NOTE(review): a default-constructed std::default_random_engine starts from
// the same default seed every time, so each call applies the same positional
// permutation — confirm this is acceptable or seed/persist the engine.
482 std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());
// Write into whichever m_wake_order_* buffer is not currently active, then
// flip the active index so readers switch to the new order.
486 auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
487 std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());
491 m_wake_order_active = (m_wake_order_active + 1) % 2;
498 if (users_with_waiters) {
499 m_waiting_users = users_with_waiters;
500 auto io_active = m_io_active.load(std::memory_order_acquire);
// One iteration per free concurrency slot; the loop body is not visible in
// this listing.
501 for (
size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
// Thread entry point handed to XrdSysThread::Run with the manager's `this`
// pointer as its argument; forwards to Recompute(). The line that derives
// `manager` from `instance` is missing from this listing.
508XrdThrottleManager::RecomputeBootstrap(
void *instance)
511 manager->Recompute();
// Periodic maintenance routine: prunes the per-user open-file/connection
// bookkeeping, then recomputes the waiter order. The enclosing loop, the
// conditions guarding some erase calls, and the sleep announced by the final
// trace message are on lines missing from this listing.
516XrdThrottleManager::Recompute()
// Bookkeeping is only touched when an open-file or connection limit is set.
523 if (m_max_open || m_max_conns) {
524 const std::lock_guard<std::mutex> lock(m_file_mutex);
525 for (
auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
527 auto & conn_count = *iter;
// Drop entries whose per-connection map pointer is null.
528 if (!conn_count.second) {
529 iter = m_active_conns.erase(iter);
532 for (
auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
// Drop per-connection entries whose count has reached zero.
533 if (iter2->second == 0) {
534 iter2 = conn_count.second->erase(iter2);
// Drop the user entirely once no per-connection entries remain.
539 if (!conn_count.second->size()) {
540 iter = m_active_conns.erase(iter);
545 for (
auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
547 iter = m_conn_counters.erase(iter);
552 for (
auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
554 iter = m_file_counters.erase(iter);
561 TRACE(
DEBUG,
"Recomputing fairshares for throttle.");
563 ComputeWaiterOrder();
564 TRACE(
DEBUG,
"Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds <<
" seconds.");
// Recomputes the per-user byte and ops allocations for the next interval,
// snapshots the I/O counters under m_compute_var, emits a "throttle_update"
// record, and wakes threads blocked on m_compute_var. Declarations of
// `bytes_used` and `buf`, plus several guards, are on lines missing from this
// listing.
587XrdThrottleManager::RecomputeInternal()
// Convert the per-second limits into per-interval budgets.
590 float intervals_per_second = 1.0/m_interval_length_seconds;
591 float total_bytes_shares = m_bytes_per_second / intervals_per_second;
592 float total_ops_shares = m_ops_per_second / intervals_per_second;
597 float active_users = 0;
599 for (
int i=0; i<m_max_users; i++)
// AtomicFAZ is presumably "fetch and zero" — confirm in XrdSysAtomics.
601 int primary =
AtomicFAZ(m_primary_bytes_shares[i]);
// A primary value that differs from last round's allocation means the user
// consumed something during the interval.
602 if (primary != m_last_round_allocation)
// Left-over primary shares are moved into the secondary counter.
606 m_secondary_bytes_shares[i] = primary;
607 primary =
AtomicFAZ(m_primary_ops_shares[i]);
609 m_secondary_ops_shares[i] = primary;
// NOTE(review): in the visible lines `primary` was last assigned from
// m_primary_ops_shares, so this subtracts an ops count from a byte allocation;
// bytes_used only feeds the trace message below — confirm and fix upstream.
610 bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
// The action taken for zero active users is not visible; it must prevent the
// divisions below from dividing by zero.
614 if (active_users == 0)
622 m_last_round_allocation =
static_cast<int>(total_bytes_shares / active_users);
623 int ops_shares =
static_cast<int>(total_ops_shares / active_users);
624 TRACE(BANDWIDTH,
"Round byte allocation " << m_last_round_allocation <<
" ; last round used " << bytes_used <<
".");
625 TRACE(IOPS,
"Round ops allocation " << ops_shares);
// Every slot receives the same fresh primary allocation.
626 for (
int i=0; i<m_max_users; i++)
628 m_primary_bytes_shares[i] = m_last_round_allocation;
629 m_primary_ops_shares[i] = ops_shares;
// Read and reset the count of limit hits for the interval.
635 int limit_hit = m_loadshed_limit_hit.exchange(0);
636 TRACE(
DEBUG,
"Throttle limit hit " << limit_hit <<
" times during last interval.");
// The m_stable_* snapshot is updated while holding m_compute_var.
639 m_compute_var.Lock();
640 m_stable_io_active = m_io_active.load(std::memory_order_acquire);
641 auto io_active = m_stable_io_active;
642 m_stable_io_total = m_io_total;
643 auto io_total = m_stable_io_total;
// Drain the accumulated active time into the running m_stable_io_wait total.
644 auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
645 m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
647 m_compute_var.UnLock();
649 auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
650 TRACE(IOLOAD,
"Current IO counter is " << io_active <<
"; total IO active time is " << io_wait_ms <<
"ms.");
// Format the record into a 128-byte buffer; io_wait is reported in seconds.
654 auto len = snprintf(buf, 128,
655 R
"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
656 static_cast<double>(io_wait_ms) / 1000.0, io_active,
static_cast<long long unsigned>(io_total));
// A truncated record (len >= 128) is treated as a failed insertion; len + 1
// includes the terminating NUL.
657 auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) :
false;
660 TRACE(IOLOAD,
"Failed g-stream insertion of throttle_update record (len=" << len <<
"): " << buf);
// Wake every thread waiting on m_compute_var.
663 m_compute_var.Broadcast();
// Maps a username to a slot index by hashing it with std::hash and reducing
// modulo m_max_users. Distinct usernames can therefore share a slot. The
// return statement is on a line missing from this listing.
670XrdThrottleManager::GetUid(
const std::string &username)
672 std::hash<std::string> hash_fn;
673 auto hash = hash_fn(username);
// NOTE(review): the cast to uint16_t is lossless only while
// m_max_users <= 65536 — confirm the configured bound.
674 auto uid =
static_cast<uint16_t
>(hash % m_max_users);
675 TRACE(
DEBUG,
"Mapping user " << username <<
" to UID " << uid);
// Scans the active wake order, at most m_max_users entries, and notifies a
// user that has waiters. Lines 691-694 (between reading `uid` and using it as
// an index) and everything after the notify call are missing from this listing.
683XrdThrottleManager::NotifyOne()
// Read from whichever buffer is currently marked active.
685 auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;
687 for (
size_t idx = 0; idx < m_max_users; ++idx)
// Shared cursor: each probe advances the position for all callers.
689 auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
// NOTE(review): ComputeWaiterOrder writes -1 into unused slots; a negative uid
// must be skipped before the index below — presumably done on the lines not
// visible here; confirm.
690 int16_t uid = wake_order[offset % m_max_users];
695 auto &waiter_info = m_waiter_info[uid];
696 std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
697 if (waiter_info.m_waiting) {
// The held lock is moved into the waiter's NotifyOne.
698 waiter_info.NotifyOne(std::move(lock));
710 int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
713 while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
718 if (m_waiter_info[uid].m_concurrency < 1)
722 m_loadshed_limit_hit++;
723 m_io_active.fetch_sub(1, std::memory_order_acq_rel);
724 TRACE(
DEBUG,
"ThrottleManager (user=" << uid <<
"): IO concurrency limit hit; waiting for other IOs to finish.");
725 ok = m_waiter_info[uid].Wait();
727 TRACE(
DEBUG,
"ThrottleManager (user=" << uid <<
"): timed out waiting for other IOs to finish.");
730 cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
743 m_io_active_time += event_duration.count();
744 auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
745 m_waiter_info[uid].m_io_time += event_duration.count();
746 if (old_active ==
static_cast<unsigned>(m_concurrency_limit))
751 unsigned waiting_users = m_waiting_users;
752 if (waiting_users == 0) waiting_users = 1;
753 if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
755 std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
756 if (m_waiter_info[uid].m_waiting > 0)
758 m_waiter_info[uid].NotifyOne(std::move(lock));
776 if (m_loadshed_port == 0)
780 if (m_loadshed_limit_hit == 0)
784 if (
static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
798 if (m_loadshed_port == 0)
802 if (opaque && opaque[0])
806 if (env.
Get(
"throttle.shed") != 0)
811 lsOpaque +=
"&throttle.shed=1";
815 lsOpaque =
"throttle.shed=1";
822 host = m_loadshed_host;
825 port = m_loadshed_port;
// Blocks the caller on this waiter's condition variable until the manager's
// active-I/O count drops below the concurrency limit or the deadline
// (now + m_max_wait_time) passes. The code following the final `if`, including
// the return value, is not visible in this listing.
829XrdThrottleManager::Waiter::Wait()
831 auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
833 std::unique_lock<std::mutex> lock(m_mutex);
// NOTE(review): the limit is cast to unsigned, so a negative limit (the
// constructor's initializer list sets -1) becomes a very large value and the
// first half of the predicate is true — confirm callers only wait when a limit is set.
835 m_cv.wait_until(lock, timeout,
836 [&] {
return m_manager->m_io_active.load(std::memory_order_acquire) <
static_cast<unsigned>(m_manager->m_concurrency_limit) || std::chrono::steady_clock::now() >= timeout; });
// Distinguishes a deadline expiry from a normal wake-up.
839 if (std::chrono::steady_clock::now() > timeout) {
#define AtomicFSub(w, x, y)
char * Get(const char *varname)
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
XrdSecEntityAttr * eaAPI
non-const API to attributes
char * name
Entity's name.
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetMaxWait() const
long long GetMaxConn() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetMaxOpen() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const
int GetTraceLevels() const