Whamcloud - gitweb
- Fixed some screwy ldlm resource refcount issues
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *
6  *  Lustre Metadata Server (mds) reintegration routines
7  *
8  *  Copyright (C) 2002  Cluster File Systems, Inc.
9  *  author: Peter Braam <braam@clusterfs.com>
10  *
11  *  This code is issued under the GNU General Public License.
12  *  See the file COPYING in this distribution
13  *
14  */
15
16 // XXX - add transaction sequence numbers
17
18 #define EXPORT_SYMTAB
19 #define DEBUG_SUBSYSTEM S_MDS
20
21 #include <linux/obd_support.h>
22 #include <linux/obd_class.h>
23 #include <linux/obd.h>
24 #include <linux/lustre_lib.h>
25 #include <linux/lustre_idl.h>
26 #include <linux/lustre_mds.h>
27 #include <linux/lustre_dlm.h>
28 #include <linux/obd_class.h>
29
30 struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid)
31 {
32         struct list_head *p;
33
34         if (!uuid)
35                 return NULL;
36
37         list_for_each(p, &mds->mds_client_info) {
38                 struct mds_client_info *mci;
39
40                 mci = list_entry(p, struct mds_client_info, mci_list);
41                 CDEBUG(D_INFO, "checking client UUID '%s'\n",
42                        mci->mci_mcd->mcd_uuid);
43                 if (!strncmp(mci->mci_mcd->mcd_uuid, uuid,
44                              sizeof(mci->mci_mcd->mcd_uuid)))
45                         return mci;
46         }
47         CDEBUG(D_INFO, "no mds client info found for  UUID '%s'\n", uuid);
48         return NULL;
49 }
50
51 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
52                          struct ptlrpc_request *req)
53 {
54         /* get from req->rq_connection-> or req->rq_client */
55         struct obd_run_ctxt saved;
56         struct mds_client_info *mci;
57         loff_t off;
58         int rc;
59
60         mci = mds_uuid_to_mci(mds, req->rq_connection->c_remote_uuid);
61         if (!mci) {
62                 CERROR("unable to locate MDS client data for UUID '%s'\n",
63                        ptlrpc_req_to_uuid(req));
64                 /* This will be a real error once everything is working */
65                 //LBUG();
66                 RETURN(0);
67         }
68
69         off = MDS_LR_CLIENT + mci->mci_off * MDS_LR_SIZE;
70
71         ++mds->mds_last_rcvd;   /* lock this, or make it an LDLM function? */
72         req->rq_repmsg->transno = HTON__u64(mds->mds_last_rcvd);
73         mci->mci_mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
74         mci->mci_mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
75         mci->mci_mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
76
77         mds_fs_set_last_rcvd(mds, handle);
78         push_ctxt(&saved, &mds->mds_ctxt);
79         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mci->mci_mcd,
80                            sizeof(*mci->mci_mcd), &off);
81         pop_ctxt(&saved);
82         CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at #%d: rc = %d\n",
83                mds->mds_last_rcvd, mci->mci_mcd->mcd_uuid, mci->mci_off, rc);
84         // store new value and last committed value in req struct
85
86         if (rc == sizeof(*mci->mci_mcd))
87                 rc = 0;
88         else {
89                 CERROR("error writing to last_rcvd file: rc = %d\n", rc);
90                 if (rc >= 0)
91                         rc = -EIO;
92         }
93
94         return rc;
95 }
96
97 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
98                              struct ptlrpc_request *req)
99 {
100         struct mds_obd *mds = &req->rq_obd->u.mds;
101         struct dentry *de;
102         void *handle;
103         int rc = 0;
104         int err;
105
106         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
107         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_SETATTR)) {
108                 GOTO(out_setattr, rc = -ESTALE);
109         }
110
111         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
112
113         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
114                        de->d_inode->i_sb->s_dev);
115
116         handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
117         if (!handle)
118                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
119         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
120
121         if (!rc)
122                 rc = mds_update_last_rcvd(mds, handle, req);
123
124         err = mds_fs_commit(mds, de->d_inode, handle);
125         if (err) {
126                 CERROR("error on commit: err = %d\n", err);
127                 if (!rc)
128                         rc = err;
129         }
130         EXIT;
131 out_setattr_de:
132         l_dput(de);
133 out_setattr:
134         req->rq_status = rc;
135         return(0);
136 }
137
138 static int mds_reint_recreate(struct mds_update_record *rec, int offset,
139                             struct ptlrpc_request *req)
140 {
141         struct dentry *de = NULL;
142         struct mds_obd *mds = &req->rq_obd->u.mds;
143         struct dentry *dchild = NULL;
144         struct inode *dir;
145         int rc = 0;
146         ENTRY;
147
148         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
149         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
150                 LBUG();
151                 GOTO(out_create_de, rc = -ESTALE);
152         }
153         dir = de->d_inode;
154         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
155
156         down(&dir->i_sem);
157         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
158         if (IS_ERR(dchild)) {
159                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
160                 up(&dir->i_sem);
161                 LBUG();
162                 GOTO(out_create_dchild, rc = -ESTALE);
163         }
164
165         if (dchild->d_inode) {
166                 struct mds_body *body;
167                 rc = 0;
168                 body = lustre_msg_buf(req->rq_repmsg, 0);
169                 body->ino = dchild->d_inode->i_ino;
170                 body->generation = dchild->d_inode->i_generation;
171         } else {
172                 CERROR("child doesn't exist (dir %ld, name %s)\n",
173                        dir->i_ino, rec->ur_name);
174                 rc = -ENOENT;
175                 LBUG();
176         }
177
178 out_create_dchild:
179         l_dput(dchild);
180         up(&dir->i_sem);
181 out_create_de:
182         l_dput(de);
183         req->rq_status = rc;
184         return 0;
185 }
186
187 static int mds_reint_create(struct mds_update_record *rec, int offset,
188                             struct ptlrpc_request *req)
189 {
190         struct dentry *de = NULL;
191         struct mds_obd *mds = &req->rq_obd->u.mds;
192         struct dentry *dchild = NULL;
193         struct inode *dir;
194         void *handle;
195         struct ldlm_lock *lock;
196         struct lustre_handle lockh;
197         int rc = 0, err, flags, lock_mode, type = rec->ur_mode & S_IFMT;
198         __u64 res_id[3] = {0,0,0};
199         ENTRY;
200
201         /* requests were at offset 2, replies go back at 1 */
202         if (offset)
203                 offset = 1;
204
205         if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0)
206                 LBUG();
207
208         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
209         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
210                 LBUG();
211                 GOTO(out_create_de, rc = -ESTALE);
212         }
213         dir = de->d_inode;
214         CDEBUG(D_INODE, "parent ino %ld name %s mode %o\n",
215                dir->i_ino, rec->ur_name, rec->ur_mode);
216
217         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
218         res_id[0] = dir->i_ino;
219
220         rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
221                                    NULL, 0, lock_mode, &lockh);
222         if (rc == 0) {
223                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
224                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
225                                       NULL, mds->mds_local_namespace, NULL,
226                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
227                                       &flags, (void *)mds_lock_callback, NULL,
228                                       0, &lockh);
229                 if (rc != ELDLM_OK) {
230                         CERROR("lock enqueue: err: %d\n", rc);
231                         GOTO(out_create_de, rc = -EIO);
232                 }
233         } else {
234                 lock = lustre_handle2object(&lockh);
235                 LDLM_DEBUG(lock, "matched");
236         }
237         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
238
239         down(&dir->i_sem);
240         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
241         if (IS_ERR(dchild)) {
242                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
243                 up(&dir->i_sem);
244                 LBUG();
245                 GOTO(out_create_dchild, rc = -ESTALE);
246         }
247
248         if (dchild->d_inode) {
249                 struct mds_body *body;
250                 struct obdo *obdo;
251                 struct inode *inode = dchild->d_inode;
252                 CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
253                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
254
255                 body = lustre_msg_buf(req->rq_repmsg, offset);
256                 mds_pack_inode2fid(&body->fid1, inode);
257                 mds_pack_inode2body(body, inode);
258 #warning FIXME: This ext3/N-specific code does not belong here
259                 /* If i_file_acl is set, this inode has an EA */
260                 if (S_ISREG(inode->i_mode) && inode->u.ext3_i.i_file_acl) {
261                         obdo = lustre_msg_buf(req->rq_repmsg, offset + 1);
262                         mds_fs_get_obdo(mds, inode, obdo);
263                 }
264                 /* now a normal case for intent locking */
265                 GOTO(out_create_dchild, rc = -EEXIST);
266         }
267
268         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb->s_dev);
269
270         switch (type) {
271         case S_IFREG: {
272                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
273                 if (!handle)
274                         GOTO(out_create_dchild, PTR_ERR(handle));
275                 rc = vfs_create(dir, dchild, rec->ur_mode);
276                 EXIT;
277                 break;
278         }
279         case S_IFDIR: {
280                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
281                 if (!handle)
282                         GOTO(out_create_dchild, PTR_ERR(handle));
283                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
284                 EXIT;
285                 break;
286         }
287         case S_IFLNK: {
288                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
289                 if (!handle)
290                         GOTO(out_create_dchild, PTR_ERR(handle));
291                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
292                 EXIT;
293                 break;
294         }
295         case S_IFCHR:
296         case S_IFBLK:
297         case S_IFIFO:
298         case S_IFSOCK: {
299                 int rdev = rec->ur_rdev;
300                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
301                 if (!handle)
302                         GOTO(out_create_dchild, PTR_ERR(handle));
303                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
304                 EXIT;
305                 break;
306         }
307         default:
308                 CERROR("bad file type %d for create of %s\n",type,rec->ur_name);
309                 GOTO(out_create_dchild, rc = -EINVAL);
310         }
311
312         if (rc) {
313                 CERROR("error during create: %d\n", rc);
314                 if (rc != -ENOSPC) LBUG();
315                 GOTO(out_create_commit, rc);
316         } else {
317                 struct iattr iattr;
318                 struct inode *inode = dchild->d_inode;
319                 struct mds_body *body;
320
321                 CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
322                 if (!offset && type == S_IFREG) {
323                         struct obdo *obdo;
324                         obdo = lustre_msg_buf(req->rq_reqmsg, 2);
325                         rc = mds_fs_set_obdo(mds, inode, handle, obdo);
326                         if (rc)
327                                 CERROR("error %d setting objid for %ld\n",
328                                        rc, inode->i_ino);
329                 }
330
331                 iattr.ia_atime = rec->ur_time;
332                 iattr.ia_ctime = rec->ur_time;
333                 iattr.ia_mtime = rec->ur_time;
334                 iattr.ia_uid = rec->ur_uid;
335                 iattr.ia_gid = rec->ur_gid;
336                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
337                         ATTR_MTIME | ATTR_CTIME;
338
339                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
340                 if (rc) {
341                         CERROR("error on setattr: rc = %d\n", rc);
342                         /* XXX should we abort here in case of error? */
343                 }
344
345                 rc = mds_update_last_rcvd(mds, handle, req);
346                 if (rc) {
347                         CERROR("error on update_last_rcvd: rc = %d\n", rc);
348                         /* XXX should we abort here in case of error? */
349                 }
350
351                 body = lustre_msg_buf(req->rq_repmsg, offset);
352                 body->ino = inode->i_ino;
353                 body->generation = inode->i_generation;
354         }
355         EXIT;
356 out_create_commit:
357         err = mds_fs_commit(mds, dir, handle);
358         if (err) {
359                 CERROR("error on commit: err = %d\n", err);
360                 if (!rc)
361                         rc = err;
362         }
363 out_create_dchild:
364         l_dput(dchild);
365         up(&dir->i_sem);
366         lock = lustre_handle2object(&lockh);
367         ldlm_lock_decref(lock, lock_mode);
368 out_create_de:
369         l_dput(de);
370         req->rq_status = rc;
371         return 0;
372 }
373
374 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
375                             struct ptlrpc_request *req)
376 {
377         struct dentry *de = NULL;
378         struct dentry *dchild = NULL;
379         struct mds_obd *mds = &req->rq_obd->u.mds;
380         struct obdo *obdo;
381         struct inode *dir, *inode;
382         int lock_mode, flags;
383         __u64 res_id[3] = {0};
384         struct lustre_handle lockh;
385         struct ldlm_lock *lock;
386         void *handle;
387         int rc = 0;
388         int err;
389         ENTRY;
390
391         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
392         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) {
393                 LBUG();
394                 GOTO(out_unlink, rc = -ESTALE);
395         }
396
397         dir = de->d_inode;
398         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
399         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
400         res_id[0] = dir->i_ino;
401
402         rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
403                                    NULL, 0, lock_mode, &lockh);
404         if (rc == 0) {
405                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
406                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
407                                       NULL, mds->mds_local_namespace, NULL,
408                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
409                                       &flags, (void *)mds_lock_callback, NULL,
410                                       0, &lockh);
411                 if (rc != ELDLM_OK) {
412                         CERROR("lock enqueue: err: %d\n", rc);
413                         GOTO(out_unlink_de, rc = -EIO);
414                 }
415         } else {
416                 lock = lustre_handle2object(&lockh);
417                 LDLM_DEBUG(lock, "matched");
418         }
419         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
420
421         down(&dir->i_sem);
422         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
423         if (IS_ERR(dchild)) {
424                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
425                 LBUG();
426                 GOTO(out_unlink_de, rc = -ESTALE);
427         }
428
429         inode = dchild->d_inode;
430         if (!inode) {
431                 CERROR("child doesn't exist (dir %ld, name %s\n",
432                        dir->i_ino, rec->ur_name);
433                 LBUG();
434                 GOTO(out_unlink_dchild, rc = -ESTALE);
435         }
436
437 #if 0 /* in intent case the client doesn't have the inode */
438         if (inode->i_ino != rec->ur_fid2->id) {
439                 CERROR("inode and FID ID do not match (%ld != %Ld)\n",
440                        inode->i_ino, rec->ur_fid2->id);
441                 LBUG();
442                 GOTO(out_unlink_dchild, rc = -ESTALE);
443         }
444         if (inode->i_generation != rec->ur_fid2->generation) {
445                 CERROR("inode and FID GENERATION do not match (%d != %d)\n",
446                        inode->i_generation, rec->ur_fid2->generation);
447                 LBUG();
448                 GOTO(out_unlink_dchild, rc = -ESTALE);
449         }
450 #endif
451
452         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev);
453
454         switch (dchild->d_inode->i_mode & S_IFMT) {
455         case S_IFDIR:
456                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
457                 if (!handle)
458                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
459                 rc = vfs_rmdir(dir, dchild);
460                 break;
461         default:
462                 if (offset) {
463                         obdo = lustre_msg_buf(req->rq_repmsg, 1); 
464                         rc = mds_fs_get_obdo(mds, inode, obdo);
465                         if (rc < 0)
466                                 CDEBUG(D_INFO, "No obdo for ino %ld err %d\n",
467                                        inode->i_ino, rc);
468                 }
469
470                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
471                 if (!handle)
472                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
473                 rc = vfs_unlink(dir, dchild);
474                 break;
475         }
476
477         if (!rc)
478                 rc = mds_update_last_rcvd(mds, handle, req);
479         err = mds_fs_commit(mds, dir, handle);
480         if (err) {
481                 CERROR("error on commit: err = %d\n", err);
482                 if (!rc)
483                         rc = err;
484         }
485
486         EXIT;
487 out_unlink_dchild:
488         res_id[0] = inode->i_ino;
489         l_dput(dchild);
490 out_unlink_de:
491         up(&dir->i_sem);
492         lock = lustre_handle2object(&lockh);
493         ldlm_lock_decref(lock, lock_mode);
494         if (!rc) { 
495                 /* Take an exclusive lock on the resource that we're
496                  * about to free, to force everyone to drop their
497                  * locks. */
498                 LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]);
499                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
500                                       NULL, mds->mds_local_namespace, NULL, 
501                                       res_id,
502                                       LDLM_PLAIN, NULL, 0, LCK_EX, &flags,
503                                       (void *)mds_lock_callback, NULL, 0, 
504                                       &lockh);
505                 if (rc) 
506                         CERROR("failed to get child inode lock (child ino %Ld, "
507                                "dir ino %ld)\n",
508                                res_id[0], de->d_inode->i_ino);
509         }
510
511         l_dput(de);
512
513         if (!rc) { 
514                 lock = lustre_handle2object(&lockh);
515                 ldlm_lock_decref(lock, LCK_EX);
516                 rc = ldlm_cli_cancel(lock->l_client, lock);
517                 if (rc < 0)
518                         CERROR("failed to cancel child inode lock ino "
519                                "%Ld: %d\n", res_id[0], rc);
520         }
521
522 out_unlink:
523         req->rq_status = rc;
524         return 0;
525 }
526
527 static int mds_reint_link(struct mds_update_record *rec, int offset,
528                             struct ptlrpc_request *req)
529 {
530         struct dentry *de_src = NULL;
531         struct dentry *de_tgt_dir = NULL;
532         struct dentry *dchild = NULL;
533         struct mds_obd *mds = &req->rq_obd->u.mds;
534         void *handle;
535         int rc = 0;
536         int err;
537
538         ENTRY;
539         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
540         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
541                 GOTO(out_link, rc = -ESTALE);
542         }
543
544         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
545         if (IS_ERR(de_tgt_dir)) {
546                 GOTO(out_link_de_src, rc = -ESTALE);
547         }
548
549         down(&de_tgt_dir->d_inode->i_sem);
550         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
551         if (IS_ERR(dchild)) {
552                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
553                 GOTO(out_link_de_tgt_dir, rc = -ESTALE);
554         }
555
556         if (dchild->d_inode) {
557                 CERROR("child exists (dir %ld, name %s\n",
558                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
559                 GOTO(out_link_dchild, rc = -EEXIST);
560         }
561
562         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
563                        de_src->d_inode->i_sb->s_dev);
564
565         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
566         if (!handle)
567                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
568
569         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
570
571         if (!rc)
572                 rc = mds_update_last_rcvd(mds, handle, req);
573
574         err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
575         if (err) {
576                 CERROR("error on commit: err = %d\n", err);
577                 if (!rc)
578                         rc = err;
579         }
580         EXIT;
581
582 out_link_dchild:
583         l_dput(dchild);
584 out_link_de_tgt_dir:
585         up(&de_tgt_dir->d_inode->i_sem);
586         l_dput(de_tgt_dir);
587 out_link_de_src:
588         l_dput(de_src);
589 out_link:
590         req->rq_status = rc;
591         return 0;
592 }
593
594 static int mds_reint_rename(struct mds_update_record *rec, int offset,
595                             struct ptlrpc_request *req)
596 {
597         struct dentry *de_srcdir = NULL;
598         struct dentry *de_tgtdir = NULL;
599         struct dentry *de_old = NULL;
600         struct dentry *de_new = NULL;
601         struct mds_obd *mds = &req->rq_obd->u.mds;
602         void *handle;
603         int rc = 0;
604         int err;
605         ENTRY;
606
607         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
608         if (IS_ERR(de_srcdir)) {
609                 GOTO(out_rename, rc = -ESTALE);
610         }
611
612         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
613         if (IS_ERR(de_tgtdir)) {
614                 GOTO(out_rename_srcdir, rc = -ESTALE);
615         }
616
617 #warning FIXME: This needs locking attention
618
619         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
620         if (IS_ERR(de_old)) {
621                 CERROR("old child lookup error %ld\n", PTR_ERR(de_old));
622                 GOTO(out_rename_tgtdir, rc = -ESTALE);
623         }
624
625         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
626         if (IS_ERR(de_new)) {
627                 CERROR("new child lookup error %ld\n", PTR_ERR(de_new));
628                 GOTO(out_rename_deold, rc = -ESTALE);
629         }
630
631         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
632                        de_srcdir->d_inode->i_sb->s_dev);
633
634         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
635         if (!handle)
636                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
637         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
638                         NULL);
639
640         if (!rc)
641                 rc = mds_update_last_rcvd(mds, handle, req);
642
643         err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
644         if (err) {
645                 CERROR("error on commit: err = %d\n", err);
646                 if (!rc)
647                         rc = err;
648         }
649         EXIT;
650
651 out_rename_denew:
652         l_dput(de_new);
653 out_rename_deold:
654         l_dput(de_old);
655 out_rename_tgtdir:
656         l_dput(de_tgtdir);
657 out_rename_srcdir:
658         l_dput(de_srcdir);
659 out_rename:
660         req->rq_status = rc;
661         return 0;
662 }
663
664 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
665                            struct ptlrpc_request *);
666
667 static mds_reinter reinters[REINT_MAX+1] = {
668         [REINT_SETATTR]   mds_reint_setattr,
669         [REINT_CREATE]    mds_reint_create,
670         [REINT_UNLINK]    mds_reint_unlink,
671         [REINT_LINK]      mds_reint_link,
672         [REINT_RENAME]    mds_reint_rename,
673         [REINT_RECREATE]  mds_reint_recreate,
674 };
675
676 int mds_reint_rec(struct mds_update_record *rec, int offset,
677                   struct ptlrpc_request *req)
678 {
679         int rc;
680
681         if (rec->ur_opcode < 1 || rec->ur_opcode > REINT_MAX) {
682                 CERROR("opcode %d not valid\n", rec->ur_opcode);
683                 rc = req->rq_status = -EINVAL;
684                 RETURN(rc);
685         }
686
687         rc = reinters[rec->ur_opcode](rec, offset, req);
688
689         return rc;
690 }