Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index e07cae9..2307d20 100644 (file)
@@ -91,7 +91,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_srv_ni *srv_ni)
         }
 
         spin_lock_irqsave (&svc->srv_lock, flags);
-        list_add(&rqbd->rqbd_list, &srv_ni->sni_rqbds);
+        list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
         svc->srv_nbufs++;
         spin_unlock_irqrestore (&svc->srv_lock, flags);
 
@@ -191,6 +191,56 @@ timeval_sub(struct timeval *large, struct timeval *small)
                 (large->tv_usec - small->tv_usec);
 }
 
+static int
+ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
+{
+        struct ptlrpc_srv_ni              *srv_ni;
+        struct ptlrpc_request_buffer_desc *rqbd;
+        unsigned long                      flags;
+        int                                rc;
+
+        spin_lock_irqsave(&svc->srv_lock, flags);
+        if (list_empty (&svc->srv_idle_rqbds)) {
+                spin_unlock_irqrestore(&svc->srv_lock, flags);
+                return (0);
+        }
+
+        rqbd = list_entry(svc->srv_idle_rqbds.next,
+                          struct ptlrpc_request_buffer_desc,
+                          rqbd_list);
+        list_del (&rqbd->rqbd_list);
+
+        /* assume we will post successfully */
+        srv_ni = rqbd->rqbd_srv_ni;
+        srv_ni->sni_nrqbd_receiving++;
+        list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
+
+        spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+        rc = ptlrpc_register_rqbd(rqbd);
+        if (rc == 0)
+                return (1);
+
+        spin_lock_irqsave(&svc->srv_lock, flags);
+
+        srv_ni->sni_nrqbd_receiving--;
+        list_del(&rqbd->rqbd_list);
+        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
+
+        if (srv_ni->sni_nrqbd_receiving == 0) {
+                /* This service is off-air on this interface because all
+                 * its request buffers are busy.  Portals will have started
+                 * dropping incoming requests until more buffers get
+                 * posted */
+                CERROR("All %s %s request buffers busy\n",
+                       svc->srv_name, srv_ni->sni_ni->pni_name);
+        }
+
+        spin_unlock_irqrestore (&svc->srv_lock, flags);
+
+        return (-1);
+}
+
 struct ptlrpc_service *
 ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                 int req_portal, int rep_portal, 
@@ -227,6 +277,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
         service->srv_handler = handler;
 
         INIT_LIST_HEAD(&service->srv_request_queue);
+        INIT_LIST_HEAD(&service->srv_idle_rqbds);
         INIT_LIST_HEAD(&service->srv_reply_queue);
 
         /* First initialise enough for early teardown */
@@ -235,7 +286,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
 
                 srv_ni->sni_service = service;
                 srv_ni->sni_ni = &ptlrpc_interfaces[i];
-                INIT_LIST_HEAD(&srv_ni->sni_rqbds);
+                INIT_LIST_HEAD(&srv_ni->sni_active_rqbds);
                 INIT_LIST_HEAD(&srv_ni->sni_active_replies);
         }
 
@@ -259,7 +310,12 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                                         srv_ni->sni_ni->pni_name);
                                 GOTO(failed, NULL);
                         }
-                        ptlrpc_register_rqbd (rqbd);
+
+                        /* We shouldn't be under memory pressure at
+                         * startup, so fail if we can't post all our
+                         * buffers at this time. */
+                        if (ptlrpc_server_post_idle_rqbds(service) <= 0)
+                                GOTO(failed, NULL);
                 }
         }
 
@@ -275,6 +331,26 @@ failed:
         return NULL;
 }
 
+static void
+ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+{
+        unsigned long  flags;
+        int            refcount;
+        
+        spin_lock_irqsave(&svc->srv_lock, flags);
+        svc->srv_n_active_reqs--;
+        refcount = --(req->rq_rqbd->rqbd_refcount);
+        if (refcount == 0) {
+                /* request buffer is now idle */
+                list_del(&req->rq_rqbd->rqbd_list);
+                list_add_tail(&req->rq_rqbd->rqbd_list,
+                              &svc->srv_idle_rqbds);
+        }
+        spin_unlock_irqrestore(&svc->srv_lock, flags);
+
+        ptlrpc_free_server_req(req);
+}
+
 static int 
 ptlrpc_server_handle_request (struct ptlrpc_service *svc)
 {
@@ -283,7 +359,6 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
         struct timeval         work_start;
         struct timeval         work_end;
         long                   timediff;
-        int                    refcount;
         int                    rc;
         ENTRY;
 
@@ -413,18 +488,8 @@ put_conn:
                 }
         }
 
-        spin_lock_irqsave(&svc->srv_lock, flags);
-        svc->srv_n_active_reqs--;
-        refcount = --(request->rq_rqbd->rqbd_refcount);
-        spin_unlock_irqrestore(&svc->srv_lock, flags);
-
-        if (refcount == 0) {
-                /* rqbd now idle: repost */
-                ptlrpc_register_rqbd(request->rq_rqbd);
-        }
-
-        ptlrpc_free_server_req(request);
-
+        ptlrpc_server_free_request(svc, request);
+        
         RETURN(1);
 }
 
@@ -529,6 +594,7 @@ int
 liblustre_check_services (void *arg) 
 {
         int  did_something = 0;
+        int  rc;
         struct list_head *tmp, *nxt;
         ENTRY;
         
@@ -548,12 +614,13 @@ liblustre_check_services (void *arg)
                 
                 svc->srv_nthreads++;
                 
-                while (ptlrpc_server_handle_reply (svc))
-                        did_something++;
-                        
-                while (ptlrpc_server_handle_request (svc))
-                        did_something++;
-                        
+                do {
+                        rc = ptlrpc_server_handle_reply(svc);
+                        rc |= ptlrpc_server_handle_request(svc);
+                        rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
+                        did_something |= rc;
+                } while (rc);
+                
                 svc->srv_nthreads--;
         }
 
@@ -571,6 +638,15 @@ void ptlrpc_daemonize(void)
         reparent_to_init();
 }
 
+static int
+ptlrpc_retry_rqbds(void *arg)
+{
+        struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;
+        
+        svc->srv_rqbd_timeout = 0;
+        return (-ETIMEDOUT);
+}
+
 static int ptlrpc_main(void *arg)
 {
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
@@ -603,10 +679,14 @@ static int ptlrpc_main(void *arg)
         while ((thread->t_flags & SVC_STOPPING) == 0 ||
                svc->srv_n_difficult_replies != 0) {
                 /* Don't exit while there are replies to be handled */
-                struct l_wait_info lwi = { 0 };
-
+                struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+                                                     ptlrpc_retry_rqbds, svc);
+                                  
                 l_wait_event_exclusive (svc->srv_waitq,
-                              (thread->t_flags & SVC_STOPPING) != 0 ||
+                              ((thread->t_flags & SVC_STOPPING) != 0 &&
+                               svc->srv_n_difficult_replies == 0) ||
+                              (!list_empty(&svc->srv_idle_rqbds) &&
+                               svc->srv_rqbd_timeout == 0) ||
                               !list_empty (&svc->srv_reply_queue) ||
                               (!list_empty (&svc->srv_request_queue) &&
                                (svc->srv_n_difficult_replies == 0 ||
@@ -624,6 +704,14 @@ static int ptlrpc_main(void *arg)
                     (svc->srv_n_difficult_replies == 0 ||
                      svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
                         ptlrpc_server_handle_request (svc);
+
+                if (!list_empty(&svc->srv_idle_rqbds) &&
+                    ptlrpc_server_post_idle_rqbds(svc) < 0) {
+                        /* I just failed to repost request buffers.  Wait
+                         * for a timeout (unless something else happens)
+                         * before I try again */
+                        svc->srv_rqbd_timeout = HZ/10;
+                }
         }
 
         spin_lock_irqsave(&svc->srv_lock, flags);
@@ -756,8 +844,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                        service->srv_name, srv_ni->sni_ni->pni_name);
 
                 /* Unlink all the request buffers.  This forces a 'final'
-                 * event with its 'unlink' flag set for each rqbd */
-                list_for_each(tmp, &srv_ni->sni_rqbds) {
+                 * event with its 'unlink' flag set for each posted rqbd */
+                list_for_each(tmp, &srv_ni->sni_active_rqbds) {
                         struct ptlrpc_request_buffer_desc *rqbd =
                                 list_entry(tmp, struct ptlrpc_request_buffer_desc, 
                                            rqbd_list);
@@ -812,25 +900,27 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
-                req->rq_rqbd->rqbd_refcount--;
-                
-                ptlrpc_free_server_req(req);
+                service->srv_n_active_reqs++;
+
+                ptlrpc_server_free_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
+        LASSERT(service->srv_n_active_reqs == 0);
 
-        /* Now free all the request buffers since nothing references them
-         * any more... */
         for (i = 0; i < ptlrpc_ninterfaces; i++) {
                 srv_ni = &service->srv_interfaces[i];
+                LASSERT(list_empty(&srv_ni->sni_active_rqbds));
+        }
 
-                while (!list_empty(&srv_ni->sni_rqbds)) {
-                        struct ptlrpc_request_buffer_desc *rqbd =
-                                list_entry(srv_ni->sni_rqbds.next,
-                                           struct ptlrpc_request_buffer_desc, 
-                                           rqbd_list);
+        /* Now free all the request buffers since nothing references them
+         * any more... */
+        while (!list_empty(&service->srv_idle_rqbds)) {
+                struct ptlrpc_request_buffer_desc *rqbd =
+                        list_entry(service->srv_idle_rqbds.next,
+                                   struct ptlrpc_request_buffer_desc, 
+                                   rqbd_list);
 
-                        ptlrpc_free_rqbd(rqbd);
-                }
+                ptlrpc_free_rqbd(rqbd);
         }
 
         /* wait for all outstanding replies to complete (they were