Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 4c21bc6..7b6f50d 100644 (file)
@@ -44,19 +44,22 @@ int ldlm_expired_completion_wait(void *data)
 {
         struct lock_wait_data *lwd = data;
         struct ldlm_lock *lock = lwd->lwd_lock;
-        struct obd_device *obd = class_conn2obd(lock->l_connh);
+        struct obd_import *imp;
+        struct obd_device *obd;
 
-        if (obd == NULL) {
-                LDLM_ERROR(lock, "lock timed out; mot entering recovery in "
+        if (lock->l_conn_export == NULL) {
+                LDLM_ERROR(lock, "lock timed out; not entering recovery in "
                            "server code, just going back to sleep");
-        } else {
-                struct obd_import *imp = obd->u.cli.cl_import;
-                ptlrpc_fail_import(imp, lwd->lwd_generation);
-                LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s",
-                           imp->imp_target_uuid.uuid,
-                           imp->imp_connection->c_remote_uuid.uuid);
+                RETURN(0);
         }
-        
+
+        obd = lock->l_conn_export->exp_obd;
+        imp = obd->u.cli.cl_import;
+        ptlrpc_fail_import(imp, lwd->lwd_generation);
+        LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s",
+                   imp->imp_target_uuid.uuid,
+                   imp->imp_connection->c_remote_uuid.uuid);
+
         RETURN(0);
 }
 
@@ -69,17 +72,6 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct obd_import *imp = NULL;
         int rc = 0;
         struct l_wait_info lwi;
-
-        obd = class_conn2obd(lock->l_connh);
-
-        /* if this is a local lock, then there is no import */
-        if (obd != NULL)
-                imp = obd->u.cli.cl_import;
-
-        lwd.lwd_lock = lock;
-
-        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
-                               interrupted_completion_wait, &lwd);
         ENTRY;
 
         if (flags == LDLM_FL_WAIT_NOREPROC)
@@ -99,7 +91,18 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         ldlm_lock_dump(D_OTHER, lock, 0);
         ldlm_reprocess_all(lock->l_resource);
 
- noreproc:
+noreproc:
+
+        obd = class_exp2obd(lock->l_conn_export);
+
+        /* if this is a local lock, then there is no import */
+        if (obd != NULL)
+                imp = obd->u.cli.cl_import;
+
+        lwd.lwd_lock = lock;
+
+        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
+                               interrupted_completion_wait, &lwd);
         if (imp != NULL) {
                 spin_lock_irqsave(&imp->imp_lock, irqflags);
                 lwd.lwd_generation = imp->imp_generation;
@@ -161,8 +164,8 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
-        if (type == LDLM_EXTENT)
-                memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
+        if (type != LDLM_PLAIN)
+                memcpy(cookie, &lock->l_policy_data, cookielen);
         if ((*flags) & LDLM_FL_LOCK_CHANGED)
                 memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id));
 
@@ -180,7 +183,20 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         return err;
 }
 
-int ldlm_cli_enqueue(struct lustre_handle *connh,
+static void failed_lock_cleanup(struct ldlm_namespace *ns,
+                                struct ldlm_lock *lock,
+                                struct lustre_handle *lockh, int mode)
+{
+        /* Set a flag to prevent us from sending a CANCEL (bug 407) */
+        l_lock(&ns->ns_lock);
+        lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+        LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
+        l_unlock(&ns->ns_lock);
+
+        ldlm_lock_decref_and_cancel(lockh, mode);
+}
+
+int ldlm_cli_enqueue(struct obd_export *exp,
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -201,9 +217,9 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         ENTRY;
 
         is_replay = *flags & LDLM_FL_REPLAY;
-        LASSERT(connh != NULL || !is_replay);
+        LASSERT(exp != NULL || !is_replay);
 
-        if (connh == NULL) {
+        if (exp == NULL) {
                 rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
                                             type, cookie, cookielen, mode,
                                             flags, completion, blocking, data,
@@ -216,23 +232,22 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         if (is_replay) {
                 lock = ldlm_handle2lock(lockh);
                 LDLM_DEBUG(lock, "client-side enqueue START");
-                LASSERT(connh == lock->l_connh);
+                LASSERT(exp == lock->l_conn_export);
         } else {
                 lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
                                         mode, blocking, completion, data);
                 if (lock == NULL)
                         GOTO(out_nolock, rc = -ENOMEM);
-                LDLM_DEBUG(lock, "client-side enqueue START");
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, mode);
                 ldlm_lock2handle(lock, lockh);
-                if (type == LDLM_EXTENT)
-                        memcpy(&lock->l_extent, cookie,
-                               sizeof(body->lock_desc.l_extent));
+                if (type != LDLM_PLAIN)
+                        memcpy(&lock->l_policy_data, cookie, cookielen);
+                LDLM_DEBUG(lock, "client-side enqueue START");
         }
 
         if (req == NULL) {
-                req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
                                       &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
@@ -255,7 +270,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 size = sizeof(*reply);
                 req->rq_replen = lustre_msg_size(1, &size);
         }
-        lock->l_connh = connh;
+        lock->l_conn_export = exp;
         lock->l_export = NULL;
         lock->l_blocking_ast = blocking;
 
@@ -266,48 +281,45 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 LASSERT(!is_replay);
                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
-                /* Set a flag to prevent us from sending a CANCEL (bug 407) */
-                l_lock(&ns->ns_lock);
-                lock->l_flags |= LDLM_FL_LOCAL_ONLY;
-                LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
-                l_unlock(&ns->ns_lock);
-
-                ldlm_lock_decref_and_cancel(lockh, mode);
-
+                failed_lock_cleanup(ns, lock, lockh, mode);
                 if (rc == ELDLM_LOCK_ABORTED) {
-                        /* caller expects reply buffer 0 to have been swabbed */
-                        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                        /* Before we return, swab the reply */
+                        reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                                    lustre_swab_ldlm_reply);
                         if (reply == NULL) {
-                                CERROR ("Can't unpack ldlm_reply\n");
-                                GOTO (out_req, rc = -EPROTO);
+                                CERROR("Can't unpack ldlm_reply\n");
+                                GOTO(out_req, rc = -EPROTO);
                         }
                 }
                 GOTO(out_req, rc);
         }
 
-        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+        reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                    lustre_swab_ldlm_reply);
         if (reply == NULL) {
-                CERROR ("Can't unpack ldlm_reply\n");
-                GOTO (out_req, rc = -EPROTO);
+                CERROR("Can't unpack ldlm_reply\n");
+                GOTO(out_req, rc = -EPROTO);
         }
 
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
         *flags = reply->lock_flags;
 
-        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: %d\n", lock,
-               reply->lock_handle.cookie, *flags);
+        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
+               lock, reply->lock_handle.cookie, *flags);
         if (type == LDLM_EXTENT) {
                 CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
                        "extent "LPU64" -> "LPU64"\n",
-                       body->lock_desc.l_extent.start,
-                       body->lock_desc.l_extent.end,
-                       reply->lock_extent.start, reply->lock_extent.end);
+                       body->lock_desc.l_policy_data.l_extent.start,
+                       body->lock_desc.l_policy_data.l_extent.end,
+                       reply->lock_policy_data.l_extent.start,
+                       reply->lock_policy_data.l_extent.end);
 
-                cookie = &reply->lock_extent; /* FIXME bug 267 */
-                cookielen = sizeof(reply->lock_extent);
+                cookie = &reply->lock_policy_data; /* FIXME bug 267 */
+                cookielen = sizeof(struct ldlm_extent);
+        } else if (type == LDLM_FLOCK) {
+                cookie = &reply->lock_policy_data;
+                cookielen = sizeof(struct ldlm_flock);
         }
 
         /* If enqueue returned a blocked lock but the completion handler has
@@ -338,11 +350,22 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
                 }
         }
+        if ((*flags) & LDLM_FL_AST_SENT) {
+                l_lock(&ns->ns_lock);
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                l_unlock(&ns->ns_lock);
+                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
+        }
 
         if (!is_replay) {
                 rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
-                if (lock->l_completion_ast)
-                        lock->l_completion_ast(lock, *flags, NULL);
+                if (lock->l_completion_ast != NULL) {
+                        int err = lock->l_completion_ast(lock, *flags, NULL);
+                        if (err)
+                                failed_lock_cleanup(ns, lock, lockh, mode);
+                        if (!rc)
+                                rc = err;
+                }
         }
 
         LDLM_DEBUG(lock, "client-side enqueue END");
@@ -362,7 +385,7 @@ int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
         struct ldlm_res_id junk;
         int flags = LDLM_FL_REPLAY;
         ldlm_lock2handle(lock, &lockh);
-        return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, junk,
+        return ldlm_cli_enqueue(lock->l_conn_export, NULL, NULL, NULL, junk,
                                 lock->l_resource->lr_type, NULL, 0, -1, &flags,
                                 NULL, NULL, NULL, &lockh);
 }
@@ -390,7 +413,6 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 {
         struct ldlm_request *body;
-        struct lustre_handle *connh;
         struct ldlm_reply *reply;
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
@@ -404,15 +426,14 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
                 RETURN(-EINVAL);
         }
         *flags = 0;
-        connh = lock->l_connh;
 
-        if (!connh)
+        if (lock->l_conn_export == NULL)
                 RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export), 
+                              LDLM_CONVERT, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -464,7 +485,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         if (lock == NULL)
                 RETURN(0);
 
-        if (lock->l_connh) {
+        if (lock->l_conn_export) {
                 int local_only;
                 struct obd_import *imp;
 
@@ -482,7 +503,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                         goto local_cancel;
                 }
 
-                imp = class_conn2cliimp(lock->l_connh);
+                imp = class_exp2cliimp(lock->l_conn_export);
                 if (imp == NULL || imp->imp_invalid) {
                         CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
                                imp);
@@ -612,9 +633,9 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                 struct ldlm_lock *lock;
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                if (opaque != NULL && lock->l_data != opaque) {
+                if (opaque != NULL && lock->l_ast_data != opaque) {
                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
-                                  lock->l_data, opaque);
+                                   lock->l_ast_data, opaque);
                         //LBUG();
                         continue;
                 }
@@ -873,7 +894,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
-        req->rq_level = LUSTRE_CONN_RECOVER;
+        req->rq_send_state = LUSTRE_IMP_REPLAY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         ldlm_lock2desc(lock, &body->lock_desc);