Whamcloud - gitweb
LU-7230 llite: clear dir stripe md in ll_iget
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static inline int
61 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
62 {
63         if (!lu_name_is_valid(ln))
64                 return -EINVAL;
65         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
66                 return -ENAMETOOLONG;
67         else
68                 return 0;
69 }
70
71 /* Get FID from name and parent */
72 static int
73 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
74              const struct lu_attr *pattr, const struct lu_name *lname,
75              struct lu_fid* fid, int mask)
76 {
77         const char *name                = lname->ln_name;
78         const struct dt_key *key        = (const struct dt_key *)name;
79         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
80         struct mdd_device *m            = mdo2mdd(pobj);
81         struct dt_object *dir           = mdd_object_child(mdd_obj);
82         int rc;
83         ENTRY;
84
85         if (unlikely(mdd_is_dead_obj(mdd_obj)))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         } else if (!mdd_object_exists(mdd_obj)) {
92                 RETURN(-ESTALE);
93         }
94
95         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
96                                             MOR_TGT_PARENT);
97         if (rc)
98                 RETURN(rc);
99
100         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
101                    dt_try_as_dir(env, dir)))
102                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
103         else
104                 rc = -ENOTDIR;
105
106         RETURN(rc);
107 }
108
109 int mdd_lookup(const struct lu_env *env,
110                struct md_object *pobj, const struct lu_name *lname,
111                struct lu_fid *fid, struct md_op_spec *spec)
112 {
113         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
114         int rc;
115         ENTRY;
116
117         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
118         if (rc != 0)
119                 RETURN(rc);
120
121         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
122                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
123         RETURN(rc);
124 }
125
126 /**
127  * Get parent FID of the directory
128  *
129  * Read parent FID from linkEA, if that fails, then do lookup
130  * dotdot to get the parent FID.
131  *
132  * \param[in] env       execution environment
133  * \param[in] obj       object from which to find the parent FID
134  * \param[in] attr      attribute of the object
135  * \param[out] fid      fid to get the parent FID
136  *
137  * \retval              0 if getting the parent FID succeeds.
138  * \retval              negative errno if getting the parent FID fails.
139  **/
140 static inline int mdd_parent_fid(const struct lu_env *env,
141                                  struct mdd_object *obj,
142                                  const struct lu_attr *attr,
143                                  struct lu_fid *fid)
144 {
145         struct mdd_thread_info  *info = mdd_env_info(env);
146         struct linkea_data      ldata = { NULL };
147         struct lu_buf           *buf = &info->mti_link_buf;
148         struct lu_name          lname;
149         int                     rc = 0;
150
151         ENTRY;
152
153         LASSERT(S_ISDIR(mdd_object_type(obj)));
154
155         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
156         if (buf->lb_buf == NULL)
157                 GOTO(lookup, rc = 0);
158
159         ldata.ld_buf = buf;
160         rc = mdd_links_read(env, obj, &ldata);
161         if (rc != 0)
162                 GOTO(lookup, rc);
163
164         LASSERT(ldata.ld_leh != NULL);
165         /* Directory should only have 1 parent */
166         if (ldata.ld_leh->leh_reccount > 1)
167                 GOTO(lookup, rc);
168
169         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
170
171         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
172         if (likely(fid_is_sane(fid)))
173                 RETURN(0);
174 lookup:
175         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
176         RETURN(rc);
177 }
178
179 /*
180  * For root fid use special function, which does not compare version component
181  * of fid. Version component is different for root fids on all MDTs.
182  */
183 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
184 {
185         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
186                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
187 }
188
189 /*
190  * return 1: if lf is the fid of the ancestor of p1;
191  * return 0: if not;
192  *
193  * return -EREMOTE: if remote object is found, in this
194  * case fid of remote object is saved to @pf;
195  *
196  * otherwise: values < 0, errors.
197  */
198 static int mdd_is_parent(const struct lu_env *env,
199                         struct mdd_device *mdd,
200                         struct mdd_object *p1,
201                         const struct lu_attr *attr,
202                         const struct lu_fid *lf,
203                         struct lu_fid *pf)
204 {
205         struct mdd_object *parent = NULL;
206         struct lu_fid *pfid;
207         int rc;
208         ENTRY;
209
210         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
211         pfid = &mdd_env_info(env)->mti_fid;
212
213         /* Check for root first. */
214         if (mdd_is_root(mdd, mdo2fid(p1)))
215                 RETURN(0);
216
217         for(;;) {
218                 /* this is done recursively */
219                 rc = mdd_parent_fid(env, p1, attr, pfid);
220                 if (rc)
221                         GOTO(out, rc);
222                 if (mdd_is_root(mdd, pfid))
223                         GOTO(out, rc = 0);
224                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
225                         GOTO(out, rc = 0);
226                 if (lu_fid_eq(pfid, lf))
227                         GOTO(out, rc = 1);
228                 if (parent != NULL)
229                         mdd_object_put(env, parent);
230
231                 parent = mdd_object_find(env, mdd, pfid);
232                 if (IS_ERR(parent))
233                         GOTO(out, rc = PTR_ERR(parent));
234                 p1 = parent;
235         }
236         EXIT;
237 out:
238         if (parent && !IS_ERR(parent))
239                 mdd_object_put(env, parent);
240         return rc;
241 }
242
243 /*
244  * No permission check is needed.
245  *
246  * returns 1: if fid is ancestor of @mo;
247  * returns 0: if fid is not an ancestor of @mo;
248  *
249  * returns EREMOTE if remote object is found, fid of remote object is saved to
250  * @fid;
251  *
252  * returns < 0: if error
253  */
254 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
255                   const struct lu_fid *fid, struct lu_fid *sfid)
256 {
257         struct mdd_device *mdd = mdo2mdd(mo);
258         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
259         int rc;
260         ENTRY;
261
262         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
263                 RETURN(0);
264
265         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
266         if (rc != 0)
267                 RETURN(rc);
268
269         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
270         if (rc == 0) {
271                 /* found root */
272                 fid_zero(sfid);
273         } else if (rc == 1) {
274                 /* found @fid is parent */
275                 *sfid = *fid;
276                 rc = 0;
277         }
278         RETURN(rc);
279 }
280
281 /*
282  * Check that @dir contains no entries except (possibly) dot and dotdot.
283  *
284  * Returns:
285  *
286  *             0        empty
287  *      -ENOTDIR        not a directory object
288  *    -ENOTEMPTY        not empty
289  *           -ve        other error
290  *
291  */
292 static int mdd_dir_is_empty(const struct lu_env *env,
293                             struct mdd_object *dir)
294 {
295         struct dt_it     *it;
296         struct dt_object *obj;
297         const struct dt_it_ops *iops;
298         int result;
299         ENTRY;
300
301         obj = mdd_object_child(dir);
302         if (!dt_try_as_dir(env, obj))
303                 RETURN(-ENOTDIR);
304
305         iops = &obj->do_index_ops->dio_it;
306         it = iops->init(env, obj, LUDA_64BITHASH);
307         if (!IS_ERR(it)) {
308                 result = iops->get(env, it, (const struct dt_key *)"");
309                 if (result > 0) {
310                         int i;
311                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
312                                 result = iops->next(env, it);
313                         if (result == 0)
314                                 result = -ENOTEMPTY;
315                         else if (result == 1)
316                                 result = 0;
317                 } else if (result == 0)
318                         /*
319                          * Huh? Index contains no zero key?
320                          */
321                         result = -EIO;
322
323                 iops->put(env, it);
324                 iops->fini(env, it);
325         } else
326                 result = PTR_ERR(it);
327         RETURN(result);
328 }
329
330 /**
331  * Determine if the target object can be hard linked, and right now it only
332  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
333  * directory nlink count might exceed the maximum link count(see
334  * osd_object_ref_add), so it only check nlink for non-directories.
335  *
336  * \param[in] env       thread environment
337  * \param[in] obj       object being linked to
338  * \param[in] la        attributes of \a obj
339  *
340  * \retval              0 if \a obj can be hard linked
341  * \retval              negative error if \a obj is a directory or has too
342  *                      many links
343  */
344 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
345                           const struct lu_attr *la)
346 {
347         struct mdd_device *m = mdd_obj2mdd_dev(obj);
348         ENTRY;
349
350         LASSERT(la != NULL);
351
352         /* Subdir count limitation can be broken through
353          * (see osd_object_ref_add), so only check non-directory here. */
354         if (!S_ISDIR(la->la_mode) &&
355             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
356                 RETURN(-EMLINK);
357
358         RETURN(0);
359 }
360
361 /**
362  * Check whether it may create the cobj under the pobj.
363  *
364  * \param[in] env       execution environment
365  * \param[in] pobj      the parent directory
366  * \param[in] pattr     the attribute of the parent directory
367  * \param[in] cobj      the child to be created
368  * \param[in] check_perm        if check WRITE|EXEC permission for parent
369  *
370  * \retval              = 0 create the child under this dir is allowed
371  * \retval              negative errno create the child under this dir is
372  *                      not allowed
373  */
374 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
375                    const struct lu_attr *pattr, struct mdd_object *cobj,
376                    bool check_perm)
377 {
378         struct mdd_thread_info *info = mdd_env_info(env);
379         struct lu_buf   *xbuf;
380         int rc = 0;
381         ENTRY;
382
383         if (cobj && mdd_object_exists(cobj))
384                 RETURN(-EEXIST);
385
386         if (mdd_is_dead_obj(pobj))
387                 RETURN(-ENOENT);
388
389         /* If the parent is a sub-stripe, check whether it is dead */
390         xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key));
391         rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV);
392         if (unlikely(rc > 0)) {
393                 struct lmv_mds_md_v1  *lmv1 = xbuf->lb_buf;
394
395                 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE &&
396                     le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD)
397                         RETURN(-ESTALE);
398         }
399         rc = 0;
400
401         if (check_perm)
402                 rc = mdd_permission_internal_locked(env, pobj, pattr,
403                                                     MAY_WRITE | MAY_EXEC,
404                                                     MOR_TGT_PARENT);
405         RETURN(rc);
406 }
407
408 /*
409  * Check whether can unlink from the pobj in the case of "cobj == NULL".
410  */
411 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
412                    const struct lu_attr *pattr, const struct lu_attr *attr)
413 {
414         int rc;
415         ENTRY;
416
417         if (mdd_is_dead_obj(pobj))
418                 RETURN(-ENOENT);
419
420         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
421                 RETURN(-EPERM);
422
423         rc = mdd_permission_internal_locked(env, pobj, pattr,
424                                             MAY_WRITE | MAY_EXEC,
425                                             MOR_TGT_PARENT);
426         if (rc != 0)
427                 RETURN(rc);
428
429         if (pattr->la_flags & LUSTRE_APPEND_FL)
430                 RETURN(-EPERM);
431
432         RETURN(rc);
433 }
434
435 /*
436  * pobj == NULL is remote ops case, under such case, pobj's
437  * VTX feature has been checked already, no need check again.
438  */
439 static inline int mdd_is_sticky(const struct lu_env *env,
440                                 struct mdd_object *pobj,
441                                 const struct lu_attr *pattr,
442                                 struct mdd_object *cobj,
443                                 const struct lu_attr *cattr)
444 {
445         struct lu_ucred *uc = lu_ucred_assert(env);
446
447         if (pobj != NULL) {
448                 LASSERT(pattr != NULL);
449                 if (!(pattr->la_mode & S_ISVTX) ||
450                     (pattr->la_uid == uc->uc_fsuid))
451                         return 0;
452         }
453
454         LASSERT(cattr != NULL);
455         if (cattr->la_uid == uc->uc_fsuid)
456                 return 0;
457
458         return !md_capable(uc, CFS_CAP_FOWNER);
459 }
460
461 static int mdd_may_delete_entry(const struct lu_env *env,
462                                 struct mdd_object *pobj,
463                                 const struct lu_attr *pattr,
464                                 int check_perm)
465 {
466         ENTRY;
467
468         LASSERT(pobj != NULL);
469         if (!mdd_object_exists(pobj))
470                 RETURN(-ENOENT);
471
472         if (mdd_is_dead_obj(pobj))
473                 RETURN(-ENOENT);
474
475         if (check_perm) {
476                 int rc;
477                 rc = mdd_permission_internal_locked(env, pobj, pattr,
478                                             MAY_WRITE | MAY_EXEC,
479                                             MOR_TGT_PARENT);
480                 if (rc)
481                         RETURN(rc);
482         }
483
484         if (pattr->la_flags & LUSTRE_APPEND_FL)
485                 RETURN(-EPERM);
486
487         RETURN(0);
488 }
489
490 /*
491  * Check whether it may delete the cobj from the pobj.
492  * pobj maybe NULL
493  */
494 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
495                    const struct lu_attr *tpattr, struct mdd_object *tobj,
496                    const struct lu_attr *tattr, const struct lu_attr *cattr,
497                    int check_perm, int check_empty)
498 {
499         int rc = 0;
500         ENTRY;
501
502         if (tpobj) {
503                 LASSERT(tpattr != NULL);
504                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
505                 if (rc != 0)
506                         RETURN(rc);
507         }
508
509         if (tobj == NULL)
510                 RETURN(0);
511
512         if (!mdd_object_exists(tobj))
513                 RETURN(-ENOENT);
514
515         if (mdd_is_dead_obj(tobj))
516                 RETURN(-ESTALE);
517
518         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
519                 RETURN(-EPERM);
520
521         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
522                 RETURN(-EPERM);
523
524         /* additional check the rename case */
525         if (cattr) {
526                 if (S_ISDIR(cattr->la_mode)) {
527                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
528
529                         if (!S_ISDIR(tattr->la_mode))
530                                 RETURN(-ENOTDIR);
531
532                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
533                                 RETURN(-EBUSY);
534                 } else if (S_ISDIR(tattr->la_mode))
535                         RETURN(-EISDIR);
536         }
537
538         if (S_ISDIR(tattr->la_mode) && check_empty)
539                 rc = mdd_dir_is_empty(env, tobj);
540
541         RETURN(rc);
542 }
543
544 /**
545  * Check whether it can create the link file(linked to @src_obj) under
546  * the target directory(@tgt_obj), and src_obj has been locked by
547  * mdd_write_lock.
548  *
549  * \param[in] env       execution environment
550  * \param[in] tgt_obj   the target directory
551  * \param[in] tattr     attributes of target directory
552  * \param[in] lname     the link name
553  * \param[in] src_obj   source object for link
554  * \param[in] cattr     attributes for source object
555  *
556  * \retval              = 0 it is allowed to create the link file under tgt_obj
557  * \retval              negative error not allowed to create the link file
558  */
559 static int mdd_link_sanity_check(const struct lu_env *env,
560                                  struct mdd_object *tgt_obj,
561                                  const struct lu_attr *tattr,
562                                  const struct lu_name *lname,
563                                  struct mdd_object *src_obj,
564                                  const struct lu_attr *cattr)
565 {
566         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
567         int rc = 0;
568         ENTRY;
569
570         if (!mdd_object_exists(src_obj))
571                 RETURN(-ENOENT);
572
573         if (mdd_is_dead_obj(src_obj))
574                 RETURN(-ESTALE);
575
576         /* Local ops, no lookup before link, check filename length here. */
577         rc = mdd_name_check(m, lname);
578         if (rc < 0)
579                 RETURN(rc);
580
581         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
582                 RETURN(-EPERM);
583
584         if (S_ISDIR(mdd_object_type(src_obj)))
585                 RETURN(-EPERM);
586
587         LASSERT(src_obj != tgt_obj);
588         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
589         if (rc != 0)
590                 RETURN(rc);
591
592         rc = __mdd_may_link(env, src_obj, cattr);
593
594         RETURN(rc);
595 }
596
597 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
598                                    const char *name, struct thandle *handle)
599 {
600         struct dt_object *next = mdd_object_child(pobj);
601         int rc;
602         ENTRY;
603
604         if (dt_try_as_dir(env, next))
605                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
606         else
607                 rc = -ENOTDIR;
608
609         RETURN(rc);
610 }
611
612 static int __mdd_index_insert_only(const struct lu_env *env,
613                                    struct mdd_object *pobj,
614                                    const struct lu_fid *lf, __u32 type,
615                                    const char *name,
616                                    struct thandle *handle)
617 {
618         struct dt_object *next = mdd_object_child(pobj);
619         int               rc;
620         ENTRY;
621
622         if (dt_try_as_dir(env, next)) {
623                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
624                 struct lu_ucred         *uc  = lu_ucred_check(env);
625                 int                      ignore_quota;
626
627                 rec->rec_fid = lf;
628                 rec->rec_type = type;
629                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
630                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
631                                (const struct dt_key *)name, handle,
632                                ignore_quota);
633         } else {
634                 rc = -ENOTDIR;
635         }
636         RETURN(rc);
637 }
638
639 /* insert named index, add reference if isdir */
640 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
641                               const struct lu_fid *lf, __u32 type,
642                               const char *name, struct thandle *handle)
643 {
644         int rc;
645         ENTRY;
646
647         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
648         if (rc == 0 && S_ISDIR(type)) {
649                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
650                 mdo_ref_add(env, pobj, handle);
651                 mdd_write_unlock(env, pobj);
652         }
653
654         RETURN(rc);
655 }
656
657 /* delete named index, drop reference if isdir */
658 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
659                               const char *name, int is_dir,
660                               struct thandle *handle)
661 {
662         int               rc;
663         ENTRY;
664
665         rc = __mdd_index_delete_only(env, pobj, name, handle);
666         if (rc == 0 && is_dir) {
667                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
668                 mdo_ref_del(env, pobj, handle);
669                 mdd_write_unlock(env, pobj);
670         }
671
672         RETURN(rc);
673 }
674
675 static int mdd_llog_record_calc_size(const struct lu_env *env,
676                                      const struct lu_name *tname,
677                                      const struct lu_name *sname)
678 {
679         const struct lu_ucred   *uc = lu_ucred(env);
680         enum changelog_rec_flags crf = 0;
681         size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
682                                             sizeof(struct changelog_rec);
683
684         if (sname != NULL)
685                 crf |= CLF_RENAME;
686
687         if (uc != NULL && uc->uc_jobid[0] != '\0')
688                 crf |= CLF_JOBID;
689
690         return llog_data_len(hdr_size + changelog_rec_offset(crf) +
691                              (tname != NULL ? tname->ln_namelen : 0) +
692                              (sname != NULL ? 1 + sname->ln_namelen : 0));
693 }
694
695 int mdd_declare_changelog_store(const struct lu_env *env,
696                                 struct mdd_device *mdd,
697                                 const struct lu_name *tname,
698                                 const struct lu_name *sname,
699                                 struct thandle *handle)
700 {
701         struct obd_device               *obd = mdd2obd_dev(mdd);
702         struct llog_ctxt                *ctxt;
703         struct llog_changelog_rec       *rec;
704         struct lu_buf                   *buf;
705         struct thandle                  *llog_th;
706         int                              reclen;
707         int                              rc;
708
709         /* Not recording */
710         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
711                 return 0;
712
713         reclen = mdd_llog_record_calc_size(env, tname, sname);
714         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
715         if (buf->lb_buf == NULL)
716                 return -ENOMEM;
717
718         rec = buf->lb_buf;
719         rec->cr_hdr.lrh_len = reclen;
720         rec->cr_hdr.lrh_type = CHANGELOG_REC;
721
722         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
723         if (ctxt == NULL)
724                 return -ENXIO;
725
726         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
727         if (IS_ERR(llog_th))
728                 GOTO(out_put, rc = PTR_ERR(llog_th));
729
730         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
731
732 out_put:
733         llog_ctxt_put(ctxt);
734
735         return rc;
736 }
737
738 /** Add a changelog entry \a rec to the changelog llog
739  * \param mdd
740  * \param rec
741  * \param handle - currently ignored since llogs start their own transaction;
742  *              this will hopefully be fixed in llog rewrite
743  * \retval 0 ok
744  */
745 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
746                         struct llog_changelog_rec *rec, struct thandle *th)
747 {
748         struct obd_device       *obd = mdd2obd_dev(mdd);
749         struct llog_ctxt        *ctxt;
750         struct thandle          *llog_th;
751         int                      rc;
752
753         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
754                                             changelog_rec_varsize(&rec->cr));
755
756         /* llog_lvfs_write_rec sets the llog tail len */
757         rec->cr_hdr.lrh_type = CHANGELOG_REC;
758         rec->cr.cr_time = cl_time();
759
760         spin_lock(&mdd->mdd_cl.mc_lock);
761         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
762          * but as long as the MDD transactions are ordered correctly for e.g.
763          * rename conflicts, I don't think this should matter. */
764         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
765         spin_unlock(&mdd->mdd_cl.mc_lock);
766
767         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
768         if (ctxt == NULL)
769                 return -ENXIO;
770
771         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
772         if (IS_ERR(llog_th))
773                 GOTO(out_put, rc = PTR_ERR(llog_th));
774
775         /* nested journal transaction */
776         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
777
778 out_put:
779         llog_ctxt_put(ctxt);
780         if (rc > 0)
781                 rc = 0;
782         return rc;
783 }
784
785 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
786                                          const struct lu_fid *sfid,
787                                          const struct lu_fid *spfid,
788                                          const struct lu_name *sname)
789 {
790         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
791         size_t                           extsize = sname->ln_namelen + 1;
792
793         LASSERT(sfid != NULL);
794         LASSERT(spfid != NULL);
795         LASSERT(sname != NULL);
796
797         rnm->cr_sfid = *sfid;
798         rnm->cr_spfid = *spfid;
799
800         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
801         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
802         rec->cr_namelen += extsize;
803 }
804
805 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
806 {
807         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
808
809         if (jobid == NULL || jobid[0] == '\0')
810                 return;
811
812         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
813 }
814
815 /** Store a namespace change changelog record
816  * If this fails, we must fail the whole transaction; we don't
817  * want the change to commit without the log entry.
818  * \param target - mdd_object of change
819  * \param tpfid - target parent dir/object fid
820  * \param sfid - source object fid
821  * \param spfid - source parent fid
822  * \param tname - target name string
823  * \param sname - source name string
824  * \param handle - transaction handle
825  */
826 int mdd_changelog_ns_store(const struct lu_env *env,
827                            struct mdd_device *mdd,
828                            enum changelog_rec_type type,
829                            enum changelog_rec_flags crf,
830                            struct mdd_object *target,
831                            const struct lu_fid *tpfid,
832                            const struct lu_fid *sfid,
833                            const struct lu_fid *spfid,
834                            const struct lu_name *tname,
835                            const struct lu_name *sname,
836                            struct thandle *handle)
837 {
838         const struct lu_ucred           *uc = lu_ucred(env);
839         struct llog_changelog_rec       *rec;
840         struct lu_buf                   *buf;
841         int                              reclen;
842         int                              rc;
843         ENTRY;
844
845         /* Not recording */
846         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
847                 RETURN(0);
848
849         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
850                 RETURN(0);
851
852         LASSERT(tpfid != NULL);
853         LASSERT(tname != NULL);
854         LASSERT(handle != NULL);
855
856         reclen = mdd_llog_record_calc_size(env, tname, sname);
857         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
858         if (buf->lb_buf == NULL)
859                 RETURN(-ENOMEM);
860         rec = buf->lb_buf;
861
862         crf &= CLF_FLAGMASK;
863
864         if (uc != NULL && uc->uc_jobid[0] != '\0')
865                 crf |= CLF_JOBID;
866
867         if (sname != NULL)
868                 crf |= CLF_RENAME;
869         else
870                 crf |= CLF_VERSION;
871
872         rec->cr.cr_flags = crf;
873         rec->cr.cr_type = (__u32)type;
874         rec->cr.cr_pfid = *tpfid;
875         rec->cr.cr_namelen = tname->ln_namelen;
876         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
877
878         if (crf & CLF_RENAME)
879                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
880
881         if (crf & CLF_JOBID)
882                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
883
884         if (likely(target != NULL)) {
885                 rec->cr.cr_tfid = *mdo2fid(target);
886                 target->mod_cltime = cfs_time_current_64();
887         } else {
888                 fid_zero(&rec->cr.cr_tfid);
889         }
890
891         rc = mdd_changelog_store(env, mdd, rec, handle);
892         if (rc < 0) {
893                 CERROR("%s: cannot store changelog record: type = %d, "
894                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
895                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
896                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
897                 return -EFAULT;
898         }
899
900         return 0;
901 }
902
903 static int __mdd_links_add(const struct lu_env *env,
904                            struct mdd_object *mdd_obj,
905                            struct linkea_data *ldata,
906                            const struct lu_name *lname,
907                            const struct lu_fid *pfid,
908                            int first, int check)
909 {
910         int rc;
911
912         if (ldata->ld_leh == NULL) {
913                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
914                 if (rc) {
915                         if (rc != -ENODATA)
916                                 return rc;
917                         rc = linkea_data_new(ldata,
918                                              &mdd_env_info(env)->mti_link_buf);
919                         if (rc)
920                                 return rc;
921                 }
922         }
923
924         if (check) {
925                 rc = linkea_links_find(ldata, lname, pfid);
926                 if (rc && rc != -ENOENT)
927                         return rc;
928                 if (rc == 0)
929                         return -EEXIST;
930         }
931
932         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
933                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
934
935                 *tfid = *pfid;
936                 tfid->f_ver = ~0;
937                 linkea_add_buf(ldata, lname, tfid);
938         }
939
940         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
941                 linkea_add_buf(ldata, lname, pfid);
942
943         return linkea_add_buf(ldata, lname, pfid);
944 }
945
946 static int __mdd_links_del(const struct lu_env *env,
947                            struct mdd_object *mdd_obj,
948                            struct linkea_data *ldata,
949                            const struct lu_name *lname,
950                            const struct lu_fid *pfid)
951 {
952         int rc;
953
954         if (ldata->ld_leh == NULL) {
955                 rc = mdd_links_read(env, mdd_obj, ldata);
956                 if (rc)
957                         return rc;
958         }
959
960         rc = linkea_links_find(ldata, lname, pfid);
961         if (rc)
962                 return rc;
963
964         linkea_del_buf(ldata, lname);
965         return 0;
966 }
967
968 static int mdd_linkea_prepare(const struct lu_env *env,
969                               struct mdd_object *mdd_obj,
970                               const struct lu_fid *oldpfid,
971                               const struct lu_name *oldlname,
972                               const struct lu_fid *newpfid,
973                               const struct lu_name *newlname,
974                               int first, int check,
975                               struct linkea_data *ldata)
976 {
977         int rc = 0;
978         int rc2 = 0;
979         ENTRY;
980
981         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
982                 return 0;
983
984         LASSERT(oldpfid != NULL || newpfid != NULL);
985
986         if (mdd_obj->mod_flags & DEAD_OBJ) {
987                 /* Prevent linkea to be updated which is NOT necessary. */
988                 ldata->ld_reclen = 0;
989                 /* No more links, don't bother */
990                 RETURN(0);
991         }
992
993         if (oldpfid != NULL) {
994                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
995                 if (rc) {
996                         if ((check == 1) ||
997                             (rc != -ENODATA && rc != -ENOENT))
998                                 RETURN(rc);
999                         /* No changes done. */
1000                         rc = 0;
1001                 }
1002         }
1003
1004         /* If renaming, add the new record */
1005         if (newpfid != NULL) {
1006                 /* even if the add fails, we still delete the out-of-date
1007                  * old link */
1008                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1009                                       first, check);
1010         }
1011
1012         rc = rc != 0 ? rc : rc2;
1013
1014         RETURN(rc);
1015 }
1016
1017 int mdd_links_rename(const struct lu_env *env,
1018                      struct mdd_object *mdd_obj,
1019                      const struct lu_fid *oldpfid,
1020                      const struct lu_name *oldlname,
1021                      const struct lu_fid *newpfid,
1022                      const struct lu_name *newlname,
1023                      struct thandle *handle,
1024                      struct linkea_data *ldata,
1025                      int first, int check)
1026 {
1027         int rc = 0;
1028         ENTRY;
1029
1030         if (ldata == NULL) {
1031                 ldata = &mdd_env_info(env)->mti_link_data;
1032                 memset(ldata, 0, sizeof(*ldata));
1033                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1034                                         newpfid, newlname, first, check,
1035                                         ldata);
1036                 if (rc != 0)
1037                         GOTO(out, rc);
1038         }
1039
1040         if (ldata->ld_reclen != 0)
1041                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1042         EXIT;
1043 out:
1044         if (rc != 0) {
1045                 int error = 1;
1046                 if (rc == -EOVERFLOW || rc == -ENOSPC)
1047                         error = 0;
1048                 if (newlname == NULL)
1049                         CDEBUG(error ? D_ERROR : D_OTHER,
1050                                "link_ea add failed %d "DFID"\n",
1051                                rc, PFID(mdd_object_fid(mdd_obj)));
1052                 else if (oldpfid == NULL)
1053                         CDEBUG(error ? D_ERROR : D_OTHER,
1054                                "link_ea add '%.*s' failed %d "DFID"\n",
1055                                newlname->ln_namelen, newlname->ln_name,
1056                                rc, PFID(mdd_object_fid(mdd_obj)));
1057                 else if (newpfid == NULL)
1058                         CDEBUG(error ? D_ERROR : D_OTHER,
1059                                "link_ea del '%.*s' failed %d "DFID"\n",
1060                                oldlname->ln_namelen, oldlname->ln_name,
1061                                rc, PFID(mdd_object_fid(mdd_obj)));
1062                 else
1063                         CDEBUG(error ? D_ERROR : D_OTHER,
1064                                "link_ea rename '%.*s'->'%.*s' failed %d "
1065                                DFID"\n",
1066                                oldlname->ln_namelen, oldlname->ln_name,
1067                                newlname->ln_namelen, newlname->ln_name,
1068                                rc, PFID(mdd_object_fid(mdd_obj)));
1069         }
1070
1071         if (is_vmalloc_addr(ldata->ld_buf))
1072                 /* if we vmalloced a large buffer drop it */
1073                 lu_buf_free(ldata->ld_buf);
1074
1075         return rc;
1076 }
1077
1078 static inline int mdd_links_add(const struct lu_env *env,
1079                                 struct mdd_object *mdd_obj,
1080                                 const struct lu_fid *pfid,
1081                                 const struct lu_name *lname,
1082                                 struct thandle *handle,
1083                                 struct linkea_data *ldata, int first)
1084 {
1085         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1086                                 pfid, lname, handle, ldata, first, 0);
1087 }
1088
1089 static inline int mdd_links_del(const struct lu_env *env,
1090                                 struct mdd_object *mdd_obj,
1091                                 const struct lu_fid *pfid,
1092                                 const struct lu_name *lname,
1093                                 struct thandle *handle)
1094 {
1095         return mdd_links_rename(env, mdd_obj, pfid, lname,
1096                                 NULL, NULL, handle, NULL, 0, 0);
1097 }
1098
1099 /** Read the link EA into a temp buffer.
1100  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1101  * A pointer to the buffer is stored in \a ldata::ld_buf.
1102  *
1103  * \retval 0 or error
1104  */
1105 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1106                    struct linkea_data *ldata)
1107 {
1108         int rc;
1109
1110         if (!mdd_object_exists(mdd_obj))
1111                 return -ENODATA;
1112
1113         /* First try a small buf */
1114         LASSERT(env != NULL);
1115         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1116                                                PAGE_CACHE_SIZE);
1117         if (ldata->ld_buf->lb_buf == NULL)
1118                 return -ENOMEM;
1119
1120         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
1121         if (rc == -ERANGE) {
1122                 /* Buf was too small, figure out what we need. */
1123                 lu_buf_free(ldata->ld_buf);
1124                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1125                                    XATTR_NAME_LINK);
1126                 if (rc < 0)
1127                         return rc;
1128                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1129                 if (ldata->ld_buf->lb_buf == NULL)
1130                         return -ENOMEM;
1131                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1132                                   XATTR_NAME_LINK);
1133         }
1134         if (rc < 0) {
1135                 lu_buf_free(ldata->ld_buf);
1136                 ldata->ld_buf = NULL;
1137                 return rc;
1138         }
1139
1140         return linkea_init(ldata);
1141 }
1142
1143 /** Read the link EA into a temp buffer.
1144  * Uses the name_buf since it is generally large.
1145  * \retval IS_ERR err
1146  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1147  */
1148 struct lu_buf *mdd_links_get(const struct lu_env *env,
1149                              struct mdd_object *mdd_obj)
1150 {
1151         struct linkea_data ldata = { NULL };
1152         int rc;
1153
1154         rc = mdd_links_read(env, mdd_obj, &ldata);
1155         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1156 }
1157
1158 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1159                     struct linkea_data *ldata, struct thandle *handle)
1160 {
1161         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1162                                                      ldata->ld_leh->leh_len);
1163         int                 rc;
1164
1165         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1166                 return 0;
1167
1168         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1169         if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
1170             mdd_object_remote(mdd_obj) == 0) {
1171                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1172                 struct thandle  *sub_th;
1173
1174                 /* XXX: If the linkEA is overflow, then we need to notify the
1175                  *      namespace LFSCK to skip "nlink" attribute verification
1176                  *      on this object to avoid the "nlink" to be shrinked by
1177                  *      wrong. It may be not good an interaction with LFSCK
1178                  *      like this. We will consider to replace it with other
1179                  *      mechanism in future. LU-5802. */
1180                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
1181                                LFSCK_TYPE_NAMESPACE);
1182
1183                 sub_th = thandle_get_sub_by_dt(env, handle,
1184                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1185                 lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1186                                 lr, sub_th);
1187         }
1188
1189         return rc;
1190 }
1191
1192 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1193                           struct thandle *handle, struct linkea_data *ldata,
1194                           enum mdd_links_add_overflow overflow)
1195 {
1196         int     rc;
1197         int     ea_len;
1198         void    *linkea;
1199
1200         if (ldata != NULL && ldata->ld_leh != NULL) {
1201                 ea_len = ldata->ld_leh->leh_len;
1202                 linkea = ldata->ld_buf->lb_buf;
1203         } else {
1204                 ea_len = DEFAULT_LINKEA_SIZE;
1205                 linkea = NULL;
1206         }
1207
1208         /* XXX: max size? */
1209         rc = mdo_declare_xattr_set(env, mdd_obj,
1210                                    mdd_buf_get_const(env, linkea, ea_len),
1211                                    XATTR_NAME_LINK, 0, handle);
1212         if (rc != 0)
1213                 return rc;
1214
1215         if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
1216                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1217                 struct thandle  *sub_th;
1218
1219                 /* XXX: If the linkEA is overflow, then we need to notify the
1220                  *      namespace LFSCK to skip "nlink" attribute verification
1221                  *      on this object to avoid the "nlink" to be shrinked by
1222                  *      wrong. It may be not good an interaction with LFSCK
1223                  *      like this. We will consider to replace it with other
1224                  *      mechanism in future. LU-5802. */
1225                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
1226                                LFSCK_TYPE_NAMESPACE);
1227
1228                 sub_th = thandle_get_sub_by_dt(env, handle,
1229                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1230                 rc = lfsck_in_notify(env,
1231                                      mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1232                                      lr, sub_th);
1233         }
1234
1235         return rc;
1236 }
1237
1238 static inline int mdd_declare_links_del(const struct lu_env *env,
1239                                         struct mdd_object *c,
1240                                         struct thandle *handle)
1241 {
1242         int rc = 0;
1243
1244         /* For directory, the linkEA will be removed together
1245          * with the object. */
1246         if (!S_ISDIR(mdd_object_type(c)))
1247                 rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
1248
1249         return rc;
1250 }
1251
1252 static int mdd_declare_link(const struct lu_env *env,
1253                             struct mdd_device *mdd,
1254                             struct mdd_object *p,
1255                             struct mdd_object *c,
1256                             const struct lu_name *name,
1257                             struct thandle *handle,
1258                             struct lu_attr *la,
1259                             struct linkea_data *data)
1260 {
1261         int rc;
1262
1263         rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
1264                                       name->ln_name, handle);
1265         if (rc != 0)
1266                 return rc;
1267
1268         rc = mdo_declare_ref_add(env, c, handle);
1269         if (rc != 0)
1270                 return rc;
1271
1272         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1273                 rc = mdo_declare_ref_add(env, c, handle);
1274                 if (rc != 0)
1275                         return rc;
1276         }
1277
1278         la->la_valid = LA_CTIME | LA_MTIME;
1279         rc = mdo_declare_attr_set(env, p, la, handle);
1280         if (rc != 0)
1281                 return rc;
1282
1283         la->la_valid = LA_CTIME;
1284         rc = mdo_declare_attr_set(env, c, la, handle);
1285         if (rc != 0)
1286                 return rc;
1287
1288         rc = mdd_declare_links_add(env, c, handle, data,
1289                         S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
1290         if (rc != 0)
1291                 return rc;
1292
1293         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1294
1295         return rc;
1296 }
1297
1298 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1299                     struct md_object *src_obj, const struct lu_name *lname,
1300                     struct md_attr *ma)
1301 {
1302         const char *name = lname->ln_name;
1303         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1304         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1305         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1306         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1307         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1308         struct mdd_device *mdd = mdo2mdd(src_obj);
1309         struct thandle *handle;
1310         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1311         int rc;
1312         ENTRY;
1313
1314         rc = mdd_la_get(env, mdd_sobj, cattr);
1315         if (rc != 0)
1316                 RETURN(rc);
1317
1318         rc = mdd_la_get(env, mdd_tobj, tattr);
1319         if (rc != 0)
1320                 RETURN(rc);
1321
1322         handle = mdd_trans_create(env, mdd);
1323         if (IS_ERR(handle))
1324                 GOTO(out_pending, rc = PTR_ERR(handle));
1325
1326         memset(ldata, 0, sizeof(*ldata));
1327
1328         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1329         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1330
1331         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1332                               la, ldata);
1333         if (rc)
1334                 GOTO(stop, rc);
1335
1336         rc = mdd_trans_start(env, mdd, handle);
1337         if (rc)
1338                 GOTO(stop, rc);
1339
1340         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1341         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1342                                    cattr);
1343         if (rc)
1344                 GOTO(out_unlock, rc);
1345
1346         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1347                 rc = mdo_ref_add(env, mdd_sobj, handle);
1348                 if (rc != 0)
1349                         GOTO(out_unlock, rc);
1350         }
1351
1352         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1353                 rc = mdo_ref_add(env, mdd_sobj, handle);
1354                 if (rc != 0)
1355                         GOTO(out_unlock, rc);
1356         }
1357
1358         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
1359                 struct lu_fid tfid = *mdo2fid(mdd_sobj);
1360
1361                 tfid.f_oid++;
1362                 rc = __mdd_index_insert_only(env, mdd_tobj, &tfid,
1363                                              mdd_object_type(mdd_sobj),
1364                                              name, handle);
1365         } else {
1366                 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1367                                              mdd_object_type(mdd_sobj),
1368                                              name, handle);
1369         }
1370
1371         if (rc != 0) {
1372                 mdo_ref_del(env, mdd_sobj, handle);
1373                 GOTO(out_unlock, rc);
1374         }
1375
1376         la->la_valid = LA_CTIME | LA_MTIME;
1377         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1378         if (rc)
1379                 GOTO(out_unlock, rc);
1380
1381         la->la_valid = LA_CTIME;
1382         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1383         if (rc == 0) {
1384                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1385                                         mdo2fid(mdd_tobj), lname, 0, 0,
1386                                         ldata);
1387                 if (rc == 0)
1388                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1389                                       lname, handle, ldata, 0);
1390                 /* The failure of links_add should not cause the link
1391                  * failure, reset rc here */
1392                 rc = 0;
1393         }
1394         EXIT;
1395 out_unlock:
1396         mdd_write_unlock(env, mdd_sobj);
1397         if (rc == 0)
1398                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1399                                             mdo2fid(mdd_tobj), NULL, NULL,
1400                                             lname, NULL, handle);
1401 stop:
1402         mdd_trans_stop(env, mdd, rc, handle);
1403
1404         if (is_vmalloc_addr(ldata->ld_buf))
1405                 /* if we vmalloced a large buffer drop it */
1406                 lu_buf_free(ldata->ld_buf);
1407 out_pending:
1408         return rc;
1409 }
1410
1411 static int mdd_mark_dead_object(const struct lu_env *env,
1412                                 struct mdd_object *obj, struct thandle *handle,
1413                                 bool declare)
1414 {
1415         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1416         int rc;
1417
1418         if (!declare)
1419                 obj->mod_flags |= DEAD_OBJ;
1420
1421         if (!S_ISDIR(mdd_object_type(obj)))
1422                 return 0;
1423
1424         attr->la_valid = LA_FLAGS;
1425         attr->la_flags = LUSTRE_SLAVE_DEAD_FL;
1426
1427         if (declare)
1428                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1429         else
1430                 rc = mdo_attr_set(env, obj, attr, handle);
1431
1432         return rc;
1433 }
1434
1435 static int mdd_declare_finish_unlink(const struct lu_env *env,
1436                                      struct mdd_object *obj,
1437                                      struct thandle *handle)
1438 {
1439         int     rc;
1440
1441         rc = mdd_mark_dead_object(env, obj, handle, true);
1442         if (rc != 0)
1443                 return rc;
1444
1445         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1446         if (rc != 0)
1447                 return rc;
1448
1449         rc = mdo_declare_destroy(env, obj, handle);
1450         if (rc != 0)
1451                 return rc;
1452
1453         return mdd_declare_links_del(env, obj, handle);
1454 }
1455
1456 /* caller should take a lock before calling */
1457 int mdd_finish_unlink(const struct lu_env *env,
1458                       struct mdd_object *obj, struct md_attr *ma,
1459                       const struct mdd_object *pobj,
1460                       const struct lu_name *lname,
1461                       struct thandle *th)
1462 {
1463         int rc = 0;
1464         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1465         ENTRY;
1466
1467         LASSERT(mdd_write_locked(env, obj) != 0);
1468
1469         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1470                 rc = mdd_mark_dead_object(env, obj, th, false);
1471                 if (rc != 0)
1472                         RETURN(rc);
1473
1474                 /* add new orphan and the object
1475                  * will be deleted during mdd_close() */
1476                 if (obj->mod_count) {
1477                         rc = __mdd_orphan_add(env, obj, th);
1478                         if (rc == 0)
1479                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1480                                         "orphan list, open count = %d\n",
1481                                         PFID(mdd_object_fid(obj)),
1482                                         obj->mod_count);
1483                         else
1484                                 CERROR("Object "DFID" fail to be an orphan, "
1485                                        "open count = %d, maybe cause failed "
1486                                        "open replay\n",
1487                                         PFID(mdd_object_fid(obj)),
1488                                         obj->mod_count);
1489                 } else {
1490                         rc = mdo_destroy(env, obj, th);
1491                 }
1492         } else if (!is_dir) {
1493                 /* old files may not have link ea; ignore errors */
1494                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1495         }
1496
1497         RETURN(rc);
1498 }
1499
1500 /*
1501  * pobj maybe NULL
1502  * has mdd_write_lock on cobj already, but not on pobj yet
1503  */
1504 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1505                             const struct lu_attr *pattr,
1506                             struct mdd_object *cobj,
1507                             const struct lu_attr *cattr)
1508 {
1509         int rc;
1510         ENTRY;
1511
1512         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1513
1514         RETURN(rc);
1515 }
1516
1517 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1518                               struct mdd_object *p, struct mdd_object *c,
1519                               const struct lu_name *name, struct md_attr *ma,
1520                               struct thandle *handle, int no_name, int is_dir)
1521 {
1522         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1523         int              rc;
1524
1525         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1526                 if (likely(no_name == 0)) {
1527                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1528                                                       handle);
1529                         if (rc != 0)
1530                                 return rc;
1531                 }
1532
1533                 if (is_dir != 0) {
1534                         rc = mdo_declare_ref_del(env, p, handle);
1535                         if (rc != 0)
1536                                 return rc;
1537                 }
1538         }
1539
1540         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1541         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1542         la->la_valid = LA_CTIME | LA_MTIME;
1543         rc = mdo_declare_attr_set(env, p, la, handle);
1544         if (rc)
1545                 return rc;
1546
1547         if (c != NULL) {
1548                 rc = mdo_declare_ref_del(env, c, handle);
1549                 if (rc)
1550                         return rc;
1551
1552                 rc = mdo_declare_ref_del(env, c, handle);
1553                 if (rc)
1554                         return rc;
1555
1556                 la->la_valid = LA_CTIME;
1557                 rc = mdo_declare_attr_set(env, c, la, handle);
1558                 if (rc)
1559                         return rc;
1560
1561                 rc = mdd_declare_finish_unlink(env, c, handle);
1562                 if (rc)
1563                         return rc;
1564
1565                 /* FIXME: need changelog for remove entry */
1566                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1567         }
1568
1569         return rc;
1570 }
1571
1572 /*
1573  * test if a file has an HSM archive
1574  * if HSM attributes are not found in ma update them from
1575  * HSM xattr
1576  */
1577 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1578                                    struct mdd_object *obj,
1579                                    struct md_attr *ma)
1580 {
1581         ENTRY;
1582
1583         if (!(ma->ma_valid & MA_HSM)) {
1584                 /* no HSM MD provided, read xattr */
1585                 struct lu_buf   *hsm_buf;
1586                 const size_t     buflen = sizeof(struct hsm_attrs);
1587                 int              rc;
1588
1589                 hsm_buf = mdd_buf_get(env, NULL, 0);
1590                 lu_buf_alloc(hsm_buf, buflen);
1591                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1592                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1593                 lu_buf_free(hsm_buf);
1594                 if (rc < 0)
1595                         RETURN(false);
1596
1597                 ma->ma_valid = MA_HSM;
1598         }
1599         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1600                 RETURN(true);
1601         RETURN(false);
1602 }
1603
1604 /**
1605  * Delete name entry and the object.
1606  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1607  * does not exist for this object, and it could only happen during resending
1608  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1609  * is also needed in this case(needed by changelog), so we have to add another
1610  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1611  * the ENOENT failure should be able to be fixed by redo mechanism.
1612  */
1613 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1614                       struct md_object *cobj, const struct lu_name *lname,
1615                       struct md_attr *ma, int no_name)
1616 {
1617         const char *name = lname->ln_name;
1618         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1619         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1620         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1621         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1622         struct mdd_object *mdd_cobj = NULL;
1623         struct mdd_device *mdd = mdo2mdd(pobj);
1624         struct thandle    *handle;
1625         int rc, is_dir = 0;
1626         ENTRY;
1627
1628         /* cobj == NULL means only delete name entry */
1629         if (likely(cobj != NULL)) {
1630                 mdd_cobj = md2mdd_obj(cobj);
1631                 if (mdd_object_exists(mdd_cobj) == 0)
1632                         RETURN(-ENOENT);
1633         }
1634
1635         rc = mdd_la_get(env, mdd_pobj, pattr);
1636         if (rc)
1637                 RETURN(rc);
1638
1639         if (likely(mdd_cobj != NULL)) {
1640                 /* fetch cattr */
1641                 rc = mdd_la_get(env, mdd_cobj, cattr);
1642                 if (rc)
1643                         RETURN(rc);
1644
1645                 is_dir = S_ISDIR(cattr->la_mode);
1646         }
1647
1648         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1649         if (rc)
1650                 RETURN(rc);
1651
1652         handle = mdd_trans_create(env, mdd);
1653         if (IS_ERR(handle))
1654                 RETURN(PTR_ERR(handle));
1655
1656         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1657                                 lname, ma, handle, no_name, is_dir);
1658         if (rc)
1659                 GOTO(stop, rc);
1660
1661         rc = mdd_trans_start(env, mdd, handle);
1662         if (rc)
1663                 GOTO(stop, rc);
1664
1665         if (likely(mdd_cobj != NULL))
1666                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1667
1668         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1669                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1670                 if (rc)
1671                         GOTO(cleanup, rc);
1672         }
1673
1674         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1675             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1676                 GOTO(cleanup, rc = 0);
1677
1678         if (likely(mdd_cobj != NULL)) {
1679                 rc = mdo_ref_del(env, mdd_cobj, handle);
1680                 if (rc != 0) {
1681                         __mdd_index_insert_only(env, mdd_pobj,
1682                                                 mdo2fid(mdd_cobj),
1683                                                 mdd_object_type(mdd_cobj),
1684                                                 name, handle);
1685                         GOTO(cleanup, rc);
1686                 }
1687
1688                 if (is_dir)
1689                         /* unlink dot */
1690                         mdo_ref_del(env, mdd_cobj, handle);
1691
1692                 /* fetch updated nlink */
1693                 rc = mdd_la_get(env, mdd_cobj, cattr);
1694                 if (rc)
1695                         GOTO(cleanup, rc);
1696         }
1697
1698         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1699         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1700
1701         la->la_valid = LA_CTIME | LA_MTIME;
1702         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1703         if (rc)
1704                 GOTO(cleanup, rc);
1705
1706         /* Enough for only unlink the entry */
1707         if (unlikely(mdd_cobj == NULL))
1708                 GOTO(stop, rc);
1709
1710         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1711                 /* update ctime of an unlinked file only if it is still
1712                  * opened or a link still exists */
1713                 la->la_valid = LA_CTIME;
1714                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1715                 if (rc)
1716                         GOTO(cleanup, rc);
1717         }
1718
1719         /* XXX: this transfer to ma will be removed with LOD/OSP */
1720         ma->ma_attr = *cattr;
1721         ma->ma_valid |= MA_INODE;
1722         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1723
1724         /* fetch updated nlink */
1725         if (rc == 0)
1726                 rc = mdd_la_get(env, mdd_cobj, cattr);
1727
1728         /* if object is removed then we can't get its attrs, use last get */
1729         if (cattr->la_nlink == 0) {
1730                 ma->ma_attr = *cattr;
1731                 ma->ma_valid |= MA_INODE;
1732         }
1733         EXIT;
1734 cleanup:
1735         if (likely(mdd_cobj != NULL))
1736                 mdd_write_unlock(env, mdd_cobj);
1737
1738         if (rc == 0) {
1739                 int cl_flags = 0;
1740
1741                 if (cattr->la_nlink == 0) {
1742                         cl_flags |= CLF_UNLINK_LAST;
1743                         /* search for an existing archive */
1744                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1745                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1746                 }
1747
1748                 rc = mdd_changelog_ns_store(env, mdd,
1749                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1750                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1751                         handle);
1752         }
1753
1754 stop:
1755         mdd_trans_stop(env, mdd, rc, handle);
1756
1757         return rc;
1758 }
1759
1760 /*
1761  * The permission has been checked when obj created, no need check again.
1762  */
1763 static int mdd_cd_sanity_check(const struct lu_env *env,
1764                                struct mdd_object *obj)
1765 {
1766         ENTRY;
1767
1768         /* EEXIST check */
1769         if (!obj || mdd_is_dead_obj(obj))
1770                 RETURN(-ENOENT);
1771
1772         RETURN(0);
1773 }
1774
1775 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1776                            struct md_object *cobj, const struct md_op_spec *spec,
1777                            struct md_attr *ma)
1778 {
1779         struct mdd_device *mdd = mdo2mdd(cobj);
1780         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1781         struct mdd_object *son = md2mdd_obj(cobj);
1782         struct thandle    *handle;
1783         const struct lu_buf *buf;
1784         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1785         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1786         int                rc;
1787         ENTRY;
1788
1789         rc = mdd_cd_sanity_check(env, son);
1790         if (rc)
1791                 RETURN(rc);
1792
1793         if (!md_should_create(spec->sp_cr_flags))
1794                 RETURN(0);
1795
1796         /*
1797          * there are following use cases for this function:
1798          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1799          *    striping can be specified or not
1800          * 2) CMD?
1801          */
1802         rc = mdd_la_get(env, son, attr);
1803         if (rc)
1804                 RETURN(rc);
1805
1806         /* calling ->ah_make_hint() is used to transfer information from parent */
1807         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1808
1809         handle = mdd_trans_create(env, mdd);
1810         if (IS_ERR(handle))
1811                 GOTO(out_free, rc = PTR_ERR(handle));
1812
1813         /*
1814          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1815          * Should this be fixed?
1816          */
1817         CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
1818                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1819                spec->sp_cr_flags, spec->no_create);
1820
1821         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1822                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1823                                         spec->u.sp_ea.eadatalen);
1824         } else {
1825                 buf = &LU_BUF_NULL;
1826         }
1827
1828         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1829                                   XATTR_NAME_LOV, 0, handle);
1830         if (rc)
1831                 GOTO(stop, rc);
1832
1833         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1834         if (rc)
1835                 GOTO(stop, rc);
1836
1837         rc = mdd_trans_start(env, mdd, handle);
1838         if (rc)
1839                 GOTO(stop, rc);
1840
1841         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1842                           0, handle);
1843
1844         if (rc)
1845                 GOTO(stop, rc);
1846
1847         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1848
1849 stop:
1850         mdd_trans_stop(env, mdd, rc, handle);
1851 out_free:
1852         RETURN(rc);
1853 }
1854
1855 static int mdd_declare_object_initialize(const struct lu_env *env,
1856                                          struct mdd_object *parent,
1857                                          struct mdd_object *child,
1858                                          struct lu_attr *attr,
1859                                          struct thandle *handle)
1860 {
1861         int rc;
1862         ENTRY;
1863
1864         /*
1865          * inode mode has been set in creation time, and it's based on umask,
1866          * la_mode and acl, don't set here again! (which will go wrong
1867          * because below function doesn't consider umask).
1868          * I'd suggest set all object attributes in creation time, see above.
1869          */
1870         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1871         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1872         rc = mdo_declare_attr_set(env, child, attr, handle);
1873         attr->la_valid |= LA_MODE | LA_TYPE;
1874         if (rc != 0 || !S_ISDIR(attr->la_mode))
1875                 RETURN(rc);
1876
1877         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1878                                       dot, handle);
1879         if (rc != 0)
1880                 RETURN(rc);
1881
1882         rc = mdo_declare_ref_add(env, child, handle);
1883         if (rc != 0)
1884                 RETURN(rc);
1885
1886         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1887                                       dotdot, handle);
1888
1889         RETURN(rc);
1890 }
1891
1892 static int mdd_object_initialize(const struct lu_env *env,
1893                                  const struct lu_fid *pfid,
1894                                  struct mdd_object *child,
1895                                  struct lu_attr *attr, struct thandle *handle,
1896                                  const struct md_op_spec *spec)
1897 {
1898         int rc = 0;
1899         ENTRY;
1900
1901         if (S_ISDIR(attr->la_mode)) {
1902                 /* Add "." and ".." for newly created dir */
1903                 mdo_ref_add(env, child, handle);
1904                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1905                                              S_IFDIR, dot, handle);
1906                 if (rc == 0)
1907                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1908                                                      dotdot, handle);
1909                 if (rc != 0)
1910                         mdo_ref_del(env, child, handle);
1911         }
1912
1913         RETURN(rc);
1914 }
1915
1916 /**
1917  * This function checks whether it can create a file/dir under the
1918  * directory(@pobj). The directory(@pobj) is not being locked by
1919  * mdd lock.
1920  *
1921  * \param[in] env       execution environment
1922  * \param[in] pobj      the directory to create files
1923  * \param[in] pattr     the attributes of the directory
1924  * \param[in] lname     the name of the created file/dir
1925  * \param[in] cattr     the attributes of the file/dir
1926  * \param[in] spec      create specification
1927  *
1928  * \retval              = 0 it is allowed to create file/dir under
1929  *                      the directory
1930  * \retval              negative error not allowed to create file/dir
1931  *                      under the directory
1932  */
1933 static int mdd_create_sanity_check(const struct lu_env *env,
1934                                    struct md_object *pobj,
1935                                    const struct lu_attr *pattr,
1936                                    const struct lu_name *lname,
1937                                    struct lu_attr *cattr,
1938                                    struct md_op_spec *spec)
1939 {
1940         struct mdd_thread_info *info = mdd_env_info(env);
1941         struct lu_fid     *fid       = &info->mti_fid;
1942         struct mdd_object *obj       = md2mdd_obj(pobj);
1943         struct mdd_device *m         = mdo2mdd(pobj);
1944         bool            check_perm = true;
1945         int rc;
1946         ENTRY;
1947
1948         /* EEXIST check */
1949         if (mdd_is_dead_obj(obj))
1950                 RETURN(-ENOENT);
1951
1952         /*
1953          * In some cases this lookup is not needed - we know before if name
1954          * exists or not because MDT performs lookup for it.
1955          * name length check is done in lookup.
1956          */
1957         if (spec->sp_cr_lookup) {
1958                 /*
1959                  * Check if the name already exist, though it will be checked in
1960                  * _index_insert also, for avoiding rolling back if exists
1961                  * _index_insert.
1962                  */
1963                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1964                                   MAY_WRITE | MAY_EXEC);
1965                 if (rc != -ENOENT)
1966                         RETURN(rc ? : -EEXIST);
1967
1968                 /* Permission is already being checked in mdd_lookup */
1969                 check_perm = false;
1970         }
1971
1972         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1973         if (rc != 0)
1974                 RETURN(rc);
1975
1976         /* sgid check */
1977         if (pattr->la_mode & S_ISGID) {
1978                 cattr->la_gid = pattr->la_gid;
1979                 if (S_ISDIR(cattr->la_mode)) {
1980                         cattr->la_mode |= S_ISGID;
1981                         cattr->la_valid |= LA_MODE;
1982                 }
1983         }
1984
1985         rc = mdd_name_check(m, lname);
1986         if (rc < 0)
1987                 RETURN(rc);
1988
1989         switch (cattr->la_mode & S_IFMT) {
1990         case S_IFLNK: {
1991                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1992
1993                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1994                         RETURN(-ENAMETOOLONG);
1995                 else
1996                         RETURN(0);
1997         }
1998         case S_IFDIR:
1999         case S_IFREG:
2000         case S_IFCHR:
2001         case S_IFBLK:
2002         case S_IFIFO:
2003         case S_IFSOCK:
2004                 rc = 0;
2005                 break;
2006         default:
2007                 rc = -EINVAL;
2008                 break;
2009         }
2010         RETURN(rc);
2011 }
2012
2013 static int mdd_declare_object_create(const struct lu_env *env,
2014                                      struct mdd_device *mdd,
2015                                      struct mdd_object *p, struct mdd_object *c,
2016                                      struct lu_attr *attr,
2017                                      struct thandle *handle,
2018                                      const struct md_op_spec *spec,
2019                                      struct lu_buf *def_acl_buf,
2020                                      struct lu_buf *acl_buf,
2021                                      struct dt_allocation_hint *hint)
2022 {
2023         int rc;
2024
2025         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
2026                                                 hint);
2027         if (rc)
2028                 GOTO(out, rc);
2029
2030 #ifdef CONFIG_FS_POSIX_ACL
2031         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2032                 /* if dir, then can inherit default ACl */
2033                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2034                                            XATTR_NAME_ACL_DEFAULT,
2035                                            0, handle);
2036                 if (rc)
2037                         GOTO(out, rc);
2038         }
2039
2040         if (acl_buf->lb_len > 0) {
2041                 rc = mdo_declare_attr_set(env, c, attr, handle);
2042                 if (rc)
2043                         GOTO(out, rc);
2044
2045                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2046                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2047                 if (rc)
2048                         GOTO(out, rc);
2049         }
2050 #endif
2051         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2052         if (rc)
2053                 GOTO(out, rc);
2054
2055         /* replay case, create LOV EA from client data */
2056         if (spec->no_create ||
2057             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2058                 const struct lu_buf *buf;
2059
2060                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2061                                         spec->u.sp_ea.eadatalen);
2062                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2063                                            handle);
2064                 if (rc)
2065                         GOTO(out, rc);
2066         }
2067
2068         if (S_ISLNK(attr->la_mode)) {
2069                 const char *target_name = spec->u.sp_symname;
2070                 int sym_len = strlen(target_name);
2071                 const struct lu_buf *buf;
2072
2073                 buf = mdd_buf_get_const(env, target_name, sym_len);
2074                 rc = dt_declare_record_write(env, mdd_object_child(c),
2075                                              buf, 0, handle);
2076                 if (rc)
2077                         GOTO(out, rc);
2078         }
2079 out:
2080         return rc;
2081 }
2082
2083 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2084                               struct mdd_object *p, struct mdd_object *c,
2085                               const struct lu_name *name,
2086                               struct lu_attr *attr,
2087                               struct thandle *handle,
2088                               const struct md_op_spec *spec,
2089                               struct linkea_data *ldata,
2090                               struct lu_buf *def_acl_buf,
2091                               struct lu_buf *acl_buf,
2092                               struct dt_allocation_hint *hint)
2093 {
2094         int rc;
2095
2096         rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
2097                                        def_acl_buf, acl_buf, hint);
2098         if (rc)
2099                 GOTO(out, rc);
2100
2101         if (S_ISDIR(attr->la_mode)) {
2102                 rc = mdo_declare_ref_add(env, p, handle);
2103                 if (rc)
2104                         GOTO(out, rc);
2105         }
2106
2107         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2108                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2109                 if (rc)
2110                         GOTO(out, rc);
2111         } else {
2112                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2113
2114                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2115                                               name->ln_name, handle);
2116                 if (rc != 0)
2117                         return rc;
2118
2119                 rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
2120                 if (rc)
2121                         return rc;
2122
2123                 *la = *attr;
2124                 la->la_valid = LA_CTIME | LA_MTIME;
2125                 rc = mdo_declare_attr_set(env, p, la, handle);
2126                 if (rc)
2127                         return rc;
2128
2129                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2130                 if (rc)
2131                         return rc;
2132         }
2133 out:
2134         return rc;
2135 }
2136
2137 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2138                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2139                         struct lu_buf *acl_buf)
2140 {
2141         int     rc;
2142         ENTRY;
2143
2144         if (S_ISLNK(la->la_mode)) {
2145                 acl_buf->lb_len = 0;
2146                 def_acl_buf->lb_len = 0;
2147                 RETURN(0);
2148         }
2149
2150         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2151         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2152                            XATTR_NAME_ACL_DEFAULT);
2153         mdd_read_unlock(env, pobj);
2154         if (rc > 0) {
2155                 /* If there are default ACL, fix mode/ACL by default ACL */
2156                 def_acl_buf->lb_len = rc;
2157                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2158                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2159                 acl_buf->lb_len = rc;
2160                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2161                 if (rc < 0)
2162                         RETURN(rc);
2163         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2164                 /* If there are no default ACL, fix mode by mask */
2165                 struct lu_ucred *uc = lu_ucred(env);
2166
2167                 /* The create triggered by MDT internal events, such as
2168                  * LFSCK reset, will not contain valid "uc". */
2169                 if (unlikely(uc != NULL))
2170                         la->la_mode &= ~uc->uc_umask;
2171                 rc = 0;
2172                 acl_buf->lb_len = 0;
2173                 def_acl_buf->lb_len = 0;
2174         }
2175
2176         RETURN(rc);
2177 }
2178
2179 /**
2180  * Create a metadata object and initialize it, set acl, xattr.
2181  **/
2182 static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
2183                              struct mdd_object *son, struct lu_attr *attr,
2184                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2185                              struct lu_buf *def_acl_buf,
2186                              struct dt_allocation_hint *hint,
2187                              struct thandle *handle)
2188 {
2189         int                     rc;
2190
2191         mdd_write_lock(env, son, MOR_TGT_CHILD);
2192         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
2193                                         hint);
2194         if (rc)
2195                 GOTO(unlock, rc);
2196
2197         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2198          * created in declare phase, they also needs to be added to master
2199          * object as sub-directory entry. So it has to initialize the master
2200          * object, then set dir striped EA.(in mdo_xattr_set) */
2201         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2202                                    spec);
2203         if (rc != 0)
2204                 GOTO(err_destroy, rc);
2205
2206         /*
2207          * in case of replay we just set LOVEA provided by the client
2208          * XXX: I think it would be interesting to try "old" way where
2209          *      MDT calls this xattr_set(LOV) in a different transaction.
2210          *      probably this way we code can be made better.
2211          */
2212
2213         /* During creation, there are only a few cases we need do xattr_set to
2214          * create stripes.
2215          * 1. regular file: see comments above.
2216          * 2. dir: inherit default striping or pool settings from parent.
2217          * 3. create striped directory with provided stripeEA.
2218          * 4. create striped directory because inherit default layout from the
2219          * parent.
2220          */
2221         if (spec->no_create ||
2222             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2223             S_ISDIR(attr->la_mode)) {
2224                 const struct lu_buf *buf;
2225
2226                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2227                                         spec->u.sp_ea.eadatalen);
2228                 rc = mdo_xattr_set(env, son, buf,
2229                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2230                                                             XATTR_NAME_LOV, 0,
2231                                    handle);
2232                 if (rc != 0)
2233                         GOTO(err_destroy, rc);
2234         }
2235
2236 #ifdef CONFIG_FS_POSIX_ACL
2237         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2238             S_ISDIR(attr->la_mode)) {
2239                 /* set default acl */
2240                 rc = mdo_xattr_set(env, son, def_acl_buf,
2241                                    XATTR_NAME_ACL_DEFAULT, 0,
2242                                    handle);
2243                 if (rc)
2244                         GOTO(err_destroy, rc);
2245         }
2246         /* set its own acl */
2247         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2248                 rc = mdo_xattr_set(env, son, acl_buf,
2249                                    XATTR_NAME_ACL_ACCESS,
2250                                    0, handle);
2251                 if (rc)
2252                         GOTO(err_destroy, rc);
2253         }
2254 #endif
2255
2256         if (S_ISLNK(attr->la_mode)) {
2257                 struct lu_ucred  *uc = lu_ucred_assert(env);
2258                 struct dt_object *dt = mdd_object_child(son);
2259                 const char *target_name = spec->u.sp_symname;
2260                 int sym_len = strlen(target_name);
2261                 const struct lu_buf *buf;
2262                 loff_t pos = 0;
2263
2264                 buf = mdd_buf_get_const(env, target_name, sym_len);
2265                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2266                                                 uc->uc_cap &
2267                                                 CFS_CAP_SYS_RESOURCE_MASK);
2268
2269                 if (rc == sym_len)
2270                         rc = 0;
2271                 else
2272                         GOTO(err_initlized, rc = -EFAULT);
2273         }
2274
2275 err_initlized:
2276         if (unlikely(rc != 0)) {
2277                 int rc2;
2278                 if (S_ISDIR(attr->la_mode)) {
2279                         /* Drop the reference, no need to delete "."/"..",
2280                          * because the object to be destroied directly. */
2281                         rc2 = mdo_ref_del(env, son, handle);
2282                         if (rc2 != 0)
2283                                 GOTO(unlock, rc);
2284                 }
2285                 rc2 = mdo_ref_del(env, son, handle);
2286                 if (rc2 != 0)
2287                         GOTO(unlock, rc);
2288 err_destroy:
2289                 mdo_destroy(env, son, handle);
2290         }
2291 unlock:
2292         mdd_write_unlock(env, son);
2293         RETURN(rc);
2294 }
2295
2296 static int mdd_index_delete(const struct lu_env *env,
2297                             struct mdd_object *mdd_pobj,
2298                             struct lu_attr *cattr,
2299                             const struct lu_name *lname)
2300 {
2301         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2302         struct thandle *handle;
2303         int rc;
2304         ENTRY;
2305
2306         handle = mdd_trans_create(env, mdd);
2307         if (IS_ERR(handle))
2308                 RETURN(PTR_ERR(handle));
2309
2310         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2311                                       handle);
2312         if (rc != 0)
2313                 GOTO(stop, rc);
2314
2315         if (S_ISDIR(cattr->la_mode)) {
2316                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2317                 if (rc != 0)
2318                         GOTO(stop, rc);
2319         }
2320
2321         /* Since this will only be used in the error handler path,
2322          * Let's set the thandle to be local and not mess the transno */
2323         handle->th_local = 1;
2324         rc = mdd_trans_start(env, mdd, handle);
2325         if (rc)
2326                 GOTO(stop, rc);
2327
2328         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2329                                 S_ISDIR(cattr->la_mode), handle);
2330         if (rc)
2331                 GOTO(stop, rc);
2332 stop:
2333         mdd_trans_stop(env, mdd, rc, handle);
2334         RETURN(rc);
2335 }
2336
2337 /*
2338  * Create object and insert it into namespace.
2339  */
2340 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2341                       const struct lu_name *lname, struct md_object *child,
2342                       struct md_op_spec *spec, struct md_attr* ma)
2343 {
2344         struct mdd_thread_info  *info = mdd_env_info(env);
2345         struct lu_attr          *la = &info->mti_la_for_fix;
2346         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2347         struct mdd_object       *son = md2mdd_obj(child);
2348         struct mdd_device       *mdd = mdo2mdd(pobj);
2349         struct lu_attr          *attr = &ma->ma_attr;
2350         struct thandle          *handle;
2351         struct lu_attr          *pattr = &info->mti_pattr;
2352         struct lu_buf           acl_buf;
2353         struct lu_buf           def_acl_buf;
2354         struct linkea_data      *ldata = &info->mti_link_data;
2355         const char              *name = lname->ln_name;
2356         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2357         int                      rc;
2358         int                      rc2;
2359         ENTRY;
2360
2361         /*
2362          * Two operations have to be performed:
2363          *
2364          *  - an allocation of a new object (->do_create()), and
2365          *
2366          *  - an insertion into a parent index (->dio_insert()).
2367          *
2368          * Due to locking, operation order is not important, when both are
2369          * successful, *but* error handling cases are quite different:
2370          *
2371          *  - if insertion is done first, and following object creation fails,
2372          *  insertion has to be rolled back, but this operation might fail
2373          *  also leaving us with dangling index entry.
2374          *
2375          *  - if creation is done first, is has to be undone if insertion
2376          *  fails, leaving us with leaked space, which is neither good, nor
2377          *  fatal.
2378          *
2379          * It seems that creation-first is simplest solution, but it is
2380          * sub-optimal in the frequent
2381          *
2382          *         $ mkdir foo
2383          *         $ mkdir foo
2384          *
2385          * case, because second mkdir is bound to create object, only to
2386          * destroy it immediately.
2387          *
2388          * To avoid this follow local file systems that do double lookup:
2389          *
2390          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2391          *
2392          *     1. create            (mdd_object_create_internal())
2393          *
2394          *     2. insert            (__mdd_index_insert(), lookup again)
2395          */
2396
2397         rc = mdd_la_get(env, mdd_pobj, pattr);
2398         if (rc != 0)
2399                 RETURN(rc);
2400
2401         /* Sanity checks before big job. */
2402         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2403         if (rc)
2404                 RETURN(rc);
2405
2406         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2407                 GOTO(out_free, rc = -EINPROGRESS);
2408
2409         handle = mdd_trans_create(env, mdd);
2410         if (IS_ERR(handle))
2411                 GOTO(out_free, rc = PTR_ERR(handle));
2412
2413         acl_buf.lb_buf = info->mti_xattr_buf;
2414         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2415         def_acl_buf.lb_buf = info->mti_key;
2416         def_acl_buf.lb_len = sizeof(info->mti_key);
2417         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2418         if (rc < 0)
2419                 GOTO(out_stop, rc);
2420
2421         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2422
2423         memset(ldata, 0, sizeof(*ldata));
2424         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2425                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2426
2427                 tfid.f_oid--;
2428                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2429                                         &tfid, lname, 1, 0, ldata);
2430         } else {
2431                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2432                                         mdd_object_fid(mdd_pobj),
2433                                         lname, 1, 0, ldata);
2434         }
2435
2436         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2437                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2438                                 hint);
2439         if (rc)
2440                 GOTO(out_stop, rc);
2441
2442         rc = mdd_trans_start(env, mdd, handle);
2443         if (rc)
2444                 GOTO(out_stop, rc);
2445
2446         rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
2447                                &def_acl_buf, hint, handle);
2448         if (rc != 0)
2449                 GOTO(out_stop, rc);
2450
2451         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2452                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2453                 rc = __mdd_orphan_add(env, son, handle);
2454                 GOTO(out_volatile, rc);
2455         } else {
2456                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2457                                         attr->la_mode, name, handle);
2458                 if (rc != 0)
2459                         GOTO(err_created, rc);
2460
2461                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2462                               ldata, 1);
2463
2464                 /* update parent directory mtime/ctime */
2465                 *la = *attr;
2466                 la->la_valid = LA_CTIME | LA_MTIME;
2467                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2468                 if (rc)
2469                         GOTO(err_insert, rc);
2470         }
2471
2472         EXIT;
2473 err_insert:
2474         if (rc != 0) {
2475                 int rc2;
2476
2477                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2478                         rc2 = __mdd_orphan_del(env, son, handle);
2479                 else
2480                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2481                                                  S_ISDIR(attr->la_mode),
2482                                                  handle);
2483                 if (rc2 != 0)
2484                         goto out_stop;
2485
2486 err_created:
2487                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2488                 if (S_ISDIR(attr->la_mode)) {
2489                         /* Drop the reference, no need to delete "."/"..",
2490                          * because the object is to be destroyed directly. */
2491                         rc2 = mdo_ref_del(env, son, handle);
2492                         if (rc2 != 0) {
2493                                 mdd_write_unlock(env, son);
2494                                 goto out_stop;
2495                         }
2496                 }
2497 out_volatile:
2498                 /* For volatile files drop one link immediately, since there is
2499                  * no filename in the namespace, and save any error returned. */
2500                 rc2 = mdo_ref_del(env, son, handle);
2501                 if (rc2 != 0) {
2502                         mdd_write_unlock(env, son);
2503                         if (unlikely(rc == 0))
2504                                 rc = rc2;
2505                         goto out_stop;
2506                 }
2507
2508                 /* Don't destroy the volatile object on success */
2509                 if (likely(rc != 0))
2510                         mdo_destroy(env, son, handle);
2511                 mdd_write_unlock(env, son);
2512         }
2513
2514         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2515             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2516                 rc = mdd_changelog_ns_store(env, mdd,
2517                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2518                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2519                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2520                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2521                                 NULL, handle);
2522 out_stop:
2523         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2524         if (rc == 0) {
2525                 /* If creation fails, it is most likely due to the remote update
2526                  * failure, because local transaction will mostly succeed at
2527                  * this stage. There is no easy way to rollback all of previous
2528                  * updates, so let's remove the object from namespace, and
2529                  * LFSCK should handle the orphan object. */
2530                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2531                         mdd_index_delete(env, mdd_pobj, attr, lname);
2532                 rc = rc2;
2533         }
2534 out_free:
2535         if (is_vmalloc_addr(ldata->ld_buf))
2536                 /* if we vmalloced a large buffer drop it */
2537                 lu_buf_free(ldata->ld_buf);
2538
2539         /* The child object shouldn't be cached anymore */
2540         if (rc)
2541                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2542                         &child->mo_lu.lo_header->loh_flags);
2543         return rc;
2544 }
2545
2546 /*
2547  * Get locks on parents in proper order
2548  * RETURN: < 0 - error, rename_order if successful
2549  */
2550 enum rename_order {
2551         MDD_RN_SAME,
2552         MDD_RN_SRCTGT,
2553         MDD_RN_TGTSRC
2554 };
2555
2556 static int mdd_rename_order(const struct lu_env *env,
2557                             struct mdd_device *mdd,
2558                             struct mdd_object *src_pobj,
2559                             const struct lu_attr *pattr,
2560                             struct mdd_object *tgt_pobj)
2561 {
2562         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2563         int rc;
2564         ENTRY;
2565
2566         if (src_pobj == tgt_pobj)
2567                 RETURN(MDD_RN_SAME);
2568
2569         /* compared the parent child relationship of src_p&tgt_p */
2570         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2571                 rc = MDD_RN_SRCTGT;
2572         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2573                 rc = MDD_RN_TGTSRC;
2574         } else {
2575                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2576                                    NULL);
2577                 if (rc == -EREMOTE)
2578                         rc = 0;
2579
2580                 if (rc == 1)
2581                         rc = MDD_RN_TGTSRC;
2582                 else if (rc == 0)
2583                         rc = MDD_RN_SRCTGT;
2584         }
2585
2586         RETURN(rc);
2587 }
2588
2589 /* has not mdd_write{read}_lock on any obj yet. */
2590 static int mdd_rename_sanity_check(const struct lu_env *env,
2591                                    struct mdd_object *src_pobj,
2592                                    const struct lu_attr *pattr,
2593                                    struct mdd_object *tgt_pobj,
2594                                    const struct lu_attr *tpattr,
2595                                    struct mdd_object *sobj,
2596                                    const struct lu_attr *cattr,
2597                                    struct mdd_object *tobj,
2598                                    const struct lu_attr *tattr)
2599 {
2600         int rc = 0;
2601         ENTRY;
2602
2603         /* XXX: when get here, sobj must NOT be NULL,
2604          * the other case has been processed in cld_rename
2605          * before mdd_rename and enable MDS_PERM_BYPASS. */
2606         LASSERT(sobj);
2607
2608         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2609         if (rc)
2610                 RETURN(rc);
2611
2612         /* XXX: when get here, "tobj == NULL" means tobj must
2613          * NOT exist (neither on remote MDS, such case has been
2614          * processed in cld_rename before mdd_rename and enable
2615          * MDS_PERM_BYPASS).
2616          * So check may_create, but not check may_unlink. */
2617         if (tobj == NULL)
2618                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2619                                     (src_pobj != tgt_pobj));
2620         else
2621                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2622                                     (src_pobj != tgt_pobj), 1);
2623
2624         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2625                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2626
2627         RETURN(rc);
2628 }
2629
2630 static int mdd_declare_rename(const struct lu_env *env,
2631                               struct mdd_device *mdd,
2632                               struct mdd_object *mdd_spobj,
2633                               struct mdd_object *mdd_tpobj,
2634                               struct mdd_object *mdd_sobj,
2635                               struct mdd_object *mdd_tobj,
2636                               const struct lu_name *tname,
2637                               const struct lu_name *sname,
2638                               struct md_attr *ma,
2639                               struct linkea_data *ldata,
2640                               struct thandle *handle)
2641 {
2642         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2643         int rc;
2644
2645         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2646         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2647
2648         LASSERT(mdd_spobj);
2649         LASSERT(mdd_tpobj);
2650         LASSERT(mdd_sobj);
2651
2652         /* name from source dir */
2653         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2654         if (rc)
2655                 return rc;
2656
2657         /* .. from source child */
2658         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2659                 /* source child can be directory,
2660                  * counted by source dir's nlink */
2661                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2662                 if (rc)
2663                         return rc;
2664                 if (mdd_spobj != mdd_tpobj) {
2665                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2666                                                       handle);
2667                         if (rc != 0)
2668                                 return rc;
2669
2670                         rc = mdo_declare_index_insert(env, mdd_sobj,
2671                                                       mdo2fid(mdd_tpobj),
2672                                                       S_IFDIR, dotdot, handle);
2673                         if (rc != 0)
2674                                 return rc;
2675                 }
2676
2677                 /* new target child can be directory,
2678                  * counted by target dir's nlink */
2679                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2680                 if (rc != 0)
2681                         return rc;
2682         }
2683
2684         la->la_valid = LA_CTIME | LA_MTIME;
2685         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2686         if (rc != 0)
2687                 return rc;
2688
2689         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2690         if (rc != 0)
2691                 return rc;
2692
2693         la->la_valid = LA_CTIME;
2694         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2695         if (rc)
2696                 return rc;
2697
2698         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
2699                 S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
2700         if (rc)
2701                 return rc;
2702
2703         /* new name */
2704         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2705                                       mdd_object_type(mdd_sobj),
2706                                       tname->ln_name, handle);
2707         if (rc != 0)
2708                 return rc;
2709
2710         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2711                 /* delete target child in target parent directory */
2712                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2713                                               handle);
2714                 if (rc)
2715                         return rc;
2716
2717                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2718                 if (rc)
2719                         return rc;
2720
2721                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2722                         /* target child can be directory,
2723                          * delete "." reference in target child directory */
2724                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2725                         if (rc)
2726                                 return rc;
2727
2728                         /* delete ".." reference in target parent directory */
2729                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2730                         if (rc)
2731                                 return rc;
2732                 }
2733
2734                 la->la_valid = LA_CTIME;
2735                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2736                 if (rc)
2737                         return rc;
2738
2739                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2740                 if (rc)
2741                         return rc;
2742         }
2743
2744         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2745         if (rc)
2746                 return rc;
2747
2748         return rc;
2749 }
2750
2751 /* src object can be remote that is why we use only fid and type of object */
2752 static int mdd_rename(const struct lu_env *env,
2753                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2754                       const struct lu_fid *lf, const struct lu_name *lsname,
2755                       struct md_object *tobj, const struct lu_name *ltname,
2756                       struct md_attr *ma)
2757 {
2758         const char *sname = lsname->ln_name;
2759         const char *tname = ltname->ln_name;
2760         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2761         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2762         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2763         struct mdd_device *mdd = mdo2mdd(src_pobj);
2764         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2765         struct mdd_object *mdd_tobj = NULL;
2766         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2767         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2768         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2769         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2770         struct thandle *handle;
2771         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2772         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2773         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2774         bool is_dir;
2775         bool tobj_ref = 0;
2776         bool tobj_locked = 0;
2777         unsigned cl_flags = 0;
2778         int rc, rc2;
2779         ENTRY;
2780
2781         if (tobj)
2782                 mdd_tobj = md2mdd_obj(tobj);
2783
2784         mdd_sobj = mdd_object_find(env, mdd, lf);
2785         if (IS_ERR(mdd_sobj))
2786                 RETURN(PTR_ERR(mdd_sobj));
2787
2788         rc = mdd_la_get(env, mdd_sobj, cattr);
2789         if (rc)
2790                 GOTO(out_pending, rc);
2791
2792         rc = mdd_la_get(env, mdd_spobj, pattr);
2793         if (rc)
2794                 GOTO(out_pending, rc);
2795
2796         if (mdd_tobj) {
2797                 rc = mdd_la_get(env, mdd_tobj, tattr);
2798                 if (rc)
2799                         GOTO(out_pending, rc);
2800         }
2801
2802         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2803         if (rc)
2804                 GOTO(out_pending, rc);
2805
2806         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2807                                      mdd_sobj, cattr, mdd_tobj, tattr);
2808         if (rc)
2809                 GOTO(out_pending, rc);
2810
2811         rc = mdd_name_check(mdd, ltname);
2812         if (rc < 0)
2813                 GOTO(out_pending, rc);
2814
2815         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2816         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2817         if (rc < 0)
2818                 GOTO(out_pending, rc);
2819
2820         handle = mdd_trans_create(env, mdd);
2821         if (IS_ERR(handle))
2822                 GOTO(out_pending, rc = PTR_ERR(handle));
2823
2824         memset(ldata, 0, sizeof(*ldata));
2825         mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
2826                            mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
2827         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2828                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2829         if (rc)
2830                 GOTO(stop, rc);
2831
2832         rc = mdd_trans_start(env, mdd, handle);
2833         if (rc)
2834                 GOTO(stop, rc);
2835
2836         is_dir = S_ISDIR(cattr->la_mode);
2837
2838         /* Remove source name from source directory */
2839         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2840         if (rc != 0)
2841                 GOTO(stop, rc);
2842
2843         /* "mv dir1 dir2" needs "dir1/.." link update */
2844         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2845                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2846                 if (rc != 0)
2847                         GOTO(fixup_spobj2, rc);
2848
2849                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2850                                              dotdot, handle);
2851                 if (rc != 0)
2852                         GOTO(fixup_spobj, rc);
2853         }
2854
2855         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2856                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2857                 if (rc != 0)
2858                         /* tname might been renamed to something else */
2859                         GOTO(fixup_spobj, rc);
2860         }
2861
2862         /* Insert new fid with target name into target dir */
2863         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2864                                 tname, handle);
2865         if (rc != 0)
2866                 GOTO(fixup_tpobj, rc);
2867
2868         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2869         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2870
2871         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2872         la->la_valid = LA_CTIME;
2873         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2874         if (rc)
2875                 GOTO(fixup_tpobj, rc);
2876
2877         /* Update the linkEA for the source object */
2878         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2879         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2880                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
2881                               0, 0);
2882         if (rc == -ENOENT)
2883                 /* Old files might not have EA entry */
2884                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2885                               lsname, handle, NULL, 0);
2886         mdd_write_unlock(env, mdd_sobj);
2887         /* We don't fail the transaction if the link ea can't be
2888            updated -- fid2path will use alternate lookup method. */
2889         rc = 0;
2890
2891         /* Remove old target object
2892          * For tobj is remote case cmm layer has processed
2893          * and set tobj to NULL then. So when tobj is NOT NULL,
2894          * it must be local one.
2895          */
2896         if (tobj && mdd_object_exists(mdd_tobj)) {
2897                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2898                 tobj_locked = 1;
2899                 if (mdd_is_dead_obj(mdd_tobj)) {
2900                         /* shld not be dead, something is wrong */
2901                         CERROR("tobj is dead, something is wrong\n");
2902                         rc = -EINVAL;
2903                         goto cleanup;
2904                 }
2905                 mdo_ref_del(env, mdd_tobj, handle);
2906
2907                 /* Remove dot reference. */
2908                 if (S_ISDIR(tattr->la_mode))
2909                         mdo_ref_del(env, mdd_tobj, handle);
2910                 tobj_ref = 1;
2911
2912                 /* fetch updated nlink */
2913                 rc = mdd_la_get(env, mdd_tobj, tattr);
2914                 if (rc != 0) {
2915                         CERROR("%s: Failed to get nlink for tobj "
2916                                 DFID": rc = %d\n",
2917                                 mdd2obd_dev(mdd)->obd_name,
2918                                 PFID(tpobj_fid), rc);
2919                         GOTO(fixup_tpobj, rc);
2920                 }
2921
2922                 la->la_valid = LA_CTIME;
2923                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2924                 if (rc != 0) {
2925                         CERROR("%s: Failed to set ctime for tobj "
2926                                 DFID": rc = %d\n",
2927                                 mdd2obd_dev(mdd)->obd_name,
2928                                 PFID(tpobj_fid), rc);
2929                         GOTO(fixup_tpobj, rc);
2930                 }
2931
2932                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2933                 ma->ma_attr = *tattr;
2934                 ma->ma_valid |= MA_INODE;
2935                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2936                                        handle);
2937                 if (rc != 0) {
2938                         CERROR("%s: Failed to unlink tobj "
2939                                 DFID": rc = %d\n",
2940                                 mdd2obd_dev(mdd)->obd_name,
2941                                 PFID(tpobj_fid), rc);
2942                         GOTO(fixup_tpobj, rc);
2943                 }
2944
2945                 /* fetch updated nlink */
2946                 rc = mdd_la_get(env, mdd_tobj, tattr);
2947                 if (rc != 0) {
2948                         CERROR("%s: Failed to get nlink for tobj "
2949                                 DFID": rc = %d\n",
2950                                 mdd2obd_dev(mdd)->obd_name,
2951                                 PFID(tpobj_fid), rc);
2952                         GOTO(fixup_tpobj, rc);
2953                 }
2954                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2955                 ma->ma_attr = *tattr;
2956                 ma->ma_valid |= MA_INODE;
2957
2958                 if (tattr->la_nlink == 0) {
2959                         cl_flags |= CLF_RENAME_LAST;
2960                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2961                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2962                 }
2963         }
2964
2965         la->la_valid = LA_CTIME | LA_MTIME;
2966         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2967         if (rc)
2968                 GOTO(fixup_tpobj, rc);
2969
2970         if (mdd_spobj != mdd_tpobj) {
2971                 la->la_valid = LA_CTIME | LA_MTIME;
2972                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2973                 if (rc != 0)
2974                         GOTO(fixup_tpobj, rc);
2975         }
2976
2977         EXIT;
2978
2979 fixup_tpobj:
2980         if (rc) {
2981                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2982                 if (rc2)
2983                         CWARN("tp obj fix error %d\n",rc2);
2984
2985                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2986                     !mdd_is_dead_obj(mdd_tobj)) {
2987                         if (tobj_ref) {
2988                                 mdo_ref_add(env, mdd_tobj, handle);
2989                                 if (is_dir)
2990                                         mdo_ref_add(env, mdd_tobj, handle);
2991                         }
2992
2993                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2994                                                   mdo2fid(mdd_tobj),
2995                                                   mdd_object_type(mdd_tobj),
2996                                                   tname, handle);
2997                         if (rc2 != 0)
2998                                 CWARN("tp obj fix error: rc = %d\n", rc2);
2999                 }
3000         }
3001
3002 fixup_spobj:
3003         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3004                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3005                 if (rc2)
3006                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3007                                mdd2obd_dev(mdd)->obd_name, rc2);
3008
3009
3010                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3011                                               dotdot, handle);
3012                 if (rc2 != 0)
3013                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3014                               mdd2obd_dev(mdd)->obd_name, rc2);
3015         }
3016
3017 fixup_spobj2:
3018         if (rc != 0) {
3019                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3020                                          mdd_object_type(mdd_sobj), sname,
3021                                          handle);
3022                 if (rc2 != 0)
3023                         CWARN("sp obj fix error: rc = %d\n", rc2);
3024         }
3025
3026 cleanup:
3027         if (tobj_locked)
3028                 mdd_write_unlock(env, mdd_tobj);
3029
3030         if (rc == 0)
3031                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3032                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3033                                             ltname, lsname, handle);
3034
3035 stop:
3036         mdd_trans_stop(env, mdd, rc, handle);
3037
3038 out_pending:
3039         mdd_object_put(env, mdd_sobj);
3040         return rc;
3041 }
3042
3043 /**
3044  * During migration once the parent FID has been changed,
3045  * we need update the parent FID in linkea.
3046  **/
3047 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3048                                             struct mdd_object *parent,
3049                                             struct mdd_object *child,
3050                                             const char *name, int namelen,
3051                                             struct thandle *handle,
3052                                             bool declare)
3053 {
3054         struct mdd_thread_info  *info = mdd_env_info(env);
3055         struct linkea_data      ldata = { NULL };
3056         struct lu_buf           *buf = &info->mti_link_buf;
3057         int                     count;
3058         int                     rc = 0;
3059
3060         ENTRY;
3061
3062         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3063         if (buf->lb_buf == NULL)
3064                 RETURN(-ENOMEM);
3065
3066         ldata.ld_buf = buf;
3067         rc = mdd_links_read(env, child, &ldata);
3068         if (rc != 0) {
3069                 if (rc == -ENOENT || rc == -ENODATA)
3070                         rc = 0;
3071                 RETURN(rc);
3072         }
3073
3074         LASSERT(ldata.ld_leh != NULL);
3075         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3076         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3077                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3078                 struct lu_name lname;
3079                 struct lu_fid  fid;
3080
3081                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3082                                     &lname, &fid);
3083
3084                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3085                     lu_fid_eq(&fid, mdd_object_fid(parent))) {
3086                         ldata.ld_lee = (struct link_ea_entry *)
3087                                        ((char *)ldata.ld_lee +
3088                                         ldata.ld_reclen);
3089                         continue;
3090                 }
3091
3092                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3093                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3094                        lname.ln_namelen, lname.ln_name,
3095                        PFID(mdd_object_fid(parent)));
3096                 /* update to the new parent fid */
3097                 linkea_entry_pack(ldata.ld_lee, &lname,
3098                                   mdd_object_fid(parent));
3099                 if (declare)
3100                         rc = mdd_declare_links_add(env, child, handle, &ldata,
3101                                                    MLAO_IGNORE);
3102                 else
3103                         rc = mdd_links_write(env, child, &ldata, handle);
3104                 break;
3105         }
3106         RETURN(rc);
3107 }
3108
3109 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3110                                            struct mdd_object *parent,
3111                                            struct mdd_object *child,
3112                                            const char *name, int namelen,
3113                                            struct thandle *handle)
3114 {
3115         return mdd_linkea_update_child_internal(env, parent, child, name,
3116                                                 namelen, handle, true);
3117 }
3118
3119 static int mdd_linkea_update_child(const struct lu_env *env,
3120                                    struct mdd_object *parent,
3121                                    struct mdd_object *child,
3122                                    const char *name, int namelen,
3123                                    struct thandle *handle)
3124 {
3125         return mdd_linkea_update_child_internal(env, parent, child, name,
3126                                                 namelen, handle, false);
3127 }
3128
3129 static int mdd_update_linkea_internal(const struct lu_env *env,
3130                                       struct mdd_object *mdd_pobj,
3131                                       struct mdd_object *mdd_sobj,
3132                                       struct mdd_object *mdd_tobj,
3133                                       const struct lu_name *child_name,
3134                                       struct linkea_data *ldata,
3135                                       struct thandle *handle,
3136                                       int declare)
3137 {
3138         struct mdd_thread_info  *info = mdd_env_info(env);
3139         int                     count;
3140         int                     rc = 0;
3141         ENTRY;
3142
3143         LASSERT(ldata->ld_buf != NULL);
3144
3145 again:
3146         /* If it is mulitple links file, we need update the name entry for
3147          * all parent */
3148         LASSERT(ldata->ld_leh != NULL);
3149         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3150         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3151                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3152                 struct mdd_object       *pobj;
3153                 struct lu_name          lname;
3154                 struct lu_fid           fid;
3155
3156                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3157                                     &lname, &fid);
3158                 pobj = mdd_object_find(env, mdd, &fid);
3159                 if (IS_ERR(pobj)) {
3160                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3161                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3162                               PTR_ERR(pobj));
3163                         linkea_del_buf(ldata, &lname);
3164                         goto again;
3165                 }
3166
3167                 if (!mdd_object_exists(pobj)) {
3168                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3169                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3170                         linkea_del_buf(ldata, &lname);
3171                         mdd_object_put(env, pobj);
3172                         goto again;
3173                 }
3174
3175                 if (pobj == mdd_pobj &&
3176                     lname.ln_namelen == child_name->ln_namelen &&
3177                     strncmp(lname.ln_name, child_name->ln_name,
3178                             lname.ln_namelen) == 0) {
3179                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3180                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3181                               PFID(&fid));
3182                         linkea_del_buf(ldata, &lname);
3183                         mdd_object_put(env, pobj);
3184                         goto again;
3185                 }
3186
3187                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3188                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3189                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3190
3191                 if (declare) {
3192                         /* Remove source name from source directory */
3193                         /* Insert new fid with target name into target dir */
3194                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3195                                                       handle);
3196                         if (rc != 0)
3197                                 GOTO(next_put, rc);
3198
3199                         rc = mdo_declare_index_insert(env, pobj,
3200                                         mdd_object_fid(mdd_tobj),
3201                                         mdd_object_type(mdd_tobj),
3202                                         lname.ln_name, handle);
3203                         if (rc != 0)
3204                                 GOTO(next_put, rc);
3205
3206                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3207                         if (rc)
3208                                 GOTO(next_put, rc);
3209
3210                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3211                         if (rc)
3212                                 GOTO(next_put, rc);
3213                 } else {
3214                         char *tmp_name = info->mti_key;
3215
3216                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3217                                 /* lnamelen is too big(> NAME_MAX + 16),
3218                                  * something wrong about this linkea, let's
3219                                  * skip it */
3220                                 linkea_del_buf(ldata, &lname);
3221                                 mdd_object_put(env, pobj);
3222                                 goto again;
3223                         }
3224
3225                         /* Note: lname might be without \0 at the end, see
3226                          * linkea_entry_unpack(), let's add extra \0 by
3227                          * snprintf */
3228                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3229                                  lname.ln_namelen, lname.ln_name);
3230                         lname.ln_name = tmp_name;
3231
3232                         /* Let's check if this linkEA still valid, before
3233                          * it might be packed into the RPC buffer. */
3234                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3235                                         &info->mti_fid, NULL);
3236                         if (rc < 0 ||
3237                             !lu_fid_eq(&info->mti_fid,
3238                                         mdd_object_fid(mdd_sobj))) {
3239                                 /* skip invalid linkea entry */
3240                                 linkea_del_buf(ldata, &lname);
3241                                 mdd_object_put(env, pobj);
3242                                 goto again;
3243                         }
3244
3245                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3246                         if (rc != 0)
3247                                 GOTO(next_put, rc);
3248
3249                         rc = __mdd_index_insert(env, pobj,
3250                                         mdd_object_fid(mdd_tobj),
3251                                         mdd_object_type(mdd_tobj),
3252                                         tmp_name, handle);
3253                         if (rc != 0)
3254                                 GOTO(next_put, rc);
3255
3256                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3257                         rc = mdo_ref_add(env, mdd_tobj, handle);
3258                         mdd_write_unlock(env, mdd_tobj);
3259                         if (rc)
3260                                 GOTO(next_put, rc);
3261
3262                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3263                         mdo_ref_del(env, mdd_sobj, handle);
3264                         mdd_write_unlock(env, mdd_sobj);
3265                 }
3266 next_put:
3267                 mdd_object_put(env, pobj);
3268                 if (rc != 0)
3269                         break;
3270
3271                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3272                                                          ldata->ld_reclen);
3273         }
3274
3275         RETURN(rc);
3276 }
3277
3278 static int mdd_migrate_xattrs(const struct lu_env *env,
3279                               struct mdd_object *mdd_sobj,
3280                               struct mdd_object *mdd_tobj)
3281 {
3282         struct mdd_thread_info  *info = mdd_env_info(env);
3283         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3284         char                    *xname;
3285         struct thandle          *handle;
3286         struct lu_buf           xbuf;
3287         int                     xlen;
3288         int                     rem;
3289         int                     xsize;
3290         int                     list_xsize;
3291         struct lu_buf           list_xbuf;
3292         int                     rc;
3293         int                     rc1;
3294
3295         /* retrieve xattr list from the old object */
3296         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3297         if (list_xsize == -ENODATA)
3298                 return 0;
3299
3300         if (list_xsize < 0)
3301                 return list_xsize;
3302
3303         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3304         if (info->mti_big_buf.lb_buf == NULL)
3305                 return -ENOMEM;
3306
3307         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3308         list_xbuf.lb_len = list_xsize;
3309         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3310         if (rc < 0)
3311                 return rc;
3312         rc = 0;
3313         rem = list_xsize;
3314         xname = list_xbuf.lb_buf;
3315         while (rem > 0) {
3316                 xlen = strnlen(xname, rem - 1) + 1;
3317                 if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
3318                     strcmp(XATTR_NAME_LMA, xname) == 0 ||
3319                     strcmp(XATTR_NAME_LMV, xname) == 0)
3320                         goto next;
3321
3322                 /* For directory, if there are default layout, migrate here */
3323                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3324                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3325                         goto next;
3326
3327                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3328                 if (xsize == -ENODATA)
3329                         goto next;
3330                 if (xsize < 0)
3331                         GOTO(out, rc);
3332
3333                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3334                 if (info->mti_link_buf.lb_buf == NULL)
3335                         GOTO(out, rc = -ENOMEM);
3336
3337                 xbuf.lb_len = xsize;
3338                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3339                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3340                 if (rc == -ENODATA)
3341                         goto next;
3342                 if (rc < 0)
3343                         GOTO(out, rc);
3344
3345                 handle = mdd_trans_create(env, mdd);
3346                 if (IS_ERR(handle))
3347                         GOTO(out, rc = PTR_ERR(handle));
3348
3349                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3350                                            handle);
3351                 if (rc != 0)
3352                         GOTO(stop_trans, rc);
3353                 /* Note: this transaction is part of migration, and it is not
3354                  * the last step of migration, so we set th_local = 1 to avoid
3355                  * update last rcvd for this transaction */
3356                 handle->th_local = 1;
3357                 rc = mdd_trans_start(env, mdd, handle);
3358                 if (rc != 0)
3359                         GOTO(stop_trans, rc);
3360
3361                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3362                 if (rc == -EEXIST)
3363                         GOTO(stop_trans, rc = 0);
3364
3365                 if (rc != 0)
3366                         GOTO(stop_trans, rc);
3367 stop_trans:
3368                 rc1 = mdd_trans_stop(env, mdd, rc, handle);
3369                 if (rc == 0)
3370                         rc = rc1;
3371                 if (rc != 0)
3372                         GOTO(out, rc);
3373 next:
3374                 rem -= xlen;
3375                 memmove(xname, xname + xlen, rem);
3376         }
3377 out:
3378         return rc;
3379 }
3380
3381 static int mdd_declare_migrate_create(const struct lu_env *env,
3382                                       struct mdd_object *mdd_pobj,
3383                                       struct mdd_object *mdd_sobj,
3384                                       struct mdd_object *mdd_tobj,
3385                                       struct md_op_spec *spec,
3386                                       struct lu_attr *la,
3387                                       union lmv_mds_md *mgr_ea,
3388                                       struct linkea_data *ldata,
3389                                       struct thandle *handle)
3390 {
3391         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3392         const struct lu_buf     *buf;
3393         int                     rc;
3394         int                     mgr_easize;
3395
3396         rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
3397                                                 handle, spec, NULL);
3398         if (rc != 0)
3399                 return rc;
3400
3401         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3402                                            handle);
3403         if (rc != 0)
3404                 return rc;
3405
3406         if (S_ISLNK(la->la_mode)) {
3407                 const char *target_name = spec->u.sp_symname;
3408                 int sym_len = strlen(target_name);
3409                 const struct lu_buf *buf;
3410
3411                 buf = mdd_buf_get_const(env, target_name, sym_len);
3412                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3413                                              buf, 0, handle);
3414                 if (rc != 0)
3415                         return rc;
3416         } else if (S_ISDIR(la->la_mode)) {
3417                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
3418                                            MLAO_IGNORE);
3419                 if (rc != 0)
3420                         return rc;
3421         }
3422
3423         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3424                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3425                                         spec->u.sp_ea.eadatalen);
3426                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3427                                            0, handle);
3428                 if (rc)
3429                         return rc;
3430         }
3431
3432         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3433         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3434         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3435                                    0, handle);
3436         if (rc)
3437                 return rc;
3438
3439         la_flag->la_valid = LA_FLAGS;
3440         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3441         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3442
3443         return rc;
3444 }
3445
3446 static int mdd_migrate_create(const struct lu_env *env,
3447                               struct mdd_object *mdd_pobj,
3448                               struct mdd_object *mdd_sobj,
3449                               struct mdd_object *mdd_tobj,
3450                               const struct lu_name *lname,
3451                               struct lu_attr *la)
3452 {
3453         struct mdd_thread_info  *info = mdd_env_info(env);
3454         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3455         struct md_op_spec       *spec = &info->mti_spec;
3456         struct lu_buf           lmm_buf = { NULL };
3457         struct lu_buf           link_buf = { NULL };
3458         const struct lu_buf     *buf;
3459         struct thandle          *handle;
3460         struct lmv_mds_md_v1    *mgr_ea;
3461         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3462         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3463         int                     mgr_easize;
3464         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3465         int                     rc;
3466         ENTRY;
3467
3468         /* prepare spec for create */
3469         memset(spec, 0, sizeof(*spec));
3470         spec->sp_cr_lookup = 0;
3471         spec->sp_feat = &dt_directory_features;
3472         if (S_ISLNK(la->la_mode)) {
3473                 buf = lu_buf_check_and_alloc(
3474                                 &mdd_env_info(env)->mti_big_buf,
3475                                 la->la_size + 1);
3476                 link_buf = *buf;
3477                 link_buf.lb_len = la->la_size + 1;
3478                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3479                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3480                 if (rc <= 0) {
3481                         rc = rc != 0 ? rc : -EFAULT;
3482                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3483                                mdd2obd_dev(mdd)->obd_name,
3484                                PFID(mdd_object_fid(mdd_sobj)), rc);
3485                         RETURN(rc);
3486                 }
3487                 spec->u.sp_symname = link_buf.lb_buf;
3488         } else if (S_ISREG(la->la_mode)) {
3489                 /* retrieve lov of the old object */
3490                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3491                 if (rc != 0 && rc != -ENODATA)
3492                         RETURN(rc);
3493                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3494                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3495                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3496                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3497                 }
3498         } else if (S_ISDIR(la->la_mode)) {
3499                 rc = mdd_links_read(env, mdd_sobj, ldata);
3500                 if (rc < 0 && rc != -ENODATA)
3501                         RETURN(rc);
3502         }
3503
3504         mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
3505         memset(mgr_ea, 0, sizeof(*mgr_ea));
3506         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3507         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3508         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3509         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3510         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3511         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3512
3513         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3514
3515         handle = mdd_trans_create(env, mdd);
3516         if (IS_ERR(handle))
3517                 GOTO(out_free, rc = PTR_ERR(handle));
3518
3519         /* Note: this transaction is part of migration, and it is not
3520          * the last step of migration, so we set th_local = 1 to avoid
3521          * update last rcvd for this transaction */
3522         handle->th_local = 1;
3523         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
3524                                         spec, la,
3525                                         (union lmv_mds_md *)info->mti_xattr_buf,
3526                                         ldata, handle);
3527         if (rc != 0)
3528                 GOTO(stop_trans, rc);
3529
3530         rc = mdd_trans_start(env, mdd, handle);
3531         if (rc != 0)
3532                 GOTO(stop_trans, rc);
3533
3534         /* create the target object */
3535         rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3536                                hint, handle);
3537         if (rc != 0)
3538                 GOTO(stop_trans, rc);
3539
3540         if (S_ISDIR(la->la_mode)) {
3541                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3542                 if (rc != 0)
3543                         GOTO(stop_trans, rc);
3544         }
3545
3546         /* Set MIGRATE EA on the source inode, so once the migration needs
3547          * to be re-done during failover, the re-do process can locate the
3548          * target object which is already being created. */
3549         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3550         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3551         rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
3552         if (rc != 0)
3553                 GOTO(stop_trans, rc);
3554
3555         /* Set immutable flag, so any modification is disabled until
3556          * the migration is done. Once the migration is interrupted,
3557          * if the resume process find the migrating object has both
3558          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3559          * flag and approve the migration */
3560         la_flag->la_valid = LA_FLAGS;
3561         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3562         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3563 stop_trans:
3564         if (handle != NULL) {
3565                 int rc1;
3566
3567                 rc1 = mdd_trans_stop(env, mdd, rc, handle);
3568                 if (rc == 0)
3569                         rc = rc1;
3570         }
3571 out_free:
3572         if (lmm_buf.lb_buf != NULL)
3573                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3574         RETURN(rc);
3575 }
3576
3577 static int mdd_migrate_entries(const struct lu_env *env,
3578                                struct mdd_object *mdd_sobj,
3579                                struct mdd_object *mdd_tobj)
3580 {
3581         struct dt_object        *next = mdd_object_child(mdd_sobj);
3582         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3583         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3584         struct thandle          *handle;
3585         struct dt_it            *it;
3586         const struct dt_it_ops  *iops;
3587         int                      rc;
3588         int                      result;
3589         struct lu_dirent        *ent;
3590         ENTRY;
3591
3592         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3593         if (ent == NULL)
3594                 RETURN(-ENOMEM);
3595
3596         if (!dt_try_as_dir(env, next))
3597                 GOTO(out_ent, rc = -ENOTDIR);
3598         /*
3599          * iterate directories
3600          */
3601         iops = &next->do_index_ops->dio_it;
3602         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3603         if (IS_ERR(it))
3604                 GOTO(out_ent, rc = PTR_ERR(it));
3605
3606         rc = iops->load(env, it, 0);
3607         if (rc == 0)
3608                 rc = iops->next(env, it);
3609         else if (rc > 0)
3610                 rc = 0;
3611         /*
3612          * At this point and across for-loop:
3613          *
3614          *  rc == 0 -> ok, proceed.
3615          *  rc >  0 -> end of directory.
3616          *  rc <  0 -> error.
3617          */
3618         do {
3619                 struct mdd_object       *child;
3620                 char                    *name = mdd_env_info(env)->mti_key;
3621                 int                     len;
3622                 int                     recsize;
3623                 int                     is_dir;
3624                 bool                    target_exist = false;
3625                 int                     rc1;
3626
3627                 len = iops->key_size(env, it);
3628                 if (len == 0)
3629                         goto next;
3630
3631                 result = iops->rec(env, it, (struct dt_rec *)ent,
3632                                    LUDA_FID | LUDA_TYPE);
3633                 if (result == -ESTALE)
3634                         goto next;
3635                 if (result != 0) {
3636                         rc = result;
3637                         goto out;
3638                 }
3639
3640                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3641                 recsize = le16_to_cpu(ent->lde_reclen);
3642
3643                 /* Insert new fid with target name into target dir */
3644                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3645                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3646                      ent->lde_name[1] == '.'))
3647                         goto next;
3648
3649                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3650                 if (IS_ERR(child))
3651                         GOTO(out, rc = PTR_ERR(child));
3652
3653                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3654                 is_dir = S_ISDIR(mdd_object_type(child));
3655
3656                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3657
3658                 /* Check whether the name has been inserted to the target */
3659                 if (dt_try_as_dir(env, dt_tobj)) {
3660                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3661
3662                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3663                                        (struct dt_key *)name);
3664                         if (unlikely(rc == 0))
3665                                 target_exist = true;
3666                 }
3667
3668                 handle = mdd_trans_create(env, mdd);
3669                 if (IS_ERR(handle))
3670                         GOTO(out, rc = PTR_ERR(handle));
3671
3672                 /* Note: this transaction is part of migration, and it is not
3673                  * the last step of migration, so we set th_local = 1 to avoid
3674                  * updating last rcvd for this transaction */
3675                 handle->th_local = 1;
3676                 if (likely(!target_exist)) {
3677                         rc = mdo_declare_index_insert(env, mdd_tobj,
3678                                                       &ent->lde_fid,
3679                                                       mdd_object_type(child),
3680                                                       name, handle);
3681                         if (rc != 0)
3682                                 GOTO(out_put, rc);
3683
3684                         if (is_dir) {
3685                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3686                                 if (rc != 0)
3687                                         GOTO(out_put, rc);
3688                         }
3689                 }
3690
3691                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3692                 if (rc != 0)
3693                         GOTO(out_put, rc);
3694
3695                 if (is_dir) {
3696                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3697                         if (rc != 0)
3698                                 GOTO(out_put, rc);
3699
3700                         /* Update .. for child */
3701                         rc = mdo_declare_index_delete(env, child, dotdot,
3702                                                       handle);
3703                         if (rc != 0)
3704                                 GOTO(out_put, rc);
3705
3706                         rc = mdo_declare_index_insert(env, child,
3707                                                       mdd_object_fid(mdd_tobj),
3708                                                       S_IFDIR, dotdot, handle);
3709                         if (rc != 0)
3710                                 GOTO(out_put, rc);
3711                 }
3712
3713                 rc = mdd_linkea_declare_update_child(env, mdd_tobj,
3714                                                      child, name,
3715                                                      strlen(name),
3716                                                      handle);
3717                 if (rc != 0)
3718                         GOTO(out_put, rc);
3719
3720                 rc = mdd_trans_start(env, mdd, handle);
3721                 if (rc != 0) {
3722                         CERROR("%s: transaction start failed: rc = %d\n",
3723                                mdd2obd_dev(mdd)->obd_name, rc);
3724                         GOTO(out_put, rc);
3725                 }
3726
3727                 if (likely(!target_exist)) {
3728                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3729                                                 mdd_object_type(child),
3730                                                 name, handle);
3731                         if (rc != 0)
3732                                 GOTO(out_put, rc);
3733                 }
3734
3735                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3736                 if (rc != 0)
3737                         GOTO(out_put, rc);
3738
3739                 if (is_dir) {
3740                         rc = __mdd_index_delete_only(env, child, dotdot,
3741                                                      handle);
3742                         if (rc != 0)
3743                                 GOTO(out_put, rc);
3744
3745                         rc = __mdd_index_insert_only(env, child,
3746                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3747                                          dotdot, handle);
3748                         if (rc != 0)
3749                                 GOTO(out_put, rc);
3750                 }
3751
3752                 rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
3753                                              strlen(name), handle);
3754
3755 out_put:
3756                 mdd_write_unlock(env, child);
3757                 mdd_object_put(env, child);
3758                 rc1 = mdd_trans_stop(env, mdd, rc, handle);
3759                 if (rc == 0)
3760                         rc = rc1;
3761
3762                 if (rc != 0)
3763                         GOTO(out, rc);
3764 next:
3765                 result = iops->next(env, it);
3766                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3767                         GOTO(out, rc = -EINTR);
3768
3769                 if (result == -ESTALE)
3770                         goto next;
3771         } while (result == 0);
3772 out:
3773         iops->put(env, it);
3774         iops->fini(env, it);
3775 out_ent:
3776         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3777         RETURN(rc);
3778 }
3779
3780 static int mdd_declare_update_linkea(const struct lu_env *env,
3781                                      struct mdd_object *mdd_pobj,
3782                                      struct mdd_object *mdd_sobj,
3783                                      struct mdd_object *mdd_tobj,
3784                                      const struct lu_name *child_name,
3785                                      struct linkea_data *ldata,
3786                                      struct thandle *handle)
3787 {
3788         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3789                                           child_name, ldata, handle, 1);
3790 }
3791
3792 static int mdd_update_linkea(const struct lu_env *env,
3793                              struct mdd_object *mdd_pobj,
3794                              struct mdd_object *mdd_sobj,
3795                              struct mdd_object *mdd_tobj,
3796                              const struct lu_name *child_name,
3797                              struct linkea_data *ldata,
3798                              struct thandle *handle)
3799 {
3800         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3801                                           child_name, ldata, handle, 0);
3802 }
3803
3804 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3805                                            struct mdd_object *mdd_pobj,
3806                                            struct mdd_object *mdd_sobj,
3807                                            struct mdd_object *mdd_tobj,
3808                                            const struct lu_name *lname,
3809                                            struct lu_attr *la,
3810                                            struct lu_attr *parent_la,
3811                                            struct linkea_data *ldata,
3812                                            struct thandle *handle)
3813 {
3814         struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3815         struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
3816         int rc;
3817
3818         /* Revert IMMUTABLE flag */
3819         la_flag->la_valid = LA_FLAGS;
3820         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3821         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3822         if (rc != 0)
3823                 return rc;
3824
3825         /* delete entry from source dir */
3826         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3827         if (rc != 0)
3828                 return rc;
3829
3830         if (ldata->ld_buf != NULL) {
3831                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3832                                                mdd_tobj, lname, ldata, handle);
3833                 if (rc != 0)
3834                         return rc;
3835         }
3836
3837         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3838                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3839                                            handle);
3840                 if (rc != 0)
3841                         return rc;
3842
3843                 handle->th_complex = 1;
3844                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
3845                                            XATTR_NAME_FID,
3846                                            LU_XATTR_REPLACE, handle);
3847                 if (rc < 0)
3848                         return rc;
3849         }
3850
3851         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3852                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3853                 if (rc != 0)
3854                         return rc;
3855         }
3856
3857         /* new name */
3858         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3859                                       mdd_object_type(mdd_tobj),
3860                                       lname->ln_name, handle);
3861         if (rc != 0)
3862                 return rc;
3863
3864         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
3865         if (rc != 0)
3866                 return rc;
3867
3868         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3869                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3870                 if (rc != 0)
3871                         return rc;
3872         }
3873
3874         /* delete old object */
3875         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3876         if (rc != 0)
3877                 return rc;
3878
3879         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3880                 /* delete old object */
3881                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3882                 if (rc != 0)
3883                         return rc;
3884                 /* set nlink to 0 */
3885                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3886                 if (rc != 0)
3887                         return rc;
3888         }
3889
3890         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3891         if (rc)
3892                 return rc;
3893
3894         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3895         if (rc != 0)
3896                 return rc;
3897
3898         rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
3899
3900         return rc;
3901 }
3902
3903 static int mdd_migrate_update_name(const struct lu_env *env,
3904                                    struct mdd_object *mdd_pobj,
3905                                    struct mdd_object *mdd_sobj,
3906                                    struct mdd_object *mdd_tobj,
3907                                    const struct lu_name *lname,
3908                                    struct md_attr *ma)
3909 {
3910         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3911         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3912         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3913         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3914         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3915         struct thandle          *handle;
3916         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3917         const char              *name = lname->ln_name;
3918         int                     rc;
3919         ENTRY;
3920
3921         /* update time for parent */
3922         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3923         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3924         p_la->la_valid = LA_CTIME;
3925
3926         rc = mdd_la_get(env, mdd_sobj, so_attr);
3927         if (rc != 0)
3928                 RETURN(rc);
3929
3930         ldata->ld_buf = NULL;
3931         rc = mdd_links_read(env, mdd_sobj, ldata);
3932         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
3933                 RETURN(rc);
3934
3935         handle = mdd_trans_create(env, mdd);
3936         if (IS_ERR(handle))
3937                 RETURN(PTR_ERR(handle));
3938
3939         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3940                                              lname, so_attr, p_la, ldata,
3941                                              handle);
3942         if (rc != 0) {
3943                 /* If the migration can not be fit in one transaction, just
3944                  * leave it in the original MDT */
3945                 if (rc == -E2BIG)
3946                         GOTO(stop_trans, rc = 0);
3947                 else
3948                         GOTO(stop_trans, rc);
3949         }
3950
3951         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3952                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3953                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3954                PFID(mdd_object_fid(mdd_tobj)));
3955
3956         rc = mdd_trans_start(env, mdd, handle);
3957         if (rc != 0)
3958                 GOTO(stop_trans, rc);
3959
3960         /* Revert IMMUTABLE flag */
3961         la_flag->la_valid = LA_FLAGS;
3962         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
3963         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3964         if (rc != 0)
3965                 GOTO(stop_trans, rc);
3966
3967         /* Remove source name from source directory */
3968         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
3969         if (rc != 0)
3970                 GOTO(stop_trans, rc);
3971
3972         if (ldata->ld_buf != NULL) {
3973                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
3974                                        lname, ldata, handle);
3975                 if (rc != 0)
3976                         GOTO(stop_trans, rc);
3977
3978                 /*  linkea update might decrease the source object
3979                  *  nlink, let's get the attr again after ref_del */
3980                 rc = mdd_la_get(env, mdd_sobj, so_attr);
3981                 if (rc != 0)
3982                         GOTO(stop_trans, rc);
3983         }
3984
3985         if (S_ISREG(so_attr->la_mode)) {
3986                 if (so_attr->la_nlink == 1) {
3987                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3988                                            handle);
3989                         if (rc != 0 && rc != -ENODATA)
3990                                 GOTO(stop_trans, rc);
3991
3992                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
3993                                            XATTR_NAME_FID,
3994                                            LU_XATTR_REPLACE, handle);
3995                         if (rc < 0)
3996                                 GOTO(stop_trans, rc);
3997                 }
3998         }
3999
4000         /* Insert new fid with target name into target dir */
4001         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
4002                                 mdd_object_type(mdd_tobj), name, handle);
4003         if (rc != 0)
4004                 GOTO(stop_trans, rc);
4005
4006         linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
4007         rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
4008                            ldata, 1);
4009         if (rc != 0)
4010                 GOTO(stop_trans, rc);
4011
4012         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
4013
4014         /* Increase mod_count to add the source object to the orphan list,
4015          * so if other clients still send RPC to the old object, then these
4016          * objects can help the request to find the new object, see
4017          * mdt_reint_open() */
4018         mdd_sobj->mod_count++;
4019         rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
4020         mdd_sobj->mod_count--;
4021         if (rc != 0)
4022                 GOTO(out_unlock, rc);
4023
4024         mdo_ref_del(env, mdd_sobj, handle);
4025         if (is_dir)
4026                 mdo_ref_del(env, mdd_sobj, handle);
4027
4028         /* Get the attr again after ref_del */
4029         rc = mdd_la_get(env, mdd_sobj, so_attr);
4030         if (rc != 0)
4031                 GOTO(out_unlock, rc);
4032
4033         ma->ma_attr = *so_attr;
4034         ma->ma_valid |= MA_INODE;
4035
4036         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
4037         if (rc != 0)
4038                 GOTO(out_unlock, rc);
4039
4040         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
4041                                mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
4042                                mdo2fid(mdd_pobj), lname, lname, handle);
4043         if (rc != 0) {
4044                 CWARN("%s: changelog for migrate %s "DFID
4045                       "under "DFID" failed: rc = %d\n",
4046                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
4047                       PFID(mdd_object_fid(mdd_sobj)),
4048                       PFID(mdd_object_fid(mdd_pobj)), rc);
4049                 /* Sigh, there are no easy way to migrate back the object, so
4050                  * let's reset the result to 0 for now XXX */
4051                 rc = 0;
4052         }
4053 out_unlock:
4054         mdd_write_unlock(env, mdd_sobj);
4055
4056 stop_trans:
4057         mdd_trans_stop(env, mdd, rc, handle);
4058
4059         RETURN(rc);
4060 }
4061
4062 static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
4063                           const struct lu_fid *fid, __u32 *mdt_index)
4064 {
4065         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
4066         struct seq_server_site *ss;
4067         int rc;
4068
4069         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4070
4071         range->lsr_flags = LU_SEQ_RANGE_MDT;
4072         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4073         if (rc != 0)
4074                 return rc;
4075
4076         *mdt_index = range->lsr_index;
4077
4078         return 0;
4079 }
4080 /**
4081  * Check whether we should migrate the file/dir
4082  * return val
4083  *      < 0  permission check failed or other error.
4084  *      = 0  the file can be migrated.
4085  *      > 0  the file does not need to be migrated, mostly
4086  *           for multiple link file
4087  **/
4088 static int mdd_migrate_sanity_check(const struct lu_env *env,
4089                                     struct mdd_object *pobj,
4090                                     const struct lu_attr *pattr,
4091                                     struct mdd_object *sobj,
4092                                     struct lu_attr *sattr)
4093 {
4094         struct mdd_thread_info  *info = mdd_env_info(env);
4095         struct linkea_data      *ldata = &info->mti_link_data;
4096         struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
4097         int                     mgr_easize;
4098         struct lu_buf           *mgr_buf;
4099         int                     count;
4100         int                     rc;
4101         __u64 mdt_index;
4102         ENTRY;
4103
4104         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4105         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4106         if (mgr_buf->lb_buf == NULL)
4107                 RETURN(-ENOMEM);
4108
4109         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4110         if (rc > 0) {
4111                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4112
4113                 /* If the object has migrateEA, it means IMMUTE flag
4114                  * is being set by previous migration process, so it
4115                  * needs to override the IMMUTE flag, otherwise the
4116                  * following sanity check will fail */
4117                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4118                                                 LMV_HASH_FLAG_MIGRATION) {
4119                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4120
4121                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4122                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4123                                mdd2obd_dev(mdd)->obd_name,
4124                                PFID(mdd_object_fid(sobj)));
4125                 }
4126         }
4127
4128         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4129                                      sobj, sattr, NULL, NULL);
4130         if (rc != 0)
4131                 RETURN(rc);
4132
4133         /* Then it will check if the file should be migrated. If the file
4134          * has mulitple links, we only need migrate the file if all of its
4135          * entries has been migrated to the remote MDT */
4136         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4137                 RETURN(0);
4138
4139         rc = mdd_links_read(env, sobj, ldata);
4140         if (rc != 0) {
4141                 /* For multiple links files, if there are no linkEA data at all,
4142                  * means the file might be created before linkEA is enabled, and
4143                  * all of its links should not be migrated yet, otherwise it
4144                  * should have some linkEA there */
4145                 if (rc == -ENOENT || rc == -ENODATA)
4146                         RETURN(1);
4147                 RETURN(rc);
4148         }
4149
4150         mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
4151         /* If there are still links locally, then the file will not be
4152          * migrated. */
4153         LASSERT(ldata->ld_leh != NULL);
4154         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4155         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4156                 struct lu_name          lname;
4157                 struct lu_fid           fid;
4158                 __u32                   parent_mdt_index;
4159
4160                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4161                                     &lname, &fid);
4162                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4163                                                          ldata->ld_reclen);
4164
4165                 rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
4166                 if (rc != 0)
4167                         RETURN(rc);
4168
4169                 /* Migrate the object only if none of its parents are on the
4170                  * current MDT. */
4171                 if (parent_mdt_index != mdt_index)
4172                         continue;
4173
4174                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4175                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4176                        lname.ln_name, PFID(&fid));
4177                 rc = 1;
4178                 break;
4179         }
4180
4181         RETURN(rc);
4182 }
4183
4184 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4185                        struct md_object *sobj, const struct lu_name *lname,
4186                        struct md_object *tobj, struct md_attr *ma)
4187 {
4188         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4189         struct mdd_device       *mdd = mdo2mdd(pobj);
4190         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4191         struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
4192         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4193         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4194         int                     rc;
4195
4196         ENTRY;
4197         /* If the file will being migrated, it will check whether
4198          * the file is being opened by someone else right now */
4199         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4200         if (mdd_sobj->mod_count > 0) {
4201                 CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
4202                        mdd2obd_dev(mdd)->obd_name,
4203                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4204                        mdd_sobj->mod_count, -EBUSY);
4205                 mdd_read_unlock(env, mdd_sobj);
4206                 GOTO(put, rc = -EBUSY);
4207         }
4208         mdd_read_unlock(env, mdd_sobj);
4209
4210         rc = mdd_la_get(env, mdd_sobj, so_attr);
4211         if (rc != 0)
4212                 GOTO(put, rc);
4213
4214         rc = mdd_la_get(env, mdd_pobj, pattr);
4215         if (rc != 0)
4216                 GOTO(put, rc);
4217
4218         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4219         if (rc != 0) {
4220                 if (rc > 0)
4221                         rc = 0;
4222                 GOTO(put, rc);
4223         }
4224
4225         /* Sigh, it is impossible to finish all of migration in a single
4226          * transaction, for example migrating big directory entries to the
4227          * new MDT, it needs insert all of name entries of children in the
4228          * new directory.
4229          *
4230          * So migration will be done in multiple steps and transactions.
4231          *
4232          * 1. create an orphan object on the remote MDT in one transaction.
4233          * 2. migrate extend attributes to the new target file/directory.
4234          * 3. For directory, migrate the entries to the new MDT and update
4235          * linkEA of each children. Because we can not migrate all entries
4236          * in a single transaction, so the migrating directory will become
4237          * a striped directory during migration, so once the process is
4238          * interrupted, the directory is still accessible. (During lookup,
4239          * client will locate the name by searching both original and target
4240          * object).
4241          * 4. Finally, update the name/FID to point to the new file/directory
4242          * in a separate transaction.
4243          */
4244
4245         /* step 1: Check whether the orphan object has been created, and create
4246          * orphan object on the remote MDT if needed */
4247         if (!mdd_object_exists(mdd_tobj)) {
4248                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4249                                         lname, so_attr);
4250                 if (rc != 0)
4251                         GOTO(put, rc);
4252         }
4253
4254         LASSERT(mdd_object_exists(mdd_tobj));
4255         /* step 2: migrate xattr */
4256         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4257         if (rc != 0)
4258                 GOTO(put, rc);
4259
4260         /* step 3: migrate name entries to the orphan object */
4261         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4262                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4263                 if (rc != 0)
4264                         GOTO(put, rc);
4265                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4266                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4267                         GOTO(put, rc = 0);
4268         } else {
4269                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4270         }
4271
4272         LASSERT(mdd_object_exists(mdd_tobj));
4273         /* step 4: update name entry to the new object */
4274         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4275                                      ma);
4276         if (rc != 0)
4277                 GOTO(put, rc);
4278 put:
4279         RETURN(rc);
4280 }
4281
4282 const struct md_dir_operations mdd_dir_ops = {
4283         .mdo_is_subdir     = mdd_is_subdir,
4284         .mdo_lookup        = mdd_lookup,
4285         .mdo_create        = mdd_create,
4286         .mdo_rename        = mdd_rename,
4287         .mdo_link          = mdd_link,
4288         .mdo_unlink        = mdd_unlink,
4289         .mdo_create_data   = mdd_create_data,
4290         .mdo_migrate       = mdd_migrate,
4291 };