X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=96b6b5d7f765e52882b49af3de17e8253397ab73;hp=5e7c92516fda13829039957ead73cf84bf1fb33f;hb=HEAD;hpb=c4c17fa4a3f5d9c3df44e19ab3385c8de655cdef diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 5e7c925..682e9f4 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. */ #define DEBUG_SUBSYSTEM S_LMV @@ -53,10 +52,12 @@ #include #include #include +#include #include #include "lmv_internal.h" static int lmv_check_connect(struct obd_device *obd); +static inline bool lmv_op_default_rr_mkdir(const struct md_op_data *op_data); void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, int activate) @@ -129,44 +130,92 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, return rc; } -struct obd_uuid *lmv_get_uuid(struct obd_export *exp) +struct lu_tgt_desc *lmv_tgt_retry(struct lmv_obd *lmv, __u32 index) +{ + struct obd_device *obd = lmv2obd_dev(lmv); + struct lu_tgt_desc *tgt; + static time64_t next_print; + time64_t retry_limit = 0; + time64_t now; + unsigned int level; + int rc; + + might_sleep(); +retry: + tgt = lmv_tgt(lmv, index); + if (likely(tgt && tgt->ltd_exp)) + return tgt; + + now = ktime_get_seconds(); + if (retry_limit == 0) { + level = now > next_print ? D_WARNING : D_INFO; + retry_limit = now + RECONNECT_DELAY_MAX; + } else if (now > retry_limit) { + level = D_ERROR; + } else { + level = D_INFO; + } + CDEBUG_LIMIT(level, index < lmv->lmv_mdt_count ? + "%s: MDT index %u/%u not configured\n" : + "%s: MDT index %u more than MDT count %u\n", + obd->obd_name, index, lmv->lmv_mdt_count); + + if (index >= LOV_V1_INSANE_STRIPE_COUNT) + return NULL; + + if (now > next_print) { + LCONSOLE_INFO("%s: wait %ds while client connects to new MDT\n", + obd->obd_name, (int)(retry_limit - now)); + next_print = retry_limit + 600; + } + if (now < retry_limit) { + rc = schedule_timeout_interruptible(cfs_time_seconds(1)); + if (rc == 0) + goto retry; + } + + return NULL; +} + +static struct obd_uuid *lmv_get_uuid(struct obd_export *exp) { struct lmv_obd *lmv = &exp->exp_obd->u.lmv; struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); - return (tgt == NULL) ? NULL : obd_get_uuid(tgt->ltd_exp); + return tgt ? obd_get_uuid(tgt->ltd_exp) : NULL; } static int lmv_notify(struct obd_device *obd, struct obd_device *watched, enum obd_notify_event ev) { - struct obd_connect_data *conn_data; - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_uuid *uuid; - int rc = 0; - ENTRY; - - if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) { - CERROR("unexpected notification of %s %s!\n", - watched->obd_type->typ_name, - watched->obd_name); - RETURN(-EINVAL); - } - - uuid = &watched->u.cli.cl_target_uuid; - if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) { - /* - * Set MDC as active before notifying the observer, so the - * observer can use the MDC normally. - */ - rc = lmv_set_mdc_active(lmv, uuid, - ev == OBD_NOTIFY_ACTIVE); - if (rc) { - CERROR("%sactivation of %s failed: %d\n", - ev == OBD_NOTIFY_ACTIVE ? "" : "de", - uuid->uuid, rc); - RETURN(rc); - } + struct obd_connect_data *conn_data; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_uuid *uuid; + int rc = 0; + + ENTRY; + + if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) { + CERROR("unexpected notification of %s %s!\n", + watched->obd_type->typ_name, + watched->obd_name); + RETURN(-EINVAL); + } + + uuid = &watched->u.cli.cl_target_uuid; + if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) { + /* + * Set MDC as active before notifying the observer, so the + * observer can use the MDC normally. + */ + rc = lmv_set_mdc_active(lmv, uuid, + ev == OBD_NOTIFY_ACTIVE); + if (rc) { + CERROR("%sactivation of %s failed: %d\n", + ev == OBD_NOTIFY_ACTIVE ? "" : "de", + uuid->uuid, rc); + RETURN(rc); + } } else if (ev == OBD_NOTIFY_OCD) { conn_data = &watched->u.cli.cl_import->imp_connect_data; /* @@ -177,9 +226,7 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched, obd->obd_self_export->exp_connect_data = *conn_data; } - /* - * Pass the notification up the chain. - */ + /* Pass the notification up the chain. */ if (obd->obd_observer) rc = obd_notify(obd->obd_observer, watched, ev); @@ -195,6 +242,7 @@ static int lmv_connect(const struct lu_env *env, struct lustre_handle conn = { 0 }; struct obd_export *exp; int rc; + ENTRY; rc = class_connect(&conn, obd, cluuid); @@ -265,8 +313,8 @@ static int lmv_init_ea_size(struct obd_export *exp, __u32 easize, rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize); if (rc) { - CERROR("%s: obd_init_ea_size() failed on MDT target %d:" - " rc = %d\n", obd->obd_name, tgt->ltd_index, rc); + CERROR("%s: obd_init_ea_size() failed on MDT target %d: rc = %d\n", + obd->obd_name, tgt->ltd_index, rc); break; } } @@ -275,14 +323,15 @@ static int lmv_init_ea_size(struct obd_export *exp, __u32 easize, #define MAX_STRING_SIZE 128 -int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) +static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) { - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_device *mdc_obd; - struct obd_export *mdc_exp; - struct lu_fld_target target; - int rc; - ENTRY; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_device *mdc_obd; + struct obd_export *mdc_exp; + struct lu_fld_target target; + int rc; + + ENTRY; mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME, &obd->obd_uuid); @@ -302,14 +351,12 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) rc = obd_connect(NULL, &mdc_exp, mdc_obd, &obd->obd_uuid, &lmv->conn_data, lmv->lmv_cache); - if (rc) { - CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc); - RETURN(rc); - } + if (rc) { + CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc); + RETURN(rc); + } - /* - * Init fid sequence client for this mdc and add new fld target. - */ + /* Init fid sequence client for this mdc and add new fld target. */ rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA); if (rc) RETURN(rc); @@ -329,9 +376,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) } if (obd->obd_observer) { - /* - * Tell the observer about the new target. - */ + /* Tell the observer about the new target. */ rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd, OBD_NOTIFY_ACTIVE); if (rc) { @@ -353,8 +398,8 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) } CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n", - mdc_obd->obd_name, mdc_obd->obd_uuid.uuid, - atomic_read(&obd->obd_refcount)); + mdc_obd->obd_name, mdc_obd->obd_uuid.uuid, + kref_read(&obd->obd_refcount)); lmv_statfs_check_update(obd, tgt); @@ -481,6 +526,7 @@ out_disc: continue; --lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count; + obd_register_observer(tgt->ltd_exp->exp_obd, NULL); obd_disconnect(tgt->ltd_exp); } @@ -489,46 +535,56 @@ out_disc: static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) { - struct lmv_obd *lmv = &obd->u.lmv; - struct obd_device *mdc_obd; - int rc; - ENTRY; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_device *mdc_obd; + int rc; - LASSERT(tgt != NULL); - LASSERT(obd != NULL); + ENTRY; + + LASSERT(tgt != NULL); + LASSERT(obd != NULL); - mdc_obd = class_exp2obd(tgt->ltd_exp); + mdc_obd = class_exp2obd(tgt->ltd_exp); - if (mdc_obd) { - mdc_obd->obd_force = obd->obd_force; - mdc_obd->obd_fail = obd->obd_fail; - mdc_obd->obd_no_recov = obd->obd_no_recov; + if (mdc_obd) { + mdc_obd->obd_force = obd->obd_force; + mdc_obd->obd_fail = obd->obd_fail; + mdc_obd->obd_no_recov = obd->obd_no_recov; if (lmv->lmv_tgts_kobj) sysfs_remove_link(lmv->lmv_tgts_kobj, mdc_obd->obd_name); } - rc = obd_fid_fini(tgt->ltd_exp->exp_obd); + rc = lu_qos_del_tgt(&lmv->lmv_qos, tgt); + if (rc) + CERROR("%s: Can't del target from QoS table: rc = %d\n", + tgt->ltd_exp->exp_obd->obd_name, rc); + + rc = fld_client_del_target(&lmv->lmv_fld, tgt->ltd_index); if (rc) - CERROR("Can't finanize fids factory\n"); + CERROR("%s: Can't del fld targets: rc = %d\n", + tgt->ltd_exp->exp_obd->obd_name, rc); - CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n", - tgt->ltd_exp->exp_obd->obd_name, - tgt->ltd_exp->exp_obd->obd_uuid.uuid); + rc = obd_fid_fini(tgt->ltd_exp->exp_obd); + if (rc) + CERROR("%s: Can't finalize fids factory: rc = %d\n", + tgt->ltd_exp->exp_obd->obd_name, rc); - obd_register_observer(tgt->ltd_exp->exp_obd, NULL); - rc = obd_disconnect(tgt->ltd_exp); - if (rc) { - if (tgt->ltd_active) { - CERROR("Target %s disconnect error %d\n", - tgt->ltd_uuid.uuid, rc); - } - } + CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n", + tgt->ltd_exp->exp_obd->obd_name, + tgt->ltd_exp->exp_obd->obd_uuid.uuid); - lmv_activate_target(lmv, tgt, 0); - tgt->ltd_exp = NULL; - RETURN(0); + lmv_activate_target(lmv, tgt, 0); + obd_register_observer(tgt->ltd_exp->exp_obd, NULL); + rc = obd_disconnect(tgt->ltd_exp); + if (rc) { + CERROR("%s: Target %s disconnect error: rc = %d\n", + tgt->ltd_exp->exp_obd->obd_name, + tgt->ltd_uuid.uuid, rc); + } + tgt->ltd_exp = NULL; + RETURN(0); } static int lmv_disconnect(struct obd_export *exp) @@ -546,24 +602,34 @@ static int lmv_disconnect(struct obd_export *exp) if (lmv->lmv_tgts_kobj) kobject_put(lmv->lmv_tgts_kobj); - if (!lmv->connected) - class_export_put(exp); - rc = class_disconnect(exp); lmv->connected = 0; + rc = class_disconnect(exp); RETURN(rc); } +static void lmv_statfs_update(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt, + struct obd_statfs *osfs) +{ + spin_lock(&lmv->lmv_lock); + tgt->ltd_statfs = *osfs; + tgt->ltd_statfs_age = ktime_get_seconds(); + spin_unlock(&lmv->lmv_lock); + set_bit(LQ_DIRTY, &lmv->lmv_qos.lq_flags); +} + static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obddev = class_exp2obd(exp); - struct lmv_obd *lmv = &obddev->u.lmv; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; struct getinfo_fid2path *gf; struct lmv_tgt_desc *tgt; struct getinfo_fid2path *remote_gf = NULL; struct lu_fid root_fid; int remote_gf_size = 0; + int currentisenc = 0; + int globalisenc = 0; int rc; gf = karg; @@ -579,10 +645,23 @@ repeat_fid2path: if (rc != 0 && rc != -EREMOTE) GOTO(out_fid2path, rc); + if (gf->gf_u.gf_path[0] == '/') { + /* by convention, server side (mdt_path_current()) puts + * a leading '/' to tell client that we are dealing with + * an encrypted file + */ + currentisenc = 1; + globalisenc = 1; + } else { + currentisenc = 0; + } + /* If remote_gf != NULL, it means just building the - * path on the remote MDT, copy this path segement to gf */ + * path on the remote MDT, copy this path segment to gf. + */ if (remote_gf != NULL) { struct getinfo_fid2path *ori_gf; + int oldisenc = 0; char *ptr; int len; @@ -592,13 +671,22 @@ repeat_fid2path: GOTO(out_fid2path, rc = -EOVERFLOW); ptr = ori_gf->gf_u.gf_path; + oldisenc = ptr[0] == '/'; len = strlen(gf->gf_u.gf_path); - /* move the current path to the right to release space - * for closer-to-root part */ - memmove(ptr + len + 1, ptr, strlen(ori_gf->gf_u.gf_path)); - memcpy(ptr, gf->gf_u.gf_path, len); - ptr[len] = '/'; + if (len) { + /* move the current path to the right to release space + * for closer-to-root part + */ + memmove(ptr + len - currentisenc + 1 + globalisenc, + ptr + oldisenc, + strlen(ori_gf->gf_u.gf_path) - oldisenc + 1); + if (globalisenc) + *(ptr++) = '/'; + memcpy(ptr, gf->gf_u.gf_path + currentisenc, + len - currentisenc); + ptr[len - currentisenc] = '/'; + } } CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n", @@ -611,11 +699,11 @@ repeat_fid2path: /* sigh, has to go to another MDT to do path building further */ if (remote_gf == NULL) { - remote_gf_size = sizeof(*remote_gf) + PATH_MAX; + remote_gf_size = sizeof(*remote_gf) + len - sizeof(*gf); OBD_ALLOC(remote_gf, remote_gf_size); if (remote_gf == NULL) GOTO(out_fid2path, rc = -ENOMEM); - remote_gf->gf_pathlen = PATH_MAX; + remote_gf->gf_pathlen = len - sizeof(*gf); } if (!fid_is_sane(&gf->gf_fid)) { @@ -702,8 +790,7 @@ static int lmv_hsm_ct_unregister(struct obd_device *obd, unsigned int cmd, /* unregister request (call from llapi_hsm_copytool_fini) */ lmv_foreach_connected_tgt(lmv, tgt) - /* best effort: try to clean as much as possible - * (continue on error) */ + /* try to clean as much as possible (continue on error) */ obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg); /* Whatever the result, remove copytool from kuc groups. @@ -763,14 +850,14 @@ static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd, /* All or nothing: try to register to all MDS. * In case of failure, unregister from previous MDS, - * except if it because of inactive target. */ + * except if it because of inactive target. + */ lmv_foreach_connected_tgt(lmv, tgt) { err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg); if (err) { if (tgt->ltd_active) { /* permanent error */ - CERROR("%s: iocontrol MDC %s on MDT" - " idx %d cmd %x: err = %d\n", + CERROR("%s: iocontrol MDC %s on MDT idx %d cmd %x: err = %d\n", lmv2obd_dev(lmv)->obd_name, tgt->ltd_uuid.uuid, tgt->ltd_index, cmd, err); @@ -788,8 +875,8 @@ static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd, GOTO(err_kkuc_rem, rc); } /* else: transient error. - * kuc will register to the missing MDT - * when it is back */ + * kuc will register to the missing MDT when it is back + */ } else { any_set = true; } @@ -812,18 +899,36 @@ err_fput: static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obddev = class_exp2obd(exp); - struct lmv_obd *lmv = &obddev->u.lmv; + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; struct lu_tgt_desc *tgt = NULL; int set = 0; __u32 count = lmv->lmv_mdt_count; int rc = 0; ENTRY; - + CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n", + exp->exp_obd->obd_name, cmd, len, karg, uarg); if (count == 0) RETURN(-ENOTTY); + /* exit early for unknown ioctl types */ + if (unlikely(_IOC_TYPE(cmd) != 'f' && !IOC_OSC_SET_ACTIVE_ALLOW(cmd))) + RETURN(OBD_IOC_ERROR(obd->obd_name, cmd, "unknown", -ENOTTY)); + + /* handle commands that don't use @karg first */ + switch (cmd) { + case LL_IOC_GET_CONNECT_FLAGS: + tgt = lmv_tgt(lmv, 0); + rc = -ENODATA; + if (tgt && tgt->ltd_exp) + rc = obd_iocontrol(cmd, tgt->ltd_exp, len, NULL, uarg); + RETURN(rc); + } + + if (unlikely(karg == NULL)) + RETURN(OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", -EINVAL)); + switch (cmd) { case IOC_OBD_STATFS: { struct obd_ioctl_data *data = karg; @@ -837,7 +942,10 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(-ENODEV); tgt = lmv_tgt(lmv, index); - if (!tgt || !tgt->ltd_active) + if (!tgt) + RETURN(-EAGAIN); + + if (!tgt->ltd_active) RETURN(-ENODATA); mdc_obd = class_exp2obd(tgt->ltd_exp); @@ -855,15 +963,16 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, 0); if (rc) RETURN(rc); + lmv_statfs_update(lmv, tgt, &stat_buf); if (copy_to_user(data->ioc_pbuf1, &stat_buf, - min((int) data->ioc_plen1, - (int) sizeof(stat_buf)))) + min_t(int, data->ioc_plen1, sizeof(stat_buf)))) RETURN(-EFAULT); break; } case OBD_IOC_QUOTACTL: { struct if_quotactl *qctl = karg; struct obd_quotactl *oqctl; + struct obd_import *imp; if (qctl->qc_valid == QC_MDTIDX) { tgt = lmv_tgt(lmv, qctl->qc_idx); @@ -882,9 +991,19 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(-EINVAL); } - if (!tgt || !tgt->ltd_exp) + if (!tgt) + RETURN(-ENODEV); + + if (!tgt->ltd_exp) RETURN(-EINVAL); + imp = class_exp2cliimp(tgt->ltd_exp); + if (!tgt->ltd_active && imp->imp_state != LUSTRE_IMP_IDLE) { + qctl->qc_valid = QC_MDTIDX; + qctl->obd_uuid = tgt->ltd_uuid; + RETURN(-ENODATA); + } + OBD_ALLOC_PTR(oqctl); if (!oqctl) RETURN(-ENOMEM); @@ -892,20 +1011,13 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, QCTL_COPY(oqctl, qctl); rc = obd_quotactl(tgt->ltd_exp, oqctl); if (rc == 0) { - QCTL_COPY(qctl, oqctl); + QCTL_COPY_NO_PNAME(qctl, oqctl); qctl->qc_valid = QC_MDTIDX; qctl->obd_uuid = tgt->ltd_uuid; } OBD_FREE_PTR(oqctl); break; } - case LL_IOC_GET_CONNECT_FLAGS: { - tgt = lmv_tgt(lmv, 0); - rc = -ENODATA; - if (tgt && tgt->ltd_exp) - rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); - break; - } case LL_IOC_FID2MDTIDX: { struct lu_fid *fid = karg; int mdt_index; @@ -915,7 +1027,8 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(rc); /* Note: this is from llite(see ll_dir_ioctl()), @uarg does not - * point to user space memory for FID2MDTIDX. */ + * point to user space memory for FID2MDTIDX. + */ *(__u32 *)uarg = mdt_index; break; } @@ -954,9 +1067,9 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, if (reqcount == 0) RETURN(0); - /* if the request is about a single fid - * or if there is a single MDS, no need to split - * the request. */ + /* if the request is about a single fid or if there is a single + * MDS, no need to split the request. + */ if (reqcount == 1 || count == 1) { tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[0].hui_fid); if (IS_ERR(tgt)) @@ -1019,10 +1132,11 @@ hsm_req_err: } case LL_IOC_HSM_CT_START: { struct lustre_kernelcomm *lk = karg; + if (lk->lk_flags & LK_FLG_STOP) - rc = lmv_hsm_ct_unregister(obddev, cmd, len, lk, uarg); + rc = lmv_hsm_ct_unregister(obd, cmd, len, lk, uarg); else - rc = lmv_hsm_ct_register(obddev, cmd, len, lk, uarg); + rc = lmv_hsm_ct_register(obd, cmd, len, lk, uarg); break; } default: @@ -1031,24 +1145,27 @@ hsm_req_err: int err; /* ll_umount_begin() sets force flag but for lmv, not - * mdc. Let's pass it through */ + * mdc. Let's pass it through + */ mdc_obd = class_exp2obd(tgt->ltd_exp); - mdc_obd->obd_force = obddev->obd_force; + mdc_obd->obd_force = obd->obd_force; err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); if (err) { if (tgt->ltd_active) { - CERROR("error: iocontrol MDC %s on MDT" - " idx %d cmd %x: err = %d\n", - tgt->ltd_uuid.uuid, - tgt->ltd_index, cmd, err); + OBD_IOC_ERROR(obd->obd_name, cmd, + tgt->ltd_uuid.uuid, err); if (!rc) rc = err; + if (unlikely(err == -ENOTTY)) + break; } - } else + } else { set = 1; + } } if (!set && !rc) rc = -EIO; + break; } RETURN(rc); } @@ -1088,11 +1205,46 @@ int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, RETURN(rc); } +static u32 qos_exclude_hashfh(const void *data, u32 len, u32 seed) +{ + const char *name = data; + + return hashlen_hash(cfs_hashlen_string((void *)(unsigned long)seed, + name)); +} + +static int qos_exclude_cmpfn(struct rhashtable_compare_arg *arg, + const void *obj) +{ + const struct qos_exclude_prefix *prefix = obj; + const char *name = arg->key; + + return strcmp(name, prefix->qep_name); +} + +const struct rhashtable_params qos_exclude_hash_params = { + .key_len = 1, /* actually variable */ + .key_offset = offsetof(struct qos_exclude_prefix, qep_name), + .head_offset = offsetof(struct qos_exclude_prefix, qep_hash), + .hashfn = qos_exclude_hashfh, + .obj_cmpfn = qos_exclude_cmpfn, + .automatic_shrinking = true, +}; + +void qos_exclude_prefix_free(void *vprefix, void *data) +{ + struct qos_exclude_prefix *prefix = vprefix; + + list_del(&prefix->qep_list); + kfree(prefix); +} + static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_desc *desc; - struct lnet_process_id lnet_id; + struct lmv_desc *desc; + struct lnet_processid lnet_id; + struct qos_exclude_prefix *prefix; int i = 0; int rc; @@ -1120,14 +1272,15 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) lmv->max_easize = 0; spin_lock_init(&lmv->lmv_lock); + INIT_LIST_HEAD(&lmv->lmv_qos_exclude_list); /* * initialize rr_index to lower 32bit of netid, so that client * can distribute subdirs evenly from the beginning. */ - while (LNetGetId(i++, &lnet_id) != -ENOENT) { - if (LNET_NETTYP(LNET_NIDNET(lnet_id.nid)) != LOLND) { - lmv->lmv_qos_rr_index = (u32)lnet_id.nid; + while (LNetGetId(i++, &lnet_id, true) != -ENOENT) { + if (!nid_is_lo0(&lnet_id.nid)) { + lmv->lmv_qos_rr_index = nidhash(&lnet_id.nid); break; } } @@ -1147,7 +1300,32 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) CWARN("%s: error initialize target table: rc = %d\n", obd->obd_name, rc); - RETURN(rc); + rc = rhashtable_init(&lmv->lmv_qos_exclude_hash, + &qos_exclude_hash_params); + if (rc) { + CERROR("%s: qos exclude hash initalize failed: %d\n", + obd->obd_name, rc); + RETURN(rc); + } + + prefix = kmalloc(sizeof(*prefix), __GFP_ZERO); + if (!prefix) + GOTO(out, rc = -ENOMEM); + /* Apache Spark creates a _temporary directory for staging files */ + strcpy(prefix->qep_name, "_temporary"); + rc = rhashtable_insert_fast(&lmv->lmv_qos_exclude_hash, + &prefix->qep_hash, qos_exclude_hash_params); + if (rc) { + kfree(prefix); + GOTO(out, rc); + } + + list_add_tail(&prefix->qep_list, &lmv->lmv_qos_exclude_list); + GOTO(out, rc); +out: + if (rc) + rhashtable_destroy(&lmv->lmv_qos_exclude_hash); + return rc; } static int lmv_cleanup(struct obd_device *obd) @@ -1158,7 +1336,14 @@ static int lmv_cleanup(struct obd_device *obd) ENTRY; + rhashtable_free_and_destroy(&lmv->lmv_qos_exclude_hash, + qos_exclude_prefix_free, NULL); fld_client_fini(&lmv->lmv_fld); + fld_client_debugfs_fini(&lmv->lmv_fld); + + lprocfs_obd_cleanup(obd); + lprocfs_free_md_stats(obd); + lmv_foreach_tgt_safe(lmv, tgt, tmp) lmv_del_target(lmv, tgt); lu_tgt_descs_fini(&lmv->lmv_mdt_descs); @@ -1173,12 +1358,14 @@ static int lmv_process_config(struct obd_device *obd, size_t len, void *buf) int gen; __u32 index; int rc; + ENTRY; switch (lcfg->lcfg_command) { case LCFG_ADD_MDC: /* modify_mdc_tgts add 0:lustre-clilmv 1:lustre-MDT0000_UUID - * 2:0 3:1 4:lustre-MDT0000-mdc_UUID */ + * 2:0 3:1 4:lustre-MDT0000-mdc_UUID + */ if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) GOTO(out, rc = -EINVAL); @@ -1198,8 +1385,10 @@ out: RETURN(rc); } -static int lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags) +static int lmv_select_statfs_mdt(struct obd_export *exp, struct lmv_obd *lmv, + u32 flags) { + bool large_nid = exp_connect_flags2(exp) & OBD_CONNECT2_LARGE_NID; int i; if (flags & OBD_STATFS_FOR_MDT0) @@ -1210,15 +1399,16 @@ static int lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags) /* choose initial MDT for this client */ for (i = 0;; i++) { - struct lnet_process_id lnet_id; - if (LNetGetId(i, &lnet_id) == -ENOENT) + struct lnet_processid lnet_id; + + if (LNetGetId(i, &lnet_id, large_nid) == -ENOENT) break; - if (LNET_NETTYP(LNET_NIDNET(lnet_id.nid)) != LOLND) { + if (!nid_is_lo0(&lnet_id.nid)) { /* We dont need a full 64-bit modulus, just enough * to distribute the requests across MDTs evenly. */ - lmv->lmv_statfs_start = (u32)lnet_id.nid % + lmv->lmv_statfs_start = nidhash(&lnet_id.nid) % lmv->lmv_mdt_count; break; } @@ -1237,6 +1427,7 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, __u32 i; __u32 idx; int rc = 0; + int err = 0; ENTRY; @@ -1245,7 +1436,7 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, RETURN(-ENOMEM); /* distribute statfs among MDTs */ - idx = lmv_select_statfs_mdt(lmv, flags); + idx = lmv_select_statfs_mdt(exp, lmv, flags); for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++, idx++) { idx = idx % lmv->lmv_mdt_descs.ltd_tgts_size; @@ -1253,24 +1444,30 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, if (!tgt || !tgt->ltd_exp) continue; - rc = obd_statfs(env, tgt->ltd_exp, temp, max_age, flags); + rc = obd_statfs(env, tgt->ltd_exp, temp, max_age, + flags | OBD_STATFS_NESTED); if (rc) { CERROR("%s: can't stat MDS #%d: rc = %d\n", tgt->ltd_exp->exp_obd->obd_name, i, rc); + err = rc; + /* Try another MDT */ + if (flags & OBD_STATFS_SUM) + continue; GOTO(out_free_temp, rc); } - if (temp->os_state & OS_STATE_SUM || + if (temp->os_state & OS_STATFS_SUM || flags == OBD_STATFS_FOR_MDT0) { - /* reset to the last aggregated values - * and don't sum with non-aggrated data */ - /* If the statfs is from mount, it needs to retrieve - * necessary information from MDT0. i.e. mount does - * not need the merged osfs from all of MDT. Also - * clients can be mounted as long as MDT0 is in - * service */ + /* reset to the last aggregated values and don't sum + * with non-aggrated data + * + * If the statfs is from mount, it needs to retrieve + * necessary info from MDT0. i.e. mount does not need + * the merged osfs from all of MDT. Also clients can be + * mounted as long as MDT0 is in service + */ *osfs = *temp; - break; + GOTO(out_free_temp, rc); } if (i == 0) { @@ -1283,14 +1480,15 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_granted += temp->os_granted; } } - - EXIT; + /* There is no stats from some MDTs, data incomplete */ + if (err) + rc = err; out_free_temp: OBD_FREE(temp, sizeof(*temp)); - return rc; + RETURN(rc); } -static int lmv_statfs_update(void *cookie, int rc) +static int lmv_statfs_cb(void *cookie, int rc) { struct obd_info *oinfo = cookie; struct obd_device *obd = oinfo->oi_obd; @@ -1302,13 +1500,8 @@ static int lmv_statfs_update(void *cookie, int rc) * NB: don't deactivate TGT upon error, because we may not trigger async * statfs any longer, then there is no chance to activate TGT. */ - if (!rc) { - spin_lock(&lmv->lmv_lock); - tgt->ltd_statfs = *osfs; - tgt->ltd_statfs_age = ktime_get_seconds(); - spin_unlock(&lmv->lmv_lock); - lmv->lmv_qos.lq_dirty = 1; - } + if (!rc) + lmv_statfs_update(lmv, tgt, osfs); return rc; } @@ -1319,7 +1512,7 @@ int lmv_statfs_check_update(struct obd_device *obd, struct lmv_tgt_desc *tgt) struct obd_info oinfo = { .oi_obd = obd, .oi_tgt = tgt, - .oi_cb_up = lmv_statfs_update, + .oi_cb_up = lmv_statfs_cb, }; int rc; @@ -1346,6 +1539,8 @@ static int lmv_get_root(struct obd_export *exp, const char *fileset, RETURN(-ENODEV); rc = md_get_root(tgt->ltd_exp, fileset, fid); + if (!rc) + lmv->lmv_setup_time = ktime_get_seconds(); RETURN(rc); } @@ -1456,11 +1651,27 @@ static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } -static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv, __u32 *mdt) +static inline bool tgt_qos_is_usable(struct lmv_obd *lmv, + struct lu_tgt_desc *tgt, time64_t now) { - struct lu_tgt_desc *tgt; + struct obd_import *imp = class_exp2cliimp(tgt->ltd_exp); + u32 maxage = lmv->lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage; + + return tgt->ltd_exp && tgt->ltd_active && + !(tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE) && + (now - imp->imp_setup_time > (maxage >> 1) || + now - lmv->lmv_setup_time < (maxage << 1)); +} + +static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv, + struct md_op_data *op_data) +{ + struct lu_tgt_desc *tgt, *cur = NULL; + time64_t now = ktime_get_seconds(); + __u64 total_avail = 0; __u64 total_weight = 0; __u64 cur_weight = 0; + int total_usable = 0; __u64 rand; int rc; @@ -1479,13 +1690,41 @@ static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv, __u32 *mdt) GOTO(unlock, tgt = ERR_PTR(rc)); lmv_foreach_tgt(lmv, tgt) { - tgt->ltd_qos.ltq_usable = 0; - if (!tgt->ltd_exp || !tgt->ltd_active) + if (!tgt_qos_is_usable(lmv, tgt, now)) { + tgt->ltd_qos.ltq_usable = 0; continue; - + } + /* update one hour overdue statfs */ + if (now - tgt->ltd_statfs_age > + 60 * lmv->lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage) + lmv_statfs_check_update(lmv2obd_dev(lmv), tgt); tgt->ltd_qos.ltq_usable = 1; - lu_tgt_qos_weight_calc(tgt); + lu_tgt_qos_weight_calc(tgt, true); + if (tgt->ltd_index == op_data->op_mds) + cur = tgt; + total_avail += tgt->ltd_qos.ltq_avail; total_weight += tgt->ltd_qos.ltq_weight; + total_usable++; + } + + /* If current MDT has above-average space and dir is not already using + * round-robin to spread across more MDTs, stay on the parent MDT + * to avoid creating needless remote MDT directories. Remote dirs + * close to the root balance space more effectively than bottom dirs, + * so prefer to create remote dirs at top level of directory tree. + * "16 / (dir_depth + 10)" is the factor to make it less likely + * for top-level directories to stay local unless they have more than + * average free space, while deep dirs prefer local until more full. + * depth=0 -> 160%, depth=3 -> 123%, depth=6 -> 100%, + * depth=9 -> 84%, depth=12 -> 73%, depth=15 -> 64% + */ + if (!lmv_op_default_rr_mkdir(op_data)) { + rand = total_avail * 16 / + (total_usable * (op_data->op_dir_depth + 10)); + if (cur && cur->ltd_qos.ltq_avail >= rand) { + tgt = cur; + GOTO(unlock, tgt); + } } rand = lu_prandom_u64_max(total_weight); @@ -1498,9 +1737,8 @@ static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv, __u32 *mdt) if (cur_weight < rand) continue; - *mdt = tgt->ltd_index; ltd_qos_update(&lmv->lmv_mdt_descs, tgt, &total_weight); - GOTO(unlock, rc = 0); + GOTO(unlock, tgt); } /* no proper target found */ @@ -1511,43 +1749,106 @@ unlock: return tgt; } -static struct lu_tgt_desc *lmv_locate_tgt_rr(struct lmv_obd *lmv, __u32 *mdt) +static struct lu_tgt_desc *lmv_locate_tgt_rr(struct lmv_obd *lmv) { + time64_t now = ktime_get_seconds(); struct lu_tgt_desc *tgt; int i; int index; ENTRY; - spin_lock(&lmv->lmv_qos.lq_rr.lqr_alloc); + spin_lock(&lmv->lmv_lock); for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++) { index = (i + lmv->lmv_qos_rr_index) % lmv->lmv_mdt_descs.ltd_tgts_size; tgt = lmv_tgt(lmv, index); - if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) + if (!tgt || !tgt_qos_is_usable(lmv, tgt, now)) continue; - *mdt = tgt->ltd_index; - lmv->lmv_qos_rr_index = (*mdt + 1) % + lmv->lmv_qos_rr_index = (tgt->ltd_index + 1) % lmv->lmv_mdt_descs.ltd_tgts_size; - spin_unlock(&lmv->lmv_qos.lq_rr.lqr_alloc); + spin_unlock(&lmv->lmv_lock); RETURN(tgt); } - spin_unlock(&lmv->lmv_qos.lq_rr.lqr_alloc); + spin_unlock(&lmv->lmv_lock); RETURN(ERR_PTR(-ENODEV)); } +/* locate MDT which is less full (avoid the most full MDT) */ +static struct lu_tgt_desc *lmv_locate_tgt_lf(struct lmv_obd *lmv) +{ + struct lu_tgt_desc *min = NULL; + struct lu_tgt_desc *tgt; + __u64 avail = 0; + __u64 rand; + + ENTRY; + + if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs)) + RETURN(ERR_PTR(-EAGAIN)); + + down_write(&lmv->lmv_qos.lq_rw_sem); + + if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs)) + GOTO(unlock, tgt = ERR_PTR(-EAGAIN)); + + lmv_foreach_tgt(lmv, tgt) { + if (!tgt->ltd_exp || !tgt->ltd_active || + (tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE)) { + tgt->ltd_qos.ltq_usable = 0; + continue; + } + + tgt->ltd_qos.ltq_usable = 1; + lu_tgt_qos_weight_calc(tgt, true); + avail += tgt->ltd_qos.ltq_avail; + if (!min || min->ltd_qos.ltq_avail > tgt->ltd_qos.ltq_avail) + min = tgt; + } + + /* avoid the most full MDT */ + if (min) + avail -= min->ltd_qos.ltq_avail; + + rand = lu_prandom_u64_max(avail); + avail = 0; + lmv_foreach_connected_tgt(lmv, tgt) { + if (!tgt->ltd_qos.ltq_usable) + continue; + + if (tgt == min) + continue; + + avail += tgt->ltd_qos.ltq_avail; + if (avail < rand) + continue; + + GOTO(unlock, tgt); + } + + /* no proper target found */ + GOTO(unlock, tgt = ERR_PTR(-EAGAIN)); +unlock: + up_write(&lmv->lmv_qos.lq_rw_sem); + + RETURN(tgt); +} + +/* locate MDT by file name, for striped directory, the file name hash decides + * which stripe its dirent is stored. + */ static struct lmv_tgt_desc * -lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, +lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_object *lso, const char *name, int namelen, struct lu_fid *fid, - __u32 *mds, bool post_migrate) + __u32 *mds, bool new_layout) { struct lmv_tgt_desc *tgt; const struct lmv_oinfo *oinfo; - if (!lmv_dir_striped(lsm) || !namelen) { + if (!lmv_dir_striped(lso) || !namelen) { tgt = lmv_fid2tgt(lmv, fid); if (IS_ERR(tgt)) return tgt; @@ -1556,20 +1857,23 @@ lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, return tgt; } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) { - if (cfs_fail_val >= lsm->lsm_md_stripe_count) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) { + if (cfs_fail_val >= lso->lso_lsm.lsm_md_stripe_count) return ERR_PTR(-EBADF); - oinfo = &lsm->lsm_md_oinfo[cfs_fail_val]; + oinfo = &lso->lso_lsm.lsm_md_oinfo[cfs_fail_val]; } else { - oinfo = lsm_name_to_stripe_info(lsm, name, namelen, - post_migrate); + oinfo = lsm_name_to_stripe_info(lso, name, namelen, new_layout); if (IS_ERR(oinfo)) return ERR_CAST(oinfo); } + /* check stripe FID is sane */ + if (!fid_is_sane(&oinfo->lmo_fid)) + return ERR_PTR(-ENODEV); + *fid = oinfo->lmo_fid; *mds = oinfo->lmo_mds; - tgt = lmv_tgt(lmv, oinfo->lmo_mds); + tgt = lmv_tgt_retry(lmv, oinfo->lmo_mds); CDEBUG(D_INODE, "locate MDT %u parent "DFID"\n", *mds, PFID(fid)); @@ -1581,14 +1885,15 @@ lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, * * For striped directory, it will locate the stripe by name hash, if hash_type * is unknown, it will return the stripe specified by 'op_data->op_stripe_index' - * which is set outside, and if dir is migrating, 'op_data->op_post_migrate' + * which is set outside, and if dir is migrating, 'op_data->op_new_layout' * indicates whether old or new layout is used to locate. * - * For plain direcotry, it just locate the MDT of op_data->op_fid1. + * For plain directory, it just locate the MDT of op_data->op_fid1. * - * \param[in] lmv LMV device - * \param[in] op_data client MD stack parameters, name, namelen - * mds_num etc. + * \param[in] lmv LMV device + * \param[in/out] op_data client MD stack parameters, name, namelen etc, + * op_mds and op_fid1 will be updated if op_lso1 + * indicates fid1 represents a striped directory. * * retval pointer to the lmv_tgt_desc if succeed. * ERR_PTR(errno) if failed. @@ -1596,26 +1901,28 @@ lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, struct lmv_tgt_desc * lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data) { - struct lmv_stripe_md *lsm = op_data->op_mea1; + struct lmv_stripe_md *lsm; struct lmv_oinfo *oinfo; struct lmv_tgt_desc *tgt; - if (lmv_dir_foreign(lsm)) + if (lmv_dir_foreign(op_data->op_lso1)) return ERR_PTR(-ENODATA); /* During creating VOLATILE file, it should honor the mdt * index if the file under striped dir is being restored, see - * ct_restore(). */ + * ct_restore(). + */ if (op_data->op_bias & MDS_CREATE_VOLATILE && op_data->op_mds != LMV_OFFSET_DEFAULT) { tgt = lmv_tgt(lmv, op_data->op_mds); if (!tgt) return ERR_PTR(-ENODEV); - if (lmv_dir_striped(lsm)) { + if (lmv_dir_striped(op_data->op_lso1)) { int i; /* refill the right parent fid */ + lsm = &op_data->op_lso1->lso_lsm; for (i = 0; i < lsm->lsm_md_stripe_count; i++) { oinfo = &lsm->lsm_md_oinfo[i]; if (oinfo->lmo_mds == op_data->op_mds) { @@ -1627,7 +1934,9 @@ lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data) if (i == lsm->lsm_md_stripe_count) op_data->op_fid1 = lsm->lsm_md_oinfo[0].lmo_fid; } - } else if (lmv_dir_bad_hash(lsm)) { + } else if (lmv_dir_bad_hash(op_data->op_lso1)) { + lsm = &op_data->op_lso1->lso_lsm; + LASSERT(op_data->op_stripe_index < lsm->lsm_md_stripe_count); oinfo = &lsm->lsm_md_oinfo[op_data->op_stripe_index]; @@ -1637,10 +1946,10 @@ lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data) if (!tgt) return ERR_PTR(-ENODEV); } else { - tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea1, + tgt = lmv_locate_tgt_by_name(lmv, op_data->op_lso1, op_data->op_name, op_data->op_namelen, &op_data->op_fid1, &op_data->op_mds, - op_data->op_post_migrate); + op_data->op_new_layout); } return tgt; @@ -1654,28 +1963,28 @@ lmv_locate_tgt2(struct lmv_obd *lmv, struct md_op_data *op_data) int rc; LASSERT(op_data->op_name); - if (lmv_dir_migrating(op_data->op_mea2)) { + if (lmv_dir_layout_changing(op_data->op_lso2)) { struct lu_fid fid1 = op_data->op_fid1; - struct lmv_stripe_md *lsm1 = op_data->op_mea1; + struct lmv_stripe_object *lso1 = op_data->op_lso1; struct ptlrpc_request *request = NULL; /* * avoid creating new file under old layout of migrating * directory, check it here. */ - tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea2, + tgt = lmv_locate_tgt_by_name(lmv, op_data->op_lso2, op_data->op_name, op_data->op_namelen, &op_data->op_fid2, &op_data->op_mds, false); if (IS_ERR(tgt)) RETURN(tgt); op_data->op_fid1 = op_data->op_fid2; - op_data->op_mea1 = op_data->op_mea2; + op_data->op_lso1 = op_data->op_lso2; rc = md_getattr_name(tgt->ltd_exp, op_data, &request); op_data->op_fid1 = fid1; - op_data->op_mea1 = lsm1; + op_data->op_lso1 = lso1; if (!rc) { - ptlrpc_req_finished(request); + ptlrpc_req_put(request); RETURN(ERR_PTR(-EEXIST)); } @@ -1683,18 +1992,20 @@ lmv_locate_tgt2(struct lmv_obd *lmv, struct md_op_data *op_data) RETURN(ERR_PTR(rc)); } - return lmv_locate_tgt_by_name(lmv, op_data->op_mea2, op_data->op_name, - op_data->op_namelen, &op_data->op_fid2, - &op_data->op_mds, true); + return lmv_locate_tgt_by_name(lmv, op_data->op_lso2, + op_data->op_name, op_data->op_namelen, + &op_data->op_fid2, &op_data->op_mds, + true); } -int lmv_migrate_existence_check(struct lmv_obd *lmv, struct md_op_data *op_data) +int lmv_old_layout_lookup(struct lmv_obd *lmv, struct md_op_data *op_data) { struct lu_tgt_desc *tgt; struct ptlrpc_request *request; int rc; - LASSERT(lmv_dir_migrating(op_data->op_mea1)); + LASSERT(lmv_dir_layout_changing(op_data->op_lso1)); + LASSERT(!op_data->op_new_layout); tgt = lmv_locate_tgt(lmv, op_data); if (IS_ERR(tgt)) @@ -1702,41 +2013,63 @@ int lmv_migrate_existence_check(struct lmv_obd *lmv, struct md_op_data *op_data) rc = md_getattr_name(tgt->ltd_exp, op_data, &request); if (!rc) { - ptlrpc_req_finished(request); + ptlrpc_req_put(request); return -EEXIST; } return rc; } -/* mkdir by QoS in two cases: - * 1. 'lfs mkdir -i -1' - * 2. parent default LMV master_mdt_index is -1 +/* mkdir by QoS upon 'lfs mkdir -i -1'. * * NB, mkdir by QoS only if parent is not striped, this is to avoid remote * directories under striped directory. */ -static inline bool lmv_op_qos_mkdir(const struct md_op_data *op_data) +static inline bool lmv_op_user_qos_mkdir(const struct md_op_data *op_data) { - const struct lmv_stripe_md *lsm = op_data->op_default_mea1; const struct lmv_user_md *lum = op_data->op_data; if (op_data->op_code != LUSTRE_OPC_MKDIR) return false; - if (lmv_dir_striped(op_data->op_mea1)) + if (lmv_dir_striped(op_data->op_lso1)) + return false; + + return (op_data->op_cli_flags & CLI_SET_MEA) && lum && + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC && + le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT; +} + +/* mkdir by QoS if either ROOT or parent default LMV is space balanced. */ +static inline bool lmv_op_default_qos_mkdir(const struct md_op_data *op_data) +{ + const struct lmv_stripe_object *lso = op_data->op_default_lso1; + + if (op_data->op_code != LUSTRE_OPC_MKDIR) + return false; + + if (lmv_dir_striped(op_data->op_lso1)) return false; - if (op_data->op_cli_flags & CLI_SET_MEA && lum && - (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) && - le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT) - return true; + return (op_data->op_flags & MF_QOS_MKDIR) || + (lso && lso->lso_lsm.lsm_md_master_mdt_index == + LMV_OFFSET_DEFAULT); +} - if (lsm && lsm->lsm_md_master_mdt_index == LMV_OFFSET_DEFAULT) - return true; +/* if parent default LMV is space balanced, and + * 1. max_inherit_rr is set + * 2. or parent is ROOT + * mkdir roundrobin. Or if parent doesn't have default LMV, while ROOT default + * LMV requests roundrobin mkdir, do the same. + * NB, this needs to check server is balanced, which is done by caller. + */ +static inline bool lmv_op_default_rr_mkdir(const struct md_op_data *op_data) +{ + const struct lmv_stripe_object *lso = op_data->op_default_lso1; - return false; + return (op_data->op_flags & MF_RR_MKDIR) || + (lso && lso->lso_lsm.lsm_md_max_inherit_rr != + LMV_INHERIT_RR_NONE) || fid_is_root(&op_data->op_fid1); } /* 'lfs mkdir -i ' */ @@ -1756,74 +2089,170 @@ static inline bool lmv_op_default_specific_mkdir(const struct md_op_data *op_data) { return op_data->op_code == LUSTRE_OPC_MKDIR && - op_data->op_default_mea1 && - op_data->op_default_mea1->lsm_md_master_mdt_index != + op_data->op_default_lso1 && + op_data->op_default_lso1->lso_lsm.lsm_md_master_mdt_index != LMV_OFFSET_DEFAULT; } -int lmv_create(struct obd_export *exp, struct md_op_data *op_data, - const void *data, size_t datalen, umode_t mode, uid_t uid, - gid_t gid, cfs_cap_t cap_effective, __u64 rdev, - struct ptlrpc_request **request) + +/* locate MDT by space usage */ +static struct lu_tgt_desc *lmv_locate_tgt_by_space(struct lmv_obd *lmv, + struct md_op_data *op_data, + struct lmv_tgt_desc *tgt) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; - int rc; + struct lmv_tgt_desc *tmp = tgt; + + tgt = lmv_locate_tgt_qos(lmv, op_data); + if (tgt == ERR_PTR(-EAGAIN)) { + if (ltd_qos_is_balanced(&lmv->lmv_mdt_descs) && + !lmv_op_default_rr_mkdir(op_data) && + !lmv_op_user_qos_mkdir(op_data) && + !(tmp->ltd_statfs.os_state & OS_STATFS_NOCREATE)) { + /* if not necessary, don't create remote directory. */ + tgt = tmp; + } else { + tgt = lmv_locate_tgt_rr(lmv); + /* if no MDT chosen, use parent MDT */ + if (IS_ERR(tgt)) + tgt = tmp; + } + if (!IS_ERR(tgt)) + lmv_statfs_check_update(lmv2obd_dev(lmv), tgt); + } - ENTRY; + if (!IS_ERR(tgt)) + op_data->op_mds = tgt->ltd_index; - if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count) - RETURN(-EIO); + /* If space balance was called because the original target was marked + * NOCREATE, periodically check whether the state has changed. + */ + if (tmp != tgt && tmp->ltd_statfs.os_state & OS_STATFS_NOCREATE) + lmv_statfs_check_update(lmv2obd_dev(lmv), tmp); - if (lmv_dir_bad_hash(op_data->op_mea1)) - RETURN(-EBADF); + return tgt; +} - if (lmv_dir_migrating(op_data->op_mea1)) { - /* - * if parent is migrating, create() needs to lookup existing - * name in both old and new layout, check old layout on client. - */ - rc = lmv_migrate_existence_check(lmv, op_data); - if (rc != -ENOENT) - RETURN(rc); +static bool lmv_qos_exclude(struct lmv_obd *lmv, struct md_op_data *op_data) +{ + const char *name = op_data->op_name; + size_t namelen = op_data->op_namelen; + char buf[NAME_MAX + 1]; + struct qos_exclude_prefix *prefix; + char *p; + + /* skip encrypted files */ + if (op_data->op_file_encctx) + return false; - op_data->op_post_migrate = true; - } + /* name length may not be validated yet */ + if (namelen > NAME_MAX) + return false; + + p = strrchr(name, '.'); + if (p) { + namelen = p - name; + if (!namelen) + return false; + strncpy(buf, name, namelen); + buf[namelen] = '\0'; + name = buf; + } + + prefix = rhashtable_lookup_fast(&lmv->lmv_qos_exclude_hash, name, + qos_exclude_hash_params); + return prefix != NULL; +} + +struct lmv_tgt_desc *lmv_locate_tgt_create(struct obd_device *obd, + struct lmv_obd *lmv, + struct md_op_data *op_data) +{ + struct lmv_tgt_desc *tgt; + + ENTRY; tgt = lmv_locate_tgt(lmv, op_data); if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); + RETURN(tgt); - if (lmv_op_qos_mkdir(op_data)) { - tgt = lmv_locate_tgt_qos(lmv, &op_data->op_mds); - if (tgt == ERR_PTR(-EAGAIN)) - tgt = lmv_locate_tgt_rr(lmv, &op_data->op_mds); - /* - * only update statfs after QoS mkdir, this means the cached - * statfs may be stale, and current mkdir may not follow QoS - * accurately, but it's not serious, and avoids periodic statfs - * when client doesn't mkdir by QoS. - */ - if (!IS_ERR(tgt)) - lmv_statfs_check_update(obd, tgt); - } else if (lmv_op_user_specific_mkdir(op_data)) { + /* the order to apply policy in mkdir: + * 1. is "lfs mkdir -i N"? mkdir on MDT N. + * 2. is "lfs mkdir -i -1"? mkdir by space usage. + * 3. is starting MDT specified in default LMV? mkdir on MDT N. + * 4. is default LMV space balanced? mkdir by space usage. + * + * If the existing parent or specific MDT selected is deactivated + * with OS_STATFS_NOCREATE then select a different MDT by QOS. + */ + if (lmv_op_user_specific_mkdir(op_data)) { struct lmv_user_md *lum = op_data->op_data; op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset); tgt = lmv_tgt(lmv, op_data->op_mds); if (!tgt) - RETURN(-ENODEV); + RETURN(ERR_PTR(-ENODEV)); + if (unlikely(tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE)) + GOTO(new_tgt, -EAGAIN); + } else if (lmv_op_user_qos_mkdir(op_data)) { + tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt); + if (IS_ERR(tgt)) + RETURN(tgt); } else if (lmv_op_default_specific_mkdir(op_data)) { - op_data->op_mds = - op_data->op_default_mea1->lsm_md_master_mdt_index; + struct lmv_stripe_md *lsm = &op_data->op_default_lso1->lso_lsm; + + op_data->op_mds = lsm->lsm_md_master_mdt_index; tgt = lmv_tgt(lmv, op_data->op_mds); if (!tgt) - RETURN(-ENODEV); + RETURN(ERR_PTR(-ENODEV)); + if (unlikely(tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE)) + GOTO(new_tgt, -EAGAIN); + } else if ((lmv_op_default_qos_mkdir(op_data) && + !lmv_qos_exclude(lmv, op_data)) || + tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE) { +new_tgt: + tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt); + if (IS_ERR(tgt)) + RETURN(tgt); } + RETURN(tgt); +} + +static int lmv_create(struct obd_export *exp, struct md_op_data *op_data, + const void *data, size_t datalen, umode_t mode, uid_t uid, + gid_t gid, kernel_cap_t cap_effective, __u64 rdev, + struct ptlrpc_request **request) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct mdt_body *repbody; + int rc; + + ENTRY; + + if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count) + RETURN(-EIO); + + if (lmv_dir_bad_hash(op_data->op_lso1)) + RETURN(-EBADF); + + if (lmv_dir_layout_changing(op_data->op_lso1)) { + /* + * if parent is migrating, create() needs to lookup existing + * name in both old and new layout, check old layout on client. + */ + rc = lmv_old_layout_lookup(lmv, op_data); + if (rc != -ENOENT) + RETURN(rc); + + op_data->op_new_layout = true; + } + + tgt = lmv_locate_tgt_create(obd, lmv, op_data); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); +retry: rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) RETURN(rc); @@ -1841,7 +2270,30 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2)); } - RETURN(rc); + + /* dir restripe needs to send to MDT where dir is located */ + if (rc != -EREMOTE || + !(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) + RETURN(rc); + + repbody = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY); + if (repbody == NULL) + RETURN(-EPROTO); + + /* Not cross-ref case, just get out of here. */ + if (likely(!(repbody->mbo_valid & OBD_MD_MDS))) + RETURN(rc); + + op_data->op_fid2 = repbody->mbo_fid1; + ptlrpc_req_put(*request); + *request = NULL; + + tgt = lmv_fid2tgt(lmv, &op_data->op_fid2); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + op_data->op_mds = tgt->ltd_index; + goto retry; } static int @@ -1872,7 +2324,7 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, } int -lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data, +lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **preq) { struct obd_device *obd = exp->exp_obd; @@ -1884,7 +2336,11 @@ lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data, ENTRY; retry: - tgt = lmv_locate_tgt(lmv, op_data); + if (op_data->op_namelen == 2 && + op_data->op_name[0] == '.' && op_data->op_name[1] == '.') + tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); + else + tgt = lmv_locate_tgt(lmv, op_data); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -1894,7 +2350,7 @@ retry: rc = md_getattr_name(tgt->ltd_exp, op_data, preq); if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) { - ptlrpc_req_finished(*preq); + ptlrpc_req_put(*preq); *preq = NULL; goto retry; } @@ -1911,7 +2367,7 @@ retry: op_data->op_namelen = 0; op_data->op_name = NULL; - ptlrpc_req_finished(*preq); + ptlrpc_req_put(*preq); *preq = NULL; goto retry; @@ -1921,11 +2377,11 @@ retry: } #define md_op_data_fid(op_data, fl) \ - (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \ - fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \ - fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \ - fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \ - NULL) + (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \ + fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \ + fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \ + fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \ + NULL) static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt, struct md_op_data *op_data, __u32 op_tgt, @@ -1935,6 +2391,7 @@ static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt, struct lmv_obd *lmv = &exp->exp_obd->u.lmv; union ldlm_policy_data policy = { { 0 } }; int rc = 0; + ENTRY; if (!fid_is_sane(fid)) @@ -1963,16 +2420,17 @@ static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt, } /* - * llite passes fid of an target inode in op_data->op_fid1 and id of directory in - * op_data->op_fid2 + * llite passes fid of an target inode in op_data->op_fid1 and id of directory + * in op_data->op_fid2 */ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, - struct ptlrpc_request **request) + struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; int rc; + ENTRY; LASSERT(op_data->op_namelen != 0); @@ -1983,15 +2441,13 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); - op_data->op_cap = cfs_curproc_cap_pack(); + op_data->op_cap = current_cap(); tgt = lmv_locate_tgt2(lmv, op_data); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); - /* - * Cancel UPDATE lock on child (fid1). - */ + /* Cancel UPDATE lock on child (fid1). */ op_data->op_flags |= MF_MDC_CANCEL_FID2; rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_index, LCK_EX, MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1); @@ -2003,19 +2459,66 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } +/* migrate the top directory */ +static inline bool lmv_op_topdir_migrate(const struct md_op_data *op_data) +{ + if (!S_ISDIR(op_data->op_mode)) + return false; + + if (lmv_dir_layout_changing(op_data->op_lso1)) + return false; + + return true; +} + +/* migrate top dir to specific MDTs */ +static inline bool lmv_topdir_specific_migrate(const struct md_op_data *op_data) +{ + const struct lmv_user_md *lum = op_data->op_data; + + if (!lmv_op_topdir_migrate(op_data)) + return false; + + return le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT; +} + +/* migrate top dir in QoS mode if user issued "lfs migrate -m -1..." */ +static inline bool lmv_topdir_qos_migrate(const struct md_op_data *op_data) +{ + const struct lmv_user_md *lum = op_data->op_data; + + if (!lmv_op_topdir_migrate(op_data)) + return false; + + return le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT; +} + +static inline bool lmv_subdir_specific_migrate(const struct md_op_data *op_data) +{ + const struct lmv_user_md *lum = op_data->op_data; + + if (!S_ISDIR(op_data->op_mode)) + return false; + + if (!lmv_dir_layout_changing(op_data->op_lso1)) + return false; + + return le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT; +} + static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, const char *name, size_t namelen, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *lsm = op_data->op_mea1; + struct lmv_stripe_object *lso = op_data->op_lso1; struct lmv_tgt_desc *parent_tgt; struct lmv_tgt_desc *sp_tgt; struct lmv_tgt_desc *tp_tgt = NULL; struct lmv_tgt_desc *child_tgt; struct lmv_tgt_desc *tgt; - struct lu_fid target_fid; + struct lu_fid target_fid = { 0 }; int rc; ENTRY; @@ -2027,36 +2530,22 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); - op_data->op_cap = cfs_curproc_cap_pack(); + op_data->op_cap = current_cap(); parent_tgt = lmv_fid2tgt(lmv, &op_data->op_fid1); if (IS_ERR(parent_tgt)) RETURN(PTR_ERR(parent_tgt)); - if (lmv_dir_striped(lsm)) { - __u32 hash_type = lsm->lsm_md_hash_type; - __u32 stripe_count = lsm->lsm_md_stripe_count; + if (lmv_dir_striped(lso)) { + const struct lmv_oinfo *oinfo; - /* - * old stripes are appended after new stripes for migrating - * directory. - */ - if (lmv_dir_migrating(lsm)) { - hash_type = lsm->lsm_md_migrate_hash; - stripe_count -= lsm->lsm_md_migrate_offset; - } - - rc = lmv_name_to_stripe_index(hash_type, stripe_count, name, - namelen); - if (rc < 0) - RETURN(rc); - - if (lmv_dir_migrating(lsm)) - rc += lsm->lsm_md_migrate_offset; + oinfo = lsm_name_to_stripe_info(lso, name, namelen, false); + if (IS_ERR(oinfo)) + RETURN(PTR_ERR(oinfo)); - /* save it in fid4 temporarily for early cancel */ - op_data->op_fid4 = lsm->lsm_md_oinfo[rc].lmo_fid; - sp_tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[rc].lmo_mds); + /* save source stripe FID in fid4 temporarily for ELC */ + op_data->op_fid4 = oinfo->lmo_fid; + sp_tgt = lmv_tgt_retry(lmv, oinfo->lmo_mds); if (!sp_tgt) RETURN(-ENODEV); @@ -2064,20 +2553,21 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, * if parent is being migrated too, fill op_fid2 with target * stripe fid, otherwise the target stripe is not created yet. */ - if (lmv_dir_migrating(lsm)) { - hash_type = lsm->lsm_md_hash_type & - ~LMV_HASH_FLAG_MIGRATION; - stripe_count = lsm->lsm_md_migrate_offset; - - rc = lmv_name_to_stripe_index(hash_type, stripe_count, - name, namelen); - if (rc < 0) - RETURN(rc); - - op_data->op_fid2 = lsm->lsm_md_oinfo[rc].lmo_fid; - tp_tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[rc].lmo_mds); + if (lmv_dir_layout_changing(lso)) { + oinfo = lsm_name_to_stripe_info(lso, name, namelen, + true); + if (IS_ERR(oinfo)) + RETURN(PTR_ERR(oinfo)); + + op_data->op_fid2 = oinfo->lmo_fid; + tp_tgt = lmv_tgt_retry(lmv, oinfo->lmo_mds); if (!tp_tgt) RETURN(-ENODEV); + + /* parent unchanged and update namespace only */ + if (lu_fid_eq(&op_data->op_fid4, &op_data->op_fid2) && + op_data->op_bias & MDS_MIGRATE_NSONLY) + RETURN(-EALREADY); } } else { sp_tgt = parent_tgt; @@ -2087,19 +2577,56 @@ static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(child_tgt)) RETURN(PTR_ERR(child_tgt)); - /* for directory, migrate to MDT specified by lum_stripe_offset; - * otherwise migrate to the target stripe of parent, but parent - * directory may have finished migration (normally current file too), - * allocate FID on MDT lum_stripe_offset, and server will check - * whether file was migrated already. - */ - if (S_ISDIR(op_data->op_mode) || !tp_tgt) { + if (lmv_topdir_specific_migrate(op_data)) { struct lmv_user_md *lum = op_data->op_data; op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset); - } else { + } else if (lmv_topdir_qos_migrate(op_data)) { + tgt = lmv_locate_tgt_lf(lmv); + if (tgt == ERR_PTR(-EAGAIN)) + tgt = lmv_locate_tgt_rr(lmv); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + op_data->op_mds = tgt->ltd_index; + } else if (lmv_subdir_specific_migrate(op_data)) { + struct lmv_user_md *lum = op_data->op_data; + __u32 i; + + LASSERT(tp_tgt); + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + /* adjust MDTs in lum, since subdir is located on where + * its parent stripe is, not the first specified MDT. + */ + for (i = 0; i < le32_to_cpu(lum->lum_stripe_count); + i++) { + if (le32_to_cpu(lum->lum_objects[i].lum_mds) == + tp_tgt->ltd_index) + break; + } + + if (i == le32_to_cpu(lum->lum_stripe_count)) + RETURN(-ENODEV); + + lum->lum_objects[i].lum_mds = + lum->lum_objects[0].lum_mds; + lum->lum_objects[0].lum_mds = + cpu_to_le32(tp_tgt->ltd_index); + } + /* NB, the above adjusts subdir migration for command like + * "lfs migrate -m 0,1,2 ...", but for migration like + * "lfs migrate -m 0 -c 2 ...", the top dir is migrated to MDT0 + * and MDT1, however its subdir may be migrated to MDT1 and MDT2 + */ + + lum->lum_stripe_offset = cpu_to_le32(tp_tgt->ltd_index); + op_data->op_mds = tp_tgt->ltd_index; + } else if (tp_tgt) { op_data->op_mds = tp_tgt->ltd_index; + } else { + op_data->op_mds = sp_tgt->ltd_index; } + rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data); if (rc) RETURN(rc); @@ -2199,7 +2726,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); - op_data->op_cap = cfs_curproc_cap_pack(); + op_data->op_cap = current_cap(); op_data->op_name = new; op_data->op_namelen = newlen; @@ -2212,7 +2739,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, * orphan, and we can only check orphan on the local MDT right now, so * we send rename request to the MDT where target child is located. If * target child does not exist, then it will send the request to the - * target parent */ + * target parent + */ if (fid_is_sane(&op_data->op_fid4)) { tgt = lmv_fid2tgt(lmv, &op_data->op_fid4); if (IS_ERR(tgt)) @@ -2287,7 +2815,7 @@ rename: rc = md_rename(tgt->ltd_exp, op_data, old, oldlen, new, newlen, request); if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) { - ptlrpc_req_finished(*request); + ptlrpc_req_put(*request); *request = NULL; goto retry; } @@ -2305,7 +2833,7 @@ rename: op_data->op_fid4 = body->mbo_fid1; - ptlrpc_req_finished(*request); + ptlrpc_req_put(*request); *request = NULL; tgt = lmv_fid2tgt(lmv, &op_data->op_fid4); @@ -2379,7 +2907,7 @@ struct stripe_dirent { struct lmv_dir_ctxt { struct lmv_obd *ldc_lmv; struct md_op_data *ldc_op_data; - struct md_callback *ldc_cb_op; + struct md_readdir_info *ldc_mrinfo; __u64 ldc_hash; int ldc_count; struct stripe_dirent ldc_stripes[0]; @@ -2452,7 +2980,7 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt, /* @hash should be the last dirent hash */ LASSERTF(hash <= end, - "ctxt@%p stripe@%p hash %llx end %llx\n", + "ctxt@%px stripe@%px hash %llx end %llx\n", ctxt, stripe, hash, end); /* unload last page */ stripe_dirent_unload(stripe); @@ -2464,13 +2992,13 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt, hash = end; } - oinfo = &op_data->op_mea1->lsm_md_oinfo[stripe_index]; + oinfo = &op_data->op_lso1->lso_lsm.lsm_md_oinfo[stripe_index]; if (!oinfo->lmo_root) { rc = -ENOENT; break; } - tgt = lmv_tgt(ctxt->ldc_lmv, oinfo->lmo_mds); + tgt = lmv_tgt_retry(ctxt->ldc_lmv, oinfo->lmo_mds); if (!tgt) { rc = -ENODEV; break; @@ -2481,7 +3009,7 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt, op_data->op_fid2 = oinfo->lmo_fid; op_data->op_data = oinfo->lmo_root; - rc = md_read_page(tgt->ltd_exp, op_data, ctxt->ldc_cb_op, hash, + rc = md_read_page(tgt->ltd_exp, op_data, ctxt->ldc_mrinfo, hash, &stripe->sd_page); op_data->op_fid1 = fid; @@ -2502,8 +3030,8 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt, LASSERT(!ent); /* treat error as eof, so dir can be partially accessed */ stripe->sd_eof = true; - LCONSOLE_WARN("dir "DFID" stripe %d readdir failed: %d, " - "directory is partially accessed!\n", + ctxt->ldc_mrinfo->mr_partial_readdir_rc = rc; + LCONSOLE_WARN("dir "DFID" stripe %d readdir failed: %d, directory is partially accessed!\n", PFID(&ctxt->ldc_op_data->op_fid1), stripe_index, rc); } @@ -2603,7 +3131,8 @@ static struct lu_dirent *lmv_dirent_next(struct lmv_dir_ctxt *ctxt) * * \param[in] exp obd export refer to LMV * \param[in] op_data hold those MD parameters of read_entry - * \param[in] cb_op ldlm callback being used in enqueue in mdc_read_entry + * \param[in] mrinfo ldlm callback being used in enqueue in mdc_read_entry, + * and partial readdir result will be stored in it. * \param[in] offset starting hash offset * \param[out] ppage the page holding the entry. Note: because the entry * will be accessed in upper layer, so we need hold the @@ -2615,8 +3144,8 @@ static struct lu_dirent *lmv_dirent_next(struct lmv_dir_ctxt *ctxt) */ static int lmv_striped_read_page(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, - __u64 offset, struct page **ppage) + struct md_readdir_info *mrinfo, __u64 offset, + struct page **ppage) { struct page *page = NULL; struct lu_dirpage *dp; @@ -2629,10 +3158,12 @@ static int lmv_striped_read_page(struct obd_export *exp, __u16 ent_size; size_t left_bytes; int rc = 0; + ENTRY; /* Allocate a page and read entries from all of stripes and fill - * the page by hash order */ + * the page by hash order + */ page = alloc_page(GFP_KERNEL); if (!page) RETURN(-ENOMEM); @@ -2648,13 +3179,13 @@ static int lmv_striped_read_page(struct obd_export *exp, last_ent = ent; /* initalize dir read context */ - stripe_count = op_data->op_mea1->lsm_md_stripe_count; + stripe_count = op_data->op_lso1->lso_lsm.lsm_md_stripe_count; OBD_ALLOC(ctxt, offsetof(typeof(*ctxt), ldc_stripes[stripe_count])); if (!ctxt) GOTO(free_page, rc = -ENOMEM); ctxt->ldc_lmv = &exp->exp_obd->u.lmv; ctxt->ldc_op_data = op_data; - ctxt->ldc_cb_op = cb_op; + ctxt->ldc_mrinfo = mrinfo; ctxt->ldc_hash = offset; ctxt->ldc_count = stripe_count; @@ -2671,7 +3202,8 @@ static int lmv_striped_read_page(struct obd_export *exp, ent_size = le16_to_cpu(next->lde_reclen); /* the last entry lde_reclen is 0, but it might not be the last - * one of this temporay dir page */ + * one of this temporay dir page + */ if (!ent_size) ent_size = lu_dirent_calc_size( le16_to_cpu(next->lde_namelen), @@ -2683,7 +3215,8 @@ static int lmv_striped_read_page(struct obd_export *exp, memcpy(ent, next, ent_size); /* Replace . with master FID and Replace .. with the parent FID - * of master object */ + * of master object + */ if (strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) == 0 && le16_to_cpu(ent->lde_namelen) == 1) @@ -2726,9 +3259,9 @@ free_page: return rc; } -int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, __u64 offset, - struct page **ppage) +static int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_readdir_info *mrinfo, __u64 offset, + struct page **ppage) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2737,11 +3270,11 @@ int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, ENTRY; - if (unlikely(lmv_dir_foreign(op_data->op_mea1))) + if (unlikely(lmv_dir_foreign(op_data->op_lso1))) RETURN(-ENODATA); - if (unlikely(lmv_dir_striped(op_data->op_mea1))) { - rc = lmv_striped_read_page(exp, op_data, cb_op, offset, ppage); + if (unlikely(lmv_dir_striped(op_data->op_lso1))) { + rc = lmv_striped_read_page(exp, op_data, mrinfo, offset, ppage); RETURN(rc); } @@ -2749,7 +3282,7 @@ int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); - rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage); + rc = md_read_page(tgt->ltd_exp, op_data, mrinfo, offset, ppage); RETURN(rc); } @@ -2792,7 +3325,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); - op_data->op_cap = cfs_curproc_cap_pack(); + op_data->op_cap = current_cap(); retry: parent_tgt = lmv_locate_tgt(lmv, op_data); @@ -2832,7 +3365,7 @@ retry: rc = md_unlink(tgt->ltd_exp, op_data, request); if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) { - ptlrpc_req_finished(*request); + ptlrpc_req_put(*request); *request = NULL; goto retry; } @@ -2850,7 +3383,7 @@ retry: /* This is a remote object, try remote MDT. */ op_data->op_fid2 = body->mbo_fid1; - ptlrpc_req_finished(*request); + ptlrpc_req_put(*request); *request = NULL; tgt = lmv_fid2tgt(lmv, &op_data->op_fid2); @@ -2864,9 +3397,6 @@ static int lmv_precleanup(struct obd_device *obd) { ENTRY; libcfs_kkuc_group_rem(&obd->obd_uuid, 0, KUC_GRP_HSM); - fld_client_debugfs_fini(&obd->u.lmv.lmv_fld); - lprocfs_obd_cleanup(obd); - lprocfs_free_md_stats(obd); RETURN(0); } @@ -2928,7 +3458,7 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, exp->exp_connect_data = *(struct obd_connect_data *)val; RETURN(rc); } else if (KEY_IS(KEY_TGT_COUNT)) { - *((int *)val) = lmv->lmv_mdt_descs.ltd_lmv_desc.ld_tgt_count; + *((int *)val) = lmv->lmv_mdt_descs.ltd_tgts_size; RETURN(0); } @@ -2939,9 +3469,9 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa, int *__rcs, struct ptlrpc_request_set *_set) { - struct obd_device *obddev = class_exp2obd(exp); + struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request_set *set = _set; - struct lmv_obd *lmv = &obddev->u.lmv; + struct lmv_obd *lmv = &obd->u.lmv; int tgt_count = lmv->lmv_mdt_count; struct lu_tgt_desc *tgt; struct fid_array *fat, **fas = NULL; @@ -2954,10 +3484,10 @@ static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa, } /* split FIDs by targets */ - OBD_ALLOC(fas, sizeof(fas) * tgt_count); + OBD_ALLOC_PTR_ARRAY(fas, tgt_count); if (fas == NULL) GOTO(out, rc = -ENOMEM); - OBD_ALLOC(rcs, sizeof(int *) * tgt_count); + OBD_ALLOC_PTR_ARRAY(rcs, tgt_count); if (rcs == NULL) GOTO(out_fas, rc = -ENOMEM); @@ -2977,7 +3507,7 @@ static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa, if (!fas[idx]) GOTO(out, rc = -ENOMEM); if (!rcs[idx]) - OBD_ALLOC(rcs[idx], sizeof(int) * fa->fa_nr); + OBD_ALLOC_PTR_ARRAY(rcs[idx], fa->fa_nr); if (!rcs[idx]) GOTO(out, rc = -ENOMEM); @@ -2995,6 +3525,7 @@ static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa, rc = ptlrpc_set_wait(NULL, set); if (rc == 0) { int j = 0; + for (i = 0; i < tgt_count; i++) { fat = fas[i]; if (!fat || fat->fa_nr == 0) @@ -3016,13 +3547,13 @@ out: OBD_FREE(fas[i], offsetof(struct fid_array, fa_fids[fa->fa_nr])); if (rcs && rcs[i]) - OBD_FREE(rcs[i], sizeof(int) * fa->fa_nr); + OBD_FREE_PTR_ARRAY(rcs[i], fa->fa_nr); } if (rcs) - OBD_FREE(rcs, sizeof(int *) * tgt_count); + OBD_FREE_PTR_ARRAY(rcs, tgt_count); out_fas: if (fas) - OBD_FREE(fas, sizeof(fas) * tgt_count); + OBD_FREE_PTR_ARRAY(fas, tgt_count); RETURN(rc); } @@ -3043,14 +3574,15 @@ out_fas: * \retval 0 on success * \retval negative negated errno on failure */ -int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, - __u32 keylen, void *key, __u32 vallen, void *val, - struct ptlrpc_request_set *set) +static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, + __u32 keylen, void *key, __u32 vallen, void *val, + struct ptlrpc_request_set *set) { - struct lmv_tgt_desc *tgt = NULL; - struct obd_device *obd; - struct lmv_obd *lmv; + struct lmv_tgt_desc *tgt; + struct obd_device *obd; + struct lmv_obd *lmv; int rc = 0; + ENTRY; obd = class_exp2obd(exp); @@ -3086,27 +3618,29 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm, int cplen; int i; int rc = 0; + ENTRY; lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic); lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index); - if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE)) - lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN; + if (CFS_FAIL_CHECK(OBD_FAIL_LMV_UNKNOWN_STRIPE)) + lsm->lsm_md_hash_type = cfs_fail_val ?: LMV_HASH_TYPE_UNKNOWN; else lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type); lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version); lsm->lsm_md_migrate_offset = le32_to_cpu(lmm1->lmv_migrate_offset); lsm->lsm_md_migrate_hash = le32_to_cpu(lmm1->lmv_migrate_hash); - cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name, + cplen = strscpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name, sizeof(lsm->lsm_md_pool_name)); - if (cplen >= sizeof(lsm->lsm_md_pool_name)) - RETURN(-E2BIG); + if (cplen < 0) + RETURN(cplen); - CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %#x " - "layout_version %d\n", lsm->lsm_md_stripe_count, - lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type, + CDEBUG(D_INFO, "unpack lsm count %d/%d, master %d hash_type %#x/%#x layout_version %d\n", + lsm->lsm_md_stripe_count, + lsm->lsm_md_migrate_offset, lsm->lsm_md_master_mdt_index, + lsm->lsm_md_hash_type, lsm->lsm_md_migrate_hash, lsm->lsm_md_layout_version); stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); @@ -3144,125 +3678,196 @@ static inline int lmv_unpack_user_md(struct obd_export *exp, lsm->lsm_md_stripe_count = le32_to_cpu(lmu->lum_stripe_count); lsm->lsm_md_master_mdt_index = le32_to_cpu(lmu->lum_stripe_offset); lsm->lsm_md_hash_type = le32_to_cpu(lmu->lum_hash_type); + lsm->lsm_md_max_inherit = lmu->lum_max_inherit; + lsm->lsm_md_max_inherit_rr = lmu->lum_max_inherit_rr; + lsm->lsm_md_pool_name[LOV_MAXPOOLNAME] = 0; return 0; } -static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp, - const union lmv_mds_md *lmm, size_t lmm_size) +struct lmv_stripe_object *lmv_stripe_object_alloc(__u32 magic, + const union lmv_mds_md *lmm, + size_t lmm_size) { - struct lmv_stripe_md *lsm; - int lsm_size; - int rc; - bool allocated = false; - ENTRY; + struct lmv_stripe_object *lsm_obj; + int size; - LASSERT(lsmp != NULL); + if (magic == LMV_MAGIC_FOREIGN) { + struct lmv_foreign_md *lfm; - lsm = *lsmp; - /* Free memmd */ - if (lsm != NULL && lmm == NULL) { - int i; - struct lmv_foreign_md *lfm = (struct lmv_foreign_md *)lsm; + size = offsetof(typeof(*lfm), lfm_value[0]); + if (lmm_size < size) + RETURN(ERR_PTR(-EPROTO)); - if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) { - size_t lfm_size; + size += le32_to_cpu(lmm->lmv_foreign_md.lfm_length); + if (lmm_size < size) + RETURN(ERR_PTR(-EPROTO)); - lfm_size = lfm->lfm_length + offsetof(typeof(*lfm), - lfm_value[0]); - OBD_FREE_LARGE(lfm, lfm_size); - RETURN(0); - } + OBD_ALLOC_LARGE(lsm_obj, lmm_size + + offsetof(typeof(*lsm_obj), lso_lfm)); + } else { + if (magic == LMV_MAGIC_V1) { + int count; - if (lmv_dir_striped(lsm)) { - for (i = 0; i < lsm->lsm_md_stripe_count; i++) { - if (lsm->lsm_md_oinfo[i].lmo_root) - iput(lsm->lsm_md_oinfo[i].lmo_root); - } - lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count); + size = offsetof(struct lmv_mds_md_v1, + lmv_stripe_fids[0]); + if (lmm_size < size) + RETURN(ERR_PTR(-EPROTO)); + + count = lmv_mds_md_stripe_count_get(lmm); + size += count * sizeof(struct lu_fid); + if (lmm_size < size) + RETURN(ERR_PTR(-EPROTO)); + + size = lmv_stripe_md_size(count); } else { - lsm_size = lmv_stripe_md_size(0); + if (lmm && lmm_size < sizeof(struct lmv_user_md)) + RETURN(ERR_PTR(-EPROTO)); + + /** + * Unpack default dirstripe(lmv_user_md) to + * lmv_stripe_md, stripecount should be 0 then. + */ + size = lmv_stripe_md_size(0); } - OBD_FREE(lsm, lsm_size); - *lsmp = NULL; - RETURN(0); + size += offsetof(typeof(*lsm_obj), lso_lsm); + OBD_ALLOC(lsm_obj, size); } - /* foreign lmv case */ - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_FOREIGN) { - struct lmv_foreign_md *lfm = (struct lmv_foreign_md *)lsm; - - if (lfm == NULL) { - OBD_ALLOC_LARGE(lfm, lmm_size); - if (lfm == NULL) - RETURN(-ENOMEM); - *lsmp = (struct lmv_stripe_md *)lfm; - } - lfm->lfm_magic = le32_to_cpu(lmm->lmv_foreign_md.lfm_magic); - lfm->lfm_length = le32_to_cpu(lmm->lmv_foreign_md.lfm_length); - lfm->lfm_type = le32_to_cpu(lmm->lmv_foreign_md.lfm_type); - lfm->lfm_flags = le32_to_cpu(lmm->lmv_foreign_md.lfm_flags); - memcpy(&lfm->lfm_value, &lmm->lmv_foreign_md.lfm_value, - lfm->lfm_length); - RETURN(lmm_size); + if (lsm_obj) { + atomic_set(&lsm_obj->lso_refs, 1); + RETURN(lsm_obj); } - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) + RETURN(ERR_PTR(-ENOMEM)); +} +EXPORT_SYMBOL(lmv_stripe_object_alloc); + +static int lmv_stripe_object_create(struct obd_export *exp, + struct lmv_stripe_object **lsop, + const union lmv_mds_md *lmm, + size_t lmm_size) +{ + struct lmv_stripe_object *lsm_obj; + __u32 magic; + int rc; + + ENTRY; + + LASSERT(lsop != NULL && *lsop == NULL); + + if (lmm_size == 0) + RETURN(-EPROTO); + + magic = le32_to_cpu(lmm->lmv_magic); + if (magic == LMV_MAGIC_STRIPE) RETURN(-EPERM); - /* Unpack memmd */ - if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 && - le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) { + if (magic != LMV_MAGIC_V1 && magic != LMV_USER_MAGIC && + magic != LMV_MAGIC_FOREIGN) { CERROR("%s: invalid lmv magic %x: rc = %d\n", - exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic), - -EIO); + exp->exp_obd->obd_name, magic, -EIO); RETURN(-EIO); } - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1) - lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm)); - else - /** - * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md, - * stripecount should be 0 then. - */ - lsm_size = lmv_stripe_md_size(0); + /* foreign lmv case */ + if (magic == LMV_MAGIC_FOREIGN) { + struct lmv_foreign_md *lfm; - if (lsm == NULL) { - OBD_ALLOC(lsm, lsm_size); - if (lsm == NULL) - RETURN(-ENOMEM); - allocated = true; - *lsmp = lsm; + lsm_obj = lmv_stripe_object_alloc(magic, lmm, lmm_size); + if (IS_ERR(lsm_obj)) + RETURN(PTR_ERR(lsm_obj)); + + *lsop = lsm_obj; + lfm = &lsm_obj->lso_lfm; + lfm->lfm_magic = magic; + lfm->lfm_length = le32_to_cpu(lmm->lmv_foreign_md.lfm_length); + lfm->lfm_type = le32_to_cpu(lmm->lmv_foreign_md.lfm_type); + lfm->lfm_flags = le32_to_cpu(lmm->lmv_foreign_md.lfm_flags); + memcpy(&lfm->lfm_value, &lmm->lmv_foreign_md.lfm_value, + lfm->lfm_length); + RETURN(0); } - switch (le32_to_cpu(lmm->lmv_magic)) { + /* Unpack memmd */ + lsm_obj = lmv_stripe_object_alloc(magic, lmm, lmm_size); + if (IS_ERR(lsm_obj)) + RETURN(PTR_ERR(lsm_obj)); + + switch (magic) { case LMV_MAGIC_V1: - rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1); + rc = lmv_unpack_md_v1(exp, &lsm_obj->lso_lsm, &lmm->lmv_md_v1); break; case LMV_USER_MAGIC: - rc = lmv_unpack_user_md(exp, lsm, &lmm->lmv_user_md); + rc = lmv_unpack_user_md(exp, &lsm_obj->lso_lsm, + &lmm->lmv_user_md); break; default: CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name, - le32_to_cpu(lmm->lmv_magic)); + magic); rc = -EINVAL; break; } - if (rc != 0 && allocated) { - OBD_FREE(lsm, lsm_size); - *lsmp = NULL; - lsm_size = rc; - } - RETURN(lsm_size); + if (rc != 0) + lmv_stripe_object_put(&lsm_obj); + + *lsop = lsm_obj; + RETURN(rc); } -void lmv_free_memmd(struct lmv_stripe_md *lsm) +struct lmv_stripe_object * +lmv_stripe_object_get(struct lmv_stripe_object *lsm_obj) { - lmv_unpackmd(NULL, &lsm, NULL, 0); + if (lsm_obj == NULL) + return NULL; + + atomic_inc(&lsm_obj->lso_refs); + CDEBUG(D_INODE, "get %p %u\n", lsm_obj, + atomic_read(&lsm_obj->lso_refs)); + return lsm_obj; } -EXPORT_SYMBOL(lmv_free_memmd); +EXPORT_SYMBOL(lmv_stripe_object_get); + +void lmv_stripe_object_put(struct lmv_stripe_object **lsop) +{ + struct lmv_stripe_object *lsm_obj; + size_t size; + int i; + + LASSERT(lsop != NULL); + + lsm_obj = *lsop; + if (lsm_obj == NULL) + return; + + *lsop = NULL; + CDEBUG(D_INODE, "put %p %u\n", lsm_obj, + atomic_read(&lsm_obj->lso_refs) - 1); + + if (!atomic_dec_and_test(&lsm_obj->lso_refs)) + return; + + if (lmv_dir_foreign(lsm_obj)) { + size = lsm_obj->lso_lfm.lfm_length + + offsetof(typeof(lsm_obj->lso_lfm), lfm_value[0]) + + offsetof(typeof(*lsm_obj), lso_lsm); + OBD_FREE_LARGE(lsm_obj, size); + return; + } + + if (lmv_dir_striped(lsm_obj)) { + struct lmv_stripe_md *lsm = &lsm_obj->lso_lsm; + + for (i = 0; i < lsm->lsm_md_stripe_count; i++) + iput(lsm->lsm_md_oinfo[i].lmo_root); + size = lmv_stripe_md_size(lsm->lsm_md_stripe_count); + } else { + size = lmv_stripe_md_size(0); + } + OBD_FREE(lsm_obj, size + offsetof(typeof(*lsm_obj), lso_lsm)); +} +EXPORT_SYMBOL(lmv_stripe_object_put); static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, union ldlm_policy_data *policy, @@ -3306,53 +3911,61 @@ static int lmv_set_lock_data(struct obd_export *exp, RETURN(rc); } -enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags, - const struct lu_fid *fid, enum ldlm_type type, - union ldlm_policy_data *policy, - enum ldlm_mode mode, struct lustre_handle *lockh) +static enum ldlm_mode +lmv_lock_match(struct obd_export *exp, __u64 flags, + const struct lu_fid *fid, enum ldlm_type type, + union ldlm_policy_data *policy, + enum ldlm_mode mode, struct lustre_handle *lockh) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - enum ldlm_mode rc; struct lu_tgt_desc *tgt; - int i; + __u64 bits = policy->l_inodebits.bits; + enum ldlm_mode rc = LCK_MINMODE; int index; + int i; - ENTRY; - - CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid)); - - /* - * With DNE every object can have two locks in different namespaces: + /* only one bit is set */ + LASSERT(bits && !(bits & (bits - 1))); + /* With DNE every object can have two locks in different namespaces: * lookup lock in space of MDT storing direntry and update/open lock in * space of MDT storing inode. Try the MDT that the FID maps to first, * since this can be easily found, and only try others if that fails. */ - for (i = 0, index = lmv_fid2tgt_index(lmv, fid); - i < lmv->lmv_mdt_descs.ltd_tgts_size; - i++, index = (index + 1) % lmv->lmv_mdt_descs.ltd_tgts_size) { - if (index < 0) { - CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n", - obd->obd_name, PFID(fid), index); - index = 0; + if (bits == MDS_INODELOCK_LOOKUP) { + for (i = 0, index = lmv_fid2tgt_index(lmv, fid); + i < lmv->lmv_mdt_descs.ltd_tgts_size; i++, + index = (index + 1) % lmv->lmv_mdt_descs.ltd_tgts_size) { + if (index < 0) { + CDEBUG(D_HA, + "%s: "DFID" is inaccessible: rc = %d\n", + obd->obd_name, PFID(fid), index); + index = 0; + } + tgt = lmv_tgt(lmv, index); + if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) + continue; + rc = md_lock_match(tgt->ltd_exp, flags, fid, type, + policy, mode, lockh); + if (rc) + break; } - - tgt = lmv_tgt(lmv, index); - if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) - continue; - - rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode, - lockh); - if (rc) - RETURN(rc); + } else { + tgt = lmv_fid2tgt(lmv, fid); + if (!IS_ERR(tgt) && tgt->ltd_exp && tgt->ltd_active) + rc = md_lock_match(tgt->ltd_exp, flags, fid, type, + policy, mode, lockh); } - RETURN(0); + CDEBUG(D_INODE, "Lock match for "DFID": %d\n", PFID(fid), rc); + + return rc; } -int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, - struct obd_export *dt_exp, struct obd_export *md_exp, - struct lustre_md *md) +static int +lmv_get_lustre_md(struct obd_export *exp, struct req_capsule *pill, + struct obd_export *dt_exp, struct obd_export *md_exp, + struct lustre_md *md) { struct lmv_obd *lmv = &exp->exp_obd->u.lmv; struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0); @@ -3360,10 +3973,10 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, if (!tgt || !tgt->ltd_exp) return -EINVAL; - return md_get_lustre_md(tgt->ltd_exp, req, dt_exp, md_exp, md); + return md_get_lustre_md(tgt->ltd_exp, pill, dt_exp, md_exp, md); } -int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) +static int lmv_put_lustre_md(struct obd_export *exp, struct lustre_md *md) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -3371,22 +3984,17 @@ int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) ENTRY; - if (md->default_lmv) { - lmv_free_memmd(md->default_lmv); - md->default_lmv = NULL; - } - if (md->lmv != NULL) { - lmv_free_memmd(md->lmv); - md->lmv = NULL; - } + lmv_stripe_object_put(&md->def_lsm_obj); + lmv_stripe_object_put(&md->lsm_obj); + if (!tgt || !tgt->ltd_exp) RETURN(-EINVAL); - RETURN(md_free_lustre_md(tgt->ltd_exp, md)); + RETURN(0); } -int lmv_set_open_replay_data(struct obd_export *exp, - struct obd_client_handle *och, - struct lookup_intent *it) +static int lmv_set_open_replay_data(struct obd_export *exp, + struct obd_client_handle *och, + struct lookup_intent *it) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -3401,8 +4009,8 @@ int lmv_set_open_replay_data(struct obd_export *exp, RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it)); } -int lmv_clear_open_replay_data(struct obd_export *exp, - struct obd_client_handle *och) +static int lmv_clear_open_replay_data(struct obd_export *exp, + struct obd_client_handle *och) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -3417,10 +4025,10 @@ int lmv_clear_open_replay_data(struct obd_export *exp, RETURN(md_clear_open_replay_data(tgt->ltd_exp, och)); } -int lmv_intent_getattr_async(struct obd_export *exp, - struct md_enqueue_info *minfo) +static int lmv_intent_getattr_async(struct obd_export *exp, + struct md_op_item *item) { - struct md_op_data *op_data = &minfo->mi_data; + struct md_op_data *op_data = &item->mop_data; struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *ptgt; @@ -3429,31 +4037,39 @@ int lmv_intent_getattr_async(struct obd_export *exp, ENTRY; - if (!fid_is_sane(&op_data->op_fid2)) + if (!(fid_is_sane(&op_data->op_fid2) || + fid_is_zero(&op_data->op_fid2))) RETURN(-EINVAL); ptgt = lmv_locate_tgt(lmv, op_data); if (IS_ERR(ptgt)) RETURN(PTR_ERR(ptgt)); - ctgt = lmv_fid2tgt(lmv, &op_data->op_fid1); - if (IS_ERR(ctgt)) - RETURN(PTR_ERR(ctgt)); - /* - * remote object needs two RPCs to lookup and getattr, considering the - * complexity don't support statahead for now. + * Zeroed FID @op_fid2 means that the intent getattr() comes from + * statahead by regularized file names. Currently only do statahead + * for the children files located same as the parent directory. */ - if (ctgt != ptgt) - RETURN(-EREMOTE); + if (!fid_is_zero(&op_data->op_fid2)) { + ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2); + if (IS_ERR(ctgt)) + RETURN(PTR_ERR(ctgt)); - rc = md_intent_getattr_async(ptgt->ltd_exp, minfo); + /* + * remote object needs two RPCs to lookup and getattr, + * considering the complexity don't support statahead for now. + */ + if (ctgt != ptgt) + RETURN(-EREMOTE); + } + + rc = md_intent_getattr_async(ptgt->ltd_exp, item); RETURN(rc); } -int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, - struct lu_fid *fid, __u64 *bits) +static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, + struct lu_fid *fid, __u64 *bits) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -3470,15 +4086,15 @@ int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, RETURN(rc); } -int lmv_get_fid_from_lsm(struct obd_export *exp, - const struct lmv_stripe_md *lsm, - const char *name, int namelen, struct lu_fid *fid) +static int lmv_get_fid_from_lsm(struct obd_export *exp, + const struct lmv_stripe_object *lso, + const char *name, int namelen, + struct lu_fid *fid) { const struct lmv_oinfo *oinfo; - LASSERT(lmv_dir_striped(lsm)); - - oinfo = lsm_name_to_stripe_info(lsm, name, namelen, false); + LASSERT(lmv_dir_striped(lso)); + oinfo = lsm_name_to_stripe_info(lso, name, namelen, false); if (IS_ERR(oinfo)) return PTR_ERR(oinfo); @@ -3492,8 +4108,8 @@ int lmv_get_fid_from_lsm(struct obd_export *exp, * process with other slave MDTs. The only exception is Q_GETOQUOTA for which * we directly fetch data from the slave MDTs. */ -int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, - struct obd_quotactl *oqctl) +static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; @@ -3508,6 +4124,30 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, RETURN(-EIO); } + if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA || + oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) { + struct list_head *lst = (struct list_head *)oqctl->qc_iter_list; + int err; + + if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA) + RETURN(obd_quota_iter(tgt->ltd_exp, oqctl, lst)); + + lmv_foreach_connected_tgt(lmv, tgt) { + if (!tgt->ltd_active) + continue; + + err = obd_quota_iter(tgt->ltd_exp, oqctl, lst); + if (err) { + CERROR("%s: getquota failed mdt %d: rc = %d\n", + obd->obd_name, tgt->ltd_index, err); + if (!rc) + rc = err; + } + } + + RETURN(rc); + } + if (oqctl->qc_cmd != Q_GETOQUOTA) { rc = obd_quotactl(tgt->ltd_exp, oqctl); RETURN(rc); @@ -3538,14 +4178,15 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, } static int lmv_merge_attr(struct obd_export *exp, - const struct lmv_stripe_md *lsm, + const struct lmv_stripe_object *lso, struct cl_attr *attr, ldlm_blocking_callback cb_blocking) { + const struct lmv_stripe_md *lsm = &lso->lso_lsm; int rc; int i; - if (!lmv_dir_striped(lsm)) + if (!lmv_dir_striped(lso)) return 0; rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0); @@ -3562,8 +4203,9 @@ static int lmv_merge_attr(struct obd_export *exp, "" DFID " size %llu, blocks %llu nlink %u, atime %lld ctime %lld, mtime %lld.\n", PFID(&lsm->lsm_md_oinfo[i].lmo_fid), i_size_read(inode), (unsigned long long)inode->i_blocks, - inode->i_nlink, (s64)inode->i_atime.tv_sec, - (s64)inode->i_ctime.tv_sec, (s64)inode->i_mtime.tv_sec); + inode->i_nlink, (s64)inode_get_atime_sec(inode), + (s64)inode_get_ctime_sec(inode), + (s64)inode_get_mtime_sec(inode)); /* for slave stripe, it needs to subtract nlink for . and .. */ if (i != 0) @@ -3574,73 +4216,299 @@ static int lmv_merge_attr(struct obd_export *exp, attr->cat_size += i_size_read(inode); attr->cat_blocks += inode->i_blocks; - if (attr->cat_atime < inode->i_atime.tv_sec) - attr->cat_atime = inode->i_atime.tv_sec; + if (attr->cat_atime < inode_get_atime_sec(inode)) + attr->cat_atime = inode_get_atime_sec(inode); - if (attr->cat_ctime < inode->i_ctime.tv_sec) - attr->cat_ctime = inode->i_ctime.tv_sec; + if (attr->cat_ctime < inode_get_ctime_sec(inode)) + attr->cat_ctime = inode_get_ctime_sec(inode); - if (attr->cat_mtime < inode->i_mtime.tv_sec) - attr->cat_mtime = inode->i_mtime.tv_sec; + if (attr->cat_mtime < inode_get_mtime_sec(inode)) + attr->cat_mtime = inode_get_mtime_sec(inode); } return 0; } +static struct lu_batch *lmv_batch_create(struct obd_export *exp, + enum lu_batch_flags flags, + __u32 max_count) +{ + struct lu_batch *bh; + struct lmv_batch *lbh; + + ENTRY; + OBD_ALLOC_PTR(lbh); + if (!lbh) + RETURN(ERR_PTR(-ENOMEM)); + + bh = &lbh->lbh_super; + bh->lbt_flags = flags; + bh->lbt_max_count = max_count; + + if (flags & BATCH_FL_RQSET) { + bh->lbt_rqset = ptlrpc_prep_set(); + if (bh->lbt_rqset == NULL) { + OBD_FREE_PTR(lbh); + RETURN(ERR_PTR(-ENOMEM)); + } + } + + INIT_LIST_HEAD(&lbh->lbh_sub_batch_list); + RETURN(bh); +} + +static int lmv_batch_stop(struct obd_export *exp, struct lu_batch *bh) +{ + struct lmv_batch *lbh; + struct lmvsub_batch *sub; + struct lmvsub_batch *tmp; + int rc = 0; + + ENTRY; + + lbh = container_of(bh, struct lmv_batch, lbh_super); + list_for_each_entry_safe(sub, tmp, &lbh->lbh_sub_batch_list, + sbh_sub_item) { + list_del(&sub->sbh_sub_item); + rc = md_batch_stop(sub->sbh_tgt->ltd_exp, sub->sbh_sub); + if (rc < 0) { + CERROR("%s: stop batch processing failed: rc = %d\n", + exp->exp_obd->obd_name, rc); + if (bh->lbt_result == 0) + bh->lbt_result = rc; + } + OBD_FREE_PTR(sub); + } + + if (bh->lbt_flags & BATCH_FL_RQSET) { + rc = ptlrpc_set_wait(NULL, bh->lbt_rqset); + ptlrpc_set_destroy(bh->lbt_rqset); + } + + OBD_FREE_PTR(lbh); + RETURN(rc); +} + +static int lmv_batch_flush(struct obd_export *exp, struct lu_batch *bh, + bool wait) +{ + struct lmv_batch *lbh; + struct lmvsub_batch *sub; + int rc = 0; + int rc1; + + ENTRY; + + lbh = container_of(bh, struct lmv_batch, lbh_super); + list_for_each_entry(sub, &lbh->lbh_sub_batch_list, sbh_sub_item) { + rc1 = md_batch_flush(sub->sbh_tgt->ltd_exp, sub->sbh_sub, wait); + if (rc1 < 0) { + CERROR("%s: stop batch processing failed: rc = %d\n", + exp->exp_obd->obd_name, rc); + if (bh->lbt_result == 0) + bh->lbt_result = rc; + + if (rc == 0) + rc = rc1; + } + } + + if (wait && bh->lbt_flags & BATCH_FL_RQSET) { + rc1 = ptlrpc_set_wait(NULL, bh->lbt_rqset); + if (rc == 0) + rc = rc1; + } + + RETURN(rc); +} + +static inline struct lmv_tgt_desc * +lmv_batch_locate_tgt(struct lmv_obd *lmv, struct md_op_item *item) +{ + struct md_op_data *op_data = &item->mop_data; + struct lmv_tgt_desc *tgt; + + switch (item->mop_opc) { + case MD_OP_GETATTR: { + struct lmv_tgt_desc *ptgt; + + if (!(fid_is_sane(&op_data->op_fid2) || + fid_is_zero(&op_data->op_fid2))) + RETURN(ERR_PTR(-EINVAL)); + + ptgt = lmv_locate_tgt(lmv, op_data); + if (IS_ERR(ptgt)) + RETURN(ptgt); + + /* + * Zeroed @op_fid2 means that it is a statahead populating call + * in the file name pattern which is using file name format to + * prefetch the attributes. Thus it has no idea about the FID of + * the children file. The children file is considered to be + * located on the same storage target with the parent directory + * or the stripped directory. + */ + if (fid_is_zero(&op_data->op_fid2)) { + tgt = ptgt; + break; + } + + tgt = lmv_fid2tgt(lmv, &op_data->op_fid2); + if (IS_ERR(tgt)) + RETURN(tgt); + + /* + * Remote object needs two RPCs to lookup and getattr, + * considering the complexity don't support statahead for now. + */ + if (tgt != ptgt) + RETURN(ERR_PTR(-EREMOTE)); + + break; + } + default: + tgt = ERR_PTR(-ENOTSUPP); + } + + return tgt; +} + +static struct lu_batch *lmv_batch_lookup_sub(struct lmv_batch *lbh, + struct lmv_tgt_desc *tgt) +{ + struct lmvsub_batch *sub; + + list_for_each_entry(sub, &lbh->lbh_sub_batch_list, sbh_sub_item) { + if (sub->sbh_tgt == tgt) + return sub->sbh_sub; + } + + return NULL; +} + +static struct lu_batch *lmv_batch_get_sub(struct lmv_batch *lbh, + struct lmv_tgt_desc *tgt) +{ + struct lmvsub_batch *sbh; + struct lu_batch *child_bh; + struct lu_batch *bh; + + ENTRY; + + child_bh = lmv_batch_lookup_sub(lbh, tgt); + if (child_bh != NULL) + RETURN(child_bh); + + OBD_ALLOC_PTR(sbh); + if (sbh == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + INIT_LIST_HEAD(&sbh->sbh_sub_item); + sbh->sbh_tgt = tgt; + + bh = &lbh->lbh_super; + child_bh = md_batch_create(tgt->ltd_exp, bh->lbt_flags, + bh->lbt_max_count); + if (IS_ERR(child_bh)) { + OBD_FREE_PTR(sbh); + RETURN(child_bh); + } + + child_bh->lbt_rqset = bh->lbt_rqset; + sbh->sbh_sub = child_bh; + list_add(&sbh->sbh_sub_item, &lbh->lbh_sub_batch_list); + RETURN(child_bh); +} + +static int lmv_batch_add(struct obd_export *exp, struct lu_batch *bh, + struct md_op_item *item) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + struct lmv_batch *lbh; + struct lu_batch *child_bh; + int rc; + + ENTRY; + + tgt = lmv_batch_locate_tgt(lmv, item); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + lbh = container_of(bh, struct lmv_batch, lbh_super); + child_bh = lmv_batch_get_sub(lbh, tgt); + if (IS_ERR(child_bh)) + RETURN(PTR_ERR(child_bh)); + + rc = md_batch_add(tgt->ltd_exp, child_bh, item); + RETURN(rc); +} + static const struct obd_ops lmv_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = lmv_setup, - .o_cleanup = lmv_cleanup, - .o_precleanup = lmv_precleanup, - .o_process_config = lmv_process_config, - .o_connect = lmv_connect, - .o_disconnect = lmv_disconnect, - .o_statfs = lmv_statfs, - .o_get_info = lmv_get_info, - .o_set_info_async = lmv_set_info_async, - .o_notify = lmv_notify, - .o_get_uuid = lmv_get_uuid, + .o_owner = THIS_MODULE, + .o_setup = lmv_setup, + .o_cleanup = lmv_cleanup, + .o_precleanup = lmv_precleanup, + .o_process_config = lmv_process_config, + .o_connect = lmv_connect, + .o_disconnect = lmv_disconnect, + .o_statfs = lmv_statfs, + .o_get_info = lmv_get_info, + .o_set_info_async = lmv_set_info_async, + .o_notify = lmv_notify, + .o_get_uuid = lmv_get_uuid, .o_fid_alloc = lmv_fid_alloc, - .o_iocontrol = lmv_iocontrol, - .o_quotactl = lmv_quotactl + .o_iocontrol = lmv_iocontrol, + .o_quotactl = lmv_quotactl }; static const struct md_ops lmv_md_ops = { .m_get_root = lmv_get_root, - .m_null_inode = lmv_null_inode, - .m_close = lmv_close, - .m_create = lmv_create, - .m_enqueue = lmv_enqueue, - .m_getattr = lmv_getattr, - .m_getxattr = lmv_getxattr, - .m_getattr_name = lmv_getattr_name, - .m_intent_lock = lmv_intent_lock, - .m_link = lmv_link, - .m_rename = lmv_rename, - .m_setattr = lmv_setattr, - .m_setxattr = lmv_setxattr, + .m_null_inode = lmv_null_inode, + .m_close = lmv_close, + .m_create = lmv_create, + .m_enqueue = lmv_enqueue, + .m_getattr = lmv_getattr, + .m_getxattr = lmv_getxattr, + .m_getattr_name = lmv_getattr_name, + .m_intent_lock = lmv_intent_lock, + .m_link = lmv_link, + .m_rename = lmv_rename, + .m_setattr = lmv_setattr, + .m_setxattr = lmv_setxattr, .m_fsync = lmv_fsync, .m_file_resync = lmv_file_resync, .m_read_page = lmv_read_page, - .m_unlink = lmv_unlink, - .m_init_ea_size = lmv_init_ea_size, - .m_cancel_unused = lmv_cancel_unused, - .m_set_lock_data = lmv_set_lock_data, - .m_lock_match = lmv_lock_match, + .m_unlink = lmv_unlink, + .m_init_ea_size = lmv_init_ea_size, + .m_cancel_unused = lmv_cancel_unused, + .m_set_lock_data = lmv_set_lock_data, + .m_lock_match = lmv_lock_match, .m_get_lustre_md = lmv_get_lustre_md, - .m_free_lustre_md = lmv_free_lustre_md, + .m_put_lustre_md = lmv_put_lustre_md, .m_merge_attr = lmv_merge_attr, - .m_set_open_replay_data = lmv_set_open_replay_data, - .m_clear_open_replay_data = lmv_clear_open_replay_data, - .m_intent_getattr_async = lmv_intent_getattr_async, + .m_set_open_replay_data = lmv_set_open_replay_data, + .m_clear_open_replay_data = lmv_clear_open_replay_data, + .m_intent_getattr_async = lmv_intent_getattr_async, .m_revalidate_lock = lmv_revalidate_lock, .m_get_fid_from_lsm = lmv_get_fid_from_lsm, - .m_unpackmd = lmv_unpackmd, + .m_stripe_object_create = lmv_stripe_object_create, .m_rmfid = lmv_rmfid, + .m_batch_create = lmv_batch_create, + .m_batch_add = lmv_batch_add, + .m_batch_stop = lmv_batch_stop, + .m_batch_flush = lmv_batch_flush, }; static int __init lmv_init(void) { - return class_register_type(&lmv_obd_ops, &lmv_md_ops, true, NULL, + int rc; + + rc = libcfs_setup(); + if (rc) + return rc; + + return class_register_type(&lmv_obd_ops, &lmv_md_ops, true, LUSTRE_LMV_NAME, NULL); }