Whamcloud - gitweb
if client_disconnect_export was called without force flag set,
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index ebf03fc..e85a7f8 100644 (file)
@@ -352,7 +352,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
         request->rq_import = class_import_get(imp);
 
         if (unlikely(ctx))
-                request->rq_cli_ctx = sptlrpc_ctx_get(ctx);
+                request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
         else {
                 rc = sptlrpc_req_get_ctx(request);
                 if (rc)
@@ -393,6 +393,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
         spin_lock_init(&request->rq_lock);
         CFS_INIT_LIST_HEAD(&request->rq_list);
         CFS_INIT_LIST_HEAD(&request->rq_replay_list);
+        CFS_INIT_LIST_HEAD(&request->rq_mod_list);
         CFS_INIT_LIST_HEAD(&request->rq_ctx_chain);
         CFS_INIT_LIST_HEAD(&request->rq_set_chain);
         cfs_waitq_init(&request->rq_reply_waitq);
@@ -404,7 +405,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
 
         RETURN(request);
 out_ctx:
-        sptlrpc_req_put_ctx(request);
+        sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
 out_free:
         class_import_put(imp);
         if (request->rq_pool)
@@ -550,10 +551,12 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                 /* allow CONNECT even if import is invalid */ ;
         } else if (imp->imp_invalid) {
                 /* If the import has been invalidated (such as by an OST
-                 * failure), the request must fail with -EIO. */
+                 * failure) the request must fail with -ESHUTDOWN.  This
+                 * indicates the requests should be discarded; an -EIO
+                 * may result in a resend of the request. */
                 if (!imp->imp_deactive)
                         DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
-                *status = -EIO;
+                *status = -ESHUTDOWN; /* bz 12940 */
         } else if (req->rq_import_generation != imp->imp_generation) {
                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                 *status = -EIO;
@@ -608,13 +611,12 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
 
         err = lustre_msg_get_status(req->rq_repmsg);
         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
-                LCONSOLE_ERROR_MSG(0x011, "an error ocurred while communicating"
-                                   " with %s The %s operation failed with %d",
-                                   req->rq_export ? 
-                                        obd_export_nid2str(req->rq_export)
-                                        : "(no nid)",
-                                   ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
-                                   err);
+                struct obd_import *imp = req->rq_import;
+                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+                LCONSOLE_ERROR_MSG(0x011,"an error occurred while communicating"
+                                " with %s. The %s operation failed with %d\n",
+                                libcfs_nid2str(imp->imp_connection->c_peer.nid),
+                                ll_opcode2str(opc), err);
                 RETURN(err < 0 ? err : -EINVAL);
         }
 
@@ -640,10 +642,9 @@ static int after_reply(struct ptlrpc_request *req)
         /* NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order. */
 
-#if SWAB_PARANOIA
         /* Clear reply swab mask; this is a new reply in sender's byte order */
         req->rq_rep_swab_mask = 0;
-#endif
+
         rc = sptlrpc_cli_unwrap_reply(req);
         if (rc) {
                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
@@ -676,18 +677,27 @@ static int after_reply(struct ptlrpc_request *req)
         rc = ptlrpc_check_status(req);
         imp->imp_connect_error = rc;
 
-        /* Either we've been evicted, or the server has failed for
-         * some reason. Try to reconnect, and if that fails, punt to the
-         * upcall. */
-        if ((rc == -ENOTCONN) || (rc == -ENODEV)) {
-                if (req->rq_send_state != LUSTRE_IMP_FULL ||
-                    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
-                        RETURN(-ENOTCONN);
+        if (rc) {
+                /* Either we've been evicted, or the server has failed for
+                 * some reason. Try to reconnect, and if that fails, punt to
+                 * the upcall. */
+                if (ll_rpc_recoverable_error(rc)) {
+                        if (req->rq_send_state != LUSTRE_IMP_FULL ||
+                            imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
+                                RETURN(rc);
+                        }
+                        ptlrpc_request_handle_notconn(req);
+                        RETURN(rc);
+                }
+        } else {
+                /* Let's look if server send slv. Do it only for RPC with 
+                 * rc == 0. */
+                if (imp->imp_obd->obd_namespace) {
+                        /* Disconnect rpc is sent when namespace is already 
+                         * destroyed. Let's check this and will not try update
+                         * pool. */
+                        ldlm_cli_update_pool(req);
                 }
-
-                ptlrpc_request_handle_notconn(req);
-
-                RETURN(rc);
         }
 
         /* Store transno in reqmsg for replay. */
@@ -728,6 +738,9 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
+        if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
+                RETURN (0);
+        
         req->rq_phase = RQ_PHASE_RPC;
 
         imp = req->rq_import;
@@ -815,6 +828,9 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
+                /* delayed send - skip */
+                if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
+                        continue;
 
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
@@ -1010,14 +1026,13 @@ check_ctx:
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
 
-                req->rq_phase = RQ_PHASE_COMPLETE;
-
                 if (req->rq_interpret_reply != NULL) {
                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                 req->rq_interpret_reply;
                         req->rq_status = interpreter(req, &req->rq_async_args,
                                                      req->rq_status);
                 }
+                req->rq_phase = RQ_PHASE_COMPLETE;
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
                        "opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1171,13 +1186,18 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 
                 /* request in-flight? */
                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
-                      (req->rq_phase == RQ_PHASE_BULK)))
+                      (req->rq_phase == RQ_PHASE_BULK) ||
+                      (req->rq_phase == RQ_PHASE_NEW)))
                         continue;
 
                 if (req->rq_timedout)   /* already timed out */
                         continue;
 
-                deadline = req->rq_sent + req->rq_timeout;
+                if (req->rq_phase == RQ_PHASE_NEW)
+                        deadline = req->rq_sent;
+                else
+                        deadline = req->rq_sent + req->rq_timeout;
+
                 if (deadline <= now)    /* actually expired already */
                         timeout = 1;    /* ASAP */
                 else if (timeout == 0 || timeout > deadline - now)
@@ -1208,7 +1228,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
                 /* wait until all complete, interrupted, or an in-flight
                  * req times out */
-                CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
+                CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
                        set, timeout);
                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
                                        ptlrpc_expired_set,
@@ -1265,6 +1285,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_import != NULL) {
                 if (!locked)
                         spin_lock(&request->rq_import->imp_lock);
+                list_del_init(&request->rq_mod_list);
                 list_del_init(&request->rq_replay_list);
                 if (!locked)
                         spin_unlock(&request->rq_import->imp_lock);
@@ -1293,7 +1314,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
                 sptlrpc_cli_free_reqbuf(request);
 
-        sptlrpc_req_put_ctx(request);
+        sptlrpc_req_put_ctx(request, !locked);
 
         if (request->rq_pool)
                 __ptlrpc_free_req_to_pool(request);
@@ -1403,12 +1424,12 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
             imp->imp_generation == imp->imp_last_generation_checked) {
-                CDEBUG(D_HA, "%s: skip recheck for last_committed "LPU64"\n",
+                CDEBUG(D_RPCTRACE, "%s: skip recheck: last_committed "LPU64"\n",
                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
                 return;
         }
-        
-        CDEBUG(D_HA, "%s: committing for last_committed "LPU64" gen %d\n",
+
+        CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
                imp->imp_generation);
         imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
@@ -1422,22 +1443,22 @@ void ptlrpc_free_committed(struct obd_import *imp)
                 last_req = req;
 
                 if (req->rq_import_generation < imp->imp_generation) {
-                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
+                        DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
                         GOTO(free_req, 0);
                 }
 
                 if (req->rq_replay) {
-                        DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
+                        DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
                         continue;
                 }
 
                 /* not yet committed */
                 if (req->rq_transno > imp->imp_peer_committed_transno) {
-                        DEBUG_REQ(D_HA, req, "stopping search");
+                        DEBUG_REQ(D_RPCTRACE, req, "stopping search");
                         break;
                 }
 
-                DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
+                DEBUG_REQ(D_RPCTRACE, req, "commit (last_committed "LPU64")",
                           imp->imp_peer_committed_transno);
 free_req:
                 spin_lock(&req->rq_lock);
@@ -1810,7 +1831,8 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         }
 
         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
-            lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN)
+            (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
+             lustre_msg_get_status(req->rq_repmsg) == -ENODEV))
                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
 
         /* The transno had better not change over replay. */
@@ -1906,7 +1928,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                DEBUG_REQ(D_HA, req, "inflight");
+                DEBUG_REQ(D_RPCTRACE, req, "inflight");
 
                 spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
@@ -1920,7 +1942,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                DEBUG_REQ(D_HA, req, "aborting waiting req");
+                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
 
                 spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {