Whamcloud - gitweb
LU-6458 mdd: try linkEA first in mdd_parent_fid
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static inline int
61 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
62 {
63         if (!lu_name_is_valid(ln))
64                 return -EINVAL;
65         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
66                 return -ENAMETOOLONG;
67         else
68                 return 0;
69 }
70
71 /* Get FID from name and parent */
72 static int
73 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
74              const struct lu_attr *pattr, const struct lu_name *lname,
75              struct lu_fid* fid, int mask)
76 {
77         const char *name                = lname->ln_name;
78         const struct dt_key *key        = (const struct dt_key *)name;
79         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
80         struct mdd_device *m            = mdo2mdd(pobj);
81         struct dt_object *dir           = mdd_object_child(mdd_obj);
82         int rc;
83         ENTRY;
84
85         if (unlikely(mdd_is_dead_obj(mdd_obj)))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         } else if (!mdd_object_exists(mdd_obj)) {
92                 RETURN(-ESTALE);
93         }
94
95         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
96                                             MOR_TGT_PARENT);
97         if (rc)
98                 RETURN(rc);
99
100         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
101                    dt_try_as_dir(env, dir)))
102                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
103         else
104                 rc = -ENOTDIR;
105
106         RETURN(rc);
107 }
108
109 int mdd_lookup(const struct lu_env *env,
110                struct md_object *pobj, const struct lu_name *lname,
111                struct lu_fid *fid, struct md_op_spec *spec)
112 {
113         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
114         int rc;
115         ENTRY;
116
117         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
118         if (rc != 0)
119                 RETURN(rc);
120
121         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
122                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
123         RETURN(rc);
124 }
125
126 /**
127  * Get parent FID of the directory
128  *
129  * Read parent FID from linkEA, if that fails, then do lookup
130  * dotdot to get the parent FID.
131  *
132  * \param[in] env       execution environment
133  * \param[in] obj       object from which to find the parent FID
134  * \param[in] attr      attribute of the object
135  * \param[out] fid      fid to get the parent FID
136  *
137  * \retval              0 if getting the parent FID succeeds.
138  * \retval              negative errno if getting the parent FID fails.
139  **/
140 static inline int mdd_parent_fid(const struct lu_env *env,
141                                  struct mdd_object *obj,
142                                  const struct lu_attr *attr,
143                                  struct lu_fid *fid)
144 {
145         struct mdd_thread_info  *info = mdd_env_info(env);
146         struct linkea_data      ldata = { NULL };
147         struct lu_buf           *buf = &info->mti_link_buf;
148         struct lu_name          lname;
149         int                     rc = 0;
150
151         ENTRY;
152
153         LASSERT(S_ISDIR(mdd_object_type(obj)));
154
155         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
156         if (buf->lb_buf == NULL)
157                 GOTO(lookup, rc = 0);
158
159         ldata.ld_buf = buf;
160         rc = mdd_links_read(env, obj, &ldata);
161         if (rc != 0)
162                 GOTO(lookup, rc);
163
164         LASSERT(ldata.ld_leh != NULL);
165         /* Directory should only have 1 parent */
166         if (ldata.ld_leh->leh_reccount > 1)
167                 GOTO(lookup, rc);
168
169         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
170
171         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
172         if (likely(fid_is_sane(fid)))
173                 RETURN(0);
174 lookup:
175         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
176         RETURN(rc);
177 }
178
179 /*
180  * For root fid use special function, which does not compare version component
181  * of fid. Version component is different for root fids on all MDTs.
182  */
183 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
184 {
185         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
186                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
187 }
188
189 /*
190  * return 1: if lf is the fid of the ancestor of p1;
191  * return 0: if not;
192  *
193  * return -EREMOTE: if remote object is found, in this
194  * case fid of remote object is saved to @pf;
195  *
196  * otherwise: values < 0, errors.
197  */
198 static int mdd_is_parent(const struct lu_env *env,
199                         struct mdd_device *mdd,
200                         struct mdd_object *p1,
201                         const struct lu_attr *attr,
202                         const struct lu_fid *lf,
203                         struct lu_fid *pf)
204 {
205         struct mdd_object *parent = NULL;
206         struct lu_fid *pfid;
207         int rc;
208         ENTRY;
209
210         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
211         pfid = &mdd_env_info(env)->mti_fid;
212
213         /* Check for root first. */
214         if (mdd_is_root(mdd, mdo2fid(p1)))
215                 RETURN(0);
216
217         for(;;) {
218                 /* this is done recursively */
219                 rc = mdd_parent_fid(env, p1, attr, pfid);
220                 if (rc)
221                         GOTO(out, rc);
222                 if (mdd_is_root(mdd, pfid))
223                         GOTO(out, rc = 0);
224                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
225                         GOTO(out, rc = 0);
226                 if (lu_fid_eq(pfid, lf))
227                         GOTO(out, rc = 1);
228                 if (parent)
229                         mdd_object_put(env, parent);
230
231                 parent = mdd_object_find(env, mdd, pfid);
232                 if (IS_ERR(parent)) {
233                         GOTO(out, rc = PTR_ERR(parent));
234                 } else if (mdd_object_remote(parent)) {
235                         /*FIXME: Because of the restriction of rename in Phase I.
236                          * If the parent is remote, we just assumed lf is not the
237                          * parent of P1 for now */
238                         GOTO(out, rc = 0);
239                 }
240                 p1 = parent;
241         }
242         EXIT;
243 out:
244         if (parent && !IS_ERR(parent))
245                 mdd_object_put(env, parent);
246         return rc;
247 }
248
249 /*
250  * No permission check is needed.
251  *
252  * returns 1: if fid is ancestor of @mo;
253  * returns 0: if fid is not an ancestor of @mo;
254  *
255  * returns EREMOTE if remote object is found, fid of remote object is saved to
256  * @fid;
257  *
258  * returns < 0: if error
259  */
260 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
261                   const struct lu_fid *fid, struct lu_fid *sfid)
262 {
263         struct mdd_device *mdd = mdo2mdd(mo);
264         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
265         int rc;
266         ENTRY;
267
268         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
269                 RETURN(0);
270
271         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
272         if (rc != 0)
273                 RETURN(rc);
274
275         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
276         if (rc == 0) {
277                 /* found root */
278                 fid_zero(sfid);
279         } else if (rc == 1) {
280                 /* found @fid is parent */
281                 *sfid = *fid;
282                 rc = 0;
283         }
284         RETURN(rc);
285 }
286
287 /*
288  * Check that @dir contains no entries except (possibly) dot and dotdot.
289  *
290  * Returns:
291  *
292  *             0        empty
293  *      -ENOTDIR        not a directory object
294  *    -ENOTEMPTY        not empty
295  *           -ve        other error
296  *
297  */
298 static int mdd_dir_is_empty(const struct lu_env *env,
299                             struct mdd_object *dir)
300 {
301         struct dt_it     *it;
302         struct dt_object *obj;
303         const struct dt_it_ops *iops;
304         int result;
305         ENTRY;
306
307         obj = mdd_object_child(dir);
308         if (!dt_try_as_dir(env, obj))
309                 RETURN(-ENOTDIR);
310
311         iops = &obj->do_index_ops->dio_it;
312         it = iops->init(env, obj, LUDA_64BITHASH);
313         if (!IS_ERR(it)) {
314                 result = iops->get(env, it, (const struct dt_key *)"");
315                 if (result > 0) {
316                         int i;
317                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
318                                 result = iops->next(env, it);
319                         if (result == 0)
320                                 result = -ENOTEMPTY;
321                         else if (result == 1)
322                                 result = 0;
323                 } else if (result == 0)
324                         /*
325                          * Huh? Index contains no zero key?
326                          */
327                         result = -EIO;
328
329                 iops->put(env, it);
330                 iops->fini(env, it);
331         } else
332                 result = PTR_ERR(it);
333         RETURN(result);
334 }
335
336 /**
337  * Determine if the target object can be hard linked, and right now it only
338  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
339  * directory nlink count might exceed the maximum link count(see
340  * osd_object_ref_add), so it only check nlink for non-directories.
341  *
342  * \param[in] env       thread environment
343  * \param[in] obj       object being linked to
344  * \param[in] la        attributes of \a obj
345  *
346  * \retval              0 if \a obj can be hard linked
347  * \retval              negative error if \a obj is a directory or has too
348  *                      many links
349  */
350 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
351                           const struct lu_attr *la)
352 {
353         struct mdd_device *m = mdd_obj2mdd_dev(obj);
354         ENTRY;
355
356         LASSERT(la != NULL);
357
358         /* Subdir count limitation can be broken through
359          * (see osd_object_ref_add), so only check non-directory here. */
360         if (!S_ISDIR(la->la_mode) &&
361             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
362                 RETURN(-EMLINK);
363
364         RETURN(0);
365 }
366
367 /**
368  * Check whether it may create the cobj under the pobj.
369  *
370  * \param[in] env       execution environment
371  * \param[in] pobj      the parent directory
372  * \param[in] pattr     the attribute of the parent directory
373  * \param[in] cobj      the child to be created
374  * \param[in] check_perm        if check WRITE|EXEC permission for parent
375  *
376  * \retval              = 0 create the child under this dir is allowed
377  * \retval              negative errno create the child under this dir is
378  *                      not allowed
379  */
380 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
381                    const struct lu_attr *pattr, struct mdd_object *cobj,
382                    bool check_perm)
383 {
384         struct mdd_thread_info *info = mdd_env_info(env);
385         struct lu_buf   *xbuf;
386         int rc = 0;
387         ENTRY;
388
389         if (cobj && mdd_object_exists(cobj))
390                 RETURN(-EEXIST);
391
392         if (mdd_is_dead_obj(pobj))
393                 RETURN(-ENOENT);
394
395         /* If the parent is a sub-stripe, check whether it is dead */
396         xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key));
397         rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV);
398         if (unlikely(rc > 0)) {
399                 struct lmv_mds_md_v1  *lmv1 = xbuf->lb_buf;
400
401                 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE &&
402                     le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD)
403                         RETURN(-ESTALE);
404         }
405         rc = 0;
406
407         if (check_perm)
408                 rc = mdd_permission_internal_locked(env, pobj, pattr,
409                                                     MAY_WRITE | MAY_EXEC,
410                                                     MOR_TGT_PARENT);
411         RETURN(rc);
412 }
413
414 /*
415  * Check whether can unlink from the pobj in the case of "cobj == NULL".
416  */
417 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
418                    const struct lu_attr *pattr, const struct lu_attr *attr)
419 {
420         int rc;
421         ENTRY;
422
423         if (mdd_is_dead_obj(pobj))
424                 RETURN(-ENOENT);
425
426         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
427                 RETURN(-EPERM);
428
429         rc = mdd_permission_internal_locked(env, pobj, pattr,
430                                             MAY_WRITE | MAY_EXEC,
431                                             MOR_TGT_PARENT);
432         if (rc != 0)
433                 RETURN(rc);
434
435         if (pattr->la_flags & LUSTRE_APPEND_FL)
436                 RETURN(-EPERM);
437
438         RETURN(rc);
439 }
440
441 /*
442  * pobj == NULL is remote ops case, under such case, pobj's
443  * VTX feature has been checked already, no need check again.
444  */
445 static inline int mdd_is_sticky(const struct lu_env *env,
446                                 struct mdd_object *pobj,
447                                 const struct lu_attr *pattr,
448                                 struct mdd_object *cobj,
449                                 const struct lu_attr *cattr)
450 {
451         struct lu_ucred *uc = lu_ucred_assert(env);
452
453         if (pobj != NULL) {
454                 LASSERT(pattr != NULL);
455                 if (!(pattr->la_mode & S_ISVTX) ||
456                     (pattr->la_uid == uc->uc_fsuid))
457                         return 0;
458         }
459
460         LASSERT(cattr != NULL);
461         if (cattr->la_uid == uc->uc_fsuid)
462                 return 0;
463
464         return !md_capable(uc, CFS_CAP_FOWNER);
465 }
466
467 static int mdd_may_delete_entry(const struct lu_env *env,
468                                 struct mdd_object *pobj,
469                                 const struct lu_attr *pattr,
470                                 int check_perm)
471 {
472         ENTRY;
473
474         LASSERT(pobj != NULL);
475         if (!mdd_object_exists(pobj))
476                 RETURN(-ENOENT);
477
478         if (mdd_is_dead_obj(pobj))
479                 RETURN(-ENOENT);
480
481         if (check_perm) {
482                 int rc;
483                 rc = mdd_permission_internal_locked(env, pobj, pattr,
484                                             MAY_WRITE | MAY_EXEC,
485                                             MOR_TGT_PARENT);
486                 if (rc)
487                         RETURN(rc);
488         }
489
490         if (pattr->la_flags & LUSTRE_APPEND_FL)
491                 RETURN(-EPERM);
492
493         RETURN(0);
494 }
495
496 /*
497  * Check whether it may delete the cobj from the pobj.
498  * pobj maybe NULL
499  */
500 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
501                    const struct lu_attr *tpattr, struct mdd_object *tobj,
502                    const struct lu_attr *tattr, const struct lu_attr *cattr,
503                    int check_perm, int check_empty)
504 {
505         int rc = 0;
506         ENTRY;
507
508         if (tpobj) {
509                 LASSERT(tpattr != NULL);
510                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
511                 if (rc != 0)
512                         RETURN(rc);
513         }
514
515         if (tobj == NULL)
516                 RETURN(0);
517
518         if (!mdd_object_exists(tobj))
519                 RETURN(-ENOENT);
520
521         if (mdd_is_dead_obj(tobj))
522                 RETURN(-ESTALE);
523
524         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
525                 RETURN(-EPERM);
526
527         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
528                 RETURN(-EPERM);
529
530         /* additional check the rename case */
531         if (cattr) {
532                 if (S_ISDIR(cattr->la_mode)) {
533                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
534
535                         if (!S_ISDIR(tattr->la_mode))
536                                 RETURN(-ENOTDIR);
537
538                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
539                                 RETURN(-EBUSY);
540                 } else if (S_ISDIR(tattr->la_mode))
541                         RETURN(-EISDIR);
542         }
543
544         if (S_ISDIR(tattr->la_mode) && check_empty)
545                 rc = mdd_dir_is_empty(env, tobj);
546
547         RETURN(rc);
548 }
549
550 /**
551  * Check whether it can create the link file(linked to @src_obj) under
552  * the target directory(@tgt_obj), and src_obj has been locked by
553  * mdd_write_lock.
554  *
555  * \param[in] env       execution environment
556  * \param[in] tgt_obj   the target directory
557  * \param[in] tattr     attributes of target directory
558  * \param[in] lname     the link name
559  * \param[in] src_obj   source object for link
560  * \param[in] cattr     attributes for source object
561  *
562  * \retval              = 0 it is allowed to create the link file under tgt_obj
563  * \retval              negative error not allowed to create the link file
564  */
565 static int mdd_link_sanity_check(const struct lu_env *env,
566                                  struct mdd_object *tgt_obj,
567                                  const struct lu_attr *tattr,
568                                  const struct lu_name *lname,
569                                  struct mdd_object *src_obj,
570                                  const struct lu_attr *cattr)
571 {
572         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
573         int rc = 0;
574         ENTRY;
575
576         if (!mdd_object_exists(src_obj))
577                 RETURN(-ENOENT);
578
579         if (mdd_is_dead_obj(src_obj))
580                 RETURN(-ESTALE);
581
582         /* Local ops, no lookup before link, check filename length here. */
583         rc = mdd_name_check(m, lname);
584         if (rc < 0)
585                 RETURN(rc);
586
587         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
588                 RETURN(-EPERM);
589
590         if (S_ISDIR(mdd_object_type(src_obj)))
591                 RETURN(-EPERM);
592
593         LASSERT(src_obj != tgt_obj);
594         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
595         if (rc != 0)
596                 RETURN(rc);
597
598         rc = __mdd_may_link(env, src_obj, cattr);
599
600         RETURN(rc);
601 }
602
603 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
604                                    const char *name, struct thandle *handle)
605 {
606         struct dt_object *next = mdd_object_child(pobj);
607         int rc;
608         ENTRY;
609
610         if (dt_try_as_dir(env, next))
611                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
612         else
613                 rc = -ENOTDIR;
614
615         RETURN(rc);
616 }
617
618 static int __mdd_index_insert_only(const struct lu_env *env,
619                                    struct mdd_object *pobj,
620                                    const struct lu_fid *lf, __u32 type,
621                                    const char *name,
622                                    struct thandle *handle)
623 {
624         struct dt_object *next = mdd_object_child(pobj);
625         int               rc;
626         ENTRY;
627
628         if (dt_try_as_dir(env, next)) {
629                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
630                 struct lu_ucred         *uc  = lu_ucred_check(env);
631                 int                      ignore_quota;
632
633                 rec->rec_fid = lf;
634                 rec->rec_type = type;
635                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
636                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
637                                (const struct dt_key *)name, handle,
638                                ignore_quota);
639         } else {
640                 rc = -ENOTDIR;
641         }
642         RETURN(rc);
643 }
644
645 /* insert named index, add reference if isdir */
646 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
647                               const struct lu_fid *lf, __u32 type,
648                               const char *name, struct thandle *handle)
649 {
650         int rc;
651         ENTRY;
652
653         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
654         if (rc == 0 && S_ISDIR(type)) {
655                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
656                 mdo_ref_add(env, pobj, handle);
657                 mdd_write_unlock(env, pobj);
658         }
659
660         RETURN(rc);
661 }
662
663 /* delete named index, drop reference if isdir */
664 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
665                               const char *name, int is_dir,
666                               struct thandle *handle)
667 {
668         int               rc;
669         ENTRY;
670
671         rc = __mdd_index_delete_only(env, pobj, name, handle);
672         if (rc == 0 && is_dir) {
673                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
674                 mdo_ref_del(env, pobj, handle);
675                 mdd_write_unlock(env, pobj);
676         }
677
678         RETURN(rc);
679 }
680
681 static int mdd_llog_record_calc_size(const struct lu_env *env,
682                                      const struct lu_name *tname,
683                                      const struct lu_name *sname)
684 {
685         const struct lu_ucred   *uc = lu_ucred(env);
686         enum changelog_rec_flags crf = 0;
687         size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
688                                             sizeof(struct changelog_rec);
689
690         if (sname != NULL)
691                 crf |= CLF_RENAME;
692
693         if (uc != NULL && uc->uc_jobid[0] != '\0')
694                 crf |= CLF_JOBID;
695
696         return llog_data_len(hdr_size + changelog_rec_offset(crf) +
697                              (tname != NULL ? tname->ln_namelen : 0) +
698                              (sname != NULL ? 1 + sname->ln_namelen : 0));
699 }
700
701 int mdd_declare_changelog_store(const struct lu_env *env,
702                                 struct mdd_device *mdd,
703                                 const struct lu_name *tname,
704                                 const struct lu_name *sname,
705                                 struct thandle *handle)
706 {
707         struct obd_device               *obd = mdd2obd_dev(mdd);
708         struct llog_ctxt                *ctxt;
709         struct llog_changelog_rec       *rec;
710         struct lu_buf                   *buf;
711         struct thandle                  *llog_th;
712         int                              reclen;
713         int                              rc;
714
715         /* Not recording */
716         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
717                 return 0;
718
719         reclen = mdd_llog_record_calc_size(env, tname, sname);
720         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
721         if (buf->lb_buf == NULL)
722                 return -ENOMEM;
723
724         rec = buf->lb_buf;
725         rec->cr_hdr.lrh_len = reclen;
726         rec->cr_hdr.lrh_type = CHANGELOG_REC;
727
728         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
729         if (ctxt == NULL)
730                 return -ENXIO;
731
732         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
733         if (IS_ERR(llog_th))
734                 GOTO(out_put, rc = PTR_ERR(llog_th));
735
736         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
737
738 out_put:
739         llog_ctxt_put(ctxt);
740
741         return rc;
742 }
743
744 /** Add a changelog entry \a rec to the changelog llog
745  * \param mdd
746  * \param rec
747  * \param handle - currently ignored since llogs start their own transaction;
748  *              this will hopefully be fixed in llog rewrite
749  * \retval 0 ok
750  */
751 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
752                         struct llog_changelog_rec *rec, struct thandle *th)
753 {
754         struct obd_device       *obd = mdd2obd_dev(mdd);
755         struct llog_ctxt        *ctxt;
756         struct thandle          *llog_th;
757         int                      rc;
758
759         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
760                                             changelog_rec_varsize(&rec->cr));
761
762         /* llog_lvfs_write_rec sets the llog tail len */
763         rec->cr_hdr.lrh_type = CHANGELOG_REC;
764         rec->cr.cr_time = cl_time();
765
766         spin_lock(&mdd->mdd_cl.mc_lock);
767         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
768          * but as long as the MDD transactions are ordered correctly for e.g.
769          * rename conflicts, I don't think this should matter. */
770         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
771         spin_unlock(&mdd->mdd_cl.mc_lock);
772
773         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
774         if (ctxt == NULL)
775                 return -ENXIO;
776
777         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
778         if (IS_ERR(llog_th))
779                 GOTO(out_put, rc = PTR_ERR(llog_th));
780
781         /* nested journal transaction */
782         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
783
784 out_put:
785         llog_ctxt_put(ctxt);
786         if (rc > 0)
787                 rc = 0;
788         return rc;
789 }
790
791 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
792                                          const struct lu_fid *sfid,
793                                          const struct lu_fid *spfid,
794                                          const struct lu_name *sname)
795 {
796         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
797         size_t                           extsize = sname->ln_namelen + 1;
798
799         LASSERT(sfid != NULL);
800         LASSERT(spfid != NULL);
801         LASSERT(sname != NULL);
802
803         rnm->cr_sfid = *sfid;
804         rnm->cr_spfid = *spfid;
805
806         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
807         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
808         rec->cr_namelen += extsize;
809 }
810
811 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
812 {
813         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
814
815         if (jobid == NULL || jobid[0] == '\0')
816                 return;
817
818         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
819 }
820
821 /** Store a namespace change changelog record
822  * If this fails, we must fail the whole transaction; we don't
823  * want the change to commit without the log entry.
824  * \param target - mdd_object of change
825  * \param tpfid - target parent dir/object fid
826  * \param sfid - source object fid
827  * \param spfid - source parent fid
828  * \param tname - target name string
829  * \param sname - source name string
830  * \param handle - transacion handle
831  */
832 int mdd_changelog_ns_store(const struct lu_env *env,
833                            struct mdd_device *mdd,
834                            enum changelog_rec_type type,
835                            enum changelog_rec_flags crf,
836                            struct mdd_object *target,
837                            const struct lu_fid *tpfid,
838                            const struct lu_fid *sfid,
839                            const struct lu_fid *spfid,
840                            const struct lu_name *tname,
841                            const struct lu_name *sname,
842                            struct thandle *handle)
843 {
844         const struct lu_ucred           *uc = lu_ucred(env);
845         struct llog_changelog_rec       *rec;
846         struct lu_buf                   *buf;
847         int                              reclen;
848         int                              rc;
849         ENTRY;
850
851         /* Not recording */
852         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
853                 RETURN(0);
854
855         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
856                 RETURN(0);
857
858         LASSERT(tpfid != NULL);
859         LASSERT(tname != NULL);
860         LASSERT(handle != NULL);
861
862         reclen = mdd_llog_record_calc_size(env, tname, sname);
863         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
864         if (buf->lb_buf == NULL)
865                 RETURN(-ENOMEM);
866         rec = buf->lb_buf;
867
868         crf &= CLF_FLAGMASK;
869
870         if (uc != NULL && uc->uc_jobid[0] != '\0')
871                 crf |= CLF_JOBID;
872
873         if (sname != NULL)
874                 crf |= CLF_RENAME;
875         else
876                 crf |= CLF_VERSION;
877
878         rec->cr.cr_flags = crf;
879         rec->cr.cr_type = (__u32)type;
880         rec->cr.cr_pfid = *tpfid;
881         rec->cr.cr_namelen = tname->ln_namelen;
882         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
883
884         if (crf & CLF_RENAME)
885                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
886
887         if (crf & CLF_JOBID)
888                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
889
890         if (likely(target != NULL)) {
891                 rec->cr.cr_tfid = *mdo2fid(target);
892                 target->mod_cltime = cfs_time_current_64();
893         } else {
894                 fid_zero(&rec->cr.cr_tfid);
895         }
896
897         rc = mdd_changelog_store(env, mdd, rec, handle);
898         if (rc < 0) {
899                 CERROR("%s: cannot store changelog record: type = %d, "
900                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
901                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
902                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
903                 return -EFAULT;
904         }
905
906         return 0;
907 }
908
909 static int __mdd_links_add(const struct lu_env *env,
910                            struct mdd_object *mdd_obj,
911                            struct linkea_data *ldata,
912                            const struct lu_name *lname,
913                            const struct lu_fid *pfid,
914                            int first, int check)
915 {
916         int rc;
917
918         if (ldata->ld_leh == NULL) {
919                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
920                 if (rc) {
921                         if (rc != -ENODATA)
922                                 return rc;
923                         rc = linkea_data_new(ldata,
924                                              &mdd_env_info(env)->mti_link_buf);
925                         if (rc)
926                                 return rc;
927                 }
928         }
929
930         if (check) {
931                 rc = linkea_links_find(ldata, lname, pfid);
932                 if (rc && rc != -ENOENT)
933                         return rc;
934                 if (rc == 0)
935                         return -EEXIST;
936         }
937
938         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
939                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
940
941                 *tfid = *pfid;
942                 tfid->f_ver = ~0;
943                 linkea_add_buf(ldata, lname, tfid);
944         }
945
946         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
947                 linkea_add_buf(ldata, lname, pfid);
948
949         return linkea_add_buf(ldata, lname, pfid);
950 }
951
952 static int __mdd_links_del(const struct lu_env *env,
953                            struct mdd_object *mdd_obj,
954                            struct linkea_data *ldata,
955                            const struct lu_name *lname,
956                            const struct lu_fid *pfid)
957 {
958         int rc;
959
960         if (ldata->ld_leh == NULL) {
961                 rc = mdd_links_read(env, mdd_obj, ldata);
962                 if (rc)
963                         return rc;
964         }
965
966         rc = linkea_links_find(ldata, lname, pfid);
967         if (rc)
968                 return rc;
969
970         linkea_del_buf(ldata, lname);
971         return 0;
972 }
973
974 static int mdd_linkea_prepare(const struct lu_env *env,
975                               struct mdd_object *mdd_obj,
976                               const struct lu_fid *oldpfid,
977                               const struct lu_name *oldlname,
978                               const struct lu_fid *newpfid,
979                               const struct lu_name *newlname,
980                               int first, int check,
981                               struct linkea_data *ldata)
982 {
983         int rc = 0;
984         int rc2 = 0;
985         ENTRY;
986
987         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
988                 return 0;
989
990         LASSERT(oldpfid != NULL || newpfid != NULL);
991
992         if (mdd_obj->mod_flags & DEAD_OBJ) {
993                 /* Prevent linkea to be updated which is NOT necessary. */
994                 ldata->ld_reclen = 0;
995                 /* No more links, don't bother */
996                 RETURN(0);
997         }
998
999         if (oldpfid != NULL) {
1000                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1001                 if (rc) {
1002                         if ((check == 1) ||
1003                             (rc != -ENODATA && rc != -ENOENT))
1004                                 RETURN(rc);
1005                         /* No changes done. */
1006                         rc = 0;
1007                 }
1008         }
1009
1010         /* If renaming, add the new record */
1011         if (newpfid != NULL) {
1012                 /* even if the add fails, we still delete the out-of-date
1013                  * old link */
1014                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1015                                       first, check);
1016         }
1017
1018         rc = rc != 0 ? rc : rc2;
1019
1020         RETURN(rc);
1021 }
1022
1023 int mdd_links_rename(const struct lu_env *env,
1024                      struct mdd_object *mdd_obj,
1025                      const struct lu_fid *oldpfid,
1026                      const struct lu_name *oldlname,
1027                      const struct lu_fid *newpfid,
1028                      const struct lu_name *newlname,
1029                      struct thandle *handle,
1030                      struct linkea_data *ldata,
1031                      int first, int check)
1032 {
1033         int rc = 0;
1034         ENTRY;
1035
1036         if (ldata == NULL) {
1037                 ldata = &mdd_env_info(env)->mti_link_data;
1038                 memset(ldata, 0, sizeof(*ldata));
1039                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1040                                         newpfid, newlname, first, check,
1041                                         ldata);
1042                 if (rc != 0)
1043                         GOTO(out, rc);
1044         }
1045
1046         if (ldata->ld_reclen != 0)
1047                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1048         EXIT;
1049 out:
1050         if (rc != 0) {
1051                 int error = 1;
1052                 if (rc == -EOVERFLOW || rc == -ENOSPC)
1053                         error = 0;
1054                 if (oldpfid == NULL)
1055                         CDEBUG(error ? D_ERROR : D_OTHER,
1056                                "link_ea add '%.*s' failed %d "DFID"\n",
1057                                newlname->ln_namelen, newlname->ln_name,
1058                                rc, PFID(mdd_object_fid(mdd_obj)));
1059                 else if (newpfid == NULL)
1060                         CDEBUG(error ? D_ERROR : D_OTHER,
1061                                "link_ea del '%.*s' failed %d "DFID"\n",
1062                                oldlname->ln_namelen, oldlname->ln_name,
1063                                rc, PFID(mdd_object_fid(mdd_obj)));
1064                 else
1065                         CDEBUG(error ? D_ERROR : D_OTHER,
1066                                "link_ea rename '%.*s'->'%.*s' failed %d "
1067                                DFID"\n",
1068                                oldlname->ln_namelen, oldlname->ln_name,
1069                                newlname->ln_namelen, newlname->ln_name,
1070                                rc, PFID(mdd_object_fid(mdd_obj)));
1071         }
1072
1073         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1074                 /* if we vmalloced a large buffer drop it */
1075                 lu_buf_free(ldata->ld_buf);
1076
1077         return rc;
1078 }
1079
1080 static inline int mdd_links_add(const struct lu_env *env,
1081                                 struct mdd_object *mdd_obj,
1082                                 const struct lu_fid *pfid,
1083                                 const struct lu_name *lname,
1084                                 struct thandle *handle,
1085                                 struct linkea_data *ldata, int first)
1086 {
1087         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1088                                 pfid, lname, handle, ldata, first, 0);
1089 }
1090
1091 static inline int mdd_links_del(const struct lu_env *env,
1092                                 struct mdd_object *mdd_obj,
1093                                 const struct lu_fid *pfid,
1094                                 const struct lu_name *lname,
1095                                 struct thandle *handle)
1096 {
1097         return mdd_links_rename(env, mdd_obj, pfid, lname,
1098                                 NULL, NULL, handle, NULL, 0, 0);
1099 }
1100
1101 /** Read the link EA into a temp buffer.
1102  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1103  * A pointer to the buffer is stored in \a ldata::ld_buf.
1104  *
1105  * \retval 0 or error
1106  */
1107 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1108                    struct linkea_data *ldata)
1109 {
1110         int rc;
1111
1112         if (!mdd_object_exists(mdd_obj))
1113                 return -ENODATA;
1114
1115         /* First try a small buf */
1116         LASSERT(env != NULL);
1117         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1118                                                PAGE_CACHE_SIZE);
1119         if (ldata->ld_buf->lb_buf == NULL)
1120                 return -ENOMEM;
1121
1122         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
1123         if (rc == -ERANGE) {
1124                 /* Buf was too small, figure out what we need. */
1125                 lu_buf_free(ldata->ld_buf);
1126                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1127                                    XATTR_NAME_LINK);
1128                 if (rc < 0)
1129                         return rc;
1130                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1131                 if (ldata->ld_buf->lb_buf == NULL)
1132                         return -ENOMEM;
1133                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1134                                   XATTR_NAME_LINK);
1135         }
1136         if (rc < 0) {
1137                 lu_buf_free(ldata->ld_buf);
1138                 ldata->ld_buf = NULL;
1139                 return rc;
1140         }
1141
1142         return linkea_init(ldata);
1143 }
1144
1145 /** Read the link EA into a temp buffer.
1146  * Uses the name_buf since it is generally large.
1147  * \retval IS_ERR err
1148  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1149  */
1150 struct lu_buf *mdd_links_get(const struct lu_env *env,
1151                              struct mdd_object *mdd_obj)
1152 {
1153         struct linkea_data ldata = { NULL };
1154         int rc;
1155
1156         rc = mdd_links_read(env, mdd_obj, &ldata);
1157         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1158 }
1159
1160 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1161                     struct linkea_data *ldata, struct thandle *handle)
1162 {
1163         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1164                                                      ldata->ld_leh->leh_len);
1165         int                 rc;
1166
1167         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1168                 return 0;
1169
1170         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1171         if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
1172             mdd_object_remote(mdd_obj) == 0) {
1173                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1174                 struct thandle  *sub_th;
1175
1176                 /* XXX: If the linkEA is overflow, then we need to notify the
1177                  *      namespace LFSCK to skip "nlink" attribute verification
1178                  *      on this object to avoid the "nlink" to be shrinked by
1179                  *      wrong. It may be not good an interaction with LFSCK
1180                  *      like this. We will consider to replace it with other
1181                  *      mechanism in future. LU-5802. */
1182                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
1183                                LFSCK_TYPE_NAMESPACE);
1184
1185                 sub_th = thandle_get_sub_by_dt(env, handle,
1186                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1187                 lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1188                                 lr, sub_th);
1189         }
1190
1191         return rc;
1192 }
1193
1194 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1195                           struct thandle *handle, struct linkea_data *ldata,
1196                           enum mdd_links_add_overflow overflow)
1197 {
1198         int     rc;
1199         int     ea_len;
1200         void    *linkea;
1201
1202         if (ldata != NULL && ldata->ld_leh != NULL) {
1203                 ea_len = ldata->ld_leh->leh_len;
1204                 linkea = ldata->ld_buf->lb_buf;
1205         } else {
1206                 ea_len = DEFAULT_LINKEA_SIZE;
1207                 linkea = NULL;
1208         }
1209
1210         /* XXX: max size? */
1211         rc = mdo_declare_xattr_set(env, mdd_obj,
1212                                    mdd_buf_get_const(env, linkea, ea_len),
1213                                    XATTR_NAME_LINK, 0, handle);
1214         if (rc != 0)
1215                 return rc;
1216
1217         if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
1218                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1219                 struct thandle  *sub_th;
1220
1221                 /* XXX: If the linkEA is overflow, then we need to notify the
1222                  *      namespace LFSCK to skip "nlink" attribute verification
1223                  *      on this object to avoid the "nlink" to be shrinked by
1224                  *      wrong. It may be not good an interaction with LFSCK
1225                  *      like this. We will consider to replace it with other
1226                  *      mechanism in future. LU-5802. */
1227                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
1228                                LFSCK_TYPE_NAMESPACE);
1229
1230                 sub_th = thandle_get_sub_by_dt(env, handle,
1231                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1232                 rc = lfsck_in_notify(env,
1233                                      mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1234                                      lr, sub_th);
1235         }
1236
1237         return rc;
1238 }
1239
1240 static inline int mdd_declare_links_del(const struct lu_env *env,
1241                                         struct mdd_object *c,
1242                                         struct thandle *handle)
1243 {
1244         int rc = 0;
1245
1246         /* For directory, the linkEA will be removed together
1247          * with the object. */
1248         if (!S_ISDIR(mdd_object_type(c)))
1249                 rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
1250
1251         return rc;
1252 }
1253
1254 static int mdd_declare_link(const struct lu_env *env,
1255                             struct mdd_device *mdd,
1256                             struct mdd_object *p,
1257                             struct mdd_object *c,
1258                             const struct lu_name *name,
1259                             struct thandle *handle,
1260                             struct lu_attr *la,
1261                             struct linkea_data *data)
1262 {
1263         int rc;
1264
1265         rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
1266                                       name->ln_name, handle);
1267         if (rc != 0)
1268                 return rc;
1269
1270         rc = mdo_declare_ref_add(env, c, handle);
1271         if (rc != 0)
1272                 return rc;
1273
1274         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1275                 rc = mdo_declare_ref_add(env, c, handle);
1276                 if (rc != 0)
1277                         return rc;
1278         }
1279
1280         la->la_valid = LA_CTIME | LA_MTIME;
1281         rc = mdo_declare_attr_set(env, p, la, handle);
1282         if (rc != 0)
1283                 return rc;
1284
1285         la->la_valid = LA_CTIME;
1286         rc = mdo_declare_attr_set(env, c, la, handle);
1287         if (rc != 0)
1288                 return rc;
1289
1290         rc = mdd_declare_links_add(env, c, handle, data,
1291                         S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
1292         if (rc != 0)
1293                 return rc;
1294
1295         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1296
1297         return rc;
1298 }
1299
1300 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1301                     struct md_object *src_obj, const struct lu_name *lname,
1302                     struct md_attr *ma)
1303 {
1304         const char *name = lname->ln_name;
1305         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1306         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1307         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1308         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1309         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1310         struct mdd_device *mdd = mdo2mdd(src_obj);
1311         struct thandle *handle;
1312         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1313         int rc;
1314         ENTRY;
1315
1316         rc = mdd_la_get(env, mdd_sobj, cattr);
1317         if (rc != 0)
1318                 RETURN(rc);
1319
1320         rc = mdd_la_get(env, mdd_tobj, tattr);
1321         if (rc != 0)
1322                 RETURN(rc);
1323
1324         handle = mdd_trans_create(env, mdd);
1325         if (IS_ERR(handle))
1326                 GOTO(out_pending, rc = PTR_ERR(handle));
1327
1328         memset(ldata, 0, sizeof(*ldata));
1329
1330         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1331         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1332
1333         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1334                               la, ldata);
1335         if (rc)
1336                 GOTO(stop, rc);
1337
1338         rc = mdd_trans_start(env, mdd, handle);
1339         if (rc)
1340                 GOTO(stop, rc);
1341
1342         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1343         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1344                                    cattr);
1345         if (rc)
1346                 GOTO(out_unlock, rc);
1347
1348         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1349                 rc = mdo_ref_add(env, mdd_sobj, handle);
1350                 if (rc != 0)
1351                         GOTO(out_unlock, rc);
1352         }
1353
1354         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1355                 rc = mdo_ref_add(env, mdd_sobj, handle);
1356                 if (rc != 0)
1357                         GOTO(out_unlock, rc);
1358         }
1359
1360         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
1361                 struct lu_fid tfid = *mdo2fid(mdd_sobj);
1362
1363                 tfid.f_oid++;
1364                 rc = __mdd_index_insert_only(env, mdd_tobj, &tfid,
1365                                              mdd_object_type(mdd_sobj),
1366                                              name, handle);
1367         } else {
1368                 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1369                                              mdd_object_type(mdd_sobj),
1370                                              name, handle);
1371         }
1372
1373         if (rc != 0) {
1374                 mdo_ref_del(env, mdd_sobj, handle);
1375                 GOTO(out_unlock, rc);
1376         }
1377
1378         la->la_valid = LA_CTIME | LA_MTIME;
1379         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1380         if (rc)
1381                 GOTO(out_unlock, rc);
1382
1383         la->la_valid = LA_CTIME;
1384         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1385         if (rc == 0) {
1386                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1387                                         mdo2fid(mdd_tobj), lname, 0, 0,
1388                                         ldata);
1389                 if (rc == 0)
1390                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1391                                       lname, handle, ldata, 0);
1392                 /* The failure of links_add should not cause the link
1393                  * failure, reset rc here */
1394                 rc = 0;
1395         }
1396         EXIT;
1397 out_unlock:
1398         mdd_write_unlock(env, mdd_sobj);
1399         if (rc == 0)
1400                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1401                                             mdo2fid(mdd_tobj), NULL, NULL,
1402                                             lname, NULL, handle);
1403 stop:
1404         mdd_trans_stop(env, mdd, rc, handle);
1405
1406         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1407                 /* if we vmalloced a large buffer drop it */
1408                 lu_buf_free(ldata->ld_buf);
1409 out_pending:
1410         return rc;
1411 }
1412
1413 static int mdd_mark_dead_object(const struct lu_env *env,
1414                                 struct mdd_object *obj, struct thandle *handle,
1415                                 bool declare)
1416 {
1417         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1418         int rc;
1419
1420         if (!declare)
1421                 obj->mod_flags |= DEAD_OBJ;
1422
1423         if (!S_ISDIR(mdd_object_type(obj)))
1424                 return 0;
1425
1426         attr->la_valid = LA_FLAGS;
1427         attr->la_flags = LUSTRE_SLAVE_DEAD_FL;
1428
1429         if (declare)
1430                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1431         else
1432                 rc = mdo_attr_set(env, obj, attr, handle);
1433
1434         return rc;
1435 }
1436
1437 static int mdd_declare_finish_unlink(const struct lu_env *env,
1438                                      struct mdd_object *obj,
1439                                      struct thandle *handle)
1440 {
1441         int     rc;
1442
1443         rc = mdd_mark_dead_object(env, obj, handle, true);
1444         if (rc != 0)
1445                 return rc;
1446
1447         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1448         if (rc != 0)
1449                 return rc;
1450
1451         rc = mdo_declare_destroy(env, obj, handle);
1452         if (rc != 0)
1453                 return rc;
1454
1455         return mdd_declare_links_del(env, obj, handle);
1456 }
1457
1458 /* caller should take a lock before calling */
1459 int mdd_finish_unlink(const struct lu_env *env,
1460                       struct mdd_object *obj, struct md_attr *ma,
1461                       const struct mdd_object *pobj,
1462                       const struct lu_name *lname,
1463                       struct thandle *th)
1464 {
1465         int rc = 0;
1466         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1467         ENTRY;
1468
1469         LASSERT(mdd_write_locked(env, obj) != 0);
1470
1471         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1472                 rc = mdd_mark_dead_object(env, obj, th, false);
1473                 if (rc != 0)
1474                         RETURN(rc);
1475
1476                 /* add new orphan and the object
1477                  * will be deleted during mdd_close() */
1478                 if (obj->mod_count) {
1479                         rc = __mdd_orphan_add(env, obj, th);
1480                         if (rc == 0)
1481                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1482                                         "orphan list, open count = %d\n",
1483                                         PFID(mdd_object_fid(obj)),
1484                                         obj->mod_count);
1485                         else
1486                                 CERROR("Object "DFID" fail to be an orphan, "
1487                                        "open count = %d, maybe cause failed "
1488                                        "open replay\n",
1489                                         PFID(mdd_object_fid(obj)),
1490                                         obj->mod_count);
1491                 } else {
1492                         rc = mdo_destroy(env, obj, th);
1493                 }
1494         } else if (!is_dir) {
1495                 /* old files may not have link ea; ignore errors */
1496                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1497         }
1498
1499         RETURN(rc);
1500 }
1501
1502 /*
1503  * pobj maybe NULL
1504  * has mdd_write_lock on cobj already, but not on pobj yet
1505  */
1506 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1507                             const struct lu_attr *pattr,
1508                             struct mdd_object *cobj,
1509                             const struct lu_attr *cattr)
1510 {
1511         int rc;
1512         ENTRY;
1513
1514         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1515
1516         RETURN(rc);
1517 }
1518
1519 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1520                               struct mdd_object *p, struct mdd_object *c,
1521                               const struct lu_name *name, struct md_attr *ma,
1522                               struct thandle *handle, int no_name, int is_dir)
1523 {
1524         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1525         int              rc;
1526
1527         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1528                 if (likely(no_name == 0)) {
1529                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1530                                                       handle);
1531                         if (rc != 0)
1532                                 return rc;
1533                 }
1534
1535                 if (is_dir != 0) {
1536                         rc = mdo_declare_ref_del(env, p, handle);
1537                         if (rc != 0)
1538                                 return rc;
1539                 }
1540         }
1541
1542         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1543         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1544         la->la_valid = LA_CTIME | LA_MTIME;
1545         rc = mdo_declare_attr_set(env, p, la, handle);
1546         if (rc)
1547                 return rc;
1548
1549         if (c != NULL) {
1550                 rc = mdo_declare_ref_del(env, c, handle);
1551                 if (rc)
1552                         return rc;
1553
1554                 rc = mdo_declare_ref_del(env, c, handle);
1555                 if (rc)
1556                         return rc;
1557
1558                 la->la_valid = LA_CTIME;
1559                 rc = mdo_declare_attr_set(env, c, la, handle);
1560                 if (rc)
1561                         return rc;
1562
1563                 rc = mdd_declare_finish_unlink(env, c, handle);
1564                 if (rc)
1565                         return rc;
1566
1567                 /* FIXME: need changelog for remove entry */
1568                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1569         }
1570
1571         return rc;
1572 }
1573
1574 /*
1575  * test if a file has an HSM archive
1576  * if HSM attributes are not found in ma update them from
1577  * HSM xattr
1578  */
1579 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1580                                    struct mdd_object *obj,
1581                                    struct md_attr *ma)
1582 {
1583         ENTRY;
1584
1585         if (!(ma->ma_valid & MA_HSM)) {
1586                 /* no HSM MD provided, read xattr */
1587                 struct lu_buf   *hsm_buf;
1588                 const size_t     buflen = sizeof(struct hsm_attrs);
1589                 int              rc;
1590
1591                 hsm_buf = mdd_buf_get(env, NULL, 0);
1592                 lu_buf_alloc(hsm_buf, buflen);
1593                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1594                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1595                 lu_buf_free(hsm_buf);
1596                 if (rc < 0)
1597                         RETURN(false);
1598
1599                 ma->ma_valid = MA_HSM;
1600         }
1601         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1602                 RETURN(true);
1603         RETURN(false);
1604 }
1605
1606 /**
1607  * Delete name entry and the object.
1608  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1609  * does not exist for this object, and it could only happen during resending
1610  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1611  * is also needed in this case(needed by changelog), so we have to add another
1612  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1613  * the ENOENT failure should be able to be fixed by redo mechanism.
1614  */
1615 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1616                       struct md_object *cobj, const struct lu_name *lname,
1617                       struct md_attr *ma, int no_name)
1618 {
1619         const char *name = lname->ln_name;
1620         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1621         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1622         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1623         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1624         struct mdd_object *mdd_cobj = NULL;
1625         struct mdd_device *mdd = mdo2mdd(pobj);
1626         struct thandle    *handle;
1627         int rc, is_dir = 0;
1628         ENTRY;
1629
1630         /* cobj == NULL means only delete name entry */
1631         if (likely(cobj != NULL)) {
1632                 mdd_cobj = md2mdd_obj(cobj);
1633                 if (mdd_object_exists(mdd_cobj) == 0)
1634                         RETURN(-ENOENT);
1635         }
1636
1637         rc = mdd_la_get(env, mdd_pobj, pattr);
1638         if (rc)
1639                 RETURN(rc);
1640
1641         if (likely(mdd_cobj != NULL)) {
1642                 /* fetch cattr */
1643                 rc = mdd_la_get(env, mdd_cobj, cattr);
1644                 if (rc)
1645                         RETURN(rc);
1646
1647                 is_dir = S_ISDIR(cattr->la_mode);
1648         }
1649
1650         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1651         if (rc)
1652                 RETURN(rc);
1653
1654         handle = mdd_trans_create(env, mdd);
1655         if (IS_ERR(handle))
1656                 RETURN(PTR_ERR(handle));
1657
1658         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1659                                 lname, ma, handle, no_name, is_dir);
1660         if (rc)
1661                 GOTO(stop, rc);
1662
1663         rc = mdd_trans_start(env, mdd, handle);
1664         if (rc)
1665                 GOTO(stop, rc);
1666
1667         if (likely(mdd_cobj != NULL))
1668                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1669
1670         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1671                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1672                 if (rc)
1673                         GOTO(cleanup, rc);
1674         }
1675
1676         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1677             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1678                 GOTO(cleanup, rc = 0);
1679
1680         if (likely(mdd_cobj != NULL)) {
1681                 rc = mdo_ref_del(env, mdd_cobj, handle);
1682                 if (rc != 0) {
1683                         __mdd_index_insert_only(env, mdd_pobj,
1684                                                 mdo2fid(mdd_cobj),
1685                                                 mdd_object_type(mdd_cobj),
1686                                                 name, handle);
1687                         GOTO(cleanup, rc);
1688                 }
1689
1690                 if (is_dir)
1691                         /* unlink dot */
1692                         mdo_ref_del(env, mdd_cobj, handle);
1693
1694                 /* fetch updated nlink */
1695                 rc = mdd_la_get(env, mdd_cobj, cattr);
1696                 if (rc)
1697                         GOTO(cleanup, rc);
1698         }
1699
1700         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1701         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1702
1703         la->la_valid = LA_CTIME | LA_MTIME;
1704         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1705         if (rc)
1706                 GOTO(cleanup, rc);
1707
1708         /* Enough for only unlink the entry */
1709         if (unlikely(mdd_cobj == NULL))
1710                 GOTO(stop, rc);
1711
1712         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1713                 /* update ctime of an unlinked file only if it is still
1714                  * opened or a link still exists */
1715                 la->la_valid = LA_CTIME;
1716                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1717                 if (rc)
1718                         GOTO(cleanup, rc);
1719         }
1720
1721         /* XXX: this transfer to ma will be removed with LOD/OSP */
1722         ma->ma_attr = *cattr;
1723         ma->ma_valid |= MA_INODE;
1724         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1725
1726         /* fetch updated nlink */
1727         if (rc == 0)
1728                 rc = mdd_la_get(env, mdd_cobj, cattr);
1729
1730         /* if object is removed then we can't get its attrs, use last get */
1731         if (cattr->la_nlink == 0) {
1732                 ma->ma_attr = *cattr;
1733                 ma->ma_valid |= MA_INODE;
1734         }
1735         EXIT;
1736 cleanup:
1737         if (likely(mdd_cobj != NULL))
1738                 mdd_write_unlock(env, mdd_cobj);
1739
1740         if (rc == 0) {
1741                 int cl_flags = 0;
1742
1743                 if (cattr->la_nlink == 0) {
1744                         cl_flags |= CLF_UNLINK_LAST;
1745                         /* search for an existing archive */
1746                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1747                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1748                 }
1749
1750                 rc = mdd_changelog_ns_store(env, mdd,
1751                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1752                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1753                         handle);
1754         }
1755
1756 stop:
1757         mdd_trans_stop(env, mdd, rc, handle);
1758
1759         return rc;
1760 }
1761
1762 /*
1763  * The permission has been checked when obj created, no need check again.
1764  */
1765 static int mdd_cd_sanity_check(const struct lu_env *env,
1766                                struct mdd_object *obj)
1767 {
1768         ENTRY;
1769
1770         /* EEXIST check */
1771         if (!obj || mdd_is_dead_obj(obj))
1772                 RETURN(-ENOENT);
1773
1774         RETURN(0);
1775 }
1776
1777 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1778                            struct md_object *cobj, const struct md_op_spec *spec,
1779                            struct md_attr *ma)
1780 {
1781         struct mdd_device *mdd = mdo2mdd(cobj);
1782         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1783         struct mdd_object *son = md2mdd_obj(cobj);
1784         struct thandle    *handle;
1785         const struct lu_buf *buf;
1786         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1787         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1788         int                rc;
1789         ENTRY;
1790
1791         rc = mdd_cd_sanity_check(env, son);
1792         if (rc)
1793                 RETURN(rc);
1794
1795         if (!md_should_create(spec->sp_cr_flags))
1796                 RETURN(0);
1797
1798         /*
1799          * there are following use cases for this function:
1800          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1801          *    striping can be specified or not
1802          * 2) CMD?
1803          */
1804         rc = mdd_la_get(env, son, attr);
1805         if (rc)
1806                 RETURN(rc);
1807
1808         /* calling ->ah_make_hint() is used to transfer information from parent */
1809         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1810
1811         handle = mdd_trans_create(env, mdd);
1812         if (IS_ERR(handle))
1813                 GOTO(out_free, rc = PTR_ERR(handle));
1814
1815         /*
1816          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1817          * Should this be fixed?
1818          */
1819         CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
1820                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1821                spec->sp_cr_flags, spec->no_create);
1822
1823         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1824                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1825                                         spec->u.sp_ea.eadatalen);
1826         } else {
1827                 buf = &LU_BUF_NULL;
1828         }
1829
1830         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1831                                   XATTR_NAME_LOV, 0, handle);
1832         if (rc)
1833                 GOTO(stop, rc);
1834
1835         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1836         if (rc)
1837                 GOTO(stop, rc);
1838
1839         rc = mdd_trans_start(env, mdd, handle);
1840         if (rc)
1841                 GOTO(stop, rc);
1842
1843         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1844                           0, handle);
1845
1846         if (rc)
1847                 GOTO(stop, rc);
1848
1849         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1850
1851 stop:
1852         mdd_trans_stop(env, mdd, rc, handle);
1853 out_free:
1854         RETURN(rc);
1855 }
1856
1857 static int mdd_declare_object_initialize(const struct lu_env *env,
1858                                          struct mdd_object *parent,
1859                                          struct mdd_object *child,
1860                                          struct lu_attr *attr,
1861                                          struct thandle *handle)
1862 {
1863         int rc;
1864         ENTRY;
1865
1866         /*
1867          * inode mode has been set in creation time, and it's based on umask,
1868          * la_mode and acl, don't set here again! (which will go wrong
1869          * because below function doesn't consider umask).
1870          * I'd suggest set all object attributes in creation time, see above.
1871          */
1872         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1873         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1874         rc = mdo_declare_attr_set(env, child, attr, handle);
1875         attr->la_valid |= LA_MODE | LA_TYPE;
1876         if (rc != 0 || !S_ISDIR(attr->la_mode))
1877                 RETURN(rc);
1878
1879         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1880                                       dot, handle);
1881         if (rc != 0)
1882                 RETURN(rc);
1883
1884         rc = mdo_declare_ref_add(env, child, handle);
1885         if (rc != 0)
1886                 RETURN(rc);
1887
1888         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1889                                       dotdot, handle);
1890
1891         RETURN(rc);
1892 }
1893
1894 static int mdd_object_initialize(const struct lu_env *env,
1895                                  const struct lu_fid *pfid,
1896                                  struct mdd_object *child,
1897                                  struct lu_attr *attr, struct thandle *handle,
1898                                  const struct md_op_spec *spec)
1899 {
1900         int rc = 0;
1901         ENTRY;
1902
1903         if (S_ISDIR(attr->la_mode)) {
1904                 /* Add "." and ".." for newly created dir */
1905                 mdo_ref_add(env, child, handle);
1906                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1907                                              S_IFDIR, dot, handle);
1908                 if (rc == 0)
1909                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1910                                                      dotdot, handle);
1911                 if (rc != 0)
1912                         mdo_ref_del(env, child, handle);
1913         }
1914
1915         RETURN(rc);
1916 }
1917
1918 /**
1919  * This function checks whether it can create a file/dir under the
1920  * directory(@pobj). The directory(@pobj) is not being locked by
1921  * mdd lock.
1922  *
1923  * \param[in] env       execution environment
1924  * \param[in] pobj      the directory to create files
1925  * \param[in] pattr     the attributes of the directory
1926  * \param[in] lname     the name of the created file/dir
1927  * \param[in] cattr     the attributes of the file/dir
1928  * \param[in] spec      create specification
1929  *
1930  * \retval              = 0 it is allowed to create file/dir under
1931  *                      the directory
1932  * \retval              negative error not allowed to create file/dir
1933  *                      under the directory
1934  */
1935 static int mdd_create_sanity_check(const struct lu_env *env,
1936                                    struct md_object *pobj,
1937                                    const struct lu_attr *pattr,
1938                                    const struct lu_name *lname,
1939                                    struct lu_attr *cattr,
1940                                    struct md_op_spec *spec)
1941 {
1942         struct mdd_thread_info *info = mdd_env_info(env);
1943         struct lu_fid     *fid       = &info->mti_fid;
1944         struct mdd_object *obj       = md2mdd_obj(pobj);
1945         struct mdd_device *m         = mdo2mdd(pobj);
1946         bool            check_perm = true;
1947         int rc;
1948         ENTRY;
1949
1950         /* EEXIST check */
1951         if (mdd_is_dead_obj(obj))
1952                 RETURN(-ENOENT);
1953
1954         /*
1955          * In some cases this lookup is not needed - we know before if name
1956          * exists or not because MDT performs lookup for it.
1957          * name length check is done in lookup.
1958          */
1959         if (spec->sp_cr_lookup) {
1960                 /*
1961                  * Check if the name already exist, though it will be checked in
1962                  * _index_insert also, for avoiding rolling back if exists
1963                  * _index_insert.
1964                  */
1965                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1966                                   MAY_WRITE | MAY_EXEC);
1967                 if (rc != -ENOENT)
1968                         RETURN(rc ? : -EEXIST);
1969
1970                 /* Permission is already being checked in mdd_lookup */
1971                 check_perm = false;
1972         }
1973
1974         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1975         if (rc != 0)
1976                 RETURN(rc);
1977
1978         /* sgid check */
1979         if (pattr->la_mode & S_ISGID) {
1980                 cattr->la_gid = pattr->la_gid;
1981                 if (S_ISDIR(cattr->la_mode)) {
1982                         cattr->la_mode |= S_ISGID;
1983                         cattr->la_valid |= LA_MODE;
1984                 }
1985         }
1986
1987         rc = mdd_name_check(m, lname);
1988         if (rc < 0)
1989                 RETURN(rc);
1990
1991         switch (cattr->la_mode & S_IFMT) {
1992         case S_IFLNK: {
1993                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1994
1995                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1996                         RETURN(-ENAMETOOLONG);
1997                 else
1998                         RETURN(0);
1999         }
2000         case S_IFDIR:
2001         case S_IFREG:
2002         case S_IFCHR:
2003         case S_IFBLK:
2004         case S_IFIFO:
2005         case S_IFSOCK:
2006                 rc = 0;
2007                 break;
2008         default:
2009                 rc = -EINVAL;
2010                 break;
2011         }
2012         RETURN(rc);
2013 }
2014
2015 static int mdd_declare_object_create(const struct lu_env *env,
2016                                      struct mdd_device *mdd,
2017                                      struct mdd_object *p, struct mdd_object *c,
2018                                      struct lu_attr *attr,
2019                                      struct thandle *handle,
2020                                      const struct md_op_spec *spec,
2021                                      struct lu_buf *def_acl_buf,
2022                                      struct lu_buf *acl_buf,
2023                                      struct dt_allocation_hint *hint)
2024 {
2025         int rc;
2026
2027         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
2028                                                 hint);
2029         if (rc)
2030                 GOTO(out, rc);
2031
2032 #ifdef CONFIG_FS_POSIX_ACL
2033         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2034                 /* if dir, then can inherit default ACl */
2035                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2036                                            XATTR_NAME_ACL_DEFAULT,
2037                                            0, handle);
2038                 if (rc)
2039                         GOTO(out, rc);
2040         }
2041
2042         if (acl_buf->lb_len > 0) {
2043                 rc = mdo_declare_attr_set(env, c, attr, handle);
2044                 if (rc)
2045                         GOTO(out, rc);
2046
2047                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2048                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2049                 if (rc)
2050                         GOTO(out, rc);
2051         }
2052 #endif
2053         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2054         if (rc)
2055                 GOTO(out, rc);
2056
2057         /* replay case, create LOV EA from client data */
2058         if (spec->no_create ||
2059             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2060                 const struct lu_buf *buf;
2061
2062                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2063                                         spec->u.sp_ea.eadatalen);
2064                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2065                                            handle);
2066                 if (rc)
2067                         GOTO(out, rc);
2068         }
2069
2070         if (S_ISLNK(attr->la_mode)) {
2071                 const char *target_name = spec->u.sp_symname;
2072                 int sym_len = strlen(target_name);
2073                 const struct lu_buf *buf;
2074
2075                 buf = mdd_buf_get_const(env, target_name, sym_len);
2076                 rc = dt_declare_record_write(env, mdd_object_child(c),
2077                                              buf, 0, handle);
2078                 if (rc)
2079                         GOTO(out, rc);
2080         }
2081 out:
2082         return rc;
2083 }
2084
2085 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2086                               struct mdd_object *p, struct mdd_object *c,
2087                               const struct lu_name *name,
2088                               struct lu_attr *attr,
2089                               struct thandle *handle,
2090                               const struct md_op_spec *spec,
2091                               struct linkea_data *ldata,
2092                               struct lu_buf *def_acl_buf,
2093                               struct lu_buf *acl_buf,
2094                               struct dt_allocation_hint *hint)
2095 {
2096         int rc;
2097
2098         rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
2099                                        def_acl_buf, acl_buf, hint);
2100         if (rc)
2101                 GOTO(out, rc);
2102
2103         if (S_ISDIR(attr->la_mode)) {
2104                 rc = mdo_declare_ref_add(env, p, handle);
2105                 if (rc)
2106                         GOTO(out, rc);
2107         }
2108
2109         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2110                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2111                 if (rc)
2112                         GOTO(out, rc);
2113         } else {
2114                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2115
2116                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2117                                               name->ln_name, handle);
2118                 if (rc != 0)
2119                         return rc;
2120
2121                 rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
2122                 if (rc)
2123                         return rc;
2124
2125                 *la = *attr;
2126                 la->la_valid = LA_CTIME | LA_MTIME;
2127                 rc = mdo_declare_attr_set(env, p, la, handle);
2128                 if (rc)
2129                         return rc;
2130
2131                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2132                 if (rc)
2133                         return rc;
2134         }
2135 out:
2136         return rc;
2137 }
2138
2139 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2140                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2141                         struct lu_buf *acl_buf)
2142 {
2143         int     rc;
2144         ENTRY;
2145
2146         if (S_ISLNK(la->la_mode)) {
2147                 acl_buf->lb_len = 0;
2148                 def_acl_buf->lb_len = 0;
2149                 RETURN(0);
2150         }
2151
2152         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2153         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2154                            XATTR_NAME_ACL_DEFAULT);
2155         mdd_read_unlock(env, pobj);
2156         if (rc > 0) {
2157                 /* If there are default ACL, fix mode/ACL by default ACL */
2158                 def_acl_buf->lb_len = rc;
2159                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2160                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2161                 acl_buf->lb_len = rc;
2162                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2163                 if (rc < 0)
2164                         RETURN(rc);
2165         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2166                 /* If there are no default ACL, fix mode by mask */
2167                 struct lu_ucred *uc = lu_ucred(env);
2168
2169                 /* The create triggered by MDT internal events, such as
2170                  * LFSCK reset, will not contain valid "uc". */
2171                 if (unlikely(uc != NULL))
2172                         la->la_mode &= ~uc->uc_umask;
2173                 rc = 0;
2174                 acl_buf->lb_len = 0;
2175                 def_acl_buf->lb_len = 0;
2176         }
2177
2178         RETURN(rc);
2179 }
2180
2181 /**
2182  * Create a metadata object and initialize it, set acl, xattr.
2183  **/
2184 static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
2185                              struct mdd_object *son, struct lu_attr *attr,
2186                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2187                              struct lu_buf *def_acl_buf,
2188                              struct dt_allocation_hint *hint,
2189                              struct thandle *handle)
2190 {
2191         int                     rc;
2192
2193         mdd_write_lock(env, son, MOR_TGT_CHILD);
2194         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
2195                                         hint);
2196         if (rc)
2197                 GOTO(unlock, rc);
2198
2199         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2200          * created in declare phase, they also needs to be added to master
2201          * object as sub-directory entry. So it has to initialize the master
2202          * object, then set dir striped EA.(in mdo_xattr_set) */
2203         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2204                                    spec);
2205         if (rc != 0)
2206                 GOTO(err_destroy, rc);
2207
2208         /*
2209          * in case of replay we just set LOVEA provided by the client
2210          * XXX: I think it would be interesting to try "old" way where
2211          *      MDT calls this xattr_set(LOV) in a different transaction.
2212          *      probably this way we code can be made better.
2213          */
2214
2215         /* During creation, there are only a few cases we need do xattr_set to
2216          * create stripes.
2217          * 1. regular file: see comments above.
2218          * 2. dir: inherit default striping or pool settings from parent.
2219          * 3. create striped directory with provided stripeEA.
2220          * 4. create striped directory because inherit default layout from the
2221          * parent.
2222          */
2223         if (spec->no_create ||
2224             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2225             S_ISDIR(attr->la_mode)) {
2226                 const struct lu_buf *buf;
2227
2228                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2229                                         spec->u.sp_ea.eadatalen);
2230                 rc = mdo_xattr_set(env, son, buf,
2231                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2232                                                             XATTR_NAME_LOV, 0,
2233                                    handle);
2234                 if (rc != 0)
2235                         GOTO(err_destroy, rc);
2236         }
2237
2238 #ifdef CONFIG_FS_POSIX_ACL
2239         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2240             S_ISDIR(attr->la_mode)) {
2241                 /* set default acl */
2242                 rc = mdo_xattr_set(env, son, def_acl_buf,
2243                                    XATTR_NAME_ACL_DEFAULT, 0,
2244                                    handle);
2245                 if (rc)
2246                         GOTO(err_destroy, rc);
2247         }
2248         /* set its own acl */
2249         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2250                 rc = mdo_xattr_set(env, son, acl_buf,
2251                                    XATTR_NAME_ACL_ACCESS,
2252                                    0, handle);
2253                 if (rc)
2254                         GOTO(err_destroy, rc);
2255         }
2256 #endif
2257
2258         if (S_ISLNK(attr->la_mode)) {
2259                 struct lu_ucred  *uc = lu_ucred_assert(env);
2260                 struct dt_object *dt = mdd_object_child(son);
2261                 const char *target_name = spec->u.sp_symname;
2262                 int sym_len = strlen(target_name);
2263                 const struct lu_buf *buf;
2264                 loff_t pos = 0;
2265
2266                 buf = mdd_buf_get_const(env, target_name, sym_len);
2267                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2268                                                 uc->uc_cap &
2269                                                 CFS_CAP_SYS_RESOURCE_MASK);
2270
2271                 if (rc == sym_len)
2272                         rc = 0;
2273                 else
2274                         GOTO(err_initlized, rc = -EFAULT);
2275         }
2276
2277 err_initlized:
2278         if (unlikely(rc != 0)) {
2279                 int rc2;
2280                 if (S_ISDIR(attr->la_mode)) {
2281                         /* Drop the reference, no need to delete "."/"..",
2282                          * because the object to be destroied directly. */
2283                         rc2 = mdo_ref_del(env, son, handle);
2284                         if (rc2 != 0)
2285                                 GOTO(unlock, rc);
2286                 }
2287                 rc2 = mdo_ref_del(env, son, handle);
2288                 if (rc2 != 0)
2289                         GOTO(unlock, rc);
2290 err_destroy:
2291                 mdo_destroy(env, son, handle);
2292         }
2293 unlock:
2294         mdd_write_unlock(env, son);
2295         RETURN(rc);
2296 }
2297
2298 /*
2299  * Create object and insert it into namespace.
2300  */
2301 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2302                       const struct lu_name *lname, struct md_object *child,
2303                       struct md_op_spec *spec, struct md_attr* ma)
2304 {
2305         struct mdd_thread_info  *info = mdd_env_info(env);
2306         struct lu_attr          *la = &info->mti_la_for_fix;
2307         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2308         struct mdd_object       *son = md2mdd_obj(child);
2309         struct mdd_device       *mdd = mdo2mdd(pobj);
2310         struct lu_attr          *attr = &ma->ma_attr;
2311         struct thandle          *handle;
2312         struct lu_attr          *pattr = &info->mti_pattr;
2313         struct lu_buf           acl_buf;
2314         struct lu_buf           def_acl_buf;
2315         struct linkea_data      *ldata = &info->mti_link_data;
2316         const char              *name = lname->ln_name;
2317         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2318         int                      rc;
2319         int                      rc2;
2320         ENTRY;
2321
2322         /*
2323          * Two operations have to be performed:
2324          *
2325          *  - an allocation of a new object (->do_create()), and
2326          *
2327          *  - an insertion into a parent index (->dio_insert()).
2328          *
2329          * Due to locking, operation order is not important, when both are
2330          * successful, *but* error handling cases are quite different:
2331          *
2332          *  - if insertion is done first, and following object creation fails,
2333          *  insertion has to be rolled back, but this operation might fail
2334          *  also leaving us with dangling index entry.
2335          *
2336          *  - if creation is done first, is has to be undone if insertion
2337          *  fails, leaving us with leaked space, which is neither good, nor
2338          *  fatal.
2339          *
2340          * It seems that creation-first is simplest solution, but it is
2341          * sub-optimal in the frequent
2342          *
2343          *         $ mkdir foo
2344          *         $ mkdir foo
2345          *
2346          * case, because second mkdir is bound to create object, only to
2347          * destroy it immediately.
2348          *
2349          * To avoid this follow local file systems that do double lookup:
2350          *
2351          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2352          *
2353          *     1. create            (mdd_object_create_internal())
2354          *
2355          *     2. insert            (__mdd_index_insert(), lookup again)
2356          */
2357
2358         rc = mdd_la_get(env, mdd_pobj, pattr);
2359         if (rc != 0)
2360                 RETURN(rc);
2361
2362         /* Sanity checks before big job. */
2363         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2364         if (rc)
2365                 RETURN(rc);
2366
2367         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2368                 GOTO(out_free, rc = -EINPROGRESS);
2369
2370         handle = mdd_trans_create(env, mdd);
2371         if (IS_ERR(handle))
2372                 GOTO(out_free, rc = PTR_ERR(handle));
2373
2374         acl_buf.lb_buf = info->mti_xattr_buf;
2375         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2376         def_acl_buf.lb_buf = info->mti_key;
2377         def_acl_buf.lb_len = sizeof(info->mti_key);
2378         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2379         if (rc < 0)
2380                 GOTO(out_stop, rc);
2381
2382         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2383
2384         memset(ldata, 0, sizeof(*ldata));
2385         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2386                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2387
2388                 tfid.f_oid--;
2389                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2390                                         &tfid, lname, 1, 0, ldata);
2391         } else {
2392                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2393                                         mdd_object_fid(mdd_pobj),
2394                                         lname, 1, 0, ldata);
2395         }
2396
2397         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2398                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2399                                 hint);
2400         if (rc)
2401                 GOTO(out_stop, rc);
2402
2403         rc = mdd_trans_start(env, mdd, handle);
2404         if (rc)
2405                 GOTO(out_stop, rc);
2406
2407         rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
2408                                &def_acl_buf, hint, handle);
2409         if (rc != 0)
2410                 GOTO(out_stop, rc);
2411
2412         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2413                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2414                 rc = __mdd_orphan_add(env, son, handle);
2415                 GOTO(out_volatile, rc);
2416         } else {
2417                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2418                                         attr->la_mode, name, handle);
2419                 if (rc != 0)
2420                         GOTO(err_created, rc);
2421
2422                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2423                               ldata, 1);
2424
2425                 /* update parent directory mtime/ctime */
2426                 *la = *attr;
2427                 la->la_valid = LA_CTIME | LA_MTIME;
2428                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2429                 if (rc)
2430                         GOTO(err_insert, rc);
2431         }
2432
2433         EXIT;
2434 err_insert:
2435         if (rc != 0) {
2436                 int rc2;
2437
2438                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2439                         rc2 = __mdd_orphan_del(env, son, handle);
2440                 else
2441                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2442                                                  S_ISDIR(attr->la_mode),
2443                                                  handle);
2444                 if (rc2 != 0)
2445                         goto out_stop;
2446
2447 err_created:
2448                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2449                 if (S_ISDIR(attr->la_mode)) {
2450                         /* Drop the reference, no need to delete "."/"..",
2451                          * because the object is to be destroyed directly. */
2452                         rc2 = mdo_ref_del(env, son, handle);
2453                         if (rc2 != 0) {
2454                                 mdd_write_unlock(env, son);
2455                                 goto out_stop;
2456                         }
2457                 }
2458 out_volatile:
2459                 /* For volatile files drop one link immediately, since there is
2460                  * no filename in the namespace, and save any error returned. */
2461                 rc2 = mdo_ref_del(env, son, handle);
2462                 if (rc2 != 0) {
2463                         mdd_write_unlock(env, son);
2464                         if (unlikely(rc == 0))
2465                                 rc = rc2;
2466                         goto out_stop;
2467                 }
2468
2469                 /* Don't destroy the volatile object on success */
2470                 if (likely(rc != 0))
2471                         mdo_destroy(env, son, handle);
2472                 mdd_write_unlock(env, son);
2473         }
2474
2475         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2476             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2477                 rc = mdd_changelog_ns_store(env, mdd,
2478                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2479                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2480                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2481                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2482                                 NULL, handle);
2483 out_stop:
2484         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2485         if (rc == 0)
2486                 rc = rc2;
2487 out_free:
2488         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2489                 /* if we vmalloced a large buffer drop it */
2490                 lu_buf_free(ldata->ld_buf);
2491
2492         /* The child object shouldn't be cached anymore */
2493         if (rc)
2494                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2495                         &child->mo_lu.lo_header->loh_flags);
2496         return rc;
2497 }
2498
2499 /*
2500  * Get locks on parents in proper order
2501  * RETURN: < 0 - error, rename_order if successful
2502  */
2503 enum rename_order {
2504         MDD_RN_SAME,
2505         MDD_RN_SRCTGT,
2506         MDD_RN_TGTSRC
2507 };
2508
2509 static int mdd_rename_order(const struct lu_env *env,
2510                             struct mdd_device *mdd,
2511                             struct mdd_object *src_pobj,
2512                             const struct lu_attr *pattr,
2513                             struct mdd_object *tgt_pobj)
2514 {
2515         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2516         int rc;
2517         ENTRY;
2518
2519         if (src_pobj == tgt_pobj)
2520                 RETURN(MDD_RN_SAME);
2521
2522         /* compared the parent child relationship of src_p&tgt_p */
2523         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2524                 rc = MDD_RN_SRCTGT;
2525         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2526                 rc = MDD_RN_TGTSRC;
2527         } else {
2528                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2529                                    NULL);
2530                 if (rc == -EREMOTE)
2531                         rc = 0;
2532
2533                 if (rc == 1)
2534                         rc = MDD_RN_TGTSRC;
2535                 else if (rc == 0)
2536                         rc = MDD_RN_SRCTGT;
2537         }
2538
2539         RETURN(rc);
2540 }
2541
2542 /* has not mdd_write{read}_lock on any obj yet. */
2543 static int mdd_rename_sanity_check(const struct lu_env *env,
2544                                    struct mdd_object *src_pobj,
2545                                    const struct lu_attr *pattr,
2546                                    struct mdd_object *tgt_pobj,
2547                                    const struct lu_attr *tpattr,
2548                                    struct mdd_object *sobj,
2549                                    const struct lu_attr *cattr,
2550                                    struct mdd_object *tobj,
2551                                    const struct lu_attr *tattr)
2552 {
2553         int rc = 0;
2554         ENTRY;
2555
2556         /* XXX: when get here, sobj must NOT be NULL,
2557          * the other case has been processed in cld_rename
2558          * before mdd_rename and enable MDS_PERM_BYPASS. */
2559         LASSERT(sobj);
2560
2561         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2562         if (rc)
2563                 RETURN(rc);
2564
2565         /* XXX: when get here, "tobj == NULL" means tobj must
2566          * NOT exist (neither on remote MDS, such case has been
2567          * processed in cld_rename before mdd_rename and enable
2568          * MDS_PERM_BYPASS).
2569          * So check may_create, but not check may_unlink. */
2570         if (tobj == NULL)
2571                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2572                                     (src_pobj != tgt_pobj));
2573         else
2574                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2575                                     (src_pobj != tgt_pobj), 1);
2576
2577         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2578                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2579
2580         RETURN(rc);
2581 }
2582
2583 static int mdd_declare_rename(const struct lu_env *env,
2584                               struct mdd_device *mdd,
2585                               struct mdd_object *mdd_spobj,
2586                               struct mdd_object *mdd_tpobj,
2587                               struct mdd_object *mdd_sobj,
2588                               struct mdd_object *mdd_tobj,
2589                               const struct lu_name *tname,
2590                               const struct lu_name *sname,
2591                               struct md_attr *ma,
2592                               struct linkea_data *ldata,
2593                               struct thandle *handle)
2594 {
2595         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2596         int rc;
2597
2598         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2599         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2600
2601         LASSERT(mdd_spobj);
2602         LASSERT(mdd_tpobj);
2603         LASSERT(mdd_sobj);
2604
2605         /* name from source dir */
2606         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2607         if (rc)
2608                 return rc;
2609
2610         /* .. from source child */
2611         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2612                 /* source child can be directory,
2613                  * counted by source dir's nlink */
2614                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2615                 if (rc)
2616                         return rc;
2617                 if (mdd_spobj != mdd_tpobj) {
2618                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2619                                                       handle);
2620                         if (rc != 0)
2621                                 return rc;
2622
2623                         rc = mdo_declare_index_insert(env, mdd_sobj,
2624                                                       mdo2fid(mdd_tpobj),
2625                                                       S_IFDIR, dotdot, handle);
2626                         if (rc != 0)
2627                                 return rc;
2628                 }
2629
2630                 /* new target child can be directory,
2631                  * counted by target dir's nlink */
2632                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2633                 if (rc != 0)
2634                         return rc;
2635         }
2636
2637         la->la_valid = LA_CTIME | LA_MTIME;
2638         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2639         if (rc != 0)
2640                 return rc;
2641
2642         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2643         if (rc != 0)
2644                 return rc;
2645
2646         la->la_valid = LA_CTIME;
2647         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2648         if (rc)
2649                 return rc;
2650
2651         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
2652                 S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
2653         if (rc)
2654                 return rc;
2655
2656         /* new name */
2657         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2658                                       mdd_object_type(mdd_sobj),
2659                                       tname->ln_name, handle);
2660         if (rc != 0)
2661                 return rc;
2662
2663         /* name from target dir (old name), we declare it unconditionally
2664          * as mdd_rename() calls delete unconditionally as well. so just
2665          * to balance declarations vs calls to change ... */
2666         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2667         if (rc)
2668                 return rc;
2669
2670         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2671                 /* delete target child in target parent directory */
2672                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2673                 if (rc)
2674                         return rc;
2675
2676                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2677                         /* target child can be directory,
2678                          * delete "." reference in target child directory */
2679                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2680                         if (rc)
2681                                 return rc;
2682
2683                         /* delete ".." reference in target parent directory */
2684                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2685                         if (rc)
2686                                 return rc;
2687                 }
2688
2689                 la->la_valid = LA_CTIME;
2690                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2691                 if (rc)
2692                         return rc;
2693
2694                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2695                 if (rc)
2696                         return rc;
2697         }
2698
2699         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2700         if (rc)
2701                 return rc;
2702
2703         return rc;
2704 }
2705
2706 /* src object can be remote that is why we use only fid and type of object */
2707 static int mdd_rename(const struct lu_env *env,
2708                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2709                       const struct lu_fid *lf, const struct lu_name *lsname,
2710                       struct md_object *tobj, const struct lu_name *ltname,
2711                       struct md_attr *ma)
2712 {
2713         const char *sname = lsname->ln_name;
2714         const char *tname = ltname->ln_name;
2715         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2716         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2717         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2718         struct mdd_device *mdd = mdo2mdd(src_pobj);
2719         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2720         struct mdd_object *mdd_tobj = NULL;
2721         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2722         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2723         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2724         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2725         struct thandle *handle;
2726         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2727         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2728         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2729         bool is_dir;
2730         bool tobj_ref = 0;
2731         bool tobj_locked = 0;
2732         unsigned cl_flags = 0;
2733         int rc, rc2;
2734         ENTRY;
2735
2736         if (tobj)
2737                 mdd_tobj = md2mdd_obj(tobj);
2738
2739         mdd_sobj = mdd_object_find(env, mdd, lf);
2740
2741         rc = mdd_la_get(env, mdd_sobj, cattr);
2742         if (rc)
2743                 GOTO(out_pending, rc);
2744
2745         rc = mdd_la_get(env, mdd_spobj, pattr);
2746         if (rc)
2747                 GOTO(out_pending, rc);
2748
2749         if (mdd_tobj) {
2750                 rc = mdd_la_get(env, mdd_tobj, tattr);
2751                 if (rc)
2752                         GOTO(out_pending, rc);
2753         }
2754
2755         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2756         if (rc)
2757                 GOTO(out_pending, rc);
2758
2759         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2760                                      mdd_sobj, cattr, mdd_tobj, tattr);
2761         if (rc)
2762                 GOTO(out_pending, rc);
2763
2764         rc = mdd_name_check(mdd, ltname);
2765         if (rc < 0)
2766                 GOTO(out_pending, rc);
2767
2768         handle = mdd_trans_create(env, mdd);
2769         if (IS_ERR(handle))
2770                 GOTO(out_pending, rc = PTR_ERR(handle));
2771
2772         memset(ldata, 0, sizeof(*ldata));
2773         mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdd_object_fid(mdd_tpobj),
2774                            ltname, 1, 0, ldata);
2775         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2776                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2777         if (rc)
2778                 GOTO(stop, rc);
2779
2780         rc = mdd_trans_start(env, mdd, handle);
2781         if (rc)
2782                 GOTO(stop, rc);
2783
2784         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2785         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2786         if (rc < 0)
2787                 GOTO(cleanup_unlocked, rc);
2788
2789         is_dir = S_ISDIR(cattr->la_mode);
2790
2791         /* Remove source name from source directory */
2792         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2793         if (rc)
2794                 GOTO(cleanup, rc);
2795
2796         /* "mv dir1 dir2" needs "dir1/.." link update */
2797         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2798                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2799                 if (rc != 0)
2800                         GOTO(fixup_spobj2, rc);
2801
2802                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2803                                              dotdot, handle);
2804                 if (rc != 0)
2805                         GOTO(fixup_spobj, rc);
2806         }
2807
2808         /* Remove target name from target directory
2809          * Here tobj can be remote one, so we do index_delete unconditionally
2810          * and -ENOENT is allowed.
2811          */
2812         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2813         if (rc != 0) {
2814                 if (mdd_tobj) {
2815                         /* tname might been renamed to something else */
2816                         GOTO(fixup_spobj, rc);
2817                 }
2818                 if (rc != -ENOENT)
2819                         GOTO(fixup_spobj, rc);
2820         }
2821
2822         /* Insert new fid with target name into target dir */
2823         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2824                                 tname, handle);
2825         if (rc != 0)
2826                 GOTO(fixup_tpobj, rc);
2827
2828         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2829         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2830
2831         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2832         if (mdd_sobj) {
2833                 la->la_valid = LA_CTIME;
2834                 rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2835                 if (rc)
2836                         GOTO(fixup_tpobj, rc);
2837         }
2838
2839         /* Remove old target object
2840          * For tobj is remote case cmm layer has processed
2841          * and set tobj to NULL then. So when tobj is NOT NULL,
2842          * it must be local one.
2843          */
2844         if (tobj && mdd_object_exists(mdd_tobj)) {
2845                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2846                 tobj_locked = 1;
2847                 if (mdd_is_dead_obj(mdd_tobj)) {
2848                         /* shld not be dead, something is wrong */
2849                         CERROR("tobj is dead, something is wrong\n");
2850                         rc = -EINVAL;
2851                         goto cleanup;
2852                 }
2853                 mdo_ref_del(env, mdd_tobj, handle);
2854
2855                 /* Remove dot reference. */
2856                 if (S_ISDIR(tattr->la_mode))
2857                         mdo_ref_del(env, mdd_tobj, handle);
2858                 tobj_ref = 1;
2859
2860                 /* fetch updated nlink */
2861                 rc = mdd_la_get(env, mdd_tobj, tattr);
2862                 if (rc != 0) {
2863                         CERROR("%s: Failed to get nlink for tobj "
2864                                 DFID": rc = %d\n",
2865                                 mdd2obd_dev(mdd)->obd_name,
2866                                 PFID(tpobj_fid), rc);
2867                         GOTO(fixup_tpobj, rc);
2868                 }
2869
2870                 la->la_valid = LA_CTIME;
2871                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2872                 if (rc != 0) {
2873                         CERROR("%s: Failed to set ctime for tobj "
2874                                 DFID": rc = %d\n",
2875                                 mdd2obd_dev(mdd)->obd_name,
2876                                 PFID(tpobj_fid), rc);
2877                         GOTO(fixup_tpobj, rc);
2878                 }
2879
2880                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2881                 ma->ma_attr = *tattr;
2882                 ma->ma_valid |= MA_INODE;
2883                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2884                                        handle);
2885                 if (rc != 0) {
2886                         CERROR("%s: Failed to unlink tobj "
2887                                 DFID": rc = %d\n",
2888                                 mdd2obd_dev(mdd)->obd_name,
2889                                 PFID(tpobj_fid), rc);
2890                         GOTO(fixup_tpobj, rc);
2891                 }
2892
2893                 /* fetch updated nlink */
2894                 rc = mdd_la_get(env, mdd_tobj, tattr);
2895                 if (rc != 0) {
2896                         CERROR("%s: Failed to get nlink for tobj "
2897                                 DFID": rc = %d\n",
2898                                 mdd2obd_dev(mdd)->obd_name,
2899                                 PFID(tpobj_fid), rc);
2900                         GOTO(fixup_tpobj, rc);
2901                 }
2902                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2903                 ma->ma_attr = *tattr;
2904                 ma->ma_valid |= MA_INODE;
2905
2906                 if (tattr->la_nlink == 0) {
2907                         cl_flags |= CLF_RENAME_LAST;
2908                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2909                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2910                 }
2911         }
2912
2913         la->la_valid = LA_CTIME | LA_MTIME;
2914         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2915         if (rc)
2916                 GOTO(fixup_tpobj, rc);
2917
2918         if (mdd_spobj != mdd_tpobj) {
2919                 la->la_valid = LA_CTIME | LA_MTIME;
2920                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2921         }
2922
2923         if (rc == 0 && mdd_sobj) {
2924                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2925                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2926                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2927                                       0, 0);
2928                 if (rc == -ENOENT)
2929                         /* Old files might not have EA entry */
2930                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2931                                       lsname, handle, NULL, 0);
2932                 mdd_write_unlock(env, mdd_sobj);
2933                 /* We don't fail the transaction if the link ea can't be
2934                    updated -- fid2path will use alternate lookup method. */
2935                 rc = 0;
2936         }
2937
2938         EXIT;
2939
2940 fixup_tpobj:
2941         if (rc) {
2942                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2943                 if (rc2)
2944                         CWARN("tp obj fix error %d\n",rc2);
2945
2946                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2947                     !mdd_is_dead_obj(mdd_tobj)) {
2948                         if (tobj_ref) {
2949                                 mdo_ref_add(env, mdd_tobj, handle);
2950                                 if (is_dir)
2951                                         mdo_ref_add(env, mdd_tobj, handle);
2952                         }
2953
2954                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2955                                                   mdo2fid(mdd_tobj),
2956                                                   mdd_object_type(mdd_tobj),
2957                                                   tname, handle);
2958                         if (rc2 != 0)
2959                                 CWARN("tp obj fix error: rc = %d\n", rc2);
2960                 }
2961         }
2962
2963 fixup_spobj:
2964         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2965                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2966                 if (rc2)
2967                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2968                                mdd2obd_dev(mdd)->obd_name, rc2);
2969
2970
2971                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
2972                                               dotdot, handle);
2973                 if (rc2 != 0)
2974                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2975                               mdd2obd_dev(mdd)->obd_name, rc2);
2976         }
2977
2978 fixup_spobj2:
2979         if (rc != 0) {
2980                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
2981                                          mdd_object_type(mdd_sobj), sname,
2982                                          handle);
2983                 if (rc2 != 0)
2984                         CWARN("sp obj fix error: rc = %d\n", rc2);
2985         }
2986
2987 cleanup:
2988         if (tobj_locked)
2989                 mdd_write_unlock(env, mdd_tobj);
2990
2991 cleanup_unlocked:
2992         if (rc == 0)
2993                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
2994                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
2995                                             ltname, lsname, handle);
2996
2997 stop:
2998         mdd_trans_stop(env, mdd, rc, handle);
2999
3000 out_pending:
3001         mdd_object_put(env, mdd_sobj);
3002         return rc;
3003 }
3004
3005 /**
3006  * During migration once the parent FID has been changed,
3007  * we need update the parent FID in linkea.
3008  **/
3009 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3010                                             struct mdd_object *parent,
3011                                             struct mdd_object *child,
3012                                             const char *name, int namelen,
3013                                             struct thandle *handle,
3014                                             bool declare)
3015 {
3016         struct mdd_thread_info  *info = mdd_env_info(env);
3017         struct linkea_data      ldata = { NULL };
3018         struct lu_buf           *buf = &info->mti_link_buf;
3019         int                     count;
3020         int                     rc = 0;
3021
3022         ENTRY;
3023
3024         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3025         if (buf->lb_buf == NULL)
3026                 RETURN(-ENOMEM);
3027
3028         ldata.ld_buf = buf;
3029         rc = mdd_links_read(env, child, &ldata);
3030         if (rc != 0) {
3031                 if (rc == -ENOENT || rc == -ENODATA)
3032                         rc = 0;
3033                 RETURN(rc);
3034         }
3035
3036         LASSERT(ldata.ld_leh != NULL);
3037         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3038         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3039                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3040                 struct lu_name lname;
3041                 struct lu_fid  fid;
3042
3043                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3044                                     &lname, &fid);
3045
3046                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3047                     lu_fid_eq(&fid, mdd_object_fid(parent))) {
3048                         ldata.ld_lee = (struct link_ea_entry *)
3049                                        ((char *)ldata.ld_lee +
3050                                         ldata.ld_reclen);
3051                         continue;
3052                 }
3053
3054                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3055                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3056                        lname.ln_namelen, lname.ln_name,
3057                        PFID(mdd_object_fid(parent)));
3058                 /* update to the new parent fid */
3059                 linkea_entry_pack(ldata.ld_lee, &lname,
3060                                   mdd_object_fid(parent));
3061                 if (declare)
3062                         rc = mdd_declare_links_add(env, child, handle, &ldata,
3063                                                    MLAO_IGNORE);
3064                 else
3065                         rc = mdd_links_write(env, child, &ldata, handle);
3066                 break;
3067         }
3068         RETURN(rc);
3069 }
3070
3071 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3072                                            struct mdd_object *parent,
3073                                            struct mdd_object *child,
3074                                            const char *name, int namelen,
3075                                            struct thandle *handle)
3076 {
3077         return mdd_linkea_update_child_internal(env, parent, child, name,
3078                                                 namelen, handle, true);
3079 }
3080
3081 static int mdd_linkea_update_child(const struct lu_env *env,
3082                                    struct mdd_object *parent,
3083                                    struct mdd_object *child,
3084                                    const char *name, int namelen,
3085                                    struct thandle *handle)
3086 {
3087         return mdd_linkea_update_child_internal(env, parent, child, name,
3088                                                 namelen, handle, false);
3089 }
3090
3091 static int mdd_update_linkea_internal(const struct lu_env *env,
3092                                       struct mdd_object *mdd_pobj,
3093                                       struct mdd_object *mdd_sobj,
3094                                       struct mdd_object *mdd_tobj,
3095                                       const struct lu_name *child_name,
3096                                       struct thandle *handle,
3097                                       int declare)
3098 {
3099         struct mdd_thread_info  *info = mdd_env_info(env);
3100         struct linkea_data      *ldata = &info->mti_link_data;
3101         int                     count;
3102         int                     rc = 0;
3103         ENTRY;
3104
3105         rc = mdd_links_read(env, mdd_sobj, ldata);
3106         if (rc != 0) {
3107                 if (rc == -ENOENT || rc == -ENODATA)
3108                         rc = 0;
3109                 RETURN(rc);
3110         }
3111
3112         /* If it is mulitple links file, we need update the name entry for
3113          * all parent */
3114         LASSERT(ldata->ld_leh != NULL);
3115         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3116         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3117                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3118                 struct mdd_object       *pobj;
3119                 struct lu_name          lname;
3120                 struct lu_fid           fid;
3121
3122                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3123                                     &lname, &fid);
3124                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3125                                                          ldata->ld_reclen);
3126                 pobj = mdd_object_find(env, mdd, &fid);
3127                 if (IS_ERR(pobj)) {
3128                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3129                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3130                               PTR_ERR(pobj));
3131                         continue;
3132                 }
3133
3134                 if (!mdd_object_exists(pobj)) {
3135                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3136                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3137                         GOTO(next_put, rc);
3138                 }
3139
3140                 if (pobj == mdd_pobj &&
3141                     lname.ln_namelen == child_name->ln_namelen &&
3142                     strncmp(lname.ln_name, child_name->ln_name,
3143                             lname.ln_namelen) == 0) {
3144                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3145                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3146                               PFID(&fid));
3147                         GOTO(next_put, rc);
3148                 }
3149
3150                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3151                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3152                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3153
3154                 if (declare) {
3155                         /* Remove source name from source directory */
3156                         /* Insert new fid with target name into target dir */
3157                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3158                                                       handle);
3159                         if (rc != 0)
3160                                 GOTO(next_put, rc);
3161
3162                         rc = mdo_declare_index_insert(env, pobj,
3163                                         mdd_object_fid(mdd_tobj),
3164                                         mdd_object_type(mdd_tobj),
3165                                         lname.ln_name, handle);
3166                         if (rc != 0)
3167                                 GOTO(next_put, rc);
3168
3169                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3170                         if (rc)
3171                                 GOTO(next_put, rc);
3172
3173                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3174                         if (rc)
3175                                 GOTO(next_put, rc);
3176                 } else {
3177                         rc = __mdd_index_delete(env, pobj, lname.ln_name,
3178                                                 0, handle);
3179                         if (rc)
3180                                 GOTO(next_put, rc);
3181
3182                         rc = __mdd_index_insert(env, pobj,
3183                                         mdd_object_fid(mdd_tobj),
3184                                         mdd_object_type(mdd_tobj),
3185                                         lname.ln_name, handle);
3186                         if (rc != 0)
3187                                 GOTO(next_put, rc);
3188
3189                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3190                         rc = mdo_ref_add(env, mdd_tobj, handle);
3191                         mdd_write_unlock(env, mdd_tobj);
3192                         if (rc)
3193                                 GOTO(next_put, rc);
3194
3195                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3196                         mdo_ref_del(env, mdd_sobj, handle);
3197                         mdd_write_unlock(env, mdd_sobj);
3198                 }
3199 next_put:
3200                 mdd_object_put(env, pobj);
3201                 if (rc != 0)
3202                         break;
3203         }
3204
3205         RETURN(rc);
3206 }
3207
3208 static int mdd_migrate_xattrs(const struct lu_env *env,
3209                               struct mdd_object *mdd_sobj,
3210                               struct mdd_object *mdd_tobj)
3211 {
3212         struct mdd_thread_info  *info = mdd_env_info(env);
3213         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3214         char                    *xname;
3215         struct thandle          *handle;
3216         struct lu_buf           xbuf;
3217         int                     xlen;
3218         int                     rem;
3219         int                     xsize;
3220         int                     list_xsize;
3221         struct lu_buf           list_xbuf;
3222         int                     rc;
3223
3224         /* retrieve xattr list from the old object */
3225         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3226         if (list_xsize == -ENODATA)
3227                 return 0;
3228
3229         if (list_xsize < 0)
3230                 return list_xsize;
3231
3232         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3233         if (info->mti_big_buf.lb_buf == NULL)
3234                 return -ENOMEM;
3235
3236         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3237         list_xbuf.lb_len = list_xsize;
3238         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3239         if (rc < 0)
3240                 return rc;
3241         rc = 0;
3242         rem = list_xsize;
3243         xname = list_xbuf.lb_buf;
3244         while (rem > 0) {
3245                 xlen = strnlen(xname, rem - 1) + 1;
3246                 if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
3247                     strcmp(XATTR_NAME_LMA, xname) == 0 ||
3248                     strcmp(XATTR_NAME_LMV, xname) == 0)
3249                         goto next;
3250
3251                 /* For directory, if there are default layout, migrate here */
3252                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3253                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3254                         goto next;
3255
3256                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3257                 if (xsize == -ENODATA)
3258                         goto next;
3259                 if (xsize < 0)
3260                         GOTO(out, rc);
3261
3262                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3263                 if (info->mti_link_buf.lb_buf == NULL)
3264                         GOTO(out, rc = -ENOMEM);
3265
3266                 xbuf.lb_len = xsize;
3267                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3268                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3269                 if (rc == -ENODATA)
3270                         goto next;
3271                 if (rc < 0)
3272                         GOTO(out, rc);
3273
3274                 handle = mdd_trans_create(env, mdd);
3275                 if (IS_ERR(handle))
3276                         GOTO(out, rc = PTR_ERR(handle));
3277
3278                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3279                                            handle);
3280                 if (rc != 0)
3281                         GOTO(stop_trans, rc);
3282                 /* Note: this transaction is part of migration, and it is not
3283                  * the last step of migration, so we set th_local = 1 to avoid
3284                  * update last rcvd for this transaction */
3285                 handle->th_local = 1;
3286                 rc = mdd_trans_start(env, mdd, handle);
3287                 if (rc != 0)
3288                         GOTO(stop_trans, rc);
3289
3290                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3291                 if (rc == -EEXIST)
3292                         GOTO(stop_trans, rc = 0);
3293
3294                 if (rc != 0)
3295                         GOTO(stop_trans, rc);
3296 stop_trans:
3297                 mdd_trans_stop(env, mdd, rc, handle);
3298                 if (rc != 0)
3299                         GOTO(out, rc);
3300 next:
3301                 rem -= xlen;
3302                 memmove(xname, xname + xlen, rem);
3303         }
3304 out:
3305         return rc;
3306 }
3307
3308 static int mdd_declare_migrate_create(const struct lu_env *env,
3309                                       struct mdd_object *mdd_pobj,
3310                                       struct mdd_object *mdd_sobj,
3311                                       struct mdd_object *mdd_tobj,
3312                                       struct md_op_spec *spec,
3313                                       struct lu_attr *la,
3314                                       union lmv_mds_md *mgr_ea,
3315                                       struct linkea_data *ldata,
3316                                       struct thandle *handle)
3317 {
3318         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3319         const struct lu_buf     *buf;
3320         int                     rc;
3321         int                     mgr_easize;
3322
3323         rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
3324                                                 handle, spec, NULL);
3325         if (rc != 0)
3326                 return rc;
3327
3328         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3329                                            handle);
3330         if (rc != 0)
3331                 return rc;
3332
3333         if (S_ISLNK(la->la_mode)) {
3334                 const char *target_name = spec->u.sp_symname;
3335                 int sym_len = strlen(target_name);
3336                 const struct lu_buf *buf;
3337
3338                 buf = mdd_buf_get_const(env, target_name, sym_len);
3339                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3340                                              buf, 0, handle);
3341                 if (rc != 0)
3342                         return rc;
3343         } else if (S_ISDIR(la->la_mode)) {
3344                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
3345                                            MLAO_IGNORE);
3346                 if (rc != 0)
3347                         return rc;
3348         }
3349
3350         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3351                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3352                                         spec->u.sp_ea.eadatalen);
3353                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3354                                            0, handle);
3355                 if (rc)
3356                         return rc;
3357         }
3358
3359         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3360         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3361         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3362                                    0, handle);
3363         if (rc)
3364                 return rc;
3365
3366         la_flag->la_valid = LA_FLAGS;
3367         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3368         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3369
3370         return rc;
3371 }
3372
3373 static int mdd_migrate_create(const struct lu_env *env,
3374                               struct mdd_object *mdd_pobj,
3375                               struct mdd_object *mdd_sobj,
3376                               struct mdd_object *mdd_tobj,
3377                               const struct lu_name *lname,
3378                               struct lu_attr *la)
3379 {
3380         struct mdd_thread_info  *info = mdd_env_info(env);
3381         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3382         struct md_op_spec       *spec = &info->mti_spec;
3383         struct lu_buf           lmm_buf = { NULL };
3384         struct lu_buf           link_buf = { NULL };
3385         const struct lu_buf     *buf;
3386         struct thandle          *handle;
3387         struct lmv_mds_md_v1    *mgr_ea;
3388         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3389         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3390         int                     mgr_easize;
3391         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3392         int                     rc;
3393         ENTRY;
3394
3395         /* prepare spec for create */
3396         memset(spec, 0, sizeof(*spec));
3397         spec->sp_cr_lookup = 0;
3398         spec->sp_feat = &dt_directory_features;
3399         if (S_ISLNK(la->la_mode)) {
3400                 buf = lu_buf_check_and_alloc(
3401                                 &mdd_env_info(env)->mti_big_buf,
3402                                 la->la_size + 1);
3403                 link_buf = *buf;
3404                 link_buf.lb_len = la->la_size + 1;
3405                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3406                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3407                 if (rc <= 0) {
3408                         rc = rc != 0 ? rc : -EFAULT;
3409                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3410                                mdd2obd_dev(mdd)->obd_name,
3411                                PFID(mdd_object_fid(mdd_sobj)), rc);
3412                         RETURN(rc);
3413                 }
3414                 spec->u.sp_symname = link_buf.lb_buf;
3415         } else if S_ISREG(la->la_mode) {
3416                 /* retrieve lov of the old object */
3417                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3418                 if (rc != 0 && rc != -ENODATA)
3419                         RETURN(rc);
3420                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3421                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3422                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3423                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3424                 }
3425         } else if (S_ISDIR(la->la_mode)) {
3426                 rc = mdd_links_read(env, mdd_sobj, ldata);
3427                 if (rc < 0 && rc != -ENODATA)
3428                         RETURN(rc);
3429         }
3430
3431         mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
3432         memset(mgr_ea, 0, sizeof(*mgr_ea));
3433         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3434         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3435         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3436         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3437         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3438         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3439
3440         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3441
3442         handle = mdd_trans_create(env, mdd);
3443         if (IS_ERR(handle))
3444                 GOTO(out_free, rc = PTR_ERR(handle));
3445
3446         /* Note: this transaction is part of migration, and it is not
3447          * the last step of migration, so we set th_local = 1 to avoid
3448          * update last rcvd for this transaction */
3449         handle->th_local = 1;
3450         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
3451                                         spec, la,
3452                                         (union lmv_mds_md *)info->mti_xattr_buf,
3453                                         ldata, handle);
3454         if (rc != 0)
3455                 GOTO(stop_trans, rc);
3456
3457         rc = mdd_trans_start(env, mdd, handle);
3458         if (rc != 0)
3459                 GOTO(stop_trans, rc);
3460
3461         /* create the target object */
3462         rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3463                                hint, handle);
3464         if (rc != 0)
3465                 GOTO(stop_trans, rc);
3466
3467         if (S_ISDIR(la->la_mode)) {
3468                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3469                 if (rc != 0)
3470                         GOTO(stop_trans, rc);
3471         }
3472
3473         /* Set MIGRATE EA on the source inode, so once the migration needs
3474          * to be re-done during failover, the re-do process can locate the
3475          * target object which is already being created. */
3476         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3477         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3478         rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
3479         if (rc != 0)
3480                 GOTO(stop_trans, rc);
3481
3482         /* Set immutable flag, so any modification is disabled until
3483          * the migration is done. Once the migration is interrupted,
3484          * if the resume process find the migrating object has both
3485          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3486          * flag and approve the migration */
3487         la_flag->la_valid = LA_FLAGS;
3488         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3489         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3490 stop_trans:
3491         if (handle != NULL)
3492                 mdd_trans_stop(env, mdd, rc, handle);
3493 out_free:
3494         if (lmm_buf.lb_buf != NULL)
3495                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3496         RETURN(rc);
3497 }
3498
3499 static int mdd_migrate_entries(const struct lu_env *env,
3500                                struct mdd_object *mdd_sobj,
3501                                struct mdd_object *mdd_tobj)
3502 {
3503         struct dt_object        *next = mdd_object_child(mdd_sobj);
3504         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3505         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3506         struct thandle          *handle;
3507         struct dt_it            *it;
3508         const struct dt_it_ops  *iops;
3509         int                      rc;
3510         int                      result;
3511         struct lu_dirent        *ent;
3512         ENTRY;
3513
3514         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3515         if (ent == NULL)
3516                 RETURN(-ENOMEM);
3517
3518         if (!dt_try_as_dir(env, next))
3519                 GOTO(out_ent, rc = -ENOTDIR);
3520         /*
3521          * iterate directories
3522          */
3523         iops = &next->do_index_ops->dio_it;
3524         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3525         if (IS_ERR(it))
3526                 GOTO(out_ent, rc = PTR_ERR(it));
3527
3528         rc = iops->load(env, it, 0);
3529         if (rc == 0)
3530                 rc = iops->next(env, it);
3531         else if (rc > 0)
3532                 rc = 0;
3533         /*
3534          * At this point and across for-loop:
3535          *
3536          *  rc == 0 -> ok, proceed.
3537          *  rc >  0 -> end of directory.
3538          *  rc <  0 -> error.
3539          */
3540         do {
3541                 struct mdd_object       *child;
3542                 char                    *name = mdd_env_info(env)->mti_key;
3543                 int                     len;
3544                 int                     recsize;
3545                 int                     is_dir;
3546                 bool                    target_exist = false;
3547
3548                 len = iops->key_size(env, it);
3549                 if (len == 0)
3550                         goto next;
3551
3552                 result = iops->rec(env, it, (struct dt_rec *)ent,
3553                                    LUDA_FID | LUDA_TYPE);
3554                 if (result == -ESTALE)
3555                         goto next;
3556                 if (result != 0) {
3557                         rc = result;
3558                         goto out;
3559                 }
3560
3561                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3562                 recsize = le16_to_cpu(ent->lde_reclen);
3563
3564                 /* Insert new fid with target name into target dir */
3565                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3566                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3567                      ent->lde_name[1] == '.'))
3568                         goto next;
3569
3570                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3571                 if (IS_ERR(child))
3572                         GOTO(out, rc = PTR_ERR(child));
3573
3574                 is_dir = S_ISDIR(mdd_object_type(child));
3575
3576                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3577
3578                 /* Check whether the name has been inserted to the target */
3579                 if (dt_try_as_dir(env, dt_tobj)) {
3580                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3581
3582                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3583                                        (struct dt_key *)name);
3584                         if (unlikely(rc == 0))
3585                                 target_exist = true;
3586                 }
3587
3588                 handle = mdd_trans_create(env, mdd);
3589                 if (IS_ERR(handle))
3590                         GOTO(out, rc = PTR_ERR(handle));
3591
3592                 /* Note: this transaction is part of migration, and it is not
3593                  * the last step of migration, so we set th_local = 1 to avoid
3594                  * updating last rcvd for this transaction */
3595                 handle->th_local = 1;
3596                 if (likely(!target_exist)) {
3597                         rc = mdo_declare_index_insert(env, mdd_tobj,
3598                                                       &ent->lde_fid,
3599                                                       mdd_object_type(child),
3600                                                       name, handle);
3601                         if (rc != 0)
3602                                 GOTO(out_put, rc);
3603
3604                         if (is_dir) {
3605                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3606                                 if (rc != 0)
3607                                         GOTO(out_put, rc);
3608                         }
3609                 }
3610
3611                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3612                 if (rc != 0)
3613                         GOTO(out_put, rc);
3614
3615                 if (is_dir) {
3616                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3617                         if (rc != 0)
3618                                 GOTO(out_put, rc);
3619
3620                         /* Update .. for child */
3621                         rc = mdo_declare_index_delete(env, child, dotdot,
3622                                                       handle);
3623                         if (rc != 0)
3624                                 GOTO(out_put, rc);
3625
3626                         rc = mdo_declare_index_insert(env, child,
3627                                                       mdd_object_fid(mdd_tobj),
3628                                                       S_IFDIR, dotdot, handle);
3629                         if (rc != 0)
3630                                 GOTO(out_put, rc);
3631                 }
3632
3633                 rc = mdd_linkea_declare_update_child(env, mdd_tobj,
3634                                                      child, name,
3635                                                      strlen(name),
3636                                                      handle);
3637                 if (rc != 0)
3638                         GOTO(out_put, rc);
3639
3640                 rc = mdd_trans_start(env, mdd, handle);
3641                 if (rc != 0) {
3642                         CERROR("%s: transaction start failed: rc = %d\n",
3643                                mdd2obd_dev(mdd)->obd_name, rc);
3644                         GOTO(out_put, rc);
3645                 }
3646
3647                 if (likely(!target_exist)) {
3648                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3649                                                 mdd_object_type(child),
3650                                                 name, handle);
3651                         if (rc != 0)
3652                                 GOTO(out_put, rc);
3653                 }
3654
3655                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3656                 if (rc != 0)
3657                         GOTO(out_put, rc);
3658
3659                 if (is_dir) {
3660                         rc = __mdd_index_delete_only(env, child, dotdot,
3661                                                      handle);
3662                         if (rc != 0)
3663                                 GOTO(out_put, rc);
3664
3665                         rc = __mdd_index_insert_only(env, child,
3666                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3667                                          dotdot, handle);
3668                         if (rc != 0)
3669                                 GOTO(out_put, rc);
3670                 }
3671
3672                 rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
3673                                              strlen(name), handle);
3674
3675 out_put:
3676                 mdd_object_put(env, child);
3677                 mdd_trans_stop(env, mdd, rc, handle);
3678                 if (rc != 0)
3679                         GOTO(out, rc);
3680 next:
3681                 result = iops->next(env, it);
3682                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3683                         GOTO(out, rc = -EINTR);
3684
3685                 if (result == -ESTALE)
3686                         goto next;
3687         } while (result == 0);
3688 out:
3689         iops->put(env, it);
3690         iops->fini(env, it);
3691 out_ent:
3692         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3693         RETURN(rc);
3694 }
3695
3696 static int mdd_declare_update_linkea(const struct lu_env *env,
3697                                      struct mdd_object *mdd_pobj,
3698                                      struct mdd_object *mdd_sobj,
3699                                      struct mdd_object *mdd_tobj,
3700                                      struct thandle *handle,
3701                                      const struct lu_name *child_name)
3702 {
3703         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3704                                           child_name, handle, 1);
3705 }
3706
3707 static int mdd_update_linkea(const struct lu_env *env,
3708                              struct mdd_object *mdd_pobj,
3709                              struct mdd_object *mdd_sobj,
3710                              struct mdd_object *mdd_tobj,
3711                              struct thandle *handle,
3712                              const struct lu_name *child_name)
3713 {
3714         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3715                                           child_name, handle, 0);
3716 }
3717
3718 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3719                                            struct mdd_object *mdd_pobj,
3720                                            struct mdd_object *mdd_sobj,
3721                                            struct mdd_object *mdd_tobj,
3722                                            const struct lu_name *lname,
3723                                            struct lu_attr *la,
3724                                            struct lu_attr *parent_la,
3725                                            struct thandle *handle)
3726 {
3727         struct lu_attr  *la_flag = MDD_ENV_VAR(env, tattr);
3728         int             rc;
3729
3730         /* Revert IMMUTABLE flag */
3731         la_flag->la_valid = LA_FLAGS;
3732         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3733         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3734         if (rc != 0)
3735                 return rc;
3736
3737         /* delete entry from source dir */
3738         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3739         if (rc != 0)
3740                 return rc;
3741
3742         rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3743                                        mdd_tobj, handle, lname);
3744         if (rc != 0)
3745                 return rc;
3746
3747         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3748                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3749                                            handle);
3750                 if (rc != 0)
3751                         return rc;
3752         }
3753
3754         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3755                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3756                 if (rc != 0)
3757                         return rc;
3758         }
3759
3760         /* new name */
3761         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3762                                       mdd_object_type(mdd_tobj),
3763                                       lname->ln_name, handle);
3764         if (rc != 0)
3765                 return rc;
3766
3767         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
3768         if (rc != 0)
3769                 return rc;
3770
3771         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3772                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3773                 if (rc != 0)
3774                         return rc;
3775         }
3776
3777         /* delete old object */
3778         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3779         if (rc != 0)
3780                 return rc;
3781
3782         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3783                 /* delete old object */
3784                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3785                 if (rc != 0)
3786                         return rc;
3787                 /* set nlink to 0 */
3788                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3789                 if (rc != 0)
3790                         return rc;
3791         }
3792
3793         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3794         if (rc)
3795                 return rc;
3796
3797         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3798
3799         return rc;
3800 }
3801
3802 static int mdd_migrate_update_name(const struct lu_env *env,
3803                                    struct mdd_object *mdd_pobj,
3804                                    struct mdd_object *mdd_sobj,
3805                                    struct mdd_object *mdd_tobj,
3806                                    const struct lu_name *lname,
3807                                    struct md_attr *ma)
3808 {
3809         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3810         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3811         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3812         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3813         struct thandle          *handle;
3814         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3815         const char              *name = lname->ln_name;
3816         int                     rc;
3817         ENTRY;
3818
3819         /* update time for parent */
3820         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3821         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3822         p_la->la_valid = LA_CTIME;
3823
3824         rc = mdd_la_get(env, mdd_sobj, so_attr);
3825         if (rc != 0)
3826                 RETURN(rc);
3827
3828         handle = mdd_trans_create(env, mdd);
3829         if (IS_ERR(handle))
3830                 RETURN(PTR_ERR(handle));
3831
3832         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3833                                              lname, so_attr, p_la, handle);
3834         if (rc != 0) {
3835                 /* If the migration can not be fit in one transaction, just
3836                  * leave it in the original MDT */
3837                 if (rc == -E2BIG)
3838                         GOTO(stop_trans, rc = 0);
3839                 else
3840                         GOTO(stop_trans, rc);
3841         }
3842
3843         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3844                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3845                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3846                PFID(mdd_object_fid(mdd_tobj)));
3847
3848         rc = mdd_trans_start(env, mdd, handle);
3849         if (rc != 0)
3850                 GOTO(stop_trans, rc);
3851
3852         /* Revert IMMUTABLE flag */
3853         la_flag->la_valid = LA_FLAGS;
3854         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
3855         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3856         if (rc != 0)
3857                 GOTO(stop_trans, rc);
3858
3859         /* Remove source name from source directory */
3860         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
3861         if (rc != 0)
3862                 GOTO(stop_trans, rc);
3863
3864         rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
3865                                handle, lname);
3866         if (rc != 0)
3867                 GOTO(stop_trans, rc);
3868
3869         if (S_ISREG(so_attr->la_mode)) {
3870                 if (so_attr->la_nlink == 1) {
3871                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3872                                            handle);
3873                         if (rc != 0 && rc != -ENODATA)
3874                                 GOTO(stop_trans, rc);
3875                 }
3876         }
3877
3878         /* Insert new fid with target name into target dir */
3879         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
3880                                 mdd_object_type(mdd_tobj), name, handle);
3881         if (rc != 0)
3882                 GOTO(stop_trans, rc);
3883
3884         rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
3885                            NULL, 1);
3886         if (rc != 0)
3887                 GOTO(stop_trans, rc);
3888
3889         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3890         mdo_ref_del(env, mdd_sobj, handle);
3891         if (is_dir)
3892                 mdo_ref_del(env, mdd_sobj, handle);
3893
3894         /* Get the attr again after ref_del */
3895         rc = mdd_la_get(env, mdd_sobj, so_attr);
3896         if (rc != 0)
3897                 GOTO(out_unlock, rc);
3898
3899         ma->ma_attr = *so_attr;
3900         ma->ma_valid |= MA_INODE;
3901         rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
3902         if (rc != 0)
3903                 GOTO(out_unlock, rc);
3904
3905         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
3906         if (rc != 0)
3907                 GOTO(out_unlock, rc);
3908
3909 out_unlock:
3910         mdd_write_unlock(env, mdd_sobj);
3911
3912 stop_trans:
3913         mdd_trans_stop(env, mdd, rc, handle);
3914
3915         RETURN(rc);
3916 }
3917
3918 /**
3919  * Check whether we should migrate the file/dir
3920  * return val
3921  *      < 0  permission check failed or other error.
3922  *      = 0  the file can be migrated.
3923  *      > 0  the file does not need to be migrated, mostly
3924  *           for multiple link file
3925  **/
3926 static int mdd_migrate_sanity_check(const struct lu_env *env,
3927                                     struct mdd_object *pobj,
3928                                     const struct lu_attr *pattr,
3929                                     struct mdd_object *sobj,
3930                                     struct lu_attr *sattr)
3931 {
3932         struct mdd_thread_info  *info = mdd_env_info(env);
3933         struct linkea_data      *ldata = &info->mti_link_data;
3934         int                     mgr_easize;
3935         struct lu_buf           *mgr_buf;
3936         int                     count;
3937         int                     rc;
3938
3939         ENTRY;
3940
3941         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3942         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
3943         if (mgr_buf->lb_buf == NULL)
3944                 RETURN(-ENOMEM);
3945
3946         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
3947         if (rc > 0) {
3948                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
3949
3950                 /* If the object has migrateEA, it means IMMUTE flag
3951                  * is being set by previous migration process, so it
3952                  * needs to override the IMMUTE flag, otherwise the
3953                  * following sanity check will fail */
3954                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
3955                                                 LMV_HASH_FLAG_MIGRATION) {
3956                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3957
3958                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
3959                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
3960                                mdd2obd_dev(mdd)->obd_name,
3961                                PFID(mdd_object_fid(sobj)));
3962                 }
3963         }
3964
3965         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
3966                                      sobj, sattr, NULL, NULL);
3967         if (rc != 0)
3968                 RETURN(rc);
3969
3970         /* Then it will check if the file should be migrated. If the file
3971          * has mulitple links, we only need migrate the file if all of its
3972          * entries has been migrated to the remote MDT */
3973         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
3974                 RETURN(0);
3975
3976         rc = mdd_links_read(env, sobj, ldata);
3977         if (rc != 0) {
3978                 /* For multiple links files, if there are no linkEA data at all,
3979                  * means the file might be created before linkEA is enabled, and
3980                  * all all of its links should not be migrated yet, otherwise
3981                  * it should have some linkEA there */
3982                 if (rc == -ENOENT || rc == -ENODATA)
3983                         RETURN(1);
3984                 RETURN(rc);
3985         }
3986
3987         /* If it is mulitple links file, we need update the name entry for
3988          * all parent */
3989         LASSERT(ldata->ld_leh != NULL);
3990         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3991         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3992                 struct mdd_device       *mdd = mdo2mdd(&sobj->mod_obj);
3993                 struct mdd_object       *lpobj;
3994                 struct lu_name          lname;
3995                 struct lu_fid           fid;
3996
3997                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3998                                     &lname, &fid);
3999                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4000                                                          ldata->ld_reclen);
4001                 lpobj = mdd_object_find(env, mdd, &fid);
4002                 if (IS_ERR(lpobj)) {
4003                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
4004                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
4005                               PTR_ERR(lpobj));
4006                         continue;
4007                 }
4008
4009                 if (!mdd_object_exists(lpobj) || mdd_object_remote(lpobj)) {
4010                         CDEBUG(D_INFO, DFID"%.*s: is on remote MDT.\n",
4011                                PFID(&fid), lname.ln_namelen, lname.ln_name);
4012                         mdd_object_put(env, lpobj);
4013                         continue;
4014                 }
4015
4016                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4017                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4018                        lname.ln_name, PFID(&fid));
4019                 mdd_object_put(env, lpobj);
4020                 rc = 1;
4021                 break;
4022         }
4023
4024         RETURN(rc);
4025 }
4026
4027 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4028                        struct md_object *sobj, const struct lu_name *lname,
4029                        struct md_object *tobj, struct md_attr *ma)
4030 {
4031         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4032         struct mdd_device       *mdd = mdo2mdd(pobj);
4033         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4034         struct mdd_object       *mdd_tobj = NULL;
4035         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4036         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4037         int                     rc;
4038
4039         ENTRY;
4040         /* If the file will being migrated, it will check whether
4041          * the file is being opened by someone else right now */
4042         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4043         if (mdd_sobj->mod_count >= 1) {
4044                 CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
4045                        mdd2obd_dev(mdd)->obd_name,
4046                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4047                        mdd_sobj->mod_count, -EBUSY);
4048                 mdd_read_unlock(env, mdd_sobj);
4049                 GOTO(put, rc = -EBUSY);
4050         }
4051         mdd_read_unlock(env, mdd_sobj);
4052
4053         rc = mdd_la_get(env, mdd_sobj, so_attr);
4054         if (rc != 0)
4055                 GOTO(put, rc);
4056
4057         rc = mdd_la_get(env, mdd_pobj, pattr);
4058         if (rc != 0)
4059                 GOTO(put, rc);
4060
4061         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4062         if (rc != 0) {
4063                 if (rc > 0)
4064                         rc = 0;
4065                 GOTO(put, rc);
4066         }
4067
4068         /* Sigh, it is impossible to finish all of migration in a single
4069          * transaction, for example migrating big directory entries to the
4070          * new MDT, it needs insert all of name entries of children in the
4071          * new directory.
4072          *
4073          * So migration will be done in multiple steps and transactions.
4074          *
4075          * 1. create an orphan object on the remote MDT in one transaction.
4076          * 2. migrate extend attributes to the new target file/directory.
4077          * 3. For directory, migrate the entries to the new MDT and update
4078          * linkEA of each children. Because we can not migrate all entries
4079          * in a single transaction, so the migrating directory will become
4080          * a striped directory during migration, so once the process is
4081          * interrupted, the directory is still accessible. (During lookup,
4082          * client will locate the name by searching both original and target
4083          * object).
4084          * 4. Finally, update the name/FID to point to the new file/directory
4085          * in a separate transaction.
4086          */
4087
4088         /* step 1: Check whether the orphan object has been created, and create
4089          * orphan object on the remote MDT if needed */
4090         mdd_tobj = md2mdd_obj(tobj);
4091         if (!mdd_object_exists(mdd_tobj)) {
4092                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4093                                         lname, so_attr);
4094                 if (rc != 0)
4095                         GOTO(put, rc);
4096         }
4097
4098         /* step 2: migrate xattr */
4099         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4100         if (rc != 0)
4101                 GOTO(put, rc);
4102
4103         /* step 3: migrate name entries to the orphan object */
4104         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4105                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4106                 if (rc != 0)
4107                         GOTO(put, rc);
4108                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4109                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4110                         GOTO(put, rc = 0);
4111         } else {
4112                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4113         }
4114
4115         /* step 4: update name entry to the new object */
4116         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4117                                      ma);
4118         if (rc != 0)
4119                 GOTO(put, rc);
4120 put:
4121         RETURN(rc);
4122 }
4123
4124 const struct md_dir_operations mdd_dir_ops = {
4125         .mdo_is_subdir     = mdd_is_subdir,
4126         .mdo_lookup        = mdd_lookup,
4127         .mdo_create        = mdd_create,
4128         .mdo_rename        = mdd_rename,
4129         .mdo_link          = mdd_link,
4130         .mdo_unlink        = mdd_unlink,
4131         .mdo_create_data   = mdd_create_data,
4132         .mdo_migrate       = mdd_migrate,
4133 };