LU-1876 hsm: bugfix about layout lock on the client
author Jinshan Xiong <jinshan.xiong@intel.com>
Wed, 7 Nov 2012 00:56:16 +0000 (16:56 -0800)
committer Oleg Drokin <green@whamcloud.com>
Thu, 29 Nov 2012 05:00:29 +0000 (00:00 -0500)
The following issues are fixed in this patch:
* deadlock at add_lsmref
* lov_delete_raid0 should wait for the refcount of the lsm to reach zero
  (the wait pattern is sketched below)
* handle an empty layout at the lov layer: the layout can now change at
  any time, so the original assumption of skipping the lov layer when
  the lsm is NULL is no longer correct
* other fixes
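
The refcount wait mentioned above is the core of the lov_delete_raid0 /
lov_layout_wait change: the thread replacing a layout must block until every
in-flight IO has dropped its reference to the old lov_stripe_md (in the patch
itself the lov object keeps the final reference, so the wait is for the count
to drain to that last holder). Below is a minimal user-space sketch of the
pattern, with hypothetical names and pthreads standing in for Lustre's
l_wait_event; the real code also drops the lov config lock while waiting, as
lov_layout_wait() in the lov_object.c hunk shows.

#include <pthread.h>

/* stand-in for lov_stripe_md; the object itself holds the last reference */
struct layout {
        int             refc;   /* protected by lock */
        pthread_mutex_t lock;
        pthread_cond_t  waitq;  /* signalled when refc drops */
};

static void layout_get(struct layout *lo)
{
        pthread_mutex_lock(&lo->lock);
        lo->refc++;
        pthread_mutex_unlock(&lo->lock);
}

static void layout_put(struct layout *lo)
{
        pthread_mutex_lock(&lo->lock);
        if (--lo->refc == 1)    /* only the owner's reference remains */
                pthread_cond_broadcast(&lo->waitq);
        pthread_mutex_unlock(&lo->lock);
}

/* called before the layout is replaced: wait until refc drains to 1 */
static void layout_wait(struct layout *lo)
{
        pthread_mutex_lock(&lo->lock);
        while (lo->refc > 1)
                pthread_cond_wait(&lo->waitq, &lo->lock);
        pthread_mutex_unlock(&lo->lock);
}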

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: Ie7cd744b188f4d28fdbebda8870259b931328d00
Reviewed-on: http://review.whamcloud.com/4416
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
17 files changed:
lustre/include/cl_object.h
lustre/lclient/glimpse.c
lustre/lclient/lcommon_cl.c
lustre/ldlm/ldlm_lock.c
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/statahead.c
lustre/llite/vvp_io.c
lustre/llite/vvp_object.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/lov/lov_object.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_lock.c

index 78f0b49..b4f7917 100644 (file)
@@ -277,11 +277,6 @@ struct cl_object_conf {
          */
         struct inode             *coc_inode;
        /**
-        * Validate object conf. If object is using an invalid conf,
-        * then invalidate it and set the new layout.
-        */
-       bool                      coc_validate_only;
-       /**
         * Invalidate the current stripe configuration due to losing
         * layout lock.
         */
@@ -2360,18 +2355,19 @@ struct cl_io {
         */
                             ci_need_restart:1,
        /**
-        * Ignore layout change.
-        * Most of the CIT_MISC operations can ignore layout change, because
-        * the purpose to create this kind of cl_io is to give an environment
-        * to run clio methods, for example:
-        *   1. request group lock;
-        *   2. flush caching pages by osc;
-        *   3. writepage
-        *   4. echo client
-        * So far, only direct IO and glimpse clio need restart if layout
-        * change during IO time.
+        * to not refresh layout - the IO issuer knows that the layout won't
+        * change (page operations; a layout change causes all pages to be
+        * discarded), or it doesn't matter if it changes (sync).
+        */
+                            ci_ignore_layout:1,
+       /**
+        * Check if the layout changed after the IO finishes. Mainly for HSM
+        * requirement. IO to an open file doesn't need to verify the layout
+        * because HSM won't release open files.
+        * Right now, only two operations need to verify the layout: glimpse
+        * and setattr.
         */
-                            ci_ignore_layout:1;
+                            ci_verify_layout:1;
         /**
          * Number of pages owned by this IO. For invariant checking.
          */
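
Together these flags replace the old blanket "restart unless ignored"
behaviour: an issuer either opts out of layout tracking entirely
(ci_ignore_layout), or asks for a post-IO check (ci_verify_layout) and
restarts the IO when the layout generation has moved, which is what the
glimpse and setattr paths below do. A condensed user-space sketch of that
restart loop follows; all names here are hypothetical, while the real check
lives in vvp_io_fini() and the retry in cl_glimpse_size0() in this patch.

#include <stdbool.h>

struct io {
        bool         ignore_layout;  /* layout can't matter for this IO   */
        bool         verify_layout;  /* re-check the layout after the IO  */
        bool         need_restart;   /* set when the generation changed   */
        unsigned int layout_gen;     /* generation captured at init time  */
};

unsigned int current_layout_gen(void);   /* think ll_layout_refresh() */
void run_io(struct io *io);

/* glimpse-style caller: retry while the layout changes under the IO */
static void io_with_layout_verify(struct io *io)
{
        do {
                io->verify_layout = true;
                io->layout_gen    = current_layout_gen();
                io->need_restart  = false;

                run_io(io);

                /* the check vvp_io_fini() performs in this patch */
                if (!io->ignore_layout && io->verify_layout)
                        io->need_restart =
                                (io->layout_gen != current_layout_gen());
        } while (io->need_restart);
}
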
index d03b855..58aad5e 100644 (file)
@@ -173,6 +173,7 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
                         cl_lock_release(env, lock, "glimpse", cfs_current());
                 } else {
                         CDEBUG(D_DLMTRACE, "No objects for inode\n");
+                       cl_merge_lvb(inode);
                 }
         }
 
@@ -225,6 +226,7 @@ int cl_glimpse_size0(struct inode *inode, int agl)
         result = cl_io_get(inode, &env, &io, &refcheck);
         if (result > 0) {
        again:
+               io->ci_verify_layout = 1;
                 result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
                 if (result > 0)
                         /*
index 22e3526..9f97e75 100644 (file)
@@ -1334,6 +1334,14 @@ __u32 cl_fid_build_gen(const struct lu_fid *fid)
         RETURN(gen);
 }
 
+/* The lsm is unreliable after the HSM implementation, as the layout can be
+ * changed at any time. This is only to support old, non-clio-ized interfaces.
+ * Calling clio operations with this extra layout refcount can deadlock:
+ * if the layout changes during the IO, ll_layout_refresh() will have to
+ * wait for the refcount to drop to zero before destroying the old layout.
+ *
+ * Note that the lsm returned by this function may not be valid unless it is
+ * called inside the layout lock - MDS_INODELOCK_LAYOUT. */
 struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
 {
        return lov_lsm_get(cl_i2info(inode)->lli_clob);
index f575bda..8c78ae6 100644 (file)
@@ -1249,7 +1249,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                         l_wait_event(lock->l_waitq,
                                      lock->l_flags & LDLM_FL_LVB_READY ||
-                                     lock->l_failed,
+                                    lock->l_destroyed || lock->l_failed,
                                      &lwi);
                         if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                 if (flags & LDLM_FL_TEST_LOCK)
index 1edafba..f94d0de 100644 (file)
@@ -262,7 +262,20 @@ void ll_intent_drop_lock(struct lookup_intent *it)
         struct lustre_handle *handle;
 
         if (it->it_op && it->d.lustre.it_lock_mode) {
-                handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
+               struct ldlm_lock *lock;
+
+               handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
+               lock = ldlm_handle2lock(handle);
+               if (lock != NULL) {
+                       /* the lock can only be allowed to match after the
+                        * layout is applied to the inode; otherwise a wrong
+                        * layout could be seen. Applying the layout should
+                        * happen before dropping the intent lock. */
+                       if (it->d.lustre.it_lock_bits & MDS_INODELOCK_LAYOUT)
+                               ldlm_lock_allow_match(lock);
+                       LDLM_LOCK_PUT(lock);
+               }
+
                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
                        " from it %p\n", handle->cookie, it);
                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
@@ -543,6 +556,9 @@ out:
         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
                 ptlrpc_req_finished(req);
         if (rc == 0) {
+               /* the MDT may grant a layout lock for the newly created file,
+                * so release the lock to avoid leaking it */
+               ll_intent_drop_lock(it);
                ll_invalidate_aliases(de->d_inode);
        } else {
                __u64 bits = 0;
index 34c08cc..21011ae 100644 (file)
@@ -779,11 +779,10 @@ int ll_merge_lvb(struct inode *inode)
                CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
                                PFID(&lli->lli_fid), lvb.lvb_size);
                inode->i_blocks = lvb.lvb_blocks;
-
-               LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
-               LTIME_S(inode->i_atime) = lvb.lvb_atime;
-               LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
+       LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
+       LTIME_S(inode->i_atime) = lvb.lvb_atime;
+       LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        ll_inode_size_unlock(inode);
        ccc_inode_lsm_put(inode, lsm);
 
@@ -2071,9 +2070,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ptlrpc_request *req;
         struct obd_capa *oc;
-       struct lov_stripe_md *lsm;
         int rc, err;
         ENTRY;
+
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
@@ -2108,8 +2107,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
         if (!err)
                 ptlrpc_req_finished(req);
 
-       lsm = ccc_inode_lsm_get(inode);
-       if (data && lsm) {
+       if (data) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
 
                err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
@@ -2121,7 +2119,6 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                else
                        fd->fd_write_failed = false;
        }
-       ccc_inode_lsm_put(inode, lsm);
 
 #ifdef HAVE_FILE_FSYNC_4ARGS
        mutex_unlock(&inode->i_mutex);
@@ -2308,19 +2305,18 @@ int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
 }
 
 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
-                            struct lustre_handle *lockh)
+                            struct lustre_handle *lockh, __u64 flags)
 {
         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
         struct lu_fid *fid;
         ldlm_mode_t rc;
-       __u64 flags;
         ENTRY;
 
         fid = &ll_i2info(inode)->lli_fid;
         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
 
-        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
-        rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+        rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
+                          fid, LDLM_IBITS, &policy,
                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
         RETURN(rc);
 }
@@ -2453,21 +2449,17 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
         ENTRY;
 
         rc = __ll_inode_revalidate_it(dentry, it, ibits);
-
-        /* if object not yet allocated, don't validate size */
-       if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
-                LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
-                LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
-                LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
-                RETURN(0);
-        }
-
-        /* ll_glimpse_size will prefer locally cached writes if they extend
-         * the file */
-
-        if (rc == 0)
-                rc = ll_glimpse_size(inode);
-
+       if (rc != 0)
+               RETURN(rc);
+
+       /* if the object isn't a regular file, don't validate size */
+       if (!S_ISREG(inode->i_mode)) {
+               LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
+               LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
+               LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
+       } else {
+               rc = ll_glimpse_size(inode);
+       }
         RETURN(rc);
 }
 
@@ -2871,17 +2863,19 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
        struct md_op_data     *op_data = NULL;
-       struct ptlrpc_request *req = NULL;
        struct lookup_intent   it = { .it_op = IT_LAYOUT };
-       struct lustre_handle   lockh;
+       struct lustre_handle   lockh = { 0 };
        ldlm_mode_t            mode;
-       struct cl_object_conf  conf = {  .coc_inode = inode,
-                                        .coc_validate_only = true };
+       struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
+                                          .ei_mode = LCK_CR,
+                                          .ei_cb_bl = ll_md_blocking_ast,
+                                          .ei_cb_cp = ldlm_completion_ast,
+                                          .ei_cbdata = inode };
        int rc;
        ENTRY;
 
        *gen = 0;
-       if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK))
+       if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
        /* sanity checks */
@@ -2890,16 +2884,14 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 
        /* mostly layout lock is caching on the local side, so try to match
         * it before grabbing layout lock mutex. */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh);
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
+                               LDLM_FL_LVB_READY);
        if (mode != 0) { /* hit cached lock */
-               struct lov_stripe_md *lsm;
+               /* lsm_layout_gen starts from 0, so add 1 here to distinguish
+                * the no-layout case from the first layout. */
+               *gen = lli->lli_layout_gen + 1;
 
-               lsm = ccc_inode_lsm_get(inode);
-               if (lsm != NULL)
-                       *gen = lsm->lsm_layout_gen;
-               ccc_inode_lsm_put(inode, lsm);
                ldlm_lock_decref(&lockh, mode);
-
                RETURN(0);
        }
 
@@ -2911,60 +2903,71 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        /* take layout lock mutex to enqueue layout lock exclusively. */
        cfs_mutex_lock(&lli->lli_layout_mutex);
 
-       /* make sure the old conf goes away */
-       ll_layout_conf(inode, &conf);
+       /* try again inside layout mutex */
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
+                               LDLM_FL_LVB_READY);
+       if (mode != 0) { /* hit cached lock */
+               *gen = lli->lli_layout_gen + 1;
+
+               ldlm_lock_decref(&lockh, mode);
+               cfs_mutex_unlock(&lli->lli_layout_mutex);
+               ll_finish_md_op_data(op_data);
+               RETURN(0);
+       }
+
+       /* have to enqueue one */
+       rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
+                       NULL, 0, NULL, 0);
+       if (it.d.lustre.it_data != NULL)
+               ptlrpc_req_finished(it.d.lustre.it_data);
+       it.d.lustre.it_data = NULL;
 
-       /* enqueue layout lock */
-       rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0,
-                       &req, ll_md_blocking_ast, 0);
        if (rc == 0) {
-               /* we get a new lock, so update the lock data */
-               lockh.cookie = it.d.lustre.it_lock_handle;
-               md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL);
-
-               /* req == NULL is when lock was found in client cache, without
-                * any request to server (but lsm can be canceled just after a
-                * release) */
-               if (req != NULL) {
-                       struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
-                       struct lustre_md md = { NULL };
-                       void *lmm;
-                       int lmmsize;
-
-                       /* for IT_LAYOUT lock, lmm is returned in lock's lvb
-                        * data via completion callback */
-                       LASSERT(lock != NULL);
-                       lmm = lock->l_lvb_data;
-                       lmmsize = lock->l_lvb_len;
-                       if (lmm != NULL)
-                               rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
-                                               lmm, lmmsize);
-                       if (rc == 0) {
+               struct ldlm_lock *lock;
+               struct cl_object_conf conf;
+               struct lustre_md md = { NULL };
+               void *lmm;
+               int lmmsize;
+
+               LASSERT(lustre_handle_is_used(&lockh));
+
+               /* set lock data in case this is a new lock */
+               ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
+
+               lock = ldlm_handle2lock(&lockh);
+               LASSERT(lock != NULL);
+
+               /* for IT_LAYOUT lock, lmm is returned in lock's lvb
+                * data via completion callback */
+               lmm = lock->l_lvb_data;
+               lmmsize = lock->l_lvb_len;
+               if (lmm != NULL) {
+                       rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+                                       lmm, lmmsize);
+                       if (rc >= 0) {
                                if (md.lsm != NULL)
-                                       *gen = md.lsm->lsm_layout_gen;
-
-                               memset(&conf, 0, sizeof conf);
-                               conf.coc_inode = inode;
-                               conf.u.coc_md = &md;
-                               ll_layout_conf(inode, &conf);
-                               /* is this racy? */
-                               lli->lli_has_smd = md.lsm != NULL;
+                                       *gen = md.lsm->lsm_layout_gen + 1;
+                               rc = 0;
+                       } else {
+                               CERROR("file: "DFID" unpackmd error: %d\n",
+                                       PFID(&lli->lli_fid), rc);
                        }
-                       if (md.lsm != NULL)
-                               obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
-
-                       LDLM_LOCK_PUT(lock);
-                       ptlrpc_req_finished(req);
-               } else { /* hit caching lock */
-                       struct lov_stripe_md *lsm;
-
-                       lsm = ccc_inode_lsm_get(inode);
-                       if (lsm != NULL)
-                               *gen = lsm->lsm_layout_gen;
-                       ccc_inode_lsm_put(inode, lsm);
                }
-               ll_intent_drop_lock(&it);
+               LDLM_LOCK_PUT(lock);
+
+               /* set layout to file. This may cause lock expiration as we
+                * set layout inside layout ibits lock. */
+               memset(&conf, 0, sizeof conf);
+               conf.coc_inode = inode;
+               conf.u.coc_md = &md;
+               ll_layout_conf(inode, &conf);
+               /* is this racy? */
+               lli->lli_has_smd = md.lsm != NULL;
+               if (md.lsm != NULL)
+                       obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
        }
+       ll_intent_drop_lock(&it);
+
        cfs_mutex_unlock(&lli->lli_layout_mutex);
        ll_finish_md_op_data(op_data);
 
index 8b3ab1b..b4eb309 100644 (file)
@@ -165,8 +165,7 @@ struct ll_inode_info {
         __u64                           lli_open_fd_read_count;
         __u64                           lli_open_fd_write_count;
         __u64                           lli_open_fd_exec_count;
-        /* Protects access to och pointers and their usage counters, also
-        * atomicity of check-update of lli_has_smd */
+        /* Protects access to och pointers and their usage counters */
         cfs_mutex_t                     lli_och_mutex;
 
         struct inode                    lli_vfs_inode;
@@ -270,6 +269,8 @@ struct ll_inode_info {
 
        /* mutex to request for layout lock exclusively. */
        cfs_mutex_t                     lli_layout_mutex;
+       /* valid only inside LAYOUT ibits lock, protected by lli_layout_mutex */
+       __u32                           lli_layout_gen;
 };
 
 /*
@@ -704,7 +705,7 @@ extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *,
 extern int ll_have_md_lock(struct inode *inode, __u64 *bits,
                            ldlm_mode_t l_req_mode);
 extern ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
-                                   struct lustre_handle *lockh);
+                                   struct lustre_handle *lockh, __u64 flags);
 int __ll_inode_revalidate_it(struct dentry *, struct lookup_intent *,
                              __u64 bits);
 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd);
index 5ee8fb1..2d8665b 100644 (file)
@@ -1479,6 +1479,8 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
         * resides on the MDS, ie, this file has no objects. */
        if (lsm != NULL)
                attr->ia_valid &= ~ATTR_SIZE;
+       /* can't call ll_setattr_ost() while holding a refcount of lsm */
+       ccc_inode_lsm_put(inode, lsm);
 
         memcpy(&op_data->op_attr, attr, sizeof(*attr));
 
@@ -1492,10 +1494,8 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
                 GOTO(out, rc);
 
         ll_ioepoch_open(lli, op_data->op_ioepoch);
-       if (lsm == NULL || !S_ISREG(inode->i_mode)) {
-                CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
+       if (!S_ISREG(inode->i_mode))
                 GOTO(out, rc = 0);
-        }
 
         if (ia_valid & ATTR_SIZE)
                 attr->ia_valid |= ATTR_SIZE;
@@ -1511,7 +1511,6 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
                 rc = ll_setattr_ost(inode, attr);
         EXIT;
 out:
-       ccc_inode_lsm_put(inode, lsm);
         if (op_data) {
                 if (op_data->op_ioepoch) {
                         rc1 = ll_setattr_done_writing(inode, op_data, mod);
@@ -1678,7 +1677,6 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
        LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
        if (lsm != NULL) {
                LASSERT(S_ISREG(inode->i_mode));
-               cfs_mutex_lock(&lli->lli_och_mutex);
                CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
                                lsm, inode->i_ino, inode->i_generation, inode);
                /* cl_file_inode_init must go before lli_has_smd or a race
@@ -1687,7 +1685,6 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                 * glimpse would try to use uninitialized lov */
                if (cl_file_inode_init(inode, md) == 0)
                        lli->lli_has_smd = true;
-               cfs_mutex_unlock(&lli->lli_och_mutex);
 
                lli->lli_maxbytes = lsm->lsm_maxbytes;
                if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
@@ -1777,7 +1774,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                          * lock on the client and set LLIF_MDS_SIZE_LOCK holding
                          * it. */
                         mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE,
-                                               &lockh);
+                                               &lockh, LDLM_FL_CBPENDING);
                         if (mode) {
                                 if (lli->lli_flags & (LLIF_DONE_WRITING |
                                                       LLIF_EPOCH_PENDING |
index 5cf4b9c..3ac64c9 100644 (file)
@@ -138,10 +138,7 @@ ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
 static inline int agl_should_run(struct ll_statahead_info *sai,
                                  struct inode *inode)
 {
-       if (inode != NULL && S_ISREG(inode->i_mode) &&
-           ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid)
-               return 1;
-       return 0;
+       return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
 }
 
 static inline struct ll_sa_entry *
index 413fd94..f6f2271 100644 (file)
@@ -88,14 +88,22 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        struct cl_io     *io  = ios->cis_io;
        struct cl_object *obj = io->ci_obj;
        struct ccc_io    *cio = cl2ccc_io(env, ios);
-       __u32 gen;
 
         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
 
-       /* check layout version */
-       ll_layout_refresh(ccc_object_inode(obj), &gen);
-       if (cio->cui_layout_gen > 0)
-               io->ci_need_restart = cio->cui_layout_gen == gen;
+       CDEBUG(D_VFSTRACE, "ignore/verify layout %d/%d, layout version %d.\n",
+               io->ci_ignore_layout, io->ci_verify_layout, cio->cui_layout_gen);
+
+       if (!io->ci_ignore_layout && io->ci_verify_layout) {
+               __u32 gen = 0;
+
+               /* check layout version */
+               ll_layout_refresh(ccc_object_inode(obj), &gen);
+               io->ci_need_restart = cio->cui_layout_gen != gen;
+               if (io->ci_need_restart)
+                       CDEBUG(D_VFSTRACE, "layout changed from %d to %d.\n",
+                               cio->cui_layout_gen, gen);
+       }
 }
 
 static void vvp_io_fault_fini(const struct lu_env *env,
index 85a8dd6..48132bd 100644 (file)
@@ -121,13 +121,24 @@ static int vvp_attr_set(const struct lu_env *env, struct cl_object *obj,
         return 0;
 }
 
+int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
+               const struct cl_object_conf *conf)
+{
+       struct ll_inode_info *lli = ll_i2info(conf->coc_inode);
+
+       if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL)
+               lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen;
+
+       return 0;
+}
+
 static const struct cl_object_operations vvp_ops = {
         .coo_page_init = vvp_page_init,
         .coo_lock_init = vvp_lock_init,
         .coo_io_init   = vvp_io_init,
         .coo_attr_get  = vvp_attr_get,
         .coo_attr_set  = vvp_attr_set,
-        .coo_conf_set  = ccc_conf_set,
+        .coo_conf_set  = vvp_conf_set,
         .coo_glimpse   = ccc_object_glimpse
 };
 
index 627abb2..b74df27 100644 (file)
@@ -579,6 +579,8 @@ int   lovsub_lock_init    (const struct lu_env *env, struct cl_object *obj,
 
 int   lov_lock_init_raid0 (const struct lu_env *env, struct cl_object *obj,
                            struct cl_lock *lock, const struct cl_io *io);
+int   lov_lock_init_empty (const struct lu_env *env, struct cl_object *obj,
+                           struct cl_lock *lock, const struct cl_io *io);
 int   lov_io_init_raid0   (const struct lu_env *env, struct cl_object *obj,
                            struct cl_io *io);
 int   lov_io_init_empty   (const struct lu_env *env, struct cl_object *obj,
index e6578ed..e8a5c54 100644 (file)
@@ -308,7 +308,8 @@ static void lov_io_slice_init(struct lov_io *lio,
        io->ci_result = 0;
        lio->lis_object = obj;
 
-       LASSERT(lio->lis_lsm != NULL);
+       LASSERT(obj->lo_lsm != NULL);
+       lio->lis_lsm = lsm_addref(obj->lo_lsm);
         lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count;
 
         switch (io->ci_type) {
@@ -932,13 +933,15 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
        switch (io->ci_type) {
        default:
                LBUG();
+       case CIT_MISC:
+       case CIT_READ:
+               result = 0;
+               break;
        case CIT_FSYNC:
-        case CIT_MISC:
-        case CIT_READ:
-                result = 0;
-                break;
+       case CIT_SETATTR:
+               result = +1;
+               break;
         case CIT_WRITE:
-        case CIT_SETATTR:
                 result = -EBADF;
                 break;
         case CIT_FAULT:
@@ -949,8 +952,8 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
         }
         if (result == 0)
                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
-        io->ci_result = result;
-        RETURN(result != 0);
+       io->ci_result = result < 0 ? result : 0;
+       RETURN(result != 0);
 }
 
 /** @} lov */
index 6d66adb..2424581 100644 (file)
@@ -1186,6 +1186,41 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
         RETURN(result);
 }
 
+static void lov_empty_lock_fini(const struct lu_env *env,
+                               struct cl_lock_slice *slice)
+{
+       struct lov_lock *lck = cl2lov_lock(slice);
+       OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
+}
+
+static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
+                       lu_printer_t p, const struct cl_lock_slice *slice)
+{
+       (*p)(env, cookie, "empty\n");
+       return 0;
+}
+
+static const struct cl_lock_operations lov_empty_lock_ops = {
+       .clo_fini  = lov_empty_lock_fini,
+       .clo_print = lov_empty_lock_print
+};
+
+int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
+               struct cl_lock *lock, const struct cl_io *io)
+{
+       struct lov_lock *lck;
+       int result = -ENOMEM;
+
+       ENTRY;
+       OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
+       if (lck != NULL) {
+               cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
+               lck->lls_orig = lock->cll_descr;
+               result = 0;
+       }
+       RETURN(result);
+}
+
 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                                struct cl_lock *parent)
 {
index bef4eef..99d543a 100644 (file)
@@ -233,6 +233,7 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state)
 {
        LASSERT(lov->lo_type == LLT_EMPTY);
+       cl_object_prune(env, &lov->lo_cl);
        return 0;
 }
 
@@ -300,12 +301,14 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                 for (i = 0; i < r0->lo_nr; ++i) {
                         struct lovsub_object *los = r0->lo_sub[i];
 
-                        if (los != NULL)
+                        if (los != NULL) {
+                               cl_locks_prune(env, &los->lso_cl, 1);
                                 /*
                                  * If top-level object is to be evicted from
                                  * the cache, so are its sub-objects.
                                  */
                                 lov_subobject_kill(env, lov, los, i);
+                       }
                 }
         }
        RETURN(0);
@@ -388,8 +391,14 @@ static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
         ENTRY;
 
        /* this is called w/o holding type guard mutex, so it must be inside
-        * an on going IO otherwise lsm may be replaced. */
-       LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1);
+        * an ongoing IO, otherwise the lsm may be replaced.
+        * LU-2117: it turns out there is one exception. For mmapped files,
+        * a lock on such a file may be requested in another file's IO
+        * context, and this function is then called from ccc_lock_state(),
+        * which would hit this assertion.
+        * Anyway, it's still okay to call attr_get w/o the type guard, as the
+        * layout can't go away while locks exist. */
+       /* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */
 
         if (!r0->lo_attr_valid) {
                 /*
@@ -433,7 +442,7 @@ const static struct lov_layout_operations lov_dispatch[] = {
                 .llo_install   = lov_install_empty,
                 .llo_print     = lov_print_empty,
                 .llo_page_init = lov_page_init_empty,
-                .llo_lock_init = NULL,
+                .llo_lock_init = lov_lock_init_empty,
                 .llo_io_init   = lov_io_init_empty,
                 .llo_getattr   = lov_attr_get_empty
         },
@@ -508,6 +517,20 @@ do {                                                                    \
        lov_conf_thaw(__obj);                                           \
 } while (0)
 
+static void lov_conf_lock(struct lov_object *lov)
+{
+       LASSERT(lov->lo_owner != cfs_current());
+       cfs_down_write(&lov->lo_type_guard);
+       LASSERT(lov->lo_owner == NULL);
+       lov->lo_owner = cfs_current();
+}
+
+static void lov_conf_unlock(struct lov_object *lov)
+{
+       lov->lo_owner = NULL;
+       cfs_up_write(&lov->lo_type_guard);
+}
+
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 {
        struct l_wait_info lwi = { 0 };
@@ -517,11 +540,17 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
        if (!lov->lo_lsm_invalid || lsm == NULL)
                RETURN(0);
 
-       l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+       LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
+       while (cfs_atomic_read(&lsm->lsm_refc) > 1) {
+               lov_conf_unlock(lov);
+               l_wait_event(lov->lo_waitq,
+                            cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+               lov_conf_lock(lov);
+       }
        RETURN(0);
 }
 
-static int lov_layout_change(const struct lu_env *env,
+static int lov_layout_change(const struct lu_env *unused,
                              struct lov_object *lov, enum lov_layout_type llt,
                              const struct cl_object_conf *conf)
 {
@@ -532,7 +561,7 @@ static int lov_layout_change(const struct lu_env *env,
 
        struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
        void *cookie;
-       struct lu_env *nested;
+       struct lu_env *env;
        int refcheck;
 
        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
@@ -540,13 +569,11 @@ static int lov_layout_change(const struct lu_env *env,
        ENTRY;
 
        cookie = cl_env_reenter();
-       nested = cl_env_get(&refcheck);
-       if (!IS_ERR(nested))
-               cl_object_prune(nested, &lov->lo_cl);
-       else
-               result = PTR_ERR(nested);
-       cl_env_put(nested, &refcheck);
-       cl_env_reexit(cookie);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env)) {
+               cl_env_reexit(cookie);
+               RETURN(PTR_ERR(env));
+       }
 
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
@@ -571,6 +598,9 @@ static int lov_layout_change(const struct lu_env *env,
                        /* this file becomes an EMPTY file. */
                }
        }
+
+       cl_env_put(env, &refcheck);
+       cl_env_reexit(cookie);
        RETURN(result);
 }
 
@@ -606,32 +636,33 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
 {
-       struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+       struct lov_stripe_md *lsm = NULL;
        struct lov_object *lov = cl2lov(obj);
        int result = 0;
        ENTRY;
 
-       /*
-        * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
-        */
-       LASSERT(lov->lo_owner != cfs_current());
-       cfs_down_write(&lov->lo_type_guard);
-       LASSERT(lov->lo_owner == NULL);
-       lov->lo_owner = cfs_current();
-
+       lov_conf_lock(lov);
        if (conf->coc_invalidate) {
                lov->lo_lsm_invalid = 1;
                GOTO(out, result = 0);
        }
 
-       if (conf->coc_validate_only) {
-               if (!lov->lo_lsm_invalid)
-                       GOTO(out, result = 0);
+       if (conf->u.coc_md != NULL)
+               lsm = conf->u.coc_md->lsm;
 
-               lov_layout_wait(env, lov);
-               /* fall through to set up new layout */
+       if ((lsm == NULL && lov->lo_lsm == NULL) ||
+           (lsm != NULL && lov->lo_lsm != NULL &&
+            lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+               lov->lo_lsm_invalid = 0;
+               GOTO(out, result = 0);
        }
 
+       /* will change layout */
+       lov_layout_wait(env, lov);
+
+       /*
+        * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
+        */
        switch (lov->lo_type) {
        case LLT_EMPTY:
                if (lsm != NULL)
@@ -650,8 +681,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        EXIT;
 
 out:
-       lov->lo_owner = NULL;
-       cfs_up_write(&lov->lo_type_guard);
+       lov_conf_unlock(lov);
        RETURN(result);
 }
 
@@ -684,8 +714,8 @@ static int lov_object_print(const struct lu_env *env, void *cookie,
 struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
                               struct cl_page *page, cfs_page_t *vmpage)
 {
-        return LOV_2DISPATCH(cl2lov(obj),
-                             llo_page_init, env, obj, page, vmpage);
+        return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
+                                   llo_page_init, env, obj, page, vmpage);
 }
 
 /**
@@ -695,15 +725,9 @@ struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
                struct cl_io *io)
 {
-       struct lov_io *lio = lov_env_io(env);
-
        CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
-
-       /* hold lsm before initializing because io relies on it */
-       lio->lis_lsm = lov_lsm_addref(cl2lov(obj));
-
-       /* No need to lock because we've taken one refcount of layout.  */
-       return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_io_init, env, obj, io);
+       return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
+                                    !io->ci_ignore_layout, env, obj, io);
 }
 
 /**
@@ -784,10 +808,11 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
        struct lov_stripe_md *lsm = NULL;
 
        lov_conf_freeze(lov);
-       if (!lov->lo_lsm_invalid && lov->lo_lsm != NULL) {
+       if (lov->lo_lsm != NULL) {
                lsm = lsm_addref(lov->lo_lsm);
-               CDEBUG(D_INODE, "lsm %p addref %d by %p.\n",
-                       lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
+               CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
+                       lsm, cfs_atomic_read(&lsm->lsm_refc),
+                       lov->lo_lsm_invalid, cfs_current());
        }
        lov_conf_thaw(lov);
        return lsm;
index 6619073..5bae736 100644 (file)
@@ -112,7 +112,7 @@ void cl_io_fini(const struct lu_env *env, struct cl_io *io)
         ENTRY;
 
         while (!cfs_list_empty(&io->ci_layers)) {
-                slice = container_of(io->ci_layers.next, struct cl_io_slice,
+                slice = container_of(io->ci_layers.prev, struct cl_io_slice,
                                      cis_linkage);
                 cfs_list_del_init(&slice->cis_linkage);
                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
@@ -137,10 +137,11 @@ void cl_io_fini(const struct lu_env *env, struct cl_io *io)
        case CIT_FSYNC:
                LASSERT(!io->ci_need_restart);
                break;
+       case CIT_SETATTR:
        case CIT_MISC:
                /* Check ignore layout change conf */
-               LASSERT(ergo(io->ci_ignore_layout, !io->ci_need_restart));
-       case CIT_SETATTR:
+               LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
+                               !io->ci_need_restart));
                break;
        default:
                LBUG();
index c3ad51e..72f8a1a 100644 (file)
@@ -828,8 +828,8 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
 
                 cfs_spin_lock(&head->coh_lock_guard);
                 cfs_list_del_init(&lock->cll_linkage);
-
                 cfs_spin_unlock(&head->coh_lock_guard);
+
                 /*
                  * From now on, no new references to this lock can be acquired
                  * by cl_lock_lookup().