Whamcloud - gitweb
merge b_devel into HEAD (20030626 merge tag) for 0.7.1
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/fs.h>
32 #include <linux/obd_support.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lustre_fsfilt.h>
40 #include "mds_internal.h"
41
42 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
43
/* Callback handed to fsfilt_set_last_rcvd() by mds_finish_transno(); it simply
 * forwards the transaction number and error code to the generic
 * obd_transno_commit_cb() handler. */
static void mds_commit_cb(struct obd_device *obd, __u64 transno, int error)
{
        obd_transno_commit_cb(obd, transno, error);
}
48
49 /* Assumes caller has already pushed us into the kernel context. */
50 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
51                        struct ptlrpc_request *req, int rc,
52                        __u32 op_data)
53 {
54         struct mds_export_data *med = &req->rq_export->exp_mds_data;
55         struct mds_client_data *mcd = med->med_mcd;
56         struct obd_device *obd = req->rq_export->exp_obd;
57         int err;
58         __u64 transno;
59         loff_t off;
60         ssize_t written;
61         ENTRY;
62
63         /* if the export has already been failed, we have no last_rcvd slot */
64         if (req->rq_export->exp_failed) {
65                 CERROR("committing transaction for disconnected client\n");
66                 if (handle)
67                         GOTO(commit, rc);
68                 GOTO(out, rc);
69         }
70
71         if (!handle) {
72                 /* if we're starting our own xaction, use our own inode */
73                 i = mds->mds_rcvd_filp->f_dentry->d_inode;
74                 handle = fsfilt_start(obd, i, FSFILT_OP_SETATTR);
75                 if (IS_ERR(handle)) {
76                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
77                         GOTO(out, rc = PTR_ERR(handle));
78                 }
79         }
80
81         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
82
83         transno = req->rq_reqmsg->transno;
84         if (transno == 0) {
85                 spin_lock(&mds->mds_transno_lock);
86                 transno = ++mds->mds_last_transno;
87                 spin_unlock(&mds->mds_transno_lock);
88         }
89         req->rq_repmsg->transno = req->rq_transno = transno;
90         mcd->mcd_last_transno = cpu_to_le64(transno);
91         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
92         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
93         mcd->mcd_last_result = cpu_to_le32(rc);
94         mcd->mcd_last_data = cpu_to_le32(op_data);
95
96         fsfilt_set_last_rcvd(req->rq_export->exp_obd, transno, handle,
97                              mds_commit_cb);
98         written = lustre_fwrite(mds->mds_rcvd_filp, mcd, sizeof(*mcd), &off);
99         CDEBUG(D_INODE, "wrote trans "LPU64" client %s at #%u: written = "
100                LPSZ"\n", transno, mcd->mcd_uuid, med->med_off, written);
101
102         if (written != sizeof(*mcd)) {
103                 CERROR("error writing to last_rcvd: rc = "LPSZ"\n", written);
104                 if (rc == 0) {
105                         if (written < 0)
106                                 rc = written;
107                         else
108                                 rc = -EIO;
109                 }
110         }
111
112 commit:
113         err = fsfilt_commit(obd, i, handle, 0);
114         if (err) {
115                 CERROR("error committing transaction: %d\n", err);
116                 if (!rc)
117                         rc = err;
118         }
119
120         EXIT;
121  out:
122         return rc;
123 }
124
125 /* this gives the same functionality as the code between
126  * sys_chmod and inode_setattr
127  * chown_common and inode_setattr
128  * utimes and inode_setattr
129  */
130 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
131 {
132         time_t now = LTIME_S(CURRENT_TIME);
133         struct iattr *attr = &rec->ur_iattr;
134         unsigned int ia_valid = attr->ia_valid;
135         int error;
136         ENTRY;
137
138         /* only fix up attrs if the client VFS didn't already */
139         if (!(ia_valid & ATTR_RAW))
140                 RETURN(0);
141
142         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
143                 RETURN(-EPERM);
144
145         LTIME_S(attr->ia_ctime) = now;
146         if (!(ia_valid & ATTR_ATIME_SET))
147                 LTIME_S(attr->ia_atime) = now;
148         if (!(ia_valid & ATTR_MTIME_SET))
149                 LTIME_S(attr->ia_mtime) = now;
150
151         /* times */
152         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME) &&
153              !(ia_valid & ATTR_ATIME_SET)) {
154                 if (rec->ur_fsuid != inode->i_uid &&
155                     (error = permission(inode,MAY_WRITE)) != 0)
156                         RETURN(error);
157         } else if (ia_valid & ATTR_UID) {
158                 /* chown */
159                 error = -EPERM;
160                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
161                         RETURN(-EPERM);
162                 if (attr->ia_uid == (uid_t) -1)
163                         attr->ia_uid = inode->i_uid;
164                 if (attr->ia_gid == (gid_t) -1)
165                         attr->ia_gid = inode->i_gid;
166                 attr->ia_mode = inode->i_mode;
167                 attr->ia_valid =  ATTR_UID | ATTR_GID | ATTR_CTIME;
168                 /*
169                  * If the user or group of a non-directory has been
170                  * changed by a non-root user, remove the setuid bit.
171                  * 19981026 David C Niemi <niemi@tux.org>
172                  *
173                  * Changed this to apply to all users, including root,
174                  * to avoid some races. This is the behavior we had in
175                  * 2.0. The check for non-root was definitely wrong
176                  * for 2.2 anyway, as it should have been using
177                  * CAP_FSETID rather than fsuid -- 19990830 SD.
178                  */
179                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
180                     !S_ISDIR(inode->i_mode)) {
181                         attr->ia_mode &= ~S_ISUID;
182                         attr->ia_valid |= ATTR_MODE;
183                 }
184                 /*
185                  * Likewise, if the user or group of a non-directory
186                  * has been changed by a non-root user, remove the
187                  * setgid bit UNLESS there is no group execute bit
188                  * (this would be a file marked for mandatory
189                  * locking).  19981026 David C Niemi <niemi@tux.org>
190                  *
191                  * Removed the fsuid check (see the comment above) --
192                  * 19990830 SD.
193                  */
194                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
195                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
196                         attr->ia_mode &= ~S_ISGID;
197                         attr->ia_valid |= ATTR_MODE;
198                 }
199         } else if (ia_valid & ATTR_MODE) {
200                 int mode = attr->ia_mode;
201                 /* chmod */
202                 if (attr->ia_mode == (mode_t) -1)
203                         attr->ia_mode = inode->i_mode;
204                 attr->ia_mode =
205                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
206         }
207         RETURN(0);
208 }
209
210 static void reconstruct_reint_setattr(struct mds_update_record *rec,
211                                       int offset, struct ptlrpc_request *req)
212 {
213         struct mds_export_data *med = &req->rq_export->exp_mds_data;
214         struct mds_client_data *mcd = med->med_mcd;
215         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
216         struct dentry *de;
217         struct mds_body *body;
218
219         req->rq_transno = mcd->mcd_last_transno;
220         req->rq_status = mcd->mcd_last_result;
221
222         if (req->rq_export->exp_outstanding_reply)
223                 mds_steal_ack_locks(req->rq_export, req);
224
225         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
226         if (IS_ERR(de)) {
227                 LASSERT(PTR_ERR(de) == req->rq_status);
228                 return;
229         }
230
231         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
232         mds_pack_inode2fid(&body->fid1, de->d_inode);
233         mds_pack_inode2body(body, de->d_inode);
234
235         l_dput(de);
236 }
237
238 /* In the raw-setattr case, we lock the child inode.
239  * In the write-back case or if being called from open, the client holds a lock
240  * already.
241  *
242  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
243 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
244                              struct ptlrpc_request *req,
245                              struct lustre_handle *lh)
246 {
247         struct mds_obd *mds = mds_req2mds(req);
248         struct obd_device *obd = req->rq_export->exp_obd;
249         struct mds_body *body;
250         struct dentry *de;
251         struct inode *inode = NULL;
252         struct lustre_handle lockh;
253         void *handle = NULL;
254         int rc = 0, cleanup_phase = 0, err, locked = 0;
255         ENTRY;
256
257         LASSERT(offset == 0);
258
259         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
260
261         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
262                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
263                 if (IS_ERR(de))
264                         GOTO(cleanup, rc = PTR_ERR(de));
265         } else {
266                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
267                                            &lockh);
268                 if (IS_ERR(de))
269                         GOTO(cleanup, rc = PTR_ERR(de));
270                 locked = 1;
271         }
272
273         cleanup_phase = 1;
274         inode = de->d_inode;
275         LASSERT(inode);
276
277         CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
278
279         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
280                        to_kdev_t(inode->i_sb->s_dev));
281
282         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
283         if (IS_ERR(handle)) {
284                 rc = PTR_ERR(handle);
285                 handle = NULL;
286                 GOTO(cleanup, rc);
287         }
288
289         rc = mds_fix_attr(inode, rec);
290         if (rc)
291                 GOTO(cleanup, rc);
292
293         rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
294         if (rc == 0 &&
295             S_ISREG(inode->i_mode) &&
296             rec->ur_eadata != NULL) {
297                 rc = fsfilt_set_md(obd, inode, handle,
298                                    rec->ur_eadata, rec->ur_eadatalen);
299         }
300
301         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
302         mds_pack_inode2fid(&body->fid1, inode);
303         mds_pack_inode2body(body, inode);
304
305         EXIT;
306  cleanup:
307         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
308         switch(cleanup_phase) {
309         case 1:
310                 l_dput(de);
311                 if (locked) {
312                         if (rc) {
313                                 ldlm_lock_decref(&lockh, LCK_PW);
314                         } else {
315                                 memcpy(&req->rq_ack_locks[0].lock, &lockh,
316                                        sizeof(lockh));
317                                 req->rq_ack_locks[0].mode = LCK_PW;
318                         }
319                 }
320         case 0:
321                 break;
322         default:
323                 LBUG();
324         }
325         if (err && !rc)
326                 rc = err;
327
328         req->rq_status = rc;
329         return 0;
330 }
331
332 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
333                                      struct ptlrpc_request *req)
334 {
335         struct mds_export_data *med = &req->rq_export->exp_mds_data;
336         struct mds_client_data *mcd = med->med_mcd;
337         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
338         struct dentry *parent, *child;
339         struct mds_body *body;
340
341         req->rq_transno = mcd->mcd_last_transno;
342         req->rq_status = mcd->mcd_last_result;
343
344         if (req->rq_export->exp_outstanding_reply)
345                 mds_steal_ack_locks(req->rq_export, req);
346
347         if (req->rq_status)
348                 return;
349
350         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
351         LASSERT(!IS_ERR(parent));
352         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
353         LASSERT(!IS_ERR(child));
354         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
355         mds_pack_inode2fid(&body->fid1, child->d_inode);
356         mds_pack_inode2body(body, child->d_inode);
357         l_dput(parent);
358         l_dput(child);
359 }
360
361 static int mds_reint_create(struct mds_update_record *rec, int offset,
362                             struct ptlrpc_request *req,
363                             struct lustre_handle *lh)
364 {
365         struct dentry *de = NULL;
366         struct mds_obd *mds = mds_req2mds(req);
367         struct obd_device *obd = req->rq_export->exp_obd;
368         struct dentry *dchild = NULL;
369         struct inode *dir = NULL;
370         void *handle = NULL;
371         struct lustre_handle lockh;
372         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
373         int created = 0;
374         ENTRY;
375
376         LASSERT(offset == 0);
377         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
378
379         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
380
381         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
382                 GOTO(cleanup, rc = -ESTALE);
383
384         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
385         if (IS_ERR(de)) {
386                 rc = PTR_ERR(de);
387                 CERROR("parent lookup error %d\n", rc);
388                 GOTO(cleanup, rc);
389         }
390         cleanup_phase = 1; /* locked parent dentry */
391         dir = de->d_inode;
392         LASSERT(dir);
393         CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
394                dir->i_ino, rec->ur_name, rec->ur_mode);
395
396         ldlm_lock_dump_handle(D_OTHER, &lockh);
397
398         dchild = ll_lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
399         if (IS_ERR(dchild)) {
400                 rc = PTR_ERR(dchild);
401                 CERROR("child lookup error %d\n", rc);
402                 GOTO(cleanup, rc);
403         }
404
405         cleanup_phase = 2; /* child dentry */
406
407         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
408                        to_kdev_t(dir->i_sb->s_dev));
409
410         if (dir->i_mode & S_ISGID) {
411                 rec->ur_gid = dir->i_gid;
412                 if (S_ISDIR(rec->ur_mode))
413                         rec->ur_mode |= S_ISGID;
414         }
415
416         if (rec->ur_fid2->id)
417                 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
418
419         switch (type) {
420         case S_IFREG:{
421                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
422                 if (IS_ERR(handle))
423                         GOTO(cleanup, rc = PTR_ERR(handle));
424                 rc = vfs_create(dir, dchild, rec->ur_mode);
425                 EXIT;
426                 break;
427         }
428         case S_IFDIR:{
429                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
430                 if (IS_ERR(handle))
431                         GOTO(cleanup, rc = PTR_ERR(handle));
432                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
433                 EXIT;
434                 break;
435         }
436         case S_IFLNK:{
437                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
438                 if (IS_ERR(handle))
439                         GOTO(cleanup, rc = PTR_ERR(handle));
440                 if (rec->ur_tgt == NULL)        /* no target supplied */
441                         rc = -EINVAL;           /* -EPROTO? */
442                 else
443                         rc = vfs_symlink(dir, dchild, rec->ur_tgt);
444                 EXIT;
445                 break;
446         }
447         case S_IFCHR:
448         case S_IFBLK:
449         case S_IFIFO:
450         case S_IFSOCK:{
451                 int rdev = rec->ur_rdev;
452                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
453                 if (IS_ERR(handle))
454                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
455                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
456                 EXIT;
457                 break;
458         }
459         default:
460                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
461                 GOTO(cleanup, rc = -EINVAL);
462         }
463
464         /* In case we stored the desired inum in here, we want to clean up.
465          * We also do this in the cleanup block, for the error cases.
466          */
467         dchild->d_fsdata = NULL;
468
469         if (rc) {
470                 CDEBUG(D_INODE, "error during create: %d\n", rc);
471                 GOTO(cleanup, rc);
472         } else {
473                 struct iattr iattr;
474                 struct inode *inode = dchild->d_inode;
475                 struct mds_body *body;
476
477                 created = 1;
478                 LTIME_S(iattr.ia_atime) = rec->ur_time;
479                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
480                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
481                 iattr.ia_uid = rec->ur_uid;
482                 iattr.ia_gid = rec->ur_gid;
483                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
484                         ATTR_MTIME | ATTR_CTIME;
485
486                 if (rec->ur_fid2->id) {
487                         LASSERT(rec->ur_fid2->id == inode->i_ino);
488                         inode->i_generation = rec->ur_fid2->generation;
489                         /* Dirtied and committed by the upcoming setattr. */
490                         CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
491                                inode->i_ino, inode->i_generation);
492                 } else {
493                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
494                                inode->i_ino, inode->i_generation);
495                 }
496
497                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
498                 if (rc) {
499                         CERROR("error on setattr: rc = %d\n", rc);
500                         /* XXX should we abort here in case of error? */
501                 }
502
503                 body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
504                 mds_pack_inode2fid(&body->fid1, inode);
505                 mds_pack_inode2body(body, inode);
506         }
507         EXIT;
508
509 cleanup:
510         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
511
512         if (rc && created) {
513                 /* Destroy the file we just created.  This should not need
514                  * extra journal credits, as we have already modified all of
515                  * the blocks needed in order to create the file in the first
516                  * place.
517                  */
518                 switch (type) {
519                 case S_IFDIR:
520                         err = vfs_rmdir(dir, dchild);
521                         if (err)
522                                 CERROR("rmdir in error path: %d\n", err);
523                         break;
524                 default:
525                         err = vfs_unlink(dir, dchild);
526                         if (err)
527                                 CERROR("unlink in error path: %d\n", err);
528                         break;
529                 }
530         } else {
531                 rc = err;
532         }
533         switch (cleanup_phase) {
534         case 2: /* child dentry */
535                 dchild->d_fsdata = NULL;
536                 l_dput(dchild);
537         case 1: /* locked parent dentry */
538                 if (rc) {
539                         ldlm_lock_decref(&lockh, LCK_PW);
540                 } else {
541                         memcpy(&req->rq_ack_locks[0].lock, &lockh,
542                                sizeof(lockh));
543                         req->rq_ack_locks[0].mode = LCK_PW;
544                 }
545                 l_dput(de);
546         case 0:
547                 break;
548         default:
549                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
550                 LBUG();
551         }
552         req->rq_status = rc;
553         return 0;
554 }
555
556 /* This function doesn't use ldlm_match_or_enqueue because we're always called
557  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
558  * because they take the place of local semaphores.
559  *
560  * Two locks are taken in numerical order */
561 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
562                           struct ldlm_res_id *p1_res_id,
563                           struct ldlm_res_id *p2_res_id,
564                           struct lustre_handle *p1_lockh,
565                           struct lustre_handle *p2_lockh)
566 {
567         struct ldlm_res_id res_id[2];
568         struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
569         int rc, flags;
570         ENTRY;
571
572         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
573
574         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
575                p1_res_id[0].name[0], p2_res_id[0].name[0]);
576
577         if (p1_res_id->name[0] < p2_res_id->name[0]) {
578                 handles[0] = p1_lockh;
579                 handles[1] = p2_lockh;
580                 res_id[0] = *p1_res_id;
581                 res_id[1] = *p2_res_id;
582         } else {
583                 handles[1] = p1_lockh;
584                 handles[0] = p2_lockh;
585                 res_id[1] = *p1_res_id;
586                 res_id[0] = *p2_res_id;
587         }
588
589         CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
590                p1_res_id[0].name[0], p2_res_id[0].name[0]);
591
592         flags = LDLM_FL_LOCAL_ONLY;
593         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
594                               LDLM_PLAIN, NULL, 0, lock_mode, &flags,
595                               ldlm_completion_ast, mds_blocking_ast, NULL,
596                               handles[0]);
597         if (rc != ELDLM_OK)
598                 RETURN(-EIO);
599         ldlm_lock_dump_handle(D_OTHER, handles[0]);
600
601         if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
602                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
603                 ldlm_lock_addref(handles[1], lock_mode);
604         } else {
605                 flags = LDLM_FL_LOCAL_ONLY;
606                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
607                                       res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
608                                       &flags, ldlm_completion_ast,
609                                       mds_blocking_ast, NULL, handles[1]);
610                 if (rc != ELDLM_OK) {
611                         ldlm_lock_decref(handles[0], lock_mode);
612                         RETURN(-EIO);
613                 }
614         }
615         ldlm_lock_dump_handle(D_OTHER, handles[1]);
616
617         RETURN(0);
618 }
619
620 static void reconstruct_reint_unlink(struct mds_update_record *rec, int offset,
621                                     struct ptlrpc_request *req,
622                                     struct lustre_handle *child_lockh)
623 {
624         struct mds_export_data *med = &req->rq_export->exp_mds_data;
625         struct mds_client_data *mcd = med->med_mcd;
626
627         req->rq_transno = mcd->mcd_last_transno;
628         req->rq_status = mcd->mcd_last_result;
629
630         if (req->rq_export->exp_outstanding_reply)
631                 mds_steal_ack_locks(req->rq_export, req);
632
633         DEBUG_REQ(D_ERROR, req,
634                   "can't get EA for reconstructed unlink, leaking OST inodes");
635 }
636
637 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
638                             struct ptlrpc_request *req,
639                             struct lustre_handle *child_lockh)
640 {
641         struct dentry *dir_de = NULL;
642         struct dentry *dchild = NULL;
643         struct mds_obd *mds = mds_req2mds(req);
644         struct obd_device *obd = req->rq_export->exp_obd;
645         struct mds_body *body = NULL;
646         struct inode *dir_inode = NULL, *child_inode;
647         struct lustre_handle parent_lockh;
648         void *handle = NULL;
649         struct ldlm_res_id child_res_id = { .name = {0} };
650         int rc = 0, flags = 0, return_lock = 0;
651         int cleanup_phase = 0;
652         ENTRY;
653
654         LASSERT(offset == 0 || offset == 2);
655
656         MDS_CHECK_RESENT(req, reconstruct_reint_unlink(rec, offset, req,
657                                                        child_lockh));
658
659         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
660                 GOTO(cleanup, rc = -ENOENT);
661
662         /* Step 1: Lookup the parent by FID */
663         dir_de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
664                                        &parent_lockh);
665         if (IS_ERR(dir_de))
666                 GOTO(cleanup, rc = PTR_ERR(dir_de));
667         dir_inode = dir_de->d_inode;
668         LASSERT(dir_inode);
669
670         cleanup_phase = 1; /* Have parent dentry lock */
671
672         /* Step 2: Lookup the child */
673         dchild = ll_lookup_one_len(rec->ur_name, dir_de, rec->ur_namelen - 1);
674         if (IS_ERR(dchild))
675                 GOTO(cleanup, rc = PTR_ERR(dchild));
676
677         cleanup_phase = 2; /* child dentry */
678
679         child_inode = dchild->d_inode;
680         if (child_inode == NULL) {
681                 CDEBUG(D_INODE,
682                        "child doesn't exist (dir %lu, name %s)\n",
683                        dir_inode->i_ino, rec->ur_name);
684                 rc = -ENOENT;
685                 GOTO(cleanup, rc);
686         }
687
688         DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
689                   dir_inode->i_ino, child_inode->i_ino);
690
691         /* Step 3: Get a lock on the child */
692         child_res_id.name[0] = child_inode->i_ino;
693         child_res_id.name[1] = child_inode->i_generation;
694
695         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
696                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
697                               &flags, ldlm_completion_ast, mds_blocking_ast,
698                               NULL, child_lockh);
699         if (rc != ELDLM_OK)
700                 GOTO(cleanup, rc);
701
702         cleanup_phase = 3; /* child lock */
703
704         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
705                        to_kdev_t(dir_inode->i_sb->s_dev));
706
707         /* ldlm_reply in buf[0] if called via intent */
708         if (offset)
709                 offset = 1;
710
711         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
712         LASSERT(body != NULL);
713
714         /* Step 4: Do the unlink: client decides between rmdir/unlink!
715          * (bug 72) */
716         switch (rec->ur_mode & S_IFMT) {
717         case S_IFDIR:
718                 /* Drop any lingering child directories before we start our
719                  * transaction, to avoid doing multiple inode dirty/delete
720                  * in our compound transaction (bug 1321).
721                  */
722                 shrink_dcache_parent(dchild);
723                 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_RMDIR);
724                 if (IS_ERR(handle))
725                         GOTO(cleanup, rc = PTR_ERR(handle));
726                 cleanup_phase = 4;
727                 rc = vfs_rmdir(dir_inode, dchild);
728                 break;
729         case S_IFREG:
730                 /* If this is the last reference to this inode, get the OBD EA
731                  * data first so the client can destroy OST objects */
732                 if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1) {
733                         mds_pack_inode2fid(&body->fid1, child_inode);
734                         mds_pack_inode2body(body, child_inode);
735                         mds_pack_md(obd, req->rq_repmsg, offset + 1,
736                                     body, child_inode);
737                         if (body->valid & OBD_MD_FLEASIZE)
738                                 return_lock = 1;
739                 }
740                 /* no break */
741         case S_IFLNK:
742         case S_IFCHR:
743         case S_IFBLK:
744         case S_IFIFO:
745         case S_IFSOCK:
746                 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_UNLINK);
747                 if (IS_ERR(handle))
748                         GOTO(cleanup, rc = PTR_ERR(handle));
749                 cleanup_phase = 4;
750                 rc = vfs_unlink(dir_inode, dchild);
751                 break;
752         default:
753                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
754                        rec->ur_name);
755                 LBUG();
756                 GOTO(cleanup, rc = -EINVAL);
757         }
758
759  cleanup:
760         switch(cleanup_phase) {
761             case 4:
762                 rc = mds_finish_transno(mds, dir_inode, handle, req, rc, 0);
763                 if (rc && body) {
764                         /* Don't unlink the OST objects if the MDS unlink failed */
765                         body->valid = 0;
766                 }
767             case 3: /* child lock */
768                 if (rc != 0 || return_lock == 0)
769                         ldlm_lock_decref(child_lockh, LCK_EX);
770             case 2: /* child dentry */
771                 l_dput(dchild);
772             case 1: /* parent dentry and lock */
773                 if (rc) {
774                         ldlm_lock_decref(&parent_lockh, LCK_EX);
775                 } else {
776                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
777                                sizeof(parent_lockh));
778                         req->rq_ack_locks[0].mode = LCK_EX;
779                 }
780                 l_dput(dir_de);
781             case 0:
782                 break;
783             default:
784                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
785                 LBUG();
786         }
787         req->rq_status = rc;
788         return 0;
789 }
790
791 static void reconstruct_reint_link(struct mds_update_record *rec, int offset,
792                                    struct ptlrpc_request *req)
793 {
794         struct mds_export_data *med = &req->rq_export->exp_mds_data;
795         struct mds_client_data *mcd = med->med_mcd;
796
797         req->rq_transno = mcd->mcd_last_transno;
798         req->rq_status = mcd->mcd_last_result;
799
800         if (req->rq_export->exp_outstanding_reply)
801                 mds_steal_ack_locks(req->rq_export, req);
802 }
803
804 static int mds_reint_link(struct mds_update_record *rec, int offset,
805                           struct ptlrpc_request *req,
806                           struct lustre_handle *lh)
807 {
808         struct obd_device *obd = req->rq_export->exp_obd;
809         struct dentry *de_src = NULL;
810         struct dentry *de_tgt_dir = NULL;
811         struct dentry *dchild = NULL;
812         struct mds_obd *mds = mds_req2mds(req);
813         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
814         struct ldlm_res_id src_res_id = { .name = {0} };
815         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
816         int lock_mode = 0, rc = 0, cleanup_phase = 0;
817         ENTRY;
818
819         LASSERT(offset == 0);
820
821         MDS_CHECK_RESENT(req, reconstruct_reint_link(rec, offset, req));
822
823         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
824                 GOTO(cleanup, rc = -ENOENT);
825
826         /* Step 1: Lookup the source inode and target directory by FID */
827         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
828         if (IS_ERR(de_src))
829                 GOTO(cleanup, rc = PTR_ERR(de_src));
830
831         cleanup_phase = 1; /* source dentry */
832
833         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
834         if (IS_ERR(de_tgt_dir))
835                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
836
837         cleanup_phase = 2; /* target directory dentry */
838
839         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
840                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
841                de_src->d_inode->i_ino);
842
843         /* Step 2: Take the two locks */
844         lock_mode = LCK_EX;
845         src_res_id.name[0] = de_src->d_inode->i_ino;
846         src_res_id.name[1] = de_src->d_inode->i_generation;
847         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
848         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
849
850         rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
851                                    &src_lockh, &tgt_dir_lockh);
852         if (rc != ELDLM_OK)
853                 GOTO(cleanup, rc = -EIO);
854
855         cleanup_phase = 3; /* locks */
856
857         /* Step 3: Lookup the child */
858         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
859         if (IS_ERR(dchild)) {
860                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
861                 GOTO(cleanup, rc = PTR_ERR(dchild));
862         }
863
864         cleanup_phase = 4; /* child dentry */
865
866         if (dchild->d_inode) {
867                 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
868                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
869                 rc = -EEXIST;
870                 GOTO(cleanup, rc);
871         }
872
873         /* Step 4: Do it. */
874         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
875                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
876
877         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
878         if (IS_ERR(handle)) {
879                 rc = PTR_ERR(handle);
880                 GOTO(cleanup, rc);
881         }
882
883         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
884         if (rc)
885                 CERROR("link error %d\n", rc);
886 cleanup:
887         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
888                                 handle, req, rc, 0);
889         EXIT;
890
891         switch (cleanup_phase) {
892         case 4: /* child dentry */
893                 l_dput(dchild);
894         case 3: /* locks */
895                 if (rc) {
896                         ldlm_lock_decref(&src_lockh, lock_mode);
897                         ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
898                 } else {
899                         memcpy(&req->rq_ack_locks[0].lock, &src_lockh,
900                                sizeof(src_lockh));
901                         memcpy(&req->rq_ack_locks[1].lock, &tgt_dir_lockh,
902                                sizeof(tgt_dir_lockh));
903                         req->rq_ack_locks[0].mode = lock_mode;
904                         req->rq_ack_locks[1].mode = lock_mode;
905                 }
906         case 2: /* target dentry */
907                 l_dput(de_tgt_dir);
908         case 1: /* source dentry */
909                 l_dput(de_src);
910         case 0:
911                 break;
912         default:
913                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
914                 LBUG();
915         }
916         req->rq_status = rc;
917         return 0;
918 }
919
920 static void reconstruct_reint_rename(struct mds_update_record *rec,
921                                      int offset, struct ptlrpc_request *req)
922 {
923         struct mds_export_data *med = &req->rq_export->exp_mds_data;
924         struct mds_client_data *mcd = med->med_mcd;
925
926         req->rq_transno = mcd->mcd_last_transno;
927         req->rq_status = mcd->mcd_last_result;
928
929         if (req->rq_export->exp_outstanding_reply)
930                 mds_steal_ack_locks(req->rq_export, req);
931         else
932                 LBUG(); /* don't support it yet, but it'll be fun! */
933
934 }
935
936 static int mds_reint_rename(struct mds_update_record *rec, int offset,
937                             struct ptlrpc_request *req,
938                             struct lustre_handle *lockh)
939 {
940         struct obd_device *obd = req->rq_export->exp_obd;
941         struct dentry *de_srcdir = NULL;
942         struct dentry *de_tgtdir = NULL;
943         struct dentry *de_old = NULL;
944         struct dentry *de_new = NULL;
945         struct mds_obd *mds = mds_req2mds(req);
946         struct lustre_handle dlm_handles[4];
947         struct ldlm_res_id p1_res_id = { .name = {0} };
948         struct ldlm_res_id p2_res_id = { .name = {0} };
949         struct ldlm_res_id c1_res_id = { .name = {0} };
950         struct ldlm_res_id c2_res_id = { .name = {0} };
951         int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
952         int cleanup_phase = 0;
953         void *handle = NULL;
954         ENTRY;
955
956         LASSERT(offset == 0);
957
958         MDS_CHECK_RESENT(req, reconstruct_reint_rename(rec, offset, req));
959
960         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
961         if (IS_ERR(de_srcdir))
962                 GOTO(cleanup, rc = PTR_ERR(de_srcdir));
963
964         cleanup_phase = 1; /* source directory dentry */
965
966         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
967         if (IS_ERR(de_tgtdir))
968                 GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
969
970         cleanup_phase = 2; /* target directory dentry */
971
972         /* The idea here is that we need to get four locks in the end:
973          * one on each parent directory, one on each child.  We need to take
974          * these locks in some kind of order (to avoid deadlocks), and the order
975          * I selected is "increasing resource number" order.  We need to take
976          * the locks on the parent directories, however, before we can lookup
977          * the children.  Thus the following plan:
978          *
979          * 1. Take locks on the parent(s), in order
980          * 2. Lookup the children
981          * 3. Take locks on the children, in order
982          * 4. Execute the rename
983          */
984
985         /* Step 1: Take locks on the parent(s), in order */
986         p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
987         p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
988
989         p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
990         p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
991
992         rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
993                                    &(dlm_handles[0]), &(dlm_handles[1]));
994         if (rc != ELDLM_OK)
995                 GOTO(cleanup, rc);
996
997         cleanup_phase = 3; /* parent locks */
998
999         /* Step 2: Lookup the children */
1000         de_old = ll_lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen-1);
1001         if (IS_ERR(de_old)) {
1002                 CERROR("old child lookup error (%*s): %ld\n",
1003                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
1004                 GOTO(cleanup, rc = PTR_ERR(de_old));
1005         }
1006
1007         cleanup_phase = 4; /* original name dentry */
1008
1009         if (de_old->d_inode == NULL)
1010                 GOTO(cleanup, rc = -ENOENT);
1011
1012         /* sanity check for src inode */
1013         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1014             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
1015                 GOTO(cleanup, rc = -EINVAL);
1016
1017         de_new = ll_lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
1018         if (IS_ERR(de_new)) {
1019                 CERROR("new child lookup error (%*s): %ld\n",
1020                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
1021                 GOTO(cleanup, rc = PTR_ERR(de_new));
1022         }
1023
1024         cleanup_phase = 5; /* target dentry */
1025
1026         /* sanity check for dest inode */
1027         if (de_new->d_inode &&
1028             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
1029             de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
1030                 GOTO(cleanup, rc = -EINVAL);
1031
1032         /* Step 3: Take locks on the children */
1033         c1_res_id.name[0] = de_old->d_inode->i_ino;
1034         c1_res_id.name[1] = de_old->d_inode->i_generation;
1035         if (de_new->d_inode == NULL) {
1036                 flags = LDLM_FL_LOCAL_ONLY;
1037                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
1038                                       c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
1039                                       &flags, ldlm_completion_ast,
1040                                       mds_blocking_ast, NULL,
1041                                       &(dlm_handles[2]));
1042                 lock_count = 3;
1043         } else {
1044                 c2_res_id.name[0] = de_new->d_inode->i_ino;
1045                 c2_res_id.name[1] = de_new->d_inode->i_generation;
1046                 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
1047                                            &(dlm_handles[2]),
1048                                            &(dlm_handles[3]));
1049                 lock_count = 4;
1050         }
1051         if (rc != ELDLM_OK)
1052                 GOTO(cleanup, rc);
1053
1054         cleanup_phase = 6; /* child locks */
1055
1056         /* Step 4: Execute the rename */
1057         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
1058                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
1059
1060         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
1061         if (IS_ERR(handle))
1062                 GOTO(cleanup, rc = PTR_ERR(handle));
1063
1064         lock_kernel();
1065         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
1066                         NULL);
1067         unlock_kernel();
1068
1069         EXIT;
1070 cleanup:
1071         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
1072                                 handle, req, rc, 0);
1073         switch (cleanup_phase) {
1074         case 6: /* child locks */
1075                 if (rc) {
1076                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
1077                         if (lock_count == 4)
1078                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
1079                 } else {
1080                         memcpy(&req->rq_ack_locks[2].lock, &(dlm_handles[2]),
1081                                sizeof(dlm_handles[2]));
1082                         req->rq_ack_locks[2].mode = LCK_EX;
1083                         if (lock_count == 4) {
1084                                 memcpy(&req->rq_ack_locks[3].lock,
1085                                        &dlm_handles[3], sizeof(dlm_handles[3]));
1086                                 req->rq_ack_locks[3].mode = LCK_EX;
1087                         }
1088                 }
1089         case 5: /* target dentry */
1090                 l_dput(de_new);
1091         case 4: /* source dentry */
1092                 l_dput(de_old);
1093         case 3: /* parent locks */
1094                 if (rc) {
1095                         ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
1096                         ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
1097                 } else {
1098                         memcpy(&req->rq_ack_locks[0].lock, &(dlm_handles[0]),
1099                                sizeof(dlm_handles[0]));
1100                         memcpy(&req->rq_ack_locks[1].lock, &(dlm_handles[1]),
1101                                sizeof(dlm_handles[1]));
1102                         req->rq_ack_locks[0].mode = LCK_EX;
1103                         req->rq_ack_locks[1].mode = LCK_EX;
1104                 }
1105         case 2: /* target directory dentry */
1106                 l_dput(de_tgtdir);
1107         case 1: /* source directry dentry */
1108                 l_dput(de_srcdir);
1109         case 0:
1110                 break;
1111         default:
1112                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1113                 LBUG();
1114         }
1115         req->rq_status = rc;
1116         return 0;
1117 }
1118
1119 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
1120                            struct ptlrpc_request *, struct lustre_handle *);
1121
1122 static mds_reinter reinters[REINT_MAX + 1] = {
1123         [REINT_SETATTR] mds_reint_setattr,
1124         [REINT_CREATE] mds_reint_create,
1125         [REINT_UNLINK] mds_reint_unlink,
1126         [REINT_LINK] mds_reint_link,
1127         [REINT_RENAME] mds_reint_rename,
1128         [REINT_OPEN] mds_open
1129 };
1130
1131 int mds_reint_rec(struct mds_update_record *rec, int offset,
1132                   struct ptlrpc_request *req, struct lustre_handle *lockh)
1133 {
1134         struct mds_obd *mds = mds_req2mds(req);
1135         struct obd_run_ctxt saved;
1136         int rc;
1137         ENTRY;
1138
1139         /* checked by unpacker */
1140         LASSERT(rec->ur_opcode <= REINT_MAX &&
1141                 reinters[rec->ur_opcode] != NULL);
1142
1143         push_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);
1144         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
1145         pop_ctxt(&saved, &mds->mds_ctxt, &rec->ur_uc);
1146
1147         RETURN(rc);
1148 }