Whamcloud - gitweb
LU-4684 migrate: replace PFID via source
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46
47 #include "mdd_internal.h"
48
49 static const char dot[] = ".";
50 static const char dotdot[] = "..";
51
52 static struct lu_name lname_dotdot = {
53         .ln_name        = (char *) dotdot,
54         .ln_namelen     = sizeof(dotdot) - 1,
55 };
56
57 static inline int
58 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
59 {
60         if (!lu_name_is_valid(ln))
61                 return -EINVAL;
62         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
63                 return -ENAMETOOLONG;
64         else
65                 return 0;
66 }
67
68 /* Get FID from name and parent */
69 static int
70 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
71              const struct lu_attr *pattr, const struct lu_name *lname,
72              struct lu_fid* fid, int mask)
73 {
74         const char *name                = lname->ln_name;
75         const struct dt_key *key        = (const struct dt_key *)name;
76         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
77         struct mdd_device *m            = mdo2mdd(pobj);
78         struct dt_object *dir           = mdd_object_child(mdd_obj);
79         int rc;
80         ENTRY;
81
82         if (unlikely(mdd_is_dead_obj(mdd_obj)))
83                 RETURN(-ESTALE);
84
85         if (!mdd_object_exists(mdd_obj))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         }
92
93         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
94                                             MOR_TGT_PARENT);
95         if (rc)
96                 RETURN(rc);
97
98         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
99                    dt_try_as_dir(env, dir)))
100                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
101         else
102                 rc = -ENOTDIR;
103
104         RETURN(rc);
105 }
106
107 int mdd_lookup(const struct lu_env *env,
108                struct md_object *pobj, const struct lu_name *lname,
109                struct lu_fid *fid, struct md_op_spec *spec)
110 {
111         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
112         int rc;
113         ENTRY;
114
115         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
116         if (rc != 0)
117                 RETURN(rc);
118
119         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
120                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
121         RETURN(rc);
122 }
123
124 /** Read the link EA into a temp buffer.
125  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
126  * A pointer to the buffer is stored in \a ldata::ld_buf.
127  *
128  * \retval 0 or error
129  */
130 static int __mdd_links_read(const struct lu_env *env,
131                             struct mdd_object *mdd_obj,
132                             struct linkea_data *ldata)
133 {
134         int rc;
135
136         if (!mdd_object_exists(mdd_obj))
137                 return -ENODATA;
138
139         /* First try a small buf */
140         LASSERT(env != NULL);
141         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
142                                                PAGE_SIZE);
143         if (ldata->ld_buf->lb_buf == NULL)
144                 return -ENOMEM;
145
146         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
147         if (rc == -ERANGE) {
148                 /* Buf was too small, figure out what we need. */
149                 lu_buf_free(ldata->ld_buf);
150                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
151                                    XATTR_NAME_LINK);
152                 if (rc < 0)
153                         return rc;
154                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
155                 if (ldata->ld_buf->lb_buf == NULL)
156                         return -ENOMEM;
157                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
158                                   XATTR_NAME_LINK);
159         }
160         if (rc < 0) {
161                 lu_buf_free(ldata->ld_buf);
162                 ldata->ld_buf = NULL;
163                 return rc;
164         }
165
166         return linkea_init(ldata);
167 }
168
169 static int mdd_links_read(const struct lu_env *env,
170                           struct mdd_object *mdd_obj,
171                           struct linkea_data *ldata)
172 {
173         int rc;
174
175         rc = __mdd_links_read(env, mdd_obj, ldata);
176         if (!rc)
177                 rc = linkea_init(ldata);
178
179         return rc;
180 }
181
182 static int mdd_links_read_with_rec(const struct lu_env *env,
183                                    struct mdd_object *mdd_obj,
184                                    struct linkea_data *ldata)
185 {
186         int rc;
187
188         rc = __mdd_links_read(env, mdd_obj, ldata);
189         if (!rc)
190                 rc = linkea_init_with_rec(ldata);
191
192         return rc;
193 }
194
195 /**
196  * Get parent FID of the directory
197  *
198  * Read parent FID from linkEA, if that fails, then do lookup
199  * dotdot to get the parent FID.
200  *
201  * \param[in] env       execution environment
202  * \param[in] obj       object from which to find the parent FID
203  * \param[in] attr      attribute of the object
204  * \param[out] fid      fid to get the parent FID
205  *
206  * \retval              0 if getting the parent FID succeeds.
207  * \retval              negative errno if getting the parent FID fails.
208  **/
209 static inline int mdd_parent_fid(const struct lu_env *env,
210                                  struct mdd_object *obj,
211                                  const struct lu_attr *attr,
212                                  struct lu_fid *fid)
213 {
214         struct mdd_thread_info  *info = mdd_env_info(env);
215         struct linkea_data      ldata = { NULL };
216         struct lu_buf           *buf = &info->mti_link_buf;
217         struct lu_name          lname;
218         int                     rc = 0;
219
220         ENTRY;
221
222         LASSERT(S_ISDIR(mdd_object_type(obj)));
223
224         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
225         if (buf->lb_buf == NULL)
226                 GOTO(lookup, rc = 0);
227
228         ldata.ld_buf = buf;
229         rc = mdd_links_read_with_rec(env, obj, &ldata);
230         if (rc != 0)
231                 GOTO(lookup, rc);
232
233         LASSERT(ldata.ld_leh != NULL);
234         /* Directory should only have 1 parent */
235         if (ldata.ld_leh->leh_reccount > 1)
236                 GOTO(lookup, rc);
237
238         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
239
240         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
241         if (likely(fid_is_sane(fid)))
242                 RETURN(0);
243 lookup:
244         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
245         RETURN(rc);
246 }
247
248 /*
249  * For root fid use special function, which does not compare version component
250  * of fid. Version component is different for root fids on all MDTs.
251  */
252 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
253 {
254         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
255                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
256 }
257
258 /*
259  * return 1: if \a tfid is the fid of the ancestor of \a mo;
260  * return 0: if not;
261  * otherwise: values < 0, errors.
262  */
263 static int mdd_is_parent(const struct lu_env *env,
264                         struct mdd_device *mdd,
265                         struct mdd_object *mo,
266                         const struct lu_attr *attr,
267                         const struct lu_fid *tfid)
268 {
269         struct mdd_object *mp;
270         struct lu_fid *pfid;
271         int rc;
272
273         LASSERT(!lu_fid_eq(mdo2fid(mo), tfid));
274         pfid = &mdd_env_info(env)->mti_fid;
275
276         if (mdd_is_root(mdd, mdo2fid(mo)))
277                 return 0;
278
279         if (mdd_is_root(mdd, tfid))
280                 return 1;
281
282         rc = mdd_parent_fid(env, mo, attr, pfid);
283         if (rc)
284                 return rc;
285
286         while (1) {
287                 if (lu_fid_eq(pfid, tfid))
288                         return 1;
289
290                 if (mdd_is_root(mdd, pfid))
291                         return 0;
292
293                 mp = mdd_object_find(env, mdd, pfid);
294                 if (IS_ERR(mp))
295                         return PTR_ERR(mp);
296
297                 if (!mdd_object_exists(mp)) {
298                         mdd_object_put(env, mp);
299                         return -ENOENT;
300                 }
301
302                 rc = mdd_parent_fid(env, mp, attr, pfid);
303                 mdd_object_put(env, mp);
304                 if (rc)
305                         return rc;
306         }
307
308         return 0;
309 }
310
311 /*
312  * No permission check is needed.
313  *
314  * returns 1: if fid is ancestor of @mo;
315  * returns 0: if fid is not an ancestor of @mo;
316  * returns < 0: if error
317  */
318 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
319                   const struct lu_fid *fid)
320 {
321         struct mdd_device *mdd = mdo2mdd(mo);
322         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
323         int rc;
324         ENTRY;
325
326         if (!mdd_object_exists(md2mdd_obj(mo)))
327                 RETURN(-ENOENT);
328
329         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
330                 RETURN(-ENOTDIR);
331
332         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
333         if (rc != 0)
334                 RETURN(rc);
335
336         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
337         RETURN(rc);
338 }
339
340 /*
341  * Check that @dir contains no entries except (possibly) dot and dotdot.
342  *
343  * Returns:
344  *
345  *             0        empty
346  *      -ENOTDIR        not a directory object
347  *    -ENOTEMPTY        not empty
348  *           -ve        other error
349  *
350  */
351 static int mdd_dir_is_empty(const struct lu_env *env,
352                             struct mdd_object *dir)
353 {
354         struct dt_it     *it;
355         struct dt_object *obj;
356         const struct dt_it_ops *iops;
357         int result;
358         ENTRY;
359
360         obj = mdd_object_child(dir);
361         if (!dt_try_as_dir(env, obj))
362                 RETURN(-ENOTDIR);
363
364         iops = &obj->do_index_ops->dio_it;
365         it = iops->init(env, obj, LUDA_64BITHASH);
366         if (!IS_ERR(it)) {
367                 result = iops->get(env, it, (const struct dt_key *)"");
368                 if (result > 0) {
369                         int i;
370                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
371                                 result = iops->next(env, it);
372                         if (result == 0)
373                                 result = -ENOTEMPTY;
374                         else if (result == 1)
375                                 result = 0;
376                 } else if (result == 0)
377                         /*
378                          * Huh? Index contains no zero key?
379                          */
380                         result = -EIO;
381
382                 iops->put(env, it);
383                 iops->fini(env, it);
384         } else
385                 result = PTR_ERR(it);
386         RETURN(result);
387 }
388
389 /**
390  * Determine if the target object can be hard linked, and right now it only
391  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
392  * directory nlink count might exceed the maximum link count(see
393  * osd_object_ref_add), so it only check nlink for non-directories.
394  *
395  * \param[in] env       thread environment
396  * \param[in] obj       object being linked to
397  * \param[in] la        attributes of \a obj
398  *
399  * \retval              0 if \a obj can be hard linked
400  * \retval              negative error if \a obj is a directory or has too
401  *                      many links
402  */
403 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
404                           const struct lu_attr *la)
405 {
406         struct mdd_device *m = mdd_obj2mdd_dev(obj);
407         ENTRY;
408
409         LASSERT(la != NULL);
410
411         /* Subdir count limitation can be broken through
412          * (see osd_object_ref_add), so only check non-directory here. */
413         if (!S_ISDIR(la->la_mode) &&
414             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
415                 RETURN(-EMLINK);
416
417         RETURN(0);
418 }
419
420 /**
421  * Check whether it may create the cobj under the pobj.
422  *
423  * \param[in] env       execution environment
424  * \param[in] pobj      the parent directory
425  * \param[in] pattr     the attribute of the parent directory
426  * \param[in] cobj      the child to be created
427  * \param[in] check_perm        if check WRITE|EXEC permission for parent
428  *
429  * \retval              = 0 create the child under this dir is allowed
430  * \retval              negative errno create the child under this dir is
431  *                      not allowed
432  */
433 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
434                    const struct lu_attr *pattr, struct mdd_object *cobj,
435                    bool check_perm)
436 {
437         int rc = 0;
438         ENTRY;
439
440         if (cobj && mdd_object_exists(cobj))
441                 RETURN(-EEXIST);
442
443         if (mdd_is_dead_obj(pobj))
444                 RETURN(-ENOENT);
445
446         if (check_perm)
447                 rc = mdd_permission_internal_locked(env, pobj, pattr,
448                                                     MAY_WRITE | MAY_EXEC,
449                                                     MOR_TGT_PARENT);
450         RETURN(rc);
451 }
452
453 /*
454  * Check whether can unlink from the pobj in the case of "cobj == NULL".
455  */
456 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
457                    const struct lu_attr *pattr, const struct lu_attr *attr)
458 {
459         int rc;
460         ENTRY;
461
462         if (mdd_is_dead_obj(pobj))
463                 RETURN(-ENOENT);
464
465         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
466                 RETURN(-EPERM);
467
468         rc = mdd_permission_internal_locked(env, pobj, pattr,
469                                             MAY_WRITE | MAY_EXEC,
470                                             MOR_TGT_PARENT);
471         if (rc != 0)
472                 RETURN(rc);
473
474         if (pattr->la_flags & LUSTRE_APPEND_FL)
475                 RETURN(-EPERM);
476
477         RETURN(rc);
478 }
479
480 /*
481  * pobj == NULL is remote ops case, under such case, pobj's
482  * VTX feature has been checked already, no need check again.
483  */
484 static inline int mdd_is_sticky(const struct lu_env *env,
485                                 struct mdd_object *pobj,
486                                 const struct lu_attr *pattr,
487                                 struct mdd_object *cobj,
488                                 const struct lu_attr *cattr)
489 {
490         struct lu_ucred *uc = lu_ucred_assert(env);
491
492         if (pobj != NULL) {
493                 LASSERT(pattr != NULL);
494                 if (!(pattr->la_mode & S_ISVTX) ||
495                     (pattr->la_uid == uc->uc_fsuid))
496                         return 0;
497         }
498
499         LASSERT(cattr != NULL);
500         if (cattr->la_uid == uc->uc_fsuid)
501                 return 0;
502
503         return !md_capable(uc, CFS_CAP_FOWNER);
504 }
505
506 static int mdd_may_delete_entry(const struct lu_env *env,
507                                 struct mdd_object *pobj,
508                                 const struct lu_attr *pattr,
509                                 int check_perm)
510 {
511         ENTRY;
512
513         LASSERT(pobj != NULL);
514         if (!mdd_object_exists(pobj))
515                 RETURN(-ENOENT);
516
517         if (mdd_is_dead_obj(pobj))
518                 RETURN(-ENOENT);
519
520         if (check_perm) {
521                 int rc;
522                 rc = mdd_permission_internal_locked(env, pobj, pattr,
523                                             MAY_WRITE | MAY_EXEC,
524                                             MOR_TGT_PARENT);
525                 if (rc)
526                         RETURN(rc);
527         }
528
529         if (pattr->la_flags & LUSTRE_APPEND_FL)
530                 RETURN(-EPERM);
531
532         RETURN(0);
533 }
534
535 /*
536  * Check whether it may delete the cobj from the pobj.
537  * pobj maybe NULL
538  */
539 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
540                    const struct lu_attr *tpattr, struct mdd_object *tobj,
541                    const struct lu_attr *tattr, const struct lu_attr *cattr,
542                    int check_perm, int check_empty)
543 {
544         int rc = 0;
545         ENTRY;
546
547         if (tpobj) {
548                 LASSERT(tpattr != NULL);
549                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
550                 if (rc != 0)
551                         RETURN(rc);
552         }
553
554         if (tobj == NULL)
555                 RETURN(0);
556
557         if (!mdd_object_exists(tobj))
558                 RETURN(-ENOENT);
559
560         if (mdd_is_dead_obj(tobj))
561                 RETURN(-ESTALE);
562
563         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
564                 RETURN(-EPERM);
565
566         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
567                 RETURN(-EPERM);
568
569         /* additional check the rename case */
570         if (cattr) {
571                 if (S_ISDIR(cattr->la_mode)) {
572                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
573
574                         if (!S_ISDIR(tattr->la_mode))
575                                 RETURN(-ENOTDIR);
576
577                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
578                                 RETURN(-EBUSY);
579                 } else if (S_ISDIR(tattr->la_mode))
580                         RETURN(-EISDIR);
581         }
582
583         if (S_ISDIR(tattr->la_mode) && check_empty)
584                 rc = mdd_dir_is_empty(env, tobj);
585
586         RETURN(rc);
587 }
588
589 /**
590  * Check whether it can create the link file(linked to @src_obj) under
591  * the target directory(@tgt_obj), and src_obj has been locked by
592  * mdd_write_lock.
593  *
594  * \param[in] env       execution environment
595  * \param[in] tgt_obj   the target directory
596  * \param[in] tattr     attributes of target directory
597  * \param[in] lname     the link name
598  * \param[in] src_obj   source object for link
599  * \param[in] cattr     attributes for source object
600  *
601  * \retval              = 0 it is allowed to create the link file under tgt_obj
602  * \retval              negative error not allowed to create the link file
603  */
604 static int mdd_link_sanity_check(const struct lu_env *env,
605                                  struct mdd_object *tgt_obj,
606                                  const struct lu_attr *tattr,
607                                  const struct lu_name *lname,
608                                  struct mdd_object *src_obj,
609                                  const struct lu_attr *cattr)
610 {
611         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
612         int rc = 0;
613         ENTRY;
614
615         if (!mdd_object_exists(src_obj))
616                 RETURN(-ENOENT);
617
618         if (mdd_is_dead_obj(src_obj))
619                 RETURN(-ESTALE);
620
621         /* Local ops, no lookup before link, check filename length here. */
622         rc = mdd_name_check(m, lname);
623         if (rc < 0)
624                 RETURN(rc);
625
626         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
627                 RETURN(-EPERM);
628
629         if (S_ISDIR(mdd_object_type(src_obj)))
630                 RETURN(-EPERM);
631
632         LASSERT(src_obj != tgt_obj);
633         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
634         if (rc != 0)
635                 RETURN(rc);
636
637         rc = __mdd_may_link(env, src_obj, cattr);
638
639         RETURN(rc);
640 }
641
642 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
643                                    const char *name, struct thandle *handle)
644 {
645         struct dt_object *next = mdd_object_child(pobj);
646         int rc;
647         ENTRY;
648
649         if (dt_try_as_dir(env, next))
650                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
651         else
652                 rc = -ENOTDIR;
653
654         RETURN(rc);
655 }
656
657 static int __mdd_index_insert_only(const struct lu_env *env,
658                                    struct mdd_object *pobj,
659                                    const struct lu_fid *lf, __u32 type,
660                                    const char *name,
661                                    struct thandle *handle)
662 {
663         struct dt_object *next = mdd_object_child(pobj);
664         int               rc;
665         ENTRY;
666
667         if (dt_try_as_dir(env, next)) {
668                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
669
670                 rec->rec_fid = lf;
671                 rec->rec_type = type;
672                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
673                                (const struct dt_key *)name, handle);
674         } else {
675                 rc = -ENOTDIR;
676         }
677         RETURN(rc);
678 }
679
680 /* insert named index, add reference if isdir */
681 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
682                               const struct lu_fid *lf, __u32 type,
683                               const char *name, struct thandle *handle)
684 {
685         int rc;
686         ENTRY;
687
688         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
689         if (rc == 0 && S_ISDIR(type)) {
690                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
691                 mdo_ref_add(env, pobj, handle);
692                 mdd_write_unlock(env, pobj);
693         }
694
695         RETURN(rc);
696 }
697
698 /* delete named index, drop reference if isdir */
699 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
700                               const char *name, int is_dir,
701                               struct thandle *handle)
702 {
703         int               rc;
704         ENTRY;
705
706         rc = __mdd_index_delete_only(env, pobj, name, handle);
707         if (rc == 0 && is_dir) {
708                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
709                 mdo_ref_del(env, pobj, handle);
710                 mdd_write_unlock(env, pobj);
711         }
712
713         RETURN(rc);
714 }
715
716 static int mdd_llog_record_calc_size(const struct lu_env *env,
717                                      const struct lu_name *tname,
718                                      const struct lu_name *sname)
719 {
720         const struct lu_ucred   *uc = lu_ucred(env);
721         enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
722         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
723
724         if (sname != NULL)
725                 crf |= CLF_RENAME;
726
727         if (uc != NULL && uc->uc_jobid[0] != '\0')
728                 crf |= CLF_JOBID;
729
730         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
731                              changelog_rec_offset(crf, crfe) +
732                              (tname != NULL ? tname->ln_namelen : 0) +
733                              (sname != NULL ? 1 + sname->ln_namelen : 0));
734 }
735
736 int mdd_declare_changelog_store(const struct lu_env *env,
737                                 struct mdd_device *mdd,
738                                 enum changelog_rec_type type,
739                                 const struct lu_name *tname,
740                                 const struct lu_name *sname,
741                                 struct thandle *handle)
742 {
743         struct obd_device               *obd = mdd2obd_dev(mdd);
744         struct llog_ctxt                *ctxt;
745         struct llog_changelog_rec       *rec;
746         struct lu_buf                   *buf;
747         struct thandle                  *llog_th;
748         int                              reclen;
749         int                              rc;
750
751         if (!mdd_changelog_enabled(env, mdd, type))
752                 return 0;
753
754         reclen = mdd_llog_record_calc_size(env, tname, sname);
755         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
756         if (buf->lb_buf == NULL)
757                 return -ENOMEM;
758
759         rec = buf->lb_buf;
760         rec->cr_hdr.lrh_len = reclen;
761         rec->cr_hdr.lrh_type = CHANGELOG_REC;
762
763         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
764         if (ctxt == NULL)
765                 return -ENXIO;
766
767         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
768         if (IS_ERR(llog_th))
769                 GOTO(out_put, rc = PTR_ERR(llog_th));
770
771         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
772
773 out_put:
774         llog_ctxt_put(ctxt);
775
776         return rc;
777 }
778
779 /** Add a changelog entry \a rec to the changelog llog
780  * \param mdd
781  * \param rec
782  * \param handle - currently ignored since llogs start their own transaction;
783  *              this will hopefully be fixed in llog rewrite
784  * \retval 0 ok
785  */
786 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
787                         struct llog_changelog_rec *rec, struct thandle *th)
788 {
789         struct obd_device       *obd = mdd2obd_dev(mdd);
790         struct llog_ctxt        *ctxt;
791         struct thandle          *llog_th;
792         int                      rc;
793
794         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
795                                             changelog_rec_varsize(&rec->cr));
796
797         /* llog_lvfs_write_rec sets the llog tail len */
798         rec->cr_hdr.lrh_type = CHANGELOG_REC;
799         rec->cr.cr_time = cl_time();
800
801         spin_lock(&mdd->mdd_cl.mc_lock);
802         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
803          * but as long as the MDD transactions are ordered correctly for e.g.
804          * rename conflicts, I don't think this should matter. */
805         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
806         spin_unlock(&mdd->mdd_cl.mc_lock);
807
808         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
809         if (ctxt == NULL)
810                 return -ENXIO;
811
812         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
813         if (IS_ERR(llog_th))
814                 GOTO(out_put, rc = PTR_ERR(llog_th));
815
816         /* nested journal transaction */
817         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
818
819         /* time to recover some space ?? */
820         if (likely(!mdd->mdd_changelog_gc ||
821                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
822                    mdd->mdd_changelog_min_gc_interval >=
823                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
824                 /* save a spin_lock trip */
825                 goto out_put;
826         spin_lock(&mdd->mdd_cl.mc_lock);
827         if (likely(mdd->mdd_changelog_gc &&
828                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
829                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
830                         mdd->mdd_changelog_min_gc_interval)) {
831                 if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
832                              mdd->mdd_changelog_min_free_cat_entries ||
833                              OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
834                         CWARN("%s:%s low on changelog_catalog free entries, "
835                               "starting ChangeLog garbage collection thread\n",
836                               obd->obd_name,
837                               OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
838                                 " simulate" : "");
839
840                         /* indicate further kthread run will occur outside
841                          * right after current journal transaction filling has
842                          * completed
843                          */
844                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
845                 }
846                 /* next check in mdd_changelog_min_gc_interval anyway
847                  */
848                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
849         }
850         spin_unlock(&mdd->mdd_cl.mc_lock);
851 out_put:
852         llog_ctxt_put(ctxt);
853         if (rc > 0)
854                 rc = 0;
855         return rc;
856 }
857
858 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
859                                          const struct lu_fid *sfid,
860                                          const struct lu_fid *spfid,
861                                          const struct lu_name *sname)
862 {
863         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
864         size_t extsize;
865
866         LASSERT(sfid != NULL);
867         LASSERT(spfid != NULL);
868         LASSERT(sname != NULL);
869
870         extsize = sname->ln_namelen + 1;
871
872         rnm->cr_sfid = *sfid;
873         rnm->cr_spfid = *spfid;
874
875         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
876         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
877         rec->cr_namelen += extsize;
878 }
879
880 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
881 {
882         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
883
884         if (jobid == NULL || jobid[0] == '\0')
885                 return;
886
887         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
888 }
889
890 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
891 {
892         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
893
894         ef->cr_extra_flags = eflags;
895 }
896
897 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
898                                     __u64 uid, __u64 gid)
899 {
900         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
901
902         uidgid->cr_uid = uid;
903         uidgid->cr_gid = gid;
904 }
905
906 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
907                                  lnet_nid_t nid)
908 {
909         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
910
911         clnid->cr_nid = nid;
912 }
913
914 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, int flags)
915 {
916         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
917
918         omd->cr_openflags = (__u32)flags;
919 }
920
921 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
922                                    const char *xattr_name)
923 {
924         struct changelog_ext_xattr    *xattr = changelog_rec_xattr(rec);
925
926         strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
927 }
928
929 /** Store a namespace change changelog record
930  * If this fails, we must fail the whole transaction; we don't
931  * want the change to commit without the log entry.
932  * \param target - mdd_object of change
933  * \param tpfid - target parent dir/object fid
934  * \param sfid - source object fid
935  * \param spfid - source parent fid
936  * \param tname - target name string
937  * \param sname - source name string
938  * \param handle - transaction handle
939  */
940 int mdd_changelog_ns_store(const struct lu_env *env,
941                            struct mdd_device *mdd,
942                            enum changelog_rec_type type,
943                            enum changelog_rec_flags crf,
944                            struct mdd_object *target,
945                            const struct lu_fid *tpfid,
946                            const struct lu_fid *sfid,
947                            const struct lu_fid *spfid,
948                            const struct lu_name *tname,
949                            const struct lu_name *sname,
950                            struct thandle *handle)
951 {
952         const struct lu_ucred           *uc = lu_ucred(env);
953         struct llog_changelog_rec       *rec;
954         struct lu_buf                   *buf;
955         int                              reclen;
956         __u64                            xflags = CLFE_INVALID;
957         int                              rc;
958         ENTRY;
959
960         if (!mdd_changelog_enabled(env, mdd, type))
961                 RETURN(0);
962
963         LASSERT(tpfid != NULL);
964         LASSERT(tname != NULL);
965         LASSERT(handle != NULL);
966
967         reclen = mdd_llog_record_calc_size(env, tname, sname);
968         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
969         if (buf->lb_buf == NULL)
970                 RETURN(-ENOMEM);
971         rec = buf->lb_buf;
972
973         crf &= CLF_FLAGMASK;
974         crf |= CLF_EXTRA_FLAGS;
975
976         if (uc) {
977                 if (uc->uc_jobid[0] != '\0')
978                         crf |= CLF_JOBID;
979                 xflags |= CLFE_UIDGID;
980                 xflags |= CLFE_NID;
981         }
982
983         if (sname != NULL)
984                 crf |= CLF_RENAME;
985         else
986                 crf |= CLF_VERSION;
987
988         rec->cr.cr_flags = crf;
989
990         if (crf & CLF_EXTRA_FLAGS) {
991                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
992                 if (xflags & CLFE_UIDGID)
993                         mdd_changelog_rec_extra_uidgid(&rec->cr,
994                                                        uc->uc_uid, uc->uc_gid);
995                 if (xflags & CLFE_NID)
996                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
997         }
998
999         rec->cr.cr_type = (__u32)type;
1000         rec->cr.cr_pfid = *tpfid;
1001         rec->cr.cr_namelen = tname->ln_namelen;
1002         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1003
1004         if (crf & CLF_RENAME)
1005                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1006
1007         if (crf & CLF_JOBID)
1008                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1009
1010         if (likely(target != NULL)) {
1011                 rec->cr.cr_tfid = *mdo2fid(target);
1012                 target->mod_cltime = ktime_get();
1013         } else {
1014                 fid_zero(&rec->cr.cr_tfid);
1015         }
1016
1017         rc = mdd_changelog_store(env, mdd, rec, handle);
1018         if (rc < 0) {
1019                 CERROR("%s: cannot store changelog record: type = %d, "
1020                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1021                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1022                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1023                 return -EFAULT;
1024         }
1025
1026         return 0;
1027 }
1028
1029 static int __mdd_links_add(const struct lu_env *env,
1030                            struct mdd_object *mdd_obj,
1031                            struct linkea_data *ldata,
1032                            const struct lu_name *lname,
1033                            const struct lu_fid *pfid,
1034                            int first, int check)
1035 {
1036         int rc;
1037
1038         if (ldata->ld_leh == NULL) {
1039                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1040                 if (rc) {
1041                         if (rc != -ENODATA)
1042                                 return rc;
1043                         rc = linkea_data_new(ldata,
1044                                              &mdd_env_info(env)->mti_link_buf);
1045                         if (rc)
1046                                 return rc;
1047                 }
1048         }
1049
1050         if (check) {
1051                 rc = linkea_links_find(ldata, lname, pfid);
1052                 if (rc && rc != -ENOENT)
1053                         return rc;
1054                 if (rc == 0)
1055                         return -EEXIST;
1056         }
1057
1058         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1059                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1060
1061                 *tfid = *pfid;
1062                 tfid->f_ver = ~0;
1063                 linkea_add_buf(ldata, lname, tfid);
1064         }
1065
1066         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1067                 linkea_add_buf(ldata, lname, pfid);
1068
1069         return linkea_add_buf(ldata, lname, pfid);
1070 }
1071
1072 static int __mdd_links_del(const struct lu_env *env,
1073                            struct mdd_object *mdd_obj,
1074                            struct linkea_data *ldata,
1075                            const struct lu_name *lname,
1076                            const struct lu_fid *pfid)
1077 {
1078         int rc;
1079
1080         if (ldata->ld_leh == NULL) {
1081                 rc = mdd_links_read(env, mdd_obj, ldata);
1082                 if (rc)
1083                         return rc;
1084         }
1085
1086         rc = linkea_links_find(ldata, lname, pfid);
1087         if (rc)
1088                 return rc;
1089
1090         linkea_del_buf(ldata, lname);
1091         return 0;
1092 }
1093
1094 static int mdd_linkea_prepare(const struct lu_env *env,
1095                               struct mdd_object *mdd_obj,
1096                               const struct lu_fid *oldpfid,
1097                               const struct lu_name *oldlname,
1098                               const struct lu_fid *newpfid,
1099                               const struct lu_name *newlname,
1100                               int first, int check,
1101                               struct linkea_data *ldata)
1102 {
1103         int rc = 0;
1104         ENTRY;
1105
1106         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1107                 RETURN(0);
1108
1109         LASSERT(oldpfid != NULL || newpfid != NULL);
1110
1111         if (mdd_obj->mod_flags & DEAD_OBJ)
1112                 /* Unnecessary to update linkEA for dead object.  */
1113                 RETURN(0);
1114
1115         if (oldpfid != NULL) {
1116                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1117                 if (rc) {
1118                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1119                                 RETURN(rc);
1120
1121                         /* No changes done. */
1122                         rc = 0;
1123                 }
1124         }
1125
1126         /* If renaming, add the new record */
1127         if (newpfid != NULL)
1128                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1129                                      first, check);
1130
1131         RETURN(rc);
1132 }
1133
1134 int mdd_links_rename(const struct lu_env *env,
1135                      struct mdd_object *mdd_obj,
1136                      const struct lu_fid *oldpfid,
1137                      const struct lu_name *oldlname,
1138                      const struct lu_fid *newpfid,
1139                      const struct lu_name *newlname,
1140                      struct thandle *handle,
1141                      struct linkea_data *ldata,
1142                      int first, int check)
1143 {
1144         int rc = 0;
1145         ENTRY;
1146
1147         if (ldata == NULL) {
1148                 ldata = &mdd_env_info(env)->mti_link_data;
1149                 memset(ldata, 0, sizeof(*ldata));
1150                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1151                                         newpfid, newlname, first, check, ldata);
1152                 if (rc)
1153                         GOTO(out, rc);
1154         }
1155
1156         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1157                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1158
1159         GOTO(out, rc);
1160
1161 out:
1162         if (rc != 0) {
1163                 if (newlname == NULL)
1164                         CERROR("link_ea add failed %d "DFID"\n",
1165                                rc, PFID(mdd_object_fid(mdd_obj)));
1166                 else if (oldpfid == NULL)
1167                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1168                                newlname->ln_namelen, newlname->ln_name, rc,
1169                                PFID(mdd_object_fid(mdd_obj)));
1170                 else if (newpfid == NULL)
1171                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1172                                oldlname->ln_namelen, oldlname->ln_name, rc,
1173                                PFID(mdd_object_fid(mdd_obj)));
1174                 else
1175                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1176                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1177                                newlname->ln_namelen, newlname->ln_name, rc,
1178                                PFID(mdd_object_fid(mdd_obj)));
1179         }
1180
1181         if (is_vmalloc_addr(ldata->ld_buf))
1182                 /* if we vmalloced a large buffer drop it */
1183                 lu_buf_free(ldata->ld_buf);
1184
1185         return rc;
1186 }
1187
1188 static inline int mdd_links_add(const struct lu_env *env,
1189                                 struct mdd_object *mdd_obj,
1190                                 const struct lu_fid *pfid,
1191                                 const struct lu_name *lname,
1192                                 struct thandle *handle,
1193                                 struct linkea_data *ldata, int first)
1194 {
1195         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1196                                 pfid, lname, handle, ldata, first, 0);
1197 }
1198
1199 static inline int mdd_links_del(const struct lu_env *env,
1200                                 struct mdd_object *mdd_obj,
1201                                 const struct lu_fid *pfid,
1202                                 const struct lu_name *lname,
1203                                 struct thandle *handle)
1204 {
1205         return mdd_links_rename(env, mdd_obj, pfid, lname,
1206                                 NULL, NULL, handle, NULL, 0, 0);
1207 }
1208
1209 /** Read the link EA into a temp buffer.
1210  * Uses the name_buf since it is generally large.
1211  * \retval IS_ERR err
1212  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1213  */
1214 struct lu_buf *mdd_links_get(const struct lu_env *env,
1215                              struct mdd_object *mdd_obj)
1216 {
1217         struct linkea_data ldata = { NULL };
1218         int rc;
1219
1220         rc = mdd_links_read(env, mdd_obj, &ldata);
1221         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1222 }
1223
1224 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1225                     struct linkea_data *ldata, struct thandle *handle)
1226 {
1227         const struct lu_buf *buf;
1228         int                 rc;
1229
1230         if (ldata == NULL || ldata->ld_buf == NULL ||
1231             ldata->ld_leh == NULL)
1232                 return 0;
1233
1234         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1235                 return 0;
1236
1237 again:
1238         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1239                                 ldata->ld_leh->leh_len);
1240         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1241         if (unlikely(rc == -ENOSPC)) {
1242                 rc = linkea_overflow_shrink(ldata);
1243                 if (likely(rc > 0))
1244                         goto again;
1245         }
1246
1247         return rc;
1248 }
1249
1250 static int mdd_declare_links_add(const struct lu_env *env,
1251                                  struct mdd_object *mdd_obj,
1252                                  struct thandle *handle,
1253                                  struct linkea_data *ldata)
1254 {
1255         int     rc;
1256         int     ea_len;
1257         void    *linkea;
1258
1259         if (ldata != NULL && ldata->ld_leh != NULL) {
1260                 ea_len = ldata->ld_leh->leh_len;
1261                 linkea = ldata->ld_buf->lb_buf;
1262         } else {
1263                 ea_len = MAX_LINKEA_SIZE;
1264                 linkea = NULL;
1265         }
1266
1267         rc = mdo_declare_xattr_set(env, mdd_obj,
1268                                    mdd_buf_get_const(env, linkea, ea_len),
1269                                    XATTR_NAME_LINK, 0, handle);
1270
1271         return rc;
1272 }
1273
1274 static inline int mdd_declare_links_del(const struct lu_env *env,
1275                                         struct mdd_object *c,
1276                                         struct thandle *handle)
1277 {
1278         int rc = 0;
1279
1280         /* For directory, the linkEA will be removed together
1281          * with the object. */
1282         if (!S_ISDIR(mdd_object_type(c)))
1283                 rc = mdd_declare_links_add(env, c, handle, NULL);
1284
1285         return rc;
1286 }
1287
1288 static int mdd_declare_link(const struct lu_env *env,
1289                             struct mdd_device *mdd,
1290                             struct mdd_object *p,
1291                             struct mdd_object *c,
1292                             const struct lu_name *name,
1293                             struct thandle *handle,
1294                             struct lu_attr *la,
1295                             struct linkea_data *data)
1296 {
1297         struct lu_fid tfid = *mdo2fid(c);
1298         int rc;
1299
1300         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1301                 tfid.f_oid = cfs_fail_val;
1302
1303         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1304                                       name->ln_name, handle);
1305         if (rc != 0)
1306                 return rc;
1307
1308         rc = mdo_declare_ref_add(env, c, handle);
1309         if (rc != 0)
1310                 return rc;
1311
1312         la->la_valid = LA_CTIME | LA_MTIME;
1313         rc = mdo_declare_attr_set(env, p, la, handle);
1314         if (rc != 0)
1315                 return rc;
1316
1317         la->la_valid = LA_CTIME;
1318         rc = mdo_declare_attr_set(env, c, la, handle);
1319         if (rc != 0)
1320                 return rc;
1321
1322         rc = mdd_declare_links_add(env, c, handle, data);
1323         if (rc != 0)
1324                 return rc;
1325
1326         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1327                                          handle);
1328
1329         return rc;
1330 }
1331
1332 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1333                     struct md_object *src_obj, const struct lu_name *lname,
1334                     struct md_attr *ma)
1335 {
1336         const char *name = lname->ln_name;
1337         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1338         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1339         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1340         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1341         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1342         struct mdd_device *mdd = mdo2mdd(src_obj);
1343         struct thandle *handle;
1344         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1345         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1346         int rc;
1347         ENTRY;
1348
1349         rc = mdd_la_get(env, mdd_sobj, cattr);
1350         if (rc != 0)
1351                 RETURN(rc);
1352
1353         rc = mdd_la_get(env, mdd_tobj, tattr);
1354         if (rc != 0)
1355                 RETURN(rc);
1356
1357         /*
1358          * If we are using project inheritance, we only allow hard link
1359          * creation in our tree when the project IDs are the same;
1360          * otherwise the tree quota mechanism could be circumvented.
1361          */
1362         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1363             (tattr->la_projid != cattr->la_projid))
1364                 RETURN(-EXDEV);
1365
1366         handle = mdd_trans_create(env, mdd);
1367         if (IS_ERR(handle))
1368                 GOTO(out_pending, rc = PTR_ERR(handle));
1369
1370         memset(ldata, 0, sizeof(*ldata));
1371
1372         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1373         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1374
1375         /* Note: even this function will change ldata, but it comes from
1376          * thread_info, which is completely temporary and only seen in
1377          * this function, so we do not need reset ldata once it fails.*/
1378         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1379                                 lname, 0, 0, ldata);
1380         if (rc != 0)
1381                 GOTO(stop, rc);
1382
1383         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1384                               la, ldata);
1385         if (rc)
1386                 GOTO(stop, rc);
1387
1388         rc = mdd_trans_start(env, mdd, handle);
1389         if (rc)
1390                 GOTO(stop, rc);
1391
1392         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1393         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1394                                    cattr);
1395         if (rc)
1396                 GOTO(out_unlock, rc);
1397
1398         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1399                 rc = mdo_ref_add(env, mdd_sobj, handle);
1400                 if (rc != 0)
1401                         GOTO(out_unlock, rc);
1402         }
1403
1404         *tfid = *mdo2fid(mdd_sobj);
1405         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1406                 tfid->f_oid = cfs_fail_val;
1407
1408         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1409                                      mdd_object_type(mdd_sobj), name, handle);
1410         if (rc != 0) {
1411                 mdo_ref_del(env, mdd_sobj, handle);
1412                 GOTO(out_unlock, rc);
1413         }
1414
1415         la->la_valid = LA_CTIME | LA_MTIME;
1416         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1417         if (rc)
1418                 GOTO(out_unlock, rc);
1419
1420         la->la_valid = LA_CTIME;
1421         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1422         if (rc == 0)
1423                 /* Note: The failure of links_add should not cause the
1424                  * link failure, so do not check return value. */
1425                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1426                               lname, handle, ldata, 0);
1427
1428         EXIT;
1429 out_unlock:
1430         mdd_write_unlock(env, mdd_sobj);
1431         if (rc == 0)
1432                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1433                                             mdo2fid(mdd_tobj), NULL, NULL,
1434                                             lname, NULL, handle);
1435 stop:
1436         rc = mdd_trans_stop(env, mdd, rc, handle);
1437         if (is_vmalloc_addr(ldata->ld_buf))
1438                 /* if we vmalloced a large buffer drop it */
1439                 lu_buf_free(ldata->ld_buf);
1440 out_pending:
1441         return rc;
1442 }
1443
1444 static int mdd_mark_orphan_object(const struct lu_env *env,
1445                                 struct mdd_object *obj, struct thandle *handle,
1446                                 bool declare)
1447 {
1448         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1449         int rc;
1450
1451         if (!S_ISDIR(mdd_object_type(obj)))
1452                 return 0;
1453
1454         attr->la_valid = LA_FLAGS;
1455         attr->la_flags = LUSTRE_ORPHAN_FL;
1456
1457         if (declare)
1458                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1459         else
1460                 rc = mdo_attr_set(env, obj, attr, handle);
1461
1462         return rc;
1463 }
1464
1465 static int mdd_declare_finish_unlink(const struct lu_env *env,
1466                                      struct mdd_object *obj,
1467                                      struct thandle *handle)
1468 {
1469         int rc;
1470
1471         /* Sigh, we do not know if the unlink object will become orphan in
1472          * declare phase, but fortunately the flags here does not matter
1473          * in current declare implementation */
1474         rc = mdd_mark_orphan_object(env, obj, handle, true);
1475         if (rc != 0)
1476                 return rc;
1477
1478         rc = mdo_declare_destroy(env, obj, handle);
1479         if (rc != 0)
1480                 return rc;
1481
1482         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1483         if (rc != 0)
1484                 return rc;
1485
1486         return mdd_declare_links_del(env, obj, handle);
1487 }
1488
1489 /* caller should take a lock before calling */
1490 int mdd_finish_unlink(const struct lu_env *env,
1491                       struct mdd_object *obj, struct md_attr *ma,
1492                       const struct mdd_object *pobj,
1493                       const struct lu_name *lname,
1494                       struct thandle *th)
1495 {
1496         int rc = 0;
1497         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1498         ENTRY;
1499
1500         LASSERT(mdd_write_locked(env, obj) != 0);
1501
1502         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1503                 /* add new orphan and the object
1504                  * will be deleted during mdd_close() */
1505                 obj->mod_flags |= DEAD_OBJ;
1506                 if (obj->mod_count) {
1507                         rc = mdd_orphan_insert(env, obj, th);
1508                         if (rc == 0)
1509                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1510                                         "orphan list, open count = %d\n",
1511                                         PFID(mdd_object_fid(obj)),
1512                                         obj->mod_count);
1513                         else
1514                                 CERROR("Object "DFID" fail to be an orphan, "
1515                                        "open count = %d, maybe cause failed "
1516                                        "open replay\n",
1517                                         PFID(mdd_object_fid(obj)),
1518                                         obj->mod_count);
1519
1520                         /* mark object as an orphan here, not
1521                          * before mdd_orphan_insert() as racing
1522                          * mdd_la_get() may propagate ORPHAN_OBJ
1523                          * causing the asserition */
1524                         rc = mdd_mark_orphan_object(env, obj, th, false);
1525                 } else {
1526                         rc = mdo_destroy(env, obj, th);
1527                 }
1528         } else if (!is_dir) {
1529                 /* old files may not have link ea; ignore errors */
1530                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1531         }
1532
1533         RETURN(rc);
1534 }
1535
1536 /*
1537  * pobj maybe NULL
1538  * has mdd_write_lock on cobj already, but not on pobj yet
1539  */
1540 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1541                             const struct lu_attr *pattr,
1542                             struct mdd_object *cobj,
1543                             const struct lu_attr *cattr)
1544 {
1545         int rc;
1546         ENTRY;
1547
1548         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1549
1550         RETURN(rc);
1551 }
1552
1553 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1554                               struct mdd_object *p, struct mdd_object *c,
1555                               const struct lu_name *name, struct md_attr *ma,
1556                               struct thandle *handle, int no_name, int is_dir)
1557 {
1558         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1559         int              rc;
1560
1561         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1562                 if (likely(no_name == 0)) {
1563                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1564                                                       handle);
1565                         if (rc != 0)
1566                                 return rc;
1567                 }
1568
1569                 if (is_dir != 0) {
1570                         rc = mdo_declare_ref_del(env, p, handle);
1571                         if (rc != 0)
1572                                 return rc;
1573                 }
1574         }
1575
1576         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1577         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1578         la->la_valid = LA_CTIME | LA_MTIME;
1579         rc = mdo_declare_attr_set(env, p, la, handle);
1580         if (rc)
1581                 return rc;
1582
1583         if (c != NULL) {
1584                 rc = mdo_declare_ref_del(env, c, handle);
1585                 if (rc)
1586                         return rc;
1587
1588                 rc = mdo_declare_ref_del(env, c, handle);
1589                 if (rc)
1590                         return rc;
1591
1592                 la->la_valid = LA_CTIME;
1593                 rc = mdo_declare_attr_set(env, c, la, handle);
1594                 if (rc)
1595                         return rc;
1596
1597                 rc = mdd_declare_finish_unlink(env, c, handle);
1598                 if (rc)
1599                         return rc;
1600
1601                 /* FIXME: need changelog for remove entry */
1602                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
1603                                                  NULL, handle);
1604         }
1605
1606         return rc;
1607 }
1608
1609 /*
1610  * test if a file has an HSM archive
1611  * if HSM attributes are not found in ma update them from
1612  * HSM xattr
1613  */
1614 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1615                                    struct mdd_object *obj,
1616                                    struct md_attr *ma)
1617 {
1618         ENTRY;
1619
1620         if (!(ma->ma_valid & MA_HSM)) {
1621                 /* no HSM MD provided, read xattr */
1622                 struct lu_buf   *hsm_buf;
1623                 const size_t     buflen = sizeof(struct hsm_attrs);
1624                 int              rc;
1625
1626                 hsm_buf = mdd_buf_get(env, NULL, 0);
1627                 lu_buf_alloc(hsm_buf, buflen);
1628                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1629                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1630                 lu_buf_free(hsm_buf);
1631                 if (rc < 0)
1632                         RETURN(false);
1633
1634                 ma->ma_valid |= MA_HSM;
1635         }
1636         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1637                 RETURN(true);
1638         RETURN(false);
1639 }
1640
1641 /**
1642  * Delete name entry and the object.
1643  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1644  * does not exist for this object, and it could only happen during resending
1645  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1646  * is also needed in this case(needed by changelog), so we have to add another
1647  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1648  * the ENOENT failure should be able to be fixed by redo mechanism.
1649  */
1650 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1651                       struct md_object *cobj, const struct lu_name *lname,
1652                       struct md_attr *ma, int no_name)
1653 {
1654         const char *name = lname->ln_name;
1655         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1656         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1657         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1658         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1659         struct mdd_object *mdd_cobj = NULL;
1660         struct mdd_device *mdd = mdo2mdd(pobj);
1661         struct thandle    *handle;
1662         int rc, is_dir = 0, cl_flags = 0;
1663         ENTRY;
1664
1665         /* let shutdown to start */
1666         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
1667
1668         /* cobj == NULL means only delete name entry */
1669         if (likely(cobj != NULL)) {
1670                 mdd_cobj = md2mdd_obj(cobj);
1671                 if (mdd_object_exists(mdd_cobj) == 0)
1672                         RETURN(-ENOENT);
1673         }
1674
1675         rc = mdd_la_get(env, mdd_pobj, pattr);
1676         if (rc)
1677                 RETURN(rc);
1678
1679         if (likely(mdd_cobj != NULL)) {
1680                 /* fetch cattr */
1681                 rc = mdd_la_get(env, mdd_cobj, cattr);
1682                 if (rc)
1683                         RETURN(rc);
1684
1685                 is_dir = S_ISDIR(cattr->la_mode);
1686                 /* search for an existing archive.
1687                  * we should check ahead as the object
1688                  * can be destroyed in this transaction */
1689                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1690                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1691         }
1692
1693         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1694         if (rc)
1695                 RETURN(rc);
1696
1697         handle = mdd_trans_create(env, mdd);
1698         if (IS_ERR(handle))
1699                 RETURN(PTR_ERR(handle));
1700
1701         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1702                                 lname, ma, handle, no_name, is_dir);
1703         if (rc)
1704                 GOTO(stop, rc);
1705
1706         rc = mdd_trans_start(env, mdd, handle);
1707         if (rc)
1708                 GOTO(stop, rc);
1709
1710         if (likely(mdd_cobj != NULL))
1711                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1712
1713         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1714                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1715                 if (rc)
1716                         GOTO(cleanup, rc);
1717         }
1718
1719         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1720             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1721                 GOTO(cleanup, rc = 0);
1722
1723         if (likely(mdd_cobj != NULL)) {
1724                 rc = mdo_ref_del(env, mdd_cobj, handle);
1725                 if (rc != 0) {
1726                         __mdd_index_insert_only(env, mdd_pobj,
1727                                                 mdo2fid(mdd_cobj),
1728                                                 mdd_object_type(mdd_cobj),
1729                                                 name, handle);
1730                         GOTO(cleanup, rc);
1731                 }
1732
1733                 if (is_dir)
1734                         /* unlink dot */
1735                         mdo_ref_del(env, mdd_cobj, handle);
1736
1737                 /* fetch updated nlink */
1738                 rc = mdd_la_get(env, mdd_cobj, cattr);
1739                 if (rc)
1740                         GOTO(cleanup, rc);
1741         }
1742
1743         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1744         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1745
1746         la->la_valid = LA_CTIME | LA_MTIME;
1747         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1748         if (rc)
1749                 GOTO(cleanup, rc);
1750
1751         /* Enough for only unlink the entry */
1752         if (unlikely(mdd_cobj == NULL))
1753                 GOTO(stop, rc);
1754
1755         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1756                 /* update ctime of an unlinked file only if it is still
1757                  * opened or a link still exists */
1758                 la->la_valid = LA_CTIME;
1759                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1760                 if (rc)
1761                         GOTO(cleanup, rc);
1762         }
1763
1764         /* XXX: this transfer to ma will be removed with LOD/OSP */
1765         ma->ma_attr = *cattr;
1766         ma->ma_valid |= MA_INODE;
1767         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1768         if (rc != 0)
1769                 GOTO(cleanup, rc);
1770
1771         /* fetch updated nlink */
1772         rc = mdd_la_get(env, mdd_cobj, cattr);
1773         /* if object is removed then we can't get its attrs,
1774          * use last get */
1775         if (rc == -ENOENT) {
1776                 cattr->la_nlink = 0;
1777                 rc = 0;
1778         }
1779
1780         if (cattr->la_nlink == 0) {
1781                 ma->ma_attr = *cattr;
1782                 ma->ma_valid |= MA_INODE;
1783         }
1784
1785         EXIT;
1786 cleanup:
1787         if (likely(mdd_cobj != NULL))
1788                 mdd_write_unlock(env, mdd_cobj);
1789
1790         if (rc == 0) {
1791                 if (cattr->la_nlink == 0)
1792                         cl_flags |= CLF_UNLINK_LAST;
1793                 else
1794                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1795
1796                 rc = mdd_changelog_ns_store(env, mdd,
1797                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1798                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1799                         handle);
1800         }
1801
1802 stop:
1803         rc = mdd_trans_stop(env, mdd, rc, handle);
1804
1805         return rc;
1806 }
1807
1808 /*
1809  * The permission has been checked when obj created, no need check again.
1810  */
1811 static int mdd_cd_sanity_check(const struct lu_env *env,
1812                                struct mdd_object *obj)
1813 {
1814         ENTRY;
1815
1816         /* EEXIST check */
1817         if (!obj || mdd_is_dead_obj(obj))
1818                 RETURN(-ENOENT);
1819
1820         RETURN(0);
1821 }
1822
1823 static int mdd_create_data(const struct lu_env *env,
1824                            struct md_object *pobj,
1825                            struct md_object *cobj,
1826                            const struct md_op_spec *spec,
1827                            struct md_attr *ma)
1828 {
1829         struct mdd_device *mdd = mdo2mdd(cobj);
1830         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1831         struct mdd_object *son = md2mdd_obj(cobj);
1832         struct thandle    *handle;
1833         const struct lu_buf *buf;
1834         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1835         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1836         int                rc;
1837         ENTRY;
1838
1839         rc = mdd_cd_sanity_check(env, son);
1840         if (rc)
1841                 RETURN(rc);
1842
1843         if (!md_should_create(spec->sp_cr_flags))
1844                 RETURN(0);
1845
1846         /*
1847          * there are following use cases for this function:
1848          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1849          *    striping can be specified or not
1850          * 2) CMD?
1851          */
1852         rc = mdd_la_get(env, son, attr);
1853         if (rc)
1854                 RETURN(rc);
1855
1856         /* calling ->ah_make_hint() is used to transfer information from parent */
1857         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1858
1859         handle = mdd_trans_create(env, mdd);
1860         if (IS_ERR(handle))
1861                 GOTO(out_free, rc = PTR_ERR(handle));
1862
1863         /*
1864          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1865          * Should this be fixed?
1866          */
1867         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1868                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1869                spec->sp_cr_flags, spec->no_create);
1870
1871         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1872                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1873                                         spec->u.sp_ea.eadatalen);
1874         } else {
1875                 buf = &LU_BUF_NULL;
1876         }
1877
1878         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1879                                   XATTR_NAME_LOV, 0, handle);
1880         if (rc)
1881                 GOTO(stop, rc);
1882
1883         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
1884                                          handle);
1885         if (rc)
1886                 GOTO(stop, rc);
1887
1888         rc = mdd_trans_start(env, mdd, handle);
1889         if (rc)
1890                 GOTO(stop, rc);
1891
1892         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1893                           0, handle);
1894
1895         if (rc)
1896                 GOTO(stop, rc);
1897
1898         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1899
1900 stop:
1901         rc = mdd_trans_stop(env, mdd, rc, handle);
1902
1903 out_free:
1904         RETURN(rc);
1905 }
1906
1907 static int mdd_declare_object_initialize(const struct lu_env *env,
1908                                          struct mdd_object *parent,
1909                                          struct mdd_object *child,
1910                                          const struct lu_attr *attr,
1911                                          struct thandle *handle)
1912 {
1913         int rc;
1914         ENTRY;
1915
1916         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1917         if (!S_ISDIR(attr->la_mode))
1918                 RETURN(0);
1919
1920         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1921                                       dot, handle);
1922         if (rc != 0)
1923                 RETURN(rc);
1924
1925         rc = mdo_declare_ref_add(env, child, handle);
1926         if (rc != 0)
1927                 RETURN(rc);
1928
1929         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1930                                       dotdot, handle);
1931
1932         RETURN(rc);
1933 }
1934
1935 static int mdd_object_initialize(const struct lu_env *env,
1936                                  const struct lu_fid *pfid,
1937                                  struct mdd_object *child,
1938                                  struct lu_attr *attr,
1939                                  struct thandle *handle)
1940 {
1941         int rc = 0;
1942         ENTRY;
1943
1944         if (S_ISDIR(attr->la_mode)) {
1945                 /* Add "." and ".." for newly created dir */
1946                 mdo_ref_add(env, child, handle);
1947                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1948                                              S_IFDIR, dot, handle);
1949                 if (rc == 0)
1950                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1951                                                      dotdot, handle);
1952                 if (rc != 0)
1953                         mdo_ref_del(env, child, handle);
1954         }
1955
1956         RETURN(rc);
1957 }
1958
1959 /**
1960  * This function checks whether it can create a file/dir under the
1961  * directory(@pobj). The directory(@pobj) is not being locked by
1962  * mdd lock.
1963  *
1964  * \param[in] env       execution environment
1965  * \param[in] pobj      the directory to create files
1966  * \param[in] pattr     the attributes of the directory
1967  * \param[in] lname     the name of the created file/dir
1968  * \param[in] cattr     the attributes of the file/dir
1969  * \param[in] spec      create specification
1970  *
1971  * \retval              = 0 it is allowed to create file/dir under
1972  *                      the directory
1973  * \retval              negative error not allowed to create file/dir
1974  *                      under the directory
1975  */
1976 static int mdd_create_sanity_check(const struct lu_env *env,
1977                                    struct md_object *pobj,
1978                                    const struct lu_attr *pattr,
1979                                    const struct lu_name *lname,
1980                                    struct lu_attr *cattr,
1981                                    struct md_op_spec *spec)
1982 {
1983         struct mdd_thread_info *info = mdd_env_info(env);
1984         struct lu_fid     *fid       = &info->mti_fid;
1985         struct mdd_object *obj       = md2mdd_obj(pobj);
1986         struct mdd_device *m         = mdo2mdd(pobj);
1987         bool            check_perm = true;
1988         int rc;
1989         ENTRY;
1990
1991         /* EEXIST check */
1992         if (mdd_is_dead_obj(obj))
1993                 RETURN(-ENOENT);
1994
1995         /*
1996          * In some cases this lookup is not needed - we know before if name
1997          * exists or not because MDT performs lookup for it.
1998          * name length check is done in lookup.
1999          */
2000         if (spec->sp_cr_lookup) {
2001                 /*
2002                  * Check if the name already exist, though it will be checked in
2003                  * _index_insert also, for avoiding rolling back if exists
2004                  * _index_insert.
2005                  */
2006                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2007                                   MAY_WRITE | MAY_EXEC);
2008                 if (rc != -ENOENT)
2009                         RETURN(rc ? : -EEXIST);
2010
2011                 /* Permission is already being checked in mdd_lookup */
2012                 check_perm = false;
2013         }
2014
2015         if (S_ISDIR(cattr->la_mode) &&
2016             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2017             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2018                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2019
2020                 if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
2021                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
2022                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2023                         rc = -EINVAL;
2024                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2025                                "stripe_offset = %d, stripe_count = %u: "
2026                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2027                                 le32_to_cpu(lum->lum_magic),
2028                                (int)le32_to_cpu(lum->lum_stripe_offset),
2029                                le32_to_cpu(lum->lum_stripe_count), rc);
2030                         return rc;
2031                 }
2032         }
2033
2034         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2035         if (rc != 0)
2036                 RETURN(rc);
2037
2038         /* sgid check */
2039         if (pattr->la_mode & S_ISGID) {
2040                 cattr->la_gid = pattr->la_gid;
2041                 if (S_ISDIR(cattr->la_mode)) {
2042                         cattr->la_mode |= S_ISGID;
2043                         cattr->la_valid |= LA_MODE;
2044                 }
2045         }
2046
2047         /* Inherit project ID from parent directory */
2048         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2049                 cattr->la_projid = pattr->la_projid;
2050                 if (S_ISDIR(cattr->la_mode)) {
2051                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2052                         cattr->la_valid |= LA_FLAGS;
2053                 }
2054                 cattr->la_valid |= LA_PROJID;
2055         }
2056
2057         rc = mdd_name_check(m, lname);
2058         if (rc < 0)
2059                 RETURN(rc);
2060
2061         switch (cattr->la_mode & S_IFMT) {
2062         case S_IFLNK: {
2063                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2064
2065                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2066                         RETURN(-ENAMETOOLONG);
2067                 else
2068                         RETURN(0);
2069         }
2070         case S_IFDIR:
2071         case S_IFREG:
2072         case S_IFCHR:
2073         case S_IFBLK:
2074         case S_IFIFO:
2075         case S_IFSOCK:
2076                 rc = 0;
2077                 break;
2078         default:
2079                 rc = -EINVAL;
2080                 break;
2081         }
2082         RETURN(rc);
2083 }
2084
2085 static int mdd_declare_create_object(const struct lu_env *env,
2086                                      struct mdd_device *mdd,
2087                                      struct mdd_object *p, struct mdd_object *c,
2088                                      struct lu_attr *attr,
2089                                      struct thandle *handle,
2090                                      const struct md_op_spec *spec,
2091                                      struct lu_buf *def_acl_buf,
2092                                      struct lu_buf *acl_buf,
2093                                      struct dt_allocation_hint *hint)
2094 {
2095         const struct lu_buf *buf;
2096         int rc;
2097
2098         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2099                                                 hint);
2100         if (rc)
2101                 GOTO(out, rc);
2102
2103 #ifdef CONFIG_FS_POSIX_ACL
2104         if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2105                 /* if dir, then can inherit default ACl */
2106                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2107                                            XATTR_NAME_ACL_DEFAULT,
2108                                            0, handle);
2109                 if (rc)
2110                         GOTO(out, rc);
2111         }
2112
2113         if (acl_buf && acl_buf->lb_len > 0) {
2114                 rc = mdo_declare_attr_set(env, c, attr, handle);
2115                 if (rc)
2116                         GOTO(out, rc);
2117
2118                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2119                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2120                 if (rc)
2121                         GOTO(out, rc);
2122         }
2123 #endif
2124         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2125         if (rc)
2126                 GOTO(out, rc);
2127
2128         /* replay case, create LOV EA from client data */
2129         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2130             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2131                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2132                                         spec->u.sp_ea.eadatalen);
2133                 rc = mdo_declare_xattr_set(env, c, buf,
2134                                            S_ISDIR(attr->la_mode) ?
2135                                                 XATTR_NAME_LMV : XATTR_NAME_LOV,
2136                                            0, handle);
2137                 if (rc)
2138                         GOTO(out, rc);
2139         }
2140
2141         if (S_ISLNK(attr->la_mode)) {
2142                 const char *target_name = spec->u.sp_symname;
2143                 int sym_len = strlen(target_name);
2144                 const struct lu_buf *buf;
2145
2146                 buf = mdd_buf_get_const(env, target_name, sym_len);
2147                 rc = dt_declare_record_write(env, mdd_object_child(c),
2148                                              buf, 0, handle);
2149                 if (rc)
2150                         GOTO(out, rc);
2151         }
2152
2153         if (spec->sp_cr_file_secctx_name != NULL) {
2154                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2155                                         spec->sp_cr_file_secctx_size);
2156                 rc = mdo_declare_xattr_set(env, c, buf,
2157                                            spec->sp_cr_file_secctx_name, 0,
2158                                            handle);
2159                 if (rc < 0)
2160                         GOTO(out, rc);
2161         }
2162 out:
2163         return rc;
2164 }
2165
2166 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2167                               struct mdd_object *p, struct mdd_object *c,
2168                               const struct lu_name *name,
2169                               struct lu_attr *attr,
2170                               struct thandle *handle,
2171                               const struct md_op_spec *spec,
2172                               struct linkea_data *ldata,
2173                               struct lu_buf *def_acl_buf,
2174                               struct lu_buf *acl_buf,
2175                               struct dt_allocation_hint *hint)
2176 {
2177         int rc;
2178
2179         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2180                                        def_acl_buf, acl_buf, hint);
2181         if (rc)
2182                 GOTO(out, rc);
2183
2184         if (S_ISDIR(attr->la_mode)) {
2185                 rc = mdo_declare_ref_add(env, p, handle);
2186                 if (rc)
2187                         GOTO(out, rc);
2188         }
2189
2190         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2191                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2192                 if (rc)
2193                         GOTO(out, rc);
2194         } else {
2195                 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
2196                 enum changelog_rec_type type;
2197
2198                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2199                                               name->ln_name, handle);
2200                 if (rc != 0)
2201                         return rc;
2202
2203                 rc = mdd_declare_links_add(env, c, handle, ldata);
2204                 if (rc)
2205                         return rc;
2206
2207                 *la = *attr;
2208                 la->la_valid = LA_CTIME | LA_MTIME;
2209                 rc = mdo_declare_attr_set(env, p, la, handle);
2210                 if (rc)
2211                         return rc;
2212
2213                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2214                        S_ISREG(attr->la_mode) ? CL_CREATE :
2215                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2216
2217                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2218                                                  handle);
2219                 if (rc)
2220                         return rc;
2221         }
2222 out:
2223         return rc;
2224 }
2225
2226 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2227                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2228                         struct lu_buf *acl_buf)
2229 {
2230         int     rc;
2231         ENTRY;
2232
2233         if (S_ISLNK(la->la_mode)) {
2234                 acl_buf->lb_len = 0;
2235                 def_acl_buf->lb_len = 0;
2236                 RETURN(0);
2237         }
2238
2239         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2240         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2241                            XATTR_NAME_ACL_DEFAULT);
2242         mdd_read_unlock(env, pobj);
2243         if (rc > 0) {
2244                 /* If there are default ACL, fix mode/ACL by default ACL */
2245                 def_acl_buf->lb_len = rc;
2246                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2247                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2248                 acl_buf->lb_len = rc;
2249                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2250                 if (rc < 0)
2251                         RETURN(rc);
2252         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2253                 /* If there are no default ACL, fix mode by mask */
2254                 struct lu_ucred *uc = lu_ucred(env);
2255
2256                 /* The create triggered by MDT internal events, such as
2257                  * LFSCK reset, will not contain valid "uc". */
2258                 if (unlikely(uc != NULL))
2259                         la->la_mode &= ~uc->uc_umask;
2260                 rc = 0;
2261                 acl_buf->lb_len = 0;
2262                 def_acl_buf->lb_len = 0;
2263         }
2264
2265         RETURN(rc);
2266 }
2267
2268 /**
2269  * Create a metadata object and initialize it, set acl, xattr.
2270  **/
2271 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2272                              struct mdd_object *son, struct lu_attr *attr,
2273                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2274                              struct lu_buf *def_acl_buf,
2275                              struct dt_allocation_hint *hint,
2276                              struct thandle *handle)
2277 {
2278         const struct lu_buf *buf;
2279         int rc;
2280
2281         mdd_write_lock(env, son, MOR_TGT_CHILD);
2282         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2283                                         hint);
2284         if (rc)
2285                 GOTO(unlock, rc);
2286
2287         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2288          * created in declare phase, they also needs to be added to master
2289          * object as sub-directory entry. So it has to initialize the master
2290          * object, then set dir striped EA.(in mdo_xattr_set) */
2291         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle);
2292         if (rc != 0)
2293                 GOTO(err_destroy, rc);
2294
2295         /*
2296          * in case of replay we just set LOVEA provided by the client
2297          * XXX: I think it would be interesting to try "old" way where
2298          *      MDT calls this xattr_set(LOV) in a different transaction.
2299          *      probably this way we code can be made better.
2300          */
2301
2302         /* During creation, there are only a few cases we need do xattr_set to
2303          * create stripes.
2304          * 1. regular file: see comments above.
2305          * 2. dir: inherit default striping or pool settings from parent.
2306          * 3. create striped directory with provided stripeEA.
2307          * 4. create striped directory because inherit default layout from the
2308          * parent.
2309          */
2310         if (spec->no_create ||
2311             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2312             S_ISDIR(attr->la_mode)) {
2313                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2314                                         spec->u.sp_ea.eadatalen);
2315                 rc = mdo_xattr_set(env, son, buf,
2316                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2317                                                             XATTR_NAME_LOV,
2318                                    0, handle);
2319                 if (rc != 0)
2320                         GOTO(err_destroy, rc);
2321         }
2322
2323 #ifdef CONFIG_FS_POSIX_ACL
2324         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2325             S_ISDIR(attr->la_mode)) {
2326                 /* set default acl */
2327                 rc = mdo_xattr_set(env, son, def_acl_buf,
2328                                    XATTR_NAME_ACL_DEFAULT, 0,
2329                                    handle);
2330                 if (rc)
2331                         GOTO(err_destroy, rc);
2332         }
2333         /* set its own acl */
2334         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2335                 rc = mdo_xattr_set(env, son, acl_buf,
2336                                    XATTR_NAME_ACL_ACCESS,
2337                                    0, handle);
2338                 if (rc)
2339                         GOTO(err_destroy, rc);
2340         }
2341 #endif
2342
2343         if (S_ISLNK(attr->la_mode)) {
2344                 struct dt_object *dt = mdd_object_child(son);
2345                 const char *target_name = spec->u.sp_symname;
2346                 int sym_len = strlen(target_name);
2347                 loff_t pos = 0;
2348
2349                 buf = mdd_buf_get_const(env, target_name, sym_len);
2350                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
2351                 if (rc == sym_len)
2352                         rc = 0;
2353                 else
2354                         GOTO(err_initlized, rc = -EFAULT);
2355         }
2356
2357         if (spec->sp_cr_file_secctx_name != NULL) {
2358                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2359                                         spec->sp_cr_file_secctx_size);
2360                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2361                                    0, handle);
2362                 if (rc < 0)
2363                         GOTO(err_initlized, rc);
2364         }
2365
2366 err_initlized:
2367         if (unlikely(rc != 0)) {
2368                 int rc2;
2369                 if (S_ISDIR(attr->la_mode)) {
2370                         /* Drop the reference, no need to delete "."/"..",
2371                          * because the object to be destroied directly. */
2372                         rc2 = mdo_ref_del(env, son, handle);
2373                         if (rc2 != 0)
2374                                 GOTO(unlock, rc);
2375                 }
2376                 rc2 = mdo_ref_del(env, son, handle);
2377                 if (rc2 != 0)
2378                         GOTO(unlock, rc);
2379 err_destroy:
2380                 mdo_destroy(env, son, handle);
2381         }
2382 unlock:
2383         mdd_write_unlock(env, son);
2384         RETURN(rc);
2385 }
2386
2387 static int mdd_index_delete(const struct lu_env *env,
2388                             struct mdd_object *mdd_pobj,
2389                             struct lu_attr *cattr,
2390                             const struct lu_name *lname)
2391 {
2392         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2393         struct thandle *handle;
2394         int rc;
2395         ENTRY;
2396
2397         handle = mdd_trans_create(env, mdd);
2398         if (IS_ERR(handle))
2399                 RETURN(PTR_ERR(handle));
2400
2401         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2402                                       handle);
2403         if (rc != 0)
2404                 GOTO(stop, rc);
2405
2406         if (S_ISDIR(cattr->la_mode)) {
2407                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2408                 if (rc != 0)
2409                         GOTO(stop, rc);
2410         }
2411
2412         /* Since this will only be used in the error handler path,
2413          * Let's set the thandle to be local and not mess the transno */
2414         handle->th_local = 1;
2415         rc = mdd_trans_start(env, mdd, handle);
2416         if (rc)
2417                 GOTO(stop, rc);
2418
2419         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2420                                 S_ISDIR(cattr->la_mode), handle);
2421         if (rc)
2422                 GOTO(stop, rc);
2423 stop:
2424         rc = mdd_trans_stop(env, mdd, rc, handle);
2425
2426         RETURN(rc);
2427 }
2428
2429 /**
2430  * Create object and insert it into namespace.
2431  *
2432  * Two operations have to be performed:
2433  *
2434  *  - an allocation of a new object (->do_create()), and
2435  *  - an insertion into a parent index (->dio_insert()).
2436  *
2437  * Due to locking, operation order is not important, when both are
2438  * successful, *but* error handling cases are quite different:
2439  *
2440  *  - if insertion is done first, and following object creation fails,
2441  *  insertion has to be rolled back, but this operation might fail
2442  *  also leaving us with dangling index entry.
2443  *
2444  *  - if creation is done first, is has to be undone if insertion fails,
2445  *  leaving us with leaked space, which is not good but not fatal.
2446  *
2447  * It seems that creation-first is simplest solution, but it is sub-optimal
2448  * in the frequent
2449  *
2450  * $ mkdir foo
2451  * $ mkdir foo
2452  *
2453  * case, because second mkdir is bound to create object, only to
2454  * destroy it immediately.
2455  *
2456  * To avoid this follow local file systems that do double lookup:
2457  *
2458  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2459  * 1. create            (mdd_create_object_internal())
2460  * 2. insert            (__mdd_index_insert(), lookup again)
2461  *
2462  * \param[in] pobj      parent object
2463  * \param[in] lname     name of child being created
2464  * \param[in,out] child child object being created
2465  * \param[in] spec      additional create parameters
2466  * \param[in] ma        attributes for new child object
2467  *
2468  * \retval              0 on success
2469  * \retval              negative errno on failure
2470  */
2471 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2472                       const struct lu_name *lname, struct md_object *child,
2473                       struct md_op_spec *spec, struct md_attr *ma)
2474 {
2475         struct mdd_thread_info  *info = mdd_env_info(env);
2476         struct lu_attr          *la = &info->mti_la_for_fix;
2477         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2478         struct mdd_object       *son = md2mdd_obj(child);
2479         struct mdd_device       *mdd = mdo2mdd(pobj);
2480         struct lu_attr          *attr = &ma->ma_attr;
2481         struct thandle          *handle;
2482         struct lu_attr          *pattr = &info->mti_pattr;
2483         struct lu_buf           acl_buf;
2484         struct lu_buf           def_acl_buf;
2485         struct linkea_data      *ldata = &info->mti_link_data;
2486         const char              *name = lname->ln_name;
2487         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2488         int                      rc;
2489         int                      rc2;
2490         ENTRY;
2491
2492         rc = mdd_la_get(env, mdd_pobj, pattr);
2493         if (rc != 0)
2494                 RETURN(rc);
2495
2496         /* Sanity checks before big job. */
2497         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2498         if (rc)
2499                 RETURN(rc);
2500
2501         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2502                 GOTO(out_free, rc = -EINPROGRESS);
2503
2504         handle = mdd_trans_create(env, mdd);
2505         if (IS_ERR(handle))
2506                 GOTO(out_free, rc = PTR_ERR(handle));
2507
2508         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2509                                mdd->mdd_dt_conf.ddp_max_ea_size);
2510         acl_buf = info->mti_xattr_buf;
2511         def_acl_buf.lb_buf = info->mti_key;
2512         def_acl_buf.lb_len = sizeof(info->mti_key);
2513         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2514         if (rc < 0)
2515                 GOTO(out_stop, rc);
2516
2517         if (S_ISDIR(attr->la_mode)) {
2518                 struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
2519
2520                 /*
2521                  * migrate may create 1-stripe directory, so lod_ah_init()
2522                  * doesn't adjust stripe count from lmu.
2523                  */
2524                 if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
2525                         info->mti_lmu = *lmu;
2526                         info->mti_lmu.lum_stripe_count = 0;
2527                         spec->u.sp_ea.eadata = &info->mti_lmu;
2528                 }
2529         }
2530
2531         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2532
2533         memset(ldata, 0, sizeof(*ldata));
2534         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2535                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2536
2537                 tfid.f_oid--;
2538                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2539                                         &tfid, lname, 1, 0, ldata);
2540         } else {
2541                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2542                                         mdd_object_fid(mdd_pobj),
2543                                         lname, 1, 0, ldata);
2544         }
2545
2546         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2547                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2548                                 hint);
2549         if (rc)
2550                 GOTO(out_stop, rc);
2551
2552         rc = mdd_trans_start(env, mdd, handle);
2553         if (rc)
2554                 GOTO(out_stop, rc);
2555
2556         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2557                                &def_acl_buf, hint, handle);
2558         if (rc != 0)
2559                 GOTO(out_stop, rc);
2560
2561         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2562                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2563                 son->mod_flags |= VOLATILE_OBJ;
2564                 rc = mdd_orphan_insert(env, son, handle);
2565                 GOTO(out_volatile, rc);
2566         } else {
2567                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2568                                       attr->la_mode, name, handle);
2569                 if (rc != 0)
2570                         GOTO(err_created, rc);
2571
2572                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2573                               ldata, 1);
2574
2575                 /* update parent directory mtime/ctime */
2576                 *la = *attr;
2577                 la->la_valid = LA_CTIME | LA_MTIME;
2578                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2579                 if (rc)
2580                         GOTO(err_insert, rc);
2581         }
2582
2583         EXIT;
2584 err_insert:
2585         if (rc != 0) {
2586                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2587                         rc2 = mdd_orphan_delete(env, son, handle);
2588                 else
2589                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2590                                                  S_ISDIR(attr->la_mode),
2591                                                  handle);
2592                 if (rc2 != 0)
2593                         goto out_stop;
2594
2595 err_created:
2596                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2597                 if (S_ISDIR(attr->la_mode)) {
2598                         /* Drop the reference, no need to delete "."/"..",
2599                          * because the object is to be destroyed directly. */
2600                         rc2 = mdo_ref_del(env, son, handle);
2601                         if (rc2 != 0) {
2602                                 mdd_write_unlock(env, son);
2603                                 goto out_stop;
2604                         }
2605                 }
2606 out_volatile:
2607                 /* For volatile files drop one link immediately, since there is
2608                  * no filename in the namespace, and save any error returned. */
2609                 rc2 = mdo_ref_del(env, son, handle);
2610                 if (rc2 != 0) {
2611                         mdd_write_unlock(env, son);
2612                         if (unlikely(rc == 0))
2613                                 rc = rc2;
2614                         goto out_stop;
2615                 }
2616
2617                 /* Don't destroy the volatile object on success */
2618                 if (likely(rc != 0))
2619                         mdo_destroy(env, son, handle);
2620                 mdd_write_unlock(env, son);
2621         }
2622
2623         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2624             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2625                 rc = mdd_changelog_ns_store(env, mdd,
2626                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2627                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2628                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2629                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2630                                 NULL, handle);
2631 out_stop:
2632         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2633         if (rc == 0) {
2634                 /* If creation fails, it is most likely due to the remote update
2635                  * failure, because local transaction will mostly succeed at
2636                  * this stage. There is no easy way to rollback all of previous
2637                  * updates, so let's remove the object from namespace, and
2638                  * LFSCK should handle the orphan object. */
2639                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2640                         mdd_index_delete(env, mdd_pobj, attr, lname);
2641                 rc = rc2;
2642         }
2643 out_free:
2644         if (is_vmalloc_addr(ldata->ld_buf))
2645                 /* if we vmalloced a large buffer drop it */
2646                 lu_buf_free(ldata->ld_buf);
2647
2648         /* The child object shouldn't be cached anymore */
2649         if (rc)
2650                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2651                         &child->mo_lu.lo_header->loh_flags);
2652         return rc;
2653 }
2654
2655 /*
2656  * Get locks on parents in proper order
2657  * RETURN: < 0 - error, rename_order if successful
2658  */
2659 enum rename_order {
2660         MDD_RN_SAME,
2661         MDD_RN_SRCTGT,
2662         MDD_RN_TGTSRC
2663 };
2664
2665 static int mdd_rename_order(const struct lu_env *env,
2666                             struct mdd_device *mdd,
2667                             struct mdd_object *src_pobj,
2668                             const struct lu_attr *pattr,
2669                             struct mdd_object *tgt_pobj)
2670 {
2671         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2672         int rc;
2673         ENTRY;
2674
2675         if (src_pobj == tgt_pobj)
2676                 RETURN(MDD_RN_SAME);
2677
2678         /* compared the parent child relationship of src_p&tgt_p */
2679         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2680                 rc = MDD_RN_SRCTGT;
2681         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2682                 rc = MDD_RN_TGTSRC;
2683         } else {
2684                 rc = mdd_is_parent(env, mdd, src_pobj, pattr,
2685                                    mdo2fid(tgt_pobj));
2686                 if (rc == -EREMOTE)
2687                         rc = 0;
2688
2689                 if (rc == 1)
2690                         rc = MDD_RN_TGTSRC;
2691                 else if (rc == 0)
2692                         rc = MDD_RN_SRCTGT;
2693         }
2694
2695         RETURN(rc);
2696 }
2697
2698 /* has not mdd_write{read}_lock on any obj yet. */
2699 static int mdd_rename_sanity_check(const struct lu_env *env,
2700                                    struct mdd_object *src_pobj,
2701                                    const struct lu_attr *pattr,
2702                                    struct mdd_object *tgt_pobj,
2703                                    const struct lu_attr *tpattr,
2704                                    struct mdd_object *sobj,
2705                                    const struct lu_attr *cattr,
2706                                    struct mdd_object *tobj,
2707                                    const struct lu_attr *tattr)
2708 {
2709         int rc = 0;
2710         ENTRY;
2711
2712         /* XXX: when get here, sobj must NOT be NULL,
2713          * the other case has been processed in cld_rename
2714          * before mdd_rename and enable MDS_PERM_BYPASS. */
2715         LASSERT(sobj);
2716
2717         /*
2718          * If we are using project inheritance, we only allow renames
2719          * into our tree when the project IDs are the same; otherwise
2720          * tree quota mechanism would be circumvented.
2721          */
2722         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2723             tpattr->la_projid != cattr->la_projid) ||
2724             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2725             (pattr->la_projid != tpattr->la_projid)))
2726                 RETURN(-EXDEV);
2727
2728         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2729         if (rc)
2730                 RETURN(rc);
2731
2732         /* XXX: when get here, "tobj == NULL" means tobj must
2733          * NOT exist (neither on remote MDS, such case has been
2734          * processed in cld_rename before mdd_rename and enable
2735          * MDS_PERM_BYPASS).
2736          * So check may_create, but not check may_unlink. */
2737         if (tobj == NULL)
2738                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2739                                     (src_pobj != tgt_pobj));
2740         else
2741                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2742                                     (src_pobj != tgt_pobj), 1);
2743
2744         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2745                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2746
2747         RETURN(rc);
2748 }
2749
2750 static int mdd_declare_rename(const struct lu_env *env,
2751                               struct mdd_device *mdd,
2752                               struct mdd_object *mdd_spobj,
2753                               struct mdd_object *mdd_tpobj,
2754                               struct mdd_object *mdd_sobj,
2755                               struct mdd_object *mdd_tobj,
2756                               const struct lu_name *sname,
2757                               const struct lu_name *tname,
2758                               struct md_attr *ma,
2759                               struct linkea_data *ldata,
2760                               struct thandle *handle)
2761 {
2762         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2763         int rc;
2764
2765         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2766         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2767
2768         LASSERT(mdd_spobj);
2769         LASSERT(mdd_tpobj);
2770         LASSERT(mdd_sobj);
2771
2772         /* name from source dir */
2773         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2774         if (rc)
2775                 return rc;
2776
2777         /* .. from source child */
2778         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2779                 /* source child can be directory,
2780                  * counted by source dir's nlink */
2781                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2782                 if (rc)
2783                         return rc;
2784                 if (mdd_spobj != mdd_tpobj) {
2785                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2786                                                       handle);
2787                         if (rc != 0)
2788                                 return rc;
2789
2790                         rc = mdo_declare_index_insert(env, mdd_sobj,
2791                                                       mdo2fid(mdd_tpobj),
2792                                                       S_IFDIR, dotdot, handle);
2793                         if (rc != 0)
2794                                 return rc;
2795                 }
2796
2797                 /* new target child can be directory,
2798                  * counted by target dir's nlink */
2799                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2800                 if (rc != 0)
2801                         return rc;
2802         }
2803
2804         la->la_valid = LA_CTIME | LA_MTIME;
2805         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2806         if (rc != 0)
2807                 return rc;
2808
2809         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2810         if (rc != 0)
2811                 return rc;
2812
2813         la->la_valid = LA_CTIME;
2814         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2815         if (rc)
2816                 return rc;
2817
2818         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2819         if (rc)
2820                 return rc;
2821
2822         /* new name */
2823         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2824                                       mdd_object_type(mdd_sobj),
2825                                       tname->ln_name, handle);
2826         if (rc != 0)
2827                 return rc;
2828
2829         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2830                 /* delete target child in target parent directory */
2831                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2832                                               handle);
2833                 if (rc)
2834                         return rc;
2835
2836                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2837                 if (rc)
2838                         return rc;
2839
2840                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2841                         /* target child can be directory,
2842                          * delete "." reference in target child directory */
2843                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2844                         if (rc)
2845                                 return rc;
2846
2847                         /* delete ".." reference in target parent directory */
2848                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2849                         if (rc)
2850                                 return rc;
2851                 }
2852
2853                 la->la_valid = LA_CTIME;
2854                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2855                 if (rc)
2856                         return rc;
2857
2858                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2859                 if (rc)
2860                         return rc;
2861         }
2862
2863         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
2864                                          handle);
2865         if (rc)
2866                 return rc;
2867
2868         return rc;
2869 }
2870
2871 /* src object can be remote that is why we use only fid and type of object */
2872 static int mdd_rename(const struct lu_env *env,
2873                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2874                       const struct lu_fid *lf, const struct lu_name *lsname,
2875                       struct md_object *tobj, const struct lu_name *ltname,
2876                       struct md_attr *ma)
2877 {
2878         const char *sname = lsname->ln_name;
2879         const char *tname = ltname->ln_name;
2880         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2881         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2882         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2883         struct mdd_device *mdd = mdo2mdd(src_pobj);
2884         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2885         struct mdd_object *mdd_tobj = NULL;
2886         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2887         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2888         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2889         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2890         struct thandle *handle;
2891         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2892         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2893         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2894         bool is_dir;
2895         bool tobj_ref = 0;
2896         bool tobj_locked = 0;
2897         unsigned cl_flags = 0;
2898         int rc, rc2;
2899         ENTRY;
2900
2901         /* let unlink to complete and commit */
2902         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
2903
2904         if (tobj)
2905                 mdd_tobj = md2mdd_obj(tobj);
2906
2907         mdd_sobj = mdd_object_find(env, mdd, lf);
2908         if (IS_ERR(mdd_sobj))
2909                 RETURN(PTR_ERR(mdd_sobj));
2910
2911         rc = mdd_la_get(env, mdd_sobj, cattr);
2912         if (rc)
2913                 GOTO(out_pending, rc);
2914
2915         rc = mdd_la_get(env, mdd_spobj, pattr);
2916         if (rc)
2917                 GOTO(out_pending, rc);
2918
2919         if (mdd_tobj) {
2920                 rc = mdd_la_get(env, mdd_tobj, tattr);
2921                 if (rc)
2922                         GOTO(out_pending, rc);
2923                 /* search for an existing archive.
2924                  * we should check ahead as the object
2925                  * can be destroyed in this transaction */
2926                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2927                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2928         }
2929
2930         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2931         if (rc)
2932                 GOTO(out_pending, rc);
2933
2934         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2935                                      mdd_sobj, cattr, mdd_tobj, tattr);
2936         if (rc)
2937                 GOTO(out_pending, rc);
2938
2939         rc = mdd_name_check(mdd, ltname);
2940         if (rc < 0)
2941                 GOTO(out_pending, rc);
2942
2943         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2944         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2945         if (rc < 0)
2946                 GOTO(out_pending, rc);
2947
2948         handle = mdd_trans_create(env, mdd);
2949         if (IS_ERR(handle))
2950                 GOTO(out_pending, rc = PTR_ERR(handle));
2951
2952         memset(ldata, 0, sizeof(*ldata));
2953         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
2954                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
2955                                 1, 0, ldata);
2956         if (rc)
2957                 GOTO(stop, rc);
2958
2959         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2960                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2961         if (rc)
2962                 GOTO(stop, rc);
2963
2964         rc = mdd_trans_start(env, mdd, handle);
2965         if (rc)
2966                 GOTO(stop, rc);
2967
2968         is_dir = S_ISDIR(cattr->la_mode);
2969
2970         /* Remove source name from source directory */
2971         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2972         if (rc != 0)
2973                 GOTO(stop, rc);
2974
2975         /* "mv dir1 dir2" needs "dir1/.." link update */
2976         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2977                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2978                 if (rc != 0)
2979                         GOTO(fixup_spobj2, rc);
2980
2981                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2982                                              dotdot, handle);
2983                 if (rc != 0)
2984                         GOTO(fixup_spobj, rc);
2985         }
2986
2987         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2988                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2989                 if (rc != 0)
2990                         /* tname might been renamed to something else */
2991                         GOTO(fixup_spobj, rc);
2992         }
2993
2994         /* Insert new fid with target name into target dir */
2995         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2996                                 tname, handle);
2997         if (rc != 0)
2998                 GOTO(fixup_tpobj, rc);
2999
3000         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3001         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3002
3003         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3004         la->la_valid = LA_CTIME;
3005         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
3006         if (rc)
3007                 GOTO(fixup_tpobj, rc);
3008
3009         /* Update the linkEA for the source object */
3010         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3011         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
3012                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
3013                               0, 0);
3014         if (rc == -ENOENT)
3015                 /* Old files might not have EA entry */
3016                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
3017                               lsname, handle, NULL, 0);
3018         mdd_write_unlock(env, mdd_sobj);
3019         /* We don't fail the transaction if the link ea can't be
3020            updated -- fid2path will use alternate lookup method. */
3021         rc = 0;
3022
3023         /* Remove old target object
3024          * For tobj is remote case cmm layer has processed
3025          * and set tobj to NULL then. So when tobj is NOT NULL,
3026          * it must be local one.
3027          */
3028         if (tobj && mdd_object_exists(mdd_tobj)) {
3029                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
3030                 tobj_locked = 1;
3031                 if (mdd_is_dead_obj(mdd_tobj)) {
3032                         /* shld not be dead, something is wrong */
3033                         CERROR("tobj is dead, something is wrong\n");
3034                         rc = -EINVAL;
3035                         goto cleanup;
3036                 }
3037                 mdo_ref_del(env, mdd_tobj, handle);
3038
3039                 /* Remove dot reference. */
3040                 if (S_ISDIR(tattr->la_mode))
3041                         mdo_ref_del(env, mdd_tobj, handle);
3042                 tobj_ref = 1;
3043
3044                 /* fetch updated nlink */
3045                 rc = mdd_la_get(env, mdd_tobj, tattr);
3046                 if (rc != 0) {
3047                         CERROR("%s: Failed to get nlink for tobj "
3048                                 DFID": rc = %d\n",
3049                                 mdd2obd_dev(mdd)->obd_name,
3050                                 PFID(tpobj_fid), rc);
3051                         GOTO(fixup_tpobj, rc);
3052                 }
3053
3054                 la->la_valid = LA_CTIME;
3055                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3056                 if (rc != 0) {
3057                         CERROR("%s: Failed to set ctime for tobj "
3058                                 DFID": rc = %d\n",
3059                                 mdd2obd_dev(mdd)->obd_name,
3060                                 PFID(tpobj_fid), rc);
3061                         GOTO(fixup_tpobj, rc);
3062                 }
3063
3064                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3065                 ma->ma_attr = *tattr;
3066                 ma->ma_valid |= MA_INODE;
3067                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3068                                        handle);
3069                 if (rc != 0) {
3070                         CERROR("%s: Failed to unlink tobj "
3071                                 DFID": rc = %d\n",
3072                                 mdd2obd_dev(mdd)->obd_name,
3073                                 PFID(tpobj_fid), rc);
3074                         GOTO(fixup_tpobj, rc);
3075                 }
3076
3077                 /* fetch updated nlink */
3078                 rc = mdd_la_get(env, mdd_tobj, tattr);
3079                 if (rc == -ENOENT) {
3080                         /* the object got removed, let's
3081                          * return the latest known attributes */
3082                         tattr->la_nlink = 0;
3083                         rc = 0;
3084                 } else if (rc != 0) {
3085                         CERROR("%s: Failed to get nlink for tobj "
3086                                 DFID": rc = %d\n",
3087                                 mdd2obd_dev(mdd)->obd_name,
3088                                 PFID(tpobj_fid), rc);
3089                         GOTO(fixup_tpobj, rc);
3090                 }
3091                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3092                 ma->ma_attr = *tattr;
3093                 ma->ma_valid |= MA_INODE;
3094
3095                 if (tattr->la_nlink == 0)
3096                         cl_flags |= CLF_RENAME_LAST;
3097                 else
3098                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3099         }
3100
3101         la->la_valid = LA_CTIME | LA_MTIME;
3102         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3103         if (rc)
3104                 GOTO(fixup_tpobj, rc);
3105
3106         if (mdd_spobj != mdd_tpobj) {
3107                 la->la_valid = LA_CTIME | LA_MTIME;
3108                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3109                 if (rc != 0)
3110                         GOTO(fixup_tpobj, rc);
3111         }
3112
3113         EXIT;
3114
3115 fixup_tpobj:
3116         if (rc) {
3117                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3118                 if (rc2)
3119                         CWARN("tp obj fix error %d\n",rc2);
3120
3121                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3122                     !mdd_is_dead_obj(mdd_tobj)) {
3123                         if (tobj_ref) {
3124                                 mdo_ref_add(env, mdd_tobj, handle);
3125                                 if (is_dir)
3126                                         mdo_ref_add(env, mdd_tobj, handle);
3127                         }
3128
3129                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3130                                                   mdo2fid(mdd_tobj),
3131                                                   mdd_object_type(mdd_tobj),
3132                                                   tname, handle);
3133                         if (rc2 != 0)
3134                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3135                 }
3136         }
3137
3138 fixup_spobj:
3139         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3140                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3141                 if (rc2)
3142                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3143                                mdd2obd_dev(mdd)->obd_name, rc2);
3144
3145
3146                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3147                                               dotdot, handle);
3148                 if (rc2 != 0)
3149                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3150                               mdd2obd_dev(mdd)->obd_name, rc2);
3151         }
3152
3153 fixup_spobj2:
3154         if (rc != 0) {
3155                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3156                                          mdd_object_type(mdd_sobj), sname,
3157                                          handle);
3158                 if (rc2 != 0)
3159                         CWARN("sp obj fix error: rc = %d\n", rc2);
3160         }
3161
3162 cleanup:
3163         if (tobj_locked)
3164                 mdd_write_unlock(env, mdd_tobj);
3165
3166         if (rc == 0)
3167                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3168                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3169                                             ltname, lsname, handle);
3170
3171 stop:
3172         rc = mdd_trans_stop(env, mdd, rc, handle);
3173
3174 out_pending:
3175         mdd_object_put(env, mdd_sobj);
3176         return rc;
3177 }
3178
3179 /**
3180  * Check whether we should migrate the file/dir
3181  * return val
3182  *      < 0  permission check failed or other error.
3183  *      = 0  the file can be migrated.
3184  **/
3185 static int mdd_migrate_sanity_check(const struct lu_env *env,
3186                                     struct mdd_device *mdd,
3187                                     struct mdd_object *spobj,
3188                                     struct mdd_object *tpobj,
3189                                     struct mdd_object *sobj,
3190                                     struct mdd_object *tobj,
3191                                     const struct lu_attr *spattr,
3192                                     const struct lu_attr *tpattr,
3193                                     const struct lu_attr *attr)
3194 {
3195         int rc;
3196
3197         ENTRY;
3198
3199         if (!mdd_object_remote(sobj)) {
3200                 mdd_read_lock(env, sobj, MOR_SRC_CHILD);
3201                 if (sobj->mod_count > 0) {
3202                         CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
3203                                mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(sobj)),
3204                                sobj->mod_count);
3205                         mdd_read_unlock(env, sobj);
3206                         RETURN(-EBUSY);
3207                 }
3208                 mdd_read_unlock(env, sobj);
3209         }
3210
3211         if (mdd_object_exists(tobj))
3212                 RETURN(-EEXIST);
3213
3214         rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
3215                                      attr, NULL, NULL);
3216         RETURN(rc);
3217 }
3218
3219 typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
3220                                  struct mdd_object *obj,
3221                                  struct mdd_object *stripe,
3222                                  const struct lu_buf *lmv_buf,
3223                                  const struct lu_buf *lmu_buf,
3224                                  int index,
3225                                  struct thandle *handle);
3226
3227 static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
3228                                          struct mdd_object *obj,
3229                                          struct mdd_object *stripe,
3230                                          const struct lu_buf *lmv_buf,
3231                                          const struct lu_buf *lmu_buf,
3232                                          int index,
3233                                          struct thandle *handle)
3234 {
3235         struct mdd_thread_info *info = mdd_env_info(env);
3236         char *stripe_name = info->mti_name;
3237         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3238         int rc;
3239
3240         if (index < le32_to_cpu(lmu->lum_stripe_count))
3241                 return 0;
3242
3243         rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
3244         if (rc)
3245                 return rc;
3246
3247         snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
3248                  PFID(mdd_object_fid(stripe)), index);
3249
3250         rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
3251         if (rc)
3252                 return rc;
3253
3254         rc = mdo_declare_ref_del(env, obj, handle);
3255
3256         return rc;
3257 }
3258
3259 /* delete stripe from its master object namespace */
3260 static int mdd_dir_delete_stripe(const struct lu_env *env,
3261                                  struct mdd_object *obj,
3262                                  struct mdd_object *stripe,
3263                                  const struct lu_buf *lmv_buf,
3264                                  const struct lu_buf *lmu_buf,
3265                                  int index,
3266                                  struct thandle *handle)
3267 {
3268         struct mdd_thread_info *info = mdd_env_info(env);
3269         char *stripe_name = info->mti_name;
3270         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3271         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3272         __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
3273         int rc;
3274
3275         ENTRY;
3276
3277         /* local dir will delete via LOD */
3278         LASSERT(mdd_object_remote(obj));
3279         LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
3280
3281         if (index < del_offset)
3282                 RETURN(0);
3283
3284         mdd_write_lock(env, stripe, MOR_SRC_CHILD);
3285         rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
3286         if (rc)
3287                 GOTO(out, rc);
3288
3289         snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
3290                  PFID(mdd_object_fid(stripe)), index);
3291
3292         rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
3293         if (rc)
3294                 GOTO(out, rc);
3295
3296         rc = mdo_ref_del(env, obj, handle);
3297         GOTO(out, rc);
3298 out:
3299         mdd_write_unlock(env, stripe);
3300
3301         return rc;
3302 }
3303
3304 static int mdd_dir_declare_destroy_stripe(const struct lu_env *env,
3305                                           struct mdd_object *obj,
3306                                           struct mdd_object *stripe,
3307                                           const struct lu_buf *lmv_buf,
3308                                           const struct lu_buf *lmu_buf,
3309                                           int index,
3310                                           struct thandle *handle)
3311 {
3312         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3313         __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
3314         int rc;
3315
3316         if (index < shrink_offset) {
3317                 if (shrink_offset < 2)
3318                         return 0;
3319                 return mdo_declare_xattr_set(env, stripe, lmv_buf,
3320                                              XATTR_NAME_LMV".set", 0, handle);
3321         }
3322
3323         rc = mdo_declare_ref_del(env, stripe, handle);
3324         if (rc)
3325                 return rc;
3326
3327         rc = mdo_declare_destroy(env, stripe, handle);
3328
3329         return rc;
3330 }
3331
3332 static int mdd_dir_destroy_stripe(const struct lu_env *env,
3333                                   struct mdd_object *obj,
3334                                   struct mdd_object *stripe,
3335                                   const struct lu_buf *lmv_buf,
3336                                   const struct lu_buf *lmu_buf,
3337                                   int index,
3338                                   struct thandle *handle)
3339 {
3340         struct mdd_thread_info *info = mdd_env_info(env);
3341         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3342         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3343         __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
3344         int rc;
3345
3346         ENTRY;
3347
3348         /* update remaining stripes' LMV */
3349         if (index < shrink_offset) {
3350                 struct lmv_mds_md_v1 *slave_lmv;
3351                 struct lu_buf slave_buf = {
3352                                 .lb_buf = &info->mti_lmv.lmv_md_v1,
3353                                 .lb_len = sizeof(*slave_lmv)
3354                 };
3355                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
3356
3357                 /* if dir will be shrunk to 1-stripe, don't update */
3358                 if (shrink_offset < 2)
3359                         RETURN(0);
3360
3361                 slave_lmv = slave_buf.lb_buf;
3362                 memset(slave_lmv, 0, sizeof(*slave_lmv));
3363                 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
3364                 slave_lmv->lmv_stripe_count = lmu->lum_stripe_count;
3365                 slave_lmv->lmv_master_mdt_index = cpu_to_le32(index);
3366                 slave_lmv->lmv_hash_type = lmv->lmv_hash_type &
3367                                            cpu_to_le32(LMV_HASH_TYPE_MASK);
3368                 slave_lmv->lmv_layout_version = cpu_to_le32(++version);
3369
3370                 rc = mdo_xattr_set(env, stripe, &slave_buf,
3371                                    XATTR_NAME_LMV".set", 0, handle);
3372                 RETURN(rc);
3373         }
3374
3375         mdd_write_lock(env, stripe, MOR_SRC_CHILD);
3376         rc = mdo_ref_del(env, stripe, handle);
3377         if (!rc)
3378                 rc = mdo_destroy(env, stripe, handle);
3379         mdd_write_unlock(env, stripe);
3380
3381         RETURN(rc);
3382 }
3383
3384 static int mdd_shrink_stripe_is_empty(const struct lu_env *env,
3385                                        struct mdd_object *obj,
3386                                        struct mdd_object *stripe,
3387                                        const struct lu_buf *lmv_buf,
3388                                        const struct lu_buf *lmu_buf,
3389                                        int index,
3390                                        struct thandle *handle)
3391 {
3392         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3393         __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
3394
3395         /* the default value is 0, but it means 1 */
3396         if (!shrink_offset)
3397                 shrink_offset = 1;
3398
3399         if (index < shrink_offset)
3400                 return 0;
3401
3402         return mdd_dir_is_empty(env, stripe);
3403 }
3404
3405 /*
3406  * iterate stripes of striped directory on remote MDT, local striped directory
3407  * is accessed via LOD.
3408  */
3409 static int mdd_dir_iterate_stripes(const struct lu_env *env,
3410                                    struct mdd_object *obj,
3411                                    const struct lu_buf *lmv_buf,
3412                                    const struct lu_buf *lmu_buf,
3413                                    struct thandle *handle,
3414                                    mdd_dir_stripe_cb cb)
3415 {
3416         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
3417         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3418         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3419         struct mdd_object *stripe;
3420         int i;
3421         int rc;
3422
3423         ENTRY;
3424
3425         LASSERT(lmv);
3426
3427         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
3428                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
3429                 stripe = mdd_object_find(env, mdd, fid);
3430                 if (IS_ERR(stripe))
3431                         RETURN(PTR_ERR(stripe));
3432
3433                 rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
3434                 mdd_object_put(env, stripe);
3435                 if (rc)
3436                         RETURN(rc);
3437         }
3438
3439         RETURN(0);
3440 }
3441
3442 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
3443                             struct mdd_object *obj,
3444                             const struct lu_buf *buf,
3445                             const char *name,
3446                             int fl, struct thandle *handle);
3447
3448 /* iterate xattrs, but ignore LMA, LMV, and LINKEA if 'skip_linkea' is set. */
3449 static int mdd_iterate_xattrs(const struct lu_env *env,
3450                               struct mdd_object *sobj,
3451                               struct mdd_object *tobj,
3452                               bool skip_linkea,
3453                               struct thandle *handle,
3454                               mdd_xattr_cb cb)
3455 {
3456         struct mdd_thread_info *info = mdd_env_info(env);
3457         char *xname;
3458         struct lu_buf list_xbuf;
3459         struct lu_buf xbuf = { NULL };
3460         int list_xsize;
3461         int xlen;
3462         int rem;
3463         int xsize;
3464         int rc;
3465
3466         ENTRY;
3467
3468         /* retrieve xattr list from the old object */
3469         list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
3470         if (list_xsize == -ENODATA)
3471                 RETURN(0);
3472
3473         if (list_xsize < 0)
3474                 RETURN(list_xsize);
3475
3476         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3477         if (info->mti_big_buf.lb_buf == NULL)
3478                 RETURN(-ENOMEM);
3479
3480         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3481         list_xbuf.lb_len = list_xsize;
3482         rc = mdo_xattr_list(env, sobj, &list_xbuf);
3483         if (rc < 0)
3484                 RETURN(rc);
3485
3486         rem = rc;
3487         rc = 0;
3488         xname = list_xbuf.lb_buf;
3489         while (rem > 0) {
3490                 xlen = strnlen(xname, rem - 1) + 1;
3491                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3492                     strcmp(XATTR_NAME_LMV, xname) == 0)
3493                         goto next;
3494
3495                 if (skip_linkea &&
3496                     strcmp(XATTR_NAME_LINK, xname) == 0)
3497                         goto next;
3498
3499                 xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
3500                 if (xsize == -ENODATA)
3501                         goto next;
3502                 if (xsize < 0)
3503                         GOTO(out, rc = xsize);
3504
3505                 lu_buf_check_and_alloc(&xbuf, xsize);
3506                 if (xbuf.lb_buf == NULL)
3507                         GOTO(out, rc = -ENOMEM);
3508
3509                 rc = mdo_xattr_get(env, sobj, &xbuf, xname);
3510                 if (rc == -ENODATA)
3511                         goto next;
3512                 if (rc < 0)
3513                         GOTO(out, rc);
3514
3515 repeat:
3516                 rc = cb(env, tobj, &xbuf, xname, 0, handle);
3517                 if (unlikely(rc == -ENOSPC &&
3518                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3519                         rc = linkea_overflow_shrink(
3520                                         (struct linkea_data *)(xbuf.lb_buf));
3521                         if (likely(rc > 0)) {
3522                                 xbuf.lb_len = rc;
3523                                 goto repeat;
3524                         }
3525                 }
3526
3527                 if (rc)
3528                         GOTO(out, rc);
3529 next:
3530                 xname += xlen;
3531                 rem -= xlen;
3532         }
3533
3534 out:
3535         lu_buf_free(&xbuf);
3536         RETURN(rc);
3537 }
3538
3539 typedef int (*mdd_linkea_cb)(const struct lu_env *env,
3540                              struct mdd_object *sobj,
3541                              struct mdd_object *tobj,
3542                              const struct lu_name *sname,
3543                              const struct lu_fid *sfid,
3544                              const struct lu_name *lname,
3545                              const struct lu_fid *fid,
3546                              void *opaque,
3547                              struct thandle *handle);
3548
3549 static int mdd_declare_update_link(const struct lu_env *env,
3550                                    struct mdd_object *sobj,
3551                                    struct mdd_object *tobj,
3552                                    const struct lu_name *tname,
3553                                    const struct lu_fid *tpfid,
3554                                    const struct lu_name *lname,
3555                                    const struct lu_fid *fid,
3556                                    void *unused,
3557                                    struct thandle *handle)
3558 {
3559         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3560         struct mdd_object *pobj;
3561         int rc;
3562
3563         /* ignore tobj */
3564         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3565             !strcmp(tname->ln_name, lname->ln_name))
3566                 return 0;
3567
3568         pobj = mdd_object_find(env, mdd, fid);
3569         if (IS_ERR(pobj))
3570                 return PTR_ERR(pobj);
3571
3572
3573         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
3574         if (!rc)
3575                 rc = mdo_declare_index_insert(env, pobj, mdo2fid(tobj),
3576                                               mdd_object_type(sobj),
3577                                               lname->ln_name, handle);
3578         mdd_object_put(env, pobj);
3579         if (rc)
3580                 return rc;
3581
3582         rc = mdo_declare_ref_add(env, tobj, handle);
3583         if (rc)
3584                 return rc;
3585
3586         rc = mdo_declare_ref_del(env, sobj, handle);
3587         return rc;
3588 }
3589
3590 static int mdd_update_link(const struct lu_env *env,
3591                            struct mdd_object *sobj,
3592                            struct mdd_object *tobj,
3593                            const struct lu_name *tname,
3594                            const struct lu_fid *tpfid,
3595                            const struct lu_name *lname,
3596                            const struct lu_fid *fid,
3597                            void *unused,
3598                            struct thandle *handle)
3599 {
3600         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3601         struct mdd_object *pobj;
3602         int rc;
3603
3604         ENTRY;
3605
3606         LASSERT(lu_name_is_valid(lname));
3607
3608         /* ignore tobj */
3609         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3610             !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
3611                 RETURN(0);
3612
3613         CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
3614                PFID(fid), PNAME(lname), PFID(mdo2fid(tobj)));
3615
3616         pobj = mdd_object_find(env, mdd, fid);
3617         if (IS_ERR(pobj)) {
3618                 CWARN("%s: cannot find obj "DFID": %ld\n",
3619                       mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
3620                 RETURN(PTR_ERR(pobj));
3621         }
3622
3623         if (!mdd_object_exists(pobj)) {
3624                 CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
3625                 mdd_object_put(env, pobj);
3626                 RETURN(-ENOENT);
3627         }
3628
3629         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
3630         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
3631         if (!rc)
3632                 rc = __mdd_index_insert_only(env, pobj, mdo2fid(tobj),
3633                                              mdd_object_type(sobj),
3634                                              lname->ln_name, handle);
3635         mdd_write_unlock(env, pobj);
3636         mdd_object_put(env, pobj);
3637         if (rc)
3638                 RETURN(rc);
3639
3640         mdd_write_lock(env, tobj, MOR_TGT_CHILD);
3641         rc = mdo_ref_add(env, tobj, handle);
3642         mdd_write_unlock(env, tobj);
3643         if (rc)
3644                 RETURN(rc);
3645
3646         mdd_write_lock(env, sobj, MOR_SRC_CHILD);
3647         rc = mdo_ref_del(env, sobj, handle);
3648         mdd_write_unlock(env, sobj);
3649
3650         RETURN(rc);
3651 }
3652
3653 static inline int mdd_fld_lookup(const struct lu_env *env,
3654                                  struct mdd_device *mdd,
3655                                  const struct lu_fid *fid,
3656                                  __u32 *mdt_index)
3657 {
3658         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
3659         struct seq_server_site *ss;
3660         int rc;
3661
3662         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
3663
3664         range->lsr_flags = LU_SEQ_RANGE_MDT;
3665         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
3666         if (rc)
3667                 return rc;
3668
3669         *mdt_index = range->lsr_index;
3670
3671         return 0;
3672 }
3673
3674 static int mdd_is_link_on_source_mdt(const struct lu_env *env,
3675                                      struct mdd_object *sobj,
3676                                      struct mdd_object *tobj,
3677                                      const struct lu_name *tname,
3678                                      const struct lu_fid *tpfid,
3679                                      const struct lu_name *lname,
3680                                      const struct lu_fid *fid,
3681                                      void *opaque,
3682                                      struct thandle *handle)
3683 {
3684         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3685         __u32 source_mdt_index = *(__u32 *)opaque;
3686         __u32 link_mdt_index;
3687         int rc;
3688
3689         ENTRY;
3690
3691         /* ignore tobj */
3692         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3693             !strcmp(tname->ln_name, lname->ln_name))
3694                 return 0;
3695
3696         rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
3697         if (rc)
3698                 RETURN(rc);
3699
3700         RETURN(link_mdt_index == source_mdt_index);
3701 }
3702
3703 static int mdd_iterate_linkea(const struct lu_env *env,
3704                               struct mdd_object *sobj,
3705                               struct mdd_object *tobj,
3706                               const struct lu_name *tname,
3707                               const struct lu_fid *tpfid,
3708                               struct linkea_data *ldata,
3709                               void *opaque,
3710                               struct thandle *handle,
3711                               mdd_linkea_cb cb)
3712 {
3713         struct mdd_thread_info *info = mdd_env_info(env);
3714         char *filename = info->mti_name;
3715         struct lu_name lname;
3716         struct lu_fid fid;
3717         int rc = 0;
3718
3719         if (!ldata->ld_buf)
3720                 return 0;
3721
3722         for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
3723              linkea_next_entry(ldata)) {
3724                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
3725                                     &fid);
3726
3727                 /* Note: lname might miss \0 at the end */
3728                 snprintf(filename, sizeof(info->mti_name), "%.*s",
3729                          lname.ln_namelen, lname.ln_name);
3730                 lname.ln_name = filename;
3731
3732                 CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
3733
3734                 rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
3735                         handle);
3736         }
3737
3738         return rc;
3739 }
3740
3741 /**
3742  * Prepare linkea, and check whether file needs migrate: if source still has
3743  * link on source MDT, no need to migrate, just update namespace on source and
3744  * target parents.
3745  *
3746  * \retval      0 do migrate
3747  * \retval      1 don't migrate
3748  * \retval      -errno on failure
3749  */
3750 static int migrate_linkea_prepare(const struct lu_env *env,
3751                                   struct mdd_device *mdd,
3752                                   struct mdd_object *spobj,
3753                                   struct mdd_object *tpobj,
3754                                   struct mdd_object *sobj,
3755                                   const struct lu_name *lname,
3756                                   const struct lu_attr *attr,
3757                                   struct linkea_data *ldata)
3758 {
3759         __u32 source_mdt_index;
3760         int rc;
3761
3762         ENTRY;
3763
3764         memset(ldata, 0, sizeof(*ldata));
3765         rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
3766                                 mdo2fid(tpobj), lname, 1, 0, ldata);
3767         if (rc)
3768                 RETURN(rc);
3769
3770         /*
3771          * Then it will check if the file should be migrated. If the file has
3772          * mulitple links, we only need migrate the file if all of its entries
3773          * has been migrated to the remote MDT.
3774          */
3775         if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
3776                 RETURN(0);
3777
3778         /* If there are still links locally, don't migrate this file */
3779         LASSERT(ldata->ld_leh != NULL);
3780
3781         /*
3782          * If linkEA is overflow, it means there are some unknown name entries
3783          * under unknown parents, which will prevent the migration.
3784          */
3785         if (unlikely(ldata->ld_leh->leh_overflow_time))
3786                 RETURN(-EOVERFLOW);
3787
3788         rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &source_mdt_index);
3789         if (rc)
3790                 RETURN(rc);
3791
3792         rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
3793                                 &source_mdt_index, NULL,
3794                                 mdd_is_link_on_source_mdt);
3795         RETURN(rc);
3796 }
3797
3798 static int mdd_dir_declare_layout_delete(const struct lu_env *env,
3799                                          struct mdd_object *obj,
3800                                          const struct lu_buf *lmv_buf,
3801                                          const struct lu_buf *lmu_buf,
3802                                          struct thandle *handle)
3803 {
3804         int rc;
3805
3806         if (!lmv_buf->lb_buf)
3807                 rc = mdo_declare_index_delete(env, obj, dotdot, handle);
3808         else if (mdd_object_remote(obj))
3809                 rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
3810                                              mdd_dir_declare_delete_stripe);
3811         else
3812                 rc = mdo_declare_xattr_set(env, obj, lmu_buf,
3813                                            XATTR_NAME_LMV".del", 0, handle);
3814
3815         return rc;
3816 }
3817
3818 static int mdd_dir_layout_delete(const struct lu_env *env,
3819                                  struct mdd_object *obj,
3820                                  const struct lu_buf *lmv_buf,
3821                                  const struct lu_buf *lmu_buf,
3822                                  struct thandle *handle)
3823 {
3824         int rc;
3825
3826         ENTRY;
3827
3828         mdd_write_lock(env, obj, MOR_SRC_PARENT);
3829         if (!lmv_buf->lb_buf)
3830                 /* normal dir */
3831                 rc = __mdd_index_delete_only(env, obj, dotdot, handle);
3832         else if (mdd_object_remote(obj))
3833                 /* striped, but remote */
3834                 rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
3835                                              mdd_dir_delete_stripe);
3836         else
3837                 rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
3838                                    handle);
3839         mdd_write_unlock(env, obj);
3840
3841         RETURN(rc);
3842 }
3843
3844 static int mdd_declare_migrate_create(const struct lu_env *env,
3845                                       struct mdd_object *tpobj,
3846                                       struct mdd_object *sobj,
3847                                       struct mdd_object *tobj,
3848                                       const struct lu_name *lname,
3849                                       struct lu_attr *attr,
3850                                       struct lu_buf *sbuf,
3851                                       struct linkea_data *ldata,
3852                                       struct md_op_spec *spec,
3853                                       struct dt_allocation_hint *hint,
3854                                       struct thandle *handle)
3855 {
3856         struct mdd_thread_info *info = mdd_env_info(env);
3857         struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
3858         int rc;
3859
3860         if (S_ISDIR(attr->la_mode)) {
3861                 struct lu_buf lmu_buf = { NULL };
3862
3863                 if (lmv) {
3864                         struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
3865
3866                         lmu->lum_stripe_count = 0;
3867                         lmu_buf.lb_buf = lmu;
3868                         lmu_buf.lb_len = sizeof(*lmu);
3869                 }
3870
3871                 rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
3872                                                    handle);
3873                 if (rc)
3874                         return rc;
3875
3876                 if (lmv) {
3877                         rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
3878                                                    handle);
3879                         if (rc)
3880                                 return rc;
3881                 }
3882         }
3883
3884         rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
3885                                 lname, attr, handle, spec, ldata, NULL, NULL,
3886                                 hint);
3887         if (rc)
3888                 return rc;
3889
3890         if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
3891                 if (!lmv) {
3892                         /*
3893                          * if sobj is not striped, fake a 1-stripe LMV, which
3894                          * will be used to generate a compound LMV for tobj.
3895                          */
3896                         LASSERT(sizeof(info->mti_key) >
3897                                 lmv_mds_md_size(1, LMV_MAGIC_V1));
3898                         lmv = (typeof(lmv))info->mti_key;
3899                         memset(lmv, 0, sizeof(*lmv));
3900                         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3901                         lmv->lmv_stripe_count = cpu_to_le32(1);
3902                         fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
3903                         sbuf->lb_buf = lmv;
3904                         sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
3905
3906                         rc = mdo_declare_xattr_set(env, tobj, sbuf,
3907                                                    XATTR_NAME_LMV".add", 0,
3908                                                    handle);
3909                         sbuf->lb_buf = NULL;
3910                         sbuf->lb_len = 0;
3911                 } else {
3912                         rc = mdo_declare_xattr_set(env, tobj, sbuf,
3913                                                    XATTR_NAME_LMV".add", 0,
3914                                                    handle);
3915                 }
3916                 if (rc)
3917                         return rc;
3918         }
3919
3920         /*
3921          * tobj mode will be used in lod_declare_xattr_set(), but it's not
3922          * createb yet, copy from sobj.
3923          */
3924         tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
3925         tobj->mod_obj.mo_lu.lo_header->loh_attr |=
3926                 sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
3927
3928         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
3929                                 mdo_declare_xattr_set);
3930         if (rc)
3931                 return rc;
3932
3933         if (S_ISREG(attr->la_mode)) {
3934                 struct lu_buf fid_buf;
3935
3936                 handle->th_complex = 1;
3937
3938                 /* target may be remote, update PFID via sobj. */
3939                 fid_buf.lb_buf = (void *)mdo2fid(tobj);
3940                 fid_buf.lb_len = sizeof(struct lu_fid);
3941                 rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
3942                                            0, handle);
3943                 if (rc)
3944                         return rc;
3945
3946                 rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
3947                 if (rc)
3948                         return rc;
3949         }
3950
3951         if (!S_ISDIR(attr->la_mode)) {
3952                 rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
3953                                         ldata, NULL, handle,
3954                                         mdd_declare_update_link);
3955                 if (rc)
3956                         return rc;
3957
3958                 if (lmv) {
3959                         rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
3960                                                    handle);
3961                         if (rc)
3962                                 return rc;
3963                 }
3964         }
3965
3966         return rc;
3967 }
3968
3969 /**
3970  * Create target, migrate xattrs and update links.
3971  *
3972  * Create target according to \a spec, and then migrate xattrs, if it's
3973  * directory, migrate source stripes to target, else update fid to target
3974  * for links.
3975  *
3976  * \param[in] env       execution environment
3977  * \param[in] tpobj     target parent object
3978  * \param[in] sobj      source object
3979  * \param[in] tobj      target object
3980  * \param[in] lname     file name
3981  * \param[in] attr      source attributes
3982  * \param[in] sbuf      source LMV buf
3983  * \param[in] ldata     source linkea
3984  * \param[in] spec      migrate create spec
3985  * \param[in] hint      target creation hint
3986  * \param[in] handle    tranasction handle
3987  *
3988  * \retval      0 on success
3989  * \retval      -errno on failure
3990  **/
3991 static int mdd_migrate_create(const struct lu_env *env,
3992                               struct mdd_object *tpobj,
3993                               struct mdd_object *sobj,
3994                               struct mdd_object *tobj,
3995                               const struct lu_name *lname,
3996                               struct lu_attr *attr,
3997                               const struct lu_buf *sbuf,
3998                               struct linkea_data *ldata,
3999                               struct md_op_spec *spec,
4000                               struct dt_allocation_hint *hint,
4001                               struct thandle *handle)
4002 {
4003         int rc;
4004
4005         ENTRY;
4006
4007         /*
4008          * directory will migrate sobj stripes to tobj:
4009          * 1. delete stripes from sobj.
4010          * 2. add stripes to tobj, see lod_dir_declare_layout_add().
4011          * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
4012          */
4013         if (S_ISDIR(attr->la_mode)) {
4014                 struct lu_buf lmu_buf = { NULL };
4015
4016                 if (sbuf->lb_buf) {
4017                         struct mdd_thread_info *info = mdd_env_info(env);
4018                         struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
4019
4020                         lmu->lum_stripe_count = 0;
4021                         lmu_buf.lb_buf = lmu;
4022                         lmu_buf.lb_len = sizeof(*lmu);
4023                 }
4024
4025                 rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
4026                 if (rc)
4027                         RETURN(rc);
4028
4029                 /*
4030                  * delete LMV so that later when destroying sobj it won't delete
4031                  * stripes again.
4032                  */
4033                 if (sbuf->lb_buf) {
4034                         mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4035                         rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
4036                         mdd_write_unlock(env, sobj);
4037                         if (rc)
4038                                 RETURN(rc);
4039                 }
4040         }
4041
4042         /* don't set nlink from sobj */
4043         attr->la_valid &= ~LA_NLINK;
4044
4045         rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, hint,
4046                                 handle);
4047         if (rc)
4048                 RETURN(rc);
4049
4050         mdd_write_lock(env, tobj, MOR_TGT_CHILD);
4051         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
4052         mdd_write_unlock(env, tobj);
4053         if (rc)
4054                 RETURN(rc);
4055
4056         /* for regular file, update OST objects XATTR_NAME_FID */
4057         if (S_ISREG(attr->la_mode)) {
4058                 struct lu_buf fid_buf;
4059
4060                 /* target may be remote, update PFID via sobj. */
4061                 fid_buf.lb_buf = (void *)mdo2fid(tobj);
4062                 fid_buf.lb_len = sizeof(struct lu_fid);
4063                 rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
4064                                    handle);
4065                 if (rc)
4066                         RETURN(rc);
4067
4068                 /* delete LOV to avoid deleting OST objs when destroying sobj */
4069                 mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4070                 rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4071                 mdd_write_unlock(env, sobj);
4072                 if (rc)
4073                         RETURN(rc);
4074         }
4075
4076         if (!S_ISDIR(attr->la_mode))
4077                 rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
4078                                         ldata, NULL, handle, mdd_update_link);
4079
4080         RETURN(rc);
4081 }
4082
4083 static int mdd_declare_migrate_update(const struct lu_env *env,
4084                                       struct mdd_object *spobj,
4085                                       struct mdd_object *tpobj,
4086                                       struct mdd_object *sobj,
4087                                       struct mdd_object *tobj,
4088                                       const struct lu_name *lname,
4089                                       struct lu_attr *attr,
4090                                       struct lu_attr *spattr,
4091                                       struct lu_attr *tpattr,
4092                                       struct linkea_data *ldata,
4093                                       bool do_create,
4094                                       bool do_destroy,
4095                                       struct md_attr *ma,
4096                                       struct thandle *handle)
4097 {
4098         struct mdd_thread_info *info = mdd_env_info(env);
4099         const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
4100         struct lu_attr *la = &info->mti_la_for_fix;
4101         int rc;
4102
4103         rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
4104         if (rc)
4105                 return rc;
4106
4107         if (S_ISDIR(attr->la_mode)) {
4108                 rc = mdo_declare_ref_del(env, spobj, handle);
4109                 if (rc)
4110                         return rc;
4111         }
4112
4113         rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
4114                                       lname->ln_name, handle);
4115         if (rc)
4116                 return rc;
4117
4118         rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
4119         if (rc)
4120                 return rc;
4121
4122         if (S_ISDIR(attr->la_mode)) {
4123                 rc = mdo_declare_ref_add(env, tpobj, handle);
4124                 if (rc)
4125                         return rc;
4126         }
4127
4128         la->la_valid = LA_CTIME | LA_MTIME;
4129         rc = mdo_declare_attr_set(env, spobj, la, handle);
4130         if (rc)
4131                 return rc;
4132
4133         if (tpobj != spobj) {
4134                 rc = mdo_declare_attr_set(env, tpobj, la, handle);
4135                 if (rc)
4136                         return rc;
4137         }
4138
4139         if (do_create && do_destroy) {
4140                 rc = mdo_declare_ref_del(env, sobj, handle);
4141                 if (rc)
4142                         return rc;
4143
4144                 rc = mdo_declare_destroy(env, sobj, handle);
4145                 if (rc)
4146                         return rc;
4147         }
4148
4149         return rc;
4150 }
4151
4152 /**
4153  * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
4154  **/
4155 static int mdd_migrate_update(const struct lu_env *env,
4156                               struct mdd_object *spobj,
4157                               struct mdd_object *tpobj,
4158                               struct mdd_object *sobj,
4159                               struct mdd_object *tobj,
4160                               const struct lu_name *lname,
4161                               struct lu_attr *attr,
4162                               struct lu_attr *spattr,
4163                               struct lu_attr *tpattr,
4164                               struct linkea_data *ldata,
4165                               bool do_create,
4166                               bool do_destroy,
4167                               struct md_attr *ma,
4168                               struct thandle *handle)
4169 {
4170         struct mdd_thread_info *info = mdd_env_info(env);
4171         const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
4172         struct lu_attr *la = &info->mti_la_for_fix;
4173         int rc;
4174
4175         ENTRY;
4176
4177         CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
4178                lname->ln_name, PFID(mdo2fid(spobj)),
4179                PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
4180                PFID(fid));
4181
4182         rc = __mdd_index_delete(env, spobj, lname->ln_name,
4183                                 S_ISDIR(attr->la_mode), handle);
4184         if (rc)
4185                 RETURN(rc);
4186
4187         rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
4188                                 lname->ln_name, handle);
4189         if (rc)
4190                 RETURN(rc);
4191
4192         rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
4193         if (rc)
4194                 RETURN(rc);
4195
4196         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
4197         la->la_valid = LA_CTIME | LA_MTIME;
4198         mdd_write_lock(env, spobj, MOR_SRC_PARENT);
4199         rc = mdd_update_time(env, spobj, spattr, la, handle);
4200         mdd_write_unlock(env, spobj);
4201         if (rc)
4202                 RETURN(rc);
4203
4204         if (tpobj != spobj) {
4205                 la->la_valid = LA_CTIME | LA_MTIME;
4206                 mdd_write_lock(env, tpobj, MOR_TGT_PARENT);
4207                 rc = mdd_update_time(env, tpobj, tpattr, la, handle);
4208                 mdd_write_unlock(env, tpobj);
4209                 if (rc)
4210                         RETURN(rc);
4211         }
4212
4213         /*
4214          * there are three situations we shouldn't destroy source:
4215          * 1. if source is not dir, and it happens to be located on the same MDT
4216          *    as target parent.
4217          * 2. if source is not dir, and has link on the same MDT where source is
4218          *    located.
4219          * 3. if source is dir, and it's a normal, non-empty dir.
4220          *
4221          * the first two situations equals to !do_create, and the 3rd equals to
4222          * !do_destroy, so the below condition is actually
4223          * !(!do_create || !do_destroy).
4224          *
4225          * NB, if user has opened source dir before migration, he will get
4226          * -ENOENT error when close it later, because source is likely to be
4227          *  remote, which can't be moved to orphan list, but except this error
4228          *  message, this won't cause any inconsistency or trouble.
4229          */
4230         if (do_create && do_destroy) {
4231                 mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4232                 mdo_ref_del(env, sobj, handle);
4233                 rc = mdo_destroy(env, sobj, handle);
4234                 mdd_write_unlock(env, sobj);
4235         }
4236
4237         RETURN(rc);
4238 }
4239
4240 /**
4241  * Migrate directory or file.
4242  *
4243  * migrate source to target in following steps:
4244  *   1. create target, append source stripes after target's if it's directory,
4245  *      migrate xattrs and update fid of source links.
4246  *   2. update namespace: migrate dirent from source parent to target parent,
4247  *      update file linkea, and destroy source if it's not needed any more.
4248  *
4249  * \param[in] env       execution environment
4250  * \param[in] md_pobj   parent master object
4251  * \param[in] md_sobj   source object
4252  * \param[in] lname     file name
4253  * \param[in] md_tobj   target object
4254  * \param[in] spec      target creation spec
4255  * \param[in] ma        used to update \a pobj mtime and ctime
4256  *
4257  * \retval              0 on success
4258  * \retval              -errno on failure
4259  */
4260 static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
4261                        struct md_object *md_sobj, const struct lu_name *lname,
4262                        struct md_object *md_tobj, struct md_op_spec *spec,
4263                        struct md_attr *ma)
4264 {
4265         struct mdd_device *mdd = mdo2mdd(md_pobj);
4266         struct mdd_thread_info *info = mdd_env_info(env);
4267         struct mdd_object *pobj = md2mdd_obj(md_pobj);
4268         struct mdd_object *sobj = md2mdd_obj(md_sobj);
4269         struct mdd_object *tobj = md2mdd_obj(md_tobj);
4270         struct mdd_object *spobj = NULL;
4271         struct mdd_object *tpobj = NULL;
4272         struct lu_attr *spattr = &info->mti_pattr;
4273         struct lu_attr *tpattr = &info->mti_tpattr;
4274         struct lu_attr *attr = &info->mti_cattr;
4275         struct linkea_data *ldata = &info->mti_link_data;
4276         struct dt_allocation_hint *hint = &info->mti_hint;
4277         struct lu_fid *fid = &info->mti_fid2;
4278         struct lu_buf pbuf = { NULL };
4279         struct lu_buf sbuf = { NULL };
4280         struct lmv_mds_md_v1 *plmv;
4281         struct thandle *handle;
4282         bool do_create = true;
4283         bool do_destroy = true;
4284         int rc;
4285         ENTRY;
4286
4287         rc = mdd_la_get(env, sobj, attr);
4288         if (rc)
4289                 RETURN(rc);
4290
4291         /* locate source and target stripe on pobj, which are the real parent */
4292         rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
4293         if (rc < 0 && rc != -ENODATA)
4294                 RETURN(rc);
4295
4296         plmv = pbuf.lb_buf;
4297         if (plmv) {
4298                 __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
4299                 __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
4300                 int index;
4301
4302                 /* locate target parent stripe */
4303                 if (hash_type & LMV_HASH_FLAG_MIGRATION) {
4304                         /*
4305                          * fail check here to make sure top dir migration
4306                          * succeed.
4307                          */
4308                         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
4309                                 GOTO(out, rc = -EIO);
4310                         hash_type &= ~LMV_HASH_FLAG_MIGRATION;
4311                         count = le32_to_cpu(plmv->lmv_migrate_offset);
4312                 }
4313                 index = lmv_name_to_stripe_index(hash_type, count,
4314                                                  lname->ln_name,
4315                                                  lname->ln_namelen);
4316                 if (index < 0)
4317                         GOTO(out, rc = index);
4318
4319                 fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
4320                 tpobj = mdd_object_find(env, mdd, fid);
4321                 if (IS_ERR(tpobj))
4322                         GOTO(out, rc = PTR_ERR(tpobj));
4323
4324                 /* locate source parent stripe */
4325                 if (le32_to_cpu(plmv->lmv_hash_type) &
4326                     LMV_HASH_FLAG_MIGRATION) {
4327                         hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
4328                         count = le32_to_cpu(plmv->lmv_stripe_count) -
4329                                 le32_to_cpu(plmv->lmv_migrate_offset);
4330
4331                         index = lmv_name_to_stripe_index(hash_type, count,
4332                                                          lname->ln_name,
4333                                                          lname->ln_namelen);
4334                         if (index < 0) {
4335                                 mdd_object_put(env, tpobj);
4336                                 GOTO(out, rc = index);
4337                         }
4338
4339                         index += le32_to_cpu(plmv->lmv_migrate_offset);
4340                         fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
4341                         spobj = mdd_object_find(env, mdd, fid);
4342                         if (IS_ERR(spobj)) {
4343                                 mdd_object_put(env, tpobj);
4344                                 GOTO(out, rc = PTR_ERR(spobj));
4345                         }
4346                 } else {
4347                         spobj = tpobj;
4348                         mdd_object_get(spobj);
4349                 }
4350         } else {
4351                 tpobj = pobj;
4352                 spobj = pobj;
4353                 mdd_object_get(tpobj);
4354                 mdd_object_get(spobj);
4355         }
4356
4357         rc = mdd_la_get(env, spobj, spattr);
4358         if (rc)
4359                 GOTO(out, rc);
4360
4361         rc = mdd_la_get(env, tpobj, tpattr);
4362         if (rc)
4363                 GOTO(out, rc);
4364
4365         if (S_ISDIR(attr->la_mode)) {
4366                 struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
4367
4368                 LASSERT(lmu);
4369
4370                 /*
4371                  * if user use default value '0' for stripe_count, we need to
4372                  * adjust it to '1' to create a 1-stripe directory.
4373                  */
4374                 if (lmu->lum_stripe_count == 0) {
4375                         /* eadata is from request, don't alter it */
4376                         info->mti_lmu = *lmu;
4377                         info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
4378                         spec->u.sp_ea.eadata = &info->mti_lmu;
4379                         lmu = spec->u.sp_ea.eadata;
4380                 }
4381
4382                 rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
4383                 if (rc == -ENODATA) {
4384                         if (mdd_dir_is_empty(env, sobj) == 0) {
4385                                 /*
4386                                  * if sobj is empty, and target is not striped,
4387                                  * create target as a normal directory.
4388                                  */
4389                                 if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
4390                                         info->mti_lmu = *lmu;
4391                                         info->mti_lmu.lum_stripe_count = 0;
4392                                         spec->u.sp_ea.eadata = &info->mti_lmu;
4393                                         lmu = spec->u.sp_ea.eadata;
4394                                 }
4395                         } else {
4396                                 /*
4397                                  * sobj is not striped dir, if it's not empty,
4398                                  * it will be migrated to be a stripe of target,
4399                                  * don't destroy it after migration.
4400                                  */
4401                                 do_destroy = false;
4402                         }
4403                 } else if (rc) {
4404                         GOTO(out, rc);
4405                 } else {
4406                         struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
4407
4408                         if (le32_to_cpu(lmv->lmv_hash_type) &
4409                             LMV_HASH_FLAG_MIGRATION) {
4410                                 __u32 lum_stripe_count = lmu->lum_stripe_count;
4411                                 __u32 lmv_hash_type = lmv->lmv_hash_type &
4412                                         cpu_to_le32(LMV_HASH_TYPE_MASK);
4413
4414                                 if (!lum_stripe_count)
4415                                         lum_stripe_count = cpu_to_le32(1);
4416
4417                                 /* TODO: check specific MDTs */
4418                                 if (lmv->lmv_migrate_offset !=
4419                                     lum_stripe_count ||
4420                                     lmv->lmv_master_mdt_index !=
4421                                     lmu->lum_stripe_offset ||
4422                                     (lmv_hash_type != 0 &&
4423                                      lmv_hash_type != lmu->lum_hash_type)) {
4424                                         CERROR("%s: \'"DNAME"\' migration was "
4425                                                 "interrupted, run \'lfs migrate "
4426                                                 "-m %d -c %d -H %d "DNAME"\' to "
4427                                                 "finish migration.\n",
4428                                                 mdd2obd_dev(mdd)->obd_name,
4429                                                 PNAME(lname),
4430                                                 le32_to_cpu(
4431                                                     lmv->lmv_master_mdt_index),
4432                                                 le32_to_cpu(
4433                                                     lmv->lmv_migrate_offset),
4434                                                 le32_to_cpu(lmv_hash_type),
4435                                                 PNAME(lname));
4436                                         GOTO(out, rc = -EPERM);
4437                                 }
4438                                 GOTO(out, rc = -EALREADY);
4439                         }
4440                 }
4441         } else if (!mdd_object_remote(tpobj)) {
4442                 /*
4443                  * if source is already on MDT where target parent is located,
4444                  * no need to create, just update namespace.
4445                  */
4446                 do_create = false;
4447         } else if (S_ISLNK(attr->la_mode)) {
4448                 lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
4449                 if (!sbuf.lb_buf)
4450                         GOTO(out, rc = -ENOMEM);
4451                 rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
4452                 if (rc <= 0) {
4453                         rc = rc ?: -EFAULT;
4454                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
4455                                mdd2obd_dev(mdd)->obd_name,
4456                                PFID(mdo2fid(sobj)), rc);
4457                         GOTO(out, rc);
4458                 }
4459                 spec->u.sp_symname = sbuf.lb_buf;
4460         } else if (S_ISREG(attr->la_mode)) {
4461                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
4462                 spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
4463         }
4464
4465         /*
4466          * if sobj has link on the same MDT, no need to create, just update
4467          * namespace, and it will be a remote file on target parent, which is
4468          * similar to rename.
4469          */
4470         rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
4471                                     ldata);
4472         if (rc > 0)
4473                 do_create = false;
4474         else if (rc)
4475                 GOTO(out, rc);
4476
4477         rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
4478                                       spattr, tpattr, attr);
4479         if (rc)
4480                 GOTO(out, rc);
4481
4482         mdd_object_make_hint(env, NULL, tobj, attr, spec, hint);
4483
4484         handle = mdd_trans_create(env, mdd);
4485         if (IS_ERR(handle))
4486                 GOTO(out, rc = PTR_ERR(handle));
4487
4488         if (do_create) {
4489                 rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
4490                                                 attr, &sbuf, ldata, spec, hint,
4491                                                 handle);
4492                 if (rc)
4493                         GOTO(stop_trans, rc);
4494         }
4495
4496         rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
4497                                         attr, spattr, tpattr, ldata, do_create,
4498                                         do_destroy, ma, handle);
4499         if (rc)
4500                 GOTO(stop_trans, rc);
4501
4502         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
4503                                          handle);
4504         if (rc)
4505                 GOTO(stop_trans, rc);
4506
4507         rc = mdd_trans_start(env, mdd, handle);
4508         if (rc)
4509                 GOTO(stop_trans, rc);
4510
4511         if (do_create) {
4512                 rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
4513                                         &sbuf, ldata, spec, hint, handle);
4514                 if (rc)
4515                         GOTO(stop_trans, rc);
4516         }
4517
4518         rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
4519                                 spattr, tpattr, ldata, do_create, do_destroy,
4520                                 ma, handle);
4521         if (rc)
4522                 GOTO(stop_trans, rc);
4523
4524         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
4525                                     mdo2fid(spobj), mdo2fid(sobj),
4526                                     mdo2fid(tpobj), lname, lname, handle);
4527         if (rc)
4528                 GOTO(stop_trans, rc);
4529
4530         EXIT;
4531 stop_trans:
4532         rc = mdd_trans_stop(env, mdd, rc, handle);
4533 out:
4534         if (spobj && !IS_ERR(spobj))
4535                 mdd_object_put(env, spobj);
4536         if (tpobj && !IS_ERR(tpobj))
4537                 mdd_object_put(env, tpobj);
4538         lu_buf_free(&sbuf);
4539         lu_buf_free(&pbuf);
4540         return rc;
4541 }
4542
4543 static int __mdd_dir_declare_layout_shrink(const struct lu_env *env,
4544                                            struct mdd_object *pobj,
4545                                            struct mdd_object *obj,
4546                                            struct mdd_object *stripe,
4547                                            struct lu_attr *attr,
4548                                            struct lu_buf *lmv_buf,
4549                                            const struct lu_buf *lmu_buf,
4550                                            struct lu_name *lname,
4551                                            struct thandle *handle)
4552 {
4553         struct mdd_thread_info *info = mdd_env_info(env);
4554         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
4555         struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
4556         struct lu_buf shrink_buf = { .lb_buf = lmu,
4557                                      .lb_len = sizeof(*lmu) };
4558         int rc;
4559
4560         LASSERT(lmv);
4561
4562         memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
4563
4564         if (le32_to_cpu(lmu->lum_stripe_count) < 2)
4565                 lmu->lum_stripe_count = 0;
4566
4567         rc = mdd_dir_declare_layout_delete(env, obj, lmv_buf, &shrink_buf,
4568                                            handle);
4569         if (rc)
4570                 return rc;
4571
4572         if (lmu->lum_stripe_count == 0) {
4573                 lmu->lum_stripe_count = cpu_to_le32(1);
4574
4575                 rc = mdo_declare_xattr_del(env, obj, XATTR_NAME_LMV, handle);
4576                 if (rc)
4577                         return rc;
4578         }
4579
4580         rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
4581                                      mdd_dir_declare_destroy_stripe);
4582         if (rc)
4583                 return rc;
4584
4585         if (le32_to_cpu(lmu->lum_stripe_count) > 1)
4586                 return mdo_declare_xattr_set(env, obj, lmv_buf,
4587                                              XATTR_NAME_LMV".set", 0, handle);
4588
4589         rc = mdo_declare_index_insert(env, stripe, mdo2fid(pobj), S_IFDIR,
4590                                       dotdot, handle);
4591         if (rc)
4592                 return rc;
4593
4594         rc = mdd_iterate_xattrs(env, obj, stripe, false, handle,
4595                                 mdo_declare_xattr_set);
4596         if (rc)
4597                 return rc;
4598
4599         rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
4600         if (rc)
4601                 return rc;
4602
4603         rc = mdo_declare_attr_set(env, stripe, attr, handle);
4604         if (rc)
4605                 return rc;
4606
4607         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
4608         if (rc)
4609                 return rc;
4610
4611         rc = mdo_declare_index_insert(env, pobj, mdo2fid(stripe), attr->la_mode,
4612                                       lname->ln_name, handle);
4613         if (rc)
4614                 return rc;
4615
4616         rc = mdo_declare_ref_del(env, obj, handle);
4617         if (rc)
4618                 return rc;
4619
4620         rc = mdo_declare_ref_del(env, obj, handle);
4621         if (rc)
4622                 return rc;
4623
4624         rc = mdo_declare_destroy(env, obj, handle);
4625         if (rc)
4626                 return rc;
4627
4628         return rc;
4629
4630 }
4631
4632 /*
4633  * after files under \a obj were migrated, shrink old stripes from \a obj,
4634  * furthermore, if it becomes a 1-stripe directory, convert it to a normal one.
4635  */
4636 static int __mdd_dir_layout_shrink(const struct lu_env *env,
4637                                    struct mdd_object *pobj,
4638                                    struct mdd_object *obj,
4639                                    struct mdd_object *stripe,
4640                                    struct lu_attr *attr,
4641                                    struct lu_buf *lmv_buf,
4642                                    const struct lu_buf *lmu_buf,
4643                                    struct lu_name *lname,
4644                                    struct thandle *handle)
4645 {
4646         struct mdd_thread_info *info = mdd_env_info(env);
4647         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
4648         struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
4649         struct lu_buf shrink_buf = { .lb_buf = lmu,
4650                                      .lb_len = sizeof(*lmu) };
4651         int len = lmv_buf->lb_len;
4652         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
4653         int rc;
4654
4655         ENTRY;
4656
4657         /* lmu needs to be altered, but lmu_buf is const */
4658         memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
4659
4660         /*
4661          * if dir will be shrunk to 1-stripe, delete all stripes, because it
4662          * will be converted to normal dir.
4663          */
4664         if (le32_to_cpu(lmu->lum_stripe_count) == 1)
4665                 lmu->lum_stripe_count = 0;
4666
4667         /* delete stripes after lmu_stripe_count */
4668         rc = mdd_dir_layout_delete(env, obj, lmv_buf, &shrink_buf, handle);
4669         if (rc)
4670                 RETURN(rc);
4671
4672         if (lmu->lum_stripe_count == 0) {
4673                 lmu->lum_stripe_count = cpu_to_le32(1);
4674
4675                 /* delete LMV to avoid deleting stripes again upon destroy */
4676                 mdd_write_lock(env, obj, MOR_SRC_CHILD);
4677                 rc = mdo_xattr_del(env, obj, XATTR_NAME_LMV, handle);
4678                 mdd_write_unlock(env, obj);
4679                 if (rc)
4680                         RETURN(rc);
4681         }
4682
4683         /* destroy stripes after lmu_stripe_count */
4684         mdd_write_lock(env, obj, MOR_SRC_PARENT);
4685         rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
4686                                      mdd_dir_destroy_stripe);
4687         mdd_write_unlock(env, obj);
4688
4689         if (le32_to_cpu(lmu->lum_stripe_count) > 1) {
4690                 /* update dir LMV, that's all if it's still striped. */
4691                 lmv->lmv_stripe_count = lmu->lum_stripe_count;
4692                 lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
4693                 lmv->lmv_migrate_offset = 0;
4694                 lmv->lmv_migrate_hash = 0;
4695                 lmv->lmv_layout_version = cpu_to_le32(++version);
4696
4697                 lmv_buf->lb_len = sizeof(*lmv);
4698                 rc = mdo_xattr_set(env, obj, lmv_buf, XATTR_NAME_LMV".set", 0,
4699                                    handle);
4700                 lmv_buf->lb_len = len;
4701                 RETURN(rc);
4702         }
4703
4704         /* replace directory with its remaining stripe */
4705         LASSERT(pobj);
4706         LASSERT(stripe);
4707
4708         mdd_write_lock(env, pobj, MOR_SRC_PARENT);
4709         mdd_write_lock(env, obj, MOR_SRC_CHILD);
4710
4711         /* insert dotdot to stripe which points to parent */
4712         rc = __mdd_index_insert_only(env, stripe, mdo2fid(pobj), S_IFDIR,
4713                                      dotdot, handle);
4714         if (rc)
4715                 GOTO(out, rc);
4716
4717         /* copy xattrs including linkea */
4718         rc = mdd_iterate_xattrs(env, obj, stripe, false, handle, mdo_xattr_set);
4719         if (rc)
4720                 GOTO(out, rc);
4721
4722         /* delete LMV */
4723         rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
4724         if (rc)
4725                 GOTO(out, rc);
4726
4727         /* don't set nlink from parent */
4728         attr->la_valid &= ~LA_NLINK;
4729
4730         rc = mdo_attr_set(env, stripe, attr, handle);
4731         if (rc)
4732                 GOTO(out, rc);
4733
4734         /* delete dir name from parent */
4735         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
4736         if (rc)
4737                 GOTO(out, rc);
4738
4739         /* insert stripe to parent with dir name */
4740         rc = __mdd_index_insert_only(env, pobj, mdo2fid(stripe), attr->la_mode,
4741                                      lname->ln_name, handle);
4742         if (rc)
4743                 GOTO(out, rc);
4744
4745         /* destroy dir obj */
4746         rc = mdo_ref_del(env, obj, handle);
4747         if (rc)
4748                 GOTO(out, rc);
4749
4750         rc = mdo_ref_del(env, obj, handle);
4751         if (rc)
4752                 GOTO(out, rc);
4753
4754         rc = mdo_destroy(env, obj, handle);
4755         if (rc)
4756                 GOTO(out, rc);
4757
4758         EXIT;
4759 out:
4760         mdd_write_unlock(env, obj);
4761         mdd_write_unlock(env, pobj);
4762
4763         return rc;
4764 }
4765
4766 /*
4767  * shrink directory stripes to lum_stripe_count specified by lum_mds_md.
4768  */
4769 int mdd_dir_layout_shrink(const struct lu_env *env,
4770                           struct md_object *md_obj,
4771                           const struct lu_buf *lmu_buf)
4772 {
4773         struct mdd_device *mdd = mdo2mdd(md_obj);
4774         struct mdd_thread_info *info = mdd_env_info(env);
4775         struct mdd_object *obj = md2mdd_obj(md_obj);
4776         struct mdd_object *pobj = NULL;
4777         struct mdd_object *stripe = NULL;
4778         struct lu_attr *attr = &info->mti_pattr;
4779         struct lu_fid *fid = &info->mti_fid2;
4780         struct lu_name lname = { NULL };
4781         struct lu_buf lmv_buf = { NULL };
4782         struct lmv_mds_md_v1 *lmv;
4783         struct lmv_user_md *lmu;
4784         struct thandle *handle;
4785         int rc;
4786
4787         ENTRY;
4788
4789         rc = mdd_la_get(env, obj, attr);
4790         if (rc)
4791                 RETURN(rc);
4792
4793         if (!S_ISDIR(attr->la_mode))
4794                 RETURN(-ENOTDIR);
4795
4796         rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
4797         if (rc < 0)
4798                 RETURN(rc);
4799
4800         lmv = lmv_buf.lb_buf;
4801         lmu = lmu_buf->lb_buf;
4802
4803         /* this was checked in MDT */
4804         LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
4805                 le32_to_cpu(lmv->lmv_stripe_count));
4806
4807         rc = mdd_dir_iterate_stripes(env, obj, &lmv_buf, lmu_buf, NULL,
4808                                      mdd_shrink_stripe_is_empty);
4809         if (rc < 0)
4810                 GOTO(out, rc);
4811         else if (rc != 0)
4812                 GOTO(out, rc = -ENOTEMPTY);
4813
4814         /*
4815          * if obj stripe count will be shrunk to 1, we need to convert it to a
4816          * normal dir, which will change its fid and update parent namespace,
4817          * get obj name and parent fid from linkea.
4818          */
4819         if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
4820                 struct linkea_data *ldata = &info->mti_link_data;
4821                 char *filename = info->mti_name;
4822
4823                 rc = mdd_links_read(env, obj, ldata);
4824                 if (rc)
4825                         GOTO(out, rc);
4826
4827                 if (ldata->ld_leh->leh_reccount > 1)
4828                         GOTO(out, rc = -EINVAL);
4829
4830                 linkea_first_entry(ldata);
4831                 if (!ldata->ld_lee)
4832                         GOTO(out, rc = -ENODATA);
4833
4834                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
4835                                     fid);
4836
4837                 /* Note: lname might miss \0 at the end */
4838                 snprintf(filename, sizeof(info->mti_name), "%.*s",
4839                          lname.ln_namelen, lname.ln_name);
4840                 lname.ln_name = filename;
4841
4842                 pobj = mdd_object_find(env, mdd, fid);
4843                 if (IS_ERR(pobj)) {
4844                         rc = PTR_ERR(pobj);
4845                         pobj = NULL;
4846                         GOTO(out, rc);
4847                 }
4848
4849                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
4850
4851                 stripe = mdd_object_find(env, mdd, fid);
4852                 if (IS_ERR(stripe)) {
4853                         mdd_object_put(env, pobj);
4854                         pobj = NULL;
4855                         GOTO(out, rc = PTR_ERR(stripe));
4856                 }
4857         }
4858
4859         handle = mdd_trans_create(env, mdd);
4860         if (IS_ERR(handle))
4861                 GOTO(out, rc = PTR_ERR(handle));
4862
4863         rc = __mdd_dir_declare_layout_shrink(env, pobj, obj, stripe, attr,
4864                                              &lmv_buf, lmu_buf, &lname, handle);
4865         if (rc)
4866                 GOTO(stop_trans, rc);
4867
4868         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
4869                                          handle);
4870         if (rc)
4871                 GOTO(stop_trans, rc);
4872
4873         rc = mdd_trans_start(env, mdd, handle);
4874         if (rc)
4875                 GOTO(stop_trans, rc);
4876
4877         rc = __mdd_dir_layout_shrink(env, pobj, obj, stripe, attr, &lmv_buf,
4878                                      lmu_buf, &lname, handle);
4879         if (rc)
4880                 GOTO(stop_trans, rc);
4881
4882         rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
4883                                             XATTR_NAME_LMV, handle);
4884         GOTO(stop_trans, rc);
4885
4886 stop_trans:
4887         rc = mdd_trans_stop(env, mdd, rc, handle);
4888 out:
4889         if (pobj) {
4890                 mdd_object_put(env, stripe);
4891                 mdd_object_put(env, pobj);
4892         }
4893         lu_buf_free(&lmv_buf);
4894         return rc;
4895 }
4896
4897 const struct md_dir_operations mdd_dir_ops = {
4898         .mdo_is_subdir     = mdd_is_subdir,
4899         .mdo_lookup        = mdd_lookup,
4900         .mdo_create        = mdd_create,
4901         .mdo_rename        = mdd_rename,
4902         .mdo_link          = mdd_link,
4903         .mdo_unlink        = mdd_unlink,
4904         .mdo_create_data   = mdd_create_data,
4905         .mdo_migrate       = mdd_migrate,
4906 };