$(LYX2PDF) $< 2>/dev/null || printf "\n*** Warning: not creating PDF docs; install lyx to rectify this\n"
.fig.eps:
- fig2dev -L eps $< > $@
+ -fig2dev -L eps $< > $@
master.pdf: master.lyx bigpicture.eps intermezzo.eps mds.eps portals-lib.eps client.eps layering.eps metadata.eps sb.eps cow.eps lockacq.eps obdfs.eps snapsetup.eps dirbodyapi.eps loraid.eps ost.eps updates.eps hotmigrate.eps lustreclusters.eps osthw.eps
ELDLM_BLOCK_CONV,
ELDLM_BLOCK_WAIT,
ELDLM_BAD_NAMESPACE,
- ELDLM_RES_CHANGED
+ ELDLM_LOCK_CHANGED
} ldlm_error_t;
-#define LDLM_FL_RES_CHANGED (1 << 0)
+#define LDLM_FL_LOCK_CHANGED (1 << 0)
#define LDLM_FL_COMPLETION_AST (1 << 1)
#define LDLM_FL_BLOCKING_AST (1 << 2)
struct lustre_peer *l_peer;
void *l_data;
__u32 l_data_len;
+ struct ldlm_extent l_extent;
//void *l_event;
//XXX cluster_host l_holder;
__u32 l_version[RES_VERSION_SIZE];
};
-typedef int (*ldlm_res_compat)(struct ldlm_resource *child,
- struct ldlm_resource *new);
-typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, __u64 *res_id_in,
- __u64 *res_id_out, ldlm_mode_t mode, void *data);
+typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new);
+typedef int (*ldlm_res_policy)(struct ldlm_resource *parent,
+ struct ldlm_extent *req_ex,
+ struct ldlm_extent *new_ex,
+ ldlm_mode_t mode, void *data);
#define LDLM_PLAIN 0x0
#define LDLM_EXTENT 0x1
struct list_head lr_waiting;
ldlm_mode_t lr_most_restr;
atomic_t lr_refcount;
+ __u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */
struct ldlm_resource *lr_root;
//XXX cluster_host lr_master;
__u64 lr_name[RES_NAME_SIZE];
__u32 lr_version[RES_VERSION_SIZE];
- __u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */
spinlock_t lr_lock;
};
-struct ldlm_extent {
- __u64 start;
- __u64 end;
-};
-
static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
{
return (struct ldlm_extent *)(res->lr_name);
extern struct obd_ops ldlm_obd_ops;
/* ldlm_extent.c */
-int ldlm_extent_compat(struct ldlm_resource *, struct ldlm_resource *);
-int ldlm_extent_policy(struct ldlm_resource *, __u64 *, __u64 *,
- ldlm_mode_t, void *);
+int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
+int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *,
+ struct ldlm_extent *, ldlm_mode_t, void *);
/* ldlm_lock.c */
+void ldlm_lock_free(struct ldlm_lock *lock);
ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
__u32 ns_id,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
__u32 type,
+ struct ldlm_extent *req_ex,
ldlm_mode_t mode,
int *flags,
ldlm_lock_callback completion,
__u64 cookie;
};
+struct ldlm_extent {
+ __u64 start;
+ __u64 end;
+};
+
struct ldlm_request {
__u32 ns_id;
__u64 res_id[RES_NAME_SIZE];
__u32 res_type;
__u32 flags;
+ struct ldlm_extent lock_extent;
struct ldlm_handle parent_res_handle;
struct ldlm_handle parent_lock_handle;
ldlm_mode_t mode;
*
* This helps to find conflicts between read and write locks on overlapping
* extents. */
-int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new)
+int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
{
- struct ldlm_extent *child_ex, *new_ex;
-
- child_ex = ldlm_res2extent(child);
- new_ex = ldlm_res2extent(new);
-
- if (MAX(child_ex->start, new_ex->start) <=
- MIN(child_ex->end, new_ex->end))
+ if (MAX(a->l_extent.start, b->l_extent.start) <=
+ MIN(a->l_extent.end, b->l_extent.end))
return 0;
return 1;
}
-/* The purpose of this function is to return:
- * - the maximum extent
- * - containing the requested extent
- * - and not overlapping existing extents outside the requested one
- *
- * An alternative policy is to not shrink the new extent when conflicts exist.
- *
- * To reconstruct our formulas, take a deep breath. */
-int ldlm_extent_policy(struct ldlm_resource *parent,
- __u64 *res_id_in, __u64 *res_id_out,
- ldlm_mode_t mode, void *data)
+static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex,
+ struct ldlm_extent *new_ex, ldlm_mode_t mode)
{
- struct ldlm_extent *new_ex, *req_ex;
struct list_head *tmp;
- int rc = 0;
-
- req_ex = (struct ldlm_extent *)res_id_in;
-
- new_ex = (struct ldlm_extent *)res_id_out;
- new_ex->start = 0;
- new_ex->end = ~0;
- list_for_each(tmp, &parent->lr_children) {
- struct ldlm_resource *res;
- struct ldlm_extent *exist_ex;
- res = list_entry(tmp, struct ldlm_resource, lr_childof);
+ list_for_each(tmp, queue) {
+ struct ldlm_lock *lock;
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- exist_ex = ldlm_res2extent(res);
-
- if (exist_ex->end < req_ex->start)
- new_ex->start = MIN(exist_ex->end, new_ex->start);
+ if (lock->l_extent.end < req_ex->start)
+ new_ex->start = MIN(lock->l_extent.end, new_ex->start);
else {
- if (exist_ex->start < req_ex->start &&
- !lockmode_compat(res->lr_most_restr, mode))
+ if (lock->l_extent.start < req_ex->start &&
+ !lockmode_compat(lock->l_req_mode, mode))
/* Policy: minimize conflict overlap */
new_ex->start = req_ex->start;
}
- if (exist_ex->start > req_ex->end)
- new_ex->end = MAX(exist_ex->start, new_ex->end);
+ if (lock->l_extent.start > req_ex->end)
+ new_ex->end = MAX(lock->l_extent.start, new_ex->end);
else {
- if (exist_ex->end > req_ex->end &&
- !lockmode_compat(res->lr_most_restr, mode))
+ if (lock->l_extent.end > req_ex->end &&
+ !lockmode_compat(lock->l_req_mode, mode))
/* Policy: minimize conflict overlap */
new_ex->end = req_ex->end;
}
}
+}
- if (new_ex->end != req_ex->end || new_ex->start != req_ex->start)
- rc = ELDLM_RES_CHANGED;
+/* The purpose of this function is to return:
+ * - the maximum extent
+ * - containing the requested extent
+ * - and not overlapping existing extents outside the requested one
+ *
+ * An alternative policy is to not shrink the new extent when conflicts exist.
+ *
+ * To reconstruct our formulas, take a deep breath. */
+int ldlm_extent_policy(struct ldlm_resource *res, struct ldlm_extent *req_ex,
+ struct ldlm_extent *new_ex, ldlm_mode_t mode, void *data)
+{
+ new_ex->start = 0;
+ new_ex->end = ~0;
- return rc;
-}
+ if (!res)
+ LBUG();
+
+ policy_internal(&res->lr_granted, req_ex, new_ex, mode);
+ policy_internal(&res->lr_converting, req_ex, new_ex, mode);
+ policy_internal(&res->lr_waiting, req_ex, new_ex, mode);
+ if (new_ex->end != req_ex->end || new_ex->start != req_ex->start)
+ return ELDLM_LOCK_CHANGED;
+ return 0;
+}
extern kmem_cache_t *ldlm_lock_slab;
+static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
+static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b);
+
ldlm_res_compat ldlm_res_compat_table [] = {
- [LDLM_PLAIN] NULL,
+ [LDLM_PLAIN] ldlm_plain_compat,
[LDLM_EXTENT] ldlm_extent_compat,
- [LDLM_MDSINTENT] NULL
+ [LDLM_MDSINTENT] ldlm_intent_compat
};
ldlm_res_policy ldlm_res_policy_table [] = {
[LDLM_MDSINTENT] NULL
};
+static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
+{
+ return lockmode_compat(a->l_req_mode, b->l_req_mode);
+}
+
+static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
+{
+ LBUG();
+ return 0;
+}
+
static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
struct ldlm_resource *resource,
ldlm_mode_t mode)
return lock;
}
-static int ldlm_notify_incompatible(struct list_head *list,
- struct ldlm_lock *new)
+void ldlm_lock_free(struct ldlm_lock *lock)
+{
+ kmem_cache_free(ldlm_lock_slab, lock);
+}
+
+static int ldlm_lock_compat(struct ldlm_lock *lock)
{
struct list_head *tmp;
int rc = 0;
- list_for_each(tmp, list) {
- struct ldlm_lock *lock;
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
+ list_for_each(tmp, &lock->l_resource->lr_granted) {
+ struct ldlm_lock *child;
+ ldlm_res_compat compat;
+
+ child = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ compat = ldlm_res_compat_table[child->l_resource->lr_type];
+ if (compat(child, lock) ||
+ lockmode_compat(child->l_req_mode, lock->l_req_mode))
continue;
rc = 1;
- if (lock->l_blocking_ast != NULL)
- lock->l_blocking_ast(lock, new, lock->l_data,
- lock->l_data_len);
+ if (child->l_blocking_ast != NULL)
+ child->l_blocking_ast(child, lock, child->l_data,
+ child->l_data_len);
}
return rc;
}
-static int ldlm_lock_compat(struct ldlm_lock *lock)
-{
- struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
- ldlm_res_compat compat;
-
- if (parent_res &&
- (compat = ldlm_res_compat_table[parent_res->lr_type])) {
- struct list_head *tmp;
- int incompat = 0;
- list_for_each(tmp, &parent_res->lr_children) {
- struct ldlm_resource *child;
- child = list_entry(tmp, struct ldlm_resource,
- lr_childof);
-
- /* compat will return 0 when child == l_resource
- * hence notifications on the same resource are incl. */
- if (compat(child, lock->l_resource))
- continue;
-
- incompat |= ldlm_notify_incompatible(&child->lr_granted,
- lock);
- }
-
- return incompat;
- }
-
- return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock);
-}
-
static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
{
ldlm_resource_add_lock(res, &res->lr_granted, lock);
lock->l_completion_ast(lock, NULL, NULL, 0);
}
-static int ldlm_reprocess_queue(struct ldlm_lock *lock,
- struct list_head *converting,
- struct list_head *granted_list)
-{
- struct list_head *tmp, *pos;
- int incompat = 0;
-
- list_for_each_safe(tmp, pos, converting) {
- struct ldlm_lock *pending;
- pending = list_entry(tmp, struct ldlm_lock, l_res_link);
-
- incompat = ldlm_lock_compat(pending);
- if (incompat)
- break;
-
- list_del(&pending->l_res_link);
- ldlm_grant_lock(pending->l_resource, pending);
- }
-
- return incompat;
-}
-
/* XXX: Revisit the error handling; we do not, for example, do
- * ldlm_resource_put()s in our error cases, and we probably leak an allocated
+ * ldlm_resource_put()s in our error cases, and we probably leak any allocated
* memory. */
ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
__u32 ns_id,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
__u32 type,
+ struct ldlm_extent *req_ex,
ldlm_mode_t mode,
int *flags,
ldlm_lock_callback completion,
struct ldlm_namespace *ns;
struct ldlm_resource *res, *parent_res;
struct ldlm_lock *lock, *parent_lock;
+ struct ldlm_extent new_ex;
int incompat = 0, rc;
- __u64 new_id[RES_NAME_SIZE];
ldlm_res_policy policy;
ENTRY;
-
+
parent_lock = ldlm_handle2object(parent_lock_handle);
- if ( parent_lock )
+ if (parent_lock)
parent_res = parent_lock->l_resource;
else
parent_res = NULL;
if (ns == NULL || ns->ns_hash == NULL)
RETURN(-ELDLM_BAD_NAMESPACE);
- if (parent_res &&
- (policy = ldlm_res_policy_table[parent_res->lr_type])) {
- rc = policy(parent_res, res_id, new_id, mode, NULL);
- if (rc == ELDLM_RES_CHANGED) {
- *flags |= LDLM_FL_RES_CHANGED;
- memcpy(res_id, new_id, sizeof(*new_id));
- }
- }
-
res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
if (res == NULL)
RETURN(-ENOMEM);
if (lock == NULL)
RETURN(-ENOMEM);
+ if ((policy = ldlm_res_policy_table[type])) {
+ rc = policy(res, req_ex, &new_ex, mode, NULL);
+ if (rc == ELDLM_LOCK_CHANGED) {
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ memcpy(req_ex, &new_ex, sizeof(new_ex));
+ }
+ }
+
+ if ((type == LDLM_EXTENT && !req_ex) ||
+ (type != LDLM_EXTENT && req_ex))
+ LBUG();
+ if (req_ex)
+ memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
lock->l_data = data;
lock->l_data_len = data_len;
if ((*flags) & LDLM_FL_COMPLETION_AST)
return rc;
}
-static void ldlm_reprocess_res_compat(struct ldlm_lock *lock)
+static int ldlm_reprocess_queue(struct ldlm_resource *res,
+ struct list_head *converting)
{
- struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
- struct list_head *tmp;
- int do_waiting;
-
- list_for_each(tmp, &parent_res->lr_children) {
- struct ldlm_resource *child;
- child = list_entry(tmp, struct ldlm_resource, lr_childof);
-
- ldlm_reprocess_queue(lock, &child->lr_converting,
- &child->lr_granted);
- if (!list_empty(&child->lr_converting))
- do_waiting = 0;
- }
+ struct list_head *tmp, *pos;
+ int incompat = 0;
- if (!do_waiting)
- return;
+ list_for_each_safe(tmp, pos, converting) {
+ struct ldlm_lock *pending;
+ pending = list_entry(tmp, struct ldlm_lock, l_res_link);
- list_for_each(tmp, &parent_res->lr_children) {
- struct ldlm_resource *child;
- child = list_entry(tmp, struct ldlm_resource, lr_childof);
+ incompat = ldlm_lock_compat(pending);
+ if (incompat)
+ break;
- ldlm_reprocess_queue(lock, &child->lr_waiting,
- &child->lr_granted);
+ list_del(&pending->l_res_link);
+ ldlm_grant_lock(res, pending);
}
+
+ return incompat;
}
-static void ldlm_reprocess_all(struct ldlm_lock *lock)
+static void ldlm_reprocess_all(struct ldlm_resource *res)
{
- struct ldlm_resource *res = lock->l_resource;
- struct ldlm_resource *parent_res = res->lr_parent;
-
- if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) {
- ldlm_reprocess_res_compat(lock);
- return;
- }
-
- ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted);
+ ldlm_reprocess_queue(res, &res->lr_converting);
if (list_empty(&res->lr_converting))
- ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted);
+ ldlm_reprocess_queue(res, &res->lr_waiting);
}
ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
ldlm_resource_del_lock(lock);
- kmem_cache_free(ldlm_lock_slab, lock);
+ ldlm_lock_free(lock);
if (ldlm_resource_put(res))
RETURN(ELDLM_OK);
- ldlm_reprocess_all(lock);
+ ldlm_reprocess_all(res);
RETURN(ELDLM_OK);
}
list_add(&lock->l_res_link, res->lr_converting.prev);
- ldlm_reprocess_all(lock);
+ ldlm_reprocess_all(res);
RETURN(ELDLM_OK);
}
&dlm_req->parent_lock_handle,
dlm_req->res_id,
dlm_req->res_type,
+ &dlm_req->lock_extent,
dlm_req->mode,
&dlm_req->flags,
ldlm_client_callback,
RETURN(0);
}
+static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
+{
+ struct list_head *tmp, *pos;
+ int rc = 0;
+
+ list_for_each_safe(tmp, pos, q) {
+ struct ldlm_lock *lock;
+
+ if (rc) {
+ /* Res was already cleaned up. */
+ LBUG();
+ }
+
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ ldlm_resource_del_lock(lock);
+ ldlm_lock_free(lock);
+ rc = ldlm_resource_put(res);
+ }
+
+ return rc;
+}
+
static int do_free_namespace(struct ldlm_namespace *ns)
{
struct list_head *tmp, *pos;
res = list_entry(tmp, struct ldlm_resource, lr_hash);
list_del_init(&res->lr_hash);
- rc = 0;
+ rc = cleanup_resource(res, &res->lr_granted);
+ if (!rc)
+ rc = cleanup_resource(res, &res->lr_converting);
+ if (!rc)
+ rc = cleanup_resource(res, &res->lr_waiting);
+
while (rc == 0)
rc = ldlm_resource_put(res);
}
{
struct ldlm_namespace *ns;
struct ldlm_resource *res;
- __u64 res_id[RES_NAME_SIZE] = {1, 4, 3};
+ __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
ldlm_error_t err;
- struct ldlm_handle h;
+ struct ldlm_handle lockh_1, lockh_2;
int flags;
ldlm_lock(obddev);
/* Get a couple of read locks */
flags = LDLM_FL_BLOCKING_AST;
err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
- LCK_CR, &flags, NULL, ldlm_test_callback,
- NULL, 0, &h);
+ NULL, LCK_CR, &flags, NULL,
+ ldlm_test_callback, NULL, 0, &lockh_1);
if (err != ELDLM_OK)
LBUG();
err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
- LCK_EX, &flags, NULL, ldlm_test_callback,
- NULL, 0, &h);
+ NULL, LCK_EX, &flags, NULL,
+ ldlm_test_callback, NULL, 0, &lockh_2);
if (err != -ELDLM_BLOCK_GRANTED)
LBUG();
ldlm_resource_dump(res);
+ err = ldlm_local_lock_convert(obddev, &lockh_1, LCK_NL, &flags);
+ if (err != ELDLM_OK)
+ LBUG();
+
+ ldlm_resource_dump(res);
+
ldlm_unlock(obddev);
return 0;
{
struct ldlm_namespace *ns;
struct ldlm_resource *res;
- __u64 file_res_id[RES_NAME_SIZE] = {0, 0, 0};
- __u64 res_ext1[RES_NAME_SIZE] = {4, 6, 3};
- __u64 res_ext2[RES_NAME_SIZE] = {6, 9, 3};
- __u64 res_ext3[RES_NAME_SIZE] = {10, 11, 3};
+ __u64 res_id[RES_NAME_SIZE] = {0, 0, 0};
+ struct ldlm_extent ext1 = {4, 6}, ext2 = {6, 9}, ext3 = {10, 11};
+ struct ldlm_handle ext1_h, ext2_h, ext3_h;
ldlm_error_t err;
- struct ldlm_handle file_h, ext1_h, ext2_h, ext3_h;
int flags;
ldlm_lock(obddev);
if (ns == NULL)
LBUG();
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, file_res_id, LDLM_EXTENT,
- LCK_NL, &flags, NULL, NULL, NULL, 0,
- &file_h);
- if (err != ELDLM_OK)
- LBUG();
-
flags = 0;
- err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext1, LDLM_EXTENT,
- LCK_PR, &flags, NULL, NULL, NULL, 0,
- &ext1_h);
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT,
+ &ext1, LCK_PR, &flags, NULL, NULL, NULL,
+ 0, &ext1_h);
if (err != ELDLM_OK)
LBUG();
- if (!(flags & LDLM_FL_RES_CHANGED))
+ if (!(flags & LDLM_FL_LOCK_CHANGED))
LBUG();
flags = 0;
- err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext2, LDLM_EXTENT,
- LCK_PR, &flags, NULL, NULL, NULL, 0,
- &ext2_h);
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT,
+ &ext2, LCK_PR, &flags, NULL, NULL, NULL,
+ 0, &ext2_h);
if (err != ELDLM_OK)
LBUG();
- if (!(flags & LDLM_FL_RES_CHANGED))
+ if (!(flags & LDLM_FL_LOCK_CHANGED))
LBUG();
flags = 0;
- err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext3, LDLM_EXTENT,
- LCK_EX, &flags, NULL, NULL, NULL, 0,
- &ext3_h);
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT,
+ &ext3, LCK_EX, &flags, NULL, NULL, NULL,
+ 0, &ext3_h);
if (err != -ELDLM_BLOCK_GRANTED)
LBUG();
- if (flags & LDLM_FL_RES_CHANGED)
+ if (flags & LDLM_FL_LOCK_CHANGED)
LBUG();
/* Convert/cancel blocking locks */
LBUG();
/* Dump the results */
- res = ldlm_resource_get(ns, NULL, file_res_id, LDLM_EXTENT, 0);
- if (res == NULL)
- LBUG();
- ldlm_resource_dump(res);
- res = ldlm_resource_get(ns, NULL, res_ext1, LDLM_EXTENT, 0);
- if (res == NULL)
- LBUG();
- ldlm_resource_dump(res);
- res = ldlm_resource_get(ns, NULL, res_ext2, LDLM_EXTENT, 0);
+ res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
if (res == NULL)
LBUG();
ldlm_resource_dump(res);
service->srv_ref_count[index]++;
- if (ev->unlinked_me != -1) {
+ if (ptl_is_valid_handle(&ev->unlinked_me)) {
int idx;
for (idx = 0; idx < service->srv_ring_length; idx++)
- if (service->srv_me_h[idx] == ev->unlinked_me)
+ if (service->srv_me_h[idx].handle_idx ==
+ ev->unlinked_me.handle_idx)
break;
if (idx == service->srv_ring_length)
LBUG();
- CDEBUG(D_NET, "unlinked %d\n", idx);
- service->srv_me_h[idx] = 0;
+ CDEBUG(D_NET, "unlinked %d\n", idx);
+ ptl_set_inv_handle(&(service->srv_me_h[idx]));
if (service->srv_ref_count[idx] == 0)
ptlrpc_link_svc_me(service, idx);
if (service->srv_ref_count[index] < 0)
LBUG();
-
+
if (service->srv_ref_count[index] == 0 &&
- service->srv_me_h[index] == 0) {
+ !ptl_is_valid_handle(&(service->srv_me_h[index]))) {
CDEBUG(D_NET, "relinking %d\n", index);
ptlrpc_link_svc_me(service, index);
}
if (svc->srv_flags & SVC_EVENT)
LBUG();
- if (svc->srv_eq_h) {
+ if (ptl_is_valid_handle(&svc->srv_eq_h)) {
int err;
err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
int rc, i;
for (i = 0; i < service->srv_ring_length; i++) {
- if (service->srv_me_h[i]) {
+ if (ptl_is_valid_handle(&(service->srv_me_h[i]))) {
rc = PtlMEUnlink(service->srv_me_h[i]);
if (rc)
CERROR("PtlMEUnlink failed: %d\n", rc);
- service->srv_me_h[i] = 0;
+ ptl_set_inv_handle(&(service->srv_me_h[i]));
}
if (service->srv_buf[i] != NULL)