X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=bd74795355f3015da6e973741491f6e1a7248ec6;hp=afc287626700f2af086805aff1a80e7175123b49;hb=57b9e88690b51cd8ccd9b2fd8a09f533ca09a7bd;hpb=98c7560d5141fe5da38e5fd622935439cea6a283 diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index afc2876..bd74795 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -96,7 +96,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n", i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie); - if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0) + if (obd_uuid_equals(uuid, &tgt->uuid)) break; } @@ -552,27 +552,27 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf) struct lmv_obd *lmv = &obd->u.lmv; ENTRY; - if (lcfg->lcfg_inllen1 < 1) { + if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) { CERROR("LMV setup requires a descriptor\n"); RETURN(-EINVAL); } - if (lcfg->lcfg_inllen2 < 1) { + if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) { CERROR("LMV setup requires an MDT UUID list\n"); RETURN(-EINVAL); } - desc = (struct lmv_desc *)lcfg->lcfg_inlbuf1; - if (sizeof(*desc) > lcfg->lcfg_inllen1) { + desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1); + if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) { CERROR("descriptor size wrong: %d > %d\n", - (int)sizeof(*desc), lcfg->lcfg_inllen1); + (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1)); RETURN(-EINVAL); } - uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2; - if (sizeof(*uuids) * desc->ld_tgt_count != lcfg->lcfg_inllen2) { + uuids = (struct obd_uuid *)lustre_cfg_buf(lcfg, 2); + if (sizeof(*uuids) * desc->ld_tgt_count != LUSTRE_CFG_BUFLEN(lcfg, 2)) { CERROR("UUID array size wrong: %u * %u != %u\n", - sizeof(*uuids), desc->ld_tgt_count, lcfg->lcfg_inllen2); + sizeof(*uuids), desc->ld_tgt_count, LUSTRE_CFG_BUFLEN(lcfg, 2)); RETURN(-EINVAL); } @@ -1030,6 +1030,53 @@ cleanup: return rc; } +int lmv_enqueue_remote(struct obd_export *exp, int lock_type, + struct lookup_intent *it, int lock_mode, + struct mdc_op_data *data, struct lustre_handle *lockh, + void *lmm, int lmmsize, ldlm_completion_callback cb_compl, + ldlm_blocking_callback cb_blocking, void *cb_data) +{ + struct ptlrpc_request *req = LUSTRE_IT(it)->it_data; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lustre_handle plock; + struct mdc_op_data rdata; + struct mds_body *body = NULL; + int rc = 0, pmode; + ENTRY; + + body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body)); + LASSERT(body != NULL); + + if (!(body->valid & OBD_MD_MDS)) + RETURN(0); + + CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4" -> "DLID4"\n", + LL_IT2STR(it), OLID4(&data->id1), OLID4(&body->id1)); + + /* we got LOOKUP lock, but we really need attrs */ + pmode = LUSTRE_IT(it)->it_lock_mode; + LASSERT(pmode != 0); + memcpy(&plock, lockh, sizeof(plock)); + LUSTRE_IT(it)->it_lock_mode = 0; + LUSTRE_IT(it)->it_data = NULL; + LASSERT((body->valid & OBD_MD_FID) != 0); + + memcpy(&rdata, data, sizeof(rdata)); + rdata.id1 = body->id1; + rdata.name = NULL; + rdata.namelen = 0; + + LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE; + ptlrpc_req_finished(req); + + rc = md_enqueue(lmv->tgts[id_group(&rdata.id1)].ltd_exp, + lock_type, it, lock_mode, &rdata, lockh, lmm, + lmmsize, cb_compl, cb_blocking, cb_data); + ldlm_lock_decref(&plock, pmode); + RETURN(rc); +} + int lmv_enqueue(struct obd_export *exp, int lock_type, struct lookup_intent *it, int lock_mode, struct mdc_op_data *data, struct lustre_handle *lockh, @@ -1070,6 +1117,10 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, rc = md_enqueue(lmv->tgts[id_group(&data->id1)].ltd_exp, lock_type, it, lock_mode, data, lockh, lmm, lmmsize, cb_compl, cb_blocking, cb_data); + if (rc == 0 && it->it_op == IT_OPEN) + rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode, + data, lockh, lmm, lmmsize, + cb_compl, cb_blocking, cb_data); RETURN(rc); } @@ -1102,8 +1153,9 @@ repeat: CDEBUG(D_OTHER, "getattr_lock for %*s on "DLID4" -> "DLID4"\n", namelen, filename, OLID4(id), OLID4(&rid)); - rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, &rid, - filename, namelen, (valid | OBD_MD_FID), + rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, + &rid, filename, namelen, + valid == OBD_MD_FLID ? valid : valid | OBD_MD_FID, ea_size, request); if (rc == 0) { /* @@ -1113,7 +1165,8 @@ repeat: */ body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body)); LASSERT(body != NULL); - LASSERT((body->valid & OBD_MD_FID) != 0); + LASSERT((body->valid & OBD_MD_FID) != 0 + || body->valid == OBD_MD_FLID); if (body->valid & OBD_MD_MDS) { struct ptlrpc_request *req = NULL; @@ -1148,7 +1201,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_obj *obj; - int rc; + int rc, mds; ENTRY; rc = lmv_check_connect(obd); @@ -1157,25 +1210,31 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, if (data->namelen != 0) { /* usual link request */ - obj = lmv_grab_obj(obd, &data->id1); + obj = lmv_grab_obj(obd, &data->id2); if (obj) { rc = raw_name2idx(obj->hashtype, obj->objcount, data->name, data->namelen); - data->id1 = obj->objs[rc].id; + data->id2 = obj->objs[rc].id; lmv_put_obj(obj); } + + mds = id_group(&data->id2); CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n", OLID4(&data->id2), data->namelen, data->name, OLID4(&data->id1)); } else { + mds = id_group(&data->id1); + /* request from MDS to acquire i_links for inode by id1 */ CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n", OLID4(&data->id1)); } - - rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp, - data, request); + + CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", + mds, OLID4(&data->id1)); + rc = md_link(lmv->tgts[mds].ltd_exp, data, request); + RETURN(rc); } @@ -1595,6 +1654,7 @@ int lmv_init_ea_size(struct obd_export *exp, int easize, } int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa, + void *acl, int acl_size, struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; @@ -1607,8 +1667,8 @@ int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa, LASSERT(ea == NULL); LASSERT(oa->o_mds < lmv->desc.ld_tgt_count); - rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, - oa, &obj_mdp, oti); + rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa, + acl, acl_size, &obj_mdp, oti); RETURN(rc); } @@ -1628,6 +1688,7 @@ int lmv_getready(struct obd_export *exp) * values for "master" object, as it will be used. */ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, + void *acl, int acl_size, struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; @@ -1644,12 +1705,15 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, LASSERT(oa != NULL); if (ea == NULL) { - rc = lmv_obd_create_single(exp, oa, NULL, oti); + rc = lmv_obd_create_single(exp, oa, acl, acl_size, NULL, oti); if (rc) CERROR("Can't create object, rc = %d\n", rc); RETURN(rc); } + /* acl is only suppied when mds create single remote obj */ + LASSERT(acl == NULL && acl_size == 0); + if (*ea == NULL) { rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea); if (rc < 0) { @@ -1702,7 +1766,8 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID; - rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti); + rc = obd_create(lmv->tgts[c].ltd_exp, oa, NULL, 0, + &obj_mdp, oti); if (rc) { CERROR("obd_create() failed on MDT target %d, " "error %d\n", c, rc); @@ -1839,8 +1904,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen, int lmv_set_info(struct obd_export *exp, obd_count keylen, void *key, obd_count vallen, void *val) { - struct obd_device *obd; - struct lmv_obd *lmv; + struct lmv_tgt_desc *tgt; + struct obd_device *obd; + struct lmv_obd *lmv; ENTRY; obd = class_exp2obd(exp); @@ -1860,7 +1926,6 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen, /* maybe this could be default */ if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) || (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) { - struct lmv_tgt_desc *tgt; struct obd_export *exp; int rc = 0, err, i; @@ -1896,6 +1961,23 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen, RETURN(rc); } + if ((keylen == strlen("flush_cred") && + strcmp(key, "flush_cred") == 0)) { + int rc = 0, i; + + for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; + i++, tgt++) { + if (!tgt->ltd_exp) + continue; + rc = obd_set_info(tgt->ltd_exp, + keylen, key, vallen, val); + if (rc) + RETURN(rc); + } + + RETURN(0); + } + RETURN(-EINVAL); } @@ -2035,6 +2117,29 @@ int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa, RETURN(err); } +static int lmv_cancel_unused(struct obd_export *exp, + struct lov_stripe_md *lsm, + int flags, void *opaque) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int rc = 0, err, i; + ENTRY; + + LASSERT(lsm == NULL); + + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active) + continue; + + err = obd_cancel_unused(lmv->tgts[i].ltd_exp, + NULL, flags, opaque); + if (!rc) + rc = err; + } + RETURN(rc); +} + struct obd_ops lmv_obd_ops = { .o_owner = THIS_MODULE, .o_attach = lmv_attach, @@ -2056,6 +2161,7 @@ struct obd_ops lmv_obd_ops = { .o_notify = lmv_notify, .o_iocontrol = lmv_iocontrol, .o_getready = lmv_getready, + .o_cancel_unused = lmv_cancel_unused, }; struct md_ops lmv_md_ops = {