Whamcloud - gitweb
LU-9008 pfl: dynamic layout modification with write/truncate
authorBobi Jam <bobijam.xu@intel.com>
Thu, 6 Apr 2017 00:13:41 +0000 (08:13 +0800)
committerJinshan Xiong <jinshan.xiong@intel.com>
Thu, 6 Apr 2017 04:53:00 +0000 (21:53 -0700)
* In lov_init_composite(), skip initializing sub-objects for layout
  components without LCME_FL_INIT.
* Issue a layout intent RPC during write/truncate operations when trying
  to write to an uninitialized component (even at the lock stage).
* After the layout intent RPC is issued, restart the IO.
* Get rid of the unused lov_layout_operations::llo_install() interface.
* Add an empty mdt_layout_change() interface to handle the layout intent
  write RPC.

Reviewed-on: https://review.whamcloud.com/25317

Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Change-Id: I2f79482187d2af2660dd86e55da3f5dc0138e94a
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
24 files changed:
lustre/include/cl_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_sec.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/vvp_io.c
lustre/lov/lov_ea.c
lustre/lov/lov_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/lov/lov_object.c
lustre/lov/lov_pack.c
lustre/lov/lov_page.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/obdclass/genops.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/sec.c
lustre/tests/sanity-pfl.sh
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_layout.c

index 655eafb..655e6b1 100644 (file)
@@ -1829,6 +1829,11 @@ struct cl_io {
         */
                             ci_ignore_layout:1,
        /**
         */
                             ci_ignore_layout:1,
        /**
+        * Need MDS intervention to complete a write. This usually means the
+        * corresponding component is not initialized for the writing extent.
+        */
+                            ci_need_write_intent:1,
+       /**
         * Check if layout changed after the IO finishes. Mainly for HSM
         * requirement. If IO occurs to openning files, it doesn't need to
         * verify layout because HSM won't release openning files.
         * Check if layout changed after the IO finishes. Mainly for HSM
         * requirement. If IO occurs to openning files, it doesn't need to
         * verify layout because HSM won't release openning files.
index d3f0f75..df22acd 100644 (file)
@@ -3119,22 +3119,22 @@ struct getparent {
 } __attribute__((packed));
 
 enum {
 } __attribute__((packed));
 
 enum {
-        LAYOUT_INTENT_ACCESS    = 0,
-        LAYOUT_INTENT_READ      = 1,
-        LAYOUT_INTENT_WRITE     = 2,
-        LAYOUT_INTENT_GLIMPSE   = 3,
-        LAYOUT_INTENT_TRUNC     = 4,
-        LAYOUT_INTENT_RELEASE   = 5,
-        LAYOUT_INTENT_RESTORE   = 6
+       LAYOUT_INTENT_ACCESS    = 0,    /** generic access */
+       LAYOUT_INTENT_READ      = 1,    /** not used */
+       LAYOUT_INTENT_WRITE     = 2,    /** write file, for comp layout */
+       LAYOUT_INTENT_GLIMPSE   = 3,    /** not used */
+       LAYOUT_INTENT_TRUNC     = 4,    /** truncate file, for comp layout */
+       LAYOUT_INTENT_RELEASE   = 5,    /** reserved for HSM release */
+       LAYOUT_INTENT_RESTORE   = 6,    /** reserved for HSM restore */
 };
 
 /* enqueue layout lock with intent */
 struct layout_intent {
 };
 
 /* enqueue layout lock with intent */
 struct layout_intent {
-       __u32 li_opc; /* intent operation for enqueue, read, write etc */
+       __u32 li_opc;   /* intent operation for enqueue, read, write etc */
        __u32 li_flags;
        __u64 li_start;
        __u64 li_end;
        __u32 li_flags;
        __u64 li_start;
        __u64 li_end;
-};
+} __attribute__((packed));
 
 /**
  * On the wire version of hsm_progress structure.
 
 /**
  * On the wire version of hsm_progress structure.
index 8d49d38..7e6f490 100644 (file)
@@ -63,6 +63,7 @@ struct ptlrpc_sec;
 struct ptlrpc_svc_ctx;
 struct ptlrpc_cli_ctx;
 struct ptlrpc_ctx_ops;
 struct ptlrpc_svc_ctx;
 struct ptlrpc_cli_ctx;
 struct ptlrpc_ctx_ops;
+struct req_msg_field;
 
 /**
  * \addtogroup flavor flavor
 
 /**
  * \addtogroup flavor flavor
@@ -1084,7 +1085,8 @@ void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
-                               int segment, int newsize);
+                              const struct req_msg_field *field,
+                              int newsize);
 int  sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
                                     struct ptlrpc_request **req_ret);
 void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req);
 int  sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
                                     struct ptlrpc_request **req_ret);
 void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req);
index c3b8cef..68ac435 100644 (file)
@@ -1191,7 +1191,6 @@ restart:
                if (count > 0 && args->via_io_subtype == IO_NORMAL)
                        args->u.normal.via_iter = vio->vui_iter;
        }
                if (count > 0 && args->via_io_subtype == IO_NORMAL)
                        args->u.normal.via_iter = vio->vui_iter;
        }
-       GOTO(out, rc);
 out:
        cl_io_fini(env, io);
 
 out:
        cl_io_fini(env, io);
 
@@ -1226,7 +1225,7 @@ out:
 
        CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
 
 
        CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
 
-       return result > 0 ? result : rc;
+       RETURN(result > 0 ? result : rc);
 }
 
 /**
 }
 
 /**
@@ -4121,9 +4120,9 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode,
        lock_res_and_lock(lock);
        lvb_ready = ldlm_is_lvb_ready(lock);
        unlock_res_and_lock(lock);
        lock_res_and_lock(lock);
        lvb_ready = ldlm_is_lvb_ready(lock);
        unlock_res_and_lock(lock);
+
        /* checking lvb_ready is racy but this is okay. The worst case is
         * that multi processes may configure the file on the same time. */
        /* checking lvb_ready is racy but this is okay. The worst case is
         * that multi processes may configure the file on the same time. */
-
        if (lvb_ready)
                GOTO(out, rc = 0);
 
        if (lvb_ready)
                GOTO(out, rc = 0);
 
@@ -4148,7 +4147,6 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode,
        /* refresh layout failed, need to wait */
        wait_layout = rc == -EBUSY;
        EXIT;
        /* refresh layout failed, need to wait */
        wait_layout = rc == -EBUSY;
        EXIT;
-
 out:
        LDLM_LOCK_PUT(lock);
        ldlm_lock_decref(lockh, mode);
 out:
        LDLM_LOCK_PUT(lock);
        ldlm_lock_decref(lockh, mode);
@@ -4173,39 +4171,37 @@ out:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int ll_layout_refresh_locked(struct inode *inode)
+/**
+ * Issue layout intent RPC to MDS.
+ * \param inode [in]   file inode
+ * \param intent [in]  layout intent
+ *
+ * \retval 0   on success
+ * \retval < 0 error code
+ */
+static int ll_layout_intent(struct inode *inode, struct layout_intent *intent)
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
        struct md_op_data     *op_data;
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
        struct md_op_data     *op_data;
-       struct lookup_intent    it;
-       struct lustre_handle    lockh;
-       enum ldlm_mode          mode;
+       struct lookup_intent it;
        struct ptlrpc_request *req;
        int rc;
        ENTRY;
 
        struct ptlrpc_request *req;
        int rc;
        ENTRY;
 
-again:
-       /* mostly layout lock is caching on the local side, so try to match
-        * it before grabbing layout lock mutex. */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
-                              LCK_CR | LCK_CW | LCK_PR | LCK_PW);
-       if (mode != 0) { /* hit cached lock */
-               rc = ll_layout_lock_set(&lockh, mode, inode);
-               if (rc == -EAGAIN)
-                       goto again;
-
-               RETURN(rc);
-       }
-
        op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
                                     0, 0, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));
 
        op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
                                     0, 0, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));
 
-       /* have to enqueue one */
+       op_data->op_data = intent;
+       op_data->op_data_size = sizeof(*intent);
+
        memset(&it, 0, sizeof(it));
        it.it_op = IT_LAYOUT;
        memset(&it, 0, sizeof(it));
        it.it_op = IT_LAYOUT;
+       if (intent->li_opc == LAYOUT_INTENT_WRITE ||
+           intent->li_opc == LAYOUT_INTENT_TRUNC)
+               it.it_flags = FMODE_WRITE;
 
        LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)",
                          ll_get_fsname(inode->i_sb, NULL, 0),
 
        LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)",
                          ll_get_fsname(inode->i_sb, NULL, 0),
@@ -4219,18 +4215,11 @@ again:
 
        ll_finish_md_op_data(op_data);
 
 
        ll_finish_md_op_data(op_data);
 
-       mode = it.it_lock_mode;
-       it.it_lock_mode = 0;
-       ll_intent_drop_lock(&it);
-
-       if (rc == 0) {
-               /* set lock data in case this is a new lock */
+       /* set lock data in case this is a new lock */
+       if (!rc)
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-               lockh.cookie = it.it_lock_handle;
-               rc = ll_layout_lock_set(&lockh, mode, inode);
-               if (rc == -EAGAIN)
-                       goto again;
-       }
+
+       ll_intent_drop_lock(&it);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -4252,6 +4241,11 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 {
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct ll_sb_info       *sbi = ll_i2sbi(inode);
 {
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct ll_sb_info       *sbi = ll_i2sbi(inode);
+       struct lustre_handle lockh;
+       struct layout_intent intent = {
+               .li_opc = LAYOUT_INTENT_ACCESS,
+       };
+       enum ldlm_mode mode;
        int rc;
        ENTRY;
 
        int rc;
        ENTRY;
 
@@ -4266,18 +4260,57 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        /* take layout lock mutex to enqueue layout lock exclusively. */
        mutex_lock(&lli->lli_layout_mutex);
 
        /* take layout lock mutex to enqueue layout lock exclusively. */
        mutex_lock(&lli->lli_layout_mutex);
 
-       rc = ll_layout_refresh_locked(inode);
-       if (rc < 0)
-               GOTO(out, rc);
+       while (1) {
+               /* the layout lock is usually cached on the local side, so try
+                * to match it before enqueueing a new one. */
+               mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
+                                      LCK_CR | LCK_CW | LCK_PR | LCK_PW);
+               if (mode != 0) { /* hit cached lock */
+                       rc = ll_layout_lock_set(&lockh, mode, inode);
+                       if (rc == -EAGAIN)
+                               continue;
+                       break;
+               }
 
 
-       *gen = ll_layout_version_get(lli);
-out:
+               rc = ll_layout_intent(inode, &intent);
+               if (rc != 0)
+                       break;
+       }
+
+       if (rc == 0)
+               *gen = ll_layout_version_get(lli);
        mutex_unlock(&lli->lli_layout_mutex);
 
        RETURN(rc);
 }
 
 /**
        mutex_unlock(&lli->lli_layout_mutex);
 
        RETURN(rc);
 }
 
 /**
+ * Issue layout intent RPC indicating where in a file an IO is about to write.
+ *
+ * \param[in] inode    file inode.
+ * \param[in] start    start offset of file in bytes where an IO is about to
+ *                     write.
+ * \param[in] end      exclusive end offset in bytes of the write range.
+ *
+ * \retval 0   on success
+ * \retval < 0 error code
+ */
+int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end)
+{
+       struct layout_intent intent = {
+               .li_opc = LAYOUT_INTENT_WRITE,
+               .li_start = start,
+               .li_end = end,
+       };
+       int rc;
+       ENTRY;
+
+       rc = ll_layout_intent(inode, &intent);
+
+       RETURN(rc);
+}
+
+/**
  *  This function send a restore request to the MDT
  */
 int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)
  *  This function send a restore request to the MDT
  */
 int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)
index 854c6cb..8426ad2 100644 (file)
@@ -1419,6 +1419,7 @@ static inline void d_lustre_revalidate(struct dentry *dentry)
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
 int ll_layout_refresh(struct inode *inode, __u32 *gen);
 int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
+int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end);
 
 int ll_xattr_init(void);
 void ll_xattr_fini(void);
 
 int ll_xattr_init(void);
 void ll_xattr_fini(void);
index 50ca3f1..50afff6 100644 (file)
@@ -298,18 +298,18 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        struct cl_object *obj = io->ci_obj;
        struct vvp_io    *vio = cl2vvp_io(env, ios);
        struct inode     *inode = vvp_object_inode(obj);
        struct cl_object *obj = io->ci_obj;
        struct vvp_io    *vio = cl2vvp_io(env, ios);
        struct inode     *inode = vvp_object_inode(obj);
+       int rc;
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
        CDEBUG(D_VFSTRACE, DFID" ignore/verify layout %d/%d, layout version %d "
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
        CDEBUG(D_VFSTRACE, DFID" ignore/verify layout %d/%d, layout version %d "
-                          "restore needed %d\n",
+                          "need write layout %d, restore needed %d\n",
               PFID(lu_object_fid(&obj->co_lu)),
               io->ci_ignore_layout, io->ci_verify_layout,
               PFID(lu_object_fid(&obj->co_lu)),
               io->ci_ignore_layout, io->ci_verify_layout,
-              vio->vui_layout_gen, io->ci_restore_needed);
+              vio->vui_layout_gen, io->ci_need_write_intent,
+              io->ci_restore_needed);
 
        if (io->ci_restore_needed) {
 
        if (io->ci_restore_needed) {
-               int     rc;
-
                /* file was detected release, we need to restore it
                 * before finishing the io
                 */
                /* file was detected release, we need to restore it
                 * before finishing the io
                 */
@@ -334,6 +334,31 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                }
        }
 
                }
        }
 
+       /**
+        * dynamic layout change needed, send layout intent
+        * RPC.
+        */
+       if (io->ci_need_write_intent) {
+               loff_t start = 0;
+               loff_t end = 0;
+
+               LASSERT(io->ci_type == CIT_WRITE || cl_io_is_trunc(io));
+
+               io->ci_need_write_intent = 0;
+
+               if (io->ci_type == CIT_WRITE) {
+                       start = io->u.ci_rw.crw_pos;
+                       end = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+               } else {
+                       end = io->u.ci_setattr.sa_attr.lvb_size;
+               }
+
+               rc = ll_layout_write_intent(inode, start, end);
+               io->ci_result = rc;
+               if (!rc)
+                       io->ci_need_restart = 1;
+       }
+
        if (!io->ci_ignore_layout && io->ci_verify_layout) {
                __u32 gen = 0;
 
        if (!io->ci_ignore_layout && io->ci_verify_layout) {
                __u32 gen = 0;
 
index 4f8271e..04624a3 100644 (file)
@@ -115,6 +115,9 @@ static void lsme_free(struct lov_stripe_md_entry *lsme)
        unsigned int i;
        size_t lsme_size;
 
        unsigned int i;
        size_t lsme_size;
 
+       if (!lsme_inited(lsme) ||
+           lsme->lsme_pattern & LOV_PATTERN_F_RELEASED)
+               stripe_count = 0;
        for (i = 0; i < stripe_count; i++)
                OBD_SLAB_FREE_PTR(lsme->lsme_oinfo[i], lov_oinfo_slab);
 
        for (i = 0; i < stripe_count; i++)
                OBD_SLAB_FREE_PTR(lsme->lsme_oinfo[i], lov_oinfo_slab);
 
@@ -142,7 +145,7 @@ void lsm_free(struct lov_stripe_md *lsm)
  */
 static struct lov_stripe_md_entry *
 lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
  */
 static struct lov_stripe_md_entry *
 lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
-           const char *pool_name, struct lov_ost_data_v1 *objects,
+           const char *pool_name, bool inited, struct lov_ost_data_v1 *objects,
            loff_t *maxbytes)
 {
        struct lov_stripe_md_entry *lsme;
            loff_t *maxbytes)
 {
        struct lov_stripe_md_entry *lsme;
@@ -160,7 +163,7 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
                RETURN(ERR_PTR(-EINVAL));
 
        pattern = le32_to_cpu(lmm->lmm_pattern);
                RETURN(ERR_PTR(-EINVAL));
 
        pattern = le32_to_cpu(lmm->lmm_pattern);
-       if (pattern & LOV_PATTERN_F_RELEASED)
+       if (pattern & LOV_PATTERN_F_RELEASED || !inited)
                stripe_count = 0;
        else
                stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
                stripe_count = 0;
        else
                stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
@@ -186,8 +189,10 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
 
        lsme->lsme_magic = magic;
        lsme->lsme_pattern = pattern;
 
        lsme->lsme_magic = magic;
        lsme->lsme_pattern = pattern;
+       lsme->lsme_flags = 0;
        lsme->lsme_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
        lsme->lsme_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
-       lsme->lsme_stripe_count = stripe_count;
+       /* preserve the possible -1 stripe count for uninstantiated component */
+       lsme->lsme_stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
        lsme->lsme_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
 
        if (pool_name != NULL) {
        lsme->lsme_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
 
        if (pool_name != NULL) {
@@ -278,10 +283,12 @@ lsm_unpackmd_v1v3(struct lov_obd *lov,
 
        pattern = le32_to_cpu(lmm->lmm_pattern);
 
 
        pattern = le32_to_cpu(lmm->lmm_pattern);
 
-       lsme = lsme_unpack(lov, lmm, buf_size, pool_name, objects, &maxbytes);
+       lsme = lsme_unpack(lov, lmm, buf_size, pool_name, true, objects,
+                          &maxbytes);
        if (IS_ERR(lsme))
                RETURN(ERR_CAST(lsme));
 
        if (IS_ERR(lsme))
                RETURN(ERR_CAST(lsme));
 
+       lsme->lsme_flags = LCME_FL_INIT;
        lsme->lsme_extent.e_start = 0;
        lsme->lsme_extent.e_end = LUSTRE_EOF;
 
        lsme->lsme_extent.e_start = 0;
        lsme->lsme_extent.e_end = LUSTRE_EOF;
 
@@ -371,7 +378,7 @@ static int lsm_verify_comp_md_v1(struct lov_comp_md_v1 *lcm,
 
 static struct lov_stripe_md_entry *
 lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
 
 static struct lov_stripe_md_entry *
 lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
-                size_t lmm_buf_size, loff_t *maxbytes)
+                size_t lmm_buf_size, bool inited, loff_t *maxbytes)
 {
        unsigned int magic;
        unsigned int stripe_count;
 {
        unsigned int magic;
        unsigned int stripe_count;
@@ -379,6 +386,9 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
        stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
        if (stripe_count == 0)
                RETURN(ERR_PTR(-EINVAL));
        stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
        if (stripe_count == 0)
                RETURN(ERR_PTR(-EINVAL));
+       /* un-instantiated lmm contains no ost id info, i.e. lov_ost_data_v1 */
+       if (!inited)
+               stripe_count = 0;
 
        magic = le32_to_cpu(lmm->lmm_magic);
        if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
 
        magic = le32_to_cpu(lmm->lmm_magic);
        if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
@@ -389,12 +399,12 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
 
        if (magic == LOV_MAGIC_V1) {
                return lsme_unpack(lov, lmm, lmm_buf_size, NULL,
 
        if (magic == LOV_MAGIC_V1) {
                return lsme_unpack(lov, lmm, lmm_buf_size, NULL,
-                                  lmm->lmm_objects, maxbytes);
+                                  inited, lmm->lmm_objects, maxbytes);
        } else {
                struct lov_mds_md_v3 *lmm3 = (struct lov_mds_md_v3 *)lmm;
 
                return lsme_unpack(lov, lmm, lmm_buf_size, lmm3->lmm_pool_name,
        } else {
                struct lov_mds_md_v3 *lmm3 = (struct lov_mds_md_v3 *)lmm;
 
                return lsme_unpack(lov, lmm, lmm_buf_size, lmm3->lmm_pool_name,
-                                  lmm3->lmm_objects, maxbytes);
+                                  inited, lmm3->lmm_objects, maxbytes);
        }
 }
 
        }
 }
 
@@ -440,6 +450,8 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size)
                blob = (char *)lcm + blob_offset;
 
                lsme = lsme_unpack_comp(lov, blob, blob_size,
                blob = (char *)lcm + blob_offset;
 
                lsme = lsme_unpack_comp(lov, blob, blob_size,
+                                       le32_to_cpu(lcme->lcme_flags) &
+                                       LCME_FL_INIT,
                                        (i == entry_count - 1) ? &maxbytes :
                                                                 NULL);
                if (IS_ERR(lsme))
                                        (i == entry_count - 1) ? &maxbytes :
                                                                 NULL);
                if (IS_ERR(lsme))
@@ -450,6 +462,7 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size)
 
                lsm->lsm_entries[i] = lsme;
                lsme->lsme_id = le32_to_cpu(lcme->lcme_id);
 
                lsm->lsm_entries[i] = lsme;
                lsme->lsme_id = le32_to_cpu(lcme->lcme_id);
+               lsme->lsme_flags = le32_to_cpu(lcme->lcme_flags);
                lu_extent_le_to_cpu(&lsme->lsme_extent, &lcme->lcme_extent);
 
                if (i == entry_count - 1) {
                lu_extent_le_to_cpu(&lsme->lsme_extent, &lcme->lcme_extent);
 
                if (i == entry_count - 1) {
@@ -482,7 +495,7 @@ const struct lsm_operations lsm_comp_md_v1_ops = {
 
 void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
 {
 
 void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
 {
-       int i;
+       int i, j;
 
        CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, "
               "refc: %d, entry: %u, layout_gen %u\n",
 
        CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, "
               "refc: %d, entry: %u, layout_gen %u\n",
@@ -493,12 +506,25 @@ void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
        for (i = 0; i < lsm->lsm_entry_count; i++) {
                struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
 
        for (i = 0; i < lsm->lsm_entry_count; i++) {
                struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
 
-               CDEBUG(level,
-                      DEXT ": id: %u, magic 0x%08X, stripe count %u, "
-                      "size %u, layout_gen %u, pool: ["LOV_POOLNAMEF"]\n",
-                      PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_magic,
+               CDEBUG(level, DEXT ": id: %u, flags: %x, "
+                      "magic 0x%08X, layout_gen %u, "
+                      "stripe count %u, sstripe size %u, "
+                      "pool: ["LOV_POOLNAMEF"]\n",
+                      PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_flags,
+                      lse->lsme_magic, lse->lsme_layout_gen,
                       lse->lsme_stripe_count, lse->lsme_stripe_size,
                       lse->lsme_stripe_count, lse->lsme_stripe_size,
-                      lse->lsme_layout_gen, lse->lsme_pool_name);
+                      lse->lsme_pool_name);
+               if (!lsme_inited(lse) ||
+                   lse->lsme_pattern & LOV_PATTERN_F_RELEASED)
+                       break;
+               for (j = 0; j < lse->lsme_stripe_count; j++) {
+                       CDEBUG(level, "   oinfo:%p: ostid: "DOSTID
+                              " ost idx: %d gen: %d\n",
+                              lse->lsme_oinfo[j],
+                              POSTID(&lse->lsme_oinfo[j]->loi_oi),
+                              lse->lsme_oinfo[j]->loi_ost_idx,
+                              lse->lsme_oinfo[j]->loi_ost_gen);
+               }
        }
 }
 
        }
 }
 
index b6a21c4..548fd50 100644 (file)
@@ -45,6 +45,7 @@ struct lov_stripe_md_entry {
        struct lu_extent        lsme_extent;
        u32                     lsme_id;
        u32                     lsme_magic;
        struct lu_extent        lsme_extent;
        u32                     lsme_id;
        u32                     lsme_magic;
+       u32                     lsme_flags;
        u32                     lsme_pattern;
        u32                     lsme_stripe_size;
        u16                     lsme_stripe_count;
        u32                     lsme_pattern;
        u32                     lsme_stripe_size;
        u16                     lsme_stripe_count;
@@ -53,6 +54,16 @@ struct lov_stripe_md_entry {
        struct lov_oinfo       *lsme_oinfo[];
 };
 
        struct lov_oinfo       *lsme_oinfo[];
 };
 
+static inline void copy_lsm_entry(struct lov_stripe_md_entry *dst,
+                                 struct lov_stripe_md_entry *src)
+{
+       unsigned i;
+
+       for (i = 0; i < src->lsme_stripe_count; i++)
+               *dst->lsme_oinfo[i] = *src->lsme_oinfo[i];
+       memcpy(dst, src, offsetof(typeof(*src), lsme_oinfo));
+}
+
 struct lov_stripe_md {
        atomic_t        lsm_refc;
        spinlock_t      lsm_lock;
 struct lov_stripe_md {
        atomic_t        lsm_refc;
        spinlock_t      lsm_lock;
@@ -328,4 +339,14 @@ static inline void lov_lsm2layout(struct lov_stripe_md *lsm,
                ol->ol_comp_id = 0;
        }
 }
                ol->ol_comp_id = 0;
        }
 }
+
+static inline bool lsme_inited(const struct lov_stripe_md_entry *lsme)
+{
+       return lsme->lsme_flags & LCME_FL_INIT;
+}
+
+static inline bool lsm_entry_inited(const struct lov_stripe_md *lsm, int index)
+{
+       return lsme_inited(lsm->lsm_entries[index]);
+}
 #endif
 #endif
index 2e21bfb..aa32f58 100644 (file)
@@ -404,6 +404,11 @@ static int lov_io_iter_init(const struct lu_env *env,
                u64 end;
                int stripe;
 
                u64 end;
                int stripe;
 
+               CDEBUG(D_VFSTRACE, "component[%d] flags %#x\n",
+                      index, lsm->lsm_entries[index]->lsme_flags);
+               if (!lsm_entry_inited(lsm, index))
+                       break;
+
                index++;
                if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
                        continue;
                index++;
                if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
                        continue;
@@ -453,6 +458,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct cl_io         *io  = ios->cis_io;
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct cl_io         *io  = ios->cis_io;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        struct lov_stripe_md_entry *lse;
        loff_t start = io->u.ci_rw.crw_pos;
        loff_t next;
        struct lov_stripe_md_entry *lse;
        loff_t start = io->u.ci_rw.crw_pos;
        loff_t next;
@@ -465,7 +471,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
        if (cl_io_is_append(io))
                RETURN(lov_io_iter_init(env, ios));
 
        if (cl_io_is_append(io))
                RETURN(lov_io_iter_init(env, ios));
 
-       index = lov_lsm_entry(lio->lis_object->lo_lsm, io->u.ci_rw.crw_pos);
+       index = lov_lsm_entry(lsm, io->u.ci_rw.crw_pos);
        if (index < 0) { /* non-existing layout component */
                if (io->ci_type == CIT_READ) {
                        /* TODO: it needs to detect the next component and
        if (index < 0) { /* non-existing layout component */
                if (io->ci_type == CIT_READ) {
                        /* TODO: it needs to detect the next component and
@@ -486,7 +492,9 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
        if (next <= start * ssize)
                next = ~0ull;
 
        if (next <= start * ssize)
                next = ~0ull;
 
-       LASSERT(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start);
+       LASSERTF(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start,
+                "pos %lld, [%lld, %lld)\n", io->u.ci_rw.crw_pos,
+                lse->lsme_extent.e_start, lse->lsme_extent.e_end);
        next = min_t(__u64, next, lse->lsme_extent.e_end);
        next = min_t(loff_t, next, lio->lis_io_endpos);
 
        next = min_t(__u64, next, lse->lsme_extent.e_end);
        next = min_t(loff_t, next, lio->lis_io_endpos);
 
@@ -499,6 +507,12 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
               (__u64)start, lio->lis_pos, lio->lis_endpos,
               (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
 
               (__u64)start, lio->lis_pos, lio->lis_endpos,
               (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
 
+       index = lov_lsm_entry(lsm, lio->lis_endpos - 1);
+       if (index > 0 && !lsm_entry_inited(lsm, index)) {
+               io->ci_need_write_intent = 1;
+               RETURN(io->ci_result = -ENODATA);
+       }
+
        /*
         * XXX The following call should be optimized: we know, that
         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
        /*
         * XXX The following call should be optimized: we know, that
         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
@@ -506,6 +520,26 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
        RETURN(lov_io_iter_init(env, ios));
 }
 
        RETURN(lov_io_iter_init(env, ios));
 }
 
+static int lov_io_setattr_iter_init(const struct lu_env *env,
+                                   const struct cl_io_slice *ios)
+{
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct cl_io *io = ios->cis_io;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       int index;
+       ENTRY;
+
+       if (cl_io_is_trunc(io) && lio->lis_pos) {
+               index = lov_lsm_entry(lsm, lio->lis_pos - 1);
+               if (index > 0 && !lsm_entry_inited(lsm, index)) {
+                       io->ci_need_write_intent = 1;
+                       RETURN(io->ci_result = -ENODATA);
+               }
+       }
+
+       RETURN(lov_io_iter_init(env, ios));
+}
+
 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
                       int (*iofunc)(const struct lu_env *, struct cl_io *))
 {
 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
                       int (*iofunc)(const struct lu_env *, struct cl_io *))
 {
@@ -638,7 +672,7 @@ static int lov_io_read_ahead(const struct lu_env *env,
 
        offset = cl_offset(obj, start);
        index = lov_lsm_entry(loo->lo_lsm, offset);
 
        offset = cl_offset(obj, start);
        index = lov_lsm_entry(loo->lo_lsm, offset);
-       if (index < 0)
+       if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index))
                RETURN(-ENODATA);
 
        stripe = lov_stripe_number(loo->lo_lsm, index, offset);
                RETURN(-ENODATA);
 
        stripe = lov_stripe_number(loo->lo_lsm, index, offset);
@@ -893,15 +927,15 @@ static const struct cl_io_operations lov_io_ops = {
                         .cio_start     = lov_io_start,
                         .cio_end       = lov_io_end
                 },
                         .cio_start     = lov_io_start,
                         .cio_end       = lov_io_end
                 },
-                [CIT_SETATTR] = {
-                        .cio_fini      = lov_io_fini,
-                        .cio_iter_init = lov_io_iter_init,
-                        .cio_iter_fini = lov_io_iter_fini,
-                        .cio_lock      = lov_io_lock,
-                        .cio_unlock    = lov_io_unlock,
-                        .cio_start     = lov_io_start,
-                        .cio_end       = lov_io_end
-                },
+               [CIT_SETATTR] = {
+                       .cio_fini      = lov_io_fini,
+                       .cio_iter_init = lov_io_setattr_iter_init,
+                       .cio_iter_fini = lov_io_iter_fini,
+                       .cio_lock      = lov_io_lock,
+                       .cio_unlock    = lov_io_unlock,
+                       .cio_start     = lov_io_start,
+                       .cio_end       = lov_io_end
+               },
                [CIT_DATA_VERSION] = {
                        .cio_fini       = lov_io_fini,
                        .cio_iter_init  = lov_io_iter_init,
                [CIT_DATA_VERSION] = {
                        .cio_fini       = lov_io_fini,
                        .cio_iter_init  = lov_io_iter_init,
index 9c4855c..efa4cc1 100644 (file)
@@ -134,7 +134,7 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
 
        nr = 0;
        for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
 
        nr = 0;
        for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
-            index != -1 && index < lov->lo_lsm->lsm_entry_count; index++) {
+            index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
                struct lov_layout_raid0 *r0 = lov_r0(lov, index);
 
                /* assume lsm entries are sorted. */
                struct lov_layout_raid0 *r0 = lov_r0(lov, index);
 
                /* assume lsm entries are sorted. */
@@ -149,8 +149,11 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
                                nr++;
                }
        }
                                nr++;
                }
        }
-       if (nr == 0)
-               RETURN(ERR_PTR(-EINVAL));
+       /**
+        * Aggressive lock request (from cl_setattr_ost) which asks for
+        * [eof, -1) lock, could come across uninstantiated layout extent,
+        * hence a 0 nr is possible.
+        */
 
        OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr]));
        if (lovlck == NULL)
 
        OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr]));
        if (lovlck == NULL)
@@ -159,7 +162,7 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
        lovlck->lls_nr = nr;
        nr = 0;
        for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
        lovlck->lls_nr = nr;
        nr = 0;
        for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
-            index < lov->lo_lsm->lsm_entry_count; index++) {
+            index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
                struct lov_layout_raid0 *r0 = lov_r0(lov, index);
 
                /* assume lsm entries are sorted. */
                struct lov_layout_raid0 *r0 = lov_r0(lov, index);
 
                /* assume lsm entries are sorted. */
index da3fc21..1d6c8d5 100644 (file)
@@ -63,8 +63,6 @@ struct lov_layout_operations {
                            union lov_layout_state *state);
         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
                          union lov_layout_state *state);
                            union lov_layout_state *state);
         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
                          union lov_layout_state *state);
-        void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
-                            union lov_layout_state *state);
         int  (*llo_print)(const struct lu_env *env, void *cookie,
                           lu_printer_t p, const struct lu_object *o);
         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
         int  (*llo_print)(const struct lu_env *env, void *cookie,
                           lu_printer_t p, const struct lu_object *o);
         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
@@ -91,16 +89,6 @@ static void lov_lsm_put(struct lov_stripe_md *lsm)
  * Lov object layout operations.
  *
  */
  * Lov object layout operations.
  *
  */
-
-static void lov_install_empty(const struct lu_env *env,
-                              struct lov_object *lov,
-                              union  lov_layout_state *state)
-{
-        /*
-         * File without objects.
-         */
-}
-
 static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
 static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
@@ -109,12 +97,6 @@ static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
        return 0;
 }
 
        return 0;
 }
 
-static void lov_install_composite(const struct lu_env *env,
-                                 struct lov_object *lov,
-                                 union  lov_layout_state *state)
-{
-}
-
 static struct cl_object *lov_sub_find(const struct lu_env *env,
                                       struct cl_device *dev,
                                       const struct lu_fid *fid,
 static struct cl_object *lov_sub_find(const struct lu_env *env,
                                       struct cl_device *dev,
                                       const struct lu_fid *fid,
@@ -322,6 +304,14 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                struct lov_layout_entry *le = &comp->lo_entries[i];
 
                le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
                struct lov_layout_entry *le = &comp->lo_entries[i];
 
                le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+               /**
+                * If the component has not been init-ed on MDS side, for
+                * PFL layout, we'd know that the components beyond this one
+                * will be dynamically init-ed later on file write/trunc ops.
+                */
+               if (!lsm_entry_inited(lsm, i))
+                       break;
+
                result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
                if (result < 0)
                        break;
                result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
                if (result < 0)
                        break;
@@ -572,9 +562,9 @@ static int lov_print_composite(const struct lu_env *env, void *cookie,
        for (i = 0; i < lsm->lsm_entry_count; i++) {
                struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
 
        for (i = 0; i < lsm->lsm_entry_count; i++) {
                struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
 
-               (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n",
+               (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %#x, %u, %u }\n",
                     PEXT(&lse->lsme_extent), lse->lsme_magic,
                     PEXT(&lse->lsme_extent), lse->lsme_magic,
-                    lse->lsme_id, lse->lsme_layout_gen,
+                    lse->lsme_id, lse->lsme_layout_gen, lse->lsme_flags,
                     lse->lsme_stripe_count, lse->lsme_stripe_size);
                lov_print_raid0(env, cookie, p, lov_r0(lov, i));
        }
                     lse->lsme_stripe_count, lse->lsme_stripe_size);
                lov_print_raid0(env, cookie, p, lov_r0(lov, i));
        }
@@ -672,6 +662,10 @@ static int lov_attr_get_composite(const struct lu_env *env,
                struct lov_layout_raid0 *r0 = &entry->lle_raid0;
                struct cl_attr *lov_attr = &r0->lo_attr;
 
                struct lov_layout_raid0 *r0 = &entry->lle_raid0;
                struct cl_attr *lov_attr = &r0->lo_attr;
 
+               /* PFL: This component has not been init-ed. */
+               if (!lsm_entry_inited(lov->lo_lsm, index))
+                       break;
+
                result = lov_attr_get_raid0(env, lov, index, r0);
                if (result != 0)
                        break;
                result = lov_attr_get_raid0(env, lov, index, r0);
                if (result != 0)
                        break;
@@ -699,7 +693,6 @@ const static struct lov_layout_operations lov_dispatch[] = {
                 .llo_init      = lov_init_empty,
                 .llo_delete    = lov_delete_empty,
                 .llo_fini      = lov_fini_empty,
                 .llo_init      = lov_init_empty,
                 .llo_delete    = lov_delete_empty,
                 .llo_fini      = lov_fini_empty,
-                .llo_install   = lov_install_empty,
                 .llo_print     = lov_print_empty,
                 .llo_page_init = lov_page_init_empty,
                 .llo_lock_init = lov_lock_init_empty,
                 .llo_print     = lov_print_empty,
                 .llo_page_init = lov_page_init_empty,
                 .llo_lock_init = lov_lock_init_empty,
@@ -710,7 +703,6 @@ const static struct lov_layout_operations lov_dispatch[] = {
                 .llo_init      = lov_init_released,
                 .llo_delete    = lov_delete_empty,
                 .llo_fini      = lov_fini_released,
                 .llo_init      = lov_init_released,
                 .llo_delete    = lov_delete_empty,
                 .llo_fini      = lov_fini_released,
-                .llo_install   = lov_install_empty,
                 .llo_print     = lov_print_released,
                 .llo_page_init = lov_page_init_empty,
                 .llo_lock_init = lov_lock_init_empty,
                 .llo_print     = lov_print_released,
                 .llo_page_init = lov_page_init_empty,
                 .llo_lock_init = lov_lock_init_empty,
@@ -721,7 +713,6 @@ const static struct lov_layout_operations lov_dispatch[] = {
                .llo_init      = lov_init_composite,
                .llo_delete    = lov_delete_composite,
                .llo_fini      = lov_fini_composite,
                .llo_init      = lov_init_composite,
                .llo_delete    = lov_delete_composite,
                .llo_fini      = lov_fini_composite,
-               .llo_install   = lov_install_composite,
                .llo_print     = lov_print_composite,
                .llo_page_init = lov_page_init_composite,
                .llo_lock_init = lov_lock_init_composite,
                .llo_print     = lov_print_composite,
                .llo_page_init = lov_page_init_composite,
                .llo_lock_init = lov_lock_init_composite,
@@ -906,7 +897,6 @@ static int lov_layout_change(const struct lu_env *unused,
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       new_ops->llo_install(env, lov, state);
        lov->lo_type = llt;
 
 out:
        lov->lo_type = llt;
 
 out:
@@ -954,8 +944,6 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
        if (rc != 0)
                GOTO(out_lsm, rc);
 
        if (rc != 0)
                GOTO(out_lsm, rc);
 
-       ops->llo_install(env, lov, set);
-
 out_lsm:
        lov_lsm_put(lsm);
 
 out_lsm:
        lov_lsm_put(lsm);
 
@@ -977,6 +965,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                                   conf->u.coc_layout.lb_len);
                if (IS_ERR(lsm))
                        RETURN(PTR_ERR(lsm));
                                   conf->u.coc_layout.lb_len);
                if (IS_ERR(lsm))
                        RETURN(PTR_ERR(lsm));
+               dump_lsm(D_INODE, lsm);
        }
 
        lov_conf_lock(lov);
        }
 
        lov_conf_lock(lov);
@@ -1544,6 +1533,9 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        for (entry = start_entry; entry <= end_entry; entry++) {
                lsme = lsm->lsm_entries[entry];
 
        for (entry = start_entry; entry <= end_entry; entry++) {
                lsme = lsm->lsm_entries[entry];
 
+               if (!lsme_inited(lsme))
+                       break;
+
                if (entry == start_entry)
                        fs.fs_ext.e_start = whole_start;
                else
                if (entry == start_entry)
                        fs.fs_ext.e_start = whole_start;
                else
@@ -1752,6 +1744,9 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
                                                lsm->lsm_entries[i];
                                int j;
 
                                                lsm->lsm_entries[i];
                                int j;
 
+                               if (!lsme_inited(lse))
+                                       break;
+
                                for (j = 0; j < lse->lsme_stripe_count; j++) {
                                        struct lov_oinfo *loi =
                                                        lse->lsme_oinfo[j];
                                for (j = 0; j < lse->lsme_stripe_count; j++) {
                                        struct lov_oinfo *loi =
                                                        lse->lsme_oinfo[j];
index db005e5..f79827b 100644 (file)
@@ -169,6 +169,9 @@ ssize_t lov_lsm_pack_v1v3(const struct lov_stripe_md *lsm, void *buf,
                lmm_objects = lmmv1->lmm_objects;
        }
 
                lmm_objects = lmmv1->lmm_objects;
        }
 
+       if (lsm->lsm_is_released)
+               RETURN(lmm_size);
+
        for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
                struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i];
 
        for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
                struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i];
 
@@ -213,11 +216,13 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
        for (entry = 0; entry < lsm->lsm_entry_count; entry++) {
                struct lov_stripe_md_entry *lsme;
                struct lov_mds_md *lmm;
        for (entry = 0; entry < lsm->lsm_entry_count; entry++) {
                struct lov_stripe_md_entry *lsme;
                struct lov_mds_md *lmm;
+               __u16 stripecnt;
 
                lsme = lsm->lsm_entries[entry];
                lcme = &lcmv1->lcm_entries[entry];
 
                lcme->lcme_id = cpu_to_le32(lsme->lsme_id);
 
                lsme = lsm->lsm_entries[entry];
                lcme = &lcmv1->lcm_entries[entry];
 
                lcme->lcme_id = cpu_to_le32(lsme->lsme_id);
+               lcme->lcme_flags = cpu_to_le32(lsme->lsme_flags);
                lcme->lcme_extent.e_start =
                        cpu_to_le64(lsme->lsme_extent.e_start);
                lcme->lcme_extent.e_end =
                lcme->lcme_extent.e_start =
                        cpu_to_le64(lsme->lsme_extent.e_start);
                lcme->lcme_extent.e_end =
@@ -244,7 +249,13 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
                                ((struct lov_mds_md_v1 *)lmm)->lmm_objects;
                }
 
                                ((struct lov_mds_md_v1 *)lmm)->lmm_objects;
                }
 
-               for (i = 0; i < lsme->lsme_stripe_count; i++) {
+               if (lsme_inited(lsme) &&
+                   !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
+                       stripecnt = lsme->lsme_stripe_count;
+               else
+                       stripecnt = 0;
+
+               for (i = 0; i < stripecnt; i++) {
                        struct lov_oinfo *loi = lsme->lsme_oinfo[i];
 
                        ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
                        struct lov_oinfo *loi = lsme->lsme_oinfo[i];
 
                        ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
@@ -254,8 +265,7 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
                                        cpu_to_le32(loi->loi_ost_idx);
                }
 
                                        cpu_to_le32(loi->loi_ost_idx);
                }
 
-               size = lov_mds_md_size(lsme->lsme_stripe_count,
-                                      lsme->lsme_magic);
+               size = lov_mds_md_size(stripecnt, lsme->lsme_magic);
                lcme->lcme_size = cpu_to_le32(size);
                offset += size;
        } /* for each layout component */
                lcme->lcme_size = cpu_to_le32(size);
                offset += size;
        } /* for each layout component */
index ae74d25..19a908f 100644 (file)
@@ -81,7 +81,7 @@ int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
 
        offset = cl_offset(obj, index);
        entry = lov_lsm_entry(loo->lo_lsm, offset);
 
        offset = cl_offset(obj, index);
        entry = lov_lsm_entry(loo->lo_lsm, offset);
-       if (entry < 0) {
+       if (entry < 0 || !lsm_entry_inited(loo->lo_lsm, entry)) {
                /* non-existing layout component */
                lov_page_init_empty(env, obj, page, index);
                RETURN(0);
                /* non-existing layout component */
                lov_page_init_empty(env, obj, page, index);
                RETURN(0);
index e723b55..5ac91ef 100644 (file)
@@ -89,6 +89,9 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
                            struct list_head *cancels, enum ldlm_mode mode,
                             __u64 bits);
 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
                            struct list_head *cancels, enum ldlm_mode mode,
                             __u64 bits);
+int mdc_save_lovea(struct ptlrpc_request *req,
+                  const struct req_msg_field *field,
+                  void *data, u32 size);
 /* mdc/mdc_request.c */
 int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
                  struct lu_fid *fid, struct md_op_data *op_data);
 /* mdc/mdc_request.c */
 int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
                  struct lu_fid *fid, struct md_op_data *op_data);
index f683f02..35b96ce 100644 (file)
@@ -214,20 +214,32 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
  * original open if the MDS crashed just when this client also OOM'd)
  * but this is incredibly unlikely, and questionable whether the client
  * could do MDS recovery under OOM anyways... */
  * original open if the MDS crashed just when this client also OOM'd)
  * but this is incredibly unlikely, and questionable whether the client
  * could do MDS recovery under OOM anyways... */
-static void mdc_realloc_openmsg(struct ptlrpc_request *req,
-                               struct mdt_body *body)
+int mdc_save_lovea(struct ptlrpc_request *req,
+                  const struct req_msg_field *field,
+                  void *data, u32 size)
 {
 {
-       int     rc;
+       struct req_capsule *pill = &req->rq_pill;
+       void *lmm;
+       int rc = 0;
 
 
-       /* FIXME: remove this explicit offset. */
-       rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
-                                       body->mbo_eadatasize);
-       if (rc) {
-               CERROR("Can't enlarge segment %d size to %d\n",
-                      DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
-               body->mbo_valid &= ~OBD_MD_FLEASIZE;
-               body->mbo_eadatasize = 0;
+       if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
+               rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
+               if (rc) {
+                       CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
+                              req->rq_export->exp_obd->obd_name,
+                              size, rc);
+                       return rc;
+               }
+       } else {
+               req_capsule_shrink(pill, field, size, RCL_CLIENT);
        }
        }
+       /*
+        * The client-side buffer of 'field' was enlarged or shrunk to
+        * 'size' above; record that size in the capsule and copy 'data'
+        * into the field. If no buffer is returned for the field the copy
+        * is skipped and 0 is still returned.
+        */
+       req_capsule_set_size(pill, field, RCL_CLIENT, size);
+       lmm = req_capsule_client_get(pill, field);
+       if (lmm)
+               memcpy(lmm, data, size);
+       /* rc is 0 here: an enlarge failure has already returned above. */
+       return rc;
 }
 
 static struct ptlrpc_request *
 }
 
 static struct ptlrpc_request *
@@ -454,7 +466,7 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
 
 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
 
 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
-                                                    struct md_op_data *unused)
+                                                    struct md_op_data *op_data)
 {
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
 {
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
@@ -481,9 +493,9 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
 
        /* pack the layout intent request */
        layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
 
        /* pack the layout intent request */
        layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
-       /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
-        * set for replication */
-       layout->li_opc = LAYOUT_INTENT_ACCESS;
+       LASSERT(op_data->op_data != NULL);
+       LASSERT(op_data->op_data_size == sizeof(*layout));
+       memcpy(layout, op_data->op_data, sizeof(*layout));
 
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             obd->u.cli.cl_default_mds_easize);
 
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             obd->u.cli.cl_default_mds_easize);
@@ -632,27 +644,16 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                          * (for example error one).
                          */
                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
                          * (for example error one).
                          */
                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
-                                void *lmm;
-                                if (req_capsule_get_size(pill, &RMF_EADATA,
-                                                         RCL_CLIENT) <
-                                   body->mbo_eadatasize)
-                                       mdc_realloc_openmsg(req, body);
-                               else
-                                       req_capsule_shrink(pill, &RMF_EADATA,
-                                                          body->mbo_eadatasize,
-                                                          RCL_CLIENT);
-
-                               req_capsule_set_size(pill, &RMF_EADATA,
-                                                    RCL_CLIENT,
-                                                    body->mbo_eadatasize);
-
-                               lmm = req_capsule_client_get(pill, &RMF_EADATA);
-                               if (lmm)
-                                       memcpy(lmm, eadata,
-                                              body->mbo_eadatasize);
+                               rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
+                                                   body->mbo_eadatasize);
+                               if (rc) {
+                                       body->mbo_valid &= ~OBD_MD_FLEASIZE;
+                                       body->mbo_eadatasize = 0;
+                                       rc = 0;
+                               }
                        }
                }
                        }
                }
-        } else if (it->it_op & IT_LAYOUT) {
+       } else if (it->it_op & IT_LAYOUT) {
                /* maybe the lock was granted right away and layout
                 * is packed into RMF_DLM_LVB of req */
                lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
                /* maybe the lock was granted right away and layout
                 * is packed into RMF_DLM_LVB of req */
                lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
@@ -661,6 +662,15 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                                                        &RMF_DLM_LVB, lvb_len);
                        if (lvb_data == NULL)
                                RETURN(-EPROTO);
                                                        &RMF_DLM_LVB, lvb_len);
                        if (lvb_data == NULL)
                                RETURN(-EPROTO);
+
+                       /**
+                        * save replied layout data to the request buffer for
+                        * recovery consideration (lest MDS reinitialize
+                        * another set of OST objects).
+                        */
+                       if (req->rq_transno)
+                               (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
+                                                    lvb_len);
                }
        }
 
                }
        }
 
@@ -1035,13 +1045,13 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                case IT_READDIR:
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                        break;
                case IT_READDIR:
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                        break;
-                case IT_LAYOUT:
-                        policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
-                        break;
-                default:
-                        policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
-                        break;
-                }
+               case IT_LAYOUT:
+                       policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
+                       break;
+               default:
+                       policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
+                       break;
+               }
 
                mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
                                      LDLM_IBITS, &policy,
 
                mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
                                      LDLM_IBITS, &policy,
index 6f9105e..97404f6 100644 (file)
@@ -384,32 +384,17 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
                        GOTO(out, rc = -EPROTO);
 
                if (body->mbo_valid & OBD_MD_FLEASIZE) {
                        GOTO(out, rc = -EPROTO);
 
                if (body->mbo_valid & OBD_MD_FLEASIZE) {
-                       void *eadata, *lmm;
+                       void *eadata;
 
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                        body->mbo_eadatasize);
                        if (eadata == NULL)
                                GOTO(out, rc = -EPROTO);
 
 
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                        body->mbo_eadatasize);
                        if (eadata == NULL)
                                GOTO(out, rc = -EPROTO);
 
-                       if (req_capsule_get_size(pill, &RMF_EADATA,
-                                                RCL_CLIENT) <
-                                       body->mbo_eadatasize) {
-                               rc = sptlrpc_cli_enlarge_reqbuf(req, 4,
-                                                       body->mbo_eadatasize);
-                               if (rc)
-                                       GOTO(out, rc = -ENOMEM);
-                       } else {
-                               req_capsule_shrink(pill, &RMF_EADATA,
-                                                  body->mbo_eadatasize,
-                                                  RCL_CLIENT);
-                       }
-
-                       req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT,
-                                            body->mbo_eadatasize);
-
-                       lmm = req_capsule_client_get(pill, &RMF_EADATA);
-                       if (lmm)
-                               memcpy(lmm, eadata, body->mbo_eadatasize);
+                       rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
+                                           body->mbo_eadatasize);
+                       if (rc)
+                               GOTO(out, rc);
                }
        }
 out:
                }
        }
 out:
index dd3e494..57b1234 100644 (file)
@@ -1227,6 +1227,24 @@ out:
 }
 
 /**
 }
 
 /**
+ * Handler of layout intent RPC requiring the layout modification
+ *
+ * \param info [in]    thread environment
+ * \param obj [in]     object
+ * \param layout [in]  layout intent
+ *
+ * \retval 0   on success
+ * \retval < 0 error code
+ */
+static int mdt_layout_change(struct mdt_thread_info *info,
+                            struct mdt_object *obj,
+                            struct layout_intent *layout)
+{
+       /*
+        * XXX: to do. Placeholder only: none of info, obj or layout is
+        * used yet, and success (0) is returned unconditionally.
+        */
+       return 0;
+}
+
+/**
  * Exchange MOF_LOV_CREATED flags between two objects after a
  * layout swap. No assumption is made on whether o1 or o2 have
  * created objects or not.
  * Exchange MOF_LOV_CREATED flags between two objects after a
  * layout swap. No assumption is made on whether o1 or o2 have
  * created objects or not.
@@ -3437,6 +3455,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        struct layout_intent *layout;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
        struct layout_intent *layout;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
+       bool layout_change = false;
        int layout_size = 0;
        int rc = 0;
        ENTRY;
        int layout_size = 0;
        int rc = 0;
        ENTRY;
@@ -3451,11 +3470,29 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (layout == NULL)
                RETURN(-EPROTO);
 
        if (layout == NULL)
                RETURN(-EPROTO);
 
-       if (layout->li_opc != LAYOUT_INTENT_ACCESS) {
+       switch (layout->li_opc) {
+       case LAYOUT_INTENT_TRUNC:
+       case LAYOUT_INTENT_WRITE:
+               layout_change = true;
+               break;
+       case LAYOUT_INTENT_ACCESS:
+               break;
+       case LAYOUT_INTENT_READ:
+       case LAYOUT_INTENT_GLIMPSE:
+       case LAYOUT_INTENT_RELEASE:
+       case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), layout->li_opc);
                CERROR("%s: Unsupported layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), layout->li_opc);
-               RETURN(-EINVAL);
+               rc = -ENOTSUPP;
+               break;
+       default:
+               CERROR("%s: Unknown layout intent opc %d\n",
+                      mdt_obd_name(info->mti_mdt), layout->li_opc);
+               rc = -EINVAL;
+               break;
        }
        }
+       if (rc < 0)
+               RETURN(rc);
 
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
 
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
@@ -3480,8 +3517,14 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
        rc = req_capsule_server_pack(info->mti_pill);
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
        rc = req_capsule_server_pack(info->mti_pill);
-       GOTO(out_obj, rc);
+       if (rc)
+               GOTO(out_obj, rc);
 
 
+       if (layout_change) {
+               rc = mdt_layout_change(info, obj, layout);
+               if (rc)
+                       GOTO(out_obj, rc);
+       }
 out_obj:
        mdt_object_put(info->mti_env, obj);
 
 out_obj:
        mdt_object_put(info->mti_env, obj);
 
index 3ed4c3a..c62e2e9 100644 (file)
@@ -2224,6 +2224,16 @@ static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
        return avail;
 }
 
        return avail;
 }
 
+static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
+{ /* true for GETATTR/LOOKUP/READDIR, or LAYOUT without FMODE_WRITE */
+       if (it != NULL &&
+           (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+            it->it_op == IT_READDIR ||
+            (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
+                       return true;
+       return false;
+}
+
 /* Get a modify RPC slot from the obd client @cli according
  * to the kind of operation @opc that is going to be sent
  * and the intent @it of the operation if it applies.
 /* Get a modify RPC slot from the obd client @cli according
  * to the kind of operation @opc that is going to be sent
  * and the intent @it of the operation if it applies.
@@ -2242,8 +2252,7 @@ __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
        /* read-only metadata RPCs don't consume a slot on MDT
         * for reply reconstruction
         */
        /* read-only metadata RPCs don't consume a slot on MDT
         * for reply reconstruction
         */
-       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+       if (obd_skip_mod_rpc_slot(it))
                return 0;
 
        if (opc == MDS_CLOSE)
                return 0;
 
        if (opc == MDS_CLOSE)
@@ -2289,8 +2298,7 @@ void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
 {
        bool                    close_req = false;
 
 {
        bool                    close_req = false;
 
-       if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+       if (obd_skip_mod_rpc_slot(it))
                return;
 
        if (opc == MDS_CLOSE)
                return;
 
        if (opc == MDS_CLOSE)
index 1a7f069..5928306 100644 (file)
@@ -1929,17 +1929,17 @@ EXPORT_SYMBOL(req_capsule_server_pack);
  * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
  * corresponding to the given RMF (\a field).
  */
  * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
  * corresponding to the given RMF (\a field).
  */
-static __u32 __req_capsule_offset(const struct req_capsule *pill,
-                                 const struct req_msg_field *field,
-                                 enum req_location loc)
+__u32 __req_capsule_offset(const struct req_capsule *pill,
+                          const struct req_msg_field *field,
+                          enum req_location loc)
 {
        unsigned int offset;
 
 {
        unsigned int offset;
 
-        offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
-        LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n",
-                            pill->rc_fmt->rf_name,
-                            field->rmf_name, offset, loc);
-        offset --;
+       offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
+       LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n",
+                            pill->rc_fmt->rf_name,
+                            field->rmf_name, offset, loc);
+       offset--;
 
        LASSERT(offset < REQ_MAX_FIELD_NR);
         return offset;
 
        LASSERT(offset < REQ_MAX_FIELD_NR);
         return offset;
index 1dbc82d..5da292f 100644 (file)
@@ -308,6 +308,11 @@ void sptlrpc_conf_fini(void);
 int  sptlrpc_init(void);
 void sptlrpc_fini(void);
 
 int  sptlrpc_init(void);
 void sptlrpc_fini(void);
 
+/* layout.c */
+__u32 __req_capsule_offset(const struct req_capsule *pill,
+                          const struct req_msg_field *field,
+                          enum req_location loc);
+
 static inline bool ptlrpc_recoverable_error(int rc)
 {
        return (rc == -ENOTCONN || rc == -ENODEV);
 static inline bool ptlrpc_recoverable_error(int rc)
 {
        return (rc == -ENOTCONN || rc == -ENODEV);
index 79237a0..7037b91 100644 (file)
@@ -1657,11 +1657,14 @@ EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
  * so caller should refresh its local pointers if needed.
  */
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
  * so caller should refresh its local pointers if needed.
  */
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
-                               int segment, int newsize)
-{
-        struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
-        struct ptlrpc_sec_cops   *cops;
-        struct lustre_msg        *msg = req->rq_reqmsg;
+                              const struct req_msg_field *field,
+                              int newsize)
+{
+       struct req_capsule *pill = &req->rq_pill;
+       struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
+       struct ptlrpc_sec_cops *cops;
+       struct lustre_msg *msg = req->rq_reqmsg;
+       int segment = __req_capsule_offset(pill, field, RCL_CLIENT);
 
         LASSERT(ctx);
         LASSERT(msg);
 
         LASSERT(ctx);
         LASSERT(msg);
index 2720a1a..805344a 100644 (file)
@@ -47,6 +47,9 @@ test_0() {
        $LFS setstripe -E 1m -S 1M -c 1 -E -1 -c 1 $comp_file ||
                error "Create $comp_file failed"
 
        $LFS setstripe -E 1m -S 1M -c 1 -E -1 -c 1 $comp_file ||
                error "Create $comp_file failed"
 
+       #instantiate all components, so that objs are allocated
+       dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k
+
        local ost_idx1=$($LFS getstripe -I 1 -i $comp_file)
        local ost_idx2=$($LFS getstripe -I 2 -i $comp_file)
 
        local ost_idx1=$($LFS getstripe -I 1 -i $comp_file)
        local ost_idx2=$($LFS getstripe -I 2 -i $comp_file)
 
@@ -67,6 +70,9 @@ test_1() {
        $LFS setstripe -E 1m -S 1m -o 0 -E -1 -o 0 $comp_file ||
                error "Create $comp_file failed"
 
        $LFS setstripe -E 1m -S 1m -o 0 -E -1 -o 0 $comp_file ||
                error "Create $comp_file failed"
 
+       #instantiate all components, so that objs are allocated
+       dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k
+
        local ost_idx1=$($LFS getstripe -I 1 -i $comp_file)
        local ost_idx2=$($LFS getstripe -I 2 -i $comp_file)
 
        local ost_idx1=$($LFS getstripe -I 1 -i $comp_file)
        local ost_idx2=$($LFS getstripe -I 2 -i $comp_file)
 
@@ -97,8 +103,8 @@ test_2() {
 
        dd if=/dev/zero of=$comp_file bs=1M count=2 > /dev/null 2>&1 &&
                error "Write beyond component should fail"
 
        dd if=/dev/zero of=$comp_file bs=1M count=2 > /dev/null 2>&1 &&
                error "Write beyond component should fail"
-       dd if=$comp_file of=/dev/null bs=1M count=2 > /dev/null 2>&1 &&
-               error "Read beyond component should fail"
+       dd if=$comp_file of=/dev/null bs=1M count=2 > /dev/null 2>&1 ||
+               error "Read beyond component should short read, not fail"
 
        $LFS setstripe --component-add -E 2M -c 1 $comp_file ||
                error "Add component to $comp_file failed"
 
        $LFS setstripe --component-add -E 2M -c 1 $comp_file ||
                error "Add component to $comp_file failed"
@@ -166,6 +172,9 @@ test_3() {
        $LFS setstripe -E 1M -E 16M -E -1 $comp_file ||
                error "Create second $comp_file failed"
 
        $LFS setstripe -E 1M -E 16M -E -1 $comp_file ||
                error "Create second $comp_file failed"
 
+       #instantiate all components, so that objs are allocated
+       dd if=/dev/zero of=$comp_file bs=1k count=1 seek=16k
+
        del_comp_and_verify $comp_file "init" 0 0
        rm -f $comp_file || error "Delete second $comp_file failed"
 }
        del_comp_and_verify $comp_file "init" 0 0
        rm -f $comp_file || error "Delete second $comp_file failed"
 }
@@ -195,6 +204,9 @@ test_5() {
        local comp_cnt=$($LFS getstripe --component-count $comp_file)
        [ $comp_cnt -ne 2 ] && error "file $comp_cnt != 2"
 
        local comp_cnt=$($LFS getstripe --component-count $comp_file)
        [ $comp_cnt -ne 2 ] && error "file $comp_cnt != 2"
 
+       #instantiate all components, so that objs are allocated
+       dd if=/dev/zero of=$comp_file bs=1k count=1 seek=64k
+
        local ost_idx=$($LFS getstripe -I 1 -i $comp_file)
        [ $ost_idx -ne 0 ] &&
                error "component 1 ost_idx $ost_idx != 0"
        local ost_idx=$($LFS getstripe -I 1 -i $comp_file)
        [ $ost_idx -ne 0 ] &&
                error "component 1 ost_idx $ost_idx != 0"
index abcf0df..04bf813 100644 (file)
@@ -2105,6 +2105,7 @@ enum lov_dump_flags {
        LDF_IS_DIR      = 0x0001,
        LDF_IS_RAW      = 0x0002,
        LDF_INDENT      = 0x0004,
        LDF_IS_DIR      = 0x0001,
        LDF_IS_RAW      = 0x0002,
        LDF_INDENT      = 0x0004,
+       LDF_SKIP_OBJS   = 0x0008,
 };
 
 static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
 };
 
 static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
@@ -2115,6 +2116,7 @@ static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
        bool is_dir = flags & LDF_IS_DIR;
        bool is_raw = flags & LDF_IS_RAW;
        bool indent = flags & LDF_INDENT;
        bool is_dir = flags & LDF_IS_DIR;
        bool is_raw = flags & LDF_IS_RAW;
        bool indent = flags & LDF_INDENT;
+       bool skip_objs = flags & LDF_SKIP_OBJS;
        char *prefix = is_dir ? "" : "lmm_";
        char *separator = "";
        char *space = indent ? "      " : "";
        char *prefix = is_dir ? "" : "lmm_";
        char *separator = "";
        char *space = indent ? "      " : "";
@@ -2245,7 +2247,7 @@ static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
                if (verbose & ~VERBOSE_OFFSET)
                        llapi_printf(LLAPI_MSG_NORMAL, "%s%sstripe_offset: ",
                                     space, prefix);
                if (verbose & ~VERBOSE_OFFSET)
                        llapi_printf(LLAPI_MSG_NORMAL, "%s%sstripe_offset: ",
                                     space, prefix);
-               if (is_dir)
+               if (is_dir || skip_objs)
                        llapi_printf(LLAPI_MSG_NORMAL, "%d",
                                     lum->lmm_stripe_offset ==
                                     (typeof(lum->lmm_stripe_offset))(-1) ? -1 :
                        llapi_printf(LLAPI_MSG_NORMAL, "%d",
                                     lum->lmm_stripe_offset ==
                                     (typeof(lum->lmm_stripe_offset))(-1) ? -1 :
@@ -2276,6 +2278,7 @@ void lov_dump_user_lmm_v1v3(struct lov_user_md *lum, char *pool_name,
 {
        bool is_dir = flags & LDF_IS_DIR;
        bool indent = flags & LDF_INDENT;
 {
        bool is_dir = flags & LDF_IS_DIR;
        bool indent = flags & LDF_INDENT;
+       bool skip_objs = flags & LDF_SKIP_OBJS;
        int i, obdstripe = (obdindex != OBD_NOT_FOUND) ? 0 : 1;
 
        if (!obdstripe) {
        int i, obdstripe = (obdindex != OBD_NOT_FOUND) ? 0 : 1;
 
        if (!obdstripe) {
@@ -2293,7 +2296,7 @@ void lov_dump_user_lmm_v1v3(struct lov_user_md *lum, char *pool_name,
        lov_dump_user_lmm_header(lum, path, objects, header, depth, pool_name,
                                 flags);
 
        lov_dump_user_lmm_header(lum, path, objects, header, depth, pool_name,
                                 flags);
 
-       if (!is_dir && (header & VERBOSE_OBJID) &&
+       if (!is_dir && !skip_objs && (header & VERBOSE_OBJID) &&
            !(lum->lmm_pattern & LOV_PATTERN_F_RELEASED)) {
                char *space = "      - ";
 
            !(lum->lmm_pattern & LOV_PATTERN_F_RELEASED)) {
                char *space = "      - ";
 
@@ -2720,6 +2723,11 @@ static void lov_dump_comp_v1(struct find_param *param, char *path,
                    !(param->fp_comp_flags & entry->lcme_flags))
                        continue;
 
                    !(param->fp_comp_flags & entry->lcme_flags))
                        continue;
 
+               if (entry->lcme_flags & LCME_FL_INIT)
+                       flags &= ~LDF_SKIP_OBJS;
+               else
+                       flags |= LDF_SKIP_OBJS;
+
                if (param->fp_check_comp_id &&
                    param->fp_comp_id != entry->lcme_id)
                        continue;
                if (param->fp_check_comp_id &&
                    param->fp_comp_id != entry->lcme_id)
                        continue;
index 813e400..6136cff 100644 (file)
@@ -799,8 +799,13 @@ static bool llapi_layout_lum_valid(struct lov_user_md *lum, int lum_size)
                }
                obj_count = llapi_layout_objects_in_lum(lum, lum_size);
 
                }
                obj_count = llapi_layout_objects_in_lum(lum, lum_size);
 
-               if (obj_count != lum->lmm_stripe_count)
+               if (comp_v1) {
+                       if (!(comp_v1->lcm_entries[i].lcme_flags &
+                                LCME_FL_INIT) && obj_count != 0)
+                               return false;
+               } else if (obj_count != lum->lmm_stripe_count) {
                        return false;
                        return false;
+               }
        }
        return true;
 }
        }
        return true;
 }
@@ -1710,7 +1715,6 @@ int llapi_layout_comp_add(struct llapi_layout *layout)
                          llc_list);
 
        /* Inherit some attributes from existing component */
                          llc_list);
 
        /* Inherit some attributes from existing component */
-       new->llc_pattern = comp->llc_pattern;
        new->llc_stripe_size = comp->llc_stripe_size;
        new->llc_stripe_count = comp->llc_stripe_count;
        if (new->llc_extent.e_end <= last->llc_extent.e_end) {
        new->llc_stripe_size = comp->llc_stripe_size;
        new->llc_stripe_count = comp->llc_stripe_count;
        if (new->llc_extent.e_end <= last->llc_extent.e_end) {