Whamcloud - gitweb
LU-14854 mdd: proper handle error in mdd_swap_layouts()
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdd/mdd_dir.c
32  *
33  * Lustre Metadata Server (mdd) routines
34  *
35  * Author: Wang Di <wangdi@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <obd_class.h>
41 #include <obd_support.h>
42 #include <lustre_mds.h>
43 #include <lustre_fid.h>
44 #include <lustre_lmv.h>
45 #include <lustre_idmap.h>
46
47 #include "mdd_internal.h"
48
49 static const char dot[] = ".";
50 static const char dotdot[] = "..";
51
52 static struct lu_name lname_dotdot = {
53         .ln_name        = (char *) dotdot,
54         .ln_namelen     = sizeof(dotdot) - 1,
55 };
56
57 static inline int
58 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
59 {
60         if (!lu_name_is_valid(ln))
61                 return -EINVAL;
62         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
63                 return -ENAMETOOLONG;
64         else
65                 return 0;
66 }
67
68 /* Get FID from name and parent */
69 static int
70 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
71              const struct lu_attr *pattr, const struct lu_name *lname,
72              struct lu_fid *fid, unsigned int may_mask)
73 {
74         const char *name = lname->ln_name;
75         const struct dt_key *key = (const struct dt_key *)name;
76         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
77         struct dt_object *dir = mdd_object_child(mdd_obj);
78         int rc;
79
80         ENTRY;
81
82         if (unlikely(mdd_is_dead_obj(mdd_obj)))
83                 RETURN(-ESTALE);
84
85         if (!mdd_object_exists(mdd_obj))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" located on remote server\n",
90                        mdd_obj_dev_name(mdd_obj),
91                        PFID(mdd_object_fid(mdd_obj)));
92         }
93
94         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, may_mask,
95                                             DT_TGT_PARENT);
96         if (rc)
97                 RETURN(rc);
98
99         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
100                    dt_try_as_dir(env, dir)))
101                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
102         else
103                 rc = -ENOTDIR;
104
105         RETURN(rc);
106 }
107
108 int mdd_lookup(const struct lu_env *env,
109                struct md_object *pobj, const struct lu_name *lname,
110                struct lu_fid *fid, struct md_op_spec *spec)
111 {
112         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
113         int rc;
114         ENTRY;
115
116         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
117         if (rc != 0)
118                 RETURN(rc);
119
120         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
121                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
122         RETURN(rc);
123 }
124
125 /** Read the link EA into a temp buffer.
126  * Uses the mdd_thread_info::mdi_link_buf since it is generally large.
127  * A pointer to the buffer is stored in \a ldata::ld_buf.
128  *
129  * \retval 0 or error
130  */
131 static int __mdd_links_read(const struct lu_env *env,
132                             struct mdd_object *mdd_obj,
133                             struct linkea_data *ldata)
134 {
135         int rc;
136
137         if (!mdd_object_exists(mdd_obj))
138                 return -ENODATA;
139
140         /* First try a small buf */
141         LASSERT(env != NULL);
142         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_link_buf,
143                                                PAGE_SIZE);
144         if (ldata->ld_buf->lb_buf == NULL)
145                 return -ENOMEM;
146
147         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
148         if (rc == -ERANGE) {
149                 /* Buf was too small, figure out what we need. */
150                 lu_buf_free(ldata->ld_buf);
151                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
152                                    XATTR_NAME_LINK);
153                 if (rc < 0)
154                         return rc;
155                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
156                 if (ldata->ld_buf->lb_buf == NULL)
157                         return -ENOMEM;
158                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
159                                   XATTR_NAME_LINK);
160         }
161         if (rc < 0) {
162                 lu_buf_free(ldata->ld_buf);
163                 ldata->ld_buf = NULL;
164                 return rc;
165         }
166
167         return linkea_init(ldata);
168 }
169
170 int mdd_links_read(const struct lu_env *env,
171                    struct mdd_object *mdd_obj,
172                    struct linkea_data *ldata)
173 {
174         int rc;
175
176         rc = __mdd_links_read(env, mdd_obj, ldata);
177         if (!rc)
178                 rc = linkea_init(ldata);
179
180         return rc;
181 }
182
183 static int mdd_links_read_with_rec(const struct lu_env *env,
184                                    struct mdd_object *mdd_obj,
185                                    struct linkea_data *ldata)
186 {
187         int rc;
188
189         rc = __mdd_links_read(env, mdd_obj, ldata);
190         if (!rc)
191                 rc = linkea_init_with_rec(ldata);
192
193         return rc;
194 }
195
196 /**
197  * Get parent FID of the directory
198  *
199  * Read parent FID from linkEA, if that fails, then do lookup
200  * dotdot to get the parent FID.
201  *
202  * \param[in] env       execution environment
203  * \param[in] obj       object from which to find the parent FID
204  * \param[in] attr      attribute of the object
205  * \param[out] fid      fid to get the parent FID
206  *
207  * \retval              0 if getting the parent FID succeeds.
208  * \retval              negative errno if getting the parent FID fails.
209  **/
210 static inline int mdd_parent_fid(const struct lu_env *env,
211                                  struct mdd_object *obj,
212                                  const struct lu_attr *attr,
213                                  struct lu_fid *fid)
214 {
215         struct mdd_thread_info *info = mdd_env_info(env);
216         struct linkea_data ldata = { NULL };
217         struct lu_buf *buf = &info->mdi_link_buf;
218         struct lu_name lname;
219         int rc = 0;
220
221         ENTRY;
222
223         LASSERTF(S_ISDIR(mdd_object_type(obj)),
224                  "%s: FID "DFID" is not a directory type = %o\n",
225                  mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)),
226                  mdd_object_type(obj));
227
228         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
229         if (buf->lb_buf == NULL)
230                 GOTO(lookup, rc = 0);
231
232         ldata.ld_buf = buf;
233         rc = mdd_links_read_with_rec(env, obj, &ldata);
234         if (rc != 0)
235                 GOTO(lookup, rc);
236
237         /* the obj is not locked, don't cache attributes */
238         mdd_invalidate(env, &obj->mod_obj);
239
240         LASSERT(ldata.ld_leh != NULL);
241         /* Directory should only have 1 parent */
242         if (ldata.ld_leh->leh_reccount > 1)
243                 GOTO(lookup, rc);
244
245         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
246
247         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
248         if (likely(fid_is_sane(fid)))
249                 RETURN(0);
250 lookup:
251         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
252         RETURN(rc);
253 }
254
255 /*
256  * For root fid use special function, which does not compare version component
257  * of fid. Version component is different for root fids on all MDTs.
258  */
259 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
260 {
261         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
262                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
263 }
264
265 /*
266  * return 1: if \a tfid is the fid of the ancestor of \a mo;
267  * return 0: if not;
268  * otherwise: values < 0, errors.
269  */
270 static int mdd_is_parent(const struct lu_env *env,
271                         struct mdd_device *mdd,
272                         struct mdd_object *mo,
273                         const struct lu_attr *attr,
274                         const struct lu_fid *tfid)
275 {
276         struct mdd_object *mp;
277         struct lu_fid *pfid;
278         int rc;
279
280         LASSERT(!lu_fid_eq(mdd_object_fid(mo), tfid));
281         pfid = &mdd_env_info(env)->mdi_fid;
282
283         if (mdd_is_root(mdd, mdd_object_fid(mo)))
284                 return 0;
285
286         if (mdd_is_root(mdd, tfid))
287                 return 1;
288
289         rc = mdd_parent_fid(env, mo, attr, pfid);
290         if (rc)
291                 return rc;
292
293         while (1) {
294                 if (lu_fid_eq(pfid, tfid))
295                         return 1;
296
297                 if (mdd_is_root(mdd, pfid))
298                         return 0;
299
300                 mp = mdd_object_find(env, mdd, pfid);
301                 if (IS_ERR(mp))
302                         return PTR_ERR(mp);
303
304                 if (!mdd_object_exists(mp)) {
305                         mdd_object_put(env, mp);
306                         return -ENOENT;
307                 }
308
309                 rc = mdd_parent_fid(env, mp, attr, pfid);
310                 mdd_object_put(env, mp);
311                 if (rc)
312                         return rc;
313         }
314
315         return 0;
316 }
317
318 /*
319  * No permission check is needed.
320  *
321  * returns 1: if fid is ancestor of @mo;
322  * returns 0: if fid is not an ancestor of @mo;
323  * returns < 0: if error
324  */
325 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
326                   const struct lu_fid *fid)
327 {
328         struct mdd_device *mdd = mdo2mdd(mo);
329         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
330         int rc;
331         ENTRY;
332
333         if (!mdd_object_exists(md2mdd_obj(mo)))
334                 RETURN(-ENOENT);
335
336         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
337                 RETURN(-ENOTDIR);
338
339         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
340         if (rc != 0)
341                 RETURN(rc);
342
343         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
344         RETURN(rc);
345 }
346
347 /*
348  * Check that @dir contains no entries except (possibly) dot and dotdot.
349  *
350  * Returns:
351  *
352  *             0        empty
353  *      -ENOTDIR        not a directory object
354  *    -ENOTEMPTY        not empty
355  *           -ve        other error
356  *
357  */
358 int mdd_dir_is_empty(const struct lu_env *env, struct mdd_object *dir)
359 {
360         struct dt_it     *it;
361         struct dt_object *obj;
362         const struct dt_it_ops *iops;
363         int result;
364         ENTRY;
365
366         obj = mdd_object_child(dir);
367         if (!dt_try_as_dir(env, obj))
368                 RETURN(-ENOTDIR);
369
370         iops = &obj->do_index_ops->dio_it;
371         it = iops->init(env, obj, LUDA_64BITHASH);
372         if (!IS_ERR(it)) {
373                 result = iops->get(env, it, (const struct dt_key *)"");
374                 if (result > 0) {
375                         int i;
376                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
377                                 result = iops->next(env, it);
378                         if (result == 0)
379                                 result = -ENOTEMPTY;
380                         else if (result == 1)
381                                 result = 0;
382                 } else if (result == 0)
383                         /*
384                          * Huh? Index contains no zero key?
385                          */
386                         result = -EIO;
387
388                 iops->put(env, it);
389                 iops->fini(env, it);
390         } else {
391                 result = PTR_ERR(it);
392                 /* -ENODEV means no valid stripe */
393                 if (result == -ENODEV)
394                         RETURN(0);
395         }
396         RETURN(result);
397 }
398
399 /**
400  * Determine if the target object can be hard linked, and right now it only
401  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
402  * directory nlink count might exceed the maximum link count(see
403  * osd_object_ref_add), so it only check nlink for non-directories.
404  *
405  * \param[in] env       thread environment
406  * \param[in] obj       object being linked to
407  * \param[in] la        attributes of \a obj
408  *
409  * \retval              0 if \a obj can be hard linked
410  * \retval              negative error if \a obj is a directory or has too
411  *                      many links
412  */
413 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
414                           const struct lu_attr *la)
415 {
416         struct mdd_device *m = mdd_obj2mdd_dev(obj);
417         ENTRY;
418
419         LASSERT(la != NULL);
420
421         /* Subdir count limitation can be broken through
422          * (see osd_object_ref_add), so only check non-directory here. */
423         if (!S_ISDIR(la->la_mode) &&
424             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
425                 RETURN(-EMLINK);
426
427         RETURN(0);
428 }
429
430 /**
431  * Check whether it may create the cobj under the pobj.
432  *
433  * \param[in] env       execution environment
434  * \param[in] pobj      the parent directory
435  * \param[in] pattr     the attribute of the parent directory
436  * \param[in] cobj      the child to be created
437  * \param[in] check_perm        if check WRITE|EXEC permission for parent
438  *
439  * \retval              = 0 create the child under this dir is allowed
440  * \retval              negative errno create the child under this dir is
441  *                      not allowed
442  */
443 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
444                    const struct lu_attr *pattr, struct mdd_object *cobj,
445                    bool check_perm)
446 {
447         int rc = 0;
448         ENTRY;
449
450         if (cobj && mdd_object_exists(cobj))
451                 RETURN(-EEXIST);
452
453         if (mdd_is_dead_obj(pobj))
454                 RETURN(-ENOENT);
455
456         if (check_perm)
457                 rc = mdd_permission_internal_locked(env, pobj, pattr,
458                                                     MAY_WRITE | MAY_EXEC,
459                                                     DT_TGT_PARENT);
460         RETURN(rc);
461 }
462
463 /*
464  * Check whether can unlink from the pobj in the case of "cobj == NULL".
465  */
466 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
467                    const struct lu_attr *pattr, const struct lu_attr *attr)
468 {
469         int rc;
470         ENTRY;
471
472         if (mdd_is_dead_obj(pobj))
473                 RETURN(-ENOENT);
474
475         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
476                 RETURN(-EPERM);
477
478         rc = mdd_permission_internal_locked(env, pobj, pattr,
479                                             MAY_WRITE | MAY_EXEC,
480                                             DT_TGT_PARENT);
481         if (rc != 0)
482                 RETURN(rc);
483
484         if (pattr->la_flags & LUSTRE_APPEND_FL)
485                 RETURN(-EPERM);
486
487         RETURN(rc);
488 }
489
490 /*
491  * pobj == NULL is remote ops case, under such case, pobj's
492  * VTX feature has been checked already, no need check again.
493  */
494 static inline int mdd_is_sticky(const struct lu_env *env,
495                                 struct mdd_object *pobj,
496                                 const struct lu_attr *pattr,
497                                 struct mdd_object *cobj,
498                                 const struct lu_attr *cattr)
499 {
500         struct lu_ucred *uc = lu_ucred_assert(env);
501
502         if (pobj != NULL) {
503                 LASSERT(pattr != NULL);
504                 if (!(pattr->la_mode & S_ISVTX) ||
505                     (pattr->la_uid == uc->uc_fsuid))
506                         return 0;
507         }
508
509         LASSERT(cattr != NULL);
510         if (cattr->la_uid == uc->uc_fsuid)
511                 return 0;
512
513         return !md_capable(uc, CAP_FOWNER);
514 }
515
516 static int mdd_may_delete_entry(const struct lu_env *env,
517                                 struct mdd_object *pobj,
518                                 const struct lu_attr *pattr,
519                                 int check_perm)
520 {
521         ENTRY;
522
523         LASSERT(pobj != NULL);
524         if (!mdd_object_exists(pobj))
525                 RETURN(-ENOENT);
526
527         if (mdd_is_dead_obj(pobj))
528                 RETURN(-ENOENT);
529
530         if (check_perm) {
531                 int rc;
532                 rc = mdd_permission_internal_locked(env, pobj, pattr,
533                                             MAY_WRITE | MAY_EXEC,
534                                             DT_TGT_PARENT);
535                 if (rc)
536                         RETURN(rc);
537         }
538
539         if (pattr->la_flags & LUSTRE_APPEND_FL)
540                 RETURN(-EPERM);
541
542         RETURN(0);
543 }
544
545 /*
546  * Check whether it may delete the cobj from the pobj.
547  * pobj maybe NULL
548  */
549 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
550                    const struct lu_attr *tpattr, struct mdd_object *tobj,
551                    const struct lu_attr *tattr, const struct lu_attr *cattr,
552                    int check_perm, int check_empty)
553 {
554         int rc = 0;
555         ENTRY;
556
557         if (tpobj) {
558                 LASSERT(tpattr != NULL);
559                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
560                 if (rc != 0)
561                         RETURN(rc);
562         }
563
564         if (tobj == NULL)
565                 RETURN(0);
566
567         if (!mdd_object_exists(tobj))
568                 RETURN(-ENOENT);
569
570         if (mdd_is_dead_obj(tobj))
571                 RETURN(-ESTALE);
572
573         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
574                 RETURN(-EPERM);
575
576         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
577                 RETURN(-EPERM);
578
579         /* additional check the rename case */
580         if (cattr) {
581                 if (S_ISDIR(cattr->la_mode)) {
582                         if (!S_ISDIR(tattr->la_mode))
583                                 RETURN(-ENOTDIR);
584
585                         if (mdd_is_root(mdo2mdd(&tobj->mod_obj),
586                                         mdd_object_fid(tobj)))
587                                 RETURN(-EBUSY);
588                 } else if (S_ISDIR(tattr->la_mode))
589                         RETURN(-EISDIR);
590         }
591
592         if (S_ISDIR(tattr->la_mode) && check_empty)
593                 rc = mdd_dir_is_empty(env, tobj);
594
595         RETURN(rc);
596 }
597
598 /**
599  * Check whether it can create the link file(linked to @src_obj) under
600  * the target directory(@tgt_obj), and src_obj has been locked by
601  * mdd_write_lock.
602  *
603  * \param[in] env       execution environment
604  * \param[in] tgt_obj   the target directory
605  * \param[in] tattr     attributes of target directory
606  * \param[in] lname     the link name
607  * \param[in] src_obj   source object for link
608  * \param[in] cattr     attributes for source object
609  *
610  * \retval              = 0 it is allowed to create the link file under tgt_obj
611  * \retval              negative error not allowed to create the link file
612  */
613 static int mdd_link_sanity_check(const struct lu_env *env,
614                                  struct mdd_object *tgt_obj,
615                                  const struct lu_attr *tattr,
616                                  const struct lu_name *lname,
617                                  struct mdd_object *src_obj,
618                                  const struct lu_attr *cattr)
619 {
620         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
621         int rc = 0;
622         ENTRY;
623
624         if (!mdd_object_exists(src_obj))
625                 RETURN(-ENOENT);
626
627         if (mdd_is_dead_obj(src_obj))
628                 RETURN(-ESTALE);
629
630         /* Local ops, no lookup before link, check filename length here. */
631         rc = mdd_name_check(m, lname);
632         if (rc < 0)
633                 RETURN(rc);
634
635         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
636                 RETURN(-EPERM);
637
638         if (S_ISDIR(mdd_object_type(src_obj)))
639                 RETURN(-EPERM);
640
641         LASSERT(src_obj != tgt_obj);
642         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
643         if (rc != 0)
644                 RETURN(rc);
645
646         rc = __mdd_may_link(env, src_obj, cattr);
647
648         RETURN(rc);
649 }
650
651 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
652                                    const char *name, struct thandle *handle)
653 {
654         struct dt_object *next = mdd_object_child(pobj);
655         int rc;
656         ENTRY;
657
658         if (dt_try_as_dir(env, next))
659                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
660         else
661                 rc = -ENOTDIR;
662
663         RETURN(rc);
664 }
665
666 static int __mdd_index_insert_only(const struct lu_env *env,
667                                    struct mdd_object *pobj,
668                                    const struct lu_fid *lf, __u32 type,
669                                    const char *name, struct thandle *handle)
670 {
671         struct dt_object *next = mdd_object_child(pobj);
672         int rc;
673         ENTRY;
674
675         if (dt_try_as_dir(env, next)) {
676                 struct dt_insert_rec *rec = &mdd_env_info(env)->mdi_dt_rec;
677
678                 rec->rec_fid = lf;
679                 rec->rec_type = type;
680                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
681                                (const struct dt_key *)name, handle);
682         } else {
683                 rc = -ENOTDIR;
684         }
685         RETURN(rc);
686 }
687
688 /* insert named index, add reference if isdir */
689 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
690                               const struct lu_fid *lf, __u32 type,
691                               const char *name, struct thandle *handle)
692 {
693         int rc;
694         ENTRY;
695
696         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
697         if (rc == 0 && S_ISDIR(type)) {
698                 mdd_write_lock(env, pobj, DT_TGT_PARENT);
699                 mdo_ref_add(env, pobj, handle);
700                 mdd_write_unlock(env, pobj);
701         }
702
703         RETURN(rc);
704 }
705
706 /* delete named index, drop reference if isdir */
707 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
708                               const char *name, int is_dir,
709                               struct thandle *handle)
710 {
711         int rc;
712         ENTRY;
713
714         rc = __mdd_index_delete_only(env, pobj, name, handle);
715         if (rc == 0 && is_dir) {
716                 mdd_write_lock(env, pobj, DT_TGT_PARENT);
717                 mdo_ref_del(env, pobj, handle);
718                 mdd_write_unlock(env, pobj);
719         }
720
721         RETURN(rc);
722 }
723
724 static int mdd_llog_record_calc_size(const struct lu_env *env,
725                                      const struct lu_name *tname,
726                                      const struct lu_name *sname)
727 {
728         const struct lu_ucred   *uc = lu_ucred(env);
729         enum changelog_rec_flags clf_flags = CLF_EXTRA_FLAGS;
730         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
731
732         if (sname != NULL)
733                 clf_flags |= CLF_RENAME;
734
735         if (uc != NULL && uc->uc_jobid[0] != '\0')
736                 clf_flags |= CLF_JOBID;
737
738         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
739                              changelog_rec_offset(clf_flags, crfe) +
740                              (tname != NULL ? tname->ln_namelen : 0) +
741                              (sname != NULL ? 1 + sname->ln_namelen : 0));
742 }
743
744 int mdd_declare_changelog_store(const struct lu_env *env,
745                                 struct mdd_device *mdd,
746                                 enum changelog_rec_type type,
747                                 const struct lu_name *tname,
748                                 const struct lu_name *sname,
749                                 struct thandle *handle)
750 {
751         struct obd_device *obd = mdd2obd_dev(mdd);
752         struct llog_ctxt *ctxt;
753         struct llog_rec_hdr rec_hdr;
754         struct thandle *llog_th;
755         int rc;
756
757         if (!mdd_changelog_enabled(env, mdd, type))
758                 return 0;
759
760         rec_hdr.lrh_len = mdd_llog_record_calc_size(env, tname, sname);
761         rec_hdr.lrh_type = CHANGELOG_REC;
762
763         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
764         if (ctxt == NULL)
765                 return -ENXIO;
766
767         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
768         if (IS_ERR(llog_th))
769                 GOTO(out_put, rc = PTR_ERR(llog_th));
770
771         rc = llog_declare_add(env, ctxt->loc_handle, &rec_hdr, llog_th);
772
773 out_put:
774         llog_ctxt_put(ctxt);
775
776         return rc;
777 }
778
779 int mdd_changelog_write_rec(const struct lu_env *env,
780                             struct llog_handle *loghandle,
781                             struct llog_rec_hdr *r,
782                             struct llog_cookie *cookie,
783                             int idx, struct thandle *th)
784 {
785         int rc;
786
787         if (r->lrh_type == CHANGELOG_REC) {
788                 struct mdd_device *mdd;
789                 struct llog_changelog_rec *rec;
790
791                 mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
792                 rec = container_of(r, struct llog_changelog_rec, cr_hdr);
793
794                 spin_lock(&mdd->mdd_cl.mc_lock);
795                 rec->cr.cr_index = mdd->mdd_cl.mc_index + 1;
796                 spin_unlock(&mdd->mdd_cl.mc_lock);
797
798                 rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
799                                                 cookie, idx, th);
800
801                 /*
802                  * if current llog is full, we will generate a new
803                  * llog, and since it's actually not an error, let's
804                  * avoid increasing index so that userspace apps
805                  * should not see a gap in the changelog sequence
806                  */
807                 if (!(rc == -ENOSPC && llog_is_full(loghandle))) {
808                         spin_lock(&mdd->mdd_cl.mc_lock);
809                         ++mdd->mdd_cl.mc_index;
810                         spin_unlock(&mdd->mdd_cl.mc_lock);
811                 }
812         } else {
813                 rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
814                                                 cookie, idx, th);
815         }
816
817         return rc;
818 }
819
820 /** Add a changelog entry \a rec to the changelog llog
821  * \param mdd
822  * \param rec
823  * \param handle - currently ignored since llogs start their own transaction;
824  *              this will hopefully be fixed in llog rewrite
825  * \retval 0 ok
826  */
827 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
828                         struct llog_changelog_rec *rec, struct thandle *th)
829 {
830         struct obd_device       *obd = mdd2obd_dev(mdd);
831         struct llog_ctxt        *ctxt;
832         struct thandle          *llog_th;
833         int                      rc;
834
835         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
836                                             changelog_rec_varsize(&rec->cr));
837
838         /* llog_lvfs_write_rec sets the llog tail len */
839         rec->cr_hdr.lrh_type = CHANGELOG_REC;
840         rec->cr.cr_time = cl_time();
841
842         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
843         if (ctxt == NULL)
844                 return -ENXIO;
845
846         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
847         if (IS_ERR(llog_th))
848                 GOTO(out_put, rc = PTR_ERR(llog_th));
849
850         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_CHANGELOG_REORDER, cfs_fail_val);
851         /* nested journal transaction */
852         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
853
854         /* time to recover some space ?? */
855         if (likely(!mdd->mdd_changelog_gc ||
856                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
857                    mdd->mdd_changelog_min_gc_interval >=
858                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
859                 /* save a spin_lock trip */
860                 goto out_put;
861         spin_lock(&mdd->mdd_cl.mc_lock);
862         if (likely(mdd->mdd_changelog_gc &&
863                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
864                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
865                         mdd->mdd_changelog_min_gc_interval)) {
866                 if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
867                              mdd->mdd_changelog_min_free_cat_entries ||
868                              OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
869                         CWARN("%s:%s low on changelog_catalog free entries, "
870                               "starting ChangeLog garbage collection thread\n",
871                               obd->obd_name,
872                               OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
873                                 " simulate" : "");
874
875                         /* indicate further kthread run will occur outside
876                          * right after current journal transaction filling has
877                          * completed
878                          */
879                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
880                 }
881                 /* next check in mdd_changelog_min_gc_interval anyway
882                  */
883                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
884         }
885         spin_unlock(&mdd->mdd_cl.mc_lock);
886 out_put:
887         llog_ctxt_put(ctxt);
888         if (rc > 0)
889                 rc = 0;
890         return rc;
891 }
892
893 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
894                                          const struct lu_fid *sfid,
895                                          const struct lu_fid *spfid,
896                                          const struct lu_name *sname)
897 {
898         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
899         size_t extsize;
900
901         LASSERT(sfid != NULL);
902         LASSERT(spfid != NULL);
903         LASSERT(sname != NULL);
904
905         extsize = sname->ln_namelen + 1;
906
907         rnm->cr_sfid = *sfid;
908         rnm->cr_spfid = *spfid;
909
910         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
911         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
912         rec->cr_namelen += extsize;
913 }
914
915 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
916 {
917         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
918
919         if (jobid == NULL || jobid[0] == '\0')
920                 return;
921
922         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
923 }
924
925 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
926 {
927         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
928
929         ef->cr_extra_flags = eflags;
930 }
931
932 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
933                                     __u64 uid, __u64 gid)
934 {
935         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
936
937         uidgid->cr_uid = uid;
938         uidgid->cr_gid = gid;
939 }
940
941 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
942                                  lnet_nid_t nid)
943 {
944         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
945
946         clnid->cr_nid = nid;
947 }
948
949 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, u32 flags)
950 {
951         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
952
953         omd->cr_openflags = flags;
954 }
955
956 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
957                                    const char *xattr_name)
958 {
959         struct changelog_ext_xattr *xattr = changelog_rec_xattr(rec);
960
961         strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
962 }
963
964 /** Store a namespace change changelog record
965  * If this fails, we must fail the whole transaction; we don't
966  * want the change to commit without the log entry.
967  * \param target - mdd_object of change
968  * \param tpfid - target parent dir/object fid
969  * \param sfid - source object fid
970  * \param spfid - source parent fid
971  * \param tname - target name string
972  * \param sname - source name string
973  * \param handle - transaction handle
974  */
975 int mdd_changelog_ns_store(const struct lu_env *env,
976                            struct mdd_device *mdd,
977                            enum changelog_rec_type type,
978                            enum changelog_rec_flags clf_flags,
979                            struct mdd_object *target,
980                            const struct lu_fid *tpfid,
981                            const struct lu_fid *sfid,
982                            const struct lu_fid *spfid,
983                            const struct lu_name *tname,
984                            const struct lu_name *sname,
985                            struct thandle *handle)
986 {
987         const struct lu_ucred           *uc = lu_ucred(env);
988         struct llog_changelog_rec       *rec;
989         struct lu_buf                   *buf;
990         int                              reclen;
991         __u64                            xflags = CLFE_INVALID;
992         int                              rc;
993         ENTRY;
994
995         if (!mdd_changelog_enabled(env, mdd, type))
996                 RETURN(0);
997
998         LASSERT(tpfid != NULL);
999         LASSERT(tname != NULL);
1000         LASSERT(handle != NULL);
1001
1002         reclen = mdd_llog_record_calc_size(env, tname, sname);
1003         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_chlg_buf, reclen);
1004         if (buf->lb_buf == NULL)
1005                 RETURN(-ENOMEM);
1006         rec = buf->lb_buf;
1007
1008         clf_flags &= CLF_FLAGMASK;
1009         clf_flags |= CLF_EXTRA_FLAGS;
1010
1011         if (uc) {
1012                 if (uc->uc_jobid[0] != '\0')
1013                         clf_flags |= CLF_JOBID;
1014                 xflags |= CLFE_UIDGID;
1015                 xflags |= CLFE_NID;
1016         }
1017
1018         if (sname != NULL)
1019                 clf_flags |= CLF_RENAME;
1020         else
1021                 clf_flags |= CLF_VERSION;
1022
1023         rec->cr.cr_flags = clf_flags;
1024
1025         if (clf_flags & CLF_EXTRA_FLAGS) {
1026                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
1027                 if (xflags & CLFE_UIDGID)
1028                         mdd_changelog_rec_extra_uidgid(&rec->cr,
1029                                                        uc->uc_uid, uc->uc_gid);
1030                 if (xflags & CLFE_NID)
1031                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
1032         }
1033
1034         rec->cr.cr_type = (__u32)type;
1035         rec->cr.cr_pfid = *tpfid;
1036         rec->cr.cr_namelen = tname->ln_namelen;
1037         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1038
1039         if (clf_flags & CLF_RENAME)
1040                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1041
1042         if (clf_flags & CLF_JOBID)
1043                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1044
1045         if (likely(target != NULL)) {
1046                 rec->cr.cr_tfid = *mdd_object_fid(target);
1047                 target->mod_cltime = ktime_get();
1048         } else {
1049                 fid_zero(&rec->cr.cr_tfid);
1050         }
1051
1052         rc = mdd_changelog_store(env, mdd, rec, handle);
1053         if (rc < 0) {
1054                 CERROR("%s: cannot store changelog record: type = %d, "
1055                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1056                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1057                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1058                 return -EFAULT;
1059         }
1060
1061         return 0;
1062 }
1063
1064 static int __mdd_links_add(const struct lu_env *env,
1065                            struct mdd_object *mdd_obj,
1066                            struct linkea_data *ldata,
1067                            const struct lu_name *lname,
1068                            const struct lu_fid *pfid,
1069                            int first, int check)
1070 {
1071         /* cattr is set in mdd_link */
1072         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1073         int rc;
1074
1075         if (ldata->ld_leh == NULL) {
1076                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1077                 if (rc) {
1078                         if (rc != -ENODATA)
1079                                 return rc;
1080                         rc = linkea_data_new(ldata,
1081                                              &mdd_env_info(env)->mdi_link_buf);
1082                         if (rc)
1083                                 return rc;
1084                 }
1085         }
1086
1087         if (check) {
1088                 rc = linkea_links_find(ldata, lname, pfid);
1089                 if (rc && rc != -ENOENT)
1090                         return rc;
1091                 if (rc == 0)
1092                         return -EEXIST;
1093         }
1094
1095         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1096                 struct lu_fid *tfid = &mdd_env_info(env)->mdi_fid2;
1097
1098                 *tfid = *pfid;
1099                 tfid->f_ver = ~0;
1100                 linkea_add_buf(ldata, lname, tfid, false);
1101         }
1102
1103         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1104                 linkea_add_buf(ldata, lname, pfid, false);
1105
1106         /* For encrypted file, we want to limit number of hard links to what
1107          * linkEA can contain. So ask to return error in case of overflow.
1108          * Currently linkEA stores 4KiB of links, that is 14 NAME_MAX links,
1109          * or 119 16-byte names.
1110          */
1111         return linkea_add_buf(ldata, lname, pfid,
1112                               cattr->la_valid & LA_FLAGS &&
1113                               cattr->la_flags & LUSTRE_ENCRYPT_FL);
1114 }
1115
1116 static int __mdd_links_del(const struct lu_env *env,
1117                            struct mdd_object *mdd_obj,
1118                            struct linkea_data *ldata,
1119                            const struct lu_name *lname,
1120                            const struct lu_fid *pfid)
1121 {
1122         /* cattr is set in mdd_link */
1123         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1124         int rc;
1125
1126         if (ldata->ld_leh == NULL) {
1127                 rc = mdd_links_read(env, mdd_obj, ldata);
1128                 if (rc)
1129                         return rc;
1130         }
1131
1132         rc = linkea_links_find(ldata, lname, pfid);
1133         if (rc)
1134                 return rc;
1135
1136         linkea_del_buf(ldata, lname,
1137                        cattr->la_valid & LA_FLAGS &&
1138                        cattr->la_flags & LUSTRE_ENCRYPT_FL);
1139         return 0;
1140 }
1141
1142 static int mdd_linkea_prepare(const struct lu_env *env,
1143                               struct mdd_object *mdd_obj,
1144                               const struct lu_fid *oldpfid,
1145                               const struct lu_name *oldlname,
1146                               const struct lu_fid *newpfid,
1147                               const struct lu_name *newlname,
1148                               int first, int check,
1149                               struct linkea_data *ldata)
1150 {
1151         int rc = 0;
1152         ENTRY;
1153
1154         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1155                 RETURN(0);
1156
1157         LASSERT(oldpfid != NULL || newpfid != NULL);
1158
1159         if (mdd_obj->mod_flags & DEAD_OBJ)
1160                 /* Unnecessary to update linkEA for dead object.  */
1161                 RETURN(0);
1162
1163         if (oldpfid != NULL) {
1164                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1165                 if (rc) {
1166                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1167                                 RETURN(rc);
1168
1169                         /* No changes done. */
1170                         rc = 0;
1171                 }
1172         }
1173
1174         /* If renaming, add the new record */
1175         if (newpfid != NULL)
1176                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1177                                      first, check);
1178
1179         RETURN(rc);
1180 }
1181
1182 int mdd_links_rename(const struct lu_env *env,
1183                      struct mdd_object *mdd_obj,
1184                      const struct lu_fid *oldpfid,
1185                      const struct lu_name *oldlname,
1186                      const struct lu_fid *newpfid,
1187                      const struct lu_name *newlname,
1188                      struct thandle *handle,
1189                      struct linkea_data *ldata,
1190                      int first, int check)
1191 {
1192         int rc = 0;
1193         ENTRY;
1194
1195         if (ldata == NULL) {
1196                 ldata = &mdd_env_info(env)->mdi_link_data;
1197                 memset(ldata, 0, sizeof(*ldata));
1198                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1199                                         newpfid, newlname, first, check, ldata);
1200                 if (rc)
1201                         GOTO(out, rc);
1202         }
1203
1204         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1205                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1206
1207         GOTO(out, rc);
1208
1209 out:
1210         if (rc != 0) {
1211                 if (newlname == NULL)
1212                         CERROR("link_ea add failed %d "DFID"\n",
1213                                rc, PFID(mdd_object_fid(mdd_obj)));
1214                 else if (oldpfid == NULL)
1215                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1216                                newlname->ln_namelen, newlname->ln_name, rc,
1217                                PFID(mdd_object_fid(mdd_obj)));
1218                 else if (newpfid == NULL)
1219                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1220                                oldlname->ln_namelen, oldlname->ln_name, rc,
1221                                PFID(mdd_object_fid(mdd_obj)));
1222                 else
1223                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1224                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1225                                newlname->ln_namelen, newlname->ln_name, rc,
1226                                PFID(mdd_object_fid(mdd_obj)));
1227         }
1228
1229         if (is_vmalloc_addr(ldata->ld_buf))
1230                 /* if we vmalloced a large buffer drop it */
1231                 lu_buf_free(ldata->ld_buf);
1232
1233         return rc;
1234 }
1235
1236 static inline int mdd_links_add(const struct lu_env *env,
1237                                 struct mdd_object *mdd_obj,
1238                                 const struct lu_fid *pfid,
1239                                 const struct lu_name *lname,
1240                                 struct thandle *handle,
1241                                 struct linkea_data *ldata, int first)
1242 {
1243         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1244                                 pfid, lname, handle, ldata, first, 0);
1245 }
1246
1247 static inline int mdd_links_del(const struct lu_env *env,
1248                                 struct mdd_object *mdd_obj,
1249                                 const struct lu_fid *pfid,
1250                                 const struct lu_name *lname,
1251                                 struct thandle *handle)
1252 {
1253         return mdd_links_rename(env, mdd_obj, pfid, lname,
1254                                 NULL, NULL, handle, NULL, 0, 0);
1255 }
1256
1257 /** Read the link EA into a temp buffer.
1258  * Uses the name_buf since it is generally large.
1259  * \retval IS_ERR err
1260  * \retval ptr to \a lu_buf (always \a mdi_link_buf)
1261  */
1262 struct lu_buf *mdd_links_get(const struct lu_env *env,
1263                              struct mdd_object *mdd_obj)
1264 {
1265         struct linkea_data ldata = { NULL };
1266         int rc;
1267
1268         rc = mdd_links_read(env, mdd_obj, &ldata);
1269         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1270 }
1271
1272 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1273                     struct linkea_data *ldata, struct thandle *handle)
1274 {
1275         const struct lu_buf *buf;
1276         int                 rc;
1277
1278         if (ldata == NULL || ldata->ld_buf == NULL ||
1279             ldata->ld_leh == NULL)
1280                 return 0;
1281
1282         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1283                 return 0;
1284
1285 again:
1286         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1287                                 ldata->ld_leh->leh_len);
1288         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1289         if (unlikely(rc == -ENOSPC)) {
1290                 rc = linkea_overflow_shrink(ldata);
1291                 if (likely(rc > 0))
1292                         goto again;
1293         }
1294
1295         return rc;
1296 }
1297
1298 static int mdd_declare_links_add(const struct lu_env *env,
1299                                  struct mdd_object *mdd_obj,
1300                                  struct thandle *handle,
1301                                  struct linkea_data *ldata)
1302 {
1303         int     rc;
1304         int     ea_len;
1305         void    *linkea;
1306
1307         if (ldata != NULL && ldata->ld_leh != NULL) {
1308                 ea_len = ldata->ld_leh->leh_len;
1309                 linkea = ldata->ld_buf->lb_buf;
1310         } else {
1311                 ea_len = MAX_LINKEA_SIZE;
1312                 linkea = NULL;
1313         }
1314
1315         rc = mdo_declare_xattr_set(env, mdd_obj,
1316                                    mdd_buf_get_const(env, linkea, ea_len),
1317                                    XATTR_NAME_LINK, 0, handle);
1318
1319         return rc;
1320 }
1321
1322 static inline int mdd_declare_links_del(const struct lu_env *env,
1323                                         struct mdd_object *c,
1324                                         struct thandle *handle)
1325 {
1326         int rc = 0;
1327
1328         /* For directory, the linkEA will be removed together
1329          * with the object. */
1330         if (!S_ISDIR(mdd_object_type(c)))
1331                 rc = mdd_declare_links_add(env, c, handle, NULL);
1332
1333         return rc;
1334 }
1335
1336 static int mdd_declare_link(const struct lu_env *env,
1337                             struct mdd_device *mdd,
1338                             struct mdd_object *p,
1339                             struct mdd_object *c,
1340                             const struct lu_name *name,
1341                             struct thandle *handle,
1342                             struct lu_attr *la,
1343                             struct linkea_data *data)
1344 {
1345         struct lu_fid tfid = *mdd_object_fid(c);
1346         int rc;
1347
1348         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1349                 tfid.f_oid = cfs_fail_val;
1350
1351         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1352                                       name->ln_name, handle);
1353         if (rc != 0)
1354                 return rc;
1355
1356         rc = mdo_declare_ref_add(env, c, handle);
1357         if (rc != 0)
1358                 return rc;
1359
1360         la->la_valid = LA_CTIME | LA_MTIME;
1361         rc = mdo_declare_attr_set(env, p, la, handle);
1362         if (rc != 0)
1363                 return rc;
1364
1365         la->la_valid = LA_CTIME;
1366         rc = mdo_declare_attr_set(env, c, la, handle);
1367         if (rc != 0)
1368                 return rc;
1369
1370         rc = mdd_declare_links_add(env, c, handle, data);
1371         if (rc != 0)
1372                 return rc;
1373
1374         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1375                                          handle);
1376
1377         return rc;
1378 }
1379
1380 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1381                     struct md_object *src_obj, const struct lu_name *lname,
1382                     struct md_attr *ma)
1383 {
1384         const char *name = lname->ln_name;
1385         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
1386         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1387         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1388         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1389         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
1390         struct mdd_device *mdd = mdo2mdd(src_obj);
1391         struct thandle *handle;
1392         struct lu_fid *tfid = &mdd_env_info(env)->mdi_fid2;
1393         struct linkea_data *ldata = &mdd_env_info(env)->mdi_link_data;
1394         int rc;
1395         ENTRY;
1396
1397         rc = mdd_la_get(env, mdd_sobj, cattr);
1398         if (rc != 0)
1399                 RETURN(rc);
1400
1401         rc = mdd_la_get(env, mdd_tobj, tattr);
1402         if (rc != 0)
1403                 RETURN(rc);
1404
1405         /*
1406          * If we are using project inheritance, we only allow hard link
1407          * creation in our tree when the project IDs are the same;
1408          * otherwise the tree quota mechanism could be circumvented.
1409          */
1410         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1411             (tattr->la_projid != cattr->la_projid))
1412                 RETURN(-EXDEV);
1413
1414         handle = mdd_trans_create(env, mdd);
1415         if (IS_ERR(handle))
1416                 GOTO(out_pending, rc = PTR_ERR(handle));
1417
1418         memset(ldata, 0, sizeof(*ldata));
1419
1420         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1421         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1422
1423         /* Note: even this function will change ldata, but it comes from
1424          * thread_info, which is completely temporary and only seen in
1425          * this function, so we do not need reset ldata once it fails.*/
1426         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1427                                 mdd_object_fid(mdd_tobj), lname, 0, 0, ldata);
1428         if (rc != 0)
1429                 GOTO(stop, rc);
1430
1431         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1432                               la, ldata);
1433         if (rc)
1434                 GOTO(stop, rc);
1435
1436         rc = mdd_trans_start(env, mdd, handle);
1437         if (rc)
1438                 GOTO(stop, rc);
1439
1440         mdd_write_lock(env, mdd_sobj, DT_TGT_CHILD);
1441         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1442                                    cattr);
1443         if (rc)
1444                 GOTO(out_unlock, rc);
1445
1446         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1447                 rc = mdo_ref_add(env, mdd_sobj, handle);
1448                 if (rc != 0)
1449                         GOTO(out_unlock, rc);
1450         }
1451
1452         *tfid = *mdd_object_fid(mdd_sobj);
1453         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1454                 tfid->f_oid = cfs_fail_val;
1455
1456         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1457                                      mdd_object_type(mdd_sobj), name, handle);
1458         if (rc != 0) {
1459                 mdo_ref_del(env, mdd_sobj, handle);
1460                 GOTO(out_unlock, rc);
1461         }
1462
1463         la->la_valid = LA_CTIME | LA_MTIME;
1464         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1465         if (rc)
1466                 GOTO(out_unlock, rc);
1467
1468         la->la_valid = LA_CTIME;
1469         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1470         if (rc == 0)
1471                 /* Note: The failure of links_add should not cause the
1472                  * link failure, so do not check return value. */
1473                 mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_tobj),
1474                               lname, handle, ldata, 0);
1475
1476         EXIT;
1477 out_unlock:
1478         mdd_write_unlock(env, mdd_sobj);
1479         if (rc == 0)
1480                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1481                                             mdd_object_fid(mdd_tobj), NULL,
1482                                             NULL, lname, NULL, handle);
1483 stop:
1484         rc = mdd_trans_stop(env, mdd, rc, handle);
1485         if (is_vmalloc_addr(ldata->ld_buf))
1486                 /* if we vmalloced a large buffer drop it */
1487                 lu_buf_free(ldata->ld_buf);
1488 out_pending:
1489         return rc;
1490 }
1491
1492 static int mdd_mark_orphan_object(const struct lu_env *env,
1493                                 struct mdd_object *obj, struct thandle *handle,
1494                                 bool declare)
1495 {
1496         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1497         int rc;
1498
1499         attr->la_valid = LA_FLAGS;
1500         attr->la_flags = LUSTRE_ORPHAN_FL;
1501
1502         if (declare)
1503                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1504         else
1505                 rc = mdo_attr_set(env, obj, attr, handle);
1506
1507         return rc;
1508 }
1509
1510 static int mdd_declare_finish_unlink(const struct lu_env *env,
1511                                      struct mdd_object *obj,
1512                                      struct thandle *handle)
1513 {
1514         int rc;
1515
1516         /* Sigh, we do not know if the unlink object will become orphan in
1517          * declare phase, but fortunately the flags here does not matter
1518          * in current declare implementation */
1519         rc = mdd_mark_orphan_object(env, obj, handle, true);
1520         if (rc != 0)
1521                 return rc;
1522
1523         rc = mdo_declare_destroy(env, obj, handle);
1524         if (rc != 0)
1525                 return rc;
1526
1527         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1528         if (rc != 0)
1529                 return rc;
1530
1531         return mdd_declare_links_del(env, obj, handle);
1532 }
1533
1534 /* caller should take a lock before calling */
1535 int mdd_finish_unlink(const struct lu_env *env,
1536                       struct mdd_object *obj, struct md_attr *ma,
1537                       struct mdd_object *pobj,
1538                       const struct lu_name *lname,
1539                       struct thandle *th)
1540 {
1541         int rc = 0;
1542         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1543         ENTRY;
1544
1545         LASSERT(mdd_write_locked(env, obj) != 0);
1546
1547         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1548                 /* add new orphan and the object
1549                  * will be deleted during mdd_close() */
1550                 obj->mod_flags |= DEAD_OBJ;
1551                 if (obj->mod_count) {
1552                         rc = mdd_orphan_insert(env, obj, th);
1553                         if (rc == 0)
1554                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1555                                         "orphan list, open count = %d\n",
1556                                         PFID(mdd_object_fid(obj)),
1557                                         obj->mod_count);
1558                         else
1559                                 CERROR("Object "DFID" fail to be an orphan, "
1560                                        "open count = %d, maybe cause failed "
1561                                        "open replay\n",
1562                                         PFID(mdd_object_fid(obj)),
1563                                         obj->mod_count);
1564
1565                         /* mark object as an orphan here, not
1566                          * before mdd_orphan_insert() as racing
1567                          * mdd_la_get() may propagate ORPHAN_OBJ
1568                          * causing the asserition */
1569                         rc = mdd_mark_orphan_object(env, obj, th, false);
1570                 } else {
1571                         rc = mdo_destroy(env, obj, th);
1572                 }
1573         } else if (!is_dir) {
1574                 /* old files may not have link ea; ignore errors */
1575                 mdd_links_del(env, obj, mdd_object_fid(pobj), lname, th);
1576         }
1577
1578         RETURN(rc);
1579 }
1580
1581 /*
1582  * pobj maybe NULL
1583  * has mdd_write_lock on cobj already, but not on pobj yet
1584  */
1585 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1586                             const struct lu_attr *pattr,
1587                             struct mdd_object *cobj,
1588                             const struct lu_attr *cattr)
1589 {
1590         int rc;
1591         ENTRY;
1592
1593         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1594
1595         RETURN(rc);
1596 }
1597
1598 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1599                               struct mdd_object *p, struct mdd_object *c,
1600                               const struct lu_name *name, struct md_attr *ma,
1601                               struct thandle *handle, int no_name, int is_dir)
1602 {
1603         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
1604         int rc;
1605
1606         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1607                 if (likely(no_name == 0)) {
1608                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1609                                                       handle);
1610                         if (rc != 0)
1611                                 return rc;
1612                 }
1613
1614                 if (is_dir != 0) {
1615                         rc = mdo_declare_ref_del(env, p, handle);
1616                         if (rc != 0)
1617                                 return rc;
1618                 }
1619         }
1620
1621         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1622         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1623         la->la_valid = LA_CTIME | LA_MTIME;
1624         rc = mdo_declare_attr_set(env, p, la, handle);
1625         if (rc)
1626                 return rc;
1627
1628         if (c != NULL) {
1629                 rc = mdo_declare_ref_del(env, c, handle);
1630                 if (rc)
1631                         return rc;
1632
1633                 rc = mdo_declare_ref_del(env, c, handle);
1634                 if (rc)
1635                         return rc;
1636
1637                 la->la_valid = LA_CTIME;
1638                 rc = mdo_declare_attr_set(env, c, la, handle);
1639                 if (rc)
1640                         return rc;
1641
1642                 rc = mdd_declare_finish_unlink(env, c, handle);
1643                 if (rc)
1644                         return rc;
1645
1646                 /* FIXME: need changelog for remove entry */
1647                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
1648                                                  NULL, handle);
1649         }
1650
1651         return rc;
1652 }
1653
1654 /*
1655  * test if a file has an HSM archive
1656  * if HSM attributes are not found in ma update them from
1657  * HSM xattr
1658  */
1659 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1660                                    struct mdd_object *obj,
1661                                    struct md_attr *ma)
1662 {
1663         ENTRY;
1664
1665         if (!(ma->ma_valid & MA_HSM)) {
1666                 /* no HSM MD provided, read xattr */
1667                 struct lu_buf   *hsm_buf;
1668                 const size_t     buflen = sizeof(struct hsm_attrs);
1669                 int              rc;
1670
1671                 hsm_buf = mdd_buf_get(env, NULL, 0);
1672                 lu_buf_alloc(hsm_buf, buflen);
1673                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1674                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1675                 lu_buf_free(hsm_buf);
1676                 if (rc < 0)
1677                         RETURN(false);
1678
1679                 ma->ma_valid |= MA_HSM;
1680         }
1681         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1682                 RETURN(true);
1683         RETURN(false);
1684 }
1685
1686 /**
1687  * Delete name entry and the object.
1688  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1689  * does not exist for this object, and it could only happen during resending
1690  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1691  * is also needed in this case(needed by changelog), so we have to add another
1692  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1693  * the ENOENT failure should be able to be fixed by redo mechanism.
1694  */
1695 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1696                       struct md_object *cobj, const struct lu_name *lname,
1697                       struct md_attr *ma, int no_name)
1698 {
1699         const char *name = lname->ln_name;
1700         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1701         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1702         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
1703         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1704         struct mdd_object *mdd_cobj = NULL;
1705         struct mdd_device *mdd = mdo2mdd(pobj);
1706         struct thandle    *handle;
1707         int rc, is_dir = 0, cl_flags = 0;
1708         ENTRY;
1709
1710         /* let shutdown to start */
1711         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
1712
1713         /* cobj == NULL means only delete name entry */
1714         if (likely(cobj != NULL)) {
1715                 mdd_cobj = md2mdd_obj(cobj);
1716                 if (mdd_object_exists(mdd_cobj) == 0)
1717                         RETURN(-ENOENT);
1718         }
1719
1720         rc = mdd_la_get(env, mdd_pobj, pattr);
1721         if (rc)
1722                 RETURN(rc);
1723
1724         if (likely(mdd_cobj != NULL)) {
1725                 /* fetch cattr */
1726                 rc = mdd_la_get(env, mdd_cobj, cattr);
1727                 if (rc)
1728                         RETURN(rc);
1729
1730                 is_dir = S_ISDIR(cattr->la_mode);
1731                 /* search for an existing archive.
1732                  * we should check ahead as the object
1733                  * can be destroyed in this transaction */
1734                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1735                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1736         }
1737
1738         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1739         if (rc)
1740                 RETURN(rc);
1741
1742         handle = mdd_trans_create(env, mdd);
1743         if (IS_ERR(handle))
1744                 RETURN(PTR_ERR(handle));
1745
1746         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1747                                 lname, ma, handle, no_name, is_dir);
1748         if (rc)
1749                 GOTO(stop, rc);
1750
1751         rc = mdd_trans_start(env, mdd, handle);
1752         if (rc)
1753                 GOTO(stop, rc);
1754
1755         if (likely(mdd_cobj != NULL))
1756                 mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
1757
1758         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1759                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1760                 if (rc)
1761                         GOTO(cleanup, rc);
1762         }
1763
1764         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1765             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1766                 GOTO(cleanup, rc = 0);
1767
1768         if (likely(mdd_cobj != NULL)) {
1769                 rc = mdo_ref_del(env, mdd_cobj, handle);
1770                 if (rc != 0) {
1771                         __mdd_index_insert_only(env, mdd_pobj,
1772                                                 mdd_object_fid(mdd_cobj),
1773                                                 mdd_object_type(mdd_cobj),
1774                                                 name, handle);
1775                         GOTO(cleanup, rc);
1776                 }
1777
1778                 if (is_dir)
1779                         /* unlink dot */
1780                         mdo_ref_del(env, mdd_cobj, handle);
1781
1782                 /* fetch updated nlink */
1783                 rc = mdd_la_get(env, mdd_cobj, cattr);
1784                 if (rc)
1785                         GOTO(cleanup, rc);
1786         }
1787
1788         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1789         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1790
1791         la->la_valid = LA_CTIME | LA_MTIME;
1792         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1793         if (rc)
1794                 GOTO(cleanup, rc);
1795
1796         /* Enough for only unlink the entry */
1797         if (unlikely(mdd_cobj == NULL))
1798                 GOTO(cleanup, rc);
1799
1800         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1801                 /* update ctime of an unlinked file only if it is still
1802                  * opened or a link still exists */
1803                 la->la_valid = LA_CTIME;
1804                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1805                 if (rc)
1806                         GOTO(cleanup, rc);
1807         }
1808
1809         /* XXX: this transfer to ma will be removed with LOD/OSP */
1810         ma->ma_attr = *cattr;
1811         ma->ma_valid |= MA_INODE;
1812         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1813         if (rc != 0)
1814                 GOTO(cleanup, rc);
1815
1816         /* fetch updated nlink */
1817         rc = mdd_la_get(env, mdd_cobj, cattr);
1818         /* if object is removed then we can't get its attrs,
1819          * use last get */
1820         if (rc == -ENOENT) {
1821                 cattr->la_nlink = 0;
1822                 rc = 0;
1823         }
1824
1825         if (cattr->la_nlink == 0) {
1826                 ma->ma_attr = *cattr;
1827                 ma->ma_valid |= MA_INODE;
1828         }
1829
1830         EXIT;
1831 cleanup:
1832         if (likely(mdd_cobj != NULL))
1833                 mdd_write_unlock(env, mdd_cobj);
1834
1835         if (rc == 0) {
1836                 if (cattr->la_nlink == 0)
1837                         cl_flags |= CLF_UNLINK_LAST;
1838                 else
1839                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1840
1841                 rc = mdd_changelog_ns_store(env, mdd,
1842                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1843                         mdd_cobj, mdd_object_fid(mdd_pobj), NULL, NULL,
1844                         lname, NULL, handle);
1845         }
1846
1847 stop:
1848         rc = mdd_trans_stop(env, mdd, rc, handle);
1849
1850         return rc;
1851 }
1852
1853 /*
1854  * The permission has been checked when obj created, no need check again.
1855  */
1856 static int mdd_cd_sanity_check(const struct lu_env *env,
1857                                struct mdd_object *obj)
1858 {
1859         ENTRY;
1860
1861         /* EEXIST check */
1862         if (!obj || mdd_is_dead_obj(obj))
1863                 RETURN(-ENOENT);
1864
1865         RETURN(0);
1866 }
1867
1868 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1869                            struct md_object *cobj,
1870                            const struct md_op_spec *spec, struct md_attr *ma)
1871 {
1872         struct mdd_device *mdd = mdo2mdd(cobj);
1873         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1874         struct mdd_object *son = md2mdd_obj(cobj);
1875         struct thandle *handle;
1876         const struct lu_buf *buf;
1877         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
1878         struct dt_allocation_hint *hint = &mdd_env_info(env)->mdi_hint;
1879         int rc;
1880         ENTRY;
1881
1882         rc = mdd_cd_sanity_check(env, son);
1883         if (rc)
1884                 RETURN(rc);
1885
1886         if (!md_should_create(spec->sp_cr_flags))
1887                 RETURN(0);
1888
1889         /*
1890          * there are following use cases for this function:
1891          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1892          *    striping can be specified or not
1893          * 2) CMD?
1894          */
1895         rc = mdd_la_get(env, son, attr);
1896         if (rc)
1897                 RETURN(rc);
1898
1899         /* calling ->ah_make_hint() is used to transfer information from parent */
1900         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1901
1902         handle = mdd_trans_create(env, mdd);
1903         if (IS_ERR(handle))
1904                 GOTO(out_free, rc = PTR_ERR(handle));
1905
1906         /*
1907          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1908          * Should this be fixed?
1909          */
1910         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1911                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1912                spec->sp_cr_flags, spec->no_create);
1913
1914         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1915                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1916                                         spec->u.sp_ea.eadatalen);
1917         } else {
1918                 buf = &LU_BUF_NULL;
1919         }
1920
1921         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1922                                   XATTR_NAME_LOV, 0, handle);
1923         if (rc)
1924                 GOTO(stop, rc);
1925
1926         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
1927                                          handle);
1928         if (rc)
1929                 GOTO(stop, rc);
1930
1931         rc = mdd_trans_start(env, mdd, handle);
1932         if (rc)
1933                 GOTO(stop, rc);
1934
1935         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1936                           0, handle);
1937
1938         if (rc)
1939                 GOTO(stop, rc);
1940
1941         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle,
1942                                       NULL);
1943
1944 stop:
1945         rc = mdd_trans_stop(env, mdd, rc, handle);
1946
1947 out_free:
1948         RETURN(rc);
1949 }
1950
1951 static int mdd_declare_object_initialize(const struct lu_env *env,
1952                                          struct mdd_object *parent,
1953                                          struct mdd_object *child,
1954                                          const struct lu_attr *attr,
1955                                          struct thandle *handle)
1956 {
1957         int rc;
1958         ENTRY;
1959
1960         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1961         if (!S_ISDIR(attr->la_mode))
1962                 RETURN(0);
1963
1964         rc = mdo_declare_index_insert(env, child, mdd_object_fid(child),
1965                                       S_IFDIR, dot, handle);
1966         if (rc != 0)
1967                 RETURN(rc);
1968
1969         rc = mdo_declare_ref_add(env, child, handle);
1970         if (rc != 0)
1971                 RETURN(rc);
1972
1973         rc = mdo_declare_index_insert(env, child, mdd_object_fid(parent),
1974                                       S_IFDIR, dotdot, handle);
1975
1976         RETURN(rc);
1977 }
1978
1979 static int mdd_object_initialize(const struct lu_env *env,
1980                                  const struct lu_fid *pfid,
1981                                  struct mdd_object *child,
1982                                  struct lu_attr *attr,
1983                                  struct thandle *handle)
1984 {
1985         int rc = 0;
1986         ENTRY;
1987
1988         if (S_ISDIR(attr->la_mode)) {
1989                 /* Add "." and ".." for newly created dir */
1990                 mdo_ref_add(env, child, handle);
1991                 rc = __mdd_index_insert_only(env, child, mdd_object_fid(child),
1992                                              S_IFDIR, dot, handle);
1993                 if (rc == 0)
1994                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1995                                                      dotdot, handle);
1996                 if (rc != 0)
1997                         mdo_ref_del(env, child, handle);
1998         }
1999
2000         RETURN(rc);
2001 }
2002
2003 /**
2004  * This function checks whether it can create a file/dir under the
2005  * directory(@pobj). The directory(@pobj) is not being locked by
2006  * mdd lock.
2007  *
2008  * \param[in] env       execution environment
2009  * \param[in] pobj      the directory to create files
2010  * \param[in] pattr     the attributes of the directory
2011  * \param[in] lname     the name of the created file/dir
2012  * \param[in] cattr     the attributes of the file/dir
2013  * \param[in] spec      create specification
2014  *
2015  * \retval              = 0 it is allowed to create file/dir under
2016  *                      the directory
2017  * \retval              negative error not allowed to create file/dir
2018  *                      under the directory
2019  */
2020 static int mdd_create_sanity_check(const struct lu_env *env,
2021                                    struct md_object *pobj,
2022                                    const struct lu_attr *pattr,
2023                                    const struct lu_name *lname,
2024                                    struct lu_attr *cattr,
2025                                    struct md_op_spec *spec)
2026 {
2027         struct mdd_thread_info *info = mdd_env_info(env);
2028         struct lu_fid *fid = &info->mdi_fid;
2029         struct mdd_object *obj = md2mdd_obj(pobj);
2030         struct mdd_device *m = mdo2mdd(pobj);
2031         bool check_perm = true;
2032         int rc;
2033         ENTRY;
2034
2035         /* EEXIST check */
2036         if (mdd_is_dead_obj(obj))
2037                 RETURN(-ENOENT);
2038
2039         /*
2040          * In some cases this lookup is not needed - we know before if name
2041          * exists or not because MDT performs lookup for it.
2042          * name length check is done in lookup.
2043          */
2044         if (spec->sp_cr_lookup) {
2045                 /*
2046                  * Check if the name already exist, though it will be checked in
2047                  * _index_insert also, for avoiding rolling back if exists
2048                  * _index_insert.
2049                  */
2050                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2051                                   MAY_WRITE | MAY_EXEC);
2052                 if (rc != -ENOENT)
2053                         RETURN(rc ? : -EEXIST);
2054
2055                 /* Permission is already being checked in mdd_lookup */
2056                 check_perm = false;
2057         }
2058
2059         if (S_ISDIR(cattr->la_mode) &&
2060             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2061             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2062                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2063
2064                 if (!lmv_user_magic_supported(le32_to_cpu(lum->lum_magic)) &&
2065                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2066                         rc = -EINVAL;
2067                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2068                                "stripe_offset = %d, stripe_count = %u: "
2069                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2070                                 le32_to_cpu(lum->lum_magic),
2071                                (int)le32_to_cpu(lum->lum_stripe_offset),
2072                                le32_to_cpu(lum->lum_stripe_count), rc);
2073                         return rc;
2074                 }
2075         }
2076
2077         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2078         if (rc != 0)
2079                 RETURN(rc);
2080
2081         /* sgid check */
2082         if (pattr->la_mode & S_ISGID) {
2083                 struct lu_ucred *uc = lu_ucred(env);
2084
2085                 cattr->la_gid = pattr->la_gid;
2086
2087                 /* Directories are special, and always inherit S_ISGID */
2088                 if (S_ISDIR(cattr->la_mode)) {
2089                         cattr->la_mode |= S_ISGID;
2090                         cattr->la_valid |= LA_MODE;
2091                 } else if ((cattr->la_mode & (S_ISGID | S_IXGRP))
2092                                 == (S_ISGID | S_IXGRP) &&
2093                            !lustre_in_group_p(uc,
2094                                               (cattr->la_valid & LA_GID) ?
2095                                               cattr->la_gid : pattr->la_gid) &&
2096                            !md_capable(uc, CAP_FSETID)) {
2097                         cattr->la_mode &= ~S_ISGID;
2098                         cattr->la_valid |= LA_MODE;
2099                 }
2100         }
2101
2102         /* Inherit project ID from parent directory */
2103         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2104                 cattr->la_projid = pattr->la_projid;
2105                 if (S_ISDIR(cattr->la_mode)) {
2106                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2107                         cattr->la_valid |= LA_FLAGS;
2108                 }
2109                 cattr->la_valid |= LA_PROJID;
2110         }
2111
2112         rc = mdd_name_check(m, lname);
2113         if (rc < 0)
2114                 RETURN(rc);
2115
2116         switch (cattr->la_mode & S_IFMT) {
2117         case S_IFLNK: {
2118                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2119
2120                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2121                         RETURN(-ENAMETOOLONG);
2122                 else
2123                         RETURN(0);
2124         }
2125         case S_IFDIR:
2126         case S_IFREG:
2127         case S_IFCHR:
2128         case S_IFBLK:
2129         case S_IFIFO:
2130         case S_IFSOCK:
2131                 rc = 0;
2132                 break;
2133         default:
2134                 rc = -EINVAL;
2135                 break;
2136         }
2137         RETURN(rc);
2138 }
2139
2140 static int mdd_declare_create_object(const struct lu_env *env,
2141                                      struct mdd_device *mdd,
2142                                      struct mdd_object *p, struct mdd_object *c,
2143                                      struct lu_attr *attr,
2144                                      struct thandle *handle,
2145                                      const struct md_op_spec *spec,
2146                                      struct lu_buf *def_acl_buf,
2147                                      struct lu_buf *acl_buf,
2148                                      struct lu_buf *hsm_buf,
2149                                      struct dt_allocation_hint *hint)
2150 {
2151         const struct lu_buf *buf;
2152         int rc;
2153
2154 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2155         /* ldiskfs OSD needs this information for credit allocation */
2156         if (def_acl_buf)
2157                 hint->dah_acl_len = def_acl_buf->lb_len;
2158 #endif
2159         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2160                                                 hint);
2161         if (rc)
2162                 GOTO(out, rc);
2163
2164 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2165         if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2166                 /* if dir, then can inherit default ACl */
2167                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2168                                            XATTR_NAME_ACL_DEFAULT,
2169                                            0, handle);
2170                 if (rc)
2171                         GOTO(out, rc);
2172         }
2173
2174         if (acl_buf && acl_buf->lb_len > 0) {
2175                 rc = mdo_declare_attr_set(env, c, attr, handle);
2176                 if (rc)
2177                         GOTO(out, rc);
2178
2179                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2180                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2181                 if (rc)
2182                         GOTO(out, rc);
2183         }
2184 #endif
2185         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2186         if (rc)
2187                 GOTO(out, rc);
2188
2189         /* replay case, create LOV EA from client data */
2190         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2191             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2192                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2193                                         spec->u.sp_ea.eadatalen);
2194                 rc = mdo_declare_xattr_set(env, c, buf,
2195                                            S_ISDIR(attr->la_mode) ?
2196                                                 XATTR_NAME_LMV : XATTR_NAME_LOV,
2197                                            LU_XATTR_CREATE, handle);
2198                 if (rc)
2199                         GOTO(out, rc);
2200
2201                 if (spec->sp_cr_flags & MDS_OPEN_PCC) {
2202                         rc = mdo_declare_xattr_set(env, c, hsm_buf,
2203                                                    XATTR_NAME_HSM,
2204                                                    0, handle);
2205                         if (rc)
2206                                 GOTO(out, rc);
2207                 }
2208         }
2209
2210         if (S_ISLNK(attr->la_mode)) {
2211                 const char *target_name = spec->u.sp_symname;
2212                 int sym_len = strlen(target_name);
2213                 const struct lu_buf *buf;
2214
2215                 buf = mdd_buf_get_const(env, target_name, sym_len);
2216                 rc = dt_declare_record_write(env, mdd_object_child(c),
2217                                              buf, 0, handle);
2218                 if (rc)
2219                         GOTO(out, rc);
2220         }
2221
2222         if (spec->sp_cr_file_secctx_name != NULL) {
2223                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2224                                         spec->sp_cr_file_secctx_size);
2225                 rc = mdo_declare_xattr_set(env, c, buf,
2226                                            spec->sp_cr_file_secctx_name, 0,
2227                                            handle);
2228                 if (rc < 0)
2229                         GOTO(out, rc);
2230         }
2231
2232         if (spec->sp_cr_file_encctx != NULL) {
2233                 buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
2234                                         spec->sp_cr_file_encctx_size);
2235                 rc = mdo_declare_xattr_set(env, c, buf,
2236                                            LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
2237                                            handle);
2238                 if (rc < 0)
2239                         GOTO(out, rc);
2240         }
2241 out:
2242         return rc;
2243 }
2244
2245 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2246                               struct mdd_object *p, struct mdd_object *c,
2247                               const struct lu_name *name,
2248                               struct lu_attr *attr,
2249                               struct thandle *handle,
2250                               const struct md_op_spec *spec,
2251                               struct linkea_data *ldata,
2252                               struct lu_buf *def_acl_buf,
2253                               struct lu_buf *acl_buf,
2254                               struct lu_buf *hsm_buf,
2255                               struct dt_allocation_hint *hint)
2256 {
2257         int rc;
2258
2259         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2260                                        def_acl_buf, acl_buf, hsm_buf, hint);
2261         if (rc)
2262                 GOTO(out, rc);
2263
2264         if (S_ISDIR(attr->la_mode)) {
2265                 rc = mdo_declare_ref_add(env, p, handle);
2266                 if (rc)
2267                         GOTO(out, rc);
2268         }
2269
2270         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2271                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2272                 if (rc)
2273                         GOTO(out, rc);
2274         } else {
2275                 struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2276                 enum changelog_rec_type type;
2277
2278                 rc = mdo_declare_index_insert(env, p, mdd_object_fid(c),
2279                                               attr->la_mode, name->ln_name,
2280                                               handle);
2281                 if (rc != 0)
2282                         return rc;
2283
2284                 rc = mdd_declare_links_add(env, c, handle, ldata);
2285                 if (rc)
2286                         return rc;
2287
2288                 *la = *attr;
2289                 la->la_valid = LA_CTIME | LA_MTIME;
2290                 rc = mdo_declare_attr_set(env, p, la, handle);
2291                 if (rc)
2292                         return rc;
2293
2294                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2295                        S_ISREG(attr->la_mode) ? CL_CREATE :
2296                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2297
2298                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2299                                                  handle);
2300                 if (rc)
2301                         return rc;
2302         }
2303 out:
2304         return rc;
2305 }
2306
2307 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2308                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2309                         struct lu_buf *acl_buf)
2310 {
2311         int     rc;
2312
2313         ENTRY;
2314
2315         if (S_ISLNK(la->la_mode)) {
2316                 acl_buf->lb_len = 0;
2317                 def_acl_buf->lb_len = 0;
2318                 RETURN(0);
2319         }
2320
2321         mdd_read_lock(env, pobj, DT_TGT_PARENT);
2322         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2323                            XATTR_NAME_ACL_DEFAULT);
2324         mdd_read_unlock(env, pobj);
2325         if (rc > 0) {
2326                 /* ACL buffer size is not enough, need realloc */
2327                 if (rc > acl_buf->lb_len)
2328                         RETURN(-ERANGE);
2329
2330                 /* If there are default ACL, fix mode/ACL by default ACL */
2331                 def_acl_buf->lb_len = rc;
2332                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2333                 acl_buf->lb_len = rc;
2334                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2335                 if (rc < 0)
2336                         RETURN(rc);
2337         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2338                 /* If there are no default ACL, fix mode by mask */
2339                 struct lu_ucred *uc = lu_ucred(env);
2340
2341                 /* The create triggered by MDT internal events, such as
2342                  * LFSCK reset, will not contain valid "uc". */
2343                 if (unlikely(uc != NULL))
2344                         la->la_mode &= ~uc->uc_umask;
2345                 rc = 0;
2346                 acl_buf->lb_len = 0;
2347                 def_acl_buf->lb_len = 0;
2348         }
2349
2350         RETURN(rc);
2351 }
2352
2353 /**
2354  * Create a metadata object and initialize it, set acl, xattr.
2355  **/
2356 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2357                              struct mdd_object *son, struct lu_attr *attr,
2358                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2359                              struct lu_buf *def_acl_buf,
2360                              struct lu_buf *hsm_buf,
2361                              struct dt_allocation_hint *hint,
2362                              struct thandle *handle, bool initsecctx)
2363 {
2364         const struct lu_buf *buf;
2365         int rc;
2366
2367         mdd_write_lock(env, son, DT_TGT_CHILD);
2368         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2369                                         hint);
2370         if (rc)
2371                 GOTO(unlock, rc);
2372
2373         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2374          * created in declare phase, they also needs to be added to master
2375          * object as sub-directory entry. So it has to initialize the master
2376          * object, then set dir striped EA.(in mdo_xattr_set) */
2377         rc = mdd_object_initialize(env, mdd_object_fid(pobj), son, attr,
2378                                    handle);
2379         if (rc != 0)
2380                 GOTO(err_destroy, rc);
2381
2382         /*
2383          * in case of replay we just set LOVEA provided by the client
2384          * XXX: I think it would be interesting to try "old" way where
2385          *      MDT calls this xattr_set(LOV) in a different transaction.
2386          *      probably this way we code can be made better.
2387          */
2388
2389         /* During creation, there are only a few cases we need do xattr_set to
2390          * create stripes.
2391          * 1. regular file: see comments above.
2392          * 2. dir: inherit default striping or pool settings from parent.
2393          * 3. create striped directory with provided stripeEA.
2394          * 4. create striped directory because inherit default layout from the
2395          * parent.
2396          */
2397         if (spec->no_create ||
2398             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2399             S_ISDIR(attr->la_mode)) {
2400                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2401                                         spec->u.sp_ea.eadatalen);
2402                 rc = mdo_xattr_set(env, son, buf,
2403                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2404                                                             XATTR_NAME_LOV,
2405                                    LU_XATTR_CREATE, handle);
2406                 if (rc != 0)
2407                         GOTO(err_destroy, rc);
2408         }
2409
2410         if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
2411                 struct md_hsm mh;
2412
2413                 memset(&mh, 0, sizeof(mh));
2414                 mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
2415                 mh.mh_arch_id = spec->sp_archive_id;
2416                 lustre_hsm2buf(hsm_buf->lb_buf, &mh);
2417                 rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
2418                                    0, handle);
2419                 if (rc != 0)
2420                         GOTO(err_destroy, rc);
2421         }
2422
2423 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2424         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2425             S_ISDIR(attr->la_mode)) {
2426                 /* set default acl */
2427                 rc = mdo_xattr_set(env, son, def_acl_buf,
2428                                    XATTR_NAME_ACL_DEFAULT, 0,
2429                                    handle);
2430                 if (rc)
2431                         GOTO(err_destroy, rc);
2432         }
2433         /* set its own acl */
2434         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2435                 rc = mdo_xattr_set(env, son, acl_buf,
2436                                    XATTR_NAME_ACL_ACCESS,
2437                                    0, handle);
2438                 if (rc)
2439                         GOTO(err_destroy, rc);
2440         }
2441 #endif
2442
2443         if (S_ISLNK(attr->la_mode)) {
2444                 struct dt_object *dt = mdd_object_child(son);
2445                 const char *target_name = spec->u.sp_symname;
2446                 int sym_len = strlen(target_name);
2447                 loff_t pos = 0;
2448
2449                 buf = mdd_buf_get_const(env, target_name, sym_len);
2450                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
2451                 if (rc == sym_len)
2452                         rc = 0;
2453                 else
2454                         GOTO(err_initlized, rc = -EFAULT);
2455         }
2456
2457         if (initsecctx && spec->sp_cr_file_secctx_name != NULL) {
2458                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2459                                         spec->sp_cr_file_secctx_size);
2460                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2461                                    0, handle);
2462                 if (rc < 0)
2463                         GOTO(err_initlized, rc);
2464         }
2465
2466         if (spec->sp_cr_file_encctx != NULL) {
2467                 buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
2468                                         spec->sp_cr_file_encctx_size);
2469                 rc = mdo_xattr_set(env, son, buf,
2470                                    LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
2471                                    handle);
2472                 if (rc < 0)
2473                         GOTO(err_initlized, rc);
2474         }
2475
2476 err_initlized:
2477         if (unlikely(rc != 0)) {
2478                 int rc2;
2479                 if (S_ISDIR(attr->la_mode)) {
2480                         /* Drop the reference, no need to delete "."/"..",
2481                          * because the object to be destroied directly. */
2482                         rc2 = mdo_ref_del(env, son, handle);
2483                         if (rc2 != 0)
2484                                 GOTO(unlock, rc);
2485                 }
2486                 rc2 = mdo_ref_del(env, son, handle);
2487                 if (rc2 != 0)
2488                         GOTO(unlock, rc);
2489 err_destroy:
2490                 mdo_destroy(env, son, handle);
2491         }
2492 unlock:
2493         mdd_write_unlock(env, son);
2494         RETURN(rc);
2495 }
2496
2497 static int mdd_index_delete(const struct lu_env *env,
2498                             struct mdd_object *mdd_pobj,
2499                             struct lu_attr *cattr,
2500                             const struct lu_name *lname)
2501 {
2502         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2503         struct thandle *handle;
2504         int rc;
2505         ENTRY;
2506
2507         handle = mdd_trans_create(env, mdd);
2508         if (IS_ERR(handle))
2509                 RETURN(PTR_ERR(handle));
2510
2511         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2512                                       handle);
2513         if (rc != 0)
2514                 GOTO(stop, rc);
2515
2516         if (S_ISDIR(cattr->la_mode)) {
2517                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2518                 if (rc != 0)
2519                         GOTO(stop, rc);
2520         }
2521
2522         /* Since this will only be used in the error handler path,
2523          * Let's set the thandle to be local and not mess the transno */
2524         handle->th_local = 1;
2525         rc = mdd_trans_start(env, mdd, handle);
2526         if (rc)
2527                 GOTO(stop, rc);
2528
2529         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2530                                 S_ISDIR(cattr->la_mode), handle);
2531         if (rc)
2532                 GOTO(stop, rc);
2533 stop:
2534         rc = mdd_trans_stop(env, mdd, rc, handle);
2535
2536         RETURN(rc);
2537 }
2538
2539 /**
2540  * Create object and insert it into namespace.
2541  *
2542  * Two operations have to be performed:
2543  *
2544  *  - an allocation of a new object (->do_create()), and
2545  *  - an insertion into a parent index (->dio_insert()).
2546  *
2547  * Due to locking, operation order is not important, when both are
2548  * successful, *but* error handling cases are quite different:
2549  *
2550  *  - if insertion is done first, and following object creation fails,
2551  *  insertion has to be rolled back, but this operation might fail
2552  *  also leaving us with dangling index entry.
2553  *
2554  *  - if creation is done first, is has to be undone if insertion fails,
2555  *  leaving us with leaked space, which is not good but not fatal.
2556  *
2557  * It seems that creation-first is simplest solution, but it is sub-optimal
2558  * in the frequent
2559  *
2560  * $ mkdir foo
2561  * $ mkdir foo
2562  *
2563  * case, because second mkdir is bound to create object, only to
2564  * destroy it immediately.
2565  *
2566  * To avoid this follow local file systems that do double lookup:
2567  *
2568  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2569  * 1. create            (mdd_create_object_internal())
2570  * 2. insert            (__mdd_index_insert(), lookup again)
2571  *
2572  * \param[in] pobj      parent object
2573  * \param[in] lname     name of child being created
2574  * \param[in,out] child child object being created
2575  * \param[in] spec      additional create parameters
2576  * \param[in] ma        attributes for new child object
2577  *
2578  * \retval              0 on success
2579  * \retval              negative errno on failure
2580  */
2581 int mdd_create(const struct lu_env *env, struct md_object *pobj,
2582                       const struct lu_name *lname, struct md_object *child,
2583                       struct md_op_spec *spec, struct md_attr *ma)
2584 {
2585         struct mdd_thread_info *info = mdd_env_info(env);
2586         struct lu_attr *la = &info->mdi_la_for_fix;
2587         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2588         struct mdd_object *son = md2mdd_obj(child);
2589         struct mdd_device *mdd = mdo2mdd(pobj);
2590         struct lu_attr *attr = &ma->ma_attr;
2591         struct thandle *handle;
2592         struct lu_attr *pattr = &info->mdi_pattr;
2593         struct lu_buf acl_buf;
2594         struct lu_buf def_acl_buf;
2595         struct lu_buf hsm_buf;
2596         struct linkea_data *ldata = &info->mdi_link_data;
2597         const char *name = lname->ln_name;
2598         struct dt_allocation_hint *hint = &mdd_env_info(env)->mdi_hint;
2599         int acl_size = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
2600         int rc, rc2;
2601
2602         ENTRY;
2603
2604         rc = mdd_la_get(env, mdd_pobj, pattr);
2605         if (rc != 0)
2606                 RETURN(rc);
2607
2608         /* Sanity checks before big job. */
2609         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2610         if (rc)
2611                 RETURN(rc);
2612
2613         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2614                 GOTO(out_free, rc = -EINPROGRESS);
2615
2616         handle = mdd_trans_create(env, mdd);
2617         if (IS_ERR(handle))
2618                 GOTO(out_free, rc = PTR_ERR(handle));
2619
2620 use_bigger_buffer:
2621         acl_buf = *lu_buf_check_and_alloc(&info->mdi_xattr_buf, acl_size);
2622         if (!acl_buf.lb_buf)
2623                 GOTO(out_stop, rc = -ENOMEM);
2624
2625         def_acl_buf = *lu_buf_check_and_alloc(&info->mdi_big_buf, acl_size);
2626         if (!def_acl_buf.lb_buf)
2627                 GOTO(out_stop, rc = -ENOMEM);
2628
2629         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2630         if (unlikely(rc == -ERANGE &&
2631                      acl_size == LUSTRE_POSIX_ACL_MAX_SIZE_OLD)) {
2632                 /* use maximum-sized xattr buffer for too-big default ACL */
2633                 acl_size = min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
2634                                  XATTR_SIZE_MAX);
2635                 goto use_bigger_buffer;
2636         }
2637         if (rc < 0)
2638                 GOTO(out_stop, rc);
2639
2640         if (S_ISDIR(attr->la_mode)) {
2641                 struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
2642
2643                 /*
2644                  * migrate may create 1-stripe directory, so lod_ah_init()
2645                  * doesn't adjust stripe count from lmu.
2646                  */
2647                 if (lmu && lmu->lum_stripe_count == cpu_to_le32(1))
2648                         lmu->lum_stripe_count = 0;
2649         }
2650
2651         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2652
2653         memset(ldata, 0, sizeof(*ldata));
2654         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2655                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2656
2657                 tfid.f_oid--;
2658                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2659                                         &tfid, lname, 1, 0, ldata);
2660         } else {
2661                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2662                                         mdd_object_fid(mdd_pobj),
2663                                         lname, 1, 0, ldata);
2664         }
2665
2666         if (spec->sp_cr_flags & MDS_OPEN_PCC) {
2667                 LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
2668
2669                 memset(&hsm_buf, 0, sizeof(hsm_buf));
2670                 lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
2671                 if (hsm_buf.lb_buf == NULL)
2672                         GOTO(out_stop, rc = -ENOMEM);
2673         }
2674
2675         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2676                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2677                                 &hsm_buf, hint);
2678         if (rc)
2679                 GOTO(out_stop, rc);
2680
2681         rc = mdd_trans_start(env, mdd, handle);
2682         if (rc)
2683                 GOTO(out_stop, rc);
2684
2685         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2686                                &def_acl_buf, &hsm_buf, hint, handle, true);
2687         if (rc != 0)
2688                 GOTO(out_stop, rc);
2689
2690         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2691                 mdd_write_lock(env, son, DT_TGT_CHILD);
2692                 son->mod_flags |= VOLATILE_OBJ;
2693                 rc = mdd_orphan_insert(env, son, handle);
2694                 GOTO(out_volatile, rc);
2695         } else {
2696                 rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(son),
2697                                         attr->la_mode, name, handle);
2698                 if (rc != 0)
2699                         GOTO(err_created, rc);
2700
2701                 mdd_links_add(env, son, mdd_object_fid(mdd_pobj), lname,
2702                               handle, ldata, 1);
2703
2704                 /* update parent directory mtime/ctime */
2705                 *la = *attr;
2706                 la->la_valid = LA_CTIME | LA_MTIME;
2707                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2708                 if (rc)
2709                         GOTO(err_insert, rc);
2710         }
2711
2712         EXIT;
2713 err_insert:
2714         if (rc != 0) {
2715                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2716                         rc2 = mdd_orphan_delete(env, son, handle);
2717                 else
2718                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2719                                                  S_ISDIR(attr->la_mode),
2720                                                  handle);
2721                 if (rc2 != 0)
2722                         goto out_stop;
2723
2724 err_created:
2725                 mdd_write_lock(env, son, DT_TGT_CHILD);
2726                 if (S_ISDIR(attr->la_mode)) {
2727                         /* Drop the reference, no need to delete "."/"..",
2728                          * because the object is to be destroyed directly. */
2729                         rc2 = mdo_ref_del(env, son, handle);
2730                         if (rc2 != 0) {
2731                                 mdd_write_unlock(env, son);
2732                                 goto out_stop;
2733                         }
2734                 }
2735 out_volatile:
2736                 /* For volatile files drop one link immediately, since there is
2737                  * no filename in the namespace, and save any error returned. */
2738                 rc2 = mdo_ref_del(env, son, handle);
2739                 if (rc2 != 0) {
2740                         mdd_write_unlock(env, son);
2741                         if (unlikely(rc == 0))
2742                                 rc = rc2;
2743                         goto out_stop;
2744                 }
2745
2746                 /* Don't destroy the volatile object on success */
2747                 if (likely(rc != 0))
2748                         mdo_destroy(env, son, handle);
2749                 mdd_write_unlock(env, son);
2750         }
2751
2752         if (rc == 0 && fid_is_namespace_visible(mdd_object_fid(son)) &&
2753             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2754                 rc = mdd_changelog_ns_store(env, mdd,
2755                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2756                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2757                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2758                                 0, son, mdd_object_fid(mdd_pobj), NULL, NULL,
2759                                 lname, NULL, handle);
2760 out_stop:
2761         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2762         if (rc == 0) {
2763                 /* If creation fails, it is most likely due to the remote update
2764                  * failure, because local transaction will mostly succeed at
2765                  * this stage. There is no easy way to rollback all of previous
2766                  * updates, so let's remove the object from namespace, and
2767                  * LFSCK should handle the orphan object. */
2768                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2769                         mdd_index_delete(env, mdd_pobj, attr, lname);
2770                 rc = rc2;
2771         }
2772 out_free:
2773         if (is_vmalloc_addr(ldata->ld_buf))
2774                 /* if we vmalloced a large buffer drop it */
2775                 lu_buf_free(ldata->ld_buf);
2776
2777         if (spec->sp_cr_flags & MDS_OPEN_PCC)
2778                 lu_buf_free(&hsm_buf);
2779
2780         /* The child object shouldn't be cached anymore */
2781         if (rc)
2782                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2783                         &child->mo_lu.lo_header->loh_flags);
2784         return rc;
2785 }
2786
2787 /* has not mdd_write{read}_lock on any obj yet. */
2788 static int mdd_rename_sanity_check(const struct lu_env *env,
2789                                    struct mdd_object *src_pobj,
2790                                    const struct lu_attr *pattr,
2791                                    struct mdd_object *tgt_pobj,
2792                                    const struct lu_attr *tpattr,
2793                                    struct mdd_object *sobj,
2794                                    const struct lu_attr *cattr,
2795                                    struct mdd_object *tobj,
2796                                    const struct lu_attr *tattr)
2797 {
2798         int rc = 0;
2799         ENTRY;
2800
2801         /* XXX: when get here, sobj must NOT be NULL,
2802          * the other case has been processed in cld_rename
2803          * before mdd_rename and enable MDS_PERM_BYPASS. */
2804         LASSERT(sobj);
2805
2806         /*
2807          * If we are using project inheritance, we only allow renames
2808          * into our tree when the project IDs are the same; otherwise
2809          * tree quota mechanism would be circumvented.
2810          */
2811         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2812             tpattr->la_projid != cattr->la_projid) ||
2813             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2814             (pattr->la_projid != tpattr->la_projid)))
2815                 RETURN(-EXDEV);
2816
2817         /* we prevent an encrypted file from being renamed
2818          * into an unencrypted dir
2819          */
2820         if ((cattr->la_valid & LA_FLAGS &&
2821              cattr->la_flags & LUSTRE_ENCRYPT_FL) &&
2822             !(tpattr->la_valid & LA_FLAGS &&
2823               tpattr->la_flags & LUSTRE_ENCRYPT_FL))
2824                 RETURN(-EXDEV);
2825
2826         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2827         if (rc)
2828                 RETURN(rc);
2829
2830         /* XXX: when get here, "tobj == NULL" means tobj must
2831          * NOT exist (neither on remote MDS, such case has been
2832          * processed in cld_rename before mdd_rename and enable
2833          * MDS_PERM_BYPASS).
2834          * So check may_create, but not check may_unlink. */
2835         if (tobj == NULL)
2836                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2837                                     (src_pobj != tgt_pobj));
2838         else
2839                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2840                                     (src_pobj != tgt_pobj), 1);
2841
2842         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2843                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2844
2845         RETURN(rc);
2846 }
2847
2848 static int mdd_declare_rename(const struct lu_env *env,
2849                               struct mdd_device *mdd,
2850                               struct mdd_object *mdd_spobj,
2851                               struct mdd_object *mdd_tpobj,
2852                               struct mdd_object *mdd_sobj,
2853                               struct mdd_object *mdd_tobj,
2854                               const struct lu_name *sname,
2855                               const struct lu_name *tname,
2856                               struct md_attr *ma,
2857                               struct linkea_data *ldata,
2858                               struct thandle *handle)
2859 {
2860         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2861         int rc;
2862
2863         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2864         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2865
2866         LASSERT(mdd_spobj);
2867         LASSERT(mdd_tpobj);
2868         LASSERT(mdd_sobj);
2869
2870         /* name from source dir */
2871         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2872         if (rc)
2873                 return rc;
2874
2875         /* .. from source child */
2876         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2877                 /* source child can be directory, count by source dir's nlink */
2878                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2879                 if (rc)
2880                         return rc;
2881                 if (mdd_spobj != mdd_tpobj) {
2882                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2883                                                       handle);
2884                         if (rc != 0)
2885                                 return rc;
2886
2887                         rc = mdo_declare_index_insert(env, mdd_sobj,
2888                                                       mdd_object_fid(mdd_tpobj),
2889                                                       S_IFDIR, dotdot, handle);
2890                         if (rc != 0)
2891                                 return rc;
2892                 }
2893
2894                 /* new target child can be directory,
2895                  * counted by target dir's nlink */
2896                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2897                 if (rc != 0)
2898                         return rc;
2899         }
2900
2901         la->la_valid = LA_CTIME | LA_MTIME;
2902         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2903         if (rc != 0)
2904                 return rc;
2905
2906         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2907         if (rc != 0)
2908                 return rc;
2909
2910         la->la_valid = LA_CTIME;
2911         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2912         if (rc)
2913                 return rc;
2914
2915         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2916         if (rc)
2917                 return rc;
2918
2919         /* new name */
2920         rc = mdo_declare_index_insert(env, mdd_tpobj, mdd_object_fid(mdd_sobj),
2921                                       mdd_object_type(mdd_sobj),
2922                                       tname->ln_name, handle);
2923         if (rc != 0)
2924                 return rc;
2925
2926         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2927                 /* delete target child in target parent directory */
2928                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2929                                               handle);
2930                 if (rc)
2931                         return rc;
2932
2933                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2934                 if (rc)
2935                         return rc;
2936
2937                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2938                         /* target child can be directory,
2939                          * delete "." reference in target child directory */
2940                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2941                         if (rc)
2942                                 return rc;
2943
2944                         /* delete ".." reference in target parent directory */
2945                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2946                         if (rc)
2947                                 return rc;
2948                 }
2949
2950                 la->la_valid = LA_CTIME;
2951                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2952                 if (rc)
2953                         return rc;
2954
2955                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2956                 if (rc)
2957                         return rc;
2958         }
2959
2960         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
2961                                          handle);
2962         if (rc)
2963                 return rc;
2964
2965         return rc;
2966 }
2967
2968 static int mdd_migrate_object(const struct lu_env *env,
2969                               struct mdd_object *spobj,
2970                               struct mdd_object *tpobj,
2971                               struct mdd_object *sobj,
2972                               struct mdd_object *tobj,
2973                               const struct lu_name *sname,
2974                               const struct lu_name *tname,
2975                               struct md_op_spec *spec,
2976                               struct md_attr *ma);
2977
2978 /* src object can be remote that is why we use only fid and type of object */
2979 static int mdd_rename(const struct lu_env *env,
2980                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2981                       const struct lu_fid *lf, const struct lu_name *lsname,
2982                       struct md_object *tobj, const struct lu_name *ltname,
2983                       struct md_attr *ma)
2984 {
2985         const char *sname = lsname->ln_name;
2986         const char *tname = ltname->ln_name;
2987         struct lu_attr    *la = &mdd_env_info(env)->mdi_la_for_fix;
2988         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2989         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2990         struct mdd_device *mdd = mdo2mdd(src_pobj);
2991         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2992         struct mdd_object *mdd_tobj = NULL;
2993         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2994         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2995         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2996         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2997         struct thandle *handle;
2998         struct linkea_data  *ldata = &mdd_env_info(env)->mdi_link_data;
2999         const struct lu_fid *tpobj_fid = mdd_object_fid(mdd_tpobj);
3000         const struct lu_fid *spobj_fid = mdd_object_fid(mdd_spobj);
3001         bool is_dir;
3002         bool tobj_ref = 0;
3003         bool tobj_locked = 0;
3004         unsigned cl_flags = 0;
3005         int rc, rc2;
3006         ENTRY;
3007
3008         /* let unlink to complete and commit */
3009         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
3010
3011         if (tobj)
3012                 mdd_tobj = md2mdd_obj(tobj);
3013
3014         mdd_sobj = mdd_object_find(env, mdd, lf);
3015         if (IS_ERR(mdd_sobj))
3016                 RETURN(PTR_ERR(mdd_sobj));
3017
3018         rc = mdd_la_get(env, mdd_sobj, cattr);
3019         if (rc)
3020                 GOTO(out_pending, rc);
3021
3022         /* if rename is cross MDTs, migrate symlink if it doesn't have other
3023          * hard links, and target doesn't exist.
3024          */
3025         if (mdd_object_remote(mdd_sobj) && S_ISLNK(cattr->la_mode) &&
3026             cattr->la_nlink == 1 && !tobj) {
3027                 struct md_op_spec *spec = &mdd_env_info(env)->mdi_spec;
3028                 struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
3029                 struct lu_fid tfid;
3030
3031                 rc = ld->ld_ops->ldo_fid_alloc(env, ld, &tfid, &tgt_pobj->mo_lu,
3032                                                NULL);
3033                 if (rc < 0)
3034                         GOTO(out_pending, rc);
3035
3036                 mdd_tobj = mdd_object_find(env, mdd, &tfid);
3037                 if (IS_ERR(mdd_tobj))
3038                         GOTO(out_pending, rc = PTR_ERR(mdd_tobj));
3039
3040                 memset(spec, 0, sizeof(*spec));
3041                 rc = mdd_migrate_object(env, mdd_spobj, mdd_tpobj, mdd_sobj,
3042                                         mdd_tobj, lsname, ltname, spec, ma);
3043                 mdd_object_put(env, mdd_tobj);
3044                 GOTO(out_pending, rc);
3045         }
3046
3047         rc = mdd_la_get(env, mdd_spobj, pattr);
3048         if (rc)
3049                 GOTO(out_pending, rc);
3050
3051         if (mdd_tobj) {
3052                 rc = mdd_la_get(env, mdd_tobj, tattr);
3053                 if (rc)
3054                         GOTO(out_pending, rc);
3055                 /* search for an existing archive.
3056                  * we should check ahead as the object
3057                  * can be destroyed in this transaction */
3058                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
3059                         cl_flags |= CLF_RENAME_LAST_EXISTS;
3060         }
3061
3062         rc = mdd_la_get(env, mdd_tpobj, tpattr);
3063         if (rc)
3064                 GOTO(out_pending, rc);
3065
3066         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
3067                                      mdd_sobj, cattr, mdd_tobj, tattr);
3068         if (rc)
3069                 GOTO(out_pending, rc);
3070
3071         rc = mdd_name_check(mdd, ltname);
3072         if (rc < 0)
3073                 GOTO(out_pending, rc);
3074
3075         handle = mdd_trans_create(env, mdd);
3076         if (IS_ERR(handle))
3077                 GOTO(out_pending, rc = PTR_ERR(handle));
3078
3079         memset(ldata, 0, sizeof(*ldata));
3080         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3081                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
3082                                 1, 0, ldata);
3083         if (rc)
3084                 GOTO(stop, rc);
3085
3086         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
3087                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
3088         if (rc)
3089                 GOTO(stop, rc);
3090
3091         rc = mdd_trans_start(env, mdd, handle);
3092         if (rc)
3093                 GOTO(stop, rc);
3094
3095         is_dir = S_ISDIR(cattr->la_mode);
3096
3097         /* Remove source name from source directory */
3098         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
3099         if (rc != 0)
3100                 GOTO(stop, rc);
3101
3102         /* "mv dir1 dir2" needs "dir1/.." link update */
3103         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
3104                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3105                 if (rc != 0)
3106                         GOTO(fixup_spobj2, rc);
3107
3108                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
3109                                              dotdot, handle);
3110                 if (rc != 0)
3111                         GOTO(fixup_spobj, rc);
3112         }
3113
3114         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
3115                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3116                 if (rc != 0)
3117                         /* tname might been renamed to something else */
3118                         GOTO(fixup_spobj, rc);
3119         }
3120
3121         /* Insert new fid with target name into target dir */
3122         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
3123                                 tname, handle);
3124         if (rc != 0)
3125                 GOTO(fixup_tpobj, rc);
3126
3127         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3128         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3129
3130         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3131         la->la_valid = LA_CTIME;
3132         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
3133         if (rc)
3134                 GOTO(fixup_tpobj, rc);
3135
3136         /* Update the linkEA for the source object */
3137         mdd_write_lock(env, mdd_sobj, DT_SRC_CHILD);
3138         rc = mdd_links_rename(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3139                               lsname, mdd_object_fid(mdd_tpobj), ltname,
3140                               handle, ldata, 0, 0);
3141         if (rc == -ENOENT)
3142                 /* Old files might not have EA entry */
3143                 mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3144                               lsname, handle, NULL, 0);
3145         mdd_write_unlock(env, mdd_sobj);
3146         /* We don't fail the transaction if the link ea can't be
3147            updated -- fid2path will use alternate lookup method. */
3148         rc = 0;
3149
3150         /* Remove old target object
3151          * For tobj is remote case cmm layer has processed
3152          * and set tobj to NULL then. So when tobj is NOT NULL,
3153          * it must be local one.
3154          */
3155         if (tobj && mdd_object_exists(mdd_tobj)) {
3156                 mdd_write_lock(env, mdd_tobj, DT_TGT_CHILD);
3157                 tobj_locked = 1;
3158                 if (mdd_is_dead_obj(mdd_tobj)) {
3159                         /* shld not be dead, something is wrong */
3160                         CERROR("tobj is dead, something is wrong\n");
3161                         rc = -EINVAL;
3162                         goto cleanup;
3163                 }
3164                 mdo_ref_del(env, mdd_tobj, handle);
3165
3166                 /* Remove dot reference. */
3167                 if (S_ISDIR(tattr->la_mode))
3168                         mdo_ref_del(env, mdd_tobj, handle);
3169                 tobj_ref = 1;
3170
3171                 /* fetch updated nlink */
3172                 rc = mdd_la_get(env, mdd_tobj, tattr);
3173                 if (rc != 0) {
3174                         CERROR("%s: Failed to get nlink for tobj "
3175                                 DFID": rc = %d\n",
3176                                 mdd2obd_dev(mdd)->obd_name,
3177                                 PFID(tpobj_fid), rc);
3178                         GOTO(fixup_tpobj, rc);
3179                 }
3180
3181                 la->la_valid = LA_CTIME;
3182                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3183                 if (rc != 0) {
3184                         CERROR("%s: Failed to set ctime for tobj "
3185                                 DFID": rc = %d\n",
3186                                 mdd2obd_dev(mdd)->obd_name,
3187                                 PFID(tpobj_fid), rc);
3188                         GOTO(fixup_tpobj, rc);
3189                 }
3190
3191                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3192                 ma->ma_attr = *tattr;
3193                 ma->ma_valid |= MA_INODE;
3194                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3195                                        handle);
3196                 if (rc != 0) {
3197                         CERROR("%s: Failed to unlink tobj "
3198                                 DFID": rc = %d\n",
3199                                 mdd2obd_dev(mdd)->obd_name,
3200                                 PFID(tpobj_fid), rc);
3201                         GOTO(fixup_tpobj, rc);
3202                 }
3203
3204                 /* fetch updated nlink */
3205                 rc = mdd_la_get(env, mdd_tobj, tattr);
3206                 if (rc == -ENOENT) {
3207                         /* the object got removed, let's
3208                          * return the latest known attributes */
3209                         tattr->la_nlink = 0;
3210                         rc = 0;
3211                 } else if (rc != 0) {
3212                         CERROR("%s: Failed to get nlink for tobj "
3213                                 DFID": rc = %d\n",
3214                                 mdd2obd_dev(mdd)->obd_name,
3215                                 PFID(tpobj_fid), rc);
3216                         GOTO(fixup_tpobj, rc);
3217                 }
3218                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3219                 ma->ma_attr = *tattr;
3220                 ma->ma_valid |= MA_INODE;
3221
3222                 if (tattr->la_nlink == 0)
3223                         cl_flags |= CLF_RENAME_LAST;
3224                 else
3225                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3226         }
3227
3228         la->la_valid = LA_CTIME | LA_MTIME;
3229         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3230         if (rc)
3231                 GOTO(fixup_tpobj, rc);
3232
3233         if (mdd_spobj != mdd_tpobj) {
3234                 la->la_valid = LA_CTIME | LA_MTIME;
3235                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3236                 if (rc != 0)
3237                         GOTO(fixup_tpobj, rc);
3238         }
3239
3240         EXIT;
3241
3242 fixup_tpobj:
3243         if (rc) {
3244                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3245                 if (rc2)
3246                         CWARN("tp obj fix error %d\n",rc2);
3247
3248                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3249                     !mdd_is_dead_obj(mdd_tobj)) {
3250                         if (tobj_ref) {
3251                                 mdo_ref_add(env, mdd_tobj, handle);
3252                                 if (is_dir)
3253                                         mdo_ref_add(env, mdd_tobj, handle);
3254                         }
3255
3256                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3257                                                  mdd_object_fid(mdd_tobj),
3258                                                  mdd_object_type(mdd_tobj),
3259                                                  tname, handle);
3260                         if (rc2 != 0)
3261                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3262                 }
3263         }
3264
3265 fixup_spobj:
3266         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3267                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3268                 if (rc2)
3269                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3270                                mdd2obd_dev(mdd)->obd_name, rc2);
3271
3272
3273                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3274                                               dotdot, handle);
3275                 if (rc2 != 0)
3276                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3277                               mdd2obd_dev(mdd)->obd_name, rc2);
3278         }
3279
3280 fixup_spobj2:
3281         if (rc != 0) {
3282                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3283                                          mdd_object_type(mdd_sobj), sname,
3284                                          handle);
3285                 if (rc2 != 0)
3286                         CWARN("sp obj fix error: rc = %d\n", rc2);
3287         }
3288
3289 cleanup:
3290         if (tobj_locked)
3291                 mdd_write_unlock(env, mdd_tobj);
3292
3293         if (rc == 0)
3294                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3295                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3296                                             ltname, lsname, handle);
3297
3298 stop:
3299         rc = mdd_trans_stop(env, mdd, rc, handle);
3300
3301 out_pending:
3302         mdd_object_put(env, mdd_sobj);
3303         return rc;
3304 }
3305
3306 /**
3307  * Check whether we should migrate the file/dir
3308  * return val
3309  *      < 0  permission check failed or other error.
3310  *      = 0  the file can be migrated.
3311  **/
3312 static int mdd_migrate_sanity_check(const struct lu_env *env,
3313                                     struct mdd_device *mdd,
3314                                     struct mdd_object *spobj,
3315                                     struct mdd_object *tpobj,
3316                                     struct mdd_object *sobj,
3317                                     struct mdd_object *tobj,
3318                                     const struct lu_attr *spattr,
3319                                     const struct lu_attr *tpattr,
3320                                     const struct lu_attr *attr)
3321 {
3322         int rc;
3323
3324         ENTRY;
3325
3326         if (!mdd_object_remote(sobj)) {
3327                 mdd_read_lock(env, sobj, DT_SRC_CHILD);
3328                 if (sobj->mod_count > 0) {
3329                         CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
3330                                mdd_obj_dev_name(sobj),
3331                                PFID(mdd_object_fid(sobj)),
3332                                sobj->mod_count);
3333                         mdd_read_unlock(env, sobj);
3334                         RETURN(-EBUSY);
3335                 }
3336                 mdd_read_unlock(env, sobj);
3337         }
3338
3339         if (mdd_object_exists(tobj))
3340                 RETURN(-EEXIST);
3341
3342         rc = mdd_may_delete(env, spobj, spattr, sobj, attr, NULL, 1, 0);
3343         if (rc)
3344                 RETURN(rc);
3345
3346         rc = mdd_may_create(env, tpobj, tpattr, NULL, true);
3347
3348         RETURN(rc);
3349 }
3350
3351 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
3352                             struct mdd_object *obj,
3353                             const struct lu_buf *buf,
3354                             const char *name,
3355                             int fl, struct thandle *handle);
3356
3357 /* iterate xattrs, but ignore LMA, LMV, and LINKEA if 'skip_linkea' is set. */
3358 static int mdd_iterate_xattrs(const struct lu_env *env,
3359                               struct mdd_object *sobj,
3360                               struct mdd_object *tobj,
3361                               bool skip_linkea,
3362                               struct thandle *handle,
3363                               mdd_xattr_cb cb)
3364 {
3365         struct mdd_thread_info *info = mdd_env_info(env);
3366         char *xname;
3367         struct lu_buf list_xbuf;
3368         struct lu_buf cbxbuf;
3369         struct lu_buf xbuf = { NULL };
3370         int list_xsize;
3371         int xlen;
3372         int rem;
3373         int xsize;
3374         int rc;
3375
3376         ENTRY;
3377
3378         /* retrieve xattr list from the old object */
3379         list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
3380         if (list_xsize == -ENODATA)
3381                 RETURN(0);
3382
3383         if (list_xsize < 0)
3384                 RETURN(list_xsize);
3385
3386         lu_buf_check_and_alloc(&info->mdi_big_buf, list_xsize);
3387         if (info->mdi_big_buf.lb_buf == NULL)
3388                 RETURN(-ENOMEM);
3389
3390         list_xbuf.lb_buf = info->mdi_big_buf.lb_buf;
3391         list_xbuf.lb_len = list_xsize;
3392         rc = mdo_xattr_list(env, sobj, &list_xbuf);
3393         if (rc < 0)
3394                 RETURN(rc);
3395
3396         rem = rc;
3397         rc = 0;
3398         xname = list_xbuf.lb_buf;
3399         while (rem > 0) {
3400                 xlen = strnlen(xname, rem - 1) + 1;
3401                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3402                     strcmp(XATTR_NAME_LMV, xname) == 0)
3403                         goto next;
3404
3405                 if (skip_linkea &&
3406                     strcmp(XATTR_NAME_LINK, xname) == 0)
3407                         goto next;
3408
3409                 xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
3410                 if (xsize == -ENODATA)
3411                         goto next;
3412                 if (xsize < 0)
3413                         GOTO(out, rc = xsize);
3414
3415                 lu_buf_check_and_alloc(&xbuf, xsize);
3416                 if (xbuf.lb_buf == NULL)
3417                         GOTO(out, rc = -ENOMEM);
3418
3419                 rc = mdo_xattr_get(env, sobj, &xbuf, xname);
3420                 if (rc == -ENODATA)
3421                         goto next;
3422                 if (rc < 0)
3423                         GOTO(out, rc);
3424
3425                 cbxbuf = xbuf;
3426                 cbxbuf.lb_len = xsize;
3427 repeat:
3428                 rc = cb(env, tobj, &cbxbuf, xname, 0, handle);
3429                 if (unlikely(rc == -ENOSPC &&
3430                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3431                         rc = linkea_overflow_shrink(
3432                                         (struct linkea_data *)(cbxbuf.lb_buf));
3433                         if (likely(rc > 0)) {
3434                                 cbxbuf.lb_len = rc;
3435                                 goto repeat;
3436                         }
3437                 }
3438
3439                 if (rc)
3440                         GOTO(out, rc);
3441 next:
3442                 xname += xlen;
3443                 rem -= xlen;
3444         }
3445
3446 out:
3447         lu_buf_free(&xbuf);
3448         RETURN(rc);
3449 }
3450
3451 typedef int (*mdd_linkea_cb)(const struct lu_env *env,
3452                              struct mdd_object *sobj,
3453                              struct mdd_object *tobj,
3454                              const struct lu_name *sname,
3455                              const struct lu_fid *sfid,
3456                              const struct lu_name *lname,
3457                              const struct lu_fid *fid,
3458                              void *opaque,
3459                              struct thandle *handle);
3460
3461 static int mdd_declare_update_link(const struct lu_env *env,
3462                                    struct mdd_object *sobj,
3463                                    struct mdd_object *tobj,
3464                                    const struct lu_name *tname,
3465                                    const struct lu_fid *tpfid,
3466                                    const struct lu_name *lname,
3467                                    const struct lu_fid *fid,
3468                                    void *unused,
3469                                    struct thandle *handle)
3470 {
3471         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3472         struct mdd_object *pobj;
3473         int rc;
3474
3475         /* ignore tobj */
3476         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3477             !strcmp(tname->ln_name, lname->ln_name))
3478                 return 0;
3479
3480         pobj = mdd_object_find(env, mdd, fid);
3481         if (IS_ERR(pobj))
3482                 return PTR_ERR(pobj);
3483
3484
3485         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
3486         if (!rc)
3487                 rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
3488                                               mdd_object_type(sobj),
3489                                               lname->ln_name, handle);
3490         mdd_object_put(env, pobj);
3491         if (rc)
3492                 return rc;
3493
3494         rc = mdo_declare_ref_add(env, tobj, handle);
3495         if (rc)
3496                 return rc;
3497
3498         rc = mdo_declare_ref_del(env, sobj, handle);
3499         return rc;
3500 }
3501
3502 static int mdd_update_link(const struct lu_env *env,
3503                            struct mdd_object *sobj,
3504                            struct mdd_object *tobj,
3505                            const struct lu_name *tname,
3506                            const struct lu_fid *tpfid,
3507                            const struct lu_name *lname,
3508                            const struct lu_fid *fid,
3509                            void *unused,
3510                            struct thandle *handle)
3511 {
3512         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3513         struct mdd_object *pobj;
3514         int rc;
3515
3516         ENTRY;
3517
3518         LASSERT(lu_name_is_valid(lname));
3519
3520         /* ignore tobj */
3521         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3522             !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
3523                 RETURN(0);
3524
3525         CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
3526                PFID(fid), PNAME(lname), PFID(mdd_object_fid(tobj)));
3527
3528         pobj = mdd_object_find(env, mdd, fid);
3529         if (IS_ERR(pobj)) {
3530                 CWARN("%s: cannot find obj "DFID": %ld\n",
3531                       mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
3532                 RETURN(PTR_ERR(pobj));
3533         }
3534
3535         if (!mdd_object_exists(pobj)) {
3536                 CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
3537                 mdd_object_put(env, pobj);
3538                 RETURN(-ENOENT);
3539         }
3540
3541         mdd_write_lock(env, pobj, DT_TGT_PARENT);
3542         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
3543         if (!rc)
3544                 rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(tobj),
3545                                              mdd_object_type(sobj),
3546                                              lname->ln_name, handle);
3547         mdd_write_unlock(env, pobj);
3548         mdd_object_put(env, pobj);
3549         if (rc)
3550                 RETURN(rc);
3551
3552         mdd_write_lock(env, tobj, DT_TGT_CHILD);
3553         rc = mdo_ref_add(env, tobj, handle);
3554         mdd_write_unlock(env, tobj);
3555         if (rc)
3556                 RETURN(rc);
3557
3558         mdd_write_lock(env, sobj, DT_SRC_CHILD);
3559         rc = mdo_ref_del(env, sobj, handle);
3560         mdd_write_unlock(env, sobj);
3561
3562         RETURN(rc);
3563 }
3564
3565 static inline int mdd_fld_lookup(const struct lu_env *env,
3566                                  struct mdd_device *mdd,
3567                                  const struct lu_fid *fid,
3568                                  __u32 *mdt_index)
3569 {
3570         struct lu_seq_range *range = &mdd_env_info(env)->mdi_range;
3571         struct seq_server_site *ss;
3572         int rc;
3573
3574         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
3575
3576         range->lsr_flags = LU_SEQ_RANGE_MDT;
3577         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
3578         if (rc)
3579                 return rc;
3580
3581         *mdt_index = range->lsr_index;
3582
3583         return 0;
3584 }
3585
3586 static int mdd_is_link_on_source_mdt(const struct lu_env *env,
3587                                      struct mdd_object *sobj,
3588                                      struct mdd_object *tobj,
3589                                      const struct lu_name *tname,
3590                                      const struct lu_fid *tpfid,
3591                                      const struct lu_name *lname,
3592                                      const struct lu_fid *fid,
3593                                      void *opaque,
3594                                      struct thandle *handle)
3595 {
3596         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3597         __u32 source_mdt_index = *(__u32 *)opaque;
3598         __u32 link_mdt_index;
3599         int rc;
3600
3601         ENTRY;
3602
3603         /* ignore tobj */
3604         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3605             !strcmp(tname->ln_name, lname->ln_name))
3606                 return 0;
3607
3608         rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
3609         if (rc)
3610                 RETURN(rc);
3611
3612         RETURN(link_mdt_index == source_mdt_index);
3613 }
3614
3615 static int mdd_iterate_linkea(const struct lu_env *env,
3616                               struct mdd_object *sobj,
3617                               struct mdd_object *tobj,
3618                               const struct lu_name *tname,
3619                               const struct lu_fid *tpfid,
3620                               struct linkea_data *ldata,
3621                               void *opaque,
3622                               struct thandle *handle,
3623                               mdd_linkea_cb cb)
3624 {
3625         struct mdd_thread_info *info = mdd_env_info(env);
3626         char *filename = info->mdi_name;
3627         struct lu_name lname;
3628         struct lu_fid fid;
3629         int rc = 0;
3630
3631         if (!ldata->ld_buf)
3632                 return 0;
3633
3634         for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
3635              linkea_next_entry(ldata)) {
3636                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
3637                                     &fid);
3638
3639                 /* Note: lname might miss \0 at the end */
3640                 snprintf(filename, sizeof(info->mdi_name), "%.*s",
3641                          lname.ln_namelen, lname.ln_name);
3642                 lname.ln_name = filename;
3643
3644                 CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
3645
3646                 rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
3647                         handle);
3648         }
3649
3650         return rc;
3651 }
3652
3653 /**
3654  * Prepare linkea, and check whether file needs migrate: if source still has
3655  * link on source MDT, no need to migrate, just update namespace on source and
3656  * target parents.
3657  *
3658  * \retval      0 do migrate
3659  * \retval      1 don't migrate
3660  * \retval      -errno on failure
3661  */
3662 static int mdd_migrate_linkea_prepare(const struct lu_env *env,
3663                                       struct mdd_device *mdd,
3664                                       struct mdd_object *spobj,
3665                                       struct mdd_object *tpobj,
3666                                       struct mdd_object *sobj,
3667                                       const struct lu_name *sname,
3668                                       const struct lu_name *tname,
3669                                       const struct lu_attr *attr,
3670                                       struct linkea_data *ldata)
3671 {
3672         __u32 source_mdt_index;
3673         int rc;
3674
3675         ENTRY;
3676
3677         memset(ldata, 0, sizeof(*ldata));
3678         rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), sname,
3679                                 mdd_object_fid(tpobj), tname, 1, 0, ldata);
3680         if (rc)
3681                 RETURN(rc);
3682
3683         /*
3684          * Then it will check if the file should be migrated. If the file has
3685          * mulitple links, we only need migrate the file if all of its entries
3686          * has been migrated to the remote MDT.
3687          */
3688         if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
3689                 RETURN(0);
3690
3691         /* If there are still links locally, don't migrate this file */
3692         LASSERT(ldata->ld_leh != NULL);
3693
3694         /*
3695          * If linkEA is overflow, it means there are some unknown name entries
3696          * under unknown parents, which will prevent the migration.
3697          */
3698         if (unlikely(ldata->ld_leh->leh_overflow_time))
3699                 RETURN(-EOVERFLOW);
3700
3701         rc = mdd_fld_lookup(env, mdd, mdd_object_fid(sobj), &source_mdt_index);
3702         if (rc)
3703                 RETURN(rc);
3704
3705         rc = mdd_iterate_linkea(env, sobj, NULL, tname, mdd_object_fid(tpobj),
3706                                 ldata, &source_mdt_index, NULL,
3707                                 mdd_is_link_on_source_mdt);
3708         RETURN(rc);
3709 }
3710
3711 static int mdd_declare_migrate_update(const struct lu_env *env,
3712                                       struct mdd_object *spobj,
3713                                       struct mdd_object *tpobj,
3714                                       struct mdd_object *obj,
3715                                       const struct lu_name *sname,
3716                                       const struct lu_name *tname,
3717                                       struct lu_attr *attr,
3718                                       struct lu_attr *spattr,
3719                                       struct lu_attr *tpattr,
3720                                       struct linkea_data *ldata,
3721                                       struct md_attr *ma,
3722                                       struct thandle *handle)
3723 {
3724         struct mdd_thread_info *info = mdd_env_info(env);
3725         struct lu_attr *la = &info->mdi_la_for_fix;
3726         int rc;
3727
3728         rc = mdo_declare_index_delete(env, spobj, sname->ln_name, handle);
3729         if (rc)
3730                 return rc;
3731
3732         if (S_ISDIR(attr->la_mode)) {
3733                 rc = mdo_declare_ref_del(env, spobj, handle);
3734                 if (rc)
3735                         return rc;
3736         }
3737
3738         rc = mdo_declare_index_insert(env, tpobj, mdd_object_fid(obj),
3739                                       attr->la_mode & S_IFMT,
3740                                       tname->ln_name, handle);
3741         if (rc)
3742                 return rc;
3743
3744         rc = mdd_declare_links_add(env, obj, handle, ldata);
3745         if (rc)
3746                 return rc;
3747
3748         if (S_ISDIR(attr->la_mode)) {
3749                 rc = mdo_declare_ref_add(env, tpobj, handle);
3750                 if (rc)
3751                         return rc;
3752         }
3753
3754         la->la_valid = LA_CTIME | LA_MTIME;
3755         rc = mdo_declare_attr_set(env, spobj, la, handle);
3756         if (rc)
3757                 return rc;
3758
3759         if (tpobj != spobj) {
3760                 rc = mdo_declare_attr_set(env, tpobj, la, handle);
3761                 if (rc)
3762                         return rc;
3763         }
3764
3765         return rc;
3766 }
3767
3768 static int mdd_declare_migrate_create(const struct lu_env *env,
3769                                       struct mdd_object *spobj,
3770                                       struct mdd_object *tpobj,
3771                                       struct mdd_object *sobj,
3772                                       struct mdd_object *tobj,
3773                                       const struct lu_name *sname,
3774                                       const struct lu_name *tname,
3775                                       struct lu_attr *spattr,
3776                                       struct lu_attr *tpattr,
3777                                       struct lu_attr *attr,
3778                                       struct lu_buf *sbuf,
3779                                       struct linkea_data *ldata,
3780                                       struct md_attr *ma,
3781                                       struct md_op_spec *spec,
3782                                       struct dt_allocation_hint *hint,
3783                                       struct thandle *handle)
3784 {
3785         struct mdd_thread_info *info = mdd_env_info(env);
3786         struct md_layout_change *mlc = &info->mdi_mlc;
3787         struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
3788         int rc;
3789
3790         ENTRY;
3791
3792         if (S_ISDIR(attr->la_mode)) {
3793                 struct lmv_user_md *lum = spec->u.sp_ea.eadata;
3794
3795                 mlc->mlc_opc = MD_LAYOUT_DETACH;
3796                 rc = mdo_declare_layout_change(env, sobj, mlc, handle);
3797                 if (rc)
3798                         return rc;
3799
3800                 lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3801         } else if (S_ISLNK(attr->la_mode)) {
3802                 spec->u.sp_symname = sbuf->lb_buf;
3803         } else if (S_ISREG(attr->la_mode)) {
3804                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
3805                 spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
3806         }
3807
3808         mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
3809
3810         rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
3811                                 tname, attr, handle, spec, ldata, NULL, NULL,
3812                                 NULL, hint);
3813         if (rc)
3814                 return rc;
3815
3816         /*
3817          * tobj mode will be used in mdo_declare_layout_change(), but it's not
3818          * createb yet, copy from sobj.
3819          */
3820         tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
3821         tobj->mod_obj.mo_lu.lo_header->loh_attr |=
3822                 sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
3823
3824         if (S_ISDIR(attr->la_mode)) {
3825                 if (!lmv) {
3826                         /* if sobj is not striped, fake a 1-stripe LMV */
3827                         LASSERT(sizeof(info->mdi_key) >
3828                                 lmv_mds_md_size(1, LMV_MAGIC_V1));
3829                         lmv = (typeof(lmv))info->mdi_key;
3830                         memset(lmv, 0, sizeof(*lmv));
3831                         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3832                         lmv->lmv_stripe_count = cpu_to_le32(1);
3833                         lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
3834                         fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
3835                                       mdd_object_fid(sobj));
3836                         mlc->mlc_buf.lb_buf = lmv;
3837                         mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
3838                 } else {
3839                         mlc->mlc_buf = *sbuf;
3840                 }
3841                 mlc->mlc_opc = MD_LAYOUT_ATTACH;
3842                 rc = mdo_declare_layout_change(env, tobj, mlc, handle);
3843                 if (rc)
3844                         return rc;
3845         }
3846
3847         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
3848                                 mdo_declare_xattr_set);
3849         if (rc)
3850                 return rc;
3851
3852         if (S_ISREG(attr->la_mode)) {
3853                 struct lu_buf fid_buf;
3854
3855                 handle->th_complex = 1;
3856
3857                 /* target may be remote, update PFID via sobj. */
3858                 fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
3859                 fid_buf.lb_len = sizeof(struct lu_fid);
3860                 rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
3861                                            0, handle);
3862                 if (rc)
3863                         return rc;
3864
3865                 rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
3866                 if (rc)
3867                         return rc;
3868         }
3869
3870         if (!S_ISDIR(attr->la_mode)) {
3871                 rc = mdd_iterate_linkea(env, sobj, tobj, tname,
3872                                         mdd_object_fid(tpobj), ldata, NULL,
3873                                         handle, mdd_declare_update_link);
3874                 if (rc)
3875                         return rc;
3876         }
3877
3878         if (!S_ISDIR(attr->la_mode) || lmv) {
3879                 rc = mdo_declare_ref_del(env, sobj, handle);
3880                 if (rc)
3881                         return rc;
3882
3883                 if (S_ISDIR(attr->la_mode)) {
3884                         rc = mdo_declare_ref_del(env, sobj, handle);
3885                         if (rc)
3886                                 return rc;
3887                 }
3888
3889                 rc = mdo_declare_destroy(env, sobj, handle);
3890                 if (rc)
3891                         return rc;
3892         }
3893
3894         rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, sname, tname,
3895                                         attr, spattr, tpattr, ldata, ma,
3896                                         handle);
3897         return rc;
3898 }
3899
3900 /**
3901  * migrate dirent from \a spobj to \a tpobj.
3902  **/
3903 static int mdd_migrate_update(const struct lu_env *env,
3904                               struct mdd_object *spobj,
3905                               struct mdd_object *tpobj,
3906                               struct mdd_object *obj,
3907                               const struct lu_name *sname,
3908                               const struct lu_name *tname,
3909                               struct lu_attr *attr,
3910                               struct lu_attr *spattr,
3911                               struct lu_attr *tpattr,
3912                               struct linkea_data *ldata,
3913                               struct md_attr *ma,
3914                               struct thandle *handle)
3915 {
3916         struct mdd_thread_info *info = mdd_env_info(env);
3917         struct lu_attr *la = &info->mdi_la_for_fix;
3918         int rc;
3919
3920         ENTRY;
3921
3922         CDEBUG(D_INFO, "update "DFID" from "DFID"/%s to "DFID"/%s\n",
3923                PFID(mdd_object_fid(obj)), PFID(mdd_object_fid(spobj)),
3924                sname->ln_name, PFID(mdd_object_fid(tpobj)), tname->ln_name);
3925
3926         rc = __mdd_index_delete(env, spobj, sname->ln_name,
3927                                 S_ISDIR(attr->la_mode), handle);
3928         if (rc)
3929                 RETURN(rc);
3930
3931         rc = __mdd_index_insert(env, tpobj, mdd_object_fid(obj),
3932                                 attr->la_mode & S_IFMT,
3933                                 tname->ln_name, handle);
3934         if (rc)
3935                 RETURN(rc);
3936
3937         rc = mdd_links_write(env, obj, ldata, handle);
3938         if (rc)
3939                 RETURN(rc);
3940
3941         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3942         la->la_valid = LA_CTIME | LA_MTIME;
3943         mdd_write_lock(env, spobj, DT_SRC_PARENT);
3944         rc = mdd_update_time(env, spobj, spattr, la, handle);
3945         mdd_write_unlock(env, spobj);
3946         if (rc)
3947                 RETURN(rc);
3948
3949         if (tpobj != spobj) {
3950                 la->la_valid = LA_CTIME | LA_MTIME;
3951                 mdd_write_lock(env, tpobj, DT_TGT_PARENT);
3952                 rc = mdd_update_time(env, tpobj, tpattr, la, handle);
3953                 mdd_write_unlock(env, tpobj);
3954                 if (rc)
3955                         RETURN(rc);
3956         }
3957
3958         RETURN(rc);
3959 }
3960
3961 /**
3962  * Migrate file/dir to target MDT.
3963  *
3964  * Create target according to \a spec, and then migrate xattrs, if it's
3965  * directory, migrate source stripes to target.
3966  *
3967  * \param[in] env       execution environment
3968  * \param[in] spobj     source parent object
3969  * \param[in] tpobj     target parent object
3970  * \param[in] sobj      source object
3971  * \param[in] tobj      target object
3972  * \param[in] lname     file name
3973  * \param[in] spattr    source parent attributes
3974  * \param[in] tpattr    target parent attributes
3975  * \param[in] attr      source attributes
3976  * \param[in] sbuf      source LMV buf
3977  * \param[in] spec      migrate create spec
3978  * \param[in] hint      target creation hint
3979  * \param[in] handle    tranasction handle
3980  *
3981  * \retval      0 on success
3982  * \retval      -errno on failure
3983  **/
3984 static int mdd_migrate_create(const struct lu_env *env,
3985                               struct mdd_object *spobj,
3986                               struct mdd_object *tpobj,
3987                               struct mdd_object *sobj,
3988                               struct mdd_object *tobj,
3989                               const struct lu_name *sname,
3990                               const struct lu_name *tname,
3991                               struct lu_attr *spattr,
3992                               struct lu_attr *tpattr,
3993                               struct lu_attr *attr,
3994                               const struct lu_buf *sbuf,
3995                               struct linkea_data *ldata,
3996                               struct md_attr *ma,
3997                               struct md_op_spec *spec,
3998                               struct dt_allocation_hint *hint,
3999                               struct thandle *handle)
4000 {
4001         int rc;
4002
4003         ENTRY;
4004
4005         /*
4006          * migrate sobj stripes to tobj if it's directory:
4007          * 1. detach stripes from sobj.
4008          * 2. attach stripes to tobj, see mdd_declare_migrate_mdt().
4009          * 3. create stripes for tobj, see lod_xattr_set_lmv().
4010          */
4011         if (S_ISDIR(attr->la_mode)) {
4012                 struct mdd_thread_info *info = mdd_env_info(env);
4013                 struct md_layout_change *mlc = &info->mdi_mlc;
4014
4015                 mlc->mlc_opc = MD_LAYOUT_DETACH;
4016
4017                 mdd_write_lock(env, sobj, DT_SRC_PARENT);
4018                 rc = mdo_layout_change(env, sobj, mlc, handle);
4019                 mdd_write_unlock(env, sobj);
4020                 if (rc)
4021                         RETURN(rc);
4022         }
4023
4024         /* don't set nlink from sobj */
4025         attr->la_valid &= ~LA_NLINK;
4026
4027         rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
4028                                hint, handle, false);
4029         if (rc)
4030                 RETURN(rc);
4031
4032         mdd_write_lock(env, tobj, DT_TGT_CHILD);
4033         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
4034         mdd_write_unlock(env, tobj);
4035         if (rc)
4036                 RETURN(rc);
4037
4038         /* for regular file, update OST objects XATTR_NAME_FID */
4039         if (S_ISREG(attr->la_mode)) {
4040                 struct lu_buf fid_buf;
4041
4042                 /* target may be remote, update PFID via sobj. */
4043                 fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
4044                 fid_buf.lb_len = sizeof(struct lu_fid);
4045                 rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
4046                                    handle);
4047                 if (rc)
4048                         RETURN(rc);
4049
4050                 /* delete LOV to avoid deleting OST objs when destroying sobj */
4051                 mdd_write_lock(env, sobj, DT_SRC_CHILD);
4052                 rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4053                 mdd_write_unlock(env, sobj);
4054                 /* O_DELAY_CREATE file may not have LOV, ignore -ENODATA */
4055                 if (rc && rc != -ENODATA)
4056                         RETURN(rc);
4057                 rc = 0;
4058         }
4059
4060         /* update links FID */
4061         if (!S_ISDIR(attr->la_mode)) {
4062                 rc = mdd_iterate_linkea(env, sobj, tobj, tname,
4063                                         mdd_object_fid(tpobj), ldata,
4064                                         NULL, handle, mdd_update_link);
4065                 if (rc)
4066                         RETURN(rc);
4067         }
4068
4069         /* don't destroy sobj if it's plain directory */
4070         if (!S_ISDIR(attr->la_mode) || sbuf->lb_buf) {
4071                 mdd_write_lock(env, sobj, DT_SRC_CHILD);
4072                 rc = mdo_ref_del(env, sobj, handle);
4073                 if (!rc) {
4074                         if (S_ISDIR(attr->la_mode))
4075                                 rc = mdo_ref_del(env, sobj, handle);
4076                         if (!rc)
4077                                 rc = mdo_destroy(env, sobj, handle);
4078                 }
4079                 mdd_write_unlock(env, sobj);
4080                 if (rc)
4081                         RETURN(rc);
4082         }
4083
4084         rc = mdd_migrate_update(env, spobj, tpobj, tobj, sname, tname, attr,
4085                                 spattr, tpattr, ldata, ma, handle);
4086
4087         RETURN(rc);
4088 }
4089
4090 /* NB: if user issued different migrate command, we can't adjust it silently
4091  * here, because this command will decide target MDT in subdir migration in
4092  * LMV.
4093  */
4094 static int mdd_migrate_cmd_check(struct mdd_device *mdd,
4095                                  const struct lmv_mds_md_v1 *lmv,
4096                                  const struct lmv_user_md_v1 *lum,
4097                                  const struct lu_name *lname)
4098 {
4099         __u32 lum_stripe_count = lum->lum_stripe_count;
4100         __u32 lum_hash_type = lum->lum_hash_type &
4101                               cpu_to_le32(LMV_HASH_TYPE_MASK);
4102         __u32 lmv_hash_type = lmv->lmv_hash_type &
4103                               cpu_to_le32(LMV_HASH_TYPE_MASK);
4104
4105         if (!lmv_is_sane(lmv))
4106                 return -EBADF;
4107
4108         /* if stripe_count unspecified, set to 1 */
4109         if (!lum_stripe_count)
4110                 lum_stripe_count = cpu_to_le32(1);
4111
4112         /* TODO: check specific MDTs */
4113         if (lum_stripe_count != lmv->lmv_migrate_offset ||
4114             lum->lum_stripe_offset != lmv->lmv_master_mdt_index ||
4115             (lum_hash_type && lum_hash_type != lmv_hash_type)) {
4116                 CERROR("%s: '"DNAME"' migration was interrupted, run 'lfs migrate -m %d -c %d -H %s "DNAME"' to finish migration.\n",
4117                         mdd2obd_dev(mdd)->obd_name, PNAME(lname),
4118                         le32_to_cpu(lmv->lmv_master_mdt_index),
4119                         le32_to_cpu(lmv->lmv_migrate_offset),
4120                         mdt_hash_name[le32_to_cpu(lmv_hash_type)],
4121                         PNAME(lname));
4122                 return -EPERM;
4123         }
4124
4125         return -EALREADY;
4126 }
4127
4128 /**
4129  * Internal function to migrate directory or file between MDTs.
4130  *
4131  * migrate source to target in following steps:
4132  *   1. create target, append source stripes after target's if it's directory,
4133  *      migrate xattrs and update fid of source links.
4134  *   2. update namespace: migrate dirent from source parent to target parent,
4135  *      update file linkea, and destroy source if it's not needed any more.
4136  *
4137  * \param[in] env       execution environment
4138  * \param[in] spobj     source parent object
4139  * \param[in] tpobj     target parent object
4140  * \param[in] sobj      source object
4141  * \param[in] tobj      target object
4142  * \param[in] sname     source file name
4143  * \param[in] tname     target file name
4144  * \param[in] spec      target creation spec
4145  * \param[in] ma        used to update \a pobj mtime and ctime
4146  *
4147  * \retval              0 on success
4148  * \retval              -errno on failure
4149  */
4150 static int mdd_migrate_object(const struct lu_env *env,
4151                               struct mdd_object *spobj,
4152                               struct mdd_object *tpobj,
4153                               struct mdd_object *sobj,
4154                               struct mdd_object *tobj,
4155                               const struct lu_name *sname,
4156                               const struct lu_name *tname,
4157                               struct md_op_spec *spec,
4158                               struct md_attr *ma)
4159 {
4160         struct mdd_thread_info *info = mdd_env_info(env);
4161         struct mdd_device *mdd = mdo2mdd(&spobj->mod_obj);
4162         struct lu_attr *spattr = &info->mdi_pattr;
4163         struct lu_attr *tpattr = &info->mdi_tpattr;
4164         struct lu_attr *attr = &info->mdi_cattr;
4165         struct linkea_data *ldata = &info->mdi_link_data;
4166         struct dt_allocation_hint *hint = &info->mdi_hint;
4167         struct lu_buf sbuf = { NULL };
4168         struct lmv_mds_md_v1 *lmv;
4169         struct thandle *handle;
4170         int rc;
4171
4172         ENTRY;
4173
4174         rc = mdd_la_get(env, sobj, attr);
4175         if (rc)
4176                 RETURN(rc);
4177
4178         rc = mdd_la_get(env, spobj, spattr);
4179         if (rc)
4180                 RETURN(rc);
4181
4182         rc = mdd_la_get(env, tpobj, tpattr);
4183         if (rc)
4184                 RETURN(rc);
4185
4186         if (S_ISDIR(attr->la_mode) && !spec->sp_migrate_nsonly) {
4187                 struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
4188
4189                 LASSERT(lum);
4190
4191                 /* if user use default value '0' for stripe_count, we need to
4192                  * adjust it to '1' to create a 1-stripe directory.
4193                  */
4194                 if (lum->lum_stripe_count == 0)
4195                         lum->lum_stripe_count = cpu_to_le32(1);
4196
4197                 rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
4198                 if (rc && rc != -ENODATA)
4199                         GOTO(out, rc);
4200
4201                 lmv = sbuf.lb_buf;
4202                 if (lmv) {
4203                         if (!lmv_is_sane(lmv))
4204                                 GOTO(out, rc = -EBADF);
4205                         if (lmv_is_migrating(lmv)) {
4206                                 rc = mdd_migrate_cmd_check(mdd, lmv, lum,
4207                                                            sname);
4208                                 GOTO(out, rc);
4209                         }
4210                 }
4211         } else if (!S_ISDIR(attr->la_mode)) {
4212                 if (spobj == tpobj)
4213                         GOTO(out, rc = -EALREADY);
4214
4215                 /* update namespace only if @sobj is on MDT where @tpobj is. */
4216                 if (!mdd_object_remote(tpobj) && !mdd_object_remote(sobj))
4217                         spec->sp_migrate_nsonly = true;
4218
4219                 if (S_ISLNK(attr->la_mode)) {
4220                         lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
4221                         if (!sbuf.lb_buf)
4222                                 GOTO(out, rc = -ENOMEM);
4223
4224                         rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
4225                         if (rc <= 0) {
4226                                 rc = rc ?: -EFAULT;
4227                                 CERROR("%s: "DFID" readlink failed: rc = %d\n",
4228                                        mdd2obd_dev(mdd)->obd_name,
4229                                        PFID(mdd_object_fid(sobj)), rc);
4230                                 GOTO(out, rc);
4231                         }
4232                 }
4233         }
4234
4235         /* linkea needs update upon FID or parent stripe change */
4236         rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, sname,
4237                                         tname, attr, ldata);
4238         if (rc > 0)
4239                 /* update namespace only if @sobj has link on its MDT. */
4240                 spec->sp_migrate_nsonly = true;
4241         else if (rc < 0)
4242                 GOTO(out, rc);
4243
4244         rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
4245                                       spattr, tpattr, attr);
4246         if (rc)
4247                 GOTO(out, rc);
4248
4249         handle = mdd_trans_create(env, mdd);
4250         if (IS_ERR(handle))
4251                 GOTO(out, rc = PTR_ERR(handle));
4252
4253         if (spec->sp_migrate_nsonly)
4254                 rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, sname,
4255                                                 tname, attr, spattr, tpattr,
4256                                                 ldata, ma, handle);
4257         else
4258                 rc = mdd_declare_migrate_create(env, spobj, tpobj, sobj, tobj,
4259                                                 sname, tname, spattr, tpattr,
4260                                                 attr, &sbuf, ldata, ma, spec,
4261                                                 hint, handle);
4262         if (rc)
4263                 GOTO(stop, rc);
4264
4265         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, tname, sname,
4266                                          handle);
4267         if (rc)
4268                 GOTO(stop, rc);
4269
4270         rc = mdd_trans_start(env, mdd, handle);
4271         if (rc)
4272                 GOTO(stop, rc);
4273
4274         if (spec->sp_migrate_nsonly)
4275                 rc = mdd_migrate_update(env, spobj, tpobj, sobj, sname, tname,
4276                                         attr, spattr, tpattr, ldata, ma,
4277                                         handle);
4278         else
4279                 rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, sname,
4280                                         tname, spattr, tpattr, attr, &sbuf,
4281                                         ldata, ma, spec, hint, handle);
4282         if (rc)
4283                 GOTO(stop, rc);
4284
4285         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0,
4286                                     spec->sp_migrate_nsonly ? sobj : tobj,
4287                                     mdd_object_fid(spobj), mdd_object_fid(sobj),
4288                                     mdd_object_fid(tpobj), tname, sname,
4289                                     handle);
4290         if (rc)
4291                 GOTO(stop, rc);
4292         EXIT;
4293
4294 stop:
4295         rc = mdd_trans_stop(env, mdd, rc, handle);
4296 out:
4297         lu_buf_free(&sbuf);
4298
4299         return rc;
4300 }
4301
4302 /**
4303  * Migrate directory or file between MDTs.
4304  *
4305  * \param[in] env       execution environment
4306  * \param[in] md_pobj   parent master object
4307  * \param[in] md_sobj   source object
4308  * \param[in] lname     file name
4309  * \param[in] md_tobj   target object
4310  * \param[in] spec      target creation spec
4311  * \param[in] ma        used to update \a pobj mtime and ctime
4312  *
4313  * \retval              0 on success
4314  * \retval              -errno on failure
4315  */
4316 static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
4317                        struct md_object *md_sobj, const struct lu_name *lname,
4318                        struct md_object *md_tobj, struct md_op_spec *spec,
4319                        struct md_attr *ma)
4320 {
4321         struct mdd_thread_info *info = mdd_env_info(env);
4322         struct mdd_device *mdd = mdo2mdd(md_pobj);
4323         struct mdd_object *pobj = md2mdd_obj(md_pobj);
4324         struct mdd_object *sobj = md2mdd_obj(md_sobj);
4325         struct mdd_object *tobj = md2mdd_obj(md_tobj);
4326         struct mdd_object *spobj = NULL;
4327         struct mdd_object *tpobj = NULL;
4328         struct lu_buf pbuf = { NULL };
4329         struct lu_fid *fid = &info->mdi_fid2;
4330         struct lmv_mds_md_v1 *lmv;
4331         int rc;
4332
4333         ENTRY;
4334
4335         /* locate source and target stripe on pobj, which are the real parent */
4336         rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
4337         if (rc < 0 && rc != -ENODATA)
4338                 RETURN(rc);
4339
4340         lmv = pbuf.lb_buf;
4341         if (lmv) {
4342                 int index;
4343
4344                 if (!lmv_is_sane(lmv))
4345                         GOTO(out, rc = -EBADF);
4346
4347                 /* locate target parent stripe */
4348                 /* fail check here to make sure top dir migration succeed. */
4349                 if (lmv_is_migrating(lmv) &&
4350                     OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
4351                         GOTO(out, rc = -EIO);
4352
4353                 index = lmv_name_to_stripe_index(lmv, lname->ln_name,
4354                                                  lname->ln_namelen);
4355                 if (index < 0)
4356                         GOTO(out, rc = index);
4357
4358                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
4359                 tpobj = mdd_object_find(env, mdd, fid);
4360                 if (IS_ERR(tpobj))
4361                         GOTO(out, rc = PTR_ERR(tpobj));
4362
4363                 /* locate source parent stripe */
4364                 if (lmv_is_layout_changing(lmv)) {
4365                         index = lmv_name_to_stripe_index_old(lmv,
4366                                                              lname->ln_name,
4367                                                              lname->ln_namelen);
4368                         if (index < 0)
4369                                 GOTO(out, rc = index);
4370
4371                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
4372                         spobj = mdd_object_find(env, mdd, fid);
4373                         if (IS_ERR(spobj))
4374                                 GOTO(out, rc = PTR_ERR(spobj));
4375
4376                         /* parent stripe unchanged */
4377                         if (spobj == tpobj) {
4378                                 if (!lmv_is_restriping(lmv))
4379                                         GOTO(out, rc = -EINVAL);
4380                                 GOTO(out, rc = -EALREADY);
4381                         }
4382                 } else {
4383                         spobj = tpobj;
4384                         mdd_object_get(spobj);
4385                 }
4386         } else {
4387                 tpobj = pobj;
4388                 spobj = pobj;
4389                 mdd_object_get(tpobj);
4390                 mdd_object_get(spobj);
4391         }
4392
4393         rc = mdd_migrate_object(env, spobj, tpobj, sobj, tobj, lname, lname,
4394                                 spec, ma);
4395         GOTO(out, rc);
4396
4397 out:
4398         if (!IS_ERR_OR_NULL(spobj))
4399                 mdd_object_put(env, spobj);
4400         if (!IS_ERR_OR_NULL(tpobj))
4401                 mdd_object_put(env, tpobj);
4402         lu_buf_free(&pbuf);
4403
4404         return rc;
4405 }
4406
4407 static int mdd_declare_1sd_collapse(const struct lu_env *env,
4408                                     struct mdd_object *pobj,
4409                                     struct mdd_object *obj,
4410                                     struct mdd_object *stripe,
4411                                     struct lu_attr *attr,
4412                                     struct md_layout_change *mlc,
4413                                     struct lu_name *lname,
4414                                     struct thandle *handle)
4415 {
4416         int rc;
4417
4418         mlc->mlc_opc = MD_LAYOUT_DETACH;
4419         rc = mdo_declare_layout_change(env, obj, mlc, handle);
4420         if (rc)
4421                 return rc;
4422
4423         rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
4424                                       S_IFDIR, dotdot, handle);
4425         if (rc)
4426                 return rc;
4427
4428         rc = mdd_iterate_xattrs(env, obj, stripe, false, handle,
4429                                 mdo_declare_xattr_set);
4430         if (rc)
4431                 return rc;
4432
4433         rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
4434         if (rc)
4435                 return rc;
4436
4437         rc = mdo_declare_attr_set(env, stripe, attr, handle);
4438         if (rc)
4439                 return rc;
4440
4441         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
4442         if (rc)
4443                 return rc;
4444
4445         rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(stripe),
4446                                       attr->la_mode, lname->ln_name, handle);
4447         if (rc)
4448                 return rc;
4449
4450         rc = mdo_declare_ref_del(env, obj, handle);
4451         if (rc)
4452                 return rc;
4453
4454         rc = mdo_declare_ref_del(env, obj, handle);
4455         if (rc)
4456                 return rc;
4457
4458         rc = mdo_declare_destroy(env, obj, handle);
4459         if (rc)
4460                 return rc;
4461
4462         return rc;
4463 }
4464
4465 /* transform one-stripe directory to a plain directory */
4466 static int mdd_1sd_collapse(const struct lu_env *env,
4467                             struct mdd_object *pobj,
4468                             struct mdd_object *obj,
4469                             struct mdd_object *stripe,
4470                             struct lu_attr *attr,
4471                             struct md_layout_change *mlc,
4472                             struct lu_name *lname,
4473                             struct thandle *handle)
4474 {
4475         int rc;
4476
4477         ENTRY;
4478
4479         /* replace 1-stripe directory with its stripe */
4480         mlc->mlc_opc = MD_LAYOUT_DETACH;
4481
4482         mdd_write_lock(env, obj, DT_SRC_PARENT);
4483         rc = mdo_layout_change(env, obj, mlc, handle);
4484         mdd_write_unlock(env, obj);
4485         if (rc)
4486                 RETURN(rc);
4487
4488         mdd_write_lock(env, pobj, DT_SRC_PARENT);
4489         mdd_write_lock(env, obj, DT_SRC_CHILD);
4490
4491         /* insert dotdot to stripe which points to parent */
4492         rc = __mdd_index_insert_only(env, stripe, mdd_object_fid(pobj),
4493                                      S_IFDIR, dotdot, handle);
4494         if (rc)
4495                 GOTO(out, rc);
4496
4497         /* copy xattrs including linkea */
4498         rc = mdd_iterate_xattrs(env, obj, stripe, false, handle, mdo_xattr_set);
4499         if (rc)
4500                 GOTO(out, rc);
4501
4502         /* delete LMV */
4503         rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
4504         if (rc)
4505                 GOTO(out, rc);
4506
4507         /* don't set nlink from parent */
4508         attr->la_valid &= ~LA_NLINK;
4509
4510         rc = mdo_attr_set(env, stripe, attr, handle);
4511         if (rc)
4512                 GOTO(out, rc);
4513
4514         /* delete dir name from parent */
4515         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
4516         if (rc)
4517                 GOTO(out, rc);
4518
4519         /* insert stripe to parent with dir name */
4520         rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(stripe),
4521                                      attr->la_mode, lname->ln_name, handle);
4522         if (rc)
4523                 GOTO(out, rc);
4524
4525         /* destroy dir obj */
4526         rc = mdo_ref_del(env, obj, handle);
4527         if (rc)
4528                 GOTO(out, rc);
4529
4530         rc = mdo_ref_del(env, obj, handle);
4531         if (rc)
4532                 GOTO(out, rc);
4533
4534         rc = mdo_destroy(env, obj, handle);
4535         if (rc)
4536                 GOTO(out, rc);
4537
4538         EXIT;
4539 out:
4540         mdd_write_unlock(env, obj);
4541         mdd_write_unlock(env, pobj);
4542
4543         return rc;
4544 }
4545
4546 /*
4547  * shrink directory stripes after migration/merge
4548  */
4549 int mdd_dir_layout_shrink(const struct lu_env *env,
4550                           struct md_object *md_obj,
4551                           struct md_layout_change *mlc)
4552 {
4553         struct mdd_device *mdd = mdo2mdd(md_obj);
4554         struct mdd_thread_info *info = mdd_env_info(env);
4555         struct mdd_object *obj = md2mdd_obj(md_obj);
4556         struct mdd_object *pobj = NULL;
4557         struct mdd_object *stripe = NULL;
4558         struct lu_attr *attr = &info->mdi_pattr;
4559         struct lu_fid *fid = &info->mdi_fid2;
4560         struct lu_name lname = { NULL };
4561         struct lu_buf lmv_buf = { NULL };
4562         struct lmv_mds_md_v1 *lmv;
4563         struct lmv_user_md *lmu;
4564         struct thandle *handle;
4565         int rc;
4566
4567         ENTRY;
4568
4569         rc = mdd_la_get(env, obj, attr);
4570         if (rc)
4571                 RETURN(rc);
4572
4573         if (!S_ISDIR(attr->la_mode))
4574                 RETURN(-ENOTDIR);
4575
4576         rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
4577         if (rc < 0)
4578                 RETURN(rc);
4579
4580         lmv = lmv_buf.lb_buf;
4581         if (!lmv_is_sane(lmv))
4582                 RETURN(-EBADF);
4583
4584         lmu = mlc->mlc_buf.lb_buf;
4585
4586         /* adjust the default value '0' to '1' */
4587         if (lmu->lum_stripe_count == 0)
4588                 lmu->lum_stripe_count = cpu_to_le32(1);
4589
4590         /* these were checked in MDT */
4591         LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
4592                 le32_to_cpu(lmv->lmv_stripe_count));
4593         LASSERT(!lmv_is_splitting(lmv));
4594         LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
4595
4596         /* if dir stripe count will be shrunk to 1, it needs to be transformed
4597          * to a plain dir, which will cause FID change and namespace update.
4598          */
4599         if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
4600                 struct linkea_data *ldata = &info->mdi_link_data;
4601                 char *filename = info->mdi_name;
4602
4603                 rc = mdd_links_read(env, obj, ldata);
4604                 if (rc)
4605                         GOTO(out, rc);
4606
4607                 if (ldata->ld_leh->leh_reccount > 1)
4608                         GOTO(out, rc = -EINVAL);
4609
4610                 linkea_first_entry(ldata);
4611                 if (!ldata->ld_lee)
4612                         GOTO(out, rc = -ENODATA);
4613
4614                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
4615                                     fid);
4616
4617                 /* Note: lname might miss \0 at the end */
4618                 snprintf(filename, sizeof(info->mdi_name), "%.*s",
4619                          lname.ln_namelen, lname.ln_name);
4620                 lname.ln_name = filename;
4621
4622                 pobj = mdd_object_find(env, mdd, fid);
4623                 if (IS_ERR(pobj)) {
4624                         rc = PTR_ERR(pobj);
4625                         pobj = NULL;
4626                         GOTO(out, rc);
4627                 }
4628
4629                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
4630
4631                 stripe = mdd_object_find(env, mdd, fid);
4632                 if (IS_ERR(stripe)) {
4633                         mdd_object_put(env, pobj);
4634                         pobj = NULL;
4635                         GOTO(out, rc = PTR_ERR(stripe));
4636                 }
4637         }
4638
4639         handle = mdd_trans_create(env, mdd);
4640         if (IS_ERR(handle))
4641                 GOTO(out, rc = PTR_ERR(handle));
4642
4643         mlc->mlc_opc = MD_LAYOUT_SHRINK;
4644         rc = mdo_declare_layout_change(env, obj, mlc, handle);
4645         if (rc)
4646                 GOTO(stop_trans, rc);
4647
4648         if (le32_to_cpu(lmu->lum_stripe_count) == 1 && !lmv_is_fixed(lmv)) {
4649                 rc = mdd_declare_1sd_collapse(env, pobj, obj, stripe, attr, mlc,
4650                                               &lname, handle);
4651                 if (rc)
4652                         GOTO(stop_trans, rc);
4653         }
4654
4655         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
4656                                          handle);
4657         if (rc)
4658                 GOTO(stop_trans, rc);
4659
4660         rc = mdd_trans_start(env, mdd, handle);
4661         if (rc)
4662                 GOTO(stop_trans, rc);
4663
4664         mdd_write_lock(env, obj, DT_SRC_PARENT);
4665         mlc->mlc_opc = MD_LAYOUT_SHRINK;
4666         rc = mdo_layout_change(env, obj, mlc, handle);
4667         mdd_write_unlock(env, obj);
4668         if (rc)
4669                 GOTO(stop_trans, rc);
4670
4671         if (le32_to_cpu(lmu->lum_stripe_count) == 1 && !lmv_is_fixed(lmv)) {
4672                 rc = mdd_1sd_collapse(env, pobj, obj, stripe, attr, mlc, &lname,
4673                                       handle);
4674                 if (rc)
4675                         GOTO(stop_trans, rc);
4676         }
4677
4678         rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
4679                                             XATTR_NAME_LMV, handle);
4680         GOTO(stop_trans, rc);
4681
4682 stop_trans:
4683         rc = mdd_trans_stop(env, mdd, rc, handle);
4684 out:
4685         if (pobj) {
4686                 mdd_object_put(env, stripe);
4687                 mdd_object_put(env, pobj);
4688         }
4689         lu_buf_free(&lmv_buf);
4690         return rc;
4691 }
4692
4693 static int mdd_dir_declare_split_plain(const struct lu_env *env,
4694                                         struct mdd_device *mdd,
4695                                         struct mdd_object *pobj,
4696                                         struct mdd_object *obj,
4697                                         struct mdd_object *tobj,
4698                                         struct md_layout_change *mlc,
4699                                         struct dt_allocation_hint *hint,
4700                                         struct thandle *handle)
4701 {
4702         struct mdd_thread_info *info = mdd_env_info(env);
4703         const struct lu_name *lname = mlc->mlc_name;
4704         struct lu_attr *la = &info->mdi_la_for_fix;
4705         struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
4706         struct linkea_data *ldata = &info->mdi_link_data;
4707         struct lmv_mds_md_v1 *lmv;
4708         __u32 count;
4709         int rc;
4710
4711         mlc->mlc_opc = MD_LAYOUT_DETACH;
4712         rc = mdo_declare_layout_change(env, obj, mlc, handle);
4713         if (rc)
4714                 return rc;
4715
4716         memset(ldata, 0, sizeof(*ldata));
4717         rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
4718                                 lname, 1, 0, ldata);
4719         if (rc)
4720                 return rc;
4721
4722         count = lum->lum_stripe_count;
4723         lum->lum_stripe_count = 0;
4724         mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
4725                              hint);
4726         rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
4727                                 lname, mlc->mlc_attr, handle, mlc->mlc_spec,
4728                                 ldata, NULL, NULL, NULL, hint);
4729         if (rc)
4730                 return rc;
4731
4732         /* tobj mode will be used in lod_declare_xattr_set(), but it's not
4733          * created yet.
4734          */
4735         tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
4736
4737         lmv = (typeof(lmv))info->mdi_key;
4738         memset(lmv, 0, sizeof(*lmv));
4739         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
4740         lmv->lmv_stripe_count = cpu_to_le32(1);
4741         lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
4742         fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
4743
4744         mlc->mlc_opc = MD_LAYOUT_ATTACH;
4745         mlc->mlc_buf.lb_buf = lmv;
4746         mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
4747         rc = mdo_declare_layout_change(env, tobj, mlc, handle);
4748         if (rc)
4749                 return rc;
4750
4751         rc = mdd_iterate_xattrs(env, obj, tobj, true, handle,
4752                                 mdo_declare_xattr_set);
4753         if (rc)
4754                 return rc;
4755
4756         lum->lum_stripe_count = count;
4757         mlc->mlc_opc = MD_LAYOUT_SPLIT;
4758         rc = mdo_declare_layout_change(env, tobj, mlc, handle);
4759         if (rc)
4760                 return rc;
4761
4762         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
4763         if (rc)
4764                 return rc;
4765
4766         rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
4767                                       S_IFDIR, lname->ln_name, handle);
4768         if (rc)
4769                 return rc;
4770
4771         la->la_valid = LA_CTIME | LA_MTIME;
4772         rc = mdo_declare_attr_set(env, obj, la, handle);
4773         if (rc)
4774                 return rc;
4775
4776         rc = mdo_declare_attr_set(env, pobj, la, handle);
4777         if (rc)
4778                 return rc;
4779
4780         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
4781                                          handle);
4782         return rc;
4783 }
4784
4785 /**
4786  * plain directory split:
4787  * 1. create \a tobj as plain directory.
4788  * 2. append \a obj as first stripe of \a tobj.
4789  * 3. migrate xattrs from \a obj to \a tobj.
4790  * 4. split \a tobj to specific stripe count.
4791  */
4792 static int mdd_dir_split_plain(const struct lu_env *env,
4793                                 struct mdd_device *mdd,
4794                                 struct mdd_object *pobj,
4795                                 struct mdd_object *obj,
4796                                 struct mdd_object *tobj,
4797                                 struct md_layout_change *mlc,
4798                                 struct dt_allocation_hint *hint,
4799                                 struct thandle *handle)
4800 {
4801         struct mdd_thread_info *info = mdd_env_info(env);
4802         struct lu_attr *pattr = &info->mdi_pattr;
4803         struct lu_attr *la = &info->mdi_la_for_fix;
4804         const struct lu_name *lname = mlc->mlc_name;
4805         struct linkea_data *ldata = &info->mdi_link_data;
4806         int rc;
4807
4808         ENTRY;
4809
4810         /* copy linkea out and set on target later */
4811         rc = mdd_links_read(env, obj, ldata);
4812         if (rc)
4813                 RETURN(rc);
4814
4815         mlc->mlc_opc = MD_LAYOUT_DETACH;
4816         rc = mdo_layout_change(env, obj, mlc, handle);
4817         if (rc)
4818                 RETURN(rc);
4819
4820         /* don't set nlink from obj */
4821         mlc->mlc_attr->la_valid &= ~LA_NLINK;
4822
4823         rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
4824                                NULL, NULL, NULL, hint, handle, false);
4825         if (rc)
4826                 RETURN(rc);
4827
4828         rc = mdd_iterate_xattrs(env, obj, tobj, true, handle, mdo_xattr_set);
4829         if (rc)
4830                 RETURN(rc);
4831
4832         rc = mdd_links_write(env, tobj, ldata, handle);
4833         if (rc)
4834                 RETURN(rc);
4835
4836         rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
4837         if (rc)
4838                 RETURN(rc);
4839
4840         rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
4841                                 lname->ln_name, handle);
4842         if (rc)
4843                 RETURN(rc);
4844
4845         la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
4846         la->la_valid = LA_CTIME | LA_MTIME;
4847
4848         mdd_write_lock(env, obj, DT_SRC_CHILD);
4849         rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
4850         mdd_write_unlock(env, obj);
4851         if (rc)
4852                 RETURN(rc);
4853
4854         rc = mdd_la_get(env, pobj, pattr);
4855         if (rc)
4856                 RETURN(rc);
4857
4858         la->la_valid = LA_CTIME | LA_MTIME;
4859
4860         mdd_write_lock(env, pobj, DT_SRC_PARENT);
4861         rc = mdd_update_time(env, pobj, pattr, la, handle);
4862         mdd_write_unlock(env, pobj);
4863         if (rc)
4864                 RETURN(rc);
4865
4866         /* FID changes, record it as CL_MIGRATE */
4867         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
4868                                     mdd_object_fid(pobj), mdd_object_fid(obj),
4869                                     mdd_object_fid(pobj), lname, lname, handle);
4870         RETURN(rc);
4871 }
4872
4873 int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
4874                          struct md_layout_change *mlc)
4875 {
4876         struct mdd_thread_info *info = mdd_env_info(env);
4877         struct mdd_device *mdd = mdo2mdd(o);
4878         struct mdd_object *obj = md2mdd_obj(o);
4879         struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
4880         struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
4881         struct dt_allocation_hint *hint = &info->mdi_hint;
4882         bool is_plain = false;
4883         struct thandle *handle;
4884         int rc;
4885
4886         ENTRY;
4887
4888         LASSERT(S_ISDIR(mdd_object_type(obj)));
4889
4890         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
4891         if (rc == -ENODATA)
4892                 is_plain = true;
4893         else if (rc < 0)
4894                 RETURN(rc);
4895
4896         handle = mdd_trans_create(env, mdd);
4897         if (IS_ERR(handle))
4898                 RETURN(PTR_ERR(handle));
4899
4900         if (is_plain) {
4901                 rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj, mlc,
4902                                                  hint, handle);
4903         } else {
4904                 mlc->mlc_opc = MD_LAYOUT_SPLIT;
4905                 rc = mdo_declare_layout_change(env, obj, mlc, handle);
4906                 if (rc)
4907                         GOTO(stop_trans, rc);
4908
4909                 rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
4910                                                  NULL, handle);
4911         }
4912         if (rc)
4913                 GOTO(stop_trans, rc);
4914
4915         rc = mdd_trans_start(env, mdd, handle);
4916         if (rc)
4917                 GOTO(stop_trans, rc);
4918
4919         if (is_plain) {
4920                 rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, mlc, hint,
4921                                          handle);
4922         } else {
4923                 mdd_write_lock(env, obj, DT_TGT_CHILD);
4924                 rc = mdo_xattr_set(env, obj, NULL, XATTR_NAME_LMV,
4925                                    LU_XATTR_CREATE, handle);
4926                 mdd_write_unlock(env, obj);
4927                 if (rc)
4928                         GOTO(stop_trans, rc);
4929
4930                 rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
4931                                                     XATTR_NAME_LMV, handle);
4932         }
4933         if (rc)
4934                 GOTO(stop_trans, rc);
4935
4936         EXIT;
4937
4938 stop_trans:
4939         rc = mdd_trans_stop(env, mdd, rc, handle);
4940
4941         return rc;
4942 }
4943
4944 const struct md_dir_operations mdd_dir_ops = {
4945         .mdo_is_subdir     = mdd_is_subdir,
4946         .mdo_lookup        = mdd_lookup,
4947         .mdo_create        = mdd_create,
4948         .mdo_rename        = mdd_rename,
4949         .mdo_link          = mdd_link,
4950         .mdo_unlink        = mdd_unlink,
4951         .mdo_create_data   = mdd_create_data,
4952         .mdo_migrate       = mdd_migrate,
4953 };