1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mds/mds_reint.c
5 * Lustre Metadata Server (mds) reintegration routines
7 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
12 * This file is part of Lustre, http://www.lustre.org.
14 * Lustre is free software; you can redistribute it and/or
15 * modify it under the terms of version 2 of the GNU General Public
16 * License as published by the Free Software Foundation.
18 * Lustre is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
23 * You should have received a copy of the GNU General Public License
24 * along with Lustre; if not, write to the Free Software
25 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 #define DEBUG_SUBSYSTEM S_MDS
32 #include <linux/obd_support.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lustre_fsfilt.h>
41 #include "mds_internal.h"
/* Callback registered through fsfilt_set_last_rcvd() by mds_finish_transno().
 * The body shown here only forwards obd, transno and error to
 * obd_transno_commit_cb(); the data argument is not referenced in the
 * visible lines. */
43 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
46 obd_transno_commit_cb(obd, transno, error);
/* Context handed to mds_cancel_cookies_cb() as cb_data.
 * mds_reint_setattr() allocates it as one chunk holding the llog cookies
 * followed by a copy of the EA data, and stores the total size in mlcd_size
 * so the callback can free it. */
49 struct mds_logcancel_data {
/* Set by mds_reint_setattr() to the EA copy placed right after the cookies. */
50 struct lov_mds_md *mlcd_lmm;
/* Variable-length cookie array at the end of the structure; its length in
 * bytes is tracked in mlcd_cookielen. */
54 struct llog_cookie mlcd_cookies[0];
57 /* Establish a connection to the OSC when we first need it. We don't do
58 * this during MDS setup because that would introduce setup ordering issues. */
/* Looks the OSC/LOV device up by mds->mds_osc_uuid and connects to it through
 * mds->mds_osc_conn.  When the lookup or the connect fails, an ERR_PTR()
 * value is stored in mds->mds_osc_obd, which the IS_ERR() check at the top
 * turns into an immediate error return on later calls. */
59 static int mds_osc_connect(struct obd_device *obd, struct mds_obd *mds)
/* An earlier attempt failed: return the error cached in mds_osc_obd. */
64 if (IS_ERR(mds->mds_osc_obd))
65 RETURN(PTR_ERR(mds->mds_osc_obd));
70 mds->mds_osc_obd = class_uuid2obd(&mds->mds_osc_uuid);
71 if (!mds->mds_osc_obd) {
72 CERROR("MDS cannot locate OSC/LOV %s - no logging!\n",
73 mds->mds_osc_uuid.uuid);
74 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
78 rc = obd_connect(&mds->mds_osc_conn, mds->mds_osc_obd, &obd->obd_uuid);
/* Connect failure path: worded differently from the lookup failure above so
 * the two cases can be told apart in the log. */
80 CERROR("MDS cannot connect to OSC/LOV %s - no logging!\n",
81 mds->mds_osc_uuid.uuid);
82 mds->mds_osc_obd = ERR_PTR(rc);
/* Sets the "mds_conn" key on the new connection; the remaining arguments
 * fall on lines not shown here. */
86 rc = obd_set_info(&mds->mds_osc_conn, strlen("mds_conn"), "mds_conn",
/* Callback registered by mds_reint_setattr() with a mds_logcancel_data as
 * cb_data.  It first notifies obd_transno_commit_cb(), then unpacks the saved
 * EA into a stripe MD, asks the OSC connection to cancel the saved llog
 * cookies, and finally frees the mds_logcancel_data. */
91 static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
92 void *cb_data, int error)
94 struct mds_logcancel_data *mlcd = cb_data;
95 struct lov_stripe_md *lsm = NULL;
98 obd_transno_commit_cb(obd, transno, error);
/* Cookie count = byte length of the cookie area / size of one cookie. */
100 CDEBUG(D_HA, "cancelling %d cookies\n",
101 (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
/* Turn the packed EA copy back into an in-memory stripe MD (lsm). */
103 rc = obd_unpackmd(&obd->u.mds.mds_osc_conn, &lsm, mlcd->mlcd_lmm,
104 mlcd->mlcd_eadatalen);
106 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
107 (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
/* Cancel every saved cookie, passing OBD_LLOG_FL_SENDNOW as the flag. */
110 rc = obd_log_cancel(&obd->u.mds.mds_osc_conn, lsm,
111 mlcd->mlcd_cookielen /
112 sizeof(*mlcd->mlcd_cookies),
113 mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
114 ///* XXX 0 normally, SENDNOW for debug */);
116 CERROR("error cancelling %d log cookies: rc %d\n",
117 (int)(mlcd->mlcd_cookielen /
118 sizeof(*mlcd->mlcd_cookies)), rc);
/* mlcd_size is the full allocation size recorded by mds_reint_setattr(). */
121 OBD_FREE(mlcd, mlcd->mlcd_size);
124 /* Assumes caller has already pushed us into the kernel context. */
/* Records the outcome of a request in the client's mds_client_data slot:
 * picks the transno, stores transno, mount count, xid, result (rc) and
 * op_data, writes the record to mds->mds_rcvd_filp and commits the
 * transaction handle.  The return statement is on a line not shown here. */
125 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
126 struct ptlrpc_request *req, int rc, __u32 op_data)
128 struct mds_export_data *med = &req->rq_export->exp_mds_data;
129 struct mds_client_data *mcd = med->med_mcd;
130 struct obd_device *obd = req->rq_export->exp_obd;
137 /* if the export has already been failed, we have no last_rcvd slot */
138 if (req->rq_export->exp_failed) {
139 CERROR("committing transaction for disconnected client\n");
/* NOTE(review): the condition guarding this block is on a dropped line;
 * presumably it runs when the caller passed no handle -- confirm. */
146 /* if we're starting our own xaction, use our own inode */
147 inode = mds->mds_rcvd_filp->f_dentry->d_inode;
148 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
149 if (IS_ERR(handle)) {
150 CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
151 GOTO(out, rc = PTR_ERR(handle));
/* Either reuse the transno carried in the request message, or take the next
 * one from mds_last_transno under mds_transno_lock; the condition choosing
 * between the two is on lines not shown here. */
157 transno = req->rq_reqmsg->transno;
159 spin_lock(&mds->mds_transno_lock);
160 transno = ++mds->mds_last_transno;
161 spin_unlock(&mds->mds_transno_lock);
163 req->rq_repmsg->transno = req->rq_transno = transno;
/* The record fields are stored in little-endian byte order. */
164 mcd->mcd_last_transno = cpu_to_le64(transno);
165 mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
166 mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
167 mcd->mcd_last_result = cpu_to_le32(rc);
168 mcd->mcd_last_data = cpu_to_le32(op_data);
/* Register mds_commit_cb (with no callback data) for this transno. */
170 fsfilt_set_last_rcvd(req->rq_export->exp_obd, transno, handle,
171 mds_commit_cb, NULL);
172 written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
173 (char *)mcd, sizeof(*mcd), &off);
174 CDEBUG(D_INODE, "wrote trans "LPU64" client %s at idx %u: written = "
175 LPSZ"\n", transno, mcd->mcd_uuid, med->med_idx, written);
/* Anything other than a full-size write is reported as an error. */
177 if (written != sizeof(*mcd)) {
178 CERROR("error writing to last_rcvd: rc = "LPSZ"\n", written);
188 err = fsfilt_commit(obd, inode, handle, 0);
190 CERROR("error committing transaction: %d\n", err);
200 /* this gives the same functionality as the code between
201 * sys_chmod and inode_setattr
202 * chown_common and inode_setattr
203 * utimes and inode_setattr
/* Prepares rec->ur_iattr for a setattr on inode: fills in timestamps the
 * client did not set explicitly, checks write permission for time and size
 * changes, and clears setuid/setgid bits when ownership changes.  The
 * statements executed on the failing checks, and the return, are on lines
 * not shown here. */
205 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
207 time_t now = LTIME_S(CURRENT_TIME);
208 struct iattr *attr = &rec->ur_iattr;
209 unsigned int ia_valid = attr->ia_valid;
213 /* only fix up attrs if the client VFS didn't already */
214 if (!(ia_valid & ATTR_RAW))
/* Any timestamp without its *_SET flag defaults to the current time. */
217 if (!(ia_valid & ATTR_CTIME_SET))
218 LTIME_S(attr->ia_ctime) = now;
219 if (!(ia_valid & ATTR_ATIME_SET))
220 LTIME_S(attr->ia_atime) = now;
221 if (!(ia_valid & ATTR_MTIME_SET))
222 LTIME_S(attr->ia_mtime) = now;
224 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
/* Both mtime and atime requested: a caller whose fsuid differs from the
 * inode owner additionally needs write permission. */
228 if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME)) {
229 if (rec->ur_fsuid != inode->i_uid &&
230 (error = permission(inode,MAY_WRITE)) != 0)
/* Size changes always require write permission. */
234 if (ia_valid & ATTR_SIZE) {
235 if ((error = permission(inode,MAY_WRITE)) != 0)
/* Ownership change: a uid/gid of -1 keeps the inode's current value, and the
 * mode starts from the inode's current mode. */
239 if (ia_valid & ATTR_UID) {
242 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
244 if (attr->ia_uid == (uid_t) -1)
245 attr->ia_uid = inode->i_uid;
246 if (attr->ia_gid == (gid_t) -1)
247 attr->ia_gid = inode->i_gid;
248 attr->ia_mode = inode->i_mode;
250 * If the user or group of a non-directory has been
251 * changed by a non-root user, remove the setuid bit.
252 * 19981026 David C Niemi <niemi@tux.org>
254 * Changed this to apply to all users, including root,
255 * to avoid some races. This is the behavior we had in
256 * 2.0. The check for non-root was definitely wrong
257 * for 2.2 anyway, as it should have been using
258 * CAP_FSETID rather than fsuid -- 19990830 SD.
260 if ((inode->i_mode & S_ISUID) == S_ISUID &&
261 !S_ISDIR(inode->i_mode)) {
262 attr->ia_mode &= ~S_ISUID;
263 attr->ia_valid |= ATTR_MODE;
266 * Likewise, if the user or group of a non-directory
267 * has been changed by a non-root user, remove the
268 * setgid bit UNLESS there is no group execute bit
269 * (this would be a file marked for mandatory
270 * locking). 19981026 David C Niemi <niemi@tux.org>
272 * Removed the fsuid check (see the comment above) --
275 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
276 (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
277 attr->ia_mode &= ~S_ISGID;
278 attr->ia_valid |= ATTR_MODE;
/* Plain mode change: a mode of -1 falls back to the inode's current mode;
 * the expression below merges the S_IALLUGO bits of the requested mode with
 * the remaining bits of the inode's mode. */
280 } else if (ia_valid & ATTR_MODE) {
281 int mode = attr->ia_mode;
283 if (attr->ia_mode == (mode_t) -1)
284 attr->ia_mode = inode->i_mode;
286 (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
/* Rebuilds the reply for a resent setattr from the client's saved
 * mds_client_data: restores transno and status, takes over any ack locks of
 * an outstanding reply, and repacks the inode attributes into the reply
 * body. */
291 static void reconstruct_reint_setattr(struct mds_update_record *rec,
292 int offset, struct ptlrpc_request *req)
294 struct mds_export_data *med = &req->rq_export->exp_mds_data;
295 struct mds_client_data *mcd = med->med_mcd;
296 struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
298 struct mds_body *body;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to host byte order before use. */
300 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
301 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
303 if (req->rq_export->exp_outstanding_reply)
304 mds_steal_ack_locks(req->rq_export, req);
306 de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
/* A failed lookup is expected to match the status recorded earlier. */
308 LASSERT(PTR_ERR(de) == req->rq_status);
312 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
313 mds_pack_inode2fid(&body->fid1, de->d_inode);
314 mds_pack_inode2body(body, de->d_inode);
316 /* Don't return OST-specific attributes if we didn't just set them */
317 if (rec->ur_iattr.ia_valid & ATTR_SIZE)
318 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
319 if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
320 body->valid |= OBD_MD_FLMTIME;
321 if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
322 body->valid |= OBD_MD_FLATIME;
327 /* In the raw-setattr case, we lock the child inode.
328 * In the write-back case or if being called from open, the client holds a lock
331 * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
/* Applies the attribute changes in rec->ur_iattr to the inode named by
 * rec->ur_fid1, optionally stores new EA data on regular files, packs the
 * resulting attributes into the reply, and finishes the transaction.  When
 * the request carries llog cookies, a mds_logcancel_data is built so that
 * mds_cancel_cookies_cb() can cancel them later. */
332 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
333 struct ptlrpc_request *req,
334 struct lustre_handle *lh)
336 struct mds_obd *mds = mds_req2mds(req);
337 struct obd_device *obd = req->rq_export->exp_obd;
338 struct mds_body *body;
340 struct inode *inode = NULL;
341 struct lustre_handle lockh;
343 struct mds_logcancel_data *mlcd = NULL;
344 int rc = 0, cleanup_phase = 0, err, locked = 0;
347 LASSERT(offset == 0);
/* A resent request is answered by reconstruct_reint_setattr() instead. */
349 MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
/* ATTR_FROM_OPEN: plain lookup without taking a lock; otherwise the dentry
 * is looked up together with an LCK_PW lock. */
351 if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
352 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
354 GOTO(cleanup, rc = PTR_ERR(de));
356 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
359 GOTO(cleanup, rc = PTR_ERR(de));
367 CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
369 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
370 to_kdev_t(inode->i_sb->s_dev));
/* Connect to the OSC on first use; the return value is not checked here. */
372 #ifdef ENABLE_ORPHANS
373 if (unlikely(mds->mds_osc_obd == NULL))
374 mds_osc_connect(obd, mds);
377 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
378 if (IS_ERR(handle)) {
379 rc = PTR_ERR(handle);
384 if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
385 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
386 LTIME_S(rec->ur_iattr.ia_mtime),
387 LTIME_S(rec->ur_iattr.ia_ctime));
388 rc = mds_fix_attr(inode, rec);
392 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
/* Regular files that came with EA data also get that EA stored. */
393 if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
394 rc = fsfilt_set_md(obd, inode, handle,
395 rec->ur_eadata, rec->ur_eadatalen);
398 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
399 mds_pack_inode2fid(&body->fid1, inode);
400 mds_pack_inode2body(body, inode);
402 /* Don't return OST-specific attributes if we didn't just set them */
403 if (rec->ur_iattr.ia_valid & ATTR_SIZE)
404 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
405 if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
406 body->valid |= OBD_MD_FLMTIME;
407 if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
408 body->valid |= OBD_MD_FLATIME;
/* Cookies were supplied and the OSC is usable: copy the cookies and the EA
 * into one allocation, cookies first, EA right behind them. */
410 if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
411 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
414 mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
416 mlcd->mlcd_eadatalen = rec->ur_eadatalen;
417 mlcd->mlcd_cookielen = rec->ur_cookielen;
418 mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
419 mlcd->mlcd_cookielen;
420 memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
421 mlcd->mlcd_cookielen);
422 memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
423 mlcd->mlcd_eadatalen);
425 CERROR("unable to allocate log cancel data\n");
/* Register mds_cancel_cookies_cb with mlcd as its data, then finish the
 * transaction through mds_finish_transno(). */
431 fsfilt_set_last_rcvd(req->rq_export->exp_obd, 0, handle,
432 mds_cancel_cookies_cb, mlcd);
433 err = mds_finish_transno(mds, inode, handle, req, rc, 0);
/* Cleanup: the PW lock is either dropped or copied into rq_ack_locks[0];
 * the condition selecting between the two is on lines not shown here. */
434 switch (cleanup_phase) {
439 ldlm_lock_decref(&lockh, LCK_PW);
441 memcpy(&req->rq_ack_locks[0].lock, &lockh,
443 req->rq_ack_locks[0].mode = LCK_PW;
/* Rebuilds the reply for a resent create from the client's saved
 * mds_client_data: restores transno and status, takes over any ack locks of
 * an outstanding reply, then looks the child up again under the parent and
 * packs its attributes into the reply body. */
458 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
459 struct ptlrpc_request *req)
461 struct mds_export_data *med = &req->rq_export->exp_mds_data;
462 struct mds_client_data *mcd = med->med_mcd;
463 struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
464 struct dentry *parent, *child;
465 struct mds_body *body;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to host byte order before use. */
467 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
468 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
470 if (req->rq_export->exp_outstanding_reply)
471 mds_steal_ack_locks(req->rq_export, req);
476 parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
477 LASSERT(!IS_ERR(parent));
/* ur_namelen - 1 is passed as the name length to the lookup. */
478 child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
479 LASSERT(!IS_ERR(child));
480 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
481 mds_pack_inode2fid(&body->fid1, child->d_inode);
482 mds_pack_inode2body(body, child->d_inode);
/* Creates the object named rec->ur_name under the parent given by
 * rec->ur_fid1.  The file type in rec->ur_mode selects between vfs_create,
 * vfs_mkdir, vfs_symlink and vfs_mknod.  On success the new inode gets its
 * owner and timestamps from rec and its attributes are packed into the
 * reply; the transaction is finished through mds_finish_transno(). */
487 static int mds_reint_create(struct mds_update_record *rec, int offset,
488 struct ptlrpc_request *req,
489 struct lustre_handle *lh)
491 struct dentry *de = NULL;
492 struct mds_obd *mds = mds_req2mds(req);
493 struct obd_device *obd = req->rq_export->exp_obd;
494 struct dentry *dchild = NULL;
495 struct inode *dir = NULL;
497 struct lustre_handle lockh;
498 int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
502 LASSERT(offset == 0);
503 LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
/* A resent request is answered by reconstruct_reint_create() instead. */
505 MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
507 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
508 GOTO(cleanup, rc = -ESTALE);
/* Look the parent up and take an LCK_PW lock on it. */
510 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
513 CERROR("parent lookup error %d\n", rc);
516 cleanup_phase = 1; /* locked parent dentry */
519 CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
520 dir->i_ino, rec->ur_name, rec->ur_mode);
522 ldlm_lock_dump_handle(D_OTHER, &lockh);
524 dchild = ll_lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
525 if (IS_ERR(dchild)) {
526 rc = PTR_ERR(dchild);
527 CERROR("child lookup error %d\n", rc);
531 cleanup_phase = 2; /* child dentry */
533 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
534 to_kdev_t(dir->i_sb->s_dev));
/* Setgid parent: the child takes the parent's group, and a new directory
 * also gets the setgid bit. */
536 if (dir->i_mode & S_ISGID) {
537 rec->ur_gid = dir->i_gid;
538 if (S_ISDIR(rec->ur_mode))
539 rec->ur_mode |= S_ISGID;
/* A non-zero fid2 id is stashed in d_fsdata as the wanted inode number. */
542 if (rec->ur_fid2->id)
543 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
547 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
549 GOTO(cleanup, rc = PTR_ERR(handle));
550 rc = vfs_create(dir, dchild, rec->ur_mode);
555 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
557 GOTO(cleanup, rc = PTR_ERR(handle));
558 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
563 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
565 GOTO(cleanup, rc = PTR_ERR(handle));
566 if (rec->ur_tgt == NULL) /* no target supplied */
567 rc = -EINVAL; /* -EPROTO? */
569 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
577 int rdev = rec->ur_rdev;
578 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
/* Read the error out of handle BEFORE clearing it: the comma operator
 * evaluates left to right, so clearing first would make rc 0 and report a
 * failed fsfilt_start() as success. */
580 GOTO(cleanup, (rc = PTR_ERR(handle), handle = NULL));
581 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
586 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
587 dchild->d_fsdata = NULL;
588 GOTO(cleanup, rc = -EINVAL);
591 /* In case we stored the desired inum in here, we want to clean up. */
592 if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
593 dchild->d_fsdata = NULL;
596 CDEBUG(D_INODE, "error during create: %d\n", rc);
600 struct inode *inode = dchild->d_inode;
601 struct mds_body *body;
/* Owner and all three timestamps of the new inode come from the record. */
604 LTIME_S(iattr.ia_atime) = rec->ur_time;
605 LTIME_S(iattr.ia_ctime) = rec->ur_time;
606 LTIME_S(iattr.ia_mtime) = rec->ur_time;
607 iattr.ia_uid = rec->ur_uid;
608 iattr.ia_gid = rec->ur_gid;
609 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
610 ATTR_MTIME | ATTR_CTIME;
/* When a specific inode number was requested, it must match the inode
 * that was created, and the requested generation is installed too. */
612 if (rec->ur_fid2->id) {
613 LASSERT(rec->ur_fid2->id == inode->i_ino);
614 inode->i_generation = rec->ur_fid2->generation;
615 /* Dirtied and committed by the upcoming setattr. */
616 CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
617 inode->i_ino, inode->i_generation);
619 CDEBUG(D_INODE, "created ino %lu with gen %x\n",
620 inode->i_ino, inode->i_generation);
623 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
625 CERROR("error on setattr: rc = %d\n", rc);
626 /* XXX should we abort here in case of error? */
629 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
630 mds_pack_inode2fid(&body->fid1, inode);
631 mds_pack_inode2body(body, inode);
636 err = mds_finish_transno(mds, dir, handle, req, rc, 0);
639 /* Destroy the file we just created. This should not need
640 * extra journal credits, as we have already modified all of
641 * the blocks needed in order to create the file in the first
646 err = vfs_rmdir(dir, dchild);
648 CERROR("rmdir in error path: %d\n", err);
651 err = vfs_unlink(dir, dchild);
653 CERROR("unlink in error path: %d\n", err);
659 switch (cleanup_phase) {
660 case 2: /* child dentry */
662 case 1: /* locked parent dentry */
/* The parent's PW lock is either dropped or copied into rq_ack_locks[0];
 * the condition selecting between the two is on lines not shown here. */
664 ldlm_lock_decref(&lockh, LCK_PW);
666 memcpy(&req->rq_ack_locks[0].lock, &lockh,
668 req->rq_ack_locks[0].mode = LCK_PW;
674 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
681 /* This function doesn't use ldlm_match_or_enqueue because we're always called
682 * with EX or PW locks, and the MDS is no longer allowed to match write locks,
683 * because they take the place of local semaphores.
685 * Two locks are taken in numerical order */
/* Enqueues two LDLM_PLAIN locks of mode lock_mode in obd's namespace, the
 * resource with the smaller name[0] first.  p1_lockh and p2_lockh receive
 * the handles for p1_res_id and p2_res_id respectively.  When both resource
 * ids are identical only one lock is enqueued: the second handle is a copy
 * of the first with an extra reference.  When the second enqueue fails, the
 * first lock is dropped again. */
686 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
687 struct ldlm_res_id *p1_res_id,
688 struct ldlm_res_id *p2_res_id,
689 struct lustre_handle *p1_lockh,
690 struct lustre_handle *p2_lockh)
692 struct ldlm_res_id res_id[2];
693 struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
697 LASSERT(p1_res_id != NULL && p2_res_id != NULL);
699 CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
700 p1_res_id[0].name[0], p2_res_id[0].name[0]);
/* Sort by name[0]: slot 0 is taken first, slot 1 second. */
702 if (p1_res_id->name[0] < p2_res_id->name[0]) {
703 handles[0] = p1_lockh;
704 handles[1] = p2_lockh;
705 res_id[0] = *p1_res_id;
706 res_id[1] = *p2_res_id;
708 handles[1] = p1_lockh;
709 handles[0] = p2_lockh;
710 res_id[1] = *p1_res_id;
711 res_id[0] = *p2_res_id;
/* Print the sorted ids, i.e. the order the locks are actually taken in
 * (printing p1/p2 here would just repeat the "locks before" line). */
714 CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
715 res_id[0].name[0], res_id[1].name[0]);
717 flags = LDLM_FL_LOCAL_ONLY;
718 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
719 LDLM_PLAIN, NULL, 0, lock_mode, &flags,
720 ldlm_completion_ast, mds_blocking_ast, NULL,
724 ldlm_lock_dump_handle(D_OTHER, handles[0]);
/* Same resource twice: share the first lock instead of enqueueing again. */
726 if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
727 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
728 ldlm_lock_addref(handles[1], lock_mode);
730 flags = LDLM_FL_LOCAL_ONLY;
731 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
732 res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
733 &flags, ldlm_completion_ast,
734 mds_blocking_ast, NULL, handles[1]);
/* Second enqueue failed: release the first lock before bailing out. */
735 if (rc != ELDLM_OK) {
736 ldlm_lock_decref(handles[0], lock_mode);
740 ldlm_lock_dump_handle(D_OTHER, handles[1]);
/* Rebuilds the reply for a resent unlink from the client's saved
 * mds_client_data: restores transno and status and takes over any ack locks
 * of an outstanding reply.  The EA is not available here, which the final
 * debug message reports. */
745 static void reconstruct_reint_unlink(struct mds_update_record *rec, int offset,
746 struct ptlrpc_request *req,
747 struct lustre_handle *child_lockh)
749 struct mds_export_data *med = &req->rq_export->exp_mds_data;
750 struct mds_client_data *mcd = med->med_mcd;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to host byte order before use. */
752 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
753 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
755 if (req->rq_export->exp_outstanding_reply)
756 mds_steal_ack_locks(req->rq_export, req);
758 DEBUG_REQ(D_ERROR, req,
759 "can't get EA for reconstructed unlink, leaking OST inodes");
762 /* If we are unlinking an open file/dir (i.e. creating an orphan) then
763 * we instead link the inode into the PENDING directory until it is
764 * finally released. We can't simply call mds_reint_rename() or some
765 * part thereof, because we don't have the inode to check for link
766 * count/open status until after it is locked.
768 * For lock ordering, we always get the PENDING, then pending_child lock
769 * last to avoid deadlocks.
/* Moves dchild out of dparent into the MDS pending directory under a name
 * built from its inode number and generation, and marks the inode as an
 * orphan.  Runs with pending_dir->i_sem held.  The transaction handle
 * started here is returned to the caller through *handle. */
771 static int mds_unlink_orphan(struct mds_update_record *rec,
772 struct obd_device *obd, struct dentry *dparent,
773 struct dentry *dchild, void **handle)
775 struct mds_obd *mds = &obd->u.mds;
776 struct inode *pending_dir = mds->mds_pending_dir->d_inode;
777 struct dentry *pending_child;
778 char fidname[LL_FID_NAMELEN];
/* The inode must not already be flagged as an orphan. */
782 LASSERT(!mds_inode_is_orphan(dchild->d_inode));
784 down(&pending_dir->i_sem);
/* The name inside the pending directory is derived from ino + generation. */
785 fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
786 dchild->d_inode->i_generation);
788 CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
789 mds_open_orphan_count(dchild->d_inode),
790 rec->ur_name, fidname);
792 pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
793 if (IS_ERR(pending_child))
794 GOTO(out_lock, rc = PTR_ERR(pending_child));
/* An entry with that name already exists: it has to be this same inode, and
 * the call then returns 0 without renaming again. */
796 if (pending_child->d_inode != NULL) {
797 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
798 LASSERT(pending_child->d_inode == dchild->d_inode);
799 GOTO(out_dput, rc = 0);
802 *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
804 GOTO(out_dput, rc = PTR_ERR(*handle));
806 rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
808 CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
809 dparent->d_inode->i_ino, rec->ur_name, rc);
811 mds_inode_set_orphan(dchild->d_inode);
815 up(&pending_dir->i_sem);
/* Adds an MDS_UNLINK_REC llog record for inode through the OSC connection.
 * The stripe MD is unpacked from reply buffer `offset`, and reply buffer
 * `offset + 1` is passed to obd_log_add() as the cookie area.  Returns the
 * cached error right away when the OSC connection is unusable.  Callers in
 * this file treat a return value > 0 as "cookies were produced". */
819 static int mds_log_op_unlink(struct obd_device *obd, struct mds_obd *mds,
820 struct inode *inode, struct lustre_msg *repmsg,
823 struct lov_stripe_md *lsm = NULL;
824 struct llog_unlink_rec *lur;
828 if (IS_ERR(mds->mds_osc_obd))
829 RETURN(PTR_ERR(mds->mds_osc_obd));
831 rc = obd_unpackmd(&mds->mds_osc_conn, &lsm,
832 lustre_msg_buf(repmsg, offset, 0),
833 repmsg->buflens[offset]);
837 OBD_ALLOC(lur, sizeof(*lur));
/* Fill the record: lengths, record type, and the inode's number and
 * generation. */
840 lur->lur_hdr.lth_len = lur->lur_end_len = sizeof(*lur);
841 lur->lur_hdr.lth_type = MDS_UNLINK_REC;
842 lur->lur_oid = inode->i_ino;
843 lur->lur_ogen = inode->i_generation;
/* The last argument is the number of llog_cookie slots in that buffer. */
845 rc = obd_log_add(&mds->mds_osc_conn, mds->mds_catalog, &lur->lur_hdr,
846 lsm, lustre_msg_buf(repmsg, offset + 1, 0),
847 repmsg->buflens[offset+1]/sizeof(struct llog_cookie));
849 obd_free_memmd(&mds->mds_osc_conn, &lsm);
850 OBD_FREE(lur, sizeof(*lur));
/* Removes the entry rec->ur_name from the parent given by rec->ur_fid1.
 * The parent is locked LCK_PW and the child LCK_EX.  A child that still has
 * open references is moved aside by mds_unlink_orphan(); otherwise the file
 * type in rec->ur_mode selects vfs_rmdir or vfs_unlink.  The outcome is
 * stored in req->rq_status. */
855 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
856 struct ptlrpc_request *req,
857 struct lustre_handle *lh)
859 struct dentry *dparent = NULL;
860 struct dentry *dchild = NULL;
861 struct mds_obd *mds = mds_req2mds(req);
862 struct obd_device *obd = req->rq_export->exp_obd;
863 struct mds_body *body = NULL;
864 struct inode *child_inode;
865 struct lustre_handle parent_lockh, child_lockh;
867 struct ldlm_res_id child_res_id = { .name = {0} };
868 int rc = 0, flags = 0, log_unlink = 0, cleanup_phase = 0;
871 LASSERT(offset == 0 || offset == 2);
/* A resent request is answered by reconstruct_reint_unlink() instead. */
873 MDS_CHECK_RESENT(req, reconstruct_reint_unlink(rec, offset, req,
876 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
877 GOTO(cleanup, rc = -ENOENT);
879 /* Step 1: Lookup the parent by FID */
880 dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
883 GOTO(cleanup, rc = PTR_ERR(dparent));
884 LASSERT(dparent->d_inode);
886 cleanup_phase = 1; /* Have parent dentry lock */
888 /* Step 2: Lookup the child */
889 dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
891 GOTO(cleanup, rc = PTR_ERR(dchild));
893 cleanup_phase = 2; /* child dentry */
895 child_inode = dchild->d_inode;
896 if (child_inode == NULL) {
897 CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
898 dparent->d_inode->i_ino, rec->ur_name);
899 GOTO(cleanup, rc = -ENOENT);
902 DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
903 dparent->d_inode->i_ino, child_inode->i_ino);
905 /* Step 3: Get a lock on the child */
/* The child's lock resource is named by inode number and generation. */
906 child_res_id.name[0] = child_inode->i_ino;
907 child_res_id.name[1] = child_inode->i_generation;
909 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
910 child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
911 &flags, ldlm_completion_ast, mds_blocking_ast,
916 cleanup_phase = 3; /* child lock */
918 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
919 to_kdev_t(dparent->d_inode->i_sb->s_dev));
921 /* ldlm_reply in buf[0] if called via intent */
925 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
926 LASSERT(body != NULL);
/* Connect to the OSC on first use; the return value is not checked here. */
928 #ifdef ENABLE_ORPHANS
929 if (unlikely(mds->mds_osc_obd == NULL))
930 mds_osc_connect(obd, mds);
933 /* If this is the last reference to this inode, get the OBD EA
934 * data first so the client can destroy OST objects */
935 if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1) {
936 mds_pack_inode2fid(&body->fid1, child_inode);
937 mds_pack_inode2body(body, child_inode);
938 mds_pack_md(obd, req->rq_repmsg, offset + 1, body, child_inode);
939 if (!(body->valid & OBD_MD_FLEASIZE)) {
940 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
941 OBD_MD_FLATIME | OBD_MD_FLMTIME);
946 /* We have to do these checks ourselves, in case we are making an
947 * orphan. The client tells us whether rmdir() or unlink() was called,
948 * so we need to return appropriate errors (bug 72).
950 * We don't have to check permissions, because vfs_rename (called from
951 * mds_unlink_orphan) also calls may_delete. */
952 if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
953 if (!S_ISDIR(child_inode->i_mode))
954 GOTO(cleanup, rc = -ENOTDIR);
956 if (S_ISDIR(child_inode->i_mode))
957 GOTO(cleanup, rc = -EISDIR);
/* The child still has open references: park it in the pending directory,
 * and under ENABLE_ORPHANS flag the reply as carrying cookies when
 * mds_log_op_unlink() returns > 0. */
960 if (mds_open_orphan_count(child_inode) > 0) {
961 rc = mds_unlink_orphan(rec, obd, dparent, dchild, &handle);
962 #ifdef ENABLE_ORPHANS
963 if (!rc && mds_log_op_unlink(obd, mds, child_inode,
964 req->rq_repmsg, offset + 1) > 0)
965 body->valid |= OBD_MD_FLCOOKIE;
970 // Step 4: Do the unlink: client decides between rmdir/unlink! (bug 72)
971 switch (rec->ur_mode & S_IFMT) {
973 /* Drop any lingering child directories before we start our
974 * transaction, to avoid doing multiple inode dirty/delete
975 * in our compound transaction (bug 1321). */
976 shrink_dcache_parent(dchild);
977 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
980 GOTO(cleanup, rc = PTR_ERR(handle));
982 rc = vfs_rmdir(dparent->d_inode, dchild);
985 handle = fsfilt_start(obd, dparent->d_inode,
986 FSFILT_OP_UNLINK_LOG, NULL);
988 GOTO(cleanup, rc = PTR_ERR(handle));
991 rc = vfs_unlink(dparent->d_inode, dchild);
/* NOTE(review): log_unlink starts at 0 and no assignment to it is visible in
 * this listing; presumably it is set on one of the dropped lines -- confirm. */
992 #ifdef ENABLE_ORPHANS
993 if (!rc && log_unlink)
994 if (mds_log_op_unlink(obd, mds, child_inode,
995 req->rq_repmsg, offset + 1) > 0)
996 body->valid |= OBD_MD_FLCOOKIE;
1005 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
1008 GOTO(cleanup, rc = PTR_ERR(handle));
1010 rc = vfs_unlink(dparent->d_inode, dchild);
1013 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
1016 GOTO(cleanup, rc = -EINVAL);
/* Cleanup: the phases below release, in order, the child lock, the child
 * dentry and the parent dentry/lock. */
1020 switch(cleanup_phase) {
1022 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
1024 if (rc && body != NULL) {
1025 // Don't unlink the OST objects if the MDS unlink failed
1028 case 3: /* child lock */
1029 ldlm_lock_decref(&child_lockh, LCK_EX);
1030 case 2: /* child dentry */
1032 case 1: /* parent dentry and lock */
/* The parent's PW lock is either dropped or copied into rq_ack_locks[0];
 * the condition selecting between the two is on lines not shown here. */
1034 ldlm_lock_decref(&parent_lockh, LCK_PW);
1036 memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
1037 sizeof(parent_lockh));
1038 req->rq_ack_locks[0].mode = LCK_PW;
1044 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1047 req->rq_status = rc;
/* Rebuilds the reply for a resent link from the client's saved
 * mds_client_data: restores transno and status and takes over any ack locks
 * of an outstanding reply. */
1051 static void reconstruct_reint_link(struct mds_update_record *rec, int offset,
1052 struct ptlrpc_request *req)
1054 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1055 struct mds_client_data *mcd = med->med_mcd;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to host byte order before use. */
1057 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
1058 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
1060 if (req->rq_export->exp_outstanding_reply)
1061 mds_steal_ack_locks(req->rq_export, req);
/* Creates a hard link named rec->ur_name in the directory given by
 * rec->ur_fid2, pointing at the inode given by rec->ur_fid1.  Both the
 * source inode and the target directory are locked through
 * enqueue_ordered_locks().  The outcome is stored in req->rq_status. */
1064 static int mds_reint_link(struct mds_update_record *rec, int offset,
1065 struct ptlrpc_request *req,
1066 struct lustre_handle *lh)
1068 struct obd_device *obd = req->rq_export->exp_obd;
1069 struct dentry *de_src = NULL;
1070 struct dentry *de_tgt_dir = NULL;
1071 struct dentry *dchild = NULL;
1072 struct mds_obd *mds = mds_req2mds(req);
/* NOTE(review): handle is declared as struct lustre_handle * but receives the
 * result of fsfilt_start() and is passed to mds_finish_transno(), which takes
 * a void * handle -- a plain void * looks intended; confirm. */
1073 struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
1074 struct ldlm_res_id src_res_id = { .name = {0} };
1075 struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
1076 int lock_mode = 0, rc = 0, cleanup_phase = 0;
1079 LASSERT(offset == 0);
/* A resent request is answered by reconstruct_reint_link() instead. */
1081 MDS_CHECK_RESENT(req, reconstruct_reint_link(rec, offset, req));
1083 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1084 GOTO(cleanup, rc = -ENOENT);
1086 /* Step 1: Lookup the source inode and target directory by FID */
1087 de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1089 GOTO(cleanup, rc = PTR_ERR(de_src));
1091 cleanup_phase = 1; /* source dentry */
1093 de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1094 if (IS_ERR(de_tgt_dir))
1095 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
1097 cleanup_phase = 2; /* target directory dentry */
1099 CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
1100 de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
1101 de_src->d_inode->i_ino);
1103 /* Step 2: Take the two locks */
/* Each lock resource is named by inode number and generation. */
1105 src_res_id.name[0] = de_src->d_inode->i_ino;
1106 src_res_id.name[1] = de_src->d_inode->i_generation;
1107 tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
1108 tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
1110 rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
1111 &src_lockh, &tgt_dir_lockh);
1113 GOTO(cleanup, rc = -EIO);
1115 cleanup_phase = 3; /* locks */
1117 /* Step 3: Lookup the child */
1118 dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
1119 if (IS_ERR(dchild)) {
1120 rc = PTR_ERR(dchild);
/* -EPERM and -EACCES are not logged as errors. */
1121 if (rc != -EPERM && rc != -EACCES)
1122 CERROR("child lookup error %d\n", rc);
1126 cleanup_phase = 4; /* child dentry */
1128 if (dchild->d_inode) {
1129 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
1130 de_tgt_dir->d_inode->i_ino, rec->ur_name);
1135 /* Step 4: Do it. */
1136 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
1137 to_kdev_t(de_src->d_inode->i_sb->s_dev));
1139 handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
1140 if (IS_ERR(handle)) {
1141 rc = PTR_ERR(handle);
1145 rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
1146 if (rc && rc != -EPERM && rc != -EACCES)
1147 CERROR("vfs_link error %d\n", rc);
1149 rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
1150 handle, req, rc, 0);
1153 switch (cleanup_phase) {
1154 case 4: /* child dentry */
/* NOTE(review): the locks were taken with LCK_EX but are released using
 * lock_mode, whose only visible value is 0; presumably it is set to LCK_EX
 * on a dropped line -- confirm. */
1158 ldlm_lock_decref(&src_lockh, lock_mode);
1159 ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
1161 memcpy(&req->rq_ack_locks[0].lock, &src_lockh,
1163 memcpy(&req->rq_ack_locks[1].lock, &tgt_dir_lockh,
1164 sizeof(tgt_dir_lockh));
1165 req->rq_ack_locks[0].mode = lock_mode;
1166 req->rq_ack_locks[1].mode = lock_mode;
1168 case 2: /* target dentry */
1170 case 1: /* source dentry */
1175 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1178 req->rq_status = rc;
/* Rebuilds the reply for a resent rename from the client's saved
 * mds_client_data: restores transno and status and takes over any ack locks
 * of an outstanding reply.  Full reconstruction is not implemented; the
 * function ends in LBUG(). */
1182 static void reconstruct_reint_rename(struct mds_update_record *rec,
1183 int offset, struct ptlrpc_request *req)
1185 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1186 struct mds_client_data *mcd = med->med_mcd;
/* mds_finish_transno() stores these fields with cpu_to_le64()/cpu_to_le32(),
 * so convert them back to host byte order before use. */
1188 req->rq_transno = le64_to_cpu(mcd->mcd_last_transno);
1189 req->rq_status = le32_to_cpu(mcd->mcd_last_result);
1191 if (req->rq_export->exp_outstanding_reply)
1192 mds_steal_ack_locks(req->rq_export, req);
1194 LBUG(); /* don't support it yet, but it'll be fun! */
/*
 * mds_reint_rename - execute a rename reintegration record on the MDS.
 *
 * rec:    update record; ur_fid1 and ur_fid2 identify the source and target
 *         directories, ur_name/ur_namelen the existing name and
 *         ur_tgt/ur_tgtlen the new name.
 * offset: asserted to be 0.
 * req:    request being served; rq_status receives the final rc and
 *         rq_ack_locks[] may receive the lock handles taken here.
 * lockh:  not referenced by any line visible here.
 *
 * The function resolves both directory dentries, takes LCK_EX locks on the
 * parents and then on the children, starts an FSFILT_OP_RENAME transaction,
 * calls vfs_rename() and mds_finish_transno(), and finally unwinds according
 * to cleanup_phase.
 *
 * NOTE(review): this listing is missing a number of short lines (braces,
 * the cleanup label, several rc checks, break statements, the return).
 * Comments below only describe what the visible lines show; anything that
 * depends on a missing line is marked as an assumption.
 */
1198 static int mds_reint_rename(struct mds_update_record *rec, int offset,
1199 struct ptlrpc_request *req,
1200 struct lustre_handle *lockh)
1202 struct obd_device *obd = req->rq_export->exp_obd;
1203 struct dentry *de_srcdir = NULL;
1204 struct dentry *de_tgtdir = NULL;
1205 struct dentry *de_old = NULL;
1206 struct dentry *de_new = NULL;
1207 struct mds_obd *mds = mds_req2mds(req);
/* Handles 0 and 1 are the parent locks, 2 and 3 the child locks. */
1208 struct lustre_handle dlm_handles[4];
1209 struct ldlm_res_id p1_res_id = { .name = {0} };
1210 struct ldlm_res_id p2_res_id = { .name = {0} };
1211 struct ldlm_res_id c1_res_id = { .name = {0} };
1212 struct ldlm_res_id c2_res_id = { .name = {0} };
/*
 * lock_count starts at 3 and is compared against 4 during cleanup; the
 * assignment that raises it is not visible here (presumably made when the
 * target child exists and a fourth lock is taken - confirm).
 */
1213 int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
/* Records how far setup got so the switch at the end knows what to undo. */
1214 int cleanup_phase = 0;
1215 void *handle = NULL;
1218 LASSERT(offset == 0);
/*
 * MDS_CHECK_RESENT is defined elsewhere; presumably it runs the
 * reconstruct call and leaves this function for a resent request - confirm.
 */
1220 MDS_CHECK_RESENT(req, reconstruct_reint_rename(rec, offset, req));
1222 de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1223 if (IS_ERR(de_srcdir))
1224 GOTO(cleanup, rc = PTR_ERR(de_srcdir));
1226 cleanup_phase = 1; /* source directory dentry */
1228 de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1229 if (IS_ERR(de_tgtdir))
1230 GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
1232 cleanup_phase = 2; /* target directory dentry */
1234 /* The idea here is that we need to get four locks in the end:
1235 * one on each parent directory, one on each child. We need to take
1236 * these locks in some kind of order (to avoid deadlocks), and the order
1237 * I selected is "increasing resource number" order. We need to take
1238 * the locks on the parent directories, however, before we can lookup
1239 * the children. Thus the following plan:
1241 * 1. Take locks on the parent(s), in order
1242 * 2. Lookup the children
1243 * 3. Take locks on the children, in order
1244 * 4. Execute the rename
1247 /* Step 1: Take locks on the parent(s), in order */
/* Resource ids are built from the inode number and generation. */
1248 p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
1249 p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
1251 p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
1252 p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
1254 rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
1255 &(dlm_handles[0]), &(dlm_handles[1]));
/* The check of rc from the enqueue above is on lines not visible here. */
1259 cleanup_phase = 3; /* parent locks */
1261 /* Step 2: Lookup the children */
/* The lookups pass length - 1; presumably the stored lengths count a
 * trailing NUL - confirm against the record unpacker. */
1262 de_old = ll_lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen-1);
1263 if (IS_ERR(de_old)) {
/*
 * NOTE(review): %*s consumes the int argument as a minimum field width,
 * not as a maximum length; %.*s may have been intended - confirm.
 */
1264 CERROR("old child lookup error (%*s): %ld\n",
1265 rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
1266 GOTO(cleanup, rc = PTR_ERR(de_old));
1269 cleanup_phase = 4; /* original name dentry */
/* A negative dentry means the name being renamed does not exist. */
1271 if (de_old->d_inode == NULL)
1272 GOTO(cleanup, rc = -ENOENT);
1274 /* sanity check for src inode */
/* Reject a source child whose inode number equals either parent. */
1275 if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1276 de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1277 GOTO(cleanup, rc = -EINVAL);
1279 de_new = ll_lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
1280 if (IS_ERR(de_new)) {
/* NOTE(review): same %*s field-width remark as for the old child message. */
1281 CERROR("new child lookup error (%*s): %ld\n",
1282 rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
1283 GOTO(cleanup, rc = PTR_ERR(de_new));
1286 cleanup_phase = 5; /* target dentry */
1288 /* sanity check for dest inode */
/* Only applies when the target name already exists (positive dentry). */
1289 if (de_new->d_inode &&
1290 (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1291 de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1292 GOTO(cleanup, rc = -EINVAL);
1294 /* Step 3: Take locks on the children */
1295 c1_res_id.name[0] = de_old->d_inode->i_ino;
1296 c1_res_id.name[1] = de_old->d_inode->i_generation;
/*
 * No existing target: a single LCK_EX enqueue on the old child is issued.
 * Otherwise the target child resource id is filled in and both children
 * go through enqueue_ordered_locks(). The remaining arguments of both
 * calls are on lines not visible here.
 */
1297 if (de_new->d_inode == NULL) {
1298 flags = LDLM_FL_LOCAL_ONLY;
1299 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1300 c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
1301 &flags, ldlm_completion_ast,
1302 mds_blocking_ast, NULL,
1306 c2_res_id.name[0] = de_new->d_inode->i_ino;
1307 c2_res_id.name[1] = de_new->d_inode->i_generation;
1308 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
1316 cleanup_phase = 6; /* child locks */
1318 /* Step 4: Execute the rename */
1319 OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1320 to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
/* The test guarding the GOTO below (presumably IS_ERR(handle)) is on a
 * line not visible here. */
1322 handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
1324 GOTO(cleanup, rc = PTR_ERR(handle));
1327 rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
/*
 * NOTE(review): the error paths above jump to a cleanup label that is not
 * visible in this listing. If that label precedes this call, de_tgtdir can
 * hold an error pointer from mds_fid2dentry() (non-NULL, so the ?: test
 * passes and d_inode is read through it) and handle can hold an error
 * pointer from fsfilt_start() - confirm both cases are safe here.
 */
1332 rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1333 handle, req, rc, 0);
/*
 * Unwind by phase. No break statements are visible, so the cases
 * presumably fall through from the phase reached down to 1 - confirm.
 * For the lock phases the handles are either released with
 * ldlm_lock_decref() or copied into req->rq_ack_locks[] with mode LCK_EX;
 * the condition selecting between the two is on lines not visible here.
 */
1334 switch (cleanup_phase) {
1335 case 6: /* child locks */
1337 ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1338 if (lock_count == 4)
1339 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1341 memcpy(&req->rq_ack_locks[2].lock, &(dlm_handles[2]),
1342 sizeof(dlm_handles[2]));
1343 req->rq_ack_locks[2].mode = LCK_EX;
1344 if (lock_count == 4) {
1345 memcpy(&req->rq_ack_locks[3].lock,
1346 &dlm_handles[3], sizeof(dlm_handles[3]));
1347 req->rq_ack_locks[3].mode = LCK_EX;
1350 case 5: /* target dentry */
1352 case 4: /* source dentry */
1354 case 3: /* parent locks */
1356 ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
1357 ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
1359 memcpy(&req->rq_ack_locks[0].lock, &(dlm_handles[0]),
1360 sizeof(dlm_handles[0]));
1361 memcpy(&req->rq_ack_locks[1].lock, &(dlm_handles[1]),
1362 sizeof(dlm_handles[1]));
1363 req->rq_ack_locks[0].mode = LCK_EX;
1364 req->rq_ack_locks[1].mode = LCK_EX;
1366 case 2: /* target directory dentry */
1368 case 1: /* source directory dentry */
1373 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/* The final rc is always reported back through the request status. */
1376 req->rq_status = rc;
/*
 * mds_reinter - pointer type for a reintegration handler. A handler takes
 * the update record, an int offset, the request and a lock handle, and
 * returns an int. mds_reint_rename() above has this signature.
 */
1380 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
1381 struct ptlrpc_request *, struct lustre_handle *);
/*
 * reinters - handler table indexed by reint opcode (REINT_*), sized
 * REINT_MAX + 1. As a static array, slots without an initializer are NULL;
 * mds_reint_rec() asserts that the selected slot is non-NULL.
 *
 * NOTE(review): the entries use the [index] value designator form without
 * an = sign, which is an obsolete GNU extension; the C99 form is
 * [index] = value.
 */
1383 static mds_reinter reinters[REINT_MAX + 1] = {
1384 [REINT_SETATTR] mds_reint_setattr,
1385 [REINT_CREATE] mds_reint_create,
1386 [REINT_UNLINK] mds_reint_unlink,
1387 [REINT_LINK] mds_reint_link,
1388 [REINT_RENAME] mds_reint_rename,
1389 [REINT_OPEN] mds_open
/*
 * mds_reint_rec - dispatch an update record to the handler for its opcode.
 *
 * rec:    update record; ur_opcode selects the entry in reinters[] and
 *         ur_uc is passed to push_ctxt()/pop_ctxt().
 * offset: forwarded unchanged to the handler.
 * req:    request being served, forwarded to the handler.
 * lockh:  lock handle, forwarded to the handler.
 *
 * The handler runs between push_ctxt() and pop_ctxt() on mds->mds_ctxt.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably the handler result in rc is
 * returned - confirm.
 */
1392 int mds_reint_rec(struct mds_update_record *rec, int offset,
1393 struct ptlrpc_request *req, struct lustre_handle *lockh)
1395 struct mds_obd *mds = mds_req2mds(req);
1396 struct obd_run_ctxt saved;
1400 /* checked by unpacker */
1401 LASSERT(rec->ur_opcode <= REINT_MAX &&
1402 reinters[rec->ur_opcode] != NULL);
/* Switch to the MDS run context for the duration of the handler call. */
1404 push_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);
1405 rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1406 pop_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);