Whamcloud - gitweb
- Grammatical, LDLM updates to network.lyx
authorpschwan <pschwan>
Tue, 6 Aug 2002 02:02:13 +0000 (02:02 +0000)
committerpschwan <pschwan>
Tue, 6 Aug 2002 02:02:13 +0000 (02:02 +0000)
- dlm cleanups, bugfixes
- there are now two totally separate DLM callback packets and functions
- beginnings of ldlm /proc bits
- the lock server can now change the lock mode, in addition to the resource
- fix one part of the open() object creation race by adding a semaphore that's
  taken in ll_file_open
- some cleanup in obdclass/proc_lustre.c; much more is needed

26 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/obd.h
lustre/include/linux/obd_support.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/lib/l_net.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_extN.c
lustre/mds/mds_reint.c
lustre/obdclass/class_obd.c
lustre/obdclass/proc_lustre.c
lustre/obdfilter/filter.c
lustre/ost/ost_handler.c
lustre/tests/create.pl
lustre/utils/Makefile.am
lustre/utils/obdctl.c

index cf1e79b..f8b4c21 100644 (file)
@@ -7,6 +7,7 @@
 
 #ifdef __KERNEL__
 
+#include <linux/proc_fs.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 
@@ -80,13 +81,15 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
 */
 
 struct ldlm_namespace {
-        char                 *ns_name;
-        struct ptlrpc_client  ns_rpc_client; /* used for revocation callbacks */
-        __u32                 ns_client; /* is this a client-side lock tree? */
-        struct list_head     *ns_hash; /* hash table for ns */
-        __u32                 ns_refcount; /* count of resources in the hash */
-        struct list_head      ns_root_list; /* all root resources in ns */
-        struct lustre_lock    ns_lock; /* protects hash, refcount, list */
+        char                  *ns_name;
+        struct ptlrpc_client   ns_rpc_client;/* used for revocation callbacks */
+        __u32                  ns_client; /* is this a client-side lock tree? */
+        struct list_head      *ns_hash; /* hash table for ns */
+        __u32                  ns_refcount; /* count of resources in the hash */
+        struct list_head       ns_root_list; /* all root resources in ns */
+        struct lustre_lock     ns_lock; /* protects hash, refcount, list */
+        struct list_head       ns_list_chain; /* position in global NS list */
+        struct proc_dir_entry *ns_proc_dir;
 };
 
 /* 
@@ -101,9 +104,9 @@ struct ldlm_namespace {
 
 struct ldlm_lock;
 
-typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh,
-                                  struct ldlm_lock_desc *new, void *data,
-                                  __u32 data_len);
+typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock,
+                                      struct ldlm_lock_desc *new, void *data,
+                                      __u32 data_len);
 
 typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); 
 
@@ -120,7 +123,7 @@ struct ldlm_lock {
         ldlm_mode_t           l_granted_mode;
 
         ldlm_completion_callback    l_completion_ast;
-        ldlm_lock_callback    l_blocking_ast;
+        ldlm_blocking_callback    l_blocking_ast;
 
         struct ptlrpc_connection *l_connection;
         struct ptlrpc_client *l_client;
@@ -160,10 +163,10 @@ extern ldlm_res_policy ldlm_res_policy_table [];
 struct ldlm_resource {
         struct ldlm_namespace *lr_namespace;
         struct list_head       lr_hash;
-        struct list_head       lr_rootlink; /* link all root resources in NS */
         struct ldlm_resource  *lr_parent;   /* 0 for a root resource */
         struct list_head       lr_children; /* list head for child resources */
-        struct list_head       lr_childof;  /* part of child list of parent */
+        struct list_head       lr_childof;  /* part of ns_root_list if root res,
+                                             * part of lr_children if child */
 
         struct list_head       lr_granted;
         struct list_head       lr_converting;
@@ -275,7 +278,7 @@ ldlm_lock_create(struct ldlm_namespace *ns,
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie,
                                int cookie_len, int *flags,
                                ldlm_completion_callback completion,
-                               ldlm_lock_callback blocking);
+                               ldlm_blocking_callback blocking);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
@@ -289,6 +292,8 @@ int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn);
 /* resource.c */
 struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local);
 int ldlm_namespace_free(struct ldlm_namespace *ns);
+int ldlm_proc_setup(struct obd_device *obd);
+void ldlm_proc_cleanup(struct obd_device *obd);
 
 /* resource.c - internal */
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
@@ -300,8 +305,10 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock);
 void ldlm_resource_unlink_lock(struct ldlm_lock *lock);
 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc);
-void ldlm_resource_dump(struct ldlm_resource *res);
-int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]);
+void ldlm_dump_all_namespaces(void);
+void ldlm_namespace_dump(struct ldlm_namespace *);
+void ldlm_resource_dump(struct ldlm_resource *);
+int ldlm_lock_change_resource(struct ldlm_lock *, __u64 new_resid[3]);
 
 /* ldlm_request.c */
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags);
@@ -315,7 +322,7 @@ int ldlm_cli_enqueue(struct lustre_handle *conn,
                      ldlm_mode_t mode,
                      int *flags,
                      ldlm_completion_callback completion,
-                     ldlm_lock_callback callback,
+                     ldlm_blocking_callback callback,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh);
@@ -329,7 +336,7 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                           ldlm_mode_t mode,
                           int *flags,
                           ldlm_completion_callback completion,
-                          ldlm_lock_callback callback,
+                          ldlm_blocking_callback callback,
                           void *data,
                           __u32 data_len,
                           struct lustre_handle *lockh);
@@ -338,6 +345,11 @@ int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
 int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags);
 int ldlm_cli_cancel(struct lustre_handle *lockh);
 
+/* mds/handler.c */
+/* This has to be here because recurisve inclusion sucks. */
+int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, __u32 data_len);
+
 #endif /* __KERNEL__ */
 
 /* ioctls for trying requests */
@@ -345,6 +357,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh);
 #define IOC_LDLM_MIN_NR                 40
 
 #define IOC_LDLM_TEST                   _IOWR('f', 40, long)
+#define IOC_LDLM_DUMP                   _IOWR('f', 41, long)
 #define IOC_LDLM_MAX_NR                 41
 
 #endif
index 4e14155..23f25c6 100644 (file)
@@ -165,7 +165,6 @@ typedef uint32_t        obd_count;
 
 #define OBD_FL_INLINEDATA       (0x00000001)
 #define OBD_FL_OBDMDEXISTS      (0x00000002)
-#define OBD_FL_CREATEONOPEN     (0x00000004)
 
 #define OBD_INLINESZ    60
 
@@ -422,7 +421,8 @@ struct lov_desc {
 #define LDLM_ENQUEUE       101
 #define LDLM_CONVERT       102
 #define LDLM_CANCEL        103
-#define LDLM_CALLBACK      104
+#define LDLM_BL_CALLBACK   104
+#define LDLM_CP_CALLBACK   105
 
 #define RES_NAME_SIZE 3
 #define RES_VERSION_SIZE 4
@@ -470,6 +470,7 @@ struct ldlm_request {
 struct ldlm_reply {
         __u32 lock_flags;
         __u64 lock_resource_name[RES_NAME_SIZE];
+        __u32 lock_mode;
         struct lustre_handle lock_handle;
         struct ldlm_extent lock_extent;   /* XXX make this policy 1 &2 */
         __u64  lock_policy_res1;
index 14ff974..02b4c6e 100644 (file)
@@ -42,14 +42,10 @@ struct ll_inode_md {
 
 #define LL_INLINESZ      60
 struct ll_inode_info {
-        int              lli_flags;
-        //        struct obdo     *lli_obdo;
-        struct  lov_stripe_md *lli_smd;
-        //        int              lli_obdo_mdsz;
-        //void            *lli_obdo_md;
-        char            *lli_symlink_name;
-        char             lli_inline[LL_INLINESZ];
-        struct lustre_handle lli_intent_lock_handle;
+        struct lov_stripe_md *lli_smd;
+        char                 *lli_symlink_name;
+        struct lustre_handle  lli_intent_lock_handle;
+        struct semaphore      lli_open_sem;
 };
 
 #define LL_SUPER_MAGIC 0x0BD00BD0
@@ -101,12 +97,6 @@ static inline struct ll_inode_info *ll_i2info(struct inode *inode)
         return (struct ll_inode_info *)&(inode->u.generic_ip);
 }
 
-static inline int ll_has_inline(struct inode *inode)
-{
-        return (ll_i2info(inode)->lli_flags & OBD_FL_INLINEDATA);
-}
-
-
 static inline struct lustre_handle *ll_i2obdconn(struct inode *inode)
 {
         return ll_s2obdconn(inode->i_sb);
index 34c90de..175c52b 100644 (file)
@@ -134,8 +134,6 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct lustre_handle *lockh);
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt);
-int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
-                      void *data, int data_len, struct ptlrpc_request **req);
 int mds_reint(int offset, struct ptlrpc_request *req);
 
 /* mdc/mdc_request.c */
index cc58695..d2553bb 100644 (file)
@@ -12,6 +12,7 @@
 #include <linux/fs.h>
 #include <linux/list.h>
 #include <linux/smp_lock.h>
+#include <linux/proc_fs.h>
 
 #include <linux/lustre_idl.h>
 
index 887cfb9..3c0a9e1 100644 (file)
@@ -83,7 +83,8 @@ extern unsigned long obd_fail_loc;
 #define OBD_FAIL_LDLM_ENQUEUE            0x302
 #define OBD_FAIL_LDLM_CONVERT            0x303
 #define OBD_FAIL_LDLM_CANCEL             0x304
-#define OBD_FAIL_LDLM_CALLBACK           0x305
+#define OBD_FAIL_LDLM_BL_CALLBACK        0x305
+#define OBD_FAIL_LDLM_CP_CALLBACK        0x306
 
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
index c10d560..6eb1e60 100644 (file)
@@ -21,7 +21,7 @@
 
 /* lock types */
 char *ldlm_lockname[] = {
-        [0]      "--",
+        [0] "--",
         [LCK_EX] "EX",
         [LCK_PW] "PW",
         [LCK_PR] "PR",
@@ -30,30 +30,30 @@ char *ldlm_lockname[] = {
         [LCK_NL] "NL"
 };
 char *ldlm_typename[] = {
-        [LDLM_PLAIN]     "PLN",
-        [LDLM_EXTENT]    "EXT",
+        [LDLM_PLAIN] "PLN",
+        [LDLM_EXTENT] "EXT",
         [LDLM_MDSINTENT] "INT"
 };
 
 char *ldlm_it2str(int it)
 {
-        switch (it) { 
+        switch (it) {
         case IT_OPEN:
                 return "open";
         case IT_CREAT:
                 return "creat";
-        case (IT_OPEN|IT_CREAT):
+        case (IT_OPEN | IT_CREAT):
                 return "open|creat";
         case IT_MKDIR:
                 return "mkdir";
         case IT_LINK:
-               return "link";
+                return "link";
         case IT_SYMLINK:
                 return "symlink";
         case IT_UNLINK:
                 return "unlink";
         case IT_RMDIR:
-               return "rmdir";
+                return "rmdir";
         case IT_RENAME:
                 return "rename";
         case IT_RENAME2:
@@ -71,7 +71,7 @@ char *ldlm_it2str(int it)
         case IT_LOOKUP:
                 return "lookup";
         default:
-                CERROR("Unknown intent %d\n", it); 
+                CERROR("Unknown intent %d\n", it);
                 return "UNKNOWN";
         }
 }
@@ -80,20 +80,20 @@ extern kmem_cache_t *ldlm_lock_slab;
 
 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
 
-ldlm_res_compat ldlm_res_compat_table [] = {
+ldlm_res_compat ldlm_res_compat_table[] = {
         [LDLM_PLAIN] ldlm_plain_compat,
         [LDLM_EXTENT] ldlm_extent_compat,
         [LDLM_MDSINTENT] ldlm_plain_compat
 };
 
-ldlm_res_policy ldlm_res_policy_table [] = {
+ldlm_res_policy ldlm_res_policy_table[] = {
         [LDLM_PLAIN] NULL,
         [LDLM_EXTENT] ldlm_extent_policy,
         [LDLM_MDSINTENT] NULL
 };
 
-void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie,
-                              ldlm_mode_t mode, void *data))
+void ldlm_register_intent(int (*arg) (struct ldlm_lock * lock, void *req_cookie,
+                                      ldlm_mode_t mode, void *data))
 {
         ldlm_res_policy_table[LDLM_MDSINTENT] = arg;
 }
@@ -237,8 +237,16 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
         ENTRY;
 
         l_lock(&ns->ns_lock);
-        type = lock->l_resource->lr_type;
+        if (memcmp(new_resid, lock->l_resource->lr_name,
+                   sizeof(lock->l_resource->lr_name)) == 0) {
+                /* Nothing to do */
+                l_unlock(&ns->ns_lock);
+                RETURN(0);
+        }
 
+        type = lock->l_resource->lr_type;
+        if (new_resid[0] == 0)
+                LBUG();
         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
         if (lock->l_resource == NULL) {
                 LBUG();
@@ -266,7 +274,7 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
 
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
 {
-        lockh->addr = (__u64)(unsigned long)lock;
+        lockh->addr = (__u64) (unsigned long)lock;
         lockh->cookie = lock->l_random;
 }
 
@@ -291,7 +299,7 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
 
         retval = LDLM_LOCK_GET(lock);
         EXIT;
- out:
     out:
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         return retval;
 }
@@ -337,7 +345,7 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
 
         w->w_lock = LDLM_LOCK_GET(lock);
         list_add(&w->w_list, lock->l_resource->lr_tmp);
- out:
     out:
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         return;
 }
@@ -384,8 +392,6 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
          * run the callback. */
         if (!lock->l_readers && !lock->l_writers &&
             (lock->l_flags & LDLM_FL_CBPENDING)) {
-                struct lustre_handle lockh;
-
                 if (!lock->l_resource->lr_namespace->ns_client) {
                         CERROR("LDLM_FL_CBPENDING set on non-local lock!\n");
                         LBUG();
@@ -394,21 +400,20 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-                ldlm_lock2handle(lock, &lockh);
-                /* FIXME: -1 is a really, really bad 'desc' */
-                lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
+                /* FIXME: need a real 'desc' here */
+                lock->l_blocking_ast(lock, NULL, lock->l_data,
                                      lock->l_data_len);
         } else
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
-        LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
+        LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
+        LDLM_LOCK_PUT(lock);    /* matches the handle2lock above */
 
         EXIT;
 }
 
 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
-                             struct list_head *queue)
+                                 struct list_head *queue)
 {
         struct list_head *tmp, *pos;
         int rc = 1;
@@ -482,7 +487,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock)
         EXIT;
 }
 
-/* returns a referenced lock or NULL */ 
+/* returns a referenced lock or NULL */
 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                                       struct ldlm_extent *extent)
 {
@@ -517,7 +522,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
  * Returns 1 if it finds an already-existing lock that is compatible; in this
  * case, lockh is filled in with a addref()ed lock 
 */
-int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
+int ldlm_lock_match(struct ldlm_namespace *ns, __u64 * res_id, __u32 type,
                     void *cookie, int cookielen, ldlm_mode_t mode,
                     struct lustre_handle *lockh)
 {
@@ -541,14 +546,14 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
                 GOTO(out, rc = 1);
 
         EXIT;
- out:
     out:
         ldlm_resource_put(res);
         l_unlock(&ns->ns_lock);
 
         if (lock) {
                 ldlm_lock2handle(lock, lockh);
                 if (lock->l_completion_ast)
-                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); 
+                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
         }
         if (rc)
                 LDLM_DEBUG(lock, "matched");
@@ -560,10 +565,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
 /*   Returns a referenced, lock */
 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                    struct lustre_handle *parent_lock_handle,
-                                   __u64 *res_id, __u32 type,
-                                   ldlm_mode_t mode,
-                                   void *data,
-                                   __u32 data_len)
+                                   __u64 * res_id, __u32 type,
+                                   ldlm_mode_t mode, void *data, __u32 data_len)
 {
         struct ldlm_resource *res, *parent_res = NULL;
         struct ldlm_lock *lock, *parent_lock;
@@ -590,11 +593,11 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
 }
 
 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
                                void *cookie, int cookie_len,
                                int *flags,
                                ldlm_completion_callback completion,
-                               ldlm_lock_callback blocking)
+                               ldlm_blocking_callback blocking)
 {
         struct ldlm_resource *res;
         int local;
@@ -632,9 +635,12 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
                 GOTO(out, ELDLM_OK);
         }
 
-        /* If this is a local resource, put it on the appropriate list. */
-        /* FIXME: don't like this: can we call ldlm_resource_unlink_lock? */
-        list_del_init(&lock->l_res_link);
+        /* This distinction between local lock trees is very important; a client
+         * namespace only has information about locks taken by that client, and
+         * thus doesn't have enough information to decide for itself if it can
+         * be granted (below).  In this case, we do exactly what the server
+         * tells us to do, as dictated by the 'flags' */
+        ldlm_resource_unlink_lock(lock);
         if (local) {
                 if (*flags & LDLM_FL_BLOCK_CONV)
                         ldlm_resource_add_lock(res, res->lr_converting.prev,
@@ -657,7 +663,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
                 *flags |= LDLM_FL_BLOCK_WAIT;
                 GOTO(out, ELDLM_OK);
         }
-        if (!ldlm_lock_compat(lock,0)) {
+        if (!ldlm_lock_compat(lock, 0)) {
                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 GOTO(out, ELDLM_OK);
@@ -665,7 +671,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
 
         ldlm_grant_lock(lock);
         EXIT;
- out:
     out:
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
@@ -704,12 +710,11 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_ast_work *w =
                         list_entry(tmp, struct ldlm_ast_work, w_list);
-                struct lustre_handle lockh;
 
-                ldlm_lock2handle(w->w_lock, &lockh);
                 if (w->w_blocking)
                         rc = w->w_lock->l_blocking_ast
-                                (&lockh, &w->w_desc, w->w_data, w->w_datalen);
+                                (w->w_lock, &w->w_desc, w->w_data,
+                                 w->w_datalen);
                 else
                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
                 if (rc)
@@ -797,7 +802,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         granted = 1;
                         /* FIXME: completion handling not with ns_lock held ! */
                         if (lock->l_completion_ast)
-                                lock->l_completion_ast(lock, 0); 
+                                lock->l_completion_ast(lock, 0);
                 }
         } else
                 list_add(&lock->l_res_link, res->lr_converting.prev);
index bc659f9..a1cf9b4 100644 (file)
@@ -21,7 +21,38 @@ extern kmem_cache_t *ldlm_lock_slab;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
+                                    struct ldlm_lock_desc *desc,
+                                    void *data, __u32 data_len)
+{
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        struct ptlrpc_client *cl;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
+        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
+                              &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        memcpy(&body->lock_desc, desc, sizeof(*desc));
+
+        LDLM_DEBUG(lock, "server preparing blocking AST");
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        rc = ptlrpc_queue_wait(req);
+        rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
+
+        RETURN(rc);
+}
+
+static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
@@ -35,7 +66,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         }
 
         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
                               &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -44,6 +75,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
         body->lock_flags = flags;
+        ldlm_lock2desc(lock, &body->lock_desc);
 
         LDLM_DEBUG(lock, "server preparing completion AST");
         req->rq_replen = lustre_msg_size(0, NULL);
@@ -101,7 +133,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
 
         flags = dlm_req->lock_flags;
         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
-                                ldlm_server_completion_ast, ldlm_server_ast);
+                                ldlm_server_completion_ast,
+                                ldlm_server_blocking_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -112,9 +145,11 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
                        sizeof(lock->l_extent));
-        if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
+        if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
                        sizeof(dlm_rep->lock_resource_name));
+                dlm_rep->lock_mode = lock->l_req_mode;
+        }
 
         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
         EXIT;
@@ -213,97 +248,104 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_callback(struct ptlrpc_request *req)
+static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
-        struct ldlm_lock_desc *descp = NULL;
         struct ldlm_lock *lock;
-        __u64 is_blocking_ast = 0;
-        int rc;
+        int rc, do_ast;
         ENTRY;
 
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc) {
-                CERROR("out of memory\n");
+        if (rc)
                 RETURN(-ENOMEM);
-        }
+        rc = ptlrpc_reply(req->rq_svc, req);
+        if (rc)
+                RETURN(rc);
+
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        /* We must send the reply first, so that the thread is free to handle
-         * any requests made in common_callback() */
+        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        if (!lock) {
+                CERROR("blocking callback on lock %Lx - lock disappeared\n",
+                       dlm_req->lock_handle1.addr);
+                RETURN(0);
+        }
+
+        LDLM_DEBUG(lock, "client blocking AST callback handler START");
+
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock->l_flags |= LDLM_FL_CBPENDING;
+        do_ast = (!lock->l_readers && !lock->l_writers);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+        if (do_ast) {
+                LDLM_DEBUG(lock, "already unused, calling "
+                           "callback (%p)", lock->l_blocking_ast);
+                if (lock->l_blocking_ast != NULL) {
+                        lock->l_blocking_ast(lock, &dlm_req->lock_desc,
+                                             lock->l_data, lock->l_data_len);
+                }
+        } else
+                LDLM_DEBUG(lock, "Lock still has references, will be"
+                           " cancelled later");
+
+        LDLM_DEBUG(lock, "client blocking callback handler END");
+        LDLM_LOCK_PUT(lock);
+        RETURN(0);
+}
+
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
+{
+        struct list_head ast_list = LIST_HEAD_INIT(ast_list);
+        struct ldlm_request *dlm_req;
+        struct ldlm_lock *lock;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(-ENOMEM);
         rc = ptlrpc_reply(req->rq_svc, req);
-        if (rc != 0)
+        if (rc)
                 RETURN(rc);
 
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
-                CERROR("callback on lock %Lx - lock disappeared\n",
+                CERROR("completion callback on lock %Lx - lock disappeared\n",
                        dlm_req->lock_handle1.addr);
                 RETURN(0);
         }
 
-        /* check if this is a blocking AST */
-        if (dlm_req->lock_desc.l_req_mode !=
-            dlm_req->lock_desc.l_granted_mode) {
-                descp = &dlm_req->lock_desc;
-                is_blocking_ast = 1;
-        }
+        LDLM_DEBUG(lock, "client completion callback handler START");
 
-        LDLM_DEBUG(lock, "client %s callback handler START",
-                   is_blocking_ast ? "blocked" : "completion");
-
-        if (descp) {
-                int do_ast;
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
-                lock->l_flags |= LDLM_FL_CBPENDING;
-                do_ast = (!lock->l_readers && !lock->l_writers);
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-
-                if (do_ast) {
-                        LDLM_DEBUG(lock, "already unused, calling "
-                                   "callback (%p)", lock->l_blocking_ast);
-                        if (lock->l_blocking_ast != NULL) {
-                                struct lustre_handle lockh;
-                                ldlm_lock2handle(lock, &lockh);
-                                lock->l_blocking_ast(&lockh, descp,
-                                                     lock->l_data,
-                                                     lock->l_data_len);
-                        }
-                } else {
-                        LDLM_DEBUG(lock, "Lock still has references, will be"
-                                   " cancelled later");
-                }
-                LDLM_LOCK_PUT(lock);
-        } else {
-                struct list_head ast_list = LIST_HEAD_INIT(ast_list);
-
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
-                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
-
-                /* If we receive the completion AST before the actual enqueue
-                 * returned, then we might need to switch resources. */
-                ldlm_resource_unlink_lock(lock);
-                if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
-                           lock->l_resource->lr_name,
-                           sizeof(__u64) * RES_NAME_SIZE) != 0) {
-                        ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
-                        LDLM_DEBUG(lock, "completion AST, new resource");
-                }
-                lock->l_resource->lr_tmp = &ast_list;
-                ldlm_grant_lock(lock);
-                lock->l_resource->lr_tmp = NULL;
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                LDLM_LOCK_PUT(lock);
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
 
-                ldlm_run_ast_work(&ast_list);
+        /* If we receive the completion AST before the actual enqueue returned,
+         * then we might need to switch resources. */
+        ldlm_resource_unlink_lock(lock);
+        if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+                   lock->l_resource->lr_name,
+                   sizeof(__u64) * RES_NAME_SIZE) != 0) {
+                ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
+                LDLM_DEBUG(lock, "completion AST, new resource");
         }
+        lock->l_resource->lr_tmp = &ast_list;
+        ldlm_grant_lock(lock);
+        lock->l_resource->lr_tmp = NULL;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
+        LDLM_LOCK_PUT(lock);
 
-        LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
-                          is_blocking_ast ? "blocked" : "completion", lock);
+        ldlm_run_ast_work(&ast_list);
+
+        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
+                          lock);
         RETURN(0);
 }
 
-
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         int rc;
@@ -321,10 +363,15 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 GOTO(out, rc = -EINVAL);
         }
         switch (req->rq_reqmsg->opc) {
-        case LDLM_CALLBACK:
-                CDEBUG(D_INODE, "callback\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
-                rc = ldlm_handle_callback(req);
+        case LDLM_BL_CALLBACK:
+                CDEBUG(D_INODE, "blocking ast\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+                rc = ldlm_handle_bl_callback(req);
+                break;
+        case LDLM_CP_CALLBACK:
+                CDEBUG(D_INODE, "completion ast\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+                rc = ldlm_handle_cp_callback(req);
                 break;
 
         default:
@@ -345,7 +392,7 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
 {
         struct obd_device *obddev = class_conn2obd(conn);
         struct ptlrpc_connection *connection;
-        int err;
+        int err = 0;
         ENTRY;
 
         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
@@ -365,11 +412,13 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
 
         switch (cmd) {
-        case IOC_LDLM_TEST: {
+        case IOC_LDLM_TEST:
                 err = ldlm_test(obddev, connection);
                 CERROR("-- done err %d\n", err);
                 GOTO(out, err);
-        }
+        case IOC_LDLM_DUMP:
+                ldlm_dump_all_namespaces();
+                GOTO(out, err);
         default:
                 GOTO(out, err = -EINVAL);
         }
@@ -387,17 +436,20 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
-        int rc;
-        int i;
+        int rc, i;
         ENTRY;
 
         MOD_INC_USE_COUNT;
-        ldlm->ldlm_service =
-                ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
-                                LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
+        rc = ldlm_proc_setup(obddev);
+        if (rc != 0)
+                GOTO(out_dec, rc);
+
+        ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
+                                             LDLM_REPLY_PORTAL, "self",
+                                             ldlm_callback_handler);
         if (!ldlm->ldlm_service) {
                 LBUG();
-                GOTO(out_dec, rc = -ENOMEM);
+                GOTO(out_proc, rc = -ENOMEM);
         }
 
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
@@ -413,10 +465,12 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
 
         RETURN(0);
 
-out_thread:
+ out_thread:
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
-out_dec:
+ out_proc:
+        ldlm_proc_cleanup(obddev);
+ out_dec:
         MOD_DEC_USE_COUNT;
         return rc;
 }
@@ -428,6 +482,7 @@ static int ldlm_cleanup(struct obd_device *obddev)
 
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
+        ldlm_proc_cleanup(obddev);
 
         MOD_DEC_USE_COUNT;
         RETURN(0);
@@ -441,7 +496,6 @@ struct obd_ops ldlm_obd_ops = {
         o_disconnect:  class_disconnect
 };
 
-
 static int __init ldlm_init(void)
 {
         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
@@ -483,6 +537,7 @@ EXPORT_SYMBOL(ldlm_unregister_intent);
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock2handle);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_addref);
 EXPORT_SYMBOL(ldlm_lock_decref);
index 5f21751..2b83255 100644 (file)
@@ -15,7 +15,6 @@
 
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
 {
-
         ENTRY;
 
         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
@@ -26,19 +25,19 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
                            " sleeping");
                 ldlm_lock_dump(lock);
                 ldlm_reprocess_all(lock->l_resource);
-                wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
+                wait_event(lock->l_waitq, (lock->l_req_mode ==
+                                           lock->l_granted_mode));
                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
-        } else if (flags == LDLM_FL_WAIT_NOREPROC) { 
-                wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
-        } else if (flags == 0) { 
+        } else if (flags == LDLM_FL_WAIT_NOREPROC) {
+                wait_event(lock->l_waitq, (lock->l_req_mode ==
+                                           lock->l_granted_mode));
+        } else if (flags == 0) {
                 wake_up(&lock->l_waitq);
-
         }
 
         RETURN(0);
 }
 
-
 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                                   struct lustre_handle *parent_lockh,
                                   __u64 *res_id,
@@ -47,16 +46,16 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                                   ldlm_mode_t mode,
                                   int *flags,
                                   ldlm_completion_callback completion,
-                                  ldlm_lock_callback callback,
+                                  ldlm_blocking_callback blocking,
                                   void *data,
                                   __u32 data_len,
                                   struct lustre_handle *lockh)
 {
-        struct ldlm_lock *lock; 
-        int err; 
+        struct ldlm_lock *lock;
+        int err;
 
-        if (ns->ns_client) { 
-                CERROR("Trying to cancel local lock\n"); 
+        if (ns->ns_client) {
+                CERROR("Trying to cancel local lock\n");
                 LBUG();
         }
 
@@ -67,9 +66,10 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
 
         ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
-        lock->l_connh = NULL; 
+        lock->l_connh = NULL;
 
-        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback);
+        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
+                                blocking);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -78,10 +78,11 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         if ((*flags) & LDLM_FL_LOCK_CHANGED)
                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
 
-        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock);
+        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
+                          lock);
 
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, *flags); 
+                lock->l_completion_ast(lock, *flags);
 
         LDLM_DEBUG(lock, "client-side local enqueue END");
         EXIT;
@@ -91,8 +92,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         return err;
 }
 
-
-int ldlm_cli_enqueue(struct lustre_handle *connh, 
+int ldlm_cli_enqueue(struct lustre_handle *connh,
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -102,7 +102,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                      ldlm_mode_t mode,
                      int *flags,
                      ldlm_completion_callback completion,
-                     ldlm_lock_callback callback,
+                     ldlm_blocking_callback blocking,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh)
@@ -114,10 +114,11 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         int rc, size = sizeof(*body), req_passed_in = 1;
         ENTRY;
 
-        if (connh == NULL) 
-                return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, 
-                                              type, cookie, cookielen, mode, 
-                                              flags, completion, callback, data, data_len, lockh);
+        if (connh == NULL)
+                return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
+                                              type, cookie, cookielen, mode,
+                                              flags, completion, blocking, data,
+                                              data_len, lockh);
         connection = client_conn2cli(connh)->cl_conn;
 
         *flags = 0;
@@ -157,7 +158,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 size = sizeof(*reply);
                 req->rq_replen = lustre_msg_size(1, &size);
         }
-        lock->l_connh = connh; 
+        lock->l_connh = connh;
         lock->l_connection = ptlrpc_connection_addref(connection);
         lock->l_client = client_conn2cli(connh)->cl_client;
 
@@ -191,27 +192,38 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         /* If enqueue returned a blocked lock but the completion handler has
          * already run, then it fixed up the resource and we don't need to do it
          * again. */
-        if ((*flags) & LDLM_FL_LOCK_CHANGED &&
-            lock->l_req_mode != lock->l_granted_mode) {
-                CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
-                       "%ld\n", (long)reply->lock_resource_name[0],
-                       (long)lock->l_resource->lr_name[0]);
-
-                ldlm_lock_change_resource(lock, reply->lock_resource_name);
-                if (lock->l_resource == NULL) {
-                        LBUG();
-                        RETURN(-ENOMEM);
+        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+                int newmode = reply->lock_mode;
+                if (newmode && newmode != lock->l_req_mode) {
+                        LDLM_DEBUG(lock, "server returned different mode %s",
+                                   ldlm_lockname[newmode]);
+                        lock->l_req_mode = newmode;
+                }
+
+                if (reply->lock_resource_name[0] !=
+                    lock->l_resource->lr_name[0]) {
+                        CDEBUG(D_INFO, "remote intent success, locking %ld "
+                               "instead of %ld\n",
+                               (long)reply->lock_resource_name[0],
+                               (long)lock->l_resource->lr_name[0]);
+
+                        ldlm_lock_change_resource(lock,
+                                                  reply->lock_resource_name);
+                        if (lock->l_resource == NULL) {
+                                LBUG();
+                                RETURN(-ENOMEM);
+                        }
+                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
                 }
-                LDLM_DEBUG(lock, "client-side enqueue, new resource");
         }
 
         if (!req_passed_in)
                 ptlrpc_free_req(req);
 
         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
-                               callback);
+                               blocking);
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, *flags); 
+                lock->l_completion_ast(lock, *flags);
 
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
@@ -221,7 +233,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         return rc;
 }
 
-int ldlm_match_or_enqueue(struct lustre_handle *connh, 
+int ldlm_match_or_enqueue(struct lustre_handle *connh,
                           struct ptlrpc_request *req,
                           struct ldlm_namespace *ns,
                           struct lustre_handle *parent_lock_handle,
@@ -231,7 +243,7 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                           ldlm_mode_t mode,
                           int *flags,
                           ldlm_completion_callback completion,
-                          ldlm_lock_callback callback,
+                          ldlm_blocking_callback blocking,
                           void *data,
                           __u32 data_len,
                           struct lustre_handle *lockh)
@@ -242,8 +254,8 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
         if (rc == 0) {
                 rc = ldlm_cli_enqueue(connh, req, ns,
                                       parent_lock_handle, res_id, type, cookie,
-                                      cookielen, mode, flags, completion, callback, data,
-                                      data_len, lockh);
+                                      cookielen, mode, flags, completion,
+                                      blocking, data, data_len, lockh);
                 if (rc != ELDLM_OK)
                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
                 RETURN(rc);
@@ -251,66 +263,17 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                 RETURN(0);
 }
 
-int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
-                    void *data, __u32 data_len)
+static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
+                                  int *flags)
 {
-        struct ldlm_lock *lock;
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        struct ptlrpc_client *cl;
-        int rc = 0, size = sizeof(*body);
-        ENTRY;
 
-        lock = ldlm_handle2lock(lockh);
-        if (lock == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
-                              &size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        memcpy(&body->lock_handle1, &lock->l_remote_handle,
-               sizeof(body->lock_handle1));
-
-        if (desc == NULL) {
-                CDEBUG(D_NET, "Sending granted AST\n");
-                ldlm_lock2desc(lock, &body->lock_desc);
-        } else {
-                CDEBUG(D_NET, "Sending blocked AST\n");
-                memcpy(&body->lock_desc, desc, sizeof(*desc));
-        }
-
-        LDLM_DEBUG(lock, "server preparing %s AST",
-                   desc == 0 ? "completion" : "blocked");
-
-        req->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
-
-        EXIT;
- out:
-        LDLM_LOCK_PUT(lock);
-        return rc;
-}
-
-
-
-static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags)
-{
-
-        if (lock->l_resource->lr_namespace->ns_client) { 
-                CERROR("Trying to cancel local lock\n"); 
+        if (lock->l_resource->lr_namespace->ns_client) {
+                CERROR("Trying to cancel local lock\n");
                 LBUG();
         }
         LDLM_DEBUG(lock, "client-side local convert");
 
-        ldlm_lock_convert(lock, new_mode, flags); 
+        ldlm_lock_convert(lock, new_mode, flags);
         ldlm_reprocess_all(lock->l_resource);
 
         LDLM_DEBUG(lock, "client-side local convert handler END");
@@ -335,7 +298,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
                 RETURN(-EINVAL);
         }
         *flags = 0;
-        connh = lock->l_connh; 
+        connh = lock->l_connh;
 
         if (!connh)
                 return ldlm_cli_convert_local(lock, new_mode, flags);
@@ -368,7 +331,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         /* Go to sleep until the lock is granted. */
         /* FIXME: or cancelled. */
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); 
+                lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
         EXIT;
  out:
         LDLM_LOCK_PUT(lock);
@@ -384,7 +347,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
-        lock = ldlm_handle2lock(lockh); 
+        lock = ldlm_handle2lock(lockh);
         if (!lock) {
                 /* It's possible that the decref that we did just before this
                  * cancel was the last reader/writer, and caused a cancel before
@@ -395,33 +358,34 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 RETURN(-EINVAL);
         }
 
-        if (lock->l_connh) { 
+        if (lock->l_connh) {
                 LDLM_DEBUG(lock, "client-side cancel");
-                req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL);
+                req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size,
+                                       NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
-                
+
                 body = lustre_msg_buf(req->rq_reqmsg, 0);
                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
                        sizeof(body->lock_handle1));
-                
+
                 req->rq_replen = lustre_msg_size(0, NULL);
-                
+
                 rc = ptlrpc_queue_wait(req);
                 rc = ptlrpc_check_status(req, rc);
                 ptlrpc_free_req(req);
                 if (rc != ELDLM_OK)
                         GOTO(out, rc);
-                
+
                 ldlm_lock_cancel(lock);
-        } else { 
+        } else {
                 LDLM_DEBUG(lock, "client-side local cancel");
-                if (lock->l_resource->lr_namespace->ns_client) { 
-                        CERROR("Trying to cancel local lock\n"); 
+                if (lock->l_resource->lr_namespace->ns_client) {
+                        CERROR("Trying to cancel local lock\n");
                         LBUG();
                 }
-                ldlm_lock_cancel(lock); 
-                ldlm_reprocess_all(lock->l_resource); 
+                ldlm_lock_cancel(lock);
+                ldlm_reprocess_all(lock->l_resource);
                 LDLM_DEBUG(lock, "client-side local cancel handler END");
         }
 
index fcfca26..fec3c0a 100644 (file)
 
 kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
 
+spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED;
+struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list);
+static struct proc_dir_entry *ldlm_ns_proc_dir = NULL;
+
+int ldlm_proc_setup(struct obd_device *obd)
+{
+        ENTRY;
+
+        if (obd->obd_proc_entry == NULL)
+                RETURN(-EINVAL);
+
+        ldlm_ns_proc_dir = proc_mkdir("namespaces", obd->obd_proc_entry);
+        if (ldlm_ns_proc_dir == NULL) {
+                CERROR("Couldn't create /proc/lustre/ldlm/namespaces\n");
+                RETURN(-EPERM);
+        }
+        RETURN(0);
+}
+
+void ldlm_proc_cleanup(struct obd_device *obd)
+{
+        proc_lustre_remove_obd_entry("namespaces", obd);
+}
+
 struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
 {
         struct ldlm_namespace *ns = NULL;
@@ -50,6 +74,14 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
              bucket--)
                 INIT_LIST_HEAD(bucket);
+
+        spin_lock(&ldlm_namespace_lock);
+        list_add(&ns->ns_list_chain, &ldlm_namespace_list);
+        ns->ns_proc_dir = proc_mkdir(ns->ns_name, ldlm_ns_proc_dir);
+        if (ns->ns_proc_dir == NULL)
+                CERROR("Unable to create proc directory for namespace.\n");
+        spin_unlock(&ldlm_namespace_lock);
+
         RETURN(ns);
 
  out:
@@ -104,6 +136,11 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
         if (!ns)
                 RETURN(ELDLM_OK);
 
+        spin_lock(&ldlm_namespace_lock);
+        list_del(&ns->ns_list_chain);
+        remove_proc_entry(ns->ns_name, ldlm_ns_proc_dir);
+        spin_unlock(&ldlm_namespace_lock);
+
         l_lock(&ns->ns_lock);
 
         for (i = 0; i < RES_HASH_SIZE; i++) {
@@ -201,10 +238,9 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
         list_add(&res->lr_hash, bucket);
 
-        if (parent == NULL) {
-                res->lr_parent = res;
-                list_add(&res->lr_rootlink, &ns->ns_root_list);
-        } else {
+        if (parent == NULL)
+                list_add(&res->lr_childof, &ns->ns_root_list);
+        else {
                 res->lr_parent = parent;
                 list_add(&res->lr_childof, &parent->lr_children);
         }
@@ -288,7 +324,6 @@ int ldlm_resource_put(struct ldlm_resource *res)
 
                 ns->ns_refcount--;
                 list_del(&res->lr_hash);
-                list_del(&res->lr_rootlink);
                 list_del(&res->lr_childof);
 
                 kmem_cache_free(ldlm_resource_slab, res);
@@ -333,6 +368,40 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
         memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
 }
 
+void ldlm_dump_all_namespaces(void)
+{
+        struct list_head *tmp;
+
+        spin_lock(&ldlm_namespace_lock);
+
+        list_for_each(tmp, &ldlm_namespace_list) {
+                struct ldlm_namespace *ns;
+                ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain);
+                ldlm_namespace_dump(ns);
+        }
+
+        spin_unlock(&ldlm_namespace_lock);
+}
+
+void ldlm_namespace_dump(struct ldlm_namespace *ns)
+{
+        struct list_head *tmp;
+
+        l_lock(&ns->ns_lock);
+        CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name,
+               ns->ns_refcount, ns->ns_client);
+
+        list_for_each(tmp, &ns->ns_root_list) {
+                struct ldlm_resource *res;
+                res = list_entry(tmp, struct ldlm_resource, lr_childof);
+
+                /* Once we have resources with children, this should really dump
+                 * them recursively. */
+                ldlm_resource_dump(res);
+        }
+        l_unlock(&ns->ns_lock);
+}
+
 void ldlm_resource_dump(struct ldlm_resource *res)
 {
         struct list_head *tmp;
index 6b5daba..c0c8f13 100644 (file)
@@ -27,21 +27,13 @@ static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED;
 static struct list_head ctl_threads;
 static int regression_running = 0;
 
-static int ldlm_test_callback(struct lustre_handle *lockh,
-                              struct ldlm_lock_desc *new,
-                              void *data, __u32 data_len)
+static int ldlm_blocking_ast(struct ldlm_lock *lock,
+                             struct ldlm_lock_desc *new,
+                             void *data, __u32 data_len)
 {
-        struct ldlm_lock *lock;
         ENTRY;
-
-        lock = ldlm_handle2lock(lockh);
-        if (lock == NULL) {
-                CERROR("invalid handle in callback\n");
-                RETURN(0);
-        }
-
-        printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new);
-        return 0;
+        CERROR("ldlm_blocking_ast: lock=%p, new=%p\n", lock, new);
+        RETURN(0);
 }
 
 int ldlm_test_basics(struct obd_device *obddev)
@@ -61,7 +53,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         if (lock1 == NULL)
                 LBUG();
         err = ldlm_lock_enqueue(lock1, NULL, 0, &flags,
-                                ldlm_completion_ast, ldlm_test_callback);
+                                ldlm_completion_ast, ldlm_blocking_ast);
         if (err != ELDLM_OK)
                 LBUG();
 
@@ -69,7 +61,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         if (lock == NULL)
                 LBUG();
         err = ldlm_lock_enqueue(lock, NULL, 0, &flags,
-                                ldlm_completion_ast, ldlm_test_callback);
+                                ldlm_completion_ast, ldlm_blocking_ast);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_BLOCK_GRANTED))
@@ -169,9 +161,9 @@ static int ldlm_test_network(struct obd_device *obddev,
 
         /* FIXME: this needs a connh as 3rd paramter, before it will work */
 
-        err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id, LDLM_EXTENT,
-                               &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, NULL, 0,
-                               &lockh1);
+        err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id,
+                               LDLM_EXTENT, &ext, sizeof(ext), LCK_PR, &flags,
+                               NULL, NULL, NULL, 0, &lockh1);
         CERROR("ldlm_cli_enqueue: %d\n", err);
         if (err == ELDLM_OK)
                 ldlm_lock_decref(&lockh1, LCK_PR);
@@ -221,8 +213,8 @@ static int ldlm_test_main(void *data)
 
                 rc = ldlm_cli_enqueue(NULL, NULL, ns, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, ldlm_completion_ast, ldlm_test_callback, NULL, 0,
-                                      &lockh);
+                                      &flags, ldlm_completion_ast,
+                                      ldlm_blocking_ast, NULL, 0, &lockh);
                 if (rc < 0) {
                         CERROR("ldlm_cli_enqueue: %d\n", rc);
                         LBUG();
index 4c4a016..27cd3ef 100644 (file)
@@ -38,7 +38,7 @@
 #include <linux/lustre_net.h>
 #include <linux/lustre_dlm.h>
 
-struct client_obd *client_conn2cli(struct lustre_handle *conn) 
+struct client_obd *client_conn2cli(struct lustre_handle *conn)
 {
         struct obd_export *export = class_conn2export(conn);
         if (!export)
@@ -85,7 +85,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
 
         mdc->cl_conn = ptlrpc_uuid_to_connection(server_uuid);
         if (!mdc->cl_conn)
-                RETURN(-ENOENT); 
+                RETURN(-ENOENT);
 
         OBD_ALLOC(mdc->cl_client, sizeof(*mdc->cl_client));
         if (mdc->cl_client == NULL)
@@ -133,36 +133,37 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd)
 {
         struct client_obd *cli = &obd->u.cli;
         struct ptlrpc_request *request;
-        int rc, size[] = {sizeof(cli->cl_target_uuid), 
+        int rc, size[] = {sizeof(cli->cl_target_uuid),
                           sizeof(obd->obd_uuid) };
         char *tmp[] = {cli->cl_target_uuid, obd->obd_uuid};
         int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_CONNECT : MDS_CONNECT;
 
         ENTRY;
-        down(&cli->cl_sem); 
+        down(&cli->cl_sem);
         MOD_INC_USE_COUNT;
         rc = class_connect(conn, obd);
-        if (!rc) 
+        if (!rc)
                 cli->cl_conn_count++;
-        else { 
+        else {
                 MOD_DEC_USE_COUNT;
                 up(&cli->cl_sem);
                 RETURN(rc);
         }
-                
-        if (cli->cl_conn_count > 1) { 
+
+        if (cli->cl_conn_count > 1) {
                 up(&cli->cl_sem);
                 RETURN(0);
         }
 
-        obd->obd_namespace =
-                ldlm_namespace_new("cli", LDLM_NAMESPACE_CLIENT);
-        if (obd->obd_namespace == NULL) { 
+        obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
+                                                LDLM_NAMESPACE_CLIENT);
+        if (obd->obd_namespace == NULL) {
                 up(&cli->cl_sem);
                 RETURN(-ENOMEM);
         }
 
-        request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 2, size, tmp);
+        request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 2, size,
+                                  tmp);
         if (!request)
                 GOTO(out_disco, -ENOMEM);
 
@@ -202,18 +203,18 @@ int client_obd_disconnect(struct lustre_handle *conn)
         ENTRY;
 
         down(&obd->u.cli.cl_sem);
-        if (!obd->u.cli.cl_conn_count) { 
-                CERROR("disconnecting disconnected device (%s)\n", 
-                       obd->obd_name); 
+        if (!obd->u.cli.cl_conn_count) {
+                CERROR("disconnecting disconnected device (%s)\n",
+                       obd->obd_name);
                 RETURN(0);
         }
 
-        obd->u.cli.cl_conn_count--; 
+        obd->u.cli.cl_conn_count--;
         if (obd->u.cli.cl_conn_count)
                 GOTO(class_only, 0);
 
         ldlm_namespace_free(obd->obd_namespace);
-        obd->obd_namespace = NULL; 
+        obd->obd_namespace = NULL;
         request = ptlrpc_prep_req2(conn, rq_opc, 0, NULL, NULL);
         if (!request)
                 RETURN(-ENOMEM);
@@ -221,14 +222,14 @@ int client_obd_disconnect(struct lustre_handle *conn)
         request->rq_replen = lustre_msg_size(0, NULL);
 
         rc = ptlrpc_queue_wait(request);
-        if (rc) 
+        if (rc)
                 GOTO(out, rc);
  class_only:
         rc = class_disconnect(conn);
         if (!rc)
                 MOD_DEC_USE_COUNT;
  out:
-        if (request) 
+        if (request)
                 ptlrpc_free_req(request);
         up(&obd->u.cli.cl_sem);
         RETURN(rc);
index 22c198a..19ff845 100644 (file)
@@ -48,23 +48,32 @@ static int ll_file_open(struct inode *inode, struct file *file)
         /*  delayed create of object (intent created inode) */
         /*  XXX object needs to be cleaned up if mdc_open fails */
         /*  XXX error handling appropriate here? */
-        if (lli->lli_smd == NULL || lli->lli_smd->lmd_object_id == 0) {
+        if (lli->lli_smd == NULL) {
                 struct client_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb));
                 struct inode * inode = file->f_dentry->d_inode;
 
-                lli->lli_smd = NULL; 
-                oa =  obdo_alloc();
-                if (!oa) { 
-                        RETURN(-ENOMEM);
+                down(&lli->lli_open_sem);
+                /* Check to see if we lost the race */
+                if (lli->lli_smd == NULL) {
+                        oa = obdo_alloc();
+                        if (!oa) {
+                                up(&lli->lli_open_sem);
+                                RETURN(-ENOMEM);
+                        }
+                        oa->o_mode = S_IFREG | 0600;
+                        oa->o_easize = mdc->cl_max_mdsize;
+                        oa->o_valid = OBD_MD_FLMODE | OBD_MD_FLEASIZE;
+                        rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd);
+                        if (rc) {
+                                obdo_free(oa);
+                                up(&lli->lli_open_sem);
+                                RETURN(rc);
+                        }
+                        md = lli->lli_smd;
                 }
-                oa->o_mode = S_IFREG | 0600;
-                oa->o_easize = mdc->cl_max_mdsize;
-                oa->o_valid = OBD_MD_FLMODE | OBD_MD_FLEASIZE;
-                rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd);
-                if (rc)
-                        RETURN(rc);
-                md = lli->lli_smd;
-                lli->lli_flags &= ~OBD_FL_CREATEONOPEN;
+                if (lli->lli_smd && lli->lli_smd->lmd_object_id == 0)
+                        LBUG();
+                up(&lli->lli_open_sem);
         }
 
         fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
index 7a43976..5b1cf5f 100644 (file)
@@ -169,14 +169,17 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
                         //else
                         //        GOTO(err, it->it_status);
                 } else if (it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK |
-                                        IT_RMDIR | IT_SETATTR | IT_LOOKUP |
-                                        IT_OPEN)) {
+                                        IT_RMDIR | IT_SETATTR | IT_LOOKUP)) {
                         /* For remove/check, we want the lookup to succeed */
                         request = (struct ptlrpc_request *)it->it_data;
                         if (it->it_status)
                                 GOTO(negative, NULL);
                         //else
                         //        GOTO(err, it->it_status);
+                } else if (it->it_op == IT_OPEN) {
+                        request = (struct ptlrpc_request *)it->it_data;
+                        if (it->it_status && it->it_status != -EEXIST)
+                                GOTO(negative, NULL);
                 } else if (it->it_op == IT_RENAME2) {
                         /* Set below to be a dentry from the IT_RENAME op */
                         inode = ((struct dentry *)(it->it_data))->d_inode;
@@ -224,7 +227,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
  negative:
         dentry->d_op = &ll_d_ops;
         d_add(dentry, inode);
-        if (it->it_op == IT_LOOKUP || (it->it_op == IT_OPEN && it->it_status)) {
+        if (it->it_op == IT_LOOKUP) {
                 ll_intent_release(dentry);
                 ptlrpc_free_req(request);
         }
@@ -396,7 +399,6 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode)
 
         if (dentry->d_it->it_disposition) {
                 ii = ll_i2info(inode);
-                ii->lli_flags |= OBD_FL_CREATEONOPEN;
                 memcpy(&ii->lli_intent_lock_handle,
                        dentry->d_it->it_lock_handle,
                        sizeof(struct lustre_handle));
index 315c2d3..f49ae28 100644 (file)
@@ -397,43 +397,47 @@ inline int ll_stripe_md_size(struct super_block *sb)
         return mdc->cl_max_mdsize;
 }
 
-static void ll_to_inode(struct inode *dst, struct ll_inode_md *md)
+static void ll_read_inode2(struct inode *inode, void *opaque)
 {
+        struct ll_inode_md *md = opaque;
         struct mds_body *body = md->body;
-        struct ll_inode_info *ii = ll_i2info(dst);
+        struct ll_inode_info *ii = ll_i2info(inode);
+        ENTRY;
+
+        sema_init(&ii->lli_open_sem, 1);
 
         /* core attributes first */
         if (body->valid & OBD_MD_FLID)
-                dst->i_ino = body->ino;
+                inode->i_ino = body->ino;
         if (body->valid & OBD_MD_FLATIME)
-                dst->i_atime = body->atime;
+                inode->i_atime = body->atime;
         if (body->valid & OBD_MD_FLMTIME)
-                dst->i_mtime = body->mtime;
+                inode->i_mtime = body->mtime;
         if (body->valid & OBD_MD_FLCTIME)
-                dst->i_ctime = body->ctime;
+                inode->i_ctime = body->ctime;
         if (body->valid & OBD_MD_FLSIZE)
-                dst->i_size = body->size;
+                inode->i_size = body->size;
         if (body->valid & OBD_MD_FLMODE)
-                dst->i_mode = body->mode;
+                inode->i_mode = body->mode;
         if (body->valid & OBD_MD_FLUID)
-                dst->i_uid = body->uid;
+                inode->i_uid = body->uid;
         if (body->valid & OBD_MD_FLGID)
-                dst->i_gid = body->gid;
+                inode->i_gid = body->gid;
         if (body->valid & OBD_MD_FLFLAGS)
-                dst->i_flags = body->flags;
+                inode->i_flags = body->flags;
         if (body->valid & OBD_MD_FLNLINK)
-                dst->i_nlink = body->nlink;
+                inode->i_nlink = body->nlink;
         if (body->valid & OBD_MD_FLGENER)
-                dst->i_generation = body->generation;
+                inode->i_generation = body->generation;
         if (body->valid & OBD_MD_FLRDEV)
-                dst->i_rdev = body->extra;
+                inode->i_rdev = body->extra;
         //if (body->valid & OBD_MD_FLEASIZE)
         if (md && md->md && md->md->lmd_stripe_count) { 
                 struct lov_stripe_md *smd = md->md;
-                int size = ll_stripe_md_size(dst->i_sb);
+                int size = ll_stripe_md_size(inode->i_sb);
                 if (md->md->lmd_easize != size) { 
                         CERROR("Striping metadata size error %ld\n",
-                               dst->i_ino); 
+                               inode->i_ino); 
                         LBUG();
                 }
                 OBD_ALLOC(ii->lli_smd, size);
@@ -443,14 +447,6 @@ static void ll_to_inode(struct inode *dst, struct ll_inode_md *md)
                 }
                 memcpy(ii->lli_smd, smd, size);
         }
-} /* ll_to_inode */
-
-static void ll_read_inode2(struct inode *inode, void *opaque)
-{
-        struct ll_inode_md *md = opaque;
-
-        ENTRY;
-        ll_to_inode(inode, md);
 
         /* OIDEBUG(inode); */
 
@@ -468,8 +464,7 @@ static void ll_read_inode2(struct inode *inode, void *opaque)
                 inode->i_op = &ll_fast_symlink_inode_operations;
                 EXIT;
         } else {
-                init_special_inode(inode, inode->i_mode,
-                                   ((int *)ll_i2info(inode)->lli_inline)[0]);
+                init_special_inode(inode, inode->i_mode, inode->i_rdev);
                 EXIT;
         }
 
index e6de13e..bf725c2 100644 (file)
@@ -187,19 +187,14 @@ int mdc_getattr(struct lustre_handle *conn,
         return rc;
 }
 
-static int mdc_lock_callback(struct lustre_handle *lockh,
-                             struct ldlm_lock_desc *desc, void *data,
-                             int data_len, struct ptlrpc_request **req)
+static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                            void *data, __u32 data_len)
 {
         int rc;
         struct inode *inode = data;
+        struct lustre_handle lockh;
         ENTRY;
 
-        if (desc == NULL) {
-                /* Completion AST.  Do nothing. */
-                RETURN(0);
-        }
-
         if (data_len != sizeof(*inode)) {
                 CERROR("data_len should be %d, but is %d\n", sizeof(*inode),
                        data_len);
@@ -215,7 +210,8 @@ static int mdc_lock_callback(struct lustre_handle *lockh,
                 invalidate_inode_pages(inode);
         }
 
-        rc = ldlm_cli_cancel(lockh);
+        ldlm_lock2handle(lock, &lockh);
+        rc = ldlm_cli_cancel(&lockh);
         if (rc < 0) {
                 CERROR("ldlm_cli_cancel: %d\n", rc);
                 LBUG();
@@ -340,10 +336,15 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 RETURN(-EINVAL);
         }
 #warning FIXME: the data here needs to be different if a lock was granted for a different inode
-        rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id, lock_type,
-                              NULL, 0, lock_mode, &flags, ldlm_completion_ast,
-                              (void *)mdc_lock_callback, data, datalen, lockh);
-        if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) {
+        rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id,
+                              lock_type, NULL, 0, lock_mode, &flags,
+                              ldlm_completion_ast, mdc_blocking_ast, data,
+                              datalen, lockh);
+        if (rc == -ENOENT) {
+                /* This can go when we're sure that this can never happen */
+                LBUG();
+        }
+        if (rc == ELDLM_LOCK_ABORTED) {
                 lock_mode = 0;
                 memset(lockh, 0, sizeof(*lockh));
                 /* rc = 0 */
index 1d66464..bdede83 100644 (file)
@@ -94,9 +94,9 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 }
 
 /* 'dir' is a inode for which a lock has already been taken */
-struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir,
-                                      struct vfsmount **mnt, char *name,
-                                      int namelen, int lock_mode,
+struct dentry *mds_name2locked_dentry(struct obd_device *obd,
+                                      struct dentry *dir, struct vfsmount **mnt,
+                                      char *name, int namelen, int lock_mode,
                                       struct lustre_handle *lockh,
                                       int dir_lock_mode)
 {
@@ -124,8 +124,8 @@ struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir
         res_id[0] = dchild->d_inode->i_ino;
         rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                   &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
-                                   0, lockh);
+                                   &flags, ldlm_completion_ast,
+                                   mds_blocking_ast, NULL, 0, lockh);
         if (rc != ELDLM_OK) {
                 l_dput(dchild);
                 up(&dir->d_inode->i_sem);
@@ -151,8 +151,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         res_id[0] = de->d_inode->i_ino;
         rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                   &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
-                                   0, lockh);
+                                   &flags, ldlm_completion_ast,
+                                   mds_blocking_ast, NULL, 0, lockh);
         if (rc != ELDLM_OK) {
                 l_dput(de);
                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
@@ -349,17 +349,14 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
-                      void *data, int data_len, struct ptlrpc_request **reqp)
+int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, __u32 data_len)
 {
+        struct lustre_handle lockh;
         ENTRY;
 
-        if (desc == NULL) {
-                /* Completion AST.  Do nothing */
-                RETURN(0);
-        }
-
-        if (ldlm_cli_cancel(lockh) < 0)
+        ldlm_lock2handle(lock, &lockh);
+        if (ldlm_cli_cancel(&lockh) < 0)
                 LBUG();
         RETURN(0);
 }
@@ -412,8 +409,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback,
-                                      NULL, 0, &lockh);
+                                      &flags, ldlm_completion_ast,
+                                      mds_blocking_ast, NULL, 0, &lockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_create_de, rc = -EIO);
@@ -892,14 +889,13 @@ int mds_handle(struct ptlrpc_request *req)
                 if (rc)
                         break;
                 RETURN(0);
-        case LDLM_CALLBACK:
+        case LDLM_BL_CALLBACK:
+        case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
                 CERROR("callbacks should not happen on MDS\n");
                 LBUG();
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                 break;
-
-
         default:
                 rc = ptlrpc_error(req->rq_svc, req);
                 RETURN(rc);
@@ -1142,7 +1138,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
 
                 it->opc = NTOH__u64(it->opc);
 
-                LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
+                LDLM_DEBUG(lock, "intent policy, opc: %s",
+                           ldlm_it2str(it->opc));
 
                 /* prepare reply */
                 switch((long)it->opc) {
@@ -1183,8 +1180,15 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
 
                 /* execute policy */
                 switch ((long)it->opc) {
-                case IT_CREAT:
                 case IT_CREAT|IT_OPEN:
+                        rc = mds_reint(2, req);
+                        if (rc || (req->rq_status != 0 &&
+                                   req->rq_status != -EEXIST)) {
+                                rep->lock_policy_res2 = req->rq_status;
+                                RETURN(ELDLM_LOCK_ABORTED);
+                        }
+                        break;
+                case IT_CREAT:
                 case IT_LINK:
                 case IT_MKDIR:
                 case IT_MKNOD:
@@ -1259,7 +1263,7 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
 
 
 extern int mds_iocontrol(long cmd, struct lustre_handle *conn, 
-                          int len, void *karg, void *uarg);
+                         int len, void *karg, void *uarg);
 
 /* use obd ops to offer management infrastructure */
 static struct obd_ops mds_obd_ops = {
index f9cfb36..f9ee992 100644 (file)
@@ -98,12 +98,10 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle,
 }
 
 static int mds_extN_set_md(struct inode *inode, void *handle,
-                             struct lov_stripe_md *md)
+                           struct lov_stripe_md *md)
 {
         int rc;
 
-
-
         lock_kernel();
         down(&inode->i_sem);
         if (md == NULL)
@@ -118,9 +116,11 @@ static int mds_extN_set_md(struct inode *inode, void *handle,
         up(&inode->i_sem);
         unlock_kernel();
 
-        if (rc)
-                CERROR("error adding objectid %Ld to inode %ld\n",
-                       (unsigned long long)md->lmd_object_id, inode->i_ino);
+        if (rc) {
+                CERROR("error adding objectid %Ld to inode %ld: %d\n",
+                       (unsigned long long)md->lmd_object_id, inode->i_ino, rc);
+                LBUG();
+        }
         return rc;
 }
 
index e18be7f..c63ebc6 100644 (file)
@@ -128,17 +128,17 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         }
 
         EXIT;
- out_unlock:
     out_unlock:
         unlock_kernel();
- out_setattr_de:
     out_setattr_de:
         l_dput(de);
- out_setattr:
     out_setattr:
         req->rq_status = rc;
-        return(0);
+        return (0);
 }
 
 static int mds_reint_recreate(struct mds_update_record *rec, int offset,
-                            struct ptlrpc_request *req)
+                              struct ptlrpc_request *req)
 {
         struct dentry *de = NULL;
         struct mds_obd *mds = mds_req2mds(req);
@@ -177,10 +177,10 @@ static int mds_reint_recreate(struct mds_update_record *rec, int offset,
                 LBUG();
         }
 
-out_create_dchild:
+      out_create_dchild:
         l_dput(dchild);
         up(&dir->i_sem);
-out_create_de:
+      out_create_de:
         l_dput(de);
         req->rq_status = rc;
         return 0;
@@ -262,7 +262,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         }
 
         switch (type) {
-        case S_IFREG: {
+        case S_IFREG:{
                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
                 if (!handle)
                         GOTO(out_create_dchild, PTR_ERR(handle));
@@ -270,7 +270,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 EXIT;
                 break;
         }
-        case S_IFDIR: {
+        case S_IFDIR:{
                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
                 if (!handle)
                         GOTO(out_create_dchild, PTR_ERR(handle));
@@ -278,7 +278,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 EXIT;
                 break;
         }
-        case S_IFLNK: {
+        case S_IFLNK:{
                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
                 if (!handle)
                         GOTO(out_create_dchild, PTR_ERR(handle));
@@ -289,7 +289,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         case S_IFCHR:
         case S_IFBLK:
         case S_IFIFO:
-        case S_IFSOCK: {
+        case S_IFSOCK:{
                 int rdev = rec->ur_rdev;
                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
                 if (!handle)
@@ -299,7 +299,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 break;
         }
         default:
-                CERROR("bad file type %o for create of %s\n",type,rec->ur_name);
+                CERROR("bad file type %o for create of %s\n", type,
+                       rec->ur_name);
                 GOTO(out_create_dchild, rc = -EINVAL);
         }
 
@@ -351,29 +352,29 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 body->valid = OBD_MD_FLID | OBD_MD_FLGENER;
         }
         EXIT;
-out_create_commit:
+      out_create_commit:
         err = mds_fs_commit(mds, dir, handle);
         if (err) {
                 CERROR("error on commit: err = %d\n", err);
                 if (!rc)
                         rc = err;
         }
-out_create_dchild:
+      out_create_dchild:
         l_dput(dchild);
         ldlm_lock_decref(&lockh, lock_mode);
-out_create_de:
+      out_create_de:
         up(&dir->i_sem);
         l_dput(de);
-out_create:
+      out_create:
         req->rq_status = rc;
         return 0;
 
-out_create_unlink:
+      out_create_unlink:
         /* Destroy the file we just created.  This should not need extra
          * journal credits, as we have already modified all of the blocks
          * needed in order to create the file in the first place.
          */
-        switch(type) {
+        switch (type) {
         case S_IFDIR:
                 err = vfs_rmdir(dir, dchild);
                 if (err)
@@ -407,8 +408,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         /* a name was supplied by the client; fid1 is the directory */
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
-        de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode,
-                                    &lockh);
+        de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
         if (IS_ERR(de)) {
                 LBUG();
                 RETURN(PTR_ERR(de));
@@ -417,7 +417,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
-                                    LCK_EX, &child_lockh, lock_mode);
+                                        LCK_EX, &child_lockh, lock_mode);
 
         if (IS_ERR(dchild) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) {
                 LBUG();
@@ -431,10 +431,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         if (!inode) {
                 CDEBUG(D_INODE, "child doesn't exist (dir %ld, name %s\n",
                        dir->i_ino, rec->ur_name);
-                /* XXX should be out_unlink_cancel, or do we keep child_lockh?*/
+                /* XXX should be out_unlink_cancel, or do we keep child_lockh? */
                 GOTO(out_unlink_dchild, rc = -ENOENT);
         } else if (offset) {
-                struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); 
+                struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
                 mds_pack_inode2fid(&body->fid1, inode);
                 mds_pack_inode2body(body, inode);
         }
@@ -456,7 +456,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         md = lustre_msg_buf(req->rq_repmsg, 2);
                         md->lmd_easize = mds->mds_max_mdsize;
                         if ((rc = mds_fs_get_md(mds, inode, md)) < 0) {
-                                CDEBUG(D_INFO, "No md for ino %ld: rc = %d\n",
+                                CDEBUG(D_INFO,
+                                       "No md for ino %ld: rc = %d\n",
                                        inode->i_ino, rc);
                                 memset(md, 0, md->lmd_easize);
                         }
@@ -481,18 +482,18 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         EXIT;
 
- out_unlink_cancel:
     out_unlink_cancel:
         ldlm_lock_decref(&child_lockh, LCK_EX);
         err = ldlm_cli_cancel(&child_lockh);
         if (err < 0) {
                 CERROR("failed to cancel child inode lock: err = %d\n", err);
                 if (!rc)
-                        rc = -ENOLCK; /*XXX translate LDLM lock error */
+                        rc = -ENOLCK;   /*XXX translate LDLM lock error */
         }
-out_unlink_dchild:
+      out_unlink_dchild:
         l_dput(dchild);
         up(&dir->i_sem);
-out_unlink:
+      out_unlink:
         ldlm_lock_decref(&lockh, lock_mode);
         l_dput(de);
         req->rq_status = rc;
@@ -500,7 +501,7 @@ out_unlink:
 }
 
 static int mds_reint_link(struct mds_update_record *rec, int offset,
-                            struct ptlrpc_request *req)
+                          struct ptlrpc_request *req)
 {
         struct dentry *de_src = NULL;
         struct dentry *de_tgt_dir = NULL;
@@ -554,14 +555,14 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         }
         EXIT;
 
-out_link_dchild:
+      out_link_dchild:
         l_dput(dchild);
-out_link_de_tgt_dir:
+      out_link_de_tgt_dir:
         up(&de_tgt_dir->d_inode->i_sem);
         l_dput(de_tgt_dir);
-out_link_de_src:
+      out_link_de_src:
         l_dput(de_src);
-out_link:
+      out_link:
         req->rq_status = rc;
         return 0;
 }
@@ -578,7 +579,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         struct lustre_handle tgtlockh, srclockh, oldhandle;
         int flags, lock_mode, rc = 0, err;
         void *handle;
-        __u64 res_id[3] = {0};
+        __u64 res_id[3] = { 0 };
         ENTRY;
 
         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
@@ -589,13 +590,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         res_id[0] = de_srcdir->d_inode->i_ino;
 
         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
-                                   NULL, 0, lock_mode, &srclockh);
+                             NULL, 0, lock_mode, &srclockh);
         if (rc == 0) {
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
-                                      0, &srclockh);
+                                      &flags, ldlm_completion_ast,
+                                      mds_blocking_ast, NULL, 0, &srclockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_rename_srcput, rc = -EIO);
@@ -611,13 +612,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         res_id[0] = de_tgtdir->d_inode->i_ino;
 
         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
-                                   NULL, 0, lock_mode, &tgtlockh);
+                             NULL, 0, lock_mode, &tgtlockh);
         if (rc == 0) {
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
-                                      0, &tgtlockh);
+                                      &flags, ldlm_completion_ast,
+                                      mds_blocking_ast, NULL, 0, &tgtlockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_rename_tgtput, rc = -EIO);
@@ -625,7 +626,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         } else
                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
 
-       double_lock(de_tgtdir, de_srcdir);
+        double_lock(de_tgtdir, de_srcdir);
 
         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
         if (IS_ERR(de_old)) {
@@ -663,21 +664,20 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         }
         EXIT;
 
-out_rename_denew:
+      out_rename_denew:
         l_dput(de_new);
-out_rename_deold:
-        if (!rc) { 
+      out_rename_deold:
+        if (!rc) {
                 res_id[0] = de_old->d_inode->i_ino;
                 /* Take an exclusive lock on the resource that we're
                  * about to free, to force everyone to drop their
                  * locks. */
                 LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]);
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, 
-                                      res_id,
-                                      LDLM_PLAIN, NULL, 0, LCK_EX, &flags,
-                                      ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, 
-                                      &oldhandle);
-                if (rc) 
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+                                      res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
+                                      &flags, ldlm_completion_ast,
+                                      mds_blocking_ast, NULL, 0, &oldhandle);
+                if (rc)
                         CERROR("failed to get child inode lock (child ino %Ld, "
                                "dir ino %ld)\n",
                                res_id[0], de_old->d_inode->i_ino);
@@ -685,37 +685,37 @@ out_rename_deold:
 
         l_dput(de_old);
 
-        if (!rc) { 
+        if (!rc) {
                 ldlm_lock_decref(&oldhandle, LCK_EX);
                 rc = ldlm_cli_cancel(&oldhandle);
                 if (rc < 0)
                         CERROR("failed to cancel child inode lock ino "
                                "%Ld: %d\n", res_id[0], rc);
         }
- out_rename_tgtdir:
     out_rename_tgtdir:
         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
         ldlm_lock_decref(&tgtlockh, lock_mode);
- out_rename_tgtput:
     out_rename_tgtput:
         l_dput(de_tgtdir);
- out_rename_srcdir:
     out_rename_srcdir:
         ldlm_lock_decref(&srclockh, lock_mode);
- out_rename_srcput:
     out_rename_srcput:
         l_dput(de_srcdir);
- out_rename:
     out_rename:
         req->rq_status = rc;
         return 0;
 }
 
-typedef int (*mds_reinter)(struct mds_update_record *, int offset,
-                           struct ptlrpc_request *);
+typedef int (*mds_reinter) (struct mds_update_record *, int offset,
+                            struct ptlrpc_request *);
 
-static mds_reinter reinters[REINT_MAX+1] = {
-        [REINT_SETATTR]   mds_reint_setattr,
-        [REINT_CREATE]    mds_reint_create,
-        [REINT_UNLINK]    mds_reint_unlink,
-        [REINT_LINK]      mds_reint_link,
-        [REINT_RENAME]    mds_reint_rename,
-        [REINT_RECREATE]  mds_reint_recreate,
+static mds_reinter reinters[REINT_MAX + 1] = {
+        [REINT_SETATTR] mds_reint_setattr,
+        [REINT_CREATE] mds_reint_create,
+        [REINT_UNLINK] mds_reint_unlink,
+        [REINT_LINK] mds_reint_link,
+        [REINT_RENAME] mds_reint_rename,
+        [REINT_RECREATE] mds_reint_recreate,
 };
 
 int mds_reint_rec(struct mds_update_record *rec, int offset,
@@ -733,7 +733,7 @@ int mds_reint_rec(struct mds_update_record *rec, int offset,
         }
 
         push_ctxt(&saved, &mds->mds_ctxt);
-        rc = reinters[rec->ur_opcode](rec, offset, req);
+        rc = reinters[rec->ur_opcode] (rec, offset, req);
         pop_ctxt(&saved);
 
         return rc;
index 1dcf9b7..8b2a80f 100644 (file)
@@ -301,8 +301,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                                         LBUG();
                                 }
                                 memcpy(obd->obd_name, data->ioc_inlbuf2, len);
-                                //obd->obd_proc_entry =
-                                //        proc_lustre_register_obd_device(obd);
+                                obd->obd_proc_entry =
+                                        proc_lustre_register_obd_device(obd);
                         } else {
                                 CERROR("WARNING: unnamed obd device\n");
                                 obd->obd_proc_entry = NULL;
@@ -348,8 +348,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                         obd->obd_name = NULL;
                 }
 
-                //if (obd->obd_proc_entry)
-                  //      proc_lustre_release_obd_device(obd);
+                if (obd->obd_proc_entry)
+                        proc_lustre_release_obd_device(obd);
 
                 obd->obd_flags &= ~OBD_ATTACHED;
                 obd->obd_type->typ_refcnt--;
index 34601a5..979467c 100644 (file)
@@ -1,27 +1,42 @@
-/* proc_lustre.c manages /proc/lustre/obd. 
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * proc_lustre.c manages /proc/lustre
  *
  * Copyright (c) 2001 Rumi Zahir <rumi.zahir@intel.com>
  *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
  *
- * OBD devices materialize in /proc as a directory:
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+/* OBD devices materialize in /proc as a directory:
  *              /proc/lustre/obd/<number>
- * when /dev/obd<number> is opened. When the device is closed, the 
- * directory entry disappears. 
- * 
+ * when /dev/obd<number> is opened. When the device is closed, the
+ * directory entry disappears.
+ *
  * For each open OBD device, code in this file also creates a file
- * named <status>. "cat /proc/lustre/obd/<number>/status" gives 
+ * named <status>. "cat /proc/lustre/obd/<number>/status" gives
  * information about the OBD device's configuration.
  * The class driver manages the "status" entry.
  *
  * Other logical drivers can create their own entries. For example,
  * the obdtrace driver creates /proc/lustre/obd/<obdid>/stats entry.
  *
- * This file defines three functions 
+ * This file defines three functions
  *   proc_lustre_register_obd_device() - called at device attach time
  *   proc_lustre_release_obd_device() - called at detach
- *               proc_lustre_remove_obd_entry() 
+ *               proc_lustre_remove_obd_entry()
  * that dynamically create/delete /proc/lustre/obd entries:
  *
  *     proc_lustre_register_obd_device() registers an obd device,
@@ -33,7 +48,7 @@
  *
  *     proc_lustre_remove_obd_entry() removes a
  *     /proc/lustre/obd/<obdid>/ entry by name. This is the only
- *     function that is exported to other modules. 
+ *     function that is exported to other modules.
  */
 
 #define EXPORT_SYMTAB
 
 #ifdef CONFIG_PROC_FS
 extern struct proc_dir_entry proc_root;
-static struct proc_dir_entry *proc_lustre_dir_entry = 0;
-static struct proc_dir_entry *proc_lustre_obd_dir_entry = 0;
-
+static struct proc_dir_entry *proc_lustre_dir_entry = NULL;
+static struct proc_dir_entry *proc_lustre_obd_dir_entry = NULL;
 
 static int read_lustre_status(char *page, char **start, off_t offset,
-                             int count, int *eof, void *data)
+                              int count, int *eof, void *data)
 {
-       struct obd_device * obddev = (struct obd_device *) data;
-       int p;
+        struct obd_device * obddev = (struct obd_device *)data;
+        int p;
+
+#warning FIXME: This function is madness, completely unsafe, a disaster waiting to happen.
 
-       p = sprintf(&page[0], "device=%d\n", obddev->obd_minor);
-       p += sprintf(&page[0], "name=%s\n", MKSTR(obddev->obd_name));
-       p += sprintf(&page[0], "uuid=%s\n", obddev->obd_uuid);
+        p = sprintf(&page[0], "device=%d\n", obddev->obd_minor);
+        p += sprintf(&page[p], "name=%s\n", MKSTR(obddev->obd_name));
+        p += sprintf(&page[p], "uuid=%s\n", obddev->obd_uuid);
         p += sprintf(&page[p], "attached=1\n");
-        p += sprintf(&page[0], "type=%s\n", MKSTR(obddev->obd_type->typ_name));
+        p += sprintf(&page[p], "type=%s\n", MKSTR(obddev->obd_type->typ_name));
 
-        if  (obddev->obd_flags & OBD_SET_UP) {
+        if (obddev->obd_flags & OBD_SET_UP)
                 p += sprintf(&page[p], "setup=1\n");
-        }
 
         /* print exports */
         {
-                struct list_head * lh;
-                struct obd_export * export=0;
+                struct list_head *lh;
+                struct obd_export *export = NULL;
 
                 lh = &obddev->obd_exports;
                 while ((lh = lh->next) != &obddev->obd_exports) {
                         p += sprintf(&page[p],
-                                     ((export==0) ? ", connections(" : ",") );
+                                  ((export == NULL) ? ", connections(" : ",") );
                         export = list_entry(lh, struct obd_export, exp_chain);
                         p += sprintf(&page[p], "%p", export);
                 }
-                if (export!=0) { /* there was at least one export */
+                if (export != 0) { /* there was at least one export */
                         p += sprintf(&page[p], ")");
                 }
         }
 
         p += sprintf(&page[p], "\n");
 
-       /* Compute eof and return value */
-       if (offset + count >= p) {
-               *eof=1;
-               return (p - offset);
-       }
-       return count;
+        /* Compute eof and return value */
+        if (offset + count >= p) {
+                *eof = 1;
+                return (p - offset);
+        }
+        return count;
 }
 
-struct proc_dir_entry *
-proc_lustre_register_obd_device(struct obd_device *obd)
+struct proc_dir_entry *proc_lustre_register_obd_device(struct obd_device *obd)
 {
-       char obdname[32];
-       struct proc_dir_entry *obd_dir;
-       struct proc_dir_entry *obd_status = 0;
-
-       if (!proc_lustre_dir_entry) {
-               proc_lustre_dir_entry = proc_mkdir("lustre", &proc_root);
-               if (IS_ERR(proc_lustre_dir_entry))
-                       return 0;
-       
-               proc_lustre_obd_dir_entry = 
-                       proc_mkdir("devices", proc_lustre_dir_entry);
-               if (IS_ERR(proc_lustre_obd_dir_entry))
-                       return 0;
-       }
-       sprintf(obdname, "%s", obd->obd_name);
-       obd_dir =  proc_mkdir(obdname, proc_lustre_obd_dir_entry);
-
-        if (obd_dir) 
-               obd_status = create_proc_entry("status", S_IRUSR | S_IFREG, 
+        struct proc_dir_entry *obd_dir;
+        struct proc_dir_entry *obd_status = NULL;
+
+        if (!proc_lustre_dir_entry) {
+                proc_lustre_dir_entry = proc_mkdir("lustre", &proc_root);
+                if (IS_ERR(proc_lustre_dir_entry))
+                        return 0;
+
+                proc_lustre_obd_dir_entry =
+                        proc_mkdir("devices", proc_lustre_dir_entry);
+                if (IS_ERR(proc_lustre_obd_dir_entry))
+                        return 0;
+        }
+        obd_dir = proc_mkdir(obd->obd_name, proc_lustre_obd_dir_entry);
+
+        if (obd_dir)
+                obd_status = create_proc_entry("status", S_IRUSR | S_IFREG,
                                                obd_dir);
 
-       if (obd_status) {
-               obd_status->read_proc = read_lustre_status;
-               obd_status->data = (void*) obd;
-       }
+        if (obd_status) {
+                obd_status->read_proc = read_lustre_status;
+                obd_status->data = (void *)obd;
+        }
 
-       return obd_dir;
+        return obd_dir;
 }
 
-void proc_lustre_remove_obd_entry(const charname, struct obd_device *obd)
+void proc_lustre_remove_obd_entry(const char *name, struct obd_device *obd)
 {
-       struct proc_dir_entry *obd_entry = 0;
-       struct proc_dir_entry *obd_dir = obd->obd_proc_entry;
-       
-       remove_proc_entry(name, obd_dir);
-
-       while (obd_dir->subdir==0) {
-               /* if we removed last entry in this directory,
-                * then remove parent directory unless this
-                * is /proc itself
-                */
-               if (obd_dir == &proc_root) 
-                       break;
-                       
-               obd_entry = obd_dir;
-               obd_dir = obd_dir->parent;
-
-               /* If /proc/lustre/obd/foo or /proc/lustre/obd or
-                * /proc/lustre is being removed, then reset
-                * internal variables
-                */
-
-               if (obd_entry == obd->obd_proc_entry)
-                       obd->obd_proc_entry=0; /* /proc/lustre/obd/foo */
-               else 
-                       if (obd_entry == proc_lustre_obd_dir_entry)
-                               proc_lustre_obd_dir_entry=0;
-                       else 
-                               if (obd_entry == proc_lustre_dir_entry) 
-                                       proc_lustre_dir_entry=0;
-
-               remove_proc_entry(obd_entry->name, obd_dir);
-       }
+        struct proc_dir_entry *obd_entry = NULL;
+        struct proc_dir_entry *obd_dir = obd->obd_proc_entry;
+
+        remove_proc_entry(name, obd_dir);
+
+        while (obd_dir->subdir == NULL) {
+                /* if we removed last entry in this directory, then
+                 * remove parent directory unless this is /proc itself */
+                if (obd_dir == &proc_root)
+                        break;
+
+                obd_entry = obd_dir;
+                obd_dir = obd_dir->parent;
+
+                /* If /proc/lustre/obd/foo or /proc/lustre/obd or
+                 * /proc/lustre is being removed, then reset internal
+                 * variables */
+
+                if (obd_entry == obd->obd_proc_entry)
+                        obd->obd_proc_entry = NULL; /* /proc/lustre/obd/foo */
+                else if (obd_entry == proc_lustre_obd_dir_entry)
+                        proc_lustre_obd_dir_entry = NULL; /* /proc/lustre/obd */
+                else if (obd_entry == proc_lustre_dir_entry)
+                        proc_lustre_dir_entry = NULL; /* /proc/lustre */
+
+                remove_proc_entry(obd_entry->name, obd_dir);
+        }
 }
 
 void proc_lustre_release_obd_device(struct obd_device *obd)
 {
-       proc_lustre_remove_obd_entry("status", obd);
+        proc_lustre_remove_obd_entry("status", obd);
 }
 
 
@@ -174,23 +181,17 @@ void proc_lustre_release_obd_device(struct obd_device *obd)
 
 struct proc_dir_entry *proc_lustre_register_obd_device(struct obd_device *obd)
 {
-       return 0;
+        return 0;
 }
 
-void proc_lustre_remove_obd_entry(const char* name, struct obd_device *obd) {}
-void proc_lustre_release_obd_device(struct obd_device *obd) {}
-
-#endif   /* CONFIG_PROC_FS */
-                                  
-EXPORT_SYMBOL(proc_lustre_remove_obd_entry);
-
-
-
-
-
-
-
-
+void proc_lustre_remove_obd_entry(const char* name, struct obd_device *obd)
+{
+}
 
+void proc_lustre_release_obd_device(struct obd_device *obd)
+{
+}
 
+#endif   /* CONFIG_PROC_FS */
 
+EXPORT_SYMBOL(proc_lustre_remove_obd_entry);
index 22ba1f2..e2ceb0e 100644 (file)
@@ -675,6 +675,11 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa,
                 GOTO(out, rc = -ENOENT);
 
         inode = object_dentry->d_inode;
+        if (inode == NULL) {
+                CERROR("trying to destroy negative inode %Ld!\n", oa->o_id);
+                GOTO(out, rc = -ENOENT);
+        }
+
         if (inode->i_nlink != 1) {
                 CERROR("destroying inode with nlink = %d\n", inode->i_nlink);
                 LBUG();
index 333ca79..90a2571 100644 (file)
@@ -514,15 +514,13 @@ static int ost_handle(struct ptlrpc_request *req)
                 if (rc)
                         break;
                 RETURN(0);
-        case LDLM_CALLBACK:
+        case LDLM_BL_CALLBACK:
+        case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
-                CERROR("callbacks should not happen on MDS\n");
+                CERROR("callbacks should not happen on OST\n");
                 LBUG();
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                 break;
-
-
-
         default:
                 req->rq_status = -ENOTSUPP;
                 rc = ptlrpc_error(req->rq_svc, req);
index 71b4f95..952dac5 100644 (file)
@@ -4,7 +4,7 @@ my $mtpt = shift || usage();
 my $mount_count = shift || usage();
 my $i = shift || usage();
 my $files = 5;
-my $mcreate = 1; # should we use mcreate or open?
+my $mcreate = 0; # should we use mcreate or open?
 
 sub usage () {
     print "Usage: $0 <mount point prefix> <mount count> <iterations>\n";
index 097b4ab..0320ff2 100644 (file)
@@ -5,10 +5,10 @@ CFLAGS:=-g -O2 -I. -I/usr/include/libxml2 -I/usr/include/glib-1.2 -I$(PORTALS)/i
 -I/usr/lib/glib/include -I$(srcdir)/../include -Wall
 KFLAGS:=
 CPPFLAGS = $(HAVE_LIBREADLINE)
-#bdctl_LDADD := $(LIBREADLINE) -lxml2 # -lefence
+obdctl_LDADD := $(LIBREADLINE) -lxml2 # -lefence
 lctl_LDADD := $(LIBREADLINE)
-sbin_PROGRAMS = lctl obdctl
-#obdctl_SOURCES = parser.c obdctl.c parser.h
+sbin_PROGRAMS = lctl obdctl
+obdctl_SOURCES = parser.c obdctl.c parser.h
 lctl_SOURCES = parser.c network.c device.c debug.c lctl.c lctl.h parser.h
 
 include $(top_srcdir)/Rules
index acd785f..6fbd1a7 100644 (file)
@@ -50,7 +50,7 @@
 #include <errno.h>
 #include <string.h>
 
-#include <asm/page.h>   /* needed for PAGE_SIZE - rread*/ 
+#include <asm/page.h>           /* needed for PAGE_SIZE - rread */
 
 #define __KERNEL__
 #include <linux/list.h>
 
 #define SHMEM_STATS 1
 #if SHMEM_STATS
-#include <sys/ipc.h>
-#include <sys/shm.h>
+# include <sys/ipc.h>
+# include <sys/shm.h>
 
-#define MAX_SHMEM_COUNT 1024
+# define MAX_SHMEM_COUNT 1024
 static long long *shared_counters;
-static long long  counter_snapshot[2][MAX_SHMEM_COUNT];
-struct timeval    prev_time;
+static long long counter_snapshot[2][MAX_SHMEM_COUNT];
+struct timeval prev_time;
 #endif
 
 static int jt_newdev(int argc, char **argv);
@@ -123,7 +123,7 @@ do {                                                                    \
 
 */
 
-char * obdo_print(struct obdo *obd)
+char *obdo_print(struct obdo *obd)
 {
         char buf[1024];
 
@@ -141,10 +141,7 @@ char * obdo_print(struct obdo *obd)
                 obd->o_mode,
                 obd->o_uid,
                 obd->o_gid,
-                obd->o_flags,
-                obd->o_obdflags,
-                obd->o_nlink,
-                obd->o_valid);
+                obd->o_flags, obd->o_obdflags, obd->o_nlink, obd->o_valid);
         return strdup(buf);
 }
 
@@ -199,7 +196,7 @@ static int be_verbose(int verbose, struct timeval *next_time,
         }
 
         /* A negative verbosity means to print at most each X seconds */
-        if (verbose < 0 && next_time != NULL && difftime(&now, next_time) >= 0){
+        if (verbose < 0 && next_time != NULL && difftime(&now, next_time) >= 0) {
                 next_time->tv_sec = now.tv_sec - verbose;
                 next_time->tv_usec = now.tv_usec;
                 if (next_num)
@@ -219,7 +216,7 @@ static int get_verbose(const char *arg)
         else if (arg[0] == 's' || arg[0] == 'q')
                 verbose = 0;
         else
-                verbose = (int) strtoul(arg, NULL, 0);
+                verbose = (int)strtoul(arg, NULL, 0);
 
         if (verbose < 0)
                 printf("Print status every %d seconds\n", -verbose);
@@ -235,12 +232,12 @@ static int do_disconnect(char *func, int verbose)
 {
         struct obd_ioctl_data data;
 
-        if (conn_addr == -1) 
-                return 0; 
+        if (conn_addr == -1)
+                return 0;
 
         IOCINIT(data);
 
-        rc = ioctl(fd, OBD_IOC_DISCONNECT , &data);
+        rc = ioctl(fd, OBD_IOC_DISCONNECT, &data);
         if (rc < 0) {
                 fprintf(stderr, "error: %s: %x %s\n", cmdname(func),
                         OBD_IOC_DISCONNECT, strerror(errno));
@@ -255,79 +252,75 @@ static int do_disconnect(char *func, int verbose)
 }
 
 #if SHMEM_STATS
-static void
-shmem_setup ()
+static void shmem_setup(void)
 {
-        int shmid = shmget (IPC_PRIVATE, sizeof (counter_snapshot[0]), 0600);
+        int shmid = shmget(IPC_PRIVATE, sizeof(counter_snapshot[0]), 0600);
 
-        if (shmid == -1)
-        {
-                fprintf (stderr, "Can't create shared memory counters: %s\n", strerror (errno));
+        if (shmid == -1) {
+                fprintf(stderr, "Can't create shared memory counters: %s\n",
+                        strerror(errno));
                 return;
         }
-        
-        shared_counters = (long long *)shmat (shmid, NULL, 0);
 
-        if (shared_counters == (long long *)(-1))
-        {
-                fprintf (stderr, "Can't attach shared memory counters: %s\n", strerror (errno));
+        shared_counters = (long long *)shmat(shmid, NULL, 0);
+
+        if (shared_counters == (long long *)(-1)) {
+                fprintf(stderr, "Can't attach shared memory counters: %s\n",
+                        strerror(errno));
                 shared_counters = NULL;
                 return;
         }
 }
 
-static inline void
-shmem_reset ()
+static inline void shmem_reset(void)
 {
         if (shared_counters == NULL)
                 return;
-        
-        memset (shared_counters, 0, sizeof (counter_snapshot[0]));
-        memset (counter_snapshot, 0, sizeof (counter_snapshot));
-        gettimeofday (&prev_time, NULL);
+
+        memset(shared_counters, 0, sizeof(counter_snapshot[0]));
+        memset(counter_snapshot, 0, sizeof(counter_snapshot));
+        gettimeofday(&prev_time, NULL);
 }
 
-static inline void
-shmem_bump ()
+static inline void shmem_bump(void)
 {
-        if (shared_counters == NULL || 
-            thread <= 0 || thread > MAX_SHMEM_COUNT)
+        if (shared_counters == NULL || thread <= 0 || thread > MAX_SHMEM_COUNT)
                 return;
-        
+
         shared_counters[thread - 1]++;
 }
 
-static void
-shmem_snap (int n)
+static void shmem_snap(int n)
 {
         struct timeval this_time;
-        int            non_zero = 0;
-        long long      total = 0;
-        double         secs;
-        int            i;
-        
+        int non_zero = 0;
+        long long total = 0;
+        double secs;
+        int i;
+
         if (shared_counters == NULL || n > MAX_SHMEM_COUNT)
                 return;
 
-        memcpy (counter_snapshot[1], counter_snapshot[0], n * sizeof (counter_snapshot[0][0]));
-        memcpy (counter_snapshot[0], shared_counters, n * sizeof (counter_snapshot[0][0]));
-        gettimeofday (&this_time, NULL);
-        
-        for (i = 0; i < n; i++)
-        {
-                long long this_count = counter_snapshot[0][i] - counter_snapshot[1][i];
-                
-                if (this_count != 0)
-                {
+        memcpy(counter_snapshot[1], counter_snapshot[0],
+               n * sizeof(counter_snapshot[0][0]));
+        memcpy(counter_snapshot[0], shared_counters,
+               n * sizeof(counter_snapshot[0][0]));
+        gettimeofday(&this_time, NULL);
+
+        for (i = 0; i < n; i++) {
+                long long this_count =
+                        counter_snapshot[0][i] - counter_snapshot[1][i];
+
+                if (this_count != 0) {
                         non_zero++;
                         total += this_count;
                 }
         }
-        
-        secs = (this_time.tv_sec + this_time.tv_usec/1000000.0) -
-               (prev_time.tv_sec + prev_time.tv_usec/1000000.0);
-        
-        printf ("%d/%d Total: %f/second\n", non_zero, n, total / secs);
+
+        secs = (this_time.tv_sec + this_time.tv_usec / 1000000.0) -
+                (prev_time.tv_sec + prev_time.tv_usec / 1000000.0);
+
+        printf("%d/%d Total: %f/second\n", non_zero, n, total / secs);
 
         prev_time = this_time;
 }
@@ -345,12 +338,13 @@ shmem_snap (int n)
 
 extern command_t cmdlist[];
 
-static int xml_command(char *cmd, ...) {
+static int xml_command(char *cmd, ...)
+{
         va_list args;
         char *arg, *cmds[8];
         int i = 1, j;
-       
-        cmds[0] = cmd; 
+
+        cmds[0] = cmd;
         va_start(args, cmd);
 
         while (((arg = va_arg(args, char *)) != NULL) && (i < 8)) {
@@ -362,7 +356,7 @@ static int xml_command(char *cmd, ...) {
 
         printf("obdctl > ");
         for (j = 0; j < i; j++)
-          printf("%s ", cmds[j]);
+                printf("%s ", cmds[j]);
 
         printf("\n");
 
@@ -370,15 +364,16 @@ static int xml_command(char *cmd, ...) {
 }
 
 #if 0
-static network_t *xml_network(xmlDocPtr doc, xmlNodePtr root) {
+static network_t *xml_network(xmlDocPtr doc, xmlNodePtr root)
+{
         xmlNodePtr cur = root->xmlChildrenNode;
         network_t *net;
-        
-        if ((net = (network_t *)calloc(1, sizeof(network_t))) == NULL) {
+
+        if ((net = (network_t *) calloc(1, sizeof(network_t))) == NULL) {
                 printf("error: unable to malloc network_t\n");
                 return NULL;
         }
-        
+
         net->type = xmlGetProp(root, "type");
         if (net->type == NULL) {
                 printf("error: type attrib required (tcp, elan, myrinet)\n");
@@ -394,20 +389,21 @@ static network_t *xml_network(xmlDocPtr doc, xmlNodePtr root) {
                         net->port = atoi(xmlNodeGetContent(cur));
 
                 cur = cur->next;
-        } 
+        }
 
         if (net->server == NULL) {
                 printf("error: <server> tag required\n");
                 free(net);
                 return NULL;
         }
-        
+
         return net;
 }
 #endif
 
-static int xml_mds(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_mds(xmlDocPtr doc, xmlNodePtr root,
+                   char *serv_name, char *serv_uuid)
+{
         xmlNodePtr cur = root->xmlChildrenNode;
         char *fstype = NULL, *device = NULL;
         int rc;
@@ -419,7 +415,7 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root,
 
                 if (!xmlStrcmp(cur->name, "device"))
                         device = xmlNodeGetContent(cur);
+
                 /* FIXME: Parse the network bits
                  * if (!xmlStrcmp(cur->name, "network")) {
                  *       net = xml_network(doc, cur);
@@ -429,7 +425,7 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root,
                  * free(net);
                  */
                 cur = cur->next;
-        } 
+        }
 
         if ((fstype == NULL) || (device == NULL)) {
                 printf("error: <fstype> and <device> tags required\n");
@@ -439,7 +435,8 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root,
         if ((rc = xml_command("newdev", NULL)) != 0)
                 return rc;
 
-        if ((rc = xml_command("attach","mds",serv_name,serv_uuid,NULL)) != 0)
+        if ((rc =
+             xml_command("attach", "mds", serv_name, serv_uuid, NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("setup", device, fstype, NULL)) != 0)
@@ -447,9 +444,10 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root,
 
         return 0;
 }
-        
-static int xml_obd(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+
+static int xml_obd(xmlDocPtr doc, xmlNodePtr root,
+                   char *serv_name, char *serv_uuid)
+{
         char *obdtype, *format = NULL, *fstype = NULL, *device = NULL;
         xmlNodePtr cur = root->xmlChildrenNode;
         int rc;
@@ -463,12 +461,12 @@ static int xml_obd(xmlDocPtr doc, xmlNodePtr root,
 
                 if (!xmlStrcmp(cur->name, "device"))
                         device = xmlNodeGetContent(cur);
+
                 if (!xmlStrcmp(cur->name, "autoformat"))
                         format = xmlNodeGetContent(cur);
 
                 cur = cur->next;
-        } 
+        }
 
         if ((obdtype == NULL) || (fstype == NULL) || (device == NULL)) {
                 printf("error: 'type' attrib and <fstype> "
@@ -480,12 +478,12 @@ static int xml_obd(xmlDocPtr doc, xmlNodePtr root,
          * but is currently unsupported.  You'll have to use the scripts
          * for now until support is added, or specify a real device.
          */
-        
+
         if ((rc = xml_command("newdev", NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("attach", obdtype,
-            serv_name,serv_uuid, NULL)) != 0)
+                              serv_name, serv_uuid, NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("setup", device, fstype, NULL)) != 0)
@@ -494,8 +492,9 @@ static int xml_obd(xmlDocPtr doc, xmlNodePtr root,
         return 0;
 }
 
-static int xml_ost(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_ost(xmlDocPtr doc, xmlNodePtr root,
+                   char *serv_name, char *serv_uuid)
+{
         char *server_name = NULL, *server_uuid = NULL;
         char *failover_name = NULL, *failover_uuid = NULL;
         xmlNodePtr cur = root->xmlChildrenNode;
@@ -517,17 +516,18 @@ static int xml_ost(xmlDocPtr doc, xmlNodePtr root,
                 }
 
                 cur = cur->next;
-        } 
+        }
 
         if ((server_name == NULL) || (server_uuid == NULL)) {
                 printf("error: atleast the <server_id> tag is required\n");
                 return -1;
         }
-        
+
         if ((rc = xml_command("newdev", NULL)) != 0)
                 return rc;
 
-        if ((rc = xml_command("attach","ost",serv_name,serv_uuid,NULL)) != 0)
+        if ((rc =
+             xml_command("attach", "ost", serv_name, serv_uuid, NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("setup", server_name, NULL)) != 0)
@@ -536,8 +536,9 @@ static int xml_ost(xmlDocPtr doc, xmlNodePtr root,
         return 0;
 }
 
-static int xml_osc(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_osc(xmlDocPtr doc, xmlNodePtr root,
+                   char *serv_name, char *serv_uuid)
+{
         char *ost_name = NULL, *ost_uuid = NULL;
         xmlNodePtr cur = root->xmlChildrenNode;
         int ost_num, rc = 0;
@@ -551,7 +552,7 @@ static int xml_osc(xmlDocPtr doc, xmlNodePtr root,
                 }
 
                 cur = cur->next;
-        } 
+        }
 
         if ((ost_name == NULL) || (ost_uuid == NULL)) {
                 printf("error: atleast the <service_id> tag is required\n");
@@ -561,7 +562,8 @@ static int xml_osc(xmlDocPtr doc, xmlNodePtr root,
         if ((rc = xml_command("newdev", NULL)) != 0)
                 return rc;
 
-        if ((rc = xml_command("attach","osc",serv_name,serv_uuid,NULL)) != 0)
+        if ((rc =
+             xml_command("attach", "osc", serv_name, serv_uuid, NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("setup", ost_uuid, "localhost", NULL)) != 0)
@@ -570,8 +572,9 @@ static int xml_osc(xmlDocPtr doc, xmlNodePtr root,
         return 0;
 }
 
-static int xml_mdc(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_mdc(xmlDocPtr doc, xmlNodePtr root,
+                   char *serv_name, char *serv_uuid)
+{
         char *mds_name = NULL, *mds_uuid = NULL;
         xmlNodePtr cur = root->xmlChildrenNode;
         int mds_num, rc = 0;
@@ -585,46 +588,51 @@ static int xml_mdc(xmlDocPtr doc, xmlNodePtr root,
                 }
 
                 cur = cur->next;
-        } 
+        }
 
         if ((mds_name == NULL) || (mds_uuid == NULL)) {
                 printf("error: atleast the <service_id> tag is required\n");
                 return -1;
         }
 
-        if ((rc = xml_command("newdev", NULL)) != 0) 
+        if ((rc = xml_command("newdev", NULL)) != 0)
                 return rc;
 
-        if ((rc = xml_command("attach","mdc",serv_name,serv_uuid,NULL)) != 0)
+        if ((rc =
+             xml_command("attach", "mdc", serv_name, serv_uuid, NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("setup", mds_uuid, "localhost", NULL)) != 0)
                 return rc;
-                
+
         return 0;
 }
 
-static int xml_lov(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_lov(xmlDocPtr doc, xmlNodePtr root,
+                   char *serv_name, char *serv_uuid)
+{
         printf("--- Setting up LOV ---\n");
         return 0;
 }
 
-static int xml_router(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_router(xmlDocPtr doc, xmlNodePtr root,
+                      char *serv_name, char *serv_uuid)
+{
         printf("--- Setting up ROUTER ---\n");
         return 0;
 }
 
-static int xml_ldlm(xmlDocPtr doc, xmlNodePtr root, 
-                   char *serv_name, char *serv_uuid) {
+static int xml_ldlm(xmlDocPtr doc, xmlNodePtr root,
+                    char *serv_name, char *serv_uuid)
+{
         int rc;
 
         printf("--- Setting up LDLM ---\n");
         if ((rc = xml_command("newdev", NULL)) != 0)
                 return rc;
 
-        if ((rc = xml_command("attach","ldlm",serv_name,serv_uuid,NULL)) != 0)
+        if ((rc =
+             xml_command("attach", "ldlm", serv_name, serv_uuid, NULL)) != 0)
                 return rc;
 
         if ((rc = xml_command("setup", NULL)) != 0)
@@ -633,8 +641,9 @@ static int xml_ldlm(xmlDocPtr doc, xmlNodePtr root,
         return 0;
 }
 
-static int xml_service(xmlDocPtr doc, xmlNodePtr root, 
-                       int serv_num, char *serv_name, char *serv_uuid) {
+static int xml_service(xmlDocPtr doc, xmlNodePtr root,
+                       int serv_num, char *serv_name, char *serv_uuid)
+{
         xmlNodePtr cur = root;
         char *name, *uuid;
 
@@ -642,8 +651,7 @@ static int xml_service(xmlDocPtr doc, xmlNodePtr root,
                 name = xmlGetProp(cur, "name");
                 uuid = xmlGetProp(cur, "uuid");
 
-                if (xmlStrcmp(name, serv_name) ||
-                    xmlStrcmp(uuid, serv_uuid)) {
+                if (xmlStrcmp(name, serv_name) || xmlStrcmp(uuid, serv_uuid)) {
                         cur = cur->next;
                         continue;
                 }
@@ -665,18 +673,19 @@ static int xml_service(xmlDocPtr doc, xmlNodePtr root,
                 else if (!xmlStrcmp(cur->name, "ldlm"))
                         return xml_ldlm(doc, cur, name, uuid);
                 else
-                        return -1;        
+                        return -1;
 
                 cur = cur->next;
         }
 
         printf("error: No XML config branch for name=%s uuid=%s\n",
-                serv_name, serv_uuid); 
-        return -1; 
+               serv_name, serv_uuid);
+        return -1;
 }
 
-static int xml_profile(xmlDocPtr doc, xmlNodePtr root, 
-                       int prof_num, char *prof_name, char *prof_uuid) {
+static int xml_profile(xmlDocPtr doc, xmlNodePtr root,
+                       int prof_num, char *prof_name, char *prof_uuid)
+{
         xmlNodePtr parent, cur = root;
         char *name, *uuid, *srv_name, *srv_uuid;
         int rc = 0, num;
@@ -685,16 +694,15 @@ static int xml_profile(xmlDocPtr doc, xmlNodePtr root,
                 name = xmlGetProp(cur, "name");
                 uuid = xmlGetProp(cur, "uuid");
 
-                if (xmlStrcmp(cur->name, "profile") || 
-                    xmlStrcmp(name, prof_name)          ||
-                    xmlStrcmp(uuid, prof_uuid)) {
+                if (xmlStrcmp(cur->name, "profile") ||
+                    xmlStrcmp(name, prof_name) || xmlStrcmp(uuid, prof_uuid)) {
                         cur = cur->next;
                         continue;
                 }
 
                 /* FIXME: Doesn't understand mountpoints yet
                  *        xml_mountpoint(doc, root, ...);
-                 */    
+                 */
 
                 /* Setup each service in turn
                  * FIXME: Should be sorted by "num" attr, we shouldn't
@@ -706,8 +714,10 @@ static int xml_profile(xmlDocPtr doc, xmlNodePtr root,
                         if (!xmlStrcmp(cur->name, "service_id")) {
                                 num = atoi(xmlGetProp(cur, "num"));
                                 rc = xml_service(doc, root, num,
-                                        srv_name = xmlGetProp(cur, "name"),
-                                        srv_uuid = xmlGetProp(cur, "uuid"));
+                                                 srv_name =
+                                                 xmlGetProp(cur, "name"),
+                                                 srv_uuid =
+                                                 xmlGetProp(cur, "uuid"));
                                 if (rc != 0) {
                                         printf("error: service config\n");
                                         return rc;
@@ -720,14 +730,15 @@ static int xml_profile(xmlDocPtr doc, xmlNodePtr root,
                 cur = parent->next;
         }
 
-        return rc; 
+        return rc;
 }
 
-static int xml_node(xmlDocPtr doc, xmlNodePtr root) {
+static int xml_node(xmlDocPtr doc, xmlNodePtr root)
+{
         xmlNodePtr parent, cur = root;
         char *name, *uuid;
         int rc = 0, num;
-        
+
         /* Walk the node tags looking for ours */
         while (cur != NULL) {
                 if (xmlStrcmp(cur->name, "node")) {
@@ -781,7 +792,7 @@ static int do_xml(char *func, char *file)
         doc = xmlParseFile(file);
         if (doc == NULL) {
                 fprintf(stderr, "error: Unable to parse XML\n");
-                return -1; 
+                return -1;
         }
 
         cur = xmlDocGetRootElement(doc);
@@ -790,7 +801,7 @@ static int do_xml(char *func, char *file)
                 xmlFreeDoc(doc);
                 return -1;
         }
-        
+
         if (xmlStrcmp(cur->name, (const xmlChar *)"lustre")) {
                 fprintf(stderr, "error: Root node != <lustre>\n");
                 xmlFreeDoc(doc);
@@ -798,13 +809,13 @@ static int do_xml(char *func, char *file)
         }
 
         /* FIXME: Validate the XML against the DTD here */
-    
+
         /* FIXME: Merge all the text nodes under each branch and 
          *        prune empty nodes.  Just to make the parsing more
          *        tolerant, the exact location of nested tags isn't
          *        critical for this.
          */
-        
+
         rc = xml_node(doc, cur->xmlChildrenNode);
         xmlFreeDoc(doc);
 
@@ -827,7 +838,7 @@ static int do_device(char *func, int dev)
                 return -2;
         }
 
-        return ioctl(fd, OBD_IOC_DEVICE , buf);
+        return ioctl(fd, OBD_IOC_DEVICE, buf);
 }
 
 static int jt_device(int argc, char **argv)
@@ -861,13 +872,13 @@ static int jt_connect(int argc, char **argv)
                 return -1;
         }
 
-        rc = ioctl(fd, OBD_IOC_CONNECT , &data);
+        rc = ioctl(fd, OBD_IOC_CONNECT, &data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %x %s\n", cmdname(argv[0]),
                         OBD_IOC_CONNECT, strerror(rc = errno));
         else
                 conn_addr = data.ioc_addr;
-                conn_cookie = data.ioc_cookie;
+        conn_cookie = data.ioc_cookie;
         return rc;
 }
 
@@ -946,7 +957,7 @@ static int jt__threads(int argc, char **argv)
                        argv[0], threads, argv[3], argv[4]);
 
         SHMEM_RESET();
-        
+
         for (i = 1, next_thread = verbose; i <= threads; i++) {
                 rc = fork();
                 if (rc < 0) {
@@ -963,22 +974,20 @@ static int jt__threads(int argc, char **argv)
                 rc = 0;
         }
 
-        if (!thread) { /* parent process */
+        if (!thread) {          /* parent process */
                 int live_threads = threads;
-                
-                while (live_threads > 0)
-                {
-                        int    status;
-                        pid_t  ret;
-
-                        ret = waitpid (0, &status, verbose < 0 ? WNOHANG : 0);
-                        if (ret == 0)
-                        {
+
+                while (live_threads > 0) {
+                        int status;
+                        pid_t ret;
+
+                        ret = waitpid(0, &status, verbose < 0 ? WNOHANG : 0);
+                        if (ret == 0) {
                                 if (verbose >= 0)
-                                        abort ();
+                                        abort();
 
-                                sleep (-verbose);
-                                SHMEM_SNAP (threads);
+                                sleep(-verbose);
+                                SHMEM_SNAP(threads);
                                 continue;
                         }
 
@@ -1026,10 +1035,10 @@ static int jt_detach(int argc, char **argv)
                 return -2;
         }
 
-        rc = ioctl(fd, OBD_IOC_DETACH , buf);
+        rc = ioctl(fd, OBD_IOC_DETACH, buf);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
-                        strerror(rc=errno));
+                        strerror(rc = errno));
 
         return rc;
 }
@@ -1045,10 +1054,10 @@ static int jt_cleanup(int argc, char **argv)
                 return -1;
         }
 
-        rc = ioctl(fd, OBD_IOC_CLEANUP , &data);
+        rc = ioctl(fd, OBD_IOC_CLEANUP, &data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
-                        strerror(rc=errno));
+                        strerror(rc = errno));
 
         return rc;
 }
@@ -1067,10 +1076,10 @@ static int jt_newdev(int argc, char **argv)
                 return -1;
         }
 
-        rc = ioctl(fd, OBD_IOC_NEWDEV , &data);
+        rc = ioctl(fd, OBD_IOC_NEWDEV, &data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
-                        strerror(rc=errno));
+                        strerror(rc = errno));
         else {
                 printf("Current device set to %d\n", data.ioc_dev);
         }
@@ -1098,10 +1107,10 @@ static int jt_list(int argc, char **argv)
                 return -1;
         }
 
-        rc = ioctl(fd, OBD_IOC_LIST , data);
+        rc = ioctl(fd, OBD_IOC_LIST, data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
-                        strerror(rc=errno));
+                        strerror(rc = errno));
         else {
                 printf("%s", data->ioc_bulk);
         }
@@ -1121,7 +1130,7 @@ static int jt_attach(int argc, char **argv)
                 return -1;
         }
 
-        data.ioc_inllen1 =  strlen(argv[1]) + 1;
+        data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
         if (argc >= 3) {
                 data.ioc_inllen2 = strlen(argv[2]) + 1;
@@ -1138,7 +1147,7 @@ static int jt_attach(int argc, char **argv)
                 return -2;
         }
 
-        rc = ioctl(fd, OBD_IOC_ATTACH , buf);
+        rc = ioctl(fd, OBD_IOC_ATTACH, buf);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %x %s\n", cmdname(argv[0]),
                         OBD_IOC_ATTACH, strerror(rc = errno));
@@ -1158,7 +1167,7 @@ static int jt_attach(int argc, char **argv)
         return rc;
 }
 
-#define N2D_OFF 0x100    /* So we can tell between error codes and devices */
+#define N2D_OFF 0x100      /* So we can tell between error codes and devices */
 
 static int do_name2dev(char *func, char *name)
 {
@@ -1176,7 +1185,7 @@ static int do_name2dev(char *func, char *name)
                 fprintf(stderr, "error: %s: invalid ioctl\n", cmdname(func));
                 return -2;
         }
-        rc = ioctl(fd, OBD_IOC_NAME2DEV , buf);
+        rc = ioctl(fd, OBD_IOC_NAME2DEV, buf);
         if (rc < 0) {
                 fprintf(stderr, "error: %s: %s - %s\n", cmdname(func),
                         name, strerror(rc = errno));
@@ -1191,7 +1200,7 @@ static int do_name2dev(char *func, char *name)
 static int jt_name2dev(int argc, char **argv)
 {
         if (argc != 2) {
-                Parser_printhelp("name2dev"); 
+                Parser_printhelp("name2dev");
                 return -1;
         }
 
@@ -1211,7 +1220,7 @@ static int jt_setup(int argc, char **argv)
 
         IOCINIT(data);
 
-        if ( argc > 3) {
+        if (argc > 3) {
                 fprintf(stderr, "usage: %s [device] [fstype]\n",
                         cmdname(argv[0]));
                 return -1;
@@ -1231,7 +1240,7 @@ static int jt_setup(int argc, char **argv)
                 data.ioc_inllen1 = strlen(argv[1]) + 1;
                 data.ioc_inlbuf1 = argv[1];
         }
-        if ( argc == 3 ) {
+        if (argc == 3) {
                 data.ioc_inllen2 = strlen(argv[2]) + 1;
                 data.ioc_inlbuf2 = argv[2];
         }
@@ -1240,7 +1249,7 @@ static int jt_setup(int argc, char **argv)
                 fprintf(stderr, "error: %s: invalid ioctl\n", cmdname(argv[0]));
                 return -2;
         }
-        rc = ioctl(fd, OBD_IOC_SETUP , buf);
+        rc = ioctl(fd, OBD_IOC_SETUP, buf);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
                         strerror(rc = errno));
@@ -1277,8 +1286,8 @@ static int jt_create(int argc, char **argv)
         gettimeofday(&next_time, NULL);
         next_time.tv_sec -= verbose;
 
-        for (i = 1, next_count = verbose; i <= count ; i++) {
-                rc = ioctl(fd, OBD_IOC_CREATE , &data);
+        for (i = 1, next_count = verbose; i <= count; i++) {
+                rc = ioctl(fd, OBD_IOC_CREATE, &data);
                 SHMEM_BUMP();
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #%d - %s\n",
@@ -1306,7 +1315,7 @@ static int jt_setattr(int argc, char **argv)
         data.ioc_obdo1.o_mode = S_IFREG | strtoul(argv[2], NULL, 0);
         data.ioc_obdo1.o_valid = OBD_MD_FLMODE;
 
-        rc = ioctl(fd, OBD_IOC_SETATTR , &data);
+        rc = ioctl(fd, OBD_IOC_SETATTR, &data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
                         strerror(rc = errno));
@@ -1325,9 +1334,9 @@ static int jt_destroy(int argc, char **argv)
         }
 
         data.ioc_obdo1.o_id = strtoul(argv[1], NULL, 0);
-        data.ioc_obdo1.o_mode = S_IFREG|0644;
+        data.ioc_obdo1.o_mode = S_IFREG | 0644;
 
-        rc = ioctl(fd, OBD_IOC_DESTROY , &data);
+        rc = ioctl(fd, OBD_IOC_DESTROY, &data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
                         strerror(rc = errno));
@@ -1351,10 +1360,10 @@ static int jt_getattr(int argc, char **argv)
         data.ioc_obdo1.o_valid = 0xffffffff;
         printf("%s: object id %Ld\n", cmdname(argv[0]), data.ioc_obdo1.o_id);
 
-        rc = ioctl(fd, OBD_IOC_GETATTR , &data);
+        rc = ioctl(fd, OBD_IOC_GETATTR, &data);
         if (rc) {
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
-                        strerror(rc=errno));
+                        strerror(rc = errno));
         } else {
                 printf("%s: object id %Ld, mode %o\n", cmdname(argv[0]),
                        data.ioc_obdo1.o_id, data.ioc_obdo1.o_mode);
@@ -1370,7 +1379,8 @@ static int jt_test_getattr(int argc, char **argv)
         int verbose;
 
         if (argc != 2 && argc != 3) {
-                fprintf(stderr, "usage: %s count [verbose]\n",cmdname(argv[0]));
+                fprintf(stderr, "usage: %s count [verbose]\n",
+                        cmdname(argv[0]));
                 return -1;
         }
 
@@ -1388,21 +1398,23 @@ static int jt_test_getattr(int argc, char **argv)
         next_time.tv_sec = start.tv_sec - verbose;
         next_time.tv_usec = start.tv_usec;
         if (verbose != 0)
-                printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]),
-                       count, ctime(&start.tv_sec));
+                printf("%s: getting %d attrs (testing only): %s",
+                       cmdname(argv[0]), count, ctime(&start.tv_sec));
 
         for (i = 1, next_count = verbose; i <= count; i++) {
-                rc = ioctl(fd, OBD_IOC_GETATTR , &data);
+                rc = ioctl(fd, OBD_IOC_GETATTR, &data);
                 SHMEM_BUMP();
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #%d - %s\n",
                                 cmdname(argv[0]), i, strerror(rc = errno));
                         break;
                 } else {
-                        if (be_verbose(verbose, &next_time, i,&next_count,count))
-                        printf("%s: got attr #%d\n", cmdname(argv[0]), i);
-               }
-       }
+                        if (be_verbose
+                            (verbose, &next_time, i, &next_count, count))
+                                printf("%s: got attr #%d\n", cmdname(argv[0]),
+                                       i);
+                }
+        }
 
         if (!rc) {
                 struct timeval end;
@@ -1518,7 +1530,8 @@ static int jt_test_brw(int argc, char **argv)
                                 cmdname(argv[0]), i, strerror(rc = errno),
                                 write ? "write" : "read");
                         break;
-                } else if (be_verbose(verbose, &next_time, i,&next_count,count))
+                } else if (be_verbose
+                           (verbose, &next_time, i, &next_count, count))
                         printf("%s: %s number %d\n", cmdname(argv[0]),
                                write ? "write" : "read", i);
         }
@@ -1536,8 +1549,9 @@ static int jt_test_brw(int argc, char **argv)
                 --i;
                 if (verbose != 0)
                         printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s",
-                               cmdname(argv[0]), write ? "wrote" : "read", obdos,
-                               pages, i, diff, (double)obdos * i * pages / diff,
+                               cmdname(argv[0]), write ? "wrote" : "read",
+                               obdos, pages, i, diff,
+                               (double)obdos * i * pages / diff,
                                ctime(&end.tv_sec));
         }
         return rc;
@@ -1546,53 +1560,52 @@ static int jt_test_brw(int argc, char **argv)
 static int jt_lov_config(int argc, char **argv)
 {
         struct obd_ioctl_data data;
-        struct lov_desc desc; 
+        struct lov_desc desc;
         uuid_t *uuidarray;
         int size, i;
         IOCINIT(data);
 
-        printf("WARNING: obdctl lovconfig NOT MAINTAINED\n"); 
+        printf("WARNING: obdctl lovconfig NOT MAINTAINED\n");
         return -1;
 
-        if (argc <= 5 ){
-                Parser_printhelp("lovconfig"); 
+        if (argc <= 5{
+                Parser_printhelp("lovconfig");
                 return -1;
         }
 
-        if (strlen(argv[1]) > sizeof(uuid_t) - 1) { 
-                fprintf(stderr, "lov_config: no %dB memory for uuid's\n", 
+        if (strlen(argv[1]) > sizeof(uuid_t) - 1) {
+                fprintf(stderr, "lov_config: no %dB memory for uuid's\n",
                         strlen(argv[1]));
                 return -ENOMEM;
         }
-            
-        memset(&desc, 0, sizeof(desc)); 
-        strcpy(desc.ld_uuid, argv[1]); 
-        desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); 
-        desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0); 
-        desc.ld_pattern = strtoul(argv[4], NULL, 0); 
+
+        memset(&desc, 0, sizeof(desc));
+        strcpy(desc.ld_uuid, argv[1]);
+        desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0);
+        desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0);
+        desc.ld_pattern = strtoul(argv[4], NULL, 0);
         desc.ld_tgt_count = argc - 5;
 
 
         size = sizeof(uuid_t) * desc.ld_tgt_count;
         uuidarray = malloc(size);
-        if (!uuidarray) { 
-                fprintf(stderr, "lov_config: no %dB memory for uuid's\n", 
-                        size);
+        if (!uuidarray) {
+                fprintf(stderr, "lov_config: no %dB memory for uuid's\n", size);
                 return -ENOMEM;
         }
-        memset(uuidarray, 0, size); 
-        for (i=5 ; i < argc ; i++) { 
-                char *buf = (char *) (uuidarray + i -5 );
-                if (strlen(argv[i]) >= sizeof(uuid_t)) { 
-                        fprintf(stderr, "lov_config: arg %d (%s) too long\n",  
+        memset(uuidarray, 0, size);
+        for (i = 5; i < argc; i++) {
+                char *buf = (char *)(uuidarray + i - 5);
+                if (strlen(argv[i]) >= sizeof(uuid_t)) {
+                        fprintf(stderr, "lov_config: arg %d (%s) too long\n",
                                 i, argv[i]);
                         free(uuidarray);
                         return -EINVAL;
                 }
-                strcpy(buf, argv[i]); 
+                strcpy(buf, argv[i]);
         }
 
-        data.ioc_inllen1 = sizeof(desc); 
+        data.ioc_inllen1 = sizeof(desc);
         data.ioc_inlbuf1 = (char *)&desc;
         data.ioc_inllen2 = size;
         data.ioc_inlbuf2 = (char *)uuidarray;
@@ -1602,15 +1615,14 @@ static int jt_lov_config(int argc, char **argv)
                 return -EINVAL;
         }
 
-        rc = ioctl(fd, OBD_IOC_LOV_CONFIG , buf);
+        rc = ioctl(fd, OBD_IOC_LOV_CONFIG, buf);
         if (rc < 0)
-                fprintf(stderr, "lov_config: error: %s: %s\n", 
-                        cmdname(argv[0]),strerror(rc = errno));
+                fprintf(stderr, "lov_config: error: %s: %s\n",
+                        cmdname(argv[0]), strerror(rc = errno));
         free(uuidarray);
         return rc;
 }
 
-
 static int jt_test_ldlm(int argc, char **argv)
 {
         struct obd_ioctl_data data;
@@ -1628,6 +1640,23 @@ static int jt_test_ldlm(int argc, char **argv)
         return rc;
 }
 
+static int jt_dump_ldlm(int argc, char **argv)
+{
+        struct obd_ioctl_data data;
+
+        IOCINIT(data);
+        if (argc != 1) {
+                fprintf(stderr, "usage: %s\n", cmdname(argv[0]));
+                return 1;
+        }
+
+        rc = ioctl(fd, IOC_LDLM_DUMP, &data);
+        if (rc)
+                fprintf(stderr, "error: %s failed: %s\n",
+                        cmdname(argv[0]), strerror(rc = errno));
+        return rc;
+}
+
 static int jt_newconn(int argc, char **argv)
 {
         struct obd_ioctl_data data;
@@ -1638,7 +1667,7 @@ static int jt_newconn(int argc, char **argv)
                 return -1;
         }
 
-        rc = ioctl(fd, OBD_IOC_RECOVD_NEWCONN , &data);
+        rc = ioctl(fd, OBD_IOC_RECOVD_NEWCONN, &data);
         if (rc < 0)
                 fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]),
                         strerror(rc = errno));
@@ -1658,15 +1687,16 @@ command_t cmdlist[] = {
         {"--xml", jt__xml, 0, "--xml <xml file> <command [args ...]>"},
         {"--device", jt__device, 0, "--device <devno> <command [args ...]>"},
         {"--threads", jt__threads, 0,
-                "--threads <threads> <devno> <command [args ...]>"},
+         "--threads <threads> <devno> <command [args ...]>"},
 
         /* Device configuration commands */
         {"lovconfig", jt_lov_config, 0, "configure lov data on MDS "
-         "[usage: lovconfig lov-uuid stripecount, stripesize, pattern, UUID1, [UUID2, ...]"}, 
+         "[usage: lovconfig lov-uuid stripecount, stripesize, pattern, UUID1, [UUID2, ...]"},
         {"list", jt_list, 0, "list the devices (no args)"},
         {"newdev", jt_newdev, 0, "set device to a new unused obd (no args)"},
         {"device", jt_device, 0, "set current device (args device_no name)"},
-        {"name2dev", jt_name2dev, 0, "set device by name [usage: name2dev devname]"},
+        {"name2dev", jt_name2dev, 0,
+         "set device by name [usage: name2dev devname]"},
         {"attach", jt_attach, 0, "name the type of device (args: type data"},
         {"setup", jt_setup, 0, "setup device (args: <blkdev> [data]"},
         {"detach", jt_detach, 0, "detach the current device (arg: )"},
@@ -1675,7 +1705,7 @@ command_t cmdlist[] = {
         /* Session commands */
         {"connect", jt_connect, 0, "connect - get a connection to device"},
         {"disconnect", jt_disconnect, 0,
-                "disconnect - break connection to device"},
+         "disconnect - break connection to device"},
 
         /* Session operations */
         {"create", jt_create, 0, "create [count [mode [verbose]]]"},
@@ -1686,12 +1716,13 @@ command_t cmdlist[] = {
         {"test_getattr", jt_test_getattr, 0, "test_getattr <count> [verbose]"},
         {"test_brw", jt_test_brw, 0, "test_brw <count> [write [verbose]]"},
         {"test_ldlm", jt_test_ldlm, 0, "test lock manager (no args)"},
+        {"dump_ldlm", jt_dump_ldlm, 0, "dump all lock manager state (no args)"},
 
         /* User interface commands */
         {"help", Parser_help, 0, "help"},
         {"exit", jt_quit, 0, "quit"},
         {"quit", jt_quit, 0, "quit"},
-        { 0, 0, 0, NULL }
+        {0, 0, 0, NULL}
 };
 
 
@@ -1700,9 +1731,8 @@ static void signal_server(int sig)
         if (sig == SIGINT) {
                 do_disconnect("sigint", 1);
                 exit(1);
-        } else {
+        } else
                 fprintf(stderr, "%s: got signal %d\n", cmdname("sigint"), sig);
-        }
 }
 
 int main(int argc, char **argv)
@@ -1714,9 +1744,9 @@ int main(int argc, char **argv)
         sigact.sa_flags = SA_RESTART;
         sigaction(SIGINT, &sigact, NULL);
 
-        setlinebuf (stdout);
+        setlinebuf(stdout);
         SHMEM_SETUP();
-        
+
         if (argc > 1) {
                 rc = Parser_execarg(argc - 1, argv + 1, cmdlist);
         } else {
@@ -1727,4 +1757,3 @@ int main(int argc, char **argv)
         do_disconnect(argv[0], 1);
         return rc;
 }
-