Whamcloud - gitweb
Put the HOWTO back in the book.
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  *  Author: Peter Braam <braam@clusterfs.com>
25  *  Author: Andreas Dilger <adilger@clusterfs.com>
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/obd_class.h>
39
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
41
42 /* Assumes caller has already pushed us into the kernel context. */
43 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
44                          struct ptlrpc_request *req)
45 {
46         struct mds_export_data *med = &req->rq_export->exp_mds_data;
47         struct mds_client_data *mcd = med->med_mcd;
48         __u64 last_rcvd;
49         loff_t off;
50         int rc;
51
52         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
53
54         spin_lock(&mds->mds_last_lock);
55         last_rcvd = ++mds->mds_last_rcvd;
56         spin_unlock(&mds->mds_last_lock);
57         req->rq_repmsg->transno = HTON__u64(last_rcvd);
58         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
59         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
60         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
61
62         mds_fs_set_last_rcvd(mds, handle);
63         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
64         CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = %d\n",
65                last_rcvd, mcd->mcd_uuid, med->med_off, rc);
66
67         if (rc == sizeof(*mcd))
68                 rc = 0;
69         else {
70                 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
71                 if (rc >= 0)
72                         rc = -EIO;
73         }
74
75         return rc;
76 }
77
78 /* In the write-back case, the client holds a lock on a subtree.
79  * In the intent case, the client holds a lock on the child inode.
80  * In the pathname case, the client (may) hold a lock on the child inode. */
81 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
82                              struct ptlrpc_request *req)
83 {
84         struct mds_obd *mds = mds_req2mds(req);
85         struct obd_device *obd = req->rq_export->exp_obd;
86         struct dentry *de;
87         void *handle;
88         struct lustre_handle child_lockh;
89         int rc = 0, err;
90
91         if (req->rq_reqmsg->bufcount > offset + 1) {
92                 struct dentry *dir;
93                 struct lustre_handle dir_lockh;
94                 char *name;
95                 int namelen;
96
97                 /* a name was supplied by the client; fid1 is the directory */
98                 dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
99                                             &dir_lockh);
100                 if (IS_ERR(dir)) {
101                         LBUG();
102                         GOTO(out_setattr, rc = PTR_ERR(dir));
103                 }
104
105                 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
106                 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
107                 de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
108                                             0, &child_lockh, LCK_PR);
109                 l_dput(dir);
110                 if (IS_ERR(de)) {
111                         LBUG();
112                         GOTO(out_setattr_de, rc = PTR_ERR(de));
113                 }
114         } else {
115                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
116                 if (!de || IS_ERR(de)) {
117                         LBUG();
118                         GOTO(out_setattr_de, rc = -ESTALE);
119                 }
120         }
121         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
122
123         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
124                        de->d_inode->i_sb->s_dev);
125
126         handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
127         if (!handle)
128                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
129
130         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
131
132         if (!rc)
133                 rc = mds_update_last_rcvd(mds, handle, req);
134
135         err = mds_fs_commit(mds, de->d_inode, handle);
136         if (err) {
137                 CERROR("error on commit: err = %d\n", err);
138                 if (!rc)
139                         rc = err;
140         }
141
142         EXIT;
143 out_setattr_de:
144         l_dput(de);
145 out_setattr:
146         req->rq_status = rc;
147         return (0);
148 }
149
150 static int mds_reint_recreate(struct mds_update_record *rec, int offset,
151                               struct ptlrpc_request *req)
152 {
153         struct dentry *de = NULL;
154         struct mds_obd *mds = mds_req2mds(req);
155         struct dentry *dchild = NULL;
156         struct inode *dir;
157         int rc = 0;
158         ENTRY;
159
160         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
161         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
162                 LBUG();
163                 GOTO(out_create_de, rc = -ESTALE);
164         }
165         dir = de->d_inode;
166         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
167
168         down(&dir->i_sem);
169         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
170         if (IS_ERR(dchild)) {
171                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
172                 up(&dir->i_sem);
173                 LBUG();
174                 GOTO(out_create_dchild, rc = -ESTALE);
175         }
176
177         if (dchild->d_inode) {
178                 struct mds_body *body;
179                 rc = 0;
180                 body = lustre_msg_buf(req->rq_repmsg, 0);
181                 body->ino = dchild->d_inode->i_ino;
182                 body->generation = dchild->d_inode->i_generation;
183         } else {
184                 CERROR("child doesn't exist (dir %ld, name %s)\n",
185                        dir->i_ino, rec->ur_name);
186                 rc = -ENOENT;
187                 LBUG();
188         }
189
190 out_create_dchild:
191         l_dput(dchild);
192         up(&dir->i_sem);
193 out_create_de:
194         l_dput(de);
195         req->rq_status = rc;
196         return 0;
197 }
198
199 static int mds_reint_create(struct mds_update_record *rec, int offset,
200                             struct ptlrpc_request *req)
201 {
202         struct dentry *de = NULL;
203         struct mds_obd *mds = mds_req2mds(req);
204         struct obd_device *obd = req->rq_export->exp_obd;
205         struct dentry *dchild = NULL;
206         struct inode *dir;
207         void *handle;
208         struct lustre_handle lockh;
209         int rc = 0, err, lock_mode, type = rec->ur_mode & S_IFMT;
210         ENTRY;
211
212         /* requests were at offset 2, replies go back at 1 */
213         if (offset)
214                 offset = 1;
215
216         if (strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds") != 0)
217                 LBUG();
218
219         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
220
221         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
222                 GOTO(out_create, rc = -ESTALE);
223
224         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
225         if (IS_ERR(de)) {
226                 rc = PTR_ERR(de);
227                 CERROR("parent lookup error %d\n", rc);
228                 LBUG();
229                 GOTO(out_create, rc);
230         }
231         dir = de->d_inode;
232         CDEBUG(D_INODE, "parent ino %ld name %s mode %o\n",
233                dir->i_ino, rec->ur_name, rec->ur_mode);
234
235         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
236
237         down(&dir->i_sem);
238         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
239         if (IS_ERR(dchild)) {
240                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
241                 LBUG();
242                 GOTO(out_create_de, rc = -ESTALE);
243         }
244
245         if (dchild->d_inode) {
246                 struct mds_body *body;
247                 struct inode *inode = dchild->d_inode;
248                 struct lov_mds_md *lmm;
249
250                 CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
251                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
252
253                 body = lustre_msg_buf(req->rq_repmsg, offset);
254                 mds_pack_inode2fid(&body->fid1, inode);
255                 mds_pack_inode2body(body, inode);
256                 if (S_ISREG(inode->i_mode)) {
257                         lmm = lustre_msg_buf(req->rq_repmsg, offset + 1);
258                         lmm->lmm_easize = mds->mds_max_mdsize;
259
260                         if (mds_fs_get_md(mds, inode, lmm) < 0)
261                                 memset(lmm, 0, lmm->lmm_easize);
262                 }
263                 /* now a normal case for intent locking */
264                 GOTO(out_create_dchild, rc = -EEXIST);
265         }
266
267         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb->s_dev);
268
269         if (dir->i_mode & S_ISGID) {
270                 rec->ur_gid = dir->i_gid;
271                 if (S_ISDIR(rec->ur_mode))
272                         rec->ur_mode |= S_ISGID;
273         }
274
275         switch (type) {
276         case S_IFREG:{
277                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
278                 if (!handle)
279                         GOTO(out_create_dchild, PTR_ERR(handle));
280                 rc = vfs_create(dir, dchild, rec->ur_mode);
281                 EXIT;
282                 break;
283         }
284         case S_IFDIR:{
285                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
286                 if (!handle)
287                         GOTO(out_create_dchild, PTR_ERR(handle));
288                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
289                 EXIT;
290                 break;
291         }
292         case S_IFLNK:{
293                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
294                 if (!handle)
295                         GOTO(out_create_dchild, PTR_ERR(handle));
296                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
297                 EXIT;
298                 break;
299         }
300         case S_IFCHR:
301         case S_IFBLK:
302         case S_IFIFO:
303         case S_IFSOCK:{
304                 int rdev = rec->ur_rdev;
305                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
306                 if (!handle)
307                         GOTO(out_create_dchild, PTR_ERR(handle));
308                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
309                 EXIT;
310                 break;
311         }
312         default:
313                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
314                 GOTO(out_create_dchild, rc = -EINVAL);
315         }
316
317         if (rc) {
318                 CERROR("error during create: %d\n", rc);
319                 if (rc != -ENOSPC)
320                         LBUG();
321                 GOTO(out_create_commit, rc);
322         } else {
323                 struct iattr iattr;
324                 struct inode *inode = dchild->d_inode;
325                 struct mds_body *body;
326
327                 CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
328                 if (!offset && type == S_IFREG) {
329                         struct lov_mds_md *lmm;
330
331                         lmm = lustre_msg_buf(req->rq_reqmsg, 2);
332                         rc = mds_fs_set_md(mds, inode, handle, lmm);
333                         if (rc) {
334                                 CERROR("error %d setting LOV MD for %ld\n",
335                                        rc, inode->i_ino);
336                                 GOTO(out_create_unlink, rc);
337                         }
338                 }
339
340                 iattr.ia_atime = rec->ur_time;
341                 iattr.ia_ctime = rec->ur_time;
342                 iattr.ia_mtime = rec->ur_time;
343                 iattr.ia_uid = rec->ur_uid;
344                 iattr.ia_gid = rec->ur_gid;
345                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
346                         ATTR_MTIME | ATTR_CTIME;
347
348                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
349                 if (rc) {
350                         CERROR("error on setattr: rc = %d\n", rc);
351                         /* XXX should we abort here in case of error? */
352                 }
353
354                 rc = mds_update_last_rcvd(mds, handle, req);
355                 if (rc) {
356                         CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
357                         /* XXX should we abort here in case of error? */
358                 }
359
360                 body = lustre_msg_buf(req->rq_repmsg, offset);
361                 body->ino = inode->i_ino;
362                 body->generation = inode->i_generation;
363                 body->valid = OBD_MD_FLID | OBD_MD_FLGENER;
364         }
365         EXIT;
366 out_create_commit:
367         err = mds_fs_commit(mds, dir, handle);
368         if (err) {
369                 CERROR("error on commit: err = %d\n", err);
370                 if (!rc)
371                         rc = err;
372         }
373 out_create_dchild:
374         l_dput(dchild);
375         ldlm_lock_decref(&lockh, lock_mode);
376 out_create_de:
377         up(&dir->i_sem);
378         l_dput(de);
379 out_create:
380         req->rq_status = rc;
381         return 0;
382
383 out_create_unlink:
384         /* Destroy the file we just created.  This should not need extra
385          * journal credits, as we have already modified all of the blocks
386          * needed in order to create the file in the first place.
387          */
388         switch (type) {
389         case S_IFDIR:
390                 err = vfs_rmdir(dir, dchild);
391                 if (err)
392                         CERROR("failed rmdir in error path: rc = %d\n", err);
393                 break;
394         default:
395                 err = vfs_unlink(dir, dchild);
396                 if (err)
397                         CERROR("failed unlink in error path: rc = %d\n", err);
398                 break;
399         }
400
401         goto out_create_commit;
402 }
403
404 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
405                             struct ptlrpc_request *req)
406 {
407         struct dentry *de = NULL;
408         struct dentry *dchild = NULL;
409         struct mds_obd *mds = mds_req2mds(req);
410         struct obd_device *obd = req->rq_export->exp_obd;
411         char *name;
412         struct inode *dir, *inode;
413         struct lustre_handle lockh, child_lockh;
414         void *handle;
415         int namelen, lock_mode, err, rc = 0;
416         ENTRY;
417
418         /* a name was supplied by the client; fid1 is the directory */
419         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
420         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
421         if (IS_ERR(de)) {
422                 LBUG();
423                 RETURN(PTR_ERR(de));
424         }
425
426         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
427                 GOTO(out_unlink, rc = -ENOENT);
428
429         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
430         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
431         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
432                                         LCK_EX, &child_lockh, lock_mode);
433
434         if (IS_ERR(dchild)) {
435                 LBUG();
436                 GOTO(out_unlink, rc = PTR_ERR(dchild));
437         }
438
439         dir = de->d_inode;
440         inode = dchild->d_inode;
441         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
442
443         if (!inode) {
444                 CDEBUG(D_INODE, "child doesn't exist (dir %ld, name %s\n",
445                        dir->i_ino, rec->ur_name);
446                 /* going to out_unlink_cancel causes an LBUG, don't know why */
447                 GOTO(out_unlink_dchild, rc = -ENOENT);
448         } else if (offset) {
449                 struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
450                 mds_pack_inode2fid(&body->fid1, inode);
451                 mds_pack_inode2body(body, inode);
452         }
453
454         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev);
455
456         switch (rec->ur_mode /* & S_IFMT ? */) {
457         case S_IFDIR:
458                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
459                 if (!handle)
460                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
461                 rc = vfs_rmdir(dir, dchild);
462                 break;
463         case S_IFREG:
464                 /* get OBD EA data first so client can also destroy object */
465                 if ((inode->i_mode & S_IFMT) == S_IFREG && offset) {
466                         struct lov_mds_md *lmm;
467
468                         lmm = lustre_msg_buf(req->rq_repmsg, 2);
469                         lmm->lmm_easize = mds->mds_max_mdsize;
470                         if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
471                                 CDEBUG(D_INFO, "No md for ino %ld: rc = %d\n",
472                                        inode->i_ino, rc);
473                                 memset(lmm, 0, lmm->lmm_easize);
474                         }
475                 }
476                 /* no break */
477         case S_IFLNK:
478         case S_IFCHR:
479         case S_IFBLK:
480         case S_IFIFO:
481         case S_IFSOCK:
482                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
483                 if (!handle)
484                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
485                 rc = vfs_unlink(dir, dchild);
486                 break;
487         default:
488                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
489                 handle = NULL;
490                 LBUG();
491                 GOTO(out_unlink_cancel, rc = -EINVAL);
492         }
493
494         if (!rc)
495                 rc = mds_update_last_rcvd(mds, handle, req);
496         err = mds_fs_commit(mds, dir, handle);
497         if (err) {
498                 CERROR("error on commit: err = %d\n", err);
499                 if (!rc)
500                         rc = err;
501         }
502
503         EXIT;
504
505 out_unlink_cancel:
506         ldlm_lock_decref(&child_lockh, LCK_EX);
507         err = ldlm_cli_cancel(&child_lockh);
508         if (err < 0) {
509                 CERROR("failed to cancel child inode lock: err = %d\n", err);
510                 if (!rc)
511                         rc = -ENOLCK;   /*XXX translate LDLM lock error */
512         }
513 out_unlink_dchild:
514         l_dput(dchild);
515         up(&dir->i_sem);
516 out_unlink:
517         ldlm_lock_decref(&lockh, lock_mode);
518         l_dput(de);
519         req->rq_status = rc;
520         return 0;
521 }
522
523 static int mds_reint_link(struct mds_update_record *rec, int offset,
524                           struct ptlrpc_request *req)
525 {
526         struct dentry *de_src = NULL;
527         struct dentry *de_tgt_dir = NULL;
528         struct dentry *dchild = NULL;
529         struct mds_obd *mds = mds_req2mds(req);
530         void *handle;
531         int rc = 0;
532         int err;
533
534         ENTRY;
535         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
536         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
537                 GOTO(out_link, rc = -ESTALE);
538         }
539
540         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
541         if (IS_ERR(de_tgt_dir)) {
542                 GOTO(out_link_de_src, rc = -ESTALE);
543         }
544
545         down(&de_tgt_dir->d_inode->i_sem);
546         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
547         if (IS_ERR(dchild)) {
548                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
549                 GOTO(out_link_de_tgt_dir, rc = -ESTALE);
550         }
551
552         if (dchild->d_inode) {
553                 CERROR("child exists (dir %ld, name %s\n",
554                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
555                 GOTO(out_link_dchild, rc = -EEXIST);
556         }
557
558         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
559                        de_src->d_inode->i_sb->s_dev);
560
561         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
562         if (!handle)
563                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
564
565         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
566
567         if (!rc)
568                 rc = mds_update_last_rcvd(mds, handle, req);
569
570         err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
571         if (err) {
572                 CERROR("error on commit: err = %d\n", err);
573                 if (!rc)
574                         rc = err;
575         }
576         EXIT;
577
578 out_link_dchild:
579         l_dput(dchild);
580 out_link_de_tgt_dir:
581         up(&de_tgt_dir->d_inode->i_sem);
582         l_dput(de_tgt_dir);
583 out_link_de_src:
584         l_dput(de_src);
585 out_link:
586         req->rq_status = rc;
587         return 0;
588 }
589
590 static int mds_reint_rename(struct mds_update_record *rec, int offset,
591                             struct ptlrpc_request *req)
592 {
593         struct obd_device *obd = req->rq_export->exp_obd;
594         struct dentry *de_srcdir = NULL;
595         struct dentry *de_tgtdir = NULL;
596         struct dentry *de_old = NULL;
597         struct dentry *de_new = NULL;
598         struct mds_obd *mds = mds_req2mds(req);
599         struct lustre_handle tgtlockh, srclockh, oldhandle;
600         int flags, lock_mode, rc = 0, err;
601         void *handle;
602         __u64 res_id[3] = { 0 };
603         ENTRY;
604
605         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
606         if (IS_ERR(de_srcdir))
607                 GOTO(out_rename, rc = -ESTALE);
608
609         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
610         res_id[0] = de_srcdir->d_inode->i_ino;
611
612         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
613                              NULL, 0, lock_mode, &srclockh);
614         if (rc == 0) {
615                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
616                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
617                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
618                                       &flags, ldlm_completion_ast,
619                                       mds_blocking_ast, NULL, 0, &srclockh);
620                 if (rc != ELDLM_OK) {
621                         CERROR("lock enqueue: err: %d\n", rc);
622                         GOTO(out_rename_srcput, rc = -EIO);
623                 }
624         } else
625                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
626
627         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
628         if (IS_ERR(de_tgtdir))
629                 GOTO(out_rename_srcdir, rc = -ESTALE);
630
631         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
632         res_id[0] = de_tgtdir->d_inode->i_ino;
633
634         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
635                              NULL, 0, lock_mode, &tgtlockh);
636         if (rc == 0) {
637                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
638                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
639                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
640                                       &flags, ldlm_completion_ast,
641                                       mds_blocking_ast, NULL, 0, &tgtlockh);
642                 if (rc != ELDLM_OK) {
643                         CERROR("lock enqueue: err: %d\n", rc);
644                         GOTO(out_rename_tgtput, rc = -EIO);
645                 }
646         } else
647                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
648
649         double_lock(de_tgtdir, de_srcdir);
650
651         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
652         if (IS_ERR(de_old)) {
653                 CERROR("old child lookup error (%*s): %ld\n",
654                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
655                 GOTO(out_rename_tgtdir, rc = -ENOENT);
656         }
657
658         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
659         if (IS_ERR(de_new)) {
660                 CERROR("new child lookup error (%*s): %ld\n",
661                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
662                 GOTO(out_rename_deold, rc = -ENOENT);
663         }
664         
665         /* in intent case ship back attributes to client */
666         if (offset) { 
667                 struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
668                 struct inode *inode = de_new->d_inode;
669
670                 if (!inode) {
671                         body->valid = 0; 
672                 } else {
673                         mds_pack_inode2fid(&body->fid1, inode);
674                         mds_pack_inode2body(body, inode);
675                         if (S_ISREG(inode->i_mode)) {
676                                 struct lov_mds_md *lmm;
677
678                                 lmm = lustre_msg_buf(req->rq_repmsg, 2);
679                                 lmm->lmm_easize = mds->mds_max_mdsize;
680                                 if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
681                                         CDEBUG(D_INFO,"No md for %ld: rc %d\n",
682                                                inode->i_ino, rc);
683                                         memset(lmm, 0, lmm->lmm_easize);
684                                 }
685                         }
686                 }
687         }
688
689         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
690                        de_srcdir->d_inode->i_sb->s_dev);
691
692         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
693         if (!handle)
694                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
695         lock_kernel();
696         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
697                         NULL);
698         unlock_kernel();
699
700         if (!rc)
701                 rc = mds_update_last_rcvd(mds, handle, req);
702
703         err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
704         if (err) {
705                 CERROR("error on commit: err = %d\n", err);
706                 if (!rc)
707                         rc = err;
708         }
709         EXIT;
710
711       out_rename_denew:
712         l_dput(de_new);
713       out_rename_deold:
714         if (!rc) {
715                 res_id[0] = de_old->d_inode->i_ino;
716                 /* Take an exclusive lock on the resource that we're
717                  * about to free, to force everyone to drop their
718                  * locks. */
719                 LDLM_DEBUG_NOLOCK("getting EX lock res "LPU64, res_id[0]);
720                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
721                                       res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
722                                       &flags, ldlm_completion_ast,
723                                       mds_blocking_ast, NULL, 0, &oldhandle);
724                 if (rc)
725                         CERROR("failed to get child inode lock (child ino "LPD64", "
726                                "dir ino %ld)\n",
727                                res_id[0], de_old->d_inode->i_ino);
728         }
729
730         l_dput(de_old);
731
732         if (!rc) {
733                 ldlm_lock_decref(&oldhandle, LCK_EX);
734                 rc = ldlm_cli_cancel(&oldhandle);
735                 if (rc < 0)
736                         CERROR("failed to cancel child inode lock ino "
737                                LPD64": %d\n", res_id[0], rc);
738         }
739 out_rename_tgtdir:
740         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
741         ldlm_lock_decref(&tgtlockh, lock_mode);
742 out_rename_tgtput:
743         l_dput(de_tgtdir);
744 out_rename_srcdir:
745         ldlm_lock_decref(&srclockh, lock_mode);
746 out_rename_srcput:
747         l_dput(de_srcdir);
748 out_rename:
749         req->rq_status = rc;
750         return 0;
751 }
752
753 typedef int (*mds_reinter) (struct mds_update_record *, int offset,
754                             struct ptlrpc_request *);
755
756 static mds_reinter reinters[REINT_MAX + 1] = {
757         [REINT_SETATTR] mds_reint_setattr,
758         [REINT_CREATE] mds_reint_create,
759         [REINT_UNLINK] mds_reint_unlink,
760         [REINT_LINK] mds_reint_link,
761         [REINT_RENAME] mds_reint_rename,
762         [REINT_RECREATE] mds_reint_recreate,
763 };
764
765 int mds_reint_rec(struct mds_update_record *rec, int offset,
766                   struct ptlrpc_request *req)
767 {
768         struct mds_obd *mds = mds_req2mds(req);
769         struct obd_run_ctxt saved;
770
771         int rc;
772
773         if (rec->ur_opcode < 1 || rec->ur_opcode > REINT_MAX) {
774                 CERROR("opcode %d not valid\n", rec->ur_opcode);
775                 rc = req->rq_status = -EINVAL;
776                 RETURN(rc);
777         }
778
779         push_ctxt(&saved, &mds->mds_ctxt);
780         rc = reinters[rec->ur_opcode] (rec, offset, req);
781         pop_ctxt(&saved);
782
783         return rc;
784 }