Whamcloud - gitweb
LU-3285 lov: add MDT target to the LOV device 10/28010/15
authorMikhal Pershin <mike.pershin@intel.com>
Wed, 26 Apr 2017 11:24:57 +0000 (14:24 +0300)
committerMike Pershin <mike.pershin@intel.com>
Fri, 13 Oct 2017 12:37:18 +0000 (12:37 +0000)
MDC becomes LOV target like OSC for Data-on-MDT needs.
Patch does the following:
- new composite layout entry type is added - LLT_DOM to
describe Data-on-MDT striping.
- LOV process config log and checks for MDC targets organizing
them separately from OSCs
- LOV operations are changed where needed to understand new layout
entry type

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I8a985d66a5f283ed387a311ff46f307c60317a79
Reviewed-on: https://review.whamcloud.com/28010
Tested-by: Jenkins
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
17 files changed:
lustre/include/obd.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/lmv/lmv_obd.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_dev.c
lustre/lov/lov_ea.c
lustre/lov/lov_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_obd.c
lustre/lov/lov_object.c
lustre/lov/lov_offset.c
lustre/mdc/mdc_request.c
lustre/mgs/mgs_llog.c
lustre/obdclass/obd_config.c
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 29a3cb9..ccd7fd9 100644 (file)
@@ -372,6 +372,11 @@ struct lov_tgt_desc {
                             ltd_reap:1;  /* should this target be deleted */
 };
 
+struct lov_md_tgt_desc {
+       struct obd_device *lmtd_mdc;
+       __u32              lmtd_index;
+};
+
 struct lov_obd {
        struct lov_desc         desc;
        struct lov_tgt_desc   **lov_tgts;               /* sparse array */
@@ -394,10 +399,13 @@ struct lov_obd {
        struct cl_client_cache *lov_cache;
 
        struct rw_semaphore     lov_notify_lock;
+       /* Data-on-MDT: MDC array */
+       struct lov_md_tgt_desc  *lov_mdc_tgts;
 };
 
 struct lmv_tgt_desc {
        struct obd_uuid         ltd_uuid;
+       struct obd_device       *ltd_obd;
        struct obd_export       *ltd_exp;
        __u32                   ltd_idx;
        struct mutex            ltd_fid_mutex;
index bb18450..f4adad5 100644 (file)
@@ -432,7 +432,7 @@ enum ll_lease_type {
 #define LOV_PATTERN_NONE       0x000
 #define LOV_PATTERN_RAID0      0x001
 #define LOV_PATTERN_RAID1      0x002
-#define LOV_PATTERN_FIRST      0x100
+#define LOV_PATTERN_MDT                0x100
 #define LOV_PATTERN_CMOBD      0x200
 
 #define LOV_PATTERN_F_MASK     0xffff0000
@@ -443,6 +443,7 @@ enum ll_lease_type {
 static inline bool lov_pattern_supported(__u32 pattern)
 {
        return pattern == LOV_PATTERN_RAID0 ||
+              pattern == LOV_PATTERN_MDT ||
               pattern == (LOV_PATTERN_RAID0 | LOV_PATTERN_F_RELEASED);
 }
 
index 19bbe64..9034ba0 100644 (file)
@@ -418,7 +418,7 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
        mutex_lock(&lmv->lmv_init_mutex);
        if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
                tgt = lmv->tgts[index];
-               CERROR("%s: UUID %s already assigned at LOV target index %d:"
+               CERROR("%s: UUID %s already assigned at LMV target index %d:"
                       " rc = %d\n", obd->obd_name,
                       obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
                mutex_unlock(&lmv->lmv_init_mutex);
index ca40cc8..f97747d 100644 (file)
@@ -91,6 +91,12 @@ enum lov_device_flags {
  * Upper half.
  */
 
+/* Data-on-MDT array item in lov_device::ld_md_tgts[] */
+struct lovdom_device {
+       struct cl_device        *ldm_mdc;
+       int                      ldm_idx;
+};
+
 struct lov_device {
         /*
          * XXX Locking of lov-private data is missing.
@@ -101,6 +107,13 @@ struct lov_device {
         __u32                     ld_target_nr;
         struct lovsub_device    **ld_target;
         __u32                     ld_flags;
+
+       /* Data-on-MDT devices */
+       __u32                     ld_md_tgts_nr;
+       struct lovdom_device     *ld_md_tgts;
+       struct obd_device        *ld_lmv;
+       /* LU site for subdevices */
+       struct lu_site            ld_site;
 };
 
 /**
@@ -129,6 +142,34 @@ static inline char *llt2str(enum lov_layout_type llt)
        return "";
 }
 
+/**
+ * Return lov_layout_entry_type associated with a given composite layout
+ * entry.
+ */
+static inline __u32 lov_entry_type(struct lov_stripe_md_entry *lsme)
+{
+       if ((lov_pattern(lsme->lsme_pattern) == LOV_PATTERN_RAID0) ||
+           (lov_pattern(lsme->lsme_pattern) == LOV_PATTERN_MDT))
+               return lov_pattern(lsme->lsme_pattern);
+       return 0;
+}
+
+struct lov_layout_entry;
+struct lov_object;
+struct lov_lock_sub;
+
+struct lov_comp_layout_entry_ops {
+       int (*lco_init)(const struct lu_env *env, struct lov_device *dev,
+                       struct lov_object *lov, unsigned int index,
+                       const struct cl_object_conf *conf,
+                       struct lov_layout_entry *lle);
+       void (*lco_fini)(const struct lu_env *env,
+                        struct lov_layout_entry *lle);
+       int  (*lco_getattr)(const struct lu_env *env, struct lov_object *obj,
+                           unsigned int index, struct lov_layout_entry *lle,
+                           struct cl_attr **attr);
+};
+
 struct lov_layout_raid0 {
        unsigned               lo_nr;
        /**
@@ -165,6 +206,25 @@ struct lov_layout_raid0 {
        struct cl_attr         lo_attr;
 };
 
+struct lov_layout_dom {
+       /* keep this always at first place so DOM layout entry
+        * can be addressed also as RAID0 after initialization.
+        */
+       struct lov_layout_raid0 lo_dom_r0;
+       struct lovsub_object *lo_dom;
+       struct lov_oinfo *lo_loi;
+};
+
+struct lov_layout_entry {
+       __u32 lle_type;
+       struct lu_extent lle_extent;
+       struct lov_comp_layout_entry_ops *lle_comp_ops;
+       union {
+               struct lov_layout_raid0 lle_raid0;
+               struct lov_layout_dom lle_dom;
+       };
+};
+
 /**
  * lov-specific file state.
  *
@@ -180,7 +240,7 @@ struct lov_layout_raid0 {
  * function corresponding to the current layout type.
  */
 struct lov_object {
-       struct cl_object       lo_cl;
+       struct cl_object        lo_cl;
        /**
         * Serializes object operations with transitions between layout types.
         *
@@ -220,13 +280,10 @@ struct lov_object {
                } released;
                struct lov_layout_composite {
                        /**
-                        * Current valid entry count of lo_entries.
+                        * Current valid entry count of entries.
                         */
                        unsigned int lo_entry_count;
-                       struct lov_layout_entry {
-                               struct lu_extent lle_extent;
-                               struct lov_layout_raid0 lle_raid0;
-                       } *lo_entries;
+                       struct lov_layout_entry *lo_entries;
                } composite;
        } u;
        /**
@@ -634,6 +691,15 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env)
         return info;
 }
 
+static inline struct lov_layout_entry *lov_entry(struct lov_object *lov, int i)
+{
+       LASSERT(lov->lo_type == LLT_COMP);
+       LASSERTF(i < lov->u.composite.lo_entry_count,
+                "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+       return &lov->u.composite.lo_entries[i];
+}
+
 static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
 {
        LASSERT(lov->lo_type == LLT_COMP);
index 2506c39..bf9dba2 100644 (file)
@@ -142,64 +142,114 @@ struct lu_context_key lov_session_key = {
 /* type constructor/destructor: lov_type_{init,fini,start,stop}() */
 LU_TYPE_INIT_FINI(lov, &lov_key, &lov_session_key);
 
+
+static int lov_mdc_dev_init(const struct lu_env *env, struct lov_device *ld,
+                           struct lu_device *mdc_dev, __u32 idx, __u32 nr)
+{
+       struct cl_device *cl;
+
+       ENTRY;
+       cl = cl_type_setup(env, &ld->ld_site, &lovsub_device_type,
+                          mdc_dev);
+       if (IS_ERR(cl))
+               RETURN(PTR_ERR(cl));
+
+       ld->ld_md_tgts[nr].ldm_mdc = cl;
+       ld->ld_md_tgts[nr].ldm_idx = idx;
+       RETURN(0);
+}
+
 static struct lu_device *lov_device_fini(const struct lu_env *env,
-                                         struct lu_device *d)
+                                        struct lu_device *d)
 {
-        int i;
-        struct lov_device *ld = lu2lov_dev(d);
+       struct lov_device *ld = lu2lov_dev(d);
+       int i;
 
-        LASSERT(ld->ld_lov != NULL);
-        if (ld->ld_target == NULL)
-                RETURN(NULL);
+       LASSERT(ld->ld_lov != NULL);
 
-        lov_foreach_target(ld, i) {
-                struct lovsub_device *lsd;
+       if (ld->ld_lmv != NULL) {
+               class_decref(ld->ld_lmv, "lov", d);
+               ld->ld_lmv = NULL;
+       }
 
-                lsd = ld->ld_target[i];
-                if (lsd != NULL) {
-                        cl_stack_fini(env, lovsub2cl_dev(lsd));
-                        ld->ld_target[i] = NULL;
-                }
-        }
-        RETURN(NULL);
+       if (ld->ld_md_tgts != NULL) {
+               for (i = 0; i < ld->ld_md_tgts_nr; i++) {
+                       if (ld->ld_md_tgts[i].ldm_mdc == NULL)
+                               continue;
+
+                       cl_stack_fini(env, ld->ld_md_tgts[i].ldm_mdc);
+                       ld->ld_md_tgts[i].ldm_mdc = NULL;
+                       ld->ld_lov->lov_mdc_tgts[i].lmtd_mdc = NULL;
+               }
+       }
+
+       if (ld->ld_target != NULL) {
+               lov_foreach_target(ld, i) {
+                       struct lovsub_device *lsd;
+
+                       lsd = ld->ld_target[i];
+                       if (lsd != NULL) {
+                               cl_stack_fini(env, lovsub2cl_dev(lsd));
+                               ld->ld_target[i] = NULL;
+                       }
+               }
+       }
+       RETURN(NULL);
 }
 
 static int lov_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
-        struct lov_device *ld = lu2lov_dev(d);
-        int i;
-        int rc = 0;
-
-        LASSERT(d->ld_site != NULL);
-        if (ld->ld_target == NULL)
-                RETURN(rc);
-
-        lov_foreach_target(ld, i) {
-                struct lovsub_device *lsd;
-                struct cl_device     *cl;
-                struct lov_tgt_desc  *desc;
-
-                desc = ld->ld_lov->lov_tgts[i];
-                if (desc == NULL)
-                        continue;
-
-                cl = cl_type_setup(env, d->ld_site, &lovsub_device_type,
-                                   desc->ltd_obd->obd_lu_dev);
-                if (IS_ERR(cl)) {
-                        rc = PTR_ERR(cl);
-                        break;
-                }
-                lsd = cl2lovsub_dev(cl);
-                ld->ld_target[i] = lsd;
-        }
+       struct lov_device *ld = lu2lov_dev(d);
+       int i;
+       int rc = 0;
+
+       /* check all added already MDC subdevices and initialize them */
+       for (i = 0; i < ld->ld_md_tgts_nr; i++) {
+               struct obd_device *mdc;
+               __u32 idx;
+
+               mdc = ld->ld_lov->lov_mdc_tgts[i].lmtd_mdc;
+               idx = ld->ld_lov->lov_mdc_tgts[i].lmtd_index;
+
+               if (mdc == NULL)
+                       continue;
+
+               rc = lov_mdc_dev_init(env, ld, mdc->obd_lu_dev, idx, i);
+               if (rc) {
+                       CERROR("%s: failed to add MDC %s as target: rc = %d\n",
+                              d->ld_obd->obd_name,
+                              obd_uuid2str(&mdc->obd_uuid), rc);
+                       GOTO(out_err, rc);
+               }
+       }
 
-        if (rc)
-                lov_device_fini(env, d);
-        else
-                ld->ld_flags |= LOV_DEV_INITIALIZED;
+       if (ld->ld_target == NULL)
+               RETURN(0);
 
-        RETURN(rc);
+       lov_foreach_target(ld, i) {
+               struct lovsub_device *lsd;
+               struct cl_device *cl;
+               struct lov_tgt_desc *desc;
+
+               desc = ld->ld_lov->lov_tgts[i];
+               if (desc == NULL)
+                       continue;
+
+               cl = cl_type_setup(env, &ld->ld_site, &lovsub_device_type,
+                                  desc->ltd_obd->obd_lu_dev);
+               if (IS_ERR(cl))
+                       GOTO(out_err, rc = PTR_ERR(cl));
+
+               lsd = cl2lovsub_dev(cl);
+               ld->ld_target[i] = lsd;
+       }
+       ld->ld_flags |= LOV_DEV_INITIALIZED;
+       RETURN(0);
+
+out_err:
+       lu_device_fini(d);
+       RETURN(rc);
 }
 
 /* Free the lov specific data created for the back end lu_device. */
@@ -209,9 +259,24 @@ static struct lu_device *lov_device_free(const struct lu_env *env,
        struct lov_device *ld = lu2lov_dev(d);
        const int nr = ld->ld_target_nr;
 
+       lu_site_fini(&ld->ld_site);
+
        cl_device_fini(lu2cl_dev(d));
-       if (ld->ld_target != NULL)
+       if (ld->ld_target) {
                OBD_FREE(ld->ld_target, nr * sizeof ld->ld_target[0]);
+               ld->ld_target = NULL;
+       }
+       if (ld->ld_md_tgts) {
+               OBD_FREE(ld->ld_md_tgts,
+                        sizeof(*ld->ld_md_tgts) * LOV_MDC_TGT_MAX);
+               ld->ld_md_tgts = NULL;
+       }
+       /* free array of MDCs */
+       if (ld->ld_lov->lov_mdc_tgts) {
+               OBD_FREE(ld->ld_lov->lov_mdc_tgts,
+                        sizeof(*ld->ld_lov->lov_mdc_tgts) * LOV_MDC_TGT_MAX);
+               ld->ld_lov->lov_mdc_tgts = NULL;
+       }
 
        OBD_FREE_PTR(ld);
        return NULL;
@@ -283,11 +348,9 @@ static int lov_cl_add_target(const struct lu_env *env, struct lu_device *dev,
                 RETURN(-EINVAL);
         }
 
-        rc = lov_expand_targets(env, ld);
-        if (rc == 0 && ld->ld_flags & LOV_DEV_INITIALIZED) {
-                LASSERT(dev->ld_site != NULL);
-
-               cl = cl_type_setup(env, dev->ld_site, &lovsub_device_type,
+       rc = lov_expand_targets(env, ld);
+       if (rc == 0 && ld->ld_flags & LOV_DEV_INITIALIZED) {
+               cl = cl_type_setup(env, &ld->ld_site, &lovsub_device_type,
                                   tgt->ltd_obd->obd_lu_dev);
                if (!IS_ERR(cl)) {
                        lsd = cl2lovsub_dev(cl);
@@ -303,34 +366,139 @@ static int lov_cl_add_target(const struct lu_env *env, struct lu_device *dev,
         RETURN(rc);
 }
 
-static int lov_process_config(const struct lu_env *env,
-                              struct lu_device *d, struct lustre_cfg *cfg)
+/**
+ * Add new MDC target device in LOV.
+ *
+ * This function is part of the configuration log processing. It adds new MDC
+ * device to the MDC device array indexed by their indexes.
+ *
+ * \param[in] env      execution environment
+ * \param[in] d                LU device of LOV device
+ * \param[in] mdc      MDC device to add
+ * \param[in] idx      MDC device index
+ *
+ * \retval             0 if successful
+ * \retval             negative value on error
+ */
+static int lov_add_mdc_target(const struct lu_env *env, struct lu_device *d,
+                             struct obd_device *mdc, __u32 idx)
 {
-        struct obd_device *obd = d->ld_obd;
-        int cmd;
-        int rc;
-        int gen;
-        __u32 index;
+       struct lov_device *ld = lu2lov_dev(d);
+       struct obd_device *lov_obd = d->ld_obd;
+       struct obd_device *lmv_obd;
+       int next;
+       int rc = 0;
 
-        obd_getref(obd);
+       ENTRY;
 
-        cmd = cfg->lcfg_command;
-        rc = lov_process_config_base(d->ld_obd, cfg, &index, &gen);
-        if (rc == 0) {
-                switch(cmd) {
-                case LCFG_LOV_ADD_OBD:
-                case LCFG_LOV_ADD_INA:
-                        rc = lov_cl_add_target(env, d, index);
-                        if (rc != 0)
-                               lov_del_target(d->ld_obd, index, NULL, 0);
-                        break;
-                case LCFG_LOV_DEL_OBD:
-                        lov_cl_del_target(env, d, index);
-                        break;
-                }
-        }
-        obd_putref(obd);
-        RETURN(rc);
+       LASSERT(mdc != NULL);
+       if (ld->ld_md_tgts_nr == LOV_MDC_TGT_MAX) {
+               /* If the maximum value of LOV_MDC_TGT_MAX will become too
+                * small then all MD target handling must be rewritten in LOD
+                * manner, check lod_add_device() and related functionality.
+                */
+               CERROR("%s: cannot serve more than %d MDC devices\n",
+                      lov_obd->obd_name, LOV_MDC_TGT_MAX);
+               RETURN(-ERANGE);
+       }
+
+       /* grab FLD from lmv, do that here, when first MDC is added
+        * to be sure LMV is set up and can be found */
+       if (ld->ld_lmv == NULL) {
+               next = 0;
+               while ((lmv_obd = class_devices_in_group(&lov_obd->obd_uuid,
+                                                        &next)) != NULL) {
+                       if ((strncmp(lmv_obd->obd_type->typ_name,
+                                    LUSTRE_LMV_NAME,
+                                    strlen(LUSTRE_LMV_NAME)) == 0))
+                               break;
+               }
+               if (lmv_obd == NULL) {
+                       CERROR("%s: cannot find LMV OBD by UUID (%s)\n",
+                              lov_obd->obd_name,
+                              obd_uuid2str(&lmv_obd->obd_uuid));
+                       RETURN(-ENODEV);
+               }
+               spin_lock(&lmv_obd->obd_dev_lock);
+               class_incref(lmv_obd, "lov", ld);
+               spin_unlock(&lmv_obd->obd_dev_lock);
+               ld->ld_lmv = lmv_obd;
+       }
+
+       LASSERT(lov_obd->u.lov.lov_mdc_tgts[ld->ld_md_tgts_nr].lmtd_mdc ==
+               NULL);
+
+       if (ld->ld_flags & LOV_DEV_INITIALIZED) {
+               rc = lov_mdc_dev_init(env, ld, mdc->obd_lu_dev, idx,
+                                     ld->ld_md_tgts_nr);
+               if (rc) {
+                       CERROR("%s: failed to add MDC %s as target: rc = %d\n",
+                              lov_obd->obd_name, obd_uuid2str(&mdc->obd_uuid),
+                              rc);
+                       RETURN(rc);
+               }
+       }
+
+       lov_obd->u.lov.lov_mdc_tgts[ld->ld_md_tgts_nr].lmtd_mdc = mdc;
+       lov_obd->u.lov.lov_mdc_tgts[ld->ld_md_tgts_nr].lmtd_index = idx;
+       ld->ld_md_tgts_nr++;
+
+       RETURN(rc);
+}
+
+static int lov_process_config(const struct lu_env *env,
+                              struct lu_device *d, struct lustre_cfg *cfg)
+{
+       struct obd_device *obd = d->ld_obd;
+       int cmd;
+       int rc;
+       int gen;
+       __u32 index;
+
+       obd_getref(obd);
+
+       cmd = cfg->lcfg_command;
+
+       rc = lov_process_config_base(d->ld_obd, cfg, &index, &gen);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       switch (cmd) {
+       case LCFG_LOV_ADD_OBD:
+       case LCFG_LOV_ADD_INA:
+               rc = lov_cl_add_target(env, d, index);
+               if (rc != 0)
+                       lov_del_target(d->ld_obd, index, NULL, 0);
+               break;
+       case LCFG_LOV_DEL_OBD:
+               lov_cl_del_target(env, d, index);
+               break;
+       case LCFG_ADD_MDC:
+       {
+               struct obd_device       *mdc;
+               struct obd_uuid          tgt_uuid;
+
+               /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
+                * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
+               if (LUSTRE_CFG_BUFLEN(cfg, 1) > sizeof(tgt_uuid.uuid))
+                       GOTO(out, rc = -EINVAL);
+
+               obd_str2uuid(&tgt_uuid, lustre_cfg_buf(cfg, 1));
+
+               if (sscanf(lustre_cfg_buf(cfg, 2), "%d", &index) != 1)
+                       GOTO(out, rc = -EINVAL);
+
+               mdc = class_find_client_obd(&tgt_uuid, LUSTRE_MDC_NAME,
+                                           &obd->obd_uuid);
+               if (mdc == NULL)
+                       GOTO(out, rc = -ENODEV);
+               rc = lov_add_mdc_target(env, d, mdc, index);
+               break;
+       }
+       }
+out:
+       obd_putref(obd);
+       RETURN(rc);
 }
 
 static const struct lu_device_operations lov_lu_ops = {
@@ -359,13 +527,45 @@ static struct lu_device *lov_device_alloc(const struct lu_env *env,
         obd = class_name2obd(lustre_cfg_string(cfg, 0));
         LASSERT(obd != NULL);
         rc = lov_setup(obd, cfg);
-        if (rc) {
-                lov_device_free(env, d);
-                RETURN(ERR_PTR(rc));
-        }
+       if (rc)
+               GOTO(out, rc);
+
+       /* Alloc MDC devices array */
+       /* XXX: need dynamic allocation at some moment */
+       OBD_ALLOC(ld->ld_md_tgts, sizeof(*ld->ld_md_tgts) * LOV_MDC_TGT_MAX);
+       if (ld->ld_md_tgts == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       ld->ld_md_tgts_nr = 0;
+
+       ld->ld_lov = &obd->u.lov;
+       OBD_ALLOC(ld->ld_lov->lov_mdc_tgts,
+                 sizeof(*ld->ld_lov->lov_mdc_tgts) * LOV_MDC_TGT_MAX);
+       if (ld->ld_lov->lov_mdc_tgts == NULL)
+               GOTO(out_md_tgts, rc = -ENOMEM);
+
+       rc = lu_site_init(&ld->ld_site, d);
+       if (rc != 0)
+               GOTO(out_mdc_tgts, rc);
+
+       rc = lu_site_init_finish(&ld->ld_site);
+       if (rc != 0)
+               GOTO(out_site, rc);
+
+       RETURN(d);
+out_site:
+       lu_site_fini(&ld->ld_site);
+out_mdc_tgts:
+       OBD_FREE(ld->ld_lov->lov_mdc_tgts,
+                sizeof(*ld->ld_lov->lov_mdc_tgts) * LOV_MDC_TGT_MAX);
+       ld->ld_lov->lov_mdc_tgts = NULL;
+out_md_tgts:
+       OBD_FREE(ld->ld_md_tgts, sizeof(*ld->ld_md_tgts) * LOV_MDC_TGT_MAX);
+       ld->ld_md_tgts = NULL;
+out:
+       OBD_FREE_PTR(ld);
 
-        ld->ld_lov = &obd->u.lov;
-        RETURN(d);
+       return ERR_PTR(rc);
 }
 
 static const struct lu_device_type_operations lov_device_type_ops = {
index b29bb43..8025d1c 100644 (file)
@@ -90,7 +90,8 @@ static int lsm_lmm_verify_v1v3(struct lov_mds_md *lmm, size_t lmm_size,
                return -EINVAL;
        }
 
-       if (lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_RAID0) {
+       if (lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_MDT &&
+           lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_RAID0) {
                CERROR("bad striping pattern\n");
                lov_dump_lmm_common(D_WARNING, lmm);
                return -EINVAL;
@@ -201,6 +202,12 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
                        GOTO(out_lsme, rc = -E2BIG);
        }
 
+       /* with Data-on-MDT set maxbytes to stripe size */
+       if (lsme_is_dom(lsme)) {
+               lov_bytes = lsme->lsme_stripe_size;
+               goto out_dom;
+       }
+
        for (i = 0; i < stripe_count; i++) {
                struct lov_oinfo *loi;
                struct lov_tgt_desc *ltd;
@@ -244,6 +251,7 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
 
        lov_bytes = min_stripe_maxbytes * stripe_count;
 
+out_dom:
        if (maxbytes != NULL) {
                if (lov_bytes < min_stripe_maxbytes) /* handle overflow */
                        *maxbytes = MAX_LFS_FILESIZE;
@@ -381,7 +389,8 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
        unsigned int stripe_count;
 
        stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
-       if (stripe_count == 0)
+       if (stripe_count == 0 &&
+           lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_MDT)
                RETURN(ERR_PTR(-EINVAL));
        /* un-instantiated lmm contains no ost id info, i.e. lov_ost_data_v1 */
        if (!inited)
@@ -467,9 +476,10 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size)
                                            maxbytes;
                        /* the last component hasn't been defined, or
                         * lsm_maxbytes overflowed. */
-                       if (lsme->lsme_extent.e_end != LUSTRE_EOF ||
-                           lsm->lsm_maxbytes <
-                           (loff_t)lsme->lsme_extent.e_start)
+                       if (!lsme_is_dom(lsme) &&
+                           (lsme->lsme_extent.e_end != LUSTRE_EOF ||
+                            lsm->lsm_maxbytes <
+                            (loff_t)lsme->lsme_extent.e_start))
                                lsm->lsm_maxbytes = MAX_LFS_FILESIZE;
                }
        }
index aa400ad..a081b33 100644 (file)
@@ -54,6 +54,11 @@ struct lov_stripe_md_entry {
        struct lov_oinfo       *lsme_oinfo[];
 };
 
+static inline bool lsme_is_dom(struct lov_stripe_md_entry *lsme)
+{
+       return (lov_pattern(lsme->lsme_pattern) == LOV_PATTERN_MDT);
+}
+
 static inline void copy_lsm_entry(struct lov_stripe_md_entry *dst,
                                  struct lov_stripe_md_entry *src)
 {
@@ -306,6 +311,8 @@ extern struct lprocfs_vars lprocfs_lov_obd_vars[];
 /* lov_cl.c */
 extern struct lu_device_type lov_device_type;
 
+#define LOV_MDC_TGT_MAX 256
+
 /* pools */
 extern struct cfs_hash_ops pool_hash_operations;
 /* ost_pool methods */
index 577e7d1..acddf1d 100644 (file)
@@ -571,7 +571,10 @@ static int lov_io_setattr_iter_init(const struct lu_env *env,
 
        if (cl_io_is_trunc(io) && lio->lis_pos > 0) {
                index = lov_lsm_entry(lsm, lio->lis_pos - 1);
-               if (index > 0 && !lsm_entry_inited(lsm, index)) {
+               /* no entry found for such offset */
+               if (index < 0) {
+                       RETURN(io->ci_result = -ENODATA);
+               } else if (!lsm_entry_inited(lsm, index)) {
                        io->ci_need_write_intent = 1;
                        RETURN(io->ci_result = -ENODATA);
                }
index 5204de9..0e645b7 100644 (file)
@@ -899,7 +899,10 @@ int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
         ENTRY;
 
         switch(cmd = lcfg->lcfg_command) {
-        case LCFG_LOV_ADD_OBD:
+       case LCFG_ADD_MDC:
+       case LCFG_DEL_MDC:
+               break;
+       case LCFG_LOV_ADD_OBD:
         case LCFG_LOV_ADD_INA:
         case LCFG_LOV_DEL_OBD: {
                 __u32 index;
@@ -1264,58 +1267,71 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
        struct obd_device *obddev = class_exp2obd(exp);
        struct lov_obd *lov = &obddev->u.lov;
        struct lov_tgt_desc *tgt;
-       int do_inactive = 0;
-       int no_set = 0;
-       u32 count;
+       bool do_inactive = false, no_set = false;
        u32 i;
        int rc = 0;
        int err;
-        ENTRY;
 
-        if (set == NULL) {
-                no_set = 1;
-                set = ptlrpc_prep_set();
-                if (!set)
-                        RETURN(-ENOMEM);
-        }
+       ENTRY;
 
-        obd_getref(obddev);
-        count = lov->desc.ld_tgt_count;
+       if (set == NULL) {
+               no_set = true;
+               set = ptlrpc_prep_set();
+               if (!set)
+                       RETURN(-ENOMEM);
+       }
+
+       obd_getref(obddev);
 
        if (KEY_IS(KEY_CHECKSUM)) {
-                do_inactive = 1;
+               do_inactive = true;
        } else if (KEY_IS(KEY_CACHE_SET)) {
                LASSERT(lov->lov_cache == NULL);
                lov->lov_cache = val;
-               do_inactive = 1;
+               do_inactive = true;
                cl_cache_incref(lov->lov_cache);
        }
 
-       for (i = 0; i < count; i++) {
+       for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                tgt = lov->lov_tgts[i];
 
-                /* OST was disconnected */
-                if (!tgt || !tgt->ltd_exp)
-                        continue;
+               /* OST was disconnected */
+               if (tgt == NULL || tgt->ltd_exp == NULL)
+                       continue;
 
-                /* OST is inactive and we don't want inactive OSCs */
-                if (!tgt->ltd_active && !do_inactive)
-                        continue;
+               /* OST is inactive and we don't want inactive OSCs */
+               if (!tgt->ltd_active && !do_inactive)
+                       continue;
 
                err = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
                                         vallen, val, set);
-                if (!rc)
-                        rc = err;
-        }
 
-        obd_putref(obddev);
-        if (no_set) {
-                err = ptlrpc_set_wait(set);
-                if (!rc)
-                        rc = err;
-                ptlrpc_set_destroy(set);
-        }
-        RETURN(rc);
+               if (rc == 0)
+                       rc = err;
+       }
+
+       /* cycle through MDC target for Data-on-MDT */
+       for (i = 0; i < LOV_MDC_TGT_MAX; i++) {
+               struct obd_device *mdc;
+
+               mdc = lov->lov_mdc_tgts[i].lmtd_mdc;
+               if (mdc == NULL)
+                       continue;
+
+               err = obd_set_info_async(env, mdc->obd_self_export,
+                                        keylen, key, vallen, val, set);
+               if (rc == 0)
+                       rc = err;
+       }
+
+       obd_putref(obddev);
+       if (no_set) {
+               err = ptlrpc_set_wait(set);
+               if (rc == 0)
+                       rc = err;
+               ptlrpc_set_destroy(set);
+       }
+       RETURN(rc);
 }
 
 void lov_stripe_lock(struct lov_stripe_md *md)
index e26769c..216221a 100644 (file)
@@ -89,30 +89,40 @@ static void lov_lsm_put(struct lov_stripe_md *lsm)
  * Lov object layout operations.
  *
  */
-static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
-                         struct lov_object *lov, struct lov_stripe_md *lsm,
-                         const struct cl_object_conf *conf,
-                         union lov_layout_state *state)
+
+static struct cl_object *lov_sub_find(const struct lu_env *env,
+                                     struct cl_device *dev,
+                                     const struct lu_fid *fid,
+                                     const struct cl_object_conf *conf)
 {
-       return 0;
+       struct lu_object *o;
+
+       ENTRY;
+
+       o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
+       LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
+       RETURN(lu2cl(o));
 }
 
-static struct cl_object *lov_sub_find(const struct lu_env *env,
-                                      struct cl_device *dev,
-                                      const struct lu_fid *fid,
-                                      const struct cl_object_conf *conf)
+static int lov_page_slice_fixup(struct lov_object *lov,
+                               struct cl_object *stripe)
 {
-        struct lu_object *o;
+       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
+       struct cl_object *o;
 
-        ENTRY;
-        o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
-        LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
-        RETURN(lu2cl(o));
+       if (stripe == NULL)
+               return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
+                      cfs_size_round(sizeof(struct lov_page));
+
+       cl_object_for_each(o, stripe)
+               o->co_slice_off += hdr->coh_page_bufsize;
+
+       return cl_object_header(stripe)->coh_page_bufsize;
 }
 
 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
-                       struct cl_object *subobj, struct lov_layout_raid0 *r0,
-                       struct lov_oinfo *oinfo, int idx)
+                       struct cl_object *subobj, struct lov_oinfo *oinfo,
+                       int idx)
 {
        struct cl_object_header *hdr;
        struct cl_object_header *subhdr;
@@ -132,7 +142,7 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                return -EIO;
        }
 
-       hdr    = cl_object_header(lov2cl(lov));
+       hdr = cl_object_header(lov2cl(lov));
        subhdr = cl_object_header(subobj);
 
        CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID
@@ -145,13 +155,14 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
        spin_lock(&subhdr->coh_attr_guard);
        parent = subhdr->coh_parent;
        if (parent == NULL) {
+               struct lovsub_object *lso = cl2lovsub(subobj);
+
                subhdr->coh_parent = hdr;
                spin_unlock(&subhdr->coh_attr_guard);
                subhdr->coh_nesting = hdr->coh_nesting + 1;
                lu_object_ref_add(&subobj->co_lu, "lov-parent", lov);
-               r0->lo_sub[stripe] = cl2lovsub(subobj);
-               r0->lo_sub[stripe]->lso_super = lov;
-               r0->lo_sub[stripe]->lso_index = idx;
+               lso->lso_super = lov;
+               lso->lso_index = idx;
                result = 0;
        } else {
                struct lu_object  *old_obj;
@@ -181,33 +192,19 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
        return result;
 }
 
-static int lov_page_slice_fixup(struct lov_object *lov,
-                               struct cl_object *stripe)
-{
-       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
-       struct cl_object *o;
-
-       if (stripe == NULL)
-               return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
-                      cfs_size_round(sizeof(struct lov_page));
-
-       cl_object_for_each(o, stripe)
-               o->co_slice_off += hdr->coh_page_bufsize;
-
-       return cl_object_header(stripe)->coh_page_bufsize;
-}
-
 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
-                         struct lov_object *lov, int index,
-                         struct lov_layout_raid0 *r0)
+                         struct lov_object *lov, unsigned int index,
+                         const struct cl_object_conf *conf,
+                         struct lov_layout_entry *lle)
 {
-       struct lov_thread_info  *lti     = lov_env_info(env);
-       struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
-       struct lu_fid           *ofid    = &lti->lti_fid;
-       struct cl_object        *stripe;
+       struct lov_layout_raid0 *r0 = &lle->lle_raid0;
+       struct lov_thread_info *lti = lov_env_info(env);
+       struct cl_object_conf *subconf = &lti->lti_stripe_conf;
+       struct lu_fid *ofid = &lti->lti_fid;
+       struct cl_object *stripe;
        struct lov_stripe_md_entry *lse  = lov_lse(lov, index);
        int result;
-       int psz;
+       int psz, sz;
        int i;
 
        ENTRY;
@@ -255,7 +252,7 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
                if (IS_ERR(stripe))
                        GOTO(out, result = PTR_ERR(stripe));
 
-               result = lov_init_sub(env, lov, stripe, r0, oinfo,
+               result = lov_init_sub(env, lov, stripe, oinfo,
                                      lov_comp_index(index, i));
                if (result == -EAGAIN) { /* try again */
                        --i;
@@ -264,7 +261,9 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
                }
 
                if (result == 0) {
-                       int sz = lov_page_slice_fixup(lov, stripe);
+                       r0->lo_sub[i] = cl2lovsub(stripe);
+
+                       sz = lov_page_slice_fixup(lov, stripe);
                        LASSERT(ergo(psz > 0, psz == sz));
                        psz = sz;
                }
@@ -275,116 +274,6 @@ out:
        RETURN(result);
 }
 
-static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
-                             struct lov_object *lov, struct lov_stripe_md *lsm,
-                             const struct cl_object_conf *conf,
-                             union lov_layout_state *state)
-{
-       struct lov_layout_composite *comp = &state->composite;
-       unsigned int entry_count;
-       unsigned int psz = 0;
-       int result = 0;
-       int i;
-
-       ENTRY;
-
-       LASSERT(lsm->lsm_entry_count > 0);
-       LASSERT(lov->lo_lsm == NULL);
-       lov->lo_lsm = lsm_addref(lsm);
-       lov->lo_layout_invalid = true;
-
-       entry_count = lsm->lsm_entry_count;
-       comp->lo_entry_count = entry_count;
-
-       OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
-       if (comp->lo_entries == NULL)
-               RETURN(-ENOMEM);
-
-       for (i = 0; i < entry_count; i++) {
-               struct lov_layout_entry *le = &comp->lo_entries[i];
-
-               le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
-               /**
-                * If the component has not been init-ed on MDS side, for
-                * PFL layout, we'd know that the components beyond this one
-                * will be dynamically init-ed later on file write/trunc ops.
-                */
-               if (!lsm_entry_inited(lsm, i))
-                       continue;
-
-               result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
-               if (result < 0)
-                       break;
-
-               LASSERT(ergo(psz > 0, psz == result));
-               psz = result;
-       }
-       if (psz > 0)
-               cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
-
-       return result > 0 ? 0 : result;
-}
-
-static int lov_init_released(const struct lu_env *env,
-                            struct lov_device *dev, struct lov_object *lov,
-                            struct lov_stripe_md *lsm,
-                            const struct cl_object_conf *conf,
-                            union lov_layout_state *state)
-{
-       LASSERT(lsm != NULL);
-       LASSERT(lsm->lsm_is_released);
-       LASSERT(lov->lo_lsm == NULL);
-
-       lov->lo_lsm = lsm_addref(lsm);
-       return 0;
-}
-
-static struct cl_object *lov_find_subobj(const struct lu_env *env,
-                                        struct lov_object *lov,
-                                        struct lov_stripe_md *lsm,
-                                        int index)
-{
-       struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
-       struct lov_thread_info  *lti = lov_env_info(env);
-       struct lu_fid           *ofid = &lti->lti_fid;
-       struct lov_oinfo        *oinfo;
-       struct cl_device        *subdev;
-       int                     entry = lov_comp_entry(index);
-       int                     stripe = lov_comp_stripe(index);
-       int                     ost_idx;
-       int                     rc;
-       struct cl_object        *result;
-
-       if (lov->lo_type != LLT_COMP)
-               GOTO(out, result = NULL);
-
-       if (entry >= lsm->lsm_entry_count ||
-           stripe >= lsm->lsm_entries[entry]->lsme_stripe_count)
-               GOTO(out, result = NULL);
-
-       oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe];
-       ost_idx = oinfo->loi_ost_idx;
-       rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
-       if (rc != 0)
-               GOTO(out, result = NULL);
-
-       subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
-       result = lov_sub_find(env, subdev, ofid, NULL);
-out:
-       if (result == NULL)
-               result = ERR_PTR(-EINVAL);
-       return result;
-}
-
-static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
-                           union lov_layout_state *state)
-{
-       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
-
-       lov_layout_wait(env, lov);
-       return 0;
-}
-
 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
                               struct lov_layout_raid0 *r0,
                               struct lovsub_object *los, int idx)
@@ -405,8 +294,8 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
         cl_object_put(env, sub);
 
-        /* ... wait until it is actually destroyed---sub-object clears its
-         * ->lo_sub[] slot in lovsub_object_fini() */
+       /* ... wait until it is actually destroyed---sub-object clears its
+        * ->lo_sub[] slot in lovsub_object_free() */
        if (r0->lo_sub[idx] == los) {
                waiter = &lov_env_info(env)->lti_waiter;
                init_waitqueue_entry(waiter, current);
@@ -432,8 +321,10 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 }
 
 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
-                            struct lov_layout_raid0 *r0)
+                            struct lov_layout_entry *lle)
 {
+       struct lov_layout_raid0 *r0 = &lle->lle_raid0;
+
        ENTRY;
 
         if (r0->lo_sub != NULL) {
@@ -456,6 +347,386 @@ static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
        EXIT;
 }
 
+static void lov_fini_raid0(const struct lu_env *env,
+                          struct lov_layout_entry *lle)
+{
+       struct lov_layout_raid0 *r0 = &lle->lle_raid0;
+
+       if (r0->lo_sub != NULL) {
+               OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+               r0->lo_sub = NULL;
+       }
+}
+
+static int lov_print_raid0(const struct lu_env *env, void *cookie,
+                          lu_printer_t p, const struct lov_layout_entry *lle)
+{
+       const struct lov_layout_raid0 *r0 = &lle->lle_raid0;
+       int i;
+
+       for (i = 0; i < r0->lo_nr; ++i) {
+               struct lu_object *sub;
+
+               if (r0->lo_sub[i] != NULL) {
+                       sub = lovsub2lu(r0->lo_sub[i]);
+                       lu_object_print(env, cookie, p, sub);
+               } else {
+                       (*p)(env, cookie, "sub %d absent\n", i);
+               }
+       }
+       return 0;
+}
+
+static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov,
+                             unsigned int index, struct lov_layout_entry *lle,
+                             struct cl_attr **lov_attr)
+{
+       struct lov_layout_raid0 *r0 = &lle->lle_raid0;
+       struct lov_stripe_md *lsm = lov->lo_lsm;
+       struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
+       struct cl_attr *attr = &r0->lo_attr;
+       __u64 kms = 0;
+       int result = 0;
+
+       if (r0->lo_attr_valid) {
+               *lov_attr = attr;
+               return 0;
+       }
+
+       memset(lvb, 0, sizeof(*lvb));
+
+       /* XXX: timestamps can be negative by sanity:test_39m,
+        * how can it be? */
+       lvb->lvb_atime = LLONG_MIN;
+       lvb->lvb_ctime = LLONG_MIN;
+       lvb->lvb_mtime = LLONG_MIN;
+
+       /*
+        * XXX that should be replaced with a loop over sub-objects,
+        * doing cl_object_attr_get() on them. But for now, let's
+        * reuse old lov code.
+        */
+
+       /*
+        * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
+        * happy. It's not needed, because new code uses
+        * ->coh_attr_guard spin-lock to protect consistency of
+        * sub-object attributes.
+        */
+       lov_stripe_lock(lsm);
+       result = lov_merge_lvb_kms(lsm, index, lvb, &kms);
+       lov_stripe_unlock(lsm);
+       if (result == 0) {
+               cl_lvb2attr(attr, lvb);
+               attr->cat_kms = kms;
+               r0->lo_attr_valid = 1;
+               *lov_attr = attr;
+       }
+
+       return result;
+}
+
+static struct lov_comp_layout_entry_ops raid0_ops = {
+       .lco_init      = lov_init_raid0,
+       .lco_fini      = lov_fini_raid0,
+       .lco_getattr   = lov_attr_get_raid0,
+};
+
+static int lov_attr_get_dom(const struct lu_env *env, struct lov_object *lov,
+                           unsigned int index, struct lov_layout_entry *lle,
+                           struct cl_attr **lov_attr)
+{
+       struct lov_layout_dom *dom = &lle->lle_dom;
+       struct lov_oinfo *loi = dom->lo_loi;
+       struct cl_attr *attr = &dom->lo_dom_r0.lo_attr;
+
+       if (dom->lo_dom_r0.lo_attr_valid) {
+               *lov_attr = attr;
+               return 0;
+       }
+
+       if (OST_LVB_IS_ERR(loi->loi_lvb.lvb_blocks))
+               return OST_LVB_GET_ERR(loi->loi_lvb.lvb_blocks);
+
+       cl_lvb2attr(attr, &loi->loi_lvb);
+       attr->cat_kms = attr->cat_size > loi->loi_kms ? attr->cat_size :
+                                                       loi->loi_kms;
+       dom->lo_dom_r0.lo_attr_valid = 1;
+       *lov_attr = attr;
+
+       return 0;
+}
+
+/**
+ * Lookup FLD to get MDS index of the given DOM object FID.
+ *
+ * \param[in]  ld      LOV device
+ * \param[in]  fid     FID to lookup
+ * \param[out] nr      index in MDC array to return back
+ *
+ * \retval             0 and \a mds filled with MDS index if successful
+ * \retval             negative value on error
+ */
+static int lov_fld_lookup(struct lov_device *ld, const struct lu_fid *fid,
+                         __u32 *nr)
+{
+       __u32 mds_idx;
+       int i, rc;
+
+       ENTRY;
+
+       rc = fld_client_lookup(&ld->ld_lmv->u.lmv.lmv_fld, fid_seq(fid),
+                              &mds_idx, LU_SEQ_RANGE_MDT, NULL);
+       if (rc) {
+               CERROR("%s: error while looking for mds number. Seq %#llx"
+                      ", err = %d\n", lu_dev_name(cl2lu_dev(&ld->ld_cl)),
+                      fid_seq(fid), rc);
+               RETURN(rc);
+       }
+
+       CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n",
+              mds_idx, PFID(fid));
+
+       /* find proper MDC device in the array */
+       for (i = 0; i < ld->ld_md_tgts_nr; i++) {
+               if (ld->ld_md_tgts[i].ldm_mdc != NULL &&
+                   ld->ld_md_tgts[i].ldm_idx == mds_idx)
+                       break;
+       }
+
+       if (i == ld->ld_md_tgts_nr) {
+               CERROR("%s: cannot find corresponding MDC device for mds #%x "
+                      "for fid="DFID"\n", lu_dev_name(cl2lu_dev(&ld->ld_cl)),
+                      mds_idx, PFID(fid));
+               rc = -EINVAL;
+       } else {
+               *nr = i;
+       }
+       RETURN(rc);
+}
+
+/**
+ * Implementation of lov_comp_layout_entry_ops::lco_init for DOM object.
+ *
+ * Init the DOM object for the first time. It prepares also RAID0 entry
+ * for it to use in common methods with ordinary RAID0 layout entries.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dev      LOV device
+ * \param[in] lov      LOV object
+ * \param[in] index    Composite layout entry index in LSM
+ * \param[in] lle      Composite LOV layout entry
+ */
+static int lov_init_dom(const struct lu_env *env, struct lov_device *dev,
+                       struct lov_object *lov, unsigned int index,
+                       const struct cl_object_conf *conf,
+                       struct lov_layout_entry *lle)
+{
+       struct lov_thread_info *lti = lov_env_info(env);
+       struct lov_stripe_md_entry *lsme = lov_lse(lov, index);
+       struct cl_object *clo;
+       struct lu_object *o = lov2lu(lov);
+       const struct lu_fid *fid = lu_object_fid(o);
+       struct cl_device *mdcdev;
+       struct lov_oinfo *loi = NULL;
+       struct cl_object_conf *sconf = &lti->lti_stripe_conf;
+       struct inode *inode = conf->coc_inode;
+
+       int rc;
+       __u32 idx = 0;
+
+       ENTRY;
+
+       LASSERT(index == 0);
+
+       /* find proper MDS device */
+       rc = lov_fld_lookup(dev, fid, &idx);
+       if (rc)
+               RETURN(rc);
+
+       LASSERTF(dev->ld_md_tgts[idx].ldm_mdc != NULL,
+                "LOV md target[%u] is NULL\n", idx);
+
+       /* check lsm is DOM, more checks are needed */
+       LASSERT(lsme->lsme_stripe_count == 0);
+
+       /*
+        * Create lower cl_objects.
+        */
+       mdcdev = dev->ld_md_tgts[idx].ldm_mdc;
+
+       LASSERTF(mdcdev != NULL, "non-initialized mdc subdev\n");
+
+       /* DoM object has no oinfo in LSM entry, create it exclusively */
+       OBD_SLAB_ALLOC_PTR_GFP(loi, lov_oinfo_slab, GFP_NOFS);
+       if (loi == NULL)
+               RETURN(-ENOMEM);
+
+       fid_to_ostid(lu_object_fid(lov2lu(lov)), &loi->loi_oi);
+       /* Initialize lvb structure */
+       loi->loi_lvb.lvb_mtime = LTIME_S(inode->i_mtime);
+       loi->loi_lvb.lvb_atime = LTIME_S(inode->i_atime);
+       loi->loi_lvb.lvb_ctime = LTIME_S(inode->i_ctime);
+       loi->loi_lvb.lvb_blocks = inode->i_blocks;
+       loi->loi_lvb.lvb_size = i_size_read(inode);
+       if (loi->loi_lvb.lvb_size > lsme->lsme_stripe_size)
+               loi->loi_lvb.lvb_size = lsme->lsme_stripe_size;
+       loi_kms_set(loi, loi->loi_lvb.lvb_size);
+
+       sconf->u.coc_oinfo = loi;
+again:
+       clo = lov_sub_find(env, mdcdev, fid, sconf);
+       if (IS_ERR(clo))
+               GOTO(out, rc = PTR_ERR(clo));
+
+       rc = lov_init_sub(env, lov, clo, loi, lov_comp_index(index, 0));
+       if (rc == -EAGAIN) /* try again */
+               goto again;
+       else if (rc != 0)
+               GOTO(out, rc);
+
+       lle->lle_dom.lo_dom = cl2lovsub(clo);
+       spin_lock_init(&lle->lle_dom.lo_dom_r0.lo_sub_lock);
+       lle->lle_dom.lo_dom_r0.lo_nr = 1;
+       lle->lle_dom.lo_dom_r0.lo_sub = &lle->lle_dom.lo_dom;
+       lle->lle_dom.lo_loi = loi;
+
+       rc = lov_page_slice_fixup(lov, clo);
+       RETURN(rc);
+
+out:
+       if (loi != NULL)
+               OBD_SLAB_FREE_PTR(loi, lov_oinfo_slab);
+       return rc;
+}
+
+/**
+ * Implementation of lov_layout_operations::llo_fini for DOM object.
+ *
+ * Finish the DOM object and free related memory.
+ *
+ * \param[in] env      execution environment
+ * \param[in] lov      LOV object
+ * \param[in] state    LOV layout state
+ */
+static void lov_fini_dom(const struct lu_env *env,
+                        struct lov_layout_entry *lle)
+{
+       if (lle->lle_dom.lo_dom != NULL)
+               lle->lle_dom.lo_dom = NULL;
+       if (lle->lle_dom.lo_loi != NULL)
+               OBD_SLAB_FREE_PTR(lle->lle_dom.lo_loi, lov_oinfo_slab);
+}
+
+static struct lov_comp_layout_entry_ops dom_ops = {
+       .lco_init = lov_init_dom,
+       .lco_fini = lov_fini_dom,
+       .lco_getattr = lov_attr_get_dom,
+};
+
+static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
+                             struct lov_object *lov, struct lov_stripe_md *lsm,
+                             const struct cl_object_conf *conf,
+                             union lov_layout_state *state)
+{
+       struct lov_layout_composite *comp = &state->composite;
+       struct lov_layout_entry *lle;
+       unsigned int entry_count;
+       unsigned int psz = 0;
+       int result = 0;
+       int i;
+
+       ENTRY;
+
+       LASSERT(lsm->lsm_entry_count > 0);
+       LASSERT(lov->lo_lsm == NULL);
+       lov->lo_lsm = lsm_addref(lsm);
+       lov->lo_layout_invalid = true;
+
+       entry_count = lsm->lsm_entry_count;
+       comp->lo_entry_count = entry_count;
+
+       OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
+       if (comp->lo_entries == NULL)
+               RETURN(-ENOMEM);
+
+       /* Initiate all entry types and extents data at first */
+       for (i = 0; i < entry_count; i++) {
+               lle = &comp->lo_entries[i];
+
+               lle->lle_type = lov_entry_type(lsm->lsm_entries[i]);
+               switch (lle->lle_type) {
+               case LOV_PATTERN_RAID0:
+                       lle->lle_comp_ops = &raid0_ops;
+                       break;
+               case LOV_PATTERN_MDT:
+                       lle->lle_comp_ops = &dom_ops;
+                       break;
+               default:
+                       CERROR("%s: unknown composite layout entry type %i\n",
+                              lov2obd(dev->ld_lov)->obd_name,
+                              lsm->lsm_entries[i]->lsme_pattern);
+                       dump_lsm(D_ERROR, lsm);
+                       RETURN(-EIO);
+               }
+               lle->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+       }
+
+       i = 0;
+       lov_foreach_layout_entry(lov, lle) {
+               /**
+                * If the component has not been init-ed on MDS side, for
+                * PFL layout, we'd know that the components beyond this one
+                * will be dynamically init-ed later on file write/trunc ops.
+                */
+               if (lsm_entry_inited(lsm, i)) {
+                       result = lle->lle_comp_ops->lco_init(env, dev, lov, i,
+                                                            conf, lle);
+                       if (result < 0)
+                               break;
+
+                       LASSERT(ergo(psz > 0, psz == result));
+                       psz = result;
+               }
+               i++;
+       }
+       if (psz > 0)
+               cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+
+       return result > 0 ? 0 : result;
+}
+
+static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
+                         struct lov_object *lov, struct lov_stripe_md *lsm,
+                         const struct cl_object_conf *conf,
+                         union lov_layout_state *state)
+{
+       return 0;
+}
+
+static int lov_init_released(const struct lu_env *env,
+                            struct lov_device *dev, struct lov_object *lov,
+                            struct lov_stripe_md *lsm,
+                            const struct cl_object_conf *conf,
+                            union lov_layout_state *state)
+{
+       LASSERT(lsm != NULL);
+       LASSERT(lsm->lsm_is_released);
+       LASSERT(lov->lo_lsm == NULL);
+
+       lov->lo_lsm = lsm_addref(lsm);
+       return 0;
+}
+
+static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
+                           union lov_layout_state *state)
+{
+       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
+
+       lov_layout_wait(env, lov);
+       return 0;
+}
+
 static int lov_delete_composite(const struct lu_env *env,
                                struct lov_object *lov,
                                union lov_layout_state *state)
@@ -470,7 +741,7 @@ static int lov_delete_composite(const struct lu_env *env,
        lov_layout_wait(env, lov);
        if (comp->lo_entries)
                lov_foreach_layout_entry(lov, entry)
-                       lov_delete_raid0(env, lov, &entry->lle_raid0);
+                       lov_delete_raid0(env, lov, entry);
 
        RETURN(0);
 }
@@ -481,15 +752,6 @@ static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
        LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
 }
 
-static void lov_fini_raid0(const struct lu_env *env,
-                          struct lov_layout_raid0 *r0)
-{
-       if (r0->lo_sub != NULL) {
-               OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
-               r0->lo_sub = NULL;
-       }
-}
-
 static void lov_fini_composite(const struct lu_env *env,
                               struct lov_object *lov,
                               union lov_layout_state *state)
@@ -501,7 +763,7 @@ static void lov_fini_composite(const struct lu_env *env,
                struct lov_layout_entry *entry;
 
                lov_foreach_layout_entry(lov, entry)
-                       lov_fini_raid0(env, &entry->lle_raid0);
+                       entry->lle_comp_ops->lco_fini(env, entry);
 
                OBD_FREE(comp->lo_entries,
                         comp->lo_entry_count * sizeof(*comp->lo_entries));
@@ -530,24 +792,6 @@ static int lov_print_empty(const struct lu_env *env, void *cookie,
         return 0;
 }
 
-static int lov_print_raid0(const struct lu_env *env, void *cookie,
-                          lu_printer_t p, struct lov_layout_raid0 *r0)
-{
-       int i;
-
-       for (i = 0; i < r0->lo_nr; ++i) {
-               struct lu_object *sub;
-
-               if (r0->lo_sub[i] != NULL) {
-                       sub = lovsub2lu(r0->lo_sub[i]);
-                       lu_object_print(env, cookie, p, sub);
-               } else {
-                       (*p)(env, cookie, "sub %d absent\n", i);
-               }
-       }
-       return 0;
-}
-
 static int lov_print_composite(const struct lu_env *env, void *cookie,
                               lu_printer_t p, const struct lu_object *o)
 {
@@ -563,12 +807,15 @@ static int lov_print_composite(const struct lu_env *env, void *cookie,
 
        for (i = 0; i < lsm->lsm_entry_count; i++) {
                struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+               struct lov_layout_entry *lle = lov_entry(lov, i);
 
-               (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %#x, %u, %u }\n",
+               (*p)(env, cookie,
+                    DEXT ": { 0x%08X, %u, %#x, %u, %#x, %u, %u }\n",
                     PEXT(&lse->lsme_extent), lse->lsme_magic,
-                    lse->lsme_id, lse->lsme_layout_gen, lse->lsme_flags,
-                    lse->lsme_stripe_count, lse->lsme_stripe_size);
-               lov_print_raid0(env, cookie, p, lov_r0(lov, i));
+                    lse->lsme_id, lse->lsme_pattern, lse->lsme_layout_gen,
+                    lse->lsme_flags, lse->lsme_stripe_count,
+                    lse->lsme_stripe_size);
+               lov_print_raid0(env, cookie, p, lle);
        }
 
        return 0;
@@ -602,51 +849,6 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
         return 0;
 }
 
-static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov,
-                             unsigned int index, struct lov_layout_raid0 *r0)
-
-{
-       struct lov_stripe_md *lsm = lov->lo_lsm;
-       struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
-       struct cl_attr *attr = &r0->lo_attr;
-       __u64 kms = 0;
-       int result = 0;
-
-       if (r0->lo_attr_valid)
-               return 0;
-
-       memset(lvb, 0, sizeof(*lvb));
-
-       /* XXX: timestamps can be negative by sanity:test_39m,
-        * how can it be? */
-       lvb->lvb_atime = LLONG_MIN;
-       lvb->lvb_ctime = LLONG_MIN;
-       lvb->lvb_mtime = LLONG_MIN;
-
-       /*
-        * XXX that should be replaced with a loop over sub-objects,
-        * doing cl_object_attr_get() on them. But for now, let's
-        * reuse old lov code.
-        */
-
-       /*
-        * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
-        * happy. It's not needed, because new code uses
-        * ->coh_attr_guard spin-lock to protect consistency of
-        * sub-object attributes.
-        */
-       lov_stripe_lock(lsm);
-       result = lov_merge_lvb_kms(lsm, index, lvb, &kms);
-       lov_stripe_unlock(lsm);
-       if (result == 0) {
-               cl_lvb2attr(attr, lvb);
-               attr->cat_kms = kms;
-               r0->lo_attr_valid = 1;
-       }
-
-       return result;
-}
-
 static int lov_attr_get_composite(const struct lu_env *env,
                                  struct cl_object *obj,
                                  struct cl_attr *attr)
@@ -661,19 +863,22 @@ static int lov_attr_get_composite(const struct lu_env *env,
        attr->cat_size = 0;
        attr->cat_blocks = 0;
        lov_foreach_layout_entry(lov, entry) {
-               struct lov_layout_raid0 *r0 = &entry->lle_raid0;
-               struct cl_attr *lov_attr = &r0->lo_attr;
+               struct cl_attr *lov_attr = NULL;
 
                /* PFL: This component has not been init-ed. */
                if (!lsm_entry_inited(lov->lo_lsm, index))
                        break;
 
-               result = lov_attr_get_raid0(env, lov, index, r0);
-               if (result != 0)
-                       break;
+               result = entry->lle_comp_ops->lco_getattr(env, lov, index,
+                                                         entry, &lov_attr);
+               if (result < 0)
+                       RETURN(result);
 
                index++;
 
+               if (lov_attr == NULL)
+                       continue;
+
                /* merge results */
                attr->cat_blocks += lov_attr->cat_blocks;
                if (attr->cat_size < lov_attr->cat_size)
@@ -687,28 +892,28 @@ static int lov_attr_get_composite(const struct lu_env *env,
                if (attr->cat_mtime < lov_attr->cat_mtime)
                        attr->cat_mtime = lov_attr->cat_mtime;
        }
-       RETURN(result);
+       RETURN(0);
 }
 
 const static struct lov_layout_operations lov_dispatch[] = {
-        [LLT_EMPTY] = {
-                .llo_init      = lov_init_empty,
-                .llo_delete    = lov_delete_empty,
-                .llo_fini      = lov_fini_empty,
-                .llo_print     = lov_print_empty,
-                .llo_page_init = lov_page_init_empty,
-                .llo_lock_init = lov_lock_init_empty,
-                .llo_io_init   = lov_io_init_empty,
+       [LLT_EMPTY] = {
+               .llo_init      = lov_init_empty,
+               .llo_delete    = lov_delete_empty,
+               .llo_fini      = lov_fini_empty,
+               .llo_print     = lov_print_empty,
+               .llo_page_init = lov_page_init_empty,
+               .llo_lock_init = lov_lock_init_empty,
+               .llo_io_init   = lov_io_init_empty,
                .llo_getattr   = lov_attr_get_empty,
-        },
-        [LLT_RELEASED] = {
-                .llo_init      = lov_init_released,
-                .llo_delete    = lov_delete_empty,
-                .llo_fini      = lov_fini_released,
-                .llo_print     = lov_print_released,
-                .llo_page_init = lov_page_init_empty,
-                .llo_lock_init = lov_lock_init_empty,
-                .llo_io_init   = lov_io_init_released,
+       },
+       [LLT_RELEASED] = {
+               .llo_init      = lov_init_released,
+               .llo_delete    = lov_delete_empty,
+               .llo_fini      = lov_fini_released,
+               .llo_print     = lov_print_released,
+               .llo_page_init = lov_page_init_empty,
+               .llo_lock_init = lov_lock_init_empty,
+               .llo_io_init   = lov_io_init_released,
                .llo_getattr   = lov_attr_get_empty,
        },
        [LLT_COMP] = {
@@ -1253,6 +1458,43 @@ struct fiemap_state {
        bool                    fs_enough;
 };
 
+static struct cl_object *lov_find_subobj(const struct lu_env *env,
+                                        struct lov_object *lov,
+                                        struct lov_stripe_md *lsm,
+                                        int index)
+{
+       struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
+       struct lov_thread_info  *lti = lov_env_info(env);
+       struct lu_fid           *ofid = &lti->lti_fid;
+       struct lov_oinfo        *oinfo;
+       struct cl_device        *subdev;
+       int                     entry = lov_comp_entry(index);
+       int                     stripe = lov_comp_stripe(index);
+       int                     ost_idx;
+       int                     rc;
+       struct cl_object        *result;
+
+       if (lov->lo_type != LLT_COMP)
+               GOTO(out, result = NULL);
+
+       if (entry >= lsm->lsm_entry_count ||
+           stripe >= lsm->lsm_entries[entry]->lsme_stripe_count)
+               GOTO(out, result = NULL);
+
+       oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe];
+       ost_idx = oinfo->loi_ost_idx;
+       rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
+       if (rc != 0)
+               GOTO(out, result = NULL);
+
+       subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+       result = lov_sub_find(env, subdev, ofid, NULL);
+out:
+       if (result == NULL)
+               result = ERR_PTR(-EINVAL);
+       return result;
+}
+
 int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
                      struct lov_stripe_md *lsm, struct fiemap *fiemap,
                      size_t *buflen, struct ll_fiemap_info_key *fmkey,
@@ -1466,6 +1708,10 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                        GOTO(out_lsm, rc = -ENOTSUPP);
        }
 
+       /* No support for DOM layout yet. */
+       if (lsme_is_dom(lsm->lsm_entries[0]))
+               GOTO(out_lsm, rc = -ENOTSUPP);
+
        if (lsm->lsm_is_released) {
                if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
                        /**
index 3ff0a38..c5ba4eb 100644 (file)
@@ -44,6 +44,9 @@ static loff_t stripe_width(struct lov_stripe_md *lsm, unsigned int index)
 
        LASSERT(index < lsm->lsm_entry_count);
 
+       if (lsme_is_dom(entry))
+               return (loff_t)entry->lsme_stripe_size;
+
        return (loff_t)entry->lsme_stripe_size * entry->lsme_stripe_count;
 }
 
@@ -141,12 +144,12 @@ int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off,
        loff_t stripe_off;
        loff_t this_stripe;
        loff_t swidth;
-        int ret = 0;
+       int ret = 0;
 
-        if (lov_off == OBD_OBJECT_EOF) {
-                *obdoff = OBD_OBJECT_EOF;
-                return 0;
-        }
+       if (lov_off == OBD_OBJECT_EOF) {
+               *obdoff = OBD_OBJECT_EOF;
+               return 0;
+       }
 
        swidth = stripe_width(lsm, index);
 
@@ -197,8 +200,8 @@ loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size,
        loff_t this_stripe;
        loff_t swidth;
 
-        if (file_size == OBD_OBJECT_EOF)
-                return OBD_OBJECT_EOF;
+       if (file_size == OBD_OBJECT_EOF)
+               return OBD_OBJECT_EOF;
 
        swidth = stripe_width(lsm, index);
 
index 793ebc6..718d6ff 100644 (file)
@@ -2264,7 +2264,12 @@ static int mdc_set_info_async(const struct lu_env *env,
                RETURN(0);
        }
 
-       CERROR("Unknown key %s\n", (char *)key);
+       /* TODO: these OSC-related keys are ignored for now */
+       if (KEY_IS(KEY_CHECKSUM) || KEY_IS(KEY_CACHE_SET) ||
+           KEY_IS(KEY_CACHE_LRU_SHRINK) || KEY_IS(KEY_GRANT_SHRINK))
+               RETURN(0);
+
+       CERROR("%s: Unknown key %s\n", exp->exp_obd->obd_name, (char *)key);
        RETURN(-EINVAL);
 }
 
index a583d6d..135e78b 100644 (file)
@@ -1910,7 +1910,8 @@ static int mgs_steal_client_llog_handler(const struct lu_env *env,
         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
                 RETURN(rc);
 
-        if (lcfg->lcfg_command == LCFG_ADD_MDC) {
+       if (lcfg->lcfg_command == LCFG_ADD_MDC &&
+           strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
                 int index;
 
                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
@@ -2546,17 +2547,14 @@ static int mgs_write_log_mdt(const struct lu_env *env,
        if (rc)
                GOTO(out_free, rc);
 
-       rc = record_marker(env, llh, fsdb, CM_START, cliname,
-                          "mount opts");
+       rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
        if (rc)
                GOTO(out_end, rc);
        rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
                              fsdb->fsdb_clilmv);
        if (rc)
                GOTO(out_end, rc);
-       rc = record_marker(env, llh, fsdb, CM_END, cliname,
-                          "mount opts");
-
+       rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
        if (rc)
                GOTO(out_end, rc);
 
index bcb9375..84987d3 100644 (file)
@@ -1295,7 +1295,6 @@ int class_process_config(struct lustre_cfg *lcfg)
 
                 GOTO(out, err = -EINVAL);
         }
-
        switch(lcfg->lcfg_command) {
        case LCFG_SETUP: {
                err = class_setup(obd, lcfg);
@@ -1335,12 +1334,47 @@ int class_process_config(struct lustre_cfg *lcfg)
                 err = obd_pool_del(obd, lustre_cfg_string(lcfg, 2));
                 GOTO(out, err = 0);
         }
-        default: {
-                err = obd_process_config(obd, sizeof(*lcfg), lcfg);
-                GOTO(out, err);
+       /* Process config log ADD_MDC record twice to add MDC also to LOV
+        * for Data-on-MDT:
+        *
+        * add 0:lustre-clilmv 1:lustre-MDT0000_UUID 2:0 3:1
+        *     4:lustre-MDT0000-mdc_UUID
+        */
+       case LCFG_ADD_MDC: {
+               struct obd_device *lov_obd;
+               char *clilmv;
+
+               err = obd_process_config(obd, sizeof(*lcfg), lcfg);
+               if (err)
+                       GOTO(out, err);
+
+               /* make sure this is client LMV log entry */
+               clilmv = strstr(lustre_cfg_string(lcfg, 0), "clilmv");
+               if (!clilmv)
+                       GOTO(out, err);
+
+               /* replace 'lmv' with 'lov' name to address LOV device and
+                * process llog record to add MDC there. */
+               clilmv[4] = 'o';
+               lov_obd = class_name2obd(lustre_cfg_string(lcfg, 0));
+               if (lov_obd == NULL) {
+                       err = -ENOENT;
+                       CERROR("%s: Cannot find LOV by %s name, rc = %d\n",
+                              obd->obd_name, lustre_cfg_string(lcfg, 0), err);
+               } else {
+                       err = obd_process_config(lov_obd, sizeof(*lcfg), lcfg);
+               }
+               /* restore 'lmv' name */
+               clilmv[4] = 'm';
+               GOTO(out, err);
+       }
+       default: {
+               err = obd_process_config(obd, sizeof(*lcfg), lcfg);
+               GOTO(out, err);
 
         }
         }
+       EXIT;
 out:
         if ((err < 0) && !(lcfg->lcfg_command & LCFG_REQUIRED)) {
                 CWARN("Ignoring error %d on optional command %#x\n", err,
index 1eb0078..f5bd241 100644 (file)
@@ -1672,8 +1672,8 @@ void lustre_assert_wire_constants(void)
                (unsigned)LOV_PATTERN_RAID0);
        LASSERTF(LOV_PATTERN_RAID1 == 0x00000002UL, "found 0x%.8xUL\n",
                (unsigned)LOV_PATTERN_RAID1);
-       LASSERTF(LOV_PATTERN_FIRST == 0x00000100UL, "found 0x%.8xUL\n",
-               (unsigned)LOV_PATTERN_FIRST);
+       LASSERTF(LOV_PATTERN_MDT == 0x00000100UL, "found 0x%.8xUL\n",
+               (unsigned)LOV_PATTERN_MDT);
        LASSERTF(LOV_PATTERN_CMOBD == 0x00000200UL, "found 0x%.8xUL\n",
                (unsigned)LOV_PATTERN_CMOBD);
 
index dc3e115..2abe262 100644 (file)
@@ -765,7 +765,7 @@ check_lov_mds_md_v3(void)
 
        CHECK_VALUE_X(LOV_PATTERN_RAID0);
        CHECK_VALUE_X(LOV_PATTERN_RAID1);
-       CHECK_VALUE_X(LOV_PATTERN_FIRST);
+       CHECK_VALUE_X(LOV_PATTERN_MDT);
        CHECK_VALUE_X(LOV_PATTERN_CMOBD);
 }
 
index 3851c24..482d3d3 100644 (file)
@@ -1690,8 +1690,8 @@ void lustre_assert_wire_constants(void)
                (unsigned)LOV_PATTERN_RAID0);
        LASSERTF(LOV_PATTERN_RAID1 == 0x00000002UL, "found 0x%.8xUL\n",
                (unsigned)LOV_PATTERN_RAID1);
-       LASSERTF(LOV_PATTERN_FIRST == 0x00000100UL, "found 0x%.8xUL\n",
-               (unsigned)LOV_PATTERN_FIRST);
+       LASSERTF(LOV_PATTERN_MDT == 0x00000100UL, "found 0x%.8xUL\n",
+               (unsigned)LOV_PATTERN_MDT);
        LASSERTF(LOV_PATTERN_CMOBD == 0x00000200UL, "found 0x%.8xUL\n",
                (unsigned)LOV_PATTERN_CMOBD);