Whamcloud - gitweb
LU-6578 statahead: drop support for remote entry 67/15767/12
authorLai Siyao <lai.siyao@intel.com>
Tue, 28 Jul 2015 02:44:55 +0000 (10:44 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 22 Sep 2015 23:22:58 +0000 (23:22 +0000)
This patch dropped support for remote entry statahead, because it
needs 2 async RPCs to fetch both LOOKUP lock from parent MDT and
UPDATE lock from client MDT, which is complicated. And not
supporting remote entry statahead won't cause any issue.

* pack child fid in statahead request.
* lmv_intent_getattr_async() will compare parent and child MDT,
  if child is remote, return -ENOTSUPP.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I8c075bab0a716f194eac3c338ffbdd37f787eff6
Reviewed-on: http://review.whamcloud.com/15767
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/obd.h
lustre/include/obd_class.h
lustre/llite/statahead.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c

index 5139b69..291f4b3 100644 (file)
@@ -875,12 +875,13 @@ typedef int (* md_enqueue_cb_t)(struct ptlrpc_request *req,
                                 int rc);
 
 struct md_enqueue_info {
-       struct md_op_data       mi_data;
-       struct lookup_intent    mi_it;
-       struct lustre_handle    mi_lockh;
-       struct inode           *mi_dir;
-       md_enqueue_cb_t         mi_cb;
-       void                   *mi_cbdata;
+       struct md_op_data               mi_data;
+       struct lookup_intent            mi_it;
+       struct lustre_handle            mi_lockh;
+       struct inode                   *mi_dir;
+       struct ldlm_enqueue_info        mi_einfo;
+       md_enqueue_cb_t                 mi_cb;
+       void                           *mi_cbdata;
 };
 
 struct obd_ops {
@@ -1078,9 +1079,8 @@ struct md_ops {
                          u64, const char *, const char *, int, int, int,
                          struct ptlrpc_request **);
 
-        int (*m_intent_getattr_async)(struct obd_export *,
-                                      struct md_enqueue_info *,
-                                      struct ldlm_enqueue_info *);
+       int (*m_intent_getattr_async)(struct obd_export *,
+                                     struct md_enqueue_info *);
 
         int (*m_revalidate_lock)(struct obd_export *, struct lookup_intent *,
                                  struct lu_fid *, __u64 *bits);
index e0c0ca3..7a37ba2 100644 (file)
@@ -1603,15 +1603,14 @@ static inline int md_get_remote_perm(struct obd_export *exp,
 }
 
 static inline int md_intent_getattr_async(struct obd_export *exp,
-                                          struct md_enqueue_info *minfo,
-                                          struct ldlm_enqueue_info *einfo)
+                                         struct md_enqueue_info *minfo)
 {
-        int rc;
-        ENTRY;
-        EXP_CHECK_MD_OP(exp, intent_getattr_async);
-        EXP_MD_COUNTER_INCREMENT(exp, intent_getattr_async);
-        rc = MDP(exp->exp_obd, intent_getattr_async)(exp, minfo, einfo);
-        RETURN(rc);
+       int rc;
+       ENTRY;
+       EXP_CHECK_MD_OP(exp, intent_getattr_async);
+       EXP_MD_COUNTER_INCREMENT(exp, intent_getattr_async);
+       rc = MDP(exp->exp_obd, intent_getattr_async)(exp, minfo);
+       RETURN(rc);
 }
 
 static inline int md_revalidate_lock(struct obd_export *exp,
index 5e136b1..c7494cc 100644 (file)
@@ -82,6 +82,8 @@ struct sa_entry {
        struct inode           *se_inode;
        /* entry name */
        struct qstr             se_qstr;
+       /* entry fid */
+       struct lu_fid           se_fid;
 };
 
 static unsigned int sai_generation = 0;
@@ -181,7 +183,8 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
 
 /* allocate sa_entry and hash it to allow scanner process to find it */
 static struct sa_entry *
-sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
+sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len,
+        const struct lu_fid *fid)
 {
        struct ll_inode_info *lli;
        struct sa_entry *entry;
@@ -207,6 +210,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
        entry->se_qstr.hash = full_name_hash(name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
+       entry->se_fid = *fid;
 
        lli = ll_i2info(sai->sai_dentry->d_inode);
 
@@ -577,29 +581,16 @@ static void sa_instantiate(struct ll_statahead_info *sai,
         if (body == NULL)
                 GOTO(out, rc = -EFAULT);
 
-        child = entry->se_inode;
-        if (child == NULL) {
-                /*
-                 * lookup.
-                 */
-                LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
-
-                /* XXX: No fid in reply, this is probaly cross-ref case.
-                 * SA can't handle it yet. */
-               if (body->mbo_valid & OBD_MD_MDS)
-                       GOTO(out, rc = -EAGAIN);
-       } else {
-               /*
-                * revalidate.
-                */
-               /* unlinked and re-created with the same name */
+       child = entry->se_inode;
+       if (child != NULL) {
+               /* revalidate; unlinked and re-created with the same name */
                if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2,
                                        &body->mbo_fid1))) {
-                        entry->se_inode = NULL;
-                        iput(child);
-                        child = NULL;
-                }
-        }
+                       entry->se_inode = NULL;
+                       iput(child);
+                       child = NULL;
+               }
+       }
 
         it->d.lustre.it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
@@ -719,78 +710,66 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
 }
 
 /* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo,
-                         struct ldlm_enqueue_info *einfo)
+static void sa_fini_data(struct md_enqueue_info *minfo)
 {
-        LASSERT(minfo && einfo);
         iput(minfo->mi_dir);
         OBD_FREE_PTR(minfo);
-        OBD_FREE_PTR(einfo);
 }
 
 /*
  * prepare arguments for async stat RPC.
  */
-static int sa_prep_data(struct inode *dir, struct inode *child,
-                       struct sa_entry *entry, struct md_enqueue_info **pmi,
-                       struct ldlm_enqueue_info **pei)
+static struct md_enqueue_info *
+sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
 {
-        struct qstr              *qstr = &entry->se_qstr;
-        struct md_enqueue_info   *minfo;
-        struct ldlm_enqueue_info *einfo;
-        struct md_op_data        *op_data;
-
-        OBD_ALLOC_PTR(einfo);
-        if (einfo == NULL)
-                return -ENOMEM;
-
-        OBD_ALLOC_PTR(minfo);
-        if (minfo == NULL) {
-                OBD_FREE_PTR(einfo);
-                return -ENOMEM;
-        }
+       struct md_enqueue_info   *minfo;
+       struct ldlm_enqueue_info *einfo;
+       struct md_op_data        *op_data;
 
-        op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
-                                     qstr->len, 0, LUSTRE_OPC_ANY, NULL);
-        if (IS_ERR(op_data)) {
-                OBD_FREE_PTR(einfo);
-                OBD_FREE_PTR(minfo);
-                return PTR_ERR(op_data);
-        }
+       OBD_ALLOC_PTR(minfo);
+       if (minfo == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, NULL, 0, 0,
+                                    LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               OBD_FREE_PTR(minfo);
+               return (struct md_enqueue_info *)op_data;
+       }
+
+       if (child == NULL)
+               op_data->op_fid2 = entry->se_fid;
 
        minfo->mi_it.it_op = IT_GETATTR;
        minfo->mi_dir = igrab(dir);
        minfo->mi_cb = ll_statahead_interpret;
        minfo->mi_cbdata = entry;
 
-        einfo->ei_type   = LDLM_IBITS;
-        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
-        einfo->ei_cb_bl  = ll_md_blocking_ast;
-        einfo->ei_cb_cp  = ldlm_completion_ast;
-        einfo->ei_cb_gl  = NULL;
-        einfo->ei_cbdata = NULL;
-
-        *pmi = minfo;
-        *pei = einfo;
+       einfo = &minfo->mi_einfo;
+       einfo->ei_type   = LDLM_IBITS;
+       einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+       einfo->ei_cb_bl  = ll_md_blocking_ast;
+       einfo->ei_cb_cp  = ldlm_completion_ast;
+       einfo->ei_cb_gl  = NULL;
+       einfo->ei_cbdata = NULL;
 
-        return 0;
+       return minfo;
 }
 
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
        struct md_enqueue_info   *minfo;
-       struct ldlm_enqueue_info *einfo;
        int                       rc;
        ENTRY;
 
-       rc = sa_prep_data(dir, NULL, entry, &minfo, &einfo);
-       if (rc)
-               RETURN(rc);
+       minfo = sa_prep_data(dir, NULL, entry);
+       if (IS_ERR(minfo))
+               RETURN(PTR_ERR(minfo));
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0)
-               sa_fini_data(minfo, einfo);
+               sa_fini_data(minfo);
 
        RETURN(rc);
 }
@@ -809,7 +788,6 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        struct lookup_intent it = { .it_op = IT_GETATTR,
                                    .d.lustre.it_lock_handle = 0 };
        struct md_enqueue_info *minfo;
-       struct ldlm_enqueue_info *einfo;
        int rc;
        ENTRY;
 
@@ -828,25 +806,26 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                RETURN(1);
        }
 
-       rc = sa_prep_data(dir, inode, entry, &minfo, &einfo);
-       if (rc) {
+       minfo = sa_prep_data(dir, inode, entry);
+       if (IS_ERR(minfo)) {
                entry->se_inode = NULL;
                iput(inode);
-               RETURN(rc);
+               RETURN(PTR_ERR(minfo));
        }
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
-               sa_fini_data(minfo, einfo);
+               sa_fini_data(minfo);
        }
 
        RETURN(rc);
 }
 
 /* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len)
+static void sa_statahead(struct dentry *parent, const char *name, int len,
+                        const struct lu_fid *fid)
 {
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
@@ -856,7 +835,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len)
        int rc;
        ENTRY;
 
-       entry = sa_alloc(sai, sai->sai_index, name, len);
+       entry = sa_alloc(sai, sai->sai_index, name, len, fid);
        if (IS_ERR(entry))
                RETURN_EXIT;
 
@@ -1052,6 +1031,7 @@ static int ll_statahead_thread(void *arg)
                        __u64 hash;
                        int namelen;
                        char *name;
+                       struct lu_fid fid;
 
                        hash = le64_to_cpu(ent->lde_hash);
                        if (unlikely(hash < pos))
@@ -1094,6 +1074,8 @@ static int ll_statahead_thread(void *arg)
                        if (unlikely(++first == 1))
                                continue;
 
+                       fid_le_to_cpu(&fid, &ent->lde_fid);
+
                        /* wait for spare statahead window */
                        do {
                                l_wait_event(sa_thread->t_ctl_waitq,
@@ -1123,7 +1105,7 @@ static int ll_statahead_thread(void *arg)
                        } while (sa_sent_full(sai) &&
                                 thread_is_running(sa_thread));
 
-                       sa_statahead(parent, name, namelen);
+                       sa_statahead(parent, name, namelen, &fid);
                }
 
                pos = le64_to_cpu(dp->ldp_hash_end);
index 7acb440..8d90c41 100644 (file)
@@ -3187,25 +3187,41 @@ static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
 }
 
 int lmv_intent_getattr_async(struct obd_export *exp,
-                             struct md_enqueue_info *minfo,
-                             struct ldlm_enqueue_info *einfo)
+                            struct md_enqueue_info *minfo)
 {
        struct md_op_data       *op_data = &minfo->mi_data;
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
-       struct lmv_tgt_desc     *tgt = NULL;
+       struct lmv_tgt_desc     *ptgt = NULL;
+       struct lmv_tgt_desc     *ctgt = NULL;
        int                      rc;
        ENTRY;
 
+       if (!fid_is_sane(&op_data->op_fid2))
+               RETURN(-EINVAL);
+
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
 
-       tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
-       if (IS_ERR(tgt))
-               RETURN(PTR_ERR(tgt));
+       ptgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+       if (IS_ERR(ptgt))
+               RETURN(PTR_ERR(ptgt));
+
+       ctgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+       if (IS_ERR(ctgt))
+               RETURN(PTR_ERR(ctgt));
+
+       /*
+        * if child is on remote MDT, we need 2 async RPCs to fetch both LOOKUP
+        * lock on parent, and UPDATE lock on child MDT, which makes all
+        * complicated. Considering remote dir is rare case, and not supporting
+        * it in statahead won't cause any issue, drop its support for now.
+        */
+       if (ptgt != ctgt)
+               RETURN(-ENOTSUPP);
 
-       rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
+       rc = md_intent_getattr_async(ptgt->ltd_exp, minfo);
        RETURN(rc);
 }
 
index 019127a..45217eb 100644 (file)
@@ -134,8 +134,7 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                         struct lu_fid *fid, __u64 *bits);
 
 int mdc_intent_getattr_async(struct obd_export *exp,
-                             struct md_enqueue_info *minfo,
-                             struct ldlm_enqueue_info *einfo);
+                            struct md_enqueue_info *minfo);
 
 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
                              const struct lu_fid *fid, enum ldlm_type type,
index 89a0638..aaa6771 100644 (file)
@@ -48,9 +48,8 @@
 #include "mdc_internal.h"
 
 struct mdc_getattr_args {
-        struct obd_export           *ga_exp;
-        struct md_enqueue_info      *ga_minfo;
-        struct ldlm_enqueue_info    *ga_einfo;
+       struct obd_export               *ga_exp;
+       struct md_enqueue_info          *ga_minfo;
 };
 
 int it_open_error(int phase, struct lookup_intent *it)
@@ -1151,16 +1150,16 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
                                               struct ptlrpc_request *req,
                                               void *args, int rc)
 {
-        struct mdc_getattr_args  *ga = args;
-        struct obd_export        *exp = ga->ga_exp;
-        struct md_enqueue_info   *minfo = ga->ga_minfo;
-        struct ldlm_enqueue_info *einfo = ga->ga_einfo;
-        struct lookup_intent     *it;
-        struct lustre_handle     *lockh;
-        struct obd_device        *obddev;
+       struct mdc_getattr_args  *ga = args;
+       struct obd_export        *exp = ga->ga_exp;
+       struct md_enqueue_info   *minfo = ga->ga_minfo;
+       struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
+       struct lookup_intent     *it;
+       struct lustre_handle     *lockh;
+       struct obd_device        *obddev;
        struct ldlm_reply        *lockrep;
        __u64                     flags = LDLM_FL_HAS_INTENT;
-        ENTRY;
+       ENTRY;
 
         it    = &minfo->mi_it;
         lockh = &minfo->mi_lockh;
@@ -1193,14 +1192,12 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
         EXIT;
 
 out:
-        OBD_FREE_PTR(einfo);
         minfo->mi_cb(req, minfo, rc);
         return 0;
 }
 
 int mdc_intent_getattr_async(struct obd_export *exp,
-                            struct md_enqueue_info *minfo,
-                            struct ldlm_enqueue_info *einfo)
+                            struct md_enqueue_info *minfo)
 {
        struct md_op_data       *op_data = &minfo->mi_data;
        struct lookup_intent    *it = &minfo->mi_it;
@@ -1208,9 +1205,6 @@ int mdc_intent_getattr_async(struct obd_export *exp,
        struct mdc_getattr_args *ga;
        struct obd_device       *obddev = class_exp2obd(exp);
        struct ldlm_res_id       res_id;
-       /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
-        *     for statahead currently. Consider CMD in future, such two bits
-        *     maybe managed by different MDS, should be adjusted then. */
        union ldlm_policy_data policy = {
                                .l_inodebits = { MDS_INODELOCK_LOOKUP |
                                                 MDS_INODELOCK_UPDATE } };
@@ -1229,27 +1223,26 @@ int mdc_intent_getattr_async(struct obd_export *exp,
                RETURN(PTR_ERR(req));
 
        rc = obd_get_request_slot(&obddev->u.cli);
-        if (rc != 0) {
-                ptlrpc_req_finished(req);
-                RETURN(rc);
-        }
+       if (rc != 0) {
+               ptlrpc_req_finished(req);
+               RETURN(rc);
+       }
 
-        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
-                             0, LVB_T_NONE, &minfo->mi_lockh, 1);
-        if (rc < 0) {
+       rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
+                             &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
+       if (rc < 0) {
                obd_put_request_slot(&obddev->u.cli);
-                ptlrpc_req_finished(req);
-                RETURN(rc);
-        }
+               ptlrpc_req_finished(req);
+               RETURN(rc);
+       }
 
-        CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
-        ga = ptlrpc_req_async_args(req);
-        ga->ga_exp = exp;
-        ga->ga_minfo = minfo;
-        ga->ga_einfo = einfo;
+       CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
+       ga = ptlrpc_req_async_args(req);
+       ga->ga_exp = exp;
+       ga->ga_minfo = minfo;
 
        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
        ptlrpcd_add_req(req);
 
-        RETURN(0);
+       RETURN(0);
 }