Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index c28bbe2..5765d8c 100644 (file)
@@ -51,7 +51,7 @@ static int ldlm_refcount = 0;
 
 /* LDLM state */
 
-static struct ldlm_state *ldlm ;
+static struct ldlm_state *ldlm_state;
 
 inline unsigned long round_timeout(unsigned long timeout)
 {
@@ -498,9 +498,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "completion");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
         } else if (rc) {
                 LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
-                           "completion AST\n", rc, req->rq_status);
+                           "completion AST", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -541,10 +544,12 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "glimpse");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
         } else if (rc) {
                 LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
-                           "completion AST\n", rc, req->rq_status);
-                ldlm_lock_cancel(lock);
+                           "glimpse AST", rc, req->rq_status);
         } else {
                 rc = res->lr_namespace->ns_lvbo->lvbo_update(res,
                                                              req->rq_repmsg, 0);
@@ -561,9 +566,9 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        int rc, size[2] = {sizeof(*dlm_rep)};
+        int rc = 0, size[2] = {sizeof(*dlm_rep)};
         __u32 flags;
-        ldlm_error_t err;
+        ldlm_error_t err = ELDLM_OK;
         struct ldlm_lock *lock = NULL;
         void *cookie = NULL;
         ENTRY;
@@ -574,7 +579,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR ("Can't unpack dlm_req\n");
-                RETURN (-EFAULT);
+                GOTO(out, rc = -EFAULT);
         }
 
         flags = dlm_req->lock_flags;
@@ -587,7 +592,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                 blocking_callback, completion_callback,
                                 glimpse_callback, NULL, 0);
         if (!lock)
-                GOTO(out, err = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         do_gettimeofday(&lock->l_enqueued_time);
         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
@@ -614,12 +619,15 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
                 rc = lustre_pack_reply(req, buffers, size, NULL);
                 if (rc)
-                        RETURN(rc);
+                        GOTO(out, rc);
         }
 
         if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                        sizeof(ldlm_policy_data_t));
+        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+                memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
+                       sizeof(lock->l_req_extent));
 
         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
         if (err)
@@ -643,26 +651,34 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         EXIT;
  out:
-        if (lock != NULL && lock->l_resource->lr_lvb_len > 0) {
-                void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
-                                           lock->l_resource->lr_lvb_len);
-                memcpy(lvb, lock->l_resource->lr_lvb_data,
-                       lock->l_resource->lr_lvb_len);
-        }
         req->rq_status = err;
+        if (req->rq_reply_state == NULL) {
+                err = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc == 0)
+                        rc = err;
+        }
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
-                           "(err=%d)", err);
+                           "(err=%d, rc=%d)", err, rc);
+
+                if (lock->l_resource->lr_lvb_len > 0) {
+                        void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+                                                  lock->l_resource->lr_lvb_len);
+                        memcpy(lvb, lock->l_resource->lr_lvb_data,
+                               lock->l_resource->lr_lvb_len);
+                }
+
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
         }
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
+                          lock, rc);
 
-        return 0;
+        return rc;
 }
 
 int ldlm_handle_convert(struct ptlrpc_request *req)
@@ -754,7 +770,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                                 (res, NULL, 0);
                                 //(res, req->rq_reqmsg, 1);
                 }
-                        
+
                 ldlm_lock_cancel(lock);
                 if (ldlm_del_waiting_lock(lock))
                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
@@ -902,17 +918,19 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 {
         req->rq_status = rc;
-        rc = lustre_pack_reply(req, 0, NULL, NULL);
-        if (rc)
-                return rc;
+        if (req->rq_reply_state == NULL) {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        return rc;
+        }
         return ptlrpc_reply(req);
 }
 
 #ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                      struct ldlm_lock *lock)
 {
-        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         struct ldlm_bl_work_item *blwi;
         ENTRY;
 
@@ -921,7 +939,8 @@ static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
                 RETURN(-ENOMEM);
 
         blwi->blwi_ns = ns;
-        blwi->blwi_ld = *ld;
+        if (ld != NULL)
+                blwi->blwi_ld = *ld;
         blwi->blwi_lock = lock;
 
         spin_lock(&blp->blp_lock);
@@ -969,58 +988,48 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
+        LASSERT(req->rq_export != NULL);
+        LASSERT(req->rq_export->exp_obd != NULL);
+
+        switch(req->rq_reqmsg->opc) {
+        case LDLM_BL_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
+                break;
+        case LDLM_CP_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == LDLM_GL_CALLBACK) {
+                break;
+        case LDLM_GL_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
+                break;
+        case OBD_LOG_CANCEL:
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else {
-                ldlm_callback_reply(req, -EPROTO);
-                RETURN(0);
-        }
-
-        LASSERT(req->rq_export != NULL);
-        LASSERT(req->rq_export->exp_obd != NULL);
-
-        /* FIXME - how to send reply */
-        if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
-                int rc = llog_origin_handle_cancel(req);
+                rc = llog_origin_handle_cancel(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
-                int rc = llog_origin_handle_create(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_CREATE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_create(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
-                int rc = llog_origin_handle_next_block(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_next_block(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
-                int rc = llog_origin_handle_read_header(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_READ_HEADER:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_read_header(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
-                int rc = llog_origin_handle_close(req);
+        case LLOG_ORIGIN_HANDLE_CLOSE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_close(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
+        default:
+                CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
+                ldlm_callback_reply(req, -EPROTO);
+                RETURN(0);
         }
 
         ns = req->rq_export->exp_obd->obd_namespace;
@@ -1053,14 +1062,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * cancelling right now, because it's unused, or have an intent result
          * in the reply, so we might have to push the responsibility for sending
          * the reply down into the AST handlers, alas. */
-        if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK)
-                ldlm_callback_reply(req, 0);
 
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
 #ifdef __KERNEL__
-                rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock);
+                rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
                 ldlm_callback_reply(req, rc);
 #else
                 rc = 0;
@@ -1070,6 +1077,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
+                ldlm_callback_reply(req, 0);
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
         case LDLM_GL_CALLBACK:
@@ -1235,11 +1243,11 @@ static int ldlm_setup(void)
 #endif
         ENTRY;
 
-        if (ldlm != NULL)
+        if (ldlm_state != NULL)
                 RETURN(-EALREADY);
 
-        OBD_ALLOC(ldlm, sizeof(*ldlm));
-        if (ldlm == NULL)
+        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
+        if (ldlm_state == NULL)
                 RETURN(-ENOMEM);
 
 #ifdef __KERNEL__
@@ -1248,25 +1256,25 @@ static int ldlm_setup(void)
                 GOTO(out_free, rc);
 #endif
 
-        ldlm->ldlm_cb_service =
+        ldlm_state->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                                 ldlm_callback_handler, "ldlm_cbd",
                                 ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cb_service) {
+        if (!ldlm_state->ldlm_cb_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
 
-        ldlm->ldlm_cancel_service =
-                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, 
+        ldlm_state->ldlm_cancel_service =
+                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_CANCEL_REQUEST_PORTAL,
                                 LDLM_CANCEL_REPLY_PORTAL,
                                 ldlm_cancel_handler, "ldlm_canceld",
                                 ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cancel_service) {
+        if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
@@ -1274,7 +1282,7 @@ static int ldlm_setup(void)
         OBD_ALLOC(blp, sizeof(*blp));
         if (blp == NULL)
                 GOTO(out_proc, rc = -ENOMEM);
-        ldlm->ldlm_bl_pool = blp;
+        ldlm_state->ldlm_bl_pool = blp;
 
         atomic_set(&blp->blp_num_threads, 0);
         init_waitqueue_head(&blp->blp_waitq);
@@ -1298,14 +1306,14 @@ static int ldlm_setup(void)
                 wait_for_completion(&blp->blp_comp);
         }
 
-        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service,
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
                                     LDLM_NUM_THREADS, "ldlm_cn");
         if (rc) {
                 LBUG();
                 GOTO(out_thread, rc);
         }
 
-        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service,
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
                                     LDLM_NUM_THREADS, "ldlm_cb");
         if (rc) {
                 LBUG();
@@ -1337,8 +1345,8 @@ static int ldlm_setup(void)
 
 #ifdef __KERNEL__
  out_thread:
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
 #endif
 
  out_proc:
@@ -1346,15 +1354,15 @@ static int ldlm_setup(void)
         ldlm_proc_cleanup();
  out_free:
 #endif
-        OBD_FREE(ldlm, sizeof(*ldlm));
-        ldlm = NULL;
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
         return rc;
 }
 
 static int ldlm_cleanup(int force)
 {
 #ifdef __KERNEL__
-        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
 #endif
         ENTRY;
 
@@ -1379,10 +1387,10 @@ static int ldlm_cleanup(int force)
         }
         OBD_FREE(blp, sizeof(*blp));
 
-        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
-        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
         ldlm_proc_cleanup();
 
         expired_lock_thread.elt_state = ELT_TERMINATE;
@@ -1392,8 +1400,8 @@ static int ldlm_cleanup(int force)
 
 #endif
 
-        OBD_FREE(ldlm, sizeof(*ldlm));
-        ldlm = NULL;
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
 
         RETURN(0);
 }
@@ -1494,6 +1502,8 @@ EXPORT_SYMBOL(ldlm_regression_stop);
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_namespace_dump);
+EXPORT_SYMBOL(ldlm_dump_all_namespaces);
 EXPORT_SYMBOL(ldlm_resource_get);
 EXPORT_SYMBOL(ldlm_resource_putref);