unsigned long ra_max_read_ahead_whole_pages;
struct workqueue_struct *ll_readahead_wq;
/*
- * Max number of active works for readahead workqueue,
- * default is 0 which make workqueue init number itself,
- * unless there is a specific need for throttling the
- * number of active work items, specifying '0' is recommended.
+ * Max number of active works that can be triggered
+ * for async readahead.
*/
unsigned int ra_async_max_active;
+ /* how many async readahead requests are in flight */
+ atomic_t ra_async_inflight;
/* Threshold to control when to trigger async readahead */
unsigned long ra_async_pages_per_file_threshold;
};
#define log2(n) ffz(~(n))
#endif
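/*
 * Illustrative sketch (not part of the patch): how the two new fields are
 * intended to interact. ll_queue_ra_work_sketch() is a hypothetical helper
 * used only for illustration; the real queueing and completion paths are in
 * the readahead hunks further below.
 */
static void ll_queue_ra_work_sketch(struct ll_sb_info *sbi,
                                    struct ll_readahead_work *lrw)
{
        struct ll_ra_info *ra = &sbi->ll_ra_info;

        /* throttle: do not queue more async readahead works than allowed */
        if (atomic_read(&ra->ra_async_inflight) > ra->ra_async_max_active)
                return;

        atomic_inc(&ra->ra_async_inflight);
        queue_work(ra->ll_readahead_wq, &lrw->lrw_readahead_work);
        /* the work handler decrements ra_async_inflight when it finishes */
}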
+/**
+ * If there is only one core visible to Lustre, async readahead
+ * will be disabled. To avoid massive oversubscription, we use 1/2
+ * of the active cores as the default max number of async readahead
+ * requests.
+ */
+static inline unsigned int ll_get_ra_async_max_active(void)
+{
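+	/*
+	 * cfs_cpt_weight() with CFS_CPT_ANY returns the number of online
+	 * CPUs in the CPU partition table; half of that is the default cap.
+	 */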
+ return cfs_cpt_weight(cfs_cpt_tab, CFS_CPT_ANY) >> 1;
+}
+
static struct ll_sb_info *ll_init_sbi(void)
{
struct ll_sb_info *sbi = NULL;
sbi->ll_ra_info.ra_max_pages_per_file;
sbi->ll_ra_info.ra_max_pages = sbi->ll_ra_info.ra_max_pages_per_file;
sbi->ll_ra_info.ra_max_read_ahead_whole_pages = -1;
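+ /* default: half the visible cores; tunable via max_read_ahead_async_active */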
+ sbi->ll_ra_info.ra_async_max_active = ll_get_ra_async_max_active();
+ atomic_set(&sbi->ll_ra_info.ra_async_inflight, 0);
sbi->ll_flags |= LL_SBI_VERBOSE;
#ifdef ENABLE_CHECKSUM
if (rc)
return rc;
- if (val < 1 || val > WQ_UNBOUND_MAX_ACTIVE) {
- CERROR("%s: cannot set max_read_ahead_async_active=%u %s than %u\n",
- sbi->ll_fsname, val,
- val < 1 ? "smaller" : "larger",
- val < 1 ? 1 : WQ_UNBOUND_MAX_ACTIVE);
+ /**
+ * It doesn't make sense to allow the value to exceed what
+ * the workqueue can actually support.
+ */
+ if (val > WQ_UNBOUND_MAX_ACTIVE) {
+ CERROR("%s: cannot set max_read_ahead_async_active=%u larger than %u\n",
+ sbi->ll_fsname, val, WQ_UNBOUND_MAX_ACTIVE);
return -ERANGE;
}
spin_lock(&sbi->ll_lock);
sbi->ll_ra_info.ra_async_max_active = val;
spin_unlock(&sbi->ll_lock);
- workqueue_set_max_active(sbi->ll_ra_info.ll_readahead_wq, val);
return count;
}
__u64 kms;
int rc;
pgoff_t eof_index;
+ struct ll_sb_info *sbi;
work = container_of(wq, struct ll_readahead_work,
lrw_readahead_work);
ras = &fd->fd_ras;
file = work->lrw_file;
inode = file_inode(file);
+ sbi = ll_i2sbi(inode);
env = cl_env_alloc(&refcheck, LCT_NOREF);
if (IS_ERR(env))
ria->ria_end_idx = work->lrw_end_idx;
pages = ria->ria_end_idx - ria->ria_start_idx + 1;
- ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria,
+ ria->ria_reserved = ll_ra_count_get(sbi, ria,
ria_page_count(ria), pages_min);
CDEBUG(D_READA,
out_free_work:
if (ra_end_idx > 0)
ll_ra_stats_inc_sbi(ll_i2sbi(inode), RA_STAT_ASYNC);
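+ /* release the in-flight slot taken when this work was queued */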
+ atomic_dec(&sbi->ll_ra_info.ra_async_inflight);
ll_readahead_work_free(work);
}
* we do async readahead, allowing the user thread to do fast i/o.
*/
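+ /* also skip async readahead if too many async requests are already in flight */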
if (stride_io_mode(ras) || !throttle ||
- ras->ras_window_pages < throttle)
+ ras->ras_window_pages < throttle ||
+ atomic_read(&ra->ra_async_inflight) > ra->ra_async_max_active)
return 0;
if ((atomic_read(&ra->ra_cur_pages) + pages) > ra->ra_max_pages)
/* ll_readahead_work_free() free it */
OBD_ALLOC_PTR(lrw);
if (lrw) {
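+ /* count this work against the async readahead in-flight limit */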
+ atomic_inc(&sbi->ll_ra_info.ra_async_inflight);
lrw->lrw_file = get_file(file);
lrw->lrw_start_idx = start_idx;
lrw->lrw_end_idx = end_idx;
llite.*.max_read_ahead_async_active 2>/dev/null)
[ $max_active -ne 256 ] && error "expected 256 but got $max_active"
- # currently reset to 0 is unsupported, leave it 512 for now.
- $LCTL set_param llite.*.max_read_ahead_async_active=0 &&
- error "set max_read_ahead_async_active should fail"
+ $LCTL set_param llite.*.max_read_ahead_async_active=0 ||
+ error "set max_read_ahead_async_active should succeed"
$LCTL set_param llite.*.max_read_ahead_async_active=512
max_active=$($LCTL get_param -n \