Whamcloud - gitweb
- landed b_hd_cray_merge3
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
index e3bda59..436a155 100644 (file)
@@ -175,6 +175,22 @@ int mdc_change_cbdata(struct obd_export *exp, struct lustre_id *id,
         return 0;
 }
 
+static inline void
+mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
+{
+        /* Don't hold error requests for replay; clear flag under rq_lock. */
+        if (req->rq_replay) {
+                unsigned long irqflags;
+                spin_lock_irqsave(&req->rq_lock, irqflags);
+                req->rq_replay = 0;
+                spin_unlock_irqrestore(&req->rq_lock, irqflags);
+        }
+        if (rc && req->rq_transno != 0) { /* rc != 0 plus transno: fatal */
+                DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
+                LBUG();
+        }
+}
+
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type. */
 int mdc_enqueue(struct obd_export *exp,
@@ -197,7 +213,6 @@ int mdc_enqueue(struct obd_export *exp,
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
         struct ldlm_intent *lit;
         struct ldlm_request *lockreq;
-        struct ldlm_reply *dlm_rep;
         int reqsize[6] = {[MDS_REQ_SECDESC_OFF] = 0,
                           [MDS_REQ_INTENT_LOCKREQ_OFF] = sizeof(*lockreq),
                           [MDS_REQ_INTENT_IT_OFF] = sizeof(*lit)};
@@ -206,6 +221,7 @@ int mdc_enqueue(struct obd_export *exp,
                           obddev->u.cli.cl_max_mds_easize};
         int req_buffers = 3, reply_buffers = 0;
         int rc, flags = LDLM_FL_HAS_INTENT;
+        struct ldlm_reply *dlm_rep = NULL;
         void *eadata;
         unsigned long irqflags;
         ENTRY;
@@ -222,6 +238,7 @@ int mdc_enqueue(struct obd_export *exp,
                 reqsize[req_buffers++] = sizeof(struct mds_rec_create);
                 reqsize[req_buffers++] = data->namelen + 1;
                 reqsize[req_buffers++] = obddev->u.cli.cl_max_mds_easize;
+
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
                                       LDLM_ENQUEUE, req_buffers, reqsize, NULL);
                 if (!req)
@@ -238,8 +255,7 @@ int mdc_enqueue(struct obd_export *exp,
 
                 /* pack the intended request */
                 mdc_open_pack(req->rq_reqmsg, MDS_REQ_INTENT_REC_OFF, data,
-                              it->it_create_mode, 0, it->it_flags,
-                              lmm, lmmsize);
+                              it->it_create_mode, 0, it->it_flags, lmm, lmmsize);
                 /* get ready for the reply */
                 repsize[3] = 4;
                 repsize[4] = xattr_acl_size(LL_ACL_MAX_ENTRIES);
@@ -330,13 +346,21 @@ int mdc_enqueue(struct obd_export *exp,
 
         /* This can go when we're sure that this can never happen */
         LASSERT(rc != -ENOENT);
+        /* We need dlm_rep to be assigned this early, to check lock mode of
+           returned lock from request to avoid possible race with lock
+           conversion */
+        if (rc == ELDLM_LOCK_ABORTED || !rc) {
+                dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
+                LASSERT(dlm_rep != NULL);   /* checked by ldlm_cli_enqueue() */
+        }
         if (rc == ELDLM_LOCK_ABORTED) {
                 lock_mode = 0;
                 memset(lockh, 0, sizeof(*lockh));
                 rc = 0;
         } else if (rc != 0) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
-                LASSERT (rc < 0);
+                LASSERTF(rc < 0, "rc = %d\n", rc);
+                mdc_clear_replay_flag(req, rc);
                 ptlrpc_req_finished(req);
                 RETURN(rc);
         } else { /* rc = 0 */
@@ -345,18 +369,16 @@ int mdc_enqueue(struct obd_export *exp,
 
                 /* If the server gave us back a different lock mode, we should
                  * fix up our variables. */
-                if (lock->l_req_mode != lock_mode) {
-                        ldlm_lock_addref(lockh, lock->l_req_mode);
+                if (dlm_rep->lock_desc.l_req_mode != lock_mode) {
+                        ldlm_lock_addref(lockh, dlm_rep->lock_desc.l_req_mode);
                         ldlm_lock_decref(lockh, lock_mode);
-                        lock_mode = lock->l_req_mode;
+                        lock_mode = dlm_rep->lock_desc.l_req_mode;
                 }
 
                 ldlm_lock_allow_match(lock);
                 LDLM_LOCK_PUT(lock);
         }
 
-        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
-        LASSERT(dlm_rep != NULL);           /* checked by ldlm_cli_enqueue() */
         LASSERT_REPSWABBED(req, 0);         /* swabbed by ldlm_cli_enqueue() */
 
         LUSTRE_IT(it)->it_disposition = (int) dlm_rep->lock_policy_res1;
@@ -364,13 +386,8 @@ int mdc_enqueue(struct obd_export *exp,
         LUSTRE_IT(it)->it_lock_mode = lock_mode;
         LUSTRE_IT(it)->it_data = req;
 
-        if (LUSTRE_IT(it)->it_status < 0 && req->rq_replay) {
-                LASSERT(req->rq_transno == 0);
-                /* Don't hold error requests for replay. */
-                spin_lock(&req->rq_lock);
-                req->rq_replay = 0;
-                spin_unlock(&req->rq_lock);
-        }
+        if (LUSTRE_IT(it)->it_status < 0 && req->rq_replay)
+                mdc_clear_replay_flag(req, LUSTRE_IT(it)->it_status);
 
         DEBUG_REQ(D_RPCTRACE, req, "disposition: %x, status: %d",
                   LUSTRE_IT(it)->it_disposition, LUSTRE_IT(it)->it_status);
@@ -472,24 +489,40 @@ int mdc_intent_lock(struct obd_export *exp, struct lustre_id *pid,
                                                       id_group(cid)}};
                 struct lustre_handle lockh;
                 ldlm_policy_data_t policy;
-                int mode = LCK_PR;
+                int mode;
 
                 /* For the GETATTR case, ll_revalidate_it issues two separate
                    queries - for LOOKUP and for UPDATE lock because it cannot
                    check them together - we might have those two bits to be
                    present in two separate granted locks */
                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
-                        MDS_INODELOCK_UPDATE: MDS_INODELOCK_LOOKUP;
+                        MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
                 
                 mode = LCK_PR;
                 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                      LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_IBITS, &policy, LCK_PR, &lockh);
+                                     LDLM_IBITS, &policy, mode,
+                                     &lockh);
+
+                if (!rc) {
+                        mode = LCK_CR;
+                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                             LDLM_FL_BLOCK_GRANTED, &res_id,
+                                             LDLM_IBITS, &policy, mode,
+                                             &lockh);
+                }
                 if (!rc) {
                         mode = LCK_PW;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                              LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, LCK_PW,
+                                             LDLM_IBITS, &policy, mode,
+                                             &lockh);
+                }
+                if (!rc) {
+                        mode = LCK_CW;
+                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                             LDLM_FL_BLOCK_GRANTED, &res_id,
+                                             LDLM_IBITS, &policy, mode,
                                              &lockh);
                 }
                 if (rc) {
@@ -543,16 +576,10 @@ int mdc_intent_lock(struct obd_export *exp, struct lustre_id *pid,
          * It's important that we do this first!  Otherwise we might exit the
          * function without doing so, and try to replay a failed create (bug
          * 3440) */
-        if (it->it_op & IT_OPEN) {
-                if (!it_disposition(it, DISP_OPEN_OPEN) ||
-                    LUSTRE_IT(it)->it_status != 0) {
-                        unsigned long irqflags;
-
-                        spin_lock_irqsave(&request->rq_lock, irqflags);
-                        request->rq_replay = 0;
-                        spin_unlock_irqrestore(&request->rq_lock, irqflags);
-                }
-        }
+        if (it->it_op & IT_OPEN && request->rq_replay &&
+            (!it_disposition(it, DISP_OPEN_OPEN) || LUSTRE_IT(it)->it_status != 0))
+                mdc_clear_replay_flag(request, LUSTRE_IT(it)->it_status);
         if (!it_disposition(it, DISP_IT_EXECD)) {
                 /* The server failed before it even started executing the
                  * intent, i.e. because it couldn't unpack the request. */