Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 1a298b9..cdf0d9e 100644 (file)
@@ -44,12 +44,9 @@ extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 extern struct lustre_lock ldlm_handle_lock;
 extern struct list_head ldlm_namespace_list;
-extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
-extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
 static DECLARE_MUTEX(ldlm_ref_sem);
-static int ldlm_refcount = 0;
-
+static int ldlm_refcount;
 /* LDLM state */
 
 static struct ldlm_state *ldlm_state;
@@ -73,6 +70,10 @@ static struct expired_lock_thread {
 } expired_lock_thread;
 #endif
 
+#if !defined(ENOTSUPP)
+#  define ENOTSUPP 524
+#endif
+
 #define ELT_STOPPED   0
 #define ELT_READY     1
 #define ELT_TERMINATE 2
@@ -376,7 +377,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 /* this blocking AST will be communicated as part of the
                  * completion AST instead */
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");                 RETURN(0);
+                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");                 
+                RETURN(0);
         }
 
         if (lock->l_destroyed) {
@@ -415,7 +417,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
-        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+        req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
         if (rc != 0)
                 rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
@@ -449,11 +451,13 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
+        down(&lock->l_resource->lr_lvb_sem);
         if (lock->l_resource->lr_lvb_len) {
                 buffers = 2;
                 size[1] = lock->l_resource->lr_lvb_len;
         }
-
+        up(&lock->l_resource->lr_lvb_sem);
+        
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
                               buffers, size, NULL);
@@ -467,10 +471,15 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         ldlm_lock2desc(lock, &body->lock_desc);
 
         if (buffers == 2) {
-                void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
-                                           lock->l_resource->lr_lvb_len);
+                void *lvb;
+                
+                down(&lock->l_resource->lr_lvb_sem);
+                lvb = lustre_msg_buf(req->rq_reqmsg, 1,
+                                     lock->l_resource->lr_lvb_len);
+
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
+                up(&lock->l_resource->lr_lvb_sem);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
@@ -478,7 +487,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_replen = lustre_msg_size(0, NULL);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
-        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+        req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
 
         /* We only send real blocking ASTs after the lock is granted */
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
@@ -579,8 +588,15 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
 
         LASSERT(req->rq_export);
-        lock->l_export = class_export_get(req->rq_export);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (req->rq_export->exp_failed) {
+                LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                GOTO(out, err = -ENOTCONN);
+        }
+        lock->l_export = class_export_get(req->rq_export);
+
         list_add(&lock->l_export_chain,
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
@@ -591,11 +607,12 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                 cookie = req;
         } else {
                 int buffers = 1;
+                down(&lock->l_resource->lr_lvb_sem);
                 if (lock->l_resource->lr_lvb_len) {
                         size[1] = lock->l_resource->lr_lvb_len;
                         buffers = 2;
                 }
-
+                up(&lock->l_resource->lr_lvb_sem);
                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
 
@@ -647,12 +664,22 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
 
-                if (lock->l_resource->lr_lvb_len > 0 && rc == 0) {
-                        void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
-                                                  lock->l_resource->lr_lvb_len);
-                        LASSERT(lvb != NULL);
-                        memcpy(lvb, lock->l_resource->lr_lvb_data,
-                               lock->l_resource->lr_lvb_len);
+                if (rc == 0) {
+                        down(&lock->l_resource->lr_lvb_sem);
+                        size[1] = lock->l_resource->lr_lvb_len;
+                        if (size[1] > 0) {
+                                void *lvb = lustre_msg_buf(req->rq_repmsg,
+                                                           1, size[1]);
+                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
+                                         req, lock);
+
+                                memcpy(lvb, lock->l_resource->lr_lvb_data,
+                                       size[1]);
+                        }
+                        up(&lock->l_resource->lr_lvb_sem);
+                } else {
+                        ldlm_lock_destroy(lock);
+
                 }
 
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
@@ -893,21 +920,16 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                 ptlrpc_error(req);
         }
 
+        l_unlock(&ns->ns_lock);
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
             time_after(jiffies, lock->l_last_used + 10 * HZ)) {
-#ifdef __KERNEL__
-                ldlm_bl_to_thread(ns, NULL, lock);
-                l_unlock(&ns->ns_lock);
-#else
-                l_unlock(&ns->ns_lock);
-                ldlm_handle_bl_callback(ns, NULL, lock);
-#endif
+                if (ldlm_bl_to_thread(ns, NULL, lock))
+                        ldlm_handle_bl_callback(ns, NULL, lock);
+
                 EXIT;
                 return;
         }
-
-        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -944,11 +966,12 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
         list_add_tail(&blwi->blwi_entry, &blp->blp_list);
         wake_up(&blp->blp_waitq);
         spin_unlock(&blp->blp_lock);
+
+        RETURN(0);
 #else
-        LBUG();
+        RETURN(-ENOSYS);
 #endif
 
-        RETURN(0);
 }
 
 static int ldlm_msg_check_version(struct lustre_msg *msg)
@@ -1012,13 +1035,11 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 struct ldlm_request *dlm_req;
 
                 CDEBUG(D_RPCTRACE, "operation %d from nid %s with bad "
-                       "export cookie "LPX64" (ptl req %d/rep %d); this is "
+                       "export cookie "LPX64"; this is "
                        "normal if this node rebooted with a lock held\n",
                        req->rq_reqmsg->opc,
                        ptlrpc_peernid2str(&req->rq_peer, str),
-                       req->rq_reqmsg->handle.cookie,
-                       req->rq_request_portal, req->rq_reply_portal);
-
+                       req->rq_reqmsg->handle.cookie);
                 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                              lustre_swab_ldlm_request);
                 if (dlm_req != NULL)
@@ -1112,14 +1133,10 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
-#ifdef __KERNEL__
-                rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
-                ldlm_callback_reply(req, rc);
-#else
-                rc = 0;
-                ldlm_callback_reply(req, rc);
-                ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
-#endif
+                ldlm_callback_reply(req, 0);
+                if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
+                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
+
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
@@ -1155,9 +1172,11 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
 
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
-                CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
-                       req->rq_reqmsg->opc, req->rq_request_portal,
-                       req->rq_reply_portal);
+                char str[PTL_NALFMT_SIZE];
+                CERROR("operation %d with bad export from NID %s\n",
+                       req->rq_reqmsg->opc,
+                       ptlrpc_peernid2str(&req->rq_peer, str));
+
                 CERROR("--> export cookie: "LPX64"\n",
                        req->rq_reqmsg->handle.cookie);
                 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
@@ -1442,7 +1461,9 @@ static int ldlm_cleanup(int force)
         wake_up(&expired_lock_thread.elt_waitq);
         wait_event(expired_lock_thread.elt_waitq,
                    expired_lock_thread.elt_state == ELT_STOPPED);
-
+#else
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
 #endif
 
         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
@@ -1476,10 +1497,11 @@ void __exit ldlm_exit(void)
 {
         if ( ldlm_refcount )
                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
-        if (kmem_cache_destroy(ldlm_resource_slab) != 0)
-                CERROR("couldn't free ldlm resource slab\n");
-        if (kmem_cache_destroy(ldlm_lock_slab) != 0)
-                CERROR("couldn't free ldlm lock slab\n");
+        LASSERTF(kmem_cache_destroy(ldlm_resource_slab) == 0,
+                 "couldn't free ldlm resource slab\n");
+        LASSERTF(kmem_cache_destroy(ldlm_lock_slab) == 0,
+                 "couldn't free ldlm lock slab\n");
+
 }
 
 /* ldlm_flock.c */
@@ -1557,6 +1579,8 @@ EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);
 
 /* ldlm_lib.c */
+EXPORT_SYMBOL(client_import_add_conn);
+EXPORT_SYMBOL(client_import_del_conn);
 EXPORT_SYMBOL(client_obd_setup);
 EXPORT_SYMBOL(client_obd_cleanup);
 EXPORT_SYMBOL(client_connect_import);
@@ -1564,6 +1588,7 @@ EXPORT_SYMBOL(client_disconnect_export);
 EXPORT_SYMBOL(target_start_recovery_thread);
 EXPORT_SYMBOL(target_stop_recovery_thread);
 EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_cleanup_recovery);
 EXPORT_SYMBOL(target_destroy_export);
 EXPORT_SYMBOL(target_cancel_recovery_timer);
 EXPORT_SYMBOL(target_send_reply);