[fs/lustre-release.git] lustre/ptlrpc/service.c
index fa23b91..37d7bf9 100644
@@ -169,7 +169,7 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
 
 void
 ptlrpc_save_lock (struct ptlrpc_request *req,
-                  struct lustre_handle *lock, int mode)
+                  struct lustre_handle *lock, int mode, int no_ack)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
         int                        idx;
@@ -181,12 +181,14 @@ ptlrpc_save_lock (struct ptlrpc_request *req,
         rs->rs_locks[idx] = *lock;
         rs->rs_modes[idx] = mode;
         rs->rs_difficult = 1;
+        rs->rs_no_ack = !!no_ack;
 }
 
 void
 ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         struct ptlrpc_service *svc = rs->rs_service;
+        ENTRY;
 
 #ifdef CONFIG_SMP
         LASSERT (spin_is_locked (&svc->srv_lock));
@@ -194,13 +196,16 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
         LASSERT (rs->rs_difficult);
         rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
 
-        if (rs->rs_scheduled)                   /* being set up or already notified */
+        if (rs->rs_scheduled) {                  /* being set up or already notified */
+                EXIT;
                 return;
+        }
 
         rs->rs_scheduled = 1;
         list_del (&rs->rs_list);
         list_add (&rs->rs_list, &svc->srv_reply_queue);
         cfs_waitq_signal (&svc->srv_waitq);
+        EXIT;
 }
 
 void
@@ -208,6 +213,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
 {
         struct list_head   *tmp;
         struct list_head   *nxt;
+        ENTRY;
 
         /* Find any replies that have been committed and get their service
          * to attend to complete them. */
@@ -232,6 +238,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
         }
 
         spin_unlock(&obd->obd_uncommitted_replies_lock);
+        EXIT;
 }
 
 static int
@@ -400,41 +407,34 @@ failed:
         return NULL;
 }
 
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * Actually free the request; must be called without holding svc_lock.
+ * Note it is the caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 {
-        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+        LASSERT(atomic_read(&req->rq_refcount) == 0);
+        LASSERT(list_empty(&req->rq_timed_list));
 
-        if (!atomic_dec_and_test(&req->rq_refcount))
-                return;
+        /* DEBUG_REQ() assumes the reply state of a request with a valid
+         * ref will not be destroyed until that reference is dropped. */
+        ptlrpc_req_drop_rs(req);
 
         sptlrpc_svc_ctx_decref(req);
 
-        LASSERT(list_empty(&req->rq_timed_list));
-        if (req != &rqbd->rqbd_req) {
+        if (req != &req->rq_rqbd->rqbd_req) {
                 /* NB request buffers use an embedded
                  * req if the incoming req unlinked the
                  * MD; this isn't one of them! */
                 OBD_FREE(req, sizeof(*req));
-        } else {
-                struct ptlrpc_service *svc = rqbd->rqbd_service;
-                /* schedule request buffer for re-use.
-                 * NB I can only do this after I've disposed of their
-                 * reqs; particularly the embedded req */
-                spin_lock(&svc->srv_lock);
-                list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
-                spin_unlock(&svc->srv_lock);
         }
 }
 
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
-        list_del(&req->rq_list);
-        ptlrpc_req_drop_rs(req);
-        ptlrpc_server_req_decref(req);
-}
-
-static void
-ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * Drop a reference count on the request. If it reaches 0, we either
+ * put it on the history list or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service             *svc = rqbd->rqbd_service;
@@ -442,12 +442,8 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
         struct list_head                  *tmp;
         struct list_head                  *nxt;
 
-        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
-                DEBUG_REQ(D_INFO, req, "free req");
-        spin_lock(&svc->srv_at_lock);
-        req->rq_sent_final = 1;
-        list_del_init(&req->rq_timed_list);
-        spin_unlock(&svc->srv_at_lock);
+        if (!atomic_dec_and_test(&req->rq_refcount))
+                return;
 
         spin_lock(&svc->srv_lock);
 
@@ -490,20 +486,49 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
                                 req = list_entry(rqbd->rqbd_reqs.next,
                                                  struct ptlrpc_request,
                                                  rq_list);
-                                __ptlrpc_server_free_request(req);
+                                list_del(&req->rq_list);
+                                ptlrpc_server_free_request(req);
                         }
 
                         spin_lock(&svc->srv_lock);
+                        /*
+                         * now all reqs, including the embedded req, have been
+                         * disposed of; schedule the request buffer for re-use.
+                         */
+                        LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+                        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
                 }
+
+                spin_unlock(&svc->srv_lock);
         } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
-                 /* If we are low on memory, we are not interested in
-                    history */
-                list_del(&req->rq_history_list);
-                __ptlrpc_server_free_request(req);
+                /* If we are low on memory, we are not interested in history */
+                list_del(&req->rq_list);
+                list_del_init(&req->rq_history_list);
+                spin_unlock(&svc->srv_lock);
+
+                ptlrpc_server_free_request(req);
+        } else {
+                spin_unlock(&svc->srv_lock);
         }
+}
 
-        spin_unlock(&svc->srv_lock);
+/**
+ * Finish a request: stop sending more early replies, and release
+ * the request. Should be called after we have finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
+
+        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+                DEBUG_REQ(D_INFO, req, "free req");
 
+        spin_lock(&svc->srv_at_lock);
+        req->rq_sent_final = 1;
+        list_del_init(&req->rq_timed_list);
+        spin_unlock(&svc->srv_at_lock);
+
+        ptlrpc_server_drop_request(req);
 }
 
 /* This function makes sure dead exports are evicted in a timely manner.
@@ -585,26 +610,6 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         EXIT;
 }
 
-#ifndef __KERNEL__
-int lu_context_init(struct lu_context *ctx, __u32 tags)
-{
-        return 0;
-}
-
-void lu_context_fini(struct lu_context *ctx)
-{
-}
-
-void lu_context_enter(struct lu_context *ctx)
-{
-}
-
-void lu_context_exit(struct lu_context *ctx)
-{
-}
-
-#endif
-
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
@@ -644,7 +649,7 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
         /* Set timer for closest deadline */
         rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
                         rq_timed_list);
-        next = (__s32)(rq->rq_deadline - cfs_time_current_sec() - 
+        next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
                        at_early_margin);
         if (next <= 0)
                 ptlrpc_at_timer((unsigned long)svc);
@@ -670,9 +675,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
                 return(-ENOSYS);
 
-        DEBUG_REQ(D_ADAPTTO, req, "add timed %lds",
-                  req->rq_deadline - cfs_time_current_sec());
-
         spin_lock(&svc->srv_at_lock);
 
         if (unlikely(req->rq_sent_final)) {
@@ -684,7 +686,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         /* Add to sorted list.  Presumably latest rpcs will have the latest
            deadlines, so search backward. */
         list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
-                if (req->rq_deadline > rq->rq_deadline) {
+                if (req->rq_deadline >= rq->rq_deadline) {
                         list_add(&req->rq_timed_list, &rq->rq_timed_list);
                         found++;
                         break;
@@ -711,7 +713,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         struct ptlrpc_request *reqcopy;
         struct lustre_msg *reqmsg;
-        long olddl = req->rq_deadline - cfs_time_current_sec();
+        cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
         time_t newdl;
         int rc;
         ENTRY;
@@ -728,34 +730,43 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 RETURN(0);
 
         if (olddl < 0) {
-                CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not"
-                       " sending early reply. Increase at_early_margin (%d)?\n",
-                       req->rq_xid, olddl, at_early_margin);
+                DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
+                          "not sending early reply. Consider increasing "
+                          "at_early_margin (%d)?", olddl, at_early_margin);
+
                 /* Return an error so we're not re-added to the timed list. */
                 RETURN(-ETIMEDOUT);
         }
 
         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
-                CDEBUG(D_INFO, "Wanted to ask client for more time, but no AT "
-                      "support\n");
+                DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
+                          "but no AT support");
                 RETURN(-ENOSYS);
         }
 
-        if (extra_time) {
-                /* Fake our processing time into the future to ask the
-                   clients for some extra amount of time */
-                extra_time += cfs_time_current_sec() -
-                        req->rq_arrival_time.tv_sec;
-                at_add(&svc->srv_at_estimate, extra_time);
+        if (req->rq_export && req->rq_export->exp_in_recovery) {
+                /* don't increase server estimates during recovery, and give
+                   clients the full recovery time. */
+                newdl = cfs_time_current_sec() +
+                        req->rq_export->exp_obd->obd_recovery_timeout;
+        } else {
+                if (extra_time) {
+                        /* Fake our processing time into the future to ask the
+                           clients for some extra amount of time */
+                        extra_time += cfs_time_current_sec() -
+                                req->rq_arrival_time.tv_sec;
+                        at_add(&svc->srv_at_estimate, extra_time);
+                }
+                newdl = req->rq_arrival_time.tv_sec +
+                        at_get(&svc->srv_at_estimate);
         }
-
-        newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
         if (req->rq_deadline >= newdl) {
                 /* We're not adding any time, no need to send an early reply
                    (e.g. maybe at adaptive_max) */
-                CDEBUG(D_ADAPTTO, "x"LPU64": Couldn't add any time (%ld/%ld), "
-                       "not sending early reply\n", req->rq_xid, olddl,
-                       newdl - cfs_time_current_sec());
+                DEBUG_REQ(D_WARNING, req, "Couldn't add any time ("
+                          CFS_DURATION_T"/"CFS_DURATION_T"), "
+                          "not sending early reply\n", olddl,
+                          cfs_time_sub(newdl, cfs_time_current_sec()));
                 RETURN(-ETIMEDOUT);
         }
 
@@ -780,8 +791,8 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
         if (req->rq_sent_final) {
-                CDEBUG(D_ADAPTTO, "x"LPU64": normal reply already sent out, "
-                       "abort sending early reply\n", req->rq_xid);
+                DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
+                          "abort sending early reply\n");
                 GOTO(out, rc = 0);
         }
 
@@ -890,8 +901,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                       at_get(&svc->srv_at_estimate), delay);
         }
 
-        /* ptlrpc_server_free_request may delete an entry out of the work
-           list */
+        /* ptlrpc_server_finish_request may delete an entry out of
+         * the work list */
         spin_lock(&svc->srv_at_lock);
         while (!list_empty(&work_list)) {
                 rq = list_entry(work_list.next, struct ptlrpc_request,
@@ -905,7 +916,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
                         ptlrpc_at_add_timed(rq);
 
-                ptlrpc_server_req_decref(rq);
+                ptlrpc_server_drop_request(rq);
                 spin_lock(&svc->srv_at_lock);
         }
         spin_unlock(&svc->srv_at_lock);
@@ -1000,8 +1011,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
 
         /* req_in handling should/must be fast */
         if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
-                DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
-                          cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
+                DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
+                          cfs_time_sub(cfs_time_current_sec(),
+                                       req->rq_arrival_time.tv_sec));
 
         /* Set rpc server deadline and add it to the timed list */
         deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
@@ -1028,7 +1040,7 @@ err_req:
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
         spin_unlock(&svc->srv_lock);
-        ptlrpc_server_free_request(req);
+        ptlrpc_server_finish_request(req);
 
         RETURN(1);
 }
@@ -1071,6 +1083,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         spin_unlock(&svc->srv_lock);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+                libcfs_debug_dumplog();
+
         do_gettimeofday(&work_start);
         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
         if (likely(svc->srv_stats != NULL)) {
@@ -1092,7 +1107,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         request->rq_session.lc_thread = thread;
         lu_context_enter(&request->rq_session);
 
-        CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
+        CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
 
         request->rq_svc_thread = thread;
         if (thread)
@@ -1112,11 +1127,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
            The deadline is increased if we send an early reply. */
         if (cfs_time_current_sec() > request->rq_deadline) {
                 DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
-                          ": deadline %ld%+lds ago\n",
+                          ": deadline "CFS_DURATION_T":"CFS_DURATION_T"s ago\n",
                           libcfs_id2str(request->rq_peer),
-                          request->rq_deadline -
-                          request->rq_arrival_time.tv_sec,
-                          cfs_time_current_sec() - request->rq_deadline);
+                          cfs_time_sub(request->rq_deadline,
+                          request->rq_arrival_time.tv_sec),
+                          cfs_time_sub(cfs_time_current_sec(),
+                          request->rq_deadline));
                 goto put_rpc_export;
         }
 
@@ -1160,10 +1176,12 @@ put_conn:
 
         if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
                 DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
-                          "than estimated (%ld%+lds); client may timeout.",
-                          request->rq_xid, request->rq_deadline -
-                          request->rq_arrival_time.tv_sec,
-                          cfs_time_current_sec() - request->rq_deadline);
+                          "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
+                          " client may timeout.",
+                          request->rq_xid, cfs_time_sub(request->rq_deadline,
+                          request->rq_arrival_time.tv_sec),
+                          cfs_time_sub(cfs_time_current_sec(),
+                          request->rq_deadline));
         }
 
         do_gettimeofday(&work_end);
@@ -1189,13 +1207,15 @@ put_conn:
         }
         if (unlikely(request->rq_early_count)) {
                 DEBUG_REQ(D_ADAPTTO, request,
-                          "sent %d early replies before finishing in %lds",
+                          "sent %d early replies before finishing in "
+                          CFS_DURATION_T"s",
                           request->rq_early_count,
-                          work_end.tv_sec - request->rq_arrival_time.tv_sec);
+                          cfs_time_sub(work_end.tv_sec,
+                          request->rq_arrival_time.tv_sec));
         }
 
 out_req:
-        ptlrpc_server_free_request(request);
+        ptlrpc_server_finish_request(request);
 
         RETURN(1);
 }
@@ -1280,6 +1300,11 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
         if (!rs->rs_on_net) {
                 /* Off the net */
                 svc->srv_n_difficult_replies--;
+                if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+                        /* wake up threads that are being stopped by
+                           ptlrpc_unregister_service/ptlrpc_stop_threads
+                           and sleep waiting for srv_n_difficult_replies == 0 */
+                        cfs_waitq_broadcast(&svc->srv_waitq);
                 spin_unlock(&svc->srv_lock);
 
                 class_export_put (exp);
@@ -1567,7 +1592,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
         struct l_wait_info lwi = { 0 };
+        ENTRY;
 
+        CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
         spin_lock(&svc->srv_lock);
         thread->t_flags = SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
@@ -1581,11 +1608,13 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         spin_unlock(&svc->srv_lock);
 
         OBD_FREE_PTR(thread);
+        EXIT;
 }
 
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
         struct ptlrpc_thread *thread;
+        ENTRY;
 
         spin_lock(&svc->srv_lock);
         while (!list_empty(&svc->srv_threads)) {
@@ -1598,6 +1627,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         }
 
         spin_unlock(&svc->srv_lock);
+        EXIT;
 }
 
 int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
@@ -1614,7 +1644,7 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
                 if (rc == -EMFILE)
                         break;
                 if (rc) {
-                        CERROR("cannot start %s thread #%d: rc %d\n", 
+                        CERROR("cannot start %s thread #%d: rc %d\n",
                                svc->srv_thread_name, i, rc);
                         ptlrpc_stop_all_threads(svc);
                 }
@@ -1662,7 +1692,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         d.thread = thread;
 
         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-        
+
           /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in ptlrpc_daemonize() right away.
          */
@@ -1692,7 +1722,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         struct l_wait_info    lwi;
         struct list_head     *tmp;
         struct ptlrpc_reply_state *rs, *t;
+        ENTRY;
 
+        service->srv_is_stopping = 1;
         cfs_timer_disarm(&service->srv_at_timer);
 
         ptlrpc_stop_all_threads(service);
@@ -1767,7 +1799,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
         while (!list_empty(&service->srv_request_queue)) {
                 struct ptlrpc_request *req =
@@ -1779,7 +1811,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
 
-                ptlrpc_server_free_request(req);
+                ptlrpc_server_finish_request(req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
         LASSERT(service->srv_n_active_reqs == 0);
@@ -1822,7 +1854,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_timer_disarm(&service->srv_at_timer);
 
         OBD_FREE_PTR(service);
-        return 0;
+        RETURN(0);
 }
 
 /* Returns 0 if the service is healthy.