Whamcloud - gitweb
Landing the ldlm_testing branch; now the only difference is that the locking
authorpschwan <pschwan>
Thu, 9 May 2002 17:08:39 +0000 (17:08 +0000)
committerpschwan <pschwan>
Thu, 9 May 2002 17:08:39 +0000 (17:08 +0000)
calls are #if 0ed out of the trunk's ll_file_read and ll_file_write

15 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/llite/file.c
lustre/llite/super.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c

index 9d8ee98..42621d9 100644 (file)
@@ -28,6 +28,7 @@ typedef enum {
 #define LDLM_FL_BLOCK_GRANTED  (1 << 1)
 #define LDLM_FL_BLOCK_CONV     (1 << 2)
 #define LDLM_FL_BLOCK_WAIT     (1 << 3)
+#define LDLM_FL_DYING          (1 << 4)
 
 #define L2B(c) (1 << c)
 
@@ -74,12 +75,12 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
 */
 
 struct ldlm_namespace {
-        struct list_head      ns_link;      /* in the list of ns's */
-        __u32                 ns_id;        /* identifier of ns */
+        struct obd_device    *ns_obddev;
+        __u32                 ns_local;     /* is this a local lock tree? */
         struct list_head     *ns_hash;      /* hash table for ns */
-        atomic_t              ns_refcount;  /* count of resources in the hash */
+        __u32                 ns_refcount;  /* count of resources in the hash */
         struct list_head      ns_root_list; /* all root resources in ns */
-        struct obd_device    *ns_obddev;
+        spinlock_t            ns_lock;      /* protects hash, refcount, list */
 };
 
 /* 
@@ -103,19 +104,32 @@ struct ldlm_lock {
         struct list_head      l_children;
         struct list_head      l_childof;
         struct list_head      l_res_link; /*position in one of three res lists*/
+
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
+
         ldlm_lock_callback    l_completion_ast;
         ldlm_lock_callback    l_blocking_ast;
+
         struct ptlrpc_connection *l_connection;
+        struct ptlrpc_client *l_client;
+        __u32                 l_flags;
+        struct ldlm_handle    l_remote_handle;
         void                 *l_data;
         __u32                 l_data_len;
         struct ldlm_extent    l_extent;
-        struct ldlm_handle    l_remote_handle;
         //void                 *l_event;
         //XXX cluster_host    l_holder;
         __u32                 l_version[RES_VERSION_SIZE];
+
+        __u32                 l_readers;
+        __u32                 l_writers;
+
+        /* If the lock is granted, a process sleeps on this waitq to learn when
+         * it's no longer in use.  If the lock is not granted, a process sleeps
+         * on this waitq to learn when it becomes granted. */
         wait_queue_head_t     l_waitq;
+        spinlock_t            l_lock;
 };
 
 typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new);
@@ -124,11 +138,12 @@ typedef int (*ldlm_res_policy)(struct ldlm_resource *parent,
                                struct ldlm_extent *new_ex,
                                ldlm_mode_t mode, void *data);
 
-#define LDLM_PLAIN       0x0
-#define LDLM_EXTENT      0x1
-#define LDLM_MDSINTENT   0x2
+#define LDLM_PLAIN       0
+#define LDLM_EXTENT      1
+#define LDLM_MDSINTENT   2
+
+#define LDLM_MAX_TYPE 2
 
-#define LDLM_MAX_TYPE    0x2
 extern ldlm_res_compat ldlm_res_compat_table []; 
 extern ldlm_res_policy ldlm_res_policy_table []; 
 
@@ -144,13 +159,13 @@ struct ldlm_resource {
         struct list_head       lr_converting;
         struct list_head       lr_waiting;
         ldlm_mode_t            lr_most_restr;
-        atomic_t               lr_refcount;
         __u32                  lr_type; /* PLAIN, EXTENT, or MDSINTENT */
         struct ldlm_resource  *lr_root;
         //XXX cluster_host          lr_master;
         __u64                  lr_name[RES_NAME_SIZE];
         __u32                  lr_version[RES_VERSION_SIZE];
-        spinlock_t             lr_lock;
+        __u32                  lr_refcount;
+        spinlock_t             lr_lock; /* protects lists, refcount */
 };
 
 static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
@@ -170,19 +185,6 @@ static inline void ldlm_object2handle(void *object, struct ldlm_handle *handle)
         handle->addr = (__u64)(unsigned long)object;
 }
 
-extern struct list_head ldlm_namespaces;
-extern spinlock_t ldlm_spinlock;
-
-static inline void ldlm_lock(void)
-{
-        spin_lock(&ldlm_spinlock);
-}
-
-static inline void ldlm_unlock(void)
-{
-        spin_unlock(&ldlm_spinlock);
-}
-
 extern struct obd_ops ldlm_obd_ops;
 
 /* ldlm_extent.c */
@@ -193,31 +195,35 @@ int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *,
 /* ldlm_lock.c */
 void ldlm_lock_free(struct ldlm_lock *lock);
 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
-ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode);
+void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode);
+void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock);
+int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
+                          struct ldlm_extent *extent, ldlm_mode_t mode,
+                          struct ldlm_handle *lockh);
+ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
                                     struct ldlm_handle *parent_lock_handle,
-                                    __u64 *res_id,
-                                    __u32 type,
+                                    __u64 *res_id, __u32 type,
+                                     ldlm_mode_t mode,
+                                    void *data,
+                                    __u32 data_len,
                                     struct ldlm_handle *lockh);
 ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
-                                     ldlm_mode_t mode,
                                      struct ldlm_extent *req_ex,
                                      int *flags,
                                      ldlm_lock_callback completion,
-                                     ldlm_lock_callback blocking,
-                                     void *data,
-                                     __u32 data_len);
-ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
-                                     int new_mode, int *flags);
-ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh);
+                                     ldlm_lock_callback blocking);
+struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh,
+                                              int new_mode, int *flags);
+struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock);
+void ldlm_reprocess_all(struct ldlm_resource *res);
 void ldlm_lock_dump(struct ldlm_lock *lock);
 
 /* ldlm_test.c */
 int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn);
 
 /* resource.c */
-struct ldlm_namespace *ldlm_namespace_find(__u32 id);
-ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id,
-                                struct ldlm_namespace **);
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 local);
 int ldlm_namespace_free(struct ldlm_namespace *ns);
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
@@ -230,28 +236,22 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc);
 void ldlm_resource_dump(struct ldlm_resource *res);
 
 /* ldlm_request.c */
-int ldlm_cli_namespace_new(struct obd_device *, struct ptlrpc_client *,
-                           struct ptlrpc_connection *, __u32 ns_id);
 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer,
-                     __u32 ns_id,
+                     struct ldlm_namespace *ns,
                      struct ldlm_handle *parent_lock_handle,
                      __u64 *res_id,
                      __u32 type,
                      struct ldlm_extent *req_ex,
                      ldlm_mode_t mode,
                      int *flags,
-                     ldlm_lock_callback completion,
-                     ldlm_lock_callback blocking,
                      void *data,
                      __u32 data_len,
-                     struct ldlm_handle *lockh,
-                     struct ptlrpc_request **request);
+                     struct ldlm_handle *lockh);
 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
                       void *data, __u32 data_len);
 int ldlm_cli_convert(struct ptlrpc_client *, struct ldlm_handle *,
-                     int new_mode, int *flags, struct ptlrpc_request **);
-int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_handle *,
-                    struct ptlrpc_request **);
+                     int new_mode, int *flags);
+int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_lock *);
 
 
 #endif /* __KERNEL__ */
index 1c9890f..cce3c43 100644 (file)
@@ -269,11 +269,10 @@ struct mds_rec_rename {
  */
 
 /* opcodes */
-#define LDLM_NAMESPACE_NEW 1
-#define LDLM_ENQUEUE       2
-#define LDLM_CONVERT       3
-#define LDLM_CANCEL        4
-#define LDLM_CALLBACK      5
+#define LDLM_ENQUEUE       1
+#define LDLM_CONVERT       2
+#define LDLM_CANCEL        3
+#define LDLM_CALLBACK      4
 
 #define RES_NAME_SIZE 3
 #define RES_VERSION_SIZE 4
@@ -299,7 +298,6 @@ struct ldlm_extent {
 };
 
 struct ldlm_resource_desc {
-        __u32 lr_ns_id;
         __u32 lr_type;
         __u64 lr_name[RES_NAME_SIZE];
         __u64 lr_version[RES_VERSION_SIZE];
index 2db81cf..fcf3998 100644 (file)
@@ -46,6 +46,14 @@ int simple_mkdir(struct dentry *dir, char *name, int mode);
 int lustre_fread(struct file *file, char *str, int len, loff_t *off);
 int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off);
 int lustre_fsync(struct file *file);
+
+static inline void ll_sleep(int t)
+{
+        set_current_state(TASK_INTERRUPTIBLE);
+        schedule_timeout(t * HZ);
+        set_current_state(TASK_RUNNING);
+}
+
 #endif
 
 #include <linux/portals_lib.h>
index 7ed6e48..a428858 100644 (file)
@@ -344,6 +344,33 @@ static inline int obd_iocontrol(int cmd, struct obd_conn *conn,
         RETURN(rc);
 }
 
+static inline int obd_enqueue(struct obd_conn *conn, struct ldlm_namespace *ns,
+                              struct ldlm_handle *parent_lock, __u64 *res_id,
+                              __u32 type, struct ldlm_extent *extent,
+                              __u32 mode, int *flags, void *data, int datalen,
+                              struct ldlm_handle *lockh)
+{
+        int rc;
+        OBD_CHECK_SETUP(conn);
+        OBD_CHECK_OP(conn, enqueue);
+        
+        rc = OBP(conn->oc_dev, enqueue)(conn, ns, parent_lock, res_id, type,
+                                        extent, mode, flags, data, datalen,
+                                        lockh);
+        RETURN(rc);
+}
+
+static inline int obd_cancel(struct obd_conn *conn, __u32 mode,
+                             struct ldlm_handle *lockh)
+{
+        int rc;
+        OBD_CHECK_SETUP(conn);
+        OBD_CHECK_OP(conn, cancel);
+        
+        rc = OBP(conn->oc_dev, cancel)(conn, mode, lockh);
+        RETURN(rc);
+}
+
 #endif 
 
 /*
@@ -396,8 +423,7 @@ static __inline__ struct obdo *obdo_fromid(struct obd_conn *conn, obd_id id,
         ENTRY;
         oa = obdo_alloc();
         if ( !oa ) {
-                EXIT;
-                return ERR_PTR(-ENOMEM);
+                RETURN(ERR_PTR(-ENOMEM));
         }
 
         oa->o_id = id;
@@ -405,11 +431,9 @@ static __inline__ struct obdo *obdo_fromid(struct obd_conn *conn, obd_id id,
         oa->o_valid = valid;
         if ((err = OBP(conn->oc_dev, getattr)(conn, oa))) {
                 obdo_free(oa);
-                EXIT;
-                return ERR_PTR(err);
+                RETURN(ERR_PTR(err));
         }
-        EXIT;
-        return oa;
+        RETURN(oa);
 }
 
 static inline void obdo_from_iattr(struct obdo *oa, struct iattr *attr)
index e24a457..b334e4a 100644 (file)
@@ -26,9 +26,9 @@ int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
 {
         if (MAX(a->l_extent.start, b->l_extent.start) <=
             MIN(a->l_extent.end, b->l_extent.end))
-                return 0;
+                RETURN(0);
 
-        return 1;
+        RETURN(1);
 }
 
 static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex,
index 83e2d2c..05bd152 100644 (file)
@@ -45,7 +45,9 @@ static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
         return 0;
 }
 
-/* Caller should do ldlm_resource_get() on this resource first. */
+/* Args: referenced, unlocked parent (or NULL)
+ *       referenced, unlocked resource
+ * Locks: parent->l_lock */
 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
                                        struct ldlm_resource *resource)
 {
@@ -61,23 +63,38 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         memset(lock, 0, sizeof(*lock));
         lock->l_resource = resource;
         INIT_LIST_HEAD(&lock->l_children);
+        INIT_LIST_HEAD(&lock->l_res_link);
         init_waitqueue_head(&lock->l_waitq);
+        lock->l_lock = SPIN_LOCK_UNLOCKED;
 
         if (parent != NULL) {
+                spin_lock(&parent->l_lock);
                 lock->l_parent = parent;
                 list_add(&lock->l_childof, &parent->l_children);
+                spin_unlock(&parent->l_lock);
         }
 
         return lock;
 }
 
-/* Caller must do its own ldlm_resource_put() on lock->l_resource */
+/* Args: unreferenced, locked lock
+ *
+ * Caller must do its own ldlm_resource_put() on lock->l_resource */
 void ldlm_lock_free(struct ldlm_lock *lock)
 {
         if (!list_empty(&lock->l_children)) {
-                CERROR("lock still has children!\n");
+                CERROR("lock %p still has children (%p)!\n", lock,
+                       lock->l_children.next);
+                ldlm_lock_dump(lock);
                 LBUG();
         }
+
+        if (lock->l_readers || lock->l_writers)
+                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+                       "writers)\n", lock->l_readers, lock->l_writers);
+
+        if (lock->l_connection)
+                ptlrpc_put_connection(lock->l_connection);
         kmem_cache_free(ldlm_lock_slab, lock);
 }
 
@@ -90,25 +107,77 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
 }
 
-static int ldlm_lock_compat(struct ldlm_lock *lock)
+/* Args: unlocked lock */
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
 {
-        struct list_head *tmp;
+        spin_lock(&lock->l_lock);
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+                lock->l_readers++;
+        else
+                lock->l_writers++;
+        spin_unlock(&lock->l_lock);
+}
+
+/* Args: unlocked lock */
+void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
+{
+        int rc;
+
+        spin_lock(&lock->l_lock);
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+                lock->l_readers--;
+        else
+                lock->l_writers--;
+        if (!lock->l_readers && !lock->l_writers &&
+            lock->l_flags & LDLM_FL_DYING) {
+                /* Read this lock its rights. */
+                if (!lock->l_resource->lr_namespace->ns_local) {
+                        CERROR("LDLM_FL_DYING set on non-local lock!\n");
+                        LBUG();
+                }
+
+                CDEBUG(D_INFO, "final decref done on dying lock, "
+                       "cancelling.\n");
+                spin_unlock(&lock->l_lock);
+                rc = ldlm_cli_cancel(lock->l_client, lock);
+                if (rc) {
+                        /* FIXME: do something more dramatic */
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+                }
+        } else
+                spin_unlock(&lock->l_lock);
+}
+
+/* Args: locked lock */
+static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
+                             struct list_head *queue)
+{
+        struct list_head *tmp, *pos;
         int rc = 0;
 
-        list_for_each(tmp, &lock->l_resource->lr_granted) {
+        list_for_each_safe(tmp, pos, queue) {
                 struct ldlm_lock *child;
                 ldlm_res_compat compat;
 
                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
+                if (lock == child)
+                        continue;
 
                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
-                if (compat(child, lock) ||
-                    lockmode_compat(child->l_req_mode, lock->l_req_mode))
+                if (compat(child, lock)) {
+                        CDEBUG(D_OTHER, "compat function succeded, next.\n");
+                        continue;
+                }
+                if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
+                        CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
                         continue;
+                }
 
                 rc = 1;
 
-                if (child->l_blocking_ast != NULL)
+                CDEBUG(D_OTHER, "compat function failed and lock modes are "
+                       "incompatible; sending blocking AST.\n");
+                if (send_cbs && child->l_blocking_ast != NULL)
                         child->l_blocking_ast(child, lock, child->l_data,
                                               child->l_data_len);
         }
@@ -116,8 +185,24 @@ static int ldlm_lock_compat(struct ldlm_lock *lock)
         return rc;
 }
 
-static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+/* Args: unlocked lock */
+static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
 {
+        int rc;
+        ENTRY;
+
+        rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
+        rc |= _ldlm_lock_compat(lock, send_cbs,
+                                &lock->l_resource->lr_converting);
+
+        RETURN(rc);
+}
+
+/* Args: locked lock, locked resource */
+void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+{
+        ENTRY;
+
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
         lock->l_granted_mode = lock->l_req_mode;
 
@@ -127,21 +212,82 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
         if (lock->l_completion_ast)
                 lock->l_completion_ast(lock, NULL,
                                        lock->l_data, lock->l_data_len);
+        EXIT;
 }
 
-ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
+static int search_queue(struct list_head *queue, ldlm_mode_t mode,
+                        struct ldlm_extent *extent, struct ldlm_handle *lockh)
+{
+        struct list_head *tmp;
+
+        list_for_each(tmp, queue) { 
+                struct ldlm_lock *lock;
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (lock->l_flags & LDLM_FL_DYING)
+                        continue;
+
+                /* lock_convert() takes the resource lock, so we're sure that
+                 * req_mode, lr_type, and l_extent won't change beneath us */
+                if (lock->l_req_mode != mode)
+                        continue;
+
+                if (lock->l_resource->lr_type == LDLM_EXTENT &&
+                    (lock->l_extent.start > extent->start ||
+                     lock->l_extent.end < extent->end))
+                        continue;
+
+                ldlm_lock_addref(lock, mode);
+                ldlm_object2handle(lock, lockh);
+                return 1;
+        }
+
+        return 0;
+}
+
+/* Must be called with no resource or lock locks held.
+ *
+ * Returns 1 if it finds an already-existing lock that is compatible; in this
+ * case, lockh is filled in with a addref()ed lock */
+int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
+                          struct ldlm_extent *extent, ldlm_mode_t mode,
+                          struct ldlm_handle *lockh)
+{
+        struct ldlm_resource *res;
+        int rc = 0;
+        ENTRY;
+
+        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
+        if (res == NULL)
+                RETURN(0);
+
+        spin_lock(&res->lr_lock);
+        if (search_queue(&res->lr_granted, mode, extent, lockh))
+                GOTO(out, rc = 1);
+        if (search_queue(&res->lr_converting, mode, extent, lockh))
+                GOTO(out, rc = 1);
+        if (search_queue(&res->lr_waiting, mode, extent, lockh))
+                GOTO(out, rc = 1);
+
+        EXIT;
+ out:
+        ldlm_resource_put(res);
+        spin_unlock(&res->lr_lock);
+        return rc;
+}
+
+/* Must be called without the resource lock held. */
+ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
                                     struct ldlm_handle *parent_lock_handle,
                                     __u64 *res_id, __u32 type,
+                                     ldlm_mode_t mode,
+                                    void *data,
+                                    __u32 data_len,
                                     struct ldlm_handle *lockh)
 {
-        struct ldlm_namespace *ns;
         struct ldlm_resource *res, *parent_res = NULL;
         struct ldlm_lock *lock, *parent_lock;
 
-        ns = ldlm_namespace_find(ns_id);
-        if (ns == NULL || ns->ns_hash == NULL) 
-                RETURN(-ELDLM_BAD_NAMESPACE);
-
         parent_lock = ldlm_handle2object(parent_lock_handle);
         if (parent_lock)
                 parent_res = parent_lock->l_resource;
@@ -151,134 +297,172 @@ ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
                 RETURN(-ENOMEM);
 
         lock = ldlm_lock_new(parent_lock, res);
-        if (lock == NULL)
+        if (lock == NULL) {
+                spin_lock(&res->lr_lock);
+                ldlm_resource_put(res);
+                spin_unlock(&res->lr_lock);
                 RETURN(-ENOMEM);
+        }
 
-        ldlm_object2handle(lock, lockh);
+        lock->l_req_mode = mode;
+        lock->l_data = data;
+        lock->l_data_len = data_len;
+        ldlm_lock_addref(lock, mode);
 
+        ldlm_object2handle(lock, lockh);
         return ELDLM_OK;
 }
 
-/* XXX: Revisit the error handling; we do not, for example, do
- * ldlm_resource_put()s in our error cases, and we probably leak any allocated
- * memory. */
+/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
 ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
-                                     ldlm_mode_t mode,
                                      struct ldlm_extent *req_ex,
                                      int *flags,
                                      ldlm_lock_callback completion,
-                                     ldlm_lock_callback blocking,
-                                     void *data,
-                                     __u32 data_len)
+                                     ldlm_lock_callback blocking)
 {
+        struct ldlm_resource *res;
         struct ldlm_lock *lock;
-        struct ldlm_extent new_ex;
-        int incompat = 0, rc;
+        int incompat = 0, local;
         ldlm_res_policy policy;
         ENTRY;
 
         lock = ldlm_handle2object(lockh);
-        if ((policy = ldlm_res_policy_table[lock->l_resource->lr_type])) {
-                rc = policy(lock->l_resource, req_ex, &new_ex, mode, NULL);
+        res = lock->l_resource;
+        local = res->lr_namespace->ns_local;
+        spin_lock(&res->lr_lock);
+
+        lock->l_blocking_ast = blocking;
+
+        if ((res->lr_type == LDLM_EXTENT && !req_ex) ||
+            (res->lr_type != LDLM_EXTENT && req_ex))
+                LBUG();
+
+        if ((policy = ldlm_res_policy_table[res->lr_type])) {
+                struct ldlm_extent new_ex;
+                int rc = policy(res, req_ex, &new_ex, lock->l_req_mode, NULL);
                 if (rc == ELDLM_LOCK_CHANGED) {
                         *flags |= LDLM_FL_LOCK_CHANGED;
                         memcpy(req_ex, &new_ex, sizeof(new_ex));
                 }
         }
 
-        if ((lock->l_resource->lr_type == LDLM_EXTENT && !req_ex) ||
-            (lock->l_resource->lr_type != LDLM_EXTENT && req_ex))
-                LBUG();
         if (req_ex)
                 memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
-        lock->l_req_mode = mode;
-        lock->l_data = data;
-        lock->l_data_len = data_len;
-        lock->l_blocking_ast = blocking;
-        spin_lock(&lock->l_resource->lr_lock);
 
-        /* FIXME: We may want to optimize by checking lr_most_restr */
+        if (local && lock->l_req_mode == lock->l_granted_mode) {
+                /* The server returned a blocked lock, but it was granted before
+                 * we got a chance to actually enqueue it.  We don't need to do
+                 * anything else. */
+                GOTO(out, ELDLM_OK);
+        }
 
-        if (!list_empty(&lock->l_resource->lr_converting)) {
-                ldlm_resource_add_lock(lock->l_resource,
-                                       lock->l_resource->lr_waiting.prev, lock);
+        /* If this is a local resource, put it on the appropriate list. */
+        if (local) {
+                if (*flags & LDLM_FL_BLOCK_CONV)
+                        ldlm_resource_add_lock(res, res->lr_converting.prev,
+                                               lock);
+                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
+                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                else
+                        ldlm_grant_lock(res, lock);
+                GOTO(out, ELDLM_OK);
+        }
+
+        /* FIXME: We may want to optimize by checking lr_most_restr */
+        if (!list_empty(&res->lr_converting)) {
+                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                 *flags |= LDLM_FL_BLOCK_CONV;
                 GOTO(out, ELDLM_OK);
         }
-        if (!list_empty(&lock->l_resource->lr_waiting)) {
-                ldlm_resource_add_lock(lock->l_resource,
-                                       lock->l_resource->lr_waiting.prev, lock);
+        if (!list_empty(&res->lr_waiting)) {
+                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                 *flags |= LDLM_FL_BLOCK_WAIT;
                 GOTO(out, ELDLM_OK);
         }
-
-        incompat = ldlm_lock_compat(lock);
+        incompat = ldlm_lock_compat(lock, 0);
         if (incompat) {
-                ldlm_resource_add_lock(lock->l_resource,
-                                       lock->l_resource->lr_waiting.prev, lock);
+                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 GOTO(out, ELDLM_OK);
         }
 
-        ldlm_grant_lock(lock->l_resource, lock);
+        ldlm_grant_lock(res, lock);
         EXIT;
  out:
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
-        spin_unlock(&lock->l_resource->lr_lock);
+        spin_unlock(&res->lr_lock);
         return ELDLM_OK;
 }
 
+/* Must be called with resource->lr_lock taken. */
 static int ldlm_reprocess_queue(struct ldlm_resource *res,
                                 struct list_head *converting)
 {
         struct list_head *tmp, *pos;
-        int incompat = 0;
+        ENTRY;
 
         list_for_each_safe(tmp, pos, converting) { 
                 struct ldlm_lock *pending;
                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                incompat = ldlm_lock_compat(pending);
-                if (incompat)
-                        break;
+                /* the resource lock protects ldlm_lock_compat */
+                if (ldlm_lock_compat(pending, 1))
+                        RETURN(1);
 
-                list_del(&pending->l_res_link); 
+                list_del_init(&pending->l_res_link); 
                 ldlm_grant_lock(res, pending);
+
+                ldlm_lock_addref(pending, pending->l_req_mode);
+                ldlm_lock_decref(pending, pending->l_granted_mode);
         }
 
-        return incompat;
+        RETURN(0);
 }
 
-static void ldlm_reprocess_all(struct ldlm_resource *res)
+/* Must be called with resource->lr_lock not taken. */
+void ldlm_reprocess_all(struct ldlm_resource *res)
 {
+        /* Local lock trees don't get reprocessed. */
+        if (res->lr_namespace->ns_local)
+                return;
+
+        spin_lock(&res->lr_lock);
         ldlm_reprocess_queue(res, &res->lr_converting);
         if (list_empty(&res->lr_converting))
                 ldlm_reprocess_queue(res, &res->lr_waiting);
+        spin_unlock(&res->lr_lock);
 }
 
-ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh)
+/* Must be called with lock and lock->l_resource unlocked */
+struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
 {
-        struct ldlm_lock *lock;
         struct ldlm_resource *res;
         ENTRY;
 
-        lock = ldlm_handle2object(lockh);
         res = lock->l_resource;
 
-        ldlm_resource_del_lock(lock);
+        spin_lock(&res->lr_lock);
+        spin_lock(&lock->l_lock);
 
-        ldlm_lock_free(lock);
+        if (lock->l_readers || lock->l_writers)
+                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+                       "writers)\n", lock->l_readers, lock->l_writers);
+
+        ldlm_resource_del_lock(lock);
         if (ldlm_resource_put(res))
-                RETURN(ELDLM_OK);
-        ldlm_reprocess_all(res);
+                res = NULL; /* res was freed, nothing else to do. */
+        else
+                spin_unlock(&res->lr_lock);
+        ldlm_lock_free(lock);
 
-        RETURN(ELDLM_OK);
+        RETURN(res);
 }
 
-ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
-                                     int new_mode, int *flags)
+/* Must be called with lock and lock->l_resource unlocked */
+struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh,
+                                              int new_mode, int *flags)
 {
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
@@ -286,20 +470,37 @@ ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
 
         lock = ldlm_handle2object(lockh);
         res = lock->l_resource;
-        list_del(&lock->l_res_link);
-        lock->l_req_mode = new_mode;
 
-        list_add(&lock->l_res_link, res->lr_converting.prev);
+        spin_lock(&res->lr_lock);
+
+        lock->l_req_mode = new_mode;
+        list_del_init(&lock->l_res_link);
+
+        /* If this is a local resource, put it on the appropriate list. */
+        if (res->lr_namespace->ns_local) {
+                if (*flags & LDLM_FL_BLOCK_CONV)
+                        ldlm_resource_add_lock(res, res->lr_converting.prev,
+                                               lock);
+                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
+                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                else
+                        ldlm_grant_lock(res, lock);
+        } else {
+                list_add(&lock->l_res_link, res->lr_converting.prev);
+        }
 
-        ldlm_reprocess_all(res);
+        spin_unlock(&res->lr_lock);
 
-        RETURN(ELDLM_OK);
+        RETURN(res);
 }
 
 void ldlm_lock_dump(struct ldlm_lock *lock)
 {
         char ver[128];
 
+        if (!(portal_debug & D_OTHER))
+                return;
+
         if (RES_VERSION_SIZE != 4)
                 LBUG();
 
@@ -312,6 +513,8 @@ void ldlm_lock_dump(struct ldlm_lock *lock)
         CDEBUG(D_OTHER, "  Resource: %p\n", lock->l_resource);
         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
                (int)lock->l_req_mode, (int)lock->l_granted_mode);
+        CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
+               lock->l_readers, lock->l_writers);
         if (lock->l_resource->lr_type == LDLM_EXTENT)
                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
                        (unsigned long long)lock->l_extent.start,
index 730d606..9bef050 100644 (file)
  */
 
 #define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_LDLM
 
-#include <linux/version.h>
 #include <linux/module.h>
 #include <linux/slab.h>
-#include <asm/unistd.h>
-
-#define DEBUG_SUBSYSTEM S_LDLM
-
 #include <linux/lustre_dlm.h>
 
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 
-static int _ldlm_namespace_new(struct obd_device *obddev,
-                               struct ptlrpc_request *req)
+#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
+
+static int is_local_conn(struct ptlrpc_connection *conn)
 {
-        struct ldlm_request *dlm_req;
-        struct ldlm_namespace *ns;
-        int rc;
-        ldlm_error_t err;
         ENTRY;
+        if (conn == NULL)
+                RETURN(1);
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc) {
-                CERROR("out of memory\n");
-                req->rq_status = -ENOMEM;
-                RETURN(0);
-        }
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+        RETURN(LOOPBACK(conn->c_peer.peer_nid));
+}
 
-        err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
-                                 &ns);
-        req->rq_status = err;
+/* _ldlm_callback and local_callback setup the variables then call this common
+ * code */
+static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+                           ldlm_mode_t mode, void *data, __u32 data_len)
+{
+        ENTRY;
+        ldlm_lock_dump(lock);
 
-        CERROR("err = %d\n", err);
+        if (!lock)
+                LBUG();
+        if (!lock->l_resource)
+                LBUG();
 
+        spin_lock(&lock->l_resource->lr_lock);
+        spin_lock(&lock->l_lock);
+        if (!new) {
+                CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
+                lock->l_req_mode = mode;
+                list_del_init(&lock->l_res_link); 
+                ldlm_grant_lock(lock->l_resource, lock);
+                wake_up(&lock->l_waitq);
+                spin_unlock(&lock->l_lock);
+                spin_unlock(&lock->l_resource->lr_lock);
+        } else {
+                CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
+                lock->l_flags |= LDLM_FL_DYING;
+                spin_unlock(&lock->l_lock);
+                spin_unlock(&lock->l_resource->lr_lock);
+                if (!lock->l_readers && !lock->l_writers) {
+                        CDEBUG(D_INFO, "Lock already unused, canceling.\n");
+                        if (ldlm_cli_cancel(lock->l_client, lock))
+                                LBUG();
+                } else {
+                        CDEBUG(D_INFO, "Lock still has references; lock will be"
+                               " cancelled later.\n");
+                }
+        }
         RETURN(0);
 }
 
-static int _ldlm_enqueue(struct ptlrpc_request *req)
+static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
+                          void *data, __u32 data_len)
+{
+        struct ldlm_lock *lock;
+        /* the 'remote handle' is the lock in the FS's namespace */
+        lock = ldlm_handle2object(&l->l_remote_handle);
+
+        return common_callback(lock, new, l->l_granted_mode, data, data_len);
+}
+
+static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
+                         struct ptlrpc_request *req)
 {
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
         int rc, size = sizeof(*dlm_rep);
         ldlm_error_t err;
-        struct ldlm_lock *lock;
+        struct ldlm_lock *lock = NULL;
+        ldlm_lock_callback callback;
         ENTRY;
 
+        /* Is this lock managed locally? */
+        if (is_local_conn(req->rq_connection))
+                callback = local_callback;
+        else
+                callback = ldlm_cli_callback;
+
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
-                req->rq_status = -ENOMEM;
-                RETURN(0);
+                RETURN(-ENOMEM);
         }
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
@@ -71,22 +109,21 @@ static int _ldlm_enqueue(struct ptlrpc_request *req)
                sizeof(dlm_rep->lock_extent));
         dlm_rep->flags = dlm_req->flags;
 
-        err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
+        err = ldlm_local_lock_create(obddev->obd_namespace,
                                      &dlm_req->lock_handle2,
                                      dlm_req->lock_desc.l_resource.lr_name,
                                      dlm_req->lock_desc.l_resource.lr_type,
+                                     dlm_req->lock_desc.l_req_mode,
+                                     lustre_msg_buf(req->rq_reqmsg, 1),
+                                     req->rq_reqmsg->buflens[1],
                                      &dlm_rep->lock_handle);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
         err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
-                                      dlm_req->lock_desc.l_req_mode,
                                       &dlm_rep->lock_extent,
                                       &dlm_rep->flags,
-                                      ldlm_cli_callback,
-                                      ldlm_cli_callback,
-                                      lustre_msg_buf(req->rq_reqmsg, 1),
-                                      req->rq_reqmsg->buflens[1]);
+                                      callback, callback);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -97,78 +134,99 @@ static int _ldlm_enqueue(struct ptlrpc_request *req)
         EXIT;
  out:
         req->rq_status = err;
-        CERROR("err = %d\n", err);
+        CDEBUG(D_INFO, "err = %d\n", err);
+
+        if (ptlrpc_reply(svc, req))
+                LBUG();
+
+        if (!err) {
+                ldlm_reprocess_all(lock->l_resource);
+        }
 
         return 0;
 }
 
-static int _ldlm_convert(struct ptlrpc_request *req)
+static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 {
-        struct ldlm_request *dlm_req;
-        int rc;
+        struct ldlm_request *dlm_req, *dlm_rep;
+        struct ldlm_resource *res;
+        int rc, size = sizeof(*dlm_rep);
         ENTRY;
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
-                req->rq_status = -ENOMEM;
-                RETURN(0);
+                RETURN(-ENOMEM);
         }
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+        dlm_rep->flags = dlm_req->flags;
+
+        res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
+                                      dlm_req->lock_desc.l_req_mode,
+                                      &dlm_rep->flags);
+        req->rq_status = 0;
+        if (ptlrpc_reply(svc, req) != 0)
+                LBUG();
+
+        ldlm_reprocess_all(res);
 
-        req->rq_status =
-                ldlm_local_lock_convert(&dlm_req->lock_handle1,
-                                        dlm_req->lock_desc.l_req_mode,
-                                        &dlm_req->flags);
         RETURN(0);
 }
 
-static int _ldlm_cancel(struct ptlrpc_request *req)
+static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
+        struct ldlm_lock *lock;
+        struct ldlm_resource *res;
         int rc;
         ENTRY;
 
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
-                req->rq_status = -ENOMEM;
-                RETURN(0);
+                RETURN(-ENOMEM);
         }
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
+        lock = ldlm_handle2object(&dlm_req->lock_handle1);
+        res = ldlm_local_lock_cancel(lock);
+        req->rq_status = 0;
+        if (ptlrpc_reply(svc, req) != 0)
+                LBUG();
+
+        if (res != NULL)
+                ldlm_reprocess_all(res);
+
         RETURN(0);
 }
 
-static int _ldlm_callback(struct ptlrpc_request *req)
+static int _ldlm_callback(struct ptlrpc_service *svc,
+                          struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
-        struct ldlm_lock *lock;
+        struct ldlm_lock *lock1, *lock2;
         int rc;
         ENTRY;
 
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
-                req->rq_status = -ENOMEM;
-                RETURN(0);
+                RETURN(-ENOMEM);
         }
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        lock = ldlm_handle2object(&dlm_req->lock_handle1);
-        ldlm_lock_dump(lock);
-        if (dlm_req->lock_handle2.addr) {
-                CERROR("Got blocked callback for lock %p.\n", lock);
-                /* FIXME: do something impressive. */
-        } else {
-                CERROR("Got granted callback for lock %p.\n", lock);
-                lock->l_granted_mode = lock->l_req_mode;
-                wake_up(&lock->l_waitq);
-        }
+        /* We must send the reply first, so that the thread is free to handle
+         * any requests made in common_callback() */
+        rc = ptlrpc_reply(svc, req);
+        if (rc != 0)
+                RETURN(rc);
 
-        req->rq_status = 0;
+        lock1 = ldlm_handle2object(&dlm_req->lock_handle1);
+        lock2 = ldlm_handle2object(&dlm_req->lock_handle2);
 
+        common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
+                        0);
         RETURN(0);
 }
 
@@ -191,34 +249,28 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
         }
 
         switch (req->rq_reqmsg->opc) {
-        case LDLM_NAMESPACE_NEW:
-                CDEBUG(D_INODE, "namespace_new\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
-                rc = _ldlm_namespace_new(dev, req);
-                break;
-
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
-                rc = _ldlm_enqueue(req);
+                rc = _ldlm_enqueue(dev, svc, req);
                 break;
 
         case LDLM_CONVERT:
                 CDEBUG(D_INODE, "convert\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
-                rc = _ldlm_convert(req);
+                rc = _ldlm_convert(svc, req);
                 break;
 
         case LDLM_CANCEL:
                 CDEBUG(D_INODE, "cancel\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
-                rc = _ldlm_cancel(req);
+                rc = _ldlm_cancel(svc, req);
                 break;
 
         case LDLM_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
-                rc = _ldlm_callback(req);
+                rc = _ldlm_callback(svc, req);
                 break;
 
         default:
@@ -226,11 +278,11 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                 RETURN(rc);
         }
 
+        EXIT;
 out:
         if (rc)
                 RETURN(ptlrpc_error(svc, req));
-        else
-                RETURN(ptlrpc_reply(svc, req));
+        return 0;
 }
 
 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
@@ -271,13 +323,15 @@ static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
         return err;
 }
 
-static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
+static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
         int err;
         ENTRY;
 
-        ldlm_spinlock = SPIN_LOCK_UNLOCKED;
+        obddev->obd_namespace = ldlm_namespace_new(obddev, 0);
+        if (obddev->obd_namespace == NULL)
+                LBUG();
 
         ldlm->ldlm_service =
                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
@@ -290,6 +344,11 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
                 CERROR("cannot start thread\n");
                 LBUG();
         }
+        err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
+        if (err) {
+                CERROR("cannot start thread\n");
+                LBUG();
+        }
 
         OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
         if (ldlm->ldlm_client == NULL)
@@ -302,78 +361,13 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
         RETURN(0);
 }
 
-static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
-{
-        struct list_head *tmp, *pos;
-        int rc = 0;
-
-        list_for_each_safe(tmp, pos, q) {
-                struct ldlm_lock *lock;
-
-                if (rc) {
-                        /* Res was already cleaned up. */
-                        LBUG();
-                }
-
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                ldlm_resource_del_lock(lock);
-                ldlm_lock_free(lock);
-                rc = ldlm_resource_put(res);
-        }
-
-        return rc;
-}
-
-static int do_free_namespace(struct ldlm_namespace *ns)
-{
-        struct list_head *tmp, *pos;
-        int i, rc;
-
-        for (i = 0; i < RES_HASH_SIZE; i++) {
-                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
-                        struct ldlm_resource *res;
-                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
-                        list_del_init(&res->lr_hash);
-
-                        rc = cleanup_resource(res, &res->lr_granted);
-                        if (!rc)
-                                rc = cleanup_resource(res, &res->lr_converting);
-                        if (!rc)
-                                rc = cleanup_resource(res, &res->lr_waiting);
-
-                        while (rc == 0)
-                                rc = ldlm_resource_put(res);
-                }
-        }
-
-        return ldlm_namespace_free(ns);
-}
-
-static int ldlm_free_all(struct obd_device *obddev)
-{
-        struct list_head *tmp, *pos;
-        int rc = 0;
-
-        ldlm_lock();
-
-        list_for_each_safe(tmp, pos, &ldlm_namespaces) {
-                struct ldlm_namespace *ns;
-                ns = list_entry(tmp, struct ldlm_namespace, ns_link);
-
-                rc |= do_free_namespace(ns);
-        }
-
-        ldlm_unlock();
-
-        return rc;
-}
-
 static int ldlm_cleanup(struct obd_device *obddev)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
         ENTRY;
 
+        ldlm_namespace_free(obddev->obd_namespace);
+
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         rpc_unregister_service(ldlm->ldlm_service);
 
@@ -385,11 +379,6 @@ static int ldlm_cleanup(struct obd_device *obddev)
         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
 
-        if (ldlm_free_all(obddev)) {
-                CERROR("ldlm_free_all could not complete.\n");
-                RETURN(-1);
-        }
-
         MOD_DEC_USE_COUNT;
         RETURN(0);
 }
index 7c5aa44..eaee614 100644 (file)
 
 #include <linux/lustre_dlm.h>
 
-#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
-
-static int is_local_conn(struct ptlrpc_connection *conn)
-{
-        ENTRY;
-        if (conn == NULL)
-                RETURN(1);
-
-        RETURN(LOOPBACK(conn->c_peer.peer_nid));
-}
-
 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
-                     __u32 ns_id,
+                     struct ldlm_namespace *ns,
                      struct ldlm_handle *parent_lock_handle,
                      __u64 *res_id,
                      __u32 type,
                      struct ldlm_extent *req_ex,
                      ldlm_mode_t mode,
                      int *flags,
-                     ldlm_lock_callback completion,
-                     ldlm_lock_callback blocking,
                      void *data,
                      __u32 data_len,
-                     struct ldlm_handle *lockh,
-                     struct ptlrpc_request **request)
+                     struct ldlm_handle *lockh)
 {
-        struct ldlm_handle local_lockh;
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
         char *bufs[2] = {NULL, data};
         int rc, size[2] = {sizeof(*body), data_len};
-        ldlm_error_t err;
         ENTRY;
 
-        err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
-                                     type, &local_lockh);
-        if (err != ELDLM_OK)
-                RETURN(err);
+        *flags = 0;
+        rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode,
+                                    NULL, 0, lockh);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
 
-        lock = ldlm_handle2object(&local_lockh);
-        /* Is this lock locally managed? */
-        if (is_local_conn(conn))
-                GOTO(local, 0);
+        lock = ldlm_handle2object(lockh);
 
+        spin_unlock(&lock->l_lock);
         req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
         /* Dump all of this data into the request buffer */
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        body->lock_desc.l_resource.lr_ns_id = ns_id;
         body->lock_desc.l_resource.lr_type = type;
         memcpy(body->lock_desc.l_resource.lr_name, res_id,
                sizeof(body->lock_desc.l_resource.lr_name));
@@ -77,7 +59,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                        sizeof(body->lock_desc.l_extent));
         body->flags = *flags;
 
-        memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
+        memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
 
         if (parent_lock_handle)
                 memcpy(&body->lock_handle2, parent_lock_handle,
@@ -90,73 +72,43 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
         if (rc != ELDLM_OK) {
+                spin_lock(&lock->l_resource->lr_lock);
                 ldlm_resource_put(lock->l_resource);
+                spin_unlock(&lock->l_resource->lr_lock);
                 ldlm_lock_free(lock);
                 GOTO(out, rc);
         }
 
         lock->l_connection = conn;
+        lock->l_client = cl;
         reply = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
+        memcpy(req_ex, &reply->lock_extent, sizeof(*req_ex));
         *flags = reply->flags;
 
-        CERROR("remote handle: %p, flags: %d\n",
+        CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
                (void *)(unsigned long)reply->lock_handle.addr, *flags);
-        CERROR("extent: %Lu -> %Lu\n",
+        CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
                (unsigned long long)reply->lock_extent.start,
                (unsigned long long)reply->lock_extent.end);
 
-        EXIT;
- local:
-        rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
-                                     completion, blocking, data, data_len);
+        ptlrpc_free_req(req);
+
+        rc = ldlm_local_lock_enqueue(lockh, req_ex, flags, NULL, NULL);
+
         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                     LDLM_FL_BLOCK_CONV)) {
+                      LDLM_FL_BLOCK_CONV)) {
                 /* Go to sleep until the lock is granted. */
                 /* FIXME: or cancelled. */
+                CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
+                       "going to sleep.\n", lock);
+                ldlm_lock_dump(lock);
                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
                                          lock->l_granted_mode);
+                CDEBUG(D_NET, "waking up, the lock must be granted.\n");
         }
- out:
-        *request = req;
-        return rc;
-}
-
-int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
-                           struct ptlrpc_connection *conn, __u32 ns_id)
-{
-        struct ldlm_namespace *ns;
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        int rc, size = sizeof(*body);
-        ENTRY;
-
-        if (is_local_conn(conn))
-                GOTO(local, 0);
-
-        req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        body->lock_desc.l_resource.lr_ns_id = ns_id;
-
-        req->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
-        if (rc)
-                GOTO(out, rc);
-
         EXIT;
- local:
-        rc = ldlm_namespace_new(obddev, ns_id, &ns);
-        if (rc != ELDLM_OK) {
-                /* XXX: It succeeded remotely but failed locally. What to do? */
-                CERROR("Local ldlm_namespace_new failed.\n");
-        }
  out:
         return rc;
 }
@@ -181,7 +133,11 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
 
-        if (new != NULL) {
+        if (new == NULL) {
+                CDEBUG(D_NET, "Sending granted AST\n");
+                ldlm_lock2desc(lock, &body->lock_desc);
+        } else {
+                CDEBUG(D_NET, "Sending blocked AST\n");
                 ldlm_lock2desc(new, &body->lock_desc);
                 ldlm_object2handle(new, &body->lock_handle2);
         }
@@ -198,20 +154,18 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
 }
 
 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
-                     int new_mode, int *flags, struct ptlrpc_request **request)
+                     int new_mode, int *flags)
 {
         struct ldlm_request *body;
-        struct ldlm_reply *reply;
         struct ldlm_lock *lock;
-        struct ptlrpc_request *req = NULL;
+        struct ldlm_resource *res;
+        struct ptlrpc_request *req;
         int rc, size[2] = {sizeof(*body), 0};
         char *bufs[2] = {NULL, NULL};
         ENTRY;
 
         lock = ldlm_handle2object(lockh);
-
-        if (is_local_conn(lock->l_connection))
-                GOTO(local, 0);
+        *flags = 0;
 
         size[1] = lock->l_data_len;
         bufs[1] = lock->l_data;
@@ -234,31 +188,43 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        reply = lustre_msg_buf(req->rq_repmsg, 0);
-        *flags = reply->flags;
-
+        body = lustre_msg_buf(req->rq_repmsg, 0);
+        res = ldlm_local_lock_convert(lockh, new_mode, &body->flags);
+        if (res != NULL)
+                ldlm_reprocess_all(res);
+        if (lock->l_req_mode != lock->l_granted_mode) {
+                /* Go to sleep until the lock is granted. */
+                /* FIXME: or cancelled. */
+                CDEBUG(D_NET, "convert returned a blocked lock, "
+                       "going to sleep.\n");
+                wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
+                                         lock->l_granted_mode);
+                CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+        }
         EXIT;
- local:
-        rc = ldlm_local_lock_convert(lockh, new_mode, flags);
  out:
-        *request = req;
+        ptlrpc_free_req(req);
         return rc;
 }
 
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
-                    struct ptlrpc_request **request)
+int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
 {
         struct ldlm_request *body;
-        struct ldlm_lock *lock;
-        struct ptlrpc_request *req = NULL;
+        struct ptlrpc_request *req;
+        struct ldlm_resource *res;
         int rc, size[2] = {sizeof(*body), 0};
         char *bufs[2] = {NULL, NULL};
         ENTRY;
 
-        lock = ldlm_handle2object(lockh);
-
-        if (is_local_conn(lock->l_connection))
-                GOTO(local, 0);
+        if (lock->l_data_len == sizeof(struct inode)) {
+                /* FIXME: do something better than throwing away everything */
+                struct inode *inode = lock->l_data;
+                if (inode == NULL)
+                        LBUG();
+                down(&inode->i_sem);
+                invalidate_inode_pages(inode);
+                up(&inode->i_sem);
+        }
 
         size[1] = lock->l_data_len;
         bufs[1] = lock->l_data;
@@ -275,13 +241,14 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
 
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
+        res = ldlm_local_lock_cancel(lock);
+        if (res != NULL)
+                ldlm_reprocess_all(res);
         EXIT;
- local:
-        rc = ldlm_local_lock_cancel(lockh);
  out:
-        *request = req;
         return rc;
 }
index 5e765ee..35f046a 100644 (file)
 #define EXPORT_SYMTAB
 #define DEBUG_SUBSYSTEM S_LDLM
 
-#include <linux/slab.h>
 #include <linux/lustre_dlm.h>
 
 kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
 
-LIST_HEAD(ldlm_namespaces);
-spinlock_t ldlm_spinlock;
-
-struct ldlm_namespace *ldlm_namespace_find(__u32 id)
-{
-        struct list_head *tmp;
-        struct ldlm_namespace *res;
-
-        res = NULL;
-        list_for_each(tmp, &ldlm_namespaces) { 
-                struct ldlm_namespace *chk;
-                chk = list_entry(tmp, struct ldlm_namespace, ns_link);
-                
-                if ( chk->ns_id == id ) {
-                        res = chk;
-                        break;
-                }
-        }
-
-        return res;
-}
-
-/* this must be called with ldlm_lock() held */
-static int res_hash_init(struct ldlm_namespace *ns)
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev,
+                                          __u32 local)
 {
-        struct list_head *res_hash;
+        struct ldlm_namespace *ns;
         struct list_head *bucket;
 
-        if (ns->ns_hash != NULL)
-                RETURN(0);
-
-        /* FIXME: this memory appears to be leaked */
-        OBD_ALLOC(res_hash, sizeof(*res_hash) * RES_HASH_SIZE);
-        if (!res_hash) {
+        OBD_ALLOC(ns, sizeof(*ns));
+        if (!ns) {
                 LBUG();
-                RETURN(-ENOMEM);
+                RETURN(NULL);
         }
+        OBD_ALLOC(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
+        if (!ns->ns_hash) {
+                OBD_FREE(ns, sizeof(*ns));
+                LBUG();
+                RETURN(NULL);
+        }
+
+        ns->ns_obddev = obddev;
+        INIT_LIST_HEAD(&ns->ns_root_list);
+        ns->ns_lock = SPIN_LOCK_UNLOCKED;
+        ns->ns_refcount = 0;
+        ns->ns_local = local;
 
-        for (bucket = res_hash + RES_HASH_SIZE - 1; bucket >= res_hash;
+        for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
              bucket--)
                 INIT_LIST_HEAD(bucket);
 
-        ns->ns_hash = res_hash;
-
-        return 0;
+        return ns;
 }
 
-ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id,
-                                struct ldlm_namespace **ns_out)
+static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
 {
-        struct ldlm_namespace *ns;
-        int rc;
+        struct list_head *tmp, *pos;
+        int rc = 0;
 
-        if (ldlm_namespace_find(id))
-                RETURN(-ELDLM_NAMESPACE_EXISTS);
+        list_for_each_safe(tmp, pos, q) {
+                struct ldlm_lock *lock;
 
-        OBD_ALLOC(ns, sizeof(*ns));
-        if (!ns) {
-                LBUG();
-                RETURN(-ENOMEM);
-        }
+                if (rc) {
+                        /* Res was already cleaned up. */
+                        LBUG();
+                }
 
-        ns->ns_id = id;
-        ns->ns_obddev = obddev;
-        INIT_LIST_HEAD(&ns->ns_root_list);
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                spin_lock(&lock->l_lock);
+                ldlm_resource_del_lock(lock);
+                ldlm_lock_free(lock);
 
-        rc = res_hash_init(ns);
-        if (rc) {
-                OBD_FREE(ns, sizeof(*ns));
-                RETURN(rc);
+                rc = ldlm_resource_put(res);
         }
-        list_add(&ns->ns_link, &ldlm_namespaces);
-        atomic_set(&ns->ns_refcount, 0);
 
-        *ns_out = ns;
-        return ELDLM_OK;
+        return rc;
 }
 
 int ldlm_namespace_free(struct ldlm_namespace *ns)
 {
-        if (atomic_read(&ns->ns_refcount))
-                RETURN(-EBUSY);
+        struct list_head *tmp, *pos;
+        int i, rc;
+
+        /* We should probably take the ns_lock, but then ldlm_resource_put
+         * couldn't take it.  Hmm. */
+        for (i = 0; i < RES_HASH_SIZE; i++) {
+                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+                        struct ldlm_resource *res;
+                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
+
+                        spin_lock(&res->lr_lock);
+                        rc = cleanup_resource(res, &res->lr_granted);
+                        if (!rc)
+                                rc = cleanup_resource(res, &res->lr_converting);
+                        if (!rc)
+                                rc = cleanup_resource(res, &res->lr_waiting);
+
+                        if (rc == 0) {
+                                CERROR("Resource refcount nonzero after lock "
+                                       "cleanup; forcing cleanup.\n");
+                                res->lr_refcount = 1;
+                                rc = ldlm_resource_put(res);
+                        }
+                }
+        }
 
-        list_del(&ns->ns_link);
         OBD_FREE(ns->ns_hash, sizeof(struct list_head) * RES_HASH_SIZE);
         OBD_FREE(ns, sizeof(*ns));
 
-        return 0;
+        return ELDLM_OK;
 }
 
 static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
@@ -125,8 +123,10 @@ static struct ldlm_resource *ldlm_resource_new(void)
         struct ldlm_resource *res;
 
         res = kmem_cache_alloc(ldlm_resource_slab, SLAB_KERNEL);
-        if (res == NULL)
+        if (res == NULL) {
                 LBUG();
+                return NULL;
+        }
         memset(res, 0, sizeof(*res));
 
         INIT_LIST_HEAD(&res->lr_children);
@@ -136,35 +136,42 @@ static struct ldlm_resource *ldlm_resource_new(void)
         INIT_LIST_HEAD(&res->lr_waiting);
 
         res->lr_lock = SPIN_LOCK_UNLOCKED;
-
-        atomic_set(&res->lr_refcount, 1);
+        res->lr_refcount = 1;
 
         return res;
 }
 
-/* ldlm_lock() must be taken before calling resource_add */
+/* Args: locked namespace
+ * Returns: newly-allocated, referenced, unlocked resource */
 static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
                                                struct ldlm_resource *parent,
                                                __u64 *name, __u32 type)
 {
         struct list_head *bucket;
         struct ldlm_resource *res;
+        ENTRY;
 
-        bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
+        if (type < 0 || type > LDLM_MAX_TYPE) {
+                LBUG();
+                RETURN(NULL);
+        }
 
         res = ldlm_resource_new();
-        if (!res)
+        if (!res) {
                 LBUG();
+                RETURN(NULL);
+        }
 
         memcpy(res->lr_name, name, sizeof(res->lr_name));
         res->lr_namespace = ns;
-        if (type < 0 || type > LDLM_MAX_TYPE) 
-                LBUG();
+        ns->ns_refcount++;
 
         res->lr_type = type; 
         res->lr_most_restr = LCK_NL;
+
+        bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
         list_add(&res->lr_hash, bucket);
-        atomic_inc(&ns->ns_refcount);
+
         if (parent == NULL) {
                 res->lr_parent = res;
                 list_add(&res->lr_rootlink, &ns->ns_root_list);
@@ -173,49 +180,70 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
                 list_add(&res->lr_childof, &parent->lr_children);
         }
 
-        return res;
+        RETURN(res);
 }
 
+/* Args: unlocked namespace
+ * Locks: takes and releases ns->ns_lock and res->lr_lock
+ * Returns: referenced, unlocked ldlm_resource or NULL */
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
                                         __u64 *name, __u32 type, int create)
 {
         struct list_head *bucket;
         struct list_head *tmp = bucket;
-        struct ldlm_resource *res;
+        struct ldlm_resource *res = NULL;
 
         ENTRY;
 
         if (ns->ns_hash == NULL)
                 RETURN(NULL);
+
+        spin_lock(&ns->ns_lock);
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
 
-        res = NULL;
         list_for_each(tmp, bucket) {
                 struct ldlm_resource *chk;
                 chk = list_entry(tmp, struct ldlm_resource, lr_hash);
 
                 if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
                         res = chk;
-                        atomic_inc(&res->lr_refcount);
+                        spin_lock(&res->lr_lock);
+                        res->lr_refcount++;
+                        spin_unlock(&res->lr_lock);
+                        EXIT;
                         break;
                 }
         }
 
         if (res == NULL && create)
                 res = ldlm_resource_add(ns, parent, name, type);
+        spin_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
 
+/* Args: locked resource
+ * Locks: takes and releases res->lr_lock
+ *        takes and releases ns->ns_lock iff res->lr_refcount falls to 0
+ */
 int ldlm_resource_put(struct ldlm_resource *res)
 {
-        int rc = 0; 
+        int rc = 0;
 
-        if (atomic_read(&res->lr_refcount) <= 0)
-                LBUG();
+        if (res->lr_refcount == 1) {
+                struct ldlm_namespace *ns = res->lr_namespace;
+                ENTRY;
+
+                spin_unlock(&res->lr_lock);
+                spin_lock(&ns->ns_lock);
+                spin_lock(&res->lr_lock);
+
+                if (res->lr_refcount != 1) {
+                        spin_unlock(&ns->ns_lock);
+                        goto out;
+                }
 
-        if (atomic_dec_and_test(&res->lr_refcount)) {
                 if (!list_empty(&res->lr_granted))
                         LBUG();
 
@@ -225,29 +253,41 @@ int ldlm_resource_put(struct ldlm_resource *res)
                 if (!list_empty(&res->lr_waiting))
                         LBUG();
 
-                atomic_dec(&res->lr_namespace->ns_refcount);
+                if (!list_empty(&res->lr_children))
+                        LBUG();
+
+                ns->ns_refcount--;
                 list_del(&res->lr_hash);
                 list_del(&res->lr_rootlink);
                 list_del(&res->lr_childof);
 
                 kmem_cache_free(ldlm_resource_slab, res);
+                spin_unlock(&ns->ns_lock);
                 rc = 1;
+        } else {
+                ENTRY;
+        out:
+                res->lr_refcount--;
+                if (res->lr_refcount < 0)
+                        LBUG();
         }
 
-        return rc
+        RETURN(rc)
 }
 
+/* Must be called with resource->lr_lock taken */
 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock)
 {
         list_add(&lock->l_res_link, head);
-        atomic_inc(&res->lr_refcount);
+        res->lr_refcount++;
 }
 
+/* Must be called with resource->lr_lock taken */
 void ldlm_resource_del_lock(struct ldlm_lock *lock)
 {
-        list_del(&lock->l_res_link);
-        atomic_dec(&lock->l_resource->lr_refcount);
+        list_del_init(&lock->l_res_link);
+        lock->l_resource->lr_refcount--;
 }
 
 int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
@@ -258,7 +298,6 @@ int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
 
 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
 {
-        desc->lr_ns_id = res->lr_namespace->ns_id;
         desc->lr_type = res->lr_type;
         memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name));
         memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
@@ -278,8 +317,7 @@ void ldlm_resource_dump(struct ldlm_resource *res)
                  (unsigned long long)res->lr_name[2]);
 
         CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name);
-        CDEBUG(D_OTHER, "Namespace: %p (%u)\n", res->lr_namespace,
-               res->lr_namespace->ns_id);
+        CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace);
         CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);
 
         CDEBUG(D_OTHER, "Granted locks:\n");
@@ -303,4 +341,3 @@ void ldlm_resource_dump(struct ldlm_resource *res)
                 ldlm_lock_dump(lock);
         }
 }
-
index 20b2301..e6efaf2 100644 (file)
@@ -30,21 +30,21 @@ int ldlm_test_basics(struct obd_device *obddev)
         struct ldlm_handle lockh_1, lockh_2;
         int flags;
 
-        ldlm_lock();
-
-        err = ldlm_namespace_new(obddev, 1, &ns);
-        if (err != ELDLM_OK)
+        ns = ldlm_namespace_new(obddev, 0);
+        if (ns == NULL)
                 LBUG();
 
-        err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_1);
-        err = ldlm_local_lock_enqueue(&lockh_1, LCK_CR, NULL, &flags, NULL,
-                                      ldlm_test_callback, NULL, 0);
+        err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_CR,
+                                     NULL, 0, &lockh_1);
+        err = ldlm_local_lock_enqueue(&lockh_1, NULL, &flags,
+                                      ldlm_test_callback, ldlm_test_callback);
         if (err != ELDLM_OK)
                 LBUG();
 
-        err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_2);
-        err = ldlm_local_lock_enqueue(&lockh_2, LCK_EX, NULL, &flags, NULL,
-                                      ldlm_test_callback, NULL, 0);
+        err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_EX,
+                                     NULL, 0, &lockh_2);
+        err = ldlm_local_lock_enqueue(&lockh_2, NULL, &flags,
+                                      ldlm_test_callback, ldlm_test_callback);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_BLOCK_GRANTED))
@@ -55,12 +55,12 @@ int ldlm_test_basics(struct obd_device *obddev)
                 LBUG();
         ldlm_resource_dump(res);
 
-        err = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags);
-        if (err != ELDLM_OK)
-                LBUG();
+        res = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags);
+        if (res != NULL)
+                ldlm_reprocess_all(res);
 
         ldlm_resource_dump(res);
-        ldlm_unlock();
+        ldlm_namespace_free(ns);
 
         return 0;
 }
@@ -69,40 +69,39 @@ int ldlm_test_extents(struct obd_device *obddev)
 {
         struct ldlm_namespace *ns;
         struct ldlm_resource *res;
+        struct ldlm_lock *lock;
         __u64 res_id[RES_NAME_SIZE] = {0, 0, 0};
         struct ldlm_extent ext1 = {4, 6}, ext2 = {6, 9}, ext3 = {10, 11};
         struct ldlm_handle ext1_h, ext2_h, ext3_h;
         ldlm_error_t err;
         int flags;
 
-        ldlm_lock();
-
-        err = ldlm_namespace_new(obddev, 2, &ns);
-        if (err != ELDLM_OK)
+        ns = ldlm_namespace_new(obddev, 0);
+        if (ns == NULL)
                 LBUG();
 
         flags = 0;
-        err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext1_h);
-        err = ldlm_local_lock_enqueue(&ext1_h, LCK_PR, &ext1, &flags, NULL,
-                                      NULL, NULL, 0);
+        err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR,
+                                     NULL, 0, &ext1_h);
+        err = ldlm_local_lock_enqueue(&ext1_h, &ext1, &flags, NULL, NULL);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_LOCK_CHANGED))
                 LBUG();
 
         flags = 0;
-        err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext2_h);
-        err = ldlm_local_lock_enqueue(&ext2_h, LCK_PR, &ext2, &flags, NULL,
-                                      NULL, NULL, 0);
+        err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR,
+                                     NULL, 0, &ext2_h);
+        err = ldlm_local_lock_enqueue(&ext2_h, &ext2, &flags, NULL, NULL);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_LOCK_CHANGED))
                 LBUG();
 
         flags = 0;
-        err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext3_h);
-        err = ldlm_local_lock_enqueue(&ext3_h, LCK_EX, &ext3, &flags, NULL,
-                                      NULL, NULL, 0);
+        err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_EX,
+                                     NULL, 0, &ext3_h);
+        err = ldlm_local_lock_enqueue(&ext3_h, &ext3, &flags, NULL, NULL);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_BLOCK_GRANTED))
@@ -112,22 +111,22 @@ int ldlm_test_extents(struct obd_device *obddev)
 
         /* Convert/cancel blocking locks */
         flags = 0;
-        err = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags);
-        if (err != ELDLM_OK)
-                LBUG();
+        res = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags);
+        if (res != NULL)
+                ldlm_reprocess_all(res);
 
         flags = 0;
-        err = ldlm_local_lock_cancel(&ext2_h);
-        if (err != ELDLM_OK)
-                LBUG();
+        lock = ldlm_handle2object(&ext2_h);
+        res = ldlm_local_lock_cancel(lock);
+        if (res != NULL)
+                ldlm_reprocess_all(res);
 
         /* Dump the results */
         res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
         if (res == NULL)
                 LBUG();
         ldlm_resource_dump(res);
-
-        ldlm_unlock();
+        ldlm_namespace_free(ns);
 
         return 0;
 }
@@ -136,36 +135,19 @@ static int ldlm_test_network(struct obd_device *obddev,
                              struct ptlrpc_connection *conn)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
-        struct ptlrpc_request *request;
 
         __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
         struct ldlm_extent ext = {4, 6};
-        struct ldlm_handle lockh1, lockh2;
+        struct ldlm_handle lockh1;
         int flags = 0;
         ldlm_error_t err;
 
-        err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client, conn, 3);
-        ptlrpc_free_req(request);
-        CERROR("ldlm_cli_namespace_new: %d\n", err);
-        if (err != ELDLM_OK)
-                GOTO(out, err);
-
-        err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3,
+        err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, obddev->obd_namespace,
                                NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags,
-                               NULL, NULL, NULL, 0, &lockh1, &request);
-        ptlrpc_free_req(request);
-        CERROR("ldlm_cli_enqueue: %d\n", err);
-
-        flags = 0;
-        err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3,
-                               NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags,
-                               NULL, NULL, NULL, 0, &lockh2, &request);
-        ptlrpc_free_req(request);
+                               NULL, 0, &lockh1);
         CERROR("ldlm_cli_enqueue: %d\n", err);
 
-        EXIT;
- out:
-        return err;
+        RETURN(err);
 }
 
 int ldlm_test(struct obd_device *obddev, struct ptlrpc_connection *conn)
index 672b38c..4be688a 100644 (file)
  *      (jj@sunsite.ms.mff.cuni.cz)
  */
 
-#include <asm/uaccess.h>
-#include <asm/system.h>
-
-#include <linux/errno.h>
-#include <linux/fs.h>
-#include <linux/fcntl.h>
-#include <linux/sched.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/mm.h>
-#include <linux/pagemap.h>
-#include <linux/smp_lock.h>
-
 #define DEBUG_SUBSYSTEM S_LLITE
 
-#include <linux/obd_support.h>
+#include <linux/lustre_dlm.h>
 #include <linux/lustre_lite.h>
 
 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
@@ -48,23 +35,23 @@ extern inline struct obdo * ll_oa_from_inode(struct inode *inode,
 
 static int ll_file_open(struct inode *inode, struct file *file)
 {
-        int rc; 
+        int rc;
         struct ptlrpc_request *req = NULL;
         struct ll_file_data *fd;
         struct obdo *oa;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         ENTRY;
 
-        if (file->private_data) 
+        if (file->private_data)
                 LBUG();
 
-        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL); 
+        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
         if (!fd)
                 GOTO(out, rc = -ENOMEM);
         memset(fd, 0, sizeof(*fd));
 
         rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
-                      S_IFREG, file->f_flags, &fd->fd_mdshandle, &req); 
+                      S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
         fd->fd_req = req;
         ptlrpc_req_finished(req);
         if (rc) {
@@ -79,21 +66,19 @@ static int ll_file_open(struct inode *inode, struct file *file)
         oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
         if (oa == NULL)
                 LBUG();
-        rc = obd_open(ll_i2obdconn(inode), oa); 
+        rc = obd_open(ll_i2obdconn(inode), oa);
         obdo_free(oa);
         if (rc) {
                 /* XXX: Need to do mdc_close here! */
-                if (rc > 0) 
-                        rc = -rc;
-                GOTO(out, rc);
+                GOTO(out, rc = abs(rc));
         }
 
         file->private_data = fd;
 
-        EXIT; 
+        EXIT;
  out:
         if (rc && fd) {
-                kmem_cache_free(ll_file_data_slab, fd); 
+                kmem_cache_free(ll_file_data_slab, fd);
                 file->private_data = NULL;
         }
 
@@ -112,7 +97,7 @@ static int ll_file_release(struct inode *inode, struct file *file)
         ENTRY;
 
         fd = (struct ll_file_data *)file->private_data;
-        if (!fd || !fd->fd_mdshandle) { 
+        if (!fd || !fd->fd_mdshandle) {
                 LBUG();
                 GOTO(out, rc = -EINVAL);
         }
@@ -120,20 +105,17 @@ static int ll_file_release(struct inode *inode, struct file *file)
         oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
         if (oa == NULL)
                 LBUG();
-        rc = obd_close(ll_i2obdconn(inode), oa); 
+        rc = obd_close(ll_i2obdconn(inode), oa);
         obdo_free(oa);
-        if (rc) { 
-                if (rc > 0) 
-                        rc = -rc;
-                GOTO(out, rc);
-        }
+        if (rc)
+                GOTO(out, abs(rc));
 
         iattr.ia_valid = ATTR_SIZE;
         iattr.ia_size = inode->i_size;
         rc = ll_inode_setattr(inode, &iattr, 0);
         if (rc) {
                 CERROR("failed - %d.\n", rc);
-                rc = -EIO;
+                rc = -EIO; /* XXX - GOTO(out)? -phil */
         }
 
         rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
@@ -149,8 +131,8 @@ static int ll_file_release(struct inode *inode, struct file *file)
         EXIT; 
 
  out:
-        if (!rc && fd) { 
-                kmem_cache_free(ll_file_data_slab, fd); 
+        if (!rc && fd) {
+                kmem_cache_free(ll_file_data_slab, fd);
                 file->private_data = NULL;
         }
         return rc;
@@ -172,6 +154,53 @@ static inline void ll_remove_suid(struct inode *inode)
         }
 }
 
+static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
+                            loff_t *ppos)
+{
+        struct inode *inode = filp->f_dentry->d_inode;
+#if 0
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ldlm_extent extent;
+        struct ldlm_handle lockh;
+        __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+        int flags = 0;
+        ldlm_error_t err;
+#endif
+        ssize_t retval;
+        ENTRY;
+
+#if 0
+        extent.start = *ppos;
+        extent.end = *ppos + count;
+        CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
+               inode->i_ino, extent.start, extent.end);
+
+        err = obd_enqueue(&sbi->ll_conn, sbi->ll_namespace, NULL, res_id,
+                          LDLM_EXTENT, &extent, LCK_PR, &flags, inode,
+                          sizeof(*inode), &lockh);
+        if (err != ELDLM_OK)
+                CERROR("lock enqueue: err: %d\n", err);
+        ldlm_lock_dump((void *)(unsigned long)lockh.addr);
+#endif
+
+        CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n",
+               inode->i_ino, count, *ppos);
+        retval = generic_file_read(filp, buf, count, ppos);
+        if (retval > 0) {
+                struct iattr attr;
+                attr.ia_valid = ATTR_ATIME;
+                attr.ia_atime = CURRENT_TIME;
+                ll_setattr(filp->f_dentry, &attr);
+        }
+
+#if 0
+        err = obd_cancel(&sbi->ll_conn, LCK_PR, &lockh);
+        if (err != ELDLM_OK)
+                CERROR("lock cancel: err: %d\n", err);
+#endif
+
+        RETURN(retval);
+}
 
 /*
  * Write to a file (through the page cache).
@@ -179,12 +208,36 @@ static inline void ll_remove_suid(struct inode *inode)
 static ssize_t
 ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
 {
+        struct inode *inode = file->f_dentry->d_inode;
+#if 0
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ldlm_extent extent;
+        struct ldlm_handle lockh;
+        __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+        int flags = 0;
+        ldlm_error_t err;
+#endif
         ssize_t retval;
+        ENTRY;
+
+#if 0
+        extent.start = *ppos;
+        extent.end = *ppos + count;
+        CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
+               inode->i_ino, extent.start, extent.end);
+
+        err = obd_enqueue(&sbi->ll_conn, sbi->ll_namespace, NULL, res_id,
+                          LDLM_EXTENT, &extent, LCK_PW, &flags, inode,
+                          sizeof(*inode), &lockh);
+        if (err != ELDLM_OK)
+                CERROR("lock enqueue: err: %d\n", err);
+        ldlm_lock_dump((void *)(unsigned long)lockh.addr);
+#endif
+
         CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n",
-               file->f_dentry->d_inode->i_ino, (long)count, *ppos);
+               inode->i_ino, (long)count, *ppos);
 
         retval = generic_file_write(file, buf, count, ppos);
-        CDEBUG(D_INFO, "Wrote %ld\n", (long)retval);
 
         /* update mtime/ctime/atime here, NOT size */
         if (retval > 0) {
@@ -194,20 +247,25 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                         CURRENT_TIME;
                 ll_setattr(file->f_dentry, &attr);
         }
-        EXIT;
-        return retval;
-}
 
+#if 0
+        err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh);
+        if (err != ELDLM_OK)
+                CERROR("lock cancel: err: %d\n", err);
+#endif
+
+        RETURN(retval);
+}
 
 /* XXX this does not need to do anything for data, it _does_ need to
-   call setattr */ 
+   call setattr */
 int ll_fsync(struct file *file, struct dentry *dentry, int data)
 {
         return 0;
 }
 
 struct file_operations ll_file_operations = {
-        read: generic_file_read,
+        read: ll_file_read,
         write: ll_file_write,
         open: ll_file_open,
         release: ll_file_release,
@@ -215,9 +273,7 @@ struct file_operations ll_file_operations = {
         fsync: NULL
 };
 
-
 struct inode_operations ll_file_inode_operations = {
         truncate: ll_truncate,
         setattr: ll_setattr
 };
-
index 21b2556..49b19ad 100644 (file)
  *
  */
 
-#include <linux/config.h>
-#include <linux/module.h>
-
 #define DEBUG_SUBSYSTEM S_LLITE
 
+#include <linux/module.h>
 #include <linux/lustre_lite.h>
 #include <linux/lustre_ha.h>
+#include <linux/lustre_dlm.h>
 
 kmem_cache_t *ll_file_data_slab;
 extern struct address_space_operations ll_aops;
@@ -121,6 +120,12 @@ static struct super_block * ll_read_super(struct super_block *sb,
                 GOTO(out_free, sb = NULL);
         }
 
+        sbi->ll_namespace = ldlm_namespace_new(NULL, 1);
+        if (sbi->ll_namespace == NULL) {
+                CERROR("failed to create local lock namespace\n");
+                GOTO(out_free, sb = NULL);
+        }
+
         ptlrpc_init_client(ptlrpc_connmgr, ll_recover,
                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
                            &sbi->ll_mds_client);
@@ -193,6 +198,8 @@ out_disc:
                 obd_disconnect(&sbi->ll_conn);
 out_free:
                 MOD_DEC_USE_COUNT;
+                if (sbi->ll_namespace)
+                        ldlm_namespace_free(sbi->ll_namespace);
                 OBD_FREE(sbi, sizeof(*sbi));
         }
         if (device)
@@ -209,6 +216,7 @@ static void ll_put_super(struct super_block *sb)
         ENTRY;
         ll_commitcbd_cleanup(sbi);
         obd_disconnect(&sbi->ll_conn);
+        ldlm_namespace_free(sbi->ll_namespace);
         ptlrpc_put_connection(sbi->ll_mds_conn);
         ptlrpc_cleanup_client(&sbi->ll_mds_client);
         OBD_FREE(sb->u.generic_sbp, sizeof(*sbi));
index 7b33e6c..8f0b85c 100644 (file)
@@ -31,6 +31,14 @@ static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
         *connection = osc->osc_conn;
 }
 
+static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
+                          struct ptlrpc_connection **connection)
+{
+        struct osc_obd *osc = &conn->oc_dev->u.osc;
+        *cl = osc->osc_ldlm_client;
+        *connection = osc->osc_conn;
+}
+
 static int osc_connect(struct obd_conn *conn)
 {
         struct ptlrpc_request *request;
@@ -345,43 +353,35 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
+        struct ptlrpc_bulk_desc *bulk;
+        int rc;
+        ENTRY;
 
         osc_con2cl(conn, &cl, &connection);
 
-        if (cl->cli_obd) {
-                /* local sendpage */
-                memcpy((char *)(unsigned long)dst->addr,
-                       (char *)(unsigned long)src->addr, src->len);
-        } else {
-                struct ptlrpc_bulk_desc *bulk;
-                int rc;
-
-                bulk = ptlrpc_prep_bulk(connection);
-                if (bulk == NULL)
-                        RETURN(-ENOMEM);
-
-                bulk->b_buf = (void *)(unsigned long)src->addr;
-                bulk->b_buflen = src->len;
-                bulk->b_xid = dst->xid;
-                rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
-                if (rc != 0) {
-                        CERROR("send_bulk failed: %d\n", rc);
-                        ptlrpc_free_bulk(bulk);
-                        LBUG();
-                        RETURN(rc);
-                }
-                wait_event_interruptible(bulk->b_waitq,
-                                         ptlrpc_check_bulk_sent(bulk));
+        bulk = ptlrpc_prep_bulk(connection);
+        if (bulk == NULL)
+                RETURN(-ENOMEM);
 
-                if (bulk->b_flags & PTL_RPC_FL_INTR) {
-                        ptlrpc_free_bulk(bulk);
-                        RETURN(-EINTR);
-                }
+        bulk->b_buf = (void *)(unsigned long)src->addr;
+        bulk->b_buflen = src->len;
+        bulk->b_xid = dst->xid;
+        rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
+        if (rc != 0) {
+                CERROR("send_bulk failed: %d\n", rc);
+                ptlrpc_free_bulk(bulk);
+                LBUG();
+                RETURN(rc);
+        }
+        wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk));
 
+        if (bulk->b_flags & PTL_RPC_FL_INTR) {
                 ptlrpc_free_bulk(bulk);
+                RETURN(-EINTR);
         }
 
-        return 0;
+        ptlrpc_free_bulk(bulk);
+        RETURN(0);
 }
 
 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
@@ -392,18 +392,16 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         struct ptlrpc_connection *connection;
         struct ptlrpc_request *request;
         struct ost_body *body;
-        struct obd_ioobj ioo;
-        struct niobuf src;
         int pages, rc, i, j, size[3] = {sizeof(*body)};
         void *ptr1, *ptr2;
         struct ptlrpc_bulk_desc **bulk;
         ENTRY;
 
-        size[1] = num_oa * sizeof(ioo);
+        size[1] = num_oa * sizeof(struct obd_ioobj);
         pages = 0;
         for (i = 0; i < num_oa; i++)
                 pages += oa_bufs[i];
-        size[2] = pages * sizeof(src);
+        size[2] = pages * sizeof(struct niobuf);
 
         OBD_ALLOC(bulk, pages * sizeof(*bulk));
         if (bulk == NULL)
@@ -475,7 +473,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         struct obd_ioobj ioo;
         struct ost_body *body;
         struct niobuf *src;
-        int pages, rc, i, j, size[3] = {sizeof(*body)};
+        long pages;
+        int rc, i, j, size[3] = {sizeof(*body)};
         void *ptr1, *ptr2;
         ENTRY;
 
@@ -519,7 +518,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                 GOTO(out, rc = -EINVAL);
 
         if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
-                CERROR("buffer length wrong (%d vs. %d)\n",
+                CERROR("buffer length wrong (%d vs. %ld)\n",
                        request->rq_repmsg->buflens[1],
                        pages * sizeof(struct niobuf));
                 GOTO(out, rc = -EINVAL);
@@ -554,6 +553,74 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                                      offset, flags);
 }
 
+int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
+                struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type,
+                struct ldlm_extent *extent, __u32 mode, int *flags, void *data,
+                int datalen, struct ldlm_handle *lockh)
+{
+        struct ptlrpc_connection *conn;
+        struct ptlrpc_client *cl;
+        int rc;
+        __u32 mode2;
+
+        /* Filesystem locks are given a bit of special treatment: first we
+         * fixup the lock to start and end on page boundaries. */
+        extent->start &= PAGE_MASK;
+        extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+
+        /* Next, search for already existing extent locks that will cover us */
+        osc_con2dlmcl(oconn, &cl, &conn);
+        rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
+        if (rc == 1) {
+                /* We already have a lock, and it's referenced */
+                return 0;
+        }
+
+        /* Next, search for locks that we can upgrade (if we're trying to write)
+         * or are more than we need (if we're trying to read).  Because the VFS
+         * and page cache already protect us locally, lots of readers/writers
+         * can share a single PW lock. */
+        if (mode == LCK_PW)
+                mode2 = LCK_PR;
+        else
+                mode2 = LCK_PW;
+
+        rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
+        if (rc == 1) {
+                int flags;
+                struct ldlm_lock *lock = ldlm_handle2object(lockh);
+                /* FIXME: This is not incredibly elegant, but it might
+                 * be more elegant than adding another parameter to
+                 * lock_match.  I want a second opinion. */
+                ldlm_lock_addref(lock, mode);
+                ldlm_lock_decref(lock, mode2);
+
+                if (mode == LCK_PR)
+                        return 0;
+
+                rc = ldlm_cli_convert(cl, lockh, type, &flags);
+                if (rc)
+                        LBUG();
+
+                return rc;
+        }
+
+        rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
+                              extent, mode, flags, data, datalen, lockh);
+        return rc;
+}
+
+int osc_cancel(struct obd_conn *oconn, __u32 mode, struct ldlm_handle *lockh)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        lock = ldlm_handle2object(lockh);
+        ldlm_lock_decref(lock, mode);
+
+        RETURN(0);
+}
+
 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct osc_obd *osc = &obddev->u.osc;
@@ -613,7 +680,9 @@ struct obd_ops osc_obd_ops = {
         o_connect: osc_connect,
         o_disconnect: osc_disconnect,
         o_brw: osc_brw,
-        o_punch: osc_punch
+        o_punch: osc_punch,
+        o_enqueue: osc_enqueue,
+        o_cancel: osc_cancel
 };
 
 static int __init osc_init(void)
index e49e989..8d56058 100644 (file)
@@ -595,7 +595,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
-#if 0
+#if 1
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
index e66aa05..9523a21 100644 (file)
@@ -361,7 +361,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         init_waitqueue_head(&req->rq_wait_for_rep);
  resend:
         req->rq_time = CURRENT_TIME;
-        req->rq_timeout = 3;
+        req->rq_timeout = 30;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);