see here.
- Treat the completion ASTs like the blocking ASTs: hold the requests for
later dispatching, after we've finished walking all of the lists.
break;
case IT_UNLINK:
bufcount = 2;
- size[1] = sizeof(struct obdo);
+ size[1] = sizeof(struct obdo);
break;
case IT_RMDIR:
bufcount = 1;
rep->lock_policy_res1 = 1;
/* execute policy */
- switch ( it->opc ) {
+ switch (it->opc) {
case IT_CREAT:
case IT_CREAT|IT_OPEN:
case IT_MKDIR:
ENTRY;
spin_lock(&lock->l_lock);
- if (lock->l_flags & LDLM_FL_AST_SENT) {
+ if (lock->l_flags & LDLM_FL_AST_SENT)
RETURN(0);
- }
lock->l_flags |= LDLM_FL_AST_SENT;
rc = 1;
- CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
+ CDEBUG(D_OTHER, "compat function failed and lock modes "
+ "incompat\n");
if (send_cbs && child->l_blocking_ast != NULL) {
- CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
+ CDEBUG(D_OTHER, "incompatible; sending blocking "
+ "AST.\n");
/* It's very difficult to actually send the AST from
* here, because we'd have to drop the lock before going
* to sleep to wait for the reply. Instead we build the
/* Args: locked lock, locked resource */
void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
{
+ struct ptlrpc_request *req = NULL;
ENTRY;
ldlm_resource_add_lock(res, &res->lr_granted, lock);
if (lock->l_granted_mode < res->lr_most_restr)
res->lr_most_restr = lock->l_granted_mode;
- if (lock->l_completion_ast)
+ if (lock->l_completion_ast) {
lock->l_completion_ast(lock, NULL, lock->l_data,
- lock->l_data_len, NULL);
+ lock->l_data_len, &req);
+ if (req != NULL) {
+ struct list_head *list = res->lr_tmp;
+ if (list == NULL) {
+ LBUG();
+ return;
+ }
+ list_add(&req->rq_multi, list);
+ }
+ }
EXIT;
}
RETURN(0);
}
+static void ldlm_send_delayed_asts(struct list_head *rpc_list)
+{
+ struct list_head *tmp, *pos;
+ ENTRY;
+
+ list_for_each_safe(tmp, pos, rpc_list) {
+ int rc;
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_multi);
+
+ CDEBUG(D_INFO, "Sending callback.\n");
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+ if (rc)
+ CERROR("Callback send failed: %d\n", rc);
+ }
+ EXIT;
+}
+
/* Must be called with resource->lr_lock not taken. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
- struct list_head rpc_list, *tmp, *pos;
-
- INIT_LIST_HEAD(&rpc_list);
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ ENTRY;
/* Local lock trees don't get reprocessed. */
- if (res->lr_namespace->ns_client)
+ if (res->lr_namespace->ns_client) {
+ EXIT;
return;
+ }
spin_lock(&res->lr_lock);
res->lr_tmp = &rpc_list;
res->lr_tmp = NULL;
spin_unlock(&res->lr_lock);
- list_for_each_safe(tmp, pos, &rpc_list) {
- int rc;
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_multi);
-
- CDEBUG(D_INFO, "Sending callback.\n");
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
- if (rc)
- CERROR("Callback send failed: %d\n", rc);
- }
+ ldlm_send_delayed_asts(&rpc_list);
+ EXIT;
}
/* Must be called with lock and lock->l_resource unlocked */
spin_lock(&lock->l_resource->lr_lock);
spin_lock(&lock->l_lock);
if (!new) {
- CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
/* If we receive the completion AST before the actual enqueue
spin_unlock(&lock->l_lock);
spin_unlock(&lock->l_resource->lr_lock);
} else {
- CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
lock->l_flags |= LDLM_FL_DYING;
spin_unlock(&lock->l_lock);
spin_unlock(&lock->l_resource->lr_lock);
req->rq_replen = lustre_msg_size(0, NULL);
if (reqp == NULL) {
+ LBUG();
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
ptlrpc_free_req(req);
#include <linux/lustre_dlm.h>
+struct ldlm_test_thread {
+ struct ldlm_namespace *t_ns;
+ struct list_head t_link;
+ __u32 t_flags;
+ wait_queue_head_t t_ctl_waitq;
+};
+
+static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED;
+static struct list_head ctl_threads;
+static int regression_running = 0;
+
static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
void *data, __u32 data_len,
struct ptlrpc_request **reqp)
RETURN(err);
}
+static int ldlm_test_main(void *data)
+{
+ return 0;
+}
+
+static int ldlm_start_thread(void)
+{
+ struct ldlm_test_thread *test;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC(test, sizeof(*test));
+ if (test == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ init_waitqueue_head(&test->t_ctl_waitq);
+
+ spin_lock(&ctl_lock);
+ list_add(&test->t_link, &ctl_threads);
+ spin_unlock(&ctl_lock);
+
+ rc = kernel_thread(ldlm_test_main, (void *)test,
+ CLONE_VM | CLONE_FS | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("cannot start thread\n");
+ RETURN(-EINVAL);
+ }
+ wait_event(test->t_ctl_waitq, test->t_flags & SVC_RUNNING);
+
+ RETURN(0);
+}
+
+static int ldlm_stop_all_threads(void)
+{
+ spin_lock(&ctl_lock);
+ while (!list_empty(&ctl_threads)) {
+ struct ldlm_test_thread *thread;
+ thread = list_entry(ctl_threads.next, struct ldlm_test_thread,
+ t_link);
+ spin_unlock(&ctl_lock);
+
+ thread->t_flags = SVC_STOPPING;
+
+ wake_up(&thread->t_ctl_waitq);
+ wait_event_interruptible(thread->t_ctl_waitq,
+ (thread->t_flags & SVC_STOPPED));
+
+ spin_lock(&ctl_lock);
+ list_del(&thread->t_link);
+ OBD_FREE(thread, sizeof(*thread));
+ }
+ spin_unlock(&ctl_lock);
+
+ return 0;
+}
+
+int ldlm_regression_start(struct obd_device *obddev,
+ struct ptlrpc_connection *conn, int count)
+{
+ int i, rc;
+ ENTRY;
+
+ spin_lock(&ctl_lock);
+ if (regression_running) {
+ CERROR("You can't start the ldlm regression twice.\n");
+ spin_unlock(&ctl_lock);
+ RETURN(-EINVAL);
+ }
+ regression_running = 1;
+ spin_unlock(&ctl_lock);
+
+ for (i = 0; i < count; i++) {
+ rc = ldlm_start_thread();
+ if (rc < 0)
+ GOTO(cleanup, rc);
+ }
+
+ cleanup:
+ RETURN(rc);
+}
+
+int ldlm_regression_stop(void)
+{
+ ENTRY;
+
+ spin_lock(&ctl_lock);
+ if (!regression_running) {
+ CERROR("The ldlm regression isn't started.\n");
+ spin_unlock(&ctl_lock);
+ RETURN(-EINVAL);
+ }
+ spin_unlock(&ctl_lock);
+
+ /* Do stuff */
+
+ spin_lock(&ctl_lock);
+ regression_running = 0;
+ spin_unlock(&ctl_lock);
+
+ RETURN(0);
+}
+
int ldlm_test(struct obd_device *obddev, struct ptlrpc_connection *conn)
{
int rc;
if (rc) {
CERROR("error during create: %d\n", rc);
- if (rc != -ENOSPC) LBUG();
+ if (rc != -ENOSPC)
+ LBUG();
GOTO(out_create_commit, rc);
} else {
struct iattr iattr;