1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mds/mds_reint.c
5 * Lustre Metadata Server (mds) reintegration routines
7 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
12 * This file is part of Lustre, http://www.lustre.org.
14 * Lustre is free software; you can redistribute it and/or
15 * modify it under the terms of version 2 of the GNU General Public
16 * License as published by the Free Software Foundation.
18 * Lustre is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
23 * You should have received a copy of the GNU General Public License
24 * along with Lustre; if not, write to the Free Software
25 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 #define DEBUG_SUBSYSTEM S_MDS
32 #include <linux/obd_support.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lustre_fsfilt.h>
40 #include "mds_internal.h"
42 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
/* Transaction-commit callback: passes the obd, the transaction number and the
 * error code straight through to obd_transno_commit_cb(). */
44 static void mds_commit_cb(struct obd_device *obd, __u64 transno, int error)
46 obd_transno_commit_cb(obd, transno, error);
49 /* Assumes caller has already pushed us into the kernel context. */
/* Record the outcome of a reintegration request in the client's last_rcvd
 * slot and commit the transaction.
 *
 * The per-client record (mcd) is filled with the transno, the mount count,
 * the request xid, the result code rc and op_data, each converted to
 * little-endian, and is then written to mds->mds_rcvd_filp at this client's
 * slot offset.  The transno is also stored in the request and in its reply
 * message.
 *
 * NOTE(review): this listing omits several source lines (local declarations,
 * braces, some conditions and the final return), so the exact branch
 * structure and return value cannot be confirmed from here. */
50 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
51 struct ptlrpc_request *req, int rc,
54 struct mds_export_data *med = &req->rq_export->exp_mds_data;
55 struct mds_client_data *mcd = med->med_mcd;
56 struct obd_device *obd = req->rq_export->exp_obd;
63 /* if the export has already been failed, we have no last_rcvd slot */
64 if (req->rq_export->exp_failed) {
65 CERROR("committing transaction for disconnected client\n");
72 /* if we're starting our own xaction, use our own inode */
73 i = mds->mds_rcvd_filp->f_dentry->d_inode;
74 handle = fsfilt_start(obd, i, FSFILT_OP_SETATTR);
76 CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
77 GOTO(out, rc = PTR_ERR(handle));
/* byte offset of this client's record inside the last_rcvd file */
81 off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
/* NOTE(review): two sources for transno are visible (the value carried in the
 * request message, and a newly allocated one taken under mds_transno_lock);
 * the test that chooses between them is not shown in this listing. */
83 transno = req->rq_reqmsg->transno;
85 spin_lock(&mds->mds_transno_lock);
86 transno = ++mds->mds_last_transno;
87 spin_unlock(&mds->mds_transno_lock);
89 req->rq_repmsg->transno = req->rq_transno = transno;
/* the client record is kept in little-endian byte order */
90 mcd->mcd_last_transno = cpu_to_le64(transno);
91 mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
92 mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
93 mcd->mcd_last_result = cpu_to_le32(rc);
94 mcd->mcd_last_data = cpu_to_le32(op_data);
/* hand the transno to the filesystem layer, then write the client record */
96 fsfilt_set_last_rcvd(req->rq_export->exp_obd, transno, handle,
98 written = lustre_fwrite(mds->mds_rcvd_filp, mcd, sizeof(*mcd), &off);
99 CDEBUG(D_INODE, "wrote trans "LPU64" client %s at #%u: written = "
100 LPSZ"\n", transno, mcd->mcd_uuid, med->med_off, written);
/* anything other than a full-record write is reported as an error */
102 if (written != sizeof(*mcd)) {
103 CERROR("error writing to last_rcvd: rc = "LPSZ"\n", written);
/* commit the transaction against inode i */
113 err = fsfilt_commit(obd, i, handle, 0);
115 CERROR("error committing transaction: %d\n", err);
125 /* this gives the same functionality as the code between
126 * sys_chmod and inode_setattr
127 * chown_common and inode_setattr
128 * utimes and inode_setattr
/* Fix up the iattr carried in an update record (rec->ur_iattr) before it is
 * applied to inode.  The record's iattr is modified in place; rec->ur_fsuid
 * is compared against the inode owner for the timestamp case.
 *
 * Three mutually exclusive cases are handled below: updating both times to
 * "now", changing the owner/group, and changing the mode.
 *
 * NOTE(review): the return statements and several braces are missing from
 * this listing, so the exact return values are not visible here. */
130 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
132 time_t now = LTIME_S(CURRENT_TIME);
133 struct iattr *attr = &rec->ur_iattr;
134 unsigned int ia_valid = attr->ia_valid;
138 /* only fix up attrs if the client VFS didn't already */
139 if (!(ia_valid & ATTR_RAW))
142 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
/* ctime becomes the current time; atime and mtime do too unless the request
 * carries an explicit value (ATTR_ATIME_SET / ATTR_MTIME_SET) */
145 LTIME_S(attr->ia_ctime) = now;
146 if (!(ia_valid & ATTR_ATIME_SET))
147 LTIME_S(attr->ia_atime) = now;
148 if (!(ia_valid & ATTR_MTIME_SET))
149 LTIME_S(attr->ia_mtime) = now;
/* both times requested without an explicit atime: the caller must either own
 * the inode or pass the MAY_WRITE permission check */
152 if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME) &&
153 !(ia_valid & ATTR_ATIME_SET)) {
154 if (rec->ur_fsuid != inode->i_uid &&
155 (error = permission(inode,MAY_WRITE)) != 0)
157 } else if (ia_valid & ATTR_UID) {
160 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
/* a uid or gid of -1 is replaced with the inode's current value */
162 if (attr->ia_uid == (uid_t) -1)
163 attr->ia_uid = inode->i_uid;
164 if (attr->ia_gid == (gid_t) -1)
165 attr->ia_gid = inode->i_gid;
/* start from the current mode and limit the update to uid/gid/ctime;
 * ATTR_MODE is added below only when a setuid/setgid bit gets cleared */
166 attr->ia_mode = inode->i_mode;
167 attr->ia_valid = ATTR_UID | ATTR_GID | ATTR_CTIME;
169 * If the user or group of a non-directory has been
170 * changed by a non-root user, remove the setuid bit.
171 * 19981026 David C Niemi <niemi@tux.org>
173 * Changed this to apply to all users, including root,
174 * to avoid some races. This is the behavior we had in
175 * 2.0. The check for non-root was definitely wrong
176 * for 2.2 anyway, as it should have been using
177 * CAP_FSETID rather than fsuid -- 19990830 SD.
179 if ((inode->i_mode & S_ISUID) == S_ISUID &&
180 !S_ISDIR(inode->i_mode)) {
181 attr->ia_mode &= ~S_ISUID;
182 attr->ia_valid |= ATTR_MODE;
185 * Likewise, if the user or group of a non-directory
186 * has been changed by a non-root user, remove the
187 * setgid bit UNLESS there is no group execute bit
188 * (this would be a file marked for mandatory
189 * locking). 19981026 David C Niemi <niemi@tux.org>
191 * Removed the fsuid check (see the comment above) --
194 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
195 (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
196 attr->ia_mode &= ~S_ISGID;
197 attr->ia_valid |= ATTR_MODE;
/* mode change: the requested mode is saved in a local first; a mode of -1 is
 * replaced by the inode's current mode, and the requested permission bits
 * (S_IALLUGO) are combined with the inode's remaining mode bits.
 * NOTE(review): the left-hand side of that final expression is on a line
 * missing from this listing. */
199 } else if (ia_valid & ATTR_MODE) {
200 int mode = attr->ia_mode;
202 if (attr->ia_mode == (mode_t) -1)
203 attr->ia_mode = inode->i_mode;
205 (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
/* Rebuild the reply for a resent setattr request from the client's saved
 * last_rcvd record: restore the transno and status, take over any ack locks
 * of an outstanding reply, then look the inode up by fid again and repack its
 * fid and attributes into reply buffer 0.
 *
 * NOTE(review): some lines of this function (braces, a declaration and the
 * error-path test around the LASSERT) are missing from this listing. */
210 static void reconstruct_reint_setattr(struct mds_update_record *rec,
211 int offset, struct ptlrpc_request *req)
213 struct mds_export_data *med = &req->rq_export->exp_mds_data;
214 struct mds_client_data *mcd = med->med_mcd;
215 struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
217 struct mds_body *body;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to CPU byte order before use */
219 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
220 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
222 if (req->rq_export->exp_outstanding_reply)
223 mds_steal_ack_locks(req->rq_export, req);
225 de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
227 LASSERT(PTR_ERR(de) == req->rq_status);
231 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
232 mds_pack_inode2fid(&body->fid1, de->d_inode);
233 mds_pack_inode2body(body, de->d_inode);
238 /* In the raw-setattr case, we lock the child inode.
239 * In the write-back case or if being called from open, the client holds a lock
242 * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
/* Apply a setattr update record to the inode named by rec->ur_fid1.
 *
 * Visible sequence: resent requests are diverted to
 * reconstruct_reint_setattr(); the dentry is looked up (with a LCK_PW lock
 * unless ATTR_FROM_OPEN is set); a SETATTR transaction is started; the
 * attributes are fixed up with mds_fix_attr() and applied with
 * fsfilt_setattr(); for regular files with EA data the EA is stored with
 * fsfilt_set_md(); the reply body is packed; and the result is logged via
 * mds_finish_transno().
 *
 * NOTE(review): declarations, braces, error tests and the return statement
 * are missing from this listing; the lh parameter is not referenced in the
 * visible lines. */
243 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
244 struct ptlrpc_request *req,
245 struct lustre_handle *lh)
247 struct mds_obd *mds = mds_req2mds(req);
248 struct obd_device *obd = req->rq_export->exp_obd;
249 struct mds_body *body;
251 struct inode *inode = NULL;
252 struct lustre_handle lockh;
254 int rc = 0, cleanup_phase = 0, err, locked = 0;
257 LASSERT(offset == 0);
259 MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
/* called from open: plain lookup; otherwise the lookup also takes a LCK_PW
 * lock on the inode */
261 if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
262 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
264 GOTO(cleanup, rc = PTR_ERR(de));
266 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
269 GOTO(cleanup, rc = PTR_ERR(de));
277 CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
279 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
280 to_kdev_t(inode->i_sb->s_dev));
282 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
283 if (IS_ERR(handle)) {
284 rc = PTR_ERR(handle);
289 rc = mds_fix_attr(inode, rec);
/* apply the attributes, then (regular files with EA data only) store the EA */
293 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
295 S_ISREG(inode->i_mode) &&
296 rec->ur_eadata != NULL) {
297 rc = fsfilt_set_md(obd, inode, handle,
298 rec->ur_eadata, rec->ur_eadatalen);
301 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
302 mds_pack_inode2fid(&body->fid1, inode);
303 mds_pack_inode2body(body, inode);
/* record the result in last_rcvd and commit */
307 err = mds_finish_transno(mds, inode, handle, req, rc, 0);
/* cleanup: the LCK_PW lock is either dropped or saved in rq_ack_locks[0];
 * NOTE(review): the condition choosing between the two is not visible here */
308 switch(cleanup_phase) {
313 ldlm_lock_decref(&lockh, LCK_PW);
315 memcpy(&req->rq_ack_locks[0].lock, &lockh,
317 req->rq_ack_locks[0].mode = LCK_PW;
/* Rebuild the reply for a resent create request from the client's saved
 * last_rcvd record: restore the transno and status, take over any ack locks
 * of an outstanding reply, then look up the parent by fid and the child by
 * name and repack the child's fid and attributes into the reply buffer at
 * offset.
 *
 * Both lookups are asserted to succeed (LASSERT).
 * NOTE(review): braces and possibly other lines are missing from this
 * listing. */
332 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
333 struct ptlrpc_request *req)
335 struct mds_export_data *med = &req->rq_export->exp_mds_data;
336 struct mds_client_data *mcd = med->med_mcd;
337 struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
338 struct dentry *parent, *child;
339 struct mds_body *body;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to CPU byte order before use */
341 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
342 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
344 if (req->rq_export->exp_outstanding_reply)
345 mds_steal_ack_locks(req->rq_export, req);
350 parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
351 LASSERT(!IS_ERR(parent));
/* ur_namelen counts the trailing NUL, hence the - 1 */
352 child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
353 LASSERT(!IS_ERR(child));
354 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
355 mds_pack_inode2fid(&body->fid1, child->d_inode);
356 mds_pack_inode2body(body, child->d_inode);
/* Create a new file, directory, symlink or device node named rec->ur_name in
 * the directory identified by rec->ur_fid1.
 *
 * Visible sequence: resent requests are diverted to
 * reconstruct_reint_create(); the parent is looked up and locked in LCK_PW
 * mode; the child name is looked up; a transaction matching the file type is
 * started and the matching vfs_* call creates the object; on success the
 * owner and timestamps from the record are applied with fsfilt_setattr() and
 * the reply body is packed; the result is logged via mds_finish_transno();
 * on failure the new object is removed again and the locks/dentries are
 * released according to cleanup_phase.
 *
 * NOTE(review): declarations, braces, case labels, error tests and the return
 * statement are missing from this listing; the lh parameter is not referenced
 * in the visible lines. */
361 static int mds_reint_create(struct mds_update_record *rec, int offset,
362 struct ptlrpc_request *req,
363 struct lustre_handle *lh)
365 struct dentry *de = NULL;
366 struct mds_obd *mds = mds_req2mds(req);
367 struct obd_device *obd = req->rq_export->exp_obd;
368 struct dentry *dchild = NULL;
369 struct inode *dir = NULL;
371 struct lustre_handle lockh;
372 int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
376 LASSERT(offset == 0);
377 LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
379 MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
381 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
382 GOTO(cleanup, rc = -ESTALE);
384 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
387 CERROR("parent lookup error %d\n", rc);
390 cleanup_phase = 1; /* locked parent dentry */
393 CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
394 dir->i_ino, rec->ur_name, rec->ur_mode);
396 ldlm_lock_dump_handle(D_OTHER, &lockh);
398 dchild = ll_lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
399 if (IS_ERR(dchild)) {
400 rc = PTR_ERR(dchild);
401 CERROR("child lookup error %d\n", rc);
405 cleanup_phase = 2; /* child dentry */
407 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
408 to_kdev_t(dir->i_sb->s_dev));
/* setgid directory: the new object inherits the directory's group, and a new
 * subdirectory inherits the setgid bit as well */
410 if (dir->i_mode & S_ISGID) {
411 rec->ur_gid = dir->i_gid;
412 if (S_ISDIR(rec->ur_mode))
413 rec->ur_mode |= S_ISGID;
/* a non-zero fid2 id is passed down through d_fsdata as the wanted inode
 * number (see the LASSERT against i_ino after creation) */
416 if (rec->ur_fid2->id)
417 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
/* NOTE(review): the three fsfilt_start() failure paths below leave the
 * ERR_PTR value in handle, unlike the mknod path; confirm what the cleanup
 * code does with it. */
421 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
423 GOTO(cleanup, rc = PTR_ERR(handle));
424 rc = vfs_create(dir, dchild, rec->ur_mode);
429 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
431 GOTO(cleanup, rc = PTR_ERR(handle));
432 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
437 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
439 GOTO(cleanup, rc = PTR_ERR(handle));
440 if (rec->ur_tgt == NULL) /* no target supplied */
441 rc = -EINVAL; /* -EPROTO? */
443 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
451 int rdev = rec->ur_rdev;
/* On failure the error code must be read BEFORE handle is cleared.  The
 * previous ordering (handle = NULL, rc = PTR_ERR(handle)) evaluated
 * PTR_ERR(NULL), so rc became 0 and the failure was reported as success. */
452 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
454 GOTO(cleanup, (rc = PTR_ERR(handle), handle = NULL, rc));
455 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
460 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
461 GOTO(cleanup, rc = -EINVAL);
464 /* In case we stored the desired inum in here, we want to clean up.
465 * We also do this in the cleanup block, for the error cases.
467 dchild->d_fsdata = NULL;
470 CDEBUG(D_INODE, "error during create: %d\n", rc);
474 struct inode *inode = dchild->d_inode;
475 struct mds_body *body;
478 LTIME_S(iattr.ia_atime) = rec->ur_time;
479 LTIME_S(iattr.ia_ctime) = rec->ur_time;
480 LTIME_S(iattr.ia_mtime) = rec->ur_time;
481 iattr.ia_uid = rec->ur_uid;
482 iattr.ia_gid = rec->ur_gid;
483 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
484 ATTR_MTIME | ATTR_CTIME;
486 if (rec->ur_fid2->id) {
487 LASSERT(rec->ur_fid2->id == inode->i_ino);
488 inode->i_generation = rec->ur_fid2->generation;
489 /* Dirtied and committed by the upcoming setattr. */
490 CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
491 inode->i_ino, inode->i_generation);
493 CDEBUG(D_INODE, "created ino %lu with gen %x\n",
494 inode->i_ino, inode->i_generation);
497 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
499 CERROR("error on setattr: rc = %d\n", rc);
500 /* XXX should we abort here in case of error? */
503 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
504 mds_pack_inode2fid(&body->fid1, inode);
505 mds_pack_inode2body(body, inode);
510 err = mds_finish_transno(mds, dir, handle, req, rc, 0);
513 /* Destroy the file we just created. This should not need
514 * extra journal credits, as we have already modified all of
515 * the blocks needed in order to create the file in the first
520 err = vfs_rmdir(dir, dchild);
522 CERROR("rmdir in error path: %d\n", err);
525 err = vfs_unlink(dir, dchild);
527 CERROR("unlink in error path: %d\n", err);
533 switch (cleanup_phase) {
534 case 2: /* child dentry */
535 dchild->d_fsdata = NULL;
537 case 1: /* locked parent dentry */
539 ldlm_lock_decref(&lockh, LCK_PW);
541 memcpy(&req->rq_ack_locks[0].lock, &lockh,
543 req->rq_ack_locks[0].mode = LCK_PW;
549 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
556 /* This function doesn't use ldlm_match_or_enqueue because we're always called
557 * with EX or PW locks, and the MDS is no longer allowed to match write locks,
558 * because they take the place of local semaphores.
560 * Two locks are taken in numerical order */
/* Enqueue two local PLAIN locks of mode lock_mode in the obd's namespace,
 * lower resource number (name[0]) first.
 *
 * p1_res_id / p2_res_id: the two resources (both must be non-NULL).
 * p1_lockh / p2_lockh:   receive the lock handle for the matching resource,
 *                        regardless of the order in which they were taken.
 *
 * If both resource ids are identical only one lock is enqueued; the second
 * handle is a copy of the first with an extra reference.  If the second
 * enqueue fails, the first lock is dropped again.
 *
 * NOTE(review): declarations, braces, the tail of the first enqueue call and
 * the return statements are missing from this listing. */
561 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
562 struct ldlm_res_id *p1_res_id,
563 struct ldlm_res_id *p2_res_id,
564 struct lustre_handle *p1_lockh,
565 struct lustre_handle *p2_lockh)
567 struct ldlm_res_id res_id[2];
568 struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
572 LASSERT(p1_res_id != NULL && p2_res_id != NULL);
574 CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
575 p1_res_id[0].name[0], p2_res_id[0].name[0]);
/* sort the (resource, handle) pairs so that index 0 is enqueued first */
577 if (p1_res_id->name[0] < p2_res_id->name[0]) {
578 handles[0] = p1_lockh;
579 handles[1] = p2_lockh;
580 res_id[0] = *p1_res_id;
581 res_id[1] = *p2_res_id;
583 handles[1] = p1_lockh;
584 handles[0] = p2_lockh;
585 res_id[1] = *p1_res_id;
586 res_id[0] = *p2_res_id;
/* report the order actually used: the sorted res_id[] entries, not the
 * caller's p1/p2 arguments (which would just repeat the message above) */
589 CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
590 res_id[0].name[0], res_id[1].name[0]);
592 flags = LDLM_FL_LOCAL_ONLY;
593 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
594 LDLM_PLAIN, NULL, 0, lock_mode, &flags,
595 ldlm_completion_ast, mds_blocking_ast, NULL,
599 ldlm_lock_dump_handle(D_OTHER, handles[0]);
601 if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
602 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
603 ldlm_lock_addref(handles[1], lock_mode);
605 flags = LDLM_FL_LOCAL_ONLY;
606 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
607 res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
608 &flags, ldlm_completion_ast,
609 mds_blocking_ast, NULL, handles[1]);
610 if (rc != ELDLM_OK) {
611 ldlm_lock_decref(handles[0], lock_mode);
615 ldlm_lock_dump_handle(D_OTHER, handles[1]);
/* Rebuild the reply for a resent unlink request from the client's saved
 * last_rcvd record: restore the transno and status and take over any ack
 * locks of an outstanding reply.  The EA of the unlinked file cannot be
 * recovered here, which is reported through DEBUG_REQ.
 *
 * The rec, offset and child_lockh parameters are not referenced in the
 * visible lines. */
620 static void reconstruct_reint_unlink(struct mds_update_record *rec, int offset,
621 struct ptlrpc_request *req,
622 struct lustre_handle *child_lockh)
624 struct mds_export_data *med = &req->rq_export->exp_mds_data;
625 struct mds_client_data *mcd = med->med_mcd;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to CPU byte order before use */
627 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
628 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
630 if (req->rq_export->exp_outstanding_reply)
631 mds_steal_ack_locks(req->rq_export, req);
633 DEBUG_REQ(D_ERROR, req,
634 "can't get EA for reconstructed unlink, leaking OST inodes");
/* Remove the name rec->ur_name from the directory identified by rec->ur_fid1.
 *
 * Visible sequence: resent requests are diverted to
 * reconstruct_reint_unlink(); the parent is looked up and locked in LCK_PW
 * mode; the child is looked up by name and locked in LCK_EX mode through
 * child_lockh; depending on the file type in rec->ur_mode either vfs_rmdir()
 * or vfs_unlink() is called inside a matching transaction; for the last link
 * of a regular file the fid, attributes and EA are packed into the reply
 * first; the result is logged via mds_finish_transno() and the locks and
 * dentries are released according to cleanup_phase.
 *
 * offset must be 0 or 2 (asserted).
 * NOTE(review): declarations, braces, case labels, error tests and the return
 * statement are missing from this listing. */
637 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
638 struct ptlrpc_request *req,
639 struct lustre_handle *child_lockh)
641 struct dentry *dir_de = NULL;
642 struct dentry *dchild = NULL;
643 struct mds_obd *mds = mds_req2mds(req);
644 struct obd_device *obd = req->rq_export->exp_obd;
645 struct mds_body *body = NULL;
646 struct inode *dir_inode = NULL, *child_inode;
647 struct lustre_handle parent_lockh;
649 struct ldlm_res_id child_res_id = { .name = {0} };
650 int rc = 0, flags = 0, return_lock = 0;
651 int cleanup_phase = 0;
654 LASSERT(offset == 0 || offset == 2);
656 MDS_CHECK_RESENT(req, reconstruct_reint_unlink(rec, offset, req,
659 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
660 GOTO(cleanup, rc = -ENOENT);
662 /* Step 1: Lookup the parent by FID */
663 dir_de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
666 GOTO(cleanup, rc = PTR_ERR(dir_de));
667 dir_inode = dir_de->d_inode;
670 cleanup_phase = 1; /* Have parent dentry lock */
672 /* Step 2: Lookup the child */
673 dchild = ll_lookup_one_len(rec->ur_name, dir_de, rec->ur_namelen - 1);
675 GOTO(cleanup, rc = PTR_ERR(dchild));
677 cleanup_phase = 2; /* child dentry */
679 child_inode = dchild->d_inode;
680 if (child_inode == NULL) {
682 "child doesn't exist (dir %lu, name %s)\n",
683 dir_inode->i_ino, rec->ur_name);
688 DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
689 dir_inode->i_ino, child_inode->i_ino);
691 /* Step 3: Get a lock on the child */
692 child_res_id.name[0] = child_inode->i_ino;
693 child_res_id.name[1] = child_inode->i_generation;
695 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
696 child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
697 &flags, ldlm_completion_ast, mds_blocking_ast,
702 cleanup_phase = 3; /* child lock */
704 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
705 to_kdev_t(dir_inode->i_sb->s_dev));
707 /* ldlm_reply in buf[0] if called via intent */
711 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
712 LASSERT(body != NULL);
714 /* Step 4: Do the unlink: client decides between rmdir/unlink!
716 switch (rec->ur_mode & S_IFMT) {
718 /* Drop any lingering child directories before we start our
719 * transaction, to avoid doing multiple inode dirty/delete
720 * in our compound transaction (bug 1321).
722 shrink_dcache_parent(dchild);
723 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_RMDIR);
725 GOTO(cleanup, rc = PTR_ERR(handle));
727 rc = vfs_rmdir(dir_inode, dchild);
730 /* If this is the last reference to this inode, get the OBD EA
731 * data first so the client can destroy OST objects */
732 if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1) {
733 mds_pack_inode2fid(&body->fid1, child_inode);
734 mds_pack_inode2body(body, child_inode);
735 mds_pack_md(obd, req->rq_repmsg, offset + 1,
737 if (body->valid & OBD_MD_FLEASIZE)
746 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_UNLINK);
748 GOTO(cleanup, rc = PTR_ERR(handle));
750 rc = vfs_unlink(dir_inode, dchild);
753 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
756 GOTO(cleanup, rc = -EINVAL);
760 switch(cleanup_phase) {
762 rc = mds_finish_transno(mds, dir_inode, handle, req, rc, 0);
764 /* Don't unlink the OST objects if the MDS unlink failed */
767 case 3: /* child lock */
768 if (rc != 0 || return_lock == 0)
769 ldlm_lock_decref(child_lockh, LCK_EX);
770 case 2: /* child dentry */
/* The parent lock was taken in LCK_PW mode in step 1, so it is released (or
 * handed to the ack-lock slot) with that same mode, not LCK_EX. */
772 case 1: /* parent dentry and lock */
774 ldlm_lock_decref(&parent_lockh, LCK_PW);
776 memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
777 sizeof(parent_lockh));
778 req->rq_ack_locks[0].mode = LCK_PW;
784 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/* Rebuild the reply for a resent link request from the client's saved
 * last_rcvd record: restore the transno and status and take over any ack
 * locks of an outstanding reply.  The rec and offset parameters are not
 * referenced in the visible lines. */
791 static void reconstruct_reint_link(struct mds_update_record *rec, int offset,
792 struct ptlrpc_request *req)
794 struct mds_export_data *med = &req->rq_export->exp_mds_data;
795 struct mds_client_data *mcd = med->med_mcd;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to CPU byte order before use */
797 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
798 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
800 if (req->rq_export->exp_outstanding_reply)
801 mds_steal_ack_locks(req->rq_export, req);
/* Create a hard link named rec->ur_name in the directory rec->ur_fid2 that
 * points at the inode rec->ur_fid1.
 *
 * Visible sequence: resent requests are diverted to reconstruct_reint_link();
 * source and target directory are looked up by fid; both are locked with
 * enqueue_ordered_locks(); the new name is looked up in the target directory;
 * a LINK transaction is started and vfs_link() is called; the result is
 * logged via mds_finish_transno(); locks and dentries are released according
 * to cleanup_phase.
 *
 * NOTE(review): braces, error tests and the return statement are missing from
 * this listing; the lh parameter is not referenced in the visible lines. */
804 static int mds_reint_link(struct mds_update_record *rec, int offset,
805 struct ptlrpc_request *req,
806 struct lustre_handle *lh)
808 struct obd_device *obd = req->rq_export->exp_obd;
809 struct dentry *de_src = NULL;
810 struct dentry *de_tgt_dir = NULL;
811 struct dentry *dchild = NULL;
812 struct mds_obd *mds = mds_req2mds(req);
813 struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
814 struct ldlm_res_id src_res_id = { .name = {0} };
815 struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
/* NOTE(review): lock_mode starts at 0 and is used when the locks are released
 * below, while the enqueue passes LCK_EX directly; the line that assigns
 * lock_mode is not visible in this listing -- confirm it is set to LCK_EX. */
816 int lock_mode = 0, rc = 0, cleanup_phase = 0;
819 LASSERT(offset == 0);
821 MDS_CHECK_RESENT(req, reconstruct_reint_link(rec, offset, req));
823 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
824 GOTO(cleanup, rc = -ENOENT);
826 /* Step 1: Lookup the source inode and target directory by FID */
827 de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
829 GOTO(cleanup, rc = PTR_ERR(de_src));
831 cleanup_phase = 1; /* source dentry */
833 de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
834 if (IS_ERR(de_tgt_dir))
835 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
837 cleanup_phase = 2; /* target directory dentry */
839 CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
840 de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
841 de_src->d_inode->i_ino);
843 /* Step 2: Take the two locks */
/* resource ids are built from inode number and generation */
845 src_res_id.name[0] = de_src->d_inode->i_ino;
846 src_res_id.name[1] = de_src->d_inode->i_generation;
847 tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
848 tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
850 rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
851 &src_lockh, &tgt_dir_lockh);
853 GOTO(cleanup, rc = -EIO);
855 cleanup_phase = 3; /* locks */
857 /* Step 3: Lookup the child */
858 dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
859 if (IS_ERR(dchild)) {
860 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
861 GOTO(cleanup, rc = PTR_ERR(dchild));
864 cleanup_phase = 4; /* child dentry */
/* the target name must not exist yet */
866 if (dchild->d_inode) {
867 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
868 de_tgt_dir->d_inode->i_ino, rec->ur_name);
874 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
875 to_kdev_t(de_src->d_inode->i_sb->s_dev));
877 handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
878 if (IS_ERR(handle)) {
879 rc = PTR_ERR(handle);
883 rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
885 CERROR("link error %d\n", rc);
/* de_tgt_dir may still be NULL when cleanup is reached early, hence the test */
887 rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
891 switch (cleanup_phase) {
892 case 4: /* child dentry */
/* both locks are either dropped or saved in rq_ack_locks[0] and [1];
 * NOTE(review): the condition choosing between the two is not visible here */
896 ldlm_lock_decref(&src_lockh, lock_mode);
897 ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
899 memcpy(&req->rq_ack_locks[0].lock, &src_lockh,
901 memcpy(&req->rq_ack_locks[1].lock, &tgt_dir_lockh,
902 sizeof(tgt_dir_lockh));
903 req->rq_ack_locks[0].mode = lock_mode;
904 req->rq_ack_locks[1].mode = lock_mode;
906 case 2: /* target dentry */
908 case 1: /* source dentry */
913 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/* Reply reconstruction for a resent rename request: restores the transno and
 * status from the client's saved last_rcvd record and takes over any ack
 * locks of an outstanding reply, then hits LBUG() because reconstruction of
 * rename is not supported yet.  The rec and offset parameters are not
 * referenced in the visible lines. */
920 static void reconstruct_reint_rename(struct mds_update_record *rec,
921 int offset, struct ptlrpc_request *req)
923 struct mds_export_data *med = &req->rq_export->exp_mds_data;
924 struct mds_client_data *mcd = med->med_mcd;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to CPU byte order before use */
926 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
927 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
929 if (req->rq_export->exp_outstanding_reply)
930 mds_steal_ack_locks(req->rq_export, req);
932 LBUG(); /* don't support it yet, but it'll be fun! */
/* Rename rec->ur_name in directory rec->ur_fid1 to rec->ur_tgt in directory
 * rec->ur_fid2.
 *
 * dlm_handles[0..1] hold the parent directory locks and dlm_handles[2..3]
 * the child locks; on cleanup each lock is either dropped or saved in the
 * rq_ack_locks slot with the same index.  The final status is stored in
 * req->rq_status.
 *
 * NOTE(review): declarations, braces, error tests, continuation arguments and
 * the return statement are missing from this listing; the lockh parameter is
 * not referenced in the visible lines. */
936 static int mds_reint_rename(struct mds_update_record *rec, int offset,
937 struct ptlrpc_request *req,
938 struct lustre_handle *lockh)
940 struct obd_device *obd = req->rq_export->exp_obd;
941 struct dentry *de_srcdir = NULL;
942 struct dentry *de_tgtdir = NULL;
943 struct dentry *de_old = NULL;
944 struct dentry *de_new = NULL;
945 struct mds_obd *mds = mds_req2mds(req);
946 struct lustre_handle dlm_handles[4];
947 struct ldlm_res_id p1_res_id = { .name = {0} };
948 struct ldlm_res_id p2_res_id = { .name = {0} };
949 struct ldlm_res_id c1_res_id = { .name = {0} };
950 struct ldlm_res_id c2_res_id = { .name = {0} };
/* lock_count starts at 3 (two parents plus the source child); the cleanup
 * code below treats a value of 4 as "target child lock held as well".
 * NOTE(review): the line that raises it to 4 is not visible in this listing. */
951 int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
952 int cleanup_phase = 0;
956 LASSERT(offset == 0);
958 MDS_CHECK_RESENT(req, reconstruct_reint_rename(rec, offset, req));
960 de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
961 if (IS_ERR(de_srcdir))
962 GOTO(cleanup, rc = PTR_ERR(de_srcdir));
964 cleanup_phase = 1; /* source directory dentry */
966 de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
967 if (IS_ERR(de_tgtdir))
968 GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
970 cleanup_phase = 2; /* target directory dentry */
972 /* The idea here is that we need to get four locks in the end:
973 * one on each parent directory, one on each child. We need to take
974 * these locks in some kind of order (to avoid deadlocks), and the order
975 * I selected is "increasing resource number" order. We need to take
976 * the locks on the parent directories, however, before we can lookup
977 * the children. Thus the following plan:
979 * 1. Take locks on the parent(s), in order
980 * 2. Lookup the children
981 * 3. Take locks on the children, in order
982 * 4. Execute the rename
985 /* Step 1: Take locks on the parent(s), in order */
986 p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
987 p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
989 p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
990 p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
992 rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
993 &(dlm_handles[0]), &(dlm_handles[1]));
997 cleanup_phase = 3; /* parent locks */
999 /* Step 2: Lookup the children */
1000 de_old = ll_lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen-1);
1001 if (IS_ERR(de_old)) {
1002 CERROR("old child lookup error (%*s): %ld\n",
1003 rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
1004 GOTO(cleanup, rc = PTR_ERR(de_old));
1007 cleanup_phase = 4; /* original name dentry */
/* the source name must exist */
1009 if (de_old->d_inode == NULL)
1010 GOTO(cleanup, rc = -ENOENT);
1012 /* sanity check for src inode */
1013 if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1014 de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1015 GOTO(cleanup, rc = -EINVAL);
1017 de_new = ll_lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
1018 if (IS_ERR(de_new)) {
1019 CERROR("new child lookup error (%*s): %ld\n",
1020 rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
1021 GOTO(cleanup, rc = PTR_ERR(de_new));
1024 cleanup_phase = 5; /* target dentry */
1026 /* sanity check for dest inode */
1027 if (de_new->d_inode &&
1028 (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1029 de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1030 GOTO(cleanup, rc = -EINVAL);
1032 /* Step 3: Take locks on the children */
1033 c1_res_id.name[0] = de_old->d_inode->i_ino;
1034 c1_res_id.name[1] = de_old->d_inode->i_generation;
/* no existing target inode: only the source child is locked; otherwise both
 * children are locked in order with enqueue_ordered_locks() */
1035 if (de_new->d_inode == NULL) {
1036 flags = LDLM_FL_LOCAL_ONLY;
1037 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1038 c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
1039 &flags, ldlm_completion_ast,
1040 mds_blocking_ast, NULL,
1044 c2_res_id.name[0] = de_new->d_inode->i_ino;
1045 c2_res_id.name[1] = de_new->d_inode->i_generation;
1046 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
1054 cleanup_phase = 6; /* child locks */
1056 /* Step 4: Execute the rename */
1057 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1058 to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
1060 handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
1062 GOTO(cleanup, rc = PTR_ERR(handle));
1065 rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
/* de_tgtdir may still be NULL when cleanup is reached early, hence the test */
1071 rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1072 handle, req, rc, 0);
1073 switch (cleanup_phase) {
1074 case 6: /* child locks */
1076 ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1077 if (lock_count == 4)
1078 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1080 memcpy(&req->rq_ack_locks[2].lock, &(dlm_handles[2]),
1081 sizeof(dlm_handles[2]));
1082 req->rq_ack_locks[2].mode = LCK_EX;
1083 if (lock_count == 4) {
1084 memcpy(&req->rq_ack_locks[3].lock,
1085 &dlm_handles[3], sizeof(dlm_handles[3]));
1086 req->rq_ack_locks[3].mode = LCK_EX;
1089 case 5: /* target dentry */
1091 case 4: /* source dentry */
1093 case 3: /* parent locks */
1095 ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
1096 ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
1098 memcpy(&req->rq_ack_locks[0].lock, &(dlm_handles[0]),
1099 sizeof(dlm_handles[0]));
1100 memcpy(&req->rq_ack_locks[1].lock, &(dlm_handles[1]),
1101 sizeof(dlm_handles[1]));
1102 req->rq_ack_locks[0].mode = LCK_EX;
1103 req->rq_ack_locks[1].mode = LCK_EX;
1105 case 2: /* target directory dentry */
1107 case 1: /* source directory dentry */
1112 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1115 req->rq_status = rc;
/* Signature shared by all reintegration handlers: update record, reply
 * buffer offset, request, and a lock handle. */
1119 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
1120 struct ptlrpc_request *, struct lustre_handle *);
/* Dispatch table indexed by reint opcode; entries not listed stay NULL.
 * Written with standard "[index] = value" designated initializers instead of
 * the obsolete GNU "[index] value" form. */
1122 static mds_reinter reinters[REINT_MAX + 1] = {
1123 [REINT_SETATTR] = mds_reint_setattr,
1124 [REINT_CREATE] = mds_reint_create,
1125 [REINT_UNLINK] = mds_reint_unlink,
1126 [REINT_LINK] = mds_reint_link,
1127 [REINT_RENAME] = mds_reint_rename,
1128 [REINT_OPEN] = mds_open
/* Dispatch an unpacked update record to the handler registered in reinters[]
 * for rec->ur_opcode.  The handler runs between push_ctxt() and pop_ctxt(),
 * using the MDS context (mds->mds_ctxt) and the record's ur_uc.
 *
 * The opcode is asserted to be within range and to have a handler.
 * NOTE(review): the declaration of rc and the return statement are missing
 * from this listing. */
1131 int mds_reint_rec(struct mds_update_record *rec, int offset,
1132 struct ptlrpc_request *req, struct lustre_handle *lockh)
1134 struct mds_obd *mds = mds_req2mds(req);
1135 struct obd_run_ctxt saved;
1139 /* checked by unpacker */
1140 LASSERT(rec->ur_opcode <= REINT_MAX &&
1141 reinters[rec->ur_opcode] != NULL);
1143 push_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);
1144 rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1145 pop_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);