Whamcloud - gitweb
LU-4423 lnet: fix deadloop in ksocknal_push
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdt/mdt_reint.c
37  *
38  * Lustre Metadata Target (mdt) reintegration routines
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Huang Hua <huanghua@clusterfs.com>
44  * Author: Yury Umanets <umka@clusterfs.com>
45  */
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lprocfs_status.h>
50 #include "mdt_internal.h"
51 #include <lustre_lmv.h>
52
53 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
54                                      struct md_attr *ma)
55 {
56         ma->ma_need = MA_INODE;
57         ma->ma_valid = 0;
58 }
59
60 /**
61  * Get version of object by fid.
62  *
63  * Return real version or ENOENT_VERSION if object doesn't exist
64  */
65 static void mdt_obj_version_get(struct mdt_thread_info *info,
66                                 struct mdt_object *o, __u64 *version)
67 {
68         LASSERT(o);
69         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
70             !fid_is_obf(mdt_object_fid(o)))
71                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
72         else
73                 *version = ENOENT_VERSION;
74         CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
75                PFID(mdt_object_fid(o)), *version);
76 }
77
78 /**
79  * Check version is correct.
80  *
81  * Should be called only during replay.
82  */
83 static int mdt_version_check(struct ptlrpc_request *req,
84                              __u64 version, int idx)
85 {
86         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
87         ENTRY;
88
89         if (!exp_connect_vbr(req->rq_export))
90                 RETURN(0);
91
92         LASSERT(req_is_replay(req));
93         /** VBR: version is checked always because costs nothing */
94         LASSERT(idx < PTLRPC_NUM_VERSIONS);
95         /** Sanity check for malformed buffers */
96         if (pre_ver == NULL) {
97                 CERROR("No versions in request buffer\n");
98                 spin_lock(&req->rq_export->exp_lock);
99                 req->rq_export->exp_vbr_failed = 1;
100                 spin_unlock(&req->rq_export->exp_lock);
101                 RETURN(-EOVERFLOW);
102         } else if (pre_ver[idx] != version) {
103                 CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
104                        pre_ver[idx], version);
105                 spin_lock(&req->rq_export->exp_lock);
106                 req->rq_export->exp_vbr_failed = 1;
107                 spin_unlock(&req->rq_export->exp_lock);
108                 RETURN(-EOVERFLOW);
109         }
110         RETURN(0);
111 }
112
113 /**
114  * Save pre-versions in reply.
115  */
116 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
117                              int idx)
118 {
119         __u64 *reply_ver;
120
121         if (!exp_connect_vbr(req->rq_export))
122                 return;
123
124         LASSERT(!req_is_replay(req));
125         LASSERT(req->rq_repmsg != NULL);
126         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
127         if (reply_ver)
128                 reply_ver[idx] = version;
129 }
130
131 /**
132  * Save enoent version, it is needed when it is obvious that object doesn't
133  * exist, e.g. child during create.
134  */
135 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
136 {
137         /* save version of file name for replay, it must be ENOENT here */
138         if (!req_is_replay(mdt_info_req(info))) {
139                 info->mti_ver[idx] = ENOENT_VERSION;
140                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
141         }
142 }
143
144 /**
145  * Get version from disk and save in reply buffer.
146  *
147  * Versions are saved in reply only during normal operations not replays.
148  */
149 void mdt_version_get_save(struct mdt_thread_info *info,
150                           struct mdt_object *mto, int idx)
151 {
152         /* don't save versions during replay */
153         if (!req_is_replay(mdt_info_req(info))) {
154                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
155                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
156         }
157 }
158
159 /**
160  * Get version from disk and check it, no save in reply.
161  */
162 int mdt_version_get_check(struct mdt_thread_info *info,
163                           struct mdt_object *mto, int idx)
164 {
165         /* only check versions during replay */
166         if (!req_is_replay(mdt_info_req(info)))
167                 return 0;
168
169         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
170         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
171 }
172
173 /**
174  * Get version from disk and check if recovery or just save.
175  */
176 int mdt_version_get_check_save(struct mdt_thread_info *info,
177                                struct mdt_object *mto, int idx)
178 {
179         int rc = 0;
180
181         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
182         if (req_is_replay(mdt_info_req(info)))
183                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
184                                        idx);
185         else
186                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
187         return rc;
188 }
189
190 /**
191  * Lookup with version checking.
192  *
193  * This checks version of 'name'. Many reint functions uses 'name' for child not
194  * FID, therefore we need to get object by name and check its version.
195  */
196 static int mdt_lookup_version_check(struct mdt_thread_info *info,
197                                     struct mdt_object *p,
198                                     const struct lu_name *lname,
199                                     struct lu_fid *fid, int idx)
200 {
201         int rc, vbrc;
202
203         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
204                         &info->mti_spec);
205         /* Check version only during replay */
206         if (!req_is_replay(mdt_info_req(info)))
207                 return rc;
208
209         info->mti_ver[idx] = ENOENT_VERSION;
210         if (rc == 0) {
211                 struct mdt_object *child;
212                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
213                 if (likely(!IS_ERR(child))) {
214                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
215                         mdt_object_put(info->mti_env, child);
216                 }
217         }
218         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
219         return vbrc ? vbrc : rc;
220
221 }
222
223 /**
224  * mdt_remote_permission: Check whether the remote operation is permitted,
225  *
226  * Before we implement async cross-MDT updates (DNE phase 2). There are a few
227  * limitations here:
228  *
229  * 1.Only sysadmin can create remote directory and striped directory and
230  *   migrate directory now, unless
231  *   lctl set_param mdt.*.enable_remote_dir_gid=allow_gid.
232  * 2.Remote directory can only be created on MDT0, unless
233  *   lctl set_param mdt.*.enable_remote_dir = 1
234  * 3.Only new clients can access remote dir( >= 2.4) and striped dir(>= 2.6),
235  *   old client will return -ENOTSUPP.
236  *
237  * XXX these check are only needed for remote synchronization, once async
238  * update is supported, these check will be removed.
239  *
240  * param[in]info:       execution environment.
241  * param[in]parent:     the directory of this operation.
242  * param[in]child:      the child of this operation.
243  *
244  * retval       = 0 remote operation is allowed.
245  *              < 0 remote operation is denied.
246  */
247 static int mdt_remote_permission(struct mdt_thread_info *info,
248                                  struct mdt_object *parent,
249                                  struct mdt_object *child)
250 {
251         struct mdt_device       *mdt = info->mti_mdt;
252         struct lu_ucred         *uc  = mdt_ucred(info);
253         struct md_op_spec       *spec = &info->mti_spec;
254         struct lu_attr          *attr = &info->mti_attr.ma_attr;
255         struct obd_export       *exp = mdt_info_req(info)->rq_export;
256
257         /* Only check create remote directory, striped directory and
258          * migration */
259         if (mdt_object_remote(parent) == 0 && mdt_object_remote(child) == 0 &&
260             !(S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL &&
261                                         spec->u.sp_ea.eadatalen != 0) &&
262             info->mti_rr.rr_opcode != REINT_MIGRATE)
263                 return 0;
264
265         if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
266                 if (uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
267                     mdt->mdt_enable_remote_dir_gid != -1)
268                         return -EPERM;
269         }
270
271         if (!mdt_is_dne_client(exp))
272                 return -ENOTSUPP;
273
274         if (S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL &&
275             spec->u.sp_ea.eadatalen != 0) {
276                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
277
278                 if (le32_to_cpu(lum->lum_stripe_count) > 1 &&
279                     !mdt_is_striped_client(exp))
280                         return -ENOTSUPP;
281         }
282
283         return 0;
284 }
285
286 /*
287  * VBR: we save three versions in reply:
288  * 0 - parent. Check that parent version is the same during replay.
289  * 1 - name. Version of 'name' if file exists with the same name or
290  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
291  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
292  * check.
293  */
294 static int mdt_md_create(struct mdt_thread_info *info)
295 {
296         struct mdt_device       *mdt = info->mti_mdt;
297         struct mdt_object       *parent;
298         struct mdt_object       *child;
299         struct mdt_lock_handle  *lh;
300         struct mdt_body         *repbody;
301         struct md_attr          *ma = &info->mti_attr;
302         struct mdt_reint_record *rr = &info->mti_rr;
303         int rc;
304         ENTRY;
305
306         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  ("DNAME"->"DFID") "
307                   "in "DFID,
308                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
309
310         if (!fid_is_md_operative(rr->rr_fid1))
311                 RETURN(-EPERM);
312
313         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
314
315         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
316         if (IS_ERR(parent))
317                 RETURN(PTR_ERR(parent));
318
319         if (!mdt_object_exists(parent))
320                 GOTO(put_parent, rc = -ENOENT);
321
322         lh = &info->mti_lh[MDT_LH_PARENT];
323         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
324         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
325         if (rc)
326                 GOTO(put_parent, rc);
327
328         if (!mdt_object_remote(parent)) {
329                 rc = mdt_version_get_check_save(info, parent, 0);
330                 if (rc)
331                         GOTO(unlock_parent, rc);
332         }
333
334         /*
335          * Check child name version during replay.
336          * During create replay a file may exist with same name.
337          */
338         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
339                                       &info->mti_tmp_fid1, 1);
340         if (rc == 0)
341                 GOTO(unlock_parent, rc = -EEXIST);
342
343         /* -ENOENT is expected here */
344         if (rc != -ENOENT)
345                 GOTO(unlock_parent, rc);
346
347         /* save version of file name for replay, it must be ENOENT here */
348         mdt_enoent_version_save(info, 1);
349
350         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
351         if (likely(!IS_ERR(child))) {
352                 struct md_object *next = mdt_object_child(parent);
353
354                 rc = mdt_remote_permission(info, parent, child);
355                 if (rc != 0)
356                         GOTO(out_put_child, rc);
357
358                 ma->ma_need = MA_INODE;
359                 ma->ma_valid = 0;
360
361                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
362                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
363
364                 /* Version of child will be updated on disk. */
365                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
366                 rc = mdt_version_get_check_save(info, child, 2);
367                 if (rc)
368                         GOTO(out_put_child, rc);
369
370                 /* Let lower layer know current lock mode. */
371                 info->mti_spec.sp_cr_mode =
372                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
373
374                 /*
375                  * Do not perform lookup sanity check. We know that name does
376                  * not exist.
377                  */
378                 info->mti_spec.sp_cr_lookup = 0;
379                 info->mti_spec.sp_feat = &dt_directory_features;
380
381                 rc = mdo_create(info->mti_env, next, &rr->rr_name,
382                                 mdt_object_child(child), &info->mti_spec, ma);
383                 if (rc == 0)
384                         rc = mdt_attr_get_complex(info, child, ma);
385
386                 if (rc == 0) {
387                         /* Return fid & attr to client. */
388                         if (ma->ma_valid & MA_INODE)
389                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
390                                                    mdt_object_fid(child));
391                 }
392 out_put_child:
393                 mdt_object_put(info->mti_env, child);
394         } else {
395                 rc = PTR_ERR(child);
396         }
397 unlock_parent:
398         mdt_object_unlock(info, parent, lh, rc);
399 put_parent:
400         mdt_object_put(info->mti_env, parent);
401         RETURN(rc);
402 }
403
404 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
405                              struct mdt_object *obj, __u64 ibits,
406                              struct mdt_lock_handle *s0_lh,
407                              struct mdt_object *s0_obj,
408                              struct ldlm_enqueue_info *einfo)
409 {
410         ldlm_policy_data_t      *policy = &mti->mti_policy;
411         int                     rc;
412         ENTRY;
413
414         if (!S_ISDIR(obj->mot_header.loh_attr))
415                 RETURN(0);
416
417         /* Unlock stripe 0 */
418         if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
419                 LASSERT(s0_obj != NULL);
420                 mdt_object_unlock_put(mti, s0_obj, s0_lh, 1);
421         }
422
423         memset(policy, 0, sizeof(*policy));
424         policy->l_inodebits.bits = ibits;
425
426         rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
427                               policy);
428         RETURN(rc);
429 }
430
431 /**
432  * Lock slave stripes if necessary, the lock handles of slave stripes
433  * will be stored in einfo->ei_cbdata.
434  **/
435 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
436                            ldlm_mode_t mode, __u64 ibits,
437                            struct mdt_lock_handle *s0_lh,
438                            struct mdt_object **s0_objp,
439                            struct ldlm_enqueue_info *einfo)
440 {
441         ldlm_policy_data_t      *policy = &mti->mti_policy;
442         struct lu_buf           *buf = &mti->mti_buf;
443         struct lmv_mds_md_v1    *lmv;
444         struct lu_fid           *fid = &mti->mti_tmp_fid1;
445         int rc;
446         ENTRY;
447
448         if (!S_ISDIR(obj->mot_header.loh_attr))
449                 RETURN(0);
450
451         buf->lb_buf = mti->mti_xattr_buf;
452         buf->lb_len = sizeof(mti->mti_xattr_buf);
453         rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
454                           XATTR_NAME_LMV);
455         if (rc == -ERANGE) {
456                 rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
457                 if (rc > 0) {
458                         buf->lb_buf = mti->mti_big_lmm;
459                         buf->lb_len = mti->mti_big_lmmsize;
460                 }
461         }
462
463         if (rc == -ENODATA || rc == -ENOENT)
464                 RETURN(0);
465
466         if (rc <= 0)
467                 RETURN(rc);
468
469         lmv = buf->lb_buf;
470         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
471                 RETURN(-EINVAL);
472
473         /* Sigh, 0_stripe and master object are different
474          * object, though they are in the same MDT, to avoid
475          * adding osd_object_lock here, so we will enqueue the
476          * stripe0 lock in MDT0 for now */
477         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
478         *s0_objp = mdt_object_find_lock(mti, fid, s0_lh, ibits);
479         if (IS_ERR(*s0_objp))
480                 RETURN(PTR_ERR(*s0_objp));
481
482         memset(einfo, 0, sizeof(*einfo));
483         einfo->ei_type = LDLM_IBITS;
484         einfo->ei_mode = mode;
485         einfo->ei_cb_bl = mdt_remote_blocking_ast;
486         einfo->ei_cb_cp = ldlm_completion_ast;
487         einfo->ei_enq_slave = 1;
488         memset(policy, 0, sizeof(*policy));
489         policy->l_inodebits.bits = ibits;
490
491         rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
492                             policy);
493         RETURN(rc);
494 }
495
496 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
497                         struct md_attr *ma)
498 {
499         struct mdt_lock_handle  *lh;
500         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
501         __u64 lockpart = MDS_INODELOCK_UPDATE;
502         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
503         struct mdt_lock_handle  *s0_lh;
504         struct mdt_object       *s0_obj = NULL;
505         int rc;
506         ENTRY;
507
508         lh = &info->mti_lh[MDT_LH_PARENT];
509         mdt_lock_reg_init(lh, LCK_PW);
510
511         /* Even though the new MDT will grant PERM lock to the old
512          * client, but the old client will almost ignore that during
513          * So it needs to revoke both LOOKUP and PERM lock here, so
514          * both new and old client can cancel the dcache */
515         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
516                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
517
518         rc = mdt_object_lock(info, mo, lh, lockpart);
519         if (rc != 0)
520                 RETURN(rc);
521
522         s0_lh = &info->mti_lh[MDT_LH_LOCAL];
523         mdt_lock_reg_init(s0_lh, LCK_PW);
524         rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo);
525         if (rc != 0)
526                 GOTO(out_unlock, rc);
527
528         /* all attrs are packed into mti_attr in unpack_setattr */
529         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
530                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
531
532         /* This is only for set ctime when rename's source is on remote MDS. */
533         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
534                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
535
536         /* VBR: update version if attr changed are important for recovery */
537         if (do_vbr) {
538                 /* update on-disk version of changed object */
539                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
540                 rc = mdt_version_get_check_save(info, mo, 0);
541                 if (rc)
542                         GOTO(out_unlock, rc);
543         }
544
545         /* Ensure constant striping during chown(). See LU-2789. */
546         if (ma->ma_attr.la_valid & (LA_UID|LA_GID))
547                 mutex_lock(&mo->mot_lov_mutex);
548
549         /* all attrs are packed into mti_attr in unpack_setattr */
550         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
551
552         if (ma->ma_attr.la_valid & (LA_UID|LA_GID))
553                 mutex_unlock(&mo->mot_lov_mutex);
554
555         if (rc != 0)
556                 GOTO(out_unlock, rc);
557
558         EXIT;
559 out_unlock:
560         mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo);
561         mdt_object_unlock(info, mo, lh, rc);
562         return rc;
563 }
564
565 /**
566  * Check HSM flags and add HS_DIRTY flag if relevant.
567  *
568  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
569  * and is not RELEASED.
570  */
571 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
572                         struct md_attr *ma)
573 {
574         int rc;
575         ENTRY;
576
577         /* If the file was modified, add the dirty flag */
578         ma->ma_need = MA_HSM;
579         rc = mdt_attr_get_complex(info, mo, ma);
580         if (rc) {
581                 CERROR("file attribute read error for "DFID": %d.\n",
582                         PFID(mdt_object_fid(mo)), rc);
583                 RETURN(rc);
584         }
585
586         /* If an up2date copy exists in the backend, add dirty flag */
587         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
588             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
589                 struct mdt_lock_handle  *lh = &info->mti_lh[MDT_LH_CHILD];
590
591                 ma->ma_hsm.mh_flags |= HS_DIRTY;
592
593                 mdt_lock_reg_init(lh, LCK_PW);
594                 rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR);
595                 if (rc != 0)
596                         RETURN(rc);
597
598                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
599                 if (rc)
600                         CERROR("file attribute change error for "DFID": %d\n",
601                                 PFID(mdt_object_fid(mo)), rc);
602                 mdt_object_unlock(info, mo, lh, rc);
603         }
604
605         RETURN(rc);
606 }
607
608 static int mdt_reint_setattr(struct mdt_thread_info *info,
609                              struct mdt_lock_handle *lhc)
610 {
611         struct md_attr          *ma = &info->mti_attr;
612         struct mdt_reint_record *rr = &info->mti_rr;
613         struct ptlrpc_request   *req = mdt_info_req(info);
614         struct mdt_object       *mo;
615         struct mdt_body         *repbody;
616         int                      rc, rc2;
617         ENTRY;
618
619         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
620                   (unsigned int)ma->ma_attr.la_valid);
621
622         if (info->mti_dlm_req)
623                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
624
625         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
626         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
627         if (IS_ERR(mo))
628                 GOTO(out, rc = PTR_ERR(mo));
629
630         if (!mdt_object_exists(mo))
631                 GOTO(out_put, rc = -ENOENT);
632
633         if (mdt_object_remote(mo))
634                 GOTO(out_put, rc = -EREMOTE);
635
636         if ((ma->ma_attr.la_valid & LA_SIZE) ||
637             (rr->rr_flags & MRF_OPEN_TRUNC)) {
638                 /* Check write access for the O_TRUNC case */
639                 if (mdt_write_read(mo) < 0)
640                         GOTO(out_put, rc = -ETXTBSY);
641         }
642
643         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
644                 if (ma->ma_valid & MA_LOV)
645                         GOTO(out_put, rc = -EPROTO);
646
647                 rc = mdt_attr_set(info, mo, ma);
648                 if (rc)
649                         GOTO(out_put, rc);
650         } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
651                 struct lu_buf *buf  = &info->mti_buf;
652
653                 if (ma->ma_attr.la_valid != 0)
654                         GOTO(out_put, rc = -EPROTO);
655
656                 buf->lb_buf = ma->ma_lmm;
657                 buf->lb_len = ma->ma_lmm_size;
658                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
659                                   buf, XATTR_NAME_LOV, 0);
660                 if (rc)
661                         GOTO(out_put, rc);
662         } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) {
663                 struct lu_buf *buf  = &info->mti_buf;
664                 struct mdt_lock_handle  *lh;
665
666                 if (ma->ma_attr.la_valid != 0)
667                         GOTO(out_put, rc = -EPROTO);
668
669                 lh = &info->mti_lh[MDT_LH_PARENT];
670                 mdt_lock_reg_init(lh, LCK_PW);
671
672                 rc = mdt_object_lock(info, mo, lh,
673                                      MDS_INODELOCK_XATTR);
674                 if (rc != 0)
675                         GOTO(out_put, rc);
676
677                 buf->lb_buf = ma->ma_lmv;
678                 buf->lb_len = ma->ma_lmv_size;
679                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
680                                   buf, XATTR_NAME_DEFAULT_LMV, 0);
681
682                 mdt_object_unlock(info, mo, lh, rc);
683                 if (rc)
684                         GOTO(out_put, rc);
685         } else {
686                 GOTO(out_put, rc = -EPROTO);
687         }
688
689         /* If file data is modified, add the dirty flag */
690         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
691                 rc = mdt_add_dirty_flag(info, mo, ma);
692
693         ma->ma_need = MA_INODE;
694         ma->ma_valid = 0;
695         rc = mdt_attr_get_complex(info, mo, ma);
696         if (rc != 0)
697                 GOTO(out_put, rc);
698
699         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
700
701         EXIT;
702 out_put:
703         mdt_object_put(info->mti_env, mo);
704 out:
705         if (rc == 0)
706                 mdt_counter_incr(req, LPROC_MDT_SETATTR);
707
708         mdt_client_compatibility(info);
709         rc2 = mdt_fix_reply(info);
710         if (rc == 0)
711                 rc = rc2;
712         return rc;
713 }
714
715 static int mdt_reint_create(struct mdt_thread_info *info,
716                             struct mdt_lock_handle *lhc)
717 {
718         struct ptlrpc_request   *req = mdt_info_req(info);
719         int                     rc;
720         ENTRY;
721
722         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
723                 RETURN(err_serious(-ESTALE));
724
725         if (info->mti_dlm_req)
726                 ldlm_request_cancel(mdt_info_req(info),
727                                     info->mti_dlm_req, 0, LATF_SKIP);
728
729         if (!lu_name_is_valid(&info->mti_rr.rr_name))
730                 RETURN(-EPROTO);
731
732         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
733         case S_IFDIR:
734                 mdt_counter_incr(req, LPROC_MDT_MKDIR);
735                 break;
736         case S_IFREG:
737         case S_IFLNK:
738         case S_IFCHR:
739         case S_IFBLK:
740         case S_IFIFO:
741         case S_IFSOCK:
742                 /* Special file should stay on the same node as parent. */
743                 mdt_counter_incr(req, LPROC_MDT_MKNOD);
744                 break;
745         default:
746                 CERROR("%s: Unsupported mode %o\n",
747                        mdt_obd_name(info->mti_mdt),
748                        info->mti_attr.ma_attr.la_mode);
749                 RETURN(err_serious(-EOPNOTSUPP));
750         }
751
752         rc = mdt_md_create(info);
753         RETURN(rc);
754 }
755
756 /*
757  * VBR: save parent version in reply and child version getting by its name.
758  * Version of child is getting and checking during its lookup. If
759  */
760 static int mdt_reint_unlink(struct mdt_thread_info *info,
761                             struct mdt_lock_handle *lhc)
762 {
763         struct mdt_reint_record *rr = &info->mti_rr;
764         struct ptlrpc_request   *req = mdt_info_req(info);
765         struct md_attr          *ma = &info->mti_attr;
766         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
767         struct mdt_object       *mp;
768         struct mdt_object       *mc;
769         struct mdt_lock_handle  *parent_lh;
770         struct mdt_lock_handle  *child_lh;
771         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
772         struct mdt_lock_handle  *s0_lh = NULL;
773         struct mdt_object       *s0_obj = NULL;
774         __u64                   lock_ibits;
775         int                     rc;
776         int                     no_name = 0;
777         ENTRY;
778
779         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
780                   PNAME(&rr->rr_name));
781
782         if (info->mti_dlm_req)
783                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
784
785         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
786                 RETURN(err_serious(-ENOENT));
787
788         if (!fid_is_md_operative(rr->rr_fid1))
789                 RETURN(-EPERM);
790
791         /*
792          * step 1: Found the parent.
793          */
794         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
795         if (IS_ERR(mp)) {
796                 rc = PTR_ERR(mp);
797                 GOTO(out, rc);
798         }
799
800         parent_lh = &info->mti_lh[MDT_LH_PARENT];
801         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
802         rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE);
803         if (rc != 0)
804                 GOTO(put_parent, rc);
805
806         if (!mdt_object_remote(mp)) {
807                 rc = mdt_version_get_check_save(info, mp, 0);
808                 if (rc)
809                         GOTO(unlock_parent, rc);
810         }
811
812         /* step 2: find & lock the child */
813         /* lookup child object along with version checking */
814         fid_zero(child_fid);
815         rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1);
816         if (rc != 0) {
817                 /* Name might not be able to find during resend of
818                  * remote unlink, considering following case.
819                  * dir_A is a remote directory, the name entry of
820                  * dir_A is on MDT0, the directory is on MDT1,
821                  *
822                  * 1. client sends unlink req to MDT1.
823                  * 2. MDT1 sends name delete update to MDT0.
824                  * 3. name entry is being deleted in MDT0 synchronously.
825                  * 4. MDT1 is restarted.
826                  * 5. client resends unlink req to MDT1. So it can not
827                  *    find the name entry on MDT0 anymore.
828                  * In this case, MDT1 only needs to destory the local
829                  * directory.
830                  * */
831                 if (mdt_object_remote(mp) && rc == -ENOENT &&
832                     !fid_is_zero(rr->rr_fid2) &&
833                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
834                         no_name = 1;
835                         *child_fid = *rr->rr_fid2;
836                  } else {
837                         GOTO(unlock_parent, rc);
838                  }
839         }
840
841         if (!fid_is_md_operative(child_fid))
842                 GOTO(unlock_parent, rc = -EPERM);
843
844         /* We will lock the child regardless it is local or remote. No harm. */
845         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
846         if (IS_ERR(mc))
847                 GOTO(unlock_parent, rc = PTR_ERR(mc));
848
849         child_lh = &info->mti_lh[MDT_LH_CHILD];
850         mdt_lock_reg_init(child_lh, LCK_EX);
851         if (mdt_object_remote(mc)) {
852                 struct mdt_body  *repbody;
853
854                 if (!fid_is_zero(rr->rr_fid2)) {
855                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
856                                mdt_obd_name(info->mti_mdt),
857                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
858                         GOTO(put_child, rc = -ENOENT);
859                 }
860                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
861                        mdt_obd_name(info->mti_mdt),
862                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
863
864                 if (!mdt_is_dne_client(req->rq_export))
865                         /* Return -ENOTSUPP for old client */
866                         GOTO(put_child, rc = -ENOTSUPP);
867
868                 if (info->mti_spec.sp_rm_entry) {
869                         struct lu_ucred *uc  = mdt_ucred(info);
870
871                         if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
872                                 CERROR("%s: unlink remote entry is only "
873                                        "permitted for administrator: rc = %d\n",
874                                         mdt_obd_name(info->mti_mdt),
875                                         -EPERM);
876                                 GOTO(put_child, rc = -EPERM);
877                         }
878
879                         ma->ma_need = MA_INODE;
880                         ma->ma_valid = 0;
881                         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
882                                         NULL, &rr->rr_name, ma, no_name);
883                         GOTO(put_child, rc);
884                 }
885                 /* Revoke the LOOKUP lock of the remote object granted by
886                  * this MDT. Since the unlink will happen on another MDT,
887                  * it will release the LOOKUP lock right away. Then What
888                  * would happen if another client try to grab the LOOKUP
889                  * lock at the same time with unlink XXX */
890                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
891                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
892                 LASSERT(repbody != NULL);
893                 repbody->mbo_fid1 = *mdt_object_fid(mc);
894                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
895                 GOTO(unlock_child, rc = -EREMOTE);
896         } else if (info->mti_spec.sp_rm_entry) {
897                 rc = -EPERM;
898                 CDEBUG(D_INFO, "%s: no rm_entry on local dir '"DNAME"': "
899                        "rc = %d\n",
900                        mdt_obd_name(info->mti_mdt), PNAME(&rr->rr_name), rc);
901                 GOTO(put_child, rc);
902         }
903
904         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
905          * this now because a running HSM restore on the child (unlink
906          * victim) will hold the layout lock. See LU-4002. */
907         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
908         if (mdt_object_remote(mp)) {
909                 /* Enqueue lookup lock from parent MDT */
910                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
911                                             &child_lh->mlh_rreg_lh,
912                                             child_lh->mlh_rreg_mode,
913                                             MDS_INODELOCK_LOOKUP);
914                 if (rc != ELDLM_OK)
915                         GOTO(put_child, rc);
916
917                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
918         }
919
920         rc = mdt_object_lock(info, mc, child_lh, lock_ibits);
921         if (rc != 0)
922                 GOTO(put_child, rc);
923         /*
924          * Now we can only make sure we need MA_INODE, in mdd layer, will check
925          * whether need MA_LOV and MA_COOKIE.
926          */
927         ma->ma_need = MA_INODE;
928         ma->ma_valid = 0;
929
930         s0_lh = &info->mti_lh[MDT_LH_LOCAL];
931         mdt_lock_reg_init(s0_lh, LCK_EX);
932         rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_lh,
933                              &s0_obj, einfo);
934         if (rc != 0)
935                 GOTO(unlock_child, rc);
936
937         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
938                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
939         /* save version when object is locked */
940         mdt_version_get_save(info, mc, 1);
941
942         mutex_lock(&mc->mot_lov_mutex);
943
944         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
945                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
946
947         mutex_unlock(&mc->mot_lov_mutex);
948
949         if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
950                 rc = mdt_attr_get_complex(info, mc, ma);
951         if (rc == 0)
952                 mdt_handle_last_unlink(info, mc, ma);
953
954         if (ma->ma_valid & MA_INODE) {
955                 switch (ma->ma_attr.la_mode & S_IFMT) {
956                 case S_IFDIR:
957                         mdt_counter_incr(req, LPROC_MDT_RMDIR);
958                         break;
959                 case S_IFREG:
960                 case S_IFLNK:
961                 case S_IFCHR:
962                 case S_IFBLK:
963                 case S_IFIFO:
964                 case S_IFSOCK:
965                         mdt_counter_incr(req, LPROC_MDT_UNLINK);
966                         break;
967                 default:
968                         LASSERTF(0, "bad file type %o unlinking\n",
969                                  ma->ma_attr.la_mode);
970                 }
971         }
972
973         EXIT;
974
975 unlock_child:
976         mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo);
977         mdt_object_unlock(info, mc, child_lh, rc);
978 put_child:
979         mdt_object_put(info->mti_env, mc);
980 unlock_parent:
981         mdt_object_unlock(info, mp, parent_lh, rc);
982 put_parent:
983         mdt_object_put(info->mti_env, mp);
984 out:
985         return rc;
986 }
987
988 /*
989  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
990  * name.
991  */
992 static int mdt_reint_link(struct mdt_thread_info *info,
993                           struct mdt_lock_handle *lhc)
994 {
995         struct mdt_reint_record *rr = &info->mti_rr;
996         struct ptlrpc_request   *req = mdt_info_req(info);
997         struct md_attr          *ma = &info->mti_attr;
998         struct mdt_object       *ms;
999         struct mdt_object       *mp;
1000         struct mdt_lock_handle  *lhs;
1001         struct mdt_lock_handle  *lhp;
1002         int rc;
1003         ENTRY;
1004
1005         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1006                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1007
1008         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1009                 RETURN(err_serious(-ENOENT));
1010
1011         if (info->mti_dlm_req)
1012                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1013
1014         /* Invalid case so return error immediately instead of
1015          * processing it */
1016         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1017                 RETURN(-EPERM);
1018
1019         if (!fid_is_md_operative(rr->rr_fid1) ||
1020             !fid_is_md_operative(rr->rr_fid2))
1021                 RETURN(-EPERM);
1022
1023         /* step 1: find & lock the target parent dir */
1024         lhp = &info->mti_lh[MDT_LH_PARENT];
1025         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1026         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
1027                                   MDS_INODELOCK_UPDATE);
1028         if (IS_ERR(mp))
1029                 RETURN(PTR_ERR(mp));
1030
1031         rc = mdt_version_get_check_save(info, mp, 0);
1032         if (rc)
1033                 GOTO(out_unlock_parent, rc);
1034
1035         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1036
1037         /* step 2: find & lock the source */
1038         lhs = &info->mti_lh[MDT_LH_CHILD];
1039         mdt_lock_reg_init(lhs, LCK_EX);
1040
1041         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1042         if (IS_ERR(ms))
1043                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
1044
1045         if (!mdt_object_exists(ms)) {
1046                 mdt_object_put(info->mti_env, ms);
1047                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1048                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1049                 GOTO(out_unlock_parent, rc = -ENOENT);
1050         }
1051
1052         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE |
1053                              MDS_INODELOCK_XATTR);
1054         if (rc != 0) {
1055                 mdt_object_put(info->mti_env, ms);
1056                 GOTO(out_unlock_parent, rc);
1057         }
1058
1059         /* step 3: link it */
1060         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1061                        OBD_FAIL_MDS_REINT_LINK_WRITE);
1062
1063         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1064         rc = mdt_version_get_check_save(info, ms, 1);
1065         if (rc)
1066                 GOTO(out_unlock_child, rc);
1067
1068         /** check target version by name during replay */
1069         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1070                                       &info->mti_tmp_fid1, 2);
1071         if (rc != 0 && rc != -ENOENT)
1072                 GOTO(out_unlock_child, rc);
1073         /* save version of file name for replay, it must be ENOENT here */
1074         if (!req_is_replay(mdt_info_req(info))) {
1075                 if (rc != -ENOENT) {
1076                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1077                                PNAME(&rr->rr_name));
1078                         GOTO(out_unlock_child, rc = -EEXIST);
1079                 }
1080                 info->mti_ver[2] = ENOENT_VERSION;
1081                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1082         }
1083
1084         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1085               mdt_object_child(ms), &rr->rr_name, ma);
1086
1087         if (rc == 0)
1088                 mdt_counter_incr(req, LPROC_MDT_LINK);
1089
1090         EXIT;
1091 out_unlock_child:
1092         mdt_object_unlock_put(info, ms, lhs, rc);
1093 out_unlock_parent:
1094         mdt_object_unlock_put(info, mp, lhp, rc);
1095         return rc;
1096 }
1097 /**
1098  * lock the part of the directory according to the hash of the name
1099  * (lh->mlh_pdo_hash) in parallel directory lock.
1100  */
1101 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1102                               struct mdt_lock_handle *lh,
1103                               struct mdt_object *obj, __u64 ibits)
1104 {
1105         struct ldlm_res_id *res = &info->mti_res_id;
1106         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1107         ldlm_policy_data_t *policy = &info->mti_policy;
1108         int rc;
1109
1110         /*
1111          * Finish res_id initializing by name hash marking part of
1112          * directory which is taking modification.
1113          */
1114         LASSERT(lh->mlh_pdo_hash != 0);
1115         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1116         memset(policy, 0, sizeof(*policy));
1117         policy->l_inodebits.bits = ibits;
1118         /*
1119          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1120          * going to be sent to client. If it is - mdt_intent_policy() path will
1121          * fix it up and turn FL_LOCAL flag off.
1122          */
1123         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
1124                           res, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
1125                           &info->mti_exp->exp_handle.h_cookie);
1126         return rc;
1127 }
1128
1129 /**
1130  * Get BFL lock for rename or migrate process.
1131  **/
1132 static int mdt_rename_lock(struct mdt_thread_info *info,
1133                            struct lustre_handle *lh)
1134 {
1135         int     rc;
1136         ENTRY;
1137
1138         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1139                 struct lu_fid *fid = &info->mti_tmp_fid1;
1140                 struct mdt_object *obj;
1141
1142                 /* XXX, right now, it has to use object API to
1143                  * enqueue lock cross MDT, so it will enqueue
1144                  * rename lock(with LUSTRE_BFL_FID) by root object */
1145                 lu_root_fid(fid);
1146                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1147                 if (IS_ERR(obj))
1148                         RETURN(PTR_ERR(obj));
1149
1150                 rc = mdt_remote_object_lock(info, obj,
1151                                             &LUSTRE_BFL_FID, lh,
1152                                             LCK_EX,
1153                                             MDS_INODELOCK_UPDATE);
1154                 mdt_object_put(info->mti_env, obj);
1155         } else {
1156                 struct ldlm_namespace   *ns = info->mti_mdt->mdt_namespace;
1157                 ldlm_policy_data_t      *policy = &info->mti_policy;
1158                 struct ldlm_res_id      *res_id = &info->mti_res_id;
1159                 __u64                   flags = 0;
1160
1161                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1162                 memset(policy, 0, sizeof *policy);
1163                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1164                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1165                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
1166                                            LCK_EX, &flags, ldlm_blocking_ast,
1167                                            ldlm_completion_ast, NULL, NULL, 0,
1168                                            LVB_T_NONE,
1169                                            &info->mti_exp->exp_handle.h_cookie,
1170                                            lh);
1171                 RETURN(rc);
1172         }
1173         RETURN(rc);
1174 }
1175
1176 static void mdt_rename_unlock(struct lustre_handle *lh)
1177 {
1178         ENTRY;
1179         LASSERT(lustre_handle_is_used(lh));
1180         /* Cancel the single rename lock right away */
1181         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1182         EXIT;
1183 }
1184
1185 /*
1186  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
1187  * target. Source should not be ancestor of target dir. May be other rename
1188  * checks can be moved here later.
1189  */
1190 static int mdt_is_subdir(struct mdt_thread_info *info,
1191                          struct mdt_object *dir,
1192                          const struct lu_fid *fid)
1193 {
1194         struct lu_fid dir_fid = dir->mot_header.loh_fid;
1195         int rc = 0;
1196         ENTRY;
1197
1198         /* If the source and target are in the same directory, they can not
1199          * be parent/child relationship, so subdir check is not needed */
1200         if (lu_fid_eq(&dir_fid, fid))
1201                 return 0;
1202
1203         if (!mdt_object_exists(dir))
1204                 RETURN(-ENOENT);
1205
1206         rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir),
1207                            fid, &dir_fid);
1208         if (rc < 0) {
1209                 CERROR("%s: failed subdir check in "DFID" for "DFID
1210                        ": rc = %d\n", mdt_obd_name(info->mti_mdt),
1211                        PFID(&dir_fid), PFID(fid), rc);
1212                 /* Return EINVAL only if a parent is the @fid */
1213                 if (rc == -EINVAL)
1214                         rc = -EIO;
1215         } else {
1216                 /* check the found fid */
1217                 if (lu_fid_eq(&dir_fid, fid))
1218                         rc = -EINVAL;
1219         }
1220
1221         RETURN(rc);
1222 }
1223
1224 /* Update object linkEA */
1225 struct mdt_lock_list {
1226         struct mdt_object       *mll_obj;
1227         struct mdt_lock_handle  mll_lh;
1228         struct list_head        mll_list;
1229 };
1230
1231 static void mdt_unlock_list(struct mdt_thread_info *info,
1232                             struct list_head *list, int rc)
1233 {
1234         struct mdt_lock_list *mll;
1235         struct mdt_lock_list *mll2;
1236
1237         list_for_each_entry_safe(mll, mll2, list, mll_list) {
1238                 mdt_object_unlock_put(info, mll->mll_obj, &mll->mll_lh, rc);
1239                 list_del(&mll->mll_list);
1240                 OBD_FREE_PTR(mll);
1241         }
1242 }
1243
1244 static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
1245                                       struct mdt_object *obj,
1246                                       struct mdt_object *pobj,
1247                                       struct list_head *lock_list)
1248 {
1249         struct lu_buf           *buf = &info->mti_big_buf;
1250         struct linkea_data      ldata = { NULL };
1251         int                     count;
1252         int                     rc;
1253         ENTRY;
1254
1255         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1256                 RETURN(0);
1257
1258         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
1259         if (buf->lb_buf == NULL)
1260                 RETURN(-ENOMEM);
1261
1262         ldata.ld_buf = buf;
1263         rc = mdt_links_read(info, obj, &ldata);
1264         if (rc != 0) {
1265                 if (rc == -ENOENT || rc == -ENODATA)
1266                         rc = 0;
1267                 RETURN(rc);
1268         }
1269
1270         LASSERT(ldata.ld_leh != NULL);
1271         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
1272         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
1273                 struct mdt_device *mdt = info->mti_mdt;
1274                 struct mdt_object *mdt_pobj;
1275                 struct mdt_lock_list *mll;
1276                 struct lu_name name;
1277                 struct lu_fid  fid;
1278
1279                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
1280                                     &name, &fid);
1281                 ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee +
1282                                                          ldata.ld_reclen);
1283                 mdt_pobj = mdt_object_find(info->mti_env, mdt, &fid);
1284                 if (IS_ERR(mdt_pobj)) {
1285                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
1286                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(mdt_pobj));
1287                         continue;
1288                 }
1289
1290                 if (!mdt_object_exists(mdt_pobj)) {
1291                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
1292                               mdt_obd_name(mdt), PFID(&fid));
1293                         mdt_object_put(info->mti_env, mdt_pobj);
1294                         continue;
1295                 }
1296
1297                 if (mdt_pobj == pobj) {
1298                         CDEBUG(D_INFO, "%s: skipping parent obj "DFID"\n",
1299                                mdt_obd_name(mdt), PFID(&fid));
1300                         mdt_object_put(info->mti_env, mdt_pobj);
1301                         continue;
1302                 }
1303
1304                 OBD_ALLOC_PTR(mll);
1305                 if (mll == NULL) {
1306                         mdt_object_put(info->mti_env, mdt_pobj);
1307                         GOTO(out, rc = -ENOMEM);
1308                 }
1309
1310                 mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
1311                 rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
1312                                      MDS_INODELOCK_UPDATE);
1313                 if (rc != 0) {
1314                         CERROR("%s: cannot lock "DFID": rc =%d\n",
1315                                mdt_obd_name(mdt), PFID(&fid), rc);
1316                         mdt_object_put(info->mti_env, mdt_pobj);
1317                         OBD_FREE_PTR(mll);
1318                         GOTO(out, rc);
1319                 }
1320
1321                 INIT_LIST_HEAD(&mll->mll_list);
1322                 mll->mll_obj = mdt_pobj;
1323                 list_add_tail(&mll->mll_list, lock_list);
1324         }
1325 out:
1326         if (rc != 0)
1327                 mdt_unlock_list(info, lock_list, rc);
1328         RETURN(rc);
1329 }
1330
1331 /* migrate files from one MDT to another MDT */
1332 static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
1333                                       struct mdt_lock_handle *lhc)
1334 {
1335         struct mdt_reint_record *rr = &info->mti_rr;
1336         struct md_attr          *ma = &info->mti_attr;
1337         struct mdt_object       *msrcdir;
1338         struct mdt_object       *mold;
1339         struct mdt_object       *mnew = NULL;
1340         struct mdt_lock_handle  *lh_dirp;
1341         struct mdt_lock_handle  *lh_childp;
1342         struct mdt_lock_handle  *lh_tgtp = NULL;
1343         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1344         struct list_head        lock_list;
1345         __u64                   lock_ibits;
1346         int                     rc;
1347         ENTRY;
1348
1349         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
1350                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
1351
1352         /* 1: lock the source dir. */
1353         msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1354         if (IS_ERR(msrcdir)) {
1355                 CERROR("%s: cannot find source dir "DFID" : rc = %d\n",
1356                         mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
1357                         (int)PTR_ERR(msrcdir));
1358                 RETURN(PTR_ERR(msrcdir));
1359         }
1360
1361         lh_dirp = &info->mti_lh[MDT_LH_PARENT];
1362         mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
1363         rc = mdt_object_lock(info, msrcdir, lh_dirp,
1364                              MDS_INODELOCK_UPDATE);
1365         if (rc)
1366                 GOTO(out_put_parent, rc);
1367
1368         if (!mdt_object_remote(msrcdir)) {
1369                 rc = mdt_version_get_check_save(info, msrcdir, 0);
1370                 if (rc)
1371                         GOTO(out_unlock_parent, rc);
1372         }
1373
1374         /* 2: sanity check and find the object to be migrated. */
1375         fid_zero(old_fid);
1376         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
1377         if (rc != 0)
1378                 GOTO(out_unlock_parent, rc);
1379
1380         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1381                 GOTO(out_unlock_parent, rc = -EINVAL);
1382
1383         if (!fid_is_md_operative(old_fid))
1384                 GOTO(out_unlock_parent, rc = -EPERM);
1385
1386         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
1387         if (IS_ERR(mold))
1388                 GOTO(out_unlock_parent, rc = PTR_ERR(mold));
1389
1390         if (mdt_object_remote(mold)) {
1391                 CERROR("%s: source "DFID" is on the remote MDT\n",
1392                        mdt_obd_name(info->mti_mdt), PFID(old_fid));
1393                 GOTO(out_put_child, rc = -EREMOTE);
1394         }
1395
1396         if (S_ISREG(lu_object_attr(&mold->mot_obj)) &&
1397             !mdt_object_remote(msrcdir)) {
1398                 CERROR("%s: parent "DFID" is still on the same"
1399                        " MDT, which should be migrated first:"
1400                        " rc = %d\n", mdt_obd_name(info->mti_mdt),
1401                        PFID(mdt_object_fid(msrcdir)), -EPERM);
1402                 GOTO(out_put_child, rc = -EPERM);
1403         }
1404
1405         rc = mdt_remote_permission(info, msrcdir, mold);
1406         if (rc != 0)
1407                 GOTO(out_put_child, rc);
1408
1409         /* 3: iterate the linkea of the object and lock all of the objects */
1410         INIT_LIST_HEAD(&lock_list);
1411         rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
1412         if (rc != 0)
1413                 GOTO(out_put_child, rc);
1414
1415         /* 4: lock of the object migrated object */
1416         lh_childp = &info->mti_lh[MDT_LH_OLD];
1417         mdt_lock_reg_init(lh_childp, LCK_EX);
1418         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
1419                      MDS_INODELOCK_LAYOUT;
1420         if (mdt_object_remote(msrcdir)) {
1421                 /* Enqueue lookup lock from the parent MDT */
1422                 rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
1423                                             &lh_childp->mlh_rreg_lh,
1424                                             lh_childp->mlh_rreg_mode,
1425                                             MDS_INODELOCK_LOOKUP);
1426                 if (rc != ELDLM_OK)
1427                         GOTO(out_unlock_list, rc);
1428
1429                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1430         }
1431
1432         rc = mdt_object_lock(info, mold, lh_childp, lock_ibits);
1433         if (rc != 0)
1434                 GOTO(out_unlock_child, rc);
1435
1436         ma->ma_need = MA_LMV;
1437         ma->ma_valid = 0;
1438         ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
1439         ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
1440         rc = mdt_stripe_get(info, mold, ma, XATTR_NAME_LMV);
1441         if (rc != 0)
1442                 GOTO(out_unlock_child, rc);
1443
1444         if ((ma->ma_valid & MA_LMV)) {
1445                 struct lmv_mds_md_v1 *lmm1;
1446
1447                 lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv);
1448                 lmm1 = &ma->ma_lmv->lmv_md_v1;
1449                 if (!(lmm1->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) {
1450                         CERROR("%s: can not migrate striped dir "DFID
1451                                ": rc = %d\n", mdt_obd_name(info->mti_mdt),
1452                                PFID(mdt_object_fid(mold)), -EPERM);
1453                         GOTO(out_unlock_child, rc = -EPERM);
1454                 }
1455
1456                 if (!fid_is_sane(&lmm1->lmv_stripe_fids[1]))
1457                         GOTO(out_unlock_child, rc = -EINVAL);
1458
1459                 mnew = mdt_object_find(info->mti_env, info->mti_mdt,
1460                                        &lmm1->lmv_stripe_fids[1]);
1461                 if (IS_ERR(mnew))
1462                         GOTO(out_unlock_child, rc = PTR_ERR(mnew));
1463
1464                 if (!mdt_object_remote(mnew)) {
1465                         CERROR("%s: "DFID" being migrated is on this MDT:"
1466                                " rc  = %d\n", mdt_obd_name(info->mti_mdt),
1467                                PFID(rr->rr_fid2), -EPERM);
1468                         GOTO(out_put_new, rc = -EPERM);
1469                 }
1470
1471                 lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
1472                 mdt_lock_reg_init(lh_tgtp, LCK_EX);
1473                 rc = mdt_remote_object_lock(info, mnew,
1474                                             mdt_object_fid(mnew),
1475                                             &lh_tgtp->mlh_rreg_lh,
1476                                             lh_tgtp->mlh_rreg_mode,
1477                                             MDS_INODELOCK_UPDATE);
1478                 if (rc != 0) {
1479                         lh_tgtp = NULL;
1480                         GOTO(out_put_new, rc);
1481                 }
1482         } else {
1483                 mnew = mdt_object_find(info->mti_env, info->mti_mdt,
1484                                        rr->rr_fid2);
1485                 if (IS_ERR(mnew))
1486                         GOTO(out_unlock_child, rc = PTR_ERR(mnew));
1487                 if (!mdt_object_remote(mnew)) {
1488                         CERROR("%s: Migration "DFID" is on this MDT:"
1489                                " rc = %d\n", mdt_obd_name(info->mti_mdt),
1490                                PFID(rr->rr_fid2), -EXDEV);
1491                         GOTO(out_put_new, rc = -EXDEV);
1492                 }
1493         }
1494
1495         /* 5: migrate it */
1496         mdt_reint_init_ma(info, ma);
1497
1498         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1499                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1500
1501         rc = mdo_migrate(info->mti_env, mdt_object_child(msrcdir),
1502                          mdt_object_child(mold), &rr->rr_name,
1503                          mdt_object_child(mnew), ma);
1504         if (rc != 0)
1505                 GOTO(out_unlock_new, rc);
1506 out_unlock_new:
1507         if (lh_tgtp != NULL)
1508                 mdt_object_unlock(info, mnew, lh_tgtp, rc);
1509 out_put_new:
1510         if (mnew)
1511                 mdt_object_put(info->mti_env, mnew);
1512 out_unlock_child:
1513         mdt_object_unlock(info, mold, lh_childp, rc);
1514 out_unlock_list:
1515         mdt_unlock_list(info, &lock_list, rc);
1516 out_put_child:
1517         mdt_object_put(info->mti_env, mold);
1518 out_unlock_parent:
1519         mdt_object_unlock(info, msrcdir, lh_dirp, rc);
1520 out_put_parent:
1521         mdt_object_put(info->mti_env, msrcdir);
1522
1523         RETURN(rc);
1524 }
1525
1526 static struct mdt_object *mdt_object_find_check(struct mdt_thread_info *info,
1527                                                 const struct lu_fid *fid,
1528                                                 int idx)
1529 {
1530         struct mdt_object *dir;
1531         int rc;
1532         ENTRY;
1533
1534         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1535         if (IS_ERR(dir))
1536                 RETURN(dir);
1537
1538         /* check early, the real version will be saved after locking */
1539         rc = mdt_version_get_check(info, dir, idx);
1540         if (rc)
1541                 GOTO(out_put, rc);
1542
1543         RETURN(dir);
1544 out_put:
1545         mdt_object_put(info->mti_env, dir);
1546         return ERR_PTR(rc);
1547 }
1548
1549 static int mdt_object_lock_save(struct mdt_thread_info *info,
1550                                 struct mdt_object *dir,
1551                                 struct mdt_lock_handle *lh,
1552                                 int idx)
1553 {
1554         int rc;
1555
1556         /* we lock the target dir if it is local */
1557         rc = mdt_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE);
1558         if (rc != 0)
1559                 return rc;
1560
1561         /* get and save correct version after locking */
1562         mdt_version_get_save(info, dir, idx);
1563         return 0;
1564 }
1565
1566
1567 static int mdt_rename_parents_lock(struct mdt_thread_info *info,
1568                                    struct mdt_object **srcp,
1569                                    struct mdt_object **tgtp)
1570 {
1571         struct mdt_reint_record *rr = &info->mti_rr;
1572         const struct lu_fid     *fid_src = rr->rr_fid1;
1573         const struct lu_fid     *fid_tgt = rr->rr_fid2;
1574         struct mdt_lock_handle  *lh_src = &info->mti_lh[MDT_LH_PARENT];
1575         struct mdt_lock_handle  *lh_tgt = &info->mti_lh[MDT_LH_CHILD];
1576         struct mdt_object       *src;
1577         struct mdt_object       *tgt;
1578         int                      reverse = 0;
1579         int                      rc;
1580         ENTRY;
1581
1582         /* find both parents. */
1583         src = mdt_object_find_check(info, fid_src, 0);
1584         if (IS_ERR(src))
1585                 RETURN(PTR_ERR(src));
1586
1587         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1588
1589         if (lu_fid_eq(fid_src, fid_tgt)) {
1590                 tgt = src;
1591                 mdt_object_get(info->mti_env, tgt);
1592         } else {
1593                 /* Check if the @src is not a child of the @tgt, otherwise a
1594                  * reverse locking must take place. */
1595                 rc = mdt_is_subdir(info, src, fid_tgt);
1596                 if (rc == -EINVAL)
1597                         reverse = 1;
1598                 else if (rc)
1599                         GOTO(err_src_put, rc);
1600
1601                 tgt = mdt_object_find_check(info, fid_tgt, 1);
1602                 if (IS_ERR(tgt))
1603                         GOTO(err_src_put, rc = PTR_ERR(tgt));
1604         }
1605
1606         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
1607
1608         /* lock parents in the proper order. */
1609         if (reverse) {
1610                 rc = mdt_object_lock_save(info, tgt, lh_tgt, 1);
1611                 if (rc)
1612                         GOTO(err_tgt_put, rc);
1613
1614                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
1615
1616                 rc = mdt_object_lock_save(info, src, lh_src, 0);
1617         } else {
1618                 rc = mdt_object_lock_save(info, src, lh_src, 0);
1619                 if (rc)
1620                         GOTO(err_tgt_put, rc);
1621
1622                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
1623
1624                 if (tgt != src)
1625                         rc = mdt_object_lock_save(info, tgt, lh_tgt, 1);
1626                 else if (lh_src->mlh_pdo_hash != lh_tgt->mlh_pdo_hash) {
1627                         rc = mdt_pdir_hash_lock(info, lh_tgt, tgt,
1628                                                 MDS_INODELOCK_UPDATE);
1629                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
1630                 }
1631         }
1632         if (rc)
1633                 GOTO(err_unlock, rc);
1634
1635         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
1636
1637         *srcp = src;
1638         *tgtp = tgt;
1639         RETURN(0);
1640
1641 err_unlock:
1642         /* The order does not matter as the handle is checked inside,
1643          * as well as not used handle. */
1644         mdt_object_unlock(info, src, lh_src, rc);
1645         mdt_object_unlock(info, tgt, lh_tgt, rc);
1646 err_tgt_put:
1647         mdt_object_put(info->mti_env, tgt);
1648 err_src_put:
1649         mdt_object_put(info->mti_env, src);
1650         RETURN(rc);
1651 }
1652
1653 /*
1654  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
1655  * 2 - src child; 3 - tgt child.
1656  * Update on disk version of src child.
1657  */
1658 /**
1659  * For DNE phase I, only these renames are allowed
1660  *      mv src_p/src_c tgt_p/tgt_c
1661  * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT.
1662  * 2. src_p and tgt_p are same directory, and tgt_c does not
1663  *    exists. In this case, all of modification will happen
1664  *    in the MDT where ithesource parent is, only one remote
1665  *    update is needed, i.e. set c_time/m_time on the child.
1666  *    And tgt_c will be still in the same MDT as the original
1667  *    src_c.
1668  */
1669 static int mdt_reint_rename_internal(struct mdt_thread_info *info,
1670                                      struct mdt_lock_handle *lhc)
1671 {
1672         struct mdt_reint_record *rr = &info->mti_rr;
1673         struct md_attr          *ma = &info->mti_attr;
1674         struct ptlrpc_request   *req = mdt_info_req(info);
1675         struct mdt_object       *msrcdir = NULL;
1676         struct mdt_object       *mtgtdir = NULL;
1677         struct mdt_object       *mold;
1678         struct mdt_object       *mnew = NULL;
1679         struct mdt_lock_handle  *lh_srcdirp;
1680         struct mdt_lock_handle  *lh_tgtdirp;
1681         struct mdt_lock_handle  *lh_oldp = NULL;
1682         struct mdt_lock_handle  *lh_newp = NULL;
1683         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
1684         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
1685         __u64                   lock_ibits;
1686         int                      rc;
1687         ENTRY;
1688
1689         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
1690                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
1691                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
1692
1693         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
1694         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
1695         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
1696         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
1697
1698         /* step 1&2: lock the source and target dirs. */
1699         rc = mdt_rename_parents_lock(info, &msrcdir, &mtgtdir);
1700         if (rc)
1701                 RETURN(rc);
1702
1703         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
1704
1705         /* step 3: find & lock the old object. */
1706         fid_zero(old_fid);
1707         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
1708         if (rc != 0)
1709                 GOTO(out_unlock_parents, rc);
1710
1711         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
1712                 GOTO(out_unlock_parents, rc = -EINVAL);
1713
1714         if (!fid_is_md_operative(old_fid))
1715                 GOTO(out_unlock_parents, rc = -EPERM);
1716
1717         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
1718         if (IS_ERR(mold))
1719                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
1720
1721         /* Check if @mtgtdir is subdir of @mold, before locking child
1722          * to avoid reverse locking. */
1723         rc = mdt_is_subdir(info, mtgtdir, old_fid);
1724         if (rc)
1725                 GOTO(out_put_old, rc);
1726
1727         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
1728         /* save version after locking */
1729         mdt_version_get_save(info, mold, 2);
1730
1731         /* step 4: find & lock the new object. */
1732         /* new target object may not exist now */
1733         /* lookup with version checking */
1734         fid_zero(new_fid);
1735         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
1736                                       3);
1737         if (rc == 0) {
1738                 /* the new_fid should have been filled at this moment */
1739                 if (lu_fid_eq(old_fid, new_fid))
1740                         GOTO(out_put_old, rc);
1741
1742                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1743                     lu_fid_eq(new_fid, rr->rr_fid2))
1744                         GOTO(out_put_old, rc = -EINVAL);
1745
1746                 if (!fid_is_md_operative(new_fid))
1747                         GOTO(out_put_old, rc = -EPERM);
1748
1749                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
1750                 if (IS_ERR(mnew))
1751                         GOTO(out_put_old, rc = PTR_ERR(mnew));
1752
1753                 if (mdt_object_remote(mnew)) {
1754                         struct mdt_body  *repbody;
1755
1756                         /* Always send rename req to the target child MDT */
1757                         repbody = req_capsule_server_get(info->mti_pill,
1758                                                          &RMF_MDT_BODY);
1759                         LASSERT(repbody != NULL);
1760                         repbody->mbo_fid1 = *new_fid;
1761                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1762                         GOTO(out_put_old, rc = -EREMOTE);
1763                 }
1764                 /* Before locking the target dir, check we do not replace
1765                  * a dir with a non-dir, otherwise it may deadlock with
1766                  * link op which tries to create a link in this dir
1767                  * back to this non-dir. */
1768                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
1769                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
1770                         GOTO(out_put_new, rc = -EISDIR);
1771
1772                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
1773                 mdt_lock_reg_init(lh_oldp, LCK_EX);
1774
1775                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
1776                 if (mdt_object_remote(msrcdir)) {
1777                         /* Enqueue lookup lock from the parent MDT */
1778                         rc = mdt_remote_object_lock(info, msrcdir,
1779                                                     mdt_object_fid(mold),
1780                                                     &lh_oldp->mlh_rreg_lh,
1781                                                     lh_oldp->mlh_rreg_mode,
1782                                                     MDS_INODELOCK_LOOKUP);
1783                         if (rc != ELDLM_OK)
1784                                 GOTO(out_put_new, rc);
1785
1786                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1787                 }
1788
1789                 rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits);
1790                 if (rc != 0)
1791                         GOTO(out_unlock_old, rc);
1792
1793                 /* Check if @msrcdir is subdir of @mnew, before locking child
1794                  * to avoid reverse locking. */
1795                 rc = mdt_is_subdir(info, msrcdir, new_fid);
1796                 if (rc)
1797                         GOTO(out_unlock_old, rc);
1798
1799                 /* We used to acquire MDS_INODELOCK_FULL here but we
1800                  * can't do this now because a running HSM restore on
1801                  * the rename onto victim will hold the layout
1802                  * lock. See LU-4002. */
1803
1804                 lh_newp = &info->mti_lh[MDT_LH_NEW];
1805                 mdt_lock_reg_init(lh_newp, LCK_EX);
1806                 rc = mdt_object_lock(info, mnew, lh_newp,
1807                                      MDS_INODELOCK_LOOKUP |
1808                                      MDS_INODELOCK_UPDATE);
1809                 if (rc != 0)
1810                         GOTO(out_unlock_old, rc);
1811
1812                 /* get and save version after locking */
1813                 mdt_version_get_save(info, mnew, 3);
1814         } else if (rc != -EREMOTE && rc != -ENOENT) {
1815                 GOTO(out_put_old, rc);
1816         } else {
1817                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
1818                 mdt_lock_reg_init(lh_oldp, LCK_EX);
1819
1820                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
1821                 if (mdt_object_remote(msrcdir)) {
1822                         /* Enqueue lookup lock from the parent MDT */
1823                         rc = mdt_remote_object_lock(info, msrcdir,
1824                                                     mdt_object_fid(mold),
1825                                                     &lh_oldp->mlh_rreg_lh,
1826                                                     lh_oldp->mlh_rreg_mode,
1827                                                     MDS_INODELOCK_LOOKUP);
1828                         if (rc != ELDLM_OK)
1829                                 GOTO(out_put_new, rc);
1830
1831                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1832                 }
1833
1834                 rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits);
1835                 if (rc != 0)
1836                         GOTO(out_put_old, rc);
1837
1838                 mdt_enoent_version_save(info, 3);
1839         }
1840
1841         /* step 5: rename it */
1842         mdt_reint_init_ma(info, ma);
1843
1844         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1845                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1846
1847         if (mnew != NULL)
1848                 mutex_lock(&mnew->mot_lov_mutex);
1849
1850         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1851                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
1852                         mnew != NULL ? mdt_object_child(mnew) : NULL,
1853                         &rr->rr_tgt_name, ma);
1854
1855         if (mnew != NULL)
1856                 mutex_unlock(&mnew->mot_lov_mutex);
1857
1858         /* handle last link of tgt object */
1859         if (rc == 0) {
1860                 mdt_counter_incr(req, LPROC_MDT_RENAME);
1861                 if (mnew)
1862                         mdt_handle_last_unlink(info, mnew, ma);
1863
1864                 mdt_rename_counter_tally(info, info->mti_mdt, req,
1865                                          msrcdir, mtgtdir);
1866         }
1867
1868         EXIT;
1869         if (mnew != NULL)
1870                 mdt_object_unlock(info, mnew, lh_newp, rc);
1871 out_unlock_old:
1872         mdt_object_unlock(info, mold, lh_oldp, rc);
1873 out_put_new:
1874         if (mnew != NULL)
1875                 mdt_object_put(info->mti_env, mnew);
1876 out_put_old:
1877         mdt_object_put(info->mti_env, mold);
1878 out_unlock_parents:
1879         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
1880         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1881         return rc;
1882 }
1883
1884 static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
1885                                        struct mdt_lock_handle *lhc, bool rename)
1886 {
1887         struct mdt_reint_record *rr = &info->mti_rr;
1888         struct ptlrpc_request   *req = mdt_info_req(info);
1889         struct lustre_handle    rename_lh = { 0 };
1890         int                     rc;
1891         ENTRY;
1892
1893         if (info->mti_dlm_req)
1894                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1895
1896         if (!fid_is_md_operative(rr->rr_fid1) ||
1897             !fid_is_md_operative(rr->rr_fid2))
1898                 RETURN(-EPERM);
1899
1900         /* Note: do not enqueue rename lock for replay request, because
1901          * if other MDT holds rename lock, but being blocked to wait for
1902          * this MDT to finish its recovery, and the failover MDT can not
1903          * get rename lock, which will cause deadlock. */
1904         if (!req_is_replay(req)) {
1905                 rc = mdt_rename_lock(info, &rename_lh);
1906                 if (rc != 0) {
1907                         CERROR("%s: can't lock FS for rename: rc  = %d\n",
1908                                mdt_obd_name(info->mti_mdt), rc);
1909                         RETURN(rc);
1910                 }
1911         }
1912
1913         if (rename)
1914                 rc = mdt_reint_rename_internal(info, lhc);
1915         else
1916                 rc = mdt_reint_migrate_internal(info, lhc);
1917
1918         if (lustre_handle_is_used(&rename_lh))
1919                 mdt_rename_unlock(&rename_lh);
1920
1921         RETURN(rc);
1922 }
1923
1924 static int mdt_reint_rename(struct mdt_thread_info *info,
1925                             struct mdt_lock_handle *lhc)
1926 {
1927         return mdt_reint_rename_or_migrate(info, lhc, true);
1928 }
1929
1930 static int mdt_reint_migrate(struct mdt_thread_info *info,
1931                             struct mdt_lock_handle *lhc)
1932 {
1933         return mdt_reint_rename_or_migrate(info, lhc, false);
1934 }
1935
1936 struct mdt_reinter {
1937         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
1938         enum lprocfs_extra_opc mr_extra_opc;
1939 };
1940
1941 static const struct mdt_reinter mdt_reinters[] = {
1942         [REINT_SETATTR] = {
1943                 .mr_handler = &mdt_reint_setattr,
1944                 .mr_extra_opc = MDS_REINT_SETATTR,
1945         },
1946         [REINT_CREATE] = {
1947                 .mr_handler = &mdt_reint_create,
1948                 .mr_extra_opc = MDS_REINT_CREATE,
1949         },
1950         [REINT_LINK] = {
1951                 .mr_handler = &mdt_reint_link,
1952                 .mr_extra_opc = MDS_REINT_LINK,
1953         },
1954         [REINT_UNLINK] = {
1955                 .mr_handler = &mdt_reint_unlink,
1956                 .mr_extra_opc = MDS_REINT_UNLINK,
1957         },
1958         [REINT_RENAME] = {
1959                 .mr_handler = &mdt_reint_rename,
1960                 .mr_extra_opc = MDS_REINT_RENAME,
1961         },
1962         [REINT_OPEN] = {
1963                 .mr_handler = &mdt_reint_open,
1964                 .mr_extra_opc = MDS_REINT_OPEN,
1965         },
1966         [REINT_SETXATTR] = {
1967                 .mr_handler = &mdt_reint_setxattr,
1968                 .mr_extra_opc = MDS_REINT_SETXATTR,
1969         },
1970         [REINT_RMENTRY] = {
1971                 .mr_handler = &mdt_reint_unlink,
1972                 .mr_extra_opc = MDS_REINT_UNLINK,
1973         },
1974         [REINT_MIGRATE] = {
1975                 .mr_handler = &mdt_reint_migrate,
1976                 .mr_extra_opc = MDS_REINT_RENAME,
1977         },
1978 };
1979
1980 int mdt_reint_rec(struct mdt_thread_info *info,
1981                   struct mdt_lock_handle *lhc)
1982 {
1983         const struct mdt_reinter *mr;
1984         int rc;
1985         ENTRY;
1986
1987         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
1988                 RETURN(-EPROTO);
1989
1990         mr = &mdt_reinters[info->mti_rr.rr_opcode];
1991         if (mr->mr_handler == NULL)
1992                 RETURN(-EPROTO);
1993
1994         rc = (*mr->mr_handler)(info, lhc);
1995
1996         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
1997                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
1998
1999         RETURN(rc);
2000 }