Whamcloud - gitweb
7a53b7ab71902afa08bf57d9ca8623b4fba90a58
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdt/mdt_reint.c
33  *
34  * Lustre Metadata Target (mdt) reintegration routines
35  *
36  * Author: Peter Braam <braam@clusterfs.com>
37  * Author: Andreas Dilger <adilger@clusterfs.com>
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Huang Hua <huanghua@clusterfs.com>
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <lprocfs_status.h>
46 #include "mdt_internal.h"
47 #include <lustre_lmv.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84         ENTRY;
85
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
210                 if (likely(!IS_ERR(child))) {
211                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
212                         mdt_object_put(info->mti_env, child);
213                 }
214         }
215         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
216         return vbrc ? vbrc : rc;
217
218 }
219
220 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
221                              struct mdt_object *obj,
222                              struct ldlm_enqueue_info *einfo,
223                              int decref)
224 {
225         union ldlm_policy_data *policy = &mti->mti_policy;
226         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
227         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
228         int i;
229
230         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
231         LASSERT(slave_locks);
232
233         memset(policy, 0, sizeof(*policy));
234         policy->l_inodebits.bits = einfo->ei_inodebits;
235         mdt_lock_handle_init(lh);
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < slave_locks->ha_count; i++) {
238                 if (test_bit(i, (void *)slave_locks->ha_map))
239                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 slave_locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
279                            enum ldlm_mode mode, __u64 ibits,
280                            struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285
286         einfo->ei_type = LDLM_IBITS;
287         einfo->ei_mode = mode;
288         einfo->ei_cb_bl = mdt_remote_blocking_ast;
289         einfo->ei_cb_local_bl = mdt_blocking_ast;
290         einfo->ei_cb_cp = ldlm_completion_ast;
291         einfo->ei_enq_slave = 1;
292         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
293         einfo->ei_inodebits = ibits;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 int mdt_reint_striped_lock(struct mdt_thread_info *info,
302                            struct mdt_object *o,
303                            struct mdt_lock_handle *lh,
304                            __u64 ibits,
305                            struct ldlm_enqueue_info *einfo,
306                            bool cos_incompat)
307 {
308         int rc;
309
310         LASSERT(!mdt_object_remote(o));
311
312         memset(einfo, 0, sizeof(*einfo));
313
314         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
315         if (rc)
316                 return rc;
317
318         rc = mdt_object_striped(info, o);
319         if (rc != 1) {
320                 if (rc < 0)
321                         mdt_object_unlock(info, o, lh, rc);
322                 return rc;
323         }
324
325         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
326         if (rc) {
327                 mdt_object_unlock(info, o, lh, rc);
328                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
329                         rc = 0;
330         }
331
332         return rc;
333 }
334
335 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
336                               struct mdt_object *o,
337                               struct mdt_lock_handle *lh,
338                               struct ldlm_enqueue_info *einfo, int decref)
339 {
340         if (einfo->ei_cbdata)
341                 mdt_unlock_slaves(info, o, einfo, decref);
342         mdt_object_unlock(info, o, lh, decref);
343 }
344
345 static int mdt_restripe(struct mdt_thread_info *info,
346                         struct mdt_object *parent,
347                         const struct lu_name *lname,
348                         const struct lu_fid *tfid,
349                         struct md_op_spec *spec,
350                         struct md_attr *ma)
351 {
352         struct mdt_device *mdt = info->mti_mdt;
353         struct lu_fid *fid = &info->mti_tmp_fid2;
354         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
355         struct lmv_mds_md_v1 *lmv;
356         struct mdt_object *child;
357         struct mdt_lock_handle *lhp;
358         struct mdt_lock_handle *lhc;
359         struct mdt_body *repbody;
360         int rc;
361
362         ENTRY;
363
364         if (!mdt->mdt_enable_dir_restripe)
365                 RETURN(-EPERM);
366
367         rc = mdt_version_get_check_save(info, parent, 0);
368         if (rc)
369                 RETURN(rc);
370
371         lhp = &info->mti_lh[MDT_LH_PARENT];
372         mdt_lock_pdo_init(lhp, LCK_PW, lname);
373         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
374                                    true);
375         if (rc)
376                 RETURN(rc);
377
378         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
379         if (rc)
380                 GOTO(unlock_parent, rc);
381
382         if (ma->ma_valid & MA_LMV) {
383                 /* don't allow restripe if parent dir layout is changing */
384                 lmv = &ma->ma_lmv->lmv_md_v1;
385                 if (!lmv_is_sane(lmv))
386                         GOTO(unlock_parent, rc = -EBADF);
387
388                 if (lmv_is_layout_changing(lmv))
389                         GOTO(unlock_parent, rc = -EBUSY);
390         }
391
392         fid_zero(fid);
393         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
394         if (rc)
395                 GOTO(unlock_parent, rc);
396
397         child = mdt_object_find(info->mti_env, mdt, fid);
398         if (IS_ERR(child))
399                 GOTO(unlock_parent, rc = PTR_ERR(child));
400
401         if (!mdt_object_exists(child))
402                 GOTO(out_child, rc = -ENOENT);
403
404         if (mdt_object_remote(child)) {
405                 struct mdt_body *repbody;
406
407                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
408                 if (!repbody)
409                         GOTO(out_child, rc = -EPROTO);
410
411                 repbody->mbo_fid1 = *fid;
412                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
413                 GOTO(out_child, rc = -EREMOTE);
414         }
415
416         /* lock object */
417         lhc = &info->mti_lh[MDT_LH_CHILD];
418         mdt_lock_reg_init(lhc, LCK_EX);
419
420         /* enqueue object remote LOOKUP lock */
421         if (mdt_object_remote(parent)) {
422                 rc = mdt_remote_object_lock(info, parent, fid,
423                                             &lhc->mlh_rreg_lh,
424                                             lhc->mlh_rreg_mode,
425                                             MDS_INODELOCK_LOOKUP, false);
426                 if (rc != ELDLM_OK)
427                         GOTO(out_child, rc);
428         }
429
430         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
431                                     true);
432         if (rc)
433                 GOTO(unlock_child, rc);
434
435         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
436         rc = mdt_version_get_check_save(info, child, 1);
437         if (rc)
438                 GOTO(unlock_child, rc);
439
440         spin_lock(&mdt->mdt_restriper.mdr_lock);
441         if (child->mot_restriping) {
442                 /* race? */
443                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
444                 GOTO(unlock_child, rc = -EBUSY);
445         }
446         child->mot_restriping = 1;
447         spin_unlock(&mdt->mdt_restriper.mdr_lock);
448
449         *fid = *tfid;
450         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
451         if (rc)
452                 GOTO(restriping_clear, rc);
453
454         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
455         if (!repbody)
456                 GOTO(restriping_clear, rc = -EPROTO);
457
458         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
459         EXIT;
460
461 restriping_clear:
462         child->mot_restriping = 0;
463 unlock_child:
464         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
465 out_child:
466         mdt_object_put(info->mti_env, child);
467 unlock_parent:
468         mdt_object_unlock(info, parent, lhp, rc);
469
470         return rc;
471 }
472
473 /*
474  * VBR: we save three versions in reply:
475  * 0 - parent. Check that parent version is the same during replay.
476  * 1 - name. Version of 'name' if file exists with the same name or
477  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
478  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
479  * check.
480  */
481 static int mdt_create(struct mdt_thread_info *info)
482 {
483         struct mdt_device *mdt = info->mti_mdt;
484         struct mdt_object *parent;
485         struct mdt_object *child;
486         struct mdt_lock_handle *lh;
487         struct mdt_body *repbody;
488         struct md_attr *ma = &info->mti_attr;
489         struct mdt_reint_record *rr = &info->mti_rr;
490         struct md_op_spec *spec = &info->mti_spec;
491         bool restripe = false;
492         int rc;
493         ENTRY;
494
495         DEBUG_REQ(D_INODE, mdt_info_req(info),
496                   "Create ("DNAME"->"DFID") in "DFID,
497                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
498
499         if (!fid_is_md_operative(rr->rr_fid1))
500                 RETURN(-EPERM);
501
502         if (S_ISDIR(ma->ma_attr.la_mode) &&
503             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
504                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
505                 struct lu_ucred *uc = mdt_ucred(info);
506                 struct obd_export *exp = mdt_info_req(info)->rq_export;
507
508                 /* Only new clients can create remote dir( >= 2.4) and
509                  * striped dir(>= 2.6), old client will return -ENOTSUPP */
510                 if (!mdt_is_dne_client(exp))
511                         RETURN(-ENOTSUPP);
512
513                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
514                         if (!mdt_is_striped_client(exp))
515                                 RETURN(-ENOTSUPP);
516
517                         if (!mdt->mdt_enable_striped_dir)
518                                 RETURN(-EPERM);
519                 } else if (!mdt->mdt_enable_remote_dir) {
520                         RETURN(-EPERM);
521                 }
522
523                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
524                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) ==
525                     LMV_HASH_TYPE_CRUSH)
526                         RETURN(-EPROTO);
527
528                 if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
529                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
530                     mdt->mdt_enable_remote_dir_gid != -1)
531                         RETURN(-EPERM);
532
533                 /* restripe if later found dir exists */
534                 if (le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT)
535                         restripe = true;
536         }
537
538         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
539
540         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
541         if (IS_ERR(parent))
542                 RETURN(PTR_ERR(parent));
543
544         if (!mdt_object_exists(parent))
545                 GOTO(put_parent, rc = -ENOENT);
546
547         /*
548          * LU-10235: check if name exists locklessly first to avoid massive
549          * lock recalls on existing directories.
550          */
551         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
552                                       &info->mti_tmp_fid1, 1);
553         if (rc == 0) {
554                 if (!restripe)
555                         GOTO(put_parent, rc = -EEXIST);
556
557                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
558                                   ma);
559         }
560
561         /* -ENOENT is expected here */
562         if (rc != -ENOENT)
563                 GOTO(put_parent, rc);
564
565         /* save version of file name for replay, it must be ENOENT here */
566         mdt_enoent_version_save(info, 1);
567
568         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
569
570         lh = &info->mti_lh[MDT_LH_PARENT];
571         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
572         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
573         if (rc)
574                 GOTO(put_parent, rc);
575
576         if (!mdt_object_remote(parent)) {
577                 rc = mdt_version_get_check_save(info, parent, 0);
578                 if (rc)
579                         GOTO(unlock_parent, rc);
580         }
581
582         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
583         if (unlikely(IS_ERR(child)))
584                 GOTO(unlock_parent, rc = PTR_ERR(child));
585
586         ma->ma_need = MA_INODE;
587         ma->ma_valid = 0;
588
589         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
590                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
591
592         /* Version of child will be updated on disk. */
593         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
594         rc = mdt_version_get_check_save(info, child, 2);
595         if (rc)
596                 GOTO(put_child, rc);
597
598         /* Let lower layer know current lock mode. */
599         info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
600
601         /*
602          * Do not perform lookup sanity check. We know that name does
603          * not exist.
604          */
605         info->mti_spec.sp_cr_lookup = 0;
606         info->mti_spec.sp_feat = &dt_directory_features;
607
608         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
609                         mdt_object_child(child), &info->mti_spec, ma);
610         if (rc == 0)
611                 rc = mdt_attr_get_complex(info, child, ma);
612
613         if (rc < 0)
614                 GOTO(put_child, rc);
615
616         /*
617          * On DNE, we need to eliminate dependey between 'mkdir a' and
618          * 'mkdir a/b' if b is a striped directory, to achieve this, two
619          * things are done below:
620          * 1. save child and slaves lock.
621          * 2. if the child is a striped directory, relock parent so to
622          *    compare against with COS locks to ensure parent was
623          *    committed to disk.
624          */
625         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
626                 struct mdt_lock_handle *lhc;
627                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
628                 bool cos_incompat;
629
630                 rc = mdt_object_striped(info, child);
631                 if (rc < 0)
632                         GOTO(put_child, rc);
633
634                 cos_incompat = rc;
635                 if (cos_incompat) {
636                         if (!mdt_object_remote(parent)) {
637                                 mdt_object_unlock(info, parent, lh, 1);
638                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
639                                 rc = mdt_reint_object_lock(info, parent, lh,
640                                                            MDS_INODELOCK_UPDATE,
641                                                            true);
642                                 if (rc)
643                                         GOTO(put_child, rc);
644                         }
645                 }
646
647                 lhc = &info->mti_lh[MDT_LH_CHILD];
648                 mdt_lock_handle_init(lhc);
649                 mdt_lock_reg_init(lhc, LCK_PW);
650                 rc = mdt_reint_striped_lock(info, child, lhc,
651                                             MDS_INODELOCK_UPDATE, einfo,
652                                             cos_incompat);
653                 if (rc)
654                         GOTO(put_child, rc);
655
656                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
657         }
658
659         /* Return fid & attr to client. */
660         if (ma->ma_valid & MA_INODE)
661                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
662                                    mdt_object_fid(child));
663         EXIT;
664 put_child:
665         mdt_object_put(info->mti_env, child);
666 unlock_parent:
667         mdt_object_unlock(info, parent, lh, rc);
668 put_parent:
669         mdt_object_put(info->mti_env, parent);
670         return rc;
671 }
672
673 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
674                         struct md_attr *ma)
675 {
676         struct mdt_lock_handle  *lh;
677         int do_vbr = ma->ma_attr.la_valid &
678                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
679         __u64 lockpart = MDS_INODELOCK_UPDATE;
680         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
681         bool cos_incompat;
682         int rc;
683         ENTRY;
684
685         rc = mdt_object_striped(info, mo);
686         if (rc < 0)
687                 RETURN(rc);
688
689         cos_incompat = rc;
690
691         lh = &info->mti_lh[MDT_LH_PARENT];
692         mdt_lock_reg_init(lh, LCK_PW);
693
694         /* Even though the new MDT will grant PERM lock to the old
695          * client, but the old client will almost ignore that during
696          * So it needs to revoke both LOOKUP and PERM lock here, so
697          * both new and old client can cancel the dcache */
698         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
699                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
700
701         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
702                                     cos_incompat);
703         if (rc != 0)
704                 RETURN(rc);
705
706         /* all attrs are packed into mti_attr in unpack_setattr */
707         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
708                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
709
710         /* VBR: update version if attr changed are important for recovery */
711         if (do_vbr) {
712                 /* update on-disk version of changed object */
713                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
714                 rc = mdt_version_get_check_save(info, mo, 0);
715                 if (rc)
716                         GOTO(out_unlock, rc);
717         }
718
719         /* Ensure constant striping during chown(). See LU-2789. */
720         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
721                 mutex_lock(&mo->mot_lov_mutex);
722
723         /* all attrs are packed into mti_attr in unpack_setattr */
724         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
725
726         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
727                 mutex_unlock(&mo->mot_lov_mutex);
728
729         if (rc != 0)
730                 GOTO(out_unlock, rc);
731         mdt_dom_obj_lvb_update(info->mti_env, mo, false);
732         EXIT;
733 out_unlock:
734         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
735         return rc;
736 }
737
738 /**
739  * Check HSM flags and add HS_DIRTY flag if relevant.
740  *
741  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
742  * and is not RELEASED.
743  */
744 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
745                         struct md_attr *ma)
746 {
747         struct lu_ucred *uc = mdt_ucred(info);
748         cfs_cap_t cap_saved;
749         int rc;
750         ENTRY;
751
752         /* If the file was modified, add the dirty flag */
753         ma->ma_need = MA_HSM;
754         rc = mdt_attr_get_complex(info, mo, ma);
755         if (rc) {
756                 CERROR("file attribute read error for "DFID": %d.\n",
757                         PFID(mdt_object_fid(mo)), rc);
758                 RETURN(rc);
759         }
760
761         /* If an up2date copy exists in the backend, add dirty flag */
762         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
763             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
764                 ma->ma_hsm.mh_flags |= HS_DIRTY;
765
766                 /* Bump cap so that closes from non-owner writers can
767                  * set the HSM state to dirty. */
768                 cap_saved = uc->uc_cap;
769                 uc->uc_cap |= MD_CAP_TO_MASK(CFS_CAP_FOWNER);
770                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
771                 uc->uc_cap = cap_saved;
772                 if (rc)
773                         CERROR("file attribute change error for "DFID": %d\n",
774                                 PFID(mdt_object_fid(mo)), rc);
775         }
776
777         RETURN(rc);
778 }
779
/*
 * Handle a setattr reint request for the object named by rr->rr_fid1.
 *
 * The work done depends on what info->mti_attr carries:
 *  - MA_INODE with a non-zero la_valid: set inode attributes through
 *    mdt_attr_set();
 *  - MA_INODE plus MA_LOV or MA_LMV with la_valid == 0: store the layout as
 *    an xattr (XATTR_NAME_LOV or XATTR_NAME_DEFAULT_LMV) on a directory,
 *    under a PW lock;
 *  - anything else is rejected with -EPROTO.
 * Before a size change, an existing lease on the object is revoked. On
 * success the attributes are read back and packed into the reply body.
 *
 * \param[in] info  thread info; request data is read from mti_attr/mti_rr
 * \param[in] lhc   lock handle argument; only reassigned locally to
 *                  mti_lh[MDT_LH_LOCAL] on the lease-revocation path
 *
 * \retval 0        on success
 * \retval negative errno on failure: -ENOENT, -EREMOTE, -ETXTBSY,
 *                  -EOPNOTSUPP, -EPROTO, -EPERM, -ENOTDIR or an error
 *                  returned by a callee (including mdt_fix_reply())
 */
780 static int mdt_reint_setattr(struct mdt_thread_info *info,
781                              struct mdt_lock_handle *lhc)
782 {
783         struct mdt_device *mdt = info->mti_mdt;
784         struct md_attr *ma = &info->mti_attr;
785         struct mdt_reint_record *rr = &info->mti_rr;
786         struct ptlrpc_request *req = mdt_info_req(info);
787         struct mdt_object *mo;
788         struct mdt_body *repbody;
789         int rc, rc2;
790         ENTRY;
791
792         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
793                   (unsigned int)ma->ma_attr.la_valid);
794
795         if (info->mti_dlm_req)
796                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
797
798         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
799
            /* NOTE(review): repbody is not checked for NULL here; it is only
             * used on the success path, by mdt_pack_attr2body() below. */
800         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
801         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
802         if (IS_ERR(mo))
803                 GOTO(out, rc = PTR_ERR(mo));
804
805         if (!mdt_object_exists(mo))
806                 GOTO(out_put, rc = -ENOENT);
807
808         if (mdt_object_remote(mo))
809                 GOTO(out_put, rc = -EREMOTE);
810
811         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
812         /* revoke lease lock if size is going to be changed */
813         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
814                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
815                      atomic_read(&mo->mot_lease_count) > 0)) {
                    /* the lease count is re-read once mot_open_sem is held;
                     * the test above was made without it */
816                 down_read(&mo->mot_open_sem);
817
818                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
819                         lhc = &info->mti_lh[MDT_LH_LOCAL];
820                         mdt_lock_reg_init(lhc, LCK_CW);
821
822                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
823                         if (rc != 0) {
824                                 up_read(&mo->mot_open_sem);
825                                 GOTO(out_put, rc);
826                         }
827
828                         /* revoke lease lock */
829                         mdt_object_unlock(info, mo, lhc, 1);
830                 }
831                 up_read(&mo->mot_open_sem);
832         }
833
834         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
835                 /* Check write access for the O_TRUNC case */
836                 if (mdt_write_read(mo) < 0)
837                         GOTO(out_put, rc = -ETXTBSY);
838
839                 /* LU-10286: compatibility check for FLR.
840                  * Please check the comment in mdt_finish_open() for details */
841                 if (!exp_connect_flr(info->mti_exp) ||
842                     !exp_connect_overstriping(info->mti_exp)) {
                            /* rc > 0 below means a LOV xattr was read into
                             * mti_big_lmm; -ENODATA is tolerated */
843                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
844                         if (rc < 0 && rc != -ENODATA)
845                                 GOTO(out_put, rc);
846
847                         if (!exp_connect_flr(info->mti_exp)) {
848                                 if (rc > 0 &&
849                                     mdt_lmm_is_flr(info->mti_big_lmm))
850                                         GOTO(out_put, rc = -EOPNOTSUPP);
851                         }
852
853                         if (!exp_connect_overstriping(info->mti_exp)) {
854                                 if (rc > 0 &&
855                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
856                                         GOTO(out_put, rc = -EOPNOTSUPP);
857                         }
858                 }
859
860                 /* For truncate, the file size sent from client
861                  * is believable, but the blocks are incorrect,
862                  * which makes the block size in LSOM attribute
863                  * inconsistent with the real block size.
864                  */
865                 rc = mdt_lsom_update(info, mo, true);
866                 if (rc)
867                         GOTO(out_put, rc);
868         }
869
            /* Case 1: inode attributes to set. A LOV EA in the same request
             * is treated as a protocol error. */
870         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
871                 if (ma->ma_valid & MA_LOV)
872                         GOTO(out_put, rc = -EPROTO);
873
874                 /* MDT supports FMD for regular files due to Data-on-MDT */
875                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
876                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
877                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
878                                        req->rq_xid);
879
880                 rc = mdt_attr_set(info, mo, ma);
881                 if (rc)
882                         GOTO(out_put, rc);
            /* Case 2: no inode attributes (la_valid == 0 is enforced below),
             * but a LOV or LMV EA is present: store it as an xattr on the
             * directory. */
883         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
884                    (ma->ma_valid & MA_INODE)) {
885                 struct lu_buf *buf = &info->mti_buf;
886                 struct lu_ucred *uc = mdt_ucred(info);
887                 struct mdt_lock_handle *lh;
888                 const char *name;
889                 __u64 lockpart = MDS_INODELOCK_XATTR;
890
891                 /* reject if either remote or striped dir is disabled */
892                 if (ma->ma_valid & MA_LMV) {
893                         if (!mdt->mdt_enable_remote_dir ||
894                             !mdt->mdt_enable_striped_dir)
895                                 GOTO(out_put, rc = -EPERM);
896
                            /* allowed for CFS_CAP_SYS_ADMIN, for the configured
                             * gid, or for anyone when that gid is -1 */
897                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
898                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
899                             mdt->mdt_enable_remote_dir_gid != -1)
900                                 GOTO(out_put, rc = -EPERM);
901                 }
902
903                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
904                         GOTO(out_put, rc = -ENOTDIR);
905
906                 if (ma->ma_attr.la_valid != 0)
907                         GOTO(out_put, rc = -EPROTO);
908
909                 if (ma->ma_valid & MA_LOV) {
910                         buf->lb_buf = ma->ma_lmm;
911                         buf->lb_len = ma->ma_lmm_size;
912                         name = XATTR_NAME_LOV;
913                 } else {
914                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
915
916                         buf->lb_buf = lmu;
917                         buf->lb_len = ma->ma_lmv_size;
918                         name = XATTR_NAME_DEFAULT_LMV;
919                         /* force client to update dir default layout */
920                         lockpart |= MDS_INODELOCK_LOOKUP;
921                 }
922
923                 lh = &info->mti_lh[MDT_LH_PARENT];
924                 mdt_lock_reg_init(lh, LCK_PW);
925
926                 rc = mdt_object_lock(info, mo, lh, lockpart);
927                 if (rc != 0)
928                         GOTO(out_put, rc);
929
930                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
931                                   name, 0);
932
                    /* the lock is dropped whether or not the xattr was set;
                     * rc is handed to the unlock as its last argument */
933                 mdt_object_unlock(info, mo, lh, rc);
934                 if (rc)
935                         GOTO(out_put, rc);
            /* Case 3: nothing recognisable to set */
936         } else {
937                 GOTO(out_put, rc = -EPROTO);
938         }
939
940         /* If file data is modified, add the dirty flag */
            /* NOTE(review): rc assigned here is overwritten by the
             * mdt_attr_get_complex() call below, so a failure to add the
             * dirty flag does not fail the request - confirm this is
             * intended best-effort behaviour. */
941         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
942                 rc = mdt_add_dirty_flag(info, mo, ma);
943
            /* read the attributes back so the reply carries current values */
944         ma->ma_need = MA_INODE;
945         ma->ma_valid = 0;
946         rc = mdt_attr_get_complex(info, mo, ma);
947         if (rc != 0)
948                 GOTO(out_put, rc);
949
950         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
951
952         EXIT;
953 out_put:
954         mdt_object_put(info->mti_env, mo);
955 out:
            /* only successful setattrs are counted */
956         if (rc == 0)
957                 mdt_counter_incr(req, LPROC_MDT_SETATTR);
958
959         mdt_client_compatibility(info);
            /* an error from mdt_fix_reply() is reported only when the
             * operation itself succeeded */
960         rc2 = mdt_fix_reply(info);
961         if (rc == 0)
962                 rc = rc2;
963         return rc;
964 }
965
966 static int mdt_reint_create(struct mdt_thread_info *info,
967                             struct mdt_lock_handle *lhc)
968 {
969         struct ptlrpc_request   *req = mdt_info_req(info);
970         int                     rc;
971         ENTRY;
972
973         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
974                 RETURN(err_serious(-ESTALE));
975
976         if (info->mti_dlm_req)
977                 ldlm_request_cancel(mdt_info_req(info),
978                                     info->mti_dlm_req, 0, LATF_SKIP);
979
980         if (!lu_name_is_valid(&info->mti_rr.rr_name))
981                 RETURN(-EPROTO);
982
983         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
984         case S_IFDIR:
985                 mdt_counter_incr(req, LPROC_MDT_MKDIR);
986                 break;
987         case S_IFREG:
988         case S_IFLNK:
989         case S_IFCHR:
990         case S_IFBLK:
991         case S_IFIFO:
992         case S_IFSOCK:
993                 /* Special file should stay on the same node as parent. */
994                 mdt_counter_incr(req, LPROC_MDT_MKNOD);
995                 break;
996         default:
997                 CERROR("%s: Unsupported mode %o\n",
998                        mdt_obd_name(info->mti_mdt),
999                        info->mti_attr.ma_attr.la_mode);
1000                 RETURN(err_serious(-EOPNOTSUPP));
1001         }
1002
1003         rc = mdt_create(info);
1004         RETURN(rc);
1005 }
1006
1007 /*
1008  * VBR: save the parent version in the reply, and the version of the child
1009  * found by name. The child version is fetched and checked during lookup.
1010  */
/*
 * Handle an unlink reint request: remove name rr->rr_name from the parent
 * directory rr->rr_fid1 and, when the child is local, unlink the child too.
 *
 * Outline:
 *  - lock the parent (PDO, LCK_PW) and look the child FID up by name;
 *  - if the child turns out to be striped, drop the parent lock and take it
 *    again with cos_incompat set (the "relock" loop);
 *  - if sp_rm_entry is set, only the name entry is removed (mdo_unlink()
 *    with a NULL child);
 *  - if the child is on another MDT, return -EREMOTE with the child FID
 *    packed into the reply body;
 *  - otherwise lock the child, call mdo_unlink() and update the RMDIR or
 *    UNLINK counter according to the file type.
 *
 * \param[in] info  thread info; request data is read from mti_rr
 * \param[in] lhc   lock handle argument, not used by this handler
 *
 * \retval 0        on success
 * \retval -EREMOTE if the child lives on another MDT
 * \retval negative errno (or err_serious() value) on other failures
 */
1011 static int mdt_reint_unlink(struct mdt_thread_info *info,
1012                             struct mdt_lock_handle *lhc)
1013 {
1014         struct mdt_reint_record *rr = &info->mti_rr;
1015         struct ptlrpc_request *req = mdt_info_req(info);
1016         struct md_attr *ma = &info->mti_attr;
1017         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1018         struct mdt_object *mp;
1019         struct mdt_object *mc;
1020         struct mdt_lock_handle *parent_lh;
1021         struct mdt_lock_handle *child_lh;
1022         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1023         __u64 lock_ibits;
1024         bool cos_incompat = false;
1025         int no_name = 0;
1026         int rc;
1027
1028         ENTRY;
1029
1030         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1031                   PNAME(&rr->rr_name));
1032
1033         if (info->mti_dlm_req)
1034                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1035
1036         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1037                 RETURN(err_serious(-ENOENT));
1038
1039         if (!fid_is_md_operative(rr->rr_fid1))
1040                 RETURN(-EPERM);
1041
1042         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1043         if (IS_ERR(mp))
1044                 RETURN(PTR_ERR(mp));
1045
             /* a remote parent forces cos_incompat; the parent version is
              * only checked and saved (slot 0) for a local parent */
1046         if (mdt_object_remote(mp)) {
1047                 cos_incompat = true;
1048         } else {
1049                 rc = mdt_version_get_check_save(info, mp, 0);
1050                 if (rc)
1051                         GOTO(put_parent, rc);
1052         }
1053
             /* re-entered at most once from below, after cos_incompat has
              * been switched on for a striped child */
1054 relock:
1055         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1056         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1057         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1058                                    cos_incompat);
1059         if (rc != 0)
1060                 GOTO(put_parent, rc);
1061
1062         /* lookup child object along with version checking */
1063         fid_zero(child_fid);
1064         rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1);
1065         if (rc != 0) {
1066                 /* Name might not be able to find during resend of
1067                  * remote unlink, considering following case.
1068                  * dir_A is a remote directory, the name entry of
1069                  * dir_A is on MDT0, the directory is on MDT1,
1070                  *
1071                  * 1. client sends unlink req to MDT1.
1072                  * 2. MDT1 sends name delete update to MDT0.
1073                  * 3. name entry is being deleted in MDT0 synchronously.
1074                  * 4. MDT1 is restarted.
1075                  * 5. client resends unlink req to MDT1. So it can not
1076                  *    find the name entry on MDT0 anymore.
1077                  * In this case, MDT1 only needs to destroy the local
1078                  * directory.
1079                  * */
1080                 if (mdt_object_remote(mp) && rc == -ENOENT &&
1081                     !fid_is_zero(rr->rr_fid2) &&
1082                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
                             /* fall back to the child FID sent by the client */
1083                         no_name = 1;
1084                         *child_fid = *rr->rr_fid2;
1085                  } else {
1086                         GOTO(unlock_parent, rc);
1087                  }
1088         }
1089
1090         if (!fid_is_md_operative(child_fid))
1091                 GOTO(unlock_parent, rc = -EPERM);
1092
1093         /* We will lock the child regardless it is local or remote. No harm. */
1094         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1095         if (IS_ERR(mc))
1096                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1097
1098         if (!cos_incompat) {
                     /* negative is an error; any other non-zero value is
                      * taken to mean the child is striped */
1099                 rc = mdt_object_striped(info, mc);
1100                 if (rc < 0)
1101                         GOTO(put_child, rc);
1102
1103                 cos_incompat = rc;
1104                 if (cos_incompat) {
                             /* drop the child reference and the parent lock,
                              * then take the parent lock again with
                              * cos_incompat set */
1105                         mdt_object_put(info->mti_env, mc);
1106                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1107                         goto relock;
1108                 }
1109         }
1110
1111         child_lh = &info->mti_lh[MDT_LH_CHILD];
1112         mdt_lock_reg_init(child_lh, LCK_EX);
             /* sp_rm_entry: remove only the name entry; the child object is
              * not passed to mdo_unlink() and is not locked */
1113         if (info->mti_spec.sp_rm_entry) {
1114                 struct lu_ucred *uc  = mdt_ucred(info);
1115
1116                 if (!mdt_is_dne_client(req->rq_export))
1117                         /* Return -ENOTSUPP for old client */
1118                         GOTO(put_child, rc = -ENOTSUPP);
1119
1120                 if (!md_capable(uc, CFS_CAP_SYS_ADMIN))
1121                         GOTO(put_child, rc = -EPERM);
1122
1123                 ma->ma_need = MA_INODE;
1124                 ma->ma_valid = 0;
1125                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1126                                 NULL, &rr->rr_name, ma, no_name);
1127                 GOTO(put_child, rc);
1128         }
1129
1130         if (mdt_object_remote(mc)) {
1131                 struct mdt_body  *repbody;
1132
1133                 if (!fid_is_zero(rr->rr_fid2)) {
1134                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1135                                mdt_obd_name(info->mti_mdt),
1136                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1137                         GOTO(put_child, rc = -ENOENT);
1138                 }
1139                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1140                        mdt_obd_name(info->mti_mdt),
1141                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1142
1143                 if (!mdt_is_dne_client(req->rq_export))
1144                         /* Return -ENOTSUPP for old client */
1145                         GOTO(put_child, rc = -ENOTSUPP);
1146
1147                 /* Revoke the LOOKUP lock of the remote object granted by
1148                  * this MDT. Since the unlink will happen on another MDT,
1149                  * it will release the LOOKUP lock right away. Then What
1150                  * would happen if another client try to grab the LOOKUP
1151                  * lock at the same time with unlink XXX */
                     /* NOTE(review): the return value of mdt_object_lock() is
                      * not checked; unlock_child runs regardless - confirm a
                      * failed enqueue is harmless here. */
1152                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1153                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1154                 LASSERT(repbody != NULL);
                     /* hand the child FID back in the reply along with
                      * -EREMOTE */
1155                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1156                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1157                 GOTO(unlock_child, rc = -EREMOTE);
1158         }
1159         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1160          * this now because a running HSM restore on the child (unlink
1161          * victim) will hold the layout lock. See LU-4002. */
1162         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1163         if (mdt_object_remote(mp)) {
1164                 /* Enqueue lookup lock from parent MDT */
1165                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1166                                             &child_lh->mlh_rreg_lh,
1167                                             child_lh->mlh_rreg_mode,
1168                                             MDS_INODELOCK_LOOKUP, false);
1169                 if (rc != ELDLM_OK)
1170                         GOTO(put_child, rc);
1171
                     /* LOOKUP is now held through the remote handle, so it is
                      * dropped from the bits requested locally */
1172                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1173         }
1174
             /* NOTE(review): when the parent is remote, the LOOKUP lock taken
              * just above is held in child_lh->mlh_rreg_lh; the failure path
              * below jumps to put_child, past unlock_child - confirm that
              * handle is released elsewhere. */
1175         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1176                                     cos_incompat);
1177         if (rc != 0)
1178                 GOTO(put_child, rc);
1179
1180         /*
1181          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1182          * whether need MA_LOV and MA_COOKIE.
1183          */
1184         ma->ma_need = MA_INODE;
1185         ma->ma_valid = 0;
1186
1187         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1188                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1189         /* save version when object is locked */
1190         mdt_version_get_save(info, mc, 1);
1191
             /* mdo_unlink() runs with the child's mot_lov_mutex held */
1192         mutex_lock(&mc->mot_lov_mutex);
1193
1194         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1195                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1196
1197         mutex_unlock(&mc->mot_lov_mutex);
1198         if (rc != 0)
1199                 GOTO(unlock_child, rc);
1200
             /* an object that is not dying gets its attributes refreshed;
              * a dying one may have its DoM data discarded instead */
1201         if (!lu_object_is_dying(&mc->mot_header)) {
1202                 rc = mdt_attr_get_complex(info, mc, ma);
1203                 if (rc)
1204                         GOTO(out_stat, rc);
1205         } else if (mdt_dom_check_for_discard(info, mc)) {
1206                 mdt_dom_discard_data(info, mc);
1207         }
1208         mdt_handle_last_unlink(info, mc, ma);
1209
             /* reached after mdo_unlink() succeeded, even if reading the
              * attributes back failed; counting needs MA_INODE to be valid */
1210 out_stat:
1211         if (ma->ma_valid & MA_INODE) {
1212                 switch (ma->ma_attr.la_mode & S_IFMT) {
1213                 case S_IFDIR:
1214                         mdt_counter_incr(req, LPROC_MDT_RMDIR);
1215                         break;
1216                 case S_IFREG:
1217                 case S_IFLNK:
1218                 case S_IFCHR:
1219                 case S_IFBLK:
1220                 case S_IFIFO:
1221                 case S_IFSOCK:
1222                         mdt_counter_incr(req, LPROC_MDT_UNLINK);
1223                         break;
1224                 default:
1225                         LASSERTF(0, "bad file type %o unlinking\n",
1226                                 ma->ma_attr.la_mode);
1227                 }
1228         }
1229
1230         EXIT;
1231
             /* cleanup runs in reverse order of acquisition */
1232 unlock_child:
1233         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1234 put_child:
1235         mdt_object_put(info->mti_env, mc);
1236 unlock_parent:
1237         mdt_object_unlock(info, mp, parent_lh, rc);
1238 put_parent:
1239         mdt_object_put(info->mti_env, mp);
1240         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1241         return rc;
1242 }
1243
1244 /*
1245  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1246  * name.
1247  */
/*
 * Handle a link reint request: create name rr->rr_name in the directory
 * rr->rr_fid2 as a new link to the existing object rr->rr_fid1.
 *
 * The target parent is locked first (PDO, LCK_PW on the name), then the
 * source object (LCK_EX, UPDATE | XATTR bits). Both locks are requested with
 * cos_incompat set when either object is remote. Outside of replay the
 * target name must not exist yet.
 *
 * \param[in] info  thread info; request data is read from mti_rr
 * \param[in] lhc   lock handle argument, not used by this handler
 *
 * \retval 0        on success
 * \retval -EPERM   if source and parent FIDs are equal, or either FID is
 *                  not operative
 * \retval -ENOENT  if the source object does not exist
 * \retval -EEXIST  if the target name already exists (non-replay)
 * \retval negative errno (or err_serious() value) on other failures
 */
1248 static int mdt_reint_link(struct mdt_thread_info *info,
1249                           struct mdt_lock_handle *lhc)
1250 {
1251         struct mdt_reint_record *rr = &info->mti_rr;
1252         struct ptlrpc_request   *req = mdt_info_req(info);
1253         struct md_attr          *ma = &info->mti_attr;
1254         struct mdt_object       *ms;
1255         struct mdt_object       *mp;
1256         struct mdt_lock_handle  *lhs;
1257         struct mdt_lock_handle  *lhp;
1258         bool cos_incompat;
1259         int rc;
1260         ENTRY;
1261
1262         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1263                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1264
1265         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1266                 RETURN(err_serious(-ENOENT));
1267
             /* fault injection: fail the request and suppress the reply */
1268         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
1269                 req->rq_no_reply = 1;
1270                 RETURN(err_serious(-ENOENT));
1271         }
1272
1273         if (info->mti_dlm_req)
1274                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1275
1276         /* Invalid case so return error immediately instead of
1277          * processing it */
1278         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1279                 RETURN(-EPERM);
1280
1281         if (!fid_is_md_operative(rr->rr_fid1) ||
1282             !fid_is_md_operative(rr->rr_fid2))
1283                 RETURN(-EPERM);
1284
1285         /* step 1: find target parent dir */
1286         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1287         if (IS_ERR(mp))
1288                 RETURN(PTR_ERR(mp));
1289
             /* VBR slot 0: parent version */
1290         rc = mdt_version_get_check_save(info, mp, 0);
1291         if (rc)
1292                 GOTO(put_parent, rc);
1293
1294         /* step 2: find source */
1295         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1296         if (IS_ERR(ms))
1297                 GOTO(put_parent, rc = PTR_ERR(ms));
1298
1299         if (!mdt_object_exists(ms)) {
1300                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1301                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1302                 GOTO(put_source, rc = -ENOENT);
1303         }
1304
1305         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1306
             /* lock order: parent directory first, then the source object */
1307         lhp = &info->mti_lh[MDT_LH_PARENT];
1308         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1309         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1310                                    cos_incompat);
1311         if (rc != 0)
1312                 GOTO(put_source, rc);
1313
1314         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1315
1316         lhs = &info->mti_lh[MDT_LH_CHILD];
1317         mdt_lock_reg_init(lhs, LCK_EX);
1318         rc = mdt_reint_object_lock(info, ms, lhs,
1319                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1320                                    cos_incompat);
1321         if (rc != 0)
1322                 GOTO(unlock_parent, rc);
1323
1324         /* step 3: link it */
1325         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1326                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1327
             /* VBR slot 1: source object version, taken under its lock */
1328         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1329         rc = mdt_version_get_check_save(info, ms, 1);
1330         if (rc)
1331                 GOTO(unlock_source, rc);
1332
1333         /** check target version by name during replay */
1334         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1335                                       &info->mti_tmp_fid1, 2);
             /* -ENOENT is the expected result: the name should be free */
1336         if (rc != 0 && rc != -ENOENT)
1337                 GOTO(unlock_source, rc);
1338         /* save version of file name for replay, it must be ENOENT here */
1339         if (!req_is_replay(mdt_info_req(info))) {
                     /* rc can only be 0 or -ENOENT here; 0 means the name
                      * was found */
1340                 if (rc != -ENOENT) {
1341                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1342                                PNAME(&rr->rr_name));
1343                         GOTO(unlock_source, rc = -EEXIST);
1344                 }
1345                 info->mti_ver[2] = ENOENT_VERSION;
1346                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1347         }
1348
1349         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1350                       mdt_object_child(ms), &rr->rr_name, ma);
1351
             /* only successful links are counted */
1352         if (rc == 0)
1353                 mdt_counter_incr(req, LPROC_MDT_LINK);
1354
1355         EXIT;
             /* cleanup runs in reverse order of acquisition */
1356 unlock_source:
1357         mdt_object_unlock(info, ms, lhs, rc);
1358 unlock_parent:
1359         mdt_object_unlock(info, mp, lhp, rc);
1360 put_source:
1361         mdt_object_put(info->mti_env, ms);
1362 put_parent:
1363         mdt_object_put(info->mti_env, mp);
1364         return rc;
1365 }
1366 /**
1367  * lock the part of the directory according to the hash of the name
1368  * (lh->mlh_pdo_hash) in parallel directory lock.
1369  */
1370 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1371                               struct mdt_lock_handle *lh,
1372                               struct mdt_object *obj, __u64 ibits,
1373                               bool cos_incompat)
1374 {
1375         struct ldlm_res_id *res = &info->mti_res_id;
1376         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1377         union ldlm_policy_data *policy = &info->mti_policy;
1378         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1379         int rc;
1380
1381         /*
1382          * Finish res_id initializing by name hash marking part of
1383          * directory which is taking modification.
1384          */
1385         LASSERT(lh->mlh_pdo_hash != 0);
1386         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1387         memset(policy, 0, sizeof(*policy));
1388         policy->l_inodebits.bits = ibits;
1389         if (cos_incompat &&
1390             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1391                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1392         /*
1393          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1394          * going to be sent to client. If it is - mdt_intent_policy() path will
1395          * fix it up and turn FL_LOCAL flag off.
1396          */
1397         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1398                           policy, res, dlmflags,
1399                           &info->mti_exp->exp_handle.h_cookie);
1400         return rc;
1401 }
1402
1403 /**
1404  * Get BFL lock for rename or migrate process.
1405  **/
1406 static int mdt_rename_lock(struct mdt_thread_info *info,
1407                            struct lustre_handle *lh)
1408 {
1409         int     rc;
1410         ENTRY;
1411
1412         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1413                 struct lu_fid *fid = &info->mti_tmp_fid1;
1414                 struct mdt_object *obj;
1415
1416                 /* XXX, right now, it has to use object API to
1417                  * enqueue lock cross MDT, so it will enqueue
1418                  * rename lock(with LUSTRE_BFL_FID) by root object */
1419                 lu_root_fid(fid);
1420                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1421                 if (IS_ERR(obj))
1422                         RETURN(PTR_ERR(obj));
1423
1424                 rc = mdt_remote_object_lock(info, obj,
1425                                             &LUSTRE_BFL_FID, lh,
1426                                             LCK_EX,
1427                                             MDS_INODELOCK_UPDATE, false);
1428                 mdt_object_put(info->mti_env, obj);
1429         } else {
1430                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1431                 union ldlm_policy_data *policy = &info->mti_policy;
1432                 struct ldlm_res_id *res_id = &info->mti_res_id;
1433                 __u64 flags = 0;
1434
1435                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1436                 memset(policy, 0, sizeof *policy);
1437                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1438                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1439                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1440                                             LDLM_IBITS, policy, LCK_EX, &flags,
1441                                             ldlm_blocking_ast,
1442                                             ldlm_completion_ast, NULL, NULL, 0,
1443                                             LVB_T_NONE,
1444                                             &info->mti_exp->exp_handle.h_cookie,
1445                                             lh);
1446                 RETURN(rc);
1447         }
1448         RETURN(rc);
1449 }
1450
1451 static void mdt_rename_unlock(struct lustre_handle *lh)
1452 {
1453         ENTRY;
1454         LASSERT(lustre_handle_is_used(lh));
1455         /* Cancel the single rename lock right away */
1456         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1457         EXIT;
1458 }
1459
1460 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1461                                                 const struct lu_fid *fid,
1462                                                 int idx)
1463 {
1464         struct mdt_object *dir;
1465         int rc;
1466
1467         ENTRY;
1468
1469         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1470         if (IS_ERR(dir))
1471                 RETURN(dir);
1472
1473         /* check early, the real version will be saved after locking */
1474         rc = mdt_version_get_check(info, dir, idx);
1475         if (rc)
1476                 GOTO(out_put, rc);
1477
1478         if (!mdt_object_exists(dir))
1479                 GOTO(out_put, rc = -ENOENT);
1480
1481         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1482                 GOTO(out_put, rc = -ENOTDIR);
1483
1484         RETURN(dir);
1485 out_put:
1486         mdt_object_put(info->mti_env, dir);
1487         return ERR_PTR(rc);
1488 }
1489
1490 /*
1491  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1492  * herein we don't really check it, just do revoke.
1493  */
1494 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1495                                   struct mdt_object *pobj,
1496                                   struct mdt_object *obj)
1497 {
1498         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1499         int rc;
1500
1501         mdt_lock_handle_init(lh);
1502         mdt_lock_reg_init(lh, LCK_EX);
1503
1504         if (mdt_object_remote(pobj)) {
1505                 /* don't bother to check if pobj and obj are on the same MDT. */
1506                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1507                                             &lh->mlh_rreg_lh, LCK_EX,
1508                                             MDS_INODELOCK_LOOKUP, false);
1509         } else if (mdt_object_remote(obj)) {
1510                 struct ldlm_res_id *res = &info->mti_res_id;
1511                 union ldlm_policy_data *policy = &info->mti_policy;
1512                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1513                                  LDLM_FL_COS_INCOMPAT;
1514
1515                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1516                 memset(policy, 0, sizeof(*policy));
1517                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1518                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1519                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1520                                   dlmflags, NULL);
1521         } else {
1522                 /* do nothing if both are local */
1523                 return 0;
1524         }
1525
1526         if (rc != ELDLM_OK)
1527                 return rc;
1528
1529         /*
1530          * TODO, currently we don't save this lock because there is no place to
1531          * hold this lock handle, but to avoid race we need to save this lock.
1532          */
1533         mdt_object_unlock(info, NULL, lh, 1);
1534
1535         return 0;
1536 }
1537
1538 /*
1539  * operation may takes locks of linkea, or directory stripes, group them in
1540  * different list.
1541  */
1542 struct mdt_sub_lock {
1543         struct mdt_object      *msl_obj;
1544         struct mdt_lock_handle  msl_lh;
1545         struct list_head        msl_linkage;
1546 };
1547
1548 static void mdt_unlock_list(struct mdt_thread_info *info,
1549                             struct list_head *list, int decref)
1550 {
1551         struct mdt_sub_lock *msl;
1552         struct mdt_sub_lock *tmp;
1553
1554         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1555                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1556                 list_del(&msl->msl_linkage);
1557                 OBD_FREE_PTR(msl);
1558         }
1559 }
1560
/*
 * Unlock an object locked for migration.
 *
 * A local object is released with mdt_reint_striped_unlock(). For a remote
 * object the locks on \a slave_locks are released first, then the lock on
 * the object itself.
 *
 * \param[in] info         thread info
 * \param[in] obj          object to unlock
 * \param[in] lh           lock handle of \a obj
 * \param[in] einfo        enqueue info, used for a local object only
 * \param[in] slave_locks  list of sub-locks, used for a remote object only
 * \param[in] decref       passed through to the unlock helpers
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                                             struct mdt_object *obj,
                                             struct mdt_lock_handle *lh,
                                             struct ldlm_enqueue_info *einfo,
                                             struct list_head *slave_locks,
                                             int decref)
{
        if (!mdt_object_remote(obj)) {
                mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
                return;
        }

        mdt_unlock_list(info, slave_locks, decref);
        mdt_object_unlock(info, obj, lh, decref);
}
1575
1576 /*
1577  * lock parents of links, and also check whether total locks don't exceed
1578  * RS_MAX_LOCKS.
1579  *
1580  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1581  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1582  * \retval      -ev negative errno upon error
1583  */
1584 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1585                                  struct mdt_object *pobj,
1586                                  const struct md_attr *ma,
1587                                  struct mdt_object *obj,
1588                                  struct mdt_lock_handle *lhp,
1589                                  struct ldlm_enqueue_info *peinfo,
1590                                  struct list_head *parent_slave_locks,
1591                                  struct list_head *link_locks)
1592 {
1593         struct mdt_device *mdt = info->mti_mdt;
1594         struct lu_buf *buf = &info->mti_big_buf;
1595         struct lu_name *lname = &info->mti_name;
1596         struct linkea_data ldata = { NULL };
1597         bool blocked = false;
1598         int local_lnkp_cnt = 0;
1599         int rc;
1600
1601         ENTRY;
1602
1603         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1604                 RETURN(0);
1605
1606         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1607         if (buf->lb_buf == NULL)
1608                 RETURN(-ENOMEM);
1609
1610         ldata.ld_buf = buf;
1611         rc = mdt_links_read(info, obj, &ldata);
1612         if (rc) {
1613                 if (rc == -ENOENT || rc == -ENODATA)
1614                         rc = 0;
1615                 RETURN(rc);
1616         }
1617
1618         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1619              linkea_next_entry(&ldata)) {
1620                 struct mdt_object *lnkp;
1621                 struct mdt_sub_lock *msl;
1622                 struct lu_fid fid;
1623                 __u64 ibits;
1624
1625                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1626                                     &fid);
1627
1628                 /* check if it's also linked to parent */
1629                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1630                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1631                                PFID(&fid), PNAME(lname));
1632                         /* in case link is remote object, revoke LOOKUP lock */
1633                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1634                         continue;
1635                 }
1636
1637                 lnkp = NULL;
1638
1639                 /* check if it's linked to a stripe of parent */
1640                 if (ma->ma_valid & MA_LMV) {
1641                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1642                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1643                         int j = 0;
1644
1645                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1646                                 fid_le_to_cpu(stripe_fid,
1647                                               &lmv->lmv_stripe_fids[j]);
1648                                 if (lu_fid_eq(stripe_fid, &fid)) {
1649                                         CDEBUG(D_INFO, "skip stripe "DFID
1650                                                ", reovke "DNAME"\n",
1651                                                PFID(&fid), PNAME(lname));
1652                                         lnkp = mdt_object_find(info->mti_env,
1653                                                                mdt, &fid);
1654                                         if (IS_ERR(lnkp))
1655                                                 GOTO(out, rc = PTR_ERR(lnkp));
1656                                         break;
1657                                 }
1658                         }
1659
1660                         if (lnkp) {
1661                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1662                                                                    obj);
1663                                 mdt_object_put(info->mti_env, lnkp);
1664                                 continue;
1665                         }
1666                 }
1667
1668                 /* Check if it's already locked */
1669                 list_for_each_entry(msl, link_locks, msl_linkage) {
1670                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1671                                 CDEBUG(D_INFO,
1672                                        DFID" was locked, revoke "DNAME"\n",
1673                                        PFID(&fid), PNAME(lname));
1674                                 lnkp = msl->msl_obj;
1675                                 break;
1676                         }
1677                 }
1678
1679                 if (lnkp) {
1680                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1681                         continue;
1682                 }
1683
1684                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1685                        PFID(&fid), PNAME(lname));
1686
1687                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1688                 if (IS_ERR(lnkp)) {
1689                         CWARN("%s: cannot find obj "DFID": %ld\n",
1690                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1691                         continue;
1692                 }
1693
1694                 if (!mdt_object_exists(lnkp)) {
1695                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1696                               PFID(&fid), PNAME(lname));
1697                         mdt_object_put(info->mti_env, lnkp);
1698                         continue;
1699                 }
1700
1701                 if (!mdt_object_remote(lnkp))
1702                         local_lnkp_cnt++;
1703
1704                 OBD_ALLOC_PTR(msl);
1705                 if (msl == NULL)
1706                         GOTO(out, rc = -ENOMEM);
1707
1708                 /*
1709                  * we can't follow parent-child lock order like other MD
1710                  * operations, use lock_try here to avoid deadlock, if the lock
1711                  * cannot be taken, drop all locks taken, revoke the blocked
1712                  * one, and continue processing the remaining entries, and in
1713                  * the end of the loop restart from beginning.
1714                  */
1715                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1716                 ibits = 0;
1717                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1718                                          MDS_INODELOCK_UPDATE, true);
1719                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1720
1721                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1722                                PFID(&fid), PNAME(lname));
1723
1724                         mdt_unlock_list(info, link_locks, 1);
1725                         /* also unlock parent locks to avoid deadlock */
1726                         if (!blocked)
1727                                 mdt_migrate_object_unlock(info, pobj, lhp,
1728                                                           peinfo,
1729                                                           parent_slave_locks,
1730                                                           1);
1731
1732                         blocked = true;
1733
1734                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1735                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1736                                              MDS_INODELOCK_UPDATE);
1737                         if (rc) {
1738                                 mdt_object_put(info->mti_env, lnkp);
1739                                 OBD_FREE_PTR(msl);
1740                                 GOTO(out, rc);
1741                         }
1742
1743                         if (mdt_object_remote(lnkp)) {
1744                                 struct ldlm_lock *lock;
1745
1746                                 /*
1747                                  * for remote object, set lock cb_atomic,
1748                                  * so lock can be released in blocking_ast()
1749                                  * immediately, then the next lock_try will
1750                                  * have better chance of success.
1751                                  */
1752                                 lock = ldlm_handle2lock(
1753                                                 &msl->msl_lh.mlh_rreg_lh);
1754                                 LASSERT(lock != NULL);
1755                                 lock_res_and_lock(lock);
1756                                 ldlm_set_atomic_cb(lock);
1757                                 unlock_res_and_lock(lock);
1758                                 LDLM_LOCK_PUT(lock);
1759                         }
1760
1761                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1762                         OBD_FREE_PTR(msl);
1763                         continue;
1764                 }
1765
1766                 INIT_LIST_HEAD(&msl->msl_linkage);
1767                 msl->msl_obj = lnkp;
1768                 list_add_tail(&msl->msl_linkage, link_locks);
1769
1770                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1771         }
1772
1773         if (blocked)
1774                 GOTO(out, rc = -EBUSY);
1775
1776         EXIT;
1777 out:
1778         if (rc)
1779                 mdt_unlock_list(info, link_locks, rc);
1780         else if (local_lnkp_cnt > RS_MAX_LOCKS - 6)
1781                 /*
1782                  * parent may have 3 local objects: master object and 2 stripes
1783                  * (if it's being migrated too); source may have 2 local
1784                  * objects: master and 1 stripe; target has 1 local object.
1785                  */
1786                 rc = 1;
1787         return rc;
1788 }
1789
1790 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1791                                   struct mdt_object *obj,
1792                                   const struct md_attr *ma,
1793                                   struct list_head *slave_locks)
1794 {
1795         struct mdt_device *mdt = info->mti_mdt;
1796         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1797         struct lu_fid *fid = &info->mti_tmp_fid1;
1798         struct mdt_object *slave;
1799         struct mdt_sub_lock *msl;
1800         int i;
1801         int rc;
1802
1803         ENTRY;
1804
1805         LASSERT(mdt_object_remote(obj));
1806         LASSERT(ma->ma_valid & MA_LMV);
1807         LASSERT(lmv);
1808
1809         if (!lmv_is_sane(lmv))
1810                 RETURN(-EINVAL);
1811
1812         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1813                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1814
1815                 if (!fid_is_sane(fid))
1816                         continue;
1817
1818                 slave = mdt_object_find(info->mti_env, mdt, fid);
1819                 if (IS_ERR(slave))
1820                         GOTO(out, rc = PTR_ERR(slave));
1821
1822                 OBD_ALLOC_PTR(msl);
1823                 if (!msl) {
1824                         mdt_object_put(info->mti_env, slave);
1825                         GOTO(out, rc = -ENOMEM);
1826                 }
1827
1828                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1829                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1830                                            MDS_INODELOCK_UPDATE, true);
1831                 if (rc) {
1832                         OBD_FREE_PTR(msl);
1833                         mdt_object_put(info->mti_env, slave);
1834                         GOTO(out, rc);
1835                 }
1836
1837                 INIT_LIST_HEAD(&msl->msl_linkage);
1838                 msl->msl_obj = slave;
1839                 list_add_tail(&msl->msl_linkage, slave_locks);
1840         }
1841         EXIT;
1842
1843 out:
1844         if (rc)
1845                 mdt_unlock_list(info, slave_locks, rc);
1846         return rc;
1847 }
1848
1849 /* lock parent and its stripes */
1850 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1851                                    struct mdt_object *obj,
1852                                    const struct md_attr *ma,
1853                                    struct mdt_lock_handle *lh,
1854                                    struct ldlm_enqueue_info *einfo,
1855                                    struct list_head *slave_locks)
1856 {
1857         int rc;
1858
1859         if (mdt_object_remote(obj)) {
1860                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1861                                             &lh->mlh_rreg_lh, LCK_PW,
1862                                             MDS_INODELOCK_UPDATE, false);
1863                 if (rc != ELDLM_OK)
1864                         return rc;
1865
1866                 /*
1867                  * if obj is remote and striped, lock its stripes explicitly
1868                  * because it's not striped in LOD layer on this MDT.
1869                  */
1870                 if (ma->ma_valid & MA_LMV) {
1871                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1872                         if (rc)
1873                                 mdt_object_unlock(info, obj, lh, rc);
1874                 }
1875         } else {
1876                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1877                                             einfo, true);
1878         }
1879
1880         return rc;
1881 }
1882
1883 /*
1884  * in migration, object may be remote, and we need take full lock of it and its
1885  * stripes if it's directory, besides, object may be a remote object on its
1886  * parent, revoke its LOOKUP lock on where its parent is located.
1887  */
1888 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1889                                    struct mdt_object *pobj,
1890                                    struct mdt_object *obj,
1891                                    struct mdt_lock_handle *lh,
1892                                    struct ldlm_enqueue_info *einfo,
1893                                    struct list_head *slave_locks)
1894 {
1895         int rc;
1896
1897         if (mdt_object_remote(obj)) {
1898                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1899                 if (rc)
1900                         return rc;
1901
1902                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1903                                             &lh->mlh_rreg_lh, LCK_EX,
1904                                             MDS_INODELOCK_FULL, false);
1905                 if (rc != ELDLM_OK)
1906                         return rc;
1907
1908                 /*
1909                  * if obj is remote and striped, lock its stripes explicitly
1910                  * because it's not striped in LOD layer on this MDT.
1911                  */
1912                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1913                         struct md_attr *ma = &info->mti_attr;
1914
1915                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1916                         if (rc) {
1917                                 mdt_object_unlock(info, obj, lh, rc);
1918                                 return rc;
1919                         }
1920
1921                         if (ma->ma_valid & MA_LMV) {
1922                                 rc = mdt_lock_remote_slaves(info, obj, ma,
1923                                                             slave_locks);
1924                                 if (rc)
1925                                         mdt_object_unlock(info, obj, lh, rc);
1926                         }
1927                 }
1928         } else {
1929                 if (mdt_object_remote(pobj)) {
1930                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1931                         if (rc)
1932                                 return rc;
1933                 }
1934
1935                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
1936                                             einfo, true);
1937         }
1938
1939         return rc;
1940 }
1941
1942 /*
1943  * lookup source by name, if parent is striped directory, we need to find the
1944  * corresponding stripe where source is located, and then lookup there.
1945  *
1946  * besides, if parent is migrating too, and file is already in target stripe,
1947  * this should be a redo of 'lfs migrate' on client side.
1948  */
1949 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1950                               struct mdt_object *pobj,
1951                               const struct md_attr *ma,
1952                               const struct lu_name *lname,
1953                               struct mdt_object **spobj,
1954                               struct mdt_object **sobj)
1955 {
1956         const struct lu_env *env = info->mti_env;
1957         struct lu_fid *fid = &info->mti_tmp_fid1;
1958         struct mdt_object *stripe;
1959         int rc;
1960
1961         if (ma->ma_valid & MA_LMV) {
1962                 /* if parent is striped, lookup on corresponding stripe */
1963                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1964
1965                 if (!lmv_is_sane(lmv))
1966                         return -EBADF;
1967
1968                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
1969                                                   lname->ln_namelen);
1970                 if (rc < 0)
1971                         return rc;
1972
1973                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
1974
1975                 stripe = mdt_object_find(env, info->mti_mdt, fid);
1976                 if (IS_ERR(stripe))
1977                         return PTR_ERR(stripe);
1978
1979                 fid_zero(fid);
1980                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
1981                                 &info->mti_spec);
1982                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
1983                         /*
1984                          * if parent layout is changeing, and lookup child
1985                          * failed on source stripe, lookup again on target
1986                          * stripe, if it exists, it means previous migration
1987                          * was interrupted, and current file was migrated
1988                          * already.
1989                          */
1990                         mdt_object_put(env, stripe);
1991
1992                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
1993                                                       lname->ln_namelen);
1994                         if (rc < 0)
1995                                 return rc;
1996
1997                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
1998
1999                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2000                         if (IS_ERR(stripe))
2001                                 return PTR_ERR(stripe);
2002
2003                         fid_zero(fid);
2004                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2005                                         fid, &info->mti_spec);
2006                         mdt_object_put(env, stripe);
2007                         return rc ?: -EALREADY;
2008                 } else if (rc) {
2009                         mdt_object_put(env, stripe);
2010                         return rc;
2011                 }
2012         } else {
2013                 fid_zero(fid);
2014                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2015                                 &info->mti_spec);
2016                 if (rc)
2017                         return rc;
2018
2019                 stripe = pobj;
2020                 mdt_object_get(env, stripe);
2021         }
2022
2023         *spobj = stripe;
2024
2025         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2026         if (IS_ERR(*sobj)) {
2027                 mdt_object_put(env, stripe);
2028                 rc = PTR_ERR(*sobj);
2029                 *spobj = NULL;
2030                 *sobj = NULL;
2031         }
2032
2033         return rc;
2034 }
2035
2036 /* end lease and close file for regular file */
2037 static int mdd_migrate_close(struct mdt_thread_info *info,
2038                              struct mdt_object *obj)
2039 {
2040         struct close_data *data;
2041         struct mdt_body *repbody;
2042         struct ldlm_lock *lease;
2043         int rc;
2044         int rc2;
2045
2046         rc = -EPROTO;
2047         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2048                                       RCL_CLIENT) ||
2049             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2050                                       RCL_CLIENT))
2051                 goto close;
2052
2053         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2054         if (!data)
2055                 goto close;
2056
2057         rc = -ESTALE;
2058         lease = ldlm_handle2lock(&data->cd_handle);
2059         if (!lease)
2060                 goto close;
2061
2062         /* check if the lease was already canceled */
2063         lock_res_and_lock(lease);
2064         rc = ldlm_is_cancel(lease);
2065         unlock_res_and_lock(lease);
2066
2067         if (rc) {
2068                 rc = -EAGAIN;
2069                 LDLM_DEBUG(lease, DFID" lease broken",
2070                            PFID(mdt_object_fid(obj)));
2071         }
2072
2073         /*
2074          * cancel server side lease, client side counterpart should have been
2075          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2076          */
2077         ldlm_lock_cancel(lease);
2078         ldlm_reprocess_all(lease->l_resource, lease);
2079         LDLM_LOCK_PUT(lease);
2080
2081 close:
2082         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2083         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2084         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2085
2086         return rc ?: rc2;
2087 }
2088
/*
 * migrate file in below steps:
 *  1. lock parent and its stripes
 *  2. lookup source by name
 *  3. lock parents of source links if source is not directory
 *  4. reject if source is in HSM
 *  5. take source open_sem and close file if source is regular file
 *  6. lock source and its stripes if it's directory
 *  7. lock target so subsequent change to it can trigger COS
 *  8. migrate file
 *  9. unlock above locks
 * 10. sync device if source has links
 *
 * The request is described by info->mti_rr: rr_fid1 is the parent, rr_name
 * the name of the source under it, and rr_fid2 the target object.
 *
 * Resources are released in reverse order of acquisition through the chain
 * of labels at the end of the function, so each error path jumps to the
 * label matching what has been acquired so far.
 *
 * \param[in] info      thread info carrying the reint record
 * \param[in] unused    not referenced by this function
 *
 * \retval      0 on success
 * \retval      -EPERM if either FID is not MD-operative, remote directory or
 *              directory migration is disabled, or the caller is not allowed
 * \retval      -EBUSY if name is "." or "..", if locking link parents is
 *              still blocked after the retries, or if the source open_sem
 *              can't be taken
 * \retval      -ENOENT / -ENOTDIR if the parent doesn't exist or is not a
 *              directory
 * \retval      -EOPNOTSUPP if the source has HSM flags set and HSM migration
 *              is not allowed
 * \retval      -ev other negative errno from the steps above
 */
int mdt_reint_migrate(struct mdt_thread_info *info,
                      struct mdt_lock_handle *unused)
{
        const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct mdt_reint_record *rr = &info->mti_rr;
        struct lu_ucred *uc = mdt_ucred(info);
        struct md_attr *ma = &info->mti_attr;
        struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
        struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
        struct mdt_object *pobj;
        struct mdt_object *spobj = NULL;
        struct mdt_object *sobj = NULL;
        struct mdt_object *tobj;
        struct lustre_handle rename_lh = { 0 };
        struct mdt_lock_handle *lhp;
        struct mdt_lock_handle *lhs;
        struct mdt_lock_handle *lht;
        LIST_HEAD(parent_slave_locks);
        LIST_HEAD(child_slave_locks);
        LIST_HEAD(link_locks);
        /* how many times locking is restarted when link parents are busy */
        int lock_retries = 5;
        bool open_sem_locked = false;
        bool do_sync = false;
        int rc;
        ENTRY;

        CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
               PNAME(&rr->rr_name), PFID(rr->rr_fid2));

        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);

        if (!fid_is_md_operative(rr->rr_fid1) ||
            !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);

        /* don't allow migrate . or .. */
        if (lu_name_is_dot_or_dotdot(&rr->rr_name))
                RETURN(-EBUSY);

        if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
                RETURN(-EPERM);

        /*
         * a caller without CFS_CAP_SYS_ADMIN is only allowed if its gid is
         * mdt_enable_remote_dir_gid, or if that setting is -1.
         */
        if (uc && !md_capable(uc, CFS_CAP_SYS_ADMIN) &&
            uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
            mdt->mdt_enable_remote_dir_gid != -1)
                RETURN(-EPERM);

        /*
         * Note: do not enqueue rename lock for replay request, because
         * if other MDT holds rename lock, but being blocked to wait for
         * this MDT to finish its recovery, and the failover MDT can not
         * get rename lock, which will cause deadlock.
         *
         * req is NULL if this is called by directory auto-split.
         */
        if (req && !req_is_replay(req)) {
                rc = mdt_rename_lock(info, &rename_lh);
                if (rc != 0) {
                        CERROR("%s: can't lock FS for rename: rc = %d\n",
                               mdt_obd_name(info->mti_mdt), rc);
                        RETURN(rc);
                }
        }

        /* pobj is master object of parent */
        pobj = mdt_object_find(env, mdt, rr->rr_fid1);
        if (IS_ERR(pobj))
                GOTO(unlock_rename, rc = PTR_ERR(pobj));

        if (req) {
                rc = mdt_version_get_check(info, pobj, 0);
                if (rc)
                        GOTO(put_parent, rc);
        }

        if (!mdt_object_exists(pobj))
                GOTO(put_parent, rc = -ENOENT);

        if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
                GOTO(put_parent, rc = -ENOTDIR);

        /* read parent LMV into ma, it is consulted by the steps below */
        rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
        if (rc)
                GOTO(put_parent, rc);

        /* restart point when locking the link parents returned -EBUSY */
lock_parent:
        /* lock parent object */
        lhp = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_reg_init(lhp, LCK_PW);
        rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
                                     &parent_slave_locks);
        if (rc)
                GOTO(put_parent, rc);

        /*
         * spobj is the corresponding stripe against name if pobj is striped
         * directory, which is the real parent, and no need to lock, because
         * we've taken full lock of pobj.
         */
        rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
        if (rc)
                GOTO(unlock_parent, rc);

        /* lock parents of source links, and revoke LOOKUP lock of links */
        rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
                                   &parent_slave_locks, &link_locks);
        /*
         * -EBUSY means a link parent lock was busy; drop the references
         * from the lookup and redo locking and lookup from the parent.
         *
         * NOTE(review): on -EBUSY mdt_link_parents_lock() has already
         * released the parent locks; once the retries are used up the
         * unlock_parent path below unlocks lhp a second time - presumably
         * harmless on an already released handle, TODO confirm.
         */
        if (rc == -EBUSY && lock_retries-- > 0) {
                mdt_object_put(env, sobj);
                mdt_object_put(env, spobj);
                goto lock_parent;
        }

        if (rc < 0)
                GOTO(put_source, rc);

        /*
         * RS_MAX_LOCKS is the limit of number of locks that can be saved along
         * with one request, if total lock count exceeds this limit, we will
         * drop all locks after migration, and synchronous device in the end.
         */
        do_sync = rc;

        /* TODO: DoM migration is not supported, migrate dirent only */
        if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
                rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
                if (rc)
                        GOTO(unlock_links, rc);

                if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
                        info->mti_spec.sp_migrate_nsonly = 1;
        }

        /* if migration HSM is allowed */
        if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
                ma->ma_need = MA_HSM;
                ma->ma_valid = 0;
                rc = mdt_attr_get_complex(info, sobj, ma);
                if (rc)
                        GOTO(unlock_links, rc);

                /* any HSM flag on the source rejects the migration */
                if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
                        GOTO(unlock_links, rc = -EOPNOTSUPP);
        }

        /* end lease and close file for regular file */
        if (info->mti_spec.sp_migrate_close) {
                /* try to hold open_sem so that nobody else can open the file */
                if (!down_write_trylock(&sobj->mot_open_sem)) {
                        /* close anyway */
                        mdd_migrate_close(info, sobj);
                        GOTO(unlock_links, rc = -EBUSY);
                } else {
                        open_sem_locked = true;
                        rc = mdd_migrate_close(info, sobj);
                        if (rc)
                                GOTO(unlock_open_sem, rc);
                }
        }

        /* lock source */
        lhs = &info->mti_lh[MDT_LH_OLD];
        mdt_lock_reg_init(lhs, LCK_EX);
        rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
                                     &child_slave_locks);
        if (rc)
                GOTO(unlock_open_sem, rc);

        /* lock target */
        tobj = mdt_object_find(env, mdt, rr->rr_fid2);
        if (IS_ERR(tobj))
                GOTO(unlock_source, rc = PTR_ERR(tobj));

        lht = &info->mti_lh[MDT_LH_NEW];
        mdt_lock_reg_init(lht, LCK_EX);
        rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
        if (rc)
                GOTO(put_target, rc);

        /* Don't do lookup sanity check. We know name doesn't exist. */
        info->mti_spec.sp_cr_lookup = 0;
        info->mti_spec.sp_feat = &dt_directory_features;

        rc = mdo_migrate(env, mdt_object_child(pobj),
                         mdt_object_child(sobj), &rr->rr_name,
                         mdt_object_child(tobj),
                         &info->mti_spec, ma);
        if (!rc)
                lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
                                     LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
        EXIT;

        /* unwind: everything below releases resources in reverse order */
        mdt_object_unlock(info, tobj, lht, rc);
put_target:
        mdt_object_put(env, tobj);
unlock_source:
        mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
                                  &child_slave_locks, rc);
unlock_open_sem:
        if (open_sem_locked)
                up_write(&sobj->mot_open_sem);
unlock_links:
        /* if we've got too many locks to save into RPC,
         * then just commit before the locks are released */
        if (!rc && do_sync)
                mdt_device_sync(env, mdt);
        /* with do_sync the link locks are dropped with decref forced to 1 */
        mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
put_source:
        mdt_object_put(env, sobj);
        mdt_object_put(env, spobj);
unlock_parent:
        mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
                                  &parent_slave_locks, rc);
put_parent:
        mdt_object_put(env, pobj);
unlock_rename:
        /* the rename lock is only held for non-replay client requests */
        if (lustre_handle_is_used(&rename_lh))
                mdt_rename_unlock(&rename_lh);

        return rc;
}
2325
2326 static int mdt_object_lock_save(struct mdt_thread_info *info,
2327                                 struct mdt_object *dir,
2328                                 struct mdt_lock_handle *lh,
2329                                 int idx, bool cos_incompat)
2330 {
2331         int rc;
2332
2333         /* we lock the target dir if it is local */
2334         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2335                                    cos_incompat);
2336         if (rc != 0)
2337                 return rc;
2338
2339         /* get and save correct version after locking */
2340         mdt_version_get_save(info, dir, idx);
2341         return 0;
2342 }
2343
2344 /*
2345  * determine lock order of sobj and tobj
2346  *
2347  * there are two situations we need to lock tobj before sobj:
2348  * 1. sobj is child of tobj
2349  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2350  *    larger than that of tobj
2351  *
2352  * \retval      1 lock tobj before sobj
2353  * \retval      0 lock sobj before tobj
2354  * \retval      -ev negative errno upon error
2355  */
2356 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2357                                            struct mdt_object *sobj,
2358                                            struct mdt_object *tobj)
2359 {
2360         struct md_attr *ma = &info->mti_attr;
2361         struct lu_fid *spfid = &info->mti_tmp_fid1;
2362         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2363         struct lmv_mds_md_v1 *lmv;
2364         __u32 sindex;
2365         __u32 tindex;
2366         int rc;
2367
2368         /* sobj and tobj are the same */
2369         if (sobj == tobj)
2370                 return 0;
2371
2372         if (fid_is_root(mdt_object_fid(sobj)))
2373                 return 0;
2374
2375         if (fid_is_root(mdt_object_fid(tobj)))
2376                 return 1;
2377
2378         /* check whether sobj is child of tobj */
2379         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2380                            mdt_object_fid(tobj));
2381         if (rc < 0)
2382                 return rc;
2383
2384         if (rc == 1)
2385                 return 1;
2386
2387         /* check whether sobj and tobj are children of the same parent */
2388         rc = mdt_attr_get_pfid(info, sobj, spfid);
2389         if (rc)
2390                 return rc;
2391
2392         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2393         if (rc)
2394                 return rc;
2395
2396         if (!lu_fid_eq(spfid, tpfid))
2397                 return 0;
2398
2399         /* check whether sobj and tobj are sibling stripes */
2400         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2401         if (rc)
2402                 return rc;
2403
2404         if (!(ma->ma_valid & MA_LMV))
2405                 return 0;
2406
2407         lmv = &ma->ma_lmv->lmv_md_v1;
2408         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2409                 return 0;
2410         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2411
2412         ma->ma_valid = 0;
2413         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2414         if (rc)
2415                 return rc;
2416
2417         if (!(ma->ma_valid & MA_LMV))
2418                 return -ENODATA;
2419
2420         lmv = &ma->ma_lmv->lmv_md_v1;
2421         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2422                 return -EINVAL;
2423         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2424
2425         /* check stripe index of sobj and tobj */
2426         if (sindex == tindex)
2427                 return -EINVAL;
2428
2429         return sindex < tindex ? 0 : 1;
2430 }
2431
2432 /*
2433  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2434  * 2 - srcdir child; 3 - tgtdir child.
2435  * Update on disk version of srcdir child.
2436  */
2437 static int mdt_reint_rename(struct mdt_thread_info *info,
2438                             struct mdt_lock_handle *unused)
2439 {
2440         struct mdt_device *mdt = info->mti_mdt;
2441         struct mdt_reint_record *rr = &info->mti_rr;
2442         struct md_attr *ma = &info->mti_attr;
2443         struct ptlrpc_request *req = mdt_info_req(info);
2444         struct mdt_object *msrcdir = NULL;
2445         struct mdt_object *mtgtdir = NULL;
2446         struct mdt_object *mold;
2447         struct mdt_object *mnew = NULL;
2448         struct lustre_handle rename_lh = { 0 };
2449         struct mdt_lock_handle *lh_srcdirp;
2450         struct mdt_lock_handle *lh_tgtdirp;
2451         struct mdt_lock_handle *lh_oldp = NULL;
2452         struct mdt_lock_handle *lh_newp = NULL;
2453         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2454         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2455         __u64 lock_ibits;
2456         bool reverse = false, discard = false;
2457         bool cos_incompat;
2458         int rc;
2459         ENTRY;
2460
2461         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2462                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2463                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2464
2465         if (info->mti_dlm_req)
2466                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2467
2468         if (!fid_is_md_operative(rr->rr_fid1) ||
2469             !fid_is_md_operative(rr->rr_fid2))
2470                 RETURN(-EPERM);
2471
2472         /* find both parents. */
2473         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2474         if (IS_ERR(msrcdir))
2475                 RETURN(PTR_ERR(msrcdir));
2476
2477         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2478
2479         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2480                 mtgtdir = msrcdir;
2481                 mdt_object_get(info->mti_env, mtgtdir);
2482         } else {
2483                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2484                 if (IS_ERR(mtgtdir))
2485                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2486         }
2487
2488         /*
2489          * Note: do not enqueue rename lock for replay request, because
2490          * if other MDT holds rename lock, but being blocked to wait for
2491          * this MDT to finish its recovery, and the failover MDT can not
2492          * get rename lock, which will cause deadlock.
2493          */
2494         if (!req_is_replay(req)) {
2495                 /*
2496                  * Normally rename RPC is handled on the MDT with the target
2497                  * directory (if target exists, it's on the MDT with the
2498                  * target), if the source directory is remote, it's a hint that
2499                  * source is remote too (this may not be true, but it won't
2500                  * cause any issue), return -EXDEV early to avoid taking
2501                  * rename_lock.
2502                  */
2503                 if (!mdt->mdt_enable_remote_rename &&
2504                     mdt_object_remote(msrcdir))
2505                         GOTO(out_put_tgtdir, rc = -EXDEV);
2506
2507                 rc = mdt_rename_lock(info, &rename_lh);
2508                 if (rc != 0) {
2509                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2510                                mdt_obd_name(mdt), rc);
2511                         GOTO(out_put_tgtdir, rc);
2512                 }
2513         }
2514
2515         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2516         if (rc < 0)
2517                 GOTO(out_unlock_rename, rc);
2518
2519         reverse = rc;
2520
2521         /* source needs to be looked up after locking source parent, otherwise
2522          * this rename may race with unlink source, and cause rename hang, see
2523          * sanityn.sh 55b, so check parents first, if later we found source is
2524          * remote, relock parents. */
2525         cos_incompat = (mdt_object_remote(msrcdir) ||
2526                         mdt_object_remote(mtgtdir));
2527
2528         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2529
2530         /* lock parents in the proper order. */
2531         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2532         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2533
2534 relock:
2535         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2536         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2537
2538         if (reverse) {
2539                 rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2540                                           cos_incompat);
2541                 if (rc)
2542                         GOTO(out_unlock_rename, rc);
2543
2544                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2545
2546                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2547                                           cos_incompat);
2548                 if (rc != 0) {
2549                         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2550                         GOTO(out_unlock_rename, rc);
2551                 }
2552         } else {
2553                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2554                                           cos_incompat);
2555                 if (rc)
2556                         GOTO(out_unlock_rename, rc);
2557
2558                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2559
2560                 if (mtgtdir != msrcdir) {
2561                         rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2562                                                   cos_incompat);
2563                 } else if (!mdt_object_remote(mtgtdir) &&
2564                            lh_srcdirp->mlh_pdo_hash !=
2565                            lh_tgtdirp->mlh_pdo_hash) {
2566                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
2567                                                 MDS_INODELOCK_UPDATE,
2568                                                 cos_incompat);
2569                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2570                 }
2571                 if (rc != 0) {
2572                         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2573                         GOTO(out_unlock_rename, rc);
2574                 }
2575         }
2576
2577         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2578         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2579
2580         /* find mold object. */
2581         fid_zero(old_fid);
2582         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2583         if (rc != 0)
2584                 GOTO(out_unlock_parents, rc);
2585
2586         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2587                 GOTO(out_unlock_parents, rc = -EINVAL);
2588
2589         if (!fid_is_md_operative(old_fid))
2590                 GOTO(out_unlock_parents, rc = -EPERM);
2591
2592         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2593         if (IS_ERR(mold))
2594                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2595
2596         if (!mdt_object_exists(mold)) {
2597                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2598                                 &mold->mot_obj,
2599                                 "object does not exist");
2600                 GOTO(out_put_old, rc = -ENOENT);
2601         }
2602
2603         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2604                 GOTO(out_put_old, rc = -EXDEV);
2605
2606         /* Check if @mtgtdir is subdir of @mold, before locking child
2607          * to avoid reverse locking. */
2608         if (mtgtdir != msrcdir) {
2609                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2610                                    old_fid);
2611                 if (rc) {
2612                         if (rc == 1)
2613                                 rc = -EINVAL;
2614                         GOTO(out_put_old, rc);
2615                 }
2616         }
2617
2618         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2619         /* save version after locking */
2620         mdt_version_get_save(info, mold, 2);
2621
2622         if (!cos_incompat && mdt_object_remote(mold)) {
2623                 cos_incompat = true;
2624                 mdt_object_put(info->mti_env, mold);
2625                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2626                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2627                 goto relock;
2628         }
2629
2630         /* find mnew object:
2631          * mnew target object may not exist now
2632          * lookup with version checking */
2633         fid_zero(new_fid);
2634         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2635                                       3);
2636         if (rc == 0) {
2637                 /* the new_fid should have been filled at this moment */
2638                 if (lu_fid_eq(old_fid, new_fid))
2639                         GOTO(out_put_old, rc);
2640
2641                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2642                     lu_fid_eq(new_fid, rr->rr_fid2))
2643                         GOTO(out_put_old, rc = -EINVAL);
2644
2645                 if (!fid_is_md_operative(new_fid))
2646                         GOTO(out_put_old, rc = -EPERM);
2647
2648                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2649                 if (IS_ERR(mnew))
2650                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2651
2652                 if (!mdt_object_exists(mnew)) {
2653                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2654                                         &mnew->mot_obj,
2655                                         "object does not exist");
2656                         GOTO(out_put_new, rc = -ENOENT);
2657                 }
2658
2659                 if (mdt_object_remote(mnew)) {
2660                         struct mdt_body  *repbody;
2661
2662                         /* Always send rename req to the target child MDT */
2663                         repbody = req_capsule_server_get(info->mti_pill,
2664                                                          &RMF_MDT_BODY);
2665                         LASSERT(repbody != NULL);
2666                         repbody->mbo_fid1 = *new_fid;
2667                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2668                         GOTO(out_put_new, rc = -EXDEV);
2669                 }
2670                 /* Before locking the target dir, check we do not replace
2671                  * a dir with a non-dir, otherwise it may deadlock with
2672                  * link op which tries to create a link in this dir
2673                  * back to this non-dir. */
2674                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2675                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2676                         GOTO(out_put_new, rc = -EISDIR);
2677
2678                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2679                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2680                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2681                 if (mdt_object_remote(msrcdir)) {
2682                         /* Enqueue lookup lock from the parent MDT */
2683                         rc = mdt_remote_object_lock(info, msrcdir,
2684                                                     mdt_object_fid(mold),
2685                                                     &lh_oldp->mlh_rreg_lh,
2686                                                     lh_oldp->mlh_rreg_mode,
2687                                                     MDS_INODELOCK_LOOKUP,
2688                                                     false);
2689                         if (rc != ELDLM_OK)
2690                                 GOTO(out_put_new, rc);
2691
2692                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2693                 }
2694
2695                 rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
2696                                            cos_incompat);
2697                 if (rc != 0)
2698                         GOTO(out_unlock_old, rc);
2699
2700                 /* Check if @msrcdir is subdir of @mnew, before locking child
2701                  * to avoid reverse locking. */
2702                 if (mtgtdir != msrcdir) {
2703                         rc = mdo_is_subdir(info->mti_env,
2704                                            mdt_object_child(msrcdir), new_fid);
2705                         if (rc) {
2706                                 if (rc == 1)
2707                                         rc = -EINVAL;
2708                                 GOTO(out_unlock_old, rc);
2709                         }
2710                 }
2711
2712                 /* We used to acquire MDS_INODELOCK_FULL here but we
2713                  * can't do this now because a running HSM restore on
2714                  * the rename onto victim will hold the layout
2715                  * lock. See LU-4002. */
2716
2717                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2718                 mdt_lock_reg_init(lh_newp, LCK_EX);
2719                 rc = mdt_reint_object_lock(info, mnew, lh_newp,
2720                                            MDS_INODELOCK_LOOKUP |
2721                                            MDS_INODELOCK_UPDATE,
2722                                            cos_incompat);
2723                 if (rc != 0)
2724                         GOTO(out_unlock_old, rc);
2725
2726                 /* get and save version after locking */
2727                 mdt_version_get_save(info, mnew, 3);
2728         } else if (rc != -EREMOTE && rc != -ENOENT) {
2729                 GOTO(out_put_old, rc);
2730         } else {
2731                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2732                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2733                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2734                 if (mdt_object_remote(msrcdir)) {
2735                         /* Enqueue lookup lock from the parent MDT */
2736                         rc = mdt_remote_object_lock(info, msrcdir,
2737                                                     mdt_object_fid(mold),
2738                                                     &lh_oldp->mlh_rreg_lh,
2739                                                     lh_oldp->mlh_rreg_mode,
2740                                                     MDS_INODELOCK_LOOKUP,
2741                                                     false);
2742                         if (rc != ELDLM_OK)
2743                                 GOTO(out_put_old, rc);
2744
2745                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2746                 }
2747
2748                 rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
2749                                            cos_incompat);
2750                 if (rc != 0)
2751                         GOTO(out_unlock_old, rc);
2752
2753                 mdt_enoent_version_save(info, 3);
2754         }
2755
2756         /* step 5: rename it */
2757         mdt_reint_init_ma(info, ma);
2758
2759         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2760                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2761
2762         if (mnew != NULL)
2763                 mutex_lock(&mnew->mot_lov_mutex);
2764
2765         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2766                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2767                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2768                         &rr->rr_tgt_name, ma);
2769
2770         if (mnew != NULL)
2771                 mutex_unlock(&mnew->mot_lov_mutex);
2772
2773         /* handle last link of tgt object */
2774         if (rc == 0) {
2775                 mdt_counter_incr(req, LPROC_MDT_RENAME);
2776                 if (mnew) {
2777                         mdt_handle_last_unlink(info, mnew, ma);
2778                         discard = mdt_dom_check_for_discard(info, mnew);
2779                 }
2780                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2781                                          msrcdir, mtgtdir);
2782         }
2783
2784         EXIT;
2785         if (mnew != NULL)
2786                 mdt_object_unlock(info, mnew, lh_newp, rc);
2787 out_unlock_old:
2788         mdt_object_unlock(info, mold, lh_oldp, rc);
2789 out_put_new:
2790         if (mnew && !discard)
2791                 mdt_object_put(info->mti_env, mnew);
2792 out_put_old:
2793         mdt_object_put(info->mti_env, mold);
2794 out_unlock_parents:
2795         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2796         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2797 out_unlock_rename:
2798         if (lustre_handle_is_used(&rename_lh))
2799                 mdt_rename_unlock(&rename_lh);
2800 out_put_tgtdir:
2801         mdt_object_put(info->mti_env, mtgtdir);
2802 out_put_srcdir:
2803         mdt_object_put(info->mti_env, msrcdir);
2804
2805         /* The DoM discard can be done right in the place above where it is
2806          * assigned, meanwhile it is done here after rename unlock due to
2807          * compatibility with old clients, for them the discard blocks
2808          * the main thread until completion. Check LU-11359 for details.
2809          */
2810         if (discard) {
2811                 mdt_dom_discard_data(info, mnew);
2812                 mdt_object_put(info->mti_env, mnew);
2813         }
2814         return rc;
2815 }
2816
2817 static int mdt_reint_resync(struct mdt_thread_info *info,
2818                             struct mdt_lock_handle *lhc)
2819 {
2820         struct mdt_reint_record *rr = &info->mti_rr;
2821         struct ptlrpc_request *req = mdt_info_req(info);
2822         struct md_attr *ma = &info->mti_attr;
2823         struct mdt_object *mo;
2824         struct ldlm_lock *lease;
2825         struct mdt_body *repbody;
2826         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2827         bool lease_broken;
2828         int rc, rc2;
2829
2830         ENTRY;
2831
2832         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2833
2834         if (info->mti_dlm_req)
2835                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2836
2837         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2838         if (IS_ERR(mo))
2839                 GOTO(out, rc = PTR_ERR(mo));
2840
2841         if (!mdt_object_exists(mo))
2842                 GOTO(out_obj, rc = -ENOENT);
2843
2844         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2845                 GOTO(out_obj, rc = -EINVAL);
2846
2847         if (mdt_object_remote(mo))
2848                 GOTO(out_obj, rc = -EREMOTE);
2849
2850         lease = ldlm_handle2lock(rr->rr_lease_handle);
2851         if (lease == NULL)
2852                 GOTO(out_obj, rc = -ESTALE);
2853
2854         /* It's really necessary to grab open_sem and check if the lease lock
2855          * has been lost. There would exist a concurrent writer coming in and
2856          * generating some dirty data in memory cache, the writeback would fail
2857          * after the layout version is increased by MDS_REINT_RESYNC RPC. */
2858         if (!down_write_trylock(&mo->mot_open_sem))
2859                 GOTO(out_put_lease, rc = -EBUSY);
2860
2861         lock_res_and_lock(lease);
2862         lease_broken = ldlm_is_cancel(lease);
2863         unlock_res_and_lock(lease);
2864         if (lease_broken)
2865                 GOTO(out_unlock, rc = -EBUSY);
2866
2867         /* the file has yet opened by anyone else after we took the lease. */
2868         layout.mlc_opc = MD_LAYOUT_RESYNC;
2869         lhc = &info->mti_lh[MDT_LH_LOCAL];
2870         rc = mdt_layout_change(info, mo, lhc, &layout);
2871         if (rc)
2872                 GOTO(out_unlock, rc);
2873
2874         mdt_object_unlock(info, mo, lhc, 0);
2875
2876         ma->ma_need = MA_INODE;
2877         ma->ma_valid = 0;
2878         rc = mdt_attr_get_complex(info, mo, ma);
2879         if (rc != 0)
2880                 GOTO(out_unlock, rc);
2881
2882         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2883         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
2884
2885         EXIT;
2886 out_unlock:
2887         up_write(&mo->mot_open_sem);
2888 out_put_lease:
2889         LDLM_LOCK_PUT(lease);
2890 out_obj:
2891         mdt_object_put(info->mti_env, mo);
2892 out:
2893         mdt_client_compatibility(info);
2894         rc2 = mdt_fix_reply(info);
2895         if (rc == 0)
2896                 rc = rc2;
2897         return rc;
2898 }
2899
2900 struct mdt_reinter {
2901         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
2902         enum lprocfs_extra_opc mr_extra_opc;
2903 };
2904
2905 static const struct mdt_reinter mdt_reinters[] = {
2906         [REINT_SETATTR] = {
2907                 .mr_handler = &mdt_reint_setattr,
2908                 .mr_extra_opc = MDS_REINT_SETATTR,
2909         },
2910         [REINT_CREATE] = {
2911                 .mr_handler = &mdt_reint_create,
2912                 .mr_extra_opc = MDS_REINT_CREATE,
2913         },
2914         [REINT_LINK] = {
2915                 .mr_handler = &mdt_reint_link,
2916                 .mr_extra_opc = MDS_REINT_LINK,
2917         },
2918         [REINT_UNLINK] = {
2919                 .mr_handler = &mdt_reint_unlink,
2920                 .mr_extra_opc = MDS_REINT_UNLINK,
2921         },
2922         [REINT_RENAME] = {
2923                 .mr_handler = &mdt_reint_rename,
2924                 .mr_extra_opc = MDS_REINT_RENAME,
2925         },
2926         [REINT_OPEN] = {
2927                 .mr_handler = &mdt_reint_open,
2928                 .mr_extra_opc = MDS_REINT_OPEN,
2929         },
2930         [REINT_SETXATTR] = {
2931                 .mr_handler = &mdt_reint_setxattr,
2932                 .mr_extra_opc = MDS_REINT_SETXATTR,
2933         },
2934         [REINT_RMENTRY] = {
2935                 .mr_handler = &mdt_reint_unlink,
2936                 .mr_extra_opc = MDS_REINT_UNLINK,
2937         },
2938         [REINT_MIGRATE] = {
2939                 .mr_handler = &mdt_reint_migrate,
2940                 .mr_extra_opc = MDS_REINT_RENAME,
2941         },
2942         [REINT_RESYNC] = {
2943                 .mr_handler = &mdt_reint_resync,
2944                 .mr_extra_opc = MDS_REINT_RESYNC,
2945         },
2946 };
2947
2948 int mdt_reint_rec(struct mdt_thread_info *info,
2949                   struct mdt_lock_handle *lhc)
2950 {
2951         const struct mdt_reinter *mr;
2952         int rc;
2953         ENTRY;
2954
2955         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
2956                 RETURN(-EPROTO);
2957
2958         mr = &mdt_reinters[info->mti_rr.rr_opcode];
2959         if (mr->mr_handler == NULL)
2960                 RETURN(-EPROTO);
2961
2962         rc = (*mr->mr_handler)(info, lhc);
2963
2964         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
2965                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
2966
2967         RETURN(rc);
2968 }