yet.
- API improvements.
- Documentation update.
AUTOMAKE_OPTIONS = foreign
# NOTE: keep extN before mds
-SUBDIRS = ptlrpc llite lib ldlm obdecho mdc osc extN mds ost
+SUBDIRS = ldlm ptlrpc llite lib obdecho mdc osc extN mds ost
SUBDIRS+= utils tests obdfilter obdclass obdfs demos doc scripts
EXTRA_DIST = BUGS FDL Rules include patches
typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh,
struct ldlm_lock_desc *new, void *data,
- __u32 data_len, struct ptlrpc_request **req);
+ __u32 data_len);
struct ldlm_lock {
__u64 l_random;
void *lr_tmp;
};
+struct ldlm_ast_work {
+ struct ldlm_lock *w_lock;
+ int w_blocking;
+ struct ldlm_lock_desc w_desc;
+ struct list_head w_list;
+ void *w_data;
+ int w_datalen;
+};
+
static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
{
return (struct ldlm_extent *)(res->lr_name);
void ldlm_lock_put(struct ldlm_lock *lock);
void ldlm_lock_destroy(struct ldlm_lock *lock);
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode);
-void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode);
+void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
+void ldlm_lock_addref_internal(struct ldlm_lock* , __u32 mode);
+void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_grant_lock(struct ldlm_lock *lock);
int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
void *cookie, int cookielen, ldlm_mode_t mode,
void *data,
__u32 data_len,
struct lustre_handle *lockh);
-int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
- void *data, __u32 data_len, struct ptlrpc_request **reqp);
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
+ void *data, __u32 data_len);
int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
int new_mode, int *flags);
int ldlm_cli_cancel(struct lustre_handle *);
struct obd_device *cli_obd;
__u32 cli_request_portal;
__u32 cli_reply_portal;
+
__u64 cli_last_rcvd;
__u64 cli_last_committed;
__u32 cli_target_devno;
memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
}
-static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+static void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new)
{
- struct lustre_handle lockh;
- struct ldlm_lock_desc desc;
- struct ptlrpc_request *req = NULL;
+ struct ldlm_ast_work *w;
ENTRY;
-
-
+
+ OBD_ALLOC(w, sizeof(*w));
+ if (!w) {
+ LBUG();
+ return;
+ }
+
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- if (lock->l_flags & LDLM_FL_AST_SENT) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- RETURN(0);
+ ldlm_lock_get(lock);
+ if (new) {
+ w->w_blocking = 1;
+ ldlm_lock2desc(new, &w->w_desc);
}
-
+ w->w_lock = lock;
lock->l_flags |= LDLM_FL_AST_SENT;
- /* FIXME: this should merely add the lock to the lr_tmp list */
- ldlm_lock2handle(lock, &lockh);
- ldlm_lock2desc(new, &desc);
- lock->l_blocking_ast(&lockh, &desc, lock->l_data, lock->l_data_len,
- &req);
+ list_add(&w->w_list, lock->l_resource->lr_tmp);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ return;
+}
- if (req != NULL) {
- struct list_head *list = lock->l_resource->lr_tmp;
- list_add(&req->rq_multi, list);
- }
- RETURN(1);
+void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
+{
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(lockh);
+ ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock_put(lock);
}
-/* Args: unlocked lock */
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
+/* only called for local locks */
+void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
}
/* Args: unlocked lock */
-void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
ENTRY;
if (lock == NULL)
LBUG();
+ LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
lock->l_readers--;
ldlm_lock2handle(lock, &lockh);
lock->l_blocking_ast(&lockh, NULL, lock->l_data,
- lock->l_data_len, NULL);
+ lock->l_data_len);
} else
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
rc = 0;
- CDEBUG(D_OTHER, "compat function failed and lock modes "
- "incompat\n");
if (send_cbs && child->l_blocking_ast != NULL) {
CDEBUG(D_OTHER, "incompatible; sending blocking "
"AST.\n");
- /* It's very difficult to actually send the AST from
- * here, because we'd have to drop the lock before going
- * to sleep to wait for the reply. Instead we build the
- * packet and send it later. */
- ldlm_send_blocking_ast(child, lock);
+ ldlm_add_ast_work_item(child, lock);
}
}
void ldlm_grant_lock(struct ldlm_lock *lock)
{
struct ldlm_resource *res = lock->l_resource;
- struct ptlrpc_request *req = NULL;
ENTRY;
l_lock(&lock->l_resource->lr_namespace->ns_lock);
res->lr_most_restr = lock->l_granted_mode;
if (lock->l_completion_ast) {
- struct lustre_handle lockh;
-
- /* FIXME: this should merely add lock to lr_tmp list */
- ldlm_lock2handle(lock, &lockh);
- lock->l_completion_ast(&lockh, NULL, lock->l_data,
- lock->l_data_len, &req);
- if (req != NULL) {
- struct list_head *list = res->lr_tmp;
- if (list == NULL) {
- LBUG();
- return;
- }
- list_add(&req->rq_multi, list);
- }
+ ldlm_add_ast_work_item(lock, NULL);
}
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
EXIT;
lock->l_extent.end < extent->end))
continue;
- ldlm_lock_addref(lock, mode);
+ ldlm_lock_addref_internal(lock, mode);
return lock;
}
if (lock)
wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
lock->l_granted_mode);
-
+ if (rc)
+ LDLM_DEBUG(lock, "matched");
+ else
+ LDLM_DEBUG(lock, "not matched");
return rc;
}
list_del_init(&pending->l_res_link);
ldlm_grant_lock(pending);
-
- ldlm_lock_addref(pending, pending->l_req_mode);
- ldlm_lock_decref(pending, pending->l_granted_mode);
}
RETURN(0);
}
-static void ldlm_send_delayed_asts(struct list_head *rpc_list)
+static void ldlm_run_ast_work(struct list_head *rpc_list)
{
struct list_head *tmp, *pos;
+ int rc;
ENTRY;
list_for_each_safe(tmp, pos, rpc_list) {
- int rc;
- struct ptlrpc_request *req =
- list_entry(tmp, struct ptlrpc_request, rq_multi);
-
- CDEBUG(D_INFO, "Sending callback.\n");
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
- if (rc)
- CERROR("Callback send failed: %d\n", rc);
+ struct ldlm_ast_work *w =
+ list_entry(tmp, struct ldlm_ast_work, w_list);
+ struct lustre_handle lockh;
+
+ ldlm_lock2handle(w->w_lock, &lockh);
+ if (w->w_blocking) {
+ rc = w->w_lock->l_blocking_ast(&lockh, &w->w_desc, w->w_data, w->w_datalen);
+ } else {
+ rc = w->w_lock->l_completion_ast(&lockh, NULL, w->w_data, w->w_datalen);
+ }
+ if (rc) {
+ CERROR("Failed AST - should clean & disconnect client\n");
+ }
+ ldlm_lock_put(w->w_lock);
+ list_del(&w->w_list);
+ OBD_FREE(w, sizeof(*w));
}
EXIT;
}
res->lr_tmp = NULL;
l_unlock(&res->lr_namespace->ns_lock);
- ldlm_send_delayed_asts(&rpc_list);
+ ldlm_run_ast_work(&rpc_list);
EXIT;
}
flags = dlm_req->lock_flags;
err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
- ldlm_cli_callback, ldlm_cli_callback);
+ ldlm_server_ast, ldlm_server_ast);
if (err != ELDLM_OK)
GOTO(out, err);
ldlm_lock2handle(lock, &lockh);
lock->l_blocking_ast(&lockh, descp,
lock->l_data,
- lock->l_data_len, NULL);
+ lock->l_data_len);
}
} else {
LDLM_DEBUG(lock, "Lock still has references, will be"
if (lock == NULL)
GOTO(out, rc = -ENOMEM);
/* for the local lock, add the reference */
- ldlm_lock_addref(lock, mode);
+ ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
return rc;
}
-int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
- void *data, __u32 data_len, struct ptlrpc_request **reqp)
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
+ void *data, __u32 data_len)
{
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ptlrpc_client *cl =
- &lock->l_resource->lr_namespace->ns_rpc_client;
+ struct ptlrpc_client *cl;
int rc = 0, size = sizeof(*body);
ENTRY;
lock = ldlm_handle2lock(lockh);
if (lock == NULL)
LBUG();
-
+ cl = &lock->l_resource->lr_namespace->ns_rpc_client;
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
&size, NULL);
if (!req)
req->rq_replen = lustre_msg_size(0, NULL);
- if (reqp == NULL) {
- LBUG();
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
- } else
- *reqp = req;
-
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
EXIT;
out:
GOTO(out, rc);
ldlm_lock_cancel(lock);
- ldlm_reprocess_all(lock->l_resource);
ldlm_lock_put(lock);
EXIT;
out:
static int ldlm_test_callback(struct lustre_handle *lockh,
struct ldlm_lock_desc *new,
- void *data, __u32 data_len,
- struct ptlrpc_request **reqp)
+ void *data, __u32 data_len)
{
printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new);
return 0;
void ll_intent_release(struct dentry *de)
{
- struct ldlm_lock *lock;
struct lustre_handle *handle;
ENTRY;
if (de->d_it->it_lock_mode) {
handle = (struct lustre_handle *)de->d_it->it_lock_handle;
- lock = lustre_handle2object(handle);
- LDLM_DEBUG(lock, "calling ldlm_lock_decref(%d)",
- de->d_it->it_lock_mode);
- ldlm_lock_decref(lock, de->d_it->it_lock_mode);
+ ldlm_lock_decref(handle, de->d_it->it_lock_mode);
}
de->d_it = NULL;
EXIT;
int ll_unlock(__u32 mode, struct lustre_handle *lockh)
{
- struct ldlm_lock *lock;
ENTRY;
- lock = lustre_handle2object(lockh);
- ldlm_lock_decref(lock, mode);
+ ldlm_lock_decref(lockh, mode);
RETURN(0);
}
struct mds_body *body;
struct dentry *de = NULL, *dchild = NULL;
struct inode *dir;
- struct ldlm_lock *lock;
struct lustre_handle lockh;
char *name;
int namelen, flags, lock_mode, rc = 0;
CERROR("lock enqueue: err: %d\n", rc);
GOTO(out_create_de, rc = -EIO);
}
- } else {
- lock = lustre_handle2object(&lockh);
- LDLM_DEBUG(lock, "matched");
}
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
out_create_dchild:
l_dput(dchild);
up(&dir->i_sem);
- lock = lustre_handle2object(&lockh);
- ldlm_lock_decref(lock, lock_mode);
+ ldlm_lock_decref(&lockh, lock_mode);
out_create_de:
l_dput(de);
out_pre_de:
struct dentry *dchild = NULL;
struct inode *dir;
void *handle;
- struct ldlm_lock *lock;
struct lustre_handle lockh;
int rc = 0, err, flags, lock_mode, type = rec->ur_mode & S_IFMT;
__u64 res_id[3] = {0,0,0};
CERROR("lock enqueue: err: %d\n", rc);
GOTO(out_create_de, rc = -EIO);
}
- } else {
- lock = ldlm_handle2lock(&lockh);
- LDLM_DEBUG(lock, "matched");
}
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
out_create_dchild:
l_dput(dchild);
up(&dir->i_sem);
- lock = lustre_handle2object(&lockh);
- ldlm_lock_decref(lock, lock_mode);
+ ldlm_lock_decref(&lockh, lock_mode);
out_create_de:
l_dput(de);
req->rq_status = rc;
int lock_mode, flags;
__u64 res_id[3] = {0};
struct lustre_handle lockh;
- struct ldlm_lock *lock;
void *handle;
int rc = 0;
int err;
CERROR("lock enqueue: err: %d\n", rc);
GOTO(out_unlink_de, rc = -EIO);
}
- } else {
- lock = lustre_handle2object(&lockh);
- LDLM_DEBUG(lock, "matched");
- }
+ } else
+
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
down(&dir->i_sem);
l_dput(dchild);
out_unlink_de:
up(&dir->i_sem);
- lock = lustre_handle2object(&lockh);
- ldlm_lock_decref(lock, lock_mode);
+ ldlm_lock_decref(&lockh
+, lock_mode);
if (!rc) {
/* Take an exclusive lock on the resource that we're
* about to free, to force everyone to drop their
l_dput(de);
if (!rc) {
- lock = lustre_handle2object(&lockh);
- ldlm_lock_decref(lock, LCK_EX);
+ ldlm_lock_decref(&lockh, LCK_EX);
rc = ldlm_cli_cancel(&lockh);
if (rc < 0)
CERROR("failed to cancel child inode lock ino "
sizeof(extent), mode2, lockh);
if (rc == 1) {
int flags;
- struct ldlm_lock *lock = lustre_handle2object(lockh);
/* FIXME: This is not incredibly elegant, but it might
* be more elegant than adding another parameter to
* lock_match. I want a second opinion. */
- ldlm_lock_addref(lock, mode);
- ldlm_lock_decref(lock, mode2);
+ ldlm_lock_addref(lockh, mode);
+ ldlm_lock_decref(lockh, mode2);
if (mode == LCK_PR)
return 0;
static int osc_cancel(struct obd_conn *oconn, __u32 mode,
struct lustre_handle *lockh)
{
- struct ldlm_lock *lock;
ENTRY;
- lock = lustre_handle2object(lockh);
- ldlm_lock_decref(lock, mode);
+ ldlm_lock_decref(lockh, mode);
RETURN(0);
}