Whamcloud - gitweb
b=6514
authorjacob <jacob>
Thu, 7 Jul 2005 00:17:55 +0000 (00:17 +0000)
committerjacob <jacob>
Thu, 7 Jul 2005 00:17:55 +0000 (00:17 +0000)
r=adilger,green
originally by nikita

Severity   : major
Frequency  : rare (only unsupported configurations with a node running as an
             OST and a client)
Bugzilla   : 6514, 5137
Description: Mounting a Lustre file system on a node running as an OST could
             lead to deadlocks
Details    : OSTs now allocate memory needed to write out data at
             startup, instead of when needed, to avoid having to
             allocate memory in possibly low memory situations.
             Specifically, if the file system is mounted on an OST,
             memory pressure could force it to try to write out data,
             which it needed to allocate memory to do.  Due to the low
             memory, it would be unable to do so and the node would
             become unresponsive.

18 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_support.h
lustre/liblustre/file.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/namei.c
lustre/llite/special.c
lustre/mds/mds_open.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_24.c
lustre/obdfilter/filter_io_26.c
lustre/ost/ost_handler.c
lustre/ost/ost_internal.h
lustre/ptlrpc/service.c

index bef3abf..1fe6a58 100644 (file)
@@ -84,7 +84,21 @@ Details    : The b_committed_data struct is protected by the big kernel lock
             access to this struct.  In 2.6 kernels there is finer grained
             locking to improve SMP performance of the JBD layer.
 
-       
+Severity   : major
+Frequency  : rare (only unsupported configurations with a node running as an 
+             OST and a client)
+Bugzilla   : 6514, 5137
+Description: Mounting a Lustre file system on a node running as an OST could
+             lead to deadlocks
+Details    : OSTs now allocate memory needed to write out data at
+             startup, instead of when needed, to avoid having to
+             allocate memory in possibly low memory situations.
+             Specifically, if the file system is mounted on an OST,
+             memory pressure could force it to try to write out data,
+             which it needed to allocate memory to do.  Due to the low
+             memory, it would be unable to do so and the node would
+             become unresponsive.
+
 ------------------------------------------------------------------------------
 
 2005-06-20  Cluster File Systems, Inc. <info@clusterfs.com>
index 9fc120b..3e44c52 100644 (file)
@@ -279,6 +279,8 @@ struct ptlrpc_reply_state {
         struct lustre_msg     rs_msg;
 };
 
+struct ptlrpc_thread;
+
 enum rq_phase {
         RQ_PHASE_NEW         = 0xebc0de00,
         RQ_PHASE_RPC         = 0xebc0de01,
@@ -302,6 +304,8 @@ struct ptlrpc_request {
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         atomic_t rq_refcount;   /* client-side refcount for SENT race */
 
+        struct ptlrpc_thread *rq_svc_thread; /* initial thread servicing req */
+
         int rq_request_portal;  /* XXX FIXME bug 249 */
         int rq_reply_portal;    /* XXX FIXME bug 249 */
 
@@ -464,9 +468,14 @@ struct ptlrpc_bulk_desc {
 };
 
 struct ptlrpc_thread {
-        struct list_head t_link;
+
+        struct list_head t_link; /* active threads for service, from svc->srv_threads */
 
         __u32 t_flags;
+
+        void *t_data; /* thread-private data (preallocated memory) */
+
+        unsigned int t_id; /* service thread index, from ptlrpc_start_n_threads */
         wait_queue_head_t t_ctl_waitq;
 };
 
@@ -548,6 +557,17 @@ struct ptlrpc_service {
         struct proc_dir_entry   *srv_procroot;
         struct lprocfs_stats    *srv_stats;
         
+        /*
+         * if non-NULL called during thread creation (ptlrpc_start_thread())
+         * to initialize service specific per-thread state.
+         */
+        int (*srv_init)(struct ptlrpc_thread *thread);
+        /*
+         * if non-NULL called during thread shutdown (ptlrpc_main()) to
+         * destruct state created by ->srv_init().
+         */
+        void (*srv_done)(struct ptlrpc_thread *thread);
+
         struct ptlrpc_srv_ni srv_interfaces[0];
 };
 
@@ -698,10 +718,11 @@ struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                                        struct proc_dir_entry *proc_entry,
                                        svcreq_printfn_t);
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
+
 int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
                            int cnt, char *base_name);
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
-                        char *name);
+                        char *name, int id);
 int ptlrpc_unregister_service(struct ptlrpc_service *service);
 int liblustre_check_services (void *arg);
 void ptlrpc_daemonize(void);
index 033f9c5..51f1910 100644 (file)
@@ -211,6 +211,24 @@ struct filter_obd {
         int fo_r_in_flight; /* protected by fo_stats_lock */
         int fo_w_in_flight; /* protected by fo_stats_lock */
 
+        /*
+         * per-filter pool of kiobuf's allocated by filter_common_setup() and
+         * torn down by filter_cleanup(). Contains OST_NUM_THREADS elements of
+         * which ->fo_iobuf_count were allocated.
+         *
+         * This pool contains kiobuf used by
+         * filter_{prep,commit}rw_{read,write}() and is shared by all OST
+         * threads.
+         *
+         * Locking: none, each OST thread uses only one element, determined by
+         * its "ordinal number", ->t_id.
+         *
+         * This is (void *) array, because 2.4 and 2.6 use different iobuf
+         * structures.
+         */
+        void                   **fo_iobuf_pool;
+        int                      fo_iobuf_count;
+
         struct obd_histogram     fo_r_pages;
         struct obd_histogram     fo_w_pages;
         struct obd_histogram     fo_read_rpc_hist;
@@ -448,6 +466,9 @@ struct obd_trans_info {
         struct llog_cookie       oti_onecookie;
         struct llog_cookie      *oti_logcookies;
         int                      oti_numcookies;
+
+        /* initial thread handling transaction */
+        struct ptlrpc_thread    *oti_thread; 
 };
 
 static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
index 757dba2..86c87bd 100644 (file)
@@ -326,6 +326,8 @@ do {                                                                          \
 
 #define OBD_ALLOC(ptr, size) OBD_ALLOC_GFP(ptr, size, OBD_GFP_MASK)
 #define OBD_ALLOC_WAIT(ptr, size) OBD_ALLOC_GFP(ptr, size, GFP_KERNEL)
+#define OBD_ALLOC_PTR(ptr) OBD_ALLOC(ptr, sizeof *(ptr))
+#define OBD_ALLOC_PTR_WAIT(ptr) OBD_ALLOC_WAIT(ptr, sizeof *(ptr))
 
 #ifdef __arch_um__
 # define OBD_VMALLOC(ptr, size) OBD_ALLOC(ptr, size)
@@ -414,6 +416,8 @@ do {                                                                          \
         }                                                                     \
 } while (0)
 
+#define OBD_FREE_PTR(ptr) OBD_FREE(ptr, sizeof *(ptr))
+
 #define OBD_SLAB_FREE(ptr, slab, size)                                        \
 do {                                                                          \
         LASSERT(ptr);                                                         \
index 71a3548..31627d3 100644 (file)
@@ -214,6 +214,7 @@ int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
         int rc;
         ENTRY;
 
+        oti.oti_thread = request->rq_svc_thread;
         /* req is swabbed so this is safe */
         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
 
index 46f8d9b..c8b4e7c 100644 (file)
 #endif
 #include "llite_internal.h"
 
+/*
+ * ll_file_data_get() allocates a struct ll_file_data from ll_file_data_slab
+ * and hands the pointer back unchecked; its callers test the result against
+ * NULL themselves.
+ * It is also used by llite/special.c:ll_special_open().
+ *
+ * ll_file_data_put() below releases such an object back to
+ * ll_file_data_slab and silently ignores a NULL @fd.
+ */
+struct ll_file_data *ll_file_data_get(void)
+{
+        struct ll_file_data *fd;
+
+        OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
+        return fd;
+}
+
+static void ll_file_data_put(struct ll_file_data *fd)
+{
+        if (fd != NULL)
+                OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
+}
+
 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
                         struct file *file)
 {
@@ -81,7 +96,7 @@ int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
         ptlrpc_req_finished(req);
         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
         LUSTRE_FPRIVATE(file) = NULL;
-        OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
+        ll_file_data_put(fd);
 
         RETURN(rc);
 }
@@ -145,11 +160,11 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         RETURN(rc);
 }
 
-int ll_local_open(struct file *file, struct lookup_intent *it)
+int ll_local_open(struct file *file, struct lookup_intent *it,
+                  struct ll_file_data *fd)
 {
         struct ptlrpc_request *req = it->d.lustre.it_data;
         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
-        struct ll_file_data *fd;
         struct mds_body *body;
         ENTRY;
 
@@ -159,9 +174,6 @@ int ll_local_open(struct file *file, struct lookup_intent *it)
 
         LASSERT(!LUSTRE_FPRIVATE(file));
 
-        OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
-        /* We can't handle this well without reorganizing ll_file_open and
-         * ll_mdc_close, so don't even try right now. */
         LASSERT(fd != NULL);
 
         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
@@ -198,6 +210,7 @@ int ll_file_open(struct inode *inode, struct file *file)
                                           .it_flags = file->f_flags };
         struct lov_stripe_md *lsm;
         struct ptlrpc_request *req;
+        struct ll_file_data *fd;
         int rc = 0;
         ENTRY;
 
@@ -210,21 +223,29 @@ int ll_file_open(struct inode *inode, struct file *file)
 
         it = file->f_it;
 
+        fd = ll_file_data_get();
+        if (fd == NULL)
+                RETURN(-ENOMEM);
+
         if (!it || !it->d.lustre.it_disposition) {
                 it = &oit;
                 rc = ll_intent_file_open(file, NULL, 0, it);
-                if (rc)
+                if (rc) {
+                        ll_file_data_put(fd);
                         GOTO(out, rc);
         }
+        }
 
         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
         rc = it_open_error(DISP_OPEN_OPEN, it);
         /* mdc_intent_lock() didn't get a request ref if there was an open
          * error, so don't do cleanup on the request here (bug 3430) */
-        if (rc)
+        if (rc) {
+                ll_file_data_put(fd);
                 RETURN(rc);
+        }
 
-        rc = ll_local_open(file, it);
+        rc = ll_local_open(file, it, fd);
         LASSERTF(rc == 0, "rc = %d\n", rc);
 
         if (!S_ISREG(inode->i_mode))
@@ -974,11 +995,12 @@ static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                                     int lum_size)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct file *f;
+        struct file *f = NULL;
         struct obd_export *exp = ll_i2obdexp(inode);
         struct lov_stripe_md *lsm;
         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
         struct ptlrpc_request *req = NULL;
+        struct ll_file_data *fd;
         int rc = 0;
         struct lustre_md md;
         ENTRY;
@@ -992,6 +1014,10 @@ static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                 RETURN(-EEXIST);
         }
 
+        fd = ll_file_data_get();
+        if (fd == NULL)
+                GOTO(out, -ENOMEM);
+
         f = get_empty_filp();
         if (!f)
                 GOTO(out, -ENOMEM);
@@ -1015,9 +1041,10 @@ static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                 GOTO(out, rc);
         ll_update_inode(f->f_dentry->d_inode, md.body, md.lsm);
 
-        rc = ll_local_open(f, &oit);
+        rc = ll_local_open(f, &oit, fd);
         if (rc)
                 GOTO(out, rc);
+        fd = NULL;
         ll_intent_release(&oit);
 
         rc = ll_file_release(f->f_dentry->d_inode, f);
@@ -1025,6 +1052,7 @@ static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
  out:
         if (f)
                 put_filp(f);
+        ll_file_data_put(fd);
         up(&lli->lli_open_sem);
         if (req != NULL)
                 ptlrpc_req_finished(req);
index 6af329b..626bf61 100644 (file)
@@ -312,13 +312,15 @@ int ll_file_open(struct inode *inode, struct file *file);
 int ll_file_release(struct inode *inode, struct file *file);
 int ll_lsm_getattr(struct obd_export *, struct lov_stripe_md *, struct obdo *);
 int ll_glimpse_size(struct inode *inode);
-int ll_local_open(struct file *file, struct lookup_intent *it);
+int ll_local_open(struct file *file,
+                  struct lookup_intent *it, struct ll_file_data *fd);
 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
                  struct file *file);
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 int ll_getattr(struct vfsmount *mnt, struct dentry *de,
                struct lookup_intent *it, struct kstat *stat);
 #endif
+struct ll_file_data *ll_file_data_get(void);
 
 /* llite/dcache.c */
 void ll_intent_drop_lock(struct lookup_intent *);
index 8908332..d9571c7 100644 (file)
@@ -722,6 +722,8 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
         int rc;
         ENTRY;
 
+        oti.oti_thread = request->rq_svc_thread;
+
         /* req is swabbed so this is safe */
         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
 
index 895f7c9..1e9355a 100644 (file)
@@ -274,9 +274,14 @@ static int ll_special_open(struct inode *inode, struct file *filp)
         struct file_operations *sfops = filp->f_op;
         struct ptlrpc_request *req;
         struct lookup_intent *it;
+        struct ll_file_data *fd;
         int rc = -EINVAL, err;
         ENTRY;
 
+        fd = ll_file_data_get();
+        if (fd == NULL)
+                RETURN(-ENOMEM);
+
         if (pfop && *pfop) {
                 /* FIXME fops_get */
                 if ((*pfop)->open) {
@@ -291,7 +296,7 @@ static int ll_special_open(struct inode *inode, struct file *filp)
 
         it = filp->f_it;
 
-        err = ll_local_open(filp, it);
+        err = ll_local_open(filp, it, fd);
         if (rc != 0) {
                 CERROR("error opening special file: rc %d\n", rc);
                 ll_mdc_close(ll_i2sbi(inode)->ll_mdc_exp, inode, filp);
index f1cd617..8ab6b71 100644 (file)
@@ -332,6 +332,7 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         if (*ids == NULL)
                 RETURN(-ENOMEM);
         oti.oti_objid = *ids;
+        oti.oti_thread = req->rq_svc_thread;
 
         /* replay case */
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
@@ -1296,6 +1297,7 @@ int mds_close(struct ptlrpc_request *req)
         if (rc) {
                 CERROR("lustre_pack_reply: rc = %d\n", rc);
                 req->rq_status = rc;
+                /* Continue on to drop local open count even if we can't send the reply */
         } else {
                 MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
         }
index b4a63d6..8e8faf0 100644 (file)
@@ -1195,6 +1195,104 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         RETURN(ELDLM_LOCK_ABORTED);
 }
 
+/*
+ * per-obd_device iobuf pool.
+ *
+ * To avoid memory deadlocks in low-memory setups, amount of dynamic
+ * allocations in write-path has to be minimized (see bug 5137).
+ *
+ * Pages, niobuf_local's and niobuf_remote's are pre-allocated and attached to
+ * OST threads (see ost_thread_{init,done}()).
+ *
+ * "iobuf's" used by filter cannot be attached to OST thread, however, because
+ * at the OST layer there are only (potentially) multiple obd_device of type
+ * unknown at the time of OST thread creation.
+ *
+ * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool
+ * field). This array has size OST_NUM_THREADS, so that each OST thread uses
+ * its very own iobuf.
+ *
+ * Functions below
+ *
+ *     filter_iobuf_pool_init()
+ *
+ *     filter_iobuf_pool_done()
+ *
+ *     filter_iobuf_get()
+ *
+ * operate on this array. They are "generic" in a sense that they don't depend
+ * on actual type of iobuf's (the latter depending on Linux kernel version).
+ */
+
+/*
+ * destroy pool created by filter_iobuf_pool_init
+ *
+ * Every non-NULL slot of ->fo_iobuf_pool is released with
+ * filter_free_iobuf(), then the OST_NUM_THREADS-element slot array itself is
+ * freed and ->fo_iobuf_pool is reset to NULL.  When ->fo_iobuf_pool is
+ * already NULL nothing is done, so a repeated call is harmless.
+ *
+ * NOTE(review): ->fo_iobuf_count is left untouched here.
+ */
+static void filter_iobuf_pool_done(struct filter_obd *filter)
+{
+        void **pool;
+        int i;
+
+        ENTRY;
+
+        pool = filter->fo_iobuf_pool;
+        if (pool != NULL) {
+                for (i = 0; i < OST_NUM_THREADS; ++ i) {
+                        if (pool[i] != NULL)
+                                filter_free_iobuf(pool[i]);
+                }
+                OBD_FREE(pool, OST_NUM_THREADS * sizeof pool[0]);
+                filter->fo_iobuf_pool = NULL;
+        }
+        EXIT;
+}
+
+/*
+ * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
+ *
+ * An array of OST_NUM_THREADS slots is allocated and stored in
+ * filter->fo_iobuf_pool; the first @count slots are then filled through
+ * filter_alloc_iobuf() and @count is recorded in filter->fo_iobuf_count.
+ * @count must not exceed OST_NUM_THREADS (asserted).
+ *
+ * Returns 0 on success (including @count == 0), -ENOMEM when the slot array
+ * cannot be allocated, or the error reported by filter_alloc_iobuf().  In the
+ * last case the partially built pool is torn down with
+ * filter_iobuf_pool_done() before returning.
+ *
+ * NOTE(review): the teardown inspects all OST_NUM_THREADS slots, so this
+ * assumes OBD_ALLOC_GFP() hands back zero-filled memory -- confirm.
+ */
+static int filter_iobuf_pool_init(struct filter_obd *filter, int count)
+{
+        void **pool;
+        int i;
+        /* stays 0 when count == 0, where the loop below assigns nothing */
+        int result = 0;
+
+        ENTRY;
+
+        LASSERT(count <= OST_NUM_THREADS);
+
+        OBD_ALLOC_GFP(pool, OST_NUM_THREADS * sizeof pool[0], GFP_KERNEL);
+        if (pool == NULL)
+                RETURN(-ENOMEM);
+
+        filter->fo_iobuf_pool = pool;
+        filter->fo_iobuf_count = count;
+        for (i = 0; i < count; ++ i) {
+                /*
+                 * allocate kiobuf to be used by i-th OST thread.
+                 */
+                result = filter_alloc_iobuf(filter, OBD_BRW_WRITE,
+                                            PTLRPC_MAX_BRW_PAGES,
+                                            &pool[i]);
+                if (result != 0) {
+                        /* releases the slots filled so far and the array */
+                        filter_iobuf_pool_done(filter);
+                        break;
+                }
+        }
+        RETURN(result);
+}
+
+/*
+ * return iobuf preallocated by filter_iobuf_pool_init() for @thread.
+ *
+ * The slot is picked by thread->t_id.  Both the bound
+ * (t_id < ->fo_iobuf_count) and the slot being non-NULL are enforced with
+ * LASSERT instead of being reported to the caller, so this never returns
+ * NULL.
+ */
+void *filter_iobuf_get(struct ptlrpc_thread *thread, struct filter_obd *filter)
+{
+        void *kio;
+
+        LASSERT(thread->t_id < filter->fo_iobuf_count);
+        kio = filter->fo_iobuf_pool[thread->t_id];
+        LASSERT(kio != NULL);
+        return kio;
+}
+
 /* mount the file system (secretly) */
 int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                         void *option)
@@ -1204,7 +1302,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         struct vfsmount *mnt;
         char *str;
         char ns_name[48];
-        int rc = 0;
+        int rc;
         ENTRY;
 
         if (lcfg->lcfg_bufcount < 3 ||
@@ -1216,6 +1314,10 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         if (IS_ERR(obd->obd_fsops))
                 RETURN(PTR_ERR(obd->obd_fsops));
 
+        rc = filter_iobuf_pool_init(filter, OST_NUM_THREADS);
+        if (rc != 0)
+                GOTO(err_ops, rc);
+
         mnt = do_kern_mount(lustre_cfg_string(lcfg, 2),MS_NOATIME|MS_NODIRATIME,
                             lustre_cfg_string(lcfg, 1), option);
         rc = PTR_ERR(mnt);
@@ -1332,6 +1434,7 @@ err_mntput:
         lock_kernel();
 err_ops:
         fsfilt_put_ops(obd->obd_fsops);
+        filter_iobuf_pool_done(filter);
         return rc;
 }
 
@@ -1503,6 +1606,8 @@ static int filter_cleanup(struct obd_device *obd)
 
         fsfilt_put_ops(obd->obd_fsops);
 
+        filter_iobuf_pool_done(filter);
+
         LCONSOLE_INFO("OST %s has stopped.\n", obd->obd_name);
 
         RETURN(0);
index 2fe4d3d..3ab099c 100644 (file)
@@ -131,8 +131,6 @@ int filter_brw(int cmd, struct obd_export *, struct obdo *,
               struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
               struct obd_trans_info *);
 void flip_into_page_cache(struct inode *inode, struct page *new_page);
-void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
-                           int niocount, struct niobuf_local *res);
 
 /* filter_io_*.c */
 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
@@ -148,6 +146,8 @@ int filter_alloc_iobuf(struct filter_obd *, int rw, int num_pages, void **ret);
 void filter_free_iobuf(void *iobuf);
 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
                           struct inode *inode, struct page *page);
+void *filter_iobuf_get(struct ptlrpc_thread *thread, struct filter_obd *filter);
+void filter_iobuf_put(void *iobuf);
 int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
                      struct obd_export *exp, struct iattr *attr,
                      struct obd_trans_info *oti, void **wait_handle);
index 4970169..8d2863d 100644 (file)
@@ -42,12 +42,9 @@ static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
 {
         struct page *page;
 
-        page = alloc_pages(GFP_HIGHUSER, 0);
-        if (page == NULL) {
-                CERROR("no memory for a temp page\n");
-                lnb->rc = -ENOMEM;
-                RETURN(-ENOMEM);
-        }
+        LASSERT(lnb->page != NULL);
+
+        page = lnb->page;
 #if 0
         POISON_PAGE(page, 0xf1);
         if (lnb->len != PAGE_SIZE) {
@@ -56,24 +53,19 @@ static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
         }
 #endif
         page->index = lnb->offset >> PAGE_SHIFT;
-        lnb->page = page;
 
         RETURN(0);
 }
 
-void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
+static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
                            int niocount, struct niobuf_local *res)
 {
         int i, j;
 
         for (i = 0; i < objcount; i++, obj++) {
-                for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
-                        if (res->page != NULL) {
-                                __free_page(res->page);
+                for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
                                 res->page = NULL;
                         }
-                }
-        }
 }
 
 /* Grab the dirty and seen grant announcements from the incoming obdo.
@@ -295,14 +287,9 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 spin_unlock(&obd->obd_osfs_lock);
         }
 
-        memset(res, 0, niocount * sizeof(*res));
-
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
-        rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_READ, obj->ioo_bufcnt,
-                                &iobuf);
-        if (rc)
-                GOTO(cleanup, rc);
+        iobuf = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
 
         dentry = filter_oa2dentry(obd, oa);
         if (IS_ERR(dentry)) {
@@ -325,21 +312,19 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
+                /*
+                 * ost_brw_write()->ost_nio_pages_get() already initialized
+                 * lnb->page to point to the page from the per-thread page
+                 * pool (bug 5137), initialize page.
+                 */
+                LASSERT(lnb->page != NULL);
+
                 if (inode->i_size <= rnb->offset)
-                      /* If there's no more data, abort early.
-                       * lnb->page == NULL and lnb->rc == 0, so it's
-                       * easy to detect later. */
+                        /* If there's no more data, abort early.  lnb->rc == 0,
+                         * so it's easy to detect later. */
                         break;
                 else
-                        rc = filter_alloc_dio_page(obd, inode, lnb);
-
-                if (rc) {
-                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                             "page err %u@"LPU64" %u/%u %p: rc %d\n",
-                              lnb->len, lnb->offset, i, obj->ioo_bufcnt,
-                              dentry, rc);
-                        GOTO(cleanup, rc);
-                }
+                        filter_alloc_dio_page(obd, inode, lnb);
 
                 if (inode->i_size < lnb->offset + lnb->len - 1)
                         lnb->rc = inode->i_size - lnb->offset;
@@ -372,8 +357,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                         f_dput(dentry);
         }
 
-        if (iobuf != NULL)
-                filter_free_iobuf(iobuf);
+        filter_iobuf_put(iobuf);
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         if (rc)
@@ -525,13 +509,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
-        memset(res, 0, niocount * sizeof(*res));
-
-        /* This iobuf is for reading any partial pages from disk */
-        rc = filter_alloc_iobuf(&exp->exp_obd->u.filter, OBD_BRW_READ,
-                                obj->ioo_bufcnt, &iobuf);
-        if (rc)
-                GOTO(cleanup, rc);
+        iobuf = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
         cleanup_phase = 1;
 
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
@@ -589,13 +567,19 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
-                if (rc) {
-                        CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
-                               lnb->len, lnb->offset,
-                               i, obj->ioo_bufcnt, dentry, rc);
-                        GOTO(cleanup, rc);
+                /*
+                 * ost_brw_write()->ost_nio_pages_get() already initialized
+                 * lnb->page to point to the page from the per-thread page
+                 * pool (bug 5137), initialize page.
+                 */
+                LASSERT(lnb->page != NULL);
+                if (lnb->len != PAGE_SIZE) {
+                        memset(kmap(lnb->page) + lnb->len,
+                               0, PAGE_SIZE - lnb->len);
+                        kunmap(lnb->page);
                 }
+                lnb->page->index = lnb->offset >> PAGE_SHIFT;
+
                 cleanup_phase = 4;
 
                 /* If the filter writes a partial page, then has the file
@@ -630,7 +614,6 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                 kunmap(lnb->page);
                         }
                 }
-
                 if (lnb->rc == 0)
                         tot_bytes += lnb->len;
         }
@@ -646,10 +629,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 cleanup:
         switch(cleanup_phase) {
         case 4:
-                if (rc)
-                        filter_free_dio_pages(objcount, obj, niocount, res);
         case 3:
-                filter_free_iobuf(iobuf);
+                filter_iobuf_put(iobuf);
         case 2:
                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
                 if (rc)
@@ -661,7 +642,7 @@ cleanup:
                         filter_grant_incoming(exp, oa);
                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
                 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-                filter_free_iobuf(iobuf);
+                filter_iobuf_put(iobuf);
                 break;
         default:;
         }
index 4d43bb3..195eeaf 100644 (file)
@@ -306,6 +306,11 @@ static void clear_kiobuf(struct kiobuf *iobuf)
         iobuf->length = 0;
 }
 
+void filter_iobuf_put(void *iobuf)
+{
+        clear_kiobuf(iobuf); /* only clears the kiobuf; nothing is freed here */
+}
+
 int filter_alloc_iobuf(struct filter_obd *filter, int rw, int num_pages,
                        void **ret)
 {
@@ -375,10 +380,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
         if (rc != 0)
                 GOTO(cleanup, rc);
 
-        rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_WRITE,
-                                obj->ioo_bufcnt, &iobuf);
-        if (rc)
-                GOTO(cleanup, rc);
+        iobuf = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
         cleanup_phase = 1;
 
         fso.fso_dentry = res->dentry;
@@ -450,9 +452,12 @@ cleanup:
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 LASSERT(current->journal_info == NULL);
         case 1:
-                filter_free_iobuf(iobuf);
+                filter_iobuf_put(iobuf);
         case 0:
-                filter_free_dio_pages(objcount, obj, niocount, res);
+                /*
+                 * lnb->page automatically returns back into per-thread page
+                 * pool (bug 5137)
+                 */
                 f_dput(res->dentry);
         }
 
index 311ea15..c393282 100644 (file)
@@ -201,10 +201,9 @@ int filter_alloc_iobuf(struct filter_obd *filter, int rw, int num_pages,
         RETURN(-ENOMEM);
 }
 
-void filter_free_iobuf(void *iobuf)
+void filter_iobuf_put(void *iobuf)
 {
         struct dio_request *dreq = iobuf;
-        int                 num_pages = dreq->dr_max_pages;
 
         /* free all bios */
         while (dreq->dr_bios) {
@@ -212,12 +211,22 @@ void filter_free_iobuf(void *iobuf)
                 dreq->dr_bios = bio->bi_private;
                 bio_put(bio);
         }
+        dreq->dr_npages = 0;
+        atomic_set(&dreq->dr_numreqs, 0);
+}
+
+void filter_free_iobuf(void *iobuf)
+{
+        struct dio_request *dreq = iobuf;
+        int                 num_pages = dreq->dr_max_pages;
+
+        filter_iobuf_put(dreq);
 
         OBD_FREE(dreq->dr_blocks,
                  MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
         OBD_FREE(dreq->dr_pages,
                  num_pages * sizeof(*dreq->dr_pages));
-        OBD_FREE(dreq, sizeof(*dreq));
+        OBD_FREE_PTR(dreq);
 }
 
 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
@@ -522,10 +531,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         if (rc != 0)
                 GOTO(cleanup, rc);
 
-        rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_WRITE, obj->ioo_bufcnt,
-                                (void **)&dreq);
-        if (rc)
-                GOTO(cleanup, rc);
+        dreq = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
         cleanup_phase = 1;
 
         fso.fso_dentry = res->dentry;
@@ -617,9 +623,12 @@ cleanup:
                         OBD_FREE(uc, sizeof(*uc));
                 LASSERT(current->journal_info == NULL);
         case 1:
-                filter_free_iobuf(dreq);
+                filter_iobuf_put(dreq);
         case 0:
-                filter_free_dio_pages(objcount, obj, niocount, res);
+                /*
+                 * lnb->page automatically returns back into per-thread page
+                 * pool (bug 5137)
+                 */
                 f_dput(res->dentry);
         }
 
index 8615d7c..9ca7b6d 100644 (file)
@@ -56,6 +56,7 @@ void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
 
         if (req->rq_repmsg && req->rq_reqmsg != 0)
                 oti->oti_transno = req->rq_repmsg->transno;
+        oti->oti_thread = req->rq_svc_thread;
 }
 
 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
@@ -254,6 +255,12 @@ static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
         int   rnbidx = 0;
         int   npages = 0;
 
+        /*
+         * array of sufficient size already preallocated by caller
+         */
+        LASSERT(pp_rnbp != NULL);
+        LASSERT(*pp_rnbp != NULL);
+
         /* first count and check the number of pages required */
         for (i = 0; i < nioo; i++)
                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
@@ -287,9 +294,7 @@ static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
                 return npages;
         }
 
-        OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
-        if (pp_rnb == NULL)
-                return -ENOMEM;
+        pp_rnb = *pp_rnbp;
 
         /* now do the actual split */
         page = rnbidx = 0;
@@ -328,19 +333,9 @@ static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
         }
         LASSERT(page == npages);
 
-        *pp_rnbp = pp_rnb;
         return npages;
 }
 
-static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
-                                   struct niobuf_remote *rnb)
-{
-        if (pp_rnb == rnb)                      /* didn't allocate above */
-                return;
-
-        OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
-}
-
 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
 {
         __u32 cksum = ~0;
@@ -367,11 +362,59 @@ static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
         return cksum;
 }
 
+/*
+ * populate @nio with @nrpages pages from the per-thread page pool
+ */
+static void ost_nio_pages_get(struct ptlrpc_request *req,
+                              struct niobuf_local *nio, int nrpages)
+{
+        int i;
+        struct ost_thread_local_cache *tls;
+
+        ENTRY;
+
+        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
+        LASSERT(req != NULL);
+        LASSERT(req->rq_svc_thread != NULL);
+
+        tls = ost_tls(req);
+        LASSERT(tls != NULL);
+
+        memset(nio, 0, nrpages * sizeof *nio);
+        for (i = 0; i < nrpages; ++ i) {
+                struct page *page;
+
+                page = tls->page[i];
+                LASSERT(page != NULL);
+                POISON_PAGE(page, 0xf1);
+                nio[i].page = page;
+                LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
+        }
+        EXIT;
+}
+
+/*
+ * Counterpart of ost_nio_pages_get(): poisons the pool pages for debugging
+ */
+static void ost_nio_pages_put(struct ptlrpc_request *req,
+                              struct niobuf_local *nio, int nrpages)
+{
+        int i;
+
+        ENTRY;
+
+        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
+
+        for (i = 0; i < nrpages; ++ i)
+                POISON_PAGE(nio[i].page, 0xf2);
+        EXIT;
+}
+
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct ptlrpc_bulk_desc *desc;
         struct niobuf_remote    *remote_nb;
-        struct niobuf_remote    *pp_rnb;
+        struct niobuf_remote    *pp_rnb = NULL;
         struct niobuf_local     *local_nb;
         struct obd_ioobj        *ioo;
         struct ost_body         *body, *repbody;
@@ -425,20 +468,27 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (rc)
                 GOTO(out, rc);
 
+        /*
+         * Per-thread arrays of struct niobuf_{local,remote} were allocated by
+         * ost_thread_init().
+         */
+        local_nb = ost_tls(req)->local;
+        pp_rnb   = ost_tls(req)->remote;
+
         /* FIXME all niobuf splitting should be done in obdfilter if needed */
         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
         if (npages < 0)
                 GOTO(out, rc = npages);
 
-        OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
-        if (local_nb == NULL)
-                GOTO(out_pp_rnb, rc = -ENOMEM);
+        LASSERT(npages <= OST_THREAD_POOL_SIZE);
+
+        ost_nio_pages_get(req, local_nb, npages);
 
         desc = ptlrpc_prep_bulk_exp (req, npages, 
                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
         if (desc == NULL)
-                GOTO(out_local, rc = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
                         ioo, npages, pp_rnb, local_nb, oti);
@@ -506,6 +556,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
                           ioo, npages, local_nb, oti, rc);
 
+        ost_nio_pages_put(req, local_nb, npages);
+
         if (rc == 0) {
                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
@@ -520,10 +572,6 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
  out_bulk:
         ptlrpc_free_bulk(desc);
- out_local:
-        OBD_FREE(local_nb, sizeof(*local_nb) * npages);
- out_pp_rnb:
-        free_per_page_niobufs(npages, pp_rnb, remote_nb);
  out:
         LASSERT(rc <= 0);
         if (rc == 0) {
@@ -636,20 +684,27 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out, rc);
         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
 
+        /*
+         * Per-thread arrays of struct niobuf_{local,remote} were allocated by
+         * ost_thread_init().
+         */
+        local_nb = ost_tls(req)->local;
+        pp_rnb   = ost_tls(req)->remote;
+
         /* FIXME all niobuf splitting should be done in obdfilter if needed */
         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
         if (npages < 0)
                 GOTO(out, rc = npages);
 
-        OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
-        if (local_nb == NULL)
-                GOTO(out_pp_rnb, rc = -ENOMEM);
+        LASSERT(npages <= OST_THREAD_POOL_SIZE);
+
+        ost_nio_pages_get(req, local_nb, npages);
 
         desc = ptlrpc_prep_bulk_exp (req, npages, 
                                      BULK_GET_SINK, OST_BULK_PORTAL);
         if (desc == NULL)
-                GOTO(out_local, rc = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         /* obd_preprw clobbers oa->valid, so save what we need */
         do_checksum = (body->oa.o_valid & OBD_MD_FLCKSUM);
@@ -718,6 +773,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
                            objcount, ioo, npages, local_nb, oti, rc);
 
+        ost_nio_pages_put(req, local_nb, npages);
+
         if (rc == 0) {
                 /* set per-requested niobuf return codes */
                 for (i = j = 0; i < niocount; i++) {
@@ -738,10 +795,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
  out_bulk:
         ptlrpc_free_bulk(desc);
- out_local:
-        OBD_FREE(local_nb, sizeof(*local_nb) * npages);
- out_pp_rnb:
-        free_per_page_niobufs(npages, pp_rnb, remote_nb);
  out:
         if (rc == 0) {
                 oti_to_request(oti, req);
@@ -778,7 +831,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
 {
-        struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
+        struct niobuf_remote *remote_nb, *res_nb, *pp_rnb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body, *repbody;
         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
@@ -818,6 +871,12 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
                         lustre_swab_niobuf_remote (&remote_nb[i]);
         }
 
+        /*
+         * Per-thread array of struct niobuf_remote's was allocated by
+         * ost_thread_init().
+         */
+        pp_rnb = ost_tls(req)->remote;
+
         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
         if (npages < 0)
@@ -826,13 +885,13 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
         size[1] = npages * sizeof(*pp_rnb);
         rc = lustre_pack_reply(req, 2, size, NULL);
         if (rc)
-                GOTO(out_pp_rnb, rc);
+                GOTO(out, rc);
 
         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
                                         objcount, ioo, npages, pp_rnb);
 
         if (req->rq_status)
-                GOTO(out_pp_rnb, rc = 0);
+                GOTO(out, rc = 0);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
@@ -840,8 +899,6 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
         memcpy(res_nb, remote_nb, size[1]);
         rc = 0;
-out_pp_rnb:
-        free_per_page_niobufs(npages, pp_rnb, remote_nb);
 out:
         target_committed_to_req(req);
         if (rc) {
@@ -1154,6 +1211,71 @@ out:
         return 0;
 }
 
+/*
+ * free per-thread pool created by ost_thread_init().
+ */
+static void ost_thread_done(struct ptlrpc_thread *thread)
+{
+        int i;
+        struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
+                                             * Storage */
+
+        ENTRY;
+
+        LASSERT(thread != NULL);
+        LASSERT(thread->t_data != NULL);
+
+        /*
+         * be prepared to handle partially-initialized pools (because this is
+         * called from ost_thread_init() for cleanup).
+         */
+        tls = thread->t_data;
+        if (tls != NULL) {
+                for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
+                        if (tls->page[i] != NULL)
+                                __free_page(tls->page[i]);
+                }
+                OBD_FREE_PTR(tls);
+                thread->t_data = NULL;
+        }
+        EXIT;
+}
+
+/*
+ * initialize per-thread page pool (bug 5137).
+ */
+static int ost_thread_init(struct ptlrpc_thread *thread)
+{
+        int result;
+        int i;
+        struct ost_thread_local_cache *tls;
+
+        ENTRY;
+
+        LASSERT(thread != NULL);
+        LASSERT(thread->t_data == NULL);
+        LASSERT(thread->t_id < OST_NUM_THREADS);
+
+        OBD_ALLOC_PTR(tls);
+        if (tls != NULL) {
+                result = 0;
+                thread->t_data = tls;
+                /*
+                 * populate pool
+                 */
+                for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
+                        tls->page[i] = alloc_page(OST_THREAD_POOL_GFP);
+                        if (tls->page[i] == NULL) {
+                                ost_thread_done(thread);
+                                result = -ENOMEM;
+                                break;
+                        }
+                }
+        } else
+                result = -ENOMEM;
+        RETURN(result);
+}
+
 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct ost_obd *ost = &obd->u.ost;
@@ -1182,8 +1304,10 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(out_lprocfs, rc = -ENOMEM);
         }
 
-        rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
-                                    "ll_ost");
+        ost->ost_service->srv_init = ost_thread_init;
+        ost->ost_service->srv_done = ost_thread_done;
+        rc = ptlrpc_start_n_threads(obd, ost->ost_service,
+                                    OST_NUM_THREADS, "ll_ost");
         if (rc)
                 GOTO(out_service, rc = -EINVAL);
 
@@ -1197,8 +1321,8 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(out_service, rc = -ENOMEM);
         }
 
-        rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
-                                    "ll_ost_creat");
+        rc = ptlrpc_start_n_threads(obd, ost->ost_create_service,
+                                    1, "ll_ost_creat");
         if (rc)
                 GOTO(out_create, rc = -EINVAL);
 
@@ -1234,6 +1358,11 @@ static int ost_cleanup(struct obd_device *obd)
         RETURN(err);
 }
 
+struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
+{
+        return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+}
+
 /* use obd ops to offer management infrastructure */
 static struct obd_ops ost_obd_ops = {
         .o_owner        = THIS_MODULE,
index 9f31f25..9c7fecc 100644 (file)
@@ -11,6 +11,40 @@ extern void ost_print_req(void *seq_file, struct ptlrpc_request *req);
 # define ost_print_req NULL
 #endif
 
+/*
+ * tunables for per-thread page pool (bug 5137)
+ */
+enum {
+        /*
+         * pool size in pages
+         */
+        OST_THREAD_POOL_SIZE = PTLRPC_MAX_BRW_PAGES,
+        /*
+         * GFP mask used to allocate pages for pool
+         */
+        OST_THREAD_POOL_GFP  = GFP_HIGHUSER
+};
+
+struct page;
+struct niobuf_local;
+struct niobuf_remote;
+struct ptlrpc_request;
+
+/*
+ * struct ost_thread_local_cache is allocated and initialized for each OST
+ * thread by ost_thread_init().
+ */
+struct ost_thread_local_cache {
+        /*
+         * pool of pages and nio buffers used by the read and write paths
+         */
+        struct page          *page  [OST_THREAD_POOL_SIZE];
+        struct niobuf_local   local [OST_THREAD_POOL_SIZE];
+        struct niobuf_remote  remote[OST_THREAD_POOL_SIZE];
+};
+
+struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
+
 #ifdef HAVE_QUOTA_SUPPORT
 /* Quota stuff */
 int ost_quotacheck(struct ptlrpc_request *req);
index 94d42d0..e54449d 100644 (file)
@@ -433,7 +433,8 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
 }
 
 static int
-ptlrpc_server_handle_request (struct ptlrpc_service *svc)
+ptlrpc_server_handle_request(struct ptlrpc_service *svc,
+                             struct ptlrpc_thread *thread)
 {
         struct ptlrpc_request *request;
         unsigned long          flags;
@@ -496,6 +497,7 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
 
         CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
 
+        request->rq_svc_thread = thread;
         request->rq_export = class_conn2export(&request->rq_reqmsg->handle);
 
         if (request->rq_export) {
@@ -716,7 +718,7 @@ liblustre_check_services (void *arg)
 
                 do {
                         rc = ptlrpc_server_handle_reply(svc);
-                        rc |= ptlrpc_server_handle_request(svc);
+                        rc |= ptlrpc_server_handle_request(svc, NULL);
                         rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
                         did_something |= rc;
                 } while (rc);
@@ -813,6 +815,10 @@ static int ptlrpc_main(void *arg)
 
         /* Record that the thread is running */
         thread->t_flags = SVC_RUNNING;
+        /*
+         * wake up our creator. Note: @data is invalid after this point,
+         * because it's allocated on ptlrpc_start_thread() stack.
+         */
         wake_up(&thread->t_ctl_waitq);
 
         watchdog = lc_watchdog_add(svc->srv_watchdog_timeout,
@@ -857,7 +863,7 @@ static int ptlrpc_main(void *arg)
                 if (!list_empty (&svc->srv_request_queue) &&
                     (svc->srv_n_difficult_replies == 0 ||
                      svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
-                        ptlrpc_server_handle_request (svc);
+                        ptlrpc_server_handle_request(svc, thread);
 
                 if (!list_empty(&svc->srv_idle_rqbds) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
@@ -868,6 +874,12 @@ static int ptlrpc_main(void *arg)
                 }
         }
 
+        /*
+         * destroy service-specific state created by ptlrpc_start_thread()
+         */
+        if (svc->srv_done != NULL)
+                svc->srv_done(thread);
+
         spin_lock_irqsave(&svc->srv_lock, flags);
 
         svc->srv_nthreads--;                    /* must know immediately */
@@ -931,7 +943,7 @@ int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
         for (i = 0; i < num_threads; i++) {
                 char name[32];
                 sprintf(name, "%s_%02d", base_name, i);
-                rc = ptlrpc_start_thread(dev, svc, name);
+                rc = ptlrpc_start_thread(dev, svc, name, i);
                 if (rc) {
                         CERROR("cannot start %s thread #%d: rc %d\n", base_name,
                                i, rc);
@@ -942,7 +954,7 @@ int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
 }
 
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
-                        char *name)
+                        char *name, int id)
 {
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
@@ -955,16 +967,23 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
         if (thread == NULL)
                 RETURN(-ENOMEM);
         init_waitqueue_head(&thread->t_ctl_waitq);
+        thread->t_id = id;
+          
+        if (svc->srv_init != NULL) {
+                rc = svc->srv_init(thread);
+                if (rc != 0)
+                        RETURN(rc);
+        }
+
+        spin_lock_irqsave(&svc->srv_lock, flags);
+        list_add(&thread->t_link, &svc->srv_threads);
+        spin_unlock_irqrestore(&svc->srv_lock, flags);
 
         d.dev = dev;
         d.svc = svc;
         d.name = name;
         d.thread = thread;
 
-        spin_lock_irqsave(&svc->srv_lock, flags);
-        list_add(&thread->t_link, &svc->srv_threads);
-        spin_unlock_irqrestore(&svc->srv_lock, flags);
-
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in ptlrpc_daemonize() right away.
          */
@@ -975,6 +994,10 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                 spin_lock_irqsave(&svc->srv_lock, flags);
                 list_del(&thread->t_link);
                 spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+                if (svc->srv_done != NULL)
+                        svc->srv_done(thread);
+
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }