Whamcloud - gitweb
854b357bc45702ce81a627b5ef6cb230c6e6c4fe
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  *  Author: Peter Braam <braam@clusterfs.com>
25  *  Author: Andreas Dilger <adilger@clusterfs.com>
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/obd_class.h>
39
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
41
42 /* Assumes caller has already pushed us into the kernel context. */
43 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
44                          struct ptlrpc_request *req)
45 {
46         struct mds_export_data *med = &req->rq_export->exp_mds_data;
47         struct mds_client_data *mcd = med->med_mcd;
48         __u64 last_rcvd;
49         loff_t off;
50         int rc;
51
52         /* we don't allocate new transnos for replayed requests */
53         if (req->rq_level == LUSTRE_CONN_RECOVD)
54                 RETURN(0);
55
56         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
57
58         spin_lock(&mds->mds_last_lock);
59         last_rcvd = ++mds->mds_last_rcvd;
60         spin_unlock(&mds->mds_last_lock);
61         req->rq_repmsg->transno = HTON__u64(last_rcvd);
62         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
63         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
64         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
65
66         mds_fs_set_last_rcvd(mds, handle);
67         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
68         CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = "
69                "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, rc);
70
71         if (rc == sizeof(*mcd))
72                 rc = 0;
73         else {
74                 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
75                 if (rc >= 0)
76                         rc = -EIO;
77         }
78
79         return rc;
80 }
81
82 /* In the write-back case, the client holds a lock on a subtree.
83  * In the intent case, the client holds a lock on the child inode.
84  * In the pathname case, the client (may) hold a lock on the child inode. */
85 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
86                              struct ptlrpc_request *req)
87 {
88         struct mds_obd *mds = mds_req2mds(req);
89         struct obd_device *obd = req->rq_export->exp_obd;
90         struct mds_body *body;
91         struct dentry *de;
92         struct inode *inode;
93         void *handle;
94         struct lustre_handle child_lockh;
95         int rc = 0, err;
96
97         if (req->rq_reqmsg->bufcount > offset + 1) {
98                 struct dentry *dir;
99                 struct lustre_handle dir_lockh;
100                 char *name;
101                 int namelen;
102
103                 /* a name was supplied by the client; fid1 is the directory */
104                 dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
105                                             &dir_lockh);
106                 if (IS_ERR(dir)) {
107                         LBUG();
108                         GOTO(out_setattr, rc = PTR_ERR(dir));
109                 }
110
111                 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
112                 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
113                 de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
114                                             0, &child_lockh, LCK_PR);
115                 l_dput(dir);
116                 if (IS_ERR(de)) {
117                         LBUG();
118                         GOTO(out_setattr_de, rc = PTR_ERR(de));
119                 }
120         } else {
121                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
122                 if (!de || IS_ERR(de)) {
123                         GOTO(out_setattr_de, rc = PTR_ERR(de));
124                 }
125         }
126         inode = de->d_inode;
127         CDEBUG(D_INODE, "ino %ld\n", inode->i_ino);
128
129         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
130                        to_kdev_t(inode->i_sb->s_dev));
131
132         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
133         if (!handle)
134                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
135
136         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
137
138         if (offset) {
139                 body = lustre_msg_buf(req->rq_repmsg, 1);
140                 mds_pack_inode2fid(&body->fid1, inode);
141                 mds_pack_inode2body(body, inode);
142         }
143
144         if (!rc)
145                 rc = mds_update_last_rcvd(mds, handle, req);
146
147         err = mds_fs_commit(mds, de->d_inode, handle);
148         if (err) {
149                 CERROR("error on commit: err = %d\n", err);
150                 if (!rc)
151                         rc = err;
152         }
153
154         EXIT;
155 out_setattr_de:
156         l_dput(de);
157 out_setattr:
158         req->rq_status = rc;
159         return 0;
160 }
161
162 static int mds_reint_create(struct mds_update_record *rec, int offset,
163                             struct ptlrpc_request *req)
164 {
165         struct dentry *de = NULL;
166         struct mds_obd *mds = mds_req2mds(req);
167         struct obd_device *obd = req->rq_export->exp_obd;
168         struct dentry *dchild = NULL;
169         struct inode *dir;
170         void *handle;
171         struct lustre_handle lockh;
172         int rc = 0, err, lock_mode, type = rec->ur_mode & S_IFMT;
173         ENTRY;
174
175         /* requests were at offset 2, replies go back at 1 */
176         if (offset)
177                 offset = 1;
178
179         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
180
181         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
182
183         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
184                 GOTO(out_create, rc = -ESTALE);
185
186         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
187         if (IS_ERR(de)) {
188                 rc = PTR_ERR(de);
189                 CERROR("parent lookup error %d\n", rc);
190                 LBUG();
191                 GOTO(out_create, rc);
192         }
193         dir = de->d_inode;
194         CDEBUG(D_INODE, "parent ino %ld name %s mode %o\n",
195                dir->i_ino, rec->ur_name, rec->ur_mode);
196
197         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
198
199         down(&dir->i_sem);
200         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
201         if (IS_ERR(dchild)) {
202                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
203                 LBUG();
204                 GOTO(out_create_de, rc = -ESTALE);
205         }
206
207         if (dchild->d_inode) {
208                 struct mds_body *body;
209                 struct inode *inode = dchild->d_inode;
210
211                 CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
212                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
213
214                 /* XXX check that mode is correct? */
215
216                 body = lustre_msg_buf(req->rq_repmsg, offset);
217                 mds_pack_inode2fid(&body->fid1, inode);
218                 mds_pack_inode2body(body, inode);
219                 if (S_ISREG(inode->i_mode))
220                         rc = mds_pack_md(mds, req, offset + 1, body, inode);
221
222                 /* This isn't an error for RECREATE. */
223                 if (rec->ur_opcode & REINT_REPLAYING) {
224                         CDEBUG(D_INODE, "EEXIST suppressed for REPLAYING\n");
225                         rc = 0;
226                 } else {
227                         rc = -EEXIST;
228                 }
229                 GOTO(out_create_dchild, rc);
230         }
231
232         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
233                        to_kdev_t(dir->i_sb->s_dev));
234
235         if (dir->i_mode & S_ISGID) {
236                 rec->ur_gid = dir->i_gid;
237                 if (S_ISDIR(rec->ur_mode))
238                         rec->ur_mode |= S_ISGID;
239         }
240
241         switch (type) {
242         case S_IFREG:{
243                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
244                 if (!handle)
245                         GOTO(out_create_dchild, PTR_ERR(handle));
246                 rc = vfs_create(dir, dchild, rec->ur_mode);
247                 EXIT;
248                 break;
249         }
250         case S_IFDIR:{
251                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
252                 if (!handle)
253                         GOTO(out_create_dchild, PTR_ERR(handle));
254                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
255                 EXIT;
256                 break;
257         }
258         case S_IFLNK:{
259                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
260                 if (!handle)
261                         GOTO(out_create_dchild, PTR_ERR(handle));
262                 rc = vfs_symlink(dir, dchild, rec->ur_name);
263                 EXIT;
264                 break;
265         }
266         case S_IFCHR:
267         case S_IFBLK:
268         case S_IFIFO:
269         case S_IFSOCK:{
270                 int rdev = rec->ur_rdev;
271                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
272                 if (!handle)
273                         GOTO(out_create_dchild, PTR_ERR(handle));
274                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
275                 EXIT;
276                 break;
277         }
278         default:
279                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
280                 GOTO(out_create_dchild, rc = -EINVAL);
281         }
282
283         if (rc) {
284                 CDEBUG(D_INODE, "error during create: %d\n", rc);
285                 GOTO(out_create_commit, rc);
286         } else {
287                 struct iattr iattr;
288                 struct inode *inode = dchild->d_inode;
289                 struct mds_body *body;
290
291                 iattr.ia_atime = rec->ur_time;
292                 iattr.ia_ctime = rec->ur_time;
293                 iattr.ia_mtime = rec->ur_time;
294                 iattr.ia_uid = rec->ur_uid;
295                 iattr.ia_gid = rec->ur_gid;
296                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
297                         ATTR_MTIME | ATTR_CTIME;
298
299                 if (rec->ur_fid2->id) {
300                         LASSERT(rec->ur_opcode & REINT_REPLAYING);
301                         inode->i_generation = rec->ur_fid2->generation;
302                         /* Dirtied and committed by this setattr: */
303                         CDEBUG(D_INODE, "recreated ino %ld with gen %ld\n",
304                                inode->i_ino, inode->i_generation);
305                 } else {
306                         CDEBUG(D_INODE, "created ino %ld\n", inode->i_ino);
307                 }
308
309                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
310                 if (rc) {
311                         CERROR("error on setattr: rc = %d\n", rc);
312                         /* XXX should we abort here in case of error? */
313                 }
314
315                 rc = mds_update_last_rcvd(mds, handle, req);
316                 if (rc) {
317                         CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
318                         GOTO(out_create_unlink, rc);
319                 }
320
321                 body = lustre_msg_buf(req->rq_repmsg, offset);
322                 mds_pack_inode2fid(&body->fid1, inode);
323                 mds_pack_inode2body(body, inode);
324         }
325         EXIT;
326 out_create_commit:
327         err = mds_fs_commit(mds, dir, handle);
328         if (err) {
329                 CERROR("error on commit: err = %d\n", err);
330                 if (!rc)
331                         rc = err;
332         }
333 out_create_dchild:
334         l_dput(dchild);
335         ldlm_lock_decref(&lockh, lock_mode);
336 out_create_de:
337         up(&dir->i_sem);
338         l_dput(de);
339 out_create:
340         req->rq_status = rc;
341         return 0;
342
343 out_create_unlink:
344         /* Destroy the file we just created.  This should not need extra
345          * journal credits, as we have already modified all of the blocks
346          * needed in order to create the file in the first place.
347          */
348         switch (type) {
349         case S_IFDIR:
350                 err = vfs_rmdir(dir, dchild);
351                 if (err)
352                         CERROR("failed rmdir in error path: rc = %d\n", err);
353                 break;
354         default:
355                 err = vfs_unlink(dir, dchild);
356                 if (err)
357                         CERROR("failed unlink in error path: rc = %d\n", err);
358                 break;
359         }
360
361         goto out_create_commit;
362 }
363
364 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
365                             struct ptlrpc_request *req)
366 {
367         struct dentry *de = NULL;
368         struct dentry *dchild = NULL;
369         struct mds_obd *mds = mds_req2mds(req);
370         struct obd_device *obd = req->rq_export->exp_obd;
371         struct mds_body *body = NULL;
372         char *name;
373         struct inode *dir, *inode;
374         struct lustre_handle lockh, child_lockh;
375         void *handle;
376         int namelen, lock_mode, err, rc = 0;
377         ENTRY;
378
379         /* a name was supplied by the client; fid1 is the directory */
380         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
381         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
382         if (IS_ERR(de)) {
383                 LBUG();
384                 RETURN(PTR_ERR(de));
385         }
386
387         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
388                 GOTO(out_unlink, rc = -ENOENT);
389
390         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
391         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
392 #warning "FIXME: if mds_name2locked_dentry decrefs this lock, we must not"
393         memcpy(&child_lockh, &lockh, sizeof(child_lockh));
394         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
395                                         LCK_EX, &child_lockh, lock_mode);
396
397         if (IS_ERR(dchild)) {
398                 LBUG();
399                 GOTO(out_unlink, rc = PTR_ERR(dchild));
400         }
401
402         dir = de->d_inode;
403         inode = dchild->d_inode;
404         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
405
406         if (!inode) {
407                 if (rec->ur_opcode & REINT_REPLAYING) {
408                         CDEBUG(D_INODE,
409                                "child missing (%ld/%s); OK for REPLAYING\n",
410                                dir->i_ino, rec->ur_name);
411                         rc = 0;
412                 } else {
413                         CDEBUG(D_INODE,
414                                "child doesn't exist (dir %ld, name %s)\n",
415                                dir->i_ino, rec->ur_name);
416                         rc = -ENOENT;
417                 }
418                 /* going to out_unlink_cancel causes an LBUG, don't know why */
419                 GOTO(out_unlink_dchild, rc);
420         }
421
422         if (offset) {
423                 /* XXX offset? */
424                 offset = 1;
425
426                 body = lustre_msg_buf(req->rq_repmsg, offset);
427                 mds_pack_inode2fid(&body->fid1, inode);
428                 mds_pack_inode2body(body, inode);
429         }
430
431         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
432                        to_kdev_t(dir->i_sb->s_dev));
433
434         switch (rec->ur_mode /* & S_IFMT ? */) {
435         case S_IFDIR:
436                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
437                 if (!handle)
438                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
439                 rc = vfs_rmdir(dir, dchild);
440                 break;
441         case S_IFREG:
442                 /* get OBD EA data first so client can also destroy object */
443                 if ((inode->i_mode & S_IFMT) == S_IFREG && offset)
444                         rc = mds_pack_md(mds, req, offset + 1, body, inode);
445                 /* no break */
446         case S_IFLNK:
447         case S_IFCHR:
448         case S_IFBLK:
449         case S_IFIFO:
450         case S_IFSOCK:
451                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
452                 if (!handle)
453                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
454                 rc = vfs_unlink(dir, dchild);
455                 break;
456         default:
457                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
458                 handle = NULL;
459                 LBUG();
460                 GOTO(out_unlink_cancel, rc = -EINVAL);
461         }
462
463         if (!rc)
464                 rc = mds_update_last_rcvd(mds, handle, req);
465         err = mds_fs_commit(mds, dir, handle);
466         if (err) {
467                 CERROR("error on commit: err = %d\n", err);
468                 if (!rc)
469                         rc = err;
470         }
471
472         EXIT;
473
474 out_unlink_cancel:
475         ldlm_lock_decref(&child_lockh, LCK_EX);
476         err = ldlm_cli_cancel(&child_lockh);
477         if (err < 0) {
478                 CERROR("failed to cancel child inode lock: err = %d\n", err);
479                 if (!rc)
480                         rc = -ENOLCK;   /*XXX translate LDLM lock error */
481         }
482 out_unlink_dchild:
483         l_dput(dchild);
484         up(&dir->i_sem);
485 out_unlink:
486         ldlm_lock_decref(&lockh, lock_mode);
487         l_dput(de);
488         req->rq_status = rc;
489         return 0;
490 }
491
492 static int mds_reint_link(struct mds_update_record *rec, int offset,
493                           struct ptlrpc_request *req)
494 {
495         struct obd_device *obd = req->rq_export->exp_obd;
496         struct dentry *de_src = NULL;
497         struct dentry *de_tgt_dir = NULL;
498         struct dentry *dchild = NULL;
499         struct mds_obd *mds = mds_req2mds(req);
500         struct lustre_handle *handle, tgtlockh, srclockh;
501         int lock_mode;
502         __u64 res_id[3] = { 0 };
503         int flags = 0;
504         int rc = 0, err;
505
506         ENTRY;
507         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
508         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
509                 GOTO(out_link, rc = -ESTALE);
510         }
511
512         /* plan to change the link count on this inode: write lock */
513         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
514         res_id[0] = de_src->d_inode->i_ino;
515         res_id[1] = de_src->d_inode->i_generation;
516
517         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
518                              NULL, 0, lock_mode, &srclockh);
519         if (rc == 0) {
520                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
521                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
522                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
523                                       &flags, ldlm_completion_ast,
524                                       mds_blocking_ast, NULL, 0, &srclockh);
525                 if (rc != ELDLM_OK) {
526                         CERROR("lock enqueue: err: %d\n", rc);
527                         GOTO(out_link_src_put, rc = -EIO);
528                 }
529         } else
530                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
531
532         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
533         if (IS_ERR(de_tgt_dir)) {
534                 GOTO(out_link_src, rc = -ESTALE);
535         }
536
537         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
538         res_id[0] = de_tgt_dir->d_inode->i_ino;
539         res_id[1] = de_tgt_dir->d_inode->i_generation;
540
541         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
542                              NULL, 0, lock_mode, &tgtlockh);
543         if (rc == 0) {
544                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
545                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
546                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
547                                       &flags, ldlm_completion_ast,
548                                       mds_blocking_ast, NULL, 0, &tgtlockh);
549                 if (rc != ELDLM_OK) {
550                         CERROR("lock enqueue: err: %d\n", rc);
551                         GOTO(out_link_tgt_dir_put, rc = -EIO);
552                 }
553         } else
554                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
555
556         down(&de_tgt_dir->d_inode->i_sem);
557         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
558         if (IS_ERR(dchild)) {
559                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
560                 GOTO(out_link_tgt_dir, rc = -ESTALE);
561         }
562
563         if (dchild->d_inode) {
564                 struct inode *inode = dchild->d_inode;
565                 /* in intent case ship back attributes to client */
566                 if (offset) {
567                         struct mds_body *body =
568                                 lustre_msg_buf(req->rq_repmsg, 1);
569
570                         mds_pack_inode2fid(&body->fid1, inode);
571                         mds_pack_inode2body(body, inode);
572                         if (S_ISREG(inode->i_mode))
573                                 rc = mds_pack_md(mds, req, 2, body, inode);
574                 }
575                 if (rec->ur_opcode & REINT_REPLAYING) {
576                         /* XXX verify that the link is to the the right file? */
577                         rc = 0;
578                         CDEBUG(D_INODE,
579                                "child exists (dir %ld, name %s) (REPLAYING)\n",
580                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
581                 } else {
582                         rc = -EEXIST;
583                         CERROR("child exists (dir %ld, name %s)\n",
584                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
585                 }
586                 GOTO(out_link_dchild, rc);
587         }
588
589         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
590                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
591
592         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
593         if (!handle)
594                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
595
596         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
597         if (rc)
598                 CERROR("link error %d\n", rc);
599         if (!rc)
600                 rc = mds_update_last_rcvd(mds, handle, req);
601
602         err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
603         if (err) {
604                 CERROR("error on commit: err = %d\n", err);
605                 if (!rc)
606                         rc = err;
607         }
608         EXIT;
609
610 out_link_dchild:
611         l_dput(dchild);
612 out_link_tgt_dir:
613         ldlm_lock_decref(&tgtlockh, lock_mode);
614 out_link_tgt_dir_put:
615         up(&de_tgt_dir->d_inode->i_sem);
616         l_dput(de_tgt_dir);
617 out_link_src:
618         ldlm_lock_decref(&srclockh, lock_mode);
619 out_link_src_put:
620         l_dput(de_src);
621 out_link:
622         req->rq_status = rc;
623         return 0;
624 }
625
626 static int mds_reint_rename(struct mds_update_record *rec, int offset,
627                             struct ptlrpc_request *req)
628 {
629         struct obd_device *obd = req->rq_export->exp_obd;
630         struct dentry *de_srcdir = NULL;
631         struct dentry *de_tgtdir = NULL;
632         struct dentry *de_old = NULL;
633         struct dentry *de_new = NULL;
634         struct mds_obd *mds = mds_req2mds(req);
635         struct lustre_handle tgtlockh, srclockh, oldhandle;
636         int flags = 0, lock_mode, rc = 0, err;
637         void *handle;
638         __u64 res_id[3] = { 0 };
639         ENTRY;
640
641         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
642         if (IS_ERR(de_srcdir))
643                 GOTO(out_rename, rc = -ESTALE);
644
645         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
646         res_id[0] = de_srcdir->d_inode->i_ino;
647         res_id[1] = de_srcdir->d_inode->i_generation;
648
649         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
650                              NULL, 0, lock_mode, &srclockh);
651         if (rc == 0) {
652                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
653                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
654                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
655                                       &flags, ldlm_completion_ast,
656                                       mds_blocking_ast, NULL, 0, &srclockh);
657                 if (rc != ELDLM_OK) {
658                         CERROR("lock enqueue: err: %d\n", rc);
659                         GOTO(out_rename_srcput, rc = -EIO);
660                 }
661         } else
662                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
663
664         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
665         if (IS_ERR(de_tgtdir))
666                 GOTO(out_rename_srcdir, rc = -ESTALE);
667
668         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
669         res_id[0] = de_tgtdir->d_inode->i_ino;
670         res_id[1] = de_tgtdir->d_inode->i_generation;
671
672         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
673                              NULL, 0, lock_mode, &tgtlockh);
674         if (rc == 0) {
675                 flags = 0;
676                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
677                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
678                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
679                                       &flags, ldlm_completion_ast,
680                                       mds_blocking_ast, NULL, 0, &tgtlockh);
681                 if (rc != ELDLM_OK) {
682                         CERROR("lock enqueue: err: %d\n", rc);
683                         GOTO(out_rename_tgtput, rc = -EIO);
684                 }
685         } else
686                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
687
688 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
689         double_lock(de_tgtdir, de_srcdir);
690 #endif
691         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
692         if (IS_ERR(de_old)) {
693                 CERROR("old child lookup error (%*s): %ld\n",
694                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
695                 GOTO(out_rename_tgtdir, rc = -ENOENT);
696         }
697
698         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
699         if (IS_ERR(de_new)) {
700                 CERROR("new child lookup error (%*s): %ld\n",
701                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
702                 GOTO(out_rename_deold, rc = -ENOENT);
703         }
704
705         /* in intent case ship back attributes to client */
706         if (offset) {
707                 struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
708                 struct inode *inode = de_new->d_inode;
709
710                 if (!inode) {
711                         body->valid = 0;
712                 } else {
713                         mds_pack_inode2fid(&body->fid1, inode);
714                         mds_pack_inode2body(body, inode);
715                         if (S_ISREG(inode->i_mode))
716                                 rc = mds_pack_md(mds, req, 2, body, inode);
717                 }
718         }
719
720         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
721                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
722
723         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
724         if (!handle)
725                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
726         lock_kernel();
727         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
728                         NULL);
729         unlock_kernel();
730
731         if (!rc)
732                 rc = mds_update_last_rcvd(mds, handle, req);
733
734         err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
735         if (err) {
736                 CERROR("error on commit: err = %d\n", err);
737                 if (!rc)
738                         rc = err;
739         }
740         EXIT;
741
742 out_rename_denew:
743         l_dput(de_new);
744 out_rename_deold:
745         if (!rc) {
746                 res_id[0] = de_old->d_inode->i_ino;
747                 res_id[1] = de_old->d_inode->i_generation;
748                 flags = 0;
749                 /* Take an exclusive lock on the resource that we're
750                  * about to free, to force everyone to drop their
751                  * locks. */
752                 LDLM_DEBUG_NOLOCK("getting EX lock res "LPU64, res_id[0]);
753                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
754                                       res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
755                                       &flags, ldlm_completion_ast,
756                                       mds_blocking_ast, NULL, 0, &oldhandle);
757                 if (rc)
758                         CERROR("failed to get child inode lock (child ino "
759                                LPD64" dir ino %ld)\n",
760                                res_id[0], de_old->d_inode->i_ino);
761         }
762
763         l_dput(de_old);
764
765         if (!rc) {
766                 ldlm_lock_decref(&oldhandle, LCK_EX);
767                 rc = ldlm_cli_cancel(&oldhandle);
768                 if (rc < 0)
769                         CERROR("failed to cancel child inode lock ino "
770                                LPD64": %d\n", res_id[0], rc);
771         }
772 out_rename_tgtdir:
773 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
774         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
775 #endif
776         ldlm_lock_decref(&tgtlockh, lock_mode);
777 out_rename_tgtput:
778         l_dput(de_tgtdir);
779 out_rename_srcdir:
780         ldlm_lock_decref(&srclockh, lock_mode);
781 out_rename_srcput:
782         l_dput(de_srcdir);
783 out_rename:
784         req->rq_status = rc;
785         return 0;
786 }
787
788 typedef int (*mds_reinter) (struct mds_update_record *, int offset,
789                             struct ptlrpc_request *);
790
791 static mds_reinter reinters[REINT_MAX + 1] = {
792         [REINT_SETATTR] mds_reint_setattr,
793         [REINT_CREATE] mds_reint_create,
794         [REINT_UNLINK] mds_reint_unlink,
795         [REINT_LINK] mds_reint_link,
796         [REINT_RENAME] mds_reint_rename,
797 };
798
799 int mds_reint_rec(struct mds_update_record *rec, int offset,
800                   struct ptlrpc_request *req)
801 {
802         struct mds_obd *mds = mds_req2mds(req);
803         struct obd_run_ctxt saved;
804         struct obd_ucred uc;
805         int realop = rec->ur_opcode & REINT_OPCODE_MASK;
806         int rc;
807
808         if (realop < 1 || realop > REINT_MAX) {
809                 CERROR("opcode %d not valid (%sREPLAYING)\n", realop,
810                        rec->ur_opcode & REINT_REPLAYING ? "" : "not ");
811                 rc = req->rq_status = -EINVAL;
812                 RETURN(rc);
813         }
814
815         uc.ouc_fsuid = rec->ur_fsuid;
816         uc.ouc_fsgid = rec->ur_fsgid;
817         uc.ouc_cap = rec->ur_cap;
818
819         push_ctxt(&saved, &mds->mds_ctxt, &uc);
820         rc = reinters[realop] (rec, offset, req);
821         pop_ctxt(&saved);
822
823         return rc;
824 }