Whamcloud - gitweb
LU-10220 mdd: fix buf alloc in mdd_changelog_data_store_by_fid
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static struct lu_name lname_dotdot = {
52         .ln_name        = (char *) dotdot,
53         .ln_namelen     = sizeof(dotdot) - 1,
54 };
55
56 static inline int
57 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
58 {
59         if (!lu_name_is_valid(ln))
60                 return -EINVAL;
61         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
62                 return -ENAMETOOLONG;
63         else
64                 return 0;
65 }
66
67 /* Get FID from name and parent */
68 static int
69 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
70              const struct lu_attr *pattr, const struct lu_name *lname,
71              struct lu_fid* fid, int mask)
72 {
73         const char *name                = lname->ln_name;
74         const struct dt_key *key        = (const struct dt_key *)name;
75         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
76         struct mdd_device *m            = mdo2mdd(pobj);
77         struct dt_object *dir           = mdd_object_child(mdd_obj);
78         int rc;
79         ENTRY;
80
81         if (unlikely(mdd_is_dead_obj(mdd_obj)))
82                 RETURN(-ESTALE);
83
84         if (!mdd_object_exists(mdd_obj))
85                 RETURN(-ESTALE);
86
87         if (mdd_object_remote(mdd_obj)) {
88                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
89                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
90         }
91
92         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
93                                             MOR_TGT_PARENT);
94         if (rc)
95                 RETURN(rc);
96
97         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
98                    dt_try_as_dir(env, dir)))
99                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
100         else
101                 rc = -ENOTDIR;
102
103         RETURN(rc);
104 }
105
106 int mdd_lookup(const struct lu_env *env,
107                struct md_object *pobj, const struct lu_name *lname,
108                struct lu_fid *fid, struct md_op_spec *spec)
109 {
110         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
111         int rc;
112         ENTRY;
113
114         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
115         if (rc != 0)
116                 RETURN(rc);
117
118         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
119                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
120         RETURN(rc);
121 }
122
123 /** Read the link EA into a temp buffer.
124  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
125  * A pointer to the buffer is stored in \a ldata::ld_buf.
126  *
127  * \retval 0 or error
128  */
129 static int __mdd_links_read(const struct lu_env *env,
130                             struct mdd_object *mdd_obj,
131                             struct linkea_data *ldata)
132 {
133         int rc;
134
135         if (!mdd_object_exists(mdd_obj))
136                 return -ENODATA;
137
138         /* First try a small buf */
139         LASSERT(env != NULL);
140         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
141                                                PAGE_SIZE);
142         if (ldata->ld_buf->lb_buf == NULL)
143                 return -ENOMEM;
144
145         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
146         if (rc == -ERANGE) {
147                 /* Buf was too small, figure out what we need. */
148                 lu_buf_free(ldata->ld_buf);
149                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
150                                    XATTR_NAME_LINK);
151                 if (rc < 0)
152                         return rc;
153                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
154                 if (ldata->ld_buf->lb_buf == NULL)
155                         return -ENOMEM;
156                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
157                                   XATTR_NAME_LINK);
158         }
159         if (rc < 0) {
160                 lu_buf_free(ldata->ld_buf);
161                 ldata->ld_buf = NULL;
162                 return rc;
163         }
164
165         return linkea_init(ldata);
166 }
167
168 static int mdd_links_read(const struct lu_env *env,
169                           struct mdd_object *mdd_obj,
170                           struct linkea_data *ldata)
171 {
172         int rc;
173
174         rc = __mdd_links_read(env, mdd_obj, ldata);
175         if (!rc)
176                 rc = linkea_init(ldata);
177
178         return rc;
179 }
180
181 static int mdd_links_read_with_rec(const struct lu_env *env,
182                                    struct mdd_object *mdd_obj,
183                                    struct linkea_data *ldata)
184 {
185         int rc;
186
187         rc = __mdd_links_read(env, mdd_obj, ldata);
188         if (!rc)
189                 rc = linkea_init_with_rec(ldata);
190
191         return rc;
192 }
193
194 /**
195  * Get parent FID of the directory
196  *
197  * Read parent FID from linkEA, if that fails, then do lookup
198  * dotdot to get the parent FID.
199  *
200  * \param[in] env       execution environment
201  * \param[in] obj       object from which to find the parent FID
202  * \param[in] attr      attribute of the object
203  * \param[out] fid      fid to get the parent FID
204  *
205  * \retval              0 if getting the parent FID succeeds.
206  * \retval              negative errno if getting the parent FID fails.
207  **/
208 static inline int mdd_parent_fid(const struct lu_env *env,
209                                  struct mdd_object *obj,
210                                  const struct lu_attr *attr,
211                                  struct lu_fid *fid)
212 {
213         struct mdd_thread_info  *info = mdd_env_info(env);
214         struct linkea_data      ldata = { NULL };
215         struct lu_buf           *buf = &info->mti_link_buf;
216         struct lu_name          lname;
217         int                     rc = 0;
218
219         ENTRY;
220
221         LASSERT(S_ISDIR(mdd_object_type(obj)));
222
223         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
224         if (buf->lb_buf == NULL)
225                 GOTO(lookup, rc = 0);
226
227         ldata.ld_buf = buf;
228         rc = mdd_links_read_with_rec(env, obj, &ldata);
229         if (rc != 0)
230                 GOTO(lookup, rc);
231
232         LASSERT(ldata.ld_leh != NULL);
233         /* Directory should only have 1 parent */
234         if (ldata.ld_leh->leh_reccount > 1)
235                 GOTO(lookup, rc);
236
237         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
238
239         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
240         if (likely(fid_is_sane(fid)))
241                 RETURN(0);
242 lookup:
243         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
244         RETURN(rc);
245 }
246
247 /*
248  * For root fid use special function, which does not compare version component
249  * of fid. Version component is different for root fids on all MDTs.
250  */
251 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
252 {
253         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
254                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
255 }
256
257 /*
258  * return 1: if lf is the fid of the ancestor of p1;
259  * return 0: if not;
260  *
261  * return -EREMOTE: if remote object is found, in this
262  * case fid of remote object is saved to @pf;
263  *
264  * otherwise: values < 0, errors.
265  */
266 static int mdd_is_parent(const struct lu_env *env,
267                         struct mdd_device *mdd,
268                         struct mdd_object *p1,
269                         const struct lu_attr *attr,
270                         const struct lu_fid *lf,
271                         struct lu_fid *pf)
272 {
273         struct mdd_object *parent = NULL;
274         struct lu_fid *pfid;
275         int rc;
276         ENTRY;
277
278         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
279         pfid = &mdd_env_info(env)->mti_fid;
280
281         /* Check for root first. */
282         if (mdd_is_root(mdd, mdo2fid(p1)))
283                 RETURN(0);
284
285         for(;;) {
286                 /* this is done recursively */
287                 rc = mdd_parent_fid(env, p1, attr, pfid);
288                 if (rc)
289                         GOTO(out, rc);
290                 if (mdd_is_root(mdd, pfid))
291                         GOTO(out, rc = 0);
292                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
293                         GOTO(out, rc = 0);
294                 if (lu_fid_eq(pfid, lf))
295                         GOTO(out, rc = 1);
296                 if (parent != NULL)
297                         mdd_object_put(env, parent);
298
299                 parent = mdd_object_find(env, mdd, pfid);
300                 if (IS_ERR(parent))
301                         GOTO(out, rc = PTR_ERR(parent));
302
303                 if (!mdd_object_exists(parent))
304                         GOTO(out, rc = -EINVAL);
305
306                 p1 = parent;
307         }
308         EXIT;
309 out:
310         if (parent && !IS_ERR(parent))
311                 mdd_object_put(env, parent);
312         return rc;
313 }
314
315 /*
316  * No permission check is needed.
317  *
318  * returns 1: if fid is ancestor of @mo;
319  * returns 0: if fid is not an ancestor of @mo;
320  *
321  * returns EREMOTE if remote object is found, fid of remote object is saved to
322  * @fid;
323  *
324  * returns < 0: if error
325  */
326 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
327                   const struct lu_fid *fid, struct lu_fid *sfid)
328 {
329         struct mdd_device *mdd = mdo2mdd(mo);
330         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
331         int rc;
332         ENTRY;
333
334         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
335                 RETURN(0);
336
337         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
338         if (rc != 0)
339                 RETURN(rc);
340
341         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
342         if (rc == 0) {
343                 /* found root */
344                 fid_zero(sfid);
345         } else if (rc == 1) {
346                 /* found @fid is parent */
347                 *sfid = *fid;
348                 rc = 0;
349         }
350         RETURN(rc);
351 }
352
353 /*
354  * Check that @dir contains no entries except (possibly) dot and dotdot.
355  *
356  * Returns:
357  *
358  *             0        empty
359  *      -ENOTDIR        not a directory object
360  *    -ENOTEMPTY        not empty
361  *           -ve        other error
362  *
363  */
364 static int mdd_dir_is_empty(const struct lu_env *env,
365                             struct mdd_object *dir)
366 {
367         struct dt_it     *it;
368         struct dt_object *obj;
369         const struct dt_it_ops *iops;
370         int result;
371         ENTRY;
372
373         obj = mdd_object_child(dir);
374         if (!dt_try_as_dir(env, obj))
375                 RETURN(-ENOTDIR);
376
377         iops = &obj->do_index_ops->dio_it;
378         it = iops->init(env, obj, LUDA_64BITHASH);
379         if (!IS_ERR(it)) {
380                 result = iops->get(env, it, (const struct dt_key *)"");
381                 if (result > 0) {
382                         int i;
383                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
384                                 result = iops->next(env, it);
385                         if (result == 0)
386                                 result = -ENOTEMPTY;
387                         else if (result == 1)
388                                 result = 0;
389                 } else if (result == 0)
390                         /*
391                          * Huh? Index contains no zero key?
392                          */
393                         result = -EIO;
394
395                 iops->put(env, it);
396                 iops->fini(env, it);
397         } else
398                 result = PTR_ERR(it);
399         RETURN(result);
400 }
401
402 /**
403  * Determine if the target object can be hard linked, and right now it only
404  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
405  * directory nlink count might exceed the maximum link count(see
406  * osd_object_ref_add), so it only check nlink for non-directories.
407  *
408  * \param[in] env       thread environment
409  * \param[in] obj       object being linked to
410  * \param[in] la        attributes of \a obj
411  *
412  * \retval              0 if \a obj can be hard linked
413  * \retval              negative error if \a obj is a directory or has too
414  *                      many links
415  */
416 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
417                           const struct lu_attr *la)
418 {
419         struct mdd_device *m = mdd_obj2mdd_dev(obj);
420         ENTRY;
421
422         LASSERT(la != NULL);
423
424         /* Subdir count limitation can be broken through
425          * (see osd_object_ref_add), so only check non-directory here. */
426         if (!S_ISDIR(la->la_mode) &&
427             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
428                 RETURN(-EMLINK);
429
430         RETURN(0);
431 }
432
433 /**
434  * Check whether it may create the cobj under the pobj.
435  *
436  * \param[in] env       execution environment
437  * \param[in] pobj      the parent directory
438  * \param[in] pattr     the attribute of the parent directory
439  * \param[in] cobj      the child to be created
440  * \param[in] check_perm        if check WRITE|EXEC permission for parent
441  *
442  * \retval              = 0 create the child under this dir is allowed
443  * \retval              negative errno create the child under this dir is
444  *                      not allowed
445  */
446 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
447                    const struct lu_attr *pattr, struct mdd_object *cobj,
448                    bool check_perm)
449 {
450         int rc = 0;
451         ENTRY;
452
453         if (cobj && mdd_object_exists(cobj))
454                 RETURN(-EEXIST);
455
456         if (mdd_is_dead_obj(pobj))
457                 RETURN(-ENOENT);
458
459         if (check_perm)
460                 rc = mdd_permission_internal_locked(env, pobj, pattr,
461                                                     MAY_WRITE | MAY_EXEC,
462                                                     MOR_TGT_PARENT);
463         RETURN(rc);
464 }
465
466 /*
467  * Check whether can unlink from the pobj in the case of "cobj == NULL".
468  */
469 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
470                    const struct lu_attr *pattr, const struct lu_attr *attr)
471 {
472         int rc;
473         ENTRY;
474
475         if (mdd_is_dead_obj(pobj))
476                 RETURN(-ENOENT);
477
478         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
479                 RETURN(-EPERM);
480
481         rc = mdd_permission_internal_locked(env, pobj, pattr,
482                                             MAY_WRITE | MAY_EXEC,
483                                             MOR_TGT_PARENT);
484         if (rc != 0)
485                 RETURN(rc);
486
487         if (pattr->la_flags & LUSTRE_APPEND_FL)
488                 RETURN(-EPERM);
489
490         RETURN(rc);
491 }
492
493 /*
494  * pobj == NULL is remote ops case, under such case, pobj's
495  * VTX feature has been checked already, no need check again.
496  */
497 static inline int mdd_is_sticky(const struct lu_env *env,
498                                 struct mdd_object *pobj,
499                                 const struct lu_attr *pattr,
500                                 struct mdd_object *cobj,
501                                 const struct lu_attr *cattr)
502 {
503         struct lu_ucred *uc = lu_ucred_assert(env);
504
505         if (pobj != NULL) {
506                 LASSERT(pattr != NULL);
507                 if (!(pattr->la_mode & S_ISVTX) ||
508                     (pattr->la_uid == uc->uc_fsuid))
509                         return 0;
510         }
511
512         LASSERT(cattr != NULL);
513         if (cattr->la_uid == uc->uc_fsuid)
514                 return 0;
515
516         return !md_capable(uc, CFS_CAP_FOWNER);
517 }
518
519 static int mdd_may_delete_entry(const struct lu_env *env,
520                                 struct mdd_object *pobj,
521                                 const struct lu_attr *pattr,
522                                 int check_perm)
523 {
524         ENTRY;
525
526         LASSERT(pobj != NULL);
527         if (!mdd_object_exists(pobj))
528                 RETURN(-ENOENT);
529
530         if (mdd_is_dead_obj(pobj))
531                 RETURN(-ENOENT);
532
533         if (check_perm) {
534                 int rc;
535                 rc = mdd_permission_internal_locked(env, pobj, pattr,
536                                             MAY_WRITE | MAY_EXEC,
537                                             MOR_TGT_PARENT);
538                 if (rc)
539                         RETURN(rc);
540         }
541
542         if (pattr->la_flags & LUSTRE_APPEND_FL)
543                 RETURN(-EPERM);
544
545         RETURN(0);
546 }
547
548 /*
549  * Check whether it may delete the cobj from the pobj.
550  * pobj maybe NULL
551  */
552 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
553                    const struct lu_attr *tpattr, struct mdd_object *tobj,
554                    const struct lu_attr *tattr, const struct lu_attr *cattr,
555                    int check_perm, int check_empty)
556 {
557         int rc = 0;
558         ENTRY;
559
560         if (tpobj) {
561                 LASSERT(tpattr != NULL);
562                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
563                 if (rc != 0)
564                         RETURN(rc);
565         }
566
567         if (tobj == NULL)
568                 RETURN(0);
569
570         if (!mdd_object_exists(tobj))
571                 RETURN(-ENOENT);
572
573         if (mdd_is_dead_obj(tobj))
574                 RETURN(-ESTALE);
575
576         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
577                 RETURN(-EPERM);
578
579         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
580                 RETURN(-EPERM);
581
582         /* additional check the rename case */
583         if (cattr) {
584                 if (S_ISDIR(cattr->la_mode)) {
585                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
586
587                         if (!S_ISDIR(tattr->la_mode))
588                                 RETURN(-ENOTDIR);
589
590                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
591                                 RETURN(-EBUSY);
592                 } else if (S_ISDIR(tattr->la_mode))
593                         RETURN(-EISDIR);
594         }
595
596         if (S_ISDIR(tattr->la_mode) && check_empty)
597                 rc = mdd_dir_is_empty(env, tobj);
598
599         RETURN(rc);
600 }
601
602 /**
603  * Check whether it can create the link file(linked to @src_obj) under
604  * the target directory(@tgt_obj), and src_obj has been locked by
605  * mdd_write_lock.
606  *
607  * \param[in] env       execution environment
608  * \param[in] tgt_obj   the target directory
609  * \param[in] tattr     attributes of target directory
610  * \param[in] lname     the link name
611  * \param[in] src_obj   source object for link
612  * \param[in] cattr     attributes for source object
613  *
614  * \retval              = 0 it is allowed to create the link file under tgt_obj
615  * \retval              negative error not allowed to create the link file
616  */
617 static int mdd_link_sanity_check(const struct lu_env *env,
618                                  struct mdd_object *tgt_obj,
619                                  const struct lu_attr *tattr,
620                                  const struct lu_name *lname,
621                                  struct mdd_object *src_obj,
622                                  const struct lu_attr *cattr)
623 {
624         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
625         int rc = 0;
626         ENTRY;
627
628         if (!mdd_object_exists(src_obj))
629                 RETURN(-ENOENT);
630
631         if (mdd_is_dead_obj(src_obj))
632                 RETURN(-ESTALE);
633
634         /* Local ops, no lookup before link, check filename length here. */
635         rc = mdd_name_check(m, lname);
636         if (rc < 0)
637                 RETURN(rc);
638
639         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
640                 RETURN(-EPERM);
641
642         if (S_ISDIR(mdd_object_type(src_obj)))
643                 RETURN(-EPERM);
644
645         LASSERT(src_obj != tgt_obj);
646         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
647         if (rc != 0)
648                 RETURN(rc);
649
650         rc = __mdd_may_link(env, src_obj, cattr);
651
652         RETURN(rc);
653 }
654
655 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
656                                    const char *name, struct thandle *handle)
657 {
658         struct dt_object *next = mdd_object_child(pobj);
659         int rc;
660         ENTRY;
661
662         if (dt_try_as_dir(env, next))
663                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
664         else
665                 rc = -ENOTDIR;
666
667         RETURN(rc);
668 }
669
670 static int __mdd_index_insert_only(const struct lu_env *env,
671                                    struct mdd_object *pobj,
672                                    const struct lu_fid *lf, __u32 type,
673                                    const char *name,
674                                    struct thandle *handle)
675 {
676         struct dt_object *next = mdd_object_child(pobj);
677         int               rc;
678         ENTRY;
679
680         if (dt_try_as_dir(env, next)) {
681                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
682                 struct lu_ucred         *uc  = lu_ucred_check(env);
683                 int                      ignore_quota;
684
685                 rec->rec_fid = lf;
686                 rec->rec_type = type;
687                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
688                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
689                                (const struct dt_key *)name, handle,
690                                ignore_quota);
691         } else {
692                 rc = -ENOTDIR;
693         }
694         RETURN(rc);
695 }
696
697 /* insert named index, add reference if isdir */
698 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
699                               const struct lu_fid *lf, __u32 type,
700                               const char *name, struct thandle *handle)
701 {
702         int rc;
703         ENTRY;
704
705         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
706         if (rc == 0 && S_ISDIR(type)) {
707                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
708                 mdo_ref_add(env, pobj, handle);
709                 mdd_write_unlock(env, pobj);
710         }
711
712         RETURN(rc);
713 }
714
715 /* delete named index, drop reference if isdir */
716 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
717                               const char *name, int is_dir,
718                               struct thandle *handle)
719 {
720         int               rc;
721         ENTRY;
722
723         rc = __mdd_index_delete_only(env, pobj, name, handle);
724         if (rc == 0 && is_dir) {
725                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
726                 mdo_ref_del(env, pobj, handle);
727                 mdd_write_unlock(env, pobj);
728         }
729
730         RETURN(rc);
731 }
732
733 static int mdd_llog_record_calc_size(const struct lu_env *env,
734                                      const struct lu_name *tname,
735                                      const struct lu_name *sname)
736 {
737         const struct lu_ucred   *uc = lu_ucred(env);
738         enum changelog_rec_flags crf = 0;
739
740         if (sname != NULL)
741                 crf |= CLF_RENAME;
742
743         if (uc != NULL && uc->uc_jobid[0] != '\0')
744                 crf |= CLF_JOBID;
745
746         return llog_data_len(LLOG_CHANGELOG_HDR_SZ + changelog_rec_offset(crf) +
747                              (tname != NULL ? tname->ln_namelen : 0) +
748                              (sname != NULL ? 1 + sname->ln_namelen : 0));
749 }
750
751 int mdd_declare_changelog_store(const struct lu_env *env,
752                                 struct mdd_device *mdd,
753                                 const struct lu_name *tname,
754                                 const struct lu_name *sname,
755                                 struct thandle *handle)
756 {
757         struct obd_device               *obd = mdd2obd_dev(mdd);
758         struct llog_ctxt                *ctxt;
759         struct llog_changelog_rec       *rec;
760         struct lu_buf                   *buf;
761         struct thandle                  *llog_th;
762         int                              reclen;
763         int                              rc;
764
765         /* Not recording */
766         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
767                 return 0;
768
769         reclen = mdd_llog_record_calc_size(env, tname, sname);
770         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
771         if (buf->lb_buf == NULL)
772                 return -ENOMEM;
773
774         rec = buf->lb_buf;
775         rec->cr_hdr.lrh_len = reclen;
776         rec->cr_hdr.lrh_type = CHANGELOG_REC;
777
778         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
779         if (ctxt == NULL)
780                 return -ENXIO;
781
782         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
783         if (IS_ERR(llog_th))
784                 GOTO(out_put, rc = PTR_ERR(llog_th));
785
786         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
787
788 out_put:
789         llog_ctxt_put(ctxt);
790
791         return rc;
792 }
793
794 /** Add a changelog entry \a rec to the changelog llog
795  * \param mdd
796  * \param rec
797  * \param handle - currently ignored since llogs start their own transaction;
798  *              this will hopefully be fixed in llog rewrite
799  * \retval 0 ok
800  */
801 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
802                         struct llog_changelog_rec *rec, struct thandle *th)
803 {
804         struct obd_device       *obd = mdd2obd_dev(mdd);
805         struct llog_ctxt        *ctxt;
806         struct thandle          *llog_th;
807         int                      rc;
808
809         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
810                                             changelog_rec_varsize(&rec->cr));
811
812         /* llog_lvfs_write_rec sets the llog tail len */
813         rec->cr_hdr.lrh_type = CHANGELOG_REC;
814         rec->cr.cr_time = cl_time();
815
816         spin_lock(&mdd->mdd_cl.mc_lock);
817         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
818          * but as long as the MDD transactions are ordered correctly for e.g.
819          * rename conflicts, I don't think this should matter. */
820         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
821         spin_unlock(&mdd->mdd_cl.mc_lock);
822
823         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
824         if (ctxt == NULL)
825                 return -ENXIO;
826
827         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
828         if (IS_ERR(llog_th))
829                 GOTO(out_put, rc = PTR_ERR(llog_th));
830
831         /* nested journal transaction */
832         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
833
834 out_put:
835         llog_ctxt_put(ctxt);
836         if (rc > 0)
837                 rc = 0;
838         return rc;
839 }
840
841 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
842                                          const struct lu_fid *sfid,
843                                          const struct lu_fid *spfid,
844                                          const struct lu_name *sname)
845 {
846         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
847         size_t                           extsize = sname->ln_namelen + 1;
848
849         LASSERT(sfid != NULL);
850         LASSERT(spfid != NULL);
851         LASSERT(sname != NULL);
852
853         rnm->cr_sfid = *sfid;
854         rnm->cr_spfid = *spfid;
855
856         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
857         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
858         rec->cr_namelen += extsize;
859 }
860
861 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
862 {
863         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
864
865         if (jobid == NULL || jobid[0] == '\0')
866                 return;
867
868         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
869 }
870
871 /** Store a namespace change changelog record
872  * If this fails, we must fail the whole transaction; we don't
873  * want the change to commit without the log entry.
874  * \param target - mdd_object of change
875  * \param tpfid - target parent dir/object fid
876  * \param sfid - source object fid
877  * \param spfid - source parent fid
878  * \param tname - target name string
879  * \param sname - source name string
880  * \param handle - transaction handle
881  */
882 int mdd_changelog_ns_store(const struct lu_env *env,
883                            struct mdd_device *mdd,
884                            enum changelog_rec_type type,
885                            enum changelog_rec_flags crf,
886                            struct mdd_object *target,
887                            const struct lu_fid *tpfid,
888                            const struct lu_fid *sfid,
889                            const struct lu_fid *spfid,
890                            const struct lu_name *tname,
891                            const struct lu_name *sname,
892                            struct thandle *handle)
893 {
894         const struct lu_ucred           *uc = lu_ucred(env);
895         struct llog_changelog_rec       *rec;
896         struct lu_buf                   *buf;
897         int                              reclen;
898         int                              rc;
899         ENTRY;
900
901         /* Not recording */
902         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
903                 RETURN(0);
904
905         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
906                 RETURN(0);
907
908         LASSERT(tpfid != NULL);
909         LASSERT(tname != NULL);
910         LASSERT(handle != NULL);
911
912         reclen = mdd_llog_record_calc_size(env, tname, sname);
913         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
914         if (buf->lb_buf == NULL)
915                 RETURN(-ENOMEM);
916         rec = buf->lb_buf;
917
918         crf &= CLF_FLAGMASK;
919
920         if (uc != NULL && uc->uc_jobid[0] != '\0')
921                 crf |= CLF_JOBID;
922
923         if (sname != NULL)
924                 crf |= CLF_RENAME;
925         else
926                 crf |= CLF_VERSION;
927
928         rec->cr.cr_flags = crf;
929         rec->cr.cr_type = (__u32)type;
930         rec->cr.cr_pfid = *tpfid;
931         rec->cr.cr_namelen = tname->ln_namelen;
932         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
933
934         if (crf & CLF_RENAME)
935                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
936
937         if (crf & CLF_JOBID)
938                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
939
940         if (likely(target != NULL)) {
941                 rec->cr.cr_tfid = *mdo2fid(target);
942                 target->mod_cltime = ktime_get();
943         } else {
944                 fid_zero(&rec->cr.cr_tfid);
945         }
946
947         rc = mdd_changelog_store(env, mdd, rec, handle);
948         if (rc < 0) {
949                 CERROR("%s: cannot store changelog record: type = %d, "
950                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
951                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
952                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
953                 return -EFAULT;
954         }
955
956         return 0;
957 }
958
959 static int __mdd_links_add(const struct lu_env *env,
960                            struct mdd_object *mdd_obj,
961                            struct linkea_data *ldata,
962                            const struct lu_name *lname,
963                            const struct lu_fid *pfid,
964                            int first, int check)
965 {
966         int rc;
967
968         if (ldata->ld_leh == NULL) {
969                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
970                 if (rc) {
971                         if (rc != -ENODATA)
972                                 return rc;
973                         rc = linkea_data_new(ldata,
974                                              &mdd_env_info(env)->mti_link_buf);
975                         if (rc)
976                                 return rc;
977                 }
978         }
979
980         if (check) {
981                 rc = linkea_links_find(ldata, lname, pfid);
982                 if (rc && rc != -ENOENT)
983                         return rc;
984                 if (rc == 0)
985                         return -EEXIST;
986         }
987
988         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
989                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
990
991                 *tfid = *pfid;
992                 tfid->f_ver = ~0;
993                 linkea_add_buf(ldata, lname, tfid);
994         }
995
996         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
997                 linkea_add_buf(ldata, lname, pfid);
998
999         return linkea_add_buf(ldata, lname, pfid);
1000 }
1001
1002 static int __mdd_links_del(const struct lu_env *env,
1003                            struct mdd_object *mdd_obj,
1004                            struct linkea_data *ldata,
1005                            const struct lu_name *lname,
1006                            const struct lu_fid *pfid)
1007 {
1008         int rc;
1009
1010         if (ldata->ld_leh == NULL) {
1011                 rc = mdd_links_read(env, mdd_obj, ldata);
1012                 if (rc)
1013                         return rc;
1014         }
1015
1016         rc = linkea_links_find(ldata, lname, pfid);
1017         if (rc)
1018                 return rc;
1019
1020         linkea_del_buf(ldata, lname);
1021         return 0;
1022 }
1023
1024 static int mdd_linkea_prepare(const struct lu_env *env,
1025                               struct mdd_object *mdd_obj,
1026                               const struct lu_fid *oldpfid,
1027                               const struct lu_name *oldlname,
1028                               const struct lu_fid *newpfid,
1029                               const struct lu_name *newlname,
1030                               int first, int check,
1031                               struct linkea_data *ldata)
1032 {
1033         int rc = 0;
1034         ENTRY;
1035
1036         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1037                 RETURN(0);
1038
1039         LASSERT(oldpfid != NULL || newpfid != NULL);
1040
1041         if (mdd_obj->mod_flags & DEAD_OBJ)
1042                 /* Unnecessary to update linkEA for dead object.  */
1043                 RETURN(0);
1044
1045         if (oldpfid != NULL) {
1046                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1047                 if (rc) {
1048                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1049                                 RETURN(rc);
1050
1051                         /* No changes done. */
1052                         rc = 0;
1053                 }
1054         }
1055
1056         /* If renaming, add the new record */
1057         if (newpfid != NULL)
1058                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1059                                      first, check);
1060
1061         RETURN(rc);
1062 }
1063
1064 int mdd_links_rename(const struct lu_env *env,
1065                      struct mdd_object *mdd_obj,
1066                      const struct lu_fid *oldpfid,
1067                      const struct lu_name *oldlname,
1068                      const struct lu_fid *newpfid,
1069                      const struct lu_name *newlname,
1070                      struct thandle *handle,
1071                      struct linkea_data *ldata,
1072                      int first, int check)
1073 {
1074         int rc = 0;
1075         ENTRY;
1076
1077         if (ldata == NULL) {
1078                 ldata = &mdd_env_info(env)->mti_link_data;
1079                 memset(ldata, 0, sizeof(*ldata));
1080                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1081                                         newpfid, newlname, first, check, ldata);
1082                 if (rc)
1083                         GOTO(out, rc);
1084         }
1085
1086         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1087                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1088
1089         GOTO(out, rc);
1090
1091 out:
1092         if (rc != 0) {
1093                 if (newlname == NULL)
1094                         CERROR("link_ea add failed %d "DFID"\n",
1095                                rc, PFID(mdd_object_fid(mdd_obj)));
1096                 else if (oldpfid == NULL)
1097                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1098                                newlname->ln_namelen, newlname->ln_name, rc,
1099                                PFID(mdd_object_fid(mdd_obj)));
1100                 else if (newpfid == NULL)
1101                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1102                                oldlname->ln_namelen, oldlname->ln_name, rc,
1103                                PFID(mdd_object_fid(mdd_obj)));
1104                 else
1105                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1106                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1107                                newlname->ln_namelen, newlname->ln_name, rc,
1108                                PFID(mdd_object_fid(mdd_obj)));
1109         }
1110
1111         if (is_vmalloc_addr(ldata->ld_buf))
1112                 /* if we vmalloced a large buffer drop it */
1113                 lu_buf_free(ldata->ld_buf);
1114
1115         return rc;
1116 }
1117
1118 static inline int mdd_links_add(const struct lu_env *env,
1119                                 struct mdd_object *mdd_obj,
1120                                 const struct lu_fid *pfid,
1121                                 const struct lu_name *lname,
1122                                 struct thandle *handle,
1123                                 struct linkea_data *ldata, int first)
1124 {
1125         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1126                                 pfid, lname, handle, ldata, first, 0);
1127 }
1128
1129 static inline int mdd_links_del(const struct lu_env *env,
1130                                 struct mdd_object *mdd_obj,
1131                                 const struct lu_fid *pfid,
1132                                 const struct lu_name *lname,
1133                                 struct thandle *handle)
1134 {
1135         return mdd_links_rename(env, mdd_obj, pfid, lname,
1136                                 NULL, NULL, handle, NULL, 0, 0);
1137 }
1138
1139 /** Read the link EA into a temp buffer.
1140  * Uses the name_buf since it is generally large.
1141  * \retval IS_ERR err
1142  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1143  */
1144 struct lu_buf *mdd_links_get(const struct lu_env *env,
1145                              struct mdd_object *mdd_obj)
1146 {
1147         struct linkea_data ldata = { NULL };
1148         int rc;
1149
1150         rc = mdd_links_read(env, mdd_obj, &ldata);
1151         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1152 }
1153
1154 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1155                     struct linkea_data *ldata, struct thandle *handle)
1156 {
1157         const struct lu_buf *buf;
1158         int                 rc;
1159
1160         if (ldata == NULL || ldata->ld_buf == NULL ||
1161             ldata->ld_leh == NULL)
1162                 return 0;
1163
1164         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1165                 return 0;
1166
1167 again:
1168         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1169                                 ldata->ld_leh->leh_len);
1170         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1171         if (unlikely(rc == -ENOSPC)) {
1172                 rc = linkea_overflow_shrink(ldata);
1173                 if (likely(rc > 0))
1174                         goto again;
1175         }
1176
1177         return rc;
1178 }
1179
1180 static int mdd_declare_links_add(const struct lu_env *env,
1181                                  struct mdd_object *mdd_obj,
1182                                  struct thandle *handle,
1183                                  struct linkea_data *ldata)
1184 {
1185         int     rc;
1186         int     ea_len;
1187         void    *linkea;
1188
1189         if (ldata != NULL && ldata->ld_leh != NULL) {
1190                 ea_len = ldata->ld_leh->leh_len;
1191                 linkea = ldata->ld_buf->lb_buf;
1192         } else {
1193                 ea_len = MAX_LINKEA_SIZE;
1194                 linkea = NULL;
1195         }
1196
1197         rc = mdo_declare_xattr_set(env, mdd_obj,
1198                                    mdd_buf_get_const(env, linkea, ea_len),
1199                                    XATTR_NAME_LINK, 0, handle);
1200
1201         return rc;
1202 }
1203
1204 static inline int mdd_declare_links_del(const struct lu_env *env,
1205                                         struct mdd_object *c,
1206                                         struct thandle *handle)
1207 {
1208         int rc = 0;
1209
1210         /* For directory, the linkEA will be removed together
1211          * with the object. */
1212         if (!S_ISDIR(mdd_object_type(c)))
1213                 rc = mdd_declare_links_add(env, c, handle, NULL);
1214
1215         return rc;
1216 }
1217
1218 static int mdd_declare_link(const struct lu_env *env,
1219                             struct mdd_device *mdd,
1220                             struct mdd_object *p,
1221                             struct mdd_object *c,
1222                             const struct lu_name *name,
1223                             struct thandle *handle,
1224                             struct lu_attr *la,
1225                             struct linkea_data *data)
1226 {
1227         struct lu_fid tfid = *mdo2fid(c);
1228         int rc;
1229
1230         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1231                 tfid.f_oid = cfs_fail_val;
1232
1233         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1234                                       name->ln_name, handle);
1235         if (rc != 0)
1236                 return rc;
1237
1238         rc = mdo_declare_ref_add(env, c, handle);
1239         if (rc != 0)
1240                 return rc;
1241
1242         la->la_valid = LA_CTIME | LA_MTIME;
1243         rc = mdo_declare_attr_set(env, p, la, handle);
1244         if (rc != 0)
1245                 return rc;
1246
1247         la->la_valid = LA_CTIME;
1248         rc = mdo_declare_attr_set(env, c, la, handle);
1249         if (rc != 0)
1250                 return rc;
1251
1252         rc = mdd_declare_links_add(env, c, handle, data);
1253         if (rc != 0)
1254                 return rc;
1255
1256         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1257
1258         return rc;
1259 }
1260
1261 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1262                     struct md_object *src_obj, const struct lu_name *lname,
1263                     struct md_attr *ma)
1264 {
1265         const char *name = lname->ln_name;
1266         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1267         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1268         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1269         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1270         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1271         struct mdd_device *mdd = mdo2mdd(src_obj);
1272         struct thandle *handle;
1273         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1274         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1275         int rc;
1276         ENTRY;
1277
1278         rc = mdd_la_get(env, mdd_sobj, cattr);
1279         if (rc != 0)
1280                 RETURN(rc);
1281
1282         rc = mdd_la_get(env, mdd_tobj, tattr);
1283         if (rc != 0)
1284                 RETURN(rc);
1285
1286         /*
1287          * If we are using project inheritance, we only allow hard link
1288          * creation in our tree when the project IDs are the same;
1289          * otherwise the tree quota mechanism could be circumvented.
1290          */
1291         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1292             (tattr->la_projid != cattr->la_projid))
1293                 RETURN(-EXDEV);
1294
1295         handle = mdd_trans_create(env, mdd);
1296         if (IS_ERR(handle))
1297                 GOTO(out_pending, rc = PTR_ERR(handle));
1298
1299         memset(ldata, 0, sizeof(*ldata));
1300
1301         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1302         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1303
1304         /* Note: even this function will change ldata, but it comes from
1305          * thread_info, which is completely temporary and only seen in
1306          * this function, so we do not need reset ldata once it fails.*/
1307         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1308                                 lname, 0, 0, ldata);
1309         if (rc != 0)
1310                 GOTO(stop, rc);
1311
1312         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1313                               la, ldata);
1314         if (rc)
1315                 GOTO(stop, rc);
1316
1317         rc = mdd_trans_start(env, mdd, handle);
1318         if (rc)
1319                 GOTO(stop, rc);
1320
1321         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1322         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1323                                    cattr);
1324         if (rc)
1325                 GOTO(out_unlock, rc);
1326
1327         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1328                 rc = mdo_ref_add(env, mdd_sobj, handle);
1329                 if (rc != 0)
1330                         GOTO(out_unlock, rc);
1331         }
1332
1333         *tfid = *mdo2fid(mdd_sobj);
1334         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1335                 tfid->f_oid = cfs_fail_val;
1336
1337         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1338                                      mdd_object_type(mdd_sobj), name, handle);
1339         if (rc != 0) {
1340                 mdo_ref_del(env, mdd_sobj, handle);
1341                 GOTO(out_unlock, rc);
1342         }
1343
1344         la->la_valid = LA_CTIME | LA_MTIME;
1345         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1346         if (rc)
1347                 GOTO(out_unlock, rc);
1348
1349         la->la_valid = LA_CTIME;
1350         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1351         if (rc == 0)
1352                 /* Note: The failure of links_add should not cause the
1353                  * link failure, so do not check return value. */
1354                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1355                               lname, handle, ldata, 0);
1356
1357         EXIT;
1358 out_unlock:
1359         mdd_write_unlock(env, mdd_sobj);
1360         if (rc == 0)
1361                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1362                                             mdo2fid(mdd_tobj), NULL, NULL,
1363                                             lname, NULL, handle);
1364 stop:
1365         rc = mdd_trans_stop(env, mdd, rc, handle);
1366         if (is_vmalloc_addr(ldata->ld_buf))
1367                 /* if we vmalloced a large buffer drop it */
1368                 lu_buf_free(ldata->ld_buf);
1369 out_pending:
1370         return rc;
1371 }
1372
1373 static int mdd_mark_orphan_object(const struct lu_env *env,
1374                                 struct mdd_object *obj, struct thandle *handle,
1375                                 bool declare)
1376 {
1377         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1378         int rc;
1379
1380         if (!S_ISDIR(mdd_object_type(obj)))
1381                 return 0;
1382
1383         attr->la_valid = LA_FLAGS;
1384         attr->la_flags = LUSTRE_ORPHAN_FL;
1385
1386         if (declare)
1387                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1388         else
1389                 rc = mdo_attr_set(env, obj, attr, handle);
1390
1391         return rc;
1392 }
1393
1394 static int mdd_declare_finish_unlink(const struct lu_env *env,
1395                                      struct mdd_object *obj,
1396                                      struct thandle *handle)
1397 {
1398         int     rc;
1399
1400         /* Sigh, we do not know if the unlink object will become orphan in
1401          * declare phase, but fortunately the flags here does not matter
1402          * in current declare implementation */
1403         rc = mdd_mark_orphan_object(env, obj, handle, true);
1404         if (rc != 0)
1405                 return rc;
1406
1407         rc = mdo_declare_destroy(env, obj, handle);
1408         if (rc != 0)
1409                 return rc;
1410
1411         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1412         if (rc != 0)
1413                 return rc;
1414
1415         return mdd_declare_links_del(env, obj, handle);
1416 }
1417
1418 /* caller should take a lock before calling */
1419 int mdd_finish_unlink(const struct lu_env *env,
1420                       struct mdd_object *obj, struct md_attr *ma,
1421                       const struct mdd_object *pobj,
1422                       const struct lu_name *lname,
1423                       struct thandle *th)
1424 {
1425         int rc = 0;
1426         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1427         ENTRY;
1428
1429         LASSERT(mdd_write_locked(env, obj) != 0);
1430
1431         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1432                 /* add new orphan and the object
1433                  * will be deleted during mdd_close() */
1434                 obj->mod_flags |= DEAD_OBJ;
1435                 if (obj->mod_count) {
1436                         rc = __mdd_orphan_add(env, obj, th);
1437                         if (rc == 0)
1438                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1439                                         "orphan list, open count = %d\n",
1440                                         PFID(mdd_object_fid(obj)),
1441                                         obj->mod_count);
1442                         else
1443                                 CERROR("Object "DFID" fail to be an orphan, "
1444                                        "open count = %d, maybe cause failed "
1445                                        "open replay\n",
1446                                         PFID(mdd_object_fid(obj)),
1447                                         obj->mod_count);
1448
1449                         /* mark object as an orphan here, not
1450                          * before __mdd_orphan_add() as racing
1451                          * mdd_la_get() may propagate ORPHAN_OBJ
1452                          * causing the asserition */
1453                         rc = mdd_mark_orphan_object(env, obj, th, false);
1454                 } else {
1455                         rc = mdo_destroy(env, obj, th);
1456                 }
1457         } else if (!is_dir) {
1458                 /* old files may not have link ea; ignore errors */
1459                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1460         }
1461
1462         RETURN(rc);
1463 }
1464
1465 /*
1466  * pobj maybe NULL
1467  * has mdd_write_lock on cobj already, but not on pobj yet
1468  */
1469 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1470                             const struct lu_attr *pattr,
1471                             struct mdd_object *cobj,
1472                             const struct lu_attr *cattr)
1473 {
1474         int rc;
1475         ENTRY;
1476
1477         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1478
1479         RETURN(rc);
1480 }
1481
1482 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1483                               struct mdd_object *p, struct mdd_object *c,
1484                               const struct lu_name *name, struct md_attr *ma,
1485                               struct thandle *handle, int no_name, int is_dir)
1486 {
1487         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1488         int              rc;
1489
1490         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1491                 if (likely(no_name == 0)) {
1492                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1493                                                       handle);
1494                         if (rc != 0)
1495                                 return rc;
1496                 }
1497
1498                 if (is_dir != 0) {
1499                         rc = mdo_declare_ref_del(env, p, handle);
1500                         if (rc != 0)
1501                                 return rc;
1502                 }
1503         }
1504
1505         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1506         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1507         la->la_valid = LA_CTIME | LA_MTIME;
1508         rc = mdo_declare_attr_set(env, p, la, handle);
1509         if (rc)
1510                 return rc;
1511
1512         if (c != NULL) {
1513                 rc = mdo_declare_ref_del(env, c, handle);
1514                 if (rc)
1515                         return rc;
1516
1517                 rc = mdo_declare_ref_del(env, c, handle);
1518                 if (rc)
1519                         return rc;
1520
1521                 la->la_valid = LA_CTIME;
1522                 rc = mdo_declare_attr_set(env, c, la, handle);
1523                 if (rc)
1524                         return rc;
1525
1526                 rc = mdd_declare_finish_unlink(env, c, handle);
1527                 if (rc)
1528                         return rc;
1529
1530                 /* FIXME: need changelog for remove entry */
1531                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1532         }
1533
1534         return rc;
1535 }
1536
1537 /*
1538  * test if a file has an HSM archive
1539  * if HSM attributes are not found in ma update them from
1540  * HSM xattr
1541  */
1542 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1543                                    struct mdd_object *obj,
1544                                    struct md_attr *ma)
1545 {
1546         ENTRY;
1547
1548         if (!(ma->ma_valid & MA_HSM)) {
1549                 /* no HSM MD provided, read xattr */
1550                 struct lu_buf   *hsm_buf;
1551                 const size_t     buflen = sizeof(struct hsm_attrs);
1552                 int              rc;
1553
1554                 hsm_buf = mdd_buf_get(env, NULL, 0);
1555                 lu_buf_alloc(hsm_buf, buflen);
1556                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1557                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1558                 lu_buf_free(hsm_buf);
1559                 if (rc < 0)
1560                         RETURN(false);
1561
1562                 ma->ma_valid |= MA_HSM;
1563         }
1564         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1565                 RETURN(true);
1566         RETURN(false);
1567 }
1568
1569 /**
1570  * Delete name entry and the object.
1571  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1572  * does not exist for this object, and it could only happen during resending
1573  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1574  * is also needed in this case(needed by changelog), so we have to add another
1575  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1576  * the ENOENT failure should be able to be fixed by redo mechanism.
1577  */
1578 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1579                       struct md_object *cobj, const struct lu_name *lname,
1580                       struct md_attr *ma, int no_name)
1581 {
1582         const char *name = lname->ln_name;
1583         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1584         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1585         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1586         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1587         struct mdd_object *mdd_cobj = NULL;
1588         struct mdd_device *mdd = mdo2mdd(pobj);
1589         struct thandle    *handle;
1590         int rc, is_dir = 0, cl_flags = 0;
1591         ENTRY;
1592
1593         /* cobj == NULL means only delete name entry */
1594         if (likely(cobj != NULL)) {
1595                 mdd_cobj = md2mdd_obj(cobj);
1596                 if (mdd_object_exists(mdd_cobj) == 0)
1597                         RETURN(-ENOENT);
1598         }
1599
1600         rc = mdd_la_get(env, mdd_pobj, pattr);
1601         if (rc)
1602                 RETURN(rc);
1603
1604         if (likely(mdd_cobj != NULL)) {
1605                 /* fetch cattr */
1606                 rc = mdd_la_get(env, mdd_cobj, cattr);
1607                 if (rc)
1608                         RETURN(rc);
1609
1610                 is_dir = S_ISDIR(cattr->la_mode);
1611                 /* search for an existing archive.
1612                  * we should check ahead as the object
1613                  * can be destroyed in this transaction */
1614                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1615                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1616         }
1617
1618         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1619         if (rc)
1620                 RETURN(rc);
1621
1622         handle = mdd_trans_create(env, mdd);
1623         if (IS_ERR(handle))
1624                 RETURN(PTR_ERR(handle));
1625
1626         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1627                                 lname, ma, handle, no_name, is_dir);
1628         if (rc)
1629                 GOTO(stop, rc);
1630
1631         rc = mdd_trans_start(env, mdd, handle);
1632         if (rc)
1633                 GOTO(stop, rc);
1634
1635         if (likely(mdd_cobj != NULL))
1636                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1637
1638         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1639                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1640                 if (rc)
1641                         GOTO(cleanup, rc);
1642         }
1643
1644         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1645             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1646                 GOTO(cleanup, rc = 0);
1647
1648         if (likely(mdd_cobj != NULL)) {
1649                 rc = mdo_ref_del(env, mdd_cobj, handle);
1650                 if (rc != 0) {
1651                         __mdd_index_insert_only(env, mdd_pobj,
1652                                                 mdo2fid(mdd_cobj),
1653                                                 mdd_object_type(mdd_cobj),
1654                                                 name, handle);
1655                         GOTO(cleanup, rc);
1656                 }
1657
1658                 if (is_dir)
1659                         /* unlink dot */
1660                         mdo_ref_del(env, mdd_cobj, handle);
1661
1662                 /* fetch updated nlink */
1663                 rc = mdd_la_get(env, mdd_cobj, cattr);
1664                 if (rc)
1665                         GOTO(cleanup, rc);
1666         }
1667
1668         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1669         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1670
1671         la->la_valid = LA_CTIME | LA_MTIME;
1672         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1673         if (rc)
1674                 GOTO(cleanup, rc);
1675
1676         /* Enough for only unlink the entry */
1677         if (unlikely(mdd_cobj == NULL))
1678                 GOTO(stop, rc);
1679
1680         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1681                 /* update ctime of an unlinked file only if it is still
1682                  * opened or a link still exists */
1683                 la->la_valid = LA_CTIME;
1684                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1685                 if (rc)
1686                         GOTO(cleanup, rc);
1687         }
1688
1689         /* XXX: this transfer to ma will be removed with LOD/OSP */
1690         ma->ma_attr = *cattr;
1691         ma->ma_valid |= MA_INODE;
1692         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1693         if (rc != 0)
1694                 GOTO(cleanup, rc);
1695
1696         /* fetch updated nlink */
1697         rc = mdd_la_get(env, mdd_cobj, cattr);
1698         /* if object is removed then we can't get its attrs,
1699          * use last get */
1700         if (rc == -ENOENT) {
1701                 cattr->la_nlink = 0;
1702                 rc = 0;
1703         }
1704
1705         if (cattr->la_nlink == 0) {
1706                 ma->ma_attr = *cattr;
1707                 ma->ma_valid |= MA_INODE;
1708         }
1709
1710         EXIT;
1711 cleanup:
1712         if (likely(mdd_cobj != NULL))
1713                 mdd_write_unlock(env, mdd_cobj);
1714
1715         if (rc == 0) {
1716                 if (cattr->la_nlink == 0)
1717                         cl_flags |= CLF_UNLINK_LAST;
1718                 else
1719                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1720
1721                 rc = mdd_changelog_ns_store(env, mdd,
1722                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1723                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1724                         handle);
1725         }
1726
1727 stop:
1728         rc = mdd_trans_stop(env, mdd, rc, handle);
1729
1730         return rc;
1731 }
1732
1733 /*
1734  * The permission has been checked when obj created, no need check again.
1735  */
1736 static int mdd_cd_sanity_check(const struct lu_env *env,
1737                                struct mdd_object *obj)
1738 {
1739         ENTRY;
1740
1741         /* EEXIST check */
1742         if (!obj || mdd_is_dead_obj(obj))
1743                 RETURN(-ENOENT);
1744
1745         RETURN(0);
1746 }
1747
1748 static int mdd_create_data(const struct lu_env *env,
1749                            struct md_object *pobj,
1750                            struct md_object *cobj,
1751                            const struct md_op_spec *spec,
1752                            struct md_attr *ma)
1753 {
1754         struct mdd_device *mdd = mdo2mdd(cobj);
1755         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1756         struct mdd_object *son = md2mdd_obj(cobj);
1757         struct thandle    *handle;
1758         const struct lu_buf *buf;
1759         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1760         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1761         int                rc;
1762         ENTRY;
1763
1764         rc = mdd_cd_sanity_check(env, son);
1765         if (rc)
1766                 RETURN(rc);
1767
1768         if (!md_should_create(spec->sp_cr_flags))
1769                 RETURN(0);
1770
1771         /*
1772          * there are following use cases for this function:
1773          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1774          *    striping can be specified or not
1775          * 2) CMD?
1776          */
1777         rc = mdd_la_get(env, son, attr);
1778         if (rc)
1779                 RETURN(rc);
1780
1781         /* calling ->ah_make_hint() is used to transfer information from parent */
1782         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1783
1784         handle = mdd_trans_create(env, mdd);
1785         if (IS_ERR(handle))
1786                 GOTO(out_free, rc = PTR_ERR(handle));
1787
1788         /*
1789          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1790          * Should this be fixed?
1791          */
1792         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1793                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1794                spec->sp_cr_flags, spec->no_create);
1795
1796         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1797                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1798                                         spec->u.sp_ea.eadatalen);
1799         } else {
1800                 buf = &LU_BUF_NULL;
1801         }
1802
1803         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1804                                   XATTR_NAME_LOV, 0, handle);
1805         if (rc)
1806                 GOTO(stop, rc);
1807
1808         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1809         if (rc)
1810                 GOTO(stop, rc);
1811
1812         rc = mdd_trans_start(env, mdd, handle);
1813         if (rc)
1814                 GOTO(stop, rc);
1815
1816         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1817                           0, handle);
1818
1819         if (rc)
1820                 GOTO(stop, rc);
1821
1822         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1823
1824 stop:
1825         rc = mdd_trans_stop(env, mdd, rc, handle);
1826
1827 out_free:
1828         RETURN(rc);
1829 }
1830
1831 static int mdd_declare_object_initialize(const struct lu_env *env,
1832                                          struct mdd_object *parent,
1833                                          struct mdd_object *child,
1834                                          const struct lu_attr *attr,
1835                                          struct thandle *handle)
1836 {
1837         int rc;
1838         ENTRY;
1839
1840         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1841         if (!S_ISDIR(attr->la_mode))
1842                 RETURN(0);
1843
1844         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1845                                       dot, handle);
1846         if (rc != 0)
1847                 RETURN(rc);
1848
1849         rc = mdo_declare_ref_add(env, child, handle);
1850         if (rc != 0)
1851                 RETURN(rc);
1852
1853         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1854                                       dotdot, handle);
1855
1856         RETURN(rc);
1857 }
1858
1859 static int mdd_object_initialize(const struct lu_env *env,
1860                                  const struct lu_fid *pfid,
1861                                  struct mdd_object *child,
1862                                  struct lu_attr *attr, struct thandle *handle,
1863                                  const struct md_op_spec *spec)
1864 {
1865         int rc = 0;
1866         ENTRY;
1867
1868         if (S_ISDIR(attr->la_mode)) {
1869                 /* Add "." and ".." for newly created dir */
1870                 mdo_ref_add(env, child, handle);
1871                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1872                                              S_IFDIR, dot, handle);
1873                 if (rc == 0)
1874                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1875                                                      dotdot, handle);
1876                 if (rc != 0)
1877                         mdo_ref_del(env, child, handle);
1878         }
1879
1880         RETURN(rc);
1881 }
1882
1883 /**
1884  * This function checks whether it can create a file/dir under the
1885  * directory(@pobj). The directory(@pobj) is not being locked by
1886  * mdd lock.
1887  *
1888  * \param[in] env       execution environment
1889  * \param[in] pobj      the directory to create files
1890  * \param[in] pattr     the attributes of the directory
1891  * \param[in] lname     the name of the created file/dir
1892  * \param[in] cattr     the attributes of the file/dir
1893  * \param[in] spec      create specification
1894  *
1895  * \retval              = 0 it is allowed to create file/dir under
1896  *                      the directory
1897  * \retval              negative error not allowed to create file/dir
1898  *                      under the directory
1899  */
1900 static int mdd_create_sanity_check(const struct lu_env *env,
1901                                    struct md_object *pobj,
1902                                    const struct lu_attr *pattr,
1903                                    const struct lu_name *lname,
1904                                    struct lu_attr *cattr,
1905                                    struct md_op_spec *spec)
1906 {
1907         struct mdd_thread_info *info = mdd_env_info(env);
1908         struct lu_fid     *fid       = &info->mti_fid;
1909         struct mdd_object *obj       = md2mdd_obj(pobj);
1910         struct mdd_device *m         = mdo2mdd(pobj);
1911         bool            check_perm = true;
1912         int rc;
1913         ENTRY;
1914
1915         /* EEXIST check */
1916         if (mdd_is_dead_obj(obj))
1917                 RETURN(-ENOENT);
1918
1919         /*
1920          * In some cases this lookup is not needed - we know before if name
1921          * exists or not because MDT performs lookup for it.
1922          * name length check is done in lookup.
1923          */
1924         if (spec->sp_cr_lookup) {
1925                 /*
1926                  * Check if the name already exist, though it will be checked in
1927                  * _index_insert also, for avoiding rolling back if exists
1928                  * _index_insert.
1929                  */
1930                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1931                                   MAY_WRITE | MAY_EXEC);
1932                 if (rc != -ENOENT)
1933                         RETURN(rc ? : -EEXIST);
1934
1935                 /* Permission is already being checked in mdd_lookup */
1936                 check_perm = false;
1937         }
1938
1939         if (S_ISDIR(cattr->la_mode) &&
1940             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
1941             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
1942                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
1943
1944                 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC) &&
1945                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
1946                         rc = -EINVAL;
1947                         CERROR("%s: invalid lmv_user_md: magic = %x, "
1948                                "stripe_offset = %d, stripe_count = %u: "
1949                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
1950                                 le32_to_cpu(lum->lum_magic),
1951                                (int)le32_to_cpu(lum->lum_stripe_offset),
1952                                le32_to_cpu(lum->lum_stripe_count), rc);
1953                         return rc;
1954                 }
1955         }
1956
1957         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1958         if (rc != 0)
1959                 RETURN(rc);
1960
1961         /* sgid check */
1962         if (pattr->la_mode & S_ISGID) {
1963                 cattr->la_gid = pattr->la_gid;
1964                 if (S_ISDIR(cattr->la_mode)) {
1965                         cattr->la_mode |= S_ISGID;
1966                         cattr->la_valid |= LA_MODE;
1967                 }
1968         }
1969
1970         /* Inherit project ID from parent directory */
1971         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
1972                 cattr->la_projid = pattr->la_projid;
1973                 if (S_ISDIR(cattr->la_mode)) {
1974                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
1975                         cattr->la_valid |= LA_FLAGS;
1976                 }
1977                 cattr->la_valid |= LA_PROJID;
1978         }
1979
1980         rc = mdd_name_check(m, lname);
1981         if (rc < 0)
1982                 RETURN(rc);
1983
1984         switch (cattr->la_mode & S_IFMT) {
1985         case S_IFLNK: {
1986                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1987
1988                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
1989                         RETURN(-ENAMETOOLONG);
1990                 else
1991                         RETURN(0);
1992         }
1993         case S_IFDIR:
1994         case S_IFREG:
1995         case S_IFCHR:
1996         case S_IFBLK:
1997         case S_IFIFO:
1998         case S_IFSOCK:
1999                 rc = 0;
2000                 break;
2001         default:
2002                 rc = -EINVAL;
2003                 break;
2004         }
2005         RETURN(rc);
2006 }
2007
2008 static int mdd_declare_create_object(const struct lu_env *env,
2009                                      struct mdd_device *mdd,
2010                                      struct mdd_object *p, struct mdd_object *c,
2011                                      struct lu_attr *attr,
2012                                      struct thandle *handle,
2013                                      const struct md_op_spec *spec,
2014                                      struct lu_buf *def_acl_buf,
2015                                      struct lu_buf *acl_buf,
2016                                      struct dt_allocation_hint *hint)
2017 {
2018         const struct lu_buf *buf;
2019         int rc;
2020
2021         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2022                                                 hint);
2023         if (rc)
2024                 GOTO(out, rc);
2025
2026 #ifdef CONFIG_FS_POSIX_ACL
2027         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2028                 /* if dir, then can inherit default ACl */
2029                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2030                                            XATTR_NAME_ACL_DEFAULT,
2031                                            0, handle);
2032                 if (rc)
2033                         GOTO(out, rc);
2034         }
2035
2036         if (acl_buf->lb_len > 0) {
2037                 rc = mdo_declare_attr_set(env, c, attr, handle);
2038                 if (rc)
2039                         GOTO(out, rc);
2040
2041                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2042                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2043                 if (rc)
2044                         GOTO(out, rc);
2045         }
2046 #endif
2047         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2048         if (rc)
2049                 GOTO(out, rc);
2050
2051         /* replay case, create LOV EA from client data */
2052         if (spec->no_create ||
2053             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2054                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2055                                         spec->u.sp_ea.eadatalen);
2056                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2057                                            handle);
2058                 if (rc)
2059                         GOTO(out, rc);
2060         }
2061
2062         if (S_ISLNK(attr->la_mode)) {
2063                 const char *target_name = spec->u.sp_symname;
2064                 int sym_len = strlen(target_name);
2065                 const struct lu_buf *buf;
2066
2067                 buf = mdd_buf_get_const(env, target_name, sym_len);
2068                 rc = dt_declare_record_write(env, mdd_object_child(c),
2069                                              buf, 0, handle);
2070                 if (rc)
2071                         GOTO(out, rc);
2072         }
2073
2074         if (spec->sp_cr_file_secctx_name != NULL) {
2075                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2076                                         spec->sp_cr_file_secctx_size);
2077                 rc = mdo_declare_xattr_set(env, c, buf,
2078                                            spec->sp_cr_file_secctx_name, 0,
2079                                            handle);
2080                 if (rc < 0)
2081                         GOTO(out, rc);
2082         }
2083 out:
2084         return rc;
2085 }
2086
2087 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2088                               struct mdd_object *p, struct mdd_object *c,
2089                               const struct lu_name *name,
2090                               struct lu_attr *attr,
2091                               struct thandle *handle,
2092                               const struct md_op_spec *spec,
2093                               struct linkea_data *ldata,
2094                               struct lu_buf *def_acl_buf,
2095                               struct lu_buf *acl_buf,
2096                               struct dt_allocation_hint *hint)
2097 {
2098         int rc;
2099
2100         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2101                                        def_acl_buf, acl_buf, hint);
2102         if (rc)
2103                 GOTO(out, rc);
2104
2105         if (S_ISDIR(attr->la_mode)) {
2106                 rc = mdo_declare_ref_add(env, p, handle);
2107                 if (rc)
2108                         GOTO(out, rc);
2109         }
2110
2111         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2112                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2113                 if (rc)
2114                         GOTO(out, rc);
2115         } else {
2116                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2117
2118                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2119                                               name->ln_name, handle);
2120                 if (rc != 0)
2121                         return rc;
2122
2123                 rc = mdd_declare_links_add(env, c, handle, ldata);
2124                 if (rc)
2125                         return rc;
2126
2127                 *la = *attr;
2128                 la->la_valid = LA_CTIME | LA_MTIME;
2129                 rc = mdo_declare_attr_set(env, p, la, handle);
2130                 if (rc)
2131                         return rc;
2132
2133                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2134                 if (rc)
2135                         return rc;
2136         }
2137 out:
2138         return rc;
2139 }
2140
2141 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2142                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2143                         struct lu_buf *acl_buf)
2144 {
2145         int     rc;
2146         ENTRY;
2147
2148         if (S_ISLNK(la->la_mode)) {
2149                 acl_buf->lb_len = 0;
2150                 def_acl_buf->lb_len = 0;
2151                 RETURN(0);
2152         }
2153
2154         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2155         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2156                            XATTR_NAME_ACL_DEFAULT);
2157         mdd_read_unlock(env, pobj);
2158         if (rc > 0) {
2159                 /* If there are default ACL, fix mode/ACL by default ACL */
2160                 def_acl_buf->lb_len = rc;
2161                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2162                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2163                 acl_buf->lb_len = rc;
2164                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2165                 if (rc < 0)
2166                         RETURN(rc);
2167         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2168                 /* If there are no default ACL, fix mode by mask */
2169                 struct lu_ucred *uc = lu_ucred(env);
2170
2171                 /* The create triggered by MDT internal events, such as
2172                  * LFSCK reset, will not contain valid "uc". */
2173                 if (unlikely(uc != NULL))
2174                         la->la_mode &= ~uc->uc_umask;
2175                 rc = 0;
2176                 acl_buf->lb_len = 0;
2177                 def_acl_buf->lb_len = 0;
2178         }
2179
2180         RETURN(rc);
2181 }
2182
2183 /**
2184  * Create a metadata object and initialize it, set acl, xattr.
2185  **/
2186 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2187                              struct mdd_object *son, struct lu_attr *attr,
2188                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2189                              struct lu_buf *def_acl_buf,
2190                              struct dt_allocation_hint *hint,
2191                              struct thandle *handle)
2192 {
2193         const struct lu_buf *buf;
2194         int rc;
2195
2196         mdd_write_lock(env, son, MOR_TGT_CHILD);
2197         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2198                                         hint);
2199         if (rc)
2200                 GOTO(unlock, rc);
2201
2202         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2203          * created in declare phase, they also needs to be added to master
2204          * object as sub-directory entry. So it has to initialize the master
2205          * object, then set dir striped EA.(in mdo_xattr_set) */
2206         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2207                                    spec);
2208         if (rc != 0)
2209                 GOTO(err_destroy, rc);
2210
2211         /*
2212          * in case of replay we just set LOVEA provided by the client
2213          * XXX: I think it would be interesting to try "old" way where
2214          *      MDT calls this xattr_set(LOV) in a different transaction.
2215          *      probably this way we code can be made better.
2216          */
2217
2218         /* During creation, there are only a few cases we need do xattr_set to
2219          * create stripes.
2220          * 1. regular file: see comments above.
2221          * 2. dir: inherit default striping or pool settings from parent.
2222          * 3. create striped directory with provided stripeEA.
2223          * 4. create striped directory because inherit default layout from the
2224          * parent.
2225          */
2226         if (spec->no_create ||
2227             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2228             S_ISDIR(attr->la_mode)) {
2229                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2230                                         spec->u.sp_ea.eadatalen);
2231                 rc = mdo_xattr_set(env, son, buf,
2232                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2233                                                             XATTR_NAME_LOV, 0,
2234                                    handle);
2235                 if (rc != 0)
2236                         GOTO(err_destroy, rc);
2237         }
2238
2239 #ifdef CONFIG_FS_POSIX_ACL
2240         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2241             S_ISDIR(attr->la_mode)) {
2242                 /* set default acl */
2243                 rc = mdo_xattr_set(env, son, def_acl_buf,
2244                                    XATTR_NAME_ACL_DEFAULT, 0,
2245                                    handle);
2246                 if (rc)
2247                         GOTO(err_destroy, rc);
2248         }
2249         /* set its own acl */
2250         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2251                 rc = mdo_xattr_set(env, son, acl_buf,
2252                                    XATTR_NAME_ACL_ACCESS,
2253                                    0, handle);
2254                 if (rc)
2255                         GOTO(err_destroy, rc);
2256         }
2257 #endif
2258
2259         if (S_ISLNK(attr->la_mode)) {
2260                 struct lu_ucred  *uc = lu_ucred_assert(env);
2261                 struct dt_object *dt = mdd_object_child(son);
2262                 const char *target_name = spec->u.sp_symname;
2263                 int sym_len = strlen(target_name);
2264                 loff_t pos = 0;
2265
2266                 buf = mdd_buf_get_const(env, target_name, sym_len);
2267                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2268                                                 uc->uc_cap &
2269                                                 CFS_CAP_SYS_RESOURCE_MASK);
2270
2271                 if (rc == sym_len)
2272                         rc = 0;
2273                 else
2274                         GOTO(err_initlized, rc = -EFAULT);
2275         }
2276
2277         if (spec->sp_cr_file_secctx_name != NULL) {
2278                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2279                                         spec->sp_cr_file_secctx_size);
2280                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2281                                    0, handle);
2282                 if (rc < 0)
2283                         GOTO(err_initlized, rc);
2284         }
2285
2286 err_initlized:
2287         if (unlikely(rc != 0)) {
2288                 int rc2;
2289                 if (S_ISDIR(attr->la_mode)) {
2290                         /* Drop the reference, no need to delete "."/"..",
2291                          * because the object to be destroied directly. */
2292                         rc2 = mdo_ref_del(env, son, handle);
2293                         if (rc2 != 0)
2294                                 GOTO(unlock, rc);
2295                 }
2296                 rc2 = mdo_ref_del(env, son, handle);
2297                 if (rc2 != 0)
2298                         GOTO(unlock, rc);
2299 err_destroy:
2300                 mdo_destroy(env, son, handle);
2301         }
2302 unlock:
2303         mdd_write_unlock(env, son);
2304         RETURN(rc);
2305 }
2306
2307 static int mdd_index_delete(const struct lu_env *env,
2308                             struct mdd_object *mdd_pobj,
2309                             struct lu_attr *cattr,
2310                             const struct lu_name *lname)
2311 {
2312         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2313         struct thandle *handle;
2314         int rc;
2315         ENTRY;
2316
2317         handle = mdd_trans_create(env, mdd);
2318         if (IS_ERR(handle))
2319                 RETURN(PTR_ERR(handle));
2320
2321         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2322                                       handle);
2323         if (rc != 0)
2324                 GOTO(stop, rc);
2325
2326         if (S_ISDIR(cattr->la_mode)) {
2327                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2328                 if (rc != 0)
2329                         GOTO(stop, rc);
2330         }
2331
2332         /* Since this will only be used in the error handler path,
2333          * Let's set the thandle to be local and not mess the transno */
2334         handle->th_local = 1;
2335         rc = mdd_trans_start(env, mdd, handle);
2336         if (rc)
2337                 GOTO(stop, rc);
2338
2339         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2340                                 S_ISDIR(cattr->la_mode), handle);
2341         if (rc)
2342                 GOTO(stop, rc);
2343 stop:
2344         rc = mdd_trans_stop(env, mdd, rc, handle);
2345
2346         RETURN(rc);
2347 }
2348
2349 /**
2350  * Create object and insert it into namespace.
2351  *
2352  * Two operations have to be performed:
2353  *
2354  *  - an allocation of a new object (->do_create()), and
2355  *  - an insertion into a parent index (->dio_insert()).
2356  *
2357  * Due to locking, operation order is not important, when both are
2358  * successful, *but* error handling cases are quite different:
2359  *
2360  *  - if insertion is done first, and following object creation fails,
2361  *  insertion has to be rolled back, but this operation might fail
2362  *  also leaving us with dangling index entry.
2363  *
2364  *  - if creation is done first, is has to be undone if insertion fails,
2365  *  leaving us with leaked space, which is not good but not fatal.
2366  *
2367  * It seems that creation-first is simplest solution, but it is sub-optimal
2368  * in the frequent
2369  *
2370  * $ mkdir foo
2371  * $ mkdir foo
2372  *
2373  * case, because second mkdir is bound to create object, only to
2374  * destroy it immediately.
2375  *
2376  * To avoid this follow local file systems that do double lookup:
2377  *
2378  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2379  * 1. create            (mdd_create_object_internal())
2380  * 2. insert            (__mdd_index_insert(), lookup again)
2381  *
2382  * \param[in] pobj      parent object
2383  * \param[in] lname     name of child being created
2384  * \param[in,out] child child object being created
2385  * \param[in] spec      additional create parameters
2386  * \param[in] ma        attributes for new child object
2387  *
2388  * \retval              0 on success
2389  * \retval              negative errno on failure
2390  */
2391 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2392                       const struct lu_name *lname, struct md_object *child,
2393                       struct md_op_spec *spec, struct md_attr *ma)
2394 {
2395         struct mdd_thread_info  *info = mdd_env_info(env);
2396         struct lu_attr          *la = &info->mti_la_for_fix;
2397         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2398         struct mdd_object       *son = md2mdd_obj(child);
2399         struct mdd_device       *mdd = mdo2mdd(pobj);
2400         struct lu_attr          *attr = &ma->ma_attr;
2401         struct thandle          *handle;
2402         struct lu_attr          *pattr = &info->mti_pattr;
2403         struct lu_buf           acl_buf;
2404         struct lu_buf           def_acl_buf;
2405         struct linkea_data      *ldata = &info->mti_link_data;
2406         const char              *name = lname->ln_name;
2407         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2408         int                      rc;
2409         int                      rc2;
2410         ENTRY;
2411
2412         rc = mdd_la_get(env, mdd_pobj, pattr);
2413         if (rc != 0)
2414                 RETURN(rc);
2415
2416         /* Sanity checks before big job. */
2417         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2418         if (rc)
2419                 RETURN(rc);
2420
2421         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2422                 GOTO(out_free, rc = -EINPROGRESS);
2423
2424         handle = mdd_trans_create(env, mdd);
2425         if (IS_ERR(handle))
2426                 GOTO(out_free, rc = PTR_ERR(handle));
2427
2428         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2429                                mdd->mdd_dt_conf.ddp_max_ea_size);
2430         acl_buf = info->mti_xattr_buf;
2431         def_acl_buf.lb_buf = info->mti_key;
2432         def_acl_buf.lb_len = sizeof(info->mti_key);
2433         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2434         if (rc < 0)
2435                 GOTO(out_stop, rc);
2436
2437         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2438
2439         memset(ldata, 0, sizeof(*ldata));
2440         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2441                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2442
2443                 tfid.f_oid--;
2444                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2445                                         &tfid, lname, 1, 0, ldata);
2446         } else {
2447                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2448                                         mdd_object_fid(mdd_pobj),
2449                                         lname, 1, 0, ldata);
2450         }
2451
2452         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2453                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2454                                 hint);
2455         if (rc)
2456                 GOTO(out_stop, rc);
2457
2458         rc = mdd_trans_start(env, mdd, handle);
2459         if (rc)
2460                 GOTO(out_stop, rc);
2461
2462         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2463                                &def_acl_buf, hint, handle);
2464         if (rc != 0)
2465                 GOTO(out_stop, rc);
2466
2467         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2468                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2469                 son->mod_flags |= VOLATILE_OBJ;
2470                 rc = __mdd_orphan_add(env, son, handle);
2471                 GOTO(out_volatile, rc);
2472         } else {
2473                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2474                                         attr->la_mode, name, handle);
2475                 if (rc != 0)
2476                         GOTO(err_created, rc);
2477
2478                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2479                               ldata, 1);
2480
2481                 /* update parent directory mtime/ctime */
2482                 *la = *attr;
2483                 la->la_valid = LA_CTIME | LA_MTIME;
2484                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2485                 if (rc)
2486                         GOTO(err_insert, rc);
2487         }
2488
2489         EXIT;
2490 err_insert:
2491         if (rc != 0) {
2492                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2493                         rc2 = __mdd_orphan_del(env, son, handle);
2494                 else
2495                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2496                                                  S_ISDIR(attr->la_mode),
2497                                                  handle);
2498                 if (rc2 != 0)
2499                         goto out_stop;
2500
2501 err_created:
2502                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2503                 if (S_ISDIR(attr->la_mode)) {
2504                         /* Drop the reference, no need to delete "."/"..",
2505                          * because the object is to be destroyed directly. */
2506                         rc2 = mdo_ref_del(env, son, handle);
2507                         if (rc2 != 0) {
2508                                 mdd_write_unlock(env, son);
2509                                 goto out_stop;
2510                         }
2511                 }
2512 out_volatile:
2513                 /* For volatile files drop one link immediately, since there is
2514                  * no filename in the namespace, and save any error returned. */
2515                 rc2 = mdo_ref_del(env, son, handle);
2516                 if (rc2 != 0) {
2517                         mdd_write_unlock(env, son);
2518                         if (unlikely(rc == 0))
2519                                 rc = rc2;
2520                         goto out_stop;
2521                 }
2522
2523                 /* Don't destroy the volatile object on success */
2524                 if (likely(rc != 0))
2525                         mdo_destroy(env, son, handle);
2526                 mdd_write_unlock(env, son);
2527         }
2528
2529         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2530             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2531                 rc = mdd_changelog_ns_store(env, mdd,
2532                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2533                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2534                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2535                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2536                                 NULL, handle);
2537 out_stop:
2538         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2539         if (rc == 0) {
2540                 /* If creation fails, it is most likely due to the remote update
2541                  * failure, because local transaction will mostly succeed at
2542                  * this stage. There is no easy way to rollback all of previous
2543                  * updates, so let's remove the object from namespace, and
2544                  * LFSCK should handle the orphan object. */
2545                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2546                         mdd_index_delete(env, mdd_pobj, attr, lname);
2547                 rc = rc2;
2548         }
2549 out_free:
2550         if (is_vmalloc_addr(ldata->ld_buf))
2551                 /* if we vmalloced a large buffer drop it */
2552                 lu_buf_free(ldata->ld_buf);
2553
2554         /* The child object shouldn't be cached anymore */
2555         if (rc)
2556                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2557                         &child->mo_lu.lo_header->loh_flags);
2558         return rc;
2559 }
2560
2561 /*
2562  * Get locks on parents in proper order
2563  * RETURN: < 0 - error, rename_order if successful
2564  */
2565 enum rename_order {
2566         MDD_RN_SAME,
2567         MDD_RN_SRCTGT,
2568         MDD_RN_TGTSRC
2569 };
2570
2571 static int mdd_rename_order(const struct lu_env *env,
2572                             struct mdd_device *mdd,
2573                             struct mdd_object *src_pobj,
2574                             const struct lu_attr *pattr,
2575                             struct mdd_object *tgt_pobj)
2576 {
2577         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2578         int rc;
2579         ENTRY;
2580
2581         if (src_pobj == tgt_pobj)
2582                 RETURN(MDD_RN_SAME);
2583
2584         /* compared the parent child relationship of src_p&tgt_p */
2585         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2586                 rc = MDD_RN_SRCTGT;
2587         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2588                 rc = MDD_RN_TGTSRC;
2589         } else {
2590                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2591                                    NULL);
2592                 if (rc == -EREMOTE)
2593                         rc = 0;
2594
2595                 if (rc == 1)
2596                         rc = MDD_RN_TGTSRC;
2597                 else if (rc == 0)
2598                         rc = MDD_RN_SRCTGT;
2599         }
2600
2601         RETURN(rc);
2602 }
2603
2604 /* has not mdd_write{read}_lock on any obj yet. */
2605 static int mdd_rename_sanity_check(const struct lu_env *env,
2606                                    struct mdd_object *src_pobj,
2607                                    const struct lu_attr *pattr,
2608                                    struct mdd_object *tgt_pobj,
2609                                    const struct lu_attr *tpattr,
2610                                    struct mdd_object *sobj,
2611                                    const struct lu_attr *cattr,
2612                                    struct mdd_object *tobj,
2613                                    const struct lu_attr *tattr)
2614 {
2615         int rc = 0;
2616         ENTRY;
2617
2618         /* XXX: when get here, sobj must NOT be NULL,
2619          * the other case has been processed in cld_rename
2620          * before mdd_rename and enable MDS_PERM_BYPASS. */
2621         LASSERT(sobj);
2622
2623         /*
2624          * If we are using project inheritance, we only allow renames
2625          * into our tree when the project IDs are the same; otherwise
2626          * tree quota mechanism would be circumvented.
2627          */
2628         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2629             tpattr->la_projid != cattr->la_projid) ||
2630             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2631             (pattr->la_projid != tpattr->la_projid)))
2632                 RETURN(-EXDEV);
2633
2634         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2635         if (rc)
2636                 RETURN(rc);
2637
2638         /* XXX: when get here, "tobj == NULL" means tobj must
2639          * NOT exist (neither on remote MDS, such case has been
2640          * processed in cld_rename before mdd_rename and enable
2641          * MDS_PERM_BYPASS).
2642          * So check may_create, but not check may_unlink. */
2643         if (tobj == NULL)
2644                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2645                                     (src_pobj != tgt_pobj));
2646         else
2647                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2648                                     (src_pobj != tgt_pobj), 1);
2649
2650         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2651                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2652
2653         RETURN(rc);
2654 }
2655
2656 static int mdd_declare_rename(const struct lu_env *env,
2657                               struct mdd_device *mdd,
2658                               struct mdd_object *mdd_spobj,
2659                               struct mdd_object *mdd_tpobj,
2660                               struct mdd_object *mdd_sobj,
2661                               struct mdd_object *mdd_tobj,
2662                               const struct lu_name *sname,
2663                               const struct lu_name *tname,
2664                               struct md_attr *ma,
2665                               struct linkea_data *ldata,
2666                               struct thandle *handle)
2667 {
2668         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2669         int rc;
2670
2671         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2672         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2673
2674         LASSERT(mdd_spobj);
2675         LASSERT(mdd_tpobj);
2676         LASSERT(mdd_sobj);
2677
2678         /* name from source dir */
2679         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2680         if (rc)
2681                 return rc;
2682
2683         /* .. from source child */
2684         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2685                 /* source child can be directory,
2686                  * counted by source dir's nlink */
2687                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2688                 if (rc)
2689                         return rc;
2690                 if (mdd_spobj != mdd_tpobj) {
2691                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2692                                                       handle);
2693                         if (rc != 0)
2694                                 return rc;
2695
2696                         rc = mdo_declare_index_insert(env, mdd_sobj,
2697                                                       mdo2fid(mdd_tpobj),
2698                                                       S_IFDIR, dotdot, handle);
2699                         if (rc != 0)
2700                                 return rc;
2701                 }
2702
2703                 /* new target child can be directory,
2704                  * counted by target dir's nlink */
2705                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2706                 if (rc != 0)
2707                         return rc;
2708         }
2709
2710         la->la_valid = LA_CTIME | LA_MTIME;
2711         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2712         if (rc != 0)
2713                 return rc;
2714
2715         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2716         if (rc != 0)
2717                 return rc;
2718
2719         la->la_valid = LA_CTIME;
2720         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2721         if (rc)
2722                 return rc;
2723
2724         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2725         if (rc)
2726                 return rc;
2727
2728         /* new name */
2729         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2730                                       mdd_object_type(mdd_sobj),
2731                                       tname->ln_name, handle);
2732         if (rc != 0)
2733                 return rc;
2734
2735         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2736                 /* delete target child in target parent directory */
2737                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2738                                               handle);
2739                 if (rc)
2740                         return rc;
2741
2742                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2743                 if (rc)
2744                         return rc;
2745
2746                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2747                         /* target child can be directory,
2748                          * delete "." reference in target child directory */
2749                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2750                         if (rc)
2751                                 return rc;
2752
2753                         /* delete ".." reference in target parent directory */
2754                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2755                         if (rc)
2756                                 return rc;
2757                 }
2758
2759                 la->la_valid = LA_CTIME;
2760                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2761                 if (rc)
2762                         return rc;
2763
2764                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2765                 if (rc)
2766                         return rc;
2767         }
2768
2769         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2770         if (rc)
2771                 return rc;
2772
2773         return rc;
2774 }
2775
2776 /* src object can be remote that is why we use only fid and type of object */
2777 static int mdd_rename(const struct lu_env *env,
2778                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2779                       const struct lu_fid *lf, const struct lu_name *lsname,
2780                       struct md_object *tobj, const struct lu_name *ltname,
2781                       struct md_attr *ma)
2782 {
2783         const char *sname = lsname->ln_name;
2784         const char *tname = ltname->ln_name;
2785         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2786         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2787         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2788         struct mdd_device *mdd = mdo2mdd(src_pobj);
2789         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2790         struct mdd_object *mdd_tobj = NULL;
2791         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2792         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2793         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2794         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2795         struct thandle *handle;
2796         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2797         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2798         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2799         bool is_dir;
2800         bool tobj_ref = 0;
2801         bool tobj_locked = 0;
2802         unsigned cl_flags = 0;
2803         int rc, rc2;
2804         ENTRY;
2805
2806         if (tobj)
2807                 mdd_tobj = md2mdd_obj(tobj);
2808
2809         mdd_sobj = mdd_object_find(env, mdd, lf);
2810         if (IS_ERR(mdd_sobj))
2811                 RETURN(PTR_ERR(mdd_sobj));
2812
2813         rc = mdd_la_get(env, mdd_sobj, cattr);
2814         if (rc)
2815                 GOTO(out_pending, rc);
2816
2817         rc = mdd_la_get(env, mdd_spobj, pattr);
2818         if (rc)
2819                 GOTO(out_pending, rc);
2820
2821         if (mdd_tobj) {
2822                 rc = mdd_la_get(env, mdd_tobj, tattr);
2823                 if (rc)
2824                         GOTO(out_pending, rc);
2825                 /* search for an existing archive.
2826                  * we should check ahead as the object
2827                  * can be destroyed in this transaction */
2828                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2829                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2830         }
2831
2832         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2833         if (rc)
2834                 GOTO(out_pending, rc);
2835
2836         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2837                                      mdd_sobj, cattr, mdd_tobj, tattr);
2838         if (rc)
2839                 GOTO(out_pending, rc);
2840
2841         rc = mdd_name_check(mdd, ltname);
2842         if (rc < 0)
2843                 GOTO(out_pending, rc);
2844
2845         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2846         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2847         if (rc < 0)
2848                 GOTO(out_pending, rc);
2849
2850         handle = mdd_trans_create(env, mdd);
2851         if (IS_ERR(handle))
2852                 GOTO(out_pending, rc = PTR_ERR(handle));
2853
2854         memset(ldata, 0, sizeof(*ldata));
2855         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
2856                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
2857                                 1, 0, ldata);
2858         if (rc)
2859                 GOTO(stop, rc);
2860
2861         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2862                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2863         if (rc)
2864                 GOTO(stop, rc);
2865
2866         rc = mdd_trans_start(env, mdd, handle);
2867         if (rc)
2868                 GOTO(stop, rc);
2869
2870         is_dir = S_ISDIR(cattr->la_mode);
2871
2872         /* Remove source name from source directory */
2873         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2874         if (rc != 0)
2875                 GOTO(stop, rc);
2876
2877         /* "mv dir1 dir2" needs "dir1/.." link update */
2878         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2879                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2880                 if (rc != 0)
2881                         GOTO(fixup_spobj2, rc);
2882
2883                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2884                                              dotdot, handle);
2885                 if (rc != 0)
2886                         GOTO(fixup_spobj, rc);
2887         }
2888
2889         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2890                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2891                 if (rc != 0)
2892                         /* tname might been renamed to something else */
2893                         GOTO(fixup_spobj, rc);
2894         }
2895
2896         /* Insert new fid with target name into target dir */
2897         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2898                                 tname, handle);
2899         if (rc != 0)
2900                 GOTO(fixup_tpobj, rc);
2901
2902         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2903         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2904
2905         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2906         la->la_valid = LA_CTIME;
2907         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2908         if (rc)
2909                 GOTO(fixup_tpobj, rc);
2910
2911         /* Update the linkEA for the source object */
2912         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2913         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2914                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
2915                               0, 0);
2916         if (rc == -ENOENT)
2917                 /* Old files might not have EA entry */
2918                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2919                               lsname, handle, NULL, 0);
2920         mdd_write_unlock(env, mdd_sobj);
2921         /* We don't fail the transaction if the link ea can't be
2922            updated -- fid2path will use alternate lookup method. */
2923         rc = 0;
2924
2925         /* Remove old target object
2926          * For tobj is remote case cmm layer has processed
2927          * and set tobj to NULL then. So when tobj is NOT NULL,
2928          * it must be local one.
2929          */
2930         if (tobj && mdd_object_exists(mdd_tobj)) {
2931                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2932                 tobj_locked = 1;
2933                 if (mdd_is_dead_obj(mdd_tobj)) {
2934                         /* shld not be dead, something is wrong */
2935                         CERROR("tobj is dead, something is wrong\n");
2936                         rc = -EINVAL;
2937                         goto cleanup;
2938                 }
2939                 mdo_ref_del(env, mdd_tobj, handle);
2940
2941                 /* Remove dot reference. */
2942                 if (S_ISDIR(tattr->la_mode))
2943                         mdo_ref_del(env, mdd_tobj, handle);
2944                 tobj_ref = 1;
2945
2946                 /* fetch updated nlink */
2947                 rc = mdd_la_get(env, mdd_tobj, tattr);
2948                 if (rc != 0) {
2949                         CERROR("%s: Failed to get nlink for tobj "
2950                                 DFID": rc = %d\n",
2951                                 mdd2obd_dev(mdd)->obd_name,
2952                                 PFID(tpobj_fid), rc);
2953                         GOTO(fixup_tpobj, rc);
2954                 }
2955
2956                 la->la_valid = LA_CTIME;
2957                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2958                 if (rc != 0) {
2959                         CERROR("%s: Failed to set ctime for tobj "
2960                                 DFID": rc = %d\n",
2961                                 mdd2obd_dev(mdd)->obd_name,
2962                                 PFID(tpobj_fid), rc);
2963                         GOTO(fixup_tpobj, rc);
2964                 }
2965
2966                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2967                 ma->ma_attr = *tattr;
2968                 ma->ma_valid |= MA_INODE;
2969                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2970                                        handle);
2971                 if (rc != 0) {
2972                         CERROR("%s: Failed to unlink tobj "
2973                                 DFID": rc = %d\n",
2974                                 mdd2obd_dev(mdd)->obd_name,
2975                                 PFID(tpobj_fid), rc);
2976                         GOTO(fixup_tpobj, rc);
2977                 }
2978
2979                 /* fetch updated nlink */
2980                 rc = mdd_la_get(env, mdd_tobj, tattr);
2981                 if (rc == -ENOENT) {
2982                         /* the object got removed, let's
2983                          * return the latest known attributes */
2984                         tattr->la_nlink = 0;
2985                         rc = 0;
2986                 } else if (rc != 0) {
2987                         CERROR("%s: Failed to get nlink for tobj "
2988                                 DFID": rc = %d\n",
2989                                 mdd2obd_dev(mdd)->obd_name,
2990                                 PFID(tpobj_fid), rc);
2991                         GOTO(fixup_tpobj, rc);
2992                 }
2993                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2994                 ma->ma_attr = *tattr;
2995                 ma->ma_valid |= MA_INODE;
2996
2997                 if (tattr->la_nlink == 0)
2998                         cl_flags |= CLF_RENAME_LAST;
2999                 else
3000                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3001         }
3002
3003         la->la_valid = LA_CTIME | LA_MTIME;
3004         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3005         if (rc)
3006                 GOTO(fixup_tpobj, rc);
3007
3008         if (mdd_spobj != mdd_tpobj) {
3009                 la->la_valid = LA_CTIME | LA_MTIME;
3010                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3011                 if (rc != 0)
3012                         GOTO(fixup_tpobj, rc);
3013         }
3014
3015         EXIT;
3016
3017 fixup_tpobj:
3018         if (rc) {
3019                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3020                 if (rc2)
3021                         CWARN("tp obj fix error %d\n",rc2);
3022
3023                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3024                     !mdd_is_dead_obj(mdd_tobj)) {
3025                         if (tobj_ref) {
3026                                 mdo_ref_add(env, mdd_tobj, handle);
3027                                 if (is_dir)
3028                                         mdo_ref_add(env, mdd_tobj, handle);
3029                         }
3030
3031                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3032                                                   mdo2fid(mdd_tobj),
3033                                                   mdd_object_type(mdd_tobj),
3034                                                   tname, handle);
3035                         if (rc2 != 0)
3036                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3037                 }
3038         }
3039
3040 fixup_spobj:
3041         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3042                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3043                 if (rc2)
3044                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3045                                mdd2obd_dev(mdd)->obd_name, rc2);
3046
3047
3048                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3049                                               dotdot, handle);
3050                 if (rc2 != 0)
3051                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3052                               mdd2obd_dev(mdd)->obd_name, rc2);
3053         }
3054
3055 fixup_spobj2:
3056         if (rc != 0) {
3057                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3058                                          mdd_object_type(mdd_sobj), sname,
3059                                          handle);
3060                 if (rc2 != 0)
3061                         CWARN("sp obj fix error: rc = %d\n", rc2);
3062         }
3063
3064 cleanup:
3065         if (tobj_locked)
3066                 mdd_write_unlock(env, mdd_tobj);
3067
3068         if (rc == 0)
3069                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3070                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3071                                             ltname, lsname, handle);
3072
3073 stop:
3074         rc = mdd_trans_stop(env, mdd, rc, handle);
3075
3076 out_pending:
3077         mdd_object_put(env, mdd_sobj);
3078         return rc;
3079 }
3080
3081 /**
3082  * During migration once the parent FID has been changed,
3083  * we need update the parent FID in linkea.
3084  **/
3085 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3086                                             struct mdd_object *parent,
3087                                             struct mdd_object *newparent,
3088                                             struct mdd_object *child,
3089                                             const char *name, int namelen,
3090                                             struct thandle *handle,
3091                                             bool declare)
3092 {
3093         struct mdd_thread_info  *info = mdd_env_info(env);
3094         struct linkea_data      ldata = { NULL };
3095         struct lu_buf           *buf = &info->mti_link_buf;
3096         int                     count;
3097         int                     rc = 0;
3098
3099         ENTRY;
3100
3101         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3102         if (buf->lb_buf == NULL)
3103                 RETURN(-ENOMEM);
3104
3105         ldata.ld_buf = buf;
3106         rc = mdd_links_read(env, child, &ldata);
3107         if (rc != 0) {
3108                 if (rc == -ENOENT || rc == -ENODATA)
3109                         rc = 0;
3110                 RETURN(rc);
3111         }
3112
3113         LASSERT(ldata.ld_leh != NULL);
3114         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3115         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3116                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3117                 struct lu_name lname;
3118                 struct lu_fid  fid;
3119
3120                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3121                                     &lname, &fid);
3122
3123                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3124                     !lu_fid_eq(&fid, mdd_object_fid(parent))) {
3125                         ldata.ld_lee = (struct link_ea_entry *)
3126                                        ((char *)ldata.ld_lee +
3127                                         ldata.ld_reclen);
3128                         continue;
3129                 }
3130
3131                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3132                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3133                        lname.ln_namelen, lname.ln_name,
3134                        PFID(mdd_object_fid(newparent)));
3135                 /* update to the new parent fid */
3136                 linkea_entry_pack(ldata.ld_lee, &lname,
3137                                   mdd_object_fid(newparent));
3138                 if (declare)
3139                         rc = mdd_declare_links_add(env, child, handle, &ldata);
3140                 else
3141                         rc = mdd_links_write(env, child, &ldata, handle);
3142                 break;
3143         }
3144         RETURN(rc);
3145 }
3146
3147 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3148                                            struct mdd_object *parent,
3149                                            struct mdd_object *newparent,
3150                                            struct mdd_object *child,
3151                                            const char *name, int namelen,
3152                                            struct thandle *handle)
3153 {
3154         return mdd_linkea_update_child_internal(env, parent, newparent,
3155                                                 child, name,
3156                                                 namelen, handle, true);
3157 }
3158
3159 static int mdd_linkea_update_child(const struct lu_env *env,
3160                                    struct mdd_object *parent,
3161                                    struct mdd_object *newparent,
3162                                    struct mdd_object *child,
3163                                    const char *name, int namelen,
3164                                    struct thandle *handle)
3165 {
3166         return mdd_linkea_update_child_internal(env, parent, newparent,
3167                                                 child, name,
3168                                                 namelen, handle, false);
3169 }
3170
3171 static int mdd_update_linkea_internal(const struct lu_env *env,
3172                                       struct mdd_object *mdd_pobj,
3173                                       struct mdd_object *mdd_sobj,
3174                                       struct mdd_object *mdd_tobj,
3175                                       const struct lu_name *child_name,
3176                                       struct linkea_data *ldata,
3177                                       struct thandle *handle,
3178                                       int declare)
3179 {
3180         struct mdd_thread_info  *info = mdd_env_info(env);
3181         int                     count;
3182         int                     rc = 0;
3183         ENTRY;
3184
3185         LASSERT(ldata->ld_buf != NULL);
3186         LASSERT(ldata->ld_leh != NULL);
3187
3188         /* If it is mulitple links file, we need update the name entry for
3189          * all parent */
3190         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3191         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3192                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3193                 struct mdd_object       *pobj;
3194                 struct lu_name          lname;
3195                 struct lu_fid           fid;
3196
3197                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3198                                     &lname, &fid);
3199                 pobj = mdd_object_find(env, mdd, &fid);
3200                 if (IS_ERR(pobj)) {
3201                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3202                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3203                               PTR_ERR(pobj));
3204                         continue;
3205                 }
3206
3207                 if (!mdd_object_exists(pobj)) {
3208                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3209                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3210                         goto next_put;
3211                 }
3212
3213                 if (pobj == mdd_pobj &&
3214                     lname.ln_namelen == child_name->ln_namelen &&
3215                     strncmp(lname.ln_name, child_name->ln_name,
3216                             lname.ln_namelen) == 0) {
3217                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3218                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3219                               PFID(&fid));
3220                         goto next_put;
3221                 }
3222
3223                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3224                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3225                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3226
3227                 if (declare) {
3228                         /* Remove source name from source directory */
3229                         /* Insert new fid with target name into target dir */
3230                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3231                                                       handle);
3232                         if (rc != 0)
3233                                 GOTO(next_put, rc);
3234
3235                         rc = mdo_declare_index_insert(env, pobj,
3236                                         mdd_object_fid(mdd_tobj),
3237                                         mdd_object_type(mdd_tobj),
3238                                         lname.ln_name, handle);
3239                         if (rc != 0)
3240                                 GOTO(next_put, rc);
3241
3242                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3243                         if (rc)
3244                                 GOTO(next_put, rc);
3245
3246                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3247                         if (rc)
3248                                 GOTO(next_put, rc);
3249                 } else {
3250                         char *tmp_name = info->mti_key;
3251
3252                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3253                                 /* lnamelen is too big(> NAME_MAX + 16),
3254                                  * something wrong about this linkea, let's
3255                                  * skip it */
3256                                 CWARN("%s: the name %.*s is too long under "
3257                                       DFID"\n", mdd2obd_dev(mdd)->obd_name,
3258                                       lname.ln_namelen, lname.ln_name,
3259                                       PFID(&fid));
3260                                 goto next_put;
3261                         }
3262
3263                         /* Note: lname might be without \0 at the end, see
3264                          * linkea_entry_unpack(), let's add extra \0 by
3265                          * snprintf */
3266                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3267                                  lname.ln_namelen, lname.ln_name);
3268                         lname.ln_name = tmp_name;
3269
3270                         /* Let's check if this linkEA still valid, before
3271                          * it might be packed into the RPC buffer. */
3272                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3273                                         &info->mti_fid, NULL);
3274                         if (rc < 0 || !lu_fid_eq(&info->mti_fid,
3275                                                  mdd_object_fid(mdd_sobj)))
3276                                 GOTO(next_put, rc == -ENOENT ? 0 : rc);
3277
3278                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3279                         if (rc != 0)
3280                                 GOTO(next_put, rc);
3281
3282                         rc = __mdd_index_insert(env, pobj,
3283                                         mdd_object_fid(mdd_tobj),
3284                                         mdd_object_type(mdd_tobj),
3285                                         tmp_name, handle);
3286                         if (rc != 0)
3287                                 GOTO(next_put, rc);
3288
3289                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3290                         rc = mdo_ref_add(env, mdd_tobj, handle);
3291                         mdd_write_unlock(env, mdd_tobj);
3292                         if (rc)
3293                                 GOTO(next_put, rc);
3294
3295                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3296                         mdo_ref_del(env, mdd_sobj, handle);
3297                         mdd_write_unlock(env, mdd_sobj);
3298                 }
3299 next_put:
3300                 mdd_object_put(env, pobj);
3301                 if (rc != 0)
3302                         break;
3303
3304                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3305                                                          ldata->ld_reclen);
3306         }
3307
3308         RETURN(rc);
3309 }
3310
3311 static int mdd_migrate_xattrs(const struct lu_env *env,
3312                               struct mdd_object *mdd_sobj,
3313                               struct mdd_object *mdd_tobj)
3314 {
3315         struct mdd_thread_info  *info = mdd_env_info(env);
3316         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3317         char                    *xname;
3318         struct thandle          *handle;
3319         struct lu_buf           xbuf;
3320         int                     xlen;
3321         int                     rem;
3322         int                     xsize;
3323         int                     list_xsize;
3324         struct lu_buf           list_xbuf;
3325         int                     rc;
3326
3327         /* retrieve xattr list from the old object */
3328         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3329         if (list_xsize == -ENODATA)
3330                 return 0;
3331
3332         if (list_xsize < 0)
3333                 return list_xsize;
3334
3335         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3336         if (info->mti_big_buf.lb_buf == NULL)
3337                 return -ENOMEM;
3338
3339         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3340         list_xbuf.lb_len = list_xsize;
3341         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3342         if (rc < 0)
3343                 return rc;
3344         rc = 0;
3345         rem = list_xsize;
3346         xname = list_xbuf.lb_buf;
3347         while (rem > 0) {
3348                 xlen = strnlen(xname, rem - 1) + 1;
3349                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3350                     strcmp(XATTR_NAME_LMV, xname) == 0)
3351                         goto next;
3352
3353                 /* For directory, if there are default layout, migrate here */
3354                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3355                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3356                         goto next;
3357
3358                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3359                 if (xsize == -ENODATA)
3360                         goto next;
3361                 if (xsize < 0)
3362                         GOTO(out, rc);
3363
3364                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3365                 if (info->mti_link_buf.lb_buf == NULL)
3366                         GOTO(out, rc = -ENOMEM);
3367
3368                 xbuf.lb_len = xsize;
3369                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3370                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3371                 if (rc == -ENODATA)
3372                         goto next;
3373                 if (rc < 0)
3374                         GOTO(out, rc);
3375
3376                 handle = mdd_trans_create(env, mdd);
3377                 if (IS_ERR(handle))
3378                         GOTO(out, rc = PTR_ERR(handle));
3379
3380                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3381                                            handle);
3382                 if (rc != 0)
3383                         GOTO(stop_trans, rc);
3384                 /* Note: this transaction is part of migration, and it is not
3385                  * the last step of migration, so we set th_local = 1 to avoid
3386                  * update last rcvd for this transaction */
3387                 handle->th_local = 1;
3388                 rc = mdd_trans_start(env, mdd, handle);
3389                 if (rc != 0)
3390                         GOTO(stop_trans, rc);
3391
3392 again:
3393                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3394                 if (rc == -EEXIST)
3395                         GOTO(stop_trans, rc = 0);
3396
3397                 if (unlikely(rc == -ENOSPC &&
3398                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3399                         rc = linkea_overflow_shrink(
3400                                         (struct linkea_data *)(xbuf.lb_buf));
3401                         if (likely(rc > 0)) {
3402                                 xbuf.lb_len = rc;
3403                                 goto again;
3404                         }
3405                 }
3406
3407                 if (rc != 0)
3408                         GOTO(stop_trans, rc);
3409 stop_trans:
3410                 rc = mdd_trans_stop(env, mdd, rc, handle);
3411                 if (rc != 0)
3412                         GOTO(out, rc);
3413 next:
3414                 rem -= xlen;
3415                 memmove(xname, xname + xlen, rem);
3416         }
3417 out:
3418         return rc;
3419 }
3420
3421 static int mdd_declare_migrate_create(const struct lu_env *env,
3422                                       struct mdd_object *mdd_pobj,
3423                                       struct mdd_object *mdd_sobj,
3424                                       struct mdd_object *mdd_tobj,
3425                                       struct md_op_spec *spec,
3426                                       struct lu_attr *la,
3427                                       union lmv_mds_md *mgr_ea,
3428                                       struct linkea_data *ldata,
3429                                       struct thandle *handle)
3430 {
3431         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3432         const struct lu_buf     *buf;
3433         int                     rc;
3434         int                     mgr_easize;
3435
3436         rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
3437                                                 handle, spec, NULL);
3438         if (rc != 0)
3439                 return rc;
3440
3441         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3442                                            handle);
3443         if (rc != 0)
3444                 return rc;
3445
3446         if (S_ISLNK(la->la_mode)) {
3447                 const char *target_name = spec->u.sp_symname;
3448                 int sym_len = strlen(target_name);
3449                 const struct lu_buf *buf;
3450
3451                 buf = mdd_buf_get_const(env, target_name, sym_len);
3452                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3453                                              buf, 0, handle);
3454                 if (rc != 0)
3455                         return rc;
3456         } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
3457                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
3458                 if (rc != 0)
3459                         return rc;
3460         }
3461
3462         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3463                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3464                                         spec->u.sp_ea.eadatalen);
3465                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3466                                            0, handle);
3467                 if (rc)
3468                         return rc;
3469         }
3470
3471         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3472         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3473         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3474                                    0, handle);
3475         if (rc)
3476                 return rc;
3477
3478         la_flag->la_valid = LA_FLAGS;
3479         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3480         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3481
3482         return rc;
3483 }
3484
3485 static int mdd_migrate_create(const struct lu_env *env,
3486                               struct mdd_object *mdd_pobj,
3487                               struct mdd_object *mdd_sobj,
3488                               struct mdd_object *mdd_tobj,
3489                               const struct lu_name *lname,
3490                               struct lu_attr *la)
3491 {
3492         struct mdd_thread_info  *info = mdd_env_info(env);
3493         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3494         struct md_op_spec       *spec = &info->mti_spec;
3495         struct lu_buf           lmm_buf = { NULL };
3496         struct lu_buf           link_buf = { NULL };
3497         struct lu_buf            mgr_buf;
3498         struct thandle          *handle;
3499         struct lmv_mds_md_v1    *mgr_ea;
3500         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3501         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3502         int                     mgr_easize;
3503         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3504         int                     rc;
3505         ENTRY;
3506
3507         /* prepare spec for create */
3508         memset(spec, 0, sizeof(*spec));
3509         spec->sp_cr_lookup = 0;
3510         spec->sp_feat = &dt_directory_features;
3511         if (S_ISLNK(la->la_mode)) {
3512                 const struct lu_buf *buf;
3513
3514                 buf = lu_buf_check_and_alloc(
3515                                 &mdd_env_info(env)->mti_big_buf,
3516                                 la->la_size + 1);
3517                 link_buf = *buf;
3518                 link_buf.lb_len = la->la_size + 1;
3519                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3520                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3521                 if (rc <= 0) {
3522                         rc = rc != 0 ? rc : -EFAULT;
3523                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3524                                mdd2obd_dev(mdd)->obd_name,
3525                                PFID(mdd_object_fid(mdd_sobj)), rc);
3526                         RETURN(rc);
3527                 }
3528                 spec->u.sp_symname = link_buf.lb_buf;
3529         } else if (S_ISREG(la->la_mode)) {
3530                 /* retrieve lov of the old object */
3531                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3532                 if (rc != 0 && rc != -ENODATA)
3533                         RETURN(rc);
3534                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3535                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3536                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3537                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3538                 }
3539         } else if (S_ISDIR(la->la_mode)) {
3540                 rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
3541                 if (rc == -ENODATA) {
3542                         /* ignore the non-linkEA error */
3543                         ldata = NULL;
3544                         rc = 0;
3545                 }
3546                 if (rc < 0)
3547                         RETURN(rc);
3548         }
3549
3550         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3551         lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
3552         mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
3553         mgr_buf.lb_len = mgr_easize;
3554         mgr_ea = mgr_buf.lb_buf;
3555         memset(mgr_ea, 0, sizeof(*mgr_ea));
3556         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3557         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3558         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3559         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3560         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3561         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3562
3563         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3564
3565         handle = mdd_trans_create(env, mdd);
3566         if (IS_ERR(handle))
3567                 GOTO(out_free, rc = PTR_ERR(handle));
3568
3569         /* Note: this transaction is part of migration, and it is not
3570          * the last step of migration, so we set th_local = 1 to avoid
3571          * update last rcvd for this transaction */
3572         handle->th_local = 1;
3573         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
3574                                         la, mgr_buf.lb_buf, ldata, handle);
3575         if (rc != 0)
3576                 GOTO(stop_trans, rc);
3577
3578         rc = mdd_trans_start(env, mdd, handle);
3579         if (rc != 0)
3580                 GOTO(stop_trans, rc);
3581
3582         /* don't set nlink from the original object */
3583         la->la_valid &= ~LA_NLINK;
3584
3585         /* create the target object */
3586         rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3587                                hint, handle);
3588         if (rc != 0)
3589                 GOTO(stop_trans, rc);
3590
3591         if (S_ISDIR(la->la_mode) && ldata != NULL) {
3592                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3593                 if (rc != 0)
3594                         GOTO(stop_trans, rc);
3595         }
3596
3597         /* Set MIGRATE EA on the source inode, so once the migration needs
3598          * to be re-done during failover, the re-do process can locate the
3599          * target object which is already being created. */
3600         rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
3601         if (rc != 0)
3602                 GOTO(stop_trans, rc);
3603
3604         /* Set immutable flag, so any modification is disabled until
3605          * the migration is done. Once the migration is interrupted,
3606          * if the resume process find the migrating object has both
3607          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3608          * flag and approve the migration */
3609         la_flag->la_valid = LA_FLAGS;
3610         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3611         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3612 stop_trans:
3613         if (handle != NULL)
3614                 rc = mdd_trans_stop(env, mdd, rc, handle);
3615 out_free:
3616         if (lmm_buf.lb_buf != NULL)
3617                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3618         RETURN(rc);
3619 }
3620
3621 static int mdd_migrate_entries(const struct lu_env *env,
3622                                struct mdd_object *mdd_sobj,
3623                                struct mdd_object *mdd_tobj)
3624 {
3625         struct dt_object        *next = mdd_object_child(mdd_sobj);
3626         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3627         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3628         struct thandle          *handle;
3629         struct dt_it            *it;
3630         const struct dt_it_ops  *iops;
3631         int                      result;
3632         struct lu_dirent        *ent;
3633         int                      rc;
3634         ENTRY;
3635
3636         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3637         if (ent == NULL)
3638                 RETURN(-ENOMEM);
3639
3640         if (!dt_try_as_dir(env, next))
3641                 GOTO(out_ent, rc = -ENOTDIR);
3642         /*
3643          * iterate directories
3644          */
3645         iops = &next->do_index_ops->dio_it;
3646         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3647         if (IS_ERR(it))
3648                 GOTO(out_ent, rc = PTR_ERR(it));
3649
3650         rc = iops->load(env, it, 0);
3651         if (rc == 0)
3652                 rc = iops->next(env, it);
3653         else if (rc > 0)
3654                 rc = 0;
3655         /*
3656          * At this point and across for-loop:
3657          *
3658          *  rc == 0 -> ok, proceed.
3659          *  rc >  0 -> end of directory.
3660          *  rc <  0 -> error.
3661          */
3662         do {
3663                 struct mdd_object       *child;
3664                 char                    *name = mdd_env_info(env)->mti_key;
3665                 int                     len;
3666                 int                     is_dir;
3667                 bool                    target_exist = false;
3668
3669                 len = iops->key_size(env, it);
3670                 if (len == 0)
3671                         goto next;
3672
3673                 result = iops->rec(env, it, (struct dt_rec *)ent,
3674                                    LUDA_FID | LUDA_TYPE);
3675                 if (result == -ESTALE)
3676                         goto next;
3677                 if (result != 0) {
3678                         rc = result;
3679                         goto out;
3680                 }
3681
3682                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3683
3684                 /* Insert new fid with target name into target dir */
3685                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3686                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3687                      ent->lde_name[1] == '.'))
3688                         goto next;
3689
3690                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3691                 if (IS_ERR(child))
3692                         GOTO(out, rc = PTR_ERR(child));
3693
3694                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3695                 is_dir = S_ISDIR(mdd_object_type(child));
3696
3697                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3698
3699                 /* Check whether the name has been inserted to the target */
3700                 if (dt_try_as_dir(env, dt_tobj)) {
3701                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3702
3703                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3704                                        (struct dt_key *)name);
3705                         if (unlikely(rc == 0))
3706                                 target_exist = true;
3707                 }
3708
3709                 handle = mdd_trans_create(env, mdd);
3710                 if (IS_ERR(handle))
3711                         GOTO(out_put, rc = PTR_ERR(handle));
3712
3713                 /* Note: this transaction is part of migration, and it is not
3714                  * the last step of migration, so we set th_local = 1 to avoid
3715                  * updating last rcvd for this transaction */
3716                 handle->th_local = 1;
3717                 if (likely(!target_exist)) {
3718                         rc = mdo_declare_index_insert(env, mdd_tobj,
3719                                                       &ent->lde_fid,
3720                                                       mdd_object_type(child),
3721                                                       name, handle);
3722                         if (rc != 0)
3723                                 GOTO(out_put, rc);
3724
3725                         if (is_dir) {
3726                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3727                                 if (rc != 0)
3728                                         GOTO(out_put, rc);
3729                         }
3730                 }
3731
3732                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3733                 if (rc != 0)
3734                         GOTO(out_put, rc);
3735
3736                 if (is_dir) {
3737                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3738                         if (rc != 0)
3739                                 GOTO(out_put, rc);
3740
3741                         /* Update .. for child */
3742                         rc = mdo_declare_index_delete(env, child, dotdot,
3743                                                       handle);
3744                         if (rc != 0)
3745                                 GOTO(out_put, rc);
3746
3747                         rc = mdo_declare_index_insert(env, child,
3748                                                       mdd_object_fid(mdd_tobj),
3749                                                       S_IFDIR, dotdot, handle);
3750                         if (rc != 0)
3751                                 GOTO(out_put, rc);
3752                 }
3753
3754                 rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
3755                                                      child, name,
3756                                                      strlen(name),
3757                                                      handle);
3758                 if (rc != 0)
3759                         GOTO(out_put, rc);
3760
3761                 rc = mdd_trans_start(env, mdd, handle);
3762                 if (rc != 0) {
3763                         CERROR("%s: transaction start failed: rc = %d\n",
3764                                mdd2obd_dev(mdd)->obd_name, rc);
3765                         GOTO(out_put, rc);
3766                 }
3767
3768                 if (likely(!target_exist)) {
3769                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3770                                                 mdd_object_type(child),
3771                                                 name, handle);
3772                         if (rc != 0)
3773                                 GOTO(out_put, rc);
3774                 }
3775
3776                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3777                 if (rc != 0)
3778                         GOTO(out_put, rc);
3779
3780                 if (is_dir) {
3781                         rc = __mdd_index_delete_only(env, child, dotdot,
3782                                                      handle);
3783                         if (rc != 0)
3784                                 GOTO(out_put, rc);
3785
3786                         rc = __mdd_index_insert_only(env, child,
3787                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3788                                          dotdot, handle);
3789                         if (rc != 0)
3790                                 GOTO(out_put, rc);
3791                 }
3792
3793                 rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
3794                                              child, name,
3795                                              strlen(name), handle);
3796
3797 out_put:
3798                 mdd_write_unlock(env, child);
3799                 mdd_object_put(env, child);
3800                 rc = mdd_trans_stop(env, mdd, rc, handle);
3801                 if (rc != 0)
3802                         GOTO(out, rc);
3803 next:
3804                 result = iops->next(env, it);
3805                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3806                         GOTO(out, rc = -EINTR);
3807
3808                 if (result == -ESTALE)
3809                         goto next;
3810         } while (result == 0);
3811 out:
3812         iops->put(env, it);
3813         iops->fini(env, it);
3814 out_ent:
3815         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3816         RETURN(rc);
3817 }
3818
3819 static int mdd_declare_update_linkea(const struct lu_env *env,
3820                                      struct mdd_object *mdd_pobj,
3821                                      struct mdd_object *mdd_sobj,
3822                                      struct mdd_object *mdd_tobj,
3823                                      const struct lu_name *child_name,
3824                                      struct linkea_data *ldata,
3825                                      struct thandle *handle)
3826 {
3827         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3828                                           child_name, ldata, handle, 1);
3829 }
3830
3831 static int mdd_update_linkea(const struct lu_env *env,
3832                              struct mdd_object *mdd_pobj,
3833                              struct mdd_object *mdd_sobj,
3834                              struct mdd_object *mdd_tobj,
3835                              const struct lu_name *child_name,
3836                              struct linkea_data *ldata,
3837                              struct thandle *handle)
3838 {
3839         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3840                                           child_name, ldata, handle, 0);
3841 }
3842
3843 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3844                                            struct mdd_object *mdd_pobj,
3845                                            struct mdd_object *mdd_sobj,
3846                                            struct mdd_object *mdd_tobj,
3847                                            const struct lu_name *lname,
3848                                            struct lu_attr *la,
3849                                            struct lu_attr *parent_la,
3850                                            struct linkea_data *ldata,
3851                                            struct thandle *handle)
3852 {
3853         struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3854         struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
3855         int rc;
3856
3857         /* Revert IMMUTABLE flag */
3858         la_flag->la_valid = LA_FLAGS;
3859         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3860         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3861         if (rc != 0)
3862                 return rc;
3863
3864         /* delete entry from source dir */
3865         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3866         if (rc != 0)
3867                 return rc;
3868
3869         if (ldata->ld_buf != NULL) {
3870                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3871                                                mdd_tobj, lname, ldata, handle);
3872                 if (rc != 0)
3873                         return rc;
3874         }
3875
3876         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3877                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3878                                            handle);
3879                 if (rc != 0)
3880                         return rc;
3881
3882                 handle->th_complex = 1;
3883                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
3884                                            XATTR_NAME_FID,
3885                                            LU_XATTR_REPLACE, handle);
3886                 if (rc < 0)
3887                         return rc;
3888         }
3889
3890         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3891                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3892                 if (rc != 0)
3893                         return rc;
3894         }
3895
3896         /* new name */
3897         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3898                                       mdd_object_type(mdd_tobj),
3899                                       lname->ln_name, handle);
3900         if (rc != 0)
3901                 return rc;
3902
3903         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
3904         if (rc != 0)
3905                 return rc;
3906
3907         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3908                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3909                 if (rc != 0)
3910                         return rc;
3911         }
3912
3913         /* delete old object */
3914         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3915         if (rc != 0)
3916                 return rc;
3917
3918         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3919                 /* delete old object */
3920                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3921                 if (rc != 0)
3922                         return rc;
3923                 /* set nlink to 0 */
3924                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3925                 if (rc != 0)
3926                         return rc;
3927         }
3928
3929         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3930         if (rc)
3931                 return rc;
3932
3933         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3934         if (rc != 0)
3935                 return rc;
3936
3937         rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
3938
3939         return rc;
3940 }
3941
3942 static int mdd_migrate_update_name(const struct lu_env *env,
3943                                    struct mdd_object *mdd_pobj,
3944                                    struct mdd_object *mdd_sobj,
3945                                    struct mdd_object *mdd_tobj,
3946                                    const struct lu_name *lname,
3947                                    struct md_attr *ma)
3948 {
3949         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3950         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3951         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3952         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3953         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3954         struct thandle          *handle;
3955         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3956         const char              *name = lname->ln_name;
3957         int                     rc;
3958         ENTRY;
3959
3960         /* update time for parent */
3961         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3962         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3963         p_la->la_valid = LA_CTIME;
3964
3965         rc = mdd_la_get(env, mdd_sobj, so_attr);
3966         if (rc != 0)
3967                 RETURN(rc);
3968
3969         ldata->ld_buf = NULL;
3970         rc = mdd_links_read(env, mdd_sobj, ldata);
3971         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
3972                 RETURN(rc);
3973
3974         handle = mdd_trans_create(env, mdd);
3975         if (IS_ERR(handle))
3976                 RETURN(PTR_ERR(handle));
3977
3978         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3979                                              lname, so_attr, p_la, ldata,
3980                                              handle);
3981         if (rc != 0) {
3982                 /* If the migration can not be fit in one transaction, just
3983                  * leave it in the original MDT */
3984                 if (rc == -E2BIG)
3985                         GOTO(stop_trans, rc = 0);
3986                 else
3987                         GOTO(stop_trans, rc);
3988         }
3989
3990         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3991                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3992                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3993                PFID(mdd_object_fid(mdd_tobj)));
3994
3995         rc = mdd_trans_start(env, mdd, handle);
3996         if (rc != 0)
3997                 GOTO(stop_trans, rc);
3998
3999         /* Revert IMMUTABLE flag */
4000         la_flag->la_valid = LA_FLAGS;
4001         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
4002         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
4003         if (rc != 0)
4004                 GOTO(stop_trans, rc);
4005
4006         /* Remove source name from source directory */
4007         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
4008         if (rc != 0)
4009                 GOTO(stop_trans, rc);
4010
4011         if (ldata->ld_buf != NULL) {
4012                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
4013                                        lname, ldata, handle);
4014                 if (rc != 0)
4015                         GOTO(stop_trans, rc);
4016
4017                 /*  linkea update might decrease the source object
4018                  *  nlink, let's get the attr again after ref_del */
4019                 rc = mdd_la_get(env, mdd_sobj, so_attr);
4020                 if (rc != 0)
4021                         GOTO(stop_trans, rc);
4022         }
4023
4024         if (S_ISREG(so_attr->la_mode)) {
4025                 if (so_attr->la_nlink == 1) {
4026                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
4027                                            handle);
4028                         if (rc != 0 && rc != -ENODATA)
4029                                 GOTO(stop_trans, rc);
4030
4031                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
4032                                            XATTR_NAME_FID,
4033                                            LU_XATTR_REPLACE, handle);
4034                         if (rc < 0)
4035                                 GOTO(stop_trans, rc);
4036                 }
4037         }
4038
4039         /* Insert new fid with target name into target dir */
4040         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
4041                                 mdd_object_type(mdd_tobj), name, handle);
4042         if (rc != 0)
4043                 GOTO(stop_trans, rc);
4044
4045         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
4046
4047         mdd_sobj->mod_flags |= DEAD_OBJ;
4048         rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
4049         if (rc != 0)
4050                 GOTO(out_unlock, rc);
4051
4052         rc = __mdd_orphan_add(env, mdd_sobj, handle);
4053         if (rc != 0)
4054                 GOTO(out_unlock, rc);
4055
4056         mdo_ref_del(env, mdd_sobj, handle);
4057         if (is_dir)
4058                 mdo_ref_del(env, mdd_sobj, handle);
4059
4060         /* Get the attr again after ref_del */
4061         rc = mdd_la_get(env, mdd_sobj, so_attr);
4062         if (rc != 0)
4063                 GOTO(out_unlock, rc);
4064
4065         ma->ma_attr = *so_attr;
4066         ma->ma_valid |= MA_INODE;
4067
4068         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
4069         if (rc != 0)
4070                 GOTO(out_unlock, rc);
4071
4072         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
4073                                mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
4074                                mdo2fid(mdd_pobj), lname, lname, handle);
4075         if (rc != 0) {
4076                 CWARN("%s: changelog for migrate %s "DFID
4077                       "under "DFID" failed: rc = %d\n",
4078                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
4079                       PFID(mdd_object_fid(mdd_sobj)),
4080                       PFID(mdd_object_fid(mdd_pobj)), rc);
4081                 /* Sigh, there are no easy way to migrate back the object, so
4082                  * let's reset the result to 0 for now XXX */
4083                 rc = 0;
4084         }
4085 out_unlock:
4086         mdd_write_unlock(env, mdd_sobj);
4087
4088 stop_trans:
4089         rc = mdd_trans_stop(env, mdd, rc, handle);
4090
4091         RETURN(rc);
4092 }
4093
4094 static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
4095                           const struct lu_fid *fid, __u32 *mdt_index)
4096 {
4097         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
4098         struct seq_server_site *ss;
4099         int rc;
4100
4101         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4102
4103         range->lsr_flags = LU_SEQ_RANGE_MDT;
4104         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4105         if (rc != 0)
4106                 return rc;
4107
4108         *mdt_index = range->lsr_index;
4109
4110         return 0;
4111 }
4112 /**
4113  * Check whether we should migrate the file/dir
4114  * return val
4115  *      < 0  permission check failed or other error.
4116  *      = 0  the file can be migrated.
4117  *      > 0  the file does not need to be migrated, mostly
4118  *           for multiple link file
4119  **/
4120 static int mdd_migrate_sanity_check(const struct lu_env *env,
4121                                     struct mdd_object *pobj,
4122                                     const struct lu_attr *pattr,
4123                                     struct mdd_object *sobj,
4124                                     struct lu_attr *sattr)
4125 {
4126         struct mdd_thread_info  *info = mdd_env_info(env);
4127         struct linkea_data      *ldata = &info->mti_link_data;
4128         struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
4129         int                     mgr_easize;
4130         struct lu_buf           *mgr_buf;
4131         int                     count;
4132         int                     rc;
4133         __u64 mdt_index;
4134         ENTRY;
4135
4136         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4137         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4138         if (mgr_buf->lb_buf == NULL)
4139                 RETURN(-ENOMEM);
4140
4141         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4142         if (rc > 0) {
4143                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4144
4145                 /* If the object has migrateEA, it means IMMUTE flag
4146                  * is being set by previous migration process, so it
4147                  * needs to override the IMMUTE flag, otherwise the
4148                  * following sanity check will fail */
4149                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4150                                                 LMV_HASH_FLAG_MIGRATION) {
4151                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4152
4153                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4154                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4155                                mdd2obd_dev(mdd)->obd_name,
4156                                PFID(mdd_object_fid(sobj)));
4157                 }
4158         }
4159
4160         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4161                                      sobj, sattr, NULL, NULL);
4162         if (rc != 0)
4163                 RETURN(rc);
4164
4165         /* Then it will check if the file should be migrated. If the file
4166          * has mulitple links, we only need migrate the file if all of its
4167          * entries has been migrated to the remote MDT */
4168         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4169                 RETURN(0);
4170
4171         rc = mdd_links_read(env, sobj, ldata);
4172         if (rc != 0) {
4173                 /* For multiple links files, if there are no linkEA data at all,
4174                  * means the file might be created before linkEA is enabled, and
4175                  * all of its links should not be migrated yet, otherwise it
4176                  * should have some linkEA there */
4177                 if (rc == -ENOENT || rc == -ENODATA)
4178                         RETURN(1);
4179                 RETURN(rc);
4180         }
4181
4182         mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
4183         /* If there are still links locally, then the file will not be
4184          * migrated. */
4185         LASSERT(ldata->ld_leh != NULL);
4186
4187         /* If the linkEA is overflow, then means there are some unknown name
4188          * entries under unknown parents, that will prevent the migration. */
4189         if (unlikely(ldata->ld_leh->leh_overflow_time))
4190                 RETURN(1);
4191
4192         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4193         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4194                 struct lu_name          lname;
4195                 struct lu_fid           fid;
4196                 __u32                   parent_mdt_index;
4197
4198                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4199                                     &lname, &fid);
4200                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4201                                                          ldata->ld_reclen);
4202
4203                 rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
4204                 if (rc != 0)
4205                         RETURN(rc);
4206
4207                 /* Migrate the object only if none of its parents are on the
4208                  * current MDT. */
4209                 if (parent_mdt_index != mdt_index)
4210                         continue;
4211
4212                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4213                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4214                        lname.ln_name, PFID(&fid));
4215                 rc = 1;
4216                 break;
4217         }
4218
4219         RETURN(rc);
4220 }
4221
4222 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4223                        struct md_object *sobj, const struct lu_name *lname,
4224                        struct md_object *tobj, struct md_attr *ma)
4225 {
4226         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4227         struct mdd_device       *mdd = mdo2mdd(pobj);
4228         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4229         struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
4230         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4231         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4232         bool                    created = false;
4233         int                     rc;
4234
4235         ENTRY;
4236         /* If the file will being migrated, it will check whether
4237          * the file is being opened by someone else right now */
4238         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4239         if (mdd_sobj->mod_count > 0) {
4240                 CDEBUG(D_OTHER,
4241                        "%s: "DFID"%s is already opened count %d: rc = %d\n",
4242                        mdd2obd_dev(mdd)->obd_name,
4243                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4244                        mdd_sobj->mod_count, -EBUSY);
4245                 mdd_read_unlock(env, mdd_sobj);
4246                 GOTO(put, rc = -EBUSY);
4247         }
4248         mdd_read_unlock(env, mdd_sobj);
4249
4250         rc = mdd_la_get(env, mdd_sobj, so_attr);
4251         if (rc != 0)
4252                 GOTO(put, rc);
4253
4254         rc = mdd_la_get(env, mdd_pobj, pattr);
4255         if (rc != 0)
4256                 GOTO(put, rc);
4257
4258         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4259         if (rc != 0) {
4260                 if (rc > 0)
4261                         rc = 0;
4262                 GOTO(put, rc);
4263         }
4264
4265         /* Sigh, it is impossible to finish all of migration in a single
4266          * transaction, for example migrating big directory entries to the
4267          * new MDT, it needs insert all of name entries of children in the
4268          * new directory.
4269          *
4270          * So migration will be done in multiple steps and transactions.
4271          *
4272          * 1. create an orphan object on the remote MDT in one transaction.
4273          * 2. migrate extend attributes to the new target file/directory.
4274          * 3. For directory, migrate the entries to the new MDT and update
4275          * linkEA of each children. Because we can not migrate all entries
4276          * in a single transaction, so the migrating directory will become
4277          * a striped directory during migration, so once the process is
4278          * interrupted, the directory is still accessible. (During lookup,
4279          * client will locate the name by searching both original and target
4280          * object).
4281          * 4. Finally, update the name/FID to point to the new file/directory
4282          * in a separate transaction.
4283          */
4284
4285         /* step 1: Check whether the orphan object has been created, and create
4286          * orphan object on the remote MDT if needed */
4287         if (!mdd_object_exists(mdd_tobj)) {
4288                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4289                                         lname, so_attr);
4290                 if (rc != 0)
4291                         GOTO(put, rc);
4292                 created = true;
4293         }
4294
4295         LASSERT(mdd_object_exists(mdd_tobj));
4296         /* step 2: migrate xattr */
4297         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4298         if (rc != 0)
4299                 GOTO(put, rc);
4300
4301         /* step 3: migrate name entries to the orphan object */
4302         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4303                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4304                 if (rc != 0)
4305                         GOTO(put, rc);
4306                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4307                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4308                         GOTO(put, rc = 0);
4309         } else {
4310                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4311         }
4312
4313         LASSERT(mdd_object_exists(mdd_tobj));
4314         /* step 4: update name entry to the new object */
4315         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4316                                      ma);
4317         if (rc != 0)
4318                 GOTO(put, rc);
4319
4320         /* newly created target was not locked, don't cache its attributes */
4321         if (created)
4322                 mdd_invalidate(env, tobj);
4323 put:
4324         RETURN(rc);
4325 }
4326
4327 const struct md_dir_operations mdd_dir_ops = {
4328         .mdo_is_subdir     = mdd_is_subdir,
4329         .mdo_lookup        = mdd_lookup,
4330         .mdo_create        = mdd_create,
4331         .mdo_rename        = mdd_rename,
4332         .mdo_link          = mdd_link,
4333         .mdo_unlink        = mdd_unlink,
4334         .mdo_create_data   = mdd_create_data,
4335         .mdo_migrate       = mdd_migrate,
4336 };