Whamcloud - gitweb
Add beginning of symlink support. Not totally functional yet, but not totally
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 63a301f..75c3cf3 100644 (file)
@@ -83,14 +83,17 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 
         l_lock(nslock);
         lock->l_refc--;
+        LDLM_DEBUG(lock, "after refc--");
         if (lock->l_refc < 0)
                 LBUG();
 
         ldlm_resource_put(lock->l_resource);
         if (lock->l_parent)
-                ldlm_lock_put(lock->l_parent);
+                LDLM_LOCK_PUT(lock->l_parent);
 
         if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+                lock->l_resource = NULL;
+                LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
                 if (lock->l_connection)
                         ptlrpc_put_connection(lock->l_connection);
                 CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 1).\n",
@@ -99,7 +102,6 @@ void ldlm_lock_put(struct ldlm_lock *lock)
         }
         l_unlock(nslock);
         EXIT;
-        return;
 }
 
 void ldlm_lock_destroy(struct ldlm_lock *lock)
@@ -108,31 +110,34 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
 
         if (!list_empty(&lock->l_children)) {
-                CERROR("lock %p still has children (%p)!\n", lock,
-                       lock->l_children.next);
+                LDLM_DEBUG(lock, "still has children (%p)!",
+                           lock->l_children.next);
                 ldlm_lock_dump(lock);
                 LBUG();
         }
         if (lock->l_readers || lock->l_writers) {
-                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
-                       "writers)\n", lock->l_readers, lock->l_writers);
+                LDLM_DEBUG(lock, "lock still has references");
+                ldlm_lock_dump(lock);
                 LBUG();
         }
 
-        if (!list_empty(&lock->l_res_link))
+        if (!list_empty(&lock->l_res_link)) {
+                ldlm_lock_dump(lock);
                 LBUG();
+        }
 
         if (lock->l_flags & LDLM_FL_DESTROYED) {
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 EXIT;
                 return;
         }
 
         lock->l_flags = LDLM_FL_DESTROYED;
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-        ldlm_lock_put(lock);
+        LDLM_LOCK_PUT(lock);
         EXIT;
-        return;
 }
+
 /*
    usage: pass in a resource on which you have done get
           pass in a parent lock on which you have done a get
@@ -173,7 +178,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         }
         /* this is the extra refcount, to prevent the lock
            evaporating */
-        ldlm_lock_get(lock);
+        LDLM_LOCK_GET(lock);
         RETURN(lock);
 }
 
@@ -218,10 +223,9 @@ void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
         lockh->cookie = lock->l_random;
 }
 
-
 struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
 {
-        struct ldlm_lock *lock = NULL;
+        struct ldlm_lock *lock = NULL, *retval = NULL;
         ENTRY;
 
         if (!handle || !handle->addr)
@@ -233,16 +237,16 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (lock->l_random != handle->cookie)
-                GOTO(out, handle = NULL);
+                GOTO(out, NULL);
 
         if (lock->l_flags & LDLM_FL_DESTROYED)
-                GOTO(out, handle = NULL);
+                GOTO(out, NULL);
 
-        ldlm_lock_get(lock);
+        retval = LDLM_LOCK_GET(lock);
         EXIT;
  out:
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-        return  lock;
+        return retval;
 }
 
 
@@ -260,12 +264,13 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
         if (req->rq_reqmsg->bufcount > 1) {
                 /* an intent needs to be considered */
                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+                struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
                 struct mds_body *mds_rep;
                 struct ldlm_reply *rep;
                 __u64 new_resid[3] = {0, 0, 0}, old_res;
                 int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
                                                   sizeof(struct mds_body),
-                                                  sizeof(struct obdo)};
+                                                  mds->mds_max_mdsize};
 
                 it->opc = NTOH__u64(it->opc);
 
@@ -278,19 +283,20 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                          * a file and its obdo */
                 case IT_CREAT:
                 case IT_CREAT|IT_OPEN:
+                case IT_LINK:
+                case IT_LOOKUP:
                 case IT_MKDIR:
-                case IT_SYMLINK:
                 case IT_MKNOD:
-                case IT_LINK:
                 case IT_OPEN:
+                case IT_READLINK:
                 case IT_RENAME:
-                        bufcount = 3;
-                        break;
+                case IT_RMDIR:
+                case IT_SETATTR:
+                case IT_SYMLINK:
                 case IT_UNLINK:
-                        bufcount = 2;
-                        size[1] = sizeof(struct obdo);
+                        bufcount = 3;
                         break;
-                case IT_RMDIR:
+                case IT_RENAME2:
                         bufcount = 1;
                         break;
                 default:
@@ -311,46 +317,33 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 switch (it->opc) {
                 case IT_CREAT:
                 case IT_CREAT|IT_OPEN:
+                case IT_LINK:
                 case IT_MKDIR:
-                case IT_SETATTR:
-                case IT_SYMLINK:
                 case IT_MKNOD:
-                case IT_LINK:
-                case IT_UNLINK:
-                case IT_RMDIR:
                 case IT_RENAME2:
-                        if (mds_reint_p == NULL)
-                                mds_reint_p =
-                                        inter_module_get_request
-                                        ("mds_reint", "mds");
-                        if (IS_ERR(mds_reint_p)) {
-                                CERROR("MDSINTENT locks require the MDS "
-                                       "module.\n");
-                                LBUG();
-                                RETURN(-EINVAL);
-                        }
+                case IT_RMDIR:
+                case IT_SYMLINK:
+                case IT_UNLINK:
                         rc = mds_reint_p(2, req);
-                        if (rc)
-                                LBUG();
+                        if (rc || req->rq_status != 0) {
+                                rep->lock_policy_res2 = req->rq_status;
+                                RETURN(ELDLM_LOCK_ABORTED);
+                        }
                         break;
                 case IT_GETATTR:
+                case IT_LOOKUP:
+                case IT_OPEN:
                 case IT_READDIR:
+                case IT_READLINK:
                 case IT_RENAME:
-                case IT_OPEN:
-                        if (mds_getattr_name_p == NULL)
-                                mds_getattr_name_p =
-                                        inter_module_get_request
-                                        ("mds_getattr_name", "mds");
-                        if (IS_ERR(mds_getattr_name_p)) {
-                                CERROR("MDSINTENT locks require the MDS "
-                                       "module.\n");
-                                LBUG();
-                                RETURN(-EINVAL);
-                        }
+                case IT_SETATTR:
                         rc = mds_getattr_name_p(2, req);
-                        if (rc) {
-                                req->rq_status = rc;
-                                RETURN(rc);
+                        /* FIXME: we need to sit down and decide on who should
+                         * set req->rq_status, who should return negative and
+                         * positive return values, and what they all mean. */
+                        if (rc || req->rq_status != 0) {
+                                rep->lock_policy_res2 = req->rq_status;
+                                RETURN(ELDLM_LOCK_ABORTED);
                         }
                         break;
                 case IT_READDIR|IT_OPEN:
@@ -361,12 +354,15 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                         LBUG();
                 }
 
-                if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
+                if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
+                    it->opc == IT_RENAME || it->opc == IT_RENAME2)
                         RETURN(ELDLM_LOCK_ABORTED);
 
-                mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
                 rep->lock_policy_res2 = req->rq_status;
-                new_resid[0] = mds_rep->ino;
+                mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+                new_resid[0] = NTOH__u32(mds_rep->ino);
+                if (new_resid[0] == 0)
+                        LBUG();
                 old_res = lock->l_resource->lr_name[0];
 
                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
@@ -420,7 +416,7 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
         OBD_ALLOC(w, sizeof(*w));
         if (!w) {
                 LBUG();
-                return;
+                GOTO(out, 0);
         }
 
         if (new) {
@@ -429,7 +425,7 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
                 ldlm_lock2desc(new, &w->w_desc);
         }
 
-        w->w_lock = ldlm_lock_get(lock);
+        w->w_lock = LDLM_LOCK_GET(lock);
         list_add(&w->w_list, lock->l_resource->lr_tmp);
  out:
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
@@ -442,7 +438,7 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
 
         lock = ldlm_handle2lock(lockh);
         ldlm_lock_addref_internal(lock, mode);
-        ldlm_lock_put(lock);
+        LDLM_LOCK_PUT(lock);
 }
 
 /* only called for local locks */
@@ -454,7 +450,8 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
         else
                 lock->l_writers++;
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-        ldlm_lock_get(lock);
+        LDLM_LOCK_GET(lock);
+        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
 }
 
 /* Args: unlocked lock */
@@ -466,7 +463,7 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
         if (lock == NULL)
                 LBUG();
 
-        LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode);
+        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                 lock->l_readers--;
@@ -484,17 +481,18 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
                         LBUG();
                 }
 
-                CDEBUG(D_INFO, "final decref done on cbpending lock, "
-                       "calling callback.\n");
+                LDLM_DEBUG(lock, "final decref done on cbpending lock");
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
                 ldlm_lock2handle(lock, &lockh);
-                lock->l_blocking_ast(&lockh, NULL, lock->l_data,
+                /* FIXME: -1 is a really, really bad 'desc' */
+                lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
                                      lock->l_data_len);
         } else
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        ldlm_lock_put(lock);
+        LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
+        LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
 
         EXIT;
 }
@@ -637,8 +635,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
 
         if (lock) {
                 ldlm_lock2handle(lock, lockh);
-                wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
-                                         lock->l_granted_mode);
+                wait_event(lock->l_waitq,
+                           lock->l_req_mode == lock->l_granted_mode);
         }
         if (rc)
                 LDLM_DEBUG(lock, "matched");
@@ -648,7 +646,6 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
 }
 
 /*   Returns a referenced, lock */
-
 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                    struct lustre_handle *parent_lock_handle,
                                    __u64 *res_id, __u32 type,
@@ -807,7 +804,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
                 if (rc)
                         CERROR("Failed AST - should clean & disconnect "
                                "client\n");
-                ldlm_lock_put(w->w_lock);
+                LDLM_LOCK_PUT(w->w_lock);
                 list_del(&w->w_list);
                 OBD_FREE(w, sizeof(*w));
         }
@@ -840,7 +837,6 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         EXIT;
 }
 
-/* Must be called with lock and lock->l_resource unlocked */
 void ldlm_lock_cancel(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res;
@@ -858,9 +854,9 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy(lock);
         l_unlock(&ns->ns_lock);
+        EXIT;
 }
 
-/* Must be called with lock and lock->l_resource unlocked */
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags)
 {