continue;
/* lock_convert() takes the resource lock, so we're sure that
- * req_mode, lr_type, and l_cookie won't change beneath us */
+ * req_mode and lr_type won't change beneath us */
if (lock->l_req_mode != mode)
continue;
if (res->lr_type == LDLM_EXTENT)
memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
- /* policies are not executed on the client */
+ /* policies are not executed on the client or during replay */
local = res->lr_namespace->ns_client;
- if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
+ if (!local && !(*flags & LDLM_FL_REPLAY) &&
+ (policy = ldlm_res_policy_table[res->lr_type])) {
int rc;
rc = policy(lock, cookie, lock->l_req_mode, NULL);
}
}
- lock->l_cookie = cookie;
- lock->l_cookie_len = cookie_len;
-
l_lock(&res->lr_namespace->ns_lock);
if (local && lock->l_req_mode == lock->l_granted_mode) {
/* The server returned a blocked lock, but it was granted before
* namespace only has information about locks taken by that client, and
* thus doesn't have enough information to decide for itself if it can
* be granted (below). In this case, we do exactly what the server
- * tells us to do, as dictated by the 'flags' */
+ * tells us to do, as dictated by the 'flags'.
+ *
+ * We do exactly the same thing during recovery, when the server is
+ * more or less trusting the clients not to lie.
+ *
+ * FIXME (bug 629283): Detect obvious lies by checking compatibility in
+ * granted/converting queues. */
ldlm_resource_unlink_lock(lock);
- if (local) {
+ if (local || (*flags & LDLM_FL_REPLAY)) {
if (*flags & LDLM_FL_BLOCK_CONV)
ldlm_resource_add_lock(res, res->lr_converting.prev,
lock);
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int rc, size = sizeof(*body), req_passed_in = 1;
+ int rc, size = sizeof(*body), req_passed_in = 1, is_replay;
ENTRY;
+ is_replay = *flags & LDLM_FL_REPLAY;
+ LASSERT(connh != NULL || !is_replay);
+
if (connh == NULL)
return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
type, cookie, cookielen, mode,
flags, completion, blocking, data,
data_len, lockh);
- *flags = 0;
- lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
- data, data_len);
- if (lock == NULL)
- GOTO(out_nolock, rc = -ENOMEM);
- LDLM_DEBUG(lock, "client-side enqueue START");
- /* for the local lock, add the reference */
- ldlm_lock_addref_internal(lock, mode);
- ldlm_lock2handle(lock, lockh);
+ /* If we're replaying this lock, just check some invariants.
+ * If we're creating a new lock, get everything all setup nice. */
+ if (is_replay) {
+ lock = ldlm_handle2lock(lockh);
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ LASSERT(connh == lock->l_connh);
+ } else {
+ lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
+ mode, data, data_len);
+ if (lock == NULL)
+ GOTO(out_nolock, rc = -ENOMEM);
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ /* for the local lock, add the reference */
+ ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock2handle(lock, lockh);
+ if (type == LDLM_EXTENT)
+ memcpy(&lock->l_extent, cookie,
+ sizeof(body->lock_desc.l_extent));
+ }
if (req == NULL) {
req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
} else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
LBUG();
- /* Dump all of this data into the request buffer */
+ /* Dump lock data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, 0);
ldlm_lock2desc(lock, &body->lock_desc);
- /* Phil: make this part of ldlm_lock2desc */
- if (type == LDLM_EXTENT) {
- memcpy(&body->lock_desc.l_extent, cookie,
- sizeof(body->lock_desc.l_extent));
- CDEBUG(D_INFO, "extent in body: "LPU64" -> "LPU64"\n",
- body->lock_desc.l_extent.start,
- body->lock_desc.l_extent.end);
- }
body->lock_flags = *flags;
memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
lock->l_connh = connh;
lock->l_export = NULL;
+ LDLM_DEBUG(lock, "sending request");
rc = ptlrpc_queue_wait(req);
- /* FIXME: status check here? */
rc = ptlrpc_check_status(req, rc);
if (rc != ELDLM_OK) {
+ LASSERT(!is_replay);
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
ldlm_lock_decref(lockh, mode);
reply = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
- if (type == LDLM_EXTENT)
- memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
*flags = reply->lock_flags;
CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
(void *)(unsigned long)reply->lock_handle.addr, *flags);
- CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got extent "
- LPU64" -> "LPU64"\n",
- body->lock_desc.l_extent.start, body->lock_desc.l_extent.end,
- reply->lock_extent.start, reply->lock_extent.end);
+ if (type == LDLM_EXTENT) {
+ CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
+ "extent "LPU64" -> "LPU64"\n",
+ body->lock_desc.l_extent.start,
+ body->lock_desc.l_extent.end,
+ reply->lock_extent.start, reply->lock_extent.end);
+ cookie = &reply->lock_extent; /* FIXME bug 629281 */
+ cookielen = sizeof(reply->lock_extent);
+ }
/* If enqueue returned a blocked lock but the completion handler has
* already run, then it fixed up the resource and we don't need to do it
* again. */
if ((*flags) & LDLM_FL_LOCK_CHANGED) {
int newmode = reply->lock_mode;
+ LASSERT(!is_replay);
if (newmode && newmode != lock->l_req_mode) {
LDLM_DEBUG(lock, "server returned different mode %s",
ldlm_lockname[newmode]);
if (!req_passed_in)
ptlrpc_req_finished(req);
- rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
- blocking);
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, *flags);
+ if (!is_replay) {
+ rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags,
+ completion, blocking);
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, *flags);
+ }
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
RETURN(0);
}
+int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
+{
+ struct lustre_handle lockh;
+ int flags = LDLM_FL_REPLAY;
+ ldlm_lock2handle(lock, &lockh);
+ return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, NULL,
+ lock->l_resource->lr_type, NULL, 0, -1, &flags,
+ NULL, NULL, NULL, 0, &lockh);
+}
+
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
int *flags)
{
-
+ ENTRY;
if (lock->l_resource->lr_namespace->ns_client) {
CERROR("Trying to cancel local lock\n");
LBUG();
connh = lock->l_connh;
if (!connh)
- return ldlm_cli_convert_local(lock, new_mode, flags);
+ RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
LDLM_DEBUG(lock, "client-side convert");
return rc;
}
-/* Cancel all locks on a given resource that have 0 readers/writers.
- *
- * If 'local_only' is true, throw the locks away without trying to notify the
- * server. */
-int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
- int local_only)
+static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
+ __u64 *res_id, int local_only)
{
struct ldlm_resource *res;
struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
RETURN(0);
}
+
+/* Cancel all locks on a namespace (or a specific resource, if given) that have
+ * 0 readers/writers.
+ *
+ * If 'local_only' is true, throw the locks away without trying to notify the
+ * server. */
+int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
+ int local_only)
+{
+ int i;
+
+ if (res_id)
+ RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, local_only));
+
+ l_lock(&ns->ns_lock);
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ struct list_head *tmp, *pos;
+ list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+ int rc;
+ struct ldlm_resource *res;
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
+ ldlm_resource_getref(res);
+
+ rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
+ local_only);
+
+ if (rc)
+ CERROR("cancel_unused_res ("LPU64"): %d\n",
+ res->lr_name[0], rc);
+ ldlm_resource_put(res);
+ }
+ }
+ l_unlock(&ns->ns_lock);
+
+ return ELDLM_OK;
+}
inode = de->d_inode;
CDEBUG(D_INODE, "ino %ld\n", inode->i_ino);
- OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
+ OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
to_kdev_t(inode->i_sb->s_dev));
handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
"child missing (%ld/%s); OK for REPLAYING\n",
dir->i_ino, rec->ur_name);
rc = 0;
- } else {
+ } else {
CDEBUG(D_INODE,
"child doesn't exist (dir %ld, name %s)\n",
dir->i_ino, rec->ur_name);
mds_pack_inode2body(body, inode);
}
- OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
+ OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
to_kdev_t(dir->i_sb->s_dev));
switch (rec->ur_mode /* & S_IFMT ? */) {
struct dentry *de_new = NULL;
struct mds_obd *mds = mds_req2mds(req);
struct lustre_handle tgtlockh, srclockh, oldhandle;
- int flags, lock_mode, rc = 0, err;
+ int flags = 0, lock_mode, rc = 0, err;
void *handle;
__u64 res_id[3] = { 0 };
ENTRY;
rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &tgtlockh);
if (rc == 0) {
+ flags = 0;
LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
out_rename_deold:
if (!rc) {
res_id[0] = de_old->d_inode->i_ino;
+ flags = 0;
/* Take an exclusive lock on the resource that we're
* about to free, to force everyone to drop their
* locks. */