From 404dd95cb8369e1400f7b83094889736123b82c6 Mon Sep 17 00:00:00 2001 From: wangdi Date: Thu, 15 Sep 2005 17:19:21 +0000 Subject: [PATCH] Branch: HEAD land posix flock patch of Don. --- lustre/include/linux/lustre_dlm.h | 31 ++- lustre/include/linux/lustre_idl.h | 16 +- lustre/include/linux/lustre_lib.h | 36 ++- lustre/ldlm/l_lock.c | 11 +- lustre/ldlm/ldlm_flock.c | 531 ++++++++++++++++++++++++++------------ lustre/ldlm/ldlm_lock.c | 2 - lustre/ldlm/ldlm_lockd.c | 2 + lustre/ldlm/ldlm_request.c | 6 +- lustre/llite/file.c | 117 +++++++-- lustre/mds/handler.c | 5 + lustre/ptlrpc/lproc_ptlrpc.c | 101 ++++---- lustre/ptlrpc/pack_generic.c | 2 + lustre/utils/wirecheck.c | 4 +- lustre/utils/wiretest.c | 18 +- 14 files changed, 594 insertions(+), 288 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index adfae7f..5894b24 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -67,12 +67,15 @@ typedef enum { #define LDLM_FL_DISCARD_DATA 0x010000 /* discard (no writeback) on cancel */ #define LDLM_FL_CONFIG_CHANGE 0x020000 /* see ldlm_cli_cancel_unused */ -#define LDLM_FL_NO_TIMEOUT 0x020000 /* Blocked by group lock - wait +#define LDLM_FL_NO_TIMEOUT 0x040000 /* Blocked by group lock - wait * indefinitely */ /* file & record locking */ -#define LDLM_FL_BLOCK_NOWAIT 0x040000 /* server told not to wait if blocked */ -#define LDLM_FL_TEST_LOCK 0x080000 /* return blocking lock */ +#define LDLM_FL_BLOCK_NOWAIT 0x080000 /* server told not to wait if blocked */ +#define LDLM_FL_TEST_LOCK 0x100000 /* return blocking lock */ +#define LDLM_FL_GET_BLOCKING 0x200000 /* return updated blocking proc info */ +#define LDLM_FL_DEADLOCK_CHK 0x400000 /* check for deadlock */ +#define LDLM_FL_DEADLOCK_DEL 0x800000 /* lock no longer blocked */ /* These are flags that are mapped into the flags and ASTs of blocking locks */ #define LDLM_AST_DISCARD_DATA 0x80000000 /* Add FL_DISCARD to blocking ASTs */ @@ -418,9 
+421,9 @@ do { \ if (lock->l_resource->lr_type == LDLM_FLOCK) { \ CDEBUG(level, "### " format \ " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ - "res: "LPU64"/"LPU64"/"LPU64" rrc: %d type: %s pid: " \ - LPU64" " "["LPU64"->"LPU64"] flags: %x remote: "LPX64 \ - " expref: %d pid: %u\n" , ## a, \ + "res: "LPU64"/"LPU64"/"LPU64" rrc: %d type: %s " \ + "pid: "LPU64" nid: "LPU64" ["LPU64"->"LPU64"] " \ + "flags: %x remote: "LPX64" expref: %d pid: %u\n", ## a,\ lock->l_resource->lr_namespace->ns_name, lock, \ lock->l_handle.h_cookie, atomic_read(&lock->l_refc), \ lock->l_readers, lock->l_writers, \ @@ -432,6 +435,7 @@ do { \ atomic_read(&lock->l_resource->lr_refcount), \ ldlm_typename[lock->l_resource->lr_type], \ lock->l_policy_data.l_flock.pid, \ + lock->l_policy_data.l_flock.nid, \ lock->l_policy_data.l_flock.start, \ lock->l_policy_data.l_flock.end, \ lock->l_flags, lock->l_remote_handle.cookie, \ @@ -523,6 +527,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *, struct ldlm_res_id *, /* ldlm_flock.c */ int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data); +int ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req); /* ldlm_extent.c */ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms); @@ -683,6 +688,20 @@ static inline void check_res_locked(struct ldlm_resource *res) LASSERT_SPIN_LOCKED(&res->lr_lock); } +static inline void lock_bitlock(struct ldlm_lock *lock) +{ + bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags); + LASSERT(lock->l_pidb == 0); + lock->l_pidb = current->pid; +} + +static inline void unlock_bitlock(struct ldlm_lock *lock) +{ + LASSERT(lock->l_pidb == current->pid); + lock->l_pidb = 0; + bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags); +} + struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock); void unlock_res_and_lock(struct ldlm_lock *lock); diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 3b0e1ea..052debc 
100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -901,12 +901,13 @@ struct lmv_desc { */ /* opcodes -- MUST be distinct from OST/MDS opcodes */ typedef enum { - LDLM_ENQUEUE = 101, - LDLM_CONVERT = 102, - LDLM_CANCEL = 103, - LDLM_BL_CALLBACK = 104, - LDLM_CP_CALLBACK = 105, - LDLM_GL_CALLBACK = 106, + LDLM_ENQUEUE = 101, + LDLM_CONVERT = 102, + LDLM_CANCEL = 103, + LDLM_BL_CALLBACK = 104, + LDLM_CP_CALLBACK = 105, + LDLM_GL_CALLBACK = 106, + LDLM_FLK_DEADLOCK_CHK = 107, LDLM_LAST_OPC } ldlm_cmd_t; #define LDLM_FIRST_OPC LDLM_ENQUEUE @@ -942,8 +943,9 @@ struct ldlm_flock { __u64 start; __u64 end; __u64 pid; + __u64 nid; __u64 blocking_pid; - __u64 blocking_export; + __u64 blocking_nid; }; /* it's important that the fields of the ldlm_extent structure match diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 3c9f081..bfb480b 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -615,6 +615,22 @@ do { \ set_current_state(TASK_INTERRUPTIBLE); \ if (condition) \ break; \ + if (signal_pending(current)) { \ + if (!info->lwi_timeout || __timed_out) { \ + break; \ + } else { \ + /* We have to do this here because some signals */ \ + /* are not blockable - ie from strace(1). */ \ + /* In these cases we want to schedule_timeout() */ \ + /* again, because we don't want that to return */ \ + /* -EINTR when the RPC actually succeeded. */ \ + /* the RECALC_SIGPENDING below will deliver the */ \ + /* signal properly. 
*/ \ + SIGNAL_MASK_LOCK(current, irqflags); \ + CLEAR_SIGPENDING; \ + SIGNAL_MASK_UNLOCK(current, irqflags); \ + } \ + } \ if (info->lwi_timeout && !__timed_out) { \ timeout_remaining = schedule_timeout(timeout_remaining); \ if (timeout_remaining == 0) { \ @@ -631,24 +647,6 @@ do { \ } else { \ schedule(); \ } \ - if (condition) \ - break; \ - if (signal_pending(current)) { \ - if (__timed_out) { \ - break; \ - } else { \ - /* We have to do this here because some signals */ \ - /* are not blockable - ie from strace(1). */ \ - /* In these cases we want to schedule_timeout() */ \ - /* again, because we don't want that to return */ \ - /* -EINTR when the RPC actually succeeded. */ \ - /* the RECALC_SIGPENDING below will deliver the */ \ - /* signal properly. */ \ - SIGNAL_MASK_LOCK(current, irqflags); \ - CLEAR_SIGPENDING; \ - SIGNAL_MASK_UNLOCK(current, irqflags); \ - } \ - } \ } \ \ SIGNAL_MASK_LOCK(current, irqflags); \ @@ -656,7 +654,7 @@ do { \ RECALC_SIGPENDING; \ SIGNAL_MASK_UNLOCK(current, irqflags); \ \ - if (__timed_out && signal_pending(current)) { \ + if ((!info->lwi_timeout || __timed_out) && signal_pending(current)) { \ if (info->lwi_on_signal) \ info->lwi_on_signal(info->lwi_cb_data); \ ret = -EINTR; \ diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c index fb41ccb..b652097 100644 --- a/lustre/ldlm/l_lock.c +++ b/lustre/ldlm/l_lock.c @@ -64,21 +64,12 @@ struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock) return res; } - bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags); - LASSERT(lock->l_pidb == 0); + lock_bitlock(lock); res = lock->l_resource; - lock->l_pidb = current->pid; lock_res(res); return res; } -void unlock_bitlock(struct ldlm_lock *lock) -{ - LASSERT(lock->l_pidb == current->pid); - lock->l_pidb = 0; - bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags); -} - void unlock_res_and_lock(struct ldlm_lock *lock) { struct ldlm_resource *res = lock->l_resource; diff --git a/lustre/ldlm/ldlm_flock.c 
b/lustre/ldlm/ldlm_flock.c index a86c021..cbeacfa 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -21,6 +21,13 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +/* + * 2003 - 2005 Copyright, Hewlett-Packard Development Company, LP. + * + * Developed under the sponsorship of the U.S. Government + * under Subcontract No. B514193 + */ + #define DEBUG_SUBSYSTEM S_LDLM #ifdef __KERNEL__ @@ -35,12 +42,8 @@ #include "ldlm_internal.h" -#define l_flock_waitq l_lru - static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq); - -int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag); +static int ldlm_deadlock_timeout = 30 * HZ; /** * list_for_remaining_safe - iterate over the remaining entries in a list @@ -58,7 +61,8 @@ ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new) { return((new->l_policy_data.l_flock.pid == lock->l_policy_data.l_flock.pid) && - (new->l_export == lock->l_export)); + (new->l_policy_data.l_flock.nid == + lock->l_policy_data.l_flock.nid)); } static inline int @@ -78,10 +82,12 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags) LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)", mode, flags); - LASSERT(list_empty(&lock->l_flock_waitq)); + /* don't need to take the locks here because the lock + * is on a local destroy list, not the resource list. 
*/ list_del_init(&lock->l_res_link); + if (flags == LDLM_FL_WAIT_NOREPROC) { - /* client side - set a flag to prevent sending a CANCEL */ + /* client side - set flags to prevent sending a CANCEL */ lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING; ldlm_lock_decref_internal(lock, mode); } @@ -90,73 +96,44 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags) EXIT; } -static int -ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock) -{ - struct obd_export *req_export = req->l_export; - struct obd_export *blocking_export = blocking_lock->l_export; - pid_t req_pid = req->l_policy_data.l_flock.pid; - pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid; - struct ldlm_lock *lock; - -restart: - list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) { - if ((lock->l_policy_data.l_flock.pid != blocking_pid) || - (lock->l_export != blocking_export)) - continue; - - blocking_pid = lock->l_policy_data.l_flock.blocking_pid; - blocking_export = (struct obd_export *)(long) - lock->l_policy_data.l_flock.blocking_export; - if (blocking_pid == req_pid && blocking_export == req_export) - return 1; - - goto restart; - } - - return 0; -} - int ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, ldlm_error_t *err, struct list_head *work_list) { + struct list_head destroy_list = LIST_HEAD_INIT(destroy_list); struct ldlm_resource *res = req->l_resource; struct ldlm_namespace *ns = res->lr_namespace; - struct list_head *tmp; - struct list_head *ownlocks = NULL; - struct ldlm_lock *lock = NULL; + struct list_head *pos; + struct list_head *tmp = NULL; + struct ldlm_lock *lock; struct ldlm_lock *new = req; - struct ldlm_lock *new2 = NULL; + struct ldlm_lock *new2; ldlm_mode_t mode = req->l_req_mode; - int local = ns->ns_client; int added = (mode == LCK_NL); int overlaps = 0; + int rc = LDLM_ITER_CONTINUE; + int i = 0; ENTRY; - CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end " - LPU64"\n", 
*flags, (unsigned int)new->l_policy_data.l_flock.pid, - mode, req->l_policy_data.l_flock.start, + CDEBUG(D_DLMTRACE, "flags %#x mode %u pid "LPU64" nid "LPU64" " + "start "LPU64" end "LPU64"\n", *flags, mode, + req->l_policy_data.l_flock.pid, + req->l_policy_data.l_flock.nid, + req->l_policy_data.l_flock.start, req->l_policy_data.l_flock.end); *err = ELDLM_OK; - if (local) { - /* No blocking ASTs are sent to the clients for - * Posix file & record locks */ - req->l_blocking_ast = NULL; - } else { - /* Called on the server for lock cancels. */ - req->l_blocking_ast = ldlm_flock_blocking_ast; - } + /* No blocking ASTs are sent for Posix file & record locks */ + req->l_blocking_ast = NULL; if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) { /* This loop determines where this processes locks start * in the resource lr_granted list. */ - list_for_each(tmp, &res->lr_granted) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + list_for_each(pos, &res->lr_granted) { + lock = list_entry(pos, struct ldlm_lock, l_res_link); if (ldlm_same_flock_owner(lock, req)) { - ownlocks = tmp; + tmp = pos; break; } } @@ -165,12 +142,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* This loop determines if there are existing locks * that conflict with the new lock request. */ - list_for_each(tmp, &res->lr_granted) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + list_for_each(pos, &res->lr_granted) { + lock = list_entry(pos, struct ldlm_lock, l_res_link); if (ldlm_same_flock_owner(lock, req)) { - if (!ownlocks) - ownlocks = tmp; + if (!tmp) + tmp = pos; continue; } @@ -181,42 +158,39 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, if (!ldlm_flocks_overlap(lock, req)) continue; + /* deadlock detection will be postponed + * until ldlm_flock_completion_ast(). 
*/ + + *flags |= LDLM_FL_LOCK_CHANGED; + + req->l_policy_data.l_flock.blocking_pid = + lock->l_policy_data.l_flock.pid; + req->l_policy_data.l_flock.blocking_nid = + lock->l_policy_data.l_flock.nid; + if (!first_enq) RETURN(LDLM_ITER_CONTINUE); if (*flags & LDLM_FL_BLOCK_NOWAIT) { - ldlm_flock_destroy(req, mode, *flags); + list_move(&req->l_res_link, &destroy_list); *err = -EAGAIN; - RETURN(LDLM_ITER_STOP); + GOTO(out, rc = LDLM_ITER_STOP); } if (*flags & LDLM_FL_TEST_LOCK) { - ldlm_flock_destroy(req, mode, *flags); req->l_req_mode = lock->l_granted_mode; req->l_policy_data.l_flock.pid = lock->l_policy_data.l_flock.pid; + req->l_policy_data.l_flock.nid = + lock->l_policy_data.l_flock.nid; req->l_policy_data.l_flock.start = lock->l_policy_data.l_flock.start; req->l_policy_data.l_flock.end = lock->l_policy_data.l_flock.end; - *flags |= LDLM_FL_LOCK_CHANGED; - RETURN(LDLM_ITER_STOP); + list_move(&req->l_res_link, &destroy_list); + GOTO(out, rc = LDLM_ITER_STOP); } - if (ldlm_flock_deadlock(req, lock)) { - ldlm_flock_destroy(req, mode, *flags); - *err = -EDEADLK; - RETURN(LDLM_ITER_STOP); - } - - req->l_policy_data.l_flock.blocking_pid = - lock->l_policy_data.l_flock.pid; - req->l_policy_data.l_flock.blocking_export = - (long)(void *)lock->l_export; - - LASSERT(list_empty(&req->l_flock_waitq)); - list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq); - ldlm_resource_add_lock(res, &res->lr_waiting, req); *flags |= LDLM_FL_BLOCK_GRANTED; RETURN(LDLM_ITER_STOP); @@ -224,24 +198,18 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, } if (*flags & LDLM_FL_TEST_LOCK) { - ldlm_flock_destroy(req, mode, *flags); req->l_req_mode = LCK_NL; *flags |= LDLM_FL_LOCK_CHANGED; - RETURN(LDLM_ITER_STOP); + list_move(&req->l_res_link, &destroy_list); + GOTO(out, rc = LDLM_ITER_STOP); } - /* In case we had slept on this lock request take it off of the - * deadlock detection waitq. 
*/ - list_del_init(&req->l_flock_waitq); - /* Scan the locks owned by this process that overlap this request. * We may have to merge or split existing locks. */ + pos = (tmp != NULL) ? tmp : &res->lr_granted; - if (!ownlocks) - ownlocks = &res->lr_granted; - - list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) { - lock = list_entry(ownlocks, struct ldlm_lock, l_res_link); + list_for_remaining_safe(pos, tmp, &res->lr_granted) { + lock = list_entry(pos, struct ldlm_lock, l_res_link); if (!ldlm_same_flock_owner(lock, new)) break; @@ -280,7 +248,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, } if (added) { - ldlm_flock_destroy(lock, mode, *flags); + list_move(&lock->l_res_link, &destroy_list); } else { new = lock; added = 1; @@ -306,7 +274,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, new->l_policy_data.l_flock.end + 1; break; } - ldlm_flock_destroy(lock, lock->l_req_mode, *flags); + list_move(&lock->l_res_link, &destroy_list); continue; } if (new->l_policy_data.l_flock.end >= @@ -332,14 +300,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, lock->l_granted_mode, NULL, NULL, NULL, NULL, 0); if (!new2) { - ldlm_flock_destroy(req, lock->l_granted_mode, *flags); + list_move(&req->l_res_link, &destroy_list); *err = -ENOLCK; - RETURN(LDLM_ITER_STOP); + GOTO(out, rc = LDLM_ITER_STOP); } new2->l_granted_mode = lock->l_granted_mode; new2->l_policy_data.l_flock.pid = new->l_policy_data.l_flock.pid; + new2->l_policy_data.l_flock.nid = + new->l_policy_data.l_flock.nid; new2->l_policy_data.l_flock.start = lock->l_policy_data.l_flock.start; new2->l_policy_data.l_flock.end = @@ -353,10 +323,11 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, &new2->l_export->exp_ldlm_data.led_held_locks); } if (*flags == LDLM_FL_WAIT_NOREPROC) - ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode); + ldlm_lock_addref_internal_nolock(new2, + lock->l_granted_mode); 
/* insert new2 at lock */ - ldlm_resource_add_lock(res, ownlocks, new2); + ldlm_resource_add_lock(res, pos, new2); LDLM_LOCK_PUT(new2); break; } @@ -364,11 +335,14 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* At this point we're granting the lock request. */ req->l_granted_mode = req->l_req_mode; - /* Add req to the granted queue before calling ldlm_reprocess_all(). */ - if (!added) { + if (added) { + list_move(&req->l_res_link, &destroy_list); + } else { + /* Add req to the granted queue before calling + * ldlm_reprocess_all() below. */ list_del_init(&req->l_res_link); - /* insert new lock before ownlocks in list. */ - ldlm_resource_add_lock(res, ownlocks, req); + /* insert new lock before pos in the list. */ + ldlm_resource_add_lock(res, pos, req); } if (*flags != LDLM_FL_WAIT_NOREPROC) { @@ -383,11 +357,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, * but only once because first_enq will be false from * ldlm_reprocess_queue. */ if ((mode == LCK_NL) && overlaps) { - struct list_head rpc_list - = LIST_HEAD_INIT(rpc_list); + struct list_head rpc_list = + LIST_HEAD_INIT(rpc_list); int rc; -restart: - ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); + restart: + ldlm_reprocess_queue(res, &res->lr_waiting, + &rpc_list); unlock_res(res); rc = ldlm_run_cp_ast_work(&rpc_list); lock_res(res); @@ -396,19 +371,274 @@ restart: } } else { LASSERT(req->l_completion_ast); - ldlm_add_ast_work_item(req, NULL, NULL); + ldlm_add_ast_work_item(req, NULL, work_list); + } + } + + out: + if (!list_empty(&destroy_list)) { + /* FIXME: major hack. when called from ldlm_lock_enqueue() + * the res and the lock are locked. When called from + * ldlm_reprocess_queue() the res is locked but the lock + * is not. 
*/ + if (added && first_enq && res->lr_namespace->ns_client) + unlock_bitlock(req); + + unlock_res(res); + + CDEBUG(D_DLMTRACE, "Destroy locks:\n"); + + list_for_each_safe(pos, tmp, &destroy_list) { + lock = list_entry(pos, struct ldlm_lock, l_res_link); + ldlm_lock_dump(D_DLMTRACE, lock, ++i); + ldlm_flock_destroy(lock, lock->l_req_mode, *flags); + } + + if (added && first_enq && res->lr_namespace->ns_client) + lock_bitlock(req); + + lock_res(res); + } + + RETURN(rc); +} + +struct ldlm_sleep_flock { + __u64 lsf_pid; + __u64 lsf_nid; + __u64 lsf_blocking_pid; + __u64 lsf_blocking_nid; + struct list_head lsf_list; +}; + +int +ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req) +{ + struct ldlm_request *dlm_req; + struct ldlm_sleep_flock *lsf; + struct list_head *pos; + __u64 pid, nid, blocking_pid, blocking_nid; + unsigned int flags; + int rc = 0; + ENTRY; + + req->rq_status = 0; + + dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req), + lustre_swab_ldlm_request); + if (dlm_req == NULL) { + CERROR("bad request buffer for flock deadlock check\n"); + RETURN(-EFAULT); + } + + flags = dlm_req->lock_flags; + pid = dlm_req->lock_desc.l_policy_data.l_flock.pid; + nid = dlm_req->lock_desc.l_policy_data.l_flock.nid; + blocking_pid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_pid; + blocking_nid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_nid; + + CDEBUG(D_DLMTRACE, "flags: 0x%x req: pid: "LPU64" nid "LPU64" " + "blk: pid: "LPU64" nid: "LPU64"\n", + dlm_req->lock_flags, pid, nid, blocking_pid, blocking_nid); + + if (flags & LDLM_FL_GET_BLOCKING) { + struct ldlm_lock *lock; + struct ldlm_reply *dlm_rep; + int size = sizeof(*dlm_rep); + + lock = ldlm_handle2lock(&dlm_req->lock_handle1); + if (!lock) { + CERROR("received deadlock check for unknown lock " + "cookie "LPX64" from client %s id %s\n", + dlm_req->lock_handle1.cookie, + req->rq_export->exp_client_uuid.uuid, + req->rq_peerstr); + req->rq_status = -ESTALE; + RETURN(0); + } + + 
lock_res_and_lock(lock); + blocking_pid = lock->l_policy_data.l_flock.blocking_pid; + blocking_nid = lock->l_policy_data.l_flock.blocking_nid; + unlock_res_and_lock(lock); + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) { + CERROR("lustre_pack_reply failed: rc = %d\n", rc); + req->rq_status = rc; + RETURN(0); + } + + dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*dlm_rep)); + dlm_rep->lock_desc.l_policy_data.l_flock.blocking_pid = + blocking_pid; + dlm_rep->lock_desc.l_policy_data.l_flock.blocking_nid = + blocking_nid; + } else { + rc = lustre_pack_reply(req, 0, NULL, NULL); + } + + if (flags & LDLM_FL_DEADLOCK_CHK) { + __u64 orig_blocking_pid = blocking_pid; + __u64 orig_blocking_nid = blocking_nid; + restart: + list_for_each(pos, &ldlm_flock_waitq) { + lsf = list_entry(pos,struct ldlm_sleep_flock,lsf_list); + + /* We want to return a deadlock condition for the + * last lock on the waitq that created the deadlock + * situation. Posix verification suites expect this + * behavior. We'll stop if we haven't found a deadlock + * up to the point where the current process is queued + * to let the last lock on the queue that's in the + * deadlock loop detect the deadlock. In this case + * just update the blocking info.*/ + if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) { + lsf->lsf_blocking_pid = blocking_pid; + lsf->lsf_blocking_nid = blocking_nid; + break; + } + + if ((lsf->lsf_pid != blocking_pid) || + (lsf->lsf_nid != blocking_nid)) + continue; + + blocking_pid = lsf->lsf_blocking_pid; + blocking_nid = lsf->lsf_blocking_nid; + + if (blocking_pid == pid && blocking_nid == nid){ + req->rq_status = -EDEADLOCK; + flags |= LDLM_FL_DEADLOCK_DEL; + break; + } + + goto restart; + } + + /* If we got all the way thru the list then we're not on it. 
*/ + if (pos == &ldlm_flock_waitq) { + OBD_ALLOC(lsf, sizeof(*lsf)); + if (!lsf) + RETURN(-ENOSPC); + + lsf->lsf_pid = pid; + lsf->lsf_nid = nid; + lsf->lsf_blocking_pid = orig_blocking_pid; + lsf->lsf_blocking_nid = orig_blocking_nid; + list_add_tail(&lsf->lsf_list, &ldlm_flock_waitq); + } + } + + if (flags & LDLM_FL_DEADLOCK_DEL) { + list_for_each_entry(lsf, &ldlm_flock_waitq, lsf_list) { + if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) { + list_del_init(&lsf->lsf_list); + OBD_FREE(lsf, sizeof(*lsf)); + break; + } } } - /* In case we're reprocessing the requested lock we can't destroy - * it until after calling ldlm_ast_work_item() above so that lawi() - * can bump the reference count on req. Otherwise req could be freed - * before the completion AST can be sent. */ - if (added) - ldlm_flock_destroy(req, mode, *flags); + RETURN(rc); +} + +int +ldlm_send_flock_deadlock_check(struct obd_device *obd, struct ldlm_lock *lock, + unsigned int flags) +{ + struct obd_import *imp; + struct ldlm_request *body; + struct ldlm_reply *reply; + struct ptlrpc_request *req; + int rc, size = sizeof(*body); + ENTRY; + + CDEBUG(D_DLMTRACE, "obd: %p flags: 0x%x\n", obd, flags); + + imp = obd->u.cli.cl_import; + req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_FLK_DEADLOCK_CHK, 1, + &size, NULL); + if (!req) + RETURN(-ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); + body->lock_flags = flags; + ldlm_lock2desc(lock, &body->lock_desc); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + + if (flags & LDLM_FL_GET_BLOCKING) { + size = sizeof(*reply); + req->rq_replen = lustre_msg_size(1, &size); + } else { + req->rq_replen = lustre_msg_size(0, NULL); + } + + rc = ptlrpc_queue_wait(req); + if (rc != ELDLM_OK) + GOTO(out, rc); + + if (flags & LDLM_FL_GET_BLOCKING) { + reply = lustre_swab_repbuf(req, 0, sizeof (*reply), + lustre_swab_ldlm_reply); + if (reply == NULL) { + CERROR ("Can't unpack ldlm_reply\n"); + GOTO (out, rc = 
-EPROTO); + } + + lock->l_policy_data.l_flock.blocking_pid = + reply->lock_desc.l_policy_data.l_flock.blocking_pid; + lock->l_policy_data.l_flock.blocking_nid = + reply->lock_desc.l_policy_data.l_flock.blocking_nid; + + CDEBUG(D_DLMTRACE, "LDLM_FL_GET_BLOCKING: pid: "LPU64" " + "nid: "LPU64" blk: pid: "LPU64" nid: "LPU64"\n", + lock->l_policy_data.l_flock.pid, + lock->l_policy_data.l_flock.nid, + lock->l_policy_data.l_flock.blocking_pid, + lock->l_policy_data.l_flock.blocking_nid); + } - ldlm_resource_dump(D_OTHER, res); - RETURN(LDLM_ITER_CONTINUE); + rc = req->rq_status; + out: + ptlrpc_req_finished(req); + RETURN(rc); +} + +int +ldlm_flock_deadlock_check(struct obd_device *master_obd, struct obd_device *obd, + struct ldlm_lock *lock) +{ + unsigned int flags = 0; + int rc; + ENTRY; + + if (obd == NULL) { + /* Delete this process from the sleeplock list. */ + flags = LDLM_FL_DEADLOCK_DEL; + rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags); + RETURN(rc); + } + + flags = LDLM_FL_GET_BLOCKING; + if (obd == master_obd) + flags |= LDLM_FL_DEADLOCK_CHK; + + rc = ldlm_send_flock_deadlock_check(obd, lock, flags); + CDEBUG(D_DLMTRACE, "1st check: rc: %d flags: 0x%x\n", rc, flags); + if (rc || (flags & LDLM_FL_DEADLOCK_CHK)) + RETURN(rc); + + CDEBUG(D_DLMTRACE, "about to send 2nd check: master: %p.\n", + master_obd); + + flags = LDLM_FL_DEADLOCK_CHK; + + rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags); + + CDEBUG(D_DLMTRACE, "2nd check: rc: %d flags: 0x%x\n", rc, flags); + + RETURN(rc); } struct ldlm_flock_wait_data { @@ -425,11 +655,10 @@ ldlm_flock_interrupted_wait(void *data) lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock; - /* take lock off the deadlock detection waitq. 
*/ - list_del_init(&lock->l_flock_waitq); - /* client side - set flag to prevent lock from being put on lru list */ + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; + unlock_res_and_lock(lock); ldlm_lock_decref_internal(lock, lock->l_req_mode); ldlm_lock2handle(lock, &lockh); @@ -440,20 +669,17 @@ ldlm_flock_interrupted_wait(void *data) int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data) { - struct ldlm_namespace *ns; - struct file_lock *getlk = lock->l_ast_data; struct ldlm_flock_wait_data fwd; unsigned long irqflags; struct obd_device *obd; + struct obd_device *master_obd = (struct obd_device *)lock->l_ast_data; struct obd_import *imp = NULL; ldlm_error_t err; + int deadlock_checked = 0; int rc = 0; struct l_wait_info lwi; ENTRY; - CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n", - flags, data, getlk); - LASSERT(flags != LDLM_FL_WAIT_NOREPROC); if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | @@ -467,6 +693,9 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data) fwd.fwd_lock = lock; obd = class_exp2obd(lock->l_conn_export); + CDEBUG(D_DLMTRACE, "flags: 0x%x master: %p obd: %p\n", + flags, master_obd, obd); + /* if this is a local lock, then there is no import */ if (obd != NULL) imp = obd->u.cli.cl_import; @@ -477,46 +706,42 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data) spin_unlock_irqrestore(&imp->imp_lock, irqflags); } - lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd); + lwi = LWI_TIMEOUT_INTR(ldlm_deadlock_timeout, NULL, + ldlm_flock_interrupted_wait, &fwd); - /* Go to sleep until the lock is granted. 
*/ + restart: rc = l_wait_event(lock->l_waitq, ((lock->l_req_mode == lock->l_granted_mode) || lock->l_destroyed), &lwi); + if (rc == -ETIMEDOUT) { + deadlock_checked = 1; + rc = ldlm_flock_deadlock_check(master_obd, obd, lock); + if (rc == -EDEADLK) + ldlm_flock_interrupted_wait(&fwd); + else { + CDEBUG(D_DLMTRACE, "lock: %p going back to sleep,\n", + lock); + goto restart; + } + } else { + if (deadlock_checked) + ldlm_flock_deadlock_check(master_obd, NULL, lock); + } + LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc); RETURN(rc); -granted: - + granted: LDLM_DEBUG(lock, "client-side enqueue granted"); - ns = lock->l_resource->lr_namespace; - lock_res(lock->l_resource); - - /* take lock off the deadlock detection waitq. */ - list_del_init(&lock->l_flock_waitq); + lock_res_and_lock(lock); /* ldlm_lock_enqueue() has already placed lock on the granted list. */ list_del_init(&lock->l_res_link); if (flags & LDLM_FL_TEST_LOCK) { - /* fcntl(F_GETLK) request */ - /* The old mode was saved in getlk->fl_type so that if the mode - * in the lock changes we can decref the approprate refcount. 
*/ - ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC); - switch (lock->l_granted_mode) { - case LCK_PR: - getlk->fl_type = F_RDLCK; - break; - case LCK_PW: - getlk->fl_type = F_WRLCK; - break; - default: - getlk->fl_type = F_UNLCK; - } - getlk->fl_pid = lock->l_policy_data.l_flock.pid; - getlk->fl_start = lock->l_policy_data.l_flock.start; - getlk->fl_end = lock->l_policy_data.l_flock.end; + /* client side - set flag to prevent sending a CANCEL */ + lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING; } else { int noreproc = LDLM_FL_WAIT_NOREPROC; @@ -526,21 +751,7 @@ granted: if (flags == 0) wake_up(&lock->l_waitq); } - unlock_res(lock->l_resource); - RETURN(0); -} -int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag) -{ - ENTRY; - - LASSERT(lock); - LASSERT(flag == LDLM_CB_CANCELING); - - /* take lock off the deadlock detection waitq. */ - lock_res(lock->l_resource); - list_del_init(&lock->l_flock_waitq); - unlock_res(lock->l_resource); + unlock_res_and_lock(lock); RETURN(0); } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 13d3be3..48b219d 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -291,8 +291,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, RETURN(lock); } -void unlock_bitlock(struct ldlm_lock *lock); - int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, struct ldlm_res_id new_resid) { diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 4e32bd4..cfd8c97 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1054,6 +1054,7 @@ static int ldlm_msg_check_version(struct lustre_msg *msg) case LDLM_BL_CALLBACK: case LDLM_CP_CALLBACK: case LDLM_GL_CALLBACK: + case LDLM_FLK_DEADLOCK_CHK: rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", @@ -1572,6 +1573,7 @@ void __exit ldlm_exit(void) /* ldlm_flock.c 
*/ EXPORT_SYMBOL(ldlm_flock_completion_ast); +EXPORT_SYMBOL(ldlm_handle_flock_deadlock_check); /* ldlm_extent.c */ EXPORT_SYMBOL(ldlm_extent_shift_kms); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index f595a9b..7a5e367 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -424,11 +424,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, if (!is_replay) { rc = ldlm_lock_enqueue(ns, &lock, NULL, flags); if (lock->l_completion_ast != NULL) { + /* since the lock made it to the server at this point + * it's the completion AST's responsibility to cleanup + * the lock if the completion processing fails since + * it's no longer a simple local lock cancel. */ int err = lock->l_completion_ast(lock, *flags, NULL); if (!rc) rc = err; - if (rc) - cleanup_phase = 2; } } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index c993a2e..94ba1db 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -34,6 +34,7 @@ #endif #include #include +#include #include "llite_internal.h" __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms); @@ -1724,33 +1725,35 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) RETURN(rc); } -int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) +int ll_file_flock(struct file *file, int cmd, struct file_lock *fl) { struct inode *inode = file->f_dentry->d_inode; struct ll_inode_info *li = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); - struct obd_device *obddev; - struct ldlm_res_id res_id = - { .name = {id_fid(&li->lli_id), id_group(&li->lli_id), LDLM_FLOCK} }; + struct obd_device *obd = md_get_real_obd(sbi->ll_md_exp, &li->lli_id); + struct ldlm_res_id res_id = { .name = {id_fid(&li->lli_id), + id_group(&li->lli_id), LDLM_FLOCK} }; struct lustre_handle lockh = {0}; + struct ptlrpc_connection *conn; + ptl_process_id_t ptlpid; ldlm_policy_data_t flock; ldlm_mode_t mode = 0; int flags = 0; int rc; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu 
file_lock=%p\n", - inode->i_ino, file_lock); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu cmd=%d file_lock=%p\n", + inode->i_ino, cmd, fl); - flock.l_flock.pid = file_lock->fl_pid; - flock.l_flock.start = file_lock->fl_start; - flock.l_flock.end = file_lock->fl_end; + if (!(fl->fl_flags & FL_POSIX)) + RETURN(-ENOSYS); - switch (file_lock->fl_type) { + switch (fl->fl_type) { case F_RDLCK: mode = LCK_PR; break; case F_UNLCK: + /* An unlock request may or may not have any relation to * existing locks so we may not be able to pass a lock handle * via a normal ldlm_lock_cancel() request. The request may even @@ -1765,22 +1768,21 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) mode = LCK_PW; break; default: - CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type); - LBUG(); + CERROR("unknown fcntl lock type: %d\n", fl->fl_type); + RETURN(-EINVAL); } switch (cmd) { - case F_SETLKW: -#ifdef F_SETLKW64 - case F_SETLKW64: -#endif - flags = 0; - break; case F_SETLK: #ifdef F_SETLK64 case F_SETLK64: #endif - flags = LDLM_FL_BLOCK_NOWAIT; + flags |= LDLM_FL_BLOCK_NOWAIT; + break; + case F_SETLKW: +#ifdef F_SETLKW64 + case F_SETLKW64: +#endif break; case F_GETLK: #ifdef F_GETLK64 @@ -1789,23 +1791,88 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) flags = LDLM_FL_TEST_LOCK; /* Save the old mode so that if the mode in the lock changes we * can decrement the appropriate reader or writer refcount. */ - file_lock->fl_type = mode; + fl->fl_type = mode; break; default: CERROR("unknown fcntl lock command: %d\n", cmd); - LBUG(); + RETURN(-EINVAL); } + /* Since we're called on every close to remove any outstanding Posix + * flocks owned by the process it's worth a little effort to avoid + * the RPCs if there are no flocks on this file from this node. 
*/ + if (mode == LCK_NL && fl->fl_start == 0 && fl->fl_end >= OFFSET_MAX) { + struct ldlm_resource *res; + + res = ldlm_resource_get(obd->obd_namespace, NULL, + res_id, LDLM_FLOCK, 0); + if (res == NULL) + RETURN(0); + + ldlm_resource_putref(res); + } + + conn = class_exp2cliimp(obd->obd_self_export)->imp_connection; + if (!conn || !conn->c_peer.peer_ni) + RETURN(-ENOTCONN); + + rc = PtlGetId(conn->c_peer.peer_ni->pni_ni_h, &ptlpid); + if (rc != PTL_OK) + RETURN(-ENOTCONN); + + flock.l_flock.start = fl->fl_start; + flock.l_flock.end = fl->fl_end; + /* XXX - ptlpid.pid is currently coming back a constant; i.e. 12345. */ + flock.l_flock.pid = fl->fl_pid; + flock.l_flock.nid = ptlpid.nid; + flock.l_flock.blocking_pid = 0; + flock.l_flock.blocking_nid = 0; + CDEBUG(D_DLMTRACE, "inode=%lu, pid="LPU64", flags=%#x, mode=%u, " "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid, flags, mode, flock.l_flock.start, flock.l_flock.end); - obddev = md_get_real_obd(sbi->ll_md_exp, &li->lli_id); - rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL, - obddev->obd_namespace, + rc = ldlm_cli_enqueue(obd->obd_self_export, NULL, obd->obd_namespace, res_id, LDLM_FLOCK, &flock, mode, &flags, - NULL, ldlm_flock_completion_ast, NULL, file_lock, + NULL, ldlm_flock_completion_ast, NULL, + md_get_real_obd(sbi->ll_md_exp, &sbi->ll_rootid), NULL, 0, NULL, &lockh); + + if (flags & LDLM_FL_TEST_LOCK) { + struct ldlm_lock *lock = ldlm_handle2lock(&lockh); + + fl->fl_start = lock->l_policy_data.l_flock.start; + fl->fl_end = lock->l_policy_data.l_flock.end; + fl->fl_pid = lock->l_policy_data.l_flock.pid; + + switch (lock->l_granted_mode) { + case LCK_PR: + fl->fl_type = F_RDLCK; + break; + case LCK_PW: + fl->fl_type = F_WRLCK; + break; + case LCK_NL: + fl->fl_type = F_UNLCK; + break; + default: + CERROR("unexpected lock type: %d returned from server." + "\n", lock->l_granted_mode); + rc = -EINVAL; + break; + } + + /* offset the addref() done by ldlm_handle2lock() above. 
*/ + LDLM_LOCK_PUT(lock); + ldlm_lock_decref(&lockh, mode); + ldlm_cli_cancel(&lockh); + + /* the LDLM_CBPENDING flag was set in the lock by the + * completion AST so the ldlm_lock_decref() call above + * scheduled a blocking AST which will do the final + * lock put on the lock. */ + } + RETURN(rc); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 8cdddeb..3f1a247 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -2927,6 +2927,7 @@ static int mds_msg_check_version(struct lustre_msg *msg) case LDLM_CONVERT: case LDLM_BL_CALLBACK: case LDLM_CP_CALLBACK: + case LDLM_FLK_DEADLOCK_CHK: rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", @@ -3237,6 +3238,10 @@ int mds_handle(struct ptlrpc_request *req) LBUG(); OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); break; + case LDLM_FLK_DEADLOCK_CHK: + DEBUG_REQ(D_INODE, req, "flock deadlock check"); + rc = ldlm_handle_flock_deadlock_check(req); + break; case LLOG_ORIGIN_HANDLE_OPEN: DEBUG_REQ(D_INODE, req, "llog_init"); OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 995aec2..276a5e5 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -33,56 +33,57 @@ struct ll_rpc_opcode { __u32 opcode; const char *opname; } ll_rpc_opcode_table[LUSTRE_MAX_OPCODES] = { - { OST_REPLY, "ost_reply" }, - { OST_GETATTR, "ost_getattr" }, - { OST_SETATTR, "ost_setattr" }, - { OST_READ, "ost_read" }, - { OST_WRITE, "ost_write" }, - { OST_CREATE , "ost_create" }, - { OST_DESTROY, "ost_destroy" }, - { OST_GET_INFO, "ost_get_info" }, - { OST_CONNECT, "ost_connect" }, - { OST_DISCONNECT, "ost_disconnect" }, - { OST_PUNCH, "ost_punch" }, - { OST_OPEN, "ost_open" }, - { OST_CLOSE, "ost_close" }, - { OST_STATFS, "ost_statfs" }, - { OST_SAN_READ, "ost_san_read" }, - { OST_SAN_WRITE, "ost_san_write" }, - { OST_SYNC, "ost_sync" }, - { OST_SET_INFO, "ost_set_info" }, - { 
MDS_GETATTR, "mds_getattr" }, - { MDS_GETATTR_LOCK, "mds_getattr_lock" }, - { MDS_CLOSE, "mds_close" }, - { MDS_REINT, "mds_reint" }, - { MDS_READPAGE, "mds_readpage" }, - { MDS_CONNECT, "mds_connect" }, - { MDS_DISCONNECT, "mds_disconnect" }, - { MDS_GETSTATUS, "mds_getstatus" }, - { MDS_STATFS, "mds_statfs" }, - { MDS_PIN, "mds_pin" }, - { MDS_UNPIN, "mds_unpin" }, - { MDS_SYNC, "mds_sync" }, - { MDS_DONE_WRITING, "mds_done_writing" }, - { MDS_ACCESS_CHECK, "mds_access_check"}, - { MDS_PARSE_ID, "mds_parse_id" }, - { LDLM_ENQUEUE, "ldlm_enqueue" }, - { LDLM_CONVERT, "ldlm_convert" }, - { LDLM_CANCEL, "ldlm_cancel" }, - { LDLM_BL_CALLBACK, "ldlm_bl_callback" }, - { LDLM_CP_CALLBACK, "ldlm_cp_callback" }, - { LDLM_GL_CALLBACK, "ldlm_gl_callback" }, - { PTLBD_QUERY, "ptlbd_query" }, - { PTLBD_READ, "ptlbd_read" }, - { PTLBD_WRITE, "ptlbd_write" }, - { PTLBD_FLUSH, "ptlbd_flush" }, - { PTLBD_CONNECT, "ptlbd_connect" }, - { PTLBD_DISCONNECT, "ptlbd_disconnect" }, - { OBD_PING, "obd_ping" }, - { OBD_LOG_CANCEL, "llog_origin_handle_cancel"}, - { SEC_INIT, "sec_init"}, - { SEC_INIT_CONTINUE,"sec_init_continue"}, - { SEC_FINI, "sec_fini"}, + { OST_REPLY, "ost_reply" }, + { OST_GETATTR, "ost_getattr" }, + { OST_SETATTR, "ost_setattr" }, + { OST_READ, "ost_read" }, + { OST_WRITE, "ost_write" }, + { OST_CREATE , "ost_create" }, + { OST_DESTROY, "ost_destroy" }, + { OST_GET_INFO, "ost_get_info" }, + { OST_CONNECT, "ost_connect" }, + { OST_DISCONNECT, "ost_disconnect" }, + { OST_PUNCH, "ost_punch" }, + { OST_OPEN, "ost_open" }, + { OST_CLOSE, "ost_close" }, + { OST_STATFS, "ost_statfs" }, + { OST_SAN_READ, "ost_san_read" }, + { OST_SAN_WRITE, "ost_san_write" }, + { OST_SYNC, "ost_sync" }, + { OST_SET_INFO, "ost_set_info" }, + { MDS_GETATTR, "mds_getattr" }, + { MDS_GETATTR_LOCK , "mds_getattr_lock" }, + { MDS_CLOSE, "mds_close" }, + { MDS_REINT, "mds_reint" }, + { MDS_READPAGE, "mds_readpage" }, + { MDS_CONNECT, "mds_connect" }, + { MDS_DISCONNECT, "mds_disconnect" }, + { 
MDS_GETSTATUS, "mds_getstatus" }, + { MDS_STATFS, "mds_statfs" }, + { MDS_PIN, "mds_pin" }, + { MDS_UNPIN, "mds_unpin" }, + { MDS_SYNC, "mds_sync" }, + { MDS_DONE_WRITING, "mds_done_writing" }, + { MDS_ACCESS_CHECK, "mds_access_check"}, + { MDS_PARSE_ID, "mds_parse_id" }, + { LDLM_ENQUEUE, "ldlm_enqueue" }, + { LDLM_CONVERT, "ldlm_convert" }, + { LDLM_CANCEL, "ldlm_cancel" }, + { LDLM_BL_CALLBACK, "ldlm_bl_callback" }, + { LDLM_CP_CALLBACK, "ldlm_cp_callback" }, + { LDLM_GL_CALLBACK, "ldlm_gl_callback" }, + { LDLM_FLK_DEADLOCK_CHK, "ldlm_flock_deadlock_check" }, + { PTLBD_QUERY, "ptlbd_query" }, + { PTLBD_READ, "ptlbd_read" }, + { PTLBD_WRITE, "ptlbd_write" }, + { PTLBD_FLUSH, "ptlbd_flush" }, + { PTLBD_CONNECT, "ptlbd_connect" }, + { PTLBD_DISCONNECT, "ptlbd_disconnect" }, + { OBD_PING, "obd_ping" }, + { OBD_LOG_CANCEL, "llog_origin_handle_cancel"}, + { SEC_INIT, "sec_init"}, + { SEC_INIT_CONTINUE, "sec_init_continue"}, + { SEC_FINI, "sec_fini"}, }; const char* ll_opcode2str(__u32 opcode) diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index f07d95b..8736f7a 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -849,7 +849,9 @@ void lustre_swab_ldlm_policy_data(ldlm_policy_data_t *d) __swab64s(&d->l_flock.start); __swab64s(&d->l_flock.end); __swab64s(&d->l_flock.pid); + __swab64s(&d->l_flock.nid); __swab64s(&d->l_flock.blocking_pid); + __swab64s(&d->l_flock.blocking_nid); } void lustre_swab_ldlm_intent(struct ldlm_intent *i) diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 04fb65b..65b8874 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -395,8 +395,9 @@ check_ldlm_flock(void) CHECK_MEMBER(ldlm_flock, start); CHECK_MEMBER(ldlm_flock, end); CHECK_MEMBER(ldlm_flock, pid); + CHECK_MEMBER(ldlm_flock, nid); CHECK_MEMBER(ldlm_flock, blocking_pid); - CHECK_MEMBER(ldlm_flock, blocking_export); + CHECK_MEMBER(ldlm_flock, blocking_nid); } void @@ -848,6 +849,7 @@ main(int argc, 
char **argv) CHECK_VALUE(LDLM_CANCEL); CHECK_VALUE(LDLM_BL_CALLBACK); CHECK_VALUE(LDLM_CP_CALLBACK); + CHECK_VALUE(LDLM_FLK_DEADLOCK_CHK); CHECK_VALUE(LDLM_LAST_OPC); CHECK_VALUE(LCK_EX); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index dc03a7b..3819669 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -166,7 +166,9 @@ void lustre_assert_wire_constants(void) (unsigned long long)LDLM_BL_CALLBACK); LASSERTF(LDLM_CP_CALLBACK == 105, " found %llu\n", (unsigned long long)LDLM_CP_CALLBACK); - LASSERTF(LDLM_LAST_OPC == 107, " found %llu\n", + LASSERTF(LDLM_FLK_DEADLOCK_CHK == 107, " found %llu\n", + (unsigned long long)LDLM_FLK_DEADLOCK_CHK); + LASSERTF(LDLM_LAST_OPC == 108, " found %llu\n", (unsigned long long)LDLM_LAST_OPC); LASSERTF(LCK_EX == 1, " found %llu\n", (unsigned long long)LCK_EX); @@ -922,14 +924,18 @@ void lustre_assert_wire_constants(void) (unsigned long long)(int)offsetof(struct ldlm_flock, pid)); LASSERTF((int)sizeof(((struct ldlm_flock *)0)->pid) == 8, " found %llu\n", (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->pid)); - LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 24, " found %llu\n", + LASSERTF((int)offsetof(struct ldlm_flock, nid) == 24, " found %llu\n", + (unsigned long long)(int)offsetof(struct ldlm_flock, nid)); + LASSERTF((int)sizeof(((struct ldlm_flock *)0)->nid) == 8, " found %llu\n", + (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->nid)); + LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 32, " found %llu\n", (unsigned long long)(int)offsetof(struct ldlm_flock, blocking_pid)); LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_pid) == 8, " found %llu\n", (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_pid)); - LASSERTF((int)offsetof(struct ldlm_flock, blocking_export) == 32, " found %llu\n", - (unsigned long long)(int)offsetof(struct ldlm_flock, blocking_export)); - LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_export) == 
8, " found %llu\n", - (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_export)); + LASSERTF((int)offsetof(struct ldlm_flock, blocking_nid) == 40, " found %llu\n", + (unsigned long long)(int)offsetof(struct ldlm_flock, blocking_nid)); + LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_nid) == 8, " found %llu\n", + (unsigned long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_nid)); /* Checks for struct ldlm_intent */ LASSERTF((int)sizeof(struct ldlm_intent) == 8, " found %llu\n", -- 1.8.3.1