Whamcloud - gitweb
- move the peter branch changes to the head
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 36eb21b..bc659f9 100644 (file)
@@ -21,7 +21,40 @@ extern kmem_cache_t *ldlm_lock_slab;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-static int ldlm_handle_enqueue(struct ptlrpc_request *req)
+int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+{
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        struct ptlrpc_client *cl;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        if (lock == NULL) {
+                LBUG();
+                RETURN(-EINVAL);
+        }
+
+        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
+        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+                              &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        body->lock_flags = flags;
+
+        LDLM_DEBUG(lock, "server preparing completion AST");
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        rc = ptlrpc_queue_wait(req);
+        rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
+        RETURN(rc);
+}
+
+int ldlm_handle_enqueue(struct ptlrpc_request *req)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
@@ -68,7 +101,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req)
 
         flags = dlm_req->lock_flags;
         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
-                                ldlm_server_ast, ldlm_server_ast);
+                                ldlm_server_completion_ast, ldlm_server_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -104,7 +137,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req)
         return 0;
 }
 
-static int ldlm_handle_convert(struct ptlrpc_request *req)
+int ldlm_handle_convert(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_reply *dlm_rep;
@@ -143,7 +176,7 @@ static int ldlm_handle_convert(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_cancel(struct ptlrpc_request *req)
+int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
@@ -159,6 +192,8 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
+                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
+                                  (void *)(unsigned long) dlm_req->lock_handle1.addr);
                 req->rq_status = ESTALE;
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
@@ -173,9 +208,7 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
                 ldlm_reprocess_all(lock->l_resource);
                 LDLM_DEBUG(lock, "server-side cancel handler END");
                 LDLM_LOCK_PUT(lock);
-        } else
-                LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
-                                  lock);
+        } 
 
         RETURN(0);
 }
@@ -242,29 +275,27 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
                 }
                 LDLM_LOCK_PUT(lock);
         } else {
-                struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+                struct list_head ast_list = LIST_HEAD_INIT(ast_list);
 
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
 
                 /* If we receive the completion AST before the actual enqueue
                  * returned, then we might need to switch resources. */
+                ldlm_resource_unlink_lock(lock);
                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
                            lock->l_resource->lr_name,
                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
                         LDLM_DEBUG(lock, "completion AST, new resource");
                 }
-                lock->l_resource->lr_tmp = &rpc_list;
-                ldlm_resource_unlink_lock(lock);
+                lock->l_resource->lr_tmp = &ast_list;
                 ldlm_grant_lock(lock);
-                /*  FIXME: we want any completion function, not just wake_up */
-                wake_up(&lock->l_waitq);
                 lock->l_resource->lr_tmp = NULL;
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_LOCK_PUT(lock);
 
-                ldlm_run_ast_work(&rpc_list);
+                ldlm_run_ast_work(&ast_list);
         }
 
         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
@@ -272,7 +303,8 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle(struct ptlrpc_request *req)
+
+static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         int rc;
         ENTRY;
@@ -288,31 +320,7 @@ static int ldlm_handle(struct ptlrpc_request *req)
                        req->rq_reqmsg->type);
                 GOTO(out, rc = -EINVAL);
         }
-
-        if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
-                CERROR("No export handle for enqueue request.\n");
-                GOTO(out, rc = -ENOTCONN);
-        }
-
         switch (req->rq_reqmsg->opc) {
-        case LDLM_ENQUEUE:
-                CDEBUG(D_INODE, "enqueue\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
-                rc = ldlm_handle_enqueue(req);
-                break;
-
-        case LDLM_CONVERT:
-                CDEBUG(D_INODE, "convert\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
-                rc = ldlm_handle_convert(req);
-                break;
-
-        case LDLM_CANCEL:
-                CDEBUG(D_INODE, "cancel\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
-                rc = ldlm_handle_cancel(req);
-                break;
-
         case LDLM_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
@@ -331,6 +339,7 @@ out:
         return 0;
 }
 
+
 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
                           void *karg, void *uarg)
 {
@@ -385,29 +394,16 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         MOD_INC_USE_COUNT;
         ldlm->ldlm_service =
                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
-                                LDLM_REPLY_PORTAL, "self", ldlm_handle);
+                                LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
         if (!ldlm->ldlm_service) {
                 LBUG();
                 GOTO(out_dec, rc = -ENOMEM);
         }
 
-        if (mds_reint_p == NULL)
-                mds_reint_p = inter_module_get_request("mds_reint", "mds");
-        if (IS_ERR(mds_reint_p)) {
-                CERROR("MDSINTENT locks require the MDS module.\n");
-                GOTO(out_dec, rc = -EINVAL);
-        }
-        if (mds_getattr_name_p == NULL)
-                mds_getattr_name_p = inter_module_get_request
-                        ("mds_getattr_name", "mds");
-        if (IS_ERR(mds_getattr_name_p)) {
-                CERROR("MDSINTENT locks require the MDS module.\n");
-                GOTO(out_dec, rc = -EINVAL);
-        }
-
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
-                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
-                                         "lustre_dlm");
+                char name[32];
+                sprintf(name, "lustre_dlm_%2d", i); 
+                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
                 if (rc) {
                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                         LBUG();
@@ -421,10 +417,6 @@ out_thread:
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
 out_dec:
-        if (mds_reint_p != NULL)
-                inter_module_put("mds_reint");
-        if (mds_getattr_name_p != NULL)
-                inter_module_put("mds_getattr_name");
         MOD_DEC_USE_COUNT;
         return rc;
 }
@@ -437,11 +429,6 @@ static int ldlm_cleanup(struct obd_device *obddev)
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
 
-        if (mds_reint_p != NULL)
-                inter_module_put("mds_reint");
-        if (mds_getattr_name_p != NULL)
-                inter_module_put("mds_getattr_name");
-
         MOD_DEC_USE_COUNT;
         RETURN(0);
 }
@@ -487,16 +474,24 @@ static void __exit ldlm_exit(void)
                 CERROR("couldn't free ldlm lock slab\n");
 }
 
+EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_register_intent);
+EXPORT_SYMBOL(ldlm_unregister_intent); 
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_addref);
 EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_lock_change_resource);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_cancel);
 EXPORT_SYMBOL(ldlm_match_or_enqueue);
+EXPORT_SYMBOL(ldlm_it2str);
 EXPORT_SYMBOL(ldlm_test);
 EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_namespace_new);