return 0;
}
+static inline void
+mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
+{
+ /* Don't hold error requests for replay.
+  *
+  * Step 1: if req->rq_replay is set, reset it to 0.  The flag is tested
+  * without the lock; only the store is done under req->rq_lock, which
+  * is taken with the IRQ-saving spinlock calls.
+  *
+  * Step 2: sanity check.  A non-zero rc combined with a non-zero
+  * req->rq_transno is logged through DEBUG_REQ at D_ERROR and then hits
+  * LBUG().  When rc is 0 the transno is not examined.
+  *
+  * NOTE(review): LBUG() is presumably a fatal assertion - confirm
+  * against its definition before relying on that here. */
+ if (req->rq_replay) {
+ unsigned long irqflags;
+ spin_lock_irqsave(&req->rq_lock, irqflags);
+ req->rq_replay = 0;
+ spin_unlock_irqrestore(&req->rq_lock, irqflags);
+ }
+ if (rc && req->rq_transno != 0) {
+ DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
+ LBUG();
+ }
+}
+
/* We always reserve enough space in the reply packet for a stripe MD, because
* we don't know in advance the file type. */
int mdc_enqueue(struct obd_export *exp,
ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
struct ldlm_intent *lit;
struct ldlm_request *lockreq;
- struct ldlm_reply *dlm_rep;
int reqsize[6] = {[MDS_REQ_SECDESC_OFF] = 0,
[MDS_REQ_INTENT_LOCKREQ_OFF] = sizeof(*lockreq),
[MDS_REQ_INTENT_IT_OFF] = sizeof(*lit)};
obddev->u.cli.cl_max_mds_easize};
int req_buffers = 3, reply_buffers = 0;
int rc, flags = LDLM_FL_HAS_INTENT;
+ struct ldlm_reply *dlm_rep = NULL;
void *eadata;
unsigned long irqflags;
ENTRY;
reqsize[req_buffers++] = sizeof(struct mds_rec_create);
reqsize[req_buffers++] = data->namelen + 1;
reqsize[req_buffers++] = obddev->u.cli.cl_max_mds_easize;
+
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
LDLM_ENQUEUE, req_buffers, reqsize, NULL);
if (!req)
/* pack the intended request */
mdc_open_pack(req->rq_reqmsg, MDS_REQ_INTENT_REC_OFF, data,
- it->it_create_mode, 0, it->it_flags,
- lmm, lmmsize);
+ it->it_create_mode, 0, it->it_flags, lmm, lmmsize);
/* get ready for the reply */
repsize[3] = 4;
repsize[4] = xattr_acl_size(LL_ACL_MAX_ENTRIES);
/* This can go when we're sure that this can never happen */
LASSERT(rc != -ENOENT);
+ /* We need dlm_rep to be assigned this early, to check lock mode of
+ returned lock from request to avoid possible race with lock
+ conversion */
+ if (rc == ELDLM_LOCK_ABORTED || !rc) {
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
+ LASSERT(dlm_rep != NULL); /* checked by ldlm_cli_enqueue() */
+ }
if (rc == ELDLM_LOCK_ABORTED) {
lock_mode = 0;
memset(lockh, 0, sizeof(*lockh));
rc = 0;
} else if (rc != 0) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
- LASSERT (rc < 0);
+ LASSERTF(rc < 0, "rc = %d\n", rc);
+ mdc_clear_replay_flag(req, rc);
ptlrpc_req_finished(req);
RETURN(rc);
} else { /* rc = 0 */
/* If the server gave us back a different lock mode, we should
* fix up our variables. */
- if (lock->l_req_mode != lock_mode) {
- ldlm_lock_addref(lockh, lock->l_req_mode);
+ if (dlm_rep->lock_desc.l_req_mode != lock_mode) {
+ ldlm_lock_addref(lockh, dlm_rep->lock_desc.l_req_mode);
ldlm_lock_decref(lockh, lock_mode);
- lock_mode = lock->l_req_mode;
+ lock_mode = dlm_rep->lock_desc.l_req_mode;
}
ldlm_lock_allow_match(lock);
LDLM_LOCK_PUT(lock);
}
- dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
- LASSERT(dlm_rep != NULL); /* checked by ldlm_cli_enqueue() */
LASSERT_REPSWABBED(req, 0); /* swabbed by ldlm_cli_enqueue() */
LUSTRE_IT(it)->it_disposition = (int) dlm_rep->lock_policy_res1;
LUSTRE_IT(it)->it_lock_mode = lock_mode;
LUSTRE_IT(it)->it_data = req;
- if (LUSTRE_IT(it)->it_status < 0 && req->rq_replay) {
- LASSERT(req->rq_transno == 0);
- /* Don't hold error requests for replay. */
- spin_lock(&req->rq_lock);
- req->rq_replay = 0;
- spin_unlock(&req->rq_lock);
- }
+ if (LUSTRE_IT(it)->it_status < 0 && req->rq_replay)
+ mdc_clear_replay_flag(req, LUSTRE_IT(it)->it_status);
DEBUG_REQ(D_RPCTRACE, req, "disposition: %x, status: %d",
LUSTRE_IT(it)->it_disposition, LUSTRE_IT(it)->it_status);
id_group(cid)}};
struct lustre_handle lockh;
ldlm_policy_data_t policy;
- int mode = LCK_PR;
+ int mode;
/* For the GETATTR case, ll_revalidate_it issues two separate
queries - for LOOKUP and for UPDATE lock because it cannot
check them together - we might have those two bits to be
present in two separate granted locks */
policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
- MDS_INODELOCK_UPDATE: MDS_INODELOCK_LOOKUP;
+ MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
mode = LCK_PR;
rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
LDLM_FL_BLOCK_GRANTED, &res_id,
- LDLM_IBITS, &policy, LCK_PR, &lockh);
+ LDLM_IBITS, &policy, mode,
+ &lockh);
+
+ if (!rc) {
+ mode = LCK_CR;
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id,
+ LDLM_IBITS, &policy, mode,
+ &lockh);
+ }
if (!rc) {
mode = LCK_PW;
rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
LDLM_FL_BLOCK_GRANTED, &res_id,
- LDLM_IBITS, &policy, LCK_PW,
+ LDLM_IBITS, &policy, mode,
+ &lockh);
+ }
+ if (!rc) {
+ mode = LCK_CW;
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id,
+ LDLM_IBITS, &policy, mode,
&lockh);
}
if (rc) {
* It's important that we do this first! Otherwise we might exit the
* function without doing so, and try to replay a failed create (bug
* 3440) */
- if (it->it_op & IT_OPEN) {
- if (!it_disposition(it, DISP_OPEN_OPEN) ||
- LUSTRE_IT(it)->it_status != 0) {
- unsigned long irqflags;
-
- spin_lock_irqsave(&request->rq_lock, irqflags);
- request->rq_replay = 0;
- spin_unlock_irqrestore(&request->rq_lock, irqflags);
- }
- }
+ if (it->it_op & IT_OPEN && request->rq_replay &&
+ (!it_disposition(it, DISP_OPEN_OPEN) || LUSTRE_IT(it)->it_status != 0))
+ mdc_clear_replay_flag(request, LUSTRE_IT(it)->it_status);
+
if (!it_disposition(it, DISP_IT_EXECD)) {
/* The server failed before it even started executing the
* intent, i.e. because it couldn't unpack the request. */