Whamcloud - gitweb
- add more infrastructure to handle extents and debug.
authorbraam <braam>
Fri, 29 Mar 2002 15:19:40 +0000 (15:19 +0000)
committerbraam <braam>
Fri, 29 Mar 2002 15:19:40 +0000 (15:19 +0000)
- some of this is going to change dramatically as we feel we
  are handling too many resources now.

12 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/mds/handler.c
lustre/ptlrpc/service.c
lustre/tests/common.sh
lustre/tests/lldlm.sh
lustre/tests/llmountcleanup.sh

index 96cef52..39c6eb1 100644 (file)
@@ -78,6 +78,7 @@ struct ldlm_namespace {
         struct list_head      ns_link;      /* in the list of ns's */
         __u32                 ns_id;        /* identifier of ns */
         struct list_head     *ns_hash;      /* hash table for ns */
+        atomic_t              ns_refcount;  /* count of resources in the hash */
         struct list_head      ns_root_list; /* all root resources in ns */
         struct obd_device    *ns_obddev;
 };
@@ -95,7 +96,7 @@ struct ldlm_namespace {
 struct ldlm_lock;
 
 typedef int (*ldlm_lock_callback)(struct ldlm_lock *lock, struct ldlm_lock *new,
-                                  void *data); 
+                                  void *data, __u32 data_len);
 
 struct ldlm_lock {
         struct ldlm_resource *l_resource;
@@ -108,6 +109,8 @@ struct ldlm_lock {
         ldlm_lock_callback    l_completion_ast;
         ldlm_lock_callback    l_blocking_ast;
         struct lustre_peer   *l_peer;
+        void                 *l_data;
+        __u32                 l_data_len;
         //void                 *l_event;
         //XXX cluster_host    l_holder;
         __u32                 l_version[RES_VERSION_SIZE];
@@ -122,6 +125,7 @@ typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, __u64 *res_id_in,
 #define LDLM_EXTENT      0x1
 #define LDLM_MDSINTENT   0x2
 
+#define LDLM_MAX_TYPE    0x2
 extern ldlm_res_compat ldlm_res_compat_table []; 
 extern ldlm_res_policy ldlm_res_policy_table []; 
 
@@ -144,8 +148,6 @@ struct ldlm_resource {
         __u32                  lr_version[RES_VERSION_SIZE];
         __u32                  lr_type; /* PLAIN, EXTENT, or MDSINTENT */
         spinlock_t             lr_lock;
-
-        void (*lr_blocking)(struct ldlm_lock *lock, struct ldlm_lock *new);
 };
 
 struct ldlm_extent {
@@ -160,7 +162,9 @@ static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
 
 static inline void *ldlm_handle2object(struct ldlm_handle *handle)
 {
-        return (void *)(unsigned long)(handle->addr);
+        if (handle) 
+                return (void *)(unsigned long)(handle->addr);
+        return NULL; 
 }
 
 static inline void ldlm_object2handle(void *object, struct ldlm_handle *handle)
@@ -188,29 +192,38 @@ int ldlm_extent_policy(struct ldlm_resource *, __u64 *, __u64 *,
 /* ldlm_lock.c */
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                                      __u32 ns_id,
-                                     struct ldlm_handle *parent_res_handle,
                                      struct ldlm_handle *parent_lock_handle,
                                      __u64 *res_id,
+                                     __u32 type,
                                      ldlm_mode_t mode,
                                      int *flags,
                                      ldlm_lock_callback completion,
                                      ldlm_lock_callback blocking,
-                                     __u32 data_len,
                                      void *data,
+                                     __u32 data_len,
                                      struct ldlm_handle *lockh);
+ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
+                                     struct ldlm_handle *lockh,
+                                     int new_mode, int *flags);
+ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
+                                    struct ldlm_handle *lockh);
 void ldlm_lock_dump(struct ldlm_lock *lock);
 
 /* ldlm_test.c */
 int ldlm_test(struct obd_device *device);
 
 /* resource.c */
-struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id);
-struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id);
+struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id);
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 id);
+int ldlm_namespace_free(struct ldlm_namespace *ns);
 void ldlm_resource_dump(struct ldlm_resource *res);
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
-                                        __u64 *name, int create);
+                                        __u64 *name, __u32 type, int create);
 int ldlm_resource_put(struct ldlm_resource *res);
+void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
+                            struct ldlm_lock *lock);
+void ldlm_resource_del_lock(struct ldlm_lock *lock);
 
 #endif /* __KERNEL__ */
 
index d4e5941..d036a5a 100644 (file)
@@ -370,6 +370,7 @@ struct ldlm_handle {
 struct ldlm_request {
         __u32 ns_id;
         __u64 res_id[RES_NAME_SIZE];
+        __u32 res_type;
         __u32 flags;
         struct ldlm_handle parent_res_handle;
         struct ldlm_handle parent_lock_handle;
index 3e1724d..2f92a1e 100644 (file)
@@ -34,12 +34,12 @@ int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new)
 {
         struct ldlm_extent *child_ex, *new_ex;
 
-       child_ex = ldlm_res2extent(child);
-       new_ex = ldlm_res2extent(new);
+        child_ex = ldlm_res2extent(child);
+        new_ex = ldlm_res2extent(new);
 
-       if (MAX(child_ex->start, new_ex->start) >
-           MIN(child_ex->end, new_ex->end))
-               return 0;
+        if (MAX(child_ex->start, new_ex->start) <=
+            MIN(child_ex->end, new_ex->end))
+                return 0;
 
         return 1;
 }
@@ -53,11 +53,12 @@ int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new)
  *
  * To reconstruct our formulas, take a deep breath. */
 int ldlm_extent_policy(struct ldlm_resource *parent,
-                      __u64 *res_id_in, __u64 *res_id_out,
-                      ldlm_mode_t mode, void *data)
+                       __u64 *res_id_in, __u64 *res_id_out,
+                       ldlm_mode_t mode, void *data)
 {
         struct ldlm_extent *new_ex, *req_ex;
         struct list_head *tmp;
+        int rc = 0;
 
         req_ex = (struct ldlm_extent *)res_id_in;
 
@@ -67,27 +68,32 @@ int ldlm_extent_policy(struct ldlm_resource *parent,
 
         list_for_each(tmp, &parent->lr_children) {
                 struct ldlm_resource *res;
-                struct ldlm_extent *res_ex;
+                struct ldlm_extent *exist_ex;
                 res = list_entry(tmp, struct ldlm_resource, lr_childof);
 
-                res_ex = ldlm_res2extent(res);
+                exist_ex = ldlm_res2extent(res);
 
-                if (res_ex->end < req_ex->start)
-                        new_ex->start = MIN(res_ex->end, new_ex->start);
+                if (exist_ex->end < req_ex->start)
+                        new_ex->start = MIN(exist_ex->end, new_ex->start);
                 else {
-                        if (res_ex->start < req_ex->start)
+                        if (exist_ex->start < req_ex->start &&
+                            !lockmode_compat(res->lr_most_restr, mode))
                                 /* Policy: minimize conflict overlap */
                                 new_ex->start = req_ex->start;
                 }
-                if (res_ex->start > req_ex->end)
-                        new_ex->end = MAX(res_ex->start, new_ex->end);
+                if (exist_ex->start > req_ex->end)
+                        new_ex->end = MAX(exist_ex->start, new_ex->end);
                 else {
-                        if (res_ex->end > req_ex->end)
+                        if (exist_ex->end > req_ex->end &&
+                            !lockmode_compat(res->lr_most_restr, mode))
                                 /* Policy: minimize conflict overlap */
                                 new_ex->end = req_ex->end;
                 }
         }
 
-       return 0;
+        if (new_ex->end != req_ex->end || new_ex->start != req_ex->start)
+                rc = ELDLM_RES_CHANGED;
+
+        return rc;
 }
 
index be1cba0..34a861d 100644 (file)
@@ -45,6 +45,9 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
 {
         struct ldlm_lock *lock;
 
+        if (resource == NULL)
+                LBUG();
+
         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
         if (lock == NULL)
                 return NULL;
@@ -76,73 +79,91 @@ static int ldlm_notify_incompatible(struct list_head *list,
 
                 rc = 1;
 
-                if (lock->l_resource->lr_blocking != NULL)
-                        lock->l_resource->lr_blocking(lock, new);
+                if (lock->l_blocking_ast != NULL)
+                        lock->l_blocking_ast(lock, new, lock->l_data,
+                                             lock->l_data_len);
         }
 
         return rc;
 }
 
-
-static int ldlm_reprocess_queue(struct list_head *queue, 
-                                struct list_head *granted_list)
+static int ldlm_lock_compat(struct ldlm_lock *lock)
 {
-        struct list_head *tmp1, *tmp2;
-        struct ldlm_resource *res;
-        int rc = 0;
+        struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
+        ldlm_res_compat compat;
 
-        list_for_each(tmp1, queue) { 
-                struct ldlm_lock *pending;
-                rc = 0; 
-                pending = list_entry(tmp1, struct ldlm_lock, l_res_link);
-
-                /* check if pending can go in ... */ 
-                list_for_each(tmp2, granted_list) {
-                        struct ldlm_lock *lock;
-                        lock = list_entry(tmp2, struct ldlm_lock, l_res_link);
-                        if (lockmode_compat(lock->l_granted_mode, 
-                                            pending->l_req_mode))
+        if (parent_res &&
+            (compat = ldlm_res_compat_table[parent_res->lr_type])) {
+                struct list_head *tmp;
+                int incompat = 0;
+                list_for_each(tmp, &parent_res->lr_children) {
+                        struct ldlm_resource *child;
+                        child = list_entry(tmp, struct ldlm_resource,
+                                           lr_childof);
+
+                        /* compat will return 0 when child == l_resource
+                         * hence notifications on the same resource are incl. */
+                        if (compat(child, lock->l_resource))
                                 continue;
-                        else { 
-                                /* no, we are done */
-                                rc = 1;
-                                break;
-                        }
-                }
 
-                if (rc) { 
-                        /* no - we are done */
-                        break;
+                        incompat |= ldlm_notify_incompatible(&child->lr_granted,
+                                                             lock);
                 }
 
-                res = pending->l_resource;
-                list_del(&pending->l_res_link); 
-                list_add(&pending->l_res_link, &res->lr_granted);
-                pending->l_granted_mode = pending->l_req_mode;
+                return incompat;
+        }
+
+        return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock);
+}
+
+static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+{
+        ldlm_resource_add_lock(res, &res->lr_granted, lock);
+        lock->l_granted_mode = lock->l_req_mode;
+
+        if (lock->l_granted_mode < res->lr_most_restr)
+                res->lr_most_restr = lock->l_granted_mode;
+
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, NULL, NULL, 0);
+}
 
-                if (pending->l_granted_mode < res->lr_most_restr)
-                        res->lr_most_restr = pending->l_granted_mode;
+static int ldlm_reprocess_queue(struct ldlm_lock *lock,
+                                struct list_head *converting,
+                                struct list_head *granted_list)
+{
+        struct list_head *tmp, *pos;
+        int incompat = 0;
 
-                if (pending->l_completion_ast)
-                        pending->l_completion_ast(pending, NULL, NULL);
-                
+        list_for_each_safe(tmp, pos, converting) { 
+                struct ldlm_lock *pending;
+                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                incompat = ldlm_lock_compat(pending);
+                if (incompat)
+                        break;
 
+                list_del(&pending->l_res_link); 
+                ldlm_grant_lock(pending->l_resource, pending);
         }
 
-        return rc;
+        return incompat;
 }
 
+/* XXX: Revisit the error handling; we do not, for example, do
+ * ldlm_resource_put()s in our error cases, and we probably leak an allocated
+ * memory. */
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                                      __u32 ns_id,
-                                     struct ldlm_handle *parent_res_handle,
                                      struct ldlm_handle *parent_lock_handle,
                                      __u64 *res_id,
+                                     __u32 type,
                                      ldlm_mode_t mode,
                                      int *flags,
                                      ldlm_lock_callback completion,
                                      ldlm_lock_callback blocking,
-                                     __u32 data_len,
                                      void *data,
+                                     __u32 data_len,
                                      struct ldlm_handle *lockh)
 {
         struct ldlm_namespace *ns;
@@ -150,13 +171,15 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         struct ldlm_lock *lock, *parent_lock;
         int incompat = 0, rc;
         __u64 new_id[RES_NAME_SIZE];
-        ldlm_res_compat compat;
         ldlm_res_policy policy;
 
         ENTRY;
-
-        parent_res = ldlm_handle2object(parent_res_handle);
+        
         parent_lock = ldlm_handle2object(parent_lock_handle);
+        if ( parent_lock ) 
+                parent_res = parent_lock->l_resource;
+        else 
+                parent_res = NULL;
 
         ns = ldlm_namespace_find(obddev, ns_id);
         if (ns == NULL || ns->ns_hash == NULL) 
@@ -171,7 +194,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                 }
         }
 
-        res = ldlm_resource_get(ns, parent_res, res_id, 1);
+        res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
         if (res == NULL)
                 RETURN(-ENOMEM);
 
@@ -179,6 +202,8 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         if (lock == NULL)
                 RETURN(-ENOMEM);
 
+        lock->l_data = data;
+        lock->l_data_len = data_len;
         if ((*flags) & LDLM_FL_COMPLETION_AST)
                 lock->l_completion_ast = completion;
         if ((*flags) & LDLM_FL_BLOCKING_AST)
@@ -189,53 +214,69 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         /* FIXME: We may want to optimize by checking lr_most_restr */
 
         if (!list_empty(&res->lr_converting)) {
-                list_add(&lock->l_res_link, res->lr_waiting.prev);
-                rc = -ELDLM_BLOCK_CONV;
-                GOTO(out, rc);
+                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                GOTO(out, rc = -ELDLM_BLOCK_CONV);
         }
         if (!list_empty(&res->lr_waiting)) {
-                list_add(&lock->l_res_link, res->lr_waiting.prev);
-                rc = -ELDLM_BLOCK_WAIT;
-                GOTO(out, rc);
+                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                GOTO(out, rc = -ELDLM_BLOCK_WAIT);
         }
 
-        if (parent_res &&
-            (compat = ldlm_res_compat_table[parent_res->lr_type])) {
-                struct list_head *tmp;
-                list_for_each(tmp, &parent_res->lr_children) {
-                        struct ldlm_resource *child;
-                        child = list_entry(tmp, struct ldlm_resource,
-                                           lr_childof);
+        incompat = ldlm_lock_compat(lock);
+        if (incompat) {
+                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                GOTO(out, rc = -ELDLM_BLOCK_GRANTED);
+        }
 
-                        if (compat(child, res))
-                                continue;
+        ldlm_grant_lock(res, lock);
+        GOTO(out, rc = ELDLM_OK);
 
-                        incompat |= ldlm_notify_incompatible(&child->lr_granted,
-                                                             lock);
-                }
-        } else
-                incompat = ldlm_notify_incompatible(&res->lr_granted, lock);
+ out:
+        spin_unlock(&res->lr_lock);
+        return rc;
+}
 
-        if (incompat) {
-                list_add(&lock->l_res_link, res->lr_waiting.prev);
-                rc = -ELDLM_BLOCK_GRANTED;
-                GOTO(out, rc);
+static void ldlm_reprocess_res_compat(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
+        struct list_head *tmp;
+        int do_waiting;
+
+        list_for_each(tmp, &parent_res->lr_children) {
+                struct ldlm_resource *child;
+                child = list_entry(tmp, struct ldlm_resource, lr_childof);
+
+                ldlm_reprocess_queue(lock, &child->lr_converting,
+                                     &child->lr_granted);
+                if (!list_empty(&child->lr_converting))
+                        do_waiting = 0;
         }
 
-        list_add(&lock->l_res_link, &res->lr_granted);
-        lock->l_granted_mode = mode;
-        if (mode < res->lr_most_restr)
-                res->lr_most_restr = mode;
+        if (!do_waiting)
+                return;
 
-        if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, NULL, NULL);
+        list_for_each(tmp, &parent_res->lr_children) {
+                struct ldlm_resource *child;
+                child = list_entry(tmp, struct ldlm_resource, lr_childof);
 
-        rc = ELDLM_OK;
-        GOTO(out, rc);
+                ldlm_reprocess_queue(lock, &child->lr_waiting,
+                                     &child->lr_granted);
+        }
+}
 
- out:
-        spin_unlock(&res->lr_lock);
-        return rc;
+static void ldlm_reprocess_all(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_resource *parent_res = res->lr_parent;
+
+        if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) {
+                ldlm_reprocess_res_compat(lock);
+                return;
+        }
+
+        ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted);
+        if (list_empty(&res->lr_converting))
+                ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted);
 }
 
 ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
@@ -245,43 +286,37 @@ ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
         struct ldlm_resource *res;
         ENTRY;
 
-        lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
+        lock = ldlm_handle2object(lockh);
         res = lock->l_resource;
-        list_del(&lock->l_res_link);
 
-        kmem_cache_free(ldlm_lock_slab, lock);
-        if (ldlm_resource_put(res)) {
-                EXIT;
-                return 0;
-        }
+        ldlm_resource_del_lock(lock);
 
-        ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted);
-        if (list_empty(&res->lr_converting))
-                ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted);
+        kmem_cache_free(ldlm_lock_slab, lock);
+        if (ldlm_resource_put(res))
+                RETURN(ELDLM_OK);
+        ldlm_reprocess_all(lock);
 
-        return 0;
+        RETURN(ELDLM_OK);
 }
 
 ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
                                      struct ldlm_handle *lockh,
-                                     int new_mode, int flags)
+                                     int new_mode, int *flags)
 {
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
         ENTRY;
 
-        lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
+        lock = ldlm_handle2object(lockh);
         res = lock->l_resource;
         list_del(&lock->l_res_link);
         lock->l_req_mode = new_mode;
 
-        list_add(&lock->l_res_link, &res->lr_converting);
+        list_add(&lock->l_res_link, res->lr_converting.prev);
 
-        ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted);
-        if (list_empty(&res->lr_converting))
-                ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted);
+        ldlm_reprocess_all(lock);
 
-        return 0;
+        RETURN(ELDLM_OK);
 }
 
 void ldlm_lock_dump(struct ldlm_lock *lock)
index fbc74ca..2628812 100644 (file)
@@ -26,7 +26,7 @@ extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 
 static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                                void *data)
+                                void *data, __u32 data_len)
 {
         LBUG();
         return 0;
@@ -55,14 +55,17 @@ static int ldlm_enqueue(struct ptlrpc_request *req)
 
         msg->xid = req_msg->xid;
 
-        err = ldlm_local_lock_enqueue(req->rq_obd, dlm_req->ns_id,
-                                      &dlm_req->parent_res_handle,
+        err = ldlm_local_lock_enqueue(req->rq_obd,
+                                      dlm_req->ns_id,
                                       &dlm_req->parent_lock_handle,
-                                      dlm_req->res_id, dlm_req->mode,
-                                      &dlm_req->flags, ldlm_client_callback,
+                                      dlm_req->res_id,
+                                      dlm_req->res_type,
+                                      dlm_req->mode,
+                                      &dlm_req->flags,
+                                      ldlm_client_callback,
                                       ldlm_client_callback,
-                                      req_msg->buflens[1],
                                       lustre_msg_buf(1, req_msg),
+                                      req_msg->buflens[1],
                                       &dlm_rep->lock_handle);
         msg->status = HTON__u32(err);
 
@@ -190,6 +193,8 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
                                              LDLM_REQUEST_PORTAL,
                                              LDLM_REPLY_PORTAL,
                                              "self", ldlm_handle);
+        if (!ldlm->ldlm_service)
+                LBUG();
 
         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
         if (err)
@@ -199,6 +204,45 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
         RETURN(0);
 }
 
+static int do_free_namespace(struct ldlm_namespace *ns)
+{
+        struct list_head *tmp, *pos;
+        int i, rc;
+
+        for (i = 0; i < RES_HASH_SIZE; i++) {
+                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+                        struct ldlm_resource *res;
+                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
+                        list_del_init(&res->lr_hash);
+
+                        rc = 0;
+                        while (rc == 0)
+                                rc = ldlm_resource_put(res);
+                }
+        }
+
+        return ldlm_namespace_free(ns);
+}
+
+static int ldlm_free_all(struct obd_device *obddev)
+{
+        struct list_head *tmp, *pos;
+        int rc = 0;
+
+        ldlm_lock(obddev);
+
+        list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { 
+                struct ldlm_namespace *ns;
+                ns = list_entry(tmp, struct ldlm_namespace, ns_link);
+
+                rc |= do_free_namespace(ns);
+        }
+
+        ldlm_unlock(obddev);
+
+        return rc;
+}
+
 static int ldlm_cleanup(struct obd_device *obddev)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
@@ -212,9 +256,13 @@ static int ldlm_cleanup(struct obd_device *obddev)
                 CERROR("Request list not empty!\n");
         }
 
-        rpc_unregister_service(ldlm->ldlm_service);
         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
 
+        if (ldlm_free_all(obddev)) {
+                CERROR("ldlm_free_all could not complete.\n");
+                RETURN(-1);
+        }
+
         MOD_DEC_USE_COUNT;
         RETURN(0);
 }
index e00baa9..db33639 100644 (file)
@@ -31,8 +31,6 @@ struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id)
         struct list_head *tmp;
         struct ldlm_namespace *res;
 
-        ldlm_lock(obddev);
-
         res = NULL;
         list_for_each(tmp, &obddev->u.ldlm.ldlm_namespaces) { 
                 struct ldlm_namespace *chk;
@@ -44,8 +42,6 @@ struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id)
                 }
         }
 
-        ldlm_unlock(obddev);
-
         return res;
 }
 
@@ -62,10 +58,9 @@ static void res_hash_init(struct ldlm_namespace *ns)
         if (!res_hash)
                 LBUG();
 
-        for (bucket = res_hash + RES_HASH_SIZE-1 ; bucket >= res_hash ;
-             bucket--) {
+        for (bucket = res_hash + RES_HASH_SIZE - 1; bucket >= res_hash;
+             bucket--)
                 INIT_LIST_HEAD(bucket);
-        }
 
         ns->ns_hash = res_hash;
 }
@@ -74,8 +69,6 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id)
 {
         struct ldlm_namespace *ns;
 
-        ldlm_lock(obddev);
-
         if (ldlm_namespace_find(obddev, id))
                 LBUG();
 
@@ -89,20 +82,30 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id)
         list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces);
 
         res_hash_init(ns); 
-
-        ldlm_unlock(obddev);
+        atomic_set(&ns->ns_refcount, 0);
 
         return ns;
 }
 
+int ldlm_namespace_free(struct ldlm_namespace *ns)
+{
+        if (atomic_read(&ns->ns_refcount))
+                return -EBUSY;
+
+        list_del(&ns->ns_link);
+        OBD_FREE(ns->ns_hash, sizeof(struct list_head) * RES_HASH_SIZE);
+        OBD_FREE(ns, sizeof(*ns));
+
+        return 0;
+}
+
 static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
 {
         __u32 hash = 0;
         int i;
 
-        for (i = 0; i < RES_NAME_SIZE; i++) {
+        for (i = 0; i < RES_NAME_SIZE; i++)
                 hash += name[i];
-        }
 
         hash += (__u32)((unsigned long)parent >> 4);
 
@@ -119,6 +122,7 @@ static struct ldlm_resource *ldlm_resource_new(void)
         memset(res, 0, sizeof(*res));
 
         INIT_LIST_HEAD(&res->lr_children);
+        INIT_LIST_HEAD(&res->lr_childof);
         INIT_LIST_HEAD(&res->lr_granted);
         INIT_LIST_HEAD(&res->lr_converting);
         INIT_LIST_HEAD(&res->lr_waiting);
@@ -133,7 +137,7 @@ static struct ldlm_resource *ldlm_resource_new(void)
 /* ldlm_lock(obddev) must be taken before calling resource_add */
 static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
                                                struct ldlm_resource *parent,
-                                               __u64 *name)
+                                               __u64 *name, __u32 type)
 {
         struct list_head *bucket;
         struct ldlm_resource *res;
@@ -146,7 +150,13 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
 
         memcpy(res->lr_name, name, RES_NAME_SIZE * sizeof(__u32));
         res->lr_namespace = ns;
+        if (type < 0 || type > LDLM_MAX_TYPE) 
+                LBUG();
+
+        res->lr_type = type; 
+        res->lr_most_restr = LCK_NL;
         list_add(&res->lr_hash, bucket);
+        atomic_inc(&ns->ns_refcount);
         if (parent == NULL) {
                 res->lr_parent = res;
                 list_add(&res->lr_rootlink, &ns->ns_root_list);
@@ -160,7 +170,7 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
 
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
-                                        __u64 *name, int create)
+                                        __u64 *name, __u32 type, int create)
 {
         struct list_head *bucket;
         struct list_head *tmp = bucket;
@@ -172,8 +182,6 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                 RETURN(NULL);
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
 
-        ldlm_lock(ns->ns_obddev);
-
         res = NULL;
         list_for_each(tmp, bucket) {
                 struct ldlm_resource *chk;
@@ -188,9 +196,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
         }
 
         if (res == NULL && create)
-                res = ldlm_resource_add(ns, parent, name);
-
-        ldlm_unlock(ns->ns_obddev);
+                res = ldlm_resource_add(ns, parent, name, type);
 
         RETURN(res);
 }
@@ -198,7 +204,9 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
 int ldlm_resource_put(struct ldlm_resource *res)
 {
         int rc = 0; 
-        ldlm_lock(res->lr_namespace->ns_obddev);
+
+        if (atomic_read(&res->lr_refcount) <= 0)
+                LBUG();
 
         if (atomic_dec_and_test(&res->lr_refcount)) {
                 if (!list_empty(&res->lr_granted))
@@ -210,6 +218,7 @@ int ldlm_resource_put(struct ldlm_resource *res)
                 if (!list_empty(&res->lr_waiting))
                         LBUG();
 
+                atomic_dec(&res->lr_namespace->ns_refcount);
                 list_del(&res->lr_hash);
                 list_del(&res->lr_rootlink);
                 list_del(&res->lr_childof);
@@ -217,10 +226,23 @@ int ldlm_resource_put(struct ldlm_resource *res)
                 kmem_cache_free(ldlm_resource_slab, res);
                 rc = 1;
         }
-        ldlm_unlock(res->lr_namespace->ns_obddev);
+
         return rc; 
 }
 
+void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
+                            struct ldlm_lock *lock)
+{
+        list_add(&lock->l_res_link, head);
+        atomic_inc(&res->lr_refcount);
+}
+
+void ldlm_resource_del_lock(struct ldlm_lock *lock)
+{
+        list_del(&lock->l_res_link);
+        atomic_dec(&lock->l_resource->lr_refcount);
+}
+
 int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
 {
         LBUG();
index bef9aad..915b824 100644 (file)
 
 #include <linux/lustre_dlm.h>
 
-static void ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new)
+static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+                               void *data, __u32 data_len)
 {
         printk("ldlm_test_callback: lock=%p, new=%p\n", lock, new);
+        return 0;
 }
 
-int ldlm_test(struct obd_device *obddev)
+int ldlm_test_basics(struct obd_device *obddev)
 {
         struct ldlm_namespace *ns;
         struct ldlm_resource *res;
-        __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
+        __u64 res_id[RES_NAME_SIZE] = {1, 4, 3};
         ldlm_error_t err;
         struct ldlm_handle h;
         int flags;
 
+        ldlm_lock(obddev);
+
         ns = ldlm_namespace_new(obddev, 1);
         if (ns == NULL)
                 LBUG();
 
-        res = ldlm_resource_get(ns, NULL, res_id, 1);
+        res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1);
         if (res == NULL)
                 LBUG();
 
-        res->lr_blocking = ldlm_test_callback;
-
         /* Get a couple of read locks */
-        err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR,
-                                      &flags, NULL, NULL, 0, NULL, &h);
+        flags = LDLM_FL_BLOCKING_AST;
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
+                                      LCK_CR, &flags, NULL, ldlm_test_callback,
+                                      NULL, 0, &h);
         if (err != ELDLM_OK)
                 LBUG();
 
-        err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR,
-                                      &flags, NULL, NULL, 0, NULL, &h);
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
+                                      LCK_EX, &flags, NULL, ldlm_test_callback,
+                                      NULL, 0, &h);
+        if (err != -ELDLM_BLOCK_GRANTED)
+                LBUG();
+
+        ldlm_resource_dump(res);
+
+        ldlm_unlock(obddev);
+
+        return 0;
+}
+
+int ldlm_test_extents(struct obd_device *obddev)
+{
+        struct ldlm_namespace *ns;
+        struct ldlm_resource *res;
+        __u64 file_res_id[RES_NAME_SIZE] = {0, 0, 0};
+        __u64 res_ext1[RES_NAME_SIZE] = {4, 6, 3};
+        __u64 res_ext2[RES_NAME_SIZE] = {6, 9, 3};
+        __u64 res_ext3[RES_NAME_SIZE] = {10, 11, 3};
+        ldlm_error_t err;
+        struct ldlm_handle file_h, ext1_h, ext2_h, ext3_h;
+        int flags;
+
+        ldlm_lock(obddev);
+
+        ns = ldlm_namespace_new(obddev, 2);
+        if (ns == NULL)
+                LBUG();
+
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, file_res_id, LDLM_EXTENT,
+                                      LCK_NL, &flags, NULL, NULL, NULL, 0,
+                                      &file_h);
+        if (err != ELDLM_OK)
+                LBUG();
+
+        flags = 0;
+        err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext1, LDLM_EXTENT,
+                                      LCK_PR, &flags, NULL, NULL, NULL, 0,
+                                      &ext1_h);
         if (err != ELDLM_OK)
                 LBUG();
+        if (!(flags & LDLM_FL_RES_CHANGED))
+                LBUG();
 
+        flags = 0;
+        err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext2, LDLM_EXTENT,
+                                      LCK_PR, &flags, NULL, NULL, NULL, 0,
+                                      &ext2_h);
+        if (err != ELDLM_OK)
+                LBUG();
+        if (!(flags & LDLM_FL_RES_CHANGED))
+                LBUG();
+
+        flags = 0;
+        err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext3, LDLM_EXTENT,
+                                      LCK_EX, &flags, NULL, NULL, NULL, 0,
+                                      &ext3_h);
+        if (err != -ELDLM_BLOCK_GRANTED)
+                LBUG();
+        if (flags & LDLM_FL_RES_CHANGED)
+                LBUG();
+
+        /* Convert/cancel blocking locks */
+        flags = 0;
+        err = ldlm_local_lock_convert(obddev, &ext1_h, LCK_NL, &flags);
+        if (err != ELDLM_OK)
+                LBUG();
+
+        flags = 0;
+        err = ldlm_local_lock_cancel(obddev, &ext1_h);
+        if (err != ELDLM_OK)
+                LBUG();
+
+        /* Dump the results */
+        res = ldlm_resource_get(ns, NULL, file_res_id, LDLM_EXTENT, 0);
+        if (res == NULL)
+                LBUG();
+        ldlm_resource_dump(res);
+        res = ldlm_resource_get(ns, NULL, res_ext1, LDLM_EXTENT, 0);
+        if (res == NULL)
+                LBUG();
+        ldlm_resource_dump(res);
+        res = ldlm_resource_get(ns, NULL, res_ext2, LDLM_EXTENT, 0);
+        if (res == NULL)
+                LBUG();
         ldlm_resource_dump(res);
 
+        ldlm_unlock(obddev);
+
         return 0;
 }
+
+int ldlm_test(struct obd_device *obddev)
+{
+        int rc; 
+        rc = ldlm_test_basics(obddev);
+        if (rc) 
+                RETURN(rc);
+
+        rc = ldlm_test_extents(obddev);
+        RETURN(rc); 
+}
index 6aa6ab5..b9db2cb 100644 (file)
@@ -536,7 +536,6 @@ static int mds_cleanup(struct obd_device * obddev)
                 CERROR("Request list not empty!\n");
         }
 
-        rpc_unregister_service(mds->mds_service);
         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
 
         sb = mds->mds_sb;
index 1612597..4ec408e 100644 (file)
@@ -41,33 +41,25 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc)
 
         spin_lock(&svc->srv_lock); 
         if (sigismember(&(current->pending.signal), SIGKILL) ||
-            sigismember(&(current->pending.signal), SIGSTOP) ||
-            sigismember(&(current->pending.signal), SIGCONT) ||
+            sigismember(&(current->pending.signal), SIGTERM) ||
             sigismember(&(current->pending.signal), SIGINT)) { 
                 svc->srv_flags |= SVC_KILLED;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 1);
         }
 
-        if ( svc->srv_flags & SVC_STOPPING ) {
-                EXIT;
-                rc = 1;
-                goto out;
-        }
+        if (svc->srv_flags & SVC_STOPPING)
+                GOTO(out, rc = 1);
 
         if (svc->srv_flags & SVC_EVENT)
                 LBUG();
 
-        if ( svc->srv_eq_h ) { 
+        if (svc->srv_eq_h) { 
                 int err;
                 err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
 
                 if (err == PTL_OK) { 
                         svc->srv_flags |= SVC_EVENT;
-                        EXIT;
-                        rc = 1;
-                        goto out;
+                        GOTO(out, rc = 1);
                 }
 
                 if (err != PTL_EQ_EMPTY) {
@@ -75,16 +67,12 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc)
                         LBUG();
                 }
 
-                EXIT;
-                rc = 0;
-                goto out;
+                GOTO(out, rc = 0);
         }
 
         if (!list_empty(&svc->srv_reqs)) {
                 svc->srv_flags |= SVC_LIST;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 1);
         }
 
         EXIT;
@@ -192,14 +180,14 @@ static int ptlrpc_main(void *arg)
                 
                 spin_lock(&svc->srv_lock);
                 if (svc->srv_flags & SVC_SIGNAL) {
-                        EXIT;
                         spin_unlock(&svc->srv_lock);
+                        EXIT;
                         break;
                 }
 
                 if (svc->srv_flags & SVC_STOPPING) {
-                        EXIT;
                         spin_unlock(&svc->srv_lock);
+                        EXIT;
                         break;
                 }
 
@@ -246,6 +234,7 @@ static int ptlrpc_main(void *arg)
                 }
                 CERROR("unknown break in service"); 
                 spin_unlock(&svc->srv_lock);
+                EXIT;
                 break; 
         }
 
@@ -281,18 +270,15 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
         init_waitqueue_head(&svc->srv_ctl_waitq);
         rc = kernel_thread(ptlrpc_main, (void *) &d, 
                            CLONE_VM | CLONE_FS | CLONE_FILES);
-        if (rc < 0) { 
+        if (rc < 0) {
                 CERROR("cannot start thread\n"); 
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
 
-        EXIT;
-        return 0;
+        RETURN(0);
 }
 
-
-
 int rpc_unregister_service(struct ptlrpc_service *service)
 {
         int rc, i;
index a7b0227..328dd76 100644 (file)
@@ -63,6 +63,7 @@ setup() {
 
     insmod $SRCDIR/../../obd/class/obdclass.o || exit -1
     insmod $SRCDIR/../../obd/rpc/ptlrpc.o || exit -1
+    insmod $SRCDIR/../../obd/ldlm/ldlm.o || exit -1
     insmod $SRCDIR/../../obd/ext2obd/obdext2.o || exit -1
     insmod $SRCDIR/../../obd/ost/ost.o || exit -1
     insmod $SRCDIR/../../obd/osc/osc.o || exit -1
index 1ab3d24..462d5df 100755 (executable)
@@ -3,10 +3,13 @@
 SRCDIR="`dirname $0`"
 . $SRCDIR/common.sh
 
-setup_ldlm
+NETWORK=tcp
+LOCALHOST=localhost
+SERVER=localhost
+PORT=1234
 
-mknod /dev/obd c 10 241
-echo 0xffffffff > /proc/sys/portals/debug
+setup
+setup_portals
 
 $OBDCTL <<EOF
 device 0
index 6f52eda..1267fe0 100755 (executable)
@@ -29,6 +29,7 @@ rmmod mds
 rmmod osc
 rmmod ost
 rmmod obdext2
+rmmod ldlm
 rmmod ptlrpc
 rmmod obdclass