Whamcloud - gitweb
3227f348f7c9f65291d3f92bae312788c9aaa053
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/fs.h>
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include <linux/obd.h>
37 #include <linux/lustre_lib.h>
38 #include <linux/lustre_idl.h>
39 #include <linux/lustre_mds.h>
40 #include <linux/lustre_dlm.h>
41 #include <linux/lustre_fsfilt.h>
42
43 #include "mds_internal.h"
44
45 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
46                    int error)
47 {
48         obd_transno_commit_cb(obd, transno, error);
49 }
50
/* Callback data for mds_cancel_cookies_cb(), built by mds_reint_setattr().
 * It is allocated as a single buffer: this header, followed by the llog
 * cookies, followed by a copy of the EA data that mlcd_lmm points at. */
struct mds_logcancel_data {
        struct lov_mds_md      *mlcd_lmm;       /* EA copy, placed after the cookies */
        int                     mlcd_size;      /* size of the whole allocation (used for OBD_FREE) */
        int                     mlcd_cookielen; /* bytes of cookie data in mlcd_cookies */
        int                     mlcd_eadatalen; /* bytes of EA data at mlcd_lmm */
        struct llog_cookie      mlcd_cookies[0];
};
58
59
60 static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
61                                   void *cb_data, int error)
62 {
63         struct mds_logcancel_data *mlcd = cb_data;
64         struct lov_stripe_md *lsm = NULL;
65         struct llog_ctxt *ctxt;
66         int rc;
67
68         obd_transno_commit_cb(obd, transno, error);
69
70         CDEBUG(D_HA, "cancelling %d cookies\n",
71                (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
72
73         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, mlcd->mlcd_lmm,
74                           mlcd->mlcd_eadatalen);
75         if (rc < 0) {
76                 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
77                        (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
78                        rc);
79         } else {
80                 ///* XXX 0 normally, SENDNOW for debug */);
81                 ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1);
82                 rc = llog_cancel(ctxt, lsm,
83                                          mlcd->mlcd_cookielen /
84                                          sizeof(*mlcd->mlcd_cookies),
85                                          mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
86                 if (rc)
87                         CERROR("error cancelling %d log cookies: rc %d\n",
88                                (int)(mlcd->mlcd_cookielen /
89                                      sizeof(*mlcd->mlcd_cookies)), rc);
90         }
91
92         OBD_FREE(mlcd, mlcd->mlcd_size);
93 }
94
95 /* Assumes caller has already pushed us into the kernel context. */
96 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
97                        struct ptlrpc_request *req, int rc, __u32 op_data)
98 {
99         struct mds_export_data *med = &req->rq_export->exp_mds_data;
100         struct mds_client_data *mcd = med->med_mcd;
101         struct obd_device *obd = req->rq_export->exp_obd;
102         int err;
103         __u64 transno;
104         loff_t off;
105         int log_pri = D_HA;
106         ENTRY;
107
108         /* if the export has already been failed, we have no last_rcvd slot */
109         if (req->rq_export->exp_failed) {
110                 CERROR("committing transaction for disconnected client\n");
111                 if (handle)
112                         GOTO(commit, rc);
113                 RETURN(rc);
114         }
115
116         if (IS_ERR(handle))
117                 RETURN(rc);
118
119         if (handle == NULL) {
120                 /* if we're starting our own xaction, use our own inode */
121                 inode = mds->mds_rcvd_filp->f_dentry->d_inode;
122                 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
123                 if (IS_ERR(handle)) {
124                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
125                         RETURN(PTR_ERR(handle));
126                 }
127         }
128
129         off = med->med_off;
130
131         transno = req->rq_reqmsg->transno;
132         if (transno == 0) {
133                 spin_lock(&mds->mds_transno_lock);
134                 transno = ++mds->mds_last_transno;
135                 spin_unlock(&mds->mds_transno_lock);
136         } else {
137                 spin_lock(&mds->mds_transno_lock);
138                 if (transno > mds->mds_last_transno)
139                         mds->mds_last_transno = transno;
140                 spin_unlock(&mds->mds_transno_lock);
141         }
142         req->rq_repmsg->transno = req->rq_transno = transno;
143         mcd->mcd_last_transno = cpu_to_le64(transno);
144         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
145         mcd->mcd_last_result = cpu_to_le32(rc);
146         mcd->mcd_last_data = cpu_to_le32(op_data);
147
148         fsfilt_add_journal_cb(req->rq_export->exp_obd, transno, handle,
149                               mds_commit_cb, NULL);
150         err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
151                                   &off, 0);
152
153         if (err) {
154                 log_pri = D_ERROR;
155                 if (rc == 0)
156                         rc = err;
157         }
158
159         DEBUG_REQ(log_pri, req,
160                   "wrote trans #"LPU64" client %s at idx %u: err = %d",
161                   transno, mcd->mcd_uuid, med->med_idx, err);
162
163         err = mds_lov_write_objids(obd);
164         if (err) {
165                 log_pri = D_ERROR;
166                 if (rc == 0)
167                         rc = err;
168         }
169         CDEBUG(log_pri, "wrote objids: err = %d\n", err);
170
171 commit:
172         err = fsfilt_commit(obd, inode, handle, 0);
173         if (err) {
174                 CERROR("error committing transaction: %d\n", err);
175                 if (!rc)
176                         rc = err;
177         }
178
179         RETURN(rc);
180 }
181
182 /* this gives the same functionality as the code between
183  * sys_chmod and inode_setattr
184  * chown_common and inode_setattr
185  * utimes and inode_setattr
186  */
187 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
188 {
189         time_t now = LTIME_S(CURRENT_TIME);
190         struct iattr *attr = &rec->ur_iattr;
191         unsigned int ia_valid = attr->ia_valid;
192         int error;
193         ENTRY;
194
195         /* only fix up attrs if the client VFS didn't already */
196         if (!(ia_valid & ATTR_RAW))
197                 RETURN(0);
198
199         if (!(ia_valid & ATTR_CTIME_SET))
200                 LTIME_S(attr->ia_ctime) = now;
201         if (!(ia_valid & ATTR_ATIME_SET))
202                 LTIME_S(attr->ia_atime) = now;
203         if (!(ia_valid & ATTR_MTIME_SET))
204                 LTIME_S(attr->ia_mtime) = now;
205
206         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
207                 RETURN(-EPERM);
208
209         /* times */
210         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
211                 if (rec->ur_fsuid != inode->i_uid &&
212                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
213                         RETURN(error);
214         }
215
216         if (ia_valid & ATTR_SIZE) {
217                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
218                         RETURN(error);
219         }
220
221         if (ia_valid & ATTR_UID) {
222                 /* chown */
223                 error = -EPERM;
224                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
225                         RETURN(-EPERM);
226                 if (attr->ia_uid == (uid_t) -1)
227                         attr->ia_uid = inode->i_uid;
228                 if (attr->ia_gid == (gid_t) -1)
229                         attr->ia_gid = inode->i_gid;
230                 attr->ia_mode = inode->i_mode;
231                 /*
232                  * If the user or group of a non-directory has been
233                  * changed by a non-root user, remove the setuid bit.
234                  * 19981026 David C Niemi <niemi@tux.org>
235                  *
236                  * Changed this to apply to all users, including root,
237                  * to avoid some races. This is the behavior we had in
238                  * 2.0. The check for non-root was definitely wrong
239                  * for 2.2 anyway, as it should have been using
240                  * CAP_FSETID rather than fsuid -- 19990830 SD.
241                  */
242                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
243                     !S_ISDIR(inode->i_mode)) {
244                         attr->ia_mode &= ~S_ISUID;
245                         attr->ia_valid |= ATTR_MODE;
246                 }
247                 /*
248                  * Likewise, if the user or group of a non-directory
249                  * has been changed by a non-root user, remove the
250                  * setgid bit UNLESS there is no group execute bit
251                  * (this would be a file marked for mandatory
252                  * locking).  19981026 David C Niemi <niemi@tux.org>
253                  *
254                  * Removed the fsuid check (see the comment above) --
255                  * 19990830 SD.
256                  */
257                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
258                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
259                         attr->ia_mode &= ~S_ISGID;
260                         attr->ia_valid |= ATTR_MODE;
261                 }
262         } else if (ia_valid & ATTR_MODE) {
263                 int mode = attr->ia_mode;
264                 /* chmod */
265                 if (attr->ia_mode == (mode_t) -1)
266                         attr->ia_mode = inode->i_mode;
267                 attr->ia_mode =
268                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
269         }
270         RETURN(0);
271 }
272
273 void mds_steal_ack_locks(struct ptlrpc_request *req)
274 {
275         struct obd_export         *exp = req->rq_export;
276         struct list_head          *tmp;
277         struct ptlrpc_reply_state *oldrep;
278         struct ptlrpc_service     *svc;
279         unsigned long              flags;
280         int                        i;
281
282         /* CAVEAT EMPTOR: spinlock order */
283         spin_lock_irqsave (&exp->exp_lock, flags);
284         list_for_each (tmp, &exp->exp_outstanding_replies) {
285                 oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
286
287                 if (oldrep->rs_xid != req->rq_xid)
288                         continue;
289
290                 if (oldrep->rs_msg.opc != req->rq_reqmsg->opc)
291                         CERROR ("Resent req xid "LPX64" has mismatched opc: "
292                                 "new %d old %d\n", req->rq_xid,
293                                 req->rq_reqmsg->opc, oldrep->rs_msg.opc);
294
295                 svc = oldrep->rs_srv_ni->sni_service;
296                 spin_lock (&svc->srv_lock);
297
298                 list_del_init (&oldrep->rs_exp_list);
299
300                 CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
301                       " o%d NID"LPX64"\n",
302                       oldrep->rs_nlocks, oldrep, 
303                       oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
304                       exp->exp_connection->c_peer.peer_nid);
305
306                 for (i = 0; i < oldrep->rs_nlocks; i++)
307                         ptlrpc_save_lock(req, 
308                                          &oldrep->rs_locks[i],
309                                          oldrep->rs_modes[i]);
310                 oldrep->rs_nlocks = 0;
311
312                 DEBUG_REQ(D_HA, req, "stole locks for");
313                 ptlrpc_schedule_difficult_reply (oldrep);
314
315                 spin_unlock (&svc->srv_lock);
316                 spin_unlock_irqrestore (&exp->exp_lock, flags);
317                 return;
318         }
319         spin_unlock_irqrestore (&exp->exp_lock, flags);
320 }
321
322 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
323 {
324         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
325                   mcd->mcd_last_transno, mcd->mcd_last_result);
326         req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
327         req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
328
329         mds_steal_ack_locks(req);
330 }
331
332 static void reconstruct_reint_setattr(struct mds_update_record *rec,
333                                       int offset, struct ptlrpc_request *req)
334 {
335         struct mds_export_data *med = &req->rq_export->exp_mds_data;
336         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
337         struct dentry *de;
338         struct mds_body *body;
339
340         mds_req_from_mcd(req, med->med_mcd);
341
342         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
343         if (IS_ERR(de)) {
344                 LASSERT(PTR_ERR(de) == req->rq_status);
345                 return;
346         }
347
348         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
349         mds_pack_inode2fid(&body->fid1, de->d_inode);
350         mds_pack_inode2body(body, de->d_inode);
351
352         /* Don't return OST-specific attributes if we didn't just set them */
353         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
354                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
355         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
356                 body->valid |= OBD_MD_FLMTIME;
357         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
358                 body->valid |= OBD_MD_FLATIME;
359
360         l_dput(de);
361 }
362
363 /* In the raw-setattr case, we lock the child inode.
364  * In the write-back case or if being called from open, the client holds a lock
365  * already.
366  *
367  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
368 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
369                              struct ptlrpc_request *req,
370                              struct lustre_handle *lh)
371 {
372         struct mds_obd *mds = mds_req2mds(req);
373         struct obd_device *obd = req->rq_export->exp_obd;
374         struct mds_body *body;
375         struct dentry *de;
376         struct inode *inode = NULL;
377         struct lustre_handle lockh;
378         void *handle = NULL;
379         struct mds_logcancel_data *mlcd = NULL;
380         int rc = 0, cleanup_phase = 0, err, locked = 0;
381         ENTRY;
382
383         LASSERT(offset == 0);
384
385         DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x", rec->ur_fid1->id,
386                   rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
387
388         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
389
390         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
391                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
392                 if (IS_ERR(de))
393                         GOTO(cleanup, rc = PTR_ERR(de));
394         } else {
395                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
396                                            &lockh, NULL, 0);
397                 if (IS_ERR(de))
398                         GOTO(cleanup, rc = PTR_ERR(de));
399                 locked = 1;
400         }
401
402         cleanup_phase = 1;
403         inode = de->d_inode;
404         LASSERT(inode);
405         if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
406                 down(&inode->i_sem);
407
408         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
409
410         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
411         if (IS_ERR(handle))
412                 GOTO(cleanup, rc = PTR_ERR(handle));
413
414         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
415                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
416                        LTIME_S(rec->ur_iattr.ia_mtime),
417                        LTIME_S(rec->ur_iattr.ia_ctime));
418         rc = mds_fix_attr(inode, rec);
419         if (rc)
420                 GOTO(cleanup, rc);
421
422         if (rec->ur_iattr.ia_valid & ATTR_ATTR_FLAG)    /* ioctl */
423                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_SETFLAGS,
424                                       (long)&rec->ur_iattr.ia_attr_flags);
425         else                                            /* setattr */
426                 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
427
428         if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
429                 rc = fsfilt_set_md(obd, inode, handle,
430                                    rec->ur_eadata, rec->ur_eadatalen);
431         }
432
433         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
434         mds_pack_inode2fid(&body->fid1, inode);
435         mds_pack_inode2body(body, inode);
436
437         /* Don't return OST-specific attributes if we didn't just set them */
438         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
439                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
440         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
441                 body->valid |= OBD_MD_FLMTIME;
442         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
443                 body->valid |= OBD_MD_FLATIME;
444
445         if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
446                 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
447                           rec->ur_eadatalen);
448                 if (mlcd) {
449                         mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
450                                 rec->ur_eadatalen;
451                         mlcd->mlcd_eadatalen = rec->ur_eadatalen;
452                         mlcd->mlcd_cookielen = rec->ur_cookielen;
453                         mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
454                                 mlcd->mlcd_cookielen;
455                         memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
456                                mlcd->mlcd_cookielen);
457                         memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
458                                mlcd->mlcd_eadatalen);
459                 } else {
460                         CERROR("unable to allocate log cancel data\n");
461                 }
462         }
463         EXIT;
464  cleanup:
465         if (mlcd != NULL)
466                 fsfilt_add_journal_cb(req->rq_export->exp_obd, 0, handle,
467                                       mds_cancel_cookies_cb, mlcd);
468         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
469         switch (cleanup_phase) {
470         case 1:
471                 if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
472                         up(&inode->i_sem);
473                 l_dput(de);
474                 if (locked) {
475                         if (rc) {
476                                 ldlm_lock_decref(&lockh, LCK_PW);
477                         } else {
478                                 ptlrpc_save_lock (req, &lockh, LCK_PW);
479                         }
480                 }
481         case 0:
482                 break;
483         default:
484                 LBUG();
485         }
486         if (err && !rc)
487                 rc = err;
488
489         req->rq_status = rc;
490         return 0;
491 }
492
493 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
494                                      struct ptlrpc_request *req)
495 {
496         struct mds_export_data *med = &req->rq_export->exp_mds_data;
497         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
498         struct dentry *parent, *child;
499         struct mds_body *body;
500
501         mds_req_from_mcd(req, med->med_mcd);
502
503         if (req->rq_status)
504                 return;
505
506         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
507         LASSERT(!IS_ERR(parent));
508         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
509         LASSERT(!IS_ERR(child));
510         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
511         mds_pack_inode2fid(&body->fid1, child->d_inode);
512         mds_pack_inode2body(body, child->d_inode);
513         l_dput(parent);
514         l_dput(child);
515 }
516
517 static int mds_reint_create(struct mds_update_record *rec, int offset,
518                             struct ptlrpc_request *req,
519                             struct lustre_handle *lh)
520 {
521         struct dentry *dparent = NULL;
522         struct mds_obd *mds = mds_req2mds(req);
523         struct obd_device *obd = req->rq_export->exp_obd;
524         struct dentry *dchild = NULL;
525         struct inode *dir = NULL;
526         void *handle = NULL;
527         struct lustre_handle lockh;
528         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
529         int created = 0;
530         struct dentry_params dp;
531         ENTRY;
532
533         LASSERT(offset == 0);
534         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
535
536         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o",
537                   rec->ur_fid1->id, rec->ur_fid1->generation,
538                   rec->ur_name, rec->ur_mode);
539
540         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
541
542         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
543                 GOTO(cleanup, rc = -ESTALE);
544
545         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh,
546                                         rec->ur_name, rec->ur_namelen - 1);
547         if (IS_ERR(dparent)) {
548                 rc = PTR_ERR(dparent);
549                 CERROR("parent lookup error %d\n", rc);
550                 GOTO(cleanup, rc);
551         }
552         cleanup_phase = 1; /* locked parent dentry */
553         dir = dparent->d_inode;
554         LASSERT(dir);
555
556         ldlm_lock_dump_handle(D_OTHER, &lockh);
557
558         dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
559         if (IS_ERR(dchild)) {
560                 rc = PTR_ERR(dchild);
561                 CERROR("child lookup error %d\n", rc);
562                 GOTO(cleanup, rc);
563         }
564
565         cleanup_phase = 2; /* child dentry */
566
567         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb);
568
569         if (dir->i_mode & S_ISGID) {
570                 if (S_ISDIR(rec->ur_mode))
571                         rec->ur_mode |= S_ISGID;
572         }
573
574         dchild->d_fsdata = (void *)&dp;
575         dp.p_inum = (unsigned long)rec->ur_fid2->id;
576         dp.p_ptr = req;
577
578         switch (type) {
579         case S_IFREG:{
580                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
581                 if (IS_ERR(handle))
582                         GOTO(cleanup, rc = PTR_ERR(handle));
583                 rc = ll_vfs_create(dir, dchild, rec->ur_mode, NULL);
584                 EXIT;
585                 break;
586         }
587         case S_IFDIR:{
588                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
589                 if (IS_ERR(handle))
590                         GOTO(cleanup, rc = PTR_ERR(handle));
591                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
592                 EXIT;
593                 break;
594         }
595         case S_IFLNK:{
596                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
597                 if (IS_ERR(handle))
598                         GOTO(cleanup, rc = PTR_ERR(handle));
599                 if (rec->ur_tgt == NULL)        /* no target supplied */
600                         rc = -EINVAL;           /* -EPROTO? */
601                 else
602                         rc = vfs_symlink(dir, dchild, rec->ur_tgt);
603                 EXIT;
604                 break;
605         }
606         case S_IFCHR:
607         case S_IFBLK:
608         case S_IFIFO:
609         case S_IFSOCK:{
610                 int rdev = rec->ur_rdev;
611                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
612                 if (IS_ERR(handle))
613                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
614                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
615                 EXIT;
616                 break;
617         }
618         default:
619                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
620                 dchild->d_fsdata = NULL;
621                 GOTO(cleanup, rc = -EINVAL);
622         }
623
624         /* In case we stored the desired inum in here, we want to clean up. */
625         if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
626                 dchild->d_fsdata = NULL;
627
628         if (rc) {
629                 CDEBUG(D_INODE, "error during create: %d\n", rc);
630                 GOTO(cleanup, rc);
631         } else {
632                 struct iattr iattr;
633                 struct inode *inode = dchild->d_inode;
634                 struct mds_body *body;
635
636                 created = 1;
637                 LTIME_S(iattr.ia_atime) = rec->ur_time;
638                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
639                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
640                 iattr.ia_uid = rec->ur_fsuid;
641                 if (dir->i_mode & S_ISGID)
642                         iattr.ia_gid = dir->i_gid;
643                 else
644                         iattr.ia_gid = rec->ur_fsgid;
645                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
646                         ATTR_MTIME | ATTR_CTIME;
647
648                 if (rec->ur_fid2->id) {
649                         LASSERT(rec->ur_fid2->id == inode->i_ino);
650                         inode->i_generation = rec->ur_fid2->generation;
651                         /* Dirtied and committed by the upcoming setattr. */
652                         CDEBUG(D_INODE, "recreated ino %lu with gen %u\n",
653                                inode->i_ino, inode->i_generation);
654                 } else {
655                         struct lustre_handle child_ino_lockh;
656                         struct ldlm_res_id child_res_id =
657                              { .name = { inode->i_ino, 0 } };
658                         int lock_flags = 0;
659
660                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
661                                inode->i_ino, inode->i_generation);
662
663                         /* The inode we were allocated may have just been freed
664                          * by an unlink operation.  We take this lock to
665                          * synchronize against the matching reply-ack-lock taken
666                          * in unlink, to avoid replay problems if this reply
667                          * makes it out to the client but the unlink's does not.
668                          * See bug 2029 for more detail.*/
669                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
670                                               child_res_id, LDLM_PLAIN, NULL,
671                                               LCK_EX, &lock_flags,
672                                               mds_blocking_ast,
673                                               ldlm_completion_ast, NULL, NULL,
674                                               NULL, 0, NULL, &child_ino_lockh);
675                         if (rc != ELDLM_OK) {
676                                 CERROR("error locking for unlink/create sync: "
677                                        "%d\n", rc);
678                         } else {
679                                 ldlm_lock_decref(&child_ino_lockh, LCK_EX);
680                         }
681                 }
682
683                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
684                 if (rc)
685                         CERROR("error on child setattr: rc = %d\n", rc);
686
687                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
688                 rc = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
689                 if (rc)
690                         CERROR("error on parent setattr: rc = %d\n", rc);
691
692                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
693                 mds_pack_inode2fid(&body->fid1, inode);
694                 mds_pack_inode2body(body, inode);
695         }
696         EXIT;
697
698 cleanup:
699         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
700
701         if (rc && created) {
702                 /* Destroy the file we just created.  This should not need
703                  * extra journal credits, as we have already modified all of
704                  * the blocks needed in order to create the file in the first
705                  * place.
706                  */
707                 switch (type) {
708                 case S_IFDIR:
709                         err = vfs_rmdir(dir, dchild);
710                         if (err)
711                                 CERROR("rmdir in error path: %d\n", err);
712                         break;
713                 default:
714                         err = vfs_unlink(dir, dchild);
715                         if (err)
716                                 CERROR("unlink in error path: %d\n", err);
717                         break;
718                 }
719         } else {
720                 rc = err;
721         }
722         switch (cleanup_phase) {
723         case 2: /* child dentry */
724                 l_dput(dchild);
725         case 1: /* locked parent dentry */
726                 if (rc) {
727                         ldlm_lock_decref(&lockh, LCK_PW);
728                 } else {
729                         ptlrpc_save_lock (req, &lockh, LCK_PW);
730                 }
731                 l_dput(dparent);
732         case 0:
733                 break;
734         default:
735                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
736                 LBUG();
737         }
738         req->rq_status = rc;
739         return 0;
740 }
741
742 int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
743 {
744         int i;
745
746         for (i = 0; i < RES_NAME_SIZE; i++) {
747                 /* return 1 here, because enqueue_ordered will skip resources
748                  * of all zeroes if they're sorted to the end of the list. */
749                 if (res1->name[i] == 0 && res2->name[i] != 0)
750                         return 1;
751                 if (res2->name[i] == 0 && res1->name[i] != 0)
752                         return 0;
753
754                 if (res1->name[i] > res2->name[i])
755                         return 1;
756                 if (res1->name[i] < res2->name[i])
757                         return 0;
758         }
759         return 0;
760 }
761
762 /* This function doesn't use ldlm_match_or_enqueue because we're always called
763  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
764  * because they take the place of local semaphores.
765  *
766  * One or two locks are taken in numerical order.  A res_id->name[0] of 0 means
767  * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
768 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
769                           struct lustre_handle *p1_lockh, int p1_lock_mode,
770                           struct ldlm_res_id *p2_res_id,
771                           struct lustre_handle *p2_lockh, int p2_lock_mode)
772 {
773         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
774         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
775         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
776         int rc, flags;
777         ENTRY;
778
779         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
780
781         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
782                res_id[0]->name[0], res_id[1]->name[0]);
783
784         if (res_gt(p1_res_id, p2_res_id)) {
785                 handles[1] = p1_lockh;
786                 handles[0] = p2_lockh;
787                 res_id[1] = p1_res_id;
788                 res_id[0] = p2_res_id;
789                 lock_modes[1] = p1_lock_mode;
790                 lock_modes[0] = p2_lock_mode;
791         }
792
793         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
794                res_id[0]->name[0], res_id[1]->name[0]);
795
796         flags = LDLM_FL_LOCAL_ONLY;
797         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
798                               LDLM_PLAIN, NULL, lock_modes[0], &flags,
799                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
800                               NULL, 0, NULL, handles[0]);
801         if (rc != ELDLM_OK)
802                 RETURN(-EIO);
803         ldlm_lock_dump_handle(D_OTHER, handles[0]);
804
805         if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
806                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
807                 ldlm_lock_addref(handles[1], lock_modes[1]);
808         } else if (res_id[1]->name[0] != 0) {
809                 flags = LDLM_FL_LOCAL_ONLY;
810                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
811                                       *res_id[1], LDLM_PLAIN, NULL,
812                                       lock_modes[1], &flags, mds_blocking_ast,
813                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
814                                       NULL, handles[1]);
815                 if (rc != ELDLM_OK) {
816                         ldlm_lock_decref(handles[0], lock_modes[0]);
817                         RETURN(-EIO);
818                 }
819                 ldlm_lock_dump_handle(D_OTHER, handles[1]);
820         }
821
822         RETURN(0);
823 }
824
825 int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
826                            struct lustre_handle *p1_lockh, int p1_lock_mode,
827                            struct ldlm_res_id *p2_res_id,
828                            struct lustre_handle *p2_lockh, int p2_lock_mode,
829                            struct ldlm_res_id *c1_res_id,
830                            struct lustre_handle *c1_lockh, int c1_lock_mode,
831                            struct ldlm_res_id *c2_res_id,
832                            struct lustre_handle *c2_lockh, int c2_lock_mode)
833 {
834         struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
835                                           c1_res_id, c2_res_id };
836         struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh,
837                                                  c1_lockh, c2_lockh };
838         int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
839                               c1_lock_mode, c2_lock_mode };
840         int rc, i, j, sorted, flags;
841         ENTRY;
842
843         CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
844                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
845                res_id[3]->name[0]);
846
847         /* simple insertion sort - we have at most 4 elements */
848         for (i = 1; i < 4; i++) {
849                 j = i - 1;
850                 dlm_handles[4] = dlm_handles[i];
851                 res_id[4] = res_id[i];
852                 lock_modes[4] = lock_modes[i];
853
854                 sorted = 0;
855                 do {
856                         if (res_gt(res_id[j], res_id[4])) {
857                                 dlm_handles[j + 1] = dlm_handles[j];
858                                 res_id[j + 1] = res_id[j];
859                                 lock_modes[j + 1] = lock_modes[j];
860                                 j--;
861                         } else {
862                                 sorted = 1;
863                         }
864                 } while (j >= 0 && !sorted);
865
866                 dlm_handles[j + 1] = dlm_handles[4];
867                 res_id[j + 1] = res_id[4];
868                 lock_modes[j + 1] = lock_modes[4];
869         }
870
871         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
872                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
873                res_id[3]->name[0]);
874
875         /* XXX we could send ASTs on all these locks first before blocking? */
876         for (i = 0; i < 4; i++) {
877                 flags = 0;
878                 if (res_id[i]->name[0] == 0)
879                         break;
880                 if (i != 0 &&
881                     memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
882                         memcpy(dlm_handles[i], dlm_handles[i-1],
883                                sizeof(*(dlm_handles[i])));
884                         ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
885                 } else {
886                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
887                                               *res_id[i], LDLM_PLAIN, NULL,
888                                               lock_modes[i], &flags,
889                                               mds_blocking_ast,
890                                               ldlm_completion_ast, NULL, NULL,
891                                               NULL, 0, NULL, dlm_handles[i]);
892                         if (rc != ELDLM_OK)
893                                 GOTO(out_err, rc = -EIO);
894                         ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
895                 }
896         }
897
898         RETURN(0);
899 out_err:
900         while (i-- > 0)
901                 ldlm_lock_decref(dlm_handles[i], lock_modes[i]);
902
903         return rc;
904 }
905
906 /* In the unlikely case that the child changed while we were waiting
907  * on the lock, we need to drop the lock on the old child and either:
908  * - if the child has a lower resource name, then we have to also
909  *   drop the parent lock and regain the locks in the right order
910  * - in the rename case, if the child has a lower resource name than one of
911  *   the other parent/child resources (maxres) we also need to reget the locks
912  * - if the child has a higher resource name (this is the common case)
913  *   we can just get the lock on the new child (still in lock order)
914  *
915  * Returns 0 if the child did not change or if it changed but could be locked.
916  * Returns 1 if the child changed and we need to re-lock (no locks held).
917  * Returns -ve error with a valid dchild (no locks held). */
918 static int mds_verify_child(struct obd_device *obd,
919                             struct ldlm_res_id *parent_res_id,
920                             struct lustre_handle *parent_lockh,
921                             struct dentry *dparent, int parent_mode,
922                             struct ldlm_res_id *child_res_id,
923                             struct lustre_handle *child_lockh,
924                             struct dentry **dchildp, int child_mode,
925                             const char *name, int namelen,
926                             struct ldlm_res_id *maxres)
927 {
928         struct dentry *vchild, *dchild = *dchildp;
929         int rc = 0, cleanup_phase = 2; /* parent, child locks */
930         ENTRY;
931
932         vchild = ll_lookup_one_len(name, dparent, namelen - 1);
933         if (IS_ERR(vchild))
934                 GOTO(cleanup, rc = PTR_ERR(vchild));
935
936         if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
937                    (vchild->d_inode != NULL &&
938                     child_res_id->name[0] == vchild->d_inode->i_ino &&
939                     child_res_id->name[1] == vchild->d_inode->i_generation))) {
940                 if (dchild != NULL)
941                         l_dput(dchild);
942                 *dchildp = vchild;
943
944                 RETURN(0);
945         }
946
947         CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
948                vchild->d_inode, dchild ? dchild->d_inode : 0,
949                vchild->d_inode ? vchild->d_inode->i_ino : 0,
950                child_res_id->name[0]);
951         if (child_res_id->name[0] != 0)
952                 ldlm_lock_decref(child_lockh, child_mode);
953         if (dchild)
954                 l_dput(dchild);
955
956         cleanup_phase = 1; /* parent lock only */
957         *dchildp = dchild = vchild;
958
959         if (dchild->d_inode) {
960                 int flags = 0;
961                 child_res_id->name[0] = dchild->d_inode->i_ino;
962                 child_res_id->name[1] = dchild->d_inode->i_generation;
963
964                 if (res_gt(parent_res_id, child_res_id) ||
965                     res_gt(maxres, child_res_id)) {
966                         CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
967                                child_res_id->name[0], parent_res_id->name[0],
968                                maxres->name[0]);
969                         GOTO(cleanup, rc = 1);
970                 }
971
972                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
973                                       *child_res_id, LDLM_PLAIN, NULL,
974                                       child_mode, &flags, mds_blocking_ast,
975                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
976                                       NULL, child_lockh);
977                 if (rc != ELDLM_OK)
978                         GOTO(cleanup, rc = -EIO);
979         } else {
980                 memset(child_res_id, 0, sizeof(*child_res_id));
981         }
982
983         EXIT;
984 cleanup:
985         if (rc) {
986                 switch(cleanup_phase) {
987                 case 2:
988                         if (child_res_id->name[0] != 0)
989                                 ldlm_lock_decref(child_lockh, child_mode);
990                 case 1:
991                         ldlm_lock_decref(parent_lockh, parent_mode);
992                 }
993         }
994         return rc;
995 }
996
997 int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
998                                 struct ll_fid *fid,
999                                 struct lustre_handle *parent_lockh,
1000                                 struct dentry **dparentp, int parent_mode,
1001                                 char *name, int namelen,
1002                                 struct lustre_handle *child_lockh,
1003                                 struct dentry **dchildp, int child_mode)
1004 {
1005         struct ldlm_res_id child_res_id = { .name = {0} };
1006         struct ldlm_res_id parent_res_id = { .name = {0} };
1007         struct inode *inode;
1008         int rc = 0, cleanup_phase = 0;
1009         ENTRY;
1010
1011         /* Step 1: Lookup parent */
1012         *dparentp = mds_fid2dentry(mds, fid, NULL);
1013         if (IS_ERR(*dparentp))
1014                 RETURN(rc = PTR_ERR(*dparentp));
1015         LASSERT((*dparentp)->d_inode);
1016
1017         CDEBUG(D_INODE, "parent ino %lu, name %s\n",
1018                (*dparentp)->d_inode->i_ino, name);
1019
1020         parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
1021         parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
1022
1023         cleanup_phase = 1; /* parent dentry */
1024
1025         /* Step 2: Lookup child (without DLM lock, to get resource name) */
1026         *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
1027         if (IS_ERR(*dchildp)) {
1028                 rc = PTR_ERR(*dchildp);
1029                 CDEBUG(D_INODE, "child lookup error %d\n", rc);
1030                 GOTO(cleanup, rc);
1031         }
1032
1033         inode = (*dchildp)->d_inode;
1034         if (inode != NULL)
1035                 inode = igrab(inode);
1036         if (inode == NULL)
1037                 goto retry_locks;
1038
1039         child_res_id.name[0] = inode->i_ino;
1040         child_res_id.name[1] = inode->i_generation;
1041         iput(inode);
1042
1043 retry_locks:
1044         cleanup_phase = 2; /* child dentry */
1045
1046         /* Step 3: Lock parent and child in resource order.  If child doesn't
1047          *         exist, we still have to lock the parent and re-lookup. */
1048         rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode,
1049                                    &child_res_id, child_lockh, child_mode);
1050         if (rc)
1051                 GOTO(cleanup, rc);
1052
1053         if (!(*dchildp)->d_inode)
1054                 cleanup_phase = 3; /* parent lock */
1055         else
1056                 cleanup_phase = 4; /* child lock */
1057
1058         /* Step 4: Re-lookup child to verify it hasn't changed since locking */
1059         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
1060                               parent_mode, &child_res_id, child_lockh, dchildp,
1061                               child_mode, name, namelen, &parent_res_id);
1062         if (rc > 0)
1063                 goto retry_locks;
1064         if (rc < 0) {
1065                 cleanup_phase = 3;
1066                 GOTO(cleanup, rc);
1067         }
1068
1069 cleanup:
1070         if (rc) {
1071                 switch (cleanup_phase) {
1072                 case 4:
1073                         ldlm_lock_decref(child_lockh, child_mode);
1074                 case 3:
1075                         ldlm_lock_decref(parent_lockh, parent_mode);
1076                 case 2:
1077                         l_dput(*dchildp);
1078                 case 1:
1079                         l_dput(*dparentp);
1080                 default: ;
1081                 }
1082         }
1083         return rc;
1084 }
1085
1086 void mds_reconstruct_generic(struct ptlrpc_request *req)
1087 {
1088         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1089
1090         mds_req_from_mcd(req, med->med_mcd);
1091 }
1092
1093 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
1094                             struct ptlrpc_request *req,
1095                             struct lustre_handle *lh)
1096 {
1097         struct dentry *dparent, *dchild;
1098         struct mds_obd *mds = mds_req2mds(req);
1099         struct obd_device *obd = req->rq_export->exp_obd;
1100         struct mds_body *body = NULL;
1101         struct inode *child_inode;
1102         struct lustre_handle parent_lockh, child_lockh, child_reuse_lockh;
1103         void *handle = NULL;
1104         int rc = 0, log_unlink = 0, cleanup_phase = 0;
1105         ENTRY;
1106
1107         LASSERT(offset == 0 || offset == 2);
1108
1109         DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
1110                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
1111
1112         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1113
1114         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1115                 GOTO(cleanup, rc = -ENOENT);
1116
1117         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
1118                                          &parent_lockh, &dparent, LCK_PW,
1119                                          rec->ur_name, rec->ur_namelen,
1120                                          &child_lockh, &dchild, LCK_EX);
1121         if (rc)
1122                 GOTO(cleanup, rc);
1123
1124         cleanup_phase = 1; /* dchild, dparent, locks */
1125
1126         child_inode = dchild->d_inode;
1127         if (child_inode == NULL) {
1128                 CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
1129                        dparent->d_inode->i_ino, rec->ur_name);
1130                 GOTO(cleanup, rc = -ENOENT);
1131         }
1132
1133         cleanup_phase = 2; /* dchild has a lock */
1134
1135         /* Step 4: Get a lock on the ino to sync with creation WRT inode
1136          * reuse (see bug 2029). */
1137         rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
1138         if (rc != ELDLM_OK)
1139                 GOTO(cleanup, rc);
1140
1141         cleanup_phase = 3; /* child inum lock */
1142
1143         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
1144
1145         /* ldlm_reply in buf[0] if called via intent */
1146         if (offset)
1147                 offset = 1;
1148
1149         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
1150         LASSERT(body != NULL);
1151
1152         /* If this is the last reference to this inode, get the OBD EA
1153          * data first so the client can destroy OST objects.
1154          * we only do the object removal if no open files remain.
1155          * Nobody can get at this name anymore because of the locks so
1156          * we make decisions here as to whether to remove the inode */
1157         if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
1158             mds_open_orphan_count(child_inode) == 0) {
1159                 mds_pack_inode2fid(&body->fid1, child_inode);
1160                 mds_pack_inode2body(body, child_inode);
1161                 mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
1162                             child_inode, 1);
1163                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1164                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1165                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1166                 } else {
1167                         log_unlink = 1;
1168                 }
1169         }
1170
1171         /* We have to do these checks ourselves, in case we are making an
1172          * orphan.  The client tells us whether rmdir() or unlink() was called,
1173          * so we need to return appropriate errors (bug 72).
1174          *
1175          * We don't have to check permissions, because vfs_rename (called from
1176          * mds_open_unlink_rename) also calls may_delete. */
1177         if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
1178                 if (!S_ISDIR(child_inode->i_mode))
1179                         GOTO(cleanup, rc = -ENOTDIR);
1180         } else {
1181                 if (S_ISDIR(child_inode->i_mode))
1182                         GOTO(cleanup, rc = -EISDIR);
1183         }
1184
1185         if (child_inode->i_nlink == (S_ISDIR(child_inode->i_mode) ? 2 : 1) &&
1186             mds_open_orphan_count(child_inode) > 0) {
1187                 rc = mds_open_unlink_rename(rec, obd, dparent, dchild, &handle);
1188                 cleanup_phase = 4; /* transaction */
1189                 GOTO(cleanup, rc);
1190         }
1191
1192         /* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */
1193         switch (child_inode->i_mode & S_IFMT) {
1194         case S_IFDIR:
1195                 /* Drop any lingering child directories before we start our
1196                  * transaction, to avoid doing multiple inode dirty/delete
1197                  * in our compound transaction (bug 1321). */
1198                 shrink_dcache_parent(dchild);
1199                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
1200                                       NULL);
1201                 if (IS_ERR(handle))
1202                         GOTO(cleanup, rc = PTR_ERR(handle));
1203                 cleanup_phase = 4; /* transaction */
1204                 rc = vfs_rmdir(dparent->d_inode, dchild);
1205                 break;
1206         case S_IFREG: {
1207                 handle = fsfilt_start(obd, dparent->d_inode,
1208                                       FSFILT_OP_UNLINK_LOG, NULL);
1209                 if (IS_ERR(handle))
1210                         GOTO(cleanup, rc = PTR_ERR(handle));
1211
1212                 cleanup_phase = 4; /* transaction */
1213                 rc = vfs_unlink(dparent->d_inode, dchild);
1214
1215                 if (!rc && log_unlink)
1216                         if (mds_log_op_unlink(obd, child_inode,
1217                                 lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
1218                                 req->rq_repmsg->buflens[offset + 1],
1219                                 lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
1220                                 req->rq_repmsg->buflens[offset + 2]) > 0)
1221                                 body->valid |= OBD_MD_FLCOOKIE;
1222                 break;
1223         }
1224         case S_IFLNK:
1225         case S_IFCHR:
1226         case S_IFBLK:
1227         case S_IFIFO:
1228         case S_IFSOCK:
1229                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
1230                                       NULL);
1231                 if (IS_ERR(handle))
1232                         GOTO(cleanup, rc = PTR_ERR(handle));
1233                 cleanup_phase = 4; /* transaction */
1234                 rc = vfs_unlink(dparent->d_inode, dchild);
1235                 break;
1236         default:
1237                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
1238                        rec->ur_name);
1239                 LBUG();
1240                 GOTO(cleanup, rc = -EINVAL);
1241         }
1242
1243  cleanup:
1244         if (rc == 0) {
1245                 struct iattr iattr;
1246                 int err;
1247
1248                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
1249                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
1250                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
1251
1252                 err = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
1253                 if (err)
1254                         CERROR("error on parent setattr: rc = %d\n", err);
1255         }
1256
1257         switch(cleanup_phase) {
1258         case 4:
1259                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
1260                                         rc, 0);
1261                 if (!rc)
1262                         (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
1263                                            "unlinked", 0, NULL);
1264         case 3: /* child ino-reuse lock */
1265                 if (rc && body != NULL) {
1266                         // Don't unlink the OST objects if the MDS unlink failed
1267                         body->valid = 0;
1268                 }
1269                 if (rc)
1270                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
1271                 else
1272                         ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
1273         case 2: /* child lock */
1274                 ldlm_lock_decref(&child_lockh, LCK_EX);
1275         case 1: /* child and parent dentry, parent lock */
1276                 if (rc)
1277                         ldlm_lock_decref(&parent_lockh, LCK_PW);
1278                 else
1279                         ptlrpc_save_lock(req, &parent_lockh, LCK_PW);
1280                 l_dput(dchild);
1281                 l_dput(dparent);
1282         case 0:
1283                 break;
1284         default:
1285                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1286                 LBUG();
1287         }
1288         req->rq_status = rc;
1289         return 0;
1290 }
1291
1292 static int mds_reint_link(struct mds_update_record *rec, int offset,
1293                           struct ptlrpc_request *req,
1294                           struct lustre_handle *lh)
1295 {
1296         struct obd_device *obd = req->rq_export->exp_obd;
1297         struct dentry *de_src = NULL;
1298         struct dentry *de_tgt_dir = NULL;
1299         struct dentry *dchild = NULL;
1300         struct mds_obd *mds = mds_req2mds(req);
1301         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
1302         struct ldlm_res_id src_res_id = { .name = {0} };
1303         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
1304         int rc = 0, cleanup_phase = 0;
1305         ENTRY;
1306
1307         LASSERT(offset == 0);
1308
1309         DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
1310                   rec->ur_fid1->id, rec->ur_fid1->generation,
1311                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
1312
1313         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1314
1315         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1316                 GOTO(cleanup, rc = -ENOENT);
1317
1318         /* Step 1: Lookup the source inode and target directory by FID */
1319         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1320         if (IS_ERR(de_src))
1321                 GOTO(cleanup, rc = PTR_ERR(de_src));
1322
1323         cleanup_phase = 1; /* source dentry */
1324
1325         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1326         if (IS_ERR(de_tgt_dir))
1327                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
1328
1329         cleanup_phase = 2; /* target directory dentry */
1330
1331         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
1332                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
1333                de_src->d_inode->i_ino);
1334
1335         /* Step 2: Take the two locks */
1336         src_res_id.name[0] = de_src->d_inode->i_ino;
1337         src_res_id.name[1] = de_src->d_inode->i_generation;
1338         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
1339         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
1340
1341         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
1342                                    &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
1343         if (rc)
1344                 GOTO(cleanup, rc);
1345
1346         cleanup_phase = 3; /* locks */
1347
1348         /* Step 3: Lookup the child */
1349         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
1350         if (IS_ERR(dchild)) {
1351                 rc = PTR_ERR(dchild);
1352                 if (rc != -EPERM && rc != -EACCES)
1353                         CERROR("child lookup error %d\n", rc);
1354                 GOTO(cleanup, rc);
1355         }
1356
1357         cleanup_phase = 4; /* child dentry */
1358
1359         if (dchild->d_inode) {
1360                 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
1361                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
1362                 rc = -EEXIST;
1363                 GOTO(cleanup, rc);
1364         }
1365
1366         /* Step 4: Do it. */
1367         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, de_src->d_inode->i_sb);
1368
1369         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
1370         if (IS_ERR(handle)) {
1371                 rc = PTR_ERR(handle);
1372                 GOTO(cleanup, rc);
1373         }
1374
1375         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
1376         if (rc && rc != -EPERM && rc != -EACCES)
1377                 CERROR("vfs_link error %d\n", rc);
1378 cleanup:
1379         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
1380                                 handle, req, rc, 0);
1381         EXIT;
1382
1383         switch (cleanup_phase) {
1384         case 4: /* child dentry */
1385                 l_dput(dchild);
1386         case 3: /* locks */
1387                 if (rc) {
1388                         ldlm_lock_decref(&src_lockh, LCK_EX);
1389                         ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
1390                 } else {
1391                         ptlrpc_save_lock(req, &src_lockh, LCK_EX);
1392                         ptlrpc_save_lock(req, &tgt_dir_lockh, LCK_EX);
1393                 }
1394         case 2: /* target dentry */
1395                 l_dput(de_tgt_dir);
1396         case 1: /* source dentry */
1397                 l_dput(de_src);
1398         case 0:
1399                 break;
1400         default:
1401                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1402                 LBUG();
1403         }
1404         req->rq_status = rc;
1405         return 0;
1406 }
1407
1408 /*
1409  * add a hard link in the PENDING directory, only used by rename()
1410  */
1411 static int mds_add_link_orphan(struct mds_update_record *rec,
1412                                struct obd_device *obd,
1413                                struct dentry *dentry)
1414 {
1415         struct mds_obd *mds = &obd->u.mds;
1416         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
1417         struct dentry *pending_child;
1418         char fidname[LL_FID_NAMELEN];
1419         int fidlen = 0, rc;
1420         ENTRY;
1421
1422         LASSERT(dentry->d_inode);
1423         LASSERT(!mds_inode_is_orphan(dentry->d_inode));
1424
1425         down(&pending_dir->i_sem);
1426         fidlen = ll_fid2str(fidname, dentry->d_inode->i_ino,
1427                             dentry->d_inode->i_generation);
1428
1429         CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
1430                mds_open_orphan_count(dentry->d_inode),
1431                rec->ur_name, fidname);
1432
1433         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
1434         if (IS_ERR(pending_child))
1435                 GOTO(out_lock, rc = PTR_ERR(pending_child));
1436
1437         if (pending_child->d_inode != NULL) {
1438                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
1439                 LASSERT(pending_child->d_inode == dentry->d_inode);
1440                 GOTO(out_dput, rc = 0);
1441         }
1442
1443         lock_kernel();
1444         rc = vfs_link(dentry, pending_dir, pending_child);
1445         unlock_kernel();
1446         if (rc)
1447                 CERROR("error addlink orphan %s to PENDING: rc = %d\n",
1448                        rec->ur_name, rc);
1449         else
1450                 mds_inode_set_orphan(dentry->d_inode);
1451 out_dput:
1452         l_dput(pending_child);
1453 out_lock:
1454         up(&pending_dir->i_sem);
1455         RETURN(rc);
1456 }
1457
1458 /* The idea here is that we need to get four locks in the end:
1459  * one on each parent directory, one on each child.  We need to take
1460  * these locks in some kind of order (to avoid deadlocks), and the order
1461  * I selected is "increasing resource number" order.  We need to look up
1462  * the children, however, before we know what the resource number(s) are.
1463  * Thus the following plan:
1464  *
1465  * 1,2. Look up the parents
1466  * 3,4. Look up the children
1467  * 5. Take locks on the parents and children, in order
1468  * 6. Verify that the children haven't changed since they were looked up
1469  *
1470  * If there was a race and the children changed since they were first looked
1471  * up, it is possible that mds_verify_child() will be able to just grab the
1472  * lock on the new child resource (if it has a higher resource than any other)
1473  * but we need to compare against not only its parent, but also against the
1474  * parent and child of the "other half" of the rename, hence maxres_{src,tgt}.
1475  *
1476  * We need the fancy igrab() on the child inodes because we aren't holding a
1477  * lock on the parent after the lookup is done, so dentry->d_inode may change
1478  * at any time, and igrab() itself doesn't like getting passed a NULL argument.
1479  */
1480 static int mds_get_parents_children_locked(struct obd_device *obd,
1481                                            struct mds_obd *mds,
1482                                            struct ll_fid *p1_fid,
1483                                            struct dentry **de_srcdirp,
1484                                            struct ll_fid *p2_fid,
1485                                            struct dentry **de_tgtdirp,
1486                                            int parent_mode,
1487                                            const char *old_name, int old_len,
1488                                            struct dentry **de_oldp,
1489                                            const char *new_name, int new_len,
1490                                            struct dentry **de_newp,
1491                                            struct lustre_handle *dlm_handles,
1492                                            int child_mode)
1493 {
1494         struct ldlm_res_id p1_res_id = { .name = {0} };
1495         struct ldlm_res_id p2_res_id = { .name = {0} };
1496         struct ldlm_res_id c1_res_id = { .name = {0} };
1497         struct ldlm_res_id c2_res_id = { .name = {0} };
1498         struct ldlm_res_id *maxres_src, *maxres_tgt;
1499         struct inode *inode;
1500         int rc = 0, cleanup_phase = 0;
1501         ENTRY;
1502
1503         /* Step 1: Lookup the source directory */
1504         *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
1505         if (IS_ERR(*de_srcdirp))
1506                 GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
1507
1508         cleanup_phase = 1; /* source directory dentry */
1509
1510         p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
1511         p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
1512
1513         /* Step 2: Lookup the target directory */
1514         if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
1515                 *de_tgtdirp = dget(*de_srcdirp);
1516         } else {
1517                 *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
1518                 if (IS_ERR(*de_tgtdirp))
1519                         GOTO(cleanup, rc = PTR_ERR(*de_tgtdirp));
1520         }
1521
1522         cleanup_phase = 2; /* target directory dentry */
1523
1524         p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
1525         p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
1526
1527         /* Step 3: Lookup the source child entry */
1528         *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
1529         if (IS_ERR(*de_oldp)) {
1530                 rc = PTR_ERR(*de_oldp);
1531                 CERROR("old child lookup error (%*s): %d\n",
1532                        old_len - 1, old_name, rc);
1533                 GOTO(cleanup, rc);
1534         }
1535
1536         cleanup_phase = 3; /* original name dentry */
1537
1538         inode = (*de_oldp)->d_inode;
1539         if (inode != NULL)
1540                 inode = igrab(inode);
1541         if (inode == NULL)
1542                 GOTO(cleanup, rc = -ENOENT);
1543
1544         c1_res_id.name[0] = inode->i_ino;
1545         c1_res_id.name[1] = inode->i_generation;
1546         iput(inode);
1547
1548         /* Step 4: Lookup the target child entry */
1549         *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
1550         if (IS_ERR(*de_newp)) {
1551                 rc = PTR_ERR(*de_newp);
1552                 CERROR("new child lookup error (%*s): %d\n",
1553                        old_len - 1, old_name, rc);
1554                 GOTO(cleanup, rc);
1555         }
1556
1557         cleanup_phase = 4; /* target dentry */
1558
1559         inode = (*de_newp)->d_inode;
1560         if (inode != NULL)
1561                 inode = igrab(inode);
1562         if (inode == NULL)
1563                 goto retry_locks;
1564
1565         c2_res_id.name[0] = inode->i_ino;
1566         c2_res_id.name[1] = inode->i_generation;
1567         iput(inode);
1568
1569 retry_locks:
1570         /* Step 5: Take locks on the parents and child(ren) */
1571         maxres_src = &p1_res_id;
1572         maxres_tgt = &p2_res_id;
1573         cleanup_phase = 4; /* target dentry */
1574
1575         if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
1576                 maxres_src = &c1_res_id;
1577         if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
1578                 maxres_tgt = &c2_res_id;
1579
1580         rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
1581                                     &p2_res_id, &dlm_handles[1], parent_mode,
1582                                     &c1_res_id, &dlm_handles[2], child_mode,
1583                                     &c2_res_id, &dlm_handles[3], child_mode);
1584         if (rc)
1585                 GOTO(cleanup, rc);
1586
1587         cleanup_phase = 6; /* parent and child(ren) locks */
1588
1589         /* Step 6a: Re-lookup source child to verify it hasn't changed */
1590         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
1591                               parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
1592                               child_mode, old_name, old_len, maxres_tgt);
1593         if (rc) {
1594                 if (c2_res_id.name[0] != 0)
1595                         ldlm_lock_decref(&dlm_handles[3], child_mode);
1596                 ldlm_lock_decref(&dlm_handles[1], parent_mode);
1597                 cleanup_phase = 4;
1598                 if (rc > 0)
1599                         goto retry_locks;
1600                 GOTO(cleanup, rc);
1601         }
1602
1603         if ((*de_oldp)->d_inode == NULL)
1604                 GOTO(cleanup, rc = -ENOENT);
1605
1606         /* Step 6b: Re-lookup target child to verify it hasn't changed */
1607         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
1608                               parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
1609                               child_mode, new_name, new_len, maxres_src);
1610         if (rc) {
1611                 ldlm_lock_decref(&dlm_handles[2], child_mode);
1612                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
1613                 cleanup_phase = 4;
1614                 if (rc > 0)
1615                         goto retry_locks;
1616                 GOTO(cleanup, rc);
1617         }
1618
1619         EXIT;
1620 cleanup:
1621         if (rc) {
1622                 switch (cleanup_phase) {
1623                 case 6: /* child lock(s) */
1624                         if (c2_res_id.name[0] != 0)
1625                                 ldlm_lock_decref(&dlm_handles[3], child_mode);
1626                         if (c1_res_id.name[0] != 0)
1627                                 ldlm_lock_decref(&dlm_handles[2], child_mode);
1628                 case 5: /* parent locks */
1629                         ldlm_lock_decref(&dlm_handles[1], parent_mode);
1630                         ldlm_lock_decref(&dlm_handles[0], parent_mode);
1631                 case 4: /* target dentry */
1632                         l_dput(*de_newp);
1633                 case 3: /* source dentry */
1634                         l_dput(*de_oldp);
1635                 case 2: /* target directory dentry */
1636                         l_dput(*de_tgtdirp);
1637                 case 1: /* source directry dentry */
1638                         l_dput(*de_srcdirp);
1639                 }
1640         }
1641
1642         return rc;
1643 }
1644
1645 static int mds_reint_rename(struct mds_update_record *rec, int offset,
1646                             struct ptlrpc_request *req,
1647                             struct lustre_handle *lockh)
1648 {
1649         struct obd_device *obd = req->rq_export->exp_obd;
1650         struct dentry *de_srcdir = NULL;
1651         struct dentry *de_tgtdir = NULL;
1652         struct dentry *de_old = NULL;
1653         struct dentry *de_new = NULL;
1654         struct mds_obd *mds = mds_req2mds(req);
1655         struct lustre_handle dlm_handles[4];
1656         struct mds_body *body = NULL;
1657         int rc = 0, lock_count = 3;
1658         int cleanup_phase = 0;
1659         void *handle = NULL;
1660         ENTRY;
1661
1662         LASSERT(offset == 0);
1663
1664         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
1665                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
1666                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
1667
1668         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1669
1670         rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
1671                                              rec->ur_fid2, &de_tgtdir, LCK_PW,
1672                                              rec->ur_name, rec->ur_namelen,
1673                                              &de_old, rec->ur_tgt,
1674                                              rec->ur_tgtlen, &de_new,
1675                                              dlm_handles, LCK_EX);
1676         if (rc)
1677                 GOTO(cleanup, rc);
1678
1679         cleanup_phase = 1; /* parent(s), children, locks */
1680
1681         if (de_new->d_inode)
1682                 lock_count = 4;
1683
1684         /* sanity check for src inode */
1685         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1686             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1687                 GOTO(cleanup, rc = -EINVAL);
1688
1689         /* sanity check for dest inode */
1690         if (de_new->d_inode &&
1691             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1692              de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1693                 GOTO(cleanup, rc = -EINVAL);
1694
1695         if (de_old->d_inode == de_new->d_inode) {
1696                 GOTO(cleanup, rc = 0);
1697         }
1698
1699         /* if we are about to remove the target at first, pass the EA of
1700          * that inode to client to perform and cleanup on OST */
1701         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
1702         LASSERT(body != NULL);
1703
1704         if (de_new->d_inode &&
1705             S_ISREG(de_new->d_inode->i_mode) &&
1706             de_new->d_inode->i_nlink == 1 &&
1707             mds_open_orphan_count(de_new->d_inode) == 0) {
1708                 mds_pack_inode2fid(&body->fid1, de_new->d_inode);
1709                 mds_pack_inode2body(body, de_new->d_inode);
1710                 mds_pack_md(obd, req->rq_repmsg, 1, body, de_new->d_inode, 1);
1711                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1712                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1713                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1714                 } else {
1715                         /* XXX need log unlink? */
1716                 }
1717         }
1718
1719         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1720                        de_srcdir->d_inode->i_sb);
1721
1722         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
1723         if (IS_ERR(handle))
1724                 GOTO(cleanup, rc = PTR_ERR(handle));
1725
1726         /* FIXME need adjust the journal block count? */
1727         /* if the target should be moved to PENDING, we at first increase the
1728          * link and later vfs_rename() will decrease the link count again */
1729         if (de_new->d_inode &&
1730             S_ISREG(de_new->d_inode->i_mode) &&
1731             de_new->d_inode->i_nlink == 1 &&
1732             mds_open_orphan_count(de_new->d_inode) > 0) {
1733                 rc = mds_add_link_orphan(rec, obd, de_new);
1734                 if (rc)
1735                         GOTO(cleanup, rc);
1736         }
1737
1738         lock_kernel();
1739         de_old->d_fsdata = req;
1740         de_new->d_fsdata = req;
1741         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
1742         unlock_kernel();
1743
1744         GOTO(cleanup, rc);
1745 cleanup:
1746         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1747                                 handle, req, rc, 0);
1748         switch (cleanup_phase) {
1749         case 1:
1750                 if (rc) {
1751                         if (lock_count == 4)
1752                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1753                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1754                         ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
1755                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
1756                 } else {
1757                         if (lock_count == 4)
1758                                 ptlrpc_save_lock(req,
1759                                               &(dlm_handles[3]), LCK_EX);
1760                         ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
1761                         ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
1762                         ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
1763                 }
1764                 l_dput(de_new);
1765                 l_dput(de_old);
1766                 l_dput(de_tgtdir);
1767                 l_dput(de_srcdir);
1768         case 0:
1769                 break;
1770         default:
1771                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1772                 LBUG();
1773         }
1774         req->rq_status = rc;
1775         return 0;
1776 }
1777
/* Common signature of the reintegration handlers stored in reinters[]:
 * the unpacked update record, a message offset, the request being served
 * and a lock handle. */
typedef int (*mds_reinter)(struct mds_update_record *rec, int offset,
                           struct ptlrpc_request *req,
                           struct lustre_handle *lockh);
1780
1781 static mds_reinter reinters[REINT_MAX + 1] = {
1782         [REINT_SETATTR] mds_reint_setattr,
1783         [REINT_CREATE] mds_reint_create,
1784         [REINT_LINK] mds_reint_link,
1785         [REINT_UNLINK] mds_reint_unlink,
1786         [REINT_RENAME] mds_reint_rename,
1787         [REINT_OPEN] mds_open
1788 };
1789
1790 int mds_reint_rec(struct mds_update_record *rec, int offset,
1791                   struct ptlrpc_request *req, struct lustre_handle *lockh)
1792 {
1793         struct obd_device *obd = req->rq_export->exp_obd;
1794         struct obd_run_ctxt saved;
1795         int rc;
1796         ENTRY;
1797
1798         /* checked by unpacker */
1799         LASSERT(rec->ur_opcode <= REINT_MAX &&
1800                 reinters[rec->ur_opcode] != NULL);
1801
1802         push_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
1803         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1804         pop_ctxt(&saved, &obd->obd_ctxt, &rec->ur_uc);
1805
1806         RETURN(rc);
1807 }