X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=8af14da99e554b6d3b9ee4bd3082dbb973ce7514;hp=f10ca5a0465fd2704656224eec9a240041f1c17a;hb=c1d0a355a6;hpb=478f97b212714bc3af9a9a104efab314ca942758 diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index f10ca5a..8af14da 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -63,11 +63,12 @@ static int lmv_check_connect(struct obd_device *obd); void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, int activate) { - if (tgt->ltd_active == activate) - return; + if (tgt->ltd_active == activate) + return; - tgt->ltd_active = activate; - lmv->desc.ld_active_tgt_count += (activate ? 1 : -1); + tgt->ltd_active = activate; + lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count += + (activate ? 1 : -1); tgt->ltd_exp->exp_obd->obd_inactive = !activate; } @@ -83,50 +84,47 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, const struct obd_uuid *uuid, int activate) { - struct lmv_tgt_desc *tgt = NULL; - struct obd_device *obd; - __u32 i; - int rc = 0; + struct lu_tgt_desc *tgt = NULL; + struct obd_device *obd; + int rc = 0; + ENTRY; CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n", lmv, uuid->uuid, activate); spin_lock(&lmv->lmv_lock); - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - tgt = lmv->tgts[i]; - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; - - CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i, - tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie); + lmv_foreach_connected_tgt(lmv, tgt) { + CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", + tgt->ltd_index, tgt->ltd_uuid.uuid, + tgt->ltd_exp->exp_handle.h_cookie); if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) break; } - if (i == lmv->desc.ld_tgt_count) - GOTO(out_lmv_lock, rc = -EINVAL); + if (!tgt) + GOTO(out_lmv_lock, rc = -EINVAL); - obd = class_exp2obd(tgt->ltd_exp); - if (obd == NULL) - GOTO(out_lmv_lock, rc = -ENOTCONN); + obd = 
class_exp2obd(tgt->ltd_exp); + if (obd == NULL) + GOTO(out_lmv_lock, rc = -ENOTCONN); - CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n", - obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd, - obd->obd_type->typ_name, i); - LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0); + CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n", + obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd, + obd->obd_type->typ_name, tgt->ltd_index); + LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0); - if (tgt->ltd_active == activate) { - CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd, - activate ? "" : "in"); - GOTO(out_lmv_lock, rc); - } + if (tgt->ltd_active == activate) { + CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd, + activate ? "" : "in"); + GOTO(out_lmv_lock, rc); + } - CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, - activate ? "" : "in"); - lmv_activate_target(lmv, tgt, activate); - EXIT; + CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, + activate ? "" : "in"); + lmv_activate_target(lmv, tgt, activate); + EXIT; out_lmv_lock: spin_unlock(&lmv->lmv_lock); @@ -135,8 +133,8 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *lmv_get_uuid(struct obd_export *exp) { - struct lmv_obd *lmv = &exp->exp_obd->u.lmv; - struct lmv_tgt_desc *tgt = lmv->tgts[0]; + struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); return (tgt == NULL) ? 
NULL : obd_get_uuid(tgt->ltd_exp); } @@ -240,21 +238,22 @@ out_sysfs: static int lmv_init_ea_size(struct obd_export *exp, __u32 easize, __u32 def_easize) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - __u32 i; - int rc = 0; - int change = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int change = 0; + int rc = 0; + ENTRY; - if (lmv->max_easize < easize) { - lmv->max_easize = easize; - change = 1; - } - if (lmv->max_def_easize < def_easize) { - lmv->max_def_easize = def_easize; - change = 1; - } + if (lmv->max_easize < easize) { + lmv->max_easize = easize; + change = 1; + } + if (lmv->max_def_easize < def_easize) { + lmv->max_def_easize = def_easize; + change = 1; + } if (change == 0) RETURN(0); @@ -262,20 +261,14 @@ static int lmv_init_ea_size(struct obd_export *exp, __u32 easize, if (lmv->connected == 0) RETURN(0); - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; - - if (tgt == NULL || tgt->ltd_exp == NULL) { - CWARN("%s: NULL export for %d\n", obd->obd_name, i); - continue; - } + lmv_foreach_connected_tgt(lmv, tgt) { if (!tgt->ltd_active) continue; rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize); if (rc) { CERROR("%s: obd_init_ea_size() failed on MDT target %d:" - " rc = %d\n", obd->obd_name, i, rc); + " rc = %d\n", obd->obd_name, tgt->ltd_index, rc); break; } } @@ -351,11 +344,11 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) tgt->ltd_active = 1; tgt->ltd_exp = mdc_exp; - lmv->desc.ld_active_tgt_count++; + lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count++; md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize); - rc = lqos_add_tgt(&lmv->lmv_qos, tgt); + rc = lu_qos_add_tgt(&lmv->lmv_qos, tgt); if (rc) { obd_disconnect(mdc_exp); RETURN(rc); @@ -375,16 +368,11 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) RETURN(0); } -static void 
lmv_del_target(struct lmv_obd *lmv, int index) +static void lmv_del_target(struct lmv_obd *lmv, struct lu_tgt_desc *tgt) { - if (lmv->tgts[index] == NULL) - return; - - lqos_del_tgt(&lmv->lmv_qos, lmv->tgts[index]); - - OBD_FREE_PTR(lmv->tgts[index]); - lmv->tgts[index] = NULL; - return; + LASSERT(tgt); + ltd_del_tgt(&lmv->lmv_mdt_descs, tgt); + OBD_FREE_PTR(tgt); } static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, @@ -393,7 +381,7 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, struct obd_device *mdc_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; - int orig_tgt_count = 0; + struct lu_tgt_descs *ltd = &lmv->lmv_mdt_descs; int rc = 0; ENTRY; @@ -407,159 +395,98 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, RETURN(-EINVAL); } - mutex_lock(&lmv->lmv_init_mutex); - if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) { - tgt = lmv->tgts[index]; - CERROR("%s: UUID %s already assigned at LMV target index %d:" - " rc = %d\n", obd->obd_name, - obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST); - mutex_unlock(&lmv->lmv_init_mutex); - RETURN(-EEXIST); - } - - if (index >= lmv->tgts_size) { - /* We need to reallocate the lmv target array. 
*/ - struct lmv_tgt_desc **newtgts, **old = NULL; - __u32 newsize = 1; - __u32 oldsize = 0; - - while (newsize < index + 1) - newsize = newsize << 1; - OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize); - if (newtgts == NULL) { - mutex_unlock(&lmv->lmv_init_mutex); - RETURN(-ENOMEM); - } - - if (lmv->tgts_size) { - memcpy(newtgts, lmv->tgts, - sizeof(*newtgts) * lmv->tgts_size); - old = lmv->tgts; - oldsize = lmv->tgts_size; - } - - lmv->tgts = newtgts; - lmv->tgts_size = newsize; - smp_rmb(); - if (old) - OBD_FREE(old, sizeof(*old) * oldsize); - - CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts, - lmv->tgts_size); - } - OBD_ALLOC_PTR(tgt); - if (!tgt) { - mutex_unlock(&lmv->lmv_init_mutex); + if (!tgt) RETURN(-ENOMEM); - } mutex_init(&tgt->ltd_fid_mutex); tgt->ltd_index = index; tgt->ltd_uuid = *uuidp; tgt->ltd_active = 0; - lmv->tgts[index] = tgt; - if (index >= lmv->desc.ld_tgt_count) { - orig_tgt_count = lmv->desc.ld_tgt_count; - lmv->desc.ld_tgt_count = index + 1; - } - if (lmv->connected == 0) { + mutex_lock(&ltd->ltd_mutex); + rc = ltd_add_tgt(ltd, tgt); + mutex_unlock(&ltd->ltd_mutex); + + if (rc) + GOTO(out_tgt, rc); + + if (!lmv->connected) /* lmv_check_connect() will connect this target. 
*/ - mutex_unlock(&lmv->lmv_init_mutex); RETURN(0); - } - /* Otherwise let's connect it ourselves */ - mutex_unlock(&lmv->lmv_init_mutex); rc = lmv_connect_mdc(obd, tgt); - if (rc != 0) { - spin_lock(&lmv->lmv_lock); - if (lmv->desc.ld_tgt_count == index + 1) - lmv->desc.ld_tgt_count = orig_tgt_count; - memset(tgt, 0, sizeof(*tgt)); - spin_unlock(&lmv->lmv_lock); - } else { + if (!rc) { int easize = sizeof(struct lmv_stripe_md) + - lmv->desc.ld_tgt_count * sizeof(struct lu_fid); + lmv->lmv_mdt_count * sizeof(struct lu_fid); + lmv_init_ea_size(obd->obd_self_export, easize, 0); } RETURN(rc); + +out_tgt: + OBD_FREE_PTR(tgt); + return rc; } static int lmv_check_connect(struct obd_device *obd) { - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - __u32 i; - int rc; - int easize; - ENTRY; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int easize; + int rc; - if (lmv->connected) - RETURN(0); + ENTRY; - mutex_lock(&lmv->lmv_init_mutex); - if (lmv->connected) { - mutex_unlock(&lmv->lmv_init_mutex); - RETURN(0); - } + if (lmv->connected) + RETURN(0); - if (lmv->desc.ld_tgt_count == 0) { - mutex_unlock(&lmv->lmv_init_mutex); - CERROR("%s: no targets configured.\n", obd->obd_name); - RETURN(-EINVAL); - } + mutex_lock(&lmv->lmv_mdt_descs.ltd_mutex); + if (lmv->connected) + GOTO(unlock, rc = 0); - LASSERT(lmv->tgts != NULL); + if (!lmv->lmv_mdt_count) { + CERROR("%s: no targets configured: rc = -EINVAL\n", + obd->obd_name); + GOTO(unlock, rc = -EINVAL); + } - if (lmv->tgts[0] == NULL) { - mutex_unlock(&lmv->lmv_init_mutex); - CERROR("%s: no target configured for index 0.\n", + if (!lmv_mdt0_inited(lmv)) { + CERROR("%s: no target configured for index 0: rc = -EINVAL.\n", obd->obd_name); - RETURN(-EINVAL); + GOTO(unlock, rc = -EINVAL); } CDEBUG(D_CONFIG, "Time to connect %s to %s\n", obd->obd_uuid.uuid, obd->obd_name); - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - tgt = lmv->tgts[i]; - if (tgt == NULL) - continue; + lmv_foreach_tgt(lmv, tgt) 
{ rc = lmv_connect_mdc(obd, tgt); if (rc) GOTO(out_disc, rc); } lmv->connected = 1; - easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC); + easize = lmv_mds_md_size(lmv->lmv_mdt_count, LMV_MAGIC); lmv_init_ea_size(obd->obd_self_export, easize, 0); - mutex_unlock(&lmv->lmv_init_mutex); - RETURN(0); + EXIT; +unlock: + mutex_unlock(&lmv->lmv_mdt_descs.ltd_mutex); + + return rc; - out_disc: - while (i-- > 0) { - int rc2; - tgt = lmv->tgts[i]; - if (tgt == NULL) +out_disc: + lmv_foreach_tgt(lmv, tgt) { + tgt->ltd_active = 0; + if (!tgt->ltd_exp) continue; - tgt->ltd_active = 0; - if (tgt->ltd_exp) { - --lmv->desc.ld_active_tgt_count; - rc2 = obd_disconnect(tgt->ltd_exp); - if (rc2) { - CERROR("LMV target %s disconnect on " - "MDC idx %d: error %d\n", - tgt->ltd_uuid.uuid, i, rc2); - } - } - } - mutex_unlock(&lmv->lmv_init_mutex); + --lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count; + obd_disconnect(tgt->ltd_exp); + } - RETURN(rc); + goto unlock; } static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) @@ -608,33 +535,22 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) static int lmv_disconnect(struct obd_export *exp) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - int rc; - __u32 i; - ENTRY; - - if (!lmv->tgts) - goto out_local; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL) - continue; + ENTRY; - lmv_disconnect_mdc(obd, lmv->tgts[i]); - } + lmv_foreach_connected_tgt(lmv, tgt) + lmv_disconnect_mdc(obd, tgt); if (lmv->lmv_tgts_kobj) kobject_put(lmv->lmv_tgts_kobj); -out_local: - /* - * This is the case when no real connection is established by - * lmv_check_connect(). 
- */ - if (!lmv->connected) - class_export_put(exp); - rc = class_disconnect(exp); + if (!lmv->connected) + class_export_put(exp); + rc = class_disconnect(exp); lmv->connected = 0; RETURN(rc); @@ -643,17 +559,17 @@ out_local: static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obddev = class_exp2obd(exp); - struct lmv_obd *lmv = &obddev->u.lmv; + struct obd_device *obddev = class_exp2obd(exp); + struct lmv_obd *lmv = &obddev->u.lmv; struct getinfo_fid2path *gf; - struct lmv_tgt_desc *tgt; + struct lmv_tgt_desc *tgt; struct getinfo_fid2path *remote_gf = NULL; - struct lu_fid root_fid; - int remote_gf_size = 0; - int rc; + struct lu_fid root_fid; + int remote_gf_size = 0; + int rc; gf = karg; - tgt = lmv_find_target(lmv, &gf->gf_fid); + tgt = lmv_fid2tgt(lmv, &gf->gf_fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -711,7 +627,7 @@ repeat_fid2path: GOTO(out_fid2path, rc = -EINVAL); } - tgt = lmv_find_target(lmv, &gf->gf_fid); + tgt = lmv_fid2tgt(lmv, &gf->gf_fid); if (IS_ERR(tgt)) GOTO(out_fid2path, rc = -EINVAL); @@ -733,13 +649,13 @@ static int lmv_hsm_req_count(struct lmv_obd *lmv, const struct hsm_user_request *hur, const struct lmv_tgt_desc *tgt_mds) { - __u32 i; - int nr = 0; - struct lmv_tgt_desc *curr_tgt; + struct lmv_tgt_desc *curr_tgt; + __u32 i; + int nr = 0; /* count how many requests must be sent to the given target */ for (i = 0; i < hur->hur_request.hr_itemcount; i++) { - curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid); + curr_tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[i].hui_fid); if (IS_ERR(curr_tgt)) RETURN(PTR_ERR(curr_tgt)); if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) @@ -753,15 +669,14 @@ static int lmv_hsm_req_build(struct lmv_obd *lmv, const struct lmv_tgt_desc *tgt_mds, struct hsm_user_request *hur_out) { - __u32 i, nr_out; - struct lmv_tgt_desc *curr_tgt; + __u32 i, nr_out; + struct lmv_tgt_desc *curr_tgt; /* build the hsm_user_request for the given 
target */ hur_out->hur_request = hur_in->hur_request; nr_out = 0; for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) { - curr_tgt = lmv_find_target(lmv, - &hur_in->hur_user_item[i].hui_fid); + curr_tgt = lmv_fid2tgt(lmv, &hur_in->hur_user_item[i].hui_fid); if (IS_ERR(curr_tgt)) RETURN(PTR_ERR(curr_tgt)); if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) { @@ -782,20 +697,16 @@ static int lmv_hsm_ct_unregister(struct obd_device *obd, unsigned int cmd, void __user *uarg) { struct lmv_obd *lmv = &obd->u.lmv; - __u32 i; - int rc; + struct lu_tgt_desc *tgt; + int rc; + ENTRY; /* unregister request (call from llapi_hsm_copytool_fini) */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; - - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; + lmv_foreach_connected_tgt(lmv, tgt) /* best effort: try to clean as much as possible * (continue on error) */ obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg); - } /* Whatever the result, remove copytool from kuc groups. * Unreached coordinators will get EPIPE on next requests @@ -812,12 +723,14 @@ static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd, { struct lmv_obd *lmv = &obd->u.lmv; struct file *filp; - __u32 i, j; - int err; bool any_set = false; struct kkuc_ct_data *kcd; size_t kcd_size; + struct lu_tgt_desc *tgt; + __u32 i; + int err; int rc = 0; + ENTRY; filp = fget(lk->lk_wfd); @@ -853,12 +766,7 @@ static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd, /* All or nothing: try to register to all MDS. * In case of failure, unregister from previous MDS, * except if it because of inactive target. 
*/ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; - - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; - + lmv_foreach_connected_tgt(lmv, tgt) { err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg); if (err) { if (tgt->ltd_active) { @@ -866,14 +774,16 @@ static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd, CERROR("%s: iocontrol MDC %s on MDT" " idx %d cmd %x: err = %d\n", lmv2obd_dev(lmv)->obd_name, - tgt->ltd_uuid.uuid, i, cmd, err); + tgt->ltd_uuid.uuid, tgt->ltd_index, cmd, + err); rc = err; lk->lk_flags |= LK_FLG_STOP; + i = tgt->ltd_index; /* unregister from previous MDS */ - for (j = 0; j < i; j++) { - tgt = lmv->tgts[j]; - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; + lmv_foreach_connected_tgt(lmv, tgt) { + if (tgt->ltd_index >= i) + break; + obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg); } @@ -901,37 +811,35 @@ err_fput: return rc; } - - - static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obddev = class_exp2obd(exp); - struct lmv_obd *lmv = &obddev->u.lmv; - struct lmv_tgt_desc *tgt = NULL; - __u32 i = 0; - int rc = 0; - int set = 0; - __u32 count = lmv->desc.ld_tgt_count; + struct obd_device *obddev = class_exp2obd(exp); + struct lmv_obd *lmv = &obddev->u.lmv; + struct lu_tgt_desc *tgt = NULL; + int set = 0; + __u32 count = lmv->lmv_mdt_count; + int rc = 0; + ENTRY; - if (count == 0) - RETURN(-ENOTTY); + if (count == 0) + RETURN(-ENOTTY); - switch (cmd) { - case IOC_OBD_STATFS: { - struct obd_ioctl_data *data = karg; - struct obd_device *mdc_obd; - struct obd_statfs stat_buf = {0}; - __u32 index; + switch (cmd) { + case IOC_OBD_STATFS: { + struct obd_ioctl_data *data = karg; + struct obd_device *mdc_obd; + struct obd_statfs stat_buf = {0}; + __u32 index; - memcpy(&index, data->ioc_inlbuf2, sizeof(__u32)); - if ((index >= count)) - RETURN(-ENODEV); + memcpy(&index, data->ioc_inlbuf2, sizeof(__u32)); 
- tgt = lmv->tgts[index]; - if (tgt == NULL || !tgt->ltd_active) + if (index >= lmv->lmv_mdt_descs.ltd_tgts_size) + RETURN(-ENODEV); + + tgt = lmv_tgt(lmv, index); + if (!tgt || !tgt->ltd_active) RETURN(-ENODATA); mdc_obd = class_exp2obd(tgt->ltd_exp); @@ -954,59 +862,50 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, (int) sizeof(stat_buf)))) RETURN(-EFAULT); break; - } - case OBD_IOC_QUOTACTL: { - struct if_quotactl *qctl = karg; - struct obd_quotactl *oqctl; + } + case OBD_IOC_QUOTACTL: { + struct if_quotactl *qctl = karg; + struct obd_quotactl *oqctl; if (qctl->qc_valid == QC_MDTIDX) { - if (count <= qctl->qc_idx) - RETURN(-EINVAL); - - tgt = lmv->tgts[qctl->qc_idx]; - if (tgt == NULL || tgt->ltd_exp == NULL) - RETURN(-EINVAL); + tgt = lmv_tgt(lmv, qctl->qc_idx); } else if (qctl->qc_valid == QC_UUID) { - for (i = 0; i < count; i++) { - tgt = lmv->tgts[i]; - if (tgt == NULL) - continue; + lmv_foreach_tgt(lmv, tgt) { if (!obd_uuid_equals(&tgt->ltd_uuid, &qctl->obd_uuid)) continue; - if (tgt->ltd_exp == NULL) - RETURN(-EINVAL); + if (!tgt->ltd_exp) + RETURN(-EINVAL); - break; - } - } else { - RETURN(-EINVAL); - } + break; + } + } else { + RETURN(-EINVAL); + } - if (i >= count) - RETURN(-EAGAIN); + if (!tgt || !tgt->ltd_exp) + RETURN(-EINVAL); - LASSERT(tgt != NULL && tgt->ltd_exp != NULL); - OBD_ALLOC_PTR(oqctl); - if (!oqctl) - RETURN(-ENOMEM); + OBD_ALLOC_PTR(oqctl); + if (!oqctl) + RETURN(-ENOMEM); - QCTL_COPY(oqctl, qctl); - rc = obd_quotactl(tgt->ltd_exp, oqctl); - if (rc == 0) { - QCTL_COPY(qctl, oqctl); - qctl->qc_valid = QC_MDTIDX; - qctl->obd_uuid = tgt->ltd_uuid; - } - OBD_FREE_PTR(oqctl); - break; - } + QCTL_COPY(oqctl, qctl); + rc = obd_quotactl(tgt->ltd_exp, oqctl); + if (rc == 0) { + QCTL_COPY(qctl, oqctl); + qctl->qc_valid = QC_MDTIDX; + qctl->obd_uuid = tgt->ltd_uuid; + } + OBD_FREE_PTR(oqctl); + break; + } case LL_IOC_GET_CONNECT_FLAGS: { - tgt = lmv->tgts[0]; - if (tgt == NULL || tgt->ltd_exp == NULL) - RETURN(-ENODATA); - 
rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); + tgt = lmv_tgt(lmv, 0); + rc = -ENODATA; + if (tgt && tgt->ltd_exp) + rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); break; } case LL_IOC_FID2MDTIDX: { @@ -1029,9 +928,9 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, case LL_IOC_HSM_STATE_GET: case LL_IOC_HSM_STATE_SET: case LL_IOC_HSM_ACTION: { - struct md_op_data *op_data = karg; + struct md_op_data *op_data = karg; - tgt = lmv_find_target(lmv, &op_data->op_fid1); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -1044,7 +943,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, case LL_IOC_HSM_PROGRESS: { const struct hsm_progress_kernel *hpk = karg; - tgt = lmv_find_target(lmv, &hpk->hpk_fid); + tgt = lmv_fid2tgt(lmv, &hpk->hpk_fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); @@ -1061,22 +960,17 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, * or if there is a single MDS, no need to split * the request. 
*/ if (reqcount == 1 || count == 1) { - tgt = lmv_find_target(lmv, - &hur->hur_user_item[0].hui_fid); + tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[0].hui_fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); } else { /* split fid list to their respective MDS */ - for (i = 0; i < count; i++) { + lmv_foreach_connected_tgt(lmv, tgt) { int nr, rc1; size_t reqlen; struct hsm_user_request *req; - tgt = lmv->tgts[i]; - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; - nr = lmv_hsm_req_count(lmv, hur, tgt); if (nr < 0) RETURN(nr); @@ -1104,14 +998,14 @@ hsm_req_err: break; } case LL_IOC_LOV_SWAP_LAYOUTS: { - struct md_op_data *op_data = karg; - struct lmv_tgt_desc *tgt1, *tgt2; + struct md_op_data *op_data = karg; + struct lmv_tgt_desc *tgt1, *tgt2; - tgt1 = lmv_find_target(lmv, &op_data->op_fid1); + tgt1 = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(tgt1)) RETURN(PTR_ERR(tgt1)); - tgt2 = lmv_find_target(lmv, &op_data->op_fid2); + tgt2 = lmv_fid2tgt(lmv, &op_data->op_fid2); if (IS_ERR(tgt2)) RETURN(PTR_ERR(tgt2)); @@ -1134,13 +1028,10 @@ hsm_req_err: break; } default: - for (i = 0; i < count; i++) { + lmv_foreach_connected_tgt(lmv, tgt) { struct obd_device *mdc_obd; int err; - tgt = lmv->tgts[i]; - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; /* ll_umount_begin() sets force flag but for lmv, not * mdc. Let's pass it through */ mdc_obd = class_exp2obd(tgt->ltd_exp); @@ -1150,123 +1041,52 @@ hsm_req_err: if (tgt->ltd_active) { CERROR("error: iocontrol MDC %s on MDT" " idx %d cmd %x: err = %d\n", - tgt->ltd_uuid.uuid, i, cmd, err); + tgt->ltd_uuid.uuid, + tgt->ltd_index, cmd, err); if (!rc) rc = err; } } else set = 1; - } - if (!set && !rc) - rc = -EIO; - } - RETURN(rc); + } + if (!set && !rc) + rc = -EIO; + } + RETURN(rc); } -/** - * This is _inode_ placement policy function (not name). 
- */ -static u32 lmv_placement_policy(struct obd_device *obd, - struct md_op_data *op_data) +int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { + struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_user_md *lum; - u32 mdt; + struct lmv_tgt_desc *tgt; + int rc; ENTRY; - if (lmv->desc.ld_tgt_count == 1) - RETURN(0); + LASSERT(op_data); + LASSERT(fid); - lum = op_data->op_data; - /* - * Choose MDT by - * 1. See if the stripe offset is specified by lum. - * 2. If parent has default LMV, and its hash type is "space", choose - * MDT with QoS. (see lmv_locate_tgt_qos()). - * 3. Then check if default LMV stripe offset is not -1. - * 4. Finally choose MDS by name hash if the parent - * is striped directory. (see lmv_locate_tgt()). - * - * presently explicit MDT location is not supported - * for foreign dirs (as it can't be embedded into free - * format LMV, like with lum_stripe_offset), so we only - * rely on default stripe offset or then name hashing. 
- */ - if (op_data->op_cli_flags & CLI_SET_MEA && lum != NULL && - le32_to_cpu(lum->lum_magic != LMV_MAGIC_FOREIGN) && - le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) { - mdt = le32_to_cpu(lum->lum_stripe_offset); - } else if (op_data->op_code == LUSTRE_OPC_MKDIR && - !lmv_dir_striped(op_data->op_mea1) && - lmv_dir_space_hashed(op_data->op_default_mea1)) { - mdt = op_data->op_mds; - } else if (op_data->op_code == LUSTRE_OPC_MKDIR && - op_data->op_default_mea1 && - op_data->op_default_mea1->lsm_md_master_mdt_index != - (__u32)-1) { - mdt = op_data->op_default_mea1->lsm_md_master_mdt_index; - op_data->op_mds = mdt; - } else { - mdt = op_data->op_mds; - } + tgt = lmv_tgt(lmv, op_data->op_mds); + if (!tgt) + RETURN(-ENODEV); - RETURN(mdt); -} - -int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds) -{ - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; - - tgt = lmv_get_target(lmv, mds, NULL); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + if (!tgt->ltd_active || !tgt->ltd_exp) + RETURN(-ENODEV); /* * New seq alloc and FLD setup should be atomic. Otherwise we may find * on server that seq in new allocated fid is not yet known. */ mutex_lock(&tgt->ltd_fid_mutex); - - if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL) - GOTO(out, rc = -ENODEV); - - /* - * Asking underlying tgt layer to allocate new fid. 
- */ rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL); + mutex_unlock(&tgt->ltd_fid_mutex); if (rc > 0) { LASSERT(fid_is_sane(fid)); rc = 0; } - EXIT; -out: - mutex_unlock(&tgt->ltd_fid_mutex); - return rc; -} - -int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, - struct lu_fid *fid, struct md_op_data *op_data) -{ - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - u32 mds; - int rc; - - ENTRY; - - LASSERT(op_data != NULL); - LASSERT(fid != NULL); - - mds = lmv_placement_policy(obd, op_data); - - rc = __lmv_fid_alloc(lmv, fid, mds); - if (rc) - CERROR("Can't alloc new fid, rc %d\n", rc); - RETURN(rc); } @@ -1280,43 +1100,28 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ENTRY; - if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) { - CERROR("LMV setup requires a descriptor\n"); - RETURN(-EINVAL); - } - - desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1); - if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) { - CERROR("Lmv descriptor size wrong: %d > %d\n", - (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1)); - RETURN(-EINVAL); - } + if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) { + CERROR("LMV setup requires a descriptor\n"); + RETURN(-EINVAL); + } - lmv->tgts_size = 32U; - OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size); - if (lmv->tgts == NULL) - RETURN(-ENOMEM); + desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1); + if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) { + CERROR("Lmv descriptor size wrong: %d > %d\n", + (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1)); + RETURN(-EINVAL); + } - obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid); - lmv->desc.ld_tgt_count = 0; - lmv->desc.ld_active_tgt_count = 0; - lmv->desc.ld_qos_maxage = LMV_DESC_QOS_MAXAGE_DEFAULT; + obd_str2uuid(&lmv->lmv_mdt_descs.ltd_lmv_desc.ld_uuid, + desc->ld_uuid.uuid); + lmv->lmv_mdt_descs.ltd_lmv_desc.ld_tgt_count = 0; + lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count = 0; + lmv->lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage = + 
LMV_DESC_QOS_MAXAGE_DEFAULT; lmv->max_def_easize = 0; lmv->max_easize = 0; spin_lock_init(&lmv->lmv_lock); - mutex_init(&lmv->lmv_init_mutex); - - /* Set up allocation policy (QoS and RR) */ - INIT_LIST_HEAD(&lmv->lmv_qos.lq_svr_list); - init_rwsem(&lmv->lmv_qos.lq_rw_sem); - lmv->lmv_qos.lq_dirty = 1; - lmv->lmv_qos.lq_rr.lqr_dirty = 1; - lmv->lmv_qos.lq_reset = 1; - /* Default priority is toward free space balance */ - lmv->lmv_qos.lq_prio_free = 232; - /* Default threshold for rr (roughly 17%) */ - lmv->lmv_qos.lq_threshold_rr = 43; /* * initialize rr_index to lower 32bit of netid, so that client @@ -1336,33 +1141,30 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = fld_client_init(&lmv->lmv_fld, obd->obd_name, LUSTRE_CLI_FLD_HASH_DHT); - if (rc) { + if (rc) CERROR("Can't init FLD, err %d\n", rc); - GOTO(out, rc); - } - RETURN(0); + rc = lu_tgt_descs_init(&lmv->lmv_mdt_descs, true); + if (rc) + CWARN("%s: error initialize target table: rc = %d\n", + obd->obd_name, rc); -out: - return rc; + RETURN(rc); } static int lmv_cleanup(struct obd_device *obd) { - struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_obd *lmv = &obd->u.lmv; + struct lu_tgt_desc *tgt; + struct lu_tgt_desc *tmp; + ENTRY; fld_client_fini(&lmv->lmv_fld); - if (lmv->tgts != NULL) { - int i; - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - if (lmv->tgts[i] == NULL) - continue; - lmv_del_target(lmv, i); - } - OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size); - lmv->tgts_size = 0; - } + lmv_foreach_tgt_safe(lmv, tgt, tmp) + lmv_del_target(lmv, tgt); + lu_tgt_descs_fini(&lmv->lmv_mdt_descs); + RETURN(0); } @@ -1405,7 +1207,7 @@ static int lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags) if (flags & OBD_STATFS_FOR_MDT0) return 0; - if (lmv->lmv_statfs_start || lmv->desc.ld_tgt_count == 1) + if (lmv->lmv_statfs_start || lmv->lmv_mdt_count == 1) return lmv->lmv_statfs_start; /* choose initial MDT for this client */ @@ -1418,8 +1220,8 @@ static int 
lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags) /* We dont need a full 64-bit modulus, just enough * to distribute the requests across MDTs evenly. */ - lmv->lmv_statfs_start = - (u32)lnet_id.nid % lmv->desc.ld_tgt_count; + lmv->lmv_statfs_start = (u32)lnet_id.nid % + lmv->lmv_mdt_count; break; } } @@ -1430,31 +1232,33 @@ static int lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags) static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, time64_t max_age, __u32 flags) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_statfs *temp; - int rc = 0; - __u32 i, idx; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_statfs *temp; + struct lu_tgt_desc *tgt; + __u32 i; + __u32 idx; + int rc = 0; + ENTRY; - OBD_ALLOC(temp, sizeof(*temp)); - if (temp == NULL) - RETURN(-ENOMEM); + OBD_ALLOC(temp, sizeof(*temp)); + if (temp == NULL) + RETURN(-ENOMEM); /* distribute statfs among MDTs */ idx = lmv_select_statfs_mdt(lmv, flags); - for (i = 0; i < lmv->desc.ld_tgt_count; i++, idx++) { - idx = idx % lmv->desc.ld_tgt_count; - if (lmv->tgts[idx] == NULL || lmv->tgts[idx]->ltd_exp == NULL) + for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++, idx++) { + idx = idx % lmv->lmv_mdt_descs.ltd_tgts_size; + tgt = lmv_tgt(lmv, idx); + if (!tgt || !tgt->ltd_exp) continue; - rc = obd_statfs(env, lmv->tgts[idx]->ltd_exp, temp, - max_age, flags); + rc = obd_statfs(env, tgt->ltd_exp, temp, max_age, flags); if (rc) { CERROR("%s: can't stat MDS #%d: rc = %d\n", - lmv->tgts[idx]->ltd_exp->exp_obd->obd_name, i, - rc); + tgt->ltd_exp->exp_obd->obd_name, i, rc); GOTO(out_free_temp, rc); } @@ -1480,12 +1284,12 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_files += temp->os_files; osfs->os_granted += temp->os_granted; } - } + } - EXIT; + EXIT; out_free_temp: - OBD_FREE(temp, sizeof(*temp)); - return rc; + 
OBD_FREE(temp, sizeof(*temp)); + return rc; } static int lmv_statfs_update(void *cookie, int rc) @@ -1522,7 +1326,7 @@ int lmv_statfs_check_update(struct obd_device *obd, struct lmv_tgt_desc *tgt) int rc; if (ktime_get_seconds() - tgt->ltd_statfs_age < - obd->u.lmv.desc.ld_qos_maxage) + obd->u.lmv.lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage) return 0; rc = obd_statfs_async(tgt->ltd_exp, &oinfo, 0, NULL); @@ -1533,12 +1337,17 @@ int lmv_statfs_check_update(struct obd_device *obd, struct lmv_tgt_desc *tgt) static int lmv_get_root(struct obd_export *exp, const char *fileset, struct lu_fid *fid) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lu_tgt_desc *tgt = lmv_tgt(lmv, 0); + int rc; + + ENTRY; - rc = md_get_root(lmv->tgts[0]->ltd_exp, fileset, fid); + if (!tgt) + RETURN(-ENODEV); + + rc = md_get_root(tgt->ltd_exp, fileset, fid); RETURN(rc); } @@ -1546,15 +1355,16 @@ static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid, u64 obd_md_valid, const char *name, size_t buf_size, struct ptlrpc_request **req) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + ENTRY; + + tgt = lmv_fid2tgt(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); rc = md_getxattr(tgt->ltd_exp, fid, obd_md_valid, name, buf_size, req); @@ -1567,15 +1377,16 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid, unsigned int xattr_flags, u32 suppgid, struct ptlrpc_request **req) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd 
*lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + ENTRY; + + tgt = lmv_fid2tgt(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); rc = md_setxattr(tgt->ltd_exp, fid, obd_md_valid, name, value, value_size, xattr_flags, suppgid, req); @@ -1583,68 +1394,151 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data, - struct ptlrpc_request **request) +static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data, + struct ptlrpc_request **request) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + + ENTRY; + + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + if (op_data->op_flags & MF_GET_MDT_IDX) { + op_data->op_mds = tgt->ltd_index; + RETURN(0); + } + + rc = md_getattr(tgt->ltd_exp, op_data, request); + + RETURN(rc); +} + +static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lu_tgt_desc *tgt; + + ENTRY; + + CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); + + /* + * With DNE every object can have two locks in different namespaces: + * lookup lock in space of MDT storing direntry and update/open lock in + * space of MDT storing inode. 
+ */ + lmv_foreach_connected_tgt(lmv, tgt) + md_null_inode(tgt->ltd_exp, fid); + + RETURN(0); +} + +static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod, struct ptlrpc_request **request) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + + ENTRY; + + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1)); + rc = md_close(tgt->ltd_exp, op_data, mod, request); + RETURN(rc); +} + +static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv, __u32 *mdt) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; + struct lu_tgt_desc *tgt; + __u64 total_weight = 0; + __u64 cur_weight = 0; + __u64 rand; + int rc; - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + ENTRY; - if (op_data->op_flags & MF_GET_MDT_IDX) { - op_data->op_mds = tgt->ltd_index; - RETURN(0); - } + if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs)) + RETURN(ERR_PTR(-EAGAIN)); - rc = md_getattr(tgt->ltd_exp, op_data, request); + down_write(&lmv->lmv_qos.lq_rw_sem); - RETURN(rc); -} + if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs)) + GOTO(unlock, tgt = ERR_PTR(-EAGAIN)); -static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - __u32 i; - ENTRY; + rc = ltd_qos_penalties_calc(&lmv->lmv_mdt_descs); + if (rc) + GOTO(unlock, tgt = ERR_PTR(rc)); - CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); + lmv_foreach_tgt(lmv, tgt) { + tgt->ltd_qos.ltq_usable = 0; + if (!tgt->ltd_exp || !tgt->ltd_active) + continue; - /* - * With DNE every object can have two locks in different namespaces: - * lookup lock in space of MDT storing direntry and update/open lock in - * space of MDT storing inode. 
- */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL) + tgt->ltd_qos.ltq_usable = 1; + lu_tgt_qos_weight_calc(tgt); + total_weight += tgt->ltd_qos.ltq_weight; + } + + rand = lu_prandom_u64_max(total_weight); + + lmv_foreach_connected_tgt(lmv, tgt) { + if (!tgt->ltd_qos.ltq_usable) + continue; + + cur_weight += tgt->ltd_qos.ltq_weight; + if (cur_weight < rand) continue; - md_null_inode(lmv->tgts[i]->ltd_exp, fid); + + *mdt = tgt->ltd_index; + ltd_qos_update(&lmv->lmv_mdt_descs, tgt, &total_weight); + GOTO(unlock, rc = 0); } - RETURN(0); + /* no proper target found */ + GOTO(unlock, tgt = ERR_PTR(-EAGAIN)); +unlock: + up_write(&lmv->lmv_qos.lq_rw_sem); + + return tgt; } -static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, - struct md_open_data *mod, struct ptlrpc_request **request) +static struct lu_tgt_desc *lmv_locate_tgt_rr(struct lmv_obd *lmv, __u32 *mdt) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; + struct lu_tgt_desc *tgt; + int i; + int index; + + ENTRY; + + spin_lock(&lmv->lmv_qos.lq_rr.lqr_alloc); + for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++) { + index = (i + lmv->lmv_qos_rr_index) % + lmv->lmv_mdt_descs.ltd_tgts_size; + tgt = lmv_tgt(lmv, index); + if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) + continue; - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + *mdt = tgt->ltd_index; + lmv->lmv_qos_rr_index = (*mdt + 1) % + lmv->lmv_mdt_descs.ltd_tgts_size; + spin_unlock(&lmv->lmv_qos.lq_rr.lqr_alloc); - CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1)); - rc = md_close(tgt->ltd_exp, op_data, mod, request); - RETURN(rc); + RETURN(tgt); + } + spin_unlock(&lmv->lmv_qos.lq_rr.lqr_alloc); + + RETURN(ERR_PTR(-ENODEV)); } static struct lmv_tgt_desc * @@ -1656,7 +1550,7 @@ lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, 
const struct lmv_oinfo *oinfo; if (!lmv_dir_striped(lsm) || !namelen) { - tgt = lmv_find_target(lmv, fid); + tgt = lmv_fid2tgt(lmv, fid); if (IS_ERR(tgt)) return tgt; @@ -1677,11 +1571,11 @@ lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, *fid = oinfo->lmo_fid; *mds = oinfo->lmo_mds; - tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL); + tgt = lmv_tgt(lmv, oinfo->lmo_mds); CDEBUG(D_INODE, "locate MDT %u parent "DFID"\n", *mds, PFID(fid)); - return tgt; + return tgt ? tgt : ERR_PTR(-ENODEV); } /** @@ -1692,8 +1586,7 @@ lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, * which is set outside, and if dir is migrating, 'op_data->op_post_migrate' * indicates whether old or new layout is used to locate. * - * For plain direcotry, normally it will locate MDT by FID, but if this - * directory has default LMV, and its hash type is "space", locate MDT with QoS. + * For plain direcotry, it just locate the MDT of op_data->op_fid1. * * \param[in] lmv LMV device * \param[in] op_data client MD stack parameters, name, namelen @@ -1716,10 +1609,10 @@ lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data) * index if the file under striped dir is being restored, see * ct_restore(). 
*/ if (op_data->op_bias & MDS_CREATE_VOLATILE && - (int)op_data->op_mds != -1) { - tgt = lmv_get_target(lmv, op_data->op_mds, NULL); - if (IS_ERR(tgt)) - return tgt; + op_data->op_mds != LMV_OFFSET_DEFAULT) { + tgt = lmv_tgt(lmv, op_data->op_mds); + if (!tgt) + return ERR_PTR(-ENODEV); if (lmv_dir_striped(lsm)) { int i; @@ -1742,30 +1635,9 @@ lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data) op_data->op_fid1 = oinfo->lmo_fid; op_data->op_mds = oinfo->lmo_mds; - tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL); - } else if (op_data->op_code == LUSTRE_OPC_MKDIR && - lmv_dir_space_hashed(op_data->op_default_mea1) && - !lmv_dir_striped(lsm)) { - tgt = lmv_locate_tgt_qos(lmv, &op_data->op_mds); - if (tgt == ERR_PTR(-EAGAIN)) - tgt = lmv_locate_tgt_rr(lmv, &op_data->op_mds); - /* - * only update statfs when mkdir under dir with "space" hash, - * this means the cached statfs may be stale, and current mkdir - * may not follow QoS accurately, but it's not serious, and it - * avoids periodic statfs when client doesn't mkdir under - * "space" hashed directories. - * - * TODO: after MDT support QoS object allocation, also update - * statfs for 'lfs mkdir -i -1 ...", currently it's done in user - * space. 
- */ - if (!IS_ERR(tgt)) { - struct obd_device *obd; - - obd = container_of(lmv, struct obd_device, u.lmv); - lmv_statfs_check_update(obd, tgt); - } + tgt = lmv_tgt(lmv, oinfo->lmo_mds); + if (!tgt) + return ERR_PTR(-ENODEV); } else { tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea1, op_data->op_name, op_data->op_namelen, @@ -1818,6 +1690,78 @@ lmv_locate_tgt2(struct lmv_obd *lmv, struct md_op_data *op_data) &op_data->op_mds, true); } +int lmv_migrate_existence_check(struct lmv_obd *lmv, struct md_op_data *op_data) +{ + struct lu_tgt_desc *tgt; + struct ptlrpc_request *request; + int rc; + + LASSERT(lmv_dir_migrating(op_data->op_mea1)); + + tgt = lmv_locate_tgt(lmv, op_data); + if (IS_ERR(tgt)) + return PTR_ERR(tgt); + + rc = md_getattr_name(tgt->ltd_exp, op_data, &request); + if (!rc) { + ptlrpc_req_finished(request); + return -EEXIST; + } + + return rc; +} + +/* mkdir by QoS in two cases: + * 1. 'lfs mkdir -i -1' + * 2. parent default LMV master_mdt_index is -1 + * + * NB, mkdir by QoS only if parent is not striped, this is to avoid remote + * directories under striped directory. 
+ */ +static inline bool lmv_op_qos_mkdir(const struct md_op_data *op_data) +{ + const struct lmv_stripe_md *lsm = op_data->op_default_mea1; + const struct lmv_user_md *lum = op_data->op_data; + + if (op_data->op_code != LUSTRE_OPC_MKDIR) + return false; + + if (lmv_dir_striped(op_data->op_mea1)) + return false; + + if (op_data->op_cli_flags & CLI_SET_MEA && lum && + (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) && + le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT) + return true; + + if (lsm && lsm->lsm_md_master_mdt_index == LMV_OFFSET_DEFAULT) + return true; + + return false; +} + +/* 'lfs mkdir -i ' */ +static inline bool lmv_op_user_specific_mkdir(const struct md_op_data *op_data) +{ + const struct lmv_user_md *lum = op_data->op_data; + + return op_data->op_code == LUSTRE_OPC_MKDIR && + op_data->op_cli_flags & CLI_SET_MEA && lum && + (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) && + le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT; +} + +/* parent default LMV master_mdt_index is not -1. 
*/ +static inline bool +lmv_op_default_specific_mkdir(const struct md_op_data *op_data) +{ + return op_data->op_code == LUSTRE_OPC_MKDIR && + op_data->op_default_mea1 && + op_data->op_default_mea1->lsm_md_master_mdt_index != + LMV_OFFSET_DEFAULT; +} int lmv_create(struct obd_export *exp, struct md_op_data *op_data, const void *data, size_t datalen, umode_t mode, uid_t uid, gid_t gid, cfs_cap_t cap_effective, __u64 rdev, @@ -1830,7 +1774,7 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, ENTRY; - if (!lmv->desc.ld_active_tgt_count) + if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count) RETURN(-EIO); if (lmv_dir_bad_hash(op_data->op_mea1)) @@ -1839,20 +1783,9 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, if (lmv_dir_migrating(op_data->op_mea1)) { /* * if parent is migrating, create() needs to lookup existing - * name, to avoid creating new file under old layout of - * migrating directory, check old layout here. + * name in both old and new layout, check old layout on client. 
*/ - tgt = lmv_locate_tgt(lmv, op_data); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - - rc = md_getattr_name(tgt->ltd_exp, op_data, request); - if (!rc) { - ptlrpc_req_finished(*request); - *request = NULL; - RETURN(-EEXIST); - } - + rc = lmv_migrate_existence_check(lmv, op_data); if (rc != -ENOENT) RETURN(rc); @@ -1863,26 +1796,44 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); - CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n", - (int)op_data->op_namelen, op_data->op_name, - PFID(&op_data->op_fid1), op_data->op_mds); + if (lmv_op_qos_mkdir(op_data)) { + tgt = lmv_locate_tgt_qos(lmv, &op_data->op_mds); + if (tgt == ERR_PTR(-EAGAIN)) + tgt = lmv_locate_tgt_rr(lmv, &op_data->op_mds); + /* + * only update statfs after QoS mkdir, this means the cached + * statfs may be stale, and current mkdir may not follow QoS + * accurately, but it's not serious, and avoids periodic statfs + * when client doesn't mkdir by QoS. + */ + if (!IS_ERR(tgt)) + lmv_statfs_check_update(obd, tgt); + } else if (lmv_op_user_specific_mkdir(op_data)) { + struct lmv_user_md *lum = op_data->op_data; + + op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset); + tgt = lmv_tgt(lmv, op_data->op_mds); + if (!tgt) + RETURN(-ENODEV); + } else if (lmv_op_default_specific_mkdir(op_data)) { + op_data->op_mds = + op_data->op_default_mea1->lsm_md_master_mdt_index; + tgt = lmv_tgt(lmv, op_data->op_mds); + if (!tgt) + RETURN(-ENODEV); + } + + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) RETURN(rc); - if (exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE) { - /* Send the create request to the MDT where the object - * will be located */ - tgt = lmv_find_target(lmv, &op_data->op_fid2); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - - op_data->op_mds = tgt->ltd_index; - } - - CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n", - PFID(&op_data->op_fid2), op_data->op_mds); + CDEBUG(D_INODE, 
"CREATE name '%.*s' "DFID" on "DFID" -> mds #%x\n", + (int)op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid2), PFID(&op_data->op_fid1), + op_data->op_mds); op_data->op_flags |= MF_MDC_CANCEL_FID1; rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid, @@ -1900,15 +1851,16 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, const union ldlm_policy_data *policy, struct md_op_data *op_data, struct lustre_handle *lockh, __u64 extra_lock_flags) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; CDEBUG(D_INODE, "ENQUEUE on "DFID"\n", PFID(&op_data->op_fid1)); - tgt = lmv_find_target(lmv, &op_data->op_fid1); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -1991,7 +1943,7 @@ static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt, RETURN(0); if (tgt == NULL) { - tgt = lmv_find_target(lmv, fid); + tgt = lmv_fid2tgt(lmv, fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); } @@ -2079,7 +2031,7 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); op_data->op_cap = cfs_curproc_cap_pack(); - parent_tgt = lmv_find_target(lmv, &op_data->op_fid1); + parent_tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(parent_tgt)) RETURN(PTR_ERR(parent_tgt)); @@ -2106,10 +2058,9 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, /* save it in fid4 temporarily for early cancel */ op_data->op_fid4 = lsm->lsm_md_oinfo[rc].lmo_fid; - sp_tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[rc].lmo_mds, - NULL); - if (IS_ERR(sp_tgt)) - RETURN(PTR_ERR(sp_tgt)); + sp_tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[rc].lmo_mds); + if (!sp_tgt) + RETURN(-ENODEV); /* * if parent is being migrated too, fill op_fid2 with 
target @@ -2126,24 +2077,32 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); op_data->op_fid2 = lsm->lsm_md_oinfo[rc].lmo_fid; - tp_tgt = lmv_get_target(lmv, - lsm->lsm_md_oinfo[rc].lmo_mds, - NULL); - if (IS_ERR(tp_tgt)) - RETURN(PTR_ERR(tp_tgt)); + tp_tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[rc].lmo_mds); + if (!tp_tgt) + RETURN(-ENODEV); } } else { sp_tgt = parent_tgt; } - child_tgt = lmv_find_target(lmv, &op_data->op_fid3); + child_tgt = lmv_fid2tgt(lmv, &op_data->op_fid3); if (IS_ERR(child_tgt)) RETURN(PTR_ERR(child_tgt)); - if (!S_ISDIR(op_data->op_mode) && tp_tgt) - rc = __lmv_fid_alloc(lmv, &target_fid, tp_tgt->ltd_index); - else - rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data); + /* for directory, migrate to MDT specified by lum_stripe_offset; + * otherwise migrate to the target stripe of parent, but parent + * directory may have finished migration (normally current file too), + * allocate FID on MDT lum_stripe_offset, and server will check + * whether file was migrated already. 
+ */ + if (S_ISDIR(op_data->op_mode) || !tp_tgt) { + struct lmv_user_md *lum = op_data->op_data; + + op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset); + } else { + op_data->op_mds = tp_tgt->ltd_index; + } + rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data); if (rc) RETURN(rc); @@ -2159,7 +2118,7 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, */ if (S_ISDIR(op_data->op_mode) && (exp_connect_flags2(exp) & OBD_CONNECT2_DIR_MIGRATE)) { - tgt = lmv_find_target(lmv, &target_fid); + tgt = lmv_fid2tgt(lmv, &target_fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); } else { @@ -2257,7 +2216,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, * target child does not exist, then it will send the request to the * target parent */ if (fid_is_sane(&op_data->op_fid4)) { - tgt = lmv_find_target(lmv, &op_data->op_fid4); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid4); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); } else { @@ -2285,7 +2244,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, } if (fid_is_sane(&op_data->op_fid3)) { - src_tgt = lmv_find_target(lmv, &op_data->op_fid3); + src_tgt = lmv_fid2tgt(lmv, &op_data->op_fid3); if (IS_ERR(src_tgt)) RETURN(PTR_ERR(src_tgt)); @@ -2351,7 +2310,7 @@ rename: ptlrpc_req_finished(*request); *request = NULL; - tgt = lmv_find_target(lmv, &op_data->op_fid4); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid4); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2373,10 +2332,11 @@ rename: static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, void *ea, size_t ealen, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc = 0; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc = 0; + ENTRY; CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x/0x%x\n", @@ -2384,7 +2344,7 @@ static int lmv_setattr(struct obd_export 
*exp, struct md_op_data *op_data, op_data->op_xvalid); op_data->op_flags |= MF_MDC_CANCEL_FID1; - tgt = lmv_find_target(lmv, &op_data->op_fid1); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2396,13 +2356,14 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; - tgt = lmv_find_target(lmv, fid); + tgt = lmv_fid2tgt(lmv, fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2511,9 +2472,9 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt, break; } - tgt = lmv_get_target(ctxt->ldc_lmv, oinfo->lmo_mds, NULL); - if (IS_ERR(tgt)) { - rc = PTR_ERR(tgt); + tgt = lmv_tgt(ctxt->ldc_lmv, oinfo->lmo_mds); + if (!tgt) { + rc = -ENODEV; break; } @@ -2554,17 +2515,18 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt, static int lmv_file_resync(struct obd_export *exp, struct md_op_data *data) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; rc = lmv_check_connect(obd); if (rc != 0) RETURN(rc); - tgt = lmv_find_target(lmv, &data->op_fid1); + tgt = lmv_fid2tgt(lmv, &data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2785,7 +2747,7 @@ int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } - tgt = lmv_find_target(lmv, &op_data->op_fid1); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2840,7 +2802,7 @@ retry: RETURN(PTR_ERR(parent_tgt)); if 
(likely(!fid_is_zero(&op_data->op_fid2))) { - tgt = lmv_find_target(lmv, &op_data->op_fid2); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid2); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); } else { @@ -2893,7 +2855,7 @@ retry: ptlrpc_req_finished(*request); *request = NULL; - tgt = lmv_find_target(lmv, &op_data->op_fid2); + tgt = lmv_fid2tgt(lmv, &op_data->op_fid2); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2929,31 +2891,24 @@ static int lmv_precleanup(struct obd_device *obd) static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val) { - struct obd_device *obd; - struct lmv_obd *lmv; - int rc = 0; - ENTRY; - - obd = class_exp2obd(exp); - if (obd == NULL) { - CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n", - exp->exp_handle.h_cookie); - RETURN(-EINVAL); - } + struct obd_device *obd; + struct lmv_obd *lmv; + struct lu_tgt_desc *tgt; + int rc = 0; - lmv = &obd->u.lmv; - if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) { - int i; + ENTRY; - LASSERT(*vallen == sizeof(__u32)); - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; - /* - * All tgts should be connected when this gets called. - */ - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; + obd = class_exp2obd(exp); + if (obd == NULL) { + CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n", + exp->exp_handle.h_cookie); + RETURN(-EINVAL); + } + lmv = &obd->u.lmv; + if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) { + LASSERT(*vallen == sizeof(__u32)); + lmv_foreach_connected_tgt(lmv, tgt) { if (!obd_get_info(env, tgt->ltd_exp, keylen, key, vallen, val)) RETURN(0); @@ -2966,18 +2921,112 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, * Forwarding this request to first MDS, it should know LOV * desc. 
*/ - rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key, - vallen, val); + tgt = lmv_tgt(lmv, 0); + if (!tgt) + RETURN(-ENODEV); + + rc = obd_get_info(env, tgt->ltd_exp, keylen, key, vallen, val); if (!rc && KEY_IS(KEY_CONN_DATA)) exp->exp_connect_data = *(struct obd_connect_data *)val; - RETURN(rc); - } else if (KEY_IS(KEY_TGT_COUNT)) { - *((int *)val) = lmv->desc.ld_tgt_count; - RETURN(0); - } + RETURN(rc); + } else if (KEY_IS(KEY_TGT_COUNT)) { + *((int *)val) = lmv->lmv_mdt_descs.ltd_tgts_size; + RETURN(0); + } - CDEBUG(D_IOCTL, "Invalid key\n"); - RETURN(-EINVAL); + CDEBUG(D_IOCTL, "Invalid key\n"); + RETURN(-EINVAL); +} + +static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa, + int *__rcs, struct ptlrpc_request_set *_set) +{ + struct obd_device *obddev = class_exp2obd(exp); + struct ptlrpc_request_set *set = _set; + struct lmv_obd *lmv = &obddev->u.lmv; + int tgt_count = lmv->lmv_mdt_count; + struct lu_tgt_desc *tgt; + struct fid_array *fat, **fas = NULL; + int i, rc, **rcs = NULL; + + if (!set) { + set = ptlrpc_prep_set(); + if (!set) + RETURN(-ENOMEM); + } + + /* split FIDs by targets */ + OBD_ALLOC(fas, sizeof(fas) * tgt_count); + if (fas == NULL) + GOTO(out, rc = -ENOMEM); + OBD_ALLOC(rcs, sizeof(int *) * tgt_count); + if (rcs == NULL) + GOTO(out_fas, rc = -ENOMEM); + + for (i = 0; i < fa->fa_nr; i++) { + unsigned int idx; + + rc = lmv_fld_lookup(lmv, &fa->fa_fids[i], &idx); + if (rc) { + CDEBUG(D_OTHER, "can't lookup "DFID": rc = %d\n", + PFID(&fa->fa_fids[i]), rc); + continue; + } + LASSERT(idx < tgt_count); + if (!fas[idx]) + OBD_ALLOC(fas[idx], offsetof(struct fid_array, + fa_fids[fa->fa_nr])); + if (!fas[idx]) + GOTO(out, rc = -ENOMEM); + if (!rcs[idx]) + OBD_ALLOC(rcs[idx], sizeof(int) * fa->fa_nr); + if (!rcs[idx]) + GOTO(out, rc = -ENOMEM); + + fat = fas[idx]; + fat->fa_fids[fat->fa_nr++] = fa->fa_fids[i]; + } + + lmv_foreach_connected_tgt(lmv, tgt) { + fat = fas[tgt->ltd_index]; + if (!fat || fat->fa_nr == 0) + continue; + rc = 
md_rmfid(tgt->ltd_exp, fat, rcs[tgt->ltd_index], set); + } + + rc = ptlrpc_set_wait(NULL, set); + if (rc == 0) { + int j = 0; + for (i = 0; i < tgt_count; i++) { + fat = fas[i]; + if (!fat || fat->fa_nr == 0) + continue; + /* copy FIDs back */ + memcpy(fa->fa_fids + j, fat->fa_fids, + fat->fa_nr * sizeof(struct lu_fid)); + /* copy rcs back */ + memcpy(__rcs + j, rcs[i], fat->fa_nr * sizeof(**rcs)); + j += fat->fa_nr; + } + } + if (set != _set) + ptlrpc_set_destroy(set); + +out: + for (i = 0; i < tgt_count; i++) { + if (fas && fas[i]) + OBD_FREE(fas[i], offsetof(struct fid_array, + fa_fids[fa->fa_nr])); + if (rcs && rcs[i]) + OBD_FREE(rcs[i], sizeof(int) * fa->fa_nr); + } + if (rcs) + OBD_FREE(rcs, sizeof(int *) * tgt_count); +out_fas: + if (fas) + OBD_FREE(fas, sizeof(fas) * tgt_count); + + RETURN(rc); } /** @@ -3016,14 +3065,9 @@ int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) || KEY_IS(KEY_DEFAULT_EASIZE)) { - int i, err = 0; - - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - tgt = lmv->tgts[i]; - - if (tgt == NULL || tgt->ltd_exp == NULL) - continue; + int err = 0; + lmv_foreach_connected_tgt(lmv, tgt) { err = obd_set_info_async(env, tgt->ltd_exp, keylen, key, vallen, val, set); if (err && rc == 0) @@ -3075,7 +3119,7 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm, * set default value -1, so lmv_locate_tgt() knows this stripe * target is not initialized. 
*/ - lsm->lsm_md_oinfo[i].lmo_mds = (u32)-1; + lsm->lsm_md_oinfo[i].lmo_mds = LMV_OFFSET_DEFAULT; if (!fid_is_sane(&lsm->lsm_md_oinfo[i].lmo_fid)) continue; @@ -3228,17 +3272,16 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, void *opaque) { struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + struct lu_tgt_desc *tgt; + int err; int rc = 0; - __u32 i; + ENTRY; LASSERT(fid != NULL); - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; - int err; - - if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) + lmv_foreach_connected_tgt(lmv, tgt) { + if (!tgt->ltd_active) continue; err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags, @@ -3253,9 +3296,10 @@ static int lmv_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh, void *data, __u64 *bits) { - struct lmv_obd *lmv = &exp->exp_obd->u.lmv; - struct lmv_tgt_desc *tgt = lmv->tgts[0]; - int rc; + struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); + int rc; + ENTRY; if (tgt == NULL || tgt->ltd_exp == NULL) @@ -3269,37 +3313,38 @@ enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags, union ldlm_policy_data *policy, enum ldlm_mode mode, struct lustre_handle *lockh) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - enum ldlm_mode rc; - int tgt; - int i; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + enum ldlm_mode rc; + struct lu_tgt_desc *tgt; + int i; + int index; + ENTRY; CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid)); - /* + /* * With DNE every object can have two locks in different namespaces: * lookup lock in space of MDT storing direntry and update/open lock in * space of MDT storing inode. Try the MDT that the FID maps to first, * since this can be easily found, and only try others if that fails. 
*/ - for (i = 0, tgt = lmv_find_target_index(lmv, fid); - i < lmv->desc.ld_tgt_count; - i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) { - if (tgt < 0) { + for (i = 0, index = lmv_fid2tgt_index(lmv, fid); + i < lmv->lmv_mdt_descs.ltd_tgts_size; + i++, index = (index + 1) % lmv->lmv_mdt_descs.ltd_tgts_size) { + if (index < 0) { CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n", - obd->obd_name, PFID(fid), tgt); - tgt = 0; + obd->obd_name, PFID(fid), index); + index = 0; } - if (lmv->tgts[tgt] == NULL || - lmv->tgts[tgt]->ltd_exp == NULL || - lmv->tgts[tgt]->ltd_active == 0) + tgt = lmv_tgt(lmv, index); + if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) continue; - rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid, - type, policy, mode, lockh); + rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode, + lockh); if (rc) RETURN(rc); } @@ -3311,20 +3356,21 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, struct obd_export *dt_exp, struct obd_export *md_exp, struct lustre_md *md) { - struct lmv_obd *lmv = &exp->exp_obd->u.lmv; - struct lmv_tgt_desc *tgt = lmv->tgts[0]; + struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); - if (tgt == NULL || tgt->ltd_exp == NULL) - RETURN(-EINVAL); + if (!tgt || !tgt->ltd_exp) + return -EINVAL; - return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md); + return md_get_lustre_md(tgt->ltd_exp, req, dt_exp, md_exp, md); } int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt = lmv->tgts[0]; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); + ENTRY; if (md->default_lmv) { @@ -3335,21 +3381,22 @@ int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) lmv_free_memmd(md->lmv); md->lmv = NULL; } - if (tgt == NULL || tgt->ltd_exp == NULL) + if 
(!tgt || !tgt->ltd_exp) RETURN(-EINVAL); - RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md)); + RETURN(md_free_lustre_md(tgt->ltd_exp, md)); } int lmv_set_open_replay_data(struct obd_export *exp, struct obd_client_handle *och, struct lookup_intent *it) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + ENTRY; - tgt = lmv_find_target(lmv, &och->och_fid); + tgt = lmv_fid2tgt(lmv, &och->och_fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -3357,18 +3404,19 @@ int lmv_set_open_replay_data(struct obd_export *exp, } int lmv_clear_open_replay_data(struct obd_export *exp, - struct obd_client_handle *och) + struct obd_client_handle *och) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + + ENTRY; - tgt = lmv_find_target(lmv, &och->och_fid); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + tgt = lmv_fid2tgt(lmv, &och->och_fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - RETURN(md_clear_open_replay_data(tgt->ltd_exp, och)); + RETURN(md_clear_open_replay_data(tgt->ltd_exp, och)); } int lmv_intent_getattr_async(struct obd_export *exp, @@ -3377,45 +3425,51 @@ int lmv_intent_getattr_async(struct obd_export *exp, struct md_op_data *op_data = &minfo->mi_data; struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt = NULL; + struct lmv_tgt_desc *ptgt; + struct lmv_tgt_desc *ctgt; int rc; + ENTRY; if (!fid_is_sane(&op_data->op_fid2)) RETURN(-EINVAL); - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + ptgt = lmv_locate_tgt(lmv, op_data); + if (IS_ERR(ptgt)) + RETURN(PTR_ERR(ptgt)); + + ctgt = lmv_fid2tgt(lmv, &op_data->op_fid1); + if (IS_ERR(ctgt)) + 
RETURN(PTR_ERR(ctgt)); /* - * no special handle for remote dir, which needs to fetch both LOOKUP - * lock on parent, and then UPDATE lock on child MDT, which makes all - * complicated because this is done async. So only LOOKUP lock is - * fetched for remote dir, but considering remote dir is rare case, - * and not supporting it in statahead won't cause any issue, just leave - * it as is. + * remote object needs two RPCs to lookup and getattr, considering the + * complexity don't support statahead for now. */ + if (ctgt != ptgt) + RETURN(-EREMOTE); + + rc = md_intent_getattr_async(ptgt->ltd_exp, minfo); - rc = md_intent_getattr_async(tgt->ltd_exp, minfo); RETURN(rc); } int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, - struct lu_fid *fid, __u64 *bits) + struct lu_fid *fid, __u64 *bits) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; + + ENTRY; - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + tgt = lmv_fid2tgt(lmv, fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits); - RETURN(rc); + rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits); + RETURN(rc); } int lmv_get_fid_from_lsm(struct obd_export *exp, @@ -3443,49 +3497,46 @@ int lmv_get_fid_from_lsm(struct obd_export *exp, int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, struct obd_quotactl *oqctl) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt = lmv->tgts[0]; - int rc = 0; - __u32 i; - __u64 curspace, curinodes; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); + __u64 curspace, curinodes; + int rc = 0; + ENTRY; - if (tgt == NULL || - tgt->ltd_exp == NULL || 
- !tgt->ltd_active || - lmv->desc.ld_tgt_count == 0) { + if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) { CERROR("master lmv inactive\n"); RETURN(-EIO); } - if (oqctl->qc_cmd != Q_GETOQUOTA) { - rc = obd_quotactl(tgt->ltd_exp, oqctl); - RETURN(rc); - } + if (oqctl->qc_cmd != Q_GETOQUOTA) { + rc = obd_quotactl(tgt->ltd_exp, oqctl); + RETURN(rc); + } - curspace = curinodes = 0; - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + curspace = curinodes = 0; + lmv_foreach_connected_tgt(lmv, tgt) { int err; - tgt = lmv->tgts[i]; - if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) + if (!tgt->ltd_active) continue; - err = obd_quotactl(tgt->ltd_exp, oqctl); - if (err) { - CERROR("getquota on mdt %d failed. %d\n", i, err); - if (!rc) - rc = err; - } else { - curspace += oqctl->qc_dqblk.dqb_curspace; - curinodes += oqctl->qc_dqblk.dqb_curinodes; - } - } - oqctl->qc_dqblk.dqb_curspace = curspace; - oqctl->qc_dqblk.dqb_curinodes = curinodes; + err = obd_quotactl(tgt->ltd_exp, oqctl); + if (err) { + CERROR("getquota on mdt %d failed. %d\n", + tgt->ltd_index, err); + if (!rc) + rc = err; + } else { + curspace += oqctl->qc_dqblk.dqb_curspace; + curinodes += oqctl->qc_dqblk.dqb_curinodes; + } + } + oqctl->qc_dqblk.dqb_curspace = curspace; + oqctl->qc_dqblk.dqb_curinodes = curinodes; - RETURN(rc); + RETURN(rc); } static int lmv_merge_attr(struct obd_export *exp, @@ -3586,6 +3637,7 @@ struct md_ops lmv_md_ops = { .m_revalidate_lock = lmv_revalidate_lock, .m_get_fid_from_lsm = lmv_get_fid_from_lsm, .m_unpackmd = lmv_unpackmd, + .m_rmfid = lmv_rmfid, }; static int __init lmv_init(void)