diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 800880c..cbeacfa 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -21,6 +21,13 @@
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
+/*
+ * 2003 - 2005 Copyright, Hewlett-Packard Development Company, L.P.
+ *
+ * Developed under the sponsorship of the U.S. Government
+ * under Subcontract No. B514193
+ */
+
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
@@ -28,19 +35,15 @@
 #include
 #include
 #include
-#include
+#include
 #else
 #include
 #endif
 
 #include "ldlm_internal.h"
 
-#define l_flock_waitq l_lru
-
 static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
-
-int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, int flag);
+static int ldlm_deadlock_timeout = 30 * HZ;
 
 /**
  * list_for_remaining_safe - iterate over the remaining entries in a list
@@ -58,7 +61,8 @@ ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
         return((new->l_policy_data.l_flock.pid ==
                 lock->l_policy_data.l_flock.pid) &&
-               (new->l_export == lock->l_export));
+               (new->l_policy_data.l_flock.nid ==
+                lock->l_policy_data.l_flock.nid));
 }
 
 static inline int
@@ -78,11 +82,12 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                    mode, flags);
 
-        LASSERT(list_empty(&lock->l_flock_waitq));
-
+        /* don't need to take the locks here because the lock
+         * is on a local destroy list, not the resource list. */
         list_del_init(&lock->l_res_link);
+
         if (flags == LDLM_FL_WAIT_NOREPROC) {
-                /* client side - set a flag to prevent sending a CANCEL */
+                /* client side - set flags to prevent sending a CANCEL */
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                 ldlm_lock_decref_internal(lock, mode);
         }
@@ -91,73 +96,44 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
         EXIT;
 }
 
-static int
-ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
-{
-        struct obd_export *req_export = req->l_export;
-        struct obd_export *blocking_export = blocking_lock->l_export;
-        pid_t req_pid = req->l_policy_data.l_flock.pid;
-        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
-        struct ldlm_lock *lock;
-
-restart:
-        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
-                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
-                    (lock->l_export != blocking_export))
-                        continue;
-
-                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
-                blocking_export = (struct obd_export *)(long)
-                        lock->l_policy_data.l_flock.blocking_export;
-                if (blocking_pid == req_pid && blocking_export == req_export)
-                        return 1;
-
-                goto restart;
-        }
-
-        return 0;
-}
-
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, struct list_head *work_list)
 {
+        struct list_head destroy_list = LIST_HEAD_INIT(destroy_list);
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
-        struct list_head *tmp;
-        struct list_head *ownlocks = NULL;
-        struct ldlm_lock *lock = NULL;
+        struct list_head *pos;
+        struct list_head *tmp = NULL;
+        struct ldlm_lock *lock;
         struct ldlm_lock *new = req;
-        struct ldlm_lock *new2 = NULL;
+        struct ldlm_lock *new2;
         ldlm_mode_t mode = req->l_req_mode;
-        int local = ns->ns_client;
         int added = (mode == LCK_NL);
         int overlaps = 0;
+        int rc = LDLM_ITER_CONTINUE;
+        int i = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags %#x pid "LPU64" mode %u start "LPU64" end "
-               LPU64"\n", *flags, new->l_policy_data.l_flock.pid, mode,
-               req->l_policy_data.l_flock.start,
+        CDEBUG(D_DLMTRACE, "flags %#x mode %u pid "LPU64" nid "LPU64" "
+               "start "LPU64" end "LPU64"\n", *flags, mode,
+               req->l_policy_data.l_flock.pid,
+               req->l_policy_data.l_flock.nid,
+               req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
         *err = ELDLM_OK;
 
-        if (local) {
-                /* No blocking ASTs are sent to the clients for
-                 * Posix file & record locks */
-                req->l_blocking_ast = NULL;
-        } else {
-                /* Called on the server for lock cancels. */
-                req->l_blocking_ast = ldlm_flock_blocking_ast;
-        }
+        /* No blocking ASTs are sent for Posix file & record locks */
+        req->l_blocking_ast = NULL;
 
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
                  * in the resource lr_granted list. */
-                list_for_each(tmp, &res->lr_granted) {
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                list_for_each(pos, &res->lr_granted) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
                         if (ldlm_same_flock_owner(lock, req)) {
-                                ownlocks = tmp;
+                                tmp = pos;
                                 break;
                         }
                 }
@@ -166,12 +142,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         /* This loop determines if there are existing locks
          * that conflict with the new lock request. */
-        list_for_each(tmp, &res->lr_granted) {
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+        list_for_each(pos, &res->lr_granted) {
+                lock = list_entry(pos, struct ldlm_lock, l_res_link);
 
                 if (ldlm_same_flock_owner(lock, req)) {
-                        if (!ownlocks)
-                                ownlocks = tmp;
+                        if (!tmp)
+                                tmp = pos;
                         continue;
                 }
 
@@ -182,42 +158,39 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 if (!ldlm_flocks_overlap(lock, req))
                         continue;
 
+                /* deadlock detection will be postponed
+                 * until ldlm_flock_completion_ast(). */
+
+                *flags |= LDLM_FL_LOCK_CHANGED;
+
+                req->l_policy_data.l_flock.blocking_pid =
+                        lock->l_policy_data.l_flock.pid;
+                req->l_policy_data.l_flock.blocking_nid =
+                        lock->l_policy_data.l_flock.nid;
+
                 if (!first_enq)
                         RETURN(LDLM_ITER_CONTINUE);
 
                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                        ldlm_flock_destroy(req, mode, *flags);
+                        list_move(&req->l_res_link, &destroy_list);
                         *err = -EAGAIN;
-                        RETURN(LDLM_ITER_STOP);
+                        GOTO(out, rc = LDLM_ITER_STOP);
                 }
 
                 if (*flags & LDLM_FL_TEST_LOCK) {
-                        ldlm_flock_destroy(req, mode, *flags);
                         req->l_req_mode = lock->l_granted_mode;
                         req->l_policy_data.l_flock.pid =
                                 lock->l_policy_data.l_flock.pid;
+                        req->l_policy_data.l_flock.nid =
+                                lock->l_policy_data.l_flock.nid;
                         req->l_policy_data.l_flock.start =
                                 lock->l_policy_data.l_flock.start;
                         req->l_policy_data.l_flock.end =
                                 lock->l_policy_data.l_flock.end;
-                        *flags |= LDLM_FL_LOCK_CHANGED;
-                        RETURN(LDLM_ITER_STOP);
-                }
-
-                if (ldlm_flock_deadlock(req, lock)) {
-                        ldlm_flock_destroy(req, mode, *flags);
-                        *err = -EDEADLK;
-                        RETURN(LDLM_ITER_STOP);
+                        list_move(&req->l_res_link, &destroy_list);
+                        GOTO(out, rc = LDLM_ITER_STOP);
                 }
 
-                req->l_policy_data.l_flock.blocking_pid =
-                        lock->l_policy_data.l_flock.pid;
-                req->l_policy_data.l_flock.blocking_export =
-                        (long)(void *)lock->l_export;
-
-                LASSERT(list_empty(&req->l_flock_waitq));
-                list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
-
                 ldlm_resource_add_lock(res, &res->lr_waiting, req);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 RETURN(LDLM_ITER_STOP);
@@ -225,24 +198,18 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         }
 
         if (*flags & LDLM_FL_TEST_LOCK) {
-                ldlm_flock_destroy(req, mode, *flags);
                 req->l_req_mode = LCK_NL;
                 *flags |= LDLM_FL_LOCK_CHANGED;
-                RETURN(LDLM_ITER_STOP);
+                list_move(&req->l_res_link, &destroy_list);
+                GOTO(out, rc = LDLM_ITER_STOP);
         }
 
-        /* In case we had slept on this lock request take it off of the
-         * deadlock detection waitq. */
-        list_del_init(&req->l_flock_waitq);
-
         /* Scan the locks owned by this process that overlap this request.
          * We may have to merge or split existing locks. */
+        pos = (tmp != NULL) ? tmp : &res->lr_granted;
 
-        if (!ownlocks)
-                ownlocks = &res->lr_granted;
-
-        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
-                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+        list_for_remaining_safe(pos, tmp, &res->lr_granted) {
+                lock = list_entry(pos, struct ldlm_lock, l_res_link);
 
                 if (!ldlm_same_flock_owner(lock, new))
                         break;
 
@@ -281,7 +248,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 }
 
                 if (added) {
-                        ldlm_flock_destroy(lock, mode, *flags);
+                        list_move(&lock->l_res_link, &destroy_list);
                 } else {
                         new = lock;
                         added = 1;
@@ -307,7 +274,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                         new->l_policy_data.l_flock.end + 1;
                                 break;
                         }
-                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+                        list_move(&lock->l_res_link, &destroy_list);
                         continue;
                 }
                 if (new->l_policy_data.l_flock.end >=
@@ -333,14 +300,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                 lock->l_granted_mode, NULL, NULL, NULL,
                                 NULL, 0);
                 if (!new2) {
-                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                        list_move(&req->l_res_link, &destroy_list);
                         *err = -ENOLCK;
-                        RETURN(LDLM_ITER_STOP);
+                        GOTO(out, rc = LDLM_ITER_STOP);
                 }
 
                 new2->l_granted_mode = lock->l_granted_mode;
                 new2->l_policy_data.l_flock.pid =
                         new->l_policy_data.l_flock.pid;
+                new2->l_policy_data.l_flock.nid =
+                        new->l_policy_data.l_flock.nid;
                 new2->l_policy_data.l_flock.start =
                         lock->l_policy_data.l_flock.start;
                 new2->l_policy_data.l_flock.end =
@@ -354,20 +323,26 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                       &new2->l_export->exp_ldlm_data.led_held_locks);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2,
+                                                         lock->l_granted_mode);
 
                 /* insert new2 at lock */
-                ldlm_resource_add_lock(res, ownlocks, new2);
+                ldlm_resource_add_lock(res, pos, new2);
                 LDLM_LOCK_PUT(new2);
                 break;
         }
 
-        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
-        if (!added) {
-                req->l_granted_mode = req->l_req_mode;
+        /* At this point we're granting the lock request. */
+        req->l_granted_mode = req->l_req_mode;
+
+        if (added) {
+                list_move(&req->l_res_link, &destroy_list);
+        } else {
+                /* Add req to the granted queue before calling
+                 * ldlm_reprocess_all() below. */
                 list_del_init(&req->l_res_link);
-                /* insert new lock before ownlocks in list. */
-                ldlm_resource_add_lock(res, ownlocks, req);
+                /* insert new lock before pos in the list. */
+                ldlm_resource_add_lock(res, pos, req);
         }
 
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
@@ -382,36 +357,288 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                  * but only once because first_enq will be false from
                  * ldlm_reprocess_queue. */
                 if ((mode == LCK_NL) && overlaps) {
-                        struct list_head rpc_list
-                                = LIST_HEAD_INIT(rpc_list);
+                        struct list_head rpc_list =
+                                LIST_HEAD_INIT(rpc_list);
                         int rc;
-restart:
-                        res->lr_tmp = &rpc_list;
-                        ldlm_reprocess_queue(res, &res->lr_waiting);
-                        res->lr_tmp = NULL;
-
-                        l_unlock(&ns->ns_lock);
-                        rc = ldlm_run_ast_work(res->lr_namespace,
-                                               &rpc_list);
-                        l_lock(&ns->ns_lock);
+                restart:
+                        ldlm_reprocess_queue(res, &res->lr_waiting,
+                                             &rpc_list);
+                        unlock_res(res);
+                        rc = ldlm_run_cp_ast_work(&rpc_list);
+                        lock_res(res);
                         if (rc == -ERESTART)
                                 GOTO(restart, -ERESTART);
                 }
         } else {
                 LASSERT(req->l_completion_ast);
-                ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                ldlm_add_ast_work_item(req, NULL, work_list);
         }
 
-        /* In case we're reprocessing the requested lock we can't destroy
-         * it until after calling ldlm_ast_work_item() above so that lawi()
-         * can bump the reference count on req. Otherwise req could be freed
-         * before the completion AST can be sent. */
-        if (added)
-                ldlm_flock_destroy(req, mode, *flags);
+ out:
+        if (!list_empty(&destroy_list)) {
+                /* FIXME: major hack. when called from ldlm_lock_enqueue()
+                 * the res and the lock are locked. When called from
+                 * ldlm_reprocess_queue() the res is locked but the lock
+                 * is not. */
+                if (added && first_enq && res->lr_namespace->ns_client)
+                        unlock_bitlock(req);
+
+                unlock_res(res);
+
+                CDEBUG(D_DLMTRACE, "Destroy locks:\n");
+
+                list_for_each_safe(pos, tmp, &destroy_list) {
+                        lock = list_entry(pos, struct ldlm_lock, l_res_link);
+                        ldlm_lock_dump(D_DLMTRACE, lock, ++i);
+                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+                }
+
+                if (added && first_enq && res->lr_namespace->ns_client)
+                        lock_bitlock(req);
+
+                lock_res(res);
+        }
 
-        ldlm_resource_dump(res);
-        RETURN(LDLM_ITER_CONTINUE);
+        RETURN(rc);
+}
+
+struct ldlm_sleep_flock {
+        __u64 lsf_pid;
+        __u64 lsf_nid;
+        __u64 lsf_blocking_pid;
+        __u64 lsf_blocking_nid;
+        struct list_head lsf_list;
+};
+
+int
+ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        struct ldlm_sleep_flock *lsf;
+        struct list_head *pos;
+        __u64 pid, nid, blocking_pid, blocking_nid;
+        unsigned int flags;
+        int rc = 0;
+        ENTRY;
+
+        req->rq_status = 0;
+
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+                                     lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR("bad request buffer for flock deadlock check\n");
+                RETURN(-EFAULT);
+        }
+
+        flags = dlm_req->lock_flags;
+        pid = dlm_req->lock_desc.l_policy_data.l_flock.pid;
+        nid = dlm_req->lock_desc.l_policy_data.l_flock.nid;
+        blocking_pid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_pid;
+        blocking_nid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+        CDEBUG(D_DLMTRACE, "flags: 0x%x req: pid: "LPU64" nid "LPU64" "
+               "blk: pid: "LPU64" nid: "LPU64"\n",
+               dlm_req->lock_flags, pid, nid, blocking_pid, blocking_nid);
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                struct ldlm_lock *lock;
+                struct ldlm_reply *dlm_rep;
+                int size = sizeof(*dlm_rep);
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+                if (!lock) {
+                        CERROR("received deadlock check for unknown lock "
+                               "cookie "LPX64" from client %s id %s\n",
+                               dlm_req->lock_handle1.cookie,
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_peerstr);
+                        req->rq_status = -ESTALE;
+                        RETURN(0);
+                }
+
+                lock_res_and_lock(lock);
+                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
+                blocking_nid = lock->l_policy_data.l_flock.blocking_nid;
+                unlock_res_and_lock(lock);
+
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                if (rc) {
+                        CERROR("lustre_pack_reply failed: rc = %d\n", rc);
+                        req->rq_status = rc;
+                        RETURN(0);
+                }
+
+                dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*dlm_rep));
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_pid =
+                        blocking_pid;
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_nid =
+                        blocking_nid;
+        } else {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+        }
+
+        if (flags & LDLM_FL_DEADLOCK_CHK) {
+                __u64 orig_blocking_pid = blocking_pid;
+                __u64 orig_blocking_nid = blocking_nid;
+        restart:
+                list_for_each(pos, &ldlm_flock_waitq) {
+                        lsf = list_entry(pos,struct ldlm_sleep_flock,lsf_list);
+
+                        /* We want to return a deadlock condition for the
+                         * last lock on the waitq that created the deadlock
+                         * situation. Posix verification suites expect this
+                         * behavior. We'll stop if we haven't found a deadlock
+                         * up to the point where the current process is queued
+                         * to let the last lock on the queue that's in the
+                         * deadlock loop detect the deadlock. In this case
+                         * just update the blocking info. */
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                lsf->lsf_blocking_pid = blocking_pid;
+                                lsf->lsf_blocking_nid = blocking_nid;
+                                break;
+                        }
+
+                        if ((lsf->lsf_pid != blocking_pid) ||
+                            (lsf->lsf_nid != blocking_nid))
+                                continue;
+
+                        blocking_pid = lsf->lsf_blocking_pid;
+                        blocking_nid = lsf->lsf_blocking_nid;
+
+                        if (blocking_pid == pid && blocking_nid == nid) {
+                                req->rq_status = -EDEADLOCK;
+                                flags |= LDLM_FL_DEADLOCK_DEL;
+                                break;
+                        }
+
+                        goto restart;
+                }
+
+                /* If we got all the way through the list then we're not on it. */
+                if (pos == &ldlm_flock_waitq) {
+                        OBD_ALLOC(lsf, sizeof(*lsf));
+                        if (!lsf)
+                                RETURN(-ENOSPC);
+
+                        lsf->lsf_pid = pid;
+                        lsf->lsf_nid = nid;
+                        lsf->lsf_blocking_pid = orig_blocking_pid;
+                        lsf->lsf_blocking_nid = orig_blocking_nid;
+                        list_add_tail(&lsf->lsf_list, &ldlm_flock_waitq);
+                }
+        }
+
+        if (flags & LDLM_FL_DEADLOCK_DEL) {
+                list_for_each_entry(lsf, &ldlm_flock_waitq, lsf_list) {
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                list_del_init(&lsf->lsf_list);
+                                OBD_FREE(lsf, sizeof(*lsf));
+                                break;
+                        }
+                }
+        }
+
+        RETURN(rc);
+}
+
+int
+ldlm_send_flock_deadlock_check(struct obd_device *obd, struct ldlm_lock *lock,
+                               unsigned int flags)
+{
+        struct obd_import *imp;
+        struct ldlm_request *body;
+        struct ldlm_reply *reply;
+        struct ptlrpc_request *req;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE, "obd: %p flags: 0x%x\n", obd, flags);
+
+        imp = obd->u.cli.cl_import;
+        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_FLK_DEADLOCK_CHK,
+                              1, &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body->lock_flags = flags;
+        ldlm_lock2desc(lock, &body->lock_desc);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                size = sizeof(*reply);
+                req->rq_replen = lustre_msg_size(1, &size);
+        } else {
+                req->rq_replen = lustre_msg_size(0, NULL);
+        }
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                           lustre_swab_ldlm_reply);
+                if (reply == NULL) {
+                        CERROR ("Can't unpack ldlm_reply\n");
+                        GOTO (out, rc = -EPROTO);
+                }
+
+                lock->l_policy_data.l_flock.blocking_pid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_pid;
+                lock->l_policy_data.l_flock.blocking_nid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+                CDEBUG(D_DLMTRACE, "LDLM_FL_GET_BLOCKING: pid: "LPU64" "
+                       "nid: "LPU64" blk: pid: "LPU64" nid: "LPU64"\n",
+                       lock->l_policy_data.l_flock.pid,
+                       lock->l_policy_data.l_flock.nid,
+                       lock->l_policy_data.l_flock.blocking_pid,
+                       lock->l_policy_data.l_flock.blocking_nid);
+        }
+
+        rc = req->rq_status;
+ out:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int
+ldlm_flock_deadlock_check(struct obd_device *master_obd, struct obd_device *obd,
+                          struct ldlm_lock *lock)
+{
+        unsigned int flags = 0;
+        int rc;
+        ENTRY;
+
+        if (obd == NULL) {
+                /* Delete this process from the sleeplock list. */
+                flags = LDLM_FL_DEADLOCK_DEL;
+                rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+                RETURN(rc);
+        }
+
+        flags = LDLM_FL_GET_BLOCKING;
+        if (obd == master_obd)
+                flags |= LDLM_FL_DEADLOCK_CHK;
+
+        rc = ldlm_send_flock_deadlock_check(obd, lock, flags);
+        CDEBUG(D_DLMTRACE, "1st check: rc: %d flags: 0x%x\n", rc, flags);
+        if (rc || (flags & LDLM_FL_DEADLOCK_CHK))
+                RETURN(rc);
+
+        CDEBUG(D_DLMTRACE, "about to send 2nd check: master: %p.\n",
+               master_obd);
+
+        flags = LDLM_FL_DEADLOCK_CHK;
+
+        rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+
+        CDEBUG(D_DLMTRACE, "2nd check: rc: %d flags: 0x%x\n", rc, flags);
+
+        RETURN(rc);
 }
 
 struct ldlm_flock_wait_data {
@@ -424,44 +651,37 @@ ldlm_flock_interrupted_wait(void *data)
 {
         struct ldlm_lock *lock;
         struct lustre_handle lockh;
-        int rc;
         ENTRY;
 
         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
 
-        /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        /* client side - set flag to prevent lock from being put on lru list */
+        lock_res_and_lock(lock);
+        lock->l_flags |= LDLM_FL_CBPENDING;
+        unlock_res_and_lock(lock);
 
         ldlm_lock_decref_internal(lock, lock->l_req_mode);
         ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
+        ldlm_cli_cancel(&lockh);
         EXIT;
 }
 
 int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
-        struct ldlm_namespace *ns;
-        struct file_lock *getlk = lock->l_ast_data;
         struct ldlm_flock_wait_data fwd;
         unsigned long irqflags;
         struct obd_device *obd;
+        struct obd_device *master_obd = (struct obd_device *)lock->l_ast_data;
         struct obd_import *imp = NULL;
         ldlm_error_t err;
+        int deadlock_checked = 0;
         int rc = 0;
         struct l_wait_info lwi;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
-               flags, data, getlk);
-
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
-        if (flags == 0) {
-                wake_up(&lock->l_waitq);
-                RETURN(0);
-        }
-
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                        LDLM_FL_BLOCK_CONV)))
                 goto granted;
@@ -470,10 +690,12 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                "sleeping");
         ldlm_lock_dump(D_DLMTRACE, lock, 0);
 
-        fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
+        CDEBUG(D_DLMTRACE, "flags: 0x%x master: %p obd: %p\n",
+               flags, master_obd, obd);
+
         /* if this is a local lock, then there is no import */
         if (obd != NULL)
                 imp = obd->u.cli.cl_import;
@@ -484,75 +706,52 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
         }
 
-        lwi = LWI_TIMEOUT_INTR(0,NULL,ldlm_flock_interrupted_wait,&fwd);
+        lwi = LWI_TIMEOUT_INTR(ldlm_deadlock_timeout, NULL,
+                               ldlm_flock_interrupted_wait, &fwd);
 
-        /* Go to sleep until the lock is granted. */
+ restart:
         rc = l_wait_event(lock->l_waitq,
                           ((lock->l_req_mode == lock->l_granted_mode) ||
                            lock->l_destroyed), &lwi);
 
-        if (rc) {
-                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
-                           rc);
-                RETURN(rc);
+        if (rc == -ETIMEDOUT) {
+                deadlock_checked = 1;
+                rc = ldlm_flock_deadlock_check(master_obd, obd, lock);
+                if (rc == -EDEADLK)
+                        ldlm_flock_interrupted_wait(&fwd);
+                else {
+                        CDEBUG(D_DLMTRACE, "lock: %p going back to sleep.\n",
+                               lock);
+                        goto restart;
+                }
+        } else {
+                if (deadlock_checked)
+                        ldlm_flock_deadlock_check(master_obd, NULL, lock);
         }
 
-        LASSERT(!(lock->l_destroyed));
-
-granted:
-
-        LDLM_DEBUG(lock, "client-side enqueue waking up");
-        ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
-
-        /* take lock off the deadlock detection waitq. */
-        list_del_init(&lock->l_flock_waitq);
+        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
+        RETURN(rc);
+
+ granted:
+        LDLM_DEBUG(lock, "client-side enqueue granted");
+        lock_res_and_lock(lock);
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
         list_del_init(&lock->l_res_link);
 
         if (flags & LDLM_FL_TEST_LOCK) {
-                /* fcntl(F_GETLK) request */
-                /* The old mode was saved in getlk->fl_type so that if the mode
-                 * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
-                switch (lock->l_granted_mode) {
-                case LCK_PR:
-                        getlk->fl_type = F_RDLCK;
-                        break;
-                case LCK_PW:
-                        getlk->fl_type = F_WRLCK;
-                        break;
-                default:
-                        getlk->fl_type = F_UNLCK;
-                }
-                getlk->fl_pid = lock->l_policy_data.l_flock.pid;
-                getlk->fl_start = lock->l_policy_data.l_flock.start;
-                getlk->fl_end = lock->l_policy_data.l_flock.end;
+                /* client side - set flag to prevent sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
         } else {
+                int noreproc = LDLM_FL_WAIT_NOREPROC;
+
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                flags = LDLM_FL_WAIT_NOREPROC;
-                ldlm_process_flock_lock(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+                if (flags == 0)
+                        wake_up(&lock->l_waitq);
         }
-        l_unlock(&ns->ns_lock);
-        RETURN(0);
-}
-
-int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, int flag)
-{
-        struct ldlm_namespace *ns;
-        ENTRY;
-
-        LASSERT(lock);
-        LASSERT(flag == LDLM_CB_CANCELING);
-
-        ns = lock->l_resource->lr_namespace;
-
-        /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
-        list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        unlock_res_and_lock(lock);
 
         RETURN(0);
 }
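
Note: the LDLM_FL_DEADLOCK_CHK branch added above implements the deadlock check as a walk over wait-for edges keyed by (pid, nid) pairs. Each ldlm_sleep_flock entry records that one sleeping lock owner is blocked by another, and the walk follows the blocking_pid/blocking_nid links until it either arrives back at the requesting owner (a cycle, i.e. a deadlock) or runs off the list. The sketch below shows that walk in isolation, with simplified types and a plain array standing in for the kernel list; all names in it are illustrative, not part of the Lustre API, and it omits the patch's refinement of reporting the deadlock at the last waiter on the queue, which the Posix verification suites expect.

#include <stdio.h>

/* Simplified stand-in for struct ldlm_sleep_flock: one wait-for edge,
 * "owner (pid, nid) is blocked by owner (blocking_pid, blocking_nid)". */
struct sleep_flock {
        unsigned long long pid, nid;
        unsigned long long blocking_pid, blocking_nid;
};

/* Follow blocking edges the way ldlm_handle_flock_deadlock_check() does:
 * starting from the requester's current blocker, keep hopping to that
 * owner's own blocker; arriving back at (pid, nid) closes a cycle. */
static int flock_deadlock(const struct sleep_flock *waitq, int n,
                          unsigned long long pid, unsigned long long nid,
                          unsigned long long blk_pid,
                          unsigned long long blk_nid)
{
        int i, hops;

        /* n hops suffice: a chain over n sleepers cannot be longer. */
        for (hops = 0; hops < n; hops++) {
                for (i = 0; i < n; i++)
                        if (waitq[i].pid == blk_pid && waitq[i].nid == blk_nid)
                                break;
                if (i == n)
                        return 0;       /* blocker is not waiting: no cycle */
                blk_pid = waitq[i].blocking_pid;
                blk_nid = waitq[i].blocking_nid;
                if (blk_pid == pid && blk_nid == nid)
                        return 1;       /* walked back to requester: deadlock */
        }
        return 0;
}

int main(void)
{
        /* A waits on B, B waits on C; C asking to wait on A closes a loop. */
        struct sleep_flock waitq[] = {
                { 1, 100, 2, 200 },     /* A(1@100) blocked by B(2@200) */
                { 2, 200, 3, 300 },     /* B(2@200) blocked by C(3@300) */
        };

        printf("C(3@300) waiting on A(1@100): %s\n",
               flock_deadlock(waitq, 2, 3, 300, 1, 100) ? "deadlock" : "ok");
        printf("D(4@400) waiting on B(2@200): %s\n",
               flock_deadlock(waitq, 2, 4, 400, 2, 200) ? "deadlock" : "ok");
        return 0;
}

Bounding the walk at n hops keeps the sketch finite even when a cycle does not pass through the requester; the kernel code instead rescans the waitq from its head after each hop (the goto restart in the handler) and relies on list ordering for termination.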