Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 6507f1a..37d7bf9 100644 (file)
@@ -169,7 +169,7 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
 
 void
 ptlrpc_save_lock (struct ptlrpc_request *req,
-                  struct lustre_handle *lock, int mode)
+                  struct lustre_handle *lock, int mode, int no_ack)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
         int                        idx;
@@ -181,12 +181,14 @@ ptlrpc_save_lock (struct ptlrpc_request *req,
         rs->rs_locks[idx] = *lock;
         rs->rs_modes[idx] = mode;
         rs->rs_difficult = 1;
+        rs->rs_no_ack = !!no_ack;
 }
 
 void
 ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         struct ptlrpc_service *svc = rs->rs_service;
+        ENTRY;
 
 #ifdef CONFIG_SMP
         LASSERT (spin_is_locked (&svc->srv_lock));
@@ -194,13 +196,16 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
         LASSERT (rs->rs_difficult);
         rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
 
-        if (rs->rs_scheduled)                   /* being set up or already notified */
+        if (rs->rs_scheduled) {                  /* being set up or already notified */
+                EXIT;
                 return;
+        }
 
         rs->rs_scheduled = 1;
         list_del (&rs->rs_list);
         list_add (&rs->rs_list, &svc->srv_reply_queue);
         cfs_waitq_signal (&svc->srv_waitq);
+        EXIT;
 }
 
 void
@@ -208,6 +213,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
 {
         struct list_head   *tmp;
         struct list_head   *nxt;
+        ENTRY;
 
         /* Find any replies that have been committed and get their service
          * to attend to complete them. */
@@ -232,6 +238,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
         }
 
         spin_unlock(&obd->obd_uncommitted_replies_lock);
+        EXIT;
 }
 
 static int
@@ -960,16 +967,13 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         req->rq_req_swab_mask = 0;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
-        if (rc < 0) {
+        if (rc != 0) {
                 CERROR("error unpacking request: ptl %d from %s x"LPU64"\n",
                        svc->srv_req_portal, libcfs_id2str(req->rq_peer),
                        req->rq_xid);
                 goto err_req;
         }
 
-        if (rc > 0)
-                lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
-
         rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
         if (rc) {
                 CERROR ("error unpacking ptlrpc body: ptl %d from %s x"
@@ -1296,6 +1300,11 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
         if (!rs->rs_on_net) {
                 /* Off the net */
                 svc->srv_n_difficult_replies--;
+                if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+                        /* wake up threads that are being stopped by
+                           ptlrpc_unregister_service/ptlrpc_stop_threads
+                           and sleep waiting svr_n_difficult_replies == 0 */
+                        cfs_waitq_broadcast(&svc->srv_waitq);
                 spin_unlock(&svc->srv_lock);
 
                 class_export_put (exp);
@@ -1583,7 +1592,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
         struct l_wait_info lwi = { 0 };
+        ENTRY;
 
+        CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
         spin_lock(&svc->srv_lock);
         thread->t_flags = SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
@@ -1597,11 +1608,13 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         spin_unlock(&svc->srv_lock);
 
         OBD_FREE_PTR(thread);
+        EXIT;
 }
 
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
         struct ptlrpc_thread *thread;
+        ENTRY;
 
         spin_lock(&svc->srv_lock);
         while (!list_empty(&svc->srv_threads)) {
@@ -1614,6 +1627,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         }
 
         spin_unlock(&svc->srv_lock);
+        EXIT;
 }
 
 int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
@@ -1708,7 +1722,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         struct l_wait_info    lwi;
         struct list_head     *tmp;
         struct ptlrpc_reply_state *rs, *t;
+        ENTRY;
 
+        service->srv_is_stopping = 1;
         cfs_timer_disarm(&service->srv_at_timer);
 
         ptlrpc_stop_all_threads(service);
@@ -1838,7 +1854,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_timer_disarm(&service->srv_at_timer);
 
         OBD_FREE_PTR(service);
-        return 0;
+        RETURN(0);
 }
 
 /* Returns 0 if the service is healthy.