Whamcloud - gitweb
LU-4603 lmv: a few fixes about readdir of striped dir.
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 85fd541..aca1807 100644 (file)
@@ -47,6 +47,7 @@
 #endif
 
 #include <lustre_acl.h>
+#include <lustre_ioctl.h>
 #include <obd_class.h>
 #include <lustre_lmv.h>
 #include <lustre_fid.h>
@@ -95,15 +96,15 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req)
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        int rc;
 
-       /* mdc_enter_request() ensures that this client has no more
+       /* obd_get_request_slot() ensures that this client has no more
         * than cl_max_rpcs_in_flight RPCs simultaneously inf light
         * against an MDT. */
-       rc = mdc_enter_request(cli);
+       rc = obd_get_request_slot(cli);
        if (rc != 0)
                return rc;
 
        rc = ptlrpc_queue_wait(req);
-       mdc_exit_request(cli);
+       obd_put_request_slot(cli);
 
        return rc;
 }
@@ -373,7 +374,7 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
        /* Flush local XATTR locks to get rid of a possible cancel RPC */
        if (opcode == MDS_REINT && fid_is_sane(fid) &&
            exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
-               CFS_LIST_HEAD(cancels);
+               struct list_head cancels = LIST_HEAD_INIT(cancels);
                int count;
 
                /* Without that packing would fail */
@@ -405,8 +406,8 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
                          sizeof(struct mdt_rec_reint));
                rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
                rec->sx_opcode = REINT_SETXATTR;
-               rec->sx_fsuid  = current_fsuid();
-               rec->sx_fsgid  = current_fsgid();
+               rec->sx_fsuid  = from_kuid(&init_user_ns, current_fsuid());
+               rec->sx_fsgid  = from_kgid(&init_user_ns, current_fsgid());
                rec->sx_cap    = cfs_curproc_cap_pack();
                rec->sx_suppgid1 = suppgid;
                 rec->sx_suppgid2 = -1;
@@ -878,7 +879,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                req_fmt = &RQF_MDS_RELEASE_CLOSE;
 
                /* allocate a FID for volatile file */
-               rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
+               rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
                if (rc < 0) {
                        CERROR("%s: "DFID" failed to allocate FID: %d\n",
                               obd->obd_name, PFID(&op_data->op_fid1), rc);
@@ -926,10 +927,10 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 
         mdc_close_pack(req, op_data);
 
-        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
-                             obd->u.cli.cl_max_mds_easize);
-        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                             obd->u.cli.cl_max_mds_cookiesize);
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_easize);
+       req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_cookiesize);
 
         ptlrpc_request_set_replen(req);
 
@@ -1239,8 +1240,8 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
                                LASSERTF(*start <= *hash, "start = "LPX64
                                         ",end = "LPX64",hash = "LPX64"\n",
                                         *start, *end, *hash);
-                       CDEBUG(D_VFSTRACE, "page%lu [%llu %llu], hash"LPU64"\n",
-                              offset, *start, *end, *hash);
+                       CDEBUG(D_VFSTRACE, "offset %lx ["LPX64" "LPX64"],"
+                             " hash "LPX64"\n", offset, *start, *end, *hash);
                        if (*hash > *end) {
                                kunmap(page);
                                mdc_release_page(page, 0);
@@ -1502,7 +1503,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 {
        struct lookup_intent    it = { .it_op = IT_READDIR };
        struct page             *page;
-       struct inode            *dir = NULL;
+       struct inode            *dir = op_data->op_data;
        struct address_space    *mapping;
        struct lu_dirpage       *dp;
        __u64                   start = 0;
@@ -1516,18 +1517,10 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 
        *ppage = NULL;
 
-       if (op_data->op_mea1 != NULL) {
-               __u32 index = op_data->op_stripe_offset;
-
-               dir = op_data->op_mea1->lsm_md_oinfo[index].lmo_root;
-       } else {
-               dir = op_data->op_data;
-       }
        LASSERT(dir != NULL);
-
        mapping = dir->i_mapping;
 
-       rc = mdc_intent_lock(exp, op_data, NULL, 0, &it, 0, &enq_req,
+       rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
                             cb_op->md_blocking_ast, 0);
        if (enq_req != NULL)
                ptlrpc_req_finished(enq_req);
@@ -1649,10 +1642,13 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
        struct lu_dirpage       *dp;
        struct lu_dirent        *ent;
        int                     rc = 0;
+       __u32                   same_hash_count;
+       __u64                   hash_offset = op_data->op_hash_offset;
        ENTRY;
 
-       CDEBUG(D_INFO, DFID "offset = "LPU64"\n", PFID(&op_data->op_fid1),
-              op_data->op_hash_offset);
+       CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n",
+              PFID(&op_data->op_fid1), op_data->op_hash_offset,
+              op_data->op_cli_flags);
 
        *ppage = NULL;
        *entp = NULL;
@@ -1664,6 +1660,9 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
+       /* same_hash_count is the number of entries with this
+        * hash value that have already been read */
+       same_hash_count = op_data->op_same_hash_offset + 1;
        dp = page_address(page);
        for (ent = lu_dirent_start(dp); ent != NULL;
             ent = lu_dirent_next(ent)) {
@@ -1671,16 +1670,33 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
                if (le16_to_cpu(ent->lde_namelen) == 0)
                        continue;
 
-               if (le64_to_cpu(ent->lde_hash) > op_data->op_hash_offset)
-                       break;
+               if (le64_to_cpu(ent->lde_hash) <
+                               op_data->op_hash_offset)
+                       continue;
+
+               if (unlikely(le64_to_cpu(ent->lde_hash) ==
+                               op_data->op_hash_offset)) {
+                       /* If this is not a next-entry request (usually a call
+                        * from ll_dir_entry_start), return this entry. */
+                       if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY))
+                               break;
+
+                       /* Keep reading until all of the entries that have
+                        * already been read are skipped. */
+                       if (same_hash_count > 0) {
+                               same_hash_count--;
+                               continue;
+                       }
+               }
+               break;
        }
 
        /* If it can not find entry in current page, try next page. */
        if (ent == NULL) {
-               __u64 orig_offset = op_data->op_hash_offset;
-
                if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
-                       mdc_release_page(page, 0);
+                       op_data->op_same_hash_offset = 0;
+                       mdc_release_page(page,
+                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
                        RETURN(0);
                }
 
@@ -1695,13 +1711,19 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
                        dp = page_address(page);
                        ent = lu_dirent_start(dp);
                }
+       }
 
-               op_data->op_hash_offset = orig_offset;
+       /* If the next hash is the same as the current hash, increment
+        * op_same_hash_offset to resolve the hash collision */
+       if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) {
+               if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset))
+                       op_data->op_same_hash_offset++;
+               else
+                       op_data->op_same_hash_offset = 0;
        }
 
        *ppage = page;
        *entp = ent;
-
        RETURN(rc);
 }
 
@@ -2281,8 +2303,9 @@ out:
 static int mdc_ioc_changelog_send(struct obd_device *obd,
                                   struct ioc_changelog *icc)
 {
-        struct changelog_show *cs;
-        int rc;
+       struct changelog_show *cs;
+       struct task_struct *task;
+       int rc;
 
         /* Freed in mdc_changelog_send_thread */
         OBD_ALLOC_PTR(cs);
@@ -2299,16 +2322,20 @@ static int mdc_ioc_changelog_send(struct obd_device *obd,
         * New thread because we should return to user app before
         * writing into our pipe
         */
-       rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs,
-                                "mdc_clg_send_thread"));
-       if (!IS_ERR_VALUE(rc)) {
-               CDEBUG(D_CHANGELOG, "start changelog thread\n");
-               return 0;
+       task = kthread_run(mdc_changelog_send_thread, cs,
+                          "mdc_clg_send_thread");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("%s: cannot start changelog thread: rc = %d\n",
+                      obd->obd_name, rc);
+               OBD_FREE_PTR(cs);
+       } else {
+               rc = 0;
+               CDEBUG(D_CHANGELOG, "%s: started changelog thread\n",
+                      obd->obd_name);
        }
 
-        CERROR("Failed to start changelog thread: %d\n", rc);
-        OBD_FREE_PTR(cs);
-        return rc;
+       return rc;
 }
 
 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
@@ -2401,7 +2428,7 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
 static int mdc_ioc_swap_layouts(struct obd_export *exp,
                                struct md_op_data *op_data)
 {
-       CFS_LIST_HEAD(cancels);
+       struct list_head cancels = LIST_HEAD_INIT(cancels);
        struct ptlrpc_request   *req;
        int                      rc, count;
        struct mdc_swap_layouts *msl, *payload;
@@ -2416,9 +2443,11 @@ static int mdc_ioc_swap_layouts(struct obd_export *exp,
         * with the request RPC to avoid extra RPC round trips
         */
        count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
-                                       LCK_CR, MDS_INODELOCK_LAYOUT);
+                                       LCK_EX, MDS_INODELOCK_LAYOUT |
+                                       MDS_INODELOCK_XATTR);
        count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
-                                        LCK_CR, MDS_INODELOCK_LAYOUT);
+                                        LCK_EX, MDS_INODELOCK_LAYOUT |
+                                        MDS_INODELOCK_XATTR);
 
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_MDS_SWAP_LAYOUTS);
@@ -2824,22 +2853,50 @@ int mdc_set_info_async(const struct lu_env *env,
 }
 
 int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
-                 __u32 keylen, void *key, __u32 *vallen, void *val,
-                 struct lov_stripe_md *lsm)
+                __u32 keylen, void *key, __u32 *vallen, void *val,
+                struct lov_stripe_md *lsm)
 {
-        int rc = -EINVAL;
+       int rc = -EINVAL;
 
-        if (KEY_IS(KEY_MAX_EASIZE)) {
-                int mdsize, *max_easize;
+       if (KEY_IS(KEY_MAX_EASIZE)) {
+               int mdsize, *max_easize;
 
-                if (*vallen != sizeof(int))
-                        RETURN(-EINVAL);
-                mdsize = *(int*)val;
-                if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
-                        exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
-                max_easize = val;
-                *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
-                RETURN(0);
+               if (*vallen != sizeof(int))
+                       RETURN(-EINVAL);
+               mdsize = *(int *)val;
+               if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
+                       exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
+               max_easize = val;
+               *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
+               RETURN(0);
+       } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
+               int *default_easize;
+
+               if (*vallen != sizeof(int))
+                       RETURN(-EINVAL);
+               default_easize = val;
+               *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
+               RETURN(0);
+       } else if (KEY_IS(KEY_MAX_COOKIESIZE)) {
+               int mdsize, *max_cookiesize;
+
+               if (*vallen != sizeof(int))
+                       RETURN(-EINVAL);
+               mdsize = *(int *)val;
+               if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize)
+                       exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize;
+               max_cookiesize = val;
+               *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize;
+               RETURN(0);
+       } else if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) {
+               int *default_cookiesize;
+
+               if (*vallen != sizeof(int))
+                       RETURN(-EINVAL);
+               default_cookiesize = val;
+               *default_cookiesize =
+                       exp->exp_obd->u.cli.cl_default_mds_cookiesize;
+               RETURN(0);
         } else if (KEY_IS(KEY_CONN_DATA)) {
                 struct obd_import *imp = class_exp2cliimp(exp);
                 struct obd_connect_data *data = val;
@@ -3031,13 +3088,13 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
         RETURN(rc);
 }
 
-int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
-                  struct md_op_data *op_data)
+int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
+                 struct lu_fid *fid, struct md_op_data *op_data)
 {
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct lu_client_seq *seq = cli->cl_seq;
-        ENTRY;
-        RETURN(seq_client_alloc_fid(NULL, seq, fid));
+       struct client_obd *cli = &exp->exp_obd->u.cli;
+       struct lu_client_seq *seq = cli->cl_seq;
+       ENTRY;
+       RETURN(seq_client_alloc_fid(env, seq, fid));
 }
 
 struct obd_uuid *mdc_get_uuid(struct obd_export *exp) {
@@ -3073,7 +3130,7 @@ static int mdc_resource_inode_free(struct ldlm_resource *res)
 }
 
 struct ldlm_valblock_ops inode_lvbo = {
-       lvbo_free: mdc_resource_inode_free
+       .lvbo_free = mdc_resource_inode_free
 };
 
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
@@ -3129,26 +3186,33 @@ err_rpc_lock:
 }
 
 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
- * us to make MDS RPCs with large enough reply buffers to hold the
- * maximum-sized (= maximum striped) EA and cookie without having to
- * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
+ * us to make MDS RPCs with large enough reply buffers to hold a default
+ * sized EA and cookie without having to calculate this (via a call into the
+ * LOV + OSCs) each time we make an RPC.  The maximum size is also tracked
+ * but not used to avoid wastefully vmalloc()'ing large reply buffers when
+ * a large number of stripes is possible.  If a larger reply buffer is
+ * required it will be reallocated in the ptlrpc layer due to overflow.
+ */
 static int mdc_init_ea_size(struct obd_export *exp, int easize,
-                     int def_easize, int cookiesize)
+                           int def_easize, int cookiesize, int def_cookiesize)
 {
-        struct obd_device *obd = exp->exp_obd;
-        struct client_obd *cli = &obd->u.cli;
-        ENTRY;
+       struct obd_device *obd = exp->exp_obd;
+       struct client_obd *cli = &obd->u.cli;
+       ENTRY;
 
-        if (cli->cl_max_mds_easize < easize)
-                cli->cl_max_mds_easize = easize;
+       if (cli->cl_max_mds_easize < easize)
+               cli->cl_max_mds_easize = easize;
 
-        if (cli->cl_default_mds_easize < def_easize)
-                cli->cl_default_mds_easize = def_easize;
+       if (cli->cl_default_mds_easize < def_easize)
+               cli->cl_default_mds_easize = def_easize;
 
-        if (cli->cl_max_mds_cookiesize < cookiesize)
-                cli->cl_max_mds_cookiesize = cookiesize;
+       if (cli->cl_max_mds_cookiesize < cookiesize)
+               cli->cl_max_mds_cookiesize = cookiesize;
 
-        RETURN(0);
+       if (cli->cl_default_mds_cookiesize < def_cookiesize)
+               cli->cl_default_mds_cookiesize = def_cookiesize;
+
+       RETURN(0);
 }
 
 static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
@@ -3392,11 +3456,11 @@ struct md_ops mdc_md_ops = {
 
 int __init mdc_init(void)
 {
-       return class_register_type(&mdc_obd_ops, &mdc_md_ops, NULL,
+       return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL,
 #ifndef HAVE_ONLY_PROCFS_SEQ
-                                       NULL,
+                                  NULL,
 #endif
-                                       LUSTRE_MDC_NAME, NULL);
+                                  LUSTRE_MDC_NAME, NULL);
 }
 
 #ifdef __KERNEL__