X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=c3a93bfd0b3d86ea6e11e244a5fe4c9942e936ed;hb=848f9e20320cb7c01eaf7f1b5c27f5efd54e4818;hp=8a79475964bb0804be7145dfa05757321ad9521e;hpb=c42b426c87c3d3b1dc9eda612cc831293dc80d68;p=fs%2Flustre-release.git diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 8a79475..c3a93bf 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -27,7 +27,7 @@ * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel Corporation. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -45,6 +45,7 @@ #include #include #include +#include #else #include #endif @@ -174,8 +175,7 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched, * the same. Otherwise one of MDTs runs wrong version or * something like this. --umka */ - obd->obd_self_export->exp_connect_flags = - conn_data->ocd_connect_flags; + obd->obd_self_export->exp_connect_data = *conn_data; } #if 0 else if (ev == OBD_NOTIFY_DISCON) { @@ -345,7 +345,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) #endif struct lmv_obd *lmv = &obd->u.lmv; struct obd_uuid *cluuid = &lmv->cluuid; - struct obd_connect_data *mdc_data = NULL; struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" }; struct obd_device *mdc_obd; struct obd_export *mdc_exp; @@ -390,8 +389,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) fld_client_add_target(&lmv->lmv_fld, &target); - mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data; - rc = obd_register_observer(mdc_obd, obd); if (rc) { obd_disconnect(mdc_exp); @@ -734,6 +731,207 @@ out_local: RETURN(rc); } +static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg) +{ + struct obd_device *obddev = class_exp2obd(exp); + struct lmv_obd *lmv = &obddev->u.lmv; + struct getinfo_fid2path *gf; + struct lmv_tgt_desc *tgt; + struct getinfo_fid2path *remote_gf = NULL; + int remote_gf_size = 0; + int rc; + + gf = (struct getinfo_fid2path *)karg; + tgt = lmv_find_target(lmv, &gf->gf_fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + +repeat_fid2path: + rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg); + if (rc != 0 && rc != -EREMOTE) + GOTO(out_fid2path, rc); + + /* If remote_gf != NULL, it means just building the + * path on the remote MDT, copy this path segement to gf */ + if (remote_gf != NULL) { + struct getinfo_fid2path *ori_gf; + char *ptr; + + ori_gf = (struct getinfo_fid2path *)karg; + if (strlen(ori_gf->gf_path) + + strlen(gf->gf_path) > ori_gf->gf_pathlen) + GOTO(out_fid2path, rc = -EOVERFLOW); + + ptr = ori_gf->gf_path; + + memmove(ptr + strlen(gf->gf_path) + 1, ptr, + strlen(ori_gf->gf_path)); + + strncpy(ptr, gf->gf_path, strlen(gf->gf_path)); + ptr += strlen(gf->gf_path); + *ptr = '/'; + } + + CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n", + tgt->ltd_exp->exp_obd->obd_name, + gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno, + gf->gf_linkno); + + if (rc == 0) + GOTO(out_fid2path, rc); + + /* sigh, has to go to another MDT to do path building further */ + if (remote_gf == NULL) { + remote_gf_size = sizeof(*remote_gf) + PATH_MAX; + OBD_ALLOC(remote_gf, remote_gf_size); + if (remote_gf == NULL) + GOTO(out_fid2path, rc = -ENOMEM); + remote_gf->gf_pathlen = PATH_MAX; + } + + if (!fid_is_sane(&gf->gf_fid)) { + CERROR("%s: invalid FID "DFID": rc = %d\n", + 
tgt->ltd_exp->exp_obd->obd_name, + PFID(&gf->gf_fid), -EINVAL); + GOTO(out_fid2path, rc = -EINVAL); + } + + tgt = lmv_find_target(lmv, &gf->gf_fid); + if (IS_ERR(tgt)) + GOTO(out_fid2path, rc = -EINVAL); + + remote_gf->gf_fid = gf->gf_fid; + remote_gf->gf_recno = -1; + remote_gf->gf_linkno = -1; + memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen); + gf = remote_gf; + goto repeat_fid2path; + +out_fid2path: + if (remote_gf != NULL) + OBD_FREE(remote_gf, remote_gf_size); + RETURN(rc); +} + +static int lmv_hsm_req_count(struct lmv_obd *lmv, + const struct hsm_user_request *hur, + const struct lmv_tgt_desc *tgt_mds) +{ + int i, nr = 0; + struct lmv_tgt_desc *curr_tgt; + + /* count how many requests must be sent to the given target */ + for (i = 0; i < hur->hur_request.hr_itemcount; i++) { + curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid); + if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) + nr++; + } + return nr; +} + +static void lmv_hsm_req_build(struct lmv_obd *lmv, + struct hsm_user_request *hur_in, + const struct lmv_tgt_desc *tgt_mds, + struct hsm_user_request *hur_out) +{ + int i, nr_out; + struct lmv_tgt_desc *curr_tgt; + + /* build the hsm_user_request for the given target */ + hur_out->hur_request = hur_in->hur_request; + nr_out = 0; + for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) { + curr_tgt = lmv_find_target(lmv, + &hur_in->hur_user_item[i].hui_fid); + if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) { + hur_out->hur_user_item[nr_out] = + hur_in->hur_user_item[i]; + nr_out++; + } + } + hur_out->hur_request.hr_itemcount = nr_out; + memcpy(hur_data(hur_out), hur_data(hur_in), + hur_in->hur_request.hr_data_len); +} + +static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len, + struct lustre_kernelcomm *lk, void *uarg) +{ + int i, rc = 0; + ENTRY; + + /* unregister request (call from llapi_hsm_copytool_fini) */ + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + /* best effort: try to clean as much as possible + * (continue on error) */ + obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg); + } + + /* Whatever the result, remove copytool from kuc groups. + * Unreached coordinators will get EPIPE on next requests + * and will unregister automatically. + */ + rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group); + RETURN(rc); +} + +static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len, + struct lustre_kernelcomm *lk, void *uarg) +{ + struct file *filp; + int i, j, err; + int rc = 0; + bool any_set = false; + ENTRY; + + /* All or nothing: try to register to all MDS. + * In case of failure, unregister from previous MDS, + * except if it because of inactive target. */ + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, + len, lk, uarg); + if (err) { + if (lmv->tgts[i]->ltd_active) { + /* permanent error */ + CERROR("error: iocontrol MDC %s on MDT" + "idx %d cmd %x: err = %d\n", + lmv->tgts[i]->ltd_uuid.uuid, + i, cmd, err); + rc = err; + lk->lk_flags |= LK_FLG_STOP; + /* unregister from previous MDS */ + for (j = 0; j < i; j++) + obd_iocontrol(cmd, + lmv->tgts[j]->ltd_exp, + len, lk, uarg); + RETURN(rc); + } + /* else: transient error. 
+ * kuc will register to the missing MDT + * when it is back */ + } else { + any_set = true; + } + } + + if (!any_set) + /* no registration done: return error */ + RETURN(-ENOTCONN); + + /* at least one registration done, with no failure */ + filp = fget(lk->lk_wfd); + if (filp == NULL) { + RETURN(-EBADF); + } + rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, lk->lk_data); + if (rc != 0 && filp != NULL) + fput(filp); + RETURN(rc); +} + + + + static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { @@ -768,7 +966,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, RETURN(-EINVAL); /* copy UUID */ - if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), + if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd), min((int) data->ioc_plen2, (int) sizeof(struct obd_uuid)))) RETURN(-EFAULT); @@ -778,7 +976,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, 0); if (rc) RETURN(rc); - if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf, + if (copy_to_user(data->ioc_pbuf1, &stat_buf, min((int) data->ioc_plen1, (int) sizeof(stat_buf)))) RETURN(-EFAULT); @@ -854,26 +1052,110 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, break; } case OBD_IOC_FID2PATH: { - struct getinfo_fid2path *gf; - struct lmv_tgt_desc *tgt; + rc = lmv_fid2path(exp, len, karg, uarg); + break; + } + case LL_IOC_HSM_STATE_GET: + case LL_IOC_HSM_STATE_SET: + case LL_IOC_HSM_ACTION: { + struct md_op_data *op_data = karg; + struct lmv_tgt_desc *tgt; - gf = (struct getinfo_fid2path *)karg; - tgt = lmv_find_target(lmv, &gf->gf_fid); + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + if (tgt->ltd_exp == NULL) + RETURN(-EINVAL); + + rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); + break; + } + case LL_IOC_HSM_PROGRESS: { + const struct hsm_progress_kernel *hpk = karg; + struct lmv_tgt_desc *tgt; + + tgt = lmv_find_target(lmv, &hpk->hpk_fid); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); break; } - case LL_IOC_HSM_STATE_GET: - case LL_IOC_HSM_STATE_SET: { - struct md_op_data *op_data = karg; + case LL_IOC_HSM_REQUEST: { + struct hsm_user_request *hur = karg; struct lmv_tgt_desc *tgt; + unsigned int reqcount = hur->hur_request.hr_itemcount; + + if (reqcount == 0) + RETURN(0); + + /* if the request is about a single fid + * or if there is a single MDS, no need to split + * the request. 
*/ + if (reqcount == 1 || count == 1) { + tgt = lmv_find_target(lmv, + &hur->hur_user_item[0].hui_fid); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); + } else { + /* split fid list to their respective MDS */ + for (i = 0; i < count; i++) { + unsigned int nr, reqlen; + int rc1; + struct hsm_user_request *req; - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (!tgt->ltd_exp) + nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]); + if (nr == 0) /* nothing for this MDS */ + continue; + + /* build a request with fids for this MDS */ + reqlen = offsetof(typeof(*hur), + hur_user_item[nr]) + + hur->hur_request.hr_data_len; + OBD_ALLOC_LARGE(req, reqlen); + if (req == NULL) + RETURN(-ENOMEM); + + lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req); + + rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, + reqlen, req, uarg); + if (rc1 != 0 && rc == 0) + rc = rc1; + OBD_FREE_LARGE(req, reqlen); + } + } + break; + } + case LL_IOC_LOV_SWAP_LAYOUTS: { + struct md_op_data *op_data = karg; + struct lmv_tgt_desc *tgt1, *tgt2; + + tgt1 = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt1)) + RETURN(PTR_ERR(tgt1)); + + tgt2 = lmv_find_target(lmv, &op_data->op_fid2); + if (IS_ERR(tgt2)) + RETURN(PTR_ERR(tgt2)); + + if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL)) RETURN(-EINVAL); - rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg); + /* only files on same MDT can have their layouts swapped */ + if (tgt1->ltd_idx != tgt2->ltd_idx) + RETURN(-EPERM); + + rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg); + break; + } + case LL_IOC_HSM_CT_START: { + struct lustre_kernelcomm *lk = karg; + if (lk->lk_flags & LK_FLG_STOP) + rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg); + else + rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg); break; } default: @@ -962,32 +1244,62 @@ static int lmv_placement_policy(struct obd_device *obd, struct md_op_data *op_data, mdsno_t *mds) { + struct lmv_obd *lmv = &obd->u.lmv; + ENTRY; + LASSERT(mds != NULL); - /* Allocate new fid on target according to to different - * QOS policy. In DNE phase I, llite should always tell - * which MDT where the dir will be located */ - *mds = op_data->op_mds; + if (lmv->desc.ld_tgt_count == 1) { + *mds = 0; + RETURN(0); + } + + /** + * If stripe_offset is provided during setdirstripe + * (setdirstripe -i xx), xx MDS will be choosen. + */ + if (op_data->op_cli_flags & CLI_SET_MEA) { + struct lmv_user_md *lum; + + lum = (struct lmv_user_md *)op_data->op_data; + if (lum->lum_type == LMV_STRIPE_TYPE && + lum->lum_stripe_offset != -1) { + if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) { + CERROR("%s: Stripe_offset %d > MDT count %d:" + " rc = %d\n", obd->obd_name, + lum->lum_stripe_offset, + lmv->desc.ld_tgt_count, -ERANGE); + RETURN(-ERANGE); + } + *mds = lum->lum_stripe_offset; + RETURN(0); + } + } + /* Allocate new fid on target according to operation type and parent + * home mds. */ + *mds = op_data->op_mds; RETURN(0); } int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, - mdsno_t mds) + mdsno_t mds) { - struct lmv_tgt_desc *tgt; - int rc; - ENTRY; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; - tgt = lmv_get_target(lmv, mds); + tgt = lmv_get_target(lmv, mds); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); - /* - * New seq alloc and FLD setup should be atomic. Otherwise we may find - * on server that seq in new allocated fid is not yet known. - */ + /* + * New seq alloc and FLD setup should be atomic. 
Otherwise we may find + * on server that seq in new allocated fid is not yet known. + */ mutex_lock(&tgt->ltd_fid_mutex); - if (tgt == NULL || tgt->ltd_active == 0 || tgt->ltd_exp == NULL) + if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL) GOTO(out, rc = -ENODEV); /* @@ -1296,8 +1608,7 @@ static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } -static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, - ldlm_iterator_t it, void *data) +static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1319,7 +1630,7 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, for (i = 0; i < lmv->desc.ld_tgt_count; i++) { if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL) continue; - md_change_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data); + md_null_inode(lmv->tgts[i]->ltd_exp, fid); } RETURN(0); @@ -1386,6 +1697,9 @@ struct lmv_tgt_desc struct lmv_tgt_desc *tgt; tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + return tgt; + op_data->op_mds = tgt->ltd_idx; return tgt; @@ -1809,86 +2123,127 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, - struct page **pages, struct ptlrpc_request **request) +/* + * Adjust a set of pages, each page containing an array of lu_dirpages, + * so that each page can be used as a single logical lu_dirpage. + * + * A lu_dirpage is laid out as follows, where s = ldp_hash_start, + * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a + * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end + * value is used as a cookie to request the next lu_dirpage in a + * directory listing that spans multiple pages (two in this example): + * ________ + * | | + * .|--------v------- -----. + * |s|e|f|p|ent|ent| ... |ent| + * '--|-------------- -----' Each CFS_PAGE contains a single + * '------. lu_dirpage. + * .---------v------- -----. + * |s|e|f|p|ent| 0 | ... | 0 | + * '----------------- -----' + * + * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is + * larger than LU_PAGE_SIZE, a single host page may contain multiple + * lu_dirpages. After reading the lu_dirpages from the MDS, the + * ldp_hash_end of the first lu_dirpage refers to the one immediately + * after it in the same CFS_PAGE (arrows simplified for brevity, but + * in general e0==s1, e1==s2, etc.): + * + * .-------------------- -----. + * |s0|e0|f0|p|ent|ent| ... |ent| + * |---v---------------- -----| + * |s1|e1|f1|p|ent|ent| ... |ent| + * |---v---------------- -----| Here, each CFS_PAGE contains + * ... multiple lu_dirpages. + * |---v---------------- -----| + * |s'|e'|f'|p|ent|ent| ... |ent| + * '---|---------------- -----' + * v + * .----------------------------. + * | next CFS_PAGE | + * + * This structure is transformed into a single logical lu_dirpage as follows: + * + * - Replace e0 with e' so the request for the next lu_dirpage gets the page + * labeled 'next CFS_PAGE'. + * + * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether + * a hash collision with the next page exists. + * + * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span + * to the first entry of the next lu_dirpage. 
+ */ +#if PAGE_CACHE_SIZE > LU_PAGE_SIZE +static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - __u64 offset = op_data->op_offset; - int rc; - int i; - /* number of pages read, in CFS_PAGE_SIZE */ - int nrdpgs; - /* number of pages transferred in LU_PAGE_SIZE */ - int nlupgs; - struct lmv_tgt_desc *tgt; - struct lu_dirpage *dp; - struct lu_dirent *ent; - ENTRY; + int i; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + for (i = 0; i < ncfspgs; i++) { + struct lu_dirpage *dp = kmap(pages[i]); + struct lu_dirpage *first = dp; + struct lu_dirent *end_dirent = NULL; + struct lu_dirent *ent; + __u64 hash_end = dp->ldp_hash_end; + __u32 flags = dp->ldp_flags; - CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", - offset, PFID(&op_data->op_fid1)); + for (; nlupgs > 1; nlupgs--) { + ent = lu_dirent_start(dp); + for (end_dirent = ent; ent != NULL; + end_dirent = ent, ent = lu_dirent_next(ent)); + + /* Advance dp to next lu_dirpage. */ + dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); + + /* Check if we've reached the end of the CFS_PAGE. */ + if (!((unsigned long)dp & ~CFS_PAGE_MASK)) + break; + + /* Save the hash and flags of this lu_dirpage. */ + hash_end = dp->ldp_hash_end; + flags = dp->ldp_flags; + + /* Check if lu_dirpage contains no entries. */ + if (!end_dirent) + break; + + /* Enlarge the end entry lde_reclen from 0 to + * first entry of next lu_dirpage. */ + LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0); + end_dirent->lde_reclen = + cpu_to_le16((char *)(dp->ldp_entries) - + (char *)end_dirent); + } - /* - * This case handle directory lookup in clustered metadata case (i.e. - * split directory is located on multiple md servers.) - * each server keeps directory entries for certain range of hashes. - * E.g. we have N server and suppose hash range is 0 to MAX_HASH. - * first server will keep records with hashes [ 0 ... MAX_HASH /N - 1], - * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and - * so on.... - * readdir can simply start reading entries from 0 - N server in - * order but that will not scale well as all client will request dir in - * to server in same order. - * Following algorithm does optimization: - * Instead of doing readdir in 1, 2, ...., N order, client with a - * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order. - * (every client has rank R) - * But ll_readdir() expect offset range [0 to MAX_HASH/N) but - * since client ask dir from MDS{R} client has pages with offsets - * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj - * on hash values that we get. - * Since these codes might be still useful for sharded directory, so - * Keeping this code for further reference - if (0) { - LASSERT(nr > 0); - seg_size = MAX_HASH_SIZE; - do_div(seg_size, nr); - los = obj->lo_stripes; - tgt = lmv_get_target(lmv, los[0].ls_mds); - rank = lmv_node_rank(tgt->ltd_exp, fid) % nr; - tgt_tmp = offset; - do_div(tgt_tmp, seg_size); - tgt0_idx = do_div(tgt_tmp, nr); - tgt_idx = (tgt0_idx + rank) % nr; - - if (tgt_idx < tgt0_idx) - * Wrap around. - * - * Last segment has unusual length due to division - * rounding. 
- hash_adj = MAX_HASH_SIZE - seg_size * nr; - else - hash_adj = 0; + first->ldp_hash_end = hash_end; + first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); + first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); - hash_adj += rank * seg_size; + kunmap(pages[i]); + } +} +#else +#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0) +#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ - CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" " - LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj, - offset, tgt0_idx, offset + hash_adj, tgt_idx); +static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, + struct page **pages, struct ptlrpc_request **request) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + __u64 offset = op_data->op_offset; + int rc; + int ncfspgs; /* pages read in PAGE_CACHE_SIZE */ + int nlupgs; /* pages read in LU_PAGE_SIZE */ + struct lmv_tgt_desc *tgt; + ENTRY; - offset = (offset + hash_adj) & MAX_HASH_SIZE; - rid = lsm->mea_oinfo[tgt_idx].lmo_fid; - tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + + CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", + offset, PFID(&op_data->op_fid1)); - CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n", - PFID(&rid), (unsigned long)offset, tgt_idx); - } - */ tgt = lmv_find_target(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -1897,60 +2252,17 @@ static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, if (rc != 0) RETURN(rc); - nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1) - >> CFS_PAGE_SHIFT; + ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); - LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages); + LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages); - CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, + CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs, op_data->op_npages); - for (i = 0; i < nrdpgs; i++) { -#if CFS_PAGE_SIZE > LU_PAGE_SIZE - struct lu_dirpage *first; - __u64 hash_end = 0; - __u32 flags = 0; -#endif - struct lu_dirent *tmp = NULL; - - dp = cfs_kmap(pages[i]); - ent = lu_dirent_start(dp); -#if CFS_PAGE_SIZE > LU_PAGE_SIZE - first = dp; - hash_end = dp->ldp_hash_end; -repeat: -#endif - nlupgs--; - - for (tmp = ent; ent != NULL; - tmp = ent, ent = lu_dirent_next(ent)); -#if CFS_PAGE_SIZE > LU_PAGE_SIZE - dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); - if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) { - ent = lu_dirent_start(dp); + lmv_adjust_dirpages(pages, ncfspgs, nlupgs); - if (tmp) { - /* enlarge the end entry lde_reclen from 0 to - * first entry of next lu_dirpage, in this way - * several lu_dirpages can be stored into one - * client page on client. 
*/ - tmp = ((void *)tmp) + - le16_to_cpu(tmp->lde_reclen); - tmp->lde_reclen = - cpu_to_le16((char *)(dp->ldp_entries) - - (char *)tmp); - goto repeat; - } - } - first->ldp_hash_end = hash_end; - first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); - first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); -#else - SET_BUT_UNUSED(tmp); -#endif - cfs_kunmap(pages[i]); - } RETURN(rc); } @@ -1960,14 +2272,19 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt = NULL; - int rc; + struct mdt_body *body; + int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - - tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); +retry: + /* Send unlink requests to the MDT where the child is located */ + if (likely(!fid_is_zero(&op_data->op_fid2))) + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); + else + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -1993,9 +2310,48 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, if (rc != 0) RETURN(rc); + CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n", + PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx); + rc = md_unlink(tgt->ltd_exp, op_data, request); + if (rc != 0 && rc != -EREMOTE) + RETURN(rc); - RETURN(rc); + body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); + + /* Not cross-ref case, just get out of here. */ + if (likely(!(body->valid & OBD_MD_MDS))) + RETURN(0); + + CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n", + exp->exp_obd->obd_name, PFID(&body->fid1)); + + /* This is a remote object, try remote MDT, Note: it may + * try more than 1 time here, Considering following case + * /mnt/lustre is root on MDT0, remote1 is on MDT1 + * 1. Initially A does not know where remote1 is, it send + * unlink RPC to MDT0, MDT0 return -EREMOTE, it will + * resend unlink RPC to MDT1 (retry 1st time). + * + * 2. During the unlink RPC in flight, + * client B mv /mnt/lustre/remote1 /mnt/lustre/remote2 + * and create new remote1, but on MDT0 + * + * 3. MDT1 get unlink RPC(from A), then do remote lock on + * /mnt/lustre, then lookup get fid of remote1, and find + * it is remote dir again, and replay -EREMOTE again. + * + * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times). + * + * In theory, it might try unlimited time here, but it should + * be very rare case. 
*/ + op_data->op_fid2 = body->fid1; + ptlrpc_req_finished(*request); + *request = NULL; + + goto retry; } static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -2068,10 +2424,8 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, */ rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key, vallen, val, NULL); - if (!rc && KEY_IS(KEY_CONN_DATA)) { - exp->exp_connect_flags = - ((struct obd_connect_data *)val)->ocd_connect_flags; - } + if (!rc && KEY_IS(KEY_CONN_DATA)) + exp->exp_connect_data = *(struct obd_connect_data *)val; RETURN(rc); } else if (KEY_IS(KEY_TGT_COUNT)) { *((int *)val) = lmv->desc.ld_tgt_count; @@ -2162,10 +2516,10 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, meap->mea_count = cpu_to_le32(lsmp->mea_count); meap->mea_master = cpu_to_le32(lsmp->mea_master); - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - meap->mea_ids[i] = meap->mea_ids[i]; - fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]); - } + for (i = 0; i < lmv->desc.ld_tgt_count; i++) { + meap->mea_ids[i] = lsmp->mea_ids[i]; + fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]); + } RETURN(mea_size); } @@ -2543,7 +2897,7 @@ struct obd_ops lmv_obd_ops = { struct md_ops lmv_md_ops = { .m_getstatus = lmv_getstatus, - .m_change_cbdata = lmv_change_cbdata, + .m_null_inode = lmv_null_inode, .m_find_cbdata = lmv_find_cbdata, .m_close = lmv_close, .m_create = lmv_create,
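
A minimal standalone sketch (not Lustre code) of the path-assembly step in lmv_fid2path() above: each path segment obtained from the next MDT toward the root is prepended, with a '/' separator, to the path built so far, and the operation fails if the caller's buffer would overflow. The helper name prepend_path_segment and the sample path components are illustrative assumptions, not Lustre APIs.

#include <errno.h>
#include <stdio.h>
#include <string.h>

static int prepend_path_segment(char *buf, size_t buflen, const char *seg)
{
	size_t cur = strlen(buf);
	size_t add = strlen(seg);

	/* need room for seg + '/' + existing path + NUL */
	if (cur + add + 2 > buflen)
		return -EOVERFLOW;

	/* shift the existing path right, then copy the new segment in front */
	memmove(buf + add + 1, buf, cur + 1);	/* +1 keeps the terminating NUL */
	memcpy(buf, seg, add);
	buf[add] = '/';
	return 0;
}

int main(void)
{
	char path[64] = "dir/file";	/* segment built on the MDT owning the fid */

	prepend_path_segment(path, sizeof(path), "remote1");	/* parent MDT */
	prepend_path_segment(path, sizeof(path), "ROOT");	/* root MDT */
	printf("%s\n", path);		/* prints: ROOT/remote1/dir/file */
	return 0;
}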
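
lmv_hsm_req_count() and lmv_hsm_req_build() above split one hsm_user_request into per-MDT sub-requests, each sized as offsetof(typeof(*hur), hur_user_item[nr]) + hr_data_len. The sketch below shows the same count/size/copy pattern with simplified stand-in structures; the struct layouts, fid_to_mdt(), and all other names here are assumptions for illustration, not the real Lustre definitions.

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct item { uint64_t fid; };		/* stand-in for struct hsm_user_item */
struct req_hdr { uint32_t itemcount; uint32_t data_len; };	/* stand-in header */
struct user_req {			/* stand-in for struct hsm_user_request */
	struct req_hdr	hdr;
	struct item	items[];	/* itemcount items, then data_len opaque bytes */
};

/* which MDT owns a fid -- stand-in for lmv_find_target() */
static unsigned int fid_to_mdt(uint64_t fid, unsigned int mdt_count)
{
	return (unsigned int)(fid % mdt_count);
}

/* like lmv_hsm_req_count(): how many items go to this MDT */
static unsigned int count_for_mdt(const struct user_req *in,
				  unsigned int mdt, unsigned int mdt_count)
{
	unsigned int i, nr = 0;

	for (i = 0; i < in->hdr.itemcount; i++)
		if (fid_to_mdt(in->items[i].fid, mdt_count) == mdt)
			nr++;
	return nr;
}

/* like lmv_hsm_req_build(): copy the matching items plus the trailing data */
static struct user_req *build_for_mdt(const struct user_req *in,
				      unsigned int mdt, unsigned int mdt_count,
				      size_t *reqlen)
{
	unsigned int nr = count_for_mdt(in, mdt, mdt_count);
	struct user_req *out;
	unsigned int i, n = 0;

	if (nr == 0)
		return NULL;		/* nothing for this MDT */

	/* mirrors offsetof(typeof(*hur), hur_user_item[nr]) + hr_data_len */
	*reqlen = offsetof(struct user_req, items) +
		  nr * sizeof(struct item) + in->hdr.data_len;
	out = calloc(1, *reqlen);
	if (out == NULL)
		return NULL;

	out->hdr = in->hdr;
	for (i = 0; i < in->hdr.itemcount; i++)
		if (fid_to_mdt(in->items[i].fid, mdt_count) == mdt)
			out->items[n++] = in->items[i];
	out->hdr.itemcount = n;
	/* the opaque data blob follows the item array in both requests */
	memcpy(&out->items[n], &in->items[in->hdr.itemcount], in->hdr.data_len);
	return out;
}

int main(void)
{
	size_t inlen = offsetof(struct user_req, items) + 3 * sizeof(struct item) + 8;
	struct user_req *in = calloc(1, inlen);
	unsigned int mdt;

	if (in == NULL)
		return 1;
	in->hdr.itemcount = 3;
	in->hdr.data_len = 8;
	in->items[0].fid = 10;		/* -> MDT0 */
	in->items[1].fid = 11;		/* -> MDT1 */
	in->items[2].fid = 12;		/* -> MDT0 */

	for (mdt = 0; mdt < 2; mdt++) {
		size_t sublen;
		struct user_req *sub = build_for_mdt(in, mdt, 2, &sublen);

		if (sub == NULL)
			continue;
		printf("MDT%u: %u items, %zu bytes\n",
		       mdt, (unsigned int)sub->hdr.itemcount, sublen);
		free(sub);
	}
	free(in);
	return 0;
}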
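
lmv_adjust_dirpages() only matters when the host VM page is larger than LU_PAGE_SIZE, so one bulk transfer packs several lu_dirpages into each host page. A small worked example of the ncfspgs/nlupgs arithmetic from lmv_readpage(), assuming 64 KiB host pages and a 4 KiB lu_dirpage (both sizes are assumptions for the example):

#include <stdio.h>

#define LU_PAGE_SHIFT	12			/* assumed 4 KiB lu_dirpage */
#define LU_PAGE_SIZE	(1UL << LU_PAGE_SHIFT)
#define HOST_PAGE_SHIFT	16			/* assumed 64 KiB VM page */
#define HOST_PAGE_SIZE	(1UL << HOST_PAGE_SHIFT)

int main(void)
{
	unsigned long nob = 3 * HOST_PAGE_SIZE;	/* bytes the bulk transfer moved */
	unsigned long ncfspgs = (nob + HOST_PAGE_SIZE - 1) >> HOST_PAGE_SHIFT;
	unsigned long nlupgs  = nob >> LU_PAGE_SHIFT;

	/* 3 host pages, each holding 16 lu_dirpages -> 48 logical dir pages */
	printf("ncfspgs=%lu nlupgs=%lu lu_dirpages per host page=%lu\n",
	       ncfspgs, nlupgs, HOST_PAGE_SIZE / LU_PAGE_SIZE);
	return 0;
}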