Whamcloud - gitweb
31891a190c852183afa642aebb667f172aebf5b2
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/fs.h>
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include <linux/obd.h>
37 #include <linux/lustre_lib.h>
38 #include <linux/lustre_idl.h>
39 #include <linux/lustre_mds.h>
40 #include <linux/lustre_dlm.h>
41 #include <linux/lustre_fsfilt.h>
42
43 #include "mds_internal.h"
44
45 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
46                    int error)
47 {
48         obd_transno_commit_cb(obd, transno, error);
49 }
50
51 struct mds_logcancel_data {
52         struct lov_mds_md      *mlcd_lmm;
53         int                     mlcd_size;
54         int                     mlcd_cookielen;
55         int                     mlcd_eadatalen;
56         struct llog_cookie      mlcd_cookies[0];
57 };
58
59
60 static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
61                                   void *cb_data, int error)
62 {
63         struct mds_logcancel_data *mlcd = cb_data;
64         struct lov_stripe_md *lsm = NULL;
65 #ifdef ENABLE_ORPHANS
66         struct llog_ctxt *ctxt;
67 #endif
68         int rc;
69
70         obd_transno_commit_cb(obd, transno, error);
71
72         CDEBUG(D_HA, "cancelling %d cookies\n",
73                (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
74
75         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, mlcd->mlcd_lmm,
76                           mlcd->mlcd_eadatalen);
77         if (rc < 0) {
78                 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
79                        (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
80                        rc);
81         } else {
82 #ifdef ENABLE_ORPHANS
83                 ///* XXX 0 normally, SENDNOW for debug */);
84                 ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1);
85                 rc = llog_cancel(ctxt, lsm,
86                                          mlcd->mlcd_cookielen /
87                                          sizeof(*mlcd->mlcd_cookies),
88                                          mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
89                 if (rc)
90                         CERROR("error cancelling %d log cookies: rc %d\n",
91                                (int)(mlcd->mlcd_cookielen /
92                                      sizeof(*mlcd->mlcd_cookies)), rc);
93 #endif
94         }
95
96         OBD_FREE(mlcd, mlcd->mlcd_size);
97 }
98
99 /* Assumes caller has already pushed us into the kernel context. */
100 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
101                        struct ptlrpc_request *req, int rc, __u32 op_data)
102 {
103         struct mds_export_data *med = &req->rq_export->exp_mds_data;
104         struct mds_client_data *mcd = med->med_mcd;
105         struct obd_device *obd = req->rq_export->exp_obd;
106         int err;
107         __u64 transno;
108         loff_t off;
109         int log_pri = D_HA;
110         ENTRY;
111
112         /* if the export has already been failed, we have no last_rcvd slot */
113         if (req->rq_export->exp_failed) {
114                 CERROR("committing transaction for disconnected client\n");
115                 if (handle)
116                         GOTO(commit, rc);
117                 RETURN(rc);
118         }
119
120         if (IS_ERR(handle))
121                 RETURN(rc);
122
123         if (handle == NULL) {
124                 /* if we're starting our own xaction, use our own inode */
125                 inode = mds->mds_rcvd_filp->f_dentry->d_inode;
126                 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
127                 if (IS_ERR(handle)) {
128                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
129                         RETURN(PTR_ERR(handle));
130                 }
131         }
132
133         off = med->med_off;
134
135         transno = req->rq_reqmsg->transno;
136         if (transno == 0) {
137                 spin_lock(&mds->mds_transno_lock);
138                 transno = ++mds->mds_last_transno;
139                 spin_unlock(&mds->mds_transno_lock);
140         } else {
141                 spin_lock(&mds->mds_transno_lock);
142                 if (transno > mds->mds_last_transno)
143                         mds->mds_last_transno = transno;
144                 spin_unlock(&mds->mds_transno_lock);
145         }
146         req->rq_repmsg->transno = req->rq_transno = transno;
147         mcd->mcd_last_transno = cpu_to_le64(transno);
148         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
149         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
150         mcd->mcd_last_result = cpu_to_le32(rc);
151         mcd->mcd_last_data = cpu_to_le32(op_data);
152
153         fsfilt_add_journal_cb(req->rq_export->exp_obd, transno, handle,
154                               mds_commit_cb, NULL);
155         err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
156                                   &off, 0);
157
158         if (err) {
159                 log_pri = D_ERROR;
160                 if (rc == 0)
161                         rc = err;
162         }
163
164         DEBUG_REQ(log_pri, req,
165                   "wrote trans #"LPU64" client %s at idx %u: err = %d\n",
166                   transno, mcd->mcd_uuid, med->med_idx, err);
167
168         err = mds_lov_write_objids(obd);
169         if (err) {
170                 log_pri = D_ERROR;
171                 if (rc == 0)
172                         rc = err;
173         }
174         CDEBUG(log_pri, "wrote objids: err = %d\n", err);
175
176 commit:
177         err = fsfilt_commit(obd, inode, handle, 0);
178         if (err) {
179                 CERROR("error committing transaction: %d\n", err);
180                 if (!rc)
181                         rc = err;
182         }
183
184         RETURN(rc);
185 }
186
187 /* this gives the same functionality as the code between
188  * sys_chmod and inode_setattr
189  * chown_common and inode_setattr
190  * utimes and inode_setattr
191  */
192 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
193 {
194         time_t now = LTIME_S(CURRENT_TIME);
195         struct iattr *attr = &rec->ur_iattr;
196         unsigned int ia_valid = attr->ia_valid;
197         int error;
198         ENTRY;
199
200         /* only fix up attrs if the client VFS didn't already */
201         if (!(ia_valid & ATTR_RAW))
202                 RETURN(0);
203
204         if (!(ia_valid & ATTR_CTIME_SET))
205                 LTIME_S(attr->ia_ctime) = now;
206         if (!(ia_valid & ATTR_ATIME_SET))
207                 LTIME_S(attr->ia_atime) = now;
208         if (!(ia_valid & ATTR_MTIME_SET))
209                 LTIME_S(attr->ia_mtime) = now;
210
211         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
212                 RETURN(-EPERM);
213
214         /* times */
215         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
216                 if (rec->ur_fsuid != inode->i_uid &&
217                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
218                         RETURN(error);
219         }
220
221         if (ia_valid & ATTR_SIZE) {
222                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
223                         RETURN(error);
224         }
225
226         if (ia_valid & ATTR_UID) {
227                 /* chown */
228                 error = -EPERM;
229                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
230                         RETURN(-EPERM);
231                 if (attr->ia_uid == (uid_t) -1)
232                         attr->ia_uid = inode->i_uid;
233                 if (attr->ia_gid == (gid_t) -1)
234                         attr->ia_gid = inode->i_gid;
235                 attr->ia_mode = inode->i_mode;
236                 /*
237                  * If the user or group of a non-directory has been
238                  * changed by a non-root user, remove the setuid bit.
239                  * 19981026 David C Niemi <niemi@tux.org>
240                  *
241                  * Changed this to apply to all users, including root,
242                  * to avoid some races. This is the behavior we had in
243                  * 2.0. The check for non-root was definitely wrong
244                  * for 2.2 anyway, as it should have been using
245                  * CAP_FSETID rather than fsuid -- 19990830 SD.
246                  */
247                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
248                     !S_ISDIR(inode->i_mode)) {
249                         attr->ia_mode &= ~S_ISUID;
250                         attr->ia_valid |= ATTR_MODE;
251                 }
252                 /*
253                  * Likewise, if the user or group of a non-directory
254                  * has been changed by a non-root user, remove the
255                  * setgid bit UNLESS there is no group execute bit
256                  * (this would be a file marked for mandatory
257                  * locking).  19981026 David C Niemi <niemi@tux.org>
258                  *
259                  * Removed the fsuid check (see the comment above) --
260                  * 19990830 SD.
261                  */
262                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
263                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
264                         attr->ia_mode &= ~S_ISGID;
265                         attr->ia_valid |= ATTR_MODE;
266                 }
267         } else if (ia_valid & ATTR_MODE) {
268                 int mode = attr->ia_mode;
269                 /* chmod */
270                 if (attr->ia_mode == (mode_t) -1)
271                         attr->ia_mode = inode->i_mode;
272                 attr->ia_mode =
273                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
274         }
275         RETURN(0);
276 }
277
278 void mds_steal_ack_locks(struct obd_export *exp, struct ptlrpc_request *req)
279 {
280         unsigned long flags;
281         struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
282
283         if (oldrep == NULL)
284                 return;
285         memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
286                sizeof req->rq_ack_locks);
287         spin_lock_irqsave(&req->rq_lock, flags);
288         oldrep->rq_resent = 1;
289         wake_up(&oldrep->rq_reply_waitq);
290         spin_unlock_irqrestore(&req->rq_lock, flags);
291         DEBUG_REQ(D_HA, oldrep, "stole locks from");
292         DEBUG_REQ(D_HA, req, "stole locks for");
293 }
294
295 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
296 {
297         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
298                   mcd->mcd_last_transno, mcd->mcd_last_result);
299         req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
300         req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
301
302         if (req->rq_export->exp_outstanding_reply)
303                 mds_steal_ack_locks(req->rq_export, req);
304 }
305
306 static void reconstruct_reint_setattr(struct mds_update_record *rec,
307                                       int offset, struct ptlrpc_request *req)
308 {
309         struct mds_export_data *med = &req->rq_export->exp_mds_data;
310         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
311         struct dentry *de;
312         struct mds_body *body;
313
314         mds_req_from_mcd(req, med->med_mcd);
315
316         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
317         if (IS_ERR(de)) {
318                 LASSERT(PTR_ERR(de) == req->rq_status);
319                 return;
320         }
321
322         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
323         mds_pack_inode2fid(&body->fid1, de->d_inode);
324         mds_pack_inode2body(body, de->d_inode);
325
326         /* Don't return OST-specific attributes if we didn't just set them */
327         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
328                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
329         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
330                 body->valid |= OBD_MD_FLMTIME;
331         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
332                 body->valid |= OBD_MD_FLATIME;
333
334         l_dput(de);
335 }
336
337 /* In the raw-setattr case, we lock the child inode.
338  * In the write-back case or if being called from open, the client holds a lock
339  * already.
340  *
341  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
342 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
343                              struct ptlrpc_request *req,
344                              struct lustre_handle *lh)
345 {
346         struct mds_obd *mds = mds_req2mds(req);
347         struct obd_device *obd = req->rq_export->exp_obd;
348         struct mds_body *body;
349         struct dentry *de;
350         struct inode *inode = NULL;
351         struct lustre_handle lockh;
352         void *handle = NULL;
353         struct mds_logcancel_data *mlcd = NULL;
354         int rc = 0, cleanup_phase = 0, err, locked = 0;
355         ENTRY;
356
357         LASSERT(offset == 0);
358
359         DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x\n", rec->ur_fid1->id,
360                   rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
361
362         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
363
364         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
365                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
366                 if (IS_ERR(de))
367                         GOTO(cleanup, rc = PTR_ERR(de));
368         } else {
369                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
370                                            &lockh, NULL, 0);
371                 if (IS_ERR(de))
372                         GOTO(cleanup, rc = PTR_ERR(de));
373                 locked = 1;
374         }
375
376         cleanup_phase = 1;
377         inode = de->d_inode;
378         LASSERT(inode);
379         if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
380                 down(&inode->i_sem);
381
382         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
383
384         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
385         if (IS_ERR(handle))
386                 GOTO(cleanup, rc = PTR_ERR(handle));
387
388         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
389                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
390                        LTIME_S(rec->ur_iattr.ia_mtime),
391                        LTIME_S(rec->ur_iattr.ia_ctime));
392         rc = mds_fix_attr(inode, rec);
393         if (rc)
394                 GOTO(cleanup, rc);
395
396         if (rec->ur_iattr.ia_valid & ATTR_ATTR_FLAG)    /* ioctl */
397                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_SETFLAGS,
398                                       (long)&rec->ur_iattr.ia_attr_flags);
399         else                                            /* setattr */
400                 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
401
402         if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
403                 rc = fsfilt_set_md(obd, inode, handle,
404                                    rec->ur_eadata, rec->ur_eadatalen);
405         }
406
407         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
408         mds_pack_inode2fid(&body->fid1, inode);
409         mds_pack_inode2body(body, inode);
410
411         /* Don't return OST-specific attributes if we didn't just set them */
412         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
413                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
414         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
415                 body->valid |= OBD_MD_FLMTIME;
416         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
417                 body->valid |= OBD_MD_FLATIME;
418
419         if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
420                 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
421                           rec->ur_eadatalen);
422                 if (mlcd) {
423                         mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
424                                 rec->ur_eadatalen;
425                         mlcd->mlcd_eadatalen = rec->ur_eadatalen;
426                         mlcd->mlcd_cookielen = rec->ur_cookielen;
427                         mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
428                                 mlcd->mlcd_cookielen;
429                         memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
430                                mlcd->mlcd_cookielen);
431                         memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
432                                mlcd->mlcd_eadatalen);
433                 } else {
434                         CERROR("unable to allocate log cancel data\n");
435                 }
436         }
437         EXIT;
438  cleanup:
439         if (mlcd != NULL)
440                 fsfilt_add_journal_cb(req->rq_export->exp_obd, 0, handle,
441                                       mds_cancel_cookies_cb, mlcd);
442         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
443         switch (cleanup_phase) {
444         case 1:
445                 if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
446                         down(&inode->i_sem);
447                 l_dput(de);
448                 if (locked) {
449                         if (rc) {
450                                 ldlm_lock_decref(&lockh, LCK_PW);
451                         } else {
452                                 ldlm_put_lock_into_req(req, &lockh, LCK_PW);
453                         }
454                 }
455         case 0:
456                 break;
457         default:
458                 LBUG();
459         }
460         if (err && !rc)
461                 rc = err;
462
463         req->rq_status = rc;
464         return 0;
465 }
466
467 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
468                                      struct ptlrpc_request *req)
469 {
470         struct mds_export_data *med = &req->rq_export->exp_mds_data;
471         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
472         struct dentry *parent, *child;
473         struct mds_body *body;
474
475         mds_req_from_mcd(req, med->med_mcd);
476
477         if (req->rq_status)
478                 return;
479
480         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
481         LASSERT(!IS_ERR(parent));
482         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
483         LASSERT(!IS_ERR(child));
484         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
485         mds_pack_inode2fid(&body->fid1, child->d_inode);
486         mds_pack_inode2body(body, child->d_inode);
487         l_dput(parent);
488         l_dput(child);
489 }
490
491 static int mds_reint_create(struct mds_update_record *rec, int offset,
492                             struct ptlrpc_request *req,
493                             struct lustre_handle *lh)
494 {
495         struct dentry *dparent = NULL;
496         struct mds_obd *mds = mds_req2mds(req);
497         struct obd_device *obd = req->rq_export->exp_obd;
498         struct dentry *dchild = NULL;
499         struct inode *dir = NULL;
500         void *handle = NULL;
501         struct lustre_handle lockh;
502         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
503         int created = 0;
504         struct dentry_params dp;
505         ENTRY;
506
507         LASSERT(offset == 0);
508         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
509
510         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o\n",
511                   rec->ur_fid1->id, rec->ur_fid1->generation,
512                   rec->ur_name, rec->ur_mode);
513
514         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
515
516         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
517                 GOTO(cleanup, rc = -ESTALE);
518
519         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh,
520                                         rec->ur_name, rec->ur_namelen - 1);
521         if (IS_ERR(dparent)) {
522                 rc = PTR_ERR(dparent);
523                 CERROR("parent lookup error %d\n", rc);
524                 GOTO(cleanup, rc);
525         }
526         cleanup_phase = 1; /* locked parent dentry */
527         dir = dparent->d_inode;
528         LASSERT(dir);
529
530         ldlm_lock_dump_handle(D_OTHER, &lockh);
531
532         dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
533         if (IS_ERR(dchild)) {
534                 rc = PTR_ERR(dchild);
535                 CERROR("child lookup error %d\n", rc);
536                 GOTO(cleanup, rc);
537         }
538
539         cleanup_phase = 2; /* child dentry */
540
541         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb);
542
543         if (dir->i_mode & S_ISGID) {
544                 if (S_ISDIR(rec->ur_mode))
545                         rec->ur_mode |= S_ISGID;
546         }
547
548         dchild->d_fsdata = (void *)&dp;
549         dp.p_inum = (unsigned long)rec->ur_fid2->id;
550         dp.p_ptr = req;
551
552         switch (type) {
553         case S_IFREG:{
554                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
555                 if (IS_ERR(handle))
556                         GOTO(cleanup, rc = PTR_ERR(handle));
557                 rc = ll_vfs_create(dir, dchild, rec->ur_mode, NULL);
558                 EXIT;
559                 break;
560         }
561         case S_IFDIR:{
562                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
563                 if (IS_ERR(handle))
564                         GOTO(cleanup, rc = PTR_ERR(handle));
565                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
566                 EXIT;
567                 break;
568         }
569         case S_IFLNK:{
570                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
571                 if (IS_ERR(handle))
572                         GOTO(cleanup, rc = PTR_ERR(handle));
573                 if (rec->ur_tgt == NULL)        /* no target supplied */
574                         rc = -EINVAL;           /* -EPROTO? */
575                 else
576                         rc = vfs_symlink(dir, dchild, rec->ur_tgt);
577                 EXIT;
578                 break;
579         }
580         case S_IFCHR:
581         case S_IFBLK:
582         case S_IFIFO:
583         case S_IFSOCK:{
584                 int rdev = rec->ur_rdev;
585                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
586                 if (IS_ERR(handle))
587                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
588                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
589                 EXIT;
590                 break;
591         }
592         default:
593                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
594                 dchild->d_fsdata = NULL;
595                 GOTO(cleanup, rc = -EINVAL);
596         }
597
598         /* In case we stored the desired inum in here, we want to clean up. */
599         if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
600                 dchild->d_fsdata = NULL;
601
602         if (rc) {
603                 CDEBUG(D_INODE, "error during create: %d\n", rc);
604                 GOTO(cleanup, rc);
605         } else {
606                 struct iattr iattr;
607                 struct inode *inode = dchild->d_inode;
608                 struct mds_body *body;
609
610                 created = 1;
611                 LTIME_S(iattr.ia_atime) = rec->ur_time;
612                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
613                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
614                 iattr.ia_uid = rec->ur_fsuid;
615                 if (dir->i_mode & S_ISGID)
616                         iattr.ia_gid = dir->i_gid;
617                 else
618                         iattr.ia_gid = rec->ur_fsgid;
619                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
620                         ATTR_MTIME | ATTR_CTIME;
621
622                 if (rec->ur_fid2->id) {
623                         LASSERT(rec->ur_fid2->id == inode->i_ino);
624                         inode->i_generation = rec->ur_fid2->generation;
625                         /* Dirtied and committed by the upcoming setattr. */
626                         CDEBUG(D_INODE, "recreated ino %lu with gen %u\n",
627                                inode->i_ino, inode->i_generation);
628                 } else {
629                         struct lustre_handle child_ino_lockh;
630                         struct ldlm_res_id child_res_id =
631                              { .name = { inode->i_ino, 0 } };
632                         int lock_flags = 0;
633
634                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
635                                inode->i_ino, inode->i_generation);
636
637                         /* The inode we were allocated may have just been freed
638                          * by an unlink operation.  We take this lock to
639                          * synchronize against the matching reply-ack-lock taken
640                          * in unlink, to avoid replay problems if this reply
641                          * makes it out to the client but the unlink's does not.
642                          * See bug 2029 for more detail.*/
643                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
644                                               NULL, child_res_id, LDLM_PLAIN,
645                                               NULL, 0, LCK_EX, &lock_flags,
646                                               ldlm_completion_ast,
647                                               mds_blocking_ast, NULL,
648                                               &child_ino_lockh);
649                         if (rc != ELDLM_OK) {
650                                 CERROR("error locking for unlink/create sync: "
651                                        "%d\n", rc);
652                         } else {
653                                 ldlm_lock_decref(&child_ino_lockh, LCK_EX);
654                         }
655                 }
656
657                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
658                 if (rc)
659                         CERROR("error on child setattr: rc = %d\n", rc);
660
661                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
662                 rc = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
663                 if (rc)
664                         CERROR("error on parent setattr: rc = %d\n", rc);
665
666                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
667                 mds_pack_inode2fid(&body->fid1, inode);
668                 mds_pack_inode2body(body, inode);
669         }
670         EXIT;
671
672 cleanup:
673         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
674
675         if (rc && created) {
676                 /* Destroy the file we just created.  This should not need
677                  * extra journal credits, as we have already modified all of
678                  * the blocks needed in order to create the file in the first
679                  * place.
680                  */
681                 switch (type) {
682                 case S_IFDIR:
683                         err = vfs_rmdir(dir, dchild);
684                         if (err)
685                                 CERROR("rmdir in error path: %d\n", err);
686                         break;
687                 default:
688                         err = vfs_unlink(dir, dchild);
689                         if (err)
690                                 CERROR("unlink in error path: %d\n", err);
691                         break;
692                 }
693         } else {
694                 rc = err;
695         }
696         switch (cleanup_phase) {
697         case 2: /* child dentry */
698                 l_dput(dchild);
699         case 1: /* locked parent dentry */
700                 if (rc) {
701                         ldlm_lock_decref(&lockh, LCK_PW);
702                 } else {
703                         ldlm_put_lock_into_req(req, &lockh, LCK_PW);
704                 }
705                 l_dput(dparent);
706         case 0:
707                 break;
708         default:
709                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
710                 LBUG();
711         }
712         req->rq_status = rc;
713         return 0;
714 }
715
716 int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
717 {
718         int i;
719
720         for (i = 0; i < RES_NAME_SIZE; i++) {
721                 /* return 1 here, because enqueue_ordered will skip resources
722                  * of all zeroes if they're sorted to the end of the list. */
723                 if (res1->name[i] == 0 && res2->name[i] != 0)
724                         return 1;
725                 if (res2->name[i] == 0 && res1->name[i] != 0)
726                         return 0;
727
728                 if (res1->name[i] > res2->name[i])
729                         return 1;
730                 if (res1->name[i] < res2->name[i])
731                         return 0;
732         }
733         return 0;
734 }
735
736 /* This function doesn't use ldlm_match_or_enqueue because we're always called
737  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
738  * because they take the place of local semaphores.
739  *
740  * One or two locks are taken in numerical order.  A res_id->name[0] of 0 means
741  * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
742 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
743                           struct lustre_handle *p1_lockh, int p1_lock_mode,
744                           struct ldlm_res_id *p2_res_id,
745                           struct lustre_handle *p2_lockh, int p2_lock_mode)
746 {
747         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
748         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
749         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
750         int rc, flags;
751         ENTRY;
752
753         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
754
755         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
756                res_id[0]->name[0], res_id[1]->name[0]);
757
758         if (res_gt(p1_res_id, p2_res_id)) {
759                 handles[1] = p1_lockh;
760                 handles[0] = p2_lockh;
761                 res_id[1] = p1_res_id;
762                 res_id[0] = p2_res_id;
763                 lock_modes[1] = p1_lock_mode;
764                 lock_modes[0] = p2_lock_mode;
765         }
766
767         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
768                res_id[0]->name[0], res_id[1]->name[0]);
769
770         flags = LDLM_FL_LOCAL_ONLY;
771         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, *res_id[0],
772                               LDLM_PLAIN, NULL, 0, lock_modes[0], &flags,
773                               ldlm_completion_ast, mds_blocking_ast, NULL,
774                               handles[0]);
775         if (rc != ELDLM_OK)
776                 RETURN(-EIO);
777         ldlm_lock_dump_handle(D_OTHER, handles[0]);
778
779         if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
780                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
781                 ldlm_lock_addref(handles[1], lock_modes[1]);
782         } else if (res_id[1]->name[0] != 0) {
783                 flags = LDLM_FL_LOCAL_ONLY;
784                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
785                                       *res_id[1], LDLM_PLAIN, NULL, 0,
786                                       lock_modes[1], &flags,ldlm_completion_ast,
787                                       mds_blocking_ast, NULL, handles[1]);
788                 if (rc != ELDLM_OK) {
789                         ldlm_lock_decref(handles[0], lock_modes[0]);
790                         RETURN(-EIO);
791                 }
792                 ldlm_lock_dump_handle(D_OTHER, handles[1]);
793         }
794
795         RETURN(0);
796 }
797
798 int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
799                            struct lustre_handle *p1_lockh, int p1_lock_mode,
800                            struct ldlm_res_id *p2_res_id,
801                            struct lustre_handle *p2_lockh, int p2_lock_mode,
802                            struct ldlm_res_id *c1_res_id,
803                            struct lustre_handle *c1_lockh, int c1_lock_mode,
804                            struct ldlm_res_id *c2_res_id,
805                            struct lustre_handle *c2_lockh, int c2_lock_mode)
806 {
807         struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
808                                           c1_res_id, c2_res_id };
809         struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh,
810                                                  c1_lockh, c2_lockh };
811         int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
812                               c1_lock_mode, c2_lock_mode };
813         int rc, i, j, sorted, flags;
814         ENTRY;
815
816         CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
817                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
818                res_id[3]->name[0]);
819
820         /* simple insertion sort - we have at most 4 elements */
821         for (i = 1; i < 4; i++) {
822                 j = i - 1;
823                 dlm_handles[4] = dlm_handles[i];
824                 res_id[4] = res_id[i];
825                 lock_modes[4] = lock_modes[i];
826
827                 sorted = 0;
828                 do {
829                         if (res_gt(res_id[j], res_id[4])) {
830                                 dlm_handles[j + 1] = dlm_handles[j];
831                                 res_id[j + 1] = res_id[j];
832                                 lock_modes[j + 1] = lock_modes[j];
833                                 j--;
834                         } else {
835                                 sorted = 1;
836                         }
837                 } while (j >= 0 && !sorted);
838
839                 dlm_handles[j + 1] = dlm_handles[4];
840                 res_id[j + 1] = res_id[4];
841                 lock_modes[j + 1] = lock_modes[4];
842         }
843
844         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
845                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
846                res_id[3]->name[0]);
847
848         /* XXX we could send ASTs on all these locks first before blocking? */
849         for (i = 0; i < 4; i++) {
850                 flags = 0;
851                 if (res_id[i]->name[0] == 0)
852                         break;
853                 if (i != 0 &&
854                     memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
855                         memcpy(dlm_handles[i], dlm_handles[i-1],
856                                sizeof(*(dlm_handles[i])));
857                         ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
858                 } else {
859                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
860                                               NULL, *res_id[i], LDLM_PLAIN,
861                                               NULL, 0, lock_modes[i], &flags,
862                                               ldlm_completion_ast,
863                                               mds_blocking_ast, NULL,
864                                               dlm_handles[i]);
865                         if (rc != ELDLM_OK)
866                                 GOTO(out_err, rc = -EIO);
867                         ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
868                 }
869         }
870
871         RETURN(0);
872 out_err:
873         while (i-- > 0)
874                 ldlm_lock_decref(dlm_handles[i], lock_modes[i]);
875
876         return rc;
877 }
878
879 /* In the unlikely case that the child changed while we were waiting
880  * on the lock, we need to drop the lock on the old child and either:
881  * - if the child has a lower resource name, then we have to also
882  *   drop the parent lock and regain the locks in the right order
883  * - in the rename case, if the child has a lower resource name than one of
884  *   the other parent/child resources (maxres) we also need to reget the locks
885  * - if the child has a higher resource name (this is the common case)
886  *   we can just get the lock on the new child (still in lock order)
887  *
888  * Returns 0 if the child did not change or if it changed but could be locked.
889  * Returns 1 if the child changed and we need to re-lock (no locks held).
890  * Returns -ve error with a valid dchild (no locks held). */
891 static int mds_verify_child(struct obd_device *obd,
892                             struct ldlm_res_id *parent_res_id,
893                             struct lustre_handle *parent_lockh,
894                             struct dentry *dparent, int parent_mode,
895                             struct ldlm_res_id *child_res_id,
896                             struct lustre_handle *child_lockh,
897                             struct dentry **dchildp, int child_mode,
898                             const char *name, int namelen,
899                             struct ldlm_res_id *maxres)
900 {
901         struct dentry *vchild, *dchild = *dchildp;
902         int rc = 0, cleanup_phase = 2; /* parent, child locks */
903         ENTRY;
904
905         vchild = ll_lookup_one_len(name, dparent, namelen - 1);
906         if (IS_ERR(vchild))
907                 GOTO(cleanup, rc = PTR_ERR(vchild));
908
909         if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
910                    (vchild->d_inode != NULL &&
911                     child_res_id->name[0] == vchild->d_inode->i_ino &&
912                     child_res_id->name[1] == vchild->d_inode->i_generation))) {
913                 if (dchild != NULL)
914                         l_dput(dchild);
915                 *dchildp = vchild;
916
917                 RETURN(0);
918         }
919
920         CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
921                vchild->d_inode, dchild ? dchild->d_inode : 0,
922                vchild->d_inode ? vchild->d_inode->i_ino : 0,
923                child_res_id->name[0]);
924         if (child_res_id->name[0] != 0)
925                 ldlm_lock_decref(child_lockh, child_mode);
926         if (dchild)
927                 l_dput(dchild);
928
929         cleanup_phase = 1; /* parent lock only */
930         *dchildp = dchild = vchild;
931
932         if (dchild->d_inode) {
933                 int flags = 0;
934                 child_res_id->name[0] = dchild->d_inode->i_ino;
935                 child_res_id->name[1] = dchild->d_inode->i_generation;
936
937                 if (res_gt(parent_res_id, child_res_id) ||
938                     res_gt(maxres, child_res_id)) {
939                         CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
940                                child_res_id->name[0], parent_res_id->name[0],
941                                maxres->name[0]);
942                         GOTO(cleanup, rc = 1);
943                 }
944
945                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
946                                       NULL, *child_res_id, LDLM_PLAIN,
947                                       NULL, 0, child_mode, &flags,
948                                       ldlm_completion_ast, mds_blocking_ast,
949                                       NULL, child_lockh);
950                 if (rc != ELDLM_OK)
951                         GOTO(cleanup, rc = -EIO);
952         } else {
953                 memset(child_res_id, 0, sizeof(*child_res_id));
954         }
955
956         EXIT;
957 cleanup:
958         if (rc) {
959                 switch(cleanup_phase) {
960                 case 2:
961                         if (child_res_id->name[0] != 0)
962                                 ldlm_lock_decref(child_lockh, child_mode);
963                 case 1:
964                         ldlm_lock_decref(parent_lockh, parent_mode);
965                 }
966         }
967         return rc;
968 }
969
970 int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
971                                 struct ll_fid *fid,
972                                 struct lustre_handle *parent_lockh,
973                                 struct dentry **dparentp, int parent_mode,
974                                 char *name, int namelen,
975                                 struct lustre_handle *child_lockh,
976                                 struct dentry **dchildp, int child_mode)
977 {
978         struct ldlm_res_id child_res_id = { .name = {0} };
979         struct ldlm_res_id parent_res_id = { .name = {0} };
980         struct inode *inode;
981         int rc = 0, cleanup_phase = 0;
982         ENTRY;
983
984         /* Step 1: Lookup parent */
985         *dparentp = mds_fid2dentry(mds, fid, NULL);
986         if (IS_ERR(*dparentp))
987                 RETURN(rc = PTR_ERR(*dparentp));
988         LASSERT((*dparentp)->d_inode);
989
990         CDEBUG(D_INODE, "parent ino %lu, name %s\n",
991                (*dparentp)->d_inode->i_ino, name);
992
993         parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
994         parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
995
996         cleanup_phase = 1; /* parent dentry */
997
998         /* Step 2: Lookup child (without lock, to get resource name) */
999         *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
1000         if (IS_ERR(*dchildp)) {
1001                 rc = PTR_ERR(*dchildp);
1002                 CDEBUG(D_INODE, "child lookup error %d\n", rc);
1003                 GOTO(cleanup, rc);
1004         }
1005
1006         inode = (*dchildp)->d_inode;
1007         if (inode != NULL)
1008                 inode = igrab(inode);
1009         if (inode == NULL)
1010                 goto retry_locks;
1011
1012         child_res_id.name[0] = inode->i_ino;
1013         child_res_id.name[1] = inode->i_generation;
1014         iput(inode);
1015
1016 retry_locks:
1017         cleanup_phase = 2; /* child dentry */
1018
1019         /* Step 3: Lock parent and child in resource order.  If child doesn't
1020          *         exist, we still have to lock the parent and re-lookup. */
1021         rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode,
1022                                    &child_res_id, child_lockh, child_mode);
1023         if (rc)
1024                 GOTO(cleanup, rc);
1025
1026         if (!(*dchildp)->d_inode)
1027                 cleanup_phase = 3; /* parent lock */
1028         else
1029                 cleanup_phase = 4; /* child lock */
1030
1031         /* Step 4: Re-lookup child to verify it hasn't changed since locking */
1032         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
1033                               parent_mode, &child_res_id, child_lockh, dchildp,
1034                               child_mode, name, namelen, &parent_res_id);
1035         if (rc > 0)
1036                 goto retry_locks;
1037         if (rc < 0) {
1038                 cleanup_phase = 3;
1039                 GOTO(cleanup, rc);
1040         }
1041
1042 cleanup:
1043         if (rc) {
1044                 switch (cleanup_phase) {
1045                 case 4:
1046                         ldlm_lock_decref(child_lockh, child_mode);
1047                 case 3:
1048                         ldlm_lock_decref(parent_lockh, parent_mode);
1049                 case 2:
1050                         l_dput(*dchildp);
1051                 case 1:
1052                         l_dput(*dparentp);
1053                 default: ;
1054                 }
1055         }
1056         return rc;
1057 }
1058
1059 void mds_reconstruct_generic(struct ptlrpc_request *req)
1060 {
1061         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1062
1063         mds_req_from_mcd(req, med->med_mcd);
1064 }
1065
1066 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
1067                             struct ptlrpc_request *req,
1068                             struct lustre_handle *lh)
1069 {
1070         struct dentry *dparent, *dchild;
1071         struct mds_obd *mds = mds_req2mds(req);
1072         struct obd_device *obd = req->rq_export->exp_obd;
1073         struct mds_body *body = NULL;
1074         struct inode *child_inode;
1075         struct lustre_handle parent_lockh, child_lockh, child_reuse_lockh;
1076         void *handle = NULL;
1077         int rc = 0, log_unlink = 0, cleanup_phase = 0;
1078         ENTRY;
1079
1080         LASSERT(offset == 0 || offset == 2);
1081
1082         DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
1083                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
1084
1085         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1086
1087         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1088                 GOTO(cleanup, rc = -ENOENT);
1089
1090         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
1091                                          &parent_lockh, &dparent, LCK_PW,
1092                                          rec->ur_name, rec->ur_namelen,
1093                                          &child_lockh, &dchild, LCK_EX);
1094         if (rc)
1095                 GOTO(cleanup, rc);
1096
1097         cleanup_phase = 1; /* dchild, dparent, locks */
1098
1099         child_inode = dchild->d_inode;
1100         if (child_inode == NULL) {
1101                 CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
1102                        dparent->d_inode->i_ino, rec->ur_name);
1103                 GOTO(cleanup, rc = -ENOENT);
1104         }
1105
1106         cleanup_phase = 2; /* dchild has a lock */
1107
1108         /* Step 4: Get a lock on the ino to sync with creation WRT inode
1109          * reuse (see bug 2029). */
1110         rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
1111         if (rc != ELDLM_OK)
1112                 GOTO(cleanup, rc);
1113
1114         cleanup_phase = 3; /* child inum lock */
1115
1116         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
1117
1118         /* ldlm_reply in buf[0] if called via intent */
1119         if (offset)
1120                 offset = 1;
1121
1122         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
1123         LASSERT(body != NULL);
1124
1125         /* If this is the last reference to this inode, get the OBD EA
1126          * data first so the client can destroy OST objects.
1127          * we only do the object removal if no open files remain.
1128          * Nobody can get at this name anymore because of the locks so
1129          * we make decisions here as to whether to remove the inode */
1130         if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
1131             mds_open_orphan_count(child_inode) == 0) {
1132                 mds_pack_inode2fid(&body->fid1, child_inode);
1133                 mds_pack_inode2body(body, child_inode);
1134                 mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
1135                             child_inode, 1);
1136                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1137                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1138                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1139                 } else {
1140                         log_unlink = 1;
1141                 }
1142         }
1143
1144         /* We have to do these checks ourselves, in case we are making an
1145          * orphan.  The client tells us whether rmdir() or unlink() was called,
1146          * so we need to return appropriate errors (bug 72).
1147          *
1148          * We don't have to check permissions, because vfs_rename (called from
1149          * mds_open_unlink_rename) also calls may_delete. */
1150         if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
1151                 if (!S_ISDIR(child_inode->i_mode))
1152                         GOTO(cleanup, rc = -ENOTDIR);
1153         } else {
1154                 if (S_ISDIR(child_inode->i_mode))
1155                         GOTO(cleanup, rc = -EISDIR);
1156         }
1157
1158         if (child_inode->i_nlink == (S_ISDIR(child_inode->i_mode) ? 2 : 1) &&
1159             mds_open_orphan_count(child_inode) > 0) {
1160                 rc = mds_open_unlink_rename(rec, obd, dparent, dchild, &handle);
1161                 cleanup_phase = 4; /* transaction */
1162                 GOTO(cleanup, rc);
1163         }
1164
1165         /* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */
1166         switch (child_inode->i_mode & S_IFMT) {
1167         case S_IFDIR:
1168                 /* Drop any lingering child directories before we start our
1169                  * transaction, to avoid doing multiple inode dirty/delete
1170                  * in our compound transaction (bug 1321). */
1171                 shrink_dcache_parent(dchild);
1172                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
1173                                       NULL);
1174                 if (IS_ERR(handle))
1175                         GOTO(cleanup, rc = PTR_ERR(handle));
1176                 cleanup_phase = 4; /* transaction */
1177                 rc = vfs_rmdir(dparent->d_inode, dchild);
1178                 break;
1179         case S_IFREG: {
1180                 handle = fsfilt_start(obd, dparent->d_inode,
1181                                       FSFILT_OP_UNLINK_LOG, NULL);
1182                 if (IS_ERR(handle))
1183                         GOTO(cleanup, rc = PTR_ERR(handle));
1184
1185                 cleanup_phase = 4; /* transaction */
1186                 rc = vfs_unlink(dparent->d_inode, dchild);
1187
1188 #ifdef ENABLE_ORPHANS
1189
1190                 if (!rc && log_unlink)
1191                         if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg,
1192                                               offset + 1) > 0)
1193                                 body->valid |= OBD_MD_FLCOOKIE;
1194 #endif
1195                 break;
1196         }
1197         case S_IFLNK:
1198         case S_IFCHR:
1199         case S_IFBLK:
1200         case S_IFIFO:
1201         case S_IFSOCK:
1202                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
1203                                       NULL);
1204                 if (IS_ERR(handle))
1205                         GOTO(cleanup, rc = PTR_ERR(handle));
1206                 cleanup_phase = 4; /* transaction */
1207                 rc = vfs_unlink(dparent->d_inode, dchild);
1208                 break;
1209         default:
1210                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
1211                        rec->ur_name);
1212                 LBUG();
1213                 GOTO(cleanup, rc = -EINVAL);
1214         }
1215
1216  cleanup:
1217         if (rc == 0) {
1218                 struct iattr iattr;
1219                 int err;
1220
1221                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
1222                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
1223                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
1224
1225                 err = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
1226                 if (err)
1227                         CERROR("error on parent setattr: rc = %d\n", err);
1228         }
1229
1230         switch(cleanup_phase) {
1231         case 4:
1232                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
1233                                         rc, 0);
1234                 if (!rc)
1235                         (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
1236                                            "unlinked", 0, NULL);
1237         case 3: /* child ino-reuse lock */
1238                 if (rc && body != NULL) {
1239                         // Don't unlink the OST objects if the MDS unlink failed
1240                         body->valid = 0;
1241                 }
1242                 if (rc)
1243                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
1244                 else
1245                         ldlm_put_lock_into_req(req, &child_reuse_lockh, LCK_EX);
1246         case 2: /* child lock */
1247                 ldlm_lock_decref(&child_lockh, LCK_EX);
1248         case 1: /* child and parent dentry, parent lock */
1249                 if (rc)
1250                         ldlm_lock_decref(&parent_lockh, LCK_PW);
1251                 else
1252                         ldlm_put_lock_into_req(req, &parent_lockh, LCK_PW);
1253                 l_dput(dchild);
1254                 l_dput(dparent);
1255         case 0:
1256                 break;
1257         default:
1258                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1259                 LBUG();
1260         }
1261         req->rq_status = rc;
1262         return 0;
1263 }
1264
1265 static int mds_reint_link(struct mds_update_record *rec, int offset,
1266                           struct ptlrpc_request *req,
1267                           struct lustre_handle *lh)
1268 {
1269         struct obd_device *obd = req->rq_export->exp_obd;
1270         struct dentry *de_src = NULL;
1271         struct dentry *de_tgt_dir = NULL;
1272         struct dentry *dchild = NULL;
1273         struct mds_obd *mds = mds_req2mds(req);
1274         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
1275         struct ldlm_res_id src_res_id = { .name = {0} };
1276         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
1277         int rc = 0, cleanup_phase = 0;
1278         ENTRY;
1279
1280         LASSERT(offset == 0);
1281
1282         DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
1283                   rec->ur_fid1->id, rec->ur_fid1->generation,
1284                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
1285
1286         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1287
1288         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1289                 GOTO(cleanup, rc = -ENOENT);
1290
1291         /* Step 1: Lookup the source inode and target directory by FID */
1292         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1293         if (IS_ERR(de_src))
1294                 GOTO(cleanup, rc = PTR_ERR(de_src));
1295
1296         cleanup_phase = 1; /* source dentry */
1297
1298         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1299         if (IS_ERR(de_tgt_dir))
1300                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
1301
1302         cleanup_phase = 2; /* target directory dentry */
1303
1304         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
1305                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
1306                de_src->d_inode->i_ino);
1307
1308         /* Step 2: Take the two locks */
1309         src_res_id.name[0] = de_src->d_inode->i_ino;
1310         src_res_id.name[1] = de_src->d_inode->i_generation;
1311         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
1312         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
1313
1314         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
1315                                    &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
1316         if (rc)
1317                 GOTO(cleanup, rc);
1318
1319         cleanup_phase = 3; /* locks */
1320
1321         /* Step 3: Lookup the child */
1322         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
1323         if (IS_ERR(dchild)) {
1324                 rc = PTR_ERR(dchild);
1325                 if (rc != -EPERM && rc != -EACCES)
1326                         CERROR("child lookup error %d\n", rc);
1327                 GOTO(cleanup, rc);
1328         }
1329
1330         cleanup_phase = 4; /* child dentry */
1331
1332         if (dchild->d_inode) {
1333                 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
1334                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
1335                 rc = -EEXIST;
1336                 GOTO(cleanup, rc);
1337         }
1338
1339         /* Step 4: Do it. */
1340         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, de_src->d_inode->i_sb);
1341
1342         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
1343         if (IS_ERR(handle)) {
1344                 rc = PTR_ERR(handle);
1345                 GOTO(cleanup, rc);
1346         }
1347
1348         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
1349         if (rc && rc != -EPERM && rc != -EACCES)
1350                 CERROR("vfs_link error %d\n", rc);
1351 cleanup:
1352         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
1353                                 handle, req, rc, 0);
1354         EXIT;
1355
1356         switch (cleanup_phase) {
1357         case 4: /* child dentry */
1358                 l_dput(dchild);
1359         case 3: /* locks */
1360                 if (rc) {
1361                         ldlm_lock_decref(&src_lockh, LCK_EX);
1362                         ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
1363                 } else {
1364                         ldlm_put_lock_into_req(req, &src_lockh, LCK_EX);
1365                         ldlm_put_lock_into_req(req, &tgt_dir_lockh, LCK_EX);
1366                 }
1367         case 2: /* target dentry */
1368                 l_dput(de_tgt_dir);
1369         case 1: /* source dentry */
1370                 l_dput(de_src);
1371         case 0:
1372                 break;
1373         default:
1374                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1375                 LBUG();
1376         }
1377         req->rq_status = rc;
1378         return 0;
1379 }
1380
1381 /*
1382  * add a hard link in the PENDING directory, only used by rename()
1383  */
1384 static int mds_add_link_orphan(struct mds_update_record *rec,
1385                                struct obd_device *obd,
1386                                struct dentry *dentry)
1387 {
1388         struct mds_obd *mds = &obd->u.mds;
1389         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
1390         struct dentry *pending_child;
1391         char fidname[LL_FID_NAMELEN];
1392         int fidlen = 0, rc;
1393         ENTRY;
1394
1395         LASSERT(dentry->d_inode);
1396         LASSERT(!mds_inode_is_orphan(dentry->d_inode));
1397
1398         down(&pending_dir->i_sem);
1399         fidlen = ll_fid2str(fidname, dentry->d_inode->i_ino,
1400                             dentry->d_inode->i_generation);
1401
1402         CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
1403                mds_open_orphan_count(dentry->d_inode),
1404                rec->ur_name, fidname);
1405
1406         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
1407         if (IS_ERR(pending_child))
1408                 GOTO(out_lock, rc = PTR_ERR(pending_child));
1409
1410         if (pending_child->d_inode != NULL) {
1411                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
1412                 LASSERT(pending_child->d_inode == dentry->d_inode);
1413                 GOTO(out_dput, rc = 0);
1414         }
1415
1416         lock_kernel();
1417         rc = vfs_link(dentry, pending_dir, pending_child);
1418         unlock_kernel();
1419         if (rc)
1420                 CERROR("error addlink orphan %s to PENDING: rc = %d\n",
1421                        rec->ur_name, rc);
1422         else
1423                 mds_inode_set_orphan(dentry->d_inode);
1424 out_dput:
1425         l_dput(pending_child);
1426 out_lock:
1427         up(&pending_dir->i_sem);
1428         RETURN(rc);
1429 }
1430
1431 /* The idea here is that we need to get four locks in the end:
1432  * one on each parent directory, one on each child.  We need to take
1433  * these locks in some kind of order (to avoid deadlocks), and the order
1434  * I selected is "increasing resource number" order.  We need to look up
1435  * the children, however, before we know what the resource number(s) are.
1436  * Thus the following plan:
1437  *
1438  * 1,2. Look up the parents
1439  * 3,4. Look up the children
1440  * 5. Take locks on the parents and children, in order
1441  * 6. Verify that the children haven't changed since they were looked up
1442  *
1443  * If there was a race and the children changed since they were first looked
1444  * up, it is possible that mds_verify_child() will be able to just grab the
1445  * lock on the new child resource (if it has a higher resource than any other)
1446  * but we need to compare against not only its parent, but also against the
1447  * parent and child of the "other half" of the rename, hence maxres_{src,tgt}.
1448  *
1449  * We need the fancy igrab() on the child inodes because we aren't holding a
1450  * lock on the parent after the lookup is done, so dentry->d_inode may change
1451  * at any time, and igrab() itself doesn't like getting passed a NULL argument.
1452  */
1453 static int mds_get_parents_children_locked(struct obd_device *obd,
1454                                            struct mds_obd *mds,
1455                                            struct ll_fid *p1_fid,
1456                                            struct dentry **de_srcdirp,
1457                                            struct ll_fid *p2_fid,
1458                                            struct dentry **de_tgtdirp,
1459                                            int parent_mode,
1460                                            const char *old_name, int old_len,
1461                                            struct dentry **de_oldp,
1462                                            const char *new_name, int new_len,
1463                                            struct dentry **de_newp,
1464                                            struct lustre_handle *dlm_handles,
1465                                            int child_mode)
1466 {
1467         struct ldlm_res_id p1_res_id = { .name = {0} };
1468         struct ldlm_res_id p2_res_id = { .name = {0} };
1469         struct ldlm_res_id c1_res_id = { .name = {0} };
1470         struct ldlm_res_id c2_res_id = { .name = {0} };
1471         struct ldlm_res_id *maxres_src, *maxres_tgt;
1472         struct inode *inode;
1473         int rc = 0, cleanup_phase = 0;
1474         ENTRY;
1475
1476         /* Step 1: Lookup the source directory */
1477         *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
1478         if (IS_ERR(*de_srcdirp))
1479                 GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
1480
1481         cleanup_phase = 1; /* source directory dentry */
1482
1483         p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
1484         p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
1485
1486         /* Step 2: Lookup the target directory */
1487         if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
1488                 *de_tgtdirp = dget(*de_srcdirp);
1489         } else {
1490                 *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
1491                 if (IS_ERR(*de_tgtdirp))
1492                         GOTO(cleanup, rc = PTR_ERR(*de_tgtdirp));
1493         }
1494
1495         cleanup_phase = 2; /* target directory dentry */
1496
1497         p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
1498         p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
1499
1500         /* Step 3: Lookup the source child entry */
1501         *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
1502         if (IS_ERR(*de_oldp)) {
1503                 rc = PTR_ERR(*de_oldp);
1504                 CERROR("old child lookup error (%*s): %d\n",
1505                        old_len - 1, old_name, rc);
1506                 GOTO(cleanup, rc);
1507         }
1508
1509         cleanup_phase = 3; /* original name dentry */
1510
1511         inode = (*de_oldp)->d_inode;
1512         if (inode != NULL)
1513                 inode = igrab(inode);
1514         if (inode == NULL)
1515                 GOTO(cleanup, rc = -ENOENT);
1516
1517         c1_res_id.name[0] = inode->i_ino;
1518         c1_res_id.name[1] = inode->i_generation;
1519         iput(inode);
1520
1521         /* Step 4: Lookup the target child entry */
1522         *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
1523         if (IS_ERR(*de_newp)) {
1524                 rc = PTR_ERR(*de_newp);
1525                 CERROR("new child lookup error (%*s): %d\n",
1526                        old_len - 1, old_name, rc);
1527                 GOTO(cleanup, rc);
1528         }
1529
1530         cleanup_phase = 4; /* target dentry */
1531
1532         inode = (*de_newp)->d_inode;
1533         if (inode != NULL)
1534                 inode = igrab(inode);
1535         if (inode == NULL)
1536                 goto retry_locks;
1537
1538         c2_res_id.name[0] = inode->i_ino;
1539         c2_res_id.name[1] = inode->i_generation;
1540         iput(inode);
1541
1542 retry_locks:
1543         /* Step 5: Take locks on the parents and child(ren) */
1544         maxres_src = &p1_res_id;
1545         maxres_tgt = &p2_res_id;
1546         cleanup_phase = 4; /* target dentry */
1547
1548         if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
1549                 maxres_src = &c1_res_id;
1550         if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
1551                 maxres_tgt = &c2_res_id;
1552
1553         rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
1554                                     &p2_res_id, &dlm_handles[1], parent_mode,
1555                                     &c1_res_id, &dlm_handles[2], child_mode,
1556                                     &c2_res_id, &dlm_handles[3], child_mode);
1557         if (rc)
1558                 GOTO(cleanup, rc);
1559
1560         cleanup_phase = 6; /* parent and child(ren) locks */
1561
1562         /* Step 6a: Re-lookup source child to verify it hasn't changed */
1563         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
1564                               parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
1565                               child_mode, old_name, old_len, maxres_tgt);
1566         if (rc) {
1567                 if (c2_res_id.name[0] != 0)
1568                         ldlm_lock_decref(&dlm_handles[3], child_mode);
1569                 ldlm_lock_decref(&dlm_handles[1], parent_mode);
1570                 cleanup_phase = 4;
1571                 if (rc > 0)
1572                         goto retry_locks;
1573                 GOTO(cleanup, rc);
1574         }
1575
1576         if ((*de_oldp)->d_inode == NULL)
1577                 GOTO(cleanup, rc = -ENOENT);
1578
1579         /* Step 6b: Re-lookup target child to verify it hasn't changed */
1580         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
1581                               parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
1582                               child_mode, new_name, new_len, maxres_src);
1583         if (rc) {
1584                 ldlm_lock_decref(&dlm_handles[2], child_mode);
1585                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
1586                 cleanup_phase = 4;
1587                 if (rc > 0)
1588                         goto retry_locks;
1589                 GOTO(cleanup, rc);
1590         }
1591
1592         EXIT;
1593 cleanup:
1594         if (rc) {
1595                 switch (cleanup_phase) {
1596                 case 6: /* child lock(s) */
1597                         if (c2_res_id.name[0] != 0)
1598                                 ldlm_lock_decref(&dlm_handles[3], child_mode);
1599                         if (c1_res_id.name[0] != 0)
1600                                 ldlm_lock_decref(&dlm_handles[2], child_mode);
1601                 case 5: /* parent locks */
1602                         ldlm_lock_decref(&dlm_handles[1], parent_mode);
1603                         ldlm_lock_decref(&dlm_handles[0], parent_mode);
1604                 case 4: /* target dentry */
1605                         l_dput(*de_newp);
1606                 case 3: /* source dentry */
1607                         l_dput(*de_oldp);
1608                 case 2: /* target directory dentry */
1609                         l_dput(*de_tgtdirp);
1610                 case 1: /* source directry dentry */
1611                         l_dput(*de_srcdirp);
1612                 }
1613         }
1614
1615         return rc;
1616 }
1617
1618 static int mds_reint_rename(struct mds_update_record *rec, int offset,
1619                             struct ptlrpc_request *req,
1620                             struct lustre_handle *lockh)
1621 {
1622         struct obd_device *obd = req->rq_export->exp_obd;
1623         struct dentry *de_srcdir = NULL;
1624         struct dentry *de_tgtdir = NULL;
1625         struct dentry *de_old = NULL;
1626         struct dentry *de_new = NULL;
1627         struct mds_obd *mds = mds_req2mds(req);
1628         struct lustre_handle dlm_handles[4];
1629         struct mds_body *body = NULL;
1630         int rc = 0, lock_count = 3;
1631         int cleanup_phase = 0;
1632         void *handle = NULL;
1633         ENTRY;
1634
1635         LASSERT(offset == 0);
1636
1637         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
1638                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
1639                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
1640
1641         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1642
1643         rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
1644                                              rec->ur_fid2, &de_tgtdir, LCK_PW,
1645                                              rec->ur_name, rec->ur_namelen,
1646                                              &de_old, rec->ur_tgt,
1647                                              rec->ur_tgtlen, &de_new,
1648                                              dlm_handles, LCK_EX);
1649         if (rc)
1650                 GOTO(cleanup, rc);
1651
1652         cleanup_phase = 1; /* parent(s), children, locks */
1653
1654         if (de_new->d_inode)
1655                 lock_count = 4;
1656
1657         /* sanity check for src inode */
1658         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1659             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1660                 GOTO(cleanup, rc = -EINVAL);
1661
1662         /* sanity check for dest inode */
1663         if (de_new->d_inode &&
1664             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1665              de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1666                 GOTO(cleanup, rc = -EINVAL);
1667
1668         /* if we are about to remove the target at first, pass the EA of
1669          * that inode to client to perform and cleanup on OST */
1670         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
1671         LASSERT(body != NULL);
1672
1673         if (de_new->d_inode &&
1674             S_ISREG(de_new->d_inode->i_mode) &&
1675             de_new->d_inode->i_nlink == 1 &&
1676             mds_open_orphan_count(de_new->d_inode) == 0) {
1677                 mds_pack_inode2fid(&body->fid1, de_new->d_inode);
1678                 mds_pack_inode2body(body, de_new->d_inode);
1679                 mds_pack_md(obd, req->rq_repmsg, 1, body, de_new->d_inode, 1);
1680                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1681                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1682                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1683                 } else {
1684                         /* XXX need log unlink? */
1685                 }
1686         }
1687
1688         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1689                        de_srcdir->d_inode->i_sb);
1690
1691         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
1692         if (IS_ERR(handle))
1693                 GOTO(cleanup, rc = PTR_ERR(handle));
1694
1695         /* FIXME need adjust the journal block count? */
1696         /* if the target should be moved to PENDING, we at first increase the
1697          * link and later vfs_rename() will decrease the link count again */
1698         if (de_new->d_inode &&
1699             S_ISREG(de_new->d_inode->i_mode) &&
1700             de_new->d_inode->i_nlink == 1 &&
1701             mds_open_orphan_count(de_new->d_inode) > 0) {
1702                 rc = mds_add_link_orphan(rec, obd, de_new);
1703                 if (rc)
1704                         GOTO(cleanup, rc);
1705         }
1706
1707         lock_kernel();
1708         de_old->d_fsdata = req;
1709         de_new->d_fsdata = req;
1710         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
1711         unlock_kernel();
1712
1713         GOTO(cleanup, rc);
1714 cleanup:
1715         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1716                                 handle, req, rc, 0);
1717         switch (cleanup_phase) {
1718         case 1:
1719                 if (rc) {
1720                         if (lock_count == 4)
1721                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1722                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1723                         ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
1724                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
1725                 } else {
1726                         if (lock_count == 4)
1727                                 ldlm_put_lock_into_req(req,
1728                                                     &(dlm_handles[3]), LCK_EX);
1729                         ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
1730                         ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_PW);
1731                         ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_PW);
1732                 }
1733                 l_dput(de_new);
1734                 l_dput(de_old);
1735                 l_dput(de_tgtdir);
1736                 l_dput(de_srcdir);
1737         case 0:
1738                 break;
1739         default:
1740                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1741                 LBUG();
1742         }
1743         req->rq_status = rc;
1744         return 0;
1745 }
1746
1747 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
1748                            struct ptlrpc_request *, struct lustre_handle *);
1749
1750 static mds_reinter reinters[REINT_MAX + 1] = {
1751         [REINT_SETATTR] mds_reint_setattr,
1752         [REINT_CREATE] mds_reint_create,
1753         [REINT_UNLINK] mds_reint_unlink,
1754         [REINT_LINK] mds_reint_link,
1755         [REINT_RENAME] mds_reint_rename,
1756         [REINT_OPEN] mds_open
1757 };
1758
1759 int mds_reint_rec(struct mds_update_record *rec, int offset,
1760                   struct ptlrpc_request *req, struct lustre_handle *lockh)
1761 {
1762         struct obd_device *obd = req->rq_export->exp_obd;
1763         struct obd_run_ctxt saved;
1764         int rc;
1765         ENTRY;
1766
1767         /* checked by unpacker */
1768         LASSERT(rec->ur_opcode <= REINT_MAX &&
1769                 reinters[rec->ur_opcode] != NULL);
1770
1771         push_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
1772         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1773         pop_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
1774
1775         RETURN(rc);
1776 }