Whamcloud - gitweb
51794b1bea9b530adfa969d01d2e4a7543b3ece8
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static inline int
61 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
62 {
63         if (!lu_name_is_valid(ln))
64                 return -EINVAL;
65         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
66                 return -ENAMETOOLONG;
67         else
68                 return 0;
69 }
70
71 /* Get FID from name and parent */
72 static int
73 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
74              const struct lu_attr *pattr, const struct lu_name *lname,
75              struct lu_fid* fid, int mask)
76 {
77         const char *name                = lname->ln_name;
78         const struct dt_key *key        = (const struct dt_key *)name;
79         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
80         struct mdd_device *m            = mdo2mdd(pobj);
81         struct dt_object *dir           = mdd_object_child(mdd_obj);
82         int rc;
83         ENTRY;
84
85         if (unlikely(mdd_is_dead_obj(mdd_obj)))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         } else if (!mdd_object_exists(mdd_obj)) {
92                 RETURN(-ESTALE);
93         }
94
95         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
96                                             MOR_TGT_PARENT);
97         if (rc)
98                 RETURN(rc);
99
100         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
101                    dt_try_as_dir(env, dir)))
102                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key,
103                                mdd_object_capa(env, mdd_obj));
104         else
105                 rc = -ENOTDIR;
106
107         RETURN(rc);
108 }
109
110 int mdd_lookup(const struct lu_env *env,
111                struct md_object *pobj, const struct lu_name *lname,
112                struct lu_fid *fid, struct md_op_spec *spec)
113 {
114         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
115         int rc;
116         ENTRY;
117
118         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr, BYPASS_CAPA);
119         if (rc != 0)
120                 RETURN(rc);
121
122         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
123                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
124         RETURN(rc);
125 }
126
127 static inline int mdd_parent_fid(const struct lu_env *env,
128                                  struct mdd_object *obj,
129                                  const struct lu_attr *attr,
130                                  struct lu_fid *fid)
131 {
132         return __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
133 }
134
135 /*
136  * For root fid use special function, which does not compare version component
137  * of fid. Version component is different for root fids on all MDTs.
138  */
139 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
140 {
141         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
142                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
143 }
144
145 /*
146  * return 1: if lf is the fid of the ancestor of p1;
147  * return 0: if not;
148  *
149  * return -EREMOTE: if remote object is found, in this
150  * case fid of remote object is saved to @pf;
151  *
152  * otherwise: values < 0, errors.
153  */
154 static int mdd_is_parent(const struct lu_env *env,
155                         struct mdd_device *mdd,
156                         struct mdd_object *p1,
157                         const struct lu_attr *attr,
158                         const struct lu_fid *lf,
159                         struct lu_fid *pf)
160 {
161         struct mdd_object *parent = NULL;
162         struct lu_fid *pfid;
163         int rc;
164         ENTRY;
165
166         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
167         pfid = &mdd_env_info(env)->mti_fid;
168
169         /* Check for root first. */
170         if (mdd_is_root(mdd, mdo2fid(p1)))
171                 RETURN(0);
172
173         for(;;) {
174                 /* this is done recursively, bypass capa for each obj */
175                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
176                 rc = mdd_parent_fid(env, p1, attr, pfid);
177                 if (rc)
178                         GOTO(out, rc);
179                 if (mdd_is_root(mdd, pfid))
180                         GOTO(out, rc = 0);
181                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
182                         GOTO(out, rc = 0);
183                 if (lu_fid_eq(pfid, lf))
184                         GOTO(out, rc = 1);
185                 if (parent)
186                         mdd_object_put(env, parent);
187
188                 parent = mdd_object_find(env, mdd, pfid);
189                 if (IS_ERR(parent)) {
190                         GOTO(out, rc = PTR_ERR(parent));
191                 } else if (mdd_object_remote(parent)) {
192                         /*FIXME: Because of the restriction of rename in Phase I.
193                          * If the parent is remote, we just assumed lf is not the
194                          * parent of P1 for now */
195                         GOTO(out, rc = 0);
196                 }
197                 p1 = parent;
198         }
199         EXIT;
200 out:
201         if (parent && !IS_ERR(parent))
202                 mdd_object_put(env, parent);
203         return rc;
204 }
205
206 /*
207  * No permission check is needed.
208  *
209  * returns 1: if fid is ancestor of @mo;
210  * returns 0: if fid is not a ancestor of @mo;
211  *
212  * returns EREMOTE if remote object is found, fid of remote object is saved to
213  * @fid;
214  *
215  * returns < 0: if error
216  */
217 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
218                   const struct lu_fid *fid, struct lu_fid *sfid)
219 {
220         struct mdd_device *mdd = mdo2mdd(mo);
221         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
222         int rc;
223         ENTRY;
224
225         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
226                 RETURN(0);
227
228         rc = mdd_la_get(env, md2mdd_obj(mo), attr, BYPASS_CAPA);
229         if (rc != 0)
230                 RETURN(rc);
231
232         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
233         if (rc == 0) {
234                 /* found root */
235                 fid_zero(sfid);
236         } else if (rc == 1) {
237                 /* found @fid is parent */
238                 *sfid = *fid;
239                 rc = 0;
240         }
241         RETURN(rc);
242 }
243
244 /*
245  * Check that @dir contains no entries except (possibly) dot and dotdot.
246  *
247  * Returns:
248  *
249  *             0        empty
250  *      -ENOTDIR        not a directory object
251  *    -ENOTEMPTY        not empty
252  *           -ve        other error
253  *
254  */
255 static int mdd_dir_is_empty(const struct lu_env *env,
256                             struct mdd_object *dir)
257 {
258         struct dt_it     *it;
259         struct dt_object *obj;
260         const struct dt_it_ops *iops;
261         int result;
262         ENTRY;
263
264         obj = mdd_object_child(dir);
265         if (!dt_try_as_dir(env, obj))
266                 RETURN(-ENOTDIR);
267
268         iops = &obj->do_index_ops->dio_it;
269         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
270         if (!IS_ERR(it)) {
271                 result = iops->get(env, it, (const struct dt_key *)"");
272                 if (result > 0) {
273                         int i;
274                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
275                                 result = iops->next(env, it);
276                         if (result == 0)
277                                 result = -ENOTEMPTY;
278                         else if (result == 1)
279                                 result = 0;
280                 } else if (result == 0)
281                         /*
282                          * Huh? Index contains no zero key?
283                          */
284                         result = -EIO;
285
286                 iops->put(env, it);
287                 iops->fini(env, it);
288         } else
289                 result = PTR_ERR(it);
290         RETURN(result);
291 }
292
293 /**
294  * Determine if the target object can be hard linked, and right now it only
295  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
296  * directory nlink count might exceed the maximum link count(see
297  * osd_object_ref_add), so it only check nlink for non-directories.
298  *
299  * \param[in] env       thread environment
300  * \param[in] obj       object being linked to
301  * \param[in] la        attributes of \a obj
302  *
303  * \retval              0 if \a obj can be hard linked
304  * \retval              negative error if \a obj is a directory or has too
305  *                      many links
306  */
307 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
308                           const struct lu_attr *la)
309 {
310         struct mdd_device *m = mdd_obj2mdd_dev(obj);
311         ENTRY;
312
313         LASSERT(la != NULL);
314
315         /* Subdir count limitation can be broken through
316          * (see osd_object_ref_add), so only check non-directory here. */
317         if (!S_ISDIR(la->la_mode) &&
318             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
319                 RETURN(-EMLINK);
320
321         RETURN(0);
322 }
323
324 /**
325  * Check whether it may create the cobj under the pobj.
326  *
327  * \param[in] env       execution environment
328  * \param[in] pobj      the parent directory
329  * \param[in] pattr     the attribute of the parent directory
330  * \param[in] cobj      the child to be created
331  * \param[in] check_perm        if check WRITE|EXEC permission for parent
332  *
333  * \retval              = 0 create the child under this dir is allowed
334  * \retval              negative errno create the child under this dir is
335  *                      not allowed
336  */
337 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
338                    const struct lu_attr *pattr, struct mdd_object *cobj,
339                    bool check_perm)
340 {
341         struct mdd_thread_info *info = mdd_env_info(env);
342         struct lu_buf   *xbuf;
343         int rc = 0;
344         ENTRY;
345
346         if (cobj && mdd_object_exists(cobj))
347                 RETURN(-EEXIST);
348
349         if (mdd_is_dead_obj(pobj))
350                 RETURN(-ENOENT);
351
352         /* If the parent is a sub-stripe, check whether it is dead */
353         xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key));
354         rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV,
355                            mdd_object_capa(env, pobj));
356         if (unlikely(rc > 0)) {
357                 struct lmv_mds_md_v1  *lmv1 = xbuf->lb_buf;
358
359                 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE &&
360                     le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD)
361                         RETURN(-ESTALE);
362         }
363         rc = 0;
364
365         if (check_perm)
366                 rc = mdd_permission_internal_locked(env, pobj, pattr,
367                                                     MAY_WRITE | MAY_EXEC,
368                                                     MOR_TGT_PARENT);
369         RETURN(rc);
370 }
371
372 /*
373  * Check whether can unlink from the pobj in the case of "cobj == NULL".
374  */
375 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
376                    const struct lu_attr *pattr, const struct lu_attr *attr)
377 {
378         int rc;
379         ENTRY;
380
381         if (mdd_is_dead_obj(pobj))
382                 RETURN(-ENOENT);
383
384         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
385                 RETURN(-EPERM);
386
387         rc = mdd_permission_internal_locked(env, pobj, pattr,
388                                             MAY_WRITE | MAY_EXEC,
389                                             MOR_TGT_PARENT);
390         if (rc != 0)
391                 RETURN(rc);
392
393         if (pattr->la_flags & LUSTRE_APPEND_FL)
394                 RETURN(-EPERM);
395
396         RETURN(rc);
397 }
398
399 /*
400  * pobj == NULL is remote ops case, under such case, pobj's
401  * VTX feature has been checked already, no need check again.
402  */
403 static inline int mdd_is_sticky(const struct lu_env *env,
404                                 struct mdd_object *pobj,
405                                 const struct lu_attr *pattr,
406                                 struct mdd_object *cobj,
407                                 const struct lu_attr *cattr)
408 {
409         struct lu_ucred *uc = lu_ucred_assert(env);
410
411         if (pobj != NULL) {
412                 LASSERT(pattr != NULL);
413                 if (!(pattr->la_mode & S_ISVTX) ||
414                     (pattr->la_uid == uc->uc_fsuid))
415                         return 0;
416         }
417
418         LASSERT(cattr != NULL);
419         if (cattr->la_uid == uc->uc_fsuid)
420                 return 0;
421
422         return !md_capable(uc, CFS_CAP_FOWNER);
423 }
424
425 static int mdd_may_delete_entry(const struct lu_env *env,
426                                 struct mdd_object *pobj,
427                                 const struct lu_attr *pattr,
428                                 int check_perm)
429 {
430         ENTRY;
431
432         LASSERT(pobj != NULL);
433         if (!mdd_object_exists(pobj))
434                 RETURN(-ENOENT);
435
436         if (mdd_is_dead_obj(pobj))
437                 RETURN(-ENOENT);
438
439         if (check_perm) {
440                 int rc;
441                 rc = mdd_permission_internal_locked(env, pobj, pattr,
442                                             MAY_WRITE | MAY_EXEC,
443                                             MOR_TGT_PARENT);
444                 if (rc)
445                         RETURN(rc);
446         }
447
448         if (pattr->la_flags & LUSTRE_APPEND_FL)
449                 RETURN(-EPERM);
450
451         RETURN(0);
452 }
453
454 /*
455  * Check whether it may delete the cobj from the pobj.
456  * pobj maybe NULL
457  */
458 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
459                    const struct lu_attr *tpattr, struct mdd_object *tobj,
460                    const struct lu_attr *tattr, const struct lu_attr *cattr,
461                    int check_perm, int check_empty)
462 {
463         int rc = 0;
464         ENTRY;
465
466         if (tpobj) {
467                 LASSERT(tpattr != NULL);
468                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
469                 if (rc != 0)
470                         RETURN(rc);
471         }
472
473         if (tobj == NULL)
474                 RETURN(0);
475
476         if (!mdd_object_exists(tobj))
477                 RETURN(-ENOENT);
478
479         if (mdd_is_dead_obj(tobj))
480                 RETURN(-ESTALE);
481
482         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
483                 RETURN(-EPERM);
484
485         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
486                 RETURN(-EPERM);
487
488         /* additional check the rename case */
489         if (cattr) {
490                 if (S_ISDIR(cattr->la_mode)) {
491                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
492
493                         if (!S_ISDIR(tattr->la_mode))
494                                 RETURN(-ENOTDIR);
495
496                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
497                                 RETURN(-EBUSY);
498                 } else if (S_ISDIR(tattr->la_mode))
499                         RETURN(-EISDIR);
500         }
501
502         if (S_ISDIR(tattr->la_mode) && check_empty)
503                 rc = mdd_dir_is_empty(env, tobj);
504
505         RETURN(rc);
506 }
507
508 /**
509  * Check whether it can create the link file(linked to @src_obj) under
510  * the target directory(@tgt_obj), and src_obj has been locked by
511  * mdd_write_lock.
512  *
513  * \param[in] env       execution environment
514  * \param[in] tgt_obj   the target directory
515  * \param[in] tattr     attributes of target directory
516  * \param[in] lname     the link name
517  * \param[in] src_obj   source object for link
518  * \param[in] cattr     attributes for source object
519  *
520  * \retval              = 0 it is allowed to create the link file under tgt_obj
521  * \retval              negative error not allowed to create the link file
522  */
523 static int mdd_link_sanity_check(const struct lu_env *env,
524                                  struct mdd_object *tgt_obj,
525                                  const struct lu_attr *tattr,
526                                  const struct lu_name *lname,
527                                  struct mdd_object *src_obj,
528                                  const struct lu_attr *cattr)
529 {
530         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
531         int rc = 0;
532         ENTRY;
533
534         if (!mdd_object_exists(src_obj))
535                 RETURN(-ENOENT);
536
537         if (mdd_is_dead_obj(src_obj))
538                 RETURN(-ESTALE);
539
540         /* Local ops, no lookup before link, check filename length here. */
541         rc = mdd_name_check(m, lname);
542         if (rc < 0)
543                 RETURN(rc);
544
545         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
546                 RETURN(-EPERM);
547
548         if (S_ISDIR(mdd_object_type(src_obj)))
549                 RETURN(-EPERM);
550
551         LASSERT(src_obj != tgt_obj);
552         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
553         if (rc != 0)
554                 RETURN(rc);
555
556         rc = __mdd_may_link(env, src_obj, cattr);
557
558         RETURN(rc);
559 }
560
561 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
562                                    const char *name, struct thandle *handle,
563                                    struct lustre_capa *capa)
564 {
565         struct dt_object *next = mdd_object_child(pobj);
566         int rc;
567         ENTRY;
568
569         if (dt_try_as_dir(env, next))
570                 rc = dt_delete(env, next, (struct dt_key *)name, handle, capa);
571         else
572                 rc = -ENOTDIR;
573
574         RETURN(rc);
575 }
576
577 static int __mdd_index_insert_only(const struct lu_env *env,
578                                    struct mdd_object *pobj,
579                                    const struct lu_fid *lf, __u32 type,
580                                    const char *name,
581                                    struct thandle *handle,
582                                    struct lustre_capa *capa)
583 {
584         struct dt_object *next = mdd_object_child(pobj);
585         int               rc;
586         ENTRY;
587
588         if (dt_try_as_dir(env, next)) {
589                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
590                 struct lu_ucred         *uc  = lu_ucred_check(env);
591                 int                      ignore_quota;
592
593                 rec->rec_fid = lf;
594                 rec->rec_type = type;
595                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
596                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
597                                (const struct dt_key *)name, handle, capa,
598                                ignore_quota);
599         } else {
600                 rc = -ENOTDIR;
601         }
602         RETURN(rc);
603 }
604
605 /* insert named index, add reference if isdir */
606 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
607                               const struct lu_fid *lf, __u32 type,
608                               const char *name, struct thandle *handle,
609                               struct lustre_capa *capa)
610 {
611         int rc;
612         ENTRY;
613
614         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle, capa);
615         if (rc == 0 && S_ISDIR(type)) {
616                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
617                 mdo_ref_add(env, pobj, handle);
618                 mdd_write_unlock(env, pobj);
619         }
620
621         RETURN(rc);
622 }
623
624 /* delete named index, drop reference if isdir */
625 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
626                               const char *name, int is_dir, struct thandle *handle,
627                               struct lustre_capa *capa)
628 {
629         int               rc;
630         ENTRY;
631
632         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
633         if (rc == 0 && is_dir) {
634                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
635                 mdo_ref_del(env, pobj, handle);
636                 mdd_write_unlock(env, pobj);
637         }
638
639         RETURN(rc);
640 }
641
642 static int mdd_llog_record_calc_size(const struct lu_env *env,
643                                      const struct lu_name *tname,
644                                      const struct lu_name *sname)
645 {
646         const struct lu_ucred   *uc = lu_ucred(env);
647         enum changelog_rec_flags crf = 0;
648         size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
649                                             sizeof(struct changelog_rec);
650
651         if (sname != NULL)
652                 crf |= CLF_RENAME;
653
654         if (uc != NULL && uc->uc_jobid[0] != '\0')
655                 crf |= CLF_JOBID;
656
657         return llog_data_len(hdr_size + changelog_rec_offset(crf) +
658                              (tname != NULL ? tname->ln_namelen : 0) +
659                              (sname != NULL ? 1 + sname->ln_namelen : 0));
660 }
661
662 int mdd_declare_changelog_store(const struct lu_env *env,
663                                 struct mdd_device *mdd,
664                                 const struct lu_name *tname,
665                                 const struct lu_name *sname,
666                                 struct thandle *handle)
667 {
668         struct obd_device               *obd = mdd2obd_dev(mdd);
669         struct llog_ctxt                *ctxt;
670         struct llog_changelog_rec       *rec;
671         struct lu_buf                   *buf;
672         int                              reclen;
673         int                              rc;
674
675         /* Not recording */
676         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
677                 return 0;
678
679         reclen = mdd_llog_record_calc_size(env, tname, sname);
680         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
681         if (buf->lb_buf == NULL)
682                 return -ENOMEM;
683
684         rec = buf->lb_buf;
685         rec->cr_hdr.lrh_len = reclen;
686         rec->cr_hdr.lrh_type = CHANGELOG_REC;
687
688         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
689         if (ctxt == NULL)
690                 return -ENXIO;
691
692         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
693         llog_ctxt_put(ctxt);
694
695         return rc;
696 }
697
698 /** Add a changelog entry \a rec to the changelog llog
699  * \param mdd
700  * \param rec
701  * \param handle - currently ignored since llogs start their own transaction;
702  *              this will hopefully be fixed in llog rewrite
703  * \retval 0 ok
704  */
705 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
706                         struct llog_changelog_rec *rec, struct thandle *th)
707 {
708         struct obd_device       *obd = mdd2obd_dev(mdd);
709         struct llog_ctxt        *ctxt;
710         int                      rc;
711
712         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
713                                             changelog_rec_varsize(&rec->cr));
714
715         /* llog_lvfs_write_rec sets the llog tail len */
716         rec->cr_hdr.lrh_type = CHANGELOG_REC;
717         rec->cr.cr_time = cl_time();
718
719         spin_lock(&mdd->mdd_cl.mc_lock);
720         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
721          * but as long as the MDD transactions are ordered correctly for e.g.
722          * rename conflicts, I don't think this should matter. */
723         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
724         spin_unlock(&mdd->mdd_cl.mc_lock);
725
726         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
727         if (ctxt == NULL)
728                 return -ENXIO;
729
730         /* nested journal transaction */
731         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, th);
732         llog_ctxt_put(ctxt);
733         if (rc > 0)
734                 rc = 0;
735
736         return rc;
737 }
738
739 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
740                                          const struct lu_fid *sfid,
741                                          const struct lu_fid *spfid,
742                                          const struct lu_name *sname)
743 {
744         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
745         size_t                           extsize = sname->ln_namelen + 1;
746
747         LASSERT(sfid != NULL);
748         LASSERT(spfid != NULL);
749         LASSERT(sname != NULL);
750
751         rnm->cr_sfid = *sfid;
752         rnm->cr_spfid = *spfid;
753
754         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
755         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
756         rec->cr_namelen += extsize;
757 }
758
759 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
760 {
761         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
762
763         if (jobid == NULL || jobid[0] == '\0')
764                 return;
765
766         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
767 }
768
769 /** Store a namespace change changelog record
770  * If this fails, we must fail the whole transaction; we don't
771  * want the change to commit without the log entry.
772  * \param target - mdd_object of change
773  * \param tpfid - target parent dir/object fid
774  * \param sfid - source object fid
775  * \param spfid - source parent fid
776  * \param tname - target name string
777  * \param sname - source name string
778  * \param handle - transacion handle
779  */
780 int mdd_changelog_ns_store(const struct lu_env *env,
781                            struct mdd_device *mdd,
782                            enum changelog_rec_type type,
783                            enum changelog_rec_flags crf,
784                            struct mdd_object *target,
785                            const struct lu_fid *tpfid,
786                            const struct lu_fid *sfid,
787                            const struct lu_fid *spfid,
788                            const struct lu_name *tname,
789                            const struct lu_name *sname,
790                            struct thandle *handle)
791 {
792         const struct lu_ucred           *uc = lu_ucred(env);
793         struct llog_changelog_rec       *rec;
794         struct lu_buf                   *buf;
795         int                              reclen;
796         int                              rc;
797         ENTRY;
798
799         /* Not recording */
800         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
801                 RETURN(0);
802
803         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
804                 RETURN(0);
805
806         LASSERT(tpfid != NULL);
807         LASSERT(tname != NULL);
808         LASSERT(handle != NULL);
809
810         reclen = mdd_llog_record_calc_size(env, tname, sname);
811         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
812         if (buf->lb_buf == NULL)
813                 RETURN(-ENOMEM);
814         rec = buf->lb_buf;
815
816         crf = (crf & CLF_FLAGMASK);
817
818         if (uc->uc_jobid[0] != '\0')
819                 crf |= CLF_JOBID;
820
821         if (sname != NULL)
822                 crf |= CLF_RENAME;
823         else
824                 crf |= CLF_VERSION;
825
826         rec->cr.cr_flags = crf;
827         rec->cr.cr_type = (__u32)type;
828         rec->cr.cr_pfid = *tpfid;
829         rec->cr.cr_namelen = tname->ln_namelen;
830         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
831
832         if (crf & CLF_RENAME)
833                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
834
835         if (crf & CLF_JOBID)
836                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
837
838         if (likely(target != NULL)) {
839                 rec->cr.cr_tfid = *mdo2fid(target);
840                 target->mod_cltime = cfs_time_current_64();
841         } else {
842                 fid_zero(&rec->cr.cr_tfid);
843         }
844
845         rc = mdd_changelog_store(env, mdd, rec, handle);
846         if (rc < 0) {
847                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
848                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
849                 return -EFAULT;
850         }
851
852         return 0;
853 }
854
855 static int __mdd_links_add(const struct lu_env *env,
856                            struct mdd_object *mdd_obj,
857                            struct linkea_data *ldata,
858                            const struct lu_name *lname,
859                            const struct lu_fid *pfid,
860                            int first, int check)
861 {
862         int rc;
863
864         if (ldata->ld_leh == NULL) {
865                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
866                 if (rc) {
867                         if (rc != -ENODATA)
868                                 return rc;
869                         rc = linkea_data_new(ldata,
870                                              &mdd_env_info(env)->mti_link_buf);
871                         if (rc)
872                                 return rc;
873                 }
874         }
875
876         if (check) {
877                 rc = linkea_links_find(ldata, lname, pfid);
878                 if (rc && rc != -ENOENT)
879                         return rc;
880                 if (rc == 0)
881                         return -EEXIST;
882         }
883
884         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
885                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
886
887                 *tfid = *pfid;
888                 tfid->f_ver = ~0;
889                 linkea_add_buf(ldata, lname, tfid);
890         }
891
892         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
893                 linkea_add_buf(ldata, lname, pfid);
894
895         return linkea_add_buf(ldata, lname, pfid);
896 }
897
898 static int __mdd_links_del(const struct lu_env *env,
899                            struct mdd_object *mdd_obj,
900                            struct linkea_data *ldata,
901                            const struct lu_name *lname,
902                            const struct lu_fid *pfid)
903 {
904         int rc;
905
906         if (ldata->ld_leh == NULL) {
907                 rc = mdd_links_read(env, mdd_obj, ldata);
908                 if (rc)
909                         return rc;
910         }
911
912         rc = linkea_links_find(ldata, lname, pfid);
913         if (rc)
914                 return rc;
915
916         linkea_del_buf(ldata, lname);
917         return 0;
918 }
919
920 static int mdd_linkea_prepare(const struct lu_env *env,
921                               struct mdd_object *mdd_obj,
922                               const struct lu_fid *oldpfid,
923                               const struct lu_name *oldlname,
924                               const struct lu_fid *newpfid,
925                               const struct lu_name *newlname,
926                               int first, int check,
927                               struct linkea_data *ldata)
928 {
929         int rc = 0;
930         int rc2 = 0;
931         ENTRY;
932
933         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
934                 return 0;
935
936         LASSERT(oldpfid != NULL || newpfid != NULL);
937
938         if (mdd_obj->mod_flags & DEAD_OBJ) {
939                 /* Prevent linkea to be updated which is NOT necessary. */
940                 ldata->ld_reclen = 0;
941                 /* No more links, don't bother */
942                 RETURN(0);
943         }
944
945         if (oldpfid != NULL) {
946                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
947                 if (rc) {
948                         if ((check == 1) ||
949                             (rc != -ENODATA && rc != -ENOENT))
950                                 RETURN(rc);
951                         /* No changes done. */
952                         rc = 0;
953                 }
954         }
955
956         /* If renaming, add the new record */
957         if (newpfid != NULL) {
958                 /* even if the add fails, we still delete the out-of-date
959                  * old link */
960                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
961                                       first, check);
962         }
963
964         rc = rc != 0 ? rc : rc2;
965
966         RETURN(rc);
967 }
968
969 int mdd_links_rename(const struct lu_env *env,
970                      struct mdd_object *mdd_obj,
971                      const struct lu_fid *oldpfid,
972                      const struct lu_name *oldlname,
973                      const struct lu_fid *newpfid,
974                      const struct lu_name *newlname,
975                      struct thandle *handle,
976                      struct linkea_data *ldata,
977                      int first, int check)
978 {
979         int rc = 0;
980         ENTRY;
981
982         if (ldata == NULL) {
983                 ldata = &mdd_env_info(env)->mti_link_data;
984                 memset(ldata, 0, sizeof(*ldata));
985                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
986                                         newpfid, newlname, first, check,
987                                         ldata);
988                 if (rc != 0)
989                         GOTO(out, rc);
990         }
991
992         if (ldata->ld_reclen != 0)
993                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
994         EXIT;
995 out:
996         if (rc != 0) {
997                 int error = 1;
998                 if (rc == -EOVERFLOW || rc == -ENOSPC)
999                         error = 0;
1000                 if (oldpfid == NULL)
1001                         CDEBUG(error ? D_ERROR : D_OTHER,
1002                                "link_ea add '%.*s' failed %d "DFID"\n",
1003                                newlname->ln_namelen, newlname->ln_name,
1004                                rc, PFID(mdd_object_fid(mdd_obj)));
1005                 else if (newpfid == NULL)
1006                         CDEBUG(error ? D_ERROR : D_OTHER,
1007                                "link_ea del '%.*s' failed %d "DFID"\n",
1008                                oldlname->ln_namelen, oldlname->ln_name,
1009                                rc, PFID(mdd_object_fid(mdd_obj)));
1010                 else
1011                         CDEBUG(error ? D_ERROR : D_OTHER,
1012                                "link_ea rename '%.*s'->'%.*s' failed %d "
1013                                DFID"\n",
1014                                oldlname->ln_namelen, oldlname->ln_name,
1015                                newlname->ln_namelen, newlname->ln_name,
1016                                rc, PFID(mdd_object_fid(mdd_obj)));
1017         }
1018
1019         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1020                 /* if we vmalloced a large buffer drop it */
1021                 lu_buf_free(ldata->ld_buf);
1022
1023         return rc;
1024 }
1025
1026 static inline int mdd_links_add(const struct lu_env *env,
1027                                 struct mdd_object *mdd_obj,
1028                                 const struct lu_fid *pfid,
1029                                 const struct lu_name *lname,
1030                                 struct thandle *handle,
1031                                 struct linkea_data *ldata, int first)
1032 {
1033         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1034                                 pfid, lname, handle, ldata, first, 0);
1035 }
1036
1037 static inline int mdd_links_del(const struct lu_env *env,
1038                                 struct mdd_object *mdd_obj,
1039                                 const struct lu_fid *pfid,
1040                                 const struct lu_name *lname,
1041                                 struct thandle *handle)
1042 {
1043         return mdd_links_rename(env, mdd_obj, pfid, lname,
1044                                 NULL, NULL, handle, NULL, 0, 0);
1045 }
1046
1047 /** Read the link EA into a temp buffer.
1048  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1049  * A pointer to the buffer is stored in \a ldata::ld_buf.
1050  *
1051  * \retval 0 or error
1052  */
1053 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1054                    struct linkea_data *ldata)
1055 {
1056         int rc;
1057
1058         if (!mdd_object_exists(mdd_obj))
1059                 return -ENODATA;
1060
1061         /* First try a small buf */
1062         LASSERT(env != NULL);
1063         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1064                                                PAGE_CACHE_SIZE);
1065         if (ldata->ld_buf->lb_buf == NULL)
1066                 return -ENOMEM;
1067
1068         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
1069                           BYPASS_CAPA);
1070         if (rc == -ERANGE) {
1071                 /* Buf was too small, figure out what we need. */
1072                 lu_buf_free(ldata->ld_buf);
1073                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1074                                    XATTR_NAME_LINK, BYPASS_CAPA);
1075                 if (rc < 0)
1076                         return rc;
1077                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1078                 if (ldata->ld_buf->lb_buf == NULL)
1079                         return -ENOMEM;
1080                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1081                                   XATTR_NAME_LINK, BYPASS_CAPA);
1082         }
1083         if (rc < 0) {
1084                 lu_buf_free(ldata->ld_buf);
1085                 ldata->ld_buf = NULL;
1086                 return rc;
1087         }
1088
1089         return linkea_init(ldata);
1090 }
1091
1092 /** Read the link EA into a temp buffer.
1093  * Uses the name_buf since it is generally large.
1094  * \retval IS_ERR err
1095  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1096  */
1097 struct lu_buf *mdd_links_get(const struct lu_env *env,
1098                              struct mdd_object *mdd_obj)
1099 {
1100         struct linkea_data ldata = { NULL };
1101         int rc;
1102
1103         rc = mdd_links_read(env, mdd_obj, &ldata);
1104         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1105 }
1106
1107 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1108                     struct linkea_data *ldata, struct thandle *handle)
1109 {
1110         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1111                                                      ldata->ld_leh->leh_len);
1112         int                 rc;
1113
1114         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1115                 return 0;
1116
1117         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
1118                            mdd_object_capa(env, mdd_obj));
1119         if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
1120             mdd_object_remote(mdd_obj) == 0) {
1121                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1122
1123                 /* XXX: If the linkEA is overflow, then we need to notify the
1124                  *      namespace LFSCK to skip "nlink" attribute verification
1125                  *      on this object to avoid the "nlink" to be shrinked by
1126                  *      wrong. It may be not good an interaction with LFSCK
1127                  *      like this. We will consider to replace it with other
1128                  *      mechanism in future. LU-5802. */
1129                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
1130                                LFSCK_TYPE_NAMESPACE);
1131                 lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1132                                 lr, handle);
1133         }
1134
1135         return rc;
1136 }
1137
1138 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1139                           struct thandle *handle, struct linkea_data *ldata,
1140                           enum mdd_links_add_overflow overflow)
1141 {
1142         int     rc;
1143         int     ea_len;
1144         void    *linkea;
1145
1146         if (ldata != NULL && ldata->ld_leh != NULL) {
1147                 ea_len = ldata->ld_leh->leh_len;
1148                 linkea = ldata->ld_buf->lb_buf;
1149         } else {
1150                 ea_len = DEFAULT_LINKEA_SIZE;
1151                 linkea = NULL;
1152         }
1153
1154         /* XXX: max size? */
1155         rc = mdo_declare_xattr_set(env, mdd_obj,
1156                                    mdd_buf_get_const(env, linkea, ea_len),
1157                                    XATTR_NAME_LINK, 0, handle);
1158         if (rc != 0)
1159                 return rc;
1160
1161         if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
1162                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1163
1164                 /* XXX: If the linkEA is overflow, then we need to notify the
1165                  *      namespace LFSCK to skip "nlink" attribute verification
1166                  *      on this object to avoid the "nlink" to be shrinked by
1167                  *      wrong. It may be not good an interaction with LFSCK
1168                  *      like this. We will consider to replace it with other
1169                  *      mechanism in future. LU-5802. */
1170                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
1171                                LFSCK_TYPE_NAMESPACE);
1172                 rc = lfsck_in_notify(env,
1173                                      mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1174                                      lr, handle);
1175         }
1176
1177         return rc;
1178 }
1179
1180 static inline int mdd_declare_links_del(const struct lu_env *env,
1181                                         struct mdd_object *c,
1182                                         struct thandle *handle)
1183 {
1184         int rc = 0;
1185
1186         /* For directory, the linkEA will be removed together
1187          * with the object. */
1188         if (!S_ISDIR(mdd_object_type(c)))
1189                 rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
1190
1191         return rc;
1192 }
1193
1194 static int mdd_declare_link(const struct lu_env *env,
1195                             struct mdd_device *mdd,
1196                             struct mdd_object *p,
1197                             struct mdd_object *c,
1198                             const struct lu_name *name,
1199                             struct thandle *handle,
1200                             struct lu_attr *la,
1201                             struct linkea_data *data)
1202 {
1203         int rc;
1204
1205         rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
1206                                       name->ln_name, handle);
1207         if (rc != 0)
1208                 return rc;
1209
1210         rc = mdo_declare_ref_add(env, c, handle);
1211         if (rc != 0)
1212                 return rc;
1213
1214         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1215                 rc = mdo_declare_ref_add(env, c, handle);
1216                 if (rc != 0)
1217                         return rc;
1218         }
1219
1220         la->la_valid = LA_CTIME | LA_MTIME;
1221         rc = mdo_declare_attr_set(env, p, la, handle);
1222         if (rc != 0)
1223                 return rc;
1224
1225         la->la_valid = LA_CTIME;
1226         rc = mdo_declare_attr_set(env, c, la, handle);
1227         if (rc != 0)
1228                 return rc;
1229
1230         rc = mdd_declare_links_add(env, c, handle, data,
1231                         S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
1232         if (rc != 0)
1233                 return rc;
1234
1235         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1236
1237         return rc;
1238 }
1239
1240 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1241                     struct md_object *src_obj, const struct lu_name *lname,
1242                     struct md_attr *ma)
1243 {
1244         const char *name = lname->ln_name;
1245         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1246         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1247         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1248         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1249         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1250         struct mdd_device *mdd = mdo2mdd(src_obj);
1251         struct thandle *handle;
1252         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1253         int rc;
1254         ENTRY;
1255
1256         rc = mdd_la_get(env, mdd_sobj, cattr, BYPASS_CAPA);
1257         if (rc != 0)
1258                 RETURN(rc);
1259
1260         rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
1261         if (rc != 0)
1262                 RETURN(rc);
1263
1264         handle = mdd_trans_create(env, mdd);
1265         if (IS_ERR(handle))
1266                 GOTO(out_pending, rc = PTR_ERR(handle));
1267
1268         memset(ldata, 0, sizeof(*ldata));
1269
1270         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1271         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1272
1273         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1274                               la, ldata);
1275         if (rc)
1276                 GOTO(stop, rc);
1277
1278         rc = mdd_trans_start(env, mdd, handle);
1279         if (rc)
1280                 GOTO(stop, rc);
1281
1282         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1283         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1284                                    cattr);
1285         if (rc)
1286                 GOTO(out_unlock, rc);
1287
1288         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1289                 rc = mdo_ref_add(env, mdd_sobj, handle);
1290                 if (rc != 0)
1291                         GOTO(out_unlock, rc);
1292         }
1293
1294         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1295                 rc = mdo_ref_add(env, mdd_sobj, handle);
1296                 if (rc != 0)
1297                         GOTO(out_unlock, rc);
1298         }
1299
1300         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
1301                 struct lu_fid tfid = *mdo2fid(mdd_sobj);
1302
1303                 tfid.f_oid++;
1304                 rc = __mdd_index_insert_only(env, mdd_tobj, &tfid,
1305                                              mdd_object_type(mdd_sobj),
1306                                              name, handle,
1307                                              mdd_object_capa(env, mdd_tobj));
1308         } else {
1309                 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1310                                              mdd_object_type(mdd_sobj),
1311                                              name, handle,
1312                                              mdd_object_capa(env, mdd_tobj));
1313         }
1314
1315         if (rc != 0) {
1316                 mdo_ref_del(env, mdd_sobj, handle);
1317                 GOTO(out_unlock, rc);
1318         }
1319
1320         la->la_valid = LA_CTIME | LA_MTIME;
1321         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1322         if (rc)
1323                 GOTO(out_unlock, rc);
1324
1325         la->la_valid = LA_CTIME;
1326         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1327         if (rc == 0) {
1328                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1329                                         mdo2fid(mdd_tobj), lname, 0, 0,
1330                                         ldata);
1331                 if (rc == 0)
1332                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1333                                       lname, handle, ldata, 0);
1334                 /* The failure of links_add should not cause the link
1335                  * failure, reset rc here */
1336                 rc = 0;
1337         }
1338         EXIT;
1339 out_unlock:
1340         mdd_write_unlock(env, mdd_sobj);
1341         if (rc == 0)
1342                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1343                                             mdo2fid(mdd_tobj), NULL, NULL,
1344                                             lname, NULL, handle);
1345 stop:
1346         mdd_trans_stop(env, mdd, rc, handle);
1347
1348         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1349                 /* if we vmalloced a large buffer drop it */
1350                 lu_buf_free(ldata->ld_buf);
1351 out_pending:
1352         return rc;
1353 }
1354
1355 static int mdd_mark_dead_object(const struct lu_env *env,
1356                                 struct mdd_object *obj, struct thandle *handle,
1357                                 bool declare)
1358 {
1359         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1360         int rc;
1361
1362         if (!declare)
1363                 obj->mod_flags |= DEAD_OBJ;
1364
1365         if (!S_ISDIR(mdd_object_type(obj)))
1366                 return 0;
1367
1368         attr->la_valid = LA_FLAGS;
1369         attr->la_flags = LUSTRE_SLAVE_DEAD_FL;
1370
1371         if (declare)
1372                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1373         else
1374                 rc = mdo_attr_set(env, obj, attr, handle,
1375                                   mdd_object_capa(env, obj));
1376
1377         return rc;
1378 }
1379
1380 static int mdd_declare_finish_unlink(const struct lu_env *env,
1381                                      struct mdd_object *obj,
1382                                      struct thandle *handle)
1383 {
1384         int     rc;
1385
1386         rc = mdd_mark_dead_object(env, obj, handle, true);
1387         if (rc != 0)
1388                 return rc;
1389
1390         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1391         if (rc != 0)
1392                 return rc;
1393
1394         rc = mdo_declare_destroy(env, obj, handle);
1395         if (rc != 0)
1396                 return rc;
1397
1398         return mdd_declare_links_del(env, obj, handle);
1399 }
1400
1401 /* caller should take a lock before calling */
1402 int mdd_finish_unlink(const struct lu_env *env,
1403                       struct mdd_object *obj, struct md_attr *ma,
1404                       const struct mdd_object *pobj,
1405                       const struct lu_name *lname,
1406                       struct thandle *th)
1407 {
1408         int rc = 0;
1409         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1410         ENTRY;
1411
1412         LASSERT(mdd_write_locked(env, obj) != 0);
1413
1414         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1415                 rc = mdd_mark_dead_object(env, obj, th, false);
1416                 if (rc != 0)
1417                         RETURN(rc);
1418
1419                 /* add new orphan and the object
1420                  * will be deleted during mdd_close() */
1421                 if (obj->mod_count) {
1422                         rc = __mdd_orphan_add(env, obj, th);
1423                         if (rc == 0)
1424                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1425                                         "orphan list, open count = %d\n",
1426                                         PFID(mdd_object_fid(obj)),
1427                                         obj->mod_count);
1428                         else
1429                                 CERROR("Object "DFID" fail to be an orphan, "
1430                                        "open count = %d, maybe cause failed "
1431                                        "open replay\n",
1432                                         PFID(mdd_object_fid(obj)),
1433                                         obj->mod_count);
1434                 } else {
1435                         rc = mdo_destroy(env, obj, th);
1436                 }
1437         } else if (!is_dir) {
1438                 /* old files may not have link ea; ignore errors */
1439                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1440         }
1441
1442         RETURN(rc);
1443 }
1444
1445 /*
1446  * pobj maybe NULL
1447  * has mdd_write_lock on cobj already, but not on pobj yet
1448  */
1449 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1450                             const struct lu_attr *pattr,
1451                             struct mdd_object *cobj,
1452                             const struct lu_attr *cattr)
1453 {
1454         int rc;
1455         ENTRY;
1456
1457         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1458
1459         RETURN(rc);
1460 }
1461
1462 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1463                               struct mdd_object *p, struct mdd_object *c,
1464                               const struct lu_name *name, struct md_attr *ma,
1465                               struct thandle *handle, int no_name, int is_dir)
1466 {
1467         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1468         int              rc;
1469
1470         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1471                 if (likely(no_name == 0)) {
1472                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1473                                                       handle);
1474                         if (rc != 0)
1475                                 return rc;
1476                 }
1477
1478                 if (is_dir != 0) {
1479                         rc = mdo_declare_ref_del(env, p, handle);
1480                         if (rc != 0)
1481                                 return rc;
1482                 }
1483         }
1484
1485         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1486         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1487         la->la_valid = LA_CTIME | LA_MTIME;
1488         rc = mdo_declare_attr_set(env, p, la, handle);
1489         if (rc)
1490                 return rc;
1491
1492         if (c != NULL) {
1493                 rc = mdo_declare_ref_del(env, c, handle);
1494                 if (rc)
1495                         return rc;
1496
1497                 rc = mdo_declare_ref_del(env, c, handle);
1498                 if (rc)
1499                         return rc;
1500
1501                 la->la_valid = LA_CTIME;
1502                 rc = mdo_declare_attr_set(env, c, la, handle);
1503                 if (rc)
1504                         return rc;
1505
1506                 rc = mdd_declare_finish_unlink(env, c, handle);
1507                 if (rc)
1508                         return rc;
1509
1510                 /* FIXME: need changelog for remove entry */
1511                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1512         }
1513
1514         return rc;
1515 }
1516
1517 /*
1518  * test if a file has an HSM archive
1519  * if HSM attributes are not found in ma update them from
1520  * HSM xattr
1521  */
1522 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1523                                    struct mdd_object *obj,
1524                                    struct md_attr *ma)
1525 {
1526         ENTRY;
1527
1528         if (!(ma->ma_valid & MA_HSM)) {
1529                 /* no HSM MD provided, read xattr */
1530                 struct lu_buf   *hsm_buf;
1531                 const size_t     buflen = sizeof(struct hsm_attrs);
1532                 int              rc;
1533
1534                 hsm_buf = mdd_buf_get(env, NULL, 0);
1535                 lu_buf_alloc(hsm_buf, buflen);
1536                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM,
1537                                    mdd_object_capa(env, obj));
1538                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1539                 lu_buf_free(hsm_buf);
1540                 if (rc < 0)
1541                         RETURN(false);
1542
1543                 ma->ma_valid = MA_HSM;
1544         }
1545         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1546                 RETURN(true);
1547         RETURN(false);
1548 }
1549
1550 /**
1551  * Delete name entry and the object.
1552  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1553  * does not exist for this object, and it could only happen during resending
1554  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1555  * is also needed in this case(needed by changelog), so we have to add another
1556  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1557  * the ENOENT failure should be able to be fixed by redo mechanism.
1558  */
1559 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1560                       struct md_object *cobj, const struct lu_name *lname,
1561                       struct md_attr *ma, int no_name)
1562 {
1563         const char *name = lname->ln_name;
1564         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1565         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1566         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1567         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1568         struct mdd_object *mdd_cobj = NULL;
1569         struct mdd_device *mdd = mdo2mdd(pobj);
1570         struct thandle    *handle;
1571         int rc, is_dir = 0;
1572         ENTRY;
1573
1574         /* cobj == NULL means only delete name entry */
1575         if (likely(cobj != NULL)) {
1576                 mdd_cobj = md2mdd_obj(cobj);
1577                 if (mdd_object_exists(mdd_cobj) == 0)
1578                         RETURN(-ENOENT);
1579         }
1580
1581         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
1582         if (rc)
1583                 RETURN(rc);
1584
1585         if (likely(mdd_cobj != NULL)) {
1586                 /* fetch cattr */
1587                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1588                 if (rc)
1589                         RETURN(rc);
1590
1591                 is_dir = S_ISDIR(cattr->la_mode);
1592         }
1593
1594         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1595         if (rc)
1596                 RETURN(rc);
1597
1598         handle = mdd_trans_create(env, mdd);
1599         if (IS_ERR(handle))
1600                 RETURN(PTR_ERR(handle));
1601
1602         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1603                                 lname, ma, handle, no_name, is_dir);
1604         if (rc)
1605                 GOTO(stop, rc);
1606
1607         rc = mdd_trans_start(env, mdd, handle);
1608         if (rc)
1609                 GOTO(stop, rc);
1610
1611         if (likely(mdd_cobj != NULL))
1612                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1613
1614         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1615                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1616                                         mdd_object_capa(env, mdd_pobj));
1617                 if (rc)
1618                         GOTO(cleanup, rc);
1619         }
1620
1621         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1622             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1623                 GOTO(cleanup, rc = 0);
1624
1625         if (likely(mdd_cobj != NULL)) {
1626                 rc = mdo_ref_del(env, mdd_cobj, handle);
1627                 if (rc != 0) {
1628                         __mdd_index_insert_only(env, mdd_pobj,
1629                                                 mdo2fid(mdd_cobj),
1630                                                 mdd_object_type(mdd_cobj),
1631                                                 name, handle,
1632                                                 mdd_object_capa(env, mdd_pobj));
1633                         GOTO(cleanup, rc);
1634                 }
1635
1636                 if (is_dir)
1637                         /* unlink dot */
1638                         mdo_ref_del(env, mdd_cobj, handle);
1639
1640                 /* fetch updated nlink */
1641                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1642                 if (rc)
1643                         GOTO(cleanup, rc);
1644         }
1645
1646         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1647         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1648
1649         la->la_valid = LA_CTIME | LA_MTIME;
1650         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1651         if (rc)
1652                 GOTO(cleanup, rc);
1653
1654         /* Enough for only unlink the entry */
1655         if (unlikely(mdd_cobj == NULL))
1656                 GOTO(stop, rc);
1657
1658         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1659                 /* update ctime of an unlinked file only if it is still
1660                  * opened or a link still exists */
1661                 la->la_valid = LA_CTIME;
1662                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1663                 if (rc)
1664                         GOTO(cleanup, rc);
1665         }
1666
1667         /* XXX: this transfer to ma will be removed with LOD/OSP */
1668         ma->ma_attr = *cattr;
1669         ma->ma_valid |= MA_INODE;
1670         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1671
1672         /* fetch updated nlink */
1673         if (rc == 0)
1674                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1675
1676         /* if object is removed then we can't get its attrs, use last get */
1677         if (cattr->la_nlink == 0) {
1678                 ma->ma_attr = *cattr;
1679                 ma->ma_valid |= MA_INODE;
1680         }
1681         EXIT;
1682 cleanup:
1683         if (likely(mdd_cobj != NULL))
1684                 mdd_write_unlock(env, mdd_cobj);
1685
1686         if (rc == 0) {
1687                 int cl_flags = 0;
1688
1689                 if (cattr->la_nlink == 0) {
1690                         cl_flags |= CLF_UNLINK_LAST;
1691                         /* search for an existing archive */
1692                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1693                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1694                 }
1695
1696                 rc = mdd_changelog_ns_store(env, mdd,
1697                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1698                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1699                         handle);
1700         }
1701
1702 stop:
1703         mdd_trans_stop(env, mdd, rc, handle);
1704
1705         return rc;
1706 }
1707
1708 /*
1709  * The permission has been checked when obj created, no need check again.
1710  */
1711 static int mdd_cd_sanity_check(const struct lu_env *env,
1712                                struct mdd_object *obj)
1713 {
1714         ENTRY;
1715
1716         /* EEXIST check */
1717         if (!obj || mdd_is_dead_obj(obj))
1718                 RETURN(-ENOENT);
1719
1720         RETURN(0);
1721 }
1722
1723 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1724                            struct md_object *cobj, const struct md_op_spec *spec,
1725                            struct md_attr *ma)
1726 {
1727         struct mdd_device *mdd = mdo2mdd(cobj);
1728         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1729         struct mdd_object *son = md2mdd_obj(cobj);
1730         struct thandle    *handle;
1731         const struct lu_buf *buf;
1732         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1733         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1734         int                rc;
1735         ENTRY;
1736
1737         rc = mdd_cd_sanity_check(env, son);
1738         if (rc)
1739                 RETURN(rc);
1740
1741         if (!md_should_create(spec->sp_cr_flags))
1742                 RETURN(0);
1743
1744         /*
1745          * there are following use cases for this function:
1746          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1747          *    striping can be specified or not
1748          * 2) CMD?
1749          */
1750         rc = mdd_la_get(env, son, attr, BYPASS_CAPA);
1751         if (rc)
1752                 RETURN(rc);
1753
1754         /* calling ->ah_make_hint() is used to transfer information from parent */
1755         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1756
1757         handle = mdd_trans_create(env, mdd);
1758         if (IS_ERR(handle))
1759                 GOTO(out_free, rc = PTR_ERR(handle));
1760
1761         /*
1762          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1763          * Should this be fixed?
1764          */
1765         CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
1766                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1767                spec->sp_cr_flags, spec->no_create);
1768
1769         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1770                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1771                                         spec->u.sp_ea.eadatalen);
1772         } else {
1773                 buf = &LU_BUF_NULL;
1774         }
1775
1776         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1777                                   XATTR_NAME_LOV, 0, handle);
1778         if (rc)
1779                 GOTO(stop, rc);
1780
1781         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1782         if (rc)
1783                 GOTO(stop, rc);
1784
1785         rc = mdd_trans_start(env, mdd, handle);
1786         if (rc)
1787                 GOTO(stop, rc);
1788
1789         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1790                           0, handle, mdd_object_capa(env, son));
1791
1792         if (rc)
1793                 GOTO(stop, rc);
1794
1795         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1796
1797 stop:
1798         mdd_trans_stop(env, mdd, rc, handle);
1799 out_free:
1800         RETURN(rc);
1801 }
1802
1803 static int mdd_declare_object_initialize(const struct lu_env *env,
1804                                          struct mdd_object *parent,
1805                                          struct mdd_object *child,
1806                                          struct lu_attr *attr,
1807                                          struct thandle *handle)
1808 {
1809         int rc;
1810         ENTRY;
1811
1812         /*
1813          * inode mode has been set in creation time, and it's based on umask,
1814          * la_mode and acl, don't set here again! (which will go wrong
1815          * because below function doesn't consider umask).
1816          * I'd suggest set all object attributes in creation time, see above.
1817          */
1818         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1819         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1820         rc = mdo_declare_attr_set(env, child, attr, handle);
1821         attr->la_valid |= LA_MODE | LA_TYPE;
1822         if (rc != 0 || !S_ISDIR(attr->la_mode))
1823                 RETURN(rc);
1824
1825         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1826                                       dot, handle);
1827         if (rc != 0)
1828                 RETURN(rc);
1829
1830         rc = mdo_declare_ref_add(env, child, handle);
1831         if (rc != 0)
1832                 RETURN(rc);
1833
1834         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1835                                       dotdot, handle);
1836
1837         RETURN(rc);
1838 }
1839
1840 static int mdd_object_initialize(const struct lu_env *env,
1841                                  const struct lu_fid *pfid,
1842                                  struct mdd_object *child,
1843                                  struct lu_attr *attr, struct thandle *handle,
1844                                  const struct md_op_spec *spec)
1845 {
1846         int rc = 0;
1847         ENTRY;
1848
1849         if (S_ISDIR(attr->la_mode)) {
1850                 /* Add "." and ".." for newly created dir */
1851                 mdo_ref_add(env, child, handle);
1852                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1853                                              S_IFDIR, dot, handle, BYPASS_CAPA);
1854                 if (rc == 0)
1855                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1856                                                      dotdot, handle,
1857                                                      BYPASS_CAPA);
1858                 if (rc != 0)
1859                         mdo_ref_del(env, child, handle);
1860         }
1861
1862         RETURN(rc);
1863 }
1864
1865 /**
1866  * This function checks whether it can create a file/dir under the
1867  * directory(@pobj). The directory(@pobj) is not being locked by
1868  * mdd lock.
1869  *
1870  * \param[in] env       execution environment
1871  * \param[in] pobj      the directory to create files
1872  * \param[in] pattr     the attributes of the directory
1873  * \param[in] lname     the name of the created file/dir
1874  * \param[in] cattr     the attributes of the file/dir
1875  * \param[in] spec      create specification
1876  *
1877  * \retval              = 0 it is allowed to create file/dir under
1878  *                      the directory
1879  * \retval              negative error not allowed to create file/dir
1880  *                      under the directory
1881  */
1882 static int mdd_create_sanity_check(const struct lu_env *env,
1883                                    struct md_object *pobj,
1884                                    const struct lu_attr *pattr,
1885                                    const struct lu_name *lname,
1886                                    struct lu_attr *cattr,
1887                                    struct md_op_spec *spec)
1888 {
1889         struct mdd_thread_info *info = mdd_env_info(env);
1890         struct lu_fid     *fid       = &info->mti_fid;
1891         struct mdd_object *obj       = md2mdd_obj(pobj);
1892         struct mdd_device *m         = mdo2mdd(pobj);
1893         bool            check_perm = true;
1894         int rc;
1895         ENTRY;
1896
1897         /* EEXIST check */
1898         if (mdd_is_dead_obj(obj))
1899                 RETURN(-ENOENT);
1900
1901         /*
1902          * In some cases this lookup is not needed - we know before if name
1903          * exists or not because MDT performs lookup for it.
1904          * name length check is done in lookup.
1905          */
1906         if (spec->sp_cr_lookup) {
1907                 /*
1908                  * Check if the name already exist, though it will be checked in
1909                  * _index_insert also, for avoiding rolling back if exists
1910                  * _index_insert.
1911                  */
1912                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1913                                   MAY_WRITE | MAY_EXEC);
1914                 if (rc != -ENOENT)
1915                         RETURN(rc ? : -EEXIST);
1916
1917                 /* Permission is already being checked in mdd_lookup */
1918                 check_perm = false;
1919         }
1920
1921         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1922         if (rc != 0)
1923                 RETURN(rc);
1924
1925         /* sgid check */
1926         if (pattr->la_mode & S_ISGID) {
1927                 cattr->la_gid = pattr->la_gid;
1928                 if (S_ISDIR(cattr->la_mode)) {
1929                         cattr->la_mode |= S_ISGID;
1930                         cattr->la_valid |= LA_MODE;
1931                 }
1932         }
1933
1934         rc = mdd_name_check(m, lname);
1935         if (rc < 0)
1936                 RETURN(rc);
1937
1938         switch (cattr->la_mode & S_IFMT) {
1939         case S_IFLNK: {
1940                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1941
1942                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1943                         RETURN(-ENAMETOOLONG);
1944                 else
1945                         RETURN(0);
1946         }
1947         case S_IFDIR:
1948         case S_IFREG:
1949         case S_IFCHR:
1950         case S_IFBLK:
1951         case S_IFIFO:
1952         case S_IFSOCK:
1953                 rc = 0;
1954                 break;
1955         default:
1956                 rc = -EINVAL;
1957                 break;
1958         }
1959         RETURN(rc);
1960 }
1961
1962 static int mdd_declare_object_create(const struct lu_env *env,
1963                                      struct mdd_device *mdd,
1964                                      struct mdd_object *p, struct mdd_object *c,
1965                                      struct lu_attr *attr,
1966                                      struct thandle *handle,
1967                                      const struct md_op_spec *spec,
1968                                      struct lu_buf *def_acl_buf,
1969                                      struct lu_buf *acl_buf,
1970                                      struct dt_allocation_hint *hint)
1971 {
1972         int rc;
1973
1974         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
1975                                                 hint);
1976         if (rc)
1977                 GOTO(out, rc);
1978
1979 #ifdef CONFIG_FS_POSIX_ACL
1980         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
1981                 /* if dir, then can inherit default ACl */
1982                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
1983                                            XATTR_NAME_ACL_DEFAULT,
1984                                            0, handle);
1985                 if (rc)
1986                         GOTO(out, rc);
1987         }
1988
1989         if (acl_buf->lb_len > 0) {
1990                 rc = mdo_declare_attr_set(env, c, attr, handle);
1991                 if (rc)
1992                         GOTO(out, rc);
1993
1994                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1995                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1996                 if (rc)
1997                         GOTO(out, rc);
1998         }
1999 #endif
2000         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2001         if (rc)
2002                 GOTO(out, rc);
2003
2004         /* replay case, create LOV EA from client data */
2005         if (spec->no_create ||
2006             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2007                 const struct lu_buf *buf;
2008
2009                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2010                                         spec->u.sp_ea.eadatalen);
2011                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2012                                            handle);
2013                 if (rc)
2014                         GOTO(out, rc);
2015         }
2016
2017         if (S_ISLNK(attr->la_mode)) {
2018                 const char *target_name = spec->u.sp_symname;
2019                 int sym_len = strlen(target_name);
2020                 const struct lu_buf *buf;
2021
2022                 buf = mdd_buf_get_const(env, target_name, sym_len);
2023                 rc = dt_declare_record_write(env, mdd_object_child(c),
2024                                              buf, 0, handle);
2025                 if (rc)
2026                         GOTO(out, rc);
2027         }
2028 out:
2029         return rc;
2030 }
2031
2032 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2033                               struct mdd_object *p, struct mdd_object *c,
2034                               const struct lu_name *name,
2035                               struct lu_attr *attr,
2036                               struct thandle *handle,
2037                               const struct md_op_spec *spec,
2038                               struct linkea_data *ldata,
2039                               struct lu_buf *def_acl_buf,
2040                               struct lu_buf *acl_buf,
2041                               struct dt_allocation_hint *hint)
2042 {
2043         int rc;
2044
2045         rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
2046                                        def_acl_buf, acl_buf, hint);
2047         if (rc)
2048                 GOTO(out, rc);
2049
2050         if (S_ISDIR(attr->la_mode)) {
2051                 rc = mdo_declare_ref_add(env, p, handle);
2052                 if (rc)
2053                         GOTO(out, rc);
2054         }
2055
2056         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2057                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2058                 if (rc)
2059                         GOTO(out, rc);
2060         } else {
2061                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2062
2063                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2064                                               name->ln_name, handle);
2065                 if (rc != 0)
2066                         return rc;
2067
2068                 rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
2069                 if (rc)
2070                         return rc;
2071
2072                 *la = *attr;
2073                 la->la_valid = LA_CTIME | LA_MTIME;
2074                 rc = mdo_declare_attr_set(env, p, la, handle);
2075                 if (rc)
2076                         return rc;
2077
2078                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2079                 if (rc)
2080                         return rc;
2081         }
2082
2083         /* XXX: For remote create, it should indicate the remote RPC
2084          * will be sent after local transaction is finished, which
2085          * is not very nice, but it will be removed once we fully support
2086          * async update */
2087         if (mdd_object_remote(p) && handle->th_update != NULL)
2088                 handle->th_update->tu_sent_after_local_trans = 1;
2089 out:
2090         return rc;
2091 }
2092
2093 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2094                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2095                         struct lu_buf *acl_buf)
2096 {
2097         int     rc;
2098         ENTRY;
2099
2100         if (S_ISLNK(la->la_mode)) {
2101                 acl_buf->lb_len = 0;
2102                 def_acl_buf->lb_len = 0;
2103                 RETURN(0);
2104         }
2105
2106         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2107         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2108                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
2109         mdd_read_unlock(env, pobj);
2110         if (rc > 0) {
2111                 /* If there are default ACL, fix mode/ACL by default ACL */
2112                 def_acl_buf->lb_len = rc;
2113                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2114                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2115                 acl_buf->lb_len = rc;
2116                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2117                 if (rc < 0)
2118                         RETURN(rc);
2119         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2120                 /* If there are no default ACL, fix mode by mask */
2121                 struct lu_ucred *uc = lu_ucred(env);
2122
2123                 /* The create triggered by MDT internal events, such as
2124                  * LFSCK reset, will not contain valid "uc". */
2125                 if (unlikely(uc != NULL))
2126                         la->la_mode &= ~uc->uc_umask;
2127                 rc = 0;
2128                 acl_buf->lb_len = 0;
2129                 def_acl_buf->lb_len = 0;
2130         }
2131
2132         RETURN(rc);
2133 }
2134
2135 /**
2136  * Create a metadata object and initialize it, set acl, xattr.
2137  **/
2138 static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
2139                              struct mdd_object *son, struct lu_attr *attr,
2140                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2141                              struct lu_buf *def_acl_buf,
2142                              struct dt_allocation_hint *hint,
2143                              struct thandle *handle)
2144 {
2145         int                     rc;
2146
2147         mdd_write_lock(env, son, MOR_TGT_CHILD);
2148         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
2149                                         hint);
2150         if (rc)
2151                 GOTO(unlock, rc);
2152
2153         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2154          * created in declare phase, they also needs to be added to master
2155          * object as sub-directory entry. So it has to initialize the master
2156          * object, then set dir striped EA.(in mdo_xattr_set) */
2157         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2158                                    spec);
2159         if (rc != 0)
2160                 GOTO(err_destroy, rc);
2161
2162         /*
2163          * in case of replay we just set LOVEA provided by the client
2164          * XXX: I think it would be interesting to try "old" way where
2165          *      MDT calls this xattr_set(LOV) in a different transaction.
2166          *      probably this way we code can be made better.
2167          */
2168
2169         /* During creation, there are only a few cases we need do xattr_set to
2170          * create stripes.
2171          * 1. regular file: see comments above.
2172          * 2. dir: inherit default striping or pool settings from parent.
2173          * 3. create striped directory with provided stripeEA.
2174          * 4. create striped directory because inherit default layout from the
2175          * parent.
2176          */
2177         if (spec->no_create ||
2178             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2179             S_ISDIR(attr->la_mode)) {
2180                 const struct lu_buf *buf;
2181
2182                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2183                                         spec->u.sp_ea.eadatalen);
2184                 rc = mdo_xattr_set(env, son, buf,
2185                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2186                                                             XATTR_NAME_LOV, 0,
2187                                    handle, BYPASS_CAPA);
2188                 if (rc != 0)
2189                         GOTO(err_destroy, rc);
2190         }
2191
2192 #ifdef CONFIG_FS_POSIX_ACL
2193         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2194             S_ISDIR(attr->la_mode)) {
2195                 /* set default acl */
2196                 rc = mdo_xattr_set(env, son, def_acl_buf,
2197                                    XATTR_NAME_ACL_DEFAULT, 0,
2198                                    handle, BYPASS_CAPA);
2199                 if (rc)
2200                         GOTO(err_destroy, rc);
2201         }
2202         /* set its own acl */
2203         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2204                 rc = mdo_xattr_set(env, son, acl_buf,
2205                                    XATTR_NAME_ACL_ACCESS,
2206                                    0, handle, BYPASS_CAPA);
2207                 if (rc)
2208                         GOTO(err_destroy, rc);
2209         }
2210 #endif
2211
2212         if (S_ISLNK(attr->la_mode)) {
2213                 struct lu_ucred  *uc = lu_ucred_assert(env);
2214                 struct dt_object *dt = mdd_object_child(son);
2215                 const char *target_name = spec->u.sp_symname;
2216                 int sym_len = strlen(target_name);
2217                 const struct lu_buf *buf;
2218                 loff_t pos = 0;
2219
2220                 buf = mdd_buf_get_const(env, target_name, sym_len);
2221                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2222                                                 mdd_object_capa(env, son),
2223                                                 uc->uc_cap &
2224                                                 CFS_CAP_SYS_RESOURCE_MASK);
2225
2226                 if (rc == sym_len)
2227                         rc = 0;
2228                 else
2229                         GOTO(err_initlized, rc = -EFAULT);
2230         }
2231
2232 err_initlized:
2233         if (unlikely(rc != 0)) {
2234                 int rc2;
2235                 if (S_ISDIR(attr->la_mode)) {
2236                         /* Drop the reference, no need to delete "."/"..",
2237                          * because the object to be destroied directly. */
2238                         rc2 = mdo_ref_del(env, son, handle);
2239                         if (rc2 != 0)
2240                                 GOTO(unlock, rc);
2241                 }
2242                 rc2 = mdo_ref_del(env, son, handle);
2243                 if (rc2 != 0)
2244                         GOTO(unlock, rc);
2245 err_destroy:
2246                 mdo_destroy(env, son, handle);
2247         }
2248 unlock:
2249         mdd_write_unlock(env, son);
2250         RETURN(rc);
2251 }
2252
2253 /*
2254  * Create object and insert it into namespace.
2255  */
2256 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2257                       const struct lu_name *lname, struct md_object *child,
2258                       struct md_op_spec *spec, struct md_attr* ma)
2259 {
2260         struct mdd_thread_info  *info = mdd_env_info(env);
2261         struct lu_attr          *la = &info->mti_la_for_fix;
2262         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2263         struct mdd_object       *son = md2mdd_obj(child);
2264         struct mdd_device       *mdd = mdo2mdd(pobj);
2265         struct lu_attr          *attr = &ma->ma_attr;
2266         struct thandle          *handle;
2267         struct lu_attr          *pattr = &info->mti_pattr;
2268         struct lu_buf           acl_buf;
2269         struct lu_buf           def_acl_buf;
2270         struct linkea_data      *ldata = &info->mti_link_data;
2271         const char              *name = lname->ln_name;
2272         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2273         int                      rc;
2274         int                      rc2;
2275         ENTRY;
2276
2277         /*
2278          * Two operations have to be performed:
2279          *
2280          *  - an allocation of a new object (->do_create()), and
2281          *
2282          *  - an insertion into a parent index (->dio_insert()).
2283          *
2284          * Due to locking, operation order is not important, when both are
2285          * successful, *but* error handling cases are quite different:
2286          *
2287          *  - if insertion is done first, and following object creation fails,
2288          *  insertion has to be rolled back, but this operation might fail
2289          *  also leaving us with dangling index entry.
2290          *
2291          *  - if creation is done first, is has to be undone if insertion
2292          *  fails, leaving us with leaked space, which is neither good, nor
2293          *  fatal.
2294          *
2295          * It seems that creation-first is simplest solution, but it is
2296          * sub-optimal in the frequent
2297          *
2298          *         $ mkdir foo
2299          *         $ mkdir foo
2300          *
2301          * case, because second mkdir is bound to create object, only to
2302          * destroy it immediately.
2303          *
2304          * To avoid this follow local file systems that do double lookup:
2305          *
2306          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2307          *
2308          *     1. create            (mdd_object_create_internal())
2309          *
2310          *     2. insert            (__mdd_index_insert(), lookup again)
2311          */
2312
2313         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
2314         if (rc != 0)
2315                 RETURN(rc);
2316
2317         /* Sanity checks before big job. */
2318         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2319         if (rc)
2320                 RETURN(rc);
2321
2322         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2323                 GOTO(out_free, rc = -EINPROGRESS);
2324
2325         handle = mdd_trans_create(env, mdd);
2326         if (IS_ERR(handle))
2327                 GOTO(out_free, rc = PTR_ERR(handle));
2328
2329         acl_buf.lb_buf = info->mti_xattr_buf;
2330         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2331         def_acl_buf.lb_buf = info->mti_key;
2332         def_acl_buf.lb_len = sizeof(info->mti_key);
2333         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2334         if (rc < 0)
2335                 GOTO(out_stop, rc);
2336
2337         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2338
2339         memset(ldata, 0, sizeof(*ldata));
2340         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2341                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2342
2343                 tfid.f_oid--;
2344                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2345                                         &tfid, lname, 1, 0, ldata);
2346         } else {
2347                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2348                                         mdd_object_fid(mdd_pobj),
2349                                         lname, 1, 0, ldata);
2350         }
2351
2352         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2353                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2354                                 hint);
2355         if (rc)
2356                 GOTO(out_stop, rc);
2357
2358         rc = mdd_trans_start(env, mdd, handle);
2359         if (rc)
2360                 GOTO(out_stop, rc);
2361
2362         rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
2363                                &def_acl_buf, hint, handle);
2364         if (rc != 0)
2365                 GOTO(out_stop, rc);
2366
2367         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2368                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2369                 rc = __mdd_orphan_add(env, son, handle);
2370                 GOTO(out_volatile, rc);
2371         } else {
2372                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2373                                         attr->la_mode, name, handle,
2374                                         mdd_object_capa(env, mdd_pobj));
2375                 if (rc != 0)
2376                         GOTO(err_created, rc);
2377
2378                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2379                               ldata, 1);
2380
2381                 /* update parent directory mtime/ctime */
2382                 *la = *attr;
2383                 la->la_valid = LA_CTIME | LA_MTIME;
2384                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2385                 if (rc)
2386                         GOTO(err_insert, rc);
2387         }
2388
2389         EXIT;
2390 err_insert:
2391         if (rc != 0) {
2392                 int rc2;
2393
2394                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2395                         rc2 = __mdd_orphan_del(env, son, handle);
2396                 else
2397                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2398                                                  S_ISDIR(attr->la_mode),
2399                                                  handle, BYPASS_CAPA);
2400                 if (rc2 != 0)
2401                         goto out_stop;
2402
2403 err_created:
2404                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2405                 if (S_ISDIR(attr->la_mode)) {
2406                         /* Drop the reference, no need to delete "."/"..",
2407                          * because the object is to be destroyed directly. */
2408                         rc2 = mdo_ref_del(env, son, handle);
2409                         if (rc2 != 0) {
2410                                 mdd_write_unlock(env, son);
2411                                 goto out_stop;
2412                         }
2413                 }
2414 out_volatile:
2415                 /* For volatile files drop one link immediately, since there is
2416                  * no filename in the namespace, and save any error returned. */
2417                 rc2 = mdo_ref_del(env, son, handle);
2418                 if (rc2 != 0) {
2419                         mdd_write_unlock(env, son);
2420                         if (unlikely(rc == 0))
2421                                 rc = rc2;
2422                         goto out_stop;
2423                 }
2424
2425                 /* Don't destroy the volatile object on success */
2426                 if (likely(rc != 0))
2427                         mdo_destroy(env, son, handle);
2428                 mdd_write_unlock(env, son);
2429         }
2430
2431         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2432             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2433                 rc = mdd_changelog_ns_store(env, mdd,
2434                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2435                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2436                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2437                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2438                                 NULL, handle);
2439 out_stop:
2440         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2441         if (rc == 0)
2442                 rc = rc2;
2443 out_free:
2444         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2445                 /* if we vmalloced a large buffer drop it */
2446                 lu_buf_free(ldata->ld_buf);
2447
2448         /* The child object shouldn't be cached anymore */
2449         if (rc)
2450                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2451                         &child->mo_lu.lo_header->loh_flags);
2452         return rc;
2453 }
2454
2455 /*
2456  * Get locks on parents in proper order
2457  * RETURN: < 0 - error, rename_order if successful
2458  */
2459 enum rename_order {
2460         MDD_RN_SAME,
2461         MDD_RN_SRCTGT,
2462         MDD_RN_TGTSRC
2463 };
2464
2465 static int mdd_rename_order(const struct lu_env *env,
2466                             struct mdd_device *mdd,
2467                             struct mdd_object *src_pobj,
2468                             const struct lu_attr *pattr,
2469                             struct mdd_object *tgt_pobj)
2470 {
2471         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2472         int rc;
2473         ENTRY;
2474
2475         if (src_pobj == tgt_pobj)
2476                 RETURN(MDD_RN_SAME);
2477
2478         /* compared the parent child relationship of src_p&tgt_p */
2479         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2480                 rc = MDD_RN_SRCTGT;
2481         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2482                 rc = MDD_RN_TGTSRC;
2483         } else {
2484                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2485                                    NULL);
2486                 if (rc == -EREMOTE)
2487                         rc = 0;
2488
2489                 if (rc == 1)
2490                         rc = MDD_RN_TGTSRC;
2491                 else if (rc == 0)
2492                         rc = MDD_RN_SRCTGT;
2493         }
2494
2495         RETURN(rc);
2496 }
2497
2498 /* has not mdd_write{read}_lock on any obj yet. */
2499 static int mdd_rename_sanity_check(const struct lu_env *env,
2500                                    struct mdd_object *src_pobj,
2501                                    const struct lu_attr *pattr,
2502                                    struct mdd_object *tgt_pobj,
2503                                    const struct lu_attr *tpattr,
2504                                    struct mdd_object *sobj,
2505                                    const struct lu_attr *cattr,
2506                                    struct mdd_object *tobj,
2507                                    const struct lu_attr *tattr)
2508 {
2509         int rc = 0;
2510         ENTRY;
2511
2512         /* XXX: when get here, sobj must NOT be NULL,
2513          * the other case has been processed in cld_rename
2514          * before mdd_rename and enable MDS_PERM_BYPASS. */
2515         LASSERT(sobj);
2516
2517         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2518         if (rc)
2519                 RETURN(rc);
2520
2521         /* XXX: when get here, "tobj == NULL" means tobj must
2522          * NOT exist (neither on remote MDS, such case has been
2523          * processed in cld_rename before mdd_rename and enable
2524          * MDS_PERM_BYPASS).
2525          * So check may_create, but not check may_unlink. */
2526         if (tobj == NULL)
2527                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2528                                     (src_pobj != tgt_pobj));
2529         else
2530                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2531                                     (src_pobj != tgt_pobj), 1);
2532
2533         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2534                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2535
2536         RETURN(rc);
2537 }
2538
2539 static int mdd_declare_rename(const struct lu_env *env,
2540                               struct mdd_device *mdd,
2541                               struct mdd_object *mdd_spobj,
2542                               struct mdd_object *mdd_tpobj,
2543                               struct mdd_object *mdd_sobj,
2544                               struct mdd_object *mdd_tobj,
2545                               const struct lu_name *tname,
2546                               const struct lu_name *sname,
2547                               struct md_attr *ma,
2548                               struct linkea_data *ldata,
2549                               struct thandle *handle)
2550 {
2551         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2552         int rc;
2553
2554         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2555         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2556
2557         LASSERT(mdd_spobj);
2558         LASSERT(mdd_tpobj);
2559         LASSERT(mdd_sobj);
2560
2561         /* name from source dir */
2562         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2563         if (rc)
2564                 return rc;
2565
2566         /* .. from source child */
2567         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2568                 /* source child can be directory,
2569                  * counted by source dir's nlink */
2570                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2571                 if (rc)
2572                         return rc;
2573                 if (mdd_spobj != mdd_tpobj) {
2574                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2575                                                       handle);
2576                         if (rc != 0)
2577                                 return rc;
2578
2579                         rc = mdo_declare_index_insert(env, mdd_sobj,
2580                                                       mdo2fid(mdd_tpobj),
2581                                                       S_IFDIR, dotdot, handle);
2582                         if (rc != 0)
2583                                 return rc;
2584                 }
2585
2586                 /* new target child can be directory,
2587                  * counted by target dir's nlink */
2588                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2589                 if (rc != 0)
2590                         return rc;
2591         }
2592
2593         la->la_valid = LA_CTIME | LA_MTIME;
2594         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2595         if (rc != 0)
2596                 return rc;
2597
2598         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2599         if (rc != 0)
2600                 return rc;
2601
2602         la->la_valid = LA_CTIME;
2603         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2604         if (rc)
2605                 return rc;
2606
2607         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
2608                 S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
2609         if (rc)
2610                 return rc;
2611
2612         /* new name */
2613         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2614                                       mdd_object_type(mdd_sobj),
2615                                       tname->ln_name, handle);
2616         if (rc != 0)
2617                 return rc;
2618
2619         /* name from target dir (old name), we declare it unconditionally
2620          * as mdd_rename() calls delete unconditionally as well. so just
2621          * to balance declarations vs calls to change ... */
2622         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2623         if (rc)
2624                 return rc;
2625
2626         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2627                 /* delete target child in target parent directory */
2628                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2629                 if (rc)
2630                         return rc;
2631
2632                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2633                         /* target child can be directory,
2634                          * delete "." reference in target child directory */
2635                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2636                         if (rc)
2637                                 return rc;
2638
2639                         /* delete ".." reference in target parent directory */
2640                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2641                         if (rc)
2642                                 return rc;
2643                 }
2644
2645                 la->la_valid = LA_CTIME;
2646                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2647                 if (rc)
2648                         return rc;
2649
2650                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2651                 if (rc)
2652                         return rc;
2653         }
2654
2655         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2656         if (rc)
2657                 return rc;
2658
2659         return rc;
2660 }
2661
2662 /* src object can be remote that is why we use only fid and type of object */
2663 static int mdd_rename(const struct lu_env *env,
2664                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2665                       const struct lu_fid *lf, const struct lu_name *lsname,
2666                       struct md_object *tobj, const struct lu_name *ltname,
2667                       struct md_attr *ma)
2668 {
2669         const char *sname = lsname->ln_name;
2670         const char *tname = ltname->ln_name;
2671         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2672         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2673         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2674         struct mdd_device *mdd = mdo2mdd(src_pobj);
2675         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2676         struct mdd_object *mdd_tobj = NULL;
2677         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2678         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2679         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2680         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2681         struct thandle *handle;
2682         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2683         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2684         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2685         bool is_dir;
2686         bool tobj_ref = 0;
2687         bool tobj_locked = 0;
2688         unsigned cl_flags = 0;
2689         int rc, rc2;
2690         ENTRY;
2691
2692         if (tobj)
2693                 mdd_tobj = md2mdd_obj(tobj);
2694
2695         mdd_sobj = mdd_object_find(env, mdd, lf);
2696
2697         rc = mdd_la_get(env, mdd_sobj, cattr, BYPASS_CAPA);
2698         if (rc)
2699                 GOTO(out_pending, rc);
2700
2701         rc = mdd_la_get(env, mdd_spobj, pattr, BYPASS_CAPA);
2702         if (rc)
2703                 GOTO(out_pending, rc);
2704
2705         if (mdd_tobj) {
2706                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2707                 if (rc)
2708                         GOTO(out_pending, rc);
2709         }
2710
2711         rc = mdd_la_get(env, mdd_tpobj, tpattr, BYPASS_CAPA);
2712         if (rc)
2713                 GOTO(out_pending, rc);
2714
2715         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2716                                      mdd_sobj, cattr, mdd_tobj, tattr);
2717         if (rc)
2718                 GOTO(out_pending, rc);
2719
2720         rc = mdd_name_check(mdd, ltname);
2721         if (rc < 0)
2722                 GOTO(out_pending, rc);
2723
2724         handle = mdd_trans_create(env, mdd);
2725         if (IS_ERR(handle))
2726                 GOTO(out_pending, rc = PTR_ERR(handle));
2727
2728         memset(ldata, 0, sizeof(*ldata));
2729         mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdd_object_fid(mdd_tpobj),
2730                            ltname, 1, 0, ldata);
2731         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2732                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2733         if (rc)
2734                 GOTO(stop, rc);
2735
2736         rc = mdd_trans_start(env, mdd, handle);
2737         if (rc)
2738                 GOTO(stop, rc);
2739
2740         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2741         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2742         if (rc < 0)
2743                 GOTO(cleanup_unlocked, rc);
2744
2745         is_dir = S_ISDIR(cattr->la_mode);
2746
2747         /* Remove source name from source directory */
2748         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2749                                 mdd_object_capa(env, mdd_spobj));
2750         if (rc)
2751                 GOTO(cleanup, rc);
2752
2753         /* "mv dir1 dir2" needs "dir1/.." link update */
2754         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2755                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2756                                         mdd_object_capa(env, mdd_sobj));
2757                 if (rc != 0)
2758                         GOTO(fixup_spobj2, rc);
2759
2760                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2761                                              dotdot, handle,
2762                                              mdd_object_capa(env, mdd_sobj));
2763                 if (rc != 0)
2764                         GOTO(fixup_spobj, rc);
2765         }
2766
2767         /* Remove target name from target directory
2768          * Here tobj can be remote one, so we do index_delete unconditionally
2769          * and -ENOENT is allowed.
2770          */
2771         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2772                                 mdd_object_capa(env, mdd_tpobj));
2773         if (rc != 0) {
2774                 if (mdd_tobj) {
2775                         /* tname might been renamed to something else */
2776                         GOTO(fixup_spobj, rc);
2777                 }
2778                 if (rc != -ENOENT)
2779                         GOTO(fixup_spobj, rc);
2780         }
2781
2782         /* Insert new fid with target name into target dir */
2783         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2784                                 tname, handle, mdd_object_capa(env, mdd_tpobj));
2785         if (rc != 0)
2786                 GOTO(fixup_tpobj, rc);
2787
2788         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2789         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2790
2791         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2792         if (mdd_sobj) {
2793                 la->la_valid = LA_CTIME;
2794                 rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2795                 if (rc)
2796                         GOTO(fixup_tpobj, rc);
2797         }
2798
2799         /* Remove old target object
2800          * For tobj is remote case cmm layer has processed
2801          * and set tobj to NULL then. So when tobj is NOT NULL,
2802          * it must be local one.
2803          */
2804         if (tobj && mdd_object_exists(mdd_tobj)) {
2805                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2806                 tobj_locked = 1;
2807                 if (mdd_is_dead_obj(mdd_tobj)) {
2808                         /* shld not be dead, something is wrong */
2809                         CERROR("tobj is dead, something is wrong\n");
2810                         rc = -EINVAL;
2811                         goto cleanup;
2812                 }
2813                 mdo_ref_del(env, mdd_tobj, handle);
2814
2815                 /* Remove dot reference. */
2816                 if (S_ISDIR(tattr->la_mode))
2817                         mdo_ref_del(env, mdd_tobj, handle);
2818                 tobj_ref = 1;
2819
2820                 /* fetch updated nlink */
2821                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2822                 if (rc != 0) {
2823                         CERROR("%s: Failed to get nlink for tobj "
2824                                 DFID": rc = %d\n",
2825                                 mdd2obd_dev(mdd)->obd_name,
2826                                 PFID(tpobj_fid), rc);
2827                         GOTO(fixup_tpobj, rc);
2828                 }
2829
2830                 la->la_valid = LA_CTIME;
2831                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2832                 if (rc != 0) {
2833                         CERROR("%s: Failed to set ctime for tobj "
2834                                 DFID": rc = %d\n",
2835                                 mdd2obd_dev(mdd)->obd_name,
2836                                 PFID(tpobj_fid), rc);
2837                         GOTO(fixup_tpobj, rc);
2838                 }
2839
2840                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2841                 ma->ma_attr = *tattr;
2842                 ma->ma_valid |= MA_INODE;
2843                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2844                                        handle);
2845                 if (rc != 0) {
2846                         CERROR("%s: Failed to unlink tobj "
2847                                 DFID": rc = %d\n",
2848                                 mdd2obd_dev(mdd)->obd_name,
2849                                 PFID(tpobj_fid), rc);
2850                         GOTO(fixup_tpobj, rc);
2851                 }
2852
2853                 /* fetch updated nlink */
2854                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2855                 if (rc != 0) {
2856                         CERROR("%s: Failed to get nlink for tobj "
2857                                 DFID": rc = %d\n",
2858                                 mdd2obd_dev(mdd)->obd_name,
2859                                 PFID(tpobj_fid), rc);
2860                         GOTO(fixup_tpobj, rc);
2861                 }
2862                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2863                 ma->ma_attr = *tattr;
2864                 ma->ma_valid |= MA_INODE;
2865
2866                 if (tattr->la_nlink == 0) {
2867                         cl_flags |= CLF_RENAME_LAST;
2868                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2869                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2870                 }
2871         }
2872
2873         la->la_valid = LA_CTIME | LA_MTIME;
2874         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2875         if (rc)
2876                 GOTO(fixup_tpobj, rc);
2877
2878         if (mdd_spobj != mdd_tpobj) {
2879                 la->la_valid = LA_CTIME | LA_MTIME;
2880                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2881         }
2882
2883         if (rc == 0 && mdd_sobj) {
2884                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2885                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2886                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2887                                       0, 0);
2888                 if (rc == -ENOENT)
2889                         /* Old files might not have EA entry */
2890                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2891                                       lsname, handle, NULL, 0);
2892                 mdd_write_unlock(env, mdd_sobj);
2893                 /* We don't fail the transaction if the link ea can't be
2894                    updated -- fid2path will use alternate lookup method. */
2895                 rc = 0;
2896         }
2897
2898         EXIT;
2899
2900 fixup_tpobj:
2901         if (rc) {
2902                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2903                                          BYPASS_CAPA);
2904                 if (rc2)
2905                         CWARN("tp obj fix error %d\n",rc2);
2906
2907                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2908                     !mdd_is_dead_obj(mdd_tobj)) {
2909                         if (tobj_ref) {
2910                                 mdo_ref_add(env, mdd_tobj, handle);
2911                                 if (is_dir)
2912                                         mdo_ref_add(env, mdd_tobj, handle);
2913                         }
2914
2915                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2916                                                   mdo2fid(mdd_tobj),
2917                                                   mdd_object_type(mdd_tobj),
2918                                                   tname, handle, BYPASS_CAPA);
2919                         if (rc2 != 0)
2920                                 CWARN("tp obj fix error: rc = %d\n", rc2);
2921                 }
2922         }
2923
2924 fixup_spobj:
2925         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2926                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2927                                               BYPASS_CAPA);
2928
2929                 if (rc2)
2930                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2931                                mdd2obd_dev(mdd)->obd_name, rc2);
2932
2933
2934                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
2935                                               dotdot, handle, BYPASS_CAPA);
2936                 if (rc2 != 0)
2937                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2938                               mdd2obd_dev(mdd)->obd_name, rc2);
2939         }
2940
2941 fixup_spobj2:
2942         if (rc != 0) {
2943                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
2944                                          mdd_object_type(mdd_sobj), sname,
2945                                          handle, BYPASS_CAPA);
2946                 if (rc2 != 0)
2947                         CWARN("sp obj fix error: rc = %d\n", rc2);
2948         }
2949
2950 cleanup:
2951         if (tobj_locked)
2952                 mdd_write_unlock(env, mdd_tobj);
2953
2954 cleanup_unlocked:
2955         if (rc == 0)
2956                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
2957                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
2958                                             ltname, lsname, handle);
2959
2960 stop:
2961         mdd_trans_stop(env, mdd, rc, handle);
2962
2963 out_pending:
2964         mdd_object_put(env, mdd_sobj);
2965         return rc;
2966 }
2967
2968 /**
2969  * During migration once the parent FID has been changed,
2970  * we need update the parent FID in linkea.
2971  **/
2972 static int mdd_linkea_update_child_internal(const struct lu_env *env,
2973                                             struct mdd_object *parent,
2974                                             struct mdd_object *child,
2975                                             const char *name, int namelen,
2976                                             struct thandle *handle,
2977                                             bool declare)
2978 {
2979         struct mdd_thread_info  *info = mdd_env_info(env);
2980         struct linkea_data      ldata = { NULL };
2981         struct lu_buf           *buf = &info->mti_link_buf;
2982         int                     count;
2983         int                     rc = 0;
2984
2985         ENTRY;
2986
2987         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2988         if (buf->lb_buf == NULL)
2989                 RETURN(-ENOMEM);
2990
2991         ldata.ld_buf = buf;
2992         rc = mdd_links_read(env, child, &ldata);
2993         if (rc != 0) {
2994                 if (rc == -ENOENT || rc == -ENODATA)
2995                         rc = 0;
2996                 RETURN(rc);
2997         }
2998
2999         LASSERT(ldata.ld_leh != NULL);
3000         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3001         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3002                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3003                 struct lu_name lname;
3004                 struct lu_fid  fid;
3005
3006                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3007                                     &lname, &fid);
3008
3009                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3010                     lu_fid_eq(&fid, mdd_object_fid(parent))) {
3011                         ldata.ld_lee = (struct link_ea_entry *)
3012                                        ((char *)ldata.ld_lee +
3013                                         ldata.ld_reclen);
3014                         continue;
3015                 }
3016
3017                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3018                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3019                        lname.ln_namelen, lname.ln_name,
3020                        PFID(mdd_object_fid(parent)));
3021                 /* update to the new parent fid */
3022                 linkea_entry_pack(ldata.ld_lee, &lname,
3023                                   mdd_object_fid(parent));
3024                 if (declare)
3025                         rc = mdd_declare_links_add(env, child, handle, &ldata,
3026                                                    MLAO_IGNORE);
3027                 else
3028                         rc = mdd_links_write(env, child, &ldata, handle);
3029                 break;
3030         }
3031         RETURN(rc);
3032 }
3033
3034 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3035                                            struct mdd_object *parent,
3036                                            struct mdd_object *child,
3037                                            const char *name, int namelen,
3038                                            struct thandle *handle)
3039 {
3040         return mdd_linkea_update_child_internal(env, parent, child, name,
3041                                                 namelen, handle, true);
3042 }
3043
3044 static int mdd_linkea_update_child(const struct lu_env *env,
3045                                    struct mdd_object *parent,
3046                                    struct mdd_object *child,
3047                                    const char *name, int namelen,
3048                                    struct thandle *handle)
3049 {
3050         return mdd_linkea_update_child_internal(env, parent, child, name,
3051                                                 namelen, handle, false);
3052 }
3053
3054 static int mdd_update_linkea_internal(const struct lu_env *env,
3055                                       struct mdd_object *mdd_pobj,
3056                                       struct mdd_object *mdd_sobj,
3057                                       struct mdd_object *mdd_tobj,
3058                                       const struct lu_name *child_name,
3059                                       struct thandle *handle,
3060                                       int declare)
3061 {
3062         struct mdd_thread_info  *info = mdd_env_info(env);
3063         struct linkea_data      *ldata = &info->mti_link_data;
3064         int                     count;
3065         int                     rc = 0;
3066         ENTRY;
3067
3068         rc = mdd_links_read(env, mdd_sobj, ldata);
3069         if (rc != 0) {
3070                 if (rc == -ENOENT || rc == -ENODATA)
3071                         rc = 0;
3072                 RETURN(rc);
3073         }
3074
3075         if (declare)
3076                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
3077                                            MLAO_IGNORE);
3078         else
3079                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3080
3081         if (rc != 0)
3082                 RETURN(rc);
3083
3084         /* If it is mulitple links file, we need update the name entry for
3085          * all parent */
3086         LASSERT(ldata->ld_leh != NULL);
3087         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3088         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3089                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3090                 struct mdd_object       *pobj;
3091                 struct lu_name          lname;
3092                 struct lu_fid           fid;
3093
3094                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3095                                     &lname, &fid);
3096                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3097                                                          ldata->ld_reclen);
3098                 pobj = mdd_object_find(env, mdd, &fid);
3099                 if (IS_ERR(pobj)) {
3100                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3101                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3102                               PTR_ERR(pobj));
3103                         continue;
3104                 }
3105
3106                 if (!mdd_object_exists(pobj)) {
3107                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3108                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3109                         GOTO(next_put, rc);
3110                 }
3111
3112                 if (pobj == mdd_pobj &&
3113                     lname.ln_namelen == child_name->ln_namelen &&
3114                     strncmp(lname.ln_name, child_name->ln_name,
3115                             lname.ln_namelen) == 0) {
3116                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3117                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3118                               PFID(&fid));
3119                         GOTO(next_put, rc);
3120                 }
3121
3122                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3123                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3124                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3125
3126                 if (declare) {
3127                         /* Remove source name from source directory */
3128                         /* Insert new fid with target name into target dir */
3129                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3130                                                       handle);
3131                         if (rc != 0)
3132                                 GOTO(next_put, rc);
3133
3134                         rc = mdo_declare_index_insert(env, pobj,
3135                                         mdd_object_fid(mdd_tobj),
3136                                         mdd_object_type(mdd_tobj),
3137                                         lname.ln_name, handle);
3138                         if (rc != 0)
3139                                 GOTO(next_put, rc);
3140
3141                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3142                         if (rc)
3143                                 GOTO(next_put, rc);
3144
3145                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3146                         if (rc)
3147                                 GOTO(next_put, rc);
3148                 } else {
3149                         rc = __mdd_index_delete(env, pobj, lname.ln_name,
3150                                                 0, handle,
3151                                                 mdd_object_capa(env, pobj));
3152                         if (rc)
3153                                 GOTO(next_put, rc);
3154
3155                         rc = __mdd_index_insert(env, pobj,
3156                                         mdd_object_fid(mdd_tobj),
3157                                         mdd_object_type(mdd_tobj),
3158                                         lname.ln_name, handle,
3159                                         mdd_object_capa(env, pobj));
3160                         if (rc != 0)
3161                                 GOTO(next_put, rc);
3162
3163                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3164                         rc = mdo_ref_add(env, mdd_tobj, handle);
3165                         mdd_write_unlock(env, mdd_tobj);
3166                         if (rc)
3167                                 GOTO(next_put, rc);
3168
3169                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3170                         mdo_ref_del(env, mdd_sobj, handle);
3171                         mdd_write_unlock(env, mdd_sobj);
3172                 }
3173 next_put:
3174                 mdd_object_put(env, pobj);
3175                 if (rc != 0)
3176                         break;
3177         }
3178
3179         RETURN(rc);
3180 }
3181
3182 static int mdd_migrate_xattrs(const struct lu_env *env,
3183                               struct mdd_object *mdd_sobj,
3184                               struct mdd_object *mdd_tobj)
3185 {
3186         struct mdd_thread_info  *info = mdd_env_info(env);
3187         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3188         char                    *xname;
3189         struct thandle          *handle;
3190         struct lu_buf           xbuf;
3191         int                     xlen;
3192         int                     rem;
3193         int                     xsize;
3194         int                     list_xsize;
3195         struct lu_buf           list_xbuf;
3196         int                     rc;
3197
3198         /* retrieve xattr list from the old object */
3199         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL,
3200                                     mdd_object_capa(env, mdd_sobj));
3201         if (list_xsize == -ENODATA)
3202                 return 0;
3203
3204         if (list_xsize < 0)
3205                 return list_xsize;
3206
3207         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3208         if (info->mti_big_buf.lb_buf == NULL)
3209                 return -ENOMEM;
3210
3211         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3212         list_xbuf.lb_len = list_xsize;
3213         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf,
3214                             mdd_object_capa(env, mdd_sobj));
3215         if (rc < 0)
3216                 return rc;
3217         rc = 0;
3218         rem = list_xsize;
3219         xname = list_xbuf.lb_buf;
3220         while (rem > 0) {
3221                 xlen = strnlen(xname, rem - 1) + 1;
3222                 if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
3223                     strcmp(XATTR_NAME_LMA, xname) == 0 ||
3224                     strcmp(XATTR_NAME_LMV, xname) == 0)
3225                         goto next;
3226
3227                 /* For directory, if there are default layout, migrate here */
3228                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3229                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3230                         goto next;
3231
3232                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL,
3233                                       xname,
3234                                       mdd_object_capa(env, mdd_sobj));
3235                 if (xsize == -ENODATA)
3236                         goto next;
3237                 if (xsize < 0)
3238                         GOTO(out, rc);
3239
3240                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3241                 if (info->mti_link_buf.lb_buf == NULL)
3242                         GOTO(out, rc = -ENOMEM);
3243
3244                 xbuf.lb_len = xsize;
3245                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3246                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname,
3247                                    mdd_object_capa(env, mdd_sobj));
3248                 if (rc == -ENODATA)
3249                         goto next;
3250                 if (rc < 0)
3251                         GOTO(out, rc);
3252
3253                 handle = mdd_trans_create(env, mdd);
3254                 if (IS_ERR(handle))
3255                         GOTO(out, rc = PTR_ERR(handle));
3256
3257                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3258                                            handle);
3259                 if (rc != 0)
3260                         GOTO(stop_trans, rc);
3261                 /* Note: this transaction is part of migration, and it is not
3262                  * the last step of migration, so we set th_local = 1 to avoid
3263                  * update last rcvd for this transaction */
3264                 handle->th_local = 1;
3265                 rc = mdd_trans_start(env, mdd, handle);
3266                 if (rc != 0)
3267                         GOTO(stop_trans, rc);
3268
3269                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle,
3270                                    mdd_object_capa(env, mdd_sobj));
3271                 if (rc == -EEXIST)
3272                         GOTO(stop_trans, rc = 0);
3273
3274                 if (rc != 0)
3275                         GOTO(stop_trans, rc);
3276 stop_trans:
3277                 mdd_trans_stop(env, mdd, rc, handle);
3278                 if (rc != 0)
3279                         GOTO(out, rc);
3280 next:
3281                 rem -= xlen;
3282                 memmove(xname, xname + xlen, rem);
3283         }
3284 out:
3285         return rc;
3286 }
3287
3288 static int mdd_declare_migrate_create(const struct lu_env *env,
3289                                       struct mdd_object *mdd_pobj,
3290                                       struct mdd_object *mdd_sobj,
3291                                       struct mdd_object *mdd_tobj,
3292                                       struct md_op_spec *spec,
3293                                       struct lu_attr *la,
3294                                       union lmv_mds_md *mgr_ea,
3295                                       struct thandle *handle)
3296 {
3297         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3298         const struct lu_buf     *buf;
3299         int                     rc;
3300         int                     mgr_easize;
3301
3302         rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
3303                                                 handle, spec, NULL);
3304         if (rc != 0)
3305                 return rc;
3306
3307         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3308                                            handle);
3309         if (rc != 0)
3310                 return rc;
3311
3312         if (S_ISLNK(la->la_mode)) {
3313                 const char *target_name = spec->u.sp_symname;
3314                 int sym_len = strlen(target_name);
3315                 const struct lu_buf *buf;
3316
3317                 buf = mdd_buf_get_const(env, target_name, sym_len);
3318                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3319                                              buf, 0, handle);
3320                 if (rc != 0)
3321                         return rc;
3322         }
3323
3324         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3325                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3326                                         spec->u.sp_ea.eadatalen);
3327                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3328                                            0, handle);
3329                 if (rc)
3330                         return rc;
3331         }
3332
3333         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3334         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3335         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3336                                    0, handle);
3337         if (rc)
3338                 return rc;
3339
3340         la_flag->la_valid = LA_FLAGS;
3341         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3342         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3343
3344         return rc;
3345 }
3346
3347 static int mdd_migrate_create(const struct lu_env *env,
3348                               struct mdd_object *mdd_pobj,
3349                               struct mdd_object *mdd_sobj,
3350                               struct mdd_object *mdd_tobj,
3351                               struct lu_attr *la)
3352 {
3353         struct mdd_thread_info  *info = mdd_env_info(env);
3354         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3355         struct md_op_spec       *spec = &info->mti_spec;
3356         struct lu_buf           lmm_buf = { NULL };
3357         struct lu_buf           link_buf = { NULL };
3358         const struct lu_buf     *buf;
3359         struct thandle          *handle;
3360         struct lmv_mds_md_v1    *mgr_ea;
3361         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3362         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3363         int                     mgr_easize;
3364         int                     rc;
3365         ENTRY;
3366
3367         /* prepare spec for create */
3368         memset(spec, 0, sizeof(*spec));
3369         spec->sp_cr_lookup = 0;
3370         spec->sp_feat = &dt_directory_features;
3371         if (S_ISLNK(la->la_mode)) {
3372                 buf = lu_buf_check_and_alloc(
3373                                 &mdd_env_info(env)->mti_big_buf,
3374                                 la->la_size + 1);
3375                 link_buf = *buf;
3376                 link_buf.lb_len = la->la_size + 1;
3377                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3378                 if (rc <= 0) {
3379                         rc = rc != 0 ? rc : -EFAULT;
3380                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3381                                mdd2obd_dev(mdd)->obd_name,
3382                                PFID(mdd_object_fid(mdd_sobj)), rc);
3383                         RETURN(rc);
3384                 }
3385                 spec->u.sp_symname = link_buf.lb_buf;
3386         } else if S_ISREG(la->la_mode) {
3387                 /* retrieve lov of the old object */
3388                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3389                 if (rc != 0 && rc != -ENODATA)
3390                         RETURN(rc);
3391                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3392                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3393                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3394                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3395                 }
3396         }
3397
3398         mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
3399         memset(mgr_ea, 0, sizeof(*mgr_ea));
3400         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3401         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3402         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3403         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3404         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3405         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3406
3407         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3408
3409         handle = mdd_trans_create(env, mdd);
3410         if (IS_ERR(handle))
3411                 GOTO(out_free, rc = PTR_ERR(handle));
3412
3413         /* Note: this transaction is part of migration, and it is not
3414          * the last step of migration, so we set th_local = 1 to avoid
3415          * update last rcvd for this transaction */
3416         handle->th_local = 1;
3417         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
3418                                         spec, la,
3419                                         (union lmv_mds_md *)info->mti_xattr_buf,
3420                                         handle);
3421         if (rc != 0)
3422                 GOTO(stop_trans, rc);
3423
3424         rc = mdd_trans_start(env, mdd, handle);
3425         if (rc != 0)
3426                 GOTO(stop_trans, rc);
3427
3428         /* create the target object */
3429         rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3430                                hint, handle);
3431         if (rc != 0)
3432                 GOTO(stop_trans, rc);
3433
3434         /* Set MIGRATE EA on the source inode, so once the migration needs
3435          * to be re-done during failover, the re-do process can locate the
3436          * target object which is already being created. */
3437         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3438         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3439         rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0,
3440                            handle, mdd_object_capa(env, mdd_sobj));
3441         if (rc != 0)
3442                 GOTO(stop_trans, rc);
3443
3444         /* Set immutable flag, so any modification is disabled until
3445          * the migration is done. Once the migration is interrupted,
3446          * if the resume process find the migrating object has both
3447          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3448          * flag and approve the migration */
3449         la_flag->la_valid = LA_FLAGS;
3450         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3451         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
3452                           mdd_object_capa(env, mdd_sobj));
3453 stop_trans:
3454         if (handle != NULL)
3455                 mdd_trans_stop(env, mdd, rc, handle);
3456 out_free:
3457         if (lmm_buf.lb_buf != NULL)
3458                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3459         RETURN(rc);
3460 }
3461
3462 static int mdd_migrate_entries(const struct lu_env *env,
3463                                struct mdd_object *mdd_sobj,
3464                                struct mdd_object *mdd_tobj)
3465 {
3466         struct dt_object        *next = mdd_object_child(mdd_sobj);
3467         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3468         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3469         struct thandle          *handle;
3470         struct dt_it            *it;
3471         const struct dt_it_ops  *iops;
3472         int                      rc;
3473         int                      result;
3474         struct lu_dirent        *ent;
3475         ENTRY;
3476
3477         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3478         if (ent == NULL)
3479                 RETURN(-ENOMEM);
3480
3481         if (!dt_try_as_dir(env, next))
3482                 GOTO(out_ent, rc = -ENOTDIR);
3483         /*
3484          * iterate directories
3485          */
3486         iops = &next->do_index_ops->dio_it;
3487         it = iops->init(env, next, LUDA_FID | LUDA_TYPE,
3488                         mdd_object_capa(env, mdd_sobj));
3489         if (IS_ERR(it))
3490                 GOTO(out_ent, rc = PTR_ERR(it));
3491
3492         rc = iops->load(env, it, 0);
3493         if (rc == 0)
3494                 rc = iops->next(env, it);
3495         else if (rc > 0)
3496                 rc = 0;
3497         /*
3498          * At this point and across for-loop:
3499          *
3500          *  rc == 0 -> ok, proceed.
3501          *  rc >  0 -> end of directory.
3502          *  rc <  0 -> error.
3503          */
3504         do {
3505                 struct mdd_object       *child;
3506                 char                    *name = mdd_env_info(env)->mti_key;
3507                 int                     len;
3508                 int                     recsize;
3509                 int                     is_dir;
3510                 bool                    target_exist = false;
3511
3512                 len = iops->key_size(env, it);
3513                 if (len == 0)
3514                         goto next;
3515
3516                 result = iops->rec(env, it, (struct dt_rec *)ent,
3517                                    LUDA_FID | LUDA_TYPE);
3518                 if (result == -ESTALE)
3519                         goto next;
3520                 if (result != 0) {
3521                         rc = result;
3522                         goto out;
3523                 }
3524
3525                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3526                 recsize = le16_to_cpu(ent->lde_reclen);
3527
3528                 /* Insert new fid with target name into target dir */
3529                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3530                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3531                      ent->lde_name[1] == '.'))
3532                         goto next;
3533
3534                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3535                 if (IS_ERR(child))
3536                         GOTO(out, rc = PTR_ERR(child));
3537
3538                 is_dir = S_ISDIR(mdd_object_type(child));
3539
3540                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3541
3542                 /* Check whether the name has been inserted to the target */
3543                 if (dt_try_as_dir(env, dt_tobj)) {
3544                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3545
3546                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3547                                        (struct dt_key *)name,
3548                                        mdd_object_capa(env, mdd_tobj));
3549                         if (unlikely(rc == 0))
3550                                 target_exist = true;
3551                 }
3552
3553                 handle = mdd_trans_create(env, mdd);
3554                 if (IS_ERR(handle))
3555                         GOTO(out, rc = PTR_ERR(handle));
3556
3557                 /* Note: this transaction is part of migration, and it is not
3558                  * the last step of migration, so we set th_local = 1 to avoid
3559                  * updating last rcvd for this transaction */
3560                 handle->th_local = 1;
3561                 if (likely(!target_exist)) {
3562                         rc = mdo_declare_index_insert(env, mdd_tobj,
3563                                                       &ent->lde_fid,
3564                                                       mdd_object_type(child),
3565                                                       name, handle);
3566                         if (rc != 0)
3567                                 GOTO(out_put, rc);
3568
3569                         if (is_dir) {
3570                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3571                                 if (rc != 0)
3572                                         GOTO(out_put, rc);
3573                         }
3574                 }
3575
3576                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3577                 if (rc != 0)
3578                         GOTO(out_put, rc);
3579
3580                 if (is_dir) {
3581                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3582                         if (rc != 0)
3583                                 GOTO(out_put, rc);
3584
3585                         /* Update .. for child */
3586                         rc = mdo_declare_index_delete(env, child, dotdot,
3587                                                       handle);
3588                         if (rc != 0)
3589                                 GOTO(out_put, rc);
3590
3591                         rc = mdo_declare_index_insert(env, child,
3592                                                       mdd_object_fid(mdd_tobj),
3593                                                       S_IFDIR, dotdot, handle);
3594                         if (rc != 0)
3595                                 GOTO(out_put, rc);
3596                 }
3597
3598                 rc = mdd_linkea_declare_update_child(env, mdd_tobj,
3599                                                      child, name,
3600                                                      strlen(name),
3601                                                      handle);
3602                 if (rc != 0)
3603                         GOTO(out_put, rc);
3604
3605                 rc = mdd_trans_start(env, mdd, handle);
3606                 if (rc != 0) {
3607                         CERROR("%s: transaction start failed: rc = %d\n",
3608                                mdd2obd_dev(mdd)->obd_name, rc);
3609                         GOTO(out_put, rc);
3610                 }
3611
3612                 if (likely(!target_exist)) {
3613                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3614                                                 mdd_object_type(child),
3615                                                 name, handle,
3616                                                 mdd_object_capa(env, mdd_tobj));
3617                         if (rc != 0)
3618                                 GOTO(out_put, rc);
3619
3620                         if (is_dir) {
3621                                 rc = mdo_ref_add(env, mdd_tobj, handle);
3622                                 if (rc != 0)
3623                                         GOTO(out_put, rc);
3624
3625                         }
3626                 }
3627
3628                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle,
3629                                         mdd_object_capa(env, mdd_sobj));
3630                 if (rc != 0)
3631                         GOTO(out_put, rc);
3632
3633                 if (is_dir) {
3634                         rc = __mdd_index_delete_only(env, child, dotdot, handle,
3635                                                    mdd_object_capa(env, child));
3636                         if (rc != 0)
3637                                 GOTO(out_put, rc);
3638
3639                         rc = __mdd_index_insert_only(env, child,
3640                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3641                                          dotdot, handle,
3642                                          mdd_object_capa(env, child));
3643                         if (rc != 0)
3644                                 GOTO(out_put, rc);
3645                 }
3646
3647                 rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
3648                                              strlen(name), handle);
3649
3650 out_put:
3651                 mdd_object_put(env, child);
3652                 mdd_trans_stop(env, mdd, rc, handle);
3653                 if (rc != 0)
3654                         GOTO(out, rc);
3655 next:
3656                 result = iops->next(env, it);
3657                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3658                         GOTO(out, rc = -EINTR);
3659
3660                 if (result == -ESTALE)
3661                         goto next;
3662         } while (result == 0);
3663 out:
3664         iops->put(env, it);
3665         iops->fini(env, it);
3666 out_ent:
3667         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3668         RETURN(rc);
3669 }
3670
3671 static int mdd_declare_update_linkea(const struct lu_env *env,
3672                                      struct mdd_object *mdd_pobj,
3673                                      struct mdd_object *mdd_sobj,
3674                                      struct mdd_object *mdd_tobj,
3675                                      struct thandle *handle,
3676                                      const struct lu_name *child_name)
3677 {
3678         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3679                                           child_name, handle, 1);
3680 }
3681
3682 static int mdd_update_linkea(const struct lu_env *env,
3683                              struct mdd_object *mdd_pobj,
3684                              struct mdd_object *mdd_sobj,
3685                              struct mdd_object *mdd_tobj,
3686                              struct thandle *handle,
3687                              const struct lu_name *child_name)
3688 {
3689         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3690                                           child_name, handle, 0);
3691 }
3692
3693 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3694                                            struct mdd_object *mdd_pobj,
3695                                            struct mdd_object *mdd_sobj,
3696                                            struct mdd_object *mdd_tobj,
3697                                            const struct lu_name *lname,
3698                                            struct lu_attr *la,
3699                                            struct lu_attr *parent_la,
3700                                            struct thandle *handle)
3701 {
3702         struct lu_attr  *la_flag = MDD_ENV_VAR(env, tattr);
3703         int             rc;
3704
3705         /* Revert IMMUTABLE flag */
3706         la_flag->la_valid = LA_FLAGS;
3707         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3708         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3709         if (rc != 0)
3710                 return rc;
3711
3712         /* delete entry from source dir */
3713         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3714         if (rc != 0)
3715                 return rc;
3716
3717         rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3718                                        mdd_tobj, handle, lname);
3719         if (rc != 0)
3720                 return rc;
3721
3722         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3723                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3724                                            handle);
3725                 if (rc != 0)
3726                         return rc;
3727         }
3728
3729         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3730                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3731                 if (rc != 0)
3732                         return rc;
3733         }
3734
3735         /* new name */
3736         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3737                                       mdd_object_type(mdd_tobj),
3738                                       lname->ln_name, handle);
3739         if (rc != 0)
3740                 return rc;
3741
3742         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3743                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3744                 if (rc != 0)
3745                         return rc;
3746         }
3747
3748         /* delete old object */
3749         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3750         if (rc != 0)
3751                 return rc;
3752
3753         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3754                 /* delete old object */
3755                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3756                 if (rc != 0)
3757                         return rc;
3758                 /* set nlink to 0 */
3759                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3760                 if (rc != 0)
3761                         return rc;
3762         }
3763
3764         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3765         if (rc)
3766                 return rc;
3767
3768         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3769
3770         return rc;
3771 }
3772
3773 static int mdd_migrate_update_name(const struct lu_env *env,
3774                                    struct mdd_object *mdd_pobj,
3775                                    struct mdd_object *mdd_sobj,
3776                                    struct mdd_object *mdd_tobj,
3777                                    const struct lu_name *lname,
3778                                    struct md_attr *ma)
3779 {
3780         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3781         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3782         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3783         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3784         struct thandle          *handle;
3785         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3786         const char              *name = lname->ln_name;
3787         int                     rc;
3788         ENTRY;
3789
3790         /* update time for parent */
3791         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3792         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3793         p_la->la_valid = LA_CTIME;
3794
3795         rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
3796         if (rc != 0)
3797                 RETURN(rc);
3798
3799         handle = mdd_trans_create(env, mdd);
3800         if (IS_ERR(handle))
3801                 RETURN(PTR_ERR(handle));
3802
3803         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3804                                              lname, so_attr, p_la, handle);
3805         if (rc != 0) {
3806                 /* If the migration can not be fit in one transaction, just
3807                  * leave it in the original MDT */
3808                 if (rc == -E2BIG)
3809                         GOTO(stop_trans, rc = 0);
3810                 else
3811                         GOTO(stop_trans, rc);
3812         }
3813
3814         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3815                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3816                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3817                PFID(mdd_object_fid(mdd_tobj)));
3818
3819         rc = mdd_trans_start(env, mdd, handle);
3820         if (rc != 0)
3821                 GOTO(stop_trans, rc);
3822
3823         /* Revert IMMUTABLE flag */
3824         la_flag->la_valid = LA_FLAGS;
3825         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
3826         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
3827                           mdd_object_capa(env, mdd_pobj));
3828         if (rc != 0)
3829                 GOTO(stop_trans, rc);
3830
3831         /* Remove source name from source directory */
3832         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
3833                                 mdd_object_capa(env, mdd_pobj));
3834         if (rc != 0)
3835                 GOTO(stop_trans, rc);
3836
3837         rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
3838                                handle, lname);
3839         if (rc != 0)
3840                 GOTO(stop_trans, rc);
3841
3842         if (S_ISREG(so_attr->la_mode)) {
3843                 if (so_attr->la_nlink == 1) {
3844                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3845                                            handle,
3846                                            mdd_object_capa(env, mdd_sobj));
3847                         if (rc != 0 && rc != -ENODATA)
3848                                 GOTO(stop_trans, rc);
3849                 }
3850         }
3851
3852         /* Insert new fid with target name into target dir */
3853         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
3854                                 mdd_object_type(mdd_tobj), name,
3855                                 handle, mdd_object_capa(env, mdd_pobj));
3856         if (rc != 0)
3857                 GOTO(stop_trans, rc);
3858
3859         rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
3860                            NULL, 1);
3861         if (rc != 0)
3862                 GOTO(stop_trans, rc);
3863
3864         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3865         mdo_ref_del(env, mdd_sobj, handle);
3866         if (is_dir)
3867                 mdo_ref_del(env, mdd_sobj, handle);
3868
3869         /* Get the attr again after ref_del */
3870         rc = mdd_la_get(env, mdd_sobj, so_attr,
3871                         mdd_object_capa(env, mdd_sobj));
3872         if (rc != 0)
3873                 GOTO(out_unlock, rc);
3874
3875         ma->ma_attr = *so_attr;
3876         ma->ma_valid |= MA_INODE;
3877         rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
3878         if (rc != 0)
3879                 GOTO(out_unlock, rc);
3880
3881         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
3882         if (rc != 0)
3883                 GOTO(out_unlock, rc);
3884
3885 out_unlock:
3886         mdd_write_unlock(env, mdd_sobj);
3887
3888 stop_trans:
3889         mdd_trans_stop(env, mdd, rc, handle);
3890
3891         RETURN(rc);
3892 }
3893
3894 /**
3895  * Check whether we should migrate the file/dir
3896  * return val
3897  *      < 0  permission check failed or other error.
3898  *      = 0  the file can be migrated.
3899  *      > 0  the file does not need to be migrated, mostly
3900  *           for multiple link file
3901  **/
3902 static int mdd_migrate_sanity_check(const struct lu_env *env,
3903                                     struct mdd_object *pobj,
3904                                     const struct lu_attr *pattr,
3905                                     struct mdd_object *sobj,
3906                                     struct lu_attr *sattr)
3907 {
3908         struct mdd_thread_info  *info = mdd_env_info(env);
3909         struct linkea_data      *ldata = &info->mti_link_data;
3910         int                     mgr_easize;
3911         struct lu_buf           *mgr_buf;
3912         int                     count;
3913         int                     rc;
3914
3915         ENTRY;
3916
3917         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3918         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
3919         if (mgr_buf->lb_buf == NULL)
3920                 RETURN(-ENOMEM);
3921
3922         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV,
3923                            mdd_object_capa(env, sobj));
3924         if (rc > 0) {
3925                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
3926
3927                 /* If the object has migrateEA, it means IMMUTE flag
3928                  * is being set by previous migration process, so it
3929                  * needs to override the IMMUTE flag, otherwise the
3930                  * following sanity check will fail */
3931                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
3932                                                 LMV_HASH_FLAG_MIGRATION) {
3933                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3934
3935                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
3936                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
3937                                mdd2obd_dev(mdd)->obd_name,
3938                                PFID(mdd_object_fid(sobj)));
3939                 }
3940         }
3941
3942         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
3943                                      sobj, sattr, NULL, NULL);
3944         if (rc != 0)
3945                 RETURN(rc);
3946
3947         /* Then it will check if the file should be migrated. If the file
3948          * has mulitple links, we only need migrate the file if all of its
3949          * entries has been migrated to the remote MDT */
3950         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
3951                 RETURN(0);
3952
3953         rc = mdd_links_read(env, sobj, ldata);
3954         if (rc != 0) {
3955                 /* For multiple links files, if there are no linkEA data at all,
3956                  * means the file might be created before linkEA is enabled, and
3957                  * all all of its links should not be migrated yet, otherwise
3958                  * it should have some linkEA there */
3959                 if (rc == -ENOENT || rc == -ENODATA)
3960                         RETURN(1);
3961                 RETURN(rc);
3962         }
3963
3964         /* If it is mulitple links file, we need update the name entry for
3965          * all parent */
3966         LASSERT(ldata->ld_leh != NULL);
3967         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3968         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3969                 struct mdd_device       *mdd = mdo2mdd(&sobj->mod_obj);
3970                 struct mdd_object       *lpobj;
3971                 struct lu_name          lname;
3972                 struct lu_fid           fid;
3973
3974                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3975                                     &lname, &fid);
3976                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3977                                                          ldata->ld_reclen);
3978                 lpobj = mdd_object_find(env, mdd, &fid);
3979                 if (IS_ERR(lpobj)) {
3980                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3981                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3982                               PTR_ERR(lpobj));
3983                         continue;
3984                 }
3985
3986                 if (!mdd_object_exists(lpobj) || mdd_object_remote(lpobj)) {
3987                         CDEBUG(D_INFO, DFID"%.*s: is on remote MDT.\n",
3988                                PFID(&fid), lname.ln_namelen, lname.ln_name);
3989                         mdd_object_put(env, lpobj);
3990                         continue;
3991                 }
3992
3993                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
3994                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
3995                        lname.ln_name, PFID(&fid));
3996                 mdd_object_put(env, lpobj);
3997                 rc = 1;
3998                 break;
3999         }
4000
4001         RETURN(rc);
4002 }
4003
4004 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4005                        struct md_object *sobj, const struct lu_name *lname,
4006                        struct md_object *tobj, struct md_attr *ma)
4007 {
4008         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4009         struct mdd_device       *mdd = mdo2mdd(pobj);
4010         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4011         struct mdd_object       *mdd_tobj = NULL;
4012         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4013         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4014         int                     rc;
4015
4016         ENTRY;
4017         /* If the file will being migrated, it will check whether
4018          * the file is being opened by someone else right now */
4019         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4020         if (mdd_sobj->mod_count >= 1) {
4021                 CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
4022                        mdd2obd_dev(mdd)->obd_name,
4023                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4024                        mdd_sobj->mod_count, -EBUSY);
4025                 mdd_read_unlock(env, mdd_sobj);
4026                 GOTO(put, rc = -EBUSY);
4027         }
4028         mdd_read_unlock(env, mdd_sobj);
4029
4030         rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
4031         if (rc != 0)
4032                 GOTO(put, rc);
4033
4034         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
4035         if (rc != 0)
4036                 GOTO(put, rc);
4037
4038         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4039         if (rc != 0) {
4040                 if (rc > 0)
4041                         rc = 0;
4042                 GOTO(put, rc);
4043         }
4044
4045         /* Sigh, it is impossible to finish all of migration in a single
4046          * transaction, for example migrating big directory entries to the
4047          * new MDT, it needs insert all of name entries of children in the
4048          * new directory.
4049          *
4050          * So migration will be done in multiple steps and transactions.
4051          *
4052          * 1. create an orphan object on the remote MDT in one transaction.
4053          * 2. migrate extend attributes to the new target file/directory.
4054          * 3. For directory, migrate the entries to the new MDT and update
4055          * linkEA of each children. Because we can not migrate all entries
4056          * in a single transaction, so the migrating directory will become
4057          * a striped directory during migration, so once the process is
4058          * interrupted, the directory is still accessible. (During lookup,
4059          * client will locate the name by searching both original and target
4060          * object).
4061          * 4. Finally, update the name/FID to point to the new file/directory
4062          * in a separate transaction.
4063          */
4064
4065         /* step 1: Check whether the orphan object has been created, and create
4066          * orphan object on the remote MDT if needed */
4067         mdd_tobj = md2mdd_obj(tobj);
4068         if (!mdd_object_exists(mdd_tobj)) {
4069                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4070                                         so_attr);
4071                 if (rc != 0)
4072                         GOTO(put, rc);
4073         }
4074
4075         /* step 2: migrate xattr */
4076         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4077         if (rc != 0)
4078                 GOTO(put, rc);
4079
4080         /* step 3: migrate name entries to the orphan object */
4081         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4082                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4083                 if (rc != 0)
4084                         GOTO(put, rc);
4085                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4086                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4087                         GOTO(put, rc = 0);
4088         }
4089
4090         /* step 4: update name entry to the new object */
4091         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4092                                      ma);
4093         if (rc != 0)
4094                 GOTO(put, rc);
4095 put:
4096         RETURN(rc);
4097 }
4098
4099 const struct md_dir_operations mdd_dir_ops = {
4100         .mdo_is_subdir     = mdd_is_subdir,
4101         .mdo_lookup        = mdd_lookup,
4102         .mdo_create        = mdd_create,
4103         .mdo_rename        = mdd_rename,
4104         .mdo_link          = mdd_link,
4105         .mdo_unlink        = mdd_unlink,
4106         .mdo_create_data   = mdd_create_data,
4107         .mdo_migrate       = mdd_migrate,
4108 };