Whamcloud - gitweb
aa87adc267160ddba34d672fc07e90ad2c79addb
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46
47 #include "mdd_internal.h"
48
49 static const char dot[] = ".";
50 static const char dotdot[] = "..";
51
52 static struct lu_name lname_dotdot = {
53         .ln_name        = (char *) dotdot,
54         .ln_namelen     = sizeof(dotdot) - 1,
55 };
56
57 static inline int
58 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
59 {
60         if (!lu_name_is_valid(ln))
61                 return -EINVAL;
62         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
63                 return -ENAMETOOLONG;
64         else
65                 return 0;
66 }
67
68 /* Get FID from name and parent */
69 static int
70 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
71              const struct lu_attr *pattr, const struct lu_name *lname,
72              struct lu_fid* fid, int mask)
73 {
74         const char *name                = lname->ln_name;
75         const struct dt_key *key        = (const struct dt_key *)name;
76         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
77         struct mdd_device *m            = mdo2mdd(pobj);
78         struct dt_object *dir           = mdd_object_child(mdd_obj);
79         int rc;
80         ENTRY;
81
82         if (unlikely(mdd_is_dead_obj(mdd_obj)))
83                 RETURN(-ESTALE);
84
85         if (!mdd_object_exists(mdd_obj))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         }
92
93         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
94                                             MOR_TGT_PARENT);
95         if (rc)
96                 RETURN(rc);
97
98         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
99                    dt_try_as_dir(env, dir)))
100                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
101         else
102                 rc = -ENOTDIR;
103
104         RETURN(rc);
105 }
106
107 int mdd_lookup(const struct lu_env *env,
108                struct md_object *pobj, const struct lu_name *lname,
109                struct lu_fid *fid, struct md_op_spec *spec)
110 {
111         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
112         int rc;
113         ENTRY;
114
115         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
116         if (rc != 0)
117                 RETURN(rc);
118
119         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
120                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
121         RETURN(rc);
122 }
123
124 /** Read the link EA into a temp buffer.
125  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
126  * A pointer to the buffer is stored in \a ldata::ld_buf.
127  *
128  * \retval 0 or error
129  */
130 static int __mdd_links_read(const struct lu_env *env,
131                             struct mdd_object *mdd_obj,
132                             struct linkea_data *ldata)
133 {
134         int rc;
135
136         if (!mdd_object_exists(mdd_obj))
137                 return -ENODATA;
138
139         /* First try a small buf */
140         LASSERT(env != NULL);
141         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
142                                                PAGE_SIZE);
143         if (ldata->ld_buf->lb_buf == NULL)
144                 return -ENOMEM;
145
146         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
147         if (rc == -ERANGE) {
148                 /* Buf was too small, figure out what we need. */
149                 lu_buf_free(ldata->ld_buf);
150                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
151                                    XATTR_NAME_LINK);
152                 if (rc < 0)
153                         return rc;
154                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
155                 if (ldata->ld_buf->lb_buf == NULL)
156                         return -ENOMEM;
157                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
158                                   XATTR_NAME_LINK);
159         }
160         if (rc < 0) {
161                 lu_buf_free(ldata->ld_buf);
162                 ldata->ld_buf = NULL;
163                 return rc;
164         }
165
166         return linkea_init(ldata);
167 }
168
169 static int mdd_links_read(const struct lu_env *env,
170                           struct mdd_object *mdd_obj,
171                           struct linkea_data *ldata)
172 {
173         int rc;
174
175         rc = __mdd_links_read(env, mdd_obj, ldata);
176         if (!rc)
177                 rc = linkea_init(ldata);
178
179         return rc;
180 }
181
182 static int mdd_links_read_with_rec(const struct lu_env *env,
183                                    struct mdd_object *mdd_obj,
184                                    struct linkea_data *ldata)
185 {
186         int rc;
187
188         rc = __mdd_links_read(env, mdd_obj, ldata);
189         if (!rc)
190                 rc = linkea_init_with_rec(ldata);
191
192         return rc;
193 }
194
195 /**
196  * Get parent FID of the directory
197  *
198  * Read parent FID from linkEA, if that fails, then do lookup
199  * dotdot to get the parent FID.
200  *
201  * \param[in] env       execution environment
202  * \param[in] obj       object from which to find the parent FID
203  * \param[in] attr      attribute of the object
204  * \param[out] fid      fid to get the parent FID
205  *
206  * \retval              0 if getting the parent FID succeeds.
207  * \retval              negative errno if getting the parent FID fails.
208  **/
209 static inline int mdd_parent_fid(const struct lu_env *env,
210                                  struct mdd_object *obj,
211                                  const struct lu_attr *attr,
212                                  struct lu_fid *fid)
213 {
214         struct mdd_thread_info  *info = mdd_env_info(env);
215         struct linkea_data      ldata = { NULL };
216         struct lu_buf           *buf = &info->mti_link_buf;
217         struct lu_name          lname;
218         int                     rc = 0;
219
220         ENTRY;
221
222         LASSERT(S_ISDIR(mdd_object_type(obj)));
223
224         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
225         if (buf->lb_buf == NULL)
226                 GOTO(lookup, rc = 0);
227
228         ldata.ld_buf = buf;
229         rc = mdd_links_read_with_rec(env, obj, &ldata);
230         if (rc != 0)
231                 GOTO(lookup, rc);
232
233         LASSERT(ldata.ld_leh != NULL);
234         /* Directory should only have 1 parent */
235         if (ldata.ld_leh->leh_reccount > 1)
236                 GOTO(lookup, rc);
237
238         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
239
240         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
241         if (likely(fid_is_sane(fid)))
242                 RETURN(0);
243 lookup:
244         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
245         RETURN(rc);
246 }
247
248 /*
249  * For root fid use special function, which does not compare version component
250  * of fid. Version component is different for root fids on all MDTs.
251  */
252 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
253 {
254         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
255                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
256 }
257
258 /*
259  * return 1: if \a tfid is the fid of the ancestor of \a mo;
260  * return 0: if not;
261  * otherwise: values < 0, errors.
262  */
263 static int mdd_is_parent(const struct lu_env *env,
264                         struct mdd_device *mdd,
265                         struct mdd_object *mo,
266                         const struct lu_attr *attr,
267                         const struct lu_fid *tfid)
268 {
269         struct mdd_object *mp;
270         struct lu_fid *pfid;
271         int rc;
272
273         LASSERT(!lu_fid_eq(mdo2fid(mo), tfid));
274         pfid = &mdd_env_info(env)->mti_fid;
275
276         if (mdd_is_root(mdd, mdo2fid(mo)))
277                 return 0;
278
279         if (mdd_is_root(mdd, tfid))
280                 return 1;
281
282         rc = mdd_parent_fid(env, mo, attr, pfid);
283         if (rc)
284                 return rc;
285
286         while (1) {
287                 if (lu_fid_eq(pfid, tfid))
288                         return 1;
289
290                 if (mdd_is_root(mdd, pfid))
291                         return 0;
292
293                 mp = mdd_object_find(env, mdd, pfid);
294                 if (IS_ERR(mp))
295                         return PTR_ERR(mp);
296
297                 if (!mdd_object_exists(mp)) {
298                         mdd_object_put(env, mp);
299                         return -ENOENT;
300                 }
301
302                 rc = mdd_parent_fid(env, mp, attr, pfid);
303                 mdd_object_put(env, mp);
304                 if (rc)
305                         return rc;
306         }
307
308         return 0;
309 }
310
311 /*
312  * No permission check is needed.
313  *
314  * returns 1: if fid is ancestor of @mo;
315  * returns 0: if fid is not an ancestor of @mo;
316  * returns < 0: if error
317  */
318 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
319                   const struct lu_fid *fid)
320 {
321         struct mdd_device *mdd = mdo2mdd(mo);
322         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
323         int rc;
324         ENTRY;
325
326         if (!mdd_object_exists(md2mdd_obj(mo)))
327                 RETURN(-ENOENT);
328
329         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
330                 RETURN(-ENOTDIR);
331
332         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
333         if (rc != 0)
334                 RETURN(rc);
335
336         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
337         RETURN(rc);
338 }
339
340 /*
341  * Check that @dir contains no entries except (possibly) dot and dotdot.
342  *
343  * Returns:
344  *
345  *             0        empty
346  *      -ENOTDIR        not a directory object
347  *    -ENOTEMPTY        not empty
348  *           -ve        other error
349  *
350  */
351 static int mdd_dir_is_empty(const struct lu_env *env,
352                             struct mdd_object *dir)
353 {
354         struct dt_it     *it;
355         struct dt_object *obj;
356         const struct dt_it_ops *iops;
357         int result;
358         ENTRY;
359
360         obj = mdd_object_child(dir);
361         if (!dt_try_as_dir(env, obj))
362                 RETURN(-ENOTDIR);
363
364         iops = &obj->do_index_ops->dio_it;
365         it = iops->init(env, obj, LUDA_64BITHASH);
366         if (!IS_ERR(it)) {
367                 result = iops->get(env, it, (const struct dt_key *)"");
368                 if (result > 0) {
369                         int i;
370                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
371                                 result = iops->next(env, it);
372                         if (result == 0)
373                                 result = -ENOTEMPTY;
374                         else if (result == 1)
375                                 result = 0;
376                 } else if (result == 0)
377                         /*
378                          * Huh? Index contains no zero key?
379                          */
380                         result = -EIO;
381
382                 iops->put(env, it);
383                 iops->fini(env, it);
384         } else
385                 result = PTR_ERR(it);
386         RETURN(result);
387 }
388
389 /**
390  * Determine if the target object can be hard linked, and right now it only
391  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
392  * directory nlink count might exceed the maximum link count(see
393  * osd_object_ref_add), so it only check nlink for non-directories.
394  *
395  * \param[in] env       thread environment
396  * \param[in] obj       object being linked to
397  * \param[in] la        attributes of \a obj
398  *
399  * \retval              0 if \a obj can be hard linked
400  * \retval              negative error if \a obj is a directory or has too
401  *                      many links
402  */
403 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
404                           const struct lu_attr *la)
405 {
406         struct mdd_device *m = mdd_obj2mdd_dev(obj);
407         ENTRY;
408
409         LASSERT(la != NULL);
410
411         /* Subdir count limitation can be broken through
412          * (see osd_object_ref_add), so only check non-directory here. */
413         if (!S_ISDIR(la->la_mode) &&
414             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
415                 RETURN(-EMLINK);
416
417         RETURN(0);
418 }
419
420 /**
421  * Check whether it may create the cobj under the pobj.
422  *
423  * \param[in] env       execution environment
424  * \param[in] pobj      the parent directory
425  * \param[in] pattr     the attribute of the parent directory
426  * \param[in] cobj      the child to be created
427  * \param[in] check_perm        if check WRITE|EXEC permission for parent
428  *
429  * \retval              = 0 create the child under this dir is allowed
430  * \retval              negative errno create the child under this dir is
431  *                      not allowed
432  */
433 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
434                    const struct lu_attr *pattr, struct mdd_object *cobj,
435                    bool check_perm)
436 {
437         int rc = 0;
438         ENTRY;
439
440         if (cobj && mdd_object_exists(cobj))
441                 RETURN(-EEXIST);
442
443         if (mdd_is_dead_obj(pobj))
444                 RETURN(-ENOENT);
445
446         if (check_perm)
447                 rc = mdd_permission_internal_locked(env, pobj, pattr,
448                                                     MAY_WRITE | MAY_EXEC,
449                                                     MOR_TGT_PARENT);
450         RETURN(rc);
451 }
452
453 /*
454  * Check whether can unlink from the pobj in the case of "cobj == NULL".
455  */
456 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
457                    const struct lu_attr *pattr, const struct lu_attr *attr)
458 {
459         int rc;
460         ENTRY;
461
462         if (mdd_is_dead_obj(pobj))
463                 RETURN(-ENOENT);
464
465         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
466                 RETURN(-EPERM);
467
468         rc = mdd_permission_internal_locked(env, pobj, pattr,
469                                             MAY_WRITE | MAY_EXEC,
470                                             MOR_TGT_PARENT);
471         if (rc != 0)
472                 RETURN(rc);
473
474         if (pattr->la_flags & LUSTRE_APPEND_FL)
475                 RETURN(-EPERM);
476
477         RETURN(rc);
478 }
479
480 /*
481  * pobj == NULL is remote ops case, under such case, pobj's
482  * VTX feature has been checked already, no need check again.
483  */
484 static inline int mdd_is_sticky(const struct lu_env *env,
485                                 struct mdd_object *pobj,
486                                 const struct lu_attr *pattr,
487                                 struct mdd_object *cobj,
488                                 const struct lu_attr *cattr)
489 {
490         struct lu_ucred *uc = lu_ucred_assert(env);
491
492         if (pobj != NULL) {
493                 LASSERT(pattr != NULL);
494                 if (!(pattr->la_mode & S_ISVTX) ||
495                     (pattr->la_uid == uc->uc_fsuid))
496                         return 0;
497         }
498
499         LASSERT(cattr != NULL);
500         if (cattr->la_uid == uc->uc_fsuid)
501                 return 0;
502
503         return !md_capable(uc, CFS_CAP_FOWNER);
504 }
505
506 static int mdd_may_delete_entry(const struct lu_env *env,
507                                 struct mdd_object *pobj,
508                                 const struct lu_attr *pattr,
509                                 int check_perm)
510 {
511         ENTRY;
512
513         LASSERT(pobj != NULL);
514         if (!mdd_object_exists(pobj))
515                 RETURN(-ENOENT);
516
517         if (mdd_is_dead_obj(pobj))
518                 RETURN(-ENOENT);
519
520         if (check_perm) {
521                 int rc;
522                 rc = mdd_permission_internal_locked(env, pobj, pattr,
523                                             MAY_WRITE | MAY_EXEC,
524                                             MOR_TGT_PARENT);
525                 if (rc)
526                         RETURN(rc);
527         }
528
529         if (pattr->la_flags & LUSTRE_APPEND_FL)
530                 RETURN(-EPERM);
531
532         RETURN(0);
533 }
534
535 /*
536  * Check whether it may delete the cobj from the pobj.
537  * pobj maybe NULL
538  */
539 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
540                    const struct lu_attr *tpattr, struct mdd_object *tobj,
541                    const struct lu_attr *tattr, const struct lu_attr *cattr,
542                    int check_perm, int check_empty)
543 {
544         int rc = 0;
545         ENTRY;
546
547         if (tpobj) {
548                 LASSERT(tpattr != NULL);
549                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
550                 if (rc != 0)
551                         RETURN(rc);
552         }
553
554         if (tobj == NULL)
555                 RETURN(0);
556
557         if (!mdd_object_exists(tobj))
558                 RETURN(-ENOENT);
559
560         if (mdd_is_dead_obj(tobj))
561                 RETURN(-ESTALE);
562
563         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
564                 RETURN(-EPERM);
565
566         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
567                 RETURN(-EPERM);
568
569         /* additional check the rename case */
570         if (cattr) {
571                 if (S_ISDIR(cattr->la_mode)) {
572                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
573
574                         if (!S_ISDIR(tattr->la_mode))
575                                 RETURN(-ENOTDIR);
576
577                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
578                                 RETURN(-EBUSY);
579                 } else if (S_ISDIR(tattr->la_mode))
580                         RETURN(-EISDIR);
581         }
582
583         if (S_ISDIR(tattr->la_mode) && check_empty)
584                 rc = mdd_dir_is_empty(env, tobj);
585
586         RETURN(rc);
587 }
588
589 /**
590  * Check whether it can create the link file(linked to @src_obj) under
591  * the target directory(@tgt_obj), and src_obj has been locked by
592  * mdd_write_lock.
593  *
594  * \param[in] env       execution environment
595  * \param[in] tgt_obj   the target directory
596  * \param[in] tattr     attributes of target directory
597  * \param[in] lname     the link name
598  * \param[in] src_obj   source object for link
599  * \param[in] cattr     attributes for source object
600  *
601  * \retval              = 0 it is allowed to create the link file under tgt_obj
602  * \retval              negative error not allowed to create the link file
603  */
604 static int mdd_link_sanity_check(const struct lu_env *env,
605                                  struct mdd_object *tgt_obj,
606                                  const struct lu_attr *tattr,
607                                  const struct lu_name *lname,
608                                  struct mdd_object *src_obj,
609                                  const struct lu_attr *cattr)
610 {
611         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
612         int rc = 0;
613         ENTRY;
614
615         if (!mdd_object_exists(src_obj))
616                 RETURN(-ENOENT);
617
618         if (mdd_is_dead_obj(src_obj))
619                 RETURN(-ESTALE);
620
621         /* Local ops, no lookup before link, check filename length here. */
622         rc = mdd_name_check(m, lname);
623         if (rc < 0)
624                 RETURN(rc);
625
626         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
627                 RETURN(-EPERM);
628
629         if (S_ISDIR(mdd_object_type(src_obj)))
630                 RETURN(-EPERM);
631
632         LASSERT(src_obj != tgt_obj);
633         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
634         if (rc != 0)
635                 RETURN(rc);
636
637         rc = __mdd_may_link(env, src_obj, cattr);
638
639         RETURN(rc);
640 }
641
642 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
643                                    const char *name, struct thandle *handle)
644 {
645         struct dt_object *next = mdd_object_child(pobj);
646         int rc;
647         ENTRY;
648
649         if (dt_try_as_dir(env, next))
650                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
651         else
652                 rc = -ENOTDIR;
653
654         RETURN(rc);
655 }
656
657 static int __mdd_index_insert_only(const struct lu_env *env,
658                                    struct mdd_object *pobj,
659                                    const struct lu_fid *lf, __u32 type,
660                                    const char *name,
661                                    struct thandle *handle)
662 {
663         struct dt_object *next = mdd_object_child(pobj);
664         int               rc;
665         ENTRY;
666
667         if (dt_try_as_dir(env, next)) {
668                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
669                 struct lu_ucred         *uc  = lu_ucred_check(env);
670                 int                      ignore_quota;
671
672                 rec->rec_fid = lf;
673                 rec->rec_type = type;
674                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
675                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
676                                (const struct dt_key *)name, handle,
677                                ignore_quota);
678         } else {
679                 rc = -ENOTDIR;
680         }
681         RETURN(rc);
682 }
683
684 /* insert named index, add reference if isdir */
685 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
686                               const struct lu_fid *lf, __u32 type,
687                               const char *name, struct thandle *handle)
688 {
689         int rc;
690         ENTRY;
691
692         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
693         if (rc == 0 && S_ISDIR(type)) {
694                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
695                 mdo_ref_add(env, pobj, handle);
696                 mdd_write_unlock(env, pobj);
697         }
698
699         RETURN(rc);
700 }
701
702 /* delete named index, drop reference if isdir */
703 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
704                               const char *name, int is_dir,
705                               struct thandle *handle)
706 {
707         int               rc;
708         ENTRY;
709
710         rc = __mdd_index_delete_only(env, pobj, name, handle);
711         if (rc == 0 && is_dir) {
712                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
713                 mdo_ref_del(env, pobj, handle);
714                 mdd_write_unlock(env, pobj);
715         }
716
717         RETURN(rc);
718 }
719
720 static int mdd_llog_record_calc_size(const struct lu_env *env,
721                                      const struct lu_name *tname,
722                                      const struct lu_name *sname)
723 {
724         const struct lu_ucred   *uc = lu_ucred(env);
725         enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
726         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
727
728         if (sname != NULL)
729                 crf |= CLF_RENAME;
730
731         if (uc != NULL && uc->uc_jobid[0] != '\0')
732                 crf |= CLF_JOBID;
733
734         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
735                              changelog_rec_offset(crf, crfe) +
736                              (tname != NULL ? tname->ln_namelen : 0) +
737                              (sname != NULL ? 1 + sname->ln_namelen : 0));
738 }
739
740 int mdd_declare_changelog_store(const struct lu_env *env,
741                                 struct mdd_device *mdd,
742                                 enum changelog_rec_type type,
743                                 const struct lu_name *tname,
744                                 const struct lu_name *sname,
745                                 struct thandle *handle)
746 {
747         struct obd_device               *obd = mdd2obd_dev(mdd);
748         struct llog_ctxt                *ctxt;
749         struct llog_changelog_rec       *rec;
750         struct lu_buf                   *buf;
751         struct thandle                  *llog_th;
752         int                              reclen;
753         int                              rc;
754
755         if (!mdd_changelog_enabled(env, mdd, type))
756                 return 0;
757
758         reclen = mdd_llog_record_calc_size(env, tname, sname);
759         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
760         if (buf->lb_buf == NULL)
761                 return -ENOMEM;
762
763         rec = buf->lb_buf;
764         rec->cr_hdr.lrh_len = reclen;
765         rec->cr_hdr.lrh_type = CHANGELOG_REC;
766
767         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
768         if (ctxt == NULL)
769                 return -ENXIO;
770
771         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
772         if (IS_ERR(llog_th))
773                 GOTO(out_put, rc = PTR_ERR(llog_th));
774
775         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
776
777 out_put:
778         llog_ctxt_put(ctxt);
779
780         return rc;
781 }
782
783 /** Add a changelog entry \a rec to the changelog llog
784  * \param mdd
785  * \param rec
786  * \param handle - currently ignored since llogs start their own transaction;
787  *              this will hopefully be fixed in llog rewrite
788  * \retval 0 ok
789  */
790 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
791                         struct llog_changelog_rec *rec, struct thandle *th)
792 {
793         struct obd_device       *obd = mdd2obd_dev(mdd);
794         struct llog_ctxt        *ctxt;
795         struct thandle          *llog_th;
796         int                      rc;
797
798         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
799                                             changelog_rec_varsize(&rec->cr));
800
801         /* llog_lvfs_write_rec sets the llog tail len */
802         rec->cr_hdr.lrh_type = CHANGELOG_REC;
803         rec->cr.cr_time = cl_time();
804
805         spin_lock(&mdd->mdd_cl.mc_lock);
806         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
807          * but as long as the MDD transactions are ordered correctly for e.g.
808          * rename conflicts, I don't think this should matter. */
809         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
810         spin_unlock(&mdd->mdd_cl.mc_lock);
811
812         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
813         if (ctxt == NULL)
814                 return -ENXIO;
815
816         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
817         if (IS_ERR(llog_th))
818                 GOTO(out_put, rc = PTR_ERR(llog_th));
819
820         /* nested journal transaction */
821         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
822
823         /* time to recover some space ?? */
824         if (likely(!mdd->mdd_changelog_gc ||
825                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
826                    mdd->mdd_changelog_min_gc_interval >=
827                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
828                 /* save a spin_lock trip */
829                 goto out_put;
830         spin_lock(&mdd->mdd_cl.mc_lock);
831         if (likely(mdd->mdd_changelog_gc &&
832                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
833                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
834                         mdd->mdd_changelog_min_gc_interval)) {
835                 if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
836                              mdd->mdd_changelog_min_free_cat_entries ||
837                              OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
838                         CWARN("%s:%s low on changelog_catalog free entries, "
839                               "starting ChangeLog garbage collection thread\n",
840                               obd->obd_name,
841                               OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
842                                 " simulate" : "");
843
844                         /* indicate further kthread run will occur outside
845                          * right after current journal transaction filling has
846                          * completed
847                          */
848                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
849                 }
850                 /* next check in mdd_changelog_min_gc_interval anyway
851                  */
852                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
853         }
854         spin_unlock(&mdd->mdd_cl.mc_lock);
855 out_put:
856         llog_ctxt_put(ctxt);
857         if (rc > 0)
858                 rc = 0;
859         return rc;
860 }
861
862 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
863                                          const struct lu_fid *sfid,
864                                          const struct lu_fid *spfid,
865                                          const struct lu_name *sname)
866 {
867         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
868         size_t extsize;
869
870         LASSERT(sfid != NULL);
871         LASSERT(spfid != NULL);
872         LASSERT(sname != NULL);
873
874         extsize = sname->ln_namelen + 1;
875
876         rnm->cr_sfid = *sfid;
877         rnm->cr_spfid = *spfid;
878
879         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
880         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
881         rec->cr_namelen += extsize;
882 }
883
884 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
885 {
886         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
887
888         if (jobid == NULL || jobid[0] == '\0')
889                 return;
890
891         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
892 }
893
894 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
895 {
896         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
897
898         ef->cr_extra_flags = eflags;
899 }
900
901 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
902                                     __u64 uid, __u64 gid)
903 {
904         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
905
906         uidgid->cr_uid = uid;
907         uidgid->cr_gid = gid;
908 }
909
910 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
911                                  lnet_nid_t nid)
912 {
913         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
914
915         clnid->cr_nid = nid;
916 }
917
918 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, int flags)
919 {
920         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
921
922         omd->cr_openflags = (__u32)flags;
923 }
924
925 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
926                                    const char *xattr_name)
927 {
928         struct changelog_ext_xattr    *xattr = changelog_rec_xattr(rec);
929
930         strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
931 }
932
933 /** Store a namespace change changelog record
934  * If this fails, we must fail the whole transaction; we don't
935  * want the change to commit without the log entry.
936  * \param target - mdd_object of change
937  * \param tpfid - target parent dir/object fid
938  * \param sfid - source object fid
939  * \param spfid - source parent fid
940  * \param tname - target name string
941  * \param sname - source name string
942  * \param handle - transaction handle
943  */
944 int mdd_changelog_ns_store(const struct lu_env *env,
945                            struct mdd_device *mdd,
946                            enum changelog_rec_type type,
947                            enum changelog_rec_flags crf,
948                            struct mdd_object *target,
949                            const struct lu_fid *tpfid,
950                            const struct lu_fid *sfid,
951                            const struct lu_fid *spfid,
952                            const struct lu_name *tname,
953                            const struct lu_name *sname,
954                            struct thandle *handle)
955 {
956         const struct lu_ucred           *uc = lu_ucred(env);
957         struct llog_changelog_rec       *rec;
958         struct lu_buf                   *buf;
959         int                              reclen;
960         __u64                            xflags = CLFE_INVALID;
961         int                              rc;
962         ENTRY;
963
964         if (!mdd_changelog_enabled(env, mdd, type))
965                 RETURN(0);
966
967         LASSERT(tpfid != NULL);
968         LASSERT(tname != NULL);
969         LASSERT(handle != NULL);
970
971         reclen = mdd_llog_record_calc_size(env, tname, sname);
972         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
973         if (buf->lb_buf == NULL)
974                 RETURN(-ENOMEM);
975         rec = buf->lb_buf;
976
977         crf &= CLF_FLAGMASK;
978         crf |= CLF_EXTRA_FLAGS;
979
980         if (uc) {
981                 if (uc->uc_jobid[0] != '\0')
982                         crf |= CLF_JOBID;
983                 xflags |= CLFE_UIDGID;
984                 xflags |= CLFE_NID;
985         }
986
987         if (sname != NULL)
988                 crf |= CLF_RENAME;
989         else
990                 crf |= CLF_VERSION;
991
992         rec->cr.cr_flags = crf;
993
994         if (crf & CLF_EXTRA_FLAGS) {
995                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
996                 if (xflags & CLFE_UIDGID)
997                         mdd_changelog_rec_extra_uidgid(&rec->cr,
998                                                        uc->uc_uid, uc->uc_gid);
999                 if (xflags & CLFE_NID)
1000                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
1001         }
1002
1003         rec->cr.cr_type = (__u32)type;
1004         rec->cr.cr_pfid = *tpfid;
1005         rec->cr.cr_namelen = tname->ln_namelen;
1006         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1007
1008         if (crf & CLF_RENAME)
1009                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1010
1011         if (crf & CLF_JOBID)
1012                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1013
1014         if (likely(target != NULL)) {
1015                 rec->cr.cr_tfid = *mdo2fid(target);
1016                 target->mod_cltime = ktime_get();
1017         } else {
1018                 fid_zero(&rec->cr.cr_tfid);
1019         }
1020
1021         rc = mdd_changelog_store(env, mdd, rec, handle);
1022         if (rc < 0) {
1023                 CERROR("%s: cannot store changelog record: type = %d, "
1024                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1025                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1026                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1027                 return -EFAULT;
1028         }
1029
1030         return 0;
1031 }
1032
1033 static int __mdd_links_add(const struct lu_env *env,
1034                            struct mdd_object *mdd_obj,
1035                            struct linkea_data *ldata,
1036                            const struct lu_name *lname,
1037                            const struct lu_fid *pfid,
1038                            int first, int check)
1039 {
1040         int rc;
1041
1042         if (ldata->ld_leh == NULL) {
1043                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1044                 if (rc) {
1045                         if (rc != -ENODATA)
1046                                 return rc;
1047                         rc = linkea_data_new(ldata,
1048                                              &mdd_env_info(env)->mti_link_buf);
1049                         if (rc)
1050                                 return rc;
1051                 }
1052         }
1053
1054         if (check) {
1055                 rc = linkea_links_find(ldata, lname, pfid);
1056                 if (rc && rc != -ENOENT)
1057                         return rc;
1058                 if (rc == 0)
1059                         return -EEXIST;
1060         }
1061
1062         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1063                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1064
1065                 *tfid = *pfid;
1066                 tfid->f_ver = ~0;
1067                 linkea_add_buf(ldata, lname, tfid);
1068         }
1069
1070         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1071                 linkea_add_buf(ldata, lname, pfid);
1072
1073         return linkea_add_buf(ldata, lname, pfid);
1074 }
1075
1076 static int __mdd_links_del(const struct lu_env *env,
1077                            struct mdd_object *mdd_obj,
1078                            struct linkea_data *ldata,
1079                            const struct lu_name *lname,
1080                            const struct lu_fid *pfid)
1081 {
1082         int rc;
1083
1084         if (ldata->ld_leh == NULL) {
1085                 rc = mdd_links_read(env, mdd_obj, ldata);
1086                 if (rc)
1087                         return rc;
1088         }
1089
1090         rc = linkea_links_find(ldata, lname, pfid);
1091         if (rc)
1092                 return rc;
1093
1094         linkea_del_buf(ldata, lname);
1095         return 0;
1096 }
1097
1098 static int mdd_linkea_prepare(const struct lu_env *env,
1099                               struct mdd_object *mdd_obj,
1100                               const struct lu_fid *oldpfid,
1101                               const struct lu_name *oldlname,
1102                               const struct lu_fid *newpfid,
1103                               const struct lu_name *newlname,
1104                               int first, int check,
1105                               struct linkea_data *ldata)
1106 {
1107         int rc = 0;
1108         ENTRY;
1109
1110         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1111                 RETURN(0);
1112
1113         LASSERT(oldpfid != NULL || newpfid != NULL);
1114
1115         if (mdd_obj->mod_flags & DEAD_OBJ)
1116                 /* Unnecessary to update linkEA for dead object.  */
1117                 RETURN(0);
1118
1119         if (oldpfid != NULL) {
1120                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1121                 if (rc) {
1122                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1123                                 RETURN(rc);
1124
1125                         /* No changes done. */
1126                         rc = 0;
1127                 }
1128         }
1129
1130         /* If renaming, add the new record */
1131         if (newpfid != NULL)
1132                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1133                                      first, check);
1134
1135         RETURN(rc);
1136 }
1137
1138 int mdd_links_rename(const struct lu_env *env,
1139                      struct mdd_object *mdd_obj,
1140                      const struct lu_fid *oldpfid,
1141                      const struct lu_name *oldlname,
1142                      const struct lu_fid *newpfid,
1143                      const struct lu_name *newlname,
1144                      struct thandle *handle,
1145                      struct linkea_data *ldata,
1146                      int first, int check)
1147 {
1148         int rc = 0;
1149         ENTRY;
1150
1151         if (ldata == NULL) {
1152                 ldata = &mdd_env_info(env)->mti_link_data;
1153                 memset(ldata, 0, sizeof(*ldata));
1154                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1155                                         newpfid, newlname, first, check, ldata);
1156                 if (rc)
1157                         GOTO(out, rc);
1158         }
1159
1160         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1161                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1162
1163         GOTO(out, rc);
1164
1165 out:
1166         if (rc != 0) {
1167                 if (newlname == NULL)
1168                         CERROR("link_ea add failed %d "DFID"\n",
1169                                rc, PFID(mdd_object_fid(mdd_obj)));
1170                 else if (oldpfid == NULL)
1171                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1172                                newlname->ln_namelen, newlname->ln_name, rc,
1173                                PFID(mdd_object_fid(mdd_obj)));
1174                 else if (newpfid == NULL)
1175                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1176                                oldlname->ln_namelen, oldlname->ln_name, rc,
1177                                PFID(mdd_object_fid(mdd_obj)));
1178                 else
1179                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1180                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1181                                newlname->ln_namelen, newlname->ln_name, rc,
1182                                PFID(mdd_object_fid(mdd_obj)));
1183         }
1184
1185         if (is_vmalloc_addr(ldata->ld_buf))
1186                 /* if we vmalloced a large buffer drop it */
1187                 lu_buf_free(ldata->ld_buf);
1188
1189         return rc;
1190 }
1191
1192 static inline int mdd_links_add(const struct lu_env *env,
1193                                 struct mdd_object *mdd_obj,
1194                                 const struct lu_fid *pfid,
1195                                 const struct lu_name *lname,
1196                                 struct thandle *handle,
1197                                 struct linkea_data *ldata, int first)
1198 {
1199         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1200                                 pfid, lname, handle, ldata, first, 0);
1201 }
1202
1203 static inline int mdd_links_del(const struct lu_env *env,
1204                                 struct mdd_object *mdd_obj,
1205                                 const struct lu_fid *pfid,
1206                                 const struct lu_name *lname,
1207                                 struct thandle *handle)
1208 {
1209         return mdd_links_rename(env, mdd_obj, pfid, lname,
1210                                 NULL, NULL, handle, NULL, 0, 0);
1211 }
1212
1213 /** Read the link EA into a temp buffer.
1214  * Uses the name_buf since it is generally large.
1215  * \retval IS_ERR err
1216  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1217  */
1218 struct lu_buf *mdd_links_get(const struct lu_env *env,
1219                              struct mdd_object *mdd_obj)
1220 {
1221         struct linkea_data ldata = { NULL };
1222         int rc;
1223
1224         rc = mdd_links_read(env, mdd_obj, &ldata);
1225         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1226 }
1227
1228 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1229                     struct linkea_data *ldata, struct thandle *handle)
1230 {
1231         const struct lu_buf *buf;
1232         int                 rc;
1233
1234         if (ldata == NULL || ldata->ld_buf == NULL ||
1235             ldata->ld_leh == NULL)
1236                 return 0;
1237
1238         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1239                 return 0;
1240
1241 again:
1242         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1243                                 ldata->ld_leh->leh_len);
1244         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1245         if (unlikely(rc == -ENOSPC)) {
1246                 rc = linkea_overflow_shrink(ldata);
1247                 if (likely(rc > 0))
1248                         goto again;
1249         }
1250
1251         return rc;
1252 }
1253
1254 static int mdd_declare_links_add(const struct lu_env *env,
1255                                  struct mdd_object *mdd_obj,
1256                                  struct thandle *handle,
1257                                  struct linkea_data *ldata)
1258 {
1259         int     rc;
1260         int     ea_len;
1261         void    *linkea;
1262
1263         if (ldata != NULL && ldata->ld_leh != NULL) {
1264                 ea_len = ldata->ld_leh->leh_len;
1265                 linkea = ldata->ld_buf->lb_buf;
1266         } else {
1267                 ea_len = MAX_LINKEA_SIZE;
1268                 linkea = NULL;
1269         }
1270
1271         rc = mdo_declare_xattr_set(env, mdd_obj,
1272                                    mdd_buf_get_const(env, linkea, ea_len),
1273                                    XATTR_NAME_LINK, 0, handle);
1274
1275         return rc;
1276 }
1277
1278 static inline int mdd_declare_links_del(const struct lu_env *env,
1279                                         struct mdd_object *c,
1280                                         struct thandle *handle)
1281 {
1282         int rc = 0;
1283
1284         /* For directory, the linkEA will be removed together
1285          * with the object. */
1286         if (!S_ISDIR(mdd_object_type(c)))
1287                 rc = mdd_declare_links_add(env, c, handle, NULL);
1288
1289         return rc;
1290 }
1291
1292 static int mdd_declare_link(const struct lu_env *env,
1293                             struct mdd_device *mdd,
1294                             struct mdd_object *p,
1295                             struct mdd_object *c,
1296                             const struct lu_name *name,
1297                             struct thandle *handle,
1298                             struct lu_attr *la,
1299                             struct linkea_data *data)
1300 {
1301         struct lu_fid tfid = *mdo2fid(c);
1302         int rc;
1303
1304         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1305                 tfid.f_oid = cfs_fail_val;
1306
1307         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1308                                       name->ln_name, handle);
1309         if (rc != 0)
1310                 return rc;
1311
1312         rc = mdo_declare_ref_add(env, c, handle);
1313         if (rc != 0)
1314                 return rc;
1315
1316         la->la_valid = LA_CTIME | LA_MTIME;
1317         rc = mdo_declare_attr_set(env, p, la, handle);
1318         if (rc != 0)
1319                 return rc;
1320
1321         la->la_valid = LA_CTIME;
1322         rc = mdo_declare_attr_set(env, c, la, handle);
1323         if (rc != 0)
1324                 return rc;
1325
1326         rc = mdd_declare_links_add(env, c, handle, data);
1327         if (rc != 0)
1328                 return rc;
1329
1330         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1331                                          handle);
1332
1333         return rc;
1334 }
1335
1336 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1337                     struct md_object *src_obj, const struct lu_name *lname,
1338                     struct md_attr *ma)
1339 {
1340         const char *name = lname->ln_name;
1341         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1342         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1343         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1344         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1345         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1346         struct mdd_device *mdd = mdo2mdd(src_obj);
1347         struct thandle *handle;
1348         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1349         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1350         int rc;
1351         ENTRY;
1352
1353         rc = mdd_la_get(env, mdd_sobj, cattr);
1354         if (rc != 0)
1355                 RETURN(rc);
1356
1357         rc = mdd_la_get(env, mdd_tobj, tattr);
1358         if (rc != 0)
1359                 RETURN(rc);
1360
1361         /*
1362          * If we are using project inheritance, we only allow hard link
1363          * creation in our tree when the project IDs are the same;
1364          * otherwise the tree quota mechanism could be circumvented.
1365          */
1366         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1367             (tattr->la_projid != cattr->la_projid))
1368                 RETURN(-EXDEV);
1369
1370         handle = mdd_trans_create(env, mdd);
1371         if (IS_ERR(handle))
1372                 GOTO(out_pending, rc = PTR_ERR(handle));
1373
1374         memset(ldata, 0, sizeof(*ldata));
1375
1376         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1377         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1378
1379         /* Note: even this function will change ldata, but it comes from
1380          * thread_info, which is completely temporary and only seen in
1381          * this function, so we do not need reset ldata once it fails.*/
1382         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1383                                 lname, 0, 0, ldata);
1384         if (rc != 0)
1385                 GOTO(stop, rc);
1386
1387         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1388                               la, ldata);
1389         if (rc)
1390                 GOTO(stop, rc);
1391
1392         rc = mdd_trans_start(env, mdd, handle);
1393         if (rc)
1394                 GOTO(stop, rc);
1395
1396         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1397         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1398                                    cattr);
1399         if (rc)
1400                 GOTO(out_unlock, rc);
1401
1402         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1403                 rc = mdo_ref_add(env, mdd_sobj, handle);
1404                 if (rc != 0)
1405                         GOTO(out_unlock, rc);
1406         }
1407
1408         *tfid = *mdo2fid(mdd_sobj);
1409         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1410                 tfid->f_oid = cfs_fail_val;
1411
1412         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1413                                      mdd_object_type(mdd_sobj), name, handle);
1414         if (rc != 0) {
1415                 mdo_ref_del(env, mdd_sobj, handle);
1416                 GOTO(out_unlock, rc);
1417         }
1418
1419         la->la_valid = LA_CTIME | LA_MTIME;
1420         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1421         if (rc)
1422                 GOTO(out_unlock, rc);
1423
1424         la->la_valid = LA_CTIME;
1425         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1426         if (rc == 0)
1427                 /* Note: The failure of links_add should not cause the
1428                  * link failure, so do not check return value. */
1429                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1430                               lname, handle, ldata, 0);
1431
1432         EXIT;
1433 out_unlock:
1434         mdd_write_unlock(env, mdd_sobj);
1435         if (rc == 0)
1436                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1437                                             mdo2fid(mdd_tobj), NULL, NULL,
1438                                             lname, NULL, handle);
1439 stop:
1440         rc = mdd_trans_stop(env, mdd, rc, handle);
1441         if (is_vmalloc_addr(ldata->ld_buf))
1442                 /* if we vmalloced a large buffer drop it */
1443                 lu_buf_free(ldata->ld_buf);
1444 out_pending:
1445         return rc;
1446 }
1447
1448 static int mdd_mark_orphan_object(const struct lu_env *env,
1449                                 struct mdd_object *obj, struct thandle *handle,
1450                                 bool declare)
1451 {
1452         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1453         int rc;
1454
1455         if (!S_ISDIR(mdd_object_type(obj)))
1456                 return 0;
1457
1458         attr->la_valid = LA_FLAGS;
1459         attr->la_flags = LUSTRE_ORPHAN_FL;
1460
1461         if (declare)
1462                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1463         else
1464                 rc = mdo_attr_set(env, obj, attr, handle);
1465
1466         return rc;
1467 }
1468
1469 static int mdd_declare_finish_unlink(const struct lu_env *env,
1470                                      struct mdd_object *obj,
1471                                      struct thandle *handle)
1472 {
1473         int rc;
1474
1475         /* Sigh, we do not know if the unlink object will become orphan in
1476          * declare phase, but fortunately the flags here does not matter
1477          * in current declare implementation */
1478         rc = mdd_mark_orphan_object(env, obj, handle, true);
1479         if (rc != 0)
1480                 return rc;
1481
1482         rc = mdo_declare_destroy(env, obj, handle);
1483         if (rc != 0)
1484                 return rc;
1485
1486         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1487         if (rc != 0)
1488                 return rc;
1489
1490         return mdd_declare_links_del(env, obj, handle);
1491 }
1492
1493 /* caller should take a lock before calling */
1494 int mdd_finish_unlink(const struct lu_env *env,
1495                       struct mdd_object *obj, struct md_attr *ma,
1496                       const struct mdd_object *pobj,
1497                       const struct lu_name *lname,
1498                       struct thandle *th)
1499 {
1500         int rc = 0;
1501         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1502         ENTRY;
1503
1504         LASSERT(mdd_write_locked(env, obj) != 0);
1505
1506         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1507                 /* add new orphan and the object
1508                  * will be deleted during mdd_close() */
1509                 obj->mod_flags |= DEAD_OBJ;
1510                 if (obj->mod_count) {
1511                         rc = mdd_orphan_insert(env, obj, th);
1512                         if (rc == 0)
1513                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1514                                         "orphan list, open count = %d\n",
1515                                         PFID(mdd_object_fid(obj)),
1516                                         obj->mod_count);
1517                         else
1518                                 CERROR("Object "DFID" fail to be an orphan, "
1519                                        "open count = %d, maybe cause failed "
1520                                        "open replay\n",
1521                                         PFID(mdd_object_fid(obj)),
1522                                         obj->mod_count);
1523
1524                         /* mark object as an orphan here, not
1525                          * before mdd_orphan_insert() as racing
1526                          * mdd_la_get() may propagate ORPHAN_OBJ
1527                          * causing the asserition */
1528                         rc = mdd_mark_orphan_object(env, obj, th, false);
1529                 } else {
1530                         rc = mdo_destroy(env, obj, th);
1531                 }
1532         } else if (!is_dir) {
1533                 /* old files may not have link ea; ignore errors */
1534                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1535         }
1536
1537         RETURN(rc);
1538 }
1539
1540 /*
1541  * pobj maybe NULL
1542  * has mdd_write_lock on cobj already, but not on pobj yet
1543  */
1544 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1545                             const struct lu_attr *pattr,
1546                             struct mdd_object *cobj,
1547                             const struct lu_attr *cattr)
1548 {
1549         int rc;
1550         ENTRY;
1551
1552         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1553
1554         RETURN(rc);
1555 }
1556
1557 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1558                               struct mdd_object *p, struct mdd_object *c,
1559                               const struct lu_name *name, struct md_attr *ma,
1560                               struct thandle *handle, int no_name, int is_dir)
1561 {
1562         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1563         int              rc;
1564
1565         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1566                 if (likely(no_name == 0)) {
1567                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1568                                                       handle);
1569                         if (rc != 0)
1570                                 return rc;
1571                 }
1572
1573                 if (is_dir != 0) {
1574                         rc = mdo_declare_ref_del(env, p, handle);
1575                         if (rc != 0)
1576                                 return rc;
1577                 }
1578         }
1579
1580         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1581         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1582         la->la_valid = LA_CTIME | LA_MTIME;
1583         rc = mdo_declare_attr_set(env, p, la, handle);
1584         if (rc)
1585                 return rc;
1586
1587         if (c != NULL) {
1588                 rc = mdo_declare_ref_del(env, c, handle);
1589                 if (rc)
1590                         return rc;
1591
1592                 rc = mdo_declare_ref_del(env, c, handle);
1593                 if (rc)
1594                         return rc;
1595
1596                 la->la_valid = LA_CTIME;
1597                 rc = mdo_declare_attr_set(env, c, la, handle);
1598                 if (rc)
1599                         return rc;
1600
1601                 rc = mdd_declare_finish_unlink(env, c, handle);
1602                 if (rc)
1603                         return rc;
1604
1605                 /* FIXME: need changelog for remove entry */
1606                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
1607                                                  NULL, handle);
1608         }
1609
1610         return rc;
1611 }
1612
1613 /*
1614  * test if a file has an HSM archive
1615  * if HSM attributes are not found in ma update them from
1616  * HSM xattr
1617  */
1618 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1619                                    struct mdd_object *obj,
1620                                    struct md_attr *ma)
1621 {
1622         ENTRY;
1623
1624         if (!(ma->ma_valid & MA_HSM)) {
1625                 /* no HSM MD provided, read xattr */
1626                 struct lu_buf   *hsm_buf;
1627                 const size_t     buflen = sizeof(struct hsm_attrs);
1628                 int              rc;
1629
1630                 hsm_buf = mdd_buf_get(env, NULL, 0);
1631                 lu_buf_alloc(hsm_buf, buflen);
1632                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1633                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1634                 lu_buf_free(hsm_buf);
1635                 if (rc < 0)
1636                         RETURN(false);
1637
1638                 ma->ma_valid |= MA_HSM;
1639         }
1640         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1641                 RETURN(true);
1642         RETURN(false);
1643 }
1644
1645 /**
1646  * Delete name entry and the object.
1647  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1648  * does not exist for this object, and it could only happen during resending
1649  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1650  * is also needed in this case(needed by changelog), so we have to add another
1651  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1652  * the ENOENT failure should be able to be fixed by redo mechanism.
1653  */
1654 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1655                       struct md_object *cobj, const struct lu_name *lname,
1656                       struct md_attr *ma, int no_name)
1657 {
1658         const char *name = lname->ln_name;
1659         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1660         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1661         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1662         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1663         struct mdd_object *mdd_cobj = NULL;
1664         struct mdd_device *mdd = mdo2mdd(pobj);
1665         struct thandle    *handle;
1666         int rc, is_dir = 0, cl_flags = 0;
1667         ENTRY;
1668
1669         /* let shutdown to start */
1670         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
1671
1672         /* cobj == NULL means only delete name entry */
1673         if (likely(cobj != NULL)) {
1674                 mdd_cobj = md2mdd_obj(cobj);
1675                 if (mdd_object_exists(mdd_cobj) == 0)
1676                         RETURN(-ENOENT);
1677         }
1678
1679         rc = mdd_la_get(env, mdd_pobj, pattr);
1680         if (rc)
1681                 RETURN(rc);
1682
1683         if (likely(mdd_cobj != NULL)) {
1684                 /* fetch cattr */
1685                 rc = mdd_la_get(env, mdd_cobj, cattr);
1686                 if (rc)
1687                         RETURN(rc);
1688
1689                 is_dir = S_ISDIR(cattr->la_mode);
1690                 /* search for an existing archive.
1691                  * we should check ahead as the object
1692                  * can be destroyed in this transaction */
1693                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1694                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1695         }
1696
1697         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1698         if (rc)
1699                 RETURN(rc);
1700
1701         handle = mdd_trans_create(env, mdd);
1702         if (IS_ERR(handle))
1703                 RETURN(PTR_ERR(handle));
1704
1705         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1706                                 lname, ma, handle, no_name, is_dir);
1707         if (rc)
1708                 GOTO(stop, rc);
1709
1710         rc = mdd_trans_start(env, mdd, handle);
1711         if (rc)
1712                 GOTO(stop, rc);
1713
1714         if (likely(mdd_cobj != NULL))
1715                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1716
1717         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1718                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1719                 if (rc)
1720                         GOTO(cleanup, rc);
1721         }
1722
1723         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1724             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1725                 GOTO(cleanup, rc = 0);
1726
1727         if (likely(mdd_cobj != NULL)) {
1728                 rc = mdo_ref_del(env, mdd_cobj, handle);
1729                 if (rc != 0) {
1730                         __mdd_index_insert_only(env, mdd_pobj,
1731                                                 mdo2fid(mdd_cobj),
1732                                                 mdd_object_type(mdd_cobj),
1733                                                 name, handle);
1734                         GOTO(cleanup, rc);
1735                 }
1736
1737                 if (is_dir)
1738                         /* unlink dot */
1739                         mdo_ref_del(env, mdd_cobj, handle);
1740
1741                 /* fetch updated nlink */
1742                 rc = mdd_la_get(env, mdd_cobj, cattr);
1743                 if (rc)
1744                         GOTO(cleanup, rc);
1745         }
1746
1747         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1748         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1749
1750         la->la_valid = LA_CTIME | LA_MTIME;
1751         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1752         if (rc)
1753                 GOTO(cleanup, rc);
1754
1755         /* Enough for only unlink the entry */
1756         if (unlikely(mdd_cobj == NULL))
1757                 GOTO(stop, rc);
1758
1759         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1760                 /* update ctime of an unlinked file only if it is still
1761                  * opened or a link still exists */
1762                 la->la_valid = LA_CTIME;
1763                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1764                 if (rc)
1765                         GOTO(cleanup, rc);
1766         }
1767
1768         /* XXX: this transfer to ma will be removed with LOD/OSP */
1769         ma->ma_attr = *cattr;
1770         ma->ma_valid |= MA_INODE;
1771         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1772         if (rc != 0)
1773                 GOTO(cleanup, rc);
1774
1775         /* fetch updated nlink */
1776         rc = mdd_la_get(env, mdd_cobj, cattr);
1777         /* if object is removed then we can't get its attrs,
1778          * use last get */
1779         if (rc == -ENOENT) {
1780                 cattr->la_nlink = 0;
1781                 rc = 0;
1782         }
1783
1784         if (cattr->la_nlink == 0) {
1785                 ma->ma_attr = *cattr;
1786                 ma->ma_valid |= MA_INODE;
1787         }
1788
1789         EXIT;
1790 cleanup:
1791         if (likely(mdd_cobj != NULL))
1792                 mdd_write_unlock(env, mdd_cobj);
1793
1794         if (rc == 0) {
1795                 if (cattr->la_nlink == 0)
1796                         cl_flags |= CLF_UNLINK_LAST;
1797                 else
1798                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1799
1800                 rc = mdd_changelog_ns_store(env, mdd,
1801                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1802                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1803                         handle);
1804         }
1805
1806 stop:
1807         rc = mdd_trans_stop(env, mdd, rc, handle);
1808
1809         return rc;
1810 }
1811
1812 /*
1813  * The permission has been checked when obj created, no need check again.
1814  */
1815 static int mdd_cd_sanity_check(const struct lu_env *env,
1816                                struct mdd_object *obj)
1817 {
1818         ENTRY;
1819
1820         /* EEXIST check */
1821         if (!obj || mdd_is_dead_obj(obj))
1822                 RETURN(-ENOENT);
1823
1824         RETURN(0);
1825 }
1826
1827 static int mdd_create_data(const struct lu_env *env,
1828                            struct md_object *pobj,
1829                            struct md_object *cobj,
1830                            const struct md_op_spec *spec,
1831                            struct md_attr *ma)
1832 {
1833         struct mdd_device *mdd = mdo2mdd(cobj);
1834         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1835         struct mdd_object *son = md2mdd_obj(cobj);
1836         struct thandle    *handle;
1837         const struct lu_buf *buf;
1838         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1839         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1840         int                rc;
1841         ENTRY;
1842
1843         rc = mdd_cd_sanity_check(env, son);
1844         if (rc)
1845                 RETURN(rc);
1846
1847         if (!md_should_create(spec->sp_cr_flags))
1848                 RETURN(0);
1849
1850         /*
1851          * there are following use cases for this function:
1852          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1853          *    striping can be specified or not
1854          * 2) CMD?
1855          */
1856         rc = mdd_la_get(env, son, attr);
1857         if (rc)
1858                 RETURN(rc);
1859
1860         /* calling ->ah_make_hint() is used to transfer information from parent */
1861         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1862
1863         handle = mdd_trans_create(env, mdd);
1864         if (IS_ERR(handle))
1865                 GOTO(out_free, rc = PTR_ERR(handle));
1866
1867         /*
1868          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1869          * Should this be fixed?
1870          */
1871         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1872                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1873                spec->sp_cr_flags, spec->no_create);
1874
1875         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1876                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1877                                         spec->u.sp_ea.eadatalen);
1878         } else {
1879                 buf = &LU_BUF_NULL;
1880         }
1881
1882         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1883                                   XATTR_NAME_LOV, 0, handle);
1884         if (rc)
1885                 GOTO(stop, rc);
1886
1887         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
1888                                          handle);
1889         if (rc)
1890                 GOTO(stop, rc);
1891
1892         rc = mdd_trans_start(env, mdd, handle);
1893         if (rc)
1894                 GOTO(stop, rc);
1895
1896         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1897                           0, handle);
1898
1899         if (rc)
1900                 GOTO(stop, rc);
1901
1902         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1903
1904 stop:
1905         rc = mdd_trans_stop(env, mdd, rc, handle);
1906
1907 out_free:
1908         RETURN(rc);
1909 }
1910
1911 static int mdd_declare_object_initialize(const struct lu_env *env,
1912                                          struct mdd_object *parent,
1913                                          struct mdd_object *child,
1914                                          const struct lu_attr *attr,
1915                                          struct thandle *handle)
1916 {
1917         int rc;
1918         ENTRY;
1919
1920         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1921         if (!S_ISDIR(attr->la_mode))
1922                 RETURN(0);
1923
1924         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1925                                       dot, handle);
1926         if (rc != 0)
1927                 RETURN(rc);
1928
1929         rc = mdo_declare_ref_add(env, child, handle);
1930         if (rc != 0)
1931                 RETURN(rc);
1932
1933         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1934                                       dotdot, handle);
1935
1936         RETURN(rc);
1937 }
1938
1939 static int mdd_object_initialize(const struct lu_env *env,
1940                                  const struct lu_fid *pfid,
1941                                  struct mdd_object *child,
1942                                  struct lu_attr *attr,
1943                                  struct thandle *handle)
1944 {
1945         int rc = 0;
1946         ENTRY;
1947
1948         if (S_ISDIR(attr->la_mode)) {
1949                 /* Add "." and ".." for newly created dir */
1950                 mdo_ref_add(env, child, handle);
1951                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1952                                              S_IFDIR, dot, handle);
1953                 if (rc == 0)
1954                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1955                                                      dotdot, handle);
1956                 if (rc != 0)
1957                         mdo_ref_del(env, child, handle);
1958         }
1959
1960         RETURN(rc);
1961 }
1962
1963 /**
1964  * This function checks whether it can create a file/dir under the
1965  * directory(@pobj). The directory(@pobj) is not being locked by
1966  * mdd lock.
1967  *
1968  * \param[in] env       execution environment
1969  * \param[in] pobj      the directory to create files
1970  * \param[in] pattr     the attributes of the directory
1971  * \param[in] lname     the name of the created file/dir
1972  * \param[in] cattr     the attributes of the file/dir
1973  * \param[in] spec      create specification
1974  *
1975  * \retval              = 0 it is allowed to create file/dir under
1976  *                      the directory
1977  * \retval              negative error not allowed to create file/dir
1978  *                      under the directory
1979  */
1980 static int mdd_create_sanity_check(const struct lu_env *env,
1981                                    struct md_object *pobj,
1982                                    const struct lu_attr *pattr,
1983                                    const struct lu_name *lname,
1984                                    struct lu_attr *cattr,
1985                                    struct md_op_spec *spec)
1986 {
1987         struct mdd_thread_info *info = mdd_env_info(env);
1988         struct lu_fid     *fid       = &info->mti_fid;
1989         struct mdd_object *obj       = md2mdd_obj(pobj);
1990         struct mdd_device *m         = mdo2mdd(pobj);
1991         bool            check_perm = true;
1992         int rc;
1993         ENTRY;
1994
1995         /* EEXIST check */
1996         if (mdd_is_dead_obj(obj))
1997                 RETURN(-ENOENT);
1998
1999         /*
2000          * In some cases this lookup is not needed - we know before if name
2001          * exists or not because MDT performs lookup for it.
2002          * name length check is done in lookup.
2003          */
2004         if (spec->sp_cr_lookup) {
2005                 /*
2006                  * Check if the name already exist, though it will be checked in
2007                  * _index_insert also, for avoiding rolling back if exists
2008                  * _index_insert.
2009                  */
2010                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2011                                   MAY_WRITE | MAY_EXEC);
2012                 if (rc != -ENOENT)
2013                         RETURN(rc ? : -EEXIST);
2014
2015                 /* Permission is already being checked in mdd_lookup */
2016                 check_perm = false;
2017         }
2018
2019         if (S_ISDIR(cattr->la_mode) &&
2020             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2021             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2022                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2023
2024                 if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
2025                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
2026                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2027                         rc = -EINVAL;
2028                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2029                                "stripe_offset = %d, stripe_count = %u: "
2030                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2031                                 le32_to_cpu(lum->lum_magic),
2032                                (int)le32_to_cpu(lum->lum_stripe_offset),
2033                                le32_to_cpu(lum->lum_stripe_count), rc);
2034                         return rc;
2035                 }
2036         }
2037
2038         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2039         if (rc != 0)
2040                 RETURN(rc);
2041
2042         /* sgid check */
2043         if (pattr->la_mode & S_ISGID) {
2044                 cattr->la_gid = pattr->la_gid;
2045                 if (S_ISDIR(cattr->la_mode)) {
2046                         cattr->la_mode |= S_ISGID;
2047                         cattr->la_valid |= LA_MODE;
2048                 }
2049         }
2050
2051         /* Inherit project ID from parent directory */
2052         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2053                 cattr->la_projid = pattr->la_projid;
2054                 if (S_ISDIR(cattr->la_mode)) {
2055                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2056                         cattr->la_valid |= LA_FLAGS;
2057                 }
2058                 cattr->la_valid |= LA_PROJID;
2059         }
2060
2061         rc = mdd_name_check(m, lname);
2062         if (rc < 0)
2063                 RETURN(rc);
2064
2065         switch (cattr->la_mode & S_IFMT) {
2066         case S_IFLNK: {
2067                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2068
2069                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2070                         RETURN(-ENAMETOOLONG);
2071                 else
2072                         RETURN(0);
2073         }
2074         case S_IFDIR:
2075         case S_IFREG:
2076         case S_IFCHR:
2077         case S_IFBLK:
2078         case S_IFIFO:
2079         case S_IFSOCK:
2080                 rc = 0;
2081                 break;
2082         default:
2083                 rc = -EINVAL;
2084                 break;
2085         }
2086         RETURN(rc);
2087 }
2088
2089 static int mdd_declare_create_object(const struct lu_env *env,
2090                                      struct mdd_device *mdd,
2091                                      struct mdd_object *p, struct mdd_object *c,
2092                                      struct lu_attr *attr,
2093                                      struct thandle *handle,
2094                                      const struct md_op_spec *spec,
2095                                      struct lu_buf *def_acl_buf,
2096                                      struct lu_buf *acl_buf,
2097                                      struct dt_allocation_hint *hint)
2098 {
2099         const struct lu_buf *buf;
2100         int rc;
2101
2102         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2103                                                 hint);
2104         if (rc)
2105                 GOTO(out, rc);
2106
2107 #ifdef CONFIG_FS_POSIX_ACL
2108         if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2109                 /* if dir, then can inherit default ACl */
2110                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2111                                            XATTR_NAME_ACL_DEFAULT,
2112                                            0, handle);
2113                 if (rc)
2114                         GOTO(out, rc);
2115         }
2116
2117         if (acl_buf && acl_buf->lb_len > 0) {
2118                 rc = mdo_declare_attr_set(env, c, attr, handle);
2119                 if (rc)
2120                         GOTO(out, rc);
2121
2122                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2123                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2124                 if (rc)
2125                         GOTO(out, rc);
2126         }
2127 #endif
2128         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2129         if (rc)
2130                 GOTO(out, rc);
2131
2132         /* replay case, create LOV EA from client data */
2133         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2134             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2135                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2136                                         spec->u.sp_ea.eadatalen);
2137                 rc = mdo_declare_xattr_set(env, c, buf,
2138                                            S_ISDIR(attr->la_mode) ?
2139                                                 XATTR_NAME_LMV : XATTR_NAME_LOV,
2140                                            0, handle);
2141                 if (rc)
2142                         GOTO(out, rc);
2143         }
2144
2145         if (S_ISLNK(attr->la_mode)) {
2146                 const char *target_name = spec->u.sp_symname;
2147                 int sym_len = strlen(target_name);
2148                 const struct lu_buf *buf;
2149
2150                 buf = mdd_buf_get_const(env, target_name, sym_len);
2151                 rc = dt_declare_record_write(env, mdd_object_child(c),
2152                                              buf, 0, handle);
2153                 if (rc)
2154                         GOTO(out, rc);
2155         }
2156
2157         if (spec->sp_cr_file_secctx_name != NULL) {
2158                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2159                                         spec->sp_cr_file_secctx_size);
2160                 rc = mdo_declare_xattr_set(env, c, buf,
2161                                            spec->sp_cr_file_secctx_name, 0,
2162                                            handle);
2163                 if (rc < 0)
2164                         GOTO(out, rc);
2165         }
2166 out:
2167         return rc;
2168 }
2169
2170 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2171                               struct mdd_object *p, struct mdd_object *c,
2172                               const struct lu_name *name,
2173                               struct lu_attr *attr,
2174                               struct thandle *handle,
2175                               const struct md_op_spec *spec,
2176                               struct linkea_data *ldata,
2177                               struct lu_buf *def_acl_buf,
2178                               struct lu_buf *acl_buf,
2179                               struct dt_allocation_hint *hint)
2180 {
2181         int rc;
2182
2183         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2184                                        def_acl_buf, acl_buf, hint);
2185         if (rc)
2186                 GOTO(out, rc);
2187
2188         if (S_ISDIR(attr->la_mode)) {
2189                 rc = mdo_declare_ref_add(env, p, handle);
2190                 if (rc)
2191                         GOTO(out, rc);
2192         }
2193
2194         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2195                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2196                 if (rc)
2197                         GOTO(out, rc);
2198         } else {
2199                 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
2200                 enum changelog_rec_type type;
2201
2202                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2203                                               name->ln_name, handle);
2204                 if (rc != 0)
2205                         return rc;
2206
2207                 rc = mdd_declare_links_add(env, c, handle, ldata);
2208                 if (rc)
2209                         return rc;
2210
2211                 *la = *attr;
2212                 la->la_valid = LA_CTIME | LA_MTIME;
2213                 rc = mdo_declare_attr_set(env, p, la, handle);
2214                 if (rc)
2215                         return rc;
2216
2217                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2218                        S_ISREG(attr->la_mode) ? CL_CREATE :
2219                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2220
2221                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2222                                                  handle);
2223                 if (rc)
2224                         return rc;
2225         }
2226 out:
2227         return rc;
2228 }
2229
2230 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2231                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2232                         struct lu_buf *acl_buf)
2233 {
2234         int     rc;
2235         ENTRY;
2236
2237         if (S_ISLNK(la->la_mode)) {
2238                 acl_buf->lb_len = 0;
2239                 def_acl_buf->lb_len = 0;
2240                 RETURN(0);
2241         }
2242
2243         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2244         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2245                            XATTR_NAME_ACL_DEFAULT);
2246         mdd_read_unlock(env, pobj);
2247         if (rc > 0) {
2248                 /* If there are default ACL, fix mode/ACL by default ACL */
2249                 def_acl_buf->lb_len = rc;
2250                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2251                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2252                 acl_buf->lb_len = rc;
2253                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2254                 if (rc < 0)
2255                         RETURN(rc);
2256         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2257                 /* If there are no default ACL, fix mode by mask */
2258                 struct lu_ucred *uc = lu_ucred(env);
2259
2260                 /* The create triggered by MDT internal events, such as
2261                  * LFSCK reset, will not contain valid "uc". */
2262                 if (unlikely(uc != NULL))
2263                         la->la_mode &= ~uc->uc_umask;
2264                 rc = 0;
2265                 acl_buf->lb_len = 0;
2266                 def_acl_buf->lb_len = 0;
2267         }
2268
2269         RETURN(rc);
2270 }
2271
2272 /**
2273  * Create a metadata object and initialize it, set acl, xattr.
2274  **/
2275 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2276                              struct mdd_object *son, struct lu_attr *attr,
2277                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2278                              struct lu_buf *def_acl_buf,
2279                              struct dt_allocation_hint *hint,
2280                              struct thandle *handle)
2281 {
2282         const struct lu_buf *buf;
2283         int rc;
2284
2285         mdd_write_lock(env, son, MOR_TGT_CHILD);
2286         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2287                                         hint);
2288         if (rc)
2289                 GOTO(unlock, rc);
2290
2291         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2292          * created in declare phase, they also needs to be added to master
2293          * object as sub-directory entry. So it has to initialize the master
2294          * object, then set dir striped EA.(in mdo_xattr_set) */
2295         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle);
2296         if (rc != 0)
2297                 GOTO(err_destroy, rc);
2298
2299         /*
2300          * in case of replay we just set LOVEA provided by the client
2301          * XXX: I think it would be interesting to try "old" way where
2302          *      MDT calls this xattr_set(LOV) in a different transaction.
2303          *      probably this way we code can be made better.
2304          */
2305
2306         /* During creation, there are only a few cases we need do xattr_set to
2307          * create stripes.
2308          * 1. regular file: see comments above.
2309          * 2. dir: inherit default striping or pool settings from parent.
2310          * 3. create striped directory with provided stripeEA.
2311          * 4. create striped directory because inherit default layout from the
2312          * parent.
2313          */
2314         if (spec->no_create ||
2315             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2316             S_ISDIR(attr->la_mode)) {
2317                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2318                                         spec->u.sp_ea.eadatalen);
2319                 rc = mdo_xattr_set(env, son, buf,
2320                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2321                                                             XATTR_NAME_LOV,
2322                                    0, handle);
2323                 if (rc != 0)
2324                         GOTO(err_destroy, rc);
2325         }
2326
2327 #ifdef CONFIG_FS_POSIX_ACL
2328         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2329             S_ISDIR(attr->la_mode)) {
2330                 /* set default acl */
2331                 rc = mdo_xattr_set(env, son, def_acl_buf,
2332                                    XATTR_NAME_ACL_DEFAULT, 0,
2333                                    handle);
2334                 if (rc)
2335                         GOTO(err_destroy, rc);
2336         }
2337         /* set its own acl */
2338         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2339                 rc = mdo_xattr_set(env, son, acl_buf,
2340                                    XATTR_NAME_ACL_ACCESS,
2341                                    0, handle);
2342                 if (rc)
2343                         GOTO(err_destroy, rc);
2344         }
2345 #endif
2346
2347         if (S_ISLNK(attr->la_mode)) {
2348                 struct lu_ucred  *uc = lu_ucred_assert(env);
2349                 struct dt_object *dt = mdd_object_child(son);
2350                 const char *target_name = spec->u.sp_symname;
2351                 int sym_len = strlen(target_name);
2352                 loff_t pos = 0;
2353
2354                 buf = mdd_buf_get_const(env, target_name, sym_len);
2355                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2356                                                 uc->uc_cap &
2357                                                 CFS_CAP_SYS_RESOURCE_MASK);
2358
2359                 if (rc == sym_len)
2360                         rc = 0;
2361                 else
2362                         GOTO(err_initlized, rc = -EFAULT);
2363         }
2364
2365         if (spec->sp_cr_file_secctx_name != NULL) {
2366                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2367                                         spec->sp_cr_file_secctx_size);
2368                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2369                                    0, handle);
2370                 if (rc < 0)
2371                         GOTO(err_initlized, rc);
2372         }
2373
2374 err_initlized:
2375         if (unlikely(rc != 0)) {
2376                 int rc2;
2377                 if (S_ISDIR(attr->la_mode)) {
2378                         /* Drop the reference, no need to delete "."/"..",
2379                          * because the object to be destroied directly. */
2380                         rc2 = mdo_ref_del(env, son, handle);
2381                         if (rc2 != 0)
2382                                 GOTO(unlock, rc);
2383                 }
2384                 rc2 = mdo_ref_del(env, son, handle);
2385                 if (rc2 != 0)
2386                         GOTO(unlock, rc);
2387 err_destroy:
2388                 mdo_destroy(env, son, handle);
2389         }
2390 unlock:
2391         mdd_write_unlock(env, son);
2392         RETURN(rc);
2393 }
2394
2395 static int mdd_index_delete(const struct lu_env *env,
2396                             struct mdd_object *mdd_pobj,
2397                             struct lu_attr *cattr,
2398                             const struct lu_name *lname)
2399 {
2400         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2401         struct thandle *handle;
2402         int rc;
2403         ENTRY;
2404
2405         handle = mdd_trans_create(env, mdd);
2406         if (IS_ERR(handle))
2407                 RETURN(PTR_ERR(handle));
2408
2409         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2410                                       handle);
2411         if (rc != 0)
2412                 GOTO(stop, rc);
2413
2414         if (S_ISDIR(cattr->la_mode)) {
2415                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2416                 if (rc != 0)
2417                         GOTO(stop, rc);
2418         }
2419
2420         /* Since this will only be used in the error handler path,
2421          * Let's set the thandle to be local and not mess the transno */
2422         handle->th_local = 1;
2423         rc = mdd_trans_start(env, mdd, handle);
2424         if (rc)
2425                 GOTO(stop, rc);
2426
2427         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2428                                 S_ISDIR(cattr->la_mode), handle);
2429         if (rc)
2430                 GOTO(stop, rc);
2431 stop:
2432         rc = mdd_trans_stop(env, mdd, rc, handle);
2433
2434         RETURN(rc);
2435 }
2436
2437 /**
2438  * Create object and insert it into namespace.
2439  *
2440  * Two operations have to be performed:
2441  *
2442  *  - an allocation of a new object (->do_create()), and
2443  *  - an insertion into a parent index (->dio_insert()).
2444  *
2445  * Due to locking, operation order is not important, when both are
2446  * successful, *but* error handling cases are quite different:
2447  *
2448  *  - if insertion is done first, and following object creation fails,
2449  *  insertion has to be rolled back, but this operation might fail
2450  *  also leaving us with dangling index entry.
2451  *
2452  *  - if creation is done first, is has to be undone if insertion fails,
2453  *  leaving us with leaked space, which is not good but not fatal.
2454  *
2455  * It seems that creation-first is simplest solution, but it is sub-optimal
2456  * in the frequent
2457  *
2458  * $ mkdir foo
2459  * $ mkdir foo
2460  *
2461  * case, because second mkdir is bound to create object, only to
2462  * destroy it immediately.
2463  *
2464  * To avoid this follow local file systems that do double lookup:
2465  *
2466  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2467  * 1. create            (mdd_create_object_internal())
2468  * 2. insert            (__mdd_index_insert(), lookup again)
2469  *
2470  * \param[in] pobj      parent object
2471  * \param[in] lname     name of child being created
2472  * \param[in,out] child child object being created
2473  * \param[in] spec      additional create parameters
2474  * \param[in] ma        attributes for new child object
2475  *
2476  * \retval              0 on success
2477  * \retval              negative errno on failure
2478  */
2479 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2480                       const struct lu_name *lname, struct md_object *child,
2481                       struct md_op_spec *spec, struct md_attr *ma)
2482 {
2483         struct mdd_thread_info  *info = mdd_env_info(env);
2484         struct lu_attr          *la = &info->mti_la_for_fix;
2485         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2486         struct mdd_object       *son = md2mdd_obj(child);
2487         struct mdd_device       *mdd = mdo2mdd(pobj);
2488         struct lu_attr          *attr = &ma->ma_attr;
2489         struct thandle          *handle;
2490         struct lu_attr          *pattr = &info->mti_pattr;
2491         struct lu_buf           acl_buf;
2492         struct lu_buf           def_acl_buf;
2493         struct linkea_data      *ldata = &info->mti_link_data;
2494         const char              *name = lname->ln_name;
2495         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2496         int                      rc;
2497         int                      rc2;
2498         ENTRY;
2499
2500         rc = mdd_la_get(env, mdd_pobj, pattr);
2501         if (rc != 0)
2502                 RETURN(rc);
2503
2504         /* Sanity checks before big job. */
2505         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2506         if (rc)
2507                 RETURN(rc);
2508
2509         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2510                 GOTO(out_free, rc = -EINPROGRESS);
2511
2512         handle = mdd_trans_create(env, mdd);
2513         if (IS_ERR(handle))
2514                 GOTO(out_free, rc = PTR_ERR(handle));
2515
2516         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2517                                mdd->mdd_dt_conf.ddp_max_ea_size);
2518         acl_buf = info->mti_xattr_buf;
2519         def_acl_buf.lb_buf = info->mti_key;
2520         def_acl_buf.lb_len = sizeof(info->mti_key);
2521         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2522         if (rc < 0)
2523                 GOTO(out_stop, rc);
2524
2525         if (S_ISDIR(attr->la_mode)) {
2526                 struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
2527
2528                 /*
2529                  * migrate may create 1-stripe directory, so lod_ah_init()
2530                  * doesn't adjust stripe count from lmu.
2531                  */
2532                 if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
2533                         info->mti_lmu = *lmu;
2534                         info->mti_lmu.lum_stripe_count = 0;
2535                         spec->u.sp_ea.eadata = &info->mti_lmu;
2536                 }
2537         }
2538
2539         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2540
2541         memset(ldata, 0, sizeof(*ldata));
2542         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2543                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2544
2545                 tfid.f_oid--;
2546                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2547                                         &tfid, lname, 1, 0, ldata);
2548         } else {
2549                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2550                                         mdd_object_fid(mdd_pobj),
2551                                         lname, 1, 0, ldata);
2552         }
2553
2554         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2555                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2556                                 hint);
2557         if (rc)
2558                 GOTO(out_stop, rc);
2559
2560         rc = mdd_trans_start(env, mdd, handle);
2561         if (rc)
2562                 GOTO(out_stop, rc);
2563
2564         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2565                                &def_acl_buf, hint, handle);
2566         if (rc != 0)
2567                 GOTO(out_stop, rc);
2568
2569         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2570                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2571                 son->mod_flags |= VOLATILE_OBJ;
2572                 rc = mdd_orphan_insert(env, son, handle);
2573                 GOTO(out_volatile, rc);
2574         } else {
2575                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2576                                       attr->la_mode, name, handle);
2577                 if (rc != 0)
2578                         GOTO(err_created, rc);
2579
2580                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2581                               ldata, 1);
2582
2583                 /* update parent directory mtime/ctime */
2584                 *la = *attr;
2585                 la->la_valid = LA_CTIME | LA_MTIME;
2586                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2587                 if (rc)
2588                         GOTO(err_insert, rc);
2589         }
2590
2591         EXIT;
2592 err_insert:
2593         if (rc != 0) {
2594                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2595                         rc2 = mdd_orphan_delete(env, son, handle);
2596                 else
2597                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2598                                                  S_ISDIR(attr->la_mode),
2599                                                  handle);
2600                 if (rc2 != 0)
2601                         goto out_stop;
2602
2603 err_created:
2604                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2605                 if (S_ISDIR(attr->la_mode)) {
2606                         /* Drop the reference, no need to delete "."/"..",
2607                          * because the object is to be destroyed directly. */
2608                         rc2 = mdo_ref_del(env, son, handle);
2609                         if (rc2 != 0) {
2610                                 mdd_write_unlock(env, son);
2611                                 goto out_stop;
2612                         }
2613                 }
2614 out_volatile:
2615                 /* For volatile files drop one link immediately, since there is
2616                  * no filename in the namespace, and save any error returned. */
2617                 rc2 = mdo_ref_del(env, son, handle);
2618                 if (rc2 != 0) {
2619                         mdd_write_unlock(env, son);
2620                         if (unlikely(rc == 0))
2621                                 rc = rc2;
2622                         goto out_stop;
2623                 }
2624
2625                 /* Don't destroy the volatile object on success */
2626                 if (likely(rc != 0))
2627                         mdo_destroy(env, son, handle);
2628                 mdd_write_unlock(env, son);
2629         }
2630
2631         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2632             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2633                 rc = mdd_changelog_ns_store(env, mdd,
2634                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2635                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2636                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2637                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2638                                 NULL, handle);
2639 out_stop:
2640         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2641         if (rc == 0) {
2642                 /* If creation fails, it is most likely due to the remote update
2643                  * failure, because local transaction will mostly succeed at
2644                  * this stage. There is no easy way to rollback all of previous
2645                  * updates, so let's remove the object from namespace, and
2646                  * LFSCK should handle the orphan object. */
2647                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2648                         mdd_index_delete(env, mdd_pobj, attr, lname);
2649                 rc = rc2;
2650         }
2651 out_free:
2652         if (is_vmalloc_addr(ldata->ld_buf))
2653                 /* if we vmalloced a large buffer drop it */
2654                 lu_buf_free(ldata->ld_buf);
2655
2656         /* The child object shouldn't be cached anymore */
2657         if (rc)
2658                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2659                         &child->mo_lu.lo_header->loh_flags);
2660         return rc;
2661 }
2662
2663 /*
2664  * Get locks on parents in proper order
2665  * RETURN: < 0 - error, rename_order if successful
2666  */
2667 enum rename_order {
2668         MDD_RN_SAME,
2669         MDD_RN_SRCTGT,
2670         MDD_RN_TGTSRC
2671 };
2672
2673 static int mdd_rename_order(const struct lu_env *env,
2674                             struct mdd_device *mdd,
2675                             struct mdd_object *src_pobj,
2676                             const struct lu_attr *pattr,
2677                             struct mdd_object *tgt_pobj)
2678 {
2679         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2680         int rc;
2681         ENTRY;
2682
2683         if (src_pobj == tgt_pobj)
2684                 RETURN(MDD_RN_SAME);
2685
2686         /* compared the parent child relationship of src_p&tgt_p */
2687         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2688                 rc = MDD_RN_SRCTGT;
2689         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2690                 rc = MDD_RN_TGTSRC;
2691         } else {
2692                 rc = mdd_is_parent(env, mdd, src_pobj, pattr,
2693                                    mdo2fid(tgt_pobj));
2694                 if (rc == -EREMOTE)
2695                         rc = 0;
2696
2697                 if (rc == 1)
2698                         rc = MDD_RN_TGTSRC;
2699                 else if (rc == 0)
2700                         rc = MDD_RN_SRCTGT;
2701         }
2702
2703         RETURN(rc);
2704 }
2705
2706 /* has not mdd_write{read}_lock on any obj yet. */
2707 static int mdd_rename_sanity_check(const struct lu_env *env,
2708                                    struct mdd_object *src_pobj,
2709                                    const struct lu_attr *pattr,
2710                                    struct mdd_object *tgt_pobj,
2711                                    const struct lu_attr *tpattr,
2712                                    struct mdd_object *sobj,
2713                                    const struct lu_attr *cattr,
2714                                    struct mdd_object *tobj,
2715                                    const struct lu_attr *tattr)
2716 {
2717         int rc = 0;
2718         ENTRY;
2719
2720         /* XXX: when get here, sobj must NOT be NULL,
2721          * the other case has been processed in cld_rename
2722          * before mdd_rename and enable MDS_PERM_BYPASS. */
2723         LASSERT(sobj);
2724
2725         /*
2726          * If we are using project inheritance, we only allow renames
2727          * into our tree when the project IDs are the same; otherwise
2728          * tree quota mechanism would be circumvented.
2729          */
2730         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2731             tpattr->la_projid != cattr->la_projid) ||
2732             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2733             (pattr->la_projid != tpattr->la_projid)))
2734                 RETURN(-EXDEV);
2735
2736         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2737         if (rc)
2738                 RETURN(rc);
2739
2740         /* XXX: when get here, "tobj == NULL" means tobj must
2741          * NOT exist (neither on remote MDS, such case has been
2742          * processed in cld_rename before mdd_rename and enable
2743          * MDS_PERM_BYPASS).
2744          * So check may_create, but not check may_unlink. */
2745         if (tobj == NULL)
2746                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2747                                     (src_pobj != tgt_pobj));
2748         else
2749                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2750                                     (src_pobj != tgt_pobj), 1);
2751
2752         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2753                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2754
2755         RETURN(rc);
2756 }
2757
2758 static int mdd_declare_rename(const struct lu_env *env,
2759                               struct mdd_device *mdd,
2760                               struct mdd_object *mdd_spobj,
2761                               struct mdd_object *mdd_tpobj,
2762                               struct mdd_object *mdd_sobj,
2763                               struct mdd_object *mdd_tobj,
2764                               const struct lu_name *sname,
2765                               const struct lu_name *tname,
2766                               struct md_attr *ma,
2767                               struct linkea_data *ldata,
2768                               struct thandle *handle)
2769 {
2770         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2771         int rc;
2772
2773         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2774         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2775
2776         LASSERT(mdd_spobj);
2777         LASSERT(mdd_tpobj);
2778         LASSERT(mdd_sobj);
2779
2780         /* name from source dir */
2781         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2782         if (rc)
2783                 return rc;
2784
2785         /* .. from source child */
2786         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2787                 /* source child can be directory,
2788                  * counted by source dir's nlink */
2789                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2790                 if (rc)
2791                         return rc;
2792                 if (mdd_spobj != mdd_tpobj) {
2793                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2794                                                       handle);
2795                         if (rc != 0)
2796                                 return rc;
2797
2798                         rc = mdo_declare_index_insert(env, mdd_sobj,
2799                                                       mdo2fid(mdd_tpobj),
2800                                                       S_IFDIR, dotdot, handle);
2801                         if (rc != 0)
2802                                 return rc;
2803                 }
2804
2805                 /* new target child can be directory,
2806                  * counted by target dir's nlink */
2807                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2808                 if (rc != 0)
2809                         return rc;
2810         }
2811
2812         la->la_valid = LA_CTIME | LA_MTIME;
2813         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2814         if (rc != 0)
2815                 return rc;
2816
2817         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2818         if (rc != 0)
2819                 return rc;
2820
2821         la->la_valid = LA_CTIME;
2822         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2823         if (rc)
2824                 return rc;
2825
2826         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2827         if (rc)
2828                 return rc;
2829
2830         /* new name */
2831         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2832                                       mdd_object_type(mdd_sobj),
2833                                       tname->ln_name, handle);
2834         if (rc != 0)
2835                 return rc;
2836
2837         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2838                 /* delete target child in target parent directory */
2839                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2840                                               handle);
2841                 if (rc)
2842                         return rc;
2843
2844                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2845                 if (rc)
2846                         return rc;
2847
2848                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2849                         /* target child can be directory,
2850                          * delete "." reference in target child directory */
2851                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2852                         if (rc)
2853                                 return rc;
2854
2855                         /* delete ".." reference in target parent directory */
2856                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2857                         if (rc)
2858                                 return rc;
2859                 }
2860
2861                 la->la_valid = LA_CTIME;
2862                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2863                 if (rc)
2864                         return rc;
2865
2866                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2867                 if (rc)
2868                         return rc;
2869         }
2870
2871         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
2872                                          handle);
2873         if (rc)
2874                 return rc;
2875
2876         return rc;
2877 }
2878
2879 /* src object can be remote that is why we use only fid and type of object */
2880 static int mdd_rename(const struct lu_env *env,
2881                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2882                       const struct lu_fid *lf, const struct lu_name *lsname,
2883                       struct md_object *tobj, const struct lu_name *ltname,
2884                       struct md_attr *ma)
2885 {
2886         const char *sname = lsname->ln_name;
2887         const char *tname = ltname->ln_name;
2888         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2889         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2890         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2891         struct mdd_device *mdd = mdo2mdd(src_pobj);
2892         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2893         struct mdd_object *mdd_tobj = NULL;
2894         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2895         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2896         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2897         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2898         struct thandle *handle;
2899         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2900         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2901         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2902         bool is_dir;
2903         bool tobj_ref = 0;
2904         bool tobj_locked = 0;
2905         unsigned cl_flags = 0;
2906         int rc, rc2;
2907         ENTRY;
2908
2909         /* let unlink to complete and commit */
2910         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
2911
2912         if (tobj)
2913                 mdd_tobj = md2mdd_obj(tobj);
2914
2915         mdd_sobj = mdd_object_find(env, mdd, lf);
2916         if (IS_ERR(mdd_sobj))
2917                 RETURN(PTR_ERR(mdd_sobj));
2918
2919         rc = mdd_la_get(env, mdd_sobj, cattr);
2920         if (rc)
2921                 GOTO(out_pending, rc);
2922
2923         rc = mdd_la_get(env, mdd_spobj, pattr);
2924         if (rc)
2925                 GOTO(out_pending, rc);
2926
2927         if (mdd_tobj) {
2928                 rc = mdd_la_get(env, mdd_tobj, tattr);
2929                 if (rc)
2930                         GOTO(out_pending, rc);
2931                 /* search for an existing archive.
2932                  * we should check ahead as the object
2933                  * can be destroyed in this transaction */
2934                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2935                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2936         }
2937
2938         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2939         if (rc)
2940                 GOTO(out_pending, rc);
2941
2942         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2943                                      mdd_sobj, cattr, mdd_tobj, tattr);
2944         if (rc)
2945                 GOTO(out_pending, rc);
2946
2947         rc = mdd_name_check(mdd, ltname);
2948         if (rc < 0)
2949                 GOTO(out_pending, rc);
2950
2951         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2952         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2953         if (rc < 0)
2954                 GOTO(out_pending, rc);
2955
2956         handle = mdd_trans_create(env, mdd);
2957         if (IS_ERR(handle))
2958                 GOTO(out_pending, rc = PTR_ERR(handle));
2959
2960         memset(ldata, 0, sizeof(*ldata));
2961         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
2962                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
2963                                 1, 0, ldata);
2964         if (rc)
2965                 GOTO(stop, rc);
2966
2967         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2968                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2969         if (rc)
2970                 GOTO(stop, rc);
2971
2972         rc = mdd_trans_start(env, mdd, handle);
2973         if (rc)
2974                 GOTO(stop, rc);
2975
2976         is_dir = S_ISDIR(cattr->la_mode);
2977
2978         /* Remove source name from source directory */
2979         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2980         if (rc != 0)
2981                 GOTO(stop, rc);
2982
2983         /* "mv dir1 dir2" needs "dir1/.." link update */
2984         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2985                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2986                 if (rc != 0)
2987                         GOTO(fixup_spobj2, rc);
2988
2989                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2990                                              dotdot, handle);
2991                 if (rc != 0)
2992                         GOTO(fixup_spobj, rc);
2993         }
2994
2995         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2996                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2997                 if (rc != 0)
2998                         /* tname might been renamed to something else */
2999                         GOTO(fixup_spobj, rc);
3000         }
3001
3002         /* Insert new fid with target name into target dir */
3003         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
3004                                 tname, handle);
3005         if (rc != 0)
3006                 GOTO(fixup_tpobj, rc);
3007
3008         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3009         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3010
3011         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3012         la->la_valid = LA_CTIME;
3013         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
3014         if (rc)
3015                 GOTO(fixup_tpobj, rc);
3016
3017         /* Update the linkEA for the source object */
3018         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3019         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
3020                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
3021                               0, 0);
3022         if (rc == -ENOENT)
3023                 /* Old files might not have EA entry */
3024                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
3025                               lsname, handle, NULL, 0);
3026         mdd_write_unlock(env, mdd_sobj);
3027         /* We don't fail the transaction if the link ea can't be
3028            updated -- fid2path will use alternate lookup method. */
3029         rc = 0;
3030
3031         /* Remove old target object
3032          * For tobj is remote case cmm layer has processed
3033          * and set tobj to NULL then. So when tobj is NOT NULL,
3034          * it must be local one.
3035          */
3036         if (tobj && mdd_object_exists(mdd_tobj)) {
3037                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
3038                 tobj_locked = 1;
3039                 if (mdd_is_dead_obj(mdd_tobj)) {
3040                         /* shld not be dead, something is wrong */
3041                         CERROR("tobj is dead, something is wrong\n");
3042                         rc = -EINVAL;
3043                         goto cleanup;
3044                 }
3045                 mdo_ref_del(env, mdd_tobj, handle);
3046
3047                 /* Remove dot reference. */
3048                 if (S_ISDIR(tattr->la_mode))
3049                         mdo_ref_del(env, mdd_tobj, handle);
3050                 tobj_ref = 1;
3051
3052                 /* fetch updated nlink */
3053                 rc = mdd_la_get(env, mdd_tobj, tattr);
3054                 if (rc != 0) {
3055                         CERROR("%s: Failed to get nlink for tobj "
3056                                 DFID": rc = %d\n",
3057                                 mdd2obd_dev(mdd)->obd_name,
3058                                 PFID(tpobj_fid), rc);
3059                         GOTO(fixup_tpobj, rc);
3060                 }
3061
3062                 la->la_valid = LA_CTIME;
3063                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3064                 if (rc != 0) {
3065                         CERROR("%s: Failed to set ctime for tobj "
3066                                 DFID": rc = %d\n",
3067                                 mdd2obd_dev(mdd)->obd_name,
3068                                 PFID(tpobj_fid), rc);
3069                         GOTO(fixup_tpobj, rc);
3070                 }
3071
3072                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3073                 ma->ma_attr = *tattr;
3074                 ma->ma_valid |= MA_INODE;
3075                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3076                                        handle);
3077                 if (rc != 0) {
3078                         CERROR("%s: Failed to unlink tobj "
3079                                 DFID": rc = %d\n",
3080                                 mdd2obd_dev(mdd)->obd_name,
3081                                 PFID(tpobj_fid), rc);
3082                         GOTO(fixup_tpobj, rc);
3083                 }
3084
3085                 /* fetch updated nlink */
3086                 rc = mdd_la_get(env, mdd_tobj, tattr);
3087                 if (rc == -ENOENT) {
3088                         /* the object got removed, let's
3089                          * return the latest known attributes */
3090                         tattr->la_nlink = 0;
3091                         rc = 0;
3092                 } else if (rc != 0) {
3093                         CERROR("%s: Failed to get nlink for tobj "
3094                                 DFID": rc = %d\n",
3095                                 mdd2obd_dev(mdd)->obd_name,
3096                                 PFID(tpobj_fid), rc);
3097                         GOTO(fixup_tpobj, rc);
3098                 }
3099                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3100                 ma->ma_attr = *tattr;
3101                 ma->ma_valid |= MA_INODE;
3102
3103                 if (tattr->la_nlink == 0)
3104                         cl_flags |= CLF_RENAME_LAST;
3105                 else
3106                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3107         }
3108
3109         la->la_valid = LA_CTIME | LA_MTIME;
3110         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3111         if (rc)
3112                 GOTO(fixup_tpobj, rc);
3113
3114         if (mdd_spobj != mdd_tpobj) {
3115                 la->la_valid = LA_CTIME | LA_MTIME;
3116                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3117                 if (rc != 0)
3118                         GOTO(fixup_tpobj, rc);
3119         }
3120
3121         EXIT;
3122
3123 fixup_tpobj:
3124         if (rc) {
3125                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3126                 if (rc2)
3127                         CWARN("tp obj fix error %d\n",rc2);
3128
3129                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3130                     !mdd_is_dead_obj(mdd_tobj)) {
3131                         if (tobj_ref) {
3132                                 mdo_ref_add(env, mdd_tobj, handle);
3133                                 if (is_dir)
3134                                         mdo_ref_add(env, mdd_tobj, handle);
3135                         }
3136
3137                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3138                                                   mdo2fid(mdd_tobj),
3139                                                   mdd_object_type(mdd_tobj),
3140                                                   tname, handle);
3141                         if (rc2 != 0)
3142                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3143                 }
3144         }
3145
3146 fixup_spobj:
3147         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3148                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3149                 if (rc2)
3150                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3151                                mdd2obd_dev(mdd)->obd_name, rc2);
3152
3153
3154                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3155                                               dotdot, handle);
3156                 if (rc2 != 0)
3157                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3158                               mdd2obd_dev(mdd)->obd_name, rc2);
3159         }
3160
3161 fixup_spobj2:
3162         if (rc != 0) {
3163                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3164                                          mdd_object_type(mdd_sobj), sname,
3165                                          handle);
3166                 if (rc2 != 0)
3167                         CWARN("sp obj fix error: rc = %d\n", rc2);
3168         }
3169
3170 cleanup:
3171         if (tobj_locked)
3172                 mdd_write_unlock(env, mdd_tobj);
3173
3174         if (rc == 0)
3175                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3176                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3177                                             ltname, lsname, handle);
3178
3179 stop:
3180         rc = mdd_trans_stop(env, mdd, rc, handle);
3181
3182 out_pending:
3183         mdd_object_put(env, mdd_sobj);
3184         return rc;
3185 }
3186
3187 /**
3188  * Check whether we should migrate the file/dir
3189  * return val
3190  *      < 0  permission check failed or other error.
3191  *      = 0  the file can be migrated.
3192  **/
3193 static int mdd_migrate_sanity_check(const struct lu_env *env,
3194                                     struct mdd_device *mdd,
3195                                     struct mdd_object *spobj,
3196                                     struct mdd_object *tpobj,
3197                                     struct mdd_object *sobj,
3198                                     struct mdd_object *tobj,
3199                                     const struct lu_attr *spattr,
3200                                     const struct lu_attr *tpattr,
3201                                     const struct lu_attr *attr)
3202 {
3203         int rc;
3204
3205         ENTRY;
3206
3207         if (!mdd_object_remote(sobj)) {
3208                 mdd_read_lock(env, sobj, MOR_SRC_CHILD);
3209                 if (sobj->mod_count > 0) {
3210                         CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
3211                                mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(sobj)),
3212                                sobj->mod_count);
3213                         mdd_read_unlock(env, sobj);
3214                         RETURN(-EBUSY);
3215                 }
3216                 mdd_read_unlock(env, sobj);
3217         }
3218
3219         if (mdd_object_exists(tobj))
3220                 RETURN(-EEXIST);
3221
3222         rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
3223                                      attr, NULL, NULL);
3224         RETURN(rc);
3225 }
3226
3227 typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
3228                                  struct mdd_object *obj,
3229                                  struct mdd_object *stripe,
3230                                  const struct lu_buf *lmv_buf,
3231                                  const struct lu_buf *lmu_buf,
3232                                  int index,
3233                                  struct thandle *handle);
3234
3235 static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
3236                                          struct mdd_object *obj,
3237                                          struct mdd_object *stripe,
3238                                          const struct lu_buf *lmv_buf,
3239                                          const struct lu_buf *lmu_buf,
3240                                          int index,
3241                                          struct thandle *handle)
3242 {
3243         struct mdd_thread_info *info = mdd_env_info(env);
3244         char *stripe_name = info->mti_name;
3245         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3246         int rc;
3247
3248         if (index < le32_to_cpu(lmu->lum_stripe_count))
3249                 return 0;
3250
3251         rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
3252         if (rc)
3253                 return rc;
3254
3255         snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
3256                  PFID(mdd_object_fid(stripe)), index);
3257
3258         rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
3259         if (rc)
3260                 return rc;
3261
3262         rc = mdo_declare_ref_del(env, obj, handle);
3263
3264         return rc;
3265 }
3266
3267 /* delete stripe from its master object namespace */
3268 static int mdd_dir_delete_stripe(const struct lu_env *env,
3269                                  struct mdd_object *obj,
3270                                  struct mdd_object *stripe,
3271                                  const struct lu_buf *lmv_buf,
3272                                  const struct lu_buf *lmu_buf,
3273                                  int index,
3274                                  struct thandle *handle)
3275 {
3276         struct mdd_thread_info *info = mdd_env_info(env);
3277         char *stripe_name = info->mti_name;
3278         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3279         struct lmv_user_md *lmu = lmu_buf->lb_buf;
3280         __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
3281         int rc;
3282
3283         ENTRY;
3284
3285         /* local dir will delete via LOD */
3286         LASSERT(mdd_object_remote(obj));
3287         LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
3288
3289         if (index < del_offset)
3290                 RETURN(0);
3291
3292         mdd_write_lock(env, stripe, MOR_SRC_CHILD);
3293         rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
3294         if (rc)
3295                 GOTO(out, rc);
3296
3297         snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
3298                  PFID(mdd_object_fid(stripe)), index);
3299
3300         rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
3301         if (rc)
3302                 GOTO(out, rc);
3303
3304         rc = mdo_ref_del(env, obj, handle);
3305         GOTO(out, rc);
3306 out:
3307         mdd_write_unlock(env, stripe);
3308
3309         return rc;
3310 }
3311
3312 /*
3313  * iterate stripes of striped directory on remote MDT, local striped directory
3314  * is accessed via LOD.
3315  */
3316 static int mdd_dir_iterate_stripes(const struct lu_env *env,
3317                                    struct mdd_object *obj,
3318                                    const struct lu_buf *lmv_buf,
3319                                    const struct lu_buf *lmu_buf,
3320                                    struct thandle *handle,
3321                                    mdd_dir_stripe_cb cb)
3322 {
3323         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
3324         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3325         struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
3326         struct mdd_object *stripe;
3327         int i;
3328         int rc;
3329
3330         ENTRY;
3331
3332         LASSERT(lmv);
3333
3334         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
3335                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
3336                 stripe = mdd_object_find(env, mdd, fid);
3337                 if (IS_ERR(stripe))
3338                         RETURN(PTR_ERR(stripe));
3339
3340                 rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
3341                 mdd_object_put(env, stripe);
3342                 if (rc)
3343                         RETURN(rc);
3344         }
3345
3346         RETURN(0);
3347 }
3348
3349 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
3350                             struct mdd_object *obj,
3351                             const struct lu_buf *buf,
3352                             const char *name,
3353                             int fl, struct thandle *handle);
3354
3355 /* iterate xattrs, but ignore LMA, LMV, and LINKEA if 'skip_linkea' is set. */
3356 static int mdd_iterate_xattrs(const struct lu_env *env,
3357                               struct mdd_object *sobj,
3358                               struct mdd_object *tobj,
3359                               bool skip_linkea,
3360                               struct thandle *handle,
3361                               mdd_xattr_cb cb)
3362 {
3363         struct mdd_thread_info *info = mdd_env_info(env);
3364         char *xname;
3365         struct lu_buf list_xbuf;
3366         struct lu_buf xbuf = { NULL };
3367         int list_xsize;
3368         int xlen;
3369         int rem;
3370         int xsize;
3371         int rc;
3372
3373         ENTRY;
3374
3375         /* retrieve xattr list from the old object */
3376         list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
3377         if (list_xsize == -ENODATA)
3378                 RETURN(0);
3379
3380         if (list_xsize < 0)
3381                 RETURN(list_xsize);
3382
3383         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3384         if (info->mti_big_buf.lb_buf == NULL)
3385                 RETURN(-ENOMEM);
3386
3387         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3388         list_xbuf.lb_len = list_xsize;
3389         rc = mdo_xattr_list(env, sobj, &list_xbuf);
3390         if (rc < 0)
3391                 RETURN(rc);
3392
3393         rem = rc;
3394         rc = 0;
3395         xname = list_xbuf.lb_buf;
3396         while (rem > 0) {
3397                 xlen = strnlen(xname, rem - 1) + 1;
3398                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3399                     strcmp(XATTR_NAME_LMV, xname) == 0)
3400                         goto next;
3401
3402                 if (skip_linkea &&
3403                     strcmp(XATTR_NAME_LINK, xname) == 0)
3404                         goto next;
3405
3406                 xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
3407                 if (xsize == -ENODATA)
3408                         goto next;
3409                 if (xsize < 0)
3410                         GOTO(out, rc = xsize);
3411
3412                 lu_buf_check_and_alloc(&xbuf, xsize);
3413                 if (xbuf.lb_buf == NULL)
3414                         GOTO(out, rc = -ENOMEM);
3415
3416                 rc = mdo_xattr_get(env, sobj, &xbuf, xname);
3417                 if (rc == -ENODATA)
3418                         goto next;
3419                 if (rc < 0)
3420                         GOTO(out, rc);
3421
3422 repeat:
3423                 rc = cb(env, tobj, &xbuf, xname, 0, handle);
3424                 if (unlikely(rc == -ENOSPC &&
3425                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3426                         rc = linkea_overflow_shrink(
3427                                         (struct linkea_data *)(xbuf.lb_buf));
3428                         if (likely(rc > 0)) {
3429                                 xbuf.lb_len = rc;
3430                                 goto repeat;
3431                         }
3432                 }
3433
3434                 if (rc)
3435                         GOTO(out, rc);
3436 next:
3437                 xname += xlen;
3438                 rem -= xlen;
3439         }
3440
3441 out:
3442         lu_buf_free(&xbuf);
3443         RETURN(rc);
3444 }
3445
3446 typedef int (*mdd_linkea_cb)(const struct lu_env *env,
3447                              struct mdd_object *sobj,
3448                              struct mdd_object *tobj,
3449                              const struct lu_name *sname,
3450                              const struct lu_fid *sfid,
3451                              const struct lu_name *lname,
3452                              const struct lu_fid *fid,
3453                              void *opaque,
3454                              struct thandle *handle);
3455
3456 static int mdd_declare_update_link(const struct lu_env *env,
3457                                    struct mdd_object *sobj,
3458                                    struct mdd_object *tobj,
3459                                    const struct lu_name *tname,
3460                                    const struct lu_fid *tpfid,
3461                                    const struct lu_name *lname,
3462                                    const struct lu_fid *fid,
3463                                    void *unused,
3464                                    struct thandle *handle)
3465 {
3466         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3467         struct mdd_object *pobj;
3468         int rc;
3469
3470         /* ignore tobj */
3471         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3472             !strcmp(tname->ln_name, lname->ln_name))
3473                 return 0;
3474
3475         pobj = mdd_object_find(env, mdd, fid);
3476         if (IS_ERR(pobj))
3477                 return PTR_ERR(pobj);
3478
3479
3480         rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
3481         if (!rc)
3482                 rc = mdo_declare_index_insert(env, pobj, mdo2fid(tobj),
3483                                               mdd_object_type(sobj),
3484                                               lname->ln_name, handle);
3485         mdd_object_put(env, pobj);
3486         if (rc)
3487                 return rc;
3488
3489         rc = mdo_declare_ref_add(env, tobj, handle);
3490         if (rc)
3491                 return rc;
3492
3493         rc = mdo_declare_ref_del(env, sobj, handle);
3494         return rc;
3495 }
3496
3497 static int mdd_update_link(const struct lu_env *env,
3498                            struct mdd_object *sobj,
3499                            struct mdd_object *tobj,
3500                            const struct lu_name *tname,
3501                            const struct lu_fid *tpfid,
3502                            const struct lu_name *lname,
3503                            const struct lu_fid *fid,
3504                            void *unused,
3505                            struct thandle *handle)
3506 {
3507         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3508         struct mdd_object *pobj;
3509         int rc;
3510
3511         ENTRY;
3512
3513         LASSERT(lu_name_is_valid(lname));
3514
3515         /* ignore tobj */
3516         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3517             !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
3518                 RETURN(0);
3519
3520         CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
3521                PFID(fid), PNAME(lname), PFID(mdo2fid(tobj)));
3522
3523         pobj = mdd_object_find(env, mdd, fid);
3524         if (IS_ERR(pobj)) {
3525                 CWARN("%s: cannot find obj "DFID": %ld\n",
3526                       mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
3527                 RETURN(PTR_ERR(pobj));
3528         }
3529
3530         if (!mdd_object_exists(pobj)) {
3531                 CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
3532                 mdd_object_put(env, pobj);
3533                 RETURN(-ENOENT);
3534         }
3535
3536         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
3537         rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
3538         if (!rc)
3539                 rc = __mdd_index_insert_only(env, pobj, mdo2fid(tobj),
3540                                              mdd_object_type(sobj),
3541                                              lname->ln_name, handle);
3542         mdd_write_unlock(env, pobj);
3543         mdd_object_put(env, pobj);
3544         if (rc)
3545                 RETURN(rc);
3546
3547         mdd_write_lock(env, tobj, MOR_TGT_CHILD);
3548         rc = mdo_ref_add(env, tobj, handle);
3549         mdd_write_unlock(env, tobj);
3550         if (rc)
3551                 RETURN(rc);
3552
3553         mdd_write_lock(env, sobj, MOR_SRC_CHILD);
3554         rc = mdo_ref_del(env, sobj, handle);
3555         mdd_write_unlock(env, sobj);
3556
3557         RETURN(rc);
3558 }
3559
3560 static inline int mdd_fld_lookup(const struct lu_env *env,
3561                                  struct mdd_device *mdd,
3562                                  const struct lu_fid *fid,
3563                                  __u32 *mdt_index)
3564 {
3565         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
3566         struct seq_server_site *ss;
3567         int rc;
3568
3569         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
3570
3571         range->lsr_flags = LU_SEQ_RANGE_MDT;
3572         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
3573         if (rc)
3574                 return rc;
3575
3576         *mdt_index = range->lsr_index;
3577
3578         return 0;
3579 }
3580
3581 static int mdd_is_link_on_source_mdt(const struct lu_env *env,
3582                                      struct mdd_object *sobj,
3583                                      struct mdd_object *tobj,
3584                                      const struct lu_name *tname,
3585                                      const struct lu_fid *tpfid,
3586                                      const struct lu_name *lname,
3587                                      const struct lu_fid *fid,
3588                                      void *opaque,
3589                                      struct thandle *handle)
3590 {
3591         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3592         __u32 source_mdt_index = *(__u32 *)opaque;
3593         __u32 link_mdt_index;
3594         int rc;
3595
3596         ENTRY;
3597
3598         /* ignore tobj */
3599         if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
3600             !strcmp(tname->ln_name, lname->ln_name))
3601                 return 0;
3602
3603         rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
3604         if (rc)
3605                 RETURN(rc);
3606
3607         RETURN(link_mdt_index == source_mdt_index);
3608 }
3609
3610 static int mdd_iterate_linkea(const struct lu_env *env,
3611                               struct mdd_object *sobj,
3612                               struct mdd_object *tobj,
3613                               const struct lu_name *tname,
3614                               const struct lu_fid *tpfid,
3615                               struct linkea_data *ldata,
3616                               void *opaque,
3617                               struct thandle *handle,
3618                               mdd_linkea_cb cb)
3619 {
3620         struct mdd_thread_info *info = mdd_env_info(env);
3621         char *filename = info->mti_name;
3622         struct lu_name lname;
3623         struct lu_fid fid;
3624         int rc = 0;
3625
3626         if (!ldata->ld_buf)
3627                 return 0;
3628
3629         for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
3630              linkea_next_entry(ldata)) {
3631                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
3632                                     &fid);
3633
3634                 /* Note: lname might miss \0 at the end */
3635                 snprintf(filename, sizeof(info->mti_name), "%.*s",
3636                          lname.ln_namelen, lname.ln_name);
3637                 lname.ln_name = filename;
3638
3639                 CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
3640
3641                 rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
3642                         handle);
3643         }
3644
3645         return rc;
3646 }
3647
3648 /**
3649  * Prepare linkea, and check whether file needs migrate: if source still has
3650  * link on source MDT, no need to migrate, just update namespace on source and
3651  * target parents.
3652  *
3653  * \retval      0 do migrate
3654  * \retval      1 don't migrate
3655  * \retval      -errno on failure
3656  */
3657 static int migrate_linkea_prepare(const struct lu_env *env,
3658                                   struct mdd_device *mdd,
3659                                   struct mdd_object *spobj,
3660                                   struct mdd_object *tpobj,
3661                                   struct mdd_object *sobj,
3662                                   const struct lu_name *lname,
3663                                   const struct lu_attr *attr,
3664                                   struct linkea_data *ldata)
3665 {
3666         __u32 source_mdt_index;
3667         int rc;
3668
3669         ENTRY;
3670
3671         memset(ldata, 0, sizeof(*ldata));
3672         rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
3673                                 mdo2fid(tpobj), lname, 1, 0, ldata);
3674         if (rc)
3675                 RETURN(rc);
3676
3677         /*
3678          * Then it will check if the file should be migrated. If the file has
3679          * mulitple links, we only need migrate the file if all of its entries
3680          * has been migrated to the remote MDT.
3681          */
3682         if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
3683                 RETURN(0);
3684
3685         /* If there are still links locally, don't migrate this file */
3686         LASSERT(ldata->ld_leh != NULL);
3687
3688         /*
3689          * If linkEA is overflow, it means there are some unknown name entries
3690          * under unknown parents, which will prevent the migration.
3691          */
3692         if (unlikely(ldata->ld_leh->leh_overflow_time))
3693                 RETURN(-EOVERFLOW);
3694
3695         rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &source_mdt_index);
3696         if (rc)
3697                 RETURN(rc);
3698
3699         rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
3700                                 &source_mdt_index, NULL,
3701                                 mdd_is_link_on_source_mdt);
3702         RETURN(rc);
3703 }
3704
3705 static int mdd_dir_declare_layout_delete(const struct lu_env *env,
3706                                          struct mdd_object *obj,
3707                                          const struct lu_buf *lmv_buf,
3708                                          const struct lu_buf *lmu_buf,
3709                                          struct thandle *handle)
3710 {
3711         int rc;
3712
3713         if (!lmv_buf->lb_buf)
3714                 rc = mdo_declare_index_delete(env, obj, dotdot, handle);
3715         else if (mdd_object_remote(obj))
3716                 rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
3717                                              mdd_dir_declare_delete_stripe);
3718         else
3719                 rc = mdo_declare_xattr_set(env, obj, lmu_buf,
3720                                            XATTR_NAME_LMV".del", 0, handle);
3721
3722         return rc;
3723 }
3724
3725 static int mdd_dir_layout_delete(const struct lu_env *env,
3726                                  struct mdd_object *obj,
3727                                  const struct lu_buf *lmv_buf,
3728                                  const struct lu_buf *lmu_buf,
3729                                  struct thandle *handle)
3730 {
3731         int rc;
3732
3733         ENTRY;
3734
3735         mdd_write_lock(env, obj, MOR_SRC_PARENT);
3736         if (!lmv_buf->lb_buf)
3737                 /* normal dir */
3738                 rc = __mdd_index_delete_only(env, obj, dotdot, handle);
3739         else if (mdd_object_remote(obj))
3740                 /* striped, but remote */
3741                 rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
3742                                              mdd_dir_delete_stripe);
3743         else
3744                 rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
3745                                    handle);
3746         mdd_write_unlock(env, obj);
3747
3748         RETURN(rc);
3749 }
3750
3751 static int mdd_declare_migrate_create(const struct lu_env *env,
3752                                       struct mdd_object *tpobj,
3753                                       struct mdd_object *sobj,
3754                                       struct mdd_object *tobj,
3755                                       const struct lu_name *lname,
3756                                       struct lu_attr *attr,
3757                                       struct lu_buf *sbuf,
3758                                       struct linkea_data *ldata,
3759                                       struct md_op_spec *spec,
3760                                       struct dt_allocation_hint *hint,
3761                                       struct thandle *handle)
3762 {
3763         struct mdd_thread_info *info = mdd_env_info(env);
3764         struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
3765         int rc;
3766
3767         if (S_ISDIR(attr->la_mode)) {
3768                 struct lu_buf lmu_buf = { NULL };
3769
3770                 if (lmv) {
3771                         struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
3772
3773                         lmu->lum_stripe_count = 0;
3774                         lmu_buf.lb_buf = lmu;
3775                         lmu_buf.lb_len = sizeof(*lmu);
3776                 }
3777
3778                 rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
3779                                                 handle);
3780                 if (rc)
3781                         return rc;
3782
3783                 if (lmv) {
3784                         rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
3785                                                    handle);
3786                         if (rc)
3787                                 return rc;
3788                 }
3789         }
3790
3791         rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
3792                                 lname, attr, handle, spec, ldata, NULL, NULL,
3793                                 hint);
3794         if (rc)
3795                 return rc;
3796
3797         if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
3798                 if (!lmv) {
3799                         /*
3800                          * if sobj is not striped, fake a 1-stripe LMV, which
3801                          * will be used to generate a compound LMV for tobj.
3802                          */
3803                         LASSERT(sizeof(info->mti_key) >
3804                                 lmv_mds_md_size(1, LMV_MAGIC_V1));
3805                         lmv = (typeof(lmv))info->mti_key;
3806                         memset(lmv, 0, sizeof(*lmv));
3807                         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3808                         lmv->lmv_stripe_count = cpu_to_le32(1);
3809                         fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
3810                         sbuf->lb_buf = lmv;
3811                         sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
3812
3813                         rc = mdo_declare_xattr_set(env, tobj, sbuf,
3814                                                    XATTR_NAME_LMV".add", 0,
3815                                                    handle);
3816                         sbuf->lb_buf = NULL;
3817                         sbuf->lb_len = 0;
3818                 } else {
3819                         rc = mdo_declare_xattr_set(env, tobj, sbuf,
3820                                                    XATTR_NAME_LMV".add", 0,
3821                                                    handle);
3822                 }
3823                 if (rc)
3824                         return rc;
3825         }
3826
3827         /*
3828          * tobj mode will be used in lod_declare_xattr_set(), but it's not
3829          * createb yet, copy from sobj.
3830          */
3831         tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
3832         tobj->mod_obj.mo_lu.lo_header->loh_attr |=
3833                 sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
3834
3835         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
3836                                 mdo_declare_xattr_set);
3837         if (rc)
3838                 return rc;
3839
3840         if (S_ISREG(attr->la_mode)) {
3841                 handle->th_complex = 1;
3842
3843                 rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
3844                 if (rc)
3845                         return rc;
3846
3847                 /*
3848                  * target is not initalized because its LOV is copied from
3849                  * source in mdd_migrate_create(), declare via sobj.
3850                  */
3851                 rc = mdo_declare_xattr_set(env, sobj, NULL, XATTR_NAME_FID, 0,
3852                                            handle);
3853                 if (rc)
3854                         return rc;
3855         }
3856
3857         if (!S_ISDIR(attr->la_mode)) {
3858                 rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
3859                                         ldata, NULL, handle,
3860                                         mdd_declare_update_link);
3861                 if (rc)
3862                         return rc;
3863
3864                 if (lmv) {
3865                         rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
3866                                                    handle);
3867                         if (rc)
3868                                 return rc;
3869                 }
3870         }
3871
3872         return rc;
3873 }
3874
3875 /**
3876  * Create target, migrate xattrs and update links.
3877  *
3878  * Create target according to \a spec, and then migrate xattrs, if it's
3879  * directory, migrate source stripes to target, else update fid to target
3880  * for links.
3881  *
3882  * \param[in] env       execution environment
3883  * \param[in] tpobj     target parent object
3884  * \param[in] sobj      source object
3885  * \param[in] tobj      target object
3886  * \param[in] lname     file name
3887  * \param[in] attr      source attributes
3888  * \param[in] sbuf      source LMV buf
3889  * \param[in] ldata     source linkea
3890  * \param[in] spec      migrate create spec
3891  * \param[in] hint      target creation hint
3892  * \param[in] handle    tranasction handle
3893  *
3894  * \retval      0 on success
3895  * \retval      -errno on failure
3896  **/
3897 static int mdd_migrate_create(const struct lu_env *env,
3898                               struct mdd_object *tpobj,
3899                               struct mdd_object *sobj,
3900                               struct mdd_object *tobj,
3901                               const struct lu_name *lname,
3902                               struct lu_attr *attr,
3903                               const struct lu_buf *sbuf,
3904                               struct linkea_data *ldata,
3905                               struct md_op_spec *spec,
3906                               struct dt_allocation_hint *hint,
3907                               struct thandle *handle)
3908 {
3909         int rc;
3910
3911         ENTRY;
3912
3913         /*
3914          * directory will migrate sobj stripes to tobj:
3915          * 1. delete stripes from sobj.
3916          * 2. add stripes to tobj, see lod_dir_declare_layout_add().
3917          * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
3918          */
3919         if (S_ISDIR(attr->la_mode)) {
3920                 struct lu_buf lmu_buf = { NULL };
3921
3922                 if (sbuf->lb_buf) {
3923                         struct mdd_thread_info *info = mdd_env_info(env);
3924                         struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
3925
3926                         lmu->lum_stripe_count = 0;
3927                         lmu_buf.lb_buf = lmu;
3928                         lmu_buf.lb_len = sizeof(*lmu);
3929                 }
3930
3931                 rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
3932                 if (rc)
3933                         RETURN(rc);
3934
3935                 /*
3936                  * delete LMV so that later when destroying sobj it won't delete
3937                  * stripes again.
3938                  */
3939                 if (sbuf->lb_buf) {
3940                         mdd_write_lock(env, sobj, MOR_SRC_CHILD);
3941                         rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
3942                         mdd_write_unlock(env, sobj);
3943                         if (rc)
3944                                 RETURN(rc);
3945                 }
3946         }
3947
3948         /* don't set nlink from sobj */
3949         attr->la_valid &= ~LA_NLINK;
3950
3951         rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, hint,
3952                                 handle);
3953         if (rc)
3954                 RETURN(rc);
3955
3956         mdd_write_lock(env, tobj, MOR_TGT_CHILD);
3957         rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
3958         mdd_write_unlock(env, tobj);
3959         if (rc)
3960                 RETURN(rc);
3961
3962         if (S_ISREG(attr->la_mode)) {
3963                 /* delete LOV to avoid deleting OST objs when destroying sobj */
3964                 mdd_write_lock(env, sobj, MOR_SRC_CHILD);
3965                 rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
3966                 mdd_write_unlock(env, sobj);
3967                 if (rc)
3968                         RETURN(rc);
3969
3970                 /* for regular file, update OST objects XATTR_NAME_FID */
3971                 rc = mdo_xattr_set(env, tobj, NULL, XATTR_NAME_FID, 0, handle);
3972                 if (rc)
3973                         RETURN(rc);
3974         }
3975
3976         if (!S_ISDIR(attr->la_mode))
3977                 rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
3978                                         ldata, NULL, handle, mdd_update_link);
3979
3980         RETURN(rc);
3981 }
3982
3983 static int mdd_declare_migrate_update(const struct lu_env *env,
3984                                       struct mdd_object *spobj,
3985                                       struct mdd_object *tpobj,
3986                                       struct mdd_object *sobj,
3987                                       struct mdd_object *tobj,
3988                                       const struct lu_name *lname,
3989                                       struct lu_attr *attr,
3990                                       struct lu_attr *spattr,
3991                                       struct lu_attr *tpattr,
3992                                       struct linkea_data *ldata,
3993                                       bool do_create,
3994                                       bool do_destroy,
3995                                       struct md_attr *ma,
3996                                       struct thandle *handle)
3997 {
3998         struct mdd_thread_info *info = mdd_env_info(env);
3999         const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
4000         struct lu_attr *la = &info->mti_la_for_fix;
4001         int rc;
4002
4003         rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
4004         if (rc)
4005                 return rc;
4006
4007         if (S_ISDIR(attr->la_mode)) {
4008                 rc = mdo_declare_ref_del(env, spobj, handle);
4009                 if (rc)
4010                         return rc;
4011         }
4012
4013         rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
4014                                       lname->ln_name, handle);
4015         if (rc)
4016                 return rc;
4017
4018         rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
4019         if (rc)
4020                 return rc;
4021
4022         if (S_ISDIR(attr->la_mode)) {
4023                 rc = mdo_declare_ref_add(env, tpobj, handle);
4024                 if (rc)
4025                         return rc;
4026         }
4027
4028         la->la_valid = LA_CTIME | LA_MTIME;
4029         rc = mdo_declare_attr_set(env, spobj, la, handle);
4030         if (rc)
4031                 return rc;
4032
4033         if (tpobj != spobj) {
4034                 rc = mdo_declare_attr_set(env, tpobj, la, handle);
4035                 if (rc)
4036                         return rc;
4037         }
4038
4039         if (do_create && do_destroy) {
4040                 rc = mdo_declare_ref_del(env, sobj, handle);
4041                 if (rc)
4042                         return rc;
4043
4044                 rc = mdo_declare_destroy(env, sobj, handle);
4045                 if (rc)
4046                         return rc;
4047         }
4048
4049         return rc;
4050 }
4051
4052 /**
4053  * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
4054  **/
4055 static int mdd_migrate_update(const struct lu_env *env,
4056                               struct mdd_object *spobj,
4057                               struct mdd_object *tpobj,
4058                               struct mdd_object *sobj,
4059                               struct mdd_object *tobj,
4060                               const struct lu_name *lname,
4061                               struct lu_attr *attr,
4062                               struct lu_attr *spattr,
4063                               struct lu_attr *tpattr,
4064                               struct linkea_data *ldata,
4065                               bool do_create,
4066                               bool do_destroy,
4067                               struct md_attr *ma,
4068                               struct thandle *handle)
4069 {
4070         struct mdd_thread_info *info = mdd_env_info(env);
4071         const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
4072         struct lu_attr *la = &info->mti_la_for_fix;
4073         int rc;
4074
4075         ENTRY;
4076
4077         CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
4078                lname->ln_name, PFID(mdo2fid(spobj)),
4079                PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
4080                PFID(fid));
4081
4082         rc = __mdd_index_delete(env, spobj, lname->ln_name,
4083                                 S_ISDIR(attr->la_mode), handle);
4084         if (rc)
4085                 RETURN(rc);
4086
4087         rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
4088                                 lname->ln_name, handle);
4089         if (rc)
4090                 RETURN(rc);
4091
4092         rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
4093         if (rc)
4094                 RETURN(rc);
4095
4096         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
4097         la->la_valid = LA_CTIME | LA_MTIME;
4098         mdd_write_lock(env, spobj, MOR_SRC_PARENT);
4099         rc = mdd_update_time(env, spobj, spattr, la, handle);
4100         mdd_write_unlock(env, spobj);
4101         if (rc)
4102                 RETURN(rc);
4103
4104         if (tpobj != spobj) {
4105                 la->la_valid = LA_CTIME | LA_MTIME;
4106                 mdd_write_lock(env, tpobj, MOR_TGT_PARENT);
4107                 rc = mdd_update_time(env, tpobj, tpattr, la, handle);
4108                 mdd_write_unlock(env, tpobj);
4109                 if (rc)
4110                         RETURN(rc);
4111         }
4112
4113         /*
4114          * there are three situations we shouldn't destroy source:
4115          * 1. if source is not dir, and it happens to be located on the same MDT
4116          *    as target parent.
4117          * 2. if source is not dir, and has link on the same MDT where source is
4118          *    located.
4119          * 3. if source is dir, and it's a normal, non-empty dir.
4120          *
4121          * the first two situations equals to !do_create, and the 3rd equals to
4122          * !do_destroy, so the below condition is actually
4123          * !(!do_create || !do_destroy).
4124          *
4125          * NB, if user has opened source dir before migration, he will get
4126          * -ENOENT error when close it later, because source is likely to be
4127          *  remote, which can't be moved to orphan list, but except this error
4128          *  message, this won't cause any inconsistency or trouble.
4129          */
4130         if (do_create && do_destroy) {
4131                 mdd_write_lock(env, sobj, MOR_SRC_CHILD);
4132                 mdo_ref_del(env, sobj, handle);
4133                 rc = mdo_destroy(env, sobj, handle);
4134                 mdd_write_unlock(env, sobj);
4135         }
4136
4137         RETURN(rc);
4138 }
4139
4140 /**
4141  * Migrate directory or file.
4142  *
4143  * migrate source to target in following steps:
4144  *   1. create target, append source stripes after target's if it's directory,
4145  *      migrate xattrs and update fid of source links.
4146  *   2. update namespace: migrate dirent from source parent to target parent,
4147  *      update file linkea, and destroy source if it's not needed any more.
4148  *
4149  * \param[in] env       execution environment
4150  * \param[in] md_pobj   parent master object
4151  * \param[in] md_sobj   source object
4152  * \param[in] lname     file name
4153  * \param[in] md_tobj   target object
4154  * \param[in] spec      target creation spec
4155  * \param[in] ma        used to update \a pobj mtime and ctime
4156  *
4157  * \retval              0 on success
4158  * \retval              -errno on failure
4159  */
4160 static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
4161                        struct md_object *md_sobj, const struct lu_name *lname,
4162                        struct md_object *md_tobj, struct md_op_spec *spec,
4163                        struct md_attr *ma)
4164 {
4165         struct mdd_device *mdd = mdo2mdd(md_pobj);
4166         struct mdd_thread_info *info = mdd_env_info(env);
4167         struct mdd_object *pobj = md2mdd_obj(md_pobj);
4168         struct mdd_object *sobj = md2mdd_obj(md_sobj);
4169         struct mdd_object *tobj = md2mdd_obj(md_tobj);
4170         struct mdd_object *spobj = NULL;
4171         struct mdd_object *tpobj = NULL;
4172         struct lu_attr *spattr = &info->mti_pattr;
4173         struct lu_attr *tpattr = &info->mti_tpattr;
4174         struct lu_attr *attr = &info->mti_cattr;
4175         struct linkea_data *ldata = &info->mti_link_data;
4176         struct dt_allocation_hint *hint = &info->mti_hint;
4177         struct lu_fid *fid = &info->mti_fid2;
4178         struct lu_buf pbuf = { NULL };
4179         struct lu_buf sbuf = { NULL };
4180         struct lmv_mds_md_v1 *plmv;
4181         struct thandle *handle;
4182         bool do_create = true;
4183         bool do_destroy = true;
4184         int rc;
4185         ENTRY;
4186
4187         rc = mdd_la_get(env, sobj, attr);
4188         if (rc)
4189                 RETURN(rc);
4190
4191         /* locate source and target stripe on pobj, which are the real parent */
4192         rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
4193         if (rc < 0 && rc != -ENODATA)
4194                 RETURN(rc);
4195
4196         plmv = pbuf.lb_buf;
4197         if (plmv) {
4198                 __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
4199                 __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
4200                 int index;
4201
4202                 /* locate target parent stripe */
4203                 if (hash_type & LMV_HASH_FLAG_MIGRATION) {
4204                         /*
4205                          * fail check here to make sure top dir migration
4206                          * succeed.
4207                          */
4208                         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
4209                                 GOTO(out, rc = -EIO);
4210                         hash_type &= ~LMV_HASH_FLAG_MIGRATION;
4211                         count = le32_to_cpu(plmv->lmv_migrate_offset);
4212                 }
4213                 index = lmv_name_to_stripe_index(hash_type, count,
4214                                                  lname->ln_name,
4215                                                  lname->ln_namelen);
4216                 if (index < 0)
4217                         GOTO(out, rc = index);
4218
4219                 fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
4220                 tpobj = mdd_object_find(env, mdd, fid);
4221                 if (IS_ERR(tpobj))
4222                         GOTO(out, rc = PTR_ERR(tpobj));
4223
4224                 /* locate source parent stripe */
4225                 if (le32_to_cpu(plmv->lmv_hash_type) &
4226                     LMV_HASH_FLAG_MIGRATION) {
4227                         hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
4228                         count = le32_to_cpu(plmv->lmv_stripe_count) -
4229                                 le32_to_cpu(plmv->lmv_migrate_offset);
4230
4231                         index = lmv_name_to_stripe_index(hash_type, count,
4232                                                          lname->ln_name,
4233                                                          lname->ln_namelen);
4234                         if (index < 0) {
4235                                 mdd_object_put(env, tpobj);
4236                                 GOTO(out, rc = index);
4237                         }
4238
4239                         index += le32_to_cpu(plmv->lmv_migrate_offset);
4240                         fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
4241                         spobj = mdd_object_find(env, mdd, fid);
4242                         if (IS_ERR(spobj)) {
4243                                 mdd_object_put(env, tpobj);
4244                                 GOTO(out, rc = PTR_ERR(spobj));
4245                         }
4246                 } else {
4247                         spobj = tpobj;
4248                         mdd_object_get(spobj);
4249                 }
4250         } else {
4251                 tpobj = pobj;
4252                 spobj = pobj;
4253                 mdd_object_get(tpobj);
4254                 mdd_object_get(spobj);
4255         }
4256
4257         rc = mdd_la_get(env, spobj, spattr);
4258         if (rc)
4259                 GOTO(out, rc);
4260
4261         rc = mdd_la_get(env, tpobj, tpattr);
4262         if (rc)
4263                 GOTO(out, rc);
4264
4265         if (S_ISDIR(attr->la_mode)) {
4266                 struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
4267
4268                 LASSERT(lmu);
4269
4270                 /*
4271                  * if user use default value '0' for stripe_count, we need to
4272                  * adjust it to '1' to create a 1-stripe directory.
4273                  */
4274                 if (lmu->lum_stripe_count == 0) {
4275                         /* eadata is from request, don't alter it */
4276                         info->mti_lmu = *lmu;
4277                         info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
4278                         spec->u.sp_ea.eadata = &info->mti_lmu;
4279                         lmu = spec->u.sp_ea.eadata;
4280                 }
4281
4282                 rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
4283                 if (rc == -ENODATA) {
4284                         if (mdd_dir_is_empty(env, sobj) == 0) {
4285                                 /*
4286                                  * if sobj is empty, and target is not striped,
4287                                  * create target as a normal directory.
4288                                  */
4289                                 if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
4290                                         info->mti_lmu = *lmu;
4291                                         info->mti_lmu.lum_stripe_count = 0;
4292                                         spec->u.sp_ea.eadata = &info->mti_lmu;
4293                                         lmu = spec->u.sp_ea.eadata;
4294                                 }
4295                         } else {
4296                                 /*
4297                                  * sobj is not striped dir, if it's not empty,
4298                                  * it will be migrated to be a stripe of target,
4299                                  * don't destroy it after migration.
4300                                  */
4301                                 do_destroy = false;
4302                         }
4303                 } else if (rc) {
4304                         GOTO(out, rc);
4305                 } else {
4306                         struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
4307
4308                         if (le32_to_cpu(lmv->lmv_hash_type) &
4309                             LMV_HASH_FLAG_MIGRATION) {
4310                                 __u32 lum_stripe_count = lmu->lum_stripe_count;
4311                                 __u32 lmv_hash_type = lmv->lmv_hash_type &
4312                                         cpu_to_le32(LMV_HASH_TYPE_MASK);
4313
4314                                 if (!lum_stripe_count)
4315                                         lum_stripe_count = cpu_to_le32(1);
4316
4317                                 /* TODO: check specific MDTs */
4318                                 if (lmv->lmv_migrate_offset !=
4319                                     lum_stripe_count ||
4320                                     lmv->lmv_master_mdt_index !=
4321                                     lmu->lum_stripe_offset ||
4322                                     (lmv_hash_type != 0 &&
4323                                      lmv_hash_type != lmu->lum_hash_type)) {
4324                                         CERROR("%s: \'"DNAME"\' migration was "
4325                                                 "interrupted, run \'lfs migrate "
4326                                                 "-m %d -c %d -H %d "DNAME"\' to "
4327                                                 "finish migration.\n",
4328                                                 mdd2obd_dev(mdd)->obd_name,
4329                                                 PNAME(lname),
4330                                                 le32_to_cpu(
4331                                                     lmv->lmv_master_mdt_index),
4332                                                 le32_to_cpu(
4333                                                     lmv->lmv_migrate_offset),
4334                                                 le32_to_cpu(lmv_hash_type),
4335                                                 PNAME(lname));
4336                                         GOTO(out, rc = -EPERM);
4337                                 }
4338                                 GOTO(out, rc = -EALREADY);
4339                         }
4340                 }
4341         } else if (!mdd_object_remote(tpobj)) {
4342                 /*
4343                  * if source is already on MDT where target parent is located,
4344                  * no need to create, just update namespace.
4345                  */
4346                 do_create = false;
4347         } else if (S_ISLNK(attr->la_mode)) {
4348                 lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
4349                 if (!sbuf.lb_buf)
4350                         GOTO(out, rc = -ENOMEM);
4351                 rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
4352                 if (rc <= 0) {
4353                         rc = rc ?: -EFAULT;
4354                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
4355                                mdd2obd_dev(mdd)->obd_name,
4356                                PFID(mdo2fid(sobj)), rc);
4357                         GOTO(out, rc);
4358                 }
4359                 spec->u.sp_symname = sbuf.lb_buf;
4360         } else if (S_ISREG(attr->la_mode)) {
4361                 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
4362                 spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
4363         }
4364
4365         /*
4366          * if sobj has link on the same MDT, no need to create, just update
4367          * namespace, and it will be a remote file on target parent, which is
4368          * similar to rename.
4369          */
4370         rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
4371                                     ldata);
4372         if (rc > 0)
4373                 do_create = false;
4374         else if (rc)
4375                 GOTO(out, rc);
4376
4377         rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
4378                                       spattr, tpattr, attr);
4379         if (rc)
4380                 GOTO(out, rc);
4381
4382         mdd_object_make_hint(env, NULL, tobj, attr, spec, hint);
4383
4384         handle = mdd_trans_create(env, mdd);
4385         if (IS_ERR(handle))
4386                 GOTO(out, rc = PTR_ERR(handle));
4387
4388         if (do_create) {
4389                 rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
4390                                                 attr, &sbuf, ldata, spec, hint,
4391                                                 handle);
4392                 if (rc)
4393                         GOTO(stop_trans, rc);
4394         }
4395
4396         rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
4397                                         attr, spattr, tpattr, ldata, do_create,
4398                                         do_destroy, ma, handle);
4399         if (rc)
4400                 GOTO(stop_trans, rc);
4401
4402         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
4403                                          handle);
4404         if (rc)
4405                 GOTO(stop_trans, rc);
4406
4407         rc = mdd_trans_start(env, mdd, handle);
4408         if (rc)
4409                 GOTO(stop_trans, rc);
4410
4411         if (do_create) {
4412                 rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
4413                                         &sbuf, ldata, spec, hint, handle);
4414                 if (rc)
4415                         GOTO(stop_trans, rc);
4416         }
4417
4418         rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
4419                                 spattr, tpattr, ldata, do_create, do_destroy,
4420                                 ma, handle);
4421         if (rc)
4422                 GOTO(stop_trans, rc);
4423
4424         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
4425                                     mdo2fid(spobj), mdo2fid(sobj),
4426                                     mdo2fid(tpobj), lname, lname, handle);
4427         if (rc)
4428                 GOTO(stop_trans, rc);
4429
4430         EXIT;
4431 stop_trans:
4432         rc = mdd_trans_stop(env, mdd, rc, handle);
4433 out:
4434         if (spobj && !IS_ERR(spobj))
4435                 mdd_object_put(env, spobj);
4436         if (tpobj && !IS_ERR(tpobj))
4437                 mdd_object_put(env, tpobj);
4438         lu_buf_free(&sbuf);
4439         lu_buf_free(&pbuf);
4440         return rc;
4441 }
4442
4443 const struct md_dir_operations mdd_dir_ops = {
4444         .mdo_is_subdir     = mdd_is_subdir,
4445         .mdo_lookup        = mdd_lookup,
4446         .mdo_create        = mdd_create,
4447         .mdo_rename        = mdd_rename,
4448         .mdo_link          = mdd_link,
4449         .mdo_unlink        = mdd_unlink,
4450         .mdo_create_data   = mdd_create_data,
4451         .mdo_migrate       = mdd_migrate,
4452 };