Whamcloud - gitweb
b=3869,1742
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 4f113f4..1183afe 100644 (file)
@@ -46,7 +46,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         int rq_portal, rp_portal, connect_op;
         char *name = obddev->obd_type->typ_name;
         char *mgmt_name = NULL;
-        int rc = 0;
+        int rc;
         struct obd_device *mgmt_obd;
         mgmtcli_register_for_events_t register_f;
         ENTRY;
@@ -112,7 +112,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
         cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
 
-        ldlm_get_ref();
+        rc = ldlm_get_ref();
         if (rc) {
                 CERROR("ldlm_get_ref failed: %d\n", rc);
                 GOTO(err, rc);
@@ -333,19 +333,16 @@ int client_disconnect_export(struct obd_export *exp, int failover)
  * -------------------------------------------------------------------------- */
 
 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
-                            struct obd_uuid *cluuid)
+                            struct obd_uuid *cluuid, int initial_conn)
 {
-        if (exp->exp_connection) {
+        if (exp->exp_connection && !initial_conn) {
                 struct lustre_handle *hdl;
                 hdl = &exp->exp_imp_reverse->imp_remote_handle;
                 /* Might be a re-connect after a partition. */
-#warning "FIXME ASAP"
-                memcpy(&hdl->cookie, &conn->cookie, sizeof(conn->cookie));
-                if (1 || !memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
+                if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
                         CERROR("%s reconnecting\n", cluuid->uuid);
                         conn->cookie = exp->exp_handle.h_cookie;
-                        /*RETURN(EALREADY);*/
-                        RETURN(0);
+                        RETURN(EALREADY);
                 } else {
                         CERROR("%s reconnecting from %s, "
                                "handle mismatch (ours "LPX64", theirs "
@@ -377,6 +374,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         char *str, *tmp;
         int rc = 0, abort_recovery;
         unsigned long flags;
+        int initial_conn = 0;
+        char peer_str[PTL_NALFMT_SIZE];
         ENTRY;
 
         OBD_RACE(OBD_FAIL_TGT_CONN_RACE); 
@@ -410,8 +409,19 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         obd_str2uuid (&cluuid, str);
 
         /* XXX extract a nettype and format accordingly */
-        snprintf(remote_uuid.uuid, sizeof remote_uuid,
-                 "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
+        switch (sizeof(ptl_nid_t)) {
+                /* NB the casts only avoid compiler warnings */
+        case 8:
+                snprintf(remote_uuid.uuid, sizeof remote_uuid,
+                         "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_nid);
+                break;
+        case 4:
+                snprintf(remote_uuid.uuid, sizeof remote_uuid,
+                         "NET_%x_UUID", (__u32)req->rq_peer.peer_nid);
+                break;
+        default:
+                LBUG();
+        }
 
         spin_lock_bh(&target->obd_processing_task_lock);
         abort_recovery = target->obd_abort_recovery;
@@ -428,7 +438,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc)
                 GOTO(out, rc);
-
+        
+        if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL)
+                initial_conn = 1;
+        
         /* lctl gets a backstage, all-access pass. */
         if (obd_uuid_equals(&cluuid, &target->obd_uuid))
                 goto dont_check_exports;
@@ -440,7 +453,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                         spin_unlock(&target->obd_dev_lock);
                         LASSERT(export->exp_obd == target);
 
-                        rc = target_handle_reconnect(&conn, export, &cluuid);
+                        rc = target_handle_reconnect(&conn, export, &cluuid,
+                                                     initial_conn);
                         break;
                 }
                 export = NULL;
@@ -448,28 +462,46 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         /* If we found an export, we already unlocked. */
         if (!export) {
                 spin_unlock(&target->obd_dev_lock);
-        } else if (req->rq_reqmsg->conn_cnt == 1) {
+        } else if (req->rq_export == NULL && 
+                   atomic_read(&export->exp_rpc_count) > 0) {
+                CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n",
+                      target->obd_name, cluuid.uuid,
+                      ptlrpc_peernid2str(&req->rq_peer, peer_str),
+                      export, atomic_read(&export->exp_refcount));
+                GOTO(out, rc = -EBUSY);
+        } else if (req->rq_export != NULL &&
+                   atomic_read(&export->exp_rpc_count) > 1) {
+                CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n",
+                      target->obd_name, cluuid.uuid,
+                      ptlrpc_peernid2str(&req->rq_peer, peer_str),
+                      export, atomic_read(&export->exp_rpc_count));
+                GOTO(out, rc = -EBUSY);
+        }
+        else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
                 CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
                        cluuid.uuid);
-#warning "FIXME ASAP"
-                /*GOTO(out, rc = -EALREADY);*/
+                GOTO(out, rc = -EALREADY);
         }
 
         /* Tell the client if we're in recovery. */
         /* If this is the first client, start the recovery timer */
+        CWARN("%s: connection from %s@%s %s\n", target->obd_name, cluuid.uuid,
+              ptlrpc_peernid2str(&req->rq_peer, peer_str),
+              target->obd_recovering ? "(recovering)" : "");
         if (target->obd_recovering) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
                 target_start_recovery_timer(target, handler);
         }
-
+#if 0
         /* Tell the client if we support replayable requests */
         if (target->obd_replayable)
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
-
+#endif
         if (export == NULL) {
                 if (target->obd_recovering) {
-                        CERROR("denying connection for new client %s: "
+                        CERROR("denying connection for new client %s@%s: "
                                "%d clients in recovery for %lds\n", cluuid.uuid,
+                               ptlrpc_peernid2str(&req->rq_peer, peer_str),
                                target->obd_recoverable_clients,
                                (target->obd_recovery_timer.expires-jiffies)/HZ);
                         rc = -EBUSY;
@@ -478,7 +510,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                         rc = obd_connect(&conn, target, &cluuid);
                 }
         }
-
+        /* Tell the client if we support replayable requests */
+        if (target->obd_replayable)
+                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
 
         /* If all else goes well, this is our RPC return code. */
         req->rq_status = 0;
@@ -486,11 +520,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (rc && rc != EALREADY)
                 GOTO(out, rc);
 
-        /* XXX track this all the time? */
-        if (target->obd_recovering) {
-                target->obd_connected_clients++;
-        }
-
         req->rq_repmsg->handle = conn;
 
         /* If the client and the server are the same node, we will already
@@ -508,14 +537,16 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         LASSERT(export != NULL);
 
         spin_lock_irqsave(&export->exp_lock, flags);
-#warning "FIXME ASAP"
-        if (0 && export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
-                CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
-                       cluuid.uuid, export->exp_conn_cnt, 
+        if (initial_conn) {
+                req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1;
+        } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
+                CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n",
+                       cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str),
+                       export->exp_conn_cnt, 
                        req->rq_reqmsg->conn_cnt);
                 spin_unlock_irqrestore(&export->exp_lock, flags);
                 GOTO(out, rc = -EALREADY);
-        }
+        } 
         export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
         spin_unlock_irqrestore(&export->exp_lock, flags);
 
@@ -534,6 +565,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 GOTO(out, rc = 0);
         }
 
+        if (target->obd_recovering) {
+                target->obd_connected_clients++;
+        }
+
         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
                sizeof conn);
 
@@ -708,7 +743,6 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
         obd->obd_recovery_handler = handler;
         obd->obd_recovery_timer.function = target_recovery_expired;
         obd->obd_recovery_timer.data = (unsigned long)obd;
-        init_timer(&obd->obd_recovery_timer);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
         reset_recovery_timer(obd);
@@ -730,6 +764,9 @@ static int check_for_next_transno(struct obd_device *obd)
         queue_len = obd->obd_requests_queued_for_recovery;
         next_transno = obd->obd_next_recovery_transno;
 
+        CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
+               "req_transno: "LPU64", next_transno: "LPU64"\n",
+               max, connected, completed, queue_len, req_transno, next_transno);
         if (obd->obd_abort_recovery) {
                 CDEBUG(D_HA, "waking for aborted recovery\n");
                 wake_up = 1;
@@ -843,10 +880,14 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
          * Also, if this request has a transno less than the one we're waiting
          * for, we should process it now.  It could (and currently always will)
          * be an open request for a descriptor that was opened some time ago.
+         *
+         * Also, a resent, replayed request that has already been
+         * handled will pass through here and be processed immediately.
          */
         if (obd->obd_processing_task == current->pid ||
             transno < obd->obd_next_recovery_transno) {
                 /* Processing the queue right now, don't re-add. */
+                lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
                 LASSERT(list_empty(&req->rq_list));
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 OBD_FREE(reqmsg, req->rq_reqlen);
@@ -854,6 +895,17 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 return 1;
         }
 
+        /* A resent, replayed request that is still on the queue; just drop it.
+           The queued request will handle this. */
+        if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) ==
+            (MSG_RESENT | MSG_REPLAY)) {
+                DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                OBD_FREE(reqmsg, req->rq_reqlen);
+                OBD_FREE(saved_req, sizeof *saved_req);
+                return 0;
+        }
+
         memcpy(saved_req, req, sizeof *req);
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
         req = saved_req;
@@ -939,7 +991,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
-        --obd->obd_recoverable_clients;
+        /* only count the first "replay over" request from each
+           export */
+        if (req->rq_export->exp_replay_needed) {
+                --obd->obd_recoverable_clients;
+                req->rq_export->exp_replay_needed = 0;
+        }
         recovery_done = (obd->obd_recoverable_clients == 0);
         spin_unlock_bh(&obd->obd_processing_task_lock);