Whamcloud - gitweb
ef422a9b3427e2079664df7d7e34698d021f90e2
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdt/mdt_reint.c
40  *
41  * Lustre Metadata Target (mdt) reintegration routines
42  *
43  * Author: Peter Braam <braam@clusterfs.com>
44  * Author: Andreas Dilger <adilger@clusterfs.com>
45  * Author: Phil Schwan <phil@clusterfs.com>
46  * Author: Huang Hua <huanghua@clusterfs.com>
47  * Author: Yury Umanets <umka@clusterfs.com>
48  */
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53 #define DEBUG_SUBSYSTEM S_MDS
54
55 #include "mdt_internal.h"
56 #include "lu_time.h"
57
58 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
59                                      struct md_attr *ma)
60 {
61         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
62         ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
63                                                &RMF_MDT_MD, RCL_SERVER);
64
65         ma->ma_cookie = req_capsule_server_get(info->mti_pill,
66                                                &RMF_LOGCOOKIES);
67         ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
68                                                   &RMF_LOGCOOKIES,
69                                                   RCL_SERVER);
70
71         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
72         ma->ma_valid = 0;
73 }
74
75 static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
76                                 struct mdt_object *object,
77                                 struct mdt_body *repbody)
78 {
79         ENTRY;
80
81         /* for cross-ref mkdir, mds capa has been fetched from remote obj, then
82          * we won't go to below*/
83         if (repbody->valid & OBD_MD_FLMDSCAPA)
84                 RETURN(rc);
85
86         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa &&
87             info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
88                 struct lustre_capa *capa;
89
90                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
91                 LASSERT(capa);
92                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
93                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
94                                  0);
95                 if (rc == 0)
96                         repbody->valid |= OBD_MD_FLMDSCAPA;
97         }
98
99         RETURN(rc);
100 }
101
102 /**
103  * Get version of object by fid.
104  *
105  * Return real version or ENOENT_VERSION if object doesn't exist
106  */
107 static void mdt_obj_version_get(struct mdt_thread_info *info,
108                                 struct mdt_object *o, __u64 *version)
109 {
110         LASSERT(o);
111         LASSERT(mdt_object_exists(o) >= 0);
112         if (mdt_object_exists(o) > 0)
113                 *version = mo_version_get(info->mti_env, mdt_object_child(o));
114         else
115                 *version = ENOENT_VERSION;
116         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
117                PFID(mdt_object_fid(o)), *version);
118 }
119
120 /**
121  * Check version is correct.
122  *
123  * Should be called only during replay.
124  */
125 static int mdt_version_check(struct ptlrpc_request *req,
126                              __u64 version, int idx)
127 {
128         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
129         ENTRY;
130
131         if (!exp_connect_vbr(req->rq_export))
132                 RETURN(0);
133
134         LASSERT(req_is_replay(req));
135         /** VBR: version is checked always because costs nothing */
136         LASSERT(idx < PTLRPC_NUM_VERSIONS);
137         /** Sanity check for malformed buffers */
138         if (pre_ver == NULL) {
139                 CERROR("No versions in request buffer\n");
140                 cfs_spin_lock(&req->rq_export->exp_lock);
141                 req->rq_export->exp_vbr_failed = 1;
142                 cfs_spin_unlock(&req->rq_export->exp_lock);
143                 RETURN(-EOVERFLOW);
144         } else if (pre_ver[idx] != version) {
145                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
146                        pre_ver[idx], version);
147                 cfs_spin_lock(&req->rq_export->exp_lock);
148                 req->rq_export->exp_vbr_failed = 1;
149                 cfs_spin_unlock(&req->rq_export->exp_lock);
150                 RETURN(-EOVERFLOW);
151         }
152         RETURN(0);
153 }
154
155 /**
156  * Save pre-versions in reply.
157  */
158 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
159                              int idx)
160 {
161         __u64 *reply_ver;
162
163         if (!exp_connect_vbr(req->rq_export))
164                 return;
165
166         LASSERT(!req_is_replay(req));
167         LASSERT(req->rq_repmsg != NULL);
168         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
169         if (reply_ver)
170                 reply_ver[idx] = version;
171 }
172
173 /**
174  * Save enoent version, it is needed when it is obvious that object doesn't
175  * exist, e.g. child during create.
176  */
177 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
178 {
179         /* save version of file name for replay, it must be ENOENT here */
180         if (!req_is_replay(mdt_info_req(info))) {
181                 info->mti_ver[idx] = ENOENT_VERSION;
182                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
183         }
184 }
185
186 /**
187  * Get version from disk and save in reply buffer.
188  *
189  * Versions are saved in reply only during normal operations not replays.
190  */
191 void mdt_version_get_save(struct mdt_thread_info *info,
192                           struct mdt_object *mto, int idx)
193 {
194         /* don't save versions during replay */
195         if (!req_is_replay(mdt_info_req(info))) {
196                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
197                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
198         }
199 }
200
201 /**
202  * Get version from disk and check it, no save in reply.
203  */
204 int mdt_version_get_check(struct mdt_thread_info *info,
205                           struct mdt_object *mto, int idx)
206 {
207         /* only check versions during replay */
208         if (!req_is_replay(mdt_info_req(info)))
209                 return 0;
210
211         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
212         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
213 }
214
215 /**
216  * Get version from disk and check if recovery or just save.
217  */
218 int mdt_version_get_check_save(struct mdt_thread_info *info,
219                                struct mdt_object *mto, int idx)
220 {
221         int rc = 0;
222
223         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
224         if (req_is_replay(mdt_info_req(info)))
225                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
226                                        idx);
227         else
228                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
229         return rc;
230 }
231
232 /**
233  * Lookup with version checking.
234  *
235  * This checks version of 'name'. Many reint functions uses 'name' for child not
236  * FID, therefore we need to get object by name and check its version.
237  */
238 int mdt_lookup_version_check(struct mdt_thread_info *info,
239                              struct mdt_object *p, struct lu_name *lname,
240                              struct lu_fid *fid, int idx)
241 {
242         int rc, vbrc;
243
244         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
245                         &info->mti_spec);
246         /* Check version only during replay */
247         if (!req_is_replay(mdt_info_req(info)))
248                 return rc;
249
250         info->mti_ver[idx] = ENOENT_VERSION;
251         if (rc == 0) {
252                 struct mdt_object *child;
253                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid,
254                                         MDT_OBJ_MAY_NOT_EXIST);
255                 if (likely(!IS_ERR(child))) {
256                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
257                         mdt_object_put(info->mti_env, child);
258                 }
259         }
260         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
261         return vbrc ? vbrc : rc;
262
263 }
264
265 /*
266  * VBR: we save three versions in reply:
267  * 0 - parent. Check that parent version is the same during replay.
268  * 1 - name. Version of 'name' if file exists with the same name or
269  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
270  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
271  * check.
272  */
273 static int mdt_md_create(struct mdt_thread_info *info)
274 {
275         struct mdt_device       *mdt = info->mti_mdt;
276         struct mdt_object       *parent;
277         struct mdt_object       *child;
278         struct mdt_lock_handle  *lh;
279         struct mdt_body         *repbody;
280         struct md_attr          *ma = &info->mti_attr;
281         struct mdt_reint_record *rr = &info->mti_rr;
282         struct lu_name          *lname;
283         int rc;
284         ENTRY;
285
286         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
287                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
288
289         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
290
291         lh = &info->mti_lh[MDT_LH_PARENT];
292         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
293
294         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
295                                       MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
296         if (IS_ERR(parent))
297                 RETURN(PTR_ERR(parent));
298
299         rc = mdt_version_get_check_save(info, parent, 0);
300         if (rc)
301                 GOTO(out_put_parent, rc);
302
303         /*
304          * Check child name version during replay.
305          * During create replay a file may exist with same name.
306          */
307         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
308         rc = mdt_lookup_version_check(info, parent, lname,
309                                       &info->mti_tmp_fid1, 1);
310         /* -ENOENT is expected here */
311         if (rc != 0 && rc != -ENOENT)
312                 GOTO(out_put_parent, rc);
313
314         /* save version of file name for replay, it must be ENOENT here */
315         mdt_enoent_version_save(info, 1);
316
317         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2,
318                                 MDT_OBJ_MAY_NOT_EXIST);
319         if (likely(!IS_ERR(child))) {
320                 struct md_object *next = mdt_object_child(parent);
321
322                 ma->ma_need = MA_INODE;
323                 ma->ma_valid = 0;
324                 /* capa for cross-ref will be stored here */
325                 ma->ma_capa = req_capsule_server_get(info->mti_pill,
326                                                      &RMF_CAPA1);
327                 LASSERT(ma->ma_capa);
328
329                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
330                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
331
332                 /* Version of child will be updated on disk. */
333                 info->mti_mos = child;
334                 rc = mdt_version_get_check_save(info, child, 2);
335                 if (rc)
336                         GOTO(out_put_child, rc);
337
338                 /* Let lower layer know current lock mode. */
339                 info->mti_spec.sp_cr_mode =
340                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
341
342                 /*
343                  * Do perform lookup sanity check. We do not know if name exists
344                  * or not.
345                  */
346                 info->mti_spec.sp_cr_lookup = 1;
347                 info->mti_spec.sp_feat = &dt_directory_features;
348
349                 rc = mdo_create(info->mti_env, next, lname,
350                                 mdt_object_child(child),
351                                 &info->mti_spec, ma);
352                 if (rc == 0) {
353                         /* Return fid & attr to client. */
354                         if (ma->ma_valid & MA_INODE)
355                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
356                                                    mdt_object_fid(child));
357                 }
358 out_put_child:
359                 mdt_object_put(info->mti_env, child);
360         } else {
361                 rc = PTR_ERR(child);
362         }
363         mdt_create_pack_capa(info, rc, child, repbody);
364 out_put_parent:
365         mdt_object_unlock_put(info, parent, lh, rc);
366         RETURN(rc);
367 }
368
369 /* Partial request to create object only */
370 static int mdt_md_mkobj(struct mdt_thread_info *info)
371 {
372         struct mdt_device      *mdt = info->mti_mdt;
373         struct mdt_object      *o;
374         struct mdt_body        *repbody;
375         struct md_attr         *ma = &info->mti_attr;
376         int rc;
377         ENTRY;
378
379         DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
380                   PFID(info->mti_rr.rr_fid2));
381
382         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
383
384         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
385                             MDT_OBJ_MAY_NOT_EXIST);
386         if (!IS_ERR(o)) {
387                 struct md_object *next = mdt_object_child(o);
388
389                 ma->ma_need = MA_INODE;
390                 ma->ma_valid = 0;
391
392                 /*
393                  * Cross-ref create can encounter already created obj in case of
394                  * recovery, just get attr in that case.
395                  */
396                 if (mdt_object_exists(o) == 1) {
397                         rc = mo_attr_get(info->mti_env, next, ma);
398                 } else {
399                         /*
400                          * Here, NO permission check for object_create,
401                          * such check has been done on the original MDS.
402                          */
403                         rc = mo_object_create(info->mti_env, next,
404                                               &info->mti_spec, ma);
405                 }
406                 if (rc == 0) {
407                         /* Return fid & attr to client. */
408                         if (ma->ma_valid & MA_INODE)
409                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
410                                                    mdt_object_fid(o));
411                 }
412                 mdt_object_put(info->mti_env, o);
413         } else
414                 rc = PTR_ERR(o);
415
416         mdt_create_pack_capa(info, rc, o, repbody);
417         RETURN(rc);
418 }
419
420 /* In the raw-setattr case, we lock the child inode.
421  * In the write-back case or if being called from open,
422  *               the client holds a lock already.
423  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
424  * mdt_setattr_unpack()) flag to tell these cases apart. */
425 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
426                  struct md_attr *ma, int flags)
427 {
428         struct mdt_lock_handle  *lh;
429         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
430         int rc;
431         ENTRY;
432
433         /* attr shouldn't be set on remote object */
434         LASSERT(mdt_object_exists(mo) >= 0);
435
436         lh = &info->mti_lh[MDT_LH_PARENT];
437         mdt_lock_reg_init(lh, LCK_PW);
438
439         if (!(flags & MRF_SETATTR_LOCKED)) {
440                 __u64 lockpart = MDS_INODELOCK_UPDATE;
441                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
442                         lockpart |= MDS_INODELOCK_LOOKUP;
443
444                 rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
445                 if (rc != 0)
446                         RETURN(rc);
447         }
448
449         if (mdt_object_exists(mo) == 0)
450                 GOTO(out_unlock, rc = -ENOENT);
451
452         /* all attrs are packed into mti_attr in unpack_setattr */
453         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
454                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
455
456         /* This is only for set ctime when rename's source is on remote MDS. */
457         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
458                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
459
460         /* VBR: update version if attr changed are important for recovery */
461         if (do_vbr) {
462                 /* update on-disk version of changed object */
463                 info->mti_mos = mo;
464                 rc = mdt_version_get_check_save(info, mo, 0);
465                 if (rc)
466                         GOTO(out_unlock, rc);
467         }
468
469         /* all attrs are packed into mti_attr in unpack_setattr */
470         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
471         if (rc != 0)
472                 GOTO(out_unlock, rc);
473
474         EXIT;
475 out_unlock:
476         mdt_object_unlock(info, mo, lh, rc);
477         return rc;
478 }
479
480 static int mdt_reint_setattr(struct mdt_thread_info *info,
481                              struct mdt_lock_handle *lhc)
482 {
483         struct md_attr          *ma = &info->mti_attr;
484         struct mdt_reint_record *rr = &info->mti_rr;
485         struct ptlrpc_request   *req = mdt_info_req(info);
486         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
487         struct mdt_file_data    *mfd;
488         struct mdt_object       *mo;
489         struct md_object        *next;
490         struct mdt_body         *repbody;
491         int                      som_au, rc;
492         ENTRY;
493
494         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
495                   (unsigned int)ma->ma_attr.la_valid);
496
497         if (info->mti_dlm_req)
498                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
499
500         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
501         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
502                              MDT_OBJ_MUST_EXIST);
503         if (IS_ERR(mo))
504                 GOTO(out, rc = PTR_ERR(mo));
505
506         /* start a log jounal handle if needed */
507         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
508                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
509                     (rr->rr_flags & MRF_SETATTR_LOCKED)) {
510                         /* Check write access for the O_TRUNC case */
511                         if (mdt_write_read(mo) < 0)
512                                 GOTO(out_put, rc = -ETXTBSY);
513                 }
514         } else if (info->mti_ioepoch &&
515                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
516                 /* Truncate case. IOEpoch is opened. */
517                 rc = mdt_write_get(mo);
518                 if (rc)
519                         GOTO(out_put, rc);
520
521                 mfd = mdt_mfd_new();
522                 if (mfd == NULL) {
523                         mdt_write_put(mo);
524                         GOTO(out_put, rc = -ENOMEM);
525                 }
526
527                 mdt_ioepoch_open(info, mo, 0);
528                 repbody->ioepoch = mo->mot_ioepoch;
529
530                 mdt_object_get(info->mti_env, mo);
531                 mdt_mfd_set_mode(mfd, FMODE_TRUNC);
532                 mfd->mfd_object = mo;
533                 mfd->mfd_xid = req->rq_xid;
534
535                 cfs_spin_lock(&med->med_open_lock);
536                 cfs_list_add(&mfd->mfd_list, &med->med_open_head);
537                 cfs_spin_unlock(&med->med_open_lock);
538                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
539         }
540
541         som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
542         if (som_au) {
543                 /* SOM Attribute update case. Find the proper mfd and update
544                  * SOM attributes on the proper object. */
545                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
546                 LASSERT(info->mti_ioepoch);
547
548                 cfs_spin_lock(&med->med_open_lock);
549                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
550                 if (mfd == NULL) {
551                         cfs_spin_unlock(&med->med_open_lock);
552                         CDEBUG(D_INODE, "no handle for file close: "
553                                "fid = "DFID": cookie = "LPX64"\n",
554                                PFID(info->mti_rr.rr_fid1),
555                                info->mti_ioepoch->handle.cookie);
556                         GOTO(out_put, rc = -ESTALE);
557                 }
558                 LASSERT(mfd->mfd_mode == FMODE_SOM);
559                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
560
561                 class_handle_unhash(&mfd->mfd_handle);
562                 cfs_list_del_init(&mfd->mfd_list);
563                 cfs_spin_unlock(&med->med_open_lock);
564
565                 /* Close the found mfd, update attributes. */
566                 ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
567                 OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
568                 if (ma->ma_lmm == NULL)
569                         GOTO(out_put, rc = -ENOMEM);
570
571                 mdt_mfd_close(info, mfd);
572
573                 OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
574         } else {
575                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
576                 if (rc)
577                         GOTO(out_put, rc);
578         }
579
580         ma->ma_need = MA_INODE;
581         ma->ma_valid = 0;
582         next = mdt_object_child(mo);
583         rc = mo_attr_get(info->mti_env, next, ma);
584         if (rc != 0)
585                 GOTO(out_put, rc);
586
587         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
588
589         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
590             info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
591             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
592             (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
593                 struct lustre_capa *capa;
594
595                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
596                 LASSERT(capa);
597                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
598                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
599                 if (rc)
600                         GOTO(out_put, rc);
601                 repbody->valid |= OBD_MD_FLOSSCAPA;
602         }
603
604         EXIT;
605 out_put:
606         mdt_object_put(info->mti_env, mo);
607 out:
608         if (rc == 0)
609                 mdt_counter_incr(req->rq_export, LPROC_MDT_SETATTR);
610
611         mdt_shrink_reply(info);
612         return rc;
613 }
614
615 static int mdt_reint_create(struct mdt_thread_info *info,
616                             struct mdt_lock_handle *lhc)
617 {
618         struct ptlrpc_request   *req = mdt_info_req(info);
619         int                     rc;
620         ENTRY;
621
622         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
623                 RETURN(err_serious(-ESTALE));
624
625         if (info->mti_dlm_req)
626                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
627
628         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
629         case S_IFDIR:{
630                 /* Cross-ref case. */
631                 /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
632                 if (info->mti_cross_ref) {
633                         rc = mdt_md_mkobj(info);
634                 } else {
635                         LASSERT(info->mti_rr.rr_namelen > 0);
636                         mdt_counter_incr(req->rq_export, LPROC_MDT_MKDIR);
637                         rc = mdt_md_create(info);
638                 }
639                 break;
640         }
641         case S_IFREG:
642         case S_IFLNK:
643         case S_IFCHR:
644         case S_IFBLK:
645         case S_IFIFO:
646         case S_IFSOCK:{
647                 /* Special file should stay on the same node as parent. */
648                 LASSERT(info->mti_rr.rr_namelen > 0);
649                 mdt_counter_incr(req->rq_export, LPROC_MDT_MKNOD);
650                 rc = mdt_md_create(info);
651                 break;
652         }
653         default:
654                 rc = err_serious(-EOPNOTSUPP);
655         }
656         RETURN(rc);
657 }
658
659 /*
660  * VBR: save parent version in reply and child version getting by its name.
661  * Version of child is getting and checking during its lookup. If
662  */
663 static int mdt_reint_unlink(struct mdt_thread_info *info,
664                             struct mdt_lock_handle *lhc)
665 {
666         struct mdt_reint_record *rr = &info->mti_rr;
667         struct ptlrpc_request   *req = mdt_info_req(info);
668         struct md_attr          *ma = &info->mti_attr;
669         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
670         struct mdt_object       *mp;
671         struct mdt_object       *mc;
672         struct mdt_lock_handle  *parent_lh;
673         struct mdt_lock_handle  *child_lh;
674         struct lu_name          *lname;
675         int                      rc;
676         ENTRY;
677
678         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
679                   rr->rr_name);
680
681         if (info->mti_dlm_req)
682                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
683
684         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
685                 RETURN(err_serious(-ENOENT));
686
687         /*
688          * step 1: lock the parent. Note, this may be child in case of
689          * remote operation denoted by ->mti_cross_ref flag.
690          */
691         parent_lh = &info->mti_lh[MDT_LH_PARENT];
692         if (info->mti_cross_ref) {
693                 /*
694                  * Init reg lock for cross ref case when we need to do only
695                  * ref del locally.
696                  */
697                 mdt_lock_reg_init(parent_lh, LCK_PW);
698         } else {
699                 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
700                                   rr->rr_namelen);
701         }
702         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
703                                   MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
704         if (IS_ERR(mp)) {
705                 rc = PTR_ERR(mp);
706                 /* errors are possible here in cross-ref cases, see below */
707                 if (info->mti_cross_ref)
708                         rc = 0;
709                 GOTO(out, rc);
710         }
711
712         rc = mdt_version_get_check_save(info, mp, 0);
713         if (rc)
714                 GOTO(out_unlock_parent, rc);
715
716         mdt_reint_init_ma(info, ma);
717         if (!ma->ma_lmm || !ma->ma_cookie)
718                 GOTO(out_unlock_parent, rc = -EINVAL);
719
720         if (info->mti_cross_ref) {
721                 /*
722                  * Remote partial operation. It is possible that replay may
723                  * happen on parent MDT and this operation will be repeated.
724                  * Therefore the object absense is allowed case and nothing
725                  * should be done here.
726                  */
727                 if (mdt_object_exists(mp) > 0) {
728                         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
729                         rc = mo_ref_del(info->mti_env,
730                                         mdt_object_child(mp), ma);
731                         if (rc == 0)
732                                 mdt_handle_last_unlink(info, mp, ma);
733                 } else
734                         rc = 0;
735                 GOTO(out_unlock_parent, rc);
736         }
737
738         /* step 2: find & lock the child */
739         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
740         /* lookup child object along with version checking */
741         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
742         if (rc != 0)
743                  GOTO(out_unlock_parent, rc);
744
745         /* We will lock the child regardless it is local or remote. No harm. */
746         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
747                              MDT_OBJ_MUST_EXIST);
748         if (IS_ERR(mc))
749                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
750         child_lh = &info->mti_lh[MDT_LH_CHILD];
751         mdt_lock_reg_init(child_lh, LCK_EX);
752         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
753                              MDT_CROSS_LOCK);
754         if (rc != 0) {
755                 mdt_object_put(info->mti_env, mc);
756                 GOTO(out_unlock_parent, rc);
757         }
758
759         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
760                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
761         /* save version when object is locked */
762         mdt_version_get_save(info, mc, 1);
763         /*
764          * Now we can only make sure we need MA_INODE, in mdd layer, will check
765          * whether need MA_LOV and MA_COOKIE.
766          */
767         ma->ma_need = MA_INODE;
768         ma->ma_valid = 0;
769         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
770         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
771                         mdt_object_child(mc), lname, ma);
772         if (rc == 0)
773                 mdt_handle_last_unlink(info, mc, ma);
774
775         if (ma->ma_valid & MA_INODE) {
776                 switch (ma->ma_attr.la_mode & S_IFMT) {
777                 case S_IFDIR:
778                         mdt_counter_incr(req->rq_export, LPROC_MDT_RMDIR);
779                         break;
780                 case S_IFREG:
781                 case S_IFLNK:
782                 case S_IFCHR:
783                 case S_IFBLK:
784                 case S_IFIFO:
785                 case S_IFSOCK:
786                         mdt_counter_incr(req->rq_export, LPROC_MDT_UNLINK);
787                         break;
788                 default:
789                         LASSERTF(0, "bad file type %o unlinking\n",
790                                  ma->ma_attr.la_mode);
791                 }
792         }
793
794         EXIT;
795
796         mdt_object_unlock_put(info, mc, child_lh, rc);
797 out_unlock_parent:
798         mdt_object_unlock_put(info, mp, parent_lh, rc);
799 out:
800         return rc;
801 }
802
803 /*
804  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
805  * name.
806  */
807 static int mdt_reint_link(struct mdt_thread_info *info,
808                           struct mdt_lock_handle *lhc)
809 {
810         struct mdt_reint_record *rr = &info->mti_rr;
811         struct ptlrpc_request   *req = mdt_info_req(info);
812         struct md_attr          *ma = &info->mti_attr;
813         struct mdt_object       *ms;
814         struct mdt_object       *mp;
815         struct mdt_lock_handle  *lhs;
816         struct mdt_lock_handle  *lhp;
817         struct lu_name          *lname;
818         int rc;
819         ENTRY;
820
821         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
822                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
823
824         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
825                 RETURN(err_serious(-ENOENT));
826
827         if (info->mti_dlm_req)
828                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
829
830         if (info->mti_cross_ref) {
831                 /* MDT holding name ask us to add ref. */
832                 lhs = &info->mti_lh[MDT_LH_CHILD];
833                 mdt_lock_reg_init(lhs, LCK_EX);
834                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
835                                           MDS_INODELOCK_UPDATE,
836                                           MDT_OBJ_MUST_EXIST);
837                 if (IS_ERR(ms))
838                         RETURN(PTR_ERR(ms));
839
840                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
841                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
842                 mdt_object_unlock_put(info, ms, lhs, rc);
843                 RETURN(rc);
844         }
845
846         /* Invalid case so return error immediately instead of
847          * processing it */
848         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
849                 RETURN(-EPERM);
850
851         /* step 1: find & lock the target parent dir */
852         lhp = &info->mti_lh[MDT_LH_PARENT];
853         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
854                           rr->rr_namelen);
855         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
856                                   MDS_INODELOCK_UPDATE, MDT_OBJ_MUST_EXIST);
857         if (IS_ERR(mp))
858                 RETURN(PTR_ERR(mp));
859
860         rc = mdt_version_get_check_save(info, mp, 0);
861         if (rc)
862                 GOTO(out_unlock_parent, rc);
863
864         /* step 2: find & lock the source */
865         lhs = &info->mti_lh[MDT_LH_CHILD];
866         mdt_lock_reg_init(lhs, LCK_EX);
867
868         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
869                              MDT_OBJ_MUST_EXIST);
870         if (IS_ERR(ms))
871                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
872
873         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
874                             MDT_CROSS_LOCK);
875         if (rc != 0) {
876                 mdt_object_put(info->mti_env, ms);
877                 GOTO(out_unlock_parent, rc);
878         }
879
880         /* step 3: link it */
881         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
882                        OBD_FAIL_MDS_REINT_LINK_WRITE);
883
884         info->mti_mos = ms;
885         rc = mdt_version_get_check_save(info, ms, 1);
886         if (rc)
887                 GOTO(out_unlock_child, rc);
888
889         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
890         /** check target version by name during replay */
891         rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2);
892         if (rc != 0 && rc != -ENOENT)
893                 GOTO(out_unlock_child, rc);
894         /* save version of file name for replay, it must be ENOENT here */
895         if (!req_is_replay(mdt_info_req(info))) {
896                 info->mti_ver[2] = ENOENT_VERSION;
897                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
898         }
899
900         rc = mdo_link(info->mti_env, mdt_object_child(mp),
901                       mdt_object_child(ms), lname, ma);
902
903         if (rc == 0)
904                 mdt_counter_incr(req->rq_export, LPROC_MDT_LINK);
905
906         EXIT;
907 out_unlock_child:
908         mdt_object_unlock_put(info, ms, lhs, rc);
909 out_unlock_parent:
910         mdt_object_unlock_put(info, mp, lhp, rc);
911         return rc;
912 }
913 /**
914  * lock the part of the directory according to the hash of the name
915  * (lh->mlh_pdo_hash) in parallel directory lock.
916  */
917 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
918                        struct mdt_lock_handle *lh,
919                        struct mdt_object *obj, __u64 ibits)
920 {
921         struct ldlm_res_id *res_id = &info->mti_res_id;
922         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
923         ldlm_policy_data_t *policy = &info->mti_policy;
924         int rc;
925
926         /*
927          * Finish res_id initializing by name hash marking part of
928          * directory which is taking modification.
929          */
930         LASSERT(lh->mlh_pdo_hash != 0);
931         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
932         memset(policy, 0, sizeof(*policy));
933         policy->l_inodebits.bits = ibits;
934         /*
935          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
936          * going to be sent to client. If it is - mdt_intent_policy() path will
937          * fix it up and turn FL_LOCAL flag off.
938          */
939         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
940                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
941                           &info->mti_exp->exp_handle.h_cookie);
942         return rc;
943 }
944
945 /* partial operation for rename */
946 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
947 {
948         struct mdt_reint_record *rr = &info->mti_rr;
949         struct ptlrpc_request   *req = mdt_info_req(info);
950         struct md_attr          *ma = &info->mti_attr;
951         struct mdt_object       *mtgtdir;
952         struct mdt_object       *mtgt = NULL;
953         struct mdt_lock_handle  *lh_tgtdir;
954         struct mdt_lock_handle  *lh_tgt = NULL;
955         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
956         struct lu_name          *lname;
957         int                      rc;
958         ENTRY;
959
960         DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
961                   rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
962
963         /* step 1: lookup & lock the tgt dir. */
964         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
965         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
966                           rr->rr_tgtlen);
967         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
968                                        MDS_INODELOCK_UPDATE,
969                                        MDT_OBJ_MUST_EXIST);
970         if (IS_ERR(mtgtdir))
971                 RETURN(PTR_ERR(mtgtdir));
972
973         /* step 2: find & lock the target object if exists. */
974         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
975         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
976         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
977                         lname, tgt_fid, &info->mti_spec);
978         if (rc != 0 && rc != -ENOENT) {
979                 GOTO(out_unlock_tgtdir, rc);
980         } else if (rc == 0) {
981                 /*
982                  * In case of replay that name can be already inserted, check
983                  * that and do nothing if so.
984                  */
985                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
986                         GOTO(out_unlock_tgtdir, rc);
987
988                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
989                 mdt_lock_reg_init(lh_tgt, LCK_EX);
990
991                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
992                                             MDS_INODELOCK_LOOKUP,
993                                             MDT_OBJ_MUST_EXIST);
994                 if (IS_ERR(mtgt))
995                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
996
997                 mdt_reint_init_ma(info, ma);
998                 if (!ma->ma_lmm || !ma->ma_cookie)
999                         GOTO(out_unlock_tgt, rc = -EINVAL);
1000
1001                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
1002                                     mdt_object_child(mtgt), rr->rr_fid2,
1003                                     lname, ma);
1004         } else /* -ENOENT */ {
1005                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
1006                                      lname, rr->rr_fid2, ma);
1007         }
1008
1009         /* handle last link of tgt object */
1010         if (rc == 0 && mtgt)
1011                 mdt_handle_last_unlink(info, mtgt, ma);
1012
1013         EXIT;
1014 out_unlock_tgt:
1015         if (mtgt)
1016                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
1017 out_unlock_tgtdir:
1018         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
1019         return rc;
1020 }
1021
1022 static int mdt_rename_lock(struct mdt_thread_info *info,
1023                            struct lustre_handle *lh)
1024 {
1025         struct ldlm_namespace *ns     = info->mti_mdt->mdt_namespace;
1026         ldlm_policy_data_t    *policy = &info->mti_policy;
1027         struct ldlm_res_id    *res_id = &info->mti_res_id;
1028         struct md_site        *ms;
1029         int rc;
1030         ENTRY;
1031
1032         ms = mdt_md_site(info->mti_mdt);
1033         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1034
1035         memset(policy, 0, sizeof *policy);
1036         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1037
1038         if (ms->ms_control_exp == NULL) {
1039                 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1040
1041                 /*
1042                  * Current node is controller, that is mdt0, where we should
1043                  * take BFL lock.
1044                  */
1045                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
1046                                             LCK_EX, &flags, ldlm_blocking_ast,
1047                                             ldlm_completion_ast, NULL, NULL, 0,
1048                                             &info->mti_exp->exp_handle.h_cookie,
1049                                             lh);
1050         } else {
1051                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
1052                      ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
1053                 int flags = 0;
1054
1055                 /*
1056                  * This is the case mdt0 is remote node, issue DLM lock like
1057                  * other clients.
1058                  */
1059                 rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id,
1060                                       policy, &flags, NULL, 0, lh, 0);
1061         }
1062
1063         RETURN(rc);
1064 }
1065
1066 static void mdt_rename_unlock(struct lustre_handle *lh)
1067 {
1068         ENTRY;
1069         LASSERT(lustre_handle_is_used(lh));
1070         ldlm_lock_decref(lh, LCK_EX);
1071         EXIT;
1072 }
1073
1074 /*
1075  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
1076  * target. Source should not be ancestor of target dir. May be other rename
1077  * checks can be moved here later.
1078  */
1079 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
1080 {
1081         struct mdt_reint_record *rr = &info->mti_rr;
1082         struct lu_fid dst_fid = *rr->rr_fid2;
1083         struct mdt_object *dst;
1084         int rc = 0;
1085         ENTRY;
1086
1087         do {
1088                 LASSERT(fid_is_sane(&dst_fid));
1089                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
1090                                       MDT_OBJ_MUST_EXIST);
1091                 if (!IS_ERR(dst)) {
1092                         rc = mdo_is_subdir(info->mti_env,
1093                                            mdt_object_child(dst), fid,
1094                                            &dst_fid);
1095                         mdt_object_put(info->mti_env, dst);
1096                         if (rc != -EREMOTE && rc < 0) {
1097                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
1098                         } else {
1099                                 /* check the found fid */
1100                                 if (lu_fid_eq(&dst_fid, fid))
1101                                         rc = -EINVAL;
1102                         }
1103                 } else {
1104                         rc = PTR_ERR(dst);
1105                 }
1106         } while (rc == -EREMOTE);
1107
1108         RETURN(rc);
1109 }
1110
1111 /*
1112  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
1113  * 2 - src child; 3 - tgt child.
1114  * Update on disk version of src child.
1115  */
1116 static int mdt_reint_rename(struct mdt_thread_info *info,
1117                             struct mdt_lock_handle *lhc)
1118 {
1119         struct mdt_reint_record *rr = &info->mti_rr;
1120         struct md_attr          *ma = &info->mti_attr;
1121         struct ptlrpc_request   *req = mdt_info_req(info);
1122         struct mdt_object       *msrcdir;
1123         struct mdt_object       *mtgtdir;
1124         struct mdt_object       *mold;
1125         struct mdt_object       *mnew = NULL;
1126         struct mdt_lock_handle  *lh_srcdirp;
1127         struct mdt_lock_handle  *lh_tgtdirp;
1128         struct mdt_lock_handle  *lh_oldp;
1129         struct mdt_lock_handle  *lh_newp;
1130         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1131         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
1132         struct lustre_handle     rename_lh = { 0 };
1133         struct lu_name           slname = { 0 };
1134         struct lu_name          *lname;
1135         int                      rc;
1136         ENTRY;
1137
1138         if (info->mti_dlm_req)
1139                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
1140
1141         if (info->mti_cross_ref) {
1142                 rc = mdt_reint_rename_tgt(info);
1143                 RETURN(rc);
1144         }
1145
1146         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
1147                   PFID(rr->rr_fid1), rr->rr_name,
1148                   PFID(rr->rr_fid2), rr->rr_tgt);
1149
1150         rc = mdt_rename_lock(info, &rename_lh);
1151         if (rc) {
1152                 CERROR("Can't lock FS for rename, rc %d\n", rc);
1153                 RETURN(rc);
1154         }
1155
1156         lh_newp = &info->mti_lh[MDT_LH_NEW];
1157
1158         /* step 1: lock the source dir. */
1159         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
1160         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
1161                           rr->rr_namelen);
1162         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
1163                                        MDS_INODELOCK_UPDATE,
1164                                        MDT_OBJ_MUST_EXIST);
1165         if (IS_ERR(msrcdir))
1166                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
1167
1168         rc = mdt_version_get_check_save(info, msrcdir, 0);
1169         if (rc)
1170                 GOTO(out_unlock_source, rc);
1171
1172         /* step 2: find & lock the target dir. */
1173         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
1174         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
1175                           rr->rr_tgtlen);
1176         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1177                 mdt_object_get(info->mti_env, msrcdir);
1178                 mtgtdir = msrcdir;
1179                 if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
1180                          rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
1181                                                  MDS_INODELOCK_UPDATE);
1182                          if (rc)
1183                                  GOTO(out_unlock_source, rc);
1184                          OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
1185                 }
1186         } else {
1187                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
1188                                           rr->rr_fid2, MDT_OBJ_MUST_EXIST);
1189                 if (IS_ERR(mtgtdir))
1190                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
1191
1192                 /* check early, the real version will be saved after locking */
1193                 rc = mdt_version_get_check(info, mtgtdir, 1);
1194                 if (rc)
1195                         GOTO(out_put_target, rc);
1196
1197                 if (mdt_object_exists(mtgtdir) > 0) {
1198                         /* we lock the target dir if it is local */
1199                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
1200                                              MDS_INODELOCK_UPDATE,
1201                                              MDT_LOCAL_LOCK);
1202                         if (rc != 0)
1203                                 GOTO(out_put_target, rc);
1204                         /* get and save correct version after locking */
1205                         mdt_version_get_save(info, mtgtdir, 1);
1206                 }
1207         }
1208
1209         /* step 3: find & lock the old object. */
1210         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
1211         mdt_name_copy(&slname, lname);
1212         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
1213         if (rc != 0)
1214                 GOTO(out_unlock_target, rc);
1215
1216         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1217                 GOTO(out_unlock_target, rc = -EINVAL);
1218
1219         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid,
1220                                MDT_OBJ_MUST_EXIST);
1221         if (IS_ERR(mold))
1222                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
1223
1224         lh_oldp = &info->mti_lh[MDT_LH_OLD];
1225         mdt_lock_reg_init(lh_oldp, LCK_EX);
1226         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
1227                              MDT_CROSS_LOCK);
1228         if (rc != 0) {
1229                 mdt_object_put(info->mti_env, mold);
1230                 GOTO(out_unlock_target, rc);
1231         }
1232
1233         info->mti_mos = mold;
1234         /* save version after locking */
1235         mdt_version_get_save(info, mold, 2);
1236         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
1237
1238         /* step 4: find & lock the new object. */
1239         /* new target object may not exist now */
1240         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
1241         /* lookup with version checking */
1242         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
1243         if (rc == 0) {
1244                 /* the new_fid should have been filled at this moment */
1245                 if (lu_fid_eq(old_fid, new_fid))
1246                        GOTO(out_unlock_old, rc);
1247
1248                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1249                     lu_fid_eq(new_fid, rr->rr_fid2))
1250                         GOTO(out_unlock_old, rc = -EINVAL);
1251
1252                 mdt_lock_reg_init(lh_newp, LCK_EX);
1253                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
1254                                        MDT_OBJ_MAY_NOT_EXIST);
1255                 if (IS_ERR(mnew))
1256                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
1257
1258                 rc = mdt_object_lock(info, mnew, lh_newp,
1259                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
1260                 if (rc != 0) {
1261                         mdt_object_put(info->mti_env, mnew);
1262                         GOTO(out_unlock_old, rc);
1263                 }
1264                 /* get and save version after locking */
1265                 mdt_version_get_save(info, mnew, 3);
1266                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
1267         } else if (rc != -EREMOTE && rc != -ENOENT) {
1268                 GOTO(out_unlock_old, rc);
1269         } else {
1270                 mdt_enoent_version_save(info, 3);
1271         }
1272
1273         /* step 5: rename it */
1274         mdt_reint_init_ma(info, ma);
1275         if (!ma->ma_lmm || !ma->ma_cookie)
1276                 GOTO(out_unlock_new, rc = -EINVAL);
1277
1278         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1279                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1280
1281
1282         /* Check if @dst is subdir of @src. */
1283         rc = mdt_rename_sanity(info, old_fid);
1284         if (rc)
1285                 GOTO(out_unlock_new, rc);
1286
1287         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1288                         mdt_object_child(mtgtdir), old_fid, &slname,
1289                         (mnew ? mdt_object_child(mnew) : NULL),
1290                         lname, ma);
1291
1292         /* handle last link of tgt object */
1293         if (rc == 0) {
1294                 mdt_counter_incr(req->rq_export, LPROC_MDT_RENAME);
1295                 if (mnew)
1296                         mdt_handle_last_unlink(info, mnew, ma);
1297         }
1298
1299         EXIT;
1300 out_unlock_new:
1301         if (mnew)
1302                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
1303 out_unlock_old:
1304         mdt_object_unlock_put(info, mold, lh_oldp, rc);
1305 out_unlock_target:
1306         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
1307 out_put_target:
1308         mdt_object_put(info->mti_env, mtgtdir);
1309 out_unlock_source:
1310         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1311 out_rename_lock:
1312         mdt_rename_unlock(&rename_lh);
1313         return rc;
1314 }
1315
1316 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
1317                            struct mdt_lock_handle *lhc);
1318
1319 static mdt_reinter reinters[REINT_MAX] = {
1320         [REINT_SETATTR]  = mdt_reint_setattr,
1321         [REINT_CREATE]   = mdt_reint_create,
1322         [REINT_LINK]     = mdt_reint_link,
1323         [REINT_UNLINK]   = mdt_reint_unlink,
1324         [REINT_RENAME]   = mdt_reint_rename,
1325         [REINT_OPEN]     = mdt_reint_open,
1326         [REINT_SETXATTR] = mdt_reint_setxattr
1327 };
1328
1329 int mdt_reint_rec(struct mdt_thread_info *info,
1330                   struct mdt_lock_handle *lhc)
1331 {
1332         int rc;
1333         ENTRY;
1334
1335         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
1336
1337         RETURN(rc);
1338 }