Whamcloud - gitweb
LU-10680 mdd: create gc thread when no current transaction
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static struct lu_name lname_dotdot = {
52         .ln_name        = (char *) dotdot,
53         .ln_namelen     = sizeof(dotdot) - 1,
54 };
55
56 static inline int
57 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
58 {
59         if (!lu_name_is_valid(ln))
60                 return -EINVAL;
61         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
62                 return -ENAMETOOLONG;
63         else
64                 return 0;
65 }
66
67 /* Get FID from name and parent */
68 static int
69 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
70              const struct lu_attr *pattr, const struct lu_name *lname,
71              struct lu_fid* fid, int mask)
72 {
73         const char *name                = lname->ln_name;
74         const struct dt_key *key        = (const struct dt_key *)name;
75         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
76         struct mdd_device *m            = mdo2mdd(pobj);
77         struct dt_object *dir           = mdd_object_child(mdd_obj);
78         int rc;
79         ENTRY;
80
81         if (unlikely(mdd_is_dead_obj(mdd_obj)))
82                 RETURN(-ESTALE);
83
84         if (!mdd_object_exists(mdd_obj))
85                 RETURN(-ESTALE);
86
87         if (mdd_object_remote(mdd_obj)) {
88                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
89                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
90         }
91
92         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
93                                             MOR_TGT_PARENT);
94         if (rc)
95                 RETURN(rc);
96
97         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
98                    dt_try_as_dir(env, dir)))
99                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
100         else
101                 rc = -ENOTDIR;
102
103         RETURN(rc);
104 }
105
106 int mdd_lookup(const struct lu_env *env,
107                struct md_object *pobj, const struct lu_name *lname,
108                struct lu_fid *fid, struct md_op_spec *spec)
109 {
110         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
111         int rc;
112         ENTRY;
113
114         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
115         if (rc != 0)
116                 RETURN(rc);
117
118         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
119                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
120         RETURN(rc);
121 }
122
123 /** Read the link EA into a temp buffer.
124  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
125  * A pointer to the buffer is stored in \a ldata::ld_buf.
126  *
127  * \retval 0 or error
128  */
129 static int __mdd_links_read(const struct lu_env *env,
130                             struct mdd_object *mdd_obj,
131                             struct linkea_data *ldata)
132 {
133         int rc;
134
135         if (!mdd_object_exists(mdd_obj))
136                 return -ENODATA;
137
138         /* First try a small buf */
139         LASSERT(env != NULL);
140         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
141                                                PAGE_SIZE);
142         if (ldata->ld_buf->lb_buf == NULL)
143                 return -ENOMEM;
144
145         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
146         if (rc == -ERANGE) {
147                 /* Buf was too small, figure out what we need. */
148                 lu_buf_free(ldata->ld_buf);
149                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
150                                    XATTR_NAME_LINK);
151                 if (rc < 0)
152                         return rc;
153                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
154                 if (ldata->ld_buf->lb_buf == NULL)
155                         return -ENOMEM;
156                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
157                                   XATTR_NAME_LINK);
158         }
159         if (rc < 0) {
160                 lu_buf_free(ldata->ld_buf);
161                 ldata->ld_buf = NULL;
162                 return rc;
163         }
164
165         return linkea_init(ldata);
166 }
167
168 static int mdd_links_read(const struct lu_env *env,
169                           struct mdd_object *mdd_obj,
170                           struct linkea_data *ldata)
171 {
172         int rc;
173
174         rc = __mdd_links_read(env, mdd_obj, ldata);
175         if (!rc)
176                 rc = linkea_init(ldata);
177
178         return rc;
179 }
180
181 static int mdd_links_read_with_rec(const struct lu_env *env,
182                                    struct mdd_object *mdd_obj,
183                                    struct linkea_data *ldata)
184 {
185         int rc;
186
187         rc = __mdd_links_read(env, mdd_obj, ldata);
188         if (!rc)
189                 rc = linkea_init_with_rec(ldata);
190
191         return rc;
192 }
193
194 /**
195  * Get parent FID of the directory
196  *
197  * Read parent FID from linkEA, if that fails, then do lookup
198  * dotdot to get the parent FID.
199  *
200  * \param[in] env       execution environment
201  * \param[in] obj       object from which to find the parent FID
202  * \param[in] attr      attribute of the object
203  * \param[out] fid      fid to get the parent FID
204  *
205  * \retval              0 if getting the parent FID succeeds.
206  * \retval              negative errno if getting the parent FID fails.
207  **/
208 static inline int mdd_parent_fid(const struct lu_env *env,
209                                  struct mdd_object *obj,
210                                  const struct lu_attr *attr,
211                                  struct lu_fid *fid)
212 {
213         struct mdd_thread_info  *info = mdd_env_info(env);
214         struct linkea_data      ldata = { NULL };
215         struct lu_buf           *buf = &info->mti_link_buf;
216         struct lu_name          lname;
217         int                     rc = 0;
218
219         ENTRY;
220
221         LASSERT(S_ISDIR(mdd_object_type(obj)));
222
223         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
224         if (buf->lb_buf == NULL)
225                 GOTO(lookup, rc = 0);
226
227         ldata.ld_buf = buf;
228         rc = mdd_links_read_with_rec(env, obj, &ldata);
229         if (rc != 0)
230                 GOTO(lookup, rc);
231
232         LASSERT(ldata.ld_leh != NULL);
233         /* Directory should only have 1 parent */
234         if (ldata.ld_leh->leh_reccount > 1)
235                 GOTO(lookup, rc);
236
237         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
238
239         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
240         if (likely(fid_is_sane(fid)))
241                 RETURN(0);
242 lookup:
243         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
244         RETURN(rc);
245 }
246
247 /*
248  * For root fid use special function, which does not compare version component
249  * of fid. Version component is different for root fids on all MDTs.
250  */
251 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
252 {
253         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
254                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
255 }
256
257 /*
258  * return 1: if lf is the fid of the ancestor of p1;
259  * return 0: if not;
260  *
261  * return -EREMOTE: if remote object is found, in this
262  * case fid of remote object is saved to @pf;
263  *
264  * otherwise: values < 0, errors.
265  */
266 static int mdd_is_parent(const struct lu_env *env,
267                         struct mdd_device *mdd,
268                         struct mdd_object *p1,
269                         const struct lu_attr *attr,
270                         const struct lu_fid *lf,
271                         struct lu_fid *pf)
272 {
273         struct mdd_object *parent = NULL;
274         struct lu_fid *pfid;
275         int rc;
276         ENTRY;
277
278         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
279         pfid = &mdd_env_info(env)->mti_fid;
280
281         /* Check for root first. */
282         if (mdd_is_root(mdd, mdo2fid(p1)))
283                 RETURN(0);
284
285         for(;;) {
286                 /* this is done recursively */
287                 rc = mdd_parent_fid(env, p1, attr, pfid);
288                 if (rc)
289                         GOTO(out, rc);
290                 if (mdd_is_root(mdd, pfid))
291                         GOTO(out, rc = 0);
292                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
293                         GOTO(out, rc = 0);
294                 if (lu_fid_eq(pfid, lf))
295                         GOTO(out, rc = 1);
296                 if (parent != NULL)
297                         mdd_object_put(env, parent);
298
299                 parent = mdd_object_find(env, mdd, pfid);
300                 if (IS_ERR(parent))
301                         GOTO(out, rc = PTR_ERR(parent));
302
303                 if (!mdd_object_exists(parent))
304                         GOTO(out, rc = -EINVAL);
305
306                 p1 = parent;
307         }
308         EXIT;
309 out:
310         if (parent && !IS_ERR(parent))
311                 mdd_object_put(env, parent);
312         return rc;
313 }
314
315 /*
316  * No permission check is needed.
317  *
318  * returns 1: if fid is ancestor of @mo;
319  * returns 0: if fid is not an ancestor of @mo;
320  *
321  * returns EREMOTE if remote object is found, fid of remote object is saved to
322  * @fid;
323  *
324  * returns < 0: if error
325  */
326 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
327                   const struct lu_fid *fid, struct lu_fid *sfid)
328 {
329         struct mdd_device *mdd = mdo2mdd(mo);
330         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
331         int rc;
332         ENTRY;
333
334         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
335                 RETURN(0);
336
337         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
338         if (rc != 0)
339                 RETURN(rc);
340
341         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
342         if (rc == 0) {
343                 /* found root */
344                 fid_zero(sfid);
345         } else if (rc == 1) {
346                 /* found @fid is parent */
347                 *sfid = *fid;
348                 rc = 0;
349         }
350         RETURN(rc);
351 }
352
353 /*
354  * Check that @dir contains no entries except (possibly) dot and dotdot.
355  *
356  * Returns:
357  *
358  *             0        empty
359  *      -ENOTDIR        not a directory object
360  *    -ENOTEMPTY        not empty
361  *           -ve        other error
362  *
363  */
364 static int mdd_dir_is_empty(const struct lu_env *env,
365                             struct mdd_object *dir)
366 {
367         struct dt_it     *it;
368         struct dt_object *obj;
369         const struct dt_it_ops *iops;
370         int result;
371         ENTRY;
372
373         obj = mdd_object_child(dir);
374         if (!dt_try_as_dir(env, obj))
375                 RETURN(-ENOTDIR);
376
377         iops = &obj->do_index_ops->dio_it;
378         it = iops->init(env, obj, LUDA_64BITHASH);
379         if (!IS_ERR(it)) {
380                 result = iops->get(env, it, (const struct dt_key *)"");
381                 if (result > 0) {
382                         int i;
383                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
384                                 result = iops->next(env, it);
385                         if (result == 0)
386                                 result = -ENOTEMPTY;
387                         else if (result == 1)
388                                 result = 0;
389                 } else if (result == 0)
390                         /*
391                          * Huh? Index contains no zero key?
392                          */
393                         result = -EIO;
394
395                 iops->put(env, it);
396                 iops->fini(env, it);
397         } else
398                 result = PTR_ERR(it);
399         RETURN(result);
400 }
401
402 /**
403  * Determine if the target object can be hard linked, and right now it only
404  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
405  * directory nlink count might exceed the maximum link count(see
406  * osd_object_ref_add), so it only check nlink for non-directories.
407  *
408  * \param[in] env       thread environment
409  * \param[in] obj       object being linked to
410  * \param[in] la        attributes of \a obj
411  *
412  * \retval              0 if \a obj can be hard linked
413  * \retval              negative error if \a obj is a directory or has too
414  *                      many links
415  */
416 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
417                           const struct lu_attr *la)
418 {
419         struct mdd_device *m = mdd_obj2mdd_dev(obj);
420         ENTRY;
421
422         LASSERT(la != NULL);
423
424         /* Subdir count limitation can be broken through
425          * (see osd_object_ref_add), so only check non-directory here. */
426         if (!S_ISDIR(la->la_mode) &&
427             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
428                 RETURN(-EMLINK);
429
430         RETURN(0);
431 }
432
433 /**
434  * Check whether it may create the cobj under the pobj.
435  *
436  * \param[in] env       execution environment
437  * \param[in] pobj      the parent directory
438  * \param[in] pattr     the attribute of the parent directory
439  * \param[in] cobj      the child to be created
440  * \param[in] check_perm        if check WRITE|EXEC permission for parent
441  *
442  * \retval              = 0 create the child under this dir is allowed
443  * \retval              negative errno create the child under this dir is
444  *                      not allowed
445  */
446 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
447                    const struct lu_attr *pattr, struct mdd_object *cobj,
448                    bool check_perm)
449 {
450         int rc = 0;
451         ENTRY;
452
453         if (cobj && mdd_object_exists(cobj))
454                 RETURN(-EEXIST);
455
456         if (mdd_is_dead_obj(pobj))
457                 RETURN(-ENOENT);
458
459         if (check_perm)
460                 rc = mdd_permission_internal_locked(env, pobj, pattr,
461                                                     MAY_WRITE | MAY_EXEC,
462                                                     MOR_TGT_PARENT);
463         RETURN(rc);
464 }
465
466 /*
467  * Check whether can unlink from the pobj in the case of "cobj == NULL".
468  */
469 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
470                    const struct lu_attr *pattr, const struct lu_attr *attr)
471 {
472         int rc;
473         ENTRY;
474
475         if (mdd_is_dead_obj(pobj))
476                 RETURN(-ENOENT);
477
478         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
479                 RETURN(-EPERM);
480
481         rc = mdd_permission_internal_locked(env, pobj, pattr,
482                                             MAY_WRITE | MAY_EXEC,
483                                             MOR_TGT_PARENT);
484         if (rc != 0)
485                 RETURN(rc);
486
487         if (pattr->la_flags & LUSTRE_APPEND_FL)
488                 RETURN(-EPERM);
489
490         RETURN(rc);
491 }
492
493 /*
494  * pobj == NULL is remote ops case, under such case, pobj's
495  * VTX feature has been checked already, no need check again.
496  */
497 static inline int mdd_is_sticky(const struct lu_env *env,
498                                 struct mdd_object *pobj,
499                                 const struct lu_attr *pattr,
500                                 struct mdd_object *cobj,
501                                 const struct lu_attr *cattr)
502 {
503         struct lu_ucred *uc = lu_ucred_assert(env);
504
505         if (pobj != NULL) {
506                 LASSERT(pattr != NULL);
507                 if (!(pattr->la_mode & S_ISVTX) ||
508                     (pattr->la_uid == uc->uc_fsuid))
509                         return 0;
510         }
511
512         LASSERT(cattr != NULL);
513         if (cattr->la_uid == uc->uc_fsuid)
514                 return 0;
515
516         return !md_capable(uc, CFS_CAP_FOWNER);
517 }
518
519 static int mdd_may_delete_entry(const struct lu_env *env,
520                                 struct mdd_object *pobj,
521                                 const struct lu_attr *pattr,
522                                 int check_perm)
523 {
524         ENTRY;
525
526         LASSERT(pobj != NULL);
527         if (!mdd_object_exists(pobj))
528                 RETURN(-ENOENT);
529
530         if (mdd_is_dead_obj(pobj))
531                 RETURN(-ENOENT);
532
533         if (check_perm) {
534                 int rc;
535                 rc = mdd_permission_internal_locked(env, pobj, pattr,
536                                             MAY_WRITE | MAY_EXEC,
537                                             MOR_TGT_PARENT);
538                 if (rc)
539                         RETURN(rc);
540         }
541
542         if (pattr->la_flags & LUSTRE_APPEND_FL)
543                 RETURN(-EPERM);
544
545         RETURN(0);
546 }
547
548 /*
549  * Check whether it may delete the cobj from the pobj.
550  * pobj maybe NULL
551  */
552 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
553                    const struct lu_attr *tpattr, struct mdd_object *tobj,
554                    const struct lu_attr *tattr, const struct lu_attr *cattr,
555                    int check_perm, int check_empty)
556 {
557         int rc = 0;
558         ENTRY;
559
560         if (tpobj) {
561                 LASSERT(tpattr != NULL);
562                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
563                 if (rc != 0)
564                         RETURN(rc);
565         }
566
567         if (tobj == NULL)
568                 RETURN(0);
569
570         if (!mdd_object_exists(tobj))
571                 RETURN(-ENOENT);
572
573         if (mdd_is_dead_obj(tobj))
574                 RETURN(-ESTALE);
575
576         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
577                 RETURN(-EPERM);
578
579         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
580                 RETURN(-EPERM);
581
582         /* additional check the rename case */
583         if (cattr) {
584                 if (S_ISDIR(cattr->la_mode)) {
585                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
586
587                         if (!S_ISDIR(tattr->la_mode))
588                                 RETURN(-ENOTDIR);
589
590                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
591                                 RETURN(-EBUSY);
592                 } else if (S_ISDIR(tattr->la_mode))
593                         RETURN(-EISDIR);
594         }
595
596         if (S_ISDIR(tattr->la_mode) && check_empty)
597                 rc = mdd_dir_is_empty(env, tobj);
598
599         RETURN(rc);
600 }
601
602 /**
603  * Check whether it can create the link file(linked to @src_obj) under
604  * the target directory(@tgt_obj), and src_obj has been locked by
605  * mdd_write_lock.
606  *
607  * \param[in] env       execution environment
608  * \param[in] tgt_obj   the target directory
609  * \param[in] tattr     attributes of target directory
610  * \param[in] lname     the link name
611  * \param[in] src_obj   source object for link
612  * \param[in] cattr     attributes for source object
613  *
614  * \retval              = 0 it is allowed to create the link file under tgt_obj
615  * \retval              negative error not allowed to create the link file
616  */
617 static int mdd_link_sanity_check(const struct lu_env *env,
618                                  struct mdd_object *tgt_obj,
619                                  const struct lu_attr *tattr,
620                                  const struct lu_name *lname,
621                                  struct mdd_object *src_obj,
622                                  const struct lu_attr *cattr)
623 {
624         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
625         int rc = 0;
626         ENTRY;
627
628         if (!mdd_object_exists(src_obj))
629                 RETURN(-ENOENT);
630
631         if (mdd_is_dead_obj(src_obj))
632                 RETURN(-ESTALE);
633
634         /* Local ops, no lookup before link, check filename length here. */
635         rc = mdd_name_check(m, lname);
636         if (rc < 0)
637                 RETURN(rc);
638
639         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
640                 RETURN(-EPERM);
641
642         if (S_ISDIR(mdd_object_type(src_obj)))
643                 RETURN(-EPERM);
644
645         LASSERT(src_obj != tgt_obj);
646         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
647         if (rc != 0)
648                 RETURN(rc);
649
650         rc = __mdd_may_link(env, src_obj, cattr);
651
652         RETURN(rc);
653 }
654
655 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
656                                    const char *name, struct thandle *handle)
657 {
658         struct dt_object *next = mdd_object_child(pobj);
659         int rc;
660         ENTRY;
661
662         if (dt_try_as_dir(env, next))
663                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
664         else
665                 rc = -ENOTDIR;
666
667         RETURN(rc);
668 }
669
670 static int __mdd_index_insert_only(const struct lu_env *env,
671                                    struct mdd_object *pobj,
672                                    const struct lu_fid *lf, __u32 type,
673                                    const char *name,
674                                    struct thandle *handle)
675 {
676         struct dt_object *next = mdd_object_child(pobj);
677         int               rc;
678         ENTRY;
679
680         if (dt_try_as_dir(env, next)) {
681                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
682                 struct lu_ucred         *uc  = lu_ucred_check(env);
683                 int                      ignore_quota;
684
685                 rec->rec_fid = lf;
686                 rec->rec_type = type;
687                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
688                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
689                                (const struct dt_key *)name, handle,
690                                ignore_quota);
691         } else {
692                 rc = -ENOTDIR;
693         }
694         RETURN(rc);
695 }
696
697 /* insert named index, add reference if isdir */
698 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
699                               const struct lu_fid *lf, __u32 type,
700                               const char *name, struct thandle *handle)
701 {
702         int rc;
703         ENTRY;
704
705         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
706         if (rc == 0 && S_ISDIR(type)) {
707                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
708                 mdo_ref_add(env, pobj, handle);
709                 mdd_write_unlock(env, pobj);
710         }
711
712         RETURN(rc);
713 }
714
715 /* delete named index, drop reference if isdir */
716 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
717                               const char *name, int is_dir,
718                               struct thandle *handle)
719 {
720         int               rc;
721         ENTRY;
722
723         rc = __mdd_index_delete_only(env, pobj, name, handle);
724         if (rc == 0 && is_dir) {
725                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
726                 mdo_ref_del(env, pobj, handle);
727                 mdd_write_unlock(env, pobj);
728         }
729
730         RETURN(rc);
731 }
732
733 static int mdd_llog_record_calc_size(const struct lu_env *env,
734                                      const struct lu_name *tname,
735                                      const struct lu_name *sname)
736 {
737         const struct lu_ucred   *uc = lu_ucred(env);
738         enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
739         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
740
741         if (sname != NULL)
742                 crf |= CLF_RENAME;
743
744         if (uc != NULL && uc->uc_jobid[0] != '\0')
745                 crf |= CLF_JOBID;
746
747         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
748                              changelog_rec_offset(crf, crfe) +
749                              (tname != NULL ? tname->ln_namelen : 0) +
750                              (sname != NULL ? 1 + sname->ln_namelen : 0));
751 }
752
753 int mdd_declare_changelog_store(const struct lu_env *env,
754                                 struct mdd_device *mdd,
755                                 enum changelog_rec_type type,
756                                 const struct lu_name *tname,
757                                 const struct lu_name *sname,
758                                 struct thandle *handle)
759 {
760         struct obd_device               *obd = mdd2obd_dev(mdd);
761         struct llog_ctxt                *ctxt;
762         struct llog_changelog_rec       *rec;
763         struct lu_buf                   *buf;
764         struct thandle                  *llog_th;
765         int                              reclen;
766         int                              rc;
767
768         if (!mdd_changelog_enabled(env, mdd, type))
769                 return 0;
770
771         reclen = mdd_llog_record_calc_size(env, tname, sname);
772         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
773         if (buf->lb_buf == NULL)
774                 return -ENOMEM;
775
776         rec = buf->lb_buf;
777         rec->cr_hdr.lrh_len = reclen;
778         rec->cr_hdr.lrh_type = CHANGELOG_REC;
779
780         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
781         if (ctxt == NULL)
782                 return -ENXIO;
783
784         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
785         if (IS_ERR(llog_th))
786                 GOTO(out_put, rc = PTR_ERR(llog_th));
787
788         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
789
790 out_put:
791         llog_ctxt_put(ctxt);
792
793         return rc;
794 }
795
796 /** Add a changelog entry \a rec to the changelog llog
797  * \param mdd
798  * \param rec
799  * \param handle - currently ignored since llogs start their own transaction;
800  *              this will hopefully be fixed in llog rewrite
801  * \retval 0 ok
802  */
803 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
804                         struct llog_changelog_rec *rec, struct thandle *th)
805 {
806         struct obd_device       *obd = mdd2obd_dev(mdd);
807         struct llog_ctxt        *ctxt;
808         struct thandle          *llog_th;
809         int                      rc;
810
811         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
812                                             changelog_rec_varsize(&rec->cr));
813
814         /* llog_lvfs_write_rec sets the llog tail len */
815         rec->cr_hdr.lrh_type = CHANGELOG_REC;
816         rec->cr.cr_time = cl_time();
817
818         spin_lock(&mdd->mdd_cl.mc_lock);
819         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
820          * but as long as the MDD transactions are ordered correctly for e.g.
821          * rename conflicts, I don't think this should matter. */
822         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
823         spin_unlock(&mdd->mdd_cl.mc_lock);
824
825         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
826         if (ctxt == NULL)
827                 return -ENXIO;
828
829         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
830         if (IS_ERR(llog_th))
831                 GOTO(out_put, rc = PTR_ERR(llog_th));
832
833         /* nested journal transaction */
834         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
835
836         /* time to recover some space ?? */
837         if (likely(!mdd->mdd_changelog_gc ||
838                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
839                    mdd->mdd_changelog_min_gc_interval >=
840                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
841                 /* save a spin_lock trip */
842                 goto out_put;
843         spin_lock(&mdd->mdd_cl.mc_lock);
844         if (likely(mdd->mdd_changelog_gc &&
845                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
846                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
847                         mdd->mdd_changelog_min_gc_interval)) {
848                 if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
849                              mdd->mdd_changelog_min_free_cat_entries ||
850                              OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
851                         CWARN("%s:%s low on changelog_catalog free entries, "
852                               "starting ChangeLog garbage collection thread\n",
853                               obd->obd_name,
854                               OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
855                                 " simulate" : "");
856
857                         /* indicate further kthread run will occur outside
858                          * right after current journal transaction filling has
859                          * completed
860                          */
861                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
862                 }
863                 /* next check in mdd_changelog_min_gc_interval anyway
864                  */
865                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
866         }
867         spin_unlock(&mdd->mdd_cl.mc_lock);
868 out_put:
869         llog_ctxt_put(ctxt);
870         if (rc > 0)
871                 rc = 0;
872         return rc;
873 }
874
875 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
876                                          const struct lu_fid *sfid,
877                                          const struct lu_fid *spfid,
878                                          const struct lu_name *sname)
879 {
880         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
881         size_t extsize;
882
883         LASSERT(sfid != NULL);
884         LASSERT(spfid != NULL);
885         LASSERT(sname != NULL);
886
887         extsize = sname->ln_namelen + 1;
888
889         rnm->cr_sfid = *sfid;
890         rnm->cr_spfid = *spfid;
891
892         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
893         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
894         rec->cr_namelen += extsize;
895 }
896
897 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
898 {
899         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
900
901         if (jobid == NULL || jobid[0] == '\0')
902                 return;
903
904         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
905 }
906
907 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
908 {
909         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
910
911         ef->cr_extra_flags = eflags;
912 }
913
914 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
915                                     __u64 uid, __u64 gid)
916 {
917         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
918
919         uidgid->cr_uid = uid;
920         uidgid->cr_gid = gid;
921 }
922
923 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
924                                  lnet_nid_t nid)
925 {
926         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
927
928         clnid->cr_nid = nid;
929 }
930
931 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, int flags)
932 {
933         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
934
935         omd->cr_openflags = (__u32)flags;
936 }
937
938 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
939                                    const char *xattr_name)
940 {
941         struct changelog_ext_xattr    *xattr = changelog_rec_xattr(rec);
942
943         strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
944 }
945
946 /** Store a namespace change changelog record
947  * If this fails, we must fail the whole transaction; we don't
948  * want the change to commit without the log entry.
949  * \param target - mdd_object of change
950  * \param tpfid - target parent dir/object fid
951  * \param sfid - source object fid
952  * \param spfid - source parent fid
953  * \param tname - target name string
954  * \param sname - source name string
955  * \param handle - transaction handle
956  */
957 int mdd_changelog_ns_store(const struct lu_env *env,
958                            struct mdd_device *mdd,
959                            enum changelog_rec_type type,
960                            enum changelog_rec_flags crf,
961                            struct mdd_object *target,
962                            const struct lu_fid *tpfid,
963                            const struct lu_fid *sfid,
964                            const struct lu_fid *spfid,
965                            const struct lu_name *tname,
966                            const struct lu_name *sname,
967                            struct thandle *handle)
968 {
969         const struct lu_ucred           *uc = lu_ucred(env);
970         struct llog_changelog_rec       *rec;
971         struct lu_buf                   *buf;
972         int                              reclen;
973         __u64                            xflags = CLFE_INVALID;
974         int                              rc;
975         ENTRY;
976
977         if (!mdd_changelog_enabled(env, mdd, type))
978                 RETURN(0);
979
980         LASSERT(tpfid != NULL);
981         LASSERT(tname != NULL);
982         LASSERT(handle != NULL);
983
984         reclen = mdd_llog_record_calc_size(env, tname, sname);
985         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
986         if (buf->lb_buf == NULL)
987                 RETURN(-ENOMEM);
988         rec = buf->lb_buf;
989
990         crf &= CLF_FLAGMASK;
991         crf |= CLF_EXTRA_FLAGS;
992
993         if (uc) {
994                 if (uc->uc_jobid[0] != '\0')
995                         crf |= CLF_JOBID;
996                 xflags |= CLFE_UIDGID;
997                 xflags |= CLFE_NID;
998         }
999
1000         if (sname != NULL)
1001                 crf |= CLF_RENAME;
1002         else
1003                 crf |= CLF_VERSION;
1004
1005         rec->cr.cr_flags = crf;
1006
1007         if (crf & CLF_EXTRA_FLAGS) {
1008                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
1009                 if (xflags & CLFE_UIDGID)
1010                         mdd_changelog_rec_extra_uidgid(&rec->cr,
1011                                                        uc->uc_uid, uc->uc_gid);
1012                 if (xflags & CLFE_NID)
1013                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
1014         }
1015
1016         rec->cr.cr_type = (__u32)type;
1017         rec->cr.cr_pfid = *tpfid;
1018         rec->cr.cr_namelen = tname->ln_namelen;
1019         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1020
1021         if (crf & CLF_RENAME)
1022                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1023
1024         if (crf & CLF_JOBID)
1025                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1026
1027         if (likely(target != NULL)) {
1028                 rec->cr.cr_tfid = *mdo2fid(target);
1029                 target->mod_cltime = ktime_get();
1030         } else {
1031                 fid_zero(&rec->cr.cr_tfid);
1032         }
1033
1034         rc = mdd_changelog_store(env, mdd, rec, handle);
1035         if (rc < 0) {
1036                 CERROR("%s: cannot store changelog record: type = %d, "
1037                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1038                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1039                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1040                 return -EFAULT;
1041         }
1042
1043         return 0;
1044 }
1045
1046 static int __mdd_links_add(const struct lu_env *env,
1047                            struct mdd_object *mdd_obj,
1048                            struct linkea_data *ldata,
1049                            const struct lu_name *lname,
1050                            const struct lu_fid *pfid,
1051                            int first, int check)
1052 {
1053         int rc;
1054
1055         if (ldata->ld_leh == NULL) {
1056                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1057                 if (rc) {
1058                         if (rc != -ENODATA)
1059                                 return rc;
1060                         rc = linkea_data_new(ldata,
1061                                              &mdd_env_info(env)->mti_link_buf);
1062                         if (rc)
1063                                 return rc;
1064                 }
1065         }
1066
1067         if (check) {
1068                 rc = linkea_links_find(ldata, lname, pfid);
1069                 if (rc && rc != -ENOENT)
1070                         return rc;
1071                 if (rc == 0)
1072                         return -EEXIST;
1073         }
1074
1075         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1076                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1077
1078                 *tfid = *pfid;
1079                 tfid->f_ver = ~0;
1080                 linkea_add_buf(ldata, lname, tfid);
1081         }
1082
1083         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1084                 linkea_add_buf(ldata, lname, pfid);
1085
1086         return linkea_add_buf(ldata, lname, pfid);
1087 }
1088
1089 static int __mdd_links_del(const struct lu_env *env,
1090                            struct mdd_object *mdd_obj,
1091                            struct linkea_data *ldata,
1092                            const struct lu_name *lname,
1093                            const struct lu_fid *pfid)
1094 {
1095         int rc;
1096
1097         if (ldata->ld_leh == NULL) {
1098                 rc = mdd_links_read(env, mdd_obj, ldata);
1099                 if (rc)
1100                         return rc;
1101         }
1102
1103         rc = linkea_links_find(ldata, lname, pfid);
1104         if (rc)
1105                 return rc;
1106
1107         linkea_del_buf(ldata, lname);
1108         return 0;
1109 }
1110
1111 static int mdd_linkea_prepare(const struct lu_env *env,
1112                               struct mdd_object *mdd_obj,
1113                               const struct lu_fid *oldpfid,
1114                               const struct lu_name *oldlname,
1115                               const struct lu_fid *newpfid,
1116                               const struct lu_name *newlname,
1117                               int first, int check,
1118                               struct linkea_data *ldata)
1119 {
1120         int rc = 0;
1121         ENTRY;
1122
1123         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1124                 RETURN(0);
1125
1126         LASSERT(oldpfid != NULL || newpfid != NULL);
1127
1128         if (mdd_obj->mod_flags & DEAD_OBJ)
1129                 /* Unnecessary to update linkEA for dead object.  */
1130                 RETURN(0);
1131
1132         if (oldpfid != NULL) {
1133                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1134                 if (rc) {
1135                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1136                                 RETURN(rc);
1137
1138                         /* No changes done. */
1139                         rc = 0;
1140                 }
1141         }
1142
1143         /* If renaming, add the new record */
1144         if (newpfid != NULL)
1145                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1146                                      first, check);
1147
1148         RETURN(rc);
1149 }
1150
1151 int mdd_links_rename(const struct lu_env *env,
1152                      struct mdd_object *mdd_obj,
1153                      const struct lu_fid *oldpfid,
1154                      const struct lu_name *oldlname,
1155                      const struct lu_fid *newpfid,
1156                      const struct lu_name *newlname,
1157                      struct thandle *handle,
1158                      struct linkea_data *ldata,
1159                      int first, int check)
1160 {
1161         int rc = 0;
1162         ENTRY;
1163
1164         if (ldata == NULL) {
1165                 ldata = &mdd_env_info(env)->mti_link_data;
1166                 memset(ldata, 0, sizeof(*ldata));
1167                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1168                                         newpfid, newlname, first, check, ldata);
1169                 if (rc)
1170                         GOTO(out, rc);
1171         }
1172
1173         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1174                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1175
1176         GOTO(out, rc);
1177
1178 out:
1179         if (rc != 0) {
1180                 if (newlname == NULL)
1181                         CERROR("link_ea add failed %d "DFID"\n",
1182                                rc, PFID(mdd_object_fid(mdd_obj)));
1183                 else if (oldpfid == NULL)
1184                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1185                                newlname->ln_namelen, newlname->ln_name, rc,
1186                                PFID(mdd_object_fid(mdd_obj)));
1187                 else if (newpfid == NULL)
1188                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1189                                oldlname->ln_namelen, oldlname->ln_name, rc,
1190                                PFID(mdd_object_fid(mdd_obj)));
1191                 else
1192                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1193                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1194                                newlname->ln_namelen, newlname->ln_name, rc,
1195                                PFID(mdd_object_fid(mdd_obj)));
1196         }
1197
1198         if (is_vmalloc_addr(ldata->ld_buf))
1199                 /* if we vmalloced a large buffer drop it */
1200                 lu_buf_free(ldata->ld_buf);
1201
1202         return rc;
1203 }
1204
1205 static inline int mdd_links_add(const struct lu_env *env,
1206                                 struct mdd_object *mdd_obj,
1207                                 const struct lu_fid *pfid,
1208                                 const struct lu_name *lname,
1209                                 struct thandle *handle,
1210                                 struct linkea_data *ldata, int first)
1211 {
1212         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1213                                 pfid, lname, handle, ldata, first, 0);
1214 }
1215
1216 static inline int mdd_links_del(const struct lu_env *env,
1217                                 struct mdd_object *mdd_obj,
1218                                 const struct lu_fid *pfid,
1219                                 const struct lu_name *lname,
1220                                 struct thandle *handle)
1221 {
1222         return mdd_links_rename(env, mdd_obj, pfid, lname,
1223                                 NULL, NULL, handle, NULL, 0, 0);
1224 }
1225
1226 /** Read the link EA into a temp buffer.
1227  * Uses the name_buf since it is generally large.
1228  * \retval IS_ERR err
1229  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1230  */
1231 struct lu_buf *mdd_links_get(const struct lu_env *env,
1232                              struct mdd_object *mdd_obj)
1233 {
1234         struct linkea_data ldata = { NULL };
1235         int rc;
1236
1237         rc = mdd_links_read(env, mdd_obj, &ldata);
1238         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1239 }
1240
1241 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1242                     struct linkea_data *ldata, struct thandle *handle)
1243 {
1244         const struct lu_buf *buf;
1245         int                 rc;
1246
1247         if (ldata == NULL || ldata->ld_buf == NULL ||
1248             ldata->ld_leh == NULL)
1249                 return 0;
1250
1251         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1252                 return 0;
1253
1254 again:
1255         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1256                                 ldata->ld_leh->leh_len);
1257         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1258         if (unlikely(rc == -ENOSPC)) {
1259                 rc = linkea_overflow_shrink(ldata);
1260                 if (likely(rc > 0))
1261                         goto again;
1262         }
1263
1264         return rc;
1265 }
1266
1267 static int mdd_declare_links_add(const struct lu_env *env,
1268                                  struct mdd_object *mdd_obj,
1269                                  struct thandle *handle,
1270                                  struct linkea_data *ldata)
1271 {
1272         int     rc;
1273         int     ea_len;
1274         void    *linkea;
1275
1276         if (ldata != NULL && ldata->ld_leh != NULL) {
1277                 ea_len = ldata->ld_leh->leh_len;
1278                 linkea = ldata->ld_buf->lb_buf;
1279         } else {
1280                 ea_len = MAX_LINKEA_SIZE;
1281                 linkea = NULL;
1282         }
1283
1284         rc = mdo_declare_xattr_set(env, mdd_obj,
1285                                    mdd_buf_get_const(env, linkea, ea_len),
1286                                    XATTR_NAME_LINK, 0, handle);
1287
1288         return rc;
1289 }
1290
1291 static inline int mdd_declare_links_del(const struct lu_env *env,
1292                                         struct mdd_object *c,
1293                                         struct thandle *handle)
1294 {
1295         int rc = 0;
1296
1297         /* For directory, the linkEA will be removed together
1298          * with the object. */
1299         if (!S_ISDIR(mdd_object_type(c)))
1300                 rc = mdd_declare_links_add(env, c, handle, NULL);
1301
1302         return rc;
1303 }
1304
1305 static int mdd_declare_link(const struct lu_env *env,
1306                             struct mdd_device *mdd,
1307                             struct mdd_object *p,
1308                             struct mdd_object *c,
1309                             const struct lu_name *name,
1310                             struct thandle *handle,
1311                             struct lu_attr *la,
1312                             struct linkea_data *data)
1313 {
1314         struct lu_fid tfid = *mdo2fid(c);
1315         int rc;
1316
1317         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1318                 tfid.f_oid = cfs_fail_val;
1319
1320         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1321                                       name->ln_name, handle);
1322         if (rc != 0)
1323                 return rc;
1324
1325         rc = mdo_declare_ref_add(env, c, handle);
1326         if (rc != 0)
1327                 return rc;
1328
1329         la->la_valid = LA_CTIME | LA_MTIME;
1330         rc = mdo_declare_attr_set(env, p, la, handle);
1331         if (rc != 0)
1332                 return rc;
1333
1334         la->la_valid = LA_CTIME;
1335         rc = mdo_declare_attr_set(env, c, la, handle);
1336         if (rc != 0)
1337                 return rc;
1338
1339         rc = mdd_declare_links_add(env, c, handle, data);
1340         if (rc != 0)
1341                 return rc;
1342
1343         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1344                                          handle);
1345
1346         return rc;
1347 }
1348
1349 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1350                     struct md_object *src_obj, const struct lu_name *lname,
1351                     struct md_attr *ma)
1352 {
1353         const char *name = lname->ln_name;
1354         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1355         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1356         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1357         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1358         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1359         struct mdd_device *mdd = mdo2mdd(src_obj);
1360         struct thandle *handle;
1361         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1362         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1363         int rc;
1364         ENTRY;
1365
1366         rc = mdd_la_get(env, mdd_sobj, cattr);
1367         if (rc != 0)
1368                 RETURN(rc);
1369
1370         rc = mdd_la_get(env, mdd_tobj, tattr);
1371         if (rc != 0)
1372                 RETURN(rc);
1373
1374         /*
1375          * If we are using project inheritance, we only allow hard link
1376          * creation in our tree when the project IDs are the same;
1377          * otherwise the tree quota mechanism could be circumvented.
1378          */
1379         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1380             (tattr->la_projid != cattr->la_projid))
1381                 RETURN(-EXDEV);
1382
1383         handle = mdd_trans_create(env, mdd);
1384         if (IS_ERR(handle))
1385                 GOTO(out_pending, rc = PTR_ERR(handle));
1386
1387         memset(ldata, 0, sizeof(*ldata));
1388
1389         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1390         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1391
1392         /* Note: even this function will change ldata, but it comes from
1393          * thread_info, which is completely temporary and only seen in
1394          * this function, so we do not need reset ldata once it fails.*/
1395         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1396                                 lname, 0, 0, ldata);
1397         if (rc != 0)
1398                 GOTO(stop, rc);
1399
1400         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1401                               la, ldata);
1402         if (rc)
1403                 GOTO(stop, rc);
1404
1405         rc = mdd_trans_start(env, mdd, handle);
1406         if (rc)
1407                 GOTO(stop, rc);
1408
1409         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1410         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1411                                    cattr);
1412         if (rc)
1413                 GOTO(out_unlock, rc);
1414
1415         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1416                 rc = mdo_ref_add(env, mdd_sobj, handle);
1417                 if (rc != 0)
1418                         GOTO(out_unlock, rc);
1419         }
1420
1421         *tfid = *mdo2fid(mdd_sobj);
1422         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1423                 tfid->f_oid = cfs_fail_val;
1424
1425         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1426                                      mdd_object_type(mdd_sobj), name, handle);
1427         if (rc != 0) {
1428                 mdo_ref_del(env, mdd_sobj, handle);
1429                 GOTO(out_unlock, rc);
1430         }
1431
1432         la->la_valid = LA_CTIME | LA_MTIME;
1433         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1434         if (rc)
1435                 GOTO(out_unlock, rc);
1436
1437         la->la_valid = LA_CTIME;
1438         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1439         if (rc == 0)
1440                 /* Note: The failure of links_add should not cause the
1441                  * link failure, so do not check return value. */
1442                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1443                               lname, handle, ldata, 0);
1444
1445         EXIT;
1446 out_unlock:
1447         mdd_write_unlock(env, mdd_sobj);
1448         if (rc == 0)
1449                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1450                                             mdo2fid(mdd_tobj), NULL, NULL,
1451                                             lname, NULL, handle);
1452 stop:
1453         rc = mdd_trans_stop(env, mdd, rc, handle);
1454         if (is_vmalloc_addr(ldata->ld_buf))
1455                 /* if we vmalloced a large buffer drop it */
1456                 lu_buf_free(ldata->ld_buf);
1457 out_pending:
1458         return rc;
1459 }
1460
1461 static int mdd_mark_orphan_object(const struct lu_env *env,
1462                                 struct mdd_object *obj, struct thandle *handle,
1463                                 bool declare)
1464 {
1465         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1466         int rc;
1467
1468         if (!S_ISDIR(mdd_object_type(obj)))
1469                 return 0;
1470
1471         attr->la_valid = LA_FLAGS;
1472         attr->la_flags = LUSTRE_ORPHAN_FL;
1473
1474         if (declare)
1475                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1476         else
1477                 rc = mdo_attr_set(env, obj, attr, handle);
1478
1479         return rc;
1480 }
1481
1482 static int mdd_declare_finish_unlink(const struct lu_env *env,
1483                                      struct mdd_object *obj,
1484                                      struct thandle *handle)
1485 {
1486         int rc;
1487
1488         /* Sigh, we do not know if the unlink object will become orphan in
1489          * declare phase, but fortunately the flags here does not matter
1490          * in current declare implementation */
1491         rc = mdd_mark_orphan_object(env, obj, handle, true);
1492         if (rc != 0)
1493                 return rc;
1494
1495         rc = mdo_declare_destroy(env, obj, handle);
1496         if (rc != 0)
1497                 return rc;
1498
1499         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1500         if (rc != 0)
1501                 return rc;
1502
1503         return mdd_declare_links_del(env, obj, handle);
1504 }
1505
1506 /* caller should take a lock before calling */
1507 int mdd_finish_unlink(const struct lu_env *env,
1508                       struct mdd_object *obj, struct md_attr *ma,
1509                       const struct mdd_object *pobj,
1510                       const struct lu_name *lname,
1511                       struct thandle *th)
1512 {
1513         int rc = 0;
1514         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1515         ENTRY;
1516
1517         LASSERT(mdd_write_locked(env, obj) != 0);
1518
1519         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1520                 /* add new orphan and the object
1521                  * will be deleted during mdd_close() */
1522                 obj->mod_flags |= DEAD_OBJ;
1523                 if (obj->mod_count) {
1524                         rc = mdd_orphan_insert(env, obj, th);
1525                         if (rc == 0)
1526                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1527                                         "orphan list, open count = %d\n",
1528                                         PFID(mdd_object_fid(obj)),
1529                                         obj->mod_count);
1530                         else
1531                                 CERROR("Object "DFID" fail to be an orphan, "
1532                                        "open count = %d, maybe cause failed "
1533                                        "open replay\n",
1534                                         PFID(mdd_object_fid(obj)),
1535                                         obj->mod_count);
1536
1537                         /* mark object as an orphan here, not
1538                          * before mdd_orphan_insert() as racing
1539                          * mdd_la_get() may propagate ORPHAN_OBJ
1540                          * causing the asserition */
1541                         rc = mdd_mark_orphan_object(env, obj, th, false);
1542                 } else {
1543                         rc = mdo_destroy(env, obj, th);
1544                 }
1545         } else if (!is_dir) {
1546                 /* old files may not have link ea; ignore errors */
1547                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1548         }
1549
1550         RETURN(rc);
1551 }
1552
1553 /*
1554  * pobj maybe NULL
1555  * has mdd_write_lock on cobj already, but not on pobj yet
1556  */
1557 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1558                             const struct lu_attr *pattr,
1559                             struct mdd_object *cobj,
1560                             const struct lu_attr *cattr)
1561 {
1562         int rc;
1563         ENTRY;
1564
1565         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1566
1567         RETURN(rc);
1568 }
1569
1570 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1571                               struct mdd_object *p, struct mdd_object *c,
1572                               const struct lu_name *name, struct md_attr *ma,
1573                               struct thandle *handle, int no_name, int is_dir)
1574 {
1575         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1576         int              rc;
1577
1578         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1579                 if (likely(no_name == 0)) {
1580                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1581                                                       handle);
1582                         if (rc != 0)
1583                                 return rc;
1584                 }
1585
1586                 if (is_dir != 0) {
1587                         rc = mdo_declare_ref_del(env, p, handle);
1588                         if (rc != 0)
1589                                 return rc;
1590                 }
1591         }
1592
1593         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1594         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1595         la->la_valid = LA_CTIME | LA_MTIME;
1596         rc = mdo_declare_attr_set(env, p, la, handle);
1597         if (rc)
1598                 return rc;
1599
1600         if (c != NULL) {
1601                 rc = mdo_declare_ref_del(env, c, handle);
1602                 if (rc)
1603                         return rc;
1604
1605                 rc = mdo_declare_ref_del(env, c, handle);
1606                 if (rc)
1607                         return rc;
1608
1609                 la->la_valid = LA_CTIME;
1610                 rc = mdo_declare_attr_set(env, c, la, handle);
1611                 if (rc)
1612                         return rc;
1613
1614                 rc = mdd_declare_finish_unlink(env, c, handle);
1615                 if (rc)
1616                         return rc;
1617
1618                 /* FIXME: need changelog for remove entry */
1619                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
1620                                                  NULL, handle);
1621         }
1622
1623         return rc;
1624 }
1625
1626 /*
1627  * test if a file has an HSM archive
1628  * if HSM attributes are not found in ma update them from
1629  * HSM xattr
1630  */
1631 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1632                                    struct mdd_object *obj,
1633                                    struct md_attr *ma)
1634 {
1635         ENTRY;
1636
1637         if (!(ma->ma_valid & MA_HSM)) {
1638                 /* no HSM MD provided, read xattr */
1639                 struct lu_buf   *hsm_buf;
1640                 const size_t     buflen = sizeof(struct hsm_attrs);
1641                 int              rc;
1642
1643                 hsm_buf = mdd_buf_get(env, NULL, 0);
1644                 lu_buf_alloc(hsm_buf, buflen);
1645                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1646                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1647                 lu_buf_free(hsm_buf);
1648                 if (rc < 0)
1649                         RETURN(false);
1650
1651                 ma->ma_valid |= MA_HSM;
1652         }
1653         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1654                 RETURN(true);
1655         RETURN(false);
1656 }
1657
1658 /**
1659  * Delete name entry and the object.
1660  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1661  * does not exist for this object, and it could only happen during resending
1662  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1663  * is also needed in this case(needed by changelog), so we have to add another
1664  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1665  * the ENOENT failure should be able to be fixed by redo mechanism.
1666  */
1667 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1668                       struct md_object *cobj, const struct lu_name *lname,
1669                       struct md_attr *ma, int no_name)
1670 {
1671         const char *name = lname->ln_name;
1672         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1673         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1674         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1675         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1676         struct mdd_object *mdd_cobj = NULL;
1677         struct mdd_device *mdd = mdo2mdd(pobj);
1678         struct thandle    *handle;
1679         int rc, is_dir = 0, cl_flags = 0;
1680         ENTRY;
1681
1682         /* cobj == NULL means only delete name entry */
1683         if (likely(cobj != NULL)) {
1684                 mdd_cobj = md2mdd_obj(cobj);
1685                 if (mdd_object_exists(mdd_cobj) == 0)
1686                         RETURN(-ENOENT);
1687         }
1688
1689         rc = mdd_la_get(env, mdd_pobj, pattr);
1690         if (rc)
1691                 RETURN(rc);
1692
1693         if (likely(mdd_cobj != NULL)) {
1694                 /* fetch cattr */
1695                 rc = mdd_la_get(env, mdd_cobj, cattr);
1696                 if (rc)
1697                         RETURN(rc);
1698
1699                 is_dir = S_ISDIR(cattr->la_mode);
1700                 /* search for an existing archive.
1701                  * we should check ahead as the object
1702                  * can be destroyed in this transaction */
1703                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1704                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1705         }
1706
1707         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1708         if (rc)
1709                 RETURN(rc);
1710
1711         handle = mdd_trans_create(env, mdd);
1712         if (IS_ERR(handle))
1713                 RETURN(PTR_ERR(handle));
1714
1715         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1716                                 lname, ma, handle, no_name, is_dir);
1717         if (rc)
1718                 GOTO(stop, rc);
1719
1720         rc = mdd_trans_start(env, mdd, handle);
1721         if (rc)
1722                 GOTO(stop, rc);
1723
1724         if (likely(mdd_cobj != NULL))
1725                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1726
1727         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1728                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1729                 if (rc)
1730                         GOTO(cleanup, rc);
1731         }
1732
1733         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1734             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1735                 GOTO(cleanup, rc = 0);
1736
1737         if (likely(mdd_cobj != NULL)) {
1738                 rc = mdo_ref_del(env, mdd_cobj, handle);
1739                 if (rc != 0) {
1740                         __mdd_index_insert_only(env, mdd_pobj,
1741                                                 mdo2fid(mdd_cobj),
1742                                                 mdd_object_type(mdd_cobj),
1743                                                 name, handle);
1744                         GOTO(cleanup, rc);
1745                 }
1746
1747                 if (is_dir)
1748                         /* unlink dot */
1749                         mdo_ref_del(env, mdd_cobj, handle);
1750
1751                 /* fetch updated nlink */
1752                 rc = mdd_la_get(env, mdd_cobj, cattr);
1753                 if (rc)
1754                         GOTO(cleanup, rc);
1755         }
1756
1757         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1758         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1759
1760         la->la_valid = LA_CTIME | LA_MTIME;
1761         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1762         if (rc)
1763                 GOTO(cleanup, rc);
1764
1765         /* Enough for only unlink the entry */
1766         if (unlikely(mdd_cobj == NULL))
1767                 GOTO(stop, rc);
1768
1769         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1770                 /* update ctime of an unlinked file only if it is still
1771                  * opened or a link still exists */
1772                 la->la_valid = LA_CTIME;
1773                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1774                 if (rc)
1775                         GOTO(cleanup, rc);
1776         }
1777
1778         /* XXX: this transfer to ma will be removed with LOD/OSP */
1779         ma->ma_attr = *cattr;
1780         ma->ma_valid |= MA_INODE;
1781         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1782         if (rc != 0)
1783                 GOTO(cleanup, rc);
1784
1785         /* fetch updated nlink */
1786         rc = mdd_la_get(env, mdd_cobj, cattr);
1787         /* if object is removed then we can't get its attrs,
1788          * use last get */
1789         if (rc == -ENOENT) {
1790                 cattr->la_nlink = 0;
1791                 rc = 0;
1792         }
1793
1794         if (cattr->la_nlink == 0) {
1795                 ma->ma_attr = *cattr;
1796                 ma->ma_valid |= MA_INODE;
1797         }
1798
1799         EXIT;
1800 cleanup:
1801         if (likely(mdd_cobj != NULL))
1802                 mdd_write_unlock(env, mdd_cobj);
1803
1804         if (rc == 0) {
1805                 if (cattr->la_nlink == 0)
1806                         cl_flags |= CLF_UNLINK_LAST;
1807                 else
1808                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1809
1810                 rc = mdd_changelog_ns_store(env, mdd,
1811                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1812                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1813                         handle);
1814         }
1815
1816 stop:
1817         rc = mdd_trans_stop(env, mdd, rc, handle);
1818
1819         return rc;
1820 }
1821
1822 /*
1823  * The permission has been checked when obj created, no need check again.
1824  */
1825 static int mdd_cd_sanity_check(const struct lu_env *env,
1826                                struct mdd_object *obj)
1827 {
1828         ENTRY;
1829
1830         /* EEXIST check */
1831         if (!obj || mdd_is_dead_obj(obj))
1832                 RETURN(-ENOENT);
1833
1834         RETURN(0);
1835 }
1836
1837 static int mdd_create_data(const struct lu_env *env,
1838                            struct md_object *pobj,
1839                            struct md_object *cobj,
1840                            const struct md_op_spec *spec,
1841                            struct md_attr *ma)
1842 {
1843         struct mdd_device *mdd = mdo2mdd(cobj);
1844         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1845         struct mdd_object *son = md2mdd_obj(cobj);
1846         struct thandle    *handle;
1847         const struct lu_buf *buf;
1848         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1849         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1850         int                rc;
1851         ENTRY;
1852
1853         rc = mdd_cd_sanity_check(env, son);
1854         if (rc)
1855                 RETURN(rc);
1856
1857         if (!md_should_create(spec->sp_cr_flags))
1858                 RETURN(0);
1859
1860         /*
1861          * there are following use cases for this function:
1862          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1863          *    striping can be specified or not
1864          * 2) CMD?
1865          */
1866         rc = mdd_la_get(env, son, attr);
1867         if (rc)
1868                 RETURN(rc);
1869
1870         /* calling ->ah_make_hint() is used to transfer information from parent */
1871         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1872
1873         handle = mdd_trans_create(env, mdd);
1874         if (IS_ERR(handle))
1875                 GOTO(out_free, rc = PTR_ERR(handle));
1876
1877         /*
1878          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1879          * Should this be fixed?
1880          */
1881         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1882                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1883                spec->sp_cr_flags, spec->no_create);
1884
1885         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1886                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1887                                         spec->u.sp_ea.eadatalen);
1888         } else {
1889                 buf = &LU_BUF_NULL;
1890         }
1891
1892         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1893                                   XATTR_NAME_LOV, 0, handle);
1894         if (rc)
1895                 GOTO(stop, rc);
1896
1897         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
1898                                          handle);
1899         if (rc)
1900                 GOTO(stop, rc);
1901
1902         rc = mdd_trans_start(env, mdd, handle);
1903         if (rc)
1904                 GOTO(stop, rc);
1905
1906         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1907                           0, handle);
1908
1909         if (rc)
1910                 GOTO(stop, rc);
1911
1912         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1913
1914 stop:
1915         rc = mdd_trans_stop(env, mdd, rc, handle);
1916
1917 out_free:
1918         RETURN(rc);
1919 }
1920
1921 static int mdd_declare_object_initialize(const struct lu_env *env,
1922                                          struct mdd_object *parent,
1923                                          struct mdd_object *child,
1924                                          const struct lu_attr *attr,
1925                                          struct thandle *handle)
1926 {
1927         int rc;
1928         ENTRY;
1929
1930         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1931         if (!S_ISDIR(attr->la_mode))
1932                 RETURN(0);
1933
1934         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1935                                       dot, handle);
1936         if (rc != 0)
1937                 RETURN(rc);
1938
1939         rc = mdo_declare_ref_add(env, child, handle);
1940         if (rc != 0)
1941                 RETURN(rc);
1942
1943         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1944                                       dotdot, handle);
1945
1946         RETURN(rc);
1947 }
1948
1949 static int mdd_object_initialize(const struct lu_env *env,
1950                                  const struct lu_fid *pfid,
1951                                  struct mdd_object *child,
1952                                  struct lu_attr *attr, struct thandle *handle,
1953                                  const struct md_op_spec *spec)
1954 {
1955         int rc = 0;
1956         ENTRY;
1957
1958         if (S_ISDIR(attr->la_mode)) {
1959                 /* Add "." and ".." for newly created dir */
1960                 mdo_ref_add(env, child, handle);
1961                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1962                                              S_IFDIR, dot, handle);
1963                 if (rc == 0)
1964                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1965                                                      dotdot, handle);
1966                 if (rc != 0)
1967                         mdo_ref_del(env, child, handle);
1968         }
1969
1970         RETURN(rc);
1971 }
1972
1973 /**
1974  * This function checks whether it can create a file/dir under the
1975  * directory(@pobj). The directory(@pobj) is not being locked by
1976  * mdd lock.
1977  *
1978  * \param[in] env       execution environment
1979  * \param[in] pobj      the directory to create files
1980  * \param[in] pattr     the attributes of the directory
1981  * \param[in] lname     the name of the created file/dir
1982  * \param[in] cattr     the attributes of the file/dir
1983  * \param[in] spec      create specification
1984  *
1985  * \retval              = 0 it is allowed to create file/dir under
1986  *                      the directory
1987  * \retval              negative error not allowed to create file/dir
1988  *                      under the directory
1989  */
1990 static int mdd_create_sanity_check(const struct lu_env *env,
1991                                    struct md_object *pobj,
1992                                    const struct lu_attr *pattr,
1993                                    const struct lu_name *lname,
1994                                    struct lu_attr *cattr,
1995                                    struct md_op_spec *spec)
1996 {
1997         struct mdd_thread_info *info = mdd_env_info(env);
1998         struct lu_fid     *fid       = &info->mti_fid;
1999         struct mdd_object *obj       = md2mdd_obj(pobj);
2000         struct mdd_device *m         = mdo2mdd(pobj);
2001         bool            check_perm = true;
2002         int rc;
2003         ENTRY;
2004
2005         /* EEXIST check */
2006         if (mdd_is_dead_obj(obj))
2007                 RETURN(-ENOENT);
2008
2009         /*
2010          * In some cases this lookup is not needed - we know before if name
2011          * exists or not because MDT performs lookup for it.
2012          * name length check is done in lookup.
2013          */
2014         if (spec->sp_cr_lookup) {
2015                 /*
2016                  * Check if the name already exist, though it will be checked in
2017                  * _index_insert also, for avoiding rolling back if exists
2018                  * _index_insert.
2019                  */
2020                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2021                                   MAY_WRITE | MAY_EXEC);
2022                 if (rc != -ENOENT)
2023                         RETURN(rc ? : -EEXIST);
2024
2025                 /* Permission is already being checked in mdd_lookup */
2026                 check_perm = false;
2027         }
2028
2029         if (S_ISDIR(cattr->la_mode) &&
2030             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2031             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2032                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2033
2034                 if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
2035                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
2036                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2037                         rc = -EINVAL;
2038                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2039                                "stripe_offset = %d, stripe_count = %u: "
2040                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2041                                 le32_to_cpu(lum->lum_magic),
2042                                (int)le32_to_cpu(lum->lum_stripe_offset),
2043                                le32_to_cpu(lum->lum_stripe_count), rc);
2044                         return rc;
2045                 }
2046         }
2047
2048         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2049         if (rc != 0)
2050                 RETURN(rc);
2051
2052         /* sgid check */
2053         if (pattr->la_mode & S_ISGID) {
2054                 cattr->la_gid = pattr->la_gid;
2055                 if (S_ISDIR(cattr->la_mode)) {
2056                         cattr->la_mode |= S_ISGID;
2057                         cattr->la_valid |= LA_MODE;
2058                 }
2059         }
2060
2061         /* Inherit project ID from parent directory */
2062         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2063                 cattr->la_projid = pattr->la_projid;
2064                 if (S_ISDIR(cattr->la_mode)) {
2065                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2066                         cattr->la_valid |= LA_FLAGS;
2067                 }
2068                 cattr->la_valid |= LA_PROJID;
2069         }
2070
2071         rc = mdd_name_check(m, lname);
2072         if (rc < 0)
2073                 RETURN(rc);
2074
2075         switch (cattr->la_mode & S_IFMT) {
2076         case S_IFLNK: {
2077                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2078
2079                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2080                         RETURN(-ENAMETOOLONG);
2081                 else
2082                         RETURN(0);
2083         }
2084         case S_IFDIR:
2085         case S_IFREG:
2086         case S_IFCHR:
2087         case S_IFBLK:
2088         case S_IFIFO:
2089         case S_IFSOCK:
2090                 rc = 0;
2091                 break;
2092         default:
2093                 rc = -EINVAL;
2094                 break;
2095         }
2096         RETURN(rc);
2097 }
2098
2099 static int mdd_declare_create_object(const struct lu_env *env,
2100                                      struct mdd_device *mdd,
2101                                      struct mdd_object *p, struct mdd_object *c,
2102                                      struct lu_attr *attr,
2103                                      struct thandle *handle,
2104                                      const struct md_op_spec *spec,
2105                                      struct lu_buf *def_acl_buf,
2106                                      struct lu_buf *acl_buf,
2107                                      struct dt_allocation_hint *hint)
2108 {
2109         const struct lu_buf *buf;
2110         int rc;
2111
2112         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2113                                                 hint);
2114         if (rc)
2115                 GOTO(out, rc);
2116
2117 #ifdef CONFIG_FS_POSIX_ACL
2118         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2119                 /* if dir, then can inherit default ACl */
2120                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2121                                            XATTR_NAME_ACL_DEFAULT,
2122                                            0, handle);
2123                 if (rc)
2124                         GOTO(out, rc);
2125         }
2126
2127         if (acl_buf->lb_len > 0) {
2128                 rc = mdo_declare_attr_set(env, c, attr, handle);
2129                 if (rc)
2130                         GOTO(out, rc);
2131
2132                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2133                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2134                 if (rc)
2135                         GOTO(out, rc);
2136         }
2137 #endif
2138         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2139         if (rc)
2140                 GOTO(out, rc);
2141
2142         /* replay case, create LOV EA from client data */
2143         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2144             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2145                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2146                                         spec->u.sp_ea.eadatalen);
2147                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2148                                            handle);
2149                 if (rc)
2150                         GOTO(out, rc);
2151         }
2152
2153         if (S_ISLNK(attr->la_mode)) {
2154                 const char *target_name = spec->u.sp_symname;
2155                 int sym_len = strlen(target_name);
2156                 const struct lu_buf *buf;
2157
2158                 buf = mdd_buf_get_const(env, target_name, sym_len);
2159                 rc = dt_declare_record_write(env, mdd_object_child(c),
2160                                              buf, 0, handle);
2161                 if (rc)
2162                         GOTO(out, rc);
2163         }
2164
2165         if (spec->sp_cr_file_secctx_name != NULL) {
2166                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2167                                         spec->sp_cr_file_secctx_size);
2168                 rc = mdo_declare_xattr_set(env, c, buf,
2169                                            spec->sp_cr_file_secctx_name, 0,
2170                                            handle);
2171                 if (rc < 0)
2172                         GOTO(out, rc);
2173         }
2174 out:
2175         return rc;
2176 }
2177
2178 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2179                               struct mdd_object *p, struct mdd_object *c,
2180                               const struct lu_name *name,
2181                               struct lu_attr *attr,
2182                               struct thandle *handle,
2183                               const struct md_op_spec *spec,
2184                               struct linkea_data *ldata,
2185                               struct lu_buf *def_acl_buf,
2186                               struct lu_buf *acl_buf,
2187                               struct dt_allocation_hint *hint)
2188 {
2189         int rc;
2190
2191         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2192                                        def_acl_buf, acl_buf, hint);
2193         if (rc)
2194                 GOTO(out, rc);
2195
2196         if (S_ISDIR(attr->la_mode)) {
2197                 rc = mdo_declare_ref_add(env, p, handle);
2198                 if (rc)
2199                         GOTO(out, rc);
2200         }
2201
2202         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2203                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2204                 if (rc)
2205                         GOTO(out, rc);
2206         } else {
2207                 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
2208                 enum changelog_rec_type type;
2209
2210                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2211                                               name->ln_name, handle);
2212                 if (rc != 0)
2213                         return rc;
2214
2215                 rc = mdd_declare_links_add(env, c, handle, ldata);
2216                 if (rc)
2217                         return rc;
2218
2219                 *la = *attr;
2220                 la->la_valid = LA_CTIME | LA_MTIME;
2221                 rc = mdo_declare_attr_set(env, p, la, handle);
2222                 if (rc)
2223                         return rc;
2224
2225                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2226                        S_ISREG(attr->la_mode) ? CL_CREATE :
2227                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2228
2229                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2230                                                  handle);
2231                 if (rc)
2232                         return rc;
2233         }
2234 out:
2235         return rc;
2236 }
2237
2238 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2239                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2240                         struct lu_buf *acl_buf)
2241 {
2242         int     rc;
2243         ENTRY;
2244
2245         if (S_ISLNK(la->la_mode)) {
2246                 acl_buf->lb_len = 0;
2247                 def_acl_buf->lb_len = 0;
2248                 RETURN(0);
2249         }
2250
2251         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2252         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2253                            XATTR_NAME_ACL_DEFAULT);
2254         mdd_read_unlock(env, pobj);
2255         if (rc > 0) {
2256                 /* If there are default ACL, fix mode/ACL by default ACL */
2257                 def_acl_buf->lb_len = rc;
2258                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2259                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2260                 acl_buf->lb_len = rc;
2261                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2262                 if (rc < 0)
2263                         RETURN(rc);
2264         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2265                 /* If there are no default ACL, fix mode by mask */
2266                 struct lu_ucred *uc = lu_ucred(env);
2267
2268                 /* The create triggered by MDT internal events, such as
2269                  * LFSCK reset, will not contain valid "uc". */
2270                 if (unlikely(uc != NULL))
2271                         la->la_mode &= ~uc->uc_umask;
2272                 rc = 0;
2273                 acl_buf->lb_len = 0;
2274                 def_acl_buf->lb_len = 0;
2275         }
2276
2277         RETURN(rc);
2278 }
2279
2280 /**
2281  * Create a metadata object and initialize it, set acl, xattr.
2282  **/
2283 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2284                              struct mdd_object *son, struct lu_attr *attr,
2285                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2286                              struct lu_buf *def_acl_buf,
2287                              struct dt_allocation_hint *hint,
2288                              struct thandle *handle)
2289 {
2290         const struct lu_buf *buf;
2291         int rc;
2292
2293         mdd_write_lock(env, son, MOR_TGT_CHILD);
2294         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2295                                         hint);
2296         if (rc)
2297                 GOTO(unlock, rc);
2298
2299         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2300          * created in declare phase, they also needs to be added to master
2301          * object as sub-directory entry. So it has to initialize the master
2302          * object, then set dir striped EA.(in mdo_xattr_set) */
2303         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2304                                    spec);
2305         if (rc != 0)
2306                 GOTO(err_destroy, rc);
2307
2308         /*
2309          * in case of replay we just set LOVEA provided by the client
2310          * XXX: I think it would be interesting to try "old" way where
2311          *      MDT calls this xattr_set(LOV) in a different transaction.
2312          *      probably this way we code can be made better.
2313          */
2314
2315         /* During creation, there are only a few cases we need do xattr_set to
2316          * create stripes.
2317          * 1. regular file: see comments above.
2318          * 2. dir: inherit default striping or pool settings from parent.
2319          * 3. create striped directory with provided stripeEA.
2320          * 4. create striped directory because inherit default layout from the
2321          * parent.
2322          */
2323         if (spec->no_create ||
2324             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2325             S_ISDIR(attr->la_mode)) {
2326                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2327                                         spec->u.sp_ea.eadatalen);
2328                 rc = mdo_xattr_set(env, son, buf,
2329                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2330                                                             XATTR_NAME_LOV, 0,
2331                                    handle);
2332                 if (rc != 0)
2333                         GOTO(err_destroy, rc);
2334         }
2335
2336 #ifdef CONFIG_FS_POSIX_ACL
2337         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2338             S_ISDIR(attr->la_mode)) {
2339                 /* set default acl */
2340                 rc = mdo_xattr_set(env, son, def_acl_buf,
2341                                    XATTR_NAME_ACL_DEFAULT, 0,
2342                                    handle);
2343                 if (rc)
2344                         GOTO(err_destroy, rc);
2345         }
2346         /* set its own acl */
2347         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2348                 rc = mdo_xattr_set(env, son, acl_buf,
2349                                    XATTR_NAME_ACL_ACCESS,
2350                                    0, handle);
2351                 if (rc)
2352                         GOTO(err_destroy, rc);
2353         }
2354 #endif
2355
2356         if (S_ISLNK(attr->la_mode)) {
2357                 struct lu_ucred  *uc = lu_ucred_assert(env);
2358                 struct dt_object *dt = mdd_object_child(son);
2359                 const char *target_name = spec->u.sp_symname;
2360                 int sym_len = strlen(target_name);
2361                 loff_t pos = 0;
2362
2363                 buf = mdd_buf_get_const(env, target_name, sym_len);
2364                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2365                                                 uc->uc_cap &
2366                                                 CFS_CAP_SYS_RESOURCE_MASK);
2367
2368                 if (rc == sym_len)
2369                         rc = 0;
2370                 else
2371                         GOTO(err_initlized, rc = -EFAULT);
2372         }
2373
2374         if (spec->sp_cr_file_secctx_name != NULL) {
2375                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2376                                         spec->sp_cr_file_secctx_size);
2377                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2378                                    0, handle);
2379                 if (rc < 0)
2380                         GOTO(err_initlized, rc);
2381         }
2382
2383 err_initlized:
2384         if (unlikely(rc != 0)) {
2385                 int rc2;
2386                 if (S_ISDIR(attr->la_mode)) {
2387                         /* Drop the reference, no need to delete "."/"..",
2388                          * because the object to be destroied directly. */
2389                         rc2 = mdo_ref_del(env, son, handle);
2390                         if (rc2 != 0)
2391                                 GOTO(unlock, rc);
2392                 }
2393                 rc2 = mdo_ref_del(env, son, handle);
2394                 if (rc2 != 0)
2395                         GOTO(unlock, rc);
2396 err_destroy:
2397                 mdo_destroy(env, son, handle);
2398         }
2399 unlock:
2400         mdd_write_unlock(env, son);
2401         RETURN(rc);
2402 }
2403
2404 static int mdd_index_delete(const struct lu_env *env,
2405                             struct mdd_object *mdd_pobj,
2406                             struct lu_attr *cattr,
2407                             const struct lu_name *lname)
2408 {
2409         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2410         struct thandle *handle;
2411         int rc;
2412         ENTRY;
2413
2414         handle = mdd_trans_create(env, mdd);
2415         if (IS_ERR(handle))
2416                 RETURN(PTR_ERR(handle));
2417
2418         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2419                                       handle);
2420         if (rc != 0)
2421                 GOTO(stop, rc);
2422
2423         if (S_ISDIR(cattr->la_mode)) {
2424                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2425                 if (rc != 0)
2426                         GOTO(stop, rc);
2427         }
2428
2429         /* Since this will only be used in the error handler path,
2430          * Let's set the thandle to be local and not mess the transno */
2431         handle->th_local = 1;
2432         rc = mdd_trans_start(env, mdd, handle);
2433         if (rc)
2434                 GOTO(stop, rc);
2435
2436         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2437                                 S_ISDIR(cattr->la_mode), handle);
2438         if (rc)
2439                 GOTO(stop, rc);
2440 stop:
2441         rc = mdd_trans_stop(env, mdd, rc, handle);
2442
2443         RETURN(rc);
2444 }
2445
2446 /**
2447  * Create object and insert it into namespace.
2448  *
2449  * Two operations have to be performed:
2450  *
2451  *  - an allocation of a new object (->do_create()), and
2452  *  - an insertion into a parent index (->dio_insert()).
2453  *
2454  * Due to locking, operation order is not important, when both are
2455  * successful, *but* error handling cases are quite different:
2456  *
2457  *  - if insertion is done first, and following object creation fails,
2458  *  insertion has to be rolled back, but this operation might fail
2459  *  also leaving us with dangling index entry.
2460  *
2461  *  - if creation is done first, is has to be undone if insertion fails,
2462  *  leaving us with leaked space, which is not good but not fatal.
2463  *
2464  * It seems that creation-first is simplest solution, but it is sub-optimal
2465  * in the frequent
2466  *
2467  * $ mkdir foo
2468  * $ mkdir foo
2469  *
2470  * case, because second mkdir is bound to create object, only to
2471  * destroy it immediately.
2472  *
2473  * To avoid this follow local file systems that do double lookup:
2474  *
2475  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2476  * 1. create            (mdd_create_object_internal())
2477  * 2. insert            (__mdd_index_insert(), lookup again)
2478  *
2479  * \param[in] pobj      parent object
2480  * \param[in] lname     name of child being created
2481  * \param[in,out] child child object being created
2482  * \param[in] spec      additional create parameters
2483  * \param[in] ma        attributes for new child object
2484  *
2485  * \retval              0 on success
2486  * \retval              negative errno on failure
2487  */
2488 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2489                       const struct lu_name *lname, struct md_object *child,
2490                       struct md_op_spec *spec, struct md_attr *ma)
2491 {
2492         struct mdd_thread_info  *info = mdd_env_info(env);
2493         struct lu_attr          *la = &info->mti_la_for_fix;
2494         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2495         struct mdd_object       *son = md2mdd_obj(child);
2496         struct mdd_device       *mdd = mdo2mdd(pobj);
2497         struct lu_attr          *attr = &ma->ma_attr;
2498         struct thandle          *handle;
2499         struct lu_attr          *pattr = &info->mti_pattr;
2500         struct lu_buf           acl_buf;
2501         struct lu_buf           def_acl_buf;
2502         struct linkea_data      *ldata = &info->mti_link_data;
2503         const char              *name = lname->ln_name;
2504         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2505         int                      rc;
2506         int                      rc2;
2507         ENTRY;
2508
2509         rc = mdd_la_get(env, mdd_pobj, pattr);
2510         if (rc != 0)
2511                 RETURN(rc);
2512
2513         /* Sanity checks before big job. */
2514         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2515         if (rc)
2516                 RETURN(rc);
2517
2518         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2519                 GOTO(out_free, rc = -EINPROGRESS);
2520
2521         handle = mdd_trans_create(env, mdd);
2522         if (IS_ERR(handle))
2523                 GOTO(out_free, rc = PTR_ERR(handle));
2524
2525         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2526                                mdd->mdd_dt_conf.ddp_max_ea_size);
2527         acl_buf = info->mti_xattr_buf;
2528         def_acl_buf.lb_buf = info->mti_key;
2529         def_acl_buf.lb_len = sizeof(info->mti_key);
2530         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2531         if (rc < 0)
2532                 GOTO(out_stop, rc);
2533
2534         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2535
2536         memset(ldata, 0, sizeof(*ldata));
2537         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2538                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2539
2540                 tfid.f_oid--;
2541                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2542                                         &tfid, lname, 1, 0, ldata);
2543         } else {
2544                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2545                                         mdd_object_fid(mdd_pobj),
2546                                         lname, 1, 0, ldata);
2547         }
2548
2549         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2550                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2551                                 hint);
2552         if (rc)
2553                 GOTO(out_stop, rc);
2554
2555         rc = mdd_trans_start(env, mdd, handle);
2556         if (rc)
2557                 GOTO(out_stop, rc);
2558
2559         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2560                                &def_acl_buf, hint, handle);
2561         if (rc != 0)
2562                 GOTO(out_stop, rc);
2563
2564         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2565                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2566                 son->mod_flags |= VOLATILE_OBJ;
2567                 rc = mdd_orphan_insert(env, son, handle);
2568                 GOTO(out_volatile, rc);
2569         } else {
2570                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2571                                       attr->la_mode, name, handle);
2572                 if (rc != 0)
2573                         GOTO(err_created, rc);
2574
2575                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2576                               ldata, 1);
2577
2578                 /* update parent directory mtime/ctime */
2579                 *la = *attr;
2580                 la->la_valid = LA_CTIME | LA_MTIME;
2581                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2582                 if (rc)
2583                         GOTO(err_insert, rc);
2584         }
2585
2586         EXIT;
2587 err_insert:
2588         if (rc != 0) {
2589                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2590                         rc2 = mdd_orphan_delete(env, son, handle);
2591                 else
2592                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2593                                                  S_ISDIR(attr->la_mode),
2594                                                  handle);
2595                 if (rc2 != 0)
2596                         goto out_stop;
2597
2598 err_created:
2599                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2600                 if (S_ISDIR(attr->la_mode)) {
2601                         /* Drop the reference, no need to delete "."/"..",
2602                          * because the object is to be destroyed directly. */
2603                         rc2 = mdo_ref_del(env, son, handle);
2604                         if (rc2 != 0) {
2605                                 mdd_write_unlock(env, son);
2606                                 goto out_stop;
2607                         }
2608                 }
2609 out_volatile:
2610                 /* For volatile files drop one link immediately, since there is
2611                  * no filename in the namespace, and save any error returned. */
2612                 rc2 = mdo_ref_del(env, son, handle);
2613                 if (rc2 != 0) {
2614                         mdd_write_unlock(env, son);
2615                         if (unlikely(rc == 0))
2616                                 rc = rc2;
2617                         goto out_stop;
2618                 }
2619
2620                 /* Don't destroy the volatile object on success */
2621                 if (likely(rc != 0))
2622                         mdo_destroy(env, son, handle);
2623                 mdd_write_unlock(env, son);
2624         }
2625
2626         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2627             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2628                 rc = mdd_changelog_ns_store(env, mdd,
2629                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2630                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2631                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2632                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2633                                 NULL, handle);
2634 out_stop:
2635         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2636         if (rc == 0) {
2637                 /* If creation fails, it is most likely due to the remote update
2638                  * failure, because local transaction will mostly succeed at
2639                  * this stage. There is no easy way to rollback all of previous
2640                  * updates, so let's remove the object from namespace, and
2641                  * LFSCK should handle the orphan object. */
2642                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2643                         mdd_index_delete(env, mdd_pobj, attr, lname);
2644                 rc = rc2;
2645         }
2646 out_free:
2647         if (is_vmalloc_addr(ldata->ld_buf))
2648                 /* if we vmalloced a large buffer drop it */
2649                 lu_buf_free(ldata->ld_buf);
2650
2651         /* The child object shouldn't be cached anymore */
2652         if (rc)
2653                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2654                         &child->mo_lu.lo_header->loh_flags);
2655         return rc;
2656 }
2657
2658 /*
2659  * Get locks on parents in proper order
2660  * RETURN: < 0 - error, rename_order if successful
2661  */
2662 enum rename_order {
2663         MDD_RN_SAME,
2664         MDD_RN_SRCTGT,
2665         MDD_RN_TGTSRC
2666 };
2667
2668 static int mdd_rename_order(const struct lu_env *env,
2669                             struct mdd_device *mdd,
2670                             struct mdd_object *src_pobj,
2671                             const struct lu_attr *pattr,
2672                             struct mdd_object *tgt_pobj)
2673 {
2674         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2675         int rc;
2676         ENTRY;
2677
2678         if (src_pobj == tgt_pobj)
2679                 RETURN(MDD_RN_SAME);
2680
2681         /* compared the parent child relationship of src_p&tgt_p */
2682         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2683                 rc = MDD_RN_SRCTGT;
2684         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2685                 rc = MDD_RN_TGTSRC;
2686         } else {
2687                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2688                                    NULL);
2689                 if (rc == -EREMOTE)
2690                         rc = 0;
2691
2692                 if (rc == 1)
2693                         rc = MDD_RN_TGTSRC;
2694                 else if (rc == 0)
2695                         rc = MDD_RN_SRCTGT;
2696         }
2697
2698         RETURN(rc);
2699 }
2700
2701 /* has not mdd_write{read}_lock on any obj yet. */
2702 static int mdd_rename_sanity_check(const struct lu_env *env,
2703                                    struct mdd_object *src_pobj,
2704                                    const struct lu_attr *pattr,
2705                                    struct mdd_object *tgt_pobj,
2706                                    const struct lu_attr *tpattr,
2707                                    struct mdd_object *sobj,
2708                                    const struct lu_attr *cattr,
2709                                    struct mdd_object *tobj,
2710                                    const struct lu_attr *tattr)
2711 {
2712         int rc = 0;
2713         ENTRY;
2714
2715         /* XXX: when get here, sobj must NOT be NULL,
2716          * the other case has been processed in cld_rename
2717          * before mdd_rename and enable MDS_PERM_BYPASS. */
2718         LASSERT(sobj);
2719
2720         /*
2721          * If we are using project inheritance, we only allow renames
2722          * into our tree when the project IDs are the same; otherwise
2723          * tree quota mechanism would be circumvented.
2724          */
2725         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2726             tpattr->la_projid != cattr->la_projid) ||
2727             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2728             (pattr->la_projid != tpattr->la_projid)))
2729                 RETURN(-EXDEV);
2730
2731         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2732         if (rc)
2733                 RETURN(rc);
2734
2735         /* XXX: when get here, "tobj == NULL" means tobj must
2736          * NOT exist (neither on remote MDS, such case has been
2737          * processed in cld_rename before mdd_rename and enable
2738          * MDS_PERM_BYPASS).
2739          * So check may_create, but not check may_unlink. */
2740         if (tobj == NULL)
2741                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2742                                     (src_pobj != tgt_pobj));
2743         else
2744                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2745                                     (src_pobj != tgt_pobj), 1);
2746
2747         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2748                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2749
2750         RETURN(rc);
2751 }
2752
2753 static int mdd_declare_rename(const struct lu_env *env,
2754                               struct mdd_device *mdd,
2755                               struct mdd_object *mdd_spobj,
2756                               struct mdd_object *mdd_tpobj,
2757                               struct mdd_object *mdd_sobj,
2758                               struct mdd_object *mdd_tobj,
2759                               const struct lu_name *sname,
2760                               const struct lu_name *tname,
2761                               struct md_attr *ma,
2762                               struct linkea_data *ldata,
2763                               struct thandle *handle)
2764 {
2765         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2766         int rc;
2767
2768         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2769         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2770
2771         LASSERT(mdd_spobj);
2772         LASSERT(mdd_tpobj);
2773         LASSERT(mdd_sobj);
2774
2775         /* name from source dir */
2776         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2777         if (rc)
2778                 return rc;
2779
2780         /* .. from source child */
2781         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2782                 /* source child can be directory,
2783                  * counted by source dir's nlink */
2784                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2785                 if (rc)
2786                         return rc;
2787                 if (mdd_spobj != mdd_tpobj) {
2788                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2789                                                       handle);
2790                         if (rc != 0)
2791                                 return rc;
2792
2793                         rc = mdo_declare_index_insert(env, mdd_sobj,
2794                                                       mdo2fid(mdd_tpobj),
2795                                                       S_IFDIR, dotdot, handle);
2796                         if (rc != 0)
2797                                 return rc;
2798                 }
2799
2800                 /* new target child can be directory,
2801                  * counted by target dir's nlink */
2802                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2803                 if (rc != 0)
2804                         return rc;
2805         }
2806
2807         la->la_valid = LA_CTIME | LA_MTIME;
2808         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2809         if (rc != 0)
2810                 return rc;
2811
2812         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2813         if (rc != 0)
2814                 return rc;
2815
2816         la->la_valid = LA_CTIME;
2817         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2818         if (rc)
2819                 return rc;
2820
2821         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2822         if (rc)
2823                 return rc;
2824
2825         /* new name */
2826         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2827                                       mdd_object_type(mdd_sobj),
2828                                       tname->ln_name, handle);
2829         if (rc != 0)
2830                 return rc;
2831
2832         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2833                 /* delete target child in target parent directory */
2834                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2835                                               handle);
2836                 if (rc)
2837                         return rc;
2838
2839                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2840                 if (rc)
2841                         return rc;
2842
2843                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2844                         /* target child can be directory,
2845                          * delete "." reference in target child directory */
2846                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2847                         if (rc)
2848                                 return rc;
2849
2850                         /* delete ".." reference in target parent directory */
2851                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2852                         if (rc)
2853                                 return rc;
2854                 }
2855
2856                 la->la_valid = LA_CTIME;
2857                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2858                 if (rc)
2859                         return rc;
2860
2861                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2862                 if (rc)
2863                         return rc;
2864         }
2865
2866         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
2867                                          handle);
2868         if (rc)
2869                 return rc;
2870
2871         return rc;
2872 }
2873
2874 /* src object can be remote that is why we use only fid and type of object */
2875 static int mdd_rename(const struct lu_env *env,
2876                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2877                       const struct lu_fid *lf, const struct lu_name *lsname,
2878                       struct md_object *tobj, const struct lu_name *ltname,
2879                       struct md_attr *ma)
2880 {
2881         const char *sname = lsname->ln_name;
2882         const char *tname = ltname->ln_name;
2883         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2884         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2885         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2886         struct mdd_device *mdd = mdo2mdd(src_pobj);
2887         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2888         struct mdd_object *mdd_tobj = NULL;
2889         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2890         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2891         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2892         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2893         struct thandle *handle;
2894         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2895         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2896         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2897         bool is_dir;
2898         bool tobj_ref = 0;
2899         bool tobj_locked = 0;
2900         unsigned cl_flags = 0;
2901         int rc, rc2;
2902         ENTRY;
2903
2904         if (tobj)
2905                 mdd_tobj = md2mdd_obj(tobj);
2906
2907         mdd_sobj = mdd_object_find(env, mdd, lf);
2908         if (IS_ERR(mdd_sobj))
2909                 RETURN(PTR_ERR(mdd_sobj));
2910
2911         rc = mdd_la_get(env, mdd_sobj, cattr);
2912         if (rc)
2913                 GOTO(out_pending, rc);
2914
2915         rc = mdd_la_get(env, mdd_spobj, pattr);
2916         if (rc)
2917                 GOTO(out_pending, rc);
2918
2919         if (mdd_tobj) {
2920                 rc = mdd_la_get(env, mdd_tobj, tattr);
2921                 if (rc)
2922                         GOTO(out_pending, rc);
2923                 /* search for an existing archive.
2924                  * we should check ahead as the object
2925                  * can be destroyed in this transaction */
2926                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2927                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2928         }
2929
2930         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2931         if (rc)
2932                 GOTO(out_pending, rc);
2933
2934         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2935                                      mdd_sobj, cattr, mdd_tobj, tattr);
2936         if (rc)
2937                 GOTO(out_pending, rc);
2938
2939         rc = mdd_name_check(mdd, ltname);
2940         if (rc < 0)
2941                 GOTO(out_pending, rc);
2942
2943         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2944         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2945         if (rc < 0)
2946                 GOTO(out_pending, rc);
2947
2948         handle = mdd_trans_create(env, mdd);
2949         if (IS_ERR(handle))
2950                 GOTO(out_pending, rc = PTR_ERR(handle));
2951
2952         memset(ldata, 0, sizeof(*ldata));
2953         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
2954                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
2955                                 1, 0, ldata);
2956         if (rc)
2957                 GOTO(stop, rc);
2958
2959         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2960                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2961         if (rc)
2962                 GOTO(stop, rc);
2963
2964         rc = mdd_trans_start(env, mdd, handle);
2965         if (rc)
2966                 GOTO(stop, rc);
2967
2968         is_dir = S_ISDIR(cattr->la_mode);
2969
2970         /* Remove source name from source directory */
2971         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2972         if (rc != 0)
2973                 GOTO(stop, rc);
2974
2975         /* "mv dir1 dir2" needs "dir1/.." link update */
2976         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2977                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2978                 if (rc != 0)
2979                         GOTO(fixup_spobj2, rc);
2980
2981                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2982                                              dotdot, handle);
2983                 if (rc != 0)
2984                         GOTO(fixup_spobj, rc);
2985         }
2986
2987         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2988                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2989                 if (rc != 0)
2990                         /* tname might been renamed to something else */
2991                         GOTO(fixup_spobj, rc);
2992         }
2993
2994         /* Insert new fid with target name into target dir */
2995         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2996                                 tname, handle);
2997         if (rc != 0)
2998                 GOTO(fixup_tpobj, rc);
2999
3000         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3001         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3002
3003         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3004         la->la_valid = LA_CTIME;
3005         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
3006         if (rc)
3007                 GOTO(fixup_tpobj, rc);
3008
3009         /* Update the linkEA for the source object */
3010         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3011         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
3012                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
3013                               0, 0);
3014         if (rc == -ENOENT)
3015                 /* Old files might not have EA entry */
3016                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
3017                               lsname, handle, NULL, 0);
3018         mdd_write_unlock(env, mdd_sobj);
3019         /* We don't fail the transaction if the link ea can't be
3020            updated -- fid2path will use alternate lookup method. */
3021         rc = 0;
3022
3023         /* Remove old target object
3024          * For tobj is remote case cmm layer has processed
3025          * and set tobj to NULL then. So when tobj is NOT NULL,
3026          * it must be local one.
3027          */
3028         if (tobj && mdd_object_exists(mdd_tobj)) {
3029                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
3030                 tobj_locked = 1;
3031                 if (mdd_is_dead_obj(mdd_tobj)) {
3032                         /* shld not be dead, something is wrong */
3033                         CERROR("tobj is dead, something is wrong\n");
3034                         rc = -EINVAL;
3035                         goto cleanup;
3036                 }
3037                 mdo_ref_del(env, mdd_tobj, handle);
3038
3039                 /* Remove dot reference. */
3040                 if (S_ISDIR(tattr->la_mode))
3041                         mdo_ref_del(env, mdd_tobj, handle);
3042                 tobj_ref = 1;
3043
3044                 /* fetch updated nlink */
3045                 rc = mdd_la_get(env, mdd_tobj, tattr);
3046                 if (rc != 0) {
3047                         CERROR("%s: Failed to get nlink for tobj "
3048                                 DFID": rc = %d\n",
3049                                 mdd2obd_dev(mdd)->obd_name,
3050                                 PFID(tpobj_fid), rc);
3051                         GOTO(fixup_tpobj, rc);
3052                 }
3053
3054                 la->la_valid = LA_CTIME;
3055                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3056                 if (rc != 0) {
3057                         CERROR("%s: Failed to set ctime for tobj "
3058                                 DFID": rc = %d\n",
3059                                 mdd2obd_dev(mdd)->obd_name,
3060                                 PFID(tpobj_fid), rc);
3061                         GOTO(fixup_tpobj, rc);
3062                 }
3063
3064                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3065                 ma->ma_attr = *tattr;
3066                 ma->ma_valid |= MA_INODE;
3067                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3068                                        handle);
3069                 if (rc != 0) {
3070                         CERROR("%s: Failed to unlink tobj "
3071                                 DFID": rc = %d\n",
3072                                 mdd2obd_dev(mdd)->obd_name,
3073                                 PFID(tpobj_fid), rc);
3074                         GOTO(fixup_tpobj, rc);
3075                 }
3076
3077                 /* fetch updated nlink */
3078                 rc = mdd_la_get(env, mdd_tobj, tattr);
3079                 if (rc == -ENOENT) {
3080                         /* the object got removed, let's
3081                          * return the latest known attributes */
3082                         tattr->la_nlink = 0;
3083                         rc = 0;
3084                 } else if (rc != 0) {
3085                         CERROR("%s: Failed to get nlink for tobj "
3086                                 DFID": rc = %d\n",
3087                                 mdd2obd_dev(mdd)->obd_name,
3088                                 PFID(tpobj_fid), rc);
3089                         GOTO(fixup_tpobj, rc);
3090                 }
3091                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3092                 ma->ma_attr = *tattr;
3093                 ma->ma_valid |= MA_INODE;
3094
3095                 if (tattr->la_nlink == 0)
3096                         cl_flags |= CLF_RENAME_LAST;
3097                 else
3098                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3099         }
3100
3101         la->la_valid = LA_CTIME | LA_MTIME;
3102         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3103         if (rc)
3104                 GOTO(fixup_tpobj, rc);
3105
3106         if (mdd_spobj != mdd_tpobj) {
3107                 la->la_valid = LA_CTIME | LA_MTIME;
3108                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3109                 if (rc != 0)
3110                         GOTO(fixup_tpobj, rc);
3111         }
3112
3113         EXIT;
3114
3115 fixup_tpobj:
3116         if (rc) {
3117                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3118                 if (rc2)
3119                         CWARN("tp obj fix error %d\n",rc2);
3120
3121                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3122                     !mdd_is_dead_obj(mdd_tobj)) {
3123                         if (tobj_ref) {
3124                                 mdo_ref_add(env, mdd_tobj, handle);
3125                                 if (is_dir)
3126                                         mdo_ref_add(env, mdd_tobj, handle);
3127                         }
3128
3129                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3130                                                   mdo2fid(mdd_tobj),
3131                                                   mdd_object_type(mdd_tobj),
3132                                                   tname, handle);
3133                         if (rc2 != 0)
3134                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3135                 }
3136         }
3137
3138 fixup_spobj:
3139         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3140                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3141                 if (rc2)
3142                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3143                                mdd2obd_dev(mdd)->obd_name, rc2);
3144
3145
3146                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3147                                               dotdot, handle);
3148                 if (rc2 != 0)
3149                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3150                               mdd2obd_dev(mdd)->obd_name, rc2);
3151         }
3152
3153 fixup_spobj2:
3154         if (rc != 0) {
3155                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3156                                          mdd_object_type(mdd_sobj), sname,
3157                                          handle);
3158                 if (rc2 != 0)
3159                         CWARN("sp obj fix error: rc = %d\n", rc2);
3160         }
3161
3162 cleanup:
3163         if (tobj_locked)
3164                 mdd_write_unlock(env, mdd_tobj);
3165
3166         if (rc == 0)
3167                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3168                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3169                                             ltname, lsname, handle);
3170
3171 stop:
3172         rc = mdd_trans_stop(env, mdd, rc, handle);
3173
3174 out_pending:
3175         mdd_object_put(env, mdd_sobj);
3176         return rc;
3177 }
3178
3179 /**
3180  * During migration once the parent FID has been changed,
3181  * we need update the parent FID in linkea.
3182  **/
3183 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3184                                             struct mdd_object *parent,
3185                                             struct mdd_object *newparent,
3186                                             struct mdd_object *child,
3187                                             const char *name, int namelen,
3188                                             struct thandle *handle,
3189                                             bool declare)
3190 {
3191         struct mdd_thread_info  *info = mdd_env_info(env);
3192         struct linkea_data      ldata = { NULL };
3193         struct lu_buf           *buf = &info->mti_link_buf;
3194         int                     count;
3195         int                     rc = 0;
3196
3197         ENTRY;
3198
3199         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3200         if (buf->lb_buf == NULL)
3201                 RETURN(-ENOMEM);
3202
3203         ldata.ld_buf = buf;
3204         rc = mdd_links_read(env, child, &ldata);
3205         if (rc != 0) {
3206                 if (rc == -ENOENT || rc == -ENODATA)
3207                         rc = 0;
3208                 RETURN(rc);
3209         }
3210
3211         LASSERT(ldata.ld_leh != NULL);
3212         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3213         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3214                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3215                 struct lu_name lname;
3216                 struct lu_fid  fid;
3217
3218                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3219                                     &lname, &fid);
3220
3221                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3222                     !lu_fid_eq(&fid, mdd_object_fid(parent))) {
3223                         ldata.ld_lee = (struct link_ea_entry *)
3224                                        ((char *)ldata.ld_lee +
3225                                         ldata.ld_reclen);
3226                         continue;
3227                 }
3228
3229                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3230                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3231                        lname.ln_namelen, lname.ln_name,
3232                        PFID(mdd_object_fid(newparent)));
3233                 /* update to the new parent fid */
3234                 linkea_entry_pack(ldata.ld_lee, &lname,
3235                                   mdd_object_fid(newparent));
3236                 if (declare)
3237                         rc = mdd_declare_links_add(env, child, handle, &ldata);
3238                 else
3239                         rc = mdd_links_write(env, child, &ldata, handle);
3240                 break;
3241         }
3242         RETURN(rc);
3243 }
3244
3245 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3246                                            struct mdd_object *parent,
3247                                            struct mdd_object *newparent,
3248                                            struct mdd_object *child,
3249                                            const char *name, int namelen,
3250                                            struct thandle *handle)
3251 {
3252         return mdd_linkea_update_child_internal(env, parent, newparent,
3253                                                 child, name,
3254                                                 namelen, handle, true);
3255 }
3256
3257 static int mdd_linkea_update_child(const struct lu_env *env,
3258                                    struct mdd_object *parent,
3259                                    struct mdd_object *newparent,
3260                                    struct mdd_object *child,
3261                                    const char *name, int namelen,
3262                                    struct thandle *handle)
3263 {
3264         return mdd_linkea_update_child_internal(env, parent, newparent,
3265                                                 child, name,
3266                                                 namelen, handle, false);
3267 }
3268
3269 static int mdd_update_linkea_internal(const struct lu_env *env,
3270                                       struct mdd_object *mdd_pobj,
3271                                       struct mdd_object *mdd_sobj,
3272                                       struct mdd_object *mdd_tobj,
3273                                       const struct lu_name *child_name,
3274                                       struct linkea_data *ldata,
3275                                       struct thandle *handle,
3276                                       int declare)
3277 {
3278         struct mdd_thread_info  *info = mdd_env_info(env);
3279         int                     count;
3280         int                     rc = 0;
3281         ENTRY;
3282
3283         LASSERT(ldata->ld_buf != NULL);
3284         LASSERT(ldata->ld_leh != NULL);
3285
3286         /* If it is mulitple links file, we need update the name entry for
3287          * all parent */
3288         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3289         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3290                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3291                 struct mdd_object       *pobj;
3292                 struct lu_name          lname;
3293                 struct lu_fid           fid;
3294
3295                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3296                                     &lname, &fid);
3297                 pobj = mdd_object_find(env, mdd, &fid);
3298                 if (IS_ERR(pobj)) {
3299                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3300                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3301                               PTR_ERR(pobj));
3302                         continue;
3303                 }
3304
3305                 if (!mdd_object_exists(pobj)) {
3306                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3307                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3308                         goto next_put;
3309                 }
3310
3311                 if (pobj == mdd_pobj &&
3312                     lname.ln_namelen == child_name->ln_namelen &&
3313                     strncmp(lname.ln_name, child_name->ln_name,
3314                             lname.ln_namelen) == 0) {
3315                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3316                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3317                               PFID(&fid));
3318                         goto next_put;
3319                 }
3320
3321                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3322                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3323                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3324
3325                 if (declare) {
3326                         /* Remove source name from source directory */
3327                         /* Insert new fid with target name into target dir */
3328                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3329                                                       handle);
3330                         if (rc != 0)
3331                                 GOTO(next_put, rc);
3332
3333                         rc = mdo_declare_index_insert(env, pobj,
3334                                         mdd_object_fid(mdd_tobj),
3335                                         mdd_object_type(mdd_tobj),
3336                                         lname.ln_name, handle);
3337                         if (rc != 0)
3338                                 GOTO(next_put, rc);
3339
3340                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3341                         if (rc)
3342                                 GOTO(next_put, rc);
3343
3344                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3345                         if (rc)
3346                                 GOTO(next_put, rc);
3347                 } else {
3348                         char *tmp_name = info->mti_key;
3349
3350                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3351                                 /* lnamelen is too big(> NAME_MAX + 16),
3352                                  * something wrong about this linkea, let's
3353                                  * skip it */
3354                                 CWARN("%s: the name %.*s is too long under "
3355                                       DFID"\n", mdd2obd_dev(mdd)->obd_name,
3356                                       lname.ln_namelen, lname.ln_name,
3357                                       PFID(&fid));
3358                                 goto next_put;
3359                         }
3360
3361                         /* Note: lname might be without \0 at the end, see
3362                          * linkea_entry_unpack(), let's add extra \0 by
3363                          * snprintf */
3364                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3365                                  lname.ln_namelen, lname.ln_name);
3366                         lname.ln_name = tmp_name;
3367
3368                         /* Let's check if this linkEA still valid, before
3369                          * it might be packed into the RPC buffer. */
3370                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3371                                         &info->mti_fid, NULL);
3372                         if (rc < 0 || !lu_fid_eq(&info->mti_fid,
3373                                                  mdd_object_fid(mdd_sobj)))
3374                                 GOTO(next_put, rc == -ENOENT ? 0 : rc);
3375
3376                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3377                         if (rc != 0)
3378                                 GOTO(next_put, rc);
3379
3380                         rc = __mdd_index_insert(env, pobj,
3381                                         mdd_object_fid(mdd_tobj),
3382                                         mdd_object_type(mdd_tobj),
3383                                         tmp_name, handle);
3384                         if (rc != 0)
3385                                 GOTO(next_put, rc);
3386
3387                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3388                         rc = mdo_ref_add(env, mdd_tobj, handle);
3389                         mdd_write_unlock(env, mdd_tobj);
3390                         if (rc)
3391                                 GOTO(next_put, rc);
3392
3393                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3394                         mdo_ref_del(env, mdd_sobj, handle);
3395                         mdd_write_unlock(env, mdd_sobj);
3396                 }
3397 next_put:
3398                 mdd_object_put(env, pobj);
3399                 if (rc != 0)
3400                         break;
3401
3402                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3403                                                          ldata->ld_reclen);
3404         }
3405
3406         RETURN(rc);
3407 }
3408
3409 static int mdd_migrate_xattrs(const struct lu_env *env,
3410                               struct mdd_object *mdd_sobj,
3411                               struct mdd_object *mdd_tobj)
3412 {
3413         struct mdd_thread_info  *info = mdd_env_info(env);
3414         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3415         char                    *xname;
3416         struct thandle          *handle;
3417         struct lu_buf           xbuf;
3418         int                     xlen;
3419         int                     rem;
3420         int                     xsize;
3421         int                     list_xsize;
3422         struct lu_buf           list_xbuf;
3423         int                     rc;
3424
3425         /* retrieve xattr list from the old object */
3426         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3427         if (list_xsize == -ENODATA)
3428                 return 0;
3429
3430         if (list_xsize < 0)
3431                 return list_xsize;
3432
3433         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3434         if (info->mti_big_buf.lb_buf == NULL)
3435                 return -ENOMEM;
3436
3437         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3438         list_xbuf.lb_len = list_xsize;
3439         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3440         if (rc < 0)
3441                 return rc;
3442         rc = 0;
3443         rem = list_xsize;
3444         xname = list_xbuf.lb_buf;
3445         while (rem > 0) {
3446                 xlen = strnlen(xname, rem - 1) + 1;
3447                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3448                     strcmp(XATTR_NAME_LMV, xname) == 0)
3449                         goto next;
3450
3451                 /* For directory, if there are default layout, migrate here */
3452                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3453                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3454                         goto next;
3455
3456                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3457                 if (xsize == -ENODATA)
3458                         goto next;
3459                 if (xsize < 0)
3460                         GOTO(out, rc);
3461
3462                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3463                 if (info->mti_link_buf.lb_buf == NULL)
3464                         GOTO(out, rc = -ENOMEM);
3465
3466                 xbuf.lb_len = xsize;
3467                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3468                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3469                 if (rc == -ENODATA)
3470                         goto next;
3471                 if (rc < 0)
3472                         GOTO(out, rc);
3473
3474                 handle = mdd_trans_create(env, mdd);
3475                 if (IS_ERR(handle))
3476                         GOTO(out, rc = PTR_ERR(handle));
3477
3478                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3479                                            handle);
3480                 if (rc != 0)
3481                         GOTO(stop_trans, rc);
3482                 /* Note: this transaction is part of migration, and it is not
3483                  * the last step of migration, so we set th_local = 1 to avoid
3484                  * update last rcvd for this transaction */
3485                 handle->th_local = 1;
3486                 rc = mdd_trans_start(env, mdd, handle);
3487                 if (rc != 0)
3488                         GOTO(stop_trans, rc);
3489
3490 again:
3491                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3492                 if (rc == -EEXIST)
3493                         GOTO(stop_trans, rc = 0);
3494
3495                 if (unlikely(rc == -ENOSPC &&
3496                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3497                         rc = linkea_overflow_shrink(
3498                                         (struct linkea_data *)(xbuf.lb_buf));
3499                         if (likely(rc > 0)) {
3500                                 xbuf.lb_len = rc;
3501                                 goto again;
3502                         }
3503                 }
3504
3505                 if (rc != 0)
3506                         GOTO(stop_trans, rc);
3507 stop_trans:
3508                 rc = mdd_trans_stop(env, mdd, rc, handle);
3509                 if (rc != 0)
3510                         GOTO(out, rc);
3511 next:
3512                 rem -= xlen;
3513                 memmove(xname, xname + xlen, rem);
3514         }
3515 out:
3516         return rc;
3517 }
3518
3519 static int mdd_declare_migrate_create(const struct lu_env *env,
3520                                       struct mdd_object *mdd_pobj,
3521                                       struct mdd_object *mdd_sobj,
3522                                       struct mdd_object *mdd_tobj,
3523                                       struct md_op_spec *spec,
3524                                       struct lu_attr *la,
3525                                       union lmv_mds_md *mgr_ea,
3526                                       struct linkea_data *ldata,
3527                                       struct thandle *handle)
3528 {
3529         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3530         const struct lu_buf     *buf;
3531         int                     rc;
3532         int                     mgr_easize;
3533
3534         rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
3535                                                 handle, spec, NULL);
3536         if (rc != 0)
3537                 return rc;
3538
3539         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3540                                            handle);
3541         if (rc != 0)
3542                 return rc;
3543
3544         if (S_ISLNK(la->la_mode)) {
3545                 const char *target_name = spec->u.sp_symname;
3546                 int sym_len = strlen(target_name);
3547                 const struct lu_buf *buf;
3548
3549                 buf = mdd_buf_get_const(env, target_name, sym_len);
3550                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3551                                              buf, 0, handle);
3552                 if (rc != 0)
3553                         return rc;
3554         } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
3555                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
3556                 if (rc != 0)
3557                         return rc;
3558         }
3559
3560         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3561                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3562                                         spec->u.sp_ea.eadatalen);
3563                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3564                                            0, handle);
3565                 if (rc)
3566                         return rc;
3567         }
3568
3569         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3570         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3571         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3572                                    0, handle);
3573         if (rc)
3574                 return rc;
3575
3576         la_flag->la_valid = LA_FLAGS;
3577         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3578         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3579
3580         return rc;
3581 }
3582
3583 static int mdd_migrate_create(const struct lu_env *env,
3584                               struct mdd_object *mdd_pobj,
3585                               struct mdd_object *mdd_sobj,
3586                               struct mdd_object *mdd_tobj,
3587                               const struct lu_name *lname,
3588                               struct lu_attr *la)
3589 {
3590         struct mdd_thread_info  *info = mdd_env_info(env);
3591         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3592         struct md_op_spec       *spec = &info->mti_spec;
3593         struct lu_buf           lmm_buf = { NULL };
3594         struct lu_buf           link_buf = { NULL };
3595         struct lu_buf            mgr_buf;
3596         struct thandle          *handle;
3597         struct lmv_mds_md_v1    *mgr_ea;
3598         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3599         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3600         int                     mgr_easize;
3601         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3602         int                     rc;
3603         ENTRY;
3604
3605         /* prepare spec for create */
3606         memset(spec, 0, sizeof(*spec));
3607         spec->sp_cr_lookup = 0;
3608         spec->sp_feat = &dt_directory_features;
3609         if (S_ISLNK(la->la_mode)) {
3610                 const struct lu_buf *buf;
3611
3612                 buf = lu_buf_check_and_alloc(
3613                                 &mdd_env_info(env)->mti_big_buf,
3614                                 la->la_size + 1);
3615                 link_buf = *buf;
3616                 link_buf.lb_len = la->la_size + 1;
3617                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3618                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3619                 if (rc <= 0) {
3620                         rc = rc != 0 ? rc : -EFAULT;
3621                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3622                                mdd2obd_dev(mdd)->obd_name,
3623                                PFID(mdd_object_fid(mdd_sobj)), rc);
3624                         RETURN(rc);
3625                 }
3626                 spec->u.sp_symname = link_buf.lb_buf;
3627         } else if (S_ISREG(la->la_mode)) {
3628                 /* retrieve lov of the old object */
3629                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3630                 if (rc != 0 && rc != -ENODATA)
3631                         RETURN(rc);
3632                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3633                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3634                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3635                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3636                 }
3637         } else if (S_ISDIR(la->la_mode)) {
3638                 rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
3639                 if (rc == -ENODATA) {
3640                         /* ignore the non-linkEA error */
3641                         ldata = NULL;
3642                         rc = 0;
3643                 }
3644                 if (rc < 0)
3645                         RETURN(rc);
3646         }
3647
3648         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3649         lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
3650         mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
3651         mgr_buf.lb_len = mgr_easize;
3652         mgr_ea = mgr_buf.lb_buf;
3653         memset(mgr_ea, 0, sizeof(*mgr_ea));
3654         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3655         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3656         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3657         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3658         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3659         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3660
3661         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3662
3663         handle = mdd_trans_create(env, mdd);
3664         if (IS_ERR(handle))
3665                 GOTO(out_free, rc = PTR_ERR(handle));
3666
3667         /* Note: this transaction is part of migration, and it is not
3668          * the last step of migration, so we set th_local = 1 to avoid
3669          * update last rcvd for this transaction */
3670         handle->th_local = 1;
3671         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
3672                                         la, mgr_buf.lb_buf, ldata, handle);
3673         if (rc != 0)
3674                 GOTO(stop_trans, rc);
3675
3676         rc = mdd_trans_start(env, mdd, handle);
3677         if (rc != 0)
3678                 GOTO(stop_trans, rc);
3679
3680         /* don't set nlink from the original object */
3681         la->la_valid &= ~LA_NLINK;
3682
3683         /* create the target object */
3684         rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3685                                hint, handle);
3686         if (rc != 0)
3687                 GOTO(stop_trans, rc);
3688
3689         if (S_ISDIR(la->la_mode) && ldata != NULL) {
3690                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3691                 if (rc != 0)
3692                         GOTO(stop_trans, rc);
3693         }
3694
3695         /* Set MIGRATE EA on the source inode, so once the migration needs
3696          * to be re-done during failover, the re-do process can locate the
3697          * target object which is already being created. */
3698         rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
3699         if (rc != 0)
3700                 GOTO(stop_trans, rc);
3701
3702         /* Set immutable flag, so any modification is disabled until
3703          * the migration is done. Once the migration is interrupted,
3704          * if the resume process find the migrating object has both
3705          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3706          * flag and approve the migration */
3707         la_flag->la_valid = LA_FLAGS;
3708         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3709         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3710 stop_trans:
3711         if (handle != NULL)
3712                 rc = mdd_trans_stop(env, mdd, rc, handle);
3713 out_free:
3714         if (lmm_buf.lb_buf != NULL)
3715                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3716         RETURN(rc);
3717 }
3718
3719 static int mdd_migrate_entries(const struct lu_env *env,
3720                                struct mdd_object *mdd_sobj,
3721                                struct mdd_object *mdd_tobj)
3722 {
3723         struct dt_object        *next = mdd_object_child(mdd_sobj);
3724         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3725         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3726         struct thandle          *handle;
3727         struct dt_it            *it;
3728         const struct dt_it_ops  *iops;
3729         int                      result;
3730         struct lu_dirent        *ent;
3731         int                      rc;
3732         ENTRY;
3733
3734         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3735         if (ent == NULL)
3736                 RETURN(-ENOMEM);
3737
3738         if (!dt_try_as_dir(env, next))
3739                 GOTO(out_ent, rc = -ENOTDIR);
3740         /*
3741          * iterate directories
3742          */
3743         iops = &next->do_index_ops->dio_it;
3744         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3745         if (IS_ERR(it))
3746                 GOTO(out_ent, rc = PTR_ERR(it));
3747
3748         rc = iops->load(env, it, 0);
3749         if (rc == 0)
3750                 rc = iops->next(env, it);
3751         else if (rc > 0)
3752                 rc = 0;
3753         /*
3754          * At this point and across for-loop:
3755          *
3756          *  rc == 0 -> ok, proceed.
3757          *  rc >  0 -> end of directory.
3758          *  rc <  0 -> error.
3759          */
3760         do {
3761                 struct mdd_object       *child;
3762                 char                    *name = mdd_env_info(env)->mti_key;
3763                 int                     len;
3764                 int                     is_dir;
3765                 bool                    target_exist = false;
3766
3767                 len = iops->key_size(env, it);
3768                 if (len == 0)
3769                         goto next;
3770
3771                 result = iops->rec(env, it, (struct dt_rec *)ent,
3772                                    LUDA_FID | LUDA_TYPE);
3773                 if (result == -ESTALE)
3774                         goto next;
3775                 if (result != 0) {
3776                         rc = result;
3777                         goto out;
3778                 }
3779
3780                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3781
3782                 /* Insert new fid with target name into target dir */
3783                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3784                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3785                      ent->lde_name[1] == '.'))
3786                         goto next;
3787
3788                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3789                 if (IS_ERR(child))
3790                         GOTO(out, rc = PTR_ERR(child));
3791
3792                 /* child may not exist, but lu_object_attr will assert this,
3793                  * get type from loh_attr directly */
3794                 is_dir = S_ISDIR(child->mod_obj.mo_lu.lo_header->loh_attr);
3795
3796                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3797
3798                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3799
3800                 /* Check whether the name has been inserted to the target */
3801                 if (dt_try_as_dir(env, dt_tobj)) {
3802                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3803
3804                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3805                                        (struct dt_key *)name);
3806                         if (unlikely(rc == 0))
3807                                 target_exist = true;
3808                 }
3809
3810                 handle = mdd_trans_create(env, mdd);
3811                 if (IS_ERR(handle))
3812                         GOTO(out_put, rc = PTR_ERR(handle));
3813
3814                 /* Note: this transaction is part of migration, and it is not
3815                  * the last step of migration, so we set th_local = 1 to avoid
3816                  * updating last rcvd for this transaction */
3817                 handle->th_local = 1;
3818                 if (likely(!target_exist)) {
3819                         rc = mdo_declare_index_insert(env, mdd_tobj,
3820                                 &ent->lde_fid,
3821                                 child->mod_obj.mo_lu.lo_header->loh_attr,
3822                                 name, handle);
3823                         if (rc != 0)
3824                                 GOTO(out_put, rc);
3825
3826                         if (is_dir) {
3827                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3828                                 if (rc != 0)
3829                                         GOTO(out_put, rc);
3830                         }
3831                 }
3832
3833                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3834                 if (rc != 0)
3835                         GOTO(out_put, rc);
3836
3837                 if (is_dir) {
3838                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3839                         if (rc != 0)
3840                                 GOTO(out_put, rc);
3841
3842                         /* Update .. for child */
3843                         rc = mdo_declare_index_delete(env, child, dotdot,
3844                                                       handle);
3845                         if (rc != 0)
3846                                 GOTO(out_put, rc);
3847
3848                         rc = mdo_declare_index_insert(env, child,
3849                                                       mdd_object_fid(mdd_tobj),
3850                                                       S_IFDIR, dotdot, handle);
3851                         if (rc != 0)
3852                                 GOTO(out_put, rc);
3853                 }
3854
3855                 rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
3856                                                      child, name,
3857                                                      strlen(name),
3858                                                      handle);
3859                 if (rc != 0)
3860                         GOTO(out_put, rc);
3861
3862                 rc = mdd_trans_start(env, mdd, handle);
3863                 if (rc != 0) {
3864                         CERROR("%s: transaction start failed: rc = %d\n",
3865                                mdd2obd_dev(mdd)->obd_name, rc);
3866                         GOTO(out_put, rc);
3867                 }
3868
3869                 if (likely(!target_exist)) {
3870                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3871                                 child->mod_obj.mo_lu.lo_header->loh_attr, name,
3872                                 handle);
3873                         if (rc != 0)
3874                                 GOTO(out_put, rc);
3875                 }
3876
3877                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3878                 if (rc != 0)
3879                         GOTO(out_put, rc);
3880
3881                 if (is_dir) {
3882                         rc = __mdd_index_delete_only(env, child, dotdot,
3883                                                      handle);
3884                         if (rc != 0)
3885                                 GOTO(out_put, rc);
3886
3887                         rc = __mdd_index_insert_only(env, child,
3888                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3889                                          dotdot, handle);
3890                         if (rc != 0)
3891                                 GOTO(out_put, rc);
3892                 }
3893
3894                 rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
3895                                              child, name,
3896                                              strlen(name), handle);
3897
3898 out_put:
3899                 mdd_write_unlock(env, child);
3900                 mdd_object_put(env, child);
3901                 rc = mdd_trans_stop(env, mdd, rc, handle);
3902                 if (rc != 0)
3903                         GOTO(out, rc);
3904 next:
3905                 result = iops->next(env, it);
3906                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3907                         GOTO(out, rc = -EINTR);
3908
3909                 if (result == -ESTALE)
3910                         goto next;
3911         } while (result == 0);
3912 out:
3913         iops->put(env, it);
3914         iops->fini(env, it);
3915 out_ent:
3916         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3917         RETURN(rc);
3918 }
3919
3920 static int mdd_declare_update_linkea(const struct lu_env *env,
3921                                      struct mdd_object *mdd_pobj,
3922                                      struct mdd_object *mdd_sobj,
3923                                      struct mdd_object *mdd_tobj,
3924                                      const struct lu_name *child_name,
3925                                      struct linkea_data *ldata,
3926                                      struct thandle *handle)
3927 {
3928         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3929                                           child_name, ldata, handle, 1);
3930 }
3931
3932 static int mdd_update_linkea(const struct lu_env *env,
3933                              struct mdd_object *mdd_pobj,
3934                              struct mdd_object *mdd_sobj,
3935                              struct mdd_object *mdd_tobj,
3936                              const struct lu_name *child_name,
3937                              struct linkea_data *ldata,
3938                              struct thandle *handle)
3939 {
3940         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3941                                           child_name, ldata, handle, 0);
3942 }
3943
3944 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3945                                            struct mdd_object *mdd_pobj,
3946                                            struct mdd_object *mdd_sobj,
3947                                            struct mdd_object *mdd_tobj,
3948                                            const struct lu_name *lname,
3949                                            struct lu_attr *la,
3950                                            struct lu_attr *parent_la,
3951                                            struct linkea_data *ldata,
3952                                            struct thandle *handle)
3953 {
3954         struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3955         struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
3956         int rc;
3957
3958         /* Revert IMMUTABLE flag */
3959         la_flag->la_valid = LA_FLAGS;
3960         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3961         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3962         if (rc != 0)
3963                 return rc;
3964
3965         /* delete entry from source dir */
3966         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3967         if (rc != 0)
3968                 return rc;
3969
3970         if (ldata->ld_buf != NULL) {
3971                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3972                                                mdd_tobj, lname, ldata, handle);
3973                 if (rc != 0)
3974                         return rc;
3975         }
3976
3977         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3978                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3979                                            handle);
3980                 if (rc != 0)
3981                         return rc;
3982
3983                 handle->th_complex = 1;
3984                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
3985                                            XATTR_NAME_FID,
3986                                            LU_XATTR_REPLACE, handle);
3987                 if (rc < 0)
3988                         return rc;
3989         }
3990
3991         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3992                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3993                 if (rc != 0)
3994                         return rc;
3995         }
3996
3997         /* new name */
3998         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3999                                       mdd_object_type(mdd_tobj),
4000                                       lname->ln_name, handle);
4001         if (rc != 0)
4002                 return rc;
4003
4004         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
4005         if (rc != 0)
4006                 return rc;
4007
4008         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4009                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
4010                 if (rc != 0)
4011                         return rc;
4012         }
4013
4014         /* delete old object */
4015         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
4016         if (rc != 0)
4017                 return rc;
4018
4019         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4020                 /* delete old object */
4021                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
4022                 if (rc != 0)
4023                         return rc;
4024                 /* set nlink to 0 */
4025                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
4026                 if (rc != 0)
4027                         return rc;
4028         }
4029
4030         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
4031         if (rc)
4032                 return rc;
4033
4034         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
4035         if (rc != 0)
4036                 return rc;
4037
4038         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
4039                                          handle);
4040
4041         return rc;
4042 }
4043
4044 static int mdd_migrate_update_name(const struct lu_env *env,
4045                                    struct mdd_object *mdd_pobj,
4046                                    struct mdd_object *mdd_sobj,
4047                                    struct mdd_object *mdd_tobj,
4048                                    const struct lu_name *lname,
4049                                    struct md_attr *ma)
4050 {
4051         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
4052         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4053         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
4054         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
4055         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
4056         struct thandle          *handle;
4057         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
4058         const char              *name = lname->ln_name;
4059         int                     rc;
4060         ENTRY;
4061
4062         /* update time for parent */
4063         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
4064         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
4065         p_la->la_valid = LA_CTIME;
4066
4067         rc = mdd_la_get(env, mdd_sobj, so_attr);
4068         if (rc != 0)
4069                 RETURN(rc);
4070
4071         ldata->ld_buf = NULL;
4072         rc = mdd_links_read(env, mdd_sobj, ldata);
4073         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
4074                 RETURN(rc);
4075
4076         handle = mdd_trans_create(env, mdd);
4077         if (IS_ERR(handle))
4078                 RETURN(PTR_ERR(handle));
4079
4080         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
4081                                              lname, so_attr, p_la, ldata,
4082                                              handle);
4083         if (rc != 0) {
4084                 /* If the migration can not be fit in one transaction, just
4085                  * leave it in the original MDT */
4086                 if (rc == -E2BIG)
4087                         GOTO(stop_trans, rc = 0);
4088                 else
4089                         GOTO(stop_trans, rc);
4090         }
4091
4092         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
4093                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
4094                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4095                PFID(mdd_object_fid(mdd_tobj)));
4096
4097         rc = mdd_trans_start(env, mdd, handle);
4098         if (rc != 0)
4099                 GOTO(stop_trans, rc);
4100
4101         /* Revert IMMUTABLE flag */
4102         la_flag->la_valid = LA_FLAGS;
4103         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
4104         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
4105         if (rc != 0)
4106                 GOTO(stop_trans, rc);
4107
4108         /* Remove source name from source directory */
4109         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
4110         if (rc != 0)
4111                 GOTO(stop_trans, rc);
4112
4113         if (ldata->ld_buf != NULL) {
4114                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
4115                                        lname, ldata, handle);
4116                 if (rc != 0)
4117                         GOTO(stop_trans, rc);
4118
4119                 /*  linkea update might decrease the source object
4120                  *  nlink, let's get the attr again after ref_del */
4121                 rc = mdd_la_get(env, mdd_sobj, so_attr);
4122                 if (rc != 0)
4123                         GOTO(stop_trans, rc);
4124         }
4125
4126         if (S_ISREG(so_attr->la_mode)) {
4127                 if (so_attr->la_nlink == 1) {
4128                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
4129                                            handle);
4130                         if (rc != 0 && rc != -ENODATA)
4131                                 GOTO(stop_trans, rc);
4132
4133                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
4134                                            XATTR_NAME_FID,
4135                                            LU_XATTR_REPLACE, handle);
4136                         if (rc < 0)
4137                                 GOTO(stop_trans, rc);
4138                 }
4139         }
4140
4141         /* Insert new fid with target name into target dir */
4142         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
4143                                 mdd_object_type(mdd_tobj), name, handle);
4144         if (rc != 0)
4145                 GOTO(stop_trans, rc);
4146
4147         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
4148
4149         mdd_sobj->mod_flags |= DEAD_OBJ;
4150         rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
4151         if (rc != 0)
4152                 GOTO(out_unlock, rc);
4153
4154         rc = mdd_orphan_insert(env, mdd_sobj, handle);
4155         if (rc != 0)
4156                 GOTO(out_unlock, rc);
4157
4158         mdo_ref_del(env, mdd_sobj, handle);
4159         if (is_dir)
4160                 mdo_ref_del(env, mdd_sobj, handle);
4161
4162         /* Get the attr again after ref_del */
4163         rc = mdd_la_get(env, mdd_sobj, so_attr);
4164         if (rc != 0)
4165                 GOTO(out_unlock, rc);
4166
4167         ma->ma_attr = *so_attr;
4168         ma->ma_valid |= MA_INODE;
4169
4170         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
4171         if (rc != 0)
4172                 GOTO(out_unlock, rc);
4173
4174         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
4175                                mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
4176                                mdo2fid(mdd_pobj), lname, lname, handle);
4177         if (rc != 0) {
4178                 CWARN("%s: changelog for migrate %s "DFID
4179                       "under "DFID" failed: rc = %d\n",
4180                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
4181                       PFID(mdd_object_fid(mdd_sobj)),
4182                       PFID(mdd_object_fid(mdd_pobj)), rc);
4183                 /* Sigh, there are no easy way to migrate back the object, so
4184                  * let's reset the result to 0 for now XXX */
4185                 rc = 0;
4186         }
4187 out_unlock:
4188         mdd_write_unlock(env, mdd_sobj);
4189
4190 stop_trans:
4191         rc = mdd_trans_stop(env, mdd, rc, handle);
4192
4193         RETURN(rc);
4194 }
4195
4196 static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
4197                           const struct lu_fid *fid, __u32 *mdt_index)
4198 {
4199         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
4200         struct seq_server_site *ss;
4201         int rc;
4202
4203         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4204
4205         range->lsr_flags = LU_SEQ_RANGE_MDT;
4206         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4207         if (rc != 0)
4208                 return rc;
4209
4210         *mdt_index = range->lsr_index;
4211
4212         return 0;
4213 }
4214 /**
4215  * Check whether we should migrate the file/dir
4216  * return val
4217  *      < 0  permission check failed or other error.
4218  *      = 0  the file can be migrated.
4219  *      > 0  the file does not need to be migrated, mostly
4220  *           for multiple link file
4221  **/
4222 static int mdd_migrate_sanity_check(const struct lu_env *env,
4223                                     struct mdd_object *pobj,
4224                                     const struct lu_attr *pattr,
4225                                     struct mdd_object *sobj,
4226                                     struct lu_attr *sattr)
4227 {
4228         struct mdd_thread_info  *info = mdd_env_info(env);
4229         struct linkea_data      *ldata = &info->mti_link_data;
4230         struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
4231         int                     mgr_easize;
4232         struct lu_buf           *mgr_buf;
4233         int                     count;
4234         int                     rc;
4235         __u64 mdt_index;
4236         ENTRY;
4237
4238         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4239         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4240         if (mgr_buf->lb_buf == NULL)
4241                 RETURN(-ENOMEM);
4242
4243         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4244         if (rc > 0) {
4245                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4246
4247                 /* If the object has migrateEA, it means IMMUTE flag
4248                  * is being set by previous migration process, so it
4249                  * needs to override the IMMUTE flag, otherwise the
4250                  * following sanity check will fail */
4251                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4252                                                 LMV_HASH_FLAG_MIGRATION) {
4253                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4254
4255                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4256                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4257                                mdd2obd_dev(mdd)->obd_name,
4258                                PFID(mdd_object_fid(sobj)));
4259                 }
4260         }
4261
4262         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4263                                      sobj, sattr, NULL, NULL);
4264         if (rc != 0)
4265                 RETURN(rc);
4266
4267         /* Then it will check if the file should be migrated. If the file
4268          * has mulitple links, we only need migrate the file if all of its
4269          * entries has been migrated to the remote MDT */
4270         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4271                 RETURN(0);
4272
4273         rc = mdd_links_read(env, sobj, ldata);
4274         if (rc != 0) {
4275                 /* For multiple links files, if there are no linkEA data at all,
4276                  * means the file might be created before linkEA is enabled, and
4277                  * all of its links should not be migrated yet, otherwise it
4278                  * should have some linkEA there */
4279                 if (rc == -ENOENT || rc == -ENODATA)
4280                         RETURN(1);
4281                 RETURN(rc);
4282         }
4283
4284         mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
4285         /* If there are still links locally, then the file will not be
4286          * migrated. */
4287         LASSERT(ldata->ld_leh != NULL);
4288
4289         /* If the linkEA is overflow, then means there are some unknown name
4290          * entries under unknown parents, that will prevent the migration. */
4291         if (unlikely(ldata->ld_leh->leh_overflow_time))
4292                 RETURN(1);
4293
4294         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4295         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4296                 struct lu_name          lname;
4297                 struct lu_fid           fid;
4298                 __u32                   parent_mdt_index;
4299
4300                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4301                                     &lname, &fid);
4302                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4303                                                          ldata->ld_reclen);
4304
4305                 rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
4306                 if (rc != 0)
4307                         RETURN(rc);
4308
4309                 /* Migrate the object only if none of its parents are on the
4310                  * current MDT. */
4311                 if (parent_mdt_index != mdt_index)
4312                         continue;
4313
4314                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4315                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4316                        lname.ln_name, PFID(&fid));
4317                 rc = 1;
4318                 break;
4319         }
4320
4321         RETURN(rc);
4322 }
4323
4324 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4325                        struct md_object *sobj, const struct lu_name *lname,
4326                        struct md_object *tobj, struct md_attr *ma)
4327 {
4328         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4329         struct mdd_device       *mdd = mdo2mdd(pobj);
4330         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4331         struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
4332         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4333         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4334         bool                    created = false;
4335         int                     rc;
4336
4337         ENTRY;
4338         /* If the file will being migrated, it will check whether
4339          * the file is being opened by someone else right now */
4340         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4341         if (mdd_sobj->mod_count > 0) {
4342                 CDEBUG(D_OTHER,
4343                        "%s: "DFID"%s is already opened count %d: rc = %d\n",
4344                        mdd2obd_dev(mdd)->obd_name,
4345                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4346                        mdd_sobj->mod_count, -EBUSY);
4347                 mdd_read_unlock(env, mdd_sobj);
4348                 GOTO(put, rc = -EBUSY);
4349         }
4350         mdd_read_unlock(env, mdd_sobj);
4351
4352         rc = mdd_la_get(env, mdd_sobj, so_attr);
4353         if (rc != 0)
4354                 GOTO(put, rc);
4355
4356         rc = mdd_la_get(env, mdd_pobj, pattr);
4357         if (rc != 0)
4358                 GOTO(put, rc);
4359
4360         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4361         if (rc != 0) {
4362                 if (rc > 0)
4363                         rc = 0;
4364                 GOTO(put, rc);
4365         }
4366
4367         /* Sigh, it is impossible to finish all of migration in a single
4368          * transaction, for example migrating big directory entries to the
4369          * new MDT, it needs insert all of name entries of children in the
4370          * new directory.
4371          *
4372          * So migration will be done in multiple steps and transactions.
4373          *
4374          * 1. create an orphan object on the remote MDT in one transaction.
4375          * 2. migrate extend attributes to the new target file/directory.
4376          * 3. For directory, migrate the entries to the new MDT and update
4377          * linkEA of each children. Because we can not migrate all entries
4378          * in a single transaction, so the migrating directory will become
4379          * a striped directory during migration, so once the process is
4380          * interrupted, the directory is still accessible. (During lookup,
4381          * client will locate the name by searching both original and target
4382          * object).
4383          * 4. Finally, update the name/FID to point to the new file/directory
4384          * in a separate transaction.
4385          */
4386
4387         /* step 1: Check whether the orphan object has been created, and create
4388          * orphan object on the remote MDT if needed */
4389         if (!mdd_object_exists(mdd_tobj)) {
4390                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4391                                         lname, so_attr);
4392                 if (rc != 0)
4393                         GOTO(put, rc);
4394                 created = true;
4395         }
4396
4397         LASSERT(mdd_object_exists(mdd_tobj));
4398         /* step 2: migrate xattr */
4399         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4400         if (rc != 0)
4401                 GOTO(put, rc);
4402
4403         /* step 3: migrate name entries to the orphan object */
4404         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4405                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4406                 if (rc != 0)
4407                         GOTO(put, rc);
4408                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4409                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4410                         GOTO(put, rc = 0);
4411         } else {
4412                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4413         }
4414
4415         LASSERT(mdd_object_exists(mdd_tobj));
4416         /* step 4: update name entry to the new object */
4417         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4418                                      ma);
4419         if (rc != 0)
4420                 GOTO(put, rc);
4421
4422         /* newly created target was not locked, don't cache its attributes */
4423         if (created)
4424                 mdd_invalidate(env, tobj);
4425 put:
4426         RETURN(rc);
4427 }
4428
4429 const struct md_dir_operations mdd_dir_ops = {
4430         .mdo_is_subdir     = mdd_is_subdir,
4431         .mdo_lookup        = mdd_lookup,
4432         .mdo_create        = mdd_create,
4433         .mdo_rename        = mdd_rename,
4434         .mdo_link          = mdd_link,
4435         .mdo_unlink        = mdd_unlink,
4436         .mdo_create_data   = mdd_create_data,
4437         .mdo_migrate       = mdd_migrate,
4438 };