struct ldlm_reply *reply;
int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
int is_replay = *flags & LDLM_FL_REPLAY;
+ int cleanup_phase = 0;
ENTRY;
if (exp == NULL) {
lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
completion, glimpse, data, lvb_len);
if (lock == NULL)
- GOTO(out_nolock, rc = -ENOMEM);
+ RETURN(-ENOMEM);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
}
+ /* lock not sent to server yet */
+ cleanup_phase = 2;
+
if (req == NULL) {
req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
size, NULL);
if (req == NULL)
- GOTO(out_lock, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
req_passed_in = 0;
} else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
LBUG();
tmplvb = lustre_swab_repbuf(req, 1, lvb_len,
lvb_swabber);
if (tmplvb == NULL)
- GOTO(out_lock, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
if (lvb != NULL)
memcpy(lvb, tmplvb, lvb_len);
}
}
- GOTO(out_lock, rc);
+ GOTO(cleanup, rc);
}
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
CERROR("Can't unpack ldlm_reply\n");
- GOTO(out_lock, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
}
+ /* XXX - Phil, wasn't sure if this should go before or after the
+  * lustre_swab_repbuf()? If we can't unpack the reply then we
+  * don't know what occurred on the server, so I think the safest
+  * bet is to clean up the lock as if it didn't make it. */
+
+ /* lock enqueued on the server */
+ cleanup_phase = 1;
+
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
}
if (reply->lock_desc.l_resource.lr_name.name[0] !=
- lock->l_resource->lr_name.name[0]) {
+ lock->l_resource->lr_name.name[0] ||
+ reply->lock_desc.l_resource.lr_name.name[1] !=
+ lock->l_resource->lr_name.name[1]) {
CDEBUG(D_INFO, "remote intent success, locking %ld "
"instead of %ld\n",
(long)reply->lock_desc.l_resource.lr_name.name[0],
reply->lock_desc.l_resource.lr_name);
if (lock->l_resource == NULL) {
LBUG();
- GOTO(out_lock, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
}
LDLM_DEBUG(lock, "client-side enqueue, new resource");
}
void *tmplvb;
tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
if (tmplvb == NULL)
- GOTO(out_lock, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
memcpy(lock->l_lvb_data, tmplvb, lvb_len);
}
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
- out_lock:
- if (rc)
- failed_lock_cleanup(ns, lock, lockh, mode);
- if (!req_passed_in && req != NULL)
- ptlrpc_req_finished(req);
+cleanup:
+ switch (cleanup_phase) {
+ case 2:
+ if (rc)
+ failed_lock_cleanup(ns, lock, lockh, mode);
+ case 1:
+ if (!req_passed_in && req != NULL)
+ ptlrpc_req_finished(req);
+ }
+
LDLM_LOCK_PUT(lock);
- out_nolock:
return rc;
}
rc = ptlrpc_queue_wait(req);
if (rc == ESTALE) {
- CERROR("client/server (nid "LPU64") out of sync--not "
- "fatal\n",
- req->rq_import->imp_connection->c_peer.peer_nid);
+ char str[PTL_NALFMT_SIZE];
+ CERROR("client/server (nid %s) out of sync"
+ " -- not fatal\n",
+ ptlrpc_peernid2str(&req->rq_import->
+ imp_connection->c_peer, str));
} else if (rc == -ETIMEDOUT) {
ptlrpc_req_finished(req);
GOTO(restart, rc);
return rc;
}
-int ldlm_cancel_lru(struct ldlm_namespace *ns)
+/* When called with LDLM_ASYNC, the blocking callback will be handled
+ * in a thread and this function will return after the thread has been
+ * asked to call the callback.  When called with LDLM_SYNC, the blocking
+ * callback will be performed in this function. */
+int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
{
struct list_head *tmp, *next;
-#ifndef __KERNEL__
- LIST_HEAD(cblist);
-#endif
+ struct ldlm_lock *lock;
int count, rc = 0;
+ LIST_HEAD(cblist);
ENTRY;
l_lock(&ns->ns_lock);
}
list_for_each_safe(tmp, next, &ns->ns_unused_list) {
- struct ldlm_lock *lock;
+
lock = list_entry(tmp, struct ldlm_lock, l_lru);
LASSERT(!lock->l_readers && !lock->l_writers);
LDLM_LOCK_GET(lock); /* dropped by bl thread */
ldlm_lock_remove_from_lru(lock);
-#if __KERNEL__
- ldlm_bl_to_thread(ns, NULL, lock);
-#else
- list_add(&lock->l_lru, &cblist);
-#endif
+ if (sync == LDLM_ASYNC)
+ ldlm_bl_to_thread(ns, NULL, lock);
+ else
+ list_add(&lock->l_lru, &cblist);
if (--count == 0)
break;
}
l_unlock(&ns->ns_lock);
-#ifndef __KERNEL__
- while (!list_empty(&cblist)) {
- struct ldlm_lock *lock;
- lock = list_entry(cblist.next, struct ldlm_lock, l_lru);
+ list_for_each_safe(tmp, next, &cblist) {
+ lock = list_entry(tmp, struct ldlm_lock, l_lru);
list_del_init(&lock->l_lru);
- liblustre_ldlm_handle_bl_callback(ns, NULL, lock);
+ ldlm_handle_bl_callback(ns, NULL, lock);
}
-#endif
RETURN(rc);
}
if (opaque != NULL && lock->l_ast_data != opaque) {
LDLM_ERROR(lock, "data %p doesn't match opaque %p",
lock->l_ast_data, opaque);
- //LBUG();
continue;
}
if (lock->l_readers || lock->l_writers) {
- if (flags & LDLM_FL_WARN) {
+ if (flags & LDLM_FL_CONFIG_CHANGE)
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ else if (flags & LDLM_FL_WARN)
LDLM_ERROR(lock, "lock in use");
- //LBUG();
- }
continue;
}
RETURN(0);
}
+/* Return 1 when the namespace holds no resources, else 0.
+ * Used as the wait condition for the LDLM_FL_CONFIG_CHANGE path in
+ * ldlm_cli_cancel_unused(), which sleeps on ns_waitq until every
+ * resource has been released.
+ * NOTE(review): calls RETURN() without a matching ENTRY — confirm this
+ * is acceptable for the debug-trace macros on an inline helper. */
+static inline int have_no_nsresource(struct ldlm_namespace *ns)
+{
+ int no_resource = 0;
+
+ spin_lock(&ns->ns_counter_lock); /* ns_resources is guarded by ns_counter_lock */
+ if (ns->ns_resources == 0)
+ no_resource = 1;
+ spin_unlock(&ns->ns_counter_lock);
+
+ RETURN(no_resource);
+}
+
/* Cancel all locks on a namespace (or a specific resource, if given)
* that have 0 readers/writers.
*
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server.
- * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
+ * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
+ * If flags & LDLM_FL_WARN, print a warning if some locks are still in use.
+ * If flags & LDLM_FL_CONFIG_CHANGE, mark all locks as having a pending callback
+ */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
struct ldlm_res_id *res_id, int flags, void *opaque)
{
int i;
+ struct l_wait_info lwi = { 0 };
ENTRY;
if (ns == NULL)
l_lock(&ns->ns_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
- struct list_head *tmp, *pos;
- list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+ struct list_head *tmp, *next;
+ list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
int rc;
struct ldlm_resource *res;
res = list_entry(tmp, struct ldlm_resource, lr_hash);
ldlm_resource_getref(res);
+ l_unlock(&ns->ns_lock);
rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
flags, opaque);
-
if (rc)
CERROR("cancel_unused_res ("LPU64"): %d\n",
res->lr_name.name[0], rc);
+
+ l_lock(&ns->ns_lock);
+ next = tmp->next;
ldlm_resource_putref(res);
}
}
l_unlock(&ns->ns_lock);
+ if (flags & LDLM_FL_CONFIG_CHANGE)
+ l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi);
RETURN(ELDLM_OK);
}
INIT_LIST_HEAD(&list);
LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+ LASSERT(ns != NULL);
/* ensure this doesn't fall to 0 before all have been queued */
atomic_inc(&imp->imp_replay_inflight);