Whamcloud - gitweb
b=1021,2720
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 2d7946b..a9020d3 100644 (file)
@@ -388,7 +388,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ETIMEDOUT || rc == -EINTR) {
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 LASSERT(lock->l_export);
                 if (lock->l_export->exp_libclient) {
                         CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid "
@@ -445,13 +445,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct ptlrpc_request *req;
         struct timeval granted_time;
         long total_enqueue_wait;
-        int rc = 0, size = sizeof(*body);
+        int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
         ENTRY;
 
-        if (lock == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
+        LASSERT(lock != NULL);
 
         do_gettimeofday(&granted_time);
         total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
@@ -459,9 +456,14 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
+        if (lock->l_resource->lr_lvb_len) {
+                buffers = 2;
+                size[1] = lock->l_resource->lr_lvb_len;
+        }
+
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LDLM_CP_CALLBACK, 1, &size, NULL);
-        if (!req)
+                              LDLM_CP_CALLBACK, buffers, size, NULL);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
@@ -470,6 +472,13 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         body->lock_flags = flags;
         ldlm_lock2desc(lock, &body->lock_desc);
 
+        if (buffers == 2) {
+                void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
+                                           lock->l_resource->lr_lvb_len);
+                memcpy(lvb, lock->l_resource->lr_lvb_data,
+                       lock->l_resource->lr_lvb_len);
+        }
+
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                    total_enqueue_wait);
         req->rq_replen = lustre_msg_size(0, NULL);
@@ -486,7 +495,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ETIMEDOUT || rc == -EINTR) {
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "completion");
         } else if (rc) {
@@ -502,14 +511,57 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         RETURN(rc);
 }
 
+int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        LASSERT(lock != NULL);
+
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
+                              LDLM_GL_CALLBACK, 1, &size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        ldlm_lock2desc(lock, &body->lock_desc);
+
+        size = lock->l_resource->lr_lvb_len;
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        req->rq_send_state = LUSTRE_IMP_FULL;
+        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+                ldlm_del_waiting_lock(lock);
+                ldlm_failed_ast(lock, rc, "glimpse");
+        } else if (rc) {
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "completion AST\n", rc, req->rq_status);
+                ldlm_lock_cancel(lock);
+        } else {
+                rc = res->lr_namespace->ns_lvbo->lvbo_update(res,
+                                                             req->rq_repmsg, 0);
+        }
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
 int ldlm_handle_enqueue(struct ptlrpc_request *req,
                         ldlm_completion_callback completion_callback,
-                        ldlm_blocking_callback blocking_callback)
+                        ldlm_blocking_callback blocking_callback,
+                        ldlm_glimpse_callback glimpse_callback)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        int rc, size = sizeof(*dlm_rep), cookielen = 0;
+        int rc, size[2] = {sizeof(*dlm_rep)};
         __u32 flags;
         ldlm_error_t err;
         struct ldlm_lock *lock = NULL;
@@ -526,31 +578,14 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         }
 
         flags = dlm_req->lock_flags;
-        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
-            (flags & LDLM_FL_HAS_INTENT)) {
-                /* In this case, the reply buffer is allocated deep in
-                 * local_lock_enqueue by the policy function. */
-                cookie = req;
-                cookielen = sizeof(*req);
-        } else {
-                rc = lustre_pack_reply(req, 1, &size, NULL);
-                if (rc) {
-                        CERROR("out of memory\n");
-                        RETURN(-ENOMEM);
-                }
-                if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
-                        cookie = &dlm_req->lock_desc.l_policy_data;
-                        cookielen = sizeof(ldlm_policy_data_t);
-                }
-        }
 
         /* The lock's callback data might be set in the policy function */
-        lock = ldlm_lock_create(obddev->obd_namespace,
-                                &dlm_req->lock_handle2,
+        lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
                                 dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
                                 dlm_req->lock_desc.l_req_mode,
-                                blocking_callback, completion_callback, NULL);
+                                blocking_callback, completion_callback,
+                                glimpse_callback, NULL, 0);
         if (!lock)
                 GOTO(out, err = -ENOMEM);
 
@@ -566,24 +601,35 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
-                                &flags);
+        if (flags & LDLM_FL_HAS_INTENT) {
+                /* In this case, the reply buffer is allocated deep in
+                 * local_lock_enqueue by the policy function. */
+                cookie = req;
+        } else {
+                int buffers = 1;
+                if (lock->l_resource->lr_lvb_len) {
+                        size[1] = lock->l_resource->lr_lvb_len;
+                        buffers = 2;
+                }
+
+                rc = lustre_pack_reply(req, buffers, size, NULL);
+                if (rc)
+                        RETURN(rc);
+        }
+
+        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
+                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
+                       sizeof(ldlm_policy_data_t));
+
+        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
         if (err)
                 GOTO(out, err);
 
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
         dlm_rep->lock_flags = flags;
 
+        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
-        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
-                memcpy(&dlm_rep->lock_policy_data, &lock->l_policy_data,
-                       cookielen);
-        }
-        if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
-                memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
-                       sizeof(dlm_rep->lock_resource_name));
-                dlm_rep->lock_mode = lock->l_req_mode;
-        }
 
         /* We never send a blocking AST until the lock is granted, but
          * we can tell it right now */
@@ -597,6 +643,12 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         EXIT;
  out:
+        if (lock->l_resource->lr_lvb_len > 0) {
+                void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+                                           lock->l_resource->lr_lvb_len);
+                memcpy(lvb, lock->l_resource->lr_lvb_data,
+                       lock->l_resource->lr_lvb_len);
+        }
         req->rq_status = err;
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
@@ -662,11 +714,12 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
+        struct ldlm_resource *res;
         char str[PTL_NALFMT_SIZE];
         int rc;
         ENTRY;
 
-        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR("bad request buffer for cancel\n");
@@ -694,10 +747,18 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 req->rq_status = ESTALE;
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
+                res = lock->l_resource;
+                if (res && res->lr_namespace->ns_lvbo &&
+                    res->lr_namespace->ns_lvbo->lvbo_update) {
+                        (void)res->lr_namespace->ns_lvbo->lvbo_update
+                                (res, NULL, 0);
+                                //(res, req->rq_reqmsg, 1);
+                }
+                        
                 ldlm_lock_cancel(lock);
                 if (ldlm_del_waiting_lock(lock))
                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
-                req->rq_status = 0;
+                req->rq_status = rc;
         }
 
         if (ptlrpc_reply(req) != 0)
@@ -781,6 +842,18 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
         }
 
+        if (lock->l_lvb_len) {
+                void *lvb;
+                lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
+                                         lock->l_lvb_swabber);
+                if (lvb == NULL) {
+                        LDLM_ERROR(lock, "completion AST did not contain "
+                                   "expected LVB!");
+                } else {
+                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
+                }
+        }
+
         lock->l_resource->lr_tmp = &ast_list;
         ldlm_grant_lock(lock, req, sizeof(*req), 1);
         lock->l_resource->lr_tmp = NULL;
@@ -795,6 +868,37 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         EXIT;
 }
 
+static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
+                                    struct ldlm_namespace *ns,
+                                    struct ldlm_request *dlm_req,
+                                    struct ldlm_lock *lock)
+{
+        ENTRY;
+
+        l_lock(&ns->ns_lock);
+        LDLM_DEBUG(lock, "client glimpse AST callback handler");
+
+        if (lock->l_glimpse_ast != NULL) {
+                l_unlock(&ns->ns_lock);
+                l_check_no_ns_lock(ns);
+                lock->l_glimpse_ast(lock, req);
+                l_lock(&ns->ns_lock);
+        }
+
+        if (lock->l_granted_mode == LCK_PW &&
+            !lock->l_readers && !lock->l_writers &&
+            time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+                l_unlock(&ns->ns_lock);
+                ldlm_handle_bl_callback(ns, NULL, lock);
+                EXIT;
+                return;
+        }
+
+        l_unlock(&ns->ns_lock);
+        LDLM_LOCK_PUT(lock);
+        EXIT;
+}
+
 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 {
         req->rq_status = rc;
@@ -869,6 +973,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
         } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+        } else if (req->rq_reqmsg->opc == LDLM_GL_CALLBACK) {
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
         } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
         } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
@@ -947,7 +1053,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * cancelling right now, because it's unused, or have an intent result
          * in the reply, so we might have to push the responsibility for sending
          * the reply down into the AST handlers, alas. */
-        if (req->rq_reqmsg->opc != LDLM_BL_CALLBACK)
+        if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK)
                 ldlm_callback_reply(req, 0);
 
         switch (req->rq_reqmsg->opc) {
@@ -966,6 +1072,10 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 CDEBUG(D_INODE, "completion ast\n");
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
+        case LDLM_GL_CALLBACK:
+                CDEBUG(D_INODE, "glimpse ast\n");
+                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
+                break;
         default:
                 LBUG();                         /* checked above */
         }
@@ -1322,14 +1432,18 @@ void __exit ldlm_exit(void)
 /* ldlm_flock.c */
 EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
+/* ldlm_extent.c */
+EXPORT_SYMBOL(ldlm_extent_shift_kms);
+
 /* ldlm_lock.c */
+EXPORT_SYMBOL(ldlm_get_processing_policy);
 EXPORT_SYMBOL(ldlm_lock2desc);
 EXPORT_SYMBOL(ldlm_register_intent);
-EXPORT_SYMBOL(ldlm_unregister_intent);
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_lock2handle);
 EXPORT_SYMBOL(__ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock_get);
 EXPORT_SYMBOL(ldlm_lock_put);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_cancel);
@@ -1343,6 +1457,7 @@ EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_lock_dump_handle);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match);
 
 /* ldlm_request.c */
 EXPORT_SYMBOL(ldlm_completion_ast);
@@ -1360,6 +1475,7 @@ EXPORT_SYMBOL(ldlm_change_cbdata);
 /* ldlm_lockd.c */
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
+EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_convert);
@@ -1378,6 +1494,8 @@ EXPORT_SYMBOL(ldlm_regression_stop);
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_resource_get);
+EXPORT_SYMBOL(ldlm_resource_putref);
 
 /* l_lock.c */
 EXPORT_SYMBOL(l_lock);