Whamcloud - gitweb
some code cleanup in split.
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_reint.c
5  *  Lustre Metadata Target (mdt) reintegration routines
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Huang Hua <huanghua@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38
39 static int mdt_md_create(struct mdt_thread_info *info)
40 {
41         struct mdt_device       *mdt = info->mti_mdt;
42         struct mdt_object       *parent;
43         struct mdt_object       *child;
44         struct mdt_lock_handle  *lh;
45         struct mdt_body         *repbody;
46         struct md_attr          *ma = &info->mti_attr;
47         struct mdt_reint_record *rr = &info->mti_rr;
48         int rc;
49         ENTRY;
50
51         DEBUG_REQ(D_INODE, mdt_info_req(info), "create  (%s->"DFID") in "DFID,
52                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
53
54         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
55
56         lh = &info->mti_lh[MDT_LH_PARENT];
57         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
58
59         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
60                                       MDS_INODELOCK_UPDATE);
61         if (IS_ERR(parent))
62                 RETURN(PTR_ERR(parent));
63
64         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
65         if (!IS_ERR(child)) {
66                 struct md_object *next = mdt_object_child(parent);
67
68                 ma->ma_need = MA_INODE;
69                 ma->ma_valid = 0;
70                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
71                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
72
73                 rc = mdo_create(info->mti_env, next, rr->rr_name,
74                                 mdt_object_child(child),
75                                 &info->mti_spec, ma);
76                 if (rc == 0) {
77                         /* return fid & attr to client. */
78                         if (ma->ma_valid & MA_INODE)
79                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
80                                                    mdt_object_fid(child));
81                 }
82                 mdt_object_put(info->mti_env, child);
83         } else
84                 rc = PTR_ERR(child);
85         mdt_object_unlock_put(info, parent, lh, rc);
86         RETURN(rc);
87 }
88
89 /* partial request to create object only */
90 static int mdt_md_mkobj(struct mdt_thread_info *info)
91 {
92         struct mdt_device      *mdt = info->mti_mdt;
93         struct mdt_object      *o;
94         struct mdt_body        *repbody;
95         struct md_attr         *ma = &info->mti_attr;
96         int rc;
97         ENTRY;
98
99         DEBUG_REQ(D_INODE, mdt_info_req(info), "partial create "DFID"\n", 
100                            PFID(info->mti_rr.rr_fid2));
101
102         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
103
104         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
105         if (!IS_ERR(o)) {
106                 struct md_object *next = mdt_object_child(o);
107
108                 ma->ma_need = MA_INODE;
109                 ma->ma_valid = 0;
110                 /* Cross-ref create can encounter already created obj in case
111                  * of recovery, just get attr in that case */
112                 if (mdt_object_exists(o) == 1) {
113                         rc = mo_attr_get(info->mti_env, next, ma);
114                 } else {
115                         rc = mo_object_create(info->mti_env, next, 
116                                               &info->mti_spec, ma);
117                 }
118                 if (rc == 0) {
119                         /* return fid & attr to client. */
120                         if (ma->ma_valid & MA_INODE)
121                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
122                                                    mdt_object_fid(o));
123                 }
124                 mdt_object_put(info->mti_env, o);
125         } else
126                 rc = PTR_ERR(o);
127
128         RETURN(rc);
129 }
130
131 /* In the raw-setattr case, we lock the child inode.
132  * In the write-back case or if being called from open,
133  *               the client holds a lock already.
134  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
135  * mdt_setattr_unpack()) flag to tell these cases apart. */
136 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
137 {
138         struct md_attr          *ma = &info->mti_attr;
139         struct mdt_lock_handle  *lh;
140         int som_update = 0;
141         int rc;
142         ENTRY;
143
144         if (info->mti_epoch)
145                 som_update = (info->mti_epoch->flags & MF_SOM_CHANGE);
146
147         /* Try to avoid object_lock if another epoch has been started
148          * already. */
149         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
150                 RETURN(0);
151
152         lh = &info->mti_lh[MDT_LH_PARENT];
153         mdt_lock_pdo_init(lh, LCK_PW, NULL, 0);
154
155         if (!(flags & MRF_SETATTR_LOCKED)) {
156                 __u64 lockpart = MDS_INODELOCK_UPDATE;
157                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
158                         lockpart |= MDS_INODELOCK_LOOKUP;
159
160                 rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
161                 if (rc != 0)
162                         GOTO(out, rc);
163         }
164
165         /* Setattrs are syncronized through dlm lock taken above. If another
166          * epoch started, its attributes may be already flushed on disk,
167          * skip setattr. */
168         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
169                 GOTO(out, rc = 0);
170
171         if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
172                 GOTO(out, rc = -ENOENT);
173
174         /* all attrs are packed into mti_attr in unpack_setattr */
175         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
176                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
177
178         /* all attrs are packed into mti_attr in unpack_setattr */
179         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
180         if (rc != 0)
181                 GOTO(out, rc);
182
183         /* Re-enable SIZEONMDS. */
184         if (som_update) {
185                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
186                        mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
187                        mo->mot_epochcount);
188
189                 mdt_sizeonmds_enable(info, mo);
190         }
191
192         EXIT;
193 out:
194         mdt_object_unlock(info, mo, lh, rc);
195         return(rc);
196 }
197
198 static int mdt_reint_setattr(struct mdt_thread_info *info,
199                              struct mdt_lock_handle *lhc)
200 {
201         struct mdt_device       *mdt = info->mti_mdt;
202         struct md_attr          *ma = &info->mti_attr;
203         struct mdt_reint_record *rr = &info->mti_rr;
204         struct ptlrpc_request   *req = mdt_info_req(info);
205         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
206         struct mdt_file_data    *mfd;
207         struct mdt_object       *mo;
208         struct md_object        *next;
209         struct mdt_body         *repbody;
210         int                      rc;
211
212         ENTRY;
213
214         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
215                   (unsigned int)ma->ma_attr.la_valid);
216
217         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
218         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
219         if (IS_ERR(mo))
220                 RETURN(rc = PTR_ERR(mo));
221
222         if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) {
223                 /* Truncate case. */
224                 rc = mdt_write_get(info->mti_mdt, mo);
225                 if (rc)
226                         GOTO(out, rc);
227
228                 mfd = mdt_mfd_new();
229                 if (mfd == NULL)
230                         GOTO(out, rc = -ENOMEM);
231
232                 mdt_epoch_open(info, mo);
233                 repbody->ioepoch = mo->mot_ioepoch;
234
235                 mdt_object_get(info->mti_env, mo);
236                 mfd->mfd_mode = FMODE_EPOCHLCK;
237                 mfd->mfd_object = mo;
238                 mfd->mfd_xid = req->rq_xid;
239
240                 spin_lock(&med->med_open_lock);
241                 list_add(&mfd->mfd_list, &med->med_open_head);
242                 spin_unlock(&med->med_open_lock);
243                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
244         }
245
246         rc = mdt_attr_set(info, mo, rr->rr_flags);
247         if (rc)
248                 GOTO(out, rc);
249
250         if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) {
251                 LASSERT(info->mti_epoch);
252
253                 /* Size-on-MDS Update. Find and free mfd. */
254                 spin_lock(&med->med_open_lock);
255                 mfd = mdt_handle2mfd(&(info->mti_epoch->handle));
256                 if (mfd == NULL) {
257                         spin_unlock(&med->med_open_lock);
258                         CDEBUG(D_INODE, "no handle for file close: "
259                                "fid = "DFID": cookie = "LPX64"\n",
260                                PFID(info->mti_rr.rr_fid1),
261                                info->mti_epoch->handle.cookie);
262                         GOTO(out, rc = -ESTALE);
263                 }
264
265                 LASSERT(mfd->mfd_mode == FMODE_SOM);
266                 LASSERT(ma->ma_attr.la_valid & LA_SIZE);
267                 LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE));
268
269                 class_handle_unhash(&mfd->mfd_handle);
270                 list_del_init(&mfd->mfd_list);
271                 spin_unlock(&med->med_open_lock);
272                 mdt_mfd_close(info, mfd);
273         }
274
275         ma->ma_need = MA_INODE;
276         ma->ma_valid = 0;
277         next = mdt_object_child(mo);
278         rc = mo_attr_get(info->mti_env, next, ma);
279         if (rc != 0)
280                 GOTO(out, rc);
281
282         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
283
284         if (mdt->mdt_opts.mo_oss_capa &&
285             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu))) {
286                 /* FIXME: only sent truncate capability back in size change
287                  * case */
288                 struct lustre_capa *capa;
289
290                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
291                 LASSERT(capa);
292                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
293                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
294                 if (rc)
295                         RETURN(rc);
296                 repbody->valid |= OBD_MD_FLOSSCAPA;
297         }
298
299         EXIT;
300 out:
301         mdt_object_put(info->mti_env, mo);
302         return rc;
303 }
304
305 static int mdt_reint_create(struct mdt_thread_info *info,
306                             struct mdt_lock_handle *lhc)
307 {
308         int rc;
309         ENTRY;
310
311         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
312                 RETURN(err_serious(-ESTALE));
313
314         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
315         case S_IFDIR:{
316                 if (info->mti_rr.rr_name[0] == 0) {
317                         rc = mdt_md_mkobj(info);
318                         break;
319                 }
320         }
321         case S_IFREG:
322         case S_IFLNK:
323         case S_IFCHR:
324         case S_IFBLK:
325         case S_IFIFO:
326         case S_IFSOCK:{
327                 /* Special file should stay on the same node as parent. */
328                 LASSERT(info->mti_rr.rr_namelen > 0);
329                 rc = mdt_md_create(info);
330                 break;
331         }
332         default:
333                 rc = err_serious(-EOPNOTSUPP);
334         }
335         RETURN(rc);
336 }
337
338 static int mdt_reint_unlink(struct mdt_thread_info *info,
339                             struct mdt_lock_handle *lhc)
340 {
341         struct mdt_reint_record *rr = &info->mti_rr;
342         struct ptlrpc_request   *req = mdt_info_req(info);
343         struct md_attr          *ma = &info->mti_attr;
344         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
345         struct mdt_object       *mp;
346         struct mdt_object       *mc;
347         struct mdt_lock_handle  *parent_lh;
348         struct mdt_lock_handle  *child_lh;
349         int                      rc;
350         ENTRY;
351
352         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s\n", PFID(rr->rr_fid1),
353                   rr->rr_name);
354
355         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
356                 GOTO(out, rc = err_serious(-ENOENT));
357
358         /* step 1: lock the parent */
359         parent_lh = &info->mti_lh[MDT_LH_PARENT];
360         mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
361                           rr->rr_namelen);
362         
363         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
364                                   MDS_INODELOCK_UPDATE);
365         if (IS_ERR(mp))
366                 GOTO(out, rc = PTR_ERR(mp));
367
368         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
369         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
370                                                &RMF_MDT_MD, RCL_SERVER);
371
372         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
373                                                &RMF_LOGCOOKIES);
374         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
375                                                   &RMF_LOGCOOKIES,
376                                                   RCL_SERVER);
377         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
378         ma->ma_valid = 0;
379         if (!ma->ma_lmm || !ma->ma_cookie)
380                 GOTO(out_unlock_parent, rc = -EINVAL);
381
382         if (rr->rr_name[0] == 0) {
383                 /* remote partial operation
384                  * It is possible that replay can happen on parent MDS
385                  * and this operation will be repeated. 
386                  * Therefore the object absense is allowed case
387                  * and nothing should be done
388                  */
389                 if (mdt_object_exists(mp) > 0) {
390                         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
391                         rc = mo_ref_del(info->mti_env,
392                                         mdt_object_child(mp), ma);
393                         mdt_handle_last_unlink(info, mp, ma);
394                 } else
395                         rc = 0;
396                 GOTO(out_unlock_parent, rc);
397         }
398
399         /* step 2: find & lock the child */
400         rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
401                         rr->rr_name, child_fid);
402         if (rc != 0)
403                  GOTO(out_unlock_parent, rc);
404
405         /* we will lock the child regardless it is local or remote. No harm. */
406         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
407         if (IS_ERR(mc))
408                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
409         child_lh = &info->mti_lh[MDT_LH_CHILD];
410         mdt_lock_reg_init(child_lh, LCK_EX);
411         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
412                              MDT_CROSS_LOCK);
413         if (rc != 0)
414                 GOTO(out_put_child, rc);
415
416         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
417                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
418
419         /*
420          * Now we can only make sure we need MA_INODE, in mdd layer, will check
421          * whether need MA_LOV and MA_COOKIE.
422          */
423         ma->ma_need = MA_INODE;
424         ma->ma_valid = 0;
425         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
426         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
427                         mdt_object_child(mc), rr->rr_name, ma);
428         if (rc)
429                 GOTO(out_unlock_child, rc);
430
431         mdt_handle_last_unlink(info, mc, ma);
432
433         EXIT;
434 out_unlock_child:
435         mdt_object_unlock(info, mc, child_lh, rc);
436 out_put_child:
437         mdt_object_put(info->mti_env, mc);
438 out_unlock_parent:
439         mdt_object_unlock_put(info, mp, parent_lh, rc);
440 out:
441         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
442         return rc;
443 }
444
445 static int mdt_reint_link(struct mdt_thread_info *info,
446                           struct mdt_lock_handle *lhc)
447 {
448         struct mdt_reint_record *rr = &info->mti_rr;
449         struct ptlrpc_request   *req = mdt_info_req(info);
450         struct md_attr          *ma = &info->mti_attr;
451         struct mdt_object       *ms;
452         struct mdt_object       *mp;
453         struct mdt_lock_handle  *lhs;
454         struct mdt_lock_handle  *lhp;
455         int rc;
456         ENTRY;
457
458         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
459                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
460
461         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
462                 RETURN(err_serious(-ENOENT));
463
464         if (rr->rr_name[0] == 0) {
465                 /* MDT holding name ask us to add ref. */
466                 lhs = &info->mti_lh[MDT_LH_CHILD];
467                 mdt_lock_reg_init(lhs, LCK_EX);
468                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
469                                           MDS_INODELOCK_UPDATE);
470                 if (IS_ERR(ms))
471                         RETURN(PTR_ERR(ms));
472
473                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
474                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
475                 mdt_object_unlock_put(info, ms, lhs, rc);
476                 RETURN(rc);
477         }
478
479         /* step 1: find & lock the target parent dir */
480         lhp = &info->mti_lh[MDT_LH_PARENT];
481         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
482                           rr->rr_namelen);
483         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
484                                   MDS_INODELOCK_UPDATE);
485         if (IS_ERR(mp))
486                 RETURN(PTR_ERR(mp));
487
488         /* step 2: find & lock the source */
489         lhs = &info->mti_lh[MDT_LH_CHILD];
490         mdt_lock_reg_init(lhs, LCK_EX);
491         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
492         if (IS_ERR(ms))
493                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
494
495         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
496                              MDT_CROSS_LOCK);
497         if (rc != 0)
498                 GOTO(out_unlock_source, rc);
499
500         /* step 3: link it */
501         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
502                        OBD_FAIL_MDS_REINT_LINK_WRITE);
503
504         rc = mdo_link(info->mti_env, mdt_object_child(mp),
505                       mdt_object_child(ms), rr->rr_name, ma);
506
507         EXIT;
508 out_unlock_source:
509         mdt_object_unlock_put(info, ms, lhs, rc);
510 out_unlock_parent:
511         mdt_object_unlock_put(info, mp, lhp, rc);
512         return rc;
513 }
514
515 /* partial operation for rename */
516 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
517 {
518         struct mdt_reint_record *rr = &info->mti_rr;
519         struct ptlrpc_request   *req = mdt_info_req(info);
520         struct md_attr          *ma = &info->mti_attr;
521         struct mdt_object       *mtgtdir;
522         struct mdt_object       *mtgt = NULL;
523         struct mdt_lock_handle  *lh_tgtdir;
524         struct mdt_lock_handle  *lh_tgt;
525         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
526         int                      rc;
527         ENTRY;
528
529         DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
530                   rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
531
532         /* step 1: lookup & lock the tgt dir */
533         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
534         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
535                           rr->rr_tgtlen);
536         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
537                                        MDS_INODELOCK_UPDATE);
538         if (IS_ERR(mtgtdir))
539                 GOTO(out, rc = PTR_ERR(mtgtdir));
540
541         /*step 2: find & lock the target object if exists*/
542         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
543         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
544                         rr->rr_tgt, tgt_fid);
545         if (rc != 0 && rc != -ENOENT) {
546                 GOTO(out_unlock_tgtdir, rc);
547         } else if (rc == 0) {
548                 /*
549                  * In case of replay that name can be already inserted, check
550                  * that and do nothing if so.
551                  */
552                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
553                         GOTO(out_unlock_tgtdir, rc);
554
555                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
556                 mdt_lock_reg_init(lh_tgt, LCK_EX);
557
558                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
559                                             MDS_INODELOCK_LOOKUP);
560                 if (IS_ERR(mtgt))
561                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
562
563                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
564                                     mdt_object_child(mtgt), rr->rr_fid2,
565                                     rr->rr_tgt, ma);
566         } else /* -ENOENT */ {
567                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
568                                      rr->rr_tgt, rr->rr_fid2,
569                                      S_ISDIR(ma->ma_attr.la_mode));
570         }
571
572         /* handle last link of tgt object */
573         if (rc == 0 && mtgt)
574                 mdt_handle_last_unlink(info, mtgt, ma);
575
576         if (mtgt != NULL)
577                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
578         EXIT;
579 out_unlock_tgtdir:
580         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
581 out:
582         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
583         return rc;
584 }
585
586 static int mdt_rename_lock(struct mdt_thread_info *info,
587                            struct lustre_handle *lh)
588 {
589         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
590         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
591         struct ldlm_res_id res_id;
592         struct lu_site *ls;
593         int rc;
594         ENTRY;
595
596         ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
597         fid_build_reg_res_name(&LUSTRE_BFL_FID, &res_id);
598
599         if (ls->ls_control_exp == NULL) {
600                 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
601                 
602                 /*
603                  * Current node is controller, that is mdt0, where we should
604                  * take BFL lock.
605                  */
606                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy,
607                                             LCK_EX, &flags, ldlm_blocking_ast,
608                                             ldlm_completion_ast, NULL, NULL, 0,
609                                             NULL, lh);
610         } else {
611                 int flags = 0;
612                 
613                 /*
614                  * This is the case mdt0 is remote node, issue DLM lock like
615                  * other clients.
616                  */
617                 rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
618                                       LDLM_IBITS, &policy, LCK_EX, &flags,
619                                       ldlm_blocking_ast, ldlm_completion_ast,
620                                       NULL, NULL, NULL, 0, NULL, lh, 0);
621         }
622
623         RETURN(rc);
624 }
625
626 static void mdt_rename_unlock(struct lustre_handle *lh)
627 {
628         ENTRY;
629         ldlm_lock_decref(lh, LCK_EX);
630         EXIT;
631 }
632
633 /*
634  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
635  * target. Source should not be ancestor of target dir. May be other rename
636  * checks can be moved here later.
637  */
638 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
639 {
640         struct mdt_reint_record *rr = &info->mti_rr;
641         struct lu_fid dst_fid = *rr->rr_fid2;
642         struct mdt_object *dst;
643         int rc = 0;
644         ENTRY;
645
646         do {
647                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
648                 if (!IS_ERR(dst)) {
649                         rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst),
650                                            fid, &dst_fid);
651                         mdt_object_put(info->mti_env, dst);
652                         if (rc < 0 && rc != -EREMOTE) {
653                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
654                         } else if (rc == 1) {
655                                 rc = -EINVAL;
656                         }
657                 } else {
658                         rc = PTR_ERR(dst);
659                 }
660         } while (rc == -EREMOTE);
661
662         RETURN(rc);
663 }
664
665 static int mdt_reint_rename(struct mdt_thread_info *info,
666                             struct mdt_lock_handle *lhc)
667 {
668         struct mdt_reint_record *rr = &info->mti_rr;
669         struct req_capsule      *pill = &info->mti_pill;
670         struct md_attr          *ma = &info->mti_attr;
671         struct ptlrpc_request   *req = mdt_info_req(info);
672         struct mdt_object       *msrcdir;
673         struct mdt_object       *mtgtdir;
674         struct mdt_object       *mold;
675         struct mdt_object       *mnew = NULL;
676         struct mdt_lock_handle  *lh_srcdirp;
677         struct mdt_lock_handle  *lh_tgtdirp;
678         struct mdt_lock_handle  *lh_oldp;
679         struct mdt_lock_handle  *lh_newp;
680         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
681         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
682         struct lustre_handle     rename_lh = { 0 };
683         int                      rc;
684         ENTRY;
685
686         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
687         if (rc == 1) {
688         /* if (rr->rr_name[0] == 0) {*/
689                 rc = mdt_reint_rename_tgt(info);
690                 RETURN(rc);
691         }
692
693         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
694                   PFID(rr->rr_fid1), rr->rr_name,
695                   PFID(rr->rr_fid2), rr->rr_tgt);
696
697         rc = mdt_rename_lock(info, &rename_lh);
698         if (rc) {
699                 CERROR("can't lock FS for rename, rc %d\n", rc);
700                 GOTO(out, rc);
701         }
702
703         lh_newp = &info->mti_lh[MDT_LH_NEW];
704
705         /* step 1: lock the source dir. */
706         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
707         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
708                           rr->rr_namelen);
709         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
710                                        MDS_INODELOCK_UPDATE);
711         if (IS_ERR(msrcdir))
712                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
713
714         /* step 2: find & lock the target dir. */
715         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
716         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
717                           rr->rr_tgtlen);
718         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
719                 mdt_object_get(info->mti_env, msrcdir);
720                 mtgtdir = msrcdir;
721         } else {
722                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
723                                           rr->rr_fid2);
724                 if (IS_ERR(mtgtdir))
725                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
726
727                 rc = mdt_object_exists(mtgtdir);
728                 if (rc == 0)
729                         GOTO(out_unlock_target, rc = -ESTALE);
730                 else if (rc > 0) {
731                         /* we lock the target dir if it is local */
732                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
733                                              MDS_INODELOCK_UPDATE,
734                                              MDT_LOCAL_LOCK);
735                         if (rc != 0)
736                                 GOTO(out_unlock_target, rc);
737                 }
738         }
739
740         /* step 3: find & lock the old object. */
741         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
742                         rr->rr_name, old_fid);
743         if (rc != 0)
744                 GOTO(out_unlock_target, rc);
745
746         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
747                 GOTO(out_unlock_target, rc = -EINVAL);
748
749         lh_oldp = &info->mti_lh[MDT_LH_OLD];
750         mdt_lock_reg_init(lh_oldp, LCK_EX);
751         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
752                                     MDS_INODELOCK_LOOKUP);
753         if (IS_ERR(mold))
754                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
755
756         /* step 4: find & lock the new object. */
757         /* new target object may not exist now */
758         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
759                         rr->rr_tgt, new_fid);
760         if (rc == 0) {
761                 /* the new_fid should have been filled at this moment */
762                 if (lu_fid_eq(old_fid, new_fid))
763                        GOTO(out_unlock_old, rc);
764
765                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
766                     lu_fid_eq(new_fid, rr->rr_fid2))
767                         GOTO(out_unlock_old, rc = -EINVAL);
768
769                 mdt_lock_reg_init(lh_newp, LCK_EX);
770                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
771                 if (IS_ERR(mnew))
772                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
773
774                 rc = mdt_object_lock(info, mnew, lh_newp,
775                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
776                 if (rc != 0) {
777                         mdt_object_put(info->mti_env, mnew);
778                         GOTO(out_unlock_old, rc);
779                 }
780         } else if (rc != -EREMOTE && rc != -ENOENT)
781                 GOTO(out_unlock_old, rc);
782
783         /* step 5: rename it */
784         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
785         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
786                                                &RMF_MDT_MD, RCL_SERVER);
787
788         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
789                                                 &RMF_LOGCOOKIES);
790         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
791                                                &RMF_LOGCOOKIES, RCL_SERVER);
792
793         if (!ma->ma_lmm || !ma->ma_cookie)
794                 GOTO(out_unlock_new, rc = -EINVAL);
795
796         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
797         ma->ma_valid = 0;
798         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
799                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
800
801         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
802         mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
803         
804         /* Check if @dst is subdir of @src. */
805         rc = mdt_rename_sanity(info, old_fid);
806         if (rc)
807                 GOTO(out_unlock_new, rc);
808
809         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
810                         mdt_object_child(mtgtdir), old_fid, rr->rr_name,
811                         (mnew ? mdt_object_child(mnew) : NULL),
812                         rr->rr_tgt, ma);
813
814         /* handle last link of tgt object */
815         if (rc == 0 && mnew)
816                 mdt_handle_last_unlink(info, mnew, ma);
817
818 out_unlock_new:
819         if (mnew)
820                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
821 out_unlock_old:
822         mdt_object_unlock_put(info, mold, lh_oldp, rc);
823 out_unlock_target:
824         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
825 out_unlock_source:
826         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
827 out_rename_lock:
828         mdt_rename_unlock(&rename_lh);
829 out:
830         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
831         return rc;
832 }
833
834 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
835                            struct mdt_lock_handle *lhc);
836
837 static mdt_reinter reinters[REINT_MAX] = {
838         [REINT_SETATTR]  = mdt_reint_setattr,
839         [REINT_CREATE] = mdt_reint_create,
840         [REINT_LINK] = mdt_reint_link,
841         [REINT_UNLINK] = mdt_reint_unlink,
842         [REINT_RENAME] = mdt_reint_rename,
843         [REINT_OPEN] = mdt_reint_open
844 };
845
846 int mdt_reint_rec(struct mdt_thread_info *info,
847                   struct mdt_lock_handle *lhc)
848 {
849         int rc;
850         ENTRY;
851
852         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
853
854         RETURN(rc);
855 }