Whamcloud - gitweb
b=6422
authorgreen <green>
Fri, 15 Jul 2005 12:51:35 +0000 (12:51 +0000)
committergreen <green>
Fri, 15 Jul 2005 12:51:35 +0000 (12:51 +0000)
r=adilger

Support a pool of requests and lustre_msgs when allocating requests.
Use that on osc writeout path.

lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/osc/lproc_osc.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c

index faf290e..d90ae57 100644 (file)
@@ -293,6 +293,13 @@ enum rq_phase {
         RQ_PHASE_COMPLETE    = 0xebc0de04,
 };
 
+struct ptlrpc_request_pool {
+        spinlock_t prp_lock;
+        struct list_head prp_req_list;    /* list of ptlrpc_request structs */
+        int prp_rq_size;
+        void (*prp_populate)(struct ptlrpc_request_pool *, int);
+};
+        
 struct ptlrpc_request {
         int rq_type; /* one of PTL_RPC_MSG_* */
         struct list_head rq_list;
@@ -368,6 +375,8 @@ struct ptlrpc_request {
         void *rq_interpret_reply;               /* Async completion handler */
         union ptlrpc_async_args rq_async_args;  /* Async completion context */
         void *rq_ptlrpcd_data;
+        struct ptlrpc_request_pool *rq_pool;    /* Pool if request from
+                                                   preallocated list */
 };
 
 static inline const char *
@@ -698,8 +707,16 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *,
                             struct ptlrpc_request *);
 
+void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
+void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
+struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
+                                                void (*populate_pool)(struct ptlrpc_request_pool *, int));
 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                                        int count, int *lengths, char **bufs);
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
+                                            int count, int *lengths,
+                                            char **bufs,
+                                            struct ptlrpc_request_pool *pool);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 void ptlrpc_req_finished(struct ptlrpc_request *request);
 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
index 823b24e..3bed2e9 100644 (file)
@@ -317,6 +317,7 @@ struct client_obd {
         /* used by quotacheck */
         spinlock_t               cl_qchk_lock;
         int                      cl_qchk_stat; /* quotacheck stat of the peer */
+        struct ptlrpc_request_pool *cl_rq_pool; /* emergency pool of requests */
 };
 
 /* Like a client, with some hangers-on.  Keep mc_client_obd first so that we
index b8d8ce5..1a82ba3 100644 (file)
@@ -92,6 +92,10 @@ static int osc_wr_max_rpcs_in_flight(struct file *file, const char *buffer,
         if (val < 1 || val > OSC_MAX_RIF_MAX)
                 return -ERANGE;
 
+        if (cli->cl_rq_pool && val > cli->cl_max_rpcs_in_flight)
+                cli->cl_rq_pool->prp_populate(cli->cl_rq_pool,
+                                              val - cli->cl_max_rpcs_in_flight);
+
         spin_lock(&cli->cl_loi_list_lock);
         cli->cl_max_rpcs_in_flight = val;
         spin_unlock(&cli->cl_loi_list_lock);
index 7ee7a60..ff6deb8 100644 (file)
@@ -758,8 +758,10 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         int                      requested_nob;
         int                      opc;
         int                      rc;
+        struct ptlrpc_request_pool *pool;
 
         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
+        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_rq_pool : NULL;
 
         for (niocount = i = 1; i < page_count; i++)
                 if (!can_merge_pages(&pga[i - 1], &pga[i]))
@@ -770,7 +772,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         size[2] = niocount * sizeof(*niobuf);
 
         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
-        req = ptlrpc_prep_req(imp, opc, 3, size, NULL);
+        req = ptlrpc_prep_req_pool(imp, opc, 3, size, NULL, pool);
         if (req == NULL)
                 return (-ENOMEM);
 
@@ -1584,6 +1586,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 RETURN(0);
 
         loi_list_maint(cli, loi);
+
         spin_unlock(&cli->cl_loi_list_lock);
 
         request = osc_build_req(cli, &rpc_list, page_count, cmd);
@@ -1645,6 +1648,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 cli->cl_r_in_flight++;
         else
                 cli->cl_w_in_flight++;
+
         /* queued sync pages can be torn down while the pages
          * were between the pending list and the rpc */
         list_for_each(pos, &aa->aa_oaps) {
@@ -2397,6 +2401,7 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                             struct lov_stripe_md *lsm, obd_count page_count,
                             struct brw_page *pga)
 {
+        struct client_obd *cli = &exp->exp_obd->u.cli;
         struct ptlrpc_request *request = NULL;
         struct ost_body *body;
         struct niobuf_remote *nioptr;
@@ -2408,8 +2413,8 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(*nioptr);
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_WRITE,
-                                  3, size, NULL);
+        request = ptlrpc_prep_req_pool(class_exp2cliimp(exp), OST_SAN_WRITE,
+                                       3, size, NULL, cli->cl_rq_pool);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -3186,6 +3191,7 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                 ptlrpcd_decref();
         } else {
                 struct lprocfs_static_vars lvars;
+                struct client_obd *cli = &obd->u.cli;
 
                 lprocfs_init_vars(osc, &lvars);
                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
@@ -3194,6 +3200,14 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                 }
 
                 oscc_init(obd);
+                /* We need to allocate a few requests more, because
+                   brw_interpret_oap tries to create new requests before freeing
+                   previous ones. Ideally we want to have 2x max_rpcs_in_flight
+                   reserved, but I afraid that might be too much wasted RAM
+                   in fact, so 2 is just my guess and still should work. */
+                cli->cl_rq_pool = ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
+                                                      OST_MAXREQSIZE,
+                                                      ptlrpc_add_rqs_to_pool);
         }
 
         RETURN(rc);
@@ -3231,6 +3245,8 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         osc_qinfo_cleanup(cli);
 
+        ptlrpc_free_rq_pool(cli->cl_rq_pool);
+
         rc = client_obd_cleanup(obd);
         ptlrpcd_decref();
         RETURN(rc);
index c812d90..ca7804d 100644 (file)
@@ -181,10 +181,126 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         EXIT;
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
-                                       int count, int *lengths, char **bufs)
+void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
+{
+        struct list_head *l, *tmp;
+        struct ptlrpc_request *req;
+
+        if (!pool)
+                return;
+
+        list_for_each_safe(l, tmp, &pool->prp_req_list) {
+                req = list_entry(l, struct ptlrpc_request, rq_list);
+                list_del(&req->rq_list);
+                LASSERT (req->rq_reqmsg);
+                OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
+                OBD_FREE(req, sizeof(*req));
+        }
+        OBD_FREE(pool, sizeof(*pool));
+}
+
+void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
+{
+        int i;
+        int size = 1;
+
+        while (size < pool->prp_rq_size)
+                size <<= 1;
+
+        LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
+                 "Trying to change pool size with nonempty pool "
+                 "from %d to %d bytes\n", pool->prp_rq_size, size);
+
+        spin_lock(&pool->prp_lock);
+        pool->prp_rq_size = size;
+        for (i = 0; i < num_rq; i++) {
+                struct ptlrpc_request *req;
+                struct lustre_msg *msg;
+                OBD_ALLOC(req, sizeof(struct ptlrpc_request));
+                if (!req)
+                        goto out;
+                OBD_ALLOC_GFP(msg, size, GFP_KERNEL);
+                if (!msg) {
+                        OBD_FREE(req, sizeof(struct ptlrpc_request));
+                        goto out;
+                }
+                req->rq_reqmsg = msg;
+                req->rq_pool = pool;
+                list_add_tail(&req->rq_list, &pool->prp_req_list);
+        }
+out:
+        spin_unlock(&pool->prp_lock);
+        return;
+}
+
+struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
+                                                void (*populate_pool)(struct ptlrpc_request_pool *, int))
+{
+        struct ptlrpc_request_pool *pool;
+
+        OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
+        if (!pool)
+                return NULL;
+
+        /* Request next power of two for the allocation, because internally
+           kernel would do exactly this */
+
+        spin_lock_init(&pool->prp_lock);
+        INIT_LIST_HEAD(&pool->prp_req_list);
+        pool->prp_rq_size = msgsize;
+        pool->prp_populate = populate_pool;
+
+        populate_pool(pool, num_rq);
+
+        if (list_empty(&pool->prp_req_list)) {
+                /* have not allocated a single request for the pool */
+                OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
+                pool = NULL;
+        }
+        return pool;
+}
+
+static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
 {
         struct ptlrpc_request *request;
+        struct lustre_msg *reqmsg;
+
+        if (!pool)
+                return NULL;
+
+        spin_lock(&pool->prp_lock);
+
+        /* See if we have anything in a pool, and bail out if nothing,
+         * in writeout path, where this matters, this is safe to do, because
+         * nothing is lost in this case, and when some in-flight requests
+         * complete, this code will be called again. */
+        if (unlikely(list_empty(&pool->prp_req_list))) {
+                spin_unlock(&pool->prp_lock);
+                return NULL;
+        }
+
+        request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
+                             rq_list);
+        list_del(&request->rq_list);
+        spin_unlock(&pool->prp_lock);
+
+        LASSERT(request->rq_reqmsg);
+        LASSERT(request->rq_pool);
+
+        reqmsg = request->rq_reqmsg;
+        memset(request, 0, sizeof(*request));
+        request->rq_reqmsg = reqmsg;
+        request->rq_pool = pool;
+        request->rq_reqlen = pool->prp_rq_size;
+        return request;
+}
+        
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
+                                            int count, int *lengths,
+                                            char **bufs,
+                                            struct ptlrpc_request_pool *pool)
+{
+        struct ptlrpc_request *request = NULL;
         int rc;
         ENTRY;
 
@@ -193,7 +309,12 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         LASSERT((unsigned long)imp->imp_client > 0x1000);
         LASSERT(imp->imp_client != LP_POISON);
 
-        OBD_ALLOC(request, sizeof(*request));
+        if (pool)
+                request = ptlrpc_prep_req_from_pool(pool);
+
+        if (!request)
+                OBD_ALLOC(request, sizeof(*request));
+
         if (!request) {
                 CERROR("request allocation out of memory\n");
                 RETURN(NULL);
@@ -201,7 +322,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
 
         rc = lustre_pack_request(request, count, lengths, bufs);
         if (rc) {
-                CERROR("cannot pack request %d\n", rc);
+                LASSERT(!request->rq_pool);
                 OBD_FREE(request, sizeof(*request));
                 RETURN(NULL);
         }
@@ -241,6 +362,13 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         RETURN(request);
 }
 
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+                                       int count, int *lengths, char **bufs)
+{
+        return ptlrpc_prep_req_pool(imp, opcode, count, lengths, bufs, NULL);
+}
+
+
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
         struct ptlrpc_request_set *set;
@@ -1001,6 +1129,15 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         RETURN(rc);
 }
 
+static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
+{
+        struct ptlrpc_request_pool *pool = request->rq_pool;
+
+        spin_lock(&pool->prp_lock);
+        list_add_tail(&request->rq_list, &pool->prp_req_list);
+        spin_unlock(&pool->prp_lock);
+}
+
 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
 {
         ENTRY;
@@ -1037,10 +1174,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 OBD_FREE(request->rq_repmsg, request->rq_replen);
                 request->rq_repmsg = NULL;
         }
-        if (request->rq_reqmsg != NULL) {
-                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
-                request->rq_reqmsg = NULL;
-        }
         if (request->rq_export != NULL) {
                 class_export_put(request->rq_export);
                 request->rq_export = NULL;
@@ -1052,7 +1185,15 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_bulk != NULL)
                 ptlrpc_free_bulk(request->rq_bulk);
 
-        OBD_FREE(request, sizeof(*request));
+        if (request->rq_pool) {
+                __ptlrpc_free_req_to_pool(request);
+        } else {
+                if (request->rq_reqmsg != NULL) {
+                        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+                        request->rq_reqmsg = NULL;
+                }
+                OBD_FREE(request, sizeof(*request));
+        }
         EXIT;
 }
 
index d9ea5e1..ac0424e 100644 (file)
@@ -68,12 +68,27 @@ lustre_init_msg (struct lustre_msg *msg, int count, int *lens, char **bufs)
 int lustre_pack_request (struct ptlrpc_request *req,
                          int count, int *lens, char **bufs)
 {
+        int reqlen;
         ENTRY;
 
-        req->rq_reqlen = lustre_msg_size (count, lens);
-        OBD_ALLOC(req->rq_reqmsg, req->rq_reqlen);
-        if (req->rq_reqmsg == NULL)
-                RETURN(-ENOMEM);
+        reqlen = lustre_msg_size (count, lens);
+        /* See if we got it from prealloc pool */
+        if (req->rq_reqmsg) {
+                /* Cannot return error here, that would create
+                   infinite loop in ptlrpc_prep_req_pool */
+                /* In this case ptlrpc_prep_req_from_pool sets req->rq_reqlen
+                   to maximum size that would fit into this preallocated
+                   request */
+                LASSERTF(req->rq_reqlen >= reqlen, "req->rq_reqlen %d, "
+                                                   "reqlen %d\n",req->rq_reqlen,
+                                                    reqlen);
+                memset(req->rq_reqmsg, 0, reqlen);
+        } else {
+                OBD_ALLOC(req->rq_reqmsg, reqlen);
+                if (req->rq_reqmsg == NULL)
+                        RETURN(-ENOMEM);
+        }
+        req->rq_reqlen = reqlen;
 
         lustre_init_msg (req->rq_reqmsg, count, lens, bufs);
         RETURN (0);
index 8490664..23b9cfa 100644 (file)
@@ -100,6 +100,10 @@ EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
 EXPORT_SYMBOL(ptlrpc_queue_wait);
 EXPORT_SYMBOL(ptlrpc_replay_req);
 EXPORT_SYMBOL(ptlrpc_restart_req);
+EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
+EXPORT_SYMBOL(ptlrpc_init_rq_pool);
+EXPORT_SYMBOL(ptlrpc_free_rq_pool);
+EXPORT_SYMBOL(ptlrpc_prep_req_pool);
 EXPORT_SYMBOL(ptlrpc_prep_req);
 EXPORT_SYMBOL(ptlrpc_free_req);
 EXPORT_SYMBOL(ptlrpc_unregister_reply);