Whamcloud - gitweb
LU-17000 utils: handle_yaml_no_op() has wrong signature
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2011, 2017, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  *
13  * Lustre Metadata Server (mdd) routines
14  *
15  * Author: Wang Di <wangdi@intel.com>
16  */
17
18 #define DEBUG_SUBSYSTEM S_MDS
19
20 #include <obd_class.h>
21 #include <obd_support.h>
22 #include <lustre_mds.h>
23 #include <lustre_fid.h>
24 #include <lustre_lmv.h>
25 #include <lustre_idmap.h>
26 #include <lustre_crypto.h>
27 #include <uapi/linux/lustre/lgss.h>
28
29 #include "mdd_internal.h"
30
31 static const char dot[] = ".";
32 static const char dotdot[] = "..";
33
34 static struct lu_name lname_dotdot = {
35         .ln_name        = (char *) dotdot,
36         .ln_namelen     = sizeof(dotdot) - 1,
37 };
38
39 static inline int
40 mdd_name_check(const struct lu_env *env, struct mdd_device *m,
41                const struct lu_name *ln)
42 {
43         struct mdd_thread_info *info = mdd_env_info(env);
44         bool enc = info->mdi_pattr.la_valid & LA_FLAGS &&
45                 info->mdi_pattr.la_flags & LUSTRE_ENCRYPT_FL;
46
47         if (!lu_name_is_valid(ln))
48                 return -EINVAL;
49         else if (!enc && ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
50                 return -ENAMETOOLONG;
51         else
52                 return 0;
53 }
54
55 /* Get FID from name and parent */
56 static int
57 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
58              const struct lu_attr *pattr, const struct lu_name *lname,
59              struct lu_fid *fid, unsigned int may_mask)
60 {
61         const char *name = lname->ln_name;
62         const struct dt_key *key = (const struct dt_key *)name;
63         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
64         struct dt_object *dir = mdd_object_child(mdd_obj);
65         int rc;
66
67         ENTRY;
68
69         if (unlikely(mdd_is_dead_obj(mdd_obj)))
70                 RETURN(-ESTALE);
71
72         if (!mdd_object_exists(mdd_obj))
73                 RETURN(-ESTALE);
74
75         if (mdd_object_remote(mdd_obj)) {
76                 CDEBUG(D_INFO, "%s: Object "DFID" located on remote server\n",
77                        mdd_obj_dev_name(mdd_obj),
78                        PFID(mdd_object_fid(mdd_obj)));
79         }
80
81         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, may_mask,
82                                             DT_TGT_PARENT);
83         if (rc)
84                 RETURN(rc);
85
86         if (likely(dt_try_as_dir(env, dir, true)))
87                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
88         else
89                 rc = -ENOTDIR;
90
91         RETURN(rc);
92 }
93
94 int mdd_lookup(const struct lu_env *env,
95                struct md_object *pobj, const struct lu_name *lname,
96                struct lu_fid *fid, struct md_op_spec *spec)
97 {
98         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
99         int rc;
100
101         ENTRY;
102
103         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
104         if (rc != 0)
105                 RETURN(rc);
106
107         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
108                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
109         RETURN(rc);
110 }
111
112 /** Read the link EA into a temp buffer.
113  * Uses the mdd_thread_info::mdi_link_buf since it is generally large.
114  * A pointer to the buffer is stored in \a ldata::ld_buf.
115  *
116  * \retval 0 or error
117  */
118 static int __mdd_links_read(const struct lu_env *env,
119                             struct mdd_object *mdd_obj,
120                             struct linkea_data *ldata)
121 {
122         int rc;
123
124         if (!mdd_object_exists(mdd_obj))
125                 return -ENODATA;
126
127         /* First try a small buf */
128         LASSERT(env != NULL);
129         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_link_buf,
130                                                PAGE_SIZE);
131         if (ldata->ld_buf->lb_buf == NULL)
132                 return -ENOMEM;
133
134         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
135         if (rc == -ERANGE) {
136                 /* Buf was too small, figure out what we need. */
137                 lu_buf_free(ldata->ld_buf);
138                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
139                                    XATTR_NAME_LINK);
140                 if (rc < 0)
141                         return rc;
142                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
143                 if (ldata->ld_buf->lb_buf == NULL)
144                         return -ENOMEM;
145                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
146                                   XATTR_NAME_LINK);
147         }
148         if (rc < 0) {
149                 lu_buf_free(ldata->ld_buf);
150                 ldata->ld_buf = NULL;
151                 return rc;
152         }
153
154         return linkea_init(ldata);
155 }
156
157 int mdd_links_read(const struct lu_env *env,
158                    struct mdd_object *mdd_obj,
159                    struct linkea_data *ldata)
160 {
161         int rc;
162
163         rc = __mdd_links_read(env, mdd_obj, ldata);
164         if (!rc)
165                 rc = linkea_init(ldata);
166
167         return rc;
168 }
169
170 static int mdd_links_read_with_rec(const struct lu_env *env,
171                                    struct mdd_object *mdd_obj,
172                                    struct linkea_data *ldata)
173 {
174         int rc;
175
176         rc = __mdd_links_read(env, mdd_obj, ldata);
177         if (!rc)
178                 rc = linkea_init_with_rec(ldata);
179
180         return rc;
181 }
182
183 /**
184  * Get parent FID of the directory
185  *
186  * Read parent FID from linkEA, if that fails, then do lookup
187  * dotdot to get the parent FID.
188  *
189  * \param[in] env       execution environment
190  * \param[in] obj       object from which to find the parent FID
191  * \param[in] attr      attribute of the object
192  * \param[out] fid      fid to get the parent FID
193  *
194  * \retval              0 if getting the parent FID succeeds.
195  * \retval              negative errno if getting the parent FID fails.
196  */
197 static inline int mdd_parent_fid(const struct lu_env *env,
198                                  struct mdd_object *obj,
199                                  const struct lu_attr *attr,
200                                  struct lu_fid *fid)
201 {
202         struct mdd_thread_info *info = mdd_env_info(env);
203         struct linkea_data ldata = { NULL };
204         struct lu_buf *buf = &info->mdi_link_buf;
205         struct lu_name lname;
206         int rc = 0;
207
208         ENTRY;
209
210         LASSERTF(S_ISDIR(mdd_object_type(obj)),
211                  "%s: FID "DFID" is not a directory type = %o\n",
212                  mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)),
213                  mdd_object_type(obj));
214
215         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
216         if (buf->lb_buf == NULL)
217                 GOTO(lookup, rc = 0);
218
219         ldata.ld_buf = buf;
220         rc = mdd_links_read_with_rec(env, obj, &ldata);
221         if (rc != 0)
222                 GOTO(lookup, rc);
223
224         /* the obj is not locked, don't cache attributes */
225         mdd_invalidate(env, &obj->mod_obj);
226
227         LASSERT(ldata.ld_leh != NULL);
228         /* Directory should only have 1 parent */
229         if (ldata.ld_leh->leh_reccount > 1)
230                 GOTO(lookup, rc);
231
232         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
233
234         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
235         if (likely(fid_is_sane(fid)))
236                 RETURN(0);
237 lookup:
238         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
239         RETURN(rc);
240 }
241
242 /*
243  * For root fid use special function, which does not compare version component
244  * of fid. Version component is different for root fids on all MDTs.
245  */
246 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
247 {
248         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
249                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
250 }
251
252 /*
253  * return 1: if \a tfid is the fid of the ancestor of \a mo;
254  * return 0: if not;
255  * otherwise: values < 0, errors.
256  */
257 static int mdd_is_parent(const struct lu_env *env,
258                         struct mdd_device *mdd,
259                         struct mdd_object *mo,
260                         const struct lu_attr *attr,
261                         const struct lu_fid *tfid)
262 {
263         struct mdd_object *mp;
264         struct lu_fid *pfid;
265         int rc;
266
267         LASSERT(!lu_fid_eq(mdd_object_fid(mo), tfid));
268         pfid = &mdd_env_info(env)->mdi_fid;
269
270         if (mdd_is_root(mdd, mdd_object_fid(mo)))
271                 return 0;
272
273         if (mdd_is_root(mdd, tfid))
274                 return 1;
275
276         rc = mdd_parent_fid(env, mo, attr, pfid);
277         if (rc)
278                 return rc;
279
280         while (1) {
281                 if (lu_fid_eq(pfid, tfid))
282                         return 1;
283
284                 if (mdd_is_root(mdd, pfid))
285                         return 0;
286
287                 mp = mdd_object_find(env, mdd, pfid);
288                 if (IS_ERR(mp))
289                         return PTR_ERR(mp);
290
291                 if (!mdd_object_exists(mp)) {
292                         mdd_object_put(env, mp);
293                         return -ENOENT;
294                 }
295
296                 rc = mdd_parent_fid(env, mp, attr, pfid);
297                 mdd_object_put(env, mp);
298                 if (rc)
299                         return rc;
300         }
301
302         return 0;
303 }
304
305 /*
306  * No permission check is needed.
307  *
308  * returns 1: if fid is ancestor of @mo;
309  * returns 0: if fid is not an ancestor of @mo;
310  * returns < 0: if error
311  */
312 static int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
313                          const struct lu_fid *fid)
314 {
315         struct mdd_device *mdd = mdo2mdd(mo);
316         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
317         int rc;
318
319         ENTRY;
320
321         if (!mdd_object_exists(md2mdd_obj(mo)))
322                 RETURN(-ENOENT);
323
324         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
325                 RETURN(-ENOTDIR);
326
327         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
328         if (rc != 0)
329                 RETURN(rc);
330
331         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
332         RETURN(rc);
333 }
334
335 /*
336  * Check that @dir contains no entries except (possibly) dot and dotdot.
337  *
338  * Returns:
339  *
340  *             0        empty
341  *      -ENOTDIR        not a directory object
342  *    -ENOTEMPTY        not empty
343  *           -ve        other error
344  *
345  */
346 int mdd_dir_is_empty(const struct lu_env *env, struct mdd_object *dir)
347 {
348         struct dt_it     *it;
349         struct dt_object *obj;
350         const struct dt_it_ops *iops;
351         int result;
352
353         ENTRY;
354
355         obj = mdd_object_child(dir);
356         if (!dt_try_as_dir(env, obj, true))
357                 RETURN(-ENOTDIR);
358
359         iops = &obj->do_index_ops->dio_it;
360         it = iops->init(env, obj, LUDA_64BITHASH);
361         if (!IS_ERR(it)) {
362                 result = iops->get(env, it, (const struct dt_key *)"");
363                 if (result > 0) {
364                         int i;
365
366                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
367                                 result = iops->next(env, it);
368                         if (result == 0)
369                                 result = -ENOTEMPTY;
370                         else if (result == 1)
371                                 result = 0;
372                 } else if (result == 0)
373                         /*
374                          * Huh? Index contains no zero key?
375                          */
376                         result = -EIO;
377
378                 iops->put(env, it);
379                 iops->fini(env, it);
380         } else {
381                 result = PTR_ERR(it);
382                 /* -ENODEV means no valid stripe */
383                 if (result == -ENODEV)
384                         RETURN(0);
385         }
386         RETURN(result);
387 }
388
389 /*
390  * Determine if the target object can be hard linked, and right now it only
391  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
392  * directory nlink count might exceed the maximum link count(see
393  * osd_object_ref_add), so it only check nlink for non-directories.
394  *
395  * \param[in] env       thread environment
396  * \param[in] obj       object being linked to
397  * \param[in] la        attributes of \a obj
398  *
399  * \retval              0 if \a obj can be hard linked
400  * \retval              negative error if \a obj is a directory or has too
401  *                      many links
402  */
403 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
404                           const struct lu_attr *la)
405 {
406         struct mdd_device *m = mdd_obj2mdd_dev(obj);
407
408         ENTRY;
409
410         LASSERT(la != NULL);
411
412         /* Subdir count limitation can be broken through
413          * (see osd_object_ref_add), so only check non-directory here.
414          */
415         if (!S_ISDIR(la->la_mode) &&
416             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
417                 RETURN(-EMLINK);
418
419         RETURN(0);
420 }
421
422 /**
423  * Check whether it may create the cobj under the pobj.
424  *
425  * \param[in] env       execution environment
426  * \param[in] pobj      the parent directory
427  * \param[in] pattr     the attribute of the parent directory
428  * \param[in] cobj      the child to be created
429  * \param[in] check_perm        if check WRITE|EXEC permission for parent
430  *
431  * \retval              = 0 create the child under this dir is allowed
432  * \retval              negative errno create the child under this dir is
433  *                      not allowed
434  */
435 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
436                    const struct lu_attr *pattr, struct mdd_object *cobj,
437                    bool check_perm)
438 {
439         int rc = 0;
440
441         ENTRY;
442
443         if (cobj && mdd_object_exists(cobj))
444                 RETURN(-EEXIST);
445
446         if (mdd_is_dead_obj(pobj))
447                 RETURN(-ENOENT);
448
449         if (check_perm)
450                 rc = mdd_permission_internal_locked(env, pobj, pattr,
451                                                     MAY_WRITE | MAY_EXEC,
452                                                     DT_TGT_PARENT);
453         RETURN(rc);
454 }
455
456 /* Check whether can unlink from the pobj in the case of "cobj == NULL". */
457 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
458                    const struct lu_attr *pattr, const struct lu_attr *attr)
459 {
460         int rc;
461
462         ENTRY;
463
464         if (mdd_is_dead_obj(pobj))
465                 RETURN(-ENOENT);
466
467         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
468                 RETURN(-EPERM);
469
470         rc = mdd_permission_internal_locked(env, pobj, pattr,
471                                             MAY_WRITE | MAY_EXEC,
472                                             DT_TGT_PARENT);
473         if (rc != 0)
474                 RETURN(rc);
475
476         if (pattr->la_flags & LUSTRE_APPEND_FL)
477                 RETURN(-EPERM);
478
479         RETURN(rc);
480 }
481
482 /* pobj == NULL is remote ops case, under such case, pobj's
483  * VTX feature has been checked already, no need check again.
484  */
485 static inline int mdd_is_sticky(const struct lu_env *env,
486                                 struct mdd_object *pobj,
487                                 const struct lu_attr *pattr,
488                                 struct mdd_object *cobj,
489                                 const struct lu_attr *cattr)
490 {
491         struct lu_ucred *uc = lu_ucred_assert(env);
492
493         if (pobj != NULL) {
494                 LASSERT(pattr != NULL);
495                 if (!(pattr->la_mode & S_ISVTX) ||
496                     (pattr->la_uid == uc->uc_fsuid))
497                         return 0;
498         }
499
500         LASSERT(cattr != NULL);
501         if (cattr->la_uid == uc->uc_fsuid)
502                 return 0;
503
504         return !cap_raised(uc->uc_cap, CAP_FOWNER);
505 }
506
507 static int mdd_may_delete_entry(const struct lu_env *env,
508                                 struct mdd_object *pobj,
509                                 const struct lu_attr *pattr,
510                                 int check_perm)
511 {
512         ENTRY;
513
514         LASSERT(pobj != NULL);
515         if (!mdd_object_exists(pobj))
516                 RETURN(-ENOENT);
517
518         if (mdd_is_dead_obj(pobj))
519                 RETURN(-ENOENT);
520
521         if (check_perm) {
522                 int rc;
523
524                 rc = mdd_permission_internal_locked(env, pobj, pattr,
525                                             MAY_WRITE | MAY_EXEC,
526                                             DT_TGT_PARENT);
527                 if (rc)
528                         RETURN(rc);
529         }
530
531         if (pattr->la_flags & LUSTRE_APPEND_FL)
532                 RETURN(-EPERM);
533
534         RETURN(0);
535 }
536
537 /*
538  * Check whether it may delete the cobj from the pobj.
539  * pobj maybe NULL
540  */
541 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
542                    const struct lu_attr *tpattr, struct mdd_object *tobj,
543                    const struct lu_attr *tattr, const struct lu_attr *cattr,
544                    int check_perm, int check_empty)
545 {
546         int rc = 0;
547
548         ENTRY;
549
550         if (tpobj) {
551                 LASSERT(tpattr != NULL);
552                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
553                 if (rc != 0)
554                         RETURN(rc);
555         }
556
557         if (tobj == NULL)
558                 RETURN(0);
559
560         if (!mdd_object_exists(tobj))
561                 RETURN(-ENOENT);
562
563         if (mdd_is_dead_obj(tobj))
564                 RETURN(-ESTALE);
565
566         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
567                 RETURN(-EPERM);
568
569         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
570                 RETURN(-EPERM);
571
572         /* additional check the rename case */
573         if (cattr) {
574                 if (S_ISDIR(cattr->la_mode)) {
575                         if (!S_ISDIR(tattr->la_mode))
576                                 RETURN(-ENOTDIR);
577
578                         if (mdd_is_root(mdo2mdd(&tobj->mod_obj),
579                                         mdd_object_fid(tobj)))
580                                 RETURN(-EBUSY);
581                 } else if (S_ISDIR(tattr->la_mode))
582                         RETURN(-EISDIR);
583         }
584
585         if (S_ISDIR(tattr->la_mode) && check_empty)
586                 rc = mdd_dir_is_empty(env, tobj);
587
588         RETURN(rc);
589 }
590
591 /**
592  * Check whether it can create the link file(linked to @src_obj) under
593  * the target directory(@tgt_obj), and src_obj has been locked by
594  * mdd_write_lock.
595  *
596  * \param[in] env       execution environment
597  * \param[in] tgt_obj   the target directory
598  * \param[in] tattr     attributes of target directory
599  * \param[in] lname     the link name
600  * \param[in] src_obj   source object for link
601  * \param[in] cattr     attributes for source object
602  *
603  * \retval              = 0 it is allowed to create the link file under tgt_obj
604  * \retval              negative error not allowed to create the link file
605  */
606 static int mdd_link_sanity_check(const struct lu_env *env,
607                                  struct mdd_object *tgt_obj,
608                                  const struct lu_attr *tattr,
609                                  const struct lu_name *lname,
610                                  struct mdd_object *src_obj,
611                                  const struct lu_attr *cattr)
612 {
613         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
614         int rc = 0;
615
616         ENTRY;
617
618         if (!mdd_object_exists(src_obj))
619                 RETURN(-ENOENT);
620
621         if (mdd_is_dead_obj(src_obj))
622                 RETURN(-ESTALE);
623
624         /* Local ops, no lookup before link, check filename length here. */
625         rc = mdd_name_check(env, m, lname);
626         if (rc < 0)
627                 RETURN(rc);
628
629         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
630                 RETURN(-EPERM);
631
632         if (S_ISDIR(mdd_object_type(src_obj)))
633                 RETURN(-EPERM);
634
635         LASSERT(src_obj != tgt_obj);
636         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
637         if (rc != 0)
638                 RETURN(rc);
639
640         rc = __mdd_may_link(env, src_obj, cattr);
641
642         RETURN(rc);
643 }
644
645 static int __mdd_index_delete_only(const struct lu_env *env,
646                                    struct mdd_object *pobj,
647                                    const char *name, struct thandle *handle)
648 {
649         struct dt_object *next = mdd_object_child(pobj);
650         int rc;
651
652         ENTRY;
653
654         if (dt_try_as_dir(env, next, true))
655                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
656         else
657                 rc = -ENOTDIR;
658
659         RETURN(rc);
660 }
661
662 static int __mdd_index_insert_only(const struct lu_env *env,
663                                    struct mdd_object *pobj,
664                                    const struct lu_fid *lf, __u32 type,
665                                    const char *name, struct thandle *handle)
666 {
667         struct dt_object *next = mdd_object_child(pobj);
668         int rc;
669
670         ENTRY;
671
672         if (dt_try_as_dir(env, next, true)) {
673                 struct dt_insert_rec *rec = &mdd_env_info(env)->mdi_dt_rec;
674
675                 rec->rec_fid = lf;
676                 rec->rec_type = type;
677                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
678                                (const struct dt_key *)name, handle);
679         } else {
680                 rc = -ENOTDIR;
681         }
682         RETURN(rc);
683 }
684
685 /* insert named index, add reference if isdir */
686 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
687                               const struct lu_fid *lf, __u32 type,
688                               const char *name, struct thandle *handle)
689 {
690         int rc;
691
692         ENTRY;
693
694         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
695         if (rc == 0 && S_ISDIR(type)) {
696                 mdd_write_lock(env, pobj, DT_TGT_PARENT);
697                 mdo_ref_add(env, pobj, handle);
698                 mdd_write_unlock(env, pobj);
699         }
700
701         RETURN(rc);
702 }
703
704 /* delete named index, drop reference if isdir */
705 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
706                               const char *name, int is_dir,
707                               struct thandle *handle)
708 {
709         int rc;
710
711         ENTRY;
712
713         rc = __mdd_index_delete_only(env, pobj, name, handle);
714         if (rc == 0 && is_dir) {
715                 mdd_write_lock(env, pobj, DT_TGT_PARENT);
716                 mdo_ref_del(env, pobj, handle);
717                 mdd_write_unlock(env, pobj);
718         }
719
720         RETURN(rc);
721 }
722
723 static int mdd_llog_record_calc_size(const struct lu_env *env,
724                                      const struct lu_name *tname,
725                                      const struct lu_name *sname)
726 {
727         const struct lu_ucred   *uc = lu_ucred(env);
728         enum changelog_rec_flags clf_flags = CLF_EXTRA_FLAGS;
729         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
730
731         if (sname != NULL)
732                 clf_flags |= CLF_RENAME;
733
734         if (uc != NULL && uc->uc_jobid[0] != '\0')
735                 clf_flags |= CLF_JOBID;
736
737         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
738                              changelog_rec_offset(clf_flags, crfe) +
739                              (tname != NULL ? tname->ln_namelen : 0) +
740                              (sname != NULL ? 1 + sname->ln_namelen : 0));
741 }
742
743 int mdd_declare_changelog_store(const struct lu_env *env,
744                                 struct mdd_device *mdd,
745                                 enum changelog_rec_type type,
746                                 const struct lu_name *tname,
747                                 const struct lu_name *sname,
748                                 struct thandle *handle)
749 {
750         struct obd_device *obd = mdd2obd_dev(mdd);
751         struct llog_ctxt *ctxt;
752         struct llog_rec_hdr rec_hdr;
753         struct thandle *llog_th;
754         int rc;
755
756         if (!mdd_changelog_enabled(env, mdd, type))
757                 return 0;
758
759         rec_hdr.lrh_len = mdd_llog_record_calc_size(env, tname, sname);
760         rec_hdr.lrh_type = CHANGELOG_REC;
761
762         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
763         if (ctxt == NULL)
764                 return -ENXIO;
765
766         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
767         if (IS_ERR(llog_th))
768                 GOTO(out_put, rc = PTR_ERR(llog_th));
769
770         rc = llog_declare_add(env, ctxt->loc_handle, &rec_hdr, llog_th);
771
772 out_put:
773         llog_ctxt_put(ctxt);
774
775         return rc;
776 }
777
778 /* The locking here is a bit tricky. For a CHANGELOG_REC the function
779  * drops loghandle->lgh_lock for a performance reasons. All dt_write()
780  * are used own offset, so it is safe.
781  * For other records general function is called and it doesnot drop
782  * a semaphore. The callers are changelog catalog records and initialisation
783  * records. llog_cat_new_log->llog_write_rec->mdd_changelog_write_rec()
784  *
785  * Since dt_record_write() could be reordered, rec1|rec2|0x0|rec4 could be
786  * at memory, reader should care about it. When the th is commited it is
787  * impossible to have a hole, since reordered records have the same th.
788  */
789 int mdd_changelog_write_rec(const struct lu_env *env,
790                             struct llog_handle *loghandle,
791                             struct llog_rec_hdr *r,
792                             struct llog_cookie *cookie,
793                             int idx, struct thandle *th)
794 {
795         int rc;
796         static struct thandle *saved_th;
797
798         CDEBUG(D_TRACE, "Adding rec %u type %u to "DFID" flags %x count %d\n",
799                idx, r->lrh_type, PLOGID(&loghandle->lgh_id),
800                loghandle->lgh_hdr->llh_flags, loghandle->lgh_hdr->llh_count);
801
802         if (r->lrh_type == CHANGELOG_REC) {
803                 struct mdd_device *mdd;
804                 struct llog_changelog_rec *rec;
805                 size_t left;
806                 __u32 chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
807                 struct dt_object *o = loghandle->lgh_obj;
808                 loff_t offset;
809                 struct lu_buf lgi_buf;
810
811                 left = chunk_size - (loghandle->lgh_cur_offset &
812                                      (chunk_size - 1));
813
814                 mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
815                 rec = container_of(r, struct llog_changelog_rec, cr_hdr);
816
817                 /* Don't use padding records because it require a slot at header
818                  * so previous result of checking llog_is_full(loghandle)
819                  * would be invalid, leave zeroes at the end of block.
820                  * A reader would care about it.
821                  */
822                 if (left != 0 && left < r->lrh_len)
823                         loghandle->lgh_cur_offset += left;
824
825                 offset = loghandle->lgh_cur_offset;
826                 loghandle->lgh_cur_offset += r->lrh_len;
827                 r->lrh_index = ++loghandle->lgh_last_idx;
828
829                 spin_lock(&mdd->mdd_cl.mc_lock);
830                 rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
831                 spin_unlock(&mdd->mdd_cl.mc_lock);
832
833                 /* drop the loghandle semaphore for parallel writes */
834                 up_write(&loghandle->lgh_lock);
835
836                 REC_TAIL(r)->lrt_len = r->lrh_len;
837                 REC_TAIL(r)->lrt_index = r->lrh_index;
838
839                 lgi_buf.lb_len = rec->cr_hdr.lrh_len;
840                 lgi_buf.lb_buf = rec;
841
842                 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_CHANGELOG_FAIL_WRITE) &&
843                     (rec->cr.cr_index % (cfs_fail_val + 1)) == 0)
844                         rc = -EIO;
845                 else
846                         rc = dt_record_write(env, o, &lgi_buf, &offset, th);
847
848                 if (rc) {
849                         CERROR("%s: failed to write changelog record file "DFID" rec idx %u off %llu chnlg idx %llu: rc = %d\n",
850                                loghandle->lgh_ctxt->loc_obd->obd_name,
851                                PFID(lu_object_fid(&o->do_lu)), r->lrh_index,
852                                offset, rec->cr.cr_index, rc);
853                         return rc;
854                 }
855
856                 /* mark index at bitmap after successful write, increment count,
857                  * and lrt_index with a last index. Use a lgh_hdr_lock for
858                  * a synchronization with llog_cancel.
859                  */
860                 spin_lock(&loghandle->lgh_hdr_lock);
861                 rc = __test_and_set_bit_le(r->lrh_index,
862                                            LLOG_HDR_BITMAP(loghandle->lgh_hdr));
863                 LASSERTF(!rc,
864                          "%s: index %u already set in llog bitmap "DFID"\n",
865                          loghandle->lgh_ctxt->loc_obd->obd_name,
866                          r->lrh_index, PLOGID(&loghandle->lgh_id));
867                 loghandle->lgh_hdr->llh_count++;
868                 if (LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index < r->lrh_index)
869                         LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index =
870                                 r->lrh_index;
871                 spin_unlock(&loghandle->lgh_hdr_lock);
872
873                 if (unlikely(th != saved_th)) {
874                         CDEBUG(D_OTHER, "%s: wrote rec %u "DFID" count %d\n",
875                                loghandle->lgh_ctxt->loc_obd->obd_name,
876                                r->lrh_index, PLOGID(&loghandle->lgh_id),
877                                loghandle->lgh_hdr->llh_count);
878                         saved_th = th;
879                 }
880                 lgi_buf.lb_len = loghandle->lgh_hdr_size;
881                 lgi_buf.lb_buf = loghandle->lgh_hdr;
882                 offset = 0;
883                 CDEBUG(D_TRACE, "%s: writing header "DFID"\n",
884                        loghandle->lgh_ctxt->loc_obd->obd_name,
885                        PLOGID(&loghandle->lgh_id));
886                 /* full header write, it is a local. For a mapped bh
887                  * it is memcpy() only. Probably it could be delayed as work.
888                  */
889                 rc = dt_record_write(env, o, &lgi_buf, &offset, th);
890         } else {
891                 rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
892                                                 cookie, idx, th);
893         }
894         if (rc < 0)
895                 CERROR("%s: failed to write changelog record file "DFID" count %d offset %llu: rc = %d\n",
896                        loghandle->lgh_ctxt->loc_obd->obd_name,
897                        PLOGID(&loghandle->lgh_id),
898                        loghandle->lgh_hdr->llh_count, loghandle->lgh_cur_offset,
899                        rc);
900
901         return rc;
902 }
903
904 /**
905  * Checks that changelog consumes safe amount of space comparing
906  * with FS free space
907  *
908  * \param env - current lu_env
909  * \param mdd - current MDD device
910  * \param lgh - changelog catalog llog handle
911  * \param estimate - get exact llog size or estimate it.
912  *
913  * \retval true/false
914  */
915 bool mdd_changelog_is_space_safe(const struct lu_env *env,
916                                  struct mdd_device *mdd,
917                                  struct llog_handle *lgh,
918                                  bool estimate)
919 {
920         struct obd_statfs sfs;
921         unsigned long long free_space_limit;
922         unsigned long long llog_size;
923         int rc;
924
925         rc = dt_statfs(env, mdd->mdd_bottom, &sfs);
926         if (rc)
927                 /* check is ignored if OSD is not healthy for any reason */
928                 return true;
929
930         /* if changelog consumes more than 1/4 of available space then start
931          * emergency cleanup.
932          */
933         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_CHANGELOG_ENOSPC))
934                 free_space_limit = cfs_fail_val;
935         else
936                 free_space_limit = (sfs.os_bfree * sfs.os_bsize) >> 2;
937
938         /* if \estimate parameter is used then calculate llog size from
939          * number of used catalog entries and plain llog maximum size.
940          * Plain llog maximum size if set as 1/64 of FS free space limited
941          * by 128MB as maximum and 2MB as minimum, see llog_cat_new_log()
942          * Estimation helps to avoid full llog processing to get exact size
943          * by llog_cat_size().
944          */
945         if (estimate) {
946                 /* use 1/64 of FS size but keep it between 2MB and 128MB */
947                 llog_size = clamp_t(unsigned long long,
948                                     (sfs.os_blocks * sfs.os_bsize) >> 6,
949                                     2 << 20, 128 << 20);
950                 /* llog_cat_free_space() gives free slots, we need occupied,
951                  * so subtruct free from total slots minus one for header
952                  */
953                 llog_size *= LLOG_HDR_BITMAP_SIZE(lgh->lgh_hdr) - 1 -
954                              llog_cat_free_space(lgh);
955         } else {
956                 /* get exact llog size */
957                 llog_size = llog_cat_size(env, lgh);
958         }
959         CDEBUG(D_HA, "%s:%s changelog size is %lluMB, space limit is %lluMB\n",
960                mdd2obd_dev(mdd)->obd_name, estimate ? " estimated" : "",
961                llog_size >> 20, free_space_limit >> 20);
962
963         if (llog_size > free_space_limit) {
964                 CWARN("%s: changelog uses %lluMB with %lluMB space limit\n",
965                       mdd2obd_dev(mdd)->obd_name, llog_size >> 20,
966                       free_space_limit >> 20);
967                 return false;
968         }
969
970         return true;
971 }
972
973 /**
974  * Checks if there is enough space in changelog itself and in FS and force
975  * emergency changelog cleanup if needed. It will purge users one by one
976  * from the oldest one while emergency conditions are true.
977  *
978  * \param env - current lu_env
979  * \param mdd - current MDD device
980  * \param lgh - changelog catalog llog handle
981  *
982  * \retval true if emergency cleanup is needed for changelog
983  */
984 static bool mdd_changelog_emrg_cleanup(const struct lu_env *env,
985                                        struct mdd_device *mdd,
986                                        struct llog_handle *lgh)
987 {
988         unsigned long free_entries = llog_cat_free_space(lgh);
989
990         /* free space GC is disabled or is in progress already */
991         if (!mdd->mdd_changelog_free_space_gc || mdd->mdd_changelog_emrg_gc)
992                 return false;
993
994         if (free_entries <= mdd->mdd_changelog_min_free_cat_entries) {
995                 CWARN("%s: changelog has only %lu free catalog entries\n",
996                       mdd2obd_dev(mdd)->obd_name, free_entries);
997                 mdd->mdd_changelog_emrg_gc = true;
998                 return true;
999         }
1000
1001         if (!mdd_changelog_is_space_safe(env, mdd, lgh, true)) {
1002                 mdd->mdd_changelog_emrg_gc = true;
1003                 return true;
1004         }
1005
1006         return false;
1007 }
1008
1009 static bool mdd_changelog_need_gc(const struct lu_env *env,
1010                                   struct mdd_device *mdd,
1011                                   struct llog_handle *lgh)
1012 {
1013         struct mdd_changelog *mc = &mdd->mdd_cl;
1014
1015         return mdd_changelog_emrg_cleanup(env, mdd, lgh) ||
1016                mdd_changelog_is_too_idle(mdd, mc->mc_minrec, mc->mc_mintime) ||
1017                CFS_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD);
1018 }
1019
1020 /** Add a changelog entry \a rec to the changelog llog
1021  * \param mdd
1022  * \param rec
1023  * \param handle - currently ignored since llogs start their own transaction;
1024  *              this will hopefully be fixed in llog rewrite
1025  * \retval 0 ok
1026  */
1027 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
1028                         struct llog_changelog_rec *rec, struct thandle *th)
1029 {
1030         struct obd_device *obd = mdd2obd_dev(mdd);
1031         struct llog_ctxt *ctxt;
1032         struct thandle *llog_th;
1033         int rc;
1034         bool need_gc;
1035
1036         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
1037                                             changelog_rec_varsize(&rec->cr));
1038
1039         /* llog_lvfs_write_rec sets the llog tail len */
1040         rec->cr_hdr.lrh_type = CHANGELOG_REC;
1041         rec->cr.cr_time = cl_time();
1042
1043         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
1044         if (ctxt == NULL)
1045                 return -ENXIO;
1046
1047         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
1048         if (IS_ERR(llog_th))
1049                 GOTO(out_put, rc = PTR_ERR(llog_th));
1050
1051         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_CHANGELOG_REORDER, cfs_fail_val);
1052         /* nested journal transaction */
1053         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
1054
1055         /* time to recover some space ?? */
1056         if (likely(!mdd->mdd_changelog_gc ||
1057                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
1058                    mdd->mdd_changelog_min_gc_interval >=
1059                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
1060                 /* save a spin_lock trip */
1061                 goto out_put;
1062
1063         if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_IDX_PUMP)) {
1064                 spin_lock(&mdd->mdd_cl.mc_lock);
1065                 mdd->mdd_cl.mc_index += cfs_fail_val;
1066                 spin_unlock(&mdd->mdd_cl.mc_lock);
1067         }
1068
1069         need_gc = mdd_changelog_need_gc(env, mdd, ctxt->loc_handle);
1070         spin_lock(&mdd->mdd_cl.mc_lock);
1071         if (likely(mdd->mdd_changelog_gc &&
1072                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
1073                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
1074                         mdd->mdd_changelog_min_gc_interval)) {
1075                 if (unlikely(need_gc)) {
1076                         CWARN("%s: %s starting changelog garbage collection\n",
1077                               obd->obd_name,
1078                               CFS_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
1079                               " simulate" : "");
1080                         /* indicate further kthread run will occur outside
1081                          * right after current journal transaction filling has
1082                          * completed
1083                          */
1084                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
1085                 }
1086                 /* next check in mdd_changelog_min_gc_interval anyway */
1087                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
1088         }
1089         spin_unlock(&mdd->mdd_cl.mc_lock);
1090 out_put:
1091         llog_ctxt_put(ctxt);
1092         if (rc > 0)
1093                 rc = 0;
1094         return rc;
1095 }
1096
1097 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
1098                                          const struct lu_fid *sfid,
1099                                          const struct lu_fid *spfid,
1100                                          const struct lu_name *sname)
1101 {
1102         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
1103         size_t extsize;
1104
1105         LASSERT(sfid != NULL);
1106         LASSERT(spfid != NULL);
1107         LASSERT(sname != NULL);
1108
1109         extsize = sname->ln_namelen + 1;
1110
1111         rnm->cr_sfid = *sfid;
1112         rnm->cr_spfid = *spfid;
1113
1114         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
1115         strscpy(changelog_rec_sname(rec), sname->ln_name, extsize);
1116         rec->cr_namelen += extsize;
1117 }
1118
1119 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
1120 {
1121         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
1122
1123         if (jobid == NULL || jobid[0] == '\0')
1124                 return;
1125
1126         strscpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
1127 }
1128
1129 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
1130 {
1131         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
1132
1133         ef->cr_extra_flags = eflags;
1134 }
1135
1136 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
1137                                     __u64 uid, __u64 gid)
1138 {
1139         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
1140
1141         uidgid->cr_uid = uid;
1142         uidgid->cr_gid = gid;
1143 }
1144
1145 /* To support the new large NID structure we use all the space in
1146  * struct changelog_ext_nid to store struct lnet_nid.
1147  */
1148 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
1149                                  const struct lnet_nid *nid)
1150 {
1151         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
1152
1153         BUILD_BUG_ON(sizeof(*clnid) < sizeof(*nid));
1154         memcpy(clnid, nid, sizeof(*nid));
1155 }
1156
1157 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, u32 flags)
1158 {
1159         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
1160
1161         omd->cr_openflags = flags;
1162 }
1163
1164 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
1165                                    const char *xattr_name)
1166 {
1167         struct changelog_ext_xattr *xattr = changelog_rec_xattr(rec);
1168
1169         strscpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
1170 }
1171
1172 /**
1173  * Set the parent FID at \a pfid for a namespace change changelog record, using
1174  * XATTR_NAME_LMV and linkEA from the remote object to obtain the correct
1175  * parent FID for striped directories
1176  *
1177  * \param[in] env - environment
1178  * \param[in] mdd - mdd device
1179  * \param[in] parent - parent object
1180  * \param[in] pattr - parent attribute
1181  * \param[out] pfid - parent fid
1182  *
1183  * \retval 0 success
1184  * \retval -errno failure
1185  */
1186 static int mdd_changelog_ns_pfid_set(const struct lu_env *env,
1187                                      struct mdd_device *mdd,
1188                                      struct mdd_object *parent,
1189                                      const struct lu_attr *pattr,
1190                                      struct lu_fid *pfid)
1191 {
1192         int rc = 0;
1193
1194         /* Certain userspace tools might rely on the previous behavior of
1195          * displaying the shard's parent FID, on some changelog records related
1196          * to striped directories, so use that for compatibility if needed
1197          */
1198         if (mdd->mdd_cl.mc_enable_shard_pfid) {
1199                 *pfid = *mdd_object_fid(parent);
1200                 return 0;
1201         }
1202
1203         if (!fid_is_zero(&parent->mod_striped_pfid)) {
1204                 *pfid = parent->mod_striped_pfid;
1205                 return 0;
1206         }
1207
1208         /* is the parent dir striped? */
1209         rc = mdo_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LMV);
1210         if (rc == -ENODATA) {
1211                 *pfid = *mdd_object_fid(parent);
1212                 parent->mod_striped_pfid = *pfid;
1213                 return 0;
1214         }
1215
1216         if (rc < 0)
1217                 return rc;
1218
1219         LASSERT(!mdd_is_root(mdo2mdd(&parent->mod_obj),
1220                              mdd_object_fid(parent)));
1221
1222         /* hide shard FID */
1223         rc = mdd_parent_fid(env, parent, pattr, pfid);
1224         if (!rc)
1225                 parent->mod_striped_pfid = *pfid;
1226
1227         return rc;
1228 }
1229
1230 /* The digested form is made of a FID (16 bytes) followed by the second-to-last
1231  * ciphertext block (16 bytes), so a total length of 32 bytes.
1232  */
1233 /* Must be identical to ll_digest_filename in llite_internal.h */
1234 struct changelog_digest_filename {
1235         struct lu_fid   cdf_fid;
1236         char            cdf_excerpt[LL_CRYPTO_BLOCK_SIZE];
1237 };
1238
1239 /**
1240  * Utility function to process filename in changelog
1241  *
1242  * \param[in] name     file name
1243  * \param[in] namelen  file name len
1244  * \param[in] fid      file FID
1245  * \param[in] enc      is object encrypted?
1246  * \param[out]ln       pointer to the struct lu_name to hold the real name
1247  *
1248  * If file is not encrypted, output name is just the file name.
1249  * If file is encrypted, file name needs to be decoded then digested if the name
1250  * is also encrypted. In this case a new buffer is allocated, and ln->ln_name
1251  * needs to be freed by the caller.
1252  *
1253  * \retval   0, on success
1254  * \retval -ve, on error
1255  */
1256 static int changelog_name2digest(const char *name, int namelen,
1257                                  const struct lu_fid *fid,
1258                                  bool enc, struct lu_name *ln)
1259 {
1260         struct changelog_digest_filename *digest = NULL;
1261         char *buf = NULL, *bufout = NULL, *p, *q;
1262         int len, bufoutlen;
1263         int rc = 0;
1264
1265         ENTRY;
1266
1267         ln->ln_name = name;
1268         ln->ln_namelen = namelen;
1269
1270         if (!enc)
1271                 GOTO(out, rc);
1272
1273         /* now we know file is encrypted */
1274         if (strnchr(name, namelen, '=')) {
1275                 /* only proceed to critical decode if
1276                  * encrypted name contains espace char '='
1277                  */
1278                 buf = kmalloc(namelen, GFP_NOFS);
1279                 if (!buf)
1280                         GOTO(out, rc = -ENOMEM);
1281
1282                 namelen = critical_decode(name, namelen, buf);
1283                 ln->ln_name = buf;
1284                 ln->ln_namelen = namelen;
1285         }
1286
1287         p = (char *)ln->ln_name;
1288         len = namelen;
1289         while (len--) {
1290                 if (!isprint(*p++))
1291                         break;
1292         }
1293
1294         /* len == -1 means we went through the whole decoded name without
1295          * finding any non-printable character, so consider it is not encrypted
1296          */
1297         if (len == -1)
1298                 GOTO(out, rc);
1299
1300         /* now we know the name has some non-printable characters */
1301         if (namelen > LL_CRYPTO_BLOCK_SIZE * 2) {
1302                 if (!fid)
1303                         GOTO(out, rc = -EPROTO);
1304
1305                 OBD_ALLOC_PTR(digest);
1306                 if (!digest)
1307                         GOTO(out, rc = -ENOMEM);
1308
1309                 digest->cdf_fid = *fid;
1310                 memcpy(digest->cdf_excerpt,
1311                        LLCRYPT_EXTRACT_DIGEST(ln->ln_name, ln->ln_namelen),
1312                        LL_CRYPTO_BLOCK_SIZE);
1313                 p = (char *)digest;
1314                 len = sizeof(*digest);
1315         } else {
1316                 p = (char *)ln->ln_name;
1317                 len = ln->ln_namelen;
1318         }
1319
1320         bufoutlen = BASE64URL_CHARS(len) + 2;
1321         bufout = kmalloc(digest ? bufoutlen + 1 : bufoutlen, GFP_NOFS);
1322         if (!bufout)
1323                 GOTO(free_digest, rc = -ENOMEM);
1324
1325         q = bufout;
1326         if (digest)
1327                 *q++ = LLCRYPT_DIGESTED_CHAR;
1328         /* beware that gss_base64url_encode adds a trailing space */
1329         gss_base64url_encode(&q, &bufoutlen, (__u8 *)p, len);
1330         if (bufoutlen == -1) {
1331                 kfree(bufout);
1332         } else {
1333                 kfree(buf);
1334                 ln->ln_name = bufout;
1335                 ln->ln_namelen = q - bufout - 1;
1336         }
1337
1338 free_digest:
1339         OBD_FREE_PTR(digest);
1340 out:
1341         RETURN(rc);
1342 }
1343
1344 /** Store a namespace change changelog record
1345  * If this fails, we must fail the whole transaction; we don't
1346  * want the change to commit without the log entry.
1347  * \param target - mdd_object of change
1348  * \param parent - target parent object
1349  * \param pattr - target parent attribute
1350  * \param sfid - source object fid
1351  * \param sparent - source parent object
1352  * \param spattr - source parent attribute
1353  * \param tname - target name string
1354  * \param sname - source name string
1355  * \param handle - transaction handle
1356  */
1357 int mdd_changelog_ns_store(const struct lu_env *env,
1358                            struct mdd_device *mdd,
1359                            enum changelog_rec_type type,
1360                            enum changelog_rec_flags clf_flags,
1361                            struct mdd_object *target,
1362                            struct mdd_object *parent,
1363                            const struct lu_attr *pattr,
1364                            const struct lu_fid *sfid,
1365                            struct mdd_object *sparent,
1366                            const struct lu_attr *spattr,
1367                            const struct lu_name *tname,
1368                            const struct lu_name *sname,
1369                            struct thandle *handle)
1370 {
1371         struct mdd_thread_info *info = mdd_env_info(env);
1372         struct lu_name *ltname = NULL, *lsname = NULL;
1373         const struct lu_ucred *uc = lu_ucred(env);
1374         struct llog_changelog_rec *rec;
1375         __u64 xflags = CLFE_INVALID;
1376         struct lu_fid *tfid = NULL;
1377         struct lu_buf *buf;
1378         int reclen;
1379         bool enc;
1380         int rc = 0;
1381
1382         ENTRY;
1383
1384         if (!mdd_changelog_enabled(env, mdd, type))
1385                 RETURN(0);
1386
1387         LASSERT(S_ISDIR(mdd_object_type(parent)));
1388         LASSERT(tname != NULL);
1389         LASSERT(handle != NULL);
1390
1391         if (tname) {
1392                 OBD_ALLOC_PTR(ltname);
1393                 if (!ltname)
1394                         GOTO(out, rc = -ENOMEM);
1395
1396                 if (sname) {
1397                         enc = info->mdi_tpattr.la_valid & LA_FLAGS &&
1398                                 info->mdi_tpattr.la_flags & LUSTRE_ENCRYPT_FL;
1399                         tfid = (struct lu_fid *)sfid;
1400                 } else {
1401                         enc = info->mdi_pattr.la_valid & LA_FLAGS &&
1402                                 info->mdi_pattr.la_flags & LUSTRE_ENCRYPT_FL;
1403                         tfid = (struct lu_fid *)mdd_object_fid(target);
1404                 }
1405                 rc = changelog_name2digest(tname->ln_name, tname->ln_namelen,
1406                                            tfid, enc, ltname);
1407                 if (rc)
1408                         GOTO(out_ltname, rc);
1409         }
1410         if (sname) {
1411                 OBD_ALLOC_PTR(lsname);
1412                 if (!lsname)
1413                         GOTO(out_ltname, rc = -ENOMEM);
1414
1415                 enc = info->mdi_pattr.la_valid & LA_FLAGS &&
1416                         info->mdi_pattr.la_flags & LUSTRE_ENCRYPT_FL;
1417                 rc = changelog_name2digest(sname->ln_name, sname->ln_namelen,
1418                                            tfid, enc, lsname);
1419                 if (rc)
1420                         GOTO(out_lsname, rc);
1421         }
1422
1423         reclen = mdd_llog_record_calc_size(env, ltname, lsname);
1424         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_chlg_buf, reclen);
1425         if (buf->lb_buf == NULL)
1426                 GOTO(out_lsname, rc = -ENOMEM);
1427         rec = buf->lb_buf;
1428
1429         clf_flags &= CLF_FLAGMASK;
1430         clf_flags |= CLF_EXTRA_FLAGS;
1431
1432         if (uc) {
1433                 if (uc->uc_jobid[0] != '\0')
1434                         clf_flags |= CLF_JOBID;
1435                 xflags |= CLFE_UIDGID;
1436                 xflags |= CLFE_NID;
1437                 xflags |= CLFE_NID_BE;
1438         }
1439
1440         if (lsname != NULL)
1441                 clf_flags |= CLF_RENAME;
1442         else
1443                 clf_flags |= CLF_VERSION;
1444
1445         rec->cr.cr_flags = clf_flags;
1446
1447         if (clf_flags & CLF_EXTRA_FLAGS) {
1448                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
1449                 if (xflags & CLFE_UIDGID)
1450                         mdd_changelog_rec_extra_uidgid(&rec->cr,
1451                                                        uc->uc_uid, uc->uc_gid);
1452                 if (xflags & CLFE_NID)
1453                         mdd_changelog_rec_extra_nid(&rec->cr, &uc->uc_nid);
1454         }
1455
1456         rec->cr.cr_type = (__u32)type;
1457
1458         rc = mdd_changelog_ns_pfid_set(env, mdd, parent, pattr,
1459                                        &rec->cr.cr_pfid);
1460         if (rc < 0)
1461                 GOTO(out_lsname, rc);
1462
1463         rec->cr.cr_namelen = ltname->ln_namelen;
1464         memcpy(changelog_rec_name(&rec->cr), ltname->ln_name,
1465                ltname->ln_namelen);
1466
1467         if (clf_flags & CLF_RENAME) {
1468                 struct lu_fid spfid;
1469
1470                 rc = mdd_changelog_ns_pfid_set(env, mdd, sparent, spattr,
1471                                                &spfid);
1472                 if (rc < 0)
1473                         GOTO(out_lsname, rc);
1474
1475                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, &spfid, lsname);
1476         }
1477
1478         if (clf_flags & CLF_JOBID)
1479                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1480
1481         if (likely(target != NULL)) {
1482                 rec->cr.cr_tfid = *mdd_object_fid(target);
1483                 target->mod_cltime = ktime_get();
1484         } else {
1485                 fid_zero(&rec->cr.cr_tfid);
1486         }
1487
1488         rc = mdd_changelog_store(env, mdd, rec, handle);
1489         if (rc < 0) {
1490                 CERROR("%s: cannot store changelog record: type = %d, name = '%s', t = "
1491                        DFID", p = "DFID": rc = %d\n",
1492                        mdd2obd_dev(mdd)->obd_name, type, ltname->ln_name,
1493                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1494                 GOTO(out_lsname, rc = -EFAULT);
1495         }
1496
1497 out_lsname:
1498         if (lsname && lsname->ln_name != sname->ln_name)
1499                 kfree(lsname->ln_name);
1500         OBD_FREE_PTR(lsname);
1501 out_ltname:
1502         if (ltname && ltname->ln_name != tname->ln_name)
1503                 kfree(ltname->ln_name);
1504         OBD_FREE_PTR(ltname);
1505 out:
1506         RETURN(rc);
1507 }
1508
1509 static int __mdd_links_add(const struct lu_env *env,
1510                            struct mdd_object *mdd_obj,
1511                            struct linkea_data *ldata,
1512                            const struct lu_name *lname,
1513                            const struct lu_fid *pfid,
1514                            int first, int check)
1515 {
1516         /* cattr is set in mdd_link */
1517         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1518         int rc;
1519
1520         if (ldata->ld_leh == NULL) {
1521                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1522                 if (rc) {
1523                         if (rc != -ENODATA)
1524                                 return rc;
1525                         rc = linkea_data_new(ldata,
1526                                              &mdd_env_info(env)->mdi_link_buf);
1527                         if (rc)
1528                                 return rc;
1529                 }
1530         }
1531
1532         if (check) {
1533                 rc = linkea_links_find(ldata, lname, pfid);
1534                 if (rc && rc != -ENOENT)
1535                         return rc;
1536                 if (rc == 0)
1537                         return -EEXIST;
1538         }
1539
1540         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1541                 struct lu_fid *tfid = &mdd_env_info(env)->mdi_fid2;
1542
1543                 *tfid = *pfid;
1544                 tfid->f_ver = ~0;
1545                 linkea_add_buf(ldata, lname, tfid, false);
1546         }
1547
1548         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1549                 linkea_add_buf(ldata, lname, pfid, false);
1550
1551         /* For encrypted file, we want to limit number of hard links to what
1552          * linkEA can contain. So ask to return error in case of overflow.
1553          * Currently linkEA stores 4KiB of links, that is 14 NAME_MAX links,
1554          * or 119 16-byte names.
1555          */
1556         return linkea_add_buf(ldata, lname, pfid,
1557                               cattr->la_valid & LA_FLAGS &&
1558                               cattr->la_flags & LUSTRE_ENCRYPT_FL);
1559 }
1560
1561 static int __mdd_links_del(const struct lu_env *env,
1562                            struct mdd_object *mdd_obj,
1563                            struct linkea_data *ldata,
1564                            const struct lu_name *lname,
1565                            const struct lu_fid *pfid)
1566 {
1567         /* cattr is set in mdd_link */
1568         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1569         int rc;
1570
1571         if (ldata->ld_leh == NULL) {
1572                 rc = mdd_links_read(env, mdd_obj, ldata);
1573                 if (rc)
1574                         return rc;
1575         }
1576
1577         rc = linkea_links_find(ldata, lname, pfid);
1578         if (rc)
1579                 return rc;
1580
1581         linkea_del_buf(ldata, lname,
1582                        cattr->la_valid & LA_FLAGS &&
1583                        cattr->la_flags & LUSTRE_ENCRYPT_FL);
1584         return 0;
1585 }
1586
1587 static int mdd_linkea_prepare(const struct lu_env *env,
1588                               struct mdd_object *mdd_obj,
1589                               const struct lu_fid *oldpfid,
1590                               const struct lu_name *oldlname,
1591                               const struct lu_fid *newpfid,
1592                               const struct lu_name *newlname,
1593                               int first, int check,
1594                               struct linkea_data *ldata)
1595 {
1596         int rc = 0;
1597
1598         ENTRY;
1599
1600         if (CFS_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1601                 RETURN(0);
1602
1603         LASSERT(oldpfid != NULL || newpfid != NULL);
1604
1605         if (mdd_obj->mod_flags & DEAD_OBJ)
1606                 /* Unnecessary to update linkEA for dead object.  */
1607                 RETURN(0);
1608
1609         if (oldpfid != NULL) {
1610                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1611                 if (rc) {
1612                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1613                                 RETURN(rc);
1614
1615                         /* No changes done. */
1616                         rc = 0;
1617                 }
1618         }
1619
1620         /* If renaming, add the new record */
1621         if (newpfid != NULL)
1622                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1623                                      first, check);
1624
1625         RETURN(rc);
1626 }
1627
1628 int mdd_links_rename(const struct lu_env *env,
1629                      struct mdd_object *mdd_obj,
1630                      const struct lu_fid *oldpfid,
1631                      const struct lu_name *oldlname,
1632                      const struct lu_fid *newpfid,
1633                      const struct lu_name *newlname,
1634                      struct thandle *handle,
1635                      struct linkea_data *ldata,
1636                      int first, int check)
1637 {
1638         int rc = 0;
1639
1640         ENTRY;
1641
1642         if (ldata == NULL) {
1643                 ldata = &mdd_env_info(env)->mdi_link_data;
1644                 memset(ldata, 0, sizeof(*ldata));
1645                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1646                                         newpfid, newlname, first, check, ldata);
1647                 if (rc)
1648                         GOTO(out, rc);
1649         }
1650
1651         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1652                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1653
1654         GOTO(out, rc);
1655
1656 out:
1657         if (rc != 0) {
1658                 if (newlname == NULL)
1659                         CERROR("link_ea add failed "DFID": rc = %d\n",
1660                                PFID(mdd_object_fid(mdd_obj)), rc);
1661                 else if (oldpfid == NULL)
1662                         CERROR("link_ea add '%.*s' failed "DFID": rc = %d\n",
1663                                newlname->ln_namelen, newlname->ln_name,
1664                                PFID(mdd_object_fid(mdd_obj)), rc);
1665                 else if (newpfid == NULL)
1666                         CERROR("link_ea del '%.*s' failed "DFID": rc = %d\n",
1667                                oldlname->ln_namelen, oldlname->ln_name,
1668                                PFID(mdd_object_fid(mdd_obj)), rc);
1669                 else
1670                         CERROR("link_ea rename '%.*s'->'%.*s' failed "DFID": rc = %d\n",
1671                                oldlname->ln_namelen, oldlname->ln_name,
1672                                newlname->ln_namelen, newlname->ln_name,
1673                                PFID(mdd_object_fid(mdd_obj)), rc);
1674         }
1675
1676         if (is_vmalloc_addr(ldata->ld_buf))
1677                 /* if we vmalloced a large buffer drop it */
1678                 lu_buf_free(ldata->ld_buf);
1679
1680         return rc;
1681 }
1682
1683 static inline int mdd_links_add(const struct lu_env *env,
1684                                 struct mdd_object *mdd_obj,
1685                                 const struct lu_fid *pfid,
1686                                 const struct lu_name *lname,
1687                                 struct thandle *handle,
1688                                 struct linkea_data *ldata, int first)
1689 {
1690         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1691                                 pfid, lname, handle, ldata, first, 0);
1692 }
1693
1694 static inline int mdd_links_del(const struct lu_env *env,
1695                                 struct mdd_object *mdd_obj,
1696                                 const struct lu_fid *pfid,
1697                                 const struct lu_name *lname,
1698                                 struct thandle *handle)
1699 {
1700         return mdd_links_rename(env, mdd_obj, pfid, lname,
1701                                 NULL, NULL, handle, NULL, 0, 0);
1702 }
1703
1704 /** Read the link EA into a temp buffer.
1705  * Uses the name_buf since it is generally large.
1706  * \retval IS_ERR err
1707  * \retval ptr to \a lu_buf (always \a mdi_link_buf)
1708  */
1709 struct lu_buf *mdd_links_get(const struct lu_env *env,
1710                              struct mdd_object *mdd_obj)
1711 {
1712         struct linkea_data ldata = { NULL };
1713         int rc;
1714
1715         rc = mdd_links_read(env, mdd_obj, &ldata);
1716         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1717 }
1718
1719 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1720                     struct linkea_data *ldata, struct thandle *handle)
1721 {
1722         const struct lu_buf *buf;
1723         int                 rc;
1724
1725         if (ldata == NULL || ldata->ld_buf == NULL ||
1726             ldata->ld_leh == NULL)
1727                 return 0;
1728
1729         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1730                 return 0;
1731
1732 again:
1733         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1734                                 ldata->ld_leh->leh_len);
1735         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1736         if (unlikely(rc == -ENOSPC)) {
1737                 rc = linkea_overflow_shrink(ldata);
1738                 if (likely(rc > 0))
1739                         goto again;
1740         }
1741
1742         return rc;
1743 }
1744
1745 static int mdd_declare_links_add(const struct lu_env *env,
1746                                  struct mdd_object *mdd_obj,
1747                                  struct thandle *handle,
1748                                  struct linkea_data *ldata)
1749 {
1750         int rc;
1751         int ea_len;
1752         void *linkea;
1753
1754         if (ldata != NULL && ldata->ld_leh != NULL) {
1755                 ea_len = ldata->ld_leh->leh_len;
1756                 linkea = ldata->ld_buf->lb_buf;
1757         } else {
1758                 ea_len = MAX_LINKEA_SIZE;
1759                 linkea = NULL;
1760         }
1761
1762         rc = mdo_declare_xattr_set(env, mdd_obj,
1763                                    mdd_buf_get_const(env, linkea, ea_len),
1764                                    XATTR_NAME_LINK, 0, handle);
1765
1766         return rc;
1767 }
1768
1769 static inline int mdd_declare_links_del(const struct lu_env *env,
1770                                         struct mdd_object *c,
1771                                         struct thandle *handle)
1772 {
1773         int rc = 0;
1774
1775         /* For directory, linkEA will be removed together with the object. */
1776         if (!S_ISDIR(mdd_object_type(c)))
1777                 rc = mdd_declare_links_add(env, c, handle, NULL);
1778
1779         return rc;
1780 }
1781
1782 static int mdd_declare_link(const struct lu_env *env,
1783                             struct mdd_device *mdd,
1784                             struct mdd_object *p,
1785                             struct mdd_object *c,
1786                             const struct lu_name *name,
1787                             struct thandle *handle,
1788                             struct lu_attr *la,
1789                             struct linkea_data *data)
1790 {
1791         struct lu_fid tfid = *mdd_object_fid(c);
1792         int rc;
1793
1794         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1795                 tfid.f_oid = cfs_fail_val;
1796
1797         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1798                                       name->ln_name, handle);
1799         if (rc != 0)
1800                 return rc;
1801
1802         rc = mdo_declare_ref_add(env, c, handle);
1803         if (rc != 0)
1804                 return rc;
1805
1806         la->la_valid = LA_CTIME | LA_MTIME;
1807         rc = mdo_declare_attr_set(env, p, la, handle);
1808         if (rc != 0)
1809                 return rc;
1810
1811         la->la_valid = LA_CTIME;
1812         rc = mdo_declare_attr_set(env, c, la, handle);
1813         if (rc != 0)
1814                 return rc;
1815
1816         rc = mdd_declare_links_add(env, c, handle, data);
1817         if (rc != 0)
1818                 return rc;
1819
1820         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1821                                          handle);
1822
1823         return rc;
1824 }
1825
1826 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1827                     struct md_object *src_obj, const struct lu_name *lname,
1828                     struct md_attr *ma)
1829 {
1830         const char *name = lname->ln_name;
1831         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
1832         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1833         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1834         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1835         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
1836         struct mdd_device *mdd = mdo2mdd(src_obj);
1837         struct thandle *handle;
1838         struct lu_fid *tfid = &mdd_env_info(env)->mdi_fid2;
1839         struct linkea_data *ldata = &mdd_env_info(env)->mdi_link_data;
1840         int rc;
1841
1842         ENTRY;
1843
1844         rc = mdd_la_get(env, mdd_sobj, cattr);
1845         if (rc != 0)
1846                 RETURN(rc);
1847
1848         rc = mdd_la_get(env, mdd_tobj, tattr);
1849         if (rc != 0)
1850                 RETURN(rc);
1851
1852         /* If we are using project inheritance, we only allow hard link
1853          * creation in our tree when the project IDs are the same;
1854          * otherwise the tree quota mechanism could be circumvented.
1855          */
1856         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1857             (tattr->la_projid != cattr->la_projid))
1858                 RETURN(-EXDEV);
1859
1860         handle = mdd_trans_create(env, mdd);
1861         if (IS_ERR(handle))
1862                 GOTO(out_pending, rc = PTR_ERR(handle));
1863
1864         memset(ldata, 0, sizeof(*ldata));
1865
1866         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1867         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1868
1869         /* Note: even this function will change ldata, but it comes from
1870          * thread_info, which is completely temporary and only seen in
1871          * this function, so we do not need reset ldata once it fails.
1872          */
1873         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1874                                 mdd_object_fid(mdd_tobj), lname, 0, 0, ldata);
1875         if (rc != 0)
1876                 GOTO(stop, rc);
1877
1878         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1879                               la, ldata);
1880         if (rc)
1881                 GOTO(stop, rc);
1882
1883         rc = mdd_trans_start(env, mdd, handle);
1884         if (rc)
1885                 GOTO(stop, rc);
1886
1887         mdd_write_lock(env, mdd_sobj, DT_TGT_CHILD);
1888         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1889                                    cattr);
1890         if (rc)
1891                 GOTO(out_unlock, rc);
1892
1893         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1894                 rc = mdo_ref_add(env, mdd_sobj, handle);
1895                 if (rc != 0)
1896                         GOTO(out_unlock, rc);
1897         }
1898
1899         *tfid = *mdd_object_fid(mdd_sobj);
1900         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1901                 tfid->f_oid = cfs_fail_val;
1902
1903         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1904                                      mdd_object_type(mdd_sobj), name, handle);
1905         if (rc != 0) {
1906                 mdo_ref_del(env, mdd_sobj, handle);
1907                 GOTO(out_unlock, rc);
1908         }
1909
1910         la->la_valid = LA_CTIME | LA_MTIME;
1911         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1912         if (rc)
1913                 GOTO(out_unlock, rc);
1914
1915         la->la_valid = LA_CTIME;
1916         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1917         if (rc == 0)
1918                 /* Note: The failure of links_add should not cause the
1919                  * link failure, so do not check return value.
1920                  */
1921                 mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_tobj),
1922                               lname, handle, ldata, 0);
1923
1924         EXIT;
1925 out_unlock:
1926         mdd_write_unlock(env, mdd_sobj);
1927         if (rc == 0)
1928                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1929                                             mdd_tobj, tattr, NULL,
1930                                             NULL, NULL, lname, NULL, handle);
1931 stop:
1932         rc = mdd_trans_stop(env, mdd, rc, handle);
1933         if (is_vmalloc_addr(ldata->ld_buf))
1934                 /* if we vmalloced a large buffer drop it */
1935                 lu_buf_free(ldata->ld_buf);
1936 out_pending:
1937         return rc;
1938 }
1939
1940 static int mdd_mark_orphan_object(const struct lu_env *env,
1941                                 struct mdd_object *obj, struct thandle *handle,
1942                                 bool declare)
1943 {
1944         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1945         int rc;
1946
1947         attr->la_valid = LA_FLAGS;
1948         attr->la_flags = LUSTRE_ORPHAN_FL;
1949
1950         if (declare)
1951                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1952         else
1953                 rc = mdo_attr_set(env, obj, attr, handle);
1954
1955         return rc;
1956 }
1957
1958 static int mdd_declare_finish_unlink(const struct lu_env *env,
1959                                      struct mdd_object *obj,
1960                                      struct thandle *handle)
1961 {
1962         int rc;
1963
1964         /* Sigh, we do not know if the unlink object will become orphan in
1965          * declare phase, but fortunately the flags here does not matter
1966          * in current declare implementation
1967          */
1968         rc = mdd_mark_orphan_object(env, obj, handle, true);
1969         if (rc != 0)
1970                 return rc;
1971
1972         rc = mdo_declare_destroy(env, obj, handle);
1973         if (rc != 0)
1974                 return rc;
1975
1976         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1977         if (rc != 0)
1978                 return rc;
1979
1980         return mdd_declare_links_del(env, obj, handle);
1981 }
1982
1983 /* caller should take a lock before calling */
1984 int mdd_finish_unlink(const struct lu_env *env,
1985                       struct mdd_object *obj, struct md_attr *ma,
1986                       struct mdd_object *pobj,
1987                       const struct lu_name *lname,
1988                       struct thandle *th)
1989 {
1990         int rc = 0;
1991         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1992
1993         ENTRY;
1994
1995         LASSERT(mdd_write_locked(env, obj) != 0);
1996
1997         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1998                 /* add new orphan, object will be deleted during mdd_close() */
1999                 obj->mod_flags |= DEAD_OBJ;
2000                 if (obj->mod_count) {
2001                         rc = mdd_orphan_insert(env, obj, th);
2002                         if (rc == 0)
2003                                 CDEBUG(D_HA,
2004                                        "Object "DFID" is inserted into orphan list, open count = %d\n",
2005                                        PFID(mdd_object_fid(obj)),
2006                                        obj->mod_count);
2007                         else
2008                                 CERROR("Object "DFID" fail to be an orphan, open count = %d, maybe cause failed open replay\n",
2009                                         PFID(mdd_object_fid(obj)),
2010                                         obj->mod_count);
2011
2012                         /* mark object as an orphan here, not before
2013                          * mdd_orphan_insert() as racing mdd_la_get() may
2014                          * propagate ORPHAN_OBJ causing the asserition
2015                          */
2016                         rc = mdd_mark_orphan_object(env, obj, th, false);
2017                 } else {
2018                         rc = mdo_destroy(env, obj, th);
2019                 }
2020         } else if (!is_dir) {
2021                 /* old files may not have link ea; ignore errors */
2022                 mdd_links_del(env, obj, mdd_object_fid(pobj), lname, th);
2023         }
2024
2025         RETURN(rc);
2026 }
2027
2028 /*
2029  * pobj maybe NULL
2030  * has mdd_write_lock on cobj already, but not on pobj yet
2031  */
2032 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
2033                             const struct lu_attr *pattr,
2034                             struct mdd_object *cobj,
2035                             const struct lu_attr *cattr)
2036 {
2037         int rc;
2038
2039         ENTRY;
2040
2041         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
2042
2043         RETURN(rc);
2044 }
2045
2046 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
2047                               struct mdd_object *p, struct mdd_object *c,
2048                               const struct lu_name *name, struct md_attr *ma,
2049                               struct thandle *handle, int no_name, int is_dir)
2050 {
2051         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2052         int rc;
2053
2054         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
2055                 if (likely(no_name == 0)) {
2056                         rc = mdo_declare_index_delete(env, p, name->ln_name,
2057                                                       handle);
2058                         if (rc != 0)
2059                                 return rc;
2060                 }
2061
2062                 if (is_dir != 0) {
2063                         rc = mdo_declare_ref_del(env, p, handle);
2064                         if (rc != 0)
2065                                 return rc;
2066                 }
2067         }
2068
2069         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2070         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2071         la->la_valid = LA_CTIME | LA_MTIME;
2072         rc = mdo_declare_attr_set(env, p, la, handle);
2073         if (rc)
2074                 return rc;
2075
2076         if (c != NULL) {
2077                 rc = mdo_declare_ref_del(env, c, handle);
2078                 if (rc)
2079                         return rc;
2080
2081                 rc = mdo_declare_ref_del(env, c, handle);
2082                 if (rc)
2083                         return rc;
2084
2085                 la->la_valid = LA_CTIME;
2086                 rc = mdo_declare_attr_set(env, c, la, handle);
2087                 if (rc)
2088                         return rc;
2089
2090                 rc = mdd_declare_finish_unlink(env, c, handle);
2091                 if (rc)
2092                         return rc;
2093
2094                 /* FIXME: need changelog for remove entry */
2095                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
2096                                                  NULL, handle);
2097         }
2098
2099         return rc;
2100 }
2101
2102 /*
2103  * test if a file has an HSM archive
2104  * if HSM attributes are not found in ma update them from
2105  * HSM xattr
2106  */
2107 static bool mdd_hsm_archive_exists(const struct lu_env *env,
2108                                    struct mdd_object *obj,
2109                                    struct md_attr *ma)
2110 {
2111         ENTRY;
2112
2113         if (!(ma->ma_valid & MA_HSM)) {
2114                 /* no HSM MD provided, read xattr */
2115                 struct lu_buf *hsm_buf;
2116                 const size_t buflen = sizeof(struct hsm_attrs);
2117                 int rc;
2118
2119                 hsm_buf = mdd_buf_get(env, NULL, 0);
2120                 lu_buf_alloc(hsm_buf, buflen);
2121                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
2122                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
2123                 lu_buf_free(hsm_buf);
2124                 if (rc < 0)
2125                         RETURN(false);
2126
2127                 ma->ma_valid |= MA_HSM;
2128         }
2129         if (ma->ma_hsm.mh_flags & HS_EXISTS)
2130                 RETURN(true);
2131         RETURN(false);
2132 }
2133
2134 /**
2135  * Delete name entry and the object.
2136  * Note: no_name == 1 means it only destory the object, i.e. name_entry
2137  * does not exist for this object, and it could only happen during resending
2138  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
2139  * is also needed in this case(needed by changelog), so we have to add another
2140  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
2141  * the ENOENT failure should be able to be fixed by redo mechanism.
2142  */
2143 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
2144                       struct md_object *cobj, const struct lu_name *lname,
2145                       struct md_attr *ma, int no_name)
2146 {
2147         char *name = (char *)lname->ln_name;
2148         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2149         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2150         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2151         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2152         struct mdd_object *mdd_cobj = NULL;
2153         struct mdd_device *mdd = mdo2mdd(pobj);
2154         struct thandle    *handle;
2155         int rc, is_dir = 0, cl_flags = 0;
2156
2157         ENTRY;
2158
2159         /* let shutdown to start */
2160         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
2161
2162         /* cobj == NULL means only delete name entry */
2163         if (likely(cobj != NULL)) {
2164                 mdd_cobj = md2mdd_obj(cobj);
2165                 if (mdd_object_exists(mdd_cobj) == 0)
2166                         RETURN(-ENOENT);
2167         }
2168
2169         rc = mdd_la_get(env, mdd_pobj, pattr);
2170         if (rc)
2171                 RETURN(rc);
2172
2173         if (likely(mdd_cobj != NULL)) {
2174                 /* fetch cattr */
2175                 rc = mdd_la_get(env, mdd_cobj, cattr);
2176                 if (rc)
2177                         RETURN(rc);
2178
2179                 is_dir = S_ISDIR(cattr->la_mode);
2180                 /* search for an existing archive. We should check ahead as the
2181                  * object can be destroyed in this transaction
2182                  */
2183                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
2184                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
2185         }
2186
2187         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
2188         if (rc)
2189                 RETURN(rc);
2190
2191         handle = mdd_trans_create(env, mdd);
2192         if (IS_ERR(handle))
2193                 RETURN(PTR_ERR(handle));
2194
2195         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
2196                                 lname, ma, handle, no_name, is_dir);
2197         if (rc)
2198                 GOTO(stop, rc);
2199
2200         rc = mdd_trans_start(env, mdd, handle);
2201         if (rc)
2202                 GOTO(stop, rc);
2203
2204         if (likely(mdd_cobj != NULL))
2205                 mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
2206
2207         if (lname->ln_name[lname->ln_namelen] != '\0') {
2208                 /* lname->ln_name is not necessarily NUL terminated */
2209                 name = kmalloc(lname->ln_namelen + 1, GFP_NOFS);
2210                 if (!name)
2211                         GOTO(cleanup, rc = -ENOMEM);
2212
2213                 memcpy(name, lname->ln_name, lname->ln_namelen);
2214                 name[lname->ln_namelen] = '\0';
2215         }
2216
2217         if (likely(no_name == 0) && !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
2218                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
2219                 if (rc)
2220                         GOTO(cleanup, rc);
2221         }
2222
2223         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
2224             CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
2225                 GOTO(cleanup, rc = 0);
2226
2227         if (likely(mdd_cobj != NULL)) {
2228                 rc = mdo_ref_del(env, mdd_cobj, handle);
2229                 if (rc != 0) {
2230                         __mdd_index_insert_only(env, mdd_pobj,
2231                                                 mdd_object_fid(mdd_cobj),
2232                                                 mdd_object_type(mdd_cobj),
2233                                                 name, handle);
2234                         GOTO(cleanup, rc);
2235                 }
2236
2237                 if (is_dir)
2238                         /* unlink dot */
2239                         mdo_ref_del(env, mdd_cobj, handle);
2240
2241                 /* fetch updated nlink */
2242                 rc = mdd_la_get(env, mdd_cobj, cattr);
2243                 if (rc)
2244                         GOTO(cleanup, rc);
2245         }
2246
2247         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2248         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2249
2250         la->la_valid = LA_CTIME | LA_MTIME;
2251         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2252         if (rc)
2253                 GOTO(cleanup, rc);
2254
2255         /* Enough for only unlink the entry */
2256         if (unlikely(mdd_cobj == NULL))
2257                 GOTO(cleanup, rc);
2258
2259         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
2260                 /* update ctime of an unlinked file only if it is still opened
2261                  * or a link still exists
2262                  */
2263                 la->la_valid = LA_CTIME;
2264                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
2265                 if (rc)
2266                         GOTO(cleanup, rc);
2267         }
2268
2269         /* XXX: this transfer to ma will be removed with LOD/OSP */
2270         ma->ma_attr = *cattr;
2271         ma->ma_valid |= MA_INODE;
2272         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
2273         if (rc != 0)
2274                 GOTO(cleanup, rc);
2275
2276         /* fetch updated nlink */
2277         rc = mdd_la_get(env, mdd_cobj, cattr);
2278         /* if object is removed then we can't get its attrs, use last get */
2279         if (rc == -ENOENT) {
2280                 cattr->la_nlink = 0;
2281                 rc = 0;
2282         }
2283
2284         if (cattr->la_nlink == 0) {
2285                 ma->ma_attr = *cattr;
2286                 ma->ma_valid |= MA_INODE;
2287         }
2288
2289         EXIT;
2290 cleanup:
2291         if (name != lname->ln_name)
2292                 kfree(name);
2293
2294         if (likely(mdd_cobj != NULL))
2295                 mdd_write_unlock(env, mdd_cobj);
2296
2297         if (rc == 0) {
2298                 if (mdd_is_dead_obj(mdd_cobj))
2299                         cl_flags |= CLF_UNLINK_LAST;
2300                 else
2301                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
2302
2303                 rc = mdd_changelog_ns_store(env, mdd,
2304                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
2305                         mdd_cobj, mdd_pobj, pattr, NULL,
2306                         NULL, NULL, lname, NULL, handle);
2307         }
2308
2309 stop:
2310         rc = mdd_trans_stop(env, mdd, rc, handle);
2311
2312         return rc;
2313 }
2314
2315 /*
2316  * The permission has been checked when obj created, no need check again.
2317  */
2318 static int mdd_cd_sanity_check(const struct lu_env *env,
2319                                struct mdd_object *obj)
2320 {
2321         ENTRY;
2322
2323         /* EEXIST check */
2324         if (!obj || mdd_is_dead_obj(obj))
2325                 RETURN(-ENOENT);
2326
2327         RETURN(0);
2328 }
2329
2330 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
2331                            struct md_object *cobj,
2332                            const struct md_op_spec *spec, struct md_attr *ma)
2333 {
2334         struct mdd_device *mdd = mdo2mdd(cobj);
2335         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2336         struct mdd_object *son = md2mdd_obj(cobj);
2337         struct thandle *handle;
2338         const struct lu_buf *buf;
2339         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
2340         struct dt_allocation_hint *hint = &mdd_env_info(env)->mdi_hint;
2341         int rc;
2342
2343         ENTRY;
2344
2345         rc = mdd_cd_sanity_check(env, son);
2346         if (rc)
2347                 RETURN(rc);
2348
2349         if (!md_should_create(spec->sp_cr_flags))
2350                 RETURN(0);
2351
2352         /*
2353          * there are following use cases for this function:
2354          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
2355          *    striping can be specified or not
2356          * 2) CMD?
2357          */
2358         rc = mdd_la_get(env, son, attr);
2359         if (rc)
2360                 RETURN(rc);
2361
2362         /* calling ->ah_make_hint(), used to transfer information from parent */
2363         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2364
2365         handle = mdd_trans_create(env, mdd);
2366         if (IS_ERR(handle))
2367                 GOTO(out_free, rc = PTR_ERR(handle));
2368
2369         /*
2370          * XXX: Setting the lov ea is not locked but setting the attr is locked?
2371          * Should this be fixed?
2372          */
2373         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#lo, no_create %u\n",
2374                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
2375                spec->sp_cr_flags, spec->no_create);
2376
2377         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
2378                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2379                                         spec->u.sp_ea.eadatalen);
2380         } else {
2381                 buf = &LU_BUF_NULL;
2382         }
2383
2384         rc = dt_declare_xattr_set(env, mdd_object_child(son), NULL, buf,
2385                                   XATTR_NAME_LOV, 0, handle);
2386         if (rc)
2387                 GOTO(stop, rc);
2388
2389         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
2390                                          handle);
2391         if (rc)
2392                 GOTO(stop, rc);
2393
2394         rc = mdd_trans_start(env, mdd, handle);
2395         if (rc)
2396                 GOTO(stop, rc);
2397
2398         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
2399                           0, handle);
2400
2401         if (rc)
2402                 GOTO(stop, rc);
2403
2404         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle,
2405                                       NULL);
2406
2407 stop:
2408         rc = mdd_trans_stop(env, mdd, rc, handle);
2409
2410 out_free:
2411         RETURN(rc);
2412 }
2413
2414 static int mdd_declare_object_initialize(const struct lu_env *env,
2415                                          struct mdd_object *parent,
2416                                          struct mdd_object *child,
2417                                          const struct lu_attr *attr,
2418                                          struct thandle *handle)
2419 {
2420         int rc;
2421
2422         ENTRY;
2423
2424         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
2425         if (!S_ISDIR(attr->la_mode))
2426                 RETURN(0);
2427
2428         rc = mdo_declare_index_insert(env, child, mdd_object_fid(child),
2429                                       S_IFDIR, dot, handle);
2430         if (rc != 0)
2431                 RETURN(rc);
2432
2433         rc = mdo_declare_ref_add(env, child, handle);
2434         if (rc != 0)
2435                 RETURN(rc);
2436
2437         rc = mdo_declare_index_insert(env, child, mdd_object_fid(parent),
2438                                       S_IFDIR, dotdot, handle);
2439
2440         RETURN(rc);
2441 }
2442
2443 static int mdd_object_initialize(const struct lu_env *env,
2444                                  const struct lu_fid *pfid,
2445                                  struct mdd_object *child,
2446                                  struct lu_attr *attr,
2447                                  struct thandle *handle)
2448 {
2449         int rc = 0;
2450
2451         ENTRY;
2452
2453         if (S_ISDIR(attr->la_mode)) {
2454                 /* Add "." and ".." for newly created dir */
2455                 mdo_ref_add(env, child, handle);
2456                 rc = __mdd_index_insert_only(env, child, mdd_object_fid(child),
2457                                              S_IFDIR, dot, handle);
2458                 if (rc == 0)
2459                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
2460                                                      dotdot, handle);
2461                 if (rc != 0)
2462                         mdo_ref_del(env, child, handle);
2463         }
2464
2465         RETURN(rc);
2466 }
2467
2468 /**
2469  * This function checks whether it can create a file/dir under the
2470  * directory(@pobj). The directory(@pobj) is not being locked by
2471  * mdd lock.
2472  *
2473  * \param[in] env       execution environment
2474  * \param[in] pobj      the directory to create files
2475  * \param[in] pattr     the attributes of the directory
2476  * \param[in] lname     the name of the created file/dir
2477  * \param[in] cattr     the attributes of the file/dir
2478  * \param[in] spec      create specification
2479  *
2480  * \retval              = 0 it is allowed to create file/dir under
2481  *                      the directory
2482  * \retval              negative error not allowed to create file/dir
2483  *                      under the directory
2484  */
2485 static int mdd_create_sanity_check(const struct lu_env *env,
2486                                    struct md_object *pobj,
2487                                    const struct lu_attr *pattr,
2488                                    const struct lu_name *lname,
2489                                    struct lu_attr *cattr,
2490                                    struct md_op_spec *spec)
2491 {
2492         struct mdd_thread_info *info = mdd_env_info(env);
2493         struct lu_fid *fid = &info->mdi_fid;
2494         struct mdd_object *obj = md2mdd_obj(pobj);
2495         struct mdd_device *m = mdo2mdd(pobj);
2496         bool check_perm = true;
2497         int rc;
2498
2499         ENTRY;
2500
2501         /* EEXIST check */
2502         if (mdd_is_dead_obj(obj))
2503                 RETURN(-ENOENT);
2504
2505         /*
2506          * In some cases this lookup is not needed - we know before if name
2507          * exists or not because MDT performs lookup for it.
2508          * name length check is done in lookup.
2509          */
2510         if (spec->sp_cr_lookup) {
2511                 /*
2512                  * Check if the name already exist, though it will be checked in
2513                  * _index_insert also, for avoiding rolling back if exists
2514                  * _index_insert.
2515                  */
2516                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2517                                   MAY_WRITE | MAY_EXEC);
2518                 if (rc != -ENOENT)
2519                         RETURN(rc ? : -EEXIST);
2520
2521                 /* Permission is already being checked in mdd_lookup */
2522                 check_perm = false;
2523         }
2524
2525         if (S_ISDIR(cattr->la_mode) &&
2526             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2527             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2528                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2529
2530                 if (!lmv_user_magic_supported(le32_to_cpu(lum->lum_magic)) &&
2531                     !(spec->sp_replay &&
2532                       lum->lum_magic == cpu_to_le32(LMV_MAGIC_V1))) {
2533                         rc = -EINVAL;
2534                         CERROR("%s: invalid lmv_user_md: magic=%x hash=%x stripe_offset=%d stripe_count=%u: rc = %d\n",
2535                                mdd2obd_dev(m)->obd_name,
2536                                le32_to_cpu(lum->lum_magic),
2537                                le32_to_cpu(lum->lum_hash_type),
2538                                (int)le32_to_cpu(lum->lum_stripe_offset),
2539                                le32_to_cpu(lum->lum_stripe_count), rc);
2540                         RETURN(rc);
2541                 }
2542         }
2543
2544         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2545         if (rc != 0)
2546                 RETURN(rc);
2547
2548         /* sgid check */
2549         if (pattr->la_mode & S_ISGID) {
2550                 struct lu_ucred *uc = lu_ucred(env);
2551
2552                 cattr->la_gid = pattr->la_gid;
2553
2554                 /* Directories are special, and always inherit S_ISGID */
2555                 if (S_ISDIR(cattr->la_mode)) {
2556                         cattr->la_mode |= S_ISGID;
2557                         cattr->la_valid |= LA_MODE;
2558                 } else if ((cattr->la_mode & (S_ISGID | 0010))
2559                                 == (S_ISGID | 0010) &&
2560                            !lustre_in_group_p(uc,
2561                                               (cattr->la_valid & LA_GID) ?
2562                                               cattr->la_gid : pattr->la_gid) &&
2563                            !cap_raised(uc->uc_cap, CAP_FSETID)) {
2564                         cattr->la_mode &= ~S_ISGID;
2565                         cattr->la_valid |= LA_MODE;
2566                 }
2567         }
2568
2569         /* Inherit project ID from parent directory */
2570         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2571                 cattr->la_projid = pattr->la_projid;
2572                 if (S_ISDIR(cattr->la_mode)) {
2573                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2574                         cattr->la_valid |= LA_FLAGS;
2575                 }
2576                 cattr->la_valid |= LA_PROJID;
2577         }
2578
2579         rc = mdd_name_check(env, m, lname);
2580         if (rc < 0)
2581                 RETURN(rc);
2582
2583         switch (cattr->la_mode & S_IFMT) {
2584         case S_IFLNK: {
2585                 unsigned int symlen = spec->u.sp_symname.ln_namelen + 1;
2586
2587                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2588                         RETURN(-ENAMETOOLONG);
2589                 else
2590                         RETURN(0);
2591         }
2592         case S_IFDIR:
2593         case S_IFREG:
2594         case S_IFCHR:
2595         case S_IFBLK:
2596         case S_IFIFO:
2597         case S_IFSOCK:
2598                 rc = 0;
2599                 break;
2600         default:
2601                 rc = -EINVAL;
2602                 break;
2603         }
2604         RETURN(rc);
2605 }
2606
2607 static int mdd_declare_create_object(const struct lu_env *env,
2608                                      struct mdd_device *mdd,
2609                                      struct mdd_object *p, struct mdd_object *c,
2610                                      struct lu_attr *attr,
2611                                      struct thandle *handle,
2612                                      const struct md_op_spec *spec,
2613                                      struct lu_buf *def_acl_buf,
2614                                      struct lu_buf *acl_buf,
2615                                      struct lu_buf *hsm_buf,
2616                                      struct dt_allocation_hint *hint)
2617 {
2618         const struct lu_buf *buf;
2619         int rc;
2620
2621 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2622         /* ldiskfs OSD needs this information for credit allocation */
2623         if (def_acl_buf)
2624                 hint->dah_acl_len = def_acl_buf->lb_len;
2625 #endif
2626         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2627                                                 hint);
2628         if (rc)
2629                 GOTO(out, rc);
2630
2631 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2632         if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2633                 /* if dir, then can inherit default ACl */
2634                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2635                                            XATTR_NAME_ACL_DEFAULT,
2636                                            0, handle);
2637                 if (rc)
2638                         GOTO(out, rc);
2639         }
2640
2641         if (acl_buf && acl_buf->lb_len > 0) {
2642                 rc = mdo_declare_attr_set(env, c, attr, handle);
2643                 if (rc)
2644                         GOTO(out, rc);
2645
2646                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2647                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2648                 if (rc)
2649                         GOTO(out, rc);
2650         }
2651 #endif
2652         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2653         if (rc)
2654                 GOTO(out, rc);
2655
2656         /* replay case, create LOV EA from client data */
2657         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2658             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2659                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2660                                         spec->u.sp_ea.eadatalen);
2661                 rc = mdo_declare_xattr_set(env, c, buf,
2662                                            S_ISDIR(attr->la_mode) ?
2663                                                 XATTR_NAME_LMV : XATTR_NAME_LOV,
2664                                            LU_XATTR_CREATE, handle);
2665                 if (rc)
2666                         GOTO(out, rc);
2667
2668                 if (spec->sp_cr_flags & MDS_OPEN_PCC) {
2669                         rc = mdo_declare_xattr_set(env, c, hsm_buf,
2670                                                    XATTR_NAME_HSM,
2671                                                    0, handle);
2672                         if (rc)
2673                                 GOTO(out, rc);
2674                 }
2675         }
2676
2677         if (S_ISLNK(attr->la_mode)) {
2678                 const char *target_name = spec->u.sp_symname.ln_name;
2679                 int sym_len = spec->u.sp_symname.ln_namelen;
2680                 const struct lu_buf *buf;
2681
2682                 buf = mdd_buf_get_const(env, target_name, sym_len);
2683                 rc = dt_declare_record_write(env, mdd_object_child(c),
2684                                              buf, 0, handle);
2685                 if (rc)
2686                         GOTO(out, rc);
2687         }
2688
2689         if (spec->sp_cr_file_secctx_name != NULL) {
2690                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2691                                         spec->sp_cr_file_secctx_size);
2692                 rc = mdo_declare_xattr_set(env, c, buf,
2693                                            spec->sp_cr_file_secctx_name, 0,
2694                                            handle);
2695                 if (rc < 0)
2696                         GOTO(out, rc);
2697         }
2698
2699         if (spec->sp_cr_file_encctx != NULL) {
2700                 buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
2701                                         spec->sp_cr_file_encctx_size);
2702                 rc = mdo_declare_xattr_set(env, c, buf,
2703                                            LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
2704                                            handle);
2705                 if (rc < 0)
2706                         GOTO(out, rc);
2707         }
2708 out:
2709         return rc;
2710 }
2711
2712 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2713                               struct mdd_object *p, struct mdd_object *c,
2714                               const struct lu_name *name,
2715                               struct lu_attr *attr,
2716                               struct thandle *handle,
2717                               const struct md_op_spec *spec,
2718                               struct linkea_data *ldata,
2719                               struct lu_buf *def_acl_buf,
2720                               struct lu_buf *acl_buf,
2721                               struct lu_buf *hsm_buf,
2722                               struct dt_allocation_hint *hint)
2723 {
2724         int rc;
2725
2726         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2727                                        def_acl_buf, acl_buf, hsm_buf, hint);
2728         if (rc)
2729                 GOTO(out, rc);
2730
2731         if (S_ISDIR(attr->la_mode)) {
2732                 rc = mdo_declare_ref_add(env, p, handle);
2733                 if (rc)
2734                         GOTO(out, rc);
2735         }
2736
2737         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2738                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2739                 if (rc)
2740                         GOTO(out, rc);
2741         } else {
2742                 struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
2743                 enum changelog_rec_type type;
2744
2745                 rc = mdo_declare_index_insert(env, p, mdd_object_fid(c),
2746                                               attr->la_mode, name->ln_name,
2747                                               handle);
2748                 if (rc != 0)
2749                         return rc;
2750
2751                 rc = mdd_declare_links_add(env, c, handle, ldata);
2752                 if (rc)
2753                         return rc;
2754
2755                 *la = *attr;
2756                 la->la_valid = LA_CTIME | LA_MTIME;
2757                 rc = mdo_declare_attr_set(env, p, la, handle);
2758                 if (rc)
2759                         return rc;
2760
2761                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2762                        S_ISREG(attr->la_mode) ? CL_CREATE :
2763                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2764
2765                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2766                                                  handle);
2767                 if (rc)
2768                         return rc;
2769         }
2770 out:
2771         return rc;
2772 }
2773
2774 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2775                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2776                         struct lu_buf *acl_buf)
2777 {
2778         int rc;
2779
2780         ENTRY;
2781
2782         if (S_ISLNK(la->la_mode)) {
2783                 acl_buf->lb_len = 0;
2784                 def_acl_buf->lb_len = 0;
2785                 RETURN(0);
2786         }
2787
2788         mdd_read_lock(env, pobj, DT_TGT_PARENT);
2789         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2790                            XATTR_NAME_ACL_DEFAULT);
2791         mdd_read_unlock(env, pobj);
2792         if (rc > 0) {
2793                 /* ACL buffer size is not enough, need realloc */
2794                 if (rc > acl_buf->lb_len)
2795                         RETURN(-ERANGE);
2796
2797                 /* If there are default ACL, fix mode/ACL by default ACL */
2798                 def_acl_buf->lb_len = rc;
2799                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2800                 acl_buf->lb_len = rc;
2801                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2802                 if (rc < 0)
2803                         RETURN(rc);
2804         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2805                 /* If there are no default ACL, fix mode by mask */
2806                 struct lu_ucred *uc = lu_ucred(env);
2807
2808                 /* The create triggered by MDT internal events, such as
2809                  * LFSCK reset, will not contain valid "uc".
2810                  */
2811                 if (unlikely(uc != NULL))
2812                         la->la_mode &= ~uc->uc_umask;
2813                 rc = 0;
2814                 acl_buf->lb_len = 0;
2815                 def_acl_buf->lb_len = 0;
2816         }
2817
2818         RETURN(rc);
2819 }
2820
2821 /**
2822  * Create a metadata object and initialize it, set acl, xattr.
2823  **/
2824 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2825                              struct mdd_object *son, struct lu_attr *attr,
2826                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2827                              struct lu_buf *def_acl_buf,
2828                              struct lu_buf *hsm_buf,
2829                              struct dt_allocation_hint *hint,
2830                              struct thandle *handle, bool initial_create)
2831 {
2832         const struct lu_fid *son_fid = mdd_object_fid(son);
2833         const struct lu_ucred *uc = lu_ucred(env);
2834         const char *jobid = uc->uc_jobid;
2835         const struct lu_buf *buf;
2836         size_t jobid_len;
2837         int rc;
2838
2839         mdd_write_lock(env, son, DT_TGT_CHILD);
2840         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2841                                         hint);
2842         if (rc)
2843                 GOTO(unlock, rc);
2844
2845         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2846          * created in declare phase, they also needs to be added to master
2847          * object as sub-directory entry. So it has to initialize the master
2848          * object, then set dir striped EA.(in mdo_xattr_set)
2849          */
2850         rc = mdd_object_initialize(env, mdd_object_fid(pobj), son, attr,
2851                                    handle);
2852         if (rc != 0)
2853                 GOTO(err_destroy, rc);
2854
2855         /*
2856          * in case of replay we just set LOVEA provided by the client
2857          * XXX: I think it would be interesting to try "old" way where
2858          *      MDT calls this xattr_set(LOV) in a different transaction.
2859          *      probably this way we code can be made better.
2860          */
2861
2862         /* During creation, there are only a few cases we need do xattr_set to
2863          * create stripes.
2864          * 1. regular file: see comments above.
2865          * 2. dir: inherit default striping or pool settings from parent.
2866          * 3. create striped directory with provided stripeEA.
2867          * 4. create striped directory because inherit default layout from the
2868          * parent.
2869          */
2870         if (spec->no_create ||
2871             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2872             S_ISDIR(attr->la_mode)) {
2873                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2874                                         spec->u.sp_ea.eadatalen);
2875                 rc = mdo_xattr_set(env, son, buf,
2876                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2877                                                             XATTR_NAME_LOV,
2878                                    LU_XATTR_CREATE, handle);
2879                 if (rc != 0)
2880                         GOTO(err_destroy, rc);
2881         }
2882
2883         if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
2884                 struct md_hsm mh;
2885
2886                 memset(&mh, 0, sizeof(mh));
2887                 mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
2888                 mh.mh_arch_id = spec->sp_archive_id;
2889                 lustre_hsm2buf(hsm_buf->lb_buf, &mh);
2890                 rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
2891                                    0, handle);
2892                 if (rc != 0)
2893                         GOTO(err_destroy, rc);
2894         }
2895
2896 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
2897         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2898             S_ISDIR(attr->la_mode)) {
2899                 /* set default acl */
2900                 rc = mdo_xattr_set(env, son, def_acl_buf,
2901                                    XATTR_NAME_ACL_DEFAULT, 0,
2902                                    handle);
2903                 if (rc)
2904                         GOTO(err_destroy, rc);
2905         }
2906         /* set its own acl */
2907         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2908                 rc = mdo_xattr_set(env, son, acl_buf,
2909                                    XATTR_NAME_ACL_ACCESS,
2910                                    0, handle);
2911                 if (rc)
2912                         GOTO(err_destroy, rc);
2913         }
2914 #endif
2915
2916         if (S_ISLNK(attr->la_mode)) {
2917                 struct dt_object *dt = mdd_object_child(son);
2918                 const char *target_name = spec->u.sp_symname.ln_name;
2919                 int sym_len = spec->u.sp_symname.ln_namelen;
2920                 loff_t pos = 0;
2921
2922                 buf = mdd_buf_get_const(env, target_name, sym_len);
2923                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
2924                 if (rc == sym_len)
2925                         rc = 0;
2926                 else
2927                         GOTO(err_initlized, rc = -EFAULT);
2928         }
2929
2930         if (initial_create && spec->sp_cr_file_secctx_name != NULL) {
2931                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2932                                         spec->sp_cr_file_secctx_size);
2933                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2934                                    0, handle);
2935                 if (rc < 0)
2936                         GOTO(err_initlized, rc);
2937         }
2938
2939         if (spec->sp_cr_file_encctx != NULL) {
2940                 buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
2941                                         spec->sp_cr_file_encctx_size);
2942                 rc = mdo_xattr_set(env, son, buf,
2943                                    LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
2944                                    handle);
2945                 if (rc < 0)
2946                         GOTO(err_initlized, rc);
2947         }
2948
2949         /* remove the quotes if present before storing it in the xattr. */
2950         if (initial_create &&
2951             spec->sp_cr_job_xattr[0] != '\0' &&
2952             jobid[0] != '\0' &&
2953             (S_ISREG(attr->la_mode) ||
2954              S_ISDIR(attr->la_mode))) {
2955                 jobid_len = strnlen(jobid, LUSTRE_JOBID_SIZE);
2956                 if (jobid_len > 1 &&
2957                     jobid[0] == '"' &&
2958                     jobid[jobid_len - 1] == '"'){
2959                         jobid++;
2960                         jobid_len -= 2;
2961                 }
2962                 if (jobid_len > 0) {
2963                         buf = mdd_buf_get_const(env, jobid, jobid_len);
2964                         rc = mdo_xattr_set(env, son, buf,
2965                                            spec->sp_cr_job_xattr, 0,
2966                                            handle);
2967                         /* this xattr is nonessential, so ignore errors. */
2968                         if (rc != 0) {
2969                                 CDEBUG(D_INODE,
2970                                        DFID
2971                                        " failed to set xattr '%s': rc = %d\n",
2972                                        PFID(son_fid),
2973                                        spec->sp_cr_job_xattr, rc);
2974                                 rc = 0;
2975                         }
2976                 }
2977         }
2978
2979 err_initlized:
2980         if (unlikely(rc != 0)) {
2981                 int rc2;
2982
2983                 if (S_ISDIR(attr->la_mode)) {
2984                         /* Drop the reference, no need to delete "."/"..",
2985                          * because the object to be destroyed directly.
2986                          */
2987                         rc2 = mdo_ref_del(env, son, handle);
2988                         if (rc2 != 0)
2989                                 GOTO(unlock, rc);
2990                 }
2991                 rc2 = mdo_ref_del(env, son, handle);
2992                 if (rc2 != 0)
2993                         GOTO(unlock, rc);
2994 err_destroy:
2995                 mdo_destroy(env, son, handle);
2996         }
2997 unlock:
2998         mdd_write_unlock(env, son);
2999         RETURN(rc);
3000 }
3001
3002 static int mdd_index_delete(const struct lu_env *env,
3003                             struct mdd_object *mdd_pobj,
3004                             struct lu_attr *cattr,
3005                             const struct lu_name *lname)
3006 {
3007         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
3008         struct thandle *handle;
3009         int rc;
3010
3011         ENTRY;
3012
3013         handle = mdd_trans_create(env, mdd);
3014         if (IS_ERR(handle))
3015                 RETURN(PTR_ERR(handle));
3016
3017         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
3018                                       handle);
3019         if (rc != 0)
3020                 GOTO(stop, rc);
3021
3022         if (S_ISDIR(cattr->la_mode)) {
3023                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3024                 if (rc != 0)
3025                         GOTO(stop, rc);
3026         }
3027
3028         /* Since this will only be used in the error handler path,
3029          * Let's set the thandle to be local and not mess the transno
3030          */
3031         handle->th_local = 1;
3032         rc = mdd_trans_start(env, mdd, handle);
3033         if (rc)
3034                 GOTO(stop, rc);
3035
3036         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
3037                                 S_ISDIR(cattr->la_mode), handle);
3038         if (rc)
3039                 GOTO(stop, rc);
3040 stop:
3041         rc = mdd_trans_stop(env, mdd, rc, handle);
3042
3043         RETURN(rc);
3044 }
3045
3046 /**
3047  * Create object and insert it into namespace.
3048  *
3049  * Two operations have to be performed:
3050  *
3051  *  - an allocation of a new object (->do_create()), and
3052  *  - an insertion into a parent index (->dio_insert()).
3053  *
3054  * Due to locking, operation order is not important, when both are
3055  * successful, *but* error handling cases are quite different:
3056  *
3057  *  - if insertion is done first, and following object creation fails,
3058  *  insertion has to be rolled back, but this operation might fail
3059  *  also leaving us with dangling index entry.
3060  *
3061  *  - if creation is done first, is has to be undone if insertion fails,
3062  *  leaving us with leaked space, which is not good but not fatal.
3063  *
3064  * It seems that creation-first is simplest solution, but it is sub-optimal
3065  * in the frequent
3066  *
3067  * $ mkdir foo
3068  * $ mkdir foo
3069  *
3070  * case, because second mkdir is bound to create object, only to
3071  * destroy it immediately.
3072  *
3073  * To avoid this follow local file systems that do double lookup:
3074  *
3075  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
3076  * 1. create            (mdd_create_object_internal())
3077  * 2. insert            (__mdd_index_insert(), lookup again)
3078  *
3079  * \param[in] pobj      parent object
3080  * \param[in] lname     name of child being created
3081  * \param[in,out] child child object being created
3082  * \param[in] spec      additional create parameters
3083  * \param[in] ma        attributes for new child object
3084  *
3085  * \retval              0 on success
3086  * \retval              negative errno on failure
3087  */
3088 int mdd_create(const struct lu_env *env, struct md_object *pobj,
3089                       const struct lu_name *lname, struct md_object *child,
3090                       struct md_op_spec *spec, struct md_attr *ma)
3091 {
3092         struct mdd_thread_info *info = mdd_env_info(env);
3093         struct lu_attr *la = &info->mdi_la_for_fix;
3094         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
3095         struct mdd_object *son = md2mdd_obj(child);
3096         struct mdd_device *mdd = mdo2mdd(pobj);
3097         struct lu_attr *attr = &ma->ma_attr;
3098         struct thandle *handle;
3099         struct lu_attr *pattr = &info->mdi_pattr;
3100         struct lu_buf acl_buf;
3101         struct lu_buf def_acl_buf;
3102         struct lu_buf hsm_buf;
3103         struct linkea_data *ldata = &info->mdi_link_data;
3104         const char *name = lname->ln_name;
3105         struct dt_allocation_hint *hint = &mdd_env_info(env)->mdi_hint;
3106         int acl_size = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
3107         bool name_inserted = false;
3108         int rc, rc2;
3109
3110         ENTRY;
3111
3112         rc = mdd_la_get(env, mdd_pobj, pattr);
3113         if (rc != 0)
3114                 RETURN(rc);
3115
3116         /* Sanity checks before big job. */
3117         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
3118         if (unlikely(rc == -EEXIST && S_ISDIR(attr->la_mode) &&
3119                      spec->sp_replay && mdd_object_remote(mdd_pobj)))
3120                 /* if it's replay by client request, and name is found in
3121                  * parent directory on remote MDT, it means mkdir was partially
3122                  * executed: name was successfully added, but target not.
3123                  */
3124                 name_inserted = true;
3125         else if (rc)
3126                 RETURN(rc);
3127
3128         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
3129                 GOTO(out_free, rc = -EINPROGRESS);
3130
3131         handle = mdd_trans_create(env, mdd);
3132         if (IS_ERR(handle))
3133                 GOTO(out_free, rc = PTR_ERR(handle));
3134
3135 use_bigger_buffer:
3136         acl_buf = *lu_buf_check_and_alloc(&info->mdi_xattr_buf, acl_size);
3137         if (!acl_buf.lb_buf)
3138                 GOTO(out_stop, rc = -ENOMEM);
3139
3140         def_acl_buf = *lu_buf_check_and_alloc(&info->mdi_big_buf, acl_size);
3141         if (!def_acl_buf.lb_buf)
3142                 GOTO(out_stop, rc = -ENOMEM);
3143
3144         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
3145         if (unlikely(rc == -ERANGE &&
3146                      acl_size == LUSTRE_POSIX_ACL_MAX_SIZE_OLD)) {
3147                 /* use maximum-sized xattr buffer for too-big default ACL */
3148                 acl_size = min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
3149                                  XATTR_SIZE_MAX);
3150                 goto use_bigger_buffer;
3151         }
3152         if (rc < 0)
3153                 GOTO(out_stop, rc);
3154
3155         /* adjust stripe count to 0 for 'lfs mkdir -c 1 ...' to avoid creating
3156          * 1-stripe directory, MDS_OPEN_DEFAULT_LMV means ea is default LMV.
3157          */
3158         if (unlikely(S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata &&
3159                      !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV))) {
3160                 struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
3161
3162                 /* migrate may create 1-stripe directory, adjust stripe count
3163                  * before lod_ah_init().
3164                  */
3165                 if (lmu && lmu->lum_magic == cpu_to_le32(LMV_USER_MAGIC) &&
3166                     lmu->lum_stripe_count == cpu_to_le32(1))
3167                         lmu->lum_stripe_count = 0;
3168         }
3169
3170         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
3171
3172         memset(ldata, 0, sizeof(*ldata));
3173         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
3174                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
3175
3176                 tfid.f_oid--;
3177                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
3178                                         &tfid, lname, 1, 0, ldata);
3179         } else {
3180                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
3181                                         mdd_object_fid(mdd_pobj),
3182                                         lname, 1, 0, ldata);
3183         }
3184
3185         if (spec->sp_cr_flags & MDS_OPEN_PCC) {
3186                 LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
3187
3188                 memset(&hsm_buf, 0, sizeof(hsm_buf));
3189                 lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
3190                 if (hsm_buf.lb_buf == NULL)
3191                         GOTO(out_stop, rc = -ENOMEM);
3192         }
3193
3194         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
3195                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
3196                                 &hsm_buf, hint);
3197         if (rc)
3198                 GOTO(out_stop, rc);
3199
3200         rc = mdd_trans_start(env, mdd, handle);
3201         if (rc)
3202                 GOTO(out_stop, rc);
3203
3204         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
3205                                &def_acl_buf, &hsm_buf, hint, handle, true);
3206         if (rc != 0)
3207                 GOTO(out_stop, rc);
3208
3209         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
3210                 mdd_write_lock(env, son, DT_TGT_CHILD);
3211                 son->mod_flags |= VOLATILE_OBJ;
3212                 rc = mdd_orphan_insert(env, son, handle);
3213                 GOTO(out_volatile, rc);
3214         } else {
3215                 if (likely(!name_inserted)) {
3216                         rc = __mdd_index_insert(env, mdd_pobj,
3217                                                 mdd_object_fid(son),
3218                                                 attr->la_mode, name, handle);
3219                         if (rc != 0)
3220                                 GOTO(err_created, rc);
3221                 }
3222
3223                 mdd_links_add(env, son, mdd_object_fid(mdd_pobj), lname,
3224                               handle, ldata, 1);
3225
3226                 /* update parent directory mtime/ctime */
3227                 *la = *attr;
3228                 la->la_valid = LA_CTIME | LA_MTIME;
3229                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
3230                 if (rc)
3231                         GOTO(err_insert, rc);
3232         }
3233
3234         EXIT;
3235 err_insert:
3236         if (rc != 0) {
3237                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
3238                         rc2 = mdd_orphan_delete(env, son, handle);
3239                 else
3240                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
3241                                                  S_ISDIR(attr->la_mode),
3242                                                  handle);
3243                 if (rc2 != 0)
3244                         goto out_stop;
3245
3246 err_created:
3247                 mdd_write_lock(env, son, DT_TGT_CHILD);
3248                 if (S_ISDIR(attr->la_mode)) {
3249                         /* Drop the reference, no need to delete "."/"..",
3250                          * because the object is to be destroyed directly.
3251                          */
3252                         rc2 = mdo_ref_del(env, son, handle);
3253                         if (rc2 != 0) {
3254                                 mdd_write_unlock(env, son);
3255                                 goto out_stop;
3256                         }
3257                 }
3258 out_volatile:
3259                 /* For volatile files drop one link immediately, since there is
3260                  * no filename in the namespace, and save any error returned.
3261                  */
3262                 rc2 = mdo_ref_del(env, son, handle);
3263                 if (rc2 != 0) {
3264                         mdd_write_unlock(env, son);
3265                         if (unlikely(rc == 0))
3266                                 rc = rc2;
3267                         goto out_stop;
3268                 }
3269
3270                 /* Don't destroy the volatile object on success */
3271                 if (likely(rc != 0))
3272                         mdo_destroy(env, son, handle);
3273                 mdd_write_unlock(env, son);
3274         }
3275
3276         if (rc == 0 && fid_is_namespace_visible(mdd_object_fid(son)) &&
3277             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
3278                 rc = mdd_changelog_ns_store(env, mdd,
3279                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
3280                                 S_ISREG(attr->la_mode) ? CL_CREATE :
3281                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
3282                                 0, son, mdd_pobj, pattr, NULL, NULL, NULL,
3283                                 lname, NULL, handle);
3284 out_stop:
3285         rc2 = mdd_trans_stop(env, mdd, rc, handle);
3286         if (rc == 0) {
3287                 /* If creation fails, it is most likely due to the remote update
3288                  * failure, because local transaction will mostly succeed at
3289                  * this stage. There is no easy way to rollback all of previous
3290                  * updates, so let's remove the object from namespace, and
3291                  * LFSCK should handle the orphan object.
3292                  */
3293                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
3294                         mdd_index_delete(env, mdd_pobj, attr, lname);
3295                 rc = rc2;
3296         }
3297 out_free:
3298         if (is_vmalloc_addr(ldata->ld_buf))
3299                 /* if we vmalloced a large buffer drop it */
3300                 lu_buf_free(ldata->ld_buf);
3301
3302         if (spec->sp_cr_flags & MDS_OPEN_PCC)
3303                 lu_buf_free(&hsm_buf);
3304
3305         /* The child object shouldn't be cached anymore */
3306         if (rc)
3307                 set_bit(LU_OBJECT_HEARD_BANSHEE,
3308                         &child->mo_lu.lo_header->loh_flags);
3309         return rc;
3310 }
3311
3312 /* has not mdd_write{read}_lock on any obj yet. */
3313 static int mdd_rename_sanity_check(const struct lu_env *env,
3314                                    struct mdd_object *src_pobj,
3315                                    const struct lu_attr *spattr,
3316                                    struct mdd_object *tgt_pobj,
3317                                    const struct lu_attr *tpattr,
3318                                    struct mdd_object *sobj,
3319                                    const struct lu_attr *sattr,
3320                                    struct mdd_object *tobj,
3321                                    const struct lu_attr *tattr)
3322 {
3323         int rc = 0;
3324
3325         ENTRY;
3326
3327         /* XXX: when get here, sobj must NOT be NULL,
3328          * the other case has been processed in cld_rename
3329          * before mdd_rename and enable MDS_PERM_BYPASS.
3330          */
3331         LASSERT(sobj);
3332
3333         /*
3334          * If we are using project inheritance, we only allow renames
3335          * into our tree when the project IDs are the same; otherwise
3336          * tree quota mechanism would be circumvented.
3337          */
3338         if ((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
3339              tpattr->la_projid != sattr->la_projid && S_ISDIR(sattr->la_mode))
3340                 RETURN(-EXDEV);
3341
3342         /* we prevent an encrypted file from being renamed
3343          * into an unencrypted dir
3344          */
3345         if ((spattr->la_valid & LA_FLAGS &&
3346              spattr->la_flags & LUSTRE_ENCRYPT_FL) &&
3347             !(tpattr->la_valid & LA_FLAGS &&
3348               tpattr->la_flags & LUSTRE_ENCRYPT_FL))
3349                 RETURN(-EXDEV);
3350
3351         rc = mdd_may_delete(env, src_pobj, spattr, sobj, sattr, NULL, 1, 0);
3352         if (rc)
3353                 RETURN(rc);
3354
3355         /* XXX: when get here, "tobj == NULL" means tobj must
3356          * NOT exist (neither on remote MDS, such case has been
3357          * processed in cld_rename before mdd_rename and enable
3358          * MDS_PERM_BYPASS).
3359          * So check may_create, but not check may_unlink.
3360          */
3361         if (tobj == NULL)
3362                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
3363                                     (src_pobj != tgt_pobj));
3364         else
3365                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, sattr,
3366                                     (src_pobj != tgt_pobj), 1);
3367
3368         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(sattr->la_mode))
3369                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
3370
3371         RETURN(rc);
3372 }
3373
3374 static
3375 int mdd_declare_rename(const struct lu_env *env, struct mdd_device *mdd,
3376                      struct mdd_object *mdd_spobj, struct mdd_object *mdd_tpobj,
3377                      struct mdd_object *mdd_sobj, struct mdd_object *mdd_tobj,
3378                      const struct lu_name *sname, const struct lu_name *tname,
3379                      struct md_attr *ma, struct linkea_data *ldata,
3380                      bool change_projid, struct thandle *handle)
3381 {
3382         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
3383         int rc;
3384
3385         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3386         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3387
3388         LASSERT(mdd_spobj);
3389         LASSERT(mdd_tpobj);
3390         LASSERT(mdd_sobj);
3391
3392         /* name from source dir */
3393         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
3394         if (rc)
3395                 return rc;
3396
3397         /* .. from source child */
3398         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3399                 /* source child can be directory, count by source dir's nlink */
3400                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
3401                 if (rc)
3402                         return rc;
3403                 if (mdd_spobj != mdd_tpobj) {
3404                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
3405                                                       handle);
3406                         if (rc != 0)
3407                                 return rc;
3408
3409                         rc = mdo_declare_index_insert(env, mdd_sobj,
3410                                                       mdd_object_fid(mdd_tpobj),
3411                                                       S_IFDIR, dotdot, handle);
3412                         if (rc != 0)
3413                                 return rc;
3414                 }
3415
3416                 /* new target child can be dir, counted by target dir's nlink */
3417                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
3418                 if (rc != 0)
3419                         return rc;
3420         }
3421
3422         la->la_valid = LA_CTIME | LA_MTIME;
3423         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
3424         if (rc != 0)
3425                 return rc;
3426
3427         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
3428         if (rc != 0)
3429                 return rc;
3430
3431         la->la_valid = LA_CTIME;
3432         if (change_projid)
3433                 la->la_valid |= LA_PROJID;
3434         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3435         if (rc)
3436                 return rc;
3437
3438         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
3439         if (rc)
3440                 return rc;
3441
3442         /* new name */
3443         rc = mdo_declare_index_insert(env, mdd_tpobj, mdd_object_fid(mdd_sobj),
3444                                       mdd_object_type(mdd_sobj),
3445                                       tname->ln_name, handle);
3446         if (rc != 0)
3447                 return rc;
3448
3449         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
3450                 /* delete target child in target parent directory */
3451                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
3452                                               handle);
3453                 if (rc)
3454                         return rc;
3455
3456                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
3457                 if (rc)
3458                         return rc;
3459
3460                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
3461                         /* target child can be directory,
3462                          * delete "." reference in target child directory
3463                          */
3464                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
3465                         if (rc)
3466                                 return rc;
3467
3468                         /* delete ".." reference in target parent directory */
3469                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
3470                         if (rc)
3471                                 return rc;
3472                 }
3473
3474                 la->la_valid = LA_CTIME;
3475                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
3476                 if (rc)
3477                         return rc;
3478
3479                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
3480                 if (rc)
3481                         return rc;
3482         }
3483
3484         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
3485                                          handle);
3486         if (rc)
3487                 return rc;
3488
3489         return rc;
3490 }
3491
3492 static
3493 int mdd_migrate_object(const struct lu_env *env, struct mdd_object *spobj,
3494                        struct mdd_object *tpobj, struct mdd_object *sobj,
3495                        struct mdd_object *tobj, const struct lu_name *sname,
3496                        const struct lu_name *tname, struct md_op_spec *spec,
3497                        struct md_attr *ma);
3498
3499 /* src object can be remote that is why we use only fid and type of object */
3500 static int mdd_rename(const struct lu_env *env,  struct md_object *src_pobj,
3501                       struct md_object *tgt_pobj, const struct lu_fid *lf,
3502                       const struct lu_name *lsname, struct md_object *tobj,
3503                       const struct lu_name *ltname, struct md_attr *ma)
3504 {
3505         const char *sname = lsname->ln_name;
3506         const char *tname = ltname->ln_name;
3507         struct lu_attr *la = &mdd_env_info(env)->mdi_la_for_fix;
3508         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
3509         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); /* target parent */
3510         struct mdd_object *mdd_sobj = NULL;                  /* source object */
3511         struct mdd_object *mdd_tobj = NULL;       /* (possible) target object */
3512         struct lu_attr *sattr = MDD_ENV_VAR(env, cattr);
3513         struct lu_attr *spattr = MDD_ENV_VAR(env, pattr);
3514         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
3515         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
3516         struct linkea_data *ldata = &mdd_env_info(env)->mdi_link_data;
3517         const struct lu_fid *tpobj_fid = mdd_object_fid(mdd_tpobj);
3518         const struct lu_fid *spobj_fid = mdd_object_fid(mdd_spobj);
3519         struct mdd_device *mdd = mdo2mdd(src_pobj);
3520         struct thandle *handle;
3521         bool is_dir;
3522         bool tobj_ref = 0;
3523         bool tobj_locked = 0;
3524         bool change_projid = false;
3525         unsigned int cl_flags = 0;
3526         int rc, rc2;
3527
3528         ENTRY;
3529
3530         /* let unlink to complete and commit */
3531         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
3532
3533         if (tobj)
3534                 mdd_tobj = md2mdd_obj(tobj);
3535
3536         mdd_sobj = mdd_object_find(env, mdd, lf);
3537         if (IS_ERR(mdd_sobj))
3538                 RETURN(PTR_ERR(mdd_sobj));
3539
3540         rc = mdd_la_get(env, mdd_sobj, sattr);
3541         if (rc)
3542                 GOTO(out_pending, rc);
3543
3544         /* if rename is cross MDTs, migrate symlink if it doesn't have other
3545          * hard links, and target doesn't exist.
3546          */
3547         if (mdd_object_remote(mdd_sobj) && S_ISLNK(sattr->la_mode) &&
3548             sattr->la_nlink == 1 && !tobj) {
3549                 struct md_op_spec *spec = &mdd_env_info(env)->mdi_spec;
3550                 struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
3551                 struct lu_fid tfid;
3552
3553                 rc = ld->ld_ops->ldo_fid_alloc(env, ld, &tfid, &tgt_pobj->mo_lu,
3554                                                NULL);
3555                 if (rc < 0)
3556                         GOTO(out_pending, rc);
3557
3558                 mdd_tobj = mdd_object_find(env, mdd, &tfid);
3559                 if (IS_ERR(mdd_tobj))
3560                         GOTO(out_pending, rc = PTR_ERR(mdd_tobj));
3561
3562                 memset(spec, 0, sizeof(*spec));
3563                 rc = mdd_migrate_object(env, mdd_spobj, mdd_tpobj, mdd_sobj,
3564                                         mdd_tobj, lsname, ltname, spec, ma);
3565                 mdd_object_put(env, mdd_tobj);
3566                 GOTO(out_pending, rc);
3567         }
3568
3569         rc = mdd_la_get(env, mdd_spobj, spattr);
3570         if (rc)
3571                 GOTO(out_pending, rc);
3572
3573         if (mdd_tobj) {
3574                 rc = mdd_la_get(env, mdd_tobj, tattr);
3575                 if (rc)
3576                         GOTO(out_pending, rc);
3577                 /* search for an existing archive.  we should check ahead as the
3578                  * object can be destroyed in this transaction
3579                  */
3580                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
3581                         cl_flags |= CLF_RENAME_LAST_EXISTS;
3582         }
3583
3584         rc = mdd_la_get(env, mdd_tpobj, tpattr);
3585         if (rc)
3586                 GOTO(out_pending, rc);
3587
3588         rc = mdd_rename_sanity_check(env, mdd_spobj, spattr, mdd_tpobj, tpattr,
3589                                      mdd_sobj, sattr, mdd_tobj, tattr);
3590         if (rc)
3591                 GOTO(out_pending, rc);
3592
3593         rc = mdd_name_check(env, mdd, ltname);
3594         if (rc < 0)
3595                 GOTO(out_pending, rc);
3596
3597         handle = mdd_trans_create(env, mdd);
3598         if (IS_ERR(handle))
3599                 GOTO(out_pending, rc = PTR_ERR(handle));
3600
3601         memset(ldata, 0, sizeof(*ldata));
3602         rc = mdd_linkea_prepare(env, mdd_sobj, spobj_fid, lsname, tpobj_fid,
3603                                 ltname, 1, 0, ldata);
3604         if (rc)
3605                 GOTO(stop, rc);
3606
3607         if (tpattr->la_projid != sattr->la_projid &&
3608             tpattr->la_flags & LUSTRE_PROJINHERIT_FL)
3609                 change_projid = true;
3610
3611         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
3612                                 mdd_tobj, lsname, ltname, ma, ldata,
3613                                 change_projid, handle);
3614         if (rc)
3615                 GOTO(stop, rc);
3616
3617         rc = mdd_trans_start(env, mdd, handle);
3618         if (rc)
3619                 GOTO(stop, rc);
3620
3621         is_dir = S_ISDIR(sattr->la_mode);
3622
3623         /* Remove source name from source directory */
3624         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
3625         if (rc)
3626                 GOTO(stop, rc);
3627
3628         /* "mv dir1 dir2" needs "dir1/.." link update */
3629         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
3630                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3631                 if (rc)
3632                         GOTO(fixup_spobj2, rc);
3633
3634                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
3635                                              dotdot, handle);
3636                 if (rc)
3637                         GOTO(fixup_spobj, rc);
3638         }
3639
3640         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
3641                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3642                 if (rc)
3643                         /* tname might been renamed to something else */
3644                         GOTO(fixup_spobj, rc);
3645         }
3646
3647         /* Insert new fid with target name into target dir */
3648         rc = __mdd_index_insert(env, mdd_tpobj, lf, sattr->la_mode,
3649                                 tname, handle);
3650         if (rc)
3651                 GOTO(fixup_tpobj, rc);
3652
3653         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3654         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3655
3656         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3657         la->la_valid = LA_CTIME;
3658         if (change_projid) {
3659                 /* mdd_update_time honors other valid flags except TIME ones */
3660                 la->la_valid |= LA_PROJID;
3661                 la->la_projid = tpattr->la_projid;
3662         }
3663         rc = mdd_update_time(env, mdd_sobj, sattr, la, handle);
3664         if (rc)
3665                 GOTO(fixup_tpobj, rc);
3666
3667         /* Update the linkEA for the source object */
3668         mdd_write_lock(env, mdd_sobj, DT_SRC_CHILD);
3669         rc = mdd_links_rename(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3670                               lsname, mdd_object_fid(mdd_tpobj), ltname,
3671                               handle, ldata, 0, 0);
3672         if (rc == -ENOENT)
3673                 /* Old files might not have EA entry */
3674                 mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3675                               lsname, handle, NULL, 0);
3676         mdd_write_unlock(env, mdd_sobj);
3677         /* We don't fail the transaction if the link ea can't be
3678          * updated -- fid2path will use alternate lookup method.
3679          */
3680         rc = 0;
3681
3682         /* Remove old target object
3683          * For tobj is remote case cmm layer has processed
3684          * and set tobj to NULL then. So when tobj is NOT NULL,
3685          * it must be local one.
3686          */
3687         if (tobj && mdd_object_exists(mdd_tobj)) {
3688                 mdd_write_lock(env, mdd_tobj, DT_TGT_CHILD);
3689                 tobj_locked = 1;
3690                 if (mdd_is_dead_obj(mdd_tobj)) {
3691                         /* should not be dead, something is wrong */
3692                         rc = -EINVAL;
3693                         CERROR("%s: something bad, dead tobj "DFID": rc = %d\n",
3694                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3695                         goto cleanup;
3696                 }
3697                 mdo_ref_del(env, mdd_tobj, handle);
3698
3699                 /* Remove dot reference. */
3700                 if (S_ISDIR(tattr->la_mode))
3701                         mdo_ref_del(env, mdd_tobj, handle);
3702                 tobj_ref = 1;
3703
3704                 /* fetch updated nlink */
3705                 rc = mdd_la_get(env, mdd_tobj, tattr);
3706                 if (rc) {
3707                         CERROR("%s: failed get nlink of tobj "DFID": rc = %d\n",
3708                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3709                         GOTO(fixup_tpobj, rc);
3710                 }
3711
3712                 la->la_valid = LA_CTIME;
3713                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3714                 if (rc) {
3715                         CERROR("%s: failed set ctime of tobj "DFID": rc = %d\n",
3716                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3717                         GOTO(fixup_tpobj, rc);
3718                 }
3719
3720                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3721                 ma->ma_attr = *tattr;
3722                 ma->ma_valid |= MA_INODE;
3723                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3724                                        handle);
3725                 if (rc) {
3726                         CERROR("%s: failed to unlink tobj "DFID": rc = %d\n",
3727                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3728                         GOTO(fixup_tpobj, rc);
3729                 }
3730
3731                 /* fetch updated nlink */
3732                 rc = mdd_la_get(env, mdd_tobj, tattr);
3733                 if (rc == -ENOENT) {
3734                         /* object removed? return the latest known attributes */
3735                         tattr->la_nlink = 0;
3736                         rc = 0;
3737                 } else if (rc) {
3738                         CERROR("%s: failed get nlink of tobj "DFID": rc = %d\n",
3739                                mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc);
3740                         GOTO(fixup_tpobj, rc);
3741                 }
3742                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3743                 ma->ma_attr = *tattr;
3744                 ma->ma_valid |= MA_INODE;
3745
3746                 if (tattr->la_nlink == 0)
3747                         cl_flags |= CLF_RENAME_LAST;
3748                 else
3749                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3750         }
3751
3752         la->la_valid = LA_CTIME | LA_MTIME;
3753         rc = mdd_update_time(env, mdd_spobj, spattr, la, handle);
3754         if (rc)
3755                 GOTO(fixup_tpobj, rc);
3756
3757         if (mdd_spobj != mdd_tpobj) {
3758                 la->la_valid = LA_CTIME | LA_MTIME;
3759                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3760                 if (rc != 0)
3761                         GOTO(fixup_tpobj, rc);
3762         }
3763
3764         EXIT;
3765
3766 fixup_tpobj:
3767         if (rc) {
3768                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3769                 if (rc2)
3770                         CWARN("%s: tpobj "DFID" fix error: rc = %d\n",
3771                               mdd2obd_dev(mdd)->obd_name, PFID(tpobj_fid), rc2);
3772
3773                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3774                     !mdd_is_dead_obj(mdd_tobj)) {
3775                         if (tobj_ref) {
3776                                 mdo_ref_add(env, mdd_tobj, handle);
3777                                 if (is_dir)
3778                                         mdo_ref_add(env, mdd_tobj, handle);
3779                         }
3780
3781                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3782                                                  mdd_object_fid(mdd_tobj),
3783                                                  mdd_object_type(mdd_tobj),
3784                                                  tname, handle);
3785                         if (rc2)
3786                                 CWARN("%s: tpobj "DFID" fix error: rc = %d\n",
3787                                       mdd2obd_dev(mdd)->obd_name,
3788                                       PFID(tpobj_fid), rc2);
3789                 }
3790         }
3791
3792 fixup_spobj:
3793         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3794                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3795                 if (rc2)
3796                         CWARN("%s: spobj "DFID" dotdot delete error: rc = %d\n",
3797                               mdd2obd_dev(mdd)->obd_name, PFID(spobj_fid), rc2);
3798
3799
3800                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3801                                               dotdot, handle);
3802                 if (rc2)
3803                         CWARN("%s: spobj "DFID" dotdot insert error: rc = %d\n",
3804                               mdd2obd_dev(mdd)->obd_name, PFID(spobj_fid), rc2);
3805         }
3806
3807 fixup_spobj2:
3808         if (rc) {
3809                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3810                                          mdd_object_type(mdd_sobj), sname,
3811                                          handle);
3812                 if (rc2)
3813                         CWARN("%s: spobj "DFID" fix error: rc = %d\n",
3814                               mdd2obd_dev(mdd)->obd_name, PFID(spobj_fid), rc2);
3815         }
3816
3817 cleanup:
3818         if (tobj_locked)
3819                 mdd_write_unlock(env, mdd_tobj);
3820
3821         if (rc == 0)
3822                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3823                                             mdd_tobj, mdd_tpobj, tpattr, lf,
3824                                             mdd_spobj, spattr, ltname, lsname,
3825                                             handle);
3826
3827 stop:
3828         rc = mdd_trans_stop(env, mdd, rc, handle);
3829
3830 out_pending:
3831         mdd_object_put(env, mdd_sobj);
3832
3833         return rc;
3834 }
3835
3836 /**
3837  * Check whether we should migrate the file/dir
3838  * return val
3839  *      < 0  permission check failed or other error.
3840  *      = 0  the file can be migrated.
3841  **/
3842 static int mdd_migrate_sanity_check(const struct lu_env *env,
3843                                     struct mdd_device *mdd,
3844                                     struct mdd_object *spobj,
3845                                     struct mdd_object *tpobj,
3846                                     struct mdd_object *sobj,
3847                                     struct mdd_object *tobj,
3848                                     const struct lu_attr *spattr,
3849                                     const struct lu_attr *tpattr,
3850                                     const struct lu_attr *attr,
3851                                     bool nsonly)
3852 {
3853         int rc;
3854
3855         ENTRY;
3856
3857         if (!nsonly && !mdd_object_remote(sobj)) {
3858                 mdd_read_lock(env, sobj, DT_SRC_CHILD);
3859                 if (sobj->mod_count > 0) {
3860                         CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
3861                                mdd_obj_dev_name(sobj),
3862                                PFID(mdd_object_fid(sobj)),
3863                                sobj->mod_count);
3864                         mdd_read_unlock(env, sobj);
3865                         RETURN(-EBUSY);
3866                 }
3867                 mdd_read_unlock(env, sobj);
3868         }
3869
3870         if (mdd_object_exists(tobj))
3871                 RETURN(-EEXIST);
3872
3873         rc = mdd_may_delete(env, spobj, spattr, sobj, attr, NULL, 1, 0);
3874         if (rc)
3875                 RETURN(rc);
3876
3877         rc = mdd_may_create(env, tpobj, tpattr, NULL, true);
3878
3879         RETURN(rc);
3880 }
3881
3882 struct mdd_xattr_entry {
3883         struct list_head        mxe_linkage;
3884         char                   *mxe_name;
3885         struct lu_buf           mxe_buf;
3886 };
3887
3888 struct mdd_xattrs {
3889         struct lu_buf           mx_namebuf;
3890         struct list_head        mx_list;
3891 };
3892
3893 static inline void mdd_xattrs_init(struct mdd_xattrs *xattrs)
3894 {
3895         INIT_LIST_HEAD(&xattrs->mx_list);
3896         xattrs->mx_namebuf.lb_buf = NULL;
3897         xattrs->mx_namebuf.lb_len = 0;
3898 }
3899
3900 static inline void mdd_xattrs_fini(struct mdd_xattrs *xattrs)
3901 {
3902         struct mdd_xattr_entry *entry;
3903         struct mdd_xattr_entry *tmp;
3904
3905         list_for_each_entry_safe(entry, tmp, &xattrs->mx_list, mxe_linkage) {
3906                 lu_buf_free(&entry->mxe_buf);
3907                 list_del(&entry->mxe_linkage);
3908                 OBD_FREE_PTR(entry);
3909         }
3910
3911         lu_buf_free(&xattrs->mx_namebuf);
3912 }
3913
3914 /* read xattrs into buf, but skip LMA, LMV, LINKEA if 'skip_linkea' is
3915  * set, and DMV if 'skip_dmv" is set.
3916  */
3917 static int mdd_xattrs_migrate_prep(const struct lu_env *env,
3918                                    struct mdd_xattrs *xattrs,
3919                                    struct mdd_object *sobj,
3920                                    bool skip_linkea,
3921                                    bool skip_dmv)
3922 {
3923         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
3924         struct mdd_xattr_entry *entry;
3925         bool needencxattr = false;
3926         bool encxattrfound = false;
3927         char *xname;
3928         int list_xsize;
3929         int xlen;
3930         int rem;
3931         int xsize;
3932         int rc;
3933
3934         ENTRY;
3935
3936         list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
3937         if (list_xsize == -ENODATA)
3938                 RETURN(0);
3939
3940         if (list_xsize < 0)
3941                 RETURN(list_xsize);
3942
3943         if (attr->la_valid & LA_FLAGS &&
3944             attr->la_flags & LUSTRE_ENCRYPT_FL) {
3945                 needencxattr = true;
3946                 list_xsize +=
3947                         strlen(LL_XATTR_NAME_ENCRYPTION_CONTEXT) + 1;
3948         }
3949
3950         lu_buf_alloc(&xattrs->mx_namebuf, list_xsize);
3951         if (xattrs->mx_namebuf.lb_buf == NULL)
3952                 RETURN(-ENOMEM);
3953
3954         rc = mdo_xattr_list(env, sobj, &xattrs->mx_namebuf);
3955         if (rc < 0)
3956                 GOTO(fini, rc);
3957
3958         rem = rc;
3959         rc = 0;
3960         xname = xattrs->mx_namebuf.lb_buf;
3961 reloop:
3962         for (; rem > 0; xname += xlen, rem -= xlen) {
3963                 if (needencxattr &&
3964                     strcmp(xname, LL_XATTR_NAME_ENCRYPTION_CONTEXT) == 0)
3965                         encxattrfound = true;
3966                 xlen = strnlen(xname, rem - 1) + 1;
3967                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3968                     strcmp(XATTR_NAME_LMV, xname) == 0)
3969                         continue;
3970
3971                 if (skip_linkea &&
3972                     strcmp(XATTR_NAME_LINK, xname) == 0)
3973                         continue;
3974
3975                 if (skip_dmv &&
3976                     strcmp(XATTR_NAME_DEFAULT_LMV, xname) == 0)
3977                         continue;
3978
3979                 xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
3980                 if (xsize == -ENODATA)
3981                         continue;
3982                 if (xsize < 0)
3983                         GOTO(fini, rc = xsize);
3984
3985                 OBD_ALLOC_PTR(entry);
3986                 if (!entry)
3987                         GOTO(fini, rc = -ENOMEM);
3988
3989                 lu_buf_alloc(&entry->mxe_buf, xsize);
3990                 if (!entry->mxe_buf.lb_buf) {
3991                         OBD_FREE_PTR(entry);
3992                         GOTO(fini, rc = -ENOMEM);
3993                 }
3994
3995                 rc = mdo_xattr_get(env, sobj, &entry->mxe_buf, xname);
3996                 if (rc < 0) {
3997                         lu_buf_free(&entry->mxe_buf);
3998                         OBD_FREE_PTR(entry);
3999                         if (rc == -ENODATA)
4000                                 continue;
4001                         GOTO(fini, rc);
4002                 }
4003
4004                 entry->mxe_name = xname;
4005                 list_add_tail(&entry->mxe_linkage, &xattrs->mx_list);
4006         }
4007
4008         if (needencxattr && !encxattrfound) {
4009                 xlen = strlen(LL_XATTR_NAME_ENCRYPTION_CONTEXT) + 1;
4010                 strncpy(xname, LL_XATTR_NAME_ENCRYPTION_CONTEXT, xlen);
4011                 rem = xlen;
4012                 GOTO(reloop, 0);
4013         }
4014
4015         RETURN(0);
4016 fini:
4017         mdd_xattrs_fini(xattrs);
4018         RETURN(rc);
4019 }
4020
4021 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
4022                             struct mdd_object *obj,
4023                             const struct lu_buf *buf,
4024                             const char *name,
4025                             int fl, struct thandle *handle);
4026
4027 static int mdd_foreach_xattr(const struct lu_env *env,
4028                              struct mdd_object *tobj,
4029                              struct mdd_xattrs *xattrs,
4030                              struct thandle *handle,
4031                              mdd_xattr_cb cb)
4032 {
4033         struct mdd_xattr_entry *entry;
4034         int rc;
4035
4036         list_for_each_entry(entry, &xattrs->mx_list, mxe_linkage) {
4037                 rc = cb(env, tobj, &entry->mxe_buf, entry->mxe_name, 0, handle);
4038                 if (rc)
4039                         return rc;
4040         }
4041
4042         return 0;
4043 }
4044
4045 typedef int (*mdd_linkea_cb)(const struct lu_env *env,
4046                              struct mdd_object *sobj,
4047                              struct mdd_object *tobj,
4048                              const struct lu_name *sname,
4049                              const struct lu_fid *sfid,
4050                              const struct lu_name *lname,
4051                              const struct lu_fid *fid,
4052                              void *opaque,
4053                              struct thandle *handle);
4054
4055 static int mdd_declare_update_link(const struct lu_env *env,
4056                                    struct mdd_object *sobj,
4057                                    struct mdd_object *tobj,
4058                                    const struct lu_name *tname,
4059                                    const struct lu_fid *tpfid,
4060                                    const struct lu_name *lname,
4061                                    const struct lu_fid *fid,
4062                                    void *unused,
4063                                    struct thandle *handle)
4064 {
4065         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4066         struct mdd_object *pobj;
4067         int rc;
4068
4069         /* ignore tobj */
4070         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
4071             !strcmp(tname->ln_name, lname->ln_name))
4072                 return 0;
4073
4074         pobj = mdd_object_find(env, mdd, fid);
4075         if (IS_ERR(pobj))
4076                 return PTR_ERR(pobj);
4077
4078
4079         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
4080         if (!rc)
4081                 rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
4082                                               mdd_object_type(sobj),
4083                                               lname->ln_name, handle);
4084         mdd_object_put(env, pobj);
4085         if (rc)
4086                 return rc;
4087
4088         rc = mdo_declare_ref_add(env, tobj, handle);
4089         if (rc)
4090                 return rc;
4091
4092         rc = mdo_declare_ref_del(env, sobj, handle);
4093         return rc;
4094 }
4095
4096 static int mdd_update_link(const struct lu_env *env,
4097                            struct mdd_object *sobj,
4098                            struct mdd_object *tobj,
4099                            const struct lu_name *tname,
4100                            const struct lu_fid *tpfid,
4101                            const struct lu_name *lname,
4102                            const struct lu_fid *fid,
4103                            void *unused,
4104                            struct thandle *handle)
4105 {
4106         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4107         struct mdd_object *pobj;
4108         int rc;
4109
4110         ENTRY;
4111
4112         /* ignore tobj */
4113         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
4114             !memcmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
4115                 RETURN(0);
4116
4117         CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
4118                PFID(fid), encode_fn_luname(lname), PFID(mdd_object_fid(tobj)));
4119
4120         pobj = mdd_object_find(env, mdd, fid);
4121         if (IS_ERR(pobj)) {
4122                 CWARN("%s: cannot find obj "DFID": %ld\n",
4123                       mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
4124                 RETURN(PTR_ERR(pobj));
4125         }
4126
4127         if (!mdd_object_exists(pobj)) {
4128                 CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
4129                 mdd_object_put(env, pobj);
4130                 RETURN(-ENOENT);
4131         }
4132
4133         mdd_write_lock(env, pobj, DT_TGT_PARENT);
4134         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
4135         if (!rc)
4136                 rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(tobj),
4137                                              mdd_object_type(sobj),
4138                                              lname->ln_name, handle);
4139         mdd_write_unlock(env, pobj);
4140         mdd_object_put(env, pobj);
4141         if (rc)
4142                 RETURN(rc);
4143
4144         mdd_write_lock(env, tobj, DT_TGT_CHILD);
4145         rc = mdo_ref_add(env, tobj, handle);
4146         mdd_write_unlock(env, tobj);
4147         if (rc)
4148                 RETURN(rc);
4149
4150         mdd_write_lock(env, sobj, DT_SRC_CHILD);
4151         rc = mdo_ref_del(env, sobj, handle);
4152         mdd_write_unlock(env, sobj);
4153
4154         RETURN(rc);
4155 }
4156
4157 static inline int mdd_fld_lookup(const struct lu_env *env,
4158                                  struct mdd_device *mdd,
4159                                  const struct lu_fid *fid,
4160                                  __u32 *mdt_index)
4161 {
4162         struct lu_seq_range *range = &mdd_env_info(env)->mdi_range;
4163         struct seq_server_site *ss;
4164         int rc;
4165
4166         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4167
4168         range->lsr_flags = LU_SEQ_RANGE_MDT;
4169         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4170         if (rc)
4171                 return rc;
4172
4173         *mdt_index = range->lsr_index;
4174
4175         return 0;
4176 }
4177
4178 static int mdd_is_link_on_source_mdt(const struct lu_env *env,
4179                                      struct mdd_object *sobj,
4180                                      struct mdd_object *tobj,
4181                                      const struct lu_name *tname,
4182                                      const struct lu_fid *tpfid,
4183                                      const struct lu_name *lname,
4184                                      const struct lu_fid *fid,
4185                                      void *opaque,
4186                                      struct thandle *handle)
4187 {
4188         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4189         __u32 source_mdt_index = *(__u32 *)opaque;
4190         __u32 link_mdt_index;
4191         int rc;
4192
4193         ENTRY;
4194
4195         /* ignore tobj */
4196         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
4197             !memcmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
4198                 return 0;
4199
4200         rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
4201         if (rc)
4202                 RETURN(rc);
4203
4204         RETURN(link_mdt_index == source_mdt_index);
4205 }
4206
4207 static int mdd_iterate_linkea(const struct lu_env *env,
4208                               struct mdd_object *sobj,
4209                               struct mdd_object *tobj,
4210                               const struct lu_name *tname,
4211                               const struct lu_fid *tpfid,
4212                               struct linkea_data *ldata,
4213                               void *opaque,
4214                               struct thandle *handle,
4215                               mdd_linkea_cb cb)
4216 {
4217         struct mdd_thread_info *info = mdd_env_info(env);
4218         char *filename = info->mdi_name;
4219         struct lu_name lname;
4220         struct lu_fid fid;
4221         int rc = 0;
4222
4223         if (!ldata->ld_buf)
4224                 return 0;
4225
4226         for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
4227              linkea_next_entry(ldata)) {
4228                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
4229                                     &fid);
4230
4231                 /* Note: lname might miss \0 at the end */
4232                 snprintf(filename, sizeof(info->mdi_name), DNAME,
4233                          lname.ln_namelen, lname.ln_name);
4234                 lname.ln_name = filename;
4235
4236                 CDEBUG(D_INFO, DFID"/"DNAME"\n",
4237                        PFID(&fid), encode_fn_luname(&lname));
4238
4239                 rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
4240                         handle);
4241         }
4242
4243         return rc;
4244 }
4245
4246 /**
4247  * Prepare linkea, and check whether file needs migrate: if source still has
4248  * link on source MDT, no need to migrate, just update namespace on source and
4249  * target parents.
4250  *
4251  * \retval      0 do migrate
4252  * \retval      1 don't migrate
4253  * \retval      -errno on failure
4254  */
4255 static int mdd_migrate_linkea_prepare(const struct lu_env *env,
4256                                       struct mdd_device *mdd,
4257                                       struct mdd_object *spobj,
4258                                       struct mdd_object *tpobj,
4259                                       struct mdd_object *sobj,
4260                                       const struct lu_name *sname,
4261                                       const struct lu_name *tname,
4262                                       const struct lu_attr *attr,
4263                                       struct linkea_data *ldata)
4264 {
4265         __u32 source_mdt_index;
4266         int rc;
4267
4268         ENTRY;
4269
4270         memset(ldata, 0, sizeof(*ldata));
4271         rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), sname,
4272                                 mdd_object_fid(tpobj), tname, 1, 0, ldata);
4273         if (rc)
4274                 RETURN(rc);
4275
4276         /*
4277          * Then it will check if the file should be migrated. If the file has
4278          * mulitple links, we only need migrate the file if all of its entries
4279          * has been migrated to the remote MDT.
4280          */
4281         if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
4282                 RETURN(0);
4283
4284         /* If there are still links locally, don't migrate this file */
4285         LASSERT(ldata->ld_leh != NULL);
4286
4287         /* If linkEA is overflow, switch to ns-only migrate */
4288         if (unlikely(ldata->ld_leh->leh_overflow_time))
4289                 RETURN(+EOVERFLOW);
4290
4291         rc = mdd_fld_lookup(env, mdd, mdd_object_fid(sobj), &source_mdt_index);
4292         if (rc)
4293                 RETURN(rc);
4294
4295         rc = mdd_iterate_linkea(env, sobj, NULL, tname, mdd_object_fid(tpobj),
4296                                 ldata, &source_mdt_index, NULL,
4297                                 mdd_is_link_on_source_mdt);
4298         RETURN(rc);
4299 }
4300
4301 static int mdd_declare_migrate_update(const struct lu_env *env,
4302                                       struct mdd_object *spobj,
4303                                       struct mdd_object *tpobj,
4304                                       struct mdd_object *obj,
4305                                       const struct lu_name *sname,
4306                                       const struct lu_name *tname,
4307                                       struct lu_attr *attr,
4308                                       struct lu_attr *spattr,
4309                                       struct lu_attr *tpattr,
4310                                       struct linkea_data *ldata,
4311                                       struct md_attr *ma,
4312                                       struct thandle *handle)
4313 {
4314         struct mdd_thread_info *info = mdd_env_info(env);
4315         struct lu_attr *la = &info->mdi_la_for_fix;
4316         int rc;
4317
4318         rc = mdo_declare_index_delete(env, spobj, sname->ln_name, handle);
4319         if (rc)
4320                 return rc;
4321
4322         if (S_ISDIR(attr->la_mode)) {
4323                 rc = mdo_declare_ref_del(env, spobj, handle);
4324                 if (rc)
4325                         return rc;
4326         }
4327
4328         rc = mdo_declare_index_insert(env, tpobj, mdd_object_fid(obj),
4329                                       attr->la_mode & S_IFMT,
4330                                       tname->ln_name, handle);
4331         if (rc)
4332                 return rc;
4333
4334         rc = mdd_declare_links_add(env, obj, handle, ldata);
4335         if (rc)
4336                 return rc;
4337
4338         if (S_ISDIR(attr->la_mode)) {
4339                 rc = mdo_declare_ref_add(env, tpobj, handle);
4340                 if (rc)
4341                         return rc;
4342         }
4343
4344         la->la_valid = LA_CTIME | LA_MTIME;
4345         rc = mdo_declare_attr_set(env, spobj, la, handle);
4346         if (rc)
4347                 return rc;
4348
4349         if (tpobj != spobj) {
4350                 rc = mdo_declare_attr_set(env, tpobj, la, handle);
4351                 if (rc)
4352                         return rc;
4353         }
4354
4355         return rc;
4356 }
4357
4358 static int mdd_declare_migrate_create(const struct lu_env *env,
4359                                       struct mdd_object *spobj,
4360                                       struct mdd_object *tpobj,
4361                                       struct mdd_object *sobj,
4362                                       struct mdd_object *tobj,
4363                                       const struct lu_name *sname,
4364                                       const struct lu_name *tname,
4365                                       struct lu_attr *spattr,
4366                                       struct lu_attr *tpattr,
4367                                       struct lu_attr *attr,
4368                                       struct lu_buf *sbuf,
4369                                       struct linkea_data *ldata,
4370                                       struct mdd_xattrs *xattrs,
4371                                       struct md_attr *ma,
4372                                       struct md_op_spec *spec,
4373                                       struct dt_allocation_hint *hint,
4374                                       struct thandle *handle)
4375 {
4376         struct mdd_thread_info *info = mdd_env_info(env);
4377         struct md_layout_change *mlc = &info->mdi_mlc;
4378         struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
4379         int rc;
4380
4381         ENTRY;
4382
4383         if (S_ISDIR(attr->la_mode)) {
4384                 struct lmv_user_md *lum = spec->u.sp_ea.eadata;
4385
4386                 mlc->mlc_opc = MD_LAYOUT_DETACH;
4387                 rc = mdo_declare_layout_change(env, sobj, mlc, handle);
4388                 if (rc)
4389                         return rc;
4390
4391                 lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
4392         } else if (S_ISLNK(attr->la_mode)) {
4393                 spec->u.sp_symname.ln_name = sbuf->lb_buf;
4394                 /* don't count NUL */
4395                 spec->u.sp_symname.ln_namelen = sbuf->lb_len - 1;
4396         } else if (S_ISREG(attr->la_mode)) {
4397                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
4398                 spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
4399         }
4400
4401         mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
4402
4403         rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
4404                                 tname, attr, handle, spec, ldata, NULL, NULL,
4405                                 NULL, hint);
4406         if (rc)
4407                 return rc;
4408
4409         /*
4410          * tobj mode will be used in mdo_declare_layout_change(), but it's not
4411          * createb yet, copy from sobj.
4412          */
4413         tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
4414         tobj->mod_obj.mo_lu.lo_header->loh_attr |=
4415                 sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
4416
4417         if (S_ISDIR(attr->la_mode)) {
4418                 if (!lmv) {
4419                         /* if sobj is not striped, fake a 1-stripe LMV */
4420                         LASSERT(sizeof(info->mdi_key) >
4421                                 lmv_mds_md_size(1, LMV_MAGIC_V1));
4422                         lmv = (typeof(lmv))info->mdi_key;
4423                         memset(lmv, 0, sizeof(*lmv));
4424                         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
4425                         lmv->lmv_stripe_count = cpu_to_le32(1);
4426                         lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
4427                         fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
4428                                       mdd_object_fid(sobj));
4429                         mlc->mlc_buf.lb_buf = lmv;
4430                         mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
4431                 } else {
4432                         mlc->mlc_buf = *sbuf;
4433                 }
4434                 mlc->mlc_opc = MD_LAYOUT_ATTACH;
4435                 rc = mdo_declare_layout_change(env, tobj, mlc, handle);
4436                 if (rc)
4437                         return rc;
4438         }
4439
4440         rc = mdd_foreach_xattr(env, tobj, xattrs, handle,
4441                                mdo_declare_xattr_set);
4442         if (rc)
4443                 return rc;
4444
4445         if (S_ISREG(attr->la_mode)) {
4446                 struct lu_buf fid_buf;
4447
4448                 handle->th_complex = 1;
4449
4450                 /* target may be remote, update PFID via sobj. */
4451                 fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
4452                 fid_buf.lb_len = sizeof(struct lu_fid);
4453                 rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
4454                                            0, handle);
4455                 if (rc)
4456                         return rc;
4457
4458                 rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4459                 if (rc)
4460                         return rc;
4461         }
4462
4463         if (!S_ISDIR(attr->la_mode)) {
4464                 rc = mdd_iterate_linkea(env, sobj, tobj, tname,
4465                                         mdd_object_fid(tpobj), ldata, NULL,
4466                                         handle, mdd_declare_update_link);
4467                 if (rc)
4468                         return rc;
4469         }
4470
4471         if (!S_ISDIR(attr->la_mode) || lmv) {
4472                 rc = mdo_declare_ref_del(env, sobj, handle);
4473                 if (rc)
4474                         return rc;
4475
4476                 if (S_ISDIR(attr->la_mode)) {
4477                         rc = mdo_declare_ref_del(env, sobj, handle);
4478                         if (rc)
4479                                 return rc;
4480                 }
4481
4482                 rc = mdo_declare_destroy(env, sobj, handle);
4483                 if (rc)
4484                         return rc;
4485         }
4486
4487         rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, sname, tname,
4488                                         attr, spattr, tpattr, ldata, ma,
4489                                         handle);
4490         return rc;
4491 }
4492
4493 /**
4494  * migrate dirent from \a spobj to \a tpobj.
4495  **/
4496 static int mdd_migrate_update(const struct lu_env *env,
4497                               struct mdd_object *spobj,
4498                               struct mdd_object *tpobj,
4499                               struct mdd_object *obj,
4500                               const struct lu_name *sname,
4501                               const struct lu_name *tname,
4502                               struct lu_attr *attr,
4503                               struct lu_attr *spattr,
4504                               struct lu_attr *tpattr,
4505                               struct linkea_data *ldata,
4506                               struct md_attr *ma,
4507                               struct thandle *handle)
4508 {
4509         struct mdd_thread_info *info = mdd_env_info(env);
4510         struct lu_attr *la = &info->mdi_la_for_fix;
4511         int rc;
4512
4513         ENTRY;
4514
4515         CDEBUG(D_INFO, "update "DFID" from "DFID"/"DNAME" to "DFID"/"DNAME"\n",
4516                PFID(mdd_object_fid(obj)), PFID(mdd_object_fid(spobj)),
4517                encode_fn_luname(sname), PFID(mdd_object_fid(tpobj)),
4518                encode_fn_luname(tname));
4519
4520         rc = __mdd_index_delete(env, spobj, sname->ln_name,
4521                                 S_ISDIR(attr->la_mode), handle);
4522         if (rc)
4523                 RETURN(rc);
4524
4525         rc = __mdd_index_insert(env, tpobj, mdd_object_fid(obj),
4526                                 attr->la_mode & S_IFMT,
4527                                 tname->ln_name, handle);
4528         if (rc)
4529                 RETURN(rc);
4530
4531         rc = mdd_links_write(env, obj, ldata, handle);
4532         if (rc)
4533                 RETURN(rc);
4534
4535         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
4536         la->la_valid = LA_CTIME | LA_MTIME;
4537         mdd_write_lock(env, spobj, DT_SRC_PARENT);
4538         rc = mdd_update_time(env, spobj, spattr, la, handle);
4539         mdd_write_unlock(env, spobj);
4540         if (rc)
4541                 RETURN(rc);
4542
4543         if (tpobj != spobj) {
4544                 la->la_valid = LA_CTIME | LA_MTIME;
4545                 mdd_write_lock(env, tpobj, DT_TGT_PARENT);
4546                 rc = mdd_update_time(env, tpobj, tpattr, la, handle);
4547                 mdd_write_unlock(env, tpobj);
4548                 if (rc)
4549                         RETURN(rc);
4550         }
4551
4552         RETURN(rc);
4553 }
4554
4555 /**
4556  * Migrate file/dir to target MDT.
4557  *
4558  * Create target according to \a spec, and then migrate xattrs, if it's
4559  * directory, migrate source stripes to target.
4560  *
4561  * \param[in] env       execution environment
4562  * \param[in] spobj     source parent object
4563  * \param[in] tpobj     target parent object
4564  * \param[in] sobj      source object
4565  * \param[in] tobj      target object
4566  * \param[in] lname     file name
4567  * \param[in] spattr    source parent attributes
4568  * \param[in] tpattr    target parent attributes
4569  * \param[in] attr      source attributes
4570  * \param[in] sbuf      source LMV buf
4571  * \param[in] spec      migrate create spec
4572  * \param[in] hint      target creation hint
4573  * \param[in] handle    tranasction handle
4574  *
4575  * \retval      0 on success
4576  * \retval      -errno on failure
4577  **/
4578 static int mdd_migrate_create(const struct lu_env *env,
4579                               struct mdd_object *spobj,
4580                               struct mdd_object *tpobj,
4581                               struct mdd_object *sobj,
4582                               struct mdd_object *tobj,
4583                               const struct lu_name *sname,
4584                               const struct lu_name *tname,
4585                               struct lu_attr *spattr,
4586                               struct lu_attr *tpattr,
4587                               struct lu_attr *attr,
4588                               const struct lu_buf *sbuf,
4589                               struct linkea_data *ldata,
4590                               struct mdd_xattrs *xattrs,
4591                               struct md_attr *ma,
4592                               struct md_op_spec *spec,
4593                               struct dt_allocation_hint *hint,
4594                               struct thandle *handle)
4595 {
4596         int rc;
4597
4598         ENTRY;
4599
4600         /*
4601          * migrate sobj stripes to tobj if it's directory:
4602          * 1. detach stripes from sobj.
4603          * 2. attach stripes to tobj, see mdd_declare_migrate_mdt().
4604          * 3. create stripes for tobj, see lod_xattr_set_lmv().
4605          */
4606         if (S_ISDIR(attr->la_mode)) {
4607                 struct mdd_thread_info *info = mdd_env_info(env);
4608                 struct md_layout_change *mlc = &info->mdi_mlc;
4609
4610                 mlc->mlc_opc = MD_LAYOUT_DETACH;
4611
4612                 mdd_write_lock(env, sobj, DT_SRC_PARENT);
4613                 rc = mdo_layout_change(env, sobj, mlc, handle);
4614                 mdd_write_unlock(env, sobj);
4615                 if (rc)
4616                         RETURN(rc);
4617         }
4618
4619         /* don't set nlink from sobj */
4620         attr->la_valid &= ~LA_NLINK;
4621
4622         rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
4623                                hint, handle, false);
4624         if (rc)
4625                 RETURN(rc);
4626
4627         mdd_write_lock(env, tobj, DT_TGT_CHILD);
4628         rc = mdd_foreach_xattr(env, tobj, xattrs, handle, mdo_xattr_set);
4629         mdd_write_unlock(env, tobj);
4630         if (rc)
4631                 RETURN(rc);
4632
4633         /* for regular file, update OST objects XATTR_NAME_FID */
4634         if (S_ISREG(attr->la_mode)) {
4635                 struct lu_buf fid_buf;
4636
4637                 /* target may be remote, update PFID via sobj. */
4638                 fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
4639                 fid_buf.lb_len = sizeof(struct lu_fid);
4640                 rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
4641                                    handle);
4642                 if (rc)
4643                         RETURN(rc);
4644
4645                 /* delete LOV to avoid deleting OST objs when destroying sobj */
4646                 mdd_write_lock(env, sobj, DT_SRC_CHILD);
4647                 rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4648                 mdd_write_unlock(env, sobj);
4649                 /* O_DELAY_CREATE file may not have LOV, ignore -ENODATA */
4650                 if (rc && rc != -ENODATA)
4651                         RETURN(rc);
4652                 rc = 0;
4653         }
4654
4655         /* update links FID */
4656         if (!S_ISDIR(attr->la_mode)) {
4657                 rc = mdd_iterate_linkea(env, sobj, tobj, tname,
4658                                         mdd_object_fid(tpobj), ldata,
4659                                         NULL, handle, mdd_update_link);
4660                 if (rc)
4661                         RETURN(rc);
4662         }
4663
4664         /* don't destroy sobj if it's plain directory */
4665         if (!S_ISDIR(attr->la_mode) || sbuf->lb_buf) {
4666                 mdd_write_lock(env, sobj, DT_SRC_CHILD);
4667                 rc = mdo_ref_del(env, sobj, handle);
4668                 if (!rc) {
4669                         if (S_ISDIR(attr->la_mode))
4670                                 rc = mdo_ref_del(env, sobj, handle);
4671                         if (!rc)
4672                                 rc = mdo_destroy(env, sobj, handle);
4673                 }
4674                 mdd_write_unlock(env, sobj);
4675                 if (rc)
4676                         RETURN(rc);
4677         }
4678
4679         rc = mdd_migrate_update(env, spobj, tpobj, tobj, sname, tname, attr,
4680                                 spattr, tpattr, ldata, ma, handle);
4681
4682         RETURN(rc);
4683 }
4684
4685 /* NB: if user issued different migrate command, we can't adjust it silently
4686  * here, because this command will decide target MDT in subdir migration in
4687  * LMV.
4688  */
4689 static int mdd_migrate_cmd_check(const struct lu_env *env, struct mdd_device *mdd,
4690                                  struct mdd_object *sobj,
4691                                  const struct lmv_mds_md_v1 *lmv,
4692                                  const struct lmv_user_md_v1 *lum,
4693                                  size_t lum_len, const struct lu_name *lname)
4694 {
4695         struct mdd_thread_info *info = mdd_env_info(env);
4696         __u32 lum_stripe_count = lum->lum_stripe_count;
4697         __u32 lum_hash_type = lum->lum_hash_type &
4698                               cpu_to_le32(LMV_HASH_TYPE_MASK);
4699         struct md_layout_change *mlc = &info->mdi_mlc;
4700         __u32 lmv_hash_type;
4701         int rc = 0;
4702         ENTRY;
4703
4704         if (lmv && !lmv_is_sane(lmv))
4705                 RETURN(-EBADF);
4706
4707         /* If stripe_count unspecified, set to 1 */
4708         if (!lum_stripe_count)
4709                 lum_stripe_count = cpu_to_le32(1);
4710
4711         /* Easy check for plain and single-striped dirs
4712          * if the object is on the target MDT already
4713          */
4714         if (!lmv || lmv->lmv_stripe_count == cpu_to_le32(1)) {
4715                 struct seq_server_site  *ss = mdd_seq_site(mdd);
4716                 struct lu_seq_range range = { 0 };
4717
4718                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
4719                 rc = fld_server_lookup(env, ss->ss_server_fld,
4720                                 fid_seq(mdd_object_fid(sobj)), &range);
4721                 if (rc)
4722                         RETURN(rc);
4723
4724                 if (lum_stripe_count == cpu_to_le32(1) &&
4725                     le32_to_cpu(lum->lum_stripe_offset) == range.lsr_index)
4726                         RETURN(-EALREADY);
4727                 RETURN(0);
4728         }
4729
4730         lmv_hash_type = lmv->lmv_hash_type & cpu_to_le32(LMV_HASH_TYPE_MASK);
4731
4732         if (lmv_is_migrating(lmv)) {
4733                 if (lum_stripe_count != lmv->lmv_migrate_offset ||
4734                     lum->lum_stripe_offset != lmv->lmv_master_mdt_index ||
4735                     (lum_hash_type && lum_hash_type != lmv_hash_type)) {
4736                         rc = -EPERM;
4737                 }
4738         } else {
4739                 /* check at top level if the target layout already applied */
4740                 if ((lum_hash_type && lum_hash_type != lmv_hash_type) ||
4741                     lum->lum_stripe_offset != lmv->lmv_master_mdt_index ||
4742                     lum_stripe_count != lmv->lmv_stripe_count)
4743                         RETURN(0);
4744         }
4745
4746         if (rc == 0) {
4747                 mlc->mlc_buf.lb_buf = (void*)lum;
4748                 mlc->mlc_buf.lb_len = lum_len;
4749                 rc = mo_layout_check(env, &sobj->mod_obj, mlc);
4750         }
4751
4752         if (rc == -EPERM) {
4753                 CERROR("%s: '"DNAME"' migration was interrupted, run "
4754                        "'lfs migrate -m %d -c %d -H %s "DNAME"' to finish migration: rc = %d\n",
4755                        mdd2obd_dev(mdd)->obd_name, encode_fn_luname(lname),
4756                        le32_to_cpu(lmv->lmv_master_mdt_index),
4757                        le32_to_cpu(lmv->lmv_migrate_offset),
4758                        mdt_hash_name[le32_to_cpu(lmv_hash_type)],
4759                        encode_fn_luname(lname), rc);
4760         }
4761
4762         RETURN(rc);
4763 }
4764
4765 /**
4766  * Internal function to migrate directory or file between MDTs.
4767  *
4768  * migrate source to target in following steps:
4769  *   1. create target, append source stripes after target's if it's directory,
4770  *      migrate xattrs and update fid of source links.
4771  *   2. update namespace: migrate dirent from source parent to target parent,
4772  *      update file linkea, and destroy source if it's not needed any more.
4773  *
4774  * \param[in] env       execution environment
4775  * \param[in] spobj     source parent object
4776  * \param[in] tpobj     target parent object
4777  * \param[in] sobj      source object
4778  * \param[in] tobj      target object
4779  * \param[in] sname     source file name
4780  * \param[in] tname     target file name
4781  * \param[in] spec      target creation spec
4782  * \param[in] ma        used to update \a pobj mtime and ctime
4783  *
4784  * \retval              0 on success
4785  * \retval              -errno on failure
4786  */
4787 static int mdd_migrate_object(const struct lu_env *env,
4788                               struct mdd_object *spobj,
4789                               struct mdd_object *tpobj,
4790                               struct mdd_object *sobj,
4791                               struct mdd_object *tobj,
4792                               const struct lu_name *sname,
4793                               const struct lu_name *tname,
4794                               struct md_op_spec *spec,
4795                               struct md_attr *ma)
4796 {
4797         struct mdd_thread_info *info = mdd_env_info(env);
4798         struct mdd_device *mdd = mdo2mdd(&spobj->mod_obj);
4799         struct lu_attr *spattr = &info->mdi_pattr;
4800         struct lu_attr *tpattr = &info->mdi_tpattr;
4801         struct lu_attr *attr = &info->mdi_cattr;
4802         struct linkea_data *ldata = &info->mdi_link_data;
4803         struct dt_allocation_hint *hint = &info->mdi_hint;
4804         struct lu_buf sbuf = { NULL };
4805         struct mdd_xattrs xattrs;
4806         struct lmv_mds_md_v1 *lmv;
4807         struct thandle *handle;
4808         int retried = 0;
4809         int rc;
4810
4811         ENTRY;
4812
4813         CDEBUG(D_INFO, "migrate "DNAME" from "DFID"/"DFID" to "DFID"/"DFID"\n",
4814                encode_fn_luname(sname), PFID(mdd_object_fid(spobj)),
4815                PFID(mdd_object_fid(sobj)), PFID(mdd_object_fid(tpobj)),
4816                PFID(mdd_object_fid(tobj)));
4817
4818 retry:
4819         rc = mdd_la_get(env, sobj, attr);
4820         if (rc)
4821                 RETURN(rc);
4822
4823         rc = mdd_la_get(env, spobj, spattr);
4824         if (rc)
4825                 RETURN(rc);
4826
4827         rc = mdd_la_get(env, tpobj, tpattr);
4828         if (rc)
4829                 RETURN(rc);
4830
4831         rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
4832                                       spattr, tpattr, attr,
4833                                       spec->sp_migrate_nsonly);
4834         if (rc == -EBUSY && !spec->sp_migrate_nsonly) {
4835                 spec->sp_migrate_nsonly = 1;
4836                 CWARN("%s: "DFID"/%s is open, migrate only dentry\n",
4837                       mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(spobj)),
4838                       sname->ln_name);
4839                 rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj,
4840                                               tobj, spattr, tpattr, attr,
4841                                               spec->sp_migrate_nsonly);
4842         }
4843         if (rc)
4844                 RETURN(rc);
4845
4846         mdd_xattrs_init(&xattrs);
4847
4848         if (S_ISDIR(attr->la_mode) && !spec->sp_migrate_nsonly) {
4849                 struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
4850                 size_t lum_len = spec->u.sp_ea.eadatalen;
4851
4852                 LASSERT(lum);
4853
4854                 /* if user use default value '0' for stripe_count, we need to
4855                  * adjust it to '1' to create a 1-stripe directory.
4856                  */
4857                 if (lum->lum_stripe_count == 0)
4858                         lum->lum_stripe_count = cpu_to_le32(1);
4859
4860                 rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
4861                 if (rc && rc != -ENODATA)
4862                         GOTO(out, rc);
4863
4864                 lmv = sbuf.lb_buf;
4865                 rc = mdd_migrate_cmd_check(env, mdd, sobj, lmv, lum,
4866                                            lum_len, sname);
4867                 if (rc)
4868                         GOTO(out, rc);
4869         } else if (!S_ISDIR(attr->la_mode)) {
4870                 /* update namespace only if @sobj is on MDT where @tpobj is. */
4871                 if (!mdd_object_remote(tpobj) && !mdd_object_remote(sobj))
4872                         spec->sp_migrate_nsonly = true;
4873
4874                 if (S_ISLNK(attr->la_mode)) {
4875                         lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
4876                         if (!sbuf.lb_buf)
4877                                 GOTO(out, rc = -ENOMEM);
4878
4879                         rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
4880                         if (rc <= 0) {
4881                                 rc = rc ?: -EFAULT;
4882                                 CERROR("%s: "DFID" readlink failed: rc = %d\n",
4883                                        mdd2obd_dev(mdd)->obd_name,
4884                                        PFID(mdd_object_fid(sobj)), rc);
4885                                 GOTO(out, rc);
4886                         }
4887                 }
4888         }
4889
4890         /* linkea needs update upon FID or parent stripe change */
4891         rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, sname,
4892                                         tname, attr, ldata);
4893         if (rc > 0)
4894                 /* update namespace only if @sobj has link on its MDT. */
4895                 spec->sp_migrate_nsonly = true;
4896         else if (rc < 0)
4897                 GOTO(out, rc);
4898
4899         /* migrate inode will migrate xattrs, prepare xattrs early to avoid
4900          * RPCs inside transaction.
4901          */
4902         if (!spec->sp_migrate_nsonly) {
4903                 rc = mdd_xattrs_migrate_prep(env, &xattrs, sobj, true, true);
4904                 if (rc)
4905                         GOTO(out, rc);
4906         }
4907
4908         handle = mdd_trans_create(env, mdd);
4909         if (IS_ERR(handle))
4910                 GOTO(out, rc = PTR_ERR(handle));
4911
4912         if (spec->sp_migrate_nsonly)
4913                 rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, sname,
4914                                                 tname, attr, spattr, tpattr,
4915                                                 ldata, ma, handle);
4916         else
4917                 rc = mdd_declare_migrate_create(env, spobj, tpobj, sobj, tobj,
4918                                                 sname, tname, spattr, tpattr,
4919                                                 attr, &sbuf, ldata, &xattrs, ma,
4920                                                 spec, hint, handle);
4921         if (rc)
4922                 GOTO(stop, rc);
4923
4924         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, tname, sname,
4925                                          handle);
4926         if (rc)
4927                 GOTO(stop, rc);
4928
4929         rc = mdd_trans_start(env, mdd, handle);
4930         if (rc)
4931                 GOTO(stop, rc);
4932
4933         if (spec->sp_migrate_nsonly)
4934                 rc = mdd_migrate_update(env, spobj, tpobj, sobj, sname, tname,
4935                                         attr, spattr, tpattr, ldata, ma,
4936                                         handle);
4937         else
4938                 rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, sname,
4939                                         tname, spattr, tpattr, attr, &sbuf,
4940                                         ldata, &xattrs, ma, spec, hint, handle);
4941         if (rc)
4942                 GOTO(stop, rc);
4943
4944         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0,
4945                                     spec->sp_migrate_nsonly ? sobj : tobj,
4946                                     spobj, spattr, mdd_object_fid(sobj),
4947                                     tpobj, tpattr, tname, sname,
4948                                     handle);
4949
4950 stop:
4951         rc = mdd_trans_stop(env, mdd, rc, handle);
4952 out:
4953         mdd_xattrs_fini(&xattrs);
4954         lu_buf_free(&sbuf);
4955
4956         /**
4957          * -EAGAIN means transaction execution phase detect the layout
4958          * has been changed by others.
4959          */
4960         if (rc == -EAGAIN && retried++ < MAX_TRANS_RETRIED)
4961                 GOTO(retry, retried);
4962
4963         RETURN(rc);
4964 }
4965
4966 /**
4967  * Migrate directory or file between MDTs.
4968  *
4969  * \param[in] env       execution environment
4970  * \param[in] md_spobj  source parent object
4971  * \param[in] md_tpobj  target parent object
4972  * \param[in] md_sobj   source object
4973  * \param[in] lname     file name
4974  * \param[in] md_tobj   target object
4975  * \param[in] spec      target creation spec
4976  * \param[in] ma        used to update \a pobj mtime and ctime
4977  *
4978  * \retval              0 on success
4979  * \retval              -errno on failure
4980  */
4981 static int mdd_migrate(const struct lu_env *env, struct md_object *md_spobj,
4982                        struct md_object *md_tpobj, struct md_object *md_sobj,
4983                        struct md_object *md_tobj, const struct lu_name *lname,
4984                        struct md_op_spec *spec, struct md_attr *ma)
4985 {
4986         return mdd_migrate_object(env, md2mdd_obj(md_spobj),
4987                                   md2mdd_obj(md_tpobj), md2mdd_obj(md_sobj),
4988                                   md2mdd_obj(md_tobj), lname, lname, spec, ma);
4989 }
4990
4991 static int mdd_declare_1sd_collapse(const struct lu_env *env,
4992                                     struct mdd_object *pobj,
4993                                     struct mdd_object *obj,
4994                                     struct mdd_object *stripe,
4995                                     struct lu_attr *attr,
4996                                     struct mdd_xattrs *xattrs,
4997                                     struct md_layout_change *mlc,
4998                                     struct lu_name *lname,
4999                                     struct thandle *handle)
5000 {
5001         int rc;
5002
5003         mlc->mlc_opc = MD_LAYOUT_DETACH;
5004         rc = mdo_declare_layout_change(env, obj, mlc, handle);
5005         if (rc)
5006                 return rc;
5007
5008         rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
5009                                       S_IFDIR, dotdot, handle);
5010         if (rc)
5011                 return rc;
5012
5013         rc = mdd_foreach_xattr(env, stripe, xattrs, handle,
5014                                mdo_declare_xattr_set);
5015         if (rc)
5016                 return rc;
5017
5018         rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
5019         if (rc)
5020                 return rc;
5021
5022         rc = mdo_declare_attr_set(env, stripe, attr, handle);
5023         if (rc)
5024                 return rc;
5025
5026         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
5027         if (rc)
5028                 return rc;
5029
5030         rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(stripe),
5031                                       attr->la_mode, lname->ln_name, handle);
5032         if (rc)
5033                 return rc;
5034
5035         rc = mdo_declare_ref_del(env, obj, handle);
5036         if (rc)
5037                 return rc;
5038
5039         rc = mdo_declare_ref_del(env, obj, handle);
5040         if (rc)
5041                 return rc;
5042
5043         rc = mdo_declare_destroy(env, obj, handle);
5044         if (rc)
5045                 return rc;
5046
5047         return rc;
5048 }
5049
5050 /* transform one-stripe directory to a plain directory */
5051 static int mdd_1sd_collapse(const struct lu_env *env,
5052                             struct mdd_object *pobj,
5053                             struct mdd_object *obj,
5054                             struct mdd_object *stripe,
5055                             struct lu_attr *attr,
5056                             struct mdd_xattrs *xattrs,
5057                             struct md_layout_change *mlc,
5058                             struct lu_name *lname,
5059                             struct thandle *handle)
5060 {
5061         int rc;
5062
5063         ENTRY;
5064
5065         /* replace 1-stripe directory with its stripe */
5066         mlc->mlc_opc = MD_LAYOUT_DETACH;
5067
5068         mdd_write_lock(env, obj, DT_SRC_PARENT);
5069         rc = mdo_layout_change(env, obj, mlc, handle);
5070         mdd_write_unlock(env, obj);
5071         if (rc)
5072                 RETURN(rc);
5073
5074         mdd_write_lock(env, pobj, DT_SRC_PARENT);
5075         mdd_write_lock(env, obj, DT_SRC_CHILD);
5076
5077         /* insert dotdot to stripe which points to parent */
5078         rc = __mdd_index_insert_only(env, stripe, mdd_object_fid(pobj),
5079                                      S_IFDIR, dotdot, handle);
5080         if (rc)
5081                 GOTO(out, rc);
5082
5083         rc = mdd_foreach_xattr(env, stripe, xattrs, handle, mdo_xattr_set);
5084         if (rc)
5085                 GOTO(out, rc);
5086
5087         /* delete LMV */
5088         rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
5089         if (rc)
5090                 GOTO(out, rc);
5091
5092         /* don't set nlink from parent */
5093         attr->la_valid &= ~LA_NLINK;
5094
5095         rc = mdo_attr_set(env, stripe, attr, handle);
5096         if (rc)
5097                 GOTO(out, rc);
5098
5099         /* delete dir name from parent */
5100         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
5101         if (rc)
5102                 GOTO(out, rc);
5103
5104         /* insert stripe to parent with dir name */
5105         rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(stripe),
5106                                      attr->la_mode, lname->ln_name, handle);
5107         if (rc)
5108                 GOTO(out, rc);
5109
5110         /* destroy dir obj */
5111         rc = mdo_ref_del(env, obj, handle);
5112         if (rc)
5113                 GOTO(out, rc);
5114
5115         rc = mdo_ref_del(env, obj, handle);
5116         if (rc)
5117                 GOTO(out, rc);
5118
5119         rc = mdo_destroy(env, obj, handle);
5120         if (rc)
5121                 GOTO(out, rc);
5122
5123         EXIT;
5124 out:
5125         mdd_write_unlock(env, obj);
5126         mdd_write_unlock(env, pobj);
5127
5128         return rc;
5129 }
5130
5131 /*
5132  * shrink directory stripes after migration/merge
5133  */
5134 int mdd_dir_layout_shrink(const struct lu_env *env,
5135                           struct md_object *md_obj,
5136                           struct md_layout_change *mlc)
5137 {
5138         struct mdd_device *mdd = mdo2mdd(md_obj);
5139         struct mdd_thread_info *info = mdd_env_info(env);
5140         struct mdd_object *obj = md2mdd_obj(md_obj);
5141         struct mdd_object *pobj = NULL;
5142         struct mdd_object *stripe = NULL;
5143         struct lu_attr *attr = &info->mdi_pattr;
5144         struct lu_fid *fid = &info->mdi_fid2;
5145         struct lu_name lname = { NULL };
5146         struct lu_buf lmv_buf = { NULL };
5147         struct mdd_xattrs xattrs;
5148         struct lmv_mds_md_v1 *lmv;
5149         struct lmv_user_md *lmu;
5150         struct thandle *handle;
5151         int rc;
5152
5153         ENTRY;
5154
5155         rc = mdd_la_get(env, obj, attr);
5156         if (rc)
5157                 RETURN(rc);
5158
5159         if (!S_ISDIR(attr->la_mode))
5160                 RETURN(-ENOTDIR);
5161
5162         rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
5163         if (rc < 0)
5164                 RETURN(rc);
5165
5166         lmv = lmv_buf.lb_buf;
5167         if (!lmv_is_sane(lmv))
5168                 GOTO(out_lmv, rc = -EBADF);
5169
5170         lmu = mlc->mlc_buf.lb_buf;
5171
5172         /* adjust the default value '0' to '1' */
5173         if (lmu->lum_stripe_count == 0)
5174                 lmu->lum_stripe_count = cpu_to_le32(1);
5175
5176         /* these were checked in MDT */
5177         LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
5178                 le32_to_cpu(lmv->lmv_stripe_count));
5179         LASSERT(!lmv_is_splitting(lmv));
5180         LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
5181
5182         mdd_xattrs_init(&xattrs);
5183
5184         /* if dir stripe count will be shrunk to 1, it needs to be transformed
5185          * to a plain dir, which will cause FID change and namespace update.
5186          */
5187         if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
5188                 struct linkea_data *ldata = &info->mdi_link_data;
5189                 char *filename = info->mdi_name;
5190
5191                 rc = mdd_links_read(env, obj, ldata);
5192                 if (rc)
5193                         GOTO(out, rc);
5194
5195                 if (ldata->ld_leh->leh_reccount > 1)
5196                         GOTO(out, rc = -EINVAL);
5197
5198                 linkea_first_entry(ldata);
5199                 if (!ldata->ld_lee)
5200                         GOTO(out, rc = -ENODATA);
5201
5202                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
5203                                     fid);
5204
5205                 /* Note: lname might miss \0 at the end */
5206                 snprintf(filename, sizeof(info->mdi_name), DNAME,
5207                          lname.ln_namelen, lname.ln_name);
5208                 lname.ln_name = filename;
5209
5210                 pobj = mdd_object_find(env, mdd, fid);
5211                 if (IS_ERR(pobj)) {
5212                         rc = PTR_ERR(pobj);
5213                         pobj = NULL;
5214                         GOTO(out, rc);
5215                 }
5216
5217                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
5218
5219                 stripe = mdd_object_find(env, mdd, fid);
5220                 if (IS_ERR(stripe)) {
5221                         mdd_object_put(env, pobj);
5222                         pobj = NULL;
5223                         GOTO(out, rc = PTR_ERR(stripe));
5224                 }
5225
5226                 if (!lmv_is_fixed(lmv))
5227                         rc = mdd_xattrs_migrate_prep(env, &xattrs, obj, false,
5228                                                      false);
5229         }
5230
5231         handle = mdd_trans_create(env, mdd);
5232         if (IS_ERR(handle))
5233                 GOTO(out, rc = PTR_ERR(handle));
5234
5235         mlc->mlc_opc = MD_LAYOUT_SHRINK;
5236         rc = mdo_declare_layout_change(env, obj, mlc, handle);
5237         if (rc)
5238                 GOTO(stop_trans, rc);
5239
5240         if (le32_to_cpu(lmu->lum_stripe_count) == 1 && !lmv_is_fixed(lmv)) {
5241                 rc = mdd_declare_1sd_collapse(env, pobj, obj, stripe, attr,
5242                                               &xattrs, mlc, &lname, handle);
5243                 if (rc)
5244                         GOTO(stop_trans, rc);
5245         }
5246
5247         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
5248                                          handle);
5249         if (rc)
5250                 GOTO(stop_trans, rc);
5251
5252         rc = mdd_trans_start(env, mdd, handle);
5253         if (rc)
5254                 GOTO(stop_trans, rc);
5255
5256         mdd_write_lock(env, obj, DT_SRC_PARENT);
5257         mlc->mlc_opc = MD_LAYOUT_SHRINK;
5258         rc = mdo_layout_change(env, obj, mlc, handle);
5259         mdd_write_unlock(env, obj);
5260         if (rc)
5261                 GOTO(stop_trans, rc);
5262
5263         if (le32_to_cpu(lmu->lum_stripe_count) == 1 && !lmv_is_fixed(lmv)) {
5264                 rc = mdd_1sd_collapse(env, pobj, obj, stripe, attr, &xattrs,
5265                                       mlc, &lname, handle);
5266                 if (rc)
5267                         GOTO(stop_trans, rc);
5268         }
5269
5270         rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
5271                                             XATTR_NAME_LMV, handle);
5272         GOTO(stop_trans, rc);
5273
5274 stop_trans:
5275         rc = mdd_trans_stop(env, mdd, rc, handle);
5276 out:
5277         mdd_xattrs_fini(&xattrs);
5278         if (pobj) {
5279                 mdd_object_put(env, stripe);
5280                 mdd_object_put(env, pobj);
5281         }
5282 out_lmv:
5283         lu_buf_free(&lmv_buf);
5284         return rc;
5285 }
5286
5287 static int mdd_dir_declare_split_plain(const struct lu_env *env,
5288                                         struct mdd_device *mdd,
5289                                         struct mdd_object *pobj,
5290                                         struct mdd_object *obj,
5291                                         struct mdd_object *tobj,
5292                                         struct mdd_xattrs *xattrs,
5293                                         struct md_layout_change *mlc,
5294                                         struct dt_allocation_hint *hint,
5295                                         struct thandle *handle)
5296 {
5297         struct mdd_thread_info *info = mdd_env_info(env);
5298         const struct lu_name *lname = mlc->mlc_name;
5299         struct lu_attr *la = &info->mdi_la_for_fix;
5300         struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
5301         struct linkea_data *ldata = &info->mdi_link_data;
5302         struct lmv_mds_md_v1 *lmv;
5303         __u32 count;
5304         int rc;
5305
5306         mlc->mlc_opc = MD_LAYOUT_DETACH;
5307         rc = mdo_declare_layout_change(env, obj, mlc, handle);
5308         if (rc)
5309                 return rc;
5310
5311         memset(ldata, 0, sizeof(*ldata));
5312         rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
5313                                 lname, 1, 0, ldata);
5314         if (rc)
5315                 return rc;
5316
5317         count = lum->lum_stripe_count;
5318         lum->lum_stripe_count = 0;
5319         /* don't set default LMV since it will become a striped dir  */
5320         lum->lum_max_inherit = LMV_INHERIT_NONE;
5321         mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
5322                              hint);
5323         rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
5324                                 lname, mlc->mlc_attr, handle, mlc->mlc_spec,
5325                                 ldata, NULL, NULL, NULL, hint);
5326         if (rc)
5327                 return rc;
5328
5329         /* tobj mode will be used in lod_declare_xattr_set(), but it's not
5330          * created yet.
5331          */
5332         tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
5333
5334         lmv = (typeof(lmv))info->mdi_key;
5335         memset(lmv, 0, sizeof(*lmv));
5336         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
5337         lmv->lmv_stripe_count = cpu_to_le32(1);
5338         lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
5339         fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
5340
5341         mlc->mlc_opc = MD_LAYOUT_ATTACH;
5342         mlc->mlc_buf.lb_buf = lmv;
5343         mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
5344         rc = mdo_declare_layout_change(env, tobj, mlc, handle);
5345         if (rc)
5346                 return rc;
5347
5348         rc = mdd_foreach_xattr(env, tobj, xattrs, handle,
5349                                mdo_declare_xattr_set);
5350         if (rc)
5351                 return rc;
5352
5353         lum->lum_stripe_count = count;
5354         mlc->mlc_opc = MD_LAYOUT_SPLIT;
5355         rc = mdo_declare_layout_change(env, tobj, mlc, handle);
5356         if (rc)
5357                 return rc;
5358
5359         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
5360         if (rc)
5361                 return rc;
5362
5363         rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
5364                                       S_IFDIR, lname->ln_name, handle);
5365         if (rc)
5366                 return rc;
5367
5368         la->la_valid = LA_CTIME | LA_MTIME;
5369         rc = mdo_declare_attr_set(env, obj, la, handle);
5370         if (rc)
5371                 return rc;
5372
5373         rc = mdo_declare_attr_set(env, pobj, la, handle);
5374         if (rc)
5375                 return rc;
5376
5377         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
5378                                          handle);
5379         return rc;
5380 }
5381
5382 /**
5383  * plain directory split:
5384  * 1. create \a tobj as plain directory.
5385  * 2. append \a obj as first stripe of \a tobj.
5386  * 3. migrate xattrs from \a obj to \a tobj.
5387  * 4. split \a tobj to specific stripe count.
5388  */
5389 static int mdd_dir_split_plain(const struct lu_env *env,
5390                                 struct mdd_device *mdd,
5391                                 struct mdd_object *pobj,
5392                                 struct mdd_object *obj,
5393                                 struct mdd_object *tobj,
5394                                 struct mdd_xattrs *xattrs,
5395                                 struct md_layout_change *mlc,
5396                                 struct dt_allocation_hint *hint,
5397                                 struct thandle *handle)
5398 {
5399         struct mdd_thread_info *info = mdd_env_info(env);
5400         struct lu_attr *pattr = &info->mdi_pattr;
5401         struct lu_attr *la = &info->mdi_la_for_fix;
5402         const struct lu_name *lname = mlc->mlc_name;
5403         struct linkea_data *ldata = &info->mdi_link_data;
5404         int rc;
5405
5406         ENTRY;
5407
5408         /* copy linkea out and set on target later */
5409         rc = mdd_links_read(env, obj, ldata);
5410         if (rc)
5411                 RETURN(rc);
5412
5413         mlc->mlc_opc = MD_LAYOUT_DETACH;
5414         rc = mdo_layout_change(env, obj, mlc, handle);
5415         if (rc)
5416                 RETURN(rc);
5417
5418         /* don't set nlink from obj */
5419         mlc->mlc_attr->la_valid &= ~LA_NLINK;
5420
5421         rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
5422                                NULL, NULL, NULL, hint, handle, false);
5423         if (rc)
5424                 RETURN(rc);
5425
5426         rc = mdd_foreach_xattr(env, tobj, xattrs, handle, mdo_xattr_set);
5427         if (rc)
5428                 RETURN(rc);
5429
5430         rc = mdd_links_write(env, tobj, ldata, handle);
5431         if (rc)
5432                 RETURN(rc);
5433
5434         rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
5435         if (rc)
5436                 RETURN(rc);
5437
5438         rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
5439                                 lname->ln_name, handle);
5440         if (rc)
5441                 RETURN(rc);
5442
5443         la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
5444         la->la_valid = LA_CTIME | LA_MTIME;
5445
5446         mdd_write_lock(env, obj, DT_SRC_CHILD);
5447         rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
5448         mdd_write_unlock(env, obj);
5449         if (rc)
5450                 RETURN(rc);
5451
5452         rc = mdd_la_get(env, pobj, pattr);
5453         if (rc)
5454                 RETURN(rc);
5455
5456         la->la_valid = LA_CTIME | LA_MTIME;
5457
5458         mdd_write_lock(env, pobj, DT_SRC_PARENT);
5459         rc = mdd_update_time(env, pobj, pattr, la, handle);
5460         mdd_write_unlock(env, pobj);
5461         if (rc)
5462                 RETURN(rc);
5463
5464         /* FID changes, record it as CL_MIGRATE */
5465         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
5466                                     pobj, pattr, mdd_object_fid(obj),
5467                                     pobj, pattr, lname, lname, handle);
5468         RETURN(rc);
5469 }
5470
5471 int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
5472                          struct md_layout_change *mlc)
5473 {
5474         struct mdd_thread_info *info = mdd_env_info(env);
5475         struct mdd_device *mdd = mdo2mdd(o);
5476         struct mdd_object *obj = md2mdd_obj(o);
5477         struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
5478         struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
5479         struct dt_allocation_hint *hint = &info->mdi_hint;
5480         bool is_plain = false;
5481         struct mdd_xattrs xattrs;
5482         struct thandle *handle;
5483         int rc;
5484
5485         ENTRY;
5486
5487         LASSERT(S_ISDIR(mdd_object_type(obj)));
5488
5489         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
5490         if (rc == -ENODATA)
5491                 is_plain = true;
5492         else if (rc < 0)
5493                 RETURN(rc);
5494
5495         mdd_xattrs_init(&xattrs);
5496         if (is_plain)
5497                 rc = mdd_xattrs_migrate_prep(env, &xattrs, obj, true, true);
5498
5499         handle = mdd_trans_create(env, mdd);
5500         if (IS_ERR(handle))
5501                 GOTO(out, rc = PTR_ERR(handle));
5502
5503         if (is_plain) {
5504                 rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj,
5505                                                  &xattrs, mlc, hint, handle);
5506         } else {
5507                 mlc->mlc_opc = MD_LAYOUT_SPLIT;
5508                 rc = mdo_declare_layout_change(env, obj, mlc, handle);
5509                 if (rc)
5510                         GOTO(stop_trans, rc);
5511
5512                 rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
5513                                                  NULL, handle);
5514         }
5515         if (rc)
5516                 GOTO(stop_trans, rc);
5517
5518         rc = mdd_trans_start(env, mdd, handle);
5519         if (rc)
5520                 GOTO(stop_trans, rc);
5521
5522         if (is_plain) {
5523                 rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, &xattrs,
5524                                          mlc, hint, handle);
5525         } else {
5526                 struct lu_buf *buf = &info->mdi_buf[0];
5527
5528                 buf->lb_buf = mlc->mlc_spec->u.sp_ea.eadata;
5529                 buf->lb_len = mlc->mlc_spec->u.sp_ea.eadatalen;
5530
5531                 mdd_write_lock(env, obj, DT_TGT_CHILD);
5532                 rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LMV,
5533                                    LU_XATTR_CREATE, handle);
5534                 mdd_write_unlock(env, obj);
5535                 if (rc)
5536                         GOTO(stop_trans, rc);
5537
5538                 rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
5539                                                     XATTR_NAME_LMV, handle);
5540         }
5541         if (rc)
5542                 GOTO(stop_trans, rc);
5543
5544         EXIT;
5545
5546 stop_trans:
5547         rc = mdd_trans_stop(env, mdd, rc, handle);
5548 out:
5549         mdd_xattrs_fini(&xattrs);
5550
5551         return rc;
5552 }
5553
5554 const struct md_dir_operations mdd_dir_ops = {
5555         .mdo_is_subdir     = mdd_is_subdir,
5556         .mdo_lookup        = mdd_lookup,
5557         .mdo_create        = mdd_create,
5558         .mdo_rename        = mdd_rename,
5559         .mdo_link          = mdd_link,
5560         .mdo_unlink        = mdd_unlink,
5561         .mdo_create_data   = mdd_create_data,
5562         .mdo_migrate       = mdd_migrate,
5563 };