Whamcloud - gitweb
Add flock support.
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 0625389..ec5f8d4 100644 (file)
@@ -237,6 +237,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         struct ldlm_reply *reply;
         int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
         int is_replay = *flags & LDLM_FL_REPLAY;
+        int cleanup_phase = 0;
         ENTRY;
 
         if (exp == NULL) {
@@ -258,7 +259,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
                                         completion, glimpse, data, lvb_len);
                 if (lock == NULL)
-                        GOTO(out_nolock, rc = -ENOMEM);
+                        RETURN(-ENOMEM);
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, mode);
                 ldlm_lock2handle(lock, lockh);
@@ -271,11 +272,14 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 LDLM_DEBUG(lock, "client-side enqueue START");
         }
 
+        /* lock not sent to server yet */
+        cleanup_phase = 2;
+
         if (req == NULL) {
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
                                       size, NULL);
                 if (req == NULL)
-                        GOTO(out_lock, rc = -ENOMEM);
+                        GOTO(cleanup, rc = -ENOMEM);
                 req_passed_in = 0;
         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
                 LBUG();
@@ -319,21 +323,29 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                                 tmplvb = lustre_swab_repbuf(req, 1, lvb_len,
                                                             lvb_swabber);
                                 if (tmplvb == NULL)
-                                        GOTO(out_lock, rc = -EPROTO);
+                                        GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
                                         memcpy(lvb, tmplvb, lvb_len);
                         }
                 }
-                GOTO(out_lock, rc);
+                GOTO(cleanup, rc);
         }
 
         reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                    lustre_swab_ldlm_reply);
         if (reply == NULL) {
                 CERROR("Can't unpack ldlm_reply\n");
-                GOTO(out_lock, rc = -EPROTO);
+                GOTO(cleanup, rc = -EPROTO);
         }
 
+        /* XXX - Phil, wasn't sure if this shoiuld go before or after the
+        /* lustre_swab_repbuf() ? If we can't unpack the reply then we
+        /* don't know what occurred on the server so I think the safest
+        /* bet is to cleanup the lock as if it didn't make it ? */
+
+        /* lock enqueued on the server */
+        cleanup_phase = 1;
+
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
         *flags = reply->lock_flags;
@@ -366,7 +378,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                                            reply->lock_desc.l_resource.lr_name);
                         if (lock->l_resource == NULL) {
                                 LBUG();
-                                GOTO(out_lock, rc = -ENOMEM);
+                                GOTO(cleanup, rc = -ENOMEM);
                         }
                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
                 }
@@ -391,7 +403,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 void *tmplvb;
                 tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
                 if (tmplvb == NULL)
-                        GOTO(out_lock, rc = -EPROTO);
+                        GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
         }
 
@@ -412,13 +424,17 @@ int ldlm_cli_enqueue(struct obd_export *exp,
 
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
- out_lock:
-        if (rc)
-                failed_lock_cleanup(ns, lock, lockh, mode);
-        if (!req_passed_in && req != NULL)
-                ptlrpc_req_finished(req);
+cleanup:
+        switch (cleanup_phase) {
+        case 2:
+                if (rc)
+                        failed_lock_cleanup(ns, lock, lockh, mode);
+        case 1:
+                if (!req_passed_in && req != NULL)
+                        ptlrpc_req_finished(req);
+        }
+
         LDLM_LOCK_PUT(lock);
- out_nolock:
         return rc;
 }
 
@@ -1005,6 +1021,7 @@ int ldlm_replay_locks(struct obd_import *imp)
         INIT_LIST_HEAD(&list);
 
         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+        LASSERT(ns != NULL);
 
         /* ensure this doesn't fall to 0 before all have been queued */
         atomic_inc(&imp->imp_replay_inflight);