Whamcloud - gitweb
- Don't abort if fig2dev doesn't exist
authorpschwan <pschwan>
Thu, 4 Apr 2002 04:22:42 +0000 (04:22 +0000)
committerpschwan <pschwan>
Thu, 4 Apr 2002 04:22:42 +0000 (04:22 +0000)
- first chunk of DLM changes for new extents
- fixups to work with the new Portals tip

lustre/doc/Makefile.am
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_test.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

index d379eb0..a0c5952 100644 (file)
@@ -17,7 +17,7 @@ all: master.pdf
        $(LYX2PDF) $< 2>/dev/null || printf "\n*** Warning: not creating PDF docs; install lyx to rectify this\n"
 
 .fig.eps:
-       fig2dev -L eps $< > $@
+       -fig2dev -L eps $< > $@
 
 master.pdf: master.lyx bigpicture.eps intermezzo.eps mds.eps portals-lib.eps client.eps layering.eps metadata.eps sb.eps cow.eps lockacq.eps obdfs.eps snapsetup.eps dirbodyapi.eps loraid.eps ost.eps updates.eps hotmigrate.eps lustreclusters.eps osthw.eps
 
index 39c6eb1..c720b3a 100644 (file)
@@ -23,10 +23,10 @@ typedef enum {
         ELDLM_BLOCK_CONV,
         ELDLM_BLOCK_WAIT,
         ELDLM_BAD_NAMESPACE,
-        ELDLM_RES_CHANGED
+        ELDLM_LOCK_CHANGED
 } ldlm_error_t;
 
-#define LDLM_FL_RES_CHANGED    (1 << 0)
+#define LDLM_FL_LOCK_CHANGED   (1 << 0)
 #define LDLM_FL_COMPLETION_AST (1 << 1)
 #define LDLM_FL_BLOCKING_AST   (1 << 2)
 
@@ -111,15 +111,17 @@ struct ldlm_lock {
         struct lustre_peer   *l_peer;
         void                 *l_data;
         __u32                 l_data_len;
+        struct ldlm_extent    l_extent;
         //void                 *l_event;
         //XXX cluster_host    l_holder;
         __u32                 l_version[RES_VERSION_SIZE];
 };
 
-typedef int (*ldlm_res_compat)(struct ldlm_resource *child,
-                               struct ldlm_resource *new);
-typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, __u64 *res_id_in,
-                               __u64 *res_id_out, ldlm_mode_t mode, void *data);
+typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new);
+typedef int (*ldlm_res_policy)(struct ldlm_resource *parent,
+                               struct ldlm_extent *req_ex,
+                               struct ldlm_extent *new_ex,
+                               ldlm_mode_t mode, void *data);
 
 #define LDLM_PLAIN       0x0
 #define LDLM_EXTENT      0x1
@@ -142,19 +144,14 @@ struct ldlm_resource {
         struct list_head       lr_waiting;
         ldlm_mode_t            lr_most_restr;
         atomic_t               lr_refcount;
+        __u32                  lr_type; /* PLAIN, EXTENT, or MDSINTENT */
         struct ldlm_resource  *lr_root;
         //XXX cluster_host          lr_master;
         __u64                  lr_name[RES_NAME_SIZE];
         __u32                  lr_version[RES_VERSION_SIZE];
-        __u32                  lr_type; /* PLAIN, EXTENT, or MDSINTENT */
         spinlock_t             lr_lock;
 };
 
-struct ldlm_extent {
-        __u64 start;
-        __u64 end;
-};
-
 static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
 {
         return (struct ldlm_extent *)(res->lr_name);
@@ -185,16 +182,18 @@ static inline void ldlm_unlock(struct obd_device *obddev)
 extern struct obd_ops ldlm_obd_ops;
 
 /* ldlm_extent.c */
-int ldlm_extent_compat(struct ldlm_resource *, struct ldlm_resource *);
-int ldlm_extent_policy(struct ldlm_resource *, __u64 *, __u64 *,
-                       ldlm_mode_t, void *);
+int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
+int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *,
+                       struct ldlm_extent *, ldlm_mode_t, void *);
 
 /* ldlm_lock.c */
+void ldlm_lock_free(struct ldlm_lock *lock);
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                                      __u32 ns_id,
                                      struct ldlm_handle *parent_lock_handle,
                                      __u64 *res_id,
                                      __u32 type,
+                                     struct ldlm_extent *req_ex,
                                      ldlm_mode_t mode,
                                      int *flags,
                                      ldlm_lock_callback completion,
index d036a5a..75de35d 100644 (file)
@@ -367,11 +367,17 @@ struct ldlm_handle {
         __u64 cookie;
 };
 
+struct ldlm_extent {
+        __u64 start;
+        __u64 end;
+};
+
 struct ldlm_request {
         __u32 ns_id;
         __u64 res_id[RES_NAME_SIZE];
         __u32 res_type;
         __u32 flags;
+        struct ldlm_extent lock_extent;
         struct ldlm_handle parent_res_handle;
         struct ldlm_handle parent_lock_handle;
         ldlm_mode_t mode;
index 2f92a1e..ff5c512 100644 (file)
  *
  * This helps to find conflicts between read and write locks on overlapping
  * extents. */
-int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new)
+int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
 {
-        struct ldlm_extent *child_ex, *new_ex;
-
-        child_ex = ldlm_res2extent(child);
-        new_ex = ldlm_res2extent(new);
-
-        if (MAX(child_ex->start, new_ex->start) <=
-            MIN(child_ex->end, new_ex->end))
+        if (MAX(a->l_extent.start, b->l_extent.start) <=
+            MIN(a->l_extent.end, b->l_extent.end))
                 return 0;
 
         return 1;
 }
 
-/* The purpose of this function is to return:
- * - the maximum extent
- * - containing the requested extent
- * - and not overlapping existing extents outside the requested one
- *
- * An alternative policy is to not shrink the new extent when conflicts exist.
- *
- * To reconstruct our formulas, take a deep breath. */
-int ldlm_extent_policy(struct ldlm_resource *parent,
-                       __u64 *res_id_in, __u64 *res_id_out,
-                       ldlm_mode_t mode, void *data)
+static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex,
+                            struct ldlm_extent *new_ex, ldlm_mode_t mode)
 {
-        struct ldlm_extent *new_ex, *req_ex;
         struct list_head *tmp;
-        int rc = 0;
-
-        req_ex = (struct ldlm_extent *)res_id_in;
-
-        new_ex = (struct ldlm_extent *)res_id_out;
-        new_ex->start = 0;
-        new_ex->end = ~0;
 
-        list_for_each(tmp, &parent->lr_children) {
-                struct ldlm_resource *res;
-                struct ldlm_extent *exist_ex;
-                res = list_entry(tmp, struct ldlm_resource, lr_childof);
+        list_for_each(tmp, queue) {
+                struct ldlm_lock *lock;
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                exist_ex = ldlm_res2extent(res);
-
-                if (exist_ex->end < req_ex->start)
-                        new_ex->start = MIN(exist_ex->end, new_ex->start);
+                if (lock->l_extent.end < req_ex->start)
+                        new_ex->start = MIN(lock->l_extent.end, new_ex->start);
                 else {
-                        if (exist_ex->start < req_ex->start &&
-                            !lockmode_compat(res->lr_most_restr, mode))
+                        if (lock->l_extent.start < req_ex->start &&
+                            !lockmode_compat(lock->l_req_mode, mode))
                                 /* Policy: minimize conflict overlap */
                                 new_ex->start = req_ex->start;
                 }
-                if (exist_ex->start > req_ex->end)
-                        new_ex->end = MAX(exist_ex->start, new_ex->end);
+                if (lock->l_extent.start > req_ex->end)
+                        new_ex->end = MAX(lock->l_extent.start, new_ex->end);
                 else {
-                        if (exist_ex->end > req_ex->end &&
-                            !lockmode_compat(res->lr_most_restr, mode))
+                        if (lock->l_extent.end > req_ex->end &&
+                            !lockmode_compat(lock->l_req_mode, mode))
                                 /* Policy: minimize conflict overlap */
                                 new_ex->end = req_ex->end;
                 }
         }
+}
 
-        if (new_ex->end != req_ex->end || new_ex->start != req_ex->start)
-                rc = ELDLM_RES_CHANGED;
+/* The purpose of this function is to return:
+ * - the maximum extent
+ * - containing the requested extent
+ * - and not overlapping existing extents outside the requested one
+ *
+ * An alternative policy is to not shrink the new extent when conflicts exist.
+ *
+ * To reconstruct our formulas, take a deep breath. */
+int ldlm_extent_policy(struct ldlm_resource *res, struct ldlm_extent *req_ex,
+                       struct ldlm_extent *new_ex, ldlm_mode_t mode, void *data)
+{
+        new_ex->start = 0;
+        new_ex->end = ~0;
 
-        return rc;
-}
+        if (!res)
+                LBUG();
+
+        policy_internal(&res->lr_granted, req_ex, new_ex, mode);
+        policy_internal(&res->lr_converting, req_ex, new_ex, mode);
+        policy_internal(&res->lr_waiting, req_ex, new_ex, mode);
 
+        if (new_ex->end != req_ex->end || new_ex->start != req_ex->start)
+                return ELDLM_LOCK_CHANGED;
+        return 0;
+}
index 137df0d..286e1ad 100644 (file)
 
 extern kmem_cache_t *ldlm_lock_slab;
 
+static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
+static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b);
+
 ldlm_res_compat ldlm_res_compat_table [] = {
-        [LDLM_PLAIN] NULL,
+        [LDLM_PLAIN] ldlm_plain_compat,
         [LDLM_EXTENT] ldlm_extent_compat,
-        [LDLM_MDSINTENT] NULL
+        [LDLM_MDSINTENT] ldlm_intent_compat
 };
 
 ldlm_res_policy ldlm_res_policy_table [] = {
@@ -39,6 +42,17 @@ ldlm_res_policy ldlm_res_policy_table [] = {
         [LDLM_MDSINTENT] NULL
 };
 
+static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
+{
+        return lockmode_compat(a->l_req_mode, b->l_req_mode);
+}
+
+static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
+{
+        LBUG();
+        return 0;
+}
+
 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
                                        struct ldlm_resource *resource,
                                        ldlm_mode_t mode)
@@ -65,57 +79,37 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         return lock;
 }
 
-static int ldlm_notify_incompatible(struct list_head *list,
-                                    struct ldlm_lock *new)
+void ldlm_lock_free(struct ldlm_lock *lock)
+{
+        kmem_cache_free(ldlm_lock_slab, lock);
+}
+
+static int ldlm_lock_compat(struct ldlm_lock *lock)
 {
         struct list_head *tmp;
         int rc = 0;
 
-        list_for_each(tmp, list) {
-                struct ldlm_lock *lock;
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
+        list_for_each(tmp, &lock->l_resource->lr_granted) {
+                struct ldlm_lock *child;
+                ldlm_res_compat compat;
+
+                child = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                compat = ldlm_res_compat_table[child->l_resource->lr_type];
+                if (compat(child, lock) ||
+                    lockmode_compat(child->l_req_mode, lock->l_req_mode))
                         continue;
 
                 rc = 1;
 
-                if (lock->l_blocking_ast != NULL)
-                        lock->l_blocking_ast(lock, new, lock->l_data,
-                                             lock->l_data_len);
+                if (child->l_blocking_ast != NULL)
+                        child->l_blocking_ast(child, lock, child->l_data,
+                                              child->l_data_len);
         }
 
         return rc;
 }
 
-static int ldlm_lock_compat(struct ldlm_lock *lock)
-{
-        struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
-        ldlm_res_compat compat;
-
-        if (parent_res &&
-            (compat = ldlm_res_compat_table[parent_res->lr_type])) {
-                struct list_head *tmp;
-                int incompat = 0;
-                list_for_each(tmp, &parent_res->lr_children) {
-                        struct ldlm_resource *child;
-                        child = list_entry(tmp, struct ldlm_resource,
-                                           lr_childof);
-
-                        /* compat will return 0 when child == l_resource
-                         * hence notifications on the same resource are incl. */
-                        if (compat(child, lock->l_resource))
-                                continue;
-
-                        incompat |= ldlm_notify_incompatible(&child->lr_granted,
-                                                             lock);
-                }
-
-                return incompat;
-        }
-
-        return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock);
-}
-
 static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
 {
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
@@ -128,36 +122,15 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
                 lock->l_completion_ast(lock, NULL, NULL, 0);
 }
 
-static int ldlm_reprocess_queue(struct ldlm_lock *lock,
-                                struct list_head *converting,
-                                struct list_head *granted_list)
-{
-        struct list_head *tmp, *pos;
-        int incompat = 0;
-
-        list_for_each_safe(tmp, pos, converting) { 
-                struct ldlm_lock *pending;
-                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                incompat = ldlm_lock_compat(pending);
-                if (incompat)
-                        break;
-
-                list_del(&pending->l_res_link); 
-                ldlm_grant_lock(pending->l_resource, pending);
-        }
-
-        return incompat;
-}
-
 /* XXX: Revisit the error handling; we do not, for example, do
- * ldlm_resource_put()s in our error cases, and we probably leak an allocated
+ * ldlm_resource_put()s in our error cases, and we probably leak any allocated
  * memory. */
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                                      __u32 ns_id,
                                      struct ldlm_handle *parent_lock_handle,
                                      __u64 *res_id,
                                      __u32 type,
+                                     struct ldlm_extent *req_ex,
                                      ldlm_mode_t mode,
                                      int *flags,
                                      ldlm_lock_callback completion,
@@ -169,14 +142,14 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         struct ldlm_namespace *ns;
         struct ldlm_resource *res, *parent_res;
         struct ldlm_lock *lock, *parent_lock;
+        struct ldlm_extent new_ex;
         int incompat = 0, rc;
-        __u64 new_id[RES_NAME_SIZE];
         ldlm_res_policy policy;
 
         ENTRY;
-        
+
         parent_lock = ldlm_handle2object(parent_lock_handle);
-        if ( parent_lock ) 
+        if (parent_lock)
                 parent_res = parent_lock->l_resource;
         else 
                 parent_res = NULL;
@@ -185,15 +158,6 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         if (ns == NULL || ns->ns_hash == NULL) 
                 RETURN(-ELDLM_BAD_NAMESPACE);
 
-        if (parent_res &&
-            (policy = ldlm_res_policy_table[parent_res->lr_type])) {
-                rc = policy(parent_res, res_id, new_id, mode, NULL);
-                if (rc == ELDLM_RES_CHANGED) {
-                        *flags |= LDLM_FL_RES_CHANGED;
-                        memcpy(res_id, new_id, sizeof(*new_id));
-                }
-        }
-
         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
         if (res == NULL)
                 RETURN(-ENOMEM);
@@ -202,6 +166,19 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         if (lock == NULL)
                 RETURN(-ENOMEM);
 
+        if ((policy = ldlm_res_policy_table[type])) {
+                rc = policy(res, req_ex, &new_ex, mode, NULL);
+                if (rc == ELDLM_LOCK_CHANGED) {
+                        *flags |= LDLM_FL_LOCK_CHANGED;
+                        memcpy(req_ex, &new_ex, sizeof(new_ex));
+                }
+        }
+
+        if ((type == LDLM_EXTENT && !req_ex) ||
+            (type != LDLM_EXTENT && req_ex))
+                LBUG();
+        if (req_ex)
+                memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
         lock->l_data = data;
         lock->l_data_len = data_len;
         if ((*flags) & LDLM_FL_COMPLETION_AST)
@@ -236,47 +213,32 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
         return rc;
 }
 
-static void ldlm_reprocess_res_compat(struct ldlm_lock *lock)
+static int ldlm_reprocess_queue(struct ldlm_resource *res,
+                                struct list_head *converting)
 {
-        struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
-        struct list_head *tmp;
-        int do_waiting;
-
-        list_for_each(tmp, &parent_res->lr_children) {
-                struct ldlm_resource *child;
-                child = list_entry(tmp, struct ldlm_resource, lr_childof);
-
-                ldlm_reprocess_queue(lock, &child->lr_converting,
-                                     &child->lr_granted);
-                if (!list_empty(&child->lr_converting))
-                        do_waiting = 0;
-        }
+        struct list_head *tmp, *pos;
+        int incompat = 0;
 
-        if (!do_waiting)
-                return;
+        list_for_each_safe(tmp, pos, converting) { 
+                struct ldlm_lock *pending;
+                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-        list_for_each(tmp, &parent_res->lr_children) {
-                struct ldlm_resource *child;
-                child = list_entry(tmp, struct ldlm_resource, lr_childof);
+                incompat = ldlm_lock_compat(pending);
+                if (incompat)
+                        break;
 
-                ldlm_reprocess_queue(lock, &child->lr_waiting,
-                                     &child->lr_granted);
+                list_del(&pending->l_res_link); 
+                ldlm_grant_lock(res, pending);
         }
+
+        return incompat;
 }
 
-static void ldlm_reprocess_all(struct ldlm_lock *lock)
+static void ldlm_reprocess_all(struct ldlm_resource *res)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        struct ldlm_resource *parent_res = res->lr_parent;
-
-        if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) {
-                ldlm_reprocess_res_compat(lock);
-                return;
-        }
-
-        ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted);
+        ldlm_reprocess_queue(res, &res->lr_converting);
         if (list_empty(&res->lr_converting))
-                ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted);
+                ldlm_reprocess_queue(res, &res->lr_waiting);
 }
 
 ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
@@ -291,10 +253,10 @@ ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
 
         ldlm_resource_del_lock(lock);
 
-        kmem_cache_free(ldlm_lock_slab, lock);
+        ldlm_lock_free(lock);
         if (ldlm_resource_put(res))
                 RETURN(ELDLM_OK);
-        ldlm_reprocess_all(lock);
+        ldlm_reprocess_all(res);
 
         RETURN(ELDLM_OK);
 }
@@ -314,7 +276,7 @@ ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
 
         list_add(&lock->l_res_link, res->lr_converting.prev);
 
-        ldlm_reprocess_all(lock);
+        ldlm_reprocess_all(res);
 
         RETURN(ELDLM_OK);
 }
index 2628812..5dd68e9 100644 (file)
@@ -60,6 +60,7 @@ static int ldlm_enqueue(struct ptlrpc_request *req)
                                       &dlm_req->parent_lock_handle,
                                       dlm_req->res_id,
                                       dlm_req->res_type,
+                                      &dlm_req->lock_extent,
                                       dlm_req->mode,
                                       &dlm_req->flags,
                                       ldlm_client_callback,
@@ -204,6 +205,29 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
         RETURN(0);
 }
 
+static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
+{
+        struct list_head *tmp, *pos;
+        int rc = 0;
+
+        list_for_each_safe(tmp, pos, q) {
+                struct ldlm_lock *lock;
+
+                if (rc) {
+                        /* Res was already cleaned up. */
+                        LBUG();
+                }
+
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                ldlm_resource_del_lock(lock);
+                ldlm_lock_free(lock);
+                rc = ldlm_resource_put(res);
+        }
+
+        return rc;
+}
+
 static int do_free_namespace(struct ldlm_namespace *ns)
 {
         struct list_head *tmp, *pos;
@@ -215,7 +239,12 @@ static int do_free_namespace(struct ldlm_namespace *ns)
                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
                         list_del_init(&res->lr_hash);
 
-                        rc = 0;
+                        rc = cleanup_resource(res, &res->lr_granted);
+                        if (!rc)
+                                rc = cleanup_resource(res, &res->lr_converting);
+                        if (!rc)
+                                rc = cleanup_resource(res, &res->lr_waiting);
+
                         while (rc == 0)
                                 rc = ldlm_resource_put(res);
                 }
index 915b824..4b185bd 100644 (file)
@@ -34,9 +34,9 @@ int ldlm_test_basics(struct obd_device *obddev)
 {
         struct ldlm_namespace *ns;
         struct ldlm_resource *res;
-        __u64 res_id[RES_NAME_SIZE] = {1, 4, 3};
+        __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
         ldlm_error_t err;
-        struct ldlm_handle h;
+        struct ldlm_handle lockh_1, lockh_2;
         int flags;
 
         ldlm_lock(obddev);
@@ -52,19 +52,25 @@ int ldlm_test_basics(struct obd_device *obddev)
         /* Get a couple of read locks */
         flags = LDLM_FL_BLOCKING_AST;
         err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
-                                      LCK_CR, &flags, NULL, ldlm_test_callback,
-                                      NULL, 0, &h);
+                                      NULL, LCK_CR, &flags, NULL,
+                                      ldlm_test_callback, NULL, 0, &lockh_1);
         if (err != ELDLM_OK)
                 LBUG();
 
         err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
-                                      LCK_EX, &flags, NULL, ldlm_test_callback,
-                                      NULL, 0, &h);
+                                      NULL, LCK_EX, &flags, NULL,
+                                      ldlm_test_callback, NULL, 0, &lockh_2);
         if (err != -ELDLM_BLOCK_GRANTED)
                 LBUG();
 
         ldlm_resource_dump(res);
 
+        err = ldlm_local_lock_convert(obddev, &lockh_1, LCK_NL, &flags);
+        if (err != ELDLM_OK)
+                LBUG();
+
+        ldlm_resource_dump(res);
+
         ldlm_unlock(obddev);
 
         return 0;
@@ -74,12 +80,10 @@ int ldlm_test_extents(struct obd_device *obddev)
 {
         struct ldlm_namespace *ns;
         struct ldlm_resource *res;
-        __u64 file_res_id[RES_NAME_SIZE] = {0, 0, 0};
-        __u64 res_ext1[RES_NAME_SIZE] = {4, 6, 3};
-        __u64 res_ext2[RES_NAME_SIZE] = {6, 9, 3};
-        __u64 res_ext3[RES_NAME_SIZE] = {10, 11, 3};
+        __u64 res_id[RES_NAME_SIZE] = {0, 0, 0};
+        struct ldlm_extent ext1 = {4, 6}, ext2 = {6, 9}, ext3 = {10, 11};
+        struct ldlm_handle ext1_h, ext2_h, ext3_h;
         ldlm_error_t err;
-        struct ldlm_handle file_h, ext1_h, ext2_h, ext3_h;
         int flags;
 
         ldlm_lock(obddev);
@@ -88,37 +92,31 @@ int ldlm_test_extents(struct obd_device *obddev)
         if (ns == NULL)
                 LBUG();
 
-        err = ldlm_local_lock_enqueue(obddev, 1, NULL, file_res_id, LDLM_EXTENT,
-                                      LCK_NL, &flags, NULL, NULL, NULL, 0,
-                                      &file_h);
-        if (err != ELDLM_OK)
-                LBUG();
-
         flags = 0;
-        err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext1, LDLM_EXTENT,
-                                      LCK_PR, &flags, NULL, NULL, NULL, 0,
-                                      &ext1_h);
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT,
+                                      &ext1, LCK_PR, &flags, NULL, NULL, NULL,
+                                      0, &ext1_h);
         if (err != ELDLM_OK)
                 LBUG();
-        if (!(flags & LDLM_FL_RES_CHANGED))
+        if (!(flags & LDLM_FL_LOCK_CHANGED))
                 LBUG();
 
         flags = 0;
-        err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext2, LDLM_EXTENT,
-                                      LCK_PR, &flags, NULL, NULL, NULL, 0,
-                                      &ext2_h);
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT,
+                                      &ext2, LCK_PR, &flags, NULL, NULL, NULL,
+                                      0, &ext2_h);
         if (err != ELDLM_OK)
                 LBUG();
-        if (!(flags & LDLM_FL_RES_CHANGED))
+        if (!(flags & LDLM_FL_LOCK_CHANGED))
                 LBUG();
 
         flags = 0;
-        err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext3, LDLM_EXTENT,
-                                      LCK_EX, &flags, NULL, NULL, NULL, 0,
-                                      &ext3_h);
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT,
+                                      &ext3, LCK_EX, &flags, NULL, NULL, NULL,
+                                      0, &ext3_h);
         if (err != -ELDLM_BLOCK_GRANTED)
                 LBUG();
-        if (flags & LDLM_FL_RES_CHANGED)
+        if (flags & LDLM_FL_LOCK_CHANGED)
                 LBUG();
 
         /* Convert/cancel blocking locks */
@@ -133,15 +131,7 @@ int ldlm_test_extents(struct obd_device *obddev)
                 LBUG();
 
         /* Dump the results */
-        res = ldlm_resource_get(ns, NULL, file_res_id, LDLM_EXTENT, 0);
-        if (res == NULL)
-                LBUG();
-        ldlm_resource_dump(res);
-        res = ldlm_resource_get(ns, NULL, res_ext1, LDLM_EXTENT, 0);
-        if (res == NULL)
-                LBUG();
-        ldlm_resource_dump(res);
-        res = ldlm_resource_get(ns, NULL, res_ext2, LDLM_EXTENT, 0);
+        res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
         if (res == NULL)
                 LBUG();
         ldlm_resource_dump(res);
index 3289447..17f1a1d 100644 (file)
@@ -93,17 +93,18 @@ int server_request_callback(ptl_event_t *ev, void *data)
 
         service->srv_ref_count[index]++;
 
-        if (ev->unlinked_me != -1) {
+        if (ptl_is_valid_handle(&ev->unlinked_me)) {
                 int idx;
 
                 for (idx = 0; idx < service->srv_ring_length; idx++)
-                        if (service->srv_me_h[idx] == ev->unlinked_me)
+                        if (service->srv_me_h[idx].handle_idx ==
+                            ev->unlinked_me.handle_idx)
                                 break;
                 if (idx == service->srv_ring_length)
                         LBUG();
 
-                CDEBUG(D_NET, "unlinked %d\n", idx); 
-                service->srv_me_h[idx] = 0; 
+                CDEBUG(D_NET, "unlinked %d\n", idx);
+                ptl_set_inv_handle(&(service->srv_me_h[idx]));
 
                 if (service->srv_ref_count[idx] == 0)
                         ptlrpc_link_svc_me(service, idx); 
index 0e5bb32..22631d7 100644 (file)
@@ -397,9 +397,9 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
 
         if (service->srv_ref_count[index] < 0)
                 LBUG();
-        
+
         if (service->srv_ref_count[index] == 0 &&
-            service->srv_me_h[index] == 0)  {
+            !ptl_is_valid_handle(&(service->srv_me_h[index]))) {
                 CDEBUG(D_NET, "relinking %d\n", index); 
                 ptlrpc_link_svc_me(service, index); 
         }
index 47d2cb0..d9c2b96 100644 (file)
@@ -53,7 +53,7 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc)
         if (svc->srv_flags & SVC_EVENT)
                 LBUG();
 
-        if (svc->srv_eq_h) { 
+        if (ptl_is_valid_handle(&svc->srv_eq_h)) {
                 int err;
                 err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
 
@@ -289,11 +289,11 @@ int rpc_unregister_service(struct ptlrpc_service *service)
         int rc, i;
 
         for (i = 0; i < service->srv_ring_length; i++) {
-                if (service->srv_me_h[i]) { 
+                if (ptl_is_valid_handle(&(service->srv_me_h[i]))) {
                         rc = PtlMEUnlink(service->srv_me_h[i]);
                         if (rc)
                                 CERROR("PtlMEUnlink failed: %d\n", rc);
-                        service->srv_me_h[i] = 0;
+                        ptl_set_inv_handle(&(service->srv_me_h[i]));
                 }
 
                 if (service->srv_buf[i] != NULL)