Whamcloud - gitweb
LU-793 ptlrpc: fix ptlrpc_request_change_export()
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 15e4802..ca2aa01 100644 (file)
@@ -710,7 +710,7 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc,
  */
 struct ptlrpc_service *
 ptlrpc_register_service(struct ptlrpc_service_conf *conf,
-                       cfs_proc_dir_entry_t *proc_entry)
+                       struct proc_dir_entry *proc_entry)
 {
        struct ptlrpc_service_cpt_conf  *cconf = &conf->psc_cpt;
        struct ptlrpc_service           *service;
@@ -989,19 +989,21 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
                                  struct obd_export *export)
 {
        if (req->rq_export != NULL) {
-               if (!cfs_list_empty(&req->rq_exp_list)) {
-                       /* remove rq_exp_list from last export */
-                       spin_lock_bh(&req->rq_export->exp_rpc_lock);
-                       cfs_list_del_init(&req->rq_exp_list);
-                       spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+               LASSERT(!list_empty(&req->rq_exp_list));
+               /* remove rq_exp_list from last export */
+               spin_lock_bh(&req->rq_export->exp_rpc_lock);
+               list_del_init(&req->rq_exp_list);
+               spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+               /* export has one reference already, so it`s safe to
+                * add req to export queue here and get another
+                * reference for request later */
+               spin_lock_bh(&export->exp_rpc_lock);
+               if (req->rq_ops != NULL) /* hp request */
+                       list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+               else
+                       list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+               spin_unlock_bh(&export->exp_rpc_lock);
 
-                       /* export has one reference already, so it`s safe to
-                        * add req to export queue here and get another
-                        * reference for request later */
-                       spin_lock_bh(&export->exp_rpc_lock);
-                       cfs_list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
-                       spin_unlock_bh(&export->exp_rpc_lock);
-               }
                class_export_rpc_dec(req->rq_export);
                class_export_put(req->rq_export);
        }
@@ -1022,6 +1024,11 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
 {
        ptlrpc_server_hpreq_fini(req);
 
+       if (req->rq_session.lc_thread != NULL) {
+               lu_context_exit(&req->rq_session);
+               lu_context_fini(&req->rq_session);
+       }
+
        ptlrpc_server_drop_request(req);
 }
 
@@ -1515,6 +1522,45 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
        RETURN(1); /* return "did_something" for liblustre */
 }
 
+/* Check if we are already handling earlier incarnation of this request.
+ * Called under &req->rq_export->exp_rpc_lock locked */
+static int ptlrpc_server_check_resend_in_progress(struct ptlrpc_request *req)
+{
+       struct ptlrpc_request   *tmp = NULL;
+
+       if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ||
+           (cfs_atomic_read(&req->rq_export->exp_rpc_count) == 0))
+               return 0;
+
+       /* bulk request are aborted upon reconnect, don't try to
+        * find a match */
+       if (req->rq_bulk_write || req->rq_bulk_read)
+               return 0;
+
+       /* This list should not be longer than max_requests in
+        * flights on the client, so it is not all that long.
+        * Also we only hit this codepath in case of a resent
+        * request which makes it even more rarely hit */
+       cfs_list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs,
+                               rq_exp_list) {
+               /* Found duplicate one */
+               if (tmp->rq_xid == req->rq_xid)
+                       goto found;
+       }
+       cfs_list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs,
+                               rq_exp_list) {
+               /* Found duplicate one */
+               if (tmp->rq_xid == req->rq_xid)
+                       goto found;
+       }
+       return 0;
+
+found:
+       DEBUG_REQ(D_HA, req, "Found duplicate req in processing\n");
+       DEBUG_REQ(D_HA, tmp, "Request being processed\n");
+       return -EBUSY;
+}
+
 /**
  * Put the request to the export list if the request may become
  * a high priority one.
@@ -1522,7 +1568,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
 static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                                    struct ptlrpc_request *req)
 {
-       int rc = 0;
+       cfs_list_t      *list;
+       int              rc, hp = 0;
+
        ENTRY;
 
        if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
@@ -1531,48 +1579,61 @@ static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                        RETURN(rc);
                LASSERT(rc == 0);
        }
-       if (req->rq_export && req->rq_ops) {
-               /* Perform request specific check. We should do this check
-                * before the request is added into exp_hp_rpcs list otherwise
-                * it may hit swab race at LU-1044. */
-               if (req->rq_ops->hpreq_check) {
-                       rc = req->rq_ops->hpreq_check(req);
-                       /**
-                        * XXX: Out of all current
-                        * ptlrpc_hpreq_ops::hpreq_check(), only
-                        * ldlm_cancel_hpreq_check() can return an error code;
-                        * other functions assert in similar places, which seems
-                        * odd. What also does not seem right is that handlers
-                        * for those RPCs do not assert on the same checks, but
-                        * rather handle the error cases. e.g. see
-                        * ost_rw_hpreq_check(), and ost_brw_read(),
-                        * ost_brw_write().
-                        */
-                       if (rc < 0)
-                               RETURN(rc);
-                       LASSERT(rc == 0 || rc == 1);
+       if (req->rq_export) {
+               if (req->rq_ops) {
+                       /* Perform request specific check. We should do this
+                        * check before the request is added into exp_hp_rpcs
+                        * list otherwise it may hit swab race at LU-1044. */
+                       if (req->rq_ops->hpreq_check) {
+                               rc = req->rq_ops->hpreq_check(req);
+                               /**
+                                * XXX: Out of all current
+                                * ptlrpc_hpreq_ops::hpreq_check(), only
+                                * ldlm_cancel_hpreq_check() can return an
+                                * error code; other functions assert in
+                                * similar places, which seems odd.
+                                * What also does not seem right is that
+                                * handlers for those RPCs do not assert
+                                * on the same checks, but rather handle the
+                                * error cases. e.g. see ost_rw_hpreq_check(),
+                                * and ost_brw_read(), ost_brw_write().
+                                */
+                               if (rc < 0)
+                                       RETURN(rc);
+                               LASSERT(rc == 0 || rc == 1);
+                               hp = rc;
+                       }
+                       list = &req->rq_export->exp_hp_rpcs;
+               } else {
+                       list = &req->rq_export->exp_reg_rpcs;
                }
 
+               /* do search for duplicated xid and the adding to the list
+                * atomically */
                spin_lock_bh(&req->rq_export->exp_rpc_lock);
-               cfs_list_add(&req->rq_exp_list,
-                            &req->rq_export->exp_hp_rpcs);
+               rc = ptlrpc_server_check_resend_in_progress(req);
+               if (rc < 0) {
+                       spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+                       RETURN(rc);
+               }
+               cfs_list_add(&req->rq_exp_list, list);
                spin_unlock_bh(&req->rq_export->exp_rpc_lock);
        }
 
-       ptlrpc_nrs_req_initialize(svcpt, req, rc);
+       ptlrpc_nrs_req_initialize(svcpt, req, !!hp);
 
-       RETURN(rc);
+       RETURN(hp);
 }
 
 /** Remove the request from the export list. */
 static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
 {
-        ENTRY;
-        if (req->rq_export && req->rq_ops) {
-                /* refresh lock timeout again so that client has more
-                 * room to send lock cancel RPC. */
-                if (req->rq_ops->hpreq_fini)
-                        req->rq_ops->hpreq_fini(req);
+       ENTRY;
+       if (req->rq_export) {
+               /* refresh lock timeout again so that client has more
+                * room to send lock cancel RPC. */
+               if (req->rq_ops && req->rq_ops->hpreq_fini)
+                       req->rq_ops->hpreq_fini(req);
 
                spin_lock_bh(&req->rq_export->exp_rpc_lock);
                cfs_list_del_init(&req->rq_exp_list);
@@ -1898,11 +1959,25 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                 goto err_req;
         }
 
-        req->rq_svc_thread = thread;
+       req->rq_svc_thread = thread;
+       if (thread != NULL) {
+               /* initialize request session, it is needed for request
+                * processing by target */
+               rc = lu_context_init(&req->rq_session, LCT_SERVER_SESSION |
+                                                      LCT_NOREF);
+               if (rc) {
+                       CERROR("%s: failure to initialize session: rc = %d\n",
+                              thread->t_name, rc);
+                       goto err_req;
+               }
+               req->rq_session.lc_thread = thread;
+               lu_context_enter(&req->rq_session);
+               req->rq_svc_thread->t_env->le_ses = &req->rq_session;
+       }
 
-        ptlrpc_at_add_timed(req);
+       ptlrpc_at_add_timed(req);
 
-        /* Move it over to the request processing queue */
+       /* Move it over to the request processing queue */
        rc = ptlrpc_server_request_add(svcpt, req);
        if (rc)
                GOTO(err_req, rc);
@@ -1924,14 +1999,14 @@ static int
 ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                             struct ptlrpc_thread *thread)
 {
-       struct ptlrpc_service *svc = svcpt->scp_service;
-        struct ptlrpc_request *request;
-        struct timeval         work_start;
-        struct timeval         work_end;
-        long                   timediff;
-        int                    rc;
-        int                    fail_opc = 0;
-        ENTRY;
+       struct ptlrpc_service   *svc = svcpt->scp_service;
+       struct ptlrpc_request   *request;
+       struct timeval           work_start;
+       struct timeval           work_end;
+       long                     timediff;
+       int                      fail_opc = 0;
+
+       ENTRY;
 
        request = ptlrpc_server_request_get(svcpt, false);
        if (request == NULL)
@@ -1965,23 +2040,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                                    at_get(&svcpt->scp_at_estimate));
         }
 
-       rc = lu_context_init(&request->rq_session, LCT_SERVER_SESSION |
-                                                  LCT_NOREF);
-        if (rc) {
-                CERROR("Failure to initialize session: %d\n", rc);
-                goto out_req;
-        }
-        request->rq_session.lc_thread = thread;
-        request->rq_session.lc_cookie = 0x5;
-        lu_context_enter(&request->rq_session);
-
-        CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
-
-        request->rq_svc_thread = thread;
-        if (thread)
-                request->rq_svc_thread->t_env->le_ses = &request->rq_session;
-
-        if (likely(request->rq_export)) {
+       if (likely(request->rq_export)) {
                if (unlikely(ptlrpc_check_req(request)))
                        goto put_conn;
                 ptlrpc_update_export_timer(request->rq_export, timediff >> 19);
@@ -2013,14 +2072,21 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
         if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
                 CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
 
-       rc = svc->srv_ops.so_req_handler(request);
+       CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
 
-        ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
+       /* re-assign request and sesson thread to the current one */
+       request->rq_svc_thread = thread;
+       if (thread != NULL) {
+               LASSERT(request->rq_session.lc_thread != NULL);
+               request->rq_session.lc_thread = thread;
+               request->rq_session.lc_cookie = 0x55;
+               thread->t_env->le_ses = &request->rq_session;
+       }
+       svc->srv_ops.so_req_handler(request);
 
-put_conn:
-        lu_context_exit(&request->rq_session);
-        lu_context_fini(&request->rq_session);
+       ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
 
+put_conn:
        if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
                     DEBUG_REQ(D_WARNING, request, "Request took longer "
                               "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
@@ -2072,7 +2138,6 @@ put_conn:
                           request->rq_arrival_time.tv_sec));
         }
 
-out_req:
        ptlrpc_server_finish_active_request(svcpt, request);
 
        RETURN(1);