/* We always reserve enough space in the reply packet for a stripe MD, because
* we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp,
- int lock_type,
- struct lookup_intent *it,
- int lock_mode,
- struct mdc_op_data *data,
- struct lustre_handle *lockh,
- void *lmm,
- int lmmsize,
- ldlm_completion_callback cb_completion,
- ldlm_blocking_callback cb_blocking,
- void *cb_data, int extra_lock_flags)
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct mdc_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize,
+ int extra_lock_flags)
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_exp2obd(exp);
struct ldlm_res_id res_id =
- { .name = {data->fid1.id, data->fid1.generation} };
+ { .name = {op_data->fid1.id, op_data->fid1.generation} };
ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
struct ldlm_request *lockreq;
struct ldlm_intent *lit;
struct ldlm_reply *lockrep;
int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*lockreq),
- [DLM_INTENT_IT_OFF] = sizeof(*lit) };
+ [DLM_INTENT_IT_OFF] = sizeof(*lit),
+ 0, 0, 0, 0 };
int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
[DLM_REPLY_REC_OFF] = sizeof(struct mds_body),
[DLM_REPLY_REC_OFF+1] = obddev->u.cli.
- cl_max_mds_easize };
+ cl_max_mds_easize, 0 };
int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
int repbufcnt = 4, rc;
void *eadata;
ENTRY;
- LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
+ LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
// ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
if (it->it_op & IT_OPEN) {
+ CFS_LIST_HEAD(cancels);
+ int count = 0;
+ int mode;
+
it->it_create_mode |= S_IFREG;
size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
- size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
+ size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
/* As an optimization, we allocate an RPC request buffer for
* at least a default-sized LOV EA even if we aren't sending
* one. We grow the whole request to the next power-of-two
min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
obddev->u.cli.cl_max_mds_easize);
- if (it->it_flags & O_JOIN_FILE) {
- __u64 head_size = *(__u32*)cb_data;
- __u32 tsize = *(__u32*)lmm;
+ /* If inode is known, cancel conflicting OPEN locks. */
+ if (op_data->fid2.id) {
+ if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+ mode = LCK_CW;
+#ifdef FMODE_EXEC
+ else if (it->it_flags & FMODE_EXEC)
+ mode = LCK_PR;
+#endif
+ else
+ mode = LCK_CR;
+ count = mdc_resource_get_unused(exp, &op_data->fid2,
+ &cancels, mode,
+ MDS_INODELOCK_OPEN);
+ }
+ /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+ if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
+ mode = LCK_EX;
+ else
+ mode = LCK_CR;
+ count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
+ mode, MDS_INODELOCK_UPDATE);
+ if (it->it_flags & O_JOIN_FILE) {
/* join is like an unlink of the tail */
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
size[DLM_INTENT_REC_OFF + 3] =
sizeof(struct mds_rec_join);
- req = ptlrpc_prep_req(class_exp2cliimp(exp),
- LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
- 7, size, NULL);
- /* when joining file, cb_data and lmm args together
- * indicate the head file size*/
- mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data,
- (head_size << 32) | tsize);
- cb_data = NULL;
- lmm = NULL;
+ req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
+ count);
+ mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
+ (*(__u64 *)op_data->data));
} else {
- req = ptlrpc_prep_req(class_exp2cliimp(exp),
- LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
- 6, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
+ count);
}
if (!req)
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode,
- 0, it->it_flags, lmm, lmmsize);
+ mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
+ it->it_create_mode, 0, it->it_flags,
+ lmm, lmmsize);
repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
} else if (it->it_op & IT_UNLINK) {
size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
- size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
+ size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 5, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
if (!req)
RETURN(-ENOMEM);
lit->opc = (__u64)it->it_op;
/* pack the intended request */
- mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
+ mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
OBD_MD_FLDIREA;
size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
- size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
+ size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
if (it->it_op & IT_GETATTR)
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 5, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
if (!req)
RETURN(-ENOMEM);
/* pack the intended request */
mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
- it->it_flags, data);
+ it->it_flags, op_data);
repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
} else if (it->it_op == IT_READDIR) {
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 2, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
if (!req)
RETURN(-ENOMEM);
* rpcs in flight counter */
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
mdc_enter_request(&obddev->u.cli);
- rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy,
- lock_mode, &flags, cb_blocking, cb_completion,
- NULL, cb_data, NULL, 0, NULL, lockh, 0);
+ rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+ 0, NULL, lockh, 0);
mdc_exit_request(&obddev->u.cli);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* This can go when we're sure that this can never happen */
LASSERT(rc != -ENOENT);
if (rc == ELDLM_LOCK_ABORTED) {
- lock_mode = 0;
+ einfo->ei_mode = 0;
memset(lockh, 0, sizeof(*lockh));
rc = 0;
} else if (rc != 0) {
/* If the server gave us back a different lock mode, we should
* fix up our variables. */
- if (lock->l_req_mode != lock_mode) {
+ if (lock->l_req_mode != einfo->ei_mode) {
ldlm_lock_addref(lockh, lock->l_req_mode);
- ldlm_lock_decref(lockh, lock_mode);
- lock_mode = lock->l_req_mode;
+ ldlm_lock_decref(lockh, einfo->ei_mode);
+ einfo->ei_mode = lock->l_req_mode;
}
LDLM_LOCK_PUT(lock);
}
it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
- it->d.lustre.it_lock_mode = lock_mode;
+ it->d.lustre.it_lock_mode = einfo->ei_mode;
it->d.lustre.it_data = req;
if (it->d.lustre.it_status < 0 && req->rq_replay)
* this and use the request from revalidate. In this case, revalidate
* never dropped its reference, so the refcounts are all OK */
if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+ struct ldlm_enqueue_info einfo =
+ { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+ ldlm_completion_ast, NULL, NULL };
- rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
- op_data, &lockh, lmm, lmmsize,
- ldlm_completion_ast, cb_blocking, NULL,
- extra_lock_flags);
+ rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+ lmm, lmmsize, extra_lock_flags);
if (rc < 0)
RETURN(rc);
memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
!it_open_error(DISP_OPEN_OPEN, it)) {
it_set_disposition(it, DISP_ENQ_OPEN_REF);
ptlrpc_request_addref(request); /* balanced in ll_file_open */
+ /* BUG 11546 - eviction in the middle of open rpc processing */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
}
if (it->it_op & IT_CREAT) {