*/
#define DEBUG_SUBSYSTEM S_LMV
-#ifdef __KERNEL__
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/math64.h>
#include <linux/seq_file.h>
#include <linux/namei.h>
-#else
-#include <liblustre.h>
-#endif
#include <lustre/lustre_idl.h>
#include <obd_support.h>
idx = lmv_hash_fnv1a(stripe_count, name, namelen);
break;
default:
- CERROR("Unknown hash type 0x%x\n", hash_type);
- return -EINVAL;
+ idx = -EBADFD;
+ break;
}
CDEBUG(D_INFO, "name %.*s hash_type %d idx %d\n", namelen, name,
obd->obd_proc_entry,
NULL, NULL);
if (IS_ERR(lmv->targets_proc_entry)) {
- CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
- obd->obd_type->typ_name, obd->obd_name);
+ CERROR("%s: cannot register "
+ "/proc/fs/lustre/%s/%s/target_obds\n",
+ obd->obd_name, obd->obd_type->typ_name,
+ obd->obd_name);
lmv->targets_proc_entry = NULL;
}
}
cookiesize, def_cookiesize);
if (rc) {
CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
- " rc = %d.\n", obd->obd_name, i, rc);
+ " rc = %d\n", obd->obd_name, i, rc);
break;
}
}
mdc_obd->obd_type->typ_name,
mdc_obd->obd_name);
if (mdc_symlink == NULL) {
- CERROR("Could not register LMV target "
- "/proc/fs/lustre/%s/%s/target_obds/%s.",
+ CERROR("cannot register LMV target "
+ "/proc/fs/lustre/%s/%s/target_obds/%s\n",
obd->obd_type->typ_name, obd->obd_name,
mdc_obd->obd_name);
}
{
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt;
+ int orig_tgt_count = 0;
int rc = 0;
ENTRY;
tgt->ltd_uuid = *uuidp;
tgt->ltd_active = 0;
lmv->tgts[index] = tgt;
- if (index >= lmv->desc.ld_tgt_count)
+ if (index >= lmv->desc.ld_tgt_count) {
+ orig_tgt_count = lmv->desc.ld_tgt_count;
lmv->desc.ld_tgt_count = index + 1;
+ }
if (lmv->connected) {
rc = lmv_connect_mdc(obd, tgt);
- if (rc) {
+ if (rc != 0) {
spin_lock(&lmv->lmv_lock);
- lmv->desc.ld_tgt_count--;
+ if (lmv->desc.ld_tgt_count == index + 1)
+ lmv->desc.ld_tgt_count = orig_tgt_count;
memset(tgt, 0, sizeof(*tgt));
spin_unlock(&lmv->lmv_lock);
} else {
int rc;
ENTRY;
- tgt = lmv_get_target(lmv, mds);
+ tgt = lmv_get_target(lmv, mds, NULL);
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
ldlm_iterator_t it, void *data)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- __u32 i;
- int rc;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int i;
+ int tgt;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
+ CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
/*
* With DNE every object can have two locks in different namespaces:
* lookup lock in space of MDT storing direntry and update/open lock in
- * space of MDT storing inode.
+ * space of MDT storing inode. Try the MDT that the FID maps to first,
+ * since this can be easily found, and only try others if that fails.
*/
- for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
- if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
+ for (i = 0, tgt = lmv_find_target_index(lmv, fid);
+ i < lmv->desc.ld_tgt_count;
+ i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
+ if (tgt < 0) {
+ CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
+ obd->obd_name, PFID(fid), tgt);
+ tgt = 0;
+ }
+
+ if (lmv->tgts[tgt] == NULL ||
+ lmv->tgts[tgt]->ltd_exp == NULL)
continue;
- rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
+
+ rc = md_find_cbdata(lmv->tgts[tgt]->ltd_exp, fid, it, data);
if (rc)
RETURN(rc);
}
oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
if (IS_ERR(oinfo))
- RETURN((void *)oinfo);
+ RETURN(ERR_CAST(oinfo));
*fid = oinfo->lmo_fid;
*mds = oinfo->lmo_mds;
- tgt = lmv_get_target(lmv, *mds);
+ tgt = lmv_get_target(lmv, *mds, NULL);
CDEBUG(D_INFO, "locate on mds %u "DFID"\n", *mds, PFID(fid));
return tgt;
}
+/**
+ * Locate mds by fid or name
+ *
+ * For striped directory (lsm != NULL), it will locate the stripe
+ * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type
+ * is unknown, it will return -EBADFD, and lmv_intent_lookup might need
+ * to walk through all of the stripes to locate the entry.
+ *
+ * For a normal directory, it will locate the MDS by FID directly.
+ * \param[in] lmv LMV device
+ * \param[in] op_data client MD stack parameters, name, namelen
+ * mds_num etc.
+ * \param[in] fid object FID used to locate MDS.
+ *
+ * \retval pointer to the lmv_tgt_desc on success.
+ * ERR_PTR(errno) on failure.
+ */
struct lmv_tgt_desc
*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
struct lu_fid *fid)
}
static int
-lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it, struct md_op_data *op_data,
- struct lustre_handle *lockh, void *lmm, int lmmsize,
- __u64 extra_lock_flags)
-{
- struct ptlrpc_request *req = it->d.lustre.it_data;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lustre_handle plock;
- struct lmv_tgt_desc *tgt;
- struct md_op_data *rdata;
- struct lu_fid fid1;
- struct mdt_body *body;
- int rc = 0;
- int pmode;
- ENTRY;
-
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- if (!(body->valid & OBD_MD_MDS))
- RETURN(0);
-
- CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
- LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
-
- /*
- * We got LOOKUP lock, but we really need attrs.
- */
- pmode = it->d.lustre.it_lock_mode;
- LASSERT(pmode != 0);
- memcpy(&plock, lockh, sizeof(plock));
- it->d.lustre.it_lock_mode = 0;
- it->d.lustre.it_data = NULL;
- fid1 = body->fid1;
-
- ptlrpc_req_finished(req);
-
- tgt = lmv_find_target(lmv, &fid1);
- if (IS_ERR(tgt))
- GOTO(out, rc = PTR_ERR(tgt));
-
- OBD_ALLOC_PTR(rdata);
- if (rdata == NULL)
- GOTO(out, rc = -ENOMEM);
-
- rdata->op_fid1 = fid1;
- rdata->op_bias = MDS_CROSS_REF;
-
- rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
- lmm, lmmsize, NULL, extra_lock_flags);
- OBD_FREE_PTR(rdata);
- EXIT;
-out:
- ldlm_lock_decref(&plock, pmode);
- return rc;
-}
-
-static int
lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it, struct md_op_data *op_data,
- struct lustre_handle *lockh, void *lmm, int lmmsize,
- struct ptlrpc_request **req, __u64 extra_lock_flags)
+ const union ldlm_policy_data *policy,
+ struct lookup_intent *it, struct md_op_data *op_data,
+ struct lustre_handle *lockh, __u64 extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
- rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
- lmm, lmmsize, req, extra_lock_flags);
+ rc = md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh,
+ extra_lock_flags);
- if (rc == 0 && it && it->it_op == IT_OPEN) {
- rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
- lmm, lmmsize, extra_lock_flags);
- }
RETURN(rc);
}
body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
- if (body->valid & OBD_MD_MDS) {
- struct lu_fid rid = body->fid1;
+ if (body->mbo_valid & OBD_MD_MDS) {
+ struct lu_fid rid = body->mbo_fid1;
CDEBUG(D_INODE, "Request attrs for "DFID"\n",
PFID(&rid));
RETURN(rc);
}
-/*
- * Adjust a set of pages, each page containing an array of lu_dirpages,
- * so that each page can be used as a single logical lu_dirpage.
- *
- * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
- * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
- * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end
- * value is used as a cookie to request the next lu_dirpage in a
- * directory listing that spans multiple pages (two in this example):
- * ________
- * | |
- * .|--------v------- -----.
- * |s|e|f|p|ent|ent| ... |ent|
- * '--|-------------- -----' Each CFS_PAGE contains a single
- * '------. lu_dirpage.
- * .---------v------- -----.
- * |s|e|f|p|ent| 0 | ... | 0 |
- * '----------------- -----'
- *
- * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
- * larger than LU_PAGE_SIZE, a single host page may contain multiple
- * lu_dirpages. After reading the lu_dirpages from the MDS, the
- * ldp_hash_end of the first lu_dirpage refers to the one immediately
- * after it in the same CFS_PAGE (arrows simplified for brevity, but
- * in general e0==s1, e1==s2, etc.):
- *
- * .-------------------- -----.
- * |s0|e0|f0|p|ent|ent| ... |ent|
- * |---v---------------- -----|
- * |s1|e1|f1|p|ent|ent| ... |ent|
- * |---v---------------- -----| Here, each CFS_PAGE contains
- * ... multiple lu_dirpages.
- * |---v---------------- -----|
- * |s'|e'|f'|p|ent|ent| ... |ent|
- * '---|---------------- -----'
- * v
- * .----------------------------.
- * | next CFS_PAGE |
+/**
+ * Get current minimum entry from striped directory
*
- * This structure is transformed into a single logical lu_dirpage as follows:
+ * This function will search the dir entry, whose hash value is the
+ * closest(>=) to @hash_offset, from all of sub-stripes, and it is
+ * only being called for striped directory.
*
- * - Replace e0 with e' so the request for the next lu_dirpage gets the page
- * labeled 'next CFS_PAGE'.
+ * \param[in] exp export of LMV
+ * \param[in] op_data parameters transferred between client MD stack
+ * stripe_information will be included in this
+ * parameter
+ * \param[in] cb_op ldlm callback being used in enqueue in
+ * mdc_read_page
+ * \param[in] hash_offset the hash value, which is used to locate
+ * minimum (closest) dir entry
+ * \param[in|out] stripe_offset the caller uses this to indicate the stripe
+ * index of last entry, so to avoid hash conflict
+ * between stripes. It will also be used to
+ * return the stripe index of current dir entry.
+ * \param[in|out] entp the minimum entry, which is also being used
+ * to input the last dir entry to resolve the
+ * hash conflict
*
- * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
- * a hash collision with the next page exists.
+ * \param[out] ppage the page which holds the minimum entry
*
- * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
- * to the first entry of the next lu_dirpage.
+ * \retval = 0 got the entry successfully
+ * negative errno (< 0) failed to get the entry
*/
-#if PAGE_CACHE_SIZE > LU_PAGE_SIZE
-static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
+static int lmv_get_min_striped_entry(struct obd_export *exp,
+ struct md_op_data *op_data,
+ struct md_callback *cb_op,
+ __u64 hash_offset, int *stripe_offset,
+ struct lu_dirent **entp,
+ struct page **ppage)
{
- int i;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ struct lmv_tgt_desc *tgt;
+ int stripe_count;
+ struct lu_dirent *min_ent = NULL;
+ struct page *min_page = NULL;
+ int min_idx = 0;
+ int i;
+ int rc = 0;
+ ENTRY;
+
+ stripe_count = lsm->lsm_md_stripe_count;
+ for (i = 0; i < stripe_count; i++) {
+ struct lu_dirent *ent = NULL;
+ struct page *page = NULL;
+ struct lu_dirpage *dp;
+ __u64 stripe_hash = hash_offset;
- for (i = 0; i < ncfspgs; i++) {
- struct lu_dirpage *dp = kmap(pages[i]);
- struct lu_dirpage *first = dp;
- struct lu_dirent *end_dirent = NULL;
- struct lu_dirent *ent;
- __u64 hash_end = dp->ldp_hash_end;
- __u32 flags = dp->ldp_flags;
-
- while (--nlupgs > 0) {
- ent = lu_dirent_start(dp);
- for (end_dirent = ent; ent != NULL;
- end_dirent = ent, ent = lu_dirent_next(ent));
-
- /* Advance dp to next lu_dirpage. */
- dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
-
- /* Check if we've reached the end of the CFS_PAGE. */
- if (!((unsigned long)dp & ~CFS_PAGE_MASK))
- break;
-
- /* Save the hash and flags of this lu_dirpage. */
- hash_end = dp->ldp_hash_end;
- flags = dp->ldp_flags;
-
- /* Check if lu_dirpage contains no entries. */
- if (!end_dirent)
- break;
-
- /* Enlarge the end entry lde_reclen from 0 to
- * first entry of next lu_dirpage. */
- LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
- end_dirent->lde_reclen =
- cpu_to_le16((char *)(dp->ldp_entries) -
- (char *)end_dirent);
+ tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
+ if (IS_ERR(tgt))
+ GOTO(out, rc = PTR_ERR(tgt));
+
+ /* op_data will be shared by each stripe, so we need
+ * to reset these values for each stripe */
+ op_data->op_stripe_offset = i;
+ op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
+ op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
+ op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
+next:
+ rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
+ &page);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ /* Skip dummy entry */
+ if (le16_to_cpu(ent->lde_namelen) == 0)
+ continue;
+
+ if (le64_to_cpu(ent->lde_hash) < hash_offset)
+ continue;
+
+ if (le64_to_cpu(ent->lde_hash) == hash_offset &&
+ (*entp == ent || i < *stripe_offset))
+ continue;
+
+ /* skip . and .. for other stripes */
+ if (i != 0 &&
+ (strncmp(ent->lde_name, ".",
+ le16_to_cpu(ent->lde_namelen)) == 0 ||
+ strncmp(ent->lde_name, "..",
+ le16_to_cpu(ent->lde_namelen)) == 0))
+ continue;
+ break;
}
- first->ldp_hash_end = hash_end;
- first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
- first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+ if (ent == NULL) {
+ stripe_hash = le64_to_cpu(dp->ldp_hash_end);
- kunmap(pages[i]);
+ kunmap(page);
+ page_cache_release(page);
+ page = NULL;
+
+ /* reach the end of current stripe, go to next stripe */
+ if (stripe_hash == MDS_DIR_END_OFF)
+ continue;
+ else
+ goto next;
+ }
+
+ if (min_ent != NULL) {
+ if (le64_to_cpu(min_ent->lde_hash) >
+ le64_to_cpu(ent->lde_hash)) {
+ min_ent = ent;
+ kunmap(min_page);
+ page_cache_release(min_page);
+ min_idx = i;
+ min_page = page;
+ } else {
+ kunmap(page);
+ page_cache_release(page);
+ page = NULL;
+ }
+ } else {
+ min_ent = ent;
+ min_page = page;
+ min_idx = i;
+ }
+ }
+
+out:
+ if (*ppage != NULL) {
+ kunmap(*ppage);
+ page_cache_release(*ppage);
}
- LASSERTF(nlupgs == 0, "left = %d", nlupgs);
+ *stripe_offset = min_idx;
+ *entp = min_ent;
+ *ppage = min_page;
+ RETURN(rc);
}
-#else
-#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
-#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
-#define NORMAL_MAX_STRIPES 4
-int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct lu_dirent **ldp,
- struct page **ppage)
+/**
+ * Build dir entry page from a striped directory
+ *
+ * This function gets one entry by @offset from a striped directory. It will
+ * read entries from all of stripes, and choose one closest to the required
+ * offset (@offset). A few notes
+ * 1. skip . and .. for non-zero stripes, because there can only be one .
+ * and .. in a directory.
+ * 2. op_data will be shared by all of stripes, instead of allocating new
+ * one, so need to restore before reusing.
+ * 3. release the entry page if that is not being chosen.
+ *
+ * \param[in] exp obd export refer to LMV
+ * \param[in] op_data hold those MD parameters of read_entry
+ * \param[in] cb_op ldlm callback being used in enqueue in mdc_read_entry
+ * \param[out] ldp the entry being read
+ * \param[out] ppage the page holding the entry. Note: because the entry
+ * will be accessed in the upper layer, we need to hold the
+ * page until the use of the entry is finished, see
+ * ll_dir_entry_next.
+ *
+ * \retval =0 if the entry is retrieved successfully
+ * <0 if the entry cannot be retrieved
+ */
+static int lmv_read_striped_page(struct obd_export *exp,
+ struct md_op_data *op_data,
+ struct md_callback *cb_op,
+ __u64 offset, struct page **ppage)
{
struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *lsm = op_data->op_mea1;
- struct lu_dirent *tmp_ents[NORMAL_MAX_STRIPES];
- struct lu_dirent **ents = NULL;
- int stripe_count;
- __u64 min_hash;
- int min_idx = 0;
- struct page *min_page = NULL;
- int i;
+ struct lu_fid master_fid = op_data->op_fid1;
+ struct inode *master_inode = op_data->op_data;
+ __u64 hash_offset = offset;
+ struct lu_dirpage *dp;
+ struct page *min_ent_page = NULL;
+ struct page *ent_page = NULL;
+ struct lu_dirent *ent;
+ void *area;
+ int ent_idx = 0;
+ struct lu_dirent *min_ent = NULL;
+ struct lu_dirent *last_ent;
+ int left_bytes;
int rc;
ENTRY;
if (rc)
RETURN(rc);
- if (lsm == NULL)
- stripe_count = 1;
- else
- stripe_count = lsm->lsm_md_stripe_count;
-
- if (stripe_count > NORMAL_MAX_STRIPES) {
- OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
- if (ents == NULL)
- GOTO(out, rc = -ENOMEM);
- } else {
- ents = tmp_ents;
- memset(ents, 0, sizeof(ents[0]) * stripe_count);
- }
+ /* Allocate a page and read entries from all of stripes and fill
+ * the page by hash order */
+ ent_page = alloc_page(GFP_KERNEL);
+ if (ent_page == NULL)
+ RETURN(-ENOMEM);
- min_hash = MDS_DIR_END_OFF;
- for (i = 0; i < stripe_count; i++) {
- struct lmv_tgt_desc *tgt;
- struct page *page = NULL;
+ /* Initialize the entry page */
+ dp = kmap(ent_page);
+ memset(dp, 0, sizeof(*dp));
+ dp->ldp_hash_start = cpu_to_le64(offset);
+ dp->ldp_flags |= LDF_COLLIDE;
+
+ area = dp + 1;
+ left_bytes = PAGE_CACHE_SIZE - sizeof(*dp);
+ ent = area;
+ last_ent = ent;
+ do {
+ __u16 ent_size;
+
+ /* Find the minimum entry from all sub-stripes */
+ rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
+ &ent_idx, &min_ent,
+ &min_ent_page);
+ if (rc != 0)
+ GOTO(out, rc);
- if (likely(lsm == NULL)) {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt))
- GOTO(out, rc = PTR_ERR(tgt));
- LASSERT(op_data->op_data != NULL);
- } else {
- tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
- if (IS_ERR(tgt))
- GOTO(out, rc = PTR_ERR(tgt));
- op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
- op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
- op_data->op_stripe_offset = i;
+ /* If it cannot get the minimum entry, it means it has already
+ * reached the end of this directory */
+ if (min_ent == NULL) {
+ last_ent->lde_reclen = 0;
+ hash_offset = MDS_DIR_END_OFF;
+ GOTO(out, rc);
}
- rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
- &page);
- if (rc != 0)
+ ent_size = le16_to_cpu(min_ent->lde_reclen);
+
+ /* the last entry lde_reclen is 0, but it might not be
+ * the end of this temporary entry */
+ if (ent_size == 0)
+ ent_size = lu_dirent_calc_size(
+ le16_to_cpu(min_ent->lde_namelen),
+ le32_to_cpu(min_ent->lde_attrs));
+ if (ent_size > left_bytes) {
+ last_ent->lde_reclen = cpu_to_le16(0);
+ hash_offset = le64_to_cpu(min_ent->lde_hash);
GOTO(out, rc);
-
- if (ents[i] != NULL &&
- le64_to_cpu(ents[i]->lde_hash) <= min_hash) {
- if (min_page != NULL)
- page_cache_release(min_page);
- min_page = page;
- min_hash = le64_to_cpu(ents[i]->lde_hash);
- min_idx = i;
}
- }
- if (min_hash != MDS_DIR_END_OFF)
- *ldp = ents[min_idx];
- else
- *ldp = NULL;
+ memcpy(ent, min_ent, ent_size);
+
+ /* Replace . with master FID and Replace .. with the parent FID
+ * of master object */
+ if (strncmp(ent->lde_name, ".",
+ le16_to_cpu(ent->lde_namelen)) == 0 &&
+ le16_to_cpu(ent->lde_namelen) == 1)
+ fid_cpu_to_le(&ent->lde_fid, &master_fid);
+ else if (strncmp(ent->lde_name, "..",
+ le16_to_cpu(ent->lde_namelen)) == 0 &&
+ le16_to_cpu(ent->lde_namelen) == 2)
+ fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
+
+ left_bytes -= ent_size;
+ ent->lde_reclen = cpu_to_le16(ent_size);
+ last_ent = ent;
+ ent = (void *)ent + ent_size;
+ hash_offset = le64_to_cpu(min_ent->lde_hash);
+ if (hash_offset == MDS_DIR_END_OFF) {
+ last_ent->lde_reclen = 0;
+ break;
+ }
+ } while (1);
out:
- if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
- OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
+ if (min_ent_page != NULL) {
+ kunmap(min_ent_page);
+ page_cache_release(min_ent_page);
+ }
- if (rc != 0 && min_page != NULL) {
- kunmap(min_page);
- page_cache_release(min_page);
+ if (unlikely(rc != 0)) {
+ __free_page(ent_page);
+ ent_page = NULL;
} else {
- *ppage = min_page;
+ if (ent == area)
+ dp->ldp_flags |= LDF_EMPTY;
+ dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+ dp->ldp_hash_end = cpu_to_le64(hash_offset);
}
+ /* We do not want to allocate md_op_data during each
+ * dir entry reading, so op_data will be shared by every stripe,
+ * so we need to restore it back to its original value before
+ * returning to the upper layer */
+ op_data->op_fid1 = master_fid;
+ op_data->op_fid2 = master_fid;
+ op_data->op_data = master_inode;
+
+ *ppage = ent_page;
+
RETURN(rc);
}
+/**
+ * Read one directory page through LMV
+ *
+ * If the directory is striped (op_data->op_mea1 != NULL), build the page
+ * from all sub-stripes via lmv_read_striped_page(); otherwise forward the
+ * request to the single MDT that op_data->op_fid1 maps to.
+ *
+ * \param[in] exp export of LMV
+ * \param[in] op_data MD operation parameters (op_fid1, op_mea1, etc.)
+ * \param[in] cb_op ldlm callback used in enqueue in mdc_read_page
+ * \param[in] offset hash offset of the first entry to read
+ * \param[out] ppage the page holding the directory entries
+ *
+ * \retval 0 on success
+ * negative errno on failure
+ */
+int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
+ struct md_callback *cb_op, __u64 offset,
+ struct page **ppage)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
+
+ rc = lmv_check_connect(obd);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* Striped directory: merge entries from every stripe. */
+ if (unlikely(lsm != NULL)) {
+ rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
+ RETURN(rc);
+ }
+
+ /* Plain directory: a single MDT holds all entries. */
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
+
+ RETURN(rc);
+}
+
+/**
+ * Unlink a file/directory
+ *
+ * Unlink a file or directory under the parent dir. The unlink request
+ * usually will be sent to the MDT where the child is located, but if
+ * the client does not have the child FID then request will be sent to the
+ * MDT where the parent is located.
+ *
+ * If the parent is a striped directory then it also needs to locate which
+ * stripe the name of the child is located, and replace the parent FID
+ * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
+ * it will walk through all of sub-stripes until the child is being
+ * unlinked finally.
+ *
+ * \param[in] exp export refer to LMV
+ * \param[in] op_data different parameters transferred between client
+ * MD stacks, name, namelen, FIDs etc.
+ * op_fid1 is the parent FID, op_fid2 is the child
+ * FID.
+ * \param[out] request point to the request of unlink.
+ *
+ * \retval 0 on success
+ * negative errno on failure.
+ */
static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
struct lmv_tgt_desc *parent_tgt = NULL;
struct mdt_body *body;
int rc;
+ int stripe_index = 0;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
-retry:
+retry_unlink:
+ /* For striped dir, we need to locate the parent as well */
+ if (lsm != NULL) {
+ struct lmv_tgt_desc *tmp;
+
+ LASSERT(op_data->op_name != NULL &&
+ op_data->op_namelen != 0);
+
+ tmp = lmv_locate_target_for_name(lmv, lsm,
+ op_data->op_name,
+ op_data->op_namelen,
+ &op_data->op_fid1,
+ &op_data->op_mds);
+
+ /* return -EBADFD means unknown hash type, might
+ * need to try all sub-stripes here */
+ if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD)
+ RETURN(PTR_ERR(tmp));
+
+ /* Note: both migrating dir and unknown hash dir need to
+ * try all of sub-stripes, so we need start search the
+ * name from stripe 0, but migrating dir is already handled
+ * inside lmv_locate_target_for_name(), so we only check
+ * unknown hash type directory here */
+ if (!lmv_is_known_hash_type(lsm)) {
+ struct lmv_oinfo *oinfo;
+
+ oinfo = &lsm->lsm_md_oinfo[stripe_index];
+
+ op_data->op_fid1 = oinfo->lmo_fid;
+ op_data->op_mds = oinfo->lmo_mds;
+ }
+ }
+
+try_next_stripe:
/* Send unlink requests to the MDT where the child is located */
- if (likely(!fid_is_zero(&op_data->op_fid2))) {
+ if (likely(!fid_is_zero(&op_data->op_fid2)))
tgt = lmv_find_target(lmv, &op_data->op_fid2);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
-
- /* For striped dir, we need to locate the parent as well */
- if (op_data->op_mea1 != NULL) {
- struct lmv_tgt_desc *tmp;
-
- LASSERT(op_data->op_name != NULL &&
- op_data->op_namelen != 0);
- tmp = lmv_locate_target_for_name(lmv,
- op_data->op_mea1,
- op_data->op_name,
- op_data->op_namelen,
- &op_data->op_fid1,
- &op_data->op_mds);
- if (IS_ERR(tmp))
- RETURN(PTR_ERR(tmp));
- }
- } else {
+ else if (lsm != NULL)
+ tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
+ else
tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
- }
+
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
rc = md_unlink(tgt->ltd_exp, op_data, request);
- if (rc != 0 && rc != -EREMOTE)
+ if (rc != 0 && rc != -EREMOTE && rc != -ENOENT)
RETURN(rc);
+ /* Try next stripe if it is needed. */
+ if (rc == -ENOENT && lsm != NULL && lmv_need_try_all_stripes(lsm)) {
+ struct lmv_oinfo *oinfo;
+
+ stripe_index++;
+ if (stripe_index >= lsm->lsm_md_stripe_count)
+ RETURN(rc);
+
+ oinfo = &lsm->lsm_md_oinfo[stripe_index];
+
+ op_data->op_fid1 = oinfo->lmo_fid;
+ op_data->op_mds = oinfo->lmo_mds;
+
+ ptlrpc_req_finished(*request);
+ *request = NULL;
+
+ goto try_next_stripe;
+ }
+
body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
if (body == NULL)
RETURN(-EPROTO);
/* Not cross-ref case, just get out of here. */
- if (likely(!(body->valid & OBD_MD_MDS)))
+ if (likely(!(body->mbo_valid & OBD_MD_MDS)))
RETURN(0);
CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
- exp->exp_obd->obd_name, PFID(&body->fid1));
+ exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
/* This is a remote object, try remote MDT, Note: it may
* try more than 1 time here, Considering following case
*
* In theory, it might try unlimited time here, but it should
* be very rare case. */
- op_data->op_fid2 = body->fid1;
+ op_data->op_fid2 = body->mbo_fid1;
ptlrpc_req_finished(*request);
*request = NULL;
- goto retry;
+ goto retry_unlink;
}
static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
- lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
+ if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE))
+ lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN;
+ else
+ lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
- fid_le_to_cpu(&lsm->lsm_md_master_fid, &lmm1->lmv_master_fid);
cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
sizeof(lsm->lsm_md_pool_name));
- if (!fid_is_sane(&lsm->lsm_md_master_fid))
- RETURN(-EPROTO);
-
if (cplen >= sizeof(lsm->lsm_md_pool_name))
RETURN(-E2BIG);
lsm = *lsmp;
/* Free memmd */
if (lsm != NULL && lmm == NULL) {
-#ifdef __KERNEL__
int i;
for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
/* For migrating inode, the master stripe and master
i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
iput(lsm->lsm_md_oinfo[i].lmo_root);
}
-#endif
lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
OBD_FREE(lsm, lsm_size);
*lsmp = NULL;
RETURN(0);
}
+ if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
+ RETURN(-EPERM);
+
/* Unpack memmd */
if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
ldlm_policy_data_t *policy, ldlm_mode_t mode,
struct lustre_handle *lockh)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- ldlm_mode_t rc;
- __u32 i;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ldlm_mode_t rc;
+ int tgt;
+ int i;
+ ENTRY;
- CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
+ CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
/*
- * With CMD every object can have two locks in different namespaces:
- * lookup lock in space of mds storing direntry and update/open lock in
- * space of mds storing inode. Thus we check all targets, not only that
- * one fid was created in.
- */
- for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
- struct lmv_tgt_desc *tgt = lmv->tgts[i];
+ * With DNE every object can have two locks in different namespaces:
+ * lookup lock in space of MDT storing direntry and update/open lock in
+ * space of MDT storing inode. Try the MDT that the FID maps to first,
+ * since this can be easily found, and only try others if that fails.
+ */
+ for (i = 0, tgt = lmv_find_target_index(lmv, fid);
+ i < lmv->desc.ld_tgt_count;
+ i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
+ if (tgt < 0) {
+ CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
+ obd->obd_name, PFID(fid), tgt);
+ tgt = 0;
+ }
- if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
+ if (lmv->tgts[tgt] == NULL ||
+ lmv->tgts[tgt]->ltd_exp == NULL ||
+ lmv->tgts[tgt]->ltd_active == 0)
continue;
- rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode,
- lockh);
- if (rc)
- RETURN(rc);
- }
+ rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid,
+ type, policy, mode, lockh);
+ if (rc)
+ RETURN(rc);
+ }
- RETURN(0);
+ RETURN(0);
}
int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
RETURN(rc);
}
+/**
+ * Get the FID of the stripe that a name hashes to
+ *
+ * Uses lsm_name_to_stripe_info() to map the name onto one stripe of the
+ * striped directory described by \a lsm, and returns that stripe's FID.
+ *
+ * \param[in] exp export of LMV (not used by this function)
+ * \param[in] lsm stripe information of the striped directory
+ * \param[in] name name to locate
+ * \param[in] namelen length of \a name
+ * \param[out] fid FID of the stripe the name maps to
+ *
+ * \retval 0 on success
+ * negative errno if the stripe cannot be located
+ */
+int lmv_get_fid_from_lsm(struct obd_export *exp,
+ const struct lmv_stripe_md *lsm,
+ const char *name, int namelen, struct lu_fid *fid)
+{
+ const struct lmv_oinfo *oinfo;
+
+ LASSERT(lsm != NULL);
+ oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
+ if (IS_ERR(oinfo))
+ /* NOTE(review): mixes bare return with RETURN() and has no
+ * matching ENTRY — confirm this is intended. */
+ return PTR_ERR(oinfo);
+
+ *fid = oinfo->lmo_fid;
+
+ RETURN(0);
+}
+
/**
* For lmv, only need to send request to master MDT, and the master MDT will
* process with other slave MDTs. The only exception is Q_GETOQUOTA for which
int lmv_merge_attr(struct obd_export *exp, const struct lmv_stripe_md *lsm,
struct cl_attr *attr)
{
-#ifdef __KERNEL__
int i;
for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
if (attr->cat_mtime < LTIME_S(inode->i_mtime))
attr->cat_mtime = LTIME_S(inode->i_mtime);
}
-#endif
return 0;
}
.m_setattr = lmv_setattr,
.m_setxattr = lmv_setxattr,
.m_fsync = lmv_fsync,
- .m_read_entry = lmv_read_entry,
+ .m_read_page = lmv_read_page,
.m_unlink = lmv_unlink,
.m_init_ea_size = lmv_init_ea_size,
.m_cancel_unused = lmv_cancel_unused,
.m_unpack_capa = lmv_unpack_capa,
.m_get_remote_perm = lmv_get_remote_perm,
.m_intent_getattr_async = lmv_intent_getattr_async,
- .m_revalidate_lock = lmv_revalidate_lock
+ .m_revalidate_lock = lmv_revalidate_lock,
+ .m_get_fid_from_lsm = lmv_get_fid_from_lsm,
};
int __init lmv_init(void)
LUSTRE_LMV_NAME, NULL);
}
-#ifdef __KERNEL__
static void lmv_exit(void)
{
class_unregister_type(LUSTRE_LMV_NAME);
module_init(lmv_init);
module_exit(lmv_exit);
-#endif