Whamcloud - gitweb
8c0a2267d21a5d445ead29309463889e617022eb
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  *  Author: Peter Braam <braam@clusterfs.com>
25  *  Author: Andreas Dilger <adilger@clusterfs.com>
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/obd_class.h>
39
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
41
42 /* Assumes caller has already pushed us into the kernel context. */
43 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
44                          struct ptlrpc_request *req)
45 {
46         struct mds_export_data *med = &req->rq_export->exp_mds_data;
47         struct mds_client_data *mcd = med->med_mcd;
48         __u64 last_rcvd;
49         loff_t off;
50         int rc;
51
52         /* we don't allocate new transnos for replayed requests */
53         if (req->rq_level == LUSTRE_CONN_RECOVD)
54                 RETURN(0);
55
56         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
57
58         spin_lock(&mds->mds_last_lock);
59         last_rcvd = ++mds->mds_last_rcvd;
60         spin_unlock(&mds->mds_last_lock);
61         req->rq_repmsg->transno = HTON__u64(last_rcvd);
62         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
63         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
64         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
65
66         mds_fs_set_last_rcvd(mds, handle);
67         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
68         CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = "
69                "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, rc);
70
71         if (rc == sizeof(*mcd))
72                 rc = 0;
73         else {
74                 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
75                 if (rc >= 0)
76                         rc = -EIO;
77         }
78
79         return rc;
80 }
81
82 /* In the write-back case, the client holds a lock on a subtree.
83  * In the intent case, the client holds a lock on the child inode.
84  * In the pathname case, the client (may) hold a lock on the child inode. */
85 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
86                              struct ptlrpc_request *req)
87 {
88         struct mds_obd *mds = mds_req2mds(req);
89         struct obd_device *obd = req->rq_export->exp_obd;
90         struct mds_body *body;
91         struct dentry *de;
92         struct inode *inode;
93         void *handle;
94         struct lustre_handle child_lockh;
95         int rc = 0, err;
96
97         if (req->rq_reqmsg->bufcount > offset + 1) {
98                 struct dentry *dir;
99                 struct lustre_handle dir_lockh;
100                 char *name;
101                 int namelen;
102
103                 /* a name was supplied by the client; fid1 is the directory */
104                 dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
105                                             &dir_lockh);
106                 if (IS_ERR(dir)) {
107                         LBUG();
108                         GOTO(out_setattr, rc = PTR_ERR(dir));
109                 }
110
111                 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
112                 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
113                 de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
114                                             0, &child_lockh, LCK_PR);
115                 l_dput(dir);
116                 if (IS_ERR(de)) {
117                         LBUG();
118                         GOTO(out_setattr_de, rc = PTR_ERR(de));
119                 }
120         } else {
121                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
122                 if (!de || IS_ERR(de)) {
123                         LBUG();
124                         GOTO(out_setattr_de, rc = -ESTALE);
125                 }
126         }
127         inode = de->d_inode;
128         CDEBUG(D_INODE, "ino %ld\n", inode->i_ino);
129
130         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
131                        to_kdev_t(inode->i_sb->s_dev));
132
133         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
134         if (!handle)
135                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
136
137         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
138
139         if (offset) {
140                 body = lustre_msg_buf(req->rq_repmsg, 1);
141                 mds_pack_inode2fid(&body->fid1, inode);
142                 mds_pack_inode2body(body, inode);
143         }
144
145         if (!rc)
146                 rc = mds_update_last_rcvd(mds, handle, req);
147
148         err = mds_fs_commit(mds, de->d_inode, handle);
149         if (err) {
150                 CERROR("error on commit: err = %d\n", err);
151                 if (!rc)
152                         rc = err;
153         }
154
155         EXIT;
156 out_setattr_de:
157         l_dput(de);
158 out_setattr:
159         req->rq_status = rc;
160         return 0;
161 }
162
163 static int mds_reint_create(struct mds_update_record *rec, int offset,
164                             struct ptlrpc_request *req)
165 {
166         struct dentry *de = NULL;
167         struct mds_obd *mds = mds_req2mds(req);
168         struct obd_device *obd = req->rq_export->exp_obd;
169         struct dentry *dchild = NULL;
170         struct inode *dir;
171         void *handle;
172         struct lustre_handle lockh;
173         int rc = 0, err, lock_mode, type = rec->ur_mode & S_IFMT;
174         ENTRY;
175
176         /* requests were at offset 2, replies go back at 1 */
177         if (offset)
178                 offset = 1;
179
180         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
181
182         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
183
184         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
185                 GOTO(out_create, rc = -ESTALE);
186
187         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
188         if (IS_ERR(de)) {
189                 rc = PTR_ERR(de);
190                 CERROR("parent lookup error %d\n", rc);
191                 LBUG();
192                 GOTO(out_create, rc);
193         }
194         dir = de->d_inode;
195         CDEBUG(D_INODE, "parent ino %ld name %s mode %o\n",
196                dir->i_ino, rec->ur_name, rec->ur_mode);
197
198         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
199
200         down(&dir->i_sem);
201         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
202         if (IS_ERR(dchild)) {
203                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
204                 LBUG();
205                 GOTO(out_create_de, rc = -ESTALE);
206         }
207
208         if (dchild->d_inode) {
209                 struct mds_body *body;
210                 struct inode *inode = dchild->d_inode;
211
212                 CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
213                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
214
215                 /* XXX check that mode is correct? */
216
217                 body = lustre_msg_buf(req->rq_repmsg, offset);
218                 mds_pack_inode2fid(&body->fid1, inode);
219                 mds_pack_inode2body(body, inode);
220                 if (S_ISREG(inode->i_mode)) {
221                         struct lov_mds_md *lmm;
222
223                         lmm = lustre_msg_buf(req->rq_repmsg, offset + 1);
224                         lmm->lmm_easize = mds->mds_max_mdsize;
225
226                         if (mds_fs_get_md(mds, inode, lmm) < 0) {
227                                 CDEBUG(D_INFO,"No md for %ld: rc %d\n",
228                                        inode->i_ino, rc);
229                                 memset(lmm, 0, lmm->lmm_easize);
230                         } else
231                                 body->valid |= OBD_MD_FLEASIZE;
232                 }
233
234                 /* This isn't an error for RECREATE. */
235                 if (rec->ur_opcode & REINT_REPLAYING) {
236                         CDEBUG(D_INODE, "EEXIST suppressed for REPLAYING\n");
237                         rc = 0;
238                 } else {
239                         rc = -EEXIST;
240                 }
241                 GOTO(out_create_dchild, rc);
242         }
243
244         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
245                        to_kdev_t(dir->i_sb->s_dev));
246
247         if (dir->i_mode & S_ISGID) {
248                 rec->ur_gid = dir->i_gid;
249                 if (S_ISDIR(rec->ur_mode))
250                         rec->ur_mode |= S_ISGID;
251         }
252
253         switch (type) {
254         case S_IFREG:{
255                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
256                 if (!handle)
257                         GOTO(out_create_dchild, PTR_ERR(handle));
258                 rc = vfs_create(dir, dchild, rec->ur_mode);
259                 EXIT;
260                 break;
261         }
262         case S_IFDIR:{
263                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
264                 if (!handle)
265                         GOTO(out_create_dchild, PTR_ERR(handle));
266                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
267                 EXIT;
268                 break;
269         }
270         case S_IFLNK:{
271                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
272                 if (!handle)
273                         GOTO(out_create_dchild, PTR_ERR(handle));
274                 rc = vfs_symlink(dir, dchild, rec->ur_name);
275                 EXIT;
276                 break;
277         }
278         case S_IFCHR:
279         case S_IFBLK:
280         case S_IFIFO:
281         case S_IFSOCK:{
282                 int rdev = rec->ur_rdev;
283                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
284                 if (!handle)
285                         GOTO(out_create_dchild, PTR_ERR(handle));
286                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
287                 EXIT;
288                 break;
289         }
290         default:
291                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
292                 GOTO(out_create_dchild, rc = -EINVAL);
293         }
294
295         if (rc) {
296                 CDEBUG(D_INODE, "error during create: %d\n", rc);
297                 GOTO(out_create_commit, rc);
298         } else {
299                 struct iattr iattr;
300                 struct inode *inode = dchild->d_inode;
301                 struct mds_body *body;
302
303                 iattr.ia_atime = rec->ur_time;
304                 iattr.ia_ctime = rec->ur_time;
305                 iattr.ia_mtime = rec->ur_time;
306                 iattr.ia_uid = rec->ur_uid;
307                 iattr.ia_gid = rec->ur_gid;
308                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
309                         ATTR_MTIME | ATTR_CTIME;
310
311                 if (rec->ur_fid2->id) {
312                         LASSERT(rec->ur_opcode & REINT_REPLAYING);
313                         inode->i_generation = rec->ur_fid2->generation;
314                         /* Dirtied and committed by this setattr: */
315                         CDEBUG(D_INODE, "recreated ino %ld with gen %ld\n",
316                                inode->i_ino, inode->i_generation);
317                 } else {
318                         CDEBUG(D_INODE, "created ino %ld\n", inode->i_ino);
319                 }
320
321                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
322                 if (rc) {
323                         CERROR("error on setattr: rc = %d\n", rc);
324                         /* XXX should we abort here in case of error? */
325                 }
326
327                 rc = mds_update_last_rcvd(mds, handle, req);
328                 if (rc) {
329                         CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
330                         GOTO(out_create_unlink, rc);
331                 }
332
333                 body = lustre_msg_buf(req->rq_repmsg, offset);
334                 mds_pack_inode2fid(&body->fid1, inode);
335                 mds_pack_inode2body(body, inode);
336         }
337         EXIT;
338 out_create_commit:
339         err = mds_fs_commit(mds, dir, handle);
340         if (err) {
341                 CERROR("error on commit: err = %d\n", err);
342                 if (!rc)
343                         rc = err;
344         }
345 out_create_dchild:
346         l_dput(dchild);
347         ldlm_lock_decref(&lockh, lock_mode);
348 out_create_de:
349         up(&dir->i_sem);
350         l_dput(de);
351 out_create:
352         req->rq_status = rc;
353         return 0;
354
355 out_create_unlink:
356         /* Destroy the file we just created.  This should not need extra
357          * journal credits, as we have already modified all of the blocks
358          * needed in order to create the file in the first place.
359          */
360         switch (type) {
361         case S_IFDIR:
362                 err = vfs_rmdir(dir, dchild);
363                 if (err)
364                         CERROR("failed rmdir in error path: rc = %d\n", err);
365                 break;
366         default:
367                 err = vfs_unlink(dir, dchild);
368                 if (err)
369                         CERROR("failed unlink in error path: rc = %d\n", err);
370                 break;
371         }
372
373         goto out_create_commit;
374 }
375
376 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
377                             struct ptlrpc_request *req)
378 {
379         struct dentry *de = NULL;
380         struct dentry *dchild = NULL;
381         struct mds_obd *mds = mds_req2mds(req);
382         struct obd_device *obd = req->rq_export->exp_obd;
383         struct mds_body *body = NULL;
384         char *name;
385         struct inode *dir, *inode;
386         struct lustre_handle lockh, child_lockh;
387         void *handle;
388         int namelen, lock_mode, err, rc = 0;
389         ENTRY;
390
391         /* a name was supplied by the client; fid1 is the directory */
392         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
393         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
394         if (IS_ERR(de)) {
395                 LBUG();
396                 RETURN(PTR_ERR(de));
397         }
398
399         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
400                 GOTO(out_unlink, rc = -ENOENT);
401
402         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
403         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
404 #warning "FIXME: if mds_name2locked_dentry decrefs this lock, we must not"
405         memcpy(&child_lockh, &lockh, sizeof(child_lockh));
406         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
407                                         LCK_EX, &child_lockh, lock_mode);
408
409         if (IS_ERR(dchild)) {
410                 LBUG();
411                 GOTO(out_unlink, rc = PTR_ERR(dchild));
412         }
413
414         dir = de->d_inode;
415         inode = dchild->d_inode;
416         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
417
418         if (!inode) {
419                 if (rec->ur_opcode & REINT_REPLAYING) {
420                         CDEBUG(D_INODE,
421                                "child missing (%ld/%s); OK for REPLAYING\n",
422                                dir->i_ino, rec->ur_name);
423                         rc = 0;
424                 } else {
425                         CDEBUG(D_INODE,
426                                "child doesn't exist (dir %ld, name %s)\n",
427                                dir->i_ino, rec->ur_name);
428                         rc = -ENOENT;
429                 }
430                 /* going to out_unlink_cancel causes an LBUG, don't know why */
431                 GOTO(out_unlink_dchild, rc);
432         }
433
434         if (offset) {
435                 /* XXX offset? */
436                 offset = 1;
437
438                 body = lustre_msg_buf(req->rq_repmsg, offset);
439                 mds_pack_inode2fid(&body->fid1, inode);
440                 mds_pack_inode2body(body, inode);
441         }
442
443         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
444                        to_kdev_t(dir->i_sb->s_dev));
445
446         switch (rec->ur_mode /* & S_IFMT ? */) {
447         case S_IFDIR:
448                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
449                 if (!handle)
450                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
451                 rc = vfs_rmdir(dir, dchild);
452                 break;
453         case S_IFREG:
454                 /* get OBD EA data first so client can also destroy object */
455                 if ((inode->i_mode & S_IFMT) == S_IFREG && offset) {
456                         struct lov_mds_md *lmm;
457
458                         lmm = lustre_msg_buf(req->rq_repmsg, offset + 1);
459                         lmm->lmm_easize = mds->mds_max_mdsize;
460                         if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
461                                 CDEBUG(D_INFO, "No md for ino %ld: rc = %d\n",
462                                        inode->i_ino, rc);
463                                 memset(lmm, 0, lmm->lmm_easize);
464                         } else
465                                 body->valid |= OBD_MD_FLEASIZE;
466                 }
467                 /* no break */
468         case S_IFLNK:
469         case S_IFCHR:
470         case S_IFBLK:
471         case S_IFIFO:
472         case S_IFSOCK:
473                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
474                 if (!handle)
475                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
476                 rc = vfs_unlink(dir, dchild);
477                 break;
478         default:
479                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
480                 handle = NULL;
481                 LBUG();
482                 GOTO(out_unlink_cancel, rc = -EINVAL);
483         }
484
485         if (!rc)
486                 rc = mds_update_last_rcvd(mds, handle, req);
487         err = mds_fs_commit(mds, dir, handle);
488         if (err) {
489                 CERROR("error on commit: err = %d\n", err);
490                 if (!rc)
491                         rc = err;
492         }
493
494         EXIT;
495
496 out_unlink_cancel:
497         ldlm_lock_decref(&child_lockh, LCK_EX);
498         err = ldlm_cli_cancel(&child_lockh);
499         if (err < 0) {
500                 CERROR("failed to cancel child inode lock: err = %d\n", err);
501                 if (!rc)
502                         rc = -ENOLCK;   /*XXX translate LDLM lock error */
503         }
504 out_unlink_dchild:
505         l_dput(dchild);
506         up(&dir->i_sem);
507 out_unlink:
508         ldlm_lock_decref(&lockh, lock_mode);
509         l_dput(de);
510         req->rq_status = rc;
511         return 0;
512 }
513
514 static int mds_reint_link(struct mds_update_record *rec, int offset,
515                           struct ptlrpc_request *req)
516 {
517         struct obd_device *obd = req->rq_export->exp_obd;
518         struct dentry *de_src = NULL;
519         struct dentry *de_tgt_dir = NULL;
520         struct dentry *dchild = NULL;
521         struct mds_obd *mds = mds_req2mds(req);
522         struct lustre_handle *handle, tgtlockh, srclockh;
523         int lock_mode;
524         __u64 res_id[3] = { 0 };
525         int flags = 0;
526         int rc = 0, err;
527
528         ENTRY;
529         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
530         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
531                 GOTO(out_link, rc = -ESTALE);
532         }
533
534         /* plan to change the link count on this inode: write lock */
535         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
536         res_id[0] = de_src->d_inode->i_ino;
537         res_id[1] = de_src->d_inode->i_generation;
538
539         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
540                              NULL, 0, lock_mode, &srclockh);
541         if (rc == 0) {
542                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
543                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
544                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
545                                       &flags, ldlm_completion_ast,
546                                       mds_blocking_ast, NULL, 0, &srclockh);
547                 if (rc != ELDLM_OK) {
548                         CERROR("lock enqueue: err: %d\n", rc);
549                         GOTO(out_link_src_put, rc = -EIO);
550                 }
551         } else
552                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
553
554         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
555         if (IS_ERR(de_tgt_dir)) {
556                 GOTO(out_link_src, rc = -ESTALE);
557         }
558
559         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
560         res_id[0] = de_tgt_dir->d_inode->i_ino;
561         res_id[1] = de_tgt_dir->d_inode->i_generation;
562
563         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
564                              NULL, 0, lock_mode, &tgtlockh);
565         if (rc == 0) {
566                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
567                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
568                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
569                                       &flags, ldlm_completion_ast,
570                                       mds_blocking_ast, NULL, 0, &tgtlockh);
571                 if (rc != ELDLM_OK) {
572                         CERROR("lock enqueue: err: %d\n", rc);
573                         GOTO(out_link_tgt_dir_put, rc = -EIO);
574                 }
575         } else
576                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
577
578         down(&de_tgt_dir->d_inode->i_sem);
579         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
580         if (IS_ERR(dchild)) {
581                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
582                 GOTO(out_link_tgt_dir, rc = -ESTALE);
583         }
584
585         if (dchild->d_inode) {
586                 struct inode *inode = dchild->d_inode;
587                 /* in intent case ship back attributes to client */
588                 if (offset) {
589                         struct mds_body *body =
590                                 lustre_msg_buf(req->rq_repmsg, 1);
591
592                         mds_pack_inode2fid(&body->fid1, inode);
593                         mds_pack_inode2body(body, inode);
594                         if (S_ISREG(inode->i_mode)) {
595                                 struct lov_mds_md *lmm;
596
597                                 lmm = lustre_msg_buf(req->rq_repmsg, 2);
598                                 lmm->lmm_easize = mds->mds_max_mdsize;
599                                 if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
600                                         CDEBUG(D_INFO,"No md for %ld: rc %d\n",
601                                                inode->i_ino, rc);
602                                         memset(lmm, 0, lmm->lmm_easize);
603                                 } else
604                                         body->valid |= OBD_MD_FLEASIZE;
605                         }
606                 }
607                 if (rec->ur_opcode & REINT_REPLAYING) {
608                         /* XXX verify that the link is to the the right file? */
609                         rc = 0;
610                         CDEBUG(D_INODE,
611                                "child exists (dir %ld, name %s) (REPLAYING)\n",
612                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
613                 } else {
614                         rc = -EEXIST;
615                         CERROR("child exists (dir %ld, name %s)\n",
616                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
617                 }
618                 GOTO(out_link_dchild, rc);
619         }
620
621         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
622                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
623
624         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
625         if (!handle)
626                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
627
628         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
629         if (rc)
630                 CERROR("link error %d\n", rc);
631         if (!rc)
632                 rc = mds_update_last_rcvd(mds, handle, req);
633
634         err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
635         if (err) {
636                 CERROR("error on commit: err = %d\n", err);
637                 if (!rc)
638                         rc = err;
639         }
640         EXIT;
641
642 out_link_dchild:
643         l_dput(dchild);
644 out_link_tgt_dir:
645         ldlm_lock_decref(&tgtlockh, lock_mode);
646 out_link_tgt_dir_put:
647         up(&de_tgt_dir->d_inode->i_sem);
648         l_dput(de_tgt_dir);
649 out_link_src:
650         ldlm_lock_decref(&srclockh, lock_mode);
651 out_link_src_put:
652         l_dput(de_src);
653 out_link:
654         req->rq_status = rc;
655         return 0;
656 }
657
658 static int mds_reint_rename(struct mds_update_record *rec, int offset,
659                             struct ptlrpc_request *req)
660 {
661         struct obd_device *obd = req->rq_export->exp_obd;
662         struct dentry *de_srcdir = NULL;
663         struct dentry *de_tgtdir = NULL;
664         struct dentry *de_old = NULL;
665         struct dentry *de_new = NULL;
666         struct mds_obd *mds = mds_req2mds(req);
667         struct lustre_handle tgtlockh, srclockh, oldhandle;
668         int flags = 0, lock_mode, rc = 0, err;
669         void *handle;
670         __u64 res_id[3] = { 0 };
671         ENTRY;
672
673         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
674         if (IS_ERR(de_srcdir))
675                 GOTO(out_rename, rc = -ESTALE);
676
677         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
678         res_id[0] = de_srcdir->d_inode->i_ino;
679         res_id[1] = de_srcdir->d_inode->i_generation;
680
681         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
682                              NULL, 0, lock_mode, &srclockh);
683         if (rc == 0) {
684                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
685                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
686                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
687                                       &flags, ldlm_completion_ast,
688                                       mds_blocking_ast, NULL, 0, &srclockh);
689                 if (rc != ELDLM_OK) {
690                         CERROR("lock enqueue: err: %d\n", rc);
691                         GOTO(out_rename_srcput, rc = -EIO);
692                 }
693         } else
694                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
695
696         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
697         if (IS_ERR(de_tgtdir))
698                 GOTO(out_rename_srcdir, rc = -ESTALE);
699
700         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
701         res_id[0] = de_tgtdir->d_inode->i_ino;
702         res_id[1] = de_tgtdir->d_inode->i_generation;
703
704         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
705                              NULL, 0, lock_mode, &tgtlockh);
706         if (rc == 0) {
707                 flags = 0;
708                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
709                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
710                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
711                                       &flags, ldlm_completion_ast,
712                                       mds_blocking_ast, NULL, 0, &tgtlockh);
713                 if (rc != ELDLM_OK) {
714                         CERROR("lock enqueue: err: %d\n", rc);
715                         GOTO(out_rename_tgtput, rc = -EIO);
716                 }
717         } else
718                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
719
720 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
721         double_lock(de_tgtdir, de_srcdir);
722 #endif
723         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
724         if (IS_ERR(de_old)) {
725                 CERROR("old child lookup error (%*s): %ld\n",
726                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
727                 GOTO(out_rename_tgtdir, rc = -ENOENT);
728         }
729
730         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
731         if (IS_ERR(de_new)) {
732                 CERROR("new child lookup error (%*s): %ld\n",
733                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
734                 GOTO(out_rename_deold, rc = -ENOENT);
735         }
736
737         /* in intent case ship back attributes to client */
738         if (offset) {
739                 struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
740                 struct inode *inode = de_new->d_inode;
741
742                 if (!inode) {
743                         body->valid = 0;
744                 } else {
745                         mds_pack_inode2fid(&body->fid1, inode);
746                         mds_pack_inode2body(body, inode);
747                         if (S_ISREG(inode->i_mode)) {
748                                 struct lov_mds_md *lmm;
749
750                                 lmm = lustre_msg_buf(req->rq_repmsg, 2);
751                                 lmm->lmm_easize = mds->mds_max_mdsize;
752                                 if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
753                                         CDEBUG(D_INFO,"No md for %ld: rc %d\n",
754                                                inode->i_ino, rc);
755                                         memset(lmm, 0, lmm->lmm_easize);
756                                 } else
757                                         body->valid |= OBD_MD_FLEASIZE;
758                         }
759                 }
760         }
761
762         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
763                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
764
765         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
766         if (!handle)
767                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
768         lock_kernel();
769         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
770                         NULL);
771         unlock_kernel();
772
773         if (!rc)
774                 rc = mds_update_last_rcvd(mds, handle, req);
775
776         err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
777         if (err) {
778                 CERROR("error on commit: err = %d\n", err);
779                 if (!rc)
780                         rc = err;
781         }
782         EXIT;
783
784 out_rename_denew:
785         l_dput(de_new);
786 out_rename_deold:
787         if (!rc) {
788                 res_id[0] = de_old->d_inode->i_ino;
789                 res_id[1] = de_old->d_inode->i_generation;
790                 flags = 0;
791                 /* Take an exclusive lock on the resource that we're
792                  * about to free, to force everyone to drop their
793                  * locks. */
794                 LDLM_DEBUG_NOLOCK("getting EX lock res "LPU64, res_id[0]);
795                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
796                                       res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
797                                       &flags, ldlm_completion_ast,
798                                       mds_blocking_ast, NULL, 0, &oldhandle);
799                 if (rc)
800                         CERROR("failed to get child inode lock (child ino "
801                                LPD64" dir ino %ld)\n",
802                                res_id[0], de_old->d_inode->i_ino);
803         }
804
805         l_dput(de_old);
806
807         if (!rc) {
808                 ldlm_lock_decref(&oldhandle, LCK_EX);
809                 rc = ldlm_cli_cancel(&oldhandle);
810                 if (rc < 0)
811                         CERROR("failed to cancel child inode lock ino "
812                                LPD64": %d\n", res_id[0], rc);
813         }
814 out_rename_tgtdir:
815 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
816         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
817 #endif
818         ldlm_lock_decref(&tgtlockh, lock_mode);
819 out_rename_tgtput:
820         l_dput(de_tgtdir);
821 out_rename_srcdir:
822         ldlm_lock_decref(&srclockh, lock_mode);
823 out_rename_srcput:
824         l_dput(de_srcdir);
825 out_rename:
826         req->rq_status = rc;
827         return 0;
828 }
829
/* Signature shared by every reintegration handler stored in reinters[]. */
typedef int (*mds_reinter)(struct mds_update_record *rec, int offset,
                           struct ptlrpc_request *req);
832
833 static mds_reinter reinters[REINT_MAX + 1] = {
834         [REINT_SETATTR] mds_reint_setattr,
835         [REINT_CREATE] mds_reint_create,
836         [REINT_UNLINK] mds_reint_unlink,
837         [REINT_LINK] mds_reint_link,
838         [REINT_RENAME] mds_reint_rename,
839 };
840
841 int mds_reint_rec(struct mds_update_record *rec, int offset,
842                   struct ptlrpc_request *req)
843 {
844         struct mds_obd *mds = mds_req2mds(req);
845         struct obd_run_ctxt saved;
846         struct obd_ucred uc;
847         int realop = rec->ur_opcode & REINT_OPCODE_MASK;
848         int rc;
849
850         if (realop < 1 || realop > REINT_MAX) {
851                 CERROR("opcode %d not valid (%sREPLAYING)\n", realop,
852                        rec->ur_opcode & REINT_REPLAYING ? "" : "not ");
853                 rc = req->rq_status = -EINVAL;
854                 RETURN(rc);
855         }
856
857         uc.ouc_fsuid = rec->ur_fsuid;
858         uc.ouc_fsgid = rec->ur_fsgid;
859
860         push_ctxt(&saved, &mds->mds_ctxt, &uc);
861         rc = reinters[realop] (rec, offset, req);
862         pop_ctxt(&saved);
863
864         return rc;
865 }