access to this struct. In 2.6 kernels there is finer grained
locking to improve SMP performance of the JBD layer.
-
+Severity : major
+Frequency : rare (only unsupported configurations with a node running as an
+ OST and a client)
+Bugzilla : 6514, 5137
+Description: Mounting a Lustre file system on a node running as an OST could
+ lead to deadlocks
+Details : OSTs now allocate memory needed to write out data at
+ startup, instead of when needed, to avoid having to
+ allocate memory in possibly low memory situations.
+             Specifically, if the file system is mounted on an OST,
+ memory pressure could force it to try to write out data,
+ which it needed to allocate memory to do. Due to the low
+ memory, it would be unable to do so and the node would
+ become unresponsive.
+
------------------------------------------------------------------------------
2005-06-20 Cluster File Systems, Inc. <info@clusterfs.com>
struct lustre_msg rs_msg;
};
+struct ptlrpc_thread;
+
enum rq_phase {
RQ_PHASE_NEW = 0xebc0de00,
RQ_PHASE_RPC = 0xebc0de01,
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
atomic_t rq_refcount; /* client-side refcount for SENT race */
+ struct ptlrpc_thread *rq_svc_thread; /* initial thread servicing req */
+
int rq_request_portal; /* XXX FIXME bug 249 */
int rq_reply_portal; /* XXX FIXME bug 249 */
};
struct ptlrpc_thread {
- struct list_head t_link;
+
+ struct list_head t_link; /* active threads for service, from svc->srv_threads */
__u32 t_flags;
+
+ void *t_data; /* thread-private data (preallocated memory) */
+
+ unsigned int t_id; /* service thread index, from ptlrpc_start_n_threads */
wait_queue_head_t t_ctl_waitq;
};
struct proc_dir_entry *srv_procroot;
struct lprocfs_stats *srv_stats;
+ /*
+ * if non-NULL called during thread creation (ptlrpc_start_thread())
+ * to initialize service specific per-thread state.
+ */
+ int (*srv_init)(struct ptlrpc_thread *thread);
+ /*
+ * if non-NULL called during thread shutdown (ptlrpc_main()) to
+ * destruct state created by ->srv_init().
+ */
+ void (*srv_done)(struct ptlrpc_thread *thread);
+
struct ptlrpc_srv_ni srv_interfaces[0];
};
struct proc_dir_entry *proc_entry,
svcreq_printfn_t);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
+
int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
int cnt, char *base_name);
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
- char *name);
+ char *name, int id);
int ptlrpc_unregister_service(struct ptlrpc_service *service);
int liblustre_check_services (void *arg);
void ptlrpc_daemonize(void);
int fo_r_in_flight; /* protected by fo_stats_lock */
int fo_w_in_flight; /* protected by fo_stats_lock */
+ /*
+ * per-filter pool of kiobuf's allocated by filter_common_setup() and
+ * torn down by filter_cleanup(). Contains OST_NUM_THREADS elements of
+ * which ->fo_iobuf_count were allocated.
+ *
+ * This pool contains kiobuf used by
+ * filter_{prep,commit}rw_{read,write}() and is shared by all OST
+ * threads.
+ *
+ * Locking: none, each OST thread uses only one element, determined by
+ * its "ordinal number", ->t_id.
+ *
+ * This is (void *) array, because 2.4 and 2.6 use different iobuf
+ * structures.
+ */
+ void **fo_iobuf_pool;
+ int fo_iobuf_count;
+
struct obd_histogram fo_r_pages;
struct obd_histogram fo_w_pages;
struct obd_histogram fo_read_rpc_hist;
struct llog_cookie oti_onecookie;
struct llog_cookie *oti_logcookies;
int oti_numcookies;
+
+ /* initial thread handling transaction */
+ struct ptlrpc_thread *oti_thread;
};
static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
#define OBD_ALLOC(ptr, size) OBD_ALLOC_GFP(ptr, size, OBD_GFP_MASK)
#define OBD_ALLOC_WAIT(ptr, size) OBD_ALLOC_GFP(ptr, size, GFP_KERNEL)
+#define OBD_ALLOC_PTR(ptr) OBD_ALLOC(ptr, sizeof *(ptr))
+#define OBD_ALLOC_PTR_WAIT(ptr) OBD_ALLOC_WAIT(ptr, sizeof *(ptr))
#ifdef __arch_um__
# define OBD_VMALLOC(ptr, size) OBD_ALLOC(ptr, size)
} \
} while (0)
+#define OBD_FREE_PTR(ptr) OBD_FREE(ptr, sizeof *(ptr))
+
#define OBD_SLAB_FREE(ptr, slab, size) \
do { \
LASSERT(ptr); \
int rc;
ENTRY;
+ oti.oti_thread = request->rq_svc_thread;
/* req is swabbed so this is safe */
body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
#endif
#include "llite_internal.h"
+/* also used by llite/special.c:ll_special_open() */
+struct ll_file_data *ll_file_data_get(void)
+{
+ struct ll_file_data *fd;
+
+ OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
+ return fd;
+}
+
+static void ll_file_data_put(struct ll_file_data *fd)
+{
+ if (fd != NULL)
+ OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
+}
+
int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
struct file *file)
{
ptlrpc_req_finished(req);
och->och_fh.cookie = DEAD_HANDLE_MAGIC;
LUSTRE_FPRIVATE(file) = NULL;
- OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
+ ll_file_data_put(fd);
RETURN(rc);
}
RETURN(rc);
}
-int ll_local_open(struct file *file, struct lookup_intent *it)
+int ll_local_open(struct file *file, struct lookup_intent *it,
+ struct ll_file_data *fd)
{
struct ptlrpc_request *req = it->d.lustre.it_data;
struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
- struct ll_file_data *fd;
struct mds_body *body;
ENTRY;
LASSERT(!LUSTRE_FPRIVATE(file));
- OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
- /* We can't handle this well without reorganizing ll_file_open and
- * ll_mdc_close, so don't even try right now. */
LASSERT(fd != NULL);
memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
.it_flags = file->f_flags };
struct lov_stripe_md *lsm;
struct ptlrpc_request *req;
+ struct ll_file_data *fd;
int rc = 0;
ENTRY;
it = file->f_it;
+ fd = ll_file_data_get();
+ if (fd == NULL)
+ RETURN(-ENOMEM);
+
if (!it || !it->d.lustre.it_disposition) {
it = &oit;
rc = ll_intent_file_open(file, NULL, 0, it);
- if (rc)
+ if (rc) {
+ ll_file_data_put(fd);
GOTO(out, rc);
}
+ }
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
rc = it_open_error(DISP_OPEN_OPEN, it);
/* mdc_intent_lock() didn't get a request ref if there was an open
* error, so don't do cleanup on the request here (bug 3430) */
- if (rc)
+ if (rc) {
+ ll_file_data_put(fd);
RETURN(rc);
+ }
- rc = ll_local_open(file, it);
+ rc = ll_local_open(file, it, fd);
LASSERTF(rc == 0, "rc = %d\n", rc);
if (!S_ISREG(inode->i_mode))
int lum_size)
{
struct ll_inode_info *lli = ll_i2info(inode);
- struct file *f;
+ struct file *f = NULL;
struct obd_export *exp = ll_i2obdexp(inode);
struct lov_stripe_md *lsm;
struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
struct ptlrpc_request *req = NULL;
+ struct ll_file_data *fd;
int rc = 0;
struct lustre_md md;
ENTRY;
RETURN(-EEXIST);
}
+ fd = ll_file_data_get();
+ if (fd == NULL)
+ GOTO(out, -ENOMEM);
+
f = get_empty_filp();
if (!f)
GOTO(out, -ENOMEM);
GOTO(out, rc);
ll_update_inode(f->f_dentry->d_inode, md.body, md.lsm);
- rc = ll_local_open(f, &oit);
+ rc = ll_local_open(f, &oit, fd);
if (rc)
GOTO(out, rc);
+ fd = NULL;
ll_intent_release(&oit);
rc = ll_file_release(f->f_dentry->d_inode, f);
out:
if (f)
put_filp(f);
+ ll_file_data_put(fd);
up(&lli->lli_open_sem);
if (req != NULL)
ptlrpc_req_finished(req);
int ll_file_release(struct inode *inode, struct file *file);
int ll_lsm_getattr(struct obd_export *, struct lov_stripe_md *, struct obdo *);
int ll_glimpse_size(struct inode *inode);
-int ll_local_open(struct file *file, struct lookup_intent *it);
+int ll_local_open(struct file *file,
+ struct lookup_intent *it, struct ll_file_data *fd);
int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
struct file *file);
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
int ll_getattr(struct vfsmount *mnt, struct dentry *de,
struct lookup_intent *it, struct kstat *stat);
#endif
+struct ll_file_data *ll_file_data_get(void);
/* llite/dcache.c */
void ll_intent_drop_lock(struct lookup_intent *);
int rc;
ENTRY;
+ oti.oti_thread = request->rq_svc_thread;
+
/* req is swabbed so this is safe */
body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
struct file_operations *sfops = filp->f_op;
struct ptlrpc_request *req;
struct lookup_intent *it;
+ struct ll_file_data *fd;
int rc = -EINVAL, err;
ENTRY;
+ fd = ll_file_data_get();
+ if (fd == NULL)
+ RETURN(-ENOMEM);
+
if (pfop && *pfop) {
/* FIXME fops_get */
if ((*pfop)->open) {
it = filp->f_it;
- err = ll_local_open(filp, it);
+ err = ll_local_open(filp, it, fd);
if (rc != 0) {
CERROR("error opening special file: rc %d\n", rc);
ll_mdc_close(ll_i2sbi(inode)->ll_mdc_exp, inode, filp);
if (*ids == NULL)
RETURN(-ENOMEM);
oti.oti_objid = *ids;
+ oti.oti_thread = req->rq_svc_thread;
/* replay case */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
if (rc) {
CERROR("lustre_pack_reply: rc = %d\n", rc);
req->rq_status = rc;
+ /* Continue on to drop local open count even if we can't send the reply */
} else {
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
}
RETURN(ELDLM_LOCK_ABORTED);
}
+/*
+ * per-obd_device iobuf pool.
+ *
+ * To avoid memory deadlocks in low-memory setups, amount of dynamic
+ * allocations in write-path has to be minimized (see bug 5137).
+ *
+ * Pages, niobuf_local's and niobuf_remote's are pre-allocated and attached to
+ * OST threads (see ost_thread_{init,done}()).
+ *
+ * "iobuf's" used by filter cannot be attached to OST thread, however, because
+ * at the OST layer there are only (potentially) multiple obd_device of type
+ * unknown at the time of OST thread creation.
+ *
+ * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool
+ * field). This array has size OST_NUM_THREADS, so that each OST thread uses
+ * it's very own iobuf. ("its" intended; each thread indexes the pool by ->t_id.)
+ *
+ * Functions below
+ *
+ * filter_kiobuf_pool_init()
+ *
+ * filter_kiobuf_pool_done()
+ *
+ * filter_iobuf_get()
+ *
+ * operate on this array. They are "generic" in a sense that they don't depend
+ * on actual type of iobuf's (the latter depending on Linux kernel version).
+ */
+
+/*
+ * destroy pool created by filter_iobuf_pool_init
+ */
+static void filter_iobuf_pool_done(struct filter_obd *filter)
+{
+ void **pool;
+ int i;
+
+ ENTRY;
+
+ pool = filter->fo_iobuf_pool;
+ if (pool != NULL) {
+ for (i = 0; i < OST_NUM_THREADS; ++ i) {
+ if (pool[i] != NULL)
+ filter_free_iobuf(pool[i]);
+ }
+ OBD_FREE(pool, OST_NUM_THREADS * sizeof pool[0]);
+ filter->fo_iobuf_pool = NULL;
+ }
+ EXIT;
+}
+
+/*
+ * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
+ */
+static int filter_iobuf_pool_init(struct filter_obd *filter, int count)
+{
+ void **pool;
+ int i;
+ int result;
+
+ ENTRY;
+
+ LASSERT(count <= OST_NUM_THREADS);
+
+ OBD_ALLOC_GFP(pool, OST_NUM_THREADS * sizeof pool[0], GFP_KERNEL);
+ if (pool == NULL)
+ RETURN(-ENOMEM);
+
+ filter->fo_iobuf_pool = pool;
+ filter->fo_iobuf_count = count;
+ for (i = 0; i < count; ++ i) {
+ /*
+ * allocate kiobuf to be used by i-th OST thread.
+ */
+ result = filter_alloc_iobuf(filter, OBD_BRW_WRITE,
+ PTLRPC_MAX_BRW_PAGES,
+ &pool[i]);
+ if (result != 0) {
+ filter_iobuf_pool_done(filter);
+ break;
+ }
+ }
+ RETURN(result);
+}
+
+/*
+ * return iobuf preallocated by filter_iobuf_pool_init() for @thread.
+ */
+void *filter_iobuf_get(struct ptlrpc_thread *thread, struct filter_obd *filter)
+{
+ void *kio;
+
+ LASSERT(thread->t_id < filter->fo_iobuf_count);
+ kio = filter->fo_iobuf_pool[thread->t_id];
+ LASSERT(kio != NULL);
+ return kio;
+}
+
/* mount the file system (secretly) */
int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
void *option)
struct vfsmount *mnt;
char *str;
char ns_name[48];
- int rc = 0;
+ int rc;
ENTRY;
if (lcfg->lcfg_bufcount < 3 ||
if (IS_ERR(obd->obd_fsops))
RETURN(PTR_ERR(obd->obd_fsops));
+ rc = filter_iobuf_pool_init(filter, OST_NUM_THREADS);
+ if (rc != 0)
+ GOTO(err_ops, rc);
+
mnt = do_kern_mount(lustre_cfg_string(lcfg, 2),MS_NOATIME|MS_NODIRATIME,
lustre_cfg_string(lcfg, 1), option);
rc = PTR_ERR(mnt);
lock_kernel();
err_ops:
fsfilt_put_ops(obd->obd_fsops);
+ filter_iobuf_pool_done(filter);
return rc;
}
fsfilt_put_ops(obd->obd_fsops);
+ filter_iobuf_pool_done(filter);
+
LCONSOLE_INFO("OST %s has stopped.\n", obd->obd_name);
RETURN(0);
struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
struct obd_trans_info *);
void flip_into_page_cache(struct inode *inode, struct page *new_page);
-void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *res);
/* filter_io_*.c */
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
void filter_free_iobuf(void *iobuf);
int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
struct inode *inode, struct page *page);
+void *filter_iobuf_get(struct ptlrpc_thread *thread, struct filter_obd *filter);
+void filter_iobuf_put(void *iobuf);
int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
struct obd_export *exp, struct iattr *attr,
struct obd_trans_info *oti, void **wait_handle);
{
struct page *page;
- page = alloc_pages(GFP_HIGHUSER, 0);
- if (page == NULL) {
- CERROR("no memory for a temp page\n");
- lnb->rc = -ENOMEM;
- RETURN(-ENOMEM);
- }
+ LASSERT(lnb->page != NULL);
+
+ page = lnb->page;
#if 0
POISON_PAGE(page, 0xf1);
if (lnb->len != PAGE_SIZE) {
}
#endif
page->index = lnb->offset >> PAGE_SHIFT;
- lnb->page = page;
RETURN(0);
}
-void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
+static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_local *res)
{
int i, j;
for (i = 0; i < objcount; i++, obj++) {
- for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
- if (res->page != NULL) {
- __free_page(res->page);
+ for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
res->page = NULL;
}
- }
- }
}
/* Grab the dirty and seen grant announcements from the incoming obdo.
spin_unlock(&obd->obd_osfs_lock);
}
- memset(res, 0, niocount * sizeof(*res));
-
push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
- rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_READ, obj->ioo_bufcnt,
- &iobuf);
- if (rc)
- GOTO(cleanup, rc);
+ iobuf = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
dentry = filter_oa2dentry(obd, oa);
if (IS_ERR(dentry)) {
lnb->len = rnb->len;
lnb->flags = rnb->flags;
+ /*
+ * ost_brw_write()->ost_nio_pages_get() already initialized
+ * lnb->page to point to the page from the per-thread page
+ * pool (bug 5137), initialize page.
+ */
+ LASSERT(lnb->page != NULL);
+
if (inode->i_size <= rnb->offset)
- /* If there's no more data, abort early.
- * lnb->page == NULL and lnb->rc == 0, so it's
- * easy to detect later. */
+ /* If there's no more data, abort early. lnb->rc == 0,
+ * so it's easy to detect later. */
break;
else
- rc = filter_alloc_dio_page(obd, inode, lnb);
-
- if (rc) {
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "page err %u@"LPU64" %u/%u %p: rc %d\n",
- lnb->len, lnb->offset, i, obj->ioo_bufcnt,
- dentry, rc);
- GOTO(cleanup, rc);
- }
+ filter_alloc_dio_page(obd, inode, lnb);
if (inode->i_size < lnb->offset + lnb->len - 1)
lnb->rc = inode->i_size - lnb->offset;
f_dput(dentry);
}
- if (iobuf != NULL)
- filter_free_iobuf(iobuf);
+ filter_iobuf_put(iobuf);
pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
if (rc)
LASSERT(objcount == 1);
LASSERT(obj->ioo_bufcnt > 0);
- memset(res, 0, niocount * sizeof(*res));
-
- /* This iobuf is for reading any partial pages from disk */
- rc = filter_alloc_iobuf(&exp->exp_obd->u.filter, OBD_BRW_READ,
- obj->ioo_bufcnt, &iobuf);
- if (rc)
- GOTO(cleanup, rc);
+ iobuf = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
cleanup_phase = 1;
push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
lnb->len = rnb->len;
lnb->flags = rnb->flags;
- rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
- if (rc) {
- CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
- lnb->len, lnb->offset,
- i, obj->ioo_bufcnt, dentry, rc);
- GOTO(cleanup, rc);
+ /*
+ * ost_brw_write()->ost_nio_pages_get() already initialized
+ * lnb->page to point to the page from the per-thread page
+ * pool (bug 5137), initialize page.
+ */
+ LASSERT(lnb->page != NULL);
+ if (lnb->len != PAGE_SIZE) {
+ memset(kmap(lnb->page) + lnb->len,
+ 0, PAGE_SIZE - lnb->len);
+ kunmap(lnb->page);
}
+ lnb->page->index = lnb->offset >> PAGE_SHIFT;
+
cleanup_phase = 4;
/* If the filter writes a partial page, then has the file
kunmap(lnb->page);
}
}
-
if (lnb->rc == 0)
tot_bytes += lnb->len;
}
cleanup:
switch(cleanup_phase) {
case 4:
- if (rc)
- filter_free_dio_pages(objcount, obj, niocount, res);
case 3:
- filter_free_iobuf(iobuf);
+ filter_iobuf_put(iobuf);
case 2:
pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
if (rc)
filter_grant_incoming(exp, oa);
spin_unlock(&exp->exp_obd->obd_osfs_lock);
pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
- filter_free_iobuf(iobuf);
+ filter_iobuf_put(iobuf);
break;
default:;
}
iobuf->length = 0;
}
+void filter_iobuf_put(void *iobuf)
+{
+ clear_kiobuf(iobuf);
+}
+
int filter_alloc_iobuf(struct filter_obd *filter, int rw, int num_pages,
void **ret)
{
if (rc != 0)
GOTO(cleanup, rc);
- rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_WRITE,
- obj->ioo_bufcnt, &iobuf);
- if (rc)
- GOTO(cleanup, rc);
+ iobuf = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
cleanup_phase = 1;
fso.fso_dentry = res->dentry;
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
LASSERT(current->journal_info == NULL);
case 1:
- filter_free_iobuf(iobuf);
+ filter_iobuf_put(iobuf);
case 0:
- filter_free_dio_pages(objcount, obj, niocount, res);
+ /*
+ * lnb->page automatically returns back into per-thread page
+ * pool (bug 5137)
+ */
f_dput(res->dentry);
}
RETURN(-ENOMEM);
}
-void filter_free_iobuf(void *iobuf)
+void filter_iobuf_put(void *iobuf)
{
struct dio_request *dreq = iobuf;
- int num_pages = dreq->dr_max_pages;
/* free all bios */
while (dreq->dr_bios) {
dreq->dr_bios = bio->bi_private;
bio_put(bio);
}
+ dreq->dr_npages = 0;
+ atomic_set(&dreq->dr_numreqs, 0);
+}
+
+void filter_free_iobuf(void *iobuf)
+{
+ struct dio_request *dreq = iobuf;
+ int num_pages = dreq->dr_max_pages;
+
+ filter_iobuf_put(dreq);
OBD_FREE(dreq->dr_blocks,
MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
OBD_FREE(dreq->dr_pages,
num_pages * sizeof(*dreq->dr_pages));
- OBD_FREE(dreq, sizeof(*dreq));
+ OBD_FREE_PTR(dreq);
}
int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
if (rc != 0)
GOTO(cleanup, rc);
- rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_WRITE, obj->ioo_bufcnt,
- (void **)&dreq);
- if (rc)
- GOTO(cleanup, rc);
+ dreq = filter_iobuf_get(oti->oti_thread, &exp->exp_obd->u.filter);
cleanup_phase = 1;
fso.fso_dentry = res->dentry;
OBD_FREE(uc, sizeof(*uc));
LASSERT(current->journal_info == NULL);
case 1:
- filter_free_iobuf(dreq);
+ filter_iobuf_put(dreq);
case 0:
- filter_free_dio_pages(objcount, obj, niocount, res);
+ /*
+ * lnb->page automatically returns back into per-thread page
+ * pool (bug 5137)
+ */
f_dput(res->dentry);
}
if (req->rq_repmsg && req->rq_reqmsg != 0)
oti->oti_transno = req->rq_repmsg->transno;
+ oti->oti_thread = req->rq_svc_thread;
}
void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
int rnbidx = 0;
int npages = 0;
+ /*
+ * array of sufficient size already preallocated by caller
+ */
+ LASSERT(pp_rnbp != NULL);
+ LASSERT(*pp_rnbp != NULL);
+
/* first count and check the number of pages required */
for (i = 0; i < nioo; i++)
for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
return npages;
}
- OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
- if (pp_rnb == NULL)
- return -ENOMEM;
+ pp_rnb = *pp_rnbp;
/* now do the actual split */
page = rnbidx = 0;
}
LASSERT(page == npages);
- *pp_rnbp = pp_rnb;
return npages;
}
-static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
- struct niobuf_remote *rnb)
-{
- if (pp_rnb == rnb) /* didn't allocate above */
- return;
-
- OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
-}
-
static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
{
__u32 cksum = ~0;
return cksum;
}
+/*
+ * populate @nio by @nrpages pages from per-thread page pool
+ */
+static void ost_nio_pages_get(struct ptlrpc_request *req,
+ struct niobuf_local *nio, int nrpages)
+{
+ int i;
+ struct ost_thread_local_cache *tls;
+
+ ENTRY;
+
+ LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
+ LASSERT(req != NULL);
+ LASSERT(req->rq_svc_thread != NULL);
+
+ tls = ost_tls(req);
+ LASSERT(tls != NULL);
+
+ memset(nio, 0, nrpages * sizeof *nio);
+ for (i = 0; i < nrpages; ++ i) {
+ struct page *page;
+
+ page = tls->page[i];
+ LASSERT(page != NULL);
+ POISON_PAGE(page, 0xf1);
+ nio[i].page = page;
+ LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
+ }
+ EXIT;
+}
+
+/*
+ * Dual for ost_nio_pages_get(). Poison pages in pool for debugging
+ */
+static void ost_nio_pages_put(struct ptlrpc_request *req,
+ struct niobuf_local *nio, int nrpages)
+{
+ int i;
+
+ ENTRY;
+
+ LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
+
+ for (i = 0; i < nrpages; ++ i)
+ POISON_PAGE(nio[i].page, 0xf2);
+ EXIT;
+}
+
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct ptlrpc_bulk_desc *desc;
struct niobuf_remote *remote_nb;
- struct niobuf_remote *pp_rnb;
+ struct niobuf_remote *pp_rnb = NULL;
struct niobuf_local *local_nb;
struct obd_ioobj *ioo;
struct ost_body *body, *repbody;
if (rc)
GOTO(out, rc);
+ /*
+ * Per-thread array of struct niobuf_{local,remote}'s was allocated by
+ * ost_thread_init().
+ */
+ local_nb = ost_tls(req)->local;
+ pp_rnb = ost_tls(req)->remote;
+
/* FIXME all niobuf splitting should be done in obdfilter if needed */
/* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
if (npages < 0)
GOTO(out, rc = npages);
- OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
- if (local_nb == NULL)
- GOTO(out_pp_rnb, rc = -ENOMEM);
+ LASSERT(npages <= OST_THREAD_POOL_SIZE);
+
+ ost_nio_pages_get(req, local_nb, npages);
desc = ptlrpc_prep_bulk_exp (req, npages,
BULK_PUT_SOURCE, OST_BULK_PORTAL);
if (desc == NULL)
- GOTO(out_local, rc = -ENOMEM);
+ GOTO(out, rc = -ENOMEM);
rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
ioo, npages, pp_rnb, local_nb, oti);
rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
ioo, npages, local_nb, oti, rc);
+ ost_nio_pages_put(req, local_nb, npages);
+
if (rc == 0) {
repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
out_bulk:
ptlrpc_free_bulk(desc);
- out_local:
- OBD_FREE(local_nb, sizeof(*local_nb) * npages);
- out_pp_rnb:
- free_per_page_niobufs(npages, pp_rnb, remote_nb);
out:
LASSERT(rc <= 0);
if (rc == 0) {
GOTO(out, rc);
rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
+ /*
+ * Per-thread array of struct niobuf_{local,remote}'s was allocated by
+ * ost_thread_init().
+ */
+ local_nb = ost_tls(req)->local;
+ pp_rnb = ost_tls(req)->remote;
+
/* FIXME all niobuf splitting should be done in obdfilter if needed */
/* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
if (npages < 0)
GOTO(out, rc = npages);
- OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
- if (local_nb == NULL)
- GOTO(out_pp_rnb, rc = -ENOMEM);
+ LASSERT(npages <= OST_THREAD_POOL_SIZE);
+
+ ost_nio_pages_get(req, local_nb, npages);
desc = ptlrpc_prep_bulk_exp (req, npages,
BULK_GET_SINK, OST_BULK_PORTAL);
if (desc == NULL)
- GOTO(out_local, rc = -ENOMEM);
+ GOTO(out, rc = -ENOMEM);
/* obd_preprw clobbers oa->valid, so save what we need */
do_checksum = (body->oa.o_valid & OBD_MD_FLCKSUM);
rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
objcount, ioo, npages, local_nb, oti, rc);
+ ost_nio_pages_put(req, local_nb, npages);
+
if (rc == 0) {
/* set per-requested niobuf return codes */
for (i = j = 0; i < niocount; i++) {
out_bulk:
ptlrpc_free_bulk(desc);
- out_local:
- OBD_FREE(local_nb, sizeof(*local_nb) * npages);
- out_pp_rnb:
- free_per_page_niobufs(npages, pp_rnb, remote_nb);
out:
if (rc == 0) {
oti_to_request(oti, req);
static int ost_san_brw(struct ptlrpc_request *req, int cmd)
{
- struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
+ struct niobuf_remote *remote_nb, *res_nb, *pp_rnb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body, *repbody;
int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
lustre_swab_niobuf_remote (&remote_nb[i]);
}
+ /*
+ * Per-thread array of struct niobuf_remote's was allocated by
+ * ost_thread_init().
+ */
+ pp_rnb = ost_tls(req)->remote;
+
/* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
if (npages < 0)
size[1] = npages * sizeof(*pp_rnb);
rc = lustre_pack_reply(req, 2, size, NULL);
if (rc)
- GOTO(out_pp_rnb, rc);
+ GOTO(out, rc);
req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
objcount, ioo, npages, pp_rnb);
if (req->rq_status)
- GOTO(out_pp_rnb, rc = 0);
+ GOTO(out, rc = 0);
repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
memcpy(res_nb, remote_nb, size[1]);
rc = 0;
-out_pp_rnb:
- free_per_page_niobufs(npages, pp_rnb, remote_nb);
out:
target_committed_to_req(req);
if (rc) {
return 0;
}
+/*
+ * free per-thread pool created by ost_thread_init().
+ */
+static void ost_thread_done(struct ptlrpc_thread *thread)
+{
+ int i;
+ struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
+ * Storage */
+
+ ENTRY;
+
+ LASSERT(thread != NULL);
+ LASSERT(thread->t_data != NULL);
+
+ /*
+ * be prepared to handle partially-initialized pools (because this is
+	 * called from ost_thread_init() for cleanup).
+ */
+ tls = thread->t_data;
+ if (tls != NULL) {
+ for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
+ if (tls->page[i] != NULL)
+ __free_page(tls->page[i]);
+ }
+ OBD_FREE_PTR(tls);
+ thread->t_data = NULL;
+ }
+ EXIT;
+}
+
+/*
+ * initialize per-thread page pool (bug 5137).
+ */
+static int ost_thread_init(struct ptlrpc_thread *thread)
+{
+ int result;
+ int i;
+ struct ost_thread_local_cache *tls;
+
+ ENTRY;
+
+ LASSERT(thread != NULL);
+ LASSERT(thread->t_data == NULL);
+ LASSERT(thread->t_id < OST_NUM_THREADS);
+
+ OBD_ALLOC_PTR(tls);
+ if (tls != NULL) {
+ result = 0;
+ thread->t_data = tls;
+ /*
+ * populate pool
+ */
+ for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
+ tls->page[i] = alloc_page(OST_THREAD_POOL_GFP);
+ if (tls->page[i] == NULL) {
+ ost_thread_done(thread);
+ result = -ENOMEM;
+ break;
+ }
+ }
+ } else
+ result = -ENOMEM;
+ RETURN(result);
+}
+
static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
{
struct ost_obd *ost = &obd->u.ost;
GOTO(out_lprocfs, rc = -ENOMEM);
}
- rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
- "ll_ost");
+ ost->ost_service->srv_init = ost_thread_init;
+ ost->ost_service->srv_done = ost_thread_done;
+ rc = ptlrpc_start_n_threads(obd, ost->ost_service,
+ OST_NUM_THREADS, "ll_ost");
if (rc)
GOTO(out_service, rc = -EINVAL);
GOTO(out_service, rc = -ENOMEM);
}
- rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
- "ll_ost_creat");
+ rc = ptlrpc_start_n_threads(obd, ost->ost_create_service,
+ 1, "ll_ost_creat");
if (rc)
GOTO(out_create, rc = -EINVAL);
RETURN(err);
}
+struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
+{
+ return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+}
+
/* use obd ops to offer management infrastructure */
static struct obd_ops ost_obd_ops = {
.o_owner = THIS_MODULE,
# define ost_print_req NULL
#endif
+/*
+ * tunables for per-thread page pool (bug 5137)
+ */
+enum {
+ /*
+ * pool size in pages
+ */
+ OST_THREAD_POOL_SIZE = PTLRPC_MAX_BRW_PAGES,
+ /*
+ * GFP mask used to allocate pages for pool
+ */
+ OST_THREAD_POOL_GFP = GFP_HIGHUSER
+};
+
+struct page;
+struct niobuf_local;
+struct niobuf_remote;
+struct ptlrpc_request;
+
+/*
+ * struct ost_thread_local_cache is allocated and initialized for each OST
+ * thread by ost_thread_init().
+ */
+struct ost_thread_local_cache {
+ /*
+ * pool of pages and nio buffers used by write-path
+ */
+ struct page *page [OST_THREAD_POOL_SIZE];
+ struct niobuf_local local [OST_THREAD_POOL_SIZE];
+ struct niobuf_remote remote[OST_THREAD_POOL_SIZE];
+};
+
+struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
+
#ifdef HAVE_QUOTA_SUPPORT
/* Quota stuff */
int ost_quotacheck(struct ptlrpc_request *req);
}
static int
-ptlrpc_server_handle_request (struct ptlrpc_service *svc)
+ptlrpc_server_handle_request(struct ptlrpc_service *svc,
+ struct ptlrpc_thread *thread)
{
struct ptlrpc_request *request;
unsigned long flags;
CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
+ request->rq_svc_thread = thread;
request->rq_export = class_conn2export(&request->rq_reqmsg->handle);
if (request->rq_export) {
do {
rc = ptlrpc_server_handle_reply(svc);
- rc |= ptlrpc_server_handle_request(svc);
+ rc |= ptlrpc_server_handle_request(svc, NULL);
rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
did_something |= rc;
} while (rc);
/* Record that the thread is running */
thread->t_flags = SVC_RUNNING;
+ /*
+ * wake up our creator. Note: @data is invalid after this point,
+ * because it's allocated on ptlrpc_start_thread() stack.
+ */
wake_up(&thread->t_ctl_waitq);
watchdog = lc_watchdog_add(svc->srv_watchdog_timeout,
if (!list_empty (&svc->srv_request_queue) &&
(svc->srv_n_difficult_replies == 0 ||
svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
- ptlrpc_server_handle_request (svc);
+ ptlrpc_server_handle_request(svc, thread);
if (!list_empty(&svc->srv_idle_rqbds) &&
ptlrpc_server_post_idle_rqbds(svc) < 0) {
}
}
+ /*
+ * deconstruct service specific state created by ptlrpc_start_thread()
+ */
+ if (svc->srv_done != NULL)
+ svc->srv_done(thread);
+
spin_lock_irqsave(&svc->srv_lock, flags);
svc->srv_nthreads--; /* must know immediately */
for (i = 0; i < num_threads; i++) {
char name[32];
sprintf(name, "%s_%02d", base_name, i);
- rc = ptlrpc_start_thread(dev, svc, name);
+ rc = ptlrpc_start_thread(dev, svc, name, i);
if (rc) {
CERROR("cannot start %s thread #%d: rc %d\n", base_name,
i, rc);
}
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
- char *name)
+ char *name, int id)
{
struct l_wait_info lwi = { 0 };
struct ptlrpc_svc_data d;
if (thread == NULL)
RETURN(-ENOMEM);
init_waitqueue_head(&thread->t_ctl_waitq);
+ thread->t_id = id;
+
+ if (svc->srv_init != NULL) {
+ rc = svc->srv_init(thread);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ spin_lock_irqsave(&svc->srv_lock, flags);
+ list_add(&thread->t_link, &svc->srv_threads);
+ spin_unlock_irqrestore(&svc->srv_lock, flags);
d.dev = dev;
d.svc = svc;
d.name = name;
d.thread = thread;
- spin_lock_irqsave(&svc->srv_lock, flags);
- list_add(&thread->t_link, &svc->srv_threads);
- spin_unlock_irqrestore(&svc->srv_lock, flags);
-
/* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
spin_lock_irqsave(&svc->srv_lock, flags);
list_del(&thread->t_link);
spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+ if (svc->srv_done != NULL)
+ svc->srv_done(thread);
+
OBD_FREE(thread, sizeof(*thread));
RETURN(rc);
}