Whamcloud - gitweb
Many dlm and intent lock fixes.
authorpschwan <pschwan>
Mon, 17 Jun 2002 19:53:26 +0000 (19:53 +0000)
committerpschwan <pschwan>
Mon, 17 Jun 2002 19:53:26 +0000 (19:53 +0000)
 - initial unlink support
 - a DLM trace mode for debugging
 - serious DLM concurrency bugfixes
 - removed vestigial ``local callback'' code from ldlm
 - the beginnings of multi-RPC infrastructure

20 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/lib/mds_updates.c
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_extN.c
lustre/mds/mds_reint.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c

index eec8407..875be13 100644 (file)
@@ -78,11 +78,12 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
 */
 
 struct ldlm_namespace {
-        struct ptlrpc_client ns_client; /* used for revocation callbacks */
-        __u32                 ns_local; /* is this a local lock tree? */
+        char                 *ns_name;
+        struct ptlrpc_client  ns_rpc_client; /* used for revocation callbacks */
+        __u32                 ns_client; /* is this a client-side lock tree? */
         struct list_head     *ns_hash; /* hash table for ns */
         __u32                 ns_refcount; /* count of resources in the hash */
-        struct list_head     ns_root_list; /* all root resources in ns */
+        struct list_head      ns_root_list; /* all root resources in ns */
         spinlock_t            ns_lock; /* protects hash, refcount, list */
 };
 
@@ -99,7 +100,8 @@ struct ldlm_namespace {
 struct ldlm_lock;
 
 typedef int (*ldlm_lock_callback)(struct ldlm_lock *lock, struct ldlm_lock *new,
-                                  void *data, __u32 data_len);
+                                  void *data, __u32 data_len,
+                                  struct ptlrpc_request **req);
 
 struct ldlm_lock {
         struct ldlm_resource *l_resource;
@@ -167,6 +169,7 @@ struct ldlm_resource {
         __u32                  lr_version[RES_VERSION_SIZE];
         __u32                  lr_refcount;
         spinlock_t             lr_lock; /* protects lists, refcount */
+        void                  *lr_tmp;
 };
 
 static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
@@ -176,6 +179,23 @@ static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
 
 extern struct obd_ops ldlm_obd_ops;
 
+#define LDLM_DEBUG(lock, format, a...)                          \
+do {                                                            \
+        CDEBUG(D_DLMTRACE, "### " format                        \
+               " (%s: lock %p mode %d/%d on res %Lu (rc %d) "   \
+               " type %d remote %Lx)\n", ## a,                  \
+               lock->l_resource->lr_namespace->ns_name, lock,   \
+               lock->l_granted_mode, lock->l_req_mode,          \
+               lock->l_resource->lr_name[0],                    \
+               lock->l_resource->lr_refcount,                   \
+               lock->l_resource->lr_type,                       \
+               lock->l_remote_handle.addr);                     \
+} while (0)
+
+#define LDLM_DEBUG_NOLOCK(format, a...)                 \
+        CDEBUG(D_DLMTRACE, "### " format "\n", ## a);
+
+
 /* ldlm_extent.c */
 int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
 int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *);
@@ -211,10 +231,10 @@ void ldlm_lock_dump(struct ldlm_lock *lock);
 int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn);
 
 /* resource.c */
-struct ldlm_namespace *ldlm_namespace_new(__u32 local);
+struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local);
 int ldlm_namespace_free(struct ldlm_namespace *ns);
 
-/* resourc.c - internal */
+/* resource.c - internal */
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
                                         __u64 *name, __u32 type, int create);
@@ -241,12 +261,11 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl,
                      __u32 data_len,
                      struct lustre_handle *lockh);
 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                      void *data, __u32 data_len);
+                      void *data, __u32 data_len, struct ptlrpc_request **reqp);
 int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
                      int new_mode, int *flags);
 int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_lock *);
 
-
 #endif /* __KERNEL__ */
 
 /* ioctls for trying requests */
index 52b034a..9238982 100644 (file)
@@ -134,7 +134,7 @@ void mds_pack_inode2body(struct mds_body *body, struct inode *inode);
 /* mds/handler.c */
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
 int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                      void *data, int data_len);
+                      void *data, int data_len, struct ptlrpc_request **req);
 int mds_reint(int offset, struct ptlrpc_request *req);
 
 /* mdc/mdc_request.c */
index 139330c..30224e2 100644 (file)
@@ -141,6 +141,7 @@ struct ptlrpc_client {
 struct ptlrpc_request { 
         int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
         struct list_head rq_list;
+        struct list_head rq_multi;
         struct obd_device *rq_obd;
         int rq_status;
         int rq_flags; 
index 25d2214..3ff3c7e 100644 (file)
@@ -55,13 +55,16 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 struct ldlm_reply *rep;
                 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
                 __u32 type = lock->l_resource->lr_type;
-                __u64 new_resid[3] = {0, 0, 0};
+                __u64 new_resid[3] = {0, 0, 0}, old_res;
                 int bufcount, rc, size[3] = {sizeof(struct ldlm_reply),
                                              sizeof(struct mds_body),
                                              sizeof(struct obdo)};
 
                 it->opc = NTOH__u64(it->opc);
 
+                LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
+
+                /* prepare reply */
                 switch(it->opc) {
                 case IT_GETATTR:
                         /* Note that in the negative case you may be returning
@@ -76,8 +79,12 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 case IT_RENAME:
                         bufcount = 3;
                         break;
-                default:
+                case IT_UNLINK:
                         bufcount = 2;
+                        size[1] = sizeof(struct obdo); 
+                        break;
+                default:
+                        LBUG();
                 }
 
                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
@@ -89,6 +96,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
 
                 rep = lustre_msg_buf(req->rq_repmsg, 0);
                 rep->lock_policy_res1 = 1;
+
+                /* execute policy */
                 switch ( it->opc ) {
                 case IT_CREAT:
                 case IT_CREAT|IT_OPEN:
@@ -97,6 +106,7 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 case IT_SYMLINK:
                 case IT_MKNOD:
                 case IT_LINK:
+                case IT_UNLINK:
                 case IT_RENAME2:
                         if (mds_reint_p == NULL)
                                 mds_reint_p =
@@ -140,13 +150,16 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                         LBUG();
                 }
 
+                if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
+                        RETURN(ELDLM_LOCK_ABORTED);
+
                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
                 rep->lock_policy_res2 = req->rq_status;
                 new_resid[0] = mds_rep->ino;
+                old_res = lock->l_resource->lr_name[0];
 
                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
-                       "%ld\n", mds_rep->ino,
-                       (long)lock->l_resource->lr_name[0]);
+                       "%ld\n", mds_rep->ino, (long)old_res);
                 ldlm_resource_put(lock->l_resource);
 
                 lock->l_resource =
@@ -155,6 +168,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                         LBUG();
                         RETURN(-ENOMEM);
                 }
+                LDLM_DEBUG(lock, "intent policy, old res %ld",
+                           (long)old_res);
                 RETURN(ELDLM_LOCK_CHANGED);
         } else {
                 int size = sizeof(struct ldlm_reply);
@@ -247,21 +262,25 @@ void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
         spin_unlock(&lock->l_lock);
 }
 
-void ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
+        struct ptlrpc_request *req = NULL;
         ENTRY;
 
         spin_lock(&lock->l_lock);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
-                EXIT;
-                return;
+                RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_AST_SENT;
-        spin_unlock(&lock->l_lock);
 
-        lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len);
-        EXIT;
+        lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
+        spin_unlock(&lock->l_lock);
+        if (req != NULL) {
+                struct list_head *list = lock->l_resource->lr_tmp;
+                list_add(&req->rq_multi, list);
+        }
+        RETURN(1);
 }
 
 /* Args: unlocked lock */
@@ -280,7 +299,7 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
         if (!lock->l_readers && !lock->l_writers &&
             lock->l_flags & LDLM_FL_DYING) {
                 /* Read this lock its rights. */
-                if (!lock->l_resource->lr_namespace->ns_local) {
+                if (!lock->l_resource->lr_namespace->ns_client) {
                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
                         LBUG();
                 }
@@ -288,14 +307,16 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
                 CDEBUG(D_INFO, "final decref done on dying lock, "
                        "calling callback.\n");
                 spin_unlock(&lock->l_lock);
+                /* This function pointer is unfortunately overloaded.  This
+                 * call will not result in an RPC. */
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
-                                     lock->l_data_len);
+                                     lock->l_data_len, NULL);
         } else
                 spin_unlock(&lock->l_lock);
         EXIT;
 }
 
-/* Args: locked lock */
+/* Args: unlocked lock */
 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
                              struct list_head *queue)
 {
@@ -325,6 +346,10 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
                 CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
                 if (send_cbs && child->l_blocking_ast != NULL) {
                         CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
+                        /* It's very difficult to actually send the AST from
+                         * here, because we'd have to drop the lock before going
+                         * to sleep to wait for the reply.  Instead we build the
+                         * packet and send it later. */
                         ldlm_send_blocking_ast(child, lock);
                 }
         }
@@ -358,8 +383,8 @@ void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
                 res->lr_most_restr = lock->l_granted_mode;
 
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, NULL,
-                                       lock->l_data, lock->l_data_len);
+                lock->l_completion_ast(lock, NULL, lock->l_data,
+                                       lock->l_data_len, NULL);
         EXIT;
 }
 
@@ -477,7 +502,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
 
         lock = lustre_handle2object(lockh);
         res = lock->l_resource;
-        local = res->lr_namespace->ns_local;
+        local = res->lr_namespace->ns_client;
         spin_lock(&res->lr_lock);
 
         lock->l_blocking_ast = blocking;
@@ -492,7 +517,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
                         res = lock->l_resource;
                         *flags |= LDLM_FL_LOCK_CHANGED;
                 }
-                if (rc < 0) {
+                if (rc == ELDLM_LOCK_ABORTED) {
                         /* Abort. */
                         ldlm_resource_put(lock->l_resource);
                         ldlm_lock_free(lock);
@@ -562,6 +587,8 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
                 struct ldlm_lock *pending;
                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
+                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
+
                 /* the resource lock protects ldlm_lock_compat */
                 if (ldlm_lock_compat(pending, 1))
                         RETURN(1);
@@ -579,15 +606,37 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
 /* Must be called with resource->lr_lock not taken. */
 void ldlm_reprocess_all(struct ldlm_resource *res)
 {
+        struct list_head rpc_list, *tmp, *pos;
+
+        INIT_LIST_HEAD(&rpc_list);
+
         /* Local lock trees don't get reprocessed. */
-        if (res->lr_namespace->ns_local)
+        if (res->lr_namespace->ns_client)
                 return;
 
         spin_lock(&res->lr_lock);
+        res->lr_tmp = &rpc_list;
+
         ldlm_reprocess_queue(res, &res->lr_converting);
         if (list_empty(&res->lr_converting))
                 ldlm_reprocess_queue(res, &res->lr_waiting);
+
+        res->lr_tmp = NULL;
         spin_unlock(&res->lr_lock);
+
+        list_for_each_safe(tmp, pos, &rpc_list) {
+                int rc;
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_multi);
+
+                CDEBUG(D_INFO, "Sending callback.\n");
+
+                rc = ptlrpc_queue_wait(req);
+                rc = ptlrpc_check_status(req, rc);
+                ptlrpc_free_req(req);
+                if (rc)
+                        CERROR("Callback send failed: %d\n", rc);
+        }
 }
 
 /* Must be called with lock and lock->l_resource unlocked */
@@ -632,7 +681,7 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
         list_del_init(&lock->l_res_link);
 
         /* If this is a local resource, put it on the appropriate list. */
-        if (res->lr_namespace->ns_local) {
+        if (res->lr_namespace->ns_client) {
                 if (*flags & LDLM_FL_BLOCK_CONV)
                         ldlm_resource_add_lock(res, res->lr_converting.prev,
                                                lock);
@@ -670,7 +719,8 @@ void ldlm_lock_dump(struct ldlm_lock *lock)
 
         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
-        CDEBUG(D_OTHER, "  Resource: %p\n", lock->l_resource);
+        CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
+               lock->l_resource->lr_name[0]);
         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
                (int)lock->l_req_mode, (int)lock->l_granted_mode);
         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
index dcd6943..3e1cc75 100644 (file)
@@ -21,21 +21,11 @@ extern kmem_cache_t *ldlm_lock_slab;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
-
-static int is_local_conn(struct ptlrpc_connection *conn)
-{
-        ENTRY;
-        if (conn == NULL)
-                RETURN(1);
-
-        RETURN(LOOPBACK(conn->c_peer.peer_nid));
-}
-
 /* _ldlm_callback and local_callback setup the variables then call this common
  * code */
 static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                           ldlm_mode_t mode, void *data, __u32 data_len)
+                           ldlm_mode_t mode, void *data, __u32 data_len,
+                           struct ptlrpc_request **reqp)
 {
         ENTRY;
 
@@ -66,7 +56,7 @@ static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
                                "callback (%p).\n", lock->l_blocking_ast);
                         if (lock->l_blocking_ast != NULL)
                                 lock->l_blocking_ast(lock, new, lock->l_data,
-                                                     lock->l_data_len);
+                                                     lock->l_data_len, reqp);
                 } else {
                         CDEBUG(D_INFO, "Lock still has references; lock will be"
                                " cancelled later.\n");
@@ -75,17 +65,6 @@ static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
         RETURN(0);
 }
 
-/* FIXME: I think that this is no longer necessary. */
-static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
-                          void *data, __u32 data_len)
-{
-        struct ldlm_lock *lock;
-        /* the 'remote handle' is the lock in the FS's namespace */
-        lock = lustre_handle2object(&l->l_remote_handle);
-
-        return common_callback(lock, new, l->l_granted_mode, data, data_len);
-}
-
 static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
                          struct ptlrpc_request *req)
 {
@@ -100,11 +79,7 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
         void *cookie;
         ENTRY;
 
-        /* Is this lock managed locally? */
-        if (is_local_conn(req->rq_connection))
-                callback = local_callback;
-        else
-                callback = ldlm_cli_callback;
+        callback = ldlm_cli_callback;
 
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
@@ -134,6 +109,9 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
+        lock = lustre_handle2object(&lockh);
+        LDLM_DEBUG(lock, "server-side enqueue handler");
+
         flags = dlm_req->lock_flags;
         err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
                                       callback, callback);
@@ -144,7 +122,6 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
         dlm_rep->lock_flags = flags;
 
         memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
-        lock = lustre_handle2object(&lockh);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
                        sizeof(lock->l_extent));
@@ -174,6 +151,7 @@ static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
         struct ldlm_request *dlm_req;
         struct ldlm_reply *dlm_rep;
         struct ldlm_resource *res;
+        struct ldlm_lock *lock;
         int rc, size = sizeof(*dlm_rep);
         ENTRY;
 
@@ -186,6 +164,9 @@ static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
         dlm_rep->lock_flags = dlm_req->lock_flags;
 
+        lock = lustre_handle2object(&dlm_req->lock_handle1);
+        LDLM_DEBUG(lock, "server-side convert handler");
+
         res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
                                       dlm_req->lock_desc.l_req_mode,
                                       &dlm_rep->lock_flags);
@@ -214,6 +195,7 @@ static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
         lock = lustre_handle2object(&dlm_req->lock_handle1);
+        LDLM_DEBUG(lock, "server-side cancel handler");
         res = ldlm_local_lock_cancel(lock);
         req->rq_status = 0;
         if (ptlrpc_reply(svc, req) != 0)
@@ -249,8 +231,15 @@ static int _ldlm_callback(struct ptlrpc_service *svc,
         lock1 = lustre_handle2object(&dlm_req->lock_handle1);
         lock2 = lustre_handle2object(&dlm_req->lock_handle2);
 
+        LDLM_DEBUG(lock1, "client %s callback handler START",
+                   lock2 == NULL ? "completion" : "blocked");
+
         common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
-                        0);
+                        0, NULL);
+
+        LDLM_DEBUG_NOLOCK("client %s callback handler END",
+                   lock2 == NULL ? "completion" : "blocked");
+
         RETURN(0);
 }
 
index 074d60e..5f566ad 100644 (file)
@@ -40,6 +40,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 GOTO(out, rc);
 
         lock = lustre_handle2object(lockh);
+        LDLM_DEBUG(lock, "client-side enqueue START");
 
         if (req == NULL) {
                 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL);
@@ -80,6 +81,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 ldlm_resource_put(lock->l_resource);
                 spin_unlock(&lock->l_resource->lr_lock);
                 ldlm_lock_free(lock);
+                if (rc == ELDLM_LOCK_ABORTED)
+                        rc = 0;
                 GOTO(out, rc);
         }
 
@@ -111,6 +114,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                         LBUG();
                         RETURN(-ENOMEM);
                 }
+                LDLM_DEBUG(lock, "client-side enqueue, new resource");
         }
 
         if (!req_passed_in)
@@ -119,6 +123,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
                                      callback);
 
+        LDLM_DEBUG(lock, "client-side enqueue END");
         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)) {
                 /* Go to sleep until the lock is granted. */
@@ -136,16 +141,17 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
 }
 
 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                      void *data, __u32 data_len)
+                      void *data, __u32 data_len, struct ptlrpc_request **reqp)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
-        struct ptlrpc_client *cl = &lock->l_resource->lr_namespace->ns_client;
+        struct ptlrpc_client *cl =
+                &lock->l_resource->lr_namespace->ns_rpc_client;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+                              &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -162,11 +168,18 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
                 ldlm_object2handle(new, &body->lock_handle2);
         }
 
+        LDLM_DEBUG(lock, "server preparing %s AST",
+                   new == NULL ? "completion" : "blocked");
+
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
+        if (reqp == NULL) {
+                rc = ptlrpc_queue_wait(req);
+                rc = ptlrpc_check_status(req, rc);
+                ptlrpc_free_req(req);
+        } else {
+                *reqp = req;
+        }
 
         EXIT;
  out:
@@ -187,6 +200,8 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
         lock = lustre_handle2object(lockh);
         *flags = 0;
 
+        LDLM_DEBUG(lock, "client-side convert");
+
         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size,
                               NULL);
         if (!req)
@@ -228,12 +243,13 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
 
 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
 {
-        struct ldlm_request *body;
         struct ptlrpc_request *req;
+        struct ldlm_request *body;
         struct ldlm_resource *res;
         int rc, size = sizeof(*body);
         ENTRY;
 
+        LDLM_DEBUG(lock, "client-side cancel");
         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
                               NULL);
         if (!req)
index 99d00b8..67fdf88 100644 (file)
@@ -15,7 +15,7 @@
 
 kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
 
-struct ldlm_namespace *ldlm_namespace_new(__u32 local)
+struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
 {
         struct ldlm_namespace *ns = NULL;
         struct list_head *bucket;
@@ -32,14 +32,20 @@ struct ldlm_namespace *ldlm_namespace_new(__u32 local)
                 GOTO(out, ns);
         }
 
-        ptlrpc_init_client(NULL, NULL,
-                           LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           &ns->ns_client);
+        OBD_ALLOC(ns->ns_name, strlen(name) + 1);
+        if (!ns->ns_name) {
+                LBUG();
+                GOTO(out, ns);
+        }
+        strcpy(ns->ns_name, name);
+
+        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           &ns->ns_rpc_client);
 
         INIT_LIST_HEAD(&ns->ns_root_list);
         ns->ns_lock = SPIN_LOCK_UNLOCKED;
         ns->ns_refcount = 0;
-        ns->ns_local = local;
+        ns->ns_client = client;
 
         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
              bucket--)
@@ -49,6 +55,8 @@ struct ldlm_namespace *ldlm_namespace_new(__u32 local)
  out: 
         if (ns && ns->ns_hash)
                 vfree(ns->ns_hash);
+        if (ns && ns->ns_name)
+                OBD_FREE(ns->ns_name, strlen(name) + 1);
         if (ns) 
                 OBD_FREE(ns, sizeof(*ns));
         return NULL;
@@ -57,7 +65,7 @@ struct ldlm_namespace *ldlm_namespace_new(__u32 local)
 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
 {
         struct list_head *tmp, *pos;
-        int rc = 0, client = res->lr_namespace->ns_local;
+        int rc = 0, client = res->lr_namespace->ns_client;
         ENTRY;
 
         list_for_each_safe(tmp, pos, q) {
@@ -117,7 +125,8 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
         }
 
         vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */);
-        ptlrpc_cleanup_client(&ns->ns_client); 
+        ptlrpc_cleanup_client(&ns->ns_rpc_client); 
+        OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
         OBD_FREE(ns, sizeof(*ns));
 
         return ELDLM_OK;
@@ -310,8 +319,10 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
 /* Must be called with resource->lr_lock taken */
 void ldlm_resource_del_lock(struct ldlm_lock *lock)
 {
-        list_del_init(&lock->l_res_link);
-        lock->l_resource->lr_refcount--;
+        if (!list_empty(&lock->l_res_link)) {
+                list_del_init(&lock->l_res_link);
+                lock->l_resource->lr_refcount--;
+        }
 }
 
 int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h)
@@ -341,7 +352,8 @@ void ldlm_resource_dump(struct ldlm_resource *res)
                  (unsigned long long)res->lr_name[2]);
 
         CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name);
-        CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace);
+        CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
+               res->lr_namespace->ns_name);
         CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);
 
         CDEBUG(D_OTHER, "Granted locks:\n");
index c110553..554e203 100644 (file)
@@ -14,7 +14,8 @@
 #include <linux/lustre_dlm.h>
 
 static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                               void *data, __u32 data_len)
+                              void *data, __u32 data_len,
+                              struct ptlrpc_request **reqp)
 {
         printk("ldlm_test_callback: lock=%p, new=%p\n", lock, new);
         return 0;
@@ -29,7 +30,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         struct lustre_handle lockh_1, lockh_2;
         int flags;
 
-        ns = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+        ns = ldlm_namespace_new("test_server", LDLM_NAMESPACE_SERVER);
         if (ns == NULL)
                 LBUG();
 
@@ -75,7 +76,7 @@ int ldlm_test_extents(struct obd_device *obddev)
         ldlm_error_t err;
         int flags;
 
-        ns = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+        ns = ldlm_namespace_new("test_server", LDLM_NAMESPACE_SERVER);
         if (ns == NULL)
                 LBUG();
 
index 70e6a40..e7f192c 100644 (file)
@@ -189,7 +189,8 @@ void mds_unlink_pack(struct ptlrpc_request *req, int offset,
 
         rec->ul_opcode = HTON__u32(REINT_UNLINK);
         ll_inode2fid(&rec->ul_fid1, inode);
-        ll_inode2fid(&rec->ul_fid2, child);
+        if (child) 
+                ll_inode2fid(&rec->ul_fid2, child);
 
         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1);
         LOGL0(name, namelen, tmp);
index bbdd884..cbe154b 100644 (file)
@@ -30,11 +30,13 @@ void ll_intent_release(struct dentry *de)
                 return;
         }
 
-        handle = (struct lustre_handle *)de->d_it->it_lock_handle;
-        lock = lustre_handle2object(handle);
-        CDEBUG(D_INFO, "calling ldlm_lock_decref(%p, %d)\n", lock,
-               de->d_it->it_lock_mode);
-        ldlm_lock_decref(lock, de->d_it->it_lock_mode);
+        if (de->d_it->it_lock_mode) {
+                handle = (struct lustre_handle *)de->d_it->it_lock_handle;
+                lock = lustre_handle2object(handle);
+                CDEBUG(D_INFO, "calling ldlm_lock_decref(%p, %d)\n", lock,
+                       de->d_it->it_lock_mode);
+                ldlm_lock_decref(lock, de->d_it->it_lock_mode);
+        }
         de->d_it = NULL;
         EXIT;
 }
index aa16333..1382b9c 100644 (file)
@@ -196,7 +196,8 @@ static void ll_update_atime(struct inode *inode)
 }
 
 static int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                            void *data, __u32 data_len)
+                            void *data, __u32 data_len,
+                            struct ptlrpc_request **reqp)
 {
         struct inode *inode = lock->l_data;
         ENTRY;
index 5d23512..1dba622 100644 (file)
@@ -105,10 +105,11 @@ int ll_lock(struct inode *dir, struct dentry *dentry,
                 err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
                                   it, LCK_PW, dir, dentry, lockh, 0, NULL, 0,
                                   dir, sizeof(*dir));
-        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN))
+        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK))
                 err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
                                   it, LCK_PR, dir, dentry, lockh, 0, NULL, 0,
                                   dir, sizeof(*dir));
+
         else
                 LBUG();
 
@@ -152,7 +153,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
              it->it_disposition && !it->it_status)
                 GOTO(negative, NULL);
 
-        if ( (it->it_op & (IT_GETATTR)) &&
+        if ( (it->it_op & (IT_GETATTR | IT_UNLINK)) &&
              it->it_disposition && it->it_status)
                 GOTO(negative, NULL);
 
@@ -170,6 +171,21 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
                         RETURN(ERR_PTR(-abs(err)));
                 }
                 offset = 0;
+        } else if (it->it_op == IT_UNLINK) { 
+                struct obdo *obdo;
+                request = (struct ptlrpc_request *)it->it_data;
+                obdo = lustre_msg_buf(request->rq_repmsg, 1);
+                inode = new_inode(dir->i_sb);
+                ll_i2info(inode)->lli_obdo = obdo_alloc();
+
+                /* XXX fix mem allocation error */
+                memcpy(ll_i2info(inode)->lli_obdo, obdo, sizeof(*obdo));
+
+                if (!inode) 
+                        GOTO(out_req, -ENOMEM);
+                inode->i_mode= S_IFREG;
+                inode->i_nlink = 1;
+                GOTO(out_req, 0);
         } else {
                 struct mds_body *body;
 
@@ -196,6 +212,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
         if (it->it_op & IT_RENAME)
                 it->it_data = dentry;
 
+ out_req:
         ptlrpc_free_req(request);
         if (!inode)
                 RETURN(ERR_PTR(-ENOMEM));
@@ -517,6 +534,11 @@ static int ll_unlink(struct inode * dir, struct dentry *dentry)
         struct page * page;
         int err = -ENOENT;
 
+        if (dentry->d_it && dentry->d_it->it_disposition) { 
+                inode->i_nlink = 0;
+                GOTO(out, err=0);
+        }
+
         de = ext2_find_entry (dir, dentry, &page);
         if (!de)
                 goto out;
index 434176a..1f92883 100644 (file)
@@ -257,19 +257,20 @@ static void ll_clear_inode(struct inode *inode)
 static void ll_delete_inode(struct inode *inode)
 {
         if (S_ISREG(inode->i_mode)) { 
-                int err; 
-                struct obdo *oa; 
+                int err;
+                struct obdo *oa;
                 oa = ll_i2info(inode)->lli_obdo;
 
-                if (!oa) {
-                        CERROR("no memory\n");
-                        GOTO(out, -ENOMEM);
-                }
+                if (!oa)
+                        GOTO(out, -EINVAL);
+
+                if (oa->o_id == 0)
+                        /* No obdo was ever created */
+                        GOTO(out, 0);
 
                 err = obd_destroy(ll_i2obdconn(inode), oa);
                 CDEBUG(D_INODE, "obd destroy of %Ld error %d\n",
                        (unsigned long long)oa->o_id, err);
-                obdo_free(oa);
         }
 out:
         clear_inode(inode);
index 9830405..924629e 100644 (file)
@@ -125,7 +125,8 @@ int mdc_getattr(struct obd_conn *conn,
 }
 
 static int mdc_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                             void *data, int data_len)
+                             void *data, int data_len,
+                             struct ptlrpc_request **req)
 {
         int rc;
         struct inode *inode = data;
@@ -173,6 +174,10 @@ int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it,
         struct ldlm_intent *lit;
         ENTRY;
 
+#warning FIXME: Andreas, the sgid directory stuff also goes here, but check again on mds
+
+        LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino);
+
         switch (it->it_op) { 
         case IT_MKDIR:
                 it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask; 
@@ -235,6 +240,24 @@ int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it,
                 size[0] = sizeof(struct ldlm_reply);
                 size[1] = sizeof(struct mds_body);
                 req->rq_replen = lustre_msg_size(2, size);
+        } else if ( it->it_op == IT_UNLINK ) {
+                size[2] = sizeof(struct mds_rec_unlink);
+                size[3] = de->d_name.len + 1;
+                req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
+                                      LDLM_ENQUEUE, 4, size, NULL);
+                if (!req)
+                        RETURN(-ENOMEM);
+
+                /* pack the intent */
+                lit = lustre_msg_buf(req->rq_reqmsg, 1);
+                lit->opc = NTOH__u64((__u64)it->it_op);
+
+                /* pack the intended request */
+                mds_unlink_pack(req, 2, dir, NULL, de->d_name.name, 
+                                de->d_name.len);
+                size[0] = sizeof(struct ldlm_reply);
+                size[1] = sizeof(struct obdo);
+                req->rq_replen = lustre_msg_size(2, size);
         } else if ( it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
                     it->it_op == IT_OPEN ) {
                 size[2] = sizeof(struct mds_body);
@@ -293,8 +316,10 @@ int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it,
                               obddev->obd_namespace, NULL, res_id, lock_type,
                               NULL, 0, lock_mode, &flags,
                               (void *)mdc_lock_callback, data, datalen, lockh);
-
-        if (rc != 0) {
+        if (rc == -ENOENT) {
+                lock_mode = 0;
+                memset(lockh, 0, sizeof(*lockh));
+        } else if (rc != 0) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
                 RETURN(rc);
         }
@@ -316,6 +341,7 @@ int mdc_open(struct obd_conn *conn, ino_t ino, int type, int flags,
         struct mds_body *body;
         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
         struct ptlrpc_request *req;
+        ENTRY;
 
         if (obdo != NULL) {
                 bufcount = 2;
@@ -686,7 +712,8 @@ static int mdc_connect(struct obd_conn *conn)
 
         ENTRY;
 
-        conn->oc_dev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
+        conn->oc_dev->obd_namespace =
+                ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT);
         if (conn->oc_dev->obd_namespace == NULL)
                 RETURN(-ENOMEM);
 
index 36c3448..4e1d8ec 100644 (file)
@@ -260,7 +260,7 @@ static int mds_disconnect(struct mds_obd *mds, struct ptlrpc_request *req)
 }
 
 int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                      void *data, int data_len)
+                      void *data, int data_len, struct ptlrpc_request **reqp)
 {
         ENTRY;
 
@@ -317,6 +317,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
                                    NULL, 0, lock_mode, &lockh);
         if (rc == 0) {
+                LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
                                       NULL, mds->mds_local_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
@@ -326,6 +327,9 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_create_de, rc = -EIO);
                 }
+        } else {
+                lock = lustre_handle2object(&lockh);
+                LDLM_DEBUG(lock, "matched");
         }
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
 
@@ -882,14 +886,16 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_thread, rc);
         }
 
-        obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+        obddev->obd_namespace =
+                ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
         if (obddev->obd_namespace == NULL) {
                 LBUG();
                 mds_cleanup(obddev);
                 GOTO(err_thread, rc);
         }
 
-        mds->mds_local_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
+        mds->mds_local_namespace =
+                ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
         if (mds->mds_local_namespace == NULL) {
                 LBUG();
                 mds_cleanup(obddev);
index 30f4d51..55ce734 100644 (file)
@@ -100,20 +100,23 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle,
 static int mds_extN_set_obdo(struct inode *inode, void *handle,
                              struct obdo *obdo)
 {
-        struct mds_objid *data = (struct mds_objid *)obdo->o_inline;
+        struct mds_objid *data
         int rc;
 
-        data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC);
+
 
         lock_kernel();
         down(&inode->i_sem);
         if (obdo == NULL)
                 rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
                                     XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0);
-        else
+        else { 
+                data = (struct mds_objid *)obdo->o_inline;       
+                data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC);
                 rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
                                     XATTR_LUSTRE_MDS_OBJID, obdo->o_inline,
                                     OBD_INLINESZ, XATTR_CREATE);
+        }
         up(&inode->i_sem);
         unlock_kernel();
 
@@ -139,8 +142,8 @@ static int mds_extN_get_obdo(struct inode *inode, struct obdo *obdo)
         unlock_kernel();
 
         if (rc < 0) {
-                CERROR("error getting EA %s from MDS inode %ld: rc = %d\n",
-                       XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
+                CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: "
+                       "rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
                 obdo->o_id = 0;
         } else if (data->mo_magic != cpu_to_le64(XATTR_MDS_MO_MAGIC)) {
                 CERROR("MDS object id %Ld has bad magic %Lx\n",
index 6b95585..b2ab3c3 100644 (file)
@@ -220,6 +220,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
                                    NULL, 0, lock_mode, &lockh);
         if (rc == 0) {
+                LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
                                       NULL, mds->mds_local_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
@@ -229,6 +230,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_create_de, rc = -EIO);
                 }
+        } else {
+                lock = lustre_handle2object(&lockh);
+                LDLM_DEBUG(lock, "matched");
         }
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
 
@@ -245,7 +249,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 struct mds_body *body;
                 struct obdo *obdo;
                 struct inode *inode = dchild->d_inode;
-                CERROR("child exists (dir %ld, name %s, ino %ld)\n",
+                CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
 
                 body = lustre_msg_buf(req->rq_repmsg, offset);
@@ -373,7 +377,12 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         struct dentry *de = NULL;
         struct dentry *dchild = NULL;
         struct mds_obd *mds = &req->rq_obd->u.mds;
+        struct obdo *obdo;
         struct inode *dir, *inode;
+        int lock_mode, flags;
+        __u64 res_id[3] = {0};
+        struct lustre_handle lockh;
+        struct ldlm_lock *lock;
         void *handle;
         int rc = 0;
         int err;
@@ -384,8 +393,30 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LBUG();
                 GOTO(out_unlink, rc = -ESTALE);
         }
+
         dir = de->d_inode;
         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
+        lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
+        res_id[0] = dir->i_ino;
+
+        rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+                                   NULL, 0, lock_mode, &lockh);
+        if (rc == 0) {
+                LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
+                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+                                      NULL, mds->mds_local_namespace, NULL,
+                                      res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+                                      &flags, (void *)mds_lock_callback, NULL,
+                                      0, &lockh);
+                if (rc != ELDLM_OK) {
+                        CERROR("lock enqueue: err: %d\n", rc);
+                        GOTO(out_unlink_de, rc = -EIO);
+                }
+        } else {
+                lock = lustre_handle2object(&lockh);
+                LDLM_DEBUG(lock, "matched");
+        }
+        ldlm_lock_dump((void *)(unsigned long)lockh.addr);
 
         down(&dir->i_sem);
         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
@@ -403,6 +434,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 GOTO(out_unlink_dchild, rc = -ESTALE);
         }
 
+#if 0 /* in intent case the client doesn't have the inode */
         if (inode->i_ino != rec->ur_fid2->id) {
                 CERROR("inode and FID ID do not match (%ld != %Ld)\n",
                        inode->i_ino, rec->ur_fid2->id);
@@ -415,6 +447,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LBUG();
                 GOTO(out_unlink_dchild, rc = -ESTALE);
         }
+#endif
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev);
 
@@ -426,6 +459,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 rc = vfs_rmdir(dir, dchild);
                 break;
         default:
+                if (offset) {
+                        obdo = lustre_msg_buf(req->rq_repmsg, 1); 
+                        rc = mds_fs_get_obdo(mds, inode, obdo);
+                        if (rc < 0)
+                                CDEBUG(D_INFO, "No obdo for ino %ld err %d\n",
+                                       inode->i_ino, rc);
+                }
+
                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
                 if (!handle)
                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
@@ -444,10 +485,39 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         EXIT;
 out_unlink_dchild:
+        res_id[0] = inode->i_ino;
         l_dput(dchild);
 out_unlink_de:
         up(&dir->i_sem);
+        lock = lustre_handle2object(&lockh);
+        ldlm_lock_decref(lock, lock_mode);
+        if (!rc) { 
+                /* Take an exclusive lock on the resource that we're
+                 * about to free, to force everyone to drop their
+                 * locks. */
+                LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]);
+                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+                                      NULL, mds->mds_local_namespace, NULL, 
+                                      res_id,
+                                      LDLM_PLAIN, NULL, 0, LCK_EX, &flags,
+                                      (void *)mds_lock_callback, NULL, 0, 
+                                      &lockh);
+                if (rc) 
+                        CERROR("failed to get child inode lock (child ino %Ld, "
+                               "dir ino %ld)\n",
+                               res_id[0], de->d_inode->i_ino);
+        }
+
         l_dput(de);
+
+        if (!rc) { 
+                lock = lustre_handle2object(&lockh);
+                ldlm_lock_decref(lock, LCK_EX);
+                if (ldlm_cli_cancel(lock->l_client, lock))
+                        CERROR("failed to get child inode lock ino %Ld\n", 
+                               res_id[0]);
+        }
+
 out_unlink:
         req->rq_status = rc;
         return 0;
index 7590bfd..82a28eb 100644 (file)
@@ -755,7 +755,8 @@ static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (!osc->osc_conn)
                 RETURN(-ENOENT);
 
-        obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
+        obddev->obd_namespace =
+                ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
         if (obddev->obd_namespace == NULL)
                 GOTO(out_conn, rc = -ENOMEM);
 
index 79592f6..386cd7e 100644 (file)
@@ -630,7 +630,8 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(error_dec, err = -EINVAL);
         }
 
-        obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+        obddev->obd_namespace =
+                ldlm_namespace_new("ost", LDLM_NAMESPACE_SERVER);
         if (obddev->obd_namespace == NULL)
                 LBUG();
 
index 01bee46..61ca919 100644 (file)
@@ -92,6 +92,7 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
         OBD_ALLOC(bulk, sizeof(*bulk));
         if (bulk != NULL) {
                 bulk->b_connection = ptlrpc_connection_addref(conn);
+                atomic_set(&bulk->b_pages_remaining, 0);
                 init_waitqueue_head(&bulk->b_waitq);
                 INIT_LIST_HEAD(&bulk->b_page_list);
         }
@@ -183,6 +184,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
         request->rq_reqmsg->target_id = HTON__u32(cl->cli_target_devno);
 
         INIT_LIST_HEAD(&request->rq_list);
+        INIT_LIST_HEAD(&request->rq_multi);
 
         /* this will be dec()d once in req_finished, once in free_committed */
         atomic_set(&request->rq_refcount, 2);
@@ -232,7 +234,7 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
         }
 
         ptlrpc_put_connection(request->rq_connection);
-
+        list_del(&request->rq_multi);
         OBD_FREE(request, sizeof(*request));
         EXIT;
 }
@@ -314,8 +316,12 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err)
         }
 
         if (req->rq_repmsg->status != 0) {
-                CERROR("req->rq_repmsg->status is %d\n",
-                       req->rq_repmsg->status);
+                if (req->rq_repmsg->status < 0)
+                        CERROR("req->rq_repmsg->status is %d\n",
+                               req->rq_repmsg->status);
+                else
+                        CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n",
+                               req->rq_repmsg->status);
                 /* XXX: translate this error from net to host */
                 RETURN(req->rq_repmsg->status);
         }