Whamcloud - gitweb
LU-1820 ptlrpc: fix put export for hp request
authorAlexander.Boyko <alexander_boyko@xyratex.com>
Sat, 30 Mar 2013 07:43:02 +0000 (11:43 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 22 Apr 2013 15:52:10 +0000 (11:52 -0400)
Fix for ASSERTION(cfs_list_empty(&exp->exp_hp_rpcs)) failed.

The root cause is in target_handle_connect()
if (req->rq_export != NULL)
class_export_put(req->rq_export);

/* request takes one export refcount */
req->rq_export = class_export_get(export);

For previous export it release export reference, but
rq_exp_list still in exp_hp_rpcs queue, after connect
requests became hp requsts. ptltpc_server_handle_req put last
reference for export and export go to zombie list with non
empty exp_hp_rpcs.
This patch add rq_exp_list move to new export at
target_handle_connect(), and cleanup for
id I3d312c28481143b557d7987501c975c7e287885e.

Fixed export reference for request.  Before this patch, request
take one reference by class_conn2export() and take another reference
and increment export rpc counter by class_export_rpc_get(). One
export reference for request is enough.

Signed-off-by: Alexander Boyko <alexander_boyko@xyratex.com>
Xyratex-bug-id: MRP-881
Change-Id: I6da198fe9b50e85b09f8fe74789e6c6f5bfd534d
Reviewed-on: http://review.whamcloud.com/5922
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_net.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/ptlrpc/service.c

index 22deb5e..c1bfedb 100644 (file)
@@ -2904,6 +2904,8 @@ int liblustre_check_services(void *arg);
 void ptlrpc_daemonize(char *name);
 int ptlrpc_service_health_check(struct ptlrpc_service *);
 void ptlrpc_server_drop_request(struct ptlrpc_request *req);
+void ptlrpc_request_change_export(struct ptlrpc_request *req,
+                                 struct obd_export *export);
 
 #ifdef __KERNEL__
 int ptlrpc_hr_init(void);
index 2e7fd30..574a8d8 100644 (file)
@@ -230,21 +230,19 @@ extern void (*class_export_dump_hook)(struct obd_export *);
 
 #endif
 
-#define class_export_rpc_get(exp)                                       \
+#define class_export_rpc_inc(exp)                                       \
 ({                                                                      \
         cfs_atomic_inc(&(exp)->exp_rpc_count);                          \
         CDEBUG(D_INFO, "RPC GETting export %p : new rpc_count %d\n",    \
                (exp), cfs_atomic_read(&(exp)->exp_rpc_count));          \
-        class_export_get(exp);                                          \
 })
 
-#define class_export_rpc_put(exp)                                       \
+#define class_export_rpc_dec(exp)                                       \
 ({                                                                      \
         LASSERT_ATOMIC_POS(&exp->exp_rpc_count);                        \
         cfs_atomic_dec(&(exp)->exp_rpc_count);                          \
         CDEBUG(D_INFO, "RPC PUTting export %p : new rpc_count %d\n",    \
                (exp), cfs_atomic_read(&(exp)->exp_rpc_count));          \
-        class_export_put(exp);                                          \
 })
 
 #define class_export_lock_get(exp, lock)                                \
index 2e3a1d5..d89a90f 100644 (file)
@@ -1113,11 +1113,7 @@ dont_check_exports:
          * XXX this will go away when shaver stops sending the "connect" handle
          * in the real "remote handle" field of the request --phik 24 Apr 2003
          */
-        if (req->rq_export != NULL)
-                class_export_put(req->rq_export);
-
-       /* Request takes one export reference. */
-       req->rq_export = class_export_get(export);
+       ptlrpc_request_change_export(req, export);
 
        spin_lock(&export->exp_lock);
        if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
@@ -1323,7 +1319,7 @@ EXPORT_SYMBOL(target_destroy_export);
  */
 static void target_request_copy_get(struct ptlrpc_request *req)
 {
-        class_export_rpc_get(req->rq_export);
+       class_export_rpc_inc(req->rq_export);
         LASSERT(cfs_list_empty(&req->rq_list));
         CFS_INIT_LIST_HEAD(&req->rq_replay_list);
 
@@ -1339,7 +1335,7 @@ static void target_request_copy_put(struct ptlrpc_request *req)
         LASSERT_ATOMIC_POS(&req->rq_export->exp_replay_count);
 
         cfs_atomic_dec(&req->rq_export->exp_replay_count);
-        class_export_rpc_put(req->rq_export);
+       class_export_rpc_dec(req->rq_export);
         ptlrpc_server_drop_request(req);
 }
 
index 63fee6d..65ffcaa 100644 (file)
@@ -984,15 +984,55 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        }
 }
 
+/** Change request export and move hp request from old export to new */
+void ptlrpc_request_change_export(struct ptlrpc_request *req,
+                                 struct obd_export *export)
+{
+       if (req->rq_export != NULL) {
+               if (!cfs_list_empty(&req->rq_exp_list)) {
+                       /* remove rq_exp_list from last export */
+                       spin_lock_bh(&req->rq_export->exp_rpc_lock);
+                       cfs_list_del_init(&req->rq_exp_list);
+                       spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+
+                       /* export has one reference already, so it`s safe to
+                        * add req to export queue here and get another
+                        * reference for request later */
+                       spin_lock_bh(&export->exp_rpc_lock);
+                       cfs_list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+                       spin_unlock_bh(&export->exp_rpc_lock);
+               }
+               class_export_rpc_dec(req->rq_export);
+               class_export_put(req->rq_export);
+       }
+
+       /* request takes one export refcount */
+       req->rq_export = class_export_get(export);
+       class_export_rpc_inc(export);
+
+       return;
+}
+
 /**
  * to finish a request: stop sending more early replies, and release
- * the request. should be called after we finished handling the request.
+ * the request.
  */
 static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
                                         struct ptlrpc_request *req)
 {
        ptlrpc_server_hpreq_fini(req);
 
+       ptlrpc_server_drop_request(req);
+}
+
+/**
+ * to finish a active request: stop sending more early replies, and release
+ * the request. should be called after we finished handling the request.
+ */
+static void ptlrpc_server_finish_active_request(
+                                       struct ptlrpc_service_part *svcpt,
+                                       struct ptlrpc_request *req)
+{
        spin_lock(&svcpt->scp_req_lock);
        ptlrpc_nrs_req_stop_nolock(req);
        svcpt->scp_nreqs_active--;
@@ -1002,7 +1042,10 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
 
        ptlrpc_nrs_req_finalize(req);
 
-       ptlrpc_server_drop_request(req);
+       if (req->rq_export != NULL)
+               class_export_rpc_dec(req->rq_export);
+
+       ptlrpc_server_finish_request(svcpt, req);
 }
 
 /**
@@ -1334,7 +1377,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
                 GOTO(out, rc = -ENODEV);
 
         /* RPC ref */
-        class_export_rpc_get(reqcopy->rq_export);
+       class_export_rpc_inc(reqcopy->rq_export);
         if (reqcopy->rq_export->exp_obd &&
             reqcopy->rq_export->exp_obd->obd_fail)
                 GOTO(out_put, rc = -ENODEV);
@@ -1358,7 +1401,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
         ptlrpc_req_drop_rs(reqcopy);
 
 out_put:
-        class_export_rpc_put(reqcopy->rq_export);
+       class_export_rpc_dec(reqcopy->rq_export);
         class_export_put(reqcopy->rq_export);
 out:
         sptlrpc_svc_ctx_decref(reqcopy);
@@ -1684,14 +1727,23 @@ ptlrpc_server_request_pending(struct ptlrpc_service_part *svcpt, bool force)
 static struct ptlrpc_request *
 ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
 {
-       struct ptlrpc_request *req;
+       struct ptlrpc_request *req = NULL;
        ENTRY;
 
+       spin_lock(&svcpt->scp_req_lock);
+#ifndef __KERNEL__
+       /* !@%$# liblustre only has 1 thread */
+       if (cfs_atomic_read(&svcpt->scp_nreps_difficult) != 0) {
+               spin_unlock(&svcpt->scp_req_lock);
+               RETURN(NULL);
+       }
+#endif
+
        if (ptlrpc_server_high_pending(svcpt, force)) {
                req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
                if (req != NULL) {
                        svcpt->scp_hreq_count++;
-                       RETURN(req);
+                       goto got_request;
                }
        }
 
@@ -1699,10 +1751,24 @@ ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
                req = ptlrpc_nrs_req_get_nolock(svcpt, false, force);
                if (req != NULL) {
                        svcpt->scp_hreq_count = 0;
-                       RETURN(req);
+                       goto got_request;
                }
        }
+
+       spin_unlock(&svcpt->scp_req_lock);
        RETURN(NULL);
+
+got_request:
+       svcpt->scp_nreqs_active++;
+       if (req->rq_hp)
+               svcpt->scp_nhreqs_active++;
+
+       spin_unlock(&svcpt->scp_req_lock);
+
+       if (likely(req->rq_export))
+               class_export_rpc_inc(req->rq_export);
+
+       RETURN(req);
 }
 
 /**
@@ -1802,7 +1868,6 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt)
         req->rq_export = class_conn2export(
                 lustre_msg_get_handle(req->rq_reqmsg));
         if (req->rq_export) {
-               class_export_rpc_get(req->rq_export);
                 rc = ptlrpc_check_req(req);
                 if (rc == 0) {
                         rc = sptlrpc_target_export_check(req->rq_export, req);
@@ -1837,19 +1902,13 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt)
 
         /* Move it over to the request processing queue */
        rc = ptlrpc_server_request_add(svcpt, req);
-       if (rc) {
-               ptlrpc_server_hpreq_fini(req);
+       if (rc)
                GOTO(err_req, rc);
-       }
+
        cfs_waitq_signal(&svcpt->scp_waitq);
        RETURN(1);
 
 err_req:
-       if (req->rq_export)
-               class_export_rpc_put(req->rq_export);
-       spin_lock(&svcpt->scp_req_lock);
-       svcpt->scp_nreqs_active++;
-       spin_unlock(&svcpt->scp_req_lock);
        ptlrpc_server_finish_request(svcpt, req);
 
        RETURN(1);
@@ -1864,7 +1923,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                             struct ptlrpc_thread *thread)
 {
        struct ptlrpc_service *svc = svcpt->scp_service;
-        struct obd_export     *export = NULL;
         struct ptlrpc_request *request;
         struct timeval         work_start;
         struct timeval         work_end;
@@ -1873,19 +1931,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
         int                    fail_opc = 0;
         ENTRY;
 
-       spin_lock(&svcpt->scp_req_lock);
-#ifndef __KERNEL__
-       /* !@%$# liblustre only has 1 thread */
-       if (cfs_atomic_read(&svcpt->scp_nreps_difficult) != 0) {
-               spin_unlock(&svcpt->scp_req_lock);
-               RETURN(0);
-       }
-#endif
        request = ptlrpc_server_request_get(svcpt, false);
-       if (request == NULL) {
-               spin_unlock(&svcpt->scp_req_lock);
-                RETURN(0);
-        }
+       if (request == NULL)
+               RETURN(0);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
                 fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
@@ -1893,19 +1941,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                 fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
 
         if (unlikely(fail_opc)) {
-                if (request->rq_export && request->rq_ops) {
-                       spin_unlock(&svcpt->scp_req_lock);
-
+               if (request->rq_export && request->rq_ops)
                        OBD_FAIL_TIMEOUT(fail_opc, 4);
-
-                       spin_lock(&svcpt->scp_req_lock);
-               }
        }
-       svcpt->scp_nreqs_active++;
-       if (request->rq_hp)
-               svcpt->scp_nhreqs_active++;
-
-       spin_unlock(&svcpt->scp_req_lock);
 
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
 
@@ -1925,7 +1963,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                                    at_get(&svcpt->scp_at_estimate));
         }
 
-       export = request->rq_export;
        rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
         if (rc) {
                 CERROR("Failure to initialize session: %d\n", rc);
@@ -2033,9 +2070,7 @@ put_conn:
         }
 
 out_req:
-       if (export != NULL)
-               class_export_rpc_put(export);
-       ptlrpc_server_finish_request(svcpt, request);
+       ptlrpc_server_finish_active_request(svcpt, request);
 
        RETURN(1);
 }
@@ -3038,18 +3073,12 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
 
                        cfs_list_del(&req->rq_list);
                        svcpt->scp_nreqs_incoming--;
-                       svcpt->scp_nreqs_active++;
                        ptlrpc_server_finish_request(svcpt, req);
                }
 
                while (ptlrpc_server_request_pending(svcpt, true)) {
                        req = ptlrpc_server_request_get(svcpt, true);
-                       svcpt->scp_nreqs_active++;
-                       ptlrpc_server_hpreq_fini(req);
-
-                       if (req->rq_export != NULL)
-                               class_export_rpc_put(req->rq_export);
-                       ptlrpc_server_finish_request(svcpt, req);
+                       ptlrpc_server_finish_active_request(svcpt, req);
                }
 
                LASSERT(cfs_list_empty(&svcpt->scp_rqbd_posted));