X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flmv%2Flmv_obd.c;h=745c215128f796c322590f0f59696a1eadb37ee2;hp=e7e67b8805f9c4b80e3c3c8ca4e0a5e5326e8e93;hb=c15a14fbeb566705a2cdd6a471da1155c7b8b444;hpb=38fa948b83523004576a9c19cf38258bca67ad50 diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index e7e67b8..745c215 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -35,7 +35,6 @@ */ #define DEBUG_SUBSYSTEM S_LMV -#ifdef __KERNEL__ #include #include #include @@ -45,9 +44,6 @@ #include #include #include -#else -#include -#endif #include #include @@ -60,6 +56,7 @@ #include #include #include +#include #include "lmv_internal.h" /* This hash is only for testing purpose */ @@ -89,39 +86,35 @@ lmv_hash_fnv1a(unsigned int count, const char *name, int namelen) return hash; } -int lmv_name_to_stripe_index(enum lmv_hash_type hashtype, - unsigned int max_mdt_index, +int lmv_name_to_stripe_index(__u32 lmv_hash_type, unsigned int stripe_count, const char *name, int namelen) { int idx; + __u32 hash_type = lmv_hash_type & LMV_HASH_TYPE_MASK; LASSERT(namelen > 0); - if (max_mdt_index <= 1) + if (stripe_count <= 1) + return 0; + + /* for migrating object, always start from 0 stripe */ + if (lmv_hash_type & LMV_HASH_FLAG_MIGRATION) return 0; - switch (hashtype) { + switch (hash_type) { case LMV_HASH_TYPE_ALL_CHARS: - idx = lmv_hash_all_chars(max_mdt_index, name, namelen); + idx = lmv_hash_all_chars(stripe_count, name, namelen); break; case LMV_HASH_TYPE_FNV_1A_64: - idx = lmv_hash_fnv1a(max_mdt_index, name, namelen); + idx = lmv_hash_fnv1a(stripe_count, name, namelen); break; - /* LMV_HASH_TYPE_MIGRATION means the file is being migrated, - * and the file should be accessed by client, except for - * lookup(see lmv_intent_lookup), return -EACCES here */ - case LMV_HASH_TYPE_MIGRATION: - CERROR("%.*s is being migrated: rc = %d\n", namelen, - name, -EACCES); - return -EACCES; default: - CERROR("Unknown hash type 0x%x\n", hashtype); - return -EINVAL; + idx = -EBADFD; + break; } CDEBUG(D_INFO, "name %.*s hash_type %d idx %d\n", namelen, name, - hashtype, idx); + hash_type, idx); - LASSERT(idx < max_mdt_index); return idx; } @@ -301,14 +294,16 @@ static int lmv_connect(const struct lu_env *env, if (data) lmv->conn_data = *data; - if (obd->obd_type->typ_procsym == NULL) { - obd->obd_type->typ_procsym = lprocfs_seq_register("target_obds", - obd->obd_proc_entry, - NULL, NULL); - if (IS_ERR(obd->obd_type->typ_procsym)) { - CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.", - obd->obd_type->typ_name, obd->obd_name); - obd->obd_type->typ_procsym = NULL; + if (lmv->targets_proc_entry == NULL) { + lmv->targets_proc_entry = lprocfs_seq_register("target_obds", + obd->obd_proc_entry, + NULL, NULL); + if (IS_ERR(lmv->targets_proc_entry)) { + CERROR("%s: cannot register " + "/proc/fs/lustre/%s/%s/target_obds\n", + obd->obd_name, obd->obd_type->typ_name, + obd->obd_name); + lmv->targets_proc_entry = NULL; } } @@ -321,9 +316,9 @@ static int lmv_connect(const struct lu_env *env, if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_REAL)) rc = lmv_check_connect(obd); - if (rc && obd->obd_type->typ_procsym != NULL) - lprocfs_remove(&obd->obd_type->typ_procsym); - RETURN(rc); + if (rc && lmv->targets_proc_entry != NULL) + lprocfs_remove(&lmv->targets_proc_entry); + RETURN(rc); } static void lmv_set_timeouts(struct obd_device *obd) @@ -393,7 +388,7 @@ static int lmv_init_ea_size(struct obd_export *exp, int easize, cookiesize, def_cookiesize); if (rc) { CERROR("%s: obd_init_ea_size() failed on MDT target %d:" - " rc = %d.\n", obd->obd_name, i, rc); + " rc = %d\n", obd->obd_name, i, rc); break; } } @@ -482,22 +477,21 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) mdc_obd->obd_name, mdc_obd->obd_uuid.uuid, atomic_read(&obd->obd_refcount)); - if (obd->obd_type->typ_procsym != NULL) { + if (lmv->targets_proc_entry != NULL) { struct proc_dir_entry *mdc_symlink; LASSERT(mdc_obd->obd_type != NULL); LASSERT(mdc_obd->obd_type->typ_name != NULL); mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name, - obd->obd_type->typ_procsym, + lmv->targets_proc_entry, "../../../%s/%s", mdc_obd->obd_type->typ_name, mdc_obd->obd_name); if (mdc_symlink == NULL) { - CERROR("Could not register LMV target " - "/proc/fs/lustre/%s/%s/target_obds/%s.", + CERROR("cannot register LMV target " + "/proc/fs/lustre/%s/%s/target_obds/%s\n", obd->obd_type->typ_name, obd->obd_name, mdc_obd->obd_name); - lprocfs_remove(&obd->obd_type->typ_procsym); } } RETURN(0); @@ -518,6 +512,7 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, { struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; + int orig_tgt_count = 0; int rc = 0; ENTRY; @@ -589,14 +584,17 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, tgt->ltd_uuid = *uuidp; tgt->ltd_active = 0; lmv->tgts[index] = tgt; - if (index >= lmv->desc.ld_tgt_count) + if (index >= lmv->desc.ld_tgt_count) { + orig_tgt_count = lmv->desc.ld_tgt_count; lmv->desc.ld_tgt_count = index + 1; + } if (lmv->connected) { rc = lmv_connect_mdc(obd, tgt); - if (rc) { + if (rc != 0) { spin_lock(&lmv->lmv_lock); - lmv->desc.ld_tgt_count--; + if (lmv->desc.ld_tgt_count == index + 1) + lmv->desc.ld_tgt_count = orig_tgt_count; memset(tgt, 0, sizeof(*tgt)); spin_unlock(&lmv->lmv_lock); } else { @@ -703,9 +701,9 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt) mdc_obd->obd_no_recov = obd->obd_no_recov; } - if (obd->obd_type->typ_procsym != NULL) + if (lmv->targets_proc_entry != NULL) lprocfs_remove_proc_entry(mdc_obd->obd_name, - obd->obd_type->typ_procsym); + lmv->targets_proc_entry); rc = obd_fid_fini(tgt->ltd_exp->exp_obd); if (rc) @@ -754,8 +752,8 @@ static int lmv_disconnect(struct obd_export *exp) lmv_disconnect_mdc(obd, lmv->tgts[i]); } - if (obd->obd_type->typ_procsym != NULL) - lprocfs_remove(&obd->obd_type->typ_procsym); + if (lmv->targets_proc_entry != NULL) + lprocfs_remove(&lmv->targets_proc_entry); else CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n", obd->obd_type->typ_name, obd->obd_name); @@ -1357,7 +1355,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, int rc; ENTRY; - tgt = lmv_get_target(lmv, mds); + tgt = lmv_get_target(lmv, mds, NULL); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -1370,14 +1368,14 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL) GOTO(out, rc = -ENODEV); - /* - * Asking underlaying tgt layer to allocate new fid. - */ - rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL); - if (rc > 0) { - LASSERT(fid_is_sane(fid)); - rc = 0; - } + /* + * Asking underlying tgt layer to allocate new fid. + */ + rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL); + if (rc > 0) { + LASSERT(fid_is_sane(fid)); + rc = 0; + } EXIT; out: @@ -1385,8 +1383,8 @@ out: return rc; } -int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; @@ -1704,27 +1702,39 @@ static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid) static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid, ldlm_iterator_t it, void *data) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - __u32 i; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + int i; + int tgt; + int rc; + ENTRY; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); - CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); + CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); /* * With DNE every object can have two locks in different namespaces: * lookup lock in space of MDT storing direntry and update/open lock in - * space of MDT storing inode. + * space of MDT storing inode. Try the MDT that the FID maps to first, + * since this can be easily found, and only try others if that fails. */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL) + for (i = 0, tgt = lmv_find_target_index(lmv, fid); + i < lmv->desc.ld_tgt_count; + i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) { + if (tgt < 0) { + CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n", + obd->obd_name, PFID(fid), tgt); + tgt = 0; + } + + if (lmv->tgts[tgt] == NULL || + lmv->tgts[tgt]->ltd_exp == NULL) continue; - rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data); + + rc = md_find_cbdata(lmv->tgts[tgt]->ltd_exp, fid, it, data); if (rc) RETURN(rc); } @@ -1771,15 +1781,32 @@ lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, oinfo = lsm_name_to_stripe_info(lsm, name, namelen); if (IS_ERR(oinfo)) - RETURN((void *)oinfo); + RETURN(ERR_CAST(oinfo)); *fid = oinfo->lmo_fid; *mds = oinfo->lmo_mds; - tgt = lmv_get_target(lmv, *mds); + tgt = lmv_get_target(lmv, *mds, NULL); CDEBUG(D_INFO, "locate on mds %u "DFID"\n", *mds, PFID(fid)); return tgt; } +/** + * Locate mds by fid or name + * + * For striped directory (lsm != NULL), it will locate the stripe + * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type + * is unknown, it will return -EBADFD, and lmv_intent_lookup might need + * walk through all of stripes to locate the entry. + * + * For normal direcotry, it will locate MDS by FID directly. + * \param[in] lmv LMV device + * \param[in] op_data client MD stack parameters, name, namelen + * mds_num etc. + * \param[in] fid object FID used to locate MDS. + * + * retval pointer to the lmv_tgt_desc if succeed. + * ERR_PTR(errno) if failed. + */ struct lmv_tgt_desc *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data, struct lu_fid *fid) @@ -1787,9 +1814,7 @@ struct lmv_tgt_desc struct lmv_stripe_md *lsm = op_data->op_mea1; struct lmv_tgt_desc *tgt; - if (lsm == NULL || lsm->lsm_md_stripe_count <= 1 || - op_data->op_namelen == 0 || - lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) { + if (lsm == NULL || op_data->op_namelen == 0) { tgt = lmv_find_target(lmv, fid); if (IS_ERR(tgt)) return tgt; @@ -1829,7 +1854,7 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), op_data->op_mds); - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) RETURN(rc); @@ -1878,69 +1903,10 @@ static int lmv_done_writing(struct obd_export *exp, } static int -lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - __u64 extra_lock_flags) -{ - struct ptlrpc_request *req = it->d.lustre.it_data; - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lustre_handle plock; - struct lmv_tgt_desc *tgt; - struct md_op_data *rdata; - struct lu_fid fid1; - struct mdt_body *body; - int rc = 0; - int pmode; - ENTRY; - - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); - - if (!(body->valid & OBD_MD_MDS)) - RETURN(0); - - CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n", - LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1)); - - /* - * We got LOOKUP lock, but we really need attrs. - */ - pmode = it->d.lustre.it_lock_mode; - LASSERT(pmode != 0); - memcpy(&plock, lockh, sizeof(plock)); - it->d.lustre.it_lock_mode = 0; - it->d.lustre.it_data = NULL; - fid1 = body->fid1; - - ptlrpc_req_finished(req); - - tgt = lmv_find_target(lmv, &fid1); - if (IS_ERR(tgt)) - GOTO(out, rc = PTR_ERR(tgt)); - - OBD_ALLOC_PTR(rdata); - if (rdata == NULL) - GOTO(out, rc = -ENOMEM); - - rdata->op_fid1 = fid1; - rdata->op_bias = MDS_CROSS_REF; - - rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh, - lmm, lmmsize, NULL, extra_lock_flags); - OBD_FREE_PTR(rdata); - EXIT; -out: - ldlm_lock_decref(&plock, pmode); - return rc; -} - -static int lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - struct ptlrpc_request **req, __u64 extra_lock_flags) + const union ldlm_policy_data *policy, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, __u64 extra_lock_flags) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1962,13 +1928,9 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n", LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx); - rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh, - lmm, lmmsize, req, extra_lock_flags); + rc = md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh, + extra_lock_flags); - if (rc == 0 && it && it->it_op == IT_OPEN) { - rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, - lmm, lmmsize, extra_lock_flags); - } RETURN(rc); } @@ -2003,8 +1965,8 @@ lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data, body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); - if (body->valid & OBD_MD_MDS) { - struct lu_fid rid = body->fid1; + if (body->mbo_valid & OBD_MD_MDS) { + struct lu_fid rid = body->mbo_fid1; CDEBUG(D_INODE, "Request attrs for "DFID"\n", PFID(&rid)); @@ -2154,7 +2116,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, if (op_data->op_cli_flags & CLI_MIGRATE) { LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n", PFID(&op_data->op_fid3)); - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) RETURN(rc); src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid3); @@ -2297,124 +2259,187 @@ static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -/* - * Adjust a set of pages, each page containing an array of lu_dirpages, - * so that each page can be used as a single logical lu_dirpage. - * - * A lu_dirpage is laid out as follows, where s = ldp_hash_start, - * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a - * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end - * value is used as a cookie to request the next lu_dirpage in a - * directory listing that spans multiple pages (two in this example): - * ________ - * | | - * .|--------v------- -----. - * |s|e|f|p|ent|ent| ... |ent| - * '--|-------------- -----' Each CFS_PAGE contains a single - * '------. lu_dirpage. - * .---------v------- -----. - * |s|e|f|p|ent| 0 | ... | 0 | - * '----------------- -----' - * - * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is - * larger than LU_PAGE_SIZE, a single host page may contain multiple - * lu_dirpages. After reading the lu_dirpages from the MDS, the - * ldp_hash_end of the first lu_dirpage refers to the one immediately - * after it in the same CFS_PAGE (arrows simplified for brevity, but - * in general e0==s1, e1==s2, etc.): - * - * .-------------------- -----. - * |s0|e0|f0|p|ent|ent| ... |ent| - * |---v---------------- -----| - * |s1|e1|f1|p|ent|ent| ... |ent| - * |---v---------------- -----| Here, each CFS_PAGE contains - * ... multiple lu_dirpages. - * |---v---------------- -----| - * |s'|e'|f'|p|ent|ent| ... |ent| - * '---|---------------- -----' - * v - * .----------------------------. - * | next CFS_PAGE | +/** + * Get current minimum entry from striped directory * - * This structure is transformed into a single logical lu_dirpage as follows: + * This function will search the dir entry, whose hash value is the + * closest(>=) to @hash_offset, from all of sub-stripes, and it is + * only being called for striped directory. * - * - Replace e0 with e' so the request for the next lu_dirpage gets the page - * labeled 'next CFS_PAGE'. + * \param[in] exp export of LMV + * \param[in] op_data parameters transferred beween client MD stack + * stripe_information will be included in this + * parameter + * \param[in] cb_op ldlm callback being used in enqueue in + * mdc_read_page + * \param[in] hash_offset the hash value, which is used to locate + * minum(closet) dir entry + * \param[in|out] stripe_offset the caller use this to indicate the stripe + * index of last entry, so to avoid hash conflict + * between stripes. It will also be used to + * return the stripe index of current dir entry. + * \param[in|out] entp the minum entry and it also is being used + * to input the last dir entry to resolve the + * hash conflict * - * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether - * a hash collision with the next page exists. + * \param[out] ppage the page which holds the minum entry * - * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span - * to the first entry of the next lu_dirpage. + * \retval = 0 get the entry successfully + * negative errno (< 0) does not get the entry */ -#if PAGE_CACHE_SIZE > LU_PAGE_SIZE -static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs) +static int lmv_get_min_striped_entry(struct obd_export *exp, + struct md_op_data *op_data, + struct md_callback *cb_op, + __u64 hash_offset, int *stripe_offset, + struct lu_dirent **entp, + struct page **ppage) { - int i; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct lmv_tgt_desc *tgt; + int stripe_count; + struct lu_dirent *min_ent = NULL; + struct page *min_page = NULL; + int min_idx = 0; + int i; + int rc = 0; + ENTRY; + + stripe_count = lsm->lsm_md_stripe_count; + for (i = 0; i < stripe_count; i++) { + struct lu_dirent *ent = NULL; + struct page *page = NULL; + struct lu_dirpage *dp; + __u64 stripe_hash = hash_offset; + + tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL); + if (IS_ERR(tgt)) + GOTO(out, rc = PTR_ERR(tgt)); + + /* op_data will be shared by each stripe, so we need + * reset these value for each stripe */ + op_data->op_stripe_offset = i; + op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid; + op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid; + op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root; +next: + rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash, + &page); + if (rc != 0) + GOTO(out, rc); + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + /* Skip dummy entry */ + if (le16_to_cpu(ent->lde_namelen) == 0) + continue; + + if (le64_to_cpu(ent->lde_hash) < hash_offset) + continue; + + if (le64_to_cpu(ent->lde_hash) == hash_offset && + (*entp == ent || i < *stripe_offset)) + continue; + + /* skip . and .. for other stripes */ + if (i != 0 && + (strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) == 0 || + strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)) == 0)) + continue; + break; + } + + if (ent == NULL) { + stripe_hash = le64_to_cpu(dp->ldp_hash_end); - for (i = 0; i < ncfspgs; i++) { - struct lu_dirpage *dp = kmap(pages[i]); - struct lu_dirpage *first = dp; - struct lu_dirent *end_dirent = NULL; - struct lu_dirent *ent; - __u64 hash_end = dp->ldp_hash_end; - __u32 flags = dp->ldp_flags; - - while (--nlupgs > 0) { - ent = lu_dirent_start(dp); - for (end_dirent = ent; ent != NULL; - end_dirent = ent, ent = lu_dirent_next(ent)); - - /* Advance dp to next lu_dirpage. */ - dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); - - /* Check if we've reached the end of the CFS_PAGE. */ - if (!((unsigned long)dp & ~CFS_PAGE_MASK)) - break; - - /* Save the hash and flags of this lu_dirpage. */ - hash_end = dp->ldp_hash_end; - flags = dp->ldp_flags; - - /* Check if lu_dirpage contains no entries. */ - if (!end_dirent) - break; - - /* Enlarge the end entry lde_reclen from 0 to - * first entry of next lu_dirpage. */ - LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0); - end_dirent->lde_reclen = - cpu_to_le16((char *)(dp->ldp_entries) - - (char *)end_dirent); + kunmap(page); + page_cache_release(page); + page = NULL; + + /* reach the end of current stripe, go to next stripe */ + if (stripe_hash == MDS_DIR_END_OFF) + continue; + else + goto next; } - first->ldp_hash_end = hash_end; - first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); - first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); + if (min_ent != NULL) { + if (le64_to_cpu(min_ent->lde_hash) > + le64_to_cpu(ent->lde_hash)) { + min_ent = ent; + kunmap(min_page); + page_cache_release(min_page); + min_idx = i; + min_page = page; + } else { + kunmap(page); + page_cache_release(page); + page = NULL; + } + } else { + min_ent = ent; + min_page = page; + min_idx = i; + } + } - kunmap(pages[i]); +out: + if (*ppage != NULL) { + kunmap(*ppage); + page_cache_release(*ppage); } - LASSERTF(nlupgs == 0, "left = %d", nlupgs); + *stripe_offset = min_idx; + *entp = min_ent; + *ppage = min_page; + RETURN(rc); } -#else -#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0) -#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ -#define NORMAL_MAX_STRIPES 4 -int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, struct lu_dirent **ldp, - struct page **ppage) +/** + * Build dir entry page from a striped directory + * + * This function gets one entry by @offset from a striped directory. It will + * read entries from all of stripes, and choose one closest to the required + * offset(&offset). A few notes + * 1. skip . and .. for non-zero stripes, because there can only have one . + * and .. in a directory. + * 2. op_data will be shared by all of stripes, instead of allocating new + * one, so need to restore before reusing. + * 3. release the entry page if that is not being chosen. + * + * \param[in] exp obd export refer to LMV + * \param[in] op_data hold those MD parameters of read_entry + * \param[in] cb_op ldlm callback being used in enqueue in mdc_read_entry + * \param[out] ldp the entry being read + * \param[out] ppage the page holding the entry. Note: because the entry + * will be accessed in upper layer, so we need hold the + * page until the usages of entry is finished, see + * ll_dir_entry_next. + * + * retval =0 if get entry successfully + * <0 cannot get entry + */ +static int lmv_read_striped_page(struct obd_export *exp, + struct md_op_data *op_data, + struct md_callback *cb_op, + __u64 offset, struct page **ppage) { struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *lsm = op_data->op_mea1; - struct lu_dirent *tmp_ents[NORMAL_MAX_STRIPES]; - struct lu_dirent **ents = NULL; - int stripe_count; - __u64 min_hash; - int min_idx = 0; - struct page *min_page = NULL; - int i; + struct lu_fid master_fid = op_data->op_fid1; + struct inode *master_inode = op_data->op_data; + __u64 hash_offset = offset; + struct lu_dirpage *dp; + struct page *min_ent_page = NULL; + struct page *ent_page = NULL; + struct lu_dirent *ent; + void *area; + int ent_idx = 0; + struct lu_dirent *min_ent = NULL; + struct lu_dirent *last_ent; + int left_bytes; int rc; ENTRY; @@ -2422,72 +2447,159 @@ int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data, if (rc) RETURN(rc); - if (lsm == NULL) - stripe_count = 1; - else - stripe_count = lsm->lsm_md_stripe_count; - - if (stripe_count > NORMAL_MAX_STRIPES) { - OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count); - if (ents == NULL) - GOTO(out, rc = -ENOMEM); - } else { - ents = tmp_ents; - memset(ents, 0, sizeof(ents[0]) * stripe_count); - } + /* Allocate a page and read entries from all of stripes and fill + * the page by hash order */ + ent_page = alloc_page(GFP_KERNEL); + if (ent_page == NULL) + RETURN(-ENOMEM); - min_hash = MDS_DIR_END_OFF; - for (i = 0; i < stripe_count; i++) { - struct lmv_tgt_desc *tgt; - struct page *page = NULL; + /* Initialize the entry page */ + dp = kmap(ent_page); + memset(dp, 0, sizeof(*dp)); + dp->ldp_hash_start = cpu_to_le64(offset); + dp->ldp_flags |= LDF_COLLIDE; + + area = dp + 1; + left_bytes = PAGE_CACHE_SIZE - sizeof(*dp); + ent = area; + last_ent = ent; + do { + __u16 ent_size; + + /* Find the minum entry from all sub-stripes */ + rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset, + &ent_idx, &min_ent, + &min_ent_page); + if (rc != 0) + GOTO(out, rc); - if (likely(lsm == NULL)) { - tgt = lmv_find_target(lmv, &op_data->op_fid1); - if (IS_ERR(tgt)) - GOTO(out, rc = PTR_ERR(tgt)); - LASSERT(op_data->op_data != NULL); - } else { - tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds); - if (IS_ERR(tgt)) - GOTO(out, rc = PTR_ERR(tgt)); - op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid; - op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid; - op_data->op_stripe_offset = i; + /* If it can not get minum entry, it means it already reaches + * the end of this directory */ + if (min_ent == NULL) { + last_ent->lde_reclen = 0; + hash_offset = MDS_DIR_END_OFF; + GOTO(out, rc); } - rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i], - &page); - if (rc != 0) + ent_size = le16_to_cpu(min_ent->lde_reclen); + + /* the last entry lde_reclen is 0, but it might not + * the end of this entry of this temporay entry */ + if (ent_size == 0) + ent_size = lu_dirent_calc_size( + le16_to_cpu(min_ent->lde_namelen), + le32_to_cpu(min_ent->lde_attrs)); + if (ent_size > left_bytes) { + last_ent->lde_reclen = cpu_to_le16(0); + hash_offset = le64_to_cpu(min_ent->lde_hash); GOTO(out, rc); - - if (ents[i] != NULL && - le64_to_cpu(ents[i]->lde_hash) <= min_hash) { - if (min_page != NULL) - page_cache_release(min_page); - min_page = page; - min_hash = le64_to_cpu(ents[i]->lde_hash); - min_idx = i; } - } - if (min_hash != MDS_DIR_END_OFF) - *ldp = ents[min_idx]; - else - *ldp = NULL; + memcpy(ent, min_ent, ent_size); + + /* Replace . with master FID and Replace .. with the parent FID + * of master object */ + if (strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 1) + fid_cpu_to_le(&ent->lde_fid, &master_fid); + else if (strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 2) + fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3); + + left_bytes -= ent_size; + ent->lde_reclen = cpu_to_le16(ent_size); + last_ent = ent; + ent = (void *)ent + ent_size; + hash_offset = le64_to_cpu(min_ent->lde_hash); + if (hash_offset == MDS_DIR_END_OFF) { + last_ent->lde_reclen = 0; + break; + } + } while (1); out: - if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL) - OBD_FREE(ents, sizeof(ents[0]) * stripe_count); + if (min_ent_page != NULL) { + kunmap(min_ent_page); + page_cache_release(min_ent_page); + } - if (rc != 0 && min_page != NULL) { - kunmap(min_page); - page_cache_release(min_page); + if (unlikely(rc != 0)) { + __free_page(ent_page); + ent_page = NULL; } else { - *ppage = min_page; + if (ent == area) + dp->ldp_flags |= LDF_EMPTY; + dp->ldp_flags = cpu_to_le32(dp->ldp_flags); + dp->ldp_hash_end = cpu_to_le64(hash_offset); + } + + /* We do not want to allocate md_op_data during each + * dir entry reading, so op_data will be shared by every stripe, + * then we need to restore it back to original value before + * return to the upper layer */ + op_data->op_fid1 = master_fid; + op_data->op_fid2 = master_fid; + op_data->op_data = master_inode; + + *ppage = ent_page; + + RETURN(rc); +} + +int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, __u64 offset, + struct page **ppage) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct lmv_tgt_desc *tgt; + int rc; + ENTRY; + + rc = lmv_check_connect(obd); + if (rc != 0) + RETURN(rc); + + if (unlikely(lsm != NULL)) { + rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage); + RETURN(rc); } + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); + + rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage); + RETURN(rc); } +/** + * Unlink a file/directory + * + * Unlink a file or directory under the parent dir. The unlink request + * usually will be sent to the MDT where the child is located, but if + * the client does not have the child FID then request will be sent to the + * MDT where the parent is located. + * + * If the parent is a striped directory then it also needs to locate which + * stripe the name of the child is located, and replace the parent FID + * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown, + * it will walk through all of sub-stripes until the child is being + * unlinked finally. + * + * \param[in] exp export refer to LMV + * \param[in] op_data different parameters transferred beween client + * MD stacks, name, namelen, FIDs etc. + * op_fid1 is the parent FID, op_fid2 is the child + * FID. + * \param[out] request point to the request of unlink. + * + * retval 0 if succeed + * negative errno if failed. + */ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { @@ -2497,39 +2609,58 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct lmv_tgt_desc *parent_tgt = NULL; struct mdt_body *body; int rc; + int stripe_index = 0; + struct lmv_stripe_md *lsm = op_data->op_mea1; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); -retry: +retry_unlink: + /* For striped dir, we need to locate the parent as well */ + if (lsm != NULL) { + struct lmv_tgt_desc *tmp; + + LASSERT(op_data->op_name != NULL && + op_data->op_namelen != 0); + + tmp = lmv_locate_target_for_name(lmv, lsm, + op_data->op_name, + op_data->op_namelen, + &op_data->op_fid1, + &op_data->op_mds); + + /* return -EBADFD means unknown hash type, might + * need try all sub-stripe here */ + if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD) + RETURN(PTR_ERR(tmp)); + + /* Note: both migrating dir and unknown hash dir need to + * try all of sub-stripes, so we need start search the + * name from stripe 0, but migrating dir is already handled + * inside lmv_locate_target_for_name(), so we only check + * unknown hash type directory here */ + if (!lmv_is_known_hash_type(lsm)) { + struct lmv_oinfo *oinfo; + + oinfo = &lsm->lsm_md_oinfo[stripe_index]; + + op_data->op_fid1 = oinfo->lmo_fid; + op_data->op_mds = oinfo->lmo_mds; + } + } + +try_next_stripe: /* Send unlink requests to the MDT where the child is located */ - if (likely(!fid_is_zero(&op_data->op_fid2))) { + if (likely(!fid_is_zero(&op_data->op_fid2))) tgt = lmv_find_target(lmv, &op_data->op_fid2); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - - /* For striped dir, we need to locate the parent as well */ - if (op_data->op_mea1 != NULL && - op_data->op_mea1->lsm_md_stripe_count > 1) { - struct lmv_tgt_desc *tmp; - - LASSERT(op_data->op_name != NULL && - op_data->op_namelen != 0); - tmp = lmv_locate_target_for_name(lmv, - op_data->op_mea1, - op_data->op_name, - op_data->op_namelen, - &op_data->op_fid1, - &op_data->op_mds); - if (IS_ERR(tmp)) - RETURN(PTR_ERR(tmp)); - } - } else { + else if (lsm != NULL) + tgt = lmv_get_target(lmv, op_data->op_mds, NULL); + else tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); - if (IS_ERR(tgt)) - RETURN(PTR_ERR(tgt)); - } + + if (IS_ERR(tgt)) + RETURN(PTR_ERR(tgt)); op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); @@ -2566,19 +2697,38 @@ retry: PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx); rc = md_unlink(tgt->ltd_exp, op_data, request); - if (rc != 0 && rc != -EREMOTE) + if (rc != 0 && rc != -EREMOTE && rc != -ENOENT) RETURN(rc); + /* Try next stripe if it is needed. */ + if (rc == -ENOENT && lsm != NULL && lmv_need_try_all_stripes(lsm)) { + struct lmv_oinfo *oinfo; + + stripe_index++; + if (stripe_index >= lsm->lsm_md_stripe_count) + RETURN(rc); + + oinfo = &lsm->lsm_md_oinfo[stripe_index]; + + op_data->op_fid1 = oinfo->lmo_fid; + op_data->op_mds = oinfo->lmo_mds; + + ptlrpc_req_finished(*request); + *request = NULL; + + goto try_next_stripe; + } + body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY); if (body == NULL) RETURN(-EPROTO); /* Not cross-ref case, just get out of here. */ - if (likely(!(body->valid & OBD_MD_MDS))) + if (likely(!(body->mbo_valid & OBD_MD_MDS))) RETURN(0); CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n", - exp->exp_obd->obd_name, PFID(&body->fid1)); + exp->exp_obd->obd_name, PFID(&body->mbo_fid1)); /* This is a remote object, try remote MDT, Note: it may * try more than 1 time here, Considering following case @@ -2599,11 +2749,11 @@ retry: * * In theory, it might try unlimited time here, but it should * be very rare case. */ - op_data->op_fid2 = body->fid1; + op_data->op_fid2 = body->mbo_fid1; ptlrpc_req_finished(*request); *request = NULL; - goto retry; + goto retry_unlink; } static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -2828,7 +2978,10 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm, lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic); lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index); - lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type); + if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE)) + lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN; + else + lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type); lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version); cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name, sizeof(lsm->lsm_md_pool_name)); @@ -2870,13 +3023,15 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, lsm = *lsmp; /* Free memmd */ if (lsm != NULL && lmm == NULL) { -#ifdef __KERNEL__ int i; - for (i = 1; i < lsm->lsm_md_stripe_count; i++) { - if (lsm->lsm_md_oinfo[i].lmo_root != NULL) + for (i = 0; i < lsm->lsm_md_stripe_count; i++) { + /* For migrating inode, the master stripe and master + * object will be the same, so do not need iput, see + * ll_update_lsm_md */ + if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION && + i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL) iput(lsm->lsm_md_oinfo[i].lmo_root); } -#endif lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count); OBD_FREE(lsm, lsm_size); *lsmp = NULL; @@ -2894,9 +3049,11 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, RETURN(0); } + if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) + RETURN(-EPERM); + /* Unpack memmd */ if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 && - le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_MIGRATE && le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) { CERROR("%s: invalid lmv magic %x: rc = %d\n", exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic), @@ -2904,8 +3061,7 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, RETURN(-EIO); } - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1 || - le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_MIGRATE) + if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1) lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm)); else /** @@ -2925,7 +3081,6 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, switch (le32_to_cpu(lmm->lmv_magic)) { case LMV_MAGIC_V1: - case LMV_MAGIC_MIGRATE: rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1); break; default: @@ -3028,33 +3183,42 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags, ldlm_policy_data_t *policy, ldlm_mode_t mode, struct lustre_handle *lockh) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - ldlm_mode_t rc; - __u32 i; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + ldlm_mode_t rc; + int tgt; + int i; + ENTRY; - CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid)); + CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid)); /* - * With CMD every object can have two locks in different namespaces: - * lookup lock in space of mds storing direntry and update/open lock in - * space of mds storing inode. Thus we check all targets, not only that - * one fid was created in. - */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; + * With DNE every object can have two locks in different namespaces: + * lookup lock in space of MDT storing direntry and update/open lock in + * space of MDT storing inode. Try the MDT that the FID maps to first, + * since this can be easily found, and only try others if that fails. + */ + for (i = 0, tgt = lmv_find_target_index(lmv, fid); + i < lmv->desc.ld_tgt_count; + i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) { + if (tgt < 0) { + CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n", + obd->obd_name, PFID(fid), tgt); + tgt = 0; + } - if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) + if (lmv->tgts[tgt] == NULL || + lmv->tgts[tgt]->ltd_exp == NULL || + lmv->tgts[tgt]->ltd_active == 0) continue; - rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode, - lockh); - if (rc) - RETURN(rc); - } + rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid, + type, policy, mode, lockh); + if (rc) + RETURN(rc); + } - RETURN(0); + RETURN(0); } int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, @@ -3216,6 +3380,22 @@ int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, RETURN(rc); } +int lmv_get_fid_from_lsm(struct obd_export *exp, + const struct lmv_stripe_md *lsm, + const char *name, int namelen, struct lu_fid *fid) +{ + const struct lmv_oinfo *oinfo; + + LASSERT(lsm != NULL); + oinfo = lsm_name_to_stripe_info(lsm, name, namelen); + if (IS_ERR(oinfo)) + return PTR_ERR(oinfo); + + *fid = oinfo->lmo_fid; + + RETURN(0); +} + /** * For lmv, only need to send request to master MDT, and the master MDT will * process with other slave MDTs. The only exception is Q_GETOQUOTA for which @@ -3298,16 +3478,12 @@ int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp, int lmv_update_lsm_md(struct obd_export *exp, struct lmv_stripe_md *lsm, struct mdt_body *body, ldlm_blocking_callback cb_blocking) { - if (lsm->lsm_md_stripe_count <= 1) - return 0; - return lmv_revalidate_slaves(exp, body, lsm, cb_blocking, 0); } int lmv_merge_attr(struct obd_export *exp, const struct lmv_stripe_md *lsm, struct cl_attr *attr) { -#ifdef __KERNEL__ int i; for (i = 0; i < lsm->lsm_md_stripe_count; i++) { @@ -3336,7 +3512,6 @@ int lmv_merge_attr(struct obd_export *exp, const struct lmv_stripe_md *lsm, if (attr->cat_mtime < LTIME_S(inode->i_mtime)) attr->cat_mtime = LTIME_S(inode->i_mtime); } -#endif return 0; } @@ -3377,7 +3552,7 @@ struct md_ops lmv_md_ops = { .m_setattr = lmv_setattr, .m_setxattr = lmv_setxattr, .m_fsync = lmv_fsync, - .m_read_entry = lmv_read_entry, + .m_read_page = lmv_read_page, .m_unlink = lmv_unlink, .m_init_ea_size = lmv_init_ea_size, .m_cancel_unused = lmv_cancel_unused, @@ -3393,7 +3568,8 @@ struct md_ops lmv_md_ops = { .m_unpack_capa = lmv_unpack_capa, .m_get_remote_perm = lmv_get_remote_perm, .m_intent_getattr_async = lmv_intent_getattr_async, - .m_revalidate_lock = lmv_revalidate_lock + .m_revalidate_lock = lmv_revalidate_lock, + .m_get_fid_from_lsm = lmv_get_fid_from_lsm, }; int __init lmv_init(void) @@ -3405,7 +3581,6 @@ int __init lmv_init(void) LUSTRE_LMV_NAME, NULL); } -#ifdef __KERNEL__ static void lmv_exit(void) { class_unregister_type(LUSTRE_LMV_NAME); @@ -3417,4 +3592,3 @@ MODULE_LICENSE("GPL"); module_init(lmv_init); module_exit(lmv_exit); -#endif