void *ei_cb_cp; /* lock completion callback */
void *ei_cb_gl; /* lock glimpse callback */
void *ei_cbdata; /* Data to be passed into callbacks. */
+ short ei_async:1; /* async request */
};
extern struct obd_ops ldlm_obd_ops;
struct md_open_data *);
int (*m_enqueue)(struct obd_export *, struct ldlm_enqueue_info *,
struct lookup_intent *, struct md_op_data *,
- struct lustre_handle *, void *, int, int);
+ struct lustre_handle *, void *, int,
+ struct ptlrpc_request **, int);
int (*m_getattr)(struct obd_export *, const struct lu_fid *,
struct obd_capa *, obd_valid, int,
struct ptlrpc_request **);
struct md_op_data *op_data,
struct lustre_handle *lockh,
void *lmm, int lmmsize,
+ struct ptlrpc_request **req,
int extra_lock_flags)
{
int rc;
EXP_CHECK_MD_OP(exp, enqueue);
EXP_MD_COUNTER_INCREMENT(exp, enqueue);
rc = MDP(exp->exp_obd, enqueue)(exp, einfo, it, op_data, lockh,
- lmm, lmmsize, extra_lock_flags);
+ lmm, lmmsize, req, extra_lock_flags);
RETURN(rc);
}
if (flags == LDLM_FL_WAIT_NOREPROC) {
/* client side - set a flag to prevent sending a CANCEL */
lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
- ldlm_lock_decref_internal(lock, mode);
+
+ /* when reaching here, it is under lock_res_and_lock(). Thus,
+ need call the nolock version of ldlm_lock_decref_internal*/
+ ldlm_lock_decref_internal_nolock(lock, mode);
}
ldlm_lock_destroy_nolock(lock);
int local = ns_is_client(ns);
int added = (mode == LCK_NL);
int overlaps = 0;
+ int splitted = 0;
ENTRY;
CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
req->l_blocking_ast = ldlm_flock_blocking_ast;
}
+reprocess:
if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
/* This loop determines where this processes locks start
* in the resource lr_granted list. */
/* XXX - if ldlm_lock_new() can sleep we should
* release the ns_lock, allocate the new lock,
* and restart processing this lock. */
- new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
+ if (!new2) {
+ unlock_res_and_lock(req);
+ new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
lock->l_granted_mode, NULL, NULL, NULL,
NULL, 0);
- if (!new2) {
- ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
- *err = -ENOLCK;
- RETURN(LDLM_ITER_STOP);
+ lock_res_and_lock(req);
+ if (!new2) {
+ ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+ *err = -ENOLCK;
+ RETURN(LDLM_ITER_STOP);
+ }
+ goto reprocess;
}
+ splitted = 1;
+
new2->l_granted_mode = lock->l_granted_mode;
new2->l_policy_data.l_flock.pid =
new->l_policy_data.l_flock.pid;
&new2->l_export->exp_ldlm_data.led_held_locks);
spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
}
- if (*flags == LDLM_FL_WAIT_NOREPROC)
- ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+ if (*flags == LDLM_FL_WAIT_NOREPROC) {
+ ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
+ }
/* insert new2 at lock */
ldlm_resource_add_lock(res, ownlocks, new2);
break;
}
+ /* if new2 is created but never used, destroy it*/
+ if (splitted == 0 && new2 != NULL)
+ ldlm_lock_destroy_nolock(new2);
+
/* At this point we're granting the lock request. */
req->l_granted_mode = req->l_req_mode;
ldlm_reprocess_queue(res, &res->lr_waiting,
&rpc_list);
- unlock_res(res);
- rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
- lock_res(res);
+ unlock_res_and_lock(req);
+ rc = ldlm_run_ast_work(&rpc_list,
+ LDLM_WORK_BL_AST);
+ lock_res_and_lock(req);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
}
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
LDLM_DEBUG(lock, "client-side enqueue granted");
ns = lock->l_resource->lr_namespace;
- lock_res(lock->l_resource);
+ lock_res_and_lock(lock);
/* before flock's complete ast gets here, the flock
* can possibly be freed by another thread
if (flags == 0)
cfs_waitq_signal(&lock->l_waitq);
}
- unlock_res(lock->l_resource);
+ unlock_res_and_lock(lock);
RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
void *cookie, int *flags);
void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode);
void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *, __u32 mode);
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
struct list_head *work_list);
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
unlock_res_and_lock(lock);
}
-void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+/* only called in ldlm_flock_destroy and for local locks.
+ * * for LDLM_FLOCK type locks, l_blocking_ast is null, and
+ * * ldlm_lock_remove_from_lru() does nothing, it is safe
+ * * for ldlm_flock_destroy usage by dropping some code */
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
- struct ldlm_namespace *ns;
- ENTRY;
-
- lock_res_and_lock(lock);
-
- ns = lock->l_resource->lr_namespace;
-
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
LASSERT(lock->l_readers > 0);
lock->l_writers--;
}
+ LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
+}
+
+void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+{
+ struct ldlm_namespace *ns;
+ ENTRY;
+
+ lock_res_and_lock(lock);
+
+ ns = lock->l_resource->lr_namespace;
+
+ ldlm_lock_decref_internal_nolock(lock, mode);
+
if (lock->l_flags & LDLM_FL_LOCAL &&
!lock->l_readers && !lock->l_writers) {
/* If this is a local lock on a server namespace and this was
unlock_res_and_lock(lock);
}
- LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
-
EXIT;
}
LUSTRE_OPC_ANY);
rc = md_enqueue(sbi->ll_md_exp, &einfo, &it,
- &op_data, &lockh, NULL, 0,
+ &op_data, &lockh, NULL, 0, NULL,
LDLM_FL_CANCEL_ON_BLOCK);
request = (struct ptlrpc_request *)it.d.lustre.it_data;
if (request)
LUSTRE_OPC_ANY);
rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data,
- &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY);
+ &lockh, lum, lum_size, NULL, LDLM_FL_INTENT_ONLY);
if (rc)
GOTO(out, rc);
return (void *)op_data;
rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it,
- op_data, &lockh, NULL, 0, 0);
+ op_data, &lockh, NULL, 0, NULL, 0);
ll_finish_md_op_data(op_data);
RETURN(PTR_ERR(op_data));
rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
- op_data, &lockh, NULL, 0, 0);
+ op_data, &lockh, NULL, 0, NULL, 0);
ll_finish_md_op_data(op_data);
if (rc < 0)
{
struct inode *inode = file->f_dentry->d_inode;
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ldlm_res_id res_id =
- { .name = { fid_seq(ll_inode2fid(inode)),
- fid_oid(ll_inode2fid(inode)),
- fid_ver(ll_inode2fid(inode)),
- LDLM_FLOCK} };
- struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
- ldlm_flock_completion_ast, NULL, file_lock };
+ struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
+ .ei_cb_cp =ldlm_flock_completion_ast,
+ .ei_cbdata = file_lock };
+ struct md_op_data *op_data;
struct lustre_handle lockh = {0};
ldlm_policy_data_t flock;
int flags = 0;
LBUG();
}
+ op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY, NULL);
+ if (IS_ERR(op_data))
+ RETURN(PTR_ERR(op_data));
+
CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
"start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
- rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
- &flock, &flags, NULL, 0, NULL, &lockh, 0);
+ rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
+ op_data, &lockh, &flock, 0, NULL /* req */, flags);
+
+ ll_finish_md_op_data(op_data);
+
if ((file_lock->fl_flags & FL_FLOCK) &&
(rc == 0 || file_lock->fl_type == F_UNLCK))
ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
continue;
rc = md_enqueue(tgt_exp, einfo, it, op_data2,
- lockh + i, lmm, lmmsize, 0);
+ lockh + i, lmm, lmmsize, NULL, 0);
CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
rdata->op_bias = MDS_CROSS_REF;
rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh,
- lmm, lmmsize, extra_lock_flags);
+ lmm, lmmsize, NULL, extra_lock_flags);
OBD_FREE_PTR(rdata);
EXIT;
out:
lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lookup_intent *it, struct md_op_data *op_data,
struct lustre_handle *lockh, void *lmm, int lmmsize,
- int extra_lock_flags)
+ struct ptlrpc_request **req, int extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
if (rc)
RETURN(rc);
- if (op_data->op_mea1 && it->it_op == IT_UNLINK) {
+ if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) {
rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
lockh, lmm, lmmsize);
RETURN(rc);
PFID(&op_data->op_fid1));
rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh,
- lmm, lmmsize, extra_lock_flags);
+ lmm, lmmsize, req, extra_lock_flags);
- if (rc == 0 && it->it_op == IT_OPEN)
+ if (rc == 0 && it && it->it_op == IT_OPEN)
rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
lmm, lmmsize, extra_lock_flags);
RETURN(rc);
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lookup_intent *it, struct md_op_data *op_data,
struct lustre_handle *lockh, void *lmm, int lmmsize,
- int extra_lock_flags);
+ struct ptlrpc_request **req, int extra_lock_flags);
int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
struct list_head *cancels, ldlm_mode_t mode,
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lookup_intent *it, struct md_op_data *op_data,
struct lustre_handle *lockh, void *lmm, int lmmsize,
- int extra_lock_flags)
+ struct ptlrpc_request **reqp, int extra_lock_flags)
{
struct obd_device *obddev = class_exp2obd(exp);
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
struct req_capsule *pill;
- int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+ int flags = extra_lock_flags;
int rc;
struct ldlm_res_id res_id;
ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
ENTRY;
- LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type);
+ LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
+ einfo->ei_type);
fid_build_reg_res_name(&op_data->op_fid1, &res_id);
- if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+ if (it)
+ flags |= LDLM_FL_HAS_INTENT;
+ if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- if (it->it_op & IT_OPEN) {
+ if (reqp)
+ req = *reqp;
+
+ if (!it) {
+ /* The only way right now is FLOCK, in this case we hide flock
+ policy as lmm, but lmmsize is 0 */
+ LASSERT(lmm && lmmsize == 0);
+ LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
+ einfo->ei_type);
+ policy = *(ldlm_policy_data_t *)lmm;
+ res_id.name[3] = LDLM_FLOCK;
+ } else if (it->it_op & IT_OPEN) {
int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
op_data->op_data);
/* It is important to obtain rpc_lock first (if applicable), so that
* threads that are serialised with rpc_lock are not polluting our
- * rpcs in flight counter */
- mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
- mdc_enter_request(&obddev->u.cli);
+ * rpcs in flight counter. We do not do flock request limiting, though*/
+ if (it) {
+ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+ mdc_enter_request(&obddev->u.cli);
+ }
rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
0, NULL, lockh, 0);
- mdc_exit_request(&obddev->u.cli);
- mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+ if (reqp)
+ *reqp = req;
+
+ if (it) {
+ mdc_exit_request(&obddev->u.cli);
+ mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+ }
+ if (!it) {
+ /* For flock requests we immediatelly return without further
+ delay and let caller deal with the rest, since rest of
+ this function metadata processing makes no sense for flock
+ requests anyway */
+ RETURN(rc);
+ }
+
if (rc < 0) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
mdc_clear_replay_flag(req, rc);
}
}
rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
- lmm, lmmsize, extra_lock_flags);
+ lmm, lmmsize, NULL, extra_lock_flags);
if (rc < 0)
RETURN(rc);
it->d.lustre.it_lock_handle = lockh.cookie;