Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index b76fbcd..d826db1 100644 (file)
@@ -57,6 +57,7 @@ static void waiting_locks_callback(unsigned long unused)
                                                  l_pending_chain);
                 if (l->l_callback_timeout > jiffies)
                         break;
+                CERROR("lock timer expired, lock %p\n", l);
                 LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
                            l->l_export, l->l_export->exp_connection);
                 recovd_conn_fail(l->l_export->exp_connection);
@@ -162,14 +163,26 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         memcpy(&body->lock_desc, desc, sizeof(*desc));
 
         LDLM_DEBUG(lock, "server preparing blocking AST");
-        req->rq_replen = 0; /* no reply needed */
+        req->rq_replen = lustre_msg_size(0, NULL);
 
         ldlm_add_waiting_lock(lock);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        (void)ptl_send_rpc(req);
+        req->rq_level = LUSTRE_CONN_RECOVD;
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR) {
+                ldlm_expired_completion_wait(lock);
+        } else if (rc) {
+                CERROR("client returned %d from blocking AST for lock %p\n",
+                       req->rq_status, lock);
+                LDLM_DEBUG(lock, "client returned error %d from blocking AST",
+                           req->rq_status);
+                ldlm_lock_cancel(lock);
+                /* Server-side AST functions are called from ldlm_reprocess_all,
+                 * which needs to be told to please restart its reprocessing. */
+                rc = -ERESTART;
+        }
 
-        /* not waiting for reply */
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -199,11 +212,22 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         ldlm_lock2desc(lock, &body->lock_desc);
 
         LDLM_DEBUG(lock, "server preparing completion AST");
-        req->rq_replen = 0; /* no reply needed */
-
-        (void)ptl_send_rpc(req);
-
-        /* not waiting for reply */
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        req->rq_level = LUSTRE_CONN_RECOVD;
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR) {
+                ldlm_expired_completion_wait(lock);
+        } else if (rc) {
+                CERROR("client returned %d from completion AST for lock %p\n",
+                       req->rq_status, lock);
+                LDLM_DEBUG(lock, "client returned error %d from completion AST",
+                           req->rq_status);
+                ldlm_lock_cancel(lock);
+                /* Server-side AST functions are called from ldlm_reprocess_all,
+                 * which needs to be told to please restart its reprocessing. */
+                rc = -ERESTART;
+        }
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -265,8 +289,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
-                                ldlm_server_completion_ast,
+        err = ldlm_lock_enqueue(obddev->obd_namespace, lock, cookie, cookielen,
+                                &flags, ldlm_server_completion_ast,
                                 ldlm_server_blocking_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
@@ -384,7 +408,11 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
+                                      struct lustre_handle *handle);
+
+static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
+                                   struct ldlm_namespace *ns)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
@@ -395,11 +423,11 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
 
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("blocking callback on lock "LPX64" - lock disappeared\n",
-                       dlm_req->lock_handle1.addr);
-                RETURN(0);
+                       dlm_req->lock_handle1.cookie);
+                RETURN(-EINVAL);
         }
 
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
@@ -426,7 +454,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
+                                   struct ldlm_namespace *ns)
 {
         struct list_head ast_list = LIST_HEAD_INIT(ast_list);
         struct ldlm_request *dlm_req;
@@ -437,16 +466,16 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
 
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("completion callback on lock "LPX64" - lock "
-                       "disappeared\n", dlm_req->lock_handle1.addr);
-                RETURN(0);
+                       "disappeared\n", dlm_req->lock_handle1.cookie);
+                RETURN(-EINVAL);
         }
 
         LDLM_DEBUG(lock, "client completion callback handler START");
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        l_lock(&ns->ns_lock);
 
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
@@ -461,14 +490,14 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
         if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
                    lock->l_resource->lr_name,
                    sizeof(__u64) * RES_NAME_SIZE) != 0) {
-                ldlm_lock_change_resource(lock,
+                ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
         }
         lock->l_resource->lr_tmp = &ast_list;
         ldlm_grant_lock(lock);
         lock->l_resource->lr_tmp = NULL;
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        l_unlock(&ns->ns_lock);
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
         LDLM_LOCK_PUT(lock);
 
@@ -481,12 +510,13 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
 
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
+        struct ldlm_namespace *ns;
         int rc;
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
         if (rc) {
-                CERROR("lustre_ldlm: Invalid request: %d\n", rc);
+                CERROR("Invalid request: %d\n", rc);
                 RETURN(rc);
         }
 
@@ -501,32 +531,44 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
                 CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
                        dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
-                CERROR("--> ignoring this error as a temporary workaround!  "
-                       "beware!\n");
-                //RETURN(-ENOTCONN);
+                RETURN(-ENOTCONN);
         }
 
+        LASSERT(req->rq_export != NULL);
+        LASSERT(req->rq_export->exp_obd != NULL);
+        ns = req->rq_export->exp_obd->obd_namespace;
+        LASSERT(ns != NULL);
+
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-                rc = ldlm_handle_bl_callback(req);
-                RETURN(rc);
+                rc = ldlm_handle_bl_callback(req, ns);
+                break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-                rc = ldlm_handle_cp_callback(req);
-                RETURN(rc);
-
+                rc = ldlm_handle_cp_callback(req, ns);
+                break;
         default:
                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
                 RETURN(-EINVAL);
         }
 
+        req->rq_status = rc;
+        if (rc) {
+                ptlrpc_error(req->rq_svc, req);
+        } else {
+                rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                     &req->rq_repmsg);
+                if (rc)
+                        RETURN(rc);
+                ptlrpc_reply(req->rq_svc, req);
+        }
+
         RETURN(0);
 }
 
-
 static int ldlm_cancel_handler(struct ptlrpc_request *req)
 {
         int rc;
@@ -539,11 +581,14 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         }
 
         if (req->rq_export == NULL) {
+                struct ldlm_request *dlm_req;
                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
                        req->rq_reqmsg->opc, req->rq_request_portal,
                        req->rq_reply_portal);
                 CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
+                dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+                ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
                 CERROR("--> ignoring this error as a temporary workaround!  "
                        "beware!\n");
                 //RETURN(-ENOTCONN);
@@ -568,7 +613,6 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-
 static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                           void *karg, void *uarg)
 {
@@ -579,7 +623,7 @@ static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 
         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
-                CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
+                CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
                 RETURN(-EINVAL);
         }
@@ -619,11 +663,9 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (ldlm_already_setup)
                 RETURN(-EALREADY);
 
-        MOD_INC_USE_COUNT;
-
         rc = ldlm_proc_setup(obddev);
         if (rc != 0)
-                GOTO(out_dec, rc);
+                RETURN(rc);
 
         ldlm->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
@@ -689,8 +731,6 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
  out_proc:
         ldlm_proc_cleanup(obddev);
 
- out_dec:
-        MOD_DEC_USE_COUNT;
         return rc;
 }
 
@@ -711,7 +751,6 @@ static int ldlm_cleanup(struct obd_device *obddev)
         ldlm_proc_cleanup(obddev);
 
         ldlm_already_setup = 0;
-        MOD_DEC_USE_COUNT;
         RETURN(0);
 }
 
@@ -723,6 +762,7 @@ static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
 }
 
 struct obd_ops ldlm_obd_ops = {
+        o_owner:       THIS_MODULE,
         o_iocontrol:   ldlm_iocontrol,
         o_setup:       ldlm_setup,
         o_cleanup:     ldlm_cleanup,
@@ -798,7 +838,9 @@ EXPORT_SYMBOL(ldlm_namespace_dump);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
 EXPORT_SYMBOL(ldlm_replay_locks);
 EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_reprocess_all_ns);
 EXPORT_SYMBOL(ldlm_namespace_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach_res);
 EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);