Whamcloud - gitweb
LU-3716 obdecho: create a separate root object for echo access 30/10130/12
authorJian Yu <jian.yu@intel.com>
Sat, 17 Jan 2015 07:00:47 +0000 (23:00 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 23 Jan 2015 02:01:28 +0000 (02:01 +0000)
Currently, while echo client and normal client are attached at the
same time, both md echo objects and normal objects are created
and looked up under the same root object (ROOT), which will cause
ASSERTION( lu_device_is_mdt(o->lo_dev) ) failure.

This patch fixes the issue by creating a separate root object
(ROOT_ECHO) for echo access. The md echo objects created under
this root object can only be accessed by echo client. Normal client
will never see these echo objects.

Test-Parameters: alwaysuploadlogs envdefinitions=SLOW=yes \
mdtcount=1 testlist=mds-survey

Signed-off-by: Jian Yu <jian.yu@intel.com>
Change-Id: I8d8a9bd2c467bb40a7993d492aa3d4ba6676ac8f
Reviewed-on: http://review.whamcloud.com/10130
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/obdecho/echo_client.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/osd-zfs/osd_oi.c

index a58e294..44d08e8 100644 (file)
@@ -484,6 +484,12 @@ enum dot_lustre_oid {
        FID_OID_DOT_LUSTRE_LPF  = 3UL,
 };
 
        FID_OID_DOT_LUSTRE_LPF  = 3UL,
 };
 
+/** OID for FID_SEQ_ROOT */
+enum root_oid {
+       FID_OID_ROOT            = 1UL,
+       FID_OID_ECHO_ROOT       = 2UL,
+};
+
 static inline bool fid_seq_is_mdt0(__u64 seq)
 {
        return seq == FID_SEQ_OST_MDT0;
 static inline bool fid_seq_is_mdt0(__u64 seq)
 {
        return seq == FID_SEQ_OST_MDT0;
@@ -554,7 +560,14 @@ static inline bool fid_is_mdt0(const struct lu_fid *fid)
 static inline void lu_root_fid(struct lu_fid *fid)
 {
        fid->f_seq = FID_SEQ_ROOT;
 static inline void lu_root_fid(struct lu_fid *fid)
 {
        fid->f_seq = FID_SEQ_ROOT;
-       fid->f_oid = 1;
+       fid->f_oid = FID_OID_ROOT;
+       fid->f_ver = 0;
+}
+
+static inline void lu_echo_root_fid(struct lu_fid *fid)
+{
+       fid->f_seq = FID_SEQ_ROOT;
+       fid->f_oid = FID_OID_ECHO_ROOT;
        fid->f_ver = 0;
 }
 
        fid->f_ver = 0;
 }
 
index 92b12e4..ce2e573 100644 (file)
@@ -259,7 +259,7 @@ static inline void lu_local_name_obj_fid(struct lu_fid *fid, __u32 oid)
 static inline int fid_is_root(const struct lu_fid *fid)
 {
        return unlikely((fid_seq(fid) == FID_SEQ_ROOT &&
 static inline int fid_is_root(const struct lu_fid *fid)
 {
        return unlikely((fid_seq(fid) == FID_SEQ_ROOT &&
-                        fid_oid(fid) == 1));
+                        fid_oid(fid) == FID_OID_ROOT));
 }
 
 static inline int fid_is_dot_lustre(const struct lu_fid *fid)
 }
 
 static inline int fid_is_dot_lustre(const struct lu_fid *fid)
index 0ad6050..bc7b834 100644 (file)
  */
 
 struct echo_device {
  */
 
 struct echo_device {
-        struct cl_device        ed_cl;
-        struct echo_client_obd *ed_ec;
-
-        struct cl_site          ed_site_myself;
-        struct cl_site         *ed_site;
-        struct lu_device       *ed_next;
-        int                     ed_next_ismd;
-        struct lu_client_seq   *ed_cl_seq;
+       struct cl_device          ed_cl;
+       struct echo_client_obd   *ed_ec;
+
+       struct cl_site            ed_site_myself;
+       struct cl_site           *ed_site;
+       struct lu_device         *ed_next;
+       int                       ed_next_ismd;
+       struct lu_client_seq     *ed_cl_seq;
+#ifdef HAVE_SERVER_SUPPORT
+       struct local_oid_storage *ed_los;
+       struct lu_fid             ed_root_fid;
+#endif /* HAVE_SERVER_SUPPORT */
 };
 
 struct echo_object {
 };
 
 struct echo_object {
@@ -96,6 +100,23 @@ struct echo_lock {
        atomic_t                el_refcount;
 };
 
        atomic_t                el_refcount;
 };
 
+#ifdef HAVE_SERVER_SUPPORT
+static const char echo_md_root_dir_name[] = "ROOT_ECHO";
+
+/**
+ * In order to use the values of members in struct mdd_device,
+ * we define an alias structure here.
+ */
+struct echo_md_device {
+       struct md_device                 emd_md_dev;
+       struct obd_export               *emd_child_exp;
+       struct dt_device                *emd_child;
+       struct dt_device                *emd_bottom;
+       struct lu_fid                    emd_root_fid;
+       struct lu_fid                    emd_local_root_fid;
+};
+#endif /* HAVE_SERVER_SUPPORT */
+
 static int echo_client_setup(const struct lu_env *env,
                              struct obd_device *obddev,
                              struct lustre_cfg *lcfg);
 static int echo_client_setup(const struct lu_env *env,
                              struct obd_device *obddev,
                              struct lustre_cfg *lcfg);
@@ -160,6 +181,28 @@ struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
         return container_of(c, struct echo_object_conf, eoc_cl);
 }
 
         return container_of(c, struct echo_object_conf, eoc_cl);
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
+{
+       return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
+}
+
+static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
+{
+       return &d->emd_md_dev.md_lu_dev;
+}
+
+static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
+{
+       return emd2lu_dev(d)->ld_site->ld_seq_site;
+}
+
+static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
+{
+       return d->emd_md_dev.md_lu_dev.ld_obd;
+}
+#endif /* HAVE_SERVER_SUPPORT */
+
 /** @} echo_helpers */
 
 static int cl_echo_object_put(struct echo_object *eco);
 /** @} echo_helpers */
 
 static int cl_echo_object_put(struct echo_object *eco);
@@ -685,6 +728,87 @@ static int echo_fid_fini(struct obd_device *obddev)
 
         RETURN(0);
 }
 
         RETURN(0);
 }
+
+static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
+{
+       ENTRY;
+
+       if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
+               local_oid_storage_fini(env, ed->ed_los);
+               ed->ed_los = NULL;
+       }
+}
+
+static int
+echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
+                         struct local_oid_storage *los,
+                         const struct lu_fid *pfid, const char *name,
+                         __u32 mode, struct lu_fid *fid)
+{
+       struct dt_object        *parent = NULL;
+       struct dt_object        *dto = NULL;
+       int                      rc = 0;
+       ENTRY;
+
+       LASSERT(!fid_is_zero(pfid));
+       parent = dt_locate(env, emd->emd_bottom, pfid);
+       if (unlikely(IS_ERR(parent)))
+               RETURN(PTR_ERR(parent));
+
+       /* create local file with @fid */
+       dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
+                                                parent, name, mode);
+       if (IS_ERR(dto))
+               GOTO(out_put, rc = PTR_ERR(dto));
+
+       *fid = *lu_object_fid(&dto->do_lu);
+       /* since stack is not fully set up the local_storage uses own stack
+        * and we should drop its object from cache */
+       lu_object_put_nocache(env, &dto->do_lu);
+
+       EXIT;
+out_put:
+       lu_object_put(env, &parent->do_lu);
+       RETURN(rc);
+}
+
+static int
+echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
+                struct echo_device *ed)
+{
+       struct lu_fid                    fid;
+       int                              rc = 0;
+       ENTRY;
+
+       /* Setup local dirs */
+       fid.f_seq = FID_SEQ_LOCAL_NAME;
+       fid.f_oid = 1;
+       fid.f_ver = 0;
+       rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
+       if (rc != 0)
+               RETURN(rc);
+
+       lu_echo_root_fid(&fid);
+       if (echo_md_seq_site(emd)->ss_node_id == 0) {
+               rc = echo_md_local_file_create(env, emd, ed->ed_los,
+                                              &emd->emd_local_root_fid,
+                                              echo_md_root_dir_name, S_IFDIR |
+                                              S_IRUGO | S_IWUSR | S_IXUGO,
+                                              &fid);
+               if (rc != 0) {
+                       CERROR("%s: create md echo root fid failed: rc = %d\n",
+                              emd2obd_dev(emd)->obd_name, rc);
+                       GOTO(out_los, rc);
+               }
+       }
+       ed->ed_root_fid = fid;
+
+       RETURN(0);
+out_los:
+       echo_ed_los_fini(env, ed);
+
+       RETURN(rc);
+}
 #endif /* HAVE_SERVER_SUPPORT */
 
 static struct lu_device *echo_device_alloc(const struct lu_env *env,
 #endif /* HAVE_SERVER_SUPPORT */
 
 static struct lu_device *echo_device_alloc(const struct lu_env *env,
@@ -751,10 +875,12 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
 
         if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
 
         if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
-                /* Suppose to connect to some Metadata layer */
-                struct lu_site *ls;
-                struct lu_device *ld;
-                int    found = 0;
+               /* Suppose to connect to some Metadata layer */
+               struct lu_site          *ls = NULL;
+               struct lu_device        *ld = NULL;
+               struct md_device        *md = NULL;
+               struct echo_md_device   *emd = NULL;
+               int                      found = 0;
 
                 if (next == NULL) {
                         CERROR("%s is not lu device type!\n",
 
                 if (next == NULL) {
                         CERROR("%s is not lu device type!\n",
@@ -797,6 +923,15 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                        CERROR("echo fid init error %d\n", rc);
                        GOTO(out, rc);
                }
                        CERROR("echo fid init error %d\n", rc);
                        GOTO(out, rc);
                }
+
+               md = lu2md_dev(next);
+               emd = lu2emd_dev(&md->md_lu_dev);
+               rc = echo_md_root_get(env, emd, ed);
+               if (rc != 0) {
+                       CERROR("%s: get root error: rc = %d\n",
+                               emd2obd_dev(emd)->obd_name, rc);
+                       GOTO(out, rc);
+               }
 #else /* !HAVE_SERVER_SUPPORT */
                CERROR("Local operations are NOT supported on client side. "
                       "Only remote operations are supported. Metadata client "
 #else /* !HAVE_SERVER_SUPPORT */
                CERROR("Local operations are NOT supported on client side. "
                       "Only remote operations are supported. Metadata client "
@@ -926,6 +1061,7 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
        echo_client_cleanup(d->ld_obd);
 #ifdef HAVE_SERVER_SUPPORT
        echo_fid_fini(d->ld_obd);
        echo_client_cleanup(d->ld_obd);
 #ifdef HAVE_SERVER_SUPPORT
        echo_fid_fini(d->ld_obd);
+       echo_ed_los_fini(env, ed);
 #endif
        while (next && !ed->ed_next_ismd)
                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 #endif
        while (next && !ed->ed_next_ismd)
                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
@@ -1820,22 +1956,16 @@ static struct lu_object *echo_resolve_path(const struct lu_env *env,
                                            struct echo_device *ed, char *path,
                                            int path_len)
 {
                                            struct echo_device *ed, char *path,
                                            int path_len)
 {
-        struct lu_device        *ld = ed->ed_next;
-        struct md_device        *md = lu2md_dev(ld);
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_name          *lname = &info->eti_lname;
-        struct lu_object        *parent = NULL;
-        struct lu_object        *child = NULL;
-        int rc = 0;
-        ENTRY;
+       struct lu_device        *ld = ed->ed_next;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_fid           *fid = &info->eti_fid;
+       struct lu_name          *lname = &info->eti_lname;
+       struct lu_object        *parent = NULL;
+       struct lu_object        *child = NULL;
+       int                      rc = 0;
+       ENTRY;
 
 
-        /*Only support MDD layer right now*/
-        rc = md->md_ops->mdo_root_get(env, md, fid);
-        if (rc) {
-                CERROR("get root error: rc = %d\n", rc);
-                RETURN(ERR_PTR(rc));
-        }
+       *fid = ed->ed_root_fid;
 
        /* In the function below, .hs_keycmp resolves to
         * lu_obj_hop_keycmp() */
 
        /* In the function below, .hs_keycmp resolves to
         * lu_obj_hop_keycmp() */
index fdfab11..d31f720 100644 (file)
@@ -1559,7 +1559,7 @@ static const struct osd_lf_map osd_lf_maps[] = {
        { "PENDING", { 0, 0, 0 }, 0, NULL, NULL },
 
        /* ROOT */
        { "PENDING", { 0, 0, 0 }, 0, NULL, NULL },
 
        /* ROOT */
-       { "ROOT", { FID_SEQ_ROOT, 1, 0 },
+       { "ROOT", { FID_SEQ_ROOT, FID_OID_ROOT, 0 },
                OLF_SCAN_SUBITEMS | OLF_HIDE_FID, osd_ios_ROOT_scan, NULL },
 
        /* changelog_catalog */
                OLF_SCAN_SUBITEMS | OLF_HIDE_FID, osd_ios_ROOT_scan, NULL },
 
        /* changelog_catalog */
index 00dfa14..0a663fc 100644 (file)
@@ -735,7 +735,7 @@ int osd_convert_root_to_new_seq(const struct lu_env *env,
 
        /* declare that we'll add object to fid-dnode mapping */
        newfid.f_seq = FID_SEQ_ROOT;
 
        /* declare that we'll add object to fid-dnode mapping */
        newfid.f_seq = FID_SEQ_ROOT;
-       newfid.f_oid = 1;
+       newfid.f_oid = FID_OID_ROOT;
        newfid.f_ver = 0;
        zapid = osd_get_name_n_idx(env, o, &newfid, buf);
        dmu_tx_hold_bonus(tx, zapid);
        newfid.f_ver = 0;
        zapid = osd_get_name_n_idx(env, o, &newfid, buf);
        dmu_tx_hold_bonus(tx, zapid);