Whamcloud - gitweb
LU-12675 mdt: release object reference upon error
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdt/mdt_reint.c
33  *
34  * Lustre Metadata Target (mdt) reintegration routines
35  *
36  * Author: Peter Braam <braam@clusterfs.com>
37  * Author: Andreas Dilger <adilger@clusterfs.com>
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Huang Hua <huanghua@clusterfs.com>
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <lprocfs_status.h>
46 #include "mdt_internal.h"
47 #include <lustre_lmv.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84         ENTRY;
85
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 static int mdt_lookup_version_check(struct mdt_thread_info *info,
194                                     struct mdt_object *p,
195                                     const struct lu_name *lname,
196                                     struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
210                 if (likely(!IS_ERR(child))) {
211                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
212                         mdt_object_put(info->mti_env, child);
213                 }
214         }
215         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
216         return vbrc ? vbrc : rc;
217
218 }
219
220 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
221                              struct mdt_object *obj,
222                              struct ldlm_enqueue_info *einfo,
223                              int decref)
224 {
225         union ldlm_policy_data *policy = &mti->mti_policy;
226         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
227         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
228         int i;
229
230         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
231         LASSERT(slave_locks);
232
233         memset(policy, 0, sizeof(*policy));
234         policy->l_inodebits.bits = einfo->ei_inodebits;
235         mdt_lock_handle_init(lh);
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < slave_locks->ha_count; i++) {
238                 if (test_bit(i, (void *)slave_locks->ha_map))
239                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 slave_locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
279                            enum ldlm_mode mode, __u64 ibits,
280                            struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285
286         einfo->ei_type = LDLM_IBITS;
287         einfo->ei_mode = mode;
288         einfo->ei_cb_bl = mdt_remote_blocking_ast;
289         einfo->ei_cb_local_bl = mdt_blocking_ast;
290         einfo->ei_cb_cp = ldlm_completion_ast;
291         einfo->ei_enq_slave = 1;
292         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
293         einfo->ei_inodebits = ibits;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 int mdt_reint_striped_lock(struct mdt_thread_info *info,
302                            struct mdt_object *o,
303                            struct mdt_lock_handle *lh,
304                            __u64 ibits,
305                            struct ldlm_enqueue_info *einfo,
306                            bool cos_incompat)
307 {
308         int rc;
309
310         LASSERT(!mdt_object_remote(o));
311
312         memset(einfo, 0, sizeof(*einfo));
313
314         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
315         if (rc)
316                 return rc;
317
318         rc = mdt_object_striped(info, o);
319         if (rc != 1) {
320                 if (rc < 0)
321                         mdt_object_unlock(info, o, lh, rc);
322                 return rc;
323         }
324
325         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
326         if (rc) {
327                 mdt_object_unlock(info, o, lh, rc);
328                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
329                         rc = 0;
330         }
331
332         return rc;
333 }
334
335 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
336                               struct mdt_object *o,
337                               struct mdt_lock_handle *lh,
338                               struct ldlm_enqueue_info *einfo, int decref)
339 {
340         if (einfo->ei_cbdata)
341                 mdt_unlock_slaves(info, o, einfo, decref);
342         mdt_object_unlock(info, o, lh, decref);
343 }
344
345 /*
346  * VBR: we save three versions in reply:
347  * 0 - parent. Check that parent version is the same during replay.
348  * 1 - name. Version of 'name' if file exists with the same name or
349  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
350  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
351  * check.
352  */
353 static int mdt_create(struct mdt_thread_info *info)
354 {
355         struct mdt_device       *mdt = info->mti_mdt;
356         struct mdt_object       *parent;
357         struct mdt_object       *child;
358         struct mdt_lock_handle  *lh;
359         struct mdt_body         *repbody;
360         struct md_attr          *ma = &info->mti_attr;
361         struct mdt_reint_record *rr = &info->mti_rr;
362         struct md_op_spec       *spec = &info->mti_spec;
363         int rc;
364         ENTRY;
365
366         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  ("DNAME"->"DFID") "
367                   "in "DFID,
368                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
369
370         if (!fid_is_md_operative(rr->rr_fid1))
371                 RETURN(-EPERM);
372
373         if (S_ISDIR(ma->ma_attr.la_mode) &&
374             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
375                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
376                 struct lu_ucred *uc = mdt_ucred(info);
377                 struct obd_export *exp = mdt_info_req(info)->rq_export;
378
379                 /* Only new clients can create remote dir( >= 2.4) and
380                  * striped dir(>= 2.6), old client will return -ENOTSUPP */
381                 if (!mdt_is_dne_client(exp))
382                         RETURN(-ENOTSUPP);
383
384                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
385                         if (!mdt_is_striped_client(exp))
386                                 RETURN(-ENOTSUPP);
387
388                         if (!mdt->mdt_enable_striped_dir)
389                                 RETURN(-EPERM);
390                 } else if (!mdt->mdt_enable_remote_dir) {
391                         RETURN(-EPERM);
392                 }
393
394                 if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
395                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
396                     mdt->mdt_enable_remote_dir_gid != -1)
397                         RETURN(-EPERM);
398         }
399
400         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
401
402         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
403         if (IS_ERR(parent))
404                 RETURN(PTR_ERR(parent));
405
406         if (!mdt_object_exists(parent))
407                 GOTO(put_parent, rc = -ENOENT);
408
409         lh = &info->mti_lh[MDT_LH_PARENT];
410         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
411         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
412         if (rc)
413                 GOTO(put_parent, rc);
414
415         if (!mdt_object_remote(parent)) {
416                 rc = mdt_version_get_check_save(info, parent, 0);
417                 if (rc)
418                         GOTO(unlock_parent, rc);
419         }
420
421         /*
422          * Check child name version during replay.
423          * During create replay a file may exist with same name.
424          */
425         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
426                                       &info->mti_tmp_fid1, 1);
427         if (rc == 0)
428                 GOTO(unlock_parent, rc = -EEXIST);
429
430         /* -ENOENT is expected here */
431         if (rc != -ENOENT)
432                 GOTO(unlock_parent, rc);
433
434         /* save version of file name for replay, it must be ENOENT here */
435         mdt_enoent_version_save(info, 1);
436
437         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
438         if (unlikely(IS_ERR(child)))
439                 GOTO(unlock_parent, rc = PTR_ERR(child));
440
441         ma->ma_need = MA_INODE;
442         ma->ma_valid = 0;
443
444         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
445                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
446
447         /* Version of child will be updated on disk. */
448         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
449         rc = mdt_version_get_check_save(info, child, 2);
450         if (rc)
451                 GOTO(put_child, rc);
452
453         /* Let lower layer know current lock mode. */
454         info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
455
456         /*
457          * Do not perform lookup sanity check. We know that name does
458          * not exist.
459          */
460         info->mti_spec.sp_cr_lookup = 0;
461         info->mti_spec.sp_feat = &dt_directory_features;
462
463         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
464                         mdt_object_child(child), &info->mti_spec, ma);
465         if (rc == 0)
466                 rc = mdt_attr_get_complex(info, child, ma);
467
468         if (rc < 0)
469                 GOTO(put_child, rc);
470
471         /*
472          * On DNE, we need to eliminate dependey between 'mkdir a' and
473          * 'mkdir a/b' if b is a striped directory, to achieve this, two
474          * things are done below:
475          * 1. save child and slaves lock.
476          * 2. if the child is a striped directory, relock parent so to
477          *    compare against with COS locks to ensure parent was
478          *    committed to disk.
479          */
480         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
481                 struct mdt_lock_handle *lhc;
482                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
483                 bool cos_incompat;
484
485                 rc = mdt_object_striped(info, child);
486                 if (rc < 0)
487                         GOTO(put_child, rc);
488
489                 cos_incompat = rc;
490                 if (cos_incompat) {
491                         if (!mdt_object_remote(parent)) {
492                                 mdt_object_unlock(info, parent, lh, 1);
493                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
494                                 rc = mdt_reint_object_lock(info, parent, lh,
495                                                            MDS_INODELOCK_UPDATE,
496                                                            true);
497                                 if (rc)
498                                         GOTO(put_child, rc);
499                         }
500                 }
501
502                 lhc = &info->mti_lh[MDT_LH_CHILD];
503                 mdt_lock_handle_init(lhc);
504                 mdt_lock_reg_init(lhc, LCK_PW);
505                 rc = mdt_reint_striped_lock(info, child, lhc,
506                                             MDS_INODELOCK_UPDATE, einfo,
507                                             cos_incompat);
508                 if (rc)
509                         GOTO(put_child, rc);
510
511                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
512         }
513
514         /* Return fid & attr to client. */
515         if (ma->ma_valid & MA_INODE)
516                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
517                                    mdt_object_fid(child));
518 put_child:
519         mdt_object_put(info->mti_env, child);
520 unlock_parent:
521         mdt_object_unlock(info, parent, lh, rc);
522 put_parent:
523         mdt_object_put(info->mti_env, parent);
524         RETURN(rc);
525 }
526
527 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
528                         struct md_attr *ma)
529 {
530         struct mdt_lock_handle  *lh;
531         int do_vbr = ma->ma_attr.la_valid &
532                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
533         __u64 lockpart = MDS_INODELOCK_UPDATE;
534         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
535         bool cos_incompat;
536         int rc;
537         ENTRY;
538
539         rc = mdt_object_striped(info, mo);
540         if (rc < 0)
541                 RETURN(rc);
542
543         cos_incompat = rc;
544
545         lh = &info->mti_lh[MDT_LH_PARENT];
546         mdt_lock_reg_init(lh, LCK_PW);
547
548         /* Even though the new MDT will grant PERM lock to the old
549          * client, but the old client will almost ignore that during
550          * So it needs to revoke both LOOKUP and PERM lock here, so
551          * both new and old client can cancel the dcache */
552         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
553                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
554
555         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
556                                     cos_incompat);
557         if (rc != 0)
558                 RETURN(rc);
559
560         /* all attrs are packed into mti_attr in unpack_setattr */
561         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
562                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
563
564         /* VBR: update version if attr changed are important for recovery */
565         if (do_vbr) {
566                 /* update on-disk version of changed object */
567                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
568                 rc = mdt_version_get_check_save(info, mo, 0);
569                 if (rc)
570                         GOTO(out_unlock, rc);
571         }
572
573         /* Ensure constant striping during chown(). See LU-2789. */
574         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
575                 mutex_lock(&mo->mot_lov_mutex);
576
577         /* all attrs are packed into mti_attr in unpack_setattr */
578         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
579
580         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
581                 mutex_unlock(&mo->mot_lov_mutex);
582
583         if (rc != 0)
584                 GOTO(out_unlock, rc);
585         mdt_dom_obj_lvb_update(info->mti_env, mo, false);
586         EXIT;
587 out_unlock:
588         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
589         return rc;
590 }
591
592 /**
593  * Check HSM flags and add HS_DIRTY flag if relevant.
594  *
595  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
596  * and is not RELEASED.
597  */
598 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
599                         struct md_attr *ma)
600 {
601         struct lu_ucred *uc = mdt_ucred(info);
602         cfs_cap_t cap_saved;
603         int rc;
604         ENTRY;
605
606         /* If the file was modified, add the dirty flag */
607         ma->ma_need = MA_HSM;
608         rc = mdt_attr_get_complex(info, mo, ma);
609         if (rc) {
610                 CERROR("file attribute read error for "DFID": %d.\n",
611                         PFID(mdt_object_fid(mo)), rc);
612                 RETURN(rc);
613         }
614
615         /* If an up2date copy exists in the backend, add dirty flag */
616         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
617             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
618                 ma->ma_hsm.mh_flags |= HS_DIRTY;
619
620                 /* Bump cap so that closes from non-owner writers can
621                  * set the HSM state to dirty. */
622                 cap_saved = uc->uc_cap;
623                 uc->uc_cap |= MD_CAP_TO_MASK(CFS_CAP_FOWNER);
624                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
625                 uc->uc_cap = cap_saved;
626                 if (rc)
627                         CERROR("file attribute change error for "DFID": %d\n",
628                                 PFID(mdt_object_fid(mo)), rc);
629         }
630
631         RETURN(rc);
632 }
633
634 static int mdt_reint_setattr(struct mdt_thread_info *info,
635                              struct mdt_lock_handle *lhc)
636 {
637         struct mdt_device *mdt = info->mti_mdt;
638         struct md_attr *ma = &info->mti_attr;
639         struct mdt_reint_record *rr = &info->mti_rr;
640         struct ptlrpc_request *req = mdt_info_req(info);
641         struct mdt_object *mo;
642         struct mdt_body *repbody;
643         int rc, rc2;
644         ENTRY;
645
646         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
647                   (unsigned int)ma->ma_attr.la_valid);
648
649         if (info->mti_dlm_req)
650                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
651
652         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
653         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
654         if (IS_ERR(mo))
655                 GOTO(out, rc = PTR_ERR(mo));
656
657         if (!mdt_object_exists(mo))
658                 GOTO(out_put, rc = -ENOENT);
659
660         if (mdt_object_remote(mo))
661                 GOTO(out_put, rc = -EREMOTE);
662
663         /* revoke lease lock if size is going to be changed */
664         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
665                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
666                      atomic_read(&mo->mot_lease_count) > 0)) {
667                 down_read(&mo->mot_open_sem);
668
669                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
670                         lhc = &info->mti_lh[MDT_LH_LOCAL];
671                         mdt_lock_reg_init(lhc, LCK_CW);
672
673                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
674                         if (rc != 0) {
675                                 up_read(&mo->mot_open_sem);
676                                 GOTO(out_put, rc);
677                         }
678
679                         /* revoke lease lock */
680                         mdt_object_unlock(info, mo, lhc, 1);
681                 }
682                 up_read(&mo->mot_open_sem);
683         }
684
685         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
686                 /* Check write access for the O_TRUNC case */
687                 if (mdt_write_read(mo) < 0)
688                         GOTO(out_put, rc = -ETXTBSY);
689
690                 /* LU-10286: compatibility check for FLR.
691                  * Please check the comment in mdt_finish_open() for details */
692                 if (!exp_connect_flr(info->mti_exp) ||
693                     !exp_connect_overstriping(info->mti_exp)) {
694                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
695                         if (rc < 0 && rc != -ENODATA)
696                                 GOTO(out_put, rc);
697
698                         if (!exp_connect_flr(info->mti_exp)) {
699                                 if (rc > 0 &&
700                                     mdt_lmm_is_flr(info->mti_big_lmm))
701                                         GOTO(out_put, rc = -EOPNOTSUPP);
702                         }
703
704                         if (!exp_connect_overstriping(info->mti_exp)) {
705                                 if (rc > 0 &&
706                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
707                                         GOTO(out_put, rc = -EOPNOTSUPP);
708                         }
709                 }
710
711                 /* For truncate, the file size sent from client
712                  * is believable, but the blocks are incorrect,
713                  * which makes the block size in LSOM attribute
714                  * inconsisent with the real block size.
715                  */
716                 rc = mdt_lsom_update(info, mo, true);
717                 if (rc)
718                         GOTO(out_put, rc);
719         }
720
721         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
722                 if (ma->ma_valid & MA_LOV)
723                         GOTO(out_put, rc = -EPROTO);
724
725                 /* MDT supports FMD for regular files due to Data-on-MDT */
726                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
727                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
728                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
729                                        req->rq_xid);
730
731                 rc = mdt_attr_set(info, mo, ma);
732                 if (rc)
733                         GOTO(out_put, rc);
734         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
735                    (ma->ma_valid & MA_INODE)) {
736                 struct lu_buf *buf = &info->mti_buf;
737                 struct lu_ucred *uc = mdt_ucred(info);
738                 struct mdt_lock_handle *lh;
739                 const char *name;
740                 __u64 lockpart = MDS_INODELOCK_XATTR;
741
742                 /* reject if either remote or striped dir is disabled */
743                 if (ma->ma_valid & MA_LMV) {
744                         if (!mdt->mdt_enable_remote_dir ||
745                             !mdt->mdt_enable_striped_dir)
746                                 GOTO(out_put, rc = -EPERM);
747
748                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
749                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
750                             mdt->mdt_enable_remote_dir_gid != -1)
751                                 GOTO(out_put, rc = -EPERM);
752                 }
753
754                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
755                         GOTO(out_put, rc = -ENOTDIR);
756
757                 if (ma->ma_attr.la_valid != 0)
758                         GOTO(out_put, rc = -EPROTO);
759
760                 if (ma->ma_valid & MA_LOV) {
761                         buf->lb_buf = ma->ma_lmm;
762                         buf->lb_len = ma->ma_lmm_size;
763                         name = XATTR_NAME_LOV;
764                 } else {
765                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
766
767                         buf->lb_buf = lmu;
768                         buf->lb_len = ma->ma_lmv_size;
769
770                         if (le32_to_cpu(lmu->lum_hash_type) &
771                             LMV_HASH_FLAG_SPACE) {
772                                 /*
773                                  * only allow setting "space" hash flag on
774                                  * plain directory.
775                                  */
776                                 rc = mdt_object_striped(info, mo);
777                                 if (rc)
778                                         GOTO(out_put,
779                                              rc = (rc == 1) ? -EPERM : rc);
780                         }
781
782                         name = XATTR_NAME_DEFAULT_LMV;
783                         /* force client to update dir default layout */
784                         lockpart |= MDS_INODELOCK_LOOKUP;
785                 }
786
787                 lh = &info->mti_lh[MDT_LH_PARENT];
788                 mdt_lock_reg_init(lh, LCK_PW);
789
790                 rc = mdt_object_lock(info, mo, lh, lockpart);
791                 if (rc != 0)
792                         GOTO(out_put, rc);
793
794                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
795                                   name, 0);
796
797                 mdt_object_unlock(info, mo, lh, rc);
798                 if (rc)
799                         GOTO(out_put, rc);
800         } else {
801                 GOTO(out_put, rc = -EPROTO);
802         }
803
804         /* If file data is modified, add the dirty flag */
805         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
806                 rc = mdt_add_dirty_flag(info, mo, ma);
807
808         ma->ma_need = MA_INODE;
809         ma->ma_valid = 0;
810         rc = mdt_attr_get_complex(info, mo, ma);
811         if (rc != 0)
812                 GOTO(out_put, rc);
813
814         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
815
816         EXIT;
817 out_put:
818         mdt_object_put(info->mti_env, mo);
819 out:
820         if (rc == 0)
821                 mdt_counter_incr(req, LPROC_MDT_SETATTR);
822
823         mdt_client_compatibility(info);
824         rc2 = mdt_fix_reply(info);
825         if (rc == 0)
826                 rc = rc2;
827         return rc;
828 }
829
830 static int mdt_reint_create(struct mdt_thread_info *info,
831                             struct mdt_lock_handle *lhc)
832 {
833         struct ptlrpc_request   *req = mdt_info_req(info);
834         int                     rc;
835         ENTRY;
836
837         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
838                 RETURN(err_serious(-ESTALE));
839
840         if (info->mti_dlm_req)
841                 ldlm_request_cancel(mdt_info_req(info),
842                                     info->mti_dlm_req, 0, LATF_SKIP);
843
844         if (!lu_name_is_valid(&info->mti_rr.rr_name))
845                 RETURN(-EPROTO);
846
847         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
848         case S_IFDIR:
849                 mdt_counter_incr(req, LPROC_MDT_MKDIR);
850                 break;
851         case S_IFREG:
852         case S_IFLNK:
853         case S_IFCHR:
854         case S_IFBLK:
855         case S_IFIFO:
856         case S_IFSOCK:
857                 /* Special file should stay on the same node as parent. */
858                 mdt_counter_incr(req, LPROC_MDT_MKNOD);
859                 break;
860         default:
861                 CERROR("%s: Unsupported mode %o\n",
862                        mdt_obd_name(info->mti_mdt),
863                        info->mti_attr.ma_attr.la_mode);
864                 RETURN(err_serious(-EOPNOTSUPP));
865         }
866
867         rc = mdt_create(info);
868         RETURN(rc);
869 }
870
871 /*
872  * VBR: save parent version in reply and child version getting by its name.
873  * Version of child is getting and checking during its lookup.
 *
 * Handle a REINT_UNLINK request: remove the name rr_name from the parent
 * directory rr_fid1 and unlink the child object that name refers to.
 *
 * Outline:
 *  - find the parent; a local parent gets its version checked and saved in
 *    slot 0, a remote parent sets cos_incompat;
 *  - take a PW parallel-directory lock for the name on the parent
 *    (MDS_INODELOCK_UPDATE) and look the child up by name (version slot 1);
 *  - if mdt_object_striped() reports a positive value for the child while
 *    cos_incompat was not set yet, everything taken so far is dropped and
 *    the function restarts from "relock" with cos_incompat set;
 *  - sp_rm_entry requests only remove the name entry: mdo_unlink() is
 *    called without a child object;
 *  - a child on another MDT is not unlinked here: its FID is packed into
 *    the reply body and -EREMOTE is returned;
 *  - otherwise the child is locked, mdo_unlink() is called under
 *    mot_lov_mutex and the RMDIR/UNLINK counter is bumped when the inode
 *    attributes are valid.
 *
 * \param[in] info      thread info with the unpacked reint record
 * \param[in] lhc       not used by this handler
 *
 * \retval 0 on success, error code otherwise
874  */
875 static int mdt_reint_unlink(struct mdt_thread_info *info,
876                             struct mdt_lock_handle *lhc)
877 {
878         struct mdt_reint_record *rr = &info->mti_rr;
879         struct ptlrpc_request *req = mdt_info_req(info);
880         struct md_attr *ma = &info->mti_attr;
881         struct lu_fid *child_fid = &info->mti_tmp_fid1;
882         struct mdt_object *mp;
883         struct mdt_object *mc;
884         struct mdt_lock_handle *parent_lh;
885         struct mdt_lock_handle *child_lh;
886         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
887         __u64 lock_ibits;
888         bool cos_incompat = false;
889         int no_name = 0;
890         int rc;
891
892         ENTRY;
893
894         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
895                   PNAME(&rr->rr_name));
896
897         if (info->mti_dlm_req)
898                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
899
900         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
901                 RETURN(err_serious(-ENOENT));
902
903         if (!fid_is_md_operative(rr->rr_fid1))
904                 RETURN(-EPERM);
905
906         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
907         if (IS_ERR(mp))
908                 RETURN(PTR_ERR(mp));
909
        /* the version check is done for a local parent only */
910         if (mdt_object_remote(mp)) {
911                 cos_incompat = true;
912         } else {
913                 rc = mdt_version_get_check_save(info, mp, 0);
914                 if (rc)
915                         GOTO(put_parent, rc);
916         }
917
        /* re-entered with cos_incompat set once the child is seen as striped */
918 relock:
919         parent_lh = &info->mti_lh[MDT_LH_PARENT];
920         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
921         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
922                                    cos_incompat);
923         if (rc != 0)
924                 GOTO(put_parent, rc);
925
926         /* lookup child object along with version checking */
927         fid_zero(child_fid);
928         rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1);
929         if (rc != 0) {
930                 /* Name might not be able to find during resend of
931                  * remote unlink, considering following case.
932                  * dir_A is a remote directory, the name entry of
933                  * dir_A is on MDT0, the directory is on MDT1,
934                  *
935                  * 1. client sends unlink req to MDT1.
936                  * 2. MDT1 sends name delete update to MDT0.
937                  * 3. name entry is being deleted in MDT0 synchronously.
938                  * 4. MDT1 is restarted.
939                  * 5. client resends unlink req to MDT1. So it can not
940                  *    find the name entry on MDT0 anymore.
941                  * In this case, MDT1 only needs to destroy the local
942                  * directory.
943                  * */
944                 if (mdt_object_remote(mp) && rc == -ENOENT &&
945                     !fid_is_zero(rr->rr_fid2) &&
946                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
                        /* fall back to the child FID sent by the client */
947                         no_name = 1;
948                         *child_fid = *rr->rr_fid2;
949                  } else {
950                         GOTO(unlock_parent, rc);
951                  }
952         }
953
954         if (!fid_is_md_operative(child_fid))
955                 GOTO(unlock_parent, rc = -EPERM);
956
957         /* We will lock the child regardless it is local or remote. No harm. */
958         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
959         if (IS_ERR(mc))
960                 GOTO(unlock_parent, rc = PTR_ERR(mc));
961
962         if (!cos_incompat) {
963                 rc = mdt_object_striped(info, mc);
964                 if (rc < 0)
965                         GOTO(put_child, rc);
966
                /*
                 * A non-zero (positive) result switches to cos_incompat
                 * locking: release the child and the parent lock, then
                 * take the parent lock again with the new setting.
                 */
967                 cos_incompat = rc;
968                 if (cos_incompat) {
969                         mdt_object_put(info->mti_env, mc);
970                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
971                         goto relock;
972                 }
973         }
974
975         child_lh = &info->mti_lh[MDT_LH_CHILD];
976         mdt_lock_reg_init(child_lh, LCK_EX);
        /* remove the name entry only; no child object is given to mdo_unlink() */
977         if (info->mti_spec.sp_rm_entry) {
978                 struct lu_ucred *uc  = mdt_ucred(info);
979
980                 if (!mdt_is_dne_client(req->rq_export))
981                         /* Return -ENOTSUPP for old client */
982                         GOTO(put_child, rc = -ENOTSUPP);
983
984                 if (!md_capable(uc, CFS_CAP_SYS_ADMIN))
985                         GOTO(put_child, rc = -EPERM);
986
987                 ma->ma_need = MA_INODE;
988                 ma->ma_valid = 0;
989                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
990                                 NULL, &rr->rr_name, ma, no_name);
991                 GOTO(put_child, rc);
992         }
993
994         if (mdt_object_remote(mc)) {
995                 struct mdt_body  *repbody;
996
997                 if (!fid_is_zero(rr->rr_fid2)) {
998                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
999                                mdt_obd_name(info->mti_mdt),
1000                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1001                         GOTO(put_child, rc = -ENOENT);
1002                 }
1003                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1004                        mdt_obd_name(info->mti_mdt),
1005                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1006
1007                 if (!mdt_is_dne_client(req->rq_export))
1008                         /* Return -ENOTSUPP for old client */
1009                         GOTO(put_child, rc = -ENOTSUPP);
1010
1011                 /* Revoke the LOOKUP lock of the remote object granted by
1012                  * this MDT. Since the unlink will happen on another MDT,
1013                  * it will release the LOOKUP lock right away. Then What
1014                  * would happen if another client try to grab the LOOKUP
1015                  * lock at the same time with unlink XXX */
                /*
                 * NOTE(review): the return value of mdt_object_lock() is
                 * not checked here - confirm that this is intended.
                 */
1016                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1017                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1018                 LASSERT(repbody != NULL);
                /* hand the child FID back in the reply along with -EREMOTE */
1019                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1020                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1021                 GOTO(unlock_child, rc = -EREMOTE);
1022         }
1023         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1024          * this now because a running HSM restore on the child (unlink
1025          * victim) will hold the layout lock. See LU-4002. */
1026         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1027         if (mdt_object_remote(mp)) {
1028                 /* Enqueue lookup lock from parent MDT */
1029                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1030                                             &child_lh->mlh_rreg_lh,
1031                                             child_lh->mlh_rreg_mode,
1032                                             MDS_INODELOCK_LOOKUP, false);
1033                 if (rc != ELDLM_OK)
1034                         GOTO(put_child, rc);
1035
                /* LOOKUP is already covered by the remote lock above */
1036                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1037         }
1038
1039         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1040                                     cos_incompat);
        /*
         * NOTE(review): with a remote parent the LOOKUP lock taken above
         * sits in child_lh->mlh_rreg_lh, and this error path jumps past
         * unlock_child - confirm the handle gets released elsewhere.
         */
1041         if (rc != 0)
1042                 GOTO(put_child, rc);
1043
1044         /*
1045          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1046          * whether need MA_LOV and MA_COOKIE.
1047          */
1048         ma->ma_need = MA_INODE;
1049         ma->ma_valid = 0;
1050
1051         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1052                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1053         /* save version when object is locked */
1054         mdt_version_get_save(info, mc, 1);
1055
1056         mutex_lock(&mc->mot_lov_mutex);
1057
1058         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1059                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1060
1061         mutex_unlock(&mc->mot_lov_mutex);
1062         if (rc != 0)
1063                 GOTO(unlock_child, rc);
1064
        /*
         * Refresh the attributes while the object is still alive; for a
         * dying object discard its DoM data when the check asks for it.
         */
1065         if (!lu_object_is_dying(&mc->mot_header)) {
1066                 rc = mdt_attr_get_complex(info, mc, ma);
1067                 if (rc)
1068                         GOTO(out_stat, rc);
1069         } else if (mdt_dom_check_for_discard(info, mc)) {
1070                 mdt_dom_discard_data(info, mc);
1071         }
1072         mdt_handle_last_unlink(info, mc, ma);
1073
        /* account the operation by file type, only if the mode is known */
1074 out_stat:
1075         if (ma->ma_valid & MA_INODE) {
1076                 switch (ma->ma_attr.la_mode & S_IFMT) {
1077                 case S_IFDIR:
1078                         mdt_counter_incr(req, LPROC_MDT_RMDIR);
1079                         break;
1080                 case S_IFREG:
1081                 case S_IFLNK:
1082                 case S_IFCHR:
1083                 case S_IFBLK:
1084                 case S_IFIFO:
1085                 case S_IFSOCK:
1086                         mdt_counter_incr(req, LPROC_MDT_UNLINK);
1087                         break;
1088                 default:
1089                         LASSERTF(0, "bad file type %o unlinking\n",
1090                                 ma->ma_attr.la_mode);
1091                 }
1092         }
1093
1094         EXIT;
1095
        /* cleanup runs in reverse order of acquisition */
1096 unlock_child:
1097         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1098 put_child:
1099         mdt_object_put(info->mti_env, mc);
1100 unlock_parent:
1101         mdt_object_unlock(info, mp, parent_lh, rc);
1102 put_parent:
1103         mdt_object_put(info->mti_env, mp);
1104         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1105         return rc;
1106 }
1107
1108 /*
1109  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1110  * name.
 *
 * Handle a REINT_LINK request: add the name rr_name in the directory
 * rr_fid2 (target parent) as a new link to the object rr_fid1 (source).
 *
 * Requests where both FIDs are equal, or where either FID is not
 * MD-operative, are rejected with -EPERM before any object is looked up.
 * The parent is locked first (PW parallel-directory lock for the name,
 * MDS_INODELOCK_UPDATE), then the source (EX, UPDATE | XATTR).  Outside
 * of replay the target name must not exist yet, otherwise -EEXIST is
 * returned.  The LINK counter is bumped only when mdo_link() succeeds.
 *
 * \param[in] info      thread info with the unpacked reint record
 * \param[in] lhc       not used by this handler
 *
 * \retval 0 on success, error code otherwise
1111  */
1112 static int mdt_reint_link(struct mdt_thread_info *info,
1113                           struct mdt_lock_handle *lhc)
1114 {
1115         struct mdt_reint_record *rr = &info->mti_rr;
1116         struct ptlrpc_request   *req = mdt_info_req(info);
1117         struct md_attr          *ma = &info->mti_attr;
1118         struct mdt_object       *ms;
1119         struct mdt_object       *mp;
1120         struct mdt_lock_handle  *lhs;
1121         struct mdt_lock_handle  *lhp;
1122         bool cos_incompat;
1123         int rc;
1124         ENTRY;
1125
1126         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1127                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1128
1129         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1130                 RETURN(err_serious(-ENOENT));
1131
1132         if (info->mti_dlm_req)
1133                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1134
1135         /* Invalid case so return error immediately instead of
1136          * processing it */
1137         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1138                 RETURN(-EPERM);
1139
1140         if (!fid_is_md_operative(rr->rr_fid1) ||
1141             !fid_is_md_operative(rr->rr_fid2))
1142                 RETURN(-EPERM);
1143
1144         /* step 1: find target parent dir */
1145         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1146         if (IS_ERR(mp))
1147                 RETURN(PTR_ERR(mp));
1148
1149         rc = mdt_version_get_check_save(info, mp, 0);
1150         if (rc)
1151                 GOTO(put_parent, rc);
1152
1153         /* step 2: find source */
1154         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1155         if (IS_ERR(ms))
1156                 GOTO(put_parent, rc = PTR_ERR(ms));
1157
1158         if (!mdt_object_exists(ms)) {
1159                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1160                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1161                 GOTO(put_source, rc = -ENOENT);
1162         }
1163
        /* set when either object is remote; used for both locks below */
1164         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1165
1166         lhp = &info->mti_lh[MDT_LH_PARENT];
1167         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1168         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1169                                    cos_incompat);
1170         if (rc != 0)
1171                 GOTO(put_source, rc);
1172
1173         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1174
1175         lhs = &info->mti_lh[MDT_LH_CHILD];
1176         mdt_lock_reg_init(lhs, LCK_EX);
1177         rc = mdt_reint_object_lock(info, ms, lhs,
1178                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1179                                    cos_incompat);
1180         if (rc != 0)
1181                 GOTO(unlock_parent, rc);
1182
1183         /* step 3: link it */
1184         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1185                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1186
1187         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1188         rc = mdt_version_get_check_save(info, ms, 1);
1189         if (rc)
1190                 GOTO(unlock_source, rc);
1191
1192         /** check target version by name during replay */
1193         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1194                                       &info->mti_tmp_fid1, 2);
        /* -ENOENT is the expected answer: the new name should not exist */
1195         if (rc != 0 && rc != -ENOENT)
1196                 GOTO(unlock_source, rc);
1197         /* save version of file name for replay, it must be ENOENT here */
1198         if (!req_is_replay(mdt_info_req(info))) {
1199                 if (rc != -ENOENT) {
1200                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1201                                PNAME(&rr->rr_name));
1202                         GOTO(unlock_source, rc = -EEXIST);
1203                 }
1204                 info->mti_ver[2] = ENOENT_VERSION;
1205                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1206         }
1207
1208         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1209                       mdt_object_child(ms), &rr->rr_name, ma);
1210
1211         if (rc == 0)
1212                 mdt_counter_incr(req, LPROC_MDT_LINK);
1213
1214         EXIT;
        /* cleanup runs in reverse order of acquisition */
1215 unlock_source:
1216         mdt_object_unlock(info, ms, lhs, rc);
1217 unlock_parent:
1218         mdt_object_unlock(info, mp, lhp, rc);
1219 put_source:
1220         mdt_object_put(info->mti_env, ms);
1221 put_parent:
1222         mdt_object_put(info->mti_env, mp);
1223         return rc;
1224 }
1225 /**
1226  * lock the part of the directory according to the hash of the name
1227  * (lh->mlh_pdo_hash) in parallel directory lock.
1228  */
1229 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1230                               struct mdt_lock_handle *lh,
1231                               struct mdt_object *obj, __u64 ibits,
1232                               bool cos_incompat)
1233 {
1234         struct ldlm_res_id *res = &info->mti_res_id;
1235         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1236         union ldlm_policy_data *policy = &info->mti_policy;
1237         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1238         int rc;
1239
1240         /*
1241          * Finish res_id initializing by name hash marking part of
1242          * directory which is taking modification.
1243          */
1244         LASSERT(lh->mlh_pdo_hash != 0);
1245         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1246         memset(policy, 0, sizeof(*policy));
1247         policy->l_inodebits.bits = ibits;
1248         if (cos_incompat &&
1249             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1250                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1251         /*
1252          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1253          * going to be sent to client. If it is - mdt_intent_policy() path will
1254          * fix it up and turn FL_LOCAL flag off.
1255          */
1256         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1257                           policy, res, dlmflags,
1258                           &info->mti_exp->exp_handle.h_cookie);
1259         return rc;
1260 }
1261
1262 /**
1263  * Get BFL lock for rename or migrate process.
1264  **/
1265 static int mdt_rename_lock(struct mdt_thread_info *info,
1266                            struct lustre_handle *lh)
1267 {
1268         int     rc;
1269         ENTRY;
1270
1271         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1272                 struct lu_fid *fid = &info->mti_tmp_fid1;
1273                 struct mdt_object *obj;
1274
1275                 /* XXX, right now, it has to use object API to
1276                  * enqueue lock cross MDT, so it will enqueue
1277                  * rename lock(with LUSTRE_BFL_FID) by root object */
1278                 lu_root_fid(fid);
1279                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1280                 if (IS_ERR(obj))
1281                         RETURN(PTR_ERR(obj));
1282
1283                 rc = mdt_remote_object_lock(info, obj,
1284                                             &LUSTRE_BFL_FID, lh,
1285                                             LCK_EX,
1286                                             MDS_INODELOCK_UPDATE, false);
1287                 mdt_object_put(info->mti_env, obj);
1288         } else {
1289                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1290                 union ldlm_policy_data *policy = &info->mti_policy;
1291                 struct ldlm_res_id *res_id = &info->mti_res_id;
1292                 __u64 flags = 0;
1293
1294                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1295                 memset(policy, 0, sizeof *policy);
1296                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1297                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1298                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1299                                             LDLM_IBITS, policy, LCK_EX, &flags,
1300                                             ldlm_blocking_ast,
1301                                             ldlm_completion_ast, NULL, NULL, 0,
1302                                             LVB_T_NONE,
1303                                             &info->mti_exp->exp_handle.h_cookie,
1304                                             lh);
1305                 RETURN(rc);
1306         }
1307         RETURN(rc);
1308 }
1309
/*
 * Drop the rename lock held in \a lh.
 *
 * The handle must be in use (asserted); the EX reference is dropped and
 * the lock is cancelled in the same call.
 */
1310 static void mdt_rename_unlock(struct lustre_handle *lh)
1311 {
1312         ENTRY;
1313         LASSERT(lustre_handle_is_used(lh));
1314         /* Cancel the single rename lock right away */
1315         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1316         EXIT;
1317 }
1318
1319 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1320                                                 const struct lu_fid *fid,
1321                                                 int idx)
1322 {
1323         struct mdt_object *dir;
1324         int rc;
1325
1326         ENTRY;
1327
1328         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1329         if (IS_ERR(dir))
1330                 RETURN(dir);
1331
1332         /* check early, the real version will be saved after locking */
1333         rc = mdt_version_get_check(info, dir, idx);
1334         if (rc)
1335                 GOTO(out_put, rc);
1336
1337         if (!mdt_object_exists(dir))
1338                 GOTO(out_put, rc = -ENOENT);
1339
1340         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1341                 GOTO(out_put, rc = -ENOTDIR);
1342
1343         RETURN(dir);
1344 out_put:
1345         mdt_object_put(info->mti_env, dir);
1346         return ERR_PTR(rc);
1347 }
1348
1349 /*
1350  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1351  * herein we don't really check it, just do revoke.
1352  */
1353 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1354                                   struct mdt_object *pobj,
1355                                   struct mdt_object *obj)
1356 {
1357         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1358         int rc;
1359
1360         mdt_lock_handle_init(lh);
1361         mdt_lock_reg_init(lh, LCK_EX);
1362
1363         if (mdt_object_remote(pobj)) {
1364                 /* don't bother to check if pobj and obj are on the same MDT. */
1365                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1366                                             &lh->mlh_rreg_lh, LCK_EX,
1367                                             MDS_INODELOCK_LOOKUP, false);
1368         } else if (mdt_object_remote(obj)) {
1369                 struct ldlm_res_id *res = &info->mti_res_id;
1370                 union ldlm_policy_data *policy = &info->mti_policy;
1371                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1372                                  LDLM_FL_COS_INCOMPAT;
1373
1374                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1375                 memset(policy, 0, sizeof(*policy));
1376                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1377                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1378                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1379                                   dlmflags, NULL);
1380         } else {
1381                 /* do nothing if both are local */
1382                 return 0;
1383         }
1384
1385         if (rc != ELDLM_OK)
1386                 return rc;
1387
1388         /*
1389          * TODO, currently we don't save this lock because there is no place to
1390          * hold this lock handle, but to avoid race we need to save this lock.
1391          */
1392         mdt_object_unlock(info, NULL, lh, 1);
1393
1394         return 0;
1395 }
1396
1397 /*
1398  * operation may takes locks of linkea, or directory stripes, group them in
1399  * different list.
 *
 * One entry describes a single locked object; entries are chained on a
 * list and released together by mdt_unlock_list().
1400  */
1401 struct mdt_sub_lock {
        /* object covered by msl_lh */
1402         struct mdt_object      *msl_obj;
        /* lock handle for msl_obj */
1403         struct mdt_lock_handle  msl_lh;
        /* linkage into the list this entry is queued on */
1404         struct list_head        msl_linkage;
1405 };
1406
1407 static void mdt_unlock_list(struct mdt_thread_info *info,
1408                             struct list_head *list, int decref)
1409 {
1410         struct mdt_sub_lock *msl;
1411         struct mdt_sub_lock *tmp;
1412
1413         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1414                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1415                 list_del(&msl->msl_linkage);
1416                 OBD_FREE_PTR(msl);
1417         }
1418 }
1419
/**
 * Unlock an object that was locked for migration.
 *
 * A local object is released with mdt_reint_striped_unlock().  For a remote
 * object the locks queued on \a slave_locks are released first, then the
 * lock on the object itself.
 *
 * \param[in] info              thread info
 * \param[in] obj               object to unlock
 * \param[in] lh                lock handle of \a obj
 * \param[in] einfo             enqueue info, used for a local object only
 * \param[in] slave_locks       list of sub-locks, used for a remote object
 *                              only
 * \param[in] decref            passed through to the unlock helpers
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                                             struct mdt_object *obj,
                                             struct mdt_lock_handle *lh,
                                             struct ldlm_enqueue_info *einfo,
                                             struct list_head *slave_locks,
                                             int decref)
{
        if (!mdt_object_remote(obj)) {
                mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
                return;
        }

        mdt_unlock_list(info, slave_locks, decref);
        mdt_object_unlock(info, obj, lh, decref);
}
1434
1435 /*
1436  * lock parents of links, and also check whether total locks don't exceed
1437  * RS_MAX_LOCKS.
1438  *
1439  * \retval      0 on success, locks can be saved in ptlrpc_reply_state
1440  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1441  * \retval      -ve negative errno upon error
1442  */
1443 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1444                                  struct mdt_object *pobj,
1445                                  const struct md_attr *ma,
1446                                  struct mdt_object *obj,
1447                                  struct mdt_lock_handle *lhp,
1448                                  struct ldlm_enqueue_info *peinfo,
1449                                  struct list_head *parent_slave_locks,
1450                                  struct list_head *link_locks)
1451 {
1452         struct mdt_device *mdt = info->mti_mdt;
1453         struct lu_buf *buf = &info->mti_big_buf;
1454         struct lu_name *lname = &info->mti_name;
1455         struct linkea_data ldata = { NULL };
1456         bool blocked = false;
1457         int local_lnkp_cnt = 0;
1458         int rc;
1459
1460         ENTRY;
1461
1462         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1463                 RETURN(0);
1464
1465         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1466         if (buf->lb_buf == NULL)
1467                 RETURN(-ENOMEM);
1468
1469         ldata.ld_buf = buf;
1470         rc = mdt_links_read(info, obj, &ldata);
1471         if (rc) {
1472                 if (rc == -ENOENT || rc == -ENODATA)
1473                         rc = 0;
1474                 RETURN(rc);
1475         }
1476
1477         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1478              linkea_next_entry(&ldata)) {
1479                 struct mdt_object *lnkp;
1480                 struct mdt_sub_lock *msl;
1481                 struct lu_fid fid;
1482                 __u64 ibits;
1483
1484                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1485                                     &fid);
1486
1487                 /* check if it's also linked to parent */
1488                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1489                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1490                                PFID(&fid), PNAME(lname));
1491                         /* in case link is remote object, revoke LOOKUP lock */
1492                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1493                         continue;
1494                 }
1495
1496                 lnkp = NULL;
1497
1498                 /* check if it's linked to a stripe of parent */
1499                 if (ma->ma_valid & MA_LMV) {
1500                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1501                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1502                         int j = 0;
1503
1504                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1505                                 fid_le_to_cpu(stripe_fid,
1506                                               &lmv->lmv_stripe_fids[j]);
1507                                 if (lu_fid_eq(stripe_fid, &fid)) {
1508                                         CDEBUG(D_INFO, "skip stripe "DFID
1509                                                ", reovke "DNAME"\n",
1510                                                PFID(&fid), PNAME(lname));
1511                                         lnkp = mdt_object_find(info->mti_env,
1512                                                                mdt, &fid);
1513                                         if (IS_ERR(lnkp))
1514                                                 GOTO(out, rc = PTR_ERR(lnkp));
1515                                         break;
1516                                 }
1517                         }
1518
1519                         if (lnkp) {
1520                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1521                                                                    obj);
1522                                 mdt_object_put(info->mti_env, lnkp);
1523                                 continue;
1524                         }
1525                 }
1526
1527                 /* Check if it's already locked */
1528                 list_for_each_entry(msl, link_locks, msl_linkage) {
1529                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1530                                 CDEBUG(D_INFO,
1531                                        DFID" was locked, revoke "DNAME"\n",
1532                                        PFID(&fid), PNAME(lname));
1533                                 lnkp = msl->msl_obj;
1534                                 break;
1535                         }
1536                 }
1537
1538                 if (lnkp) {
1539                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1540                         continue;
1541                 }
1542
1543                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1544                        PFID(&fid), PNAME(lname));
1545
1546                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1547                 if (IS_ERR(lnkp)) {
1548                         CWARN("%s: cannot find obj "DFID": %ld\n",
1549                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1550                         continue;
1551                 }
1552
1553                 if (!mdt_object_exists(lnkp)) {
1554                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1555                               PFID(&fid), PNAME(lname));
1556                         mdt_object_put(info->mti_env, lnkp);
1557                         continue;
1558                 }
1559
1560                 if (!mdt_object_remote(lnkp))
1561                         local_lnkp_cnt++;
1562
1563                 OBD_ALLOC_PTR(msl);
1564                 if (msl == NULL)
1565                         GOTO(out, rc = -ENOMEM);
1566
1567                 /*
1568                  * we can't follow parent-child lock order like other MD
1569                  * operations, use lock_try here to avoid deadlock, if the lock
1570                  * cannot be taken, drop all locks taken, revoke the blocked
1571                  * one, and continue processing the remaining entries, and in
1572                  * the end of the loop restart from beginning.
1573                  */
1574                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1575                 ibits = 0;
1576                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1577                                          MDS_INODELOCK_UPDATE, true);
1578                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1579
1580                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1581                                PFID(&fid), PNAME(lname));
1582
1583                         mdt_unlock_list(info, link_locks, 1);
1584                         /* also unlock parent locks to avoid deadlock */
1585                         if (!blocked)
1586                                 mdt_migrate_object_unlock(info, pobj, lhp,
1587                                                           peinfo,
1588                                                           parent_slave_locks,
1589                                                           1);
1590
1591                         blocked = true;
1592
1593                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1594                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1595                                              MDS_INODELOCK_UPDATE);
1596                         if (rc) {
1597                                 mdt_object_put(info->mti_env, lnkp);
1598                                 OBD_FREE_PTR(msl);
1599                                 GOTO(out, rc);
1600                         }
1601
1602                         if (mdt_object_remote(lnkp)) {
1603                                 struct ldlm_lock *lock;
1604
1605                                 /*
1606                                  * for remote object, set lock cb_atomic,
1607                                  * so lock can be released in blocking_ast()
1608                                  * immediately, then the next lock_try will
1609                                  * have better chance of success.
1610                                  */
1611                                 lock = ldlm_handle2lock(
1612                                                 &msl->msl_lh.mlh_rreg_lh);
1613                                 LASSERT(lock != NULL);
1614                                 lock_res_and_lock(lock);
1615                                 ldlm_set_atomic_cb(lock);
1616                                 unlock_res_and_lock(lock);
1617                                 LDLM_LOCK_PUT(lock);
1618                         }
1619
1620                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1621                         OBD_FREE_PTR(msl);
1622                         continue;
1623                 }
1624
1625                 INIT_LIST_HEAD(&msl->msl_linkage);
1626                 msl->msl_obj = lnkp;
1627                 list_add_tail(&msl->msl_linkage, link_locks);
1628
1629                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1630         }
1631
1632         if (blocked)
1633                 GOTO(out, rc = -EBUSY);
1634
1635         EXIT;
1636 out:
1637         if (rc)
1638                 mdt_unlock_list(info, link_locks, rc);
1639         else if (local_lnkp_cnt > RS_MAX_LOCKS - 6)
1640                 /*
1641                  * parent may have 3 local objects: master object and 2 stripes
1642                  * (if it's being migrated too); source may have 2 local
1643                  * objects: master and 1 stripe; target has 1 local object.
1644                  */
1645                 rc = 1;
1646         return rc;
1647 }
1648
1649 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1650                                   struct mdt_object *obj,
1651                                   const struct md_attr *ma,
1652                                   struct list_head *slave_locks)
1653 {
1654         struct mdt_device *mdt = info->mti_mdt;
1655         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1656         struct lu_fid *fid = &info->mti_tmp_fid1;
1657         struct mdt_object *slave;
1658         struct mdt_sub_lock *msl;
1659         int i;
1660         int rc;
1661
1662         ENTRY;
1663
1664         LASSERT(mdt_object_remote(obj));
1665         LASSERT(ma->ma_valid & MA_LMV);
1666         LASSERT(lmv);
1667
1668         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
1669                 RETURN(-EINVAL);
1670
1671         if (le32_to_cpu(lmv->lmv_stripe_count) < 1)
1672                 RETURN(0);
1673
1674         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1675                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1676
1677                 if (!fid_is_sane(fid))
1678                         continue;
1679
1680                 slave = mdt_object_find(info->mti_env, mdt, fid);
1681                 if (IS_ERR(slave))
1682                         GOTO(out, rc = PTR_ERR(slave));
1683
1684                 OBD_ALLOC_PTR(msl);
1685                 if (!msl) {
1686                         mdt_object_put(info->mti_env, slave);
1687                         GOTO(out, rc = -ENOMEM);
1688                 }
1689
1690                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1691                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1692                                            MDS_INODELOCK_UPDATE, true);
1693                 if (rc) {
1694                         OBD_FREE_PTR(msl);
1695                         mdt_object_put(info->mti_env, slave);
1696                         GOTO(out, rc);
1697                 }
1698
1699                 INIT_LIST_HEAD(&msl->msl_linkage);
1700                 msl->msl_obj = slave;
1701                 list_add_tail(&msl->msl_linkage, slave_locks);
1702         }
1703         EXIT;
1704
1705 out:
1706         if (rc)
1707                 mdt_unlock_list(info, slave_locks, rc);
1708         return rc;
1709 }
1710
1711 /* lock parent and its stripes */
1712 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1713                                    struct mdt_object *obj,
1714                                    const struct md_attr *ma,
1715                                    struct mdt_lock_handle *lh,
1716                                    struct ldlm_enqueue_info *einfo,
1717                                    struct list_head *slave_locks)
1718 {
1719         int rc;
1720
1721         if (mdt_object_remote(obj)) {
1722                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1723                                             &lh->mlh_rreg_lh, LCK_PW,
1724                                             MDS_INODELOCK_UPDATE, false);
1725                 if (rc != ELDLM_OK)
1726                         return rc;
1727
1728                 /*
1729                  * if obj is remote and striped, lock its stripes explicitly
1730                  * because it's not striped in LOD layer on this MDT.
1731                  */
1732                 if (ma->ma_valid & MA_LMV) {
1733                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1734                         if (rc)
1735                                 mdt_object_unlock(info, obj, lh, rc);
1736                 }
1737         } else {
1738                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1739                                             einfo, true);
1740         }
1741
1742         return rc;
1743 }
1744
1745 /*
1746  * in migration, object may be remote, and we need take full lock of it and its
1747  * stripes if it's directory, besides, object may be a remote object on its
1748  * parent, revoke its LOOKUP lock on where its parent is located.
1749  */
1750 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1751                                    struct mdt_object *pobj,
1752                                    struct mdt_object *obj,
1753                                    struct mdt_lock_handle *lh,
1754                                    struct ldlm_enqueue_info *einfo,
1755                                    struct list_head *slave_locks)
1756 {
1757         int rc;
1758
1759         if (mdt_object_remote(obj)) {
1760                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1761                 if (rc)
1762                         return rc;
1763
1764                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1765                                             &lh->mlh_rreg_lh, LCK_EX,
1766                                             MDS_INODELOCK_FULL, false);
1767                 if (rc != ELDLM_OK)
1768                         return rc;
1769
1770                 /*
1771                  * if obj is remote and striped, lock its stripes explicitly
1772                  * because it's not striped in LOD layer on this MDT.
1773                  */
1774                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1775                         struct md_attr *ma = &info->mti_attr;
1776
1777                         ma->ma_lmv = info->mti_big_lmm;
1778                         ma->ma_lmv_size = info->mti_big_lmmsize;
1779                         ma->ma_valid = 0;
1780                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1781                         if (rc) {
1782                                 mdt_object_unlock(info, obj, lh, rc);
1783                                 return rc;
1784                         }
1785
1786                         if (ma->ma_valid & MA_LMV) {
1787                                 rc = mdt_lock_remote_slaves(info, obj, ma,
1788                                                             slave_locks);
1789                                 if (rc)
1790                                         mdt_object_unlock(info, obj, lh, rc);
1791                         }
1792                 }
1793         } else {
1794                 if (mdt_object_remote(pobj)) {
1795                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1796                         if (rc)
1797                                 return rc;
1798                 }
1799
1800                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
1801                                             einfo, true);
1802         }
1803
1804         return rc;
1805 }
1806
1807 /*
1808  * lookup source by name, if parent is striped directory, we need to find the
1809  * corresponding stripe where source is located, and then lookup there.
1810  *
1811  * besides, if parent is migrating too, and file is already in target stripe,
1812  * this should be a redo of 'lfs migrate' on client side.
1813  */
1814 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1815                               struct mdt_object *pobj,
1816                               const struct md_attr *ma,
1817                               const struct lu_name *lname,
1818                               struct mdt_object **spobj,
1819                               struct mdt_object **sobj)
1820 {
1821         const struct lu_env *env = info->mti_env;
1822         struct lu_fid *fid = &info->mti_tmp_fid1;
1823         struct mdt_object *stripe;
1824         int rc;
1825
1826         if (ma->ma_valid & MA_LMV) {
1827                 /* if parent is striped, lookup on corresponding stripe */
1828                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1829                 __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
1830                 __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1831                 bool is_migrating = le32_to_cpu(lmv->lmv_hash_type) &
1832                                     LMV_HASH_FLAG_MIGRATION;
1833
1834                 if (is_migrating) {
1835                         hash_type = le32_to_cpu(lmv->lmv_migrate_hash);
1836                         stripe_count -= le32_to_cpu(lmv->lmv_migrate_offset);
1837                 }
1838
1839                 rc = lmv_name_to_stripe_index(hash_type, stripe_count,
1840                                               lname->ln_name,
1841                                               lname->ln_namelen);
1842                 if (rc < 0)
1843                         return rc;
1844
1845                 if (le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1846                         rc += le32_to_cpu(lmv->lmv_migrate_offset);
1847
1848                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
1849
1850                 stripe = mdt_object_find(env, info->mti_mdt, fid);
1851                 if (IS_ERR(stripe))
1852                         return PTR_ERR(stripe);
1853
1854                 fid_zero(fid);
1855                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
1856                                 &info->mti_spec);
1857                 if (rc == -ENOENT && is_migrating) {
1858                         /*
1859                          * if parent is migrating, and lookup child failed on
1860                          * source stripe, lookup again on target stripe, if it
1861                          * exists, it means previous migration was interrupted,
1862                          * and current file was migrated already.
1863                          */
1864                         mdt_object_put(env, stripe);
1865
1866                         hash_type = le32_to_cpu(lmv->lmv_hash_type);
1867                         stripe_count = le32_to_cpu(lmv->lmv_migrate_offset);
1868
1869                         rc = lmv_name_to_stripe_index(hash_type, stripe_count,
1870                                                       lname->ln_name,
1871                                                       lname->ln_namelen);
1872                         if (rc < 0)
1873                                 return rc;
1874
1875                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
1876
1877                         stripe = mdt_object_find(env, info->mti_mdt, fid);
1878                         if (IS_ERR(stripe))
1879                                 return PTR_ERR(stripe);
1880
1881                         fid_zero(fid);
1882                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
1883                                         fid, &info->mti_spec);
1884                         mdt_object_put(env, stripe);
1885                         return rc ?: -EALREADY;
1886                 } else if (rc) {
1887                         mdt_object_put(env, stripe);
1888                         return rc;
1889                 }
1890         } else {
1891                 fid_zero(fid);
1892                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
1893                                 &info->mti_spec);
1894                 if (rc)
1895                         return rc;
1896
1897                 stripe = pobj;
1898                 mdt_object_get(env, stripe);
1899         }
1900
1901         *spobj = stripe;
1902
1903         *sobj = mdt_object_find(env, info->mti_mdt, fid);
1904         if (IS_ERR(*sobj)) {
1905                 mdt_object_put(env, stripe);
1906                 rc = PTR_ERR(*sobj);
1907                 *spobj = NULL;
1908                 *sobj = NULL;
1909         }
1910
1911         return rc;
1912 }
1913
1914 /* end lease and close file for regular file */
1915 static int mdd_migrate_close(struct mdt_thread_info *info,
1916                              struct mdt_object *obj)
1917 {
1918         struct close_data *data;
1919         struct mdt_body *repbody;
1920         struct ldlm_lock *lease;
1921         int rc;
1922         int rc2;
1923
1924         rc = -EPROTO;
1925         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
1926                                       RCL_CLIENT) ||
1927             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
1928                                       RCL_CLIENT))
1929                 goto close;
1930
1931         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
1932         if (!data)
1933                 goto close;
1934
1935         rc = -ESTALE;
1936         lease = ldlm_handle2lock(&data->cd_handle);
1937         if (!lease)
1938                 goto close;
1939
1940         /* check if the lease was already canceled */
1941         lock_res_and_lock(lease);
1942         rc = ldlm_is_cancel(lease);
1943         unlock_res_and_lock(lease);
1944
1945         if (rc) {
1946                 rc = -EAGAIN;
1947                 LDLM_DEBUG(lease, DFID" lease broken",
1948                            PFID(mdt_object_fid(obj)));
1949         }
1950
1951         /*
1952          * cancel server side lease, client side counterpart should have been
1953          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
1954          */
1955         ldlm_lock_cancel(lease);
1956         ldlm_reprocess_all(lease->l_resource);
1957         LDLM_LOCK_PUT(lease);
1958
1959 close:
1960         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
1961         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1962         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
1963
1964         return rc ?: rc2;
1965 }
1966
1967 /*
1968  * migrate file in below steps:
1969  *  1. lock parent and its stripes
1970  *  2. lookup source by name
1971  *  3. lock parents of source links if source is not directory
1972  *  4. reject if source is in HSM
1973  *  5. take source open_sem and close file if source is regular file
1974  *  6. lock source and its stripes if it's directory
1975  *  7. lock target so subsequent change to it can trigger COS
1976  *  8. migrate file
1977  *  9. unlock above locks
1978  * 10. sync device if source has links
1979  */
1980 static int mdt_reint_migrate(struct mdt_thread_info *info,
1981                              struct mdt_lock_handle *unused)
1982 {
1983         const struct lu_env *env = info->mti_env;
1984         struct mdt_device *mdt = info->mti_mdt;
1985         struct ptlrpc_request *req = mdt_info_req(info);
1986         struct mdt_reint_record *rr = &info->mti_rr;
1987         struct lu_ucred *uc = mdt_ucred(info);
1988         struct md_attr *ma = &info->mti_attr;
1989         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
1990         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
1991         struct mdt_object *pobj;
1992         struct mdt_object *spobj = NULL;
1993         struct mdt_object *sobj = NULL;
1994         struct mdt_object *tobj;
1995         struct lustre_handle rename_lh = { 0 };
1996         struct mdt_lock_handle *lhp;
1997         struct mdt_lock_handle *lhs;
1998         struct mdt_lock_handle *lht;
1999         LIST_HEAD(parent_slave_locks);
2000         LIST_HEAD(child_slave_locks);
2001         LIST_HEAD(link_locks);
2002         int lock_retries = 5;
2003         bool open_sem_locked = false;
2004         bool do_sync = false;
2005         int rc;
2006         ENTRY;
2007
2008         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2009                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2010
2011         if (info->mti_dlm_req)
2012                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2013
2014         if (!fid_is_md_operative(rr->rr_fid1) ||
2015             !fid_is_md_operative(rr->rr_fid2))
2016                 RETURN(-EPERM);
2017
2018         /* don't allow migrate . or .. */
2019         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2020                 RETURN(-EBUSY);
2021
2022         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2023                 RETURN(-EPERM);
2024
2025         if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
2026             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2027             mdt->mdt_enable_remote_dir_gid != -1)
2028                 RETURN(-EPERM);
2029
2030         /*
2031          * Note: do not enqueue rename lock for replay request, because
2032          * if other MDT holds rename lock, but being blocked to wait for
2033          * this MDT to finish its recovery, and the failover MDT can not
2034          * get rename lock, which will cause deadlock.
2035          */
2036         if (!req_is_replay(req)) {
2037                 rc = mdt_rename_lock(info, &rename_lh);
2038                 if (rc != 0) {
2039                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2040                                mdt_obd_name(info->mti_mdt), rc);
2041                         RETURN(rc);
2042                 }
2043         }
2044
2045         /* pobj is master object of parent */
2046         pobj = mdt_parent_find_check(info, rr->rr_fid1, 0);
2047         if (IS_ERR(pobj))
2048                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2049
2050         if (unlikely(!info->mti_big_lmm)) {
2051                 info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
2052                 OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
2053                 if (!info->mti_big_lmm)
2054                         GOTO(put_parent, rc = -ENOMEM);
2055         }
2056
2057         ma->ma_lmv = info->mti_big_lmm;
2058         ma->ma_lmv_size = info->mti_big_lmmsize;
2059         ma->ma_valid = 0;
2060         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2061         if (rc)
2062                 GOTO(put_parent, rc);
2063
2064 lock_parent:
2065         /* lock parent object */
2066         lhp = &info->mti_lh[MDT_LH_PARENT];
2067         mdt_lock_reg_init(lhp, LCK_PW);
2068         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2069                                      &parent_slave_locks);
2070         if (rc)
2071                 GOTO(put_parent, rc);
2072
2073         /*
2074          * spobj is the corresponding stripe against name if pobj is striped
2075          * directory, which is the real parent, and no need to lock, because
2076          * we've taken full lock of pobj.
2077          */
2078         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2079         if (rc)
2080                 GOTO(unlock_parent, rc);
2081
2082         /* lock parents of source links, and revoke LOOKUP lock of links */
2083         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2084                                    &parent_slave_locks, &link_locks);
2085         if (rc == -EBUSY && lock_retries-- > 0) {
2086                 mdt_object_put(env, sobj);
2087                 mdt_object_put(env, spobj);
2088                 goto lock_parent;
2089         }
2090
2091         if (rc < 0)
2092                 GOTO(put_source, rc);
2093
2094         /*
2095          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2096          * with one request, if total lock count exceeds this limit, we will
2097          * drop all locks after migration, and synchronous device in the end.
2098          */
2099         do_sync = rc;
2100
2101         /* TODO: DoM migration is not supported yet */
2102         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2103                 ma->ma_lmm = info->mti_big_lmm;
2104                 ma->ma_lmm_size = info->mti_big_lmmsize;
2105                 ma->ma_valid = 0;
2106                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2107                 if (rc)
2108                         GOTO(put_source, rc);
2109
2110                 if (ma->ma_valid & MA_LOV &&
2111                     mdt_lmm_dom_entry(ma->ma_lmm) != LMM_NO_DOM)
2112                         GOTO(put_source, rc = -EOPNOTSUPP);
2113         }
2114
2115         /* if migration HSM is allowed */
2116         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2117                 ma->ma_need = MA_HSM;
2118                 ma->ma_valid = 0;
2119                 rc = mdt_attr_get_complex(info, sobj, ma);
2120                 if (rc)
2121                         GOTO(unlock_links, rc);
2122
2123                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2124                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2125         }
2126
2127         /* end lease and close file for regular file */
2128         if (info->mti_spec.sp_migrate_close) {
2129                 /* try to hold open_sem so that nobody else can open the file */
2130                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2131                         /* close anyway */
2132                         mdd_migrate_close(info, sobj);
2133                         GOTO(unlock_links, rc = -EBUSY);
2134                 } else {
2135                         open_sem_locked = true;
2136                         rc = mdd_migrate_close(info, sobj);
2137                         if (rc)
2138                                 GOTO(unlock_open_sem, rc);
2139                 }
2140         }
2141
2142         /* lock source */
2143         lhs = &info->mti_lh[MDT_LH_OLD];
2144         mdt_lock_reg_init(lhs, LCK_EX);
2145         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2146                                      &child_slave_locks);
2147         if (rc)
2148                 GOTO(unlock_open_sem, rc);
2149
2150         /* lock target */
2151         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2152         if (IS_ERR(tobj))
2153                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2154
2155         lht = &info->mti_lh[MDT_LH_NEW];
2156         mdt_lock_reg_init(lht, LCK_EX);
2157         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2158         if (rc)
2159                 GOTO(put_target, rc);
2160
2161         /* Don't do lookup sanity check. We know name doesn't exist. */
2162         info->mti_spec.sp_cr_lookup = 0;
2163         info->mti_spec.sp_feat = &dt_directory_features;
2164
2165         rc = mdo_migrate(env, mdt_object_child(pobj),
2166                          mdt_object_child(sobj), &rr->rr_name,
2167                          mdt_object_child(tobj), &info->mti_spec, ma);
2168         EXIT;
2169
2170         mdt_object_unlock(info, tobj, lht, rc);
2171 put_target:
2172         mdt_object_put(env, tobj);
2173 unlock_source:
2174         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2175                                   &child_slave_locks, rc);
2176 unlock_open_sem:
2177         if (open_sem_locked)
2178                 up_write(&sobj->mot_open_sem);
2179 unlock_links:
2180         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2181 put_source:
2182         mdt_object_put(env, sobj);
2183         mdt_object_put(env, spobj);
2184 unlock_parent:
2185         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2186                                   &parent_slave_locks, rc);
2187 put_parent:
2188         mdt_object_put(env, pobj);
2189 unlock_rename:
2190         if (lustre_handle_is_used(&rename_lh))
2191                 mdt_rename_unlock(&rename_lh);
2192
2193         if (!rc && do_sync)
2194                 mdt_device_sync(env, mdt);
2195
2196         return rc;
2197 }
2198
2199 static int mdt_object_lock_save(struct mdt_thread_info *info,
2200                                 struct mdt_object *dir,
2201                                 struct mdt_lock_handle *lh,
2202                                 int idx, bool cos_incompat)
2203 {
2204         int rc;
2205
2206         /* we lock the target dir if it is local */
2207         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2208                                    cos_incompat);
2209         if (rc != 0)
2210                 return rc;
2211
2212         /* get and save correct version after locking */
2213         mdt_version_get_save(info, dir, idx);
2214         return 0;
2215 }
2216
2217 /*
2218  * determine lock order of sobj and tobj
2219  *
2220  * there are two situations we need to lock tobj before sobj:
2221  * 1. sobj is child of tobj
2222  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2223  *    larger than that of tobj
2224  *
2225  * \retval      1 lock tobj before sobj
2226  * \retval      0 lock sobj before tobj
2227  * \retval      -ev negative errno upon error
2228  */
2229 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2230                                            struct mdt_object *sobj,
2231                                            struct mdt_object *tobj)
2232 {
2233         struct md_attr *ma = &info->mti_attr;
2234         struct lu_fid *spfid = &info->mti_tmp_fid1;
2235         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2236         struct lmv_mds_md_v1 *lmv;
2237         __u32 sindex;
2238         __u32 tindex;
2239         int rc;
2240
2241         /* sobj and tobj are the same */
2242         if (sobj == tobj)
2243                 return 0;
2244
2245         if (fid_is_root(mdt_object_fid(sobj)))
2246                 return 0;
2247
2248         if (fid_is_root(mdt_object_fid(tobj)))
2249                 return 1;
2250
2251         /* check whether sobj is child of tobj */
2252         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2253                            mdt_object_fid(tobj));
2254         if (rc < 0)
2255                 return rc;
2256
2257         if (rc == 1)
2258                 return 1;
2259
2260         /* check whether sobj and tobj are children of the same parent */
2261         rc = mdt_attr_get_pfid(info, sobj, spfid);
2262         if (rc)
2263                 return rc;
2264
2265         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2266         if (rc)
2267                 return rc;
2268
2269         if (!lu_fid_eq(spfid, tpfid))
2270                 return 0;
2271
2272         /* check whether sobj and tobj are sibling stripes */
2273         ma->ma_need = MA_LMV;
2274         ma->ma_valid = 0;
2275         ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
2276         ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
2277         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2278         if (rc)
2279                 return rc;
2280
2281         if (!(ma->ma_valid & MA_LMV))
2282                 return 0;
2283
2284         lmv = &ma->ma_lmv->lmv_md_v1;
2285         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2286                 return 0;
2287         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2288
2289         ma->ma_valid = 0;
2290         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2291         if (rc)
2292                 return rc;
2293
2294         if (!(ma->ma_valid & MA_LMV))
2295                 return -ENODATA;
2296
2297         lmv = &ma->ma_lmv->lmv_md_v1;
2298         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2299                 return -EINVAL;
2300         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2301
2302         /* check stripe index of sobj and tobj */
2303         if (sindex == tindex)
2304                 return -EINVAL;
2305
2306         return sindex < tindex ? 0 : 1;
2307 }
2308
2309 /*
2310  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2311  * 2 - srcdir child; 3 - tgtdir child.
2312  * Update on disk version of srcdir child.
2313  */
2314 static int mdt_reint_rename(struct mdt_thread_info *info,
2315                             struct mdt_lock_handle *unused)
2316 {
2317         struct mdt_device *mdt = info->mti_mdt;
2318         struct mdt_reint_record *rr = &info->mti_rr;
2319         struct md_attr *ma = &info->mti_attr;
2320         struct ptlrpc_request *req = mdt_info_req(info);
2321         struct mdt_object *msrcdir = NULL;
2322         struct mdt_object *mtgtdir = NULL;
2323         struct mdt_object *mold;
2324         struct mdt_object *mnew = NULL;
2325         struct lustre_handle rename_lh = { 0 };
2326         struct mdt_lock_handle *lh_srcdirp;
2327         struct mdt_lock_handle *lh_tgtdirp;
2328         struct mdt_lock_handle *lh_oldp = NULL;
2329         struct mdt_lock_handle *lh_newp = NULL;
2330         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2331         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2332         __u64 lock_ibits;
2333         bool reverse = false, discard = false;
2334         bool cos_incompat;
2335         int rc;
2336         ENTRY;
2337
2338         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2339                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2340                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2341
2342         if (info->mti_dlm_req)
2343                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2344
2345         if (!fid_is_md_operative(rr->rr_fid1) ||
2346             !fid_is_md_operative(rr->rr_fid2))
2347                 RETURN(-EPERM);
2348
2349         /* find both parents. */
2350         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2351         if (IS_ERR(msrcdir))
2352                 RETURN(PTR_ERR(msrcdir));
2353
2354         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2355
2356         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2357                 mtgtdir = msrcdir;
2358                 mdt_object_get(info->mti_env, mtgtdir);
2359         } else {
2360                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2361                 if (IS_ERR(mtgtdir))
2362                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2363         }
2364
2365         /*
2366          * Note: do not enqueue rename lock for replay request, because
2367          * if other MDT holds rename lock, but being blocked to wait for
2368          * this MDT to finish its recovery, and the failover MDT can not
2369          * get rename lock, which will cause deadlock.
2370          */
2371         if (!req_is_replay(req)) {
2372                 /*
2373                  * Normally rename RPC is handled on the MDT with the target
2374                  * directory (if target exists, it's on the MDT with the
2375                  * target), if the source directory is remote, it's a hint that
2376                  * source is remote too (this may not be true, but it won't
2377                  * cause any issue), return -EXDEV early to avoid taking
2378                  * rename_lock.
2379                  */
2380                 if (!mdt->mdt_enable_remote_rename &&
2381                     mdt_object_remote(msrcdir))
2382                         GOTO(out_put_tgtdir, rc = -EXDEV);
2383
2384                 rc = mdt_rename_lock(info, &rename_lh);
2385                 if (rc != 0) {
2386                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2387                                mdt_obd_name(mdt), rc);
2388                         GOTO(out_put_tgtdir, rc);
2389                 }
2390         }
2391
2392         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2393         if (rc < 0)
2394                 GOTO(out_unlock_rename, rc);
2395
2396         reverse = rc;
2397
2398         /* source needs to be looked up after locking source parent, otherwise
2399          * this rename may race with unlink source, and cause rename hang, see
2400          * sanityn.sh 55b, so check parents first, if later we found source is
2401          * remote, relock parents. */
2402         cos_incompat = (mdt_object_remote(msrcdir) ||
2403                         mdt_object_remote(mtgtdir));
2404
2405         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2406
2407         /* lock parents in the proper order. */
2408         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2409         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2410
2411 relock:
2412         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2413         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2414
2415         if (reverse) {
2416                 rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2417                                           cos_incompat);
2418                 if (rc)
2419                         GOTO(out_unlock_rename, rc);
2420
2421                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2422
2423                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2424                                           cos_incompat);
2425                 if (rc != 0) {
2426                         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2427                         GOTO(out_unlock_rename, rc);
2428                 }
2429         } else {
2430                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2431                                           cos_incompat);
2432                 if (rc)
2433                         GOTO(out_unlock_rename, rc);
2434
2435                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2436
2437                 if (mtgtdir != msrcdir) {
2438                         rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2439                                                   cos_incompat);
2440                 } else if (!mdt_object_remote(mtgtdir) &&
2441                            lh_srcdirp->mlh_pdo_hash !=
2442                            lh_tgtdirp->mlh_pdo_hash) {
2443                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
2444                                                 MDS_INODELOCK_UPDATE,
2445                                                 cos_incompat);
2446                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2447                 }
2448                 if (rc != 0) {
2449                         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2450                         GOTO(out_unlock_rename, rc);
2451                 }
2452         }
2453
2454         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2455         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2456
2457         /* find mold object. */
2458         fid_zero(old_fid);
2459         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2460         if (rc != 0)
2461                 GOTO(out_unlock_parents, rc);
2462
2463         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2464                 GOTO(out_unlock_parents, rc = -EINVAL);
2465
2466         if (!fid_is_md_operative(old_fid))
2467                 GOTO(out_unlock_parents, rc = -EPERM);
2468
2469         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2470         if (IS_ERR(mold))
2471                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2472
2473         if (!mdt_object_exists(mold)) {
2474                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2475                                 &mold->mot_obj,
2476                                 "object does not exist");
2477                 GOTO(out_put_old, rc = -ENOENT);
2478         }
2479
2480         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2481                 GOTO(out_put_old, rc = -EXDEV);
2482
2483         /* Check if @mtgtdir is subdir of @mold, before locking child
2484          * to avoid reverse locking. */
2485         if (mtgtdir != msrcdir) {
2486                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2487                                    old_fid);
2488                 if (rc) {
2489                         if (rc == 1)
2490                                 rc = -EINVAL;
2491                         GOTO(out_put_old, rc);
2492                 }
2493         }
2494
2495         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2496         /* save version after locking */
2497         mdt_version_get_save(info, mold, 2);
2498
2499         if (!cos_incompat && mdt_object_remote(mold)) {
2500                 cos_incompat = true;
2501                 mdt_object_put(info->mti_env, mold);
2502                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2503                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2504                 goto relock;
2505         }
2506
2507         /* find mnew object:
2508          * mnew target object may not exist now
2509          * lookup with version checking */
2510         fid_zero(new_fid);
2511         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2512                                       3);
2513         if (rc == 0) {
2514                 /* the new_fid should have been filled at this moment */
2515                 if (lu_fid_eq(old_fid, new_fid))
2516                         GOTO(out_put_old, rc);
2517
2518                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2519                     lu_fid_eq(new_fid, rr->rr_fid2))
2520                         GOTO(out_put_old, rc = -EINVAL);
2521
2522                 if (!fid_is_md_operative(new_fid))
2523                         GOTO(out_put_old, rc = -EPERM);
2524
2525                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2526                 if (IS_ERR(mnew))
2527                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2528
2529                 if (!mdt_object_exists(mnew)) {
2530                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2531                                         &mnew->mot_obj,
2532                                         "object does not exist");
2533                         GOTO(out_put_new, rc = -ENOENT);
2534                 }
2535
2536                 if (mdt_object_remote(mnew)) {
2537                         struct mdt_body  *repbody;
2538
2539                         /* Always send rename req to the target child MDT */
2540                         repbody = req_capsule_server_get(info->mti_pill,
2541                                                          &RMF_MDT_BODY);
2542                         LASSERT(repbody != NULL);
2543                         repbody->mbo_fid1 = *new_fid;
2544                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2545                         GOTO(out_put_new, rc = -EXDEV);
2546                 }
2547                 /* Before locking the target dir, check we do not replace
2548                  * a dir with a non-dir, otherwise it may deadlock with
2549                  * link op which tries to create a link in this dir
2550                  * back to this non-dir. */
2551                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2552                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2553                         GOTO(out_put_new, rc = -EISDIR);
2554
2555                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2556                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2557                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2558                 if (mdt_object_remote(msrcdir)) {
2559                         /* Enqueue lookup lock from the parent MDT */
2560                         rc = mdt_remote_object_lock(info, msrcdir,
2561                                                     mdt_object_fid(mold),
2562                                                     &lh_oldp->mlh_rreg_lh,
2563                                                     lh_oldp->mlh_rreg_mode,
2564                                                     MDS_INODELOCK_LOOKUP,
2565                                                     false);
2566                         if (rc != ELDLM_OK)
2567                                 GOTO(out_put_new, rc);
2568
2569                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2570                 }
2571
2572                 rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
2573                                            cos_incompat);
2574                 if (rc != 0)
2575                         GOTO(out_unlock_old, rc);
2576
2577                 /* Check if @msrcdir is subdir of @mnew, before locking child
2578                  * to avoid reverse locking. */
2579                 if (mtgtdir != msrcdir) {
2580                         rc = mdo_is_subdir(info->mti_env,
2581                                            mdt_object_child(msrcdir), new_fid);
2582                         if (rc) {
2583                                 if (rc == 1)
2584                                         rc = -EINVAL;
2585                                 GOTO(out_unlock_old, rc);
2586                         }
2587                 }
2588
2589                 /* We used to acquire MDS_INODELOCK_FULL here but we
2590                  * can't do this now because a running HSM restore on
2591                  * the rename onto victim will hold the layout
2592                  * lock. See LU-4002. */
2593
2594                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2595                 mdt_lock_reg_init(lh_newp, LCK_EX);
2596                 rc = mdt_reint_object_lock(info, mnew, lh_newp,
2597                                            MDS_INODELOCK_LOOKUP |
2598                                            MDS_INODELOCK_UPDATE,
2599                                            cos_incompat);
2600                 if (rc != 0)
2601                         GOTO(out_unlock_old, rc);
2602
2603                 /* get and save version after locking */
2604                 mdt_version_get_save(info, mnew, 3);
2605         } else if (rc != -EREMOTE && rc != -ENOENT) {
2606                 GOTO(out_put_old, rc);
2607         } else {
2608                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2609                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2610                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2611                 if (mdt_object_remote(msrcdir)) {
2612                         /* Enqueue lookup lock from the parent MDT */
2613                         rc = mdt_remote_object_lock(info, msrcdir,
2614                                                     mdt_object_fid(mold),
2615                                                     &lh_oldp->mlh_rreg_lh,
2616                                                     lh_oldp->mlh_rreg_mode,
2617                                                     MDS_INODELOCK_LOOKUP,
2618                                                     false);
2619                         if (rc != ELDLM_OK)
2620                                 GOTO(out_put_old, rc);
2621
2622                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2623                 }
2624
2625                 rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
2626                                            cos_incompat);
2627                 if (rc != 0)
2628                         GOTO(out_unlock_old, rc);
2629
2630                 mdt_enoent_version_save(info, 3);
2631         }
2632
2633         /* step 5: rename it */
2634         mdt_reint_init_ma(info, ma);
2635
2636         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2637                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2638
2639         if (mnew != NULL)
2640                 mutex_lock(&mnew->mot_lov_mutex);
2641
2642         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2643                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2644                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2645                         &rr->rr_tgt_name, ma);
2646
2647         if (mnew != NULL)
2648                 mutex_unlock(&mnew->mot_lov_mutex);
2649
2650         /* handle last link of tgt object */
2651         if (rc == 0) {
2652                 mdt_counter_incr(req, LPROC_MDT_RENAME);
2653                 if (mnew) {
2654                         mdt_handle_last_unlink(info, mnew, ma);
2655                         discard = mdt_dom_check_for_discard(info, mnew);
2656                 }
2657                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2658                                          msrcdir, mtgtdir);
2659         }
2660
2661         EXIT;
2662         if (mnew != NULL)
2663                 mdt_object_unlock(info, mnew, lh_newp, rc);
2664 out_unlock_old:
2665         mdt_object_unlock(info, mold, lh_oldp, rc);
2666 out_put_new:
2667         if (mnew && !discard)
2668                 mdt_object_put(info->mti_env, mnew);
2669 out_put_old:
2670         mdt_object_put(info->mti_env, mold);
2671 out_unlock_parents:
2672         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2673         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2674 out_unlock_rename:
2675         if (lustre_handle_is_used(&rename_lh))
2676                 mdt_rename_unlock(&rename_lh);
2677 out_put_tgtdir:
2678         mdt_object_put(info->mti_env, mtgtdir);
2679 out_put_srcdir:
2680         mdt_object_put(info->mti_env, msrcdir);
2681
2682         /* The DoM discard can be done right in the place above where it is
2683          * assigned, meanwhile it is done here after rename unlock due to
2684          * compatibility with old clients, for them the discard blocks
2685          * the main thread until completion. Check LU-11359 for details.
2686          */
2687         if (discard) {
2688                 mdt_dom_discard_data(info, mnew);
2689                 mdt_object_put(info->mti_env, mnew);
2690         }
2691         return rc;
2692 }
2693
2694 static int mdt_reint_resync(struct mdt_thread_info *info,
2695                             struct mdt_lock_handle *lhc)
2696 {
2697         struct mdt_reint_record *rr = &info->mti_rr;
2698         struct ptlrpc_request   *req = mdt_info_req(info);
2699         struct md_attr          *ma = &info->mti_attr;
2700         struct mdt_object       *mo;
2701         struct ldlm_lock        *lease;
2702         struct mdt_body         *repbody;
2703         struct md_layout_change  layout = { .mlc_mirror_id = rr->rr_mirror_id };
2704         bool                     lease_broken;
2705         int                      rc, rc2;
2706         ENTRY;
2707
2708         DEBUG_REQ(D_INODE, req, DFID": FLR file resync\n", PFID(rr->rr_fid1));
2709
2710         if (info->mti_dlm_req)
2711                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2712
2713         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2714         if (IS_ERR(mo))
2715                 GOTO(out, rc = PTR_ERR(mo));
2716
2717         if (!mdt_object_exists(mo))
2718                 GOTO(out_obj, rc = -ENOENT);
2719
2720         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2721                 GOTO(out_obj, rc = -EINVAL);
2722
2723         if (mdt_object_remote(mo))
2724                 GOTO(out_obj, rc = -EREMOTE);
2725
2726         lease = ldlm_handle2lock(rr->rr_lease_handle);
2727         if (lease == NULL)
2728                 GOTO(out_obj, rc = -ESTALE);
2729
2730         /* It's really necessary to grab open_sem and check if the lease lock
2731          * has been lost. There would exist a concurrent writer coming in and
2732          * generating some dirty data in memory cache, the writeback would fail
2733          * after the layout version is increased by MDS_REINT_RESYNC RPC. */
2734         if (!down_write_trylock(&mo->mot_open_sem))
2735                 GOTO(out_put_lease, rc = -EBUSY);
2736
2737         lock_res_and_lock(lease);
2738         lease_broken = ldlm_is_cancel(lease);
2739         unlock_res_and_lock(lease);
2740         if (lease_broken)
2741                 GOTO(out_unlock, rc = -EBUSY);
2742
2743         /* the file has yet opened by anyone else after we took the lease. */
2744         layout.mlc_opc = MD_LAYOUT_RESYNC;
2745         rc = mdt_layout_change(info, mo, &layout);
2746         if (rc)
2747                 GOTO(out_unlock, rc);
2748
2749         ma->ma_need = MA_INODE;
2750         ma->ma_valid = 0;
2751         rc = mdt_attr_get_complex(info, mo, ma);
2752         if (rc != 0)
2753                 GOTO(out_unlock, rc);
2754
2755         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2756         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
2757
2758         EXIT;
2759 out_unlock:
2760         up_write(&mo->mot_open_sem);
2761 out_put_lease:
2762         LDLM_LOCK_PUT(lease);
2763 out_obj:
2764         mdt_object_put(info->mti_env, mo);
2765 out:
2766         mdt_client_compatibility(info);
2767         rc2 = mdt_fix_reply(info);
2768         if (rc == 0)
2769                 rc = rc2;
2770         return rc;
2771 }
2772
2773 struct mdt_reinter {
2774         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
2775         enum lprocfs_extra_opc mr_extra_opc;
2776 };
2777
2778 static const struct mdt_reinter mdt_reinters[] = {
2779         [REINT_SETATTR] = {
2780                 .mr_handler = &mdt_reint_setattr,
2781                 .mr_extra_opc = MDS_REINT_SETATTR,
2782         },
2783         [REINT_CREATE] = {
2784                 .mr_handler = &mdt_reint_create,
2785                 .mr_extra_opc = MDS_REINT_CREATE,
2786         },
2787         [REINT_LINK] = {
2788                 .mr_handler = &mdt_reint_link,
2789                 .mr_extra_opc = MDS_REINT_LINK,
2790         },
2791         [REINT_UNLINK] = {
2792                 .mr_handler = &mdt_reint_unlink,
2793                 .mr_extra_opc = MDS_REINT_UNLINK,
2794         },
2795         [REINT_RENAME] = {
2796                 .mr_handler = &mdt_reint_rename,
2797                 .mr_extra_opc = MDS_REINT_RENAME,
2798         },
2799         [REINT_OPEN] = {
2800                 .mr_handler = &mdt_reint_open,
2801                 .mr_extra_opc = MDS_REINT_OPEN,
2802         },
2803         [REINT_SETXATTR] = {
2804                 .mr_handler = &mdt_reint_setxattr,
2805                 .mr_extra_opc = MDS_REINT_SETXATTR,
2806         },
2807         [REINT_RMENTRY] = {
2808                 .mr_handler = &mdt_reint_unlink,
2809                 .mr_extra_opc = MDS_REINT_UNLINK,
2810         },
2811         [REINT_MIGRATE] = {
2812                 .mr_handler = &mdt_reint_migrate,
2813                 .mr_extra_opc = MDS_REINT_RENAME,
2814         },
2815         [REINT_RESYNC] = {
2816                 .mr_handler = &mdt_reint_resync,
2817                 .mr_extra_opc = MDS_REINT_RESYNC,
2818         },
2819 };
2820
2821 int mdt_reint_rec(struct mdt_thread_info *info,
2822                   struct mdt_lock_handle *lhc)
2823 {
2824         const struct mdt_reinter *mr;
2825         int rc;
2826         ENTRY;
2827
2828         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
2829                 RETURN(-EPROTO);
2830
2831         mr = &mdt_reinters[info->mti_rr.rr_opcode];
2832         if (mr->mr_handler == NULL)
2833                 RETURN(-EPROTO);
2834
2835         rc = (*mr->mr_handler)(info, lhc);
2836
2837         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
2838                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
2839
2840         RETURN(rc);
2841 }