Whamcloud - gitweb
LU-5 readdir read multiple pages per rpc
authorLai Siyao <laisiyao@whamcloud.com>
Thu, 26 May 2011 13:44:47 +0000 (06:44 -0700)
committerOleg Drokin <green@whamcloud.com>
Fri, 15 Jul 2011 00:38:07 +0000 (17:38 -0700)
add support for readdir to read multiple pages per rpc:
* because client has no idea how many directory pages it can read, it
  tries to read maximum pages each time, but will only store pages
  read from mds into page cache.
* add a flag LDF_COLLIDE to mark a dir page whose hash collides with the
  next page; the client will remove such a page from the page cache after
  processing it.
* upon readpage bulk io failure, client won't be evicted, and client
  will resend the bulk request.
* support large page size on client: MDS_READPAGE RPC will fill reply
  with page size LU_PAGE_SIZE(4k), and if client page is bigger than
  LU_PAGE_SIZE, several pages will be integrated into one dir page with
  CFS_PAGE_SIZE.

Signed-off-by: Lai Siyao <laisiyao@whamcloud.com>
Change-Id: Id6bc36fbcec79993d49bbe9a535851e5e3ebd876
Reviewed-on: http://review.whamcloud.com/604
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
27 files changed:
lustre/autoconf/lustre-core.m4
lustre/cmm/mdc_device.c
lustre/include/linux/lustre_compat25.h
lustre/include/lprocfs_status.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_lib.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/dir.c
lustre/llite/dir.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/mdc/lproc_mdc.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/obdclass/lprocfs_status.c
lustre/osc/lproc_osc.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/import.c
lustre/tests/sanity.sh
lustre/utils/wirecheck.c

index 822af2a..3147e78 100644 (file)
@@ -1972,6 +1972,16 @@ LB_LINUX_TRY_COMPILE([
 ])
 ])
 
 ])
 ])
 
+# 2.6.27 exported add_to_page_cache_lru.
+#
+# Checks the add_to_page_cache_lru symbol against mm/filemap.c and, in the
+# first action branch, defines HAVE_ADD_TO_PAGE_CACHE_LRU to 1.  The second
+# action branch is empty, so nothing is defined in that case.
+# NOTE(review): presumably the first branch is the symbol-is-exported case;
+# confirm against the definition of the export check helper.
+AC_DEFUN([LC_EXPORT_ADD_TO_PAGE_CACHE_LRU],
+[LB_CHECK_SYMBOL_EXPORT([add_to_page_cache_lru],
+[mm/filemap.c],[
+        AC_DEFINE(HAVE_ADD_TO_PAGE_CACHE_LRU, 1,
+                [add_to_page_cache_lru functions are present])
+],[
+])
+])
+
 # 2.6.31
 
 # 2.6.31 replaces blk_queue_hardsect_size by blk_queue_logical_block_size function
 # 2.6.31
 
 # 2.6.31 replaces blk_queue_hardsect_size by blk_queue_logical_block_size function
@@ -2317,6 +2327,7 @@ AC_DEFUN([LC_PROG_LINUX],
          LC_VFS_SYMLINK_5ARGS
          LC_SB_ANY_QUOTA_ACTIVE
          LC_SB_HAS_QUOTA_ACTIVE
          LC_VFS_SYMLINK_5ARGS
          LC_SB_ANY_QUOTA_ACTIVE
          LC_SB_HAS_QUOTA_ACTIVE
+         LC_EXPORT_ADD_TO_PAGE_CACHE_LRU
 
          # 2.6.31
          LC_BLK_QUEUE_LOG_BLK_SIZE
 
          # 2.6.31
          LC_BLK_QUEUE_LOG_BLK_SIZE
@@ -2452,6 +2463,26 @@ LB_LINUX_TRY_COMPILE([
 ])
 ])
 
 ])
 ])
 
+# 2.6.29 split file and anonymous page queues
+#
+# Test-compiles a snippet that initialises a local struct pagevec and calls
+# pagevec_lru_add_file on it.  The first result branch reports yes and
+# defines HAVE_PAGEVEC_LRU_ADD_FILE to 1, the second one reports no and
+# defines nothing.
+AC_DEFUN([LC_PAGEVEC_LRU_ADD_FILE],
+[AC_MSG_CHECKING([if kernel has .pagevec_lru_add_file])
+LB_LINUX_TRY_COMPILE([
+        #include <linux/mm.h>
+        #include <linux/pagevec.h>
+],[
+        struct pagevec lru_pagevec;
+
+        pagevec_init(&lru_pagevec, 0);
+        pagevec_lru_add_file(&lru_pagevec);
+],[
+        AC_MSG_RESULT([yes])
+        AC_DEFINE(HAVE_PAGEVEC_LRU_ADD_FILE, 1,
+                [kernel has .pagevec_lru_add_file])
+],[
+        AC_MSG_RESULT([no])
+])
+])
+
 #
 # --enable-mpitest
 #
 #
 # --enable-mpitest
 #
@@ -2692,6 +2723,7 @@ fi
          LC_D_OBTAIN_ALIAS
          LC_BLKDEV_PUT_2ARGS
          LC_DENTRY_OPEN_4ARGS
          LC_D_OBTAIN_ALIAS
          LC_BLKDEV_PUT_2ARGS
          LC_DENTRY_OPEN_4ARGS
+         LC_PAGEVEC_LRU_ADD_FILE
 
 ])
 
 
 ])
 
index 27c5d63..05c6ef1 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -156,11 +159,13 @@ static int mdc_obd_add(const struct lu_env *env,
                                          OBD_CONNECT_MDS_CAPA |
                                          OBD_CONNECT_OSS_CAPA |
                                          OBD_CONNECT_IBITS |
                                          OBD_CONNECT_MDS_CAPA |
                                          OBD_CONNECT_OSS_CAPA |
                                          OBD_CONNECT_IBITS |
+                                         OBD_CONNECT_BRW_SIZE |
                                          OBD_CONNECT_MDS_MDS |
                                          OBD_CONNECT_FID |
                                          OBD_CONNECT_AT |
                                          OBD_CONNECT_FULL20 |
                                          OBD_CONNECT_64BITHASH;
                                          OBD_CONNECT_MDS_MDS |
                                          OBD_CONNECT_FID |
                                          OBD_CONNECT_AT |
                                          OBD_CONNECT_FULL20 |
                                          OBD_CONNECT_64BITHASH;
+                ocd->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
                 rc = obd_connect(env, &desc->cl_exp, mdc, &mdc->obd_uuid, ocd, NULL);
                 OBD_FREE_PTR(ocd);
                 if (rc) {
                 rc = obd_connect(env, &desc->cl_exp, mdc, &mdc->obd_uuid, ocd, NULL);
                 OBD_FREE_PTR(ocd);
                 if (rc) {
index a1291c9..0a8ce3c 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -837,5 +840,23 @@ static inline int ll_quota_off(struct super_block *sb, int off, int remount)
 #define bio_hw_segments(q, bio) 0
 #endif
 
 #define bio_hw_segments(q, bio) 0
 #endif
 
+#ifndef HAVE_PAGEVEC_LRU_ADD_FILE
+#define pagevec_lru_add_file pagevec_lru_add
+#endif
+
+/*
+ * Wrappers for inserting a page into the page cache and onto the LRU.
+ *
+ * With HAVE_ADD_TO_PAGE_CACHE_LRU the single add_to_page_cache_lru() call
+ * is used and the ll_pagevec_*() helpers expand to nothing
+ * (ll_pagevec_add() evaluates to 0).  Otherwise the page is inserted with
+ * add_to_page_cache() and the helpers forward to the pagevec API so the
+ * caller can batch the LRU additions itself.
+ *
+ * Every helper uses only its own arguments and expands without a trailing
+ * semicolon, so it can be used like an ordinary function call with any
+ * pagevec variable name.
+ */
+#ifdef HAVE_ADD_TO_PAGE_CACHE_LRU
+#define ll_add_to_page_cache_lru(pg, mapping, off, gfp) \
+        add_to_page_cache_lru(pg, mapping, off, gfp)
+#define ll_pagevec_init(pv, cold)       do {} while (0)
+#define ll_pagevec_add(pv, pg)          (0)
+#define ll_pagevec_lru_add_file(pv)     do {} while (0)
+#else
+#define ll_add_to_page_cache_lru(pg, mapping, off, gfp) \
+        add_to_page_cache(pg, mapping, off, gfp)
+#define ll_pagevec_init(pv, cold)       pagevec_init(pv, cold)
+#define ll_pagevec_add(pv, pg)          pagevec_add(pv, pg)
+#define ll_pagevec_lru_add_file(pv)     pagevec_lru_add_file(pv)
+#endif
+
 #endif /* __KERNEL__ */
 #endif /* _COMPAT25_H */
 #endif /* __KERNEL__ */
 #endif /* _COMPAT25_H */
index ccfb446..95ec094 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -641,6 +644,10 @@ int lprocfs_obd_wr_recovery_time_hard(struct file *file,
                                       unsigned long count, void *data);
 int lprocfs_obd_rd_mntdev(char *page, char **start, off_t off,
                           int count, int *eof, void *data);
                                       unsigned long count, void *data);
 int lprocfs_obd_rd_mntdev(char *page, char **start, off_t off,
                           int count, int *eof, void *data);
+int lprocfs_obd_rd_max_pages_per_rpc(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data);
+int lprocfs_obd_wr_max_pages_per_rpc(struct file *file, const char *buffer,
+                                     unsigned long count, void *data);
 /* all quota proc functions */
 extern int lprocfs_quota_rd_bunit(char *page, char **start,
                                   off_t off, int count,
 /* all quota proc functions */
 extern int lprocfs_quota_rd_bunit(char *page, char **start,
                                   off_t off, int count,
index 709204c..2e2f43b 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -858,7 +861,14 @@ struct lu_dirpage {
 };
 
 enum lu_dirpage_flags {
 };
 
 enum lu_dirpage_flags {
-        LDF_EMPTY = 1 << 0
+        /**
+         * dirpage contains no entry.
+         */
+        LDF_EMPTY   = 1 << 0,
+        /**
+         * last entry's lde_hash equals ldp_hash_end.
+         */
+        LDF_COLLIDE = 1 << 1
 };
 
 static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
 };
 
 static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
@@ -906,6 +916,21 @@ static inline int lu_dirent_size(struct lu_dirent *ent)
 
 #define MDS_DIR_END_OFF 0xfffffffffffffffeULL
 
 
 #define MDS_DIR_END_OFF 0xfffffffffffffffeULL
 
+/**
+ * MDS_READPAGE page size
+ *
+ * This is the directory page size packed in MDS_READPAGE RPC.
+ * It's different than CFS_PAGE_SIZE because the client needs to
+ * access the struct lu_dirpage header packed at the beginning of
+ * the "page"; without a fixed size there isn't any way to find where
+ * the lu_dirpage header is if client and server CFS_PAGE_SIZE differ.
+ */
+#define LU_PAGE_SHIFT 12
+#define LU_PAGE_SIZE  (1UL << LU_PAGE_SHIFT)
+#define LU_PAGE_MASK  (~(LU_PAGE_SIZE - 1))
+
+/**
+ * 1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT), presumably the number of
+ * LU_PAGE_SIZE pages that fit into one CFS_PAGE_SIZE page.
+ * The expansion is fully parenthesized so that the macro can be used
+ * inside larger expressions (e.g. as a divisor or multiplicand).
+ */
+#define LU_PAGE_COUNT (1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT))
+
 /** @} lu_dir */
 
 struct lustre_handle {
 /** @} lu_dir */
 
 struct lustre_handle {
@@ -1092,11 +1117,12 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
                                 OBD_CONNECT_RMT_CLIENT | \
                                 OBD_CONNECT_RMT_CLIENT_FORCE | \
                                 OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
                                 OBD_CONNECT_RMT_CLIENT | \
                                 OBD_CONNECT_RMT_CLIENT_FORCE | \
-                                OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
-                                OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | \
-                                LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR | \
-                                OBD_CONNECT_LOV_V3 | OBD_CONNECT_SOM | \
-                                OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH)
+                                OBD_CONNECT_BRW_SIZE | OBD_CONNECT_MDS_CAPA | \
+                                OBD_CONNECT_OSS_CAPA | OBD_CONNECT_MDS_MDS | \
+                                OBD_CONNECT_FID | LRU_RESIZE_CONNECT_FLAG | \
+                                OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3 | \
+                                OBD_CONNECT_SOM | OBD_CONNECT_FULL20 | \
+                                OBD_CONNECT_64BITHASH)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
index cdef911..6dd1a67 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -92,12 +95,16 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req);
 
 #define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */
 
 
 #define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */
 
+struct l_wait_info;
+
 void target_cancel_recovery_timer(struct obd_device *obd);
 void target_stop_recovery_thread(struct obd_device *obd);
 void target_cleanup_recovery(struct obd_device *obd);
 int target_queue_recovery_request(struct ptlrpc_request *req,
                                   struct obd_device *obd);
 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id);
 void target_cancel_recovery_timer(struct obd_device *obd);
 void target_stop_recovery_thread(struct obd_device *obd);
 void target_cleanup_recovery(struct obd_device *obd);
 int target_queue_recovery_request(struct ptlrpc_request *req,
                                   struct obd_device *obd);
 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id);
+int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
+                   struct l_wait_info *lwi);
 
 /* client.c */
 
 
 /* client.c */
 
index fbdd34f..d4c5b66 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -242,6 +245,7 @@ struct md_object_operations {
         int (*moo_xattr_del)(const struct lu_env *env, struct md_object *obj,
                              const char *name);
 
         int (*moo_xattr_del)(const struct lu_env *env, struct md_object *obj,
                              const char *name);
 
+        /** \retval number of bytes actually read upon success */
         int (*moo_readpage)(const struct lu_env *env, struct md_object *obj,
                             const struct lu_rdpg *rdpg);
 
         int (*moo_readpage)(const struct lu_env *env, struct md_object *obj,
                             const struct lu_rdpg *rdpg);
 
index c43e0c8..91caaa6 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -1503,8 +1506,8 @@ struct md_ops {
         int (*m_sync)(struct obd_export *, const struct lu_fid *,
                       struct obd_capa *, struct ptlrpc_request **);
         int (*m_readpage)(struct obd_export *, const struct lu_fid *,
         int (*m_sync)(struct obd_export *, const struct lu_fid *,
                       struct obd_capa *, struct ptlrpc_request **);
         int (*m_readpage)(struct obd_export *, const struct lu_fid *,
-                          struct obd_capa *, __u64, struct page *,
-                          struct ptlrpc_request **);
+                          struct obd_capa *, __u64, struct page **,
+                          unsigned, struct ptlrpc_request **);
 
         int (*m_unlink)(struct obd_export *, struct md_op_data *,
                         struct ptlrpc_request **);
 
         int (*m_unlink)(struct obd_export *, struct md_op_data *,
                         struct ptlrpc_request **);
@@ -1661,4 +1664,11 @@ static inline struct md_open_data *obd_mod_alloc(void)
 extern void obdo_from_inode(struct obdo *dst, struct inode *src,
                             struct lu_fid *parent, obd_flag valid);
 
 extern void obdo_from_inode(struct obdo *dst, struct inode *src,
                             struct lu_fid *parent, obd_flag valid);
 
+/**
+ * Tell whether the client should resend a request.
+ *
+ * \param resend  resend count that is compared against cli->cl_resends
+ * \param cli     client obd whose cl_resends value is consulted
+ *
+ * \retval 1 if cl_resends is zero, or if it is greater than \a resend
+ * \retval 0 otherwise
+ */
+static inline int client_should_resend(int resend, struct client_obd *cli)
+{
+        /* read the value once, so that the zero test and the comparison
+         * see the same value even if cl_resends is changed concurrently */
+        int max_resends = cfs_atomic_read(&cli->cl_resends);
+
+        return max_resends ? max_resends > resend : 1;
+}
+
 #endif /* __OBD_H */
 #endif /* __OBD_H */
index 65e502f..e24c644 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -1999,14 +2002,15 @@ static inline int md_sync(struct obd_export *exp, const struct lu_fid *fid,
 
 static inline int md_readpage(struct obd_export *exp, const struct lu_fid *fid,
                               struct obd_capa *oc, __u64 offset,
 
 static inline int md_readpage(struct obd_export *exp, const struct lu_fid *fid,
                               struct obd_capa *oc, __u64 offset,
-                              struct page *page,
+                              struct page **pages, unsigned npages,
                               struct ptlrpc_request **request)
 {
         int rc;
         ENTRY;
         EXP_CHECK_MD_OP(exp, readpage);
         EXP_MD_COUNTER_INCREMENT(exp, readpage);
                               struct ptlrpc_request **request)
 {
         int rc;
         ENTRY;
         EXP_CHECK_MD_OP(exp, readpage);
         EXP_MD_COUNTER_INCREMENT(exp, readpage);
-        rc = MDP(exp->exp_obd, readpage)(exp, fid, oc, offset, page, request);
+        rc = MDP(exp->exp_obd, readpage)(exp, fid, oc, offset, pages, npages,
+                                         request);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index e2b0410..d0fa1cf 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -2459,3 +2462,107 @@ void ldlm_dump_export_locks(struct obd_export *exp)
         cfs_spin_unlock(&exp->exp_locks_list_guard);
 }
 #endif
         cfs_spin_unlock(&exp->exp_locks_list_guard);
 }
 #endif
+
+/**
+ * Timeout callback handed to LWI_TIMEOUT_INTERVAL() by target_bulk_io().
+ *
+ * \param data  not used by this callback
+ *
+ * \retval 1 always
+ */
+static int target_bulk_timeout(void *data)
+{
+        ENTRY;
+        /* We don't fail the connection here, because having the export
+         * killed makes the (vital) call to commitrw very sad.
+         */
+        RETURN(1);
+}
+
+/** Name of the bulk direction for log messages: "GET" or "PUT". */
+static inline char *bulk2type(struct ptlrpc_bulk_desc *desc)
+{
+        if (desc->bd_type == BULK_GET_SINK)
+                return "GET";
+
+        /* every other bulk type is reported as a PUT */
+        return "PUT";
+}
+
+/**
+ * Start the bulk transfer described by \a desc and wait until it is over.
+ *
+ * The function first waits for an eviction in progress on the export's obd
+ * (if any), then starts the transfer unless the export has failed or has
+ * exp_abort_active_req set.  Afterwards it waits until the bulk is no longer
+ * active, the export fails or exp_abort_active_req is set; a timed out wait
+ * is repeated as long as req->rq_deadline is still in the future.
+ *
+ * \param exp   export the bulk belongs to
+ * \param desc  bulk descriptor; desc->bd_req is the request being served
+ * \param lwi   caller provided wait info, overwritten by this function
+ *
+ * \retval 0           transfer completed (also returned when the
+ *                     OBD_FAIL_MDS_SENDPAGE check fires and the bulk is
+ *                     aborted)
+ * \retval -ENOTCONN   export failed, or exp_abort_active_req was already set
+ *                     before the transfer was started
+ * \retval -ETIMEDOUT  wait timed out, exp_abort_active_req was set during
+ *                     the wait, or the transfer failed / was truncated
+ * \retval other       value returned by the eviction wait,
+ *                     sptlrpc_svc_wrap_bulk(), ptlrpc_start_bulk_transfer()
+ *                     or sptlrpc_svc_unwrap_bulk()
+ */
+int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
+                   struct l_wait_info *lwi)
+{
+        struct ptlrpc_request *req = desc->bd_req;
+        int rc = 0;
+        ENTRY;
+
+        /* Check if there is eviction in progress, and if so, wait for
+         * it to finish */
+        if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+                *lwi = LWI_INTR(NULL, NULL);
+                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+                                  !cfs_atomic_read(&exp->exp_obd->
+                                                   obd_evict_inprogress),
+                                  lwi);
+        }
+
+        /* Check if client was evicted or tried to reconnect already */
+        if (exp->exp_failed || exp->exp_abort_active_req) {
+                rc = -ENOTCONN;
+        } else {
+                /* only BULK_PUT_SINK descriptors are wrapped before sending */
+                if (desc->bd_type == BULK_PUT_SINK)
+                        rc = sptlrpc_svc_wrap_bulk(req, desc);
+                if (rc == 0)
+                        rc = ptlrpc_start_bulk_transfer(desc);
+        }
+
+        /* NOTE(review): presumably a fault injection hook; the transfer that
+         * was just started is aborted and rc stays 0 */
+        if (rc == 0 && OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+                ptlrpc_abort_bulk(desc);
+        } else if (rc == 0) {
+                /* start time is only used in the timeout message below */
+                time_t start = cfs_time_current_sec();
+                do {
+                        /* time left until the deadline; when none is left
+                         * the wait still uses CFS_TICK as its timeout */
+                        long timeoutl = req->rq_deadline - cfs_time_current_sec();
+                        cfs_duration_t timeout = timeoutl <= 0 ?
+                                CFS_TICK : cfs_time_seconds(timeoutl);
+                        *lwi = LWI_TIMEOUT_INTERVAL(timeout,
+                                                    cfs_time_seconds(1),
+                                                   target_bulk_timeout,
+                                                   desc);
+                        rc = l_wait_event(desc->bd_waitq,
+                                          !ptlrpc_server_bulk_active(desc) ||
+                                          exp->exp_failed ||
+                                          exp->exp_abort_active_req,
+                                          lwi);
+                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                        /* Wait again if we changed deadline */
+                } while ((rc == -ETIMEDOUT) &&
+                         (req->rq_deadline > cfs_time_current_sec()));
+
+                /* the checks below are ordered: timeout first, then export
+                 * failure, then abort request, then transfer result */
+                if (rc == -ETIMEDOUT) {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "timeout on bulk %s after %ld%+lds",
+                                  bulk2type(desc),
+                                  req->rq_deadline - start,
+                                  cfs_time_current_sec() -
+                                  req->rq_deadline);
+                        ptlrpc_abort_bulk(desc);
+                } else if (exp->exp_failed) {
+                        DEBUG_REQ(D_ERROR, req, "Eviction on bulk %s",
+                                  bulk2type(desc));
+                        rc = -ENOTCONN;
+                        ptlrpc_abort_bulk(desc);
+                } else if (exp->exp_abort_active_req) {
+                        DEBUG_REQ(D_ERROR, req, "Reconnect on bulk %s",
+                                  bulk2type(desc));
+                        /* we don't reply anyway */
+                        rc = -ETIMEDOUT;
+                        ptlrpc_abort_bulk(desc);
+                } else if (!desc->bd_success ||
+                           desc->bd_nob_transferred != desc->bd_nob) {
+                        DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)",
+                                  desc->bd_success ?
+                                  "truncated" : "network error on",
+                                  bulk2type(desc),
+                                  desc->bd_nob_transferred,
+                                  desc->bd_nob);
+                        /* XXX should this be a different errno? */
+                        rc = -ETIMEDOUT;
+                } else if (desc->bd_type == BULK_GET_SINK) {
+                        /* only BULK_GET_SINK descriptors are unwrapped after
+                         * a complete transfer */
+                        rc = sptlrpc_svc_unwrap_bulk(req, desc);
+                }
+        } else {
+                /* the transfer was never started */
+                DEBUG_REQ(D_ERROR, req, "bulk %s failed: rc %d",
+                          bulk2type(desc), rc);
+        }
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(target_bulk_io);
index 3faa795..92414ab 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -102,7 +105,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
 
         offset = (__u64)hash_x_index(page->index, 0);
         rc = md_readpage(sbi->ll_md_exp, &lli->lli_fid, NULL,
 
         offset = (__u64)hash_x_index(page->index, 0);
         rc = md_readpage(sbi->ll_md_exp, &lli->lli_fid, NULL,
-                         offset, page, &request);
+                         offset, &page, 1, &request);
         if (!rc) {
                 body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body != NULL);         /* checked by md_readpage() */
         if (!rc) {
                 body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body != NULL);         /* checked by md_readpage() */
index ff0e92d..7466dd8 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -45,6 +48,7 @@
 #include <linux/smp_lock.h>
 #include <asm/uaccess.h>
 #include <linux/buffer_head.h>   // for wait_on_buffer
 #include <linux/smp_lock.h>
 #include <asm/uaccess.h>
 #include <linux/buffer_head.h>   // for wait_on_buffer
+#include <linux/pagevec.h>
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
  *
  * page format
  *
  *
  * page format
  *
- *
- *
- *
+ * Pages in an MDS_READPAGE RPC are packed in LU_PAGE_SIZE units, and each page
+ * starts with a lu_dirpage header which describes the start/end hash, and
+ * whether this page is empty (contains no dir entry) or its hash collides
+ * with the next page. After the client receives the reply, several pages are
+ * integrated into one dir page of CFS_PAGE_SIZE (if CFS_PAGE_SIZE is greater
+ * than LU_PAGE_SIZE), and the lu_dirpage for this integrated page is adjusted.
  *
  */
 
 /* returns the page unlocked, but with a reference */
  *
  */
 
 /* returns the page unlocked, but with a reference */
-static int ll_dir_readpage(struct file *file, struct page *page)
+static int ll_dir_readpage(struct file *file, struct page *page0)
 {
 {
-        struct inode *inode = page->mapping->host;
+        struct inode *inode = page0->mapping->host;
+        int hash64 = ll_i2sbi(inode)->ll_flags & LL_SBI_64BIT_HASH;
+        struct obd_export *exp = ll_i2sbi(inode)->ll_md_exp;
         struct ptlrpc_request *request;
         struct mdt_body *body;
         struct obd_capa *oc;
         __u64 hash;
         struct ptlrpc_request *request;
         struct mdt_body *body;
         struct obd_capa *oc;
         __u64 hash;
+        struct page **page_pool;
+        struct page *page;
+#ifndef HAVE_ADD_TO_PAGE_CACHE_LRU
+        struct pagevec lru_pvec;
+#endif
+        struct lu_dirpage *dp;
+        int max_pages = ll_i2sbi(inode)->ll_md_brw_size >> CFS_PAGE_SHIFT;
+        int nrdpgs = 0; /* number of pages read actually */
+        int npages;
+        int i;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
@@ -170,25 +189,84 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 hash = lli->lli_sa_pos;
                 cfs_spin_unlock(&lli->lli_sa_lock);
         }
                 hash = lli->lli_sa_pos;
                 cfs_spin_unlock(&lli->lli_sa_lock);
         }
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
-               inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) hash "LPU64"\n",
+               inode->i_ino, inode->i_generation, inode, hash);
+
+        LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
+
+        OBD_ALLOC(page_pool, sizeof(page) * max_pages);
+        if (page_pool != NULL) {
+                page_pool[0] = page0;
+        } else {
+                page_pool = &page0;
+                max_pages = 1;
+        }
+        for (npages = 1; npages < max_pages; npages++) {
+                page = page_cache_alloc_cold(inode->i_mapping);
+                if (!page)
+                        break;
+                page_pool[npages] = page;
+        }
 
         oc = ll_mdscapa_get(inode);
 
         oc = ll_mdscapa_get(inode);
-        rc = md_readpage(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode),
-                         oc, hash, page, &request);
+        rc = md_readpage(exp, ll_inode2fid(inode), oc, hash, page_pool, npages,
+                         &request);
         capa_put(oc);
         capa_put(oc);
-        if (!rc) {
+        if (rc == 0) {
                 body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
                 /* Checked by mdc_readpage() */
                 LASSERT(body != NULL);
 
                 if (body->valid & OBD_MD_FLSIZE)
                         cl_isize_write(inode, body->size);
                 body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
                 /* Checked by mdc_readpage() */
                 LASSERT(body != NULL);
 
                 if (body->valid & OBD_MD_FLSIZE)
                         cl_isize_write(inode, body->size);
-                SetPageUptodate(page);
+
+                nrdpgs = (request->rq_bulk->bd_nob_transferred+CFS_PAGE_SIZE-1)
+                         >> CFS_PAGE_SHIFT;
+                SetPageUptodate(page0);
         }
         }
+        unlock_page(page0);
         ptlrpc_req_finished(request);
 
         ptlrpc_req_finished(request);
 
-        unlock_page(page);
+        CDEBUG(D_VFSTRACE, "read %d/%d pages\n", nrdpgs, npages);
+
+        ll_pagevec_init(&lru_pvec, 0);
+        for (i = 1; i < npages; i++) {
+                unsigned long offset;
+                int ret;
+
+                page = page_pool[i];
+
+                if (rc < 0 || i >= nrdpgs) {
+                        page_cache_release(page);
+                        continue;
+                }
+
+                SetPageUptodate(page);
+
+                dp = cfs_kmap(page);
+                hash = le64_to_cpu(dp->ldp_hash_start);
+                cfs_kunmap(page);
+
+                offset = hash_x_index(hash, hash64);
+
+                prefetchw(&page->flags);
+                ret = ll_add_to_page_cache_lru(page, inode->i_mapping, offset,
+                                               GFP_KERNEL);
+                if (ret == 0) {
+                        unlock_page(page);
+                        page_cache_get(page);
+                        if (ll_pagevec_add(&lru_pvec, page) == 0)
+                                ll_pagevec_lru_add_file(&lru_pvec);
+                } else {
+                        CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:"
+                               " %d\n", offset, ret);
+                }
+                page_cache_release(page);
+        }
+        ll_pagevec_lru_add_file(&lru_pvec);
+
+        if (page_pool != &page0)
+                OBD_FREE(page_pool, sizeof(struct page *) * max_pages);
         EXIT;
         return rc;
 }
         EXIT;
         return rc;
 }
@@ -261,7 +339,7 @@ static struct page *ll_dir_page_locate(struct inode *dir, __u64 *hash,
                  */
                 wait_on_page(page);
                 if (PageUptodate(page)) {
                  */
                 wait_on_page(page);
                 if (PageUptodate(page)) {
-                        dp = kmap(page);
+                        dp = cfs_kmap(page);
                         if (BITS_PER_LONG == 32 && hash64) {
                                 *start = le64_to_cpu(dp->ldp_hash_start) >> 32;
                                 *end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
                         if (BITS_PER_LONG == 32 && hash64) {
                                 *start = le64_to_cpu(dp->ldp_hash_start) >> 32;
                                 *end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
@@ -272,8 +350,21 @@ static struct page *ll_dir_page_locate(struct inode *dir, __u64 *hash,
                         }
                         LASSERTF(*start <= *hash, "start = "LPX64",end = "
                                  LPX64",hash = "LPX64"\n", *start, *end, *hash);
                         }
                         LASSERTF(*start <= *hash, "start = "LPX64",end = "
                                  LPX64",hash = "LPX64"\n", *start, *end, *hash);
+                        CDEBUG(D_VFSTRACE, "page %lu [%llu %llu], hash "LPU64"\n",
+                               offset, *start, *end, *hash);
                         if (*hash > *end || (*end != *start && *hash == *end)) {
                         if (*hash > *end || (*end != *start && *hash == *end)) {
-                                ll_release_page(page, *hash, *start, *end);
+                                /*
+                                 * upon hash collision, remove this page,
+                                 * otherwise put page reference, and
+                                 * ll_get_dir_page() will issue RPC to fetch
+                                 * the page we want.
+                                 */
+                                if (dp->ldp_flags & cpu_to_le32(LDF_COLLIDE)) {
+                                        ll_release_page(page, *hash, *start, *end);
+                                } else {
+                                        cfs_kunmap(page);
+                                        page_cache_release(page);
+                                }
                                 page = NULL;
                         }
                 } else {
                                 page = NULL;
                         }
                 } else {
index a9fdab1..f14da0a 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -393,6 +396,7 @@ struct ll_sb_info {
         /* =0 - hold lock over whole read/write
          * >0 - max. chunk to be read/written w/o lock re-acquiring */
         unsigned long             ll_max_rw_chunk;
         /* =0 - hold lock over whole read/write
          * >0 - max. chunk to be read/written w/o lock re-acquiring */
         unsigned long             ll_max_rw_chunk;
+        unsigned int              ll_md_brw_size; /* used by readdir */
 
         struct lu_site           *ll_site;
         struct cl_device         *ll_cl;
 
         struct lu_site           *ll_site;
         struct cl_device         *ll_cl;
index d9635a1..8489451 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -199,12 +202,12 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         /* indicate the features supported by this client */
         data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
                                   OBD_CONNECT_JOIN     | OBD_CONNECT_ATTRFID  |
         /* indicate the features supported by this client */
         data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
                                   OBD_CONNECT_JOIN     | OBD_CONNECT_ATTRFID  |
-                                  OBD_CONNECT_VERSION  | OBD_CONNECT_MDS_CAPA |
-                                  OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
-                                  OBD_CONNECT_FID      | OBD_CONNECT_AT |
-                                  OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT |
-                                  OBD_CONNECT_VBR      | OBD_CONNECT_FULL20 |
-                                  OBD_CONNECT_64BITHASH;
+                                  OBD_CONNECT_VERSION  | OBD_CONNECT_BRW_SIZE |
+                                  OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
+                                  OBD_CONNECT_CANCELSET| OBD_CONNECT_FID      |
+                                  OBD_CONNECT_AT       | OBD_CONNECT_LOV_V3   |
+                                  OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR    |
+                                  OBD_CONNECT_FULL20   | OBD_CONNECT_64BITHASH;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
@@ -244,6 +247,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
                 data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT_FORCE;
 
         if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
                 data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT_FORCE;
 
+        data->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
+
         err = obd_connect(NULL, &sbi->ll_md_exp, obd, &sbi->ll_sb_uuid, data, NULL);
         if (err == -EBUSY) {
                 LCONSOLE_ERROR_MSG(0x14f, "An MDT (md %s) is performing "
         err = obd_connect(NULL, &sbi->ll_md_exp, obd, &sbi->ll_sb_uuid, data, NULL);
         if (err == -EBUSY) {
                 LCONSOLE_ERROR_MSG(0x14f, "An MDT (md %s) is performing "
@@ -343,6 +348,11 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         if (data->ocd_connect_flags & OBD_CONNECT_64BITHASH)
                 sbi->ll_flags |= LL_SBI_64BIT_HASH;
 
         if (data->ocd_connect_flags & OBD_CONNECT_64BITHASH)
                 sbi->ll_flags |= LL_SBI_64BIT_HASH;
 
+        if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
+                sbi->ll_md_brw_size = data->ocd_brw_size;
+        else
+                sbi->ll_md_brw_size = CFS_PAGE_SIZE;
+
         obd = class_name2obd(dt);
         if (!obd) {
                 CERROR("DT %s: not setup or attached\n", dt);
         obd = class_name2obd(dt);
         if (!obd) {
                 CERROR("DT %s: not setup or attached\n", dt);
@@ -386,7 +396,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
 
         obd->obd_upcall.onu_owner = &sbi->ll_lco;
         obd->obd_upcall.onu_upcall = cl_ocd_update;
 
         obd->obd_upcall.onu_owner = &sbi->ll_lco;
         obd->obd_upcall.onu_upcall = cl_ocd_update;
-        data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
+
+        data->ocd_brw_size = PTLRPC_MAX_BRW_SIZE;
 
         err = obd_connect(NULL, &sbi->ll_dt_exp, obd, &sbi->ll_sb_uuid, data, NULL);
         if (err == -EBUSY) {
 
         err = obd_connect(NULL, &sbi->ll_dt_exp, obd, &sbi->ll_sb_uuid, data, NULL);
         if (err == -EBUSY) {
index cd4ab9d..6b6885e 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -2342,7 +2345,8 @@ static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
 }
 
 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
 }
 
 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
-                        struct obd_capa *oc, __u64 offset64, struct page *page,
+                        struct obd_capa *oc, __u64 offset64,
+                        struct page **pages, unsigned npages,
                         struct ptlrpc_request **request)
 {
         struct obd_device       *obd = exp->exp_obd;
                         struct ptlrpc_request **request)
 {
         struct obd_device       *obd = exp->exp_obd;
@@ -2358,6 +2362,11 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         int                      tgt0_idx = 0;
         int                      rc;
         int                      nr = 0;
         int                      tgt0_idx = 0;
         int                      rc;
         int                      nr = 0;
+        int                      i;
+        /* number of pages read, in CFS_PAGE_SIZE */
+        int                      nrdpgs;
+        /* number of pages transferred in LU_PAGE_SIZE */
+        int                      nlupgs;
         struct lmv_stripe       *los;
         struct lmv_tgt_desc     *tgt;
         struct lu_dirpage       *dp;
         struct lmv_stripe       *los;
         struct lmv_tgt_desc     *tgt;
         struct lu_dirpage       *dp;
@@ -2435,34 +2444,99 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         if (IS_ERR(tgt))
                 GOTO(cleanup, rc = PTR_ERR(tgt));
 
         if (IS_ERR(tgt))
                 GOTO(cleanup, rc = PTR_ERR(tgt));
 
-        rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request);
+        rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, pages, npages,
+                         request);
         if (rc)
                 GOTO(cleanup, rc);
         if (rc)
                 GOTO(cleanup, rc);
-        if (obj) {
-                dp = cfs_kmap(page);
 
 
-                lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
-                lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
-                LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
+        nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+                 >> CFS_PAGE_SHIFT;
+        nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
+        LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
+        LASSERT(nrdpgs > 0 && nrdpgs <= npages);
 
 
-                for (ent = lu_dirent_start(dp); ent != NULL;
-                     ent = lu_dirent_next(ent))
-                        lmv_hash_adjust(&ent->lde_hash, hash_adj);
+        CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, npages);
 
 
-                if (tgt0_idx != nr - 1) {
-                        __u64 end;
+        for (i = 0; i < nrdpgs; i++) {
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                struct lu_dirpage *first;
+                __u64 hash_end = 0;
+                __u32 flags = 0;
+#endif
+                struct lu_dirent *tmp = NULL;
+
+                dp = cfs_kmap(pages[i]);
+                if (obj) {
+                        lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
+                        lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
+                        LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
 
 
-                        end = le64_to_cpu(dp->ldp_hash_end);
-                        if (end == MDS_DIR_END_OFF) {
+                        if ((tgt0_idx != nr - 1) &&
+                            (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF))
+                        {
                                 dp->ldp_hash_end = cpu_to_le32(seg_size *
                                                                (tgt0_idx + 1));
                                 CDEBUG(D_INODE,
                                        ""DFID" reset end "LPX64" tgt %d\n",
                                        PFID(&rid),
                                 dp->ldp_hash_end = cpu_to_le32(seg_size *
                                                                (tgt0_idx + 1));
                                 CDEBUG(D_INODE,
                                        ""DFID" reset end "LPX64" tgt %d\n",
                                        PFID(&rid),
-                                       (__u64)le64_to_cpu(dp->ldp_hash_end), tgt_idx);
+                                       (__u64)le64_to_cpu(dp->ldp_hash_end),
+                                       tgt_idx);
                         }
                 }
                         }
                 }
-                cfs_kunmap(page);
+
+                ent = lu_dirent_start(dp);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                first = dp;
+                hash_end = dp->ldp_hash_end;
+repeat:
+#endif
+                nlupgs--;
+                for (tmp = ent; ent != NULL;
+                     tmp = ent, ent = lu_dirent_next(ent)) {
+                        if (obj)
+                                lmv_hash_adjust(&ent->lde_hash, hash_adj);
+                }
+
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+                if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
+                        ent = lu_dirent_start(dp);
+
+                        if (obj) {
+                                lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
+                                if ((tgt0_idx != nr - 1) &&
+                                    (le64_to_cpu(dp->ldp_hash_end) ==
+                                     MDS_DIR_END_OFF)) {
+                                        hash_end = cpu_to_le32(seg_size *
+                                                               (tgt0_idx + 1));
+                                        CDEBUG(D_INODE,
+                                            ""DFID" reset end "LPX64" tgt %d\n",
+                                            PFID(&rid),
+                                            (__u64)le64_to_cpu(hash_end),
+                                            tgt_idx);
+                                }
+                        }
+                        hash_end = dp->ldp_hash_end;
+                        flags = dp->ldp_flags;
+
+                        if (tmp) {
+                                /* enlarge the end entry's lde_reclen from 0
+                                 * to reach the first entry of the next
+                                 * lu_dirpage; this way several lu_dirpages
+                                 * can be stored in one client page. */
+                                tmp = ((void *)tmp) +
+                                      le16_to_cpu(tmp->lde_reclen);
+                                tmp->lde_reclen =
+                                        cpu_to_le16((char *)(dp->ldp_entries) -
+                                                    (char *)tmp);
+                                goto repeat;
+                        }
+                }
+                first->ldp_hash_end = hash_end;
+                first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+                first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+#endif
+                cfs_kunmap(pages[i]);
         }
         EXIT;
 cleanup:
         }
         EXIT;
 cleanup:
index 0983f8d..aa6ed8c 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -149,6 +152,14 @@ static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
         /*{ "filegroups",      lprocfs_rd_filegroups,  0, 0 },*/
         { "mds_server_uuid", lprocfs_rd_server_uuid, 0, 0 },
         { "mds_conn_uuid",   lprocfs_rd_conn_uuid,   0, 0 },
         /*{ "filegroups",      lprocfs_rd_filegroups,  0, 0 },*/
         { "mds_server_uuid", lprocfs_rd_server_uuid, 0, 0 },
         { "mds_conn_uuid",   lprocfs_rd_conn_uuid,   0, 0 },
+        /*
+         * FIXME: the proc entry below is provided but not in use; instead
+         * sbi->ll_md_brw_size is used. The per-obd variable should be used
+         * when CMD is enabled and dir pages are managed in the MDC layer.
+         * Remember to enable the proc write function.
+         */
+        { "max_pages_per_rpc",  lprocfs_obd_rd_max_pages_per_rpc,
+                                /* lprocfs_obd_wr_max_pages_per_rpc */0, 0 },
         { "max_rpcs_in_flight", mdc_rd_max_rpcs_in_flight,
                                 mdc_wr_max_rpcs_in_flight, 0 },
         { "timeouts",        lprocfs_rd_timeouts,    0, 0 },
         { "max_rpcs_in_flight", mdc_rd_max_rpcs_in_flight,
                                 mdc_wr_max_rpcs_in_flight, 0 },
         { "timeouts",        lprocfs_rd_timeouts,    0, 0 },
index d8d6303..92ebe03 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -1020,15 +1023,22 @@ EXPORT_SYMBOL(mdc_sendpage);
 #endif
 
 int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
 #endif
 
 int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, __u64 offset, struct page *page,
-                 struct ptlrpc_request **request)
+                 struct obd_capa *oc, __u64 offset, struct page **pages,
+                 unsigned npages, struct ptlrpc_request **request)
 {
         struct ptlrpc_request   *req;
         struct ptlrpc_bulk_desc *desc;
 {
         struct ptlrpc_request   *req;
         struct ptlrpc_bulk_desc *desc;
+        int                      i;
+        cfs_waitq_t              waitq;
+        int                      resends = 0;
+        struct l_wait_info       lwi;
         int                      rc;
         ENTRY;
 
         *request = NULL;
         int                      rc;
         ENTRY;
 
         *request = NULL;
+        cfs_waitq_init(&waitq);
+
+restart_bulk:
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
         if (req == NULL)
                 RETURN(-ENOMEM);
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -1044,21 +1054,35 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
+        desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
+                                    MDS_BULK_PORTAL);
         if (desc == NULL) {
                 ptlrpc_request_free(req);
                 RETURN(-ENOMEM);
         }
 
         /* NB req now owns desc and will free it when it gets freed */
         if (desc == NULL) {
                 ptlrpc_request_free(req);
                 RETURN(-ENOMEM);
         }
 
         /* NB req now owns desc and will free it when it gets freed */
-        ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
-        mdc_readdir_pack(req, offset, CFS_PAGE_SIZE, fid, oc);
+        for (i = 0; i < npages; i++)
+                ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
+
+        mdc_readdir_pack(req, offset, CFS_PAGE_SIZE * npages, fid, oc);
 
         ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
         if (rc) {
                 ptlrpc_req_finished(req);
 
         ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
         if (rc) {
                 ptlrpc_req_finished(req);
-                RETURN(rc);
+                if (rc != -ETIMEDOUT)
+                        RETURN(rc);
+
+                resends++;
+                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("too many resend retries, returning error\n");
+                        RETURN(-EIO);
+                }
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                l_wait_event(waitq, 0, &lwi);
+
+                goto restart_bulk;
         }
 
         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
         }
 
         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
@@ -1068,9 +1092,10 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
-        if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) {
+        if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
                 CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
                 CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
-                        req->rq_bulk->bd_nob_transferred, CFS_PAGE_SIZE);
+                        req->rq_bulk->bd_nob_transferred,
+                        CFS_PAGE_SIZE * npages);
                 ptlrpc_req_finished(req);
                 RETURN(-EPROTO);
         }
                 ptlrpc_req_finished(req);
                 RETURN(-EPROTO);
         }
index 6cef918..a5f4975 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -2231,20 +2234,20 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
 }
 
 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
 }
 
 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
-                              int first, void *area, int nob,
+                              struct lu_dirpage *dp, int nob,
                               const struct dt_it_ops *iops, struct dt_it *it,
                               const struct dt_it_ops *iops, struct dt_it *it,
-                              __u64 *start, __u64 *end,
-                              struct lu_dirent **last, __u32 attr)
+                              __u32 attr)
 {
 {
+        void                   *area = dp;
         int                     result;
         __u64                   hash = 0;
         struct lu_dirent       *ent;
         int                     result;
         __u64                   hash = 0;
         struct lu_dirent       *ent;
+        struct lu_dirent       *last = NULL;
+        int                     first = 1;
 
 
-        if (first) {
-                memset(area, 0, sizeof (struct lu_dirpage));
-                area += sizeof (struct lu_dirpage);
-                nob  -= sizeof (struct lu_dirpage);
-        }
+        memset(area, 0, sizeof (*dp));
+        area += sizeof (*dp);
+        nob  -= sizeof (*dp);
 
         ent  = area;
         do {
 
         ent  = area;
         do {
@@ -2260,7 +2263,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 hash = iops->store(env, it);
                 if (unlikely(first)) {
                         first = 0;
                 hash = iops->store(env, it);
                 if (unlikely(first)) {
                         first = 0;
-                        *start = hash;
+                        dp->ldp_hash_start = cpu_to_le64(hash);
                 }
 
                 /* calculate max space required for lu_dirent */
                 }
 
                 /* calculate max space required for lu_dirent */
@@ -2277,20 +2280,10 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
                 } else {
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
                 } else {
-                        /*
-                         * record doesn't fit into page, enlarge previous one.
-                         */
-                        if (*last) {
-                                (*last)->lde_reclen =
-                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
-                                                        nob);
-                                result = 0;
-                        } else
-                                result = -EINVAL;
-
+                        result = (last != NULL) ? 0 :-EINVAL;
                         goto out;
                 }
                         goto out;
                 }
-                *last = ent;
+                last = ent;
                 ent = (void *)ent + recsize;
                 nob -= recsize;
 
                 ent = (void *)ent + recsize;
                 nob -= recsize;
 
@@ -2301,7 +2294,12 @@ next:
         } while (result == 0);
 
 out:
         } while (result == 0);
 
 out:
-        *end = hash;
+        dp->ldp_hash_end = cpu_to_le64(hash);
+        if (last != NULL) {
+                if (last->lde_hash == dp->ldp_hash_end)
+                        dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+                last->lde_reclen = 0; /* end mark */
+        }
         return result;
 }
 
         return result;
 }
 
@@ -2312,13 +2310,11 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
         struct dt_object  *next = mdd_object_child(obj);
         const struct dt_it_ops  *iops;
         struct page       *pg;
         struct dt_object  *next = mdd_object_child(obj);
         const struct dt_it_ops  *iops;
         struct page       *pg;
-        struct lu_dirent  *last = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
         int i;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
         int i;
+        int nlupgs = 0;
         int rc;
         int nob;
         int rc;
         int nob;
-        __u64 hash_start;
-        __u64 hash_end = 0;
 
         LASSERT(rdpg->rp_pages != NULL);
         LASSERT(next->do_index_ops != NULL);
 
         LASSERT(rdpg->rp_pages != NULL);
         LASSERT(next->do_index_ops != NULL);
@@ -2336,7 +2332,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
 
         rc = iops->load(env, it, rdpg->rp_hash);
 
 
         rc = iops->load(env, it, rdpg->rp_hash);
 
-        if (rc == 0){
+        if (rc == 0) {
                 /*
                  * Iterator didn't find record with exactly the key requested.
                  *
                 /*
                  * Iterator didn't find record with exactly the key requested.
                  *
@@ -2361,39 +2357,51 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          */
         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
              i++, nob -= CFS_PAGE_SIZE) {
          */
         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
              i++, nob -= CFS_PAGE_SIZE) {
+                struct lu_dirpage *dp;
+
                 LASSERT(i < rdpg->rp_npages);
                 pg = rdpg->rp_pages[i];
                 LASSERT(i < rdpg->rp_npages);
                 pg = rdpg->rp_pages[i];
-                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
-                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
-                                        it, &hash_start, &hash_end, &last,
-                                        rdpg->rp_attrs);
-                if (rc != 0 || i == rdpg->rp_npages - 1) {
-                        if (last)
-                                last->lde_reclen = 0;
+                dp = cfs_kmap(pg);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+repeat:
+#endif
+                rc = mdd_dir_page_build(env, mdd, dp,
+                                        min_t(int, nob, LU_PAGE_SIZE),
+                                        iops, it, rdpg->rp_attrs);
+                if (rc > 0) {
+                        /*
+                         * end of directory.
+                         */
+                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+                        nlupgs++;
+                } else if (rc < 0) {
+                        CWARN("build page failed: %d!\n", rc);
+                } else {
+                        nlupgs++;
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
+                                goto repeat;
+#endif
                 }
                 cfs_kunmap(pg);
         }
                 }
                 cfs_kunmap(pg);
         }
-        if (rc > 0) {
-                /*
-                 * end of directory.
-                 */
-                hash_end = MDS_DIR_END_OFF;
-                rc = 0;
-        }
-        if (rc == 0) {
+        if (rc >= 0) {
                 struct lu_dirpage *dp;
 
                 dp = cfs_kmap(rdpg->rp_pages[0]);
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                 struct lu_dirpage *dp;
 
                 dp = cfs_kmap(rdpg->rp_pages[0]);
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(hash_end);
-                if (i == 0)
+                if (nlupgs == 0) {
                         /*
                         /*
-                         * No pages were processed, mark this.
+                         * No pages were processed, mark this for first page
+                         * and send back.
                          */
                          */
-                        dp->ldp_flags |= LDF_EMPTY;
-
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                        dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
+                        nlupgs = 1;
+                }
                 cfs_kunmap(rdpg->rp_pages[0]);
                 cfs_kunmap(rdpg->rp_pages[0]);
+
+                rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
         }
         iops->put(env, it);
         iops->fini(env, it);
         }
         iops->put(env, it);
         iops->fini(env, it);
@@ -2435,10 +2443,9 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 memset(dp, 0 , sizeof(struct lu_dirpage));
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
                 memset(dp, 0 , sizeof(struct lu_dirpage));
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
-                dp->ldp_flags |= LDF_EMPTY;
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                 cfs_kunmap(pg);
                 cfs_kunmap(pg);
-                GOTO(out_unlock, rc = 0);
+                GOTO(out_unlock, rc = LU_PAGE_SIZE);
         }
 
         rc = __mdd_readpage(env, mdd_obj, rdpg);
         }
 
         rc = __mdd_readpage(env, mdd_obj, rdpg);
index ef2542b..b91322e 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -1207,7 +1210,7 @@ static int mdt_disconnect(struct mdt_thread_info *info)
 }
 
 static int mdt_sendpage(struct mdt_thread_info *info,
 }
 
 static int mdt_sendpage(struct mdt_thread_info *info,
-                        struct lu_rdpg *rdpg)
+                        struct lu_rdpg *rdpg, int nob)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct obd_export       *exp = req->rq_export;
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct obd_export       *exp = req->rq_export;
@@ -1215,7 +1218,6 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
         int                      tmpcount;
         int                      tmpsize;
         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
         int                      tmpcount;
         int                      tmpsize;
-        int                      timeout;
         int                      i;
         int                      rc;
         ENTRY;
         int                      i;
         int                      rc;
         ENTRY;
@@ -1225,63 +1227,16 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         if (desc == NULL)
                 RETURN(-ENOMEM);
 
         if (desc == NULL)
                 RETURN(-ENOMEM);
 
-        for (i = 0, tmpcount = rdpg->rp_count;
-                i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
+        for (i = 0, tmpcount = nob;
+                i < rdpg->rp_npages && tmpcount > 0; i++, tmpcount -= tmpsize) {
                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
         }
 
                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
         }
 
-        LASSERT(desc->bd_nob == rdpg->rp_count);
-        rc = sptlrpc_svc_wrap_bulk(req, desc);
-        if (rc)
-                GOTO(free_desc, rc);
-
-        rc = ptlrpc_start_bulk_transfer(desc);
-        if (rc)
-                GOTO(free_desc, rc);
-
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
-                GOTO(abort_bulk, rc = 0);
-
-        do {
-                timeout = (int) req->rq_deadline - cfs_time_current_sec();
-                if (timeout < 0)
-                        CERROR("Req deadline already passed %lu (now: %lu)\n",
-                               req->rq_deadline, cfs_time_current_sec());
-                *lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(max(timeout, 1)),
-                                            cfs_time_seconds(1), NULL, NULL);
-                rc = l_wait_event(desc->bd_waitq,
-                                  !ptlrpc_server_bulk_active(desc) ||
-                                  exp->exp_failed ||
-                                  exp->exp_abort_active_req, lwi);
-                LASSERT (rc == 0 || rc == -ETIMEDOUT);
-        } while ((rc == -ETIMEDOUT) &&
-                 (req->rq_deadline > cfs_time_current_sec()));
-
-        if (rc == 0) {
-                if (desc->bd_success &&
-                    desc->bd_nob_transferred == rdpg->rp_count)
-                        GOTO(free_desc, rc);
-
-                rc = -ETIMEDOUT;
-                if (exp->exp_abort_active_req || exp->exp_failed)
-                        GOTO(abort_bulk, rc);
-        }
-
-        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
-                  (rc == -ETIMEDOUT) ? "timeout" : "network error",
-                  desc->bd_nob_transferred, rdpg->rp_count,
-                  exp->exp_client_uuid.uuid,
-                  exp->exp_connection->c_remote_uuid.uuid);
-
-        class_fail_export(exp);
-
-        EXIT;
-abort_bulk:
-        ptlrpc_abort_bulk(desc);
-free_desc:
+        LASSERT(desc->bd_nob == nob);
+        rc = target_bulk_io(exp, desc, lwi);
         ptlrpc_free_bulk(desc);
         ptlrpc_free_bulk(desc);
-        return rc;
+        RETURN(rc);
 }
 
 #ifdef HAVE_SPLIT_SUPPORT
 }
 
 #ifdef HAVE_SPLIT_SUPPORT
@@ -1491,8 +1446,10 @@ static int mdt_readpage(struct mdt_thread_info *info)
         rdpg->rp_attrs = reqbody->mode;
         if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
                 rdpg->rp_attrs |= LUDA_64BITHASH;
         rdpg->rp_attrs = reqbody->mode;
         if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
                 rdpg->rp_attrs |= LUDA_64BITHASH;
-        rdpg->rp_count  = reqbody->nlink;
-        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
+        rdpg->rp_count  = min_t(unsigned int, reqbody->nlink,
+                                PTLRPC_MAX_BRW_SIZE);
+        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
+                          CFS_PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
                 RETURN(-ENOMEM);
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
                 RETURN(-ENOMEM);
@@ -1505,11 +1462,11 @@ static int mdt_readpage(struct mdt_thread_info *info)
 
         /* call lower layers to fill allocated pages with directory data */
         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
 
         /* call lower layers to fill allocated pages with directory data */
         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
-        if (rc)
+        if (rc < 0)
                 GOTO(free_rdpg, rc);
 
         /* send pages to client */
                 GOTO(free_rdpg, rc);
 
         /* send pages to client */
-        rc = mdt_sendpage(info, rdpg);
+        rc = mdt_sendpage(info, rdpg, rc);
 
         EXIT;
 free_rdpg:
 
         EXIT;
 free_rdpg:
@@ -4921,6 +4878,24 @@ static int mdt_connect_internal(struct obd_export *exp,
                 if (!mdt->mdt_som_conf)
                         data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
 
                 if (!mdt->mdt_som_conf)
                         data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
 
+                if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+                        data->ocd_brw_size = min(data->ocd_brw_size,
+                               (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
+                        if (data->ocd_brw_size == 0) {
+                                CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
+                                       " ocd_version: %x ocd_grant: %d "
+                                       "ocd_index: %u ocd_brw_size is "
+                                       "unexpectedly zero, network data "
+                                       "corruption? Refusing connection of this"
+                                       " client\n",
+                                       exp->exp_obd->obd_name,
+                                       exp->exp_client_uuid.uuid,
+                                       exp, data->ocd_connect_flags, data->ocd_version,
+                                       data->ocd_grant, data->ocd_index);
+                                return -EPROTO;
+                        }
+                }
+
                 cfs_spin_lock(&exp->exp_lock);
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 cfs_spin_unlock(&exp->exp_lock);
                 cfs_spin_lock(&exp->exp_lock);
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 cfs_spin_unlock(&exp->exp_lock);
index bb870a0..489ff03 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -2379,6 +2382,46 @@ int lprocfs_obd_rd_mntdev(char *page, char **start, off_t off,
 }
 EXPORT_SYMBOL(lprocfs_obd_rd_mntdev);
 
 }
 EXPORT_SYMBOL(lprocfs_obd_rd_mntdev);
 
+int lprocfs_obd_rd_max_pages_per_rpc(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
+{
+        struct obd_device *dev = data; /* \a data carries the obd_device */
+        struct client_obd *cli = &dev->u.cli;
+        int rc;
+        /*
+         * Read side of the "max_pages_per_rpc" tunable: print
+         * cli->cl_max_pages_per_rpc as one decimal line into \a page
+         * (bounded by \a count) while holding cl_loi_list_lock, and return
+         * whatever snprintf() returned.  \a start, \a off and \a eof are
+         * not used by this handler.
+         */
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        rc = snprintf(page, count, "%d\n", cli->cl_max_pages_per_rpc);
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+        return rc;
+}
+EXPORT_SYMBOL(lprocfs_obd_rd_max_pages_per_rpc);
+/**
+ * Write side of the "max_pages_per_rpc" tunable.
+ *
+ * Parses an integer from \a buffer with lprocfs_write_helper() and, if it is
+ * in range, stores it in cli->cl_max_pages_per_rpc under cl_loi_list_lock.
+ *
+ * \param[in] file    not used by this handler
+ * \param[in] buffer  text written to the entry
+ * \param[in] count   length of \a buffer, also the success return value
+ * \param[in] data    the obd_device the entry belongs to
+ *
+ * \retval count    the new value was stored
+ * \retval -ERANGE  value is below 1 or above ocd_brw_size >> CFS_PAGE_SHIFT
+ * \retval rc       non-zero result of lprocfs_write_helper(), passed through
+ *
+ * NOTE(review): LPROCFS_CLIMP_CHECK()/LPROCFS_CLIMP_EXIT() are defined
+ * elsewhere; they appear to bracket the access to the client import, and
+ * the CHECK macro may return on its own - confirm against its definition.
+ */
+int lprocfs_obd_wr_max_pages_per_rpc(struct file *file, const char *buffer,
+                                     unsigned long count, void *data)
+{
+        struct obd_device *dev = data;
+        struct client_obd *cli = &dev->u.cli;
+        /* NOTE(review): cl_import is read here, before LPROCFS_CLIMP_CHECK;
+         * ocd itself is only dereferenced after the check. */
+        struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data;
+        int val, rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+        /*
+         * Range check: '>>' binds tighter than '>', so the upper limit is
+         * ocd_brw_size (from the import's connect data) shifted down by
+         * CFS_PAGE_SHIFT.  Every exit after CHECK is paired with an EXIT.
+         */
+        LPROCFS_CLIMP_CHECK(dev);
+        if (val < 1 || val > ocd->ocd_brw_size >> CFS_PAGE_SHIFT) {
+                LPROCFS_CLIMP_EXIT(dev);
+                return -ERANGE;
+        }
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        cli->cl_max_pages_per_rpc = val;
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        LPROCFS_CLIMP_EXIT(dev);
+        return count; /* report the whole write as consumed */
+}
+EXPORT_SYMBOL(lprocfs_obd_wr_max_pages_per_rpc);
+
 EXPORT_SYMBOL(lprocfs_register);
 EXPORT_SYMBOL(lprocfs_srch);
 EXPORT_SYMBOL(lprocfs_remove);
 EXPORT_SYMBOL(lprocfs_register);
 EXPORT_SYMBOL(lprocfs_srch);
 EXPORT_SYMBOL(lprocfs_remove);
index 237200b..f59ab26 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -79,44 +82,6 @@ static int osc_wr_active(struct file *file, const char *buffer,
         return count;
 }
 
         return count;
 }
 
-static int osc_rd_max_pages_per_rpc(char *page, char **start, off_t off,
-                                    int count, int *eof, void *data)
-{
-        struct obd_device *dev = data;
-        struct client_obd *cli = &dev->u.cli;
-        int rc;
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        rc = snprintf(page, count, "%d\n", cli->cl_max_pages_per_rpc);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        return rc;
-}
-
-static int osc_wr_max_pages_per_rpc(struct file *file, const char *buffer,
-                                    unsigned long count, void *data)
-{
-        struct obd_device *dev = data;
-        struct client_obd *cli = &dev->u.cli;
-        struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data;
-        int val, rc;
-
-        rc = lprocfs_write_helper(buffer, count, &val);
-        if (rc)
-                return rc;
-
-        LPROCFS_CLIMP_CHECK(dev);
-        if (val < 1 || val > ocd->ocd_brw_size >> CFS_PAGE_SHIFT) {
-                LPROCFS_CLIMP_EXIT(dev);
-                return -ERANGE;
-        }
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        cli->cl_max_pages_per_rpc = val;
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        LPROCFS_CLIMP_EXIT(dev);
-        return count;
-}
-
 static int osc_rd_max_rpcs_in_flight(char *page, char **start, off_t off,
                                      int count, int *eof, void *data)
 {
 static int osc_rd_max_rpcs_in_flight(char *page, char **start, off_t off,
                                      int count, int *eof, void *data)
 {
@@ -618,8 +583,8 @@ static struct lprocfs_vars lprocfs_osc_obd_vars[] = {
         { "ost_conn_uuid",   lprocfs_rd_conn_uuid, 0, 0 },
         { "active",          osc_rd_active,
                              osc_wr_active, 0 },
         { "ost_conn_uuid",   lprocfs_rd_conn_uuid, 0, 0 },
         { "active",          osc_rd_active,
                              osc_wr_active, 0 },
-        { "max_pages_per_rpc", osc_rd_max_pages_per_rpc,
-                               osc_wr_max_pages_per_rpc, 0 },
+        { "max_pages_per_rpc", lprocfs_obd_rd_max_pages_per_rpc,
+                               lprocfs_obd_wr_max_pages_per_rpc, 0 },
         { "max_rpcs_in_flight", osc_rd_max_rpcs_in_flight,
                                 osc_wr_max_rpcs_in_flight, 0 },
         { "destroys_in_flight", osc_rd_destroys_in_flight, 0, 0 },
         { "max_rpcs_in_flight", osc_rd_max_rpcs_in_flight,
                                 osc_wr_max_rpcs_in_flight, 0 },
         { "destroys_in_flight", osc_rd_destroys_in_flight, 0, 0 },
index f598fce..305ddd3 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -193,13 +196,6 @@ static inline int osc_recoverable_error(int rc)
         return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
 }
 
         return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
 }
 
-/* return 1 if osc should be resend request */
-static inline int osc_should_resend(int resend, struct client_obd *cli)
-{
-        return cfs_atomic_read(&cli->cl_resends) ?
-               cfs_atomic_read(&cli->cl_resends) > resend : 1;
-}
-
 #ifndef min_t
 #define min_t(type,x,y) \
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
 #ifndef min_t
 #define min_t(type,x,y) \
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
index 8530428..7f57030 100644 (file)
@@ -1709,7 +1709,7 @@ restart_bulk:
         ptlrpc_req_finished(req);
         if (osc_recoverable_error(rc)) {
                 resends++;
         ptlrpc_req_finished(req);
         if (osc_recoverable_error(rc)) {
                 resends++;
-                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
                         CERROR("too many resend retries, returning error\n");
                         RETURN(-EIO);
                 }
                         CERROR("too many resend retries, returning error\n");
                         RETURN(-EIO);
                 }
@@ -1733,7 +1733,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
-        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+        if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
                 CERROR("too many resent retries, returning error\n");
                 RETURN(-EIO);
         }
                 CERROR("too many resent retries, returning error\n");
                 RETURN(-EIO);
         }
index 1eeaabf..565b0b8 100644 (file)
@@ -513,15 +513,6 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-static int ost_bulk_timeout(void *data)
-{
-        ENTRY;
-        /* We don't fail the connection here, because having the export
-         * killed makes the (vital) call to commitrw very sad.
-         */
-        RETURN(1);
-}
-
 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
                                cksum_type_t cksum_type)
 {
 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
                                cksum_type_t cksum_type)
 {
@@ -909,76 +900,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         /* Check if client was evicted while we were doing i/o before touching
            network */
         if (rc == 0) {
         /* Check if client was evicted while we were doing i/o before touching
            network */
         if (rc == 0) {
-                /* Check if there is eviction in progress, and if so, wait for
-                 * it to finish */
-                if (unlikely(cfs_atomic_read(&exp->exp_obd->
-                                             obd_evict_inprogress))) {
-                        lwi = LWI_INTR(NULL, NULL);
-                        rc = l_wait_event(exp->exp_obd->
-                                          obd_evict_inprogress_waitq,
-                                          !cfs_atomic_read(&exp->exp_obd->
-                                          obd_evict_inprogress),
-                                          &lwi);
-                }
-                /* Check if client was evicted or tried to reconnect already */
-                if (exp->exp_failed || exp->exp_abort_active_req)
-                        rc = -ENOTCONN;
-                else {
-                        rc = sptlrpc_svc_wrap_bulk(req, desc);
-                        if (rc == 0)
-                                rc = ptlrpc_start_bulk_transfer(desc);
-                }
-
-                if (rc == 0) {
-                        time_t start = cfs_time_current_sec();
-                        do {
-                                long timeoutl = req->rq_deadline -
-                                        cfs_time_current_sec();
-                                cfs_duration_t timeout = timeoutl <= 0 ?
-                                        CFS_TICK : cfs_time_seconds(timeoutl);
-                                lwi = LWI_TIMEOUT_INTERVAL(timeout,
-                                                           cfs_time_seconds(1),
-                                                           ost_bulk_timeout,
-                                                           desc);
-                                rc = l_wait_event(desc->bd_waitq,
-                                                  !ptlrpc_server_bulk_active(desc) ||
-                                                  exp->exp_failed ||
-                                                  exp->exp_abort_active_req,
-                                                  &lwi);
-                                LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                                /* Wait again if we changed deadline */
-                        } while ((rc == -ETIMEDOUT) &&
-                                 (req->rq_deadline > cfs_time_current_sec()));
-
-                        if (rc == -ETIMEDOUT) {
-                                DEBUG_REQ(D_ERROR, req,
-                                          "timeout on bulk PUT after %ld%+lds",
-                                          req->rq_deadline - start,
-                                          cfs_time_current_sec() -
-                                          req->rq_deadline);
-                                ptlrpc_abort_bulk(desc);
-                        } else if (exp->exp_failed) {
-                                DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
-                                rc = -ENOTCONN;
-                                ptlrpc_abort_bulk(desc);
-                        } else if (exp->exp_abort_active_req) {
-                                DEBUG_REQ(D_ERROR, req, "Reconnect on bulk PUT");
-                                /* we don't reply anyway */
-                                rc = -ETIMEDOUT;
-                                ptlrpc_abort_bulk(desc);
-                        } else if (!desc->bd_success ||
-                                   desc->bd_nob_transferred != desc->bd_nob) {
-                                DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
-                                          desc->bd_success ?
-                                          "truncated" : "network error on",
-                                          desc->bd_nob_transferred,
-                                          desc->bd_nob);
-                                /* XXX should this be a different errno? */
-                                rc = -ETIMEDOUT;
-                        }
-                } else {
-                        DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
-                }
+                rc = target_bulk_io(exp, desc, &lwi);
                 no_reply = rc != 0;
         }
 
                 no_reply = rc != 0;
         }
 
@@ -1057,17 +979,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         /* pause before transaction has been started */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
 
         /* pause before transaction has been started */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
 
-        /* Check if there is eviction in progress, and if so, wait for it to
-         * finish */
-        if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
-                lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
-                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
-                        !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
-                        &lwi);
-        }
-        if (exp->exp_failed)
-                GOTO(out, rc = -ENOTCONN);
-
         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
          * ost_rw_hpreq_check(). */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
          * ost_rw_hpreq_check(). */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
@@ -1189,58 +1100,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (rc != 0)
                 GOTO(out_lock, rc);
 
         if (rc != 0)
                 GOTO(out_lock, rc);
 
-        /* Check if client was evicted or tried to reconnect while we
-         * were doing i/o before touching network */
-        if (desc->bd_export->exp_failed ||
-            desc->bd_export->exp_abort_active_req)
-                rc = -ENOTCONN;
-        else
-                rc = ptlrpc_start_bulk_transfer(desc);
-        if (rc == 0) {
-                time_t start = cfs_time_current_sec();
-                do {
-                        long timeoutl = req->rq_deadline -
-                                cfs_time_current_sec();
-                        cfs_duration_t timeout = timeoutl <= 0 ?
-                                CFS_TICK : cfs_time_seconds(timeoutl);
-                        lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
-                                                   ost_bulk_timeout, desc);
-                        rc = l_wait_event(desc->bd_waitq,
-                                          !ptlrpc_server_bulk_active(desc) ||
-                                          desc->bd_export->exp_failed ||
-                                          desc->bd_export->exp_abort_active_req,
-                                          &lwi);
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                        /* Wait again if we changed deadline */
-                } while ((rc == -ETIMEDOUT) &&
-                         (req->rq_deadline > cfs_time_current_sec()));
-
-                if (rc == -ETIMEDOUT) {
-                        DEBUG_REQ(D_ERROR, req,
-                                  "timeout on bulk GET after %ld%+lds",
-                                  req->rq_deadline - start,
-                                  cfs_time_current_sec() -
-                                  req->rq_deadline);
-                        ptlrpc_abort_bulk(desc);
-                } else if (desc->bd_export->exp_failed) {
-                        DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
-                        rc = -ENOTCONN;
-                        ptlrpc_abort_bulk(desc);
-                } else if (desc->bd_export->exp_abort_active_req) {
-                        DEBUG_REQ(D_ERROR, req, "Reconnect on bulk GET");
-                        /* we don't reply anyway */
-                        rc = -ETIMEDOUT;
-                        ptlrpc_abort_bulk(desc);
-                } else if (!desc->bd_success) {
-                        DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
-                        /* XXX should this be a different errno? */
-                        rc = -ETIMEDOUT;
-                } else {
-                        rc = sptlrpc_svc_unwrap_bulk(req, desc);
-                }
-        } else {
-                DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
-        }
+        rc = target_bulk_io(exp, desc, &lwi);
         no_reply = rc != 0;
 
 skip_transfer:
         no_reply = rc != 0;
 
 skip_transfer:
index 1464917..80f169b 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -1071,10 +1074,12 @@ finish:
                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
                 }
 
                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
                 }
 
-                if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+                if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
                         cli->cl_max_pages_per_rpc =
                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
                         cli->cl_max_pages_per_rpc =
                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
-                }
+                else if (imp->imp_connect_op == MDS_CONNECT ||
+                         imp->imp_connect_op == MGS_CONNECT)
+                        cli->cl_max_pages_per_rpc = 1;
 
                 /* Reset ns_connect_flags only for initial connect. It might be
                  * changed in while using FS and if we reset it in reconnect
 
                 /* Reset ns_connect_flags only for initial connect. It might be
                  * changed in while using FS and if we reset it in reconnect
index 9d7e7b6..a46297c 100644 (file)
@@ -784,6 +784,10 @@ test_24u() { # bug12192
 }
 run_test 24u "create stripe file"
 
 }
 run_test 24u "create stripe file"
 
+page_size() {
+       getconf PAGE_SIZE # print the system's PAGE_SIZE as reported by getconf
+}
+
 test_24v() {
        local NRFILES=100000
        local FREE_INODES=`lfs df -i|grep "filesystem summary" | awk '{print $5}'`
 test_24v() {
        local NRFILES=100000
        local FREE_INODES=`lfs df -i|grep "filesystem summary" | awk '{print $5}'`
@@ -793,8 +797,26 @@ test_24v() {
 
        mkdir -p $DIR/d24v
        createmany -m $DIR/d24v/$tfile $NRFILES
 
        mkdir -p $DIR/d24v
        createmany -m $DIR/d24v/$tfile $NRFILES
+
+       cancel_lru_locks mdc
+       lctl set_param mdc.*.stats clear
+
        ls $DIR/d24v >/dev/null || error "error in listing large dir"
 
        ls $DIR/d24v >/dev/null || error "error in listing large dir"
 
+       # LU-5 large readdir
+       # DIRENT_SIZE = 32 bytes for sizeof(struct lu_dirent) +
+       #               8 bytes for name(filename is mostly 5 in this test) +
+       #               8 bytes for luda_type
+       # To take into account the overhead of the lu_dirpage header and the
+       # end mark in each page, add one in the RPC_NUM calculation.
+       DIRENT_SIZE=48
+       RPC_SIZE=$(($(lctl get_param -n mdc.*.max_pages_per_rpc)*$(page_size)))
+       RPC_NUM=$(((NRFILES * DIRENT_SIZE + RPC_SIZE - 1) / RPC_SIZE + 1))
+       mds_readpage=`lctl get_param mdc.*.stats | \
+                               awk '/^mds_readpage/ {print $2}'`
+       [ $mds_readpage -gt $RPC_NUM ] && \
+               error "large readdir doesn't take effect"
+
        rm $DIR/d24v -rf
 }
 run_test 24v "list directory with large files (handle hash collision, bug: 17560)"
        rm $DIR/d24v -rf
 }
 run_test 24v "list directory with large files (handle hash collision, bug: 17560)"
@@ -2614,10 +2636,6 @@ test_42d() {
 }
 run_test 42d "test complete truncate of file with cached dirty data"
 
 }
 run_test 42d "test complete truncate of file with cached dirty data"
 
-page_size() {
-       getconf PAGE_SIZE
-}
-
 test_42e() { # bug22074
        local TDIR=$DIR/${tdir}e
        local pagesz=$(page_size)
 test_42e() { # bug22074
        local TDIR=$DIR/${tdir}e
        local pagesz=$(page_size)
index e3e6b6c..8bbd30b 100644 (file)
@@ -1460,6 +1460,10 @@ main(int argc, char **argv)
         CHECK_VALUE(MGS_TARGET_DEL);
         CHECK_VALUE(MGS_SET_INFO);
 
         CHECK_VALUE(MGS_TARGET_DEL);
         CHECK_VALUE(MGS_SET_INFO);
 
+        CHECK_VALUE(LDF_EMPTY);
+        CHECK_VALUE(LDF_COLLIDE);
+        CHECK_VALUE(LU_PAGE_SIZE);
+
         COMMENT("Sizes and Offsets");
         BLANK_LINE();
         CHECK_STRUCT(obd_uuid);
         COMMENT("Sizes and Offsets");
         BLANK_LINE();
         CHECK_STRUCT(obd_uuid);