From: pschwan Date: Thu, 9 May 2002 17:08:39 +0000 (+0000) Subject: Landing the ldlm_testing branch; now the only difference is that the locking X-Git-Tag: 0.4.2~288 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=0d57b0cb1097d6c673f82c7c9fd0dc3364e1bca2;p=fs%2Flustre-release.git Landing the ldlm_testing branch; now the only difference is that the locking calls are #if 0ed out of the trunk's ll_file_read and ll_file_write --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 9d8ee98..42621d9 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -28,6 +28,7 @@ typedef enum { #define LDLM_FL_BLOCK_GRANTED (1 << 1) #define LDLM_FL_BLOCK_CONV (1 << 2) #define LDLM_FL_BLOCK_WAIT (1 << 3) +#define LDLM_FL_DYING (1 << 4) #define L2B(c) (1 << c) @@ -74,12 +75,12 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) */ struct ldlm_namespace { - struct list_head ns_link; /* in the list of ns's */ - __u32 ns_id; /* identifier of ns */ + struct obd_device *ns_obddev; + __u32 ns_local; /* is this a local lock tree? */ struct list_head *ns_hash; /* hash table for ns */ - atomic_t ns_refcount; /* count of resources in the hash */ + __u32 ns_refcount; /* count of resources in the hash */ struct list_head ns_root_list; /* all root resources in ns */ - struct obd_device *ns_obddev; + spinlock_t ns_lock; /* protects hash, refcount, list */ }; /* @@ -103,19 +104,32 @@ struct ldlm_lock { struct list_head l_children; struct list_head l_childof; struct list_head l_res_link; /*position in one of three res lists*/ + ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; + ldlm_lock_callback l_completion_ast; ldlm_lock_callback l_blocking_ast; + struct ptlrpc_connection *l_connection; + struct ptlrpc_client *l_client; + __u32 l_flags; + struct ldlm_handle l_remote_handle; void *l_data; __u32 l_data_len; struct ldlm_extent l_extent; - struct ldlm_handle l_remote_handle; //void *l_event; //XXX cluster_host l_holder; __u32 l_version[RES_VERSION_SIZE]; + + __u32 l_readers; + __u32 l_writers; + + /* If the lock is granted, a process sleeps on this waitq to learn when + * it's no longer in use. If the lock is not granted, a process sleeps + * on this waitq to learn when it becomes granted. */ wait_queue_head_t l_waitq; + spinlock_t l_lock; }; typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new); @@ -124,11 +138,12 @@ typedef int (*ldlm_res_policy)(struct ldlm_resource *parent, struct ldlm_extent *new_ex, ldlm_mode_t mode, void *data); -#define LDLM_PLAIN 0x0 -#define LDLM_EXTENT 0x1 -#define LDLM_MDSINTENT 0x2 +#define LDLM_PLAIN 0 +#define LDLM_EXTENT 1 +#define LDLM_MDSINTENT 2 + +#define LDLM_MAX_TYPE 2 -#define LDLM_MAX_TYPE 0x2 extern ldlm_res_compat ldlm_res_compat_table []; extern ldlm_res_policy ldlm_res_policy_table []; @@ -144,13 +159,13 @@ struct ldlm_resource { struct list_head lr_converting; struct list_head lr_waiting; ldlm_mode_t lr_most_restr; - atomic_t lr_refcount; __u32 lr_type; /* PLAIN, EXTENT, or MDSINTENT */ struct ldlm_resource *lr_root; //XXX cluster_host lr_master; __u64 lr_name[RES_NAME_SIZE]; __u32 lr_version[RES_VERSION_SIZE]; - spinlock_t lr_lock; + __u32 lr_refcount; + spinlock_t lr_lock; /* protects lists, refcount */ }; static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res) @@ -170,19 +185,6 @@ static inline void ldlm_object2handle(void *object, struct ldlm_handle *handle) handle->addr = (__u64)(unsigned long)object; } -extern struct list_head ldlm_namespaces; -extern spinlock_t ldlm_spinlock; - -static inline void ldlm_lock(void) -{ - spin_lock(&ldlm_spinlock); -} - -static inline void ldlm_unlock(void) -{ - spin_unlock(&ldlm_spinlock); -} - extern struct obd_ops ldlm_obd_ops; /* ldlm_extent.c */ @@ -193,31 +195,35 @@ int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *, /* ldlm_lock.c */ void ldlm_lock_free(struct ldlm_lock *lock); void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc); -ldlm_error_t ldlm_local_lock_create(__u32 ns_id, +void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode); +void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode); +void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock); +int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, + struct ldlm_extent *extent, ldlm_mode_t mode, + struct ldlm_handle *lockh); +ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns, struct ldlm_handle *parent_lock_handle, - __u64 *res_id, - __u32 type, + __u64 *res_id, __u32 type, + ldlm_mode_t mode, + void *data, + __u32 data_len, struct ldlm_handle *lockh); ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh, - ldlm_mode_t mode, struct ldlm_extent *req_ex, int *flags, ldlm_lock_callback completion, - ldlm_lock_callback blocking, - void *data, - __u32 data_len); -ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh, - int new_mode, int *flags); -ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh); + ldlm_lock_callback blocking); +struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh, + int new_mode, int *flags); +struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock); +void ldlm_reprocess_all(struct ldlm_resource *res); void ldlm_lock_dump(struct ldlm_lock *lock); /* ldlm_test.c */ int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_find(__u32 id); -ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id, - struct ldlm_namespace **); +struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 local); int ldlm_namespace_free(struct ldlm_namespace *ns); struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, @@ -230,28 +236,22 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc); void ldlm_resource_dump(struct ldlm_resource *res); /* ldlm_request.c */ -int ldlm_cli_namespace_new(struct obd_device *, struct ptlrpc_client *, - struct ptlrpc_connection *, __u32 ns_id); int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer, - __u32 ns_id, + struct ldlm_namespace *ns, struct ldlm_handle *parent_lock_handle, __u64 *res_id, __u32 type, struct ldlm_extent *req_ex, ldlm_mode_t mode, int *flags, - ldlm_lock_callback completion, - ldlm_lock_callback blocking, void *data, __u32 data_len, - struct ldlm_handle *lockh, - struct ptlrpc_request **request); + struct ldlm_handle *lockh); int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, void *data, __u32 data_len); int ldlm_cli_convert(struct ptlrpc_client *, struct ldlm_handle *, - int new_mode, int *flags, struct ptlrpc_request **); -int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_handle *, - struct ptlrpc_request **); + int new_mode, int *flags); +int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_lock *); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 1c9890f..cce3c43 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -269,11 +269,10 @@ struct mds_rec_rename { */ /* opcodes */ -#define LDLM_NAMESPACE_NEW 1 -#define LDLM_ENQUEUE 2 -#define LDLM_CONVERT 3 -#define LDLM_CANCEL 4 -#define LDLM_CALLBACK 5 +#define LDLM_ENQUEUE 1 +#define LDLM_CONVERT 2 +#define LDLM_CANCEL 3 +#define LDLM_CALLBACK 4 #define RES_NAME_SIZE 3 #define RES_VERSION_SIZE 4 @@ -299,7 +298,6 @@ struct ldlm_extent { }; struct ldlm_resource_desc { - __u32 lr_ns_id; __u32 lr_type; __u64 lr_name[RES_NAME_SIZE]; __u64 lr_version[RES_VERSION_SIZE]; diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 2db81cf..fcf3998 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -46,6 +46,14 @@ int simple_mkdir(struct dentry *dir, char *name, int mode); int lustre_fread(struct file *file, char *str, int len, loff_t *off); int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off); int lustre_fsync(struct file *file); + +static inline void ll_sleep(int t) +{ + set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(t * HZ); + set_current_state(TASK_RUNNING); +} + #endif #include diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 7ed6e48..a428858 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -344,6 +344,33 @@ static inline int obd_iocontrol(int cmd, struct obd_conn *conn, RETURN(rc); } +static inline int obd_enqueue(struct obd_conn *conn, struct ldlm_namespace *ns, + struct ldlm_handle *parent_lock, __u64 *res_id, + __u32 type, struct ldlm_extent *extent, + __u32 mode, int *flags, void *data, int datalen, + struct ldlm_handle *lockh) +{ + int rc; + OBD_CHECK_SETUP(conn); + OBD_CHECK_OP(conn, enqueue); + + rc = OBP(conn->oc_dev, enqueue)(conn, ns, parent_lock, res_id, type, + extent, mode, flags, data, datalen, + lockh); + RETURN(rc); +} + +static inline int obd_cancel(struct obd_conn *conn, __u32 mode, + struct ldlm_handle *lockh) +{ + int rc; + OBD_CHECK_SETUP(conn); + OBD_CHECK_OP(conn, cancel); + + rc = OBP(conn->oc_dev, cancel)(conn, mode, lockh); + RETURN(rc); +} + #endif /* @@ -396,8 +423,7 @@ static __inline__ struct obdo *obdo_fromid(struct obd_conn *conn, obd_id id, ENTRY; oa = obdo_alloc(); if ( !oa ) { - EXIT; - return ERR_PTR(-ENOMEM); + RETURN(ERR_PTR(-ENOMEM)); } oa->o_id = id; @@ -405,11 +431,9 @@ static __inline__ struct obdo *obdo_fromid(struct obd_conn *conn, obd_id id, oa->o_valid = valid; if ((err = OBP(conn->oc_dev, getattr)(conn, oa))) { obdo_free(oa); - EXIT; - return ERR_PTR(err); + RETURN(ERR_PTR(err)); } - EXIT; - return oa; + RETURN(oa); } static inline void obdo_from_iattr(struct obdo *oa, struct iattr *attr) diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index e24a457..b334e4a 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -26,9 +26,9 @@ int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b) { if (MAX(a->l_extent.start, b->l_extent.start) <= MIN(a->l_extent.end, b->l_extent.end)) - return 0; + RETURN(0); - return 1; + RETURN(1); } static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 83e2d2c..05bd152 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -45,7 +45,9 @@ static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b) return 0; } -/* Caller should do ldlm_resource_get() on this resource first. */ +/* Args: referenced, unlocked parent (or NULL) + * referenced, unlocked resource + * Locks: parent->l_lock */ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, struct ldlm_resource *resource) { @@ -61,23 +63,38 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, memset(lock, 0, sizeof(*lock)); lock->l_resource = resource; INIT_LIST_HEAD(&lock->l_children); + INIT_LIST_HEAD(&lock->l_res_link); init_waitqueue_head(&lock->l_waitq); + lock->l_lock = SPIN_LOCK_UNLOCKED; if (parent != NULL) { + spin_lock(&parent->l_lock); lock->l_parent = parent; list_add(&lock->l_childof, &parent->l_children); + spin_unlock(&parent->l_lock); } return lock; } -/* Caller must do its own ldlm_resource_put() on lock->l_resource */ +/* Args: unreferenced, locked lock + * + * Caller must do its own ldlm_resource_put() on lock->l_resource */ void ldlm_lock_free(struct ldlm_lock *lock) { if (!list_empty(&lock->l_children)) { - CERROR("lock still has children!\n"); + CERROR("lock %p still has children (%p)!\n", lock, + lock->l_children.next); + ldlm_lock_dump(lock); LBUG(); } + + if (lock->l_readers || lock->l_writers) + CDEBUG(D_INFO, "lock still has references (%d readers, %d " + "writers)\n", lock->l_readers, lock->l_writers); + + if (lock->l_connection) + ptlrpc_put_connection(lock->l_connection); kmem_cache_free(ldlm_lock_slab, lock); } @@ -90,25 +107,77 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version)); } -static int ldlm_lock_compat(struct ldlm_lock *lock) +/* Args: unlocked lock */ +void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode) { - struct list_head *tmp; + spin_lock(&lock->l_lock); + if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) + lock->l_readers++; + else + lock->l_writers++; + spin_unlock(&lock->l_lock); +} + +/* Args: unlocked lock */ +void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode) +{ + int rc; + + spin_lock(&lock->l_lock); + if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) + lock->l_readers--; + else + lock->l_writers--; + if (!lock->l_readers && !lock->l_writers && + lock->l_flags & LDLM_FL_DYING) { + /* Read this lock its rights. */ + if (!lock->l_resource->lr_namespace->ns_local) { + CERROR("LDLM_FL_DYING set on non-local lock!\n"); + LBUG(); + } + + CDEBUG(D_INFO, "final decref done on dying lock, " + "cancelling.\n"); + spin_unlock(&lock->l_lock); + rc = ldlm_cli_cancel(lock->l_client, lock); + if (rc) { + /* FIXME: do something more dramatic */ + CERROR("ldlm_cli_cancel: %d\n", rc); + } + } else + spin_unlock(&lock->l_lock); +} + +/* Args: locked lock */ +static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs, + struct list_head *queue) +{ + struct list_head *tmp, *pos; int rc = 0; - list_for_each(tmp, &lock->l_resource->lr_granted) { + list_for_each_safe(tmp, pos, queue) { struct ldlm_lock *child; ldlm_res_compat compat; child = list_entry(tmp, struct ldlm_lock, l_res_link); + if (lock == child) + continue; compat = ldlm_res_compat_table[child->l_resource->lr_type]; - if (compat(child, lock) || - lockmode_compat(child->l_req_mode, lock->l_req_mode)) + if (compat(child, lock)) { + CDEBUG(D_OTHER, "compat function succeded, next.\n"); + continue; + } + if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) { + CDEBUG(D_OTHER, "lock modes are compatible, next.\n"); continue; + } rc = 1; - if (child->l_blocking_ast != NULL) + CDEBUG(D_OTHER, "compat function failed and lock modes are " + "incompatible; sending blocking AST.\n"); + if (send_cbs && child->l_blocking_ast != NULL) child->l_blocking_ast(child, lock, child->l_data, child->l_data_len); } @@ -116,8 +185,24 @@ static int ldlm_lock_compat(struct ldlm_lock *lock) return rc; } -static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) +/* Args: unlocked lock */ +static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) { + int rc; + ENTRY; + + rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted); + rc |= _ldlm_lock_compat(lock, send_cbs, + &lock->l_resource->lr_converting); + + RETURN(rc); +} + +/* Args: locked lock, locked resource */ +void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) +{ + ENTRY; + ldlm_resource_add_lock(res, &res->lr_granted, lock); lock->l_granted_mode = lock->l_req_mode; @@ -127,21 +212,82 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) if (lock->l_completion_ast) lock->l_completion_ast(lock, NULL, lock->l_data, lock->l_data_len); + EXIT; } -ldlm_error_t ldlm_local_lock_create(__u32 ns_id, +static int search_queue(struct list_head *queue, ldlm_mode_t mode, + struct ldlm_extent *extent, struct ldlm_handle *lockh) +{ + struct list_head *tmp; + + list_for_each(tmp, queue) { + struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (lock->l_flags & LDLM_FL_DYING) + continue; + + /* lock_convert() takes the resource lock, so we're sure that + * req_mode, lr_type, and l_extent won't change beneath us */ + if (lock->l_req_mode != mode) + continue; + + if (lock->l_resource->lr_type == LDLM_EXTENT && + (lock->l_extent.start > extent->start || + lock->l_extent.end < extent->end)) + continue; + + ldlm_lock_addref(lock, mode); + ldlm_object2handle(lock, lockh); + return 1; + } + + return 0; +} + +/* Must be called with no resource or lock locks held. + * + * Returns 1 if it finds an already-existing lock that is compatible; in this + * case, lockh is filled in with a addref()ed lock */ +int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, + struct ldlm_extent *extent, ldlm_mode_t mode, + struct ldlm_handle *lockh) +{ + struct ldlm_resource *res; + int rc = 0; + ENTRY; + + res = ldlm_resource_get(ns, NULL, res_id, type, 0); + if (res == NULL) + RETURN(0); + + spin_lock(&res->lr_lock); + if (search_queue(&res->lr_granted, mode, extent, lockh)) + GOTO(out, rc = 1); + if (search_queue(&res->lr_converting, mode, extent, lockh)) + GOTO(out, rc = 1); + if (search_queue(&res->lr_waiting, mode, extent, lockh)) + GOTO(out, rc = 1); + + EXIT; + out: + ldlm_resource_put(res); + spin_unlock(&res->lr_lock); + return rc; +} + +/* Must be called without the resource lock held. */ +ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns, struct ldlm_handle *parent_lock_handle, __u64 *res_id, __u32 type, + ldlm_mode_t mode, + void *data, + __u32 data_len, struct ldlm_handle *lockh) { - struct ldlm_namespace *ns; struct ldlm_resource *res, *parent_res = NULL; struct ldlm_lock *lock, *parent_lock; - ns = ldlm_namespace_find(ns_id); - if (ns == NULL || ns->ns_hash == NULL) - RETURN(-ELDLM_BAD_NAMESPACE); - parent_lock = ldlm_handle2object(parent_lock_handle); if (parent_lock) parent_res = parent_lock->l_resource; @@ -151,134 +297,172 @@ ldlm_error_t ldlm_local_lock_create(__u32 ns_id, RETURN(-ENOMEM); lock = ldlm_lock_new(parent_lock, res); - if (lock == NULL) + if (lock == NULL) { + spin_lock(&res->lr_lock); + ldlm_resource_put(res); + spin_unlock(&res->lr_lock); RETURN(-ENOMEM); + } - ldlm_object2handle(lock, lockh); + lock->l_req_mode = mode; + lock->l_data = data; + lock->l_data_len = data_len; + ldlm_lock_addref(lock, mode); + ldlm_object2handle(lock, lockh); return ELDLM_OK; } -/* XXX: Revisit the error handling; we do not, for example, do - * ldlm_resource_put()s in our error cases, and we probably leak any allocated - * memory. */ +/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */ ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh, - ldlm_mode_t mode, struct ldlm_extent *req_ex, int *flags, ldlm_lock_callback completion, - ldlm_lock_callback blocking, - void *data, - __u32 data_len) + ldlm_lock_callback blocking) { + struct ldlm_resource *res; struct ldlm_lock *lock; - struct ldlm_extent new_ex; - int incompat = 0, rc; + int incompat = 0, local; ldlm_res_policy policy; ENTRY; lock = ldlm_handle2object(lockh); - if ((policy = ldlm_res_policy_table[lock->l_resource->lr_type])) { - rc = policy(lock->l_resource, req_ex, &new_ex, mode, NULL); + res = lock->l_resource; + local = res->lr_namespace->ns_local; + spin_lock(&res->lr_lock); + + lock->l_blocking_ast = blocking; + + if ((res->lr_type == LDLM_EXTENT && !req_ex) || + (res->lr_type != LDLM_EXTENT && req_ex)) + LBUG(); + + if ((policy = ldlm_res_policy_table[res->lr_type])) { + struct ldlm_extent new_ex; + int rc = policy(res, req_ex, &new_ex, lock->l_req_mode, NULL); if (rc == ELDLM_LOCK_CHANGED) { *flags |= LDLM_FL_LOCK_CHANGED; memcpy(req_ex, &new_ex, sizeof(new_ex)); } } - if ((lock->l_resource->lr_type == LDLM_EXTENT && !req_ex) || - (lock->l_resource->lr_type != LDLM_EXTENT && req_ex)) - LBUG(); if (req_ex) memcpy(&lock->l_extent, req_ex, sizeof(*req_ex)); - lock->l_req_mode = mode; - lock->l_data = data; - lock->l_data_len = data_len; - lock->l_blocking_ast = blocking; - spin_lock(&lock->l_resource->lr_lock); - /* FIXME: We may want to optimize by checking lr_most_restr */ + if (local && lock->l_req_mode == lock->l_granted_mode) { + /* The server returned a blocked lock, but it was granted before + * we got a chance to actually enqueue it. We don't need to do + * anything else. */ + GOTO(out, ELDLM_OK); + } - if (!list_empty(&lock->l_resource->lr_converting)) { - ldlm_resource_add_lock(lock->l_resource, - lock->l_resource->lr_waiting.prev, lock); + /* If this is a local resource, put it on the appropriate list. */ + if (local) { + if (*flags & LDLM_FL_BLOCK_CONV) + ldlm_resource_add_lock(res, res->lr_converting.prev, + lock); + else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + else + ldlm_grant_lock(res, lock); + GOTO(out, ELDLM_OK); + } + + /* FIXME: We may want to optimize by checking lr_most_restr */ + if (!list_empty(&res->lr_converting)) { + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_CONV; GOTO(out, ELDLM_OK); } - if (!list_empty(&lock->l_resource->lr_waiting)) { - ldlm_resource_add_lock(lock->l_resource, - lock->l_resource->lr_waiting.prev, lock); + if (!list_empty(&res->lr_waiting)) { + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_WAIT; GOTO(out, ELDLM_OK); } - - incompat = ldlm_lock_compat(lock); + incompat = ldlm_lock_compat(lock, 0); if (incompat) { - ldlm_resource_add_lock(lock->l_resource, - lock->l_resource->lr_waiting.prev, lock); + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_GRANTED; GOTO(out, ELDLM_OK); } - ldlm_grant_lock(lock->l_resource, lock); + ldlm_grant_lock(res, lock); EXIT; out: /* Don't set 'completion_ast' until here so that if the lock is granted * immediately we don't do an unnecessary completion call. */ lock->l_completion_ast = completion; - spin_unlock(&lock->l_resource->lr_lock); + spin_unlock(&res->lr_lock); return ELDLM_OK; } +/* Must be called with resource->lr_lock taken. */ static int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *converting) { struct list_head *tmp, *pos; - int incompat = 0; + ENTRY; list_for_each_safe(tmp, pos, converting) { struct ldlm_lock *pending; pending = list_entry(tmp, struct ldlm_lock, l_res_link); - incompat = ldlm_lock_compat(pending); - if (incompat) - break; + /* the resource lock protects ldlm_lock_compat */ + if (ldlm_lock_compat(pending, 1)) + RETURN(1); - list_del(&pending->l_res_link); + list_del_init(&pending->l_res_link); ldlm_grant_lock(res, pending); + + ldlm_lock_addref(pending, pending->l_req_mode); + ldlm_lock_decref(pending, pending->l_granted_mode); } - return incompat; + RETURN(0); } -static void ldlm_reprocess_all(struct ldlm_resource *res) +/* Must be called with resource->lr_lock not taken. */ +void ldlm_reprocess_all(struct ldlm_resource *res) { + /* Local lock trees don't get reprocessed. */ + if (res->lr_namespace->ns_local) + return; + + spin_lock(&res->lr_lock); ldlm_reprocess_queue(res, &res->lr_converting); if (list_empty(&res->lr_converting)) ldlm_reprocess_queue(res, &res->lr_waiting); + spin_unlock(&res->lr_lock); } -ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh) +/* Must be called with lock and lock->l_resource unlocked */ +struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock) { - struct ldlm_lock *lock; struct ldlm_resource *res; ENTRY; - lock = ldlm_handle2object(lockh); res = lock->l_resource; - ldlm_resource_del_lock(lock); + spin_lock(&res->lr_lock); + spin_lock(&lock->l_lock); - ldlm_lock_free(lock); + if (lock->l_readers || lock->l_writers) + CDEBUG(D_INFO, "lock still has references (%d readers, %d " + "writers)\n", lock->l_readers, lock->l_writers); + + ldlm_resource_del_lock(lock); if (ldlm_resource_put(res)) - RETURN(ELDLM_OK); - ldlm_reprocess_all(res); + res = NULL; /* res was freed, nothing else to do. */ + else + spin_unlock(&res->lr_lock); + ldlm_lock_free(lock); - RETURN(ELDLM_OK); + RETURN(res); } -ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh, - int new_mode, int *flags) +/* Must be called with lock and lock->l_resource unlocked */ +struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh, + int new_mode, int *flags) { struct ldlm_lock *lock; struct ldlm_resource *res; @@ -286,20 +470,37 @@ ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh, lock = ldlm_handle2object(lockh); res = lock->l_resource; - list_del(&lock->l_res_link); - lock->l_req_mode = new_mode; - list_add(&lock->l_res_link, res->lr_converting.prev); + spin_lock(&res->lr_lock); + + lock->l_req_mode = new_mode; + list_del_init(&lock->l_res_link); + + /* If this is a local resource, put it on the appropriate list. */ + if (res->lr_namespace->ns_local) { + if (*flags & LDLM_FL_BLOCK_CONV) + ldlm_resource_add_lock(res, res->lr_converting.prev, + lock); + else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) + ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + else + ldlm_grant_lock(res, lock); + } else { + list_add(&lock->l_res_link, res->lr_converting.prev); + } - ldlm_reprocess_all(res); + spin_unlock(&res->lr_lock); - RETURN(ELDLM_OK); + RETURN(res); } void ldlm_lock_dump(struct ldlm_lock *lock) { char ver[128]; + if (!(portal_debug & D_OTHER)) + return; + if (RES_VERSION_SIZE != 4) LBUG(); @@ -312,6 +513,8 @@ void ldlm_lock_dump(struct ldlm_lock *lock) CDEBUG(D_OTHER, " Resource: %p\n", lock->l_resource); CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n", (int)lock->l_req_mode, (int)lock->l_granted_mode); + CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n", + lock->l_readers, lock->l_writers); if (lock->l_resource->lr_type == LDLM_EXTENT) CDEBUG(D_OTHER, " Extent: %Lu -> %Lu\n", (unsigned long long)lock->l_extent.start, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 730d606..9bef050 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -10,59 +10,97 @@ */ #define EXPORT_SYMTAB +#define DEBUG_SUBSYSTEM S_LDLM -#include #include #include -#include - -#define DEBUG_SUBSYSTEM S_LDLM - #include extern kmem_cache_t *ldlm_resource_slab; extern kmem_cache_t *ldlm_lock_slab; -static int _ldlm_namespace_new(struct obd_device *obddev, - struct ptlrpc_request *req) +#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000)) + +static int is_local_conn(struct ptlrpc_connection *conn) { - struct ldlm_request *dlm_req; - struct ldlm_namespace *ns; - int rc; - ldlm_error_t err; ENTRY; + if (conn == NULL) + RETURN(1); - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc) { - CERROR("out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); - } - dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + RETURN(LOOPBACK(conn->c_peer.peer_nid)); +} - err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id, - &ns); - req->rq_status = err; +/* _ldlm_callback and local_callback setup the variables then call this common + * code */ +static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new, + ldlm_mode_t mode, void *data, __u32 data_len) +{ + ENTRY; + ldlm_lock_dump(lock); - CERROR("err = %d\n", err); + if (!lock) + LBUG(); + if (!lock->l_resource) + LBUG(); + spin_lock(&lock->l_resource->lr_lock); + spin_lock(&lock->l_lock); + if (!new) { + CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock); + lock->l_req_mode = mode; + list_del_init(&lock->l_res_link); + ldlm_grant_lock(lock->l_resource, lock); + wake_up(&lock->l_waitq); + spin_unlock(&lock->l_lock); + spin_unlock(&lock->l_resource->lr_lock); + } else { + CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock); + lock->l_flags |= LDLM_FL_DYING; + spin_unlock(&lock->l_lock); + spin_unlock(&lock->l_resource->lr_lock); + if (!lock->l_readers && !lock->l_writers) { + CDEBUG(D_INFO, "Lock already unused, canceling.\n"); + if (ldlm_cli_cancel(lock->l_client, lock)) + LBUG(); + } else { + CDEBUG(D_INFO, "Lock still has references; lock will be" + " cancelled later.\n"); + } + } RETURN(0); } -static int _ldlm_enqueue(struct ptlrpc_request *req) +static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new, + void *data, __u32 data_len) +{ + struct ldlm_lock *lock; + /* the 'remote handle' is the lock in the FS's namespace */ + lock = ldlm_handle2object(&l->l_remote_handle); + + return common_callback(lock, new, l->l_granted_mode, data, data_len); +} + +static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) { struct ldlm_reply *dlm_rep; struct ldlm_request *dlm_req; int rc, size = sizeof(*dlm_rep); ldlm_error_t err; - struct ldlm_lock *lock; + struct ldlm_lock *lock = NULL; + ldlm_lock_callback callback; ENTRY; + /* Is this lock managed locally? */ + if (is_local_conn(req->rq_connection)) + callback = local_callback; + else + callback = ldlm_cli_callback; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); + RETURN(-ENOMEM); } dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); @@ -71,22 +109,21 @@ static int _ldlm_enqueue(struct ptlrpc_request *req) sizeof(dlm_rep->lock_extent)); dlm_rep->flags = dlm_req->flags; - err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id, + err = ldlm_local_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2, dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, + dlm_req->lock_desc.l_req_mode, + lustre_msg_buf(req->rq_reqmsg, 1), + req->rq_reqmsg->buflens[1], &dlm_rep->lock_handle); if (err != ELDLM_OK) GOTO(out, err); err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle, - dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_extent, &dlm_rep->flags, - ldlm_cli_callback, - ldlm_cli_callback, - lustre_msg_buf(req->rq_reqmsg, 1), - req->rq_reqmsg->buflens[1]); + callback, callback); if (err != ELDLM_OK) GOTO(out, err); @@ -97,78 +134,99 @@ static int _ldlm_enqueue(struct ptlrpc_request *req) EXIT; out: req->rq_status = err; - CERROR("err = %d\n", err); + CDEBUG(D_INFO, "err = %d\n", err); + + if (ptlrpc_reply(svc, req)) + LBUG(); + + if (!err) { + ldlm_reprocess_all(lock->l_resource); + } return 0; } -static int _ldlm_convert(struct ptlrpc_request *req) +static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req) { - struct ldlm_request *dlm_req; - int rc; + struct ldlm_request *dlm_req, *dlm_rep; + struct ldlm_resource *res; + int rc, size = sizeof(*dlm_rep); ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); + RETURN(-ENOMEM); } dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); + dlm_rep->flags = dlm_req->flags; + + res = ldlm_local_lock_convert(&dlm_req->lock_handle1, + dlm_req->lock_desc.l_req_mode, + &dlm_rep->flags); + req->rq_status = 0; + if (ptlrpc_reply(svc, req) != 0) + LBUG(); + + ldlm_reprocess_all(res); - req->rq_status = - ldlm_local_lock_convert(&dlm_req->lock_handle1, - dlm_req->lock_desc.l_req_mode, - &dlm_req->flags); RETURN(0); } -static int _ldlm_cancel(struct ptlrpc_request *req) +static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req) { struct ldlm_request *dlm_req; + struct ldlm_lock *lock; + struct ldlm_resource *res; int rc; ENTRY; rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); + RETURN(-ENOMEM); } dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1); + lock = ldlm_handle2object(&dlm_req->lock_handle1); + res = ldlm_local_lock_cancel(lock); + req->rq_status = 0; + if (ptlrpc_reply(svc, req) != 0) + LBUG(); + + if (res != NULL) + ldlm_reprocess_all(res); + RETURN(0); } -static int _ldlm_callback(struct ptlrpc_request *req) +static int _ldlm_callback(struct ptlrpc_service *svc, + struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock *lock; + struct ldlm_lock *lock1, *lock2; int rc; ENTRY; rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); + RETURN(-ENOMEM); } dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - lock = ldlm_handle2object(&dlm_req->lock_handle1); - ldlm_lock_dump(lock); - if (dlm_req->lock_handle2.addr) { - CERROR("Got blocked callback for lock %p.\n", lock); - /* FIXME: do something impressive. */ - } else { - CERROR("Got granted callback for lock %p.\n", lock); - lock->l_granted_mode = lock->l_req_mode; - wake_up(&lock->l_waitq); - } + /* We must send the reply first, so that the thread is free to handle + * any requests made in common_callback() */ + rc = ptlrpc_reply(svc, req); + if (rc != 0) + RETURN(rc); - req->rq_status = 0; + lock1 = ldlm_handle2object(&dlm_req->lock_handle1); + lock2 = ldlm_handle2object(&dlm_req->lock_handle2); + common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL, + 0); RETURN(0); } @@ -191,34 +249,28 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc, } switch (req->rq_reqmsg->opc) { - case LDLM_NAMESPACE_NEW: - CDEBUG(D_INODE, "namespace_new\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0); - rc = _ldlm_namespace_new(dev, req); - break; - case LDLM_ENQUEUE: CDEBUG(D_INODE, "enqueue\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); - rc = _ldlm_enqueue(req); + rc = _ldlm_enqueue(dev, svc, req); break; case LDLM_CONVERT: CDEBUG(D_INODE, "convert\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); - rc = _ldlm_convert(req); + rc = _ldlm_convert(svc, req); break; case LDLM_CANCEL: CDEBUG(D_INODE, "cancel\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); - rc = _ldlm_cancel(req); + rc = _ldlm_cancel(svc, req); break; case LDLM_CALLBACK: CDEBUG(D_INODE, "callback\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); - rc = _ldlm_callback(req); + rc = _ldlm_callback(svc, req); break; default: @@ -226,11 +278,11 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc, RETURN(rc); } + EXIT; out: if (rc) RETURN(ptlrpc_error(svc, req)); - else - RETURN(ptlrpc_reply(svc, req)); + return 0; } static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg, @@ -271,13 +323,15 @@ static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg, return err; } -static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) +static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) { struct ldlm_obd *ldlm = &obddev->u.ldlm; int err; ENTRY; - ldlm_spinlock = SPIN_LOCK_UNLOCKED; + obddev->obd_namespace = ldlm_namespace_new(obddev, 0); + if (obddev->obd_namespace == NULL) + LBUG(); ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, @@ -290,6 +344,11 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) CERROR("cannot start thread\n"); LBUG(); } + err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm"); + if (err) { + CERROR("cannot start thread\n"); + LBUG(); + } OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); if (ldlm->ldlm_client == NULL) @@ -302,78 +361,13 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) RETURN(0); } -static int cleanup_resource(struct ldlm_resource *res, struct list_head *q) -{ - struct list_head *tmp, *pos; - int rc = 0; - - list_for_each_safe(tmp, pos, q) { - struct ldlm_lock *lock; - - if (rc) { - /* Res was already cleaned up. */ - LBUG(); - } - - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - - ldlm_resource_del_lock(lock); - ldlm_lock_free(lock); - rc = ldlm_resource_put(res); - } - - return rc; -} - -static int do_free_namespace(struct ldlm_namespace *ns) -{ - struct list_head *tmp, *pos; - int i, rc; - - for (i = 0; i < RES_HASH_SIZE; i++) { - list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { - struct ldlm_resource *res; - res = list_entry(tmp, struct ldlm_resource, lr_hash); - list_del_init(&res->lr_hash); - - rc = cleanup_resource(res, &res->lr_granted); - if (!rc) - rc = cleanup_resource(res, &res->lr_converting); - if (!rc) - rc = cleanup_resource(res, &res->lr_waiting); - - while (rc == 0) - rc = ldlm_resource_put(res); - } - } - - return ldlm_namespace_free(ns); -} - -static int ldlm_free_all(struct obd_device *obddev) -{ - struct list_head *tmp, *pos; - int rc = 0; - - ldlm_lock(); - - list_for_each_safe(tmp, pos, &ldlm_namespaces) { - struct ldlm_namespace *ns; - ns = list_entry(tmp, struct ldlm_namespace, ns_link); - - rc |= do_free_namespace(ns); - } - - ldlm_unlock(); - - return rc; -} - static int ldlm_cleanup(struct obd_device *obddev) { struct ldlm_obd *ldlm = &obddev->u.ldlm; ENTRY; + ldlm_namespace_free(obddev->obd_namespace); + ptlrpc_stop_all_threads(ldlm->ldlm_service); rpc_unregister_service(ldlm->ldlm_service); @@ -385,11 +379,6 @@ static int ldlm_cleanup(struct obd_device *obddev) OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service)); - if (ldlm_free_all(obddev)) { - CERROR("ldlm_free_all could not complete.\n"); - RETURN(-1); - } - MOD_DEC_USE_COUNT; RETURN(0); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 7c5aa44..eaee614 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -14,59 +14,41 @@ #include -#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000)) - -static int is_local_conn(struct ptlrpc_connection *conn) -{ - ENTRY; - if (conn == NULL) - RETURN(1); - - RETURN(LOOPBACK(conn->c_peer.peer_nid)); -} - int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, - __u32 ns_id, + struct ldlm_namespace *ns, struct ldlm_handle *parent_lock_handle, __u64 *res_id, __u32 type, struct ldlm_extent *req_ex, ldlm_mode_t mode, int *flags, - ldlm_lock_callback completion, - ldlm_lock_callback blocking, void *data, __u32 data_len, - struct ldlm_handle *lockh, - struct ptlrpc_request **request) + struct ldlm_handle *lockh) { - struct ldlm_handle local_lockh; struct ldlm_lock *lock; struct ldlm_request *body; struct ldlm_reply *reply; - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req; char *bufs[2] = {NULL, data}; int rc, size[2] = {sizeof(*body), data_len}; - ldlm_error_t err; ENTRY; - err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id, - type, &local_lockh); - if (err != ELDLM_OK) - RETURN(err); + *flags = 0; + rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode, + NULL, 0, lockh); + if (rc != ELDLM_OK) + GOTO(out, rc); - lock = ldlm_handle2object(&local_lockh); - /* Is this lock locally managed? */ - if (is_local_conn(conn)) - GOTO(local, 0); + lock = ldlm_handle2object(lockh); + spin_unlock(&lock->l_lock); req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs); if (!req) GOTO(out, rc = -ENOMEM); /* Dump all of this data into the request buffer */ body = lustre_msg_buf(req->rq_reqmsg, 0); - body->lock_desc.l_resource.lr_ns_id = ns_id; body->lock_desc.l_resource.lr_type = type; memcpy(body->lock_desc.l_resource.lr_name, res_id, sizeof(body->lock_desc.l_resource.lr_name)); @@ -77,7 +59,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, sizeof(body->lock_desc.l_extent)); body->flags = *flags; - memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1)); + memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1)); if (parent_lock_handle) memcpy(&body->lock_handle2, parent_lock_handle, @@ -90,73 +72,43 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); if (rc != ELDLM_OK) { + spin_lock(&lock->l_resource->lr_lock); ldlm_resource_put(lock->l_resource); + spin_unlock(&lock->l_resource->lr_lock); ldlm_lock_free(lock); GOTO(out, rc); } lock->l_connection = conn; + lock->l_client = cl; reply = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&lock->l_remote_handle, &reply->lock_handle, sizeof(lock->l_remote_handle)); + memcpy(req_ex, &reply->lock_extent, sizeof(*req_ex)); *flags = reply->flags; - CERROR("remote handle: %p, flags: %d\n", + CDEBUG(D_INFO, "remote handle: %p, flags: %d\n", (void *)(unsigned long)reply->lock_handle.addr, *flags); - CERROR("extent: %Lu -> %Lu\n", + CDEBUG(D_INFO, "extent: %Lu -> %Lu\n", (unsigned long long)reply->lock_extent.start, (unsigned long long)reply->lock_extent.end); - EXIT; - local: - rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, - completion, blocking, data, data_len); + ptlrpc_free_req(req); + + rc = ldlm_local_lock_enqueue(lockh, req_ex, flags, NULL, NULL); + if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | - LDLM_FL_BLOCK_CONV)) { + LDLM_FL_BLOCK_CONV)) { /* Go to sleep until the lock is granted. */ /* FIXME: or cancelled. */ + CDEBUG(D_NET, "enqueue returned a blocked lock (%p), " + "going to sleep.\n", lock); + ldlm_lock_dump(lock); wait_event_interruptible(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); + CDEBUG(D_NET, "waking up, the lock must be granted.\n"); } - out: - *request = req; - return rc; -} - -int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, __u32 ns_id) -{ - struct ldlm_namespace *ns; - struct ldlm_request *body; - struct ptlrpc_request *req; - int rc, size = sizeof(*body); - ENTRY; - - if (is_local_conn(conn)) - GOTO(local, 0); - - req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - - body = lustre_msg_buf(req->rq_reqmsg, 0); - body->lock_desc.l_resource.lr_ns_id = ns_id; - - req->rq_replen = lustre_msg_size(0, NULL); - - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - if (rc) - GOTO(out, rc); - EXIT; - local: - rc = ldlm_namespace_new(obddev, ns_id, &ns); - if (rc != ELDLM_OK) { - /* XXX: It succeeded remotely but failed locally. What to do? */ - CERROR("Local ldlm_namespace_new failed.\n"); - } out: return rc; } @@ -181,7 +133,11 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, memcpy(&body->lock_handle1, &lock->l_remote_handle, sizeof(body->lock_handle1)); - if (new != NULL) { + if (new == NULL) { + CDEBUG(D_NET, "Sending granted AST\n"); + ldlm_lock2desc(lock, &body->lock_desc); + } else { + CDEBUG(D_NET, "Sending blocked AST\n"); ldlm_lock2desc(new, &body->lock_desc); ldlm_object2handle(new, &body->lock_handle2); } @@ -198,20 +154,18 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, } int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh, - int new_mode, int *flags, struct ptlrpc_request **request) + int new_mode, int *flags) { struct ldlm_request *body; - struct ldlm_reply *reply; struct ldlm_lock *lock; - struct ptlrpc_request *req = NULL; + struct ldlm_resource *res; + struct ptlrpc_request *req; int rc, size[2] = {sizeof(*body), 0}; char *bufs[2] = {NULL, NULL}; ENTRY; lock = ldlm_handle2object(lockh); - - if (is_local_conn(lock->l_connection)) - GOTO(local, 0); + *flags = 0; size[1] = lock->l_data_len; bufs[1] = lock->l_data; @@ -234,31 +188,43 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh, if (rc != ELDLM_OK) GOTO(out, rc); - reply = lustre_msg_buf(req->rq_repmsg, 0); - *flags = reply->flags; - + body = lustre_msg_buf(req->rq_repmsg, 0); + res = ldlm_local_lock_convert(lockh, new_mode, &body->flags); + if (res != NULL) + ldlm_reprocess_all(res); + if (lock->l_req_mode != lock->l_granted_mode) { + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + CDEBUG(D_NET, "convert returned a blocked lock, " + "going to sleep.\n"); + wait_event_interruptible(lock->l_waitq, lock->l_req_mode == + lock->l_granted_mode); + CDEBUG(D_NET, "waking up, the lock must be granted.\n"); + } EXIT; - local: - rc = ldlm_local_lock_convert(lockh, new_mode, flags); out: - *request = req; + ptlrpc_free_req(req); return rc; } -int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh, - struct ptlrpc_request **request) +int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock) { struct ldlm_request *body; - struct ldlm_lock *lock; - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req; + struct ldlm_resource *res; int rc, size[2] = {sizeof(*body), 0}; char *bufs[2] = {NULL, NULL}; ENTRY; - lock = ldlm_handle2object(lockh); - - if (is_local_conn(lock->l_connection)) - GOTO(local, 0); + if (lock->l_data_len == sizeof(struct inode)) { + /* FIXME: do something better than throwing away everything */ + struct inode *inode = lock->l_data; + if (inode == NULL) + LBUG(); + down(&inode->i_sem); + invalidate_inode_pages(inode); + up(&inode->i_sem); + } size[1] = lock->l_data_len; bufs[1] = lock->l_data; @@ -275,13 +241,14 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh, rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); if (rc != ELDLM_OK) GOTO(out, rc); + res = ldlm_local_lock_cancel(lock); + if (res != NULL) + ldlm_reprocess_all(res); EXIT; - local: - rc = ldlm_local_lock_cancel(lockh); out: - *request = req; return rc; } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 5e765ee..35f046a 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -12,99 +12,97 @@ #define EXPORT_SYMTAB #define DEBUG_SUBSYSTEM S_LDLM -#include #include kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab; -LIST_HEAD(ldlm_namespaces); -spinlock_t ldlm_spinlock; - -struct ldlm_namespace *ldlm_namespace_find(__u32 id) -{ - struct list_head *tmp; - struct ldlm_namespace *res; - - res = NULL; - list_for_each(tmp, &ldlm_namespaces) { - struct ldlm_namespace *chk; - chk = list_entry(tmp, struct ldlm_namespace, ns_link); - - if ( chk->ns_id == id ) { - res = chk; - break; - } - } - - return res; -} - -/* this must be called with ldlm_lock() held */ -static int res_hash_init(struct ldlm_namespace *ns) +struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, + __u32 local) { - struct list_head *res_hash; + struct ldlm_namespace *ns; struct list_head *bucket; - if (ns->ns_hash != NULL) - RETURN(0); - - /* FIXME: this memory appears to be leaked */ - OBD_ALLOC(res_hash, sizeof(*res_hash) * RES_HASH_SIZE); - if (!res_hash) { + OBD_ALLOC(ns, sizeof(*ns)); + if (!ns) { LBUG(); - RETURN(-ENOMEM); + RETURN(NULL); } + OBD_ALLOC(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE); + if (!ns->ns_hash) { + OBD_FREE(ns, sizeof(*ns)); + LBUG(); + RETURN(NULL); + } + + ns->ns_obddev = obddev; + INIT_LIST_HEAD(&ns->ns_root_list); + ns->ns_lock = SPIN_LOCK_UNLOCKED; + ns->ns_refcount = 0; + ns->ns_local = local; - for (bucket = res_hash + RES_HASH_SIZE - 1; bucket >= res_hash; + for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash; bucket--) INIT_LIST_HEAD(bucket); - ns->ns_hash = res_hash; - - return 0; + return ns; } -ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id, - struct ldlm_namespace **ns_out) +static int cleanup_resource(struct ldlm_resource *res, struct list_head *q) { - struct ldlm_namespace *ns; - int rc; + struct list_head *tmp, *pos; + int rc = 0; - if (ldlm_namespace_find(id)) - RETURN(-ELDLM_NAMESPACE_EXISTS); + list_for_each_safe(tmp, pos, q) { + struct ldlm_lock *lock; - OBD_ALLOC(ns, sizeof(*ns)); - if (!ns) { - LBUG(); - RETURN(-ENOMEM); - } + if (rc) { + /* Res was already cleaned up. */ + LBUG(); + } - ns->ns_id = id; - ns->ns_obddev = obddev; - INIT_LIST_HEAD(&ns->ns_root_list); + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + spin_lock(&lock->l_lock); + ldlm_resource_del_lock(lock); + ldlm_lock_free(lock); - rc = res_hash_init(ns); - if (rc) { - OBD_FREE(ns, sizeof(*ns)); - RETURN(rc); + rc = ldlm_resource_put(res); } - list_add(&ns->ns_link, &ldlm_namespaces); - atomic_set(&ns->ns_refcount, 0); - *ns_out = ns; - return ELDLM_OK; + return rc; } int ldlm_namespace_free(struct ldlm_namespace *ns) { - if (atomic_read(&ns->ns_refcount)) - RETURN(-EBUSY); + struct list_head *tmp, *pos; + int i, rc; + + /* We should probably take the ns_lock, but then ldlm_resource_put + * couldn't take it. Hmm. */ + for (i = 0; i < RES_HASH_SIZE; i++) { + list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_hash); + + spin_lock(&res->lr_lock); + rc = cleanup_resource(res, &res->lr_granted); + if (!rc) + rc = cleanup_resource(res, &res->lr_converting); + if (!rc) + rc = cleanup_resource(res, &res->lr_waiting); + + if (rc == 0) { + CERROR("Resource refcount nonzero after lock " + "cleanup; forcing cleanup.\n"); + res->lr_refcount = 1; + rc = ldlm_resource_put(res); + } + } + } - list_del(&ns->ns_link); OBD_FREE(ns->ns_hash, sizeof(struct list_head) * RES_HASH_SIZE); OBD_FREE(ns, sizeof(*ns)); - return 0; + return ELDLM_OK; } static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name) @@ -125,8 +123,10 @@ static struct ldlm_resource *ldlm_resource_new(void) struct ldlm_resource *res; res = kmem_cache_alloc(ldlm_resource_slab, SLAB_KERNEL); - if (res == NULL) + if (res == NULL) { LBUG(); + return NULL; + } memset(res, 0, sizeof(*res)); INIT_LIST_HEAD(&res->lr_children); @@ -136,35 +136,42 @@ static struct ldlm_resource *ldlm_resource_new(void) INIT_LIST_HEAD(&res->lr_waiting); res->lr_lock = SPIN_LOCK_UNLOCKED; - - atomic_set(&res->lr_refcount, 1); + res->lr_refcount = 1; return res; } -/* ldlm_lock() must be taken before calling resource_add */ +/* Args: locked namespace + * Returns: newly-allocated, referenced, unlocked resource */ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, __u64 *name, __u32 type) { struct list_head *bucket; struct ldlm_resource *res; + ENTRY; - bucket = ns->ns_hash + ldlm_hash_fn(parent, name); + if (type < 0 || type > LDLM_MAX_TYPE) { + LBUG(); + RETURN(NULL); + } res = ldlm_resource_new(); - if (!res) + if (!res) { LBUG(); + RETURN(NULL); + } memcpy(res->lr_name, name, sizeof(res->lr_name)); res->lr_namespace = ns; - if (type < 0 || type > LDLM_MAX_TYPE) - LBUG(); + ns->ns_refcount++; res->lr_type = type; res->lr_most_restr = LCK_NL; + + bucket = ns->ns_hash + ldlm_hash_fn(parent, name); list_add(&res->lr_hash, bucket); - atomic_inc(&ns->ns_refcount); + if (parent == NULL) { res->lr_parent = res; list_add(&res->lr_rootlink, &ns->ns_root_list); @@ -173,49 +180,70 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, list_add(&res->lr_childof, &parent->lr_children); } - return res; + RETURN(res); } +/* Args: unlocked namespace + * Locks: takes and releases ns->ns_lock and res->lr_lock + * Returns: referenced, unlocked ldlm_resource or NULL */ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, __u64 *name, __u32 type, int create) { struct list_head *bucket; struct list_head *tmp = bucket; - struct ldlm_resource *res; + struct ldlm_resource *res = NULL; ENTRY; if (ns->ns_hash == NULL) RETURN(NULL); + + spin_lock(&ns->ns_lock); bucket = ns->ns_hash + ldlm_hash_fn(parent, name); - res = NULL; list_for_each(tmp, bucket) { struct ldlm_resource *chk; chk = list_entry(tmp, struct ldlm_resource, lr_hash); if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) { res = chk; - atomic_inc(&res->lr_refcount); + spin_lock(&res->lr_lock); + res->lr_refcount++; + spin_unlock(&res->lr_lock); + EXIT; break; } } if (res == NULL && create) res = ldlm_resource_add(ns, parent, name, type); + spin_unlock(&ns->ns_lock); RETURN(res); } +/* Args: locked resource + * Locks: takes and releases res->lr_lock + * takes and releases ns->ns_lock iff res->lr_refcount falls to 0 + */ int ldlm_resource_put(struct ldlm_resource *res) { - int rc = 0; + int rc = 0; - if (atomic_read(&res->lr_refcount) <= 0) - LBUG(); + if (res->lr_refcount == 1) { + struct ldlm_namespace *ns = res->lr_namespace; + ENTRY; + + spin_unlock(&res->lr_lock); + spin_lock(&ns->ns_lock); + spin_lock(&res->lr_lock); + + if (res->lr_refcount != 1) { + spin_unlock(&ns->ns_lock); + goto out; + } - if (atomic_dec_and_test(&res->lr_refcount)) { if (!list_empty(&res->lr_granted)) LBUG(); @@ -225,29 +253,41 @@ int ldlm_resource_put(struct ldlm_resource *res) if (!list_empty(&res->lr_waiting)) LBUG(); - atomic_dec(&res->lr_namespace->ns_refcount); + if (!list_empty(&res->lr_children)) + LBUG(); + + ns->ns_refcount--; list_del(&res->lr_hash); list_del(&res->lr_rootlink); list_del(&res->lr_childof); kmem_cache_free(ldlm_resource_slab, res); + spin_unlock(&ns->ns_lock); rc = 1; + } else { + ENTRY; + out: + res->lr_refcount--; + if (res->lr_refcount < 0) + LBUG(); } - return rc; + RETURN(rc); } +/* Must be called with resource->lr_lock taken */ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock) { list_add(&lock->l_res_link, head); - atomic_inc(&res->lr_refcount); + res->lr_refcount++; } +/* Must be called with resource->lr_lock taken */ void ldlm_resource_del_lock(struct ldlm_lock *lock) { - list_del(&lock->l_res_link); - atomic_dec(&lock->l_resource->lr_refcount); + list_del_init(&lock->l_res_link); + lock->l_resource->lr_refcount--; } int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h) @@ -258,7 +298,6 @@ int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h) void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) { - desc->lr_ns_id = res->lr_namespace->ns_id; desc->lr_type = res->lr_type; memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name)); memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version)); @@ -278,8 +317,7 @@ void ldlm_resource_dump(struct ldlm_resource *res) (unsigned long long)res->lr_name[2]); CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name); - CDEBUG(D_OTHER, "Namespace: %p (%u)\n", res->lr_namespace, - res->lr_namespace->ns_id); + CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace); CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root); CDEBUG(D_OTHER, "Granted locks:\n"); @@ -303,4 +341,3 @@ void ldlm_resource_dump(struct ldlm_resource *res) ldlm_lock_dump(lock); } } - diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 20b2301..e6efaf2 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -30,21 +30,21 @@ int ldlm_test_basics(struct obd_device *obddev) struct ldlm_handle lockh_1, lockh_2; int flags; - ldlm_lock(); - - err = ldlm_namespace_new(obddev, 1, &ns); - if (err != ELDLM_OK) + ns = ldlm_namespace_new(obddev, 0); + if (ns == NULL) LBUG(); - err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_1); - err = ldlm_local_lock_enqueue(&lockh_1, LCK_CR, NULL, &flags, NULL, - ldlm_test_callback, NULL, 0); + err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_CR, + NULL, 0, &lockh_1); + err = ldlm_local_lock_enqueue(&lockh_1, NULL, &flags, + ldlm_test_callback, ldlm_test_callback); if (err != ELDLM_OK) LBUG(); - err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_2); - err = ldlm_local_lock_enqueue(&lockh_2, LCK_EX, NULL, &flags, NULL, - ldlm_test_callback, NULL, 0); + err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_EX, + NULL, 0, &lockh_2); + err = ldlm_local_lock_enqueue(&lockh_2, NULL, &flags, + ldlm_test_callback, ldlm_test_callback); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_BLOCK_GRANTED)) @@ -55,12 +55,12 @@ int ldlm_test_basics(struct obd_device *obddev) LBUG(); ldlm_resource_dump(res); - err = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags); - if (err != ELDLM_OK) - LBUG(); + res = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags); + if (res != NULL) + ldlm_reprocess_all(res); ldlm_resource_dump(res); - ldlm_unlock(); + ldlm_namespace_free(ns); return 0; } @@ -69,40 +69,39 @@ int ldlm_test_extents(struct obd_device *obddev) { struct ldlm_namespace *ns; struct ldlm_resource *res; + struct ldlm_lock *lock; __u64 res_id[RES_NAME_SIZE] = {0, 0, 0}; struct ldlm_extent ext1 = {4, 6}, ext2 = {6, 9}, ext3 = {10, 11}; struct ldlm_handle ext1_h, ext2_h, ext3_h; ldlm_error_t err; int flags; - ldlm_lock(); - - err = ldlm_namespace_new(obddev, 2, &ns); - if (err != ELDLM_OK) + ns = ldlm_namespace_new(obddev, 0); + if (ns == NULL) LBUG(); flags = 0; - err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext1_h); - err = ldlm_local_lock_enqueue(&ext1_h, LCK_PR, &ext1, &flags, NULL, - NULL, NULL, 0); + err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR, + NULL, 0, &ext1_h); + err = ldlm_local_lock_enqueue(&ext1_h, &ext1, &flags, NULL, NULL); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_LOCK_CHANGED)) LBUG(); flags = 0; - err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext2_h); - err = ldlm_local_lock_enqueue(&ext2_h, LCK_PR, &ext2, &flags, NULL, - NULL, NULL, 0); + err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR, + NULL, 0, &ext2_h); + err = ldlm_local_lock_enqueue(&ext2_h, &ext2, &flags, NULL, NULL); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_LOCK_CHANGED)) LBUG(); flags = 0; - err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext3_h); - err = ldlm_local_lock_enqueue(&ext3_h, LCK_EX, &ext3, &flags, NULL, - NULL, NULL, 0); + err = ldlm_local_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_EX, + NULL, 0, &ext3_h); + err = ldlm_local_lock_enqueue(&ext3_h, &ext3, &flags, NULL, NULL); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_BLOCK_GRANTED)) @@ -112,22 +111,22 @@ int ldlm_test_extents(struct obd_device *obddev) /* Convert/cancel blocking locks */ flags = 0; - err = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags); - if (err != ELDLM_OK) - LBUG(); + res = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags); + if (res != NULL) + ldlm_reprocess_all(res); flags = 0; - err = ldlm_local_lock_cancel(&ext2_h); - if (err != ELDLM_OK) - LBUG(); + lock = ldlm_handle2object(&ext2_h); + res = ldlm_local_lock_cancel(lock); + if (res != NULL) + ldlm_reprocess_all(res); /* Dump the results */ res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0); if (res == NULL) LBUG(); ldlm_resource_dump(res); - - ldlm_unlock(); + ldlm_namespace_free(ns); return 0; } @@ -136,36 +135,19 @@ static int ldlm_test_network(struct obd_device *obddev, struct ptlrpc_connection *conn) { struct ldlm_obd *ldlm = &obddev->u.ldlm; - struct ptlrpc_request *request; __u64 res_id[RES_NAME_SIZE] = {1, 2, 3}; struct ldlm_extent ext = {4, 6}; - struct ldlm_handle lockh1, lockh2; + struct ldlm_handle lockh1; int flags = 0; ldlm_error_t err; - err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client, conn, 3); - ptlrpc_free_req(request); - CERROR("ldlm_cli_namespace_new: %d\n", err); - if (err != ELDLM_OK) - GOTO(out, err); - - err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3, + err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, obddev->obd_namespace, NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags, - NULL, NULL, NULL, 0, &lockh1, &request); - ptlrpc_free_req(request); - CERROR("ldlm_cli_enqueue: %d\n", err); - - flags = 0; - err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3, - NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags, - NULL, NULL, NULL, 0, &lockh2, &request); - ptlrpc_free_req(request); + NULL, 0, &lockh1); CERROR("ldlm_cli_enqueue: %d\n", err); - EXIT; - out: - return err; + RETURN(err); } int ldlm_test(struct obd_device *obddev, struct ptlrpc_connection *conn) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 672b38c..4be688a 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -23,22 +23,9 @@ * (jj@sunsite.ms.mff.cuni.cz) */ -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_LLITE -#include +#include #include int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc); @@ -48,23 +35,23 @@ extern inline struct obdo * ll_oa_from_inode(struct inode *inode, static int ll_file_open(struct inode *inode, struct file *file) { - int rc; + int rc; struct ptlrpc_request *req = NULL; struct ll_file_data *fd; struct obdo *oa; struct ll_sb_info *sbi = ll_i2sbi(inode); ENTRY; - if (file->private_data) + if (file->private_data) LBUG(); - fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL); + fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL); if (!fd) GOTO(out, rc = -ENOMEM); memset(fd, 0, sizeof(*fd)); rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino, - S_IFREG, file->f_flags, &fd->fd_mdshandle, &req); + S_IFREG, file->f_flags, &fd->fd_mdshandle, &req); fd->fd_req = req; ptlrpc_req_finished(req); if (rc) { @@ -79,21 +66,19 @@ static int ll_file_open(struct inode *inode, struct file *file) oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID)); if (oa == NULL) LBUG(); - rc = obd_open(ll_i2obdconn(inode), oa); + rc = obd_open(ll_i2obdconn(inode), oa); obdo_free(oa); if (rc) { /* XXX: Need to do mdc_close here! */ - if (rc > 0) - rc = -rc; - GOTO(out, rc); + GOTO(out, rc = abs(rc)); } file->private_data = fd; - EXIT; + EXIT; out: if (rc && fd) { - kmem_cache_free(ll_file_data_slab, fd); + kmem_cache_free(ll_file_data_slab, fd); file->private_data = NULL; } @@ -112,7 +97,7 @@ static int ll_file_release(struct inode *inode, struct file *file) ENTRY; fd = (struct ll_file_data *)file->private_data; - if (!fd || !fd->fd_mdshandle) { + if (!fd || !fd->fd_mdshandle) { LBUG(); GOTO(out, rc = -EINVAL); } @@ -120,20 +105,17 @@ static int ll_file_release(struct inode *inode, struct file *file) oa = ll_oa_from_inode(inode, (OBD_MD_FLMODE | OBD_MD_FLID)); if (oa == NULL) LBUG(); - rc = obd_close(ll_i2obdconn(inode), oa); + rc = obd_close(ll_i2obdconn(inode), oa); obdo_free(oa); - if (rc) { - if (rc > 0) - rc = -rc; - GOTO(out, rc); - } + if (rc) + GOTO(out, abs(rc)); iattr.ia_valid = ATTR_SIZE; iattr.ia_size = inode->i_size; rc = ll_inode_setattr(inode, &iattr, 0); if (rc) { CERROR("failed - %d.\n", rc); - rc = -EIO; + rc = -EIO; /* XXX - GOTO(out)? -phil */ } rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino, @@ -149,8 +131,8 @@ static int ll_file_release(struct inode *inode, struct file *file) EXIT; out: - if (!rc && fd) { - kmem_cache_free(ll_file_data_slab, fd); + if (!rc && fd) { + kmem_cache_free(ll_file_data_slab, fd); file->private_data = NULL; } return rc; @@ -172,6 +154,53 @@ static inline void ll_remove_suid(struct inode *inode) } } +static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, + loff_t *ppos) +{ + struct inode *inode = filp->f_dentry->d_inode; +#if 0 + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ldlm_extent extent; + struct ldlm_handle lockh; + __u64 res_id[RES_NAME_SIZE] = {inode->i_ino}; + int flags = 0; + ldlm_error_t err; +#endif + ssize_t retval; + ENTRY; + +#if 0 + extent.start = *ppos; + extent.end = *ppos + count; + CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n", + inode->i_ino, extent.start, extent.end); + + err = obd_enqueue(&sbi->ll_conn, sbi->ll_namespace, NULL, res_id, + LDLM_EXTENT, &extent, LCK_PR, &flags, inode, + sizeof(*inode), &lockh); + if (err != ELDLM_OK) + CERROR("lock enqueue: err: %d\n", err); + ldlm_lock_dump((void *)(unsigned long)lockh.addr); +#endif + + CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n", + inode->i_ino, count, *ppos); + retval = generic_file_read(filp, buf, count, ppos); + if (retval > 0) { + struct iattr attr; + attr.ia_valid = ATTR_ATIME; + attr.ia_atime = CURRENT_TIME; + ll_setattr(filp->f_dentry, &attr); + } + +#if 0 + err = obd_cancel(&sbi->ll_conn, LCK_PR, &lockh); + if (err != ELDLM_OK) + CERROR("lock cancel: err: %d\n", err); +#endif + + RETURN(retval); +} /* * Write to a file (through the page cache). @@ -179,12 +208,36 @@ static inline void ll_remove_suid(struct inode *inode) static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) { + struct inode *inode = file->f_dentry->d_inode; +#if 0 + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ldlm_extent extent; + struct ldlm_handle lockh; + __u64 res_id[RES_NAME_SIZE] = {inode->i_ino}; + int flags = 0; + ldlm_error_t err; +#endif ssize_t retval; + ENTRY; + +#if 0 + extent.start = *ppos; + extent.end = *ppos + count; + CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n", + inode->i_ino, extent.start, extent.end); + + err = obd_enqueue(&sbi->ll_conn, sbi->ll_namespace, NULL, res_id, + LDLM_EXTENT, &extent, LCK_PW, &flags, inode, + sizeof(*inode), &lockh); + if (err != ELDLM_OK) + CERROR("lock enqueue: err: %d\n", err); + ldlm_lock_dump((void *)(unsigned long)lockh.addr); +#endif + CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n", - file->f_dentry->d_inode->i_ino, (long)count, *ppos); + inode->i_ino, (long)count, *ppos); retval = generic_file_write(file, buf, count, ppos); - CDEBUG(D_INFO, "Wrote %ld\n", (long)retval); /* update mtime/ctime/atime here, NOT size */ if (retval > 0) { @@ -194,20 +247,25 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) CURRENT_TIME; ll_setattr(file->f_dentry, &attr); } - EXIT; - return retval; -} +#if 0 + err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh); + if (err != ELDLM_OK) + CERROR("lock cancel: err: %d\n", err); +#endif + + RETURN(retval); +} /* XXX this does not need to do anything for data, it _does_ need to - call setattr */ + call setattr */ int ll_fsync(struct file *file, struct dentry *dentry, int data) { return 0; } struct file_operations ll_file_operations = { - read: generic_file_read, + read: ll_file_read, write: ll_file_write, open: ll_file_open, release: ll_file_release, @@ -215,9 +273,7 @@ struct file_operations ll_file_operations = { fsync: NULL }; - struct inode_operations ll_file_inode_operations = { truncate: ll_truncate, setattr: ll_setattr }; - diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 21b2556..49b19ad 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -14,13 +14,12 @@ * */ -#include -#include - #define DEBUG_SUBSYSTEM S_LLITE +#include #include #include +#include kmem_cache_t *ll_file_data_slab; extern struct address_space_operations ll_aops; @@ -121,6 +120,12 @@ static struct super_block * ll_read_super(struct super_block *sb, GOTO(out_free, sb = NULL); } + sbi->ll_namespace = ldlm_namespace_new(NULL, 1); + if (sbi->ll_namespace == NULL) { + CERROR("failed to create local lock namespace\n"); + GOTO(out_free, sb = NULL); + } + ptlrpc_init_client(ptlrpc_connmgr, ll_recover, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &sbi->ll_mds_client); @@ -193,6 +198,8 @@ out_disc: obd_disconnect(&sbi->ll_conn); out_free: MOD_DEC_USE_COUNT; + if (sbi->ll_namespace) + ldlm_namespace_free(sbi->ll_namespace); OBD_FREE(sbi, sizeof(*sbi)); } if (device) @@ -209,6 +216,7 @@ static void ll_put_super(struct super_block *sb) ENTRY; ll_commitcbd_cleanup(sbi); obd_disconnect(&sbi->ll_conn); + ldlm_namespace_free(sbi->ll_namespace); ptlrpc_put_connection(sbi->ll_mds_conn); ptlrpc_cleanup_client(&sbi->ll_mds_client); OBD_FREE(sb->u.generic_sbp, sizeof(*sbi)); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 7b33e6c..8f0b85c 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -31,6 +31,14 @@ static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl, *connection = osc->osc_conn; } +static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl, + struct ptlrpc_connection **connection) +{ + struct osc_obd *osc = &conn->oc_dev->u.osc; + *cl = osc->osc_ldlm_client; + *connection = osc->osc_conn; +} + static int osc_connect(struct obd_conn *conn) { struct ptlrpc_request *request; @@ -345,43 +353,35 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; + struct ptlrpc_bulk_desc *bulk; + int rc; + ENTRY; osc_con2cl(conn, &cl, &connection); - if (cl->cli_obd) { - /* local sendpage */ - memcpy((char *)(unsigned long)dst->addr, - (char *)(unsigned long)src->addr, src->len); - } else { - struct ptlrpc_bulk_desc *bulk; - int rc; - - bulk = ptlrpc_prep_bulk(connection); - if (bulk == NULL) - RETURN(-ENOMEM); - - bulk->b_buf = (void *)(unsigned long)src->addr; - bulk->b_buflen = src->len; - bulk->b_xid = dst->xid; - rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); - if (rc != 0) { - CERROR("send_bulk failed: %d\n", rc); - ptlrpc_free_bulk(bulk); - LBUG(); - RETURN(rc); - } - wait_event_interruptible(bulk->b_waitq, - ptlrpc_check_bulk_sent(bulk)); + bulk = ptlrpc_prep_bulk(connection); + if (bulk == NULL) + RETURN(-ENOMEM); - if (bulk->b_flags & PTL_RPC_FL_INTR) { - ptlrpc_free_bulk(bulk); - RETURN(-EINTR); - } + bulk->b_buf = (void *)(unsigned long)src->addr; + bulk->b_buflen = src->len; + bulk->b_xid = dst->xid; + rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); + if (rc != 0) { + CERROR("send_bulk failed: %d\n", rc); + ptlrpc_free_bulk(bulk); + LBUG(); + RETURN(rc); + } + wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk)); + if (bulk->b_flags & PTL_RPC_FL_INTR) { ptlrpc_free_bulk(bulk); + RETURN(-EINTR); } - return 0; + ptlrpc_free_bulk(bulk); + RETURN(0); } int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, @@ -392,18 +392,16 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, struct ptlrpc_connection *connection; struct ptlrpc_request *request; struct ost_body *body; - struct obd_ioobj ioo; - struct niobuf src; int pages, rc, i, j, size[3] = {sizeof(*body)}; void *ptr1, *ptr2; struct ptlrpc_bulk_desc **bulk; ENTRY; - size[1] = num_oa * sizeof(ioo); + size[1] = num_oa * sizeof(struct obd_ioobj); pages = 0; for (i = 0; i < num_oa; i++) pages += oa_bufs[i]; - size[2] = pages * sizeof(src); + size[2] = pages * sizeof(struct niobuf); OBD_ALLOC(bulk, pages * sizeof(*bulk)); if (bulk == NULL) @@ -475,7 +473,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, struct obd_ioobj ioo; struct ost_body *body; struct niobuf *src; - int pages, rc, i, j, size[3] = {sizeof(*body)}; + long pages; + int rc, i, j, size[3] = {sizeof(*body)}; void *ptr1, *ptr2; ENTRY; @@ -519,7 +518,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, GOTO(out, rc = -EINVAL); if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) { - CERROR("buffer length wrong (%d vs. %d)\n", + CERROR("buffer length wrong (%d vs. %ld)\n", request->rq_repmsg->buflens[1], pages * sizeof(struct niobuf)); GOTO(out, rc = -EINVAL); @@ -554,6 +553,74 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, offset, flags); } +int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns, + struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type, + struct ldlm_extent *extent, __u32 mode, int *flags, void *data, + int datalen, struct ldlm_handle *lockh) +{ + struct ptlrpc_connection *conn; + struct ptlrpc_client *cl; + int rc; + __u32 mode2; + + /* Filesystem locks are given a bit of special treatment: first we + * fixup the lock to start and end on page boundaries. */ + extent->start &= PAGE_MASK; + extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK; + + /* Next, search for already existing extent locks that will cover us */ + osc_con2dlmcl(oconn, &cl, &conn); + rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh); + if (rc == 1) { + /* We already have a lock, and it's referenced */ + return 0; + } + + /* Next, search for locks that we can upgrade (if we're trying to write) + * or are more than we need (if we're trying to read). Because the VFS + * and page cache already protect us locally, lots of readers/writers + * can share a single PW lock. */ + if (mode == LCK_PW) + mode2 = LCK_PR; + else + mode2 = LCK_PW; + + rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh); + if (rc == 1) { + int flags; + struct ldlm_lock *lock = ldlm_handle2object(lockh); + /* FIXME: This is not incredibly elegant, but it might + * be more elegant than adding another parameter to + * lock_match. I want a second opinion. */ + ldlm_lock_addref(lock, mode); + ldlm_lock_decref(lock, mode2); + + if (mode == LCK_PR) + return 0; + + rc = ldlm_cli_convert(cl, lockh, type, &flags); + if (rc) + LBUG(); + + return rc; + } + + rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type, + extent, mode, flags, data, datalen, lockh); + return rc; +} + +int osc_cancel(struct obd_conn *oconn, __u32 mode, struct ldlm_handle *lockh) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = ldlm_handle2object(lockh); + ldlm_lock_decref(lock, mode); + + RETURN(0); +} + static int osc_setup(struct obd_device *obddev, obd_count len, void *buf) { struct osc_obd *osc = &obddev->u.osc; @@ -613,7 +680,9 @@ struct obd_ops osc_obd_ops = { o_connect: osc_connect, o_disconnect: osc_disconnect, o_brw: osc_brw, - o_punch: osc_punch + o_punch: osc_punch, + o_enqueue: osc_enqueue, + o_cancel: osc_cancel }; static int __init osc_init(void) diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index e49e989..8d56058 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -595,7 +595,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); if (err) GOTO(error_disc, err = -EINVAL); -#if 0 +#if 1 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); if (err) GOTO(error_disc, err = -EINVAL); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index e66aa053..9523a21 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -361,7 +361,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) init_waitqueue_head(&req->rq_wait_for_rep); resend: req->rq_time = CURRENT_TIME; - req->rq_timeout = 3; + req->rq_timeout = 30; rc = ptl_send_rpc(req); if (rc) { CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);