Whamcloud - gitweb
Revert "LU-2675 obd: decruft md_enqueue() and md_intent_lock()" 03/10203/2
authorOleg Drokin <oleg.drokin@intel.com>
Sat, 3 May 2014 22:53:15 +0000 (22:53 +0000)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 4 May 2014 00:04:17 +0000 (00:04 +0000)
This reintroduced back LU-4805, so reverting.

This reverts commit a2877f1f6c0fd0ec5ece836a216f42af3ef913da.

Change-Id: Id6728d4fa6a322a45579f0ce744d4c7161ac9d69
Reviewed-on: http://review.whamcloud.com/10203
Tested-by: Jenkins
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
14 files changed:
lustre/include/obd.h
lustre/include/obd_class.h
lustre/liblustre/dir.c
lustre/liblustre/namei.c
lustre/liblustre/super.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/xattr_cache.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c

index 6b81564..eee94a3 100644 (file)
@@ -955,7 +955,6 @@ struct md_op_data {
         __u32                   op_fsgid;
         cfs_cap_t               op_cap;
         void                   *op_data;
-       size_t                  op_data_size;
 
         /* iattr fields and blocks. */
         struct iattr            op_attr;
@@ -1201,15 +1200,15 @@ struct md_ops {
                        __u64, struct ptlrpc_request **);
 
        int (*m_enqueue)(struct obd_export *, struct ldlm_enqueue_info *,
-                        const union ldlm_policy_data *,
                         struct lookup_intent *, struct md_op_data *,
-                        struct lustre_handle *, __u64);
+                        struct lustre_handle *, void *, int,
+                        struct ptlrpc_request **, __u64);
 
        int (*m_getattr)(struct obd_export *, struct md_op_data *,
                         struct ptlrpc_request **);
 
        int (*m_intent_lock)(struct obd_export *, struct md_op_data *,
-                            struct lookup_intent *,
+                            void *, int, struct lookup_intent *, int,
                             struct ptlrpc_request **,
                             ldlm_blocking_callback, __u64);
 
index d8b03dc..7464435 100644 (file)
@@ -1680,18 +1680,19 @@ static inline int md_done_writing(struct obd_export *exp,
 
 static inline int md_enqueue(struct obd_export *exp,
                             struct ldlm_enqueue_info *einfo,
-                            const union ldlm_policy_data *policy,
                             struct lookup_intent *it,
                             struct md_op_data *op_data,
                             struct lustre_handle *lockh,
+                            void *lmm, int lmmsize,
+                            struct ptlrpc_request **req,
                             __u64 extra_lock_flags)
 {
-       int rc;
-       ENTRY;
-       EXP_CHECK_MD_OP(exp, enqueue);
-       EXP_MD_COUNTER_INCREMENT(exp, enqueue);
-       rc = MDP(exp->exp_obd, enqueue)(exp, einfo, policy, it, op_data, lockh,
-                                       extra_lock_flags);
+        int rc;
+        ENTRY;
+        EXP_CHECK_MD_OP(exp, enqueue);
+        EXP_MD_COUNTER_INCREMENT(exp, enqueue);
+        rc = MDP(exp->exp_obd, enqueue)(exp, einfo, it, op_data, lockh,
+                                        lmm, lmmsize, req, extra_lock_flags);
         RETURN(rc);
 }
 
@@ -1708,19 +1709,20 @@ static inline int md_getattr_name(struct obd_export *exp,
 }
 
 static inline int md_intent_lock(struct obd_export *exp,
-                                struct md_op_data *op_data,
-                                struct lookup_intent *it,
-                                struct ptlrpc_request **reqp,
+                                struct md_op_data *op_data, void *lmm,
+                                int lmmsize, struct lookup_intent *it,
+                                int lookup_flags, struct ptlrpc_request **reqp,
                                 ldlm_blocking_callback cb_blocking,
                                 __u64 extra_lock_flags)
 {
-       int rc;
-       ENTRY;
-       EXP_CHECK_MD_OP(exp, intent_lock);
-       EXP_MD_COUNTER_INCREMENT(exp, intent_lock);
-       rc = MDP(exp->exp_obd, intent_lock)(exp, op_data, it, reqp, cb_blocking,
-                                           extra_lock_flags);
-       RETURN(rc);
+        int rc;
+        ENTRY;
+        EXP_CHECK_MD_OP(exp, intent_lock);
+        EXP_MD_COUNTER_INCREMENT(exp, intent_lock);
+        rc = MDP(exp->exp_obd, intent_lock)(exp, op_data, lmm, lmmsize,
+                                            it, lookup_flags, reqp, cb_blocking,
+                                            extra_lock_flags);
+        RETURN(rc);
 }
 
 static inline int md_link(struct obd_export *exp, struct md_op_data *op_data,
index b14e1a2..706cc74 100644 (file)
@@ -86,8 +86,9 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
                        .ei_cbdata      = inode,
                };
 
-               rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL, &it, &op_data,
-                               &lockh, LDLM_FL_CANCEL_ON_BLOCK);
+                rc = md_enqueue(sbi->ll_md_exp, &einfo, &it,
+                                &op_data, &lockh, NULL, 0, NULL,
+                                LDLM_FL_CANCEL_ON_BLOCK);
                 request = (struct ptlrpc_request *)it.d.lustre.it_data;
                 if (request)
                         ptlrpc_req_finished(request);
index c129f0b..23d6843 100644 (file)
@@ -263,8 +263,9 @@ static int llu_pb_revalidate(struct pnode *pnode, int flags,
                             pb->pb_ino, pb->pb_name.name, pb->pb_name.len,
                             0, LUSTRE_OPC_ANY);
 
-       rc = md_intent_lock(exp, &op_data, it, &req, &llu_md_blocking_ast,
-                           LDLM_FL_CANCEL_ON_BLOCK);
+        rc = md_intent_lock(exp, &op_data, NULL, 0, it, flags,
+                            &req, llu_md_blocking_ast,
+                            LDLM_FL_CANCEL_ON_BLOCK);
         /* If req is NULL, then md_intent_lock only tried to do a lock match;
          * if all was well, it will return 1 if it found locks, 0 otherwise. */
         if (req == NULL && rc >= 0)
@@ -428,8 +429,9 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode,
                             pnode->p_base->pb_name.name,
                             pnode->p_base->pb_name.len, flags, opc);
 
-       rc = md_intent_lock(llu_i2mdexp(parent), &op_data, it, &req,
-                           &llu_md_blocking_ast, LDLM_FL_CANCEL_ON_BLOCK);
+        rc = md_intent_lock(llu_i2mdexp(parent), &op_data, NULL, 0, it,
+                            flags, &req, llu_md_blocking_ast,
+                            LDLM_FL_CANCEL_ON_BLOCK);
         if (rc < 0)
                 GOTO(out, rc);
 
index 9980b95..35fbb57 100644 (file)
@@ -1631,8 +1631,8 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
 
         llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR,
                             LUSTRE_OPC_ANY);
-       rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL, &oit, &data, &lockh,
-                       LDLM_FL_INTENT_ONLY);
+        rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data,
+                        &lockh, lum, lum_size, NULL, LDLM_FL_INTENT_ONLY);
         if (rc)
                 GOTO(out, rc);
 
index a9edd6f..9debc22 100644 (file)
@@ -416,7 +416,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         const char *name = file->f_dentry->d_name.name;
         const int len = file->f_dentry->d_name.len;
         struct md_op_data *op_data;
-       struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         __u32 opc = LUSTRE_OPC_ANY;
         int rc;
         ENTRY;
@@ -445,12 +445,9 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-       op_data->op_data = lmm;
-       op_data->op_data_size = lmmsize;
-
        itp->it_flags |= MDS_OPEN_BY_FID;
-       rc = md_intent_lock(sbi->ll_md_exp, op_data, itp, &req,
-                           &ll_md_blocking_ast, 0);
+        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
+                            0 /*unused */, &req, ll_md_blocking_ast, 0);
         ll_finish_md_op_data(op_data);
         if (rc == -ESTALE) {
                 /* reason for keep own exit path - don`t flood log
@@ -778,7 +775,7 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
        struct lookup_intent it = { .it_op = IT_OPEN };
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct md_op_data *op_data;
-       struct ptlrpc_request *req = NULL;
+       struct ptlrpc_request *req;
        struct lustre_handle old_handle = { 0 };
        struct obd_client_handle *och = NULL;
        int rc;
@@ -844,15 +841,15 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
 
        it.it_flags = fmode | open_flags;
        it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
-       rc = md_intent_lock(sbi->ll_md_exp, op_data, &it, &req,
-                           &ll_md_blocking_lease_ast,
+       rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req,
+                               ll_md_blocking_lease_ast,
        /* LDLM_FL_NO_LRU: To not put the lease lock into LRU list, otherwise
         * it can be cancelled which may mislead applications that the lease is
         * broken;
         * LDLM_FL_EXCL: Set this flag so that it won't be matched by normal
         * open in ll_md_blocking_ast(). Otherwise as ll_md_blocking_lease_ast
         * doesn't deal with openhandle, so normal openhandle will be leaked. */
-                           LDLM_FL_NO_LRU | LDLM_FL_EXCL);
+                               LDLM_FL_NO_LRU | LDLM_FL_EXCL);
        ll_finish_md_op_data(op_data);
        ptlrpc_req_finished(req);
        if (rc < 0)
@@ -2993,8 +2990,8 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
               flock.l_flock.pid, flags, einfo.ei_mode,
               flock.l_flock.start, flock.l_flock.end);
 
-       rc = md_enqueue(sbi->ll_md_exp, &einfo, &flock, NULL, op_data, &lockh,
-                       flags);
+        rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
+                        op_data, &lockh, &flock, 0, NULL /* req */, flags);
 
        /* Restore the file lock type if not TEST lock. */
        if (!(flags & LDLM_FL_TEST_LOCK))
@@ -3010,8 +3007,8 @@ ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
        if (rc2 && file_lock->fl_type != F_UNLCK) {
                einfo.ei_mode = LCK_NL;
-               md_enqueue(sbi->ll_md_exp, &einfo, &flock, NULL, op_data,
-                          &lockh, flags);
+               md_enqueue(sbi->ll_md_exp, &einfo, NULL,
+                       op_data, &lockh, &flock, 0, NULL /* req */, flags);
                rc = rc2;
        }
 
@@ -3256,8 +3253,11 @@ static int __ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
                         RETURN(PTR_ERR(op_data));
 
                 oit.it_create_mode |= M_CHECK_STALE;
-               rc = md_intent_lock(exp, op_data, &oit, &req,
-                                   &ll_md_blocking_ast, 0);
+                rc = md_intent_lock(exp, op_data, NULL, 0,
+                                    /* we are not interested in name
+                                       based lookup */
+                                    &oit, 0, &req,
+                                    ll_md_blocking_ast, 0);
                 ll_finish_md_op_data(op_data);
                 oit.it_create_mode &= ~M_CHECK_STALE;
                 if (rc < 0) {
@@ -3956,8 +3956,8 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        struct ldlm_enqueue_info einfo = {
                .ei_type = LDLM_IBITS,
                .ei_mode = LCK_CR,
-               .ei_cb_bl = &ll_md_blocking_ast,
-               .ei_cb_cp = &ldlm_completion_ast,
+               .ei_cb_bl = ll_md_blocking_ast,
+               .ei_cb_cp = ldlm_completion_ast,
        };
        int rc;
        ENTRY;
@@ -4003,7 +4003,8 @@ again:
                          ll_get_fsname(inode->i_sb, NULL, 0),
                          PFID(&lli->lli_fid), inode);
 
-       rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL, &it, op_data, &lockh, 0);
+       rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
+                       NULL, 0, NULL, 0);
        if (it.d.lustre.it_data != NULL)
                ptlrpc_req_finished(it.d.lustre.it_data);
        it.d.lustre.it_data = NULL;
index 4df0021..d041fea 100644 (file)
@@ -564,8 +564,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
         if (!IS_POSIXACL(parent) || !exp_connect_umask(ll_i2mdexp(parent)))
                it->it_create_mode &= ~current_umask();
 
-       rc = md_intent_lock(ll_i2mdexp(parent), op_data, it, &req,
-                           &ll_md_blocking_ast, 0);
+        rc = md_intent_lock(ll_i2mdexp(parent), op_data, NULL, 0, it,
+                            lookup_flags, &req, ll_md_blocking_ast, 0);
         ll_finish_md_op_data(op_data);
         if (rc < 0)
                 GOTO(out, retval = ERR_PTR(rc));
index d5c0c19..9ef62c5 100644 (file)
@@ -314,12 +314,10 @@ static int ll_xattr_find_get_lock(struct inode *inode,
        struct lustre_handle lockh = { 0 };
        struct md_op_data *op_data;
        struct ll_inode_info *lli = ll_i2info(inode);
-       struct ldlm_enqueue_info einfo = {
-               .ei_type = LDLM_IBITS,
-               .ei_mode = it_to_lock_mode(oit),
-               .ei_cb_bl = &ll_md_blocking_ast,
-               .ei_cb_cp = &ldlm_completion_ast,
-       };
+       struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
+                                          .ei_mode = it_to_lock_mode(oit),
+                                          .ei_cb_bl = ll_md_blocking_ast,
+                                          .ei_cb_cp = ldlm_completion_ast };
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct obd_export *exp = sbi->ll_md_exp;
        int rc;
@@ -346,7 +344,7 @@ static int ll_xattr_find_get_lock(struct inode *inode,
 
        op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS;
 
-       rc = md_enqueue(exp, &einfo, NULL, oit, op_data, &lockh, 0);
+       rc = md_enqueue(exp, &einfo, oit, op_data, &lockh, NULL, 0, NULL, 0);
        ll_finish_md_op_data(op_data);
 
        if (rc < 0) {
index 855a23d..09ee3b2 100644 (file)
@@ -59,8 +59,9 @@
 #include <lprocfs_status.h>
 #include "lmv_internal.h"
 
-static int lmv_intent_remote(struct obd_export *exp, struct lookup_intent *it,
-                            const struct lu_fid *parent_fid,
+static int lmv_intent_remote(struct obd_export *exp, void *lmm,
+                            int lmmsize, struct lookup_intent *it,
+                            const struct lu_fid *parent_fid, int flags,
                             struct ptlrpc_request **reqp,
                             ldlm_blocking_callback cb_blocking,
                             __u64 extra_lock_flags)
@@ -126,8 +127,8 @@ static int lmv_intent_remote(struct obd_export *exp, struct lookup_intent *it,
        CDEBUG(D_INODE, "REMOTE_INTENT with fid="DFID" -> mds #%d\n",
               PFID(&body->fid1), tgt->ltd_idx);
 
-       rc = md_intent_lock(tgt->ltd_exp, op_data, it, &req, cb_blocking,
-                           extra_lock_flags);
+        rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+                            flags, &req, cb_blocking, extra_lock_flags);
         if (rc)
                 GOTO(out_free_op_data, rc);
 
@@ -229,8 +230,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
                CDEBUG(D_INODE, "Revalidate slave "DFID" -> mds #%d\n",
                       PFID(&fid), tgt->ltd_idx);
 
-               rc = md_intent_lock(tgt->ltd_exp, op_data, &it, &req,
-                                   cb_blocking, extra_lock_flags);
+               rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0,
+                                   &req, cb_blocking, extra_lock_flags);
                if (rc < 0)
                        GOTO(cleanup, rc);
 
@@ -338,8 +339,10 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
  * may be split dir.
  */
 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
-                   struct lookup_intent *it, struct ptlrpc_request **reqp,
-                   ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
+                    void *lmm, int lmmsize, struct lookup_intent *it,
+                    int flags, struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking,
+                   __u64 extra_lock_flags)
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
@@ -391,8 +394,8 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
               " name='%s' -> mds #%d\n", PFID(&op_data->op_fid1),
               PFID(&op_data->op_fid2), op_data->op_name, tgt->ltd_idx);
 
-       rc = md_intent_lock(tgt->ltd_exp, op_data, it, reqp, cb_blocking,
-                           extra_lock_flags);
+       rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it, flags,
+                           reqp, cb_blocking, extra_lock_flags);
        if (rc != 0)
                RETURN(rc);
        /*
@@ -410,8 +413,9 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
 
        /* Not cross-ref case, just get out of here. */
        if (unlikely((body->valid & OBD_MD_MDS))) {
-               rc = lmv_intent_remote(exp, it, &op_data->op_fid1, reqp,
-                                      cb_blocking, extra_lock_flags);
+               rc = lmv_intent_remote(exp, lmm, lmmsize, it, &op_data->op_fid1,
+                                      flags, reqp, cb_blocking,
+                                      extra_lock_flags);
                if (rc != 0)
                        RETURN(rc);
 
@@ -426,11 +430,11 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
 /*
  * Handler for: getattr, lookup and revalidate cases.
  */
-static int
-lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
-                 struct lookup_intent *it, struct ptlrpc_request **reqp,
-                 ldlm_blocking_callback cb_blocking,
-                 __u64 extra_lock_flags)
+int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
+                      void *lmm, int lmmsize, struct lookup_intent *it,
+                      int flags, struct ptlrpc_request **reqp,
+                      ldlm_blocking_callback cb_blocking,
+                     __u64 extra_lock_flags)
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
@@ -455,8 +459,8 @@ lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
 
        op_data->op_bias &= ~MDS_CROSS_REF;
 
-       rc = md_intent_lock(tgt->ltd_exp, op_data, it, reqp, cb_blocking,
-                           extra_lock_flags);
+       rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+                            flags, reqp, cb_blocking, extra_lock_flags);
        if (rc < 0)
                RETURN(rc);
 
@@ -489,9 +493,8 @@ lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
 
                op_data->op_fid1 = lsm->lsm_md_oinfo[1].lmo_fid;
                it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
-               rc = md_intent_lock(tgt->ltd_exp, op_data, it, reqp,
-                                   cb_blocking, extra_lock_flags);
-               RETURN(rc);
+               rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+                                   flags, reqp, cb_blocking, extra_lock_flags);
        }
        /*
         * MDS has returned success. Probably name has been resolved in
@@ -503,8 +506,8 @@ lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
 
        /* Not cross-ref case, just get out of here. */
        if (unlikely((body->valid & OBD_MD_MDS))) {
-               rc = lmv_intent_remote(exp, it, NULL, reqp, cb_blocking,
-                                      extra_lock_flags);
+               rc = lmv_intent_remote(exp, lmm, lmmsize, it, NULL, flags,
+                                      reqp, cb_blocking, extra_lock_flags);
                if (rc != 0)
                        RETURN(rc);
                body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
@@ -516,8 +519,9 @@ lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
 }
 
 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
-                   struct lookup_intent *it, struct ptlrpc_request **reqp,
-                   ldlm_blocking_callback cb_blocking,
+                    void *lmm, int lmmsize, struct lookup_intent *it,
+                    int flags, struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking,
                    __u64 extra_lock_flags)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -535,14 +539,15 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-       if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))
-               rc = lmv_intent_lookup(exp, op_data, it, reqp, cb_blocking,
-                                      extra_lock_flags);
-       else if (it->it_op & IT_OPEN)
-               rc = lmv_intent_open(exp, op_data, it, reqp, cb_blocking,
-                                    extra_lock_flags);
-       else
-               LBUG();
-
-       RETURN(rc);
+        if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))
+                rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
+                                       flags, reqp, cb_blocking,
+                                       extra_lock_flags);
+        else if (it->it_op & IT_OPEN)
+                rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
+                                     flags, reqp, cb_blocking,
+                                     extra_lock_flags);
+        else
+                LBUG();
+        RETURN(rc);
 }
index 4882a84..622b2e1 100644 (file)
 int lmv_check_connect(struct obd_device *obd);
 
 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
-                   struct lookup_intent *it, struct ptlrpc_request **reqp,
-                   ldlm_blocking_callback cb_blocking,
+                    void *lmm, int lmmsize, struct lookup_intent *it,
+                    int flags, struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking,
+                   __u64 extra_lock_flags);
+
+int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
+                      void *lmm, int lmmsize, struct lookup_intent *it,
+                      int flags, struct ptlrpc_request **reqp,
+                      ldlm_blocking_callback cb_blocking,
+                     __u64 extra_lock_flags);
+
+int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
+                    void *lmm, int lmmsize, struct lookup_intent *it,
+                    int flags, struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking,
                    __u64 extra_lock_flags);
 
 int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
index 3936c06..ad76048 100644 (file)
@@ -1879,10 +1879,69 @@ static int lmv_done_writing(struct obd_export *exp,
 }
 
 static int
+lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                  struct lookup_intent *it, struct md_op_data *op_data,
+                  struct lustre_handle *lockh, void *lmm, int lmmsize,
+                  __u64 extra_lock_flags)
+{
+        struct ptlrpc_request      *req = it->d.lustre.it_data;
+        struct obd_device          *obd = exp->exp_obd;
+        struct lmv_obd             *lmv = &obd->u.lmv;
+        struct lustre_handle        plock;
+        struct lmv_tgt_desc        *tgt;
+        struct md_op_data          *rdata;
+        struct lu_fid               fid1;
+        struct mdt_body            *body;
+        int                         rc = 0;
+        int                         pmode;
+        ENTRY;
+
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+        LASSERT(body != NULL);
+
+        if (!(body->valid & OBD_MD_MDS))
+                RETURN(0);
+
+        CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
+               LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
+
+        /*
+         * We got LOOKUP lock, but we really need attrs.
+         */
+        pmode = it->d.lustre.it_lock_mode;
+        LASSERT(pmode != 0);
+        memcpy(&plock, lockh, sizeof(plock));
+        it->d.lustre.it_lock_mode = 0;
+        it->d.lustre.it_data = NULL;
+        fid1 = body->fid1;
+
+        ptlrpc_req_finished(req);
+
+        tgt = lmv_find_target(lmv, &fid1);
+        if (IS_ERR(tgt))
+                GOTO(out, rc = PTR_ERR(tgt));
+
+        OBD_ALLOC_PTR(rdata);
+        if (rdata == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        rdata->op_fid1 = fid1;
+        rdata->op_bias = MDS_CROSS_REF;
+
+        rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
+                        lmm, lmmsize, NULL, extra_lock_flags);
+        OBD_FREE_PTR(rdata);
+        EXIT;
+out:
+        ldlm_lock_decref(&plock, pmode);
+        return rc;
+}
+
+static int
 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-           const union ldlm_policy_data *policy,
-           struct lookup_intent *it, struct md_op_data *op_data,
-           struct lustre_handle *lockh, __u64 extra_lock_flags)
+            struct lookup_intent *it, struct md_op_data *op_data,
+            struct lustre_handle *lockh, void *lmm, int lmmsize,
+           struct ptlrpc_request **req, __u64 extra_lock_flags)
 {
        struct obd_device        *obd = exp->exp_obd;
        struct lmv_obd           *lmv = &obd->u.lmv;
@@ -1904,9 +1963,13 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
        CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
               LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
 
-       rc = md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh,
-                       extra_lock_flags);
+       rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
+                       lmm, lmmsize, req, extra_lock_flags);
 
+       if (rc == 0 && it && it->it_op == IT_OPEN) {
+               rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
+                                       lmm, lmmsize, extra_lock_flags);
+       }
        RETURN(rc);
 }
 
index cf66859..68c5cc6 100644 (file)
@@ -83,16 +83,16 @@ int mdc_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
                     ldlm_iterator_t it, void *data);
 
 int mdc_intent_lock(struct obd_export *exp,
-                   struct md_op_data *op_data,
-                   struct lookup_intent *it,
-                   struct ptlrpc_request **reqp,
+                    struct md_op_data *,
+                    void *lmm, int lmmsize,
+                    struct lookup_intent *, int,
+                    struct ptlrpc_request **reqp,
                    ldlm_blocking_callback cb_blocking,
                    __u64 extra_lock_flags);
-
 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-               const union ldlm_policy_data *policy,
-               struct lookup_intent *it, struct md_op_data *op_data,
-               struct lustre_handle *lockh, __u64 extra_lock_flags);
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+               struct ptlrpc_request **req, __u64 extra_lock_flags);
 
 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
                             cfs_list_t *cancels, ldlm_mode_t mode,
index 67a2032..ffc49c3 100644 (file)
@@ -263,23 +263,21 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
         }
 }
 
-static struct ptlrpc_request *
-mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
-                    struct md_op_data *op_data)
+static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
+                                                   struct lookup_intent *it,
+                                                   struct md_op_data *op_data,
+                                                   void *lmm, int lmmsize,
+                                                   void *cb_data)
 {
-       struct ptlrpc_request   *req;
-       struct obd_device       *obddev = class_exp2obd(exp);
-       struct ldlm_intent      *lit;
-       const void              *lmm = op_data->op_data;
-       int                      lmmsize = op_data->op_data_size;
-       struct list_head         cancels;
-       int count = 0;
-       int mode;
-       int rc;
+        struct ptlrpc_request *req;
+        struct obd_device     *obddev = class_exp2obd(exp);
+        struct ldlm_intent    *lit;
+        CFS_LIST_HEAD(cancels);
+        int                    count = 0;
+        int                    mode;
+        int                    rc;
         ENTRY;
 
-       INIT_LIST_HEAD(&cancels);
-
         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
 
         /* XXX: openlock is not cancelled for cross-refs. */
@@ -329,8 +327,8 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
 
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
-       req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
-                            max(lmmsize, obddev->u.cli.cl_default_mds_easize));
+        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
+                             max(lmmsize, obddev->u.cli.cl_default_mds_easize));
 
        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc < 0) {
@@ -779,11 +777,10 @@ static int mdc_finish_enqueue(struct obd_export *exp,
 
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp,
-               struct ldlm_enqueue_info *einfo,
-               const union ldlm_policy_data *policy,
-               struct lookup_intent *it, struct md_op_data *op_data,
-               struct lustre_handle *lockh, __u64 extra_lock_flags)
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+               struct ptlrpc_request **reqp, __u64 extra_lock_flags)
 {
         struct obd_device     *obddev = class_exp2obd(exp);
         struct ptlrpc_request *req = NULL;
@@ -798,6 +795,7 @@ int mdc_enqueue(struct obd_export *exp,
                            { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        static const ldlm_policy_data_t getxattr_policy = {
                              .l_inodebits = { MDS_INODELOCK_XATTR } };
+        ldlm_policy_data_t const *policy = &lookup_policy;
         int                    generation, resends = 0;
         struct ldlm_reply     *lockrep;
        enum lvb_type          lvb_type = 0;
@@ -805,33 +803,38 @@ int mdc_enqueue(struct obd_export *exp,
 
         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                  einfo->ei_type);
-        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
 
-       if (it != NULL) {
-               LASSERT(policy == NULL);
+        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
 
+       if (it) {
                saved_flags |= LDLM_FL_HAS_INTENT;
-               if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
+               if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
                else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
                        policy = &getxattr_policy;
-               else
-                       policy = &lookup_policy;
        }
 
+        LASSERT(reqp == NULL);
+
         generation = obddev->u.cli.cl_import->imp_generation;
 resend:
         flags = saved_flags;
-       if (it == NULL) {
-               /* The only way right now is FLOCK. */
-               LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
-                        einfo->ei_type);
-               res_id.name[3] = LDLM_FLOCK;
-       } else if (it->it_op & IT_OPEN) {
-               LASSERT(einfo->ei_cbdata == NULL);
-               req = mdc_intent_open_pack(exp, it, op_data);
+        if (!it) {
+                /* The only way right now is FLOCK, in this case we hide flock
+                   policy as lmm, but lmmsize is 0 */
+                LASSERT(lmm && lmmsize == 0);
+                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
+                         einfo->ei_type);
+                policy = (ldlm_policy_data_t *)lmm;
+                res_id.name[3] = LDLM_FLOCK;
+        } else if (it->it_op & IT_OPEN) {
+                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
+                                           einfo->ei_cbdata);
+                policy = &update_policy;
+                einfo->ei_cbdata = NULL;
+                lmm = NULL;
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, op_data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
@@ -1151,8 +1154,10 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
  * child lookup.
  */
 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
-                   struct lookup_intent *it, struct ptlrpc_request **reqp,
-                   ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
+                    void *lmm, int lmmsize, struct lookup_intent *it,
+                    int lookup_flags, struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking,
+                   __u64 extra_lock_flags)
 {
        struct ldlm_enqueue_info einfo = {
                .ei_type        = LDLM_IBITS,
@@ -1193,13 +1198,12 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                        RETURN(rc);
                }
        }
-
-       rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
+       rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
                         extra_lock_flags);
        if (rc < 0)
                RETURN(rc);
 
-       *reqp = it->d.lustre.it_data;
+        *reqp = it->d.lustre.it_data;
         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
         RETURN(rc);
 }
index 862687e..92f0332 100644 (file)
@@ -1528,7 +1528,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 
        mapping = dir->i_mapping;
 
-       rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
+       rc = mdc_intent_lock(exp, op_data, NULL, 0, &it, 0, &enq_req,
                             cb_op->md_blocking_ast, 0);
        if (enq_req != NULL)
                ptlrpc_req_finished(enq_req);