Whamcloud - gitweb
- wrong parameter fix
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_reint.c
5  *  Lustre Metadata Target (mdt) reintegration routines
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Huang Hua <huanghua@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38
39
40 static int mdt_md_create(struct mdt_thread_info *info)
41 {
42         struct mdt_device      *mdt = info->mti_mdt;
43         struct mdt_object      *parent;
44         struct mdt_object      *child;
45         struct mdt_lock_handle *lh;
46         struct mdt_body        *repbody;
47         struct md_attr         *ma = &info->mti_attr;
48         struct mdt_reint_record *rr = &info->mti_rr;
49         int rc;
50         ENTRY;
51
52         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
53
54         lh = &info->mti_lh[MDT_LH_PARENT];
55         lh->mlh_mode = LCK_EX;
56
57         parent = mdt_object_find_lock(info, rr->rr_fid1,
58                                       lh, MDS_INODELOCK_UPDATE);
59         if (IS_ERR(parent))
60                 RETURN(PTR_ERR(parent));
61
62         child = mdt_object_find(info->mti_ctxt, mdt, rr->rr_fid2);
63         if (!IS_ERR(child)) {
64                 struct md_object *next = mdt_object_child(parent);
65
66                 ma->ma_need = MA_INODE;
67                 mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
68                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
69
70                 rc = mdo_create(info->mti_ctxt, next, rr->rr_name,
71                                 mdt_object_child(child), &info->mti_spec,
72                                 ma);
73                 if (rc == 0) {
74                         /* return fid & attr to client. */
75                         if (ma->ma_valid & MA_INODE)
76                                 mdt_pack_attr2body(repbody, &ma->ma_attr, 
77                                                    mdt_object_fid(child));
78                 }
79                 mdt_object_put(info->mti_ctxt, child);
80         } else
81                 rc = PTR_ERR(child);
82         mdt_object_unlock_put(info, parent, lh, rc);
83         RETURN(rc);
84 }
85
86 /* partial request to create object only */
87 static int mdt_md_mkobj(struct mdt_thread_info *info)
88 {
89         struct mdt_device      *mdt = info->mti_mdt;
90         struct mdt_object      *o;
91         struct mdt_body        *repbody;
92         struct md_attr         *ma = &info->mti_attr;
93         int rc;
94         ENTRY;
95
96         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
97
98         o = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid2);
99         if (!IS_ERR(o)) {
100                 struct md_object *next = mdt_object_child(o);
101
102                 ma->ma_need = MA_INODE;
103                 rc = mo_object_create(info->mti_ctxt, next,
104                                       &info->mti_spec, ma);
105                 if (rc == 0) {
106                         /* return fid & attr to client. */
107                         if (ma->ma_valid & MA_INODE)
108                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
109                                                    mdt_object_fid(o));
110                 }
111                 mdt_object_put(info->mti_ctxt, o);
112         } else
113                 rc = PTR_ERR(o);
114
115         RETURN(rc);
116 }
117
118 /* In the raw-setattr case, we lock the child inode.
119  * In the write-back case or if being called from open,
120  *               the client holds a lock already.
121  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
122  * mdt_setattr_unpack()) flag to tell these cases apart. */
123 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
124 {
125         struct md_attr          *ma = &info->mti_attr;
126         struct mdt_lock_handle  *lh;
127         int som_update = 0;
128         int rc;
129         ENTRY;
130
131         if (info->mti_epoch)
132                 som_update = (info->mti_epoch->flags & MF_SOM_CHANGE);
133
134         /* Try to avoid object_lock if another epoch has been started
135          * already. */
136         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
137                 RETURN(0);
138         
139         lh = &info->mti_lh[MDT_LH_PARENT];
140         lh->mlh_mode = LCK_EX;
141
142         if (!(flags & MRF_SETATTR_LOCKED)) {
143                 __u64 lockpart = MDS_INODELOCK_UPDATE;
144                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
145                         lockpart |= MDS_INODELOCK_LOOKUP;
146
147                 rc = mdt_object_lock(info, mo, lh, lockpart);
148                 if (rc != 0)
149                         GOTO(out, rc);
150         }
151
152         /* Setattrs are syncronized through dlm lock taken above. If another
153          * epoch started, its attributes may be already flushed on disk,
154          * skip setattr. */
155         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
156                 GOTO(out, rc = 0);
157                 
158         if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
159                 GOTO(out, rc = -ENOENT);
160
161         /* all attrs are packed into mti_attr in unpack_setattr */
162         mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
163                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
164
165         /* all attrs are packed into mti_attr in unpack_setattr */
166         rc = mo_attr_set(info->mti_ctxt, mdt_object_child(mo), ma);
167         if (rc != 0)
168                 GOTO(out, rc);
169
170         /* Re-enable SIZEONMDS. */
171         if (som_update) {
172                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
173                        mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
174                        mo->mot_epochcount);
175  
176                 mdt_sizeonmds_enable(info, mo);
177         }
178         
179         EXIT;
180 out:
181         mdt_object_unlock(info, mo, lh, rc);
182         return(rc);
183 }
184
185 static int mdt_reint_setattr(struct mdt_thread_info *info,
186                              struct mdt_lock_handle *lhc)
187 {
188         struct md_attr          *ma = &info->mti_attr;
189         struct mdt_reint_record *rr = &info->mti_rr;
190         struct ptlrpc_request   *req = mdt_info_req(info);
191         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
192         struct mdt_file_data    *mfd;
193         struct mdt_object       *mo;
194         struct md_object        *next;
195         struct mdt_body         *repbody;
196         int                      rc;
197
198         ENTRY;
199
200         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
201                   (unsigned int)ma->ma_attr.la_valid);
202
203         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
204         mo = mdt_object_find(info->mti_ctxt, info->mti_mdt, rr->rr_fid1);
205         if (IS_ERR(mo))
206                 RETURN(rc = PTR_ERR(mo));
207
208         if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) {
209                 /* Truncate case. */
210                 rc = mdt_write_get(info->mti_mdt, mo);
211                 if (rc)
212                         GOTO(out, rc);
213
214                 mfd = mdt_mfd_new();
215                 if (mfd == NULL)
216                         GOTO(out, rc = -ENOMEM);
217                 
218                 /* FIXME: in recovery, need to pass old epoch here */
219                 mdt_epoch_open(info, mo, 0);
220                 repbody->ioepoch = mo->mot_ioepoch;
221
222                 mdt_object_get(info->mti_ctxt, mo);
223                 mfd->mfd_mode = FMODE_EPOCHLCK;
224                 mfd->mfd_object = mo;
225                 mfd->mfd_xid = req->rq_xid;
226
227                 spin_lock(&med->med_open_lock);
228                 list_add(&mfd->mfd_list, &med->med_open_head);
229                 spin_unlock(&med->med_open_lock);
230                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
231         }
232
233         rc = mdt_attr_set(info, mo, rr->rr_flags);
234         if (rc)
235                 GOTO(out, rc);
236
237         if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) {
238                 LASSERT(info->mti_epoch);
239
240                 /* Size-on-MDS Update. Find and free mfd. */
241                 spin_lock(&med->med_open_lock);
242                 mfd = mdt_handle2mfd(&(info->mti_epoch->handle));
243                 if (mfd == NULL) {
244                         spin_unlock(&med->med_open_lock);
245                         CDEBUG(D_INODE, "no handle for file close: "
246                                "fid = "DFID": cookie = "LPX64"\n", 
247                                PFID(info->mti_rr.rr_fid1),
248                                info->mti_epoch->handle.cookie);
249                         GOTO(out, rc = -ESTALE);
250                 }
251
252                 LASSERT(mfd->mfd_mode == FMODE_SOM);
253                 LASSERT(ma->ma_attr.la_valid & LA_SIZE);
254                 LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE));
255
256                 class_handle_unhash(&mfd->mfd_handle);
257                 list_del_init(&mfd->mfd_list);
258                 spin_unlock(&med->med_open_lock);
259                 mdt_mfd_close(info, mfd);
260         }
261
262         ma->ma_need = MA_INODE;
263         next = mdt_object_child(mo);
264         rc = mo_attr_get(info->mti_ctxt, next, ma);
265         if (rc != 0)
266                 GOTO(out, rc);
267
268         mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo));
269         EXIT;
270 out:
271         mdt_object_put(info->mti_ctxt, mo);
272         return rc;
273 }
274
275 static int mdt_reint_create(struct mdt_thread_info *info,
276                             struct mdt_lock_handle *lhc)
277 {
278         int rc;
279         ENTRY;
280
281         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
282                 RETURN(-ESTALE);
283
284         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
285         case S_IFREG:
286         case S_IFDIR:{
287                 if (strlen(info->mti_rr.rr_name) == 0) {
288                         rc = mdt_md_mkobj(info);
289                         break;
290                 }
291         }
292         case S_IFLNK:
293         case S_IFCHR:
294         case S_IFBLK:
295         case S_IFIFO:
296         case S_IFSOCK:{
297                 /* special file should stay on the same node as parent */
298                 LASSERT(strlen(info->mti_rr.rr_name) > 0);
299                 rc = mdt_md_create(info);
300                 break;
301         }
302         default:
303                 rc = -EOPNOTSUPP;
304         }
305         RETURN(rc);
306 }
307
308
309 static int mdt_reint_unlink(struct mdt_thread_info *info,
310                             struct mdt_lock_handle *lhc)
311 {
312         struct mdt_reint_record *rr = &info->mti_rr;
313         struct ptlrpc_request   *req = mdt_info_req(info);
314         struct md_attr          *ma = &info->mti_attr;
315         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
316         struct mdt_object       *mp;
317         struct mdt_object       *mc;
318         struct mdt_lock_handle  *parent_lh;
319         struct mdt_lock_handle  *child_lh;
320         int                      rc;
321         ENTRY;
322
323         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s\n", PFID(rr->rr_fid1),
324                   rr->rr_name);
325
326         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
327                 GOTO(out, rc = -ENOENT);
328
329         /* step 1: lock the parent */
330         parent_lh = &info->mti_lh[MDT_LH_PARENT];
331         parent_lh->mlh_mode = LCK_EX;
332         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
333                                   MDS_INODELOCK_UPDATE);
334         if (IS_ERR(mp))
335                 GOTO(out, rc = PTR_ERR(mp));
336
337         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
338         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
339                                                &RMF_MDT_MD, RCL_SERVER);
340
341         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
342                                                &RMF_LOGCOOKIES);
343         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
344                                                   &RMF_LOGCOOKIES,
345                                                   RCL_SERVER);
346
347         if (!ma->ma_lmm || !ma->ma_cookie)
348                 GOTO(out_unlock_parent, rc = -EINVAL);
349
350         if (strlen(rr->rr_name) == 0) {
351                 /* remote partial operation */
352                 rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp), ma);
353                 GOTO(out_unlock_parent, rc);
354         }
355
356         /* step 2: find & lock the child */
357         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
358                         rr->rr_name, child_fid);
359         if (rc != 0)
360                  GOTO(out_unlock_parent, rc);
361
362         /* we will lock the child regardless it is local or remote. No harm. */
363         mc = mdt_object_find(info->mti_ctxt, info->mti_mdt, child_fid);
364         if (IS_ERR(mc))
365                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
366         child_lh = &info->mti_lh[MDT_LH_CHILD];
367         child_lh->mlh_mode = LCK_EX;
368         rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
369         if (rc != 0)
370                 GOTO(out_put_child, rc);
371
372         mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
373                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
374
375         /*
376          * Now we can only make sure we need MA_INODE, in mdd layer, will check
377          * whether need MA_LOV and MA_COOKIE.
378          */
379         ma->ma_need = MA_INODE;
380         rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mp),
381                         mdt_object_child(mc), rr->rr_name, ma);
382         if (rc)
383                 GOTO(out_unlock_child, rc);
384
385         mdt_handle_last_unlink(info, mc, ma);
386
387         GOTO(out_unlock_child, rc);
388 out_unlock_child:
389         mdt_object_unlock(info, mc, child_lh, rc);
390 out_put_child:
391         mdt_object_put(info->mti_ctxt, mc);
392 out_unlock_parent:
393         mdt_object_unlock_put(info, mp, parent_lh, rc);
394 out:
395         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
396         return rc;
397 }
398
399 static int mdt_reint_link(struct mdt_thread_info *info,
400                           struct mdt_lock_handle *lhc)
401 {
402         struct mdt_reint_record *rr = &info->mti_rr;
403         struct ptlrpc_request   *req = mdt_info_req(info);
404         struct md_attr          *ma = &info->mti_attr;
405         struct mdt_object       *ms;
406         struct mdt_object       *mp;
407         struct mdt_lock_handle  *lhs;
408         struct mdt_lock_handle  *lhp;
409         int rc;
410
411         ENTRY;
412
413         DEBUG_REQ(D_INODE, req, "link original "DFID" to "DFID" %s",
414                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
415
416         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
417                 RETURN(-ENOENT);
418
419         /* step 1: lock the source */
420         lhs = &info->mti_lh[MDT_LH_PARENT];
421         lhs->mlh_mode = LCK_EX;
422         ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, 
423                                   MDS_INODELOCK_UPDATE);
424         if (IS_ERR(ms))
425                 RETURN(PTR_ERR(ms));
426
427         if (strlen(rr->rr_name) == 0) {
428                 /* remote partial operation */
429                 rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
430                 GOTO(out_unlock_source, rc);
431         }
432         /*step 2: find & lock the target parent dir*/
433         lhp = &info->mti_lh[MDT_LH_CHILD];
434         lhp->mlh_mode = LCK_EX;
435         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, 
436                                   MDS_INODELOCK_UPDATE);
437         if (IS_ERR(mp))
438                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
439
440         /* step 4: link it */
441
442         mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
443                        OBD_FAIL_MDS_REINT_LINK_WRITE);
444
445         rc = mdo_link(info->mti_ctxt, mdt_object_child(mp),
446                       mdt_object_child(ms), rr->rr_name, ma);
447         GOTO(out_unlock_target, rc);
448
449 out_unlock_target:
450         mdt_object_unlock_put(info, mp, lhp, rc);
451 out_unlock_source:
452         mdt_object_unlock_put(info, ms, lhs, rc);
453         return rc;
454 }
455
456 /* partial operation for rename */
457 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
458 {
459         struct mdt_reint_record *rr = &info->mti_rr;
460         struct ptlrpc_request   *req = mdt_info_req(info);
461         struct md_attr          *ma = &info->mti_attr;
462         struct mdt_object       *mtgtdir;
463         struct mdt_object       *mtgt = NULL;
464         struct mdt_lock_handle  *lh_tgtdir;
465         struct mdt_lock_handle  *lh_tgt;
466         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
467         int                      rc;
468
469         ENTRY;
470
471         DEBUG_REQ(D_INODE, req, "rename_tgt "DFID" to "DFID" %s",
472                   PFID(rr->rr_fid2),
473                   PFID(rr->rr_fid1), rr->rr_tgt);
474
475         /* step 1: lookup & lock the tgt dir */
476         lh_tgt = &info->mti_lh[MDT_LH_CHILD];
477         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
478         lh_tgtdir->mlh_mode = LCK_EX;
479         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
480                                        MDS_INODELOCK_UPDATE);
481         if (IS_ERR(mtgtdir))
482                 GOTO(out, rc = PTR_ERR(mtgtdir));
483
484         /*step 2: find & lock the target object if exists*/
485         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
486                         rr->rr_tgt, tgt_fid);
487         if (rc != 0 && rc != -ENOENT) {
488                 GOTO(out_unlock_tgtdir, rc);
489         } else if (rc == 0) {
490                 lh_tgt->mlh_mode = LCK_EX;
491
492                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
493                                             MDS_INODELOCK_LOOKUP);
494                 if (IS_ERR(mtgt))
495                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
496
497                 rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir),
498                                     mdt_object_child(mtgt),
499                                     rr->rr_fid2, rr->rr_tgt, ma);
500         } else /* -ENOENT */ {
501                 rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir),
502                                      rr->rr_tgt, rr->rr_fid2,
503                                      S_ISDIR(ma->ma_attr.la_mode));
504         }
505
506         /* handle last link of tgt object */
507         if (rc == 0 && mtgt)
508                 mdt_handle_last_unlink(info, mtgt, ma);
509
510         EXIT;
511 out_unlock_tgt:
512         if (mtgt) {
513                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
514         }
515 out_unlock_tgtdir:
516         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
517 out:
518         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
519         return rc;
520 }
521
522 static int mdt_rename_lock(struct mdt_thread_info *info,
523                            struct lustre_handle *lh)
524 {
525         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; 
526         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
527         int flags = LDLM_FL_ATOMIC_CB;
528         struct ldlm_res_id res_id;
529         struct lu_site *ls;
530         int rc;
531         ENTRY;
532
533         ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
534         fid_build_res_name(&LUSTRE_BFL_FID, &res_id);
535         
536         if (ls->ls_control_exp == NULL) {
537                 /* 
538                  * Current node is controller, that is mdt0 where we should take
539                  * BFL lock.
540                  */
541                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy,
542                                             LCK_EX, &flags, ldlm_blocking_ast,
543                                             ldlm_completion_ast, NULL, NULL, 0,
544                                             NULL, lh);
545         } else {
546                 /*
547                  * This is the case mdt0 is remote node, issue DLM lock like
548                  * other clients.
549                  */
550                 rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
551                                       LDLM_IBITS, &policy, LCK_EX, &flags,
552                                       ldlm_blocking_ast, ldlm_completion_ast,
553                                       NULL, NULL, NULL, 0, NULL, lh, 0);
554         }
555
556         RETURN(rc);
557 }
558
559 static void mdt_rename_unlock(struct lustre_handle *lh)
560 {
561         ENTRY;
562         ldlm_lock_decref(lh, LCK_EX);
563         EXIT;
564 }
565
566 /* 
567  * This is is_subdir() variant, it is CMD is cmm forwards it to correct
568  * target. Source should not be ancestor of target dir. May be other rename
569  * checks can be moved here later.
570  */
571 static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
572 {
573         struct mdt_reint_record *rr = &info->mti_rr;
574         struct lu_fid dst_fid = *rr->rr_fid2;
575         struct mdt_object *dst;
576         int rc = 0;
577         ENTRY;
578
579         do {
580                 dst = mdt_object_find(info->mti_ctxt, info->mti_mdt, &dst_fid);
581                 if (!IS_ERR(dst)) {
582                         rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(dst),
583                                            fid, &dst_fid);
584                         mdt_object_put(info->mti_ctxt, dst);
585                         if (rc < 0) {
586                                 CERROR("Error while doing mdo_is_subdir(), rc %d\n",
587                                        rc);
588                         } else if (rc == 1) {
589                                 rc = -EINVAL;
590                         }
591                 } else {
592                         rc = PTR_ERR(dst);
593                 }
594         } while (rc == EREMOTE);
595         
596         RETURN(rc);
597 }
598
599 static int mdt_reint_rename(struct mdt_thread_info *info,
600                             struct mdt_lock_handle *lhc)
601 {
602         struct mdt_reint_record *rr = &info->mti_rr;
603         struct req_capsule      *pill = &info->mti_pill;
604         struct ptlrpc_request   *req = mdt_info_req(info);
605         struct md_attr          *ma = &info->mti_attr;
606         struct mdt_object       *msrcdir;
607         struct mdt_object       *mtgtdir;
608         struct mdt_object       *mold;
609         struct mdt_object       *mnew = NULL;
610         struct mdt_lock_handle  *lh_srcdirp;
611         struct mdt_lock_handle  *lh_tgtdirp;
612         struct mdt_lock_handle  *lh_oldp;
613         struct mdt_lock_handle  *lh_newp;
614         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
615         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
616         struct lustre_handle     rename_lh = { 0 };
617         int                      rc;
618
619         ENTRY;
620
621         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
622                   PFID(rr->rr_fid1), rr->rr_name,
623                   PFID(rr->rr_fid2), rr->rr_tgt);
624
625         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
626         if (rc == 1) {
627         /* if (rr->rr_name[0] == 0) {*/
628                 rc = mdt_reint_rename_tgt(info);
629                 RETURN(rc);
630         }
631
632         rc = mdt_rename_lock(info, &rename_lh);
633         if (rc) {
634                 CERROR("can't lock FS for rename, rc %d\n", rc);
635                 RETURN(rc);
636         }
637
638         lh_newp = &info->mti_lh[MDT_LH_NEW];
639
640         /* step 1: lock the source dir */
641         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
642         lh_srcdirp->mlh_mode = LCK_EX;
643         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
644                                        MDS_INODELOCK_UPDATE);
645         if (IS_ERR(msrcdir))
646                 GOTO(out, rc = PTR_ERR(msrcdir));
647
648         /*step 2: find & lock the target dir*/
649         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
650         lh_tgtdirp->mlh_mode = LCK_EX;
651         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
652                 mdt_object_get(info->mti_ctxt, msrcdir);
653                 mtgtdir = msrcdir;
654         } else {
655                 mtgtdir = mdt_object_find(info->mti_ctxt,
656                                           info->mti_mdt, rr->rr_fid2);
657                 if (IS_ERR(mtgtdir))
658                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
659                 
660                 rc = mdt_object_cr_lock(info, mtgtdir, lh_tgtdirp,
661                                         MDS_INODELOCK_UPDATE);
662                 if (rc != 0) {
663                         mdt_object_put(info->mti_ctxt, mtgtdir);
664                         GOTO(out_unlock_source, rc);
665                 }
666
667         }
668
669         /*step 3: find & lock the old object*/
670         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
671                         rr->rr_name, old_fid);
672         if (rc != 0)
673                 GOTO(out_unlock_target, rc);
674
675         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
676                 GOTO(out_unlock_target, rc = -EINVAL);
677
678         lh_oldp = &info->mti_lh[MDT_LH_OLD];
679         lh_oldp->mlh_mode = LCK_EX;
680         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
681                                     MDS_INODELOCK_LOOKUP);
682         if (IS_ERR(mold))
683                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
684
685         /*step 4: find & lock the new object*/
686         /* new target object may not exist now */
687         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
688                         rr->rr_tgt, new_fid);
689         if (rc == 0) {
690                 /* the new_fid should have been filled at this moment*/
691                 if (lu_fid_eq(old_fid, new_fid))
692                        GOTO(out_unlock_old, rc);
693
694                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
695                     lu_fid_eq(new_fid, rr->rr_fid2))
696                         GOTO(out_unlock_old, rc = -EINVAL);
697
698                 lh_newp->mlh_mode = LCK_EX;
699                 mnew = mdt_object_find(info->mti_ctxt, info->mti_mdt, new_fid);
700                 if (IS_ERR(mnew))
701                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
702
703                 rc = mdt_object_cr_lock(info, mnew, lh_newp,
704                                         MDS_INODELOCK_FULL);
705                 if (rc != 0) {
706                         mdt_object_put(info->mti_ctxt, mnew);
707                         GOTO(out_unlock_old, rc);
708                 }
709         } else if (rc != -EREMOTE && rc != -ENOENT)
710                 GOTO(out_unlock_old, rc);
711
712         /* step 5: dome some checking ...*/
713         /* step 6: rename it */
714         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
715         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
716                                                &RMF_MDT_MD, RCL_SERVER);
717
718         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
719                                                 &RMF_LOGCOOKIES);
720         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
721                                                &RMF_LOGCOOKIES, RCL_SERVER);
722
723         if (!ma->ma_lmm || !ma->ma_cookie)
724                 GOTO(out_unlock_new, rc = -EINVAL);
725
726         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
727
728         mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
729                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
730
731         /* Check if @dst is subdir of @src. */
732         rc = mdt_rename_check(info, old_fid);
733         if (rc)
734                 GOTO(out_unlock_new, rc);
735
736         rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
737                         mdt_object_child(mtgtdir), old_fid, rr->rr_name,
738                         (mnew ? mdt_object_child(mnew) : NULL), rr->rr_tgt, ma);
739         
740         /* handle last link of tgt object */
741         if (rc == 0 && mnew)
742                 mdt_handle_last_unlink(info, mnew, ma);
743
744 out_unlock_new:
745         if (mnew) {
746                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
747         }
748 out_unlock_old:
749         mdt_object_unlock_put(info, mold, lh_oldp, rc);
750 out_unlock_target:
751         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
752 out_unlock_source:
753         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
754 out:
755         mdt_rename_unlock(&rename_lh);
756         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
757         return rc;
758 }
759
760 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
761                            struct mdt_lock_handle *lhc);
762
763 static mdt_reinter reinters[REINT_MAX] = {
764         [REINT_SETATTR]  = mdt_reint_setattr,
765         [REINT_CREATE] = mdt_reint_create,
766         [REINT_LINK] = mdt_reint_link,
767         [REINT_UNLINK] = mdt_reint_unlink,
768         [REINT_RENAME] = mdt_reint_rename,
769         [REINT_OPEN] = mdt_reint_open
770 };
771
772 int mdt_reint_rec(struct mdt_thread_info *info,
773                   struct mdt_lock_handle *lhc)
774 {
775         int rc;
776         ENTRY;
777
778         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
779
780         RETURN(rc);
781 }