Whamcloud - gitweb
682b348a9512c9c8ff3c4599f737583e9daf5d0c
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdt/mdt_reint.c
37  *
38  * Lustre Metadata Target (mdt) reintegration routines
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Huang Hua <huanghua@clusterfs.com>
44  * Author: Yury Umanets <umka@clusterfs.com>
45  */
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include "mdt_internal.h"
50 #include "lu_time.h"
51
52 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
53                                      struct md_attr *ma)
54 {
55         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
56         ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
57                                                &RMF_MDT_MD, RCL_SERVER);
58
59         ma->ma_cookie = req_capsule_server_get(info->mti_pill,
60                                                &RMF_LOGCOOKIES);
61         ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
62                                                   &RMF_LOGCOOKIES,
63                                                   RCL_SERVER);
64
65         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
66         ma->ma_valid = 0;
67 }
68
69 static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
70                                 struct mdt_object *object,
71                                 struct mdt_body *repbody)
72 {
73         ENTRY;
74
75         /* for cross-ref mkdir, mds capa has been fetched from remote obj, then
76          * we won't go to below*/
77         if (repbody->valid & OBD_MD_FLMDSCAPA)
78                 RETURN(rc);
79
80         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa &&
81             info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
82                 struct lustre_capa *capa;
83
84                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
85                 LASSERT(capa);
86                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
87                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
88                                  0);
89                 if (rc == 0)
90                         repbody->valid |= OBD_MD_FLMDSCAPA;
91         }
92
93         RETURN(rc);
94 }
95
96 /**
97  * Get version of object by fid.
98  *
99  * Return real version or ENOENT_VERSION if object doesn't exist
100  */
101 static void mdt_obj_version_get(struct mdt_thread_info *info,
102                                 struct mdt_object *o, __u64 *version)
103 {
104         LASSERT(o);
105         LASSERT(mdt_object_exists(o) >= 0);
106         if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
107                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
108         else
109                 *version = ENOENT_VERSION;
110         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
111                PFID(mdt_object_fid(o)), *version);
112 }
113
114 /**
115  * Check version is correct.
116  *
117  * Should be called only during replay.
118  */
119 static int mdt_version_check(struct ptlrpc_request *req,
120                              __u64 version, int idx)
121 {
122         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
123         ENTRY;
124
125         if (!exp_connect_vbr(req->rq_export))
126                 RETURN(0);
127
128         LASSERT(req_is_replay(req));
129         /** VBR: version is checked always because costs nothing */
130         LASSERT(idx < PTLRPC_NUM_VERSIONS);
131         /** Sanity check for malformed buffers */
132         if (pre_ver == NULL) {
133                 CERROR("No versions in request buffer\n");
134                 cfs_spin_lock(&req->rq_export->exp_lock);
135                 req->rq_export->exp_vbr_failed = 1;
136                 cfs_spin_unlock(&req->rq_export->exp_lock);
137                 RETURN(-EOVERFLOW);
138         } else if (pre_ver[idx] != version) {
139                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
140                        pre_ver[idx], version);
141                 cfs_spin_lock(&req->rq_export->exp_lock);
142                 req->rq_export->exp_vbr_failed = 1;
143                 cfs_spin_unlock(&req->rq_export->exp_lock);
144                 RETURN(-EOVERFLOW);
145         }
146         RETURN(0);
147 }
148
149 /**
150  * Save pre-versions in reply.
151  */
152 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
153                              int idx)
154 {
155         __u64 *reply_ver;
156
157         if (!exp_connect_vbr(req->rq_export))
158                 return;
159
160         LASSERT(!req_is_replay(req));
161         LASSERT(req->rq_repmsg != NULL);
162         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
163         if (reply_ver)
164                 reply_ver[idx] = version;
165 }
166
167 /**
168  * Save enoent version, it is needed when it is obvious that object doesn't
169  * exist, e.g. child during create.
170  */
171 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
172 {
173         /* save version of file name for replay, it must be ENOENT here */
174         if (!req_is_replay(mdt_info_req(info))) {
175                 info->mti_ver[idx] = ENOENT_VERSION;
176                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
177         }
178 }
179
180 /**
181  * Get version from disk and save in reply buffer.
182  *
183  * Versions are saved in reply only during normal operations not replays.
184  */
185 void mdt_version_get_save(struct mdt_thread_info *info,
186                           struct mdt_object *mto, int idx)
187 {
188         /* don't save versions during replay */
189         if (!req_is_replay(mdt_info_req(info))) {
190                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
191                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
192         }
193 }
194
195 /**
196  * Get version from disk and check it, no save in reply.
197  */
198 int mdt_version_get_check(struct mdt_thread_info *info,
199                           struct mdt_object *mto, int idx)
200 {
201         /* only check versions during replay */
202         if (!req_is_replay(mdt_info_req(info)))
203                 return 0;
204
205         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
206         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
207 }
208
209 /**
210  * Get version from disk and check if recovery or just save.
211  */
212 int mdt_version_get_check_save(struct mdt_thread_info *info,
213                                struct mdt_object *mto, int idx)
214 {
215         int rc = 0;
216
217         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
218         if (req_is_replay(mdt_info_req(info)))
219                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
220                                        idx);
221         else
222                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
223         return rc;
224 }
225
226 /**
227  * Lookup with version checking.
228  *
229  * This checks version of 'name'. Many reint functions uses 'name' for child not
230  * FID, therefore we need to get object by name and check its version.
231  */
232 int mdt_lookup_version_check(struct mdt_thread_info *info,
233                              struct mdt_object *p, struct lu_name *lname,
234                              struct lu_fid *fid, int idx)
235 {
236         int rc, vbrc;
237
238         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
239                         &info->mti_spec);
240         /* Check version only during replay */
241         if (!req_is_replay(mdt_info_req(info)))
242                 return rc;
243
244         info->mti_ver[idx] = ENOENT_VERSION;
245         if (rc == 0) {
246                 struct mdt_object *child;
247                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
248                 if (likely(!IS_ERR(child))) {
249                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
250                         mdt_object_put(info->mti_env, child);
251                 }
252         }
253         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
254         return vbrc ? vbrc : rc;
255
256 }
257
258 /*
259  * VBR: we save three versions in reply:
260  * 0 - parent. Check that parent version is the same during replay.
261  * 1 - name. Version of 'name' if file exists with the same name or
262  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
263  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
264  * check.
265  */
266 static int mdt_md_create(struct mdt_thread_info *info)
267 {
268         struct mdt_device       *mdt = info->mti_mdt;
269         struct mdt_object       *parent;
270         struct mdt_object       *child;
271         struct mdt_lock_handle  *lh;
272         struct mdt_body         *repbody;
273         struct md_attr          *ma = &info->mti_attr;
274         struct mdt_reint_record *rr = &info->mti_rr;
275         struct lu_name          *lname;
276         int rc;
277         ENTRY;
278
279         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
280                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
281
282         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
283
284         lh = &info->mti_lh[MDT_LH_PARENT];
285         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
286
287         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
288                                       MDS_INODELOCK_UPDATE);
289         if (IS_ERR(parent))
290                 RETURN(PTR_ERR(parent));
291
292         if (mdt_object_obf(parent))
293                 GOTO(out_put_parent, rc = -EPERM);
294
295         rc = mdt_version_get_check_save(info, parent, 0);
296         if (rc)
297                 GOTO(out_put_parent, rc);
298
299         /*
300          * Check child name version during replay.
301          * During create replay a file may exist with same name.
302          */
303         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
304         rc = mdt_lookup_version_check(info, parent, lname,
305                                       &info->mti_tmp_fid1, 1);
306         if (rc == 0)
307                 GOTO(out_put_parent, rc = -EEXIST);
308
309         /* -ENOENT is expected here */
310         if (rc != -ENOENT)
311                 GOTO(out_put_parent, rc);
312
313         /* save version of file name for replay, it must be ENOENT here */
314         mdt_enoent_version_save(info, 1);
315
316         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
317         if (likely(!IS_ERR(child))) {
318                 struct md_object *next = mdt_object_child(parent);
319
320                 ma->ma_need = MA_INODE;
321                 ma->ma_valid = 0;
322                 /* capa for cross-ref will be stored here */
323                 ma->ma_capa = req_capsule_server_get(info->mti_pill,
324                                                      &RMF_CAPA1);
325                 LASSERT(ma->ma_capa);
326
327                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
328                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
329
330                 /* Version of child will be updated on disk. */
331                 info->mti_mos = child;
332                 rc = mdt_version_get_check_save(info, child, 2);
333                 if (rc)
334                         GOTO(out_put_child, rc);
335
336                 /* Let lower layer know current lock mode. */
337                 info->mti_spec.sp_cr_mode =
338                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
339
340                 /*
341                  * Do not perform lookup sanity check. We know that name does
342                  * not exist.
343                  */
344                 info->mti_spec.sp_cr_lookup = 0;
345                 info->mti_spec.sp_feat = &dt_directory_features;
346
347                 rc = mdo_create(info->mti_env, next, lname,
348                                 mdt_object_child(child),
349                                 &info->mti_spec, ma);
350                 if (rc == 0) {
351                         /* Return fid & attr to client. */
352                         if (ma->ma_valid & MA_INODE)
353                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
354                                                    mdt_object_fid(child));
355                 }
356 out_put_child:
357                 mdt_object_put(info->mti_env, child);
358         } else {
359                 rc = PTR_ERR(child);
360         }
361         mdt_create_pack_capa(info, rc, child, repbody);
362 out_put_parent:
363         mdt_object_unlock_put(info, parent, lh, rc);
364         RETURN(rc);
365 }
366
367 /* Partial request to create object only */
368 static int mdt_md_mkobj(struct mdt_thread_info *info)
369 {
370         struct mdt_device      *mdt = info->mti_mdt;
371         struct mdt_object      *o;
372         struct mdt_body        *repbody;
373         struct md_attr         *ma = &info->mti_attr;
374         int rc;
375         ENTRY;
376
377         DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
378                   PFID(info->mti_rr.rr_fid2));
379
380         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
381
382         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
383         if (!IS_ERR(o)) {
384                 struct md_object *next = mdt_object_child(o);
385
386                 ma->ma_need = MA_INODE;
387                 ma->ma_valid = 0;
388
389                 /*
390                  * Cross-ref create can encounter already created obj in case of
391                  * recovery, just get attr in that case.
392                  */
393                 if (mdt_object_exists(o) == 1) {
394                         rc = mo_attr_get(info->mti_env, next, ma);
395                 } else {
396                         /*
397                          * Here, NO permission check for object_create,
398                          * such check has been done on the original MDS.
399                          */
400                         rc = mo_object_create(info->mti_env, next,
401                                               &info->mti_spec, ma);
402                 }
403                 if (rc == 0) {
404                         /* Return fid & attr to client. */
405                         if (ma->ma_valid & MA_INODE)
406                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
407                                                    mdt_object_fid(o));
408                 }
409                 mdt_object_put(info->mti_env, o);
410         } else
411                 rc = PTR_ERR(o);
412
413         mdt_create_pack_capa(info, rc, o, repbody);
414         RETURN(rc);
415 }
416
417 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
418                  struct md_attr *ma, int flags)
419 {
420         struct mdt_lock_handle  *lh;
421         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
422         __u64 lockpart = MDS_INODELOCK_UPDATE;
423         int rc;
424         ENTRY;
425
426         /* attr shouldn't be set on remote object */
427         LASSERT(mdt_object_exists(mo) >= 0);
428
429         lh = &info->mti_lh[MDT_LH_PARENT];
430         mdt_lock_reg_init(lh, LCK_PW);
431
432         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
433                 lockpart |= MDS_INODELOCK_LOOKUP;
434
435         rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
436         if (rc != 0)
437                 RETURN(rc);
438
439         if (mdt_object_exists(mo) == 0)
440                 GOTO(out_unlock, rc = -ENOENT);
441
442         /* all attrs are packed into mti_attr in unpack_setattr */
443         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
444                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
445
446         /* This is only for set ctime when rename's source is on remote MDS. */
447         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
448                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
449
450         /* VBR: update version if attr changed are important for recovery */
451         if (do_vbr) {
452                 /* update on-disk version of changed object */
453                 info->mti_mos = mo;
454                 rc = mdt_version_get_check_save(info, mo, 0);
455                 if (rc)
456                         GOTO(out_unlock, rc);
457         }
458
459         /* all attrs are packed into mti_attr in unpack_setattr */
460         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
461         if (rc != 0)
462                 GOTO(out_unlock, rc);
463
464         EXIT;
465 out_unlock:
466         mdt_object_unlock(info, mo, lh, rc);
467         return rc;
468 }
469
470 static int mdt_reint_setattr(struct mdt_thread_info *info,
471                              struct mdt_lock_handle *lhc)
472 {
473         struct md_attr          *ma = &info->mti_attr;
474         struct mdt_reint_record *rr = &info->mti_rr;
475         struct ptlrpc_request   *req = mdt_info_req(info);
476         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
477         struct mdt_file_data    *mfd;
478         struct mdt_object       *mo;
479         struct md_object        *next;
480         struct mdt_body         *repbody;
481         int                      som_au, rc, rc2;
482         ENTRY;
483
484         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
485                   (unsigned int)ma->ma_attr.la_valid);
486
487         if (info->mti_dlm_req)
488                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
489
490         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
491         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
492         if (IS_ERR(mo))
493                 GOTO(out, rc = PTR_ERR(mo));
494
495         if (mdt_object_obf(mo))
496                 GOTO(out_put, rc = -EPERM);
497
498         /* start a log jounal handle if needed */
499         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
500                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
501                     (rr->rr_flags & MRF_OPEN_TRUNC)) {
502                         /* Check write access for the O_TRUNC case */
503                         if (mdt_write_read(mo) < 0)
504                                 GOTO(out_put, rc = -ETXTBSY);
505                 }
506         } else if (info->mti_ioepoch &&
507                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
508                 /* Truncate case. IOEpoch is opened. */
509                 rc = mdt_write_get(mo);
510                 if (rc)
511                         GOTO(out_put, rc);
512
513                 mfd = mdt_mfd_new();
514                 if (mfd == NULL) {
515                         mdt_write_put(mo);
516                         GOTO(out_put, rc = -ENOMEM);
517                 }
518
519                 mdt_ioepoch_open(info, mo, 0);
520                 repbody->ioepoch = mo->mot_ioepoch;
521
522                 mdt_object_get(info->mti_env, mo);
523                 mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC);
524                 mfd->mfd_object = mo;
525                 mfd->mfd_xid = req->rq_xid;
526
527                 cfs_spin_lock(&med->med_open_lock);
528                 cfs_list_add(&mfd->mfd_list, &med->med_open_head);
529                 cfs_spin_unlock(&med->med_open_lock);
530                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
531         }
532
533         som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE;
534         if (som_au) {
535                 /* SOM Attribute update case. Find the proper mfd and update
536                  * SOM attributes on the proper object. */
537                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
538                 LASSERT(info->mti_ioepoch);
539
540                 cfs_spin_lock(&med->med_open_lock);
541                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
542                 if (mfd == NULL) {
543                         cfs_spin_unlock(&med->med_open_lock);
544                         CDEBUG(D_INODE, "no handle for file close: "
545                                "fid = "DFID": cookie = "LPX64"\n",
546                                PFID(info->mti_rr.rr_fid1),
547                                info->mti_ioepoch->handle.cookie);
548                         GOTO(out_put, rc = -ESTALE);
549                 }
550                 LASSERT(mfd->mfd_mode == MDS_FMODE_SOM);
551                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
552
553                 class_handle_unhash(&mfd->mfd_handle);
554                 cfs_list_del_init(&mfd->mfd_list);
555                 cfs_spin_unlock(&med->med_open_lock);
556
557                 /* Close the found mfd, update attributes. */
558                 ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
559                 OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
560                 if (ma->ma_lmm == NULL)
561                         GOTO(out_put, rc = -ENOMEM);
562
563                 mdt_mfd_close(info, mfd);
564
565                 OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
566         } else {
567                 rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
568                 if (rc)
569                         GOTO(out_put, rc);
570         }
571
572         ma->ma_need = MA_INODE;
573         ma->ma_valid = 0;
574         next = mdt_object_child(mo);
575         rc = mo_attr_get(info->mti_env, next, ma);
576         if (rc != 0)
577                 GOTO(out_put, rc);
578
579         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
580
581         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
582             info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
583             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
584             (ma->ma_attr.la_valid & LA_SIZE) && !som_au) {
585                 struct lustre_capa *capa;
586
587                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
588                 LASSERT(capa);
589                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
590                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
591                 if (rc)
592                         GOTO(out_put, rc);
593                 repbody->valid |= OBD_MD_FLOSSCAPA;
594         }
595
596         EXIT;
597 out_put:
598         mdt_object_put(info->mti_env, mo);
599 out:
600         if (rc == 0)
601                 mdt_counter_incr(req, LPROC_MDT_SETATTR);
602
603         mdt_client_compatibility(info);
604         rc2 = mdt_fix_reply(info);
605         if (rc == 0)
606                 rc = rc2;
607         return rc;
608 }
609
610 static int mdt_reint_create(struct mdt_thread_info *info,
611                             struct mdt_lock_handle *lhc)
612 {
613         struct ptlrpc_request   *req = mdt_info_req(info);
614         int                     rc;
615         ENTRY;
616
617         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
618                 RETURN(err_serious(-ESTALE));
619
620         if (info->mti_dlm_req)
621                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
622
623         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
624         case S_IFDIR:{
625                 /* Cross-ref case. */
626                 /* TODO: we can add LPROC_MDT_CROSS for cross-ref stats */
627                 if (info->mti_cross_ref) {
628                         rc = mdt_md_mkobj(info);
629                 } else {
630                         LASSERT(info->mti_rr.rr_namelen > 0);
631                         mdt_counter_incr(req, LPROC_MDT_MKDIR);
632                         rc = mdt_md_create(info);
633                 }
634                 break;
635         }
636         case S_IFREG:
637         case S_IFLNK:
638         case S_IFCHR:
639         case S_IFBLK:
640         case S_IFIFO:
641         case S_IFSOCK:{
642                 /* Special file should stay on the same node as parent. */
643                 LASSERT(info->mti_rr.rr_namelen > 0);
644                 mdt_counter_incr(req, LPROC_MDT_MKNOD);
645                 rc = mdt_md_create(info);
646                 break;
647         }
648         default:
649                 rc = err_serious(-EOPNOTSUPP);
650         }
651         RETURN(rc);
652 }
653
654 /*
655  * VBR: save parent version in reply and child version getting by its name.
656  * Version of child is getting and checking during its lookup. If
657  */
658 static int mdt_reint_unlink(struct mdt_thread_info *info,
659                             struct mdt_lock_handle *lhc)
660 {
661         struct mdt_reint_record *rr = &info->mti_rr;
662         struct ptlrpc_request   *req = mdt_info_req(info);
663         struct md_attr          *ma = &info->mti_attr;
664         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
665         struct mdt_object       *mp;
666         struct mdt_object       *mc;
667         struct mdt_lock_handle  *parent_lh;
668         struct mdt_lock_handle  *child_lh;
669         struct lu_name          *lname;
670         int                      rc;
671         ENTRY;
672
673         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
674                   rr->rr_name);
675
676         if (info->mti_dlm_req)
677                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
678
679         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
680                 RETURN(err_serious(-ENOENT));
681
682         /*
683          * step 1: lock the parent. Note, this may be child in case of
684          * remote operation denoted by ->mti_cross_ref flag.
685          */
686         parent_lh = &info->mti_lh[MDT_LH_PARENT];
687         if (info->mti_cross_ref) {
688                 /*
689                  * Init reg lock for cross ref case when we need to do only
690                  * ref del locally.
691                  */
692                 mdt_lock_reg_init(parent_lh, LCK_PW);
693         } else {
694                 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
695                                   rr->rr_namelen);
696         }
697         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
698                                   MDS_INODELOCK_UPDATE);
699         if (IS_ERR(mp)) {
700                 rc = PTR_ERR(mp);
701                 /* errors are possible here in cross-ref cases, see below */
702                 if (info->mti_cross_ref)
703                         rc = 0;
704                 GOTO(out, rc);
705         }
706
707         if (mdt_object_obf(mp))
708                 GOTO(out_unlock_parent, rc = -EPERM);
709
710         rc = mdt_version_get_check_save(info, mp, 0);
711         if (rc)
712                 GOTO(out_unlock_parent, rc);
713
714         mdt_reint_init_ma(info, ma);
715         if (!ma->ma_lmm || !ma->ma_cookie)
716                 GOTO(out_unlock_parent, rc = -EINVAL);
717
718         if (info->mti_cross_ref) {
719                 /*
720                  * Remote partial operation. It is possible that replay may
721                  * happen on parent MDT and this operation will be repeated.
722                  * Therefore the object absense is allowed case and nothing
723                  * should be done here.
724                  */
725                 if (mdt_object_exists(mp) > 0) {
726                         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
727                         rc = mo_ref_del(info->mti_env,
728                                         mdt_object_child(mp), ma);
729                         if (rc == 0)
730                                 mdt_handle_last_unlink(info, mp, ma);
731                 } else
732                         rc = 0;
733                 GOTO(out_unlock_parent, rc);
734         }
735
736         /* step 2: find & lock the child */
737         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
738         /* lookup child object along with version checking */
739         fid_zero(child_fid);
740         rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
741         if (rc != 0)
742                  GOTO(out_unlock_parent, rc);
743
744         /* We will lock the child regardless it is local or remote. No harm. */
745         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
746         if (IS_ERR(mc))
747                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
748         child_lh = &info->mti_lh[MDT_LH_CHILD];
749         mdt_lock_reg_init(child_lh, LCK_EX);
750         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
751                              MDT_CROSS_LOCK);
752         if (rc != 0) {
753                 mdt_object_put(info->mti_env, mc);
754                 GOTO(out_unlock_parent, rc);
755         }
756
757         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
758                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
759         /* save version when object is locked */
760         mdt_version_get_save(info, mc, 1);
761         /*
762          * Now we can only make sure we need MA_INODE, in mdd layer, will check
763          * whether need MA_LOV and MA_COOKIE.
764          */
765         ma->ma_need = MA_INODE;
766         ma->ma_valid = 0;
767         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
768         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
769                         mdt_object_child(mc), lname, ma);
770         if (rc == 0)
771                 mdt_handle_last_unlink(info, mc, ma);
772
773         if (ma->ma_valid & MA_INODE) {
774                 switch (ma->ma_attr.la_mode & S_IFMT) {
775                 case S_IFDIR:
776                         mdt_counter_incr(req, LPROC_MDT_RMDIR);
777                         break;
778                 case S_IFREG:
779                 case S_IFLNK:
780                 case S_IFCHR:
781                 case S_IFBLK:
782                 case S_IFIFO:
783                 case S_IFSOCK:
784                         mdt_counter_incr(req, LPROC_MDT_UNLINK);
785                         break;
786                 default:
787                         LASSERTF(0, "bad file type %o unlinking\n",
788                                  ma->ma_attr.la_mode);
789                 }
790         }
791
792         EXIT;
793
794         mdt_object_unlock_put(info, mc, child_lh, rc);
795 out_unlock_parent:
796         mdt_object_unlock_put(info, mp, parent_lh, rc);
797 out:
798         return rc;
799 }
800
801 /*
802  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
803  * name.
804  */
805 static int mdt_reint_link(struct mdt_thread_info *info,
806                           struct mdt_lock_handle *lhc)
807 {
808         struct mdt_reint_record *rr = &info->mti_rr;
809         struct ptlrpc_request   *req = mdt_info_req(info);
810         struct md_attr          *ma = &info->mti_attr;
811         struct mdt_object       *ms;
812         struct mdt_object       *mp;
813         struct mdt_lock_handle  *lhs;
814         struct mdt_lock_handle  *lhp;
815         struct lu_name          *lname;
816         int rc;
817         ENTRY;
818
819         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
820                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
821
822         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
823                 RETURN(err_serious(-ENOENT));
824
825         if (info->mti_dlm_req)
826                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
827
828         if (info->mti_cross_ref) {
829                 /* MDT holding name ask us to add ref. */
830                 lhs = &info->mti_lh[MDT_LH_CHILD];
831                 mdt_lock_reg_init(lhs, LCK_EX);
832                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
833                                           MDS_INODELOCK_UPDATE);
834                 if (IS_ERR(ms))
835                         RETURN(PTR_ERR(ms));
836
837                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
838                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
839                 mdt_object_unlock_put(info, ms, lhs, rc);
840                 RETURN(rc);
841         }
842
843         /* Invalid case so return error immediately instead of
844          * processing it */
845         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
846                 RETURN(-EPERM);
847
848         /* step 1: find & lock the target parent dir */
849         lhp = &info->mti_lh[MDT_LH_PARENT];
850         mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name,
851                           rr->rr_namelen);
852         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
853                                   MDS_INODELOCK_UPDATE);
854         if (IS_ERR(mp))
855                 RETURN(PTR_ERR(mp));
856
857         if (mdt_object_obf(mp))
858                 GOTO(out_unlock_parent, rc = -EPERM);
859
860         rc = mdt_version_get_check_save(info, mp, 0);
861         if (rc)
862                 GOTO(out_unlock_parent, rc);
863
864         /* step 2: find & lock the source */
865         lhs = &info->mti_lh[MDT_LH_CHILD];
866         mdt_lock_reg_init(lhs, LCK_EX);
867
868         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
869         if (IS_ERR(ms))
870                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
871
872         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
873                             MDT_CROSS_LOCK);
874         if (rc != 0) {
875                 mdt_object_put(info->mti_env, ms);
876                 GOTO(out_unlock_parent, rc);
877         }
878
879         /* step 3: link it */
880         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
881                        OBD_FAIL_MDS_REINT_LINK_WRITE);
882
883         info->mti_mos = ms;
884         rc = mdt_version_get_check_save(info, ms, 1);
885         if (rc)
886                 GOTO(out_unlock_child, rc);
887
888         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
889         /** check target version by name during replay */
890         rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2);
891         if (rc != 0 && rc != -ENOENT)
892                 GOTO(out_unlock_child, rc);
893         /* save version of file name for replay, it must be ENOENT here */
894         if (!req_is_replay(mdt_info_req(info))) {
895                 info->mti_ver[2] = ENOENT_VERSION;
896                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
897         }
898
899         rc = mdo_link(info->mti_env, mdt_object_child(mp),
900                       mdt_object_child(ms), lname, ma);
901
902         if (rc == 0)
903                 mdt_counter_incr(req, LPROC_MDT_LINK);
904
905         EXIT;
906 out_unlock_child:
907         mdt_object_unlock_put(info, ms, lhs, rc);
908 out_unlock_parent:
909         mdt_object_unlock_put(info, mp, lhp, rc);
910         return rc;
911 }
912 /**
913  * lock the part of the directory according to the hash of the name
914  * (lh->mlh_pdo_hash) in parallel directory lock.
915  */
916 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
917                        struct mdt_lock_handle *lh,
918                        struct mdt_object *obj, __u64 ibits)
919 {
920         struct ldlm_res_id *res_id = &info->mti_res_id;
921         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
922         ldlm_policy_data_t *policy = &info->mti_policy;
923         int rc;
924
925         /*
926          * Finish res_id initializing by name hash marking part of
927          * directory which is taking modification.
928          */
929         LASSERT(lh->mlh_pdo_hash != 0);
930         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
931         memset(policy, 0, sizeof(*policy));
932         policy->l_inodebits.bits = ibits;
933         /*
934          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
935          * going to be sent to client. If it is - mdt_intent_policy() path will
936          * fix it up and turn FL_LOCAL flag off.
937          */
938         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
939                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
940                           &info->mti_exp->exp_handle.h_cookie);
941         return rc;
942 }
943
944 /* partial operation for rename */
945 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
946 {
947         struct mdt_reint_record *rr = &info->mti_rr;
948         struct ptlrpc_request   *req = mdt_info_req(info);
949         struct md_attr          *ma = &info->mti_attr;
950         struct mdt_object       *mtgtdir;
951         struct mdt_object       *mtgt = NULL;
952         struct mdt_lock_handle  *lh_tgtdir;
953         struct mdt_lock_handle  *lh_tgt = NULL;
954         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
955         struct lu_name          *lname;
956         int                      rc;
957         ENTRY;
958
959         DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
960                   rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
961
962         /* step 1: lookup & lock the tgt dir. */
963         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
964         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
965                           rr->rr_tgtlen);
966         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
967                                        MDS_INODELOCK_UPDATE);
968         if (IS_ERR(mtgtdir))
969                 RETURN(PTR_ERR(mtgtdir));
970
971         /* step 2: find & lock the target object if exists. */
972         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
973         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
974         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
975                         lname, tgt_fid, &info->mti_spec);
976         if (rc != 0 && rc != -ENOENT) {
977                 GOTO(out_unlock_tgtdir, rc);
978         } else if (rc == 0) {
979                 /*
980                  * In case of replay that name can be already inserted, check
981                  * that and do nothing if so.
982                  */
983                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
984                         GOTO(out_unlock_tgtdir, rc);
985
986                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
987                 mdt_lock_reg_init(lh_tgt, LCK_EX);
988
989                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
990                                             MDS_INODELOCK_LOOKUP);
991                 if (IS_ERR(mtgt))
992                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
993
994                 mdt_reint_init_ma(info, ma);
995                 if (!ma->ma_lmm || !ma->ma_cookie)
996                         GOTO(out_unlock_tgt, rc = -EINVAL);
997
998                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
999                                     mdt_object_child(mtgt), rr->rr_fid2,
1000                                     lname, ma);
1001         } else /* -ENOENT */ {
1002                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
1003                                      lname, rr->rr_fid2, ma);
1004         }
1005
1006         /* handle last link of tgt object */
1007         if (rc == 0 && mtgt)
1008                 mdt_handle_last_unlink(info, mtgt, ma);
1009
1010         EXIT;
1011 out_unlock_tgt:
1012         if (mtgt)
1013                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
1014 out_unlock_tgtdir:
1015         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
1016         return rc;
1017 }
1018
1019 static int mdt_rename_lock(struct mdt_thread_info *info,
1020                            struct lustre_handle *lh)
1021 {
1022         struct ldlm_namespace *ns     = info->mti_mdt->mdt_namespace;
1023         ldlm_policy_data_t    *policy = &info->mti_policy;
1024         struct ldlm_res_id    *res_id = &info->mti_res_id;
1025         struct md_site        *ms;
1026         int rc;
1027         ENTRY;
1028
1029         ms = mdt_md_site(info->mti_mdt);
1030         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1031
1032         memset(policy, 0, sizeof *policy);
1033         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1034
1035         if (ms->ms_control_exp == NULL) {
1036                 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1037
1038                 /*
1039                  * Current node is controller, that is mdt0, where we should
1040                  * take BFL lock.
1041                  */
1042                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
1043                                             LCK_EX, &flags, ldlm_blocking_ast,
1044                                             ldlm_completion_ast, NULL, NULL, 0,
1045                                             &info->mti_exp->exp_handle.h_cookie,
1046                                             lh);
1047         } else {
1048                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
1049                      ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
1050                 int flags = 0;
1051
1052                 /*
1053                  * This is the case mdt0 is remote node, issue DLM lock like
1054                  * other clients.
1055                  */
1056                 rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id,
1057                                       policy, &flags, NULL, 0, lh, 0);
1058         }
1059
1060         RETURN(rc);
1061 }
1062
1063 static void mdt_rename_unlock(struct lustre_handle *lh)
1064 {
1065         ENTRY;
1066         LASSERT(lustre_handle_is_used(lh));
1067         ldlm_lock_decref(lh, LCK_EX);
1068         EXIT;
1069 }
1070
1071 /*
1072  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
1073  * target. Source should not be ancestor of target dir. May be other rename
1074  * checks can be moved here later.
1075  */
1076 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
1077 {
1078         struct mdt_reint_record *rr = &info->mti_rr;
1079         struct lu_fid dst_fid = *rr->rr_fid2;
1080         struct mdt_object *dst;
1081         int rc = 0;
1082         ENTRY;
1083
1084         do {
1085                 LASSERT(fid_is_sane(&dst_fid));
1086                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
1087                 if (!IS_ERR(dst)) {
1088                         rc = mdo_is_subdir(info->mti_env,
1089                                            mdt_object_child(dst), fid,
1090                                            &dst_fid);
1091                         mdt_object_put(info->mti_env, dst);
1092                         if (rc != -EREMOTE && rc < 0) {
1093                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
1094                         } else {
1095                                 /* check the found fid */
1096                                 if (lu_fid_eq(&dst_fid, fid))
1097                                         rc = -EINVAL;
1098                         }
1099                 } else {
1100                         rc = PTR_ERR(dst);
1101                 }
1102         } while (rc == -EREMOTE);
1103
1104         RETURN(rc);
1105 }
1106
1107 /*
1108  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
1109  * 2 - src child; 3 - tgt child.
1110  * Update on disk version of src child.
1111  */
1112 static int mdt_reint_rename(struct mdt_thread_info *info,
1113                             struct mdt_lock_handle *lhc)
1114 {
1115         struct mdt_reint_record *rr = &info->mti_rr;
1116         struct md_attr          *ma = &info->mti_attr;
1117         struct ptlrpc_request   *req = mdt_info_req(info);
1118         struct mdt_object       *msrcdir;
1119         struct mdt_object       *mtgtdir;
1120         struct mdt_object       *mold;
1121         struct mdt_object       *mnew = NULL;
1122         struct mdt_lock_handle  *lh_srcdirp;
1123         struct mdt_lock_handle  *lh_tgtdirp;
1124         struct mdt_lock_handle  *lh_oldp;
1125         struct mdt_lock_handle  *lh_newp;
1126         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1127         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
1128         struct lustre_handle     rename_lh = { 0 };
1129         struct lu_name           slname = { 0 };
1130         struct lu_name          *lname;
1131         int                      rc;
1132         ENTRY;
1133
1134         if (info->mti_dlm_req)
1135                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
1136
1137         if (info->mti_cross_ref) {
1138                 rc = mdt_reint_rename_tgt(info);
1139                 RETURN(rc);
1140         }
1141
1142         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
1143                   PFID(rr->rr_fid1), rr->rr_name,
1144                   PFID(rr->rr_fid2), rr->rr_tgt);
1145
1146         rc = mdt_rename_lock(info, &rename_lh);
1147         if (rc) {
1148                 CERROR("Can't lock FS for rename, rc %d\n", rc);
1149                 RETURN(rc);
1150         }
1151
1152         lh_newp = &info->mti_lh[MDT_LH_NEW];
1153
1154         /* step 1: lock the source dir. */
1155         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
1156         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
1157                           rr->rr_namelen);
1158         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
1159                                        MDS_INODELOCK_UPDATE);
1160         if (IS_ERR(msrcdir))
1161                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
1162
1163         if (mdt_object_obf(msrcdir))
1164                 GOTO(out_unlock_source, rc = -EPERM);
1165
1166         rc = mdt_version_get_check_save(info, msrcdir, 0);
1167         if (rc)
1168                 GOTO(out_unlock_source, rc);
1169
1170         /* step 2: find & lock the target dir. */
1171         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
1172         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
1173                           rr->rr_tgtlen);
1174         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1175                 mdt_object_get(info->mti_env, msrcdir);
1176                 mtgtdir = msrcdir;
1177                 if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
1178                          rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
1179                                                  MDS_INODELOCK_UPDATE);
1180                          if (rc)
1181                                  GOTO(out_unlock_source, rc);
1182                          OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
1183                 }
1184         } else {
1185                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
1186                                           rr->rr_fid2);
1187                 if (IS_ERR(mtgtdir))
1188                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
1189
1190                 if (mdt_object_obf(mtgtdir))
1191                         GOTO(out_put_target, rc = -EPERM);
1192
1193                 /* check early, the real version will be saved after locking */
1194                 rc = mdt_version_get_check(info, mtgtdir, 1);
1195                 if (rc)
1196                         GOTO(out_put_target, rc);
1197
1198                 rc = mdt_object_exists(mtgtdir);
1199                 if (rc == 0) {
1200                         GOTO(out_put_target, rc = -ESTALE);
1201                 } else if (rc > 0) {
1202                         /* we lock the target dir if it is local */
1203                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
1204                                              MDS_INODELOCK_UPDATE,
1205                                              MDT_LOCAL_LOCK);
1206                         if (rc != 0)
1207                                 GOTO(out_put_target, rc);
1208                         /* get and save correct version after locking */
1209                         mdt_version_get_save(info, mtgtdir, 1);
1210                 }
1211         }
1212
1213         /* step 3: find & lock the old object. */
1214         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
1215         mdt_name_copy(&slname, lname);
1216         fid_zero(old_fid);
1217         rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
1218         if (rc != 0)
1219                 GOTO(out_unlock_target, rc);
1220
1221         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1222                 GOTO(out_unlock_target, rc = -EINVAL);
1223
1224         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
1225         if (IS_ERR(mold))
1226                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
1227
1228         if (mdt_object_obf(mold)) {
1229                 mdt_object_put(info->mti_env, mold);
1230                 GOTO(out_unlock_target, rc = -EPERM);
1231         }
1232
1233         lh_oldp = &info->mti_lh[MDT_LH_OLD];
1234         mdt_lock_reg_init(lh_oldp, LCK_EX);
1235         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
1236                              MDT_CROSS_LOCK);
1237         if (rc != 0) {
1238                 mdt_object_put(info->mti_env, mold);
1239                 GOTO(out_unlock_target, rc);
1240         }
1241
1242         info->mti_mos = mold;
1243         /* save version after locking */
1244         mdt_version_get_save(info, mold, 2);
1245         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
1246
1247         /* step 4: find & lock the new object. */
1248         /* new target object may not exist now */
1249         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
1250         /* lookup with version checking */
1251         fid_zero(new_fid);
1252         rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
1253         if (rc == 0) {
1254                 /* the new_fid should have been filled at this moment */
1255                 if (lu_fid_eq(old_fid, new_fid))
1256                        GOTO(out_unlock_old, rc);
1257
1258                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1259                     lu_fid_eq(new_fid, rr->rr_fid2))
1260                         GOTO(out_unlock_old, rc = -EINVAL);
1261
1262                 mdt_lock_reg_init(lh_newp, LCK_EX);
1263                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
1264                 if (IS_ERR(mnew))
1265                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
1266
1267                 if (mdt_object_obf(mnew)) {
1268                         mdt_object_put(info->mti_env, mnew);
1269                         GOTO(out_unlock_old, rc = -EPERM);
1270                 }
1271
1272                 rc = mdt_object_lock(info, mnew, lh_newp,
1273                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
1274                 if (rc != 0) {
1275                         mdt_object_put(info->mti_env, mnew);
1276                         GOTO(out_unlock_old, rc);
1277                 }
1278                 /* get and save version after locking */
1279                 mdt_version_get_save(info, mnew, 3);
1280                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
1281         } else if (rc != -EREMOTE && rc != -ENOENT) {
1282                 GOTO(out_unlock_old, rc);
1283         } else {
1284                 mdt_enoent_version_save(info, 3);
1285         }
1286
1287         /* step 5: rename it */
1288         mdt_reint_init_ma(info, ma);
1289         if (!ma->ma_lmm || !ma->ma_cookie)
1290                 GOTO(out_unlock_new, rc = -EINVAL);
1291
1292         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1293                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1294
1295
1296         /* Check if @dst is subdir of @src. */
1297         rc = mdt_rename_sanity(info, old_fid);
1298         if (rc)
1299                 GOTO(out_unlock_new, rc);
1300
1301         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1302                         mdt_object_child(mtgtdir), old_fid, &slname,
1303                         (mnew ? mdt_object_child(mnew) : NULL),
1304                         lname, ma);
1305
1306         /* handle last link of tgt object */
1307         if (rc == 0) {
1308                 mdt_counter_incr(req, LPROC_MDT_RENAME);
1309                 if (mnew)
1310                         mdt_handle_last_unlink(info, mnew, ma);
1311
1312                 mdt_rename_counter_tally(info, info->mti_mdt, req,
1313                                          msrcdir, mtgtdir);
1314         }
1315
1316         EXIT;
1317 out_unlock_new:
1318         if (mnew)
1319                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
1320 out_unlock_old:
1321         mdt_object_unlock_put(info, mold, lh_oldp, rc);
1322 out_unlock_target:
1323         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
1324 out_put_target:
1325         mdt_object_put(info->mti_env, mtgtdir);
1326 out_unlock_source:
1327         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1328 out_rename_lock:
1329         mdt_rename_unlock(&rename_lh);
1330         return rc;
1331 }
1332
1333 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
1334                            struct mdt_lock_handle *lhc);
1335
1336 static mdt_reinter reinters[REINT_MAX] = {
1337         [REINT_SETATTR]  = mdt_reint_setattr,
1338         [REINT_CREATE]   = mdt_reint_create,
1339         [REINT_LINK]     = mdt_reint_link,
1340         [REINT_UNLINK]   = mdt_reint_unlink,
1341         [REINT_RENAME]   = mdt_reint_rename,
1342         [REINT_OPEN]     = mdt_reint_open,
1343         [REINT_SETXATTR] = mdt_reint_setxattr
1344 };
1345
1346 int mdt_reint_rec(struct mdt_thread_info *info,
1347                   struct mdt_lock_handle *lhc)
1348 {
1349         int rc;
1350         ENTRY;
1351
1352         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
1353
1354         RETURN(rc);
1355 }