Whamcloud - gitweb
b44dc2210e724838a0b76a7f49f792180eb492d0
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/fs.h>
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include <linux/obd.h>
37 #include <linux/lustre_lib.h>
38 #include <linux/lustre_idl.h>
39 #include <linux/lustre_mds.h>
40 #include <linux/lustre_dlm.h>
41 #include <linux/lustre_fsfilt.h>
42
43 #include "mds_internal.h"
44
45 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
46                    int error)
47 {
48         obd_transno_commit_cb(obd, transno, error);
49 }
50
51 struct mds_logcancel_data {
52         struct lov_mds_md      *mlcd_lmm;
53         int                     mlcd_size;
54         int                     mlcd_cookielen;
55         int                     mlcd_eadatalen;
56         struct llog_cookie      mlcd_cookies[0];
57 };
58
59
60 static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
61                                   void *cb_data, int error)
62 {
63         struct mds_logcancel_data *mlcd = cb_data;
64         struct lov_stripe_md *lsm = NULL;
65         struct llog_ctxt *ctxt;
66         int rc;
67
68         obd_transno_commit_cb(obd, transno, error);
69
70         CDEBUG(D_HA, "cancelling %d cookies\n",
71                (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
72
73         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, mlcd->mlcd_lmm,
74                           mlcd->mlcd_eadatalen);
75         if (rc < 0) {
76                 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
77                        (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
78                        rc);
79         } else {
80                 ///* XXX 0 normally, SENDNOW for debug */);
81                 ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1);
82                 rc = llog_cancel(ctxt, lsm,
83                                          mlcd->mlcd_cookielen /
84                                          sizeof(*mlcd->mlcd_cookies),
85                                          mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
86                 if (rc)
87                         CERROR("error cancelling %d log cookies: rc %d\n",
88                                (int)(mlcd->mlcd_cookielen /
89                                      sizeof(*mlcd->mlcd_cookies)), rc);
90         }
91
92         OBD_FREE(mlcd, mlcd->mlcd_size);
93 }
94
95 /* Assumes caller has already pushed us into the kernel context. */
96 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
97                        struct ptlrpc_request *req, int rc, __u32 op_data)
98 {
99         struct mds_export_data *med = &req->rq_export->exp_mds_data;
100         struct mds_client_data *mcd = med->med_mcd;
101         struct obd_device *obd = req->rq_export->exp_obd;
102         int err;
103         __u64 transno;
104         loff_t off;
105         int log_pri = D_HA;
106         ENTRY;
107
108         /* if the export has already been failed, we have no last_rcvd slot */
109         if (req->rq_export->exp_failed) {
110                 CERROR("committing transaction for disconnected client\n");
111                 if (handle)
112                         GOTO(commit, rc);
113                 RETURN(rc);
114         }
115
116         if (IS_ERR(handle))
117                 RETURN(rc);
118
119         if (handle == NULL) {
120                 /* if we're starting our own xaction, use our own inode */
121                 inode = mds->mds_rcvd_filp->f_dentry->d_inode;
122                 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
123                 if (IS_ERR(handle)) {
124                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
125                         RETURN(PTR_ERR(handle));
126                 }
127         }
128
129         off = med->med_off;
130
131         transno = req->rq_reqmsg->transno;
132         if (transno == 0) {
133                 spin_lock(&mds->mds_transno_lock);
134                 transno = ++mds->mds_last_transno;
135                 spin_unlock(&mds->mds_transno_lock);
136         } else {
137                 spin_lock(&mds->mds_transno_lock);
138                 if (transno > mds->mds_last_transno)
139                         mds->mds_last_transno = transno;
140                 spin_unlock(&mds->mds_transno_lock);
141         }
142         req->rq_repmsg->transno = req->rq_transno = transno;
143         mcd->mcd_last_transno = cpu_to_le64(transno);
144         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
145         mcd->mcd_last_result = cpu_to_le32(rc);
146         mcd->mcd_last_data = cpu_to_le32(op_data);
147
148         fsfilt_add_journal_cb(req->rq_export->exp_obd, transno, handle,
149                               mds_commit_cb, NULL);
150         err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
151                                   &off, 0);
152
153         if (err) {
154                 log_pri = D_ERROR;
155                 if (rc == 0)
156                         rc = err;
157         }
158
159         DEBUG_REQ(log_pri, req,
160                   "wrote trans #"LPU64" client %s at idx %u: err = %d",
161                   transno, mcd->mcd_uuid, med->med_idx, err);
162
163         err = mds_lov_write_objids(obd);
164         if (err) {
165                 log_pri = D_ERROR;
166                 if (rc == 0)
167                         rc = err;
168         }
169         CDEBUG(log_pri, "wrote objids: err = %d\n", err);
170
171 commit:
172         err = fsfilt_commit(obd, inode, handle, 0);
173         if (err) {
174                 CERROR("error committing transaction: %d\n", err);
175                 if (!rc)
176                         rc = err;
177         }
178
179         RETURN(rc);
180 }
181
182 /* this gives the same functionality as the code between
183  * sys_chmod and inode_setattr
184  * chown_common and inode_setattr
185  * utimes and inode_setattr
186  */
187 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
188 {
189         time_t now = LTIME_S(CURRENT_TIME);
190         struct iattr *attr = &rec->ur_iattr;
191         unsigned int ia_valid = attr->ia_valid;
192         int error;
193         ENTRY;
194
195         /* only fix up attrs if the client VFS didn't already */
196         if (!(ia_valid & ATTR_RAW))
197                 RETURN(0);
198
199         if (!(ia_valid & ATTR_CTIME_SET))
200                 LTIME_S(attr->ia_ctime) = now;
201         if (!(ia_valid & ATTR_ATIME_SET))
202                 LTIME_S(attr->ia_atime) = now;
203         if (!(ia_valid & ATTR_MTIME_SET))
204                 LTIME_S(attr->ia_mtime) = now;
205
206         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
207                 RETURN(-EPERM);
208
209         /* times */
210         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
211                 if (rec->ur_fsuid != inode->i_uid &&
212                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
213                         RETURN(error);
214         }
215
216         if (ia_valid & ATTR_SIZE) {
217                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
218                         RETURN(error);
219         }
220
221         if (ia_valid & ATTR_UID) {
222                 /* chown */
223                 error = -EPERM;
224                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
225                         RETURN(-EPERM);
226                 if (attr->ia_uid == (uid_t) -1)
227                         attr->ia_uid = inode->i_uid;
228                 if (attr->ia_gid == (gid_t) -1)
229                         attr->ia_gid = inode->i_gid;
230                 attr->ia_mode = inode->i_mode;
231                 /*
232                  * If the user or group of a non-directory has been
233                  * changed by a non-root user, remove the setuid bit.
234                  * 19981026 David C Niemi <niemi@tux.org>
235                  *
236                  * Changed this to apply to all users, including root,
237                  * to avoid some races. This is the behavior we had in
238                  * 2.0. The check for non-root was definitely wrong
239                  * for 2.2 anyway, as it should have been using
240                  * CAP_FSETID rather than fsuid -- 19990830 SD.
241                  */
242                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
243                     !S_ISDIR(inode->i_mode)) {
244                         attr->ia_mode &= ~S_ISUID;
245                         attr->ia_valid |= ATTR_MODE;
246                 }
247                 /*
248                  * Likewise, if the user or group of a non-directory
249                  * has been changed by a non-root user, remove the
250                  * setgid bit UNLESS there is no group execute bit
251                  * (this would be a file marked for mandatory
252                  * locking).  19981026 David C Niemi <niemi@tux.org>
253                  *
254                  * Removed the fsuid check (see the comment above) --
255                  * 19990830 SD.
256                  */
257                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
258                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
259                         attr->ia_mode &= ~S_ISGID;
260                         attr->ia_valid |= ATTR_MODE;
261                 }
262         } else if (ia_valid & ATTR_MODE) {
263                 int mode = attr->ia_mode;
264                 /* chmod */
265                 if (attr->ia_mode == (mode_t) -1)
266                         attr->ia_mode = inode->i_mode;
267                 attr->ia_mode =
268                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
269         }
270         RETURN(0);
271 }
272
273 void mds_steal_ack_locks(struct ptlrpc_request *req)
274 {
275         struct obd_export         *exp = req->rq_export;
276         struct list_head          *tmp;
277         struct ptlrpc_reply_state *oldrep;
278         struct ptlrpc_service     *svc;
279         unsigned long              flags;
280         int                        i;
281
282         /* CAVEAT EMPTOR: spinlock order */
283         spin_lock_irqsave (&exp->exp_lock, flags);
284         list_for_each (tmp, &exp->exp_outstanding_replies) {
285                 oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
286
287                 if (oldrep->rs_xid != req->rq_xid)
288                         continue;
289
290                 if (oldrep->rs_msg.opc != req->rq_reqmsg->opc)
291                         CERROR ("Resent req xid "LPX64" has mismatched opc: "
292                                 "new %d old %d\n", req->rq_xid,
293                                 req->rq_reqmsg->opc, oldrep->rs_msg.opc);
294
295                 svc = oldrep->rs_srv_ni->sni_service;
296                 spin_lock (&svc->srv_lock);
297
298                 list_del_init (&oldrep->rs_exp_list);
299
300                 CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
301                       " o%d NID"LPX64"\n",
302                       oldrep->rs_nlocks, oldrep, 
303                       oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
304                       exp->exp_connection->c_peer.peer_nid);
305
306                 for (i = 0; i < oldrep->rs_nlocks; i++)
307                         ptlrpc_save_lock(req, 
308                                          &oldrep->rs_locks[i],
309                                          oldrep->rs_modes[i]);
310                 oldrep->rs_nlocks = 0;
311
312                 DEBUG_REQ(D_HA, req, "stole locks for");
313                 ptlrpc_schedule_difficult_reply (oldrep);
314
315                 spin_unlock (&svc->srv_lock);
316                 spin_unlock_irqrestore (&exp->exp_lock, flags);
317                 return;
318         }
319         spin_unlock_irqrestore (&exp->exp_lock, flags);
320 }
321
322 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
323 {
324         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
325                   mcd->mcd_last_transno, mcd->mcd_last_result);
326         req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
327         req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
328
329         mds_steal_ack_locks(req);
330 }
331
332 static void reconstruct_reint_setattr(struct mds_update_record *rec,
333                                       int offset, struct ptlrpc_request *req)
334 {
335         struct mds_export_data *med = &req->rq_export->exp_mds_data;
336         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
337         struct dentry *de;
338         struct mds_body *body;
339
340         mds_req_from_mcd(req, med->med_mcd);
341
342         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
343         if (IS_ERR(de)) {
344                 LASSERT(PTR_ERR(de) == req->rq_status);
345                 return;
346         }
347
348         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
349         mds_pack_inode2fid(&body->fid1, de->d_inode);
350         mds_pack_inode2body(body, de->d_inode);
351
352         /* Don't return OST-specific attributes if we didn't just set them */
353         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
354                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
355         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
356                 body->valid |= OBD_MD_FLMTIME;
357         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
358                 body->valid |= OBD_MD_FLATIME;
359
360         l_dput(de);
361 }
362
363 /* In the raw-setattr case, we lock the child inode.
364  * In the write-back case or if being called from open, the client holds a lock
365  * already.
366  *
367  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
368 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
369                              struct ptlrpc_request *req,
370                              struct lustre_handle *lh)
371 {
372         struct mds_obd *mds = mds_req2mds(req);
373         struct obd_device *obd = req->rq_export->exp_obd;
374         struct mds_body *body;
375         struct dentry *de;
376         struct inode *inode = NULL;
377         struct lustre_handle lockh;
378         void *handle = NULL;
379         struct mds_logcancel_data *mlcd = NULL;
380         int rc = 0, cleanup_phase = 0, err, locked = 0;
381         ENTRY;
382
383         LASSERT(offset == 0);
384
385         DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x", rec->ur_fid1->id,
386                   rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
387
388         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
389
390         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
391                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
392                 if (IS_ERR(de))
393                         GOTO(cleanup, rc = PTR_ERR(de));
394         } else {
395                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
396                                            &lockh, NULL, 0);
397                 if (IS_ERR(de))
398                         GOTO(cleanup, rc = PTR_ERR(de));
399                 locked = 1;
400         }
401
402         cleanup_phase = 1;
403         inode = de->d_inode;
404         LASSERT(inode);
405         if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
406                 down(&inode->i_sem);
407
408         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
409
410         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
411         if (IS_ERR(handle))
412                 GOTO(cleanup, rc = PTR_ERR(handle));
413
414         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
415                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
416                        LTIME_S(rec->ur_iattr.ia_mtime),
417                        LTIME_S(rec->ur_iattr.ia_ctime));
418         rc = mds_fix_attr(inode, rec);
419         if (rc)
420                 GOTO(cleanup, rc);
421
422         if (rec->ur_iattr.ia_valid & ATTR_ATTR_FLAG)    /* ioctl */
423                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_SETFLAGS,
424                                       (long)&rec->ur_iattr.ia_attr_flags);
425         else                                            /* setattr */
426                 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
427
428         if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
429                 rc = fsfilt_set_md(obd, inode, handle,
430                                    rec->ur_eadata, rec->ur_eadatalen);
431         }
432
433         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
434         mds_pack_inode2fid(&body->fid1, inode);
435         mds_pack_inode2body(body, inode);
436
437         /* Don't return OST-specific attributes if we didn't just set them */
438         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
439                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
440         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
441                 body->valid |= OBD_MD_FLMTIME;
442         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
443                 body->valid |= OBD_MD_FLATIME;
444
445         if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
446                 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
447                           rec->ur_eadatalen);
448                 if (mlcd) {
449                         mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
450                                 rec->ur_eadatalen;
451                         mlcd->mlcd_eadatalen = rec->ur_eadatalen;
452                         mlcd->mlcd_cookielen = rec->ur_cookielen;
453                         mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
454                                 mlcd->mlcd_cookielen;
455                         memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
456                                mlcd->mlcd_cookielen);
457                         memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
458                                mlcd->mlcd_eadatalen);
459                 } else {
460                         CERROR("unable to allocate log cancel data\n");
461                 }
462         }
463         EXIT;
464  cleanup:
465         if (mlcd != NULL)
466                 fsfilt_add_journal_cb(req->rq_export->exp_obd, 0, handle,
467                                       mds_cancel_cookies_cb, mlcd);
468         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
469         switch (cleanup_phase) {
470         case 1:
471                 if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
472                         up(&inode->i_sem);
473                 l_dput(de);
474                 if (locked) {
475                         if (rc) {
476                                 ldlm_lock_decref(&lockh, LCK_PW);
477                         } else {
478                                 ptlrpc_save_lock (req, &lockh, LCK_PW);
479                         }
480                 }
481         case 0:
482                 break;
483         default:
484                 LBUG();
485         }
486         if (err && !rc)
487                 rc = err;
488
489         req->rq_status = rc;
490         return 0;
491 }
492
493 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
494                                      struct ptlrpc_request *req)
495 {
496         struct mds_export_data *med = &req->rq_export->exp_mds_data;
497         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
498         struct dentry *parent, *child;
499         struct mds_body *body;
500
501         mds_req_from_mcd(req, med->med_mcd);
502
503         if (req->rq_status)
504                 return;
505
506         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
507         LASSERT(!IS_ERR(parent));
508         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
509         LASSERT(!IS_ERR(child));
510         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
511         mds_pack_inode2fid(&body->fid1, child->d_inode);
512         mds_pack_inode2body(body, child->d_inode);
513         l_dput(parent);
514         l_dput(child);
515 }
516
517 static int mds_reint_create(struct mds_update_record *rec, int offset,
518                             struct ptlrpc_request *req,
519                             struct lustre_handle *lh)
520 {
521         struct dentry *dparent = NULL;
522         struct mds_obd *mds = mds_req2mds(req);
523         struct obd_device *obd = req->rq_export->exp_obd;
524         struct dentry *dchild = NULL;
525         struct inode *dir = NULL;
526         void *handle = NULL;
527         struct lustre_handle lockh;
528         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
529         int created = 0;
530         struct dentry_params dp;
531         ENTRY;
532
533         LASSERT(offset == 0);
534         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
535
536         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o",
537                   rec->ur_fid1->id, rec->ur_fid1->generation,
538                   rec->ur_name, rec->ur_mode);
539
540         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
541
542         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
543                 GOTO(cleanup, rc = -ESTALE);
544
545         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh,
546                                         rec->ur_name, rec->ur_namelen - 1);
547         if (IS_ERR(dparent)) {
548                 rc = PTR_ERR(dparent);
549                 CERROR("parent lookup error %d\n", rc);
550                 GOTO(cleanup, rc);
551         }
552         cleanup_phase = 1; /* locked parent dentry */
553         dir = dparent->d_inode;
554         LASSERT(dir);
555
556         ldlm_lock_dump_handle(D_OTHER, &lockh);
557
558         dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
559         if (IS_ERR(dchild)) {
560                 rc = PTR_ERR(dchild);
561                 CERROR("child lookup error %d\n", rc);
562                 GOTO(cleanup, rc);
563         }
564
565         cleanup_phase = 2; /* child dentry */
566
567         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb);
568
569         if (dir->i_mode & S_ISGID) {
570                 if (S_ISDIR(rec->ur_mode))
571                         rec->ur_mode |= S_ISGID;
572         }
573
574         dchild->d_fsdata = (void *)&dp;
575         dp.p_inum = (unsigned long)rec->ur_fid2->id;
576         dp.p_ptr = req;
577
578         switch (type) {
579         case S_IFREG:{
580                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
581                 if (IS_ERR(handle))
582                         GOTO(cleanup, rc = PTR_ERR(handle));
583                 rc = ll_vfs_create(dir, dchild, rec->ur_mode, NULL);
584                 EXIT;
585                 break;
586         }
587         case S_IFDIR:{
588                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
589                 if (IS_ERR(handle))
590                         GOTO(cleanup, rc = PTR_ERR(handle));
591                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
592                 EXIT;
593                 break;
594         }
595         case S_IFLNK:{
596                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
597                 if (IS_ERR(handle))
598                         GOTO(cleanup, rc = PTR_ERR(handle));
599                 if (rec->ur_tgt == NULL)        /* no target supplied */
600                         rc = -EINVAL;           /* -EPROTO? */
601                 else
602                         rc = vfs_symlink(dir, dchild, rec->ur_tgt);
603                 EXIT;
604                 break;
605         }
606         case S_IFCHR:
607         case S_IFBLK:
608         case S_IFIFO:
609         case S_IFSOCK:{
610                 int rdev = rec->ur_rdev;
611                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
612                 if (IS_ERR(handle))
613                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
614                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
615                 EXIT;
616                 break;
617         }
618         default:
619                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
620                 dchild->d_fsdata = NULL;
621                 GOTO(cleanup, rc = -EINVAL);
622         }
623
624         /* In case we stored the desired inum in here, we want to clean up. */
625         if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
626                 dchild->d_fsdata = NULL;
627
628         if (rc) {
629                 CDEBUG(D_INODE, "error during create: %d\n", rc);
630                 GOTO(cleanup, rc);
631         } else {
632                 struct iattr iattr;
633                 struct inode *inode = dchild->d_inode;
634                 struct mds_body *body;
635
636                 created = 1;
637                 LTIME_S(iattr.ia_atime) = rec->ur_time;
638                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
639                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
640                 iattr.ia_uid = rec->ur_fsuid;
641                 if (dir->i_mode & S_ISGID)
642                         iattr.ia_gid = dir->i_gid;
643                 else
644                         iattr.ia_gid = rec->ur_fsgid;
645                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
646                         ATTR_MTIME | ATTR_CTIME;
647
648                 if (rec->ur_fid2->id) {
649                         LASSERT(rec->ur_fid2->id == inode->i_ino);
650                         inode->i_generation = rec->ur_fid2->generation;
651                         /* Dirtied and committed by the upcoming setattr. */
652                         CDEBUG(D_INODE, "recreated ino %lu with gen %u\n",
653                                inode->i_ino, inode->i_generation);
654                 } else {
655                         struct lustre_handle child_ino_lockh;
656                         struct ldlm_res_id child_res_id =
657                              { .name = { inode->i_ino, 0 } };
658                         int lock_flags = 0;
659
660                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
661                                inode->i_ino, inode->i_generation);
662
663                         /* The inode we were allocated may have just been freed
664                          * by an unlink operation.  We take this lock to
665                          * synchronize against the matching reply-ack-lock taken
666                          * in unlink, to avoid replay problems if this reply
667                          * makes it out to the client but the unlink's does not.
668                          * See bug 2029 for more detail.*/
669                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
670                                               NULL, child_res_id, LDLM_PLAIN,
671                                               NULL, 0, LCK_EX, &lock_flags,
672                                               ldlm_completion_ast,
673                                               mds_blocking_ast, NULL,
674                                               &child_ino_lockh);
675                         if (rc != ELDLM_OK) {
676                                 CERROR("error locking for unlink/create sync: "
677                                        "%d\n", rc);
678                         } else {
679                                 ldlm_lock_decref(&child_ino_lockh, LCK_EX);
680                         }
681                 }
682
683                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
684                 if (rc)
685                         CERROR("error on child setattr: rc = %d\n", rc);
686
687                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
688                 rc = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
689                 if (rc)
690                         CERROR("error on parent setattr: rc = %d\n", rc);
691
692                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
693                 mds_pack_inode2fid(&body->fid1, inode);
694                 mds_pack_inode2body(body, inode);
695         }
696         EXIT;
697
698 cleanup:
699         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
700
701         if (rc && created) {
702                 /* Destroy the file we just created.  This should not need
703                  * extra journal credits, as we have already modified all of
704                  * the blocks needed in order to create the file in the first
705                  * place.
706                  */
707                 switch (type) {
708                 case S_IFDIR:
709                         err = vfs_rmdir(dir, dchild);
710                         if (err)
711                                 CERROR("rmdir in error path: %d\n", err);
712                         break;
713                 default:
714                         err = vfs_unlink(dir, dchild);
715                         if (err)
716                                 CERROR("unlink in error path: %d\n", err);
717                         break;
718                 }
719         } else {
720                 rc = err;
721         }
722         switch (cleanup_phase) {
723         case 2: /* child dentry */
724                 l_dput(dchild);
725         case 1: /* locked parent dentry */
726                 if (rc) {
727                         ldlm_lock_decref(&lockh, LCK_PW);
728                 } else {
729                         ptlrpc_save_lock (req, &lockh, LCK_PW);
730                 }
731                 l_dput(dparent);
732         case 0:
733                 break;
734         default:
735                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
736                 LBUG();
737         }
738         req->rq_status = rc;
739         return 0;
740 }
741
742 int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
743 {
744         int i;
745
746         for (i = 0; i < RES_NAME_SIZE; i++) {
747                 /* return 1 here, because enqueue_ordered will skip resources
748                  * of all zeroes if they're sorted to the end of the list. */
749                 if (res1->name[i] == 0 && res2->name[i] != 0)
750                         return 1;
751                 if (res2->name[i] == 0 && res1->name[i] != 0)
752                         return 0;
753
754                 if (res1->name[i] > res2->name[i])
755                         return 1;
756                 if (res1->name[i] < res2->name[i])
757                         return 0;
758         }
759         return 0;
760 }
761
762 /* This function doesn't use ldlm_match_or_enqueue because we're always called
763  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
764  * because they take the place of local semaphores.
765  *
766  * One or two locks are taken in numerical order.  A res_id->name[0] of 0 means
767  * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
768 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
769                           struct lustre_handle *p1_lockh, int p1_lock_mode,
770                           struct ldlm_res_id *p2_res_id,
771                           struct lustre_handle *p2_lockh, int p2_lock_mode)
772 {
773         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
774         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
775         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
776         int rc, flags;
777         ENTRY;
778
779         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
780
781         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
782                res_id[0]->name[0], res_id[1]->name[0]);
783
784         if (res_gt(p1_res_id, p2_res_id)) {
785                 handles[1] = p1_lockh;
786                 handles[0] = p2_lockh;
787                 res_id[1] = p1_res_id;
788                 res_id[0] = p2_res_id;
789                 lock_modes[1] = p1_lock_mode;
790                 lock_modes[0] = p2_lock_mode;
791         }
792
793         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
794                res_id[0]->name[0], res_id[1]->name[0]);
795
796         flags = LDLM_FL_LOCAL_ONLY;
797         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, *res_id[0],
798                               LDLM_PLAIN, NULL, 0, lock_modes[0], &flags,
799                               ldlm_completion_ast, mds_blocking_ast, NULL,
800                               handles[0]);
801         if (rc != ELDLM_OK)
802                 RETURN(-EIO);
803         ldlm_lock_dump_handle(D_OTHER, handles[0]);
804
805         if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
806                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
807                 ldlm_lock_addref(handles[1], lock_modes[1]);
808         } else if (res_id[1]->name[0] != 0) {
809                 flags = LDLM_FL_LOCAL_ONLY;
810                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
811                                       *res_id[1], LDLM_PLAIN, NULL, 0,
812                                       lock_modes[1], &flags,ldlm_completion_ast,
813                                       mds_blocking_ast, NULL, handles[1]);
814                 if (rc != ELDLM_OK) {
815                         ldlm_lock_decref(handles[0], lock_modes[0]);
816                         RETURN(-EIO);
817                 }
818                 ldlm_lock_dump_handle(D_OTHER, handles[1]);
819         }
820
821         RETURN(0);
822 }
823
824 int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
825                            struct lustre_handle *p1_lockh, int p1_lock_mode,
826                            struct ldlm_res_id *p2_res_id,
827                            struct lustre_handle *p2_lockh, int p2_lock_mode,
828                            struct ldlm_res_id *c1_res_id,
829                            struct lustre_handle *c1_lockh, int c1_lock_mode,
830                            struct ldlm_res_id *c2_res_id,
831                            struct lustre_handle *c2_lockh, int c2_lock_mode)
832 {
833         struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
834                                           c1_res_id, c2_res_id };
835         struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh,
836                                                  c1_lockh, c2_lockh };
837         int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
838                               c1_lock_mode, c2_lock_mode };
839         int rc, i, j, sorted, flags;
840         ENTRY;
841
842         CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
843                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
844                res_id[3]->name[0]);
845
846         /* simple insertion sort - we have at most 4 elements */
847         for (i = 1; i < 4; i++) {
848                 j = i - 1;
849                 dlm_handles[4] = dlm_handles[i];
850                 res_id[4] = res_id[i];
851                 lock_modes[4] = lock_modes[i];
852
853                 sorted = 0;
854                 do {
855                         if (res_gt(res_id[j], res_id[4])) {
856                                 dlm_handles[j + 1] = dlm_handles[j];
857                                 res_id[j + 1] = res_id[j];
858                                 lock_modes[j + 1] = lock_modes[j];
859                                 j--;
860                         } else {
861                                 sorted = 1;
862                         }
863                 } while (j >= 0 && !sorted);
864
865                 dlm_handles[j + 1] = dlm_handles[4];
866                 res_id[j + 1] = res_id[4];
867                 lock_modes[j + 1] = lock_modes[4];
868         }
869
870         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
871                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
872                res_id[3]->name[0]);
873
874         /* XXX we could send ASTs on all these locks first before blocking? */
875         for (i = 0; i < 4; i++) {
876                 flags = 0;
877                 if (res_id[i]->name[0] == 0)
878                         break;
879                 if (i != 0 &&
880                     memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
881                         memcpy(dlm_handles[i], dlm_handles[i-1],
882                                sizeof(*(dlm_handles[i])));
883                         ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
884                 } else {
885                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
886                                               NULL, *res_id[i], LDLM_PLAIN,
887                                               NULL, 0, lock_modes[i], &flags,
888                                               ldlm_completion_ast,
889                                               mds_blocking_ast, NULL,
890                                               dlm_handles[i]);
891                         if (rc != ELDLM_OK)
892                                 GOTO(out_err, rc = -EIO);
893                         ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
894                 }
895         }
896
897         RETURN(0);
898 out_err:
899         while (i-- > 0)
900                 ldlm_lock_decref(dlm_handles[i], lock_modes[i]);
901
902         return rc;
903 }
904
905 /* In the unlikely case that the child changed while we were waiting
906  * on the lock, we need to drop the lock on the old child and either:
907  * - if the child has a lower resource name, then we have to also
908  *   drop the parent lock and regain the locks in the right order
909  * - in the rename case, if the child has a lower resource name than one of
910  *   the other parent/child resources (maxres) we also need to reget the locks
911  * - if the child has a higher resource name (this is the common case)
912  *   we can just get the lock on the new child (still in lock order)
913  *
914  * Returns 0 if the child did not change or if it changed but could be locked.
915  * Returns 1 if the child changed and we need to re-lock (no locks held).
916  * Returns -ve error with a valid dchild (no locks held). */
917 static int mds_verify_child(struct obd_device *obd,
918                             struct ldlm_res_id *parent_res_id,
919                             struct lustre_handle *parent_lockh,
920                             struct dentry *dparent, int parent_mode,
921                             struct ldlm_res_id *child_res_id,
922                             struct lustre_handle *child_lockh,
923                             struct dentry **dchildp, int child_mode,
924                             const char *name, int namelen,
925                             struct ldlm_res_id *maxres)
926 {
927         struct dentry *vchild, *dchild = *dchildp;
928         int rc = 0, cleanup_phase = 2; /* parent, child locks */
929         ENTRY;
930
931         vchild = ll_lookup_one_len(name, dparent, namelen - 1);
932         if (IS_ERR(vchild))
933                 GOTO(cleanup, rc = PTR_ERR(vchild));
934
935         if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
936                    (vchild->d_inode != NULL &&
937                     child_res_id->name[0] == vchild->d_inode->i_ino &&
938                     child_res_id->name[1] == vchild->d_inode->i_generation))) {
939                 if (dchild != NULL)
940                         l_dput(dchild);
941                 *dchildp = vchild;
942
943                 RETURN(0);
944         }
945
946         CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
947                vchild->d_inode, dchild ? dchild->d_inode : 0,
948                vchild->d_inode ? vchild->d_inode->i_ino : 0,
949                child_res_id->name[0]);
950         if (child_res_id->name[0] != 0)
951                 ldlm_lock_decref(child_lockh, child_mode);
952         if (dchild)
953                 l_dput(dchild);
954
955         cleanup_phase = 1; /* parent lock only */
956         *dchildp = dchild = vchild;
957
958         if (dchild->d_inode) {
959                 int flags = 0;
960                 child_res_id->name[0] = dchild->d_inode->i_ino;
961                 child_res_id->name[1] = dchild->d_inode->i_generation;
962
963                 if (res_gt(parent_res_id, child_res_id) ||
964                     res_gt(maxres, child_res_id)) {
965                         CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
966                                child_res_id->name[0], parent_res_id->name[0],
967                                maxres->name[0]);
968                         GOTO(cleanup, rc = 1);
969                 }
970
971                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
972                                       NULL, *child_res_id, LDLM_PLAIN,
973                                       NULL, 0, child_mode, &flags,
974                                       ldlm_completion_ast, mds_blocking_ast,
975                                       NULL, child_lockh);
976                 if (rc != ELDLM_OK)
977                         GOTO(cleanup, rc = -EIO);
978         } else {
979                 memset(child_res_id, 0, sizeof(*child_res_id));
980         }
981
982         EXIT;
983 cleanup:
984         if (rc) {
985                 switch(cleanup_phase) {
986                 case 2:
987                         if (child_res_id->name[0] != 0)
988                                 ldlm_lock_decref(child_lockh, child_mode);
989                 case 1:
990                         ldlm_lock_decref(parent_lockh, parent_mode);
991                 }
992         }
993         return rc;
994 }
995
996 int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
997                                 struct ll_fid *fid,
998                                 struct lustre_handle *parent_lockh,
999                                 struct dentry **dparentp, int parent_mode,
1000                                 char *name, int namelen,
1001                                 struct lustre_handle *child_lockh,
1002                                 struct dentry **dchildp, int child_mode)
1003 {
1004         struct ldlm_res_id child_res_id = { .name = {0} };
1005         struct ldlm_res_id parent_res_id = { .name = {0} };
1006         struct inode *inode;
1007         int rc = 0, cleanup_phase = 0;
1008         ENTRY;
1009
1010         /* Step 1: Lookup parent */
1011         *dparentp = mds_fid2dentry(mds, fid, NULL);
1012         if (IS_ERR(*dparentp))
1013                 RETURN(rc = PTR_ERR(*dparentp));
1014         LASSERT((*dparentp)->d_inode);
1015
1016         CDEBUG(D_INODE, "parent ino %lu, name %s\n",
1017                (*dparentp)->d_inode->i_ino, name);
1018
1019         parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
1020         parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
1021
1022         cleanup_phase = 1; /* parent dentry */
1023
1024         /* Step 2: Lookup child (without DLM lock, to get resource name) */
1025         *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
1026         if (IS_ERR(*dchildp)) {
1027                 rc = PTR_ERR(*dchildp);
1028                 CDEBUG(D_INODE, "child lookup error %d\n", rc);
1029                 GOTO(cleanup, rc);
1030         }
1031
1032         inode = (*dchildp)->d_inode;
1033         if (inode != NULL)
1034                 inode = igrab(inode);
1035         if (inode == NULL)
1036                 goto retry_locks;
1037
1038         child_res_id.name[0] = inode->i_ino;
1039         child_res_id.name[1] = inode->i_generation;
1040         iput(inode);
1041
1042 retry_locks:
1043         cleanup_phase = 2; /* child dentry */
1044
1045         /* Step 3: Lock parent and child in resource order.  If child doesn't
1046          *         exist, we still have to lock the parent and re-lookup. */
1047         rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode,
1048                                    &child_res_id, child_lockh, child_mode);
1049         if (rc)
1050                 GOTO(cleanup, rc);
1051
1052         if (!(*dchildp)->d_inode)
1053                 cleanup_phase = 3; /* parent lock */
1054         else
1055                 cleanup_phase = 4; /* child lock */
1056
1057         /* Step 4: Re-lookup child to verify it hasn't changed since locking */
1058         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
1059                               parent_mode, &child_res_id, child_lockh, dchildp,
1060                               child_mode, name, namelen, &parent_res_id);
1061         if (rc > 0)
1062                 goto retry_locks;
1063         if (rc < 0) {
1064                 cleanup_phase = 3;
1065                 GOTO(cleanup, rc);
1066         }
1067
1068 cleanup:
1069         if (rc) {
1070                 switch (cleanup_phase) {
1071                 case 4:
1072                         ldlm_lock_decref(child_lockh, child_mode);
1073                 case 3:
1074                         ldlm_lock_decref(parent_lockh, parent_mode);
1075                 case 2:
1076                         l_dput(*dchildp);
1077                 case 1:
1078                         l_dput(*dparentp);
1079                 default: ;
1080                 }
1081         }
1082         return rc;
1083 }
1084
1085 void mds_reconstruct_generic(struct ptlrpc_request *req)
1086 {
1087         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1088
1089         mds_req_from_mcd(req, med->med_mcd);
1090 }
1091
/*
 * Reintegrate an unlink/rmdir: remove the name rec->ur_name from the parent
 * directory identified by rec->ur_fid1.
 *
 * rec    - unpacked update record (parent FID, name, requested mode, time)
 * offset - must be 0 or 2 (asserted).  A non-zero value means the call came
 *          via an intent, so reply buffer 0 holds the ldlm reply and the
 *          mds_body lives in reply buffer 1.
 * req    - the request; the result is stored in req->rq_status
 * lh     - not used by this function
 *
 * Resources are acquired in stages tracked by cleanup_phase and released in
 * reverse order by the switch at the "cleanup" label:
 *   1 - parent/child dentries and the parent lock
 *   2 - child lock
 *   3 - child inode-number ("reuse") lock
 *   4 - journal transaction
 * On success the parent and inode-reuse locks are handed to
 * ptlrpc_save_lock() rather than dropped directly.
 *
 * Always returns 0 from the code visible here; errors are reported through
 * req->rq_status.  NOTE(review): MDS_CHECK_RESENT is a macro defined
 * elsewhere and may return early for resent requests - confirm.
 */
1092 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
1093                             struct ptlrpc_request *req,
1094                             struct lustre_handle *lh)
1095 {
1096         struct dentry *dparent, *dchild;
1097         struct mds_obd *mds = mds_req2mds(req);
1098         struct obd_device *obd = req->rq_export->exp_obd;
1099         struct mds_body *body = NULL;
1100         struct inode *child_inode;
1101         struct lustre_handle parent_lockh, child_lockh, child_reuse_lockh;
1102         void *handle = NULL;
1103         int rc = 0, log_unlink = 0, cleanup_phase = 0;
1104         ENTRY;
1105
1106         LASSERT(offset == 0 || offset == 2);
1107
1108         DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
1109                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
1110
1111         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1112
1113         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1114                 GOTO(cleanup, rc = -ENOENT);
1115
             /* Steps 1-3 (lookups and ordered locking) happen in here */
1116         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
1117                                          &parent_lockh, &dparent, LCK_PW,
1118                                          rec->ur_name, rec->ur_namelen,
1119                                          &child_lockh, &dchild, LCK_EX);
1120         if (rc)
1121                 GOTO(cleanup, rc);
1122
1123         cleanup_phase = 1; /* dchild, dparent, locks */
1124
1125         child_inode = dchild->d_inode;
1126         if (child_inode == NULL) {
1127                 CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
1128                        dparent->d_inode->i_ino, rec->ur_name);
1129                 GOTO(cleanup, rc = -ENOENT);
1130         }
1131
1132         cleanup_phase = 2; /* dchild has a lock */
1133
1134         /* Step 4: Get a lock on the ino to sync with creation WRT inode
1135          * reuse (see bug 2029). */
1136         rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
1137         if (rc != ELDLM_OK)
1138                 GOTO(cleanup, rc);
1139
1140         cleanup_phase = 3; /* child inum lock */
1141
1142         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
1143
1144         /* ldlm_reply in buf[0] if called via intent */
1145         if (offset)
1146                 offset = 1;
1147
1148         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
1149         LASSERT(body != NULL);
1150
1151         /* If this is the last reference to this inode, get the OBD EA
1152          * data first so the client can destroy OST objects.
1153          * we only do the object removal if no open files remain.
1154          * Nobody can get at this name anymore because of the locks so
1155          * we make decisions here as to whether to remove the inode */
1156         if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
1157             mds_open_orphan_count(child_inode) == 0) {
1158                 mds_pack_inode2fid(&body->fid1, child_inode);
1159                 mds_pack_inode2body(body, child_inode);
1160                 mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
1161                             child_inode, 1);
                     /* No striping EA was packed: flag size/blocks/times in
                      * the body as valid.  Otherwise remember to write an
                      * unlink log record once the unlink has succeeded. */
1162                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1163                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1164                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1165                 } else {
1166                         log_unlink = 1;
1167                 }
1168         }
1169
1170         /* We have to do these checks ourselves, in case we are making an
1171          * orphan.  The client tells us whether rmdir() or unlink() was called,
1172          * so we need to return appropriate errors (bug 72).
1173          *
1174          * We don't have to check permissions, because vfs_rename (called from
1175          * mds_open_unlink_rename) also calls may_delete. */
1176         if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
1177                 if (!S_ISDIR(child_inode->i_mode))
1178                         GOTO(cleanup, rc = -ENOTDIR);
1179         } else {
1180                 if (S_ISDIR(child_inode->i_mode))
1181                         GOTO(cleanup, rc = -EISDIR);
1182         }
1183
             /* Last link (2 for a directory, 1 otherwise) of an inode whose
              * open-orphan count is non-zero: hand off to
              * mds_open_unlink_rename() instead of unlinking directly.  The
              * transaction handle it returns is finished in cleanup phase 4. */
1184         if (child_inode->i_nlink == (S_ISDIR(child_inode->i_mode) ? 2 : 1) &&
1185             mds_open_orphan_count(child_inode) > 0) {
1186                 rc = mds_open_unlink_rename(rec, obd, dparent, dchild, &handle);
1187                 cleanup_phase = 4; /* transaction */
1188                 GOTO(cleanup, rc);
1189         }
1190
1191         /* Step 5: Do the unlink: we already verified ur_mode above (bug 72) */
1192         switch (child_inode->i_mode & S_IFMT) {
1193         case S_IFDIR:
1194                 /* Drop any lingering child directories before we start our
1195                  * transaction, to avoid doing multiple inode dirty/delete
1196                  * in our compound transaction (bug 1321). */
1197                 shrink_dcache_parent(dchild);
1198                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
1199                                       NULL);
1200                 if (IS_ERR(handle))
1201                         GOTO(cleanup, rc = PTR_ERR(handle));
1202                 cleanup_phase = 4; /* transaction */
1203                 rc = vfs_rmdir(dparent->d_inode, dchild);
1204                 break;
1205         case S_IFREG: {
1206                 handle = fsfilt_start(obd, dparent->d_inode,
1207                                       FSFILT_OP_UNLINK_LOG, NULL);
1208                 if (IS_ERR(handle))
1209                         GOTO(cleanup, rc = PTR_ERR(handle));
1210
1211                 cleanup_phase = 4; /* transaction */
1212                 rc = vfs_unlink(dparent->d_inode, dchild);
1213
                     /* The EA is read from reply buffer offset + 1 and the
                      * buffer at offset + 2 is passed for the log cookies;
                      * a positive return marks the cookies valid. */
1214                 if (!rc && log_unlink)
1215                         if (mds_log_op_unlink(obd, child_inode,
1216                                 lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
1217                                 req->rq_repmsg->buflens[offset + 1],
1218                                 lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
1219                                 req->rq_repmsg->buflens[offset + 2]) > 0)
1220                                 body->valid |= OBD_MD_FLCOOKIE;
1221                 break;
1222         }
1223         case S_IFLNK:
1224         case S_IFCHR:
1225         case S_IFBLK:
1226         case S_IFIFO:
1227         case S_IFSOCK:
1228                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
1229                                       NULL);
1230                 if (IS_ERR(handle))
1231                         GOTO(cleanup, rc = PTR_ERR(handle));
1232                 cleanup_phase = 4; /* transaction */
1233                 rc = vfs_unlink(dparent->d_inode, dchild);
1234                 break;
1235         default:
1236                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
1237                        rec->ur_name);
1238                 LBUG();
1239                 GOTO(cleanup, rc = -EINVAL);
1240         }
1241
1242  cleanup:
             /* On success only: stamp the parent directory's mtime and ctime
              * with the time supplied in the record, using the same
              * transaction handle.  A failure here is logged, not returned. */
1243         if (rc == 0) {
1244                 struct iattr iattr;
1245                 int err;
1246
1247                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
1248                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
1249                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
1250
1251                 err = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
1252                 if (err)
1253                         CERROR("error on parent setattr: rc = %d\n", err);
1254         }
1255
             /* Release in reverse order of acquisition; every case falls
              * through to the ones below it on purpose. */
1256         switch(cleanup_phase) {
1257         case 4:
1258                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
1259                                         rc, 0);
1260                 if (!rc)
1261                         (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
1262                                            "unlinked", 0, NULL);
                     /* fallthrough */
1263         case 3: /* child ino-reuse lock */
1264                 if (rc && body != NULL) {
1265                         // Don't unlink the OST objects if the MDS unlink failed
1266                         body->valid = 0;
1267                 }
1268                 if (rc)
1269                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
1270                 else
1271                         ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
                     /* fallthrough */
1272         case 2: /* child lock */
1273                 ldlm_lock_decref(&child_lockh, LCK_EX);
                     /* fallthrough */
1274         case 1: /* child and parent dentry, parent lock */
1275                 if (rc)
1276                         ldlm_lock_decref(&parent_lockh, LCK_PW);
1277                 else
1278                         ptlrpc_save_lock(req, &parent_lockh, LCK_PW);
1279                 l_dput(dchild);
1280                 l_dput(dparent);
                     /* fallthrough */
1281         case 0:
1282                 break;
1283         default:
1284                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1285                 LBUG();
1286         }
1287         req->rq_status = rc;
1288         return 0;
1289 }
1290
1291 static int mds_reint_link(struct mds_update_record *rec, int offset,
1292                           struct ptlrpc_request *req,
1293                           struct lustre_handle *lh)
1294 {
1295         struct obd_device *obd = req->rq_export->exp_obd;
1296         struct dentry *de_src = NULL;
1297         struct dentry *de_tgt_dir = NULL;
1298         struct dentry *dchild = NULL;
1299         struct mds_obd *mds = mds_req2mds(req);
1300         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
1301         struct ldlm_res_id src_res_id = { .name = {0} };
1302         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
1303         int rc = 0, cleanup_phase = 0;
1304         ENTRY;
1305
1306         LASSERT(offset == 0);
1307
1308         DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
1309                   rec->ur_fid1->id, rec->ur_fid1->generation,
1310                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
1311
1312         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1313
1314         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1315                 GOTO(cleanup, rc = -ENOENT);
1316
1317         /* Step 1: Lookup the source inode and target directory by FID */
1318         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1319         if (IS_ERR(de_src))
1320                 GOTO(cleanup, rc = PTR_ERR(de_src));
1321
1322         cleanup_phase = 1; /* source dentry */
1323
1324         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1325         if (IS_ERR(de_tgt_dir))
1326                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
1327
1328         cleanup_phase = 2; /* target directory dentry */
1329
1330         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
1331                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
1332                de_src->d_inode->i_ino);
1333
1334         /* Step 2: Take the two locks */
1335         src_res_id.name[0] = de_src->d_inode->i_ino;
1336         src_res_id.name[1] = de_src->d_inode->i_generation;
1337         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
1338         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
1339
1340         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
1341                                    &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
1342         if (rc)
1343                 GOTO(cleanup, rc);
1344
1345         cleanup_phase = 3; /* locks */
1346
1347         /* Step 3: Lookup the child */
1348         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
1349         if (IS_ERR(dchild)) {
1350                 rc = PTR_ERR(dchild);
1351                 if (rc != -EPERM && rc != -EACCES)
1352                         CERROR("child lookup error %d\n", rc);
1353                 GOTO(cleanup, rc);
1354         }
1355
1356         cleanup_phase = 4; /* child dentry */
1357
1358         if (dchild->d_inode) {
1359                 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
1360                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
1361                 rc = -EEXIST;
1362                 GOTO(cleanup, rc);
1363         }
1364
1365         /* Step 4: Do it. */
1366         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, de_src->d_inode->i_sb);
1367
1368         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
1369         if (IS_ERR(handle)) {
1370                 rc = PTR_ERR(handle);
1371                 GOTO(cleanup, rc);
1372         }
1373
1374         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
1375         if (rc && rc != -EPERM && rc != -EACCES)
1376                 CERROR("vfs_link error %d\n", rc);
1377 cleanup:
1378         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
1379                                 handle, req, rc, 0);
1380         EXIT;
1381
1382         switch (cleanup_phase) {
1383         case 4: /* child dentry */
1384                 l_dput(dchild);
1385         case 3: /* locks */
1386                 if (rc) {
1387                         ldlm_lock_decref(&src_lockh, LCK_EX);
1388                         ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
1389                 } else {
1390                         ptlrpc_save_lock(req, &src_lockh, LCK_EX);
1391                         ptlrpc_save_lock(req, &tgt_dir_lockh, LCK_EX);
1392                 }
1393         case 2: /* target dentry */
1394                 l_dput(de_tgt_dir);
1395         case 1: /* source dentry */
1396                 l_dput(de_src);
1397         case 0:
1398                 break;
1399         default:
1400                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1401                 LBUG();
1402         }
1403         req->rq_status = rc;
1404         return 0;
1405 }
1406
1407 /*
1408  * add a hard link in the PENDING directory, only used by rename()
1409  */
1410 static int mds_add_link_orphan(struct mds_update_record *rec,
1411                                struct obd_device *obd,
1412                                struct dentry *dentry)
1413 {
1414         struct mds_obd *mds = &obd->u.mds;
1415         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
1416         struct dentry *pending_child;
1417         char fidname[LL_FID_NAMELEN];
1418         int fidlen = 0, rc;
1419         ENTRY;
1420
1421         LASSERT(dentry->d_inode);
1422         LASSERT(!mds_inode_is_orphan(dentry->d_inode));
1423
1424         down(&pending_dir->i_sem);
1425         fidlen = ll_fid2str(fidname, dentry->d_inode->i_ino,
1426                             dentry->d_inode->i_generation);
1427
1428         CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
1429                mds_open_orphan_count(dentry->d_inode),
1430                rec->ur_name, fidname);
1431
1432         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
1433         if (IS_ERR(pending_child))
1434                 GOTO(out_lock, rc = PTR_ERR(pending_child));
1435
1436         if (pending_child->d_inode != NULL) {
1437                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
1438                 LASSERT(pending_child->d_inode == dentry->d_inode);
1439                 GOTO(out_dput, rc = 0);
1440         }
1441
1442         lock_kernel();
1443         rc = vfs_link(dentry, pending_dir, pending_child);
1444         unlock_kernel();
1445         if (rc)
1446                 CERROR("error addlink orphan %s to PENDING: rc = %d\n",
1447                        rec->ur_name, rc);
1448         else
1449                 mds_inode_set_orphan(dentry->d_inode);
1450 out_dput:
1451         l_dput(pending_child);
1452 out_lock:
1453         up(&pending_dir->i_sem);
1454         RETURN(rc);
1455 }
1456
1457 /* The idea here is that we need to get four locks in the end:
1458  * one on each parent directory, one on each child.  We need to take
1459  * these locks in some kind of order (to avoid deadlocks), and the order
1460  * I selected is "increasing resource number" order.  We need to look up
1461  * the children, however, before we know what the resource number(s) are.
1462  * Thus the following plan:
1463  *
1464  * 1,2. Look up the parents
1465  * 3,4. Look up the children
1466  * 5. Take locks on the parents and children, in order
1467  * 6. Verify that the children haven't changed since they were looked up
1468  *
1469  * If there was a race and the children changed since they were first looked
1470  * up, it is possible that mds_verify_child() will be able to just grab the
1471  * lock on the new child resource (if it has a higher resource than any other)
1472  * but we need to compare against not only its parent, but also against the
1473  * parent and child of the "other half" of the rename, hence maxres_{src,tgt}.
1474  *
1475  * We need the fancy igrab() on the child inodes because we aren't holding a
1476  * lock on the parent after the lookup is done, so dentry->d_inode may change
1477  * at any time, and igrab() itself doesn't like getting passed a NULL argument.
1478  */
1479 static int mds_get_parents_children_locked(struct obd_device *obd,
1480                                            struct mds_obd *mds,
1481                                            struct ll_fid *p1_fid,
1482                                            struct dentry **de_srcdirp,
1483                                            struct ll_fid *p2_fid,
1484                                            struct dentry **de_tgtdirp,
1485                                            int parent_mode,
1486                                            const char *old_name, int old_len,
1487                                            struct dentry **de_oldp,
1488                                            const char *new_name, int new_len,
1489                                            struct dentry **de_newp,
1490                                            struct lustre_handle *dlm_handles,
1491                                            int child_mode)
1492 {
1493         struct ldlm_res_id p1_res_id = { .name = {0} };
1494         struct ldlm_res_id p2_res_id = { .name = {0} };
1495         struct ldlm_res_id c1_res_id = { .name = {0} };
1496         struct ldlm_res_id c2_res_id = { .name = {0} };
1497         struct ldlm_res_id *maxres_src, *maxres_tgt;
1498         struct inode *inode;
1499         int rc = 0, cleanup_phase = 0;
1500         ENTRY;
1501
1502         /* Step 1: Lookup the source directory */
1503         *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
1504         if (IS_ERR(*de_srcdirp))
1505                 GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
1506
1507         cleanup_phase = 1; /* source directory dentry */
1508
1509         p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
1510         p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
1511
1512         /* Step 2: Lookup the target directory */
1513         if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
1514                 *de_tgtdirp = dget(*de_srcdirp);
1515         } else {
1516                 *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
1517                 if (IS_ERR(*de_tgtdirp))
1518                         GOTO(cleanup, rc = PTR_ERR(*de_tgtdirp));
1519         }
1520
1521         cleanup_phase = 2; /* target directory dentry */
1522
1523         p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
1524         p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
1525
1526         /* Step 3: Lookup the source child entry */
1527         *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
1528         if (IS_ERR(*de_oldp)) {
1529                 rc = PTR_ERR(*de_oldp);
1530                 CERROR("old child lookup error (%*s): %d\n",
1531                        old_len - 1, old_name, rc);
1532                 GOTO(cleanup, rc);
1533         }
1534
1535         cleanup_phase = 3; /* original name dentry */
1536
1537         inode = (*de_oldp)->d_inode;
1538         if (inode != NULL)
1539                 inode = igrab(inode);
1540         if (inode == NULL)
1541                 GOTO(cleanup, rc = -ENOENT);
1542
1543         c1_res_id.name[0] = inode->i_ino;
1544         c1_res_id.name[1] = inode->i_generation;
1545         iput(inode);
1546
1547         /* Step 4: Lookup the target child entry */
1548         *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
1549         if (IS_ERR(*de_newp)) {
1550                 rc = PTR_ERR(*de_newp);
1551                 CERROR("new child lookup error (%*s): %d\n",
1552                        old_len - 1, old_name, rc);
1553                 GOTO(cleanup, rc);
1554         }
1555
1556         cleanup_phase = 4; /* target dentry */
1557
1558         inode = (*de_newp)->d_inode;
1559         if (inode != NULL)
1560                 inode = igrab(inode);
1561         if (inode == NULL)
1562                 goto retry_locks;
1563
1564         c2_res_id.name[0] = inode->i_ino;
1565         c2_res_id.name[1] = inode->i_generation;
1566         iput(inode);
1567
1568 retry_locks:
1569         /* Step 5: Take locks on the parents and child(ren) */
1570         maxres_src = &p1_res_id;
1571         maxres_tgt = &p2_res_id;
1572         cleanup_phase = 4; /* target dentry */
1573
1574         if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
1575                 maxres_src = &c1_res_id;
1576         if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
1577                 maxres_tgt = &c2_res_id;
1578
1579         rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
1580                                     &p2_res_id, &dlm_handles[1], parent_mode,
1581                                     &c1_res_id, &dlm_handles[2], child_mode,
1582                                     &c2_res_id, &dlm_handles[3], child_mode);
1583         if (rc)
1584                 GOTO(cleanup, rc);
1585
1586         cleanup_phase = 6; /* parent and child(ren) locks */
1587
1588         /* Step 6a: Re-lookup source child to verify it hasn't changed */
1589         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
1590                               parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
1591                               child_mode, old_name, old_len, maxres_tgt);
1592         if (rc) {
1593                 if (c2_res_id.name[0] != 0)
1594                         ldlm_lock_decref(&dlm_handles[3], child_mode);
1595                 ldlm_lock_decref(&dlm_handles[1], parent_mode);
1596                 cleanup_phase = 4;
1597                 if (rc > 0)
1598                         goto retry_locks;
1599                 GOTO(cleanup, rc);
1600         }
1601
1602         if ((*de_oldp)->d_inode == NULL)
1603                 GOTO(cleanup, rc = -ENOENT);
1604
1605         /* Step 6b: Re-lookup target child to verify it hasn't changed */
1606         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
1607                               parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
1608                               child_mode, new_name, new_len, maxres_src);
1609         if (rc) {
1610                 ldlm_lock_decref(&dlm_handles[2], child_mode);
1611                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
1612                 cleanup_phase = 4;
1613                 if (rc > 0)
1614                         goto retry_locks;
1615                 GOTO(cleanup, rc);
1616         }
1617
1618         EXIT;
1619 cleanup:
1620         if (rc) {
1621                 switch (cleanup_phase) {
1622                 case 6: /* child lock(s) */
1623                         if (c2_res_id.name[0] != 0)
1624                                 ldlm_lock_decref(&dlm_handles[3], child_mode);
1625                         if (c1_res_id.name[0] != 0)
1626                                 ldlm_lock_decref(&dlm_handles[2], child_mode);
1627                 case 5: /* parent locks */
1628                         ldlm_lock_decref(&dlm_handles[1], parent_mode);
1629                         ldlm_lock_decref(&dlm_handles[0], parent_mode);
1630                 case 4: /* target dentry */
1631                         l_dput(*de_newp);
1632                 case 3: /* source dentry */
1633                         l_dput(*de_oldp);
1634                 case 2: /* target directory dentry */
1635                         l_dput(*de_tgtdirp);
1636                 case 1: /* source directry dentry */
1637                         l_dput(*de_srcdirp);
1638                 }
1639         }
1640
1641         return rc;
1642 }
1643
1644 static int mds_reint_rename(struct mds_update_record *rec, int offset,
1645                             struct ptlrpc_request *req,
1646                             struct lustre_handle *lockh)
1647 {
1648         struct obd_device *obd = req->rq_export->exp_obd;
1649         struct dentry *de_srcdir = NULL;
1650         struct dentry *de_tgtdir = NULL;
1651         struct dentry *de_old = NULL;
1652         struct dentry *de_new = NULL;
1653         struct mds_obd *mds = mds_req2mds(req);
1654         struct lustre_handle dlm_handles[4];
1655         struct mds_body *body = NULL;
1656         int rc = 0, lock_count = 3;
1657         int cleanup_phase = 0;
1658         void *handle = NULL;
1659         ENTRY;
1660
1661         LASSERT(offset == 0);
1662
1663         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
1664                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
1665                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
1666
1667         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1668
1669         rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
1670                                              rec->ur_fid2, &de_tgtdir, LCK_PW,
1671                                              rec->ur_name, rec->ur_namelen,
1672                                              &de_old, rec->ur_tgt,
1673                                              rec->ur_tgtlen, &de_new,
1674                                              dlm_handles, LCK_EX);
1675         if (rc)
1676                 GOTO(cleanup, rc);
1677
1678         cleanup_phase = 1; /* parent(s), children, locks */
1679
1680         if (de_new->d_inode)
1681                 lock_count = 4;
1682
1683         /* sanity check for src inode */
1684         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1685             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1686                 GOTO(cleanup, rc = -EINVAL);
1687
1688         /* sanity check for dest inode */
1689         if (de_new->d_inode &&
1690             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1691              de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1692                 GOTO(cleanup, rc = -EINVAL);
1693
1694         if (de_old->d_inode == de_new->d_inode) {
1695                 GOTO(cleanup, rc = 0);
1696         }
1697
1698         /* if we are about to remove the target at first, pass the EA of
1699          * that inode to client to perform and cleanup on OST */
1700         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
1701         LASSERT(body != NULL);
1702
1703         if (de_new->d_inode &&
1704             S_ISREG(de_new->d_inode->i_mode) &&
1705             de_new->d_inode->i_nlink == 1 &&
1706             mds_open_orphan_count(de_new->d_inode) == 0) {
1707                 mds_pack_inode2fid(&body->fid1, de_new->d_inode);
1708                 mds_pack_inode2body(body, de_new->d_inode);
1709                 mds_pack_md(obd, req->rq_repmsg, 1, body, de_new->d_inode, 1);
1710                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1711                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1712                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1713                 } else {
1714                         /* XXX need log unlink? */
1715                 }
1716         }
1717
1718         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1719                        de_srcdir->d_inode->i_sb);
1720
1721         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
1722         if (IS_ERR(handle))
1723                 GOTO(cleanup, rc = PTR_ERR(handle));
1724
1725         /* FIXME need adjust the journal block count? */
1726         /* if the target should be moved to PENDING, we at first increase the
1727          * link and later vfs_rename() will decrease the link count again */
1728         if (de_new->d_inode &&
1729             S_ISREG(de_new->d_inode->i_mode) &&
1730             de_new->d_inode->i_nlink == 1 &&
1731             mds_open_orphan_count(de_new->d_inode) > 0) {
1732                 rc = mds_add_link_orphan(rec, obd, de_new);
1733                 if (rc)
1734                         GOTO(cleanup, rc);
1735         }
1736
1737         lock_kernel();
1738         de_old->d_fsdata = req;
1739         de_new->d_fsdata = req;
1740         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
1741         unlock_kernel();
1742
1743         GOTO(cleanup, rc);
1744 cleanup:
1745         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1746                                 handle, req, rc, 0);
1747         switch (cleanup_phase) {
1748         case 1:
1749                 if (rc) {
1750                         if (lock_count == 4)
1751                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1752                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1753                         ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
1754                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
1755                 } else {
1756                         if (lock_count == 4)
1757                                 ptlrpc_save_lock(req,
1758                                               &(dlm_handles[3]), LCK_EX);
1759                         ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
1760                         ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
1761                         ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
1762                 }
1763                 l_dput(de_new);
1764                 l_dput(de_old);
1765                 l_dput(de_tgtdir);
1766                 l_dput(de_srcdir);
1767         case 0:
1768                 break;
1769         default:
1770                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1771                 LBUG();
1772         }
1773         req->rq_status = rc;
1774         return 0;
1775 }
1776
1777 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
1778                            struct ptlrpc_request *, struct lustre_handle *);
1779
1780 static mds_reinter reinters[REINT_MAX + 1] = {
1781         [REINT_SETATTR] mds_reint_setattr,
1782         [REINT_CREATE] mds_reint_create,
1783         [REINT_LINK] mds_reint_link,
1784         [REINT_UNLINK] mds_reint_unlink,
1785         [REINT_RENAME] mds_reint_rename,
1786         [REINT_OPEN] mds_open
1787 };
1788
1789 int mds_reint_rec(struct mds_update_record *rec, int offset,
1790                   struct ptlrpc_request *req, struct lustre_handle *lockh)
1791 {
1792         struct obd_device *obd = req->rq_export->exp_obd;
1793         struct obd_run_ctxt saved;
1794         int rc;
1795         ENTRY;
1796
1797         /* checked by unpacker */
1798         LASSERT(rec->ur_opcode <= REINT_MAX &&
1799                 reinters[rec->ur_opcode] != NULL);
1800
1801         push_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
1802         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1803         pop_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
1804
1805         RETURN(rc);
1806 }