Whamcloud - gitweb
b=21379 Even create objects in DELORPHAN process; Choose osts in the pool in create...
authordi wang <di.wang@oracle.com>
Mon, 18 Jan 2010 18:58:47 +0000 (10:58 -0800)
committerRobert Read <robert.read@oracle.com>
Tue, 15 Jun 2010 20:49:05 +0000 (13:49 -0700)
1.precreate the objects even in DELORPHAN process, but instead of allocating these objects, otherwise
  it might create the gap of object ID, which might cause some orphan unlink log being recorded wrongly.
2.Only choose OSTs in the pool if in create remedy process.

i=adilger.dilger
i=robert.read
i=johann

19 files changed:
lustre/include/obd.h
lustre/include/obd_support.h
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/mdd/mdd_device.c
lustre/mdd/mdd_lov.c
lustre/mds/handler.c
lustre/mds/lproc_mds.c
lustre/mds/mds_log.c
lustre/mds/mds_lov.c
lustre/osc/osc_create.c
lustre/quota/lproc_quota.c
lustre/quota/quota_master.c
lustre/tests/ost-pools.sh
lustre/tests/replay-dual.sh
lustre/tests/test-framework.sh
lustre/utils/liblustreapi.c

index ebe7112..1047a40 100644 (file)
@@ -509,10 +509,10 @@ struct mds_obd {
         cfs_dentry_t                    *mds_logs_dir;
         cfs_dentry_t                    *mds_objects_dir;
         struct llog_handle              *mds_cfg_llh;
         cfs_dentry_t                    *mds_logs_dir;
         cfs_dentry_t                    *mds_objects_dir;
         struct llog_handle              *mds_cfg_llh;
-        struct obd_device               *mds_osc_obd; /* XXX lov_obd */
+        struct obd_device               *mds_lov_obd;
         struct obd_uuid                  mds_lov_uuid;
         char                            *mds_profile;
         struct obd_uuid                  mds_lov_uuid;
         char                            *mds_profile;
-        struct obd_export               *mds_osc_exp; /* XXX lov_exp */
+        struct obd_export               *mds_lov_exp;
         struct lov_desc                  mds_lov_desc;
         __u32                            mds_id;
 
         struct lov_desc                  mds_lov_desc;
         __u32                            mds_id;
 
@@ -1134,6 +1134,7 @@ enum obd_cleanup_stage {
 #define KEY_SET_FS              "set_fs"
 /*      KEY_SET_INFO in lustre_idl.h */
 #define KEY_SPTLRPC_CONF        "sptlrpc_conf"
 #define KEY_SET_FS              "set_fs"
 /*      KEY_SET_INFO in lustre_idl.h */
 #define KEY_SPTLRPC_CONF        "sptlrpc_conf"
+#define KEY_CONNECT_FLAG        "connect_flags"
 
 
 struct lu_context;
 
 
 struct lu_context;
index 6dd3f80..c944348 100644 (file)
@@ -228,6 +228,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_OPEN_WAIT_CREATE    0x144
 #define OBD_FAIL_MDS_PDO_LOCK            0x145
 #define OBD_FAIL_MDS_PDO_LOCK2           0x146
 #define OBD_FAIL_MDS_OPEN_WAIT_CREATE    0x144
 #define OBD_FAIL_MDS_PDO_LOCK            0x145
 #define OBD_FAIL_MDS_PDO_LOCK2           0x146
+#define OBD_FAIL_MDS_OSC_CREATE_FAIL     0x147
 
 /* CMD */
 #define OBD_FAIL_MDS_IS_SUBDIR_NET       0x180
 
 /* CMD */
 #define OBD_FAIL_MDS_IS_SUBDIR_NET       0x180
index e7e5255..7fcb7c1 100644 (file)
@@ -200,8 +200,6 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oifo,
                         struct obd_trans_info *oti,
                         struct lov_request_set **reqset);
 int cb_create_update(void *cookie, int rc);
                         struct obd_trans_info *oti,
                         struct lov_request_set **reqset);
 int cb_create_update(void *cookie, int rc);
-int lov_update_create_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc);
 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea);
 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
                      obd_count oa_bufs, struct brw_page *pga,
 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea);
 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
                      obd_count oa_bufs, struct brw_page *pga,
index 1206dcd..8ecfd4d 100644 (file)
@@ -2578,6 +2578,19 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
         } else if (KEY_IS(KEY_FIEMAP)) {
                 rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
                 GOTO(out, rc);
         } else if (KEY_IS(KEY_FIEMAP)) {
                 rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
                 GOTO(out, rc);
+        } else if (KEY_IS(KEY_CONNECT_FLAG)) {
+                struct lov_tgt_desc *tgt;
+                __u64 ost_idx = *((__u64*)val);
+
+                LASSERT(*vallen == sizeof(__u64));
+                LASSERT(ost_idx < lov->desc.ld_tgt_count);
+                tgt = lov->lov_tgts[ost_idx];
+
+                if (!tgt || !tgt->ltd_exp)
+                        GOTO(out, rc = -ESRCH);
+
+                *((__u64*)val) = tgt->ltd_exp->exp_connect_flags;
+                GOTO(out, rc = 0);
         }
 
         rc = -EINVAL;
         }
 
         rc = -EINVAL;
index e5124be..2262c50 100644 (file)
@@ -494,37 +494,80 @@ void qos_shrink_lsm(struct lov_request_set *set)
         }
 }
 
         }
 }
 
+/**
+ * Check whether we can create the object on the OST(refered by ost_idx)
+ * \retval:
+ *          0: create the object.
+ *          other value: did not create the object.
+ */
+static int lov_check_and_create_object(struct lov_obd *lov, int ost_idx,
+                                       struct lov_stripe_md *lsm,
+                                       struct lov_request *req,
+                                       struct obd_trans_info *oti)
+{
+        int stripe;
+        int rc = -EIO;
+        ENTRY;
+
+        CDEBUG(D_QOS, "Check and create on idx %d \n", ost_idx);
+        if (!lov->lov_tgts[ost_idx] ||
+            !lov->lov_tgts[ost_idx]->ltd_active)
+                RETURN(rc);
+
+        /* check if objects has been created on this ost */
+        for (stripe = 0; stripe < lsm->lsm_stripe_count; stripe++) {
+                /* already have object at this stripe */
+                if (ost_idx == lsm->lsm_oinfo[stripe]->loi_ost_idx)
+                        break;
+        }
+
+        if (stripe >= lsm->lsm_stripe_count) {
+                req->rq_idx = ost_idx;
+                rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp,
+                                req->rq_oi.oi_oa, &req->rq_oi.oi_md,
+                                oti);
+        }
+        RETURN(rc);
+}
+
 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
 {
         struct lov_stripe_md *lsm = set->set_oi->oi_md;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
 {
         struct lov_stripe_md *lsm = set->set_oi->oi_md;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
-        int stripe, i, rc = -EIO;
+        unsigned ost_idx = 0, ost_count;
+        struct pool_desc *pool;
+        struct ost_pool *osts = NULL;
+        int i, rc = -EIO;
         ENTRY;
 
         ENTRY;
 
-        ost_idx = (req->rq_idx + lsm->lsm_stripe_count) % ost_count;
-        for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
-                if (!lov->lov_tgts[ost_idx] ||
-                    !lov->lov_tgts[ost_idx]->ltd_active)
-                        continue;
-                /* check if objects has been created on this ost */
-                for (stripe = 0; stripe < lsm->lsm_stripe_count; stripe++) {
-                        /* we try send create to this ost but he is failed */
-                        if (stripe == req->rq_stripe)
-                                continue;
-                        /* already have object at this stripe */
-                        if (ost_idx == lsm->lsm_oinfo[stripe]->loi_ost_idx)
-                                break;
-                }
-                if (stripe >= lsm->lsm_stripe_count) {
-                        req->rq_idx = ost_idx;
-                        rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp,
-                                        req->rq_oi.oi_oa, &req->rq_oi.oi_md,
-                                        set->set_oti);
-                        if (!rc)
+        /* First check whether we can create the objects on the pool */
+        pool = lov_find_pool(lov, lsm->lsm_pool_name);
+        if (pool != NULL) {
+                cfs_down_read(&pool_tgt_rw_sem(pool));
+                osts = &(pool->pool_obds);
+                ost_count = osts->op_count;
+                for (i = 0; i < ost_count; i++, ost_idx = osts->op_array[i]) {
+                        rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+                                                         set->set_oti);
+                        if (rc == 0)
                                 break;
                 }
                                 break;
                 }
+                cfs_up_read(&pool_tgt_rw_sem(pool));
+                lov_pool_putref(pool);
+                RETURN(rc);
         }
         }
+
+        ost_count = lov->desc.ld_tgt_count;
+        /* Then check whether we can create the objects on other OSTs */
+        ost_idx = (req->rq_idx + lsm->lsm_stripe_count) % ost_count;
+        for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
+                rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+                                                 set->set_oti);
+
+                if (rc == 0)
+                        break;
+        }
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index 01095df..79d2943 100644 (file)
@@ -103,7 +103,6 @@ int lov_finished_set(struct lov_request_set *set)
         return set->set_completes == set->set_count;
 }
 
         return set->set_completes == set->set_count;
 }
 
-
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
@@ -550,6 +549,56 @@ out_set:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+static int lov_update_create_set(struct lov_request_set *set,
+                                 struct lov_request *req, int rc)
+{
+        struct obd_trans_info *oti = set->set_oti;
+        struct lov_stripe_md *lsm = set->set_oi->oi_md;
+        struct lov_oinfo *loi;
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+        ENTRY;
+
+        if (rc && lov->lov_tgts[req->rq_idx] &&
+            lov->lov_tgts[req->rq_idx]->ltd_active) {
+                CERROR("error creating fid "LPX64" sub-object"
+                       " on OST idx %d/%d: rc = %d\n",
+                       set->set_oi->oi_oa->o_id, req->rq_idx,
+                       lsm->lsm_stripe_count, rc);
+                if (rc > 0) {
+                        CERROR("obd_create returned invalid err %d\n", rc);
+                        rc = -EIO;
+                }
+        }
+
+        cfs_spin_lock(&set->set_lock);
+        req->rq_stripe = set->set_success;
+        loi = lsm->lsm_oinfo[req->rq_stripe];
+
+
+        if (rc) {
+                lov_update_set(set, req, rc);
+                cfs_spin_unlock(&set->set_lock);
+                RETURN(rc);
+        }
+
+        loi->loi_id = req->rq_oi.oi_oa->o_id;
+        loi->loi_seq = req->rq_oi.oi_oa->o_seq;
+        loi->loi_ost_idx = req->rq_idx;
+        loi_init(loi);
+
+        if (oti && set->set_cookies)
+                ++oti->oti_logcookies;
+        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
+                set->set_cookie_sent++;
+
+        lov_update_set(set, req, rc);
+        cfs_spin_unlock(&set->set_lock);
+
+        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+        RETURN(rc);
+}
+
 static int create_done(struct obd_export *exp, struct lov_request_set *set,
                        struct lov_stripe_md **lsmp)
 {
 static int create_done(struct obd_export *exp, struct lov_request_set *set,
                        struct lov_stripe_md **lsmp)
 {
@@ -660,67 +709,23 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-int lov_update_create_set(struct lov_request_set *set,
-                          struct lov_request *req, int rc)
-{
-        struct obd_trans_info *oti = set->set_oti;
-        struct lov_stripe_md *lsm = set->set_oi->oi_md;
-        struct lov_oinfo *loi;
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
-        ENTRY;
-
-        if (rc && lov->lov_tgts[req->rq_idx] &&
-            lov->lov_tgts[req->rq_idx]->ltd_active) {
-                CERROR("error creating fid "LPX64" sub-object"
-                       " on OST idx %d/%d: rc = %d\n",
-                       set->set_oi->oi_oa->o_id, req->rq_idx,
-                       lsm->lsm_stripe_count, rc);
-                if (rc > 0) {
-                        CERROR("obd_create returned invalid err %d\n", rc);
-                        rc = -EIO;
-                }
-        }
-
-        cfs_spin_lock(&set->set_lock);
-        req->rq_stripe = set->set_success;
-        loi = lsm->lsm_oinfo[req->rq_stripe];
-        if (rc) {
-                lov_update_set(set, req, rc);
-                cfs_spin_unlock(&set->set_lock);
-                RETURN(rc);
-        }
-
-        loi->loi_id = req->rq_oi.oi_oa->o_id;
-        loi->loi_seq = req->rq_oi.oi_oa->o_seq;
-        loi->loi_ost_idx = req->rq_idx;
-        loi_init(loi);
-
-        if (oti && set->set_cookies)
-                ++oti->oti_logcookies;
-        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
-                set->set_cookie_sent++;
-
-        lov_update_set(set, req, rc);
-        cfs_spin_unlock(&set->set_lock);
-
-        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
-               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
-        RETURN(rc);
-}
-
 int cb_create_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
 int cb_create_update(void *cookie, int rc)
 {
         struct obd_info *oinfo = cookie;
         struct lov_request *lovreq;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
+                if (lovreq->rq_idx == obd_fail_val)
+                        rc = -ENOTCONN;
+
         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
         if (lov_finished_set(lovreq->rq_rqset))
                 lov_put_reqset(lovreq->rq_rqset);
         return rc;
 }
 
         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
         if (lov_finished_set(lovreq->rq_rqset))
                 lov_put_reqset(lovreq->rq_rqset);
         return rc;
 }
 
-
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
                         struct obd_trans_info *oti,
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
                         struct obd_trans_info *oti,
index 58f4602..8ec669f 100644 (file)
@@ -1010,7 +1010,7 @@ static int mdd_lov_set_nextid(const struct lu_env *env,
         ENTRY;
 
         LASSERT(mds->mds_lov_objids != NULL);
         ENTRY;
 
         LASSERT(mds->mds_lov_objids != NULL);
-        rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
+        rc = obd_set_info_async(mds->mds_lov_exp, strlen(KEY_NEXT_ID),
                                 KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count,
                                 mds->mds_lov_objids, NULL);
 
                                 KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count,
                                 mds->mds_lov_objids, NULL);
 
@@ -1054,7 +1054,7 @@ static int mdd_recovery_complete(const struct lu_env *env,
         }
 #endif
         /* Call that with obd_recovering = 1 just to update objids */
         }
 #endif
         /* Call that with obd_recovering = 1 just to update objids */
-        obd_notify(obd->u.mds.mds_osc_obd, NULL, (obd->obd_async_recov ?
+        obd_notify(obd->u.mds.mds_lov_obd, NULL, (obd->obd_async_recov ?
                     OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC), NULL);
 
         /* Drop obd_recovering to 0 and call o_postrecov to recover mds_lov */
                     OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC), NULL);
 
         /* Drop obd_recovering to 0 and call o_postrecov to recover mds_lov */
@@ -1181,7 +1181,7 @@ static int mdd_update_capa_key(const struct lu_env *env,
 {
         struct mds_capa_info info = { .uuid = NULL, .capa = key };
         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
 {
         struct mds_capa_info info = { .uuid = NULL, .capa = key };
         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
-        struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
+        struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_lov_exp;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
index 9cd1193..5b6c656 100644 (file)
@@ -239,7 +239,7 @@ static int mdd_lov_set_stripe_md(const struct lu_env *env,
 {
         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
         struct obd_device       *obd = mdd2obd_dev(mdd);
 {
         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
         struct obd_device       *obd = mdd2obd_dev(mdd);
-        struct obd_export       *lov_exp = obd->u.mds.mds_osc_exp;
+        struct obd_export       *lov_exp = obd->u.mds.mds_lov_exp;
         struct lov_stripe_md    *lsm = NULL;
         int rc;
         ENTRY;
         struct lov_stripe_md    *lsm = NULL;
         int rc;
         ENTRY;
@@ -380,7 +380,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                    const struct md_op_spec *spec, struct lu_attr *la)
 {
         struct obd_device     *obd = mdd2obd_dev(mdd);
                    const struct md_op_spec *spec, struct lu_attr *la)
 {
         struct obd_device     *obd = mdd2obd_dev(mdd);
-        struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
+        struct obd_export     *lov_exp = obd->u.mds.mds_lov_exp;
         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
         struct obdo           *oa;
         struct lov_stripe_md  *lsm = NULL;
         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
         struct obdo           *oa;
         struct lov_stripe_md  *lsm = NULL;
@@ -555,7 +555,7 @@ int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
                       int log_unlink)
 {
         struct obd_device     *obd = mdd2obd_dev(mdd);
                       int log_unlink)
 {
         struct obd_device     *obd = mdd2obd_dev(mdd);
-        struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
+        struct obd_export     *lov_exp = obd->u.mds.mds_lov_exp;
         struct lov_stripe_md  *lsm = NULL;
         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
         struct obdo           *oa = &mdd_env_info(env)->mti_oa;
         struct lov_stripe_md  *lsm = NULL;
         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
         struct obdo           *oa = &mdd_env_info(env)->mti_oa;
@@ -678,10 +678,10 @@ int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        if (IS_ERR(mds->mds_osc_obd))
-                RETURN(PTR_ERR(mds->mds_osc_obd));
+        if (IS_ERR(mds->mds_lov_obd))
+                RETURN(PTR_ERR(mds->mds_lov_obd));
 
 
-        rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
+        rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
         if (rc < 0)
                 RETURN(rc);
 
         if (rc < 0)
                 RETURN(rc);
 
@@ -704,7 +704,7 @@ int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
 
         OBD_FREE(lsr, sizeof(*lsr));
  out:
 
         OBD_FREE(lsr, sizeof(*lsr));
  out:
-        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        obd_free_memmd(mds->mds_lov_exp, &lsm);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -749,7 +749,7 @@ static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
 
         LASSERT(lmm);
 
 
         LASSERT(lmm);
 
-        rc = obd_unpackmd(mds->mds_osc_exp, &oinfo.oi_md, lmm, lmm_size);
+        rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size);
         if (rc < 0) {
                 CERROR("Error unpack md %p for obj "DFID"\n", lmm,
                         PFID(parent));
         if (rc < 0) {
                 CERROR("Error unpack md %p for obj "DFID"\n", lmm,
                         PFID(parent));
@@ -772,13 +772,13 @@ static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
         oinfo.oi_capa = oc;
 
         /* do async setattr from mds to ost not waiting for responses. */
         oinfo.oi_capa = oc;
 
         /* do async setattr from mds to ost not waiting for responses. */
-        rc = obd_setattr_async(mds->mds_osc_exp, &oinfo, &oti, NULL);
+        rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL);
         if (rc)
                 CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
                        " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
 out:
         if (oinfo.oi_md)
         if (rc)
                 CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
                        " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
 out:
         if (oinfo.oi_md)
-                obd_free_memmd(mds->mds_osc_exp, &oinfo.oi_md);
+                obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md);
         OBDO_FREE(oinfo.oi_oa);
         RETURN(rc);
 }
         OBDO_FREE(oinfo.oi_oa);
         RETURN(rc);
 }
index d534e80..7fd6ec0 100644 (file)
@@ -149,7 +149,7 @@ static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
 static int mds_lov_clean(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
 static int mds_lov_clean(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct obd_device *osc = mds->mds_osc_obd;
+        struct obd_device *osc = mds->mds_lov_obd;
         ENTRY;
 
         if (mds->mds_profile) {
         ENTRY;
 
         if (mds->mds_profile) {
@@ -171,7 +171,7 @@ static int mds_lov_clean(struct obd_device *obd)
         osc->obd_fail = obd->obd_fail;
 
         /* Cleanup the lov */
         osc->obd_fail = obd->obd_fail;
 
         /* Cleanup the lov */
-        obd_disconnect(mds->mds_osc_exp);
+        obd_disconnect(mds->mds_lov_exp);
         class_manual_cleanup(osc);
 
         RETURN(0);
         class_manual_cleanup(osc);
 
         RETURN(0);
@@ -247,7 +247,7 @@ int mds_postrecov(struct obd_device *obd)
         /* Notify the LOV, which will in turn call mds_notify for each tgt */
         /* This means that we have to hack obd_notify to think we're obd_set_up
            during mds_lov_connect. */
         /* Notify the LOV, which will in turn call mds_notify for each tgt */
         /* This means that we have to hack obd_notify to think we're obd_set_up
            during mds_lov_connect. */
-        obd_notify(obd->u.mds.mds_osc_obd, NULL,
+        obd_notify(obd->u.mds.mds_lov_obd, NULL,
                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
                    OBD_NOTIFY_SYNC, NULL);
 
                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
                    OBD_NOTIFY_SYNC, NULL);
 
@@ -258,7 +258,7 @@ int mds_postrecov(struct obd_device *obd)
 static int mds_lov_early_clean(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
 static int mds_lov_early_clean(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct obd_device *osc = mds->mds_osc_obd;
+        struct obd_device *osc = mds->mds_lov_obd;
 
         if (!osc || (!obd->obd_force && !obd->obd_fail))
                 return(0);
 
         if (!osc || (!obd->obd_force && !obd->obd_fail))
                 return(0);
@@ -289,7 +289,7 @@ static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                 if (ctxt)
                         llog_cleanup(ctxt);
                 rc = obd_llog_finish(obd, 0);
                 if (ctxt)
                         llog_cleanup(ctxt);
                 rc = obd_llog_finish(obd, 0);
-                mds->mds_osc_exp = NULL;
+                mds->mds_lov_exp = NULL;
                 cfs_up_write(&mds->mds_notify_lock);
                 break;
         }
                 cfs_up_write(&mds->mds_notify_lock);
                 break;
         }
@@ -430,7 +430,7 @@ static int mds_cmd_cleanup(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
-        mds->mds_osc_exp = NULL;
+        mds->mds_lov_exp = NULL;
 
         if (obd->obd_fail)
                 LCONSOLE_WARN("%s: shutting down for failover; client state "
 
         if (obd->obd_fail)
                 LCONSOLE_WARN("%s: shutting down for failover; client state "
index 4e0e0a5..102e790 100644 (file)
@@ -99,7 +99,7 @@ static int lprocfs_mds_wr_evict_client(struct file *file, const char *buffer,
                 return -ENOMEM;
 
         if (obd->u.mds.mds_evict_ost_nids) {
                 return -ENOMEM;
 
         if (obd->u.mds.mds_evict_ost_nids) {
-                rc = obd_set_info_async(mds->mds_osc_exp,
+                rc = obd_set_info_async(mds->mds_lov_exp,
                                         sizeof(KEY_EVICT_BY_NID),
                                         KEY_EVICT_BY_NID, strlen(tmpbuf + 4) + 1,
                                         tmpbuf + 4, set);
                                         sizeof(KEY_EVICT_BY_NID),
                                         KEY_EVICT_BY_NID, strlen(tmpbuf + 4) + 1,
                                         tmpbuf + 4, set);
index 3bd6a83..70aac6e 100644 (file)
@@ -60,7 +60,7 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
                                struct llog_cookie *logcookies, int numcookies)
 {
         struct obd_device *obd = ctxt->loc_obd;
                                struct llog_cookie *logcookies, int numcookies)
 {
         struct obd_device *obd = ctxt->loc_obd;
-        struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+        struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
         struct llog_ctxt *lctxt;
         int rc;
         ENTRY;
         struct llog_ctxt *lctxt;
         int rc;
         ENTRY;
@@ -78,7 +78,7 @@ static int mds_llog_origin_connect(struct llog_ctxt *ctxt,
                                    struct obd_uuid *uuid)
 {
         struct obd_device *obd = ctxt->loc_obd;
                                    struct obd_uuid *uuid)
 {
         struct obd_device *obd = ctxt->loc_obd;
-        struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+        struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
         struct llog_ctxt *lctxt;
         int rc;
         ENTRY;
         struct llog_ctxt *lctxt;
         int rc;
         ENTRY;
@@ -98,7 +98,7 @@ static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *ls
                           int count, struct llog_cookie *cookies, int flags)
 {
         struct obd_device *obd = ctxt->loc_obd;
                           int count, struct llog_cookie *cookies, int flags)
 {
         struct obd_device *obd = ctxt->loc_obd;
-        struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+        struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
         struct llog_ctxt *lctxt;
         int rc;
         ENTRY;
         struct llog_ctxt *lctxt;
         int rc;
         ENTRY;
@@ -198,7 +198,7 @@ EXPORT_SYMBOL(mds_changelog_llog_init);
 int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *disk_obd, int *index)
 {
 int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *disk_obd, int *index)
 {
-        struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+        struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
@@ -296,15 +296,15 @@ int mds_log_op_unlink(struct obd_device *obd,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        if (IS_ERR(mds->mds_osc_obd))
-                RETURN(PTR_ERR(mds->mds_osc_obd));
+        if (IS_ERR(mds->mds_lov_obd))
+                RETURN(PTR_ERR(mds->mds_lov_obd));
 
 
-        rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
+        rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
         if (rc < 0)
                 RETURN(rc);
         rc = mds_llog_add_unlink(obd, lsm, 0, logcookies,
                                  cookies_size / sizeof(struct llog_cookie));
         if (rc < 0)
                 RETURN(rc);
         rc = mds_llog_add_unlink(obd, lsm, 0, logcookies,
                                  cookies_size / sizeof(struct llog_cookie));
-        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        obd_free_memmd(mds->mds_lov_exp, &lsm);
         RETURN(rc);
 }
 EXPORT_SYMBOL(mds_log_op_unlink);
         RETURN(rc);
 }
 EXPORT_SYMBOL(mds_log_op_unlink);
@@ -317,8 +317,8 @@ int mds_log_op_orphan(struct obd_device *obd, struct lov_stripe_md *lsm,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        if (IS_ERR(mds->mds_osc_obd))
-                RETURN(PTR_ERR(mds->mds_osc_obd));
+        if (IS_ERR(mds->mds_lov_obd))
+                RETURN(PTR_ERR(mds->mds_lov_obd));
 
         rc = mds_llog_add_unlink(obd, lsm, count - 1, &logcookie, 1);
         RETURN(rc);
 
         rc = mds_llog_add_unlink(obd, lsm, count - 1, &logcookie, 1);
         RETURN(rc);
index ba289da..a44652a 100644 (file)
@@ -271,7 +271,7 @@ static int mds_log_lost_precreated(struct obd_device *obd,
         ENTRY;
 
         if (*lsmp == NULL) {
         ENTRY;
 
         if (*lsmp == NULL) {
-                rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
+                rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
                 if (rc < 0)
                         RETURN(rc);
                 /* need only one stripe, save old value */
                 if (rc < 0)
                         RETURN(rc);
                 /* need only one stripe, save old value */
@@ -346,7 +346,7 @@ void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
         if (lsm) {
                 /* restore stripes number */
                 lsm->lsm_stripe_count = stripes;
         if (lsm) {
                 /* restore stripes number */
                 lsm->lsm_stripe_count = stripes;
-                obd_free_memmd(mds->mds_osc_exp, &lsm);
+                obd_free_memmd(mds->mds_lov_exp, &lsm);
         }
         EXIT;
         return;
         }
         EXIT;
         return;
@@ -478,24 +478,35 @@ static int mds_lov_get_objid(struct obd_device * obd,
                              obd_id idx)
 {
         struct mds_obd *mds = &obd->u.mds;
                              obd_id idx)
 {
         struct mds_obd *mds = &obd->u.mds;
+        struct obd_export *lov_exp = mds->mds_lov_exp;
         unsigned int page;
         unsigned int off;
         obd_id *data;
         unsigned int page;
         unsigned int off;
         obd_id *data;
+        __u64 connect_flags;
+        __u32 size;
         int rc = 0;
         ENTRY;
 
         page = idx / OBJID_PER_PAGE();
         off = idx % OBJID_PER_PAGE();
         data = mds->mds_lov_page_array[page];
         int rc = 0;
         ENTRY;
 
         page = idx / OBJID_PER_PAGE();
         off = idx % OBJID_PER_PAGE();
         data = mds->mds_lov_page_array[page];
-        if (data[off] < 2) {
+
+        size = sizeof(__u64);
+        connect_flags = idx;
+        rc = obd_get_info(lov_exp, sizeof(KEY_CONNECT_FLAG), KEY_CONNECT_FLAG,
+                          &size, &connect_flags, NULL);
+        if (rc)
+                GOTO(out, rc);
+
+        if (data[off] < 2 || connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
                 /* We never read this lastid; ask the osc */
                 struct obd_id_info lastid;
                 /* We never read this lastid; ask the osc */
                 struct obd_id_info lastid;
-                __u32 size = sizeof(lastid);
 
 
+                size = sizeof(lastid);
                 lastid.idx = idx;
                 lastid.data = &data[off];
                 lastid.idx = idx;
                 lastid.data = &data[off];
-                rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
-                                  KEY_LAST_ID, &size, &lastid, NULL);
+                rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
+                                  &size, &lastid, NULL);
                 if (rc)
                         GOTO(out, rc);
 
                 if (rc)
                         GOTO(out, rc);
 
@@ -531,7 +542,7 @@ int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
         if (ost_uuid != NULL)
                 oti.oti_ost_uuid = ost_uuid;
 
         if (ost_uuid != NULL)
                 oti.oti_ost_uuid = ost_uuid;
 
-        rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
+        rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -548,7 +559,7 @@ static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
 
         info.idx = idx;
         info.data = id;
 
         info.idx = idx;
         info.data = id;
-        rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
+        rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
         if (rc)
                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
         if (rc)
                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
@@ -571,7 +582,7 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
         if (!ld)
                 RETURN(-ENOMEM);
 
         if (!ld)
                 RETURN(-ENOMEM);
 
-        rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
+        rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
                           &valsize, ld, NULL);
         if (rc)
                 GOTO(out, rc);
                           &valsize, ld, NULL);
         if (rc)
                 GOTO(out, rc);
@@ -670,16 +681,16 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        if (IS_ERR(mds->mds_osc_obd))
-                RETURN(PTR_ERR(mds->mds_osc_obd));
+        if (IS_ERR(mds->mds_lov_obd))
+                RETURN(PTR_ERR(mds->mds_lov_obd));
 
 
-        if (mds->mds_osc_obd)
+        if (mds->mds_lov_obd)
                 RETURN(0);
 
                 RETURN(0);
 
-        mds->mds_osc_obd = class_name2obd(lov_name);
-        if (!mds->mds_osc_obd) {
+        mds->mds_lov_obd = class_name2obd(lov_name);
+        if (!mds->mds_lov_obd) {
                 CERROR("MDS cannot locate LOV %s\n", lov_name);
                 CERROR("MDS cannot locate LOV %s\n", lov_name);
-                mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
+                mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
                 RETURN(-ENOTCONN);
         }
 
                 RETURN(-ENOTCONN);
         }
 
@@ -691,7 +702,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 GOTO(err_exit, rc);
         }
 
                 GOTO(err_exit, rc);
         }
 
-        rc = obd_register_observer(mds->mds_osc_obd, obd);
+        rc = obd_register_observer(mds->mds_lov_obd, obd);
         if (rc) {
                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
                        lov_name, rc);
         if (rc) {
                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
                        lov_name, rc);
@@ -703,7 +714,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (rc)
                 GOTO(err_exit, rc);
 
         if (rc)
                 GOTO(err_exit, rc);
 
-        mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
+        mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
 
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
 
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
@@ -726,11 +737,11 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         /* send the list of supported checksum types */
         data->ocd_cksum_types = OBD_CKSUM_ALL;
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
         /* send the list of supported checksum types */
         data->ocd_cksum_types = OBD_CKSUM_ALL;
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
-        rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
+        rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
         OBD_FREE(data, sizeof(*data));
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
         OBD_FREE(data, sizeof(*data));
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
-                mds->mds_osc_obd = ERR_PTR(rc);
+                mds->mds_lov_obd = ERR_PTR(rc);
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
@@ -746,8 +757,8 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         RETURN(rc);
 
 err_exit:
         RETURN(rc);
 
 err_exit:
-        mds->mds_osc_exp = NULL;
-        mds->mds_osc_obd = ERR_PTR(rc);
+        mds->mds_lov_exp = NULL;
+        mds->mds_lov_obd = ERR_PTR(rc);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -757,16 +768,16 @@ int mds_lov_disconnect(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
-        if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
-                obd_register_observer(mds->mds_osc_obd, NULL);
+        if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
+                obd_register_observer(mds->mds_lov_obd, NULL);
 
                 /* The actual disconnect of the mds_lov will be called from
                  * class_disconnect_exports from mds_lov_clean. So we have to
                  * ensure that class_cleanup doesn't fail due to the extra ref
                  * we're holding now. The mechanism to do that already exists -
                  * the obd_force flag. We'll drop the final ref to the
 
                 /* The actual disconnect of the mds_lov will be called from
                  * class_disconnect_exports from mds_lov_clean. So we have to
                  * ensure that class_cleanup doesn't fail due to the extra ref
                  * we're holding now. The mechanism to do that already exists -
                  * the obd_force flag. We'll drop the final ref to the
-                 * mds_osc_exp in mds_cleanup. */
-                mds->mds_osc_obd->obd_force = 1;
+                 * mds_lov_exp in mds_cleanup. */
+                mds->mds_lov_obd->obd_force = 1;
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -796,7 +807,7 @@ static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
 
                 info.capa = key;
                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
 
                 info.capa = key;
-                rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
+                rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
                 if (rc) {
                         DEBUG_CAPA_KEY(D_ERROR, key,
                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
                 if (rc) {
                         DEBUG_CAPA_KEY(D_ERROR, key,
@@ -847,7 +858,7 @@ static int __mds_lov_synchronize(void *data)
         mgi.group = mdt_to_obd_objseq(mds->mds_id);
         mgi.uuid = uuid;
 
         mgi.group = mdt_to_obd_objseq(mds->mds_id);
         mgi.uuid = uuid;
 
-        rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
+        rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
         if (rc != 0)
                 GOTO(out, rc);
                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
         if (rc != 0)
                 GOTO(out, rc);
@@ -896,9 +907,9 @@ out:
                 /* Deactivate it for safety */
                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
                        rc);
                 /* Deactivate it for safety */
                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
                        rc);
-                if (!obd->obd_stopping && mds->mds_osc_obd &&
-                    !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
-                        obd_notify(mds->mds_osc_obd, watched,
+                if (!obd->obd_stopping && mds->mds_lov_obd &&
+                    !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
+                        obd_notify(mds->mds_lov_obd, watched,
                                    OBD_NOTIFY_INACTIVE, NULL);
         }
 
                                    OBD_NOTIFY_INACTIVE, NULL);
         }
 
index 8915ad5..6dfd1e4 100644 (file)
@@ -647,6 +647,18 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         break;
                 }
 
                         break;
                 }
 
+                /**
+                 * If this is DELORPHAN process, no need create object here,
+                 * otherwise this will create a gap of object id, and MDS
+                 * might create some orphan log (mds_lov_update_objids), then
+                 * remove objects wrongly on OST. Bug 21379.
+                 */
+                if (oa->o_valid & OBD_MD_FLFLAGS &&
+                        oa->o_flags == OBD_FL_DELORPHAN) {
+                        cfs_spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
+
                 if (oscc_has_objects_nolock(oscc, 1)) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
                         oa->o_id = oscc->oscc_next_id;
                 if (oscc_has_objects_nolock(oscc, 1)) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
                         oa->o_id = oscc->oscc_next_id;
index 21ec147..d7aefe6 100644 (file)
@@ -250,7 +250,7 @@ int generic_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl, int glo
                 }
 
                 if (rc == 0 && global && is_master)
                 }
 
                 if (rc == 0 && global && is_master)
-                        rc = obd_quotactl(obd->u.mds.mds_osc_exp, oqctl);
+                        rc = obd_quotactl(obd->u.mds.mds_lov_exp, oqctl);
         }
 
         if (is_master)
         }
 
         if (is_master)
index 3ad5ee0..bd6e966 100644 (file)
@@ -324,7 +324,7 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type,
 
         /* only when block qunit is reduced, boardcast to osts */
         if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq))
 
         /* only when block qunit is reduced, boardcast to osts */
         if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq))
-                rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt);
+                rc = obd_quota_adjust_qunit(mds->mds_lov_exp, oqaq, qctxt);
 
 out:
         lustre_dqput(dquot);
 
 out:
         lustre_dqput(dquot);
@@ -656,7 +656,7 @@ int mds_quota_finvalidate(struct obd_device *obd, struct obd_quotactl *oqctl)
         oqctl->qc_cmd = Q_FINVALIDATE;
         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
         if (!rc)
         oqctl->qc_cmd = Q_FINVALIDATE;
         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
         if (!rc)
-                rc = obd_quotactl(mds->mds_osc_exp, oqctl);
+                rc = obd_quotactl(mds->mds_lov_exp, oqctl);
 
         cfs_up_write(&mds->mds_qonoff_sem);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         cfs_up_write(&mds->mds_qonoff_sem);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
@@ -895,7 +895,7 @@ int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl)
                 GOTO(out, rc1);
         }
 
                 GOTO(out, rc1);
         }
 
-        rc = obd_quotactl(mds->mds_osc_exp, oqctl);
+        rc = obd_quotactl(mds->mds_lov_exp, oqctl);
         if (rc && rc != -EALREADY) {
                 CWARN("mds remote quota[%d] is failed to be off for %d\n",
                       oqctl->qc_type, rc);
         if (rc && rc != -EALREADY) {
                 CWARN("mds remote quota[%d] is failed to be off for %d\n",
                       oqctl->qc_type, rc);
@@ -1223,7 +1223,7 @@ static int mds_init_slave_blimits(struct obd_device *obd,
                 id[GRPQUOTA] = oqctl->qc_id;
 
         /* initialize all slave's limit */
                 id[GRPQUOTA] = oqctl->qc_id;
 
         /* initialize all slave's limit */
-        rc = obd_quotactl(mds->mds_osc_exp, ioqc);
+        rc = obd_quotactl(mds->mds_lov_exp, ioqc);
 
         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 1, 0, NULL);
         if (rc == -EDQUOT || rc == -EBUSY) {
 
         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 1, 0, NULL);
         if (rc == -EDQUOT || rc == -EBUSY) {
@@ -1254,7 +1254,7 @@ static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq)
 
         /* adjust remote lqs */
         if (QAQ_IS_ADJBLK(qaq)) {
 
         /* adjust remote lqs */
         if (QAQ_IS_ADJBLK(qaq)) {
-                rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt);
+                rc = obd_quota_adjust_qunit(obd->u.mds.mds_lov_exp, qaq, qctxt);
                 if (rc < 0)
                         CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc);
 
                 if (rc < 0)
                         CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc);
 
@@ -1265,7 +1265,7 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
-        struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp);
+        struct obd_device *lov_obd = class_exp2obd(mds->mds_lov_exp);
         struct lov_obd *lov = &lov_obd->u.lov;
         struct quota_adjust_qunit *oqaq = NULL;
         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
         struct lov_obd *lov = &lov_obd->u.lov;
         struct quota_adjust_qunit *oqaq = NULL;
         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
@@ -1452,7 +1452,7 @@ static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl)
 
         /* get block usage from OSS */
         soqc->qc_dqblk.dqb_curspace = 0;
 
         /* get block usage from OSS */
         soqc->qc_dqblk.dqb_curspace = 0;
-        rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc);
+        rc = obd_quotactl(obd->u.mds.mds_lov_exp, soqc);
         if (!rc || rc == -EREMOTEIO) {
                 oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace;
                 oqctl->qc_dqblk.dqb_valid |= QIF_SPACE;
         if (!rc || rc == -EREMOTEIO) {
                 oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace;
                 oqctl->qc_dqblk.dqb_valid |= QIF_SPACE;
@@ -1579,7 +1579,7 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type)
         qctl->qc_type = type;
         qctl->qc_id = id;
         qctl->qc_stat = QUOTA_RECOVERING;
         qctl->qc_type = type;
         qctl->qc_id = id;
         qctl->qc_stat = QUOTA_RECOVERING;
-        rc = obd_quotactl(mds->mds_osc_exp, qctl);
+        rc = obd_quotactl(mds->mds_lov_exp, qctl);
         cfs_down_write(&mds->mds_qonoff_sem);
         if (rc)
                 GOTO(out, rc);
         cfs_down_write(&mds->mds_qonoff_sem);
         if (rc)
                 GOTO(out, rc);
@@ -1633,7 +1633,7 @@ static int qmaster_recovery_main(void *arg)
         /* for mds */
         class_incref(obd, "qmaster_recovd_mds", obd);
         /* for lov */
         /* for mds */
         class_incref(obd, "qmaster_recovd_mds", obd);
         /* for lov */
-        class_incref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
+        class_incref(mds->mds_lov_obd, "qmaster_recovd_lov", mds->mds_lov_obd);
 
         cfs_complete(&data->comp);
 
 
         cfs_complete(&data->comp);
 
@@ -1666,7 +1666,7 @@ free:
                 }
         }
         cfs_up_write(&mds->mds_qonoff_sem);
                 }
         }
         cfs_up_write(&mds->mds_qonoff_sem);
-        class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
+        class_decref(mds->mds_lov_obd, "qmaster_recovd_lov", mds->mds_lov_obd);
         class_decref(obd, "qmaster_recovd_mds", obd);
         RETURN(rc);
 }
         class_decref(obd, "qmaster_recovd_mds", obd);
         RETURN(rc);
 }
index 7f512bf..8e86178 100644 (file)
@@ -1323,26 +1323,65 @@ test_25() {
         wait_osc_import_state mds ost FULL
         clients_up
 
         wait_osc_import_state mds ost FULL
         clients_up
 
+        wait_mds_ost_sync 
         # Veriy that the pool got created and is usable
         df $POOL_ROOT > /dev/null
         # Veriy that the pool got created and is usable
         df $POOL_ROOT > /dev/null
-               sleep 5
-               # Make sure OST0 can be striped on
-               $SETSTRIPE -o 0 -c 1 $POOL_ROOT/$tfile
-               STR=$($GETSTRIPE $POOL_ROOT/$tfile | grep 0x | cut -f2 | tr -d " ")
-               rm $POOL_ROOT/$tfile
-               if [[ "$STR" == "0" ]]; then
-                       echo "Creating a file in pool$i"
-                       create_file $POOL_ROOT/file$i pool$i || break
-                       check_file_in_pool $POOL_ROOT/file$i pool$i || break
-               else
-                       echo "OST 0 seems to be unavailable.  Try later."
-               fi
+        sleep 5
+        # Make sure OST0 can be striped on
+        $SETSTRIPE -o 0 -c 1 $POOL_ROOT/$tfile
+        STR=$($GETSTRIPE $POOL_ROOT/$tfile | grep 0x | cut -f2 | tr -d " ")
+        rm $POOL_ROOT/$tfile
+        if [[ "$STR" == "0" ]]; then
+                echo "Creating a file in pool$i"
+                create_file $POOL_ROOT/file$i pool$i || break
+                check_file_in_pool $POOL_ROOT/file$i pool$i || break
+        else
+                echo "OST 0 seems to be unavailable.  Try later."
+        fi
     done
 
     rm -rf $POOL_ROOT
 }
 run_test 25 "Create new pool and restart MDS ======================="
 
     done
 
     rm -rf $POOL_ROOT
 }
 run_test 25 "Create new pool and restart MDS ======================="
 
+test_26() {
+    local OSTCOUNT=`lctl get_param -n lov.$LOVNAME.*clilov*.numobd`
+    [[ $OSTCOUNT -le 2 ]] && skip_env "Need at least 3 OSTs" && return
+    set_cleanup_trap
+    local dev=$(mdsdevname ${SINGLEMDS//mds/})
+    local POOL_ROOT=${POOL_ROOT:-$DIR/$tdir}
+
+    mkdir -p $POOL_ROOT
+
+    create_pool_nofail $POOL 
+    do_facet $SINGLEMDS "lctl pool_add $FSNAME.pool1 OST0000; sync"
+    wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.pool1 | \
+    sort -u | grep $FSNAME-OST0000_UUID " "$FSNAME-OST0000_UUID" || \
+    error "pool_add failed: $1; $2"
+  
+    do_facet $SINGLEMDS "lctl pool_add $FSNAME.pool1 OST0002; sync"
+    wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.pool1 | \
+    sort -u | grep $FSNAME-OST0002_UUID" "$FSNAME-OST0002_UUID" || \
+    error "pool_add failed: $1; $2"
+  
+    # Veriy that the pool got created and is usable
+    df $POOL_ROOT
+    echo "Creating files in $POOL"
+
+    for ((i=0;i<10;i++)); do
+        #OBD_FAIL_MDS_OSC_CREATE_FAIL     0x147  
+        #Fail OST0000 to make sure the objects create on the other ost in the pool 
+        do_facet $SINGLEMDS lctl set_param fail_loc=0x147
+        do_facet $SINGLEMDS lctl set_param fail_val=0
+        create_file $POOL_ROOT/file$i $POOL 1 -1 || break
+        do_facet $SINGLEMDS lctl set_param fail_loc=0
+        check_file_in_pool $POOL_ROOT/file$i $POOL || break
+    done
+    rm -rf $POOL_ROOT
+}
+run_test 26 "Choose other OSTs in the pool first in the creation remedy"
+
 log "cleanup: ======================================================"
 cd $ORIG_PWD
 cleanup_pools $FSNAME
 log "cleanup: ======================================================"
 cd $ORIG_PWD
 cleanup_pools $FSNAME
index e3f7c5d..44e257a 100755 (executable)
@@ -5,6 +5,9 @@ set -e
 # bug number:  10124
 ALWAYS_EXCEPT="15c   $REPLAY_DUAL_EXCEPT"
 
 # bug number:  10124
 ALWAYS_EXCEPT="15c   $REPLAY_DUAL_EXCEPT"
 
+LFS=${LFS:-lfs}
+SETSTRIPE=${SETSTRIPE:-"$LFS setstripe"}
+GETSTRIPE=${GETSTRIPE:-"$LFS getstripe"}
 SAVE_PWD=$PWD
 PTLDEBUG=${PTLDEBUG:--1}
 LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
 SAVE_PWD=$PWD
 PTLDEBUG=${PTLDEBUG:--1}
 LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
@@ -254,20 +257,25 @@ run_test 13 "close resend timeout"
 # as test_15a
 
 test_14b() {
 # as test_15a
 
 test_14b() {
+    wait_mds_ost_sync
+    wait_destroy_complete
     BEFOREUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
     mkdir -p $MOUNT1/$tdir
     BEFOREUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
     mkdir -p $MOUNT1/$tdir
+    $SETSTRIPE -o 0 $MOUNT1/$tdir
     replay_barrier $SINGLEMDS
     replay_barrier $SINGLEMDS
-    createmany -o $MOUNT1/$tfile- 5
-    echo "data" > $MOUNT2/$tdir/$tfile-2
-    createmany -o $MOUNT1/$tfile-3- 5
+    createmany -o $MOUNT1/$tdir/$tfile- 5
+
+    $SETSTRIPE -o 0 $MOUNT2/f14b-3
+    echo "data" > $MOUNT2/f14b-3
+    createmany -o $MOUNT1/$tdir/$tfile-3- 5
     umount $MOUNT2
 
     fail $SINGLEMDS
     wait_recovery_complete $SINGLEMDS || error "MDS recovery not done"
 
     # first 25 files should have been replayed
     umount $MOUNT2
 
     fail $SINGLEMDS
     wait_recovery_complete $SINGLEMDS || error "MDS recovery not done"
 
     # first 25 files should have been replayed
-    unlinkmany $MOUNT1/$tfile- 5 || return 2
-    unlinkmany $MOUNT1/$tfile-3- 5 || return 3
+    unlinkmany $MOUNT1/$tdir/$tfile- 5 || return 2
+    unlinkmany $MOUNT1/$tdir/$tfile-3- 5 || return 3
 
     zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail"
 
 
     zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail"
 
index cd290b0..1093952 100644 (file)
@@ -1182,6 +1182,7 @@ wait_mds_ost_sync () {
         local -a sync=($(do_nodes $(comma_list $(osts_nodes)) \
             "$LCTL get_param -n obdfilter.*.mds_sync"))
         local con=1
         local -a sync=($(do_nodes $(comma_list $(osts_nodes)) \
             "$LCTL get_param -n obdfilter.*.mds_sync"))
         local con=1
+        local i
         for ((i=0; i<${#sync[@]}; i++)); do
             [ ${sync[$i]} -eq 0 ] && continue
             # there is a not finished MDS-OST synchronization
         for ((i=0; i<${#sync[@]}; i++)); do
             [ ${sync[$i]} -eq 0 ] && continue
             # there is a not finished MDS-OST synchronization
index 6ac3882..7631b51 100644 (file)
@@ -1381,6 +1381,7 @@ int llapi_ostlist(char *path, struct find_param *param)
 }
 
 static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
 }
 
 static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
+                                     struct lov_user_ost_data_v1 *objects,
                                      int is_dir, int verbose, int depth,
                                      char *pool_name)
 {
                                      int is_dir, int verbose, int depth,
                                      char *pool_name)
 {
@@ -1431,7 +1432,7 @@ static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
                         llapi_printf(LLAPI_MSG_NORMAL, "%sstripe_offset:  ",
                                      prefix);
                 llapi_printf(LLAPI_MSG_NORMAL, "%u%c",
                         llapi_printf(LLAPI_MSG_NORMAL, "%sstripe_offset:  ",
                                      prefix);
                 llapi_printf(LLAPI_MSG_NORMAL, "%u%c",
-                             lum->lmm_objects[0].l_ost_idx, nl);
+                             objects[0].l_ost_idx, nl);
         }
 
         if ((verbose & VERBOSE_POOL) && (pool_name != NULL)) {
         }
 
         if ((verbose & VERBOSE_POOL) && (pool_name != NULL)) {
@@ -1460,7 +1461,7 @@ void lov_dump_user_lmm_v1v3(struct lov_user_md *lum, char *pool_name,
         }
 
         if (obdstripe == 1)
         }
 
         if (obdstripe == 1)
-                lov_dump_user_lmm_header(lum, path, is_dir, header, depth,
+                lov_dump_user_lmm_header(lum, path, objects, is_dir, header, depth,
                                          pool_name);
 
         if (!is_dir && (header & VERBOSE_OBJID)) {
                                          pool_name);
 
         if (!is_dir && (header & VERBOSE_OBJID)) {