Whamcloud - gitweb
LU-169 ldlm: add support for layout lock
authorJinshan Xiong <jinshan.xiong@whamcloud.com>
Fri, 29 Jun 2012 07:52:19 +0000 (15:52 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 9 Jul 2012 03:24:14 +0000 (23:24 -0400)
Add support for layout lock at client side.

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: Icbdf691134bfb403b0e2019ed364da3e3a11bf5c
Reviewed-on: http://review.whamcloud.com/2025
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
13 files changed:
lustre/include/cl_object.h
lustre/ldlm/ldlm_lockd.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/lov/lov_object.c
lustre/lov/lov_page.c
lustre/lov/lovsub_lock.c
lustre/mdc/mdc_locks.c

index 53bb5ca..8f27bd6 100644 (file)
@@ -276,6 +276,16 @@ struct cl_object_conf {
          * VFS inode. This is consumed by vvp.
          */
         struct inode             *coc_inode;
+       /**
+        * Validate object conf. If object is using an invalid conf,
+        * then invalidate it and set the new layout.
+        */
+       bool                      coc_validate_only;
+       /**
+        * Invalidate the current stripe configuration due to losing
+        * layout lock.
+        */
+       bool                      coc_invalidate;
 };
 
 /**
index 7833e72..d4134b9 100644 (file)
@@ -1553,6 +1553,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_request *dlm_req,
                                     struct ldlm_lock *lock)
 {
+       int lvb_len;
         CFS_LIST_HEAD(ast_list);
         ENTRY;
 
@@ -1569,6 +1570,33 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 }
         }
 
+       lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
+       if (lvb_len > 0) {
+               if (lock->l_lvb_len > 0) {
+                       /* for extent lock, lvb contains ost_lvb{}. */
+                       LASSERT(lock->l_lvb_data != NULL);
+                       LASSERTF(lock->l_lvb_len == lvb_len,
+                               "preallocated %d, actual %d.\n",
+                               lock->l_lvb_len, lvb_len);
+               } else { /* for layout lock, lvb has variable length */
+                       void *lvb_data;
+
+                       OBD_ALLOC(lvb_data, lvb_len);
+                       if (lvb_data == NULL)
+                               LDLM_ERROR(lock, "no memory.\n");
+
+                       lock_res_and_lock(lock);
+                       if (lvb_data == NULL) {
+                               lock->l_flags |= LDLM_FL_FAILED;
+                       } else {
+                               LASSERT(lock->l_lvb_data == NULL);
+                               lock->l_lvb_data = lvb_data;
+                               lock->l_lvb_len = lvb_len;
+                       }
+                       unlock_res_and_lock(lock);
+               }
+       }
+
         lock_res_and_lock(lock);
         if (lock->l_destroyed ||
             lock->l_granted_mode == lock->l_req_mode) {
index c9dad81..8ac9cdf 100644 (file)
@@ -1487,7 +1487,7 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
        lsm = ccc_inode_lsm_get(inode);
        if (lsm != NULL)
                rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
-                               lsm, (void *)arg);
+                                  lsm, (void *)arg);
        ccc_inode_lsm_put(inode, lsm);
        RETURN(rc);
 }
@@ -2800,3 +2800,141 @@ enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
                 *rcp = rc;
         return ret;
 }
+
+int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct cl_env_nest nest;
+       struct lu_env *env;
+       int result;
+       ENTRY;
+
+       if (lli->lli_clob == NULL)
+               RETURN(0);
+
+       env = cl_env_nested_get(&nest);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       result = cl_conf_set(env, lli->lli_clob, conf);
+       cl_env_nested_put(&nest, env);
+       RETURN(result);
+}
+
+/**
+ * This function checks if there exists a LAYOUT lock on the client side,
+ * or enqueues it if it doesn't have one in cache.
+ *
+ * This function will not hold the layout lock, so it may be revoked any time
+ * after this function returns. Any operations that depend on the layout
+ * should be redone in that case.
+ *
+ * This function should be called before lov_io_init() to get an uptodate
+ * layout version, the caller should save the version number and after IO
+ * is finished, this function should be called again to verify that layout
+ * is not changed during IO time.
+ */
+int ll_layout_refresh(struct inode *inode, __u32 *gen)
+{
+       struct ll_inode_info  *lli = ll_i2info(inode);
+       struct ll_sb_info     *sbi = ll_i2sbi(inode);
+       struct md_op_data     *op_data = NULL;
+       struct ptlrpc_request *req = NULL;
+       struct lookup_intent   it = { .it_op = IT_LAYOUT };
+       struct lustre_handle   lockh;
+       ldlm_mode_t            mode;
+       struct cl_object_conf  conf = {  .coc_inode = inode,
+                                        .coc_validate_only = true };
+       int rc;
+       ENTRY;
+
+       *gen = 0;
+       if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK))
+               RETURN(0);
+
+       /* sanity checks */
+       LASSERT(fid_is_sane(ll_inode2fid(inode)));
+       LASSERT(S_ISREG(inode->i_mode));
+
+       /* mostly the layout lock is cached on the local side, so try to match
+        * it before grabbing the layout lock mutex. */
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh);
+       if (mode != 0) { /* hit cached lock */
+               struct lov_stripe_md *lsm;
+
+               lsm = ccc_inode_lsm_get(inode);
+               if (lsm != NULL)
+                       *gen = lsm->lsm_layout_gen;
+               ccc_inode_lsm_put(inode, lsm);
+               ldlm_lock_decref(&lockh, mode);
+
+               RETURN(0);
+       }
+
+       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
+                                    0, 0, LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data))
+               RETURN(PTR_ERR(op_data));
+
+       /* take layout lock mutex to enqueue layout lock exclusively. */
+       cfs_mutex_lock(&lli->lli_layout_mutex);
+
+       /* make sure the old conf goes away */
+       ll_layout_conf(inode, &conf);
+
+       /* enqueue layout lock */
+       rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0,
+                       &req, ll_md_blocking_ast, 0);
+       if (rc == 0) {
+               /* we get a new lock, so update the lock data */
+               lockh.cookie = it.d.lustre.it_lock_handle;
+               md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL);
+
+               /* req == NULL is when lock was found in client cache, without
+                * any request to server (but lsm can be canceled just after a
+                * release) */
+               if (req != NULL) {
+                       struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
+                       struct lustre_md md = { NULL };
+                       void *lmm;
+                       int lmmsize;
+
+                       /* for IT_LAYOUT lock, lmm is returned in lock's lvb
+                        * data via completion callback */
+                       LASSERT(lock != NULL);
+                       lmm = lock->l_lvb_data;
+                       lmmsize = lock->l_lvb_len;
+                       if (lmm != NULL)
+                               rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+                                               lmm, lmmsize);
+                       if (rc == 0) {
+                               if (md.lsm != NULL)
+                                       *gen = md.lsm->lsm_layout_gen;
+
+                               memset(&conf, 0, sizeof conf);
+                               conf.coc_inode = inode;
+                               conf.u.coc_md = &md;
+                               ll_layout_conf(inode, &conf);
+                               /* is this racy? */
+                               lli->lli_has_smd = md.lsm != NULL;
+                       }
+                       if (md.lsm != NULL)
+                               obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+
+                       LDLM_LOCK_PUT(lock);
+                       ptlrpc_req_finished(req);
+               } else { /* hit cached lock */
+                       struct lov_stripe_md *lsm;
+
+                       lsm = ccc_inode_lsm_get(inode);
+                       if (lsm != NULL)
+                               *gen = lsm->lsm_layout_gen;
+                       ccc_inode_lsm_put(inode, lsm);
+               }
+               ll_intent_drop_lock(&it);
+       }
+       cfs_mutex_unlock(&lli->lli_layout_mutex);
+       ll_finish_md_op_data(op_data);
+
+       RETURN(rc);
+}
index ecc249e..510a9b9 100644 (file)
@@ -265,7 +265,10 @@ struct ll_inode_info {
          *      some of the following members can be moved into u.f.
          */
        bool                            lli_has_smd;
-        struct cl_object               *lli_clob;
+       struct cl_object               *lli_clob;
+
+       /* mutex to enqueue the layout lock exclusively. */
+       cfs_mutex_t                     lli_layout_mutex;
 };
 
 /*
@@ -387,6 +390,7 @@ enum stats_track_type {
 #define LL_SBI_64BIT_HASH      0x4000 /* support 64-bits dir hash/offset */
 #define LL_SBI_AGL_ENABLED     0x8000 /* enable agl */
 #define LL_SBI_VERBOSE        0x10000 /* verbose mount/umount */
+#define LL_SBI_LAYOUT_LOCK    0x20000 /* layout lock support */
 
 /* default value for ll_sb_info->contention_time */
 #define SBI_DEFAULT_CONTENTION_SECONDS     60
@@ -661,6 +665,7 @@ struct page *ll_get_dir_page(struct file *filp, struct inode *dir, __u64 hash,
 int ll_readdir(struct file *filp, void *cookie, filldir_t filldir);
 
 int ll_get_mdt_idx(struct inode *inode);
+char *ll_get_fsname(struct inode *inode);
 /* llite/namei.c */
 int ll_objects_destroy(struct ptlrpc_request *request,
                        struct inode *dir);
@@ -1536,4 +1541,7 @@ struct if_quotactl_18 {
 #warning "remove old LL_IOC_QUOTACTL_18 compatibility code"
 #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0) */
 
+int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
+int ll_layout_refresh(struct inode *inode, __u32 *gen);
+
 #endif /* LLITE_INTERNAL_H */
index ec264c4..6da9064 100644 (file)
@@ -372,6 +372,11 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         else
                 sbi->ll_md_brw_size = CFS_PAGE_SIZE;
 
+       if (data->ocd_connect_flags & OBD_CONNECT_LAYOUTLOCK) {
+               LCONSOLE_INFO("Layout lock feature supported.\n");
+               sbi->ll_flags |= LL_SBI_LAYOUT_LOCK;
+       }
+
         obd = class_name2obd(dt);
         if (!obd) {
                 CERROR("DT %s: not setup or attached\n", dt);
@@ -908,6 +913,7 @@ void ll_lli_init(struct ll_inode_info *lli)
                 CFS_INIT_LIST_HEAD(&lli->lli_agl_list);
                 lli->lli_agl_index = 0;
         }
+       cfs_mutex_init(&lli->lli_layout_mutex);
 }
 
 static inline int ll_bdi_register(struct backing_dev_info *bdi)
@@ -1602,16 +1608,15 @@ void ll_inode_size_unlock(struct inode *inode)
 
 void ll_update_inode(struct inode *inode, struct lustre_md *md)
 {
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct mdt_body *body = md->body;
-        struct lov_stripe_md *lsm = md->lsm;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
-
-        LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
-        if (lsm != NULL) {
-                LASSERT(S_ISREG(inode->i_mode));
-
-                cfs_mutex_lock(&lli->lli_och_mutex);
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct mdt_body *body = md->body;
+       struct lov_stripe_md *lsm = md->lsm;
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+
+       LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
+       if (lsm != NULL) {
+               LASSERT(S_ISREG(inode->i_mode));
+               cfs_mutex_lock(&lli->lli_och_mutex);
                CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
                                lsm, inode->i_ino, inode->i_generation, inode);
                /* cl_file_inode_init must go before lli_has_smd or a race
@@ -1621,12 +1626,13 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                if (cl_file_inode_init(inode, md) == 0)
                        lli->lli_has_smd = true;
                cfs_mutex_unlock(&lli->lli_och_mutex);
+
                lli->lli_maxbytes = lsm->lsm_maxbytes;
                if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
                        lli->lli_maxbytes = MAX_LFS_FILESIZE;
                if (md->lsm != NULL)
                        obd_free_memmd(ll_i2dtexp(inode), &md->lsm);
-        }
+       }
 
         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
                 if (body->valid & OBD_MD_FLRMTPERM)
@@ -2064,8 +2070,9 @@ int ll_prep_inode(struct inode **inode,
                   struct ptlrpc_request *req,
                   struct super_block *sb)
 {
-        struct ll_sb_info *sbi = NULL;
-        struct lustre_md md;
+       struct ll_sb_info *sbi = NULL;
+       struct lustre_md md;
+       __u64 ibits;
         int rc;
         ENTRY;
 
@@ -2104,9 +2111,18 @@ int ll_prep_inode(struct inode **inode,
                 }
         }
 
+       /* sanity check for LAYOUT lock. */
+       ibits = MDS_INODELOCK_LAYOUT;
+       if (S_ISREG(md.body->mode) && sbi->ll_flags & LL_SBI_LAYOUT_LOCK &&
+           md.lsm != NULL && !ll_have_md_lock(*inode, &ibits, LCK_MINMODE)) {
+               CERROR("%s: inode "DFID" (%p) layout lock not granted.\n",
+                       ll_get_fsname(*inode), PFID(ll_inode2fid(*inode)),
+                       *inode);
+       }
+
 out:
-        md_free_lustre_md(sbi->ll_md_exp, &md);
-        RETURN(rc);
+       md_free_lustre_md(sbi->ll_md_exp, &md);
+       RETURN(rc);
 }
 
 int ll_obd_statfs(struct inode *inode, void *arg)
index 331db4e..1da7ddc 100644 (file)
@@ -216,8 +216,10 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                         break;
 
                 LASSERT(lock->l_flags & LDLM_FL_CANCELING);
-                /* For OPEN locks we differentiate between lock modes - CR, CW. PR - bug 22891 */
-                if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE))
+                /* For OPEN locks we differentiate between lock modes
+                * LCK_CR, LCK_CW, LCK_PR - bug 22891 */
+               if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+                           MDS_INODELOCK_LAYOUT))
                         ll_have_md_lock(inode, &bits, LCK_MINMODE);
 
                 if (bits & MDS_INODELOCK_OPEN)
@@ -251,7 +253,15 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                         ll_md_real_close(inode, flags);
                 }
 
-                lli = ll_i2info(inode);
+               lli = ll_i2info(inode);
+               if (bits & MDS_INODELOCK_LAYOUT) {
+                       struct cl_object_conf conf = { .coc_inode = inode,
+                                                      .coc_invalidate = true };
+                       rc = ll_layout_conf(inode, &conf);
+                       if (rc)
+                               CDEBUG(D_INODE, "invaliding layout %d.\n", rc);
+               }
+
                 if (bits & MDS_INODELOCK_UPDATE)
                         lli->lli_flags &= ~LLIF_MDS_SIZE_LOCK;
 
index 06834f7..3870ddf 100644 (file)
@@ -200,14 +200,29 @@ struct lov_object {
          */
         enum lov_layout_type   lo_type;
        /**
+        * True if the layout is invalid. This bit is set when the
+        * layout lock is lost.
+        */
+       unsigned               lo_lsm_invalid:1;
+       /**
+        * Layout metadata.
+        */
+       struct lov_stripe_md  *lo_lsm;
+       /**
         * Waitq - wait for no one else is using lo_lsm
         */
-       cfs_waitq_t            lo_waitq;
+       cfs_waitq_t            lo_waitq;
 
         union lov_layout_state {
                 struct lov_layout_raid0 {
                         unsigned               lo_nr;
-                        struct lov_stripe_md  *lo_lsm;
+                       /**
+                        * When this is true, lov_object::lo_attr contains
+                        * valid up to date attributes for a top-level
+                        * object. This field is reset to 0 when attributes of
+                        * any sub-object change.
+                        */
+                       int                    lo_attr_valid;
                         /**
                          * Array of sub-objects. Allocated when top-object is
                          * created (lov_init_raid0()).
@@ -229,13 +244,6 @@ struct lov_object {
                          */
                         cfs_spinlock_t         lo_sub_lock;
                         /**
-                         * When this is true, lov_object::lo_attr contains
-                         * valid up to date attributes for a top-level
-                         * object. This field is reset to 0 when attributes of
-                         * any sub-object change.
-                         */
-                        int                    lo_attr_valid;
-                        /**
                          * Cached object attribute, built from sub-object
                          * attributes.
                          */
@@ -803,13 +811,10 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env)
 
 static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov)
 {
-        struct lov_layout_raid0 *raid0;
-
-        LASSERT(lov->lo_type == LLT_RAID0);
-        raid0 = &lov->u.raid0;
-        LASSERT(raid0->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC ||
-                raid0->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC_V3);
-        return raid0;
+       LASSERT(lov->lo_type == LLT_RAID0);
+       LASSERT(lov->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC ||
+               lov->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC_V3);
+       return &lov->u.raid0;
 }
 
 /** @} lov */
index 12f620b..e6578ed 100644 (file)
@@ -303,15 +303,13 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
 static void lov_io_slice_init(struct lov_io *lio,
                               struct lov_object *obj, struct cl_io *io)
 {
-       struct lov_stripe_md *lsm = lov_lsm_addref(obj);
        ENTRY;
 
        io->ci_result = 0;
        lio->lis_object = obj;
 
-       LASSERT(lsm != NULL);
-       lio->lis_lsm = lsm; /* called inside lo_type_guard. */
-        lio->lis_stripe_count = lsm->lsm_stripe_count;
+       LASSERT(lio->lis_lsm != NULL);
+        lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count;
 
         switch (io->ci_type) {
         case CIT_READ:
index 40192ce..1fc696c 100644 (file)
@@ -316,7 +316,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                  * XXX for wide striping smarter algorithm is desirable,
                  * breaking out of the loop, early.
                  */
-                if (lov_stripe_intersects(r0->lo_lsm, i,
+               if (lov_stripe_intersects(loo->lo_lsm, i,
                                           file_start, file_end, &start, &end))
                         nr++;
         }
@@ -334,7 +334,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
          * top-lock.
          */
         for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
-                if (lov_stripe_intersects(r0->lo_lsm, i,
+               if (lov_stripe_intersects(loo->lo_lsm, i,
                                           file_start, file_end, &start, &end)) {
                         struct cl_lock_descr *descr;
 
@@ -919,7 +919,7 @@ static int lock_lock_multi_match()
                 if (sub->sub_lock == NULL)
                         continue;
                 subobj = sub->sub_descr.cld_obj;
-                if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe,
+               if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
                                            fstart, fend, &start, &end))
                         continue;
                 subneed->cld_start = cl_index(subobj, start);
@@ -943,7 +943,7 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env,
                                        const struct cl_lock_descr *child,
                                        const struct cl_lock_descr *descr)
 {
-        struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+       struct lov_stripe_md *lsm = lov->lo_lsm;
         obd_off start;
         obd_off end;
         int result;
index 33a47ed..7f92431 100644 (file)
@@ -59,7 +59,7 @@ struct lov_layout_operations {
                         struct lov_object *lov,
                         const struct cl_object_conf *conf,
                         union lov_layout_state *state);
-        void (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
+       int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state);
         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
                          union lov_layout_state *state);
@@ -137,7 +137,7 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
         subhdr = cl_object_header(stripe);
         parent = subhdr->coh_parent;
 
-        oinfo = r0->lo_lsm->lsm_oinfo[idx];
+       oinfo = lov->lo_lsm->lsm_oinfo[idx];
         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
                " idx: %d gen: %d\n",
                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
@@ -188,7 +188,8 @@ static int lov_init_raid0(const struct lu_env *env,
                         LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
        }
 
-       r0->lo_lsm = lsm_addref(lsm);
+       LASSERT(lov->lo_lsm == NULL);
+       lov->lo_lsm = lsm_addref(lsm);
        r0->lo_nr  = lsm->lsm_stripe_count;
         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
@@ -221,10 +222,11 @@ static int lov_init_raid0(const struct lu_env *env,
         RETURN(result);
 }
 
-static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
-                             union lov_layout_state *state)
+static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
+                           union lov_layout_state *state)
 {
-        LASSERT(lov->lo_type == LLT_EMPTY);
+       LASSERT(lov->lo_type == LLT_EMPTY);
+       return 0;
 }
 
 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
@@ -274,19 +276,18 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
         LASSERT(r0->lo_sub[idx] == NULL);
 }
 
-static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
-                             union lov_layout_state *state)
+static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
+                           union lov_layout_state *state)
 {
        struct lov_layout_raid0 *r0 = &state->raid0;
-       struct lov_stripe_md    *lsm = r0->lo_lsm;
-       struct l_wait_info       lwi = { 0 };
-       int                      i;
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+       int i;
 
        ENTRY;
 
-       /* wait until there is no extra users. */
        dump_lsm(D_INODE, lsm);
-       l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+       if (cfs_atomic_read(&lsm->lsm_refc) > 1)
+               RETURN(-EBUSY);
 
         if (r0->lo_sub != NULL) {
                 for (i = 0; i < r0->lo_nr; ++i) {
@@ -300,7 +301,7 @@ static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                                 lov_subobject_kill(env, lov, los, i);
                 }
         }
-        EXIT;
+       RETURN(0);
 }
 
 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
@@ -321,8 +322,11 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
                 r0->lo_sub = NULL;
         }
 
-       LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1);
-       lov_free_memmd(&r0->lo_lsm);
+       LASSERTF(cfs_atomic_read(&lov->lo_lsm->lsm_refc) == 1,
+               "actual %d proc %p.\n",
+               cfs_atomic_read(&lov->lo_lsm->lsm_refc), cfs_current());
+       lov_free_memmd(&lov->lo_lsm);
+       lov->lo_lsm = NULL;
 
        EXIT;
 }
@@ -371,14 +375,19 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
                               struct cl_attr *attr)
 {
-        struct lov_object       *lov = cl2lov(obj);
-        struct lov_layout_raid0 *r0 = lov_r0(lov);
-        struct lov_stripe_md    *lsm = lov->u.raid0.lo_lsm;
+       struct lov_object       *lov = cl2lov(obj);
+       struct lov_layout_raid0 *r0 = lov_r0(lov);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
         struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
         __u64                    kms;
         int                      result = 0;
 
         ENTRY;
+
+       /* this is called w/o holding type guard mutex, so it must be inside
+        * an ongoing IO, otherwise lsm may be replaced. */
+       LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1);
+
         if (!r0->lo_attr_valid) {
                 /*
                  * Fill LVB with attributes already initialized by the upper
@@ -452,18 +461,29 @@ const static struct lov_layout_operations lov_dispatch[] = {
         lov_dispatch[__llt].op(__VA_ARGS__);                            \
 })
 
+static inline void lov_conf_freeze(struct lov_object *lov)
+{
+       if (lov->lo_owner != cfs_current())
+               cfs_down_read(&lov->lo_type_guard);
+}
+
+static inline void lov_conf_thaw(struct lov_object *lov)
+{
+       if (lov->lo_owner != cfs_current())
+               cfs_up_read(&lov->lo_type_guard);
+}
+
 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
 ({                                                                      \
         struct lov_object                      *__obj = (obj);          \
         int                                     __lock = !!(lock);      \
         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
                                                                         \
-        __lock &= __obj->lo_owner != cfs_current();                     \
         if (__lock)                                                     \
-                cfs_down_read(&__obj->lo_type_guard);                   \
+                lov_conf_freeze(__obj);                                        \
         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
         if (__lock)                                                     \
-                cfs_up_read(&__obj->lo_type_guard);                     \
+                lov_conf_thaw(__obj);                                  \
         __result;                                                       \
 })
 
@@ -478,59 +498,72 @@ do {                                                                    \
         struct lov_object                      *__obj = (obj);          \
         enum lov_layout_type                    __llt;                  \
                                                                         \
-        if (__obj->lo_owner != cfs_current())                           \
-                cfs_down_read(&__obj->lo_type_guard);                   \
+       lov_conf_freeze(__obj);                                         \
         __llt = __obj->lo_type;                                         \
         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
         lov_dispatch[__llt].op(__VA_ARGS__);                            \
-        if (__obj->lo_owner != cfs_current())                           \
-                cfs_up_read(&__obj->lo_type_guard);                     \
+       lov_conf_thaw(__obj);                                           \
 } while (0)
 
+static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
+{
+       struct l_wait_info lwi = { 0 };
+       struct lov_stripe_md *lsm = lov->lo_lsm;
+       ENTRY;
+
+       if (!lov->lo_lsm_invalid || lsm == NULL)
+               RETURN(0);
+
+       l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+       RETURN(0);
+}
+
 static int lov_layout_change(const struct lu_env *env,
-                             struct lov_object *obj, enum lov_layout_type llt,
+                             struct lov_object *lov, enum lov_layout_type llt,
                              const struct cl_object_conf *conf)
 {
-        int result;
-        union lov_layout_state       *state = &lov_env_info(env)->lti_state;
-        const struct lov_layout_operations *old_ops;
-        const struct lov_layout_operations *new_ops;
+       int result;
+       union lov_layout_state *state = &lov_env_info(env)->lti_state;
+       const struct lov_layout_operations *old_ops;
+       const struct lov_layout_operations *new_ops;
 
-        LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch));
-        LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
-        ENTRY;
+       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
+       void *cookie;
+       struct lu_env *nested;
+       int refcheck;
 
-        old_ops = &lov_dispatch[obj->lo_type];
-        new_ops = &lov_dispatch[llt];
-
-        result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev),
-                                   obj, conf, state);
-        if (result == 0) {
-                struct cl_object_header *hdr = cl_object_header(&obj->lo_cl);
-                void                    *cookie;
-                struct lu_env           *nested;
-                int                      refcheck;
-
-                cookie = cl_env_reenter();
-                nested = cl_env_get(&refcheck);
-                if (!IS_ERR(nested))
-                        cl_object_prune(nested, &obj->lo_cl);
-                else
-                        result = PTR_ERR(nested);
-                cl_env_put(nested, &refcheck);
-                cl_env_reexit(cookie);
-
-               old_ops->llo_delete(env, obj, &obj->u);
-                old_ops->llo_fini(env, obj, &obj->u);
-                LASSERT(cfs_list_empty(&hdr->coh_locks));
-                LASSERT(hdr->coh_tree.rnode == NULL);
-                LASSERT(hdr->coh_pages == 0);
-
-                new_ops->llo_install(env, obj, state);
-                obj->lo_type = llt;
-        } else
-                new_ops->llo_fini(env, obj, state);
-        RETURN(result);
+       LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
+       LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
+       ENTRY;
+
+       cookie = cl_env_reenter();
+       nested = cl_env_get(&refcheck);
+       if (!IS_ERR(nested))
+               cl_object_prune(nested, &lov->lo_cl);
+       else
+               result = PTR_ERR(nested);
+       cl_env_put(nested, &refcheck);
+       cl_env_reexit(cookie);
+
+       old_ops = &lov_dispatch[lov->lo_type];
+       new_ops = &lov_dispatch[llt];
+
+       result = old_ops->llo_delete(env, lov, &lov->u);
+       if (result == 0) {
+               old_ops->llo_fini(env, lov, &lov->u);
+               LASSERT(cfs_list_empty(&hdr->coh_locks));
+               LASSERT(hdr->coh_tree.rnode == NULL);
+               LASSERT(hdr->coh_pages == 0);
+
+               result = new_ops->llo_init(env,
+                                       lu2lov_dev(lov->lo_cl.co_lu.lo_dev),
+                                       lov, conf, state);
+               if (result == 0) {
+                       new_ops->llo_install(env, lov, state);
+                       lov->lo_type = llt;
+               }
+       }
+       RETURN(result);
 }
 
 /*****************************************************************************
@@ -570,27 +603,47 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
        struct lov_object *lov = cl2lov(obj);
        int result = 0;
+       ENTRY;
+
+       /*
+        * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
+        */
+       LASSERT(lov->lo_owner != cfs_current());
+       cfs_down_write(&lov->lo_type_guard);
+       LASSERT(lov->lo_owner == NULL);
+       lov->lo_owner = cfs_current();
+
+       if (conf->coc_invalidate) {
+               lov->lo_lsm_invalid = 1;
+               GOTO(out, result = 0);
+       }
+
+       if (conf->coc_validate_only) {
+               if (!lov->lo_lsm_invalid)
+                       GOTO(out, result = 0);
+
+               lov_layout_wait(env, lov);
+               /* fall through to set up new layout */
+       }
 
-        ENTRY;
-        /*
-         * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
-         */
-        LASSERT(lov->lo_owner != cfs_current());
-        cfs_down_write(&lov->lo_type_guard);
-        LASSERT(lov->lo_owner == NULL);
-        lov->lo_owner = cfs_current();
        switch (lov->lo_type) {
        case LLT_EMPTY:
                if (lsm != NULL)
                        result = lov_layout_change(env, lov, LLT_RAID0, conf);
                break;
        case LLT_RAID0:
-               if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm))
+               if (lsm == NULL)
+                       result = lov_layout_change(env, lov, LLT_EMPTY, conf);
+               else if (lov_stripe_md_cmp(lov->lo_lsm, lsm))
                        result = -EOPNOTSUPP;
                break;
        default:
                LBUG();
        }
+       lov->lo_lsm_invalid = result != 0;
+       EXIT;
+
+out:
        lov->lo_owner = NULL;
        cfs_up_write(&lov->lo_type_guard);
        RETURN(result);
@@ -636,7 +689,13 @@ struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
                 struct cl_io *io)
 {
+       struct lov_io *lio = lov_env_io(env);
+
         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
+
+	/* take a reference on the lsm before initializing, because the io relies on it */
+       lio->lis_lsm = lov_lsm_addref(cl2lov(obj));
+
         /*
          * Do not take lock in case of CIT_MISC io, because
          *
@@ -728,16 +787,13 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
 {
        struct lov_stripe_md *lsm = NULL;
 
-       cfs_down_read(&lov->lo_type_guard);
-       switch (lov->lo_type) {
-       case LLT_RAID0:
-               lsm = lsm_addref(lov->u.raid0.lo_lsm);
-       case LLT_EMPTY:
-               break;
-       default:
-               LBUG();
+       lov_conf_freeze(lov);
+       if (!lov->lo_lsm_invalid && lov->lo_lsm != NULL) {
+               lsm = lsm_addref(lov->lo_lsm);
+               CDEBUG(D_INODE, "lsm %p addref %d by %p.\n",
+                       lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
        }
-       cfs_up_read(&lov->lo_type_guard);
+       lov_conf_thaw(lov);
        return lsm;
 }
 
@@ -746,8 +802,10 @@ void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
        if (lsm == NULL)
                return;
 
-       lov_free_memmd(&lsm);
-       if (lov->lo_owner != NULL)
+       CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
+               lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
+
+       if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid)
                cfs_waitq_signal(&lov->lo_waitq);
 }
 
@@ -793,13 +851,13 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
        if (luobj != NULL) {
                struct lov_object *lov = lu2lov(luobj);
 
-               cfs_down_read(&lov->lo_type_guard);
+               lov_conf_freeze(lov);
                switch (lov->lo_type) {
                case LLT_RAID0: {
                        struct lov_stripe_md *lsm;
                        int i;
 
-                       lsm = lov->u.raid0.lo_lsm;
+                       lsm = lov->lo_lsm;
                        LASSERT(lsm != NULL);
                        for (i = 0; i < lsm->lsm_stripe_count; i++) {
                                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
@@ -813,7 +871,7 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
                default:
                        LBUG();
                }
-               cfs_up_read(&lov->lo_type_guard);
+               lov_conf_thaw(lov);
        }
        RETURN(rc);
 }
index 03335ff..9e39543 100644 (file)
@@ -184,9 +184,9 @@ struct cl_page *lov_page_init_raid0(const struct lu_env *env,
         ENTRY;
 
         offset = cl_offset(obj, page->cp_index);
-        stripe = lov_stripe_number(r0->lo_lsm, offset);
-        LASSERT(stripe < r0->lo_nr);
-        rc = lov_stripe_offset(r0->lo_lsm, offset, stripe,
+       stripe = lov_stripe_number(loo->lo_lsm, offset);
+       LASSERT(stripe < r0->lo_nr);
+       rc = lov_stripe_offset(loo->lo_lsm, offset, stripe,
                                    &suboff);
         LASSERT(rc == 0);
 
index ab1d9d9..3915be9 100644 (file)
@@ -153,10 +153,9 @@ static unsigned long lovsub_lock_weigh(const struct lu_env *env,
  * Maps start/end offsets within a stripe, to offsets within a file.
  */
 static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
-                                  struct lov_object *obj,
-                                  int stripe, struct cl_lock_descr *out)
+                                 struct lov_object *lov,
+                                 int stripe, struct cl_lock_descr *out)
 {
-        struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
         pgoff_t size; /* stripe size in pages */
         pgoff_t skip; /* how many pages in every stripe are occupied by
                        * "other" stripes */
@@ -167,9 +166,9 @@ static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
         start = in->cld_start;
         end   = in->cld_end;
 
-        if (lsm->lsm_stripe_count > 1) {
-                size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
-                skip = (lsm->lsm_stripe_count - 1) * size;
+       if (lov->lo_lsm->lsm_stripe_count > 1) {
+               size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size);
+               skip = (lov->lo_lsm->lsm_stripe_count - 1) * size;
 
                 /* XXX overflow check here? */
                 start += start/size * skip + stripe * size;
index d9219e0..746012e 100644 (file)
@@ -464,9 +464,11 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                               struct lustre_handle *lockh,
                               int rc)
 {
-        struct req_capsule  *pill = &req->rq_pill;
-        struct ldlm_request *lockreq;
-        struct ldlm_reply   *lockrep;
+       struct req_capsule  *pill = &req->rq_pill;
+       struct ldlm_request *lockreq;
+       struct ldlm_reply   *lockrep;
+       __u64                bits = 0;
+       struct lustre_intent_data *intent = &it->d.lustre;
         ENTRY;
 
         LASSERT(rc >= 0);
@@ -492,20 +494,21 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                         ldlm_lock_decref(lockh, einfo->ei_mode);
                         einfo->ei_mode = lock->l_req_mode;
                 }
-                LDLM_LOCK_PUT(lock);
-        }
+               bits = lock->l_policy_data.l_inodebits.bits;
+               LDLM_LOCK_PUT(lock);
+       }
 
-        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
-        LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
+       lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
+       LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
 
-        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
-        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
-        it->d.lustre.it_lock_mode = einfo->ei_mode;
-        it->d.lustre.it_lock_handle = lockh->cookie;
-        it->d.lustre.it_data = req;
+       intent->it_disposition = (int)lockrep->lock_policy_res1;
+       intent->it_status = (int)lockrep->lock_policy_res2;
+       intent->it_lock_mode = einfo->ei_mode;
+       intent->it_lock_handle = lockh->cookie;
+       intent->it_data = req;
 
-        if (it->d.lustre.it_status < 0 && req->rq_replay)
-                mdc_clear_replay_flag(req, it->d.lustre.it_status);
+       if (intent->it_status < 0 && req->rq_replay)
+               mdc_clear_replay_flag(req, intent->it_status);
 
         /* If we're doing an IT_OPEN which did not result in an actual
          * successful open, then we need to remove the bit which saves
@@ -515,11 +518,11 @@ static int mdc_finish_enqueue(struct obd_export *exp,
          * function without doing so, and try to replay a failed create
          * (bug 3440) */
         if (it->it_op & IT_OPEN && req->rq_replay &&
-            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
-                mdc_clear_replay_flag(req, it->d.lustre.it_status);
+           (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
+               mdc_clear_replay_flag(req, intent->it_status);
 
-        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
-                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
+       DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
+                 it->it_op, intent->it_disposition, intent->it_status);
 
         /* We know what to expect, so we do any byte flipping required here */
         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
@@ -540,7 +543,9 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                          * is swabbed by that handler correctly.
                          */
                         mdc_set_open_replay_data(NULL, NULL, req);
-                }
+               }
+
+		/* TODO: verify that the LAYOUT lock is granted along with the EA */
 
                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                         void *eadata;
@@ -616,9 +621,47 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                         if (capa == NULL)
                                 RETURN(-EPROTO);
                 }
-        }
+        } else if (it->it_op & IT_LAYOUT) {
+                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
-        RETURN(rc);
+               if (lock != NULL && lock->l_lvb_data == NULL) {
+                       int lvb_len;
+
+                       /* maybe the lock was granted right away and layout
+                        * is packed into RMF_DLM_LVB of req */
+                       lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
+                                                      RCL_SERVER);
+                       if (lvb_len > 0) {
+                               void *lvb;
+                               void *lmm;
+
+                               lvb = req_capsule_server_get(pill,
+                                                            &RMF_DLM_LVB);
+                               if (lvb == NULL) {
+                                       LDLM_LOCK_PUT(lock);
+                                       RETURN(-EPROTO);
+                               }
+
+                               OBD_ALLOC_LARGE(lmm, lvb_len);
+                               if (lmm == NULL) {
+                                       LDLM_LOCK_PUT(lock);
+                                       RETURN(-ENOMEM);
+                               }
+                               memcpy(lmm, lvb, lvb_len);
+
+                               /* install lvb_data */
+                               lock_res_and_lock(lock);
+                               LASSERT(lock->l_lvb_data == NULL);
+                               lock->l_lvb_data = lmm;
+                               lock->l_lvb_len = lvb_len;
+                               unlock_res_and_lock(lock);
+                       }
+               }
+               if (lock != NULL)
+                       LDLM_LOCK_PUT(lock);
+       }
+
+       RETURN(rc);
 }
 
 /* We always reserve enough space in the reply packet for a stripe MD, because
@@ -637,6 +680,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
         static const ldlm_policy_data_t update_policy =
                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
+       static const ldlm_policy_data_t layout_policy =
+                           { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
         ldlm_policy_data_t const *policy = &lookup_policy;
         int                    generation, resends = 0;
         struct ldlm_reply     *lockrep;
@@ -647,10 +692,13 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
 
-        if (it)
-                saved_flags |= LDLM_FL_HAS_INTENT;
-        if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
-                policy = &update_policy;
+       if (it) {
+               saved_flags |= LDLM_FL_HAS_INTENT;
+               if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+                       policy = &update_policy;
+               else if (it->it_op & IT_LAYOUT)
+                       policy = &layout_policy;
+       }
 
         LASSERT(reqp == NULL);
 
@@ -673,11 +721,11 @@ resend:
                 lmm = NULL;
         } else if (it->it_op & IT_UNLINK)
                 req = mdc_intent_unlink_pack(exp, it, op_data);
-        else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
-                req = mdc_intent_getattr_pack(exp, it, op_data);
-        else if (it->it_op == IT_READDIR)
-                req = ldlm_enqueue_pack(exp);
-        else {
+       else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
+               req = mdc_intent_getattr_pack(exp, it, op_data);
+       else if (it->it_op & (IT_READDIR | IT_LAYOUT))
+               req = ldlm_enqueue_pack(exp);
+       else {
                 LBUG();
                 RETURN(-EINVAL);
         }