Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
index 797485f..c728797 100644 (file)
@@ -146,28 +146,27 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
         RETURN(0);
 }
 
-int mdc_lock_match(struct obd_export *exp, int flags,
-                   const struct lu_fid *fid, ldlm_type_t type,
-                   ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                   struct lustre_handle *lockh)
+ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
+                           const struct lu_fid *fid, ldlm_type_t type,
+                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                           struct lustre_handle *lockh)
 {
         /*
          * Look up a lock on the resource named by @fid by handing the
          * request to ldlm_lock_match() in the namespace of the obd device
          * behind @exp.  The resource name is filled from fid_seq(),
          * fid_oid() and fid_ver() of @fid; @flags, @type, @policy, @mode
          * and @lockh are passed through untouched.
          *
          * The value returned by ldlm_lock_match() is returned as is.  This
          * change switches the declared return type (and the local rc) from
          * int to ldlm_mode_t and drops the intermediate obd pointer.
          *
          * NOTE(review): presumably the result is the mode of the matched
          * lock and 0 when nothing matched -- confirm against
          * ldlm_lock_match().
          */
         struct ldlm_res_id res_id =
                 { .name = {fid_seq(fid),
                            fid_oid(fid),
                            fid_ver(fid)} };
-        struct obd_device *obd = class_exp2obd(exp);
-        int rc;
+        ldlm_mode_t rc;
         ENTRY;
 
-        rc = ldlm_lock_match(obd->obd_namespace, flags,
+        rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
                              &res_id, type, policy, mode, lockh);
-
         RETURN(rc);
 }
 
 int mdc_cancel_unused(struct obd_export *exp,
                       const struct lu_fid *fid,
-                      int flags, void *opaque)
+                      ldlm_policy_data_t *policy,
+                      ldlm_mode_t mode, int flags, void *opaque)
 {
         struct ldlm_res_id res_id =
                 { .name = {fid_seq(fid),
@@ -178,8 +177,8 @@ int mdc_cancel_unused(struct obd_export *exp,
 
         ENTRY;
 
-        rc = ldlm_cli_cancel_unused(obd->obd_namespace, &res_id,
-                                    flags, opaque);
+        rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
+                                             policy, mode, flags, opaque);
         RETURN(rc);
 }
 
@@ -245,17 +244,10 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp,
-                int lock_type,
-                struct lookup_intent *it,
-                int lock_mode,
-                struct md_op_data *op_data,
-                struct lustre_handle *lockh,
-                void *lmm,
-                int lmmsize,
-                ldlm_completion_callback cb_completion,
-                ldlm_blocking_callback cb_blocking,
-                void *cb_data, int extra_lock_flags)
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
 {
         struct ptlrpc_request *req;
         struct obd_device *obddev = class_exp2obd(exp);
@@ -269,22 +261,25 @@ int mdc_enqueue(struct obd_export *exp,
         struct ldlm_reply *lockrep;
         int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
-                        [DLM_INTENT_IT_OFF]   = sizeof(*lit) };
+                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
+                        0, 0, 0, 0, 0, 0 };
         int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
-                                                   cl_max_mds_easize };
+                                                   cl_max_mds_easize,
+                           0, 0, 0 };
         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         int repbufcnt = 4, rc;
         ENTRY;
 
-        LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
-//        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-//                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
+        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
 
         if (it->it_op & IT_OPEN) {
                 int do_join = !!(it->it_flags & O_JOIN_FILE);
+                CFS_LIST_HEAD(cancels);
+                int count = 0;
+                int mode;
 
                 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
 
@@ -301,27 +296,46 @@ int mdc_enqueue(struct obd_export *exp,
                  */
                 size[DLM_INTENT_REC_OFF + 4] = max(lmmsize,
                                            obddev->u.cli.cl_default_mds_easize);
+
+                /* XXX: openlock is not cancelled for cross-refs. */
+                /* If inode is known, cancel conflicting OPEN locks. */
+                if (fid_is_sane(&op_data->op_fid2)) {
+                        if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+                                mode = LCK_CW;
+#ifdef FMODE_EXEC
+                        else if (it->it_flags & FMODE_EXEC)
+                                mode = LCK_PR;
+#endif
+                        else 
+                                mode = LCK_CR;
+                        count = mdc_resource_get_unused(exp, &op_data->op_fid2,
+                                                        &cancels, mode,
+                                                        MDS_INODELOCK_OPEN);
+                }
+
+                /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+                if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
+                        mode = LCK_EX;
+                else
+                        mode = LCK_CR;
+                count += mdc_resource_get_unused(exp, &op_data->op_fid1,
+                                                 &cancels, mode,
+                                                 MDS_INODELOCK_UPDATE);
+
                 if (do_join)
                         size[DLM_INTENT_REC_OFF + 5] =
                                                 sizeof(struct mdt_rec_join);
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 8 + do_join, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 8 + do_join, size, &cancels,
+                                            count);
                 if (!req)
                         RETURN(-ENOMEM);
 
                 if (do_join) {
-                        __u64 head_size = *(__u32*)cb_data;
-                        __u32 tsize = *(__u32*)lmm;
-
                         /* join is like an unlink of the tail */
                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                        /* when joining file, cb_data and lmm args together
-                         * indicate the head file size*/
                         mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data,
-                                      (head_size << 32) | tsize);
-                        cb_data = NULL;
-                        lmm = NULL;
+                                      (*(__u64 *)op_data->op_data));
                 }
 
                 spin_lock(&req->rq_lock);
@@ -350,8 +364,7 @@ int mdc_enqueue(struct obd_export *exp,
                                                sizeof(struct lustre_capa) : 0;
                 size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1;
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 6, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -378,8 +391,7 @@ int mdc_enqueue(struct obd_export *exp,
                 if (it->it_op & IT_GETATTR)
                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 6, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -398,8 +410,7 @@ int mdc_enqueue(struct obd_export *exp,
                 repsize[repbufcnt++] = sizeof(struct lustre_capa);
         } else if (it->it_op == IT_READDIR) {
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 2, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -417,9 +428,8 @@ int mdc_enqueue(struct obd_export *exp,
           * rpcs in flight counter */
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         mdc_enter_request(&obddev->u.cli);
-        rc = ldlm_cli_enqueue(exp, &req, &res_id, lock_type, &policy,
-                              lock_mode, &flags, cb_blocking, cb_completion,
-                              NULL, cb_data, NULL, 0, NULL, lockh, 0);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
+                              0, NULL, lockh, 0);
         mdc_exit_request(&obddev->u.cli);
         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
 
@@ -431,10 +441,8 @@ int mdc_enqueue(struct obd_export *exp,
                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
         }
 
-        /* This can go when we're sure that this can never happen */
-        LASSERT(rc != -ENOENT);
         if (rc == ELDLM_LOCK_ABORTED) {
-                lock_mode = 0;
+                einfo->ei_mode = 0;
                 memset(lockh, 0, sizeof(*lockh));
                 rc = 0;
         } else if (rc != 0) {
@@ -449,10 +457,10 @@ int mdc_enqueue(struct obd_export *exp,
 
                 /* If the server gave us back a different lock mode, we should
                  * fix up our variables. */
-                if (lock->l_req_mode != lock_mode) {
+                if (lock->l_req_mode != einfo->ei_mode) {
                         ldlm_lock_addref(lockh, lock->l_req_mode);
-                        ldlm_lock_decref(lockh, lock_mode);
-                        lock_mode = lock->l_req_mode;
+                        ldlm_lock_decref(lockh, einfo->ei_mode);
+                        einfo->ei_mode = lock->l_req_mode;
                 }
                 LDLM_LOCK_PUT(lock);
         }
@@ -460,11 +468,12 @@ int mdc_enqueue(struct obd_export *exp,
         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                  sizeof(*lockrep));
         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
-        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
+        /* swabbed by ldlm_cli_enqueue() */
+        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
 
         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
-        it->d.lustre.it_lock_mode = lock_mode;
+        it->d.lustre.it_lock_mode = einfo->ei_mode;
         it->d.lustre.it_data = req;
 
         if (it->d.lustre.it_status < 0 && req->rq_replay)
@@ -658,11 +667,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
-                struct ldlm_res_id res_id = { .name = { fid_seq(&op_data->op_fid2),
-                                                        fid_oid(&op_data->op_fid2),
-                                                        fid_ver(&op_data->op_fid2) } };
                 ldlm_policy_data_t policy;
-                ldlm_mode_t mode = LCK_CR;
+                ldlm_mode_t mode;
 
                 /* As not all attributes are kept under update lock, e.g.
                    owner/group/acls are under lookup lock, we need both
@@ -675,30 +681,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
 
-                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_IBITS, &policy, mode, &lockh);
-                if (!rc) {
-                        mode = LCK_CW;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, mode, &lockh);
-                }
-                if (!rc) {
-                        mode = LCK_PR;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, mode, &lockh);
-                }
-
-                if (!rc) {
-                        mode = LCK_PW;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, mode, &lockh);
-                }
-
-                if (rc) {
+                mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
+                                      &op_data->op_fid2, LDLM_IBITS, &policy,
+                                      LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+                if (mode) {
                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
                                sizeof(lockh));
                         it->d.lustre.it_lock_mode = mode;
@@ -706,8 +692,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
 
                 /* Only return failure if it was not GETATTR by cfid
                    (from inode_revalidate) */
-                if (rc || op_data->op_namelen != 0)
-                        RETURN(rc);
+                if (mode || op_data->op_namelen != 0)
+                        RETURN(!!mode);
         }
 
         /* lookup_it may be called only after revalidate_it has run, because
@@ -719,6 +705,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
          * this and use the request from revalidate.  In this case, revalidate
          * never dropped its reference, so the refcounts are all OK */
         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+                struct ldlm_enqueue_info einfo =
+                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+                          ldlm_completion_ast, NULL, NULL };
+
                 /* If the upper layer did not allocate a fid, do it now. */
                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
@@ -727,11 +717,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                                 RETURN(rc);
                         }
                 }
-                
-                rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
-                                 op_data, &lockh, lmm, lmmsize,
-                                 ldlm_completion_ast, cb_blocking, NULL,
-                                 extra_lock_flags);
+                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+                                 lmm, lmmsize, extra_lock_flags);
                 if (rc < 0)
                         RETURN(rc);
                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
@@ -760,8 +747,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
 
         mdt_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
                                   sizeof(*mdt_body));
-        LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
-        LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
+        /* mdc_enqueue checked */
+        LASSERT(mdt_body != NULL);
+        /* mdc_enqueue swabbed */
+        LASSERT(lustre_rep_swabbed(request, 1));
 
         /* If we were revalidating a fid/name pair, mark the intent in
          * case we fail and get called again from lookup */