From: pschwan Date: Thu, 4 Apr 2002 04:22:42 +0000 (+0000) Subject: - Don't abort if fig2dev doesn't exist X-Git-Tag: 0.4.2~417 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=d2dfb5a2f8e32e6d347c6854f14ce28e7afbafb7;p=fs%2Flustre-release.git - Don't abort if fig2dev doesn't exist - first chunk of DLM changes for new extents - fixups to work with the new Portals tip --- diff --git a/lustre/doc/Makefile.am b/lustre/doc/Makefile.am index d379eb0..a0c5952 100644 --- a/lustre/doc/Makefile.am +++ b/lustre/doc/Makefile.am @@ -17,7 +17,7 @@ all: master.pdf $(LYX2PDF) $< 2>/dev/null || printf "\n*** Warning: not creating PDF docs; install lyx to rectify this\n" .fig.eps: - fig2dev -L eps $< > $@ + -fig2dev -L eps $< > $@ master.pdf: master.lyx bigpicture.eps intermezzo.eps mds.eps portals-lib.eps client.eps layering.eps metadata.eps sb.eps cow.eps lockacq.eps obdfs.eps snapsetup.eps dirbodyapi.eps loraid.eps ost.eps updates.eps hotmigrate.eps lustreclusters.eps osthw.eps diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 39c6eb1..c720b3a 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -23,10 +23,10 @@ typedef enum { ELDLM_BLOCK_CONV, ELDLM_BLOCK_WAIT, ELDLM_BAD_NAMESPACE, - ELDLM_RES_CHANGED + ELDLM_LOCK_CHANGED } ldlm_error_t; -#define LDLM_FL_RES_CHANGED (1 << 0) +#define LDLM_FL_LOCK_CHANGED (1 << 0) #define LDLM_FL_COMPLETION_AST (1 << 1) #define LDLM_FL_BLOCKING_AST (1 << 2) @@ -111,15 +111,17 @@ struct ldlm_lock { struct lustre_peer *l_peer; void *l_data; __u32 l_data_len; + struct ldlm_extent l_extent; //void *l_event; //XXX cluster_host l_holder; __u32 l_version[RES_VERSION_SIZE]; }; -typedef int (*ldlm_res_compat)(struct ldlm_resource *child, - struct ldlm_resource *new); -typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, __u64 *res_id_in, - __u64 *res_id_out, ldlm_mode_t mode, void *data); +typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new); +typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, + struct ldlm_extent *req_ex, + struct ldlm_extent *new_ex, + ldlm_mode_t mode, void *data); #define LDLM_PLAIN 0x0 #define LDLM_EXTENT 0x1 @@ -142,19 +144,14 @@ struct ldlm_resource { struct list_head lr_waiting; ldlm_mode_t lr_most_restr; atomic_t lr_refcount; + __u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */ struct ldlm_resource *lr_root; //XXX cluster_host lr_master; __u64 lr_name[RES_NAME_SIZE]; __u32 lr_version[RES_VERSION_SIZE]; - __u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */ spinlock_t lr_lock; }; -struct ldlm_extent { - __u64 start; - __u64 end; -}; - static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res) { return (struct ldlm_extent *)(res->lr_name); @@ -185,16 +182,18 @@ static inline void ldlm_unlock(struct obd_device *obddev) extern struct obd_ops ldlm_obd_ops; /* ldlm_extent.c */ -int ldlm_extent_compat(struct ldlm_resource *, struct ldlm_resource *); -int ldlm_extent_policy(struct ldlm_resource *, __u64 *, __u64 *, - ldlm_mode_t, void *); +int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *); +int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *, + struct ldlm_extent *, ldlm_mode_t, void *); /* ldlm_lock.c */ +void ldlm_lock_free(struct ldlm_lock *lock); ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, struct ldlm_handle *parent_lock_handle, __u64 *res_id, __u32 type, + struct ldlm_extent *req_ex, ldlm_mode_t mode, int *flags, ldlm_lock_callback completion, diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index d036a5a..75de35d 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -367,11 +367,17 @@ struct ldlm_handle { __u64 cookie; }; +struct ldlm_extent { + __u64 start; + __u64 end; +}; + struct ldlm_request { __u32 ns_id; __u64 res_id[RES_NAME_SIZE]; __u32 res_type; __u32 flags; + struct ldlm_extent lock_extent; struct ldlm_handle parent_res_handle; struct ldlm_handle parent_lock_handle; ldlm_mode_t mode; diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 2f92a1e..ff5c512 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -30,70 +30,65 @@ * * This helps to find conflicts between read and write locks on overlapping * extents. */ -int ldlm_extent_compat(struct ldlm_resource *child, struct ldlm_resource *new) +int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b) { - struct ldlm_extent *child_ex, *new_ex; - - child_ex = ldlm_res2extent(child); - new_ex = ldlm_res2extent(new); - - if (MAX(child_ex->start, new_ex->start) <= - MIN(child_ex->end, new_ex->end)) + if (MAX(a->l_extent.start, b->l_extent.start) <= + MIN(a->l_extent.end, b->l_extent.end)) return 0; return 1; } -/* The purpose of this function is to return: - * - the maximum extent - * - containing the requested extent - * - and not overlapping existing extents outside the requested one - * - * An alternative policy is to not shrink the new extent when conflicts exist. - * - * To reconstruct our formulas, take a deep breath. */ -int ldlm_extent_policy(struct ldlm_resource *parent, - __u64 *res_id_in, __u64 *res_id_out, - ldlm_mode_t mode, void *data) +static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex, + struct ldlm_extent *new_ex, ldlm_mode_t mode) { - struct ldlm_extent *new_ex, *req_ex; struct list_head *tmp; - int rc = 0; - - req_ex = (struct ldlm_extent *)res_id_in; - - new_ex = (struct ldlm_extent *)res_id_out; - new_ex->start = 0; - new_ex->end = ~0; - list_for_each(tmp, &parent->lr_children) { - struct ldlm_resource *res; - struct ldlm_extent *exist_ex; - res = list_entry(tmp, struct ldlm_resource, lr_childof); + list_for_each(tmp, queue) { + struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); - exist_ex = ldlm_res2extent(res); - - if (exist_ex->end < req_ex->start) - new_ex->start = MIN(exist_ex->end, new_ex->start); + if (lock->l_extent.end < req_ex->start) + new_ex->start = MIN(lock->l_extent.end, new_ex->start); else { - if (exist_ex->start < req_ex->start && - !lockmode_compat(res->lr_most_restr, mode)) + if (lock->l_extent.start < req_ex->start && + !lockmode_compat(lock->l_req_mode, mode)) /* Policy: minimize conflict overlap */ new_ex->start = req_ex->start; } - if (exist_ex->start > req_ex->end) - new_ex->end = MAX(exist_ex->start, new_ex->end); + if (lock->l_extent.start > req_ex->end) + new_ex->end = MAX(lock->l_extent.start, new_ex->end); else { - if (exist_ex->end > req_ex->end && - !lockmode_compat(res->lr_most_restr, mode)) + if (lock->l_extent.end > req_ex->end && + !lockmode_compat(lock->l_req_mode, mode)) /* Policy: minimize conflict overlap */ new_ex->end = req_ex->end; } } +} - if (new_ex->end != req_ex->end || new_ex->start != req_ex->start) - rc = ELDLM_RES_CHANGED; +/* The purpose of this function is to return: + * - the maximum extent + * - containing the requested extent + * - and not overlapping existing extents outside the requested one + * + * An alternative policy is to not shrink the new extent when conflicts exist. + * + * To reconstruct our formulas, take a deep breath. */ +int ldlm_extent_policy(struct ldlm_resource *res, struct ldlm_extent *req_ex, + struct ldlm_extent *new_ex, ldlm_mode_t mode, void *data) +{ + new_ex->start = 0; + new_ex->end = ~0; - return rc; -} + if (!res) + LBUG(); + + policy_internal(&res->lr_granted, req_ex, new_ex, mode); + policy_internal(&res->lr_converting, req_ex, new_ex, mode); + policy_internal(&res->lr_waiting, req_ex, new_ex, mode); + if (new_ex->end != req_ex->end || new_ex->start != req_ex->start) + return ELDLM_LOCK_CHANGED; + return 0; +} diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 137df0d..286e1ad 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -27,10 +27,13 @@ extern kmem_cache_t *ldlm_lock_slab; +static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b); +static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b); + ldlm_res_compat ldlm_res_compat_table [] = { - [LDLM_PLAIN] NULL, + [LDLM_PLAIN] ldlm_plain_compat, [LDLM_EXTENT] ldlm_extent_compat, - [LDLM_MDSINTENT] NULL + [LDLM_MDSINTENT] ldlm_intent_compat }; ldlm_res_policy ldlm_res_policy_table [] = { @@ -39,6 +42,17 @@ ldlm_res_policy ldlm_res_policy_table [] = { [LDLM_MDSINTENT] NULL }; +static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b) +{ + return lockmode_compat(a->l_req_mode, b->l_req_mode); +} + +static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b) +{ + LBUG(); + return 0; +} + static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, struct ldlm_resource *resource, ldlm_mode_t mode) @@ -65,57 +79,37 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, return lock; } -static int ldlm_notify_incompatible(struct list_head *list, - struct ldlm_lock *new) +void ldlm_lock_free(struct ldlm_lock *lock) +{ + kmem_cache_free(ldlm_lock_slab, lock); +} + +static int ldlm_lock_compat(struct ldlm_lock *lock) { struct list_head *tmp; int rc = 0; - list_for_each(tmp, list) { - struct ldlm_lock *lock; - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - if (lockmode_compat(lock->l_req_mode, new->l_req_mode)) + list_for_each(tmp, &lock->l_resource->lr_granted) { + struct ldlm_lock *child; + ldlm_res_compat compat; + + child = list_entry(tmp, struct ldlm_lock, l_res_link); + + compat = ldlm_res_compat_table[child->l_resource->lr_type]; + if (compat(child, lock) || + lockmode_compat(child->l_req_mode, lock->l_req_mode)) continue; rc = 1; - if (lock->l_blocking_ast != NULL) - lock->l_blocking_ast(lock, new, lock->l_data, - lock->l_data_len); + if (child->l_blocking_ast != NULL) + child->l_blocking_ast(child, lock, child->l_data, + child->l_data_len); } return rc; } -static int ldlm_lock_compat(struct ldlm_lock *lock) -{ - struct ldlm_resource *parent_res = lock->l_resource->lr_parent; - ldlm_res_compat compat; - - if (parent_res && - (compat = ldlm_res_compat_table[parent_res->lr_type])) { - struct list_head *tmp; - int incompat = 0; - list_for_each(tmp, &parent_res->lr_children) { - struct ldlm_resource *child; - child = list_entry(tmp, struct ldlm_resource, - lr_childof); - - /* compat will return 0 when child == l_resource - * hence notifications on the same resource are incl. */ - if (compat(child, lock->l_resource)) - continue; - - incompat |= ldlm_notify_incompatible(&child->lr_granted, - lock); - } - - return incompat; - } - - return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock); -} - static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) { ldlm_resource_add_lock(res, &res->lr_granted, lock); @@ -128,36 +122,15 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) lock->l_completion_ast(lock, NULL, NULL, 0); } -static int ldlm_reprocess_queue(struct ldlm_lock *lock, - struct list_head *converting, - struct list_head *granted_list) -{ - struct list_head *tmp, *pos; - int incompat = 0; - - list_for_each_safe(tmp, pos, converting) { - struct ldlm_lock *pending; - pending = list_entry(tmp, struct ldlm_lock, l_res_link); - - incompat = ldlm_lock_compat(pending); - if (incompat) - break; - - list_del(&pending->l_res_link); - ldlm_grant_lock(pending->l_resource, pending); - } - - return incompat; -} - /* XXX: Revisit the error handling; we do not, for example, do - * ldlm_resource_put()s in our error cases, and we probably leak an allocated + * ldlm_resource_put()s in our error cases, and we probably leak any allocated * memory. */ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, struct ldlm_handle *parent_lock_handle, __u64 *res_id, __u32 type, + struct ldlm_extent *req_ex, ldlm_mode_t mode, int *flags, ldlm_lock_callback completion, @@ -169,14 +142,14 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, struct ldlm_namespace *ns; struct ldlm_resource *res, *parent_res; struct ldlm_lock *lock, *parent_lock; + struct ldlm_extent new_ex; int incompat = 0, rc; - __u64 new_id[RES_NAME_SIZE]; ldlm_res_policy policy; ENTRY; - + parent_lock = ldlm_handle2object(parent_lock_handle); - if ( parent_lock ) + if (parent_lock) parent_res = parent_lock->l_resource; else parent_res = NULL; @@ -185,15 +158,6 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, if (ns == NULL || ns->ns_hash == NULL) RETURN(-ELDLM_BAD_NAMESPACE); - if (parent_res && - (policy = ldlm_res_policy_table[parent_res->lr_type])) { - rc = policy(parent_res, res_id, new_id, mode, NULL); - if (rc == ELDLM_RES_CHANGED) { - *flags |= LDLM_FL_RES_CHANGED; - memcpy(res_id, new_id, sizeof(*new_id)); - } - } - res = ldlm_resource_get(ns, parent_res, res_id, type, 1); if (res == NULL) RETURN(-ENOMEM); @@ -202,6 +166,19 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, if (lock == NULL) RETURN(-ENOMEM); + if ((policy = ldlm_res_policy_table[type])) { + rc = policy(res, req_ex, &new_ex, mode, NULL); + if (rc == ELDLM_LOCK_CHANGED) { + *flags |= LDLM_FL_LOCK_CHANGED; + memcpy(req_ex, &new_ex, sizeof(new_ex)); + } + } + + if ((type == LDLM_EXTENT && !req_ex) || + (type != LDLM_EXTENT && req_ex)) + LBUG(); + if (req_ex) + memcpy(&lock->l_extent, req_ex, sizeof(*req_ex)); lock->l_data = data; lock->l_data_len = data_len; if ((*flags) & LDLM_FL_COMPLETION_AST) @@ -236,47 +213,32 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, return rc; } -static void ldlm_reprocess_res_compat(struct ldlm_lock *lock) +static int ldlm_reprocess_queue(struct ldlm_resource *res, + struct list_head *converting) { - struct ldlm_resource *parent_res = lock->l_resource->lr_parent; - struct list_head *tmp; - int do_waiting; - - list_for_each(tmp, &parent_res->lr_children) { - struct ldlm_resource *child; - child = list_entry(tmp, struct ldlm_resource, lr_childof); - - ldlm_reprocess_queue(lock, &child->lr_converting, - &child->lr_granted); - if (!list_empty(&child->lr_converting)) - do_waiting = 0; - } + struct list_head *tmp, *pos; + int incompat = 0; - if (!do_waiting) - return; + list_for_each_safe(tmp, pos, converting) { + struct ldlm_lock *pending; + pending = list_entry(tmp, struct ldlm_lock, l_res_link); - list_for_each(tmp, &parent_res->lr_children) { - struct ldlm_resource *child; - child = list_entry(tmp, struct ldlm_resource, lr_childof); + incompat = ldlm_lock_compat(pending); + if (incompat) + break; - ldlm_reprocess_queue(lock, &child->lr_waiting, - &child->lr_granted); + list_del(&pending->l_res_link); + ldlm_grant_lock(res, pending); } + + return incompat; } -static void ldlm_reprocess_all(struct ldlm_lock *lock) +static void ldlm_reprocess_all(struct ldlm_resource *res) { - struct ldlm_resource *res = lock->l_resource; - struct ldlm_resource *parent_res = res->lr_parent; - - if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) { - ldlm_reprocess_res_compat(lock); - return; - } - - ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted); + ldlm_reprocess_queue(res, &res->lr_converting); if (list_empty(&res->lr_converting)) - ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted); + ldlm_reprocess_queue(res, &res->lr_waiting); } ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, @@ -291,10 +253,10 @@ ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, ldlm_resource_del_lock(lock); - kmem_cache_free(ldlm_lock_slab, lock); + ldlm_lock_free(lock); if (ldlm_resource_put(res)) RETURN(ELDLM_OK); - ldlm_reprocess_all(lock); + ldlm_reprocess_all(res); RETURN(ELDLM_OK); } @@ -314,7 +276,7 @@ ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev, list_add(&lock->l_res_link, res->lr_converting.prev); - ldlm_reprocess_all(lock); + ldlm_reprocess_all(res); RETURN(ELDLM_OK); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 2628812..5dd68e9 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -60,6 +60,7 @@ static int ldlm_enqueue(struct ptlrpc_request *req) &dlm_req->parent_lock_handle, dlm_req->res_id, dlm_req->res_type, + &dlm_req->lock_extent, dlm_req->mode, &dlm_req->flags, ldlm_client_callback, @@ -204,6 +205,29 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) RETURN(0); } +static int cleanup_resource(struct ldlm_resource *res, struct list_head *q) +{ + struct list_head *tmp, *pos; + int rc = 0; + + list_for_each_safe(tmp, pos, q) { + struct ldlm_lock *lock; + + if (rc) { + /* Res was already cleaned up. */ + LBUG(); + } + + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + ldlm_resource_del_lock(lock); + ldlm_lock_free(lock); + rc = ldlm_resource_put(res); + } + + return rc; +} + static int do_free_namespace(struct ldlm_namespace *ns) { struct list_head *tmp, *pos; @@ -215,7 +239,12 @@ static int do_free_namespace(struct ldlm_namespace *ns) res = list_entry(tmp, struct ldlm_resource, lr_hash); list_del_init(&res->lr_hash); - rc = 0; + rc = cleanup_resource(res, &res->lr_granted); + if (!rc) + rc = cleanup_resource(res, &res->lr_converting); + if (!rc) + rc = cleanup_resource(res, &res->lr_waiting); + while (rc == 0) rc = ldlm_resource_put(res); } diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 915b824..4b185bd 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -34,9 +34,9 @@ int ldlm_test_basics(struct obd_device *obddev) { struct ldlm_namespace *ns; struct ldlm_resource *res; - __u64 res_id[RES_NAME_SIZE] = {1, 4, 3}; + __u64 res_id[RES_NAME_SIZE] = {1, 2, 3}; ldlm_error_t err; - struct ldlm_handle h; + struct ldlm_handle lockh_1, lockh_2; int flags; ldlm_lock(obddev); @@ -52,19 +52,25 @@ int ldlm_test_basics(struct obd_device *obddev) /* Get a couple of read locks */ flags = LDLM_FL_BLOCKING_AST; err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, - LCK_CR, &flags, NULL, ldlm_test_callback, - NULL, 0, &h); + NULL, LCK_CR, &flags, NULL, + ldlm_test_callback, NULL, 0, &lockh_1); if (err != ELDLM_OK) LBUG(); err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, - LCK_EX, &flags, NULL, ldlm_test_callback, - NULL, 0, &h); + NULL, LCK_EX, &flags, NULL, + ldlm_test_callback, NULL, 0, &lockh_2); if (err != -ELDLM_BLOCK_GRANTED) LBUG(); ldlm_resource_dump(res); + err = ldlm_local_lock_convert(obddev, &lockh_1, LCK_NL, &flags); + if (err != ELDLM_OK) + LBUG(); + + ldlm_resource_dump(res); + ldlm_unlock(obddev); return 0; @@ -74,12 +80,10 @@ int ldlm_test_extents(struct obd_device *obddev) { struct ldlm_namespace *ns; struct ldlm_resource *res; - __u64 file_res_id[RES_NAME_SIZE] = {0, 0, 0}; - __u64 res_ext1[RES_NAME_SIZE] = {4, 6, 3}; - __u64 res_ext2[RES_NAME_SIZE] = {6, 9, 3}; - __u64 res_ext3[RES_NAME_SIZE] = {10, 11, 3}; + __u64 res_id[RES_NAME_SIZE] = {0, 0, 0}; + struct ldlm_extent ext1 = {4, 6}, ext2 = {6, 9}, ext3 = {10, 11}; + struct ldlm_handle ext1_h, ext2_h, ext3_h; ldlm_error_t err; - struct ldlm_handle file_h, ext1_h, ext2_h, ext3_h; int flags; ldlm_lock(obddev); @@ -88,37 +92,31 @@ int ldlm_test_extents(struct obd_device *obddev) if (ns == NULL) LBUG(); - err = ldlm_local_lock_enqueue(obddev, 1, NULL, file_res_id, LDLM_EXTENT, - LCK_NL, &flags, NULL, NULL, NULL, 0, - &file_h); - if (err != ELDLM_OK) - LBUG(); - flags = 0; - err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext1, LDLM_EXTENT, - LCK_PR, &flags, NULL, NULL, NULL, 0, - &ext1_h); + err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT, + &ext1, LCK_PR, &flags, NULL, NULL, NULL, + 0, &ext1_h); if (err != ELDLM_OK) LBUG(); - if (!(flags & LDLM_FL_RES_CHANGED)) + if (!(flags & LDLM_FL_LOCK_CHANGED)) LBUG(); flags = 0; - err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext2, LDLM_EXTENT, - LCK_PR, &flags, NULL, NULL, NULL, 0, - &ext2_h); + err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT, + &ext2, LCK_PR, &flags, NULL, NULL, NULL, + 0, &ext2_h); if (err != ELDLM_OK) LBUG(); - if (!(flags & LDLM_FL_RES_CHANGED)) + if (!(flags & LDLM_FL_LOCK_CHANGED)) LBUG(); flags = 0; - err = ldlm_local_lock_enqueue(obddev, 1, &file_h, res_ext3, LDLM_EXTENT, - LCK_EX, &flags, NULL, NULL, NULL, 0, - &ext3_h); + err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_EXTENT, + &ext3, LCK_EX, &flags, NULL, NULL, NULL, + 0, &ext3_h); if (err != -ELDLM_BLOCK_GRANTED) LBUG(); - if (flags & LDLM_FL_RES_CHANGED) + if (flags & LDLM_FL_LOCK_CHANGED) LBUG(); /* Convert/cancel blocking locks */ @@ -133,15 +131,7 @@ int ldlm_test_extents(struct obd_device *obddev) LBUG(); /* Dump the results */ - res = ldlm_resource_get(ns, NULL, file_res_id, LDLM_EXTENT, 0); - if (res == NULL) - LBUG(); - ldlm_resource_dump(res); - res = ldlm_resource_get(ns, NULL, res_ext1, LDLM_EXTENT, 0); - if (res == NULL) - LBUG(); - ldlm_resource_dump(res); - res = ldlm_resource_get(ns, NULL, res_ext2, LDLM_EXTENT, 0); + res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0); if (res == NULL) LBUG(); ldlm_resource_dump(res); diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 3289447..17f1a1d 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -93,17 +93,18 @@ int server_request_callback(ptl_event_t *ev, void *data) service->srv_ref_count[index]++; - if (ev->unlinked_me != -1) { + if (ptl_is_valid_handle(&ev->unlinked_me)) { int idx; for (idx = 0; idx < service->srv_ring_length; idx++) - if (service->srv_me_h[idx] == ev->unlinked_me) + if (service->srv_me_h[idx].handle_idx == + ev->unlinked_me.handle_idx) break; if (idx == service->srv_ring_length) LBUG(); - CDEBUG(D_NET, "unlinked %d\n", idx); - service->srv_me_h[idx] = 0; + CDEBUG(D_NET, "unlinked %d\n", idx); + ptl_set_inv_handle(&(service->srv_me_h[idx])); if (service->srv_ref_count[idx] == 0) ptlrpc_link_svc_me(service, idx); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 0e5bb32..22631d7 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -397,9 +397,9 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start) if (service->srv_ref_count[index] < 0) LBUG(); - + if (service->srv_ref_count[index] == 0 && - service->srv_me_h[index] == 0) { + !ptl_is_valid_handle(&(service->srv_me_h[index]))) { CDEBUG(D_NET, "relinking %d\n", index); ptlrpc_link_svc_me(service, index); } diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 47d2cb0..d9c2b96 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -53,7 +53,7 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc) if (svc->srv_flags & SVC_EVENT) LBUG(); - if (svc->srv_eq_h) { + if (ptl_is_valid_handle(&svc->srv_eq_h)) { int err; err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev); @@ -289,11 +289,11 @@ int rpc_unregister_service(struct ptlrpc_service *service) int rc, i; for (i = 0; i < service->srv_ring_length; i++) { - if (service->srv_me_h[i]) { + if (ptl_is_valid_handle(&(service->srv_me_h[i]))) { rc = PtlMEUnlink(service->srv_me_h[i]); if (rc) CERROR("PtlMEUnlink failed: %d\n", rc); - service->srv_me_h[i] = 0; + ptl_set_inv_handle(&(service->srv_me_h[i])); } if (service->srv_buf[i] != NULL)