Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index cfd1c8c..a46dd73 100644 (file)
@@ -126,6 +126,7 @@ static int expired_lock_main(void *arg)
         wake_up(&expired_lock_thread.elt_waitq);
 
         while (1) {
+                struct list_head *tmp, *n, work_list;
                 l_wait_event(expired_lock_thread.elt_waitq,
                              have_expired_locks() ||
                              expired_lock_thread.elt_state == ELT_TERMINATE,
@@ -133,30 +134,33 @@ static int expired_lock_main(void *arg)
 
                 spin_lock_bh(&expired_lock_thread.elt_lock);
                 while (!list_empty(expired)) {
-                        struct obd_export *export;
                         struct ldlm_lock *lock;
 
-                        lock = list_entry(expired->next, struct ldlm_lock,
-                                          l_pending_chain);
-                        if ((void *)lock < LP_POISON + PAGE_SIZE &&
-                            (void *)lock >= LP_POISON) {
-                                CERROR("free lock on elt list %p\n", lock);
-                                LBUG();
-                        }
-                        list_del_init(&lock->l_pending_chain);
-                        if ((void *)lock->l_export < LP_POISON + PAGE_SIZE &&
-                            (void *)lock->l_export >= LP_POISON + PAGE_SIZE) {
-                                CERROR("lock with free export on elt list %p\n",
-                                       export);
-                                lock->l_export = NULL;
-                                LDLM_ERROR(lock, "free export\n");
-                                continue;
+                        list_add(&work_list, expired);
+                        list_del_init(expired);
+
+                        list_for_each_entry(lock, &work_list, l_pending_chain) {
+                                LDLM_DEBUG(lock, "moving to work list");
                         }
-                        export = class_export_get(lock->l_export);
+
                         spin_unlock_bh(&expired_lock_thread.elt_lock);
 
-                        ptlrpc_fail_export(export);
-                        class_export_put(export);
+
+                        list_for_each_safe(tmp, n, &work_list) {
+                                 lock = list_entry(tmp, struct ldlm_lock,
+                                                   l_pending_chain);
+                                 ptlrpc_fail_export(lock->l_export);
+                        }
+
+
+                        if (!list_empty(&work_list)) {
+                                list_for_each_entry(lock, &work_list, l_pending_chain) {
+                                        LDLM_ERROR(lock, "still on work list!");
+                                }
+                        }
+                        LASSERTF (list_empty(&work_list),
+                                  "some exports not failed properly\n");
+
                         spin_lock_bh(&expired_lock_thread.elt_lock);
                 }
                 spin_unlock_bh(&expired_lock_thread.elt_lock);
@@ -181,14 +185,17 @@ static void waiting_locks_callback(unsigned long unused)
                                   l_pending_chain);
 
                 if ((lock->l_callback_timeout > jiffies) ||
-                    (lock->l_req_mode == LCK_GROUP))
+                    (lock->l_req_mode == LCK_CW))
                         break;
 
                 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
-                           "%s@%s nid %s ",
+                           "%s@%s nid "LPX64" (%s) ",
                            lock->l_export->exp_client_uuid.uuid,
                            lock->l_export->exp_connection->c_remote_uuid.uuid,
-                           ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer, str));
+                           lock->l_export->exp_connection->c_peer.peer_nid,
+                           portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
+                                           lock->l_export->exp_connection->c_peer.peer_nid,
+                                           str));
 
                 spin_lock_bh(&expired_lock_thread.elt_lock);
                 list_del(&lock->l_pending_chain);
@@ -304,57 +311,24 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 
 #endif /* __KERNEL__ */
 
-static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,const char *ast_type)
+static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
 {
-        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
+        const struct ptlrpc_connection *conn = lock->l_export->exp_connection;
         char str[PTL_NALFMT_SIZE];
 
-        LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
-                   " (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
-                   conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
-                   ptlrpc_peernid2str(&conn->c_peer, str));
+        CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
+               ", mode %s: evicting client %s@%s NID "LPX64" (%s)\n",
+               ast_type, rc,
+               lock->l_resource->lr_name.name[0],
+               lock->l_resource->lr_name.name[1],
+               ldlm_lockname[lock->l_granted_mode],
+               lock->l_export->exp_client_uuid.uuid,
+               conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
+               portals_nid2str(conn->c_peer.peer_ni->pni_number,
+                               conn->c_peer.peer_nid, str));
         ptlrpc_fail_export(lock->l_export);
 }
 
-static int ldlm_handle_ast_error(struct ldlm_lock *lock,
-                                 struct ptlrpc_request *req, int rc,
-                                 const char *ast_type)
-{
-        struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer;
-        char str[PTL_NALFMT_SIZE];
-
-        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
-                LASSERT(lock->l_export);
-                if (lock->l_export->exp_libclient) {
-                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
-                                   " timeout, just cancelling lock", ast_type,
-                                   ptlrpc_peernid2str(peer, str));
-                        ldlm_lock_cancel(lock);
-                        rc = -ERESTART;
-                } else {
-                        ldlm_del_waiting_lock(lock);
-                        ldlm_failed_ast(lock, rc, ast_type);
-                }
-        } else if (rc) {
-                if (rc == -EINVAL)
-                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
-                                   " from %s AST - normal race",
-                                   ptlrpc_peernid2str(peer, str),
-                                   req->rq_repmsg->status, ast_type);
-                else
-                        LDLM_ERROR(lock, "client (nid %s) returned %d "
-                                   "from %s AST", ptlrpc_peernid2str(peer, str),
-                                   (req->rq_repmsg != NULL) ?
-                                   req->rq_repmsg->status : 0, ast_type);
-                ldlm_lock_cancel(lock);
-                /* Server-side AST functions are called from ldlm_reprocess_all,
-                 * which needs to be told to please restart its reprocessing. */
-                rc = -ERESTART;
-        }
-
-        return rc;
-}
-
 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *desc,
                              void *data, int flag)
@@ -416,8 +390,44 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
-        if (rc != 0)
-                rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+                LASSERT(lock->l_export);
+                if (lock->l_export->exp_libclient) {
+                        CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid "
+                               LPU64") timeout, simply cancel lock 0x%p\n",
+                               req->rq_peer.peer_nid, lock);
+                        ldlm_lock_cancel(lock);
+                        rc = -ERESTART;
+                } else {
+                        ldlm_del_waiting_lock(lock);
+                        ldlm_failed_ast(lock, rc, "blocking");
+                }
+        } else if (rc) {
+                if (rc == -EINVAL)
+                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+                               "from blocking AST for lock %p--normal race\n",
+                               req->rq_peer.peer_nid,
+                               req->rq_repmsg->status, lock);
+                else if (rc == -ENOTCONN)
+                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+                               "from blocking AST for lock %p--this client was "
+                               "probably rebooted while it held a lock, nothing"
+                               " serious\n",req->rq_peer.peer_nid,
+                               req->rq_repmsg->status, lock);
+                else
+                        CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
+                               "from blocking AST for lock %p\n",
+                               req->rq_peer.peer_nid,
+                               (req->rq_repmsg != NULL)?
+                               req->rq_repmsg->status : 0,
+                               lock);
+                LDLM_DEBUG(lock, "client sent rc %d rq_status %d from blocking "
+                           "AST", rc, req->rq_status);
+                ldlm_lock_cancel(lock);
+                /* Server-side AST functions are called from ldlm_reprocess_all,
+                 * which needs to be told to please restart its reprocessing. */
+                rc = -ERESTART;
+        }
 
         ptlrpc_req_finished(req);
 
@@ -487,9 +497,21 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         rc = ptlrpc_queue_wait(req);
-        if (rc != 0)
-                rc = ldlm_handle_ast_error(lock, req, rc, "completion");
-
+        if ((rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) &&
+             !lock->l_export->exp_libclient) {
+                ldlm_del_waiting_lock(lock);
+                ldlm_failed_ast(lock, rc, "completion");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
+        } else if (rc) {
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "completion AST", rc, req->rq_status);
+                ldlm_lock_cancel(lock);
+                /* Server-side AST functions are called from ldlm_reprocess_all,
+                 * which needs to be told to please restart its reprocessing. */
+                rc = -ERESTART;
+        }
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -522,13 +544,23 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
 
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ELDLM_NO_LOCK_DATA)
-                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
-        else if (rc != 0)
-                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
-        else
+        if ((rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) &&
+            !lock->l_export->exp_libclient) {
+                ldlm_del_waiting_lock(lock);
+                ldlm_failed_ast(lock, rc, "glimpse");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
+        } else if (rc == -ELDLM_NO_LOCK_DATA) {
+                LDLM_DEBUG(lock, "lost a race -- client has a lock, but no "
+                           "inode");
+        } else if (rc) {
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "glimpse AST", rc, req->rq_status);
+        } else {
                 rc = res->lr_namespace->ns_lvbo->lvbo_update
                         (res, req->rq_repmsg, 0, 1);
+        }
         ptlrpc_req_finished(req);
         RETURN(rc);
 }
@@ -592,9 +624,6 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                         buffers = 2;
                 }
 
-                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
-                        GOTO(out, rc = -ENOMEM);
-
                 rc = lustre_pack_reply(req, buffers, size, NULL);
                 if (rc)
                         GOTO(out, rc);
@@ -634,7 +663,6 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                 err = lustre_pack_reply(req, 0, NULL, NULL);
                 if (rc == 0)
                         rc = err;
-                req->rq_status = rc;
         }
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
@@ -643,10 +671,9 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
 
-                if (lock->l_resource->lr_lvb_len > 0 && rc == 0) {
+                if (lock->l_resource->lr_lvb_len > 0) {
                         void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
                                                   lock->l_resource->lr_lvb_len);
-                        LASSERT(lvb != NULL);
                         memcpy(lvb, lock->l_resource->lr_lvb_data,
                                lock->l_resource->lr_lvb_len);
                 }
@@ -731,10 +758,12 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("received cancel for unknown lock cookie "LPX64
-                       " from client %s nid %s\n",
+                       " from client %s nid "LPX64" (%s)\n",
                        dlm_req->lock_handle1.cookie,
                        req->rq_export->exp_client_uuid.uuid,
-                       ptlrpc_peernid2str(&req->rq_peer, str));
+                       req->rq_peer.peer_nid,
+                       portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                       req->rq_peer.peer_nid, str));
                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                   "(cookie "LPU64")",
                                   dlm_req->lock_handle1.cookie);
@@ -767,8 +796,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+static void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+                                    struct ldlm_lock_desc *ld,
+                                    struct ldlm_lock *lock)
 {
         int do_ast;
         ENTRY;
@@ -914,10 +944,10 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
         return ptlrpc_reply(req);
 }
 
+#ifdef __KERNEL__
 int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                       struct ldlm_lock *lock)
 {
-#ifdef __KERNEL__
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         struct ldlm_bl_work_item *blwi;
         ENTRY;
@@ -935,12 +965,18 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
         list_add_tail(&blwi->blwi_entry, &blp->blp_list);
         wake_up(&blp->blp_waitq);
         spin_unlock(&blp->blp_lock);
-#else
-        LBUG();
-#endif
 
         RETURN(0);
 }
+#else
+/* XXX */
+void liblustre_ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+                                       struct ldlm_lock_desc *ld,
+                                       struct ldlm_lock *lock)
+{
+        ldlm_handle_bl_callback(ns, ld, lock);
+}
+#endif
 
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
@@ -959,11 +995,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
-                CDEBUG(D_RPCTRACE, "operation %d from nid %s with bad "
+                CDEBUG(D_RPCTRACE, "operation %d from nid "LPX64" (%s) with bad "
                        "export cookie "LPX64" (ptl req %d/rep %d); this is "
                        "normal if this node rebooted with a lock held\n",
-                       req->rq_reqmsg->opc,
-                       ptlrpc_peernid2str(&req->rq_peer, str),
+                       req->rq_reqmsg->opc, req->rq_peer.peer_nid,
+                       portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                       req->rq_peer.peer_nid, str),
                        req->rq_reqmsg->handle.cookie,
                        req->rq_request_portal, req->rq_reply_portal);
 
@@ -1005,11 +1042,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 rc = llog_origin_handle_next_block(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-                rc = llog_origin_handle_prev_block(req);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         case LLOG_ORIGIN_HANDLE_READ_HEADER:
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                 rc = llog_origin_handle_read_header(req);
@@ -1294,6 +1326,7 @@ static int ldlm_setup(void)
                 rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
                 if (rc < 0) {
                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
+                        LBUG();
                         GOTO(out_thread, rc);
                 }
                 wait_for_completion(&blp->blp_comp);
@@ -1301,13 +1334,17 @@ static int ldlm_setup(void)
 
         rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
                                     LDLM_NUM_THREADS, "ldlm_cn");
-        if (rc)
+        if (rc) {
+                LBUG();
                 GOTO(out_thread, rc);
+        }
 
         rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
                                     LDLM_NUM_THREADS, "ldlm_cb");
-        if (rc)
+        if (rc) {
+                LBUG();
                 GOTO(out_thread, rc);
+        }
 
         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
         spin_lock_init(&expired_lock_thread.elt_lock);