Whamcloud - gitweb
e9380ffb0c1d7bae65d0b3c95a3d3e30a10764fa
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46
47 #include "mdd_internal.h"
48
49 static const char dot[] = ".";
50 static const char dotdot[] = "..";
51
52 static struct lu_name lname_dotdot = {
53         .ln_name        = (char *) dotdot,
54         .ln_namelen     = sizeof(dotdot) - 1,
55 };
56
57 static inline int
58 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
59 {
60         if (!lu_name_is_valid(ln))
61                 return -EINVAL;
62         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
63                 return -ENAMETOOLONG;
64         else
65                 return 0;
66 }
67
68 /* Get FID from name and parent */
69 static int
70 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
71              const struct lu_attr *pattr, const struct lu_name *lname,
72              struct lu_fid* fid, int mask)
73 {
74         const char *name                = lname->ln_name;
75         const struct dt_key *key        = (const struct dt_key *)name;
76         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
77         struct mdd_device *m            = mdo2mdd(pobj);
78         struct dt_object *dir           = mdd_object_child(mdd_obj);
79         int rc;
80         ENTRY;
81
82         if (unlikely(mdd_is_dead_obj(mdd_obj)))
83                 RETURN(-ESTALE);
84
85         if (!mdd_object_exists(mdd_obj))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         }
92
93         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
94                                             MOR_TGT_PARENT);
95         if (rc)
96                 RETURN(rc);
97
98         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
99                    dt_try_as_dir(env, dir)))
100                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
101         else
102                 rc = -ENOTDIR;
103
104         RETURN(rc);
105 }
106
107 int mdd_lookup(const struct lu_env *env,
108                struct md_object *pobj, const struct lu_name *lname,
109                struct lu_fid *fid, struct md_op_spec *spec)
110 {
111         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
112         int rc;
113         ENTRY;
114
115         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
116         if (rc != 0)
117                 RETURN(rc);
118
119         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
120                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
121         RETURN(rc);
122 }
123
124 /** Read the link EA into a temp buffer.
125  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
126  * A pointer to the buffer is stored in \a ldata::ld_buf.
127  *
128  * \retval 0 or error
129  */
130 static int __mdd_links_read(const struct lu_env *env,
131                             struct mdd_object *mdd_obj,
132                             struct linkea_data *ldata)
133 {
134         int rc;
135
136         if (!mdd_object_exists(mdd_obj))
137                 return -ENODATA;
138
139         /* First try a small buf */
140         LASSERT(env != NULL);
141         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
142                                                PAGE_SIZE);
143         if (ldata->ld_buf->lb_buf == NULL)
144                 return -ENOMEM;
145
146         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
147         if (rc == -ERANGE) {
148                 /* Buf was too small, figure out what we need. */
149                 lu_buf_free(ldata->ld_buf);
150                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
151                                    XATTR_NAME_LINK);
152                 if (rc < 0)
153                         return rc;
154                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
155                 if (ldata->ld_buf->lb_buf == NULL)
156                         return -ENOMEM;
157                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
158                                   XATTR_NAME_LINK);
159         }
160         if (rc < 0) {
161                 lu_buf_free(ldata->ld_buf);
162                 ldata->ld_buf = NULL;
163                 return rc;
164         }
165
166         return linkea_init(ldata);
167 }
168
169 static int mdd_links_read(const struct lu_env *env,
170                           struct mdd_object *mdd_obj,
171                           struct linkea_data *ldata)
172 {
173         int rc;
174
175         rc = __mdd_links_read(env, mdd_obj, ldata);
176         if (!rc)
177                 rc = linkea_init(ldata);
178
179         return rc;
180 }
181
182 static int mdd_links_read_with_rec(const struct lu_env *env,
183                                    struct mdd_object *mdd_obj,
184                                    struct linkea_data *ldata)
185 {
186         int rc;
187
188         rc = __mdd_links_read(env, mdd_obj, ldata);
189         if (!rc)
190                 rc = linkea_init_with_rec(ldata);
191
192         return rc;
193 }
194
195 /**
196  * Get parent FID of the directory
197  *
198  * Read parent FID from linkEA, if that fails, then do lookup
199  * dotdot to get the parent FID.
200  *
201  * \param[in] env       execution environment
202  * \param[in] obj       object from which to find the parent FID
203  * \param[in] attr      attribute of the object
204  * \param[out] fid      fid to get the parent FID
205  *
206  * \retval              0 if getting the parent FID succeeds.
207  * \retval              negative errno if getting the parent FID fails.
208  **/
209 static inline int mdd_parent_fid(const struct lu_env *env,
210                                  struct mdd_object *obj,
211                                  const struct lu_attr *attr,
212                                  struct lu_fid *fid)
213 {
214         struct mdd_thread_info  *info = mdd_env_info(env);
215         struct linkea_data      ldata = { NULL };
216         struct lu_buf           *buf = &info->mti_link_buf;
217         struct lu_name          lname;
218         int                     rc = 0;
219
220         ENTRY;
221
222         LASSERT(S_ISDIR(mdd_object_type(obj)));
223
224         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
225         if (buf->lb_buf == NULL)
226                 GOTO(lookup, rc = 0);
227
228         ldata.ld_buf = buf;
229         rc = mdd_links_read_with_rec(env, obj, &ldata);
230         if (rc != 0)
231                 GOTO(lookup, rc);
232
233         LASSERT(ldata.ld_leh != NULL);
234         /* Directory should only have 1 parent */
235         if (ldata.ld_leh->leh_reccount > 1)
236                 GOTO(lookup, rc);
237
238         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
239
240         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
241         if (likely(fid_is_sane(fid)))
242                 RETURN(0);
243 lookup:
244         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
245         RETURN(rc);
246 }
247
248 /*
249  * For root fid use special function, which does not compare version component
250  * of fid. Version component is different for root fids on all MDTs.
251  */
252 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
253 {
254         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
255                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
256 }
257
258 /*
259  * return 1: if \a tfid is the fid of the ancestor of \a mo;
260  * return 0: if not;
261  * otherwise: values < 0, errors.
262  */
263 static int mdd_is_parent(const struct lu_env *env,
264                         struct mdd_device *mdd,
265                         struct mdd_object *mo,
266                         const struct lu_attr *attr,
267                         const struct lu_fid *tfid)
268 {
269         struct mdd_object *mp;
270         struct lu_fid *pfid;
271         int rc;
272
273         LASSERT(!lu_fid_eq(mdo2fid(mo), tfid));
274         pfid = &mdd_env_info(env)->mti_fid;
275
276         if (mdd_is_root(mdd, mdo2fid(mo)))
277                 return 0;
278
279         if (mdd_is_root(mdd, tfid))
280                 return 1;
281
282         rc = mdd_parent_fid(env, mo, attr, pfid);
283         if (rc)
284                 return rc;
285
286         while (1) {
287                 if (lu_fid_eq(pfid, tfid))
288                         return 1;
289
290                 if (mdd_is_root(mdd, pfid))
291                         return 0;
292
293                 mp = mdd_object_find(env, mdd, pfid);
294                 if (IS_ERR(mp))
295                         return PTR_ERR(mp);
296
297                 if (!mdd_object_exists(mp)) {
298                         mdd_object_put(env, mp);
299                         return -ENOENT;
300                 }
301
302                 rc = mdd_parent_fid(env, mp, attr, pfid);
303                 mdd_object_put(env, mp);
304                 if (rc)
305                         return rc;
306         }
307
308         return 0;
309 }
310
311 /*
312  * No permission check is needed.
313  *
314  * returns 1: if fid is ancestor of @mo;
315  * returns 0: if fid is not an ancestor of @mo;
316  * returns < 0: if error
317  */
318 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
319                   const struct lu_fid *fid)
320 {
321         struct mdd_device *mdd = mdo2mdd(mo);
322         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
323         int rc;
324         ENTRY;
325
326         if (!mdd_object_exists(md2mdd_obj(mo)))
327                 RETURN(-ENOENT);
328
329         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
330                 RETURN(-ENOTDIR);
331
332         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
333         if (rc != 0)
334                 RETURN(rc);
335
336         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
337         RETURN(rc);
338 }
339
340 /*
341  * Check that @dir contains no entries except (possibly) dot and dotdot.
342  *
343  * Returns:
344  *
345  *             0        empty
346  *      -ENOTDIR        not a directory object
347  *    -ENOTEMPTY        not empty
348  *           -ve        other error
349  *
350  */
351 static int mdd_dir_is_empty(const struct lu_env *env,
352                             struct mdd_object *dir)
353 {
354         struct dt_it     *it;
355         struct dt_object *obj;
356         const struct dt_it_ops *iops;
357         int result;
358         ENTRY;
359
360         obj = mdd_object_child(dir);
361         if (!dt_try_as_dir(env, obj))
362                 RETURN(-ENOTDIR);
363
364         iops = &obj->do_index_ops->dio_it;
365         it = iops->init(env, obj, LUDA_64BITHASH);
366         if (!IS_ERR(it)) {
367                 result = iops->get(env, it, (const struct dt_key *)"");
368                 if (result > 0) {
369                         int i;
370                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
371                                 result = iops->next(env, it);
372                         if (result == 0)
373                                 result = -ENOTEMPTY;
374                         else if (result == 1)
375                                 result = 0;
376                 } else if (result == 0)
377                         /*
378                          * Huh? Index contains no zero key?
379                          */
380                         result = -EIO;
381
382                 iops->put(env, it);
383                 iops->fini(env, it);
384         } else
385                 result = PTR_ERR(it);
386         RETURN(result);
387 }
388
389 /**
390  * Determine if the target object can be hard linked, and right now it only
391  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
392  * directory nlink count might exceed the maximum link count(see
393  * osd_object_ref_add), so it only check nlink for non-directories.
394  *
395  * \param[in] env       thread environment
396  * \param[in] obj       object being linked to
397  * \param[in] la        attributes of \a obj
398  *
399  * \retval              0 if \a obj can be hard linked
400  * \retval              negative error if \a obj is a directory or has too
401  *                      many links
402  */
403 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
404                           const struct lu_attr *la)
405 {
406         struct mdd_device *m = mdd_obj2mdd_dev(obj);
407         ENTRY;
408
409         LASSERT(la != NULL);
410
411         /* Subdir count limitation can be broken through
412          * (see osd_object_ref_add), so only check non-directory here. */
413         if (!S_ISDIR(la->la_mode) &&
414             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
415                 RETURN(-EMLINK);
416
417         RETURN(0);
418 }
419
420 /**
421  * Check whether it may create the cobj under the pobj.
422  *
423  * \param[in] env       execution environment
424  * \param[in] pobj      the parent directory
425  * \param[in] pattr     the attribute of the parent directory
426  * \param[in] cobj      the child to be created
427  * \param[in] check_perm        if check WRITE|EXEC permission for parent
428  *
429  * \retval              = 0 create the child under this dir is allowed
430  * \retval              negative errno create the child under this dir is
431  *                      not allowed
432  */
433 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
434                    const struct lu_attr *pattr, struct mdd_object *cobj,
435                    bool check_perm)
436 {
437         int rc = 0;
438         ENTRY;
439
440         if (cobj && mdd_object_exists(cobj))
441                 RETURN(-EEXIST);
442
443         if (mdd_is_dead_obj(pobj))
444                 RETURN(-ENOENT);
445
446         if (check_perm)
447                 rc = mdd_permission_internal_locked(env, pobj, pattr,
448                                                     MAY_WRITE | MAY_EXEC,
449                                                     MOR_TGT_PARENT);
450         RETURN(rc);
451 }
452
453 /*
454  * Check whether can unlink from the pobj in the case of "cobj == NULL".
455  */
456 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
457                    const struct lu_attr *pattr, const struct lu_attr *attr)
458 {
459         int rc;
460         ENTRY;
461
462         if (mdd_is_dead_obj(pobj))
463                 RETURN(-ENOENT);
464
465         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
466                 RETURN(-EPERM);
467
468         rc = mdd_permission_internal_locked(env, pobj, pattr,
469                                             MAY_WRITE | MAY_EXEC,
470                                             MOR_TGT_PARENT);
471         if (rc != 0)
472                 RETURN(rc);
473
474         if (pattr->la_flags & LUSTRE_APPEND_FL)
475                 RETURN(-EPERM);
476
477         RETURN(rc);
478 }
479
480 /*
481  * pobj == NULL is remote ops case, under such case, pobj's
482  * VTX feature has been checked already, no need check again.
483  */
484 static inline int mdd_is_sticky(const struct lu_env *env,
485                                 struct mdd_object *pobj,
486                                 const struct lu_attr *pattr,
487                                 struct mdd_object *cobj,
488                                 const struct lu_attr *cattr)
489 {
490         struct lu_ucred *uc = lu_ucred_assert(env);
491
492         if (pobj != NULL) {
493                 LASSERT(pattr != NULL);
494                 if (!(pattr->la_mode & S_ISVTX) ||
495                     (pattr->la_uid == uc->uc_fsuid))
496                         return 0;
497         }
498
499         LASSERT(cattr != NULL);
500         if (cattr->la_uid == uc->uc_fsuid)
501                 return 0;
502
503         return !md_capable(uc, CFS_CAP_FOWNER);
504 }
505
506 static int mdd_may_delete_entry(const struct lu_env *env,
507                                 struct mdd_object *pobj,
508                                 const struct lu_attr *pattr,
509                                 int check_perm)
510 {
511         ENTRY;
512
513         LASSERT(pobj != NULL);
514         if (!mdd_object_exists(pobj))
515                 RETURN(-ENOENT);
516
517         if (mdd_is_dead_obj(pobj))
518                 RETURN(-ENOENT);
519
520         if (check_perm) {
521                 int rc;
522                 rc = mdd_permission_internal_locked(env, pobj, pattr,
523                                             MAY_WRITE | MAY_EXEC,
524                                             MOR_TGT_PARENT);
525                 if (rc)
526                         RETURN(rc);
527         }
528
529         if (pattr->la_flags & LUSTRE_APPEND_FL)
530                 RETURN(-EPERM);
531
532         RETURN(0);
533 }
534
535 /*
536  * Check whether it may delete the cobj from the pobj.
537  * pobj maybe NULL
538  */
539 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
540                    const struct lu_attr *tpattr, struct mdd_object *tobj,
541                    const struct lu_attr *tattr, const struct lu_attr *cattr,
542                    int check_perm, int check_empty)
543 {
544         int rc = 0;
545         ENTRY;
546
547         if (tpobj) {
548                 LASSERT(tpattr != NULL);
549                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
550                 if (rc != 0)
551                         RETURN(rc);
552         }
553
554         if (tobj == NULL)
555                 RETURN(0);
556
557         if (!mdd_object_exists(tobj))
558                 RETURN(-ENOENT);
559
560         if (mdd_is_dead_obj(tobj))
561                 RETURN(-ESTALE);
562
563         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
564                 RETURN(-EPERM);
565
566         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
567                 RETURN(-EPERM);
568
569         /* additional check the rename case */
570         if (cattr) {
571                 if (S_ISDIR(cattr->la_mode)) {
572                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
573
574                         if (!S_ISDIR(tattr->la_mode))
575                                 RETURN(-ENOTDIR);
576
577                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
578                                 RETURN(-EBUSY);
579                 } else if (S_ISDIR(tattr->la_mode))
580                         RETURN(-EISDIR);
581         }
582
583         if (S_ISDIR(tattr->la_mode) && check_empty)
584                 rc = mdd_dir_is_empty(env, tobj);
585
586         RETURN(rc);
587 }
588
589 /**
590  * Check whether it can create the link file(linked to @src_obj) under
591  * the target directory(@tgt_obj), and src_obj has been locked by
592  * mdd_write_lock.
593  *
594  * \param[in] env       execution environment
595  * \param[in] tgt_obj   the target directory
596  * \param[in] tattr     attributes of target directory
597  * \param[in] lname     the link name
598  * \param[in] src_obj   source object for link
599  * \param[in] cattr     attributes for source object
600  *
601  * \retval              = 0 it is allowed to create the link file under tgt_obj
602  * \retval              negative error not allowed to create the link file
603  */
604 static int mdd_link_sanity_check(const struct lu_env *env,
605                                  struct mdd_object *tgt_obj,
606                                  const struct lu_attr *tattr,
607                                  const struct lu_name *lname,
608                                  struct mdd_object *src_obj,
609                                  const struct lu_attr *cattr)
610 {
611         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
612         int rc = 0;
613         ENTRY;
614
615         if (!mdd_object_exists(src_obj))
616                 RETURN(-ENOENT);
617
618         if (mdd_is_dead_obj(src_obj))
619                 RETURN(-ESTALE);
620
621         /* Local ops, no lookup before link, check filename length here. */
622         rc = mdd_name_check(m, lname);
623         if (rc < 0)
624                 RETURN(rc);
625
626         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
627                 RETURN(-EPERM);
628
629         if (S_ISDIR(mdd_object_type(src_obj)))
630                 RETURN(-EPERM);
631
632         LASSERT(src_obj != tgt_obj);
633         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
634         if (rc != 0)
635                 RETURN(rc);
636
637         rc = __mdd_may_link(env, src_obj, cattr);
638
639         RETURN(rc);
640 }
641
642 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
643                                    const char *name, struct thandle *handle)
644 {
645         struct dt_object *next = mdd_object_child(pobj);
646         int rc;
647         ENTRY;
648
649         if (dt_try_as_dir(env, next))
650                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
651         else
652                 rc = -ENOTDIR;
653
654         RETURN(rc);
655 }
656
657 static int __mdd_index_insert_only(const struct lu_env *env,
658                                    struct mdd_object *pobj,
659                                    const struct lu_fid *lf, __u32 type,
660                                    const char *name,
661                                    struct thandle *handle)
662 {
663         struct dt_object *next = mdd_object_child(pobj);
664         int               rc;
665         ENTRY;
666
667         if (dt_try_as_dir(env, next)) {
668                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
669                 struct lu_ucred         *uc  = lu_ucred_check(env);
670                 int                      ignore_quota;
671
672                 rec->rec_fid = lf;
673                 rec->rec_type = type;
674                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
675                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
676                                (const struct dt_key *)name, handle,
677                                ignore_quota);
678         } else {
679                 rc = -ENOTDIR;
680         }
681         RETURN(rc);
682 }
683
684 /* insert named index, add reference if isdir */
685 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
686                               const struct lu_fid *lf, __u32 type,
687                               const char *name, struct thandle *handle)
688 {
689         int rc;
690         ENTRY;
691
692         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
693         if (rc == 0 && S_ISDIR(type)) {
694                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
695                 mdo_ref_add(env, pobj, handle);
696                 mdd_write_unlock(env, pobj);
697         }
698
699         RETURN(rc);
700 }
701
702 /* delete named index, drop reference if isdir */
703 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
704                               const char *name, int is_dir,
705                               struct thandle *handle)
706 {
707         int               rc;
708         ENTRY;
709
710         rc = __mdd_index_delete_only(env, pobj, name, handle);
711         if (rc == 0 && is_dir) {
712                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
713                 mdo_ref_del(env, pobj, handle);
714                 mdd_write_unlock(env, pobj);
715         }
716
717         RETURN(rc);
718 }
719
720 static int mdd_llog_record_calc_size(const struct lu_env *env,
721                                      const struct lu_name *tname,
722                                      const struct lu_name *sname)
723 {
724         const struct lu_ucred   *uc = lu_ucred(env);
725         enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
726         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
727
728         if (sname != NULL)
729                 crf |= CLF_RENAME;
730
731         if (uc != NULL && uc->uc_jobid[0] != '\0')
732                 crf |= CLF_JOBID;
733
734         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
735                              changelog_rec_offset(crf, crfe) +
736                              (tname != NULL ? tname->ln_namelen : 0) +
737                              (sname != NULL ? 1 + sname->ln_namelen : 0));
738 }
739
740 int mdd_declare_changelog_store(const struct lu_env *env,
741                                 struct mdd_device *mdd,
742                                 enum changelog_rec_type type,
743                                 const struct lu_name *tname,
744                                 const struct lu_name *sname,
745                                 struct thandle *handle)
746 {
747         struct obd_device               *obd = mdd2obd_dev(mdd);
748         struct llog_ctxt                *ctxt;
749         struct llog_changelog_rec       *rec;
750         struct lu_buf                   *buf;
751         struct thandle                  *llog_th;
752         int                              reclen;
753         int                              rc;
754
755         if (!mdd_changelog_enabled(env, mdd, type))
756                 return 0;
757
758         reclen = mdd_llog_record_calc_size(env, tname, sname);
759         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
760         if (buf->lb_buf == NULL)
761                 return -ENOMEM;
762
763         rec = buf->lb_buf;
764         rec->cr_hdr.lrh_len = reclen;
765         rec->cr_hdr.lrh_type = CHANGELOG_REC;
766
767         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
768         if (ctxt == NULL)
769                 return -ENXIO;
770
771         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
772         if (IS_ERR(llog_th))
773                 GOTO(out_put, rc = PTR_ERR(llog_th));
774
775         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
776
777 out_put:
778         llog_ctxt_put(ctxt);
779
780         return rc;
781 }
782
783 /** Add a changelog entry \a rec to the changelog llog
784  * \param mdd
785  * \param rec
786  * \param handle - currently ignored since llogs start their own transaction;
787  *              this will hopefully be fixed in llog rewrite
788  * \retval 0 ok
789  */
790 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
791                         struct llog_changelog_rec *rec, struct thandle *th)
792 {
793         struct obd_device       *obd = mdd2obd_dev(mdd);
794         struct llog_ctxt        *ctxt;
795         struct thandle          *llog_th;
796         int                      rc;
797
798         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
799                                             changelog_rec_varsize(&rec->cr));
800
801         /* llog_lvfs_write_rec sets the llog tail len */
802         rec->cr_hdr.lrh_type = CHANGELOG_REC;
803         rec->cr.cr_time = cl_time();
804
805         spin_lock(&mdd->mdd_cl.mc_lock);
806         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
807          * but as long as the MDD transactions are ordered correctly for e.g.
808          * rename conflicts, I don't think this should matter. */
809         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
810         spin_unlock(&mdd->mdd_cl.mc_lock);
811
812         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
813         if (ctxt == NULL)
814                 return -ENXIO;
815
816         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
817         if (IS_ERR(llog_th))
818                 GOTO(out_put, rc = PTR_ERR(llog_th));
819
820         /* nested journal transaction */
821         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
822
823         /* time to recover some space ?? */
824         if (likely(!mdd->mdd_changelog_gc ||
825                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
826                    mdd->mdd_changelog_min_gc_interval >=
827                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
828                 /* save a spin_lock trip */
829                 goto out_put;
830         spin_lock(&mdd->mdd_cl.mc_lock);
831         if (likely(mdd->mdd_changelog_gc &&
832                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
833                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
834                         mdd->mdd_changelog_min_gc_interval)) {
835                 if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
836                              mdd->mdd_changelog_min_free_cat_entries ||
837                              OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
838                         CWARN("%s:%s low on changelog_catalog free entries, "
839                               "starting ChangeLog garbage collection thread\n",
840                               obd->obd_name,
841                               OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
842                                 " simulate" : "");
843
844                         /* indicate further kthread run will occur outside
845                          * right after current journal transaction filling has
846                          * completed
847                          */
848                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
849                 }
850                 /* next check in mdd_changelog_min_gc_interval anyway
851                  */
852                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
853         }
854         spin_unlock(&mdd->mdd_cl.mc_lock);
855 out_put:
856         llog_ctxt_put(ctxt);
857         if (rc > 0)
858                 rc = 0;
859         return rc;
860 }
861
862 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
863                                          const struct lu_fid *sfid,
864                                          const struct lu_fid *spfid,
865                                          const struct lu_name *sname)
866 {
867         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
868         size_t extsize;
869
870         LASSERT(sfid != NULL);
871         LASSERT(spfid != NULL);
872         LASSERT(sname != NULL);
873
874         extsize = sname->ln_namelen + 1;
875
876         rnm->cr_sfid = *sfid;
877         rnm->cr_spfid = *spfid;
878
879         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
880         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
881         rec->cr_namelen += extsize;
882 }
883
884 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
885 {
886         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
887
888         if (jobid == NULL || jobid[0] == '\0')
889                 return;
890
891         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
892 }
893
894 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
895 {
896         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
897
898         ef->cr_extra_flags = eflags;
899 }
900
901 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
902                                     __u64 uid, __u64 gid)
903 {
904         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
905
906         uidgid->cr_uid = uid;
907         uidgid->cr_gid = gid;
908 }
909
910 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
911                                  lnet_nid_t nid)
912 {
913         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
914
915         clnid->cr_nid = nid;
916 }
917
918 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, int flags)
919 {
920         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
921
922         omd->cr_openflags = (__u32)flags;
923 }
924
925 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
926                                    const char *xattr_name)
927 {
928         struct changelog_ext_xattr    *xattr = changelog_rec_xattr(rec);
929
930         strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
931 }
932
933 /** Store a namespace change changelog record
934  * If this fails, we must fail the whole transaction; we don't
935  * want the change to commit without the log entry.
936  * \param target - mdd_object of change
937  * \param tpfid - target parent dir/object fid
938  * \param sfid - source object fid
939  * \param spfid - source parent fid
940  * \param tname - target name string
941  * \param sname - source name string
942  * \param handle - transaction handle
943  */
944 int mdd_changelog_ns_store(const struct lu_env *env,
945                            struct mdd_device *mdd,
946                            enum changelog_rec_type type,
947                            enum changelog_rec_flags crf,
948                            struct mdd_object *target,
949                            const struct lu_fid *tpfid,
950                            const struct lu_fid *sfid,
951                            const struct lu_fid *spfid,
952                            const struct lu_name *tname,
953                            const struct lu_name *sname,
954                            struct thandle *handle)
955 {
956         const struct lu_ucred           *uc = lu_ucred(env);
957         struct llog_changelog_rec       *rec;
958         struct lu_buf                   *buf;
959         int                              reclen;
960         __u64                            xflags = CLFE_INVALID;
961         int                              rc;
962         ENTRY;
963
964         if (!mdd_changelog_enabled(env, mdd, type))
965                 RETURN(0);
966
967         LASSERT(tpfid != NULL);
968         LASSERT(tname != NULL);
969         LASSERT(handle != NULL);
970
971         reclen = mdd_llog_record_calc_size(env, tname, sname);
972         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
973         if (buf->lb_buf == NULL)
974                 RETURN(-ENOMEM);
975         rec = buf->lb_buf;
976
977         crf &= CLF_FLAGMASK;
978         crf |= CLF_EXTRA_FLAGS;
979
980         if (uc) {
981                 if (uc->uc_jobid[0] != '\0')
982                         crf |= CLF_JOBID;
983                 xflags |= CLFE_UIDGID;
984                 xflags |= CLFE_NID;
985         }
986
987         if (sname != NULL)
988                 crf |= CLF_RENAME;
989         else
990                 crf |= CLF_VERSION;
991
992         rec->cr.cr_flags = crf;
993
994         if (crf & CLF_EXTRA_FLAGS) {
995                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
996                 if (xflags & CLFE_UIDGID)
997                         mdd_changelog_rec_extra_uidgid(&rec->cr,
998                                                        uc->uc_uid, uc->uc_gid);
999                 if (xflags & CLFE_NID)
1000                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
1001         }
1002
1003         rec->cr.cr_type = (__u32)type;
1004         rec->cr.cr_pfid = *tpfid;
1005         rec->cr.cr_namelen = tname->ln_namelen;
1006         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1007
1008         if (crf & CLF_RENAME)
1009                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1010
1011         if (crf & CLF_JOBID)
1012                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1013
1014         if (likely(target != NULL)) {
1015                 rec->cr.cr_tfid = *mdo2fid(target);
1016                 target->mod_cltime = ktime_get();
1017         } else {
1018                 fid_zero(&rec->cr.cr_tfid);
1019         }
1020
1021         rc = mdd_changelog_store(env, mdd, rec, handle);
1022         if (rc < 0) {
1023                 CERROR("%s: cannot store changelog record: type = %d, "
1024                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1025                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1026                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1027                 return -EFAULT;
1028         }
1029
1030         return 0;
1031 }
1032
1033 static int __mdd_links_add(const struct lu_env *env,
1034                            struct mdd_object *mdd_obj,
1035                            struct linkea_data *ldata,
1036                            const struct lu_name *lname,
1037                            const struct lu_fid *pfid,
1038                            int first, int check)
1039 {
1040         int rc;
1041
1042         if (ldata->ld_leh == NULL) {
1043                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1044                 if (rc) {
1045                         if (rc != -ENODATA)
1046                                 return rc;
1047                         rc = linkea_data_new(ldata,
1048                                              &mdd_env_info(env)->mti_link_buf);
1049                         if (rc)
1050                                 return rc;
1051                 }
1052         }
1053
1054         if (check) {
1055                 rc = linkea_links_find(ldata, lname, pfid);
1056                 if (rc && rc != -ENOENT)
1057                         return rc;
1058                 if (rc == 0)
1059                         return -EEXIST;
1060         }
1061
1062         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1063                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1064
1065                 *tfid = *pfid;
1066                 tfid->f_ver = ~0;
1067                 linkea_add_buf(ldata, lname, tfid);
1068         }
1069
1070         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1071                 linkea_add_buf(ldata, lname, pfid);
1072
1073         return linkea_add_buf(ldata, lname, pfid);
1074 }
1075
1076 static int __mdd_links_del(const struct lu_env *env,
1077                            struct mdd_object *mdd_obj,
1078                            struct linkea_data *ldata,
1079                            const struct lu_name *lname,
1080                            const struct lu_fid *pfid)
1081 {
1082         int rc;
1083
1084         if (ldata->ld_leh == NULL) {
1085                 rc = mdd_links_read(env, mdd_obj, ldata);
1086                 if (rc)
1087                         return rc;
1088         }
1089
1090         rc = linkea_links_find(ldata, lname, pfid);
1091         if (rc)
1092                 return rc;
1093
1094         linkea_del_buf(ldata, lname);
1095         return 0;
1096 }
1097
1098 static int mdd_linkea_prepare(const struct lu_env *env,
1099                               struct mdd_object *mdd_obj,
1100                               const struct lu_fid *oldpfid,
1101                               const struct lu_name *oldlname,
1102                               const struct lu_fid *newpfid,
1103                               const struct lu_name *newlname,
1104                               int first, int check,
1105                               struct linkea_data *ldata)
1106 {
1107         int rc = 0;
1108         ENTRY;
1109
1110         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1111                 RETURN(0);
1112
1113         LASSERT(oldpfid != NULL || newpfid != NULL);
1114
1115         if (mdd_obj->mod_flags & DEAD_OBJ)
1116                 /* Unnecessary to update linkEA for dead object.  */
1117                 RETURN(0);
1118
1119         if (oldpfid != NULL) {
1120                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1121                 if (rc) {
1122                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1123                                 RETURN(rc);
1124
1125                         /* No changes done. */
1126                         rc = 0;
1127                 }
1128         }
1129
1130         /* If renaming, add the new record */
1131         if (newpfid != NULL)
1132                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1133                                      first, check);
1134
1135         RETURN(rc);
1136 }
1137
1138 int mdd_links_rename(const struct lu_env *env,
1139                      struct mdd_object *mdd_obj,
1140                      const struct lu_fid *oldpfid,
1141                      const struct lu_name *oldlname,
1142                      const struct lu_fid *newpfid,
1143                      const struct lu_name *newlname,
1144                      struct thandle *handle,
1145                      struct linkea_data *ldata,
1146                      int first, int check)
1147 {
1148         int rc = 0;
1149         ENTRY;
1150
1151         if (ldata == NULL) {
1152                 ldata = &mdd_env_info(env)->mti_link_data;
1153                 memset(ldata, 0, sizeof(*ldata));
1154                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1155                                         newpfid, newlname, first, check, ldata);
1156                 if (rc)
1157                         GOTO(out, rc);
1158         }
1159
1160         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1161                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1162
1163         GOTO(out, rc);
1164
1165 out:
1166         if (rc != 0) {
1167                 if (newlname == NULL)
1168                         CERROR("link_ea add failed %d "DFID"\n",
1169                                rc, PFID(mdd_object_fid(mdd_obj)));
1170                 else if (oldpfid == NULL)
1171                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1172                                newlname->ln_namelen, newlname->ln_name, rc,
1173                                PFID(mdd_object_fid(mdd_obj)));
1174                 else if (newpfid == NULL)
1175                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1176                                oldlname->ln_namelen, oldlname->ln_name, rc,
1177                                PFID(mdd_object_fid(mdd_obj)));
1178                 else
1179                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1180                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1181                                newlname->ln_namelen, newlname->ln_name, rc,
1182                                PFID(mdd_object_fid(mdd_obj)));
1183         }
1184
1185         if (is_vmalloc_addr(ldata->ld_buf))
1186                 /* if we vmalloced a large buffer drop it */
1187                 lu_buf_free(ldata->ld_buf);
1188
1189         return rc;
1190 }
1191
1192 static inline int mdd_links_add(const struct lu_env *env,
1193                                 struct mdd_object *mdd_obj,
1194                                 const struct lu_fid *pfid,
1195                                 const struct lu_name *lname,
1196                                 struct thandle *handle,
1197                                 struct linkea_data *ldata, int first)
1198 {
1199         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1200                                 pfid, lname, handle, ldata, first, 0);
1201 }
1202
1203 static inline int mdd_links_del(const struct lu_env *env,
1204                                 struct mdd_object *mdd_obj,
1205                                 const struct lu_fid *pfid,
1206                                 const struct lu_name *lname,
1207                                 struct thandle *handle)
1208 {
1209         return mdd_links_rename(env, mdd_obj, pfid, lname,
1210                                 NULL, NULL, handle, NULL, 0, 0);
1211 }
1212
1213 /** Read the link EA into a temp buffer.
1214  * Uses the name_buf since it is generally large.
1215  * \retval IS_ERR err
1216  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1217  */
1218 struct lu_buf *mdd_links_get(const struct lu_env *env,
1219                              struct mdd_object *mdd_obj)
1220 {
1221         struct linkea_data ldata = { NULL };
1222         int rc;
1223
1224         rc = mdd_links_read(env, mdd_obj, &ldata);
1225         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1226 }
1227
1228 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1229                     struct linkea_data *ldata, struct thandle *handle)
1230 {
1231         const struct lu_buf *buf;
1232         int                 rc;
1233
1234         if (ldata == NULL || ldata->ld_buf == NULL ||
1235             ldata->ld_leh == NULL)
1236                 return 0;
1237
1238         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1239                 return 0;
1240
1241 again:
1242         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1243                                 ldata->ld_leh->leh_len);
1244         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1245         if (unlikely(rc == -ENOSPC)) {
1246                 rc = linkea_overflow_shrink(ldata);
1247                 if (likely(rc > 0))
1248                         goto again;
1249         }
1250
1251         return rc;
1252 }
1253
1254 static int mdd_declare_links_add(const struct lu_env *env,
1255                                  struct mdd_object *mdd_obj,
1256                                  struct thandle *handle,
1257                                  struct linkea_data *ldata)
1258 {
1259         int     rc;
1260         int     ea_len;
1261         void    *linkea;
1262
1263         if (ldata != NULL && ldata->ld_leh != NULL) {
1264                 ea_len = ldata->ld_leh->leh_len;
1265                 linkea = ldata->ld_buf->lb_buf;
1266         } else {
1267                 ea_len = MAX_LINKEA_SIZE;
1268                 linkea = NULL;
1269         }
1270
1271         rc = mdo_declare_xattr_set(env, mdd_obj,
1272                                    mdd_buf_get_const(env, linkea, ea_len),
1273                                    XATTR_NAME_LINK, 0, handle);
1274
1275         return rc;
1276 }
1277
1278 static inline int mdd_declare_links_del(const struct lu_env *env,
1279                                         struct mdd_object *c,
1280                                         struct thandle *handle)
1281 {
1282         int rc = 0;
1283
1284         /* For directory, the linkEA will be removed together
1285          * with the object. */
1286         if (!S_ISDIR(mdd_object_type(c)))
1287                 rc = mdd_declare_links_add(env, c, handle, NULL);
1288
1289         return rc;
1290 }
1291
1292 static int mdd_declare_link(const struct lu_env *env,
1293                             struct mdd_device *mdd,
1294                             struct mdd_object *p,
1295                             struct mdd_object *c,
1296                             const struct lu_name *name,
1297                             struct thandle *handle,
1298                             struct lu_attr *la,
1299                             struct linkea_data *data)
1300 {
1301         struct lu_fid tfid = *mdo2fid(c);
1302         int rc;
1303
1304         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1305                 tfid.f_oid = cfs_fail_val;
1306
1307         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1308                                       name->ln_name, handle);
1309         if (rc != 0)
1310                 return rc;
1311
1312         rc = mdo_declare_ref_add(env, c, handle);
1313         if (rc != 0)
1314                 return rc;
1315
1316         la->la_valid = LA_CTIME | LA_MTIME;
1317         rc = mdo_declare_attr_set(env, p, la, handle);
1318         if (rc != 0)
1319                 return rc;
1320
1321         la->la_valid = LA_CTIME;
1322         rc = mdo_declare_attr_set(env, c, la, handle);
1323         if (rc != 0)
1324                 return rc;
1325
1326         rc = mdd_declare_links_add(env, c, handle, data);
1327         if (rc != 0)
1328                 return rc;
1329
1330         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1331                                          handle);
1332
1333         return rc;
1334 }
1335
1336 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1337                     struct md_object *src_obj, const struct lu_name *lname,
1338                     struct md_attr *ma)
1339 {
1340         const char *name = lname->ln_name;
1341         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1342         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1343         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1344         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1345         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1346         struct mdd_device *mdd = mdo2mdd(src_obj);
1347         struct thandle *handle;
1348         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1349         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1350         int rc;
1351         ENTRY;
1352
1353         rc = mdd_la_get(env, mdd_sobj, cattr);
1354         if (rc != 0)
1355                 RETURN(rc);
1356
1357         rc = mdd_la_get(env, mdd_tobj, tattr);
1358         if (rc != 0)
1359                 RETURN(rc);
1360
1361         /*
1362          * If we are using project inheritance, we only allow hard link
1363          * creation in our tree when the project IDs are the same;
1364          * otherwise the tree quota mechanism could be circumvented.
1365          */
1366         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1367             (tattr->la_projid != cattr->la_projid))
1368                 RETURN(-EXDEV);
1369
1370         handle = mdd_trans_create(env, mdd);
1371         if (IS_ERR(handle))
1372                 GOTO(out_pending, rc = PTR_ERR(handle));
1373
1374         memset(ldata, 0, sizeof(*ldata));
1375
1376         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1377         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1378
1379         /* Note: even this function will change ldata, but it comes from
1380          * thread_info, which is completely temporary and only seen in
1381          * this function, so we do not need reset ldata once it fails.*/
1382         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1383                                 lname, 0, 0, ldata);
1384         if (rc != 0)
1385                 GOTO(stop, rc);
1386
1387         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1388                               la, ldata);
1389         if (rc)
1390                 GOTO(stop, rc);
1391
1392         rc = mdd_trans_start(env, mdd, handle);
1393         if (rc)
1394                 GOTO(stop, rc);
1395
1396         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1397         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1398                                    cattr);
1399         if (rc)
1400                 GOTO(out_unlock, rc);
1401
1402         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1403                 rc = mdo_ref_add(env, mdd_sobj, handle);
1404                 if (rc != 0)
1405                         GOTO(out_unlock, rc);
1406         }
1407
1408         *tfid = *mdo2fid(mdd_sobj);
1409         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1410                 tfid->f_oid = cfs_fail_val;
1411
1412         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1413                                      mdd_object_type(mdd_sobj), name, handle);
1414         if (rc != 0) {
1415                 mdo_ref_del(env, mdd_sobj, handle);
1416                 GOTO(out_unlock, rc);
1417         }
1418
1419         la->la_valid = LA_CTIME | LA_MTIME;
1420         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1421         if (rc)
1422                 GOTO(out_unlock, rc);
1423
1424         la->la_valid = LA_CTIME;
1425         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1426         if (rc == 0)
1427                 /* Note: The failure of links_add should not cause the
1428                  * link failure, so do not check return value. */
1429                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1430                               lname, handle, ldata, 0);
1431
1432         EXIT;
1433 out_unlock:
1434         mdd_write_unlock(env, mdd_sobj);
1435         if (rc == 0)
1436                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1437                                             mdo2fid(mdd_tobj), NULL, NULL,
1438                                             lname, NULL, handle);
1439 stop:
1440         rc = mdd_trans_stop(env, mdd, rc, handle);
1441         if (is_vmalloc_addr(ldata->ld_buf))
1442                 /* if we vmalloced a large buffer drop it */
1443                 lu_buf_free(ldata->ld_buf);
1444 out_pending:
1445         return rc;
1446 }
1447
1448 static int mdd_mark_orphan_object(const struct lu_env *env,
1449                                 struct mdd_object *obj, struct thandle *handle,
1450                                 bool declare)
1451 {
1452         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1453         int rc;
1454
1455         if (!S_ISDIR(mdd_object_type(obj)))
1456                 return 0;
1457
1458         attr->la_valid = LA_FLAGS;
1459         attr->la_flags = LUSTRE_ORPHAN_FL;
1460
1461         if (declare)
1462                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1463         else
1464                 rc = mdo_attr_set(env, obj, attr, handle);
1465
1466         return rc;
1467 }
1468
1469 static int mdd_declare_finish_unlink(const struct lu_env *env,
1470                                      struct mdd_object *obj,
1471                                      struct thandle *handle)
1472 {
1473         int rc;
1474
1475         /* Sigh, we do not know if the unlink object will become orphan in
1476          * declare phase, but fortunately the flags here does not matter
1477          * in current declare implementation */
1478         rc = mdd_mark_orphan_object(env, obj, handle, true);
1479         if (rc != 0)
1480                 return rc;
1481
1482         rc = mdo_declare_destroy(env, obj, handle);
1483         if (rc != 0)
1484                 return rc;
1485
1486         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1487         if (rc != 0)
1488                 return rc;
1489
1490         return mdd_declare_links_del(env, obj, handle);
1491 }
1492
1493 /* caller should take a lock before calling */
1494 int mdd_finish_unlink(const struct lu_env *env,
1495                       struct mdd_object *obj, struct md_attr *ma,
1496                       const struct mdd_object *pobj,
1497                       const struct lu_name *lname,
1498                       struct thandle *th)
1499 {
1500         int rc = 0;
1501         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1502         ENTRY;
1503
1504         LASSERT(mdd_write_locked(env, obj) != 0);
1505
1506         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1507                 /* add new orphan and the object
1508                  * will be deleted during mdd_close() */
1509                 obj->mod_flags |= DEAD_OBJ;
1510                 if (obj->mod_count) {
1511                         rc = mdd_orphan_insert(env, obj, th);
1512                         if (rc == 0)
1513                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1514                                         "orphan list, open count = %d\n",
1515                                         PFID(mdd_object_fid(obj)),
1516                                         obj->mod_count);
1517                         else
1518                                 CERROR("Object "DFID" fail to be an orphan, "
1519                                        "open count = %d, maybe cause failed "
1520                                        "open replay\n",
1521                                         PFID(mdd_object_fid(obj)),
1522                                         obj->mod_count);
1523
1524                         /* mark object as an orphan here, not
1525                          * before mdd_orphan_insert() as racing
1526                          * mdd_la_get() may propagate ORPHAN_OBJ
1527                          * causing the asserition */
1528                         rc = mdd_mark_orphan_object(env, obj, th, false);
1529                 } else {
1530                         rc = mdo_destroy(env, obj, th);
1531                 }
1532         } else if (!is_dir) {
1533                 /* old files may not have link ea; ignore errors */
1534                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1535         }
1536
1537         RETURN(rc);
1538 }
1539
1540 /*
1541  * pobj maybe NULL
1542  * has mdd_write_lock on cobj already, but not on pobj yet
1543  */
1544 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1545                             const struct lu_attr *pattr,
1546                             struct mdd_object *cobj,
1547                             const struct lu_attr *cattr)
1548 {
1549         int rc;
1550         ENTRY;
1551
1552         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1553
1554         RETURN(rc);
1555 }
1556
1557 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1558                               struct mdd_object *p, struct mdd_object *c,
1559                               const struct lu_name *name, struct md_attr *ma,
1560                               struct thandle *handle, int no_name, int is_dir)
1561 {
1562         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1563         int              rc;
1564
1565         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1566                 if (likely(no_name == 0)) {
1567                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1568                                                       handle);
1569                         if (rc != 0)
1570                                 return rc;
1571                 }
1572
1573                 if (is_dir != 0) {
1574                         rc = mdo_declare_ref_del(env, p, handle);
1575                         if (rc != 0)
1576                                 return rc;
1577                 }
1578         }
1579
1580         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1581         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1582         la->la_valid = LA_CTIME | LA_MTIME;
1583         rc = mdo_declare_attr_set(env, p, la, handle);
1584         if (rc)
1585                 return rc;
1586
1587         if (c != NULL) {
1588                 rc = mdo_declare_ref_del(env, c, handle);
1589                 if (rc)
1590                         return rc;
1591
1592                 rc = mdo_declare_ref_del(env, c, handle);
1593                 if (rc)
1594                         return rc;
1595
1596                 la->la_valid = LA_CTIME;
1597                 rc = mdo_declare_attr_set(env, c, la, handle);
1598                 if (rc)
1599                         return rc;
1600
1601                 rc = mdd_declare_finish_unlink(env, c, handle);
1602                 if (rc)
1603                         return rc;
1604
1605                 /* FIXME: need changelog for remove entry */
1606                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
1607                                                  NULL, handle);
1608         }
1609
1610         return rc;
1611 }
1612
1613 /*
1614  * test if a file has an HSM archive
1615  * if HSM attributes are not found in ma update them from
1616  * HSM xattr
1617  */
1618 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1619                                    struct mdd_object *obj,
1620                                    struct md_attr *ma)
1621 {
1622         ENTRY;
1623
1624         if (!(ma->ma_valid & MA_HSM)) {
1625                 /* no HSM MD provided, read xattr */
1626                 struct lu_buf   *hsm_buf;
1627                 const size_t     buflen = sizeof(struct hsm_attrs);
1628                 int              rc;
1629
1630                 hsm_buf = mdd_buf_get(env, NULL, 0);
1631                 lu_buf_alloc(hsm_buf, buflen);
1632                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1633                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1634                 lu_buf_free(hsm_buf);
1635                 if (rc < 0)
1636                         RETURN(false);
1637
1638                 ma->ma_valid |= MA_HSM;
1639         }
1640         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1641                 RETURN(true);
1642         RETURN(false);
1643 }
1644
1645 /**
1646  * Delete name entry and the object.
1647  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1648  * does not exist for this object, and it could only happen during resending
1649  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1650  * is also needed in this case(needed by changelog), so we have to add another
1651  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1652  * the ENOENT failure should be able to be fixed by redo mechanism.
1653  */
1654 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1655                       struct md_object *cobj, const struct lu_name *lname,
1656                       struct md_attr *ma, int no_name)
1657 {
1658         const char *name = lname->ln_name;
1659         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1660         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1661         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1662         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1663         struct mdd_object *mdd_cobj = NULL;
1664         struct mdd_device *mdd = mdo2mdd(pobj);
1665         struct thandle    *handle;
1666         int rc, is_dir = 0, cl_flags = 0;
1667         ENTRY;
1668
1669         /* let shutdown to start */
1670         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
1671
1672         /* cobj == NULL means only delete name entry */
1673         if (likely(cobj != NULL)) {
1674                 mdd_cobj = md2mdd_obj(cobj);
1675                 if (mdd_object_exists(mdd_cobj) == 0)
1676                         RETURN(-ENOENT);
1677         }
1678
1679         rc = mdd_la_get(env, mdd_pobj, pattr);
1680         if (rc)
1681                 RETURN(rc);
1682
1683         if (likely(mdd_cobj != NULL)) {
1684                 /* fetch cattr */
1685                 rc = mdd_la_get(env, mdd_cobj, cattr);
1686                 if (rc)
1687                         RETURN(rc);
1688
1689                 is_dir = S_ISDIR(cattr->la_mode);
1690                 /* search for an existing archive.
1691                  * we should check ahead as the object
1692                  * can be destroyed in this transaction */
1693                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1694                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1695         }
1696
1697         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1698         if (rc)
1699                 RETURN(rc);
1700
1701         handle = mdd_trans_create(env, mdd);
1702         if (IS_ERR(handle))
1703                 RETURN(PTR_ERR(handle));
1704
1705         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1706                                 lname, ma, handle, no_name, is_dir);
1707         if (rc)
1708                 GOTO(stop, rc);
1709
1710         rc = mdd_trans_start(env, mdd, handle);
1711         if (rc)
1712                 GOTO(stop, rc);
1713
1714         if (likely(mdd_cobj != NULL))
1715                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1716
1717         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1718                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1719                 if (rc)
1720                         GOTO(cleanup, rc);
1721         }
1722
1723         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1724             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1725                 GOTO(cleanup, rc = 0);
1726
1727         if (likely(mdd_cobj != NULL)) {
1728                 rc = mdo_ref_del(env, mdd_cobj, handle);
1729                 if (rc != 0) {
1730                         __mdd_index_insert_only(env, mdd_pobj,
1731                                                 mdo2fid(mdd_cobj),
1732                                                 mdd_object_type(mdd_cobj),
1733                                                 name, handle);
1734                         GOTO(cleanup, rc);
1735                 }
1736
1737                 if (is_dir)
1738                         /* unlink dot */
1739                         mdo_ref_del(env, mdd_cobj, handle);
1740
1741                 /* fetch updated nlink */
1742                 rc = mdd_la_get(env, mdd_cobj, cattr);
1743                 if (rc)
1744                         GOTO(cleanup, rc);
1745         }
1746
1747         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1748         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1749
1750         la->la_valid = LA_CTIME | LA_MTIME;
1751         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1752         if (rc)
1753                 GOTO(cleanup, rc);
1754
1755         /* Enough for only unlink the entry */
1756         if (unlikely(mdd_cobj == NULL))
1757                 GOTO(stop, rc);
1758
1759         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1760                 /* update ctime of an unlinked file only if it is still
1761                  * opened or a link still exists */
1762                 la->la_valid = LA_CTIME;
1763                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1764                 if (rc)
1765                         GOTO(cleanup, rc);
1766         }
1767
1768         /* XXX: this transfer to ma will be removed with LOD/OSP */
1769         ma->ma_attr = *cattr;
1770         ma->ma_valid |= MA_INODE;
1771         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1772         if (rc != 0)
1773                 GOTO(cleanup, rc);
1774
1775         /* fetch updated nlink */
1776         rc = mdd_la_get(env, mdd_cobj, cattr);
1777         /* if object is removed then we can't get its attrs,
1778          * use last get */
1779         if (rc == -ENOENT) {
1780                 cattr->la_nlink = 0;
1781                 rc = 0;
1782         }
1783
1784         if (cattr->la_nlink == 0) {
1785                 ma->ma_attr = *cattr;
1786                 ma->ma_valid |= MA_INODE;
1787         }
1788
1789         EXIT;
1790 cleanup:
1791         if (likely(mdd_cobj != NULL))
1792                 mdd_write_unlock(env, mdd_cobj);
1793
1794         if (rc == 0) {
1795                 if (cattr->la_nlink == 0)
1796                         cl_flags |= CLF_UNLINK_LAST;
1797                 else
1798                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1799
1800                 rc = mdd_changelog_ns_store(env, mdd,
1801                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1802                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1803                         handle);
1804         }
1805
1806 stop:
1807         rc = mdd_trans_stop(env, mdd, rc, handle);
1808
1809         return rc;
1810 }
1811
1812 /*
1813  * The permission has been checked when obj created, no need check again.
1814  */
1815 static int mdd_cd_sanity_check(const struct lu_env *env,
1816                                struct mdd_object *obj)
1817 {
1818         ENTRY;
1819
1820         /* EEXIST check */
1821         if (!obj || mdd_is_dead_obj(obj))
1822                 RETURN(-ENOENT);
1823
1824         RETURN(0);
1825 }
1826
1827 static int mdd_create_data(const struct lu_env *env,
1828                            struct md_object *pobj,
1829                            struct md_object *cobj,
1830                            const struct md_op_spec *spec,
1831                            struct md_attr *ma)
1832 {
1833         struct mdd_device *mdd = mdo2mdd(cobj);
1834         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1835         struct mdd_object *son = md2mdd_obj(cobj);
1836         struct thandle    *handle;
1837         const struct lu_buf *buf;
1838         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1839         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1840         int                rc;
1841         ENTRY;
1842
1843         rc = mdd_cd_sanity_check(env, son);
1844         if (rc)
1845                 RETURN(rc);
1846
1847         if (!md_should_create(spec->sp_cr_flags))
1848                 RETURN(0);
1849
1850         /*
1851          * there are following use cases for this function:
1852          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1853          *    striping can be specified or not
1854          * 2) CMD?
1855          */
1856         rc = mdd_la_get(env, son, attr);
1857         if (rc)
1858                 RETURN(rc);
1859
1860         /* calling ->ah_make_hint() is used to transfer information from parent */
1861         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1862
1863         handle = mdd_trans_create(env, mdd);
1864         if (IS_ERR(handle))
1865                 GOTO(out_free, rc = PTR_ERR(handle));
1866
1867         /*
1868          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1869          * Should this be fixed?
1870          */
1871         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1872                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1873                spec->sp_cr_flags, spec->no_create);
1874
1875         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1876                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1877                                         spec->u.sp_ea.eadatalen);
1878         } else {
1879                 buf = &LU_BUF_NULL;
1880         }
1881
1882         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1883                                   XATTR_NAME_LOV, 0, handle);
1884         if (rc)
1885                 GOTO(stop, rc);
1886
1887         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
1888                                          handle);
1889         if (rc)
1890                 GOTO(stop, rc);
1891
1892         rc = mdd_trans_start(env, mdd, handle);
1893         if (rc)
1894                 GOTO(stop, rc);
1895
1896         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1897                           0, handle);
1898
1899         if (rc)
1900                 GOTO(stop, rc);
1901
1902         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1903
1904 stop:
1905         rc = mdd_trans_stop(env, mdd, rc, handle);
1906
1907 out_free:
1908         RETURN(rc);
1909 }
1910
1911 static int mdd_declare_object_initialize(const struct lu_env *env,
1912                                          struct mdd_object *parent,
1913                                          struct mdd_object *child,
1914                                          const struct lu_attr *attr,
1915                                          struct thandle *handle)
1916 {
1917         int rc;
1918         ENTRY;
1919
1920         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1921         if (!S_ISDIR(attr->la_mode))
1922                 RETURN(0);
1923
1924         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1925                                       dot, handle);
1926         if (rc != 0)
1927                 RETURN(rc);
1928
1929         rc = mdo_declare_ref_add(env, child, handle);
1930         if (rc != 0)
1931                 RETURN(rc);
1932
1933         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1934                                       dotdot, handle);
1935
1936         RETURN(rc);
1937 }
1938
1939 static int mdd_object_initialize(const struct lu_env *env,
1940                                  const struct lu_fid *pfid,
1941                                  struct mdd_object *child,
1942                                  struct lu_attr *attr,
1943                                  struct thandle *handle)
1944 {
1945         int rc = 0;
1946         ENTRY;
1947
1948         if (S_ISDIR(attr->la_mode)) {
1949                 /* Add "." and ".." for newly created dir */
1950                 mdo_ref_add(env, child, handle);
1951                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1952                                              S_IFDIR, dot, handle);
1953                 if (rc == 0)
1954                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1955                                                      dotdot, handle);
1956                 if (rc != 0)
1957                         mdo_ref_del(env, child, handle);
1958         }
1959
1960         RETURN(rc);
1961 }
1962
1963 /**
1964  * This function checks whether it can create a file/dir under the
1965  * directory(@pobj). The directory(@pobj) is not being locked by
1966  * mdd lock.
1967  *
1968  * \param[in] env       execution environment
1969  * \param[in] pobj      the directory to create files
1970  * \param[in] pattr     the attributes of the directory
1971  * \param[in] lname     the name of the created file/dir
1972  * \param[in] cattr     the attributes of the file/dir
1973  * \param[in] spec      create specification
1974  *
1975  * \retval              = 0 it is allowed to create file/dir under
1976  *                      the directory
1977  * \retval              negative error not allowed to create file/dir
1978  *                      under the directory
1979  */
1980 static int mdd_create_sanity_check(const struct lu_env *env,
1981                                    struct md_object *pobj,
1982                                    const struct lu_attr *pattr,
1983                                    const struct lu_name *lname,
1984                                    struct lu_attr *cattr,
1985                                    struct md_op_spec *spec)
1986 {
1987         struct mdd_thread_info *info = mdd_env_info(env);
1988         struct lu_fid     *fid       = &info->mti_fid;
1989         struct mdd_object *obj       = md2mdd_obj(pobj);
1990         struct mdd_device *m         = mdo2mdd(pobj);
1991         bool            check_perm = true;
1992         int rc;
1993         ENTRY;
1994
1995         /* EEXIST check */
1996         if (mdd_is_dead_obj(obj))
1997                 RETURN(-ENOENT);
1998
1999         /*
2000          * In some cases this lookup is not needed - we know before if name
2001          * exists or not because MDT performs lookup for it.
2002          * name length check is done in lookup.
2003          */
2004         if (spec->sp_cr_lookup) {
2005                 /*
2006                  * Check if the name already exist, though it will be checked in
2007                  * _index_insert also, for avoiding rolling back if exists
2008                  * _index_insert.
2009                  */
2010                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2011                                   MAY_WRITE | MAY_EXEC);
2012                 if (rc != -ENOENT)
2013                         RETURN(rc ? : -EEXIST);
2014
2015                 /* Permission is already being checked in mdd_lookup */
2016                 check_perm = false;
2017         }
2018
2019         if (S_ISDIR(cattr->la_mode) &&
2020             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2021             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2022                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2023
2024                 if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
2025                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
2026                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2027                         rc = -EINVAL;
2028                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2029                                "stripe_offset = %d, stripe_count = %u: "
2030                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2031                                 le32_to_cpu(lum->lum_magic),
2032                                (int)le32_to_cpu(lum->lum_stripe_offset),
2033                                le32_to_cpu(lum->lum_stripe_count), rc);
2034                         return rc;
2035                 }
2036         }
2037
2038         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2039         if (rc != 0)
2040                 RETURN(rc);
2041
2042         /* sgid check */
2043         if (pattr->la_mode & S_ISGID) {
2044                 cattr->la_gid = pattr->la_gid;
2045                 if (S_ISDIR(cattr->la_mode)) {
2046                         cattr->la_mode |= S_ISGID;
2047                         cattr->la_valid |= LA_MODE;
2048                 }
2049         }
2050
2051         /* Inherit project ID from parent directory */
2052         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2053                 cattr->la_projid = pattr->la_projid;
2054                 if (S_ISDIR(cattr->la_mode)) {
2055                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2056                         cattr->la_valid |= LA_FLAGS;
2057                 }
2058                 cattr->la_valid |= LA_PROJID;
2059         }
2060
2061         rc = mdd_name_check(m, lname);
2062         if (rc < 0)
2063                 RETURN(rc);
2064
2065         switch (cattr->la_mode & S_IFMT) {
2066         case S_IFLNK: {
2067                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2068
2069                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2070                         RETURN(-ENAMETOOLONG);
2071                 else
2072                         RETURN(0);
2073         }
2074         case S_IFDIR:
2075         case S_IFREG:
2076         case S_IFCHR:
2077         case S_IFBLK:
2078         case S_IFIFO:
2079         case S_IFSOCK:
2080                 rc = 0;
2081                 break;
2082         default:
2083                 rc = -EINVAL;
2084                 break;
2085         }
2086         RETURN(rc);
2087 }
2088
2089 static int mdd_declare_create_object(const struct lu_env *env,
2090                                      struct mdd_device *mdd,
2091                                      struct mdd_object *p, struct mdd_object *c,
2092                                      struct lu_attr *attr,
2093                                      struct thandle *handle,
2094                                      const struct md_op_spec *spec,
2095                                      struct lu_buf *def_acl_buf,
2096                                      struct lu_buf *acl_buf,
2097                                      struct dt_allocation_hint *hint)
2098 {
2099         const struct lu_buf *buf;
2100         int rc;
2101
2102         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2103                                                 hint);
2104         if (rc)
2105                 GOTO(out, rc);
2106
2107 #ifdef CONFIG_FS_POSIX_ACL
2108         if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2109                 /* if dir, then can inherit default ACl */
2110                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2111                                            XATTR_NAME_ACL_DEFAULT,
2112                                            0, handle);
2113                 if (rc)
2114                         GOTO(out, rc);
2115         }
2116
2117         if (acl_buf && acl_buf->lb_len > 0) {
2118                 rc = mdo_declare_attr_set(env, c, attr, handle);
2119                 if (rc)
2120                         GOTO(out, rc);
2121
2122                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2123                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2124                 if (rc)
2125                         GOTO(out, rc);
2126         }
2127 #endif
2128         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2129         if (rc)
2130                 GOTO(out, rc);
2131
2132         /* replay case, create LOV EA from client data */
2133         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2134             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2135                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2136                                         spec->u.sp_ea.eadatalen);
2137                 rc = mdo_declare_xattr_set(env, c, buf,
2138                                            S_ISDIR(attr->la_mode) ?
2139                                                 XATTR_NAME_LMV : XATTR_NAME_LOV,
2140                                            0, handle);
2141                 if (rc)
2142                         GOTO(out, rc);
2143         }
2144
2145         if (S_ISLNK(attr->la_mode)) {
2146                 const char *target_name = spec->u.sp_symname;
2147                 int sym_len = strlen(target_name);
2148                 const struct lu_buf *buf;
2149
2150                 buf = mdd_buf_get_const(env, target_name, sym_len);
2151                 rc = dt_declare_record_write(env, mdd_object_child(c),
2152                                              buf, 0, handle);
2153                 if (rc)
2154                         GOTO(out, rc);
2155         }
2156
2157         if (spec->sp_cr_file_secctx_name != NULL) {
2158                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2159                                         spec->sp_cr_file_secctx_size);
2160                 rc = mdo_declare_xattr_set(env, c, buf,
2161                                            spec->sp_cr_file_secctx_name, 0,
2162                                            handle);
2163                 if (rc < 0)
2164                         GOTO(out, rc);
2165         }
2166 out:
2167         return rc;
2168 }
2169
2170 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2171                               struct mdd_object *p, struct mdd_object *c,
2172                               const struct lu_name *name,
2173                               struct lu_attr *attr,
2174                               struct thandle *handle,
2175                               const struct md_op_spec *spec,
2176                               struct linkea_data *ldata,
2177                               struct lu_buf *def_acl_buf,
2178                               struct lu_buf *acl_buf,
2179                               struct dt_allocation_hint *hint)
2180 {
2181         int rc;
2182
2183         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2184                                        def_acl_buf, acl_buf, hint);
2185         if (rc)
2186                 GOTO(out, rc);
2187
2188         if (S_ISDIR(attr->la_mode)) {
2189                 rc = mdo_declare_ref_add(env, p, handle);
2190                 if (rc)
2191                         GOTO(out, rc);
2192         }
2193
2194         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2195                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2196                 if (rc)
2197                         GOTO(out, rc);
2198         } else {
2199                 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
2200                 enum changelog_rec_type type;
2201
2202                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2203                                               name->ln_name, handle);
2204                 if (rc != 0)
2205                         return rc;
2206
2207                 rc = mdd_declare_links_add(env, c, handle, ldata);
2208                 if (rc)
2209                         return rc;
2210
2211                 *la = *attr;
2212                 la->la_valid = LA_CTIME | LA_MTIME;
2213                 rc = mdo_declare_attr_set(env, p, la, handle);
2214                 if (rc)
2215                         return rc;
2216
2217                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2218                        S_ISREG(attr->la_mode) ? CL_CREATE :
2219                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2220
2221                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2222                                                  handle);
2223                 if (rc)
2224                         return rc;
2225         }
2226 out:
2227         return rc;
2228 }
2229
2230 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2231                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2232                         struct lu_buf *acl_buf)
2233 {
2234         int     rc;
2235         ENTRY;
2236
2237         if (S_ISLNK(la->la_mode)) {
2238                 acl_buf->lb_len = 0;
2239                 def_acl_buf->lb_len = 0;
2240                 RETURN(0);
2241         }
2242
2243         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2244         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2245                            XATTR_NAME_ACL_DEFAULT);
2246         mdd_read_unlock(env, pobj);
2247         if (rc > 0) {
2248                 /* If there are default ACL, fix mode/ACL by default ACL */
2249                 def_acl_buf->lb_len = rc;
2250                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2251                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2252                 acl_buf->lb_len = rc;
2253                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2254                 if (rc < 0)
2255                         RETURN(rc);
2256         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2257                 /* If there are no default ACL, fix mode by mask */
2258                 struct lu_ucred *uc = lu_ucred(env);
2259
2260                 /* The create triggered by MDT internal events, such as
2261                  * LFSCK reset, will not contain valid "uc". */
2262                 if (unlikely(uc != NULL))
2263                         la->la_mode &= ~uc->uc_umask;
2264                 rc = 0;
2265                 acl_buf->lb_len = 0;
2266                 def_acl_buf->lb_len = 0;
2267         }
2268
2269         RETURN(rc);
2270 }
2271
2272 /**
2273  * Create a metadata object and initialize it, set acl, xattr.
2274  **/
2275 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2276                              struct mdd_object *son, struct lu_attr *attr,
2277                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2278                              struct lu_buf *def_acl_buf,
2279                              struct dt_allocation_hint *hint,
2280                              struct thandle *handle)
2281 {
2282         const struct lu_buf *buf;
2283         int rc;
2284
2285         mdd_write_lock(env, son, MOR_TGT_CHILD);
2286         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2287                                         hint);
2288         if (rc)
2289                 GOTO(unlock, rc);
2290
2291         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2292          * created in declare phase, they also needs to be added to master
2293          * object as sub-directory entry. So it has to initialize the master
2294          * object, then set dir striped EA.(in mdo_xattr_set) */
2295         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle);
2296         if (rc != 0)
2297                 GOTO(err_destroy, rc);
2298
2299         /*
2300          * in case of replay we just set LOVEA provided by the client
2301          * XXX: I think it would be interesting to try "old" way where
2302          *      MDT calls this xattr_set(LOV) in a different transaction.
2303          *      probably this way we code can be made better.
2304          */
2305
2306         /* During creation, there are only a few cases we need do xattr_set to
2307          * create stripes.
2308          * 1. regular file: see comments above.
2309          * 2. dir: inherit default striping or pool settings from parent.
2310          * 3. create striped directory with provided stripeEA.
2311          * 4. create striped directory because inherit default layout from the
2312          * parent.
2313          */
2314         if (spec->no_create ||
2315             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2316             S_ISDIR(attr->la_mode)) {
2317                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2318                                         spec->u.sp_ea.eadatalen);
2319                 rc = mdo_xattr_set(env, son, buf,
2320                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2321                                                             XATTR_NAME_LOV,
2322                                    0, handle);
2323                 if (rc != 0)
2324                         GOTO(err_destroy, rc);
2325         }
2326
2327 #ifdef CONFIG_FS_POSIX_ACL
2328         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2329             S_ISDIR(attr->la_mode)) {
2330                 /* set default acl */
2331                 rc = mdo_xattr_set(env, son, def_acl_buf,
2332                                    XATTR_NAME_ACL_DEFAULT, 0,
2333                                    handle);
2334                 if (rc)
2335                         GOTO(err_destroy, rc);
2336         }
2337         /* set its own acl */
2338         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2339                 rc = mdo_xattr_set(env, son, acl_buf,
2340                                    XATTR_NAME_ACL_ACCESS,
2341                                    0, handle);
2342                 if (rc)
2343                         GOTO(err_destroy, rc);
2344         }
2345 #endif
2346
2347         if (S_ISLNK(attr->la_mode)) {
2348                 struct lu_ucred  *uc = lu_ucred_assert(env);
2349                 struct dt_object *dt = mdd_object_child(son);
2350                 const char *target_name = spec->u.sp_symname;
2351                 int sym_len = strlen(target_name);
2352                 loff_t pos = 0;
2353
2354                 buf = mdd_buf_get_const(env, target_name, sym_len);
2355                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2356                                                 uc->uc_cap &
2357                                                 CFS_CAP_SYS_RESOURCE_MASK);
2358
2359                 if (rc == sym_len)
2360                         rc = 0;
2361                 else
2362                         GOTO(err_initlized, rc = -EFAULT);
2363         }
2364
2365         if (spec->sp_cr_file_secctx_name != NULL) {
2366                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2367                                         spec->sp_cr_file_secctx_size);
2368                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2369                                    0, handle);
2370                 if (rc < 0)
2371                         GOTO(err_initlized, rc);
2372         }
2373
2374 err_initlized:
2375         if (unlikely(rc != 0)) {
2376                 int rc2;
2377                 if (S_ISDIR(attr->la_mode)) {
2378                         /* Drop the reference, no need to delete "."/"..",
2379                          * because the object to be destroied directly. */
2380                         rc2 = mdo_ref_del(env, son, handle);
2381                         if (rc2 != 0)
2382                                 GOTO(unlock, rc);
2383                 }
2384                 rc2 = mdo_ref_del(env, son, handle);
2385                 if (rc2 != 0)
2386                         GOTO(unlock, rc);
2387 err_destroy:
2388                 mdo_destroy(env, son, handle);
2389         }
2390 unlock:
2391         mdd_write_unlock(env, son);
2392         RETURN(rc);
2393 }
2394
2395 static int mdd_index_delete(const struct lu_env *env,
2396                             struct mdd_object *mdd_pobj,
2397                             struct lu_attr *cattr,
2398                             const struct lu_name *lname)
2399 {
2400         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2401         struct thandle *handle;
2402         int rc;
2403         ENTRY;
2404
2405         handle = mdd_trans_create(env, mdd);
2406         if (IS_ERR(handle))
2407                 RETURN(PTR_ERR(handle));
2408
2409         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2410                                       handle);
2411         if (rc != 0)
2412                 GOTO(stop, rc);
2413
2414         if (S_ISDIR(cattr->la_mode)) {
2415                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2416                 if (rc != 0)
2417                         GOTO(stop, rc);
2418         }
2419
2420         /* Since this will only be used in the error handler path,
2421          * Let's set the thandle to be local and not mess the transno */
2422         handle->th_local = 1;
2423         rc = mdd_trans_start(env, mdd, handle);
2424         if (rc)
2425                 GOTO(stop, rc);
2426
2427         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2428                                 S_ISDIR(cattr->la_mode), handle);
2429         if (rc)
2430                 GOTO(stop, rc);
2431 stop:
2432         rc = mdd_trans_stop(env, mdd, rc, handle);
2433
2434         RETURN(rc);
2435 }
2436
2437 /**
2438  * Create object and insert it into namespace.
2439  *
2440  * Two operations have to be performed:
2441  *
2442  *  - an allocation of a new object (->do_create()), and
2443  *  - an insertion into a parent index (->dio_insert()).
2444  *
2445  * Due to locking, operation order is not important, when both are
2446  * successful, *but* error handling cases are quite different:
2447  *
2448  *  - if insertion is done first, and following object creation fails,
2449  *  insertion has to be rolled back, but this operation might fail
2450  *  also leaving us with dangling index entry.
2451  *
2452  *  - if creation is done first, is has to be undone if insertion fails,
2453  *  leaving us with leaked space, which is not good but not fatal.
2454  *
2455  * It seems that creation-first is simplest solution, but it is sub-optimal
2456  * in the frequent
2457  *
2458  * $ mkdir foo
2459  * $ mkdir foo
2460  *
2461  * case, because second mkdir is bound to create object, only to
2462  * destroy it immediately.
2463  *
2464  * To avoid this follow local file systems that do double lookup:
2465  *
2466  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2467  * 1. create            (mdd_create_object_internal())
2468  * 2. insert            (__mdd_index_insert(), lookup again)
2469  *
2470  * \param[in] pobj      parent object
2471  * \param[in] lname     name of child being created
2472  * \param[in,out] child child object being created
2473  * \param[in] spec      additional create parameters
2474  * \param[in] ma        attributes for new child object
2475  *
2476  * \retval              0 on success
2477  * \retval              negative errno on failure
2478  */
2479 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2480                       const struct lu_name *lname, struct md_object *child,
2481                       struct md_op_spec *spec, struct md_attr *ma)
2482 {
2483         struct mdd_thread_info  *info = mdd_env_info(env);
2484         struct lu_attr          *la = &info->mti_la_for_fix;
2485         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2486         struct mdd_object       *son = md2mdd_obj(child);
2487         struct mdd_device       *mdd = mdo2mdd(pobj);
2488         struct lu_attr          *attr = &ma->ma_attr;
2489         struct thandle          *handle;
2490         struct lu_attr          *pattr = &info->mti_pattr;
2491         struct lu_buf           acl_buf;
2492         struct lu_buf           def_acl_buf;
2493         struct linkea_data      *ldata = &info->mti_link_data;
2494         const char              *name = lname->ln_name;
2495         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2496         int                      rc;
2497         int                      rc2;
2498         ENTRY;
2499
2500         rc = mdd_la_get(env, mdd_pobj, pattr);
2501         if (rc != 0)
2502                 RETURN(rc);
2503
2504         /* Sanity checks before big job. */
2505         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2506         if (rc)
2507                 RETURN(rc);
2508
2509         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2510                 GOTO(out_free, rc = -EINPROGRESS);
2511
2512         handle = mdd_trans_create(env, mdd);
2513         if (IS_ERR(handle))
2514                 GOTO(out_free, rc = PTR_ERR(handle));
2515
2516         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2517                                mdd->mdd_dt_conf.ddp_max_ea_size);
2518         acl_buf = info->mti_xattr_buf;
2519         def_acl_buf.lb_buf = info->mti_key;
2520         def_acl_buf.lb_len = sizeof(info->mti_key);
2521         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2522         if (rc < 0)
2523                 GOTO(out_stop, rc);
2524
2525         if (S_ISDIR(attr->la_mode)) {
2526                 struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
2527
2528                 /*
2529                  * migrate may create 1-stripe directory, so lod_ah_init()
2530                  * doesn't adjust stripe count from lmu.
2531                  */
2532                 if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
2533                         info->mti_lmu = *lmu;
2534                         info->mti_lmu.lum_stripe_count = 0;
2535                         spec->u.sp_ea.eadata = &info->mti_lmu;
2536                 }
2537         }
2538
2539         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2540
2541         memset(ldata, 0, sizeof(*ldata));
2542         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2543                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2544
2545                 tfid.f_oid--;
2546                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2547                                         &tfid, lname, 1, 0, ldata);
2548         } else {
2549                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2550                                         mdd_object_fid(mdd_pobj),
2551                                         lname, 1, 0, ldata);
2552         }
2553
2554         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2555                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2556                                 hint);
2557         if (rc)
2558                 GOTO(out_stop, rc);
2559
2560         rc = mdd_trans_start(env, mdd, handle);
2561         if (rc)
2562                 GOTO(out_stop, rc);
2563
2564         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2565                                &def_acl_buf, hint, handle);
2566         if (rc != 0)
2567                 GOTO(out_stop, rc);
2568
2569         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2570                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2571                 son->mod_flags |= VOLATILE_OBJ;
2572                 rc = mdd_orphan_insert(env, son, handle);
2573                 GOTO(out_volatile, rc);
2574         } else {
2575                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2576                                       attr->la_mode, name, handle);
2577                 if (rc != 0)
2578                         GOTO(err_created, rc);
2579
2580                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2581                               ldata, 1);
2582
2583                 /* update parent directory mtime/ctime */
2584                 *la = *attr;
2585                 la->la_valid = LA_CTIME | LA_MTIME;
2586                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2587                 if (rc)
2588                         GOTO(err_insert, rc);
2589         }
2590
2591         EXIT;
2592 err_insert:
2593         if (rc != 0) {
2594                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2595                         rc2 = mdd_orphan_delete(env, son, handle);
2596                 else
2597                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2598                                                  S_ISDIR(attr->la_mode),
2599                                                  handle);
2600                 if (rc2 != 0)
2601                         goto out_stop;
2602
2603 err_created:
2604                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2605                 if (S_ISDIR(attr->la_mode)) {
2606                         /* Drop the reference, no need to delete "."/"..",
2607                          * because the object is to be destroyed directly. */
2608                         rc2 = mdo_ref_del(env, son, handle);
2609                         if (rc2 != 0) {
2610                                 mdd_write_unlock(env, son);
2611                                 goto out_stop;
2612                         }
2613                 }
2614 out_volatile:
2615                 /* For volatile files drop one link immediately, since there is
2616                  * no filename in the namespace, and save any error returned. */
2617                 rc2 = mdo_ref_del(env, son, handle);
2618                 if (rc2 != 0) {
2619                         mdd_write_unlock(env, son);
2620                         if (unlikely(rc == 0))
2621                                 rc = rc2;
2622                         goto out_stop;
2623                 }
2624
2625                 /* Don't destroy the volatile object on success */
2626                 if (likely(rc != 0))
2627                         mdo_destroy(env, son, handle);
2628                 mdd_write_unlock(env, son);
2629         }
2630
2631         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2632             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2633                 rc = mdd_changelog_ns_store(env, mdd,
2634                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2635                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2636                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2637                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2638                                 NULL, handle);
2639 out_stop:
2640         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2641         if (rc == 0) {
2642                 /* If creation fails, it is most likely due to the remote update
2643                  * failure, because local transaction will mostly succeed at
2644                  * this stage. There is no easy way to rollback all of previous
2645                  * updates, so let's remove the object from namespace, and
2646                  * LFSCK should handle the orphan object. */
2647                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2648                         mdd_index_delete(env, mdd_pobj, attr, lname);
2649                 rc = rc2;
2650         }
2651 out_free:
2652         if (is_vmalloc_addr(ldata->ld_buf))
2653                 /* if we vmalloced a large buffer drop it */
2654                 lu_buf_free(ldata->ld_buf);
2655
2656         /* The child object shouldn't be cached anymore */
2657         if (rc)
2658                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2659                         &child->mo_lu.lo_header->loh_flags);
2660         return rc;
2661 }
2662
2663 /*
2664  * Get locks on parents in proper order
2665  * RETURN: < 0 - error, rename_order if successful
2666  */
2667 enum rename_order {
2668         MDD_RN_SAME,
2669         MDD_RN_SRCTGT,
2670         MDD_RN_TGTSRC
2671 };
2672
2673 static int mdd_rename_order(const struct lu_env *env,
2674                             struct mdd_device *mdd,
2675                             struct mdd_object *src_pobj,
2676                             const struct lu_attr *pattr,
2677                             struct mdd_object *tgt_pobj)
2678 {
2679         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2680         int rc;
2681         ENTRY;
2682
2683         if (src_pobj == tgt_pobj)
2684                 RETURN(MDD_RN_SAME);
2685
2686         /* compared the parent child relationship of src_p&tgt_p */
2687         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2688                 rc = MDD_RN_SRCTGT;
2689         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2690                 rc = MDD_RN_TGTSRC;
2691         } else {
2692                 rc = mdd_is_parent(env, mdd, src_pobj, pattr,
2693                                    mdo2fid(tgt_pobj));
2694                 if (rc == -EREMOTE)
2695                         rc = 0;
2696
2697                 if (rc == 1)
2698                         rc = MDD_RN_TGTSRC;
2699                 else if (rc == 0)
2700                         rc = MDD_RN_SRCTGT;
2701         }
2702
2703         RETURN(rc);
2704 }
2705
2706 /* has not mdd_write{read}_lock on any obj yet. */
2707 static int mdd_rename_sanity_check(const struct lu_env *env,
2708                                    struct mdd_object *src_pobj,
2709                                    const struct lu_attr *pattr,
2710                                    struct mdd_object *tgt_pobj,
2711                                    const struct lu_attr *tpattr,
2712                                    struct mdd_object *sobj,
2713                                    const struct lu_attr *cattr,
2714                                    struct mdd_object *tobj,
2715                                    const struct lu_attr *tattr)
2716 {
2717         int rc = 0;
2718         ENTRY;
2719
2720         /* XXX: when get here, sobj must NOT be NULL,
2721          * the other case has been processed in cld_rename
2722          * before mdd_rename and enable MDS_PERM_BYPASS. */
2723         LASSERT(sobj);
2724
2725         /*
2726          * If we are using project inheritance, we only allow renames
2727          * into our tree when the project IDs are the same; otherwise
2728          * tree quota mechanism would be circumvented.
2729          */
2730         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2731             tpattr->la_projid != cattr->la_projid) ||
2732             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2733             (pattr->la_projid != tpattr->la_projid)))
2734                 RETURN(-EXDEV);
2735
2736         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2737         if (rc)
2738                 RETURN(rc);
2739
2740         /* XXX: when get here, "tobj == NULL" means tobj must
2741          * NOT exist (neither on remote MDS, such case has been
2742          * processed in cld_rename before mdd_rename and enable
2743          * MDS_PERM_BYPASS).
2744          * So check may_create, but not check may_unlink. */
2745         if (tobj == NULL)
2746                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2747                                     (src_pobj != tgt_pobj));
2748         else
2749                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2750                                     (src_pobj != tgt_pobj), 1);
2751
2752         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2753                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2754
2755         RETURN(rc);
2756 }
2757
2758 static int mdd_declare_rename(const struct lu_env *env,
2759                               struct mdd_device *mdd,
2760                               struct mdd_object *mdd_spobj,
2761                               struct mdd_object *mdd_tpobj,
2762                               struct mdd_object *mdd_sobj,
2763                               struct mdd_object *mdd_tobj,
2764                               const struct lu_name *sname,
2765                               const struct lu_name *tname,
2766                               struct md_attr *ma,
2767                               struct linkea_data *ldata,
2768                               struct thandle *handle)
2769 {
2770         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2771         int rc;
2772
2773         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2774         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2775
2776         LASSERT(mdd_spobj);
2777         LASSERT(mdd_tpobj);
2778         LASSERT(mdd_sobj);
2779
2780         /* name from source dir */
2781         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2782         if (rc)
2783                 return rc;
2784
2785         /* .. from source child */
2786         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2787                 /* source child can be directory,
2788                  * counted by source dir's nlink */
2789                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2790                 if (rc)
2791                         return rc;
2792                 if (mdd_spobj != mdd_tpobj) {
2793                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2794                                                       handle);
2795                         if (rc != 0)
2796                                 return rc;
2797
2798                         rc = mdo_declare_index_insert(env, mdd_sobj,
2799                                                       mdo2fid(mdd_tpobj),
2800                                                       S_IFDIR, dotdot, handle);
2801                         if (rc != 0)
2802                                 return rc;
2803                 }
2804
2805                 /* new target child can be directory,
2806                  * counted by target dir's nlink */
2807                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2808                 if (rc != 0)
2809                         return rc;
2810         }
2811
2812         la->la_valid = LA_CTIME | LA_MTIME;
2813         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2814         if (rc != 0)
2815                 return rc;
2816
2817         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2818         if (rc != 0)
2819                 return rc;
2820
2821         la->la_valid = LA_CTIME;
2822         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2823         if (rc)
2824                 return rc;
2825
2826         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2827         if (rc)
2828                 return rc;
2829
2830         /* new name */
2831         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2832                                       mdd_object_type(mdd_sobj),
2833                                       tname->ln_name, handle);
2834         if (rc != 0)
2835                 return rc;
2836
2837         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2838                 /* delete target child in target parent directory */
2839                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2840                                               handle);
2841                 if (rc)
2842                         return rc;
2843
2844                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2845                 if (rc)
2846                         return rc;
2847
2848                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2849                         /* target child can be directory,
2850                          * delete "." reference in target child directory */
2851                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2852                         if (rc)
2853                                 return rc;
2854
2855                         /* delete ".." reference in target parent directory */
2856                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2857                         if (rc)
2858                                 return rc;
2859                 }
2860
2861                 la->la_valid = LA_CTIME;
2862                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2863                 if (rc)
2864                         return rc;
2865
2866                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2867                 if (rc)
2868                         return rc;
2869         }
2870
2871         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
2872                                          handle);
2873         if (rc)
2874                 return rc;
2875
2876         return rc;
2877 }
2878
2879 /* src object can be remote that is why we use only fid and type of object */
2880 static int mdd_rename(const struct lu_env *env,
2881                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2882                       const struct lu_fid *lf, const struct lu_name *lsname,
2883                       struct md_object *tobj, const struct lu_name *ltname,
2884                       struct md_attr *ma)
2885 {
2886         const char *sname = lsname->ln_name;
2887         const char *tname = ltname->ln_name;
2888         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2889         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2890         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2891         struct mdd_device *mdd = mdo2mdd(src_pobj);
2892         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2893         struct mdd_object *mdd_tobj = NULL;
2894         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2895         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2896         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2897         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2898         struct thandle *handle;
2899         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2900         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2901         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2902         bool is_dir;
2903         bool tobj_ref = 0;
2904         bool tobj_locked = 0;
2905         unsigned cl_flags = 0;
2906         int rc, rc2;
2907         ENTRY;
2908
2909         /* let unlink to complete and commit */
2910         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
2911
2912         if (tobj)
2913                 mdd_tobj = md2mdd_obj(tobj);
2914
2915         mdd_sobj = mdd_object_find(env, mdd, lf);
2916         if (IS_ERR(mdd_sobj))
2917                 RETURN(PTR_ERR(mdd_sobj));
2918
2919         rc = mdd_la_get(env, mdd_sobj, cattr);
2920         if (rc)
2921                 GOTO(out_pending, rc);
2922
2923         rc = mdd_la_get(env, mdd_spobj, pattr);
2924         if (rc)
2925                 GOTO(out_pending, rc);
2926
2927         if (mdd_tobj) {
2928                 rc = mdd_la_get(env, mdd_tobj, tattr);
2929                 if (rc)
2930                         GOTO(out_pending, rc);
2931                 /* search for an existing archive.
2932                  * we should check ahead as the object
2933                  * can be destroyed in this transaction */
2934                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2935                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2936         }
2937
2938         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2939         if (rc)
2940                 GOTO(out_pending, rc);
2941
2942         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2943                                      mdd_sobj, cattr, mdd_tobj, tattr);
2944         if (rc)
2945                 GOTO(out_pending, rc);
2946
2947         rc = mdd_name_check(mdd, ltname);
2948         if (rc < 0)
2949                 GOTO(out_pending, rc);
2950
2951         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2952         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2953         if (rc < 0)
2954                 GOTO(out_pending, rc);
2955
2956         handle = mdd_trans_create(env, mdd);
2957         if (IS_ERR(handle))
2958                 GOTO(out_pending, rc = PTR_ERR(handle));
2959
2960         memset(ldata, 0, sizeof(*ldata));
2961         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
2962                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
2963                                 1, 0, ldata);
2964         if (rc)
2965                 GOTO(stop, rc);
2966
2967         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2968                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2969         if (rc)
2970                 GOTO(stop, rc);
2971
2972         rc = mdd_trans_start(env, mdd, handle);
2973         if (rc)
2974                 GOTO(stop, rc);
2975
2976         is_dir = S_ISDIR(cattr->la_mode);
2977
2978         /* Remove source name from source directory */
2979         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2980         if (rc != 0)
2981                 GOTO(stop, rc);
2982
2983         /* "mv dir1 dir2" needs "dir1/.." link update */
2984         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2985                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2986                 if (rc != 0)
2987                         GOTO(fixup_spobj2, rc);
2988
2989                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2990                                              dotdot, handle);
2991                 if (rc != 0)
2992                         GOTO(fixup_spobj, rc);
2993         }
2994
2995         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2996                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2997                 if (rc != 0)
2998                         /* tname might been renamed to something else */
2999                         GOTO(fixup_spobj, rc);
3000         }
3001
3002         /* Insert new fid with target name into target dir */
3003         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
3004                                 tname, handle);
3005         if (rc != 0)
3006                 GOTO(fixup_tpobj, rc);
3007
3008         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3009         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3010
3011         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3012         la->la_valid = LA_CTIME;
3013         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
3014         if (rc)
3015                 GOTO(fixup_tpobj, rc);
3016
3017         /* Update the linkEA for the source object */
3018         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3019         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
3020                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
3021                               0, 0);
3022         if (rc == -ENOENT)
3023                 /* Old files might not have EA entry */
3024                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
3025                               lsname, handle, NULL, 0);
3026         mdd_write_unlock(env, mdd_sobj);
3027         /* We don't fail the transaction if the link ea can't be
3028            updated -- fid2path will use alternate lookup method. */
3029         rc = 0;
3030
3031         /* Remove old target object
3032          * For tobj is remote case cmm layer has processed
3033          * and set tobj to NULL then. So when tobj is NOT NULL,
3034          * it must be local one.
3035          */
3036         if (tobj && mdd_object_exists(mdd_tobj)) {
3037                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
3038                 tobj_locked = 1;
3039                 if (mdd_is_dead_obj(mdd_tobj)) {
3040                         /* shld not be dead, something is wrong */
3041                         CERROR("tobj is dead, something is wrong\n");
3042                         rc = -EINVAL;
3043                         goto cleanup;
3044                 }
3045                 mdo_ref_del(env, mdd_tobj, handle);
3046
3047                 /* Remove dot reference. */
3048                 if (S_ISDIR(tattr->la_mode))
3049                         mdo_ref_del(env, mdd_tobj, handle);
3050                 tobj_ref = 1;
3051
3052                 /* fetch updated nlink */
3053                 rc = mdd_la_get(env, mdd_tobj, tattr);
3054                 if (rc != 0) {
3055                         CERROR("%s: Failed to get nlink for tobj "
3056                                 DFID": rc = %d\n",
3057                                 mdd2obd_dev(mdd)->obd_name,
3058                                 PFID(tpobj_fid), rc);
3059                         GOTO(fixup_tpobj, rc);
3060                 }
3061
3062                 la->la_valid = LA_CTIME;
3063                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3064                 if (rc != 0) {
3065                         CERROR("%s: Failed to set ctime for tobj "
3066                                 DFID": rc = %d\n",
3067                                 mdd2obd_dev(mdd)->obd_name,
3068                                 PFID(tpobj_fid), rc);
3069                         GOTO(fixup_tpobj, rc);
3070                 }
3071
3072                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3073                 ma->ma_attr = *tattr;
3074                 ma->ma_valid |= MA_INODE;
3075                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3076                                        handle);
3077                 if (rc != 0) {
3078                         CERROR("%s: Failed to unlink tobj "
3079                                 DFID": rc = %d\n",
3080                                 mdd2obd_dev(mdd)->obd_name,
3081                                 PFID(tpobj_fid), rc);
3082                         GOTO(fixup_tpobj, rc);
3083                 }
3084
3085                 /* fetch updated nlink */
3086                 rc = mdd_la_get(env, mdd_tobj, tattr);
3087                 if (rc == -ENOENT) {
3088                         /* the object got removed, let's
3089                          * return the latest known attributes */
3090                         tattr->la_nlink = 0;
3091                         rc = 0;
3092                 } else if (rc != 0) {
3093                         CERROR("%s: Failed to get nlink for tobj "
3094                                 DFID": rc = %d\n",
3095                                 mdd2obd_dev(mdd)->obd_name,
3096                                 PFID(tpobj_fid), rc);
3097                         GOTO(fixup_tpobj, rc);
3098                 }
3099                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3100                 ma->ma_attr = *tattr;
3101                 ma->ma_valid |= MA_INODE;
3102
3103                 if (tattr->la_nlink == 0)
3104                         cl_flags |= CLF_RENAME_LAST;
3105                 else
3106                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3107         }
3108
3109         la->la_valid = LA_CTIME | LA_MTIME;
3110         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3111         if (rc)
3112                 GOTO(fixup_tpobj, rc);
3113
3114         if (mdd_spobj != mdd_tpobj) {
3115                 la->la_valid = LA_CTIME | LA_MTIME;
3116                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3117                 if (rc != 0)
3118                         GOTO(fixup_tpobj, rc);
3119         }
3120
3121         EXIT;
3122
3123 fixup_tpobj:
3124         if (rc) {
3125                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3126                 if (rc2)
3127                         CWARN("tp obj fix error %d\n",rc2);
3128
3129                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3130                     !mdd_is_dead_obj(mdd_tobj)) {
3131                         if (tobj_ref) {
3132                                 mdo_ref_add(env, mdd_tobj, handle);
3133                                 if (is_dir)
3134                                         mdo_ref_add(env, mdd_tobj, handle);
3135                         }
3136
3137                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3138                                                   mdo2fid(mdd_tobj),
3139                                                   mdd_object_type(mdd_tobj),
3140                                                   tname, handle);
3141                         if (rc2 != 0)
3142                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3143                 }
3144         }
3145
3146 fixup_spobj:
3147         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3148                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3149                 if (rc2)
3150                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3151                                mdd2obd_dev(mdd)->obd_name, rc2);
3152
3153
3154                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3155                                               dotdot, handle);
3156                 if (rc2 != 0)
3157                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3158                               mdd2obd_dev(mdd)->obd_name, rc2);
3159         }
3160
3161 fixup_spobj2:
3162         if (rc != 0) {
3163                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3164                                          mdd_object_type(mdd_sobj), sname,
3165                                          handle);
3166                 if (rc2 != 0)
3167                         CWARN("sp obj fix error: rc = %d\n", rc2);
3168         }
3169
3170 cleanup:
3171         if (tobj_locked)
3172                 mdd_write_unlock(env, mdd_tobj);
3173
3174         if (rc == 0)
3175                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3176                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3177                                             ltname, lsname, handle);
3178
3179 stop:
3180         rc = mdd_trans_stop(env, mdd, rc, handle);
3181
3182 out_pending:
3183         mdd_object_put(env, mdd_sobj);
3184         return rc;
3185 }
3186
3187 /**
3188  * Check whether we should migrate the file/dir
3189  * return val
3190  *      < 0  permission check failed or other error.
3191  *      = 0  the file can be migrated.
3192  **/
3193 static int mdd_migrate_sanity_check(const struct lu_env *env,
3194                                     struct mdd_device *mdd,
3195                                     struct mdd_object *spobj,
3196                                     struct mdd_object *tpobj,
3197                                     struct mdd_object *sobj,
3198                                     struct mdd_object *tobj,
3199                                     const struct lu_attr *spattr,
3200                                     const struct lu_attr *tpattr,
3201                                     const struct lu_attr *attr)
3202 {
3203         int rc;
3204
3205         ENTRY;
3206
3207         if (!mdd_object_remote(sobj)) {
3208                 mdd_read_lock(env, sobj, MOR_SRC_CHILD);
3209                 if (sobj->mod_count > 0) {
3210                         CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
3211                                mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(sobj)),
3212                                sobj->mod_count);
3213                         mdd_read_unlock(env, sobj);
3214                         RETURN(-EBUSY);
3215                 }
3216                 mdd_read_unlock(env, sobj);
3217         }
3218
3219         if (mdd_object_exists(tobj))
3220                 RETURN(-EEXIST);
3221
3222         rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
3223                                      attr, NULL, NULL);
3224         RETURN(rc);
3225 }
3226
3227 typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
3228                                  struct mdd_object *obj,
3229                                  struct mdd_object *stripe,
3230                                  const struct lu_buf *lmv_buf,
3231                                  const struct lu_buf *lmu_buf,
3232                                  int index,
3233                                  struct thandle *handle);
3234
3235 static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
3236                                          struct mdd_object *obj,
3237                                          struct mdd_object *stripe,
3238                                          const struct lu_buf *lmv_buf,
3239                                          const struct lu_buf *lmu_buf,
3240                                          int index,
3241                                          struct thandle *handle)
3242 {
3243         struct mdd_thread_info *info = mdd_env_info(env);
3244         char *stripe_name = info->mti_name;
3245         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3246         int rc;
3247
3248         if (index < le32_to_cpu(lmu->lum_stripe_count))
3249                 return 0;
3250
3251         rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
3252         if (rc)
3253                 return rc;
3254
3255         snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
3256                  PFID(mdd_object_fid(stripe)), index);
3257
3258         rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
3259         if (rc)
3260                 return rc;
3261
3262         rc = mdo_declare_ref_del(env, obj, handle);
3263
3264         return rc;
3265 }
3266
3267 /* delete stripe from its master object namespace */
3268 static int mdd_dir_delete_stripe(const struct lu_env *env,
3269                                  struct mdd_object *obj,
3270                                  struct mdd_object *stripe,
3271                                  const struct lu_buf *lmv_buf,
3272                                  const struct lu_buf *lmu_buf,
3273                                  int index,
3274                                  struct thandle *handle)
3275 {
3276         struct mdd_thread_info *info = mdd_env_info(env);
3277         char *stripe_name = info->mti_name;
3278         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3279         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3280         __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
3281         int rc;
3282
3283         ENTRY;
3284
3285         /* local dir will delete via LOD */
3286         LASSERT(mdd_object_remote(obj));
3287         LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
3288
3289         if (index < del_offset)
3290                 RETURN(0);
3291
3292         mdd_write_lock(env, stripe, MOR_SRC_CHILD);
3293         rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
3294         if (rc)
3295                 GOTO(out, rc);
3296
3297         snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
3298                  PFID(mdd_object_fid(stripe)), index);
3299
3300         rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
3301         if (rc)
3302                 GOTO(out, rc);
3303
3304         rc = mdo_ref_del(env, obj, handle);
3305         GOTO(out, rc);
3306 out:
3307         mdd_write_unlock(env, stripe);
3308
3309         return rc;
3310 }
3311
3312 static int mdd_dir_declare_destroy_stripe(const struct lu_env *env,
3313                                           struct mdd_object *obj,
3314                                           struct mdd_object *stripe,
3315                                           const struct lu_buf *lmv_buf,
3316                                           const struct lu_buf *lmu_buf,
3317                                           int index,
3318                                           struct thandle *handle)
3319 {
3320         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3321         __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
3322         int rc;
3323
3324         if (index < shrink_offset) {
3325                 if (shrink_offset < 2)
3326                         return 0;
3327                 return mdo_declare_xattr_set(env, stripe, lmv_buf,
3328                                              XATTR_NAME_LMV".set", 0, handle);
3329         }
3330
3331         rc = mdo_declare_ref_del(env, stripe, handle);
3332         if (rc)
3333                 return rc;
3334
3335         rc = mdo_declare_destroy(env, stripe, handle);
3336
3337         return rc;
3338 }
3339
3340 static int mdd_dir_destroy_stripe(const struct lu_env *env,
3341                                   struct mdd_object *obj,
3342                                   struct mdd_object *stripe,
3343                                   const struct lu_buf *lmv_buf,
3344                                   const struct lu_buf *lmu_buf,
3345                                   int index,
3346                                   struct thandle *handle)
3347 {
3348         struct mdd_thread_info *info = mdd_env_info(env);
3349         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3350         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3351         __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
3352         int rc;
3353
3354         ENTRY;
3355
3356         /* update remaining stripes' LMV */
3357         if (index < shrink_offset) {
3358                 struct lmv_mds_md_v1 *slave_lmv;
3359                 struct lu_buf slave_buf = {
3360                                 .lb_buf = &info->mti_lmv.lmv_md_v1,
3361                                 .lb_len = sizeof(*slave_lmv)
3362                 };
3363                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
3364
3365                 /* if dir will be shrunk to 1-stripe, don't update */
3366                 if (shrink_offset < 2)
3367                         RETURN(0);
3368
3369                 slave_lmv = slave_buf.lb_buf;
3370                 memset(slave_lmv, 0, sizeof(*slave_lmv));
3371                 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
3372                 slave_lmv->lmv_stripe_count = lmu->lum_stripe_count;
3373                 slave_lmv->lmv_master_mdt_index = cpu_to_le32(index);
3374                 slave_lmv->lmv_hash_type = lmv->lmv_hash_type &
3375                                            cpu_to_le32(LMV_HASH_TYPE_MASK);
3376                 slave_lmv->lmv_layout_version = cpu_to_le32(++version);
3377
3378                 rc = mdo_xattr_set(env, stripe, &slave_buf,
3379                                    XATTR_NAME_LMV".set", 0, handle);
3380                 RETURN(rc);
3381         }
3382
3383         mdd_write_lock(env, stripe, MOR_SRC_CHILD);
3384         rc = mdo_ref_del(env, stripe, handle);
3385         if (!rc)
3386                 rc = mdo_destroy(env, stripe, handle);
3387         mdd_write_unlock(env, stripe);
3388
3389         RETURN(rc);
3390 }
3391
3392 static int mdd_shrink_stripe_is_empty(const struct lu_env *env,
3393                                        struct mdd_object *obj,
3394                                        struct mdd_object *stripe,
3395                                        const struct lu_buf *lmv_buf,
3396                                        const struct lu_buf *lmu_buf,
3397                                        int index,
3398                                        struct thandle *handle)
3399 {
3400         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3401         __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
3402
3403         /* the default value is 0, but it means 1 */
3404         if (!shrink_offset)
3405                 shrink_offset = 1;
3406
3407         if (index < shrink_offset)
3408                 return 0;
3409
3410         return mdd_dir_is_empty(env, stripe);
3411 }
3412
3413 /*
3414  * iterate stripes of striped directory on remote MDT, local striped directory
3415  * is accessed via LOD.
3416  */
3417 static int mdd_dir_iterate_stripes(const struct lu_env *env,
3418                                    struct mdd_object *obj,
3419                                    const struct lu_buf *lmv_buf,
3420                                    const struct lu_buf *lmu_buf,
3421                                    struct thandle *handle,
3422                                    mdd_dir_stripe_cb cb)
3423 {
3424         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
3425         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3426         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3427         struct mdd_object *stripe;
3428         int i;
3429         int rc;
3430
3431         ENTRY;
3432
3433         LASSERT(lmv);
3434
3435         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
3436                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
3437                 stripe = mdd_object_find(env, mdd, fid);
3438                 if (IS_ERR(stripe))
3439                         RETURN(PTR_ERR(stripe));
3440
3441                 rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
3442                 mdd_object_put(env, stripe);
3443                 if (rc)
3444                         RETURN(rc);
3445         }
3446
3447         RETURN(0);
3448 }
3449
3450 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
3451                             struct mdd_object *obj,
3452                             const struct lu_buf *buf,
3453                             const char *name,
3454                             int fl, struct thandle *handle);
3455
3456 /* iterate xattrs, but ignore LMA, LMV, and LINKEA if 'skip_linkea' is set. */
3457 static int mdd_iterate_xattrs(const struct lu_env *env,
3458                               struct mdd_object *sobj,
3459                               struct mdd_object *tobj,
3460                               bool skip_linkea,
3461                               struct thandle *handle,
3462                               mdd_xattr_cb cb)
3463 {
3464         struct mdd_thread_info *info = mdd_env_info(env);
3465         char *xname;
3466         struct lu_buf list_xbuf;
3467         struct lu_buf xbuf = { NULL };
3468         int list_xsize;
3469         int xlen;
3470         int rem;
3471         int xsize;
3472         int rc;
3473
3474         ENTRY;
3475
3476         /* retrieve xattr list from the old object */
3477         list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
3478         if (list_xsize == -ENODATA)
3479                 RETURN(0);
3480
3481         if (list_xsize < 0)
3482                 RETURN(list_xsize);
3483
3484         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3485         if (info->mti_big_buf.lb_buf == NULL)
3486                 RETURN(-ENOMEM);
3487
3488         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3489         list_xbuf.lb_len = list_xsize;
3490         rc = mdo_xattr_list(env, sobj, &list_xbuf);
3491         if (rc < 0)
3492                 RETURN(rc);
3493
3494         rem = rc;
3495         rc = 0;
3496         xname = list_xbuf.lb_buf;
3497         while (rem > 0) {
3498                 xlen = strnlen(xname, rem - 1) + 1;
3499                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3500                     strcmp(XATTR_NAME_LMV, xname) == 0)
3501                         goto next;
3502
3503                 if (skip_linkea &&
3504                     strcmp(XATTR_NAME_LINK, xname) == 0)
3505                         goto next;
3506
3507                 xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
3508                 if (xsize == -ENODATA)
3509                         goto next;
3510                 if (xsize < 0)
3511                         GOTO(out, rc = xsize);
3512
3513                 lu_buf_check_and_alloc(&xbuf, xsize);
3514                 if (xbuf.lb_buf == NULL)
3515                         GOTO(out, rc = -ENOMEM);
3516
3517                 rc = mdo_xattr_get(env, sobj, &xbuf, xname);
3518                 if (rc == -ENODATA)
3519                         goto next;
3520                 if (rc < 0)
3521                         GOTO(out, rc);
3522
3523 repeat:
3524                 rc = cb(env, tobj, &xbuf, xname, 0, handle);
3525                 if (unlikely(rc == -ENOSPC &&
3526                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3527                         rc = linkea_overflow_shrink(
3528                                         (struct linkea_data *)(xbuf.lb_buf));
3529                         if (likely(rc > 0)) {
3530                                 xbuf.lb_len = rc;
3531                                 goto repeat;
3532                         }
3533                 }
3534
3535                 if (rc)
3536                         GOTO(out, rc);
3537 next:
3538                 xname += xlen;
3539                 rem -= xlen;
3540         }
3541
3542 out:
3543         lu_buf_free(&xbuf);
3544         RETURN(rc);
3545 }
3546
3547 typedef int (*mdd_linkea_cb)(const struct lu_env *env,
3548                              struct mdd_object *sobj,
3549                              struct mdd_object *tobj,
3550                              const struct lu_name *sname,
3551                              const struct lu_fid *sfid,
3552                              const struct lu_name *lname,
3553                              const struct lu_fid *fid,
3554                              void *opaque,
3555                              struct thandle *handle);
3556
3557 static int mdd_declare_update_link(const struct lu_env *env,
3558                                    struct mdd_object *sobj,
3559                                    struct mdd_object *tobj,
3560                                    const struct lu_name *tname,
3561                                    const struct lu_fid *tpfid,
3562                                    const struct lu_name *lname,
3563                                    const struct lu_fid *fid,
3564                                    void *unused,
3565                                    struct thandle *handle)
3566 {
3567         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3568         struct mdd_object *pobj;
3569         int rc;
3570
3571         /* ignore tobj */
3572         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3573             !strcmp(tname->ln_name, lname->ln_name))
3574                 return 0;
3575
3576         pobj = mdd_object_find(env, mdd, fid);
3577         if (IS_ERR(pobj))
3578                 return PTR_ERR(pobj);
3579
3580
3581         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
3582         if (!rc)
3583                 rc = mdo_declare_index_insert(env, pobj, mdo2fid(tobj),
3584                                               mdd_object_type(sobj),
3585                                               lname->ln_name, handle);
3586         mdd_object_put(env, pobj);
3587         if (rc)
3588                 return rc;
3589
3590         rc = mdo_declare_ref_add(env, tobj, handle);
3591         if (rc)
3592                 return rc;
3593
3594         rc = mdo_declare_ref_del(env, sobj, handle);
3595         return rc;
3596 }
3597
3598 static int mdd_update_link(const struct lu_env *env,
3599                            struct mdd_object *sobj,
3600                            struct mdd_object *tobj,
3601                            const struct lu_name *tname,
3602                            const struct lu_fid *tpfid,
3603                            const struct lu_name *lname,
3604                            const struct lu_fid *fid,
3605                            void *unused,
3606                            struct thandle *handle)
3607 {
3608         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3609         struct mdd_object *pobj;
3610         int rc;
3611
3612         ENTRY;
3613
3614         LASSERT(lu_name_is_valid(lname));
3615
3616         /* ignore tobj */
3617         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3618             !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
3619                 RETURN(0);
3620
3621         CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
3622                PFID(fid), PNAME(lname), PFID(mdo2fid(tobj)));
3623
3624         pobj = mdd_object_find(env, mdd, fid);
3625         if (IS_ERR(pobj)) {
3626                 CWARN("%s: cannot find obj "DFID": %ld\n",
3627                       mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
3628                 RETURN(PTR_ERR(pobj));
3629         }
3630
3631         if (!mdd_object_exists(pobj)) {
3632                 CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
3633                 mdd_object_put(env, pobj);
3634                 RETURN(-ENOENT);
3635         }
3636
3637         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
3638         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
3639         if (!rc)
3640                 rc = __mdd_index_insert_only(env, pobj, mdo2fid(tobj),
3641                                              mdd_object_type(sobj),
3642                                              lname->ln_name, handle);
3643         mdd_write_unlock(env, pobj);
3644         mdd_object_put(env, pobj);
3645         if (rc)
3646                 RETURN(rc);
3647
3648         mdd_write_lock(env, tobj, MOR_TGT_CHILD);
3649         rc = mdo_ref_add(env, tobj, handle);
3650         mdd_write_unlock(env, tobj);
3651         if (rc)
3652                 RETURN(rc);
3653
3654         mdd_write_lock(env, sobj, MOR_SRC_CHILD);
3655         rc = mdo_ref_del(env, sobj, handle);
3656         mdd_write_unlock(env, sobj);
3657
3658         RETURN(rc);
3659 }
3660
3661 static inline int mdd_fld_lookup(const struct lu_env *env,
3662                                  struct mdd_device *mdd,
3663                                  const struct lu_fid *fid,
3664                                  __u32 *mdt_index)
3665 {
3666         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
3667         struct seq_server_site *ss;
3668         int rc;
3669
3670         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
3671
3672         range->lsr_flags = LU_SEQ_RANGE_MDT;
3673         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
3674         if (rc)
3675                 return rc;
3676
3677         *mdt_index = range->lsr_index;
3678
3679         return 0;
3680 }
3681
3682 static int mdd_is_link_on_source_mdt(const struct lu_env *env,
3683                                      struct mdd_object *sobj,
3684                                      struct mdd_object *tobj,
3685                                      const struct lu_name *tname,
3686                                      const struct lu_fid *tpfid,
3687                                      const struct lu_name *lname,
3688                                      const struct lu_fid *fid,
3689                                      void *opaque,
3690                                      struct thandle *handle)
3691 {
3692         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3693         __u32 source_mdt_index = *(__u32 *)opaque;
3694         __u32 link_mdt_index;
3695         int rc;
3696
3697         ENTRY;
3698
3699         /* ignore tobj */
3700         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3701             !strcmp(tname->ln_name, lname->ln_name))
3702                 return 0;
3703
3704         rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
3705         if (rc)
3706                 RETURN(rc);
3707
3708         RETURN(link_mdt_index == source_mdt_index);
3709 }
3710
3711 static int mdd_iterate_linkea(const struct lu_env *env,
3712                               struct mdd_object *sobj,
3713                               struct mdd_object *tobj,
3714                               const struct lu_name *tname,
3715                               const struct lu_fid *tpfid,
3716                               struct linkea_data *ldata,
3717                               void *opaque,
3718                               struct thandle *handle,
3719                               mdd_linkea_cb cb)
3720 {
3721         struct mdd_thread_info *info = mdd_env_info(env);
3722         char *filename = info->mti_name;
3723         struct lu_name lname;
3724         struct lu_fid fid;
3725         int rc = 0;
3726
3727         if (!ldata->ld_buf)
3728                 return 0;
3729
3730         for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
3731              linkea_next_entry(ldata)) {
3732                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
3733                                     &fid);
3734
3735                 /* Note: lname might miss \0 at the end */
3736                 snprintf(filename, sizeof(info->mti_name), "%.*s",
3737                          lname.ln_namelen, lname.ln_name);
3738                 lname.ln_name = filename;
3739
3740                 CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
3741
3742                 rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
3743                         handle);
3744         }
3745
3746         return rc;
3747 }
3748
3749 /**
3750  * Prepare linkea, and check whether file needs migrate: if source still has
3751  * link on source MDT, no need to migrate, just update namespace on source and
3752  * target parents.
3753  *
3754  * \retval      0 do migrate
3755  * \retval      1 don't migrate
3756  * \retval      -errno on failure
3757  */
3758 static int migrate_linkea_prepare(const struct lu_env *env,
3759                                   struct mdd_device *mdd,
3760                                   struct mdd_object *spobj,
3761                                   struct mdd_object *tpobj,
3762                                   struct mdd_object *sobj,
3763                                   const struct lu_name *lname,
3764                                   const struct lu_attr *attr,
3765                                   struct linkea_data *ldata)
3766 {
3767         __u32 source_mdt_index;
3768         int rc;
3769
3770         ENTRY;
3771
3772         memset(ldata, 0, sizeof(*ldata));
3773         rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
3774                                 mdo2fid(tpobj), lname, 1, 0, ldata);
3775         if (rc)
3776                 RETURN(rc);
3777
3778         /*
3779          * Then it will check if the file should be migrated. If the file has
3780          * mulitple links, we only need migrate the file if all of its entries
3781          * has been migrated to the remote MDT.
3782          */
3783         if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
3784                 RETURN(0);
3785
3786         /* If there are still links locally, don't migrate this file */
3787         LASSERT(ldata->ld_leh != NULL);
3788
3789         /*
3790          * If linkEA is overflow, it means there are some unknown name entries
3791          * under unknown parents, which will prevent the migration.
3792          */
3793         if (unlikely(ldata->ld_leh->leh_overflow_time))
3794                 RETURN(-EOVERFLOW);
3795
3796         rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &source_mdt_index);
3797         if (rc)
3798                 RETURN(rc);
3799
3800         rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
3801                                 &source_mdt_index, NULL,
3802                                 mdd_is_link_on_source_mdt);
3803         RETURN(rc);
3804 }
3805
3806 static int mdd_dir_declare_layout_delete(const struct lu_env *env,
3807                                          struct mdd_object *obj,
3808                                          const struct lu_buf *lmv_buf,
3809                                          const struct lu_buf *lmu_buf,
3810                                          struct thandle *handle)
3811 {
3812         int rc;
3813
3814         if (!lmv_buf->lb_buf)
3815                 rc = mdo_declare_index_delete(env, obj, dotdot, handle);
3816         else if (mdd_object_remote(obj))
3817                 rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
3818                                              mdd_dir_declare_delete_stripe);
3819         else
3820                 rc = mdo_declare_xattr_set(env, obj, lmu_buf,
3821                                            XATTR_NAME_LMV".del", 0, handle);
3822
3823         return rc;
3824 }
3825
3826 static int mdd_dir_layout_delete(const struct lu_env *env,
3827                                  struct mdd_object *obj,
3828                                  const struct lu_buf *lmv_buf,
3829                                  const struct lu_buf *lmu_buf,
3830                                  struct thandle *handle)
3831 {
3832         int rc;
3833
3834         ENTRY;
3835
3836         mdd_write_lock(env, obj, MOR_SRC_PARENT);
3837         if (!lmv_buf->lb_buf)
3838                 /* normal dir */
3839                 rc = __mdd_index_delete_only(env, obj, dotdot, handle);
3840         else if (mdd_object_remote(obj))
3841                 /* striped, but remote */
3842                 rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
3843                                              mdd_dir_delete_stripe);
3844         else
3845                 rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
3846                                    handle);
3847         mdd_write_unlock(env, obj);
3848
3849         RETURN(rc);
3850 }
3851
3852 static int mdd_declare_migrate_create(const struct lu_env *env,
3853                                       struct mdd_object *tpobj,
3854                                       struct mdd_object *sobj,
3855                                       struct mdd_object *tobj,
3856                                       const struct lu_name *lname,
3857                                       struct lu_attr *attr,
3858                                       struct lu_buf *sbuf,
3859                                       struct linkea_data *ldata,
3860                                       struct md_op_spec *spec,
3861                                       struct dt_allocation_hint *hint,
3862                                       struct thandle *handle)
3863 {
3864         struct mdd_thread_info *info = mdd_env_info(env);
3865         struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
3866         int rc;
3867
3868         if (S_ISDIR(attr->la_mode)) {
3869                 struct lu_buf lmu_buf = { NULL };
3870
3871                 if (lmv) {
3872                         struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
3873
3874                         lmu->lum_stripe_count = 0;
3875                         lmu_buf.lb_buf = lmu;
3876                         lmu_buf.lb_len = sizeof(*lmu);
3877                 }
3878
3879                 rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
3880                                                    handle);
3881                 if (rc)
3882                         return rc;
3883
3884                 if (lmv) {
3885                         rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
3886                                                    handle);
3887                         if (rc)
3888                                 return rc;
3889                 }
3890         }
3891
3892         rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
3893                                 lname, attr, handle, spec, ldata, NULL, NULL,
3894                                 hint);
3895         if (rc)
3896                 return rc;
3897
3898         if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
3899                 if (!lmv) {
3900                         /*
3901                          * if sobj is not striped, fake a 1-stripe LMV, which
3902                          * will be used to generate a compound LMV for tobj.
3903                          */
3904                         LASSERT(sizeof(info->mti_key) >
3905                                 lmv_mds_md_size(1, LMV_MAGIC_V1));
3906                         lmv = (typeof(lmv))info->mti_key;
3907                         memset(lmv, 0, sizeof(*lmv));
3908                         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3909                         lmv->lmv_stripe_count = cpu_to_le32(1);
3910                         fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
3911                         sbuf->lb_buf = lmv;
3912                         sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
3913
3914                         rc = mdo_declare_xattr_set(env, tobj, sbuf,
3915                                                    XATTR_NAME_LMV".add", 0,
3916                                                    handle);
3917                         sbuf->lb_buf = NULL;
3918                         sbuf->lb_len = 0;
3919                 } else {
3920                         rc = mdo_declare_xattr_set(env, tobj, sbuf,
3921                                                    XATTR_NAME_LMV".add", 0,
3922                                                    handle);
3923                 }
3924                 if (rc)
3925                         return rc;
3926         }
3927
3928         /*
3929          * tobj mode will be used in lod_declare_xattr_set(), but it's not
3930          * createb yet, copy from sobj.
3931          */
3932         tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
3933         tobj->mod_obj.mo_lu.lo_header->loh_attr |=
3934                 sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
3935
3936         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
3937                                 mdo_declare_xattr_set);
3938         if (rc)
3939                 return rc;
3940
3941         if (S_ISREG(attr->la_mode)) {
3942                 handle->th_complex = 1;
3943
3944                 rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
3945                 if (rc)
3946                         return rc;
3947
3948                 /*
3949                  * target is not initalized because its LOV is copied from
3950                  * source in mdd_migrate_create(), declare via sobj.
3951                  */
3952                 rc = mdo_declare_xattr_set(env, sobj, NULL, XATTR_NAME_FID, 0,
3953                                            handle);
3954                 if (rc)
3955                         return rc;
3956         }
3957
3958         if (!S_ISDIR(attr->la_mode)) {
3959                 rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
3960                                         ldata, NULL, handle,
3961                                         mdd_declare_update_link);
3962                 if (rc)
3963                         return rc;
3964
3965                 if (lmv) {
3966                         rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
3967                                                    handle);
3968                         if (rc)
3969                                 return rc;
3970                 }
3971         }
3972
3973         return rc;
3974 }
3975
3976 /**
3977  * Create target, migrate xattrs and update links.
3978  *
3979  * Create target according to \a spec, and then migrate xattrs, if it's
3980  * directory, migrate source stripes to target, else update fid to target
3981  * for links.
3982  *
3983  * \param[in] env       execution environment
3984  * \param[in] tpobj     target parent object
3985  * \param[in] sobj      source object
3986  * \param[in] tobj      target object
3987  * \param[in] lname     file name
3988  * \param[in] attr      source attributes
3989  * \param[in] sbuf      source LMV buf
3990  * \param[in] ldata     source linkea
3991  * \param[in] spec      migrate create spec
3992  * \param[in] hint      target creation hint
3993  * \param[in] handle    tranasction handle
3994  *
3995  * \retval      0 on success
3996  * \retval      -errno on failure
3997  **/
3998 static int mdd_migrate_create(const struct lu_env *env,
3999                               struct mdd_object *tpobj,
4000                               struct mdd_object *sobj,
4001                               struct mdd_object *tobj,
4002                               const struct lu_name *lname,
4003                               struct lu_attr *attr,
4004                               const struct lu_buf *sbuf,
4005                               struct linkea_data *ldata,
4006                               struct md_op_spec *spec,
4007                               struct dt_allocation_hint *hint,
4008                               struct thandle *handle)
4009 {
4010         int rc;
4011
4012         ENTRY;
4013
4014         /*
4015          * directory will migrate sobj stripes to tobj:
4016          * 1. delete stripes from sobj.
4017          * 2. add stripes to tobj, see lod_dir_declare_layout_add().
4018          * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
4019          */
4020         if (S_ISDIR(attr->la_mode)) {
4021                 struct lu_buf lmu_buf = { NULL };
4022
4023                 if (sbuf->lb_buf) {
4024                         struct mdd_thread_info *info = mdd_env_info(env);
4025                         struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
4026
4027                         lmu->lum_stripe_count = 0;
4028                         lmu_buf.lb_buf = lmu;
4029                         lmu_buf.lb_len = sizeof(*lmu);
4030                 }
4031
4032                 rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
4033                 if (rc)
4034                         RETURN(rc);
4035
4036                 /*
4037                  * delete LMV so that later when destroying sobj it won't delete
4038                  * stripes again.
4039                  */
4040                 if (sbuf->lb_buf) {
4041                         mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4042                         rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
4043                         mdd_write_unlock(env, sobj);
4044                         if (rc)
4045                                 RETURN(rc);
4046                 }
4047         }
4048
4049         /* don't set nlink from sobj */
4050         attr->la_valid &= ~LA_NLINK;
4051
4052         rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, hint,
4053                                 handle);
4054         if (rc)
4055                 RETURN(rc);
4056
4057         mdd_write_lock(env, tobj, MOR_TGT_CHILD);
4058         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
4059         mdd_write_unlock(env, tobj);
4060         if (rc)
4061                 RETURN(rc);
4062
4063         if (S_ISREG(attr->la_mode)) {
4064                 /* delete LOV to avoid deleting OST objs when destroying sobj */
4065                 mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4066                 rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
4067                 mdd_write_unlock(env, sobj);
4068                 if (rc)
4069                         RETURN(rc);
4070
4071                 /* for regular file, update OST objects XATTR_NAME_FID */
4072                 rc = mdo_xattr_set(env, tobj, NULL, XATTR_NAME_FID, 0, handle);
4073                 if (rc)
4074                         RETURN(rc);
4075         }
4076
4077         if (!S_ISDIR(attr->la_mode))
4078                 rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
4079                                         ldata, NULL, handle, mdd_update_link);
4080
4081         RETURN(rc);
4082 }
4083
4084 static int mdd_declare_migrate_update(const struct lu_env *env,
4085                                       struct mdd_object *spobj,
4086                                       struct mdd_object *tpobj,
4087                                       struct mdd_object *sobj,
4088                                       struct mdd_object *tobj,
4089                                       const struct lu_name *lname,
4090                                       struct lu_attr *attr,
4091                                       struct lu_attr *spattr,
4092                                       struct lu_attr *tpattr,
4093                                       struct linkea_data *ldata,
4094                                       bool do_create,
4095                                       bool do_destroy,
4096                                       struct md_attr *ma,
4097                                       struct thandle *handle)
4098 {
4099         struct mdd_thread_info *info = mdd_env_info(env);
4100         const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
4101         struct lu_attr *la = &info->mti_la_for_fix;
4102         int rc;
4103
4104         rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
4105         if (rc)
4106                 return rc;
4107
4108         if (S_ISDIR(attr->la_mode)) {
4109                 rc = mdo_declare_ref_del(env, spobj, handle);
4110                 if (rc)
4111                         return rc;
4112         }
4113
4114         rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
4115                                       lname->ln_name, handle);
4116         if (rc)
4117                 return rc;
4118
4119         rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
4120         if (rc)
4121                 return rc;
4122
4123         if (S_ISDIR(attr->la_mode)) {
4124                 rc = mdo_declare_ref_add(env, tpobj, handle);
4125                 if (rc)
4126                         return rc;
4127         }
4128
4129         la->la_valid = LA_CTIME | LA_MTIME;
4130         rc = mdo_declare_attr_set(env, spobj, la, handle);
4131         if (rc)
4132                 return rc;
4133
4134         if (tpobj != spobj) {
4135                 rc = mdo_declare_attr_set(env, tpobj, la, handle);
4136                 if (rc)
4137                         return rc;
4138         }
4139
4140         if (do_create && do_destroy) {
4141                 rc = mdo_declare_ref_del(env, sobj, handle);
4142                 if (rc)
4143                         return rc;
4144
4145                 rc = mdo_declare_destroy(env, sobj, handle);
4146                 if (rc)
4147                         return rc;
4148         }
4149
4150         return rc;
4151 }
4152
4153 /**
4154  * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
4155  **/
4156 static int mdd_migrate_update(const struct lu_env *env,
4157                               struct mdd_object *spobj,
4158                               struct mdd_object *tpobj,
4159                               struct mdd_object *sobj,
4160                               struct mdd_object *tobj,
4161                               const struct lu_name *lname,
4162                               struct lu_attr *attr,
4163                               struct lu_attr *spattr,
4164                               struct lu_attr *tpattr,
4165                               struct linkea_data *ldata,
4166                               bool do_create,
4167                               bool do_destroy,
4168                               struct md_attr *ma,
4169                               struct thandle *handle)
4170 {
4171         struct mdd_thread_info *info = mdd_env_info(env);
4172         const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
4173         struct lu_attr *la = &info->mti_la_for_fix;
4174         int rc;
4175
4176         ENTRY;
4177
4178         CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
4179                lname->ln_name, PFID(mdo2fid(spobj)),
4180                PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
4181                PFID(fid));
4182
4183         rc = __mdd_index_delete(env, spobj, lname->ln_name,
4184                                 S_ISDIR(attr->la_mode), handle);
4185         if (rc)
4186                 RETURN(rc);
4187
4188         rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
4189                                 lname->ln_name, handle);
4190         if (rc)
4191                 RETURN(rc);
4192
4193         rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
4194         if (rc)
4195                 RETURN(rc);
4196
4197         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
4198         la->la_valid = LA_CTIME | LA_MTIME;
4199         mdd_write_lock(env, spobj, MOR_SRC_PARENT);
4200         rc = mdd_update_time(env, spobj, spattr, la, handle);
4201         mdd_write_unlock(env, spobj);
4202         if (rc)
4203                 RETURN(rc);
4204
4205         if (tpobj != spobj) {
4206                 la->la_valid = LA_CTIME | LA_MTIME;
4207                 mdd_write_lock(env, tpobj, MOR_TGT_PARENT);
4208                 rc = mdd_update_time(env, tpobj, tpattr, la, handle);
4209                 mdd_write_unlock(env, tpobj);
4210                 if (rc)
4211                         RETURN(rc);
4212         }
4213
4214         /*
4215          * there are three situations we shouldn't destroy source:
4216          * 1. if source is not dir, and it happens to be located on the same MDT
4217          *    as target parent.
4218          * 2. if source is not dir, and has link on the same MDT where source is
4219          *    located.
4220          * 3. if source is dir, and it's a normal, non-empty dir.
4221          *
4222          * the first two situations equals to !do_create, and the 3rd equals to
4223          * !do_destroy, so the below condition is actually
4224          * !(!do_create || !do_destroy).
4225          *
4226          * NB, if user has opened source dir before migration, he will get
4227          * -ENOENT error when close it later, because source is likely to be
4228          *  remote, which can't be moved to orphan list, but except this error
4229          *  message, this won't cause any inconsistency or trouble.
4230          */
4231         if (do_create && do_destroy) {
4232                 mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4233                 mdo_ref_del(env, sobj, handle);
4234                 rc = mdo_destroy(env, sobj, handle);
4235                 mdd_write_unlock(env, sobj);
4236         }
4237
4238         RETURN(rc);
4239 }
4240
4241 /**
4242  * Migrate directory or file.
4243  *
4244  * migrate source to target in following steps:
4245  *   1. create target, append source stripes after target's if it's directory,
4246  *      migrate xattrs and update fid of source links.
4247  *   2. update namespace: migrate dirent from source parent to target parent,
4248  *      update file linkea, and destroy source if it's not needed any more.
4249  *
4250  * \param[in] env       execution environment
4251  * \param[in] md_pobj   parent master object
4252  * \param[in] md_sobj   source object
4253  * \param[in] lname     file name
4254  * \param[in] md_tobj   target object
4255  * \param[in] spec      target creation spec
4256  * \param[in] ma        used to update \a pobj mtime and ctime
4257  *
4258  * \retval              0 on success
4259  * \retval              -errno on failure
4260  */
4261 static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
4262                        struct md_object *md_sobj, const struct lu_name *lname,
4263                        struct md_object *md_tobj, struct md_op_spec *spec,
4264                        struct md_attr *ma)
4265 {
4266         struct mdd_device *mdd = mdo2mdd(md_pobj);
4267         struct mdd_thread_info *info = mdd_env_info(env);
4268         struct mdd_object *pobj = md2mdd_obj(md_pobj);
4269         struct mdd_object *sobj = md2mdd_obj(md_sobj);
4270         struct mdd_object *tobj = md2mdd_obj(md_tobj);
4271         struct mdd_object *spobj = NULL;
4272         struct mdd_object *tpobj = NULL;
4273         struct lu_attr *spattr = &info->mti_pattr;
4274         struct lu_attr *tpattr = &info->mti_tpattr;
4275         struct lu_attr *attr = &info->mti_cattr;
4276         struct linkea_data *ldata = &info->mti_link_data;
4277         struct dt_allocation_hint *hint = &info->mti_hint;
4278         struct lu_fid *fid = &info->mti_fid2;
4279         struct lu_buf pbuf = { NULL };
4280         struct lu_buf sbuf = { NULL };
4281         struct lmv_mds_md_v1 *plmv;
4282         struct thandle *handle;
4283         bool do_create = true;
4284         bool do_destroy = true;
4285         int rc;
4286         ENTRY;
4287
4288         rc = mdd_la_get(env, sobj, attr);
4289         if (rc)
4290                 RETURN(rc);
4291
4292         /* locate source and target stripe on pobj, which are the real parent */
4293         rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
4294         if (rc < 0 && rc != -ENODATA)
4295                 RETURN(rc);
4296
4297         plmv = pbuf.lb_buf;
4298         if (plmv) {
4299                 __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
4300                 __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
4301                 int index;
4302
4303                 /* locate target parent stripe */
4304                 if (hash_type & LMV_HASH_FLAG_MIGRATION) {
4305                         /*
4306                          * fail check here to make sure top dir migration
4307                          * succeed.
4308                          */
4309                         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
4310                                 GOTO(out, rc = -EIO);
4311                         hash_type &= ~LMV_HASH_FLAG_MIGRATION;
4312                         count = le32_to_cpu(plmv->lmv_migrate_offset);
4313                 }
4314                 index = lmv_name_to_stripe_index(hash_type, count,
4315                                                  lname->ln_name,
4316                                                  lname->ln_namelen);
4317                 if (index < 0)
4318                         GOTO(out, rc = index);
4319
4320                 fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
4321                 tpobj = mdd_object_find(env, mdd, fid);
4322                 if (IS_ERR(tpobj))
4323                         GOTO(out, rc = PTR_ERR(tpobj));
4324
4325                 /* locate source parent stripe */
4326                 if (le32_to_cpu(plmv->lmv_hash_type) &
4327                     LMV_HASH_FLAG_MIGRATION) {
4328                         hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
4329                         count = le32_to_cpu(plmv->lmv_stripe_count) -
4330                                 le32_to_cpu(plmv->lmv_migrate_offset);
4331
4332                         index = lmv_name_to_stripe_index(hash_type, count,
4333                                                          lname->ln_name,
4334                                                          lname->ln_namelen);
4335                         if (index < 0) {
4336                                 mdd_object_put(env, tpobj);
4337                                 GOTO(out, rc = index);
4338                         }
4339
4340                         index += le32_to_cpu(plmv->lmv_migrate_offset);
4341                         fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
4342                         spobj = mdd_object_find(env, mdd, fid);
4343                         if (IS_ERR(spobj)) {
4344                                 mdd_object_put(env, tpobj);
4345                                 GOTO(out, rc = PTR_ERR(spobj));
4346                         }
4347                 } else {
4348                         spobj = tpobj;
4349                         mdd_object_get(spobj);
4350                 }
4351         } else {
4352                 tpobj = pobj;
4353                 spobj = pobj;
4354                 mdd_object_get(tpobj);
4355                 mdd_object_get(spobj);
4356         }
4357
4358         rc = mdd_la_get(env, spobj, spattr);
4359         if (rc)
4360                 GOTO(out, rc);
4361
4362         rc = mdd_la_get(env, tpobj, tpattr);
4363         if (rc)
4364                 GOTO(out, rc);
4365
4366         if (S_ISDIR(attr->la_mode)) {
4367                 struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
4368
4369                 LASSERT(lmu);
4370
4371                 /*
4372                  * if user use default value '0' for stripe_count, we need to
4373                  * adjust it to '1' to create a 1-stripe directory.
4374                  */
4375                 if (lmu->lum_stripe_count == 0) {
4376                         /* eadata is from request, don't alter it */
4377                         info->mti_lmu = *lmu;
4378                         info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
4379                         spec->u.sp_ea.eadata = &info->mti_lmu;
4380                         lmu = spec->u.sp_ea.eadata;
4381                 }
4382
4383                 rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
4384                 if (rc == -ENODATA) {
4385                         if (mdd_dir_is_empty(env, sobj) == 0) {
4386                                 /*
4387                                  * if sobj is empty, and target is not striped,
4388                                  * create target as a normal directory.
4389                                  */
4390                                 if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
4391                                         info->mti_lmu = *lmu;
4392                                         info->mti_lmu.lum_stripe_count = 0;
4393                                         spec->u.sp_ea.eadata = &info->mti_lmu;
4394                                         lmu = spec->u.sp_ea.eadata;
4395                                 }
4396                         } else {
4397                                 /*
4398                                  * sobj is not striped dir, if it's not empty,
4399                                  * it will be migrated to be a stripe of target,
4400                                  * don't destroy it after migration.
4401                                  */
4402                                 do_destroy = false;
4403                         }
4404                 } else if (rc) {
4405                         GOTO(out, rc);
4406                 } else {
4407                         struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
4408
4409                         if (le32_to_cpu(lmv->lmv_hash_type) &
4410                             LMV_HASH_FLAG_MIGRATION) {
4411                                 __u32 lum_stripe_count = lmu->lum_stripe_count;
4412                                 __u32 lmv_hash_type = lmv->lmv_hash_type &
4413                                         cpu_to_le32(LMV_HASH_TYPE_MASK);
4414
4415                                 if (!lum_stripe_count)
4416                                         lum_stripe_count = cpu_to_le32(1);
4417
4418                                 /* TODO: check specific MDTs */
4419                                 if (lmv->lmv_migrate_offset !=
4420                                     lum_stripe_count ||
4421                                     lmv->lmv_master_mdt_index !=
4422                                     lmu->lum_stripe_offset ||
4423                                     (lmv_hash_type != 0 &&
4424                                      lmv_hash_type != lmu->lum_hash_type)) {
4425                                         CERROR("%s: \'"DNAME"\' migration was "
4426                                                 "interrupted, run \'lfs migrate "
4427                                                 "-m %d -c %d -H %d "DNAME"\' to "
4428                                                 "finish migration.\n",
4429                                                 mdd2obd_dev(mdd)->obd_name,
4430                                                 PNAME(lname),
4431                                                 le32_to_cpu(
4432                                                     lmv->lmv_master_mdt_index),
4433                                                 le32_to_cpu(
4434                                                     lmv->lmv_migrate_offset),
4435                                                 le32_to_cpu(lmv_hash_type),
4436                                                 PNAME(lname));
4437                                         GOTO(out, rc = -EPERM);
4438                                 }
4439                                 GOTO(out, rc = -EALREADY);
4440                         }
4441                 }
4442         } else if (!mdd_object_remote(tpobj)) {
4443                 /*
4444                  * if source is already on MDT where target parent is located,
4445                  * no need to create, just update namespace.
4446                  */
4447                 do_create = false;
4448         } else if (S_ISLNK(attr->la_mode)) {
4449                 lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
4450                 if (!sbuf.lb_buf)
4451                         GOTO(out, rc = -ENOMEM);
4452                 rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
4453                 if (rc <= 0) {
4454                         rc = rc ?: -EFAULT;
4455                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
4456                                mdd2obd_dev(mdd)->obd_name,
4457                                PFID(mdo2fid(sobj)), rc);
4458                         GOTO(out, rc);
4459                 }
4460                 spec->u.sp_symname = sbuf.lb_buf;
4461         } else if (S_ISREG(attr->la_mode)) {
4462                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
4463                 spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
4464         }
4465
4466         /*
4467          * if sobj has link on the same MDT, no need to create, just update
4468          * namespace, and it will be a remote file on target parent, which is
4469          * similar to rename.
4470          */
4471         rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
4472                                     ldata);
4473         if (rc > 0)
4474                 do_create = false;
4475         else if (rc)
4476                 GOTO(out, rc);
4477
4478         rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
4479                                       spattr, tpattr, attr);
4480         if (rc)
4481                 GOTO(out, rc);
4482
4483         mdd_object_make_hint(env, NULL, tobj, attr, spec, hint);
4484
4485         handle = mdd_trans_create(env, mdd);
4486         if (IS_ERR(handle))
4487                 GOTO(out, rc = PTR_ERR(handle));
4488
4489         if (do_create) {
4490                 rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
4491                                                 attr, &sbuf, ldata, spec, hint,
4492                                                 handle);
4493                 if (rc)
4494                         GOTO(stop_trans, rc);
4495         }
4496
4497         rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
4498                                         attr, spattr, tpattr, ldata, do_create,
4499                                         do_destroy, ma, handle);
4500         if (rc)
4501                 GOTO(stop_trans, rc);
4502
4503         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
4504                                          handle);
4505         if (rc)
4506                 GOTO(stop_trans, rc);
4507
4508         rc = mdd_trans_start(env, mdd, handle);
4509         if (rc)
4510                 GOTO(stop_trans, rc);
4511
4512         if (do_create) {
4513                 rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
4514                                         &sbuf, ldata, spec, hint, handle);
4515                 if (rc)
4516                         GOTO(stop_trans, rc);
4517         }
4518
4519         rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
4520                                 spattr, tpattr, ldata, do_create, do_destroy,
4521                                 ma, handle);
4522         if (rc)
4523                 GOTO(stop_trans, rc);
4524
4525         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
4526                                     mdo2fid(spobj), mdo2fid(sobj),
4527                                     mdo2fid(tpobj), lname, lname, handle);
4528         if (rc)
4529                 GOTO(stop_trans, rc);
4530
4531         EXIT;
4532 stop_trans:
4533         rc = mdd_trans_stop(env, mdd, rc, handle);
4534 out:
4535         if (spobj && !IS_ERR(spobj))
4536                 mdd_object_put(env, spobj);
4537         if (tpobj && !IS_ERR(tpobj))
4538                 mdd_object_put(env, tpobj);
4539         lu_buf_free(&sbuf);
4540         lu_buf_free(&pbuf);
4541         return rc;
4542 }
4543
4544 static int __mdd_dir_declare_layout_shrink(const struct lu_env *env,
4545                                            struct mdd_object *pobj,
4546                                            struct mdd_object *obj,
4547                                            struct mdd_object *stripe,
4548                                            struct lu_attr *attr,
4549                                            struct lu_buf *lmv_buf,
4550                                            const struct lu_buf *lmu_buf,
4551                                            struct lu_name *lname,
4552                                            struct thandle *handle)
4553 {
4554         struct mdd_thread_info *info = mdd_env_info(env);
4555         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
4556         struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
4557         struct lu_buf shrink_buf = { .lb_buf = lmu,
4558                                      .lb_len = sizeof(*lmu) };
4559         int rc;
4560
4561         LASSERT(lmv);
4562
4563         memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
4564
4565         if (le32_to_cpu(lmu->lum_stripe_count) < 2)
4566                 lmu->lum_stripe_count = 0;
4567
4568         rc = mdd_dir_declare_layout_delete(env, obj, lmv_buf, &shrink_buf,
4569                                            handle);
4570         if (rc)
4571                 return rc;
4572
4573         if (lmu->lum_stripe_count == 0) {
4574                 lmu->lum_stripe_count = cpu_to_le32(1);
4575
4576                 rc = mdo_declare_xattr_del(env, obj, XATTR_NAME_LMV, handle);
4577                 if (rc)
4578                         return rc;
4579         }
4580
4581         rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
4582                                      mdd_dir_declare_destroy_stripe);
4583         if (rc)
4584                 return rc;
4585
4586         if (le32_to_cpu(lmu->lum_stripe_count) > 1)
4587                 return mdo_declare_xattr_set(env, obj, lmv_buf,
4588                                              XATTR_NAME_LMV".set", 0, handle);
4589
4590         rc = mdo_declare_index_insert(env, stripe, mdo2fid(pobj), S_IFDIR,
4591                                       dotdot, handle);
4592         if (rc)
4593                 return rc;
4594
4595         rc = mdd_iterate_xattrs(env, obj, stripe, false, handle,
4596                                 mdo_declare_xattr_set);
4597         if (rc)
4598                 return rc;
4599
4600         rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
4601         if (rc)
4602                 return rc;
4603
4604         rc = mdo_declare_attr_set(env, stripe, attr, handle);
4605         if (rc)
4606                 return rc;
4607
4608         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
4609         if (rc)
4610                 return rc;
4611
4612         rc = mdo_declare_index_insert(env, pobj, mdo2fid(stripe), attr->la_mode,
4613                                       lname->ln_name, handle);
4614         if (rc)
4615                 return rc;
4616
4617         rc = mdo_declare_ref_del(env, obj, handle);
4618         if (rc)
4619                 return rc;
4620
4621         rc = mdo_declare_ref_del(env, obj, handle);
4622         if (rc)
4623                 return rc;
4624
4625         rc = mdo_declare_destroy(env, obj, handle);
4626         if (rc)
4627                 return rc;
4628
4629         return rc;
4630
4631 }
4632
4633 /*
4634  * after files under \a obj were migrated, shrink old stripes from \a obj,
4635  * furthermore, if it becomes a 1-stripe directory, convert it to a normal one.
4636  */
4637 static int __mdd_dir_layout_shrink(const struct lu_env *env,
4638                                    struct mdd_object *pobj,
4639                                    struct mdd_object *obj,
4640                                    struct mdd_object *stripe,
4641                                    struct lu_attr *attr,
4642                                    struct lu_buf *lmv_buf,
4643                                    const struct lu_buf *lmu_buf,
4644                                    struct lu_name *lname,
4645                                    struct thandle *handle)
4646 {
4647         struct mdd_thread_info *info = mdd_env_info(env);
4648         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
4649         struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
4650         struct lu_buf shrink_buf = { .lb_buf = lmu,
4651                                      .lb_len = sizeof(*lmu) };
4652         int len = lmv_buf->lb_len;
4653         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
4654         int rc;
4655
4656         ENTRY;
4657
4658         /* lmu needs to be altered, but lmu_buf is const */
4659         memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
4660
4661         /*
4662          * if dir will be shrunk to 1-stripe, delete all stripes, because it
4663          * will be converted to normal dir.
4664          */
4665         if (le32_to_cpu(lmu->lum_stripe_count) == 1)
4666                 lmu->lum_stripe_count = 0;
4667
4668         /* delete stripes after lmu_stripe_count */
4669         rc = mdd_dir_layout_delete(env, obj, lmv_buf, &shrink_buf, handle);
4670         if (rc)
4671                 RETURN(rc);
4672
4673         if (lmu->lum_stripe_count == 0) {
4674                 lmu->lum_stripe_count = cpu_to_le32(1);
4675
4676                 /* delete LMV to avoid deleting stripes again upon destroy */
4677                 mdd_write_lock(env, obj, MOR_SRC_CHILD);
4678                 rc = mdo_xattr_del(env, obj, XATTR_NAME_LMV, handle);
4679                 mdd_write_unlock(env, obj);
4680                 if (rc)
4681                         RETURN(rc);
4682         }
4683
4684         /* destroy stripes after lmu_stripe_count */
4685         mdd_write_lock(env, obj, MOR_SRC_PARENT);
4686         rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
4687                                      mdd_dir_destroy_stripe);
4688         mdd_write_unlock(env, obj);
4689
4690         if (le32_to_cpu(lmu->lum_stripe_count) > 1) {
4691                 /* update dir LMV, that's all if it's still striped. */
4692                 lmv->lmv_stripe_count = lmu->lum_stripe_count;
4693                 lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
4694                 lmv->lmv_migrate_offset = 0;
4695                 lmv->lmv_migrate_hash = 0;
4696                 lmv->lmv_layout_version = cpu_to_le32(++version);
4697
4698                 lmv_buf->lb_len = sizeof(*lmv);
4699                 rc = mdo_xattr_set(env, obj, lmv_buf, XATTR_NAME_LMV".set", 0,
4700                                    handle);
4701                 lmv_buf->lb_len = len;
4702                 RETURN(rc);
4703         }
4704
4705         /* replace directory with its remaining stripe */
4706         LASSERT(pobj);
4707         LASSERT(stripe);
4708
4709         mdd_write_lock(env, pobj, MOR_SRC_PARENT);
4710         mdd_write_lock(env, obj, MOR_SRC_CHILD);
4711
4712         /* insert dotdot to stripe which points to parent */
4713         rc = __mdd_index_insert_only(env, stripe, mdo2fid(pobj), S_IFDIR,
4714                                      dotdot, handle);
4715         if (rc)
4716                 GOTO(out, rc);
4717
4718         /* copy xattrs including linkea */
4719         rc = mdd_iterate_xattrs(env, obj, stripe, false, handle, mdo_xattr_set);
4720         if (rc)
4721                 GOTO(out, rc);
4722
4723         /* delete LMV */
4724         rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
4725         if (rc)
4726                 GOTO(out, rc);
4727
4728         /* don't set nlink from parent */
4729         attr->la_valid &= ~LA_NLINK;
4730
4731         rc = mdo_attr_set(env, stripe, attr, handle);
4732         if (rc)
4733                 GOTO(out, rc);
4734
4735         /* delete dir name from parent */
4736         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
4737         if (rc)
4738                 GOTO(out, rc);
4739
4740         /* insert stripe to parent with dir name */
4741         rc = __mdd_index_insert_only(env, pobj, mdo2fid(stripe), attr->la_mode,
4742                                      lname->ln_name, handle);
4743         if (rc)
4744                 GOTO(out, rc);
4745
4746         /* destroy dir obj */
4747         rc = mdo_ref_del(env, obj, handle);
4748         if (rc)
4749                 GOTO(out, rc);
4750
4751         rc = mdo_ref_del(env, obj, handle);
4752         if (rc)
4753                 GOTO(out, rc);
4754
4755         rc = mdo_destroy(env, obj, handle);
4756         if (rc)
4757                 GOTO(out, rc);
4758
4759         EXIT;
4760 out:
4761         mdd_write_unlock(env, obj);
4762         mdd_write_unlock(env, pobj);
4763
4764         return rc;
4765 }
4766
4767 /*
4768  * shrink directory stripes to lum_stripe_count specified by lum_mds_md.
4769  */
4770 int mdd_dir_layout_shrink(const struct lu_env *env,
4771                           struct md_object *md_obj,
4772                           const struct lu_buf *lmu_buf)
4773 {
4774         struct mdd_device *mdd = mdo2mdd(md_obj);
4775         struct mdd_thread_info *info = mdd_env_info(env);
4776         struct mdd_object *obj = md2mdd_obj(md_obj);
4777         struct mdd_object *pobj = NULL;
4778         struct mdd_object *stripe = NULL;
4779         struct lu_attr *attr = &info->mti_pattr;
4780         struct lu_fid *fid = &info->mti_fid2;
4781         struct lu_name lname = { NULL };
4782         struct lu_buf lmv_buf = { NULL };
4783         struct lmv_mds_md_v1 *lmv;
4784         struct lmv_user_md *lmu;
4785         struct thandle *handle;
4786         int rc;
4787
4788         ENTRY;
4789
4790         rc = mdd_la_get(env, obj, attr);
4791         if (rc)
4792                 RETURN(rc);
4793
4794         if (!S_ISDIR(attr->la_mode))
4795                 RETURN(-ENOTDIR);
4796
4797         rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
4798         if (rc < 0)
4799                 RETURN(rc);
4800
4801         lmv = lmv_buf.lb_buf;
4802         lmu = lmu_buf->lb_buf;
4803
4804         /* this was checked in MDT */
4805         LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
4806                 le32_to_cpu(lmv->lmv_stripe_count));
4807
4808         rc = mdd_dir_iterate_stripes(env, obj, &lmv_buf, lmu_buf, NULL,
4809                                      mdd_shrink_stripe_is_empty);
4810         if (rc < 0)
4811                 GOTO(out, rc);
4812         else if (rc != 0)
4813                 GOTO(out, rc = -ENOTEMPTY);
4814
4815         /*
4816          * if obj stripe count will be shrunk to 1, we need to convert it to a
4817          * normal dir, which will change its fid and update parent namespace,
4818          * get obj name and parent fid from linkea.
4819          */
4820         if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
4821                 struct linkea_data *ldata = &info->mti_link_data;
4822                 char *filename = info->mti_name;
4823
4824                 rc = mdd_links_read(env, obj, ldata);
4825                 if (rc)
4826                         GOTO(out, rc);
4827
4828                 if (ldata->ld_leh->leh_reccount > 1)
4829                         GOTO(out, rc = -EINVAL);
4830
4831                 linkea_first_entry(ldata);
4832                 if (!ldata->ld_lee)
4833                         GOTO(out, rc = -ENODATA);
4834
4835                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
4836                                     fid);
4837
4838                 /* Note: lname might miss \0 at the end */
4839                 snprintf(filename, sizeof(info->mti_name), "%.*s",
4840                          lname.ln_namelen, lname.ln_name);
4841                 lname.ln_name = filename;
4842
4843                 pobj = mdd_object_find(env, mdd, fid);
4844                 if (IS_ERR(pobj)) {
4845                         rc = PTR_ERR(pobj);
4846                         pobj = NULL;
4847                         GOTO(out, rc);
4848                 }
4849
4850                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
4851
4852                 stripe = mdd_object_find(env, mdd, fid);
4853                 if (IS_ERR(stripe)) {
4854                         mdd_object_put(env, pobj);
4855                         pobj = NULL;
4856                         GOTO(out, rc = PTR_ERR(stripe));
4857                 }
4858         }
4859
4860         handle = mdd_trans_create(env, mdd);
4861         if (IS_ERR(handle))
4862                 GOTO(out, rc = PTR_ERR(handle));
4863
4864         rc = __mdd_dir_declare_layout_shrink(env, pobj, obj, stripe, attr,
4865                                              &lmv_buf, lmu_buf, &lname, handle);
4866         if (rc)
4867                 GOTO(stop_trans, rc);
4868
4869         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
4870                                          handle);
4871         if (rc)
4872                 GOTO(stop_trans, rc);
4873
4874         rc = mdd_trans_start(env, mdd, handle);
4875         if (rc)
4876                 GOTO(stop_trans, rc);
4877
4878         rc = __mdd_dir_layout_shrink(env, pobj, obj, stripe, attr, &lmv_buf,
4879                                      lmu_buf, &lname, handle);
4880         if (rc)
4881                 GOTO(stop_trans, rc);
4882
4883         rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
4884                                             XATTR_NAME_LMV, handle);
4885         GOTO(stop_trans, rc);
4886
4887 stop_trans:
4888         rc = mdd_trans_stop(env, mdd, rc, handle);
4889 out:
4890         if (pobj) {
4891                 mdd_object_put(env, stripe);
4892                 mdd_object_put(env, pobj);
4893         }
4894         lu_buf_free(&lmv_buf);
4895         return rc;
4896 }
4897
4898 const struct md_dir_operations mdd_dir_ops = {
4899         .mdo_is_subdir     = mdd_is_subdir,
4900         .mdo_lookup        = mdd_lookup,
4901         .mdo_create        = mdd_create,
4902         .mdo_rename        = mdd_rename,
4903         .mdo_link          = mdd_link,
4904         .mdo_unlink        = mdd_unlink,
4905         .mdo_create_data   = mdd_create_data,
4906         .mdo_migrate       = mdd_migrate,
4907 };