return "unlink";
case IT_GETXATTR:
return "getxattr";
+ case IT_LAYOUT:
+ return "layout";
default:
CERROR("Unknown intent %d\n", it);
return "UNKNOWN";
lock->l_exp_refs_nr = 0;
lock->l_exp_refs_target = NULL;
#endif
+ CFS_INIT_LIST_HEAD(&lock->l_exp_list);
RETURN(lock);
}
/* Make sure all the right bits are set in this lock we
are going to pass to client */
LASSERTF(lock->l_policy_data.l_inodebits.bits ==
- (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
+ (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_LAYOUT),
"Inappropriate inode lock bits during "
"conversion " LPU64 "\n",
lock->l_policy_data.l_inodebits.bits);
ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns_is_client(ns) &&
!lock->l_readers && !lock->l_writers &&
+ !(lock->l_flags & LDLM_FL_NO_LRU) &&
!(lock->l_flags & LDLM_FL_BL_AST)) {
+
+ LDLM_DEBUG(lock, "add lock into lru list");
+
/* If this is a client-side namespace and this was the last
* reference, put it on the LRU. */
ldlm_lock_add_to_lru(lock);
!ns_connect_lru_resize(ns))
ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
} else {
+ LDLM_DEBUG(lock, "do not add lock into lru list");
unlock_res_and_lock(lock);
}
continue;
if (!unref &&
- (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)))
+ (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
+ lock->l_failed))
continue;
if ((flags & LDLM_FL_LOCAL_ONLY) &&
return NULL;
}
+/**
+ * Mark \a lock as failed so that threads blocked in the LVB wait
+ * (l_wait_event on lock->l_waitq, which now also tests l_failed) are
+ * woken and can bail out instead of waiting for LDLM_FL_LVB_READY.
+ *
+ * Idempotent: only the first call sets the flag and broadcasts.
+ * Caller must already hold the lock's resource lock (hence "_locked";
+ * see ldlm_lock_fail_match() for the locking wrapper).
+ */
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
+{
+ if (!lock->l_failed) {
+ lock->l_failed = 1;
+ /* wake every waiter, not just one — multiple matchers may sleep */
+ cfs_waitq_broadcast(&lock->l_waitq);
+ }
+}
+EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
+
+/**
+ * Locking wrapper around ldlm_lock_fail_match_locked(): takes the
+ * resource lock, marks \a lock failed (waking l_waitq waiters), and
+ * releases the resource lock again.
+ */
+void ldlm_lock_fail_match(struct ldlm_lock *lock)
+{
+ lock_res_and_lock(lock);
+ ldlm_lock_fail_match_locked(lock);
+ unlock_res_and_lock(lock);
+}
+EXPORT_SYMBOL(ldlm_lock_fail_match);
+
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
lock->l_flags |= LDLM_FL_LVB_READY;
- cfs_waitq_signal(&lock->l_waitq);
+ /* broadcast instead of signal: more than one thread can now sleep on
+ * l_waitq (waiting for either LDLM_FL_LVB_READY or l_failed), so a
+ * single wake-up could leave waiters stranded */
+ cfs_waitq_broadcast(&lock->l_waitq);
}
void ldlm_lock_allow_match(struct ldlm_lock *lock)
/* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
l_wait_event(lock->l_waitq,
- (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
+ lock->l_flags & LDLM_FL_LVB_READY ||
+ lock->l_failed,
+ &lwi);
+ if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
+ if (flags & LDLM_FL_TEST_LOCK)
+ LDLM_LOCK_RELEASE(lock);
+ else
+ ldlm_lock_decref_internal(lock, mode);
+ rc = 0;
+ }
}
}
out2:
return rc ? mode : 0;
}
+/**
+ * Revalidate the lock referenced by \a lockh and, if it is still usable,
+ * take an extra reference on it in its granted mode.
+ *
+ * Returns 0 (no mode) when the handle does not resolve to a lock, or when
+ * the lock is destroyed/failed (l_destroyed, LDLM_FL_FAILED, l_failed), or
+ * when a cancellation is pending (LDLM_FL_CBPENDING) and the lock has no
+ * remaining readers or writers.
+ *
+ * On success returns lock->l_granted_mode after bumping the lock's
+ * reader/writer count for that mode via ldlm_lock_addref_internal_nolock();
+ * the caller presumably must drop that reference when done — confirm
+ * against callers. If \a bits is non-NULL it receives the lock's inodebits
+ * policy bits (only meaningful for inodebits locks — callers pass IBITS
+ * handles here; NOTE(review): verify no other lock type reaches this path).
+ */
+ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
+ __u64 *bits)
+{
+ struct ldlm_lock *lock;
+ ldlm_mode_t mode = 0;
+ ENTRY;
+
+ lock = ldlm_handle2lock(lockh);
+ if (lock != NULL) {
+ lock_res_and_lock(lock);
+ /* lock already gone or failed to be granted: nothing to return */
+ if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
+ lock->l_failed)
+ GOTO(out, mode);
+
+ /* cancel pending and no users left: treat as unusable */
+ if (lock->l_flags & LDLM_FL_CBPENDING &&
+ lock->l_readers == 0 && lock->l_writers == 0)
+ GOTO(out, mode);
+
+ if (bits)
+ *bits = lock->l_policy_data.l_inodebits.bits;
+ mode = lock->l_granted_mode;
+ /* pin the lock in its granted mode for the caller */
+ ldlm_lock_addref_internal_nolock(lock, mode);
+ }
+
+ EXIT;
+
+out:
+ /* ldlm_handle2lock() took a reference; drop it in every path */
+ if (lock != NULL) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+ return mode;
+}
+EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
+
/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
lock->l_req_mode = mode;
lock->l_ast_data = data;
lock->l_pid = cfs_curproc_pid();
- lock->l_ns_srv = ns_is_server(ns);
+ lock->l_ns_srv = !!ns_is_server(ns);
if (cbs) {
lock->l_blocking_ast = cbs->lcs_blocking;
lock->l_completion_ast = cbs->lcs_completion;
RETURN(rc);
}
-/* Helper function for ldlm_run_ast_work().
- *
- * Send an existing rpc set specified by @arg->set and then
- * destroy it. Create new one if @do_create flag is set. */
-static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create)
-{
- int rc = 0;
- ENTRY;
-
- if (arg->set) {
- ptlrpc_set_wait(arg->set);
- if (arg->type == LDLM_BL_CALLBACK)
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
- ptlrpc_set_destroy(arg->set);
- arg->set = NULL;
- arg->rpcs = 0;
- }
-
- if (do_create) {
- arg->set = ptlrpc_prep_set();
- if (arg->set == NULL)
- rc = -ENOMEM;
- }
-
- RETURN(rc);
-}
-
static int
ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
{
int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
ldlm_desc_ast_t ast_type)
{
- struct ldlm_cb_set_arg arg = { 0 };
+ struct l_wait_info lwi = { 0 };
+ struct ldlm_cb_set_arg *arg;
cfs_list_t *tmp, *pos;
int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
unsigned int max_ast_count;
if (cfs_list_empty(rpc_list))
RETURN(0);
- rc = ldlm_deliver_cb_set(&arg, 1);
- if (rc != 0)
- RETURN(rc);
+ OBD_ALLOC_PTR(arg);
+ if (arg == NULL)
+ RETURN(-ENOMEM);
+
+ cfs_atomic_set(&arg->restart, 0);
+ cfs_atomic_set(&arg->rpcs, 0);
+ cfs_atomic_set(&arg->refcount, 1);
+ cfs_waitq_init(&arg->waitq);
switch (ast_type) {
case LDLM_WORK_BL_AST:
- arg.type = LDLM_BL_CALLBACK;
+ arg->type = LDLM_BL_CALLBACK;
work_ast_lock = ldlm_work_bl_ast_lock;
break;
case LDLM_WORK_CP_AST:
- arg.type = LDLM_CP_CALLBACK;
+ arg->type = LDLM_CP_CALLBACK;
work_ast_lock = ldlm_work_cp_ast_lock;
break;
case LDLM_WORK_REVOKE_AST:
- arg.type = LDLM_BL_CALLBACK;
+ arg->type = LDLM_BL_CALLBACK;
work_ast_lock = ldlm_work_revoke_ast_lock;
break;
default:
}
max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
+ arg->threshold = max_ast_count;
cfs_list_for_each_safe(tmp, pos, rpc_list) {
- (void)work_ast_lock(tmp, &arg);
- if (arg.rpcs > max_ast_count) {
- rc = ldlm_deliver_cb_set(&arg, 1);
- if (rc != 0)
- break;
- }
- }
+ (void)work_ast_lock(tmp, arg);
+ if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
+ continue;
- (void)ldlm_deliver_cb_set(&arg, 0);
+ l_wait_event(arg->waitq,
+ cfs_atomic_read(&arg->rpcs) < arg->threshold,
+ &lwi);
+ }
- if (rc == 0 && cfs_atomic_read(&arg.restart))
- rc = -ERESTART;
+ arg->threshold = 1;
+ l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
+ rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+ ldlm_csa_put(arg);
RETURN(rc);
}