Whamcloud - gitweb
LU-8500 ldlm: fix export reference problem
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index aeeee64..e31274b 100644 (file)
@@ -167,14 +167,17 @@ int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
                         ptlrpc_connection_put(imp->imp_connection);
                         imp->imp_connection = NULL;
 
-                        dlmexp = class_conn2export(&imp->imp_dlm_handle);
-                        if (dlmexp && dlmexp->exp_connection) {
-                                LASSERT(dlmexp->exp_connection ==
-                                        imp_conn->oic_conn);
-                                ptlrpc_connection_put(dlmexp->exp_connection);
-                                dlmexp->exp_connection = NULL;
-                        }
-                }
+                       dlmexp = class_conn2export(&imp->imp_dlm_handle);
+                       if (dlmexp && dlmexp->exp_connection) {
+                               LASSERT(dlmexp->exp_connection ==
+                                       imp_conn->oic_conn);
+                               ptlrpc_connection_put(dlmexp->exp_connection);
+                               dlmexp->exp_connection = NULL;
+                       }
+
+                       if (dlmexp != NULL)
+                               class_export_put(dlmexp);
+               }
 
                list_del(&imp_conn->oic_item);
                 ptlrpc_connection_put(imp_conn->oic_conn);
@@ -578,7 +581,7 @@ int client_connect_import(const struct lu_env *env,
 
        if (data) {
                LASSERTF((ocd->ocd_connect_flags & data->ocd_connect_flags) ==
-                        ocd->ocd_connect_flags, "old "LPX64", new "LPX64"\n",
+                        ocd->ocd_connect_flags, "old %#llx, new %#llx\n",
                         data->ocd_connect_flags, ocd->ocd_connect_flags);
                data->ocd_connect_flags = ocd->ocd_connect_flags;
                /* clear the flag as it was not set and is not known
@@ -613,7 +616,7 @@ int client_disconnect_export(struct obd_export *exp)
         ENTRY;
 
         if (!obd) {
-                CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
+               CERROR("invalid export for disconnect: exp %p cookie %#llx\n",
                        exp, exp ? exp->exp_handle.h_cookie : -1);
                 RETURN(-EINVAL);
         }
@@ -734,7 +737,7 @@ static int target_handle_reconnect(struct lustre_handle *conn,
        if (!exp->exp_connection || !lustre_handle_is_used(hdl)) {
                conn->cookie = exp->exp_handle.h_cookie;
                CDEBUG(D_HA, "connect export for UUID '%s' at %p,"
-                      " cookie "LPX64"\n", cluuid->uuid, exp, conn->cookie);
+                      " cookie %#llx\n", cluuid->uuid, exp, conn->cookie);
                RETURN(0);
        }
 
@@ -743,9 +746,9 @@ static int target_handle_reconnect(struct lustre_handle *conn,
        /* Might be a re-connect after a partition. */
        if (memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
                LCONSOLE_WARN("%s: already connected client %s (at %s) "
-                             "with handle "LPX64". Rejecting client "
+                             "with handle %#llx. Rejecting client "
                              "with the same UUID trying to reconnect "
-                             "with handle "LPX64"\n", target->obd_name,
+                             "with handle %#llx\n", target->obd_name,
                              obd_uuid2str(&exp->exp_client_uuid),
                              obd_export_nid2str(exp),
                              hdl->cookie, conn->cookie);
@@ -1208,7 +1211,7 @@ no_export:
                 GOTO(out, rc);
         }
 
-        CDEBUG(D_HA, "%s: connection from %s@%s %st"LPU64" exp %p cur %ld last %ld\n",
+       CDEBUG(D_HA, "%s: connection from %s@%s %st%llu exp %p cur %ld last %ld\n",
                target->obd_name, cluuid.uuid, libcfs_nid2str(req->rq_peer.nid),
               target->obd_recovering ? "recovering/" : "", data->ocd_transno,
               export, (long)cfs_time_current_sec(),
@@ -1878,7 +1881,7 @@ static int check_for_next_transno(struct lu_target *lut)
        next_transno = obd->obd_next_recovery_transno;
 
        CDEBUG(D_HA, "max: %d, connected: %d, completed: %d, queue_len: %d, "
-              "req_transno: "LPU64", next_transno: "LPU64"\n",
+              "req_transno: %llu, next_transno: %llu\n",
               obd->obd_max_recoverable_clients, connected, completed,
               queue_len, req_transno, next_transno);
 
@@ -1890,29 +1893,25 @@ static int check_for_next_transno(struct lu_target *lut)
                wake_up = 1;
        } else if (tdtd != NULL && req != NULL &&
                   is_req_replayed_by_update(req)) {
-               LASSERTF(req_transno < next_transno, "req_transno "LPU64
-                        "next_transno"LPU64"\n", req_transno, next_transno);
-               CDEBUG(D_HA, "waking for duplicate req ("LPU64")\n",
+               LASSERTF(req_transno < next_transno, "req_transno %llu"
+                        "next_transno%llu\n", req_transno, next_transno);
+               CDEBUG(D_HA, "waking for duplicate req (%llu)\n",
                       req_transno);
                wake_up = 1;
        } else if (req_transno == next_transno ||
                   (update_transno != 0 && update_transno <= next_transno)) {
-               CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
+               CDEBUG(D_HA, "waking for next (%lld)\n", next_transno);
                wake_up = 1;
        } else if (queue_len > 0 &&
                   queue_len == atomic_read(&obd->obd_req_replay_clients)) {
-               int d_lvl = D_HA;
                /** handle gaps occured due to lost reply or VBR */
                LASSERTF(req_transno >= next_transno,
-                        "req_transno: "LPU64", next_transno: "LPU64"\n",
+                        "req_transno: %llu, next_transno: %llu\n",
                         req_transno, next_transno);
-               if (req_transno > obd->obd_last_committed &&
-                   !obd->obd_version_recov)
-                       d_lvl = D_ERROR;
-               CDEBUG(d_lvl,
+               CDEBUG(D_HA,
                       "%s: waking for gap in transno, VBR is %s (skip: "
-                      LPD64", ql: %d, comp: %d, conn: %d, next: "LPD64
-                      ", next_update "LPD64" last_committed: "LPD64")\n",
+                      "%lld, ql: %d, comp: %d, conn: %d, next: %lld"
+                      ", next_update %lld last_committed: %lld)\n",
                       obd->obd_name, obd->obd_version_recov ? "ON" : "OFF",
                       next_transno, queue_len, completed, connected,
                       req_transno, update_transno, obd->obd_last_committed);
@@ -1923,7 +1922,7 @@ static int check_for_next_transno(struct lu_target *lut)
                wake_up = 1;
        } else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS)) {
                CDEBUG(D_HA, "accepting transno gaps is explicitly allowed"
-                      " by fail_lock, waking up ("LPD64")\n", next_transno);
+                      " by fail_lock, waking up (%lld)\n", next_transno);
                obd->obd_next_recovery_transno = req_transno;
                wake_up = 1;
        }
@@ -2009,7 +2008,7 @@ repeat:
                         * clients */
                        abort_req_replay_queue(obd);
                        abort_lock_replay_queue(obd);
-                       CDEBUG(D_HA, "%s: there are still update replay ("LPX64
+                       CDEBUG(D_HA, "%s: there are still update replay (%#llx"
                               ")in the queue.\n", obd->obd_name,
                               next_update_transno);
                } else {
@@ -2265,7 +2264,7 @@ static void drop_duplicate_replay_req(struct lu_env *env,
                                      struct obd_device *obd,
                                      struct ptlrpc_request *req)
 {
-       DEBUG_REQ(D_HA, req, "remove t"LPD64" from %s because of duplicate"
+       DEBUG_REQ(D_HA, req, "remove t%lld from %s because of duplicate"
                  " update records are found.\n",
                  lustre_msg_get_transno(req->rq_reqmsg),
                  libcfs_nid2str(req->rq_peer.nid));
@@ -2299,7 +2298,7 @@ static void replay_request_or_update(struct lu_env *env,
        __u64                   transno;
        ENTRY;
 
-       CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
+       CDEBUG(D_HA, "Waiting for transno %lld\n",
               obd->obd_next_recovery_transno);
 
        /* Replay all of request and update by transno */
@@ -2354,7 +2353,7 @@ static void replay_request_or_update(struct lu_env *env,
                        }
 
                        LASSERT(trd->trd_processing_task == current_pid());
-                       DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s",
+                       DEBUG_REQ(D_HA, req, "processing t%lld from %s",
                                  lustre_msg_get_transno(req->rq_reqmsg),
                                  libcfs_nid2str(req->rq_peer.nid));
 
@@ -2384,7 +2383,7 @@ static void replay_request_or_update(struct lu_env *env,
                        extend_recovery_timer(obd, obd_timeout, true);
 
                        if (rc == 0 && dtrq->dtrq_xid != 0) {
-                               CDEBUG(D_HA, "Move x"LPU64" t"LPU64
+                               CDEBUG(D_HA, "Move x%llu t%llu"
                                       " to finish list\n", dtrq->dtrq_xid,
                                       dtrq->dtrq_master_transno);
 
@@ -2473,7 +2472,7 @@ static int target_recovery_thread(void *arg)
 
        /* next stage: replay requests or update */
        delta = jiffies;
-       CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n",
+       CDEBUG(D_INFO, "1: request replay stage - %d clients from t%llu\n",
               atomic_read(&obd->obd_req_replay_clients),
               obd->obd_next_recovery_transno);
        replay_request_or_update(env, lut, trd, thread);
@@ -2619,7 +2618,7 @@ void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
         }
 
        CDEBUG(D_HA, "RECOVERY: service %s, %d recoverable clients, "
-              "last_transno "LPU64"\n", obd->obd_name,
+              "last_transno %llu\n", obd->obd_name,
               obd->obd_max_recoverable_clients, obd->obd_last_committed);
         LASSERT(obd->obd_stopping == 0);
         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
@@ -2689,15 +2688,46 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                wake_up(&obd->obd_next_transno_waitq);
                spin_lock(&obd->obd_recovery_task_lock);
                if (obd->obd_recovering) {
+                       struct ptlrpc_request *tmp;
+                       struct ptlrpc_request *duplicate = NULL;
+
+                       if (likely(!req->rq_export->exp_replay_done)) {
+                               req->rq_export->exp_replay_done = 1;
+                               list_add_tail(&req->rq_list,
+                                             &obd->obd_final_req_queue);
+                               spin_unlock(&obd->obd_recovery_task_lock);
+                               RETURN(0);
+                       }
+
+                       /* XXX O(n), but this only happens when the final
+                        * ping has timed out; consider reorganizing the
+                        * list as a hash list later */
+                       list_for_each_entry_safe(reqiter, tmp,
+                                                &obd->obd_final_req_queue,
+                                                rq_list) {
+                               if (reqiter->rq_export == req->rq_export) {
+                                       list_del_init(&reqiter->rq_list);
+                                       duplicate = reqiter;
+                                       break;
+                               }
+                       }
+
                        list_add_tail(&req->rq_list,
-                                         &obd->obd_final_req_queue);
+                                     &obd->obd_final_req_queue);
+                       req->rq_export->exp_replay_done = 1;
+                       spin_unlock(&obd->obd_recovery_task_lock);
+
+                       if (duplicate != NULL) {
+                               DEBUG_REQ(D_HA, duplicate,
+                                         "put prev final req\n");
+                               target_request_copy_put(duplicate);
+                       }
+                       RETURN(0);
                } else {
                        spin_unlock(&obd->obd_recovery_task_lock);
                        target_request_copy_put(req);
                        RETURN(obd->obd_stopping ? -ENOTCONN : 1);
                }
-               spin_unlock(&obd->obd_recovery_task_lock);
-               RETURN(0);
        }
        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
                /* client declares he's ready to replay locks */
@@ -2738,8 +2768,8 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
          * Also, a resent, replayed request that has already been
          * handled will pass through here and be processed immediately.
          */
-        CDEBUG(D_HA, "Next recovery transno: "LPU64
-               ", current: "LPU64", replaying\n",
+       CDEBUG(D_HA, "Next recovery transno: %llu"
+              ", current: %llu, replaying\n",
                obd->obd_next_recovery_transno, transno);
 
        /* If the request has been replayed by update replay, then sends this
@@ -2819,7 +2849,7 @@ void target_committed_to_req(struct ptlrpc_request *req)
                           "%d)", exp->exp_obd->obd_no_transno,
                           req->rq_repmsg == NULL);
 
-        CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
+       CDEBUG(D_INFO, "last_committed %llu, transno %llu, xid %llu\n",
                exp->exp_last_committed, req->rq_transno, req->rq_xid);
 }
 
@@ -2925,7 +2955,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         rs->rs_opc       = lustre_msg_get_opc(req->rq_reqmsg);
 
        spin_lock(&exp->exp_uncommitted_replies_lock);
-       CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
+       CDEBUG(D_NET, "rs transno = %llu, last committed = %llu\n",
               rs->rs_transno, exp->exp_last_committed);
        if (rs->rs_transno > exp->exp_last_committed) {
                /* not committed already */