+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ rc = md_fsync(tgt->ltd_exp, fid, oc, request);
+ RETURN(rc);
+}
+
+/*
+ * Adjust a set of pages, each page containing an array of lu_dirpages,
+ * so that each page can be used as a single logical lu_dirpage.
+ *
+ * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
+ * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
+ * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end
+ * value is used as a cookie to request the next lu_dirpage in a
+ * directory listing that spans multiple pages (two in this example):
+ * ________
+ * | |
+ * .|--------v------- -----.
+ * |s|e|f|p|ent|ent| ... |ent|
+ * '--|-------------- -----' Each CFS_PAGE contains a single
+ * '------. lu_dirpage.
+ * .---------v------- -----.
+ * |s|e|f|p|ent| 0 | ... | 0 |
+ * '----------------- -----'
+ *
+ * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
+ * larger than LU_PAGE_SIZE, a single host page may contain multiple
+ * lu_dirpages. After reading the lu_dirpages from the MDS, the
+ * ldp_hash_end of the first lu_dirpage refers to the one immediately
+ * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * in general e0==s1, e1==s2, etc.):
+ *
+ * .-------------------- -----.
+ * |s0|e0|f0|p|ent|ent| ... |ent|
+ * |---v---------------- -----|
+ * |s1|e1|f1|p|ent|ent| ... |ent|
+ * |---v---------------- -----| Here, each CFS_PAGE contains
+ * ... multiple lu_dirpages.
+ * |---v---------------- -----|
+ * |s'|e'|f'|p|ent|ent| ... |ent|
+ * '---|---------------- -----'
+ * v
+ * .----------------------------.
+ * | next CFS_PAGE |
+ *
+ * This structure is transformed into a single logical lu_dirpage as follows:
+ *
+ * - Replace e0 with e' so the request for the next lu_dirpage gets the page
+ * labeled 'next CFS_PAGE'.
+ *
+ * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
+ * a hash collision with the next page exists.
+ *
+ * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
+ * to the first entry of the next lu_dirpage.
+ */
+#if PAGE_CACHE_SIZE > LU_PAGE_SIZE
+/* Collapse the nlupgs lu_dirpages packed into the ncfspgs host pages
+ * into one logical lu_dirpage per host page, as described in the
+ * comment above: the first lu_dirpage of each host page inherits the
+ * ldp_hash_end and the LDF_COLLIDE flag of the last one it contains,
+ * and the ending dirent of every intermediate lu_dirpage is enlarged
+ * to span to the first entry of the following lu_dirpage. */
+static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
+{
+ int i;
+
+ for (i = 0; i < ncfspgs; i++) {
+ struct lu_dirpage *dp = kmap(pages[i]);
+ struct lu_dirpage *first = dp;
+ struct lu_dirent *end_dirent = NULL;
+ struct lu_dirent *ent;
+ __u64 hash_end = dp->ldp_hash_end;
+ __u32 flags = dp->ldp_flags;
+
+ /* nlupgs is shared across the outer loop: each inner
+ * iteration consumes one lu_dirpage of the overall count,
+ * so the walk resumes correctly in the next host page. */
+ while (--nlupgs > 0) {
+ /* Find the last dirent of the current lu_dirpage. */
+ ent = lu_dirent_start(dp);
+ for (end_dirent = ent; ent != NULL;
+ end_dirent = ent, ent = lu_dirent_next(ent));
+
+ /* Advance dp to next lu_dirpage. */
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+
+ /* Check if we've reached the end of the CFS_PAGE. */
+ if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+ break;
+
+ /* Save the hash and flags of this lu_dirpage. */
+ hash_end = dp->ldp_hash_end;
+ flags = dp->ldp_flags;
+
+ /* Check if lu_dirpage contains no entries. */
+ if (!end_dirent)
+ break;
+
+ /* Enlarge the end entry lde_reclen from 0 to
+ * first entry of next lu_dirpage. */
+ LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
+ end_dirent->lde_reclen =
+ cpu_to_le16((char *)(dp->ldp_entries) -
+ (char *)end_dirent);
+ }
+
+ /* Stamp the last lu_dirpage's hash_end and LDF_COLLIDE flag
+ * into the first one, so the whole host page reads as a
+ * single logical lu_dirpage. */
+ first->ldp_hash_end = hash_end;
+ first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+ first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+
+ kunmap(pages[i]);
+ }
+ /* Every lu_dirpage reported by the MDS must have been consumed. */
+ LASSERTF(nlupgs == 0, "left = %d", nlupgs);
+}
+#else
+#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
+#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
+
+/**
+ * This function will read an entry from a striped directory; basically, it
+ * will read entries from all stripes and choose the one closest to the
+ * required offset (&op_data->op_hash_offset). A few notes:
+ * 1. skip . and .. for non-zero stripes, because there can be only one .
+ * and .. in a directory.
+ * 2. op_data will be shared by all stripes, instead of allocating a new
+ * one, so it needs to be restored before reuse.
+ * 3. release the entry page if it is not the one being chosen.
+ *
+ * \param[in] exp obd export referring to LMV
+ * \param[in] op_data holds the MD parameters of read_entry.
+ * \param[in] cb_op ldlm callback being used in enqueue in mdc_read_entry
+ * \param[out] ldp the entry being read.
+ * \param[out] ppage the page holding the entry; note: because the entry
+ * will be accessed in the upper layer, we need to hold the
+ * page until use of the entry is finished, see
+ * ll_dir_entry_next.
+ *
+ * \retval =0 if the entry is read successfully
+ * <0 the entry cannot be read.
+ */
+/* Up to this many stripes the lu_dirent pointer array fits on the
+ * stack (tmp_ents); larger striped directories allocate it instead. */
+#define NORMAL_MAX_STRIPES 4
+static int lmv_read_striped_entry(struct obd_export *exp,
+ struct md_op_data *op_data,
+ struct md_callback *cb_op,
+ struct lu_dirent **ldp,
+ struct page **ppage)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ struct lmv_tgt_desc *tgt;
+ struct lu_dirent *tmp_ents[NORMAL_MAX_STRIPES];
+ struct lu_dirent **ents = NULL;
+ /* Fields of the shared op_data saved here and restored at "out:". */
+ struct lu_fid master_fid = op_data->op_fid1;
+ void *master_data = op_data->op_data;
+ __u64 last_idx = op_data->op_stripe_offset;
+ __u64 hash_offset = op_data->op_hash_offset;
+ __u32 same_hash_offset = op_data->op_same_hash_offset;
+ __u32 cli_flags = op_data->op_cli_flags;
+ int stripe_count;
+ __u64 min_hash;
+ int min_same_hash_offset = 0;
+ int min_idx = 0;
+ struct page *min_page = NULL;
+ int i;
+ int rc;
+ ENTRY;
+
+ LASSERT(lsm != NULL);
+
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
+
+ /* . and .. will be stored on the master object, so we need iterate
+ * the master object as well */
+ stripe_count = lsm->lsm_md_stripe_count;
+ if (stripe_count > NORMAL_MAX_STRIPES) {
+ OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
+ if (ents == NULL)
+ GOTO(out, rc = -ENOMEM);
+ } else {
+ ents = tmp_ents;
+ memset(ents, 0, sizeof(ents[0]) * stripe_count);
+ }
+
+ /* MDS_DIR_END_OFF doubles as "no candidate entry found yet". */
+ min_hash = MDS_DIR_END_OFF;
+ for (i = 0; i < stripe_count; i++) {
+ struct page *page = NULL;
+
+ tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
+ if (IS_ERR(tgt))
+ GOTO(out, rc = PTR_ERR(tgt));
+
+ /* The saved same_hash_offset only applies to the stripe the
+ * previous entry was returned from. */
+ if (last_idx != i)
+ op_data->op_same_hash_offset = 0;
+ else
+ op_data->op_same_hash_offset = same_hash_offset;
+
+ /* op_data will be shared by each stripe, so we need
+ * reset these value for each stripe */
+ op_data->op_stripe_offset = i;
+ op_data->op_hash_offset = hash_offset;
+ op_data->op_cli_flags = cli_flags;
+ op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
+ op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
+ op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
+
+next:
+ rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
+ &page);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (ents[i] != NULL &&
+ (strncmp(ents[i]->lde_name, ".",
+ le16_to_cpu(ents[i]->lde_namelen)) == 0 ||
+ strncmp(ents[i]->lde_name, "..",
+ le16_to_cpu(ents[i]->lde_namelen)) == 0)) {
+ if (i == 0) {
+ /* replace . with master FID */
+ if (le16_to_cpu(ents[i]->lde_namelen) == 1)
+ fid_cpu_to_le(&ents[i]->lde_fid,
+ &master_fid);
+ else
+ /* NOTE(review): ".." presumably gets
+ * the parent FID kept in op_fid3 --
+ * confirm against the callers. */
+ fid_cpu_to_le(&ents[i]->lde_fid,
+ &op_data->op_fid3);
+ } else {
+ /* skip . and .. for other stripes */
+ op_data->op_cli_flags |= CLI_NEXT_ENTRY;
+ op_data->op_hash_offset =
+ le64_to_cpu(ents[i]->lde_hash);
+ kunmap(page);
+ page_cache_release(page);
+ goto next;
+ }
+ }
+
+ if (ents[i] != NULL) {
+ /* If the hash value of read_entry is equal to the
+ * current min_hash, which is very rare and only
+ * happens if two entries have the same hash value
+ * but on different stripes, in this case, we need
+ * make sure these entries are being reading forward,
+ * not backward, i.e. only reset the min_entry, if
+ * current stripe is ahead of last entry. Note: if
+ * there are hash conflict inside the entry, MDC
+ * (see mdc_read_entry) will resolve them. */
+ if (le64_to_cpu(ents[i]->lde_hash) < min_hash ||
+ (le64_to_cpu(ents[i]->lde_hash) == min_hash &&
+ i >= last_idx)) {
+ /* This stripe holds the new best candidate;
+ * release the previously held page. */
+ if (min_page != NULL) {
+ kunmap(min_page);
+ page_cache_release(min_page);
+ }
+ min_page = page;
+ min_hash = le64_to_cpu(ents[i]->lde_hash);
+ min_same_hash_offset =
+ op_data->op_same_hash_offset;
+ min_idx = i;
+ } else {
+ kunmap(page);
+ page_cache_release(page);
+ }
+ }
+ }
+
+ /* Hand the winning entry (if any) and its page back to the caller;
+ * the caller is responsible for releasing *ppage. */
+ if (min_hash != MDS_DIR_END_OFF) {
+ *ldp = ents[min_idx];
+ op_data->op_stripe_offset = min_idx;
+ op_data->op_same_hash_offset = min_same_hash_offset;
+ *ppage = min_page;
+ } else {
+ *ldp = NULL;
+ *ppage = NULL;
+ }
+out:
+ /* We do not want to allocate md_op_data during each
+ * dir entry reading, so op_data will be shared by every stripe,
+ * then we need to restore it back to original value before
+ * return to the upper layer */
+ op_data->op_hash_offset = hash_offset;
+ op_data->op_fid1 = master_fid;
+ op_data->op_fid2 = master_fid;
+ op_data->op_data = master_data;
+ op_data->op_cli_flags = cli_flags;
+ if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
+ OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
+
+ /* On error nothing is returned, so drop any page still held. */
+ if (rc != 0 && min_page != NULL) {
+ kunmap(min_page);
+ page_cache_release(min_page);
+ }
+
+ RETURN(rc);
+}
+
+/* Read one directory entry through LMV: striped directories are merged
+ * by lmv_read_striped_entry(); plain directories forward straight to
+ * the MDC export of the MDT owning op_fid1. *ppage must be released by
+ * the caller once *ldp is no longer needed. */
+int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
+ struct md_callback *cb_op, struct lu_dirent **ldp,
+ struct page **ppage)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
+
+ rc = lmv_check_connect(obd);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* A stripe descriptor means this is a striped directory. */
+ if (unlikely(op_data->op_mea1 != NULL))
+ RETURN(lmv_read_striped_entry(exp, op_data, cb_op,
+ ldp, ppage));
+
+ /* Non-striped: a single MDT holds all entries. */
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ RETURN(md_read_entry(tgt->ltd_exp, op_data, cb_op, ldp, ppage));
+}
+
+static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
+ struct ptlrpc_request **request)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ struct lmv_tgt_desc *parent_tgt = NULL;
+ struct mdt_body *body;
+ int rc;
+ ENTRY;
+
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
+retry:
+ /* Send unlink requests to the MDT where the child is located */
+ if (likely(!fid_is_zero(&op_data->op_fid2))) {
+ tgt = lmv_find_target(lmv, &op_data->op_fid2);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ /* For striped dir, we need to locate the parent as well */
+ if (op_data->op_mea1 != NULL) {
+ struct lmv_tgt_desc *tmp;
+
+ LASSERT(op_data->op_name != NULL &&
+ op_data->op_namelen != 0);
+ tmp = lmv_locate_target_for_name(lmv,
+ op_data->op_mea1,
+ op_data->op_name,
+ op_data->op_namelen,
+ &op_data->op_fid1,
+ &op_data->op_mds);
+ if (IS_ERR(tmp))
+ RETURN(PTR_ERR(tmp));
+ }
+ } else {
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+ }
+
+ op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
+ op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
+ op_data->op_cap = cfs_curproc_cap_pack();
+
+ /*
+ * If child's fid is given, cancel unused locks for it if it is from
+ * another export than parent.
+ *
+ * LOOKUP lock for child (fid3) should also be cancelled on parent
+ * tgt_tgt in mdc_unlink().
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+ /*
+ * Cancel FULL locks on child (fid3).
+ */
+ parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(parent_tgt))
+ RETURN(PTR_ERR(parent_tgt));
+
+ if (parent_tgt != tgt) {
+ rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID3);
+ }
+
+ rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+ if (rc != 0)
+ RETURN(rc);
+
+ CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
+ PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
+
+ rc = md_unlink(tgt->ltd_exp, op_data, request);
+ if (rc != 0 && rc != -EREMOTE)
+ RETURN(rc);
+
+ body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
+
+ /* Not cross-ref case, just get out of here. */
+ if (likely(!(body->mbo_valid & OBD_MD_MDS)))
+ RETURN(0);
+
+ CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
+ exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
+
+ /* This is a remote object, try remote MDT, Note: it may
+ * try more than 1 time here, Considering following case
+ * /mnt/lustre is root on MDT0, remote1 is on MDT1
+ * 1. Initially A does not know where remote1 is, it send
+ * unlink RPC to MDT0, MDT0 return -EREMOTE, it will
+ * resend unlink RPC to MDT1 (retry 1st time).
+ *
+ * 2. During the unlink RPC in flight,
+ * client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
+ * and create new remote1, but on MDT0
+ *
+ * 3. MDT1 get unlink RPC(from A), then do remote lock on
+ * /mnt/lustre, then lookup get fid of remote1, and find
+ * it is remote dir again, and replay -EREMOTE again.
+ *
+ * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
+ *
+ * In theory, it might try unlimited time here, but it should
+ * be very rare case. */
+ op_data->op_fid2 = body->mbo_fid1;
+ ptlrpc_req_finished(*request);
+ *request = NULL;
+
+ goto retry;