int ldlm_expired_completion_wait(void *data)
{
- static unsigned long next_dump = 0;
struct lock_wait_data *lwd = data;
struct ldlm_lock *lock = lwd->lwd_lock;
struct obd_import *imp;
struct obd_device *obd;
if (lock->l_conn_export == NULL) {
+ static unsigned long next_dump = 0;
+
LDLM_ERROR(lock, "lock timed out; not entering recovery in "
"server code, just going back to sleep");
if (time_after(jiffies, next_dump)) {
- unsigned int debug = portal_debug;
- portal_debug |= D_OTHER;
ldlm_namespace_dump(lock->l_resource->lr_namespace);
- portal_debug = debug;
if (next_dump == 0)
portals_debug_dumplog();
next_dump = jiffies + 300 * HZ;
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
- /* XXX ALLOCATE - 160 mytes */
+ /* XXX ALLOCATE - 160 bytes */
struct lock_wait_data lwd;
unsigned long irqflags;
struct obd_device *obd;
struct obd_import *imp = NULL;
- int rc = 0;
struct l_wait_info lwi;
+ int rc = 0;
ENTRY;
if (flags == LDLM_FL_WAIT_NOREPROC)
goto noreproc;
- if (flags == 0) {
+ if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_BLOCK_CONV))) {
wake_up(&lock->l_waitq);
RETURN(0);
}
- if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV)))
- RETURN(0);
-
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
"sleeping");
ldlm_lock_dump(D_OTHER, lock, 0);
lwd.lwd_lock = lock;
- lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
- interrupted_completion_wait, &lwd);
+ if (flags & LDLM_FL_NO_TIMEOUT) {
+ LDLM_DEBUG(lock, "waiting indefinitely for group lock\n");
+ lwi = LWI_INTR(interrupted_completion_wait, &lwd);
+ } else {
+ lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ,
+ ldlm_expired_completion_wait,
+ interrupted_completion_wait, &lwd);
+ }
+
if (imp != NULL) {
spin_lock_irqsave(&imp->imp_lock, irqflags);
lwd.lwd_generation = imp->imp_generation;
/* Go to sleep until the lock is granted or cancelled. */
rc = l_wait_event(lock->l_waitq,
((lock->l_req_mode == lock->l_granted_mode) ||
- (lock->l_flags & LDLM_FL_CANCEL)), &lwi);
+ (lock->l_flags & LDLM_FL_FAILED)), &lwi);
- if (lock->l_destroyed) {
+ if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
RETURN(-EIO);
}
}
static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
- struct lustre_handle *parent_lockh,
struct ldlm_res_id res_id,
__u32 type,
- void *cookie, int cookielen,
+ ldlm_policy_data_t *policy,
ldlm_mode_t mode,
int *flags,
- ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
- void *data,
+ ldlm_completion_callback completion,
+ ldlm_glimpse_callback glimpse,
+ void *data, __u32 lvb_len,
+ void *lvb_swabber,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
LBUG();
}
- lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
- blocking, completion, data);
+ lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
+ completion, glimpse, data, lvb_len);
if (!lock)
GOTO(out_nolock, err = -ENOMEM);
LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
lock->l_flags |= LDLM_FL_LOCAL;
-
- err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
+ lock->l_lvb_swabber = lvb_swabber;
+ if (policy != NULL)
+ memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &policy->l_extent,
+ sizeof(policy->l_extent));
+
+ err = ldlm_lock_enqueue(ns, &lock, policy, flags);
if (err != ELDLM_OK)
GOTO(out, err);
- if (type != LDLM_PLAIN)
- memcpy(cookie, &lock->l_policy_data, cookielen);
+ if (policy != NULL)
+ memcpy(policy, &lock->l_policy_data, sizeof(*policy));
if ((*flags) & LDLM_FL_LOCK_CHANGED)
memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id));
int ldlm_cli_enqueue(struct obd_export *exp,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
- struct lustre_handle *parent_lock_handle,
struct ldlm_res_id res_id,
__u32 type,
- void *cookie, int cookielen,
+ ldlm_policy_data_t *policy,
ldlm_mode_t mode,
int *flags,
- ldlm_completion_callback completion,
ldlm_blocking_callback blocking,
+ ldlm_completion_callback completion,
+ ldlm_glimpse_callback glimpse,
void *data,
+ void *lvb,
+ __u32 lvb_len,
+ void *lvb_swabber,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int rc, size = sizeof(*body), req_passed_in = 1, is_replay;
+ int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
+ int is_replay = *flags & LDLM_FL_REPLAY;
+ int cleanup_phase = 0;
ENTRY;
- is_replay = *flags & LDLM_FL_REPLAY;
- LASSERT(exp != NULL || !is_replay);
-
if (exp == NULL) {
- rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
- type, cookie, cookielen, mode,
- flags, completion, blocking, data,
+ LASSERT(!is_replay);
+ rc = ldlm_cli_enqueue_local(ns, res_id, type, policy, mode,
+ flags, blocking, completion,
+ glimpse, data, lvb_len, lvb_swabber,
lockh);
RETURN(rc);
}
LDLM_DEBUG(lock, "client-side enqueue START");
LASSERT(exp == lock->l_conn_export);
} else {
- lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
- mode, blocking, completion, data);
+ lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
+ completion, glimpse, data, lvb_len);
if (lock == NULL)
- GOTO(out_nolock, rc = -ENOMEM);
+ RETURN(-ENOMEM);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
- if (type != LDLM_PLAIN)
- memcpy(&lock->l_policy_data, cookie, cookielen);
+ lock->l_lvb_swabber = lvb_swabber;
+ if (policy != NULL)
+ memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_req_extent, &policy->l_extent,
+ sizeof(policy->l_extent));
LDLM_DEBUG(lock, "client-side enqueue START");
}
+ /* lock not sent to server yet */
+ cleanup_phase = 2;
+
if (req == NULL) {
req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
- &size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
+ size, NULL);
+ if (req == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
req_passed_in = 0;
} else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
LBUG();
body->lock_flags = *flags;
memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
- if (parent_lock_handle)
- memcpy(&body->lock_handle2, parent_lock_handle,
- sizeof(body->lock_handle2));
/* Continue as normal. */
if (!req_passed_in) {
- size = sizeof(*reply);
- req->rq_replen = lustre_msg_size(1, &size);
+ int buffers = 1;
+ if (lvb_len > 0)
+ buffers = 2;
+ size[0] = sizeof(*reply);
+ req->rq_replen = lustre_msg_size(buffers, size);
}
lock->l_conn_export = exp;
lock->l_export = NULL;
LASSERT(!is_replay);
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
- failed_lock_cleanup(ns, lock, lockh, mode);
if (rc == ELDLM_LOCK_ABORTED) {
/* Before we return, swab the reply */
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
CERROR("Can't unpack ldlm_reply\n");
- GOTO(out_req, rc = -EPROTO);
+ rc = -EPROTO;
+ }
+ if (lvb_len) {
+ void *tmplvb;
+ tmplvb = lustre_swab_repbuf(req, 1, lvb_len,
+ lvb_swabber);
+ if (tmplvb == NULL)
+ GOTO(cleanup, rc = -EPROTO);
+ if (lvb != NULL)
+ memcpy(lvb, tmplvb, lvb_len);
}
}
- GOTO(out_req, rc);
+ GOTO(cleanup, rc);
}
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
CERROR("Can't unpack ldlm_reply\n");
- GOTO(out_req, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
}
+ /* XXX - Phil, wasn't sure if this shoiuld go before or after the
+ /* lustre_swab_repbuf() ? If we can't unpack the reply then we
+ /* don't know what occurred on the server so I think the safest
+ /* bet is to cleanup the lock as if it didn't make it ? */
+
+ /* lock enqueued on the server */
+ cleanup_phase = 1;
+
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
lock, reply->lock_handle.cookie, *flags);
- if (type == LDLM_EXTENT) {
- CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
- "extent "LPU64" -> "LPU64"\n",
- body->lock_desc.l_policy_data.l_extent.start,
- body->lock_desc.l_policy_data.l_extent.end,
- reply->lock_policy_data.l_extent.start,
- reply->lock_policy_data.l_extent.end);
-
- cookie = &reply->lock_policy_data; /* FIXME bug 267 */
- cookielen = sizeof(struct ldlm_extent);
- } else if (type == LDLM_FLOCK) {
- cookie = &reply->lock_policy_data;
- cookielen = sizeof(struct ldlm_flock);
- }
/* If enqueue returned a blocked lock but the completion handler has
* already run, then it fixed up the resource and we don't need to do it
* again. */
if ((*flags) & LDLM_FL_LOCK_CHANGED) {
- int newmode = reply->lock_mode;
+ int newmode = reply->lock_desc.l_req_mode;
LASSERT(!is_replay);
if (newmode && newmode != lock->l_req_mode) {
LDLM_DEBUG(lock, "server returned different mode %s",
lock->l_req_mode = newmode;
}
- if (reply->lock_resource_name.name[0] !=
- lock->l_resource->lr_name.name[0]) {
+ if (reply->lock_desc.l_resource.lr_name.name[0] !=
+ lock->l_resource->lr_name.name[0] ||
+ reply->lock_desc.l_resource.lr_name.name[1] !=
+ lock->l_resource->lr_name.name[1]) {
CDEBUG(D_INFO, "remote intent success, locking %ld "
"instead of %ld\n",
- (long)reply->lock_resource_name.name[0],
+ (long)reply->lock_desc.l_resource.lr_name.name[0],
(long)lock->l_resource->lr_name.name[0]);
ldlm_lock_change_resource(ns, lock,
- reply->lock_resource_name);
+ reply->lock_desc.l_resource.lr_name);
if (lock->l_resource == NULL) {
LBUG();
- GOTO(out_req, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
}
LDLM_DEBUG(lock, "client-side enqueue, new resource");
}
+ if (policy != NULL)
+ memcpy(&lock->l_policy_data,
+ &reply->lock_desc.l_policy_data,
+ sizeof(reply->lock_desc.l_policy_data));
+ if (type != LDLM_PLAIN)
+ LDLM_DEBUG(lock,"client-side enqueue, new policy data");
}
+
if ((*flags) & LDLM_FL_AST_SENT) {
l_lock(&ns->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
+ /* If the lock has already been granted by a completion AST, don't
+ * clobber the LVB with an older one. */
+ if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
+ void *tmplvb;
+ tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
+ if (tmplvb == NULL)
+ GOTO(cleanup, rc = -EPROTO);
+ memcpy(lock->l_lvb_data, tmplvb, lvb_len);
+ }
+
if (!is_replay) {
- rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
+ rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
if (lock->l_completion_ast != NULL) {
int err = lock->l_completion_ast(lock, *flags, NULL);
- if (err)
- failed_lock_cleanup(ns, lock, lockh, mode);
if (!rc)
rc = err;
}
}
+ if (lvb_len && lvb != NULL) {
+ /* Copy the LVB here, and not earlier, because the completion
+ * AST (if any) can override what we got in the reply */
+ memcpy(lvb, lock->l_lvb_data, lvb_len);
+ }
+
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
- out_req:
- if (!req_passed_in)
- ptlrpc_req_finished(req);
- out:
+cleanup:
+ switch (cleanup_phase) {
+ case 2:
+ if (rc)
+ failed_lock_cleanup(ns, lock, lockh, mode);
+ case 1:
+ if (!req_passed_in && req != NULL)
+ ptlrpc_req_finished(req);
+ }
+
LDLM_LOCK_PUT(lock);
- out_nolock:
return rc;
}
-int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
-{
- struct lustre_handle lockh;
- struct ldlm_res_id junk;
- int flags = LDLM_FL_REPLAY;
- ldlm_lock2handle(lock, &lockh);
- return ldlm_cli_enqueue(lock->l_conn_export, NULL, NULL, NULL, junk,
- lock->l_resource->lr_type, NULL, 0, -1, &flags,
- NULL, NULL, NULL, &lockh);
-}
-
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
int *flags)
{
rc = ptlrpc_queue_wait(req);
if (rc == ESTALE) {
- CERROR("client/server (nid "LPU64") out of sync--not "
- "fatal\n",
- req->rq_import->imp_connection->c_peer.peer_nid);
+ char str[PTL_NALFMT_SIZE];
+ CERROR("client/server (nid %s) out of sync"
+ " -- not fatal\n",
+ ptlrpc_peernid2str(&req->rq_import->
+ imp_connection->c_peer, str));
} else if (rc == -ETIMEDOUT) {
ptlrpc_req_finished(req);
GOTO(restart, rc);
return rc;
}
-int ldlm_cancel_lru(struct ldlm_namespace *ns)
+/* when called with LDLM_ASYNC the blocking callback will be handled
+ * in a thread and this function will return after the thread has been
+ * asked to call the callback. when called with LDLM_SYNC the blocking
+ * callback will be performed in this function. */
+int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
{
- struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+ struct list_head *tmp, *next;
+ struct ldlm_lock *lock;
int count, rc = 0;
- struct ldlm_ast_work *w;
+ LIST_HEAD(cblist);
ENTRY;
l_lock(&ns->ns_lock);
}
list_for_each_safe(tmp, next, &ns->ns_unused_list) {
- struct ldlm_lock *lock;
+
lock = list_entry(tmp, struct ldlm_lock, l_lru);
LASSERT(!lock->l_readers && !lock->l_writers);
* won't see this flag and call l_blocking_ast */
lock->l_flags |= LDLM_FL_CBPENDING;
- OBD_ALLOC(w, sizeof(*w));
- LASSERT(w);
-
- w->w_lock = LDLM_LOCK_GET(lock);
- list_add(&w->w_list, &list);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
ldlm_lock_remove_from_lru(lock);
+ if (sync == LDLM_ASYNC)
+ ldlm_bl_to_thread(ns, NULL, lock);
+ else
+ list_add(&lock->l_lru, &cblist);
if (--count == 0)
break;
}
l_unlock(&ns->ns_lock);
- list_for_each_safe(tmp, next, &list) {
- struct lustre_handle lockh;
- int rc;
- w = list_entry(tmp, struct ldlm_ast_work, w_list);
-
- ldlm_lock2handle(w->w_lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc != ELDLM_OK)
- CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc);
-
- list_del(&w->w_list);
- LDLM_LOCK_PUT(w->w_lock);
- OBD_FREE(w, sizeof(*w));
+ list_for_each_safe(tmp, next, &cblist) {
+ lock = list_entry(tmp, struct ldlm_lock, l_lru);
+ list_del_init(&lock->l_lru);
+ ldlm_handle_bl_callback(ns, NULL, lock);
}
-
RETURN(rc);
}
if (opaque != NULL && lock->l_ast_data != opaque) {
LDLM_ERROR(lock, "data %p doesn't match opaque %p",
lock->l_ast_data, opaque);
- //LBUG();
continue;
}
if (lock->l_readers || lock->l_writers) {
- if (flags & LDLM_FL_WARN) {
+ if (flags & LDLM_FL_CONFIG_CHANGE)
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ else if (flags & LDLM_FL_WARN)
LDLM_ERROR(lock, "lock in use");
- //LBUG();
- }
continue;
}
w->w_lock = LDLM_LOCK_GET(lock);
- /* Prevent the cancel callback from being called by setting
- * LDLM_FL_CANCEL in the lock. Very sneaky. -p */
- if (flags & LDLM_FL_NO_CALLBACK)
- w->w_lock->l_flags |= LDLM_FL_CANCEL;
-
list_add(&w->w_list, &list);
}
l_unlock(&ns->ns_lock);
RETURN(0);
}
+static inline int have_no_nsresource(struct ldlm_namespace *ns)
+{
+ int no_resource = 0;
+
+ spin_lock(&ns->ns_counter_lock);
+ if (ns->ns_resources == 0)
+ no_resource = 1;
+ spin_unlock(&ns->ns_counter_lock);
+
+ RETURN(no_resource);
+}
+
/* Cancel all locks on a namespace (or a specific resource, if given)
* that have 0 readers/writers.
*
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server.
* If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
- * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
+ * If flags & LDLM_FL_WARN, print a warning if some locks are still in use.
+ * If flags & LDLM_FL_CONFIG_CHANGE, mark all locks as having a pending callback
+ */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
struct ldlm_res_id *res_id, int flags, void *opaque)
{
int i;
+ struct l_wait_info lwi = { 0 };
ENTRY;
if (ns == NULL)
l_lock(&ns->ns_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
- struct list_head *tmp, *pos;
- list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+ struct list_head *tmp, *next;
+ list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
int rc;
struct ldlm_resource *res;
res = list_entry(tmp, struct ldlm_resource, lr_hash);
ldlm_resource_getref(res);
+ l_unlock(&ns->ns_lock);
rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
flags, opaque);
-
if (rc)
CERROR("cancel_unused_res ("LPU64"): %d\n",
res->lr_name.name[0], rc);
+
+ l_lock(&ns->ns_lock);
+ next = tmp->next;
ldlm_resource_putref(res);
}
}
l_unlock(&ns->ns_lock);
+ if (flags & LDLM_FL_CONFIG_CHANGE)
+ l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi);
RETURN(ELDLM_OK);
}
struct ptlrpc_request *req;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int size;
+ int buffers = 1;
+ int size[2];
int flags;
/*
else
flags = LDLM_FL_REPLAY;
- size = sizeof(*body);
- req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
+ size[0] = sizeof(*body);
+ req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, size, NULL);
if (!req)
RETURN(-ENOMEM);
body->lock_flags = flags;
ldlm_lock2handle(lock, &body->lock_handle1);
- size = sizeof(*reply);
- req->rq_replen = lustre_msg_size(1, &size);
+ size[0] = sizeof(*reply);
+ if (lock->l_lvb_len != 0) {
+ buffers = 2;
+ size[1] = lock->l_lvb_len;
+ }
+ req->rq_replen = lustre_msg_size(buffers, size);
LDLM_DEBUG(lock, "replaying lock:");
INIT_LIST_HEAD(&list);
LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+ LASSERT(ns != NULL);
/* ensure this doesn't fall to 0 before all have been queued */
atomic_inc(&imp->imp_replay_inflight);