Whamcloud - gitweb
Introduction of lu_env.
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_reint.c
5  *  Lustre Metadata Target (mdt) reintegration routines
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Huang Hua <huanghua@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38
39 static int mdt_md_create(struct mdt_thread_info *info)
40 {
41         struct mdt_device      *mdt = info->mti_mdt;
42         struct mdt_object      *parent;
43         struct mdt_object      *child;
44         struct mdt_lock_handle *lh;
45         struct mdt_body        *repbody;
46         struct md_attr         *ma = &info->mti_attr;
47         struct mdt_reint_record *rr = &info->mti_rr;
48         int rc;
49         ENTRY;
50
51         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
52
53         lh = &info->mti_lh[MDT_LH_PARENT];
54         lh->mlh_mode = LCK_EX;
55
56         parent = mdt_object_find_lock(info, rr->rr_fid1,
57                                       lh, MDS_INODELOCK_UPDATE,
58                                       rr->rr_capa1);
59         if (IS_ERR(parent))
60                 RETURN(PTR_ERR(parent));
61
62         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2, BYPASS_CAPA);
63         if (!IS_ERR(child)) {
64                 struct md_object *next = mdt_object_child(parent);
65
66                 ma->ma_need = MA_INODE;
67                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
68                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
69
70                 rc = mdo_create(info->mti_env, next, rr->rr_name,
71                                 mdt_object_child(child),
72                                 &info->mti_spec, ma, &info->mti_uc);
73                 if (rc == 0) {
74                         /* return fid & attr to client. */
75                         if (ma->ma_valid & MA_INODE)
76                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
77                                                    mdt_object_fid(child));
78                                 mdt_body_reverse_idmap(info, repbody);
79                 }
80                 mdt_object_put(info->mti_env, child);
81         } else
82                 rc = PTR_ERR(child);
83         mdt_object_unlock_put(info, parent, lh, rc);
84         RETURN(rc);
85 }
86
87 /* partial request to create object only */
88 static int mdt_md_mkobj(struct mdt_thread_info *info)
89 {
90         struct mdt_device      *mdt = info->mti_mdt;
91         struct mdt_object      *o;
92         struct mdt_body        *repbody;
93         struct md_attr         *ma = &info->mti_attr;
94         int rc;
95         ENTRY;
96
97         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
98
99         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
100                             BYPASS_CAPA);
101         if (!IS_ERR(o)) {
102                 struct md_object *next = mdt_object_child(o);
103
104                 ma->ma_need = MA_INODE;
105                 rc = mo_object_create(info->mti_env, next, &info->mti_spec,
106                                       ma, &info->mti_uc);
107                 if (rc == 0) {
108                         /* return fid & attr to client. */
109                         if (ma->ma_valid & MA_INODE)
110                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
111                                                    mdt_object_fid(o));
112                                 mdt_body_reverse_idmap(info, repbody);
113                 }
114                 mdt_object_put(info->mti_env, o);
115         } else
116                 rc = PTR_ERR(o);
117
118         RETURN(rc);
119 }
120
121 /* In the raw-setattr case, we lock the child inode.
122  * In the write-back case or if being called from open,
123  *               the client holds a lock already.
124  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
125  * mdt_setattr_unpack()) flag to tell these cases apart. */
126 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
127 {
128         struct md_attr          *ma = &info->mti_attr;
129         struct mdt_lock_handle  *lh;
130         int som_update = 0;
131         int rc;
132         ENTRY;
133
134         if (info->mti_epoch)
135                 som_update = (info->mti_epoch->flags & MF_SOM_CHANGE);
136
137         /* Try to avoid object_lock if another epoch has been started
138          * already. */
139         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
140                 RETURN(0);
141
142         lh = &info->mti_lh[MDT_LH_PARENT];
143         lh->mlh_mode = LCK_EX;
144
145         if (!(flags & MRF_SETATTR_LOCKED)) {
146                 __u64 lockpart = MDS_INODELOCK_UPDATE;
147                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
148                         lockpart |= MDS_INODELOCK_LOOKUP;
149
150                 rc = mdt_object_lock(info, mo, lh, lockpart);
151                 if (rc != 0)
152                         GOTO(out, rc);
153         }
154
155         /* Setattrs are syncronized through dlm lock taken above. If another
156          * epoch started, its attributes may be already flushed on disk,
157          * skip setattr. */
158         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
159                 GOTO(out, rc = 0);
160
161         if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
162                 GOTO(out, rc = -ENOENT);
163
164         /* all attrs are packed into mti_attr in unpack_setattr */
165         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
166                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
167
168         /* all attrs are packed into mti_attr in unpack_setattr */
169         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma,
170                          &info->mti_uc);
171         if (rc != 0)
172                 GOTO(out, rc);
173
174         /* Re-enable SIZEONMDS. */
175         if (som_update) {
176                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
177                        mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
178                        mo->mot_epochcount);
179
180                 mdt_sizeonmds_enable(info, mo);
181         }
182
183         EXIT;
184 out:
185         mdt_object_unlock(info, mo, lh, rc);
186         return(rc);
187 }
188
189 static int mdt_reint_setattr(struct mdt_thread_info *info,
190                              struct mdt_lock_handle *lhc)
191 {
192         struct mdt_device       *mdt = info->mti_mdt;
193         struct md_attr          *ma = &info->mti_attr;
194         struct mdt_reint_record *rr = &info->mti_rr;
195         struct ptlrpc_request   *req = mdt_info_req(info);
196         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
197         struct mdt_file_data    *mfd;
198         struct mdt_object       *mo;
199         struct md_object        *next;
200         struct mdt_body         *repbody;
201         int                      rc;
202
203         ENTRY;
204
205         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
206                   (unsigned int)ma->ma_attr.la_valid);
207
208         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
209         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
210                              rr->rr_capa1);
211         if (IS_ERR(mo))
212                 RETURN(rc = PTR_ERR(mo));
213
214         if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) {
215                 /* Truncate case. */
216                 rc = mdt_write_get(info->mti_mdt, mo);
217                 if (rc)
218                         GOTO(out, rc);
219
220                 mfd = mdt_mfd_new();
221                 if (mfd == NULL)
222                         GOTO(out, rc = -ENOMEM);
223
224                 /* FIXME: in recovery, need to pass old epoch here */
225                 mdt_epoch_open(info, mo, 0);
226                 repbody->ioepoch = mo->mot_ioepoch;
227
228                 mdt_object_get(info->mti_env, mo);
229                 mfd->mfd_mode = FMODE_EPOCHLCK;
230                 mfd->mfd_object = mo;
231                 mfd->mfd_xid = req->rq_xid;
232
233                 spin_lock(&med->med_open_lock);
234                 list_add(&mfd->mfd_list, &med->med_open_head);
235                 spin_unlock(&med->med_open_lock);
236                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
237         }
238
239         rc = mdt_attr_set(info, mo, rr->rr_flags);
240         if (rc)
241                 GOTO(out, rc);
242
243         if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) {
244                 LASSERT(info->mti_epoch);
245
246                 /* Size-on-MDS Update. Find and free mfd. */
247                 spin_lock(&med->med_open_lock);
248                 mfd = mdt_handle2mfd(&(info->mti_epoch->handle));
249                 if (mfd == NULL) {
250                         spin_unlock(&med->med_open_lock);
251                         CDEBUG(D_INODE, "no handle for file close: "
252                                "fid = "DFID": cookie = "LPX64"\n",
253                                PFID(info->mti_rr.rr_fid1),
254                                info->mti_epoch->handle.cookie);
255                         GOTO(out, rc = -ESTALE);
256                 }
257
258                 LASSERT(mfd->mfd_mode == FMODE_SOM);
259                 LASSERT(ma->ma_attr.la_valid & LA_SIZE);
260                 LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE));
261
262                 class_handle_unhash(&mfd->mfd_handle);
263                 list_del_init(&mfd->mfd_list);
264                 spin_unlock(&med->med_open_lock);
265                 mdt_mfd_close(info, mfd);
266         }
267
268         ma->ma_need = MA_INODE;
269         next = mdt_object_child(mo);
270         rc = mo_attr_get(info->mti_env, next, ma, NULL);
271         if (rc != 0)
272                 GOTO(out, rc);
273
274         mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo));
275
276         if (mdt->mdt_opts.mo_oss_capa) {
277                 struct lustre_capa *capa;
278
279                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
280                 LASSERT(capa);
281                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
282                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa);
283                 if (rc)
284                         RETURN(rc);
285                 repbody->valid |= OBD_MD_FLOSSCAPA;
286         }
287
288         mdt_body_reverse_idmap(info, repbody);
289         EXIT;
290 out:
291         mdt_object_put(info->mti_env, mo);
292         return rc;
293 }
294
295 static int mdt_reint_create(struct mdt_thread_info *info,
296                             struct mdt_lock_handle *lhc)
297 {
298         int rc;
299         ENTRY;
300
301         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
302                 RETURN(-ESTALE);
303
304         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
305         case S_IFREG:
306         case S_IFDIR:{
307                 if (strlen(info->mti_rr.rr_name) == 0) {
308                         rc = mdt_md_mkobj(info);
309                         break;
310                 }
311         }
312         case S_IFLNK:
313         case S_IFCHR:
314         case S_IFBLK:
315         case S_IFIFO:
316         case S_IFSOCK:{
317                 /* special file should stay on the same node as parent */
318                 LASSERT(strlen(info->mti_rr.rr_name) > 0);
319                 rc = mdt_md_create(info);
320                 break;
321         }
322         default:
323                 rc = -EOPNOTSUPP;
324         }
325         RETURN(rc);
326 }
327
328
329 static int mdt_reint_unlink(struct mdt_thread_info *info,
330                             struct mdt_lock_handle *lhc)
331 {
332         struct mdt_reint_record *rr = &info->mti_rr;
333         struct ptlrpc_request   *req = mdt_info_req(info);
334         struct md_attr          *ma = &info->mti_attr;
335         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
336         struct mdt_object       *mp;
337         struct mdt_object       *mc;
338         struct mdt_lock_handle  *parent_lh;
339         struct mdt_lock_handle  *child_lh;
340         int                      rc;
341         ENTRY;
342
343         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s\n", PFID(rr->rr_fid1),
344                   rr->rr_name);
345
346         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
347                 GOTO(out, rc = -ENOENT);
348
349         /* step 1: lock the parent */
350         parent_lh = &info->mti_lh[MDT_LH_PARENT];
351         parent_lh->mlh_mode = LCK_EX;
352         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
353                                   MDS_INODELOCK_UPDATE, rr->rr_capa1);
354         if (IS_ERR(mp))
355                 GOTO(out, rc = PTR_ERR(mp));
356
357         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
358         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
359                                                &RMF_MDT_MD, RCL_SERVER);
360
361         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
362                                                &RMF_LOGCOOKIES);
363         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
364                                                   &RMF_LOGCOOKIES,
365                                                   RCL_SERVER);
366         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
367
368         if (!ma->ma_lmm || !ma->ma_cookie)
369                 GOTO(out_unlock_parent, rc = -EINVAL);
370
371         if (strlen(rr->rr_name) == 0) {
372                 /* remote partial operation */
373                 rc = mo_ref_del(info->mti_env, mdt_object_child(mp), ma,
374                                 &info->mti_uc);
375                 GOTO(out_unlock_parent, rc);
376         }
377
378         /* step 2: find & lock the child */
379         rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
380                         rr->rr_name, child_fid, &info->mti_uc);
381         if (rc != 0)
382                  GOTO(out_unlock_parent, rc);
383
384         /* we will lock the child regardless it is local or remote. No harm. */
385         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
386                                   BYPASS_CAPA);
387         if (IS_ERR(mc))
388                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
389         child_lh = &info->mti_lh[MDT_LH_CHILD];
390         child_lh->mlh_mode = LCK_EX;
391         rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
392         if (rc != 0)
393                 GOTO(out_put_child, rc);
394
395         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
396                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
397
398         /*
399          * Now we can only make sure we need MA_INODE, in mdd layer, will check
400          * whether need MA_LOV and MA_COOKIE.
401          */
402         ma->ma_need = MA_INODE;
403         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
404                         mdt_object_child(mc), rr->rr_name, ma, &info->mti_uc);
405         if (rc)
406                 GOTO(out_unlock_child, rc);
407
408         mdt_handle_last_unlink(info, mc, ma);
409
410         GOTO(out_unlock_child, rc);
411 out_unlock_child:
412         mdt_object_unlock(info, mc, child_lh, rc);
413 out_put_child:
414         mdt_object_put(info->mti_env, mc);
415 out_unlock_parent:
416         mdt_object_unlock_put(info, mp, parent_lh, rc);
417 out:
418         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
419         return rc;
420 }
421
422 static int mdt_reint_link(struct mdt_thread_info *info,
423                           struct mdt_lock_handle *lhc)
424 {
425         struct mdt_reint_record *rr = &info->mti_rr;
426         struct ptlrpc_request   *req = mdt_info_req(info);
427         struct md_attr          *ma = &info->mti_attr;
428         struct mdt_object       *ms;
429         struct mdt_object       *mp;
430         struct mdt_lock_handle  *lhs;
431         struct mdt_lock_handle  *lhp;
432         int rc;
433
434         ENTRY;
435
436         DEBUG_REQ(D_INODE, req, "link original "DFID" to "DFID" %s",
437                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
438
439         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
440                 RETURN(-ENOENT);
441
442         /* step 1: lock the source */
443         lhs = &info->mti_lh[MDT_LH_PARENT];
444         lhs->mlh_mode = LCK_EX;
445         ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
446                                   MDS_INODELOCK_UPDATE, rr->rr_capa1);
447         if (IS_ERR(ms))
448                 RETURN(PTR_ERR(ms));
449
450         if (strlen(rr->rr_name) == 0) {
451                 /* remote partial operation */
452                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms),
453                                 &info->mti_uc);
454                 GOTO(out_unlock_source, rc);
455         }
456         /*step 2: find & lock the target parent dir*/
457         lhp = &info->mti_lh[MDT_LH_CHILD];
458         lhp->mlh_mode = LCK_EX;
459         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
460                                   MDS_INODELOCK_UPDATE, rr->rr_capa2);
461         if (IS_ERR(mp))
462                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
463
464         /* step 4: link it */
465
466         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
467                        OBD_FAIL_MDS_REINT_LINK_WRITE);
468
469         rc = mdo_link(info->mti_env, mdt_object_child(mp),
470                       mdt_object_child(ms), rr->rr_name, ma, &info->mti_uc);
471         GOTO(out_unlock_target, rc);
472
473 out_unlock_target:
474         mdt_object_unlock_put(info, mp, lhp, rc);
475 out_unlock_source:
476         mdt_object_unlock_put(info, ms, lhs, rc);
477         return rc;
478 }
479
480 /* partial operation for rename */
481 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
482 {
483         struct mdt_reint_record *rr = &info->mti_rr;
484         struct ptlrpc_request   *req = mdt_info_req(info);
485         struct md_attr          *ma = &info->mti_attr;
486         struct mdt_object       *mtgtdir;
487         struct mdt_object       *mtgt = NULL;
488         struct mdt_lock_handle  *lh_tgtdir;
489         struct mdt_lock_handle  *lh_tgt;
490         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
491         int                      rc;
492
493         ENTRY;
494
495         DEBUG_REQ(D_INODE, req, "rename_tgt "DFID" to "DFID" %s",
496                   PFID(rr->rr_fid2),
497                   PFID(rr->rr_fid1), rr->rr_tgt);
498
499         /* step 1: lookup & lock the tgt dir */
500         lh_tgt = &info->mti_lh[MDT_LH_CHILD];
501         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
502         lh_tgtdir->mlh_mode = LCK_EX;
503         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
504                                        MDS_INODELOCK_UPDATE, rr->rr_capa1);
505         if (IS_ERR(mtgtdir))
506                 GOTO(out, rc = PTR_ERR(mtgtdir));
507
508         /*step 2: find & lock the target object if exists*/
509         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
510                         rr->rr_tgt, tgt_fid, &info->mti_uc);
511         if (rc != 0 && rc != -ENOENT) {
512                 GOTO(out_unlock_tgtdir, rc);
513         } else if (rc == 0) {
514                 lh_tgt->mlh_mode = LCK_EX;
515
516                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
517                                             MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
518                 if (IS_ERR(mtgt))
519                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
520
521                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
522                                     mdt_object_child(mtgt), rr->rr_fid2,
523                                     rr->rr_tgt, ma, &info->mti_uc);
524         } else /* -ENOENT */ {
525                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
526                                      rr->rr_tgt, rr->rr_fid2,
527                                      S_ISDIR(ma->ma_attr.la_mode),
528                                      &info->mti_uc);
529         }
530
531         /* handle last link of tgt object */
532         if (rc == 0 && mtgt)
533                 mdt_handle_last_unlink(info, mtgt, ma);
534
535         EXIT;
536         if (mtgt) {
537                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
538         }
539 out_unlock_tgtdir:
540         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
541 out:
542         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
543         return rc;
544 }
545
546 static int mdt_rename_lock(struct mdt_thread_info *info,
547                            struct lustre_handle *lh)
548 {
549         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
550         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
551         int flags = LDLM_FL_ATOMIC_CB;
552         struct ldlm_res_id res_id;
553         struct lu_site *ls;
554         int rc;
555         ENTRY;
556
557         ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
558         fid_build_res_name(&LUSTRE_BFL_FID, &res_id);
559
560         if (ls->ls_control_exp == NULL) {
561                 /*
562                  * Current node is controller, that is mdt0 where we should take
563                  * BFL lock.
564                  */
565                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy,
566                                             LCK_EX, &flags, ldlm_blocking_ast,
567                                             ldlm_completion_ast, NULL, NULL, 0,
568                                             NULL, lh);
569         } else {
570                 /*
571                  * This is the case mdt0 is remote node, issue DLM lock like
572                  * other clients.
573                  */
574                 rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
575                                       LDLM_IBITS, &policy, LCK_EX, &flags,
576                                       ldlm_blocking_ast, ldlm_completion_ast,
577                                       NULL, NULL, NULL, 0, NULL, lh, 0);
578         }
579
580         RETURN(rc);
581 }
582
583 static void mdt_rename_unlock(struct lustre_handle *lh)
584 {
585         ENTRY;
586         ldlm_lock_decref(lh, LCK_EX);
587         EXIT;
588 }
589
590 /*
591  * This is is_subdir() variant, it is CMD is cmm forwards it to correct
592  * target. Source should not be ancestor of target dir. May be other rename
593  * checks can be moved here later.
594  */
595 static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
596 {
597         struct mdt_reint_record *rr = &info->mti_rr;
598         struct lu_fid dst_fid = *rr->rr_fid2;
599         struct mdt_object *dst;
600         int rc = 0;
601         ENTRY;
602
603         do {
604                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
605                                       BYPASS_CAPA);
606                 if (!IS_ERR(dst)) {
607                         rc = mdo_is_subdir(info->mti_env,
608                                            mdt_object_child(dst),
609                                            fid, &dst_fid, NULL);
610                         mdt_object_put(info->mti_env, dst);
611                         if (rc < 0) {
612                                 CERROR("Error while doing mdo_is_subdir(), rc %d\n",
613                                        rc);
614                         } else if (rc == 1) {
615                                 rc = -EINVAL;
616                         }
617                 } else {
618                         rc = PTR_ERR(dst);
619                 }
620         } while (rc == EREMOTE);
621
622         RETURN(rc);
623 }
624
625 static int mdt_reint_rename(struct mdt_thread_info *info,
626                             struct mdt_lock_handle *lhc)
627 {
628         struct mdt_reint_record *rr = &info->mti_rr;
629         struct req_capsule      *pill = &info->mti_pill;
630         struct ptlrpc_request   *req = mdt_info_req(info);
631         struct md_attr          *ma = &info->mti_attr;
632         struct mdt_object       *msrcdir;
633         struct mdt_object       *mtgtdir;
634         struct mdt_object       *mold;
635         struct mdt_object       *mnew = NULL;
636         struct mdt_lock_handle  *lh_srcdirp;
637         struct mdt_lock_handle  *lh_tgtdirp;
638         struct mdt_lock_handle  *lh_oldp;
639         struct mdt_lock_handle  *lh_newp;
640         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
641         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
642         struct lustre_handle     rename_lh = { 0 };
643         int                      rc;
644
645         ENTRY;
646
647 #if 0
648         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
649                   PFID(rr->rr_fid1), rr->rr_name,
650                   PFID(rr->rr_fid2), rr->rr_tgt);
651 #endif
652
653         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
654         if (rc == 1) {
655         /* if (rr->rr_name[0] == 0) {*/
656                 rc = mdt_reint_rename_tgt(info);
657                 RETURN(rc);
658         }
659
660         rc = mdt_rename_lock(info, &rename_lh);
661         if (rc) {
662                 CERROR("can't lock FS for rename, rc %d\n", rc);
663                 RETURN(rc);
664         }
665
666         lh_newp = &info->mti_lh[MDT_LH_NEW];
667
668         /* step 1: lock the source dir */
669         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
670         lh_srcdirp->mlh_mode = LCK_EX;
671         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
672                                        MDS_INODELOCK_UPDATE, rr->rr_capa1);
673         if (IS_ERR(msrcdir))
674                 GOTO(out, rc = PTR_ERR(msrcdir));
675
676         /*step 2: find & lock the target dir*/
677         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
678         lh_tgtdirp->mlh_mode = LCK_EX;
679         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
680                 mdt_object_get(info->mti_env, msrcdir);
681                 mtgtdir = msrcdir;
682         } else {
683                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
684                                           rr->rr_fid2, rr->rr_capa2);
685                 if (IS_ERR(mtgtdir))
686                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
687
688                 rc = mdt_object_cr_lock(info, mtgtdir, lh_tgtdirp,
689                                         MDS_INODELOCK_UPDATE);
690                 if (rc != 0) {
691                         mdt_object_put(info->mti_env, mtgtdir);
692                         GOTO(out_unlock_source, rc);
693                 }
694
695         }
696
697         /*step 3: find & lock the old object*/
698         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
699                         rr->rr_name, old_fid, &info->mti_uc);
700         if (rc != 0)
701                 GOTO(out_unlock_target, rc);
702
703         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
704                 GOTO(out_unlock_target, rc = -EINVAL);
705
706         lh_oldp = &info->mti_lh[MDT_LH_OLD];
707         lh_oldp->mlh_mode = LCK_EX;
708         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
709                                     MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
710         if (IS_ERR(mold))
711                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
712
713         /*step 4: find & lock the new object*/
714         /* new target object may not exist now */
715         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
716                         rr->rr_tgt, new_fid, &info->mti_uc);
717         if (rc == 0) {
718                 /* the new_fid should have been filled at this moment*/
719                 if (lu_fid_eq(old_fid, new_fid))
720                        GOTO(out_unlock_old, rc);
721
722                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
723                     lu_fid_eq(new_fid, rr->rr_fid2))
724                         GOTO(out_unlock_old, rc = -EINVAL);
725
726                 lh_newp->mlh_mode = LCK_EX;
727                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
728                                        BYPASS_CAPA);
729                 if (IS_ERR(mnew))
730                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
731
732                 rc = mdt_object_cr_lock(info, mnew, lh_newp,
733                                         MDS_INODELOCK_FULL);
734                 if (rc != 0) {
735                         mdt_object_put(info->mti_env, mnew);
736                         GOTO(out_unlock_old, rc);
737                 }
738         } else if (rc != -EREMOTE && rc != -ENOENT)
739                 GOTO(out_unlock_old, rc);
740
741         /* step 5: dome some checking ...*/
742         /* step 6: rename it */
743         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
744         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
745                                                &RMF_MDT_MD, RCL_SERVER);
746
747         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
748                                                 &RMF_LOGCOOKIES);
749         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
750                                                &RMF_LOGCOOKIES, RCL_SERVER);
751
752         if (!ma->ma_lmm || !ma->ma_cookie)
753                 GOTO(out_unlock_new, rc = -EINVAL);
754
755         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
756
757         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
758                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
759
760         /* Check if @dst is subdir of @src. */
761         rc = mdt_rename_check(info, old_fid);
762         if (rc)
763                 GOTO(out_unlock_new, rc);
764
765         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
766                         mdt_object_child(mtgtdir), old_fid, rr->rr_name,
767                         (mnew ? mdt_object_child(mnew) : NULL),
768                         rr->rr_tgt, ma, &info->mti_uc);
769
770         /* handle last link of tgt object */
771         if (rc == 0 && mnew)
772                 mdt_handle_last_unlink(info, mnew, ma);
773
774 out_unlock_new:
775         if (mnew) {
776                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
777         }
778 out_unlock_old:
779         mdt_object_unlock_put(info, mold, lh_oldp, rc);
780 out_unlock_target:
781         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
782 out_unlock_source:
783         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
784 out:
785         mdt_rename_unlock(&rename_lh);
786         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
787         return rc;
788 }
789
790 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
791                            struct mdt_lock_handle *lhc);
792
793 static mdt_reinter reinters[REINT_MAX] = {
794         [REINT_SETATTR]  = mdt_reint_setattr,
795         [REINT_CREATE] = mdt_reint_create,
796         [REINT_LINK] = mdt_reint_link,
797         [REINT_UNLINK] = mdt_reint_unlink,
798         [REINT_RENAME] = mdt_reint_rename,
799         [REINT_OPEN] = mdt_reint_open
800 };
801
802 int mdt_reint_rec(struct mdt_thread_info *info,
803                   struct mdt_lock_handle *lhc)
804 {
805         int rc;
806         ENTRY;
807
808         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
809
810         RETURN(rc);
811 }