RQ_PHASE_COMPLETE = 0xebc0de04,
};
+struct ptlrpc_request_pool {
+ spinlock_t prp_lock;
+ struct list_head prp_req_list; /* list of ptlrpc_request structs */
+ int prp_rq_size;
+ void (*prp_populate)(struct ptlrpc_request_pool *, int);
+};
+
struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_MSG_* */
struct list_head rq_list;
void *rq_interpret_reply; /* Async completion handler */
union ptlrpc_async_args rq_async_args; /* Async completion context */
void *rq_ptlrpcd_data;
+ struct ptlrpc_request_pool *rq_pool; /* Pool if request from
+ preallocated list */
};
static inline const char *
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *,
struct ptlrpc_request *);
+void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
+void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
+struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
+ void (*populate_pool)(struct ptlrpc_request_pool *, int));
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
int count, int *lengths, char **bufs);
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
+ int count, int *lengths,
+ char **bufs,
+ struct ptlrpc_request_pool *pool);
void ptlrpc_free_req(struct ptlrpc_request *request);
void ptlrpc_req_finished(struct ptlrpc_request *request);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
int requested_nob;
int opc;
int rc;
+ struct ptlrpc_request_pool *pool;
opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
+ pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_rq_pool : NULL;
for (niocount = i = 1; i < page_count; i++)
if (!can_merge_pages(&pga[i - 1], &pga[i]))
size[2] = niocount * sizeof(*niobuf);
OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
- req = ptlrpc_prep_req(imp, opc, 3, size, NULL);
+ req = ptlrpc_prep_req_pool(imp, opc, 3, size, NULL, pool);
if (req == NULL)
return (-ENOMEM);
RETURN(0);
loi_list_maint(cli, loi);
+
spin_unlock(&cli->cl_loi_list_lock);
request = osc_build_req(cli, &rpc_list, page_count, cmd);
cli->cl_r_in_flight++;
else
cli->cl_w_in_flight++;
+
/* queued sync pages can be torn down while the pages
* were between the pending list and the rpc */
list_for_each(pos, &aa->aa_oaps) {
struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page *pga)
{
+ struct client_obd *cli = &exp->exp_obd->u.cli;
struct ptlrpc_request *request = NULL;
struct ost_body *body;
struct niobuf_remote *nioptr;
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*nioptr);
- request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_WRITE,
- 3, size, NULL);
+ request = ptlrpc_prep_req_pool(class_exp2cliimp(exp), OST_SAN_WRITE,
+ 3, size, NULL, cli->cl_rq_pool);
if (!request)
RETURN(-ENOMEM);
ptlrpcd_decref();
} else {
struct lprocfs_static_vars lvars;
+ struct client_obd *cli = &obd->u.cli;
lprocfs_init_vars(osc, &lvars);
if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
}
oscc_init(obd);
+ /* We need to allocate a few requests more, because
+ brw_interpret_oap tries to create new requests before freeing
+ previous ones. Ideally we want to have 2x max_rpcs_in_flight
+ reserved, but I afraid that might be too much wasted RAM
+ in fact, so 2 is just my guess and still should work. */
+ cli->cl_rq_pool = ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
+ OST_MAXREQSIZE,
+ ptlrpc_add_rqs_to_pool);
}
RETURN(rc);
/* free memory of osc quota cache */
osc_qinfo_cleanup(cli);
+ ptlrpc_free_rq_pool(cli->cl_rq_pool);
+
rc = client_obd_cleanup(obd);
ptlrpcd_decref();
RETURN(rc);
EXIT;
}
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
- int count, int *lengths, char **bufs)
+void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
+{
+ struct list_head *l, *tmp;
+ struct ptlrpc_request *req;
+
+ if (!pool)
+ return;
+
+ list_for_each_safe(l, tmp, &pool->prp_req_list) {
+ req = list_entry(l, struct ptlrpc_request, rq_list);
+ list_del(&req->rq_list);
+ LASSERT (req->rq_reqmsg);
+ OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
+ OBD_FREE(req, sizeof(*req));
+ }
+ OBD_FREE(pool, sizeof(*pool));
+}
+
+void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
+{
+ int i;
+ int size = 1;
+
+ while (size < pool->prp_rq_size)
+ size <<= 1;
+
+ LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
+ "Trying to change pool size with nonempty pool "
+ "from %d to %d bytes\n", pool->prp_rq_size, size);
+
+ spin_lock(&pool->prp_lock);
+ pool->prp_rq_size = size;
+ for (i = 0; i < num_rq; i++) {
+ struct ptlrpc_request *req;
+ struct lustre_msg *msg;
+ OBD_ALLOC(req, sizeof(struct ptlrpc_request));
+ if (!req)
+ goto out;
+ OBD_ALLOC_GFP(msg, size, GFP_KERNEL);
+ if (!msg) {
+ OBD_FREE(req, sizeof(struct ptlrpc_request));
+ goto out;
+ }
+ req->rq_reqmsg = msg;
+ req->rq_pool = pool;
+ list_add_tail(&req->rq_list, &pool->prp_req_list);
+ }
+out:
+ spin_unlock(&pool->prp_lock);
+ return;
+}
+
+struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
+ void (*populate_pool)(struct ptlrpc_request_pool *, int))
+{
+ struct ptlrpc_request_pool *pool;
+
+ OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
+ if (!pool)
+ return NULL;
+
+ /* Request next power of two for the allocation, because internally
+ kernel would do exactly this */
+
+ spin_lock_init(&pool->prp_lock);
+ INIT_LIST_HEAD(&pool->prp_req_list);
+ pool->prp_rq_size = msgsize;
+ pool->prp_populate = populate_pool;
+
+ populate_pool(pool, num_rq);
+
+ if (list_empty(&pool->prp_req_list)) {
+ /* have not allocated a single request for the pool */
+ OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
+ pool = NULL;
+ }
+ return pool;
+}
+
+static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
{
struct ptlrpc_request *request;
+ struct lustre_msg *reqmsg;
+
+ if (!pool)
+ return NULL;
+
+ spin_lock(&pool->prp_lock);
+
+ /* See if we have anything in a pool, and bail out if nothing,
+ * in writeout path, where this matters, this is safe to do, because
+ * nothing is lost in this case, and when some in-flight requests
+ * complete, this code will be called again. */
+ if (unlikely(list_empty(&pool->prp_req_list))) {
+ spin_unlock(&pool->prp_lock);
+ return NULL;
+ }
+
+ request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
+ rq_list);
+ list_del(&request->rq_list);
+ spin_unlock(&pool->prp_lock);
+
+ LASSERT(request->rq_reqmsg);
+ LASSERT(request->rq_pool);
+
+ reqmsg = request->rq_reqmsg;
+ memset(request, 0, sizeof(*request));
+ request->rq_reqmsg = reqmsg;
+ request->rq_pool = pool;
+ request->rq_reqlen = pool->prp_rq_size;
+ return request;
+}
+
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
+ int count, int *lengths,
+ char **bufs,
+ struct ptlrpc_request_pool *pool)
+{
+ struct ptlrpc_request *request = NULL;
int rc;
ENTRY;
LASSERT((unsigned long)imp->imp_client > 0x1000);
LASSERT(imp->imp_client != LP_POISON);
- OBD_ALLOC(request, sizeof(*request));
+ if (pool)
+ request = ptlrpc_prep_req_from_pool(pool);
+
+ if (!request)
+ OBD_ALLOC(request, sizeof(*request));
+
if (!request) {
CERROR("request allocation out of memory\n");
RETURN(NULL);
rc = lustre_pack_request(request, count, lengths, bufs);
if (rc) {
- CERROR("cannot pack request %d\n", rc);
+ LASSERT(!request->rq_pool);
OBD_FREE(request, sizeof(*request));
RETURN(NULL);
}
RETURN(request);
}
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+ int count, int *lengths, char **bufs)
+{
+ return ptlrpc_prep_req_pool(imp, opcode, count, lengths, bufs, NULL);
+}
+
+
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
struct ptlrpc_request_set *set;
RETURN(rc);
}
+static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
+{
+ struct ptlrpc_request_pool *pool = request->rq_pool;
+
+ spin_lock(&pool->prp_lock);
+ list_add_tail(&request->rq_list, &pool->prp_req_list);
+ spin_unlock(&pool->prp_lock);
+}
+
static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
ENTRY;
OBD_FREE(request->rq_repmsg, request->rq_replen);
request->rq_repmsg = NULL;
}
- if (request->rq_reqmsg != NULL) {
- OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
- request->rq_reqmsg = NULL;
- }
if (request->rq_export != NULL) {
class_export_put(request->rq_export);
request->rq_export = NULL;
if (request->rq_bulk != NULL)
ptlrpc_free_bulk(request->rq_bulk);
- OBD_FREE(request, sizeof(*request));
+ if (request->rq_pool) {
+ __ptlrpc_free_req_to_pool(request);
+ } else {
+ if (request->rq_reqmsg != NULL) {
+ OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+ request->rq_reqmsg = NULL;
+ }
+ OBD_FREE(request, sizeof(*request));
+ }
EXIT;
}