1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mds/mds_reint.c
5 * Lustre Metadata Server (mds) reintegration routines
7 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
12 * This file is part of Lustre, http://www.lustre.org.
14 * Lustre is free software; you can redistribute it and/or
15 * modify it under the terms of version 2 of the GNU General Public
16 * License as published by the Free Software Foundation.
18 * Lustre is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
23 * You should have received a copy of the GNU General Public License
24 * along with Lustre; if not, write to the Free Software
25 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 #define DEBUG_SUBSYSTEM S_MDS
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/lustre_fsfilt.h>
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
42 static void mds_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd, int error)
44 CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
46 if (!error && last_rcvd > obd->obd_last_committed)
47 obd->obd_last_committed = last_rcvd;
50 void mds_start_transno(struct mds_obd *mds)
53 down(&mds->mds_transno_sem);
56 /* Assumes caller has already pushed us into the kernel context. */
57 int mds_finish_transno(struct mds_obd *mds, void *handle,
58 struct ptlrpc_request *req, int rc)
60 struct mds_export_data *med = &req->rq_export->exp_mds_data;
61 struct mds_client_data *mcd = med->med_mcd;
66 /* Propagate error code. */
70 /* we don't allocate new transnos for replayed requests */
71 if (req->rq_level == LUSTRE_CONN_RECOVD)
74 off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
76 last_rcvd = ++mds->mds_last_rcvd;
77 req->rq_repmsg->transno = HTON__u64(last_rcvd);
78 mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
79 mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
80 mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
82 fsfilt_set_last_rcvd(req->rq_export->exp_obd, last_rcvd, handle,
84 written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd),
86 CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
87 LPSZ"\n", last_rcvd, mcd->mcd_uuid, med->med_off, written);
89 if (written == sizeof(*mcd))
91 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
99 up(&mds->mds_transno_sem);
103 /* In the write-back case, the client holds a lock on a subtree (not supported).
104 * In the intent case, the client holds a lock on the child inode. */
105 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
106 struct ptlrpc_request *req,
107 struct lustre_handle *lh)
109 struct mds_obd *mds = mds_req2mds(req);
110 struct obd_device *obd = req->rq_export->exp_obd;
111 struct mds_body *body;
117 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
119 GOTO(out_setattr, rc = PTR_ERR(de));
123 CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
125 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
126 to_kdev_t(inode->i_sb->s_dev));
128 mds_start_transno(mds);
129 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
130 if (IS_ERR(handle)) {
131 rc = PTR_ERR(handle);
132 (void)mds_finish_transno(mds, handle, req, rc);
133 GOTO(out_setattr_de, rc);
136 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr);
137 if (rc == 0 && S_ISREG(inode->i_mode) &&
138 req->rq_reqmsg->bufcount > 1) {
139 rc = fsfilt_set_md(obd, inode, handle,
140 lustre_msg_buf(req->rq_reqmsg, 1),
141 req->rq_reqmsg->buflens[1]);
144 body = lustre_msg_buf(req->rq_repmsg, 0);
145 mds_pack_inode2fid(&body->fid1, inode);
146 mds_pack_inode2body(body, inode);
148 rc = mds_finish_transno(mds, handle, req, rc);
149 err = fsfilt_commit(obd, de->d_inode, handle);
151 CERROR("error on commit: err = %d\n", err);
164 static int mds_reint_create(struct mds_update_record *rec, int offset,
165 struct ptlrpc_request *req,
166 struct lustre_handle *lh)
168 struct dentry *de = NULL;
169 struct mds_obd *mds = mds_req2mds(req);
170 struct obd_device *obd = req->rq_export->exp_obd;
171 struct dentry *dchild = NULL;
174 struct lustre_handle lockh;
175 int rc = 0, err, type = rec->ur_mode & S_IFMT;
178 LASSERT(offset == 0);
179 LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
181 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
182 GOTO(out_create, rc = -ESTALE);
184 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
187 CERROR("parent lookup error %d\n", rc);
189 GOTO(out_create, rc);
193 CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
194 dir->i_ino, rec->ur_name, rec->ur_mode);
196 ldlm_lock_dump_handle(D_OTHER, &lockh);
198 dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
199 if (IS_ERR(dchild)) {
200 rc = PTR_ERR(dchild);
201 CERROR("child lookup error %d\n", rc);
202 GOTO(out_create_de, rc);
205 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
206 to_kdev_t(dir->i_sb->s_dev));
208 if (dir->i_mode & S_ISGID) {
209 rec->ur_gid = dir->i_gid;
210 if (S_ISDIR(rec->ur_mode))
211 rec->ur_mode |= S_ISGID;
214 if (rec->ur_fid2->id)
215 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
217 LASSERT(!(rec->ur_opcode & REINT_REPLAYING));
219 /* From here on, we must exit via a path that calls mds_finish_transno,
220 * so that we release the mds_transno_sem (and, in the case of success,
221 * update the transno correctly). out_create_commit and
222 * out_transno_dchild are good candidates.
224 mds_start_transno(mds);
228 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
230 GOTO(out_transno_dchild, rc = PTR_ERR(handle));
231 rc = vfs_create(dir, dchild, rec->ur_mode);
236 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
238 GOTO(out_transno_dchild, rc = PTR_ERR(handle));
239 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
244 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
246 GOTO(out_transno_dchild, rc = PTR_ERR(handle));
247 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
255 int rdev = rec->ur_rdev;
256 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
258 GOTO(out_transno_dchild, rc = PTR_ERR(handle));
259 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
264 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
265 handle = NULL; /* quell uninitialized warning */
266 GOTO(out_transno_dchild, rc = -EINVAL);
269 /* In case we stored the desired inum in here, we want to clean up.
270 * We also do this in the out_transno_dchild block, for the error cases.
272 dchild->d_fsdata = NULL;
275 CDEBUG(D_INODE, "error during create: %d\n", rc);
276 GOTO(out_create_commit, rc);
279 struct inode *inode = dchild->d_inode;
280 struct mds_body *body;
282 iattr.ia_atime = rec->ur_time;
283 iattr.ia_ctime = rec->ur_time;
284 iattr.ia_mtime = rec->ur_time;
285 iattr.ia_uid = rec->ur_uid;
286 iattr.ia_gid = rec->ur_gid;
287 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
288 ATTR_MTIME | ATTR_CTIME;
290 if (rec->ur_fid2->id) {
291 LASSERT(rec->ur_fid2->id == inode->i_ino);
292 inode->i_generation = rec->ur_fid2->generation;
293 /* Dirtied and committed by the upcoming setattr. */
294 CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
295 inode->i_ino, inode->i_generation);
297 CDEBUG(D_INODE, "created ino %lu with gen %x\n",
298 inode->i_ino, inode->i_generation);
301 rc = fsfilt_setattr(obd, dchild, handle, &iattr);
303 CERROR("error on setattr: rc = %d\n", rc);
304 /* XXX should we abort here in case of error? */
307 body = lustre_msg_buf(req->rq_repmsg, offset);
308 mds_pack_inode2fid(&body->fid1, inode);
309 mds_pack_inode2body(body, inode);
314 rc = mds_finish_transno(mds, handle, req, rc);
316 rc = mds_finish_transno(mds, handle, req, rc);
318 GOTO(out_create_unlink, rc);
320 err = fsfilt_commit(obd, dir, handle);
322 CERROR("error on commit: err = %d\n", err);
329 ldlm_lock_decref(&lockh, LCK_PW);
336 dchild->d_fsdata = NULL;
337 /* Need to release the transno lock, and then put the dchild. */
339 mds_finish_transno(mds, handle, req, rc);
340 goto out_create_dchild;
343 /* Destroy the file we just created. This should not need extra
344 * journal credits, as we have already modified all of the blocks
345 * needed in order to create the file in the first place.
349 err = vfs_rmdir(dir, dchild);
351 CERROR("failed rmdir in error path: rc = %d\n", err);
354 err = vfs_unlink(dir, dchild);
356 CERROR("failed unlink in error path: rc = %d\n", err);
360 goto out_create_commit;
363 /* This function doesn't use ldlm_match_or_enqueue because we're always called
364 * with EX or PW locks, and the MDS is no longer allowed to match write locks,
365 * because they take the place of local semaphores.
367 * Two locks are taken in numerical order */
368 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
369 struct ldlm_res_id *p1_res_id,
370 struct ldlm_res_id *p2_res_id,
371 struct lustre_handle *p1_lockh,
372 struct lustre_handle *p2_lockh)
374 struct ldlm_res_id res_id[2];
375 struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
379 LASSERT(p1_res_id != NULL && p2_res_id != NULL);
381 CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
382 p1_res_id[0].name[0], p2_res_id[0].name[0]);
384 if (p1_res_id->name[0] < p2_res_id->name[0]) {
385 handles[0] = p1_lockh;
386 handles[1] = p2_lockh;
387 res_id[0] = *p1_res_id;
388 res_id[1] = *p2_res_id;
390 handles[1] = p1_lockh;
391 handles[0] = p2_lockh;
392 res_id[1] = *p1_res_id;
393 res_id[0] = *p2_res_id;
396 CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
397 p1_res_id[0].name[0], p2_res_id[0].name[0]);
399 flags = LDLM_FL_LOCAL_ONLY;
400 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
401 LDLM_PLAIN, NULL, 0, lock_mode, &flags,
402 ldlm_completion_ast, mds_blocking_ast, NULL,
406 ldlm_lock_dump_handle(D_OTHER, handles[0]);
408 if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
409 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
410 ldlm_lock_addref(handles[1], lock_mode);
412 flags = LDLM_FL_LOCAL_ONLY;
413 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
414 res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
415 &flags, ldlm_completion_ast,
416 mds_blocking_ast, NULL, 0, handles[1]);
417 if (rc != ELDLM_OK) {
418 ldlm_lock_decref(handles[0], lock_mode);
422 ldlm_lock_dump_handle(D_OTHER, handles[1]);
427 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
428 struct ptlrpc_request *req,
429 struct lustre_handle *child_lockh)
431 struct dentry *dir_de = NULL;
432 struct dentry *dchild = NULL;
433 struct mds_obd *mds = mds_req2mds(req);
434 struct obd_device *obd = req->rq_export->exp_obd;
435 struct mds_body *body = NULL;
436 struct inode *dir_inode, *child_inode;
437 struct lustre_handle *handle, parent_lockh;
438 struct ldlm_res_id child_res_id = { .name = {0} };
440 int namelen, err, rc = 0, flags = 0, return_lock = 0;
443 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
444 GOTO(out, rc = -ENOENT);
446 /* Step 1: Lookup the parent by FID */
447 dir_de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
450 GOTO(out, rc = PTR_ERR(dir_de));
451 dir_inode = dir_de->d_inode;
454 /* Step 2: Lookup the child */
455 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
456 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
458 dchild = lookup_one_len(name, dir_de, namelen);
460 GOTO(out_step_2a, rc = PTR_ERR(dchild));
461 child_inode = dchild->d_inode;
462 if (child_inode == NULL) {
463 if (rec->ur_opcode & REINT_REPLAYING) {
465 "child missing (%lu/%s); OK for REPLAYING\n",
466 dir_inode->i_ino, rec->ur_name);
470 "child doesn't exist (dir %lu, name %s)\n",
471 dir_inode->i_ino, rec->ur_name);
474 GOTO(out_step_2b, rc);
477 DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
478 dir_inode->i_ino, child_inode->i_ino);
480 /* Step 3: Get a lock on the child */
481 child_res_id.name[0] = child_inode->i_ino;
482 child_res_id.name[1] = child_inode->i_generation;
484 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
485 child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
486 &flags, ldlm_completion_ast, mds_blocking_ast,
487 NULL, NULL, child_lockh);
489 GOTO(out_step_2b, rc);
491 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
492 to_kdev_t(dir_inode->i_sb->s_dev));
494 /* Slightly magical; see ldlm_intent_policy */
498 body = lustre_msg_buf(req->rq_repmsg, offset);
500 /* Step 4: Do the unlink: client decides between rmdir/unlink!
502 mds_start_transno(mds);
503 switch (rec->ur_mode & S_IFMT) {
505 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_RMDIR);
507 GOTO(out_cancel_transno, rc = PTR_ERR(handle));
508 rc = vfs_rmdir(dir_inode, dchild);
511 /* If this is the last reference to this inode, get the OBD EA
512 * data first so the client can destroy OST objects */
513 if ((child_inode->i_mode & S_IFMT) == S_IFREG &&
514 child_inode->i_nlink == 1) {
515 mds_pack_inode2fid(&body->fid1, child_inode);
516 mds_pack_inode2body(body, child_inode);
517 mds_pack_md(obd, req->rq_repmsg, offset + 1,
519 if (body->valid & OBD_MD_FLEASIZE)
528 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_UNLINK);
530 GOTO(out_cancel_transno, rc = PTR_ERR(handle));
531 rc = vfs_unlink(dir_inode, dchild);
534 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
537 GOTO(out_cancel_transno, rc = -EINVAL);
540 rc = mds_finish_transno(mds, handle, req, rc);
541 err = fsfilt_commit(obd, dir_inode, handle);
542 if (rc != 0 || err != 0) {
543 /* Don't unlink the OST objects if the MDS unlink failed */
547 CERROR("error on commit: err = %d\n", err);
552 GOTO(out_step_4, rc);
554 if (rc != 0 || return_lock == 0)
555 ldlm_lock_decref(child_lockh, LCK_EX);
559 ldlm_lock_decref(&parent_lockh, LCK_EX);
566 rc = mds_finish_transno(mds, handle, req, rc);
570 static int mds_reint_link(struct mds_update_record *rec, int offset,
571 struct ptlrpc_request *req, struct lustre_handle *lh)
573 struct obd_device *obd = req->rq_export->exp_obd;
574 struct dentry *de_src = NULL;
575 struct dentry *de_tgt_dir = NULL;
576 struct dentry *dchild = NULL;
577 struct mds_obd *mds = mds_req2mds(req);
578 struct lustre_handle *handle, tgt_dir_lockh, src_lockh;
579 struct ldlm_res_id src_res_id = { .name = {0} };
580 struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
581 int lock_mode, rc = 0, err;
584 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
585 GOTO(out, rc = -ENOENT);
587 /* Step 1: Lookup the source inode and target directory by FID */
588 de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
590 GOTO(out, rc = PTR_ERR(de_src));
592 de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
593 if (IS_ERR(de_tgt_dir))
594 GOTO(out_de_src, rc = PTR_ERR(de_tgt_dir));
596 CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
597 de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
598 de_src->d_inode->i_ino);
600 /* Step 2: Take the two locks */
602 src_res_id.name[0] = de_src->d_inode->i_ino;
603 src_res_id.name[1] = de_src->d_inode->i_generation;
604 tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
605 tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
607 rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
608 &src_lockh, &tgt_dir_lockh);
610 GOTO(out_tgt_dir, rc = -EIO);
612 /* Step 3: Lookup the child */
613 dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
614 if (IS_ERR(dchild)) {
615 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
616 GOTO(out_drop_locks, rc = PTR_ERR(dchild));
619 if (dchild->d_inode) {
620 if (rec->ur_opcode & REINT_REPLAYING) {
621 /* XXX verify that the link is to the right file? */
623 "child exists (dir %lu, name %s) (REPLAYING)\n",
624 de_tgt_dir->d_inode->i_ino, rec->ur_name);
627 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
628 de_tgt_dir->d_inode->i_ino, rec->ur_name);
631 GOTO(out_drop_child, rc);
635 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
636 to_kdev_t(de_src->d_inode->i_sb->s_dev));
638 mds_start_transno(mds);
639 handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
640 if (IS_ERR(handle)) {
641 rc = PTR_ERR(handle);
642 mds_finish_transno(mds, handle, req, rc);
643 GOTO(out_drop_child, rc);
646 rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
648 CERROR("link error %d\n", rc);
649 rc = mds_finish_transno(mds, handle, req, rc);
651 err = fsfilt_commit(obd, de_tgt_dir->d_inode, handle);
653 CERROR("error on commit: err = %d\n", err);
663 ldlm_lock_decref(&src_lockh, lock_mode);
664 ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
674 static int mds_reint_rename(struct mds_update_record *rec, int offset,
675 struct ptlrpc_request *req,
676 struct lustre_handle *lockh)
678 struct obd_device *obd = req->rq_export->exp_obd;
679 struct dentry *de_srcdir = NULL;
680 struct dentry *de_tgtdir = NULL;
681 struct dentry *de_old = NULL;
682 struct dentry *de_new = NULL;
683 struct mds_obd *mds = mds_req2mds(req);
684 struct lustre_handle dlm_handles[4];
685 struct ldlm_res_id p1_res_id = { .name = {0} };
686 struct ldlm_res_id p2_res_id = { .name = {0} };
687 struct ldlm_res_id c1_res_id = { .name = {0} };
688 struct ldlm_res_id c2_res_id = { .name = {0} };
689 int rc = 0, err, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
693 de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
694 if (IS_ERR(de_srcdir))
695 GOTO(out, rc = PTR_ERR(de_srcdir));
696 de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
697 if (IS_ERR(de_tgtdir))
698 GOTO(out_put_srcdir, rc = PTR_ERR(de_tgtdir));
700 /* The idea here is that we need to get four locks in the end:
701 * one on each parent directory, one on each child. We need to take
702 * these locks in some kind of order (to avoid deadlocks), and the order
703 * I selected is "increasing resource number" order. We need to take
704 * the locks on the parent directories, however, before we can lookup
705 * the children. Thus the following plan:
707 * 1. Take locks on the parent(s), in order
708 * 2. Lookup the children
709 * 3. Take locks on the children, in order
710 * 4. Execute the rename
713 /* Step 1: Take locks on the parent(s), in order */
714 p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
715 p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
717 p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
718 p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
720 rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
721 &(dlm_handles[0]), &(dlm_handles[1]));
723 GOTO(out_put_tgtdir, rc);
725 /* Step 2: Lookup the children */
726 de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
727 if (IS_ERR(de_old)) {
728 CERROR("old child lookup error (%*s): %ld\n",
729 rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
730 GOTO(out_step_2a, rc = PTR_ERR(de_old));
733 if (de_old->d_inode == NULL)
734 GOTO(out_step_2b, rc = -ENOENT);
736 de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
737 if (IS_ERR(de_new)) {
738 CERROR("new child lookup error (%*s): %ld\n",
739 rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
740 GOTO(out_step_2b, rc = PTR_ERR(de_new));
743 /* Step 3: Take locks on the children */
744 c1_res_id.name[0] = de_old->d_inode->i_ino;
745 c1_res_id.name[1] = de_old->d_inode->i_generation;
746 if (de_new->d_inode == NULL) {
747 flags = LDLM_FL_LOCAL_ONLY;
748 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
749 c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
750 &flags, ldlm_completion_ast,
751 mds_blocking_ast, NULL, NULL,
755 c2_res_id.name[0] = de_new->d_inode->i_ino;
756 c2_res_id.name[1] = de_new->d_inode->i_generation;
757 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
763 GOTO(out_step_3, rc);
765 /* Step 4: Execute the rename */
766 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
767 to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
769 mds_start_transno(mds);
770 handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
771 if (IS_ERR(handle)) {
772 rc = PTR_ERR(handle);
773 mds_finish_transno(mds, handle, req, rc);
774 GOTO(out_step_4, rc);
778 rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
782 rc = mds_finish_transno(mds, handle, req, rc);
784 err = fsfilt_commit(obd, de_tgtdir->d_inode, handle);
786 CERROR("error on commit: err = %d\n", err);
793 ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
795 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
801 ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
802 ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
812 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
813 struct ptlrpc_request *, struct lustre_handle *);
815 static mds_reinter reinters[REINT_MAX + 1] = {
816 [REINT_SETATTR] mds_reint_setattr,
817 [REINT_CREATE] mds_reint_create,
818 [REINT_UNLINK] mds_reint_unlink,
819 [REINT_LINK] mds_reint_link,
820 [REINT_RENAME] mds_reint_rename,
821 [REINT_OPEN] mds_open
824 int mds_reint_rec(struct mds_update_record *rec, int offset,
825 struct ptlrpc_request *req, struct lustre_handle *lockh)
827 struct mds_obd *mds = mds_req2mds(req);
828 struct obd_run_ctxt saved;
830 int realop = rec->ur_opcode & REINT_OPCODE_MASK, rc;
833 if (realop < 1 || realop > REINT_MAX) {
834 CERROR("opcode %d not valid (%sREPLAYING)\n", realop,
835 rec->ur_opcode & REINT_REPLAYING ? "" : "not ");
836 rc = req->rq_status = -EINVAL;
840 uc.ouc_fsuid = rec->ur_fsuid;
841 uc.ouc_fsgid = rec->ur_fsgid;
842 uc.ouc_cap = rec->ur_cap;
843 uc.ouc_suppgid = rec->ur_suppgid;
845 push_ctxt(&saved, &mds->mds_ctxt, &uc);
846 rc = reinters[realop] (rec, offset, req, lockh);
847 pop_ctxt(&saved, &mds->mds_ctxt, &uc);