#define LDLM_FL_BLOCK_GRANTED (1 << 1)
#define LDLM_FL_BLOCK_CONV (1 << 2)
#define LDLM_FL_BLOCK_WAIT (1 << 3)
+#define LDLM_FL_DYING (1 << 4)
#define L2B(c) (1 << c)
*/
struct ldlm_namespace {
- struct list_head ns_link; /* in the list of ns's */
- __u32 ns_id; /* identifier of ns */
+ struct obd_device *ns_obddev;
+ __u32 ns_local; /* is this a local lock tree? */
struct list_head *ns_hash; /* hash table for ns */
- atomic_t ns_refcount; /* count of resources in the hash */
+ __u32 ns_refcount; /* count of resources in the hash */
struct list_head ns_root_list; /* all root resources in ns */
- struct obd_device *ns_obddev;
+ spinlock_t ns_lock; /* protects hash, refcount, list */
};
/*
struct list_head l_children;
struct list_head l_childof;
struct list_head l_res_link; /*position in one of three res lists*/
+
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
+
ldlm_lock_callback l_completion_ast;
ldlm_lock_callback l_blocking_ast;
+
struct ptlrpc_connection *l_connection;
+ struct ptlrpc_client *l_client;
+ __u32 l_flags;
+ struct ldlm_handle l_remote_handle;
void *l_data;
__u32 l_data_len;
struct ldlm_extent l_extent;
- struct ldlm_handle l_remote_handle;
//void *l_event;
//XXX cluster_host l_holder;
__u32 l_version[RES_VERSION_SIZE];
+
+ __u32 l_readers;
+ __u32 l_writers;
+
+ /* If the lock is granted, a process sleeps on this waitq to learn when
+ * it's no longer in use. If the lock is not granted, a process sleeps
+ * on this waitq to learn when it becomes granted. */
wait_queue_head_t l_waitq;
+ spinlock_t l_lock;
};
typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new);
struct ldlm_extent *new_ex,
ldlm_mode_t mode, void *data);
-#define LDLM_PLAIN 0x0
-#define LDLM_EXTENT 0x1
-#define LDLM_MDSINTENT 0x2
+#define LDLM_PLAIN 0
+#define LDLM_EXTENT 1
+#define LDLM_MDSINTENT 2
+
+#define LDLM_MAX_TYPE 2
-#define LDLM_MAX_TYPE 0x2
extern ldlm_res_compat ldlm_res_compat_table [];
extern ldlm_res_policy ldlm_res_policy_table [];
struct list_head lr_converting;
struct list_head lr_waiting;
ldlm_mode_t lr_most_restr;
- atomic_t lr_refcount;
__u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */
struct ldlm_resource *lr_root;
//XXX cluster_host lr_master;
__u64 lr_name[RES_NAME_SIZE];
__u32 lr_version[RES_VERSION_SIZE];
- spinlock_t lr_lock;
+ __u32 lr_refcount;
+ spinlock_t lr_lock; /* protects lists, refcount */
};
static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
handle->addr = (__u64)(unsigned long)object;
}
-extern struct list_head ldlm_namespaces;
-extern spinlock_t ldlm_spinlock;
-
-static inline void ldlm_lock(void)
-{
- spin_lock(&ldlm_spinlock);
-}
-
-static inline void ldlm_unlock(void)
-{
- spin_unlock(&ldlm_spinlock);
-}
-
extern struct obd_ops ldlm_obd_ops;
/* ldlm_extent.c */
/* ldlm_lock.c */
void ldlm_lock_free(struct ldlm_lock *lock);
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
-ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode);
+void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode);
+void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock);
+int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
+ struct ldlm_extent *extent, ldlm_mode_t mode,
+ struct ldlm_handle *lockh);
+ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock_handle,
- __u64 *res_id,
- __u32 type,
+ __u64 *res_id, __u32 type,
+ ldlm_mode_t mode,
+ void *data,
+ __u32 data_len,
struct ldlm_handle *lockh);
ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
- ldlm_mode_t mode,
struct ldlm_extent *req_ex,
int *flags,
ldlm_lock_callback completion,
- ldlm_lock_callback blocking,
- void *data,
- __u32 data_len);
-ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
- int new_mode, int *flags);
-ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh);
+ ldlm_lock_callback blocking);
+struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh,
+ int new_mode, int *flags);
+struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock);
+void ldlm_reprocess_all(struct ldlm_resource *res);
void ldlm_lock_dump(struct ldlm_lock *lock);
/* ldlm_test.c */
int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn);
/* resource.c */
-struct ldlm_namespace *ldlm_namespace_find(__u32 id);
-ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id,
- struct ldlm_namespace **);
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 local);
int ldlm_namespace_free(struct ldlm_namespace *ns);
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
void ldlm_resource_dump(struct ldlm_resource *res);
/* ldlm_request.c */
-int ldlm_cli_namespace_new(struct obd_device *, struct ptlrpc_client *,
- struct ptlrpc_connection *, __u32 ns_id);
int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer,
- __u32 ns_id,
+ struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
__u32 type,
struct ldlm_extent *req_ex,
ldlm_mode_t mode,
int *flags,
- ldlm_lock_callback completion,
- ldlm_lock_callback blocking,
void *data,
__u32 data_len,
- struct ldlm_handle *lockh,
- struct ptlrpc_request **request);
+ struct ldlm_handle *lockh);
int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
void *data, __u32 data_len);
int ldlm_cli_convert(struct ptlrpc_client *, struct ldlm_handle *,
- int new_mode, int *flags, struct ptlrpc_request **);
-int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_handle *,
- struct ptlrpc_request **);
+ int new_mode, int *flags);
+int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_lock *);
#endif /* __KERNEL__ */
*/
/* opcodes */
-#define LDLM_NAMESPACE_NEW 1
-#define LDLM_ENQUEUE 2
-#define LDLM_CONVERT 3
-#define LDLM_CANCEL 4
-#define LDLM_CALLBACK 5
+#define LDLM_ENQUEUE 1
+#define LDLM_CONVERT 2
+#define LDLM_CANCEL 3
+#define LDLM_CALLBACK 4
#define RES_NAME_SIZE 3
#define RES_VERSION_SIZE 4
};
struct ldlm_resource_desc {
- __u32 lr_ns_id;
__u32 lr_type;
__u64 lr_name[RES_NAME_SIZE];
__u64 lr_version[RES_VERSION_SIZE];
int lustre_fread(struct file *file, char *str, int len, loff_t *off);
int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off);
int lustre_fsync(struct file *file);
+
+static inline void ll_sleep(int t)
+{
+ set_current_state(TASK_INTERRUPTIBLE);
+ schedule_timeout(t * HZ);
+ set_current_state(TASK_RUNNING);
+}
+
#endif
#include <linux/portals_lib.h>
RETURN(rc);
}
+static inline int obd_enqueue(struct obd_conn *conn, struct ldlm_namespace *ns,
+ struct ldlm_handle *parent_lock, __u64 *res_id,
+ __u32 type, struct ldlm_extent *extent,
+ __u32 mode, int *flags, void *data, int datalen,
+ struct ldlm_handle *lockh)
+{
+ int rc;
+ OBD_CHECK_SETUP(conn);
+ OBD_CHECK_OP(conn, enqueue);
+
+ rc = OBP(conn->oc_dev, enqueue)(conn, ns, parent_lock, res_id, type,
+ extent, mode, flags, data, datalen,
+ lockh);
+ RETURN(rc);
+}
+
+static inline int obd_cancel(struct obd_conn *conn, __u32 mode,
+ struct ldlm_handle *lockh)
+{
+ int rc;
+ OBD_CHECK_SETUP(conn);
+ OBD_CHECK_OP(conn, cancel);
+
+ rc = OBP(conn->oc_dev, cancel)(conn, mode, lockh);
+ RETURN(rc);
+}
+
#endif
/*
ENTRY;
oa = obdo_alloc();
if ( !oa ) {
- EXIT;
- return ERR_PTR(-ENOMEM);
+ RETURN(ERR_PTR(-ENOMEM));
}
oa->o_id = id;
oa->o_valid = valid;
if ((err = OBP(conn->oc_dev, getattr)(conn, oa))) {
obdo_free(oa);
- EXIT;
- return ERR_PTR(err);
+ RETURN(ERR_PTR(err));
}
- EXIT;
- return oa;
+ RETURN(oa);
}
static inline void obdo_from_iattr(struct obdo *oa, struct iattr *attr)
{
if (MAX(a->l_extent.start, b->l_extent.start) <=
MIN(a->l_extent.end, b->l_extent.end))
- return 0;
+ RETURN(0);
- return 1;
+ RETURN(1);
}
static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex,
return 0;
}
-/* Caller should do ldlm_resource_get() on this resource first. */
+/* Args: referenced, unlocked parent (or NULL)
+ * referenced, unlocked resource
+ * Locks: parent->l_lock */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
struct ldlm_resource *resource)
{
memset(lock, 0, sizeof(*lock));
lock->l_resource = resource;
INIT_LIST_HEAD(&lock->l_children);
+ INIT_LIST_HEAD(&lock->l_res_link);
init_waitqueue_head(&lock->l_waitq);
+ lock->l_lock = SPIN_LOCK_UNLOCKED;
if (parent != NULL) {
+ spin_lock(&parent->l_lock);
lock->l_parent = parent;
list_add(&lock->l_childof, &parent->l_children);
+ spin_unlock(&parent->l_lock);
}
return lock;
}
-/* Caller must do its own ldlm_resource_put() on lock->l_resource */
+/* Args: unreferenced, locked lock
+ *
+ * Caller must do its own ldlm_resource_put() on lock->l_resource */
void ldlm_lock_free(struct ldlm_lock *lock)
{
if (!list_empty(&lock->l_children)) {
- CERROR("lock still has children!\n");
+ CERROR("lock %p still has children (%p)!\n", lock,
+ lock->l_children.next);
+ ldlm_lock_dump(lock);
LBUG();
}
+
+ if (lock->l_readers || lock->l_writers)
+ CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+ "writers)\n", lock->l_readers, lock->l_writers);
+
+ if (lock->l_connection)
+ ptlrpc_put_connection(lock->l_connection);
kmem_cache_free(ldlm_lock_slab, lock);
}
memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
}
-static int ldlm_lock_compat(struct ldlm_lock *lock)
+/* Args: unlocked lock */
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
{
- struct list_head *tmp;
+ spin_lock(&lock->l_lock);
+ if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+ lock->l_readers++;
+ else
+ lock->l_writers++;
+ spin_unlock(&lock->l_lock);
+}
+
+/* Args: unlocked lock */
+void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
+{
+ int rc;
+
+ spin_lock(&lock->l_lock);
+ if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+ lock->l_readers--;
+ else
+ lock->l_writers--;
+ if (!lock->l_readers && !lock->l_writers &&
+ lock->l_flags & LDLM_FL_DYING) {
+ /* Read this lock its rights. */
+ if (!lock->l_resource->lr_namespace->ns_local) {
+ CERROR("LDLM_FL_DYING set on non-local lock!\n");
+ LBUG();
+ }
+
+ CDEBUG(D_INFO, "final decref done on dying lock, "
+ "cancelling.\n");
+ spin_unlock(&lock->l_lock);
+ rc = ldlm_cli_cancel(lock->l_client, lock);
+ if (rc) {
+ /* FIXME: do something more dramatic */
+ CERROR("ldlm_cli_cancel: %d\n", rc);
+ }
+ } else
+ spin_unlock(&lock->l_lock);
+}
+
+/* Args: locked lock */
+static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
+ struct list_head *queue)
+{
+ struct list_head *tmp, *pos;
int rc = 0;
- list_for_each(tmp, &lock->l_resource->lr_granted) {
+ list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *child;
ldlm_res_compat compat;
child = list_entry(tmp, struct ldlm_lock, l_res_link);
+ if (lock == child)
+ continue;
compat = ldlm_res_compat_table[child->l_resource->lr_type];
- if (compat(child, lock) ||
- lockmode_compat(child->l_req_mode, lock->l_req_mode))
+ if (compat(child, lock)) {
+ CDEBUG(D_OTHER, "compat function succeeded, next.\n");
+ continue;
+ }
+ if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
+ CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
continue;
+ }
rc = 1;
- if (child->l_blocking_ast != NULL)
+ CDEBUG(D_OTHER, "compat function failed and lock modes are "
+ "incompatible; sending blocking AST.\n");
+ if (send_cbs && child->l_blocking_ast != NULL)
child->l_blocking_ast(child, lock, child->l_data,
child->l_data_len);
}
return rc;
}
-static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+/* Args: unlocked lock */
+static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
{
+ int rc;
+ ENTRY;
+
+ rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
+ rc |= _ldlm_lock_compat(lock, send_cbs,
+ &lock->l_resource->lr_converting);
+
+ RETURN(rc);
+}
+
+/* Args: locked lock, locked resource */
+void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+{
+ ENTRY;
+
ldlm_resource_add_lock(res, &res->lr_granted, lock);
lock->l_granted_mode = lock->l_req_mode;
if (lock->l_completion_ast)
lock->l_completion_ast(lock, NULL,
lock->l_data, lock->l_data_len);
+ EXIT;
}
-ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
+static int search_queue(struct list_head *queue, ldlm_mode_t mode,
+ struct ldlm_extent *extent, struct ldlm_handle *lockh)
+{
+ struct list_head *tmp;
+
+ list_for_each(tmp, queue) {
+ struct ldlm_lock *lock;
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (lock->l_flags & LDLM_FL_DYING)
+ continue;
+
+ /* lock_convert() takes the resource lock, so we're sure that
+ * req_mode, lr_type, and l_extent won't change beneath us */
+ if (lock->l_req_mode != mode)
+ continue;
+
+ if (lock->l_resource->lr_type == LDLM_EXTENT &&
+ (lock->l_extent.start > extent->start ||
+ lock->l_extent.end < extent->end))
+ continue;
+
+ ldlm_lock_addref(lock, mode);
+ ldlm_object2handle(lock, lockh);
+ return 1;
+ }
+
+ return 0;
+}
+
+/* Must be called with no resource or lock locks held.
+ *
+ * Returns 1 if it finds an already-existing lock that is compatible; in this
+ * case, lockh is filled in with a addref()ed lock */
+int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
+ struct ldlm_extent *extent, ldlm_mode_t mode,
+ struct ldlm_handle *lockh)
+{
+ struct ldlm_resource *res;
+ int rc = 0;
+ ENTRY;
+
+ res = ldlm_resource_get(ns, NULL, res_id, type, 0);
+ if (res == NULL)
+ RETURN(0);
+
+ spin_lock(&res->lr_lock);
+ if (search_queue(&res->lr_granted, mode, extent, lockh))
+ GOTO(out, rc = 1);
+ if (search_queue(&res->lr_converting, mode, extent, lockh))
+ GOTO(out, rc = 1);
+ if (search_queue(&res->lr_waiting, mode, extent, lockh))
+ GOTO(out, rc = 1);
+
+ EXIT;
+ out:
+ ldlm_resource_put(res);
+ spin_unlock(&res->lr_lock);
+ return rc;
+}
+
+/* Must be called without the resource lock held. */
+ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id, __u32 type,
+ ldlm_mode_t mode,
+ void *data,
+ __u32 data_len,
struct ldlm_handle *lockh)
{
- struct ldlm_namespace *ns;
struct ldlm_resource *res, *parent_res = NULL;
struct ldlm_lock *lock, *parent_lock;
- ns = ldlm_namespace_find(ns_id);
- if (ns == NULL || ns->ns_hash == NULL)
- RETURN(-ELDLM_BAD_NAMESPACE);
-
parent_lock = ldlm_handle2object(parent_lock_handle);
if (parent_lock)
parent_res = parent_lock->l_resource;
RETURN(-ENOMEM);
lock = ldlm_lock_new(parent_lock, res);
- if (lock == NULL)
+ if (lock == NULL) {
+ spin_lock(&res->lr_lock);
+ ldlm_resource_put(res);
+ spin_unlock(&res->lr_lock);
RETURN(-ENOMEM);
+ }
- ldlm_object2handle(lock, lockh);
+ lock->l_req_mode = mode;
+ lock->l_data = data;
+ lock->l_data_len = data_len;
+ ldlm_lock_addref(lock, mode);
+ ldlm_object2handle(lock, lockh);
return ELDLM_OK;
}
-/* XXX: Revisit the error handling; we do not, for example, do
- * ldlm_resource_put()s in our error cases, and we probably leak any allocated
- * memory. */
+/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
- ldlm_mode_t mode,
struct ldlm_extent *req_ex,
int *flags,
ldlm_lock_callback completion,
- ldlm_lock_callback blocking,
- void *data,
- __u32 data_len)
+ ldlm_lock_callback blocking)
{
+ struct ldlm_resource *res;
struct ldlm_lock *lock;
- struct ldlm_extent new_ex;
- int incompat = 0, rc;
+ int incompat = 0, local;
ldlm_res_policy policy;
ENTRY;
lock = ldlm_handle2object(lockh);
- if ((policy = ldlm_res_policy_table[lock->l_resource->lr_type])) {
- rc = policy(lock->l_resource, req_ex, &new_ex, mode, NULL);
+ res = lock->l_resource;
+ local = res->lr_namespace->ns_local;
+ spin_lock(&res->lr_lock);
+
+ lock->l_blocking_ast = blocking;
+
+ if ((res->lr_type == LDLM_EXTENT && !req_ex) ||
+ (res->lr_type != LDLM_EXTENT && req_ex))
+ LBUG();
+
+ if ((policy = ldlm_res_policy_table[res->lr_type])) {
+ struct ldlm_extent new_ex;
+ int rc = policy(res, req_ex, &new_ex, lock->l_req_mode, NULL);
if (rc == ELDLM_LOCK_CHANGED) {
*flags |= LDLM_FL_LOCK_CHANGED;
memcpy(req_ex, &new_ex, sizeof(new_ex));
}
}
- if ((lock->l_resource->lr_type == LDLM_EXTENT && !req_ex) ||
- (lock->l_resource->lr_type != LDLM_EXTENT && req_ex))
- LBUG();
if (req_ex)
memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
- lock->l_req_mode = mode;
- lock->l_data = data;
- lock->l_data_len = data_len;
- lock->l_blocking_ast = blocking;
- spin_lock(&lock->l_resource->lr_lock);
- /* FIXME: We may want to optimize by checking lr_most_restr */
+ if (local && lock->l_req_mode == lock->l_granted_mode) {
+ /* The server returned a blocked lock, but it was granted before
+ * we got a chance to actually enqueue it. We don't need to do
+ * anything else. */
+ GOTO(out, ELDLM_OK);
+ }
- if (!list_empty(&lock->l_resource->lr_converting)) {
- ldlm_resource_add_lock(lock->l_resource,
- lock->l_resource->lr_waiting.prev, lock);
+ /* If this is a local resource, put it on the appropriate list. */
+ if (local) {
+ if (*flags & LDLM_FL_BLOCK_CONV)
+ ldlm_resource_add_lock(res, res->lr_converting.prev,
+ lock);
+ else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ else
+ ldlm_grant_lock(res, lock);
+ GOTO(out, ELDLM_OK);
+ }
+
+ /* FIXME: We may want to optimize by checking lr_most_restr */
+ if (!list_empty(&res->lr_converting)) {
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_CONV;
GOTO(out, ELDLM_OK);
}
- if (!list_empty(&lock->l_resource->lr_waiting)) {
- ldlm_resource_add_lock(lock->l_resource,
- lock->l_resource->lr_waiting.prev, lock);
+ if (!list_empty(&res->lr_waiting)) {
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_WAIT;
GOTO(out, ELDLM_OK);
}
-
- incompat = ldlm_lock_compat(lock);
+ incompat = ldlm_lock_compat(lock, 0);
if (incompat) {
- ldlm_resource_add_lock(lock->l_resource,
- lock->l_resource->lr_waiting.prev, lock);
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_GRANTED;
GOTO(out, ELDLM_OK);
}
- ldlm_grant_lock(lock->l_resource, lock);
+ ldlm_grant_lock(res, lock);
EXIT;
out:
/* Don't set 'completion_ast' until here so that if the lock is granted
* immediately we don't do an unnecessary completion call. */
lock->l_completion_ast = completion;
- spin_unlock(&lock->l_resource->lr_lock);
+ spin_unlock(&res->lr_lock);
return ELDLM_OK;
}
+/* Must be called with resource->lr_lock taken. */
static int ldlm_reprocess_queue(struct ldlm_resource *res,
struct list_head *converting)
{
struct list_head *tmp, *pos;
- int incompat = 0;
+ ENTRY;
list_for_each_safe(tmp, pos, converting) {
struct ldlm_lock *pending;
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
- incompat = ldlm_lock_compat(pending);
- if (incompat)
- break;
+ /* the resource lock protects ldlm_lock_compat */
+ if (ldlm_lock_compat(pending, 1))
+ RETURN(1);
- list_del(&pending->l_res_link);
+ list_del_init(&pending->l_res_link);
ldlm_grant_lock(res, pending);
+
+ ldlm_lock_addref(pending, pending->l_req_mode);
+ ldlm_lock_decref(pending, pending->l_granted_mode);
}
- return incompat;
+ RETURN(0);
}
-static void ldlm_reprocess_all(struct ldlm_resource *res)
+/* Must be called with resource->lr_lock not taken. */
+void ldlm_reprocess_all(struct ldlm_resource *res)
{
+ /* Local lock trees don't get reprocessed. */
+ if (res->lr_namespace->ns_local)
+ return;
+
+ spin_lock(&res->lr_lock);
ldlm_reprocess_queue(res, &res->lr_converting);
if (list_empty(&res->lr_converting))
ldlm_reprocess_queue(res, &res->lr_waiting);
+ spin_unlock(&res->lr_lock);
}
-ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh)
+/* Must be called with lock and lock->l_resource unlocked */
+struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
{
- struct ldlm_lock *lock;
struct ldlm_resource *res;
ENTRY;
- lock = ldlm_handle2object(lockh);
res = lock->l_resource;
- ldlm_resource_del_lock(lock);
+ spin_lock(&res->lr_lock);
+ spin_lock(&lock->l_lock);
- ldlm_lock_free(lock);
+ if (lock->l_readers || lock->l_writers)
+ CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+ "writers)\n", lock->l_readers, lock->l_writers);
+
+ ldlm_resource_del_lock(lock);
if (ldlm_resource_put(res))
- RETURN(ELDLM_OK);
- ldlm_reprocess_all(res);
+ res = NULL; /* res was freed, nothing else to do. */
+ else
+ spin_unlock(&res->lr_lock);
+ ldlm_lock_free(lock);
- RETURN(ELDLM_OK);
+ RETURN(res);
}
-ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
- int new_mode, int *flags)
+/* Must be called with lock and lock->l_resource unlocked */
+struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh,
+ int new_mode, int *flags)
{
struct ldlm_lock *lock;
struct ldlm_resource *res;
lock = ldlm_handle2object(lockh);
res = lock->l_resource;
- list_del(&lock->l_res_link);
- lock->l_req_mode = new_mode;
- list_add(&lock->l_res_link, res->lr_converting.prev);
+ spin_lock(&res->lr_lock);
+
+ lock->l_req_mode = new_mode;
+ list_del_init(&lock->l_res_link);
+
+ /* If this is a local resource, put it on the appropriate list. */
+ if (res->lr_namespace->ns_local) {
+ if (*flags & LDLM_FL_BLOCK_CONV)
+ ldlm_resource_add_lock(res, res->lr_converting.prev,
+ lock);
+ else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ else
+ ldlm_grant_lock(res, lock);
+ } else {
+ list_add(&lock->l_res_link, res->lr_converting.prev);
+ }
- ldlm_reprocess_all(res);
+ spin_unlock(&res->lr_lock);
- RETURN(ELDLM_OK);
+ RETURN(res);
}
void ldlm_lock_dump(struct ldlm_lock *lock)
{
char ver[128];
+ if (!(portal_debug & D_OTHER))
+ return;
+
if (RES_VERSION_SIZE != 4)
LBUG();
CDEBUG(D_OTHER, " Resource: %p\n", lock->l_resource);
CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
(int)lock->l_req_mode, (int)lock->l_granted_mode);
+ CDEBUG(D_OTHER, " Readers: %u ; Writers: %u\n",
+ lock->l_readers, lock->l_writers);
if (lock->l_resource->lr_type == LDLM_EXTENT)
CDEBUG(D_OTHER, " Extent: %Lu -> %Lu\n",
(unsigned long long)lock->l_extent.start,
*/
#define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_LDLM
-#include <linux/version.h>
#include <linux/module.h>
#include <linux/slab.h>
-#include <asm/unistd.h>
-
-#define DEBUG_SUBSYSTEM S_LDLM
-
#include <linux/lustre_dlm.h>
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
-static int _ldlm_namespace_new(struct obd_device *obddev,
- struct ptlrpc_request *req)
+#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
+
+static int is_local_conn(struct ptlrpc_connection *conn)
{
- struct ldlm_request *dlm_req;
- struct ldlm_namespace *ns;
- int rc;
- ldlm_error_t err;
ENTRY;
+ if (conn == NULL)
+ RETURN(1);
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc) {
- CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
- }
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ RETURN(LOOPBACK(conn->c_peer.peer_nid));
+}
- err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
- &ns);
- req->rq_status = err;
+/* _ldlm_callback and local_callback setup the variables then call this common
+ * code */
+static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+ ldlm_mode_t mode, void *data, __u32 data_len)
+{
+ ENTRY;
+ ldlm_lock_dump(lock);
- CERROR("err = %d\n", err);
+ if (!lock)
+ LBUG();
+ if (!lock->l_resource)
+ LBUG();
+ spin_lock(&lock->l_resource->lr_lock);
+ spin_lock(&lock->l_lock);
+ if (!new) {
+ CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
+ lock->l_req_mode = mode;
+ list_del_init(&lock->l_res_link);
+ ldlm_grant_lock(lock->l_resource, lock);
+ wake_up(&lock->l_waitq);
+ spin_unlock(&lock->l_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
+ } else {
+ CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
+ lock->l_flags |= LDLM_FL_DYING;
+ spin_unlock(&lock->l_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
+ if (!lock->l_readers && !lock->l_writers) {
+ CDEBUG(D_INFO, "Lock already unused, canceling.\n");
+ if (ldlm_cli_cancel(lock->l_client, lock))
+ LBUG();
+ } else {
+ CDEBUG(D_INFO, "Lock still has references; lock will be"
+ " cancelled later.\n");
+ }
+ }
RETURN(0);
}
-static int _ldlm_enqueue(struct ptlrpc_request *req)
+static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
+ void *data, __u32 data_len)
+{
+ struct ldlm_lock *lock;
+ /* the 'remote handle' is the lock in the FS's namespace */
+ lock = ldlm_handle2object(&l->l_remote_handle);
+
+ return common_callback(lock, new, l->l_granted_mode, data, data_len);
+}
+
+static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
int rc, size = sizeof(*dlm_rep);
ldlm_error_t err;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock = NULL;
+ ldlm_lock_callback callback;
ENTRY;
+ /* Is this lock managed locally? */
+ if (is_local_conn(req->rq_connection))
+ callback = local_callback;
+ else
+ callback = ldlm_cli_callback;
+
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
sizeof(dlm_rep->lock_extent));
dlm_rep->flags = dlm_req->flags;
- err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
+ err = ldlm_local_lock_create(obddev->obd_namespace,
&dlm_req->lock_handle2,
dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
+ dlm_req->lock_desc.l_req_mode,
+ lustre_msg_buf(req->rq_reqmsg, 1),
+ req->rq_reqmsg->buflens[1],
&dlm_rep->lock_handle);
if (err != ELDLM_OK)
GOTO(out, err);
err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
- dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_extent,
&dlm_rep->flags,
- ldlm_cli_callback,
- ldlm_cli_callback,
- lustre_msg_buf(req->rq_reqmsg, 1),
- req->rq_reqmsg->buflens[1]);
+ callback, callback);
if (err != ELDLM_OK)
GOTO(out, err);
EXIT;
out:
req->rq_status = err;
- CERROR("err = %d\n", err);
+ CDEBUG(D_INFO, "err = %d\n", err);
+
+ if (ptlrpc_reply(svc, req))
+ LBUG();
+
+ if (!err) {
+ ldlm_reprocess_all(lock->l_resource);
+ }
return 0;
}
-static int _ldlm_convert(struct ptlrpc_request *req)
+static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
- struct ldlm_request *dlm_req;
- int rc;
+ struct ldlm_request *dlm_req, *dlm_rep;
+ struct ldlm_resource *res;
+ int rc, size = sizeof(*dlm_rep);
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+ dlm_rep->flags = dlm_req->flags;
+
+ res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
+ dlm_req->lock_desc.l_req_mode,
+ &dlm_rep->flags);
+ req->rq_status = 0;
+ if (ptlrpc_reply(svc, req) != 0)
+ LBUG();
+
+ ldlm_reprocess_all(res);
- req->rq_status =
- ldlm_local_lock_convert(&dlm_req->lock_handle1,
- dlm_req->lock_desc.l_req_mode,
- &dlm_req->flags);
RETURN(0);
}
-static int _ldlm_cancel(struct ptlrpc_request *req)
+static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res;
int rc;
ENTRY;
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
+ lock = ldlm_handle2object(&dlm_req->lock_handle1);
+ res = ldlm_local_lock_cancel(lock);
+ req->rq_status = 0;
+ if (ptlrpc_reply(svc, req) != 0)
+ LBUG();
+
+ if (res != NULL)
+ ldlm_reprocess_all(res);
+
RETURN(0);
}
-static int _ldlm_callback(struct ptlrpc_request *req)
+static int _ldlm_callback(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock1, *lock2;
int rc;
ENTRY;
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- lock = ldlm_handle2object(&dlm_req->lock_handle1);
- ldlm_lock_dump(lock);
- if (dlm_req->lock_handle2.addr) {
- CERROR("Got blocked callback for lock %p.\n", lock);
- /* FIXME: do something impressive. */
- } else {
- CERROR("Got granted callback for lock %p.\n", lock);
- lock->l_granted_mode = lock->l_req_mode;
- wake_up(&lock->l_waitq);
- }
+ /* We must send the reply first, so that the thread is free to handle
+ * any requests made in common_callback() */
+ rc = ptlrpc_reply(svc, req);
+ if (rc != 0)
+ RETURN(rc);
- req->rq_status = 0;
+ lock1 = ldlm_handle2object(&dlm_req->lock_handle1);
+ lock2 = ldlm_handle2object(&dlm_req->lock_handle2);
+ common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
+ 0);
RETURN(0);
}
}
switch (req->rq_reqmsg->opc) {
- case LDLM_NAMESPACE_NEW:
- CDEBUG(D_INODE, "namespace_new\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
- rc = _ldlm_namespace_new(dev, req);
- break;
-
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = _ldlm_enqueue(req);
+ rc = _ldlm_enqueue(dev, svc, req);
break;
case LDLM_CONVERT:
CDEBUG(D_INODE, "convert\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
- rc = _ldlm_convert(req);
+ rc = _ldlm_convert(svc, req);
break;
case LDLM_CANCEL:
CDEBUG(D_INODE, "cancel\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
- rc = _ldlm_cancel(req);
+ rc = _ldlm_cancel(svc, req);
break;
case LDLM_CALLBACK:
CDEBUG(D_INODE, "callback\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
- rc = _ldlm_callback(req);
+ rc = _ldlm_callback(svc, req);
break;
default:
RETURN(rc);
}
+ EXIT;
out:
if (rc)
RETURN(ptlrpc_error(svc, req));
- else
- RETURN(ptlrpc_reply(svc, req));
+ return 0;
}
static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
return err;
}
-static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
+static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
int err;
ENTRY;
- ldlm_spinlock = SPIN_LOCK_UNLOCKED;
+ obddev->obd_namespace = ldlm_namespace_new(obddev, 0);
+ if (obddev->obd_namespace == NULL)
+ LBUG();
ldlm->ldlm_service =
ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
CERROR("cannot start thread\n");
LBUG();
}
+ err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
+ if (err) {
+ CERROR("cannot start thread\n");
+ LBUG();
+ }
OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
if (ldlm->ldlm_client == NULL)
RETURN(0);
}
-static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
-{
- struct list_head *tmp, *pos;
- int rc = 0;
-
- list_for_each_safe(tmp, pos, q) {
- struct ldlm_lock *lock;
-
- if (rc) {
- /* Res was already cleaned up. */
- LBUG();
- }
-
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
- ldlm_resource_del_lock(lock);
- ldlm_lock_free(lock);
- rc = ldlm_resource_put(res);
- }
-
- return rc;
-}
-
-static int do_free_namespace(struct ldlm_namespace *ns)
-{
- struct list_head *tmp, *pos;
- int i, rc;
-
- for (i = 0; i < RES_HASH_SIZE; i++) {
- list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
- struct ldlm_resource *res;
- res = list_entry(tmp, struct ldlm_resource, lr_hash);
- list_del_init(&res->lr_hash);
-
- rc = cleanup_resource(res, &res->lr_granted);
- if (!rc)
- rc = cleanup_resource(res, &res->lr_converting);
- if (!rc)
- rc = cleanup_resource(res, &res->lr_waiting);
-
- while (rc == 0)
- rc = ldlm_resource_put(res);
- }
- }
-
- return ldlm_namespace_free(ns);
-}
-
-static int ldlm_free_all(struct obd_device *obddev)
-{
- struct list_head *tmp, *pos;
- int rc = 0;
-
- ldlm_lock();
-
- list_for_each_safe(tmp, pos, &ldlm_namespaces) {
- struct ldlm_namespace *ns;
- ns = list_entry(tmp, struct ldlm_namespace, ns_link);
-
- rc |= do_free_namespace(ns);
- }
-
- ldlm_unlock();
-
- return rc;
-}
-
static int ldlm_cleanup(struct obd_device *obddev)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
ENTRY;
+ ldlm_namespace_free(obddev->obd_namespace);
+
ptlrpc_stop_all_threads(ldlm->ldlm_service);
rpc_unregister_service(ldlm->ldlm_service);
OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
- if (ldlm_free_all(obddev)) {
- CERROR("ldlm_free_all could not complete.\n");
- RETURN(-1);
- }
-
MOD_DEC_USE_COUNT;
RETURN(0);
}
#include <linux/lustre_dlm.h>
-#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
-
-static int is_local_conn(struct ptlrpc_connection *conn)
-{
- ENTRY;
- if (conn == NULL)
- RETURN(1);
-
- RETURN(LOOPBACK(conn->c_peer.peer_nid));
-}
-
int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
- __u32 ns_id,
+ struct ldlm_namespace *ns,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
__u32 type,
struct ldlm_extent *req_ex,
ldlm_mode_t mode,
int *flags,
- ldlm_lock_callback completion,
- ldlm_lock_callback blocking,
void *data,
__u32 data_len,
- struct ldlm_handle *lockh,
- struct ptlrpc_request **request)
+ struct ldlm_handle *lockh)
{
- struct ldlm_handle local_lockh;
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- struct ptlrpc_request *req = NULL;
+ struct ptlrpc_request *req;
char *bufs[2] = {NULL, data};
int rc, size[2] = {sizeof(*body), data_len};
- ldlm_error_t err;
ENTRY;
- err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
- type, &local_lockh);
- if (err != ELDLM_OK)
- RETURN(err);
+ *flags = 0;
+ rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode,
+ NULL, 0, lockh);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
- lock = ldlm_handle2object(&local_lockh);
- /* Is this lock locally managed? */
- if (is_local_conn(conn))
- GOTO(local, 0);
+ lock = ldlm_handle2object(lockh);
+ spin_unlock(&lock->l_lock);
req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
if (!req)
GOTO(out, rc = -ENOMEM);
/* Dump all of this data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->lock_desc.l_resource.lr_ns_id = ns_id;
body->lock_desc.l_resource.lr_type = type;
memcpy(body->lock_desc.l_resource.lr_name, res_id,
sizeof(body->lock_desc.l_resource.lr_name));
sizeof(body->lock_desc.l_extent));
body->flags = *flags;
- memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
+ memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
if (parent_lock_handle)
memcpy(&body->lock_handle2, parent_lock_handle,
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
if (rc != ELDLM_OK) {
+ spin_lock(&lock->l_resource->lr_lock);
ldlm_resource_put(lock->l_resource);
+ spin_unlock(&lock->l_resource->lr_lock);
ldlm_lock_free(lock);
GOTO(out, rc);
}
lock->l_connection = conn;
+ lock->l_client = cl;
reply = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
+ memcpy(req_ex, &reply->lock_extent, sizeof(*req_ex));
*flags = reply->flags;
- CERROR("remote handle: %p, flags: %d\n",
+ CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
(void *)(unsigned long)reply->lock_handle.addr, *flags);
- CERROR("extent: %Lu -> %Lu\n",
+ CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
(unsigned long long)reply->lock_extent.start,
(unsigned long long)reply->lock_extent.end);
- EXIT;
- local:
- rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
- completion, blocking, data, data_len);
+ ptlrpc_free_req(req);
+
+ rc = ldlm_local_lock_enqueue(lockh, req_ex, flags, NULL, NULL);
+
if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV)) {
+ LDLM_FL_BLOCK_CONV)) {
/* Go to sleep until the lock is granted. */
/* FIXME: or cancelled. */
+ CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
+ "going to sleep.\n", lock);
+ ldlm_lock_dump(lock);
wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
lock->l_granted_mode);
+ CDEBUG(D_NET, "waking up, the lock must be granted.\n");
}
- out:
- *request = req;
- return rc;
-}
-
-int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn, __u32 ns_id)
-{
- struct ldlm_namespace *ns;
- struct ldlm_request *body;
- struct ptlrpc_request *req;
- int rc, size = sizeof(*body);
- ENTRY;
-
- if (is_local_conn(conn))
- GOTO(local, 0);
-
- req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->lock_desc.l_resource.lr_ns_id = ns_id;
-
- req->rq_replen = lustre_msg_size(0, NULL);
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
- if (rc)
- GOTO(out, rc);
-
EXIT;
- local:
- rc = ldlm_namespace_new(obddev, ns_id, &ns);
- if (rc != ELDLM_OK) {
- /* XXX: It succeeded remotely but failed locally. What to do? */
- CERROR("Local ldlm_namespace_new failed.\n");
- }
out:
return rc;
}
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
- if (new != NULL) {
+ if (new == NULL) {
+ CDEBUG(D_NET, "Sending granted AST\n");
+ ldlm_lock2desc(lock, &body->lock_desc);
+ } else {
+ CDEBUG(D_NET, "Sending blocked AST\n");
ldlm_lock2desc(new, &body->lock_desc);
ldlm_object2handle(new, &body->lock_handle2);
}
}
int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
- int new_mode, int *flags, struct ptlrpc_request **request)
+ int new_mode, int *flags)
{
struct ldlm_request *body;
- struct ldlm_reply *reply;
struct ldlm_lock *lock;
- struct ptlrpc_request *req = NULL;
+ struct ldlm_resource *res;
+ struct ptlrpc_request *req;
int rc, size[2] = {sizeof(*body), 0};
char *bufs[2] = {NULL, NULL};
ENTRY;
lock = ldlm_handle2object(lockh);
-
- if (is_local_conn(lock->l_connection))
- GOTO(local, 0);
+ *flags = 0;
size[1] = lock->l_data_len;
bufs[1] = lock->l_data;
if (rc != ELDLM_OK)
GOTO(out, rc);
- reply = lustre_msg_buf(req->rq_repmsg, 0);
- *flags = reply->flags;
-
+ body = lustre_msg_buf(req->rq_repmsg, 0);
+ res = ldlm_local_lock_convert(lockh, new_mode, &body->flags);
+ if (res != NULL)
+ ldlm_reprocess_all(res);
+ if (lock->l_req_mode != lock->l_granted_mode) {
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ CDEBUG(D_NET, "convert returned a blocked lock, "
+ "going to sleep.\n");
+ wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
+ lock->l_granted_mode);
+ CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+ }
EXIT;
- local:
- rc = ldlm_local_lock_convert(lockh, new_mode, flags);
out:
- *request = req;
+ ptlrpc_free_req(req);
return rc;
}
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
- struct ptlrpc_request **request)
+int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
{
struct ldlm_request *body;
- struct ldlm_lock *lock;
- struct ptlrpc_request *req = NULL;
+ struct ptlrpc_request *req;
+ struct ldlm_resource *res;
int rc, size[2] = {sizeof(*body), 0};
char *bufs[2] = {NULL, NULL};
ENTRY;
- lock = ldlm_handle2object(lockh);
-
- if (is_local_conn(lock->l_connection))
- GOTO(local, 0);
+ if (lock->l_data_len == sizeof(struct inode)) {
+ /* FIXME: do something better than throwing away everything */
+ struct inode *inode = lock->l_data;
+ if (inode == NULL)
+ LBUG();
+ down(&inode->i_sem);
+ invalidate_inode_pages(inode);
+ up(&inode->i_sem);
+ }
size[1] = lock->l_data_len;
bufs[1] = lock->l_data;
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
if (rc != ELDLM_OK)
GOTO(out, rc);
+ res = ldlm_local_lock_cancel(lock);
+ if (res != NULL)
+ ldlm_reprocess_all(res);
EXIT;
- local:
- rc = ldlm_local_lock_cancel(lockh);
out:
- *request = req;
return rc;
}
#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_LDLM
-#include <linux/slab.h>
#include <linux/lustre_dlm.h>
kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
-LIST_HEAD(ldlm_namespaces);
-spinlock_t ldlm_spinlock;
-
-struct ldlm_namespace *ldlm_namespace_find(__u32 id)
-{
- struct list_head *tmp;
- struct ldlm_namespace *res;
-
- res = NULL;
- list_for_each(tmp, &ldlm_namespaces) {
- struct ldlm_namespace *chk;
- chk = list_entry(tmp, struct ldlm_namespace, ns_link);
-
- if ( chk->ns_id == id ) {
- res = chk;
- break;
- }
- }
-
- return res;
-}
-
-/* this must be called with ldlm_lock() held */
-static int res_hash_init(struct ldlm_namespace *ns)
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev,
+ __u32 local)
{
- struct list_head *res_hash;
+ struct ldlm_namespace *ns;
struct list_head *bucket;
- if (ns->ns_hash != NULL)
- RETURN(0);
-
- /* FIXME: this memory appears to be leaked */
- OBD_ALLOC(res_hash, sizeof(*res_hash) * RES_HASH_SIZE);
- if (!res_hash) {
+ OBD_ALLOC(ns, sizeof(*ns));
+ if (!ns) {
LBUG();
- RETURN(-ENOMEM);
+ RETURN(NULL);
}
+ OBD_ALLOC(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
+ if (!ns->ns_hash) {
+ OBD_FREE(ns, sizeof(*ns));
+ LBUG();
+ RETURN(NULL);
+ }
+
+ ns->ns_obddev = obddev;
+ INIT_LIST_HEAD(&ns->ns_root_list);
+ ns->ns_lock = SPIN_LOCK_UNLOCKED;
+ ns->ns_refcount = 0;
+ ns->ns_local = local;
- for (bucket = res_hash + RES_HASH_SIZE - 1; bucket >= res_hash;
+ for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
bucket--)
INIT_LIST_HEAD(bucket);
- ns->ns_hash = res_hash;
-
- return 0;
+ return ns;
}
-ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id,
- struct ldlm_namespace **ns_out)
+static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
{
- struct ldlm_namespace *ns;
- int rc;
+ struct list_head *tmp, *pos;
+ int rc = 0;
- if (ldlm_namespace_find(id))
- RETURN(-ELDLM_NAMESPACE_EXISTS);
+ list_for_each_safe(tmp, pos, q) {
+ struct ldlm_lock *lock;
- OBD_ALLOC(ns, sizeof(*ns));
- if (!ns) {
- LBUG();
- RETURN(-ENOMEM);
- }
+ if (rc) {
+ /* Res was already cleaned up. */
+ LBUG();
+ }
- ns->ns_id = id;
- ns->ns_obddev = obddev;
- INIT_LIST_HEAD(&ns->ns_root_list);
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ spin_lock(&lock->l_lock);
+ ldlm_resource_del_lock(lock);
+ ldlm_lock_free(lock);
- rc = res_hash_init(ns);
- if (rc) {
- OBD_FREE(ns, sizeof(*ns));
- RETURN(rc);
+ rc = ldlm_resource_put(res);
}
- list_add(&ns->ns_link, &ldlm_namespaces);
- atomic_set(&ns->ns_refcount, 0);
- *ns_out = ns;
- return ELDLM_OK;
+ return rc;
}
int ldlm_namespace_free(struct ldlm_namespace *ns)
{
- if (atomic_read(&ns->ns_refcount))
- RETURN(-EBUSY);
+ struct list_head *tmp, *pos;
+ int i, rc;
+
+ /* We should probably take the ns_lock, but then ldlm_resource_put
+ * couldn't take it. Hmm. */
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+ struct ldlm_resource *res;
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
+
+ spin_lock(&res->lr_lock);
+ rc = cleanup_resource(res, &res->lr_granted);
+ if (!rc)
+ rc = cleanup_resource(res, &res->lr_converting);
+ if (!rc)
+ rc = cleanup_resource(res, &res->lr_waiting);
+
+ if (rc == 0) {
+ CERROR("Resource refcount nonzero after lock "
+ "cleanup; forcing cleanup.\n");
+ res->lr_refcount = 1;
+ rc = ldlm_resource_put(res);
+ }
+ }
+ }
- list_del(&ns->ns_link);
OBD_FREE(ns->ns_hash, sizeof(struct list_head) * RES_HASH_SIZE);
OBD_FREE(ns, sizeof(*ns));
- return 0;
+ return ELDLM_OK;
}
static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
struct ldlm_resource *res;
res = kmem_cache_alloc(ldlm_resource_slab, SLAB_KERNEL);
- if (res == NULL)
+ if (res == NULL) {
LBUG();
+ return NULL;
+ }
memset(res, 0, sizeof(*res));
INIT_LIST_HEAD(&res->lr_children);
INIT_LIST_HEAD(&res->lr_waiting);
res->lr_lock = SPIN_LOCK_UNLOCKED;
-
- atomic_set(&res->lr_refcount, 1);
+ res->lr_refcount = 1;
return res;
}
-/* ldlm_lock() must be taken before calling resource_add */
+/* Args: locked namespace
+ * Returns: newly-allocated, referenced, unlocked resource */
static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
__u64 *name, __u32 type)
{
struct list_head *bucket;
struct ldlm_resource *res;
+ ENTRY;
- bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
+ if (type < 0 || type > LDLM_MAX_TYPE) {
+ LBUG();
+ RETURN(NULL);
+ }
res = ldlm_resource_new();
- if (!res)
+ if (!res) {
LBUG();
+ RETURN(NULL);
+ }
memcpy(res->lr_name, name, sizeof(res->lr_name));
res->lr_namespace = ns;
- if (type < 0 || type > LDLM_MAX_TYPE)
- LBUG();
+ ns->ns_refcount++;
res->lr_type = type;
res->lr_most_restr = LCK_NL;
+
+ bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
list_add(&res->lr_hash, bucket);
- atomic_inc(&ns->ns_refcount);
+
if (parent == NULL) {
res->lr_parent = res;
list_add(&res->lr_rootlink, &ns->ns_root_list);
list_add(&res->lr_childof, &parent->lr_children);
}
- return res;
+ RETURN(res);
}
+/* Args: unlocked namespace
+ * Locks: takes and releases ns->ns_lock and res->lr_lock
+ * Returns: referenced, unlocked ldlm_resource or NULL */
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
__u64 *name, __u32 type, int create)
{
struct list_head *bucket;
struct list_head *tmp = bucket;
- struct ldlm_resource *res;
+ struct ldlm_resource *res = NULL;
ENTRY;
if (ns->ns_hash == NULL)
RETURN(NULL);
+
+ spin_lock(&ns->ns_lock);
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
- res = NULL;
list_for_each(tmp, bucket) {
struct ldlm_resource *chk;
chk = list_entry(tmp, struct ldlm_resource, lr_hash);
if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
res = chk;
- atomic_inc(&res->lr_refcount);
+ spin_lock(&res->lr_lock);
+ res->lr_refcount++;
+ spin_unlock(&res->lr_lock);
+ EXIT;
break;
}
}
if (res == NULL && create)
res = ldlm_resource_add(ns, parent, name, type);
+ spin_unlock(&ns->ns_lock);
RETURN(res);
}
+/* Args: locked resource
+ * Locks: takes and releases res->lr_lock
+ * takes and releases ns->ns_lock iff res->lr_refcount falls to 0
+ */
int ldlm_resource_put(struct ldlm_resource *res)
{
- int rc = 0;
+ int rc = 0;
- if (atomic_read(&res->lr_refcount) <= 0)
- LBUG();
+ if (res->lr_refcount == 1) {
+ struct ldlm_namespace *ns = res->lr_namespace;
+ ENTRY;
+
+ spin_unlock(&res->lr_lock);
+ spin_lock(&ns->ns_lock);
+ spin_lock(&res->lr_lock);
+
+ if (res->lr_refcount != 1) {
+ spin_unlock(&ns->ns_lock);
+ goto out;
+ }
- if (atomic_dec_and_test(&res->lr_refcount)) {
if (!list_empty(&res->lr_granted))
LBUG();
if (!list_empty(&res->lr_waiting))
LBUG();
- atomic_dec(&res->lr_namespace->ns_refcount);
+ if (!list_empty(&res->lr_children))
+ LBUG();
+
+ ns->ns_refcount--;
list_del(&res->lr_hash);
list_del(&res->lr_rootlink);
list_del(&res->lr_childof);
kmem_cache_free(ldlm_resource_slab, res);
+ spin_unlock(&ns->ns_lock);
rc = 1;
+ } else {
+ ENTRY;
+ out:
+ res->lr_refcount--;
+ if (res->lr_refcount < 0)
+ LBUG();
}
- return rc;
+ RETURN(rc);
}
+/* Must be called with resource->lr_lock taken */
void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
struct ldlm_lock *lock)
{
list_add(&lock->l_res_link, head);
- atomic_inc(&res->lr_refcount);
+ res->lr_refcount++;
}
+/* Must be called with resource->lr_lock taken */
void ldlm_resource_del_lock(struct ldlm_lock *lock)
{
- list_del(&lock->l_res_link);
- atomic_dec(&lock->l_resource->lr_refcount);
+ list_del_init(&lock->l_res_link);
+ lock->l_resource->lr_refcount--;
}
int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
{
- desc->lr_ns_id = res->lr_namespace->ns_id;
desc->lr_type = res->lr_type;
memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name));
memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
(unsigned long long)res->lr_name[2]);
CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name);
- CDEBUG(D_OTHER, "Namespace: %p (%u)\n", res->lr_namespace,
- res->lr_namespace->ns_id);
+ CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace);
CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);
CDEBUG(D_OTHER, "Granted locks:\n");
ldlm_lock_dump(lock);
}
}
-
struct ldlm_handle lockh_1, lockh_2;
int flags;
- ldlm_lock();
-
- err = ldlm_namespace_new(obddev, 1, &ns);
- if (err != ELDLM_OK)
+ ns = ldlm_namespace_new(obddev, 0);
+ if (ns == NULL)
LBUG();
- err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_1);
- err = ldlm_local_lock_enqueue(&lockh_1, LCK_CR, NULL, &flags, NULL,
- ldlm_test_callback, NULL, 0);
+ err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_CR,
+ NULL, 0, &lockh_1);
+ err = ldlm_local_lock_enqueue(&lockh_1, NULL, &flags,
+ ldlm_test_callback, ldlm_test_callback);
if (err != ELDLM_OK)
LBUG();
- err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_2);
- err = ldlm_local_lock_enqueue(&lockh_2, LCK_EX, NULL, &flags, NULL,
- ldlm_test_callback, NULL, 0);
+ err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_EX,
+ NULL, 0, &lockh_2);
+ err = ldlm_local_lock_enqueue(&lockh_2, NULL, &flags,
+ ldlm_test_callback, ldlm_test_callback);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_BLOCK_GRANTED))
LBUG();
ldlm_resource_dump(res);
- err = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags);
- if (err != ELDLM_OK)
- LBUG();
+ res = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags);
+ if (res != NULL)
+ ldlm_reprocess_all(res);
ldlm_resource_dump(res);
- ldlm_unlock();
+ ldlm_namespace_free(ns);
return 0;
}
{
struct ldlm_namespace *ns;
struct ldlm_resource *res;
+ struct ldlm_lock *lock;
__u64 res_id[RES_NAME_SIZE] = {0, 0, 0};
struct ldlm_extent ext1 = {4, 6}, ext2 = {6, 9}, ext3 = {10, 11};
struct ldlm_handle ext1_h, ext2_h, ext3_h;
ldlm_error_t err;
int flags;
- ldlm_lock();
-
- err = ldlm_namespace_new(obddev, 2, &ns);
- if (err != ELDLM_OK)
+ ns = ldlm_namespace_new(obddev, 0);
+ if (ns == NULL)
LBUG();
flags = 0;
- err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext1_h);
- err = ldlm_local_lock_enqueue(&ext1_h, LCK_PR, &ext1, &flags, NULL,
- NULL, NULL, 0);
+ err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR,
+ NULL, 0, &ext1_h);
+ err = ldlm_local_lock_enqueue(&ext1_h, &ext1, &flags, NULL, NULL);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_LOCK_CHANGED))
LBUG();
flags = 0;
- err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext2_h);
- err = ldlm_local_lock_enqueue(&ext2_h, LCK_PR, &ext2, &flags, NULL,
- NULL, NULL, 0);
+ err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR,
+ NULL, 0, &ext2_h);
+ err = ldlm_local_lock_enqueue(&ext2_h, &ext2, &flags, NULL, NULL);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_LOCK_CHANGED))
LBUG();
flags = 0;
- err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext3_h);
- err = ldlm_local_lock_enqueue(&ext3_h, LCK_EX, &ext3, &flags, NULL,
- NULL, NULL, 0);
+ err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_EX,
+ NULL, 0, &ext3_h);
+ err = ldlm_local_lock_enqueue(&ext3_h, &ext3, &flags, NULL, NULL);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_BLOCK_GRANTED))
/* Convert/cancel blocking locks */
flags = 0;
- err = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags);
- if (err != ELDLM_OK)
- LBUG();
+ res = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags);
+ if (res != NULL)
+ ldlm_reprocess_all(res);
flags = 0;
- err = ldlm_local_lock_cancel(&ext2_h);
- if (err != ELDLM_OK)
- LBUG();
+ lock = ldlm_handle2object(&ext2_h);
+ res = ldlm_local_lock_cancel(lock);
+ if (res != NULL)
+ ldlm_reprocess_all(res);
/* Dump the results */
res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
if (res == NULL)
LBUG();
ldlm_resource_dump(res);
-
- ldlm_unlock();
+ ldlm_namespace_free(ns);
return 0;
}
struct ptlrpc_connection *conn)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
- struct ptlrpc_request *request;
__u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
struct ldlm_extent ext = {4, 6};
- struct ldlm_handle lockh1, lockh2;
+ struct ldlm_handle lockh1;
int flags = 0;
ldlm_error_t err;
- err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client, conn, 3);
- ptlrpc_free_req(request);
- CERROR("ldlm_cli_namespace_new: %d\n", err);
- if (err != ELDLM_OK)
- GOTO(out, err);
-
- err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3,
+ err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, obddev->obd_namespace,
NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags,
- NULL, NULL, NULL, 0, &lockh1, &request);
- ptlrpc_free_req(request);
- CERROR("ldlm_cli_enqueue: %d\n", err);
-
- flags = 0;
- err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3,
- NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags,
- NULL, NULL, NULL, 0, &lockh2, &request);
- ptlrpc_free_req(request);
+ NULL, 0, &lockh1);
CERROR("ldlm_cli_enqueue: %d\n", err);
- EXIT;
- out:
- return err;
+ RETURN(err);
}
int ldlm_test(struct obd_device *obddev, struct ptlrpc_connection *conn)
* (jj@sunsite.ms.mff.cuni.cz)
*/
-#include <asm/uaccess.h>
-#include <asm/system.h>
-
-#include <linux/errno.h>
-#include <linux/fs.h>
-#include <linux/fcntl.h>
-#include <linux/sched.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/mm.h>
-#include <linux/pagemap.h>
-#include <linux/smp_lock.h>
-
#define DEBUG_SUBSYSTEM S_LLITE
-#include <linux/obd_support.h>
+#include <linux/lustre_dlm.h>
#include <linux/lustre_lite.h>
int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
static int ll_file_open(struct inode *inode, struct file *file)
{
- int rc;
+ int rc;
struct ptlrpc_request *req = NULL;
struct ll_file_data *fd;
struct obdo *oa;
struct ll_sb_info *sbi = ll_i2sbi(inode);
ENTRY;
- if (file->private_data)
+ if (file->private_data)
LBUG();
- fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
+ fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
if (!fd)
GOTO(out, rc = -ENOMEM);
memset(fd, 0, sizeof(*fd));
rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
- S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
+ S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
fd->fd_req = req;
ptlrpc_req_finished(req);
if (rc) {
oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
if (oa == NULL)
LBUG();
- rc = obd_open(ll_i2obdconn(inode), oa);
+ rc = obd_open(ll_i2obdconn(inode), oa);
obdo_free(oa);
if (rc) {
/* XXX: Need to do mdc_close here! */
- if (rc > 0)
- rc = -rc;
- GOTO(out, rc);
+ GOTO(out, rc = -abs(rc));
}
file->private_data = fd;
- EXIT;
+ EXIT;
out:
if (rc && fd) {
- kmem_cache_free(ll_file_data_slab, fd);
+ kmem_cache_free(ll_file_data_slab, fd);
file->private_data = NULL;
}
ENTRY;
fd = (struct ll_file_data *)file->private_data;
- if (!fd || !fd->fd_mdshandle) {
+ if (!fd || !fd->fd_mdshandle) {
LBUG();
GOTO(out, rc = -EINVAL);
}
oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID));
if (oa == NULL)
LBUG();
- rc = obd_close(ll_i2obdconn(inode), oa);
+ rc = obd_close(ll_i2obdconn(inode), oa);
obdo_free(oa);
- if (rc) {
- if (rc > 0)
- rc = -rc;
- GOTO(out, rc);
- }
+ if (rc)
+ GOTO(out, rc = -abs(rc));
iattr.ia_valid = ATTR_SIZE;
iattr.ia_size = inode->i_size;
rc = ll_inode_setattr(inode, &iattr, 0);
if (rc) {
CERROR("failed - %d.\n", rc);
- rc = -EIO;
+ rc = -EIO; /* XXX - GOTO(out)? -phil */
}
rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
EXIT;
out:
- if (!rc && fd) {
- kmem_cache_free(ll_file_data_slab, fd);
+ if (!rc && fd) {
+ kmem_cache_free(ll_file_data_slab, fd);
file->private_data = NULL;
}
return rc;
}
}
+static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
+ loff_t *ppos)
+{
+ struct inode *inode = filp->f_dentry->d_inode;
+#if 0
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ldlm_extent extent;
+ struct ldlm_handle lockh;
+ __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+ int flags = 0;
+ ldlm_error_t err;
+#endif
+ ssize_t retval;
+ ENTRY;
+
+#if 0
+ extent.start = *ppos;
+ extent.end = *ppos + count;
+ CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
+ inode->i_ino, extent.start, extent.end);
+
+ err = obd_enqueue(&sbi->ll_conn, sbi->ll_namespace, NULL, res_id,
+ LDLM_EXTENT, &extent, LCK_PR, &flags, inode,
+ sizeof(*inode), &lockh);
+ if (err != ELDLM_OK)
+ CERROR("lock enqueue: err: %d\n", err);
+ ldlm_lock_dump((void *)(unsigned long)lockh.addr);
+#endif
+
+ CDEBUG(D_INFO, "Reading inode %ld, %ld bytes, offset %Ld\n",
+ inode->i_ino, (long)count, *ppos);
+ retval = generic_file_read(filp, buf, count, ppos);
+ if (retval > 0) {
+ struct iattr attr;
+ attr.ia_valid = ATTR_ATIME;
+ attr.ia_atime = CURRENT_TIME;
+ ll_setattr(filp->f_dentry, &attr);
+ }
+
+#if 0
+ err = obd_cancel(&sbi->ll_conn, LCK_PR, &lockh);
+ if (err != ELDLM_OK)
+ CERROR("lock cancel: err: %d\n", err);
+#endif
+
+ RETURN(retval);
+}
/*
* Write to a file (through the page cache).
static ssize_t
ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
{
+ struct inode *inode = file->f_dentry->d_inode;
+#if 0
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ldlm_extent extent;
+ struct ldlm_handle lockh;
+ __u64 res_id[RES_NAME_SIZE] = {inode->i_ino};
+ int flags = 0;
+ ldlm_error_t err;
+#endif
ssize_t retval;
+ ENTRY;
+
+#if 0
+ extent.start = *ppos;
+ extent.end = *ppos + count;
+ CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n",
+ inode->i_ino, extent.start, extent.end);
+
+ err = obd_enqueue(&sbi->ll_conn, sbi->ll_namespace, NULL, res_id,
+ LDLM_EXTENT, &extent, LCK_PW, &flags, inode,
+ sizeof(*inode), &lockh);
+ if (err != ELDLM_OK)
+ CERROR("lock enqueue: err: %d\n", err);
+ ldlm_lock_dump((void *)(unsigned long)lockh.addr);
+#endif
+
CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n",
- file->f_dentry->d_inode->i_ino, (long)count, *ppos);
+ inode->i_ino, (long)count, *ppos);
retval = generic_file_write(file, buf, count, ppos);
- CDEBUG(D_INFO, "Wrote %ld\n", (long)retval);
/* update mtime/ctime/atime here, NOT size */
if (retval > 0) {
CURRENT_TIME;
ll_setattr(file->f_dentry, &attr);
}
- EXIT;
- return retval;
-}
+#if 0
+ err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh);
+ if (err != ELDLM_OK)
+ CERROR("lock cancel: err: %d\n", err);
+#endif
+
+ RETURN(retval);
+}
/* XXX this does not need to do anything for data, it _does_ need to
- call setattr */
+ call setattr */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
return 0;
}
struct file_operations ll_file_operations = {
- read: generic_file_read,
+ read: ll_file_read,
write: ll_file_write,
open: ll_file_open,
release: ll_file_release,
fsync: NULL
};
-
struct inode_operations ll_file_inode_operations = {
truncate: ll_truncate,
setattr: ll_setattr
};
-
*
*/
-#include <linux/config.h>
-#include <linux/module.h>
-
#define DEBUG_SUBSYSTEM S_LLITE
+#include <linux/module.h>
#include <linux/lustre_lite.h>
#include <linux/lustre_ha.h>
+#include <linux/lustre_dlm.h>
kmem_cache_t *ll_file_data_slab;
extern struct address_space_operations ll_aops;
GOTO(out_free, sb = NULL);
}
+ sbi->ll_namespace = ldlm_namespace_new(NULL, 1);
+ if (sbi->ll_namespace == NULL) {
+ CERROR("failed to create local lock namespace\n");
+ GOTO(out_free, sb = NULL);
+ }
+
ptlrpc_init_client(ptlrpc_connmgr, ll_recover,
MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
&sbi->ll_mds_client);
obd_disconnect(&sbi->ll_conn);
out_free:
MOD_DEC_USE_COUNT;
+ if (sbi->ll_namespace)
+ ldlm_namespace_free(sbi->ll_namespace);
OBD_FREE(sbi, sizeof(*sbi));
}
if (device)
ENTRY;
ll_commitcbd_cleanup(sbi);
obd_disconnect(&sbi->ll_conn);
+ ldlm_namespace_free(sbi->ll_namespace);
ptlrpc_put_connection(sbi->ll_mds_conn);
ptlrpc_cleanup_client(&sbi->ll_mds_client);
OBD_FREE(sb->u.generic_sbp, sizeof(*sbi));
*connection = osc->osc_conn;
}
+static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
+ struct ptlrpc_connection **connection)
+{
+ struct osc_obd *osc = &conn->oc_dev->u.osc;
+ *cl = osc->osc_ldlm_client;
+ *connection = osc->osc_conn;
+}
+
static int osc_connect(struct obd_conn *conn)
{
struct ptlrpc_request *request;
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
+ struct ptlrpc_bulk_desc *bulk;
+ int rc;
+ ENTRY;
osc_con2cl(conn, &cl, &connection);
- if (cl->cli_obd) {
- /* local sendpage */
- memcpy((char *)(unsigned long)dst->addr,
- (char *)(unsigned long)src->addr, src->len);
- } else {
- struct ptlrpc_bulk_desc *bulk;
- int rc;
-
- bulk = ptlrpc_prep_bulk(connection);
- if (bulk == NULL)
- RETURN(-ENOMEM);
-
- bulk->b_buf = (void *)(unsigned long)src->addr;
- bulk->b_buflen = src->len;
- bulk->b_xid = dst->xid;
- rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
- if (rc != 0) {
- CERROR("send_bulk failed: %d\n", rc);
- ptlrpc_free_bulk(bulk);
- LBUG();
- RETURN(rc);
- }
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
+ bulk = ptlrpc_prep_bulk(connection);
+ if (bulk == NULL)
+ RETURN(-ENOMEM);
- if (bulk->b_flags & PTL_RPC_FL_INTR) {
- ptlrpc_free_bulk(bulk);
- RETURN(-EINTR);
- }
+ bulk->b_buf = (void *)(unsigned long)src->addr;
+ bulk->b_buflen = src->len;
+ bulk->b_xid = dst->xid;
+ rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
+ if (rc != 0) {
+ CERROR("send_bulk failed: %d\n", rc);
+ ptlrpc_free_bulk(bulk);
+ LBUG();
+ RETURN(rc);
+ }
+ wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk));
+ if (bulk->b_flags & PTL_RPC_FL_INTR) {
ptlrpc_free_bulk(bulk);
+ RETURN(-EINTR);
}
- return 0;
+ ptlrpc_free_bulk(bulk);
+ RETURN(0);
}
int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
struct ptlrpc_connection *connection;
struct ptlrpc_request *request;
struct ost_body *body;
- struct obd_ioobj ioo;
- struct niobuf src;
int pages, rc, i, j, size[3] = {sizeof(*body)};
void *ptr1, *ptr2;
struct ptlrpc_bulk_desc **bulk;
ENTRY;
- size[1] = num_oa * sizeof(ioo);
+ size[1] = num_oa * sizeof(struct obd_ioobj);
pages = 0;
for (i = 0; i < num_oa; i++)
pages += oa_bufs[i];
- size[2] = pages * sizeof(src);
+ size[2] = pages * sizeof(struct niobuf);
OBD_ALLOC(bulk, pages * sizeof(*bulk));
if (bulk == NULL)
struct obd_ioobj ioo;
struct ost_body *body;
struct niobuf *src;
- int pages, rc, i, j, size[3] = {sizeof(*body)};
+ long pages;
+ int rc, i, j, size[3] = {sizeof(*body)};
void *ptr1, *ptr2;
ENTRY;
GOTO(out, rc = -EINVAL);
if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
- CERROR("buffer length wrong (%d vs. %d)\n",
+ CERROR("buffer length wrong (%d vs. %ld)\n",
request->rq_repmsg->buflens[1],
pages * sizeof(struct niobuf));
GOTO(out, rc = -EINVAL);
offset, flags);
}
+int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
+ struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type,
+ struct ldlm_extent *extent, __u32 mode, int *flags, void *data,
+ int datalen, struct ldlm_handle *lockh)
+{
+ struct ptlrpc_connection *conn;
+ struct ptlrpc_client *cl;
+ int rc;
+ __u32 mode2;
+
+ /* Filesystem locks are given a bit of special treatment: first we
+ * fixup the lock to start and end on page boundaries. */
+ extent->start &= PAGE_MASK;
+ extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+
+ /* Next, search for already existing extent locks that will cover us */
+ osc_con2dlmcl(oconn, &cl, &conn);
+ rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
+ if (rc == 1) {
+ /* We already have a lock, and it's referenced */
+ return 0;
+ }
+
+ /* Next, search for locks that we can upgrade (if we're trying to write)
+ * or are more than we need (if we're trying to read). Because the VFS
+ * and page cache already protect us locally, lots of readers/writers
+ * can share a single PW lock. */
+ if (mode == LCK_PW)
+ mode2 = LCK_PR;
+ else
+ mode2 = LCK_PW;
+
+ rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
+ if (rc == 1) {
+ int flags;
+ struct ldlm_lock *lock = ldlm_handle2object(lockh);
+ /* FIXME: This is not incredibly elegant, but it might
+ * be more elegant than adding another parameter to
+ * lock_match. I want a second opinion. */
+ ldlm_lock_addref(lock, mode);
+ ldlm_lock_decref(lock, mode2);
+
+ if (mode == LCK_PR)
+ return 0;
+
+ rc = ldlm_cli_convert(cl, lockh, mode, &flags);
+ if (rc)
+ LBUG();
+
+ return rc;
+ }
+
+ rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
+ extent, mode, flags, data, datalen, lockh);
+ return rc;
+}
+
+int osc_cancel(struct obd_conn *oconn, __u32 mode, struct ldlm_handle *lockh)
+{
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ lock = ldlm_handle2object(lockh);
+ ldlm_lock_decref(lock, mode);
+
+ RETURN(0);
+}
+
static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct osc_obd *osc = &obddev->u.osc;
o_connect: osc_connect,
o_disconnect: osc_disconnect,
o_brw: osc_brw,
- o_punch: osc_punch
+ o_punch: osc_punch,
+ o_enqueue: osc_enqueue,
+ o_cancel: osc_cancel
};
static int __init osc_init(void)
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err)
GOTO(error_disc, err = -EINVAL);
-#if 0
+#if 1
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err)
GOTO(error_disc, err = -EINVAL);
init_waitqueue_head(&req->rq_wait_for_rep);
resend:
req->rq_time = CURRENT_TIME;
- req->rq_timeout = 3;
+ req->rq_timeout = 30;
rc = ptl_send_rpc(req);
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);