1.precreate the objects even in DELORPHAN process, but instead of allocating these objects, otherwise
it might create the gap of object ID, which might cause some orphan unlink log being recorded wrongly.
2.Only choose OSTs in the pool if in create remedy process.
i=adilger.dilger
i=robert.read
i=johann
cfs_dentry_t *mds_logs_dir;
cfs_dentry_t *mds_objects_dir;
struct llog_handle *mds_cfg_llh;
- struct obd_device *mds_osc_obd; /* XXX lov_obd */
+ struct obd_device *mds_lov_obd;
struct obd_uuid mds_lov_uuid;
char *mds_profile;
- struct obd_export *mds_osc_exp; /* XXX lov_exp */
+ struct obd_export *mds_lov_exp;
struct lov_desc mds_lov_desc;
__u32 mds_id;
#define KEY_SET_FS "set_fs"
/* KEY_SET_INFO in lustre_idl.h */
#define KEY_SPTLRPC_CONF "sptlrpc_conf"
+#define KEY_CONNECT_FLAG "connect_flags"
struct lu_context;
#define OBD_FAIL_MDS_OPEN_WAIT_CREATE 0x144
#define OBD_FAIL_MDS_PDO_LOCK 0x145
#define OBD_FAIL_MDS_PDO_LOCK2 0x146
+#define OBD_FAIL_MDS_OSC_CREATE_FAIL 0x147
/* CMD */
#define OBD_FAIL_MDS_IS_SUBDIR_NET 0x180
struct obd_trans_info *oti,
struct lov_request_set **reqset);
int cb_create_update(void *cookie, int rc);
-int lov_update_create_set(struct lov_request_set *set,
- struct lov_request *req, int rc);
int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea);
int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
obd_count oa_bufs, struct brw_page *pga,
} else if (KEY_IS(KEY_FIEMAP)) {
rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
GOTO(out, rc);
+ } else if (KEY_IS(KEY_CONNECT_FLAG)) {
+ struct lov_tgt_desc *tgt;
+ __u64 ost_idx = *((__u64*)val);
+
+ LASSERT(*vallen == sizeof(__u64));
+ LASSERT(ost_idx < lov->desc.ld_tgt_count);
+ tgt = lov->lov_tgts[ost_idx];
+
+ if (!tgt || !tgt->ltd_exp)
+ GOTO(out, rc = -ESRCH);
+
+ *((__u64*)val) = tgt->ltd_exp->exp_connect_flags;
+ GOTO(out, rc = 0);
}
rc = -EINVAL;
}
}
+/**
+ * Check whether we can create the object on the OST(refered by ost_idx)
+ * \retval:
+ * 0: create the object.
+ * other value: did not create the object.
+ */
+static int lov_check_and_create_object(struct lov_obd *lov, int ost_idx,
+ struct lov_stripe_md *lsm,
+ struct lov_request *req,
+ struct obd_trans_info *oti)
+{
+ int stripe;
+ int rc = -EIO;
+ ENTRY;
+
+ CDEBUG(D_QOS, "Check and create on idx %d \n", ost_idx);
+ if (!lov->lov_tgts[ost_idx] ||
+ !lov->lov_tgts[ost_idx]->ltd_active)
+ RETURN(rc);
+
+ /* check if objects has been created on this ost */
+ for (stripe = 0; stripe < lsm->lsm_stripe_count; stripe++) {
+ /* already have object at this stripe */
+ if (ost_idx == lsm->lsm_oinfo[stripe]->loi_ost_idx)
+ break;
+ }
+
+ if (stripe >= lsm->lsm_stripe_count) {
+ req->rq_idx = ost_idx;
+ rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp,
+ req->rq_oi.oi_oa, &req->rq_oi.oi_md,
+ oti);
+ }
+ RETURN(rc);
+}
+
int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
{
struct lov_stripe_md *lsm = set->set_oi->oi_md;
struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
- unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
- int stripe, i, rc = -EIO;
+ unsigned ost_idx = 0, ost_count;
+ struct pool_desc *pool;
+ struct ost_pool *osts = NULL;
+ int i, rc = -EIO;
ENTRY;
- ost_idx = (req->rq_idx + lsm->lsm_stripe_count) % ost_count;
- for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
- if (!lov->lov_tgts[ost_idx] ||
- !lov->lov_tgts[ost_idx]->ltd_active)
- continue;
- /* check if objects has been created on this ost */
- for (stripe = 0; stripe < lsm->lsm_stripe_count; stripe++) {
- /* we try send create to this ost but he is failed */
- if (stripe == req->rq_stripe)
- continue;
- /* already have object at this stripe */
- if (ost_idx == lsm->lsm_oinfo[stripe]->loi_ost_idx)
- break;
- }
- if (stripe >= lsm->lsm_stripe_count) {
- req->rq_idx = ost_idx;
- rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp,
- req->rq_oi.oi_oa, &req->rq_oi.oi_md,
- set->set_oti);
- if (!rc)
+ /* First check whether we can create the objects on the pool */
+ pool = lov_find_pool(lov, lsm->lsm_pool_name);
+ if (pool != NULL) {
+ cfs_down_read(&pool_tgt_rw_sem(pool));
+ osts = &(pool->pool_obds);
+ ost_count = osts->op_count;
+ for (i = 0; i < ost_count; i++, ost_idx = osts->op_array[i]) {
+ rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+ set->set_oti);
+ if (rc == 0)
break;
}
+ cfs_up_read(&pool_tgt_rw_sem(pool));
+ lov_pool_putref(pool);
+ RETURN(rc);
}
+
+ ost_count = lov->desc.ld_tgt_count;
+ /* Then check whether we can create the objects on other OSTs */
+ ost_idx = (req->rq_idx + lsm->lsm_stripe_count) % ost_count;
+ for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
+ rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+ set->set_oti);
+
+ if (rc == 0)
+ break;
+ }
+
RETURN(rc);
}
return set->set_completes == set->set_count;
}
-
void lov_update_set(struct lov_request_set *set,
struct lov_request *req, int rc)
{
RETURN(rc);
}
+static int lov_update_create_set(struct lov_request_set *set,
+ struct lov_request *req, int rc)
+{
+ struct obd_trans_info *oti = set->set_oti;
+ struct lov_stripe_md *lsm = set->set_oi->oi_md;
+ struct lov_oinfo *loi;
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ ENTRY;
+
+ if (rc && lov->lov_tgts[req->rq_idx] &&
+ lov->lov_tgts[req->rq_idx]->ltd_active) {
+ CERROR("error creating fid "LPX64" sub-object"
+ " on OST idx %d/%d: rc = %d\n",
+ set->set_oi->oi_oa->o_id, req->rq_idx,
+ lsm->lsm_stripe_count, rc);
+ if (rc > 0) {
+ CERROR("obd_create returned invalid err %d\n", rc);
+ rc = -EIO;
+ }
+ }
+
+ cfs_spin_lock(&set->set_lock);
+ req->rq_stripe = set->set_success;
+ loi = lsm->lsm_oinfo[req->rq_stripe];
+
+
+ if (rc) {
+ lov_update_set(set, req, rc);
+ cfs_spin_unlock(&set->set_lock);
+ RETURN(rc);
+ }
+
+ loi->loi_id = req->rq_oi.oi_oa->o_id;
+ loi->loi_seq = req->rq_oi.oi_oa->o_seq;
+ loi->loi_ost_idx = req->rq_idx;
+ loi_init(loi);
+
+ if (oti && set->set_cookies)
+ ++oti->oti_logcookies;
+ if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
+ set->set_cookie_sent++;
+
+ lov_update_set(set, req, rc);
+ cfs_spin_unlock(&set->set_lock);
+
+ CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+ lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+ RETURN(rc);
+}
+
static int create_done(struct obd_export *exp, struct lov_request_set *set,
struct lov_stripe_md **lsmp)
{
RETURN(rc);
}
-int lov_update_create_set(struct lov_request_set *set,
- struct lov_request *req, int rc)
-{
- struct obd_trans_info *oti = set->set_oti;
- struct lov_stripe_md *lsm = set->set_oi->oi_md;
- struct lov_oinfo *loi;
- struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
- ENTRY;
-
- if (rc && lov->lov_tgts[req->rq_idx] &&
- lov->lov_tgts[req->rq_idx]->ltd_active) {
- CERROR("error creating fid "LPX64" sub-object"
- " on OST idx %d/%d: rc = %d\n",
- set->set_oi->oi_oa->o_id, req->rq_idx,
- lsm->lsm_stripe_count, rc);
- if (rc > 0) {
- CERROR("obd_create returned invalid err %d\n", rc);
- rc = -EIO;
- }
- }
-
- cfs_spin_lock(&set->set_lock);
- req->rq_stripe = set->set_success;
- loi = lsm->lsm_oinfo[req->rq_stripe];
- if (rc) {
- lov_update_set(set, req, rc);
- cfs_spin_unlock(&set->set_lock);
- RETURN(rc);
- }
-
- loi->loi_id = req->rq_oi.oi_oa->o_id;
- loi->loi_seq = req->rq_oi.oi_oa->o_seq;
- loi->loi_ost_idx = req->rq_idx;
- loi_init(loi);
-
- if (oti && set->set_cookies)
- ++oti->oti_logcookies;
- if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
- set->set_cookie_sent++;
-
- lov_update_set(set, req, rc);
- cfs_spin_unlock(&set->set_lock);
-
- CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
- lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
- RETURN(rc);
-}
-
int cb_create_update(void *cookie, int rc)
{
struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
+ if (lovreq->rq_idx == obd_fail_val)
+ rc = -ENOTCONN;
+
rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
if (lov_finished_set(lovreq->rq_rqset))
lov_put_reqset(lovreq->rq_rqset);
return rc;
}
-
int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
struct lov_stripe_md **lsmp, struct obdo *src_oa,
struct obd_trans_info *oti,
ENTRY;
LASSERT(mds->mds_lov_objids != NULL);
- rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
+ rc = obd_set_info_async(mds->mds_lov_exp, strlen(KEY_NEXT_ID),
KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count,
mds->mds_lov_objids, NULL);
}
#endif
/* Call that with obd_recovering = 1 just to update objids */
- obd_notify(obd->u.mds.mds_osc_obd, NULL, (obd->obd_async_recov ?
+ obd_notify(obd->u.mds.mds_lov_obd, NULL, (obd->obd_async_recov ?
OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC), NULL);
/* Drop obd_recovering to 0 and call o_postrecov to recover mds_lov */
{
struct mds_capa_info info = { .uuid = NULL, .capa = key };
struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
- struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
+ struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_lov_exp;
int rc;
ENTRY;
{
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
struct obd_device *obd = mdd2obd_dev(mdd);
- struct obd_export *lov_exp = obd->u.mds.mds_osc_exp;
+ struct obd_export *lov_exp = obd->u.mds.mds_lov_exp;
struct lov_stripe_md *lsm = NULL;
int rc;
ENTRY;
const struct md_op_spec *spec, struct lu_attr *la)
{
struct obd_device *obd = mdd2obd_dev(mdd);
- struct obd_export *lov_exp = obd->u.mds.mds_osc_exp;
+ struct obd_export *lov_exp = obd->u.mds.mds_lov_exp;
struct lu_site *site = mdd2lu_dev(mdd)->ld_site;
struct obdo *oa;
struct lov_stripe_md *lsm = NULL;
int log_unlink)
{
struct obd_device *obd = mdd2obd_dev(mdd);
- struct obd_export *lov_exp = obd->u.mds.mds_osc_exp;
+ struct obd_export *lov_exp = obd->u.mds.mds_lov_exp;
struct lov_stripe_md *lsm = NULL;
struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
struct obdo *oa = &mdd_env_info(env)->mti_oa;
int rc;
ENTRY;
- if (IS_ERR(mds->mds_osc_obd))
- RETURN(PTR_ERR(mds->mds_osc_obd));
+ if (IS_ERR(mds->mds_lov_obd))
+ RETURN(PTR_ERR(mds->mds_lov_obd));
- rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
+ rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
if (rc < 0)
RETURN(rc);
OBD_FREE(lsr, sizeof(*lsr));
out:
- obd_free_memmd(mds->mds_osc_exp, &lsm);
+ obd_free_memmd(mds->mds_lov_exp, &lsm);
RETURN(rc);
}
LASSERT(lmm);
- rc = obd_unpackmd(mds->mds_osc_exp, &oinfo.oi_md, lmm, lmm_size);
+ rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size);
if (rc < 0) {
CERROR("Error unpack md %p for obj "DFID"\n", lmm,
PFID(parent));
oinfo.oi_capa = oc;
/* do async setattr from mds to ost not waiting for responses. */
- rc = obd_setattr_async(mds->mds_osc_exp, &oinfo, &oti, NULL);
+ rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL);
if (rc)
CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
" on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
out:
if (oinfo.oi_md)
- obd_free_memmd(mds->mds_osc_exp, &oinfo.oi_md);
+ obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md);
OBDO_FREE(oinfo.oi_oa);
RETURN(rc);
}
static int mds_lov_clean(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- struct obd_device *osc = mds->mds_osc_obd;
+ struct obd_device *osc = mds->mds_lov_obd;
ENTRY;
if (mds->mds_profile) {
osc->obd_fail = obd->obd_fail;
/* Cleanup the lov */
- obd_disconnect(mds->mds_osc_exp);
+ obd_disconnect(mds->mds_lov_exp);
class_manual_cleanup(osc);
RETURN(0);
/* Notify the LOV, which will in turn call mds_notify for each tgt */
/* This means that we have to hack obd_notify to think we're obd_set_up
during mds_lov_connect. */
- obd_notify(obd->u.mds.mds_osc_obd, NULL,
+ obd_notify(obd->u.mds.mds_lov_obd, NULL,
obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
OBD_NOTIFY_SYNC, NULL);
static int mds_lov_early_clean(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- struct obd_device *osc = mds->mds_osc_obd;
+ struct obd_device *osc = mds->mds_lov_obd;
if (!osc || (!obd->obd_force && !obd->obd_fail))
return(0);
if (ctxt)
llog_cleanup(ctxt);
rc = obd_llog_finish(obd, 0);
- mds->mds_osc_exp = NULL;
+ mds->mds_lov_exp = NULL;
cfs_up_write(&mds->mds_notify_lock);
break;
}
int rc = 0;
ENTRY;
- mds->mds_osc_exp = NULL;
+ mds->mds_lov_exp = NULL;
if (obd->obd_fail)
LCONSOLE_WARN("%s: shutting down for failover; client state "
return -ENOMEM;
if (obd->u.mds.mds_evict_ost_nids) {
- rc = obd_set_info_async(mds->mds_osc_exp,
+ rc = obd_set_info_async(mds->mds_lov_exp,
sizeof(KEY_EVICT_BY_NID),
KEY_EVICT_BY_NID, strlen(tmpbuf + 4) + 1,
tmpbuf + 4, set);
struct llog_cookie *logcookies, int numcookies)
{
struct obd_device *obd = ctxt->loc_obd;
- struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+ struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
struct llog_ctxt *lctxt;
int rc;
ENTRY;
struct obd_uuid *uuid)
{
struct obd_device *obd = ctxt->loc_obd;
- struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+ struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
struct llog_ctxt *lctxt;
int rc;
ENTRY;
int count, struct llog_cookie *cookies, int flags)
{
struct obd_device *obd = ctxt->loc_obd;
- struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+ struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
struct llog_ctxt *lctxt;
int rc;
ENTRY;
int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
struct obd_device *disk_obd, int *index)
{
- struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
+ struct obd_device *lov_obd = obd->u.mds.mds_lov_obd;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
int rc;
ENTRY;
- if (IS_ERR(mds->mds_osc_obd))
- RETURN(PTR_ERR(mds->mds_osc_obd));
+ if (IS_ERR(mds->mds_lov_obd))
+ RETURN(PTR_ERR(mds->mds_lov_obd));
- rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
+ rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
if (rc < 0)
RETURN(rc);
rc = mds_llog_add_unlink(obd, lsm, 0, logcookies,
cookies_size / sizeof(struct llog_cookie));
- obd_free_memmd(mds->mds_osc_exp, &lsm);
+ obd_free_memmd(mds->mds_lov_exp, &lsm);
RETURN(rc);
}
EXPORT_SYMBOL(mds_log_op_unlink);
int rc;
ENTRY;
- if (IS_ERR(mds->mds_osc_obd))
- RETURN(PTR_ERR(mds->mds_osc_obd));
+ if (IS_ERR(mds->mds_lov_obd))
+ RETURN(PTR_ERR(mds->mds_lov_obd));
rc = mds_llog_add_unlink(obd, lsm, count - 1, &logcookie, 1);
RETURN(rc);
ENTRY;
if (*lsmp == NULL) {
- rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
+ rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
if (rc < 0)
RETURN(rc);
/* need only one stripe, save old value */
if (lsm) {
/* restore stripes number */
lsm->lsm_stripe_count = stripes;
- obd_free_memmd(mds->mds_osc_exp, &lsm);
+ obd_free_memmd(mds->mds_lov_exp, &lsm);
}
EXIT;
return;
obd_id idx)
{
struct mds_obd *mds = &obd->u.mds;
+ struct obd_export *lov_exp = mds->mds_lov_exp;
unsigned int page;
unsigned int off;
obd_id *data;
+ __u64 connect_flags;
+ __u32 size;
int rc = 0;
ENTRY;
page = idx / OBJID_PER_PAGE();
off = idx % OBJID_PER_PAGE();
data = mds->mds_lov_page_array[page];
- if (data[off] < 2) {
+
+ size = sizeof(__u64);
+ connect_flags = idx;
+ rc = obd_get_info(lov_exp, sizeof(KEY_CONNECT_FLAG), KEY_CONNECT_FLAG,
+ &size, &connect_flags, NULL);
+ if (rc)
+ GOTO(out, rc);
+
+ if (data[off] < 2 || connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
/* We never read this lastid; ask the osc */
struct obd_id_info lastid;
- __u32 size = sizeof(lastid);
+ size = sizeof(lastid);
lastid.idx = idx;
lastid.data = &data[off];
- rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
- KEY_LAST_ID, &size, &lastid, NULL);
+ rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
+ &size, &lastid, NULL);
if (rc)
GOTO(out, rc);
if (ost_uuid != NULL)
oti.oti_ost_uuid = ost_uuid;
- rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
+ rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
RETURN(rc);
}
info.idx = idx;
info.data = id;
- rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
+ rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
KEY_NEXT_ID, sizeof(info), &info, NULL);
if (rc)
CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
if (!ld)
RETURN(-ENOMEM);
- rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
+ rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
&valsize, ld, NULL);
if (rc)
GOTO(out, rc);
int rc;
ENTRY;
- if (IS_ERR(mds->mds_osc_obd))
- RETURN(PTR_ERR(mds->mds_osc_obd));
+ if (IS_ERR(mds->mds_lov_obd))
+ RETURN(PTR_ERR(mds->mds_lov_obd));
- if (mds->mds_osc_obd)
+ if (mds->mds_lov_obd)
RETURN(0);
- mds->mds_osc_obd = class_name2obd(lov_name);
- if (!mds->mds_osc_obd) {
+ mds->mds_lov_obd = class_name2obd(lov_name);
+ if (!mds->mds_lov_obd) {
CERROR("MDS cannot locate LOV %s\n", lov_name);
- mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
+ mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
RETURN(-ENOTCONN);
}
GOTO(err_exit, rc);
}
- rc = obd_register_observer(mds->mds_osc_obd, obd);
+ rc = obd_register_observer(mds->mds_lov_obd, obd);
if (rc) {
CERROR("MDS cannot register as observer of LOV %s (%d)\n",
lov_name, rc);
if (rc)
GOTO(err_exit, rc);
- mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
+ mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
OBD_ALLOC(data, sizeof(*data));
if (data == NULL)
/* send the list of supported checksum types */
data->ocd_cksum_types = OBD_CKSUM_ALL;
/* NB: lov_connect() needs to fill in .ocd_index for each OST */
- rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
+ rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
OBD_FREE(data, sizeof(*data));
if (rc) {
CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
- mds->mds_osc_obd = ERR_PTR(rc);
+ mds->mds_lov_obd = ERR_PTR(rc);
RETURN(rc);
}
RETURN(rc);
err_exit:
- mds->mds_osc_exp = NULL;
- mds->mds_osc_obd = ERR_PTR(rc);
+ mds->mds_lov_exp = NULL;
+ mds->mds_lov_obd = ERR_PTR(rc);
RETURN(rc);
}
int rc = 0;
ENTRY;
- if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
- obd_register_observer(mds->mds_osc_obd, NULL);
+ if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
+ obd_register_observer(mds->mds_lov_obd, NULL);
/* The actual disconnect of the mds_lov will be called from
* class_disconnect_exports from mds_lov_clean. So we have to
* ensure that class_cleanup doesn't fail due to the extra ref
* we're holding now. The mechanism to do that already exists -
* the obd_force flag. We'll drop the final ref to the
- * mds_osc_exp in mds_cleanup. */
- mds->mds_osc_obd->obd_force = 1;
+ * mds_lov_exp in mds_cleanup. */
+ mds->mds_lov_obd->obd_force = 1;
}
RETURN(rc);
DEBUG_CAPA_KEY(D_SEC, key, "propagate");
info.capa = key;
- rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
+ rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
KEY_CAPA_KEY, sizeof(info), &info, NULL);
if (rc) {
DEBUG_CAPA_KEY(D_ERROR, key,
mgi.group = mdt_to_obd_objseq(mds->mds_id);
mgi.uuid = uuid;
- rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
+ rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
if (rc != 0)
GOTO(out, rc);
/* Deactivate it for safety */
CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
rc);
- if (!obd->obd_stopping && mds->mds_osc_obd &&
- !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
- obd_notify(mds->mds_osc_obd, watched,
+ if (!obd->obd_stopping && mds->mds_lov_obd &&
+ !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
+ obd_notify(mds->mds_lov_obd, watched,
OBD_NOTIFY_INACTIVE, NULL);
}
break;
}
+ /**
+ * If this is DELORPHAN process, no need create object here,
+ * otherwise this will create a gap of object id, and MDS
+ * might create some orphan log (mds_lov_update_objids), then
+ * remove objects wrongly on OST. Bug 21379.
+ */
+ if (oa->o_valid & OBD_MD_FLFLAGS &&
+ oa->o_flags == OBD_FL_DELORPHAN) {
+ cfs_spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+
if (oscc_has_objects_nolock(oscc, 1)) {
memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
oa->o_id = oscc->oscc_next_id;
}
if (rc == 0 && global && is_master)
- rc = obd_quotactl(obd->u.mds.mds_osc_exp, oqctl);
+ rc = obd_quotactl(obd->u.mds.mds_lov_exp, oqctl);
}
if (is_master)
/* only when block qunit is reduced, boardcast to osts */
if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq))
- rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt);
+ rc = obd_quota_adjust_qunit(mds->mds_lov_exp, oqaq, qctxt);
out:
lustre_dqput(dquot);
oqctl->qc_cmd = Q_FINVALIDATE;
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
if (!rc)
- rc = obd_quotactl(mds->mds_osc_exp, oqctl);
+ rc = obd_quotactl(mds->mds_lov_exp, oqctl);
cfs_up_write(&mds->mds_qonoff_sem);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
GOTO(out, rc1);
}
- rc = obd_quotactl(mds->mds_osc_exp, oqctl);
+ rc = obd_quotactl(mds->mds_lov_exp, oqctl);
if (rc && rc != -EALREADY) {
CWARN("mds remote quota[%d] is failed to be off for %d\n",
oqctl->qc_type, rc);
id[GRPQUOTA] = oqctl->qc_id;
/* initialize all slave's limit */
- rc = obd_quotactl(mds->mds_osc_exp, ioqc);
+ rc = obd_quotactl(mds->mds_lov_exp, ioqc);
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 1, 0, NULL);
if (rc == -EDQUOT || rc == -EBUSY) {
/* adjust remote lqs */
if (QAQ_IS_ADJBLK(qaq)) {
- rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt);
+ rc = obd_quota_adjust_qunit(obd->u.mds.mds_lov_exp, qaq, qctxt);
if (rc < 0)
CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc);
{
struct mds_obd *mds = &obd->u.mds;
struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
- struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp);
+ struct obd_device *lov_obd = class_exp2obd(mds->mds_lov_exp);
struct lov_obd *lov = &lov_obd->u.lov;
struct quota_adjust_qunit *oqaq = NULL;
struct lustre_quota_info *qinfo = &mds->mds_quota_info;
/* get block usage from OSS */
soqc->qc_dqblk.dqb_curspace = 0;
- rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc);
+ rc = obd_quotactl(obd->u.mds.mds_lov_exp, soqc);
if (!rc || rc == -EREMOTEIO) {
oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace;
oqctl->qc_dqblk.dqb_valid |= QIF_SPACE;
qctl->qc_type = type;
qctl->qc_id = id;
qctl->qc_stat = QUOTA_RECOVERING;
- rc = obd_quotactl(mds->mds_osc_exp, qctl);
+ rc = obd_quotactl(mds->mds_lov_exp, qctl);
cfs_down_write(&mds->mds_qonoff_sem);
if (rc)
GOTO(out, rc);
/* for mds */
class_incref(obd, "qmaster_recovd_mds", obd);
/* for lov */
- class_incref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
+ class_incref(mds->mds_lov_obd, "qmaster_recovd_lov", mds->mds_lov_obd);
cfs_complete(&data->comp);
}
}
cfs_up_write(&mds->mds_qonoff_sem);
- class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
+ class_decref(mds->mds_lov_obd, "qmaster_recovd_lov", mds->mds_lov_obd);
class_decref(obd, "qmaster_recovd_mds", obd);
RETURN(rc);
}
wait_osc_import_state mds ost FULL
clients_up
+ wait_mds_ost_sync
# Veriy that the pool got created and is usable
df $POOL_ROOT > /dev/null
- sleep 5
- # Make sure OST0 can be striped on
- $SETSTRIPE -o 0 -c 1 $POOL_ROOT/$tfile
- STR=$($GETSTRIPE $POOL_ROOT/$tfile | grep 0x | cut -f2 | tr -d " ")
- rm $POOL_ROOT/$tfile
- if [[ "$STR" == "0" ]]; then
- echo "Creating a file in pool$i"
- create_file $POOL_ROOT/file$i pool$i || break
- check_file_in_pool $POOL_ROOT/file$i pool$i || break
- else
- echo "OST 0 seems to be unavailable. Try later."
- fi
+ sleep 5
+ # Make sure OST0 can be striped on
+ $SETSTRIPE -o 0 -c 1 $POOL_ROOT/$tfile
+ STR=$($GETSTRIPE $POOL_ROOT/$tfile | grep 0x | cut -f2 | tr -d " ")
+ rm $POOL_ROOT/$tfile
+ if [[ "$STR" == "0" ]]; then
+ echo "Creating a file in pool$i"
+ create_file $POOL_ROOT/file$i pool$i || break
+ check_file_in_pool $POOL_ROOT/file$i pool$i || break
+ else
+ echo "OST 0 seems to be unavailable. Try later."
+ fi
done
rm -rf $POOL_ROOT
}
run_test 25 "Create new pool and restart MDS ======================="
+test_26() {
+ local OSTCOUNT=`lctl get_param -n lov.$LOVNAME.*clilov*.numobd`
+ [[ $OSTCOUNT -le 2 ]] && skip_env "Need at least 3 OSTs" && return
+ set_cleanup_trap
+ local dev=$(mdsdevname ${SINGLEMDS//mds/})
+ local POOL_ROOT=${POOL_ROOT:-$DIR/$tdir}
+
+ mkdir -p $POOL_ROOT
+
+ create_pool_nofail $POOL
+
+ do_facet $SINGLEMDS "lctl pool_add $FSNAME.pool1 OST0000; sync"
+ wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.pool1 | \
+ sort -u | grep $FSNAME-OST0000_UUID " "$FSNAME-OST0000_UUID" || \
+ error "pool_add failed: $1; $2"
+
+ do_facet $SINGLEMDS "lctl pool_add $FSNAME.pool1 OST0002; sync"
+ wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.pool1 | \
+ sort -u | grep $FSNAME-OST0002_UUID" "$FSNAME-OST0002_UUID" || \
+ error "pool_add failed: $1; $2"
+
+ # Veriy that the pool got created and is usable
+ df $POOL_ROOT
+ echo "Creating files in $POOL"
+
+ for ((i=0;i<10;i++)); do
+ #OBD_FAIL_MDS_OSC_CREATE_FAIL 0x147
+ #Fail OST0000 to make sure the objects create on the other ost in the pool
+ do_facet $SINGLEMDS lctl set_param fail_loc=0x147
+ do_facet $SINGLEMDS lctl set_param fail_val=0
+ create_file $POOL_ROOT/file$i $POOL 1 -1 || break
+ do_facet $SINGLEMDS lctl set_param fail_loc=0
+ check_file_in_pool $POOL_ROOT/file$i $POOL || break
+ done
+ rm -rf $POOL_ROOT
+}
+run_test 26 "Choose other OSTs in the pool first in the creation remedy"
+
log "cleanup: ======================================================"
cd $ORIG_PWD
cleanup_pools $FSNAME
# bug number: 10124
ALWAYS_EXCEPT="15c $REPLAY_DUAL_EXCEPT"
+LFS=${LFS:-lfs}
+SETSTRIPE=${SETSTRIPE:-"$LFS setstripe"}
+GETSTRIPE=${GETSTRIPE:-"$LFS getstripe"}
SAVE_PWD=$PWD
PTLDEBUG=${PTLDEBUG:--1}
LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
# as test_15a
test_14b() {
+ wait_mds_ost_sync
+ wait_destroy_complete
BEFOREUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
mkdir -p $MOUNT1/$tdir
+ $SETSTRIPE -o 0 $MOUNT1/$tdir
replay_barrier $SINGLEMDS
- createmany -o $MOUNT1/$tfile- 5
- echo "data" > $MOUNT2/$tdir/$tfile-2
- createmany -o $MOUNT1/$tfile-3- 5
+ createmany -o $MOUNT1/$tdir/$tfile- 5
+
+ $SETSTRIPE -o 0 $MOUNT2/f14b-3
+ echo "data" > $MOUNT2/f14b-3
+ createmany -o $MOUNT1/$tdir/$tfile-3- 5
umount $MOUNT2
fail $SINGLEMDS
wait_recovery_complete $SINGLEMDS || error "MDS recovery not done"
# first 25 files should have been replayed
- unlinkmany $MOUNT1/$tfile- 5 || return 2
- unlinkmany $MOUNT1/$tfile-3- 5 || return 3
+ unlinkmany $MOUNT1/$tdir/$tfile- 5 || return 2
+ unlinkmany $MOUNT1/$tdir/$tfile-3- 5 || return 3
zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail"
local -a sync=($(do_nodes $(comma_list $(osts_nodes)) \
"$LCTL get_param -n obdfilter.*.mds_sync"))
local con=1
+ local i
for ((i=0; i<${#sync[@]}; i++)); do
[ ${sync[$i]} -eq 0 ] && continue
# there is a not finished MDS-OST synchronization
}
static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
+ struct lov_user_ost_data_v1 *objects,
int is_dir, int verbose, int depth,
char *pool_name)
{
llapi_printf(LLAPI_MSG_NORMAL, "%sstripe_offset: ",
prefix);
llapi_printf(LLAPI_MSG_NORMAL, "%u%c",
- lum->lmm_objects[0].l_ost_idx, nl);
+ objects[0].l_ost_idx, nl);
}
if ((verbose & VERBOSE_POOL) && (pool_name != NULL)) {
}
if (obdstripe == 1)
- lov_dump_user_lmm_header(lum, path, is_dir, header, depth,
+ lov_dump_user_lmm_header(lum, path, objects, is_dir, header, depth,
pool_name);
if (!is_dir && (header & VERBOSE_OBJID)) {