Whamcloud - gitweb
Branch b_new_cmd
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_reint.c
5  *  Lustre Metadata Target (mdt) reintegration routines
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Huang Hua <huanghua@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38
39 static int mdt_md_create(struct mdt_thread_info *info)
40 {
41         struct mdt_device      *mdt = info->mti_mdt;
42         struct mdt_object      *parent;
43         struct mdt_object      *child;
44         struct mdt_lock_handle *lh;
45         struct mdt_body        *repbody;
46         struct md_attr         *ma = &info->mti_attr;
47         struct mdt_reint_record *rr = &info->mti_rr;
48         int rc;
49         ENTRY;
50
51         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
52
53         lh = &info->mti_lh[MDT_LH_PARENT];
54         lh->mlh_mode = LCK_EX;
55
56         parent = mdt_object_find_lock(info, rr->rr_fid1,
57                                       lh, MDS_INODELOCK_UPDATE,
58                                       rr->rr_capa1);
59         if (IS_ERR(parent))
60                 RETURN(PTR_ERR(parent));
61
62         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2, BYPASS_CAPA);
63         if (!IS_ERR(child)) {
64                 struct md_object *next = mdt_object_child(parent);
65
66                 ma->ma_need = MA_INODE;
67                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
68                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
69
70                 rc = mdo_create(info->mti_env, next, rr->rr_name,
71                                 mdt_object_child(child),
72                                 &info->mti_spec, ma);
73                 if (rc == 0) {
74                         /* return fid & attr to client. */
75                         if (ma->ma_valid & MA_INODE)
76                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
77                                                    mdt_object_fid(child));
78                                 mdt_body_reverse_idmap(info, repbody);
79                 }
80                 mdt_object_put(info->mti_env, child);
81         } else
82                 rc = PTR_ERR(child);
83         mdt_object_unlock_put(info, parent, lh, rc);
84         RETURN(rc);
85 }
86
87 /* partial request to create object only */
88 static int mdt_md_mkobj(struct mdt_thread_info *info)
89 {
90         struct mdt_device      *mdt = info->mti_mdt;
91         struct mdt_object      *o;
92         struct mdt_body        *repbody;
93         struct md_attr         *ma = &info->mti_attr;
94         int rc;
95         ENTRY;
96
97         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
98
99         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
100                             BYPASS_CAPA);
101         if (!IS_ERR(o)) {
102                 struct md_object *next = mdt_object_child(o);
103
104                 ma->ma_need = MA_INODE;
105                 /* Cross-ref create can encounter already created obj in case
106                  * of recovery, just get attr in that case */
107                 if (lu_object_exists(&o->mot_obj.mo_lu) == 1) {
108                         rc = mo_attr_get(info->mti_env, next, ma);
109                 } else {
110                         rc = mo_object_create(info->mti_env, next, &info->mti_spec,
111                                       ma);
112                 }
113                 if (rc == 0) {
114                         /* return fid & attr to client. */
115                         if (ma->ma_valid & MA_INODE)
116                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
117                                                    mdt_object_fid(o));
118                                 mdt_body_reverse_idmap(info, repbody);
119                 }
120                 mdt_object_put(info->mti_env, o);
121         } else
122                 rc = PTR_ERR(o);
123
124         RETURN(rc);
125 }
126
127 /* In the raw-setattr case, we lock the child inode.
128  * In the write-back case or if being called from open,
129  *               the client holds a lock already.
130  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
131  * mdt_setattr_unpack()) flag to tell these cases apart. */
132 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
133 {
134         struct md_attr          *ma = &info->mti_attr;
135         struct mdt_lock_handle  *lh;
136         int som_update = 0;
137         int rc;
138         ENTRY;
139
140         if (info->mti_epoch)
141                 som_update = (info->mti_epoch->flags & MF_SOM_CHANGE);
142
143         /* Try to avoid object_lock if another epoch has been started
144          * already. */
145         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
146                 RETURN(0);
147
148         lh = &info->mti_lh[MDT_LH_PARENT];
149         lh->mlh_mode = LCK_EX;
150
151         if (!(flags & MRF_SETATTR_LOCKED)) {
152                 __u64 lockpart = MDS_INODELOCK_UPDATE;
153                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
154                         lockpart |= MDS_INODELOCK_LOOKUP;
155
156                 rc = mdt_object_lock(info, mo, lh, lockpart);
157                 if (rc != 0)
158                         GOTO(out, rc);
159         }
160
161         /* Setattrs are syncronized through dlm lock taken above. If another
162          * epoch started, its attributes may be already flushed on disk,
163          * skip setattr. */
164         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
165                 GOTO(out, rc = 0);
166
167         if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
168                 GOTO(out, rc = -ENOENT);
169
170         /* all attrs are packed into mti_attr in unpack_setattr */
171         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
172                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
173
174         /* all attrs are packed into mti_attr in unpack_setattr */
175         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
176         if (rc != 0)
177                 GOTO(out, rc);
178
179         /* Re-enable SIZEONMDS. */
180         if (som_update) {
181                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
182                        mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
183                        mo->mot_epochcount);
184
185                 mdt_sizeonmds_enable(info, mo);
186         }
187
188         EXIT;
189 out:
190         mdt_object_unlock(info, mo, lh, rc);
191         return(rc);
192 }
193
194 static int mdt_reint_setattr(struct mdt_thread_info *info,
195                              struct mdt_lock_handle *lhc)
196 {
197         struct mdt_device       *mdt = info->mti_mdt;
198         struct md_attr          *ma = &info->mti_attr;
199         struct mdt_reint_record *rr = &info->mti_rr;
200         struct ptlrpc_request   *req = mdt_info_req(info);
201         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
202         struct mdt_file_data    *mfd;
203         struct mdt_object       *mo;
204         struct md_object        *next;
205         struct mdt_body         *repbody;
206         int                      rc;
207
208         ENTRY;
209
210         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
211                   (unsigned int)ma->ma_attr.la_valid);
212
213         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
214         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
215                              rr->rr_capa1);
216         if (IS_ERR(mo))
217                 RETURN(rc = PTR_ERR(mo));
218
219         if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) {
220                 /* Truncate case. */
221                 rc = mdt_write_get(info->mti_mdt, mo);
222                 if (rc)
223                         GOTO(out, rc);
224
225                 mfd = mdt_mfd_new();
226                 if (mfd == NULL)
227                         GOTO(out, rc = -ENOMEM);
228
229                 mdt_epoch_open(info, mo);
230                 repbody->ioepoch = mo->mot_ioepoch;
231
232                 mdt_object_get(info->mti_env, mo);
233                 mfd->mfd_mode = FMODE_EPOCHLCK;
234                 mfd->mfd_object = mo;
235                 mfd->mfd_xid = req->rq_xid;
236
237                 spin_lock(&med->med_open_lock);
238                 list_add(&mfd->mfd_list, &med->med_open_head);
239                 spin_unlock(&med->med_open_lock);
240                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
241         }
242
243         rc = mdt_attr_set(info, mo, rr->rr_flags);
244         if (rc)
245                 GOTO(out, rc);
246
247         if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) {
248                 LASSERT(info->mti_epoch);
249
250                 /* Size-on-MDS Update. Find and free mfd. */
251                 spin_lock(&med->med_open_lock);
252                 mfd = mdt_handle2mfd(&(info->mti_epoch->handle));
253                 if (mfd == NULL) {
254                         spin_unlock(&med->med_open_lock);
255                         CDEBUG(D_INODE, "no handle for file close: "
256                                "fid = "DFID": cookie = "LPX64"\n",
257                                PFID(info->mti_rr.rr_fid1),
258                                info->mti_epoch->handle.cookie);
259                         GOTO(out, rc = -ESTALE);
260                 }
261
262                 LASSERT(mfd->mfd_mode == FMODE_SOM);
263                 LASSERT(ma->ma_attr.la_valid & LA_SIZE);
264                 LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE));
265
266                 class_handle_unhash(&mfd->mfd_handle);
267                 list_del_init(&mfd->mfd_list);
268                 spin_unlock(&med->med_open_lock);
269                 mdt_mfd_close(info, mfd);
270         }
271
272         ma->ma_need = MA_INODE;
273         next = mdt_object_child(mo);
274         rc = mo_attr_get(info->mti_env, next, ma);
275         if (rc != 0)
276                 GOTO(out, rc);
277
278         mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo));
279
280         if (mdt->mdt_opts.mo_oss_capa) {
281                 struct lustre_capa *capa;
282
283                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
284                 LASSERT(capa);
285                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
286                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa);
287                 if (rc)
288                         RETURN(rc);
289                 repbody->valid |= OBD_MD_FLOSSCAPA;
290         }
291
292         mdt_body_reverse_idmap(info, repbody);
293         EXIT;
294 out:
295         mdt_object_put(info->mti_env, mo);
296         return rc;
297 }
298
299 static int mdt_reint_create(struct mdt_thread_info *info,
300                             struct mdt_lock_handle *lhc)
301 {
302         int rc;
303         ENTRY;
304
305         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
306                 RETURN(-ESTALE);
307
308         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
309         case S_IFREG:
310         case S_IFDIR:{
311                 if (strlen(info->mti_rr.rr_name) == 0) {
312                         rc = mdt_md_mkobj(info);
313                         break;
314                 }
315         }
316         case S_IFLNK:
317         case S_IFCHR:
318         case S_IFBLK:
319         case S_IFIFO:
320         case S_IFSOCK:{
321                 /* special file should stay on the same node as parent */
322                 LASSERT(strlen(info->mti_rr.rr_name) > 0);
323                 rc = mdt_md_create(info);
324                 break;
325         }
326         default:
327                 rc = -EOPNOTSUPP;
328         }
329         RETURN(rc);
330 }
331
332
333 static int mdt_reint_unlink(struct mdt_thread_info *info,
334                             struct mdt_lock_handle *lhc)
335 {
336         struct mdt_reint_record *rr = &info->mti_rr;
337         struct ptlrpc_request   *req = mdt_info_req(info);
338         struct md_attr          *ma = &info->mti_attr;
339         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
340         struct mdt_object       *mp;
341         struct mdt_object       *mc;
342         struct mdt_lock_handle  *parent_lh;
343         struct mdt_lock_handle  *child_lh;
344         int                      rc;
345         ENTRY;
346
347         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s\n", PFID(rr->rr_fid1),
348                   rr->rr_name);
349
350         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
351                 GOTO(out, rc = -ENOENT);
352
353         /* step 1: lock the parent */
354         parent_lh = &info->mti_lh[MDT_LH_PARENT];
355         parent_lh->mlh_mode = LCK_EX;
356         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
357                                   MDS_INODELOCK_UPDATE, rr->rr_capa1);
358         if (IS_ERR(mp))
359                 GOTO(out, rc = PTR_ERR(mp));
360
361         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
362         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
363                                                &RMF_MDT_MD, RCL_SERVER);
364
365         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
366                                                &RMF_LOGCOOKIES);
367         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
368                                                   &RMF_LOGCOOKIES,
369                                                   RCL_SERVER);
370         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
371
372         if (!ma->ma_lmm || !ma->ma_cookie)
373                 GOTO(out_unlock_parent, rc = -EINVAL);
374
375         if (strlen(rr->rr_name) == 0) {
376                 /* remote partial operation */
377                 rc = mo_ref_del(info->mti_env, mdt_object_child(mp), ma);
378                 
379                 mdt_handle_last_unlink(info, mp, ma);
380                 GOTO(out_unlock_parent, rc);
381         }
382
383         /* step 2: find & lock the child */
384         rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
385                         rr->rr_name, child_fid);
386         if (rc != 0)
387                  GOTO(out_unlock_parent, rc);
388
389         /* we will lock the child regardless it is local or remote. No harm. */
390         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
391                                   BYPASS_CAPA);
392         if (IS_ERR(mc))
393                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
394         child_lh = &info->mti_lh[MDT_LH_CHILD];
395         child_lh->mlh_mode = LCK_EX;
396         rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
397         if (rc != 0)
398                 GOTO(out_put_child, rc);
399
400         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
401                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
402
403         /*
404          * Now we can only make sure we need MA_INODE, in mdd layer, will check
405          * whether need MA_LOV and MA_COOKIE.
406          */
407         ma->ma_need = MA_INODE;
408         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
409                         mdt_object_child(mc), rr->rr_name, ma);
410         if (rc)
411                 GOTO(out_unlock_child, rc);
412
413         mdt_handle_last_unlink(info, mc, ma);
414
415         GOTO(out_unlock_child, rc);
416 out_unlock_child:
417         mdt_object_unlock(info, mc, child_lh, rc);
418 out_put_child:
419         mdt_object_put(info->mti_env, mc);
420 out_unlock_parent:
421         mdt_object_unlock_put(info, mp, parent_lh, rc);
422 out:
423         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
424         return rc;
425 }
426
427 static int mdt_reint_link(struct mdt_thread_info *info,
428                           struct mdt_lock_handle *lhc)
429 {
430         struct mdt_reint_record *rr = &info->mti_rr;
431         struct ptlrpc_request   *req = mdt_info_req(info);
432         struct md_attr          *ma = &info->mti_attr;
433         struct mdt_object       *ms;
434         struct mdt_object       *mp;
435         struct mdt_lock_handle  *lhs;
436         struct mdt_lock_handle  *lhp;
437         int rc;
438
439         ENTRY;
440
441         DEBUG_REQ(D_INODE, req, "link original "DFID" to "DFID" %s",
442                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
443
444         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
445                 RETURN(-ENOENT);
446
447         /* step 1: lock the source */
448         lhs = &info->mti_lh[MDT_LH_PARENT];
449         lhs->mlh_mode = LCK_EX;
450         ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
451                                   MDS_INODELOCK_UPDATE, rr->rr_capa1);
452         if (IS_ERR(ms))
453                 RETURN(PTR_ERR(ms));
454
455         if (strlen(rr->rr_name) == 0) {
456                 /* remote partial operation */
457                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
458                 GOTO(out_unlock_source, rc);
459         }
460         /*step 2: find & lock the target parent dir*/
461         lhp = &info->mti_lh[MDT_LH_CHILD];
462         lhp->mlh_mode = LCK_EX;
463         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
464                                   MDS_INODELOCK_UPDATE, rr->rr_capa2);
465         if (IS_ERR(mp))
466                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
467
468         /* step 4: link it */
469
470         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
471                        OBD_FAIL_MDS_REINT_LINK_WRITE);
472
473         rc = mdo_link(info->mti_env, mdt_object_child(mp),
474                       mdt_object_child(ms), rr->rr_name, ma);
475         GOTO(out_unlock_target, rc);
476
477 out_unlock_target:
478         mdt_object_unlock_put(info, mp, lhp, rc);
479 out_unlock_source:
480         mdt_object_unlock_put(info, ms, lhs, rc);
481         return rc;
482 }
483
484 /* partial operation for rename */
485 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
486 {
487         struct mdt_reint_record *rr = &info->mti_rr;
488         struct ptlrpc_request   *req = mdt_info_req(info);
489         struct md_attr          *ma = &info->mti_attr;
490         struct mdt_object       *mtgtdir;
491         struct mdt_object       *mtgt = NULL;
492         struct mdt_lock_handle  *lh_tgtdir;
493         struct mdt_lock_handle  *lh_tgt;
494         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
495         int                      rc;
496
497         ENTRY;
498
499         DEBUG_REQ(D_INODE, req, "rename_tgt "DFID" to "DFID" %s",
500                   PFID(rr->rr_fid2),
501                   PFID(rr->rr_fid1), rr->rr_tgt);
502
503         /* step 1: lookup & lock the tgt dir */
504         lh_tgt = &info->mti_lh[MDT_LH_CHILD];
505         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
506         lh_tgtdir->mlh_mode = LCK_EX;
507         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
508                                        MDS_INODELOCK_UPDATE, rr->rr_capa1);
509         if (IS_ERR(mtgtdir))
510                 GOTO(out, rc = PTR_ERR(mtgtdir));
511
512         /*step 2: find & lock the target object if exists*/
513         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
514                         rr->rr_tgt, tgt_fid);
515         if (rc != 0 && rc != -ENOENT) {
516                 GOTO(out_unlock_tgtdir, rc);
517         } else if (rc == 0) {
518                 lh_tgt->mlh_mode = LCK_EX;
519
520                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
521                                             MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
522                 if (IS_ERR(mtgt))
523                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
524
525                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
526                                     mdt_object_child(mtgt), rr->rr_fid2,
527                                     rr->rr_tgt, ma);
528         } else /* -ENOENT */ {
529                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
530                                      rr->rr_tgt, rr->rr_fid2,
531                                      S_ISDIR(ma->ma_attr.la_mode));
532         }
533
534         /* handle last link of tgt object */
535         if (rc == 0 && mtgt)
536                 mdt_handle_last_unlink(info, mtgt, ma);
537
538         EXIT;
539         if (mtgt) {
540                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
541         }
542 out_unlock_tgtdir:
543         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
544 out:
545         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
546         return rc;
547 }
548
549 static int mdt_rename_lock(struct mdt_thread_info *info,
550                            struct lustre_handle *lh)
551 {
552         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
553         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
554         int flags = LDLM_FL_ATOMIC_CB;
555         struct ldlm_res_id res_id;
556         struct lu_site *ls;
557         int rc;
558         ENTRY;
559
560         ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
561         fid_build_res_name(&LUSTRE_BFL_FID, &res_id);
562
563         if (ls->ls_control_exp == NULL) {
564                 /*
565                  * Current node is controller, that is mdt0 where we should take
566                  * BFL lock.
567                  */
568                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy,
569                                             LCK_EX, &flags, ldlm_blocking_ast,
570                                             ldlm_completion_ast, NULL, NULL, 0,
571                                             NULL, lh);
572         } else {
573                 /*
574                  * This is the case mdt0 is remote node, issue DLM lock like
575                  * other clients.
576                  */
577                 rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
578                                       LDLM_IBITS, &policy, LCK_EX, &flags,
579                                       ldlm_blocking_ast, ldlm_completion_ast,
580                                       NULL, NULL, NULL, 0, NULL, lh, 0);
581         }
582
583         RETURN(rc);
584 }
585
586 static void mdt_rename_unlock(struct lustre_handle *lh)
587 {
588         ENTRY;
589         ldlm_lock_decref(lh, LCK_EX);
590         EXIT;
591 }
592
593 /*
594  * This is is_subdir() variant, it is CMD is cmm forwards it to correct
595  * target. Source should not be ancestor of target dir. May be other rename
596  * checks can be moved here later.
597  */
598 static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
599 {
600         struct mdt_reint_record *rr = &info->mti_rr;
601         struct lu_fid dst_fid = *rr->rr_fid2;
602         struct mdt_object *dst;
603         int rc = 0;
604         ENTRY;
605
606         do {
607                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
608                                       BYPASS_CAPA);
609                 if (!IS_ERR(dst)) {
610                         rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst),
611                                            fid, &dst_fid);
612                         mdt_object_put(info->mti_env, dst);
613                         if (rc < 0) {
614                                 CERROR("Error while doing mdo_is_subdir(), rc %d\n",
615                                        rc);
616                         } else if (rc == 1) {
617                                 rc = -EINVAL;
618                         }
619                 } else {
620                         rc = PTR_ERR(dst);
621                 }
622         } while (rc == EREMOTE);
623
624         RETURN(rc);
625 }
626
627 static int mdt_reint_rename(struct mdt_thread_info *info,
628                             struct mdt_lock_handle *lhc)
629 {
630         struct mdt_reint_record *rr = &info->mti_rr;
631         struct req_capsule      *pill = &info->mti_pill;
632         struct md_attr          *ma = &info->mti_attr;
633         struct mdt_object       *msrcdir;
634         struct mdt_object       *mtgtdir;
635         struct mdt_object       *mold;
636         struct mdt_object       *mnew = NULL;
637         struct mdt_lock_handle  *lh_srcdirp;
638         struct mdt_lock_handle  *lh_tgtdirp;
639         struct mdt_lock_handle  *lh_oldp;
640         struct mdt_lock_handle  *lh_newp;
641         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
642         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
643         struct lustre_handle     rename_lh = { 0 };
644         int                      rc;
645
646         ENTRY;
647
648         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
649         if (rc == 1) {
650         /* if (rr->rr_name[0] == 0) {*/
651                 rc = mdt_reint_rename_tgt(info);
652                 RETURN(rc);
653         }
654
655         rc = mdt_rename_lock(info, &rename_lh);
656         if (rc) {
657                 CERROR("can't lock FS for rename, rc %d\n", rc);
658                 GOTO(out, rc);
659         }
660
661         lh_newp = &info->mti_lh[MDT_LH_NEW];
662
663         /* step 1: lock the source dir */
664         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
665         lh_srcdirp->mlh_mode = LCK_EX;
666         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
667                                        MDS_INODELOCK_UPDATE, rr->rr_capa1);
668         if (IS_ERR(msrcdir))
669                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
670
671         /*step 2: find & lock the target dir*/
672         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
673         lh_tgtdirp->mlh_mode = LCK_EX;
674         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
675                 mdt_object_get(info->mti_env, msrcdir);
676                 mtgtdir = msrcdir;
677         } else {
678                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
679                                           rr->rr_fid2, rr->rr_capa2);
680                 if (IS_ERR(mtgtdir))
681                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
682
683                 rc = mdt_object_cr_lock(info, mtgtdir, lh_tgtdirp,
684                                         MDS_INODELOCK_UPDATE);
685                 if (rc != 0) {
686                         mdt_object_put(info->mti_env, mtgtdir);
687                         GOTO(out_unlock_source, rc);
688                 }
689
690         }
691
692         /*step 3: find & lock the old object*/
693         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
694                         rr->rr_name, old_fid);
695         if (rc != 0)
696                 GOTO(out_unlock_target, rc);
697
698         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
699                 GOTO(out_unlock_target, rc = -EINVAL);
700
701         lh_oldp = &info->mti_lh[MDT_LH_OLD];
702         lh_oldp->mlh_mode = LCK_EX;
703         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
704                                     MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
705         if (IS_ERR(mold))
706                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
707
708         /*step 4: find & lock the new object*/
709         /* new target object may not exist now */
710         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
711                         rr->rr_tgt, new_fid);
712         if (rc == 0) {
713                 /* the new_fid should have been filled at this moment*/
714                 if (lu_fid_eq(old_fid, new_fid))
715                        GOTO(out_unlock_old, rc);
716
717                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
718                     lu_fid_eq(new_fid, rr->rr_fid2))
719                         GOTO(out_unlock_old, rc = -EINVAL);
720
721                 lh_newp->mlh_mode = LCK_EX;
722                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
723                                        BYPASS_CAPA);
724                 if (IS_ERR(mnew))
725                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
726
727                 rc = mdt_object_cr_lock(info, mnew, lh_newp,
728                                         MDS_INODELOCK_FULL);
729                 if (rc != 0) {
730                         mdt_object_put(info->mti_env, mnew);
731                         GOTO(out_unlock_old, rc);
732                 }
733         } else if (rc != -EREMOTE && rc != -ENOENT)
734                 GOTO(out_unlock_old, rc);
735
736         /* step 5: dome some checking ...*/
737         /* step 6: rename it */
738         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
739         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
740                                                &RMF_MDT_MD, RCL_SERVER);
741
742         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
743                                                 &RMF_LOGCOOKIES);
744         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
745                                                &RMF_LOGCOOKIES, RCL_SERVER);
746
747         if (!ma->ma_lmm || !ma->ma_cookie)
748                 GOTO(out_unlock_new, rc = -EINVAL);
749
750         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
751
752         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
753                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
754
755         /* Check if @dst is subdir of @src. */
756         rc = mdt_rename_check(info, old_fid);
757         if (rc)
758                 GOTO(out_unlock_new, rc);
759
760         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
761                         mdt_object_child(mtgtdir), old_fid, rr->rr_name,
762                         (mnew ? mdt_object_child(mnew) : NULL),
763                         rr->rr_tgt, ma);
764
765         /* handle last link of tgt object */
766         if (rc == 0 && mnew)
767                 mdt_handle_last_unlink(info, mnew, ma);
768
769 out_unlock_new:
770         if (mnew) {
771                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
772         }
773 out_unlock_old:
774         mdt_object_unlock_put(info, mold, lh_oldp, rc);
775 out_unlock_target:
776         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
777 out_unlock_source:
778         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
779 out_rename_lock:
780         mdt_rename_unlock(&rename_lh);
781 out:
782         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
783         return rc;
784 }
785
786 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
787                            struct mdt_lock_handle *lhc);
788
789 static mdt_reinter reinters[REINT_MAX] = {
790         [REINT_SETATTR]  = mdt_reint_setattr,
791         [REINT_CREATE] = mdt_reint_create,
792         [REINT_LINK] = mdt_reint_link,
793         [REINT_UNLINK] = mdt_reint_unlink,
794         [REINT_RENAME] = mdt_reint_rename,
795         [REINT_OPEN] = mdt_reint_open
796 };
797
798 int mdt_reint_rec(struct mdt_thread_info *info,
799                   struct mdt_lock_handle *lhc)
800 {
801         int rc;
802         ENTRY;
803
804         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
805
806         RETURN(rc);
807 }