Whamcloud - gitweb
LU-3182 lmv: fix duplicate directory entries
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index ba5962e..14e7a43 100644 (file)
@@ -344,7 +344,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 #endif
         struct lmv_obd          *lmv = &obd->u.lmv;
         struct obd_uuid         *cluuid = &lmv->cluuid;
-        struct obd_connect_data *mdc_data = NULL;
         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
         struct obd_device       *mdc_obd;
         struct obd_export       *mdc_exp;
@@ -389,8 +388,6 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
 
         fld_client_add_target(&lmv->lmv_fld, &target);
 
-        mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
-
         rc = obd_register_observer(mdc_obd, obd);
         if (rc) {
                 obd_disconnect(mdc_exp);
@@ -733,6 +730,88 @@ out_local:
         RETURN(rc);
 }
 
+static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
+{
+       struct obd_device       *obddev = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obddev->u.lmv;
+       struct getinfo_fid2path *gf;
+       struct lmv_tgt_desc     *tgt;
+       struct getinfo_fid2path *remote_gf = NULL;
+       int                     remote_gf_size = 0;
+       int                     rc;
+
+       gf = (struct getinfo_fid2path *)karg;
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+repeat_fid2path:
+       rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out_fid2path, rc);
+
+       /* If remote_gf != NULL, it means just building the
+        * path on the remote MDT, copy this path segement to gf */
+       if (remote_gf != NULL) {
+               struct getinfo_fid2path *ori_gf;
+               char *ptr;
+
+               ori_gf = (struct getinfo_fid2path *)karg;
+               if (strlen(ori_gf->gf_path) +
+                   strlen(gf->gf_path) > ori_gf->gf_pathlen)
+                       GOTO(out_fid2path, rc = -EOVERFLOW);
+
+               ptr = ori_gf->gf_path;
+
+               memmove(ptr + strlen(gf->gf_path) + 1, ptr,
+                       strlen(ori_gf->gf_path));
+
+               strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
+               ptr += strlen(gf->gf_path);
+               *ptr = '/';
+       }
+
+       CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
+              tgt->ltd_exp->exp_obd->obd_name,
+              gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
+              gf->gf_linkno);
+
+       if (rc == 0)
+               GOTO(out_fid2path, rc);
+
+       /* sigh, has to go to another MDT to do path building further */
+       if (remote_gf == NULL) {
+               remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
+               OBD_ALLOC(remote_gf, remote_gf_size);
+               if (remote_gf == NULL)
+                       GOTO(out_fid2path, rc = -ENOMEM);
+               remote_gf->gf_pathlen = PATH_MAX;
+       }
+
+       if (!fid_is_sane(&gf->gf_fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      tgt->ltd_exp->exp_obd->obd_name,
+                      PFID(&gf->gf_fid), -EINVAL);
+               GOTO(out_fid2path, rc = -EINVAL);
+       }
+
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               GOTO(out_fid2path, rc = -EINVAL);
+
+       remote_gf->gf_fid = gf->gf_fid;
+       remote_gf->gf_recno = -1;
+       remote_gf->gf_linkno = -1;
+       memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
+       gf = remote_gf;
+       goto repeat_fid2path;
+
+out_fid2path:
+       if (remote_gf != NULL)
+               OBD_FREE(remote_gf, remote_gf_size);
+       RETURN(rc);
+}
+
 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg)
 {
@@ -853,14 +932,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                break;
        }
        case OBD_IOC_FID2PATH: {
-               struct getinfo_fid2path *gf;
-               struct lmv_tgt_desc     *tgt;
-
-               gf = (struct getinfo_fid2path *)karg;
-               tgt = lmv_find_target(lmv, &gf->gf_fid);
-               if (IS_ERR(tgt))
-                       RETURN(PTR_ERR(tgt));
-               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               rc = lmv_fid2path(exp, len, karg, uarg);
                break;
        }
        case LL_IOC_HSM_STATE_GET:
@@ -868,13 +940,24 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
        case LL_IOC_HSM_ACTION:
        case LL_IOC_LOV_SWAP_LAYOUTS: {
                struct md_op_data       *op_data = karg;
-               struct lmv_tgt_desc     *tgt;
+               struct lmv_tgt_desc     *tgt1, *tgt2;
+
+               tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
+               if (IS_ERR(tgt1))
+                       RETURN(PTR_ERR(tgt1));
 
-               tgt = lmv_find_target(lmv, &op_data->op_fid1);
-               if (!tgt->ltd_exp)
+               tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
+               if (IS_ERR(tgt2))
+                       RETURN(PTR_ERR(tgt2));
+
+               if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
                        RETURN(-EINVAL);
 
-               rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
+               /* only files on same MDT can have their layouts swapped */
+               if (tgt1->ltd_idx != tgt2->ltd_idx)
+                       RETURN(-EPERM);
+
+               rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
                break;
        }
        default:
@@ -1002,21 +1085,23 @@ static int lmv_placement_policy(struct obd_device *obd,
 }
 
 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
-                    mdsno_t mds)
+                   mdsno_t mds)
 {
-        struct lmv_tgt_desc *tgt;
-        int                  rc;
-        ENTRY;
+       struct lmv_tgt_desc     *tgt;
+       int                      rc;
+       ENTRY;
 
-        tgt = lmv_get_target(lmv, mds);
+       tgt = lmv_get_target(lmv, mds);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
 
-        /*
-         * New seq alloc and FLD setup should be atomic. Otherwise we may find
-         * on server that seq in new allocated fid is not yet known.
-         */
+       /*
+        * New seq alloc and FLD setup should be atomic. Otherwise we may find
+        * on server that seq in new allocated fid is not yet known.
+        */
        mutex_lock(&tgt->ltd_fid_mutex);
 
-       if (tgt == NULL || tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
+       if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
                GOTO(out, rc = -ENODEV);
 
         /*
@@ -1414,6 +1499,9 @@ struct lmv_tgt_desc
        struct lmv_tgt_desc *tgt;
 
        tgt = lmv_find_target(lmv, fid);
+       if (IS_ERR(tgt))
+               return tgt;
+
        op_data->op_mds = tgt->ltd_idx;
 
        return tgt;
@@ -1837,86 +1925,127 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
+/*
+ * Adjust a set of pages, each page containing an array of lu_dirpages,
+ * so that each page can be used as a single logical lu_dirpage.
+ *
+ * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
+ * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
+ * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
+ * value is used as a cookie to request the next lu_dirpage in a
+ * directory listing that spans multiple pages (two in this example):
+ *   ________
+ *  |        |
+ * .|--------v-------   -----.
+ * |s|e|f|p|ent|ent| ... |ent|
+ * '--|--------------   -----'   Each CFS_PAGE contains a single
+ *    '------.                   lu_dirpage.
+ * .---------v-------   -----.
+ * |s|e|f|p|ent| 0 | ... | 0 |
+ * '-----------------   -----'
+ *
+ * However, on hosts where the native VM page size (CFS_PAGE_SIZE) is
+ * larger than LU_PAGE_SIZE, a single host page may contain multiple
+ * lu_dirpages. After reading the lu_dirpages from the MDS, the
+ * ldp_hash_end of the first lu_dirpage refers to the one immediately
+ * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * in general e0==s1, e1==s2, etc.):
+ *
+ * .--------------------   -----.
+ * |s0|e0|f0|p|ent|ent| ... |ent|
+ * |---v----------------   -----|
+ * |s1|e1|f1|p|ent|ent| ... |ent|
+ * |---v----------------   -----|  Here, each CFS_PAGE contains
+ *             ...                 multiple lu_dirpages.
+ * |---v----------------   -----|
+ * |s'|e'|f'|p|ent|ent| ... |ent|
+ * '---|----------------   -----'
+ *     v
+ * .----------------------------.
+ * |        next CFS_PAGE       |
+ *
+ * This structure is transformed into a single logical lu_dirpage as follows:
+ *
+ * - Replace e0 with e' so the request for the next lu_dirpage gets the page
+ *   labeled 'next CFS_PAGE'.
+ *
+ * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
+ *   a hash collision with the next page exists.
+ *
+ * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
+ *   to the first entry of the next lu_dirpage.
+ */
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
/* Collapse the nlupgs lu_dirpages contained in ncfspgs host pages into one
 * logical lu_dirpage per host page, as described in the layout comment
 * above.  Pages are modified in place while kmapped. */
static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
{
	int i;

	for (i = 0; i < ncfspgs; i++) {
		struct lu_dirpage	*dp = cfs_kmap(pages[i]);
		struct lu_dirpage	*first = dp;
		struct lu_dirent	*end_dirent = NULL;
		struct lu_dirent	*ent;
		__u64			hash_end = dp->ldp_hash_end;
		__u32			flags = dp->ldp_flags;

		/* nlupgs counts lu_dirpages remaining across ALL host pages,
		 * so it deliberately carries over between iterations of the
		 * outer loop. */
		for (; nlupgs > 1; nlupgs--) {
			/* Find the last entry of this lu_dirpage. */
			ent = lu_dirent_start(dp);
			for (end_dirent = ent; ent != NULL;
			     end_dirent = ent, ent = lu_dirent_next(ent));

			/* Advance dp to next lu_dirpage. */
			dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);

			/* Check if we've reached the end of the CFS_PAGE. */
			if (!((unsigned long)dp & ~CFS_PAGE_MASK))
				break;

			/* Save the hash and flags of this lu_dirpage; after
			 * the loop they belong to the last lu_dirpage seen in
			 * this host page, and are propagated to 'first'. */
			hash_end = dp->ldp_hash_end;
			flags = dp->ldp_flags;

			/* Check if lu_dirpage contains no entries. */
			if (!end_dirent)
				break;

			/* Enlarge the end entry lde_reclen from 0 to
			 * first entry of next lu_dirpage. */
			LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
			end_dirent->lde_reclen =
				cpu_to_le16((char *)(dp->ldp_entries) -
					    (char *)end_dirent);
		}

		/* The first lu_dirpage now stands for the whole host page:
		 * take the last hash_end as cookie for the next request, and
		 * carry over only the LDF_COLLIDE bit of the last flags. */
		first->ldp_hash_end = hash_end;
		first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
		first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);

		cfs_kunmap(pages[i]);
	}
}
#else
/* Host page size equals LU_PAGE_SIZE: each page already holds exactly one
 * lu_dirpage, nothing to adjust. */
#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
#endif /* CFS_PAGE_SIZE > LU_PAGE_SIZE */
+
 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
-                        struct page **pages, struct ptlrpc_request **request)
+                       struct page **pages, struct ptlrpc_request **request)
 {
-        struct obd_device       *obd = exp->exp_obd;
-        struct lmv_obd          *lmv = &obd->u.lmv;
-        __u64                    offset = op_data->op_offset;
-        int                      rc;
-        int                      i;
-        /* number of pages read, in CFS_PAGE_SIZE */
-        int                      nrdpgs;
-        /* number of pages transferred in LU_PAGE_SIZE */
-        int                      nlupgs;
-        struct lmv_tgt_desc     *tgt;
-        struct lu_dirpage       *dp;
-        struct lu_dirent        *ent;
-        ENTRY;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       __u64                   offset = op_data->op_offset;
+       int                     rc;
+       int                     ncfspgs; /* pages read in CFS_PAGE_SIZE */
+       int                     nlupgs; /* pages read in LU_PAGE_SIZE */
+       struct lmv_tgt_desc     *tgt;
+       ENTRY;
 
-        rc = lmv_check_connect(obd);
-        if (rc)
-                RETURN(rc);
+       rc = lmv_check_connect(obd);
+       if (rc)
+               RETURN(rc);
 
        CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
               offset, PFID(&op_data->op_fid1));
 
-       /*
-        * This case handle directory lookup in clustered metadata case (i.e.
-        * split directory is located on multiple md servers.)
-        * each server keeps directory entries for certain range of hashes.
-        * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
-        * first server will keep records with hashes [ 0 ... MAX_HASH /N  - 1],
-        * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
-        * so on....
-        *      readdir can simply start reading entries from 0 - N server in
-        * order but that will not scale well as all client will request dir in
-        * to server in same order.
-        * Following algorithm does optimization:
-        * Instead of doing readdir in 1, 2, ...., N order, client with a
-        * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
-        * (every client has rank R)
-        *      But ll_readdir() expect offset range [0 to MAX_HASH/N) but
-        * since client ask dir from MDS{R} client has pages with offsets
-        * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
-        * on hash  values that we get.
-        * Since these codes might be still useful for sharded directory, so
-        * Keeping this code for further reference
-       if (0) {
-               LASSERT(nr > 0);
-               seg_size = MAX_HASH_SIZE;
-               do_div(seg_size, nr);
-               los      = obj->lo_stripes;
-               tgt      = lmv_get_target(lmv, los[0].ls_mds);
-               rank     = lmv_node_rank(tgt->ltd_exp, fid) % nr;
-               tgt_tmp  = offset;
-               do_div(tgt_tmp, seg_size);
-               tgt0_idx = do_div(tgt_tmp,  nr);
-               tgt_idx  = (tgt0_idx + rank) % nr;
-
-               if (tgt_idx < tgt0_idx)
-                        * Wrap around.
-                        *
-                        * Last segment has unusual length due to division
-                        * rounding.
-                       hash_adj = MAX_HASH_SIZE - seg_size * nr;
-               else
-                       hash_adj = 0;
-
-               hash_adj += rank * seg_size;
-
-               CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
-                      LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
-                      offset, tgt0_idx, offset + hash_adj, tgt_idx);
-
-               offset = (offset + hash_adj) & MAX_HASH_SIZE;
-               rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
-               tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
-
-               CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
-                      PFID(&rid), (unsigned long)offset, tgt_idx);
-       }
-       */
        tgt = lmv_find_target(lmv, &op_data->op_fid1);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
@@ -1925,60 +2054,17 @@ static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
-       nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+       ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
                 >> CFS_PAGE_SHIFT;
        nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
        LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
-       LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
+       LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
 
-       CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
+       CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
               op_data->op_npages);
 
-       for (i = 0; i < nrdpgs; i++) {
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-               struct lu_dirpage *first;
-               __u64 hash_end = 0;
-               __u32 flags = 0;
-#endif
-               struct lu_dirent *tmp = NULL;
-
-               dp = cfs_kmap(pages[i]);
-               ent = lu_dirent_start(dp);
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-               first = dp;
-               hash_end = dp->ldp_hash_end;
-repeat:
-#endif
-               nlupgs--;
-
-               for (tmp = ent; ent != NULL;
-                    tmp = ent, ent = lu_dirent_next(ent));
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-               dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
-               if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
-                       ent = lu_dirent_start(dp);
+       lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
 
-                       if (tmp) {
-                               /* enlarge the end entry lde_reclen from 0 to
-                                * first entry of next lu_dirpage, in this way
-                                * several lu_dirpages can be stored into one
-                                * client page on client. */
-                               tmp = ((void *)tmp) +
-                                     le16_to_cpu(tmp->lde_reclen);
-                               tmp->lde_reclen =
-                                       cpu_to_le16((char *)(dp->ldp_entries) -
-                                                   (char *)tmp);
-                               goto repeat;
-                       }
-               }
-               first->ldp_hash_end = hash_end;
-               first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
-               first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
-#else
-               SET_BUT_UNUSED(tmp);
-#endif
-               cfs_kunmap(pages[i]);
-       }
        RETURN(rc);
 }
 
@@ -2232,10 +2318,10 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
         meap->mea_count = cpu_to_le32(lsmp->mea_count);
         meap->mea_master = cpu_to_le32(lsmp->mea_master);
 
-        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                meap->mea_ids[i] = meap->mea_ids[i];
-                fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
-        }
+       for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+               meap->mea_ids[i] = lsmp->mea_ids[i];
+               fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
+       }
 
         RETURN(mea_size);
 }