Whamcloud - gitweb
LU-169 lov: add lsm refcounting
authorJinshan Xiong <jinshan.xiong@whamcloud.com>
Tue, 19 Jun 2012 13:09:48 +0000 (21:09 +0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 3 Jul 2012 08:42:03 +0000 (04:42 -0400)
This patch adds a reference counter to the lsm structure.
Each time a lsm is used, a reference must be taken with lsm_get/addref
(lsm_addref() can be used when we already own a reference on the lsm).
This reference can be released via obd_free_memmd(). The lsm is freed
when the last reference is dropped.

This patch also moves lov_stripe_md into a private data of
lov_object.

Signed-off-by: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I156b4cc2dc82bb15ae8107cf9842f100048c03d4
Reviewed-on: http://review.whamcloud.com/1874
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
31 files changed:
libcfs/include/libcfs/posix/posix-types.h
lustre/include/lclient.h
lustre/include/obd.h
lustre/lclient/glimpse.c
lustre/lclient/lcommon_cl.c
lustre/liblustre/file.c
lustre/liblustre/llite_lib.h
lustre/liblustre/namei.c
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/llite_close.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/rw.c
lustre/llite/rw26.c
lustre/llite/statahead.c
lustre/llite/symlink.c
lustre/llite/vvp_io.c
lustre/llite/xattr.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_merge.c
lustre/lov/lov_obd.c
lustre/lov/lov_object.c
lustre/lov/lov_pack.c
lustre/obdclass/debug.c
lustre/obdecho/echo_client.c

index 45954e6..aa62e71 100644 (file)
@@ -41,6 +41,7 @@
 #define _LUSTRE_POSIX_TYPES_H
 
 #include <asm/types.h>
+#include <stdbool.h> /* for bool */
 #ifndef HAVE_UMODE_T
 typedef unsigned short cfs_umode_t;
 #else
index 4072e4b..c79ddfc 100644 (file)
@@ -404,4 +404,18 @@ int  cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
                       struct ccc_grouplock *cg);
 void cl_put_grouplock(struct ccc_grouplock *cg);
 
+/**
+ * New interfaces to get and put lov_stripe_md from lov layer. This violates
+ * layering because lov_stripe_md is supposed to be a private data in lov.
+ *
+ * NB: If you find you have to use these interfaces for your new code, please
+ * think about it again. These interfaces may be removed in the future for
+ * better layering. */
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj);
+void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm);
+int lov_read_and_clear_async_rc(struct cl_object *clob);
+
+struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode);
+void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
+
 #endif /*LCLIENT_H */
index 054e025..2780fcf 100644 (file)
@@ -102,6 +102,7 @@ static inline void loi_init(struct lov_oinfo *loi)
 }
 
 struct lov_stripe_md {
+       cfs_atomic_t     lsm_refc;
         cfs_spinlock_t   lsm_lock;
         pid_t            lsm_lock_owner; /* debugging */
 
index 4783b68..31acda0 100644 (file)
@@ -115,7 +115,7 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
         result = 0;
         if (!(lli->lli_flags & LLIF_MDS_SIZE_LOCK)) {
                 CDEBUG(D_DLMTRACE, "Glimpsing inode "DFID"\n", PFID(fid));
-                if (lli->lli_smd) {
+               if (lli->lli_has_smd) {
                         /* NOTE: this looks like DLM lock request, but it may
                          *       not be one. Due to CEF_ASYNC flag (translated
                          *       to LDLM_FL_HAS_INTENT by osc), this is
@@ -253,7 +253,7 @@ int cl_local_size(struct inode *inode)
 
         ENTRY;
 
-        if (!cl_i2info(inode)->lli_smd)
+       if (!cl_i2info(inode)->lli_has_smd)
                 RETURN(0);
 
         result = cl_io_get(inode, &env, &io, &refcheck);
index 05c348d..3071da5 100644 (file)
@@ -457,6 +457,22 @@ int ccc_conf_set(const struct lu_env *env, struct cl_object *obj,
         return 0;
 }
 
+static void ccc_object_size_lock(struct cl_object *obj)
+{
+       struct inode *inode = ccc_object_inode(obj);
+
+       cl_isize_lock(inode);
+       cl_object_attr_lock(obj);
+}
+
+static void ccc_object_size_unlock(struct cl_object *obj)
+{
+       struct inode *inode = ccc_object_inode(obj);
+
+       cl_object_attr_unlock(obj);
+       cl_isize_unlock(inode);
+}
+
 /*****************************************************************************
  *
  * Page operations.
@@ -684,8 +700,7 @@ void ccc_lock_state(const struct lu_env *env,
                  * cancel the result of the truncate.  Getting the
                  * ll_inode_size_lock() after the enqueue maintains the DLM
                  * -> ll_inode_size_lock() acquiring order. */
-                cl_isize_lock(inode, 0);
-                cl_object_attr_lock(obj);
+               ccc_object_size_lock(obj);
                 rc = cl_object_attr_get(env, obj, attr);
                 if (rc == 0) {
                         if (lock->cll_descr.cld_start == 0 &&
@@ -702,10 +717,9 @@ void ccc_lock_state(const struct lu_env *env,
                 } else {
                         CL_LOCK_DEBUG(D_INFO, env, lock, "attr_get: %d\n", rc);
                 }
-                cl_object_attr_unlock(obj);
-                cl_isize_unlock(inode, 0);
-        }
-        EXIT;
+               ccc_object_size_unlock(obj);
+       }
+       EXIT;
 }
 
 /*****************************************************************************
@@ -825,22 +839,6 @@ void ccc_io_advance(const struct lu_env *env,
         }
 }
 
-static void ccc_object_size_lock(struct cl_object *obj)
-{
-        struct inode *inode = ccc_object_inode(obj);
-
-        cl_isize_lock(inode, 0);
-        cl_object_attr_lock(obj);
-}
-
-static void ccc_object_size_unlock(struct cl_object *obj)
-{
-        struct inode *inode = ccc_object_inode(obj);
-
-        cl_object_attr_unlock(obj);
-        cl_isize_unlock(inode, 0);
-}
-
 /**
  * Helper function that if necessary adjusts file size (inode->i_size), when
  * position at the offset \a pos is accessed. File size can be arbitrary stale
@@ -1336,3 +1334,13 @@ __u32 cl_fid_build_gen(const struct lu_fid *fid)
         gen = (fid_flatten(fid) >> 32);
         RETURN(gen);
 }
+
+struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
+{
+       return lov_lsm_get(cl_i2info(inode)->lli_clob);
+}
+
+void inline ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm)
+{
+       lov_lsm_put(cl_i2info(inode)->lli_clob, lsm);
+}
index 85c307a..35bc6d9 100644 (file)
@@ -186,7 +186,6 @@ int llu_iop_open(struct pnode *pnode, int flags, mode_t mode)
         struct intnl_stat *st = llu_i2stat(inode);
         struct ptlrpc_request *request;
         struct lookup_intent *it;
-        struct lov_stripe_md *lsm;
         int rc = 0;
         ENTRY;
 
@@ -214,8 +213,7 @@ int llu_iop_open(struct pnode *pnode, int flags, mode_t mode)
         if (!S_ISREG(st->st_mode))
                 GOTO(out_release, rc = 0);
 
-        lsm = lli->lli_smd;
-        if (lsm)
+       if (lli->lli_has_smd)
                 flags &= ~O_LOV_DELAY_CREATE;
         /*XXX: open_flags are overwritten and the previous ones are lost */
         lli->lli_open_flags = flags & ~(O_CREAT | O_EXCL | O_TRUNC);
@@ -335,10 +333,7 @@ int llu_som_update(struct inode *inode, struct md_op_data *op_data)
                                        old_flags & MF_GETATTR_LOCK);
                 if (rc) {
                         oa.o_valid = 0;
-                        if (rc == -ENOENT)
-                                CDEBUG(D_INODE, "objid "LPX64" is destroyed\n",
-                                       lli->lli_smd->lsm_object_id);
-                        else
+                       if (rc != -ENOENT)
                                 CERROR("inode_getattr failed (%d): unable to "
                                        "send a Size-on-MDS attribute update "
                                        "for inode %llu/%lu\n", rc,
index 9609e27..8eadab8 100644 (file)
@@ -99,7 +99,6 @@ struct llu_inode_info {
         struct llu_sb_info     *lli_sbi;
         struct lu_fid           lli_fid;
 
-        struct lov_stripe_md   *lli_smd;
         char                   *lli_symlink_name;
         __u64                   lli_maxbytes;
         unsigned long           lli_flags;
@@ -118,6 +117,7 @@ struct llu_inode_info {
          * was opened several times without close, we track an
          * open_count here */
         struct ll_file_data    *lli_file_data;
+       bool                    lli_has_smd;
         int                     lli_open_flags;
         int                     lli_open_count;
 
@@ -407,11 +407,11 @@ static inline struct ll_file_data *cl_iattr2fd(struct inode *inode,
         return llu_i2info(inode)->lli_file_data;
 }
 
-static inline void cl_isize_lock(struct inode *inode, int lsmlock)
+static inline void cl_isize_lock(struct inode *inode)
 {
 }
 
-static inline void cl_isize_unlock(struct inode *inode, int lsmlock)
+static inline void cl_isize_unlock(struct inode *inode)
 {
 }
 
index 9e06c82..004461a 100644 (file)
@@ -346,8 +346,7 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
                         if (md.lsm != NULL)
                                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
                         RETURN(inode ? PTR_ERR(inode) : -ENOMEM);
-                } else if (md.lsm != NULL &&
-                           llu_i2info(inode)->lli_smd != md.lsm) {
+               } else if (md.lsm != NULL) {
                         obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
                 }
 
@@ -356,12 +355,9 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
 
                 /* If this is a stat, get the authoritative file size */
                 if (it->it_op == IT_GETATTR && S_ISREG(st->st_mode) &&
-                    lli->lli_smd != NULL) {
-                        struct lov_stripe_md *lsm = lli->lli_smd;
+                   lli->lli_has_smd) {
                         ldlm_error_t rc;
 
-                        LASSERT(lsm->lsm_object_id != 0);
-
                         /* bug 2334: drop MDS lock before acquiring OST lock */
                         ll_intent_drop_lock(it);
 
index 1557089..4991ca9 100644 (file)
@@ -58,11 +58,10 @@ typedef ssize_t llu_file_piov_t(const struct iovec *iovec, int iovlen,
 
 size_t llap_cookie_size;
 
-static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
+static int llu_lock_to_stripe_offset(struct obd_export *exp,
+                                    struct lov_stripe_md *lsm,
+                                    struct ldlm_lock *lock)
 {
-        struct llu_inode_info *lli = llu_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
-        struct obd_export *exp = llu_i2obdexp(inode);
         struct {
                 char name[16];
                 struct ldlm_lock *lock;
@@ -71,8 +70,8 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock
         int rc;
         ENTRY;
 
-        if (lsm->lsm_stripe_count == 1)
-                RETURN(0);
+       if (lsm == NULL || lsm->lsm_stripe_count == 1)
+               RETURN(0);
 
         /* get our offset in the lov */
         rc = obd_get_info(NULL, exp, sizeof(key), &key, &vallen, &stripe, lsm);
@@ -81,7 +80,7 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock
                 LBUG();
         }
         LASSERT(stripe < lsm->lsm_stripe_count);
-        RETURN(stripe);
+       RETURN(stripe);
 }
 
 int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
@@ -121,11 +120,15 @@ int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
                 lli= llu_i2info(inode);
                 if (!lli)
                         goto iput;
-                if (!lli->lli_smd)
-                        goto iput;
-                lsm = lli->lli_smd;
+               if (!lli->lli_has_smd)
+                       goto iput;
 
-                stripe = llu_lock_to_stripe_offset(inode, lock);
+               lsm = ccc_inode_lsm_get(inode);
+               if (lsm == NULL)
+                       goto iput;
+
+                stripe = llu_lock_to_stripe_offset(llu_i2obdexp(inode),
+                                                  lsm, lock);
                 lock_res_and_lock(lock);
                 kms = ldlm_extent_shift_kms(lock,
                                             lsm->lsm_oinfo[stripe]->loi_kms);
@@ -134,6 +137,7 @@ int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
                 loi_kms_set(lsm->lsm_oinfo[stripe], kms);
+               ccc_inode_lsm_put(inode, lsm);
 iput:
                 I_RELE(inode);
                 break;
@@ -147,10 +151,11 @@ iput:
 
 static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 {
-        struct ptlrpc_request *req = reqp;
-        struct inode *inode = llu_inode_from_lock(lock);
-        struct llu_inode_info *lli;
-        struct ost_lvb *lvb;
+       struct ptlrpc_request *req = reqp;
+       struct inode *inode = llu_inode_from_lock(lock);
+       struct llu_inode_info *lli;
+       struct ost_lvb *lvb;
+       struct lov_stripe_md *lsm;
         int rc, stripe = 0;
         ENTRY;
 
@@ -159,12 +164,6 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         lli = llu_i2info(inode);
         if (lli == NULL)
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
-        if (lli->lli_smd == NULL)
-                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
-
-        /* First, find out which stripe index this lock corresponds to. */
-        if (lli->lli_smd->lsm_stripe_count > 1)
-                stripe = llu_lock_to_stripe_offset(inode, lock);
 
         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
@@ -175,8 +174,16 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
                 GOTO(iput, rc);
         }
 
-        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
-        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm == NULL)
+               GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
+
+        /* First, find out which stripe index this lock corresponds to. */
+       stripe = llu_lock_to_stripe_offset(llu_i2obdexp(inode), lsm, lock);
+
+       lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+       lvb->lvb_size = lsm->lsm_oinfo[stripe]->loi_kms;
+       ccc_inode_lsm_put(inode, lsm);
 
         LDLM_DEBUG(lock, "i_size: "LPU64" -> stripe number %u -> kms "LPU64,
                    (__u64)llu_i2stat(inode)->st_size, stripe,lvb->lvb_size);
@@ -194,21 +201,26 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 
 int llu_merge_lvb(struct inode *inode)
 {
-        struct llu_inode_info *lli = llu_i2info(inode);
-        struct llu_sb_info *sbi = llu_i2sbi(inode);
-        struct intnl_stat *st = llu_i2stat(inode);
-        struct ost_lvb lvb;
-        int rc;
-        ENTRY;
-
-        lov_stripe_lock(lli->lli_smd);
+       struct llu_inode_info *lli = llu_i2info(inode);
+       struct llu_sb_info *sbi = llu_i2sbi(inode);
+       struct intnl_stat *st = llu_i2stat(inode);
+       struct ost_lvb lvb;
+       struct lov_stripe_md *lsm;
+       int rc;
+       ENTRY;
+
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm == NULL)
+               RETURN(0);
+
+       lov_stripe_lock(lsm);
         inode_init_lvb(inode, &lvb);
         /* merge timestamps the most resently obtained from mds with
            timestamps obtained from osts */
         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
-        rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
+       rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
         st->st_size = lvb.lvb_size;
         st->st_blocks = lvb.lvb_blocks;
         /* handle st_blocks overflow gracefully */
@@ -217,9 +229,10 @@ int llu_merge_lvb(struct inode *inode)
         st->st_mtime = lvb.lvb_mtime;
         st->st_atime = lvb.lvb_atime;
         st->st_ctime = lvb.lvb_ctime;
-        lov_stripe_unlock(lli->lli_smd);
+       lov_stripe_unlock(lsm);
+       ccc_inode_lsm_put(inode, lsm);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
index e98c239..b89c5e9 100644 (file)
@@ -147,19 +147,15 @@ void llu_update_inode(struct inode *inode, struct lustre_md *md)
                 st->st_mode = (st->st_mode & ~S_IFMT)|(body->mode & S_IFMT);
 
         if (lsm != NULL) {
-                if (lli->lli_smd == NULL) {
-                        cl_file_inode_init(inode, md);
-                        lli->lli_smd = lsm;
-                        lli->lli_maxbytes = lsm->lsm_maxbytes;
-                        if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
-                                lli->lli_maxbytes = MAX_LFS_FILESIZE;
-                } else {
-                        if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
-                                CERROR("lsm mismatch for inode %lld\n",
-                                       (long long)st->st_ino);
-                                LBUG();
-                        }
-                }
+               if (!lli->lli_has_smd) {
+                       cl_file_inode_init(inode, md);
+                       lli->lli_has_smd = true;
+                       lli->lli_maxbytes = lsm->lsm_maxbytes;
+                       if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
+                               lli->lli_maxbytes = MAX_LFS_FILESIZE;
+               }
+               if (md->lsm != NULL)
+                       obd_free_memmd(llu_i2obdexp(inode), &md->lsm);
         }
 
         if (body->valid & OBD_MD_FLATIME) {
@@ -193,7 +189,7 @@ void llu_update_inode(struct inode *inode, struct lustre_md *md)
                 lli->lli_st_flags = body->flags;
         if (body->valid & OBD_MD_FLSIZE) {
                 if ((llu_i2sbi(inode)->ll_lco.lco_flags & OBD_CONNECT_SOM) &&
-                    S_ISREG(st->st_mode) && lli->lli_smd) {
+                   S_ISREG(st->st_mode) && lli->lli_has_smd) {
                         struct lustre_handle lockh;
                         ldlm_mode_t mode;
 
@@ -267,13 +263,13 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid)
 int llu_inode_getattr(struct inode *inode, struct obdo *obdo,
                       __u64 ioepoch, int sync)
 {
-        struct llu_inode_info *lli = llu_i2info(inode);
-        struct ptlrpc_request_set *set;
-        struct lov_stripe_md *lsm = lli->lli_smd;
-        struct obd_info oinfo = { { { 0 } } };
-        int rc;
-        ENTRY;
+       struct ptlrpc_request_set *set;
+       struct lov_stripe_md *lsm = NULL;
+       struct obd_info oinfo = { { { 0 } } };
+       int rc;
+       ENTRY;
 
+       lsm = ccc_inode_lsm_get(inode);
         LASSERT(lsm);
 
         oinfo.oi_md = lsm;
@@ -303,6 +299,7 @@ int llu_inode_getattr(struct inode *inode, struct obdo *obdo,
                         rc = ptlrpc_set_wait(set);
                 ptlrpc_set_destroy(set);
         }
+       ccc_inode_lsm_put(inode, lsm);
         if (rc)
                 RETURN(rc);
 
@@ -312,11 +309,11 @@ int llu_inode_getattr(struct inode *inode, struct obdo *obdo,
 
         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
         CDEBUG(D_INODE, "objid "LPX64" size %llu, blocks %llu, "
-               "blksize %llu\n", lli->lli_smd->lsm_object_id,
-               (long long unsigned)llu_i2stat(inode)->st_size,
-               (long long unsigned)llu_i2stat(inode)->st_blocks,
-               (long long unsigned)llu_i2stat(inode)->st_blksize);
-        RETURN(0);
+              "blksize %llu\n", oinfo.oi_oa->o_id,
+              (long long unsigned)llu_i2stat(inode)->st_size,
+              (long long unsigned)llu_i2stat(inode)->st_blocks,
+              (long long unsigned)llu_i2stat(inode)->st_blksize);
+       RETURN(0);
 }
 
 static struct inode* llu_new_inode(struct filesys *fs,
@@ -346,7 +343,7 @@ static struct inode* llu_new_inode(struct filesys *fs,
 
         /* initialize lli here */
         lli->lli_sbi = llu_fs2sbi(fs);
-        lli->lli_smd = NULL;
+       lli->lli_has_smd = false;
         lli->lli_symlink_name = NULL;
         lli->lli_flags = 0;
         lli->lli_maxbytes = (__u64)(~0UL);
@@ -442,12 +439,12 @@ static int llu_inode_revalidate(struct inode *inode)
 
 
                 llu_update_inode(inode, &md);
-                if (md.lsm != NULL && lli->lli_smd != md.lsm)
-                        obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
-                ptlrpc_req_finished(req);
-        }
+               if (md.lsm != NULL)
+                       obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+               ptlrpc_req_finished(req);
+       }
 
-        if (!lli->lli_smd) {
+       if (!lli->lli_has_smd) {
                 /* object not yet allocated, don't validate size */
                 st->st_atime = lli->lli_lvb.lvb_atime;
                 st->st_mtime = lli->lli_lvb.lvb_mtime;
@@ -509,8 +506,9 @@ static int null_if_equal(struct ldlm_lock *lock, void *data)
 
 void llu_clear_inode(struct inode *inode)
 {
-        struct llu_inode_info *lli = llu_i2info(inode);
-        struct llu_sb_info *sbi = llu_i2sbi(inode);
+       struct llu_inode_info *lli = llu_i2info(inode);
+       struct llu_sb_info *sbi = llu_i2sbi(inode);
+       struct lov_stripe_md *lsm;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu/%lu(%p)\n",
@@ -521,16 +519,13 @@ void llu_clear_inode(struct inode *inode)
         md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
                          null_if_equal, inode);
 
-        if (lli->lli_smd)
-                obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd,
-                                  null_if_equal, inode);
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm != NULL)
+               obd_change_cbdata(sbi->ll_dt_exp, lsm, null_if_equal, inode);
+       ccc_inode_lsm_put(inode, lsm);
 
-        cl_inode_fini(inode);
-
-        if (lli->lli_smd) {
-                obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd);
-                lli->lli_smd = NULL;
-        }
+       cl_inode_fini(inode);
+       lli->lli_has_smd = false;
 
         if (lli->lli_symlink_name) {
                 OBD_FREE(lli->lli_symlink_name,
@@ -563,7 +558,7 @@ static int inode_setattr(struct inode * inode, struct iattr * attr)
          * inode_setattr() is only ever invoked with ATTR_SIZE (by
          * llu_setattr_raw()) when file has no bodies. Check this.
          */
-        LASSERT(ergo(ia_valid & ATTR_SIZE, llu_i2info(inode)->lli_smd == NULL));
+       LASSERT(ergo(ia_valid & ATTR_SIZE, !llu_i2info(inode)->lli_has_smd));
 
         if (ia_valid & ATTR_SIZE)
                 st->st_size = attr->ia_size;
@@ -675,7 +670,7 @@ static int llu_setattr_done_writing(struct inode *inode,
  */
 int llu_setattr_raw(struct inode *inode, struct iattr *attr)
 {
-        struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd;
+       int has_lsm = llu_i2info(inode)->lli_has_smd;
         struct intnl_stat *st = llu_i2stat(inode);
         int ia_valid = attr->ia_valid;
         struct md_op_data op_data = { { 0 } };
@@ -713,18 +708,18 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
         if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
                 CDEBUG(D_INODE, "setting mtime "CFS_TIME_T", ctime "CFS_TIME_T
                       ", now = "CFS_TIME_T"\n",
-                       LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
-                       LTIME_S(CFS_CURRENT_TIME));
-
-        /* NB: ATTR_SIZE will only be set after this point if the size
-         * resides on the MDS, ie, this file has no objects. */
-        if (lsm)
-                attr->ia_valid &= ~ATTR_SIZE;
-
-        /* If only OST attributes being set on objects, don't do MDS RPC.
-         * In that case, we need to check permissions and update the local
-         * inode ourselves so we can call obdo_from_inode() always. */
-        if (ia_valid & (lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
+                      LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
+                      LTIME_S(CFS_CURRENT_TIME));
+
+       /* NB: ATTR_SIZE will only be set after this point if the size
+        * resides on the MDS, ie, this file has no objects. */
+       if (has_lsm)
+               attr->ia_valid &= ~ATTR_SIZE;
+
+       /* If only OST attributes being set on objects, don't do MDS RPC.
+        * In that case, we need to check permissions and update the local
+        * inode ourselves so we can call obdo_from_inode() always. */
+       if (ia_valid & (has_lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
                 memcpy(&op_data.op_attr, attr, sizeof(*attr));
 
                 /* Open epoch for truncate. */
@@ -736,7 +731,7 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
                         RETURN(rc);
 
                 llu_ioepoch_open(llu_i2info(inode), op_data.op_ioepoch);
-                if (!lsm || !S_ISREG(st->st_mode)) {
+               if (!has_lsm || !S_ISREG(st->st_mode)) {
                         CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
                         GOTO(out, rc);
                 }
@@ -1656,7 +1651,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
         int rc = 0;
         ENTRY;
 
-        if (lli->lli_smd) {
+       if (lli->lli_has_smd) {
                 CDEBUG(D_IOCTL, "stripe already exists for ino "DFID"\n",
                        PFID(&lli->lli_fid));
                 return -EEXIST;
@@ -1732,13 +1727,15 @@ static int llu_lov_setstripe(struct inode *ino, unsigned long arg)
 
 static int llu_lov_getstripe(struct inode *ino, unsigned long arg)
 {
-        struct lov_stripe_md *lsm = llu_i2info(ino)->lli_smd;
-
-        if (!lsm)
-                RETURN(-ENODATA);
-
-        return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, llu_i2obdexp(ino), 0, lsm,
-                            (void *)arg);
+       struct lov_stripe_md *lsm = NULL;
+       int rc = -ENODATA;
+
+       lsm = ccc_inode_lsm_get(ino);
+       if (lsm != NULL)
+               rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, llu_i2obdexp(ino), 0, lsm,
+                                  (void *)arg);
+       ccc_inode_lsm_put(ino, lsm);
+       return rc;
 }
 
 static int llu_iop_ioctl(struct inode *ino, unsigned long int request,
index c15e3e7..4a4f5f5 100644 (file)
@@ -135,8 +135,8 @@ static inline int return_if_equal(struct ldlm_lock *lock, void *data)
  *      < 0    error */
 static int find_cbdata(struct inode *inode)
 {
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct lov_stripe_md *lsm;
         int rc = 0;
         ENTRY;
 
@@ -146,11 +146,14 @@ static int find_cbdata(struct inode *inode)
         if (rc != 0)
                  RETURN(rc);
 
-        if (lli->lli_smd)
-                rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
-                                     return_if_equal, NULL);
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm == NULL)
+               RETURN(rc);
 
-        RETURN(rc);
+       rc = obd_find_cbdata(sbi->ll_dt_exp, lsm, return_if_equal, NULL);
+       ccc_inode_lsm_put(inode, lsm);
+
+       RETURN(rc);
 }
 
 /**
@@ -178,24 +181,28 @@ static int ll_ddelete(HAVE_D_DELETE_CONST struct dentry *de)
        LASSERT(d_refcount(de) == 1);
 #endif
 
+       /* Disable this piece of code temproarily because this is called
+        * inside dcache_lock so it's not appropriate to do lots of work
+        * here. */
+#if 0
        /* if not ldlm lock for this inode, set i_nlink to 0 so that
         * this inode can be recycled later b=20433 */
        if (de->d_inode && !find_cbdata(de->d_inode))
                de->d_inode->i_nlink = 0;
+#endif
 
        if (d_lustre_invalid((struct dentry *)de))
-                RETURN(1);
-
-        RETURN(0);
+               RETURN(1);
+       RETURN(0);
 }
 
 static int ll_set_dd(struct dentry *de)
 {
-        ENTRY;
-        LASSERT(de != NULL);
+       ENTRY;
+       LASSERT(de != NULL);
 
-        CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
-               de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
+       CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
+               de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
                d_refcount(de));
 
        if (de->d_fsdata == NULL) {
@@ -209,12 +216,12 @@ static int ll_set_dd(struct dentry *de)
                        else
                                OBD_FREE_PTR(lld);
                        spin_unlock(&de->d_lock);
-                } else {
-                        RETURN(-ENOMEM);
-                }
-        }
+               } else {
+                       RETURN(-ENOMEM);
+               }
+       }
 
-        RETURN(0);
+       RETURN(0);
 }
 
 int ll_dops_init(struct dentry *de, int block, int init_sa)
index 1893082..c9dad81 100644 (file)
@@ -274,8 +274,6 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode,
         RETURN(rc);
 }
 
-int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
-
 /* While this returns an error code, fput() the caller does not, so we need
  * to make every effort to clean up all of our state here.  Also, applications
  * rarely check close errors and even if an error is returned they will not
@@ -286,7 +284,6 @@ int ll_file_release(struct inode *inode, struct file *file)
         struct ll_file_data *fd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
         int rc;
         ENTRY;
 
@@ -326,8 +323,7 @@ int ll_file_release(struct inode *inode, struct file *file)
         }
 
         if (!S_ISDIR(inode->i_mode)) {
-                if (lsm)
-                        lov_test_and_clear_async_rc(lsm);
+               lov_read_and_clear_async_rc(lli->lli_clob);
                 lli->lli_async_rc = 0;
         }
 
@@ -497,7 +493,6 @@ int ll_file_open(struct inode *inode, struct file *file)
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                           .it_flags = file->f_flags };
-        struct lov_stripe_md *lsm;
         struct obd_client_handle **och_p = NULL;
         __u64 *och_usecount = NULL;
         struct ll_file_data *fd;
@@ -643,8 +638,7 @@ restart:
 
         ll_capa_open(inode);
 
-        lsm = lli->lli_smd;
-        if (lsm == NULL) {
+       if (!lli->lli_has_smd) {
                 if (file->f_flags & O_LOV_DELAY_CREATE ||
                     !(file->f_mode & FMODE_WRITE)) {
                         CDEBUG(D_INODE, "object creation was delayed\n");
@@ -736,55 +730,62 @@ static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
                      __u64 ioepoch, int sync)
 {
-        struct ll_inode_info *lli  = ll_i2info(inode);
-        struct obd_capa      *capa = ll_mdscapa_get(inode);
-        int rc;
-        ENTRY;
+       struct obd_capa      *capa = ll_mdscapa_get(inode);
+       struct lov_stripe_md *lsm;
+       int rc;
+       ENTRY;
 
-        rc = ll_lsm_getattr(lli->lli_smd, ll_i2dtexp(inode),
-                            capa, obdo, ioepoch, sync);
-        capa_put(capa);
-        if (rc == 0) {
-                obdo_refresh_inode(inode, obdo, obdo->o_valid);
-                CDEBUG(D_INODE,
-                       "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
-                       lli->lli_smd->lsm_object_id, i_size_read(inode),
-                       (unsigned long long)inode->i_blocks,
-                       (unsigned long)ll_inode_blksize(inode));
-        }
-        RETURN(rc);
+       lsm = ccc_inode_lsm_get(inode);
+       rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
+                           capa, obdo, ioepoch, sync);
+       capa_put(capa);
+       if (rc == 0) {
+               obdo_refresh_inode(inode, obdo, obdo->o_valid);
+               CDEBUG(D_INODE,
+                      "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
+                      lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
+                      (unsigned long long)inode->i_blocks,
+                      (unsigned long)ll_inode_blksize(inode));
+       }
+       ccc_inode_lsm_put(inode, lsm);
+       RETURN(rc);
 }
 
 int ll_merge_lvb(struct inode *inode)
 {
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ost_lvb lvb;
-        int rc;
-
-        ENTRY;
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct lov_stripe_md *lsm;
+       struct ost_lvb lvb;
+       int rc = 0;
 
-        ll_inode_size_lock(inode, 1);
-        inode_init_lvb(inode, &lvb);
-
-        /* merge timestamps the most resently obtained from mds with
-           timestamps obtained from osts */
-        lvb.lvb_atime = lli->lli_lvb.lvb_atime;
-        lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
-        lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
-        rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
-        cl_isize_write_nolock(inode, lvb.lvb_size);
-
-        CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
-               PFID(&lli->lli_fid), lvb.lvb_size);
-        inode->i_blocks = lvb.lvb_blocks;
+       ENTRY;
 
-        LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
-        LTIME_S(inode->i_atime) = lvb.lvb_atime;
-        LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
-        ll_inode_size_unlock(inode, 1);
+       lsm = ccc_inode_lsm_get(inode);
+       ll_inode_size_lock(inode);
+       inode_init_lvb(inode, &lvb);
+
+       /* merge timestamps the most resently obtained from mds with
+          timestamps obtained from osts */
+       lvb.lvb_atime = lli->lli_lvb.lvb_atime;
+       lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
+       lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
+       if (lsm != NULL) {
+               rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
+               cl_isize_write_nolock(inode, lvb.lvb_size);
+
+               CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
+                               PFID(&lli->lli_fid), lvb.lvb_size);
+               inode->i_blocks = lvb.lvb_blocks;
+
+               LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
+               LTIME_S(inode->i_atime) = lvb.lvb_atime;
+               LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
+       }
+       ll_inode_size_unlock(inode);
+       ccc_inode_lsm_put(inode, lsm);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
@@ -1221,22 +1222,22 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
                            obd_count ost_idx)
 {
-        struct obd_export *exp = ll_i2dtexp(inode);
-        struct obd_trans_info oti = { 0 };
-        struct obdo *oa = NULL;
-        int lsm_size;
-        int rc = 0;
-        struct lov_stripe_md *lsm, *lsm2;
-        ENTRY;
+       struct obd_export *exp = ll_i2dtexp(inode);
+       struct obd_trans_info oti = { 0 };
+       struct obdo *oa = NULL;
+       int lsm_size;
+       int rc = 0;
+       struct lov_stripe_md *lsm = NULL, *lsm2;
+       ENTRY;
 
-        OBDO_ALLOC(oa);
-        if (oa == NULL)
-                RETURN(-ENOMEM);
+       OBDO_ALLOC(oa);
+       if (oa == NULL)
+               RETURN(-ENOMEM);
 
-        ll_inode_size_lock(inode, 0);
-        lsm = ll_i2info(inode)->lli_smd;
+       lsm = ccc_inode_lsm_get(inode);
         if (lsm == NULL)
                 GOTO(out, rc = -ENOENT);
+
         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                    (lsm->lsm_stripe_count));
 
@@ -1253,14 +1254,16 @@ static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
         memcpy(lsm2, lsm, lsm_size);
-        rc = obd_create(NULL, exp, oa, &lsm2, &oti);
+       ll_inode_size_lock(inode);
+       rc = obd_create(NULL, exp, oa, &lsm2, &oti);
+       ll_inode_size_unlock(inode);
 
-        OBD_FREE_LARGE(lsm2, lsm_size);
-        GOTO(out, rc);
+       OBD_FREE_LARGE(lsm2, lsm_size);
+       GOTO(out, rc);
 out:
-        ll_inode_size_unlock(inode, 0);
-        OBDO_FREE(oa);
-        return rc;
+       ccc_inode_lsm_put(inode, lsm);
+       OBDO_FREE(oa);
+       return rc;
 }
 
 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
@@ -1301,20 +1304,20 @@ static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                              int flags, struct lov_user_md *lum, int lum_size)
 {
-        struct lov_stripe_md *lsm;
-        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
-        int rc = 0;
-        ENTRY;
+       struct lov_stripe_md *lsm = NULL;
+       struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
+       int rc = 0;
+       ENTRY;
 
-        ll_inode_size_lock(inode, 0);
-        lsm = ll_i2info(inode)->lli_smd;
-        if (lsm) {
-                ll_inode_size_unlock(inode, 0);
-                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
-                       inode->i_ino);
-                RETURN(-EEXIST);
-        }
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm != NULL) {
+               ccc_inode_lsm_put(inode, lsm);
+               CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
+                      inode->i_ino);
+               RETURN(-EEXIST);
+       }
 
+       ll_inode_size_lock(inode);
         rc = ll_intent_file_open(file, lum, lum_size, &oit);
         if (rc)
                 GOTO(out, rc);
@@ -1325,12 +1328,13 @@ int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
         ll_release_openhandle(file->f_dentry, &oit);
 
  out:
-        ll_inode_size_unlock(inode, 0);
-        ll_intent_release(&oit);
-        RETURN(rc);
+       ll_inode_size_unlock(inode);
+       ll_intent_release(&oit);
+       ccc_inode_lsm_put(inode, lsm);
+       RETURN(rc);
 out_req_free:
-        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
-        goto out;
+       ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
+       goto out;
 }
 
 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
@@ -1464,24 +1468,28 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
 
         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
         if (rc == 0) {
-                 put_user(0, &lumv1p->lmm_stripe_count);
-                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
-                                    0, ll_i2info(inode)->lli_smd,
-                                    (void *)arg);
-        }
-        RETURN(rc);
+               struct lov_stripe_md *lsm;
+               put_user(0, &lumv1p->lmm_stripe_count);
+               lsm = ccc_inode_lsm_get(inode);
+               rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
+                                  0, lsm, (void *)arg);
+               ccc_inode_lsm_put(inode, lsm);
+       }
+       RETURN(rc);
 }
 
 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
 {
-        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        int rc = -ENODATA;
-        ENTRY;
+       struct lov_stripe_md *lsm;
+       int rc = -ENODATA;
+       ENTRY;
 
-        if (lsm != NULL)
-                rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
-                                   lsm, (void *)arg);
-        RETURN(rc);
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm != NULL)
+               rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
+                               lsm, (void *)arg);
+       ccc_inode_lsm_put(inode, lsm);
+       RETURN(rc);
 }
 
 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
@@ -1611,8 +1619,8 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
               int num_bytes)
 {
-        struct obd_export *exp = ll_i2dtexp(inode);
-        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+       struct obd_export *exp = ll_i2dtexp(inode);
+       struct lov_stripe_md *lsm = NULL;
         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
         int vallen = num_bytes;
         int rc;
@@ -1631,12 +1639,16 @@ int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
                         return rc;
         }
 
-        /* If the stripe_count > 1 and the application does not understand
-         * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
-         */
-        if (lsm->lsm_stripe_count > 1 &&
-            !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
-                return -EOPNOTSUPP;
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm == NULL)
+               return -ENOENT;
+
+       /* If the stripe_count > 1 and the application does not understand
+        * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
+        */
+       if (lsm->lsm_stripe_count > 1 &&
+           !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
+               GOTO(out, rc = -EOPNOTSUPP);
 
         fm_key.oa.o_id = lsm->lsm_object_id;
         fm_key.oa.o_seq = lsm->lsm_object_seq;
@@ -1647,7 +1659,7 @@ int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
         /* If filesize is 0, then there would be no objects for mapping */
         if (fm_key.oa.o_size == 0) {
                 fiemap->fm_mapped_extents = 0;
-                RETURN(0);
+               GOTO(out, rc = 0);
         }
 
         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
@@ -1657,7 +1669,9 @@ int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
         if (rc)
                 CERROR("obd_get_info failed: rc = %d\n", rc);
 
-        RETURN(rc);
+out:
+       ccc_inode_lsm_put(inode, lsm);
+       RETURN(rc);
 }
 
 int ll_fid2path(struct obd_export *exp, void *arg)
@@ -1760,22 +1774,25 @@ error:
 static int ll_data_version(struct inode *inode, __u64 *data_version,
                            int extent_lock)
 {
-        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        struct ll_sb_info    *sbi = ll_i2sbi(inode);
-        struct obdo          *obdo = NULL;
-        int                   rc;
-        ENTRY;
+       struct lov_stripe_md *lsm = NULL;
+       struct ll_sb_info    *sbi = ll_i2sbi(inode);
+       struct obdo          *obdo = NULL;
+       int                   rc;
+       ENTRY;
 
-        /* If no stripe, we consider version is 0. */
-        if (!lsm) {
+       /* If no stripe, we consider version is 0. */
+       lsm = ccc_inode_lsm_get(inode);
+       if (lsm == NULL) {
                 *data_version = 0;
                 CDEBUG(D_INODE, "No object for inode\n");
                 RETURN(0);
         }
 
         OBD_ALLOC_PTR(obdo);
-        if (obdo == NULL)
-                RETURN(-ENOMEM);
+       if (obdo == NULL) {
+               ccc_inode_lsm_put(inode, lsm);
+               RETURN(-ENOMEM);
+       }
 
         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
         if (!rc) {
@@ -1786,8 +1803,9 @@ static int ll_data_version(struct inode *inode, __u64 *data_version,
         }
 
         OBD_FREE_PTR(obdo);
+       ccc_inode_lsm_put(inode, lsm);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
@@ -1962,7 +1980,6 @@ int ll_flush(struct file *file)
 {
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
         int rc, err;
 
         LASSERT(!S_ISDIR(inode->i_mode));
@@ -1975,13 +1992,11 @@ int ll_flush(struct file *file)
          * failed for pages in this mapping. */
         rc = lli->lli_async_rc;
         lli->lli_async_rc = 0;
-        if (lsm) {
-                err = lov_test_and_clear_async_rc(lsm);
-                if (rc == 0)
-                        rc = err;
-        }
+       err = lov_read_and_clear_async_rc(lli->lli_clob);
+       if (rc == 0)
+               rc = err;
 
-        return rc ? -EIO : 0;
+       return rc ? -EIO : 0;
 }
 
 /**
@@ -2047,9 +2062,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
 {
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
         struct ptlrpc_request *req;
         struct obd_capa *oc;
+       struct lov_stripe_md *lsm;
         int rc, err;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
@@ -2067,11 +2082,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                 lli->lli_async_rc = 0;
                 if (rc == 0)
                         rc = err;
-                if (lsm) {
-                        err = lov_test_and_clear_async_rc(lsm);
-                        if (rc == 0)
-                                rc = err;
-                }
+               err = lov_read_and_clear_async_rc(lli->lli_clob);
+               if (rc == 0)
+                       rc = err;
         }
 
         oc = ll_mdscapa_get(inode);
@@ -2083,15 +2096,17 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
         if (!err)
                 ptlrpc_req_finished(req);
 
-        if (data && lsm) {
+       lsm = ccc_inode_lsm_get(inode);
+       if (data && lsm) {
                err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
-                                        CL_FSYNC_ALL);
+                               CL_FSYNC_ALL);
                if (rc == 0 && err < 0)
-                        rc = err;
-                lli->lli_write_rc = rc < 0 ? rc : 0;
-        }
+                       rc = err;
+               lli->lli_write_rc = rc < 0 ? rc : 0;
+       }
+       ccc_inode_lsm_put(inode, lsm);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
@@ -2421,7 +2436,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
         rc = __ll_inode_revalidate_it(dentry, it, ibits);
 
         /* if object not yet allocated, don't validate size */
-        if (rc == 0 && ll_i2info(dentry->d_inode)->lli_smd == NULL) {
+       if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
index d20062a..8136658 100644 (file)
@@ -248,10 +248,7 @@ int ll_som_update(struct inode *inode, struct md_op_data *op_data)
                                       old_flags & MF_GETATTR_LOCK);
                 if (rc) {
                         oa->o_valid = 0;
-                        if (rc == -ENOENT)
-                                CDEBUG(D_INODE, "objid "LPX64" is destroyed\n",
-                                       lli->lli_smd->lsm_object_id);
-                        else
+                       if (rc != -ENOENT)
                                 CERROR("inode_getattr failed (%d): unable to "
                                        "send a Size-on-MDS attribute update "
                                        "for inode %lu/%u\n", rc, inode->i_ino,
index cdf30ed..ecc249e 100644 (file)
@@ -162,7 +162,7 @@ struct ll_inode_info {
         __u64                           lli_open_fd_write_count;
         __u64                           lli_open_fd_exec_count;
         /* Protects access to och pointers and their usage counters, also
-         * atomicity of check-update of lli_smd */
+        * atomicity of check-update of lli_has_smd */
         cfs_mutex_t                     lli_och_mutex;
 
         struct inode                    lli_vfs_inode;
@@ -264,20 +264,19 @@ struct ll_inode_info {
          *      In the future, if more members are added only for directory,
          *      some of the following members can be moved into u.f.
          */
-        struct lov_stripe_md           *lli_smd;
+       bool                            lli_has_smd;
         struct cl_object               *lli_clob;
 };
 
 /*
  * Locking to guarantee consistency of non-atomic updates to long long i_size,
- * consistency between file size and KMS, and consistency within
- * ->lli_smd->lsm_oinfo[]'s.
+ * consistency between file size and KMS.
  *
- * Implemented by ->lli_size_sem and ->lsm_sem, nested in that order.
+ * Implemented by ->lli_size_sem and ->lsm_lock, nested in that order.
  */
 
-void ll_inode_size_lock(struct inode *inode, int lock_lsm);
-void ll_inode_size_unlock(struct inode *inode, int unlock_lsm);
+void ll_inode_size_lock(struct inode *inode);
+void ll_inode_size_unlock(struct inode *inode);
 
 // FIXME: replace the name of this with LL_I to conform to kernel stuff
 // static inline struct ll_inode_info *LL_I(struct inode *inode)
@@ -1372,14 +1371,14 @@ static inline struct ll_file_data *cl_iattr2fd(struct inode *inode,
         return LUSTRE_FPRIVATE(attr->ia_file);
 }
 
-static inline void cl_isize_lock(struct inode *inode, int lsmlock)
+static inline void cl_isize_lock(struct inode *inode)
 {
-        ll_inode_size_lock(inode, lsmlock);
+       ll_inode_size_lock(inode);
 }
 
-static inline void cl_isize_unlock(struct inode *inode, int lsmlock)
+static inline void cl_isize_unlock(struct inode *inode)
 {
-        ll_inode_size_unlock(inode, lsmlock);
+       ll_inode_size_unlock(inode);
 }
 
 static inline void cl_isize_write_nolock(struct inode *inode, loff_t kms)
@@ -1390,9 +1389,9 @@ static inline void cl_isize_write_nolock(struct inode *inode, loff_t kms)
 
 static inline void cl_isize_write(struct inode *inode, loff_t kms)
 {
-        ll_inode_size_lock(inode, 0);
-        i_size_write(inode, kms);
-        ll_inode_size_unlock(inode, 0);
+       ll_inode_size_lock(inode);
+       i_size_write(inode, kms);
+       ll_inode_size_unlock(inode);
 }
 
 #define cl_isize_read(inode)             i_size_read(inode)
index 4c89098..ec264c4 100644 (file)
@@ -883,7 +883,7 @@ void ll_lli_init(struct ll_inode_info *lli)
         lli->lli_open_fd_exec_count = 0;
         cfs_mutex_init(&lli->lli_och_mutex);
         cfs_spin_lock_init(&lli->lli_agl_lock);
-        lli->lli_smd = NULL;
+       lli->lli_has_smd = false;
         lli->lli_clob = NULL;
 
         LASSERT(lli->lli_vfs_inode.i_mode != 0);
@@ -1188,14 +1188,9 @@ void ll_clear_inode(struct inode *inode)
          * cl_object still uses inode lsm.
          */
         cl_inode_fini(inode);
+       lli->lli_has_smd = false;
 
-        if (lli->lli_smd) {
-                obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd);
-                lli->lli_smd = NULL;
-        }
-
-
-        EXIT;
+       EXIT;
 }
 
 int ll_md_setattr(struct dentry *dentry, struct md_op_data *op_data,
@@ -1329,9 +1324,9 @@ static int ll_setattr_ost(struct inode *inode, struct iattr *attr)
  */
 int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
 {
+       struct lov_stripe_md *lsm;
         struct inode *inode = dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
         struct md_op_data *op_data = NULL;
         struct md_open_data *mod = NULL;
         int ia_valid = attr->ia_valid;
@@ -1386,11 +1381,6 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
                        LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
                        cfs_time_current_sec());
 
-        /* NB: ATTR_SIZE will only be set after this point if the size
-         * resides on the MDS, ie, this file has no objects. */
-        if (lsm)
-                attr->ia_valid &= ~ATTR_SIZE;
-
         /* We always do an MDS RPC, even if we're only changing the size;
          * only the MDS knows whether truncate() should fail with -ETXTBUSY */
 
@@ -1408,6 +1398,15 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
                         DOWN_WRITE_I_ALLOC_SEM(inode);
         }
 
+       /* We need a steady stripe configuration for setattr to avoid
+        * confusion. */
+       lsm = ccc_inode_lsm_get(inode);
+
+       /* NB: ATTR_SIZE will only be set after this point if the size
+        * resides on the MDS, ie, this file has no objects. */
+       if (lsm != NULL)
+               attr->ia_valid &= ~ATTR_SIZE;
+
         memcpy(&op_data->op_attr, attr, sizeof(*attr));
 
         /* Open epoch for truncate. */
@@ -1420,7 +1419,7 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
                 GOTO(out, rc);
 
         ll_ioepoch_open(lli, op_data->op_ioepoch);
-        if (!lsm || !S_ISREG(inode->i_mode)) {
+       if (lsm == NULL || !S_ISREG(inode->i_mode)) {
                 CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
                 GOTO(out, rc = 0);
         }
@@ -1439,6 +1438,7 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
                 rc = ll_setattr_ost(inode, attr);
         EXIT;
 out:
+       ccc_inode_lsm_put(inode, lsm);
         if (op_data) {
                 if (op_data->op_ioepoch) {
                         rc1 = ll_setattr_done_writing(inode, op_data, mod);
@@ -1577,10 +1577,9 @@ int ll_statfs(struct dentry *de, struct kstatfs *sfs)
         return 0;
 }
 
-void ll_inode_size_lock(struct inode *inode, int lock_lsm)
+void ll_inode_size_lock(struct inode *inode)
 {
         struct ll_inode_info *lli;
-        struct lov_stripe_md *lsm;
 
         LASSERT(!S_ISDIR(inode->i_mode));
 
@@ -1589,24 +1588,13 @@ void ll_inode_size_lock(struct inode *inode, int lock_lsm)
         cfs_down(&lli->lli_size_sem);
         LASSERT(lli->lli_size_sem_owner == NULL);
         lli->lli_size_sem_owner = current;
-        lsm = lli->lli_smd;
-        LASSERTF(lsm != NULL || lock_lsm == 0, "lsm %p, lock_lsm %d\n",
-                 lsm, lock_lsm);
-        if (lock_lsm)
-                lov_stripe_lock(lsm);
 }
 
-void ll_inode_size_unlock(struct inode *inode, int unlock_lsm)
+void ll_inode_size_unlock(struct inode *inode)
 {
         struct ll_inode_info *lli;
-        struct lov_stripe_md *lsm;
 
         lli = ll_i2info(inode);
-        lsm = lli->lli_smd;
-        LASSERTF(lsm != NULL || unlock_lsm == 0, "lsm %p, lock_lsm %d\n",
-                 lsm, unlock_lsm);
-        if (unlock_lsm)
-                lov_stripe_unlock(lsm);
         LASSERT(lli->lli_size_sem_owner == current);
         lli->lli_size_sem_owner = NULL;
         cfs_up(&lli->lli_size_sem);
@@ -1624,43 +1612,20 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                 LASSERT(S_ISREG(inode->i_mode));
 
                 cfs_mutex_lock(&lli->lli_och_mutex);
-                if (lli->lli_smd == NULL) {
-                        if (lsm->lsm_magic != LOV_MAGIC_V1 &&
-                            lsm->lsm_magic != LOV_MAGIC_V3) {
-                                dump_lsm(D_ERROR, lsm);
-                                LBUG();
-                        }
-                        CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
-                               lsm, inode->i_ino, inode->i_generation, inode);
-                        /* cl_file_inode_init must go before lli_smd or a race
-                         * is possible where client thinks the file has stripes,
-                         * but lov raid0 is not setup yet and parallel e.g.
-                         * glimpse would try to use uninitialized lov */
-                        cl_file_inode_init(inode, md);
-                        cfs_spin_lock(&lli->lli_lock);
-                        lli->lli_smd = lsm;
-                        cfs_spin_unlock(&lli->lli_lock);
-                        cfs_mutex_unlock(&lli->lli_och_mutex);
-                        lli->lli_maxbytes = lsm->lsm_maxbytes;
-                        if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
-                                lli->lli_maxbytes = MAX_LFS_FILESIZE;
-                } else {
-                        cfs_mutex_unlock(&lli->lli_och_mutex);
-                        LASSERT(lli->lli_smd->lsm_magic == lsm->lsm_magic &&
-                                lli->lli_smd->lsm_stripe_count ==
-                                lsm->lsm_stripe_count);
-                        if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
-                                CERROR("lsm mismatch for inode %ld\n",
-                                       inode->i_ino);
-                                CERROR("lli_smd:\n");
-                                dump_lsm(D_ERROR, lli->lli_smd);
-                                CERROR("lsm:\n");
-                                dump_lsm(D_ERROR, lsm);
-                                LBUG();
-                        }
-                }
-                if (lli->lli_smd != lsm)
-                        obd_free_memmd(ll_i2dtexp(inode), &lsm);
+               CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
+                               lsm, inode->i_ino, inode->i_generation, inode);
+               /* cl_file_inode_init must go before lli_has_smd or a race
+                * is possible where client thinks the file has stripes,
+                * but lov raid0 is not setup yet and parallel e.g.
+                * glimpse would try to use uninitialized lov */
+               if (cl_file_inode_init(inode, md) == 0)
+                       lli->lli_has_smd = true;
+               cfs_mutex_unlock(&lli->lli_och_mutex);
+               lli->lli_maxbytes = lsm->lsm_maxbytes;
+               if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
+                       lli->lli_maxbytes = MAX_LFS_FILESIZE;
+               if (md->lsm != NULL)
+                       obd_free_memmd(ll_i2dtexp(inode), &md->lsm);
         }
 
         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
@@ -1738,7 +1703,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
 
         if (body->valid & OBD_MD_FLSIZE) {
                 if (exp_connect_som(ll_i2mdexp(inode)) &&
-                    S_ISREG(inode->i_mode) && lli->lli_smd) {
+                   S_ISREG(inode->i_mode) && lli->lli_has_smd) {
                         struct lustre_handle lockh;
                         ldlm_mode_t mode;
 
@@ -1796,7 +1761,7 @@ void ll_read_inode2(struct inode *inode, void *opaque)
         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
                PFID(&lli->lli_fid), inode);
 
-        LASSERT(!lli->lli_smd);
+       LASSERT(!lli->lli_has_smd);
 
         /* Core attributes from the MDS first.  This is a new inode, and
          * the VFS doesn't zero times in the core inode so we have to do
@@ -1914,18 +1879,13 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                 RETURN(put_user(flags, (int *)arg));
         }
         case FSFILT_IOC_SETFLAGS: {
-                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+               struct lov_stripe_md *lsm;
                 struct obd_info oinfo = { { { 0 } } };
                 struct md_op_data *op_data;
 
                 if (get_user(flags, (int *)arg))
                         RETURN(-EFAULT);
 
-                oinfo.oi_md = lsm;
-                OBDO_ALLOC(oinfo.oi_oa);
-                if (!oinfo.oi_oa)
-                        RETURN(-ENOMEM);
-
                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
                                              LUSTRE_OPC_ANY, NULL);
                 if (IS_ERR(op_data))
@@ -1937,16 +1897,21 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                                 NULL, 0, NULL, 0, &req, NULL);
                 ll_finish_md_op_data(op_data);
                 ptlrpc_req_finished(req);
-                if (rc) {
-                        OBDO_FREE(oinfo.oi_oa);
-                        RETURN(rc);
-                }
+               if (rc)
+                       RETURN(rc);
 
-                if (lsm == NULL) {
-                        OBDO_FREE(oinfo.oi_oa);
-                        GOTO(update_cache, rc);
-                }
+               OBDO_ALLOC(oinfo.oi_oa);
+               if (!oinfo.oi_oa)
+                       RETURN(-ENOMEM);
+
+               lsm = ccc_inode_lsm_get(inode);
+               if (lsm == NULL) {
+                       inode->i_flags = ll_ext_to_inode_flags(flags);
+                       OBDO_FREE(oinfo.oi_oa);
+                       RETURN(0);
+               }
 
+               oinfo.oi_md = lsm;
                 oinfo.oi_oa->o_id = lsm->lsm_object_id;
                 oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
                 oinfo.oi_oa->o_flags = flags;
@@ -1957,16 +1922,12 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                 rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
                 capa_put(oinfo.oi_capa);
                 OBDO_FREE(oinfo.oi_oa);
-                if (rc) {
-                        if (rc != -EPERM && rc != -EACCES)
-                                CERROR("osc_setattr_async fails: rc = %d\n",rc);
-                        RETURN(rc);
-                }
+               ccc_inode_lsm_put(inode, lsm);
 
-                EXIT;
-update_cache:
-                inode->i_flags = ll_ext_to_inode_flags(flags);
-                return 0;
+               if (rc && rc != -EPERM && rc != -EACCES)
+                       CERROR("osc_setattr_async fails: rc = %d\n", rc);
+
+               RETURN(rc);
         }
         default:
                 RETURN(-ENOSYS);
@@ -2266,12 +2227,11 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
          * try to use the default stripe data from parent directory for
          * allocating OST objects.  Try to pass the parent FID to MDS. */
         if (opc == LUSTRE_OPC_CREATE && i1 == i2 && S_ISREG(i2->i_mode) &&
-            ll_i2info(i2)->lli_smd == NULL) {
-                struct ll_inode_info *lli = ll_i2info(i2);
+           !ll_i2info(i2)->lli_has_smd) {
+               struct ll_inode_info *lli = ll_i2info(i2);
 
-                cfs_spin_lock(&lli->lli_lock);
-                if (likely(lli->lli_smd == NULL &&
-                           !fid_is_zero(&lli->lli_pfid)))
+               cfs_spin_lock(&lli->lli_lock);
+               if (likely(!lli->lli_has_smd && !fid_is_zero(&lli->lli_pfid)))
                         op_data->op_fid1 = lli->lli_pfid;
                 cfs_spin_unlock(&lli->lli_lock);
                 /** We ignore parent's capability temporary. */
index a0d581b..809a564 100644 (file)
@@ -144,7 +144,7 @@ ll_iget_for_nfs(struct super_block *sb, struct lu_fid *fid, struct lu_fid *paren
          * It is an anonymous dentry without OST objects created yet.
          * We have to find the parent to tell MDS how to init lov objects.
          */
-        if (S_ISREG(inode->i_mode) && ll_i2info(inode)->lli_smd == NULL &&
+       if (S_ISREG(inode->i_mode) && !ll_i2info(inode)->lli_has_smd &&
             parent != NULL) {
                 struct ll_inode_info *lli = ll_i2info(inode);
 
index db4d451..01c995e 100644 (file)
@@ -1296,7 +1296,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
                         result = cl_io_read_page(env, io, page);
                 } else {
                         /* Page from a non-object file. */
-                        LASSERT(!ll_i2info(vmpage->mapping->host)->lli_smd);
+                       LASSERT(!ll_i2info(vmpage->mapping->host)->lli_has_smd);
                         unlock_page(vmpage);
                         result = 0;
                 }
index 6335b7f..4aba4f0 100644 (file)
@@ -385,13 +385,12 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
         long count = iov_length(iov, nr_segs);
         long tot_bytes = 0, result = 0;
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
         unsigned long seg = 0;
         long size = MAX_DIO_SIZE;
         int refcheck;
         ENTRY;
 
-        if (!lli->lli_smd || !lli->lli_smd->lsm_object_id)
+       if (!lli->lli_has_smd)
                 RETURN(-EBADF);
 
         /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
@@ -488,14 +487,19 @@ out:
 
         if (tot_bytes > 0) {
                 if (rw == WRITE) {
-                        lov_stripe_lock(lsm);
-                        obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
-                        lov_stripe_unlock(lsm);
-                }
-        }
-
-        cl_env_put(env, &refcheck);
-        RETURN(tot_bytes ? : result);
+                       struct lov_stripe_md *lsm;
+
+                       lsm = ccc_inode_lsm_get(inode);
+                       LASSERT(lsm != NULL);
+                       lov_stripe_lock(lsm);
+                       obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
+                       lov_stripe_unlock(lsm);
+                       ccc_inode_lsm_put(inode, lsm);
+               }
+       }
+
+       cl_env_put(env, &refcheck);
+       RETURN(tot_bytes ? : result);
 }
 
 #if defined(HAVE_KERNEL_WRITE_BEGIN_END) || defined(MS_HAS_NEW_AOPS)
index d911ab6..caf9430 100644 (file)
@@ -139,10 +139,10 @@ ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
 static inline int agl_should_run(struct ll_statahead_info *sai,
                                  struct inode *inode)
 {
-        if (inode != NULL && S_ISREG(inode->i_mode) &&
-            ll_i2info(inode)->lli_smd != NULL && sai->sai_agl_valid)
-                return 1;
-        return 0;
+       if (inode != NULL && S_ISREG(inode->i_mode) &&
+           ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid)
+               return 1;
+       return 0;
 }
 
 static inline struct ll_sa_entry *
index 9a59011..d1e9f31 100644 (file)
@@ -128,7 +128,7 @@ static int ll_readlink(struct dentry *dentry, char *buffer, int buflen)
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
 
-        ll_inode_size_lock(inode, 0);
+       ll_inode_size_lock(inode);
         rc = ll_readlink_internal(inode, &request, &symname);
         if (rc)
                 GOTO(out, rc);
@@ -136,8 +136,8 @@ static int ll_readlink(struct dentry *dentry, char *buffer, int buflen)
         rc = vfs_readlink(dentry, buffer, buflen, symname);
  out:
         ptlrpc_req_finished(request);
-        ll_inode_size_unlock(inode, 0);
-        RETURN(rc);
+       ll_inode_size_unlock(inode);
+       RETURN(rc);
 }
 
 #ifdef HAVE_COOKIE_FOLLOW_LINK
@@ -164,9 +164,9 @@ static LL_FOLLOW_LINK_RETURN_TYPE ll_follow_link(struct dentry *dentry,
         } else if (THREAD_SIZE == 8192 && current->link_count >= 8) {
                 rc = -ELOOP;
         } else {
-                ll_inode_size_lock(inode, 0);
-                rc = ll_readlink_internal(inode, &request, &symname);
-                ll_inode_size_unlock(inode, 0);
+               ll_inode_size_lock(inode);
+               rc = ll_readlink_internal(inode, &request, &symname);
+               ll_inode_size_unlock(inode);
         }
         if (rc) {
                 cfs_path_put(nd); /* Kernel assumes that ->follow_link()
index f534184..b5f6995 100644 (file)
@@ -239,7 +239,7 @@ static int vvp_io_read_lock(const struct lu_env *env,
 
         ENTRY;
         /* XXX: Layer violation, we shouldn't see lsm at llite level. */
-        if (lli->lli_smd != NULL) /* lsm-less file, don't need to lock */
+       if (lli->lli_has_smd) /* lsm-less file doesn't need to lock */
                 result = vvp_io_rw_lock(env, io, CLM_READ,
                                         io->u.ci_rd.rd.crw_pos,
                                         io->u.ci_rd.rd.crw_pos +
@@ -332,16 +332,16 @@ static int vvp_io_setattr_lock(const struct lu_env *env,
 
 static int vvp_do_vmtruncate(struct inode *inode, size_t size)
 {
-        int     result;
-        /*
-         * Only ll_inode_size_lock is taken at this level. lov_stripe_lock()
-         * is grabbed by ll_truncate() only over call to obd_adjust_kms().
-         */
-        ll_inode_size_lock(inode, 0);
-        result = vmtruncate(inode, size);
-        ll_inode_size_unlock(inode, 0);
+       int     result;
+       /*
+        * Only ll_inode_size_lock is taken at this level. lov_stripe_lock()
+        * is grabbed by ll_truncate() only over call to obd_adjust_kms().
+        */
+       ll_inode_size_lock(inode);
+       result = vmtruncate(inode, size);
+       ll_inode_size_unlock(inode);
 
-        return result;
+       return result;
 }
 
 static int vvp_io_setattr_trunc(const struct lu_env *env,
@@ -1037,7 +1037,7 @@ static int vvp_io_commit_write(const struct lu_env *env,
 
         size = cl_offset(obj, pg->cp_index) + to;
 
-        ll_inode_size_lock(inode, 0);
+       ll_inode_size_lock(inode);
         if (result == 0) {
                 if (size > i_size_read(inode)) {
                         cl_isize_write_nolock(inode, size);
@@ -1050,8 +1050,8 @@ static int vvp_io_commit_write(const struct lu_env *env,
                 if (size > i_size_read(inode))
                         cl_page_discard(env, io, pg);
         }
-        ll_inode_size_unlock(inode, 0);
-        RETURN(result);
+       ll_inode_size_unlock(inode);
+       RETURN(result);
 }
 
 static const struct cl_io_operations vvp_io_ops = {
index aaa7631..c765611 100644 (file)
@@ -431,6 +431,7 @@ ssize_t ll_getxattr(struct dentry *dentry, const char *name,
             (strncmp(name, XATTR_LUSTRE_PREFIX,
                      sizeof(XATTR_LUSTRE_PREFIX) - 1) == 0 &&
              strcmp(name + sizeof(XATTR_LUSTRE_PREFIX) - 1, "lov") == 0)) {
+               struct lov_stripe_md *lsm;
                 struct lov_user_md *lump;
                 struct lov_mds_md *lmm = NULL;
                 struct ptlrpc_request *request = NULL;
@@ -445,7 +446,8 @@ ssize_t ll_getxattr(struct dentry *dentry, const char *name,
                         GOTO(out, rc = sizeof(struct lov_user_md));
                 }
 
-                if (!ll_i2info(inode)->lli_smd) {
+               lsm = ccc_inode_lsm_get(inode);
+               if (lsm == NULL) {
                         if (S_ISDIR(inode->i_mode)) {
                                 rc = ll_dir_getstripe(inode, &lmm,
                                                       &lmmsize, &request);
@@ -455,10 +457,10 @@ ssize_t ll_getxattr(struct dentry *dentry, const char *name,
                 } else {
                         /* LSM is present already after lookup/getattr call.
                          * we need to grab layout lock once it is implemented */
-                        rc = obd_packmd(ll_i2dtexp(inode), &lmm,
-                                        ll_i2info(inode)->lli_smd);
-                        lmmsize = rc;
-                }
+                       rc = obd_packmd(ll_i2dtexp(inode), &lmm, lsm);
+                       lmmsize = rc;
+               }
+               ccc_inode_lsm_put(inode, lsm);
 
                 if (rc < 0)
                        GOTO(out, rc);
@@ -535,8 +537,8 @@ ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size)
                        rc -= xlen;
                }
        }
-        if (S_ISREG(inode->i_mode)) {
-                if (ll_i2info(inode)->lli_smd == NULL)
+       if (S_ISREG(inode->i_mode)) {
+               if (!ll_i2info(inode)->lli_has_smd)
                         rc2 = -1;
         } else if (S_ISDIR(inode->i_mode)) {
                 rc2 = ll_dir_getstripe(inode, &lmm, &lmmsize, &request);
index f9faf65..06834f7 100644 (file)
@@ -199,6 +199,10 @@ struct lov_object {
          * Type of an object. Protected by lov_object::lo_type_guard.
          */
         enum lov_layout_type   lo_type;
+       /**
+        * Waitq - wait for no one else is using lo_lsm
+        */
+       cfs_waitq_t            lo_waitq;
 
         union lov_layout_state {
                 struct lov_layout_raid0 {
@@ -469,6 +473,11 @@ struct lov_io {
          * lov_io::lis_cl::cis_object.
          */
         struct lov_object *lis_object;
+       /**
+        * Lov stripe - this determines how this io fans out.
+        * Hold a refcount to the lsm so it can't go away during IO.
+        */
+       struct lov_stripe_md *lis_lsm;
         /**
          * Original end-of-io position for this IO, set by the upper layer as
          * cl_io::u::ci_rw::pos + cl_io::u::ci_rw::count. lov remembers this,
@@ -603,6 +612,8 @@ struct lov_io_sub    *lov_page_subio    (const struct lu_env *env,
                                          struct lov_io *lio,
                                          const struct cl_page_slice *slice);
 
+void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm);
+struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
 
 #define lov_foreach_target(lov, var)                    \
         for (var = 0; var < lov_targets_nr(lov); ++var)
index 73e894a..a2528a0 100644 (file)
@@ -265,7 +265,7 @@ int lov_getstripe(struct obd_export *exp,
                   struct lov_stripe_md *lsm, struct lov_user_md *lump);
 int lov_alloc_memmd(struct lov_stripe_md **lsmp, __u16 stripe_count,
                     int pattern, int magic);
-void lov_free_memmd(struct lov_stripe_md **lsmp);
+int lov_free_memmd(struct lov_stripe_md **lsmp);
 
 void lov_dump_lmm_v1(int level, struct lov_mds_md_v1 *lmm);
 void lov_dump_lmm_v3(int level, struct lov_mds_md_v3 *lmm);
@@ -310,4 +310,11 @@ struct pool_desc *lov_find_pool(struct lov_obd *lov, char *poolname);
 int lov_check_index_in_pool(__u32 idx, struct pool_desc *pool);
 void lov_pool_putref(struct pool_desc *pool);
 
+static inline struct lov_stripe_md *lsm_addref(struct lov_stripe_md *lsm)
+{
+       LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
+       cfs_atomic_inc(&lsm->lsm_refc);
+       return lsm;
+}
+
 #endif
index ab0e41b..12f620b 100644 (file)
@@ -36,6 +36,7 @@
  * Implementation of cl_io for LOV layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOV
@@ -84,7 +85,7 @@ static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
                                int stripe, loff_t start, loff_t end)
 {
-        struct lov_stripe_md *lsm    = lov_r0(lio->lis_object)->lo_lsm;
+       struct lov_stripe_md *lsm    = lio->lis_lsm;
         struct cl_io         *parent = lio->lis_cl.cis_io;
 
         switch(io->ci_type) {
@@ -259,7 +260,7 @@ static int lov_page_stripe(const struct cl_page *page)
 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
                                   const struct cl_page_slice *slice)
 {
-        struct lov_stripe_md *lsm  = lov_r0(lio->lis_object)->lo_lsm;
+       struct lov_stripe_md *lsm  = lio->lis_lsm;
         struct cl_page       *page = slice->cpl_page;
         int stripe;
 
@@ -277,8 +278,7 @@ struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
                              struct cl_io *io)
 {
-        struct lov_object    *lov = lio->lis_object;
-        struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+       struct lov_stripe_md *lsm = lio->lis_lsm;
         int result;
 
         LASSERT(lio->lis_object != NULL);
@@ -303,13 +303,14 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
 static void lov_io_slice_init(struct lov_io *lio,
                               struct lov_object *obj, struct cl_io *io)
 {
-        struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
+       struct lov_stripe_md *lsm = lov_lsm_addref(obj);
+       ENTRY;
 
-        LASSERT(lsm != NULL);
-        ENTRY;
+       io->ci_result = 0;
+       lio->lis_object = obj;
 
-        io->ci_result = 0;
-        lio->lis_object = obj;
+       LASSERT(lsm != NULL);
+       lio->lis_lsm = lsm; /* called inside lo_type_guard. */
         lio->lis_stripe_count = lsm->lsm_stripe_count;
 
         switch (io->ci_type) {
@@ -371,7 +372,9 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
                 lio->lis_nr_subios = 0;
         }
-        EXIT;
+       lov_lsm_decref(lio->lis_object, lio->lis_lsm);
+       lio->lis_lsm = NULL;
+       EXIT;
 }
 
 static obd_off lov_offset_mod(obd_off val, int delta)
@@ -384,8 +387,8 @@ static obd_off lov_offset_mod(obd_off val, int delta)
 static int lov_io_iter_init(const struct lu_env *env,
                             const struct cl_io_slice *ios)
 {
-        struct lov_io        *lio = cl2lov_io(env, ios);
-        struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
+       struct lov_io        *lio = cl2lov_io(env, ios);
+       struct lov_stripe_md *lsm = lio->lis_lsm;
         struct lov_io_sub    *sub;
         obd_off endpos;
         obd_off start;
@@ -423,9 +426,9 @@ static int lov_io_iter_init(const struct lu_env *env,
 static int lov_io_rw_iter_init(const struct lu_env *env,
                                const struct cl_io_slice *ios)
 {
-        struct lov_io        *lio = cl2lov_io(env, ios);
-        struct cl_io         *io  = ios->cis_io;
-        struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
+       struct lov_io        *lio = cl2lov_io(env, ios);
+       struct cl_io         *io  = ios->cis_io;
+       struct lov_stripe_md *lsm = lio->lis_lsm;
         loff_t start = io->u.ci_rw.crw_pos;
         loff_t next;
         unsigned long ssize = lsm->lsm_stripe_size;
@@ -449,12 +452,12 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
                CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
                       LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
                       (__u64)lio->lis_io_endpos);
-        }
-        /*
-         * XXX The following call should be optimized: we know, that
-         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
-         */
-        RETURN(lov_io_iter_init(env, ios));
+       }
+       /*
+        * XXX The following call should be optimized: we know, that
+        * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
+        */
+       RETURN(lov_io_iter_init(env, ios));
 }
 
 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
@@ -474,8 +477,8 @@ static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
 
                if (parent->ci_result == 0)
                        parent->ci_result = sub->sub_io->ci_result;
-        }
-        RETURN(rc);
+       }
+       RETURN(rc);
 }
 
 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
@@ -913,7 +916,6 @@ int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
         CFS_INIT_LIST_HEAD(&lio->lis_active);
         lov_io_slice_init(lio, lov, io);
         if (io->ci_result == 0) {
-                LASSERT(lov_r0(lov)->lo_lsm != NULL);
                 io->ci_result = lov_io_subio_init(env, lio, io);
                 if (io->ci_result == 0)
                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
@@ -924,13 +926,14 @@ int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io)
 {
-        struct lov_io *lio = lov_env_io(env);
-        int result;
+       struct lov_io *lio = lov_env_io(env);
+       int result;
+       ENTRY;
 
-        ENTRY;
-        switch (io->ci_type) {
-        default:
-                LBUG();
+       lio->lis_lsm = NULL;
+       switch (io->ci_type) {
+       default:
+               LBUG();
        case CIT_FSYNC:
         case CIT_MISC:
         case CIT_READ:
index bb29f48..8ba1e1a 100644 (file)
@@ -130,19 +130,21 @@ int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
 int lov_merge_lvb(struct obd_export *exp,
                   struct lov_stripe_md *lsm, struct ost_lvb *lvb, int kms_only)
 {
-        int   rc;
-        __u64 kms;
-
-        ENTRY;
-        rc = lov_merge_lvb_kms(lsm, lvb, &kms);
-        if (kms_only)
-                lvb->lvb_size = kms;
+       int   rc;
+       __u64 kms;
+
+       ENTRY;
+       lov_stripe_lock(lsm);
+       rc = lov_merge_lvb_kms(lsm, lvb, &kms);
+       lov_stripe_unlock(lsm);
+       if (kms_only)
+               lvb->lvb_size = kms;
        CDEBUG(D_INODE, "merged for FID "DFID" s="LPU64" m="LPU64" a="LPU64
               " c="LPU64" b="LPU64"\n",
               lsm->lsm_object_seq, (__u32)lsm->lsm_object_id,
               (__u32)(lsm->lsm_object_id >> 32), lvb->lvb_size, lvb->lvb_mtime,
               lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks);
-        RETURN(rc);
+       RETURN(rc);
 }
 
 /* Must be called under the lov_stripe_lock() */
index f826c33..effb91c 100644 (file)
@@ -2768,22 +2768,6 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
         RETURN(rc);
 }
 
-int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
-{
-        int i, rc = 0;
-        ENTRY;
-
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
-                if (loi->loi_ar.ar_rc && !rc)
-                        rc = loi->loi_ar.ar_rc;
-                loi->loi_ar.ar_rc = 0;
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(lov_test_and_clear_async_rc);
-
-
 static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
                            int cmd, __u64 *offset)
 {
index 40ca89b..33a47ed 100644 (file)
  * Implementation of cl_object for LOV layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOV
 
 #include "lov_cl_internal.h"
+#include <lustre_debug.h>
 
 /** \addtogroup lov
  *  @{
@@ -179,8 +181,15 @@ static int lov_init_raid0(const struct lu_env *env,
         struct lov_layout_raid0 *r0      = &state->raid0;
 
         ENTRY;
-        r0->lo_nr  = conf->u.coc_md->lsm->lsm_stripe_count;
-        r0->lo_lsm = conf->u.coc_md->lsm;
+
+       if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
+               dump_lsm(D_ERROR, lsm);
+               LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
+                        LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
+       }
+
+       r0->lo_lsm = lsm_addref(lsm);
+       r0->lo_nr  = lsm->lsm_stripe_count;
         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
@@ -268,10 +277,17 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                              union lov_layout_state *state)
 {
-        struct lov_layout_raid0 *r0 = &state->raid0;
-        int                      i;
+       struct lov_layout_raid0 *r0 = &state->raid0;
+       struct lov_stripe_md    *lsm = r0->lo_lsm;
+       struct l_wait_info       lwi = { 0 };
+       int                      i;
+
+       ENTRY;
+
+       /* wait until there is no extra users. */
+       dump_lsm(D_INODE, lsm);
+       l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
 
-        ENTRY;
         if (r0->lo_sub != NULL) {
                 for (i = 0; i < r0->lo_nr; ++i) {
                         struct lovsub_object *los = r0->lo_sub[i];
@@ -299,11 +315,16 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
         struct lov_layout_raid0 *r0 = &state->raid0;
 
         ENTRY;
+
         if (r0->lo_sub != NULL) {
                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
                 r0->lo_sub = NULL;
         }
-        EXIT;
+
+       LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1);
+       lov_free_memmd(&r0->lo_lsm);
+
+       EXIT;
 }
 
 static int lov_print_empty(const struct lu_env *env, void *cookie,
@@ -499,6 +520,7 @@ static int lov_layout_change(const struct lu_env *env,
                 cl_env_put(nested, &refcheck);
                 cl_env_reexit(cookie);
 
+               old_ops->llo_delete(env, obj, &obj->u);
                 old_ops->llo_fini(env, obj, &obj->u);
                 LASSERT(cfs_list_empty(&hdr->coh_locks));
                 LASSERT(hdr->coh_tree.rnode == NULL);
@@ -529,6 +551,7 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 
         ENTRY;
         cfs_init_rwsem(&lov->lo_type_guard);
+       cfs_waitq_init(&lov->lo_waitq);
 
         /* no locking is necessary, as object is being created */
         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
@@ -544,8 +567,9 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                         const struct cl_object_conf *conf)
 {
-        struct lov_object *lov = cl2lov(obj);
-        int result;
+       struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+       struct lov_object *lov = cl2lov(obj);
+       int result = 0;
 
         ENTRY;
         /*
@@ -555,13 +579,21 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
         cfs_down_write(&lov->lo_type_guard);
         LASSERT(lov->lo_owner == NULL);
         lov->lo_owner = cfs_current();
-        if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
-                result = lov_layout_change(env, lov, LLT_RAID0, conf);
-        else
-                result = -EOPNOTSUPP;
-        lov->lo_owner = NULL;
-        cfs_up_write(&lov->lo_type_guard);
-        RETURN(result);
+       switch (lov->lo_type) {
+       case LLT_EMPTY:
+               if (lsm != NULL)
+                       result = lov_layout_change(env, lov, LLT_RAID0, conf);
+               break;
+       case LLT_RAID0:
+               if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm))
+                       result = -EOPNOTSUPP;
+               break;
+       default:
+               LBUG();
+       }
+       lov->lo_owner = NULL;
+       cfs_up_write(&lov->lo_type_guard);
+       RETURN(result);
 }
 
 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
@@ -692,4 +724,99 @@ struct lu_object *lov_object_alloc(const struct lu_env *env,
         RETURN(obj);
 }
 
+struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
+{
+       struct lov_stripe_md *lsm = NULL;
+
+       cfs_down_read(&lov->lo_type_guard);
+       switch (lov->lo_type) {
+       case LLT_RAID0:
+               lsm = lsm_addref(lov->u.raid0.lo_lsm);
+       case LLT_EMPTY:
+               break;
+       default:
+               LBUG();
+       }
+       cfs_up_read(&lov->lo_type_guard);
+       return lsm;
+}
+
+void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
+{
+       if (lsm == NULL)
+               return;
+
+       lov_free_memmd(&lsm);
+       if (lov->lo_owner != NULL)
+               cfs_waitq_signal(&lov->lo_waitq);
+}
+
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
+{
+       struct lu_object *luobj;
+       struct lov_stripe_md *lsm = NULL;
+
+       if (clobj == NULL)
+               return NULL;
+
+       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+                                &lov_device_type);
+       if (luobj != NULL)
+               lsm = lov_lsm_addref(lu2lov(luobj));
+       return lsm;
+}
+EXPORT_SYMBOL(lov_lsm_get);
+
+void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+{
+       struct lu_object *luobj;
+
+       if (clobj == NULL || lsm == NULL)
+               return;
+
+       luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+                                &lov_device_type);
+       LASSERT(luobj != NULL);
+
+       lov_lsm_decref(lu2lov(luobj), lsm);
+}
+EXPORT_SYMBOL(lov_lsm_put);
+
+int lov_read_and_clear_async_rc(struct cl_object *clob)
+{
+       struct lu_object *luobj;
+       int rc = 0;
+       ENTRY;
+
+       luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
+                                &lov_device_type);
+       if (luobj != NULL) {
+               struct lov_object *lov = lu2lov(luobj);
+
+               cfs_down_read(&lov->lo_type_guard);
+               switch (lov->lo_type) {
+               case LLT_RAID0: {
+                       struct lov_stripe_md *lsm;
+                       int i;
+
+                       lsm = lov->u.raid0.lo_lsm;
+                       LASSERT(lsm != NULL);
+                       for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                               struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+                               if (loi->loi_ar.ar_rc && !rc)
+                                       rc = loi->loi_ar.ar_rc;
+                               loi->loi_ar.ar_rc = 0;
+                       }
+               }
+               case LLT_EMPTY:
+                       break;
+               default:
+                       LBUG();
+               }
+               cfs_up_read(&lov->lo_type_guard);
+       }
+       RETURN(rc);
+}
+EXPORT_SYMBOL(lov_read_and_clear_async_rc);
+
 /** @} lov */
index 777adde..a44a723 100644 (file)
@@ -315,6 +315,7 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, __u16 stripe_count,
                 RETURN(-ENOMEM);
         }
 
+       cfs_atomic_set(&(*lsmp)->lsm_refc, 1);
         cfs_spin_lock_init(&(*lsmp)->lsm_lock);
         (*lsmp)->lsm_magic = magic;
         (*lsmp)->lsm_stripe_count = stripe_count;
@@ -330,14 +331,18 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, __u16 stripe_count,
         RETURN(lsm_size);
 }
 
-void lov_free_memmd(struct lov_stripe_md **lsmp)
+int lov_free_memmd(struct lov_stripe_md **lsmp)
 {
-        struct lov_stripe_md *lsm = *lsmp;
-
-        LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
-        lsm_op_find(lsm->lsm_magic)->lsm_free(lsm);
-
-        *lsmp = NULL;
+       struct lov_stripe_md *lsm = *lsmp;
+       int refc;
+
+       *lsmp = NULL;
+       LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
+       if ((refc = cfs_atomic_dec_return(&lsm->lsm_refc)) == 0) {
+               LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
+               lsm_op_find(lsm->lsm_magic)->lsm_free(lsm);
+       }
+       return refc;
 }
 
 
index bb9ec06..bbcc306 100644 (file)
@@ -61,12 +61,13 @@ EXPORT_SYMBOL(dump_lniobuf);
 
 void dump_lsm(int level, struct lov_stripe_md *lsm)
 {
-        CDEBUG(level, "lsm %p, objid "LPX64", maxbytes "LPX64", magic 0x%08X, "
-               "stripe_size %u, stripe_count %u, "
-               "layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm,
-               lsm->lsm_object_id, lsm->lsm_maxbytes, lsm->lsm_magic,
-               lsm->lsm_stripe_size, lsm->lsm_stripe_count,
-               lsm->lsm_layout_gen, lsm->lsm_pool_name);
+       CDEBUG(level, "lsm %p, objid "LPX64", maxbytes "LPX64", magic 0x%08X, "
+              "stripe_size %u, stripe_count %u, refc: %d, "
+              "layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm,
+              lsm->lsm_object_id, lsm->lsm_maxbytes, lsm->lsm_magic,
+              lsm->lsm_stripe_size, lsm->lsm_stripe_count,
+              cfs_atomic_read(&lsm->lsm_refc), lsm->lsm_layout_gen,
+              lsm->lsm_pool_name);
 }
 
 #define LPDS sizeof(__u64)
index 9aefcb5..0145f3f 100644 (file)
@@ -526,7 +526,6 @@ static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
 {
         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
-        struct lov_stripe_md *lsm  = eco->eo_lsm;
         ENTRY;
 
         LASSERT(cfs_atomic_read(&eco->eo_npages) == 0);
@@ -538,10 +537,10 @@ static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
         lu_object_fini(obj);
         lu_object_header_fini(obj->lo_header);
 
-        if (lsm)
-                obd_free_memmd(ec->ec_exp, &lsm);
-        OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
-        EXIT;
+       if (eco->eo_lsm)
+               obd_free_memmd(ec->ec_exp, &eco->eo_lsm);
+       OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
+       EXIT;
 }
 
 static int echo_object_print(const struct lu_env *env, void *cookie,