up correctly afterwards yet.
case ${host_cpu} in
um )
AC_MSG_RESULT($host_cpu)
- KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common '
+ KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common '
KCPPFLAGS='-D__KERNEL__ -U__i386__ -Ui386 -DUM_FASTCALL -D__arch_um__ -DSUBARCH="i386" -DNESTING=0 -D_LARGEFILE64_SOURCE -Derrno=kernel_errno -DPATCHLEVEL=4 -DMODULE -I$(LINUX)/arch/um/include '
MOD_LINK=elf_i386
;;
int cookie_len, int *flags,
ldlm_lock_callback completion,
ldlm_lock_callback blocking);
-struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode,
+struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags);
void ldlm_lock_cancel(struct ldlm_lock *lock);
+void ldlm_run_ast_work(struct list_head *rpc_list);
void ldlm_reprocess_all(struct ldlm_resource *res);
void ldlm_lock_dump(struct ldlm_lock *lock);
/* lock types */
char *ldlm_lockname[] = {
[0] "--",
- [LCK_EX] "EX",
+ [LCK_EX] "EX",
[LCK_PW] "PW",
[LCK_PR] "PR",
[LCK_CW] "CW",
[LDLM_PLAIN] "PLN",
[LDLM_EXTENT] "EXT",
[LDLM_MDSINTENT] "INT"
-};
+};
extern kmem_cache_t *ldlm_lock_slab;
int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
/*
* REFCOUNTED LOCK OBJECTS
- */
+ */
-/*
- * Lock refcounts, during creation:
+/*
+ * Lock refcounts, during creation:
* - one special one for allocation, dec'd only once in destroy
* - one for being a lock that's in-use
* - one for the addref associated with a new lock
l_unlock(&parent->l_resource->lr_namespace->ns_lock);
}
/* this is the extra refcount, to prevent the lock
- evaporating */
+ evaporating */
ldlm_lock_get(lock);
RETURN(lock);
}
RETURN(-ENOMEM);
}
- /* move references over */
+ /* move references over */
for (i = 0; i < lock->l_refc; i++) {
int rc;
ldlm_resource_getref(lock->l_resource);
RETURN(0);
}
-/*
+/*
* HANDLES
- */
+ */
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
{
memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
}
-static void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new)
+static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
+ struct ldlm_lock *new)
{
struct ldlm_ast_work *w;
ENTRY;
-
+
OBD_ALLOC(w, sizeof(*w));
- if (!w) {
+ if (!w) {
LBUG();
return;
}
-
+
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_get(lock);
- if (new) {
+ if (new) {
+ if (lock->l_flags & LDLM_FL_AST_SENT)
+ GOTO(out, 0);
+
+ lock->l_flags |= LDLM_FL_AST_SENT;
w->w_blocking = 1;
ldlm_lock2desc(new, &w->w_desc);
}
- w->w_lock = lock;
- lock->l_flags |= LDLM_FL_AST_SENT;
+ w->w_lock = ldlm_lock_get(lock);
list_add(&w->w_list, lock->l_resource->lr_tmp);
+ out:
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
return;
}
ENTRY;
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- rc = ldlm_lock_compat_list(lock, send_cbs, &lock->l_resource->lr_granted);
+ rc = ldlm_lock_compat_list(lock, send_cbs,
+ &lock->l_resource->lr_granted);
/* FIXME: should we be sending ASTs to converting? */
- if (rc)
+ if (rc)
rc = ldlm_lock_compat_list
(lock, send_cbs, &lock->l_resource->lr_converting);
RETURN(rc);
}
-/* NOTE: called by
- - ldlm_handle_enqueuque - resource
+/* NOTE: called by
+ - ldlm_handle_enqueuque - resource
*/
void ldlm_grant_lock(struct ldlm_lock *lock)
{
ldlm_resource_put(res);
l_unlock(&ns->ns_lock);
- if (lock)
+ if (lock) {
+ ldlm_lock2handle(lock, lockh);
wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
lock->l_granted_mode);
- if (rc)
+ }
+ if (rc)
LDLM_DEBUG(lock, "matched");
else
- LDLM_DEBUG(lock, "not matched");
+ LDLM_DEBUG_NOLOCK("not matched");
return rc;
}
*flags |= LDLM_FL_LOCK_CHANGED;
} else if (rc == ELDLM_LOCK_ABORTED) {
ldlm_lock_destroy(lock);
- ldlm_lock_put(lock);
RETURN(rc);
}
}
RETURN(0);
}
-static void ldlm_run_ast_work(struct list_head *rpc_list)
+void ldlm_run_ast_work(struct list_head *rpc_list)
{
struct list_head *tmp, *pos;
int rc;
ENTRY;
list_for_each_safe(tmp, pos, rpc_list) {
- struct ldlm_ast_work *w =
+ struct ldlm_ast_work *w =
list_entry(tmp, struct ldlm_ast_work, w_list);
struct lustre_handle lockh;
-
- ldlm_lock2handle(w->w_lock, &lockh);
- if (w->w_blocking) {
- rc = w->w_lock->l_blocking_ast(&lockh, &w->w_desc, w->w_data, w->w_datalen);
- } else {
- rc = w->w_lock->l_completion_ast(&lockh, NULL, w->w_data, w->w_datalen);
- }
- if (rc) {
- CERROR("Failed AST - should clean & disconnect client\n");
- }
+
+ ldlm_lock2handle(w->w_lock, &lockh);
+ if (w->w_blocking)
+ rc = w->w_lock->l_blocking_ast
+ (&lockh, &w->w_desc, w->w_data, w->w_datalen);
+ else
+ rc = w->w_lock->l_completion_ast
+ (&lockh, NULL, w->w_data, w->w_datalen);
+ if (rc)
+ CERROR("Failed AST - should clean & disconnect "
+ "client\n");
ldlm_lock_put(w->w_lock);
list_del(&w->w_list);
OBD_FREE(w, sizeof(*w));
}
/* Must be called with lock and lock->l_resource unlocked */
-struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode, int *flags)
+struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
+ int *flags)
{
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
struct ldlm_resource *res;
struct ldlm_namespace *ns;
+ int granted = 0;
ENTRY;
res = lock->l_resource;
if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
ldlm_resource_add_lock(res, res->lr_converting.prev,
lock);
- else {
+ else {
+ res->lr_tmp = &rpc_list;
ldlm_grant_lock(lock);
+ res->lr_tmp = NULL;
+ granted = 1;
/* FIXME: completion handling not with ns_lock held ! */
wake_up(&lock->l_waitq);
}
- } else {
+ } else
list_add(&lock->l_res_link, res->lr_converting.prev);
- }
l_unlock(&ns->ns_lock);
+ if (granted)
+ ldlm_run_ast_work(&rpc_list);
RETURN(res);
}
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+static int ldlm_handle_enqueue(struct obd_device *obddev,
+ struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
dlm_req->lock_desc.l_resource.lr_type,
dlm_req->lock_desc.l_req_mode, NULL, 0);
if (!lock)
- GOTO(out, -ENOMEM);
+ GOTO(out, err = -ENOMEM);
memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
sizeof(lock->l_remote_handle));
return 0;
}
-static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+static int ldlm_handle_convert(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_reply *dlm_rep;
req->rq_status = EINVAL;
} else {
LDLM_DEBUG(lock, "server-side convert handler START");
- ldlm_convert(lock, dlm_req->lock_desc.l_req_mode,
+ ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
req->rq_status = 0;
}
RETURN(0);
}
-static int ldlm_handle_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+static int ldlm_handle_cancel(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
}
static int ldlm_handle_callback(struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+ struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock_desc *descp;
+ struct ldlm_lock_desc *descp = NULL;
struct ldlm_lock *lock;
- __u64 is_blocking_ast;
+ __u64 is_blocking_ast = 0;
int rc;
ENTRY;
RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- descp = &dlm_req->lock_desc;
/* We must send the reply first, so that the thread is free to handle
* any requests made in common_callback() */
RETURN(rc);
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
- /* check if this is a blocking AST */
- if (!descp->l_req_mode)
- descp = NULL;
-
if (!lock) {
CERROR("callback on lock %Lx - lock disappeared\n",
dlm_req->lock_handle1.addr);
RETURN(0);
}
+ /* check if this is a blocking AST */
+ if (dlm_req->lock_desc.l_req_mode !=
+ dlm_req->lock_desc.l_granted_mode) {
+ descp = &dlm_req->lock_desc;
+ is_blocking_ast = 1;
+ }
+
LDLM_DEBUG(lock, "client %s callback handler START",
is_blocking_ast ? "blocked" : "completion");
}
ldlm_lock_put(lock);
} else {
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
/* If we receive the completion AST before the actual enqueue
ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
LDLM_DEBUG(lock, "completion AST, new resource");
}
+ lock->l_resource->lr_tmp = &rpc_list;
+ ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock);
/* FIXME: we want any completion function, not just wake_up */
wake_up(&lock->l_waitq);
ldlm_lock_put(lock);
+ lock->l_resource->lr_tmp = NULL;
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+ ldlm_run_ast_work(&rpc_list);
}
LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
}
static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+ struct ptlrpc_request *req)
{
struct obd_device *req_dev;
int id, rc;
}
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
- void *data, __u32 data_len)
+ void *data, __u32 data_len)
{
struct ldlm_lock *lock;
struct ldlm_request *body;
}
LDLM_DEBUG(lock, "server preparing %s AST",
- desc->l_req_mode == 0 ? "completion" : "blocked");
+ desc == 0 ? "completion" : "blocked");
req->rq_replen = lustre_msg_size(0, NULL);
GOTO(out, rc);
reply = lustre_msg_buf(req->rq_repmsg, 0);
- res = ldlm_convert(lock, new_mode, &reply->lock_flags);
+ res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
ldlm_reprocess_all(res);
if (lock->l_req_mode != lock->l_granted_mode) {
#define DEBUG_SUBSYSTEM S_LDLM
+#include <linux/types.h>
+#include <linux/random.h>
+
#include <linux/lustre_dlm.h>
struct ldlm_test_thread {
static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED;
static struct list_head ctl_threads;
static int regression_running = 0;
+static struct ptlrpc_client ctl_client;
+static struct ptlrpc_connection *ctl_conn;
static int ldlm_test_callback(struct lustre_handle *lockh,
struct ldlm_lock_desc *new,
void *data, __u32 data_len)
{
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ lock = ldlm_handle2lock(lockh);
+ if (lock == NULL) {
+ CERROR("invalid handle in callback\n");
+ RETURN(0);
+ }
+
printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new);
return 0;
}
LBUG();
ldlm_resource_dump(res);
- res = ldlm_convert(lock1, LCK_NL, &flags);
+ res = ldlm_lock_convert(lock1, LCK_NL, &flags);
if (res != NULL)
ldlm_reprocess_all(res);
/* Convert/cancel blocking locks */
flags = 0;
- res = ldlm_convert(lock1, LCK_NL, &flags);
+ res = ldlm_lock_convert(lock1, LCK_NL, &flags);
if (res != NULL)
ldlm_reprocess_all(res);
static int ldlm_test_main(void *data)
{
- return 0;
+ struct ldlm_test_thread *thread = data;
+ struct ldlm_namespace *ns;
+ ENTRY;
+
+ lock_kernel();
+ daemonize();
+ spin_lock_irq(¤t->sigmask_lock);
+ sigfillset(¤t->blocked);
+ recalc_sigpending(current);
+ spin_unlock_irq(¤t->sigmask_lock);
+
+ sprintf(current->comm, "ldlm_test");
+
+ ns = ldlm_namespace_new("ldlm_test", LDLM_NAMESPACE_CLIENT);
+ if (ns == NULL) {
+ LBUG();
+ GOTO(out, -ENOMEM);
+ }
+
+ /* Record that the thread is running */
+ thread->t_flags |= SVC_RUNNING;
+ wake_up(&thread->t_ctl_waitq);
+
+ while (1) {
+ struct lustre_handle lockh;
+ __u64 res_id[3] = {0};
+ __u32 lock_mode;
+ char random;
+ int flags = 0, rc;
+
+ /* Pick a random resource from 1 to 10 */
+ get_random_bytes(&random, sizeof(random));
+ res_id[0] = random % 10 + 1;
+
+ /* Pick a random lock mode */
+ get_random_bytes(&random, sizeof(random));
+ lock_mode = random % LCK_NL + 1;
+
+ rc = ldlm_cli_enqueue(&ctl_client, ctl_conn, NULL, ns, NULL,
+ res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+ &flags, ldlm_test_callback, NULL, 0,
+ &lockh);
+ if (rc < 0) {
+ CERROR("ldlm_cli_enqueue: %d\n", rc);
+ LBUG();
+ }
+ }
+
+ out:
+ thread->t_flags |= SVC_STOPPED;
+ wake_up(&thread->t_ctl_waitq);
+
+ RETURN(0);
}
static int ldlm_start_thread(void)
RETURN(0);
}
-static int ldlm_stop_all_threads(void)
-{
- spin_lock(&ctl_lock);
- while (!list_empty(&ctl_threads)) {
- struct ldlm_test_thread *thread;
- thread = list_entry(ctl_threads.next, struct ldlm_test_thread,
- t_link);
- spin_unlock(&ctl_lock);
-
- thread->t_flags = SVC_STOPPING;
-
- wake_up(&thread->t_ctl_waitq);
- wait_event_interruptible(thread->t_ctl_waitq,
- (thread->t_flags & SVC_STOPPED));
-
- spin_lock(&ctl_lock);
- list_del(&thread->t_link);
- OBD_FREE(thread, sizeof(*thread));
- }
- spin_unlock(&ctl_lock);
-
- return 0;
-}
-
int ldlm_regression_start(struct obd_device *obddev,
struct ptlrpc_connection *conn, int count)
{
- int i, rc;
+ int i, rc = 0;
ENTRY;
spin_lock(&ctl_lock);
spin_unlock(&ctl_lock);
RETURN(-EINVAL);
}
- spin_unlock(&ctl_lock);
- /* Do stuff */
+ while (!list_empty(&ctl_threads)) {
+ struct ldlm_test_thread *thread;
+ thread = list_entry(ctl_threads.next, struct ldlm_test_thread,
+ t_link);
+
+ thread->t_flags |= SVC_STOPPING;
+ spin_unlock(&ctl_lock);
+
+ wake_up(&thread->t_ctl_waitq);
+ wait_event_interruptible(thread->t_ctl_waitq,
+ thread->t_flags & SVC_STOPPED);
+
+ spin_lock(&ctl_lock);
+ list_del(&thread->t_link);
+ OBD_FREE(thread, sizeof(*thread));
+ }
- spin_lock(&ctl_lock);
regression_running = 0;
spin_unlock(&ctl_lock);
#include <linux/lustre_mds.h>
#include <linux/lustre_dlm.h>
extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
- struct ptlrpc_request *req);
+ struct ptlrpc_request *req);
static int mds_cleanup(struct obd_device * obddev);
/* Assumes caller has already pushed into the kernel filesystem context */
/* XXX error handling */
rc = mds_fs_set_obdo(mds, inode, handle, obdo);
// rc = mds_fs_setattr(mds, de, handle, &iattr);
- if (!rc)
+ if (!rc) {
+ struct obd_run_ctxt saved;
+ push_ctxt(&saved, &mds->mds_ctxt);
rc = mds_update_last_rcvd(mds, handle, req);
- else {
+ pop_ctxt(&saved);
+ } else {
req->rq_status = rc;
RETURN(0);
}