Whamcloud - gitweb
LU-1187 mdt: unlink remote directory
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdt/mdt_reint.c
37  *
38  * Lustre Metadata Target (mdt) reintegration routines
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Huang Hua <huanghua@clusterfs.com>
44  * Author: Yury Umanets <umka@clusterfs.com>
45  */
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include "mdt_internal.h"
50
51 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
52                                      struct md_attr *ma)
53 {
54         ma->ma_need = MA_INODE;
55         ma->ma_valid = 0;
56 }
57
58 static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
59                                 struct mdt_object *object,
60                                 struct mdt_body *repbody)
61 {
62         ENTRY;
63
64         /* for cross-ref mkdir, mds capa has been fetched from remote obj, then
65          * we won't go to below*/
66         if (repbody->valid & OBD_MD_FLMDSCAPA)
67                 RETURN(rc);
68
69         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa &&
70             exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
71                 struct lustre_capa *capa;
72
73                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
74                 LASSERT(capa);
75                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
76                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
77                                  0);
78                 if (rc == 0)
79                         repbody->valid |= OBD_MD_FLMDSCAPA;
80         }
81
82         RETURN(rc);
83 }
84
85 /**
86  * Get version of object by fid.
87  *
88  * Return real version or ENOENT_VERSION if object doesn't exist
89  */
90 static void mdt_obj_version_get(struct mdt_thread_info *info,
91                                 struct mdt_object *o, __u64 *version)
92 {
93         LASSERT(o);
94         if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
95                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
96         else
97                 *version = ENOENT_VERSION;
98         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
99                PFID(mdt_object_fid(o)), *version);
100 }
101
102 /**
103  * Check version is correct.
104  *
105  * Should be called only during replay.
106  */
107 static int mdt_version_check(struct ptlrpc_request *req,
108                              __u64 version, int idx)
109 {
110         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
111         ENTRY;
112
113         if (!exp_connect_vbr(req->rq_export))
114                 RETURN(0);
115
116         LASSERT(req_is_replay(req));
117         /** VBR: version is checked always because costs nothing */
118         LASSERT(idx < PTLRPC_NUM_VERSIONS);
119         /** Sanity check for malformed buffers */
120         if (pre_ver == NULL) {
121                 CERROR("No versions in request buffer\n");
122                 spin_lock(&req->rq_export->exp_lock);
123                 req->rq_export->exp_vbr_failed = 1;
124                 spin_unlock(&req->rq_export->exp_lock);
125                 RETURN(-EOVERFLOW);
126         } else if (pre_ver[idx] != version) {
127                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
128                        pre_ver[idx], version);
129                 spin_lock(&req->rq_export->exp_lock);
130                 req->rq_export->exp_vbr_failed = 1;
131                 spin_unlock(&req->rq_export->exp_lock);
132                 RETURN(-EOVERFLOW);
133         }
134         RETURN(0);
135 }
136
137 /**
138  * Save pre-versions in reply.
139  */
140 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
141                              int idx)
142 {
143         __u64 *reply_ver;
144
145         if (!exp_connect_vbr(req->rq_export))
146                 return;
147
148         LASSERT(!req_is_replay(req));
149         LASSERT(req->rq_repmsg != NULL);
150         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
151         if (reply_ver)
152                 reply_ver[idx] = version;
153 }
154
155 /**
156  * Save enoent version, it is needed when it is obvious that object doesn't
157  * exist, e.g. child during create.
158  */
159 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
160 {
161         /* save version of file name for replay, it must be ENOENT here */
162         if (!req_is_replay(mdt_info_req(info))) {
163                 info->mti_ver[idx] = ENOENT_VERSION;
164                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
165         }
166 }
167
168 /**
169  * Get version from disk and save in reply buffer.
170  *
171  * Versions are saved in reply only during normal operations not replays.
172  */
173 void mdt_version_get_save(struct mdt_thread_info *info,
174                           struct mdt_object *mto, int idx)
175 {
176         /* don't save versions during replay */
177         if (!req_is_replay(mdt_info_req(info))) {
178                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
180         }
181 }
182
183 /**
184  * Get version from disk and check it, no save in reply.
185  */
186 int mdt_version_get_check(struct mdt_thread_info *info,
187                           struct mdt_object *mto, int idx)
188 {
189         /* only check versions during replay */
190         if (!req_is_replay(mdt_info_req(info)))
191                 return 0;
192
193         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
194         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
195 }
196
197 /**
198  * Get version from disk and check if recovery or just save.
199  */
200 int mdt_version_get_check_save(struct mdt_thread_info *info,
201                                struct mdt_object *mto, int idx)
202 {
203         int rc = 0;
204
205         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
206         if (req_is_replay(mdt_info_req(info)))
207                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
208                                        idx);
209         else
210                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
211         return rc;
212 }
213
214 /**
215  * Lookup with version checking.
216  *
217  * This checks version of 'name'. Many reint functions uses 'name' for child not
218  * FID, therefore we need to get object by name and check its version.
219  */
220 int mdt_lookup_version_check(struct mdt_thread_info *info,
221                              struct mdt_object *p, struct lu_name *lname,
222                              struct lu_fid *fid, int idx)
223 {
224         int rc, vbrc;
225
226         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
227                         &info->mti_spec);
228         /* Check version only during replay */
229         if (!req_is_replay(mdt_info_req(info)))
230                 return rc;
231
232         info->mti_ver[idx] = ENOENT_VERSION;
233         if (rc == 0) {
234                 struct mdt_object *child;
235                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
236                 if (likely(!IS_ERR(child))) {
237                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
238                         mdt_object_put(info->mti_env, child);
239                 }
240         }
241         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
242         return vbrc ? vbrc : rc;
243
244 }
245
246 /*
247  * VBR: we save three versions in reply:
248  * 0 - parent. Check that parent version is the same during replay.
249  * 1 - name. Version of 'name' if file exists with the same name or
250  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
251  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
252  * check.
253  */
254 static int mdt_md_create(struct mdt_thread_info *info)
255 {
256         struct mdt_device       *mdt = info->mti_mdt;
257         struct mdt_object       *parent;
258         struct mdt_object       *child;
259         struct mdt_lock_handle  *lh;
260         struct mdt_body         *repbody;
261         struct md_attr          *ma = &info->mti_attr;
262         struct mdt_reint_record *rr = &info->mti_rr;
263         struct lu_name          *lname;
264         int rc;
265         ENTRY;
266
267         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
268                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
269
270         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
271
272         lh = &info->mti_lh[MDT_LH_PARENT];
273         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
274
275         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
276                                       MDS_INODELOCK_UPDATE);
277         if (IS_ERR(parent))
278                 RETURN(PTR_ERR(parent));
279
280         if (mdt_object_obf(parent))
281                 GOTO(out_put_parent, rc = -EPERM);
282
283         rc = mdt_version_get_check_save(info, parent, 0);
284         if (rc)
285                 GOTO(out_put_parent, rc);
286
287         /*
288          * Check child name version during replay.
289          * During create replay a file may exist with same name.
290          */
291         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
292         rc = mdt_lookup_version_check(info, parent, lname,
293                                       &info->mti_tmp_fid1, 1);
294         if (rc == 0)
295                 GOTO(out_put_parent, rc = -EEXIST);
296
297         /* -ENOENT is expected here */
298         if (rc != -ENOENT)
299                 GOTO(out_put_parent, rc);
300
301         /* save version of file name for replay, it must be ENOENT here */
302         mdt_enoent_version_save(info, 1);
303
304         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
305         if (likely(!IS_ERR(child))) {
306                 struct md_object *next = mdt_object_child(parent);
307
308                 if (mdt_object_exists(child) < 0) {
309                         struct seq_server_site *ss;
310                         struct lu_ucred *uc  = mdt_ucred(info);
311
312                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
313                                 CERROR("%s: Creating remote dir is only "
314                                        "permitted for administrator: rc = %d\n",
315                                         mdt2obd_dev(mdt)->obd_name, -EPERM);
316                                 GOTO(out_put_child, rc = -EPERM);
317                         }
318
319                         ss = mdt_seq_site(mdt);
320                         if (ss->ss_node_id != 0 &&
321                             mdt->mdt_enable_remote_dir == 0) {
322                                 CERROR("%s: remote dir is only permitted on"
323                                        " MDT0 or set_param"
324                                        " mdt.*.enable_remote_dir=1\n",
325                                        mdt2obd_dev(mdt)->obd_name);
326                                 GOTO(out_put_child, rc = -EPERM);
327                         }
328                 }
329                 ma->ma_need = MA_INODE;
330                 ma->ma_valid = 0;
331                 /* capa for cross-ref will be stored here */
332                 ma->ma_capa = req_capsule_server_get(info->mti_pill,
333                                                      &RMF_CAPA1);
334                 LASSERT(ma->ma_capa);
335
336                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
337                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
338
339                 /* Version of child will be updated on disk. */
340                 info->mti_mos = child;
341                 rc = mdt_version_get_check_save(info, child, 2);
342                 if (rc)
343                         GOTO(out_put_child, rc);
344
345                 /* Let lower layer know current lock mode. */
346                 info->mti_spec.sp_cr_mode =
347                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
348
349                 /*
350                  * Do not perform lookup sanity check. We know that name does
351                  * not exist.
352                  */
353                 info->mti_spec.sp_cr_lookup = 0;
354                 info->mti_spec.sp_feat = &dt_directory_features;
355
356                 rc = mdo_create(info->mti_env, next, lname,
357                                 mdt_object_child(child),
358                                 &info->mti_spec, ma);
359                 if (rc == 0)
360                         rc = mdt_attr_get_complex(info, child, ma);
361
362                 if (rc == 0) {
363                         /* Return fid & attr to client. */
364                         if (ma->ma_valid & MA_INODE)
365                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
366                                                    mdt_object_fid(child));
367                 }
368 out_put_child:
369                 mdt_object_put(info->mti_env, child);
370         } else {
371                 rc = PTR_ERR(child);
372         }
373         mdt_create_pack_capa(info, rc, child, repbody);
374 out_put_parent:
375         mdt_object_unlock_put(info, parent, lh, rc);
376         RETURN(rc);
377 }
378
379 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
380                  struct md_attr *ma, int flags)
381 {
382         struct mdt_lock_handle  *lh;
383         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
384         __u64 lockpart = MDS_INODELOCK_UPDATE;
385         int rc;
386         ENTRY;
387
388         /* attr shouldn't be set on remote object */
389         LASSERT(mdt_object_exists(mo) >= 0);
390
391         lh = &info->mti_lh[MDT_LH_PARENT];
392         mdt_lock_reg_init(lh, LCK_PW);
393
394         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
395                 lockpart |= MDS_INODELOCK_LOOKUP;
396
397         rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
398         if (rc != 0)
399                 RETURN(rc);
400
401         if (mdt_object_exists(mo) == 0)
402                 GOTO(out_unlock, rc = -ENOENT);
403
404         /* all attrs are packed into mti_attr in unpack_setattr */
405         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
406                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
407
408         /* This is only for set ctime when rename's source is on remote MDS. */
409         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
410                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
411
412         /* VBR: update version if attr changed are important for recovery */
413         if (do_vbr) {
414                 /* update on-disk version of changed object */
415                 info->mti_mos = mo;
416                 rc = mdt_version_get_check_save(info, mo, 0);
417                 if (rc)
418                         GOTO(out_unlock, rc);
419         }
420
421         /* all attrs are packed into mti_attr in unpack_setattr */
422         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
423         if (rc != 0)
424                 GOTO(out_unlock, rc);
425
426         EXIT;
427 out_unlock:
428         mdt_object_unlock(info, mo, lh, rc);
429         return rc;
430 }
431
432 /**
433  * Check HSM flags and add HS_DIRTY flag if relevant.
434  *
435  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
436  * and is not RELEASED.
437  */
438 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
439                         struct md_attr *ma)
440 {
441         int rc;
442         ENTRY;
443
444         /* If the file was modified, add the dirty flag */
445         ma->ma_need = MA_HSM;
446         rc = mdt_attr_get_complex(info, mo, ma);
447         if (rc) {
448                 CERROR("file attribute read error for "DFID": %d.\n",
449                         PFID(lu_object_fid(&mo->mot_obj.mo_lu)), rc);
450                 RETURN(rc);
451         }
452
453         /* If an up2date copy exists in the backend, add dirty flag */
454         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
455             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
456
457                 ma->ma_hsm.mh_flags |= HS_DIRTY;
458                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
459                 if (rc) {
460                         CERROR("file attribute change error for "DFID": %d\n",
461                                 PFID(lu_object_fid(&mo->mot_obj.mo_lu)), rc);
462                         RETURN(rc);
463                 }
464         }
465
466         RETURN(rc);
467 }
468
469 static int mdt_reint_setattr(struct mdt_thread_info *info,
470                              struct mdt_lock_handle *lhc)
471 {
472         struct md_attr          *ma = &info->mti_attr;
473         struct mdt_reint_record *rr = &info->mti_rr;
474         struct ptlrpc_request   *req = mdt_info_req(info);
475         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
476         struct mdt_file_data    *mfd;
477         struct mdt_object       *mo;
478         struct mdt_body         *repbody;
479         int                      som_au, rc, rc2;
480         ENTRY;
481
482         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
483                   (unsigned int)ma->ma_attr.la_valid);
484
485         if (info->mti_dlm_req)
486                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
487
488         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
489         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
490         if (IS_ERR(mo))
491                 GOTO(out, rc = PTR_ERR(mo));
492
493         if (mdt_object_obf(mo))
494                 GOTO(out_put, rc = -EPERM);
495
496         /* start a log jounal handle if needed */
497         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
498                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
499                     (rr->rr_flags & MRF_OPEN_TRUNC)) {
500                         /* Check write access for the O_TRUNC case */
501                         if (mdt_write_read(mo) < 0)
502                                 GOTO(out_put, rc = -ETXTBSY);
503                 }
504         } else if (info->mti_ioepoch &&
505                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
506                 /* Truncate case. IOEpoch is opened. */
507                 rc = mdt_write_get(mo);
508                 if (rc)
509                         GOTO(out_put, rc);
510
511                 mfd = mdt_mfd_new();
512                 if (mfd == NULL) {
513                         mdt_write_put(mo);
514                         GOTO(out_put, rc = -ENOMEM);
515                 }
516
517                 mdt_ioepoch_open(info, mo, 0);
518                 repbody->ioepoch = mo->mot_ioepoch;
519
520                 mdt_object_get(info->mti_env, mo);
521                 mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
522                 mfd->mfd_object = mo;
523                 mfd->mfd_xid = req->rq_xid;
524
525                 spin_lock(&med->med_open_lock);
526                 cfs_list_add(&mfd->mfd_list, &med->med_open_head);
527                 spin_unlock(&med->med_open_lock);
528                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
529         }
530
531         som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
532         if (som_au) {
533                 /* SOM Attribute update case. Find the proper mfd and update
534                  * SOM attributes on the proper object. */
535                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
536                 LASSERT(info->mti_ioepoch);
537
538                 spin_lock(&med->med_open_lock);
539                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
540                 if (mfd == NULL) {
541                         spin_unlock(&med->med_open_lock);
542                         CDEBUG(D_INODE, "no handle for file close: "
543                                "fid = "DFID": cookie = "LPX64"\n",
544                                PFID(info->mti_rr.rr_fid1),
545                                info->mti_ioepoch->handle.cookie);
546                         GOTO(out_put, rc = -ESTALE);
547                 }
548                 LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
549                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
550
551                 class_handle_unhash(&mfd->mfd_handle);
552                 cfs_list_del_init(&mfd->mfd_list);
553                 spin_unlock(&med->med_open_lock);
554
555                 mdt_mfd_close(info, mfd);
556         } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
557                 LASSERT((ma->ma_valid & MA_LOV) == 0);
558                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
559                 if (rc)
560                         GOTO(out_put, rc);
561         } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
562                 struct lu_buf *buf  = &info->mti_buf;
563                 LASSERT(ma->ma_attr.la_valid == 0);
564                 buf->lb_buf = ma->ma_lmm;
565                 buf->lb_len = ma->ma_lmm_size;
566                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
567                                   buf, XATTR_NAME_LOV, 0);
568                 if (rc)
569                         GOTO(out_put, rc);
570         } else
571                 LBUG();
572
573         /* If file data is modified, add the dirty flag */
574         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
575                 rc = mdt_add_dirty_flag(info, mo, ma);
576
577         ma->ma_need = MA_INODE;
578         ma->ma_valid = 0;
579         rc = mdt_attr_get_complex(info, mo, ma);
580         if (rc != 0)
581                 GOTO(out_put, rc);
582
583         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
584
585         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
586             exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA &&
587             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
588             (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
589                 struct lustre_capa *capa;
590
591                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
592                 LASSERT(capa);
593                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
594                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
595                 if (rc)
596                         GOTO(out_put, rc);
597                 repbody->valid |= OBD_MD_FLOSSCAPA;
598         }
599
600         EXIT;
601 out_put:
602         mdt_object_put(info->mti_env, mo);
603 out:
604         if (rc == 0)
605                 mdt_counter_incr(req, LPROC_MDT_SETATTR);
606
607         mdt_client_compatibility(info);
608         rc2 = mdt_fix_reply(info);
609         if (rc == 0)
610                 rc = rc2;
611         return rc;
612 }
613
614 static int mdt_reint_create(struct mdt_thread_info *info,
615                             struct mdt_lock_handle *lhc)
616 {
617         struct ptlrpc_request   *req = mdt_info_req(info);
618         int                     rc;
619         ENTRY;
620
621         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
622                 RETURN(err_serious(-ESTALE));
623
624         if (info->mti_dlm_req)
625                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
626
627         LASSERT(info->mti_rr.rr_namelen > 0);
628         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
629         case S_IFDIR:
630                 mdt_counter_incr(req, LPROC_MDT_MKDIR);
631                 break;
632         case S_IFREG:
633         case S_IFLNK:
634         case S_IFCHR:
635         case S_IFBLK:
636         case S_IFIFO:
637         case S_IFSOCK:
638                 /* Special file should stay on the same node as parent. */
639                 mdt_counter_incr(req, LPROC_MDT_MKNOD);
640                 break;
641         default:
642                 CERROR("%s: Unsupported mode %o\n",
643                        mdt2obd_dev(info->mti_mdt)->obd_name,
644                        info->mti_attr.ma_attr.la_mode);
645                 RETURN(err_serious(-EOPNOTSUPP));
646         }
647
648         rc = mdt_md_create(info);
649         RETURN(rc);
650 }
651
652 /*
653  * VBR: save parent version in reply and child version getting by its name.
654  * Version of child is getting and checking during its lookup. If
655  */
656 static int mdt_reint_unlink(struct mdt_thread_info *info,
657                             struct mdt_lock_handle *lhc)
658 {
659         struct mdt_reint_record *rr = &info->mti_rr;
660         struct ptlrpc_request   *req = mdt_info_req(info);
661         struct md_attr          *ma = &info->mti_attr;
662         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
663         struct mdt_object       *mp;
664         struct mdt_object       *mc;
665         struct mdt_lock_handle  *parent_lh;
666         struct mdt_lock_handle  *child_lh;
667         struct lu_name          *lname;
668         int                      rc;
669         ENTRY;
670
671         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
672                   rr->rr_name);
673
674         if (info->mti_dlm_req)
675                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
676
677         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
678                 RETURN(err_serious(-ENOENT));
679
680         /*
681          * step 1: Found the parent.
682          */
683         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
684         if (IS_ERR(mp)) {
685                 rc = PTR_ERR(mp);
686                 GOTO(out, rc);
687         }
688
689         if (mdt_object_obf(mp))
690                 GOTO(put_parent, rc = -EPERM);
691
692         parent_lh = &info->mti_lh[MDT_LH_PARENT];
693         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
694         if (mdt_object_exists(mp) < 0) {
695                 mdt_lock_reg_init(parent_lh, LCK_EX);
696                 rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
697                                             parent_lh->mlh_rreg_mode,
698                                             MDS_INODELOCK_UPDATE);
699                 if (rc != ELDLM_OK)
700                         GOTO(put_parent, rc);
701
702         } else {
703                 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
704                                   rr->rr_namelen);
705                 rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
706                                      MDT_LOCAL_LOCK);
707                 if (rc)
708                         GOTO(put_parent, rc);
709
710                 rc = mdt_version_get_check_save(info, mp, 0);
711                 if (rc)
712                         GOTO(unlock_parent, rc);
713         }
714
715         /* step 2: find & lock the child */
716         /* lookup child object along with version checking */
717         fid_zero(child_fid);
718         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
719         if (rc != 0)
720                 GOTO(unlock_parent, rc);
721
722         mdt_reint_init_ma(info, ma);
723
724         /* We will lock the child regardless it is local or remote. No harm. */
725         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
726         if (IS_ERR(mc))
727                 GOTO(unlock_parent, rc = PTR_ERR(mc));
728
729         child_lh = &info->mti_lh[MDT_LH_CHILD];
730         mdt_lock_reg_init(child_lh, LCK_EX);
731         if (mdt_object_exists(mc) < 0) {
732                 struct mdt_body  *repbody;
733
734                 if (!fid_is_zero(rr->rr_fid2)) {
735                         CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n",
736                                mdt2obd_dev(info->mti_mdt)->obd_name,
737                                (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
738                         GOTO(unlock_parent, rc = -ENOENT);
739                 }
740                 CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n",
741                        mdt2obd_dev(info->mti_mdt)->obd_name,
742                        (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
743
744                 if (info->mti_spec.sp_rm_entry) {
745                         struct lu_ucred *uc  = mdt_ucred(info);
746
747                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
748                                 CERROR("%s: unlink remote entry is only "
749                                        "permitted for administrator: rc = %d\n",
750                                         mdt2obd_dev(info->mti_mdt)->obd_name,
751                                         -EPERM);
752                                 GOTO(unlock_parent, rc = -EPERM);
753                         }
754
755                         ma->ma_need = MA_INODE;
756                         ma->ma_valid = 0;
757                         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
758                         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
759                                         NULL, lname, ma);
760                         mdt_object_put(info->mti_env, mc);
761                         GOTO(unlock_parent, rc);
762                 }
763                 /* Revoke the LOOKUP lock of the remote object granted by
764                  * this MDT. Since the unlink will happen on another MDT,
765                  * it will release the LOOKUP lock right away. Then What
766                  * would happen if another client try to grab the LOOKUP
767                  * lock at the same time with unlink XXX */
768                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP,
769                                 MDT_CROSS_LOCK);
770                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
771                 LASSERT(repbody != NULL);
772                 repbody->fid1 = *mdt_object_fid(mc);
773                 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
774                 mdt_object_unlock_put(info, mc, child_lh, rc);
775                 GOTO(unlock_parent, rc = -EREMOTE);
776         } else if (info->mti_spec.sp_rm_entry) {
777                 CERROR("%s: lfs rmdir should not be used on local dir %s\n",
778                        mdt2obd_dev(info->mti_mdt)->obd_name,
779                        (char *)rr->rr_name);
780                 mdt_object_put(info->mti_env, mc);
781                 GOTO(unlock_parent, rc = -EPERM);
782         }
783
784         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
785                              MDT_CROSS_LOCK);
786         if (rc != 0) {
787                 mdt_object_put(info->mti_env, mc);
788                 GOTO(unlock_parent, rc);
789         }
790
791         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
792                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
793         /* save version when object is locked */
794         mdt_version_get_save(info, mc, 1);
795         /*
796          * Now we can only make sure we need MA_INODE, in mdd layer, will check
797          * whether need MA_LOV and MA_COOKIE.
798          */
799         ma->ma_need = MA_INODE;
800         ma->ma_valid = 0;
801         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
802         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
803                         mdt_object_child(mc), lname, ma);
804         if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
805                 rc = mdt_attr_get_complex(info, mc, ma);
806         if (rc == 0)
807                 mdt_handle_last_unlink(info, mc, ma);
808
809         if (ma->ma_valid & MA_INODE) {
810                 switch (ma->ma_attr.la_mode & S_IFMT) {
811                 case S_IFDIR:
812                         mdt_counter_incr(req, LPROC_MDT_RMDIR);
813                         break;
814                 case S_IFREG:
815                 case S_IFLNK:
816                 case S_IFCHR:
817                 case S_IFBLK:
818                 case S_IFIFO:
819                 case S_IFSOCK:
820                         mdt_counter_incr(req, LPROC_MDT_UNLINK);
821                         break;
822                 default:
823                         LASSERTF(0, "bad file type %o unlinking\n",
824                                  ma->ma_attr.la_mode);
825                 }
826         }
827
828         EXIT;
829
830         mdt_object_unlock_put(info, mc, child_lh, rc);
831 unlock_parent:
832         mdt_object_unlock(info, mp, parent_lh, rc);
833 put_parent:
834         mdt_object_put(info->mti_env, mp);
835 out:
836         return rc;
837 }
838
839 /*
840  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
841  * name.
842  */
843 static int mdt_reint_link(struct mdt_thread_info *info,
844                           struct mdt_lock_handle *lhc)
845 {
846         struct mdt_reint_record *rr = &info->mti_rr;
847         struct ptlrpc_request   *req = mdt_info_req(info);
848         struct md_attr          *ma = &info->mti_attr;
849         struct mdt_object       *ms;
850         struct mdt_object       *mp;
851         struct mdt_lock_handle  *lhs;
852         struct mdt_lock_handle  *lhp;
853         struct lu_name          *lname;
854         int rc;
855         ENTRY;
856
857         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
858                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
859
860         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
861                 RETURN(err_serious(-ENOENT));
862
863         if (info->mti_dlm_req)
864                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
865
866         /* Invalid case so return error immediately instead of
867          * processing it */
868         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
869                 RETURN(-EPERM);
870
871         /* step 1: find & lock the target parent dir */
872         lhp = &info->mti_lh[MDT_LH_PARENT];
873         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
874                           rr->rr_namelen);
875         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
876                                   MDS_INODELOCK_UPDATE);
877         if (IS_ERR(mp))
878                 RETURN(PTR_ERR(mp));
879
880         if (mdt_object_obf(mp))
881                 GOTO(out_unlock_parent, rc = -EPERM);
882
883         rc = mdt_version_get_check_save(info, mp, 0);
884         if (rc)
885                 GOTO(out_unlock_parent, rc);
886
887         /* step 2: find & lock the source */
888         lhs = &info->mti_lh[MDT_LH_CHILD];
889         mdt_lock_reg_init(lhs, LCK_EX);
890
891         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
892         if (IS_ERR(ms))
893                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
894
895         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
896                             MDT_CROSS_LOCK);
897         if (rc != 0) {
898                 mdt_object_put(info->mti_env, ms);
899                 GOTO(out_unlock_parent, rc);
900         }
901
902         /* step 3: link it */
903         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
904                        OBD_FAIL_MDS_REINT_LINK_WRITE);
905
906         info->mti_mos = ms;
907         rc = mdt_version_get_check_save(info, ms, 1);
908         if (rc)
909                 GOTO(out_unlock_child, rc);
910
911         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
912         /** check target version by name during replay */
913         rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2);
914         if (rc != 0 && rc != -ENOENT)
915                 GOTO(out_unlock_child, rc);
916         /* save version of file name for replay, it must be ENOENT here */
917         if (!req_is_replay(mdt_info_req(info))) {
918                 info->mti_ver[2] = ENOENT_VERSION;
919                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
920         }
921
922         rc = mdo_link(info->mti_env, mdt_object_child(mp),
923                       mdt_object_child(ms), lname, ma);
924
925         if (rc == 0)
926                 mdt_counter_incr(req, LPROC_MDT_LINK);
927
928         EXIT;
929 out_unlock_child:
930         mdt_object_unlock_put(info, ms, lhs, rc);
931 out_unlock_parent:
932         mdt_object_unlock_put(info, mp, lhp, rc);
933         return rc;
934 }
935 /**
936  * lock the part of the directory according to the hash of the name
937  * (lh->mlh_pdo_hash) in parallel directory lock.
938  */
939 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
940                        struct mdt_lock_handle *lh,
941                        struct mdt_object *obj, __u64 ibits)
942 {
943         struct ldlm_res_id *res_id = &info->mti_res_id;
944         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
945         ldlm_policy_data_t *policy = &info->mti_policy;
946         int rc;
947
948         /*
949          * Finish res_id initializing by name hash marking part of
950          * directory which is taking modification.
951          */
952         LASSERT(lh->mlh_pdo_hash != 0);
953         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
954         memset(policy, 0, sizeof(*policy));
955         policy->l_inodebits.bits = ibits;
956         /*
957          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
958          * going to be sent to client. If it is - mdt_intent_policy() path will
959          * fix it up and turn FL_LOCAL flag off.
960          */
961         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
962                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
963                           &info->mti_exp->exp_handle.h_cookie);
964         return rc;
965 }
966
967 static int mdt_rename_lock(struct mdt_thread_info *info,
968                            struct lustre_handle *lh)
969 {
970         struct ldlm_namespace   *ns = info->mti_mdt->mdt_namespace;
971         ldlm_policy_data_t      *policy = &info->mti_policy;
972         struct ldlm_res_id      *res_id = &info->mti_res_id;
973         __u64                   flags = 0;
974         int rc;
975         ENTRY;
976
977         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
978
979         memset(policy, 0, sizeof *policy);
980         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
981 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 4, 53, 0)
982         /* In phase I, we will not do cross-rename, so local BFL lock would
983          * be enough
984          */
985         flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
986         /*
987          * Current node is controller, that is mdt0, where we should
988          * take BFL lock.
989          */
990         rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
991                                     LCK_EX, &flags, ldlm_blocking_ast,
992                                     ldlm_completion_ast, NULL, NULL, 0,
993                                     LVB_T_NONE,
994                                     &info->mti_exp->exp_handle.h_cookie,
995                                     lh);
996 #else
997 #warning "Local rename lock is invalid for DNE phase II."
998 #endif
999         RETURN(rc);
1000 }
1001
1002 static void mdt_rename_unlock(struct lustre_handle *lh)
1003 {
1004         ENTRY;
1005         LASSERT(lustre_handle_is_used(lh));
1006         ldlm_lock_decref(lh, LCK_EX);
1007         EXIT;
1008 }
1009
1010 /*
1011  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
1012  * target. Source should not be ancestor of target dir. May be other rename
1013  * checks can be moved here later.
1014  */
1015 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
1016 {
1017         struct mdt_reint_record *rr = &info->mti_rr;
1018         struct lu_fid dst_fid = *rr->rr_fid2;
1019         struct mdt_object *dst;
1020         int rc = 0;
1021         ENTRY;
1022
1023         do {
1024                 LASSERT(fid_is_sane(&dst_fid));
1025                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
1026                 if (!IS_ERR(dst)) {
1027                         rc = mdo_is_subdir(info->mti_env,
1028                                            mdt_object_child(dst), fid,
1029                                            &dst_fid);
1030                         mdt_object_put(info->mti_env, dst);
1031                         if (rc != -EREMOTE && rc < 0) {
1032                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
1033                         } else {
1034                                 /* check the found fid */
1035                                 if (lu_fid_eq(&dst_fid, fid))
1036                                         rc = -EINVAL;
1037                         }
1038                 } else {
1039                         rc = PTR_ERR(dst);
1040                 }
1041         } while (rc == -EREMOTE);
1042
1043         RETURN(rc);
1044 }
1045
1046 /*
1047  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
1048  * 2 - src child; 3 - tgt child.
1049  * Update on disk version of src child.
1050  */
1051 static int mdt_reint_rename(struct mdt_thread_info *info,
1052                             struct mdt_lock_handle *lhc)
1053 {
1054         struct mdt_reint_record *rr = &info->mti_rr;
1055         struct md_attr          *ma = &info->mti_attr;
1056         struct ptlrpc_request   *req = mdt_info_req(info);
1057         struct mdt_object       *msrcdir;
1058         struct mdt_object       *mtgtdir;
1059         struct mdt_object       *mold;
1060         struct mdt_object       *mnew = NULL;
1061         struct mdt_lock_handle  *lh_srcdirp;
1062         struct mdt_lock_handle  *lh_tgtdirp;
1063         struct mdt_lock_handle  *lh_oldp;
1064         struct mdt_lock_handle  *lh_newp;
1065         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1066         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
1067         struct lustre_handle     rename_lh = { 0 };
1068         struct lu_name           slname = { 0 };
1069         struct lu_name          *lname;
1070         int                      rc;
1071         ENTRY;
1072
1073         if (info->mti_dlm_req)
1074                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
1075
1076         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
1077                   PFID(rr->rr_fid1), rr->rr_name,
1078                   PFID(rr->rr_fid2), rr->rr_tgt);
1079
1080         rc = mdt_rename_lock(info, &rename_lh);
1081         if (rc) {
1082                 CERROR("Can't lock FS for rename, rc %d\n", rc);
1083                 RETURN(rc);
1084         }
1085
1086         lh_newp = &info->mti_lh[MDT_LH_NEW];
1087
1088         /* step 1: lock the source dir. */
1089         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
1090         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
1091                           rr->rr_namelen);
1092         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
1093                                        MDS_INODELOCK_UPDATE);
1094         if (IS_ERR(msrcdir))
1095                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
1096
1097         if (mdt_object_obf(msrcdir))
1098                 GOTO(out_unlock_source, rc = -EPERM);
1099
1100         rc = mdt_version_get_check_save(info, msrcdir, 0);
1101         if (rc)
1102                 GOTO(out_unlock_source, rc);
1103
1104         /* step 2: find & lock the target dir. */
1105         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
1106         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
1107                           rr->rr_tgtlen);
1108         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1109                 mdt_object_get(info->mti_env, msrcdir);
1110                 mtgtdir = msrcdir;
1111                 if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
1112                          rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
1113                                                  MDS_INODELOCK_UPDATE);
1114                          if (rc)
1115                                  GOTO(out_unlock_source, rc);
1116                          OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
1117                 }
1118         } else {
1119                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
1120                                           rr->rr_fid2);
1121                 if (IS_ERR(mtgtdir))
1122                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
1123
1124                 if (mdt_object_obf(mtgtdir))
1125                         GOTO(out_put_target, rc = -EPERM);
1126
1127                 /* check early, the real version will be saved after locking */
1128                 rc = mdt_version_get_check(info, mtgtdir, 1);
1129                 if (rc)
1130                         GOTO(out_put_target, rc);
1131
1132                 rc = mdt_object_exists(mtgtdir);
1133                 if (rc == 0) {
1134                         GOTO(out_put_target, rc = -ESTALE);
1135                 } else if (rc > 0) {
1136                         /* we lock the target dir if it is local */
1137                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
1138                                              MDS_INODELOCK_UPDATE,
1139                                              MDT_LOCAL_LOCK);
1140                         if (rc != 0)
1141                                 GOTO(out_put_target, rc);
1142                         /* get and save correct version after locking */
1143                         mdt_version_get_save(info, mtgtdir, 1);
1144                 }
1145         }
1146
1147         /* step 3: find & lock the old object. */
1148         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
1149         mdt_name_copy(&slname, lname);
1150         fid_zero(old_fid);
1151         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
1152         if (rc != 0)
1153                 GOTO(out_unlock_target, rc);
1154
1155         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1156                 GOTO(out_unlock_target, rc = -EINVAL);
1157
1158         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
1159         if (IS_ERR(mold))
1160                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
1161
1162         if (mdt_object_obf(mold)) {
1163                 mdt_object_put(info->mti_env, mold);
1164                 GOTO(out_unlock_target, rc = -EPERM);
1165         }
1166
1167         lh_oldp = &info->mti_lh[MDT_LH_OLD];
1168         mdt_lock_reg_init(lh_oldp, LCK_EX);
1169         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
1170                              MDT_CROSS_LOCK);
1171         if (rc != 0) {
1172                 mdt_object_put(info->mti_env, mold);
1173                 GOTO(out_unlock_target, rc);
1174         }
1175
1176         info->mti_mos = mold;
1177         /* save version after locking */
1178         mdt_version_get_save(info, mold, 2);
1179         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
1180
1181         /* step 4: find & lock the new object. */
1182         /* new target object may not exist now */
1183         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
1184         /* lookup with version checking */
1185         fid_zero(new_fid);
1186         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
1187         if (rc == 0) {
1188                 /* the new_fid should have been filled at this moment */
1189                 if (lu_fid_eq(old_fid, new_fid))
1190                        GOTO(out_unlock_old, rc);
1191
1192                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1193                     lu_fid_eq(new_fid, rr->rr_fid2))
1194                         GOTO(out_unlock_old, rc = -EINVAL);
1195
1196                 mdt_lock_reg_init(lh_newp, LCK_EX);
1197                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
1198                 if (IS_ERR(mnew))
1199                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
1200
1201                 if (mdt_object_obf(mnew)) {
1202                         mdt_object_put(info->mti_env, mnew);
1203                         GOTO(out_unlock_old, rc = -EPERM);
1204                 }
1205
1206                 rc = mdt_object_lock(info, mnew, lh_newp,
1207                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
1208                 if (rc != 0) {
1209                         mdt_object_put(info->mti_env, mnew);
1210                         GOTO(out_unlock_old, rc);
1211                 }
1212                 /* get and save version after locking */
1213                 mdt_version_get_save(info, mnew, 3);
1214                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
1215         } else if (rc != -EREMOTE && rc != -ENOENT) {
1216                 GOTO(out_unlock_old, rc);
1217         } else {
1218                 mdt_enoent_version_save(info, 3);
1219         }
1220
1221         /* step 5: rename it */
1222         mdt_reint_init_ma(info, ma);
1223
1224         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1225                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1226
1227
1228         /* Check if @dst is subdir of @src. */
1229         rc = mdt_rename_sanity(info, old_fid);
1230         if (rc)
1231                 GOTO(out_unlock_new, rc);
1232
1233         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1234                         mdt_object_child(mtgtdir), old_fid, &slname,
1235                         (mnew ? mdt_object_child(mnew) : NULL),
1236                         lname, ma);
1237
1238         /* handle last link of tgt object */
1239         if (rc == 0) {
1240                 mdt_counter_incr(req, LPROC_MDT_RENAME);
1241                 if (mnew)
1242                         mdt_handle_last_unlink(info, mnew, ma);
1243
1244                 mdt_rename_counter_tally(info, info->mti_mdt, req,
1245                                          msrcdir, mtgtdir);
1246         }
1247
1248         EXIT;
1249 out_unlock_new:
1250         if (mnew)
1251                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
1252 out_unlock_old:
1253         mdt_object_unlock_put(info, mold, lh_oldp, rc);
1254 out_unlock_target:
1255         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
1256 out_put_target:
1257         mdt_object_put(info->mti_env, mtgtdir);
1258 out_unlock_source:
1259         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1260 out_rename_lock:
1261         if (lustre_handle_is_used(&rename_lh))
1262                 mdt_rename_unlock(&rename_lh);
1263         return rc;
1264 }
1265
1266 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
1267                            struct mdt_lock_handle *lhc);
1268
1269 static mdt_reinter reinters[REINT_MAX] = {
1270         [REINT_SETATTR]  = mdt_reint_setattr,
1271         [REINT_CREATE]   = mdt_reint_create,
1272         [REINT_LINK]     = mdt_reint_link,
1273         [REINT_UNLINK]   = mdt_reint_unlink,
1274         [REINT_RENAME]   = mdt_reint_rename,
1275         [REINT_OPEN]     = mdt_reint_open,
1276         [REINT_SETXATTR] = mdt_reint_setxattr,
1277         [REINT_RMENTRY]  = mdt_reint_unlink
1278 };
1279
1280 int mdt_reint_rec(struct mdt_thread_info *info,
1281                   struct mdt_lock_handle *lhc)
1282 {
1283         int rc;
1284         ENTRY;
1285
1286         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
1287
1288         RETURN(rc);
1289 }