Whamcloud - gitweb
LU-1187 mdt: add sanity check for rename and link
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdt/mdt_reint.c
37  *
38  * Lustre Metadata Target (mdt) reintegration routines
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Huang Hua <huanghua@clusterfs.com>
44  * Author: Yury Umanets <umka@clusterfs.com>
45  */
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include "mdt_internal.h"
50
51 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
52                                      struct md_attr *ma)
53 {
54         ma->ma_need = MA_INODE;
55         ma->ma_valid = 0;
56 }
57
58 static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
59                                 struct mdt_object *object,
60                                 struct mdt_body *repbody)
61 {
62         ENTRY;
63
64         /* for cross-ref mkdir, mds capa has been fetched from remote obj, then
65          * we won't go to below*/
66         if (repbody->valid & OBD_MD_FLMDSCAPA)
67                 RETURN(rc);
68
69         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa &&
70             exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
71                 struct lustre_capa *capa;
72
73                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
74                 LASSERT(capa);
75                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
76                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
77                                  0);
78                 if (rc == 0)
79                         repbody->valid |= OBD_MD_FLMDSCAPA;
80         }
81
82         RETURN(rc);
83 }
84
85 /**
86  * Get version of object by fid.
87  *
88  * Return real version or ENOENT_VERSION if object doesn't exist
89  */
90 static void mdt_obj_version_get(struct mdt_thread_info *info,
91                                 struct mdt_object *o, __u64 *version)
92 {
93         LASSERT(o);
94         if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
95                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
96         else
97                 *version = ENOENT_VERSION;
98         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
99                PFID(mdt_object_fid(o)), *version);
100 }
101
102 /**
103  * Check version is correct.
104  *
105  * Should be called only during replay.
106  */
107 static int mdt_version_check(struct ptlrpc_request *req,
108                              __u64 version, int idx)
109 {
110         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
111         ENTRY;
112
113         if (!exp_connect_vbr(req->rq_export))
114                 RETURN(0);
115
116         LASSERT(req_is_replay(req));
117         /** VBR: version is checked always because costs nothing */
118         LASSERT(idx < PTLRPC_NUM_VERSIONS);
119         /** Sanity check for malformed buffers */
120         if (pre_ver == NULL) {
121                 CERROR("No versions in request buffer\n");
122                 spin_lock(&req->rq_export->exp_lock);
123                 req->rq_export->exp_vbr_failed = 1;
124                 spin_unlock(&req->rq_export->exp_lock);
125                 RETURN(-EOVERFLOW);
126         } else if (pre_ver[idx] != version) {
127                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
128                        pre_ver[idx], version);
129                 spin_lock(&req->rq_export->exp_lock);
130                 req->rq_export->exp_vbr_failed = 1;
131                 spin_unlock(&req->rq_export->exp_lock);
132                 RETURN(-EOVERFLOW);
133         }
134         RETURN(0);
135 }
136
137 /**
138  * Save pre-versions in reply.
139  */
140 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
141                              int idx)
142 {
143         __u64 *reply_ver;
144
145         if (!exp_connect_vbr(req->rq_export))
146                 return;
147
148         LASSERT(!req_is_replay(req));
149         LASSERT(req->rq_repmsg != NULL);
150         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
151         if (reply_ver)
152                 reply_ver[idx] = version;
153 }
154
155 /**
156  * Save enoent version, it is needed when it is obvious that object doesn't
157  * exist, e.g. child during create.
158  */
159 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
160 {
161         /* save version of file name for replay, it must be ENOENT here */
162         if (!req_is_replay(mdt_info_req(info))) {
163                 info->mti_ver[idx] = ENOENT_VERSION;
164                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
165         }
166 }
167
168 /**
169  * Get version from disk and save in reply buffer.
170  *
171  * Versions are saved in reply only during normal operations not replays.
172  */
173 void mdt_version_get_save(struct mdt_thread_info *info,
174                           struct mdt_object *mto, int idx)
175 {
176         /* don't save versions during replay */
177         if (!req_is_replay(mdt_info_req(info))) {
178                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
180         }
181 }
182
183 /**
184  * Get version from disk and check it, no save in reply.
185  */
186 int mdt_version_get_check(struct mdt_thread_info *info,
187                           struct mdt_object *mto, int idx)
188 {
189         /* only check versions during replay */
190         if (!req_is_replay(mdt_info_req(info)))
191                 return 0;
192
193         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
194         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
195 }
196
197 /**
198  * Get version from disk and check if recovery or just save.
199  */
200 int mdt_version_get_check_save(struct mdt_thread_info *info,
201                                struct mdt_object *mto, int idx)
202 {
203         int rc = 0;
204
205         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
206         if (req_is_replay(mdt_info_req(info)))
207                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
208                                        idx);
209         else
210                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
211         return rc;
212 }
213
214 /**
215  * Lookup with version checking.
216  *
217  * This checks version of 'name'. Many reint functions uses 'name' for child not
218  * FID, therefore we need to get object by name and check its version.
219  */
220 int mdt_lookup_version_check(struct mdt_thread_info *info,
221                              struct mdt_object *p, struct lu_name *lname,
222                              struct lu_fid *fid, int idx)
223 {
224         int rc, vbrc;
225
226         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
227                         &info->mti_spec);
228         /* Check version only during replay */
229         if (!req_is_replay(mdt_info_req(info)))
230                 return rc;
231
232         info->mti_ver[idx] = ENOENT_VERSION;
233         if (rc == 0) {
234                 struct mdt_object *child;
235                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
236                 if (likely(!IS_ERR(child))) {
237                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
238                         mdt_object_put(info->mti_env, child);
239                 }
240         }
241         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
242         return vbrc ? vbrc : rc;
243
244 }
245
246 /*
247  * VBR: we save three versions in reply:
248  * 0 - parent. Check that parent version is the same during replay.
249  * 1 - name. Version of 'name' if file exists with the same name or
250  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
251  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
252  * check.
253  */
254 static int mdt_md_create(struct mdt_thread_info *info)
255 {
256         struct mdt_device       *mdt = info->mti_mdt;
257         struct mdt_object       *parent;
258         struct mdt_object       *child;
259         struct mdt_lock_handle  *lh;
260         struct mdt_body         *repbody;
261         struct md_attr          *ma = &info->mti_attr;
262         struct mdt_reint_record *rr = &info->mti_rr;
263         struct lu_name          *lname;
264         int rc;
265         ENTRY;
266
267         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
268                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
269
270         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
271
272         lh = &info->mti_lh[MDT_LH_PARENT];
273         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
274
275         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
276                                       MDS_INODELOCK_UPDATE);
277         if (IS_ERR(parent))
278                 RETURN(PTR_ERR(parent));
279
280         if (mdt_object_obf(parent))
281                 GOTO(out_put_parent, rc = -EPERM);
282
283         rc = mdt_version_get_check_save(info, parent, 0);
284         if (rc)
285                 GOTO(out_put_parent, rc);
286
287         /*
288          * Check child name version during replay.
289          * During create replay a file may exist with same name.
290          */
291         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
292         rc = mdt_lookup_version_check(info, parent, lname,
293                                       &info->mti_tmp_fid1, 1);
294         if (rc == 0)
295                 GOTO(out_put_parent, rc = -EEXIST);
296
297         /* -ENOENT is expected here */
298         if (rc != -ENOENT)
299                 GOTO(out_put_parent, rc);
300
301         /* save version of file name for replay, it must be ENOENT here */
302         mdt_enoent_version_save(info, 1);
303
304         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
305         if (likely(!IS_ERR(child))) {
306                 struct md_object *next = mdt_object_child(parent);
307
308                 if (mdt_object_exists(child) < 0) {
309                         struct seq_server_site *ss;
310                         struct lu_ucred *uc  = mdt_ucred(info);
311
312                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
313                                 CERROR("%s: Creating remote dir is only "
314                                        "permitted for administrator: rc = %d\n",
315                                         mdt2obd_dev(mdt)->obd_name, -EPERM);
316                                 GOTO(out_put_child, rc = -EPERM);
317                         }
318
319                         ss = mdt_seq_site(mdt);
320                         if (ss->ss_node_id != 0 &&
321                             mdt->mdt_enable_remote_dir == 0) {
322                                 CERROR("%s: remote dir is only permitted on"
323                                        " MDT0 or set_param"
324                                        " mdt.*.enable_remote_dir=1\n",
325                                        mdt2obd_dev(mdt)->obd_name);
326                                 GOTO(out_put_child, rc = -EPERM);
327                         }
328                 }
329                 ma->ma_need = MA_INODE;
330                 ma->ma_valid = 0;
331                 /* capa for cross-ref will be stored here */
332                 ma->ma_capa = req_capsule_server_get(info->mti_pill,
333                                                      &RMF_CAPA1);
334                 LASSERT(ma->ma_capa);
335
336                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
337                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
338
339                 /* Version of child will be updated on disk. */
340                 info->mti_mos = child;
341                 rc = mdt_version_get_check_save(info, child, 2);
342                 if (rc)
343                         GOTO(out_put_child, rc);
344
345                 /* Let lower layer know current lock mode. */
346                 info->mti_spec.sp_cr_mode =
347                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
348
349                 /*
350                  * Do not perform lookup sanity check. We know that name does
351                  * not exist.
352                  */
353                 info->mti_spec.sp_cr_lookup = 0;
354                 info->mti_spec.sp_feat = &dt_directory_features;
355
356                 rc = mdo_create(info->mti_env, next, lname,
357                                 mdt_object_child(child),
358                                 &info->mti_spec, ma);
359                 if (rc == 0)
360                         rc = mdt_attr_get_complex(info, child, ma);
361
362                 if (rc == 0) {
363                         /* Return fid & attr to client. */
364                         if (ma->ma_valid & MA_INODE)
365                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
366                                                    mdt_object_fid(child));
367                 }
368 out_put_child:
369                 mdt_object_put(info->mti_env, child);
370         } else {
371                 rc = PTR_ERR(child);
372         }
373         mdt_create_pack_capa(info, rc, child, repbody);
374 out_put_parent:
375         mdt_object_unlock_put(info, parent, lh, rc);
376         RETURN(rc);
377 }
378
379 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
380                  struct md_attr *ma, int flags)
381 {
382         struct mdt_lock_handle  *lh;
383         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
384         __u64 lockpart = MDS_INODELOCK_UPDATE;
385         int rc;
386         ENTRY;
387
388         /* attr shouldn't be set on remote object */
389         LASSERT(mdt_object_exists(mo) >= 0);
390
391         lh = &info->mti_lh[MDT_LH_PARENT];
392         mdt_lock_reg_init(lh, LCK_PW);
393
394         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
395                 lockpart |= MDS_INODELOCK_LOOKUP;
396
397         rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
398         if (rc != 0)
399                 RETURN(rc);
400
401         if (mdt_object_exists(mo) == 0)
402                 GOTO(out_unlock, rc = -ENOENT);
403
404         /* all attrs are packed into mti_attr in unpack_setattr */
405         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
406                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
407
408         /* This is only for set ctime when rename's source is on remote MDS. */
409         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
410                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
411
412         /* VBR: update version if attr changed are important for recovery */
413         if (do_vbr) {
414                 /* update on-disk version of changed object */
415                 info->mti_mos = mo;
416                 rc = mdt_version_get_check_save(info, mo, 0);
417                 if (rc)
418                         GOTO(out_unlock, rc);
419         }
420
421         /* all attrs are packed into mti_attr in unpack_setattr */
422         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
423         if (rc != 0)
424                 GOTO(out_unlock, rc);
425
426         EXIT;
427 out_unlock:
428         mdt_object_unlock(info, mo, lh, rc);
429         return rc;
430 }
431
432 /**
433  * Check HSM flags and add HS_DIRTY flag if relevant.
434  *
435  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
436  * and is not RELEASED.
437  */
438 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
439                         struct md_attr *ma)
440 {
441         int rc;
442         ENTRY;
443
444         /* If the file was modified, add the dirty flag */
445         ma->ma_need = MA_HSM;
446         rc = mdt_attr_get_complex(info, mo, ma);
447         if (rc) {
448                 CERROR("file attribute read error for "DFID": %d.\n",
449                         PFID(lu_object_fid(&mo->mot_obj.mo_lu)), rc);
450                 RETURN(rc);
451         }
452
453         /* If an up2date copy exists in the backend, add dirty flag */
454         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
455             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
456
457                 ma->ma_hsm.mh_flags |= HS_DIRTY;
458                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
459                 if (rc) {
460                         CERROR("file attribute change error for "DFID": %d\n",
461                                 PFID(lu_object_fid(&mo->mot_obj.mo_lu)), rc);
462                         RETURN(rc);
463                 }
464         }
465
466         RETURN(rc);
467 }
468
469 static int mdt_reint_setattr(struct mdt_thread_info *info,
470                              struct mdt_lock_handle *lhc)
471 {
472         struct md_attr          *ma = &info->mti_attr;
473         struct mdt_reint_record *rr = &info->mti_rr;
474         struct ptlrpc_request   *req = mdt_info_req(info);
475         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
476         struct mdt_file_data    *mfd;
477         struct mdt_object       *mo;
478         struct mdt_body         *repbody;
479         int                      som_au, rc, rc2;
480         ENTRY;
481
482         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
483                   (unsigned int)ma->ma_attr.la_valid);
484
485         if (info->mti_dlm_req)
486                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
487
488         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
489         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
490         if (IS_ERR(mo))
491                 GOTO(out, rc = PTR_ERR(mo));
492
493         if (mdt_object_obf(mo))
494                 GOTO(out_put, rc = -EPERM);
495
496         /* start a log jounal handle if needed */
497         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
498                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
499                     (rr->rr_flags & MRF_OPEN_TRUNC)) {
500                         /* Check write access for the O_TRUNC case */
501                         if (mdt_write_read(mo) < 0)
502                                 GOTO(out_put, rc = -ETXTBSY);
503                 }
504         } else if (info->mti_ioepoch &&
505                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
506                 /* Truncate case. IOEpoch is opened. */
507                 rc = mdt_write_get(mo);
508                 if (rc)
509                         GOTO(out_put, rc);
510
511                 mfd = mdt_mfd_new();
512                 if (mfd == NULL) {
513                         mdt_write_put(mo);
514                         GOTO(out_put, rc = -ENOMEM);
515                 }
516
517                 mdt_ioepoch_open(info, mo, 0);
518                 repbody->ioepoch = mo->mot_ioepoch;
519
520                 mdt_object_get(info->mti_env, mo);
521                 mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
522                 mfd->mfd_object = mo;
523                 mfd->mfd_xid = req->rq_xid;
524
525                 spin_lock(&med->med_open_lock);
526                 cfs_list_add(&mfd->mfd_list, &med->med_open_head);
527                 spin_unlock(&med->med_open_lock);
528                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
529         }
530
531         som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
532         if (som_au) {
533                 /* SOM Attribute update case. Find the proper mfd and update
534                  * SOM attributes on the proper object. */
535                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
536                 LASSERT(info->mti_ioepoch);
537
538                 spin_lock(&med->med_open_lock);
539                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
540                 if (mfd == NULL) {
541                         spin_unlock(&med->med_open_lock);
542                         CDEBUG(D_INODE, "no handle for file close: "
543                                "fid = "DFID": cookie = "LPX64"\n",
544                                PFID(info->mti_rr.rr_fid1),
545                                info->mti_ioepoch->handle.cookie);
546                         GOTO(out_put, rc = -ESTALE);
547                 }
548                 LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
549                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
550
551                 class_handle_unhash(&mfd->mfd_handle);
552                 cfs_list_del_init(&mfd->mfd_list);
553                 spin_unlock(&med->med_open_lock);
554
555                 mdt_mfd_close(info, mfd);
556         } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
557                 LASSERT((ma->ma_valid & MA_LOV) == 0);
558                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
559                 if (rc)
560                         GOTO(out_put, rc);
561         } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
562                 struct lu_buf *buf  = &info->mti_buf;
563                 LASSERT(ma->ma_attr.la_valid == 0);
564                 buf->lb_buf = ma->ma_lmm;
565                 buf->lb_len = ma->ma_lmm_size;
566                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
567                                   buf, XATTR_NAME_LOV, 0);
568                 if (rc)
569                         GOTO(out_put, rc);
570         } else
571                 LBUG();
572
573         /* If file data is modified, add the dirty flag */
574         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
575                 rc = mdt_add_dirty_flag(info, mo, ma);
576
577         ma->ma_need = MA_INODE;
578         ma->ma_valid = 0;
579         rc = mdt_attr_get_complex(info, mo, ma);
580         if (rc != 0)
581                 GOTO(out_put, rc);
582
583         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
584
585         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
586             exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA &&
587             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
588             (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
589                 struct lustre_capa *capa;
590
591                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
592                 LASSERT(capa);
593                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
594                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
595                 if (rc)
596                         GOTO(out_put, rc);
597                 repbody->valid |= OBD_MD_FLOSSCAPA;
598         }
599
600         EXIT;
601 out_put:
602         mdt_object_put(info->mti_env, mo);
603 out:
604         if (rc == 0)
605                 mdt_counter_incr(req, LPROC_MDT_SETATTR);
606
607         mdt_client_compatibility(info);
608         rc2 = mdt_fix_reply(info);
609         if (rc == 0)
610                 rc = rc2;
611         return rc;
612 }
613
614 static int mdt_reint_create(struct mdt_thread_info *info,
615                             struct mdt_lock_handle *lhc)
616 {
617         struct ptlrpc_request   *req = mdt_info_req(info);
618         int                     rc;
619         ENTRY;
620
621         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
622                 RETURN(err_serious(-ESTALE));
623
624         if (info->mti_dlm_req)
625                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
626
627         LASSERT(info->mti_rr.rr_namelen > 0);
628         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
629         case S_IFDIR:
630                 mdt_counter_incr(req, LPROC_MDT_MKDIR);
631                 break;
632         case S_IFREG:
633         case S_IFLNK:
634         case S_IFCHR:
635         case S_IFBLK:
636         case S_IFIFO:
637         case S_IFSOCK:
638                 /* Special file should stay on the same node as parent. */
639                 mdt_counter_incr(req, LPROC_MDT_MKNOD);
640                 break;
641         default:
642                 CERROR("%s: Unsupported mode %o\n",
643                        mdt2obd_dev(info->mti_mdt)->obd_name,
644                        info->mti_attr.ma_attr.la_mode);
645                 RETURN(err_serious(-EOPNOTSUPP));
646         }
647
648         rc = mdt_md_create(info);
649         RETURN(rc);
650 }
651
652 /*
653  * VBR: save parent version in reply and child version getting by its name.
654  * Version of child is getting and checking during its lookup. If
655  */
656 static int mdt_reint_unlink(struct mdt_thread_info *info,
657                             struct mdt_lock_handle *lhc)
658 {
659         struct mdt_reint_record *rr = &info->mti_rr;
660         struct ptlrpc_request   *req = mdt_info_req(info);
661         struct md_attr          *ma = &info->mti_attr;
662         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
663         struct mdt_object       *mp;
664         struct mdt_object       *mc;
665         struct mdt_lock_handle  *parent_lh;
666         struct mdt_lock_handle  *child_lh;
667         struct lu_name          *lname;
668         int                      rc;
669         ENTRY;
670
671         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
672                   rr->rr_name);
673
674         if (info->mti_dlm_req)
675                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
676
677         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
678                 RETURN(err_serious(-ENOENT));
679
680         /*
681          * step 1: Found the parent.
682          */
683         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
684         if (IS_ERR(mp)) {
685                 rc = PTR_ERR(mp);
686                 GOTO(out, rc);
687         }
688
689         if (mdt_object_obf(mp))
690                 GOTO(put_parent, rc = -EPERM);
691
692         parent_lh = &info->mti_lh[MDT_LH_PARENT];
693         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
694         if (mdt_object_exists(mp) < 0) {
695                 mdt_lock_reg_init(parent_lh, LCK_EX);
696                 rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
697                                             parent_lh->mlh_rreg_mode,
698                                             MDS_INODELOCK_UPDATE);
699                 if (rc != ELDLM_OK)
700                         GOTO(put_parent, rc);
701
702         } else {
703                 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
704                                   rr->rr_namelen);
705                 rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
706                                      MDT_LOCAL_LOCK);
707                 if (rc)
708                         GOTO(put_parent, rc);
709
710                 rc = mdt_version_get_check_save(info, mp, 0);
711                 if (rc)
712                         GOTO(unlock_parent, rc);
713         }
714
715         /* step 2: find & lock the child */
716         /* lookup child object along with version checking */
717         fid_zero(child_fid);
718         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
719         if (rc != 0)
720                 GOTO(unlock_parent, rc);
721
722         mdt_reint_init_ma(info, ma);
723
724         /* We will lock the child regardless it is local or remote. No harm. */
725         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
726         if (IS_ERR(mc))
727                 GOTO(unlock_parent, rc = PTR_ERR(mc));
728
729         child_lh = &info->mti_lh[MDT_LH_CHILD];
730         mdt_lock_reg_init(child_lh, LCK_EX);
731         if (mdt_object_exists(mc) < 0) {
732                 struct mdt_body  *repbody;
733
734                 if (!fid_is_zero(rr->rr_fid2)) {
735                         CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n",
736                                mdt2obd_dev(info->mti_mdt)->obd_name,
737                                (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
738                         GOTO(unlock_parent, rc = -ENOENT);
739                 }
740                 CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n",
741                        mdt2obd_dev(info->mti_mdt)->obd_name,
742                        (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
743
744                 if (info->mti_spec.sp_rm_entry) {
745                         struct lu_ucred *uc  = mdt_ucred(info);
746
747                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
748                                 CERROR("%s: unlink remote entry is only "
749                                        "permitted for administrator: rc = %d\n",
750                                         mdt2obd_dev(info->mti_mdt)->obd_name,
751                                         -EPERM);
752                                 GOTO(unlock_parent, rc = -EPERM);
753                         }
754
755                         ma->ma_need = MA_INODE;
756                         ma->ma_valid = 0;
757                         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
758                         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
759                                         NULL, lname, ma);
760                         mdt_object_put(info->mti_env, mc);
761                         GOTO(unlock_parent, rc);
762                 }
763                 /* Revoke the LOOKUP lock of the remote object granted by
764                  * this MDT. Since the unlink will happen on another MDT,
765                  * it will release the LOOKUP lock right away. Then What
766                  * would happen if another client try to grab the LOOKUP
767                  * lock at the same time with unlink XXX */
768                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP,
769                                 MDT_CROSS_LOCK);
770                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
771                 LASSERT(repbody != NULL);
772                 repbody->fid1 = *mdt_object_fid(mc);
773                 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
774                 mdt_object_unlock_put(info, mc, child_lh, rc);
775                 GOTO(unlock_parent, rc = -EREMOTE);
776         } else if (info->mti_spec.sp_rm_entry) {
777                 CERROR("%s: lfs rmdir should not be used on local dir %s\n",
778                        mdt2obd_dev(info->mti_mdt)->obd_name,
779                        (char *)rr->rr_name);
780                 mdt_object_put(info->mti_env, mc);
781                 GOTO(unlock_parent, rc = -EPERM);
782         }
783
784         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
785                              MDT_CROSS_LOCK);
786         if (rc != 0) {
787                 mdt_object_put(info->mti_env, mc);
788                 GOTO(unlock_parent, rc);
789         }
790
791         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
792                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
793         /* save version when object is locked */
794         mdt_version_get_save(info, mc, 1);
795         /*
796          * Now we can only make sure we need MA_INODE, in mdd layer, will check
797          * whether need MA_LOV and MA_COOKIE.
798          */
799         ma->ma_need = MA_INODE;
800         ma->ma_valid = 0;
801         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
802         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
803                         mdt_object_child(mc), lname, ma);
804         if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
805                 rc = mdt_attr_get_complex(info, mc, ma);
806         if (rc == 0)
807                 mdt_handle_last_unlink(info, mc, ma);
808
809         if (ma->ma_valid & MA_INODE) {
810                 switch (ma->ma_attr.la_mode & S_IFMT) {
811                 case S_IFDIR:
812                         mdt_counter_incr(req, LPROC_MDT_RMDIR);
813                         break;
814                 case S_IFREG:
815                 case S_IFLNK:
816                 case S_IFCHR:
817                 case S_IFBLK:
818                 case S_IFIFO:
819                 case S_IFSOCK:
820                         mdt_counter_incr(req, LPROC_MDT_UNLINK);
821                         break;
822                 default:
823                         LASSERTF(0, "bad file type %o unlinking\n",
824                                  ma->ma_attr.la_mode);
825                 }
826         }
827
828         EXIT;
829
830         mdt_object_unlock_put(info, mc, child_lh, rc);
831 unlock_parent:
832         mdt_object_unlock(info, mp, parent_lh, rc);
833 put_parent:
834         mdt_object_put(info->mti_env, mp);
835 out:
836         return rc;
837 }
838
839 /*
840  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
841  * name.
842  */
843 static int mdt_reint_link(struct mdt_thread_info *info,
844                           struct mdt_lock_handle *lhc)
845 {
846         struct mdt_reint_record *rr = &info->mti_rr;
847         struct ptlrpc_request   *req = mdt_info_req(info);
848         struct md_attr          *ma = &info->mti_attr;
849         struct mdt_object       *ms;
850         struct mdt_object       *mp;
851         struct mdt_lock_handle  *lhs;
852         struct mdt_lock_handle  *lhp;
853         struct lu_name          *lname;
854         int rc;
855         ENTRY;
856
857         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
858                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
859
860         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
861                 RETURN(err_serious(-ENOENT));
862
863         if (info->mti_dlm_req)
864                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
865
866         /* Invalid case so return error immediately instead of
867          * processing it */
868         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
869                 RETURN(-EPERM);
870
871         /* step 1: find & lock the target parent dir */
872         lhp = &info->mti_lh[MDT_LH_PARENT];
873         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
874                           rr->rr_namelen);
875         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
876                                   MDS_INODELOCK_UPDATE);
877         if (IS_ERR(mp))
878                 RETURN(PTR_ERR(mp));
879
880         if (mdt_object_obf(mp))
881                 GOTO(out_unlock_parent, rc = -EPERM);
882
883         rc = mdt_version_get_check_save(info, mp, 0);
884         if (rc)
885                 GOTO(out_unlock_parent, rc);
886
887         /* step 2: find & lock the source */
888         lhs = &info->mti_lh[MDT_LH_CHILD];
889         mdt_lock_reg_init(lhs, LCK_EX);
890
891         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
892         if (IS_ERR(ms))
893                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
894
895         if (mdt_object_exists(ms) < 0) {
896                 mdt_object_put(info->mti_env, ms);
897                 CERROR("Target directory "DFID" is on another MDT\n",
898                         PFID(rr->rr_fid1));
899                 GOTO(out_unlock_parent, rc = -EXDEV);
900         }
901
902         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
903                             MDT_CROSS_LOCK);
904         if (rc != 0) {
905                 mdt_object_put(info->mti_env, ms);
906                 GOTO(out_unlock_parent, rc);
907         }
908
909         /* step 3: link it */
910         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
911                        OBD_FAIL_MDS_REINT_LINK_WRITE);
912
913         info->mti_mos = ms;
914         rc = mdt_version_get_check_save(info, ms, 1);
915         if (rc)
916                 GOTO(out_unlock_child, rc);
917
918         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
919         /** check target version by name during replay */
920         rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2);
921         if (rc != 0 && rc != -ENOENT)
922                 GOTO(out_unlock_child, rc);
923         /* save version of file name for replay, it must be ENOENT here */
924         if (!req_is_replay(mdt_info_req(info))) {
925                 info->mti_ver[2] = ENOENT_VERSION;
926                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
927         }
928
929         rc = mdo_link(info->mti_env, mdt_object_child(mp),
930                       mdt_object_child(ms), lname, ma);
931
932         if (rc == 0)
933                 mdt_counter_incr(req, LPROC_MDT_LINK);
934
935         EXIT;
936 out_unlock_child:
937         mdt_object_unlock_put(info, ms, lhs, rc);
938 out_unlock_parent:
939         mdt_object_unlock_put(info, mp, lhp, rc);
940         return rc;
941 }
942 /**
943  * lock the part of the directory according to the hash of the name
944  * (lh->mlh_pdo_hash) in parallel directory lock.
945  */
946 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
947                        struct mdt_lock_handle *lh,
948                        struct mdt_object *obj, __u64 ibits)
949 {
950         struct ldlm_res_id *res_id = &info->mti_res_id;
951         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
952         ldlm_policy_data_t *policy = &info->mti_policy;
953         int rc;
954
955         /*
956          * Finish res_id initializing by name hash marking part of
957          * directory which is taking modification.
958          */
959         LASSERT(lh->mlh_pdo_hash != 0);
960         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
961         memset(policy, 0, sizeof(*policy));
962         policy->l_inodebits.bits = ibits;
963         /*
964          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
965          * going to be sent to client. If it is - mdt_intent_policy() path will
966          * fix it up and turn FL_LOCAL flag off.
967          */
968         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
969                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
970                           &info->mti_exp->exp_handle.h_cookie);
971         return rc;
972 }
973
974 static int mdt_rename_lock(struct mdt_thread_info *info,
975                            struct lustre_handle *lh)
976 {
977         struct ldlm_namespace   *ns = info->mti_mdt->mdt_namespace;
978         ldlm_policy_data_t      *policy = &info->mti_policy;
979         struct ldlm_res_id      *res_id = &info->mti_res_id;
980         __u64                   flags = 0;
981         int rc;
982         ENTRY;
983
984         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
985
986         memset(policy, 0, sizeof *policy);
987         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
988 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 4, 53, 0)
989         /* In phase I, we will not do cross-rename, so local BFL lock would
990          * be enough
991          */
992         flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
993         /*
994          * Current node is controller, that is mdt0, where we should
995          * take BFL lock.
996          */
997         rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
998                                     LCK_EX, &flags, ldlm_blocking_ast,
999                                     ldlm_completion_ast, NULL, NULL, 0,
1000                                     LVB_T_NONE,
1001                                     &info->mti_exp->exp_handle.h_cookie,
1002                                     lh);
1003 #else
1004 #warning "Local rename lock is invalid for DNE phase II."
1005 #endif
1006         RETURN(rc);
1007 }
1008
1009 static void mdt_rename_unlock(struct lustre_handle *lh)
1010 {
1011         ENTRY;
1012         LASSERT(lustre_handle_is_used(lh));
1013         ldlm_lock_decref(lh, LCK_EX);
1014         EXIT;
1015 }
1016
1017 /*
1018  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
1019  * target. Source should not be ancestor of target dir. May be other rename
1020  * checks can be moved here later.
1021  */
1022 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
1023 {
1024         struct mdt_reint_record *rr = &info->mti_rr;
1025         struct lu_fid dst_fid = *rr->rr_fid2;
1026         struct mdt_object *dst;
1027         int rc = 0;
1028         ENTRY;
1029
1030         do {
1031                 LASSERT(fid_is_sane(&dst_fid));
1032                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
1033                 if (!IS_ERR(dst)) {
1034                         rc = mdo_is_subdir(info->mti_env,
1035                                            mdt_object_child(dst), fid,
1036                                            &dst_fid);
1037                         mdt_object_put(info->mti_env, dst);
1038                         if (rc != -EREMOTE && rc < 0) {
1039                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
1040                         } else {
1041                                 /* check the found fid */
1042                                 if (lu_fid_eq(&dst_fid, fid))
1043                                         rc = -EINVAL;
1044                         }
1045                 } else {
1046                         rc = PTR_ERR(dst);
1047                 }
1048         } while (rc == -EREMOTE);
1049
1050         RETURN(rc);
1051 }
1052
1053 /*
1054  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
1055  * 2 - src child; 3 - tgt child.
1056  * Update on disk version of src child.
1057  */
1058 static int mdt_reint_rename(struct mdt_thread_info *info,
1059                             struct mdt_lock_handle *lhc)
1060 {
1061         struct mdt_reint_record *rr = &info->mti_rr;
1062         struct md_attr          *ma = &info->mti_attr;
1063         struct ptlrpc_request   *req = mdt_info_req(info);
1064         struct mdt_object       *msrcdir;
1065         struct mdt_object       *mtgtdir;
1066         struct mdt_object       *mold;
1067         struct mdt_object       *mnew = NULL;
1068         struct mdt_lock_handle  *lh_srcdirp;
1069         struct mdt_lock_handle  *lh_tgtdirp;
1070         struct mdt_lock_handle  *lh_oldp;
1071         struct mdt_lock_handle  *lh_newp;
1072         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1073         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
1074         struct lustre_handle     rename_lh = { 0 };
1075         struct lu_name           slname = { 0 };
1076         struct lu_name          *lname;
1077         int                      rc;
1078         ENTRY;
1079
1080         if (info->mti_dlm_req)
1081                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
1082
1083         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
1084                   PFID(rr->rr_fid1), rr->rr_name,
1085                   PFID(rr->rr_fid2), rr->rr_tgt);
1086
1087         rc = mdt_rename_lock(info, &rename_lh);
1088         if (rc) {
1089                 CERROR("Can't lock FS for rename, rc %d\n", rc);
1090                 RETURN(rc);
1091         }
1092
1093         lh_newp = &info->mti_lh[MDT_LH_NEW];
1094
1095         /* step 1: lock the source dir. */
1096         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
1097         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
1098                           rr->rr_namelen);
1099         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
1100                                        MDS_INODELOCK_UPDATE);
1101         if (IS_ERR(msrcdir))
1102                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
1103
1104         if (mdt_object_obf(msrcdir))
1105                 GOTO(out_unlock_source, rc = -EPERM);
1106
1107         rc = mdt_version_get_check_save(info, msrcdir, 0);
1108         if (rc)
1109                 GOTO(out_unlock_source, rc);
1110
1111         /* step 2: find & lock the target dir. */
1112         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
1113         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
1114                           rr->rr_tgtlen);
1115         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1116                 mdt_object_get(info->mti_env, msrcdir);
1117                 mtgtdir = msrcdir;
1118                 if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
1119                          rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
1120                                                  MDS_INODELOCK_UPDATE);
1121                          if (rc)
1122                                  GOTO(out_unlock_source, rc);
1123                          OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
1124                 }
1125         } else {
1126                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
1127                                           rr->rr_fid2);
1128                 if (IS_ERR(mtgtdir))
1129                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
1130
1131                 if (mdt_object_obf(mtgtdir))
1132                         GOTO(out_put_target, rc = -EPERM);
1133
1134                 /* check early, the real version will be saved after locking */
1135                 rc = mdt_version_get_check(info, mtgtdir, 1);
1136                 if (rc)
1137                         GOTO(out_put_target, rc);
1138
1139                 rc = mdt_object_exists(mtgtdir);
1140                 if (rc == 0) {
1141                         GOTO(out_put_target, rc = -ESTALE);
1142                 } else if (rc > 0) {
1143                         /* we lock the target dir if it is local */
1144                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
1145                                              MDS_INODELOCK_UPDATE,
1146                                              MDT_LOCAL_LOCK);
1147                         if (rc != 0)
1148                                 GOTO(out_put_target, rc);
1149                         /* get and save correct version after locking */
1150                         mdt_version_get_save(info, mtgtdir, 1);
1151                 } else if (rc < 0) {
1152                         CERROR("Source dir "DFID" target dir "DFID
1153                                "on different MDTs\n", PFID(rr->rr_fid1),
1154                                PFID(rr->rr_fid2));
1155                         GOTO(out_put_target, rc = -EXDEV);
1156                 }
1157         }
1158
1159         /* step 3: find & lock the old object. */
1160         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
1161         mdt_name_copy(&slname, lname);
1162         fid_zero(old_fid);
1163         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
1164         if (rc != 0)
1165                 GOTO(out_unlock_target, rc);
1166
1167         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1168                 GOTO(out_unlock_target, rc = -EINVAL);
1169
1170         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
1171         if (IS_ERR(mold))
1172                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
1173         if (mdt_object_exists(mold) < 0) {
1174                 mdt_object_put(info->mti_env, mold);
1175                 CERROR("Source child "DFID" is on another MDT\n", PFID(old_fid));
1176                 GOTO(out_unlock_target, rc = -EXDEV);
1177         }
1178
1179         if (mdt_object_obf(mold)) {
1180                 mdt_object_put(info->mti_env, mold);
1181                 GOTO(out_unlock_target, rc = -EPERM);
1182         }
1183
1184         lh_oldp = &info->mti_lh[MDT_LH_OLD];
1185         mdt_lock_reg_init(lh_oldp, LCK_EX);
1186         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
1187                              MDT_CROSS_LOCK);
1188         if (rc != 0) {
1189                 mdt_object_put(info->mti_env, mold);
1190                 GOTO(out_unlock_target, rc);
1191         }
1192
1193         info->mti_mos = mold;
1194         /* save version after locking */
1195         mdt_version_get_save(info, mold, 2);
1196         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
1197
1198         /* step 4: find & lock the new object. */
1199         /* new target object may not exist now */
1200         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
1201         /* lookup with version checking */
1202         fid_zero(new_fid);
1203         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
1204         if (rc == 0) {
1205                 /* the new_fid should have been filled at this moment */
1206                 if (lu_fid_eq(old_fid, new_fid))
1207                        GOTO(out_unlock_old, rc);
1208
1209                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1210                     lu_fid_eq(new_fid, rr->rr_fid2))
1211                         GOTO(out_unlock_old, rc = -EINVAL);
1212
1213                 mdt_lock_reg_init(lh_newp, LCK_EX);
1214                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
1215                 if (IS_ERR(mnew))
1216                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
1217
1218                 if (mdt_object_obf(mnew)) {
1219                         mdt_object_put(info->mti_env, mnew);
1220                         GOTO(out_unlock_old, rc = -EPERM);
1221                 }
1222
1223                 if (mdt_object_exists(mnew) < 0) {
1224                         mdt_object_put(info->mti_env, mnew);
1225                         CERROR("Source child "DFID" is on another MDT\n",
1226                                PFID(new_fid));
1227                         GOTO(out_unlock_old, rc = -EXDEV);
1228                 }
1229
1230                 rc = mdt_object_lock(info, mnew, lh_newp,
1231                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
1232                 if (rc != 0) {
1233                         mdt_object_put(info->mti_env, mnew);
1234                         GOTO(out_unlock_old, rc);
1235                 }
1236                 /* get and save version after locking */
1237                 mdt_version_get_save(info, mnew, 3);
1238                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
1239         } else if (rc != -EREMOTE && rc != -ENOENT) {
1240                 GOTO(out_unlock_old, rc);
1241         } else {
1242                 mdt_enoent_version_save(info, 3);
1243         }
1244
1245         /* step 5: rename it */
1246         mdt_reint_init_ma(info, ma);
1247
1248         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1249                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1250
1251
1252         /* Check if @dst is subdir of @src. */
1253         rc = mdt_rename_sanity(info, old_fid);
1254         if (rc)
1255                 GOTO(out_unlock_new, rc);
1256
1257         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1258                         mdt_object_child(mtgtdir), old_fid, &slname,
1259                         (mnew ? mdt_object_child(mnew) : NULL),
1260                         lname, ma);
1261
1262         /* handle last link of tgt object */
1263         if (rc == 0) {
1264                 mdt_counter_incr(req, LPROC_MDT_RENAME);
1265                 if (mnew)
1266                         mdt_handle_last_unlink(info, mnew, ma);
1267
1268                 mdt_rename_counter_tally(info, info->mti_mdt, req,
1269                                          msrcdir, mtgtdir);
1270         }
1271
1272         EXIT;
1273 out_unlock_new:
1274         if (mnew)
1275                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
1276 out_unlock_old:
1277         mdt_object_unlock_put(info, mold, lh_oldp, rc);
1278 out_unlock_target:
1279         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
1280 out_put_target:
1281         mdt_object_put(info->mti_env, mtgtdir);
1282 out_unlock_source:
1283         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1284 out_rename_lock:
1285         if (lustre_handle_is_used(&rename_lh))
1286                 mdt_rename_unlock(&rename_lh);
1287         return rc;
1288 }
1289
1290 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
1291                            struct mdt_lock_handle *lhc);
1292
1293 static mdt_reinter reinters[REINT_MAX] = {
1294         [REINT_SETATTR]  = mdt_reint_setattr,
1295         [REINT_CREATE]   = mdt_reint_create,
1296         [REINT_LINK]     = mdt_reint_link,
1297         [REINT_UNLINK]   = mdt_reint_unlink,
1298         [REINT_RENAME]   = mdt_reint_rename,
1299         [REINT_OPEN]     = mdt_reint_open,
1300         [REINT_SETXATTR] = mdt_reint_setxattr,
1301         [REINT_RMENTRY]  = mdt_reint_unlink
1302 };
1303
1304 int mdt_reint_rec(struct mdt_thread_info *info,
1305                   struct mdt_lock_handle *lhc)
1306 {
1307         int rc;
1308         ENTRY;
1309
1310         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
1311
1312         RETURN(rc);
1313 }