void *ei_namespace; /** lock namespace **/
u64 ei_inodebits; /** lock inode bits **/
unsigned int ei_enq_slave:1; /** whether enqueue slave stripes */
- unsigned int ei_enq_slot:1; /** whether acquire rpc slot */
+ unsigned int ei_req_slot:1; /** whether acquire rpc slot */
+ unsigned int ei_mod_slot:1; /** whether acquire mod rpc slot */
};
#define ei_res_id ei_cb_gl
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
struct ldlm_enqueue_info *einfo, __u8 with_policy,
__u64 *flags, void *lvb, __u32 lvb_len,
- const struct lustre_handle *lockh, int rc);
+ const struct lustre_handle *lockh, int rc,
+ bool request_slot);
int ldlm_cli_enqueue_local(const struct lu_env *env,
struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
CLI_API32 = BIT(3),
CLI_MIGRATE = BIT(4),
CLI_DIRTY_DATA = BIT(5),
+ CLI_NO_SLOT = BIT(6),
};
enum md_op_code {
/* exclude EXTENT locks and DOM-only IBITS locks because they
* are asynchronous and don't wait on server being blocked.
*/
- return einfo->ei_type == LDLM_FLOCK ||
- (einfo->ei_type == LDLM_IBITS &&
- einfo->ei_inodebits != MDS_INODELOCK_DOM);
+ return einfo->ei_req_slot &&
+ (einfo->ei_type == LDLM_FLOCK ||
+ (einfo->ei_type == LDLM_IBITS &&
+ einfo->ei_inodebits != MDS_INODELOCK_DOM));
}
/**
struct ldlm_enqueue_info *einfo,
__u8 with_policy, __u64 *ldlm_flags, void *lvb,
__u32 lvb_len, const struct lustre_handle *lockh,
- int rc)
+ int rc, bool request_slot)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
const struct lu_env *env = NULL;
ENTRY;
- if (ldlm_request_slot_needed(einfo))
+ if (request_slot)
obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
ptlrpc_put_mod_rpc_slot(req);
int is_replay = *flags & LDLM_FL_REPLAY;
int req_passed_in = 1;
int rc, err;
+ bool need_req_slot;
struct ptlrpc_request *req;
ENTRY;
* that threads that are waiting for a modify RPC slot are not polluting
* our rpcs in flight counter. */
- if (einfo->ei_enq_slot)
+ if (einfo->ei_mod_slot)
ptlrpc_get_mod_rpc_slot(req);
- if (ldlm_request_slot_needed(einfo)) {
+ need_req_slot = ldlm_request_slot_needed(einfo);
+
+ if (need_req_slot) {
rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
if (rc) {
- if (einfo->ei_enq_slot)
+ if (einfo->ei_mod_slot)
ptlrpc_put_mod_rpc_slot(req);
failed_lock_cleanup(ns, lock, einfo->ei_mode);
LDLM_LOCK_RELEASE(lock);
rc = ptlrpc_queue_wait(req);
err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
- lvb, lvb_len, lockh, rc);
+ lvb, lvb_len, lockh, rc, need_req_slot);
/*
* If ldlm_cli_enqueue_fini did not find the lock, we need to free
einfo->ei_cb_bl = ldlm_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_res_id = resid;
+ einfo->ei_req_slot = 1;
rc = dt_object_lock(env, obj, lh, einfo, policy);
/* for regular checks LFSCK doesn't use LDLM locking,
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_cb_gl = NULL;
einfo->ei_cbdata = NULL;
+ einfo->ei_req_slot = 1;
return minfo;
}
}
op_data->op_bias = MDS_CROSS_REF;
+ op_data->op_cli_flags = CLI_NO_SLOT;
CDEBUG(D_INODE, "REMOTE_INTENT with fid="DFID" -> mds #%u\n",
PFID(&body->mbo_fid1), tgt->ltd_index);
* it's remote object.
*/
op_data->op_bias = MDS_CROSS_REF;
+ op_data->op_cli_flags = CLI_NO_SLOT;
tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
if (!tgt)
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
einfo->ei_cbdata = osc; /* value to be put into ->l_ast_data */
+ einfo->ei_req_slot = 1;
}
static void mdc_lock_lvb_update(const struct lu_env *env,
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
aa->oa_lvb, aa->oa_lvb ?
- sizeof(*aa->oa_lvb) : 0, lockh, rc);
+ sizeof(*aa->oa_lvb) : 0, lockh, rc, true);
/* Complete mdc stuff. */
rc = mdc_enqueue_fini(aa->oa_exp, req, aa->oa_upcall, aa->oa_cookie,
lockh, mode, aa->oa_flags, rc);
req->rq_sent = ktime_get_real_seconds() + resends;
}
- einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
+ einfo->ei_req_slot = !(op_data->op_cli_flags & CLI_NO_SLOT);
+ einfo->ei_mod_slot = !mdc_skip_mod_rpc_slot(it);
/* With Data-on-MDT the glimpse callback is needed too.
* It is set here in advance but not in mdc_finish_enqueue()
rc = -ETIMEDOUT;
rc = ldlm_cli_enqueue_fini(exp, req, einfo, 1, &flags, NULL, 0,
- lockh, rc);
+ lockh, rc, true);
if (rc < 0) {
CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
exp->exp_obd->obd_name, rc);
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 0;
einfo->ei_res_id = res_id;
+ einfo->ei_req_slot = 1;
if (cache) {
/*
einfo->ei_enq_slave = 1;
einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
einfo->ei_inodebits = ibits;
+ einfo->ei_req_slot = 1;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
- lvb, lvb_len, lockh, rc);
+ lvb, lvb_len, lockh, rc, false);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
aa->oa_flags, aa->oa_speculative, rc);
rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, &einfo, 0, &flags,
aa->aa_lvb, sizeof(*(aa->aa_lvb)),
- lockh, rc);
+ lockh, rc, false);
if (rc < 0) {
/* the lock has been destroyed, forget about the lock handle */
memset(lockh, 0, sizeof(*lockh));