Whamcloud - gitweb
LU-389 update wirecheck for master
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdt/mdt_reint.c
40  *
41  * Lustre Metadata Target (mdt) reintegration routines
42  *
43  * Author: Peter Braam <braam@clusterfs.com>
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Phil Schwan <phil@clusterfs.com>
46  * Author: Huang Hua <huanghua@clusterfs.com>
47  * Author: Yury Umanets <umka@clusterfs.com>
48  */
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53 #define DEBUG_SUBSYSTEM S_MDS
54
55 #include "mdt_internal.h"
56 #include "lu_time.h"
57
58 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
59                                      struct md_attr *ma)
60 {
61         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
62         ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
63                                                &RMF_MDT_MD, RCL_SERVER);
64
65         ma->ma_cookie = req_capsule_server_get(info->mti_pill,
66                                                &RMF_LOGCOOKIES);
67         ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
68                                                   &RMF_LOGCOOKIES,
69                                                   RCL_SERVER);
70
71         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
72         ma->ma_valid = 0;
73 }
74
75 static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
76                                 struct mdt_object *object,
77                                 struct mdt_body *repbody)
78 {
79         ENTRY;
80
81         /* for cross-ref mkdir, mds capa has been fetched from remote obj, then
82          * we won't go to below*/
83         if (repbody->valid & OBD_MD_FLMDSCAPA)
84                 RETURN(rc);
85
86         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa &&
87             info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
88                 struct lustre_capa *capa;
89
90                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
91                 LASSERT(capa);
92                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
93                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
94                                  0);
95                 if (rc == 0)
96                         repbody->valid |= OBD_MD_FLMDSCAPA;
97         }
98
99         RETURN(rc);
100 }
101
102 /**
103  * Get version of object by fid.
104  *
105  * Return real version or ENOENT_VERSION if object doesn't exist
106  */
107 static void mdt_obj_version_get(struct mdt_thread_info *info,
108                                 struct mdt_object *o, __u64 *version)
109 {
110         LASSERT(o);
111         LASSERT(mdt_object_exists(o) >= 0);
112         if (mdt_object_exists(o) > 0)
113                 *version = mo_version_get(info->mti_env, mdt_object_child(o));
114         else
115                 *version = ENOENT_VERSION;
116         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
117                PFID(mdt_object_fid(o)), *version);
118 }
119
120 /**
121  * Check version is correct.
122  *
123  * Should be called only during replay.
124  */
125 static int mdt_version_check(struct ptlrpc_request *req,
126                              __u64 version, int idx)
127 {
128         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
129         ENTRY;
130
131         if (!exp_connect_vbr(req->rq_export))
132                 RETURN(0);
133
134         LASSERT(req_is_replay(req));
135         /** VBR: version is checked always because costs nothing */
136         LASSERT(idx < PTLRPC_NUM_VERSIONS);
137         /** Sanity check for malformed buffers */
138         if (pre_ver == NULL) {
139                 CERROR("No versions in request buffer\n");
140                 cfs_spin_lock(&req->rq_export->exp_lock);
141                 req->rq_export->exp_vbr_failed = 1;
142                 cfs_spin_unlock(&req->rq_export->exp_lock);
143                 RETURN(-EOVERFLOW);
144         } else if (pre_ver[idx] != version) {
145                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
146                        pre_ver[idx], version);
147                 cfs_spin_lock(&req->rq_export->exp_lock);
148                 req->rq_export->exp_vbr_failed = 1;
149                 cfs_spin_unlock(&req->rq_export->exp_lock);
150                 RETURN(-EOVERFLOW);
151         }
152         RETURN(0);
153 }
154
155 /**
156  * Save pre-versions in reply.
157  */
158 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
159                              int idx)
160 {
161         __u64 *reply_ver;
162
163         if (!exp_connect_vbr(req->rq_export))
164                 return;
165
166         LASSERT(!req_is_replay(req));
167         LASSERT(req->rq_repmsg != NULL);
168         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
169         if (reply_ver)
170                 reply_ver[idx] = version;
171 }
172
173 /**
174  * Save enoent version, it is needed when it is obvious that object doesn't
175  * exist, e.g. child during create.
176  */
177 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
178 {
179         /* save version of file name for replay, it must be ENOENT here */
180         if (!req_is_replay(mdt_info_req(info))) {
181                 info->mti_ver[idx] = ENOENT_VERSION;
182                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
183         }
184 }
185
186 /**
187  * Get version from disk and save in reply buffer.
188  *
189  * Versions are saved in reply only during normal operations not replays.
190  */
191 void mdt_version_get_save(struct mdt_thread_info *info,
192                           struct mdt_object *mto, int idx)
193 {
194         /* don't save versions during replay */
195         if (!req_is_replay(mdt_info_req(info))) {
196                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
197                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
198         }
199 }
200
201 /**
202  * Get version from disk and check it, no save in reply.
203  */
204 int mdt_version_get_check(struct mdt_thread_info *info,
205                           struct mdt_object *mto, int idx)
206 {
207         /* only check versions during replay */
208         if (!req_is_replay(mdt_info_req(info)))
209                 return 0;
210
211         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
212         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
213 }
214
215 /**
216  * Get version from disk and check if recovery or just save.
217  */
218 int mdt_version_get_check_save(struct mdt_thread_info *info,
219                                struct mdt_object *mto, int idx)
220 {
221         int rc = 0;
222
223         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
224         if (req_is_replay(mdt_info_req(info)))
225                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
226                                        idx);
227         else
228                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
229         return rc;
230 }
231
232 /**
233  * Lookup with version checking.
234  *
235  * This checks version of 'name'. Many reint functions uses 'name' for child not
236  * FID, therefore we need to get object by name and check its version.
237  */
238 int mdt_lookup_version_check(struct mdt_thread_info *info,
239                              struct mdt_object *p, struct lu_name *lname,
240                              struct lu_fid *fid, int idx)
241 {
242         int rc, vbrc;
243
244         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
245                         &info->mti_spec);
246         /* Check version only during replay */
247         if (!req_is_replay(mdt_info_req(info)))
248                 return rc;
249
250         info->mti_ver[idx] = ENOENT_VERSION;
251         if (rc == 0) {
252                 struct mdt_object *child;
253                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
254                 if (likely(!IS_ERR(child))) {
255                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
256                         mdt_object_put(info->mti_env, child);
257                 }
258         }
259         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
260         return vbrc ? vbrc : rc;
261
262 }
263
264 /*
265  * VBR: we save three versions in reply:
266  * 0 - parent. Check that parent version is the same during replay.
267  * 1 - name. Version of 'name' if file exists with the same name or
268  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
269  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
270  * check.
271  */
272 static int mdt_md_create(struct mdt_thread_info *info)
273 {
274         struct mdt_device       *mdt = info->mti_mdt;
275         struct mdt_object       *parent;
276         struct mdt_object       *child;
277         struct mdt_lock_handle  *lh;
278         struct mdt_body         *repbody;
279         struct md_attr          *ma = &info->mti_attr;
280         struct mdt_reint_record *rr = &info->mti_rr;
281         struct lu_name          *lname;
282         int rc;
283         ENTRY;
284
285         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
286                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
287
288         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
289
290         lh = &info->mti_lh[MDT_LH_PARENT];
291         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
292
293         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
294                                       MDS_INODELOCK_UPDATE);
295         if (IS_ERR(parent))
296                 RETURN(PTR_ERR(parent));
297
298         rc = mdt_version_get_check_save(info, parent, 0);
299         if (rc)
300                 GOTO(out_put_parent, rc);
301
302         /*
303          * Check child name version during replay.
304          * During create replay a file may exist with same name.
305          */
306         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
307         rc = mdt_lookup_version_check(info, parent, lname,
308                                       &info->mti_tmp_fid1, 1);
309         /* -ENOENT is expected here */
310         if (rc != 0 && rc != -ENOENT)
311                 GOTO(out_put_parent, rc);
312
313         /* save version of file name for replay, it must be ENOENT here */
314         mdt_enoent_version_save(info, 1);
315
316         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
317         if (likely(!IS_ERR(child))) {
318                 struct md_object *next = mdt_object_child(parent);
319
320                 ma->ma_need = MA_INODE;
321                 ma->ma_valid = 0;
322                 /* capa for cross-ref will be stored here */
323                 ma->ma_capa = req_capsule_server_get(info->mti_pill,
324                                                      &RMF_CAPA1);
325                 LASSERT(ma->ma_capa);
326
327                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
328                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
329
330                 /* Version of child will be updated on disk. */
331                 info->mti_mos = child;
332                 rc = mdt_version_get_check_save(info, child, 2);
333                 if (rc)
334                         GOTO(out_put_child, rc);
335
336                 /* Let lower layer know current lock mode. */
337                 info->mti_spec.sp_cr_mode =
338                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
339
340                 /*
341                  * Do perform lookup sanity check. We do not know if name exists
342                  * or not.
343                  */
344                 info->mti_spec.sp_cr_lookup = 1;
345                 info->mti_spec.sp_feat = &dt_directory_features;
346
347                 rc = mdo_create(info->mti_env, next, lname,
348                                 mdt_object_child(child),
349                                 &info->mti_spec, ma);
350                 if (rc == 0) {
351                         /* Return fid & attr to client. */
352                         if (ma->ma_valid & MA_INODE)
353                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
354                                                    mdt_object_fid(child));
355                 }
356 out_put_child:
357                 mdt_object_put(info->mti_env, child);
358         } else {
359                 rc = PTR_ERR(child);
360         }
361         mdt_create_pack_capa(info, rc, child, repbody);
362 out_put_parent:
363         mdt_object_unlock_put(info, parent, lh, rc);
364         RETURN(rc);
365 }
366
367 /* Partial request to create object only */
368 static int mdt_md_mkobj(struct mdt_thread_info *info)
369 {
370         struct mdt_device      *mdt = info->mti_mdt;
371         struct mdt_object      *o;
372         struct mdt_body        *repbody;
373         struct md_attr         *ma = &info->mti_attr;
374         int rc;
375         ENTRY;
376
377         DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
378                   PFID(info->mti_rr.rr_fid2));
379
380         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
381
382         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
383         if (!IS_ERR(o)) {
384                 struct md_object *next = mdt_object_child(o);
385
386                 ma->ma_need = MA_INODE;
387                 ma->ma_valid = 0;
388
389                 /*
390                  * Cross-ref create can encounter already created obj in case of
391                  * recovery, just get attr in that case.
392                  */
393                 if (mdt_object_exists(o) == 1) {
394                         rc = mo_attr_get(info->mti_env, next, ma);
395                 } else {
396                         /*
397                          * Here, NO permission check for object_create,
398                          * such check has been done on the original MDS.
399                          */
400                         rc = mo_object_create(info->mti_env, next,
401                                               &info->mti_spec, ma);
402                 }
403                 if (rc == 0) {
404                         /* Return fid & attr to client. */
405                         if (ma->ma_valid & MA_INODE)
406                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
407                                                    mdt_object_fid(o));
408                 }
409                 mdt_object_put(info->mti_env, o);
410         } else
411                 rc = PTR_ERR(o);
412
413         mdt_create_pack_capa(info, rc, o, repbody);
414         RETURN(rc);
415 }
416
417 /* In the raw-setattr case, we lock the child inode.
418  * In the write-back case or if being called from open,
419  *               the client holds a lock already.
420  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
421  * mdt_setattr_unpack()) flag to tell these cases apart. */
422 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
423                  struct md_attr *ma, int flags)
424 {
425         struct mdt_lock_handle  *lh;
426         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
427         int rc;
428         ENTRY;
429
430         /* attr shouldn't be set on remote object */
431         LASSERT(mdt_object_exists(mo) >= 0);
432
433         lh = &info->mti_lh[MDT_LH_PARENT];
434         mdt_lock_reg_init(lh, LCK_PW);
435
436         if (!(flags & MRF_SETATTR_LOCKED)) {
437                 __u64 lockpart = MDS_INODELOCK_UPDATE;
438                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
439                         lockpart |= MDS_INODELOCK_LOOKUP;
440
441                 rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
442                 if (rc != 0)
443                         RETURN(rc);
444         }
445
446         if (mdt_object_exists(mo) == 0)
447                 GOTO(out_unlock, rc = -ENOENT);
448
449         /* all attrs are packed into mti_attr in unpack_setattr */
450         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
451                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
452
453         /* This is only for set ctime when rename's source is on remote MDS. */
454         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
455                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
456
457         /* VBR: update version if attr changed are important for recovery */
458         if (do_vbr) {
459                 /* update on-disk version of changed object */
460                 info->mti_mos = mo;
461                 rc = mdt_version_get_check_save(info, mo, 0);
462                 if (rc)
463                         GOTO(out_unlock, rc);
464         }
465
466         /* all attrs are packed into mti_attr in unpack_setattr */
467         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
468         if (rc != 0)
469                 GOTO(out_unlock, rc);
470
471         EXIT;
472 out_unlock:
473         mdt_object_unlock(info, mo, lh, rc);
474         return rc;
475 }
476
477 static int mdt_reint_setattr(struct mdt_thread_info *info,
478                              struct mdt_lock_handle *lhc)
479 {
480         struct md_attr          *ma = &info->mti_attr;
481         struct mdt_reint_record *rr = &info->mti_rr;
482         struct ptlrpc_request   *req = mdt_info_req(info);
483         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
484         struct mdt_file_data    *mfd;
485         struct mdt_object       *mo;
486         struct md_object        *next;
487         struct mdt_body         *repbody;
488         int                      som_au, rc;
489         ENTRY;
490
491         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
492                   (unsigned int)ma->ma_attr.la_valid);
493
494         if (info->mti_dlm_req)
495                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
496
497         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
498         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
499         if (IS_ERR(mo))
500                 GOTO(out, rc = PTR_ERR(mo));
501
502         /* start a log jounal handle if needed */
503         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
504                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
505                     (rr->rr_flags & MRF_SETATTR_LOCKED)) {
506                         /* Check write access for the O_TRUNC case */
507                         if (mdt_write_read(mo) < 0)
508                                 GOTO(out_put, rc = -ETXTBSY);
509                 }
510         } else if (info->mti_ioepoch &&
511                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
512                 /* Truncate case. IOEpoch is opened. */
513                 rc = mdt_write_get(mo);
514                 if (rc)
515                         GOTO(out_put, rc);
516
517                 mfd = mdt_mfd_new();
518                 if (mfd == NULL) {
519                         mdt_write_put(mo);
520                         GOTO(out_put, rc = -ENOMEM);
521                 }
522
523                 mdt_ioepoch_open(info, mo, 0);
524                 repbody->ioepoch = mo->mot_ioepoch;
525
526                 mdt_object_get(info->mti_env, mo);
527                 mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
528                 mfd->mfd_object = mo;
529                 mfd->mfd_xid = req->rq_xid;
530
531                 cfs_spin_lock(&med->med_open_lock);
532                 cfs_list_add(&mfd->mfd_list, &med->med_open_head);
533                 cfs_spin_unlock(&med->med_open_lock);
534                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
535         }
536
537         som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
538         if (som_au) {
539                 /* SOM Attribute update case. Find the proper mfd and update
540                  * SOM attributes on the proper object. */
541                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
542                 LASSERT(info->mti_ioepoch);
543
544                 cfs_spin_lock(&med->med_open_lock);
545                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
546                 if (mfd == NULL) {
547                         cfs_spin_unlock(&med->med_open_lock);
548                         CDEBUG(D_INODE, "no handle for file close: "
549                                "fid = "DFID": cookie = "LPX64"\n",
550                                PFID(info->mti_rr.rr_fid1),
551                                info->mti_ioepoch->handle.cookie);
552                         GOTO(out_put, rc = -ESTALE);
553                 }
554                 LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
555                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
556
557                 class_handle_unhash(&mfd->mfd_handle);
558                 cfs_list_del_init(&mfd->mfd_list);
559                 cfs_spin_unlock(&med->med_open_lock);
560
561                 /* Close the found mfd, update attributes. */
562                 ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
563                 OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
564                 if (ma->ma_lmm == NULL)
565                         GOTO(out_put, rc = -ENOMEM);
566
567                 mdt_mfd_close(info, mfd);
568
569                 OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
570         } else {
571                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
572                 if (rc)
573                         GOTO(out_put, rc);
574         }
575
576         ma->ma_need = MA_INODE;
577         ma->ma_valid = 0;
578         next = mdt_object_child(mo);
579         rc = mo_attr_get(info->mti_env, next, ma);
580         if (rc != 0)
581                 GOTO(out_put, rc);
582
583         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
584
585         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
586             info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
587             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
588             (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
589                 struct lustre_capa *capa;
590
591                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
592                 LASSERT(capa);
593                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
594                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
595                 if (rc)
596                         GOTO(out_put, rc);
597                 repbody->valid |= OBD_MD_FLOSSCAPA;
598         }
599
600         EXIT;
601 out_put:
602         mdt_object_put(info->mti_env, mo);
603 out:
604         if (rc == 0)
605                 mdt_counter_incr(req->rq_export, LPROC_MDT_SETATTR);
606
607         mdt_shrink_reply(info);
608         return rc;
609 }
610
611 static int mdt_reint_create(struct mdt_thread_info *info,
612                             struct mdt_lock_handle *lhc)
613 {
614         struct ptlrpc_request   *req = mdt_info_req(info);
615         int                     rc;
616         ENTRY;
617
618         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
619                 RETURN(err_serious(-ESTALE));
620
621         if (info->mti_dlm_req)
622                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
623
624         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
625         case S_IFDIR:{
626                 /* Cross-ref case. */
627                 /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
628                 if (info->mti_cross_ref) {
629                         rc = mdt_md_mkobj(info);
630                 } else {
631                         LASSERT(info->mti_rr.rr_namelen > 0);
632                         mdt_counter_incr(req->rq_export, LPROC_MDT_MKDIR);
633                         rc = mdt_md_create(info);
634                 }
635                 break;
636         }
637         case S_IFREG:
638         case S_IFLNK:
639         case S_IFCHR:
640         case S_IFBLK:
641         case S_IFIFO:
642         case S_IFSOCK:{
643                 /* Special file should stay on the same node as parent. */
644                 LASSERT(info->mti_rr.rr_namelen > 0);
645                 mdt_counter_incr(req->rq_export, LPROC_MDT_MKNOD);
646                 rc = mdt_md_create(info);
647                 break;
648         }
649         default:
650                 rc = err_serious(-EOPNOTSUPP);
651         }
652         RETURN(rc);
653 }
654
655 /*
656  * VBR: save parent version in reply and child version getting by its name.
657  * Version of child is getting and checking during its lookup. If
658  */
659 static int mdt_reint_unlink(struct mdt_thread_info *info,
660                             struct mdt_lock_handle *lhc)
661 {
662         struct mdt_reint_record *rr = &info->mti_rr;
663         struct ptlrpc_request   *req = mdt_info_req(info);
664         struct md_attr          *ma = &info->mti_attr;
665         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
666         struct mdt_object       *mp;
667         struct mdt_object       *mc;
668         struct mdt_lock_handle  *parent_lh;
669         struct mdt_lock_handle  *child_lh;
670         struct lu_name          *lname;
671         int                      rc;
672         ENTRY;
673
674         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
675                   rr->rr_name);
676
677         if (info->mti_dlm_req)
678                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
679
680         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
681                 RETURN(err_serious(-ENOENT));
682
683         /*
684          * step 1: lock the parent. Note, this may be child in case of
685          * remote operation denoted by ->mti_cross_ref flag.
686          */
687         parent_lh = &info->mti_lh[MDT_LH_PARENT];
688         if (info->mti_cross_ref) {
689                 /*
690                  * Init reg lock for cross ref case when we need to do only
691                  * ref del locally.
692                  */
693                 mdt_lock_reg_init(parent_lh, LCK_PW);
694         } else {
695                 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
696                                   rr->rr_namelen);
697         }
698         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
699                                   MDS_INODELOCK_UPDATE);
700         if (IS_ERR(mp)) {
701                 rc = PTR_ERR(mp);
702                 /* errors are possible here in cross-ref cases, see below */
703                 if (info->mti_cross_ref)
704                         rc = 0;
705                 GOTO(out, rc);
706         }
707
708         rc = mdt_version_get_check_save(info, mp, 0);
709         if (rc)
710                 GOTO(out_unlock_parent, rc);
711
712         mdt_reint_init_ma(info, ma);
713         if (!ma->ma_lmm || !ma->ma_cookie)
714                 GOTO(out_unlock_parent, rc = -EINVAL);
715
716         if (info->mti_cross_ref) {
717                 /*
718                  * Remote partial operation. It is possible that replay may
719                  * happen on parent MDT and this operation will be repeated.
720                  * Therefore the object absense is allowed case and nothing
721                  * should be done here.
722                  */
723                 if (mdt_object_exists(mp) > 0) {
724                         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
725                         rc = mo_ref_del(info->mti_env,
726                                         mdt_object_child(mp), ma);
727                         if (rc == 0)
728                                 mdt_handle_last_unlink(info, mp, ma);
729                 } else
730                         rc = 0;
731                 GOTO(out_unlock_parent, rc);
732         }
733
734         /* step 2: find & lock the child */
735         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
736         /* lookup child object along with version checking */
737         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
738         if (rc != 0)
739                  GOTO(out_unlock_parent, rc);
740
741         /* We will lock the child regardless it is local or remote. No harm. */
742         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
743         if (IS_ERR(mc))
744                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
745         child_lh = &info->mti_lh[MDT_LH_CHILD];
746         mdt_lock_reg_init(child_lh, LCK_EX);
747         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
748                              MDT_CROSS_LOCK);
749         if (rc != 0) {
750                 mdt_object_put(info->mti_env, mc);
751                 GOTO(out_unlock_parent, rc);
752         }
753
754         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
755                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
756         /* save version when object is locked */
757         mdt_version_get_save(info, mc, 1);
758         /*
759          * Now we can only make sure we need MA_INODE, in mdd layer, will check
760          * whether need MA_LOV and MA_COOKIE.
761          */
762         ma->ma_need = MA_INODE;
763         ma->ma_valid = 0;
764         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
765         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
766                         mdt_object_child(mc), lname, ma);
767         if (rc == 0)
768                 mdt_handle_last_unlink(info, mc, ma);
769
770         if (ma->ma_valid & MA_INODE) {
771                 switch (ma->ma_attr.la_mode & S_IFMT) {
772                 case S_IFDIR:
773                         mdt_counter_incr(req->rq_export, LPROC_MDT_RMDIR);
774                         break;
775                 case S_IFREG:
776                 case S_IFLNK:
777                 case S_IFCHR:
778                 case S_IFBLK:
779                 case S_IFIFO:
780                 case S_IFSOCK:
781                         mdt_counter_incr(req->rq_export, LPROC_MDT_UNLINK);
782                         break;
783                 default:
784                         LASSERTF(0, "bad file type %o unlinking\n",
785                                  ma->ma_attr.la_mode);
786                 }
787         }
788
789         EXIT;
790
791         mdt_object_unlock_put(info, mc, child_lh, rc);
792 out_unlock_parent:
793         mdt_object_unlock_put(info, mp, parent_lh, rc);
794 out:
795         return rc;
796 }
797
798 /*
799  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
800  * name.
801  */
802 static int mdt_reint_link(struct mdt_thread_info *info,
803                           struct mdt_lock_handle *lhc)
804 {
805         struct mdt_reint_record *rr = &info->mti_rr;
806         struct ptlrpc_request   *req = mdt_info_req(info);
807         struct md_attr          *ma = &info->mti_attr;
808         struct mdt_object       *ms;
809         struct mdt_object       *mp;
810         struct mdt_lock_handle  *lhs;
811         struct mdt_lock_handle  *lhp;
812         struct lu_name          *lname;
813         int rc;
814         ENTRY;
815
816         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
817                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
818
819         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
820                 RETURN(err_serious(-ENOENT));
821
822         if (info->mti_dlm_req)
823                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
824
825         if (info->mti_cross_ref) {
826                 /* MDT holding name ask us to add ref. */
827                 lhs = &info->mti_lh[MDT_LH_CHILD];
828                 mdt_lock_reg_init(lhs, LCK_EX);
829                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
830                                           MDS_INODELOCK_UPDATE);
831                 if (IS_ERR(ms))
832                         RETURN(PTR_ERR(ms));
833
834                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
835                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
836                 mdt_object_unlock_put(info, ms, lhs, rc);
837                 RETURN(rc);
838         }
839
840         /* Invalid case so return error immediately instead of
841          * processing it */
842         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
843                 RETURN(-EPERM);
844
845         /* step 1: find & lock the target parent dir */
846         lhp = &info->mti_lh[MDT_LH_PARENT];
847         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
848                           rr->rr_namelen);
849         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
850                                   MDS_INODELOCK_UPDATE);
851         if (IS_ERR(mp))
852                 RETURN(PTR_ERR(mp));
853
854         rc = mdt_version_get_check_save(info, mp, 0);
855         if (rc)
856                 GOTO(out_unlock_parent, rc);
857
858         /* step 2: find & lock the source */
859         lhs = &info->mti_lh[MDT_LH_CHILD];
860         mdt_lock_reg_init(lhs, LCK_EX);
861
862         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
863         if (IS_ERR(ms))
864                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
865
866         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
867                             MDT_CROSS_LOCK);
868         if (rc != 0) {
869                 mdt_object_put(info->mti_env, ms);
870                 GOTO(out_unlock_parent, rc);
871         }
872
873         /* step 3: link it */
874         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
875                        OBD_FAIL_MDS_REINT_LINK_WRITE);
876
877         info->mti_mos = ms;
878         rc = mdt_version_get_check_save(info, ms, 1);
879         if (rc)
880                 GOTO(out_unlock_child, rc);
881
882         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
883         /** check target version by name during replay */
884         rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2);
885         if (rc != 0 && rc != -ENOENT)
886                 GOTO(out_unlock_child, rc);
887         /* save version of file name for replay, it must be ENOENT here */
888         if (!req_is_replay(mdt_info_req(info))) {
889                 info->mti_ver[2] = ENOENT_VERSION;
890                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
891         }
892
893         rc = mdo_link(info->mti_env, mdt_object_child(mp),
894                       mdt_object_child(ms), lname, ma);
895
896         if (rc == 0)
897                 mdt_counter_incr(req->rq_export, LPROC_MDT_LINK);
898
899         EXIT;
900 out_unlock_child:
901         mdt_object_unlock_put(info, ms, lhs, rc);
902 out_unlock_parent:
903         mdt_object_unlock_put(info, mp, lhp, rc);
904         return rc;
905 }
906 /**
907  * lock the part of the directory according to the hash of the name
908  * (lh->mlh_pdo_hash) in parallel directory lock.
909  */
910 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
911                        struct mdt_lock_handle *lh,
912                        struct mdt_object *obj, __u64 ibits)
913 {
914         struct ldlm_res_id *res_id = &info->mti_res_id;
915         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
916         ldlm_policy_data_t *policy = &info->mti_policy;
917         int rc;
918
919         /*
920          * Finish res_id initializing by name hash marking part of
921          * directory which is taking modification.
922          */
923         LASSERT(lh->mlh_pdo_hash != 0);
924         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
925         memset(policy, 0, sizeof(*policy));
926         policy->l_inodebits.bits = ibits;
927         /*
928          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
929          * going to be sent to client. If it is - mdt_intent_policy() path will
930          * fix it up and turn FL_LOCAL flag off.
931          */
932         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
933                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
934                           &info->mti_exp->exp_handle.h_cookie);
935         return rc;
936 }
937
938 /* partial operation for rename */
939 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
940 {
941         struct mdt_reint_record *rr = &info->mti_rr;
942         struct ptlrpc_request   *req = mdt_info_req(info);
943         struct md_attr          *ma = &info->mti_attr;
944         struct mdt_object       *mtgtdir;
945         struct mdt_object       *mtgt = NULL;
946         struct mdt_lock_handle  *lh_tgtdir;
947         struct mdt_lock_handle  *lh_tgt = NULL;
948         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
949         struct lu_name          *lname;
950         int                      rc;
951         ENTRY;
952
953         DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
954                   rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
955
956         /* step 1: lookup & lock the tgt dir. */
957         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
958         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
959                           rr->rr_tgtlen);
960         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
961                                        MDS_INODELOCK_UPDATE);
962         if (IS_ERR(mtgtdir))
963                 RETURN(PTR_ERR(mtgtdir));
964
965         /* step 2: find & lock the target object if exists. */
966         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
967         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
968         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
969                         lname, tgt_fid, &info->mti_spec);
970         if (rc != 0 && rc != -ENOENT) {
971                 GOTO(out_unlock_tgtdir, rc);
972         } else if (rc == 0) {
973                 /*
974                  * In case of replay that name can be already inserted, check
975                  * that and do nothing if so.
976                  */
977                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
978                         GOTO(out_unlock_tgtdir, rc);
979
980                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
981                 mdt_lock_reg_init(lh_tgt, LCK_EX);
982
983                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
984                                             MDS_INODELOCK_LOOKUP);
985                 if (IS_ERR(mtgt))
986                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
987
988                 mdt_reint_init_ma(info, ma);
989                 if (!ma->ma_lmm || !ma->ma_cookie)
990                         GOTO(out_unlock_tgt, rc = -EINVAL);
991
992                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
993                                     mdt_object_child(mtgt), rr->rr_fid2,
994                                     lname, ma);
995         } else /* -ENOENT */ {
996                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
997                                      lname, rr->rr_fid2, ma);
998         }
999
1000         /* handle last link of tgt object */
1001         if (rc == 0 && mtgt)
1002                 mdt_handle_last_unlink(info, mtgt, ma);
1003
1004         EXIT;
1005 out_unlock_tgt:
1006         if (mtgt)
1007                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
1008 out_unlock_tgtdir:
1009         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
1010         return rc;
1011 }
1012
1013 static int mdt_rename_lock(struct mdt_thread_info *info,
1014                            struct lustre_handle *lh)
1015 {
1016         struct ldlm_namespace *ns     = info->mti_mdt->mdt_namespace;
1017         ldlm_policy_data_t    *policy = &info->mti_policy;
1018         struct ldlm_res_id    *res_id = &info->mti_res_id;
1019         struct md_site        *ms;
1020         int rc;
1021         ENTRY;
1022
1023         ms = mdt_md_site(info->mti_mdt);
1024         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1025
1026         memset(policy, 0, sizeof *policy);
1027         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1028
1029         if (ms->ms_control_exp == NULL) {
1030                 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1031
1032                 /*
1033                  * Current node is controller, that is mdt0, where we should
1034                  * take BFL lock.
1035                  */
1036                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
1037                                             LCK_EX, &flags, ldlm_blocking_ast,
1038                                             ldlm_completion_ast, NULL, NULL, 0,
1039                                             &info->mti_exp->exp_handle.h_cookie,
1040                                             lh);
1041         } else {
1042                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
1043                      ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
1044                 int flags = 0;
1045
1046                 /*
1047                  * This is the case mdt0 is remote node, issue DLM lock like
1048                  * other clients.
1049                  */
1050                 rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id,
1051                                       policy, &flags, NULL, 0, lh, 0);
1052         }
1053
1054         RETURN(rc);
1055 }
1056
1057 static void mdt_rename_unlock(struct lustre_handle *lh)
1058 {
1059         ENTRY;
1060         LASSERT(lustre_handle_is_used(lh));
1061         ldlm_lock_decref(lh, LCK_EX);
1062         EXIT;
1063 }
1064
1065 /*
1066  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
1067  * target. Source should not be ancestor of target dir. May be other rename
1068  * checks can be moved here later.
1069  */
1070 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
1071 {
1072         struct mdt_reint_record *rr = &info->mti_rr;
1073         struct lu_fid dst_fid = *rr->rr_fid2;
1074         struct mdt_object *dst;
1075         int rc = 0;
1076         ENTRY;
1077
1078         do {
1079                 LASSERT(fid_is_sane(&dst_fid));
1080                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
1081                 if (!IS_ERR(dst)) {
1082                         rc = mdo_is_subdir(info->mti_env,
1083                                            mdt_object_child(dst), fid,
1084                                            &dst_fid);
1085                         mdt_object_put(info->mti_env, dst);
1086                         if (rc != -EREMOTE && rc < 0) {
1087                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
1088                         } else {
1089                                 /* check the found fid */
1090                                 if (lu_fid_eq(&dst_fid, fid))
1091                                         rc = -EINVAL;
1092                         }
1093                 } else {
1094                         rc = PTR_ERR(dst);
1095                 }
1096         } while (rc == -EREMOTE);
1097
1098         RETURN(rc);
1099 }
1100
1101 /*
1102  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
1103  * 2 - src child; 3 - tgt child.
1104  * Update on disk version of src child.
1105  */
1106 static int mdt_reint_rename(struct mdt_thread_info *info,
1107                             struct mdt_lock_handle *lhc)
1108 {
1109         struct mdt_reint_record *rr = &info->mti_rr;
1110         struct md_attr          *ma = &info->mti_attr;
1111         struct ptlrpc_request   *req = mdt_info_req(info);
1112         struct mdt_object       *msrcdir;
1113         struct mdt_object       *mtgtdir;
1114         struct mdt_object       *mold;
1115         struct mdt_object       *mnew = NULL;
1116         struct mdt_lock_handle  *lh_srcdirp;
1117         struct mdt_lock_handle  *lh_tgtdirp;
1118         struct mdt_lock_handle  *lh_oldp;
1119         struct mdt_lock_handle  *lh_newp;
1120         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1121         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
1122         struct lustre_handle     rename_lh = { 0 };
1123         struct lu_name           slname = { 0 };
1124         struct lu_name          *lname;
1125         int                      rc;
1126         ENTRY;
1127
1128         if (info->mti_dlm_req)
1129                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
1130
1131         if (info->mti_cross_ref) {
1132                 rc = mdt_reint_rename_tgt(info);
1133                 RETURN(rc);
1134         }
1135
1136         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
1137                   PFID(rr->rr_fid1), rr->rr_name,
1138                   PFID(rr->rr_fid2), rr->rr_tgt);
1139
1140         rc = mdt_rename_lock(info, &rename_lh);
1141         if (rc) {
1142                 CERROR("Can't lock FS for rename, rc %d\n", rc);
1143                 RETURN(rc);
1144         }
1145
1146         lh_newp = &info->mti_lh[MDT_LH_NEW];
1147
1148         /* step 1: lock the source dir. */
1149         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
1150         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
1151                           rr->rr_namelen);
1152         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
1153                                        MDS_INODELOCK_UPDATE);
1154         if (IS_ERR(msrcdir))
1155                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
1156
1157         rc = mdt_version_get_check_save(info, msrcdir, 0);
1158         if (rc)
1159                 GOTO(out_unlock_source, rc);
1160
1161         /* step 2: find & lock the target dir. */
1162         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
1163         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
1164                           rr->rr_tgtlen);
1165         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1166                 mdt_object_get(info->mti_env, msrcdir);
1167                 mtgtdir = msrcdir;
1168                 if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
1169                          rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
1170                                                  MDS_INODELOCK_UPDATE);
1171                          if (rc)
1172                                  GOTO(out_unlock_source, rc);
1173                          OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
1174                 }
1175         } else {
1176                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
1177                                           rr->rr_fid2);
1178                 if (IS_ERR(mtgtdir))
1179                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
1180
1181                 /* check early, the real version will be saved after locking */
1182                 rc = mdt_version_get_check(info, mtgtdir, 1);
1183                 if (rc)
1184                         GOTO(out_put_target, rc);
1185
1186                 rc = mdt_object_exists(mtgtdir);
1187                 if (rc == 0) {
1188                         GOTO(out_put_target, rc = -ESTALE);
1189                 } else if (rc > 0) {
1190                         /* we lock the target dir if it is local */
1191                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
1192                                              MDS_INODELOCK_UPDATE,
1193                                              MDT_LOCAL_LOCK);
1194                         if (rc != 0)
1195                                 GOTO(out_put_target, rc);
1196                         /* get and save correct version after locking */
1197                         mdt_version_get_save(info, mtgtdir, 1);
1198                 }
1199         }
1200
1201         /* step 3: find & lock the old object. */
1202         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
1203         mdt_name_copy(&slname, lname);
1204         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
1205         if (rc != 0)
1206                 GOTO(out_unlock_target, rc);
1207
1208         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1209                 GOTO(out_unlock_target, rc = -EINVAL);
1210
1211         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
1212         if (IS_ERR(mold))
1213                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
1214
1215         lh_oldp = &info->mti_lh[MDT_LH_OLD];
1216         mdt_lock_reg_init(lh_oldp, LCK_EX);
1217         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
1218                              MDT_CROSS_LOCK);
1219         if (rc != 0) {
1220                 mdt_object_put(info->mti_env, mold);
1221                 GOTO(out_unlock_target, rc);
1222         }
1223
1224         info->mti_mos = mold;
1225         /* save version after locking */
1226         mdt_version_get_save(info, mold, 2);
1227         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
1228
1229         /* step 4: find & lock the new object. */
1230         /* new target object may not exist now */
1231         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
1232         /* lookup with version checking */
1233         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
1234         if (rc == 0) {
1235                 /* the new_fid should have been filled at this moment */
1236                 if (lu_fid_eq(old_fid, new_fid))
1237                        GOTO(out_unlock_old, rc);
1238
1239                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1240                     lu_fid_eq(new_fid, rr->rr_fid2))
1241                         GOTO(out_unlock_old, rc = -EINVAL);
1242
1243                 mdt_lock_reg_init(lh_newp, LCK_EX);
1244                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
1245                 if (IS_ERR(mnew))
1246                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
1247
1248                 rc = mdt_object_lock(info, mnew, lh_newp,
1249                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
1250                 if (rc != 0) {
1251                         mdt_object_put(info->mti_env, mnew);
1252                         GOTO(out_unlock_old, rc);
1253                 }
1254                 /* get and save version after locking */
1255                 mdt_version_get_save(info, mnew, 3);
1256                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
1257         } else if (rc != -EREMOTE && rc != -ENOENT) {
1258                 GOTO(out_unlock_old, rc);
1259         } else {
1260                 mdt_enoent_version_save(info, 3);
1261         }
1262
1263         /* step 5: rename it */
1264         mdt_reint_init_ma(info, ma);
1265         if (!ma->ma_lmm || !ma->ma_cookie)
1266                 GOTO(out_unlock_new, rc = -EINVAL);
1267
1268         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1269                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1270
1271
1272         /* Check if @dst is subdir of @src. */
1273         rc = mdt_rename_sanity(info, old_fid);
1274         if (rc)
1275                 GOTO(out_unlock_new, rc);
1276
1277         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1278                         mdt_object_child(mtgtdir), old_fid, &slname,
1279                         (mnew ? mdt_object_child(mnew) : NULL),
1280                         lname, ma);
1281
1282         /* handle last link of tgt object */
1283         if (rc == 0) {
1284                 mdt_counter_incr(req->rq_export, LPROC_MDT_RENAME);
1285                 if (mnew)
1286                         mdt_handle_last_unlink(info, mnew, ma);
1287         }
1288
1289         EXIT;
1290 out_unlock_new:
1291         if (mnew)
1292                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
1293 out_unlock_old:
1294         mdt_object_unlock_put(info, mold, lh_oldp, rc);
1295 out_unlock_target:
1296         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
1297 out_put_target:
1298         mdt_object_put(info->mti_env, mtgtdir);
1299 out_unlock_source:
1300         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1301 out_rename_lock:
1302         mdt_rename_unlock(&rename_lh);
1303         return rc;
1304 }
1305
1306 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
1307                            struct mdt_lock_handle *lhc);
1308
1309 static mdt_reinter reinters[REINT_MAX] = {
1310         [REINT_SETATTR]  = mdt_reint_setattr,
1311         [REINT_CREATE]   = mdt_reint_create,
1312         [REINT_LINK]     = mdt_reint_link,
1313         [REINT_UNLINK]   = mdt_reint_unlink,
1314         [REINT_RENAME]   = mdt_reint_rename,
1315         [REINT_OPEN]     = mdt_reint_open,
1316         [REINT_SETXATTR] = mdt_reint_setxattr
1317 };
1318
1319 int mdt_reint_rec(struct mdt_thread_info *info,
1320                   struct mdt_lock_handle *lhc)
1321 {
1322         int rc;
1323         ENTRY;
1324
1325         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
1326
1327         RETURN(rc);
1328 }