Whamcloud - gitweb
LU-19098 hsm: don't print progname twice with lhsmtool
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2011, 2017, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  *
13  * Lustre Metadata Server (mdd) routines
14  *
15  * Author: Wang Di <wangdi@intel.com>
16  */
17
18 #define DEBUG_SUBSYSTEM S_MDS
19
20 #include <obd_class.h>
21 #include <obd_support.h>
22 #include <lustre_mds.h>
23 #include <lustre_fid.h>
24 #include <lustre_lmv.h>
25 #include <lustre_idmap.h>
26 #include <lustre_crypto.h>
27 #include <uapi/linux/lustre/lgss.h>
28
29 #include "mdd_internal.h"
30
31 static const char dot[] = ".";
32 static const char dotdot[] = "..";
33
34 static struct lu_name lname_dotdot = {
35         .ln_name        = (char *) dotdot,
36         .ln_namelen     = sizeof(dotdot) - 1,
37 };
38
39 static inline int
40 mdd_name_check(const struct lu_env *env, struct mdd_device *m,
41                const struct lu_name *ln)
42 {
43         struct mdd_thread_info *info = mdd_env_info(env);
44         bool enc = info->mdi_pattr.la_valid & LA_FLAGS &&
45                 info->mdi_pattr.la_flags & LUSTRE_ENCRYPT_FL;
46
47         if (!lu_name_is_valid(ln))
48                 return -EINVAL;
49         else if (!enc && ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
50                 return -ENAMETOOLONG;
51         else
52                 return 0;
53 }
54
55 /* Get FID from name and parent */
56 static int
57 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
58              const struct lu_attr *pattr, const struct lu_name *lname,
59              struct lu_fid *fid, unsigned int may_mask)
60 {
61         const char *name = lname->ln_name;
62         const struct dt_key *key = (const struct dt_key *)name;
63         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
64         struct dt_object *dir = mdd_object_child(mdd_obj);
65         int rc;
66
67         ENTRY;
68
69         if (unlikely(mdd_is_dead_obj(mdd_obj)))
70                 RETURN(-ESTALE);
71
72         if (!mdd_object_exists(mdd_obj))
73                 RETURN(-ESTALE);
74
75         if (mdd_object_remote(mdd_obj)) {
76                 CDEBUG(D_INFO, "%s: Object "DFID" located on remote server\n",
77                        mdd_obj_dev_name(mdd_obj),
78                        PFID(mdd_object_fid(mdd_obj)));
79         }
80
81         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, may_mask,
82                                             DT_TGT_PARENT);
83         if (rc)
84                 RETURN(rc);
85
86         if (likely(dt_try_as_dir(env, dir, true)))
87                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
88         else
89                 rc = -ENOTDIR;
90
91         RETURN(rc);
92 }
93
94 int mdd_lookup(const struct lu_env *env,
95                struct md_object *pobj, const struct lu_name *lname,
96                struct lu_fid *fid, struct md_op_spec *spec)
97 {
98         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
99         int rc;
100
101         ENTRY;
102
103         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
104         if (rc != 0)
105                 RETURN(rc);
106
107         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
108                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
109         RETURN(rc);
110 }
111
112 /** Read the link EA into a temp buffer.
113  * Uses the mdd_thread_info::mdi_link_buf since it is generally large.
114  * A pointer to the buffer is stored in \a ldata::ld_buf.
115  *
116  * \retval 0 or error
117  */
118 static int __mdd_links_read(const struct lu_env *env,
119                             struct mdd_object *mdd_obj,
120                             struct linkea_data *ldata)
121 {
122         int rc;
123
124         if (!mdd_object_exists(mdd_obj))
125                 return -ENODATA;
126
127         /* First try a small buf */
128         LASSERT(env != NULL);
129         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_link_buf,
130                                                PAGE_SIZE);
131         if (ldata->ld_buf->lb_buf == NULL)
132                 return -ENOMEM;
133
134         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
135         if (rc == -ERANGE) {
136                 /* Buf was too small, figure out what we need. */
137                 lu_buf_free(ldata->ld_buf);
138                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
139                                    XATTR_NAME_LINK);
140                 if (rc < 0)
141                         return rc;
142                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
143                 if (ldata->ld_buf->lb_buf == NULL)
144                         return -ENOMEM;
145                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
146                                   XATTR_NAME_LINK);
147         }
148         if (rc < 0) {
149                 lu_buf_free(ldata->ld_buf);
150                 ldata->ld_buf = NULL;
151                 return rc;
152         }
153
154         return linkea_init(ldata);
155 }
156
157 int mdd_links_read(const struct lu_env *env,
158                    struct mdd_object *mdd_obj,
159                    struct linkea_data *ldata)
160 {
161         int rc;
162
163         rc = __mdd_links_read(env, mdd_obj, ldata);
164         if (!rc)
165                 rc = linkea_init(ldata);
166
167         return rc;
168 }
169
170 static int mdd_links_read_with_rec(const struct lu_env *env,
171                                    struct mdd_object *mdd_obj,
172                                    struct linkea_data *ldata)
173 {
174         int rc;
175
176         rc = __mdd_links_read(env, mdd_obj, ldata);
177         if (!rc)
178                 rc = linkea_init_with_rec(ldata);
179
180         return rc;
181 }
182
183 /**
184  * Get parent FID of the directory
185  *
186  * Read parent FID from linkEA, if that fails, then do lookup
187  * dotdot to get the parent FID.
188  *
189  * \param[in] env       execution environment
190  * \param[in] obj       object from which to find the parent FID
191  * \param[in] attr      attribute of the object
192  * \param[out] fid      fid to get the parent FID
193  *
194  * \retval              0 if getting the parent FID succeeds.
195  * \retval              negative errno if getting the parent FID fails.
196  */
197 static inline int mdd_parent_fid(const struct lu_env *env,
198                                  struct mdd_object *obj,
199                                  const struct lu_attr *attr,
200                                  struct lu_fid *fid)
201 {
202         struct mdd_thread_info *info = mdd_env_info(env);
203         struct linkea_data ldata = { NULL };
204         struct lu_buf *buf = &info->mdi_link_buf;
205         struct lu_name lname;
206         int rc = 0;
207
208         ENTRY;
209
210         LASSERTF(S_ISDIR(mdd_object_type(obj)),
211                  "%s: FID "DFID" is not a directory type = %o\n",
212                  mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)),
213                  mdd_object_type(obj));
214
215         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
216         if (buf->lb_buf == NULL)
217                 GOTO(lookup, rc = 0);
218
219         ldata.ld_buf = buf;
220         rc = mdd_links_read_with_rec(env, obj, &ldata);
221         if (rc != 0)
222                 GOTO(lookup, rc);
223
224         /* the obj is not locked, don't cache attributes */
225         mdd_invalidate(env, &obj->mod_obj);
226
227         LASSERT(ldata.ld_leh != NULL);
228         /* Directory should only have 1 parent */
229         if (ldata.ld_leh->leh_reccount > 1)
230                 GOTO(lookup, rc);
231
232         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
233
234         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
235         if (likely(fid_is_sane(fid)))
236                 RETURN(0);
237 lookup:
238         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
239         RETURN(rc);
240 }
241
242 /*
243  * For root fid use special function, which does not compare version component
244  * of fid. Version component is different for root fids on all MDTs.
245  */
246 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
247 {
248         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
249                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
250 }
251
252 /*
253  * return 1: if \a tfid is the fid of the ancestor of \a mo;
254  * return 0: if not;
255  * otherwise: values < 0, errors.
256  */
257 static int mdd_is_parent(const struct lu_env *env,
258                         struct mdd_device *mdd,
259                         struct mdd_object *mo,
260                         const struct lu_attr *attr,
261                         const struct lu_fid *tfid)
262 {
263         struct mdd_object *mp;
264         struct lu_fid *pfid;
265         int rc;
266
267         LASSERT(!lu_fid_eq(mdd_object_fid(mo), tfid));
268         pfid = &mdd_env_info(env)->mdi_fid;
269
270         if (mdd_is_root(mdd, mdd_object_fid(mo)))
271                 return 0;
272
273         if (mdd_is_root(mdd, tfid))
274                 return 1;
275
276         rc = mdd_parent_fid(env, mo, attr, pfid);
277         if (rc)
278                 return rc;
279
280         while (1) {
281                 if (lu_fid_eq(pfid, tfid))
282                         return 1;
283
284                 if (mdd_is_root(mdd, pfid))
285                         return 0;
286
287                 mp = mdd_object_find(env, mdd, pfid);
288                 if (IS_ERR(mp))
289                         return PTR_ERR(mp);
290
291                 if (!mdd_object_exists(mp)) {
292                         mdd_object_put(env, mp);
293                         return -ENOENT;
294                 }
295
296                 rc = mdd_parent_fid(env, mp, attr, pfid);
297                 mdd_object_put(env, mp);
298                 if (rc)
299                         return rc;
300         }
301
302         return 0;
303 }
304
305 /*
306  * No permission check is needed.
307  *
308  * returns 1: if fid is ancestor of @mo;
309  * returns 0: if fid is not an ancestor of @mo;
310  * returns < 0: if error
311  */
312 static int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
313                          const struct lu_fid *fid)
314 {
315         struct mdd_device *mdd = mdo2mdd(mo);
316         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
317         int rc;
318
319         ENTRY;
320
321         if (!mdd_object_exists(md2mdd_obj(mo)))
322                 RETURN(-ENOENT);
323
324         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
325                 RETURN(-ENOTDIR);
326
327         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
328         if (rc != 0)
329                 RETURN(rc);
330
331         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
332         RETURN(rc);
333 }
334
335 /*
336  * Check that @dir contains no entries except (possibly) dot and dotdot.
337  *
338  * Returns:
339  *
340  *             0        empty
341  *      -ENOTDIR        not a directory object
342  *    -ENOTEMPTY        not empty
343  *           -ve        other error
344  *
345  */
346 int mdd_dir_is_empty(const struct lu_env *env, struct mdd_object *dir)
347 {
348         struct dt_it     *it;
349         struct dt_object *obj;
350         const struct dt_it_ops *iops;
351         int result;
352
353         ENTRY;
354
355         obj = mdd_object_child(dir);
356         if (!dt_try_as_dir(env, obj, true))
357                 RETURN(-ENOTDIR);
358
359         iops = &obj->do_index_ops->dio_it;
360         it = iops->init(env, obj, LUDA_64BITHASH);
361         if (!IS_ERR(it)) {
362                 result = iops->get(env, it, (const struct dt_key *)"");
363                 if (result > 0) {
364                         int i;
365
366                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
367                                 result = iops->next(env, it);
368                         if (result == 0)
369                                 result = -ENOTEMPTY;
370                         else if (result == 1)
371                                 result = 0;
372                 } else if (result == 0)
373                         /*
374                          * Huh? Index contains no zero key?
375                          */
376                         result = -EIO;
377
378                 iops->put(env, it);
379                 iops->fini(env, it);
380         } else {
381                 result = PTR_ERR(it);
382                 /* -ENODEV means no valid stripe */
383                 if (result == -ENODEV)
384                         RETURN(0);
385         }
386         RETURN(result);
387 }
388
389 /*
390  * Determine if the target object can be hard linked, and right now it only
391  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
392  * directory nlink count might exceed the maximum link count(see
393  * osd_object_ref_add), so it only check nlink for non-directories.
394  *
395  * \param[in] env       thread environment
396  * \param[in] obj       object being linked to
397  * \param[in] la        attributes of \a obj
398  *
399  * \retval              0 if \a obj can be hard linked
400  * \retval              negative error if \a obj is a directory or has too
401  *                      many links
402  */
403 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
404                           const struct lu_attr *la)
405 {
406         struct mdd_device *m = mdd_obj2mdd_dev(obj);
407
408         ENTRY;
409
410         LASSERT(la != NULL);
411
412         /* Subdir count limitation can be broken through
413          * (see osd_object_ref_add), so only check non-directory here.
414          */
415         if (!S_ISDIR(la->la_mode) &&
416             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
417                 RETURN(-EMLINK);
418
419         RETURN(0);
420 }
421
422 /**
423  * Check whether it may create the cobj under the pobj.
424  *
425  * \param[in] env       execution environment
426  * \param[in] pobj      the parent directory
427  * \param[in] pattr     the attribute of the parent directory
428  * \param[in] cobj      the child to be created
429  * \param[in] check_perm        if check WRITE|EXEC permission for parent
430  *
431  * \retval              = 0 create the child under this dir is allowed
432  * \retval              negative errno create the child under this dir is
433  *                      not allowed
434  */
435 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
436                    const struct lu_attr *pattr, struct mdd_object *cobj,
437                    bool check_perm)
438 {
439         int rc = 0;
440
441         ENTRY;
442
443         if (cobj && mdd_object_exists(cobj))
444                 RETURN(-EEXIST);
445
446         if (mdd_is_dead_obj(pobj))
447                 RETURN(-ENOENT);
448
449         if (check_perm)
450                 rc = mdd_permission_internal_locked(env, pobj, pattr,
451                                                     MAY_WRITE | MAY_EXEC,
452                                                     DT_TGT_PARENT);
453         RETURN(rc);
454 }
455
456 /* Check whether can unlink from the pobj in the case of "cobj == NULL". */
457 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
458                    const struct lu_attr *pattr, const struct lu_attr *attr)
459 {
460         int rc;
461
462         ENTRY;
463
464         if (mdd_is_dead_obj(pobj))
465                 RETURN(-ENOENT);
466
467         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
468                 RETURN(-EPERM);
469
470         rc = mdd_permission_internal_locked(env, pobj, pattr,
471                                             MAY_WRITE | MAY_EXEC,
472                                             DT_TGT_PARENT);
473         if (rc != 0)
474                 RETURN(rc);
475
476         if (pattr->la_flags & LUSTRE_APPEND_FL)
477                 RETURN(-EPERM);
478
479         RETURN(rc);
480 }
481
482 /* pobj == NULL is remote ops case, under such case, pobj's
483  * VTX feature has been checked already, no need check again.
484  */
485 static inline int mdd_is_sticky(const struct lu_env *env,
486                                 struct mdd_object *pobj,
487                                 const struct lu_attr *pattr,
488                                 struct mdd_object *cobj,
489                                 const struct lu_attr *cattr)
490 {
491         struct lu_ucred *uc = lu_ucred_assert(env);
492
493         if (pobj != NULL) {
494                 LASSERT(pattr != NULL);
495                 if (!(pattr->la_mode & S_ISVTX) ||
496                     (pattr->la_uid == uc->uc_fsuid))
497                         return 0;
498         }
499
500         LASSERT(cattr != NULL);
501         if (cattr->la_uid == uc->uc_fsuid)
502                 return 0;
503
504         return !cap_raised(uc->uc_cap, CAP_FOWNER);
505 }
506
507 static int mdd_may_delete_entry(const struct lu_env *env,
508                                 struct mdd_object *pobj,
509                                 const struct lu_attr *pattr,
510                                 int check_perm)
511 {
512         ENTRY;
513
514         LASSERT(pobj != NULL);
515         if (!mdd_object_exists(pobj))
516                 RETURN(-ENOENT);
517
518         if (mdd_is_dead_obj(pobj))
519                 RETURN(-ENOENT);
520
521         if (check_perm) {
522                 int rc;
523
524                 rc = mdd_permission_internal_locked(env, pobj, pattr,
525                                             MAY_WRITE | MAY_EXEC,
526                                             DT_TGT_PARENT);
527                 if (rc)
528                         RETURN(rc);
529         }
530
531         if (pattr->la_flags & LUSTRE_APPEND_FL)
532                 RETURN(-EPERM);
533
534         RETURN(0);
535 }
536
537 /*
538  * Check whether it may delete the cobj from the pobj.
539  * pobj maybe NULL
540  */
541 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
542                    const struct lu_attr *tpattr, struct mdd_object *tobj,
543                    const struct lu_attr *tattr, const struct lu_attr *cattr,
544                    int check_perm, int check_empty)
545 {
546         int rc = 0;
547
548         ENTRY;
549
550         if (tpobj) {
551                 LASSERT(tpattr != NULL);
552                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
553                 if (rc != 0)
554                         RETURN(rc);
555         }
556
557         if (tobj == NULL)
558                 RETURN(0);
559
560         if (!mdd_object_exists(tobj))
561                 RETURN(-ENOENT);
562
563         if (mdd_is_dead_obj(tobj))
564                 RETURN(-ESTALE);
565
566         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
567                 RETURN(-EPERM);
568
569         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
570                 RETURN(-EPERM);
571
572         /* additional check the rename case */
573         if (cattr) {
574                 if (S_ISDIR(cattr->la_mode)) {
575                         if (!S_ISDIR(tattr->la_mode))
576                                 RETURN(-ENOTDIR);
577
578                         if (mdd_is_root(mdo2mdd(&tobj->mod_obj),
579                                         mdd_object_fid(tobj)))
580                                 RETURN(-EBUSY);
581                 } else if (S_ISDIR(tattr->la_mode))
582                         RETURN(-EISDIR);
583         }
584
585         if (S_ISDIR(tattr->la_mode) && check_empty)
586                 rc = mdd_dir_is_empty(env, tobj);
587
588         RETURN(rc);
589 }
590
591 /**
592  * Check whether it can create the link file(linked to @src_obj) under
593  * the target directory(@tgt_obj), and src_obj has been locked by
594  * mdd_write_lock.
595  *
596  * \param[in] env       execution environment
597  * \param[in] tgt_obj   the target directory
598  * \param[in] tattr     attributes of target directory
599  * \param[in] lname     the link name
600  * \param[in] src_obj   source object for link
601  * \param[in] cattr     attributes for source object
602  *
603  * \retval              = 0 it is allowed to create the link file under tgt_obj
604  * \retval              negative error not allowed to create the link file
605  */
606 static int mdd_link_sanity_check(const struct lu_env *env,
607                                  struct mdd_object *tgt_obj,
608                                  const struct lu_attr *tattr,
609                                  const struct lu_name *lname,
610                                  struct mdd_object *src_obj,
611                                  const struct lu_attr *cattr)
612 {
613         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
614         int rc = 0;
615
616         ENTRY;
617
618         if (!mdd_object_exists(src_obj))
619                 RETURN(-ENOENT);
620
621         if (mdd_is_dead_obj(src_obj))
622                 RETURN(-ESTALE);
623
624         /* Local ops, no lookup before link, check filename length here. */
625         rc = mdd_name_check(env, m, lname);
626         if (rc < 0)
627                 RETURN(rc);
628
629         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
630                 RETURN(-EPERM);
631
632         if (S_ISDIR(mdd_object_type(src_obj)))
633                 RETURN(-EPERM);
634
635         LASSERT(src_obj != tgt_obj);
636         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
637         if (rc != 0)
638                 RETURN(rc);
639
640         rc = __mdd_may_link(env, src_obj, cattr);
641
642         RETURN(rc);
643 }
644
645 static int __mdd_index_delete_only(const struct lu_env *env,
646                                    struct mdd_object *pobj,
647                                    const char *name, struct thandle *handle)
648 {
649         struct dt_object *next = mdd_object_child(pobj);
650         int rc;
651
652         ENTRY;
653
654         if (dt_try_as_dir(env, next, true))
655                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
656         else
657                 rc = -ENOTDIR;
658
659         RETURN(rc);
660 }
661
662 static int __mdd_index_insert_only(const struct lu_env *env,
663                                    struct mdd_object *pobj,
664                                    const struct lu_fid *lf, __u32 type,
665                                    const char *name, struct thandle *handle)
666 {
667         struct dt_object *next = mdd_object_child(pobj);
668         int rc;
669
670         ENTRY;
671
672         if (dt_try_as_dir(env, next, true)) {
673                 struct dt_insert_rec *rec = &mdd_env_info(env)->mdi_dt_rec;
674
675                 rec->rec_fid = lf;
676                 rec->rec_type = type;
677                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
678                                (const struct dt_key *)name, handle);
679         } else {
680                 rc = -ENOTDIR;
681         }
682         RETURN(rc);
683 }
684
685 /* insert named index, add reference if isdir */
686 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
687                               const struct lu_fid *lf, __u32 type,
688                               const char *name, struct thandle *handle)
689 {
690         int rc;
691
692         ENTRY;
693
694         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
695         if (rc == 0 && S_ISDIR(type)) {
696                 mdd_write_lock(env, pobj, DT_TGT_PARENT);
697                 mdo_ref_add(env, pobj, handle);
698                 mdd_write_unlock(env, pobj);
699         }
700
701         RETURN(rc);
702 }
703
704 /* delete named index, drop reference if isdir */
705 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
706                               const char *name, int is_dir,
707                               struct thandle *handle)
708 {
709         int rc;
710
711         ENTRY;
712
713         rc = __mdd_index_delete_only(env, pobj, name, handle);
714         if (rc == 0 && is_dir) {
715                 mdd_write_lock(env, pobj, DT_TGT_PARENT);
716                 mdo_ref_del(env, pobj, handle);
717                 mdd_write_unlock(env, pobj);
718         }
719
720         RETURN(rc);
721 }
722
723 static int mdd_llog_record_calc_size(const struct lu_env *env,
724                                      const struct lu_name *tname,
725                                      const struct lu_name *sname)
726 {
727         const struct lu_ucred   *uc = lu_ucred(env);
728         enum changelog_rec_flags clf_flags = CLF_EXTRA_FLAGS;
729         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
730
731         if (sname != NULL)
732                 clf_flags |= CLF_RENAME;
733
734         if (uc != NULL && uc->uc_jobid[0] != '\0')
735                 clf_flags |= CLF_JOBID;
736
737         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
738                              changelog_rec_offset(clf_flags, crfe) +
739                              (tname != NULL ? tname->ln_namelen : 0) +
740                              (sname != NULL ? 1 + sname->ln_namelen : 0));
741 }
742
743 int mdd_declare_changelog_store(const struct lu_env *env,
744                                 struct mdd_device *mdd,
745                                 enum changelog_rec_type type,
746                                 const struct lu_name *tname,
747                                 const struct lu_name *sname,
748                                 struct thandle *handle)
749 {
750         struct obd_device *obd = mdd2obd_dev(mdd);
751         struct llog_ctxt *ctxt;
752         struct llog_rec_hdr rec_hdr;
753         struct thandle *llog_th;
754         int rc;
755
756         if (!mdd_changelog_enabled(env, mdd, type))
757                 return 0;
758
759         rec_hdr.lrh_len = mdd_llog_record_calc_size(env, tname, sname);
760         rec_hdr.lrh_type = CHANGELOG_REC;
761
762         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
763         if (ctxt == NULL)
764                 return -ENXIO;
765
766         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
767         if (IS_ERR(llog_th))
768                 GOTO(out_put, rc = PTR_ERR(llog_th));
769
770         rc = llog_declare_add(env, ctxt->loc_handle, &rec_hdr, llog_th);
771
772 out_put:
773         llog_ctxt_put(ctxt);
774
775         return rc;
776 }
777
778 /* The locking here is a bit tricky. For a CHANGELOG_REC the function
779  * drops loghandle->lgh_lock for a performance reasons. All dt_write()
780  * are used own offset, so it is safe.
781  * For other records general function is called and it doesnot drop
782  * a semaphore. The callers are changelog catalog records and initialisation
783  * records. llog_cat_new_log->llog_write_rec->mdd_changelog_write_rec()
784  *
785  * Since dt_record_write() could be reordered, rec1|rec2|0x0|rec4 could be
786  * at memory, reader should care about it. When the th is commited it is
787  * impossible to have a hole, since reordered records have the same th.
788  */
789 int mdd_changelog_write_rec(const struct lu_env *env,
790                             struct llog_handle *loghandle,
791                             struct llog_rec_hdr *r,
792                             struct llog_cookie *cookie,
793                             int idx, struct thandle *th)
794 {
795         int rc;
796         static struct thandle *saved_th;
797
798         CDEBUG(D_TRACE, "Adding rec %u type %u to "DFID" flags %x count %d\n",
799                idx, r->lrh_type, PLOGID(&loghandle->lgh_id),
800                loghandle->lgh_hdr->llh_flags, loghandle->lgh_hdr->llh_count);
801
802         if (r->lrh_type == CHANGELOG_REC) {
803                 struct mdd_device *mdd;
804                 struct llog_changelog_rec *rec;
805                 size_t left;
806                 __u32 chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
807                 struct dt_object *o = loghandle->lgh_obj;
808                 loff_t offset;
809                 struct lu_buf lgi_buf;
810
811                 left = chunk_size - (loghandle->lgh_cur_offset &
812                                      (chunk_size - 1));
813
814                 mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
815                 rec = container_of(r, struct llog_changelog_rec, cr_hdr);
816
817                 /* Don't use padding records because it require a slot at header
818                  * so previous result of checking llog_is_full(loghandle)
819                  * would be invalid, leave zeroes at the end of block.
820                  * A reader would care about it.
821                  */
822                 if (left != 0 && left < r->lrh_len)
823                         loghandle->lgh_cur_offset += left;
824
825                 offset = loghandle->lgh_cur_offset;
826                 loghandle->lgh_cur_offset += r->lrh_len;
827                 r->lrh_index = ++loghandle->lgh_last_idx;
828
829                 spin_lock(&mdd->mdd_cl.mc_lock);
830                 rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
831                 spin_unlock(&mdd->mdd_cl.mc_lock);
832
833                 /* drop the loghandle semaphore for parallel writes */
834                 up_write(&loghandle->lgh_lock);
835
836                 REC_TAIL(r)->lrt_len = r->lrh_len;
837                 REC_TAIL(r)->lrt_index = r->lrh_index;
838
839                 lgi_buf.lb_len = rec->cr_hdr.lrh_len;
840                 lgi_buf.lb_buf = rec;
841
842                 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_CHANGELOG_FAIL_WRITE) &&
843                     (rec->cr.cr_index % (cfs_fail_val + 1)) == 0)
844                         rc = -EIO;
845                 else
846                         rc = dt_record_write(env, o, &lgi_buf, &offset, th);
847
848                 if (rc) {
849                         CERROR("%s: failed to write changelog record file "DFID" rec idx %u off %llu chnlg idx %llu: rc = %d\n",
850                                loghandle->lgh_ctxt->loc_obd->obd_name,
851                                PFID(lu_object_fid(&o->do_lu)), r->lrh_index,
852                                offset, rec->cr.cr_index, rc);
853                         return rc;
854                 }
855
856                 /* mark index at bitmap after successful write, increment count,
857                  * and lrt_index with a last index. Use a lgh_hdr_lock for
858                  * a synchronization with llog_cancel.
859                  */
860                 spin_lock(&loghandle->lgh_hdr_lock);
861                 rc = __test_and_set_bit_le(r->lrh_index,
862                                            LLOG_HDR_BITMAP(loghandle->lgh_hdr));
863                 LASSERTF(!rc,
864                          "%s: index %u already set in llog bitmap "DFID"\n",
865                          loghandle->lgh_ctxt->loc_obd->obd_name,
866                          r->lrh_index, PLOGID(&loghandle->lgh_id));
867                 loghandle->lgh_hdr->llh_count++;
868                 if (LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index < r->lrh_index)
869                         LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index =
870                                 r->lrh_index;
871                 spin_unlock(&loghandle->lgh_hdr_lock);
872
873                 if (unlikely(th != saved_th)) {
874                         CDEBUG(D_OTHER, "%s: wrote rec %u "DFID" count %d\n",
875                                loghandle->lgh_ctxt->loc_obd->obd_name,
876                                r->lrh_index, PLOGID(&loghandle->lgh_id),
877                                loghandle->lgh_hdr->llh_count);
878                         saved_th = th;
879                 }
880                 lgi_buf.lb_len = loghandle->lgh_hdr_size;
881                 lgi_buf.lb_buf = loghandle->lgh_hdr;
882                 offset = 0;
883                 CDEBUG(D_TRACE, "%s: writing header "DFID"\n",
884                        loghandle->lgh_ctxt->loc_obd->obd_name,
885                        PLOGID(&loghandle->lgh_id));
886                 /* full header write, it is a local. For a mapped bh
887                  * it is memcpy() only. Probably it could be delayed as work.
888                  */
889                 rc = dt_record_write(env, o, &lgi_buf, &offset, th);
890         } else {
891                 rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
892                                                 cookie, idx, th);
893         }
894         if (rc < 0)
895                 CERROR("%s: failed to write changelog record file "DFID" count %d offset %llu: rc = %d\n",
896                        loghandle->lgh_ctxt->loc_obd->obd_name,
897                        PLOGID(&loghandle->lgh_id),
898                        loghandle->lgh_hdr->llh_count, loghandle->lgh_cur_offset,
899                        rc);
900
901         return rc;
902 }
903
904 /**
905  * Checks that changelog consumes safe amount of space comparing
906  * with FS free space
907  *
908  * \param env - current lu_env
909  * \param mdd - current MDD device
910  * \param lgh - changelog catalog llog handle
911  * \param estimate - get exact llog size or estimate it.
912  *
913  * \retval true/false
914  */
915 bool mdd_changelog_is_space_safe(const struct lu_env *env,
916                                  struct mdd_device *mdd,
917                                  struct llog_handle *lgh,
918                                  bool estimate)
919 {
920         struct obd_statfs sfs;
921         unsigned long long free_space_limit;
922         unsigned long long llog_size;
923         int rc;
924
925         rc = dt_statfs(env, mdd->mdd_bottom, &sfs);
926         if (rc)
927                 /* check is ignored if OSD is not healthy for any reason */
928                 return true;
929
930         /* if changelog consumes more than 1/4 of available space then start
931          * emergency cleanup.
932          */
933         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_CHANGELOG_ENOSPC))
934                 free_space_limit = cfs_fail_val;
935         else
936                 free_space_limit = (sfs.os_bfree * sfs.os_bsize) >> 2;
937
938         /* if \estimate parameter is used then calculate llog size from
939          * number of used catalog entries and plain llog maximum size.
940          * Plain llog maximum size if set as 1/64 of FS free space limited
941          * by 128MB as maximum and 2MB as minimum, see llog_cat_new_log()
942          * Estimation helps to avoid full llog processing to get exact size
943          * by llog_cat_size().
944          */
945         if (estimate) {
946                 /* use 1/64 of FS size but keep it between 2MB and 128MB */
947                 llog_size = clamp_t(unsigned long long,
948                                     (sfs.os_blocks * sfs.os_bsize) >> 6,
949                                     2 << 20, 128 << 20);
950                 /* llog_cat_free_space() gives free slots, we need occupied,
951                  * so subtruct free from total slots minus one for header
952                  */
953                 llog_size *= LLOG_HDR_BITMAP_SIZE(lgh->lgh_hdr) - 1 -
954                              llog_cat_free_space(lgh);
955         } else {
956                 /* get exact llog size */
957                 llog_size = llog_cat_size(env, lgh);
958         }
959         CDEBUG(D_HA, "%s:%s changelog size is %lluMB, space limit is %lluMB\n",
960                mdd2obd_dev(mdd)->obd_name, estimate ? " estimated" : "",
961                llog_size >> 20, free_space_limit >> 20);
962
963         if (llog_size > free_space_limit) {
964                 CWARN("%s: changelog uses %lluMB with %lluMB space limit\n",
965                       mdd2obd_dev(mdd)->obd_name, llog_size >> 20,
966                       free_space_limit >> 20);
967                 return false;
968         }
969
970         return true;
971 }
972
973 /**
974  * Checks if there is enough space in changelog itself and in FS and force
975  * emergency changelog cleanup if needed. It will purge users one by one
976  * from the oldest one while emergency conditions are true.
977  *
978  * \param env - current lu_env
979  * \param mdd - current MDD device
980  * \param lgh - changelog catalog llog handle
981  *
982  * \retval true if emergency cleanup is needed for changelog
983  */
984 static bool mdd_changelog_emrg_cleanup(const struct lu_env *env,
985                                        struct mdd_device *mdd,
986                                        struct llog_handle *lgh)
987 {
988         unsigned long free_entries = llog_cat_free_space(lgh);
989
990         /* free space GC is disabled or is in progress already */
991         if (!mdd->mdd_changelog_free_space_gc || mdd->mdd_changelog_emrg_gc)
992                 return false;
993
994         if (free_entries <= mdd->mdd_changelog_min_free_cat_entries) {
995                 CWARN("%s: changelog has only %lu free catalog entries\n",
996                       mdd2obd_dev(mdd)->obd_name, free_entries);
997                 mdd->mdd_changelog_emrg_gc = true;
998                 return true;
999         }
1000
1001         if (!mdd_changelog_is_space_safe(env, mdd, lgh, true)) {
1002                 mdd->mdd_changelog_emrg_gc = true;
1003                 return true;
1004         }
1005
1006         return false;
1007 }
1008
1009 static bool mdd_changelog_need_gc(const struct lu_env *env,
1010                                   struct mdd_device *mdd,
1011                                   struct llog_handle *lgh)
1012 {
1013         struct mdd_changelog *mc = &mdd->mdd_cl;
1014
1015         return mdd_changelog_emrg_cleanup(env, mdd, lgh) ||
1016                mdd_changelog_is_too_idle(mdd, mc->mc_minrec, mc->mc_mintime) ||
1017                CFS_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD);
1018 }
1019
1020 /** Add a changelog entry \a rec to the changelog llog
1021  * \param mdd
1022  * \param rec
1023  * \param handle - currently ignored since llogs start their own transaction;
1024  *              this will hopefully be fixed in llog rewrite
1025  * \retval 0 ok
1026  */
1027 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
1028                         struct llog_changelog_rec *rec, struct thandle *th)
1029 {
1030         struct obd_device *obd = mdd2obd_dev(mdd);
1031         struct llog_ctxt *ctxt;
1032         struct thandle *llog_th;
1033         int rc;
1034         bool need_gc;
1035
1036         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
1037                                             changelog_rec_varsize(&rec->cr));
1038
1039         /* llog_lvfs_write_rec sets the llog tail len */
1040         rec->cr_hdr.lrh_type = CHANGELOG_REC;
1041         rec->cr.cr_time = cl_time();
1042
1043         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
1044         if (ctxt == NULL)
1045                 return -ENXIO;
1046
1047         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
1048         if (IS_ERR(llog_th))
1049                 GOTO(out_put, rc = PTR_ERR(llog_th));
1050
1051         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_CHANGELOG_REORDER, cfs_fail_val);
1052         /* nested journal transaction */
1053         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
1054
1055         /* time to recover some space ?? */
1056         if (likely(!mdd->mdd_changelog_gc ||
1057                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
1058                    mdd->mdd_changelog_min_gc_interval >=
1059                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
1060                 /* save a spin_lock trip */
1061                 goto out_put;
1062
1063         if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_IDX_PUMP)) {
1064                 spin_lock(&mdd->mdd_cl.mc_lock);
1065                 mdd->mdd_cl.mc_index += cfs_fail_val;
1066                 spin_unlock(&mdd->mdd_cl.mc_lock);
1067         }
1068
1069         need_gc = mdd_changelog_need_gc(env, mdd, ctxt->loc_handle);
1070         spin_lock(&mdd->mdd_cl.mc_lock);
1071         if (likely(mdd->mdd_changelog_gc &&
1072                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
1073                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
1074                         mdd->mdd_changelog_min_gc_interval)) {
1075                 if (unlikely(need_gc)) {
1076                         CWARN("%s: %s starting changelog garbage collection\n",
1077                               obd->obd_name,
1078                               CFS_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
1079                               " simulate" : "");
1080                         /* indicate further kthread run will occur outside
1081                          * right after current journal transaction filling has
1082                          * completed
1083                          */
1084                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
1085                 }
1086                 /* next check in mdd_changelog_min_gc_interval anyway */
1087                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
1088         }
1089         spin_unlock(&mdd->mdd_cl.mc_lock);
1090 out_put:
1091         llog_ctxt_put(ctxt);
1092         if (rc > 0)
1093                 rc = 0;
1094         return rc;
1095 }
1096
1097 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
1098                                          const struct lu_fid *sfid,
1099                                          const struct lu_fid *spfid,
1100                                          const struct lu_name *sname)
1101 {
1102         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
1103         size_t extsize;
1104
1105         LASSERT(sfid != NULL);
1106         LASSERT(spfid != NULL);
1107         LASSERT(sname != NULL);
1108
1109         extsize = sname->ln_namelen + 1;
1110
1111         rnm->cr_sfid = *sfid;
1112         rnm->cr_spfid = *spfid;
1113
1114         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
1115         strscpy(changelog_rec_sname(rec), sname->ln_name, extsize);
1116         rec->cr_namelen += extsize;
1117 }
1118
1119 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
1120 {
1121         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
1122
1123         if (jobid == NULL || jobid[0] == '\0')
1124                 return;
1125
1126         strscpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
1127 }
1128
1129 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
1130 {
1131         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
1132
1133         ef->cr_extra_flags = eflags;
1134 }
1135
1136 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
1137                                     __u64 uid, __u64 gid)
1138 {
1139         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
1140
1141         uidgid->cr_uid = uid;
1142         uidgid->cr_gid = gid;
1143 }
1144
1145 /* To support the new large NID structure we use all the space in
1146  * struct changelog_ext_nid to store struct lnet_nid.
1147  */
1148 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
1149                                  const struct lnet_nid *nid)
1150 {
1151         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
1152
1153         BUILD_BUG_ON(sizeof(*clnid) < sizeof(*nid));
1154         memcpy(clnid, nid, sizeof(*nid));
1155 }
1156
1157 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, u32 flags)
1158 {
1159         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
1160
1161         omd->cr_openflags = flags;
1162 }
1163
1164 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
1165                                    const char *xattr_name)
1166 {
1167         struct changelog_ext_xattr *xattr = changelog_rec_xattr(rec);
1168
1169         strscpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
1170 }
1171
1172 /**
1173  * Set the parent FID at \a pfid for a namespace change changelog record, using
1174  * XATTR_NAME_LMV and linkEA from the remote object to obtain the correct
1175  * parent FID for striped directories
1176  *
1177  * \param[in] env - environment
1178  * \param[in] mdd - mdd device
1179  * \param[in] parent - parent object
1180  * \param[in] pattr - parent attribute
1181  * \param[out] pfid - parent fid
1182  *
1183  * \retval 0 success
1184  * \retval -errno failure
1185  */
1186 static int mdd_changelog_ns_pfid_set(const struct lu_env *env,
1187                                      struct mdd_device *mdd,
1188                                      struct mdd_object *parent,
1189                                      const struct lu_attr *pattr,
1190                                      struct lu_fid *pfid)
1191 {
1192         int rc = 0;
1193
1194         /* Certain userspace tools might rely on the previous behavior of
1195          * displaying the shard's parent FID, on some changelog records related
1196          * to striped directories, so use that for compatibility if needed
1197          */
1198         if (mdd->mdd_cl.mc_enable_shard_pfid) {
1199                 *pfid = *mdd_object_fid(parent);
1200                 return 0;
1201         }
1202
1203         if (!fid_is_zero(&parent->mod_striped_pfid)) {
1204                 *pfid = parent->mod_striped_pfid;
1205                 return 0;
1206         }
1207
1208         /* is the parent dir striped? */
1209         rc = mdo_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LMV);
1210         if (rc == -ENODATA) {
1211                 *pfid = *mdd_object_fid(parent);
1212                 parent->mod_striped_pfid = *pfid;
1213                 return 0;
1214         }
1215
1216         if (rc < 0)
1217                 return rc;
1218
1219         LASSERT(!mdd_is_root(mdo2mdd(&parent->mod_obj),
1220                              mdd_object_fid(parent)));
1221
1222         /* hide shard FID */
1223         rc = mdd_parent_fid(env, parent, pattr, pfid);
1224         if (!rc)
1225                 parent->mod_striped_pfid = *pfid;
1226
1227         return rc;
1228 }
1229
1230 /* The digested form is made of a FID (16 bytes) followed by the second-to-last
1231  * ciphertext block (16 bytes), so a total length of 32 bytes.
1232  */
1233 /* Must be identical to ll_digest_filename in llite_internal.h */
1234 struct changelog_digest_filename {
1235         struct lu_fid   cdf_fid;
1236         char            cdf_excerpt[LL_CRYPTO_BLOCK_SIZE];
1237 };
1238
1239 /**
1240  * Utility function to process filename in changelog
1241  *
1242  * \param[in] name     file name
1243  * \param[in] namelen  file name len
1244  * \param[in] fid      file FID
1245  * \param[in] enc      is object encrypted?
1246  * \param[out]ln       pointer to the struct lu_name to hold the real name
1247  *
1248  * If file is not encrypted, output name is just the file name.
1249  * If file is encrypted, file name needs to be decoded then digested if the name
1250  * is also encrypted. In this case a new buffer is allocated, and ln->ln_name
1251  * needs to be freed by the caller.
1252  *
1253  * \retval   0, on success
1254  * \retval -ve, on error
1255  */
1256 static int changelog_name2digest(const char *name, int namelen,
1257                                  const struct lu_fid *fid,
1258                                  bool enc, struct lu_name *ln)
1259 {
1260         struct changelog_digest_filename *digest = NULL;
1261         char *buf = NULL, *bufout = NULL, *p, *q;
1262         int len, bufoutlen;
1263         int rc = 0;
1264
1265         ENTRY;
1266
1267         ln->ln_name = name;
1268         ln->ln_namelen = namelen;
1269
1270         if (!enc)
1271                 GOTO(out, rc);
1272
1273         /* now we know file is encrypted */
1274         if (strnchr(name, namelen, '=')) {
1275                 /* only proceed to critical decode if
1276                  * encrypted name contains espace char '='
1277                  */
1278                 buf = kmalloc(namelen, GFP_NOFS);
1279                 if (!buf)
1280                         GOTO(out, rc = -ENOMEM);
1281
1282                 namelen = critical_decode(name, namelen, buf);
1283                 ln->ln_name = buf;
1284                 ln->ln_namelen = namelen;
1285         }
1286
1287         p = (char *)ln->ln_name;
1288         len = namelen;
1289         while (len--) {
1290                 if (!isprint(*p++))
1291                         break;
1292         }
1293
1294         /* len == -1 means we went through the whole decoded name without
1295          * finding any non-printable character, so consider it is not encrypted
1296          */
1297         if (len == -1)
1298                 GOTO(out, rc);
1299
1300         /* now we know the name has some non-printable characters */
1301         if (namelen > LL_CRYPTO_BLOCK_SIZE * 2) {
1302                 if (!fid)
1303                         GOTO(out, rc = -EPROTO);
1304
1305                 OBD_ALLOC_PTR(digest);
1306                 if (!digest)
1307                         GOTO(out, rc = -ENOMEM);
1308
1309                 digest->cdf_fid = *fid;
1310                 memcpy(digest->cdf_excerpt,
1311                        LLCRYPT_EXTRACT_DIGEST(ln->ln_name, ln->ln_namelen),
1312                        LL_CRYPTO_BLOCK_SIZE);
1313                 p = (char *)digest;
1314                 len = sizeof(*digest);
1315         } else {
1316                 p = (char *)ln->ln_name;
1317                 len = ln->ln_namelen;
1318         }
1319
1320         bufoutlen = BASE64URL_CHARS(len) + 2;
1321         bufout = kmalloc(digest ? bufoutlen + 1 : bufoutlen, GFP_NOFS);
1322         if (!bufout)
1323                 GOTO(free_digest, rc = -ENOMEM);
1324
1325         q = bufout;
1326         if (digest)
1327                 *q++ = LLCRYPT_DIGESTED_CHAR;
1328         /* beware that gss_base64url_encode adds a trailing space */
1329         gss_base64url_encode(&q, &bufoutlen, (__u8 *)p, len);
1330         if (bufoutlen == -1) {
1331                 kfree(bufout);
1332         } else {
1333                 kfree(buf);
1334                 ln->ln_name = bufout;
1335                 ln->ln_namelen = q - bufout - 1;
1336         }
1337
1338 free_digest:
1339         OBD_FREE_PTR(digest);
1340 out:
1341         RETURN(rc);
1342 }
1343
1344 /** Store a namespace change changelog record
1345  * If this fails, we must fail the whole transaction; we don't
1346  * want the change to commit without the log entry.
1347  * \param target - mdd_object of change
1348  * \param parent - target parent object
1349  * \param pattr - target parent attribute
1350  * \param sfid - source object fid
1351  * \param sparent - source parent object
1352  * \param spattr - source parent attribute
1353  * \param tname - target name string
1354  * \param sname - source name string
1355  * \param handle - transaction handle
1356  */
1357 int mdd_changelog_ns_store(const struct lu_env *env,
1358                            struct mdd_device *mdd,
1359                            enum changelog_rec_type type,
1360                            enum changelog_rec_flags clf_flags,
1361                            struct mdd_object *target,
1362                            struct mdd_object *parent,
1363                            const struct lu_attr *pattr,
1364                            const struct lu_fid *sfid,
1365                            struct mdd_object *sparent,
1366                            const struct lu_attr *spattr,
1367                            const struct lu_name *tname,
1368                            const struct lu_name *sname,
1369                            struct thandle *handle)
1370 {
1371         struct mdd_thread_info *info = mdd_env_info(env);
1372         struct lu_name *ltname = NULL, *lsname = NULL;
1373         const struct lu_ucred *uc = lu_ucred(env);
1374         struct llog_changelog_rec *rec;
1375         __u64 xflags = CLFE_INVALID;
1376         struct lu_fid *tfid = NULL;
1377         struct lu_buf *buf;
1378         int reclen;
1379         bool enc;
1380         int rc = 0;
1381
1382         ENTRY;
1383
1384         if (!mdd_changelog_enabled(env, mdd, type))
1385                 RETURN(0);
1386
1387         LASSERT(S_ISDIR(mdd_object_type(parent)));
1388         LASSERT(tname != NULL);
1389         LASSERT(handle != NULL);
1390
1391         if (tname) {
1392                 OBD_ALLOC_PTR(ltname);
1393                 if (!ltname)
1394                         GOTO(out, rc = -ENOMEM);
1395
1396                 if (sname) {
1397                         enc = info->mdi_tpattr.la_valid & LA_FLAGS &&
1398                                 info->mdi_tpattr.la_flags & LUSTRE_ENCRYPT_FL;
1399                         tfid = (struct lu_fid *)sfid;
1400                 } else {
1401                         enc = info->mdi_pattr.la_valid & LA_FLAGS &&
1402                                 info->mdi_pattr.la_flags & LUSTRE_ENCRYPT_FL;
1403                         tfid = (struct lu_fid *)mdd_object_fid(target);
1404                 }
1405                 rc = changelog_name2digest(tname->ln_name, tname->ln_namelen,
1406                                            tfid, enc, ltname);
1407                 if (rc)
1408                         GOTO(out_ltname, rc);
1409         }
1410         if (sname) {
1411                 OBD_ALLOC_PTR(lsname);
1412                 if (!lsname)
1413                         GOTO(out_ltname, rc = -ENOMEM);
1414
1415                 enc = info->mdi_pattr.la_valid & LA_FLAGS &&
1416                         info->mdi_pattr.la_flags & LUSTRE_ENCRYPT_FL;
1417                 rc = changelog_name2digest(sname->ln_name, sname->ln_namelen,
1418                                            tfid, enc, lsname);
1419                 if (rc)
1420                         GOTO(out_lsname, rc);
1421         }
1422
1423         reclen = mdd_llog_record_calc_size(env, ltname, lsname);
1424         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_chlg_buf, reclen);
1425         if (buf->lb_buf == NULL)
1426                 GOTO(out_lsname, rc = -ENOMEM);
1427         rec = buf->lb_buf;
1428
1429         clf_flags &= CLF_FLAGMASK;
1430         clf_flags |= CLF_EXTRA_FLAGS;
1431
1432         if (uc) {
1433                 if (uc->uc_jobid[0] != '\0')
1434                         clf_flags |= CLF_JOBID;
1435                 xflags |= CLFE_UIDGID;
1436                 xflags |= CLFE_NID;
1437                 xflags |= CLFE_NID_BE;
1438         }
1439
1440         if (lsname != NULL)
1441                 clf_flags |= CLF_RENAME;
1442         else
1443                 clf_flags |= CLF_VERSION;
1444
1445         rec->cr.cr_flags = clf_flags;
1446
1447         if (clf_flags & CLF_EXTRA_FLAGS) {
1448                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
1449                 if (xflags & CLFE_UIDGID)
1450                         mdd_changelog_rec_extra_uidgid(&rec->cr,
1451                                                        uc->uc_uid, uc->uc_gid);
1452                 if (xflags & CLFE_NID)
1453                         mdd_changelog_rec_extra_nid(&rec->cr, &uc->uc_nid);
1454         }
1455
1456         rec->cr.cr_type = (__u32)type;
1457
1458         rc = mdd_changelog_ns_pfid_set(env, mdd, parent, pattr,
1459                                        &rec->cr.cr_pfid);
1460         if (rc < 0)
1461                 GOTO(out_lsname, rc);
1462
1463         rec->cr.cr_namelen = ltname->ln_namelen;
1464         memcpy(changelog_rec_name(&rec->cr), ltname->ln_name,
1465                ltname->ln_namelen);
1466
1467         if (clf_flags & CLF_RENAME) {
1468                 struct lu_fid spfid;
1469
1470                 rc = mdd_changelog_ns_pfid_set(env, mdd, sparent, spattr,
1471                                                &spfid);
1472                 if (rc < 0)
1473                         GOTO(out_lsname, rc);
1474
1475                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, &spfid, lsname);
1476         }
1477
1478         if (clf_flags & CLF_JOBID)
1479                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1480
1481         if (likely(target != NULL)) {
1482                 rec->cr.cr_tfid = *mdd_object_fid(target);
1483                 target->mod_cltime = ktime_get();
1484         } else {
1485                 fid_zero(&rec->cr.cr_tfid);
1486         }
1487
1488         rc = mdd_changelog_store(env, mdd, rec, handle);
1489         if (rc < 0) {
1490                 CERROR("%s: cannot store changelog record: type = %d, name = '%s', t = "
1491                        DFID", p = "DFID": rc = %d\n",
1492                        mdd2obd_dev(mdd)->obd_name, type, ltname->ln_name,
1493                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1494                 GOTO(out_lsname, rc = -EFAULT);
1495         }
1496
1497 out_lsname:
1498         if (lsname && lsname->ln_name != sname->ln_name)
1499                 kfree(lsname->ln_name);
1500         OBD_FREE_PTR(lsname);
1501 out_ltname:
1502         if (ltname && ltname->ln_name != tname->ln_name)
1503                 kfree(ltname->ln_name);
1504         OBD_FREE_PTR(ltname);
1505 out:
1506         RETURN(rc);
1507 }
1508
1509 static int __mdd_links_add(const struct lu_env *env,
1510                            struct mdd_object *mdd_obj,
1511                            struct linkea_data *ldata,
1512                            const struct lu_name *lname,
1513                            const struct lu_fid *pfid,
1514                            int first, int check)
1515 {
1516         /* cattr is set in mdd_link */
1517         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1518         int rc;
1519
1520         if (ldata->ld_leh == NULL) {
1521                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1522                 if (rc) {
1523                         if (rc != -ENODATA)
1524                                 return rc;
1525                         rc = linkea_data_new(ldata,
1526                                              &mdd_env_info(env)->mdi_link_buf);
1527                         if (rc)
1528                                 return rc;
1529                 }
1530         }
1531
1532         if (check) {
1533                 rc = linkea_links_find(ldata, lname, pfid);
1534                 if (rc && rc != -ENOENT)
1535                         return rc;
1536                 if (rc == 0)
1537                         return -EEXIST;
1538         }
1539
1540         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1541                 struct lu_fid *tfid = &mdd_env_info(env)->mdi_fid2;
1542
1543                 *tfid = *pfid;
1544                 tfid->f_ver = ~0;
1545                 linkea_add_buf(ldata, lname, tfid, false);
1546         }
1547
1548         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1549                 linkea_add_buf(ldata, lname, pfid, false);
1550
1551         /* For encrypted file, we want to limit number of hard links to what
1552          * linkEA can contain. So ask to return error in case of overflow.
1553          * Currently linkEA stores 4KiB of links, that is 14 NAME_MAX links,
1554          * or 119 16-byte names.
1555          */
1556         return linkea_add_buf(ldata, lname, pfid,
1557                               cattr->la_valid & LA_FLAGS &&
1558                               cattr->la_flags & LUSTRE_ENCRYPT_FL);
1559 }
1560
1561 static int __mdd_links_del(const struct lu_env *env,
1562                            struct mdd_object *mdd_obj,
1563                            struct linkea_data *ldata,
1564                            const struct lu_name *lname,
1565                            const struct lu_fid *pfid)
1566 {
1567         /* cattr is set in mdd_link */
1568         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1569         int rc;
1570
1571         if (ldata->ld_leh == NULL) {
1572                 rc = mdd_links_read(env, mdd_obj, ldata);
1573                 if (rc)
1574                         return rc;
1575         }
1576
1577         rc = linkea_links_find(ldata, lname, pfid);
1578         if (rc)
1579                 return rc;
1580
1581         linkea_del_buf(ldata, lname,
1582                        cattr->la_valid & LA_FLAGS &&
1583                        cattr->la_flags & LUSTRE_ENCRYPT_FL);
1584         return 0;
1585 }
1586
1587 static int mdd_linkea_prepare(const struct lu_env *env,
1588                               struct mdd_object *mdd_obj,
1589                               const struct lu_fid *oldpfid,
1590                               const struct lu_name *oldlname,
1591                               const struct lu_fid *newpfid,
1592                               const struct lu_name *newlname,
1593                               int first, int check,
1594                               struct linkea_data *ldata)
1595 {
1596         int rc = 0;
1597
1598         ENTRY;
1599
1600         if (CFS_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1601                 RETURN(0);
1602
1603         LASSERT(oldpfid != NULL || newpfid != NULL);
1604
1605         if (mdd_obj->mod_flags & DEAD_OBJ)
1606                 /* Unnecessary to update linkEA for dead object.  */
1607                 RETURN(0);
1608
1609         if (oldpfid != NULL) {
1610                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1611                 if (rc) {
1612                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1613                                 RETURN(rc);
1614
1615                         /* No changes done. */
1616                         rc = 0;
1617                 }
1618         }
1619
1620         /* If renaming, add the new record */
1621         if (newpfid != NULL)
1622                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1623                                      first, check);
1624
1625         RETURN(rc);
1626 }
1627
1628 int mdd_links_rename(const struct lu_env *env,
1629                      struct mdd_object *mdd_obj,
1630                      const struct lu_fid *oldpfid,
1631                      const struct lu_name *oldlname,
1632                      const struct lu_fid *newpfid,
1633                      const struct lu_name *newlname,
1634                      struct thandle *handle,
1635                      struct linkea_data *ldata,
1636                      int first, int check)
1637 {
1638         int rc = 0;
1639
1640         ENTRY;
1641
1642         if (ldata == NULL) {
1643                 ldata = &mdd_env_info(env)->mdi_link_data;
1644                 memset(ldata, 0, sizeof(*ldata));
1645                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1646                                         newpfid, newlname, first, check, ldata);
1647                 if (rc)
1648                         GOTO(out, rc);
1649         }
1650
1651         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1652                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1653
1654         GOTO(out, rc);
1655
1656 out:
1657         if (rc != 0) {
1658                 if (newlname == NULL)
1659                         CERROR("link_ea add failed "DFID": rc = %d\n",
1660                                PFID(mdd_object_fid(mdd_obj)), rc);
1661                 else if (oldpfid == NULL)
1662                         CERROR("link_ea add '%.*s' failed "DFID": rc = %d\n",
1663                                newlname->ln_namelen, newlname->ln_name,
1664                                PFID(mdd_object_fid(mdd_obj)), rc);
1665                 else if (newpfid == NULL)
1666                         CERROR("link_ea del '%.*s' failed "DFID": rc = %d\n",
1667                                oldlname->ln_namelen, oldlname->ln_name,
1668                                PFID(mdd_object_fid(mdd_obj)), rc);
1669                 else
1670                         CERROR("link_ea rename '%.*s'->'%.*s' failed "DFID": rc = %d\n",
1671                                oldlname->ln_namelen, oldlname->ln_name,
1672                                newlname->ln_namelen, newlname->ln_name,
1673                                PFID(mdd_object_fid(mdd_obj)), rc);
1674         }
1675
1676         if (is_vmalloc_addr(ldata->ld_buf))
1677                 /* if we vmalloced a large buffer drop it */
1678                 lu_buf_free(ldata->ld_buf);
1679
1680         return rc;
1681 }
1682
1683 static inline int mdd_links_add(const struct lu_env *env,
1684                                 struct mdd_object *mdd_obj,
1685                                 const struct lu_fid *pfid,
1686                                 const struct lu_name *lname,
1687                                 struct thandle *handle,
1688                                 struct linkea_data *ldata, int first)
1689 {
1690         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1691                                 pfid, lname, handle, ldata, first, 0);
1692 }
1693
1694 static inline int mdd_links_del(const struct lu_env *env,
1695                                 struct mdd_object *mdd_obj,
1696                                 const struct lu_fid *pfid,
1697                                 const struct lu_name *lname,
1698                                 struct thandle *handle)
1699 {
1700         return mdd_links_rename(env, mdd_obj, pfid, lname,
1701                                 NULL, NULL, handle, NULL, 0, 0);
1702 }
1703
1704 /** Read the link EA into a temp buffer.
1705  * Uses the name_buf since it is generally large.
1706  * \retval IS_ERR err
1707  * \retval ptr to \a lu_buf (always \a mdi_link_buf)
1708  */
1709 struct lu_buf *mdd_links_get(const struct lu_env *env,
1710                              struct mdd_object *mdd_obj)
1711 {
1712         struct linkea_data ldata = { NULL };
1713         int rc;
1714
1715         rc = mdd_links_read(env, mdd_obj, &ldata);
1716         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1717 }
1718
1719 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1720                     struct linkea_data *ldata, struct thandle *handle)
1721 {
1722         const struct lu_buf *buf;
1723         int                 rc;
1724
1725         if (ldata == NULL || ldata->ld_buf == NULL ||
1726             ldata->ld_leh == NULL)
1727                 return 0;
1728
1729         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1730                 return 0;
1731
1732 again:
1733         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1734                                 ldata->ld_leh->leh_len);
1735         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1736         if (unlikely(rc == -ENOSPC)) {
1737                 rc = linkea_overflow_shrink(ldata);
1738                 if (likely(rc > 0))
1739                         goto again;
1740         }
1741
1742         return rc;
1743 }
1744
1745 static int mdd_declare_links_add(const struct lu_env *env,
1746                                  struct mdd_object *mdd_obj,
1747                                  struct thandle *handle,
1748                                  struct linkea_data *ldata)
1749 {
1750         int rc;
1751         int ea_len;
1752         void *linkea;
1753
1754         if (ldata != NULL && ldata->ld_leh != NULL) {
1755                 ea_len = ldata->ld_leh->leh_len;
1756                 linkea = ldata->ld_buf->lb_buf;
1757         } else {
1758                 ea_len = MAX_LINKEA_SIZE;
1759                 linkea = NULL;
1760         }
1761
1762         rc = mdo_declare_xattr_set(env, mdd_obj,
1763                                    mdd_buf_get_const(env, linkea, ea_len),
1764                                    XATTR_NAME_LINK, 0, handle);
1765
1766         return rc;
1767 }
1768
1769 static inline int mdd_declare_links_del(const struct lu_env *env,
1770                                         struct mdd_object *c,
1771                                         struct thandle *handle)
1772 {
1773         int rc = 0;
1774
1775         /* For directory, linkEA will be removed together with the object. */
1776         if (!S_ISDIR(mdd_object_type(c)))
1777                 rc = mdd_declare_links_add(env, c, handle, NULL);
1778
1779         return rc;
1780 }
1781
1782 static int mdd_declare_link(const struct lu_env *env,
1783                             struct mdd_device *mdd,
1784                             struct mdd_object *p,
1785                             struct mdd_object *c,
1786                             const struct lu_name *name,
1787                             struct thandle *handle,
1788                             struct lu_attr *la,
1789                             struct linkea_data *data)
1790 {
1791         struct lu_fid tfid = *mdd_object_fid(c);
1792         int rc;
1793
1794         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1795                 tfid.f_oid = cfs_fail_val;
1796
1797         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1798                                       name->ln_name, handle);
1799         if (rc != 0)
1800                 return rc;
1801
1802         rc = mdo_declare_ref_add(env, c, handle);
1803         if (rc != 0)
1804                 return rc;
1805
1806         la->la_valid = LA_CTIME | LA_MTIME;
1807         rc = mdo_declare_attr_set(env, p, la, handle);
1808         if (rc != 0)
1809                 return rc;
1810
1811         la->la_valid = LA_CTIME;
1812         rc = mdo_declare_attr_set(env, c, la, handle);
1813         if (rc != 0)
1814                 return rc;
1815
1816         rc = mdd_declare_links_add(env, c, handle, data);
1817         if (rc != 0)
1818                 return rc;
1819
1820         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1821                                          handle);
1822
1823         return rc;
1824 }
1825
1826 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1827                     struct md_object *src_obj, const struct lu_name *lname,
1828                     struct md_attr *ma)
1829 {
1830         const char *name = lname->ln_name;
1831         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
1832         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1833         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1834         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1835         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
1836         struct mdd_device *mdd = mdo2mdd(src_obj);
1837         struct thandle *handle;
1838         struct lu_fid *tfid = &mdd_env_info(env)->mdi_fid2;
1839         struct linkea_data *ldata = &mdd_env_info(env)->mdi_link_data;
1840         int rc;
1841
1842         ENTRY;
1843
1844         rc = mdd_la_get(env, mdd_sobj, cattr);
1845         if (rc != 0)
1846                 RETURN(rc);
1847
1848         rc = mdd_la_get(env, mdd_tobj, tattr);
1849         if (rc != 0)
1850                 RETURN(rc);
1851
1852         /* If we are using project inheritance, we only allow hard link
1853          * creation in our tree when the project IDs are the same;
1854          * otherwise the tree quota mechanism could be circumvented.
1855          */
1856         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1857             (tattr->la_projid != cattr->la_projid))
1858                 RETURN(-EXDEV);
1859
1860         handle = mdd_trans_create(env, mdd);
1861         if (IS_ERR(handle))
1862                 GOTO(out_pending, rc = PTR_ERR(handle));
1863
1864         memset(ldata, 0, sizeof(*ldata));
1865
1866         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1867         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1868
1869         /* Note: even this function will change ldata, but it comes from
1870          * thread_info, which is completely temporary and only seen in
1871          * this function, so we do not need reset ldata once it fails.
1872          */
1873         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1874                                 mdd_object_fid(mdd_tobj), lname, 0, 0, ldata);
1875         if (rc != 0)
1876                 GOTO(stop, rc);
1877
1878         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1879                               la, ldata);
1880         if (rc)
1881                 GOTO(stop, rc);
1882
1883         rc = mdd_trans_start(env, mdd, handle);
1884         if (rc)
1885                 GOTO(stop, rc);
1886
1887         mdd_write_lock(env, mdd_sobj, DT_TGT_CHILD);
1888         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1889                                    cattr);
1890         if (rc)
1891                 GOTO(out_unlock, rc);
1892
1893         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1894                 rc = mdo_ref_add(env, mdd_sobj, handle);
1895                 if (rc != 0)
1896                         GOTO(out_unlock, rc);
1897         }
1898
1899         *tfid = *mdd_object_fid(mdd_sobj);
1900         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1901                 tfid->f_oid = cfs_fail_val;
1902
1903         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1904                                      mdd_object_type(mdd_sobj), name, handle);
1905         if (rc != 0) {
1906                 mdo_ref_del(env, mdd_sobj, handle);
1907                 GOTO(out_unlock, rc);
1908         }
1909
1910         la->la_valid = LA_CTIME | LA_MTIME;
1911         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1912         if (rc)
1913                 GOTO(out_unlock, rc);
1914
1915         la->la_valid = LA_CTIME;
1916         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1917         if (rc == 0)
1918                 /* Note: The failure of links_add should not cause the
1919                  * link failure, so do not check return value.
1920                  */
1921                 mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_tobj),
1922                               lname, handle, ldata, 0);
1923
1924         EXIT;
1925 out_unlock:
1926         mdd_write_unlock(env, mdd_sobj);
1927         if (rc == 0)
1928                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1929                                             mdd_tobj, tattr, NULL,
1930                                             NULL, NULL, lname, NULL, handle);
1931 stop:
1932         rc = mdd_trans_stop(env, mdd, rc, handle);
1933         if (is_vmalloc_addr(ldata->ld_buf))
1934                 /* if we vmalloced a large buffer drop it */
1935                 lu_buf_free(ldata->ld_buf);
1936 out_pending:
1937         return rc;
1938 }
1939
1940 static int mdd_mark_orphan_object(const struct lu_env *env,
1941                                 struct mdd_object *obj, struct thandle *handle,
1942                                 bool declare)
1943 {
1944         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1945         int rc;
1946
1947         attr->la_valid = LA_FLAGS;
1948         attr->la_flags = LUSTRE_ORPHAN_FL;
1949
1950         if (declare)
1951                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1952         else
1953                 rc = mdo_attr_set(env, obj, attr, handle);
1954
1955         return rc;
1956 }
1957
1958 static int mdd_declare_finish_unlink(const struct lu_env *env,
1959                                      struct mdd_object *obj,
1960                                      struct thandle *handle)
1961 {
1962         int rc;
1963
1964         /* Sigh, we do not know if the unlink object will become orphan in
1965          * declare phase, but fortunately the flags here does not matter
1966          * in current declare implementation
1967          */
1968         rc = mdd_mark_orphan_object(env, obj, handle, true);
1969         if (rc != 0)
1970                 return rc;
1971
1972         rc = mdo_declare_destroy(env, obj, handle);
1973         if (rc != 0)
1974                 return rc;
1975
1976         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1977         if (rc != 0)
1978                 return rc;
1979
1980         return mdd_declare_links_del(env, obj, handle);
1981 }
1982
1983 /* caller should take a lock before calling */
1984 int mdd_finish_unlink(const struct lu_env *env,
1985                       struct mdd_object *obj, struct md_attr *ma,
1986                       struct mdd_object *pobj,
1987                       const struct lu_name *lname,
1988                       struct thandle *th)
1989 {
1990         int rc = 0;
1991         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1992
1993         ENTRY;
1994
1995         LASSERT(mdd_write_locked(env, obj) != 0);
1996
1997         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1998                 /* add new orphan, object will be deleted during mdd_close() */
1999                 obj->mod_flags |= DEAD_OBJ;
2000                 if (obj->mod_count) {
2001                         rc = mdd_orphan_insert(env, obj, th);
2002                         if (rc == 0)
2003                                 CDEBUG(D_HA,
2004                                        "Object "DFID" is inserted into orphan list, open count = %d\n",
2005                                        PFID(mdd_object_fid(obj)),
2006                                        obj->mod_count);
2007                         else
2008                                 CERROR("Object "DFID" fail to be an orphan, open count = %d, maybe cause failed open replay\n",
2009                                         PFID(mdd_object_fid(obj)),
2010                                         obj->mod_count);
2011
2012                         /* mark object as an orphan here, not before
2013                          * mdd_orphan_insert() as racing mdd_la_get() may
2014                          * propagate ORPHAN_OBJ causing the asserition
2015                          */
2016                         rc = mdd_mark_orphan_object(env, obj, th, false);
2017                 } else {
2018                         rc = mdo_destroy(env, obj, th);
2019                 }
2020         } else if (!is_dir) {
2021                 /* old files may not have link ea; ignore errors */
2022                 mdd_links_del(env, obj, mdd_object_fid(pobj), lname, th);
2023         }
2024
2025         RETURN(rc);
2026 }
2027
2028 /*
2029  * pobj maybe NULL
2030  * has mdd_write_lock on cobj already, but not on pobj yet
2031  */
2032 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
2033                             const struct lu_attr *pattr,
2034                             struct mdd_object *cobj,
2035                             const struct lu_attr *cattr)
2036 {
2037         int rc;
2038
2039         ENTRY;
2040
2041         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
2042
2043         RETURN(rc);
2044 }
2045
2046 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
2047                               struct mdd_object *p, struct mdd_object *c,
2048                               const struct lu_name *name, struct md_attr *ma,
2049                               struct thandle *handle, int no_name, int is_dir)
2050 {
2051         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2052         int rc;
2053
2054         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
2055                 if (likely(no_name == 0)) {
2056                         rc = mdo_declare_index_delete(env, p, name->ln_name,
2057                                                       handle);
2058                         if (rc != 0)
2059                                 return rc;
2060                 }
2061
2062                 if (is_dir != 0) {
2063                         rc = mdo_declare_ref_del(env, p, handle);
2064                         if (rc != 0)
2065                                 return rc;
2066                 }
2067         }
2068
2069         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2070         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2071         la->la_valid = LA_CTIME | LA_MTIME;
2072         rc = mdo_declare_attr_set(env, p, la, handle);
2073         if (rc)
2074                 return rc;
2075
2076         if (c != NULL) {
2077                 rc = mdo_declare_ref_del(env, c, handle);
2078                 if (rc)
2079                         return rc;
2080
2081                 rc = mdo_declare_ref_del(env, c, handle);
2082                 if (rc)
2083                         return rc;
2084
2085                 la->la_valid = LA_CTIME;
2086                 rc = mdo_declare_attr_set(env, c, la, handle);
2087                 if (rc)
2088                         return rc;
2089
2090                 rc = mdd_declare_finish_unlink(env, c, handle);
2091                 if (rc)
2092                         return rc;
2093
2094                 /* FIXME: need changelog for remove entry */
2095                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
2096                                                  NULL, handle);
2097         }
2098
2099         return rc;
2100 }
2101
2102 /*
2103  * test if a file has an HSM archive
2104  * if HSM attributes are not found in ma update them from
2105  * HSM xattr
2106  */
2107 static bool mdd_hsm_archive_exists(const struct lu_env *env,
2108                                    struct mdd_object *obj,
2109                                    struct md_attr *ma)
2110 {
2111         ENTRY;
2112
2113         if (!(ma->ma_valid & MA_HSM)) {
2114                 /* no HSM MD provided, read xattr */
2115                 struct lu_buf *hsm_buf;
2116                 const size_t buflen = sizeof(struct hsm_attrs);
2117                 int rc;
2118
2119                 hsm_buf = mdd_buf_get(env, NULL, 0);
2120                 lu_buf_alloc(hsm_buf, buflen);
2121                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
2122                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
2123                 lu_buf_free(hsm_buf);
2124                 if (rc < 0)
2125                         RETURN(false);
2126
2127                 ma->ma_valid |= MA_HSM;
2128         }
2129         if (ma->ma_hsm.mh_flags & HS_EXISTS)
2130                 RETURN(true);
2131         RETURN(false);
2132 }
2133
2134 /**
2135  * Delete name entry and the object.
2136  * Note: no_name == 1 means it only destory the object, i.e. name_entry
2137  * does not exist for this object, and it could only happen during resending
2138  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
2139  * is also needed in this case(needed by changelog), so we have to add another
2140  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
2141  * the ENOENT failure should be able to be fixed by redo mechanism.
2142  */
2143 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
2144                       struct md_object *cobj, const struct lu_name *lname,
2145                       struct md_attr *ma, int no_name)
2146 {
2147         char *name = (char *)lname->ln_name;
2148         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2149         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2150         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2151         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2152         struct mdd_object *mdd_cobj = NULL;
2153         struct mdd_device *mdd = mdo2mdd(pobj);
2154         struct thandle    *handle;
2155         int rc, is_dir = 0, cl_flags = 0;
2156
2157         ENTRY;
2158
2159         /* let shutdown to start */
2160         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
2161
2162         /* cobj == NULL means only delete name entry */
2163         if (likely(cobj != NULL)) {
2164                 mdd_cobj = md2mdd_obj(cobj);
2165                 if (mdd_object_exists(mdd_cobj) == 0)
2166                         RETURN(-ENOENT);
2167         }
2168
2169         rc = mdd_la_get(env, mdd_pobj, pattr);
2170         if (rc)
2171                 RETURN(rc);
2172
2173         if (likely(mdd_cobj != NULL)) {
2174                 /* fetch cattr */
2175                 rc = mdd_la_get(env, mdd_cobj, cattr);
2176                 if (rc)
2177                         RETURN(rc);
2178
2179                 is_dir = S_ISDIR(cattr->la_mode);
2180                 /* search for an existing archive. We should check ahead as the
2181                  * object can be destroyed in this transaction
2182                  */
2183                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
2184                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
2185         }
2186
2187         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
2188         if (rc)
2189                 RETURN(rc);
2190
2191         handle = mdd_trans_create(env, mdd);
2192         if (IS_ERR(handle))
2193                 RETURN(PTR_ERR(handle));
2194
2195         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
2196                                 lname, ma, handle, no_name, is_dir);
2197         if (rc)
2198                 GOTO(stop, rc);
2199
2200         rc = mdd_trans_start(env, mdd, handle);
2201         if (rc)
2202                 GOTO(stop, rc);
2203
2204         if (likely(mdd_cobj != NULL))
2205                 mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
2206
2207         if (lname->ln_name[lname->ln_namelen] != '\0') {
2208                 /* lname->ln_name is not necessarily NUL terminated */
2209                 name = kmalloc(lname->ln_namelen + 1, GFP_NOFS);
2210                 if (!name)
2211                         GOTO(cleanup, rc = -ENOMEM);
2212
2213                 memcpy(name, lname->ln_name, lname->ln_namelen);
2214                 name[lname->ln_namelen] = '\0';
2215         }
2216
2217         if (likely(no_name == 0) && !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
2218                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
2219                 if (rc)
2220                         GOTO(cleanup, rc);
2221         }
2222
2223         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
2224             CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
2225                 GOTO(cleanup, rc = 0);
2226
2227         if (likely(mdd_cobj != NULL)) {
2228                 rc = mdo_ref_del(env, mdd_cobj, handle);
2229                 if (rc != 0) {
2230                         __mdd_index_insert_only(env, mdd_pobj,
2231                                                 mdd_object_fid(mdd_cobj),
2232                                                 mdd_object_type(mdd_cobj),
2233                                                 name, handle);
2234                         GOTO(cleanup, rc);
2235                 }
2236
2237                 if (is_dir)
2238                         /* unlink dot */
2239                         mdo_ref_del(env, mdd_cobj, handle);
2240
2241                 /* fetch updated nlink */
2242                 rc = mdd_la_get(env, mdd_cobj, cattr);
2243                 if (rc)
2244                         GOTO(cleanup, rc);
2245         }
2246
2247         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2248         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2249
2250         la->la_valid = LA_CTIME | LA_MTIME;
2251         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2252         if (rc)
2253                 GOTO(cleanup, rc);
2254
2255         /* Enough for only unlink the entry */
2256         if (unlikely(mdd_cobj == NULL))
2257                 GOTO(cleanup, rc);
2258
2259         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
2260                 /* update ctime of an unlinked file only if it is still opened
2261                  * or a link still exists
2262                  */
2263                 la->la_valid = LA_CTIME;
2264                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
2265                 if (rc)
2266                         GOTO(cleanup, rc);
2267         }
2268
2269         /* XXX: this transfer to ma will be removed with LOD/OSP */
2270         ma->ma_attr = *cattr;
2271         ma->ma_valid |= MA_INODE;
2272         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
2273         if (rc != 0)
2274                 GOTO(cleanup, rc);
2275
2276         /* fetch updated nlink */
2277         rc = mdd_la_get(env, mdd_cobj, cattr);
2278         /* if object is removed then we can't get its attrs, use last get */
2279         if (rc == -ENOENT) {
2280                 cattr->la_nlink = 0;
2281                 rc = 0;
2282         }
2283
2284         if (cattr->la_nlink == 0) {
2285                 ma->ma_attr = *cattr;
2286                 ma->ma_valid |= MA_INODE;
2287         }
2288
2289         EXIT;
2290 cleanup:
2291         if (name != lname->ln_name)
2292                 kfree(name);
2293
2294         if (likely(mdd_cobj != NULL))
2295                 mdd_write_unlock(env, mdd_cobj);
2296
2297         if (rc == 0) {
2298                 if (mdd_is_dead_obj(mdd_cobj))
2299                         cl_flags |= CLF_UNLINK_LAST;
2300                 else
2301                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
2302
2303                 rc = mdd_changelog_ns_store(env, mdd,
2304                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
2305                         mdd_cobj, mdd_pobj, pattr, NULL,
2306                         NULL, NULL, lname, NULL, handle);
2307         }
2308
2309 stop:
2310         rc = mdd_trans_stop(env, mdd, rc, handle);
2311
2312         return rc;
2313 }
2314
2315 /*
2316  * The permission has been checked when obj created, no need check again.
2317  */
2318 static int mdd_cd_sanity_check(const struct lu_env *env,
2319                                struct mdd_object *obj)
2320 {
2321         ENTRY;
2322
2323         /* EEXIST check */
2324         if (!obj || mdd_is_dead_obj(obj))
2325                 RETURN(-ENOENT);
2326
2327         RETURN(0);
2328 }
2329
2330 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
2331                            struct md_object *cobj,
2332                            const struct md_op_spec *spec, struct md_attr *ma)
2333 {
2334         struct mdd_device *mdd = mdo2mdd(cobj);
2335         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2336         struct mdd_object *son = md2mdd_obj(cobj);
2337         struct thandle *handle;
2338         const struct lu_buf *buf;
2339         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
2340         struct dt_allocation_hint *hint = &mdd_env_info(env)->mdi_hint;
2341         int rc;
2342
2343         ENTRY;
2344
2345         rc = mdd_cd_sanity_check(env, son);
2346         if (rc)
2347                 RETURN(rc);
2348
2349         if (!md_should_create(spec->sp_cr_flags))
2350                 RETURN(0);
2351
2352         /*
2353          * there are following use cases for this function:
2354          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
2355          *    striping can be specified or not
2356          * 2) CMD?
2357          */
2358         rc = mdd_la_get(env, son, attr);
2359         if (rc)
2360                 RETURN(rc);
2361
2362         /* calling ->ah_make_hint(), used to transfer information from parent */
2363         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2364
2365         handle = mdd_trans_create(env, mdd);
2366         if (IS_ERR(handle))
2367                 GOTO(out_free, rc = PTR_ERR(handle));
2368
2369         /*
2370          * XXX: Setting the lov ea is not locked but setting the attr is locked?
2371          * Should this be fixed?
2372          */
2373         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#lo, no_create %u\n",
2374                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
2375                spec->sp_cr_flags, spec->no_create);
2376
2377         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
2378                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2379                                         spec->u.sp_ea.eadatalen);
2380         } else {
2381                 buf = &LU_BUF_NULL;
2382         }
2383
2384         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
2385                                   XATTR_NAME_LOV, 0, handle);
2386         if (rc)
2387                 GOTO(stop, rc);
2388
2389         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
2390                                          handle);
2391         if (rc)
2392                 GOTO(stop, rc);
2393
2394         rc = mdd_trans_start(env, mdd, handle);
2395         if (rc)
2396                 GOTO(stop, rc);
2397
2398         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
2399                           0, handle);
2400
2401         if (rc)
2402                 GOTO(stop, rc);
2403
2404         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle,
2405                                       NULL);
2406
2407 stop:
2408         rc = mdd_trans_stop(env, mdd, rc, handle);
2409
2410 out_free:
2411         RETURN(rc);
2412 }
2413
2414 static int mdd_declare_object_initialize(const struct lu_env *env,
2415                                          struct mdd_object *parent,
2416                                          struct mdd_object *child,
2417                                          const struct lu_attr *attr,
2418                                          struct thandle *handle)
2419 {
2420         int rc;
2421
2422         ENTRY;
2423
2424         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
2425         if (!S_ISDIR(attr->la_mode))
2426                 RETURN(0);
2427
2428         rc = mdo_declare_index_insert(env, child, mdd_object_fid(child),
2429                                       S_IFDIR, dot, handle);
2430         if (rc != 0)
2431                 RETURN(rc);
2432
2433         rc = mdo_declare_ref_add(env, child, handle);
2434         if (rc != 0)
2435                 RETURN(rc);
2436
2437         rc = mdo_declare_index_insert(env, child, mdd_object_fid(parent),
2438                                       S_IFDIR, dotdot, handle);
2439
2440         RETURN(rc);
2441 }
2442
2443 static int mdd_object_initialize(const struct lu_env *env,
2444                                  const struct lu_fid *pfid,
2445                                  struct mdd_object *child,
2446                                  struct lu_attr *attr,
2447                                  struct thandle *handle)
2448 {
2449         int rc = 0;
2450
2451         ENTRY;
2452
2453         if (S_ISDIR(attr->la_mode)) {
2454                 /* Add "." and ".." for newly created dir */
2455                 mdo_ref_add(env, child, handle);
2456                 rc = __mdd_index_insert_only(env, child, mdd_object_fid(child),
2457                                              S_IFDIR, dot, handle);
2458                 if (rc == 0)
2459                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
2460                                                      dotdot, handle);
2461                 if (rc != 0)
2462                         mdo_ref_del(env, child, handle);
2463         }
2464
2465         RETURN(rc);
2466 }
2467
2468 /**
2469  * This function checks whether it can create a file/dir under the
2470  * directory(@pobj). The directory(@pobj) is not being locked by
2471  * mdd lock.
2472  *
2473  * \param[in] env       execution environment
2474  * \param[in] pobj      the directory to create files
2475  * \param[in] pattr     the attributes of the directory
2476  * \param[in] lname     the name of the created file/dir
2477  * \param[in] cattr     the attributes of the file/dir
2478  * \param[in] spec      create specification
2479  *
2480  * \retval              = 0 it is allowed to create file/dir under
2481  *                      the directory
2482  * \retval              negative error not allowed to create file/dir
2483  *                      under the directory
2484  */
2485 static int mdd_create_sanity_check(const struct lu_env *env,
2486                                    struct md_object *pobj,
2487                                    const struct lu_attr *pattr,
2488                                    const struct lu_name *lname,
2489                                    struct lu_attr *cattr,
2490                                    struct md_op_spec *spec)
2491 {
2492         struct mdd_thread_info *info = mdd_env_info(env);
2493         struct lu_fid *fid = &info->mdi_fid;
2494         struct mdd_object *obj = md2mdd_obj(pobj);
2495         struct mdd_device *m = mdo2mdd(pobj);
2496         bool check_perm = true;
2497         int rc;
2498
2499         ENTRY;
2500
2501         /* EEXIST check */
2502         if (mdd_is_dead_obj(obj))
2503                 RETURN(-ENOENT);
2504
2505         /*
2506          * In some cases this lookup is not needed - we know before if name
2507          * exists or not because MDT performs lookup for it.
2508          * name length check is done in lookup.
2509          */
2510         if (spec->sp_cr_lookup) {
2511                 /*
2512                  * Check if the name already exist, though it will be checked in
2513                  * _index_insert also, for avoiding rolling back if exists
2514                  * _index_insert.
2515                  */
2516                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2517                                   MAY_WRITE | MAY_EXEC);
2518                 if (rc != -ENOENT)
2519                         RETURN(rc ? : -EEXIST);
2520
2521                 /* Permission is already being checked in mdd_lookup */
2522                 check_perm = false;
2523         }
2524
2525         if (S_ISDIR(cattr->la_mode) &&
2526             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2527             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2528                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2529
2530                 if (!lmv_user_magic_supported(le32_to_cpu(lum->lum_magic)) &&
2531                     !(spec->sp_replay &&
2532                       lum->lum_magic == cpu_to_le32(LMV_MAGIC_V1))) {
2533                         rc = -EINVAL;
2534                         CERROR("%s: invalid lmv_user_md: magic=%x hash=%x stripe_offset=%d stripe_count=%u: rc = %d\n",
2535                                mdd2obd_dev(m)->obd_name,
2536                                le32_to_cpu(lum->lum_magic),
2537                                le32_to_cpu(lum->lum_hash_type),
2538                                (int)le32_to_cpu(lum->lum_stripe_offset),
2539                                le32_to_cpu(lum->lum_stripe_count), rc);
2540                         RETURN(rc);
2541                 }
2542         }
2543
2544         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2545         if (rc != 0)
2546                 RETURN(rc);
2547
2548         /* sgid check */
2549         if (pattr->la_mode & S_ISGID) {
2550                 struct lu_ucred *uc = lu_ucred(env);
2551
2552                 cattr->la_gid = pattr->la_gid;
2553
2554                 /* Directories are special, and always inherit S_ISGID */
2555                 if (S_ISDIR(cattr->la_mode)) {
2556                         cattr->la_mode |= S_ISGID;
2557                         cattr->la_valid |= LA_MODE;
2558                 } else if ((cattr->la_mode & (S_ISGID | 0010))
2559                                 == (S_ISGID | 0010) &&
2560                            !lustre_in_group_p(uc,
2561                                               (cattr->la_valid & LA_GID) ?
2562                                               cattr->la_gid : pattr->la_gid) &&
2563                            !cap_raised(uc->uc_cap, CAP_FSETID)) {
2564                         cattr->la_mode &= ~S_ISGID;
2565                         cattr->la_valid |= LA_MODE;
2566                 }
2567         }
2568
2569         /* Inherit project ID from parent directory */
2570         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2571                 cattr->la_projid = pattr->la_projid;
2572                 if (S_ISDIR(cattr->la_mode)) {
2573                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2574                         cattr->la_valid |= LA_FLAGS;
2575                 }
2576                 cattr->la_valid |= LA_PROJID;
2577         }
2578
2579         rc = mdd_name_check(env, m, lname);
2580         if (rc < 0)
2581                 RETURN(rc);
2582
2583         switch (cattr->la_mode & S_IFMT) {
2584         case S_IFLNK: {
2585                 unsigned int symlen = spec->u.sp_symname.ln_namelen + 1;
2586
2587                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2588                         RETURN(-ENAMETOOLONG);
2589                 else
2590                         RETURN(0);
2591         }
2592         case S_IFDIR:
2593         case S_IFREG:
2594         case S_IFCHR:
2595         case S_IFBLK:
2596         case S_IFIFO:
2597         case S_IFSOCK:
2598                 rc = 0;
2599                 break;
2600         default:
2601                 rc = -EINVAL;
2602                 break;
2603         }
2604         RETURN(rc);
2605 }
2606
2607 static int mdd_declare_create_object(const struct lu_env *env,
2608                                      struct mdd_device *mdd,
2609                                      struct mdd_object *p, struct mdd_object *c,
2610                                      struct lu_attr *attr,
2611                                      struct thandle *handle,
2612                                      const struct md_op_spec *spec,
2613                                      struct lu_buf *def_acl_buf,
2614                                      struct lu_buf *acl_buf,
2615                                      struct lu_buf *hsm_buf,
2616                                      struct dt_allocation_hint *hint)
2617 {
2618         const struct lu_buf *buf;
2619         int rc;
2620
2621 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2622         /* ldiskfs OSD needs this information for credit allocation */
2623         if (def_acl_buf)
2624                 hint->dah_acl_len = def_acl_buf->lb_len;
2625 #endif
2626         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2627                                                 hint);
2628         if (rc)
2629                 GOTO(out, rc);
2630
2631 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2632         if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2633                 /* if dir, then can inherit default ACl */
2634                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2635                                            XATTR_NAME_ACL_DEFAULT,
2636                                            0, handle);
2637                 if (rc)
2638                         GOTO(out, rc);
2639         }
2640
2641         if (acl_buf && acl_buf->lb_len > 0) {
2642                 rc = mdo_declare_attr_set(env, c, attr, handle);
2643                 if (rc)
2644                         GOTO(out, rc);
2645
2646                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2647                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2648                 if (rc)
2649                         GOTO(out, rc);
2650         }
2651 #endif
2652         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2653         if (rc)
2654                 GOTO(out, rc);
2655
2656         /* replay case, create LOV EA from client data */
2657         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2658             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2659                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2660                                         spec->u.sp_ea.eadatalen);
2661                 rc = mdo_declare_xattr_set(env, c, buf,
2662                                            S_ISDIR(attr->la_mode) ?
2663                                                 XATTR_NAME_LMV : XATTR_NAME_LOV,
2664                                            LU_XATTR_CREATE, handle);
2665                 if (rc)
2666                         GOTO(out, rc);
2667
2668                 if (spec->sp_cr_flags & MDS_OPEN_PCC) {
2669                         rc = mdo_declare_xattr_set(env, c, hsm_buf,
2670                                                    XATTR_NAME_HSM,
2671                                                    0, handle);
2672                         if (rc)
2673                                 GOTO(out, rc);
2674                 }
2675         }
2676
2677         if (S_ISLNK(attr->la_mode)) {
2678                 const char *target_name = spec->u.sp_symname.ln_name;
2679                 int sym_len = spec->u.sp_symname.ln_namelen;
2680                 const struct lu_buf *buf;
2681
2682                 buf = mdd_buf_get_const(env, target_name, sym_len);
2683                 rc = dt_declare_record_write(env, mdd_object_child(c),
2684                                              buf, 0, handle);
2685                 if (rc)
2686                         GOTO(out, rc);
2687         }
2688
2689         if (spec->sp_cr_file_secctx_name != NULL) {
2690                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2691                                         spec->sp_cr_file_secctx_size);
2692                 rc = mdo_declare_xattr_set(env, c, buf,
2693                                            spec->sp_cr_file_secctx_name, 0,
2694                                            handle);
2695                 if (rc < 0)
2696                         GOTO(out, rc);
2697         }
2698
2699         if (spec->sp_cr_file_encctx != NULL) {
2700                 buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
2701                                         spec->sp_cr_file_encctx_size);
2702                 rc = mdo_declare_xattr_set(env, c, buf,
2703                                            LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
2704                                            handle);
2705                 if (rc < 0)
2706                         GOTO(out, rc);
2707         }
2708 out:
2709         return rc;
2710 }
2711
2712 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2713                               struct mdd_object *p, struct mdd_object *c,
2714                               const struct lu_name *name,
2715                               struct lu_attr *attr,
2716                               struct thandle *handle,
2717                               const struct md_op_spec *spec,
2718                               struct linkea_data *ldata,
2719                               struct lu_buf *def_acl_buf,
2720                               struct lu_buf *acl_buf,
2721                               struct lu_buf *hsm_buf,
2722                               struct dt_allocation_hint *hint)
2723 {
2724         int rc;
2725
2726         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2727                                        def_acl_buf, acl_buf, hsm_buf, hint);
2728         if (rc)
2729                 GOTO(out, rc);
2730
2731         if (S_ISDIR(attr->la_mode)) {
2732                 rc = mdo_declare_ref_add(env, p, handle);
2733                 if (rc)
2734                         GOTO(out, rc);
2735         }
2736
2737         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2738                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2739                 if (rc)
2740                         GOTO(out, rc);
2741         } else {
2742                 struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2743                 enum changelog_rec_type type;
2744
2745                 rc = mdo_declare_index_insert(env, p, mdd_object_fid(c),
2746                                               attr->la_mode, name->ln_name,
2747                                               handle);
2748                 if (rc != 0)
2749                         return rc;
2750
2751                 rc = mdd_declare_links_add(env, c, handle, ldata);
2752                 if (rc)
2753                         return rc;
2754
2755                 *la = *attr;
2756                 la->la_valid = LA_CTIME | LA_MTIME;
2757                 rc = mdo_declare_attr_set(env, p, la, handle);
2758                 if (rc)
2759                         return rc;
2760
2761                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2762                        S_ISREG(attr->la_mode) ? CL_CREATE :
2763                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2764
2765                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2766                                                  handle);
2767                 if (rc)
2768                         return rc;
2769         }
2770 out:
2771         return rc;
2772 }
2773
2774 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2775                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2776                         struct lu_buf *acl_buf)
2777 {
2778         int rc;
2779
2780         ENTRY;
2781
2782         if (S_ISLNK(la->la_mode)) {
2783                 acl_buf->lb_len = 0;
2784                 def_acl_buf->lb_len = 0;
2785                 RETURN(0);
2786         }
2787
2788         mdd_read_lock(env, pobj, DT_TGT_PARENT);
2789         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2790                            XATTR_NAME_ACL_DEFAULT);
2791         mdd_read_unlock(env, pobj);
2792         if (rc > 0) {
2793                 /* ACL buffer size is not enough, need realloc */
2794                 if (rc > acl_buf->lb_len)
2795                         RETURN(-ERANGE);
2796
2797                 /* If there are default ACL, fix mode/ACL by default ACL */
2798                 def_acl_buf->lb_len = rc;
2799                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2800                 acl_buf->lb_len = rc;
2801                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2802                 if (rc < 0)
2803                         RETURN(rc);
2804         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2805                 /* If there are no default ACL, fix mode by mask */
2806                 struct lu_ucred *uc = lu_ucred(env);
2807
2808                 /* The create triggered by MDT internal events, such as
2809                  * LFSCK reset, will not contain valid "uc".
2810                  */
2811                 if (unlikely(uc != NULL))
2812                         la->la_mode &= ~uc->uc_umask;
2813                 rc = 0;
2814                 acl_buf->lb_len = 0;
2815                 def_acl_buf->lb_len = 0;
2816         }
2817
2818         RETURN(rc);
2819 }
2820
2821 /**
2822  * Create a metadata object and initialize it, set acl, xattr.
2823  **/
2824 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2825                              struct mdd_object *son, struct lu_attr *attr,
2826                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2827                              struct lu_buf *def_acl_buf,
2828                              struct lu_buf *hsm_buf,
2829                              struct dt_allocation_hint *hint,
2830                              struct thandle *handle, bool initial_create)
2831 {
2832         const struct lu_fid *son_fid = mdd_object_fid(son);
2833         const struct lu_ucred *uc = lu_ucred(env);
2834         const char *jobid = uc->uc_jobid;
2835         const struct lu_buf *buf;
2836         size_t jobid_len;
2837         int rc;
2838
2839         mdd_write_lock(env, son, DT_TGT_CHILD);
2840         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2841                                         hint);
2842         if (rc)
2843                 GOTO(unlock, rc);
2844
2845         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2846          * created in declare phase, they also needs to be added to master
2847          * object as sub-directory entry. So it has to initialize the master
2848          * object, then set dir striped EA.(in mdo_xattr_set)
2849          */
2850         rc = mdd_object_initialize(env, mdd_object_fid(pobj), son, attr,
2851                                    handle);
2852         if (rc != 0)
2853                 GOTO(err_destroy, rc);
2854
2855         /*
2856          * in case of replay we just set LOVEA provided by the client
2857          * XXX: I think it would be interesting to try "old" way where
2858          *      MDT calls this xattr_set(LOV) in a different transaction.
2859          *      probably this way we code can be made better.
2860          */
2861
2862         /* During creation, there are only a few cases we need do xattr_set to
2863          * create stripes.
2864          * 1. regular file: see comments above.
2865          * 2. dir: inherit default striping or pool settings from parent.
2866          * 3. create striped directory with provided stripeEA.
2867          * 4. create striped directory because inherit default layout from the
2868          * parent.
2869          */
2870         if (spec->no_create ||
2871             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2872             S_ISDIR(attr->la_mode)) {
2873                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2874                                         spec->u.sp_ea.eadatalen);
2875                 rc = mdo_xattr_set(env, son, buf,
2876                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2877                                                             XATTR_NAME_LOV,
2878                                    LU_XATTR_CREATE, handle);
2879                 if (rc != 0)
2880                         GOTO(err_destroy, rc);
2881         }
2882
2883         if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
2884                 struct md_hsm mh;
2885
2886                 memset(&mh, 0, sizeof(mh));
2887                 mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
2888                 mh.mh_arch_id = spec->sp_archive_id;
2889                 lustre_hsm2buf(hsm_buf->lb_buf, &mh);
2890                 rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
2891                                    0, handle);
2892                 if (rc != 0)
2893                         GOTO(err_destroy, rc);
2894         }
2895
2896 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2897         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2898             S_ISDIR(attr->la_mode)) {
2899                 /* set default acl */
2900                 rc = mdo_xattr_set(env, son, def_acl_buf,
2901                                    XATTR_NAME_ACL_DEFAULT, 0,
2902                                    handle);
2903                 if (rc)
2904                         GOTO(err_destroy, rc);
2905         }
2906         /* set its own acl */
2907         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2908                 rc = mdo_xattr_set(env, son, acl_buf,
2909                                    XATTR_NAME_ACL_ACCESS,
2910                                    0, handle);
2911                 if (rc)
2912                         GOTO(err_destroy, rc);
2913         }
2914 #endif
2915
2916         if (S_ISLNK(attr->la_mode)) {
2917                 struct dt_object *dt = mdd_object_child(son);
2918                 const char *target_name = spec->u.sp_symname.ln_name;
2919                 int sym_len = spec->u.sp_symname.ln_namelen;
2920                 loff_t pos = 0;
2921
2922                 buf = mdd_buf_get_const(env, target_name, sym_len);
2923                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
2924                 if (rc == sym_len)
2925                         rc = 0;
2926                 else
2927                         GOTO(err_initlized, rc = -EFAULT);
2928         }
2929
2930         if (initial_create && spec->sp_cr_file_secctx_name != NULL) {
2931                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2932                                         spec->sp_cr_file_secctx_size);
2933                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2934                                    0, handle);
2935                 if (rc < 0)
2936                         GOTO(err_initlized, rc);
2937         }
2938
2939         if (spec->sp_cr_file_encctx != NULL) {
2940                 buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
2941                                         spec->sp_cr_file_encctx_size);
2942                 rc = mdo_xattr_set(env, son, buf,
2943                                    LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
2944                                    handle);
2945                 if (rc < 0)
2946                         GOTO(err_initlized, rc);
2947         }
2948
2949         if (initial_create &&
2950             spec->sp_cr_job_xattr[0] != '\0' &&
2951             jobid[0] != '\0' &&
2952             (S_ISREG(attr->la_mode) || S_ISDIR(attr->la_mode))) {
2953                 jobid_len = strnlen(jobid, LUSTRE_JOBID_SIZE);
2954                 buf = mdd_buf_get_const(env, jobid, jobid_len);
2955
2956                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_job_xattr, 0,
2957                                    handle);
2958                 /* this xattr is nonessential, so ignore errors. */
2959                 if (rc != 0) {
2960                         CDEBUG(D_INODE,
2961                                DFID" failed to set xattr '%s': rc = %d\n",
2962                                PFID(son_fid), spec->sp_cr_job_xattr, rc);
2963                         rc = 0;
2964                 }
2965         }
2966
2967 err_initlized:
2968         if (unlikely(rc != 0)) {
2969                 int rc2;
2970
2971                 if (S_ISDIR(attr->la_mode)) {
2972                         /* Drop the reference, no need to delete "."/"..",
2973                          * because the object to be destroyed directly.
2974                          */
2975                         rc2 = mdo_ref_del(env, son, handle);
2976                         if (rc2 != 0)
2977                                 GOTO(unlock, rc);
2978                 }
2979                 rc2 = mdo_ref_del(env, son, handle);
2980                 if (rc2 != 0)
2981                         GOTO(unlock, rc);
2982 err_destroy:
2983                 mdo_destroy(env, son, handle);
2984         }
2985 unlock:
2986         mdd_write_unlock(env, son);
2987         RETURN(rc);
2988 }
2989
2990 static int mdd_index_delete(const struct lu_env *env,
2991                             struct mdd_object *mdd_pobj,
2992                             struct lu_attr *cattr,
2993                             const struct lu_name *lname)
2994 {
2995         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2996         struct thandle *handle;
2997         int rc;
2998
2999         ENTRY;
3000
3001         handle = mdd_trans_create(env, mdd);
3002         if (IS_ERR(handle))
3003                 RETURN(PTR_ERR(handle));
3004
3005         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
3006                                       handle);
3007         if (rc != 0)
3008                 GOTO(stop, rc);
3009
3010         if (S_ISDIR(cattr->la_mode)) {
3011                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3012                 if (rc != 0)
3013                         GOTO(stop, rc);
3014         }
3015
3016         /* Since this will only be used in the error handler path,
3017          * Let's set the thandle to be local and not mess the transno
3018          */
3019         handle->th_local = 1;
3020         rc = mdd_trans_start(env, mdd, handle);
3021         if (rc)
3022                 GOTO(stop, rc);
3023
3024         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
3025                                 S_ISDIR(cattr->la_mode), handle);
3026         if (rc)
3027                 GOTO(stop, rc);
3028 stop:
3029         rc = mdd_trans_stop(env, mdd, rc, handle);
3030
3031         RETURN(rc);
3032 }
3033
3034 /**
3035  * Create object and insert it into namespace.
3036  *
3037  * Two operations have to be performed:
3038  *
3039  *  - an allocation of a new object (->do_create()), and
3040  *  - an insertion into a parent index (->dio_insert()).
3041  *
3042  * Due to locking, operation order is not important, when both are
3043  * successful, *but* error handling cases are quite different:
3044  *
3045  *  - if insertion is done first, and following object creation fails,
3046  *  insertion has to be rolled back, but this operation might fail
3047  *  also leaving us with dangling index entry.
3048  *
3049  *  - if creation is done first, is has to be undone if insertion fails,
3050  *  leaving us with leaked space, which is not good but not fatal.
3051  *
3052  * It seems that creation-first is simplest solution, but it is sub-optimal
3053  * in the frequent
3054  *
3055  * $ mkdir foo
3056  * $ mkdir foo
3057  *
3058  * case, because second mkdir is bound to create object, only to
3059  * destroy it immediately.
3060  *
3061  * To avoid this follow local file systems that do double lookup:
3062  *
3063  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
3064  * 1. create            (mdd_create_object_internal())
3065  * 2. insert            (__mdd_index_insert(), lookup again)
3066  *
3067  * \param[in] pobj      parent object
3068  * \param[in] lname     name of child being created
3069  * \param[in,out] child child object being created
3070  * \param[in] spec      additional create parameters
3071  * \param[in] ma        attributes for new child object
3072  *
3073  * \retval              0 on success
3074  * \retval              negative errno on failure
3075  */
3076 int mdd_create(const struct lu_env *env, struct md_object *pobj,
3077                       const struct lu_name *lname, struct md_object *child,
3078                       struct md_op_spec *spec, struct md_attr *ma)
3079 {
3080         struct mdd_thread_info *info = mdd_env_info(env);
3081         struct lu_attr *la = &info->mdi_la_for_fix;
3082         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
3083         struct mdd_object *son = md2mdd_obj(child);
3084         struct mdd_device *mdd = mdo2mdd(pobj);
3085         struct lu_attr *attr = &ma->ma_attr;
3086         struct thandle *handle;
3087         struct lu_attr *pattr = &info->mdi_pattr;
3088         struct lu_buf acl_buf;
3089         struct lu_buf def_acl_buf;
3090         struct lu_buf hsm_buf;
3091         struct linkea_data *ldata = &info->mdi_link_data;
3092         const char *name = lname->ln_name;
3093         struct dt_allocation_hint *hint = &mdd_env_info(env)->mdi_hint;
3094         int acl_size = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
3095         bool name_inserted = false;
3096         int rc, rc2;
3097
3098         ENTRY;
3099
3100         rc = mdd_la_get(env, mdd_pobj, pattr);
3101         if (rc != 0)
3102                 RETURN(rc);
3103
3104         /* Sanity checks before big job. */
3105         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
3106         if (unlikely(rc == -EEXIST && S_ISDIR(attr->la_mode) &&
3107                      spec->sp_replay && mdd_object_remote(mdd_pobj)))
3108                 /* if it's replay by client request, and name is found in
3109                  * parent directory on remote MDT, it means mkdir was partially
3110                  * executed: name was successfully added, but target not.
3111                  */
3112                 name_inserted = true;
3113         else if (rc)
3114                 RETURN(rc);
3115
3116         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
3117                 GOTO(out_free, rc = -EINPROGRESS);
3118
3119         handle = mdd_trans_create(env, mdd);
3120         if (IS_ERR(handle))
3121                 GOTO(out_free, rc = PTR_ERR(handle));
3122
3123 use_bigger_buffer:
3124         acl_buf = *lu_buf_check_and_alloc(&info->mdi_xattr_buf, acl_size);
3125         if (!acl_buf.lb_buf)
3126                 GOTO(out_stop, rc = -ENOMEM);
3127
3128         def_acl_buf = *lu_buf_check_and_alloc(&info->mdi_big_buf, acl_size);
3129         if (!def_acl_buf.lb_buf)
3130                 GOTO(out_stop, rc = -ENOMEM);
3131
3132         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
3133         if (unlikely(rc == -ERANGE &&
3134                      acl_size == LUSTRE_POSIX_ACL_MAX_SIZE_OLD)) {
3135                 /* use maximum-sized xattr buffer for too-big default ACL */
3136                 acl_size = min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
3137                                  XATTR_SIZE_MAX);
3138                 goto use_bigger_buffer;
3139         }
3140         if (rc < 0)
3141                 GOTO(out_stop, rc);
3142
3143         /* adjust stripe count to 0 for 'lfs mkdir -c 1 ...' to avoid creating
3144          * 1-stripe directory, MDS_OPEN_DEFAULT_LMV means ea is default LMV.
3145          */
3146         if (unlikely(S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata &&
3147                      !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV))) {
3148                 struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
3149
3150                 /* migrate may create 1-stripe directory, adjust stripe count
3151                  * before lod_ah_init().
3152                  */
3153                 if (lmu && lmu->lum_magic == cpu_to_le32(LMV_USER_MAGIC) &&
3154                     lmu->lum_stripe_count == cpu_to_le32(1))
3155                         lmu->lum_stripe_count = 0;
3156         }
3157
3158         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
3159
3160         memset(ldata, 0, sizeof(*ldata));
3161         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
3162                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
3163
3164                 tfid.f_oid--;
3165                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
3166                                         &tfid, lname, 1, 0, ldata);
3167         } else {
3168                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
3169                                         mdd_object_fid(mdd_pobj),
3170                                         lname, 1, 0, ldata);
3171         }
3172
3173         if (spec->sp_cr_flags & MDS_OPEN_PCC) {
3174                 LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
3175
3176                 memset(&hsm_buf, 0, sizeof(hsm_buf));
3177                 lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
3178                 if (hsm_buf.lb_buf == NULL)
3179                         GOTO(out_stop, rc = -ENOMEM);
3180         }
3181
3182         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
3183                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
3184                                 &hsm_buf, hint);
3185         if (rc)
3186                 GOTO(out_stop, rc);
3187
3188         rc = mdd_trans_start(env, mdd, handle);
3189         if (rc)
3190                 GOTO(out_stop, rc);
3191
3192         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
3193                                &def_acl_buf, &hsm_buf, hint, handle, true);
3194         if (rc != 0)
3195                 GOTO(out_stop, rc);
3196
3197         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
3198                 mdd_write_lock(env, son, DT_TGT_CHILD);
3199                 son->mod_flags |= VOLATILE_OBJ;
3200                 rc = mdd_orphan_insert(env, son, handle);
3201                 GOTO(out_volatile, rc);
3202         } else {
3203                 if (likely(!name_inserted)) {
3204                         rc = __mdd_index_insert(env, mdd_pobj,
3205                                                 mdd_object_fid(son),
3206                                                 attr->la_mode, name, handle);
3207                         if (rc != 0)
3208                                 GOTO(err_created, rc);
3209                 }
3210
3211                 mdd_links_add(env, son, mdd_object_fid(mdd_pobj), lname,
3212                               handle, ldata, 1);
3213
3214                 /* update parent directory mtime/ctime */
3215                 *la = *attr;
3216                 la->la_valid = LA_CTIME | LA_MTIME;
3217                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
3218                 if (rc)
3219                         GOTO(err_insert, rc);
3220         }
3221
3222         EXIT;
3223 err_insert:
3224         if (rc != 0) {
3225                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
3226                         rc2 = mdd_orphan_delete(env, son, handle);
3227                 else
3228                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
3229                                                  S_ISDIR(attr->la_mode),
3230                                                  handle);
3231                 if (rc2 != 0)
3232                         goto out_stop;
3233
3234 err_created:
3235                 mdd_write_lock(env, son, DT_TGT_CHILD);
3236                 if (S_ISDIR(attr->la_mode)) {
3237                         /* Drop the reference, no need to delete "."/"..",
3238                          * because the object is to be destroyed directly.
3239                          */
3240                         rc2 = mdo_ref_del(env, son, handle);
3241                         if (rc2 != 0) {
3242                                 mdd_write_unlock(env, son);
3243                                 goto out_stop;
3244                         }
3245                 }
3246 out_volatile:
3247                 /* For volatile files drop one link immediately, since there is
3248                  * no filename in the namespace, and save any error returned.
3249                  */
3250                 rc2 = mdo_ref_del(env, son, handle);
3251                 if (rc2 != 0) {
3252                         mdd_write_unlock(env, son);
3253                         if (unlikely(rc == 0))
3254                                 rc = rc2;
3255                         goto out_stop;
3256                 }
3257
3258                 /* Don't destroy the volatile object on success */
3259                 if (likely(rc != 0))
3260                         mdo_destroy(env, son, handle);
3261                 mdd_write_unlock(env, son);
3262         }
3263
3264         if (rc == 0 && fid_is_namespace_visible(mdd_object_fid(son)) &&
3265             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
3266                 rc = mdd_changelog_ns_store(env, mdd,
3267                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
3268                                 S_ISREG(attr->la_mode) ? CL_CREATE :
3269                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
3270                                 0, son, mdd_pobj, pattr, NULL, NULL, NULL,
3271                                 lname, NULL, handle);
3272 out_stop:
3273         rc2 = mdd_trans_stop(env, mdd, rc, handle);
3274         if (rc == 0) {
3275                 /* If creation fails, it is most likely due to the remote update
3276                  * failure, because local transaction will mostly succeed at
3277                  * this stage. There is no easy way to rollback all of previous
3278                  * updates, so let's remove the object from namespace, and
3279                  * LFSCK should handle the orphan object.
3280                  */
3281                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
3282                         mdd_index_delete(env, mdd_pobj, attr, lname);
3283                 rc = rc2;
3284         }
3285 out_free:
3286         if (is_vmalloc_addr(ldata->ld_buf))
3287                 /* if we vmalloced a large buffer drop it */
3288                 lu_buf_free(ldata->ld_buf);
3289
3290         if (spec->sp_cr_flags & MDS_OPEN_PCC)
3291                 lu_buf_free(&hsm_buf);
3292
3293         /* The child object shouldn't be cached anymore */
3294         if (rc)
3295                 set_bit(LU_OBJECT_HEARD_BANSHEE,
3296                         &child->mo_lu.lo_header->loh_flags);
3297         return rc;
3298 }
3299
3300 /* has not mdd_write{read}_lock on any obj yet. */
3301 static int mdd_rename_sanity_check(const struct lu_env *env,
3302                                    struct mdd_object *src_pobj,
3303                                    const struct lu_attr *spattr,
3304                                    struct mdd_object *tgt_pobj,
3305                                    const struct lu_attr *tpattr,
3306                                    struct mdd_object *sobj,
3307                                    const struct lu_attr *sattr,
3308                                    struct mdd_object *tobj,
3309                                    const struct lu_attr *tattr)
3310 {
3311         int rc = 0;
3312
3313         ENTRY;
3314
3315         /* XXX: when get here, sobj must NOT be NULL,
3316          * the other case has been processed in cld_rename
3317          * before mdd_rename and enable MDS_PERM_BYPASS.
3318          */
3319         LASSERT(sobj);
3320
3321         /*
3322          * If we are using project inheritance, we only allow renames
3323          * into our tree when the project IDs are the same; otherwise
3324          * tree quota mechanism would be circumvented.
3325          */
3326         if ((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
3327              tpattr->la_projid != sattr->la_projid && S_ISDIR(sattr->la_mode))
3328                 RETURN(-EXDEV);
3329
3330         /* we prevent an encrypted file from being renamed
3331          * into an unencrypted dir
3332          */
3333         if ((spattr->la_valid & LA_FLAGS &&
3334              spattr->la_flags & LUSTRE_ENCRYPT_FL) &&
3335             !(tpattr->la_valid & LA_FLAGS &&
3336               tpattr->la_flags & LUSTRE_ENCRYPT_FL))
3337                 RETURN(-EXDEV);
3338
3339         rc = mdd_may_delete(env, src_pobj, spattr, sobj, sattr, NULL, 1, 0);
3340         if (rc)
3341                 RETURN(rc);
3342
3343         /* XXX: when get here, "tobj == NULL" means tobj must
3344          * NOT exist (neither on remote MDS, such case has been
3345          * processed in cld_rename before mdd_rename and enable
3346          * MDS_PERM_BYPASS).
3347          * So check may_create, but not check may_unlink.
3348          */
3349         if (tobj == NULL)
3350                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
3351                                     (src_pobj != tgt_pobj));
3352         else
3353                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, sattr,
3354                                     (src_pobj != tgt_pobj), 1);
3355
3356         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(sattr->la_mode))
3357                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
3358
3359         RETURN(rc);
3360 }
3361
3362 static
3363 int mdd_declare_rename(const struct lu_env *env, struct mdd_device *mdd,
3364                      struct mdd_object *mdd_spobj, struct mdd_object *mdd_tpobj,
3365                      struct mdd_object *mdd_sobj, struct mdd_object *mdd_tobj,
3366                      const struct lu_name *sname, const struct lu_name *tname,
3367                      struct md_attr *ma, struct linkea_data *ldata,
3368                      bool change_projid, struct thandle *handle)
3369 {
3370         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
3371         int rc;
3372
3373         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3374         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3375
3376         LASSERT(mdd_spobj);
3377         LASSERT(mdd_tpobj);
3378         LASSERT(mdd_sobj);
3379
3380         /* name from source dir */
3381         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
3382         if (rc)
3383                 return rc;
3384
3385         /* .. from source child */
3386         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3387                 /* source child can be directory, count by source dir's nlink */
3388                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
3389                 if (rc)
3390                         return rc;
3391                 if (mdd_spobj != mdd_tpobj) {
3392                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
3393                                                       handle);
3394                         if (rc != 0)
3395                                 return rc;
3396
3397                         rc = mdo_declare_index_insert(env, mdd_sobj,
3398                                                       mdd_object_fid(mdd_tpobj),
3399                                                       S_IFDIR, dotdot, handle);
3400                         if (rc != 0)
3401                                 return rc;
3402                 }
3403
3404                 /* new target child can be dir, counted by target dir's nlink */
3405                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
3406                 if (rc != 0)
3407                         return rc;
3408         }
3409
3410         la->la_valid = LA_CTIME | LA_MTIME;
3411         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
3412         if (rc != 0)
3413                 return rc;
3414
3415         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
3416         if (rc != 0)
3417                 return rc;
3418
3419         la->la_valid = LA_CTIME;
3420         if (change_projid)
3421                 la->la_valid |= LA_PROJID;
3422         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3423         if (rc)
3424                 return rc;
3425
3426         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
3427         if (rc)
3428                 return rc;
3429
3430         /* new name */
3431         rc = mdo_declare_index_insert(env, mdd_tpobj, mdd_object_fid(mdd_sobj),
3432                                       mdd_object_type(mdd_sobj),
3433                                       tname->ln_name, handle);
3434         if (rc != 0)
3435                 return rc;
3436
3437         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
3438                 /* delete target child in target parent directory */
3439                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
3440                                               handle);
3441                 if (rc)
3442                         return rc;
3443
3444                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
3445                 if (rc)
3446                         return rc;
3447
3448                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
3449                         /* target child can be directory,
3450                          * delete "." reference in target child directory
3451                          */
3452                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
3453                         if (rc)
3454                                 return rc;
3455
3456                         /* delete ".." reference in target parent directory */
3457                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
3458                         if (rc)
3459                                 return rc;
3460                 }
3461
3462                 la->la_valid = LA_CTIME;
3463                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
3464                 if (rc)
3465                         return rc;
3466
3467                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
3468                 if (rc)
3469                         return rc;
3470         }
3471
3472         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
3473                                          handle);
3474         if (rc)
3475                 return rc;
3476
3477         return rc;
3478 }
3479
3480 static
3481 int mdd_migrate_object(const struct lu_env *env, struct mdd_object *spobj,
3482                        struct mdd_object *tpobj, struct mdd_object *sobj,
3483                        struct mdd_object *tobj, const struct lu_name *sname,
3484                        const struct lu_name *tname, struct md_op_spec *spec,
3485                        struct md_attr *ma);
3486
3487 /* src object can be remote that is why we use only fid and type of object */
3488 static int mdd_rename(const struct lu_env *env,  struct md_object *src_pobj,
3489                       struct md_object *tgt_pobj, const struct lu_fid *lf,
3490                       const struct lu_name *lsname, struct md_object *tobj,
3491                       const struct lu_name *ltname, struct md_attr *ma)
3492 {
3493         const char *sname = lsname->ln_name;
3494         const char *tname = ltname->ln_name;
3495         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
3496         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
3497         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); /* target parent */
3498         struct mdd_object *mdd_sobj = NULL;                  /* source object */
3499         struct mdd_object *mdd_tobj = NULL;       /* (possible) target object */
3500         struct lu_attr *sattr = MDD_ENV_VAR(env, cattr);
3501         struct lu_attr *spattr = MDD_ENV_VAR(env, pattr);
3502         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
3503         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
3504         struct linkea_data *ldata = &mdd_env_info(env)->mdi_link_data;
3505         const struct lu_fid *tpobj_fid = mdd_object_fid(mdd_tpobj);
3506         const struct lu_fid *spobj_fid = mdd_object_fid(mdd_spobj);
3507         struct mdd_device *mdd = mdo2mdd(src_pobj);
3508         struct thandle *handle;
3509         bool is_dir;
3510         bool tobj_ref = 0;
3511         bool tobj_locked = 0;
3512         bool change_projid = false;
3513         unsigned int cl_flags = 0;
3514         int rc, rc2;
3515
3516         ENTRY;
3517
3518         /* let unlink to complete and commit */
3519         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
3520
3521         if (tobj)
3522                 mdd_tobj = md2mdd_obj(tobj);
3523
3524         mdd_sobj = mdd_object_find(env, mdd, lf);
3525         if (IS_ERR(mdd_sobj))
3526                 RETURN(PTR_ERR(mdd_sobj));
3527
3528         rc = mdd_la_get(env, mdd_sobj, sattr);
3529         if (rc)
3530                 GOTO(out_pending, rc);
3531
3532         /* if rename is cross MDTs, migrate symlink if it doesn't have other
3533          * hard links, and target doesn't exist.
3534          */
3535         if (mdd_object_remote(mdd_sobj) && S_ISLNK(sattr->la_mode) &&
3536             sattr->la_nlink == 1 && !tobj) {
3537                 struct md_op_spec *spec = &mdd_env_info(env)->mdi_spec;
3538                 struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
3539                 struct lu_fid tfid;
3540
3541                 rc = ld->ld_ops->ldo_fid_alloc(env, ld, &tfid, &tgt_pobj->mo_lu,
3542                                                NULL);
3543                 if (rc < 0)
3544                         GOTO(out_pending, rc);
3545
3546                 mdd_tobj = mdd_object_find(env, mdd, &tfid);
3547                 if (IS_ERR(mdd_tobj))
3548                         GOTO(out_pending, rc = PTR_ERR(mdd_tobj));
3549
3550                 memset(spec, 0, sizeof(*spec));
3551                 rc = mdd_migrate_object(env, mdd_spobj, mdd_tpobj, mdd_sobj,
3552                                         mdd_tobj, lsname, ltname, spec, ma);
3553                 mdd_object_put(env, mdd_tobj);
3554                 GOTO(out_pending, rc);
3555         }
3556
3557         rc = mdd_la_get(env, mdd_spobj, spattr);
3558         if (rc)
3559                 GOTO(out_pending, rc);
3560
3561         if (mdd_tobj) {
3562                 rc = mdd_la_get(env, mdd_tobj, tattr);
3563                 if (rc)
3564                         GOTO(out_pending, rc);
3565                 /* search for an existing archive.  we should check ahead as the
3566                  * object can be destroyed in this transaction
3567                  */
3568                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
3569                         cl_flags |= CLF_RENAME_LAST_EXISTS;
3570         }
3571
3572         rc = mdd_la_get(env, mdd_tpobj, tpattr);
3573         if (rc)
3574                 GOTO(out_pending, rc);
3575
3576         rc = mdd_rename_sanity_check(env, mdd_spobj, spattr, mdd_tpobj, tpattr,
3577                                      mdd_sobj, sattr, mdd_tobj, tattr);
3578         if (rc)
3579                 GOTO(out_pending, rc);
3580
3581         rc = mdd_name_check(env, mdd, ltname);
3582         if (rc < 0)
3583                 GOTO(out_pending, rc);
3584
3585         handle = mdd_trans_create(env, mdd);
3586         if (IS_ERR(handle))
3587                 GOTO(out_pending, rc = PTR_ERR(handle));
3588
3589         memset(ldata, 0, sizeof(*ldata));
3590         rc = mdd_linkea_prepare(env, mdd_sobj, spobj_fid, lsname, tpobj_fid,
3591                                 ltname, 1, 0, ldata);
3592         if (rc)
3593                 GOTO(stop, rc);
3594
3595         if (tpattr->la_projid != sattr->la_projid &&
3596             tpattr->la_flags & LUSTRE_PROJINHERIT_FL)
3597                 change_projid = true;
3598
3599         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
3600                                 mdd_tobj, lsname, ltname, ma, ldata,
3601                                 change_projid, handle);
3602         if (rc)
3603                 GOTO(stop, rc);
3604
3605         rc = mdd_trans_start(env, mdd, handle);
3606         if (rc)
3607                 GOTO(stop, rc);
3608
3609         is_dir = S_ISDIR(sattr->la_mode);
3610
3611         /* Remove source name from source directory */
3612         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
3613         if (rc)
3614                 GOTO(stop, rc);
3615
3616         /* "mv dir1 dir2" needs "dir1/.." link update */
3617         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
3618                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3619                 if (rc)
3620                         GOTO(fixup_spobj2, rc);
3621
3622                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
3623                                              dotdot, handle);
3624                 if (rc)
3625                         GOTO(fixup_spobj, rc);
3626         }
3627
3628         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
3629                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3630                 if (rc)
3631                         /* tname might been renamed to something else */
3632                         GOTO(fixup_spobj, rc);
3633         }
3634
3635         /* Insert new fid with target name into target dir */
3636         rc = __mdd_index_insert(env, mdd_tpobj, lf, sattr->la_mode,
3637                                 tname, handle);
3638         if (rc)
3639                 GOTO(fixup_tpobj, rc);
3640
3641         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3642         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3643
3644         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3645         la->la_valid = LA_CTIME;
3646         if (change_projid) {
3647                 /* mdd_update_time honors other valid flags except TIME ones */
3648                 la->la_valid |= LA_PROJID;
3649                 la->la_projid = tpattr->la_projid;
3650         }
3651         rc = mdd_update_time(env, mdd_sobj, sattr, la, handle);
3652         if (rc)
3653                 GOTO(fixup_tpobj, rc);
3654
3655         /* Update the linkEA for the source object */
3656         mdd_write_lock(env, mdd_sobj, DT_SRC_CHILD);
3657         rc = mdd_links_rename(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3658                               lsname, mdd_object_fid(mdd_tpobj), ltname,
3659                               handle, ldata, 0, 0);
3660         if (rc == -ENOENT)
3661                 /* Old files might not have EA entry */
3662                 mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3663                               lsname, handle, NULL, 0);
3664         mdd_write_unlock(env, mdd_sobj);
3665         /* We don't fail the transaction if the link ea can't be
3666          * updated -- fid2path will use alternate lookup method.
3667          */
3668         rc = 0;
3669
3670         /* Remove old target object
3671          * For tobj is remote case cmm layer has processed
3672          * and set tobj to NULL then. So when tobj is NOT NULL,
3673          * it must be local one.
3674          */
3675         if (tobj && mdd_object_exists(mdd_tobj)) {
3676                 mdd_write_lock(env, mdd_tobj, DT_TGT_CHILD);
3677                 tobj_locked = 1;
3678                 if (mdd_is_dead_obj(mdd_tobj)) {
3679                         /* should not be dead, something is wrong */
3680                         rc = -EINVAL;
3681                         CERROR("%s: something bad, dead tobj "DFID": rc = %d\n",
3682                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3683                         goto cleanup;
3684                 }
3685                 mdo_ref_del(env, mdd_tobj, handle);
3686
3687                 /* Remove dot reference. */
3688                 if (S_ISDIR(tattr->la_mode))
3689                         mdo_ref_del(env, mdd_tobj, handle);
3690                 tobj_ref = 1;
3691
3692                 /* fetch updated nlink */
3693                 rc = mdd_la_get(env, mdd_tobj, tattr);
3694                 if (rc) {
3695                         CERROR("%s: failed get nlink of tobj "DFID": rc = %d\n",
3696                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3697                         GOTO(fixup_tpobj, rc);
3698                 }
3699
3700                 la->la_valid = LA_CTIME;
3701                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3702                 if (rc) {
3703                         CERROR("%s: failed set ctime of tobj "DFID": rc = %d\n",
3704                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3705                         GOTO(fixup_tpobj, rc);
3706                 }
3707
3708                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3709                 ma->ma_attr = *tattr;
3710                 ma->ma_valid |= MA_INODE;
3711                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3712                                        handle);
3713                 if (rc) {
3714                         CERROR("%s: failed to unlink tobj "DFID": rc = %d\n",
3715                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3716                         GOTO(fixup_tpobj, rc);
3717                 }
3718
3719                 /* fetch updated nlink */
3720                 rc = mdd_la_get(env, mdd_tobj, tattr);
3721                 if (rc == -ENOENT) {
3722                         /* object removed? return the latest known attributes */
3723                         tattr->la_nlink = 0;
3724                         rc = 0;
3725                 } else if (rc) {
3726                         CERROR("%s: failed get nlink of tobj "DFID": rc = %d\n",
3727                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3728                         GOTO(fixup_tpobj, rc);
3729                 }
3730                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3731                 ma->ma_attr = *tattr;
3732                 ma->ma_valid |= MA_INODE;
3733
3734                 if (tattr->la_nlink == 0)
3735                         cl_flags |= CLF_RENAME_LAST;
3736                 else
3737                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3738         }
3739
3740         la->la_valid = LA_CTIME | LA_MTIME;
3741         rc = mdd_update_time(env, mdd_spobj, spattr, la, handle);
3742         if (rc)
3743                 GOTO(fixup_tpobj, rc);
3744
3745         if (mdd_spobj != mdd_tpobj) {
3746                 la->la_valid = LA_CTIME | LA_MTIME;
3747                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3748                 if (rc != 0)
3749                         GOTO(fixup_tpobj, rc);
3750         }
3751
3752         EXIT;
3753
3754 fixup_tpobj:
3755         if (rc) {
3756                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3757                 if (rc2)
3758                         CWARN("%s: tpobj "DFID" fix error: rc = %d\n",
3759                               mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc2);
3760
3761                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3762                     !mdd_is_dead_obj(mdd_tobj)) {
3763                         if (tobj_ref) {
3764                                 mdo_ref_add(env, mdd_tobj, handle);
3765                                 if (is_dir)
3766                                         mdo_ref_add(env, mdd_tobj, handle);
3767                         }
3768
3769                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3770                                                  mdd_object_fid(mdd_tobj),
3771                                                  mdd_object_type(mdd_tobj),
3772                                                  tname, handle);
3773                         if (rc2)
3774                                 CWARN("%s: tpobj "DFID" fix error: rc = %d\n",
3775                                       mdd2obd_dev(mdd)->obd_name,
3776                                       PFID(tpobj_fid), rc2);
3777                 }
3778         }
3779
3780 fixup_spobj:
3781         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3782                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3783                 if (rc2)
3784                         CWARN("%s: spobj "DFID" dotdot delete error: rc = %d\n",
3785                               mdd2obd_dev(mdd)->obd_name, PFID(spobj_fid), rc2);
3786
3787
3788                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3789                                               dotdot, handle);
3790                 if (rc2)
3791                         CWARN("%s: spobj "DFID" dotdot insert error: rc = %d\n",
3792                               mdd2obd_dev(mdd)->obd_name, PFID(spobj_fid), rc2);
3793         }
3794
3795 fixup_spobj2:
3796         if (rc) {
3797                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3798                                          mdd_object_type(mdd_sobj), sname,
3799                                          handle);
3800                 if (rc2)
3801                         CWARN("%s: spobj "DFID" fix error: rc = %d\n",
3802                               mdd2obd_dev(mdd)->obd_name, PFID(spobj_fid), rc2);
3803         }
3804
3805 cleanup:
3806         if (tobj_locked)
3807                 mdd_write_unlock(env, mdd_tobj);
3808
3809         if (rc == 0)
3810                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3811                                             mdd_tobj, mdd_tpobj, tpattr, lf,
3812                                             mdd_spobj, spattr, ltname, lsname,
3813                                             handle);
3814
3815 stop:
3816         rc = mdd_trans_stop(env, mdd, rc, handle);
3817
3818 out_pending:
3819         mdd_object_put(env, mdd_sobj);
3820
3821         return rc;
3822 }
3823
3824 /**
3825  * Check whether we should migrate the file/dir
3826  * return val
3827  *      < 0  permission check failed or other error.
3828  *      = 0  the file can be migrated.
3829  **/
3830 static int mdd_migrate_sanity_check(const struct lu_env *env,
3831                                     struct mdd_device *mdd,
3832                                     struct mdd_object *spobj,
3833                                     struct mdd_object *tpobj,
3834                                     struct mdd_object *sobj,
3835                                     struct mdd_object *tobj,
3836                                     const struct lu_attr *spattr,
3837                                     const struct lu_attr *tpattr,
3838                                     const struct lu_attr *attr,
3839                                     bool nsonly)
3840 {
3841         int rc;
3842
3843         ENTRY;
3844
3845         if (!nsonly && !mdd_object_remote(sobj)) {
3846                 mdd_read_lock(env, sobj, DT_SRC_CHILD);
3847                 if (sobj->mod_count > 0) {
3848                         CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
3849                                mdd_obj_dev_name(sobj),
3850                                PFID(mdd_object_fid(sobj)),
3851                                sobj->mod_count);
3852                         mdd_read_unlock(env, sobj);
3853                         RETURN(-EBUSY);
3854                 }
3855                 mdd_read_unlock(env, sobj);
3856         }
3857
3858         if (mdd_object_exists(tobj))
3859                 RETURN(-EEXIST);
3860
3861         rc = mdd_may_delete(env, spobj, spattr, sobj, attr, NULL, 1, 0);
3862         if (rc)
3863                 RETURN(rc);
3864
3865         rc = mdd_may_create(env, tpobj, tpattr, NULL, true);
3866
3867         RETURN(rc);
3868 }
3869
3870 struct mdd_xattr_entry {
3871         struct list_head        mxe_linkage;
3872         char                   *mxe_name;
3873         struct lu_buf           mxe_buf;
3874 };
3875
3876 struct mdd_xattrs {
3877         struct lu_buf           mx_namebuf;
3878         struct list_head        mx_list;
3879 };
3880
3881 static inline void mdd_xattrs_init(struct mdd_xattrs *xattrs)
3882 {
3883         INIT_LIST_HEAD(&xattrs->mx_list);
3884         xattrs->mx_namebuf.lb_buf = NULL;
3885         xattrs->mx_namebuf.lb_len = 0;
3886 }
3887
3888 static inline void mdd_xattrs_fini(struct mdd_xattrs *xattrs)
3889 {
3890         struct mdd_xattr_entry *entry;
3891         struct mdd_xattr_entry *tmp;
3892
3893         list_for_each_entry_safe(entry, tmp, &xattrs->mx_list, mxe_linkage) {
3894                 lu_buf_free(&entry->mxe_buf);
3895                 list_del(&entry->mxe_linkage);
3896                 OBD_FREE_PTR(entry);
3897         }
3898
3899         lu_buf_free(&xattrs->mx_namebuf);
3900 }
3901
3902 /* read xattrs into buf, but skip LMA, LMV, LINKEA if 'skip_linkea' is
3903  * set, and DMV if 'skip_dmv" is set.
3904  */
3905 static int mdd_xattrs_migrate_prep(const struct lu_env *env,
3906                                    struct mdd_xattrs *xattrs,
3907                                    struct mdd_object *sobj,
3908                                    bool skip_linkea,
3909                                    bool skip_dmv)
3910 {
3911         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
3912         struct mdd_xattr_entry *entry;
3913         bool needencxattr = false;
3914         bool encxattrfound = false;
3915         char *xname;
3916         int list_xsize;
3917         int xlen;
3918         int rem;
3919         int xsize;
3920         int rc;
3921
3922         ENTRY;
3923
3924         list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
3925         if (list_xsize == -ENODATA)
3926                 RETURN(0);
3927
3928         if (list_xsize < 0)
3929                 RETURN(list_xsize);
3930
3931         if (attr->la_valid & LA_FLAGS &&
3932             attr->la_flags & LUSTRE_ENCRYPT_FL) {
3933                 needencxattr = true;
3934                 list_xsize +=
3935                         strlen(LL_XATTR_NAME_ENCRYPTION_CONTEXT) + 1;
3936         }
3937
3938         lu_buf_alloc(&xattrs->mx_namebuf, list_xsize);
3939         if (xattrs->mx_namebuf.lb_buf == NULL)
3940                 RETURN(-ENOMEM);
3941
3942         rc = mdo_xattr_list(env, sobj, &xattrs->mx_namebuf);
3943         if (rc < 0)
3944                 GOTO(fini, rc);
3945
3946         rem = rc;
3947         rc = 0;
3948         xname = xattrs->mx_namebuf.lb_buf;
3949 reloop:
3950         for (; rem > 0; xname += xlen, rem -= xlen) {
3951                 if (needencxattr &&
3952                     strcmp(xname, LL_XATTR_NAME_ENCRYPTION_CONTEXT) == 0)
3953                         encxattrfound = true;
3954                 xlen = strnlen(xname, rem - 1) + 1;
3955                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3956                     strcmp(XATTR_NAME_LMV, xname) == 0)
3957                         continue;
3958
3959                 if (skip_linkea &&
3960                     strcmp(XATTR_NAME_LINK, xname) == 0)
3961                         continue;
3962
3963                 if (skip_dmv &&
3964                     strcmp(XATTR_NAME_DEFAULT_LMV, xname) == 0)
3965                         continue;
3966
3967                 xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
3968                 if (xsize == -ENODATA)
3969                         continue;
3970                 if (xsize < 0)
3971                         GOTO(fini, rc = xsize);
3972
3973                 OBD_ALLOC_PTR(entry);
3974                 if (!entry)
3975                         GOTO(fini, rc = -ENOMEM);
3976
3977                 lu_buf_alloc(&entry->mxe_buf, xsize);
3978                 if (!entry->mxe_buf.lb_buf) {
3979                         OBD_FREE_PTR(entry);
3980                         GOTO(fini, rc = -ENOMEM);
3981                 }
3982
3983                 rc = mdo_xattr_get(env, sobj, &entry->mxe_buf, xname);
3984                 if (rc < 0) {
3985                         lu_buf_free(&entry->mxe_buf);
3986                         OBD_FREE_PTR(entry);
3987                         if (rc == -ENODATA)
3988                                 continue;
3989                         GOTO(fini, rc);
3990                 }
3991
3992                 entry->mxe_name = xname;
3993                 list_add_tail(&entry->mxe_linkage, &xattrs->mx_list);
3994         }
3995
3996         if (needencxattr && !encxattrfound) {
3997                 xlen = strlen(LL_XATTR_NAME_ENCRYPTION_CONTEXT) + 1;
3998                 strncpy(xname, LL_XATTR_NAME_ENCRYPTION_CONTEXT, xlen);
3999                 rem = xlen;
4000                 GOTO(reloop, 0);
4001         }
4002
4003         RETURN(0);
4004 fini:
4005         mdd_xattrs_fini(xattrs);
4006         RETURN(rc);
4007 }
4008
4009 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
4010                             struct mdd_object *obj,
4011                             const struct lu_buf *buf,
4012                             const char *name,
4013                             int fl, struct thandle *handle);
4014
4015 static int mdd_foreach_xattr(const struct lu_env *env,
4016                              struct mdd_object *tobj,
4017                              struct mdd_xattrs *xattrs,
4018                              struct thandle *handle,
4019                              mdd_xattr_cb cb)
4020 {
4021         struct mdd_xattr_entry *entry;
4022         int rc;
4023
4024         list_for_each_entry(entry, &xattrs->mx_list, mxe_linkage) {
4025                 rc = cb(env, tobj, &entry->mxe_buf, entry->mxe_name, 0, handle);
4026                 if (rc)
4027                         return rc;
4028         }
4029
4030         return 0;
4031 }
4032
4033 typedef int (*mdd_linkea_cb)(const struct lu_env *env,
4034                              struct mdd_object *sobj,
4035                              struct mdd_object *tobj,
4036                              const struct lu_name *sname,
4037                              const struct lu_fid *sfid,
4038                              const struct lu_name *lname,
4039                              const struct lu_fid *fid,
4040                              void *opaque,
4041                              struct thandle *handle);
4042
4043 static int mdd_declare_update_link(const struct lu_env *env,
4044                                    struct mdd_object *sobj,
4045                                    struct mdd_object *tobj,
4046                                    const struct lu_name *tname,
4047                                    const struct lu_fid *tpfid,
4048                                    const struct lu_name *lname,
4049                                    const struct lu_fid *fid,
4050                                    void *unused,
4051                                    struct thandle *handle)
4052 {
4053         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4054         struct mdd_object *pobj;
4055         int rc;
4056
4057         /* ignore tobj */
4058         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
4059             !strcmp(tname->ln_name, lname->ln_name))
4060                 return 0;
4061
4062         pobj = mdd_object_find(env, mdd, fid);
4063         if (IS_ERR(pobj))
4064                 return PTR_ERR(pobj);
4065
4066
4067         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
4068         if (!rc)
4069                 rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
4070                                               mdd_object_type(sobj),
4071                                               lname->ln_name, handle);
4072         mdd_object_put(env, pobj);
4073         if (rc)
4074                 return rc;
4075
4076         rc = mdo_declare_ref_add(env, tobj, handle);
4077         if (rc)
4078                 return rc;
4079
4080         rc = mdo_declare_ref_del(env, sobj, handle);
4081         return rc;
4082 }
4083
4084 static int mdd_update_link(const struct lu_env *env,
4085                            struct mdd_object *sobj,
4086                            struct mdd_object *tobj,
4087                            const struct lu_name *tname,
4088                            const struct lu_fid *tpfid,
4089                            const struct lu_name *lname,
4090                            const struct lu_fid *fid,
4091                            void *unused,
4092                            struct thandle *handle)
4093 {
4094         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4095         struct mdd_object *pobj;
4096         int rc;
4097
4098         ENTRY;
4099
4100         /* ignore tobj */
4101         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
4102             !memcmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
4103                 RETURN(0);
4104
4105         CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
4106                PFID(fid), encode_fn_luname(lname), PFID(mdd_object_fid(tobj)));
4107
4108         pobj = mdd_object_find(env, mdd, fid);
4109         if (IS_ERR(pobj)) {
4110                 CWARN("%s: cannot find obj "DFID": %ld\n",
4111                       mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
4112                 RETURN(PTR_ERR(pobj));
4113         }
4114
4115         if (!mdd_object_exists(pobj)) {
4116                 CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
4117                 mdd_object_put(env, pobj);
4118                 RETURN(-ENOENT);
4119         }
4120
4121         mdd_write_lock(env, pobj, DT_TGT_PARENT);
4122         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
4123         if (!rc)
4124                 rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(tobj),
4125                                              mdd_object_type(sobj),
4126                                              lname->ln_name, handle);
4127         mdd_write_unlock(env, pobj);
4128         mdd_object_put(env, pobj);
4129         if (rc)
4130                 RETURN(rc);
4131
4132         mdd_write_lock(env, tobj, DT_TGT_CHILD);
4133         rc = mdo_ref_add(env, tobj, handle);
4134         mdd_write_unlock(env, tobj);
4135         if (rc)
4136                 RETURN(rc);
4137
4138         mdd_write_lock(env, sobj, DT_SRC_CHILD);
4139         rc = mdo_ref_del(env, sobj, handle);
4140         mdd_write_unlock(env, sobj);
4141
4142         RETURN(rc);
4143 }
4144
4145 static inline int mdd_fld_lookup(const struct lu_env *env,
4146                                  struct mdd_device *mdd,
4147                                  const struct lu_fid *fid,
4148                                  __u32 *mdt_index)
4149 {
4150         struct lu_seq_range *range = &mdd_env_info(env)->mdi_range;
4151         struct seq_server_site *ss;
4152         int rc;
4153
4154         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4155
4156         range->lsr_flags = LU_SEQ_RANGE_MDT;
4157         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4158         if (rc)
4159                 return rc;
4160
4161         *mdt_index = range->lsr_index;
4162
4163         return 0;
4164 }
4165
4166 static int mdd_is_link_on_source_mdt(const struct lu_env *env,
4167                                      struct mdd_object *sobj,
4168                                      struct mdd_object *tobj,
4169                                      const struct lu_name *tname,
4170                                      const struct lu_fid *tpfid,
4171                                      const struct lu_name *lname,
4172                                      const struct lu_fid *fid,
4173                                      void *opaque,
4174                                      struct thandle *handle)
4175 {
4176         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4177         __u32 source_mdt_index = *(__u32 *)opaque;
4178         __u32 link_mdt_index;
4179         int rc;
4180
4181         ENTRY;
4182
4183         /* ignore tobj */
4184         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
4185             !memcmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
4186                 return 0;
4187
4188         rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
4189         if (rc)
4190                 RETURN(rc);
4191
4192         RETURN(link_mdt_index == source_mdt_index);
4193 }
4194
4195 static int mdd_iterate_linkea(const struct lu_env *env,
4196                               struct mdd_object *sobj,
4197                               struct mdd_object *tobj,
4198                               const struct lu_name *tname,
4199                               const struct lu_fid *tpfid,
4200                               struct linkea_data *ldata,
4201                               void *opaque,
4202                               struct thandle *handle,
4203                               mdd_linkea_cb cb)
4204 {
4205         struct mdd_thread_info *info = mdd_env_info(env);
4206         char *filename = info->mdi_name;
4207         struct lu_name lname;
4208         struct lu_fid fid;
4209         int rc = 0;
4210
4211         if (!ldata->ld_buf)
4212                 return 0;
4213
4214         for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
4215              linkea_next_entry(ldata)) {
4216                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
4217                                     &fid);
4218
4219                 /* Note: lname might miss \0 at the end */
4220                 snprintf(filename, sizeof(info->mdi_name), DNAME,
4221                          lname.ln_namelen, lname.ln_name);
4222                 lname.ln_name = filename;
4223
4224                 CDEBUG(D_INFO, DFID"/"DNAME"\n",
4225                        PFID(&fid), encode_fn_luname(&lname));
4226
4227                 rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
4228                         handle);
4229         }
4230
4231         return rc;
4232 }
4233
4234 /**
4235  * Prepare linkea, and check whether file needs migrate: if source still has
4236  * link on source MDT, no need to migrate, just update namespace on source and
4237  * target parents.
4238  *
4239  * \retval      0 do migrate
4240  * \retval      1 don't migrate
4241  * \retval      -errno on failure
4242  */
4243 static int mdd_migrate_linkea_prepare(const struct lu_env *env,
4244                                       struct mdd_device *mdd,
4245                                       struct mdd_object *spobj,
4246                                       struct mdd_object *tpobj,
4247                                       struct mdd_object *sobj,
4248                                       const struct lu_name *sname,
4249                                       const struct lu_name *tname,
4250                                       const struct lu_attr *attr,
4251                                       struct linkea_data *ldata)
4252 {
4253         __u32 source_mdt_index;
4254         int rc;
4255
4256         ENTRY;
4257
4258         memset(ldata, 0, sizeof(*ldata));
4259         rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), sname,
4260                                 mdd_object_fid(tpobj), tname, 1, 0, ldata);
4261         if (rc)
4262                 RETURN(rc);
4263
4264         /*
4265          * Then it will check if the file should be migrated. If the file has
4266          * mulitple links, we only need migrate the file if all of its entries
4267          * has been migrated to the remote MDT.
4268          */
4269         if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
4270                 RETURN(0);
4271
4272         /* If there are still links locally, don't migrate this file */
4273         LASSERT(ldata->ld_leh != NULL);
4274
4275         /* If linkEA is overflow, switch to ns-only migrate */
4276         if (unlikely(ldata->ld_leh->leh_overflow_time))
4277                 RETURN(+EOVERFLOW);
4278
4279         rc = mdd_fld_lookup(env, mdd, mdd_object_fid(sobj), &source_mdt_index);
4280         if (rc)
4281                 RETURN(rc);
4282
4283         rc = mdd_iterate_linkea(env, sobj, NULL, tname, mdd_object_fid(tpobj),
4284                                 ldata, &source_mdt_index, NULL,
4285                                 mdd_is_link_on_source_mdt);
4286         RETURN(rc);
4287 }
4288
4289 static int mdd_declare_migrate_update(const struct lu_env *env,
4290                                       struct mdd_object *spobj,
4291                                       struct mdd_object *tpobj,
4292                                       struct mdd_object *obj,
4293                                       const struct lu_name *sname,
4294                                       const struct lu_name *tname,
4295                                       struct lu_attr *attr,
4296                                       struct lu_attr *spattr,
4297                                       struct lu_attr *tpattr,
4298                                       struct linkea_data *ldata,
4299                                       struct md_attr *ma,
4300                                       struct thandle *handle)
4301 {
4302         struct mdd_thread_info *info = mdd_env_info(env);
4303         struct lu_attr *la = &info->mdi_la_for_fix;
4304         int rc;
4305
4306         rc = mdo_declare_index_delete(env, spobj, sname->ln_name, handle);
4307         if (rc)
4308                 return rc;
4309
4310         if (S_ISDIR(attr->la_mode)) {
4311                 rc = mdo_declare_ref_del(env, spobj, handle);
4312                 if (rc)
4313                         return rc;
4314         }
4315
4316         rc = mdo_declare_index_insert(env, tpobj, mdd_object_fid(obj),
4317                                       attr->la_mode & S_IFMT,
4318                                       tname->ln_name, handle);
4319         if (rc)
4320                 return rc;
4321
4322         rc = mdd_declare_links_add(env, obj, handle, ldata);
4323         if (rc)
4324                 return rc;
4325
4326         if (S_ISDIR(attr->la_mode)) {
4327                 rc = mdo_declare_ref_add(env, tpobj, handle);
4328                 if (rc)
4329                         return rc;
4330         }
4331
4332         la->la_valid = LA_CTIME | LA_MTIME;
4333         rc = mdo_declare_attr_set(env, spobj, la, handle);
4334         if (rc)
4335                 return rc;
4336
4337         if (tpobj != spobj) {
4338                 rc = mdo_declare_attr_set(env, tpobj, la, handle);
4339                 if (rc)
4340                         return rc;
4341         }
4342
4343         return rc;
4344 }
4345
4346 static int mdd_declare_migrate_create(const struct lu_env *env,
4347                                       struct mdd_object *spobj,
4348                                       struct mdd_object *tpobj,
4349                                       struct mdd_object *sobj,
4350                                       struct mdd_object *tobj,
4351                                       const struct lu_name *sname,
4352                                       const struct lu_name *tname,
4353                                       struct lu_attr *spattr,
4354                                       struct lu_attr *tpattr,
4355                                       struct lu_attr *attr,
4356                                       struct lu_buf *sbuf,
4357                                       struct linkea_data *ldata,
4358                                       struct mdd_xattrs *xattrs,
4359                                       struct md_attr *ma,
4360                                       struct md_op_spec *spec,
4361                                       struct dt_allocation_hint *hint,
4362                                       struct thandle *handle)
4363 {
4364         struct mdd_thread_info *info = mdd_env_info(env);
4365         struct md_layout_change *mlc = &info->mdi_mlc;
4366         struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
4367         int rc;
4368
4369         ENTRY;
4370
4371         if (S_ISDIR(attr->la_mode)) {
4372                 struct lmv_user_md *lum = spec->u.sp_ea.eadata;
4373
4374                 mlc->mlc_opc = MD_LAYOUT_DETACH;
4375                 rc = mdo_declare_layout_change(env, sobj, mlc, handle);
4376                 if (rc)
4377                         return rc;
4378
4379                 lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
4380         } else if (S_ISLNK(attr->la_mode)) {
4381                 spec->u.sp_symname.ln_name = sbuf->lb_buf;
4382                 /* don't count NUL */
4383                 spec->u.sp_symname.ln_namelen = sbuf->lb_len - 1;
4384         } else if (S_ISREG(attr->la_mode)) {
4385                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
4386                 spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
4387         }
4388
4389         mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
4390
4391         rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
4392                                 tname, attr, handle, spec, ldata, NULL, NULL,
4393                                 NULL, hint);
4394         if (rc)
4395                 return rc;
4396
4397         /*
4398          * tobj mode will be used in mdo_declare_layout_change(), but it's not
4399          * createb yet, copy from sobj.
4400          */
4401         tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
4402         tobj->mod_obj.mo_lu.lo_header->loh_attr |=
4403                 sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
4404
4405         if (S_ISDIR(attr->la_mode)) {
4406                 if (!lmv) {
4407                         /* if sobj is not striped, fake a 1-stripe LMV */
4408                         LASSERT(sizeof(info->mdi_key) >
4409                                 lmv_mds_md_size(1, LMV_MAGIC_V1));
4410                         lmv = (typeof(lmv))info->mdi_key;
4411                         memset(lmv, 0, sizeof(*lmv));
4412                         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
4413                         lmv->lmv_stripe_count = cpu_to_le32(1);
4414                         lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
4415                         fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
4416                                       mdd_object_fid(sobj));
4417                         mlc->mlc_buf.lb_buf = lmv;
4418                         mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
4419                 } else {
4420                         mlc->mlc_buf = *sbuf;
4421                 }
4422                 mlc->mlc_opc = MD_LAYOUT_ATTACH;
4423                 rc = mdo_declare_layout_change(env, tobj, mlc, handle);
4424                 if (rc)
4425                         return rc;
4426         }
4427
4428         rc = mdd_foreach_xattr(env, tobj, xattrs, handle,
4429                                mdo_declare_xattr_set);
4430         if (rc)
4431                 return rc;
4432
4433         if (S_ISREG(attr->la_mode)) {
4434                 struct lu_buf fid_buf;
4435
4436                 handle->th_complex = 1;
4437
4438                 /* target may be remote, update PFID via sobj. */
4439                 fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
4440                 fid_buf.lb_len = sizeof(struct lu_fid);
4441                 rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
4442                                            0, handle);
4443                 if (rc)
4444                         return rc;
4445
4446                 rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4447                 if (rc)
4448                         return rc;
4449         }
4450
4451         if (!S_ISDIR(attr->la_mode)) {
4452                 rc = mdd_iterate_linkea(env, sobj, tobj, tname,
4453                                         mdd_object_fid(tpobj), ldata, NULL,
4454                                         handle, mdd_declare_update_link);
4455                 if (rc)
4456                         return rc;
4457         }
4458
4459         if (!S_ISDIR(attr->la_mode) || lmv) {
4460                 rc = mdo_declare_ref_del(env, sobj, handle);
4461                 if (rc)
4462                         return rc;
4463
4464                 if (S_ISDIR(attr->la_mode)) {
4465                         rc = mdo_declare_ref_del(env, sobj, handle);
4466                         if (rc)
4467                                 return rc;
4468                 }
4469
4470                 rc = mdo_declare_destroy(env, sobj, handle);
4471                 if (rc)
4472                         return rc;
4473         }
4474
4475         rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, sname, tname,
4476                                         attr, spattr, tpattr, ldata, ma,
4477                                         handle);
4478         return rc;
4479 }
4480
4481 /**
4482  * migrate dirent from \a spobj to \a tpobj.
4483  **/
4484 static int mdd_migrate_update(const struct lu_env *env,
4485                               struct mdd_object *spobj,
4486                               struct mdd_object *tpobj,
4487                               struct mdd_object *obj,
4488                               const struct lu_name *sname,
4489                               const struct lu_name *tname,
4490                               struct lu_attr *attr,
4491                               struct lu_attr *spattr,
4492                               struct lu_attr *tpattr,
4493                               struct linkea_data *ldata,
4494                               struct md_attr *ma,
4495                               struct thandle *handle)
4496 {
4497         struct mdd_thread_info *info = mdd_env_info(env);
4498         struct lu_attr *la = &info->mdi_la_for_fix;
4499         int rc;
4500
4501         ENTRY;
4502
4503         CDEBUG(D_INFO, "update "DFID" from "DFID"/"DNAME" to "DFID"/"DNAME"\n",
4504                PFID(mdd_object_fid(obj)), PFID(mdd_object_fid(spobj)),
4505                encode_fn_luname(sname), PFID(mdd_object_fid(tpobj)),
4506                encode_fn_luname(tname));
4507
4508         rc = __mdd_index_delete(env, spobj, sname->ln_name,
4509                                 S_ISDIR(attr->la_mode), handle);
4510         if (rc)
4511                 RETURN(rc);
4512
4513         rc = __mdd_index_insert(env, tpobj, mdd_object_fid(obj),
4514                                 attr->la_mode & S_IFMT,
4515                                 tname->ln_name, handle);
4516         if (rc)
4517                 RETURN(rc);
4518
4519         rc = mdd_links_write(env, obj, ldata, handle);
4520         if (rc)
4521                 RETURN(rc);
4522
4523         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
4524         la->la_valid = LA_CTIME | LA_MTIME;
4525         mdd_write_lock(env, spobj, DT_SRC_PARENT);
4526         rc = mdd_update_time(env, spobj, spattr, la, handle);
4527         mdd_write_unlock(env, spobj);
4528         if (rc)
4529                 RETURN(rc);
4530
4531         if (tpobj != spobj) {
4532                 la->la_valid = LA_CTIME | LA_MTIME;
4533                 mdd_write_lock(env, tpobj, DT_TGT_PARENT);
4534                 rc = mdd_update_time(env, tpobj, tpattr, la, handle);
4535                 mdd_write_unlock(env, tpobj);
4536                 if (rc)
4537                         RETURN(rc);
4538         }
4539
4540         RETURN(rc);
4541 }
4542
4543 /**
4544  * Migrate file/dir to target MDT.
4545  *
4546  * Create target according to \a spec, and then migrate xattrs, if it's
4547  * directory, migrate source stripes to target.
4548  *
4549  * \param[in] env       execution environment
4550  * \param[in] spobj     source parent object
4551  * \param[in] tpobj     target parent object
4552  * \param[in] sobj      source object
4553  * \param[in] tobj      target object
4554  * \param[in] lname     file name
4555  * \param[in] spattr    source parent attributes
4556  * \param[in] tpattr    target parent attributes
4557  * \param[in] attr      source attributes
4558  * \param[in] sbuf      source LMV buf
4559  * \param[in] spec      migrate create spec
4560  * \param[in] hint      target creation hint
4561  * \param[in] handle    tranasction handle
4562  *
4563  * \retval      0 on success
4564  * \retval      -errno on failure
4565  **/
4566 static int mdd_migrate_create(const struct lu_env *env,
4567                               struct mdd_object *spobj,
4568                               struct mdd_object *tpobj,
4569                               struct mdd_object *sobj,
4570                               struct mdd_object *tobj,
4571                               const struct lu_name *sname,
4572                               const struct lu_name *tname,
4573                               struct lu_attr *spattr,
4574                               struct lu_attr *tpattr,
4575                               struct lu_attr *attr,
4576                               const struct lu_buf *sbuf,
4577                               struct linkea_data *ldata,
4578                               struct mdd_xattrs *xattrs,
4579                               struct md_attr *ma,
4580                               struct md_op_spec *spec,
4581                               struct dt_allocation_hint *hint,
4582                               struct thandle *handle)
4583 {
4584         int rc;
4585
4586         ENTRY;
4587
4588         /*
4589          * migrate sobj stripes to tobj if it's directory:
4590          * 1. detach stripes from sobj.
4591          * 2. attach stripes to tobj, see mdd_declare_migrate_mdt().
4592          * 3. create stripes for tobj, see lod_xattr_set_lmv().
4593          */
4594         if (S_ISDIR(attr->la_mode)) {
4595                 struct mdd_thread_info *info = mdd_env_info(env);
4596                 struct md_layout_change *mlc = &info->mdi_mlc;
4597
4598                 mlc->mlc_opc = MD_LAYOUT_DETACH;
4599
4600                 mdd_write_lock(env, sobj, DT_SRC_PARENT);
4601                 rc = mdo_layout_change(env, sobj, mlc, handle);
4602                 mdd_write_unlock(env, sobj);
4603                 if (rc)
4604                         RETURN(rc);
4605         }
4606
4607         /* don't set nlink from sobj */
4608         attr->la_valid &= ~LA_NLINK;
4609
4610         rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
4611                                hint, handle, false);
4612         if (rc)
4613                 RETURN(rc);
4614
4615         mdd_write_lock(env, tobj, DT_TGT_CHILD);
4616         rc = mdd_foreach_xattr(env, tobj, xattrs, handle, mdo_xattr_set);
4617         mdd_write_unlock(env, tobj);
4618         if (rc)
4619                 RETURN(rc);
4620
4621         /* for regular file, update OST objects XATTR_NAME_FID */
4622         if (S_ISREG(attr->la_mode)) {
4623                 struct lu_buf fid_buf;
4624
4625                 /* target may be remote, update PFID via sobj. */
4626                 fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
4627                 fid_buf.lb_len = sizeof(struct lu_fid);
4628                 rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
4629                                    handle);
4630                 if (rc)
4631                         RETURN(rc);
4632
4633                 /* delete LOV to avoid deleting OST objs when destroying sobj */
4634                 mdd_write_lock(env, sobj, DT_SRC_CHILD);
4635                 rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4636                 mdd_write_unlock(env, sobj);
4637                 /* O_DELAY_CREATE file may not have LOV, ignore -ENODATA */
4638                 if (rc && rc != -ENODATA)
4639                         RETURN(rc);
4640                 rc = 0;
4641         }
4642
4643         /* update links FID */
4644         if (!S_ISDIR(attr->la_mode)) {
4645                 rc = mdd_iterate_linkea(env, sobj, tobj, tname,
4646                                         mdd_object_fid(tpobj), ldata,
4647                                         NULL, handle, mdd_update_link);
4648                 if (rc)
4649                         RETURN(rc);
4650         }
4651
4652         /* don't destroy sobj if it's plain directory */
4653         if (!S_ISDIR(attr->la_mode) || sbuf->lb_buf) {
4654                 mdd_write_lock(env, sobj, DT_SRC_CHILD);
4655                 rc = mdo_ref_del(env, sobj, handle);
4656                 if (!rc) {
4657                         if (S_ISDIR(attr->la_mode))
4658                                 rc = mdo_ref_del(env, sobj, handle);
4659                         if (!rc)
4660                                 rc = mdo_destroy(env, sobj, handle);
4661                 }
4662                 mdd_write_unlock(env, sobj);
4663                 if (rc)
4664                         RETURN(rc);
4665         }
4666
4667         rc = mdd_migrate_update(env, spobj, tpobj, tobj, sname, tname, attr,
4668                                 spattr, tpattr, ldata, ma, handle);
4669
4670         RETURN(rc);
4671 }
4672
4673 /* NB: if user issued different migrate command, we can't adjust it silently
4674  * here, because this command will decide target MDT in subdir migration in
4675  * LMV.
4676  */
4677 static int mdd_migrate_cmd_check(const struct lu_env *env, struct mdd_device *mdd,
4678                                  struct mdd_object *sobj,
4679                                  const struct lmv_mds_md_v1 *lmv,
4680                                  const struct lmv_user_md_v1 *lum,
4681                                  size_t lum_len, const struct lu_name *lname)
4682 {
4683         struct mdd_thread_info *info = mdd_env_info(env);
4684         __u32 lum_stripe_count = lum->lum_stripe_count;
4685         __u32 lum_hash_type = lum->lum_hash_type &
4686                               cpu_to_le32(LMV_HASH_TYPE_MASK);
4687         struct md_layout_change *mlc = &info->mdi_mlc;
4688         __u32 lmv_hash_type;
4689         int rc = 0;
4690         ENTRY;
4691
4692         if (lmv && !lmv_is_sane(lmv))
4693                 RETURN(-EBADF);
4694
4695         /* If stripe_count unspecified, set to 1 */
4696         if (!lum_stripe_count)
4697                 lum_stripe_count = cpu_to_le32(1);
4698
4699         /* Easy check for plain and single-striped dirs
4700          * if the object is on the target MDT already
4701          */
4702         if (!lmv || lmv->lmv_stripe_count == cpu_to_le32(1)) {
4703                 struct seq_server_site  *ss = mdd_seq_site(mdd);
4704                 struct lu_seq_range range = { 0 };
4705
4706                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
4707                 rc = fld_server_lookup(env, ss->ss_server_fld,
4708                                 fid_seq(mdd_object_fid(sobj)), &range);
4709                 if (rc)
4710                         RETURN(rc);
4711
4712                 if (lum_stripe_count == cpu_to_le32(1) &&
4713                     le32_to_cpu(lum->lum_stripe_offset) == range.lsr_index)
4714                         RETURN(-EALREADY);
4715                 RETURN(0);
4716         }
4717
4718         lmv_hash_type = lmv->lmv_hash_type & cpu_to_le32(LMV_HASH_TYPE_MASK);
4719
4720         if (lmv_is_migrating(lmv)) {
4721                 if (lum_stripe_count != lmv->lmv_migrate_offset ||
4722                     lum->lum_stripe_offset != lmv->lmv_master_mdt_index ||
4723                     (lum_hash_type && lum_hash_type != lmv_hash_type)) {
4724                         rc = -EPERM;
4725                 }
4726         } else {
4727                 /* check at top level if the target layout already applied */
4728                 if ((lum_hash_type && lum_hash_type != lmv_hash_type) ||
4729                     lum->lum_stripe_offset != lmv->lmv_master_mdt_index ||
4730                     lum_stripe_count != lmv->lmv_stripe_count)
4731                         RETURN(0);
4732         }
4733
4734         if (rc == 0) {
4735                 mlc->mlc_buf.lb_buf = (void*)lum;
4736                 mlc->mlc_buf.lb_len = lum_len;
4737                 rc = mo_layout_check(env, &sobj->mod_obj, mlc);
4738         }
4739
4740         if (rc == -EPERM) {
4741                 CERROR("%s: '"DNAME"' migration was interrupted, run "
4742                        "'lfs migrate -m %d -c %d -H %s "DNAME"' to finish migration: rc = %d\n",
4743                        mdd2obd_dev(mdd)->obd_name, encode_fn_luname(lname),
4744                        le32_to_cpu(lmv->lmv_master_mdt_index),
4745                        le32_to_cpu(lmv->lmv_migrate_offset),
4746                        mdt_hash_name[le32_to_cpu(lmv_hash_type)],
4747                        encode_fn_luname(lname), rc);
4748         }
4749
4750         RETURN(rc);
4751 }
4752
4753 /**
4754  * Internal function to migrate directory or file between MDTs.
4755  *
4756  * migrate source to target in following steps:
4757  *   1. create target, append source stripes after target's if it's directory,
4758  *      migrate xattrs and update fid of source links.
4759  *   2. update namespace: migrate dirent from source parent to target parent,
4760  *      update file linkea, and destroy source if it's not needed any more.
4761  *
4762  * \param[in] env       execution environment
4763  * \param[in] spobj     source parent object
4764  * \param[in] tpobj     target parent object
4765  * \param[in] sobj      source object
4766  * \param[in] tobj      target object
4767  * \param[in] sname     source file name
4768  * \param[in] tname     target file name
4769  * \param[in] spec      target creation spec
4770  * \param[in] ma        used to update \a pobj mtime and ctime
4771  *
4772  * \retval              0 on success
4773  * \retval              -errno on failure
4774  */
4775 static int mdd_migrate_object(const struct lu_env *env,
4776                               struct mdd_object *spobj,
4777                               struct mdd_object *tpobj,
4778                               struct mdd_object *sobj,
4779                               struct mdd_object *tobj,
4780                               const struct lu_name *sname,
4781                               const struct lu_name *tname,
4782                               struct md_op_spec *spec,
4783                               struct md_attr *ma)
4784 {
4785         struct mdd_thread_info *info = mdd_env_info(env);
4786         struct mdd_device *mdd = mdo2mdd(&spobj->mod_obj);
4787         struct lu_attr *spattr = &info->mdi_pattr;
4788         struct lu_attr *tpattr = &info->mdi_tpattr;
4789         struct lu_attr *attr = &info->mdi_cattr;
4790         struct linkea_data *ldata = &info->mdi_link_data;
4791         struct dt_allocation_hint *hint = &info->mdi_hint;
4792         struct lu_buf sbuf = { NULL };
4793         struct mdd_xattrs xattrs;
4794         struct lmv_mds_md_v1 *lmv;
4795         struct thandle *handle;
4796         int retried = 0;
4797         int rc;
4798
4799         ENTRY;
4800
4801         CDEBUG(D_INFO, "migrate "DNAME" from "DFID"/"DFID" to "DFID"/"DFID"\n",
4802                encode_fn_luname(sname), PFID(mdd_object_fid(spobj)),
4803                PFID(mdd_object_fid(sobj)), PFID(mdd_object_fid(tpobj)),
4804                PFID(mdd_object_fid(tobj)));
4805
4806 retry:
4807         rc = mdd_la_get(env, sobj, attr);
4808         if (rc)
4809                 RETURN(rc);
4810
4811         rc = mdd_la_get(env, spobj, spattr);
4812         if (rc)
4813                 RETURN(rc);
4814
4815         rc = mdd_la_get(env, tpobj, tpattr);
4816         if (rc)
4817                 RETURN(rc);
4818
4819         rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
4820                                       spattr, tpattr, attr,
4821                                       spec->sp_migrate_nsonly);
4822         if (rc == -EBUSY && !spec->sp_migrate_nsonly) {
4823                 spec->sp_migrate_nsonly = 1;
4824                 CWARN("%s: "DFID"/%s is open, migrate only dentry\n",
4825                       mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(spobj)),
4826                       sname->ln_name);
4827                 rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj,
4828                                               tobj, spattr, tpattr, attr,
4829                                               spec->sp_migrate_nsonly);
4830         }
4831         if (rc)
4832                 RETURN(rc);
4833
4834         mdd_xattrs_init(&xattrs);
4835
4836         if (S_ISDIR(attr->la_mode) && !spec->sp_migrate_nsonly) {
4837                 struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
4838                 size_t lum_len = spec->u.sp_ea.eadatalen;
4839
4840                 LASSERT(lum);
4841
4842                 /* if user use default value '0' for stripe_count, we need to
4843                  * adjust it to '1' to create a 1-stripe directory.
4844                  */
4845                 if (lum->lum_stripe_count == 0)
4846                         lum->lum_stripe_count = cpu_to_le32(1);
4847
4848                 rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
4849                 if (rc && rc != -ENODATA)
4850                         GOTO(out, rc);
4851
4852                 lmv = sbuf.lb_buf;
4853                 rc = mdd_migrate_cmd_check(env, mdd, sobj, lmv, lum,
4854                                            lum_len, sname);
4855                 if (rc)
4856                         GOTO(out, rc);
4857         } else if (!S_ISDIR(attr->la_mode)) {
4858                 /* update namespace only if @sobj is on MDT where @tpobj is. */
4859                 if (!mdd_object_remote(tpobj) && !mdd_object_remote(sobj))
4860                         spec->sp_migrate_nsonly = true;
4861
4862                 if (S_ISLNK(attr->la_mode)) {
4863                         lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
4864                         if (!sbuf.lb_buf)
4865                                 GOTO(out, rc = -ENOMEM);
4866
4867                         rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
4868                         if (rc <= 0) {
4869                                 rc = rc ?: -EFAULT;
4870                                 CERROR("%s: "DFID" readlink failed: rc = %d\n",
4871                                        mdd2obd_dev(mdd)->obd_name,
4872                                        PFID(mdd_object_fid(sobj)), rc);
4873                                 GOTO(out, rc);
4874                         }
4875                 }
4876         }
4877
4878         /* linkea needs update upon FID or parent stripe change */
4879         rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, sname,
4880                                         tname, attr, ldata);
4881         if (rc > 0)
4882                 /* update namespace only if @sobj has link on its MDT. */
4883                 spec->sp_migrate_nsonly = true;
4884         else if (rc < 0)
4885                 GOTO(out, rc);
4886
4887         /* migrate inode will migrate xattrs, prepare xattrs early to avoid
4888          * RPCs inside transaction.
4889          */
4890         if (!spec->sp_migrate_nsonly) {
4891                 rc = mdd_xattrs_migrate_prep(env, &xattrs, sobj, true, true);
4892                 if (rc)
4893                         GOTO(out, rc);
4894         }
4895
4896         handle = mdd_trans_create(env, mdd);
4897         if (IS_ERR(handle))
4898                 GOTO(out, rc = PTR_ERR(handle));
4899
4900         if (spec->sp_migrate_nsonly)
4901                 rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, sname,
4902                                                 tname, attr, spattr, tpattr,
4903                                                 ldata, ma, handle);
4904         else
4905                 rc = mdd_declare_migrate_create(env, spobj, tpobj, sobj, tobj,
4906                                                 sname, tname, spattr, tpattr,
4907                                                 attr, &sbuf, ldata, &xattrs, ma,
4908                                                 spec, hint, handle);
4909         if (rc)
4910                 GOTO(stop, rc);
4911
4912         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, tname, sname,
4913                                          handle);
4914         if (rc)
4915                 GOTO(stop, rc);
4916
4917         rc = mdd_trans_start(env, mdd, handle);
4918         if (rc)
4919                 GOTO(stop, rc);
4920
4921         if (spec->sp_migrate_nsonly)
4922                 rc = mdd_migrate_update(env, spobj, tpobj, sobj, sname, tname,
4923                                         attr, spattr, tpattr, ldata, ma,
4924                                         handle);
4925         else
4926                 rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, sname,
4927                                         tname, spattr, tpattr, attr, &sbuf,
4928                                         ldata, &xattrs, ma, spec, hint, handle);
4929         if (rc)
4930                 GOTO(stop, rc);
4931
4932         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0,
4933                                     spec->sp_migrate_nsonly ? sobj : tobj,
4934                                     spobj, spattr, mdd_object_fid(sobj),
4935                                     tpobj, tpattr, tname, sname,
4936                                     handle);
4937
4938 stop:
4939         rc = mdd_trans_stop(env, mdd, rc, handle);
4940 out:
4941         mdd_xattrs_fini(&xattrs);
4942         lu_buf_free(&sbuf);
4943
4944         /**
4945          * -EAGAIN means transaction execution phase detect the layout
4946          * has been changed by others.
4947          */
4948         if (rc == -EAGAIN && retried++ < MAX_TRANS_RETRIED)
4949                 GOTO(retry, retried);
4950
4951         RETURN(rc);
4952 }
4953
4954 /**
4955  * Migrate directory or file between MDTs.
4956  *
4957  * \param[in] env       execution environment
4958  * \param[in] md_spobj  source parent object
4959  * \param[in] md_tpobj  target parent object
4960  * \param[in] md_sobj   source object
4961  * \param[in] lname     file name
4962  * \param[in] md_tobj   target object
4963  * \param[in] spec      target creation spec
4964  * \param[in] ma        used to update \a pobj mtime and ctime
4965  *
4966  * \retval              0 on success
4967  * \retval              -errno on failure
4968  */
4969 static int mdd_migrate(const struct lu_env *env, struct md_object *md_spobj,
4970                        struct md_object *md_tpobj, struct md_object *md_sobj,
4971                        struct md_object *md_tobj, const struct lu_name *lname,
4972                        struct md_op_spec *spec, struct md_attr *ma)
4973 {
4974         return mdd_migrate_object(env, md2mdd_obj(md_spobj),
4975                                   md2mdd_obj(md_tpobj), md2mdd_obj(md_sobj),
4976                                   md2mdd_obj(md_tobj), lname, lname, spec, ma);
4977 }
4978
4979 static int mdd_declare_1sd_collapse(const struct lu_env *env,
4980                                     struct mdd_object *pobj,
4981                                     struct mdd_object *obj,
4982                                     struct mdd_object *stripe,
4983                                     struct lu_attr *attr,
4984                                     struct mdd_xattrs *xattrs,
4985                                     struct md_layout_change *mlc,
4986                                     struct lu_name *lname,
4987                                     struct thandle *handle)
4988 {
4989         int rc;
4990
4991         mlc->mlc_opc = MD_LAYOUT_DETACH;
4992         rc = mdo_declare_layout_change(env, obj, mlc, handle);
4993         if (rc)
4994                 return rc;
4995
4996         rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
4997                                       S_IFDIR, dotdot, handle);
4998         if (rc)
4999                 return rc;
5000
5001         rc = mdd_foreach_xattr(env, stripe, xattrs, handle,
5002                                mdo_declare_xattr_set);
5003         if (rc)
5004                 return rc;
5005
5006         rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
5007         if (rc)
5008                 return rc;
5009
5010         rc = mdo_declare_attr_set(env, stripe, attr, handle);
5011         if (rc)
5012                 return rc;
5013
5014         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
5015         if (rc)
5016                 return rc;
5017
5018         rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(stripe),
5019                                       attr->la_mode, lname->ln_name, handle);
5020         if (rc)
5021                 return rc;
5022
5023         rc = mdo_declare_ref_del(env, obj, handle);
5024         if (rc)
5025                 return rc;
5026
5027         rc = mdo_declare_ref_del(env, obj, handle);
5028         if (rc)
5029                 return rc;
5030
5031         rc = mdo_declare_destroy(env, obj, handle);
5032         if (rc)
5033                 return rc;
5034
5035         return rc;
5036 }
5037
5038 /* transform one-stripe directory to a plain directory */
5039 static int mdd_1sd_collapse(const struct lu_env *env,
5040                             struct mdd_object *pobj,
5041                             struct mdd_object *obj,
5042                             struct mdd_object *stripe,
5043                             struct lu_attr *attr,
5044                             struct mdd_xattrs *xattrs,
5045                             struct md_layout_change *mlc,
5046                             struct lu_name *lname,
5047                             struct thandle *handle)
5048 {
5049         int rc;
5050
5051         ENTRY;
5052
5053         /* replace 1-stripe directory with its stripe */
5054         mlc->mlc_opc = MD_LAYOUT_DETACH;
5055
5056         mdd_write_lock(env, obj, DT_SRC_PARENT);
5057         rc = mdo_layout_change(env, obj, mlc, handle);
5058         mdd_write_unlock(env, obj);
5059         if (rc)
5060                 RETURN(rc);
5061
5062         mdd_write_lock(env, pobj, DT_SRC_PARENT);
5063         mdd_write_lock(env, obj, DT_SRC_CHILD);
5064
5065         /* insert dotdot to stripe which points to parent */
5066         rc = __mdd_index_insert_only(env, stripe, mdd_object_fid(pobj),
5067                                      S_IFDIR, dotdot, handle);
5068         if (rc)
5069                 GOTO(out, rc);
5070
5071         rc = mdd_foreach_xattr(env, stripe, xattrs, handle, mdo_xattr_set);
5072         if (rc)
5073                 GOTO(out, rc);
5074
5075         /* delete LMV */
5076         rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
5077         if (rc)
5078                 GOTO(out, rc);
5079
5080         /* don't set nlink from parent */
5081         attr->la_valid &= ~LA_NLINK;
5082
5083         rc = mdo_attr_set(env, stripe, attr, handle);
5084         if (rc)
5085                 GOTO(out, rc);
5086
5087         /* delete dir name from parent */
5088         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
5089         if (rc)
5090                 GOTO(out, rc);
5091
5092         /* insert stripe to parent with dir name */
5093         rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(stripe),
5094                                      attr->la_mode, lname->ln_name, handle);
5095         if (rc)
5096                 GOTO(out, rc);
5097
5098         /* destroy dir obj */
5099         rc = mdo_ref_del(env, obj, handle);
5100         if (rc)
5101                 GOTO(out, rc);
5102
5103         rc = mdo_ref_del(env, obj, handle);
5104         if (rc)
5105                 GOTO(out, rc);
5106
5107         rc = mdo_destroy(env, obj, handle);
5108         if (rc)
5109                 GOTO(out, rc);
5110
5111         EXIT;
5112 out:
5113         mdd_write_unlock(env, obj);
5114         mdd_write_unlock(env, pobj);
5115
5116         return rc;
5117 }
5118
5119 /*
5120  * shrink directory stripes after migration/merge
5121  */
5122 int mdd_dir_layout_shrink(const struct lu_env *env,
5123                           struct md_object *md_obj,
5124                           struct md_layout_change *mlc)
5125 {
5126         struct mdd_device *mdd = mdo2mdd(md_obj);
5127         struct mdd_thread_info *info = mdd_env_info(env);
5128         struct mdd_object *obj = md2mdd_obj(md_obj);
5129         struct mdd_object *pobj = NULL;
5130         struct mdd_object *stripe = NULL;
5131         struct lu_attr *attr = &info->mdi_pattr;
5132         struct lu_fid *fid = &info->mdi_fid2;
5133         struct lu_name lname = { NULL };
5134         struct lu_buf lmv_buf = { NULL };
5135         struct mdd_xattrs xattrs;
5136         struct lmv_mds_md_v1 *lmv;
5137         struct lmv_user_md *lmu;
5138         struct thandle *handle;
5139         int rc;
5140
5141         ENTRY;
5142
5143         rc = mdd_la_get(env, obj, attr);
5144         if (rc)
5145                 RETURN(rc);
5146
5147         if (!S_ISDIR(attr->la_mode))
5148                 RETURN(-ENOTDIR);
5149
5150         rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
5151         if (rc < 0)
5152                 RETURN(rc);
5153
5154         lmv = lmv_buf.lb_buf;
5155         if (!lmv_is_sane(lmv))
5156                 GOTO(out_lmv, rc = -EBADF);
5157
5158         lmu = mlc->mlc_buf.lb_buf;
5159
5160         /* adjust the default value '0' to '1' */
5161         if (lmu->lum_stripe_count == 0)
5162                 lmu->lum_stripe_count = cpu_to_le32(1);
5163
5164         /* these were checked in MDT */
5165         LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
5166                 le32_to_cpu(lmv->lmv_stripe_count));
5167         LASSERT(!lmv_is_splitting(lmv));
5168         LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
5169
5170         mdd_xattrs_init(&xattrs);
5171
5172         /* if dir stripe count will be shrunk to 1, it needs to be transformed
5173          * to a plain dir, which will cause FID change and namespace update.
5174          */
5175         if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
5176                 struct linkea_data *ldata = &info->mdi_link_data;
5177                 char *filename = info->mdi_name;
5178
5179                 rc = mdd_links_read(env, obj, ldata);
5180                 if (rc)
5181                         GOTO(out, rc);
5182
5183                 if (ldata->ld_leh->leh_reccount > 1)
5184                         GOTO(out, rc = -EINVAL);
5185
5186                 linkea_first_entry(ldata);
5187                 if (!ldata->ld_lee)
5188                         GOTO(out, rc = -ENODATA);
5189
5190                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
5191                                     fid);
5192
5193                 /* Note: lname might miss \0 at the end */
5194                 snprintf(filename, sizeof(info->mdi_name), DNAME,
5195                          lname.ln_namelen, lname.ln_name);
5196                 lname.ln_name = filename;
5197
5198                 pobj = mdd_object_find(env, mdd, fid);
5199                 if (IS_ERR(pobj)) {
5200                         rc = PTR_ERR(pobj);
5201                         pobj = NULL;
5202                         GOTO(out, rc);
5203                 }
5204
5205                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
5206
5207                 stripe = mdd_object_find(env, mdd, fid);
5208                 if (IS_ERR(stripe)) {
5209                         mdd_object_put(env, pobj);
5210                         pobj = NULL;
5211                         GOTO(out, rc = PTR_ERR(stripe));
5212                 }
5213
5214                 if (!lmv_is_fixed(lmv))
5215                         rc = mdd_xattrs_migrate_prep(env, &xattrs, obj, false,
5216                                                      false);
5217         }
5218
5219         handle = mdd_trans_create(env, mdd);
5220         if (IS_ERR(handle))
5221                 GOTO(out, rc = PTR_ERR(handle));
5222
5223         mlc->mlc_opc = MD_LAYOUT_SHRINK;
5224         rc = mdo_declare_layout_change(env, obj, mlc, handle);
5225         if (rc)
5226                 GOTO(stop_trans, rc);
5227
5228         if (le32_to_cpu(lmu->lum_stripe_count) == 1 && !lmv_is_fixed(lmv)) {
5229                 rc = mdd_declare_1sd_collapse(env, pobj, obj, stripe, attr,
5230                                               &xattrs, mlc, &lname, handle);
5231                 if (rc)
5232                         GOTO(stop_trans, rc);
5233         }
5234
5235         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
5236                                          handle);
5237         if (rc)
5238                 GOTO(stop_trans, rc);
5239
5240         rc = mdd_trans_start(env, mdd, handle);
5241         if (rc)
5242                 GOTO(stop_trans, rc);
5243
5244         mdd_write_lock(env, obj, DT_SRC_PARENT);
5245         mlc->mlc_opc = MD_LAYOUT_SHRINK;
5246         rc = mdo_layout_change(env, obj, mlc, handle);
5247         mdd_write_unlock(env, obj);
5248         if (rc)
5249                 GOTO(stop_trans, rc);
5250
5251         if (le32_to_cpu(lmu->lum_stripe_count) == 1 && !lmv_is_fixed(lmv)) {
5252                 rc = mdd_1sd_collapse(env, pobj, obj, stripe, attr, &xattrs,
5253                                       mlc, &lname, handle);
5254                 if (rc)
5255                         GOTO(stop_trans, rc);
5256         }
5257
5258         rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
5259                                             XATTR_NAME_LMV, handle);
5260         GOTO(stop_trans, rc);
5261
5262 stop_trans:
5263         rc = mdd_trans_stop(env, mdd, rc, handle);
5264 out:
5265         mdd_xattrs_fini(&xattrs);
5266         if (pobj) {
5267                 mdd_object_put(env, stripe);
5268                 mdd_object_put(env, pobj);
5269         }
5270 out_lmv:
5271         lu_buf_free(&lmv_buf);
5272         return rc;
5273 }
5274
5275 static int mdd_dir_declare_split_plain(const struct lu_env *env,
5276                                         struct mdd_device *mdd,
5277                                         struct mdd_object *pobj,
5278                                         struct mdd_object *obj,
5279                                         struct mdd_object *tobj,
5280                                         struct mdd_xattrs *xattrs,
5281                                         struct md_layout_change *mlc,
5282                                         struct dt_allocation_hint *hint,
5283                                         struct thandle *handle)
5284 {
5285         struct mdd_thread_info *info = mdd_env_info(env);
5286         const struct lu_name *lname = mlc->mlc_name;
5287         struct lu_attr *la = &info->mdi_la_for_fix;
5288         struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
5289         struct linkea_data *ldata = &info->mdi_link_data;
5290         struct lmv_mds_md_v1 *lmv;
5291         __u32 count;
5292         int rc;
5293
5294         mlc->mlc_opc = MD_LAYOUT_DETACH;
5295         rc = mdo_declare_layout_change(env, obj, mlc, handle);
5296         if (rc)
5297                 return rc;
5298
5299         memset(ldata, 0, sizeof(*ldata));
5300         rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
5301                                 lname, 1, 0, ldata);
5302         if (rc)
5303                 return rc;
5304
5305         count = lum->lum_stripe_count;
5306         lum->lum_stripe_count = 0;
5307         /* don't set default LMV since it will become a striped dir  */
5308         lum->lum_max_inherit = LMV_INHERIT_NONE;
5309         mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
5310                              hint);
5311         rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
5312                                 lname, mlc->mlc_attr, handle, mlc->mlc_spec,
5313                                 ldata, NULL, NULL, NULL, hint);
5314         if (rc)
5315                 return rc;
5316
5317         /* tobj mode will be used in lod_declare_xattr_set(), but it's not
5318          * created yet.
5319          */
5320         tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
5321
5322         lmv = (typeof(lmv))info->mdi_key;
5323         memset(lmv, 0, sizeof(*lmv));
5324         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
5325         lmv->lmv_stripe_count = cpu_to_le32(1);
5326         lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
5327         fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
5328
5329         mlc->mlc_opc = MD_LAYOUT_ATTACH;
5330         mlc->mlc_buf.lb_buf = lmv;
5331         mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
5332         rc = mdo_declare_layout_change(env, tobj, mlc, handle);
5333         if (rc)
5334                 return rc;
5335
5336         rc = mdd_foreach_xattr(env, tobj, xattrs, handle,
5337                                mdo_declare_xattr_set);
5338         if (rc)
5339                 return rc;
5340
5341         lum->lum_stripe_count = count;
5342         mlc->mlc_opc = MD_LAYOUT_SPLIT;
5343         rc = mdo_declare_layout_change(env, tobj, mlc, handle);
5344         if (rc)
5345                 return rc;
5346
5347         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
5348         if (rc)
5349                 return rc;
5350
5351         rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
5352                                       S_IFDIR, lname->ln_name, handle);
5353         if (rc)
5354                 return rc;
5355
5356         la->la_valid = LA_CTIME | LA_MTIME;
5357         rc = mdo_declare_attr_set(env, obj, la, handle);
5358         if (rc)
5359                 return rc;
5360
5361         rc = mdo_declare_attr_set(env, pobj, la, handle);
5362         if (rc)
5363                 return rc;
5364
5365         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
5366                                          handle);
5367         return rc;
5368 }
5369
5370 /**
5371  * plain directory split:
5372  * 1. create \a tobj as plain directory.
5373  * 2. append \a obj as first stripe of \a tobj.
5374  * 3. migrate xattrs from \a obj to \a tobj.
5375  * 4. split \a tobj to specific stripe count.
5376  */
5377 static int mdd_dir_split_plain(const struct lu_env *env,
5378                                 struct mdd_device *mdd,
5379                                 struct mdd_object *pobj,
5380                                 struct mdd_object *obj,
5381                                 struct mdd_object *tobj,
5382                                 struct mdd_xattrs *xattrs,
5383                                 struct md_layout_change *mlc,
5384                                 struct dt_allocation_hint *hint,
5385                                 struct thandle *handle)
5386 {
5387         struct mdd_thread_info *info = mdd_env_info(env);
5388         struct lu_attr *pattr = &info->mdi_pattr;
5389         struct lu_attr *la = &info->mdi_la_for_fix;
5390         const struct lu_name *lname = mlc->mlc_name;
5391         struct linkea_data *ldata = &info->mdi_link_data;
5392         int rc;
5393
5394         ENTRY;
5395
5396         /* copy linkea out and set on target later */
5397         rc = mdd_links_read(env, obj, ldata);
5398         if (rc)
5399                 RETURN(rc);
5400
5401         mlc->mlc_opc = MD_LAYOUT_DETACH;
5402         rc = mdo_layout_change(env, obj, mlc, handle);
5403         if (rc)
5404                 RETURN(rc);
5405
5406         /* don't set nlink from obj */
5407         mlc->mlc_attr->la_valid &= ~LA_NLINK;
5408
5409         rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
5410                                NULL, NULL, NULL, hint, handle, false);
5411         if (rc)
5412                 RETURN(rc);
5413
5414         rc = mdd_foreach_xattr(env, tobj, xattrs, handle, mdo_xattr_set);
5415         if (rc)
5416                 RETURN(rc);
5417
5418         rc = mdd_links_write(env, tobj, ldata, handle);
5419         if (rc)
5420                 RETURN(rc);
5421
5422         rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
5423         if (rc)
5424                 RETURN(rc);
5425
5426         rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
5427                                 lname->ln_name, handle);
5428         if (rc)
5429                 RETURN(rc);
5430
5431         la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
5432         la->la_valid = LA_CTIME | LA_MTIME;
5433
5434         mdd_write_lock(env, obj, DT_SRC_CHILD);
5435         rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
5436         mdd_write_unlock(env, obj);
5437         if (rc)
5438                 RETURN(rc);
5439
5440         rc = mdd_la_get(env, pobj, pattr);
5441         if (rc)
5442                 RETURN(rc);
5443
5444         la->la_valid = LA_CTIME | LA_MTIME;
5445
5446         mdd_write_lock(env, pobj, DT_SRC_PARENT);
5447         rc = mdd_update_time(env, pobj, pattr, la, handle);
5448         mdd_write_unlock(env, pobj);
5449         if (rc)
5450                 RETURN(rc);
5451
5452         /* FID changes, record it as CL_MIGRATE */
5453         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
5454                                     pobj, pattr, mdd_object_fid(obj),
5455                                     pobj, pattr, lname, lname, handle);
5456         RETURN(rc);
5457 }
5458
5459 int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
5460                          struct md_layout_change *mlc)
5461 {
5462         struct mdd_thread_info *info = mdd_env_info(env);
5463         struct mdd_device *mdd = mdo2mdd(o);
5464         struct mdd_object *obj = md2mdd_obj(o);
5465         struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
5466         struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
5467         struct dt_allocation_hint *hint = &info->mdi_hint;
5468         bool is_plain = false;
5469         struct mdd_xattrs xattrs;
5470         struct thandle *handle;
5471         int rc;
5472
5473         ENTRY;
5474
5475         LASSERT(S_ISDIR(mdd_object_type(obj)));
5476
5477         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
5478         if (rc == -ENODATA)
5479                 is_plain = true;
5480         else if (rc < 0)
5481                 RETURN(rc);
5482
5483         mdd_xattrs_init(&xattrs);
5484         if (is_plain)
5485                 rc = mdd_xattrs_migrate_prep(env, &xattrs, obj, true, true);
5486
5487         handle = mdd_trans_create(env, mdd);
5488         if (IS_ERR(handle))
5489                 GOTO(out, rc = PTR_ERR(handle));
5490
5491         if (is_plain) {
5492                 rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj,
5493                                                  &xattrs, mlc, hint, handle);
5494         } else {
5495                 mlc->mlc_opc = MD_LAYOUT_SPLIT;
5496                 rc = mdo_declare_layout_change(env, obj, mlc, handle);
5497                 if (rc)
5498                         GOTO(stop_trans, rc);
5499
5500                 rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
5501                                                  NULL, handle);
5502         }
5503         if (rc)
5504                 GOTO(stop_trans, rc);
5505
5506         rc = mdd_trans_start(env, mdd, handle);
5507         if (rc)
5508                 GOTO(stop_trans, rc);
5509
5510         if (is_plain) {
5511                 rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, &xattrs,
5512                                          mlc, hint, handle);
5513         } else {
5514                 struct lu_buf *buf = &info->mdi_buf[0];
5515
5516                 buf->lb_buf = mlc->mlc_spec->u.sp_ea.eadata;
5517                 buf->lb_len = mlc->mlc_spec->u.sp_ea.eadatalen;
5518
5519                 mdd_write_lock(env, obj, DT_TGT_CHILD);
5520                 rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LMV,
5521                                    LU_XATTR_CREATE, handle);
5522                 mdd_write_unlock(env, obj);
5523                 if (rc)
5524                         GOTO(stop_trans, rc);
5525
5526                 rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
5527                                                     XATTR_NAME_LMV, handle);
5528         }
5529         if (rc)
5530                 GOTO(stop_trans, rc);
5531
5532         EXIT;
5533
5534 stop_trans:
5535         rc = mdd_trans_stop(env, mdd, rc, handle);
5536 out:
5537         mdd_xattrs_fini(&xattrs);
5538
5539         return rc;
5540 }
5541
5542 const struct md_dir_operations mdd_dir_ops = {
5543         .mdo_is_subdir     = mdd_is_subdir,
5544         .mdo_lookup        = mdd_lookup,
5545         .mdo_create        = mdd_create,
5546         .mdo_rename        = mdd_rename,
5547         .mdo_link          = mdd_link,
5548         .mdo_unlink        = mdd_unlink,
5549         .mdo_create_data   = mdd_create_data,
5550         .mdo_migrate       = mdd_migrate,
5551 };