X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=dadea414f07f40fa2fece8d95a4995866e2a978f;hb=960c1ec8c030cc48b3208e892849dcf5722f28aa;hp=381920951195492d46d2d0dbf355507edb19726f;hpb=4fd42f6ef396a71405578687276fb750f7829e73;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 3819209..dadea41 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -299,7 +299,6 @@ int lmv_check_connect(struct obd_device *obd) { } lmv_set_timeouts(obd); - class_export_put(exp); return 0; @@ -377,6 +376,8 @@ out_local: if (!lmv->connected) class_export_put(exp); rc = class_disconnect(exp, 0); + if (lmv->refcount == 0) + lmv->connected = 0; RETURN(rc); } @@ -395,8 +396,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, for (i = 0; i < lmv->desc.ld_tgt_count; i++) { int err; - err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, - len, karg, uarg); + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", obddev->obd_name, i); + continue; + } + + err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg); if (err) { if (lmv->tgts[i].active) { CERROR("error: iocontrol MDC %s on MDT" @@ -489,6 +494,11 @@ static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs, RETURN(rc); for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", obd->obd_name, i); + continue; + } + rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, &temp, max_age); if (rc) { CERROR("can't stat MDS #%d (%s)\n", i, @@ -578,6 +588,12 @@ static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid, for (i = 0; i < obj->objcount; i++) { + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", + obd->obd_name, i); + continue; + } + /* skip master obj. */ if (fid_equal(&obj->fid, &obj->objs[i].fid)) continue; @@ -643,7 +659,8 @@ static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid, obj = lmv_grab_obj(obd, pfid); if (obj) { /* directory is splitted. look for right mds for this name. */ - mds = raw_name2idx(obj->objcount, name, len); + mds = raw_name2idx(obj->hashtype, obj->objcount, name, len); + mds = obj->objs[mds].fid.mds; lmv_put_obj(obj); } rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, cfid, it, data); @@ -737,7 +754,7 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, struct lmv_obd *lmv = &obd->u.lmv; struct mds_body *body; struct lmv_obj *obj; - int rc, mds; + int rc, mds, loop = 0; ENTRY; rc = lmv_check_connect(obd); @@ -747,9 +764,10 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, if (!lmv->desc.ld_active_tgt_count) RETURN(-EIO); repeat: + LASSERT(++loop <= 2); obj = lmv_grab_obj(obd, &op_data->fid1); if (obj) { - mds = raw_name2idx(obj->objcount, op_data->name, + mds = raw_name2idx(obj->hashtype, obj->objcount, op_data->name, op_data->namelen); op_data->fid1 = obj->objs[mds].fid; lmv_put_obj(obj); @@ -819,13 +837,13 @@ int lmv_enqueue_slaves(struct obd_export *exp, int locktype, LASSERT(mea != NULL); for (i = 0; i < mea->mea_count; i++) { - if (lmv->tgts[i].ltd_exp == NULL) - continue; - memset(&data2, 0, sizeof(data2)); data2.fid1 = mea->mea_fids[i]; mds = data2.fid1.mds; + if (lmv->tgts[mds].ltd_exp == NULL) + continue; + rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode, &data2, lockh + i, lmm, lmmsize, cb_completion, cb_blocking, cb_data); @@ -886,8 +904,8 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, if (obj) { /* directory is splitted. look for right mds for this * name */ - mds = raw_name2idx(obj->objcount, (char *)data->name, - data->namelen); + mds = raw_name2idx(obj->hashtype, obj->objcount, + (char *)data->name, data->namelen); data->fid1 = obj->objs[mds].fid; lmv_put_obj(obj); } @@ -909,7 +927,7 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ll_fid rfid = *fid; - int rc, mds = fid->mds; + int rc, mds = fid->mds, loop = 0; struct mds_body *body; struct lmv_obj *obj; ENTRY; @@ -917,10 +935,11 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); repeat: + LASSERT(++loop <= 2); obj = lmv_grab_obj(obd, fid); if (obj) { /* directory is splitted. look for right mds for this name */ - mds = raw_name2idx(obj->objcount, filename, namelen - 1); + mds = raw_name2idx(obj->hashtype, obj->objcount, filename, namelen - 1); rfid = obj->objs[mds].fid; lmv_put_obj(obj); } @@ -931,7 +950,7 @@ repeat: (unsigned long)rfid.mds, (unsigned long)rfid.id, (unsigned long)rfid.generation); - rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, + rc = md_getattr_name(lmv->tgts[rfid.mds].ltd_exp, &rfid, filename, namelen, valid, ea_size, request); if (rc == 0) { /* this could be cross-node reference. in this case all we have @@ -985,7 +1004,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, /* usual link request */ obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - rc = raw_name2idx(obj->objcount, data->name, + rc = raw_name2idx(obj->hashtype, obj->objcount, data->name, data->namelen); data->fid1 = obj->objs[rc].fid; lmv_put_obj(obj); @@ -1063,7 +1082,7 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, if (obj) { /* directory is already splitted, so we have to forward request * to the right MDS */ - mds = raw_name2idx(obj->objcount, (char *)old, oldlen); + mds = raw_name2idx(obj->hashtype, obj->objcount, (char *)old, oldlen); data->fid1 = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds, (unsigned long)obj->objs[mds].fid.mds, @@ -1076,7 +1095,7 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, if (obj) { /* directory is already splitted, so we have to forward request * to the right MDS */ - mds = raw_name2idx(obj->objcount, (char *)new, newlen); + mds = raw_name2idx(obj->hashtype, obj->objcount, (char *)new, newlen); data->fid2 = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds, (unsigned long)obj->objs[mds].fid.mds, @@ -1099,10 +1118,10 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - int rc = 0, i = data->fid1.mds; struct ptlrpc_request *req; struct mds_body *body; struct lmv_obj *obj; + int rc = 0, i; ENTRY; rc = lmv_check_connect(obd); @@ -1120,8 +1139,8 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, for (i = 0; i < obj->objcount; i++) { data->fid1 = obj->objs[i].fid; - rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, - ea, ealen, ea2, ea2len, &req); + rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data, + iattr, ea, ealen, ea2, ea2len, &req); if (fid_equal(&obj->fid, &obj->objs[i].fid)) { /* this is master object and this request should @@ -1136,14 +1155,14 @@ int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, } lmv_put_obj(obj); } else { - LASSERT(i < lmv->desc.ld_tgt_count); - rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, - ealen, ea2, ea2len, request); + LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count); + rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data, + iattr, ea, ealen, ea2, ea2len, request); if (rc == 0) { body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body)); LASSERT(body != NULL); - LASSERT(body->mds == i); + LASSERT(body->mds == data->fid1.mds); } } RETURN(rc); @@ -1161,7 +1180,7 @@ int lmv_sync(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); - rc = md_sync(lmv->tgts[0].ltd_exp, fid, request); + rc = md_sync(lmv->tgts[fid->mds].ltd_exp, fid, request); RETURN(rc); } @@ -1282,13 +1301,14 @@ int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data, LASSERT(mea != NULL); for (i = 0; i < mea->mea_count; i++) { - if (lmv->tgts[i].ltd_exp == NULL) - continue; - memset(&data2, 0, sizeof(data2)); data2.fid1 = mea->mea_fids[i]; data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR; mds = data2.fid1.mds; + + if (lmv->tgts[mds].ltd_exp == NULL) + continue; + rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req); CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n", (unsigned long) mea->mea_fids[i].mds, @@ -1338,7 +1358,7 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - i = raw_name2idx(obj->objcount, data->name, + i = raw_name2idx(obj->hashtype, obj->objcount, data->name, data->namelen); data->fid1 = obj->objs[i].fid; lmv_put_obj(obj); @@ -1369,6 +1389,7 @@ struct obd_device *lmv_get_real_obd(struct obd_export *exp, rc = lmv_check_connect(obd); if (rc) RETURN(ERR_PTR(rc)); +#warning "we need well-desgined readdir() implementation to remove this mess" obd = lmv->tgts[0].ltd_exp->exp_obd; EXIT; return obd; @@ -1396,6 +1417,11 @@ int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize) RETURN(0); for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", obd->obd_name, i); + continue; + } + rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize); if (rc) { CERROR("obd_init_ea_size() failed on MDT target %d, " @@ -1436,9 +1462,10 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mea *mea; int i, c, rc = 0; + struct mea *mea; struct ll_fid mfid; + int lcount; ENTRY; rc = lmv_check_connect(obd); @@ -1471,11 +1498,11 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, mea = (struct mea *)*ea; if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count) mea->mea_count = lmv->desc.ld_tgt_count; + mea->mea_magic = MEA_MAGIC_ALL_CHARS; mea->mea_master = -1; - - for (i = 0, c = 0; c < mea->mea_count && - i < lmv->desc.ld_tgt_count; i++) { + lcount = lmv->desc.ld_tgt_count; + for (i = 0, c = 0; c < mea->mea_count && i < lcount; i++) { struct lov_stripe_md obj_md; struct lov_stripe_md *obj_mdp = &obj_md; @@ -1510,8 +1537,6 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, c++; CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n", i, oa->o_id, oa->o_generation); - CDEBUG(D_ERROR, "dirobj at mds %d: "LPU64"/%u\n", - i, oa->o_id, oa->o_generation); } LASSERT(c == mea->mea_count); CDEBUG(D_OTHER, "%d dirobjects created\n", (int) mea->mea_count);