Whamcloud - gitweb
b=3869,1742
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 7c636f3..1183afe 100644 (file)
@@ -375,6 +375,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         int rc = 0, abort_recovery;
         unsigned long flags;
         int initial_conn = 0;
+        char peer_str[PTL_NALFMT_SIZE];
         ENTRY;
 
         OBD_RACE(OBD_FAIL_TGT_CONN_RACE); 
@@ -461,7 +462,22 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         /* If we found an export, we already unlocked. */
         if (!export) {
                 spin_unlock(&target->obd_dev_lock);
-        } else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
+        } else if (req->rq_export == NULL && 
+                   atomic_read(&export->exp_rpc_count) > 0) {
+                CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n",
+                      target->obd_name, cluuid.uuid,
+                      ptlrpc_peernid2str(&req->rq_peer, peer_str),
+                      export, atomic_read(&export->exp_refcount));
+                GOTO(out, rc = -EBUSY);
+        } else if (req->rq_export != NULL &&
+                   atomic_read(&export->exp_rpc_count) > 1) {
+                CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n",
+                      target->obd_name, cluuid.uuid,
+                      ptlrpc_peernid2str(&req->rq_peer, peer_str),
+                      export, atomic_read(&export->exp_rpc_count));
+                GOTO(out, rc = -EBUSY);
+        }
+        else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
                 CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
                        cluuid.uuid);
                 GOTO(out, rc = -EALREADY);
@@ -469,7 +485,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         /* Tell the client if we're in recovery. */
         /* If this is the first client, start the recovery timer */
-        CWARN("%s: connection from %s %s\n", target->obd_name, cluuid.uuid,
+        CWARN("%s: connection from %s@%s %s\n", target->obd_name, cluuid.uuid,
+              ptlrpc_peernid2str(&req->rq_peer, peer_str),
               target->obd_recovering ? "(recovering)" : "");
         if (target->obd_recovering) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
@@ -482,8 +499,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 #endif
         if (export == NULL) {
                 if (target->obd_recovering) {
-                        CERROR("denying connection for new client %s: "
+                        CERROR("denying connection for new client %s@%s: "
                                "%d clients in recovery for %lds\n", cluuid.uuid,
+                               ptlrpc_peernid2str(&req->rq_peer, peer_str),
                                target->obd_recoverable_clients,
                                (target->obd_recovery_timer.expires-jiffies)/HZ);
                         rc = -EBUSY;
@@ -522,8 +540,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (initial_conn) {
                 req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1;
         } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
-                CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
-                       cluuid.uuid, export->exp_conn_cnt, 
+                CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n",
+                       cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str),
+                       export->exp_conn_cnt, 
                        req->rq_reqmsg->conn_cnt);
                 spin_unlock_irqrestore(&export->exp_lock, flags);
                 GOTO(out, rc = -EALREADY);
@@ -868,6 +887,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         if (obd->obd_processing_task == current->pid ||
             transno < obd->obd_next_recovery_transno) {
                 /* Processing the queue right now, don't re-add. */
+                lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
                 LASSERT(list_empty(&req->rq_list));
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 OBD_FREE(reqmsg, req->rq_reqlen);
@@ -971,7 +991,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
-        --obd->obd_recoverable_clients;
+        /* only count the first "replay over" request from each
+           export */
+        if (req->rq_export->exp_replay_needed) {
+                --obd->obd_recoverable_clients;
+                req->rq_export->exp_replay_needed = 0;
+        }
         recovery_done = (obd->obd_recoverable_clients == 0);
         spin_unlock_bh(&obd->obd_processing_task_lock);