Whamcloud - gitweb
LU-1445 lod: Add FLD lookup to LOD
authorwangdi <di.wang@whamcloud.com>
Thu, 26 Sep 2013 01:35:37 +0000 (18:35 -0700)
committerOleg Drokin <green@whamcloud.com>
Sun, 20 Jan 2013 00:13:21 +0000 (19:13 -0500)
1. Adding FLD in LOD can make it getting the OST index
of the real FID when setting stripEA(l_ost_index) to
the file. Then the client can still use l_ost_index
in stripeEA to locate OST, by which the old client can
still access OST after FID_on_OST is landed.

2. Init sequence client for OSP.

Signed-off-by: Wang Di <di.wang@intel.com>
Change-Id: I559edfd02feb08f32156c7237ec75fe6dde681e3
Reviewed-on: http://review.whamcloud.com/5047
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/fid/fid_handler.c
lustre/include/lustre_fid.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/lmv/lmv_obd.c
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/osp/osp_dev.c

index 295c685..a007fef 100644 (file)
 #include <lustre_fid.h>
 #include "fid_internal.h"
 
 #include <lustre_fid.h>
 #include "fid_internal.h"
 
-int client_fid_init(struct obd_export *exp, enum lu_cli_type type)
+int client_fid_init(struct obd_device *obd,
+                   struct obd_export *exp, enum lu_cli_type type)
 {
 {
-       struct client_obd *cli = &exp->exp_obd->u.cli;
+       struct client_obd *cli = &obd->u.cli;
        char *prefix;
        int rc;
        ENTRY;
        char *prefix;
        int rc;
        ENTRY;
@@ -73,8 +74,7 @@ int client_fid_init(struct obd_export *exp, enum lu_cli_type type)
        if (prefix == NULL)
                GOTO(out_free_seq, rc = -ENOMEM);
 
        if (prefix == NULL)
                GOTO(out_free_seq, rc = -ENOMEM);
 
-       snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s",
-                exp->exp_obd->obd_name);
+       snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name);
 
        /* Init client side sequence-manager */
        rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL);
 
        /* Init client side sequence-manager */
        rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL);
@@ -90,9 +90,9 @@ out_free_seq:
 }
 EXPORT_SYMBOL(client_fid_init);
 
 }
 EXPORT_SYMBOL(client_fid_init);
 
-int client_fid_fini(struct obd_export *exp)
+int client_fid_fini(struct obd_device *obd)
 {
 {
-       struct client_obd *cli = &exp->exp_obd->u.cli;
+       struct client_obd *cli = &obd->u.cli;
        ENTRY;
 
        if (cli->cl_seq != NULL) {
        ENTRY;
 
        if (cli->cl_seq != NULL) {
index 08bf3de..ee4c3c6 100644 (file)
@@ -434,8 +434,9 @@ int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss);
 int fid_is_local(const struct lu_env *env,
                  struct lu_site *site, const struct lu_fid *fid);
 
 int fid_is_local(const struct lu_env *env,
                  struct lu_site *site, const struct lu_fid *fid);
 
-int client_fid_init(struct obd_export *exp, enum lu_cli_type type);
-int client_fid_fini(struct obd_export *exp);
+int client_fid_init(struct obd_device *obd, struct obd_export *exp,
+                   enum lu_cli_type type);
+int client_fid_fini(struct obd_device *obd);
 
 /* fid locking */
 
 
 /* fid locking */
 
index 7941a83..2b21509 100644 (file)
@@ -1327,9 +1327,10 @@ struct obd_ops {
                            void *localdata);
         int (*o_disconnect)(struct obd_export *exp);
 
                            void *localdata);
         int (*o_disconnect)(struct obd_export *exp);
 
-        /* Initialize/finalize fids infrastructure. */
-       int (*o_fid_init)(struct obd_export *exp, enum lu_cli_type type);
-        int (*o_fid_fini)(struct obd_export *exp);
+       /* Initialize/finalize fids infrastructure. */
+       int (*o_fid_init)(struct obd_device *obd,
+                         struct obd_export *exp, enum lu_cli_type type);
+       int (*o_fid_fini)(struct obd_device *obd);
 
         /* Allocate new fid according to passed @hint. */
         int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid,
 
         /* Allocate new fid according to passed @hint. */
         int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid,
index 5b3ea31..b8b911f 100644 (file)
@@ -1041,28 +1041,29 @@ static inline int obd_disconnect(struct obd_export *exp)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static inline int obd_fid_init(struct obd_export *exp, enum lu_cli_type type)
+static inline int obd_fid_init(struct obd_device *obd, struct obd_export *exp,
+                              enum lu_cli_type type)
 {
 {
-        int rc;
-        ENTRY;
+       int rc;
+       ENTRY;
 
 
-        OBD_CHECK_DT_OP(exp->exp_obd, fid_init, 0);
-        EXP_COUNTER_INCREMENT(exp, fid_init);
+       OBD_CHECK_DT_OP(obd, fid_init, 0);
+       OBD_COUNTER_INCREMENT(obd, fid_init);
 
 
-       rc = OBP(exp->exp_obd, fid_init)(exp, type);
+       rc = OBP(obd, fid_init)(obd, exp, type);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static inline int obd_fid_fini(struct obd_export *exp)
+static inline int obd_fid_fini(struct obd_device *obd)
 {
 {
-        int rc;
-        ENTRY;
+       int rc;
+       ENTRY;
 
 
-        OBD_CHECK_DT_OP(exp->exp_obd, fid_fini, 0);
-        EXP_COUNTER_INCREMENT(exp, fid_fini);
+       OBD_CHECK_DT_OP(obd, fid_fini, 0);
+       OBD_COUNTER_INCREMENT(obd, fid_fini);
 
 
-        rc = OBP(exp->exp_obd, fid_fini)(exp);
-        RETURN(rc);
+       rc = OBP(obd, fid_fini)(obd);
+       RETURN(rc);
 }
 
 static inline int obd_fid_alloc(struct obd_export *exp,
 }
 
 static inline int obd_fid_alloc(struct obd_export *exp,
index fec3b2e..0b0790f 100644 (file)
@@ -406,12 +406,12 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
-        /*
-         * Init fid sequence client for this mdc and add new fld target.
-         */
-       rc = obd_fid_init(mdc_exp, LUSTRE_SEQ_METADATA);
-        if (rc)
-                RETURN(rc);
+       /*
+        * Init fid sequence client for this mdc and add new fld target.
+        */
+       rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
+       if (rc)
+               RETURN(rc);
 
         target.ft_srv = NULL;
         target.ft_exp = mdc_exp;
 
         target.ft_srv = NULL;
         target.ft_exp = mdc_exp;
@@ -634,9 +634,9 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 }
         }
 #endif
                 }
         }
 #endif
-        rc = obd_fid_fini(tgt->ltd_exp);
-        if (rc)
-                CERROR("Can't finanize fids factory\n");
+       rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
+       if (rc)
+               CERROR("Can't finanize fids factory\n");
 
         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
                tgt->ltd_exp->exp_obd->obd_name,
 
         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
                tgt->ltd_exp->exp_obd->obd_name,
index 03ae13e..9bf10d8 100644 (file)
 
 #include "lod_internal.h"
 
 
 #include "lod_internal.h"
 
+/**
+ * Lookup MDT/OST index \a tgt by FID \a fid.
+ *
+ * \param lod LOD to be lookup at.
+ * \param fid FID of object to find MDT/OST.
+ * \param tgt MDT/OST index to return.
+ * \param flags indidcate the FID is on MDS or OST.
+ **/
+int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
+                  const struct lu_fid *fid, __u32 *tgt, int flags)
+{
+       struct lu_seq_range     range;
+       struct lu_server_fld    *server_fld;
+       int rc = 0;
+       ENTRY;
+
+       LASSERTF(fid_is_sane(fid), "Invalid FID "DFID"\n", PFID(fid));
+       if (fid_is_idif(fid)) {
+               *tgt = fid_idif_ost_idx(fid);
+               RETURN(rc);
+       }
+
+       if (!lod->lod_initialized || !fid_is_norm(fid)) {
+               LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
+               *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+               RETURN(rc);
+       }
+
+       server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
+       range.lsr_flags = flags;
+       rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
+       if (rc) {
+               CERROR("%s: Can't find tgt by seq "LPX64", rc %d\n",
+                      lod2obd(lod)->obd_name, fid_seq(fid), rc);
+               RETURN(rc);
+       }
+
+       *tgt = range.lsr_index;
+
+       CDEBUG(D_INFO, "LOD: got tgt %x for sequence: "
+              LPX64"\n", *tgt, fid_seq(fid));
+
+       RETURN(rc);
+}
+
 extern struct lu_object_operations lod_lu_obj_ops;
 extern struct dt_object_operations lod_obj_ops;
 
 extern struct lu_object_operations lod_lu_obj_ops;
 extern struct dt_object_operations lod_obj_ops;
 
@@ -214,6 +259,13 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
        ENTRY;
 
        rc = next->ld_ops->ldo_prepare(env, pdev, next);
        ENTRY;
 
        rc = next->ld_ops->ldo_prepare(env, pdev, next);
+       if (rc != 0) {
+               CERROR("%s: prepare bottom error: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               RETURN(rc);
+       }
+
+       lod->lod_initialized = 1;
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
index 9027ee0..28d6081 100644 (file)
@@ -77,7 +77,8 @@ struct lod_device {
        cfs_proc_dir_entry_t *lod_proc_entry;
        struct lprocfs_stats *lod_stats;
        int                   lod_connects;
        cfs_proc_dir_entry_t *lod_proc_entry;
        struct lprocfs_stats *lod_stats;
        int                   lod_connects;
-       int                   lod_recovery_completed;
+       unsigned int          lod_recovery_completed:1,
+                             lod_initialized:1;
 
        /* lov settings descriptor storing static information */
        struct lov_desc       lod_desc;
 
        /* lov settings descriptor storing static information */
        struct lov_desc       lod_desc;
@@ -247,6 +248,9 @@ static inline struct lod_thread_info *lod_env_info(const struct lu_env *env)
        if ((__dev)->lod_osts_size > 0) \
                cfs_foreach_bit((__dev)->lod_ost_bitmap, (index))
 
        if ((__dev)->lod_osts_size > 0) \
                cfs_foreach_bit((__dev)->lod_ost_bitmap, (index))
 
+/* lod_dev.c */
+int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
+                  const struct lu_fid *fid, mdsno_t *tgt, int flags);
 /* lod_lov.c */
 void lod_getref(struct lod_device *lod);
 void lod_putref(struct lod_device *lod);
 /* lod_lov.c */
 void lod_getref(struct lod_device *lod);
 void lod_putref(struct lod_device *lod);
index 067a844..72d8204 100644 (file)
@@ -104,6 +104,7 @@ void lod_putref(struct lod_device *lod)
                                CERROR("%s: qos_del_tgt(%s) failed: rc = %d\n",
                                       lod2obd(lod)->obd_name,
                                       obd_uuid2str(&ost_desc->ltd_uuid), rc);
                                CERROR("%s: qos_del_tgt(%s) failed: rc = %d\n",
                                       lod2obd(lod)->obd_name,
                                       obd_uuid2str(&ost_desc->ltd_uuid), rc);
+
                        rc = obd_disconnect(ost_desc->ltd_exp);
                        if (rc)
                                CERROR("%s: failed to disconnect %s: rc = %d\n",
                        rc = obd_disconnect(ost_desc->ltd_exp);
                        if (rc)
                                CERROR("%s: failed to disconnect %s: rc = %d\n",
@@ -449,19 +450,27 @@ int lod_generate_and_set_lovea(const struct lu_env *env,
        }
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
        }
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               const struct lu_fid *fid;
+               const struct lu_fid     *fid;
+               struct lod_device       *lod;
+               __u32                   index;
 
 
+               lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
                LASSERT(lo->ldo_stripe[i]);
                fid = lu_object_fid(&lo->ldo_stripe[i]->do_lu);
 
                rc = fid_ostid_pack(fid, &info->lti_ostid);
                LASSERT(rc == 0);
                LASSERT(lo->ldo_stripe[i]);
                fid = lu_object_fid(&lo->ldo_stripe[i]->do_lu);
 
                rc = fid_ostid_pack(fid, &info->lti_ostid);
                LASSERT(rc == 0);
-               LASSERT(info->lti_ostid.oi_seq == FID_SEQ_OST_MDT0);
 
                objs[i].l_object_id  = cpu_to_le64(info->lti_ostid.oi_id);
                objs[i].l_object_seq = cpu_to_le64(info->lti_ostid.oi_seq);
                objs[i].l_ost_gen    = cpu_to_le32(0);
 
                objs[i].l_object_id  = cpu_to_le64(info->lti_ostid.oi_id);
                objs[i].l_object_seq = cpu_to_le64(info->lti_ostid.oi_seq);
                objs[i].l_ost_gen    = cpu_to_le32(0);
-               objs[i].l_ost_idx    = cpu_to_le32(fid_idif_ost_idx(fid));
+               rc = lod_fld_lookup(env, lod, fid, &index, LU_SEQ_RANGE_OST);
+               if (rc < 0) {
+                       CERROR("%s: Can not locate "DFID": rc = %d\n",
+                              lod2obd(lod)->obd_name, PFID(fid), rc);
+                       RETURN(rc);
+               }
+               objs[i].l_ost_idx = cpu_to_le32(index);
        }
 
        info->lti_buf.lb_buf = lmm;
        }
 
        info->lti_buf.lb_buf = lmm;
@@ -590,13 +599,12 @@ int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo,
        lo->ldo_stripes_allocated = lo->ldo_stripenr;
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
        lo->ldo_stripes_allocated = lo->ldo_stripenr;
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
-
                info->lti_ostid.oi_id = le64_to_cpu(objs[i].l_object_id);
                info->lti_ostid.oi_id = le64_to_cpu(objs[i].l_object_id);
-               /* XXX: support for DNE? */
                info->lti_ostid.oi_seq = le64_to_cpu(objs[i].l_object_seq);
                idx = le64_to_cpu(objs[i].l_ost_idx);
                fid_ostid_unpack(&info->lti_fid, &info->lti_ostid, idx);
                info->lti_ostid.oi_seq = le64_to_cpu(objs[i].l_object_seq);
                idx = le64_to_cpu(objs[i].l_ost_idx);
                fid_ostid_unpack(&info->lti_fid, &info->lti_ostid, idx);
-
+               LASSERTF(fid_is_sane(&info->lti_fid), ""DFID" insane!\n",
+                        PFID(&info->lti_fid));
                /*
                 * XXX: assertion is left for testing, to make
                 * sure we never process requests till configuration
                /*
                 * XXX: assertion is left for testing, to make
                 * sure we never process requests till configuration
index 7d5cb30..7357ec6 100644 (file)
@@ -248,6 +248,8 @@ static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
        /* stop sync thread */
        osp_sync_fini(d);
 
        /* stop sync thread */
        osp_sync_fini(d);
 
+       obd_fid_fini(d->opd_obd);
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -512,6 +514,13 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
        if (rc)
                GOTO(out_precreat, rc);
 
        if (rc)
                GOTO(out_precreat, rc);
 
+       rc = obd_fid_init(m->opd_obd, NULL, LUSTRE_SEQ_DATA);
+       if (rc) {
+               CERROR("%s: fid init error: rc = %d\n",
+                      m->opd_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+
        /*
         * Initiate connect to OST
         */
        /*
         * Initiate connect to OST
         */
@@ -663,8 +672,7 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
                RETURN(rc);
 
        *exp = class_conn2export(&conn);
                RETURN(rc);
 
        *exp = class_conn2export(&conn);
-       if (is_osp_on_ost(obd->obd_name))
-               osp->opd_exp = *exp;
+       osp->opd_exp = *exp;
 
        /* Why should there ever be more than 1 connect? */
        osp->opd_connects++;
 
        /* Why should there ever be more than 1 connect? */
        osp->opd_connects++;
@@ -834,6 +842,9 @@ static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
                d->opd_imp_seen_connected = 1;
                if (is_osp_on_ost(d->opd_obd->obd_name))
                        break;
                d->opd_imp_seen_connected = 1;
                if (is_osp_on_ost(d->opd_obd->obd_name))
                        break;
+               if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL)
+                       d->opd_obd->u.cli.cl_seq->lcs_exp =
+                                       class_export_get(d->opd_exp);
                cfs_waitq_signal(&d->opd_pre_waitq);
                __osp_sync_check_for_work(d);
                CDEBUG(D_HA, "got connected\n");
                cfs_waitq_signal(&d->opd_pre_waitq);
                __osp_sync_check_for_work(d);
                CDEBUG(D_HA, "got connected\n");
@@ -970,6 +981,8 @@ static struct obd_ops osp_obd_device_ops = {
        .o_import_event = osp_import_event,
        .o_iocontrol    = osp_iocontrol,
        .o_statfs       = osp_obd_statfs,
        .o_import_event = osp_import_event,
        .o_iocontrol    = osp_iocontrol,
        .o_statfs       = osp_obd_statfs,
+       .o_fid_init     = client_fid_init,
+       .o_fid_fini     = client_fid_fini,
 };
 
 struct llog_operations osp_mds_ost_orig_logops;
 };
 
 struct llog_operations osp_mds_ost_orig_logops;