Whamcloud - gitweb
removed demos/Makefile from configure.in
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  *  Author: Peter Braam <braam@clusterfs.com>
25  *  Author: Andreas Dilger <adilger@clusterfs.com>
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/obd_class.h>
39
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
41
42 /* Assumes caller has already pushed us into the kernel context. */
43 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
44                          struct ptlrpc_request *req)
45 {
46         struct mds_export_data *med = &req->rq_export->exp_mds_data;
47         struct mds_client_data *mcd = med->med_mcd;
48         __u64 last_rcvd;
49         loff_t off;
50         int rc;
51
52         /* we don't allocate new transnos for replayed requests */
53         if (req->rq_level == LUSTRE_CONN_RECOVD)
54                 RETURN(0);
55
56         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
57
58         spin_lock(&mds->mds_last_lock);
59         last_rcvd = ++mds->mds_last_rcvd;
60         spin_unlock(&mds->mds_last_lock);
61         req->rq_repmsg->transno = HTON__u64(last_rcvd);
62         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
63         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
64         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
65
66         mds_fs_set_last_rcvd(mds, handle);
67         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
68         CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = "
69                "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, rc);
70
71         if (rc == sizeof(*mcd))
72                 rc = 0;
73         else {
74                 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
75                 if (rc >= 0)
76                         rc = -EIO;
77         }
78
79         return rc;
80 }
81
82 /* In the write-back case, the client holds a lock on a subtree.
83  * In the intent case, the client holds a lock on the child inode.
84  * In the pathname case, the client (may) hold a lock on the child inode. */
85 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
86                              struct ptlrpc_request *req)
87 {
88         struct mds_obd *mds = mds_req2mds(req);
89         struct obd_device *obd = req->rq_export->exp_obd;
90         struct mds_body *body;
91         struct dentry *de;
92         struct inode *inode;
93         void *handle;
94         struct lustre_handle child_lockh;
95         int rc = 0, err;
96
97         if (req->rq_reqmsg->bufcount > offset + 1) {
98                 struct dentry *dir;
99                 struct lustre_handle dir_lockh;
100                 char *name;
101                 int namelen;
102
103                 /* a name was supplied by the client; fid1 is the directory */
104                 dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
105                                             &dir_lockh);
106                 if (IS_ERR(dir)) {
107                         LBUG();
108                         GOTO(out_setattr, rc = PTR_ERR(dir));
109                 }
110
111                 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
112                 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
113                 de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
114                                             0, &child_lockh, LCK_PR);
115                 l_dput(dir);
116                 if (IS_ERR(de)) {
117                         LBUG();
118                         GOTO(out_setattr_de, rc = PTR_ERR(de));
119                 }
120         } else {
121                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
122                 if (!de || IS_ERR(de)) {
123                         LBUG();
124                         GOTO(out_setattr_de, rc = -ESTALE);
125                 }
126         }
127         inode = de->d_inode;
128         CDEBUG(D_INODE, "ino %ld\n", inode->i_ino);
129
130         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, 
131                        to_kdev_t(inode->i_sb->s_dev));
132
133         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
134         if (!handle)
135                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
136
137         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
138
139         if (offset) {
140                 body = lustre_msg_buf(req->rq_repmsg, 1);
141                 mds_pack_inode2fid(&body->fid1, inode);
142                 mds_pack_inode2body(body, inode);
143         }
144
145         if (!rc)
146                 rc = mds_update_last_rcvd(mds, handle, req);
147
148         err = mds_fs_commit(mds, de->d_inode, handle);
149         if (err) {
150                 CERROR("error on commit: err = %d\n", err);
151                 if (!rc)
152                         rc = err;
153         }
154
155         EXIT;
156 out_setattr_de:
157         l_dput(de);
158 out_setattr:
159         req->rq_status = rc;
160         return 0;
161 }
162
163 static int mds_reint_recreate(struct mds_update_record *rec, int offset,
164                               struct ptlrpc_request *req)
165 {
166         struct dentry *de = NULL;
167         struct mds_obd *mds = mds_req2mds(req);
168         struct dentry *dchild = NULL;
169         struct inode *dir;
170         int rc = 0;
171         ENTRY;
172
173         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
174         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
175                 LBUG();
176                 GOTO(out_create_de, rc = -ESTALE);
177         }
178         dir = de->d_inode;
179         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
180
181         down(&dir->i_sem);
182         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
183         if (IS_ERR(dchild)) {
184                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
185                 up(&dir->i_sem);
186                 LBUG();
187                 GOTO(out_create_dchild, rc = -ESTALE);
188         }
189
190         if (dchild->d_inode) {
191                 struct mds_body *body;
192                 rc = 0;
193                 body = lustre_msg_buf(req->rq_repmsg, 0);
194                 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
195                 mds_pack_inode2body(body, dchild->d_inode);
196         } else {
197                 CERROR("child doesn't exist (dir %ld, name %s)\n",
198                        dir->i_ino, rec->ur_name);
199                 rc = -ENOENT;
200                 LBUG();
201         }
202
203 out_create_dchild:
204         l_dput(dchild);
205         up(&dir->i_sem);
206 out_create_de:
207         l_dput(de);
208         req->rq_status = rc;
209         return 0;
210 }
211
212 static int mds_reint_create(struct mds_update_record *rec, int offset,
213                             struct ptlrpc_request *req)
214 {
215         struct dentry *de = NULL;
216         struct mds_obd *mds = mds_req2mds(req);
217         struct obd_device *obd = req->rq_export->exp_obd;
218         struct dentry *dchild = NULL;
219         struct inode *dir;
220         void *handle;
221         struct lustre_handle lockh;
222         int rc = 0, err, lock_mode, type = rec->ur_mode & S_IFMT;
223         ENTRY;
224
225         /* requests were at offset 2, replies go back at 1 */
226         if (offset)
227                 offset = 1;
228
229         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
230
231         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
232
233         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
234                 GOTO(out_create, rc = -ESTALE);
235
236         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
237         if (IS_ERR(de)) {
238                 rc = PTR_ERR(de);
239                 CERROR("parent lookup error %d\n", rc);
240                 LBUG();
241                 GOTO(out_create, rc);
242         }
243         dir = de->d_inode;
244         CDEBUG(D_INODE, "parent ino %ld name %s mode %o\n",
245                dir->i_ino, rec->ur_name, rec->ur_mode);
246
247         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
248
249         down(&dir->i_sem);
250         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
251         if (IS_ERR(dchild)) {
252                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
253                 LBUG();
254                 GOTO(out_create_de, rc = -ESTALE);
255         }
256
257         if (dchild->d_inode) {
258                 struct mds_body *body;
259                 struct inode *inode = dchild->d_inode;
260
261                 CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
262                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
263
264                 body = lustre_msg_buf(req->rq_repmsg, offset);
265                 mds_pack_inode2fid(&body->fid1, inode);
266                 mds_pack_inode2body(body, inode);
267                 if (S_ISREG(inode->i_mode)) {
268                         struct lov_mds_md *lmm;
269
270                         lmm = lustre_msg_buf(req->rq_repmsg, offset + 1);
271                         lmm->lmm_easize = mds->mds_max_mdsize;
272
273                         if (mds_fs_get_md(mds, inode, lmm) < 0) {
274                                 CDEBUG(D_INFO,"No md for %ld: rc %d\n",
275                                        inode->i_ino, rc);
276                                 memset(lmm, 0, lmm->lmm_easize);
277                         } else
278                                 body->valid |= OBD_MD_FLEASIZE;
279                 }
280                 /* now a normal case for intent locking */
281                 GOTO(out_create_dchild, rc = -EEXIST);
282         }
283
284         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, 
285                        to_kdev_t(dir->i_sb->s_dev));
286
287         if (dir->i_mode & S_ISGID) {
288                 rec->ur_gid = dir->i_gid;
289                 if (S_ISDIR(rec->ur_mode))
290                         rec->ur_mode |= S_ISGID;
291         }
292
293         switch (type) {
294         case S_IFREG:{
295                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
296                 if (!handle)
297                         GOTO(out_create_dchild, PTR_ERR(handle));
298                 rc = vfs_create(dir, dchild, rec->ur_mode);
299                 EXIT;
300                 break;
301         }
302         case S_IFDIR:{
303                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
304                 if (!handle)
305                         GOTO(out_create_dchild, PTR_ERR(handle));
306                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
307                 EXIT;
308                 break;
309         }
310         case S_IFLNK:{
311                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
312                 if (!handle)
313                         GOTO(out_create_dchild, PTR_ERR(handle));
314                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
315                 EXIT;
316                 break;
317         }
318         case S_IFCHR:
319         case S_IFBLK:
320         case S_IFIFO:
321         case S_IFSOCK:{
322                 int rdev = rec->ur_rdev;
323                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
324                 if (!handle)
325                         GOTO(out_create_dchild, PTR_ERR(handle));
326                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
327                 EXIT;
328                 break;
329         }
330         default:
331                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
332                 GOTO(out_create_dchild, rc = -EINVAL);
333         }
334
335         if (rc) {
336                 CDEBUG(D_INODE, "error during create: %d\n", rc);
337                 GOTO(out_create_commit, rc);
338         } else {
339                 struct iattr iattr;
340                 struct inode *inode = dchild->d_inode;
341                 struct mds_body *body;
342
343                 CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
344
345                 iattr.ia_atime = rec->ur_time;
346                 iattr.ia_ctime = rec->ur_time;
347                 iattr.ia_mtime = rec->ur_time;
348                 iattr.ia_uid = rec->ur_uid;
349                 iattr.ia_gid = rec->ur_gid;
350                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
351                         ATTR_MTIME | ATTR_CTIME;
352
353                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
354                 if (rc) {
355                         CERROR("error on setattr: rc = %d\n", rc);
356                         /* XXX should we abort here in case of error? */
357                 }
358
359                 rc = mds_update_last_rcvd(mds, handle, req);
360                 if (rc) {
361                         CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
362                         GOTO(out_create_unlink, rc);
363                 }
364
365                 body = lustre_msg_buf(req->rq_repmsg, offset);
366                 mds_pack_inode2fid(&body->fid1, inode);
367                 mds_pack_inode2body(body, inode);
368         }
369         EXIT;
370 out_create_commit:
371         err = mds_fs_commit(mds, dir, handle);
372         if (err) {
373                 CERROR("error on commit: err = %d\n", err);
374                 if (!rc)
375                         rc = err;
376         }
377 out_create_dchild:
378         l_dput(dchild);
379         ldlm_lock_decref(&lockh, lock_mode);
380 out_create_de:
381         up(&dir->i_sem);
382         l_dput(de);
383 out_create:
384         req->rq_status = rc;
385         return 0;
386
387 out_create_unlink:
388         /* Destroy the file we just created.  This should not need extra
389          * journal credits, as we have already modified all of the blocks
390          * needed in order to create the file in the first place.
391          */
392         switch (type) {
393         case S_IFDIR:
394                 err = vfs_rmdir(dir, dchild);
395                 if (err)
396                         CERROR("failed rmdir in error path: rc = %d\n", err);
397                 break;
398         default:
399                 err = vfs_unlink(dir, dchild);
400                 if (err)
401                         CERROR("failed unlink in error path: rc = %d\n", err);
402                 break;
403         }
404
405         goto out_create_commit;
406 }
407
408 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
409                             struct ptlrpc_request *req)
410 {
411         struct dentry *de = NULL;
412         struct dentry *dchild = NULL;
413         struct mds_obd *mds = mds_req2mds(req);
414         struct obd_device *obd = req->rq_export->exp_obd;
415         struct mds_body *body = NULL;
416         char *name;
417         struct inode *dir, *inode;
418         struct lustre_handle lockh, child_lockh;
419         void *handle;
420         int namelen, lock_mode, err, rc = 0;
421         ENTRY;
422
423         /* a name was supplied by the client; fid1 is the directory */
424         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
425         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh);
426         if (IS_ERR(de)) {
427                 LBUG();
428                 RETURN(PTR_ERR(de));
429         }
430
431         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
432                 GOTO(out_unlink, rc = -ENOENT);
433
434         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
435         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
436 #warning "FIXME: if mds_name2locked_dentry decrefs this lock, we must not"
437         memcpy(&child_lockh, &lockh, sizeof(child_lockh));
438         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
439                                         LCK_EX, &child_lockh, lock_mode);
440
441         if (IS_ERR(dchild)) {
442                 LBUG();
443                 GOTO(out_unlink, rc = PTR_ERR(dchild));
444         }
445
446         dir = de->d_inode;
447         inode = dchild->d_inode;
448         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
449
450         if (!inode) {
451                 CDEBUG(D_INODE, "child doesn't exist (dir %ld, name %s\n",
452                        dir->i_ino, rec->ur_name);
453                 /* going to out_unlink_cancel causes an LBUG, don't know why */
454                 GOTO(out_unlink_dchild, rc = -ENOENT);
455         }
456
457         if (offset) {
458                 /* XXX offset? */
459                 offset = 1;
460
461                 body = lustre_msg_buf(req->rq_repmsg, offset);
462                 mds_pack_inode2fid(&body->fid1, inode);
463                 mds_pack_inode2body(body, inode);
464         }
465
466         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, 
467                        to_kdev_t(dir->i_sb->s_dev));
468
469         switch (rec->ur_mode /* & S_IFMT ? */) {
470         case S_IFDIR:
471                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
472                 if (!handle)
473                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
474                 rc = vfs_rmdir(dir, dchild);
475                 break;
476         case S_IFREG:
477                 /* get OBD EA data first so client can also destroy object */
478                 if ((inode->i_mode & S_IFMT) == S_IFREG && offset) {
479                         struct lov_mds_md *lmm;
480
481                         lmm = lustre_msg_buf(req->rq_repmsg, offset + 1);
482                         lmm->lmm_easize = mds->mds_max_mdsize;
483                         if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
484                                 CDEBUG(D_INFO, "No md for ino %ld: rc = %d\n",
485                                        inode->i_ino, rc);
486                                 memset(lmm, 0, lmm->lmm_easize);
487                         } else
488                                 body->valid |= OBD_MD_FLEASIZE;
489                 }
490                 /* no break */
491         case S_IFLNK:
492         case S_IFCHR:
493         case S_IFBLK:
494         case S_IFIFO:
495         case S_IFSOCK:
496                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
497                 if (!handle)
498                         GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
499                 rc = vfs_unlink(dir, dchild);
500                 break;
501         default:
502                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
503                 handle = NULL;
504                 LBUG();
505                 GOTO(out_unlink_cancel, rc = -EINVAL);
506         }
507
508         if (!rc)
509                 rc = mds_update_last_rcvd(mds, handle, req);
510         err = mds_fs_commit(mds, dir, handle);
511         if (err) {
512                 CERROR("error on commit: err = %d\n", err);
513                 if (!rc)
514                         rc = err;
515         }
516
517         EXIT;
518
519 out_unlink_cancel:
520         ldlm_lock_decref(&child_lockh, LCK_EX);
521         err = ldlm_cli_cancel(&child_lockh);
522         if (err < 0) {
523                 CERROR("failed to cancel child inode lock: err = %d\n", err);
524                 if (!rc)
525                         rc = -ENOLCK;   /*XXX translate LDLM lock error */
526         }
527 out_unlink_dchild:
528         l_dput(dchild);
529         up(&dir->i_sem);
530 out_unlink:
531         ldlm_lock_decref(&lockh, lock_mode);
532         l_dput(de);
533         req->rq_status = rc;
534         return 0;
535 }
536
537 static int mds_reint_link(struct mds_update_record *rec, int offset,
538                           struct ptlrpc_request *req)
539 {
540         struct obd_device *obd = req->rq_export->exp_obd;
541         struct dentry *de_src = NULL;
542         struct dentry *de_tgt_dir = NULL;
543         struct dentry *dchild = NULL;
544         struct mds_obd *mds = mds_req2mds(req);
545         struct lustre_handle *handle, tgtlockh, srclockh;
546         int lock_mode;
547         __u64 res_id[3] = { 0 };
548         int flags = 0;
549         int rc = 0, err;
550
551         ENTRY;
552         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
553         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
554                 GOTO(out_link, rc = -ESTALE);
555         }
556
557         /* plan to change the link count on this inode: write lock */
558         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
559         res_id[0] = de_src->d_inode->i_ino;
560
561         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
562                              NULL, 0, lock_mode, &srclockh);
563         if (rc == 0) {
564                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
565                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
566                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
567                                       &flags, ldlm_completion_ast,
568                                       mds_blocking_ast, NULL, 0, &srclockh);
569                 if (rc != ELDLM_OK) {
570                         CERROR("lock enqueue: err: %d\n", rc);
571                         GOTO(out_link_src_put, rc = -EIO);
572                 }
573         } else
574                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
575
576         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
577         if (IS_ERR(de_tgt_dir)) {
578                 GOTO(out_link_src, rc = -ESTALE);
579         }
580
581         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
582         res_id[0] = de_tgt_dir->d_inode->i_ino;
583
584         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
585                              NULL, 0, lock_mode, &tgtlockh);
586         if (rc == 0) {
587                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
588                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
589                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
590                                       &flags, ldlm_completion_ast,
591                                       mds_blocking_ast, NULL, 0, &tgtlockh);
592                 if (rc != ELDLM_OK) {
593                         CERROR("lock enqueue: err: %d\n", rc);
594                         GOTO(out_link_tgt_dir_put, rc = -EIO);
595                 }
596         } else
597                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
598
599         down(&de_tgt_dir->d_inode->i_sem);
600         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
601         if (IS_ERR(dchild)) {
602                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
603                 GOTO(out_link_tgt_dir, rc = -ESTALE);
604         }
605
606         if (dchild->d_inode) {
607                 struct inode *inode = dchild->d_inode;
608                 /* in intent case ship back attributes to client */
609                 if (offset) {
610                         struct mds_body *body =
611                                 lustre_msg_buf(req->rq_repmsg, 1);
612
613                         mds_pack_inode2fid(&body->fid1, inode);
614                         mds_pack_inode2body(body, inode);
615                         if (S_ISREG(inode->i_mode)) {
616                                 struct lov_mds_md *lmm;
617
618                                 lmm = lustre_msg_buf(req->rq_repmsg, 2);
619                                 lmm->lmm_easize = mds->mds_max_mdsize;
620                                 if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
621                                         CDEBUG(D_INFO,"No md for %ld: rc %d\n",
622                                                inode->i_ino, rc);
623                                         memset(lmm, 0, lmm->lmm_easize);
624                                 } else
625                                         body->valid |= OBD_MD_FLEASIZE;
626                         }
627                 }
628                 CERROR("child exists (dir %ld, name %s\n",
629                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
630                 GOTO(out_link_dchild, rc = -EEXIST);
631         }
632
633         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
634                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
635
636         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
637         if (!handle)
638                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
639
640         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
641         if (rc)
642                 CERROR("link error %d\n", rc);
643         if (!rc)
644                 rc = mds_update_last_rcvd(mds, handle, req);
645
646         err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
647         if (err) {
648                 CERROR("error on commit: err = %d\n", err);
649                 if (!rc)
650                         rc = err;
651         }
652         EXIT;
653
654
655
656 out_link_dchild:
657         l_dput(dchild);
658 out_link_tgt_dir:
659         ldlm_lock_decref(&tgtlockh, lock_mode);
660 out_link_tgt_dir_put:
661         up(&de_tgt_dir->d_inode->i_sem);
662         l_dput(de_tgt_dir);
663 out_link_src:
664         ldlm_lock_decref(&srclockh, lock_mode);
665 out_link_src_put:
666         l_dput(de_src);
667 out_link:
668         req->rq_status = rc;
669         return 0;
670 }
671
672 static int mds_reint_rename(struct mds_update_record *rec, int offset,
673                             struct ptlrpc_request *req)
674 {
675         struct obd_device *obd = req->rq_export->exp_obd;
676         struct dentry *de_srcdir = NULL;
677         struct dentry *de_tgtdir = NULL;
678         struct dentry *de_old = NULL;
679         struct dentry *de_new = NULL;
680         struct mds_obd *mds = mds_req2mds(req);
681         struct lustre_handle tgtlockh, srclockh, oldhandle;
682         int flags, lock_mode, rc = 0, err;
683         void *handle;
684         __u64 res_id[3] = { 0 };
685         ENTRY;
686
687         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
688         if (IS_ERR(de_srcdir))
689                 GOTO(out_rename, rc = -ESTALE);
690
691         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
692         res_id[0] = de_srcdir->d_inode->i_ino;
693
694         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
695                              NULL, 0, lock_mode, &srclockh);
696         if (rc == 0) {
697                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
698                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
699                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
700                                       &flags, ldlm_completion_ast,
701                                       mds_blocking_ast, NULL, 0, &srclockh);
702                 if (rc != ELDLM_OK) {
703                         CERROR("lock enqueue: err: %d\n", rc);
704                         GOTO(out_rename_srcput, rc = -EIO);
705                 }
706         } else
707                 ldlm_lock_dump((void *)(unsigned long)srclockh.addr);
708
709         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
710         if (IS_ERR(de_tgtdir))
711                 GOTO(out_rename_srcdir, rc = -ESTALE);
712
713         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_PW : LCK_PW;
714         res_id[0] = de_tgtdir->d_inode->i_ino;
715
716         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
717                              NULL, 0, lock_mode, &tgtlockh);
718         if (rc == 0) {
719                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
720                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
721                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
722                                       &flags, ldlm_completion_ast,
723                                       mds_blocking_ast, NULL, 0, &tgtlockh);
724                 if (rc != ELDLM_OK) {
725                         CERROR("lock enqueue: err: %d\n", rc);
726                         GOTO(out_rename_tgtput, rc = -EIO);
727                 }
728         } else
729                 ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr);
730
731 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
732         double_lock(de_tgtdir, de_srcdir);
733 #endif
734         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
735         if (IS_ERR(de_old)) {
736                 CERROR("old child lookup error (%*s): %ld\n",
737                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
738                 GOTO(out_rename_tgtdir, rc = -ENOENT);
739         }
740
741         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
742         if (IS_ERR(de_new)) {
743                 CERROR("new child lookup error (%*s): %ld\n",
744                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
745                 GOTO(out_rename_deold, rc = -ENOENT);
746         }
747
748         /* in intent case ship back attributes to client */
749         if (offset) {
750                 struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
751                 struct inode *inode = de_new->d_inode;
752
753                 if (!inode) {
754                         body->valid = 0;
755                 } else {
756                         mds_pack_inode2fid(&body->fid1, inode);
757                         mds_pack_inode2body(body, inode);
758                         if (S_ISREG(inode->i_mode)) {
759                                 struct lov_mds_md *lmm;
760
761                                 lmm = lustre_msg_buf(req->rq_repmsg, 2);
762                                 lmm->lmm_easize = mds->mds_max_mdsize;
763                                 if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) {
764                                         CDEBUG(D_INFO,"No md for %ld: rc %d\n",
765                                                inode->i_ino, rc);
766                                         memset(lmm, 0, lmm->lmm_easize);
767                                 } else
768                                         body->valid |= OBD_MD_FLEASIZE;
769                         }
770                 }
771         }
772
773         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
774                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
775
776         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
777         if (!handle)
778                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
779         lock_kernel();
780         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
781                         NULL);
782         unlock_kernel();
783
784         if (!rc)
785                 rc = mds_update_last_rcvd(mds, handle, req);
786
787         err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
788         if (err) {
789                 CERROR("error on commit: err = %d\n", err);
790                 if (!rc)
791                         rc = err;
792         }
793         EXIT;
794
795 out_rename_denew:
796         l_dput(de_new);
797 out_rename_deold:
798         if (!rc) {
799                 res_id[0] = de_old->d_inode->i_ino;
800                 /* Take an exclusive lock on the resource that we're
801                  * about to free, to force everyone to drop their
802                  * locks. */
803                 LDLM_DEBUG_NOLOCK("getting EX lock res "LPU64, res_id[0]);
804                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
805                                       res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
806                                       &flags, ldlm_completion_ast,
807                                       mds_blocking_ast, NULL, 0, &oldhandle);
808                 if (rc)
809                         CERROR("failed to get child inode lock (child ino "
810                                LPD64" dir ino %ld)\n",
811                                res_id[0], de_old->d_inode->i_ino);
812         }
813
814         l_dput(de_old);
815
816         if (!rc) {
817                 ldlm_lock_decref(&oldhandle, LCK_EX);
818                 rc = ldlm_cli_cancel(&oldhandle);
819                 if (rc < 0)
820                         CERROR("failed to cancel child inode lock ino "
821                                LPD64": %d\n", res_id[0], rc);
822         }
823 out_rename_tgtdir:
824 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
825         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
826 #endif
827         ldlm_lock_decref(&tgtlockh, lock_mode);
828 out_rename_tgtput:
829         l_dput(de_tgtdir);
830 out_rename_srcdir:
831         ldlm_lock_decref(&srclockh, lock_mode);
832 out_rename_srcput:
833         l_dput(de_srcdir);
834 out_rename:
835         req->rq_status = rc;
836         return 0;
837 }
838
839 typedef int (*mds_reinter) (struct mds_update_record *, int offset,
840                             struct ptlrpc_request *);
841
842 static mds_reinter reinters[REINT_MAX + 1] = {
843         [REINT_SETATTR] mds_reint_setattr,
844         [REINT_CREATE] mds_reint_create,
845         [REINT_UNLINK] mds_reint_unlink,
846         [REINT_LINK] mds_reint_link,
847         [REINT_RENAME] mds_reint_rename,
848         [REINT_RECREATE] mds_reint_recreate,
849 };
850
851 int mds_reint_rec(struct mds_update_record *rec, int offset,
852                   struct ptlrpc_request *req)
853 {
854         struct mds_obd *mds = mds_req2mds(req);
855         struct obd_run_ctxt saved;
856         struct obd_ucred uc;
857
858         int rc;
859
860         if (rec->ur_opcode < 1 || rec->ur_opcode > REINT_MAX) {
861                 CERROR("opcode %d not valid\n", rec->ur_opcode);
862                 rc = req->rq_status = -EINVAL;
863                 RETURN(rc);
864         }
865
866         uc.ouc_fsuid = rec->ur_fsuid;
867         uc.ouc_fsgid = rec->ur_fsgid;
868
869         push_ctxt(&saved, &mds->mds_ctxt, &uc);
870         rc = reinters[rec->ur_opcode] (rec, offset, req);
871         pop_ctxt(&saved);
872
873         return rc;
874 }