X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=9d6aab1e54db96773c72bfdf94354e1eda7ca039;hp=3fe8196c21a5ccfc3321e3dffbed82de69720eb3;hb=d41530916baeff6b289e5658cc27d79fdd34cb7e;hpb=89891a7b931071eccb8c877bcbcd05c4635076e9 diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 3fe8196..9d6aab1 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -68,6 +68,9 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, spin_lock(&lmv->lmv_lock); for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) { + if (tgt->ltd_exp == NULL) + continue; + CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n", i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie); if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0) @@ -168,7 +171,7 @@ int lmv_detach(struct obd_device *dev) * say caller that everything is okay. Real connection will be performed * later. */ static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd, - struct obd_uuid *cluuid) + struct obd_uuid *cluuid, unsigned long connect_flags) { struct lmv_obd *lmv = &obd->u.lmv; struct obd_export *exp; @@ -191,6 +194,7 @@ static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd, } lmv->cluuid = *cluuid; + lmv->connect_flags = connect_flags; lmv->connected = 0; lmv->exp = exp; @@ -219,7 +223,8 @@ void lmv_set_timeouts(struct obd_device *obd) } /* Performs a check if passed obd is connected. If no - connect it. */ -int lmv_check_connect(struct obd_device *obd) { +int lmv_check_connect(struct obd_device *obd) +{ struct lmv_obd *lmv = &obd->u.lmv; struct obd_uuid *cluuid; struct lmv_tgt_desc *tgts; @@ -268,7 +273,7 @@ int lmv_check_connect(struct obd_device *obd) { GOTO(out_disc, rc = -EINVAL); } - rc = obd_connect(&conn, tgt_obd, &lmv_osc_uuid); + rc = obd_connect(&conn, tgt_obd, &lmv_osc_uuid, lmv->connect_flags); if (rc) { CERROR("Target %s connect error %d\n", tgts->uuid.uuid, rc); @@ -296,7 +301,6 @@ int lmv_check_connect(struct obd_device *obd) { } lmv_set_timeouts(obd); - class_export_put(exp); return 0; @@ -311,7 +315,7 @@ int lmv_check_connect(struct obd_device *obd) { rc2 = obd_disconnect(tgts->ltd_exp, 0); if (rc2) CERROR("error: LMV target %s disconnect on MDT idx %d: " - "rc = %d\n", uuid.uuid, i, rc2); + "error %d\n", uuid.uuid, i, rc2); } class_disconnect(exp, 0); RETURN (rc); @@ -369,11 +373,13 @@ static int lmv_disconnect(struct obd_export *exp, int flags) } out_local: - /* This is the case when no real connection is established by + /* this is the case when no real connection is established by * lmv_check_connect(). */ if (!lmv->connected) class_export_put(exp); rc = class_disconnect(exp, 0); + if (lmv->refcount == 0) + lmv->connected = 0; RETURN(rc); } @@ -392,8 +398,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, for (i = 0; i < lmv->desc.ld_tgt_count; i++) { int err; - err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, - len, karg, uarg); + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", obddev->obd_name, i); + continue; + } + + err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg); if (err) { if (lmv->tgts[i].active) { CERROR("error: iocontrol MDC %s on MDT" @@ -413,13 +423,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) { - struct lustre_cfg *lcfg = buf; + int i, rc = 0; struct lmv_desc *desc; - struct lmv_obd *lmv = &obd->u.lmv; struct obd_uuid *uuids; struct lmv_tgt_desc *tgts; - int i; - int rc = 0; + struct lustre_cfg *lcfg = buf; + struct lmv_obd *lmv = &obd->u.lmv; ENTRY; if (lcfg->lcfg_inllen1 < 1) { @@ -450,7 +459,7 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) OBD_ALLOC(lmv->tgts, lmv->bufsize); if (lmv->tgts == NULL) { CERROR("Out of memory\n"); - RETURN(-EINVAL); + RETURN(-ENOMEM); } lmv->desc = *desc; @@ -459,10 +468,17 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++) tgts->uuid = uuids[i]; + lmv->max_cookiesize = 0; + lmv->max_easize = sizeof(struct ll_fid) * desc->ld_tgt_count + sizeof(struct mea); - lmv->max_cookiesize = 0; + rc = lmv_setup_mgr(obd); + if (rc) { + CERROR("Can't setup LMV object manager, " + "error %d.\n", rc); + OBD_FREE(lmv->tgts, lmv->bufsize); + } RETURN(rc); } @@ -480,6 +496,11 @@ static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs, RETURN(rc); for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", obd->obd_name, i); + continue; + } + rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, &temp, max_age); if (rc) { CERROR("can't stat MDS #%d (%s)\n", i, @@ -502,7 +523,7 @@ static int lmv_cleanup(struct obd_device *obd, int flags) { struct lmv_obd *lmv = &obd->u.lmv; ENTRY; - lmv_cleanup_objs(obd); + lmv_cleanup_mgr(obd); OBD_FREE(lmv->tgts, lmv->bufsize); RETURN(0); } @@ -522,39 +543,75 @@ static int lmv_getstatus(struct obd_export *exp, struct ll_fid *fid) } static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid, - unsigned long valid, unsigned int ea_size, - struct ptlrpc_request **request) + unsigned long valid, unsigned int ea_size, + struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; int rc, i = fid->mds; struct lmv_obj *obj; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); - obj = lmv_grab_obj(obd, fid, 0); + + LASSERT(i < lmv->desc.ld_tgt_count); + + rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid, + ea_size, request); + if (rc) + RETURN(rc); + + obj = lmv_grab_obj(obd, fid); + CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n", - (unsigned long) fid->mds, - (unsigned long) fid->id, - (unsigned long) fid->generation, - obj ? "(splitted)" : ""); + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation, obj ? "(splitted)" : ""); + + /* if object is splitted, then we loop over all the slaves and gather + * size attribute. In ideal world we would have to gather also mds field + * from all slaves, as object is spread over the cluster and this is + * definitely interesting information and it is not good to loss it, + * but...*/ + if (obj) { + struct mds_body *body; - LASSERT(fid->mds < lmv->desc.ld_tgt_count); - rc = md_getattr(lmv->tgts[i].ltd_exp, fid, - valid, ea_size, request); - if (rc == 0 && obj) { - /* we have to loop over dirobjs here and gather attrs - * for all the slaves */ -#warning "attrs gathering here" + if (*request == NULL) { + lmv_put_obj(obj); + RETURN(rc); + } + + body = lustre_msg_buf((*request)->rq_repmsg, 0, + sizeof(*body)); + LASSERT(body != NULL); + + lmv_lock_obj(obj); + + for (i = 0; i < obj->objcount; i++) { + + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", + obd->obd_name, i); + continue; + } + + /* skip master obj. */ + if (fid_equal(&obj->fid, &obj->objs[i].fid)) + continue; + + body->size += obj->objs[i].size; + } + + lmv_unlock_obj(obj); + lmv_put_obj(obj); } - lmv_put_obj(obj); + RETURN(rc); } -static int lmv_change_cbdata(struct obd_export *exp, - struct ll_fid *fid, - ldlm_iterator_t it, void *data) +static int lmv_change_cbdata(struct obd_export *exp, struct ll_fid *fid, + ldlm_iterator_t it, void *data) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -564,12 +621,15 @@ static int lmv_change_cbdata(struct obd_export *exp, rc = lmv_check_connect(obd); if (rc) RETURN(rc); - CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", - (unsigned long) fid->mds, - (unsigned long) fid->id, - (unsigned long) fid->generation); + + CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n", (unsigned long)fid->mds, + (unsigned long)fid->id, (unsigned long)fid->generation); + LASSERT(fid->mds < lmv->desc.ld_tgt_count); - rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, fid, it, data); + + rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, + fid, it, data); + RETURN(rc); } @@ -582,23 +642,27 @@ static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid, struct lmv_obj *obj; int rc = 0, mds; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); + LASSERT(pfid->mds < lmv->desc.ld_tgt_count); LASSERT(cfid->mds < lmv->desc.ld_tgt_count); + CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu:%*s -> %lu/%lu/%lu\n", - (unsigned long) pfid->mds, (unsigned long) pfid->id, - (unsigned long) pfid->generation, len, name, - (unsigned long) cfid->mds, (unsigned long) cfid->id, - (unsigned long) cfid->generation); + (unsigned long)pfid->mds, (unsigned long)pfid->id, + (unsigned long)pfid->generation, len, name, + (unsigned long)cfid->mds, (unsigned long)cfid->id, + (unsigned long)cfid->generation); - /* this is default mds for directory name belongs to */ + /* this is default mds for directory name belongs to. */ mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid, 0); + obj = lmv_grab_obj(obd, pfid); if (obj) { - /* directory is splitted. look for right mds for this name */ - mds = raw_name2idx(obj->objcount, name, len); + /* directory is splitted. look for right mds for this name. */ + mds = raw_name2idx(obj->hashtype, obj->objcount, name, len); + mds = obj->objs[mds].fid.mds; lmv_put_obj(obj); } rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, cfid, it, data); @@ -614,10 +678,8 @@ static int lmv_valid_attrs(struct obd_export *exp, struct ll_fid *fid) rc = lmv_check_connect(obd); if (rc) RETURN(rc); - CDEBUG(D_OTHER, "validate %lu/%lu/%lu\n", - (unsigned long) fid->mds, - (unsigned long) fid->id, - (unsigned long) fid->generation); + CDEBUG(D_OTHER, "validate %lu/%lu/%lu\n", (unsigned long) fid->mds, + (unsigned long) fid->id, (unsigned long) fid->generation); LASSERT(fid->mds < lmv->desc.ld_tgt_count); rc = md_valid_attrs(lmv->tgts[fid->mds].ltd_exp, fid); RETURN(rc); @@ -646,6 +708,7 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid) struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ptlrpc_request *req = NULL; + struct lmv_obj *obj; struct lustre_md md; unsigned long valid; int mealen, rc; @@ -659,21 +722,25 @@ int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid) rc = md_getattr(lmv->tgts[fid->mds].ltd_exp, fid, valid, mealen, &req); if (rc) { - CERROR("md_getattr() failed, rc = %d\n", rc); + CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, rc); } rc = mdc_req2lustre_md(exp, req, 0, NULL, &md); if (rc) { - CERROR("mdc_req2lustre_md() failed, rc = %d\n", rc); + CERROR("mdc_req2lustre_md() failed, error %d\n", rc); GOTO(cleanup, rc); } if (md.mea == NULL) GOTO(cleanup, rc = -ENODATA); - rc = lmv_create_obj_from_attrs(exp, fid, md.mea); - obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea); + obj = lmv_create_obj(exp, fid, md.mea); + if (IS_ERR(obj)) + rc = PTR_ERR(obj); + + lmv_put_obj(obj); + obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea); cleanup: if (req) @@ -687,9 +754,9 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mds_body *mds_body; + struct mds_body *body; struct lmv_obj *obj; - int rc, mds; + int rc, mds, loop = 0; ENTRY; rc = lmv_check_connect(obd); @@ -699,36 +766,39 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, if (!lmv->desc.ld_active_tgt_count) RETURN(-EIO); repeat: - obj = lmv_grab_obj(obd, &op_data->fid1, 0); + LASSERT(++loop <= 2); + obj = lmv_grab_obj(obd, &op_data->fid1); if (obj) { - mds = raw_name2idx(obj->objcount, op_data->name, + mds = raw_name2idx(obj->hashtype, obj->objcount, op_data->name, op_data->namelen); op_data->fid1 = obj->objs[mds].fid; lmv_put_obj(obj); } - CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n", - op_data->namelen, op_data->name, - (unsigned long) op_data->fid1.mds, - (unsigned long) op_data->fid1.id, - (unsigned long) op_data->fid1.generation); + CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n", op_data->namelen, + op_data->name, (unsigned long)op_data->fid1.mds, + (unsigned long)op_data->fid1.id, + (unsigned long)op_data->fid1.generation); + rc = md_create(lmv->tgts[op_data->fid1.mds].ltd_exp, op_data, data, datalen, mode, uid, gid, rdev, request); if (rc == 0) { if (*request == NULL) - RETURN(rc); - mds_body = lustre_msg_buf((*request)->rq_repmsg, 0, - sizeof(*mds_body)); - LASSERT(mds_body != NULL); - CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n", - (unsigned long) mds_body->fid1.id, - (unsigned long) mds_body->fid1.generation, - op_data->fid1.mds); - LASSERT(mds_body->valid & OBD_MD_MDS || - mds_body->mds == op_data->fid1.mds); + RETURN(rc); + + body = lustre_msg_buf((*request)->rq_repmsg, 0, + sizeof(*body)); + LASSERT(body != NULL); + + CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, " + "mds = %d\n", (unsigned long)body->fid1.id, + (unsigned long)body->fid1.generation, op_data->fid1.mds); + + LASSERT(body->valid & OBD_MD_MDS || + body->mds == op_data->fid1.mds); } else if (rc == -ERESTART) { - /* directory got splitted. time to update local object - * and repeat the request with proper MDS */ + /* directory got splitted. time to update local object and + * repeat the request with proper MDS */ rc = lmv_get_mea_and_update_object(exp, &op_data->fid1); if (rc == 0) { ptlrpc_req_finished(*request); @@ -755,11 +825,10 @@ int lmv_done_writing(struct obd_export *exp, struct obdo *obdo) } int lmv_enqueue_slaves(struct obd_export *exp, int locktype, - struct lookup_intent *it, int lockmode, - struct mdc_op_data *data, struct lustre_handle *lockh, - void *lmm, int lmmsize, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, void *cb_data) + struct lookup_intent *it, int lockmode, + struct mdc_op_data *data, struct lustre_handle *lockh, + void *lmm, int lmmsize, ldlm_completion_callback cb_completion, + ldlm_blocking_callback cb_blocking, void *cb_data) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -770,19 +839,21 @@ int lmv_enqueue_slaves(struct obd_export *exp, int locktype, LASSERT(mea != NULL); for (i = 0; i < mea->mea_count; i++) { - if (lmv->tgts[i].ltd_exp == NULL) - continue; - memset(&data2, 0, sizeof(data2)); data2.fid1 = mea->mea_fids[i]; mds = data2.fid1.mds; + + if (lmv->tgts[mds].ltd_exp == NULL) + continue; + rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode, &data2, lockh + i, lmm, lmmsize, cb_completion, cb_blocking, cb_data); + CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n", - (unsigned long) mea->mea_fids[i].mds, - (unsigned long) mea->mea_fids[i].id, - (unsigned long) mea->mea_fids[i].generation, + (unsigned long)mea->mea_fids[i].mds, + (unsigned long)mea->mea_fids[i].id, + (unsigned long)mea->mea_fids[i].generation, rc, it->d.lustre.it_status); if (rc) GOTO(cleanup, rc); @@ -810,8 +881,7 @@ cleanup: int lmv_enqueue(struct obd_export *exp, int lock_type, struct lookup_intent *it, int lock_mode, struct mdc_op_data *data, struct lustre_handle *lockh, - void *lmm, int lmmsize, - ldlm_completion_callback cb_completion, + void *lmm, int lmmsize, ldlm_completion_callback cb_completion, ldlm_blocking_callback cb_blocking, void *cb_data) { struct obd_device *obd = exp->exp_obd; @@ -832,19 +902,19 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, } if (data->namelen) { - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - /* directory is splitted. look for - * right mds for this name */ - mds = raw_name2idx(obj->objcount, (char *)data->name, - data->namelen); + /* directory is splitted. look for right mds for this + * name */ + mds = raw_name2idx(obj->hashtype, obj->objcount, + (char *)data->name, data->namelen); data->fid1 = obj->objs[mds].fid; lmv_put_obj(obj); } } - CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n", - LL_IT2STR(it), (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation); + CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n", LL_IT2STR(it), + (unsigned long)data->fid1.id, (unsigned long)data->fid1.generation); + rc = md_enqueue(lmv->tgts[data->fid1.mds].ltd_exp, lock_type, it, lock_mode, data, lockh, lmm, lmmsize, cb_completion, cb_blocking, cb_data); @@ -853,13 +923,13 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, } int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, - char *filename, int namelen, unsigned long valid, - unsigned int ea_size, struct ptlrpc_request **request) + char *filename, int namelen, unsigned long valid, + unsigned int ea_size, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ll_fid rfid = *fid; - int rc, mds = fid->mds; + int rc, mds = fid->mds, loop = 0; struct mds_body *body; struct lmv_obj *obj; ENTRY; @@ -867,24 +937,27 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); repeat: - obj = lmv_grab_obj(obd, fid, 0); + LASSERT(++loop <= 2); + obj = lmv_grab_obj(obd, fid); if (obj) { /* directory is splitted. look for right mds for this name */ - mds = raw_name2idx(obj->objcount, filename, namelen - 1); + mds = raw_name2idx(obj->hashtype, obj->objcount, filename, namelen - 1); rfid = obj->objs[mds].fid; lmv_put_obj(obj); } + CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu -> %lu/%lu/%lu\n", - namelen, filename, (unsigned long) fid->mds, - (unsigned long) fid->id, (unsigned long) fid->generation, - (unsigned long) rfid.mds, (unsigned long) rfid.id, - (unsigned long) rfid.generation); - rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, namelen, - valid, ea_size, request); + namelen, filename, (unsigned long)fid->mds, + (unsigned long)fid->id, (unsigned long)fid->generation, + (unsigned long)rfid.mds, (unsigned long)rfid.id, + (unsigned long)rfid.generation); + + rc = md_getattr_name(lmv->tgts[rfid.mds].ltd_exp, &rfid, filename, + namelen, valid, ea_size, request); if (rc == 0) { - /* this could be cross-node reference. in this case all - * we have right now is mds/ino/generation triple. we'd - * like to find other attributes */ + /* this could be cross-node reference. in this case all we have + * right now is mds/ino/generation triple. we'd like to find + * other attributes */ body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body)); LASSERT(body != NULL); if (body->valid & OBD_MD_MDS) { @@ -900,8 +973,8 @@ repeat: *request = req; } } else if (rc == -ERESTART) { - /* directory got splitted. time to update local object - * and repeat the request with proper MDS */ + /* directory got splitted. time to update local object and + * repeat the request with proper MDS */ rc = lmv_get_mea_and_update_object(exp, &rfid); if (rc == 0) { ptlrpc_req_finished(*request); @@ -913,8 +986,8 @@ repeat: /* - * llite passes fid of an target inode in data->fid1 and - * fid of directory in data->fid2 + * llite passes fid of an target inode in data->fid1 and fid of directory in + * data->fid2 */ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, struct ptlrpc_request **request) @@ -924,29 +997,36 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, struct lmv_obj *obj; int rc; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); + if (data->namelen != 0) { /* usual link request */ - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - rc = raw_name2idx(obj->objcount, data->name, - data->namelen); + rc = raw_name2idx(obj->hashtype, obj->objcount, data->name, + data->namelen); data->fid1 = obj->objs[rc].fid; lmv_put_obj(obj); } - CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n", - (unsigned) data->fid2.mds, (unsigned) data->fid2.id, - (unsigned) data->fid2.generation, data->namelen, - data->name, (unsigned) data->fid1.mds, - (unsigned) data->fid1.id, - (unsigned) data->fid1.generation, data->fid1.mds); + + CDEBUG(D_OTHER,"link %lu/%lu/%lu:%*s to %lu/%lu/%lu mds %lu\n", + (unsigned long)data->fid2.mds, + (unsigned long)data->fid2.id, + (unsigned long)data->fid2.generation, + data->namelen, data->name, + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, + (unsigned long)data->fid1.mds); } else { /* request from MDS to acquire i_links for inode by fid1 */ - CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n", - (unsigned) data->fid1.mds, (unsigned) data->fid1.id, - (unsigned) data->fid1.generation); + CDEBUG(D_OTHER, "inc i_nlinks for %lu/%lu/%lu\n", + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation); } rc = md_link(lmv->tgts[data->fid1.mds].ltd_exp, data, request); @@ -964,66 +1044,67 @@ int lmv_rename(struct obd_export *exp, struct mdc_op_data *data, ENTRY; CDEBUG(D_OTHER, "rename %*s in %lu/%lu/%lu to %*s in %lu/%lu/%lu\n", - oldlen, old, (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation, + oldlen, old, (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, newlen, new, (unsigned long) data->fid2.mds, (unsigned long) data->fid2.id, (unsigned long) data->fid2.generation); + if (!fid_equal(&data->fid1, &data->fid2)) CWARN("cross-node rename %lu/%lu/%lu:%*s to %lu/%lu/%lu:%*s\n", - (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation, oldlen, old, - (unsigned long) data->fid2.mds, - (unsigned long) data->fid2.id, - (unsigned long) data->fid2.generation, newlen, new); + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, oldlen, old, + (unsigned long)data->fid2.mds, + (unsigned long)data->fid2.id, + (unsigned long)data->fid2.generation, newlen, new); rc = lmv_check_connect(obd); if (rc) RETURN(rc); if (oldlen == 0) { - /* MDS with old dir entry is asking another MDS - * to create name there */ + /* MDS with old dir entry is asking another MDS to create name + * there */ CDEBUG(D_OTHER, "create %*s(%d/%d) in %lu/%lu/%lu pointing to %lu/%lu/%lu\n", newlen, new, oldlen, newlen, - (unsigned long) data->fid2.mds, - (unsigned long) data->fid2.id, - (unsigned long) data->fid2.generation, - (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation); + (unsigned long)data->fid2.mds, + (unsigned long)data->fid2.id, + (unsigned long)data->fid2.generation, + (unsigned long)data->fid1.mds, + (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation); mds = data->fid2.mds; goto request; } - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - /* directory is already splitted, so we have to forward - * request to the right MDS */ - mds = raw_name2idx(obj->objcount, (char *)old, oldlen); + /* directory is already splitted, so we have to forward request + * to the right MDS */ + mds = raw_name2idx(obj->hashtype, obj->objcount, (char *)old, oldlen); data->fid1 = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds, - (unsigned long) obj->objs[mds].fid.mds, - (unsigned long) obj->objs[mds].fid.id, - (unsigned long) obj->objs[mds].fid.generation); + (unsigned long)obj->objs[mds].fid.mds, + (unsigned long)obj->objs[mds].fid.id, + (unsigned long)obj->objs[mds].fid.generation); + lmv_put_obj(obj); } - lmv_put_obj(obj); - obj = lmv_grab_obj(obd, &data->fid2, 0); + obj = lmv_grab_obj(obd, &data->fid2); if (obj) { - /* directory is already splitted, so we have to forward - * request to the right MDS */ - mds = raw_name2idx(obj->objcount, (char *)new, newlen); + /* directory is already splitted, so we have to forward request + * to the right MDS */ + mds = raw_name2idx(obj->hashtype, obj->objcount, (char *)new, newlen); data->fid2 = obj->objs[mds].fid; CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds, - (unsigned long) obj->objs[mds].fid.mds, - (unsigned long) obj->objs[mds].fid.id, - (unsigned long) obj->objs[mds].fid.generation); + (unsigned long)obj->objs[mds].fid.mds, + (unsigned long)obj->objs[mds].fid.id, + (unsigned long)obj->objs[mds].fid.generation); + lmv_put_obj(obj); } - lmv_put_obj(obj); mds = data->fid1.mds; @@ -1034,51 +1115,56 @@ request: } int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data, - struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len, - struct ptlrpc_request **request) + struct iattr *iattr, void *ea, int ealen, void *ea2, + int ea2len, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - int rc = 0, i = data->fid1.mds; struct ptlrpc_request *req; - struct mds_body *mds_body; + struct mds_body *body; struct lmv_obj *obj; + int rc = 0, i; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - obj = lmv_grab_obj(obd, &data->fid1, 0); + obj = lmv_grab_obj(obd, &data->fid1); + CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n", - (unsigned long) data->fid1.mds, - (unsigned long) data->fid1.id, - (unsigned long) data->fid1.generation, iattr->ia_valid, + (unsigned long)data->fid1.mds, (unsigned long)data->fid1.id, + (unsigned long)data->fid1.generation, iattr->ia_valid, obj ? ", splitted" : ""); + if (obj) { for (i = 0; i < obj->objcount; i++) { data->fid1 = obj->objs[i].fid; - rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, - ealen, ea2, ea2len, &req); - LASSERT(rc == 0); + + rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data, + iattr, ea, ealen, ea2, ea2len, &req); + if (fid_equal(&obj->fid, &obj->objs[i].fid)) { - /* this is master object and this request - * should be returned back to llite */ + /* this is master object and this request should + * be returned back to llite */ *request = req; } else { ptlrpc_req_finished(req); } + + if (rc) + break; } lmv_put_obj(obj); } else { LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count); - rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, ealen, - ea2, ea2len, request); + rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data, + iattr, ea, ealen, ea2, ea2len, request); if (rc == 0) { - mds_body = lustre_msg_buf((*request)->rq_repmsg, 0, - sizeof(*mds_body)); - LASSERT(mds_body != NULL); - LASSERT(mds_body->mds == i); + body = lustre_msg_buf((*request)->rq_repmsg, 0, + sizeof(*body)); + LASSERT(body != NULL); + LASSERT(body->mds == data->fid1.mds); } } RETURN(rc); @@ -1096,12 +1182,12 @@ int lmv_sync(struct obd_export *exp, struct ll_fid *fid, if (rc) RETURN(rc); - rc = md_sync(lmv->tgts[0].ltd_exp, fid, request); + rc = md_sync(lmv->tgts[fid->mds].ltd_exp, fid, request); RETURN(rc); } -int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, void *data, int flag) +int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) { struct lustre_handle lockh; struct lmv_obj *obj; @@ -1120,17 +1206,15 @@ int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, case LDLM_CB_CANCELING: /* time to drop cached attrs for dirobj */ obj = lock->l_ast_data; - if (!obj) - break; - - CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n", - lock->l_resource->lr_name.name[3] == 1 ? - "LOOKUP" : "UPDATE", - (unsigned long) lock->l_resource->lr_name.name[0], - (unsigned long) lock->l_resource->lr_name.name[1], - (unsigned long) obj->fid.mds, - (unsigned long) obj->fid.id, - (unsigned long) obj->fid.generation); + if (obj) { + CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n", + lock->l_resource->lr_name.name[3] == 1 ? "LOOKUP" : "UPDATE", + (unsigned long)lock->l_resource->lr_name.name[0], + (unsigned long)lock->l_resource->lr_name.name[1], + (unsigned long)obj->fid.mds, (unsigned long)obj->fid.id, + (unsigned long)obj->fid.generation); + lmv_put_obj(obj); + } break; default: LBUG(); @@ -1176,32 +1260,34 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, (unsigned long) rfid.id, (unsigned long) rfid.generation); - obj = lmv_grab_obj(obd, mdc_fid, 0); + obj = lmv_grab_obj(obd, mdc_fid); if (obj) { - /* find dirobj containing page with requested offset */ - /* FIXME: what about protecting cached attrs here? */ + lmv_lock_obj(obj); + + /* find dirobj containing page with requested offset. */ for (i = 0; i < obj->objcount; i++) { if (offset < obj->objs[i].size) break; offset -= obj->objs[i].size; } rfid = obj->objs[i].fid; + + lmv_unlock_obj(obj); + lmv_put_obj(obj); + CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n", - (unsigned long) rfid.mds, - (unsigned long) rfid.id, - (unsigned long) rfid.generation, - (unsigned long) offset); + (unsigned long)rfid.mds, (unsigned long)rfid.id, + (unsigned long)rfid.generation, (unsigned long)offset); } - rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, page, request); - if (rc == 0 && !fid_equal(&rfid, mdc_fid)) { - /* this page isn't from master object. to avoid - * ./.. duplication in directory, we have to remove them - * from all slave objects */ + rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, + page, request); + + if (rc == 0 && !fid_equal(&rfid, mdc_fid)) + /* this page isn't from master object. To avoid "." and ".." + * duplication in directory, we have to remove them from all + * slave objects */ lmv_remove_dots(page); - } - - lmv_put_obj(obj); - + RETURN(rc); } @@ -1217,13 +1303,14 @@ int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data, LASSERT(mea != NULL); for (i = 0; i < mea->mea_count; i++) { - if (lmv->tgts[i].ltd_exp == NULL) - continue; - memset(&data2, 0, sizeof(data2)); data2.fid1 = mea->mea_fids[i]; data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR; mds = data2.fid1.mds; + + if (lmv->tgts[mds].ltd_exp == NULL) + continue; + rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req); CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n", (unsigned long) mea->mea_fids[i].mds, @@ -1239,6 +1326,19 @@ int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); } +int lmv_delete_object(struct obd_export *exp, struct ll_fid *fid) +{ + ENTRY; + + if (!lmv_delete_obj(exp, fid)) { + CDEBUG(D_OTHER, "Object %lu/%lu/%lu is not found.\n", + (unsigned long)fid->mds, (unsigned long)fid->id, + (unsigned long)fid->generation); + } + + RETURN(0); +} + int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, struct ptlrpc_request **request) { @@ -1246,6 +1346,7 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, struct lmv_obd *lmv = &obd->u.lmv; int rc, i = 0; ENTRY; + rc = lmv_check_connect(obd); if (rc) RETURN(rc); @@ -1256,9 +1357,10 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, RETURN(rc); } else if (data->namelen != 0) { struct lmv_obj *obj; - obj = lmv_grab_obj(obd, &data->fid1, 0); + + obj = lmv_grab_obj(obd, &data->fid1); if (obj) { - i = raw_name2idx(obj->objcount, data->name, + i = raw_name2idx(obj->hashtype, obj->objcount, data->name, data->namelen); data->fid1 = obj->objs[i].fid; lmv_put_obj(obj); @@ -1289,6 +1391,7 @@ struct obd_device *lmv_get_real_obd(struct obd_export *exp, rc = lmv_check_connect(obd); if (rc) RETURN(ERR_PTR(rc)); +#warning "we need well-desgined readdir() implementation to remove this mess" obd = lmv->tgts[0].ltd_exp->exp_obd; EXIT; return obd; @@ -1315,9 +1418,19 @@ int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize) if (lmv->connected == 0) RETURN(0); - /* FIXME: error handling? */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) { + CWARN("%s: NULL export for %d\n", obd->obd_name, i); + continue; + } + rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize); + if (rc) { + CERROR("obd_init_ea_size() failed on MDT target %d, " + "error %d.\n", i, rc); + break; + } + } RETURN(rc); } @@ -1339,7 +1452,6 @@ int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa, LASSERT(oa->o_mds < lmv->desc.ld_tgt_count); rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa, &obj_mdp, oti); - LASSERT(rc == 0); RETURN(rc); } @@ -1352,9 +1464,10 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mea *mea; int i, c, rc = 0; + struct mea *mea; struct ll_fid mfid; + int lcount; ENTRY; rc = lmv_check_connect(obd); @@ -1370,21 +1483,28 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, if (*ea == NULL) { rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea); - LASSERT(*ea != NULL); + if (rc < 0) { + CERROR("obd_alloc_diskmd() failed, error %d\n", + rc); + RETURN(rc); + } + + if (*ea == NULL) + RETURN(-EINVAL); } - mea = (struct mea *)*ea; + rc = 0; mfid.id = oa->o_id; mfid.generation = oa->o_generation; - rc = 0; + + mea = (struct mea *)*ea; if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count) mea->mea_count = lmv->desc.ld_tgt_count; + mea->mea_magic = MEA_MAGIC_ALL_CHARS; mea->mea_master = -1; - - /* FIXME: error handling? */ - for (i = 0, c = 0; c < mea->mea_count && - i < lmv->desc.ld_tgt_count; i++) { + lcount = lmv->desc.ld_tgt_count; + for (i = 0, c = 0; c < mea->mea_count && i < lcount; i++) { struct lov_stripe_md obj_md; struct lov_stripe_md *obj_mdp = &obj_md; @@ -1398,17 +1518,20 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, continue; } - /* "Master" MDS should always be part of stripped dir, so - scan for it */ + /* "master" MDS should always be part of stripped dir, so scan + for it. */ if (mea->mea_master == -1 && c == mea->mea_count - 1) continue; oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE - | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID; + | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID; rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti); - /* FIXME: error handling here */ - LASSERT(rc == 0); + if (rc) { + CERROR("obd_create() failed on MDT target %d, " + "error %d\n", c, rc); + RETURN(rc); + } mea->mea_fids[c].id = oa->o_id; mea->mea_fids[c].generation = oa->o_generation; @@ -1479,22 +1602,7 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen, } lmv = &obd->u.lmv; - if (keylen >= strlen("client") && strcmp(key, "client") == 0) { - struct lmv_tgt_desc *tgts; - int i, rc; - - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); - - for (i = 0, tgts = lmv->tgts; - i < lmv->desc.ld_tgt_count; i++, tgts++) { - rc = obd_set_info(tgts->ltd_exp, keylen, key, vallen, val); - if (rc) - RETURN(rc); - } - RETURN(0); - } else if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) { + if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) { lmv->server_timeout = 1; lmv_set_timeouts(obd); RETURN(0); @@ -1522,9 +1630,9 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, RETURN(0); } - if (!*lmmp) { + if (*lmmp == NULL) { OBD_ALLOC(*lmmp, mea_size); - if (!*lmmp) + if (*lmmp == NULL) RETURN(-ENOMEM); } @@ -1537,7 +1645,7 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, } int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt, - struct lov_mds_md *disk_src, int mdsize) + struct lov_mds_md *disk_src, int mdsize) { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; @@ -1559,8 +1667,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt, LASSERT(mea_size == mdsize); OBD_ALLOC(*tmea, mea_size); - /* FIXME: error handling here */ - LASSERT(*tmea != NULL); + if (*tmea == NULL) + RETURN(-ENOMEM); if (!disk_src) RETURN(mea_size); @@ -1571,8 +1679,8 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt, } int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *ea, obd_count oa_bufs, - struct brw_page *pgarr, struct obd_trans_info *oti) + struct lov_stripe_md *ea, obd_count oa_bufs, + struct brw_page *pgarr, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1631,6 +1739,7 @@ struct md_ops lmv_md_ops = { .m_unlink = lmv_unlink, .m_get_real_obd = lmv_get_real_obd, .m_valid_attrs = lmv_valid_attrs, + .m_delete_object = lmv_delete_object, }; int __init lmv_init(void)