Whamcloud - gitweb
land b_md onto HEAD. the highlights:
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  *  Author: Peter Braam <braam@clusterfs.com>
25  *  Author: Andreas Dilger <adilger@clusterfs.com>
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/lustre_fsfilt.h>
39
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
41
42 static void mds_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd, int error)
43 {
44         struct mds_obd *mds = &obd->u.mds;
45
46         CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
47                last_rcvd, error);
48         if (!error && last_rcvd > mds->mds_last_committed)
49                 mds->mds_last_committed = last_rcvd;
50 }
51
52 void mds_start_transno(struct mds_obd *mds)
53 {
54         ENTRY;
55         down(&mds->mds_transno_sem);
56 }
57
58 /* Assumes caller has already pushed us into the kernel context. */
59 int mds_finish_transno(struct mds_obd *mds, void *handle,
60                        struct ptlrpc_request *req, int rc)
61 {
62         struct mds_export_data *med = &req->rq_export->exp_mds_data;
63         struct mds_client_data *mcd = med->med_mcd;
64         __u64 last_rcvd;
65         loff_t off;
66         ssize_t written;
67
68         /* Propagate error code. */
69         if (rc)
70                 GOTO(out, rc);
71
72         /* we don't allocate new transnos for replayed requests */
73         if (req->rq_level == LUSTRE_CONN_RECOVD)
74                 GOTO(out, rc = 0);
75
76         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
77
78         last_rcvd = ++mds->mds_last_rcvd;
79         req->rq_repmsg->transno = HTON__u64(last_rcvd);
80         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
81         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
82         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
83
84         fsfilt_set_last_rcvd(req->rq_export->exp_obd, last_rcvd, handle,
85                              mds_last_rcvd_cb);
86         written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd),
87                                 &off);
88         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
89                "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, written);
90
91         if (written == sizeof(*mcd))
92                 GOTO(out, rc = 0);
93         CERROR("error writing to last_rcvd file: rc = %d\n", rc);
94         if (written >= 0)
95                 GOTO(out, rc = -EIO);
96
97         rc = 0;
98
99         EXIT;
100  out:
101         up(&mds->mds_transno_sem);
102         return rc;
103 }
104
105 /* In the write-back case, the client holds a lock on a subtree.
106  * In the intent case, the client holds a lock on the child inode.
107  * In the pathname case, the client (may) hold a lock on the child inode. */
108 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
109                              struct ptlrpc_request *req)
110 {
111         struct mds_obd *mds = mds_req2mds(req);
112         struct obd_device *obd = req->rq_export->exp_obd;
113         struct mds_body *body;
114         struct dentry *de;
115         struct inode *inode;
116         void *handle;
117         struct lustre_handle child_lockh;
118         int rc = 0, err;
119
120         if (req->rq_reqmsg->bufcount > offset + 1) {
121                 struct dentry *dir;
122                 struct lustre_handle dir_lockh;
123                 char *name;
124                 int namelen;
125
126                 /* a name was supplied by the client; fid1 is the directory */
127                 dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
128                                             &dir_lockh);
129                 if (IS_ERR(dir)) {
130                         LBUG();
131                         GOTO(out_setattr, rc = PTR_ERR(dir));
132                 }
133
134                 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
135                 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
136                 de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
137                                             0, &child_lockh, LCK_PR);
138                 l_dput(dir);
139                 if (IS_ERR(de)) {
140                         LBUG();
141                         GOTO(out_setattr_de, rc = PTR_ERR(de));
142                 }
143         } else {
144                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
145                 if (!de || IS_ERR(de)) {
146                         GOTO(out_setattr_de, rc = PTR_ERR(de));
147                 }
148         }
149         inode = de->d_inode;
150         CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
151
152         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
153                        to_kdev_t(inode->i_sb->s_dev));
154
155         mds_start_transno(mds);
156         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
157         if (IS_ERR(handle)) {
158                 rc = PTR_ERR(handle);
159                 (void)mds_finish_transno(mds, handle, req, rc);
160                 GOTO(out_setattr_de, rc);
161         }
162
163         rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr);
164
165         if (offset) {
166                 body = lustre_msg_buf(req->rq_repmsg, 1);
167                 mds_pack_inode2fid(&body->fid1, inode);
168                 mds_pack_inode2body(body, inode);
169         }
170
171         rc = mds_finish_transno(mds, handle, req, rc);
172
173         err = fsfilt_commit(obd, de->d_inode, handle);
174         if (err) {
175                 CERROR("error on commit: err = %d\n", err);
176                 if (!rc)
177                         rc = err;
178         }
179
180         EXIT;
181 out_setattr_de:
182         l_dput(de);
183 out_setattr:
184         req->rq_status = rc;
185         return 0;
186 }
187
188 static int mds_reint_create(struct mds_update_record *rec, int offset,
189                             struct ptlrpc_request *req)
190 {
191         struct dentry *de = NULL;
192         struct mds_obd *mds = mds_req2mds(req);
193         struct obd_device *obd = req->rq_export->exp_obd;
194         struct dentry *dchild = NULL;
195         struct inode *dir;
196         void *handle;
197         struct lustre_handle lockh;
198         int rc = 0, err, lock_mode, type = rec->ur_mode & S_IFMT;
199         ENTRY;
200
201         /* requests were at offset 2, replies go back at 1 */
202         if (offset)
203                 offset = 1;
204
205         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
206
207         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
208
209         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
210                 GOTO(out_create, rc = -ESTALE);
211
212         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
213         if (IS_ERR(de)) {
214                 rc = PTR_ERR(de);
215                 CERROR("parent lookup error %d\n", rc);
216                 LBUG();
217                 GOTO(out_create, rc);
218         }
219         dir = de->d_inode;
220         CDEBUG(D_INODE, "parent ino %lu name %s mode %o\n",
221                dir->i_ino, rec->ur_name, rec->ur_mode);
222
223         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
224
225         down(&dir->i_sem);
226         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
227         if (IS_ERR(dchild)) {
228                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
229                 LBUG();
230                 GOTO(out_create_de, rc = -ESTALE);
231         }
232
233         if (dchild->d_inode) {
234                 struct mds_body *body;
235                 struct inode *inode = dchild->d_inode;
236
237                 CDEBUG(D_INODE, "child exists (dir %lu, name %s, ino %lu)\n",
238                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
239
240                 /* XXX check that mode is correct? */
241
242                 body = lustre_msg_buf(req->rq_repmsg, offset);
243                 mds_pack_inode2fid(&body->fid1, inode);
244                 mds_pack_inode2body(body, inode);
245                 if (S_ISREG(inode->i_mode))
246                         mds_pack_md(mds, req, offset + 1, body, inode);
247
248                 /* This isn't an error for RECREATE. */
249                 if (rec->ur_opcode & REINT_REPLAYING) {
250                         CDEBUG(D_INODE, "EEXIST suppressed for REPLAYING\n");
251                         rc = 0;
252                 } else {
253                         rc = -EEXIST;
254                 }
255                 GOTO(out_create_dchild, rc);
256         }
257
258         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
259                        to_kdev_t(dir->i_sb->s_dev));
260
261         if (dir->i_mode & S_ISGID) {
262                 rec->ur_gid = dir->i_gid;
263                 if (S_ISDIR(rec->ur_mode))
264                         rec->ur_mode |= S_ISGID;
265         }
266
267         /* From here on, we must exit via a path that calls mds_finish_transno,
268          * so that we release the mds_transno_sem (and, in the case of success,
269          * update the transno correctly).  out_create_commit and
270          * out_transno_dchild are good candidates.
271          */
272         mds_start_transno(mds);
273
274         switch (type) {
275         case S_IFREG:{
276                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
277                 if (IS_ERR(handle))
278                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
279                 rc = vfs_create(dir, dchild, rec->ur_mode);
280                 EXIT;
281                 break;
282         }
283         case S_IFDIR:{
284                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
285                 if (IS_ERR(handle))
286                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
287                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
288                 EXIT;
289                 break;
290         }
291         case S_IFLNK:{
292                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
293                 if (IS_ERR(handle))
294                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
295                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
296                 EXIT;
297                 break;
298         }
299         case S_IFCHR:
300         case S_IFBLK:
301         case S_IFIFO:
302         case S_IFSOCK:{
303                 int rdev = rec->ur_rdev;
304                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
305                 if (IS_ERR(handle))
306                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
307                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
308                 EXIT;
309                 break;
310         }
311         default:
312                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
313                 handle = NULL; /* quell uninitialized warning */
314                 GOTO(out_transno_dchild, rc = -EINVAL);
315         }
316
317         if (rc) {
318                 CDEBUG(D_INODE, "error during create: %d\n", rc);
319                 GOTO(out_create_commit, rc);
320         } else {
321                 struct iattr iattr;
322                 struct inode *inode = dchild->d_inode;
323                 struct mds_body *body;
324
325                 iattr.ia_atime = rec->ur_time;
326                 iattr.ia_ctime = rec->ur_time;
327                 iattr.ia_mtime = rec->ur_time;
328                 iattr.ia_uid = rec->ur_uid;
329                 iattr.ia_gid = rec->ur_gid;
330                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
331                         ATTR_MTIME | ATTR_CTIME;
332
333                 if (rec->ur_fid2->id) {
334                         LASSERT(rec->ur_opcode & REINT_REPLAYING);
335                         inode->i_generation = rec->ur_fid2->generation;
336                         /* Dirtied and committed by the upcoming setattr. */
337                         CDEBUG(D_INODE, "recreated ino %lu with gen %lu\n",
338                                inode->i_ino, inode->i_generation);
339                 } else {
340                         CDEBUG(D_INODE, "created ino %lu\n", inode->i_ino);
341                 }
342
343                 rc = fsfilt_setattr(obd, dchild, handle, &iattr);
344                 if (rc) {
345                         CERROR("error on setattr: rc = %d\n", rc);
346                         /* XXX should we abort here in case of error? */
347                 }
348
349                 body = lustre_msg_buf(req->rq_repmsg, offset);
350                 mds_pack_inode2fid(&body->fid1, inode);
351                 mds_pack_inode2body(body, inode);
352         }
353         EXIT;
354 out_create_commit:
355         if (rc) {
356                 rc = mds_finish_transno(mds, handle, req, rc);
357         } else {
358                 rc = mds_finish_transno(mds, handle, req, rc);
359                 if (rc)
360                         GOTO(out_create_unlink, rc);
361         }
362         err = fsfilt_commit(obd, dir, handle);
363         if (err) {
364                 CERROR("error on commit: err = %d\n", err);
365                 if (!rc)
366                         rc = err;
367         }
368 out_create_dchild:
369         l_dput(dchild);
370         ldlm_lock_decref(&lockh, lock_mode);
371 out_create_de:
372         up(&dir->i_sem);
373         l_dput(de);
374 out_create:
375         req->rq_status = rc;
376         return 0;
377
378 out_transno_dchild:
379         /* Need to release the transno lock, and then put the dchild. */
380         LASSERT(rc);
381         mds_finish_transno(mds, handle, req, rc);
382         goto out_create_dchild;
383
384 out_create_unlink:
385         /* Destroy the file we just created.  This should not need extra
386          * journal credits, as we have already modified all of the blocks
387          * needed in order to create the file in the first place.
388          */
389         switch (type) {
390         case S_IFDIR:
391                 err = vfs_rmdir(dir, dchild);
392                 if (err)
393                         CERROR("failed rmdir in error path: rc = %d\n", err);
394                 break;
395         default:
396                 err = vfs_unlink(dir, dchild);
397                 if (err)
398                         CERROR("failed unlink in error path: rc = %d\n", err);
399                 break;
400         }
401
402         goto out_create_commit;
403 }
404
405 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
406                             struct ptlrpc_request *req)
407 {
408         struct dentry *de = NULL;
409         struct dentry *dchild = NULL;
410         struct mds_obd *mds = mds_req2mds(req);
411         struct obd_device *obd = req->rq_export->exp_obd;
412         struct mds_body *body = NULL;
413         char *name;
414         struct inode *dir, *inode;
415         struct lustre_handle lockh, child_lockh;
416         void *handle;
417         int namelen, lock_mode, err, rc = 0;
418         ENTRY;
419
420         /* a name was supplied by the client; fid1 is the directory */
421         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
422         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
423         if (IS_ERR(de)) {
424                 LBUG();
425                 RETURN(PTR_ERR(de));
426         }
427
428         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
429                 GOTO(out_unlink, rc = -ENOENT);
430
431         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
432         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
433 #warning "FIXME: if mds_name2locked_dentry decrefs this lock, we must not"
434         memcpy(&child_lockh, &lockh, sizeof(child_lockh));
435         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
436                                         LCK_EX, &child_lockh, lock_mode);
437
438         if (IS_ERR(dchild)) {
439                 LBUG();
440                 GOTO(out_unlink, rc = PTR_ERR(dchild));
441         }
442
443         dir = de->d_inode;
444         inode = dchild->d_inode;
445         CDEBUG(D_INODE, "parent ino %lu\n", dir->i_ino);
446
447         if (!inode) {
448                 if (rec->ur_opcode & REINT_REPLAYING) {
449                         CDEBUG(D_INODE,
450                                "child missing (%lu/%s); OK for REPLAYING\n",
451                                dir->i_ino, rec->ur_name);
452                         rc = 0;
453                 } else {
454                         CDEBUG(D_INODE,
455                                "child doesn't exist (dir %lu, name %s)\n",
456                                dir->i_ino, rec->ur_name);
457                         rc = -ENOENT;
458                 }
459                 /* going to out_unlink_cancel causes an LBUG, don't know why */
460                 GOTO(out_unlink_dchild, rc);
461         }
462
463         if (offset) {
464                 /* XXX offset? */
465                 offset = 1;
466
467                 body = lustre_msg_buf(req->rq_repmsg, offset);
468                 mds_pack_inode2fid(&body->fid1, inode);
469                 mds_pack_inode2body(body, inode);
470         }
471
472         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
473                        to_kdev_t(dir->i_sb->s_dev));
474
475         mds_start_transno(mds);
476         switch (rec->ur_mode /* & S_IFMT ? */) {
477         case S_IFDIR:
478                 handle = fsfilt_start(obd, dir, FSFILT_OP_RMDIR);
479                 if (IS_ERR(handle))
480                         GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle));
481                 rc = vfs_rmdir(dir, dchild);
482                 break;
483         case S_IFREG:
484                 /* get OBD EA data first so client can also destroy object */
485                 if ((inode->i_mode & S_IFMT) == S_IFREG && offset)
486                         mds_pack_md(mds, req, offset + 1, body, inode);
487                 /* no break */
488         case S_IFLNK:
489         case S_IFCHR:
490         case S_IFBLK:
491         case S_IFIFO:
492         case S_IFSOCK:
493                 handle = fsfilt_start(obd, dir, FSFILT_OP_UNLINK);
494                 if (IS_ERR(handle))
495                         GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle));
496                 rc = vfs_unlink(dir, dchild);
497                 break;
498         default:
499                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
500                 handle = NULL;
501                 LBUG();
502                 GOTO(out_unlink_cancel_transno, rc = -EINVAL);
503         }
504
505         rc = mds_finish_transno(mds, handle, req, rc);
506         err = fsfilt_commit(obd, dir, handle);
507         if (err) {
508                 CERROR("error on commit: err = %d\n", err);
509                 if (!rc)
510                         rc = err;
511         }
512
513         EXIT;
514
515 out_unlink_cancel:
516         ldlm_lock_decref(&child_lockh, LCK_EX);
517         err = ldlm_cli_cancel(&child_lockh);
518         if (err < 0) {
519                 CERROR("failed to cancel child inode lock: err = %d\n", err);
520                 if (!rc)
521                         rc = -ENOLCK;   /*XXX translate LDLM lock error */
522         }
523 out_unlink_dchild:
524         l_dput(dchild);
525         up(&dir->i_sem);
526 out_unlink:
527         ldlm_lock_decref(&lockh, lock_mode);
528         l_dput(de);
529         req->rq_status = rc;
530         return 0;
531
532 out_unlink_cancel_transno:
533         rc = mds_finish_transno(mds, handle, req, rc);
534         goto out_unlink_cancel;
535 }
536
537 static int mds_reint_link(struct mds_update_record *rec, int offset,
538                           struct ptlrpc_request *req)
539 {
540         struct obd_device *obd = req->rq_export->exp_obd;
541         struct dentry *de_src = NULL;
542         struct dentry *de_tgt_dir = NULL;
543         struct dentry *dchild = NULL;
544         struct mds_obd *mds = mds_req2mds(req);
545         struct lustre_handle *handle, tgtlockh, srclockh;
546         int lock_mode;
547         __u64 res_id[3] = { 0 };
548         int flags = 0;
549         int rc = 0, err;
550
551         ENTRY;
552         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
553         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
554                 GOTO(out_link, rc = -ESTALE);
555         }
556
557         /* plan to change the link count on this inode: write lock */
558         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
559         res_id[0] = de_src->d_inode->i_ino;
560         res_id[1] = de_src->d_inode->i_generation;
561
562         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
563                              NULL, 0, lock_mode, &srclockh);
564         if (rc == 0) {
565                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
566                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
567                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
568                                       &flags, ldlm_completion_ast,
569                                       mds_blocking_ast, NULL, 0, &srclockh);
570                 if (rc != ELDLM_OK) {
571                         CERROR("lock enqueue: err: %d\n", rc);
572                         GOTO(out_link_src_put, rc = -EIO);
573                 }
574         } else
575                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
576
577         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
578         if (IS_ERR(de_tgt_dir)) {
579                 GOTO(out_link_src, rc = -ESTALE);
580         }
581
582         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
583         res_id[0] = de_tgt_dir->d_inode->i_ino;
584         res_id[1] = de_tgt_dir->d_inode->i_generation;
585
586         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
587                              NULL, 0, lock_mode, &tgtlockh);
588         if (rc == 0) {
589                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
590                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
591                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
592                                       &flags, ldlm_completion_ast,
593                                       mds_blocking_ast, NULL, 0, &tgtlockh);
594                 if (rc != ELDLM_OK) {
595                         CERROR("lock enqueue: err: %d\n", rc);
596                         GOTO(out_link_tgt_dir_put, rc = -EIO);
597                 }
598         } else
599                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
600
601         down(&de_tgt_dir->d_inode->i_sem);
602         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
603         if (IS_ERR(dchild)) {
604                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
605                 GOTO(out_link_tgt_dir, rc = -ESTALE);
606         }
607
608         if (dchild->d_inode) {
609                 struct inode *inode = dchild->d_inode;
610                 /* in intent case ship back attributes to client */
611                 if (offset) {
612                         struct mds_body *body =
613                                 lustre_msg_buf(req->rq_repmsg, 1);
614
615                         mds_pack_inode2fid(&body->fid1, inode);
616                         mds_pack_inode2body(body, inode);
617                         if (S_ISREG(inode->i_mode))
618                                 mds_pack_md(mds, req, 2, body, inode);
619                 }
620                 if (rec->ur_opcode & REINT_REPLAYING) {
621                         /* XXX verify that the link is to the the right file? */
622                         rc = 0;
623                         CDEBUG(D_INODE,
624                                "child exists (dir %lu, name %s) (REPLAYING)\n",
625                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
626                 } else {
627                         rc = -EEXIST;
628                         CERROR("child exists (dir %lu, name %s)\n",
629                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
630                 }
631                 GOTO(out_link_dchild, rc);
632         }
633
634         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
635                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
636
637         mds_start_transno(mds);
638         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
639         if (IS_ERR(handle)) {
640                 rc = PTR_ERR(handle);
641                 mds_finish_transno(mds, handle, req, rc);
642                 GOTO(out_link_dchild, rc);
643         }
644
645         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
646         if (rc)
647                 CERROR("link error %d\n", rc);
648         rc = mds_finish_transno(mds, handle, req, rc);
649
650         err = fsfilt_commit(obd, de_tgt_dir->d_inode, handle);
651         if (err) {
652                 CERROR("error on commit: err = %d\n", err);
653                 if (!rc)
654                         rc = err;
655         }
656         EXIT;
657
658 out_link_dchild:
659         l_dput(dchild);
660 out_link_tgt_dir:
661         ldlm_lock_decref(&tgtlockh, lock_mode);
662 out_link_tgt_dir_put:
663         up(&de_tgt_dir->d_inode->i_sem);
664         l_dput(de_tgt_dir);
665 out_link_src:
666         ldlm_lock_decref(&srclockh, lock_mode);
667 out_link_src_put:
668         l_dput(de_src);
669 out_link:
670         req->rq_status = rc;
671         return 0;
672 }
673
674 static int mds_reint_rename(struct mds_update_record *rec, int offset,
675                             struct ptlrpc_request *req)
676 {
677         struct obd_device *obd = req->rq_export->exp_obd;
678         struct dentry *de_srcdir = NULL;
679         struct dentry *de_tgtdir = NULL;
680         struct dentry *de_old = NULL;
681         struct dentry *de_new = NULL;
682         struct mds_obd *mds = mds_req2mds(req);
683         struct lustre_handle tgtlockh, srclockh, oldhandle;
684         int flags = 0, lock_mode, rc = 0, err;
685         void *handle;
686         __u64 res_id[3] = { 0 };
687         ENTRY;
688
689         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
690         if (IS_ERR(de_srcdir))
691                 GOTO(out_rename, rc = -ESTALE);
692
693         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
694         res_id[0] = de_srcdir->d_inode->i_ino;
695         res_id[1] = de_srcdir->d_inode->i_generation;
696
697         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
698                              NULL, 0, lock_mode, &srclockh);
699         if (rc == 0) {
700                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
701                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
702                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
703                                       &flags, ldlm_completion_ast,
704                                       mds_blocking_ast, NULL, 0, &srclockh);
705                 if (rc != ELDLM_OK) {
706                         CERROR("lock enqueue: err: %d\n", rc);
707                         GOTO(out_rename_srcput, rc = -EIO);
708                 }
709         } else
710                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
711
712         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
713         if (IS_ERR(de_tgtdir))
714                 GOTO(out_rename_srcdir, rc = -ESTALE);
715
716         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
717         res_id[0] = de_tgtdir->d_inode->i_ino;
718         res_id[1] = de_tgtdir->d_inode->i_generation;
719
720         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
721                              NULL, 0, lock_mode, &tgtlockh);
722         if (rc == 0) {
723                 flags = 0;
724                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
725                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
726                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
727                                       &flags, ldlm_completion_ast,
728                                       mds_blocking_ast, NULL, 0, &tgtlockh);
729                 if (rc != ELDLM_OK) {
730                         CERROR("lock enqueue: err: %d\n", rc);
731                         GOTO(out_rename_tgtput, rc = -EIO);
732                 }
733         } else
734                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
735
736 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
737         double_lock(de_tgtdir, de_srcdir);
738 #endif
739         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
740         if (IS_ERR(de_old)) {
741                 CERROR("old child lookup error (%*s): %ld\n",
742                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
743                 GOTO(out_rename_tgtdir, rc = -ENOENT);
744         }
745
746         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
747         if (IS_ERR(de_new)) {
748                 CERROR("new child lookup error (%*s): %ld\n",
749                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
750                 GOTO(out_rename_deold, rc = -ENOENT);
751         }
752
753         /* in intent case ship back attributes to client */
754         if (offset) {
755                 struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
756                 struct inode *inode = de_new->d_inode;
757
758                 if (!inode) {
759                         body->valid = 0;
760                 } else {
761                         mds_pack_inode2fid(&body->fid1, inode);
762                         mds_pack_inode2body(body, inode);
763                         if (S_ISREG(inode->i_mode))
764                                 mds_pack_md(mds, req, 2, body, inode);
765                 }
766         }
767
768         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
769                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
770
771         mds_start_transno(mds);
772         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
773         if (IS_ERR(handle)) {
774                 rc = PTR_ERR(handle);
775                 mds_finish_transno(mds, handle, req, rc);
776                 GOTO(out_rename_denew, rc);
777         }
778
779         lock_kernel();
780         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
781                         NULL);
782         unlock_kernel();
783
784         rc = mds_finish_transno(mds, handle, req, rc);
785
786         err = fsfilt_commit(obd, de_tgtdir->d_inode, handle);
787         if (err) {
788                 CERROR("error on commit: err = %d\n", err);
789                 if (!rc)
790                         rc = err;
791         }
792         EXIT;
793
794 out_rename_denew:
795         l_dput(de_new);
796 out_rename_deold:
797         if (!rc) {
798                 res_id[0] = de_old->d_inode->i_ino;
799                 res_id[1] = de_old->d_inode->i_generation;
800                 flags = 0;
801                 /* Take an exclusive lock on the resource that we're
802                  * about to free, to force everyone to drop their
803                  * locks. */
804                 LDLM_DEBUG_NOLOCK("getting EX lock res "LPU64, res_id[0]);
805                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
806                                       res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
807                                       &flags, ldlm_completion_ast,
808                                       mds_blocking_ast, NULL, 0, &oldhandle);
809                 if (rc)
810                         CERROR("failed to get child inode lock (child ino "
811                                LPD64" dir ino %lu)\n",
812                                res_id[0], de_old->d_inode->i_ino);
813         }
814
815         l_dput(de_old);
816
817         if (!rc) {
818                 ldlm_lock_decref(&oldhandle, LCK_EX);
819                 rc = ldlm_cli_cancel(&oldhandle);
820                 if (rc < 0)
821                         CERROR("failed to cancel child inode lock ino "
822                                LPD64": %d\n", res_id[0], rc);
823         }
824 out_rename_tgtdir:
825 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
826         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
827 #endif
828         ldlm_lock_decref(&tgtlockh, lock_mode);
829 out_rename_tgtput:
830         l_dput(de_tgtdir);
831 out_rename_srcdir:
832         ldlm_lock_decref(&srclockh, lock_mode);
833 out_rename_srcput:
834         l_dput(de_srcdir);
835 out_rename:
836         req->rq_status = rc;
837         return 0;
838 }
839
840 typedef int (*mds_reinter) (struct mds_update_record *, int offset,
841                             struct ptlrpc_request *);
842
843 static mds_reinter reinters[REINT_MAX + 1] = {
844         [REINT_SETATTR] mds_reint_setattr,
845         [REINT_CREATE] mds_reint_create,
846         [REINT_UNLINK] mds_reint_unlink,
847         [REINT_LINK] mds_reint_link,
848         [REINT_RENAME] mds_reint_rename,
849 };
850
851 int mds_reint_rec(struct mds_update_record *rec, int offset,
852                   struct ptlrpc_request *req)
853 {
854         struct mds_obd *mds = mds_req2mds(req);
855         struct obd_run_ctxt saved;
856         struct obd_ucred uc;
857         int realop = rec->ur_opcode & REINT_OPCODE_MASK;
858         int rc;
859
860         if (realop < 1 || realop > REINT_MAX) {
861                 CERROR("opcode %d not valid (%sREPLAYING)\n", realop,
862                        rec->ur_opcode & REINT_REPLAYING ? "" : "not ");
863                 rc = req->rq_status = -EINVAL;
864                 RETURN(rc);
865         }
866
867         uc.ouc_fsuid = rec->ur_fsuid;
868         uc.ouc_fsgid = rec->ur_fsgid;
869         uc.ouc_cap = rec->ur_cap;
870
871         push_ctxt(&saved, &mds->mds_ctxt, &uc);
872         rc = reinters[realop] (rec, offset, req);
873         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
874
875         return rc;
876 }