X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lock.c;h=4c124d44a47bbfa6c1134a862c0ecd7ccaaaf97d;hb=f0d608786a27dfb8dddf06d6b086b491749557f1;hp=08eae3553dc3971a2c952e098536eb3abe19c84f;hpb=43c01b9d94e783295edc2d1432ed3af0076a0e51;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 08eae35..4c124d4 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -135,6 +135,8 @@ char *ldlm_it2str(int it) return "unlink"; case IT_GETXATTR: return "getxattr"; + case IT_LAYOUT: + return "layout"; default: CERROR("Unknown intent %d\n", it); return "UNKNOWN"; @@ -435,6 +437,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) lock->l_exp_refs_nr = 0; lock->l_exp_refs_target = NULL; #endif + CFS_INIT_LIST_HEAD(&lock->l_exp_list); RETURN(lock); } @@ -566,7 +569,8 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) /* Make sure all the right bits are set in this lock we are going to pass to client */ LASSERTF(lock->l_policy_data.l_inodebits.bits == - (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE), + (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | + MDS_INODELOCK_LAYOUT), "Inappropriate inode lock bits during " "conversion " LPU64 "\n", lock->l_policy_data.l_inodebits.bits); @@ -770,7 +774,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns_is_client(ns) && !lock->l_readers && !lock->l_writers && + !(lock->l_flags & LDLM_FL_NO_LRU) && !(lock->l_flags & LDLM_FL_BL_AST)) { + + LDLM_DEBUG(lock, "add lock into lru list"); + /* If this is a client-side namespace and this was the last * reference, put it on the LRU. */ ldlm_lock_add_to_lru(lock); @@ -786,6 +794,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) !ns_connect_lru_resize(ns)) ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0); } else { + LDLM_DEBUG(lock, "do not add lock into lru list"); unlock_res_and_lock(lock); } @@ -1046,7 +1055,8 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue, continue; if (!unref && - (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED))) + (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || + lock->l_failed)) continue; if ((flags & LDLM_FL_LOCAL_ONLY) && @@ -1066,10 +1076,27 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue, return NULL; } +void ldlm_lock_fail_match_locked(struct ldlm_lock *lock) +{ + if (!lock->l_failed) { + lock->l_failed = 1; + cfs_waitq_broadcast(&lock->l_waitq); + } +} +EXPORT_SYMBOL(ldlm_lock_fail_match_locked); + +void ldlm_lock_fail_match(struct ldlm_lock *lock) +{ + lock_res_and_lock(lock); + ldlm_lock_fail_match_locked(lock); + unlock_res_and_lock(lock); +} +EXPORT_SYMBOL(ldlm_lock_fail_match); + void ldlm_lock_allow_match_locked(struct ldlm_lock *lock) { lock->l_flags |= LDLM_FL_LVB_READY; - cfs_waitq_signal(&lock->l_waitq); + cfs_waitq_broadcast(&lock->l_waitq); } void ldlm_lock_allow_match(struct ldlm_lock *lock) @@ -1178,7 +1205,16 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */ l_wait_event(lock->l_waitq, - (lock->l_flags & LDLM_FL_LVB_READY), &lwi); + lock->l_flags & LDLM_FL_LVB_READY || + lock->l_failed, + &lwi); + if (!(lock->l_flags & LDLM_FL_LVB_READY)) { + if (flags & LDLM_FL_TEST_LOCK) + LDLM_LOCK_RELEASE(lock); + else + ldlm_lock_decref_internal(lock, mode); + rc = 0; + } } } out2: @@ -1216,6 +1252,41 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, return rc ? mode : 0; } +ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh, + __u64 *bits) +{ + struct ldlm_lock *lock; + ldlm_mode_t mode = 0; + ENTRY; + + lock = ldlm_handle2lock(lockh); + if (lock != NULL) { + lock_res_and_lock(lock); + if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || + lock->l_failed) + GOTO(out, mode); + + if (lock->l_flags & LDLM_FL_CBPENDING && + lock->l_readers == 0 && lock->l_writers == 0) + GOTO(out, mode); + + if (bits) + *bits = lock->l_policy_data.l_inodebits.bits; + mode = lock->l_granted_mode; + ldlm_lock_addref_internal_nolock(lock, mode); + } + + EXIT; + +out: + if (lock != NULL) { + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + } + return mode; +} +EXPORT_SYMBOL(ldlm_revalidate_lock_handle); + /* Returns a referenced lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, @@ -1240,7 +1311,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, lock->l_req_mode = mode; lock->l_ast_data = data; lock->l_pid = cfs_curproc_pid(); - lock->l_ns_srv = ns_is_server(ns); + lock->l_ns_srv = !!ns_is_server(ns); if (cbs) { lock->l_blocking_ast = cbs->lcs_blocking; lock->l_completion_ast = cbs->lcs_completion; @@ -1416,33 +1487,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue, RETURN(rc); } -/* Helper function for ldlm_run_ast_work(). - * - * Send an existing rpc set specified by @arg->set and then - * destroy it. Create new one if @do_create flag is set. */ -static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create) -{ - int rc = 0; - ENTRY; - - if (arg->set) { - ptlrpc_set_wait(arg->set); - if (arg->type == LDLM_BL_CALLBACK) - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); - ptlrpc_set_destroy(arg->set); - arg->set = NULL; - arg->rpcs = 0; - } - - if (do_create) { - arg->set = ptlrpc_prep_set(); - if (arg->set == NULL) - rc = -ENOMEM; - } - - RETURN(rc); -} - static int ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) { @@ -1534,7 +1578,8 @@ ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type) { - struct ldlm_cb_set_arg arg = { 0 }; + struct l_wait_info lwi = { 0 }; + struct ldlm_cb_set_arg *arg; cfs_list_t *tmp, *pos; int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg); unsigned int max_ast_count; @@ -1544,21 +1589,26 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, if (cfs_list_empty(rpc_list)) RETURN(0); - rc = ldlm_deliver_cb_set(&arg, 1); - if (rc != 0) - RETURN(rc); + OBD_ALLOC_PTR(arg); + if (arg == NULL) + RETURN(-ENOMEM); + + cfs_atomic_set(&arg->restart, 0); + cfs_atomic_set(&arg->rpcs, 0); + cfs_atomic_set(&arg->refcount, 1); + cfs_waitq_init(&arg->waitq); switch (ast_type) { case LDLM_WORK_BL_AST: - arg.type = LDLM_BL_CALLBACK; + arg->type = LDLM_BL_CALLBACK; work_ast_lock = ldlm_work_bl_ast_lock; break; case LDLM_WORK_CP_AST: - arg.type = LDLM_CP_CALLBACK; + arg->type = LDLM_CP_CALLBACK; work_ast_lock = ldlm_work_cp_ast_lock; break; case LDLM_WORK_REVOKE_AST: - arg.type = LDLM_BL_CALLBACK; + arg->type = LDLM_BL_CALLBACK; work_ast_lock = ldlm_work_revoke_ast_lock; break; default: @@ -1566,21 +1616,23 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, } max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX; + arg->threshold = max_ast_count; cfs_list_for_each_safe(tmp, pos, rpc_list) { - (void)work_ast_lock(tmp, &arg); - if (arg.rpcs > max_ast_count) { - rc = ldlm_deliver_cb_set(&arg, 1); - if (rc != 0) - break; - } - } + (void)work_ast_lock(tmp, arg); + if (cfs_atomic_read(&arg->rpcs) < max_ast_count) + continue; - (void)ldlm_deliver_cb_set(&arg, 0); + l_wait_event(arg->waitq, + cfs_atomic_read(&arg->rpcs) < arg->threshold, + &lwi); + } - if (rc == 0 && cfs_atomic_read(&arg.restart)) - rc = -ERESTART; + arg->threshold = 1; + l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi); + rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0; + ldlm_csa_put(arg); RETURN(rc); }