Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/fs.h>
32 #include <linux/obd_support.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lustre_fsfilt.h>
40
41 #include "mds_internal.h"
42
43 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
44                    int error)
45 {
46         obd_transno_commit_cb(obd, transno, error);
47 }
48
49 struct mds_logcancel_data {
50         struct lov_mds_md      *mlcd_lmm;
51         int                     mlcd_size;
52         int                     mlcd_cookielen;
53         int                     mlcd_eadatalen;
54         struct llog_cookie      mlcd_cookies[0];
55 };
56
57 /* Establish a connection to the OSC when we first need it.  We don't do
58  * this during MDS setup because that would introduce setup ordering issues. */
59 static int mds_osc_connect(struct obd_device *obd, struct mds_obd *mds)
60 {
61         int rc;
62         ENTRY;
63
64         if (IS_ERR(mds->mds_osc_obd))
65                 RETURN(PTR_ERR(mds->mds_osc_obd));
66
67         if (mds->mds_osc_obd)
68                 RETURN(0);
69
70         mds->mds_osc_obd = class_uuid2obd(&mds->mds_osc_uuid);
71         if (!mds->mds_osc_obd) {
72                 CERROR("MDS cannot locate OSC/LOV %s - no logging!\n",
73                        mds->mds_osc_uuid.uuid);
74                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
75                 RETURN(-ENOTCONN);
76         }
77
78         rc = obd_connect(&mds->mds_osc_conn, mds->mds_osc_obd, &obd->obd_uuid);
79         if (rc) {
80                 CERROR("MDS cannot locate OSC/LOV %s - no logging!\n",
81                        mds->mds_osc_uuid.uuid);
82                 mds->mds_osc_obd = ERR_PTR(rc);
83                 RETURN(rc);
84         }
85
86         rc = obd_set_info(&mds->mds_osc_conn, strlen("mds_conn"), "mds_conn",
87                           0, NULL);
88         RETURN(rc);
89 }
90
91 static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
92                                   void *cb_data, int error)
93 {
94         struct mds_logcancel_data *mlcd = cb_data;
95         struct lov_stripe_md *lsm = NULL;
96         int rc;
97
98         obd_transno_commit_cb(obd, transno, error);
99
100         CDEBUG(D_HA, "cancelling %d cookies\n",
101                (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
102
103         rc = obd_unpackmd(&obd->u.mds.mds_osc_conn, &lsm, mlcd->mlcd_lmm,
104                           mlcd->mlcd_eadatalen);
105         if (rc < 0) {
106                 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
107                        (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
108                        rc);
109         } else {
110                 rc = obd_log_cancel(&obd->u.mds.mds_osc_conn, lsm,
111                                     mlcd->mlcd_cookielen /
112                                     sizeof(*mlcd->mlcd_cookies),
113                                     mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
114                 ///* XXX 0 normally, SENDNOW for debug */);
115                 if (rc)
116                         CERROR("error cancelling %d log cookies: rc %d\n",
117                                (int)(mlcd->mlcd_cookielen /
118                                      sizeof(*mlcd->mlcd_cookies)), rc);
119         }
120
121         OBD_FREE(mlcd, mlcd->mlcd_size);
122 }
123
124 /* Assumes caller has already pushed us into the kernel context. */
125 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
126                        struct ptlrpc_request *req, int rc, __u32 op_data)
127 {
128         struct mds_export_data *med = &req->rq_export->exp_mds_data;
129         struct mds_client_data *mcd = med->med_mcd;
130         struct obd_device *obd = req->rq_export->exp_obd;
131         int err;
132         __u64 transno;
133         loff_t off;
134         ssize_t written;
135         ENTRY;
136
137         /* if the export has already been failed, we have no last_rcvd slot */
138         if (req->rq_export->exp_failed) {
139                 CERROR("committing transaction for disconnected client\n");
140                 if (handle)
141                         GOTO(commit, rc);
142                 GOTO(out, rc);
143         }
144
145         if (!handle) {
146                 /* if we're starting our own xaction, use our own inode */
147                 inode = mds->mds_rcvd_filp->f_dentry->d_inode;
148                 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
149                 if (IS_ERR(handle)) {
150                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
151                         GOTO(out, rc = PTR_ERR(handle));
152                 }
153         }
154
155         off = med->med_off;
156
157         transno = req->rq_reqmsg->transno;
158         if (transno == 0) {
159                 spin_lock(&mds->mds_transno_lock);
160                 transno = ++mds->mds_last_transno;
161                 spin_unlock(&mds->mds_transno_lock);
162         }
163         req->rq_repmsg->transno = req->rq_transno = transno;
164         mcd->mcd_last_transno = cpu_to_le64(transno);
165         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
166         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
167         mcd->mcd_last_result = cpu_to_le32(rc);
168         mcd->mcd_last_data = cpu_to_le32(op_data);
169
170         fsfilt_set_last_rcvd(req->rq_export->exp_obd, transno, handle,
171                              mds_commit_cb, NULL);
172         written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
173                                       (char *)mcd, sizeof(*mcd), &off);
174         CDEBUG(D_INODE, "wrote trans "LPU64" client %s at idx %u: written = "
175                LPSZ"\n", transno, mcd->mcd_uuid, med->med_idx, written);
176
177         if (written != sizeof(*mcd)) {
178                 CERROR("error writing to last_rcvd: rc = "LPSZ"\n", written);
179                 if (rc == 0) {
180                         if (written < 0)
181                                 rc = written;
182                         else
183                                 rc = -EIO;
184                 }
185         }
186
187 commit:
188         err = fsfilt_commit(obd, inode, handle, 0);
189         if (err) {
190                 CERROR("error committing transaction: %d\n", err);
191                 if (!rc)
192                         rc = err;
193         }
194
195         EXIT;
196  out:
197         return rc;
198 }
199
200 /* this gives the same functionality as the code between
201  * sys_chmod and inode_setattr
202  * chown_common and inode_setattr
203  * utimes and inode_setattr
204  */
205 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
206 {
207         time_t now = LTIME_S(CURRENT_TIME);
208         struct iattr *attr = &rec->ur_iattr;
209         unsigned int ia_valid = attr->ia_valid;
210         int error;
211         ENTRY;
212
213         /* only fix up attrs if the client VFS didn't already */
214         if (!(ia_valid & ATTR_RAW))
215                 RETURN(0);
216
217         if (!(ia_valid & ATTR_CTIME_SET))
218                 LTIME_S(attr->ia_ctime) = now;
219         if (!(ia_valid & ATTR_ATIME_SET))
220                 LTIME_S(attr->ia_atime) = now;
221         if (!(ia_valid & ATTR_MTIME_SET))
222                 LTIME_S(attr->ia_mtime) = now;
223
224         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
225                 RETURN(-EPERM);
226
227         /* times */
228         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME)) {
229                 if (rec->ur_fsuid != inode->i_uid &&
230                     (error = permission(inode,MAY_WRITE)) != 0)
231                         RETURN(error);
232         }
233
234         if (ia_valid & ATTR_SIZE) {
235                 if ((error = permission(inode,MAY_WRITE)) != 0)
236                         RETURN(error);
237         }
238
239         if (ia_valid & ATTR_UID) {
240                 /* chown */
241                 error = -EPERM;
242                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
243                         RETURN(-EPERM);
244                 if (attr->ia_uid == (uid_t) -1)
245                         attr->ia_uid = inode->i_uid;
246                 if (attr->ia_gid == (gid_t) -1)
247                         attr->ia_gid = inode->i_gid;
248                 attr->ia_mode = inode->i_mode;
249                 /*
250                  * If the user or group of a non-directory has been
251                  * changed by a non-root user, remove the setuid bit.
252                  * 19981026 David C Niemi <niemi@tux.org>
253                  *
254                  * Changed this to apply to all users, including root,
255                  * to avoid some races. This is the behavior we had in
256                  * 2.0. The check for non-root was definitely wrong
257                  * for 2.2 anyway, as it should have been using
258                  * CAP_FSETID rather than fsuid -- 19990830 SD.
259                  */
260                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
261                     !S_ISDIR(inode->i_mode)) {
262                         attr->ia_mode &= ~S_ISUID;
263                         attr->ia_valid |= ATTR_MODE;
264                 }
265                 /*
266                  * Likewise, if the user or group of a non-directory
267                  * has been changed by a non-root user, remove the
268                  * setgid bit UNLESS there is no group execute bit
269                  * (this would be a file marked for mandatory
270                  * locking).  19981026 David C Niemi <niemi@tux.org>
271                  *
272                  * Removed the fsuid check (see the comment above) --
273                  * 19990830 SD.
274                  */
275                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
276                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
277                         attr->ia_mode &= ~S_ISGID;
278                         attr->ia_valid |= ATTR_MODE;
279                 }
280         } else if (ia_valid & ATTR_MODE) {
281                 int mode = attr->ia_mode;
282                 /* chmod */
283                 if (attr->ia_mode == (mode_t) -1)
284                         attr->ia_mode = inode->i_mode;
285                 attr->ia_mode =
286                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
287         }
288         RETURN(0);
289 }
290
291 static void reconstruct_reint_setattr(struct mds_update_record *rec,
292                                       int offset, struct ptlrpc_request *req)
293 {
294         struct mds_export_data *med = &req->rq_export->exp_mds_data;
295         struct mds_client_data *mcd = med->med_mcd;
296         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
297         struct dentry *de;
298         struct mds_body *body;
299
300         req->rq_transno = mcd->mcd_last_transno;
301         req->rq_status = mcd->mcd_last_result;
302
303         if (req->rq_export->exp_outstanding_reply)
304                 mds_steal_ack_locks(req->rq_export, req);
305
306         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
307         if (IS_ERR(de)) {
308                 LASSERT(PTR_ERR(de) == req->rq_status);
309                 return;
310         }
311
312         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
313         mds_pack_inode2fid(&body->fid1, de->d_inode);
314         mds_pack_inode2body(body, de->d_inode);
315
316         /* Don't return OST-specific attributes if we didn't just set them */
317         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
318                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
319         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
320                 body->valid |= OBD_MD_FLMTIME;
321         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
322                 body->valid |= OBD_MD_FLATIME;
323
324         l_dput(de);
325 }
326
327 /* In the raw-setattr case, we lock the child inode.
328  * In the write-back case or if being called from open, the client holds a lock
329  * already.
330  *
331  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
332 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
333                              struct ptlrpc_request *req,
334                              struct lustre_handle *lh)
335 {
336         struct mds_obd *mds = mds_req2mds(req);
337         struct obd_device *obd = req->rq_export->exp_obd;
338         struct mds_body *body;
339         struct dentry *de;
340         struct inode *inode = NULL;
341         struct lustre_handle lockh;
342         void *handle = NULL;
343         struct mds_logcancel_data *mlcd = NULL;
344         int rc = 0, cleanup_phase = 0, err, locked = 0;
345         ENTRY;
346
347         LASSERT(offset == 0);
348
349         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
350
351         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
352                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
353                 if (IS_ERR(de))
354                         GOTO(cleanup, rc = PTR_ERR(de));
355         } else {
356                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
357                                            &lockh);
358                 if (IS_ERR(de))
359                         GOTO(cleanup, rc = PTR_ERR(de));
360                 locked = 1;
361         }
362
363         cleanup_phase = 1;
364         inode = de->d_inode;
365         LASSERT(inode);
366
367         CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
368
369         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
370                        to_kdev_t(inode->i_sb->s_dev));
371
372 #ifdef ENABLE_ORPHANS
373         if (unlikely(mds->mds_osc_obd == NULL))
374                 mds_osc_connect(obd, mds);
375 #endif
376
377         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
378         if (IS_ERR(handle)) {
379                 rc = PTR_ERR(handle);
380                 handle = NULL;
381                 GOTO(cleanup, rc);
382         }
383
384         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
385                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
386                        LTIME_S(rec->ur_iattr.ia_mtime),
387                        LTIME_S(rec->ur_iattr.ia_ctime));
388         rc = mds_fix_attr(inode, rec);
389         if (rc)
390                 GOTO(cleanup, rc);
391
392         rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
393         if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
394                 rc = fsfilt_set_md(obd, inode, handle,
395                                    rec->ur_eadata, rec->ur_eadatalen);
396         }
397
398         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
399         mds_pack_inode2fid(&body->fid1, inode);
400         mds_pack_inode2body(body, inode);
401
402         /* Don't return OST-specific attributes if we didn't just set them */
403         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
404                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
405         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
406                 body->valid |= OBD_MD_FLMTIME;
407         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
408                 body->valid |= OBD_MD_FLATIME;
409
410         if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
411                 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
412                           rec->ur_eadatalen);
413                 if (mlcd) {
414                         mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
415                                 rec->ur_eadatalen;
416                         mlcd->mlcd_eadatalen = rec->ur_eadatalen;
417                         mlcd->mlcd_cookielen = rec->ur_cookielen;
418                         mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
419                                 mlcd->mlcd_cookielen;
420                         memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
421                                mlcd->mlcd_cookielen);
422                         memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
423                                mlcd->mlcd_eadatalen);
424                 } else {
425                         CERROR("unable to allocate log cancel data\n");
426                 }
427         }
428         EXIT;
429  cleanup:
430         if (mlcd != NULL)
431                 fsfilt_set_last_rcvd(req->rq_export->exp_obd, 0, handle,
432                                      mds_cancel_cookies_cb, mlcd);
433         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
434         switch (cleanup_phase) {
435         case 1:
436                 l_dput(de);
437                 if (locked) {
438                         if (rc) {
439                                 ldlm_lock_decref(&lockh, LCK_PW);
440                         } else {
441                                 memcpy(&req->rq_ack_locks[0].lock, &lockh,
442                                        sizeof(lockh));
443                                 req->rq_ack_locks[0].mode = LCK_PW;
444                         }
445                 }
446         case 0:
447                 break;
448         default:
449                 LBUG();
450         }
451         if (err && !rc)
452                 rc = err;
453
454         req->rq_status = rc;
455         return 0;
456 }
457
458 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
459                                      struct ptlrpc_request *req)
460 {
461         struct mds_export_data *med = &req->rq_export->exp_mds_data;
462         struct mds_client_data *mcd = med->med_mcd;
463         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
464         struct dentry *parent, *child;
465         struct mds_body *body;
466
467         req->rq_transno = mcd->mcd_last_transno;
468         req->rq_status = mcd->mcd_last_result;
469
470         if (req->rq_export->exp_outstanding_reply)
471                 mds_steal_ack_locks(req->rq_export, req);
472
473         if (req->rq_status)
474                 return;
475
476         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
477         LASSERT(!IS_ERR(parent));
478         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
479         LASSERT(!IS_ERR(child));
480         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
481         mds_pack_inode2fid(&body->fid1, child->d_inode);
482         mds_pack_inode2body(body, child->d_inode);
483         l_dput(parent);
484         l_dput(child);
485 }
486
487 static int mds_reint_create(struct mds_update_record *rec, int offset,
488                             struct ptlrpc_request *req,
489                             struct lustre_handle *lh)
490 {
491         struct dentry *de = NULL;
492         struct mds_obd *mds = mds_req2mds(req);
493         struct obd_device *obd = req->rq_export->exp_obd;
494         struct dentry *dchild = NULL;
495         struct inode *dir = NULL;
496         void *handle = NULL;
497         struct lustre_handle lockh;
498         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
499         int created = 0;
500         ENTRY;
501
502         LASSERT(offset == 0);
503         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
504
505         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
506
507         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
508                 GOTO(cleanup, rc = -ESTALE);
509
510         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
511         if (IS_ERR(de)) {
512                 rc = PTR_ERR(de);
513                 CERROR("parent lookup error %d\n", rc);
514                 GOTO(cleanup, rc);
515         }
516         cleanup_phase = 1; /* locked parent dentry */
517         dir = de->d_inode;
518         LASSERT(dir);
519         CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
520                dir->i_ino, rec->ur_name, rec->ur_mode);
521
522         ldlm_lock_dump_handle(D_OTHER, &lockh);
523
524         dchild = ll_lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
525         if (IS_ERR(dchild)) {
526                 rc = PTR_ERR(dchild);
527                 CERROR("child lookup error %d\n", rc);
528                 GOTO(cleanup, rc);
529         }
530
531         cleanup_phase = 2; /* child dentry */
532
533         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
534                        to_kdev_t(dir->i_sb->s_dev));
535
536         if (dir->i_mode & S_ISGID) {
537                 rec->ur_gid = dir->i_gid;
538                 if (S_ISDIR(rec->ur_mode))
539                         rec->ur_mode |= S_ISGID;
540         }
541
542         if (rec->ur_fid2->id)
543                 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
544
545         switch (type) {
546         case S_IFREG:{
547                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
548                 if (IS_ERR(handle))
549                         GOTO(cleanup, rc = PTR_ERR(handle));
550                 rc = vfs_create(dir, dchild, rec->ur_mode);
551                 EXIT;
552                 break;
553         }
554         case S_IFDIR:{
555                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
556                 if (IS_ERR(handle))
557                         GOTO(cleanup, rc = PTR_ERR(handle));
558                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
559                 EXIT;
560                 break;
561         }
562         case S_IFLNK:{
563                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
564                 if (IS_ERR(handle))
565                         GOTO(cleanup, rc = PTR_ERR(handle));
566                 if (rec->ur_tgt == NULL)        /* no target supplied */
567                         rc = -EINVAL;           /* -EPROTO? */
568                 else
569                         rc = vfs_symlink(dir, dchild, rec->ur_tgt);
570                 EXIT;
571                 break;
572         }
573         case S_IFCHR:
574         case S_IFBLK:
575         case S_IFIFO:
576         case S_IFSOCK:{
577                 int rdev = rec->ur_rdev;
578                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
579                 if (IS_ERR(handle))
580                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
581                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
582                 EXIT;
583                 break;
584         }
585         default:
586                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
587                 dchild->d_fsdata = NULL;
588                 GOTO(cleanup, rc = -EINVAL);
589         }
590
591         /* In case we stored the desired inum in here, we want to clean up. */
592         if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
593                 dchild->d_fsdata = NULL;
594
595         if (rc) {
596                 CDEBUG(D_INODE, "error during create: %d\n", rc);
597                 GOTO(cleanup, rc);
598         } else {
599                 struct iattr iattr;
600                 struct inode *inode = dchild->d_inode;
601                 struct mds_body *body;
602
603                 created = 1;
604                 LTIME_S(iattr.ia_atime) = rec->ur_time;
605                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
606                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
607                 iattr.ia_uid = rec->ur_uid;
608                 iattr.ia_gid = rec->ur_gid;
609                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
610                         ATTR_MTIME | ATTR_CTIME;
611
612                 if (rec->ur_fid2->id) {
613                         LASSERT(rec->ur_fid2->id == inode->i_ino);
614                         inode->i_generation = rec->ur_fid2->generation;
615                         /* Dirtied and committed by the upcoming setattr. */
616                         CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
617                                inode->i_ino, inode->i_generation);
618                 } else {
619                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
620                                inode->i_ino, inode->i_generation);
621                 }
622
623                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
624                 if (rc) {
625                         CERROR("error on setattr: rc = %d\n", rc);
626                         /* XXX should we abort here in case of error? */
627                 }
628
629                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
630                 mds_pack_inode2fid(&body->fid1, inode);
631                 mds_pack_inode2body(body, inode);
632         }
633         EXIT;
634
635 cleanup:
636         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
637
638         if (rc && created) {
639                 /* Destroy the file we just created.  This should not need
640                  * extra journal credits, as we have already modified all of
641                  * the blocks needed in order to create the file in the first
642                  * place.
643                  */
644                 switch (type) {
645                 case S_IFDIR:
646                         err = vfs_rmdir(dir, dchild);
647                         if (err)
648                                 CERROR("rmdir in error path: %d\n", err);
649                         break;
650                 default:
651                         err = vfs_unlink(dir, dchild);
652                         if (err)
653                                 CERROR("unlink in error path: %d\n", err);
654                         break;
655                 }
656         } else {
657                 rc = err;
658         }
659         switch (cleanup_phase) {
660         case 2: /* child dentry */
661                 l_dput(dchild);
662         case 1: /* locked parent dentry */
663                 if (rc) {
664                         ldlm_lock_decref(&lockh, LCK_PW);
665                 } else {
666                         memcpy(&req->rq_ack_locks[0].lock, &lockh,
667                                sizeof(lockh));
668                         req->rq_ack_locks[0].mode = LCK_PW;
669                 }
670                 l_dput(de);
671         case 0:
672                 break;
673         default:
674                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
675                 LBUG();
676         }
677         req->rq_status = rc;
678         return 0;
679 }
680
681 /* This function doesn't use ldlm_match_or_enqueue because we're always called
682  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
683  * because they take the place of local semaphores.
684  *
685  * Two locks are taken in numerical order */
686 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
687                           struct ldlm_res_id *p1_res_id,
688                           struct ldlm_res_id *p2_res_id,
689                           struct lustre_handle *p1_lockh,
690                           struct lustre_handle *p2_lockh)
691 {
692         struct ldlm_res_id res_id[2];
693         struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
694         int rc, flags;
695         ENTRY;
696
697         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
698
699         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
700                p1_res_id[0].name[0], p2_res_id[0].name[0]);
701
702         if (p1_res_id->name[0] < p2_res_id->name[0]) {
703                 handles[0] = p1_lockh;
704                 handles[1] = p2_lockh;
705                 res_id[0] = *p1_res_id;
706                 res_id[1] = *p2_res_id;
707         } else {
708                 handles[1] = p1_lockh;
709                 handles[0] = p2_lockh;
710                 res_id[1] = *p1_res_id;
711                 res_id[0] = *p2_res_id;
712         }
713
714         CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
715                p1_res_id[0].name[0], p2_res_id[0].name[0]);
716
717         flags = LDLM_FL_LOCAL_ONLY;
718         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
719                               LDLM_PLAIN, NULL, 0, lock_mode, &flags,
720                               ldlm_completion_ast, mds_blocking_ast, NULL,
721                               handles[0]);
722         if (rc != ELDLM_OK)
723                 RETURN(-EIO);
724         ldlm_lock_dump_handle(D_OTHER, handles[0]);
725
726         if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
727                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
728                 ldlm_lock_addref(handles[1], lock_mode);
729         } else {
730                 flags = LDLM_FL_LOCAL_ONLY;
731                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
732                                       res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
733                                       &flags, ldlm_completion_ast,
734                                       mds_blocking_ast, NULL, handles[1]);
735                 if (rc != ELDLM_OK) {
736                         ldlm_lock_decref(handles[0], lock_mode);
737                         RETURN(-EIO);
738                 }
739         }
740         ldlm_lock_dump_handle(D_OTHER, handles[1]);
741
742         RETURN(0);
743 }
744
745 static void reconstruct_reint_unlink(struct mds_update_record *rec, int offset,
746                                     struct ptlrpc_request *req,
747                                     struct lustre_handle *child_lockh)
748 {
749         struct mds_export_data *med = &req->rq_export->exp_mds_data;
750         struct mds_client_data *mcd = med->med_mcd;
751
752         req->rq_transno = mcd->mcd_last_transno;
753         req->rq_status = mcd->mcd_last_result;
754
755         if (req->rq_export->exp_outstanding_reply)
756                 mds_steal_ack_locks(req->rq_export, req);
757
758         DEBUG_REQ(D_ERROR, req,
759                   "can't get EA for reconstructed unlink, leaking OST inodes");
760 }
761
762 /* If we are unlinking an open file/dir (i.e. creating an orphan) then
763  * we instead link the inode into the PENDING directory until it is
764  * finally released.  We can't simply call mds_reint_rename() or some
765  * part thereof, because we don't have the inode to check for link
766  * count/open status until after it is locked.
767  *
768  * For lock ordering, we always get the PENDING, then pending_child lock
769  * last to avoid deadlocks.
770  */
771 static int mds_unlink_orphan(struct mds_update_record *rec,
772                              struct obd_device *obd, struct dentry *dparent,
773                              struct dentry *dchild, void **handle)
774 {
775         struct mds_obd *mds = &obd->u.mds;
776         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
777         struct dentry *pending_child;
778         char fidname[LL_FID_NAMELEN];
779         int fidlen = 0, rc;
780         ENTRY;
781
782         LASSERT(!mds_inode_is_orphan(dchild->d_inode));
783
784         down(&pending_dir->i_sem);
785         fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
786                             dchild->d_inode->i_generation);
787
788         CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
789                mds_open_orphan_count(dchild->d_inode),
790                rec->ur_name, fidname);
791
792         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
793         if (IS_ERR(pending_child))
794                 GOTO(out_lock, rc = PTR_ERR(pending_child));
795
796         if (pending_child->d_inode != NULL) {
797                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
798                 LASSERT(pending_child->d_inode == dchild->d_inode);
799                 GOTO(out_dput, rc = 0);
800         }
801
802         *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
803         if (IS_ERR(*handle))
804                 GOTO(out_dput, rc = PTR_ERR(*handle));
805
806         rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
807         if (rc)
808                 CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
809                        dparent->d_inode->i_ino, rec->ur_name, rc);
810         else
811                 mds_inode_set_orphan(dchild->d_inode);
812 out_dput:
813         dput(pending_child);
814 out_lock:
815         up(&pending_dir->i_sem);
816         RETURN(rc);
817 }
818
819 static int mds_log_op_unlink(struct obd_device *obd, struct mds_obd *mds,
820                              struct inode *inode, struct lustre_msg *repmsg,
821                              int offset)
822 {
823         struct lov_stripe_md *lsm = NULL;
824         struct llog_unlink_rec *lur;
825         int rc;
826         ENTRY;
827
828         if (IS_ERR(mds->mds_osc_obd))
829                 RETURN(PTR_ERR(mds->mds_osc_obd));
830
831         rc = obd_unpackmd(&mds->mds_osc_conn, &lsm,
832                           lustre_msg_buf(repmsg, offset, 0),
833                           repmsg->buflens[offset]);
834         if (rc < 0)
835                 RETURN(rc);
836
837         OBD_ALLOC(lur, sizeof(*lur));
838         if (!lur)
839                 RETURN(-ENOMEM);
840         lur->lur_hdr.lth_len = lur->lur_end_len = sizeof(*lur);
841         lur->lur_hdr.lth_type = MDS_UNLINK_REC;
842         lur->lur_oid = inode->i_ino;
843         lur->lur_ogen = inode->i_generation;
844
845         rc = obd_log_add(&mds->mds_osc_conn, mds->mds_catalog, &lur->lur_hdr,
846                          lsm, lustre_msg_buf(repmsg, offset + 1, 0),
847                          repmsg->buflens[offset+1]/sizeof(struct llog_cookie));
848
849         obd_free_memmd(&mds->mds_osc_conn, &lsm);
850         OBD_FREE(lur, sizeof(*lur));
851
852         RETURN(rc);
853 }
854
855 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
856                             struct ptlrpc_request *req,
857                             struct lustre_handle *lh)
858 {
859         struct dentry *dparent = NULL;
860         struct dentry *dchild = NULL;
861         struct mds_obd *mds = mds_req2mds(req);
862         struct obd_device *obd = req->rq_export->exp_obd;
863         struct mds_body *body = NULL;
864         struct inode *child_inode;
865         struct lustre_handle parent_lockh, child_lockh;
866         void *handle = NULL;
867         struct ldlm_res_id child_res_id = { .name = {0} };
868         int rc = 0, flags = 0, log_unlink = 0, cleanup_phase = 0;
869         ENTRY;
870
871         LASSERT(offset == 0 || offset == 2);
872
873         MDS_CHECK_RESENT(req, reconstruct_reint_unlink(rec, offset, req,
874                                                        &child_lockh));
875
876         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
877                 GOTO(cleanup, rc = -ENOENT);
878
879         /* Step 1: Lookup the parent by FID */
880         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
881                                         &parent_lockh);
882         if (IS_ERR(dparent))
883                 GOTO(cleanup, rc = PTR_ERR(dparent));
884         LASSERT(dparent->d_inode);
885
886         cleanup_phase = 1; /* Have parent dentry lock */
887
888         /* Step 2: Lookup the child */
889         dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
890         if (IS_ERR(dchild))
891                 GOTO(cleanup, rc = PTR_ERR(dchild));
892
893         cleanup_phase = 2; /* child dentry */
894
895         child_inode = dchild->d_inode;
896         if (child_inode == NULL) {
897                 CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
898                        dparent->d_inode->i_ino, rec->ur_name);
899                 GOTO(cleanup, rc = -ENOENT);
900         }
901
902         DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
903                   dparent->d_inode->i_ino, child_inode->i_ino);
904
905         /* Step 3: Get a lock on the child */
906         child_res_id.name[0] = child_inode->i_ino;
907         child_res_id.name[1] = child_inode->i_generation;
908
909         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
910                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
911                               &flags, ldlm_completion_ast, mds_blocking_ast,
912                               NULL, &child_lockh);
913         if (rc != ELDLM_OK)
914                 GOTO(cleanup, rc);
915
916         cleanup_phase = 3; /* child lock */
917
918         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
919                        to_kdev_t(dparent->d_inode->i_sb->s_dev));
920
921         /* ldlm_reply in buf[0] if called via intent */
922         if (offset)
923                 offset = 1;
924
925         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
926         LASSERT(body != NULL);
927
928 #ifdef ENABLE_ORPHANS
929         if (unlikely(mds->mds_osc_obd == NULL))
930                 mds_osc_connect(obd, mds);
931 #endif
932
933         /* If this is the last reference to this inode, get the OBD EA
934          * data first so the client can destroy OST objects */
935         if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1) {
936                 mds_pack_inode2fid(&body->fid1, child_inode);
937                 mds_pack_inode2body(body, child_inode);
938                 mds_pack_md(obd, req->rq_repmsg, offset + 1, body, child_inode);
939                 if (!(body->valid & OBD_MD_FLEASIZE)) {
940                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
941                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
942                         log_unlink = 1;
943                 }
944         }
945
946         /* We have to do these checks ourselves, in case we are making an
947          * orphan.  The client tells us whether rmdir() or unlink() was called,
948          * so we need to return appropriate errors (bug 72).
949          *
950          * We don't have to check permissions, because vfs_rename (called from
951          * mds_unlink_orphan) also calls may_delete. */
952         if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
953                 if (!S_ISDIR(child_inode->i_mode))
954                         GOTO(cleanup, rc = -ENOTDIR);
955         } else {
956                 if (S_ISDIR(child_inode->i_mode))
957                         GOTO(cleanup, rc = -EISDIR);
958         }
959
960         if (mds_open_orphan_count(child_inode) > 0) {
961                 rc = mds_unlink_orphan(rec, obd, dparent, dchild, &handle);
962 #ifdef ENABLE_ORPHANS
963                 if (!rc && mds_log_op_unlink(obd, mds, child_inode,
964                                              req->rq_repmsg, offset + 1) > 0)
965                         body->valid |= OBD_MD_FLCOOKIE;
966 #endif
967                 GOTO(cleanup, rc);
968         }
969
970         // Step 4: Do the unlink: client decides between rmdir/unlink! (bug 72)
971         switch (rec->ur_mode & S_IFMT) {
972         case S_IFDIR:
973                 /* Drop any lingering child directories before we start our
974                  * transaction, to avoid doing multiple inode dirty/delete
975                  * in our compound transaction (bug 1321). */
976                 shrink_dcache_parent(dchild);
977                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
978                                       NULL);
979                 if (IS_ERR(handle))
980                         GOTO(cleanup, rc = PTR_ERR(handle));
981                 cleanup_phase = 4;
982                 rc = vfs_rmdir(dparent->d_inode, dchild);
983                 break;
984         case S_IFREG: {
985                 handle = fsfilt_start(obd, dparent->d_inode,
986                                       FSFILT_OP_UNLINK_LOG, NULL);
987                 if (IS_ERR(handle))
988                         GOTO(cleanup, rc = PTR_ERR(handle));
989
990                 cleanup_phase = 4;
991                 rc = vfs_unlink(dparent->d_inode, dchild);
992 #ifdef ENABLE_ORPHANS
993                 if (!rc && log_unlink)
994                         if (mds_log_op_unlink(obd, mds, child_inode,
995                                               req->rq_repmsg, offset + 1) > 0)
996                                 body->valid |= OBD_MD_FLCOOKIE;
997 #endif
998                 break;
999         }
1000         case S_IFLNK:
1001         case S_IFCHR:
1002         case S_IFBLK:
1003         case S_IFIFO:
1004         case S_IFSOCK:
1005                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
1006                                       NULL);
1007                 if (IS_ERR(handle))
1008                         GOTO(cleanup, rc = PTR_ERR(handle));
1009                 cleanup_phase = 4;
1010                 rc = vfs_unlink(dparent->d_inode, dchild);
1011                 break;
1012         default:
1013                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
1014                        rec->ur_name);
1015                 LBUG();
1016                 GOTO(cleanup, rc = -EINVAL);
1017         }
1018
1019  cleanup:
1020         switch(cleanup_phase) {
1021         case 4:
1022                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
1023                                         rc, 0);
1024                 if (rc && body != NULL) {
1025                         // Don't unlink the OST objects if the MDS unlink failed
1026                         body->valid = 0;
1027                 }
1028         case 3: /* child lock */
1029                 ldlm_lock_decref(&child_lockh, LCK_EX);
1030         case 2: /* child dentry */
1031                 l_dput(dchild);
1032         case 1: /* parent dentry and lock */
1033                 if (rc) {
1034                         ldlm_lock_decref(&parent_lockh, LCK_PW);
1035                 } else {
1036                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
1037                                sizeof(parent_lockh));
1038                         req->rq_ack_locks[0].mode = LCK_PW;
1039                 }
1040                 l_dput(dparent);
1041         case 0:
1042                 break;
1043         default:
1044                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1045                 LBUG();
1046         }
1047         req->rq_status = rc;
1048         return 0;
1049 }
1050
1051 static void reconstruct_reint_link(struct mds_update_record *rec, int offset,
1052                                    struct ptlrpc_request *req)
1053 {
1054         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1055         struct mds_client_data *mcd = med->med_mcd;
1056
1057         req->rq_transno = mcd->mcd_last_transno;
1058         req->rq_status = mcd->mcd_last_result;
1059
1060         if (req->rq_export->exp_outstanding_reply)
1061                 mds_steal_ack_locks(req->rq_export, req);
1062 }
1063
1064 static int mds_reint_link(struct mds_update_record *rec, int offset,
1065                           struct ptlrpc_request *req,
1066                           struct lustre_handle *lh)
1067 {
1068         struct obd_device *obd = req->rq_export->exp_obd;
1069         struct dentry *de_src = NULL;
1070         struct dentry *de_tgt_dir = NULL;
1071         struct dentry *dchild = NULL;
1072         struct mds_obd *mds = mds_req2mds(req);
1073         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
1074         struct ldlm_res_id src_res_id = { .name = {0} };
1075         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
1076         int lock_mode = 0, rc = 0, cleanup_phase = 0;
1077         ENTRY;
1078
1079         LASSERT(offset == 0);
1080
1081         MDS_CHECK_RESENT(req, reconstruct_reint_link(rec, offset, req));
1082
1083         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1084                 GOTO(cleanup, rc = -ENOENT);
1085
1086         /* Step 1: Lookup the source inode and target directory by FID */
1087         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1088         if (IS_ERR(de_src))
1089                 GOTO(cleanup, rc = PTR_ERR(de_src));
1090
1091         cleanup_phase = 1; /* source dentry */
1092
1093         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1094         if (IS_ERR(de_tgt_dir))
1095                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
1096
1097         cleanup_phase = 2; /* target directory dentry */
1098
1099         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
1100                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
1101                de_src->d_inode->i_ino);
1102
1103         /* Step 2: Take the two locks */
1104         lock_mode = LCK_EX;
1105         src_res_id.name[0] = de_src->d_inode->i_ino;
1106         src_res_id.name[1] = de_src->d_inode->i_generation;
1107         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
1108         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
1109
1110         rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
1111                                    &src_lockh, &tgt_dir_lockh);
1112         if (rc != ELDLM_OK)
1113                 GOTO(cleanup, rc = -EIO);
1114
1115         cleanup_phase = 3; /* locks */
1116
1117         /* Step 3: Lookup the child */
1118         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
1119         if (IS_ERR(dchild)) {
1120                 rc = PTR_ERR(dchild);
1121                 if (rc != -EPERM && rc != -EACCES)
1122                         CERROR("child lookup error %d\n", rc);
1123                 GOTO(cleanup, rc);
1124         }
1125
1126         cleanup_phase = 4; /* child dentry */
1127
1128         if (dchild->d_inode) {
1129                 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
1130                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
1131                 rc = -EEXIST;
1132                 GOTO(cleanup, rc);
1133         }
1134
1135         /* Step 4: Do it. */
1136         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
1137                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
1138
1139         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
1140         if (IS_ERR(handle)) {
1141                 rc = PTR_ERR(handle);
1142                 GOTO(cleanup, rc);
1143         }
1144
1145         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
1146         if (rc && rc != -EPERM && rc != -EACCES)
1147                 CERROR("vfs_link error %d\n", rc);
1148 cleanup:
1149         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
1150                                 handle, req, rc, 0);
1151         EXIT;
1152
1153         switch (cleanup_phase) {
1154         case 4: /* child dentry */
1155                 l_dput(dchild);
1156         case 3: /* locks */
1157                 if (rc) {
1158                         ldlm_lock_decref(&src_lockh, lock_mode);
1159                         ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
1160                 } else {
1161                         memcpy(&req->rq_ack_locks[0].lock, &src_lockh,
1162                                sizeof(src_lockh));
1163                         memcpy(&req->rq_ack_locks[1].lock, &tgt_dir_lockh,
1164                                sizeof(tgt_dir_lockh));
1165                         req->rq_ack_locks[0].mode = lock_mode;
1166                         req->rq_ack_locks[1].mode = lock_mode;
1167                 }
1168         case 2: /* target dentry */
1169                 l_dput(de_tgt_dir);
1170         case 1: /* source dentry */
1171                 l_dput(de_src);
1172         case 0:
1173                 break;
1174         default:
1175                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1176                 LBUG();
1177         }
1178         req->rq_status = rc;
1179         return 0;
1180 }
1181
1182 static void reconstruct_reint_rename(struct mds_update_record *rec,
1183                                      int offset, struct ptlrpc_request *req)
1184 {
1185         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1186         struct mds_client_data *mcd = med->med_mcd;
1187
1188         req->rq_transno = mcd->mcd_last_transno;
1189         req->rq_status = mcd->mcd_last_result;
1190
1191         if (req->rq_export->exp_outstanding_reply)
1192                 mds_steal_ack_locks(req->rq_export, req);
1193         else
1194                 LBUG(); /* don't support it yet, but it'll be fun! */
1195
1196 }
1197
1198 static int mds_reint_rename(struct mds_update_record *rec, int offset,
1199                             struct ptlrpc_request *req,
1200                             struct lustre_handle *lockh)
1201 {
1202         struct obd_device *obd = req->rq_export->exp_obd;
1203         struct dentry *de_srcdir = NULL;
1204         struct dentry *de_tgtdir = NULL;
1205         struct dentry *de_old = NULL;
1206         struct dentry *de_new = NULL;
1207         struct mds_obd *mds = mds_req2mds(req);
1208         struct lustre_handle dlm_handles[4];
1209         struct ldlm_res_id p1_res_id = { .name = {0} };
1210         struct ldlm_res_id p2_res_id = { .name = {0} };
1211         struct ldlm_res_id c1_res_id = { .name = {0} };
1212         struct ldlm_res_id c2_res_id = { .name = {0} };
1213         int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
1214         int cleanup_phase = 0;
1215         void *handle = NULL;
1216         ENTRY;
1217
1218         LASSERT(offset == 0);
1219
1220         MDS_CHECK_RESENT(req, reconstruct_reint_rename(rec, offset, req));
1221
1222         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1223         if (IS_ERR(de_srcdir))
1224                 GOTO(cleanup, rc = PTR_ERR(de_srcdir));
1225
1226         cleanup_phase = 1; /* source directory dentry */
1227
1228         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
1229         if (IS_ERR(de_tgtdir))
1230                 GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
1231
1232         cleanup_phase = 2; /* target directory dentry */
1233
1234         /* The idea here is that we need to get four locks in the end:
1235          * one on each parent directory, one on each child.  We need to take
1236          * these locks in some kind of order (to avoid deadlocks), and the order
1237          * I selected is "increasing resource number" order.  We need to take
1238          * the locks on the parent directories, however, before we can lookup
1239          * the children.  Thus the following plan:
1240          *
1241          * 1. Take locks on the parent(s), in order
1242          * 2. Lookup the children
1243          * 3. Take locks on the children, in order
1244          * 4. Execute the rename
1245          */
1246
1247         /* Step 1: Take locks on the parent(s), in order */
1248         p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
1249         p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
1250
1251         p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
1252         p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
1253
1254         rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
1255                                    &(dlm_handles[0]), &(dlm_handles[1]));
1256         if (rc != ELDLM_OK)
1257                 GOTO(cleanup, rc);
1258
1259         cleanup_phase = 3; /* parent locks */
1260
1261         /* Step 2: Lookup the children */
1262         de_old = ll_lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen-1);
1263         if (IS_ERR(de_old)) {
1264                 CERROR("old child lookup error (%*s): %ld\n",
1265                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
1266                 GOTO(cleanup, rc = PTR_ERR(de_old));
1267         }
1268
1269         cleanup_phase = 4; /* original name dentry */
1270
1271         if (de_old->d_inode == NULL)
1272                 GOTO(cleanup, rc = -ENOENT);
1273
1274         /* sanity check for src inode */
1275         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1276             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1277                 GOTO(cleanup, rc = -EINVAL);
1278
1279         de_new = ll_lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
1280         if (IS_ERR(de_new)) {
1281                 CERROR("new child lookup error (%*s): %ld\n",
1282                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
1283                 GOTO(cleanup, rc = PTR_ERR(de_new));
1284         }
1285
1286         cleanup_phase = 5; /* target dentry */
1287
1288         /* sanity check for dest inode */
1289         if (de_new->d_inode &&
1290             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1291             de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1292                 GOTO(cleanup, rc = -EINVAL);
1293
1294         /* Step 3: Take locks on the children */
1295         c1_res_id.name[0] = de_old->d_inode->i_ino;
1296         c1_res_id.name[1] = de_old->d_inode->i_generation;
1297         if (de_new->d_inode == NULL) {
1298                 flags = LDLM_FL_LOCAL_ONLY;
1299                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1300                                       c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
1301                                       &flags, ldlm_completion_ast,
1302                                       mds_blocking_ast, NULL,
1303                                       &(dlm_handles[2]));
1304                 lock_count = 3;
1305         } else {
1306                 c2_res_id.name[0] = de_new->d_inode->i_ino;
1307                 c2_res_id.name[1] = de_new->d_inode->i_generation;
1308                 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
1309                                            &(dlm_handles[2]),
1310                                            &(dlm_handles[3]));
1311                 lock_count = 4;
1312         }
1313         if (rc != ELDLM_OK)
1314                 GOTO(cleanup, rc);
1315
1316         cleanup_phase = 6; /* child locks */
1317
1318         /* Step 4: Execute the rename */
1319         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1320                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
1321
1322         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
1323         if (IS_ERR(handle))
1324                 GOTO(cleanup, rc = PTR_ERR(handle));
1325
1326         lock_kernel();
1327         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
1328         unlock_kernel();
1329
1330         EXIT;
1331 cleanup:
1332         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1333                                 handle, req, rc, 0);
1334         switch (cleanup_phase) {
1335         case 6: /* child locks */
1336                 if (rc) {
1337                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1338                         if (lock_count == 4)
1339                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1340                 } else {
1341                         memcpy(&req->rq_ack_locks[2].lock, &(dlm_handles[2]),
1342                                sizeof(dlm_handles[2]));
1343                         req->rq_ack_locks[2].mode = LCK_EX;
1344                         if (lock_count == 4) {
1345                                 memcpy(&req->rq_ack_locks[3].lock,
1346                                        &dlm_handles[3], sizeof(dlm_handles[3]));
1347                                 req->rq_ack_locks[3].mode = LCK_EX;
1348                         }
1349                 }
1350         case 5: /* target dentry */
1351                 l_dput(de_new);
1352         case 4: /* source dentry */
1353                 l_dput(de_old);
1354         case 3: /* parent locks */
1355                 if (rc) {
1356                         ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
1357                         ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
1358                 } else {
1359                         memcpy(&req->rq_ack_locks[0].lock, &(dlm_handles[0]),
1360                                sizeof(dlm_handles[0]));
1361                         memcpy(&req->rq_ack_locks[1].lock, &(dlm_handles[1]),
1362                                sizeof(dlm_handles[1]));
1363                         req->rq_ack_locks[0].mode = LCK_EX;
1364                         req->rq_ack_locks[1].mode = LCK_EX;
1365                 }
1366         case 2: /* target directory dentry */
1367                 l_dput(de_tgtdir);
1368         case 1: /* source directry dentry */
1369                 l_dput(de_srcdir);
1370         case 0:
1371                 break;
1372         default:
1373                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1374                 LBUG();
1375         }
1376         req->rq_status = rc;
1377         return 0;
1378 }
1379
1380 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
1381                            struct ptlrpc_request *, struct lustre_handle *);
1382
1383 static mds_reinter reinters[REINT_MAX + 1] = {
1384         [REINT_SETATTR] mds_reint_setattr,
1385         [REINT_CREATE] mds_reint_create,
1386         [REINT_UNLINK] mds_reint_unlink,
1387         [REINT_LINK] mds_reint_link,
1388         [REINT_RENAME] mds_reint_rename,
1389         [REINT_OPEN] mds_open
1390 };
1391
1392 int mds_reint_rec(struct mds_update_record *rec, int offset,
1393                   struct ptlrpc_request *req, struct lustre_handle *lockh)
1394 {
1395         struct mds_obd *mds = mds_req2mds(req);
1396         struct obd_run_ctxt saved;
1397         int rc;
1398         ENTRY;
1399
1400         /* checked by unpacker */
1401         LASSERT(rec->ur_opcode <= REINT_MAX &&
1402                 reinters[rec->ur_opcode] != NULL);
1403
1404         push_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);
1405         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1406         pop_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);
1407
1408         RETURN(rc);
1409 }