RETURN(0);
}
+void mds_steal_ack_locks(struct obd_export *exp, struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
+
+ if (oldrep == NULL)
+ return;
+ memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
+ sizeof req->rq_ack_locks);
+ spin_lock_irqsave(&req->rq_lock, flags);
+ oldrep->rq_resent = 1;
+ wake_up(&oldrep->rq_reply_waitq);
+ spin_unlock_irqrestore(&req->rq_lock, flags);
+ DEBUG_REQ(D_HA, oldrep, "stole locks from");
+ DEBUG_REQ(D_HA, req, "stole locks for");
+}
+
void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
{
DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
mds_req_from_mcd(req, med->med_mcd);
- if (req->rq_export->exp_outstanding_reply)
- mds_steal_ack_locks(req->rq_export, req);
-
de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
if (IS_ERR(de)) {
LASSERT(PTR_ERR(de) == req->rq_status);
LASSERT(offset == 0);
+ DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x\n", rec->ur_fid1->id,
+ rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
+
MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
down(&inode->i_sem);
- CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
LASSERT(offset == 0);
LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
+ DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o\n",
+ rec->ur_fid1->id, rec->ur_fid1->generation,
+ rec->ur_name, rec->ur_mode);
+
MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
cleanup_phase = 1; /* locked parent dentry */
dir = dparent->d_inode;
LASSERT(dir);
- CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
- dir->i_ino, rec->ur_name, rec->ur_mode);
ldlm_lock_dump_handle(D_OTHER, &lockh);
return 0;
}
+int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
+{
+ int i;
+
+ for (i = 0; i < RES_NAME_SIZE; i++) {
+ /* return 1 here, because enqueue_ordered will skip resources
+ * of all zeroes if they're sorted to the end of the list. */
+ if (res1->name[i] == 0 && res2->name[i] != 0)
+ return 1;
+ if (res2->name[i] == 0 && res1->name[i] != 0)
+ return 0;
+
+ if (res1->name[i] > res2->name[i])
+ return 1;
+ if (res1->name[i] < res2->name[i])
+ return 0;
+ }
+ return 0;
+}
+
/* This function doesn't use ldlm_match_or_enqueue because we're always called
* with EX or PW locks, and the MDS is no longer allowed to match write locks,
* because they take the place of local semaphores.
*
- * Two locks are taken in numerical order */
-int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
- struct ldlm_res_id *p1_res_id,
+ * One or two locks are taken in numerical order. A res_id->name[0] of 0 means
+ * no lock is taken for that res_id. Must be at least one non-zero res_id. */
+int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
+ struct lustre_handle *p1_lockh, int p1_lock_mode,
struct ldlm_res_id *p2_res_id,
- struct lustre_handle *p1_lockh,
- struct lustre_handle *p2_lockh)
+ struct lustre_handle *p2_lockh, int p2_lock_mode)
{
- struct ldlm_res_id res_id[2];
- struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
+ struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
+ struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
+ int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
int rc, flags;
ENTRY;
LASSERT(p1_res_id != NULL && p2_res_id != NULL);
CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
- p1_res_id[0].name[0], p2_res_id[0].name[0]);
+ res_id[0]->name[0], res_id[1]->name[0]);
- if (p1_res_id->name[0] < p2_res_id->name[0]) {
- handles[0] = p1_lockh;
- handles[1] = p2_lockh;
- res_id[0] = *p1_res_id;
- res_id[1] = *p2_res_id;
- } else {
+ if (res_gt(p1_res_id, p2_res_id)) {
handles[1] = p1_lockh;
handles[0] = p2_lockh;
- res_id[1] = *p1_res_id;
- res_id[0] = *p2_res_id;
+ res_id[1] = p1_res_id;
+ res_id[0] = p2_res_id;
+ lock_modes[1] = p1_lock_mode;
+ lock_modes[0] = p2_lock_mode;
}
- CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
- p1_res_id[0].name[0], p2_res_id[0].name[0]);
+ CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
+ res_id[0]->name[0], res_id[1]->name[0]);
flags = LDLM_FL_LOCAL_ONLY;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
- LDLM_PLAIN, NULL, 0, lock_mode, &flags,
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, *res_id[0],
+ LDLM_PLAIN, NULL, 0, lock_modes[0], &flags,
ldlm_completion_ast, mds_blocking_ast, NULL,
handles[0]);
if (rc != ELDLM_OK)
RETURN(-EIO);
ldlm_lock_dump_handle(D_OTHER, handles[0]);
- if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
+ if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
memcpy(handles[1], handles[0], sizeof(*(handles[1])));
- ldlm_lock_addref(handles[1], lock_mode);
- } else {
+ ldlm_lock_addref(handles[1], lock_modes[1]);
+ } else if (res_id[1]->name[0] != 0) {
flags = LDLM_FL_LOCAL_ONLY;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, ldlm_completion_ast,
+ *res_id[1], LDLM_PLAIN, NULL, 0,
+ lock_modes[1], &flags,ldlm_completion_ast,
mds_blocking_ast, NULL, handles[1]);
if (rc != ELDLM_OK) {
- ldlm_lock_decref(handles[0], lock_mode);
+ ldlm_lock_decref(handles[0], lock_modes[0]);
RETURN(-EIO);
}
+ ldlm_lock_dump_handle(D_OTHER, handles[1]);
}
- ldlm_lock_dump_handle(D_OTHER, handles[1]);
RETURN(0);
}
+int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
+ struct lustre_handle *p1_lockh, int p1_lock_mode,
+ struct ldlm_res_id *p2_res_id,
+ struct lustre_handle *p2_lockh, int p2_lock_mode,
+ struct ldlm_res_id *c1_res_id,
+ struct lustre_handle *c1_lockh, int c1_lock_mode,
+ struct ldlm_res_id *c2_res_id,
+ struct lustre_handle *c2_lockh, int c2_lock_mode)
+{
+ struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
+ c1_res_id, c2_res_id };
+ struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh,
+ c1_lockh, c2_lockh };
+ int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
+ c1_lock_mode, c2_lock_mode };
+ int rc, i, j, sorted, flags;
+ ENTRY;
+
+ CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+ res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
+ res_id[3]->name[0]);
+
+ /* simple insertion sort - we have at most 4 elements */
+ for (i = 1; i < 4; i++) {
+ j = i - 1;
+ dlm_handles[4] = dlm_handles[i];
+ res_id[4] = res_id[i];
+ lock_modes[4] = lock_modes[i];
+
+ sorted = 0;
+ do {
+ if (res_gt(res_id[j], res_id[4])) {
+ dlm_handles[j + 1] = dlm_handles[j];
+ res_id[j + 1] = res_id[j];
+ lock_modes[j + 1] = lock_modes[j];
+ j--;
+ } else {
+ sorted = 1;
+ }
+ } while (j >= 0 && !sorted);
+
+ dlm_handles[j + 1] = dlm_handles[4];
+ res_id[j + 1] = res_id[4];
+ lock_modes[j + 1] = lock_modes[4];
+ }
+
+ CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+ res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
+ res_id[3]->name[0]);
+
+ /* XXX we could send ASTs on all these locks first before blocking? */
+ for (i = 0; i < 4; i++) {
+ flags = 0;
+ if (res_id[i]->name[0] == 0)
+ break;
+ if (i != 0 &&
+ memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
+ memcpy(dlm_handles[i], dlm_handles[i-1],
+ sizeof(*(dlm_handles[i])));
+ ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
+ } else {
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ NULL, *res_id[i], LDLM_PLAIN,
+ NULL, 0, lock_modes[i], &flags,
+ ldlm_completion_ast,
+ mds_blocking_ast, NULL,
+ dlm_handles[i]);
+ if (rc != ELDLM_OK)
+ GOTO(out_err, rc = -EIO);
+ ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
+ }
+ }
+
+ RETURN(0);
+out_err:
+ while (i-- > 0)
+ ldlm_lock_decref(dlm_handles[i], lock_modes[i]);
+
+ return rc;
+}
+
+/* In the unlikely case that the child changed while we were waiting
+ * on the lock, we need to drop the lock on the old child and either:
+ * - if the child has a lower resource name, then we have to also
+ * drop the parent lock and regain the locks in the right order
+ * - in the rename case, if the child has a lower resource name than one of
+ * the other parent/child resources (maxres) we also need to reget the locks
+ * - if the child has a higher resource name (this is the common case)
+ * we can just get the lock on the new child (still in lock order)
+ *
+ * Returns 0 if the child did not change or if it changed but could be locked.
+ * Returns 1 if the child changed and we need to re-lock (no locks held).
+ * Returns -ve error with a valid dchild (no locks held). */
+static int mds_verify_child(struct obd_device *obd,
+ struct ldlm_res_id *parent_res_id,
+ struct lustre_handle *parent_lockh,
+ struct dentry *dparent, int parent_mode,
+ struct ldlm_res_id *child_res_id,
+ struct lustre_handle *child_lockh,
+ struct dentry **dchildp, int child_mode,
+ const char *name, int namelen,
+ struct ldlm_res_id *maxres)
+{
+ struct dentry *vchild, *dchild = *dchildp;
+ int rc = 0, cleanup_phase = 2; /* parent, child locks */
+ ENTRY;
+
+ vchild = ll_lookup_one_len(name, dparent, namelen - 1);
+ if (IS_ERR(vchild))
+ GOTO(cleanup, rc = PTR_ERR(vchild));
+
+ if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
+ (vchild->d_inode != NULL &&
+ child_res_id->name[0] == vchild->d_inode->i_ino &&
+ child_res_id->name[1] == vchild->d_inode->i_generation))) {
+ if (dchild != NULL)
+ l_dput(dchild);
+ *dchildp = vchild;
+
+ RETURN(0);
+ }
+
+ CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
+ vchild->d_inode, dchild ? dchild->d_inode : 0,
+ vchild->d_inode ? vchild->d_inode->i_ino : 0,
+ child_res_id->name[0]);
+ if (child_res_id->name[0] != 0)
+ ldlm_lock_decref(child_lockh, child_mode);
+ if (dchild)
+ l_dput(dchild);
+
+ cleanup_phase = 1; /* parent lock only */
+ *dchildp = dchild = vchild;
+
+ if (dchild->d_inode) {
+ int flags = 0;
+ child_res_id->name[0] = dchild->d_inode->i_ino;
+ child_res_id->name[1] = dchild->d_inode->i_generation;
+
+ if (res_gt(parent_res_id, child_res_id) ||
+ res_gt(maxres, child_res_id)) {
+ CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
+ child_res_id->name[0], parent_res_id->name[0],
+ maxres->name[0]);
+ GOTO(cleanup, rc = 1);
+ }
+
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ NULL, *child_res_id, LDLM_PLAIN,
+ NULL, 0, child_mode, &flags,
+ ldlm_completion_ast, mds_blocking_ast,
+ NULL, child_lockh);
+ if (rc != ELDLM_OK)
+ GOTO(cleanup, rc = -EIO);
+ } else {
+ memset(child_res_id, 0, sizeof(*child_res_id));
+ }
+
+ EXIT;
+cleanup:
+ if (rc) {
+ switch(cleanup_phase) {
+ case 2:
+ if (child_res_id->name[0] != 0)
+ ldlm_lock_decref(child_lockh, child_mode);
+ case 1:
+ ldlm_lock_decref(parent_lockh, parent_mode);
+ }
+ }
+ return rc;
+}
+
+int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
+ struct ll_fid *fid,
+ struct lustre_handle *parent_lockh,
+ struct dentry **dparentp, int parent_mode,
+ char *name, int namelen,
+ struct lustre_handle *child_lockh,
+ struct dentry **dchildp, int child_mode)
+{
+ struct ldlm_res_id child_res_id = { .name = {0} };
+ struct ldlm_res_id parent_res_id = { .name = {0} };
+ struct inode *inode;
+ int rc = 0, cleanup_phase = 0;
+ ENTRY;
+
+ /* Step 1: Lookup parent */
+ *dparentp = mds_fid2dentry(mds, fid, NULL);
+ if (IS_ERR(*dparentp))
+ RETURN(rc = PTR_ERR(*dparentp));
+ LASSERT((*dparentp)->d_inode);
+
+ CDEBUG(D_INODE, "parent ino %lu, name %s\n",
+ (*dparentp)->d_inode->i_ino, name);
+
+ parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
+ parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
+
+ cleanup_phase = 1; /* parent dentry */
+
+ /* Step 2: Lookup child (without lock, to get resource name) */
+ *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
+ if (IS_ERR(*dchildp)) {
+ rc = PTR_ERR(*dchildp);
+ CDEBUG(D_INODE, "child lookup error %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+
+ inode = (*dchildp)->d_inode;
+ if (inode != NULL)
+ inode = igrab(inode);
+ if (inode == NULL)
+ goto retry_locks;
+
+ child_res_id.name[0] = inode->i_ino;
+ child_res_id.name[1] = inode->i_generation;
+ iput(inode);
+
+retry_locks:
+ cleanup_phase = 2; /* child dentry */
+
+ /* Step 3: Lock parent and child in resource order. If child doesn't
+ * exist, we still have to lock the parent and re-lookup. */
+ rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode,
+ &child_res_id, child_lockh, child_mode);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ if (!(*dchildp)->d_inode)
+ cleanup_phase = 3; /* parent lock */
+ else
+ cleanup_phase = 4; /* child lock */
+
+ /* Step 4: Re-lookup child to verify it hasn't changed since locking */
+ rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
+ parent_mode, &child_res_id, child_lockh, dchildp,
+ child_mode, name, namelen, &parent_res_id);
+ if (rc > 0)
+ goto retry_locks;
+ if (rc < 0) {
+ cleanup_phase = 3;
+ GOTO(cleanup, rc);
+ }
+
+cleanup:
+ if (rc) {
+ switch (cleanup_phase) {
+ case 4:
+ ldlm_lock_decref(child_lockh, child_mode);
+ case 3:
+ ldlm_lock_decref(parent_lockh, parent_mode);
+ case 2:
+ l_dput(*dchildp);
+ case 1:
+ l_dput(*dparentp);
+ default: ;
+ }
+ }
+ return rc;
+}
+
void mds_reconstruct_generic(struct ptlrpc_request *req)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct ptlrpc_request *req,
struct lustre_handle *lh)
{
- struct dentry *dparent = NULL;
- struct dentry *dchild = NULL;
+ struct dentry *dparent, *dchild;
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_body *body = NULL;
struct inode *child_inode;
struct lustre_handle parent_lockh, child_lockh, child_reuse_lockh;
void *handle = NULL;
- struct ldlm_res_id child_res_id = { .name = {0} };
- int rc = 0, flags = 0, log_unlink = 0, cleanup_phase = 0;
+ int rc = 0, log_unlink = 0, cleanup_phase = 0;
ENTRY;
LASSERT(offset == 0 || offset == 2);
+ DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
+ rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
+
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
GOTO(cleanup, rc = -ENOENT);
- /* Step 1: Lookup the parent by FID */
- dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
- &parent_lockh, rec->ur_name,
- rec->ur_namelen - 1);
- if (IS_ERR(dparent))
- GOTO(cleanup, rc = PTR_ERR(dparent));
- LASSERT(dparent->d_inode);
-
- cleanup_phase = 1; /* Have parent dentry lock */
-
- /* Step 2: Lookup the child */
- dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
- if (IS_ERR(dchild))
- GOTO(cleanup, rc = PTR_ERR(dchild));
+ rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
+ &parent_lockh, &dparent, LCK_PW,
+ rec->ur_name, rec->ur_namelen,
+ &child_lockh, &dchild, LCK_EX);
+ if (rc)
+ GOTO(cleanup, rc);
- cleanup_phase = 2; /* child dentry */
+ cleanup_phase = 1; /* dchild, dparent, locks */
child_inode = dchild->d_inode;
if (child_inode == NULL) {
GOTO(cleanup, rc = -ENOENT);
}
- DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
- dparent->d_inode->i_ino, child_inode->i_ino);
-
- /* Step 3: Get a lock on the child */
- child_res_id.name[0] = child_inode->i_ino;
- child_res_id.name[1] = child_inode->i_generation;
-
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
- &flags, ldlm_completion_ast, mds_blocking_ast,
- NULL, &child_lockh);
- if (rc != ELDLM_OK)
- GOTO(cleanup, rc);
-
- cleanup_phase = 3; /* child lock */
+ cleanup_phase = 2; /* dchild has a lock */
/* Step 4: Get a lock on the ino to sync with creation WRT inode
* reuse (see bug 2029). */
- child_res_id.name[1] = 0;
-
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
- &flags, ldlm_completion_ast, mds_blocking_ast,
- NULL, &child_reuse_lockh);
+ rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
if (rc != ELDLM_OK)
GOTO(cleanup, rc);
- cleanup_phase = 4; /* child lock */
-
+ cleanup_phase = 3; /* child inum lock */
+
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
/* ldlm_reply in buf[0] if called via intent */
LASSERT(body != NULL);
/* If this is the last reference to this inode, get the OBD EA
- * data first so the client can destroy OST objects.
+ * data first so the client can destroy OST objects.
* we only do the object removal if no open files remain.
- * Nobody can get at this name anymore because of the locks so
+ * Nobody can get at this name anymore because of the locks so
* we make decisions here as to whether to remove the inode */
- if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
+ if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
mds_open_orphan_count(child_inode) == 0) {
mds_pack_inode2fid(&body->fid1, child_inode);
mds_pack_inode2body(body, child_inode);
if (child_inode->i_nlink == (S_ISDIR(child_inode->i_mode) ? 2 : 1) &&
mds_open_orphan_count(child_inode) > 0) {
rc = mds_open_unlink_rename(rec, obd, dparent, dchild, &handle);
- cleanup_phase = 5; /* transaction */
+ cleanup_phase = 4; /* transaction */
GOTO(cleanup, rc);
}
NULL);
if (IS_ERR(handle))
GOTO(cleanup, rc = PTR_ERR(handle));
- cleanup_phase = 5; /* transaction */
+ cleanup_phase = 4; /* transaction */
rc = vfs_rmdir(dparent->d_inode, dchild);
break;
case S_IFREG: {
if (IS_ERR(handle))
GOTO(cleanup, rc = PTR_ERR(handle));
- cleanup_phase = 5; /* transaction */
+ cleanup_phase = 4; /* transaction */
rc = vfs_unlink(dparent->d_inode, dchild);
#ifdef ENABLE_ORPHANS
NULL);
if (IS_ERR(handle))
GOTO(cleanup, rc = PTR_ERR(handle));
- cleanup_phase = 5;
+ cleanup_phase = 4; /* transaction */
rc = vfs_unlink(dparent->d_inode, dchild);
break;
default:
}
switch(cleanup_phase) {
- case 5:
+ case 4:
rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
rc, 0);
if (!rc)
(void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
"unlinked", 0, NULL);
- case 4: /* child ino-reuse lock */
+ case 3: /* child ino-reuse lock */
if (rc && body != NULL) {
// Don't unlink the OST objects if the MDS unlink failed
body->valid = 0;
ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
else
ldlm_put_lock_into_req(req, &child_reuse_lockh, LCK_EX);
- case 3: /* child lock */
+ case 2: /* child lock */
ldlm_lock_decref(&child_lockh, LCK_EX);
- case 2: /* child dentry */
- l_dput(dchild);
- case 1: /* parent dentry and lock */
- if (rc) {
+ case 1: /* child and parent dentry, parent lock */
+ if (rc)
ldlm_lock_decref(&parent_lockh, LCK_PW);
- } else {
+ else
ldlm_put_lock_into_req(req, &parent_lockh, LCK_PW);
- }
+ l_dput(dchild);
l_dput(dparent);
case 0:
break;
struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
struct ldlm_res_id src_res_id = { .name = {0} };
struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
- int lock_mode = 0, rc = 0, cleanup_phase = 0;
+ int rc = 0, cleanup_phase = 0;
ENTRY;
LASSERT(offset == 0);
+ DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
+ rec->ur_fid1->id, rec->ur_fid1->generation,
+ rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
+
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
de_src->d_inode->i_ino);
/* Step 2: Take the two locks */
- lock_mode = LCK_EX;
src_res_id.name[0] = de_src->d_inode->i_ino;
src_res_id.name[1] = de_src->d_inode->i_generation;
tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
- rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
- &src_lockh, &tgt_dir_lockh);
- if (rc != ELDLM_OK)
- GOTO(cleanup, rc = -EIO);
+ rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
+ &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
+ if (rc)
+ GOTO(cleanup, rc);
cleanup_phase = 3; /* locks */
l_dput(dchild);
case 3: /* locks */
if (rc) {
- ldlm_lock_decref(&src_lockh, lock_mode);
- ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
+ ldlm_lock_decref(&src_lockh, LCK_EX);
+ ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
} else {
- ldlm_put_lock_into_req(req, &src_lockh, lock_mode);
- ldlm_put_lock_into_req(req, &tgt_dir_lockh, lock_mode);
+ ldlm_put_lock_into_req(req, &src_lockh, LCK_EX);
+ ldlm_put_lock_into_req(req, &tgt_dir_lockh, LCK_EX);
}
case 2: /* target dentry */
l_dput(de_tgt_dir);
RETURN(rc);
}
+/* The idea here is that we need to get four locks in the end:
+ * one on each parent directory, one on each child. We need to take
+ * these locks in some kind of order (to avoid deadlocks), and the order
+ * I selected is "increasing resource number" order. We need to look up
+ * the children, however, before we know what the resource number(s) are.
+ * Thus the following plan:
+ *
+ * 1,2. Look up the parents
+ * 3,4. Look up the children
+ * 5. Take locks on the parents and children, in order
+ * 6. Verify that the children haven't changed since they were looked up
+ *
+ * If there was a race and the children changed since they were first looked
+ * up, it is possible that mds_verify_child() will be able to just grab the
+ * lock on the new child resource (if it has a higher resource than any other)
+ * but we need to compare against not only its parent, but also against the
+ * parent and child of the "other half" of the rename, hence maxres_{src,tgt}.
+ *
+ * We need the fancy igrab() on the child inodes because we aren't holding a
+ * lock on the parent after the lookup is done, so dentry->d_inode may change
+ * at any time, and igrab() itself doesn't like getting passed a NULL argument.
+ */
+static int mds_get_parents_children_locked(struct obd_device *obd,
+ struct mds_obd *mds,
+ struct ll_fid *p1_fid,
+ struct dentry **de_srcdirp,
+ struct ll_fid *p2_fid,
+ struct dentry **de_tgtdirp,
+ int parent_mode,
+ const char *old_name, int old_len,
+ struct dentry **de_oldp,
+ const char *new_name, int new_len,
+ struct dentry **de_newp,
+ struct lustre_handle *dlm_handles,
+ int child_mode)
+{
+ struct ldlm_res_id p1_res_id = { .name = {0} };
+ struct ldlm_res_id p2_res_id = { .name = {0} };
+ struct ldlm_res_id c1_res_id = { .name = {0} };
+ struct ldlm_res_id c2_res_id = { .name = {0} };
+ struct ldlm_res_id *maxres_src, *maxres_tgt;
+ struct inode *inode;
+ int rc = 0, cleanup_phase = 0;
+ ENTRY;
+
+ /* Step 1: Lookup the source directory */
+ *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
+ if (IS_ERR(*de_srcdirp))
+ GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
+
+ cleanup_phase = 1; /* source directory dentry */
+
+ p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
+ p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
+
+ /* Step 2: Lookup the target directory */
+ if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
+ *de_tgtdirp = dget(*de_srcdirp);
+ } else {
+ *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
+ if (IS_ERR(*de_tgtdirp))
+ GOTO(cleanup, rc = PTR_ERR(*de_tgtdirp));
+ }
+
+ cleanup_phase = 2; /* target directory dentry */
+
+ p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
+ p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
+
+ /* Step 3: Lookup the source child entry */
+ *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
+ if (IS_ERR(*de_oldp)) {
+ rc = PTR_ERR(*de_oldp);
+ CERROR("old child lookup error (%*s): %d\n",
+ old_len - 1, old_name, rc);
+ GOTO(cleanup, rc);
+ }
+
+ cleanup_phase = 3; /* original name dentry */
+
+ inode = (*de_oldp)->d_inode;
+ if (inode != NULL)
+ inode = igrab(inode);
+ if (inode == NULL)
+ GOTO(cleanup, rc = -ENOENT);
+
+ c1_res_id.name[0] = inode->i_ino;
+ c1_res_id.name[1] = inode->i_generation;
+ iput(inode);
+
+ /* Step 4: Lookup the target child entry */
+ *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
+ if (IS_ERR(*de_newp)) {
+ rc = PTR_ERR(*de_newp);
+ CERROR("new child lookup error (%*s): %d\n",
+ old_len - 1, old_name, rc);
+ GOTO(cleanup, rc);
+ }
+
+ cleanup_phase = 4; /* target dentry */
+
+ inode = (*de_newp)->d_inode;
+ if (inode != NULL)
+ inode = igrab(inode);
+ if (inode == NULL)
+ goto retry_locks;
+
+ c2_res_id.name[0] = inode->i_ino;
+ c2_res_id.name[1] = inode->i_generation;
+ iput(inode);
+
+retry_locks:
+ /* Step 5: Take locks on the parents and child(ren) */
+ maxres_src = &p1_res_id;
+ maxres_tgt = &p2_res_id;
+ cleanup_phase = 4; /* target dentry */
+
+ if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
+ maxres_src = &c1_res_id;
+ if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
+ maxres_tgt = &c2_res_id;
+
+ rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
+ &p2_res_id, &dlm_handles[1], parent_mode,
+ &c1_res_id, &dlm_handles[2], child_mode,
+ &c2_res_id, &dlm_handles[3], child_mode);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ cleanup_phase = 6; /* parent and child(ren) locks */
+
+ /* Step 6a: Re-lookup source child to verify it hasn't changed */
+ rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
+ parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
+ child_mode, old_name, old_len, maxres_tgt);
+ if (rc) {
+ if (c2_res_id.name[0] != 0)
+ ldlm_lock_decref(&dlm_handles[3], child_mode);
+ ldlm_lock_decref(&dlm_handles[1], parent_mode);
+ cleanup_phase = 4;
+ if (rc > 0)
+ goto retry_locks;
+ GOTO(cleanup, rc);
+ }
+
+ if ((*de_oldp)->d_inode == NULL)
+ GOTO(cleanup, rc = -ENOENT);
+
+ /* Step 6b: Re-lookup target child to verify it hasn't changed */
+ rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
+ parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
+ child_mode, new_name, new_len, maxres_src);
+ if (rc) {
+ ldlm_lock_decref(&dlm_handles[2], child_mode);
+ ldlm_lock_decref(&dlm_handles[0], parent_mode);
+ cleanup_phase = 4;
+ if (rc > 0)
+ goto retry_locks;
+ GOTO(cleanup, rc);
+ }
+
+ EXIT;
+cleanup:
+ if (rc) {
+ switch (cleanup_phase) {
+ case 6: /* child lock(s) */
+ if (c2_res_id.name[0] != 0)
+ ldlm_lock_decref(&dlm_handles[3], child_mode);
+ if (c1_res_id.name[0] != 0)
+ ldlm_lock_decref(&dlm_handles[2], child_mode);
+ case 5: /* parent locks */
+ ldlm_lock_decref(&dlm_handles[1], parent_mode);
+ ldlm_lock_decref(&dlm_handles[0], parent_mode);
+ case 4: /* target dentry */
+ l_dput(*de_newp);
+ case 3: /* source dentry */
+ l_dput(*de_oldp);
+ case 2: /* target directory dentry */
+ l_dput(*de_tgtdirp);
+ case 1: /* source directry dentry */
+ l_dput(*de_srcdirp);
+ }
+ }
+
+ return rc;
+}
+
static int mds_reint_rename(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req,
struct lustre_handle *lockh)
struct dentry *de_new = NULL;
struct mds_obd *mds = mds_req2mds(req);
struct lustre_handle dlm_handles[4];
- struct ldlm_res_id p1_res_id = { .name = {0} };
- struct ldlm_res_id p2_res_id = { .name = {0} };
- struct ldlm_res_id c1_res_id = { .name = {0} };
- struct ldlm_res_id c2_res_id = { .name = {0} };
struct mds_body *body = NULL;
- int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
+ int rc = 0, lock_count = 3;
int cleanup_phase = 0;
void *handle = NULL;
ENTRY;
LASSERT(offset == 0);
- MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
-
- de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
- if (IS_ERR(de_srcdir))
- GOTO(cleanup, rc = PTR_ERR(de_srcdir));
-
- cleanup_phase = 1; /* source directory dentry */
-
- de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
- if (IS_ERR(de_tgtdir))
- GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
-
- cleanup_phase = 2; /* target directory dentry */
+ DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
+ rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
+ rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
- /* The idea here is that we need to get four locks in the end:
- * one on each parent directory, one on each child. We need to take
- * these locks in some kind of order (to avoid deadlocks), and the order
- * I selected is "increasing resource number" order. We need to take
- * the locks on the parent directories, however, before we can lookup
- * the children. Thus the following plan:
- *
- * 1. Take locks on the parent(s), in order
- * 2. Lookup the children
- * 3. Take locks on the children, in order
- * 4. Execute the rename
- */
-
- /* Step 1: Take locks on the parent(s), in order */
- p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
- p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
-
- p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
- p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
+ MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
- rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
- &(dlm_handles[0]), &(dlm_handles[1]));
- if (rc != ELDLM_OK)
+ rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
+ rec->ur_fid2, &de_tgtdir, LCK_PW,
+ rec->ur_name, rec->ur_namelen,
+ &de_old, rec->ur_tgt,
+ rec->ur_tgtlen, &de_new,
+ dlm_handles, LCK_EX);
+ if (rc)
GOTO(cleanup, rc);
- cleanup_phase = 3; /* parent locks */
+ cleanup_phase = 1; /* parent(s), children, locks */
- /* Step 2: Lookup the children */
- de_old = ll_lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen-1);
- if (IS_ERR(de_old)) {
- CERROR("old child lookup error (%*s): %ld\n",
- rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
- GOTO(cleanup, rc = PTR_ERR(de_old));
- }
-
- cleanup_phase = 4; /* original name dentry */
-
- if (de_old->d_inode == NULL)
- GOTO(cleanup, rc = -ENOENT);
+ if (de_new->d_inode)
+ lock_count = 4;
/* sanity check for src inode */
if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
GOTO(cleanup, rc = -EINVAL);
- de_new = ll_lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
- if (IS_ERR(de_new)) {
- CERROR("new child lookup error (%*s): %ld\n",
- rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
- GOTO(cleanup, rc = PTR_ERR(de_new));
- }
-
- cleanup_phase = 5; /* target dentry */
-
/* sanity check for dest inode */
if (de_new->d_inode &&
(de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
- de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
+ de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
GOTO(cleanup, rc = -EINVAL);
- /* Step 3: Take locks on the children */
- c1_res_id.name[0] = de_old->d_inode->i_ino;
- c1_res_id.name[1] = de_old->d_inode->i_generation;
- if (de_new->d_inode == NULL) {
- flags = LDLM_FL_LOCAL_ONLY;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
- &flags, ldlm_completion_ast,
- mds_blocking_ast, NULL,
- &(dlm_handles[2]));
- lock_count = 3;
- } else {
- c2_res_id.name[0] = de_new->d_inode->i_ino;
- c2_res_id.name[1] = de_new->d_inode->i_generation;
- rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
- &(dlm_handles[2]),
- &(dlm_handles[3]));
- lock_count = 4;
- }
- if (rc != ELDLM_OK)
- GOTO(cleanup, rc);
-
- cleanup_phase = 6; /* child locks */
-
/* if we are about to remove the target at first, pass the EA of
* that inode to client to perform and cleanup on OST */
body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
mds_pack_md(obd, req->rq_repmsg, 1, body, de_new->d_inode, 1);
if (!(body->valid & OBD_MD_FLEASIZE)) {
body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
- OBD_MD_FLATIME | OBD_MD_FLMTIME);
+ OBD_MD_FLATIME | OBD_MD_FLMTIME);
} else {
/* XXX need log unlink? */
}
}
- /* Step 4: Execute the rename */
- OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,de_srcdir->d_inode->i_sb);
+ OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
+ de_srcdir->d_inode->i_sb);
handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
if (IS_ERR(handle))
rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
unlock_kernel();
- EXIT;
+ GOTO(cleanup, rc);
cleanup:
rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
handle, req, rc, 0);
switch (cleanup_phase) {
- case 6: /* child locks */
+ case 1:
if (rc) {
- ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
if (lock_count == 4)
ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
+ ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
+ ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
+ ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
} else {
- ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
if (lock_count == 4)
ldlm_put_lock_into_req(req,
- &(dlm_handles[3]), LCK_EX);
+ &(dlm_handles[3]), LCK_EX);
+ ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
+ ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_PW);
+ ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_PW);
}
- case 5: /* target dentry */
l_dput(de_new);
- case 4: /* source dentry */
l_dput(de_old);
- case 3: /* parent locks */
- if (rc) {
- ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
- ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
- } else {
- ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_EX);
- ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_EX);
- }
- case 2: /* target directory dentry */
l_dput(de_tgtdir);
- case 1: /* source directry dentry */
l_dput(de_srcdir);
case 0:
break;