From 3ec1726129df122583e271e5b18c13c3b3747e1c Mon Sep 17 00:00:00 2001
From: green
Date: Fri, 15 Jul 2005 12:51:35 +0000
Subject: [PATCH] b=6422 r=adilger Support a pool of requests and lustre_msgs
 when allocating requests. Use that on osc writeout path.

---
 lustre/include/linux/lustre_net.h |  17 ++++
 lustre/include/linux/obd.h        |   1 +
 lustre/osc/lproc_osc.c            |   4 +
 lustre/osc/osc_request.c          |  22 +++++-
 lustre/ptlrpc/client.c            | 163 +++++++++++++++++++++++++++++++++++---
 lustre/ptlrpc/pack_generic.c      |  23 +++++-
 lustre/ptlrpc/ptlrpc_module.c     |   4 +
 7 files changed, 218 insertions(+), 16 deletions(-)

diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index faf290e..d90ae57 100644
--- a/lustre/include/linux/lustre_net.h
+++ b/lustre/include/linux/lustre_net.h
@@ -293,6 +293,13 @@ enum rq_phase {
         RQ_PHASE_COMPLETE = 0xebc0de04,
 };
 
+struct ptlrpc_request_pool {
+        spinlock_t prp_lock;
+        struct list_head prp_req_list; /* list of ptlrpc_request structs */
+        int prp_rq_size;
+        void (*prp_populate)(struct ptlrpc_request_pool *, int);
+};
+
 struct ptlrpc_request {
         int rq_type; /* one of PTL_RPC_MSG_* */
         struct list_head rq_list;
@@ -368,6 +375,8 @@ struct ptlrpc_request {
         void *rq_interpret_reply; /* Async completion handler */
         union ptlrpc_async_args rq_async_args; /* Async completion context */
         void *rq_ptlrpcd_data;
+        struct ptlrpc_request_pool *rq_pool; /* Pool if request from
+                                                preallocated list */
 };
 
 static inline const char *
@@ -698,8 +707,16 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *,
                             struct ptlrpc_request *);
 
+void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
+void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
+struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
+                                                void (*populate_pool)(struct ptlrpc_request_pool *, int));
 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                                        int count, int *lengths, char **bufs);
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
+                                            int count, int *lengths,
+                                            char **bufs,
+                                            struct ptlrpc_request_pool *pool);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 void ptlrpc_req_finished(struct ptlrpc_request *request);
 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h
index 823b24e..3bed2e9 100644
--- a/lustre/include/linux/obd.h
+++ b/lustre/include/linux/obd.h
@@ -317,6 +317,7 @@ struct client_obd {
         /* used by quotacheck */
         spinlock_t cl_qchk_lock;
         int cl_qchk_stat; /* quotacheck stat of the peer */
+        struct ptlrpc_request_pool *cl_rq_pool; /* emergency pool of requests */
 };
 
 /* Like a client, with some hangers-on. Keep mc_client_obd first so that we
diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c
index b8d8ce5..1a82ba3 100644
--- a/lustre/osc/lproc_osc.c
+++ b/lustre/osc/lproc_osc.c
@@ -92,6 +92,10 @@ static int osc_wr_max_rpcs_in_flight(struct file *file, const char *buffer,
         if (val < 1 || val > OSC_MAX_RIF_MAX)
                 return -ERANGE;
 
+        if (cli->cl_rq_pool && val > cli->cl_max_rpcs_in_flight)
+                cli->cl_rq_pool->prp_populate(cli->cl_rq_pool,
+                                              val - cli->cl_max_rpcs_in_flight);
+
         spin_lock(&cli->cl_loi_list_lock);
         cli->cl_max_rpcs_in_flight = val;
         spin_unlock(&cli->cl_loi_list_lock);
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 7ee7a60..ff6deb8 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -758,8 +758,10 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         int requested_nob;
         int opc;
         int rc;
+        struct ptlrpc_request_pool *pool;
 
         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
+        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_rq_pool : NULL;
 
         for (niocount = i = 1; i < page_count; i++)
                 if (!can_merge_pages(&pga[i - 1], &pga[i]))
@@ -770,7 +772,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         size[2] = niocount * sizeof(*niobuf);
 
         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
-        req = ptlrpc_prep_req(imp, opc, 3, size, NULL);
+        req = ptlrpc_prep_req_pool(imp, opc, 3, size, NULL, pool);
         if (req == NULL)
                 return (-ENOMEM);
 
@@ -1584,6 +1586,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 RETURN(0);
 
         loi_list_maint(cli, loi);
+
         spin_unlock(&cli->cl_loi_list_lock);
 
         request = osc_build_req(cli, &rpc_list, page_count, cmd);
@@ -1645,6 +1648,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 cli->cl_r_in_flight++;
         else
                 cli->cl_w_in_flight++;
+
         /* queued sync pages can be torn down while the pages
          * were between the pending list and the rpc */
         list_for_each(pos, &aa->aa_oaps) {
@@ -2397,6 +2401,7 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                             struct lov_stripe_md *lsm, obd_count page_count,
                             struct brw_page *pga)
 {
+        struct client_obd *cli = &exp->exp_obd->u.cli;
         struct ptlrpc_request *request = NULL;
         struct ost_body *body;
         struct niobuf_remote *nioptr;
@@ -2408,8 +2413,8 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(*nioptr);
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_WRITE,
-                                  3, size, NULL);
+        request = ptlrpc_prep_req_pool(class_exp2cliimp(exp), OST_SAN_WRITE,
+                                       3, size, NULL, cli->cl_rq_pool);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -3186,6 +3191,7 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                 ptlrpcd_decref();
         } else {
                 struct lprocfs_static_vars lvars;
+                struct client_obd *cli = &obd->u.cli;
 
                 lprocfs_init_vars(osc, &lvars);
                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
@@ -3194,6 +3200,14 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                 }
 
                 oscc_init(obd);
+                /* We need to allocate a few requests more, because
+                   brw_interpret_oap tries to create new requests before freeing
+                   previous ones. Ideally we want to have 2x max_rpcs_in_flight
+                   reserved, but I am afraid that might be too much wasted RAM
+                   in fact, so 2 is just my guess and still should work. */
+                cli->cl_rq_pool = ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
+                                                      OST_MAXREQSIZE,
+                                                      ptlrpc_add_rqs_to_pool);
         }
 
         RETURN(rc);
@@ -3231,6 +3245,8 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         osc_qinfo_cleanup(cli);
 
+        ptlrpc_free_rq_pool(cli->cl_rq_pool);
+
         rc = client_obd_cleanup(obd);
         ptlrpcd_decref();
         RETURN(rc);
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index c812d90..ca7804d 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -181,10 +181,130 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         EXIT;
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
-                                       int count, int *lengths, char **bufs)
+void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
+{
+        struct list_head *l, *tmp;
+        struct ptlrpc_request *req;
+
+        if (!pool)
+                return;
+
+        list_for_each_safe(l, tmp, &pool->prp_req_list) {
+                req = list_entry(l, struct ptlrpc_request, rq_list);
+                list_del(&req->rq_list);
+                LASSERT (req->rq_reqmsg);
+                OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
+                OBD_FREE(req, sizeof(*req));
+        }
+        OBD_FREE(pool, sizeof(*pool));
+}
+
+/* Preallocate up to num_rq requests, each with a message buffer, and add
+ * them to the pool.  The GFP_KERNEL message allocation may sleep, so
+ * prp_lock is held only around updates of the pool itself, never across an
+ * allocation.  If memory runs out, the pool keeps what was added so far. */
+void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
+{
+        int i;
+        int size = 1;
+
+        while (size < pool->prp_rq_size)
+                size <<= 1;
+
+        LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
+                 "Trying to change pool size with nonempty pool "
+                 "from %d to %d bytes\n", pool->prp_rq_size, size);
+
+        spin_lock(&pool->prp_lock);
+        pool->prp_rq_size = size;
+        spin_unlock(&pool->prp_lock);
+        for (i = 0; i < num_rq; i++) {
+                struct ptlrpc_request *req;
+                struct lustre_msg *msg;
+                OBD_ALLOC(req, sizeof(struct ptlrpc_request));
+                if (!req)
+                        return;
+                OBD_ALLOC_GFP(msg, size, GFP_KERNEL);
+                if (!msg) {
+                        OBD_FREE(req, sizeof(struct ptlrpc_request));
+                        return;
+                }
+                req->rq_reqmsg = msg;
+                req->rq_pool = pool;
+                spin_lock(&pool->prp_lock);
+                list_add_tail(&req->rq_list, &pool->prp_req_list);
+                spin_unlock(&pool->prp_lock);
+        }
+}
+
+struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
+                void (*populate_pool)(struct ptlrpc_request_pool *, int))
+{
+        struct ptlrpc_request_pool *pool;
+
+        OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
+        if (!pool)
+                return NULL;
+
+        /* Request next power of two for the allocation, because internally
+           kernel would do exactly this */
+
+        spin_lock_init(&pool->prp_lock);
+        INIT_LIST_HEAD(&pool->prp_req_list);
+        pool->prp_rq_size = msgsize;
+        pool->prp_populate = populate_pool;
+
+        populate_pool(pool, num_rq);
+
+        if (list_empty(&pool->prp_req_list)) {
+                /* have not allocated a single request for the pool */
+                OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
+                pool = NULL;
+        }
+        return pool;
+}
+
+static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
 {
         struct ptlrpc_request *request;
+        struct lustre_msg *reqmsg;
+
+        if (!pool)
+                return NULL;
+
+        spin_lock(&pool->prp_lock);
+
+        /* See if we have anything in a pool, and bail out if nothing,
+         * in writeout path, where this matters, this is safe to do, because
+         * nothing is lost in this case, and when some in-flight requests
+         * complete, this code will be called again. */
+        if (unlikely(list_empty(&pool->prp_req_list))) {
+                spin_unlock(&pool->prp_lock);
+                return NULL;
+        }
+
+        request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
+                             rq_list);
+        list_del(&request->rq_list);
+        spin_unlock(&pool->prp_lock);
+
+        LASSERT(request->rq_reqmsg);
+        LASSERT(request->rq_pool);
+
+        reqmsg = request->rq_reqmsg;
+        memset(request, 0, sizeof(*request));
+        request->rq_reqmsg = reqmsg;
+        request->rq_pool = pool;
+        request->rq_reqlen = pool->prp_rq_size;
+        return request;
+}
+
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
+                                            int count, int *lengths,
+                                            char **bufs,
+                                            struct ptlrpc_request_pool *pool)
+{
+        struct ptlrpc_request *request = NULL;
         int rc;
         ENTRY;
 
@@ -193,7 +313,12 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         LASSERT((unsigned long)imp->imp_client > 0x1000);
         LASSERT(imp->imp_client != LP_POISON);
 
-        OBD_ALLOC(request, sizeof(*request));
+        if (pool)
+                request = ptlrpc_prep_req_from_pool(pool);
+
+        if (!request)
+                OBD_ALLOC(request, sizeof(*request));
+
         if (!request) {
                 CERROR("request allocation out of memory\n");
                 RETURN(NULL);
@@ -201,7 +326,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
 
         rc = lustre_pack_request(request, count, lengths, bufs);
         if (rc) {
-                CERROR("cannot pack request %d\n", rc);
+                LASSERT(!request->rq_pool);
                 OBD_FREE(request, sizeof(*request));
                 RETURN(NULL);
         }
@@ -241,6 +366,13 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         RETURN(request);
 }
 
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+                                       int count, int *lengths, char **bufs)
+{
+        return ptlrpc_prep_req_pool(imp, opcode, count, lengths, bufs, NULL);
+}
+
+
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
         struct ptlrpc_request_set *set;
@@ -1001,6 +1133,15 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         RETURN(rc);
 }
 
+static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
+{
+        struct ptlrpc_request_pool *pool = request->rq_pool;
+
+        spin_lock(&pool->prp_lock);
+        list_add_tail(&request->rq_list, &pool->prp_req_list);
+        spin_unlock(&pool->prp_lock);
+}
+
 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
 {
         ENTRY;
@@ -1037,10 +1178,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 OBD_FREE(request->rq_repmsg, request->rq_replen);
                 request->rq_repmsg = NULL;
         }
-        if (request->rq_reqmsg != NULL) {
-                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
-                request->rq_reqmsg = NULL;
-        }
         if (request->rq_export != NULL) {
                 class_export_put(request->rq_export);
                 request->rq_export = NULL;
@@ -1052,7 +1189,15 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_bulk != NULL)
                 ptlrpc_free_bulk(request->rq_bulk);
 
-        OBD_FREE(request, sizeof(*request));
+        if (request->rq_pool) {
+                __ptlrpc_free_req_to_pool(request);
+        } else {
+                if (request->rq_reqmsg != NULL) {
+                        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+                        request->rq_reqmsg = NULL;
+                }
+                OBD_FREE(request, sizeof(*request));
+        }
         EXIT;
 }
 
diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c
index d9ea5e1..ac0424e 100644
--- a/lustre/ptlrpc/pack_generic.c
+++ b/lustre/ptlrpc/pack_generic.c
@@ -68,12 +68,27 @@ lustre_init_msg (struct lustre_msg *msg, int count, int *lens, char **bufs)
 int lustre_pack_request (struct ptlrpc_request *req,
                          int count, int *lens, char **bufs)
 {
+        int reqlen;
         ENTRY;
 
-        req->rq_reqlen = lustre_msg_size (count, lens);
-        OBD_ALLOC(req->rq_reqmsg, req->rq_reqlen);
-        if (req->rq_reqmsg == NULL)
-                RETURN(-ENOMEM);
+        reqlen = lustre_msg_size (count, lens);
+        /* See if we got it from prealloc pool */
+        if (req->rq_reqmsg) {
+                /* Cannot return error here, that would create
+                   infinite loop in ptlrpc_prep_req_pool */
+                /* In this case ptlrpc_prep_req_from_pool sets req->rq_reqlen
+                   to maximum size that would fit into this preallocated
+                   request */
+                LASSERTF(req->rq_reqlen >= reqlen, "req->rq_reqlen %d, "
+                         "reqlen %d\n",req->rq_reqlen,
+                         reqlen);
+                memset(req->rq_reqmsg, 0, reqlen);
+        } else {
+                OBD_ALLOC(req->rq_reqmsg, reqlen);
+                if (req->rq_reqmsg == NULL)
+                        RETURN(-ENOMEM);
+        }
+        req->rq_reqlen = reqlen;
 
         lustre_init_msg (req->rq_reqmsg, count, lens, bufs);
         RETURN (0);
diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c
index 8490664..23b9cfa 100644
--- a/lustre/ptlrpc/ptlrpc_module.c
+++ b/lustre/ptlrpc/ptlrpc_module.c
@@ -100,6 +100,10 @@ EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
 EXPORT_SYMBOL(ptlrpc_queue_wait);
 EXPORT_SYMBOL(ptlrpc_replay_req);
 EXPORT_SYMBOL(ptlrpc_restart_req);
+EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
+EXPORT_SYMBOL(ptlrpc_init_rq_pool);
+EXPORT_SYMBOL(ptlrpc_free_rq_pool);
+EXPORT_SYMBOL(ptlrpc_prep_req_pool);
 EXPORT_SYMBOL(ptlrpc_prep_req);
 EXPORT_SYMBOL(ptlrpc_free_req);
 EXPORT_SYMBOL(ptlrpc_unregister_reply);
-- 
1.8.3.1