Shrink the per-client LRU list from a dedicated ptlrpcd work item (cl_lru_work); this way we can drop LRU pages asynchronously instead of on the caller's stack.
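
The pattern is the same one already used for cl_writeback_work: allocate
a ptlrpcd work item once at client setup, queue it from hot paths instead
of shrinking synchronously, and destroy it at cleanup. Below is a minimal
sketch of that life cycle, assuming the ptlrpcd work API declared in
lustre_net.h (ptlrpcd_alloc_work, ptlrpcd_queue_work,
ptlrpcd_destroy_work); the my_lru_* helpers are illustrative stand-ins
for the code this patch adds, with error handling trimmed:

	/* callback: runs later in ptlrpcd context with a filled-in env */
	static int my_lru_cb(const struct lu_env *env, void *data)
	{
		struct client_obd *cli = data;

		if (osc_cache_too_much(cli))
			osc_lru_shrink(env, cli, lru_shrink_max, true);
		return 0;
	}

	/* setup: bind the callback to the import's ptlrpcd */
	static int my_lru_work_init(struct client_obd *cli)
	{
		void *work = ptlrpcd_alloc_work(cli->cl_import, my_lru_cb, cli);

		if (IS_ERR(work))
			return PTR_ERR(work);
		cli->cl_lru_work = work;
		return 0;
	}

	/* hot path: cheap, non-blocking kick; the shrink happens later */
	static void my_lru_work_kick(struct client_obd *cli)
	{
		if (osc_cache_too_much(cli))
			(void)ptlrpcd_queue_work(cli->cl_lru_work);
	}

	/* cleanup */
	static void my_lru_work_fini(struct client_obd *cli)
	{
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}

The kick is what osc_lru_add_batch() and the page-free path below switch
to: the caller only queues the work and returns, and the actual page
discard happens later on a ptlrpcd thread that owns a proper lu_env.
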
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: Id67c41b0f56201183c07bbbc0369ceb32fe3bbb3
Reviewed-on: http://review.whamcloud.com/7891
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Bobi Jam <bobijam@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
cfs_atomic_t cl_resends; /* resend count */
- /* ptlrpc work for writeback in ptlrpcd context */
- void *cl_writeback_work;
+ /* ptlrpc work for writeback in ptlrpcd context */
+ void *cl_writeback_work;
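+ /* ptlrpc work for lru shrink in ptlrpcd context */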
+ void *cl_lru_work;
/* hash tables for osc_quota_info */
- cfs_hash_t *cl_quota_hash[MAXQUOTAS];
+ cfs_hash_t *cl_quota_hash[MAXQUOTAS];
};
#define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
struct super_block *sb = data;
struct ll_sb_info *sbi = ll_s2sbi(sb);
struct cl_client_cache *cache = &sbi->ll_cache;
+ struct lu_env *env;
+ int refcheck;
int mult, rc, pages_number;
int diff = 0;
int nrpages = 0;
GOTO(out, rc = 0);
}
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
diff = -diff;
while (diff > 0) {
int tmp;
/* difficult - have to ask OSCs to drop LRU slots. */
tmp = diff << 1;
- rc = obd_set_info_async(NULL, sbi->ll_dt_exp,
+ rc = obd_set_info_async(env, sbi->ll_dt_exp,
sizeof(KEY_CACHE_LRU_SHRINK),
KEY_CACHE_LRU_SHRINK,
sizeof(tmp), &tmp, NULL);
if (rc < 0)
break;
}
+ cl_env_put(env, &refcheck);
out:
if (rc >= 0) {
return -ERANGE;
rc = cfs_atomic_read(&cli->cl_lru_in_list) - pages_number;
- if (rc > 0)
- (void)osc_lru_shrink(cli, rc, true);
+ if (rc > 0) {
+ struct lu_env *env;
+ int refcheck;
+
+ env = cl_env_get(&refcheck);
+ if (!IS_ERR(env)) {
+ (void)osc_lru_shrink(env, cli, rc, true);
+ cl_env_put(env, &refcheck);
+ }
+ }
return count;
}
pgoff_t start, pgoff_t end);
void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
struct osc_object *osc, pdl_policy_t pol);
+int lru_queue_work(const struct lu_env *env, void *data);
void osc_object_set_contended (struct osc_object *obj);
void osc_object_clear_contended(struct osc_object *obj);
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
cfs_list_t *ext_list, int cmd, pdl_policy_t p);
-int osc_lru_shrink(struct client_obd *cli, int target, bool force);
+int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
+ int target, bool force);
int osc_lru_reclaim(struct client_obd *cli);
extern spinlock_t osc_ast_guard;
return 0;
}
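+/* ptlrpcd work callback: shrink this client's LRU once it has grown
+ * past its limit, so pages are dropped asynchronously in ptlrpcd
+ * context rather than on the caller's stack. */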
+int lru_queue_work(const struct lu_env *env, void *data)
+{
+ struct client_obd *cli = data;
+
+ CDEBUG(D_CACHE, "Run LRU work for client obd %p.\n", cli);
+
+ if (osc_cache_too_much(cli))
+ osc_lru_shrink(env, cli, lru_shrink_max, true);
+
+ RETURN(0);
+}
+
void osc_lru_add_batch(struct client_obd *cli, cfs_list_t *plist)
{
CFS_LIST_HEAD(lru);
client_obd_list_unlock(&cli->cl_lru_list_lock);
/* XXX: May set force to be true for better performance */
- osc_lru_shrink(cli, osc_cache_too_much(cli), false);
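+ /* the real shrink is deferred to the cl_lru_work item on ptlrpcd */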
+ if (osc_cache_too_much(cli))
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
}
}
* this osc occupies too many LRU pages and the kernel is
* stealing one of them. */
if (!memory_pressure_get())
- osc_lru_shrink(cli, osc_cache_too_much(cli), false);
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
wake_up(&osc_lru_waitq);
} else {
LASSERT(cfs_list_empty(&opg->ops_lru));
/**
* Drop at most @target pages from the LRU.
*/
-int osc_lru_shrink(struct client_obd *cli, int target, bool force)
+int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
+ int target, bool force)
{
- struct cl_env_nest nest;
- struct lu_env *env;
struct cl_io *io;
struct cl_object *clobj = NULL;
struct cl_page **pvec;
cfs_atomic_inc(&cli->cl_lru_shrinkers);
}
- env = cl_env_nested_get(&nest);
- if (IS_ERR(env))
- GOTO(out, rc = PTR_ERR(env));
-
pvec = osc_env_info(env)->oti_pvec;
io = &osc_env_info(env)->oti_io;
cl_io_fini(env, io);
cl_object_put(env, clobj);
}
- cl_env_nested_put(&nest, env);
-out:
cfs_atomic_dec(&cli->cl_lru_shrinkers);
if (count > 0) {
cfs_atomic_add(count, cli->cl_lru_left);
int osc_lru_reclaim(struct client_obd *cli)
{
+ struct cl_env_nest nest;
+ struct lu_env *env;
struct cl_client_cache *cache = cli->cl_cache;
int max_scans;
int rc = 0;
+ ENTRY;
LASSERT(cache != NULL);
LASSERT(!cfs_list_empty(&cache->ccc_lru));
- rc = osc_lru_shrink(cli, osc_cache_too_much(cli), false);
+ env = cl_env_nested_get(&nest);
+ if (IS_ERR(env))
+ RETURN(rc);
+
+ rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli), false);
if (rc != 0) {
if (rc == -EBUSY)
rc = 0;
CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
cli->cl_import->imp_obd->obd_name, rc, cli);
- return rc;
+ GOTO(out, rc);
}
CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n",
if (osc_cache_too_much(cli) > 0) {
spin_unlock(&cache->ccc_lru_lock);
- rc = osc_lru_shrink(cli, osc_cache_too_much(cli), true);
+ rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli),
+ true);
spin_lock(&cache->ccc_lru_lock);
if (rc != 0)
break;
}
spin_unlock(&cache->ccc_lru_lock);
+out:
+ cl_env_nested_put(&nest, env);
CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n",
cli->cl_import->imp_obd->obd_name, cli, rc);
return rc;
int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
int target = *(int *)val;
- nr = osc_lru_shrink(cli, min(nr, target), true);
+ nr = osc_lru_shrink(env, cli, min(nr, target), true);
*(int *)val -= nr;
RETURN(0);
}
GOTO(out_client_setup, rc = PTR_ERR(handler));
cli->cl_writeback_work = handler;
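+ /* likewise allocate a work item so the LRU can be shrunk asynchronously */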
+ handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
+ if (IS_ERR(handler))
+ GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
+ cli->cl_lru_work = handler;
+
rc = osc_quota_setup(obd);
if (rc)
GOTO(out_ptlrpcd_work, rc);
RETURN(rc);
out_ptlrpcd_work:
- ptlrpcd_destroy_work(handler);
+ if (cli->cl_writeback_work != NULL) {
+ ptlrpcd_destroy_work(cli->cl_writeback_work);
+ cli->cl_writeback_work = NULL;
+ }
+ if (cli->cl_lru_work != NULL) {
+ ptlrpcd_destroy_work(cli->cl_lru_work);
+ cli->cl_lru_work = NULL;
+ }
out_client_setup:
client_obd_cleanup(obd);
out_ptlrpcd:
ptlrpcd_destroy_work(cli->cl_writeback_work);
cli->cl_writeback_work = NULL;
}
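+ /* release the LRU work item alongside the writeback one above */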
+ if (cli->cl_lru_work) {
+ ptlrpcd_destroy_work(cli->cl_lru_work);
+ cli->cl_lru_work = NULL;
+ }
obd_cleanup_client_import(obd);
ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
*/
static int ptlrpcd(void *arg)
{
- struct ptlrpcd_ctl *pc = arg;
- struct ptlrpc_request_set *set = pc->pc_set;
- struct lu_env env = { .le_ses = NULL };
- int rc, exit = 0;
- ENTRY;
+ struct ptlrpcd_ctl *pc = arg;
+ struct ptlrpc_request_set *set = pc->pc_set;
+ struct lu_context ses = { 0 };
+ struct lu_env env = { .le_ses = &ses };
+ int rc, exit = 0;
+ ENTRY;
unshare_fs_struct();
#if defined(CONFIG_SMP)
*/
rc = lu_context_init(&env.le_ctx,
LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
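+ /* also initialize a session context: work callbacks run from ptlrpcd
+ * (such as the LRU shrink work) issue cl_io, which needs env->le_ses */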
+ if (rc == 0) {
+ rc = lu_context_init(env.le_ses,
+ LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
+ if (rc != 0)
+ lu_context_fini(&env.le_ctx);
+ }
complete(&pc->pc_starting);
if (rc != 0)
lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
ptlrpc_expired_set, set);
- lu_context_enter(&env.le_ctx);
- l_wait_event(set->set_waitq,
- ptlrpcd_check(&env, pc), &lwi);
- lu_context_exit(&env.le_ctx);
+ lu_context_enter(&env.le_ctx);
+ lu_context_enter(env.le_ses);
+ l_wait_event(set->set_waitq, ptlrpcd_check(&env, pc), &lwi);
+ lu_context_exit(&env.le_ctx);
+ lu_context_exit(env.le_ses);
- /*
- * Abort inflight rpcs for forced stop case.
- */
+ /*
+ * Abort inflight rpcs for forced stop case.
+ */
if (test_bit(LIOD_STOP, &pc->pc_flags)) {
if (test_bit(LIOD_FORCE, &pc->pc_flags))
ptlrpc_abort_set(set);
*/
if (!cfs_list_empty(&set->set_requests))
ptlrpc_set_wait(set);
- lu_context_fini(&env.le_ctx);
+ lu_context_fini(&env.le_ctx);
+ lu_context_fini(env.le_ses);
complete(&pc->pc_finishing);
- return 0;
+ return 0;
}
/* XXX: We want multiple CPU cores to share the async RPC load. So we start many