Whamcloud - gitweb
LU-8648 all: remove all Sun license and URL references
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static struct lu_name lname_dotdot = {
52         (char *) dotdot,
53         sizeof(dotdot) - 1
54 };
55
56 static inline int
57 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
58 {
59         if (!lu_name_is_valid(ln))
60                 return -EINVAL;
61         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
62                 return -ENAMETOOLONG;
63         else
64                 return 0;
65 }
66
67 /* Get FID from name and parent */
68 static int
69 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
70              const struct lu_attr *pattr, const struct lu_name *lname,
71              struct lu_fid* fid, int mask)
72 {
73         const char *name                = lname->ln_name;
74         const struct dt_key *key        = (const struct dt_key *)name;
75         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
76         struct mdd_device *m            = mdo2mdd(pobj);
77         struct dt_object *dir           = mdd_object_child(mdd_obj);
78         int rc;
79         ENTRY;
80
81         if (unlikely(mdd_is_dead_obj(mdd_obj)))
82                 RETURN(-ESTALE);
83
84         if (!mdd_object_exists(mdd_obj))
85                 RETURN(-ESTALE);
86
87         if (mdd_object_remote(mdd_obj)) {
88                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
89                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
90         }
91
92         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
93                                             MOR_TGT_PARENT);
94         if (rc)
95                 RETURN(rc);
96
97         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
98                    dt_try_as_dir(env, dir)))
99                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
100         else
101                 rc = -ENOTDIR;
102
103         RETURN(rc);
104 }
105
106 int mdd_lookup(const struct lu_env *env,
107                struct md_object *pobj, const struct lu_name *lname,
108                struct lu_fid *fid, struct md_op_spec *spec)
109 {
110         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
111         int rc;
112         ENTRY;
113
114         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
115         if (rc != 0)
116                 RETURN(rc);
117
118         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
119                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
120         RETURN(rc);
121 }
122
123 /**
124  * Get parent FID of the directory
125  *
126  * Read parent FID from linkEA, if that fails, then do lookup
127  * dotdot to get the parent FID.
128  *
129  * \param[in] env       execution environment
130  * \param[in] obj       object from which to find the parent FID
131  * \param[in] attr      attribute of the object
132  * \param[out] fid      fid to get the parent FID
133  *
134  * \retval              0 if getting the parent FID succeeds.
135  * \retval              negative errno if getting the parent FID fails.
136  **/
137 static inline int mdd_parent_fid(const struct lu_env *env,
138                                  struct mdd_object *obj,
139                                  const struct lu_attr *attr,
140                                  struct lu_fid *fid)
141 {
142         struct mdd_thread_info  *info = mdd_env_info(env);
143         struct linkea_data      ldata = { NULL };
144         struct lu_buf           *buf = &info->mti_link_buf;
145         struct lu_name          lname;
146         int                     rc = 0;
147
148         ENTRY;
149
150         LASSERT(S_ISDIR(mdd_object_type(obj)));
151
152         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
153         if (buf->lb_buf == NULL)
154                 GOTO(lookup, rc = 0);
155
156         ldata.ld_buf = buf;
157         rc = mdd_links_read(env, obj, &ldata);
158         if (rc != 0)
159                 GOTO(lookup, rc);
160
161         LASSERT(ldata.ld_leh != NULL);
162         /* Directory should only have 1 parent */
163         if (ldata.ld_leh->leh_reccount > 1)
164                 GOTO(lookup, rc);
165
166         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
167
168         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
169         if (likely(fid_is_sane(fid)))
170                 RETURN(0);
171 lookup:
172         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
173         RETURN(rc);
174 }
175
176 /*
177  * For root fid use special function, which does not compare version component
178  * of fid. Version component is different for root fids on all MDTs.
179  */
180 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
181 {
182         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
183                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
184 }
185
186 /*
187  * return 1: if lf is the fid of the ancestor of p1;
188  * return 0: if not;
189  *
190  * return -EREMOTE: if remote object is found, in this
191  * case fid of remote object is saved to @pf;
192  *
193  * otherwise: values < 0, errors.
194  */
195 static int mdd_is_parent(const struct lu_env *env,
196                         struct mdd_device *mdd,
197                         struct mdd_object *p1,
198                         const struct lu_attr *attr,
199                         const struct lu_fid *lf,
200                         struct lu_fid *pf)
201 {
202         struct mdd_object *parent = NULL;
203         struct lu_fid *pfid;
204         int rc;
205         ENTRY;
206
207         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
208         pfid = &mdd_env_info(env)->mti_fid;
209
210         /* Check for root first. */
211         if (mdd_is_root(mdd, mdo2fid(p1)))
212                 RETURN(0);
213
214         for(;;) {
215                 /* this is done recursively */
216                 rc = mdd_parent_fid(env, p1, attr, pfid);
217                 if (rc)
218                         GOTO(out, rc);
219                 if (mdd_is_root(mdd, pfid))
220                         GOTO(out, rc = 0);
221                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
222                         GOTO(out, rc = 0);
223                 if (lu_fid_eq(pfid, lf))
224                         GOTO(out, rc = 1);
225                 if (parent != NULL)
226                         mdd_object_put(env, parent);
227
228                 parent = mdd_object_find(env, mdd, pfid);
229                 if (IS_ERR(parent))
230                         GOTO(out, rc = PTR_ERR(parent));
231
232                 if (!mdd_object_exists(parent))
233                         GOTO(out, rc = -EINVAL);
234
235                 p1 = parent;
236         }
237         EXIT;
238 out:
239         if (parent && !IS_ERR(parent))
240                 mdd_object_put(env, parent);
241         return rc;
242 }
243
244 /*
245  * No permission check is needed.
246  *
247  * returns 1: if fid is ancestor of @mo;
248  * returns 0: if fid is not an ancestor of @mo;
249  *
250  * returns EREMOTE if remote object is found, fid of remote object is saved to
251  * @fid;
252  *
253  * returns < 0: if error
254  */
255 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
256                   const struct lu_fid *fid, struct lu_fid *sfid)
257 {
258         struct mdd_device *mdd = mdo2mdd(mo);
259         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
260         int rc;
261         ENTRY;
262
263         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
264                 RETURN(0);
265
266         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
267         if (rc != 0)
268                 RETURN(rc);
269
270         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
271         if (rc == 0) {
272                 /* found root */
273                 fid_zero(sfid);
274         } else if (rc == 1) {
275                 /* found @fid is parent */
276                 *sfid = *fid;
277                 rc = 0;
278         }
279         RETURN(rc);
280 }
281
282 /*
283  * Check that @dir contains no entries except (possibly) dot and dotdot.
284  *
285  * Returns:
286  *
287  *             0        empty
288  *      -ENOTDIR        not a directory object
289  *    -ENOTEMPTY        not empty
290  *           -ve        other error
291  *
292  */
293 static int mdd_dir_is_empty(const struct lu_env *env,
294                             struct mdd_object *dir)
295 {
296         struct dt_it     *it;
297         struct dt_object *obj;
298         const struct dt_it_ops *iops;
299         int result;
300         ENTRY;
301
302         obj = mdd_object_child(dir);
303         if (!dt_try_as_dir(env, obj))
304                 RETURN(-ENOTDIR);
305
306         iops = &obj->do_index_ops->dio_it;
307         it = iops->init(env, obj, LUDA_64BITHASH);
308         if (!IS_ERR(it)) {
309                 result = iops->get(env, it, (const struct dt_key *)"");
310                 if (result > 0) {
311                         int i;
312                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
313                                 result = iops->next(env, it);
314                         if (result == 0)
315                                 result = -ENOTEMPTY;
316                         else if (result == 1)
317                                 result = 0;
318                 } else if (result == 0)
319                         /*
320                          * Huh? Index contains no zero key?
321                          */
322                         result = -EIO;
323
324                 iops->put(env, it);
325                 iops->fini(env, it);
326         } else
327                 result = PTR_ERR(it);
328         RETURN(result);
329 }
330
331 /**
332  * Determine if the target object can be hard linked, and right now it only
333  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
334  * directory nlink count might exceed the maximum link count(see
335  * osd_object_ref_add), so it only check nlink for non-directories.
336  *
337  * \param[in] env       thread environment
338  * \param[in] obj       object being linked to
339  * \param[in] la        attributes of \a obj
340  *
341  * \retval              0 if \a obj can be hard linked
342  * \retval              negative error if \a obj is a directory or has too
343  *                      many links
344  */
345 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
346                           const struct lu_attr *la)
347 {
348         struct mdd_device *m = mdd_obj2mdd_dev(obj);
349         ENTRY;
350
351         LASSERT(la != NULL);
352
353         /* Subdir count limitation can be broken through
354          * (see osd_object_ref_add), so only check non-directory here. */
355         if (!S_ISDIR(la->la_mode) &&
356             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
357                 RETURN(-EMLINK);
358
359         RETURN(0);
360 }
361
362 /**
363  * Check whether it may create the cobj under the pobj.
364  *
365  * \param[in] env       execution environment
366  * \param[in] pobj      the parent directory
367  * \param[in] pattr     the attribute of the parent directory
368  * \param[in] cobj      the child to be created
369  * \param[in] check_perm        if check WRITE|EXEC permission for parent
370  *
371  * \retval              = 0 create the child under this dir is allowed
372  * \retval              negative errno create the child under this dir is
373  *                      not allowed
374  */
375 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
376                    const struct lu_attr *pattr, struct mdd_object *cobj,
377                    bool check_perm)
378 {
379         int rc = 0;
380         ENTRY;
381
382         if (cobj && mdd_object_exists(cobj))
383                 RETURN(-EEXIST);
384
385         if (mdd_is_dead_obj(pobj))
386                 RETURN(-ENOENT);
387
388         if (check_perm)
389                 rc = mdd_permission_internal_locked(env, pobj, pattr,
390                                                     MAY_WRITE | MAY_EXEC,
391                                                     MOR_TGT_PARENT);
392         RETURN(rc);
393 }
394
395 /*
396  * Check whether can unlink from the pobj in the case of "cobj == NULL".
397  */
398 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
399                    const struct lu_attr *pattr, const struct lu_attr *attr)
400 {
401         int rc;
402         ENTRY;
403
404         if (mdd_is_dead_obj(pobj))
405                 RETURN(-ENOENT);
406
407         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
408                 RETURN(-EPERM);
409
410         rc = mdd_permission_internal_locked(env, pobj, pattr,
411                                             MAY_WRITE | MAY_EXEC,
412                                             MOR_TGT_PARENT);
413         if (rc != 0)
414                 RETURN(rc);
415
416         if (pattr->la_flags & LUSTRE_APPEND_FL)
417                 RETURN(-EPERM);
418
419         RETURN(rc);
420 }
421
422 /*
423  * pobj == NULL is remote ops case, under such case, pobj's
424  * VTX feature has been checked already, no need check again.
425  */
426 static inline int mdd_is_sticky(const struct lu_env *env,
427                                 struct mdd_object *pobj,
428                                 const struct lu_attr *pattr,
429                                 struct mdd_object *cobj,
430                                 const struct lu_attr *cattr)
431 {
432         struct lu_ucred *uc = lu_ucred_assert(env);
433
434         if (pobj != NULL) {
435                 LASSERT(pattr != NULL);
436                 if (!(pattr->la_mode & S_ISVTX) ||
437                     (pattr->la_uid == uc->uc_fsuid))
438                         return 0;
439         }
440
441         LASSERT(cattr != NULL);
442         if (cattr->la_uid == uc->uc_fsuid)
443                 return 0;
444
445         return !md_capable(uc, CFS_CAP_FOWNER);
446 }
447
448 static int mdd_may_delete_entry(const struct lu_env *env,
449                                 struct mdd_object *pobj,
450                                 const struct lu_attr *pattr,
451                                 int check_perm)
452 {
453         ENTRY;
454
455         LASSERT(pobj != NULL);
456         if (!mdd_object_exists(pobj))
457                 RETURN(-ENOENT);
458
459         if (mdd_is_dead_obj(pobj))
460                 RETURN(-ENOENT);
461
462         if (check_perm) {
463                 int rc;
464                 rc = mdd_permission_internal_locked(env, pobj, pattr,
465                                             MAY_WRITE | MAY_EXEC,
466                                             MOR_TGT_PARENT);
467                 if (rc)
468                         RETURN(rc);
469         }
470
471         if (pattr->la_flags & LUSTRE_APPEND_FL)
472                 RETURN(-EPERM);
473
474         RETURN(0);
475 }
476
477 /*
478  * Check whether it may delete the cobj from the pobj.
479  * pobj maybe NULL
480  */
481 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
482                    const struct lu_attr *tpattr, struct mdd_object *tobj,
483                    const struct lu_attr *tattr, const struct lu_attr *cattr,
484                    int check_perm, int check_empty)
485 {
486         int rc = 0;
487         ENTRY;
488
489         if (tpobj) {
490                 LASSERT(tpattr != NULL);
491                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
492                 if (rc != 0)
493                         RETURN(rc);
494         }
495
496         if (tobj == NULL)
497                 RETURN(0);
498
499         if (!mdd_object_exists(tobj))
500                 RETURN(-ENOENT);
501
502         if (mdd_is_dead_obj(tobj))
503                 RETURN(-ESTALE);
504
505         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
506                 RETURN(-EPERM);
507
508         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
509                 RETURN(-EPERM);
510
511         /* additional check the rename case */
512         if (cattr) {
513                 if (S_ISDIR(cattr->la_mode)) {
514                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
515
516                         if (!S_ISDIR(tattr->la_mode))
517                                 RETURN(-ENOTDIR);
518
519                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
520                                 RETURN(-EBUSY);
521                 } else if (S_ISDIR(tattr->la_mode))
522                         RETURN(-EISDIR);
523         }
524
525         if (S_ISDIR(tattr->la_mode) && check_empty)
526                 rc = mdd_dir_is_empty(env, tobj);
527
528         RETURN(rc);
529 }
530
531 /**
532  * Check whether it can create the link file(linked to @src_obj) under
533  * the target directory(@tgt_obj), and src_obj has been locked by
534  * mdd_write_lock.
535  *
536  * \param[in] env       execution environment
537  * \param[in] tgt_obj   the target directory
538  * \param[in] tattr     attributes of target directory
539  * \param[in] lname     the link name
540  * \param[in] src_obj   source object for link
541  * \param[in] cattr     attributes for source object
542  *
543  * \retval              = 0 it is allowed to create the link file under tgt_obj
544  * \retval              negative error not allowed to create the link file
545  */
546 static int mdd_link_sanity_check(const struct lu_env *env,
547                                  struct mdd_object *tgt_obj,
548                                  const struct lu_attr *tattr,
549                                  const struct lu_name *lname,
550                                  struct mdd_object *src_obj,
551                                  const struct lu_attr *cattr)
552 {
553         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
554         int rc = 0;
555         ENTRY;
556
557         if (!mdd_object_exists(src_obj))
558                 RETURN(-ENOENT);
559
560         if (mdd_is_dead_obj(src_obj))
561                 RETURN(-ESTALE);
562
563         /* Local ops, no lookup before link, check filename length here. */
564         rc = mdd_name_check(m, lname);
565         if (rc < 0)
566                 RETURN(rc);
567
568         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
569                 RETURN(-EPERM);
570
571         if (S_ISDIR(mdd_object_type(src_obj)))
572                 RETURN(-EPERM);
573
574         LASSERT(src_obj != tgt_obj);
575         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
576         if (rc != 0)
577                 RETURN(rc);
578
579         rc = __mdd_may_link(env, src_obj, cattr);
580
581         RETURN(rc);
582 }
583
584 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
585                                    const char *name, struct thandle *handle)
586 {
587         struct dt_object *next = mdd_object_child(pobj);
588         int rc;
589         ENTRY;
590
591         if (dt_try_as_dir(env, next))
592                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
593         else
594                 rc = -ENOTDIR;
595
596         RETURN(rc);
597 }
598
599 static int __mdd_index_insert_only(const struct lu_env *env,
600                                    struct mdd_object *pobj,
601                                    const struct lu_fid *lf, __u32 type,
602                                    const char *name,
603                                    struct thandle *handle)
604 {
605         struct dt_object *next = mdd_object_child(pobj);
606         int               rc;
607         ENTRY;
608
609         if (dt_try_as_dir(env, next)) {
610                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
611                 struct lu_ucred         *uc  = lu_ucred_check(env);
612                 int                      ignore_quota;
613
614                 rec->rec_fid = lf;
615                 rec->rec_type = type;
616                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
617                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
618                                (const struct dt_key *)name, handle,
619                                ignore_quota);
620         } else {
621                 rc = -ENOTDIR;
622         }
623         RETURN(rc);
624 }
625
626 /* insert named index, add reference if isdir */
627 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
628                               const struct lu_fid *lf, __u32 type,
629                               const char *name, struct thandle *handle)
630 {
631         int rc;
632         ENTRY;
633
634         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
635         if (rc == 0 && S_ISDIR(type)) {
636                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
637                 mdo_ref_add(env, pobj, handle);
638                 mdd_write_unlock(env, pobj);
639         }
640
641         RETURN(rc);
642 }
643
644 /* delete named index, drop reference if isdir */
645 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
646                               const char *name, int is_dir,
647                               struct thandle *handle)
648 {
649         int               rc;
650         ENTRY;
651
652         rc = __mdd_index_delete_only(env, pobj, name, handle);
653         if (rc == 0 && is_dir) {
654                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
655                 mdo_ref_del(env, pobj, handle);
656                 mdd_write_unlock(env, pobj);
657         }
658
659         RETURN(rc);
660 }
661
662 static int mdd_llog_record_calc_size(const struct lu_env *env,
663                                      const struct lu_name *tname,
664                                      const struct lu_name *sname)
665 {
666         const struct lu_ucred   *uc = lu_ucred(env);
667         enum changelog_rec_flags crf = 0;
668         size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
669                                             sizeof(struct changelog_rec);
670
671         if (sname != NULL)
672                 crf |= CLF_RENAME;
673
674         if (uc != NULL && uc->uc_jobid[0] != '\0')
675                 crf |= CLF_JOBID;
676
677         return llog_data_len(hdr_size + changelog_rec_offset(crf) +
678                              (tname != NULL ? tname->ln_namelen : 0) +
679                              (sname != NULL ? 1 + sname->ln_namelen : 0));
680 }
681
682 int mdd_declare_changelog_store(const struct lu_env *env,
683                                 struct mdd_device *mdd,
684                                 const struct lu_name *tname,
685                                 const struct lu_name *sname,
686                                 struct thandle *handle)
687 {
688         struct obd_device               *obd = mdd2obd_dev(mdd);
689         struct llog_ctxt                *ctxt;
690         struct llog_changelog_rec       *rec;
691         struct lu_buf                   *buf;
692         struct thandle                  *llog_th;
693         int                              reclen;
694         int                              rc;
695
696         /* Not recording */
697         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
698                 return 0;
699
700         reclen = mdd_llog_record_calc_size(env, tname, sname);
701         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
702         if (buf->lb_buf == NULL)
703                 return -ENOMEM;
704
705         rec = buf->lb_buf;
706         rec->cr_hdr.lrh_len = reclen;
707         rec->cr_hdr.lrh_type = CHANGELOG_REC;
708
709         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
710         if (ctxt == NULL)
711                 return -ENXIO;
712
713         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
714         if (IS_ERR(llog_th))
715                 GOTO(out_put, rc = PTR_ERR(llog_th));
716
717         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
718
719 out_put:
720         llog_ctxt_put(ctxt);
721
722         return rc;
723 }
724
725 /** Add a changelog entry \a rec to the changelog llog
726  * \param mdd
727  * \param rec
728  * \param handle - currently ignored since llogs start their own transaction;
729  *              this will hopefully be fixed in llog rewrite
730  * \retval 0 ok
731  */
732 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
733                         struct llog_changelog_rec *rec, struct thandle *th)
734 {
735         struct obd_device       *obd = mdd2obd_dev(mdd);
736         struct llog_ctxt        *ctxt;
737         struct thandle          *llog_th;
738         int                      rc;
739
740         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
741                                             changelog_rec_varsize(&rec->cr));
742
743         /* llog_lvfs_write_rec sets the llog tail len */
744         rec->cr_hdr.lrh_type = CHANGELOG_REC;
745         rec->cr.cr_time = cl_time();
746
747         spin_lock(&mdd->mdd_cl.mc_lock);
748         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
749          * but as long as the MDD transactions are ordered correctly for e.g.
750          * rename conflicts, I don't think this should matter. */
751         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
752         spin_unlock(&mdd->mdd_cl.mc_lock);
753
754         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
755         if (ctxt == NULL)
756                 return -ENXIO;
757
758         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
759         if (IS_ERR(llog_th))
760                 GOTO(out_put, rc = PTR_ERR(llog_th));
761
762         /* nested journal transaction */
763         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
764
765 out_put:
766         llog_ctxt_put(ctxt);
767         if (rc > 0)
768                 rc = 0;
769         return rc;
770 }
771
772 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
773                                          const struct lu_fid *sfid,
774                                          const struct lu_fid *spfid,
775                                          const struct lu_name *sname)
776 {
777         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
778         size_t                           extsize = sname->ln_namelen + 1;
779
780         LASSERT(sfid != NULL);
781         LASSERT(spfid != NULL);
782         LASSERT(sname != NULL);
783
784         rnm->cr_sfid = *sfid;
785         rnm->cr_spfid = *spfid;
786
787         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
788         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
789         rec->cr_namelen += extsize;
790 }
791
792 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
793 {
794         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
795
796         if (jobid == NULL || jobid[0] == '\0')
797                 return;
798
799         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
800 }
801
802 /** Store a namespace change changelog record
803  * If this fails, we must fail the whole transaction; we don't
804  * want the change to commit without the log entry.
805  * \param target - mdd_object of change
806  * \param tpfid - target parent dir/object fid
807  * \param sfid - source object fid
808  * \param spfid - source parent fid
809  * \param tname - target name string
810  * \param sname - source name string
811  * \param handle - transaction handle
812  */
813 int mdd_changelog_ns_store(const struct lu_env *env,
814                            struct mdd_device *mdd,
815                            enum changelog_rec_type type,
816                            enum changelog_rec_flags crf,
817                            struct mdd_object *target,
818                            const struct lu_fid *tpfid,
819                            const struct lu_fid *sfid,
820                            const struct lu_fid *spfid,
821                            const struct lu_name *tname,
822                            const struct lu_name *sname,
823                            struct thandle *handle)
824 {
825         const struct lu_ucred           *uc = lu_ucred(env);
826         struct llog_changelog_rec       *rec;
827         struct lu_buf                   *buf;
828         int                              reclen;
829         int                              rc;
830         ENTRY;
831
832         /* Not recording */
833         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
834                 RETURN(0);
835
836         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
837                 RETURN(0);
838
839         LASSERT(tpfid != NULL);
840         LASSERT(tname != NULL);
841         LASSERT(handle != NULL);
842
843         reclen = mdd_llog_record_calc_size(env, tname, sname);
844         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
845         if (buf->lb_buf == NULL)
846                 RETURN(-ENOMEM);
847         rec = buf->lb_buf;
848
849         crf &= CLF_FLAGMASK;
850
851         if (uc != NULL && uc->uc_jobid[0] != '\0')
852                 crf |= CLF_JOBID;
853
854         if (sname != NULL)
855                 crf |= CLF_RENAME;
856         else
857                 crf |= CLF_VERSION;
858
859         rec->cr.cr_flags = crf;
860         rec->cr.cr_type = (__u32)type;
861         rec->cr.cr_pfid = *tpfid;
862         rec->cr.cr_namelen = tname->ln_namelen;
863         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
864
865         if (crf & CLF_RENAME)
866                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
867
868         if (crf & CLF_JOBID)
869                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
870
871         if (likely(target != NULL)) {
872                 rec->cr.cr_tfid = *mdo2fid(target);
873                 target->mod_cltime = cfs_time_current_64();
874         } else {
875                 fid_zero(&rec->cr.cr_tfid);
876         }
877
878         rc = mdd_changelog_store(env, mdd, rec, handle);
879         if (rc < 0) {
880                 CERROR("%s: cannot store changelog record: type = %d, "
881                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
882                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
883                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
884                 return -EFAULT;
885         }
886
887         return 0;
888 }
889
890 static int __mdd_links_add(const struct lu_env *env,
891                            struct mdd_object *mdd_obj,
892                            struct linkea_data *ldata,
893                            const struct lu_name *lname,
894                            const struct lu_fid *pfid,
895                            int first, int check)
896 {
897         int rc;
898
899         if (ldata->ld_leh == NULL) {
900                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
901                 if (rc) {
902                         if (rc != -ENODATA)
903                                 return rc;
904                         rc = linkea_data_new(ldata,
905                                              &mdd_env_info(env)->mti_link_buf);
906                         if (rc)
907                                 return rc;
908                 }
909         }
910
911         if (check) {
912                 rc = linkea_links_find(ldata, lname, pfid);
913                 if (rc && rc != -ENOENT)
914                         return rc;
915                 if (rc == 0)
916                         return -EEXIST;
917         }
918
919         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
920                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
921
922                 *tfid = *pfid;
923                 tfid->f_ver = ~0;
924                 linkea_add_buf(ldata, lname, tfid);
925         }
926
927         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
928                 linkea_add_buf(ldata, lname, pfid);
929
930         return linkea_add_buf(ldata, lname, pfid);
931 }
932
933 static int __mdd_links_del(const struct lu_env *env,
934                            struct mdd_object *mdd_obj,
935                            struct linkea_data *ldata,
936                            const struct lu_name *lname,
937                            const struct lu_fid *pfid)
938 {
939         int rc;
940
941         if (ldata->ld_leh == NULL) {
942                 rc = mdd_links_read(env, mdd_obj, ldata);
943                 if (rc)
944                         return rc;
945         }
946
947         rc = linkea_links_find(ldata, lname, pfid);
948         if (rc)
949                 return rc;
950
951         linkea_del_buf(ldata, lname);
952         return 0;
953 }
954
955 static int mdd_linkea_prepare(const struct lu_env *env,
956                               struct mdd_object *mdd_obj,
957                               const struct lu_fid *oldpfid,
958                               const struct lu_name *oldlname,
959                               const struct lu_fid *newpfid,
960                               const struct lu_name *newlname,
961                               int first, int check,
962                               struct linkea_data *ldata)
963 {
964         int rc = 0;
965         int rc2 = 0;
966         ENTRY;
967
968         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
969                 return 0;
970
971         LASSERT(oldpfid != NULL || newpfid != NULL);
972
973         if (mdd_obj->mod_flags & DEAD_OBJ) {
974                 /* Prevent linkea to be updated which is NOT necessary. */
975                 ldata->ld_reclen = 0;
976                 /* No more links, don't bother */
977                 RETURN(0);
978         }
979
980         if (oldpfid != NULL) {
981                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
982                 if (rc) {
983                         if ((check == 1) ||
984                             (rc != -ENODATA && rc != -ENOENT))
985                                 RETURN(rc);
986                         /* No changes done. */
987                         rc = 0;
988                 }
989         }
990
991         /* If renaming, add the new record */
992         if (newpfid != NULL) {
993                 /* even if the add fails, we still delete the out-of-date
994                  * old link */
995                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
996                                       first, check);
997         }
998
999         rc = rc != 0 ? rc : rc2;
1000
1001         RETURN(rc);
1002 }
1003
1004 int mdd_links_rename(const struct lu_env *env,
1005                      struct mdd_object *mdd_obj,
1006                      const struct lu_fid *oldpfid,
1007                      const struct lu_name *oldlname,
1008                      const struct lu_fid *newpfid,
1009                      const struct lu_name *newlname,
1010                      struct thandle *handle,
1011                      struct linkea_data *ldata,
1012                      int first, int check)
1013 {
1014         int rc = 0;
1015         ENTRY;
1016
1017         if (ldata == NULL) {
1018                 ldata = &mdd_env_info(env)->mti_link_data;
1019                 memset(ldata, 0, sizeof(*ldata));
1020                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1021                                         newpfid, newlname, first, check,
1022                                         ldata);
1023                 if (rc != 0)
1024                         GOTO(out, rc);
1025         }
1026
1027         if (ldata->ld_reclen != 0)
1028                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1029         EXIT;
1030 out:
1031         if (rc != 0) {
1032                 int error = 1;
1033                 if (rc == -EOVERFLOW || rc == -ENOSPC)
1034                         error = 0;
1035                 if (newlname == NULL)
1036                         CDEBUG(error ? D_ERROR : D_OTHER,
1037                                "link_ea add failed %d "DFID"\n",
1038                                rc, PFID(mdd_object_fid(mdd_obj)));
1039                 else if (oldpfid == NULL)
1040                         CDEBUG(error ? D_ERROR : D_OTHER,
1041                                "link_ea add '%.*s' failed %d "DFID"\n",
1042                                newlname->ln_namelen, newlname->ln_name,
1043                                rc, PFID(mdd_object_fid(mdd_obj)));
1044                 else if (newpfid == NULL)
1045                         CDEBUG(error ? D_ERROR : D_OTHER,
1046                                "link_ea del '%.*s' failed %d "DFID"\n",
1047                                oldlname->ln_namelen, oldlname->ln_name,
1048                                rc, PFID(mdd_object_fid(mdd_obj)));
1049                 else
1050                         CDEBUG(error ? D_ERROR : D_OTHER,
1051                                "link_ea rename '%.*s'->'%.*s' failed %d "
1052                                DFID"\n",
1053                                oldlname->ln_namelen, oldlname->ln_name,
1054                                newlname->ln_namelen, newlname->ln_name,
1055                                rc, PFID(mdd_object_fid(mdd_obj)));
1056         }
1057
1058         if (is_vmalloc_addr(ldata->ld_buf))
1059                 /* if we vmalloced a large buffer drop it */
1060                 lu_buf_free(ldata->ld_buf);
1061
1062         return rc;
1063 }
1064
1065 static inline int mdd_links_add(const struct lu_env *env,
1066                                 struct mdd_object *mdd_obj,
1067                                 const struct lu_fid *pfid,
1068                                 const struct lu_name *lname,
1069                                 struct thandle *handle,
1070                                 struct linkea_data *ldata, int first)
1071 {
1072         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1073                                 pfid, lname, handle, ldata, first, 0);
1074 }
1075
1076 static inline int mdd_links_del(const struct lu_env *env,
1077                                 struct mdd_object *mdd_obj,
1078                                 const struct lu_fid *pfid,
1079                                 const struct lu_name *lname,
1080                                 struct thandle *handle)
1081 {
1082         return mdd_links_rename(env, mdd_obj, pfid, lname,
1083                                 NULL, NULL, handle, NULL, 0, 0);
1084 }
1085
1086 /** Read the link EA into a temp buffer.
1087  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1088  * A pointer to the buffer is stored in \a ldata::ld_buf.
1089  *
1090  * \retval 0 or error
1091  */
1092 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1093                    struct linkea_data *ldata)
1094 {
1095         int rc;
1096
1097         if (!mdd_object_exists(mdd_obj))
1098                 return -ENODATA;
1099
1100         /* First try a small buf */
1101         LASSERT(env != NULL);
1102         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1103                                                PAGE_SIZE);
1104         if (ldata->ld_buf->lb_buf == NULL)
1105                 return -ENOMEM;
1106
1107         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
1108         if (rc == -ERANGE) {
1109                 /* Buf was too small, figure out what we need. */
1110                 lu_buf_free(ldata->ld_buf);
1111                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1112                                    XATTR_NAME_LINK);
1113                 if (rc < 0)
1114                         return rc;
1115                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1116                 if (ldata->ld_buf->lb_buf == NULL)
1117                         return -ENOMEM;
1118                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1119                                   XATTR_NAME_LINK);
1120         }
1121         if (rc < 0) {
1122                 lu_buf_free(ldata->ld_buf);
1123                 ldata->ld_buf = NULL;
1124                 return rc;
1125         }
1126
1127         return linkea_init(ldata);
1128 }
1129
1130 /** Read the link EA into a temp buffer.
1131  * Uses the name_buf since it is generally large.
1132  * \retval IS_ERR err
1133  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1134  */
1135 struct lu_buf *mdd_links_get(const struct lu_env *env,
1136                              struct mdd_object *mdd_obj)
1137 {
1138         struct linkea_data ldata = { NULL };
1139         int rc;
1140
1141         rc = mdd_links_read(env, mdd_obj, &ldata);
1142         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1143 }
1144
1145 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1146                     struct linkea_data *ldata, struct thandle *handle)
1147 {
1148         const struct lu_buf *buf;
1149         int                 rc;
1150
1151         if (ldata == NULL || ldata->ld_buf == NULL ||
1152             ldata->ld_leh == NULL)
1153                 return 0;
1154
1155         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1156                                 ldata->ld_leh->leh_len);
1157         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1158                 return 0;
1159
1160         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1161         if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
1162             mdd_object_remote(mdd_obj) == 0) {
1163                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1164                 struct thandle  *sub_th;
1165
1166                 /* XXX: If the linkEA is overflow, then we need to notify the
1167                  *      namespace LFSCK to skip "nlink" attribute verification
1168                  *      on this object to avoid the "nlink" to be shrinked by
1169                  *      wrong. It may be not good an interaction with LFSCK
1170                  *      like this. We will consider to replace it with other
1171                  *      mechanism in future. LU-5802. */
1172                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
1173                                LFSCK_TYPE_NAMESPACE);
1174
1175                 sub_th = thandle_get_sub_by_dt(env, handle,
1176                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1177                 lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1178                                 lr, sub_th);
1179         }
1180
1181         return rc;
1182 }
1183
1184 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1185                           struct thandle *handle, struct linkea_data *ldata,
1186                           enum mdd_links_add_overflow overflow)
1187 {
1188         int     rc;
1189         int     ea_len;
1190         void    *linkea;
1191
1192         if (ldata != NULL && ldata->ld_leh != NULL) {
1193                 ea_len = ldata->ld_leh->leh_len;
1194                 linkea = ldata->ld_buf->lb_buf;
1195         } else {
1196                 ea_len = DEFAULT_LINKEA_SIZE;
1197                 linkea = NULL;
1198         }
1199
1200         /* XXX: max size? */
1201         rc = mdo_declare_xattr_set(env, mdd_obj,
1202                                    mdd_buf_get_const(env, linkea, ea_len),
1203                                    XATTR_NAME_LINK, 0, handle);
1204         if (rc != 0)
1205                 return rc;
1206
1207         if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
1208                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1209                 struct thandle  *sub_th;
1210
1211                 /* XXX: If the linkEA is overflow, then we need to notify the
1212                  *      namespace LFSCK to skip "nlink" attribute verification
1213                  *      on this object to avoid the "nlink" to be shrinked by
1214                  *      wrong. It may be not good an interaction with LFSCK
1215                  *      like this. We will consider to replace it with other
1216                  *      mechanism in future. LU-5802. */
1217                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
1218                                LFSCK_TYPE_NAMESPACE);
1219
1220                 sub_th = thandle_get_sub_by_dt(env, handle,
1221                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1222                 rc = lfsck_in_notify(env,
1223                                      mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1224                                      lr, sub_th);
1225         }
1226
1227         return rc;
1228 }
1229
1230 static inline int mdd_declare_links_del(const struct lu_env *env,
1231                                         struct mdd_object *c,
1232                                         struct thandle *handle)
1233 {
1234         int rc = 0;
1235
1236         /* For directory, the linkEA will be removed together
1237          * with the object. */
1238         if (!S_ISDIR(mdd_object_type(c)))
1239                 rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
1240
1241         return rc;
1242 }
1243
1244 static int mdd_declare_link(const struct lu_env *env,
1245                             struct mdd_device *mdd,
1246                             struct mdd_object *p,
1247                             struct mdd_object *c,
1248                             const struct lu_name *name,
1249                             struct thandle *handle,
1250                             struct lu_attr *la,
1251                             struct linkea_data *data)
1252 {
1253         struct lu_fid tfid = *mdo2fid(c);
1254         int rc;
1255
1256         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1257                 tfid.f_oid = cfs_fail_val;
1258
1259         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1260                                       name->ln_name, handle);
1261         if (rc != 0)
1262                 return rc;
1263
1264         rc = mdo_declare_ref_add(env, c, handle);
1265         if (rc != 0)
1266                 return rc;
1267
1268         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1269                 rc = mdo_declare_ref_add(env, c, handle);
1270                 if (rc != 0)
1271                         return rc;
1272         }
1273
1274         la->la_valid = LA_CTIME | LA_MTIME;
1275         rc = mdo_declare_attr_set(env, p, la, handle);
1276         if (rc != 0)
1277                 return rc;
1278
1279         la->la_valid = LA_CTIME;
1280         rc = mdo_declare_attr_set(env, c, la, handle);
1281         if (rc != 0)
1282                 return rc;
1283
1284         rc = mdd_declare_links_add(env, c, handle, data,
1285                         S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
1286         if (rc != 0)
1287                 return rc;
1288
1289         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1290
1291         return rc;
1292 }
1293
1294 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1295                     struct md_object *src_obj, const struct lu_name *lname,
1296                     struct md_attr *ma)
1297 {
1298         const char *name = lname->ln_name;
1299         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1300         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1301         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1302         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1303         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1304         struct mdd_device *mdd = mdo2mdd(src_obj);
1305         struct thandle *handle;
1306         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1307         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1308         int rc;
1309         ENTRY;
1310
1311         rc = mdd_la_get(env, mdd_sobj, cattr);
1312         if (rc != 0)
1313                 RETURN(rc);
1314
1315         rc = mdd_la_get(env, mdd_tobj, tattr);
1316         if (rc != 0)
1317                 RETURN(rc);
1318
1319         handle = mdd_trans_create(env, mdd);
1320         if (IS_ERR(handle))
1321                 GOTO(out_pending, rc = PTR_ERR(handle));
1322
1323         memset(ldata, 0, sizeof(*ldata));
1324
1325         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1326         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1327
1328         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1329                               la, ldata);
1330         if (rc)
1331                 GOTO(stop, rc);
1332
1333         rc = mdd_trans_start(env, mdd, handle);
1334         if (rc)
1335                 GOTO(stop, rc);
1336
1337         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1338         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1339                                    cattr);
1340         if (rc)
1341                 GOTO(out_unlock, rc);
1342
1343         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1344                 rc = mdo_ref_add(env, mdd_sobj, handle);
1345                 if (rc != 0)
1346                         GOTO(out_unlock, rc);
1347         }
1348
1349         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1350                 rc = mdo_ref_add(env, mdd_sobj, handle);
1351                 if (rc != 0)
1352                         GOTO(out_unlock, rc);
1353         }
1354
1355         *tfid = *mdo2fid(mdd_sobj);
1356         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1357                 tfid->f_oid = cfs_fail_val;
1358
1359         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1360                                      mdd_object_type(mdd_sobj), name, handle);
1361         if (rc != 0) {
1362                 mdo_ref_del(env, mdd_sobj, handle);
1363                 GOTO(out_unlock, rc);
1364         }
1365
1366         la->la_valid = LA_CTIME | LA_MTIME;
1367         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1368         if (rc)
1369                 GOTO(out_unlock, rc);
1370
1371         la->la_valid = LA_CTIME;
1372         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1373         if (rc == 0) {
1374                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1375                                         mdo2fid(mdd_tobj), lname, 0, 0,
1376                                         ldata);
1377                 if (rc == 0)
1378                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1379                                       lname, handle, ldata, 0);
1380                 /* The failure of links_add should not cause the link
1381                  * failure, reset rc here */
1382                 rc = 0;
1383         }
1384         EXIT;
1385 out_unlock:
1386         mdd_write_unlock(env, mdd_sobj);
1387         if (rc == 0)
1388                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1389                                             mdo2fid(mdd_tobj), NULL, NULL,
1390                                             lname, NULL, handle);
1391 stop:
1392         rc = mdd_trans_stop(env, mdd, rc, handle);
1393         if (is_vmalloc_addr(ldata->ld_buf))
1394                 /* if we vmalloced a large buffer drop it */
1395                 lu_buf_free(ldata->ld_buf);
1396 out_pending:
1397         return rc;
1398 }
1399
1400 static int mdd_mark_orphan_object(const struct lu_env *env,
1401                                 struct mdd_object *obj, struct thandle *handle,
1402                                 bool declare)
1403 {
1404         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1405         int rc;
1406
1407         if (!S_ISDIR(mdd_object_type(obj)))
1408                 return 0;
1409
1410         attr->la_valid = LA_FLAGS;
1411         attr->la_flags = LUSTRE_ORPHAN_FL;
1412
1413         if (declare)
1414                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1415         else
1416                 rc = mdo_attr_set(env, obj, attr, handle);
1417
1418         return rc;
1419 }
1420
1421 static int mdd_declare_finish_unlink(const struct lu_env *env,
1422                                      struct mdd_object *obj,
1423                                      struct thandle *handle)
1424 {
1425         int     rc;
1426
1427         /* Sigh, we do not know if the unlink object will become orphan in
1428          * declare phase, but fortunately the flags here does not matter
1429          * in current declare implementation */
1430         rc = mdd_mark_orphan_object(env, obj, handle, true);
1431         if (rc != 0)
1432                 return rc;
1433
1434         rc = mdo_declare_destroy(env, obj, handle);
1435         if (rc != 0)
1436                 return rc;
1437
1438         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1439         if (rc != 0)
1440                 return rc;
1441
1442         return mdd_declare_links_del(env, obj, handle);
1443 }
1444
1445 /* caller should take a lock before calling */
1446 int mdd_finish_unlink(const struct lu_env *env,
1447                       struct mdd_object *obj, struct md_attr *ma,
1448                       const struct mdd_object *pobj,
1449                       const struct lu_name *lname,
1450                       struct thandle *th)
1451 {
1452         int rc = 0;
1453         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1454         ENTRY;
1455
1456         LASSERT(mdd_write_locked(env, obj) != 0);
1457
1458         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1459                 /* add new orphan and the object
1460                  * will be deleted during mdd_close() */
1461                 obj->mod_flags |= DEAD_OBJ;
1462                 if (obj->mod_count) {
1463                         rc = __mdd_orphan_add(env, obj, th);
1464                         if (rc == 0)
1465                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1466                                         "orphan list, open count = %d\n",
1467                                         PFID(mdd_object_fid(obj)),
1468                                         obj->mod_count);
1469                         else
1470                                 CERROR("Object "DFID" fail to be an orphan, "
1471                                        "open count = %d, maybe cause failed "
1472                                        "open replay\n",
1473                                         PFID(mdd_object_fid(obj)),
1474                                         obj->mod_count);
1475
1476                         /* mark object as an orphan here, not
1477                          * before __mdd_orphan_add() as racing
1478                          * mdd_la_get() may propagate ORPHAN_OBJ
1479                          * causing the asserition */
1480                         rc = mdd_mark_orphan_object(env, obj, th, false);
1481                 } else {
1482                         rc = mdo_destroy(env, obj, th);
1483                 }
1484         } else if (!is_dir) {
1485                 /* old files may not have link ea; ignore errors */
1486                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1487         }
1488
1489         RETURN(rc);
1490 }
1491
1492 /*
1493  * pobj maybe NULL
1494  * has mdd_write_lock on cobj already, but not on pobj yet
1495  */
1496 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1497                             const struct lu_attr *pattr,
1498                             struct mdd_object *cobj,
1499                             const struct lu_attr *cattr)
1500 {
1501         int rc;
1502         ENTRY;
1503
1504         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1505
1506         RETURN(rc);
1507 }
1508
1509 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1510                               struct mdd_object *p, struct mdd_object *c,
1511                               const struct lu_name *name, struct md_attr *ma,
1512                               struct thandle *handle, int no_name, int is_dir)
1513 {
1514         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1515         int              rc;
1516
1517         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1518                 if (likely(no_name == 0)) {
1519                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1520                                                       handle);
1521                         if (rc != 0)
1522                                 return rc;
1523                 }
1524
1525                 if (is_dir != 0) {
1526                         rc = mdo_declare_ref_del(env, p, handle);
1527                         if (rc != 0)
1528                                 return rc;
1529                 }
1530         }
1531
1532         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1533         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1534         la->la_valid = LA_CTIME | LA_MTIME;
1535         rc = mdo_declare_attr_set(env, p, la, handle);
1536         if (rc)
1537                 return rc;
1538
1539         if (c != NULL) {
1540                 rc = mdo_declare_ref_del(env, c, handle);
1541                 if (rc)
1542                         return rc;
1543
1544                 rc = mdo_declare_ref_del(env, c, handle);
1545                 if (rc)
1546                         return rc;
1547
1548                 la->la_valid = LA_CTIME;
1549                 rc = mdo_declare_attr_set(env, c, la, handle);
1550                 if (rc)
1551                         return rc;
1552
1553                 rc = mdd_declare_finish_unlink(env, c, handle);
1554                 if (rc)
1555                         return rc;
1556
1557                 /* FIXME: need changelog for remove entry */
1558                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1559         }
1560
1561         return rc;
1562 }
1563
1564 /*
1565  * test if a file has an HSM archive
1566  * if HSM attributes are not found in ma update them from
1567  * HSM xattr
1568  */
1569 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1570                                    struct mdd_object *obj,
1571                                    struct md_attr *ma)
1572 {
1573         ENTRY;
1574
1575         if (!(ma->ma_valid & MA_HSM)) {
1576                 /* no HSM MD provided, read xattr */
1577                 struct lu_buf   *hsm_buf;
1578                 const size_t     buflen = sizeof(struct hsm_attrs);
1579                 int              rc;
1580
1581                 hsm_buf = mdd_buf_get(env, NULL, 0);
1582                 lu_buf_alloc(hsm_buf, buflen);
1583                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1584                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1585                 lu_buf_free(hsm_buf);
1586                 if (rc < 0)
1587                         RETURN(false);
1588
1589                 ma->ma_valid |= MA_HSM;
1590         }
1591         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1592                 RETURN(true);
1593         RETURN(false);
1594 }
1595
1596 /**
1597  * Delete name entry and the object.
1598  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1599  * does not exist for this object, and it could only happen during resending
1600  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1601  * is also needed in this case(needed by changelog), so we have to add another
1602  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1603  * the ENOENT failure should be able to be fixed by redo mechanism.
1604  */
1605 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1606                       struct md_object *cobj, const struct lu_name *lname,
1607                       struct md_attr *ma, int no_name)
1608 {
1609         const char *name = lname->ln_name;
1610         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1611         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1612         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1613         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1614         struct mdd_object *mdd_cobj = NULL;
1615         struct mdd_device *mdd = mdo2mdd(pobj);
1616         struct thandle    *handle;
1617         int rc, is_dir = 0, cl_flags = 0;
1618         ENTRY;
1619
1620         /* cobj == NULL means only delete name entry */
1621         if (likely(cobj != NULL)) {
1622                 mdd_cobj = md2mdd_obj(cobj);
1623                 if (mdd_object_exists(mdd_cobj) == 0)
1624                         RETURN(-ENOENT);
1625         }
1626
1627         rc = mdd_la_get(env, mdd_pobj, pattr);
1628         if (rc)
1629                 RETURN(rc);
1630
1631         if (likely(mdd_cobj != NULL)) {
1632                 /* fetch cattr */
1633                 rc = mdd_la_get(env, mdd_cobj, cattr);
1634                 if (rc)
1635                         RETURN(rc);
1636
1637                 is_dir = S_ISDIR(cattr->la_mode);
1638                 /* search for an existing archive.
1639                  * we should check ahead as the object
1640                  * can be destroyed in this transaction */
1641                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1642                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1643         }
1644
1645         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1646         if (rc)
1647                 RETURN(rc);
1648
1649         handle = mdd_trans_create(env, mdd);
1650         if (IS_ERR(handle))
1651                 RETURN(PTR_ERR(handle));
1652
1653         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1654                                 lname, ma, handle, no_name, is_dir);
1655         if (rc)
1656                 GOTO(stop, rc);
1657
1658         rc = mdd_trans_start(env, mdd, handle);
1659         if (rc)
1660                 GOTO(stop, rc);
1661
1662         if (likely(mdd_cobj != NULL))
1663                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1664
1665         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1666                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1667                 if (rc)
1668                         GOTO(cleanup, rc);
1669         }
1670
1671         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1672             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1673                 GOTO(cleanup, rc = 0);
1674
1675         if (likely(mdd_cobj != NULL)) {
1676                 rc = mdo_ref_del(env, mdd_cobj, handle);
1677                 if (rc != 0) {
1678                         __mdd_index_insert_only(env, mdd_pobj,
1679                                                 mdo2fid(mdd_cobj),
1680                                                 mdd_object_type(mdd_cobj),
1681                                                 name, handle);
1682                         GOTO(cleanup, rc);
1683                 }
1684
1685                 if (is_dir)
1686                         /* unlink dot */
1687                         mdo_ref_del(env, mdd_cobj, handle);
1688
1689                 /* fetch updated nlink */
1690                 rc = mdd_la_get(env, mdd_cobj, cattr);
1691                 if (rc)
1692                         GOTO(cleanup, rc);
1693         }
1694
1695         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1696         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1697
1698         la->la_valid = LA_CTIME | LA_MTIME;
1699         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1700         if (rc)
1701                 GOTO(cleanup, rc);
1702
1703         /* Enough for only unlink the entry */
1704         if (unlikely(mdd_cobj == NULL))
1705                 GOTO(stop, rc);
1706
1707         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1708                 /* update ctime of an unlinked file only if it is still
1709                  * opened or a link still exists */
1710                 la->la_valid = LA_CTIME;
1711                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1712                 if (rc)
1713                         GOTO(cleanup, rc);
1714         }
1715
1716         /* XXX: this transfer to ma will be removed with LOD/OSP */
1717         ma->ma_attr = *cattr;
1718         ma->ma_valid |= MA_INODE;
1719         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1720         if (rc != 0)
1721                 GOTO(cleanup, rc);
1722
1723         /* fetch updated nlink */
1724         rc = mdd_la_get(env, mdd_cobj, cattr);
1725         /* if object is removed then we can't get its attrs,
1726          * use last get */
1727         if (rc == -ENOENT) {
1728                 cattr->la_nlink = 0;
1729                 rc = 0;
1730         }
1731
1732         if (cattr->la_nlink == 0) {
1733                 ma->ma_attr = *cattr;
1734                 ma->ma_valid |= MA_INODE;
1735         }
1736
1737         EXIT;
1738 cleanup:
1739         if (likely(mdd_cobj != NULL))
1740                 mdd_write_unlock(env, mdd_cobj);
1741
1742         if (rc == 0) {
1743                 if (cattr->la_nlink == 0)
1744                         cl_flags |= CLF_UNLINK_LAST;
1745                 else
1746                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1747
1748                 rc = mdd_changelog_ns_store(env, mdd,
1749                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1750                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1751                         handle);
1752         }
1753
1754 stop:
1755         rc = mdd_trans_stop(env, mdd, rc, handle);
1756
1757         return rc;
1758 }
1759
1760 /*
1761  * The permission has been checked when obj created, no need check again.
1762  */
1763 static int mdd_cd_sanity_check(const struct lu_env *env,
1764                                struct mdd_object *obj)
1765 {
1766         ENTRY;
1767
1768         /* EEXIST check */
1769         if (!obj || mdd_is_dead_obj(obj))
1770                 RETURN(-ENOENT);
1771
1772         RETURN(0);
1773 }
1774
1775 static int mdd_create_data(const struct lu_env *env,
1776                            struct md_object *pobj,
1777                            struct md_object *cobj,
1778                            const struct md_op_spec *spec,
1779                            struct md_attr *ma)
1780 {
1781         struct mdd_device *mdd = mdo2mdd(cobj);
1782         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1783         struct mdd_object *son = md2mdd_obj(cobj);
1784         struct thandle    *handle;
1785         const struct lu_buf *buf;
1786         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1787         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1788         int                rc;
1789         ENTRY;
1790
1791         rc = mdd_cd_sanity_check(env, son);
1792         if (rc)
1793                 RETURN(rc);
1794
1795         if (!md_should_create(spec->sp_cr_flags))
1796                 RETURN(0);
1797
1798         /*
1799          * there are following use cases for this function:
1800          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1801          *    striping can be specified or not
1802          * 2) CMD?
1803          */
1804         rc = mdd_la_get(env, son, attr);
1805         if (rc)
1806                 RETURN(rc);
1807
1808         /* calling ->ah_make_hint() is used to transfer information from parent */
1809         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1810
1811         handle = mdd_trans_create(env, mdd);
1812         if (IS_ERR(handle))
1813                 GOTO(out_free, rc = PTR_ERR(handle));
1814
1815         /*
1816          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1817          * Should this be fixed?
1818          */
1819         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1820                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1821                spec->sp_cr_flags, spec->no_create);
1822
1823         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1824                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1825                                         spec->u.sp_ea.eadatalen);
1826         } else {
1827                 buf = &LU_BUF_NULL;
1828         }
1829
1830         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1831                                   XATTR_NAME_LOV, 0, handle);
1832         if (rc)
1833                 GOTO(stop, rc);
1834
1835         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1836         if (rc)
1837                 GOTO(stop, rc);
1838
1839         rc = mdd_trans_start(env, mdd, handle);
1840         if (rc)
1841                 GOTO(stop, rc);
1842
1843         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1844                           0, handle);
1845
1846         if (rc)
1847                 GOTO(stop, rc);
1848
1849         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1850
1851 stop:
1852         rc = mdd_trans_stop(env, mdd, rc, handle);
1853
1854 out_free:
1855         RETURN(rc);
1856 }
1857
1858 static int mdd_declare_object_initialize(const struct lu_env *env,
1859                                          struct mdd_object *parent,
1860                                          struct mdd_object *child,
1861                                          const struct lu_attr *attr,
1862                                          struct thandle *handle)
1863 {
1864         int rc;
1865         ENTRY;
1866
1867         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1868         if (!S_ISDIR(attr->la_mode))
1869                 RETURN(0);
1870
1871         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1872                                       dot, handle);
1873         if (rc != 0)
1874                 RETURN(rc);
1875
1876         rc = mdo_declare_ref_add(env, child, handle);
1877         if (rc != 0)
1878                 RETURN(rc);
1879
1880         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1881                                       dotdot, handle);
1882
1883         RETURN(rc);
1884 }
1885
1886 static int mdd_object_initialize(const struct lu_env *env,
1887                                  const struct lu_fid *pfid,
1888                                  struct mdd_object *child,
1889                                  struct lu_attr *attr, struct thandle *handle,
1890                                  const struct md_op_spec *spec)
1891 {
1892         int rc = 0;
1893         ENTRY;
1894
1895         if (S_ISDIR(attr->la_mode)) {
1896                 /* Add "." and ".." for newly created dir */
1897                 mdo_ref_add(env, child, handle);
1898                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1899                                              S_IFDIR, dot, handle);
1900                 if (rc == 0)
1901                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1902                                                      dotdot, handle);
1903                 if (rc != 0)
1904                         mdo_ref_del(env, child, handle);
1905         }
1906
1907         RETURN(rc);
1908 }
1909
1910 /**
1911  * This function checks whether it can create a file/dir under the
1912  * directory(@pobj). The directory(@pobj) is not being locked by
1913  * mdd lock.
1914  *
1915  * \param[in] env       execution environment
1916  * \param[in] pobj      the directory to create files
1917  * \param[in] pattr     the attributes of the directory
1918  * \param[in] lname     the name of the created file/dir
1919  * \param[in] cattr     the attributes of the file/dir
1920  * \param[in] spec      create specification
1921  *
1922  * \retval              = 0 it is allowed to create file/dir under
1923  *                      the directory
1924  * \retval              negative error not allowed to create file/dir
1925  *                      under the directory
1926  */
1927 static int mdd_create_sanity_check(const struct lu_env *env,
1928                                    struct md_object *pobj,
1929                                    const struct lu_attr *pattr,
1930                                    const struct lu_name *lname,
1931                                    struct lu_attr *cattr,
1932                                    struct md_op_spec *spec)
1933 {
1934         struct mdd_thread_info *info = mdd_env_info(env);
1935         struct lu_fid     *fid       = &info->mti_fid;
1936         struct mdd_object *obj       = md2mdd_obj(pobj);
1937         struct mdd_device *m         = mdo2mdd(pobj);
1938         bool            check_perm = true;
1939         int rc;
1940         ENTRY;
1941
1942         /* EEXIST check */
1943         if (mdd_is_dead_obj(obj))
1944                 RETURN(-ENOENT);
1945
1946         /*
1947          * In some cases this lookup is not needed - we know before if name
1948          * exists or not because MDT performs lookup for it.
1949          * name length check is done in lookup.
1950          */
1951         if (spec->sp_cr_lookup) {
1952                 /*
1953                  * Check if the name already exist, though it will be checked in
1954                  * _index_insert also, for avoiding rolling back if exists
1955                  * _index_insert.
1956                  */
1957                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1958                                   MAY_WRITE | MAY_EXEC);
1959                 if (rc != -ENOENT)
1960                         RETURN(rc ? : -EEXIST);
1961
1962                 /* Permission is already being checked in mdd_lookup */
1963                 check_perm = false;
1964         }
1965
1966         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1967         if (rc != 0)
1968                 RETURN(rc);
1969
1970         /* sgid check */
1971         if (pattr->la_mode & S_ISGID) {
1972                 cattr->la_gid = pattr->la_gid;
1973                 if (S_ISDIR(cattr->la_mode)) {
1974                         cattr->la_mode |= S_ISGID;
1975                         cattr->la_valid |= LA_MODE;
1976                 }
1977         }
1978
1979         rc = mdd_name_check(m, lname);
1980         if (rc < 0)
1981                 RETURN(rc);
1982
1983         switch (cattr->la_mode & S_IFMT) {
1984         case S_IFLNK: {
1985                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1986
1987                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
1988                         RETURN(-ENAMETOOLONG);
1989                 else
1990                         RETURN(0);
1991         }
1992         case S_IFDIR:
1993         case S_IFREG:
1994         case S_IFCHR:
1995         case S_IFBLK:
1996         case S_IFIFO:
1997         case S_IFSOCK:
1998                 rc = 0;
1999                 break;
2000         default:
2001                 rc = -EINVAL;
2002                 break;
2003         }
2004         RETURN(rc);
2005 }
2006
2007 static int mdd_declare_object_create(const struct lu_env *env,
2008                                      struct mdd_device *mdd,
2009                                      struct mdd_object *p, struct mdd_object *c,
2010                                      struct lu_attr *attr,
2011                                      struct thandle *handle,
2012                                      const struct md_op_spec *spec,
2013                                      struct lu_buf *def_acl_buf,
2014                                      struct lu_buf *acl_buf,
2015                                      struct dt_allocation_hint *hint)
2016 {
2017         const struct lu_buf *buf;
2018         int rc;
2019
2020         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
2021                                                 hint);
2022         if (rc)
2023                 GOTO(out, rc);
2024
2025 #ifdef CONFIG_FS_POSIX_ACL
2026         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2027                 /* if dir, then can inherit default ACl */
2028                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2029                                            XATTR_NAME_ACL_DEFAULT,
2030                                            0, handle);
2031                 if (rc)
2032                         GOTO(out, rc);
2033         }
2034
2035         if (acl_buf->lb_len > 0) {
2036                 rc = mdo_declare_attr_set(env, c, attr, handle);
2037                 if (rc)
2038                         GOTO(out, rc);
2039
2040                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2041                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2042                 if (rc)
2043                         GOTO(out, rc);
2044         }
2045 #endif
2046         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2047         if (rc)
2048                 GOTO(out, rc);
2049
2050         /* replay case, create LOV EA from client data */
2051         if (spec->no_create ||
2052             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2053                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2054                                         spec->u.sp_ea.eadatalen);
2055                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2056                                            handle);
2057                 if (rc)
2058                         GOTO(out, rc);
2059         }
2060
2061         if (S_ISLNK(attr->la_mode)) {
2062                 const char *target_name = spec->u.sp_symname;
2063                 int sym_len = strlen(target_name);
2064                 const struct lu_buf *buf;
2065
2066                 buf = mdd_buf_get_const(env, target_name, sym_len);
2067                 rc = dt_declare_record_write(env, mdd_object_child(c),
2068                                              buf, 0, handle);
2069                 if (rc)
2070                         GOTO(out, rc);
2071         }
2072
2073         if (spec->sp_cr_file_secctx_name != NULL) {
2074                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2075                                         spec->sp_cr_file_secctx_size);
2076                 rc = mdo_declare_xattr_set(env, c, buf,
2077                                            spec->sp_cr_file_secctx_name, 0,
2078                                            handle);
2079                 if (rc < 0)
2080                         GOTO(out, rc);
2081         }
2082 out:
2083         return rc;
2084 }
2085
2086 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2087                               struct mdd_object *p, struct mdd_object *c,
2088                               const struct lu_name *name,
2089                               struct lu_attr *attr,
2090                               struct thandle *handle,
2091                               const struct md_op_spec *spec,
2092                               struct linkea_data *ldata,
2093                               struct lu_buf *def_acl_buf,
2094                               struct lu_buf *acl_buf,
2095                               struct dt_allocation_hint *hint)
2096 {
2097         int rc;
2098
2099         rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
2100                                        def_acl_buf, acl_buf, hint);
2101         if (rc)
2102                 GOTO(out, rc);
2103
2104         if (S_ISDIR(attr->la_mode)) {
2105                 rc = mdo_declare_ref_add(env, p, handle);
2106                 if (rc)
2107                         GOTO(out, rc);
2108         }
2109
2110         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2111                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2112                 if (rc)
2113                         GOTO(out, rc);
2114         } else {
2115                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2116
2117                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2118                                               name->ln_name, handle);
2119                 if (rc != 0)
2120                         return rc;
2121
2122                 rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
2123                 if (rc)
2124                         return rc;
2125
2126                 *la = *attr;
2127                 la->la_valid = LA_CTIME | LA_MTIME;
2128                 rc = mdo_declare_attr_set(env, p, la, handle);
2129                 if (rc)
2130                         return rc;
2131
2132                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2133                 if (rc)
2134                         return rc;
2135         }
2136 out:
2137         return rc;
2138 }
2139
2140 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2141                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2142                         struct lu_buf *acl_buf)
2143 {
2144         int     rc;
2145         ENTRY;
2146
2147         if (S_ISLNK(la->la_mode)) {
2148                 acl_buf->lb_len = 0;
2149                 def_acl_buf->lb_len = 0;
2150                 RETURN(0);
2151         }
2152
2153         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2154         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2155                            XATTR_NAME_ACL_DEFAULT);
2156         mdd_read_unlock(env, pobj);
2157         if (rc > 0) {
2158                 /* If there are default ACL, fix mode/ACL by default ACL */
2159                 def_acl_buf->lb_len = rc;
2160                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2161                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2162                 acl_buf->lb_len = rc;
2163                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2164                 if (rc < 0)
2165                         RETURN(rc);
2166         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2167                 /* If there are no default ACL, fix mode by mask */
2168                 struct lu_ucred *uc = lu_ucred(env);
2169
2170                 /* The create triggered by MDT internal events, such as
2171                  * LFSCK reset, will not contain valid "uc". */
2172                 if (unlikely(uc != NULL))
2173                         la->la_mode &= ~uc->uc_umask;
2174                 rc = 0;
2175                 acl_buf->lb_len = 0;
2176                 def_acl_buf->lb_len = 0;
2177         }
2178
2179         RETURN(rc);
2180 }
2181
2182 /**
2183  * Create a metadata object and initialize it, set acl, xattr.
2184  **/
2185 static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
2186                              struct mdd_object *son, struct lu_attr *attr,
2187                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2188                              struct lu_buf *def_acl_buf,
2189                              struct dt_allocation_hint *hint,
2190                              struct thandle *handle)
2191 {
2192         const struct lu_buf    *buf;
2193         int                     rc;
2194
2195         mdd_write_lock(env, son, MOR_TGT_CHILD);
2196         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
2197                                         hint);
2198         if (rc)
2199                 GOTO(unlock, rc);
2200
2201         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2202          * created in declare phase, they also needs to be added to master
2203          * object as sub-directory entry. So it has to initialize the master
2204          * object, then set dir striped EA.(in mdo_xattr_set) */
2205         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2206                                    spec);
2207         if (rc != 0)
2208                 GOTO(err_destroy, rc);
2209
2210         /*
2211          * in case of replay we just set LOVEA provided by the client
2212          * XXX: I think it would be interesting to try "old" way where
2213          *      MDT calls this xattr_set(LOV) in a different transaction.
2214          *      probably this way we code can be made better.
2215          */
2216
2217         /* During creation, there are only a few cases we need do xattr_set to
2218          * create stripes.
2219          * 1. regular file: see comments above.
2220          * 2. dir: inherit default striping or pool settings from parent.
2221          * 3. create striped directory with provided stripeEA.
2222          * 4. create striped directory because inherit default layout from the
2223          * parent.
2224          */
2225         if (spec->no_create ||
2226             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2227             S_ISDIR(attr->la_mode)) {
2228                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2229                                         spec->u.sp_ea.eadatalen);
2230                 rc = mdo_xattr_set(env, son, buf,
2231                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2232                                                             XATTR_NAME_LOV, 0,
2233                                    handle);
2234                 if (rc != 0)
2235                         GOTO(err_destroy, rc);
2236         }
2237
2238 #ifdef CONFIG_FS_POSIX_ACL
2239         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2240             S_ISDIR(attr->la_mode)) {
2241                 /* set default acl */
2242                 rc = mdo_xattr_set(env, son, def_acl_buf,
2243                                    XATTR_NAME_ACL_DEFAULT, 0,
2244                                    handle);
2245                 if (rc)
2246                         GOTO(err_destroy, rc);
2247         }
2248         /* set its own acl */
2249         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2250                 rc = mdo_xattr_set(env, son, acl_buf,
2251                                    XATTR_NAME_ACL_ACCESS,
2252                                    0, handle);
2253                 if (rc)
2254                         GOTO(err_destroy, rc);
2255         }
2256 #endif
2257
2258         if (S_ISLNK(attr->la_mode)) {
2259                 struct lu_ucred  *uc = lu_ucred_assert(env);
2260                 struct dt_object *dt = mdd_object_child(son);
2261                 const char *target_name = spec->u.sp_symname;
2262                 int sym_len = strlen(target_name);
2263                 const struct lu_buf *buf;
2264                 loff_t pos = 0;
2265
2266                 buf = mdd_buf_get_const(env, target_name, sym_len);
2267                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2268                                                 uc->uc_cap &
2269                                                 CFS_CAP_SYS_RESOURCE_MASK);
2270
2271                 if (rc == sym_len)
2272                         rc = 0;
2273                 else
2274                         GOTO(err_initlized, rc = -EFAULT);
2275         }
2276
2277         if (spec->sp_cr_file_secctx_name != NULL) {
2278                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2279                                         spec->sp_cr_file_secctx_size);
2280                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2281                                    0, handle);
2282                 if (rc < 0)
2283                         GOTO(err_initlized, rc);
2284         }
2285
2286 err_initlized:
2287         if (unlikely(rc != 0)) {
2288                 int rc2;
2289                 if (S_ISDIR(attr->la_mode)) {
2290                         /* Drop the reference, no need to delete "."/"..",
2291                          * because the object to be destroied directly. */
2292                         rc2 = mdo_ref_del(env, son, handle);
2293                         if (rc2 != 0)
2294                                 GOTO(unlock, rc);
2295                 }
2296                 rc2 = mdo_ref_del(env, son, handle);
2297                 if (rc2 != 0)
2298                         GOTO(unlock, rc);
2299 err_destroy:
2300                 mdo_destroy(env, son, handle);
2301         }
2302 unlock:
2303         mdd_write_unlock(env, son);
2304         RETURN(rc);
2305 }
2306
2307 static int mdd_index_delete(const struct lu_env *env,
2308                             struct mdd_object *mdd_pobj,
2309                             struct lu_attr *cattr,
2310                             const struct lu_name *lname)
2311 {
2312         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2313         struct thandle *handle;
2314         int rc;
2315         ENTRY;
2316
2317         handle = mdd_trans_create(env, mdd);
2318         if (IS_ERR(handle))
2319                 RETURN(PTR_ERR(handle));
2320
2321         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2322                                       handle);
2323         if (rc != 0)
2324                 GOTO(stop, rc);
2325
2326         if (S_ISDIR(cattr->la_mode)) {
2327                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2328                 if (rc != 0)
2329                         GOTO(stop, rc);
2330         }
2331
2332         /* Since this will only be used in the error handler path,
2333          * Let's set the thandle to be local and not mess the transno */
2334         handle->th_local = 1;
2335         rc = mdd_trans_start(env, mdd, handle);
2336         if (rc)
2337                 GOTO(stop, rc);
2338
2339         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2340                                 S_ISDIR(cattr->la_mode), handle);
2341         if (rc)
2342                 GOTO(stop, rc);
2343 stop:
2344         rc = mdd_trans_stop(env, mdd, rc, handle);
2345
2346         RETURN(rc);
2347 }
2348
2349 /*
2350  * Create object and insert it into namespace.
2351  */
2352 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2353                       const struct lu_name *lname, struct md_object *child,
2354                       struct md_op_spec *spec, struct md_attr* ma)
2355 {
2356         struct mdd_thread_info  *info = mdd_env_info(env);
2357         struct lu_attr          *la = &info->mti_la_for_fix;
2358         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2359         struct mdd_object       *son = md2mdd_obj(child);
2360         struct mdd_device       *mdd = mdo2mdd(pobj);
2361         struct lu_attr          *attr = &ma->ma_attr;
2362         struct thandle          *handle;
2363         struct lu_attr          *pattr = &info->mti_pattr;
2364         struct lu_buf           acl_buf;
2365         struct lu_buf           def_acl_buf;
2366         struct linkea_data      *ldata = &info->mti_link_data;
2367         const char              *name = lname->ln_name;
2368         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2369         int                      rc;
2370         int                      rc2;
2371         ENTRY;
2372
2373         /*
2374          * Two operations have to be performed:
2375          *
2376          *  - an allocation of a new object (->do_create()), and
2377          *
2378          *  - an insertion into a parent index (->dio_insert()).
2379          *
2380          * Due to locking, operation order is not important, when both are
2381          * successful, *but* error handling cases are quite different:
2382          *
2383          *  - if insertion is done first, and following object creation fails,
2384          *  insertion has to be rolled back, but this operation might fail
2385          *  also leaving us with dangling index entry.
2386          *
2387          *  - if creation is done first, is has to be undone if insertion
2388          *  fails, leaving us with leaked space, which is neither good, nor
2389          *  fatal.
2390          *
2391          * It seems that creation-first is simplest solution, but it is
2392          * sub-optimal in the frequent
2393          *
2394          *         $ mkdir foo
2395          *         $ mkdir foo
2396          *
2397          * case, because second mkdir is bound to create object, only to
2398          * destroy it immediately.
2399          *
2400          * To avoid this follow local file systems that do double lookup:
2401          *
2402          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2403          *
2404          *     1. create            (mdd_object_create_internal())
2405          *
2406          *     2. insert            (__mdd_index_insert(), lookup again)
2407          */
2408
2409         rc = mdd_la_get(env, mdd_pobj, pattr);
2410         if (rc != 0)
2411                 RETURN(rc);
2412
2413         /* Sanity checks before big job. */
2414         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2415         if (rc)
2416                 RETURN(rc);
2417
2418         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2419                 GOTO(out_free, rc = -EINPROGRESS);
2420
2421         handle = mdd_trans_create(env, mdd);
2422         if (IS_ERR(handle))
2423                 GOTO(out_free, rc = PTR_ERR(handle));
2424
2425         acl_buf.lb_buf = info->mti_xattr_buf;
2426         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2427         def_acl_buf.lb_buf = info->mti_key;
2428         def_acl_buf.lb_len = sizeof(info->mti_key);
2429         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2430         if (rc < 0)
2431                 GOTO(out_stop, rc);
2432
2433         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2434
2435         memset(ldata, 0, sizeof(*ldata));
2436         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2437                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2438
2439                 tfid.f_oid--;
2440                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2441                                         &tfid, lname, 1, 0, ldata);
2442         } else {
2443                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2444                                         mdd_object_fid(mdd_pobj),
2445                                         lname, 1, 0, ldata);
2446         }
2447
2448         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2449                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2450                                 hint);
2451         if (rc)
2452                 GOTO(out_stop, rc);
2453
2454         rc = mdd_trans_start(env, mdd, handle);
2455         if (rc)
2456                 GOTO(out_stop, rc);
2457
2458         rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
2459                                &def_acl_buf, hint, handle);
2460         if (rc != 0)
2461                 GOTO(out_stop, rc);
2462
2463         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2464                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2465                 rc = __mdd_orphan_add(env, son, handle);
2466                 GOTO(out_volatile, rc);
2467         } else {
2468                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2469                                         attr->la_mode, name, handle);
2470                 if (rc != 0)
2471                         GOTO(err_created, rc);
2472
2473                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2474                               ldata, 1);
2475
2476                 /* update parent directory mtime/ctime */
2477                 *la = *attr;
2478                 la->la_valid = LA_CTIME | LA_MTIME;
2479                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2480                 if (rc)
2481                         GOTO(err_insert, rc);
2482         }
2483
2484         EXIT;
2485 err_insert:
2486         if (rc != 0) {
2487                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2488                         rc2 = __mdd_orphan_del(env, son, handle);
2489                 else
2490                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2491                                                  S_ISDIR(attr->la_mode),
2492                                                  handle);
2493                 if (rc2 != 0)
2494                         goto out_stop;
2495
2496 err_created:
2497                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2498                 if (S_ISDIR(attr->la_mode)) {
2499                         /* Drop the reference, no need to delete "."/"..",
2500                          * because the object is to be destroyed directly. */
2501                         rc2 = mdo_ref_del(env, son, handle);
2502                         if (rc2 != 0) {
2503                                 mdd_write_unlock(env, son);
2504                                 goto out_stop;
2505                         }
2506                 }
2507 out_volatile:
2508                 /* For volatile files drop one link immediately, since there is
2509                  * no filename in the namespace, and save any error returned. */
2510                 rc2 = mdo_ref_del(env, son, handle);
2511                 if (rc2 != 0) {
2512                         mdd_write_unlock(env, son);
2513                         if (unlikely(rc == 0))
2514                                 rc = rc2;
2515                         goto out_stop;
2516                 }
2517
2518                 /* Don't destroy the volatile object on success */
2519                 if (likely(rc != 0))
2520                         mdo_destroy(env, son, handle);
2521                 mdd_write_unlock(env, son);
2522         }
2523
2524         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2525             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2526                 rc = mdd_changelog_ns_store(env, mdd,
2527                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2528                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2529                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2530                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2531                                 NULL, handle);
2532 out_stop:
2533         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2534         if (rc == 0) {
2535                 /* If creation fails, it is most likely due to the remote update
2536                  * failure, because local transaction will mostly succeed at
2537                  * this stage. There is no easy way to rollback all of previous
2538                  * updates, so let's remove the object from namespace, and
2539                  * LFSCK should handle the orphan object. */
2540                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2541                         mdd_index_delete(env, mdd_pobj, attr, lname);
2542                 rc = rc2;
2543         }
2544 out_free:
2545         if (is_vmalloc_addr(ldata->ld_buf))
2546                 /* if we vmalloced a large buffer drop it */
2547                 lu_buf_free(ldata->ld_buf);
2548
2549         /* The child object shouldn't be cached anymore */
2550         if (rc)
2551                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2552                         &child->mo_lu.lo_header->loh_flags);
2553         return rc;
2554 }
2555
2556 /*
2557  * Get locks on parents in proper order
2558  * RETURN: < 0 - error, rename_order if successful
2559  */
2560 enum rename_order {
2561         MDD_RN_SAME,
2562         MDD_RN_SRCTGT,
2563         MDD_RN_TGTSRC
2564 };
2565
2566 static int mdd_rename_order(const struct lu_env *env,
2567                             struct mdd_device *mdd,
2568                             struct mdd_object *src_pobj,
2569                             const struct lu_attr *pattr,
2570                             struct mdd_object *tgt_pobj)
2571 {
2572         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2573         int rc;
2574         ENTRY;
2575
2576         if (src_pobj == tgt_pobj)
2577                 RETURN(MDD_RN_SAME);
2578
2579         /* compared the parent child relationship of src_p&tgt_p */
2580         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2581                 rc = MDD_RN_SRCTGT;
2582         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2583                 rc = MDD_RN_TGTSRC;
2584         } else {
2585                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2586                                    NULL);
2587                 if (rc == -EREMOTE)
2588                         rc = 0;
2589
2590                 if (rc == 1)
2591                         rc = MDD_RN_TGTSRC;
2592                 else if (rc == 0)
2593                         rc = MDD_RN_SRCTGT;
2594         }
2595
2596         RETURN(rc);
2597 }
2598
2599 /* has not mdd_write{read}_lock on any obj yet. */
2600 static int mdd_rename_sanity_check(const struct lu_env *env,
2601                                    struct mdd_object *src_pobj,
2602                                    const struct lu_attr *pattr,
2603                                    struct mdd_object *tgt_pobj,
2604                                    const struct lu_attr *tpattr,
2605                                    struct mdd_object *sobj,
2606                                    const struct lu_attr *cattr,
2607                                    struct mdd_object *tobj,
2608                                    const struct lu_attr *tattr)
2609 {
2610         int rc = 0;
2611         ENTRY;
2612
2613         /* XXX: when get here, sobj must NOT be NULL,
2614          * the other case has been processed in cld_rename
2615          * before mdd_rename and enable MDS_PERM_BYPASS. */
2616         LASSERT(sobj);
2617
2618         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2619         if (rc)
2620                 RETURN(rc);
2621
2622         /* XXX: when get here, "tobj == NULL" means tobj must
2623          * NOT exist (neither on remote MDS, such case has been
2624          * processed in cld_rename before mdd_rename and enable
2625          * MDS_PERM_BYPASS).
2626          * So check may_create, but not check may_unlink. */
2627         if (tobj == NULL)
2628                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2629                                     (src_pobj != tgt_pobj));
2630         else
2631                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2632                                     (src_pobj != tgt_pobj), 1);
2633
2634         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2635                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2636
2637         RETURN(rc);
2638 }
2639
2640 static int mdd_declare_rename(const struct lu_env *env,
2641                               struct mdd_device *mdd,
2642                               struct mdd_object *mdd_spobj,
2643                               struct mdd_object *mdd_tpobj,
2644                               struct mdd_object *mdd_sobj,
2645                               struct mdd_object *mdd_tobj,
2646                               const struct lu_name *sname,
2647                               const struct lu_name *tname,
2648                               struct md_attr *ma,
2649                               struct linkea_data *ldata,
2650                               struct thandle *handle)
2651 {
2652         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2653         int rc;
2654
2655         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2656         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2657
2658         LASSERT(mdd_spobj);
2659         LASSERT(mdd_tpobj);
2660         LASSERT(mdd_sobj);
2661
2662         /* name from source dir */
2663         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2664         if (rc)
2665                 return rc;
2666
2667         /* .. from source child */
2668         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2669                 /* source child can be directory,
2670                  * counted by source dir's nlink */
2671                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2672                 if (rc)
2673                         return rc;
2674                 if (mdd_spobj != mdd_tpobj) {
2675                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2676                                                       handle);
2677                         if (rc != 0)
2678                                 return rc;
2679
2680                         rc = mdo_declare_index_insert(env, mdd_sobj,
2681                                                       mdo2fid(mdd_tpobj),
2682                                                       S_IFDIR, dotdot, handle);
2683                         if (rc != 0)
2684                                 return rc;
2685                 }
2686
2687                 /* new target child can be directory,
2688                  * counted by target dir's nlink */
2689                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2690                 if (rc != 0)
2691                         return rc;
2692         }
2693
2694         la->la_valid = LA_CTIME | LA_MTIME;
2695         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2696         if (rc != 0)
2697                 return rc;
2698
2699         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2700         if (rc != 0)
2701                 return rc;
2702
2703         la->la_valid = LA_CTIME;
2704         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2705         if (rc)
2706                 return rc;
2707
2708         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
2709                 S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
2710         if (rc)
2711                 return rc;
2712
2713         /* new name */
2714         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2715                                       mdd_object_type(mdd_sobj),
2716                                       tname->ln_name, handle);
2717         if (rc != 0)
2718                 return rc;
2719
2720         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2721                 /* delete target child in target parent directory */
2722                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2723                                               handle);
2724                 if (rc)
2725                         return rc;
2726
2727                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2728                 if (rc)
2729                         return rc;
2730
2731                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2732                         /* target child can be directory,
2733                          * delete "." reference in target child directory */
2734                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2735                         if (rc)
2736                                 return rc;
2737
2738                         /* delete ".." reference in target parent directory */
2739                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2740                         if (rc)
2741                                 return rc;
2742                 }
2743
2744                 la->la_valid = LA_CTIME;
2745                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2746                 if (rc)
2747                         return rc;
2748
2749                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2750                 if (rc)
2751                         return rc;
2752         }
2753
2754         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2755         if (rc)
2756                 return rc;
2757
2758         return rc;
2759 }
2760
2761 /* src object can be remote that is why we use only fid and type of object */
2762 static int mdd_rename(const struct lu_env *env,
2763                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2764                       const struct lu_fid *lf, const struct lu_name *lsname,
2765                       struct md_object *tobj, const struct lu_name *ltname,
2766                       struct md_attr *ma)
2767 {
2768         const char *sname = lsname->ln_name;
2769         const char *tname = ltname->ln_name;
2770         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2771         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2772         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2773         struct mdd_device *mdd = mdo2mdd(src_pobj);
2774         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2775         struct mdd_object *mdd_tobj = NULL;
2776         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2777         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2778         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2779         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2780         struct thandle *handle;
2781         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2782         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2783         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2784         bool is_dir;
2785         bool tobj_ref = 0;
2786         bool tobj_locked = 0;
2787         unsigned cl_flags = 0;
2788         int rc, rc2;
2789         ENTRY;
2790
2791         if (tobj)
2792                 mdd_tobj = md2mdd_obj(tobj);
2793
2794         mdd_sobj = mdd_object_find(env, mdd, lf);
2795         if (IS_ERR(mdd_sobj))
2796                 RETURN(PTR_ERR(mdd_sobj));
2797
2798         rc = mdd_la_get(env, mdd_sobj, cattr);
2799         if (rc)
2800                 GOTO(out_pending, rc);
2801
2802         rc = mdd_la_get(env, mdd_spobj, pattr);
2803         if (rc)
2804                 GOTO(out_pending, rc);
2805
2806         if (mdd_tobj) {
2807                 rc = mdd_la_get(env, mdd_tobj, tattr);
2808                 if (rc)
2809                         GOTO(out_pending, rc);
2810                 /* search for an existing archive.
2811                  * we should check ahead as the object
2812                  * can be destroyed in this transaction */
2813                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2814                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2815         }
2816
2817         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2818         if (rc)
2819                 GOTO(out_pending, rc);
2820
2821         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2822                                      mdd_sobj, cattr, mdd_tobj, tattr);
2823         if (rc)
2824                 GOTO(out_pending, rc);
2825
2826         rc = mdd_name_check(mdd, ltname);
2827         if (rc < 0)
2828                 GOTO(out_pending, rc);
2829
2830         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2831         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2832         if (rc < 0)
2833                 GOTO(out_pending, rc);
2834
2835         handle = mdd_trans_create(env, mdd);
2836         if (IS_ERR(handle))
2837                 GOTO(out_pending, rc = PTR_ERR(handle));
2838
2839         memset(ldata, 0, sizeof(*ldata));
2840         mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
2841                            mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
2842         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2843                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2844         if (rc)
2845                 GOTO(stop, rc);
2846
2847         rc = mdd_trans_start(env, mdd, handle);
2848         if (rc)
2849                 GOTO(stop, rc);
2850
2851         is_dir = S_ISDIR(cattr->la_mode);
2852
2853         /* Remove source name from source directory */
2854         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2855         if (rc != 0)
2856                 GOTO(stop, rc);
2857
2858         /* "mv dir1 dir2" needs "dir1/.." link update */
2859         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2860                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2861                 if (rc != 0)
2862                         GOTO(fixup_spobj2, rc);
2863
2864                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2865                                              dotdot, handle);
2866                 if (rc != 0)
2867                         GOTO(fixup_spobj, rc);
2868         }
2869
2870         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2871                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2872                 if (rc != 0)
2873                         /* tname might been renamed to something else */
2874                         GOTO(fixup_spobj, rc);
2875         }
2876
2877         /* Insert new fid with target name into target dir */
2878         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2879                                 tname, handle);
2880         if (rc != 0)
2881                 GOTO(fixup_tpobj, rc);
2882
2883         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2884         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2885
2886         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2887         la->la_valid = LA_CTIME;
2888         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2889         if (rc)
2890                 GOTO(fixup_tpobj, rc);
2891
2892         /* Update the linkEA for the source object */
2893         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2894         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2895                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
2896                               0, 0);
2897         if (rc == -ENOENT)
2898                 /* Old files might not have EA entry */
2899                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2900                               lsname, handle, NULL, 0);
2901         mdd_write_unlock(env, mdd_sobj);
2902         /* We don't fail the transaction if the link ea can't be
2903            updated -- fid2path will use alternate lookup method. */
2904         rc = 0;
2905
2906         /* Remove old target object
2907          * For tobj is remote case cmm layer has processed
2908          * and set tobj to NULL then. So when tobj is NOT NULL,
2909          * it must be local one.
2910          */
2911         if (tobj && mdd_object_exists(mdd_tobj)) {
2912                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2913                 tobj_locked = 1;
2914                 if (mdd_is_dead_obj(mdd_tobj)) {
2915                         /* shld not be dead, something is wrong */
2916                         CERROR("tobj is dead, something is wrong\n");
2917                         rc = -EINVAL;
2918                         goto cleanup;
2919                 }
2920                 mdo_ref_del(env, mdd_tobj, handle);
2921
2922                 /* Remove dot reference. */
2923                 if (S_ISDIR(tattr->la_mode))
2924                         mdo_ref_del(env, mdd_tobj, handle);
2925                 tobj_ref = 1;
2926
2927                 /* fetch updated nlink */
2928                 rc = mdd_la_get(env, mdd_tobj, tattr);
2929                 if (rc != 0) {
2930                         CERROR("%s: Failed to get nlink for tobj "
2931                                 DFID": rc = %d\n",
2932                                 mdd2obd_dev(mdd)->obd_name,
2933                                 PFID(tpobj_fid), rc);
2934                         GOTO(fixup_tpobj, rc);
2935                 }
2936
2937                 la->la_valid = LA_CTIME;
2938                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2939                 if (rc != 0) {
2940                         CERROR("%s: Failed to set ctime for tobj "
2941                                 DFID": rc = %d\n",
2942                                 mdd2obd_dev(mdd)->obd_name,
2943                                 PFID(tpobj_fid), rc);
2944                         GOTO(fixup_tpobj, rc);
2945                 }
2946
2947                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2948                 ma->ma_attr = *tattr;
2949                 ma->ma_valid |= MA_INODE;
2950                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2951                                        handle);
2952                 if (rc != 0) {
2953                         CERROR("%s: Failed to unlink tobj "
2954                                 DFID": rc = %d\n",
2955                                 mdd2obd_dev(mdd)->obd_name,
2956                                 PFID(tpobj_fid), rc);
2957                         GOTO(fixup_tpobj, rc);
2958                 }
2959
2960                 /* fetch updated nlink */
2961                 rc = mdd_la_get(env, mdd_tobj, tattr);
2962                 if (rc == -ENOENT) {
2963                         /* the object got removed, let's
2964                          * return the latest known attributes */
2965                         tattr->la_nlink = 0;
2966                         rc = 0;
2967                 } else if (rc != 0) {
2968                         CERROR("%s: Failed to get nlink for tobj "
2969                                 DFID": rc = %d\n",
2970                                 mdd2obd_dev(mdd)->obd_name,
2971                                 PFID(tpobj_fid), rc);
2972                         GOTO(fixup_tpobj, rc);
2973                 }
2974                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2975                 ma->ma_attr = *tattr;
2976                 ma->ma_valid |= MA_INODE;
2977
2978                 if (tattr->la_nlink == 0)
2979                         cl_flags |= CLF_RENAME_LAST;
2980                 else
2981                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
2982         }
2983
2984         la->la_valid = LA_CTIME | LA_MTIME;
2985         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2986         if (rc)
2987                 GOTO(fixup_tpobj, rc);
2988
2989         if (mdd_spobj != mdd_tpobj) {
2990                 la->la_valid = LA_CTIME | LA_MTIME;
2991                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2992                 if (rc != 0)
2993                         GOTO(fixup_tpobj, rc);
2994         }
2995
2996         EXIT;
2997
2998 fixup_tpobj:
2999         if (rc) {
3000                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3001                 if (rc2)
3002                         CWARN("tp obj fix error %d\n",rc2);
3003
3004                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3005                     !mdd_is_dead_obj(mdd_tobj)) {
3006                         if (tobj_ref) {
3007                                 mdo_ref_add(env, mdd_tobj, handle);
3008                                 if (is_dir)
3009                                         mdo_ref_add(env, mdd_tobj, handle);
3010                         }
3011
3012                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3013                                                   mdo2fid(mdd_tobj),
3014                                                   mdd_object_type(mdd_tobj),
3015                                                   tname, handle);
3016                         if (rc2 != 0)
3017                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3018                 }
3019         }
3020
3021 fixup_spobj:
3022         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3023                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3024                 if (rc2)
3025                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3026                                mdd2obd_dev(mdd)->obd_name, rc2);
3027
3028
3029                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3030                                               dotdot, handle);
3031                 if (rc2 != 0)
3032                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3033                               mdd2obd_dev(mdd)->obd_name, rc2);
3034         }
3035
3036 fixup_spobj2:
3037         if (rc != 0) {
3038                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3039                                          mdd_object_type(mdd_sobj), sname,
3040                                          handle);
3041                 if (rc2 != 0)
3042                         CWARN("sp obj fix error: rc = %d\n", rc2);
3043         }
3044
3045 cleanup:
3046         if (tobj_locked)
3047                 mdd_write_unlock(env, mdd_tobj);
3048
3049         if (rc == 0)
3050                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3051                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3052                                             ltname, lsname, handle);
3053
3054 stop:
3055         rc = mdd_trans_stop(env, mdd, rc, handle);
3056
3057 out_pending:
3058         mdd_object_put(env, mdd_sobj);
3059         return rc;
3060 }
3061
3062 /**
3063  * During migration once the parent FID has been changed,
3064  * we need update the parent FID in linkea.
3065  **/
3066 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3067                                             struct mdd_object *parent,
3068                                             struct mdd_object *newparent,
3069                                             struct mdd_object *child,
3070                                             const char *name, int namelen,
3071                                             struct thandle *handle,
3072                                             bool declare)
3073 {
3074         struct mdd_thread_info  *info = mdd_env_info(env);
3075         struct linkea_data      ldata = { NULL };
3076         struct lu_buf           *buf = &info->mti_link_buf;
3077         int                     count;
3078         int                     rc = 0;
3079
3080         ENTRY;
3081
3082         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3083         if (buf->lb_buf == NULL)
3084                 RETURN(-ENOMEM);
3085
3086         ldata.ld_buf = buf;
3087         rc = mdd_links_read(env, child, &ldata);
3088         if (rc != 0) {
3089                 if (rc == -ENOENT || rc == -ENODATA)
3090                         rc = 0;
3091                 RETURN(rc);
3092         }
3093
3094         LASSERT(ldata.ld_leh != NULL);
3095         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3096         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3097                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3098                 struct lu_name lname;
3099                 struct lu_fid  fid;
3100
3101                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3102                                     &lname, &fid);
3103
3104                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3105                     !lu_fid_eq(&fid, mdd_object_fid(parent))) {
3106                         ldata.ld_lee = (struct link_ea_entry *)
3107                                        ((char *)ldata.ld_lee +
3108                                         ldata.ld_reclen);
3109                         continue;
3110                 }
3111
3112                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3113                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3114                        lname.ln_namelen, lname.ln_name,
3115                        PFID(mdd_object_fid(newparent)));
3116                 /* update to the new parent fid */
3117                 linkea_entry_pack(ldata.ld_lee, &lname,
3118                                   mdd_object_fid(newparent));
3119                 if (declare)
3120                         rc = mdd_declare_links_add(env, child, handle, &ldata,
3121                                                    MLAO_IGNORE);
3122                 else
3123                         rc = mdd_links_write(env, child, &ldata, handle);
3124                 break;
3125         }
3126         RETURN(rc);
3127 }
3128
3129 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3130                                            struct mdd_object *parent,
3131                                            struct mdd_object *newparent,
3132                                            struct mdd_object *child,
3133                                            const char *name, int namelen,
3134                                            struct thandle *handle)
3135 {
3136         return mdd_linkea_update_child_internal(env, parent, newparent,
3137                                                 child, name,
3138                                                 namelen, handle, true);
3139 }
3140
3141 static int mdd_linkea_update_child(const struct lu_env *env,
3142                                    struct mdd_object *parent,
3143                                    struct mdd_object *newparent,
3144                                    struct mdd_object *child,
3145                                    const char *name, int namelen,
3146                                    struct thandle *handle)
3147 {
3148         return mdd_linkea_update_child_internal(env, parent, newparent,
3149                                                 child, name,
3150                                                 namelen, handle, false);
3151 }
3152
3153 static int mdd_update_linkea_internal(const struct lu_env *env,
3154                                       struct mdd_object *mdd_pobj,
3155                                       struct mdd_object *mdd_sobj,
3156                                       struct mdd_object *mdd_tobj,
3157                                       const struct lu_name *child_name,
3158                                       struct linkea_data *ldata,
3159                                       struct thandle *handle,
3160                                       int declare)
3161 {
3162         struct mdd_thread_info  *info = mdd_env_info(env);
3163         int                     count;
3164         int                     rc = 0;
3165         ENTRY;
3166
3167         LASSERT(ldata->ld_buf != NULL);
3168
3169 again:
3170         /* If it is mulitple links file, we need update the name entry for
3171          * all parent */
3172         LASSERT(ldata->ld_leh != NULL);
3173         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3174         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3175                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3176                 struct mdd_object       *pobj;
3177                 struct lu_name          lname;
3178                 struct lu_fid           fid;
3179
3180                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3181                                     &lname, &fid);
3182                 pobj = mdd_object_find(env, mdd, &fid);
3183                 if (IS_ERR(pobj)) {
3184                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3185                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3186                               PTR_ERR(pobj));
3187                         linkea_del_buf(ldata, &lname);
3188                         goto again;
3189                 }
3190
3191                 if (!mdd_object_exists(pobj)) {
3192                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3193                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3194                         linkea_del_buf(ldata, &lname);
3195                         mdd_object_put(env, pobj);
3196                         goto again;
3197                 }
3198
3199                 if (pobj == mdd_pobj &&
3200                     lname.ln_namelen == child_name->ln_namelen &&
3201                     strncmp(lname.ln_name, child_name->ln_name,
3202                             lname.ln_namelen) == 0) {
3203                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3204                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3205                               PFID(&fid));
3206                         linkea_del_buf(ldata, &lname);
3207                         mdd_object_put(env, pobj);
3208                         goto again;
3209                 }
3210
3211                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3212                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3213                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3214
3215                 if (declare) {
3216                         /* Remove source name from source directory */
3217                         /* Insert new fid with target name into target dir */
3218                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3219                                                       handle);
3220                         if (rc != 0)
3221                                 GOTO(next_put, rc);
3222
3223                         rc = mdo_declare_index_insert(env, pobj,
3224                                         mdd_object_fid(mdd_tobj),
3225                                         mdd_object_type(mdd_tobj),
3226                                         lname.ln_name, handle);
3227                         if (rc != 0)
3228                                 GOTO(next_put, rc);
3229
3230                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3231                         if (rc)
3232                                 GOTO(next_put, rc);
3233
3234                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3235                         if (rc)
3236                                 GOTO(next_put, rc);
3237                 } else {
3238                         char *tmp_name = info->mti_key;
3239
3240                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3241                                 /* lnamelen is too big(> NAME_MAX + 16),
3242                                  * something wrong about this linkea, let's
3243                                  * skip it */
3244                                 linkea_del_buf(ldata, &lname);
3245                                 mdd_object_put(env, pobj);
3246                                 goto again;
3247                         }
3248
3249                         /* Note: lname might be without \0 at the end, see
3250                          * linkea_entry_unpack(), let's add extra \0 by
3251                          * snprintf */
3252                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3253                                  lname.ln_namelen, lname.ln_name);
3254                         lname.ln_name = tmp_name;
3255
3256                         /* Let's check if this linkEA still valid, before
3257                          * it might be packed into the RPC buffer. */
3258                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3259                                         &info->mti_fid, NULL);
3260                         if (rc < 0 ||
3261                             !lu_fid_eq(&info->mti_fid,
3262                                         mdd_object_fid(mdd_sobj))) {
3263                                 /* skip invalid linkea entry */
3264                                 linkea_del_buf(ldata, &lname);
3265                                 mdd_object_put(env, pobj);
3266                                 goto again;
3267                         }
3268
3269                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3270                         if (rc != 0)
3271                                 GOTO(next_put, rc);
3272
3273                         rc = __mdd_index_insert(env, pobj,
3274                                         mdd_object_fid(mdd_tobj),
3275                                         mdd_object_type(mdd_tobj),
3276                                         tmp_name, handle);
3277                         if (rc != 0)
3278                                 GOTO(next_put, rc);
3279
3280                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3281                         rc = mdo_ref_add(env, mdd_tobj, handle);
3282                         mdd_write_unlock(env, mdd_tobj);
3283                         if (rc)
3284                                 GOTO(next_put, rc);
3285
3286                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3287                         mdo_ref_del(env, mdd_sobj, handle);
3288                         mdd_write_unlock(env, mdd_sobj);
3289                 }
3290 next_put:
3291                 mdd_object_put(env, pobj);
3292                 if (rc != 0)
3293                         break;
3294
3295                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3296                                                          ldata->ld_reclen);
3297         }
3298
3299         RETURN(rc);
3300 }
3301
3302 static int mdd_migrate_xattrs(const struct lu_env *env,
3303                               struct mdd_object *mdd_sobj,
3304                               struct mdd_object *mdd_tobj)
3305 {
3306         struct mdd_thread_info  *info = mdd_env_info(env);
3307         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3308         char                    *xname;
3309         struct thandle          *handle;
3310         struct lu_buf           xbuf;
3311         int                     xlen;
3312         int                     rem;
3313         int                     xsize;
3314         int                     list_xsize;
3315         struct lu_buf           list_xbuf;
3316         int                     rc;
3317
3318         /* retrieve xattr list from the old object */
3319         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3320         if (list_xsize == -ENODATA)
3321                 return 0;
3322
3323         if (list_xsize < 0)
3324                 return list_xsize;
3325
3326         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3327         if (info->mti_big_buf.lb_buf == NULL)
3328                 return -ENOMEM;
3329
3330         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3331         list_xbuf.lb_len = list_xsize;
3332         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3333         if (rc < 0)
3334                 return rc;
3335         rc = 0;
3336         rem = list_xsize;
3337         xname = list_xbuf.lb_buf;
3338         while (rem > 0) {
3339                 xlen = strnlen(xname, rem - 1) + 1;
3340                 if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
3341                     strcmp(XATTR_NAME_LMA, xname) == 0 ||
3342                     strcmp(XATTR_NAME_LMV, xname) == 0)
3343                         goto next;
3344
3345                 /* For directory, if there are default layout, migrate here */
3346                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3347                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3348                         goto next;
3349
3350                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3351                 if (xsize == -ENODATA)
3352                         goto next;
3353                 if (xsize < 0)
3354                         GOTO(out, rc);
3355
3356                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3357                 if (info->mti_link_buf.lb_buf == NULL)
3358                         GOTO(out, rc = -ENOMEM);
3359
3360                 xbuf.lb_len = xsize;
3361                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3362                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3363                 if (rc == -ENODATA)
3364                         goto next;
3365                 if (rc < 0)
3366                         GOTO(out, rc);
3367
3368                 handle = mdd_trans_create(env, mdd);
3369                 if (IS_ERR(handle))
3370                         GOTO(out, rc = PTR_ERR(handle));
3371
3372                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3373                                            handle);
3374                 if (rc != 0)
3375                         GOTO(stop_trans, rc);
3376                 /* Note: this transaction is part of migration, and it is not
3377                  * the last step of migration, so we set th_local = 1 to avoid
3378                  * update last rcvd for this transaction */
3379                 handle->th_local = 1;
3380                 rc = mdd_trans_start(env, mdd, handle);
3381                 if (rc != 0)
3382                         GOTO(stop_trans, rc);
3383
3384                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3385                 if (rc == -EEXIST)
3386                         GOTO(stop_trans, rc = 0);
3387
3388                 if (rc != 0)
3389                         GOTO(stop_trans, rc);
3390 stop_trans:
3391                 rc = mdd_trans_stop(env, mdd, rc, handle);
3392                 if (rc != 0)
3393                         GOTO(out, rc);
3394 next:
3395                 rem -= xlen;
3396                 memmove(xname, xname + xlen, rem);
3397         }
3398 out:
3399         return rc;
3400 }
3401
3402 static int mdd_declare_migrate_create(const struct lu_env *env,
3403                                       struct mdd_object *mdd_pobj,
3404                                       struct mdd_object *mdd_sobj,
3405                                       struct mdd_object *mdd_tobj,
3406                                       struct md_op_spec *spec,
3407                                       struct lu_attr *la,
3408                                       union lmv_mds_md *mgr_ea,
3409                                       struct linkea_data *ldata,
3410                                       struct thandle *handle)
3411 {
3412         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3413         const struct lu_buf     *buf;
3414         int                     rc;
3415         int                     mgr_easize;
3416
3417         rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
3418                                                 handle, spec, NULL);
3419         if (rc != 0)
3420                 return rc;
3421
3422         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3423                                            handle);
3424         if (rc != 0)
3425                 return rc;
3426
3427         if (S_ISLNK(la->la_mode)) {
3428                 const char *target_name = spec->u.sp_symname;
3429                 int sym_len = strlen(target_name);
3430                 const struct lu_buf *buf;
3431
3432                 buf = mdd_buf_get_const(env, target_name, sym_len);
3433                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3434                                              buf, 0, handle);
3435                 if (rc != 0)
3436                         return rc;
3437         } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
3438                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
3439                                            MLAO_IGNORE);
3440                 if (rc != 0)
3441                         return rc;
3442         }
3443
3444         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3445                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3446                                         spec->u.sp_ea.eadatalen);
3447                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3448                                            0, handle);
3449                 if (rc)
3450                         return rc;
3451         }
3452
3453         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3454         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3455         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3456                                    0, handle);
3457         if (rc)
3458                 return rc;
3459
3460         la_flag->la_valid = LA_FLAGS;
3461         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3462         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3463
3464         return rc;
3465 }
3466
3467 static int mdd_migrate_create(const struct lu_env *env,
3468                               struct mdd_object *mdd_pobj,
3469                               struct mdd_object *mdd_sobj,
3470                               struct mdd_object *mdd_tobj,
3471                               const struct lu_name *lname,
3472                               struct lu_attr *la)
3473 {
3474         struct mdd_thread_info  *info = mdd_env_info(env);
3475         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3476         struct md_op_spec       *spec = &info->mti_spec;
3477         struct lu_buf           lmm_buf = { NULL };
3478         struct lu_buf           link_buf = { NULL };
3479         const struct lu_buf     *buf;
3480         struct thandle          *handle;
3481         struct lmv_mds_md_v1    *mgr_ea;
3482         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3483         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3484         int                     mgr_easize;
3485         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3486         int                     rc;
3487         ENTRY;
3488
3489         /* prepare spec for create */
3490         memset(spec, 0, sizeof(*spec));
3491         spec->sp_cr_lookup = 0;
3492         spec->sp_feat = &dt_directory_features;
3493         if (S_ISLNK(la->la_mode)) {
3494                 buf = lu_buf_check_and_alloc(
3495                                 &mdd_env_info(env)->mti_big_buf,
3496                                 la->la_size + 1);
3497                 link_buf = *buf;
3498                 link_buf.lb_len = la->la_size + 1;
3499                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3500                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3501                 if (rc <= 0) {
3502                         rc = rc != 0 ? rc : -EFAULT;
3503                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3504                                mdd2obd_dev(mdd)->obd_name,
3505                                PFID(mdd_object_fid(mdd_sobj)), rc);
3506                         RETURN(rc);
3507                 }
3508                 spec->u.sp_symname = link_buf.lb_buf;
3509         } else if (S_ISREG(la->la_mode)) {
3510                 /* retrieve lov of the old object */
3511                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3512                 if (rc != 0 && rc != -ENODATA)
3513                         RETURN(rc);
3514                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3515                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3516                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3517                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3518                 }
3519         } else if (S_ISDIR(la->la_mode)) {
3520                 rc = mdd_links_read(env, mdd_sobj, ldata);
3521                 if (rc == -ENODATA) {
3522                         /* ignore the non-linkEA error */
3523                         ldata = NULL;
3524                         rc = 0;
3525                 }
3526                 if (rc < 0)
3527                         RETURN(rc);
3528         }
3529
3530         mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
3531         memset(mgr_ea, 0, sizeof(*mgr_ea));
3532         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3533         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3534         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3535         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3536         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3537         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3538
3539         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3540
3541         handle = mdd_trans_create(env, mdd);
3542         if (IS_ERR(handle))
3543                 GOTO(out_free, rc = PTR_ERR(handle));
3544
3545         /* Note: this transaction is part of migration, and it is not
3546          * the last step of migration, so we set th_local = 1 to avoid
3547          * update last rcvd for this transaction */
3548         handle->th_local = 1;
3549         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
3550                                         spec, la,
3551                                         (union lmv_mds_md *)info->mti_xattr_buf,
3552                                         ldata, handle);
3553         if (rc != 0)
3554                 GOTO(stop_trans, rc);
3555
3556         rc = mdd_trans_start(env, mdd, handle);
3557         if (rc != 0)
3558                 GOTO(stop_trans, rc);
3559
3560         /* don't set nlink from the original object */
3561         la->la_valid &= ~LA_NLINK;
3562
3563         /* create the target object */
3564         rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3565                                hint, handle);
3566         if (rc != 0)
3567                 GOTO(stop_trans, rc);
3568
3569         if (S_ISDIR(la->la_mode) && ldata != NULL) {
3570                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3571                 if (rc != 0)
3572                         GOTO(stop_trans, rc);
3573         }
3574
3575         /* Set MIGRATE EA on the source inode, so once the migration needs
3576          * to be re-done during failover, the re-do process can locate the
3577          * target object which is already being created. */
3578         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3579         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3580         rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
3581         if (rc != 0)
3582                 GOTO(stop_trans, rc);
3583
3584         /* Set immutable flag, so any modification is disabled until
3585          * the migration is done. Once the migration is interrupted,
3586          * if the resume process find the migrating object has both
3587          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3588          * flag and approve the migration */
3589         la_flag->la_valid = LA_FLAGS;
3590         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3591         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3592 stop_trans:
3593         if (handle != NULL)
3594                 rc = mdd_trans_stop(env, mdd, rc, handle);
3595 out_free:
3596         if (lmm_buf.lb_buf != NULL)
3597                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3598         RETURN(rc);
3599 }
3600
3601 static int mdd_migrate_entries(const struct lu_env *env,
3602                                struct mdd_object *mdd_sobj,
3603                                struct mdd_object *mdd_tobj)
3604 {
3605         struct dt_object        *next = mdd_object_child(mdd_sobj);
3606         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3607         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3608         struct thandle          *handle;
3609         struct dt_it            *it;
3610         const struct dt_it_ops  *iops;
3611         int                      result;
3612         struct lu_dirent        *ent;
3613         int                      rc;
3614         ENTRY;
3615
3616         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3617         if (ent == NULL)
3618                 RETURN(-ENOMEM);
3619
3620         if (!dt_try_as_dir(env, next))
3621                 GOTO(out_ent, rc = -ENOTDIR);
3622         /*
3623          * iterate directories
3624          */
3625         iops = &next->do_index_ops->dio_it;
3626         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3627         if (IS_ERR(it))
3628                 GOTO(out_ent, rc = PTR_ERR(it));
3629
3630         rc = iops->load(env, it, 0);
3631         if (rc == 0)
3632                 rc = iops->next(env, it);
3633         else if (rc > 0)
3634                 rc = 0;
3635         /*
3636          * At this point and across for-loop:
3637          *
3638          *  rc == 0 -> ok, proceed.
3639          *  rc >  0 -> end of directory.
3640          *  rc <  0 -> error.
3641          */
3642         do {
3643                 struct mdd_object       *child;
3644                 char                    *name = mdd_env_info(env)->mti_key;
3645                 int                     len;
3646                 int                     recsize;
3647                 int                     is_dir;
3648                 bool                    target_exist = false;
3649
3650                 len = iops->key_size(env, it);
3651                 if (len == 0)
3652                         goto next;
3653
3654                 result = iops->rec(env, it, (struct dt_rec *)ent,
3655                                    LUDA_FID | LUDA_TYPE);
3656                 if (result == -ESTALE)
3657                         goto next;
3658                 if (result != 0) {
3659                         rc = result;
3660                         goto out;
3661                 }
3662
3663                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3664                 recsize = le16_to_cpu(ent->lde_reclen);
3665
3666                 /* Insert new fid with target name into target dir */
3667                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3668                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3669                      ent->lde_name[1] == '.'))
3670                         goto next;
3671
3672                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3673                 if (IS_ERR(child))
3674                         GOTO(out, rc = PTR_ERR(child));
3675
3676                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3677                 is_dir = S_ISDIR(mdd_object_type(child));
3678
3679                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3680
3681                 /* Check whether the name has been inserted to the target */
3682                 if (dt_try_as_dir(env, dt_tobj)) {
3683                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3684
3685                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3686                                        (struct dt_key *)name);
3687                         if (unlikely(rc == 0))
3688                                 target_exist = true;
3689                 }
3690
3691                 handle = mdd_trans_create(env, mdd);
3692                 if (IS_ERR(handle))
3693                         GOTO(out_put, rc = PTR_ERR(handle));
3694
3695                 /* Note: this transaction is part of migration, and it is not
3696                  * the last step of migration, so we set th_local = 1 to avoid
3697                  * updating last rcvd for this transaction */
3698                 handle->th_local = 1;
3699                 if (likely(!target_exist)) {
3700                         rc = mdo_declare_index_insert(env, mdd_tobj,
3701                                                       &ent->lde_fid,
3702                                                       mdd_object_type(child),
3703                                                       name, handle);
3704                         if (rc != 0)
3705                                 GOTO(out_put, rc);
3706
3707                         if (is_dir) {
3708                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3709                                 if (rc != 0)
3710                                         GOTO(out_put, rc);
3711                         }
3712                 }
3713
3714                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3715                 if (rc != 0)
3716                         GOTO(out_put, rc);
3717
3718                 if (is_dir) {
3719                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3720                         if (rc != 0)
3721                                 GOTO(out_put, rc);
3722
3723                         /* Update .. for child */
3724                         rc = mdo_declare_index_delete(env, child, dotdot,
3725                                                       handle);
3726                         if (rc != 0)
3727                                 GOTO(out_put, rc);
3728
3729                         rc = mdo_declare_index_insert(env, child,
3730                                                       mdd_object_fid(mdd_tobj),
3731                                                       S_IFDIR, dotdot, handle);
3732                         if (rc != 0)
3733                                 GOTO(out_put, rc);
3734                 }
3735
3736                 rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
3737                                                      child, name,
3738                                                      strlen(name),
3739                                                      handle);
3740                 if (rc != 0)
3741                         GOTO(out_put, rc);
3742
3743                 rc = mdd_trans_start(env, mdd, handle);
3744                 if (rc != 0) {
3745                         CERROR("%s: transaction start failed: rc = %d\n",
3746                                mdd2obd_dev(mdd)->obd_name, rc);
3747                         GOTO(out_put, rc);
3748                 }
3749
3750                 if (likely(!target_exist)) {
3751                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3752                                                 mdd_object_type(child),
3753                                                 name, handle);
3754                         if (rc != 0)
3755                                 GOTO(out_put, rc);
3756                 }
3757
3758                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3759                 if (rc != 0)
3760                         GOTO(out_put, rc);
3761
3762                 if (is_dir) {
3763                         rc = __mdd_index_delete_only(env, child, dotdot,
3764                                                      handle);
3765                         if (rc != 0)
3766                                 GOTO(out_put, rc);
3767
3768                         rc = __mdd_index_insert_only(env, child,
3769                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3770                                          dotdot, handle);
3771                         if (rc != 0)
3772                                 GOTO(out_put, rc);
3773                 }
3774
3775                 rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
3776                                              child, name,
3777                                              strlen(name), handle);
3778
3779 out_put:
3780                 mdd_write_unlock(env, child);
3781                 mdd_object_put(env, child);
3782                 rc = mdd_trans_stop(env, mdd, rc, handle);
3783                 if (rc != 0)
3784                         GOTO(out, rc);
3785 next:
3786                 result = iops->next(env, it);
3787                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3788                         GOTO(out, rc = -EINTR);
3789
3790                 if (result == -ESTALE)
3791                         goto next;
3792         } while (result == 0);
3793 out:
3794         iops->put(env, it);
3795         iops->fini(env, it);
3796 out_ent:
3797         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3798         RETURN(rc);
3799 }
3800
3801 static int mdd_declare_update_linkea(const struct lu_env *env,
3802                                      struct mdd_object *mdd_pobj,
3803                                      struct mdd_object *mdd_sobj,
3804                                      struct mdd_object *mdd_tobj,
3805                                      const struct lu_name *child_name,
3806                                      struct linkea_data *ldata,
3807                                      struct thandle *handle)
3808 {
3809         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3810                                           child_name, ldata, handle, 1);
3811 }
3812
3813 static int mdd_update_linkea(const struct lu_env *env,
3814                              struct mdd_object *mdd_pobj,
3815                              struct mdd_object *mdd_sobj,
3816                              struct mdd_object *mdd_tobj,
3817                              const struct lu_name *child_name,
3818                              struct linkea_data *ldata,
3819                              struct thandle *handle)
3820 {
3821         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3822                                           child_name, ldata, handle, 0);
3823 }
3824
3825 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3826                                            struct mdd_object *mdd_pobj,
3827                                            struct mdd_object *mdd_sobj,
3828                                            struct mdd_object *mdd_tobj,
3829                                            const struct lu_name *lname,
3830                                            struct lu_attr *la,
3831                                            struct lu_attr *parent_la,
3832                                            struct linkea_data *ldata,
3833                                            struct thandle *handle)
3834 {
3835         struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3836         struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
3837         int rc;
3838
3839         /* Revert IMMUTABLE flag */
3840         la_flag->la_valid = LA_FLAGS;
3841         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3842         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3843         if (rc != 0)
3844                 return rc;
3845
3846         /* delete entry from source dir */
3847         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3848         if (rc != 0)
3849                 return rc;
3850
3851         if (ldata->ld_buf != NULL) {
3852                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3853                                                mdd_tobj, lname, ldata, handle);
3854                 if (rc != 0)
3855                         return rc;
3856         }
3857
3858         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3859                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3860                                            handle);
3861                 if (rc != 0)
3862                         return rc;
3863
3864                 handle->th_complex = 1;
3865                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
3866                                            XATTR_NAME_FID,
3867                                            LU_XATTR_REPLACE, handle);
3868                 if (rc < 0)
3869                         return rc;
3870         }
3871
3872         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3873                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3874                 if (rc != 0)
3875                         return rc;
3876         }
3877
3878         /* new name */
3879         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3880                                       mdd_object_type(mdd_tobj),
3881                                       lname->ln_name, handle);
3882         if (rc != 0)
3883                 return rc;
3884
3885         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
3886         if (rc != 0)
3887                 return rc;
3888
3889         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3890                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3891                 if (rc != 0)
3892                         return rc;
3893         }
3894
3895         /* delete old object */
3896         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3897         if (rc != 0)
3898                 return rc;
3899
3900         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3901                 /* delete old object */
3902                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3903                 if (rc != 0)
3904                         return rc;
3905                 /* set nlink to 0 */
3906                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3907                 if (rc != 0)
3908                         return rc;
3909         }
3910
3911         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3912         if (rc)
3913                 return rc;
3914
3915         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3916         if (rc != 0)
3917                 return rc;
3918
3919         rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
3920
3921         return rc;
3922 }
3923
3924 static int mdd_migrate_update_name(const struct lu_env *env,
3925                                    struct mdd_object *mdd_pobj,
3926                                    struct mdd_object *mdd_sobj,
3927                                    struct mdd_object *mdd_tobj,
3928                                    const struct lu_name *lname,
3929                                    struct md_attr *ma)
3930 {
3931         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3932         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3933         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3934         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3935         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3936         struct thandle          *handle;
3937         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3938         const char              *name = lname->ln_name;
3939         int                     rc;
3940         ENTRY;
3941
3942         /* update time for parent */
3943         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3944         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3945         p_la->la_valid = LA_CTIME;
3946
3947         rc = mdd_la_get(env, mdd_sobj, so_attr);
3948         if (rc != 0)
3949                 RETURN(rc);
3950
3951         ldata->ld_buf = NULL;
3952         rc = mdd_links_read(env, mdd_sobj, ldata);
3953         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
3954                 RETURN(rc);
3955
3956         handle = mdd_trans_create(env, mdd);
3957         if (IS_ERR(handle))
3958                 RETURN(PTR_ERR(handle));
3959
3960         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3961                                              lname, so_attr, p_la, ldata,
3962                                              handle);
3963         if (rc != 0) {
3964                 /* If the migration can not be fit in one transaction, just
3965                  * leave it in the original MDT */
3966                 if (rc == -E2BIG)
3967                         GOTO(stop_trans, rc = 0);
3968                 else
3969                         GOTO(stop_trans, rc);
3970         }
3971
3972         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3973                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3974                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3975                PFID(mdd_object_fid(mdd_tobj)));
3976
3977         rc = mdd_trans_start(env, mdd, handle);
3978         if (rc != 0)
3979                 GOTO(stop_trans, rc);
3980
3981         /* Revert IMMUTABLE flag */
3982         la_flag->la_valid = LA_FLAGS;
3983         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
3984         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3985         if (rc != 0)
3986                 GOTO(stop_trans, rc);
3987
3988         /* Remove source name from source directory */
3989         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
3990         if (rc != 0)
3991                 GOTO(stop_trans, rc);
3992
3993         if (ldata->ld_buf != NULL) {
3994                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
3995                                        lname, ldata, handle);
3996                 if (rc != 0)
3997                         GOTO(stop_trans, rc);
3998
3999                 /*  linkea update might decrease the source object
4000                  *  nlink, let's get the attr again after ref_del */
4001                 rc = mdd_la_get(env, mdd_sobj, so_attr);
4002                 if (rc != 0)
4003                         GOTO(stop_trans, rc);
4004         }
4005
4006         if (S_ISREG(so_attr->la_mode)) {
4007                 if (so_attr->la_nlink == 1) {
4008                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
4009                                            handle);
4010                         if (rc != 0 && rc != -ENODATA)
4011                                 GOTO(stop_trans, rc);
4012
4013                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
4014                                            XATTR_NAME_FID,
4015                                            LU_XATTR_REPLACE, handle);
4016                         if (rc < 0)
4017                                 GOTO(stop_trans, rc);
4018                 }
4019         }
4020
4021         /* Insert new fid with target name into target dir */
4022         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
4023                                 mdd_object_type(mdd_tobj), name, handle);
4024         if (rc != 0)
4025                 GOTO(stop_trans, rc);
4026
4027         linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
4028         rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
4029                            ldata, 1);
4030         if (rc != 0)
4031                 GOTO(stop_trans, rc);
4032
4033         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
4034
4035         mdd_sobj->mod_flags |= DEAD_OBJ;
4036         rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
4037         if (rc != 0)
4038                 GOTO(out_unlock, rc);
4039
4040         rc = __mdd_orphan_add(env, mdd_sobj, handle);
4041         if (rc != 0)
4042                 GOTO(out_unlock, rc);
4043
4044         mdo_ref_del(env, mdd_sobj, handle);
4045         if (is_dir)
4046                 mdo_ref_del(env, mdd_sobj, handle);
4047
4048         /* Get the attr again after ref_del */
4049         rc = mdd_la_get(env, mdd_sobj, so_attr);
4050         if (rc != 0)
4051                 GOTO(out_unlock, rc);
4052
4053         ma->ma_attr = *so_attr;
4054         ma->ma_valid |= MA_INODE;
4055
4056         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
4057         if (rc != 0)
4058                 GOTO(out_unlock, rc);
4059
4060         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
4061                                mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
4062                                mdo2fid(mdd_pobj), lname, lname, handle);
4063         if (rc != 0) {
4064                 CWARN("%s: changelog for migrate %s "DFID
4065                       "under "DFID" failed: rc = %d\n",
4066                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
4067                       PFID(mdd_object_fid(mdd_sobj)),
4068                       PFID(mdd_object_fid(mdd_pobj)), rc);
4069                 /* Sigh, there are no easy way to migrate back the object, so
4070                  * let's reset the result to 0 for now XXX */
4071                 rc = 0;
4072         }
4073 out_unlock:
4074         mdd_write_unlock(env, mdd_sobj);
4075
4076 stop_trans:
4077         rc = mdd_trans_stop(env, mdd, rc, handle);
4078
4079         RETURN(rc);
4080 }
4081
4082 static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
4083                           const struct lu_fid *fid, __u32 *mdt_index)
4084 {
4085         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
4086         struct seq_server_site *ss;
4087         int rc;
4088
4089         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4090
4091         range->lsr_flags = LU_SEQ_RANGE_MDT;
4092         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4093         if (rc != 0)
4094                 return rc;
4095
4096         *mdt_index = range->lsr_index;
4097
4098         return 0;
4099 }
4100 /**
4101  * Check whether we should migrate the file/dir
4102  * return val
4103  *      < 0  permission check failed or other error.
4104  *      = 0  the file can be migrated.
4105  *      > 0  the file does not need to be migrated, mostly
4106  *           for multiple link file
4107  **/
4108 static int mdd_migrate_sanity_check(const struct lu_env *env,
4109                                     struct mdd_object *pobj,
4110                                     const struct lu_attr *pattr,
4111                                     struct mdd_object *sobj,
4112                                     struct lu_attr *sattr)
4113 {
4114         struct mdd_thread_info  *info = mdd_env_info(env);
4115         struct linkea_data      *ldata = &info->mti_link_data;
4116         struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
4117         int                     mgr_easize;
4118         struct lu_buf           *mgr_buf;
4119         int                     count;
4120         int                     rc;
4121         __u64 mdt_index;
4122         ENTRY;
4123
4124         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4125         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4126         if (mgr_buf->lb_buf == NULL)
4127                 RETURN(-ENOMEM);
4128
4129         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4130         if (rc > 0) {
4131                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4132
4133                 /* If the object has migrateEA, it means IMMUTE flag
4134                  * is being set by previous migration process, so it
4135                  * needs to override the IMMUTE flag, otherwise the
4136                  * following sanity check will fail */
4137                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4138                                                 LMV_HASH_FLAG_MIGRATION) {
4139                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4140
4141                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4142                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4143                                mdd2obd_dev(mdd)->obd_name,
4144                                PFID(mdd_object_fid(sobj)));
4145                 }
4146         }
4147
4148         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4149                                      sobj, sattr, NULL, NULL);
4150         if (rc != 0)
4151                 RETURN(rc);
4152
4153         /* Then it will check if the file should be migrated. If the file
4154          * has mulitple links, we only need migrate the file if all of its
4155          * entries has been migrated to the remote MDT */
4156         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4157                 RETURN(0);
4158
4159         rc = mdd_links_read(env, sobj, ldata);
4160         if (rc != 0) {
4161                 /* For multiple links files, if there are no linkEA data at all,
4162                  * means the file might be created before linkEA is enabled, and
4163                  * all of its links should not be migrated yet, otherwise it
4164                  * should have some linkEA there */
4165                 if (rc == -ENOENT || rc == -ENODATA)
4166                         RETURN(1);
4167                 RETURN(rc);
4168         }
4169
4170         mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
4171         /* If there are still links locally, then the file will not be
4172          * migrated. */
4173         LASSERT(ldata->ld_leh != NULL);
4174         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4175         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4176                 struct lu_name          lname;
4177                 struct lu_fid           fid;
4178                 __u32                   parent_mdt_index;
4179
4180                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4181                                     &lname, &fid);
4182                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4183                                                          ldata->ld_reclen);
4184
4185                 rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
4186                 if (rc != 0)
4187                         RETURN(rc);
4188
4189                 /* Migrate the object only if none of its parents are on the
4190                  * current MDT. */
4191                 if (parent_mdt_index != mdt_index)
4192                         continue;
4193
4194                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4195                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4196                        lname.ln_name, PFID(&fid));
4197                 rc = 1;
4198                 break;
4199         }
4200
4201         RETURN(rc);
4202 }
4203
4204 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4205                        struct md_object *sobj, const struct lu_name *lname,
4206                        struct md_object *tobj, struct md_attr *ma)
4207 {
4208         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4209         struct mdd_device       *mdd = mdo2mdd(pobj);
4210         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4211         struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
4212         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4213         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4214         bool                    created = false;
4215         int                     rc;
4216
4217         ENTRY;
4218         /* If the file will being migrated, it will check whether
4219          * the file is being opened by someone else right now */
4220         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4221         if (mdd_sobj->mod_count > 0) {
4222                 CDEBUG(D_OTHER,
4223                        "%s: "DFID"%s is already opened count %d: rc = %d\n",
4224                        mdd2obd_dev(mdd)->obd_name,
4225                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4226                        mdd_sobj->mod_count, -EBUSY);
4227                 mdd_read_unlock(env, mdd_sobj);
4228                 GOTO(put, rc = -EBUSY);
4229         }
4230         mdd_read_unlock(env, mdd_sobj);
4231
4232         rc = mdd_la_get(env, mdd_sobj, so_attr);
4233         if (rc != 0)
4234                 GOTO(put, rc);
4235
4236         rc = mdd_la_get(env, mdd_pobj, pattr);
4237         if (rc != 0)
4238                 GOTO(put, rc);
4239
4240         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4241         if (rc != 0) {
4242                 if (rc > 0)
4243                         rc = 0;
4244                 GOTO(put, rc);
4245         }
4246
4247         /* Sigh, it is impossible to finish all of migration in a single
4248          * transaction, for example migrating big directory entries to the
4249          * new MDT, it needs insert all of name entries of children in the
4250          * new directory.
4251          *
4252          * So migration will be done in multiple steps and transactions.
4253          *
4254          * 1. create an orphan object on the remote MDT in one transaction.
4255          * 2. migrate extend attributes to the new target file/directory.
4256          * 3. For directory, migrate the entries to the new MDT and update
4257          * linkEA of each children. Because we can not migrate all entries
4258          * in a single transaction, so the migrating directory will become
4259          * a striped directory during migration, so once the process is
4260          * interrupted, the directory is still accessible. (During lookup,
4261          * client will locate the name by searching both original and target
4262          * object).
4263          * 4. Finally, update the name/FID to point to the new file/directory
4264          * in a separate transaction.
4265          */
4266
4267         /* step 1: Check whether the orphan object has been created, and create
4268          * orphan object on the remote MDT if needed */
4269         if (!mdd_object_exists(mdd_tobj)) {
4270                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4271                                         lname, so_attr);
4272                 if (rc != 0)
4273                         GOTO(put, rc);
4274                 created = true;
4275         }
4276
4277         LASSERT(mdd_object_exists(mdd_tobj));
4278         /* step 2: migrate xattr */
4279         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4280         if (rc != 0)
4281                 GOTO(put, rc);
4282
4283         /* step 3: migrate name entries to the orphan object */
4284         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4285                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4286                 if (rc != 0)
4287                         GOTO(put, rc);
4288                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4289                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4290                         GOTO(put, rc = 0);
4291         } else {
4292                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4293         }
4294
4295         LASSERT(mdd_object_exists(mdd_tobj));
4296         /* step 4: update name entry to the new object */
4297         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4298                                      ma);
4299         if (rc != 0)
4300                 GOTO(put, rc);
4301
4302         /* newly created target was not locked, don't cache its attributes */
4303         if (created)
4304                 mdd_invalidate(env, tobj);
4305 put:
4306         RETURN(rc);
4307 }
4308
4309 const struct md_dir_operations mdd_dir_ops = {
4310         .mdo_is_subdir     = mdd_is_subdir,
4311         .mdo_lookup        = mdd_lookup,
4312         .mdo_create        = mdd_create,
4313         .mdo_rename        = mdd_rename,
4314         .mdo_link          = mdd_link,
4315         .mdo_unlink        = mdd_unlink,
4316         .mdo_create_data   = mdd_create_data,
4317         .mdo_migrate       = mdd_migrate,
4318 };