Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/fs.h>
32 #include <linux/obd_support.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lustre_fsfilt.h>
40
41 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
42
43 static void mds_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd, int error)
44 {
45         CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
46                last_rcvd, error);
47         if (!error && last_rcvd > obd->obd_last_committed)
48                 obd->obd_last_committed = last_rcvd;
49 }
50
51 /* Assumes caller has already pushed us into the kernel context. */
52 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
53                        struct ptlrpc_request *req, int rc,
54                        __u32 op_data)
55 {
56         struct mds_export_data *med = &req->rq_export->exp_mds_data;
57         struct mds_client_data *mcd = med->med_mcd;
58         struct obd_device *obd = req->rq_export->exp_obd;
59         int started_handle = 0, err;
60         __u64 transno;
61         loff_t off;
62         ssize_t written;
63         ENTRY;
64
65         /* we don't allocate new transnos for replayed requests */
66         if (req->rq_level == LUSTRE_CONN_RECOVD)
67                 GOTO(out, rc = rc);
68
69         if (!handle) {
70                 /* if we're starting our own xaction, use our own inode */
71                 i = mds->mds_rcvd_filp->f_dentry->d_inode;
72                 handle = fsfilt_start(obd, i, FSFILT_OP_SETATTR);
73                 if (IS_ERR(handle)) {
74                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
75                         GOTO(out, rc = PTR_ERR(handle));
76                 }
77                 started_handle = 1;
78         }
79
80         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
81
82         spin_lock(&mds->mds_transno_lock);
83         transno = ++mds->mds_last_transno;
84         spin_unlock(&mds->mds_transno_lock);
85         req->rq_repmsg->transno = req->rq_transno = HTON__u64(transno);
86         mcd->mcd_last_transno = cpu_to_le64(transno);
87         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
88         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
89         mcd->mcd_last_result = cpu_to_le32(rc);
90         mcd->mcd_last_data = cpu_to_le32(op_data);
91
92         fsfilt_set_last_rcvd(req->rq_export->exp_obd, transno, handle,
93                              mds_last_rcvd_cb);
94         written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd),
95                                 &off);
96         CDEBUG(D_INODE, "wrote trans "LPU64" client %s at #%u: written = "
97                LPSZ"\n", transno, mcd->mcd_uuid, med->med_off, written);
98
99         if (written != sizeof(*mcd)) {
100                 CERROR("error writing to last_rcvd: rc = "LPSZ"\n", written);
101                 if (rc == 0) {
102                         if (written < 0)
103                                 rc = written;
104                         else
105                                 rc = -EIO;
106                 }
107         }
108
109         err = fsfilt_commit(obd, i, handle);
110         if (err) {
111                 CERROR("error committing transaction: %d\n", err);
112                 if (!rc)
113                         rc = err;
114         }
115
116         EXIT;
117  out:
118         return rc;
119 }
120
121 /* this gives the same functionality as the code between
122  * sys_chmod and inode_setattr
123  * chown_common and inode_setattr
124  * utimes and inode_setattr
125  */
126 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
127 {
128 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
129         time_t now = CURRENT_TIME;
130 #else
131         time_t now = CURRENT_TIME.tv_sec;
132 #endif
133         struct iattr *attr = &rec->ur_iattr;
134         unsigned int ia_valid = attr->ia_valid;
135         int error;
136         ENTRY;
137
138         /* only fix up attrs if the client VFS didn't already */
139         if (!(ia_valid & ATTR_RAW))
140                 RETURN(0);
141
142         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
143                 RETURN(-EPERM);
144
145 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
146         attr->ia_ctime = now;
147         if (!(ia_valid & ATTR_ATIME_SET))
148                 attr->ia_atime = now;
149         if (!(ia_valid & ATTR_MTIME_SET))
150                 attr->ia_mtime = now;
151 #else
152         attr->ia_ctime.tv_sec = now;
153         if (!(ia_valid & ATTR_ATIME_SET))
154                 attr->ia_atime.tv_sec = now;
155         if (!(ia_valid & ATTR_MTIME_SET))
156                 attr->ia_mtime.tv_sec = now;
157 #endif
158
159         /* times */
160         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME) &&
161              !(ia_valid & ATTR_ATIME_SET)) {
162                 if (rec->ur_fsuid != inode->i_uid &&
163                     (error = permission(inode,MAY_WRITE)) != 0)
164                         RETURN(error);
165         } else if (ia_valid & ATTR_UID) {
166                 /* chown */
167                 error = -EPERM;
168                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
169                         RETURN(-EPERM);
170                 if (attr->ia_uid == (uid_t) -1)
171                         attr->ia_uid = inode->i_uid;
172                 if (attr->ia_gid == (gid_t) -1)
173                         attr->ia_gid = inode->i_gid;
174                 attr->ia_mode = inode->i_mode;
175                 attr->ia_valid =  ATTR_UID | ATTR_GID | ATTR_CTIME;
176                 /*
177                  * If the user or group of a non-directory has been
178                  * changed by a non-root user, remove the setuid bit.
179                  * 19981026 David C Niemi <niemi@tux.org>
180                  *
181                  * Changed this to apply to all users, including root,
182                  * to avoid some races. This is the behavior we had in
183                  * 2.0. The check for non-root was definitely wrong
184                  * for 2.2 anyway, as it should have been using
185                  * CAP_FSETID rather than fsuid -- 19990830 SD.
186                  */
187                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
188                     !S_ISDIR(inode->i_mode)) {
189                         attr->ia_mode &= ~S_ISUID;
190                         attr->ia_valid |= ATTR_MODE;
191                 }
192                 /*
193                  * Likewise, if the user or group of a non-directory
194                  * has been changed by a non-root user, remove the
195                  * setgid bit UNLESS there is no group execute bit
196                  * (this would be a file marked for mandatory
197                  * locking).  19981026 David C Niemi <niemi@tux.org>
198                  *
199                  * Removed the fsuid check (see the comment above) --
200                  * 19990830 SD.
201                  */
202                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
203                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
204                         attr->ia_mode &= ~S_ISGID;
205                         attr->ia_valid |= ATTR_MODE;
206                 }
207         } else if (ia_valid & ATTR_MODE) {
208                 int mode = attr->ia_mode;
209                 /* chmod */
210                 if (attr->ia_mode == (mode_t) -1)
211                         attr->ia_mode = inode->i_mode;
212                 attr->ia_mode =
213                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
214         }
215         RETURN(0);
216 }
217
218 static void reconstruct_reint_setattr(struct mds_update_record *rec,
219                                       int offset, struct ptlrpc_request *req)
220 {
221         struct mds_export_data *med = &req->rq_export->exp_mds_data;
222         struct mds_client_data *mcd = med->med_mcd;
223         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
224         struct dentry *de;
225         struct mds_body *body;
226
227         req->rq_transno = mcd->mcd_last_transno;
228         req->rq_status = mcd->mcd_last_result;
229
230         if (med->med_outstanding_reply)
231                 mds_steal_ack_locks(med, req);
232
233         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
234         if (IS_ERR(de)) {
235                 LASSERT(PTR_ERR(de) == req->rq_status);
236                 return;
237         }
238
239         body = lustre_msg_buf(req->rq_repmsg, 0);
240         mds_pack_inode2fid(&body->fid1, de->d_inode);
241         mds_pack_inode2body(body, de->d_inode);
242
243         l_dput(de);
244 }
245
246 /* In the raw-setattr case, we lock the child inode.
247  * In the write-back case or if being called from open, the client holds a lock
248  * already.
249  *
250  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
251 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
252                              struct ptlrpc_request *req,
253                              struct lustre_handle *lh)
254 {
255         struct mds_obd *mds = mds_req2mds(req);
256         struct obd_device *obd = req->rq_export->exp_obd;
257         struct mds_body *body;
258         struct dentry *de;
259         struct inode *inode = NULL;
260         struct lustre_handle lockh;
261         void *handle = NULL;
262         int rc = 0, cleanup_phase = 0, err, locked = 0;
263         ENTRY;
264
265         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
266
267         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
268                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
269                 if (IS_ERR(de))
270                         GOTO(cleanup, rc = PTR_ERR(de));
271         } else {
272                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
273                                            &lockh);
274                 if (IS_ERR(de))
275                         GOTO(cleanup, rc = PTR_ERR(de));
276                 locked = 1;
277         }
278
279         cleanup_phase = 1;
280         inode = de->d_inode;
281         LASSERT(inode);
282
283         CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
284
285         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
286                        to_kdev_t(inode->i_sb->s_dev));
287
288         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
289         if (IS_ERR(handle)) {
290                 rc = PTR_ERR(handle);
291                 handle = NULL;
292                 GOTO(cleanup, rc);
293         }
294
295         rc = mds_fix_attr(inode, rec);
296         if (rc)
297                 GOTO(cleanup, rc);
298
299         rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr);
300         if (rc == 0 && S_ISREG(inode->i_mode) &&
301             req->rq_reqmsg->bufcount > 1) {
302                 rc = fsfilt_set_md(obd, inode, handle,
303                                    lustre_msg_buf(req->rq_reqmsg, 1),
304                                    req->rq_reqmsg->buflens[1]);
305         }
306
307         body = lustre_msg_buf(req->rq_repmsg, 0);
308         mds_pack_inode2fid(&body->fid1, inode);
309         mds_pack_inode2body(body, inode);
310
311         EXIT;
312  cleanup:
313         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
314         switch(cleanup_phase) {
315         case 1:
316                 l_dput(de);
317                 if (locked) {
318                         if (rc) {
319                                 ldlm_lock_decref(&lockh, LCK_PW);
320                         } else {
321                                 memcpy(&req->rq_ack_locks[0].lock, &lockh,
322                                        sizeof(lockh));
323                                 req->rq_ack_locks[0].mode = LCK_PW;
324                         }
325                 }
326         case 0:
327                 break;
328         default:
329                 LBUG();
330         }
331         if (err && !rc)
332                 rc = err;
333
334         req->rq_status = rc;
335         return 0;
336 }
337
338 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
339                                      struct ptlrpc_request *req)
340 {
341         struct mds_export_data *med = &req->rq_export->exp_mds_data;
342         struct mds_client_data *mcd = med->med_mcd;
343         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
344         struct dentry *parent, *child;
345         struct mds_body *body;
346         
347         req->rq_transno = mcd->mcd_last_transno;
348         req->rq_status = mcd->mcd_last_result;
349
350         if (med->med_outstanding_reply)
351                 mds_steal_ack_locks(med, req);
352         
353         if (req->rq_status)
354                 return;
355
356         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
357         LASSERT(!IS_ERR(parent));
358         child = lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
359         LASSERT(!IS_ERR(child));
360         body = lustre_msg_buf(req->rq_repmsg, offset);
361         mds_pack_inode2fid(&body->fid1, child->d_inode);
362         mds_pack_inode2body(body, child->d_inode);
363         l_dput(parent);
364         l_dput(child);
365 }
366
367 static int mds_reint_create(struct mds_update_record *rec, int offset,
368                             struct ptlrpc_request *req,
369                             struct lustre_handle *lh)
370 {
371         struct dentry *de = NULL;
372         struct mds_obd *mds = mds_req2mds(req);
373         struct obd_device *obd = req->rq_export->exp_obd;
374         struct dentry *dchild = NULL;
375         struct inode *dir = NULL;
376         void *handle = NULL;
377         struct lustre_handle lockh;
378         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
379         int created = 0;
380         ENTRY;
381
382         LASSERT(offset == 0);
383         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
384
385         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
386
387         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
388                 GOTO(cleanup, rc = -ESTALE);
389
390         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
391         if (IS_ERR(de)) {
392                 rc = PTR_ERR(de);
393                 CERROR("parent lookup error %d\n", rc);
394                 GOTO(cleanup, rc);
395         }
396         cleanup_phase = 1; /* locked parent dentry */
397         dir = de->d_inode;
398         LASSERT(dir);
399         CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
400                dir->i_ino, rec->ur_name, rec->ur_mode);
401
402         ldlm_lock_dump_handle(D_OTHER, &lockh);
403
404         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
405         if (IS_ERR(dchild)) {
406                 rc = PTR_ERR(dchild);
407                 CERROR("child lookup error %d\n", rc);
408                 GOTO(cleanup, rc);
409         }
410
411         cleanup_phase = 2; /* child dentry */
412
413         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
414                        to_kdev_t(dir->i_sb->s_dev));
415
416         if (dir->i_mode & S_ISGID) {
417                 rec->ur_gid = dir->i_gid;
418                 if (S_ISDIR(rec->ur_mode))
419                         rec->ur_mode |= S_ISGID;
420         }
421
422         if (rec->ur_fid2->id)
423                 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
424         else
425                 LASSERT(!(rec->ur_opcode & REINT_REPLAYING));
426
427         switch (type) {
428         case S_IFREG:{
429                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
430                 if (IS_ERR(handle))
431                         GOTO(cleanup, rc = PTR_ERR(handle));
432                 rc = vfs_create(dir, dchild, rec->ur_mode);
433                 EXIT;
434                 break;
435         }
436         case S_IFDIR:{
437                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
438                 if (IS_ERR(handle))
439                         GOTO(cleanup, rc = PTR_ERR(handle));
440                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
441                 EXIT;
442                 break;
443         }
444         case S_IFLNK:{
445                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
446                 if (IS_ERR(handle))
447                         GOTO(cleanup, rc = PTR_ERR(handle));
448                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
449                 EXIT;
450                 break;
451         }
452         case S_IFCHR:
453         case S_IFBLK:
454         case S_IFIFO:
455         case S_IFSOCK:{
456                 int rdev = rec->ur_rdev;
457                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
458                 if (IS_ERR(handle))
459                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
460                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
461                 EXIT;
462                 break;
463         }
464         default:
465                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
466                 GOTO(cleanup, rc = -EINVAL);
467         }
468
469         /* In case we stored the desired inum in here, we want to clean up.
470          * We also do this in the cleanup block, for the error cases.
471          */
472         dchild->d_fsdata = NULL;
473
474         if (rc) {
475                 CDEBUG(D_INODE, "error during create: %d\n", rc);
476                 GOTO(cleanup, rc);
477         } else {
478                 struct iattr iattr;
479                 struct inode *inode = dchild->d_inode;
480                 struct mds_body *body;
481
482                 created = 1;
483 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
484                 iattr.ia_atime = rec->ur_time;
485                 iattr.ia_ctime = rec->ur_time;
486                 iattr.ia_mtime = rec->ur_time;
487 #else
488                 iattr.ia_atime.tv_sec = rec->ur_time;
489                 iattr.ia_ctime.tv_sec = rec->ur_time;
490                 iattr.ia_mtime.tv_sec = rec->ur_time;
491 #endif
492                 iattr.ia_uid = rec->ur_uid;
493                 iattr.ia_gid = rec->ur_gid;
494                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
495                         ATTR_MTIME | ATTR_CTIME;
496
497                 if (rec->ur_fid2->id) {
498                         LASSERT(rec->ur_fid2->id == inode->i_ino);
499                         inode->i_generation = rec->ur_fid2->generation;
500                         /* Dirtied and committed by the upcoming setattr. */
501                         CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
502                                inode->i_ino, inode->i_generation);
503                 } else {
504                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
505                                inode->i_ino, inode->i_generation);
506                 }
507
508                 rc = fsfilt_setattr(obd, dchild, handle, &iattr);
509                 if (rc) {
510                         CERROR("error on setattr: rc = %d\n", rc);
511                         /* XXX should we abort here in case of error? */
512                 }
513
514                 body = lustre_msg_buf(req->rq_repmsg, offset);
515                 mds_pack_inode2fid(&body->fid1, inode);
516                 mds_pack_inode2body(body, inode);
517         }
518         EXIT;
519
520 cleanup:
521         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
522                 
523         if (rc && created) {
524                 /* Destroy the file we just created.  This should not need
525                  * extra journal credits, as we have already modified all of
526                  * the blocks needed in order to create the file in the first
527                  * place.
528                  */
529                 switch (type) {
530                 case S_IFDIR:
531                         err = vfs_rmdir(dir, dchild);
532                         if (err)
533                                 CERROR("rmdir in error path: %d\n", err);
534                         break;
535                 default:
536                         err = vfs_unlink(dir, dchild);
537                         if (err)
538                                 CERROR("unlink in error path: %d\n", err);
539                         break;
540                 }
541         } else {
542                 rc = err;
543         }
544         switch (cleanup_phase) {
545         case 2: /* child dentry */
546                 dchild->d_fsdata = NULL;
547                 l_dput(dchild);
548         case 1: /* locked parent dentry */
549                 if (rc) {
550                         ldlm_lock_decref(&lockh, LCK_PW);
551                 } else {
552                         memcpy(&req->rq_ack_locks[0].lock, &lockh,
553                                sizeof(lockh));
554                         req->rq_ack_locks[0].mode = LCK_PW;
555                 }
556                 l_dput(de);
557         case 0:
558                 break;
559         default:
560                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
561                 LBUG();
562         }
563         req->rq_status = rc;
564         return 0;
565 }
566
567 /* This function doesn't use ldlm_match_or_enqueue because we're always called
568  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
569  * because they take the place of local semaphores.
570  *
571  * Two locks are taken in numerical order */
572 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
573                           struct ldlm_res_id *p1_res_id,
574                           struct ldlm_res_id *p2_res_id,
575                           struct lustre_handle *p1_lockh,
576                           struct lustre_handle *p2_lockh)
577 {
578         struct ldlm_res_id res_id[2];
579         struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
580         int rc, flags;
581         ENTRY;
582
583         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
584
585         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
586                p1_res_id[0].name[0], p2_res_id[0].name[0]);
587
588         if (p1_res_id->name[0] < p2_res_id->name[0]) {
589                 handles[0] = p1_lockh;
590                 handles[1] = p2_lockh;
591                 res_id[0] = *p1_res_id;
592                 res_id[1] = *p2_res_id;
593         } else {
594                 handles[1] = p1_lockh;
595                 handles[0] = p2_lockh;
596                 res_id[1] = *p1_res_id;
597                 res_id[0] = *p2_res_id;
598         }
599
600         CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
601                p1_res_id[0].name[0], p2_res_id[0].name[0]);
602
603         flags = LDLM_FL_LOCAL_ONLY;
604         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
605                               LDLM_PLAIN, NULL, 0, lock_mode, &flags,
606                               ldlm_completion_ast, mds_blocking_ast, NULL,
607                               NULL, handles[0]);
608         if (rc != ELDLM_OK)
609                 RETURN(-EIO);
610         ldlm_lock_dump_handle(D_OTHER, handles[0]);
611
612         if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
613                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
614                 ldlm_lock_addref(handles[1], lock_mode);
615         } else {
616                 flags = LDLM_FL_LOCAL_ONLY;
617                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
618                                       res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
619                                       &flags, ldlm_completion_ast,
620                                       mds_blocking_ast, NULL, 0, handles[1]);
621                 if (rc != ELDLM_OK) {
622                         ldlm_lock_decref(handles[0], lock_mode);
623                         RETURN(-EIO);
624                 }
625         }
626         ldlm_lock_dump_handle(D_OTHER, handles[1]);
627
628         RETURN(0);
629 }
630
631 static void reconstruct_reint_unlink(struct mds_update_record *rec, int offset,
632                                     struct ptlrpc_request *req,
633                                     struct lustre_handle *child_lockh)
634 {
635         struct mds_export_data *med = &req->rq_export->exp_mds_data;
636         struct mds_client_data *mcd = med->med_mcd;
637
638         req->rq_transno = mcd->mcd_last_transno;
639         req->rq_status = mcd->mcd_last_result;
640
641         if (med->med_outstanding_reply)
642                 mds_steal_ack_locks(med, req);
643         
644         DEBUG_REQ(D_ERROR, req,
645                   "can't get EA for reconstructed unlink, leaking OST inodes");
646 }
647
648 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
649                             struct ptlrpc_request *req,
650                             struct lustre_handle *child_lockh)
651 {
652         struct dentry *dir_de = NULL;
653         struct dentry *dchild = NULL;
654         struct mds_obd *mds = mds_req2mds(req);
655         struct obd_device *obd = req->rq_export->exp_obd;
656         struct mds_body *body = NULL;
657         struct inode *dir_inode = NULL, *child_inode;
658         struct lustre_handle parent_lockh;
659         void *handle = NULL;
660         struct ldlm_res_id child_res_id = { .name = {0} };
661         char *name;
662         int namelen, rc = 0, flags = 0, return_lock = 0;
663         int cleanup_phase = 0;
664         ENTRY;
665
666         MDS_CHECK_RESENT(req, reconstruct_reint_unlink(rec, offset, req, 
667                                                        child_lockh));
668
669         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
670                 GOTO(cleanup, rc = -ENOENT);
671
672         /* Step 1: Lookup the parent by FID */
673         dir_de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
674                                        &parent_lockh);
675         if (IS_ERR(dir_de))
676                 GOTO(cleanup, rc = PTR_ERR(dir_de));
677         dir_inode = dir_de->d_inode;
678         LASSERT(dir_inode);
679
680         cleanup_phase = 1; /* Have parent dentry lock */
681
682         /* Step 2: Lookup the child */
683         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
684         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
685
686         dchild = lookup_one_len(name, dir_de, namelen);
687         if (IS_ERR(dchild))
688                 GOTO(cleanup, rc = PTR_ERR(dchild));
689         
690         cleanup_phase = 2; /* child dentry */
691
692         child_inode = dchild->d_inode;
693         if (child_inode == NULL) {
694                 if (rec->ur_opcode & REINT_REPLAYING) {
695                         CDEBUG(D_INODE,
696                                "child missing (%lu/%s); OK for REPLAYING\n",
697                                dir_inode->i_ino, rec->ur_name);
698                         rc = 0;
699                 } else {
700                         CDEBUG(D_INODE,
701                                "child doesn't exist (dir %lu, name %s)\n",
702                                dir_inode->i_ino, rec->ur_name);
703                         rc = -ENOENT;
704                 }
705                 GOTO(cleanup, rc);
706         }
707
708         DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
709                   dir_inode->i_ino, child_inode->i_ino);
710
711         /* Step 3: Get a lock on the child */
712         child_res_id.name[0] = child_inode->i_ino;
713         child_res_id.name[1] = child_inode->i_generation;
714
715         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
716                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
717                               &flags, ldlm_completion_ast, mds_blocking_ast,
718                               NULL, NULL, child_lockh);
719         if (rc != ELDLM_OK)
720                 GOTO(cleanup, rc);
721
722         cleanup_phase = 3; /* child lock */
723
724         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
725                        to_kdev_t(dir_inode->i_sb->s_dev));
726
727         /* Slightly magical; see ldlm_intent_policy */
728         if (offset)
729                 offset = 1;
730
731         body = lustre_msg_buf(req->rq_repmsg, offset);
732
733         /* Step 4: Do the unlink: client decides between rmdir/unlink!
734          * (bug 72) */
735         switch (rec->ur_mode & S_IFMT) {
736         case S_IFDIR:
737                 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_RMDIR);
738                 if (IS_ERR(handle))
739                         GOTO(cleanup, rc = PTR_ERR(handle));
740                 rc = vfs_rmdir(dir_inode, dchild);
741                 break;
742         case S_IFREG:
743                 /* If this is the last reference to this inode, get the OBD EA
744                  * data first so the client can destroy OST objects */
745                 if ((child_inode->i_mode & S_IFMT) == S_IFREG &&
746                     child_inode->i_nlink == 1) {
747                         mds_pack_inode2fid(&body->fid1, child_inode);
748                         mds_pack_inode2body(body, child_inode);
749                         mds_pack_md(obd, req->rq_repmsg, offset + 1,
750                                     body, child_inode);
751                         if (body->valid & OBD_MD_FLEASIZE)
752                                 return_lock = 1;
753                 }
754                 /* no break */
755         case S_IFLNK:
756         case S_IFCHR:
757         case S_IFBLK:
758         case S_IFIFO:
759         case S_IFSOCK:
760                 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_UNLINK);
761                 if (IS_ERR(handle))
762                         GOTO(cleanup, rc = PTR_ERR(handle));
763                 rc = vfs_unlink(dir_inode, dchild);
764                 break;
765         default:
766                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
767                 LBUG();
768                 GOTO(cleanup, rc = -EINVAL);
769         }
770
771  cleanup:
772         rc = mds_finish_transno(mds, dir_inode, handle, req, rc, 0);
773         if (rc && body) {
774                 /* Don't unlink the OST objects if the MDS unlink failed */
775                 body->valid = 0;
776         }
777         switch(cleanup_phase) {
778             case 3: /* child lock */
779                 if (rc != 0 || return_lock == 0)
780                         ldlm_lock_decref(child_lockh, LCK_EX);
781             case 2: /* child dentry */
782                 l_dput(dchild);
783             case 1: /* parent dentry and lock */
784                 if (rc) {
785                         ldlm_lock_decref(&parent_lockh, LCK_EX);
786                 } else {
787                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
788                                sizeof(parent_lockh));
789                         req->rq_ack_locks[0].mode = LCK_EX;
790                 }
791                 l_dput(dir_de);
792             case 0:
793                 break;
794             default:
795                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
796                 LBUG();
797         }
798         req->rq_status = rc;
799         return 0;
800 }
801
802 static void reconstruct_reint_link(struct mds_update_record *rec, int offset,
803                                    struct ptlrpc_request *req)
804 {
805         struct mds_export_data *med = &req->rq_export->exp_mds_data;
806         struct mds_client_data *mcd = med->med_mcd;
807
808         req->rq_transno = mcd->mcd_last_transno;
809         req->rq_status = mcd->mcd_last_result;
810         
811         if (med->med_outstanding_reply)
812                 mds_steal_ack_locks(med, req);
813         else
814                 LBUG(); /* don't support it yet, but it'll be fun! */
815 }
816
817 static int mds_reint_link(struct mds_update_record *rec, int offset,
818                           struct ptlrpc_request *req,
819                           struct lustre_handle *lh)
820 {
821         struct obd_device *obd = req->rq_export->exp_obd;
822         struct dentry *de_src = NULL;
823         struct dentry *de_tgt_dir = NULL;
824         struct dentry *dchild = NULL;
825         struct mds_obd *mds = mds_req2mds(req);
826         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
827         struct ldlm_res_id src_res_id = { .name = {0} };
828         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
829         int lock_mode = 0, rc = 0, cleanup_phase = 0;
830         ENTRY;
831
832         MDS_CHECK_RESENT(req, reconstruct_reint_link(rec, offset, req));
833
834         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
835                 GOTO(cleanup, rc = -ENOENT);
836
837         /* Step 1: Lookup the source inode and target directory by FID */
838         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
839         if (IS_ERR(de_src))
840                 GOTO(cleanup, rc = PTR_ERR(de_src));
841
842         cleanup_phase = 1; /* source dentry */
843
844         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
845         if (IS_ERR(de_tgt_dir))
846                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
847
848         cleanup_phase = 2; /* target directory dentry */
849
850         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
851                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
852                de_src->d_inode->i_ino);
853
854         /* Step 2: Take the two locks */
855         lock_mode = LCK_EX;
856         src_res_id.name[0] = de_src->d_inode->i_ino;
857         src_res_id.name[1] = de_src->d_inode->i_generation;
858         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
859         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
860
861         rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
862                                    &src_lockh, &tgt_dir_lockh);
863         if (rc != ELDLM_OK)
864                 GOTO(cleanup, rc = -EIO);
865
866         cleanup_phase = 3; /* locks */
867
868         /* Step 3: Lookup the child */
869         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
870         if (IS_ERR(dchild)) {
871                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
872                 GOTO(cleanup, rc = PTR_ERR(dchild));
873         }
874
875         cleanup_phase = 4; /* child dentry */
876
877         if (dchild->d_inode) {
878                 if (rec->ur_opcode & REINT_REPLAYING) {
879                         /* XXX verify that the link is to the the right file? */
880                         CDEBUG(D_INODE,
881                                "child exists (dir %lu, name %s) (REPLAYING)\n",
882                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
883                         rc = 0;
884                 } else {
885                         CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
886                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
887                         rc = -EEXIST;
888                 }
889                 GOTO(cleanup, rc);
890         }
891
892         /* Step 4: Do it. */
893         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
894                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
895
896         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
897         if (IS_ERR(handle)) {
898                 rc = PTR_ERR(handle);
899                 GOTO(cleanup, rc);
900         }
901
902         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
903         if (rc)
904                 CERROR("link error %d\n", rc);
905 cleanup:
906         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
907                                 handle, req, rc, 0);
908         EXIT;
909
910         switch (cleanup_phase) {
911         case 4: /* child dentry */
912                 l_dput(dchild);
913         case 3: /* locks */
914                 if (rc) {
915                         ldlm_lock_decref(&src_lockh, lock_mode);
916                         ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
917                 } else {
918                         memcpy(&req->rq_ack_locks[0].lock, &src_lockh,
919                                sizeof(src_lockh));
920                         memcpy(&req->rq_ack_locks[1].lock, &tgt_dir_lockh,
921                                sizeof(tgt_dir_lockh));
922                         req->rq_ack_locks[0].mode = lock_mode;
923                         req->rq_ack_locks[1].mode = lock_mode;
924                 }
925         case 2: /* target dentry */
926                 l_dput(de_tgt_dir);
927         case 1: /* source dentry */
928                 l_dput(de_src);
929         case 0:
930                 break;
931         default:
932                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
933                 LBUG();
934         }
935         req->rq_status = rc;
936         return 0;
937 }
938
939 static void reconstruct_reint_rename(struct mds_update_record *rec,
940                                      int offset, struct ptlrpc_request *req)
941 {
942         struct mds_export_data *med = &req->rq_export->exp_mds_data;
943         struct mds_client_data *mcd = med->med_mcd;
944
945         req->rq_transno = mcd->mcd_last_transno;
946         req->rq_status = mcd->mcd_last_result;
947         
948         if (med->med_outstanding_reply)
949                 mds_steal_ack_locks(med, req);
950         else
951                 LBUG(); /* don't support it yet, but it'll be fun! */
952
953 }
954
955 static int mds_reint_rename(struct mds_update_record *rec, int offset,
956                             struct ptlrpc_request *req,
957                             struct lustre_handle *lockh)
958 {
959         struct obd_device *obd = req->rq_export->exp_obd;
960         struct dentry *de_srcdir = NULL;
961         struct dentry *de_tgtdir = NULL;
962         struct dentry *de_old = NULL;
963         struct dentry *de_new = NULL;
964         struct mds_obd *mds = mds_req2mds(req);
965         struct lustre_handle dlm_handles[4];
966         struct ldlm_res_id p1_res_id = { .name = {0} };
967         struct ldlm_res_id p2_res_id = { .name = {0} };
968         struct ldlm_res_id c1_res_id = { .name = {0} };
969         struct ldlm_res_id c2_res_id = { .name = {0} };
970         int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
971         int cleanup_phase = 0;
972         void *handle = NULL;
973         ENTRY;
974
975         MDS_CHECK_RESENT(req, reconstruct_reint_rename(rec, offset, req));
976
977         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
978         if (IS_ERR(de_srcdir))
979                 GOTO(cleanup, rc = PTR_ERR(de_srcdir));
980         
981         cleanup_phase = 1; /* source directory dentry */
982
983         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
984         if (IS_ERR(de_tgtdir))
985                 GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
986
987         cleanup_phase = 2; /* target directory dentry */
988
989         /* The idea here is that we need to get four locks in the end:
990          * one on each parent directory, one on each child.  We need to take
991          * these locks in some kind of order (to avoid deadlocks), and the order
992          * I selected is "increasing resource number" order.  We need to take
993          * the locks on the parent directories, however, before we can lookup
994          * the children.  Thus the following plan:
995          *
996          * 1. Take locks on the parent(s), in order
997          * 2. Lookup the children
998          * 3. Take locks on the children, in order
999          * 4. Execute the rename
1000          */
1001
1002         /* Step 1: Take locks on the parent(s), in order */
1003         p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
1004         p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
1005
1006         p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
1007         p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
1008
1009         rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
1010                                    &(dlm_handles[0]), &(dlm_handles[1]));
1011         if (rc != ELDLM_OK)
1012                 GOTO(cleanup, rc);
1013
1014         cleanup_phase = 3; /* parent locks */
1015
1016         /* Step 2: Lookup the children */
1017         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
1018         if (IS_ERR(de_old)) {
1019                 CERROR("old child lookup error (%*s): %ld\n",
1020                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
1021                 GOTO(cleanup, rc = PTR_ERR(de_old));
1022         }
1023
1024         cleanup_phase = 4; /* original name dentry */
1025
1026         if (de_old->d_inode == NULL)
1027                 GOTO(cleanup, rc = -ENOENT);
1028
1029         /* sanity check for src inode */
1030         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1031             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1032                 GOTO(cleanup, rc = -EINVAL);
1033
1034         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
1035         if (IS_ERR(de_new)) {
1036                 CERROR("new child lookup error (%*s): %ld\n",
1037                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
1038                 GOTO(cleanup, rc = PTR_ERR(de_new));
1039         }
1040
1041         cleanup_phase = 5; /* target dentry */
1042
1043         /* sanity check for dest inode */
1044         if (de_new->d_inode &&
1045             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1046             de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1047                 GOTO(cleanup, rc = -EINVAL);
1048
1049         /* Step 3: Take locks on the children */
1050         c1_res_id.name[0] = de_old->d_inode->i_ino;
1051         c1_res_id.name[1] = de_old->d_inode->i_generation;
1052         if (de_new->d_inode == NULL) {
1053                 flags = LDLM_FL_LOCAL_ONLY;
1054                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1055                                       c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
1056                                       &flags, ldlm_completion_ast,
1057                                       mds_blocking_ast, NULL, NULL,
1058                                       &(dlm_handles[2]));
1059                 lock_count = 3;
1060         } else {
1061                 c2_res_id.name[0] = de_new->d_inode->i_ino;
1062                 c2_res_id.name[1] = de_new->d_inode->i_generation;
1063                 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
1064                                            &(dlm_handles[2]),
1065                                            &(dlm_handles[3]));
1066                 lock_count = 4;
1067         }
1068         if (rc != ELDLM_OK)
1069                 GOTO(cleanup, rc);
1070
1071         cleanup_phase = 6; /* child locks */
1072
1073         /* Step 4: Execute the rename */
1074         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1075                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
1076
1077         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
1078         if (IS_ERR(handle))
1079                 GOTO(cleanup, rc = PTR_ERR(handle));
1080
1081         lock_kernel();
1082         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
1083                         NULL);
1084         unlock_kernel();
1085
1086         EXIT;
1087 cleanup:
1088         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1089                                 handle, req, rc, 0);
1090         switch (cleanup_phase) {
1091         case 6: /* child locks */
1092                 if (rc) {
1093                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1094                         if (lock_count == 4)
1095                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1096                 } else {
1097                         memcpy(&req->rq_ack_locks[2].lock, &(dlm_handles[2]),
1098                                sizeof(dlm_handles[2]));
1099                         req->rq_ack_locks[2].mode = LCK_EX;
1100                         if (lock_count == 4) {
1101                                 memcpy(&req->rq_ack_locks[3].lock,
1102                                        &dlm_handles[3], sizeof(dlm_handles[3]));
1103                                 req->rq_ack_locks[3].mode = LCK_EX;
1104                         }
1105                 }
1106         case 5: /* target dentry */
1107                 l_dput(de_new);
1108         case 4: /* source dentry */
1109                 l_dput(de_old);
1110         case 3: /* parent locks */
1111                 if (rc) {
1112                         ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
1113                         ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
1114                 } else {
1115                         memcpy(&req->rq_ack_locks[0].lock, &(dlm_handles[0]),
1116                                sizeof(dlm_handles[0]));
1117                         memcpy(&req->rq_ack_locks[1].lock, &(dlm_handles[1]),
1118                                sizeof(dlm_handles[1]));
1119                         req->rq_ack_locks[0].mode = LCK_EX;
1120                         req->rq_ack_locks[1].mode = LCK_EX;
1121                 }
1122         case 2: /* target directory dentry */
1123                 l_dput(de_tgtdir);
1124         case 1: /* source directry dentry */
1125                 l_dput(de_srcdir);
1126         case 0:
1127                 break;
1128         default:
1129                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1130                 LBUG();
1131         }
1132         req->rq_status = rc;
1133         return 0;
1134 }
1135
/*
 * Signature of a reintegration handler as stored in reinters[] and invoked
 * by mds_reint_rec(): the update record, the offset passed through from the
 * caller, the request being served and a lock handle.
 */
typedef int (*mds_reinter)(struct mds_update_record *rec, int offset,
                           struct ptlrpc_request *req,
                           struct lustre_handle *lockh);
1138
1139 static mds_reinter reinters[REINT_MAX + 1] = {
1140         [REINT_SETATTR] mds_reint_setattr,
1141         [REINT_CREATE] mds_reint_create,
1142         [REINT_UNLINK] mds_reint_unlink,
1143         [REINT_LINK] mds_reint_link,
1144         [REINT_RENAME] mds_reint_rename,
1145         [REINT_OPEN] mds_open
1146 };
1147
1148 int mds_reint_rec(struct mds_update_record *rec, int offset,
1149                   struct ptlrpc_request *req, struct lustre_handle *lockh)
1150 {
1151         struct mds_obd *mds = mds_req2mds(req);
1152         struct obd_run_ctxt saved;
1153         struct obd_ucred uc;
1154         int realop = rec->ur_opcode & REINT_OPCODE_MASK, rc;
1155         ENTRY;
1156
1157         if (realop < 1 || realop > REINT_MAX) {
1158                 CERROR("opcode %d not valid (%sREPLAYING)\n", realop,
1159                        rec->ur_opcode & REINT_REPLAYING ? "" : "not ");
1160                 rc = req->rq_status = -EINVAL;
1161                 RETURN(rc);
1162         }
1163
1164         uc.ouc_fsuid = rec->ur_fsuid;
1165         uc.ouc_fsgid = rec->ur_fsgid;
1166         uc.ouc_cap = rec->ur_cap;
1167         uc.ouc_suppgid1 = rec->ur_suppgid1;
1168         uc.ouc_suppgid2 = rec->ur_suppgid2;
1169
1170         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1171         rc = reinters[realop] (rec, offset, req, lockh);
1172         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1173
1174         RETURN(rc);
1175 }