Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 388fd01..512f62d 100644 (file)
 #include <portals/types.h>
 #include "ptlrpc_internal.h"
 
+/* forward ref */
+static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
+
+
 static LIST_HEAD (ptlrpc_all_services);
 static spinlock_t ptlrpc_all_services_lock = SPIN_LOCK_UNLOCKED;
 
@@ -117,6 +121,29 @@ ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
         OBD_FREE (rqbd, sizeof (*rqbd));
 }
 
+int
+ptlrpc_grow_req_bufs(struct ptlrpc_srv_ni *srv_ni)
+{
+        struct ptlrpc_service             *svc = srv_ni->sni_service;
+        struct ptlrpc_request_buffer_desc *rqbd;
+        int                                i;
+
+        for (i = 0; i < svc->srv_nbuf_per_group; i++) {
+                rqbd = ptlrpc_alloc_rqbd(srv_ni);
+
+                if (rqbd == NULL) {
+                        CERROR ("%s/%s: Can't allocate request buffer\n",
+                                svc->srv_name, srv_ni->sni_ni->pni_name);
+                        return (-ENOMEM);
+                }
+
+                if (ptlrpc_server_post_idle_rqbds(svc) < 0)
+                        return (-EAGAIN);
+        }
+
+        return (0);
+}
+
 void
 ptlrpc_save_llog_lock(struct ptlrpc_request *req, struct llog_create_locks *lcl)
 {
@@ -225,28 +252,32 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
         struct ptlrpc_request_buffer_desc *rqbd;
         unsigned long                      flags;
         int                                rc;
+        int                                posted = 0;
 
-        spin_lock_irqsave(&svc->srv_lock, flags);
-        if (list_empty (&svc->srv_idle_rqbds)) {
+        for (;;) {
+                spin_lock_irqsave(&svc->srv_lock, flags);
+                if (list_empty (&svc->srv_idle_rqbds)) {
+                        spin_unlock_irqrestore(&svc->srv_lock, flags);
+                        return (posted);
+                }
+                rqbd = list_entry(svc->srv_idle_rqbds.next,
+                                  struct ptlrpc_request_buffer_desc,
+                                  rqbd_list);
+                list_del (&rqbd->rqbd_list);
+
+                /* assume we will post successfully */
+                srv_ni = rqbd->rqbd_srv_ni;
+                srv_ni->sni_nrqbd_receiving++;
+                list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
                 spin_unlock_irqrestore(&svc->srv_lock, flags);
-                return (0);
-        }
-
-        rqbd = list_entry(svc->srv_idle_rqbds.next,
-                          struct ptlrpc_request_buffer_desc,
-                          rqbd_list);
-        list_del (&rqbd->rqbd_list);
 
-        /* assume we will post successfully */
-        srv_ni = rqbd->rqbd_srv_ni;
-        srv_ni->sni_nrqbd_receiving++;
-        list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
+                rc = ptlrpc_register_rqbd(rqbd);
+                if (rc != 0)
+                        break;
 
-        spin_unlock_irqrestore(&svc->srv_lock, flags);
+                posted = 1;
+        }
 
-        rc = ptlrpc_register_rqbd(rqbd);
-        if (rc == 0)
-                return (1);
 
         spin_lock_irqsave(&svc->srv_lock, flags);
 
@@ -275,11 +306,10 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                 struct proc_dir_entry *proc_entry)
 {
         int                                i;
-        int                                j;
+        int                                rc;
         int                                ssize;
         struct ptlrpc_service             *service;
         struct ptlrpc_srv_ni              *srv_ni;
-        struct ptlrpc_request_buffer_desc *rqbd;
         ENTRY;
 
         LASSERT (ptlrpc_ninterfaces > 0);
@@ -297,6 +327,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
         INIT_LIST_HEAD(&service->srv_threads);
         init_waitqueue_head(&service->srv_waitq);
 
+        service->srv_nbuf_per_group = nbufs;
         service->srv_max_req_size = max_req_size;
         service->srv_buf_size = bufsize;
         service->srv_rep_portal = rep_portal;
@@ -328,22 +359,11 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                 CDEBUG (D_NET, "%s: initialising interface %s\n", name,
                         srv_ni->sni_ni->pni_name);
 
-                for (j = 0; j < nbufs; j++) {
-                        rqbd = ptlrpc_alloc_rqbd (srv_ni);
-                        
-                        if (rqbd == NULL) {
-                                CERROR ("%s.%d: Can't allocate request %d "
-                                        "on %s\n", name, i, j, 
-                                        srv_ni->sni_ni->pni_name);
-                                GOTO(failed, NULL);
-                        }
-
-                        /* We shouldn't be under memory pressure at
-                         * startup, so fail if we can't post all our
-                         * buffers at this time. */
-                        if (ptlrpc_server_post_idle_rqbds(service) <= 0)
-                                GOTO(failed, NULL);
-                }
+                rc = ptlrpc_grow_req_bufs(srv_ni);
+                /* We shouldn't be under memory pressure at startup, so
+                 * fail if we can't post all our buffers at this time. */
+                if (rc != 0)
+                        GOTO(failed, NULL);
         }
 
         if (proc_entry != NULL)
@@ -378,6 +398,7 @@ ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *re
         ptlrpc_free_server_req(req);
 }
 
+static char str[PTL_NALFMT_SIZE];
 static int 
 ptlrpc_server_handle_request (struct ptlrpc_service *svc)
 {
@@ -388,7 +409,6 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
         struct timeval         work_end;
         long                   timediff;
         int                    rc;
-        char                   str[PTL_NALFMT_SIZE];
         ENTRY;
 
         spin_lock_irqsave (&svc->srv_lock, flags);
@@ -448,8 +468,9 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
          * client's timeout is similar to mine, she'll be timing out this
          * REQ anyway (bug 1502) */
         if (timediff / 1000000 > (long)obd_timeout) {
-                CERROR("Dropping timed-out request from %s: %ld seconds old\n",
-                       ptlrpc_peernid2str(&request->rq_peer, str), 
+                CERROR("Dropping timed-out opc %d request from %s"
+                       ": %ld seconds old\n", request->rq_reqmsg->opc,
+                       ptlrpc_peernid2str(&request->rq_peer, str),
                        timediff / 1000000);
                 goto out;
         }
@@ -481,8 +502,9 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
                request->rq_peer.peer_ni->pni_name,
                ptlrpc_peernid2str(&request->rq_peer, str),
                request->rq_reqmsg->opc);
-
+        request->rq_svc = svc;
         rc = svc->srv_handler(request);
+        request->rq_svc = NULL;
         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
                "%s:%s+%d:%d:"LPU64":%s:%s:%d\n", current->comm,
                (request->rq_export ?
@@ -684,6 +706,26 @@ void ptlrpc_daemonize(void)
         reparent_to_init();
 }
 
+static void
+ptlrpc_check_rqbd_pools(struct ptlrpc_service *svc)
+{
+        struct ptlrpc_srv_ni  *sni;
+        int                    i;
+        int                    avail = 0;
+        int                    low_water = svc->srv_nbuf_per_group/2;
+
+        for (i = 0; i < ptlrpc_ninterfaces; i++) {
+                sni = &svc->srv_interfaces[i];
+
+                avail += sni->sni_nrqbd_receiving;
+                /* NB I'm not locking; just looking. */
+                if (sni->sni_nrqbd_receiving <= low_water)
+                        ptlrpc_grow_req_bufs(sni);
+        }
+
+        lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_AVAIL_CNTR, avail);
+}
+
 static int
 ptlrpc_retry_rqbds(void *arg)
 {
@@ -743,7 +785,9 @@ static int ptlrpc_main(void *arg)
                                 svc->srv_n_active_reqs <
                                 (svc->srv_nthreads - 1))),
                               &lwi);
-
+                
+                ptlrpc_check_rqbd_pools(svc);
+                
                 if (!list_empty (&svc->srv_reply_queue))
                         ptlrpc_server_handle_reply (svc);