Whamcloud - gitweb
LU-5938 mdd: fixed oops when dereferencing structure
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static inline int
61 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
62 {
63         if (!lu_name_is_valid(ln))
64                 return -EINVAL;
65         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
66                 return -ENAMETOOLONG;
67         else
68                 return 0;
69 }
70
71 /* Get FID from name and parent */
72 static int
73 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
74              const struct lu_attr *pattr, const struct lu_name *lname,
75              struct lu_fid* fid, int mask)
76 {
77         const char *name                = lname->ln_name;
78         const struct dt_key *key        = (const struct dt_key *)name;
79         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
80         struct mdd_device *m            = mdo2mdd(pobj);
81         struct dt_object *dir           = mdd_object_child(mdd_obj);
82         int rc;
83         ENTRY;
84
85         if (unlikely(mdd_is_dead_obj(mdd_obj)))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         } else if (!mdd_object_exists(mdd_obj)) {
92                 RETURN(-ESTALE);
93         }
94
95         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
96                                             MOR_TGT_PARENT);
97         if (rc)
98                 RETURN(rc);
99
100         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
101                    dt_try_as_dir(env, dir)))
102                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key,
103                                mdd_object_capa(env, mdd_obj));
104         else
105                 rc = -ENOTDIR;
106
107         RETURN(rc);
108 }
109
110 int mdd_lookup(const struct lu_env *env,
111                struct md_object *pobj, const struct lu_name *lname,
112                struct lu_fid *fid, struct md_op_spec *spec)
113 {
114         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
115         int rc;
116         ENTRY;
117
118         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr, BYPASS_CAPA);
119         if (rc != 0)
120                 RETURN(rc);
121
122         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
123                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
124         RETURN(rc);
125 }
126
127 static inline int mdd_parent_fid(const struct lu_env *env,
128                                  struct mdd_object *obj,
129                                  const struct lu_attr *attr,
130                                  struct lu_fid *fid)
131 {
132         return __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
133 }
134
135 /*
136  * For root fid use special function, which does not compare version component
137  * of fid. Version component is different for root fids on all MDTs.
138  */
139 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
140 {
141         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
142                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
143 }
144
145 /*
146  * return 1: if lf is the fid of the ancestor of p1;
147  * return 0: if not;
148  *
149  * return -EREMOTE: if remote object is found, in this
150  * case fid of remote object is saved to @pf;
151  *
152  * otherwise: values < 0, errors.
153  */
154 static int mdd_is_parent(const struct lu_env *env,
155                         struct mdd_device *mdd,
156                         struct mdd_object *p1,
157                         const struct lu_attr *attr,
158                         const struct lu_fid *lf,
159                         struct lu_fid *pf)
160 {
161         struct mdd_object *parent = NULL;
162         struct lu_fid *pfid;
163         int rc;
164         ENTRY;
165
166         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
167         pfid = &mdd_env_info(env)->mti_fid;
168
169         /* Check for root first. */
170         if (mdd_is_root(mdd, mdo2fid(p1)))
171                 RETURN(0);
172
173         for(;;) {
174                 /* this is done recursively, bypass capa for each obj */
175                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
176                 rc = mdd_parent_fid(env, p1, attr, pfid);
177                 if (rc)
178                         GOTO(out, rc);
179                 if (mdd_is_root(mdd, pfid))
180                         GOTO(out, rc = 0);
181                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
182                         GOTO(out, rc = 0);
183                 if (lu_fid_eq(pfid, lf))
184                         GOTO(out, rc = 1);
185                 if (parent)
186                         mdd_object_put(env, parent);
187
188                 parent = mdd_object_find(env, mdd, pfid);
189                 if (IS_ERR(parent)) {
190                         GOTO(out, rc = PTR_ERR(parent));
191                 } else if (mdd_object_remote(parent)) {
192                         /*FIXME: Because of the restriction of rename in Phase I.
193                          * If the parent is remote, we just assumed lf is not the
194                          * parent of P1 for now */
195                         GOTO(out, rc = 0);
196                 }
197                 p1 = parent;
198         }
199         EXIT;
200 out:
201         if (parent && !IS_ERR(parent))
202                 mdd_object_put(env, parent);
203         return rc;
204 }
205
206 /*
207  * No permission check is needed.
208  *
209  * returns 1: if fid is ancestor of @mo;
210  * returns 0: if fid is not a ancestor of @mo;
211  *
212  * returns EREMOTE if remote object is found, fid of remote object is saved to
213  * @fid;
214  *
215  * returns < 0: if error
216  */
217 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
218                   const struct lu_fid *fid, struct lu_fid *sfid)
219 {
220         struct mdd_device *mdd = mdo2mdd(mo);
221         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
222         int rc;
223         ENTRY;
224
225         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
226                 RETURN(0);
227
228         rc = mdd_la_get(env, md2mdd_obj(mo), attr, BYPASS_CAPA);
229         if (rc != 0)
230                 RETURN(rc);
231
232         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
233         if (rc == 0) {
234                 /* found root */
235                 fid_zero(sfid);
236         } else if (rc == 1) {
237                 /* found @fid is parent */
238                 *sfid = *fid;
239                 rc = 0;
240         }
241         RETURN(rc);
242 }
243
244 /*
245  * Check that @dir contains no entries except (possibly) dot and dotdot.
246  *
247  * Returns:
248  *
249  *             0        empty
250  *      -ENOTDIR        not a directory object
251  *    -ENOTEMPTY        not empty
252  *           -ve        other error
253  *
254  */
255 static int mdd_dir_is_empty(const struct lu_env *env,
256                             struct mdd_object *dir)
257 {
258         struct dt_it     *it;
259         struct dt_object *obj;
260         const struct dt_it_ops *iops;
261         int result;
262         ENTRY;
263
264         obj = mdd_object_child(dir);
265         if (!dt_try_as_dir(env, obj))
266                 RETURN(-ENOTDIR);
267
268         iops = &obj->do_index_ops->dio_it;
269         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
270         if (!IS_ERR(it)) {
271                 result = iops->get(env, it, (const struct dt_key *)"");
272                 if (result > 0) {
273                         int i;
274                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
275                                 result = iops->next(env, it);
276                         if (result == 0)
277                                 result = -ENOTEMPTY;
278                         else if (result == 1)
279                                 result = 0;
280                 } else if (result == 0)
281                         /*
282                          * Huh? Index contains no zero key?
283                          */
284                         result = -EIO;
285
286                 iops->put(env, it);
287                 iops->fini(env, it);
288         } else
289                 result = PTR_ERR(it);
290         RETURN(result);
291 }
292
293 /**
294  * Determine if the target object can be hard linked, and right now it only
295  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
296  * directory nlink count might exceed the maximum link count(see
297  * osd_object_ref_add), so it only check nlink for non-directories.
298  *
299  * \param[in] env       thread environment
300  * \param[in] obj       object being linked to
301  * \param[in] la        attributes of \a obj
302  *
303  * \retval              0 if \a obj can be hard linked
304  * \retval              negative error if \a obj is a directory or has too
305  *                      many links
306  */
307 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
308                           const struct lu_attr *la)
309 {
310         struct mdd_device *m = mdd_obj2mdd_dev(obj);
311         ENTRY;
312
313         LASSERT(la != NULL);
314
315         /* Subdir count limitation can be broken through
316          * (see osd_object_ref_add), so only check non-directory here. */
317         if (!S_ISDIR(la->la_mode) &&
318             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
319                 RETURN(-EMLINK);
320
321         RETURN(0);
322 }
323
324 /**
325  * Check whether it may create the cobj under the pobj.
326  *
327  * \param[in] env       execution environment
328  * \param[in] pobj      the parent directory
329  * \param[in] pattr     the attribute of the parent directory
330  * \param[in] cobj      the child to be created
331  * \param[in] check_perm        if check WRITE|EXEC permission for parent
332  *
333  * \retval              = 0 create the child under this dir is allowed
334  * \retval              negative errno create the child under this dir is
335  *                      not allowed
336  */
337 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
338                    const struct lu_attr *pattr, struct mdd_object *cobj,
339                    bool check_perm)
340 {
341         struct mdd_thread_info *info = mdd_env_info(env);
342         struct lu_buf   *xbuf;
343         int rc = 0;
344         ENTRY;
345
346         if (cobj && mdd_object_exists(cobj))
347                 RETURN(-EEXIST);
348
349         if (mdd_is_dead_obj(pobj))
350                 RETURN(-ENOENT);
351
352         /* If the parent is a sub-stripe, check whether it is dead */
353         xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key));
354         rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV,
355                            mdd_object_capa(env, pobj));
356         if (unlikely(rc > 0)) {
357                 struct lmv_mds_md_v1  *lmv1 = xbuf->lb_buf;
358
359                 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE &&
360                     le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD)
361                         RETURN(-ESTALE);
362         }
363         rc = 0;
364
365         if (check_perm)
366                 rc = mdd_permission_internal_locked(env, pobj, pattr,
367                                                     MAY_WRITE | MAY_EXEC,
368                                                     MOR_TGT_PARENT);
369         RETURN(rc);
370 }
371
372 /*
373  * Check whether can unlink from the pobj in the case of "cobj == NULL".
374  */
375 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
376                    const struct lu_attr *pattr, const struct lu_attr *attr)
377 {
378         int rc;
379         ENTRY;
380
381         if (mdd_is_dead_obj(pobj))
382                 RETURN(-ENOENT);
383
384         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
385                 RETURN(-EPERM);
386
387         rc = mdd_permission_internal_locked(env, pobj, pattr,
388                                             MAY_WRITE | MAY_EXEC,
389                                             MOR_TGT_PARENT);
390         if (rc != 0)
391                 RETURN(rc);
392
393         if (pattr->la_flags & LUSTRE_APPEND_FL)
394                 RETURN(-EPERM);
395
396         RETURN(rc);
397 }
398
399 /*
400  * pobj == NULL is remote ops case, under such case, pobj's
401  * VTX feature has been checked already, no need check again.
402  */
403 static inline int mdd_is_sticky(const struct lu_env *env,
404                                 struct mdd_object *pobj,
405                                 const struct lu_attr *pattr,
406                                 struct mdd_object *cobj,
407                                 const struct lu_attr *cattr)
408 {
409         struct lu_ucred *uc = lu_ucred_assert(env);
410
411         if (pobj != NULL) {
412                 LASSERT(pattr != NULL);
413                 if (!(pattr->la_mode & S_ISVTX) ||
414                     (pattr->la_uid == uc->uc_fsuid))
415                         return 0;
416         }
417
418         LASSERT(cattr != NULL);
419         if (cattr->la_uid == uc->uc_fsuid)
420                 return 0;
421
422         return !md_capable(uc, CFS_CAP_FOWNER);
423 }
424
425 static int mdd_may_delete_entry(const struct lu_env *env,
426                                 struct mdd_object *pobj,
427                                 const struct lu_attr *pattr,
428                                 int check_perm)
429 {
430         ENTRY;
431
432         LASSERT(pobj != NULL);
433         if (!mdd_object_exists(pobj))
434                 RETURN(-ENOENT);
435
436         if (mdd_is_dead_obj(pobj))
437                 RETURN(-ENOENT);
438
439         if (check_perm) {
440                 int rc;
441                 rc = mdd_permission_internal_locked(env, pobj, pattr,
442                                             MAY_WRITE | MAY_EXEC,
443                                             MOR_TGT_PARENT);
444                 if (rc)
445                         RETURN(rc);
446         }
447
448         if (pattr->la_flags & LUSTRE_APPEND_FL)
449                 RETURN(-EPERM);
450
451         RETURN(0);
452 }
453
454 /*
455  * Check whether it may delete the cobj from the pobj.
456  * pobj maybe NULL
457  */
458 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
459                    const struct lu_attr *tpattr, struct mdd_object *tobj,
460                    const struct lu_attr *tattr, const struct lu_attr *cattr,
461                    int check_perm, int check_empty)
462 {
463         int rc = 0;
464         ENTRY;
465
466         if (tpobj) {
467                 LASSERT(tpattr != NULL);
468                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
469                 if (rc != 0)
470                         RETURN(rc);
471         }
472
473         if (tobj == NULL)
474                 RETURN(0);
475
476         if (!mdd_object_exists(tobj))
477                 RETURN(-ENOENT);
478
479         if (mdd_is_dead_obj(tobj))
480                 RETURN(-ESTALE);
481
482         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
483                 RETURN(-EPERM);
484
485         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
486                 RETURN(-EPERM);
487
488         /* additional check the rename case */
489         if (cattr) {
490                 if (S_ISDIR(cattr->la_mode)) {
491                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
492
493                         if (!S_ISDIR(tattr->la_mode))
494                                 RETURN(-ENOTDIR);
495
496                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
497                                 RETURN(-EBUSY);
498                 } else if (S_ISDIR(tattr->la_mode))
499                         RETURN(-EISDIR);
500         }
501
502         if (S_ISDIR(tattr->la_mode) && check_empty)
503                 rc = mdd_dir_is_empty(env, tobj);
504
505         RETURN(rc);
506 }
507
508 /**
509  * Check whether it can create the link file(linked to @src_obj) under
510  * the target directory(@tgt_obj), and src_obj has been locked by
511  * mdd_write_lock.
512  *
513  * \param[in] env       execution environment
514  * \param[in] tgt_obj   the target directory
515  * \param[in] tattr     attributes of target directory
516  * \param[in] lname     the link name
517  * \param[in] src_obj   source object for link
518  * \param[in] cattr     attributes for source object
519  *
520  * \retval              = 0 it is allowed to create the link file under tgt_obj
521  * \retval              negative error not allowed to create the link file
522  */
523 static int mdd_link_sanity_check(const struct lu_env *env,
524                                  struct mdd_object *tgt_obj,
525                                  const struct lu_attr *tattr,
526                                  const struct lu_name *lname,
527                                  struct mdd_object *src_obj,
528                                  const struct lu_attr *cattr)
529 {
530         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
531         int rc = 0;
532         ENTRY;
533
534         if (!mdd_object_exists(src_obj))
535                 RETURN(-ENOENT);
536
537         if (mdd_is_dead_obj(src_obj))
538                 RETURN(-ESTALE);
539
540         /* Local ops, no lookup before link, check filename length here. */
541         rc = mdd_name_check(m, lname);
542         if (rc < 0)
543                 RETURN(rc);
544
545         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
546                 RETURN(-EPERM);
547
548         if (S_ISDIR(mdd_object_type(src_obj)))
549                 RETURN(-EPERM);
550
551         LASSERT(src_obj != tgt_obj);
552         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
553         if (rc != 0)
554                 RETURN(rc);
555
556         rc = __mdd_may_link(env, src_obj, cattr);
557
558         RETURN(rc);
559 }
560
561 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
562                                    const char *name, struct thandle *handle,
563                                    struct lustre_capa *capa)
564 {
565         struct dt_object *next = mdd_object_child(pobj);
566         int rc;
567         ENTRY;
568
569         if (dt_try_as_dir(env, next))
570                 rc = dt_delete(env, next, (struct dt_key *)name, handle, capa);
571         else
572                 rc = -ENOTDIR;
573
574         RETURN(rc);
575 }
576
577 static int __mdd_index_insert_only(const struct lu_env *env,
578                                    struct mdd_object *pobj,
579                                    const struct lu_fid *lf, __u32 type,
580                                    const char *name,
581                                    struct thandle *handle,
582                                    struct lustre_capa *capa)
583 {
584         struct dt_object *next = mdd_object_child(pobj);
585         int               rc;
586         ENTRY;
587
588         if (dt_try_as_dir(env, next)) {
589                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
590                 struct lu_ucred         *uc  = lu_ucred_check(env);
591                 int                      ignore_quota;
592
593                 rec->rec_fid = lf;
594                 rec->rec_type = type;
595                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
596                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
597                                (const struct dt_key *)name, handle, capa,
598                                ignore_quota);
599         } else {
600                 rc = -ENOTDIR;
601         }
602         RETURN(rc);
603 }
604
605 /* insert named index, add reference if isdir */
606 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
607                               const struct lu_fid *lf, __u32 type,
608                               const char *name, struct thandle *handle,
609                               struct lustre_capa *capa)
610 {
611         int rc;
612         ENTRY;
613
614         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle, capa);
615         if (rc == 0 && S_ISDIR(type)) {
616                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
617                 mdo_ref_add(env, pobj, handle);
618                 mdd_write_unlock(env, pobj);
619         }
620
621         RETURN(rc);
622 }
623
624 /* delete named index, drop reference if isdir */
625 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
626                               const char *name, int is_dir, struct thandle *handle,
627                               struct lustre_capa *capa)
628 {
629         int               rc;
630         ENTRY;
631
632         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
633         if (rc == 0 && is_dir) {
634                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
635                 mdo_ref_del(env, pobj, handle);
636                 mdd_write_unlock(env, pobj);
637         }
638
639         RETURN(rc);
640 }
641
642 static int mdd_llog_record_calc_size(const struct lu_env *env,
643                                      const struct lu_name *tname,
644                                      const struct lu_name *sname)
645 {
646         const struct lu_ucred   *uc = lu_ucred(env);
647         enum changelog_rec_flags crf = 0;
648         size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
649                                             sizeof(struct changelog_rec);
650
651         if (sname != NULL)
652                 crf |= CLF_RENAME;
653
654         if (uc != NULL && uc->uc_jobid[0] != '\0')
655                 crf |= CLF_JOBID;
656
657         return llog_data_len(hdr_size + changelog_rec_offset(crf) +
658                              (tname != NULL ? tname->ln_namelen : 0) +
659                              (sname != NULL ? 1 + sname->ln_namelen : 0));
660 }
661
662 int mdd_declare_changelog_store(const struct lu_env *env,
663                                 struct mdd_device *mdd,
664                                 const struct lu_name *tname,
665                                 const struct lu_name *sname,
666                                 struct thandle *handle)
667 {
668         struct obd_device               *obd = mdd2obd_dev(mdd);
669         struct llog_ctxt                *ctxt;
670         struct llog_changelog_rec       *rec;
671         struct lu_buf                   *buf;
672         int                              reclen;
673         int                              rc;
674
675         /* Not recording */
676         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
677                 return 0;
678
679         reclen = mdd_llog_record_calc_size(env, tname, sname);
680         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
681         if (buf->lb_buf == NULL)
682                 return -ENOMEM;
683
684         rec = buf->lb_buf;
685         rec->cr_hdr.lrh_len = reclen;
686         rec->cr_hdr.lrh_type = CHANGELOG_REC;
687
688         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
689         if (ctxt == NULL)
690                 return -ENXIO;
691
692         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
693         llog_ctxt_put(ctxt);
694
695         return rc;
696 }
697
698 /** Add a changelog entry \a rec to the changelog llog
699  * \param mdd
700  * \param rec
701  * \param handle - currently ignored since llogs start their own transaction;
702  *              this will hopefully be fixed in llog rewrite
703  * \retval 0 ok
704  */
705 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
706                         struct llog_changelog_rec *rec, struct thandle *th)
707 {
708         struct obd_device       *obd = mdd2obd_dev(mdd);
709         struct llog_ctxt        *ctxt;
710         int                      rc;
711
712         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
713                                             changelog_rec_varsize(&rec->cr));
714
715         /* llog_lvfs_write_rec sets the llog tail len */
716         rec->cr_hdr.lrh_type = CHANGELOG_REC;
717         rec->cr.cr_time = cl_time();
718
719         spin_lock(&mdd->mdd_cl.mc_lock);
720         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
721          * but as long as the MDD transactions are ordered correctly for e.g.
722          * rename conflicts, I don't think this should matter. */
723         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
724         spin_unlock(&mdd->mdd_cl.mc_lock);
725
726         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
727         if (ctxt == NULL)
728                 return -ENXIO;
729
730         /* nested journal transaction */
731         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, th);
732         llog_ctxt_put(ctxt);
733         if (rc > 0)
734                 rc = 0;
735
736         return rc;
737 }
738
739 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
740                                          const struct lu_fid *sfid,
741                                          const struct lu_fid *spfid,
742                                          const struct lu_name *sname)
743 {
744         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
745         size_t                           extsize = sname->ln_namelen + 1;
746
747         LASSERT(sfid != NULL);
748         LASSERT(spfid != NULL);
749         LASSERT(sname != NULL);
750
751         rnm->cr_sfid = *sfid;
752         rnm->cr_spfid = *spfid;
753
754         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
755         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
756         rec->cr_namelen += extsize;
757 }
758
759 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
760 {
761         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
762
763         if (jobid == NULL || jobid[0] == '\0')
764                 return;
765
766         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
767 }
768
769 /** Store a namespace change changelog record
770  * If this fails, we must fail the whole transaction; we don't
771  * want the change to commit without the log entry.
772  * \param target - mdd_object of change
773  * \param tpfid - target parent dir/object fid
774  * \param sfid - source object fid
775  * \param spfid - source parent fid
776  * \param tname - target name string
777  * \param sname - source name string
778  * \param handle - transacion handle
779  */
780 int mdd_changelog_ns_store(const struct lu_env *env,
781                            struct mdd_device *mdd,
782                            enum changelog_rec_type type,
783                            enum changelog_rec_flags crf,
784                            struct mdd_object *target,
785                            const struct lu_fid *tpfid,
786                            const struct lu_fid *sfid,
787                            const struct lu_fid *spfid,
788                            const struct lu_name *tname,
789                            const struct lu_name *sname,
790                            struct thandle *handle)
791 {
792         const struct lu_ucred           *uc = lu_ucred(env);
793         struct llog_changelog_rec       *rec;
794         struct lu_buf                   *buf;
795         int                              reclen;
796         int                              rc;
797         ENTRY;
798
799         /* Not recording */
800         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
801                 RETURN(0);
802
803         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
804                 RETURN(0);
805
806         LASSERT(tpfid != NULL);
807         LASSERT(tname != NULL);
808         LASSERT(handle != NULL);
809
810         reclen = mdd_llog_record_calc_size(env, tname, sname);
811         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
812         if (buf->lb_buf == NULL)
813                 RETURN(-ENOMEM);
814         rec = buf->lb_buf;
815
816         crf &= CLF_FLAGMASK;
817
818         if (uc != NULL && uc->uc_jobid[0] != '\0')
819                 crf |= CLF_JOBID;
820
821         if (sname != NULL)
822                 crf |= CLF_RENAME;
823         else
824                 crf |= CLF_VERSION;
825
826         rec->cr.cr_flags = crf;
827         rec->cr.cr_type = (__u32)type;
828         rec->cr.cr_pfid = *tpfid;
829         rec->cr.cr_namelen = tname->ln_namelen;
830         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
831
832         if (crf & CLF_RENAME)
833                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
834
835         if (crf & CLF_JOBID)
836                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
837
838         if (likely(target != NULL)) {
839                 rec->cr.cr_tfid = *mdo2fid(target);
840                 target->mod_cltime = cfs_time_current_64();
841         } else {
842                 fid_zero(&rec->cr.cr_tfid);
843         }
844
845         rc = mdd_changelog_store(env, mdd, rec, handle);
846         if (rc < 0) {
847                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
848                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
849                 return -EFAULT;
850         }
851
852         return 0;
853 }
854
855 static int __mdd_links_add(const struct lu_env *env,
856                            struct mdd_object *mdd_obj,
857                            struct linkea_data *ldata,
858                            const struct lu_name *lname,
859                            const struct lu_fid *pfid,
860                            int first, int check)
861 {
862         int rc;
863
864         if (ldata->ld_leh == NULL) {
865                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
866                 if (rc) {
867                         if (rc != -ENODATA)
868                                 return rc;
869                         rc = linkea_data_new(ldata,
870                                              &mdd_env_info(env)->mti_link_buf);
871                         if (rc)
872                                 return rc;
873                 }
874         }
875
876         if (check) {
877                 rc = linkea_links_find(ldata, lname, pfid);
878                 if (rc && rc != -ENOENT)
879                         return rc;
880                 if (rc == 0)
881                         return -EEXIST;
882         }
883
884         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
885                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
886
887                 *tfid = *pfid;
888                 tfid->f_ver = ~0;
889                 linkea_add_buf(ldata, lname, tfid);
890         }
891
892         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
893                 linkea_add_buf(ldata, lname, pfid);
894
895         return linkea_add_buf(ldata, lname, pfid);
896 }
897
898 static int __mdd_links_del(const struct lu_env *env,
899                            struct mdd_object *mdd_obj,
900                            struct linkea_data *ldata,
901                            const struct lu_name *lname,
902                            const struct lu_fid *pfid)
903 {
904         int rc;
905
906         if (ldata->ld_leh == NULL) {
907                 rc = mdd_links_read(env, mdd_obj, ldata);
908                 if (rc)
909                         return rc;
910         }
911
912         rc = linkea_links_find(ldata, lname, pfid);
913         if (rc)
914                 return rc;
915
916         linkea_del_buf(ldata, lname);
917         return 0;
918 }
919
920 static int mdd_linkea_prepare(const struct lu_env *env,
921                               struct mdd_object *mdd_obj,
922                               const struct lu_fid *oldpfid,
923                               const struct lu_name *oldlname,
924                               const struct lu_fid *newpfid,
925                               const struct lu_name *newlname,
926                               int first, int check,
927                               struct linkea_data *ldata)
928 {
929         int rc = 0;
930         int rc2 = 0;
931         ENTRY;
932
933         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
934                 return 0;
935
936         LASSERT(oldpfid != NULL || newpfid != NULL);
937
938         if (mdd_obj->mod_flags & DEAD_OBJ) {
939                 /* Prevent linkea to be updated which is NOT necessary. */
940                 ldata->ld_reclen = 0;
941                 /* No more links, don't bother */
942                 RETURN(0);
943         }
944
945         if (oldpfid != NULL) {
946                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
947                 if (rc) {
948                         if ((check == 1) ||
949                             (rc != -ENODATA && rc != -ENOENT))
950                                 RETURN(rc);
951                         /* No changes done. */
952                         rc = 0;
953                 }
954         }
955
956         /* If renaming, add the new record */
957         if (newpfid != NULL) {
958                 /* even if the add fails, we still delete the out-of-date
959                  * old link */
960                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
961                                       first, check);
962         }
963
964         rc = rc != 0 ? rc : rc2;
965
966         RETURN(rc);
967 }
968
969 int mdd_links_rename(const struct lu_env *env,
970                      struct mdd_object *mdd_obj,
971                      const struct lu_fid *oldpfid,
972                      const struct lu_name *oldlname,
973                      const struct lu_fid *newpfid,
974                      const struct lu_name *newlname,
975                      struct thandle *handle,
976                      struct linkea_data *ldata,
977                      int first, int check)
978 {
979         int rc = 0;
980         ENTRY;
981
982         if (ldata == NULL) {
983                 ldata = &mdd_env_info(env)->mti_link_data;
984                 memset(ldata, 0, sizeof(*ldata));
985                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
986                                         newpfid, newlname, first, check,
987                                         ldata);
988                 if (rc != 0)
989                         GOTO(out, rc);
990         }
991
992         if (ldata->ld_reclen != 0)
993                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
994         EXIT;
995 out:
996         if (rc != 0) {
997                 int error = 1;
998                 if (rc == -EOVERFLOW || rc == -ENOSPC)
999                         error = 0;
1000                 if (oldpfid == NULL)
1001                         CDEBUG(error ? D_ERROR : D_OTHER,
1002                                "link_ea add '%.*s' failed %d "DFID"\n",
1003                                newlname->ln_namelen, newlname->ln_name,
1004                                rc, PFID(mdd_object_fid(mdd_obj)));
1005                 else if (newpfid == NULL)
1006                         CDEBUG(error ? D_ERROR : D_OTHER,
1007                                "link_ea del '%.*s' failed %d "DFID"\n",
1008                                oldlname->ln_namelen, oldlname->ln_name,
1009                                rc, PFID(mdd_object_fid(mdd_obj)));
1010                 else
1011                         CDEBUG(error ? D_ERROR : D_OTHER,
1012                                "link_ea rename '%.*s'->'%.*s' failed %d "
1013                                DFID"\n",
1014                                oldlname->ln_namelen, oldlname->ln_name,
1015                                newlname->ln_namelen, newlname->ln_name,
1016                                rc, PFID(mdd_object_fid(mdd_obj)));
1017         }
1018
1019         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1020                 /* if we vmalloced a large buffer drop it */
1021                 lu_buf_free(ldata->ld_buf);
1022
1023         return rc;
1024 }
1025
1026 static inline int mdd_links_add(const struct lu_env *env,
1027                                 struct mdd_object *mdd_obj,
1028                                 const struct lu_fid *pfid,
1029                                 const struct lu_name *lname,
1030                                 struct thandle *handle,
1031                                 struct linkea_data *ldata, int first)
1032 {
1033         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1034                                 pfid, lname, handle, ldata, first, 0);
1035 }
1036
1037 static inline int mdd_links_del(const struct lu_env *env,
1038                                 struct mdd_object *mdd_obj,
1039                                 const struct lu_fid *pfid,
1040                                 const struct lu_name *lname,
1041                                 struct thandle *handle)
1042 {
1043         return mdd_links_rename(env, mdd_obj, pfid, lname,
1044                                 NULL, NULL, handle, NULL, 0, 0);
1045 }
1046
1047 /** Read the link EA into a temp buffer.
1048  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1049  * A pointer to the buffer is stored in \a ldata::ld_buf.
1050  *
1051  * \retval 0 or error
1052  */
1053 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1054                    struct linkea_data *ldata)
1055 {
1056         int rc;
1057
1058         if (!mdd_object_exists(mdd_obj))
1059                 return -ENODATA;
1060
1061         /* First try a small buf */
1062         LASSERT(env != NULL);
1063         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1064                                                PAGE_CACHE_SIZE);
1065         if (ldata->ld_buf->lb_buf == NULL)
1066                 return -ENOMEM;
1067
1068         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
1069                           BYPASS_CAPA);
1070         if (rc == -ERANGE) {
1071                 /* Buf was too small, figure out what we need. */
1072                 lu_buf_free(ldata->ld_buf);
1073                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1074                                    XATTR_NAME_LINK, BYPASS_CAPA);
1075                 if (rc < 0)
1076                         return rc;
1077                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1078                 if (ldata->ld_buf->lb_buf == NULL)
1079                         return -ENOMEM;
1080                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1081                                   XATTR_NAME_LINK, BYPASS_CAPA);
1082         }
1083         if (rc < 0) {
1084                 lu_buf_free(ldata->ld_buf);
1085                 ldata->ld_buf = NULL;
1086                 return rc;
1087         }
1088
1089         return linkea_init(ldata);
1090 }
1091
1092 /** Read the link EA into a temp buffer.
1093  * Uses the name_buf since it is generally large.
1094  * \retval IS_ERR err
1095  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1096  */
1097 struct lu_buf *mdd_links_get(const struct lu_env *env,
1098                              struct mdd_object *mdd_obj)
1099 {
1100         struct linkea_data ldata = { NULL };
1101         int rc;
1102
1103         rc = mdd_links_read(env, mdd_obj, &ldata);
1104         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1105 }
1106
1107 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1108                     struct linkea_data *ldata, struct thandle *handle)
1109 {
1110         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1111                                                      ldata->ld_leh->leh_len);
1112         int                 rc;
1113
1114         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1115                 return 0;
1116
1117         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
1118                            mdd_object_capa(env, mdd_obj));
1119         if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
1120             mdd_object_remote(mdd_obj) == 0) {
1121                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1122
1123                 /* XXX: If the linkEA is overflow, then we need to notify the
1124                  *      namespace LFSCK to skip "nlink" attribute verification
1125                  *      on this object to avoid the "nlink" to be shrinked by
1126                  *      wrong. It may be not good an interaction with LFSCK
1127                  *      like this. We will consider to replace it with other
1128                  *      mechanism in future. LU-5802. */
1129                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
1130                                LFSCK_TYPE_NAMESPACE);
1131                 lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1132                                 lr, handle);
1133         }
1134
1135         return rc;
1136 }
1137
1138 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1139                           struct thandle *handle, struct linkea_data *ldata,
1140                           enum mdd_links_add_overflow overflow)
1141 {
1142         int     rc;
1143         int     ea_len;
1144         void    *linkea;
1145
1146         if (ldata != NULL && ldata->ld_leh != NULL) {
1147                 ea_len = ldata->ld_leh->leh_len;
1148                 linkea = ldata->ld_buf->lb_buf;
1149         } else {
1150                 ea_len = DEFAULT_LINKEA_SIZE;
1151                 linkea = NULL;
1152         }
1153
1154         /* XXX: max size? */
1155         rc = mdo_declare_xattr_set(env, mdd_obj,
1156                                    mdd_buf_get_const(env, linkea, ea_len),
1157                                    XATTR_NAME_LINK, 0, handle);
1158         if (rc != 0)
1159                 return rc;
1160
1161         if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
1162                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1163
1164                 /* XXX: If the linkEA is overflow, then we need to notify the
1165                  *      namespace LFSCK to skip "nlink" attribute verification
1166                  *      on this object to avoid the "nlink" to be shrinked by
1167                  *      wrong. It may be not good an interaction with LFSCK
1168                  *      like this. We will consider to replace it with other
1169                  *      mechanism in future. LU-5802. */
1170                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
1171                                LFSCK_TYPE_NAMESPACE);
1172                 rc = lfsck_in_notify(env,
1173                                      mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1174                                      lr, handle);
1175         }
1176
1177         return rc;
1178 }
1179
1180 static inline int mdd_declare_links_del(const struct lu_env *env,
1181                                         struct mdd_object *c,
1182                                         struct thandle *handle)
1183 {
1184         int rc = 0;
1185
1186         /* For directory, the linkEA will be removed together
1187          * with the object. */
1188         if (!S_ISDIR(mdd_object_type(c)))
1189                 rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
1190
1191         return rc;
1192 }
1193
1194 static int mdd_declare_link(const struct lu_env *env,
1195                             struct mdd_device *mdd,
1196                             struct mdd_object *p,
1197                             struct mdd_object *c,
1198                             const struct lu_name *name,
1199                             struct thandle *handle,
1200                             struct lu_attr *la,
1201                             struct linkea_data *data)
1202 {
1203         int rc;
1204
1205         rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
1206                                       name->ln_name, handle);
1207         if (rc != 0)
1208                 return rc;
1209
1210         rc = mdo_declare_ref_add(env, c, handle);
1211         if (rc != 0)
1212                 return rc;
1213
1214         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1215                 rc = mdo_declare_ref_add(env, c, handle);
1216                 if (rc != 0)
1217                         return rc;
1218         }
1219
1220         la->la_valid = LA_CTIME | LA_MTIME;
1221         rc = mdo_declare_attr_set(env, p, la, handle);
1222         if (rc != 0)
1223                 return rc;
1224
1225         la->la_valid = LA_CTIME;
1226         rc = mdo_declare_attr_set(env, c, la, handle);
1227         if (rc != 0)
1228                 return rc;
1229
1230         rc = mdd_declare_links_add(env, c, handle, data,
1231                         S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
1232         if (rc != 0)
1233                 return rc;
1234
1235         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1236
1237         return rc;
1238 }
1239
1240 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1241                     struct md_object *src_obj, const struct lu_name *lname,
1242                     struct md_attr *ma)
1243 {
1244         const char *name = lname->ln_name;
1245         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1246         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1247         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1248         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1249         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1250         struct mdd_device *mdd = mdo2mdd(src_obj);
1251         struct thandle *handle;
1252         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1253         int rc;
1254         ENTRY;
1255
1256         rc = mdd_la_get(env, mdd_sobj, cattr, BYPASS_CAPA);
1257         if (rc != 0)
1258                 RETURN(rc);
1259
1260         rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
1261         if (rc != 0)
1262                 RETURN(rc);
1263
1264         handle = mdd_trans_create(env, mdd);
1265         if (IS_ERR(handle))
1266                 GOTO(out_pending, rc = PTR_ERR(handle));
1267
1268         memset(ldata, 0, sizeof(*ldata));
1269
1270         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1271         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1272
1273         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1274                               la, ldata);
1275         if (rc)
1276                 GOTO(stop, rc);
1277
1278         rc = mdd_trans_start(env, mdd, handle);
1279         if (rc)
1280                 GOTO(stop, rc);
1281
1282         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1283         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1284                                    cattr);
1285         if (rc)
1286                 GOTO(out_unlock, rc);
1287
1288         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1289                 rc = mdo_ref_add(env, mdd_sobj, handle);
1290                 if (rc != 0)
1291                         GOTO(out_unlock, rc);
1292         }
1293
1294         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1295                 rc = mdo_ref_add(env, mdd_sobj, handle);
1296                 if (rc != 0)
1297                         GOTO(out_unlock, rc);
1298         }
1299
1300         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
1301                 struct lu_fid tfid = *mdo2fid(mdd_sobj);
1302
1303                 tfid.f_oid++;
1304                 rc = __mdd_index_insert_only(env, mdd_tobj, &tfid,
1305                                              mdd_object_type(mdd_sobj),
1306                                              name, handle,
1307                                              mdd_object_capa(env, mdd_tobj));
1308         } else {
1309                 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1310                                              mdd_object_type(mdd_sobj),
1311                                              name, handle,
1312                                              mdd_object_capa(env, mdd_tobj));
1313         }
1314
1315         if (rc != 0) {
1316                 mdo_ref_del(env, mdd_sobj, handle);
1317                 GOTO(out_unlock, rc);
1318         }
1319
1320         la->la_valid = LA_CTIME | LA_MTIME;
1321         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1322         if (rc)
1323                 GOTO(out_unlock, rc);
1324
1325         la->la_valid = LA_CTIME;
1326         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1327         if (rc == 0) {
1328                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1329                                         mdo2fid(mdd_tobj), lname, 0, 0,
1330                                         ldata);
1331                 if (rc == 0)
1332                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1333                                       lname, handle, ldata, 0);
1334                 /* The failure of links_add should not cause the link
1335                  * failure, reset rc here */
1336                 rc = 0;
1337         }
1338         EXIT;
1339 out_unlock:
1340         mdd_write_unlock(env, mdd_sobj);
1341         if (rc == 0)
1342                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1343                                             mdo2fid(mdd_tobj), NULL, NULL,
1344                                             lname, NULL, handle);
1345 stop:
1346         mdd_trans_stop(env, mdd, rc, handle);
1347
1348         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1349                 /* if we vmalloced a large buffer drop it */
1350                 lu_buf_free(ldata->ld_buf);
1351 out_pending:
1352         return rc;
1353 }
1354
1355 static int mdd_mark_dead_object(const struct lu_env *env,
1356                                 struct mdd_object *obj, struct thandle *handle,
1357                                 bool declare)
1358 {
1359         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1360         int rc;
1361
1362         if (!declare)
1363                 obj->mod_flags |= DEAD_OBJ;
1364
1365         if (!S_ISDIR(mdd_object_type(obj)))
1366                 return 0;
1367
1368         attr->la_valid = LA_FLAGS;
1369         attr->la_flags = LUSTRE_SLAVE_DEAD_FL;
1370
1371         if (declare)
1372                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1373         else
1374                 rc = mdo_attr_set(env, obj, attr, handle,
1375                                   mdd_object_capa(env, obj));
1376
1377         return rc;
1378 }
1379
1380 static int mdd_declare_finish_unlink(const struct lu_env *env,
1381                                      struct mdd_object *obj,
1382                                      struct thandle *handle)
1383 {
1384         int     rc;
1385
1386         rc = mdd_mark_dead_object(env, obj, handle, true);
1387         if (rc != 0)
1388                 return rc;
1389
1390         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1391         if (rc != 0)
1392                 return rc;
1393
1394         rc = mdo_declare_destroy(env, obj, handle);
1395         if (rc != 0)
1396                 return rc;
1397
1398         return mdd_declare_links_del(env, obj, handle);
1399 }
1400
1401 /* caller should take a lock before calling */
1402 int mdd_finish_unlink(const struct lu_env *env,
1403                       struct mdd_object *obj, struct md_attr *ma,
1404                       const struct mdd_object *pobj,
1405                       const struct lu_name *lname,
1406                       struct thandle *th)
1407 {
1408         int rc = 0;
1409         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1410         ENTRY;
1411
1412         LASSERT(mdd_write_locked(env, obj) != 0);
1413
1414         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1415                 rc = mdd_mark_dead_object(env, obj, th, false);
1416                 if (rc != 0)
1417                         RETURN(rc);
1418
1419                 /* add new orphan and the object
1420                  * will be deleted during mdd_close() */
1421                 if (obj->mod_count) {
1422                         rc = __mdd_orphan_add(env, obj, th);
1423                         if (rc == 0)
1424                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1425                                         "orphan list, open count = %d\n",
1426                                         PFID(mdd_object_fid(obj)),
1427                                         obj->mod_count);
1428                         else
1429                                 CERROR("Object "DFID" fail to be an orphan, "
1430                                        "open count = %d, maybe cause failed "
1431                                        "open replay\n",
1432                                         PFID(mdd_object_fid(obj)),
1433                                         obj->mod_count);
1434                 } else {
1435                         rc = mdo_destroy(env, obj, th);
1436                 }
1437         } else if (!is_dir) {
1438                 /* old files may not have link ea; ignore errors */
1439                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1440         }
1441
1442         RETURN(rc);
1443 }
1444
1445 /*
1446  * pobj maybe NULL
1447  * has mdd_write_lock on cobj already, but not on pobj yet
1448  */
1449 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1450                             const struct lu_attr *pattr,
1451                             struct mdd_object *cobj,
1452                             const struct lu_attr *cattr)
1453 {
1454         int rc;
1455         ENTRY;
1456
1457         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1458
1459         RETURN(rc);
1460 }
1461
1462 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1463                               struct mdd_object *p, struct mdd_object *c,
1464                               const struct lu_name *name, struct md_attr *ma,
1465                               struct thandle *handle, int no_name, int is_dir)
1466 {
1467         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1468         int              rc;
1469
1470         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1471                 if (likely(no_name == 0)) {
1472                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1473                                                       handle);
1474                         if (rc != 0)
1475                                 return rc;
1476                 }
1477
1478                 if (is_dir != 0) {
1479                         rc = mdo_declare_ref_del(env, p, handle);
1480                         if (rc != 0)
1481                                 return rc;
1482                 }
1483         }
1484
1485         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1486         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1487         la->la_valid = LA_CTIME | LA_MTIME;
1488         rc = mdo_declare_attr_set(env, p, la, handle);
1489         if (rc)
1490                 return rc;
1491
1492         if (c != NULL) {
1493                 rc = mdo_declare_ref_del(env, c, handle);
1494                 if (rc)
1495                         return rc;
1496
1497                 rc = mdo_declare_ref_del(env, c, handle);
1498                 if (rc)
1499                         return rc;
1500
1501                 la->la_valid = LA_CTIME;
1502                 rc = mdo_declare_attr_set(env, c, la, handle);
1503                 if (rc)
1504                         return rc;
1505
1506                 rc = mdd_declare_finish_unlink(env, c, handle);
1507                 if (rc)
1508                         return rc;
1509
1510                 /* FIXME: need changelog for remove entry */
1511                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1512         }
1513
1514         return rc;
1515 }
1516
1517 /*
1518  * test if a file has an HSM archive
1519  * if HSM attributes are not found in ma update them from
1520  * HSM xattr
1521  */
1522 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1523                                    struct mdd_object *obj,
1524                                    struct md_attr *ma)
1525 {
1526         ENTRY;
1527
1528         if (!(ma->ma_valid & MA_HSM)) {
1529                 /* no HSM MD provided, read xattr */
1530                 struct lu_buf   *hsm_buf;
1531                 const size_t     buflen = sizeof(struct hsm_attrs);
1532                 int              rc;
1533
1534                 hsm_buf = mdd_buf_get(env, NULL, 0);
1535                 lu_buf_alloc(hsm_buf, buflen);
1536                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM,
1537                                    mdd_object_capa(env, obj));
1538                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1539                 lu_buf_free(hsm_buf);
1540                 if (rc < 0)
1541                         RETURN(false);
1542
1543                 ma->ma_valid = MA_HSM;
1544         }
1545         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1546                 RETURN(true);
1547         RETURN(false);
1548 }
1549
1550 /**
1551  * Delete name entry and the object.
1552  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1553  * does not exist for this object, and it could only happen during resending
1554  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1555  * is also needed in this case(needed by changelog), so we have to add another
1556  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1557  * the ENOENT failure should be able to be fixed by redo mechanism.
1558  */
1559 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1560                       struct md_object *cobj, const struct lu_name *lname,
1561                       struct md_attr *ma, int no_name)
1562 {
1563         const char *name = lname->ln_name;
1564         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1565         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1566         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1567         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1568         struct mdd_object *mdd_cobj = NULL;
1569         struct mdd_device *mdd = mdo2mdd(pobj);
1570         struct thandle    *handle;
1571         int rc, is_dir = 0;
1572         ENTRY;
1573
1574         /* cobj == NULL means only delete name entry */
1575         if (likely(cobj != NULL)) {
1576                 mdd_cobj = md2mdd_obj(cobj);
1577                 if (mdd_object_exists(mdd_cobj) == 0)
1578                         RETURN(-ENOENT);
1579         }
1580
1581         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
1582         if (rc)
1583                 RETURN(rc);
1584
1585         if (likely(mdd_cobj != NULL)) {
1586                 /* fetch cattr */
1587                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1588                 if (rc)
1589                         RETURN(rc);
1590
1591                 is_dir = S_ISDIR(cattr->la_mode);
1592         }
1593
1594         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1595         if (rc)
1596                 RETURN(rc);
1597
1598         handle = mdd_trans_create(env, mdd);
1599         if (IS_ERR(handle))
1600                 RETURN(PTR_ERR(handle));
1601
1602         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1603                                 lname, ma, handle, no_name, is_dir);
1604         if (rc)
1605                 GOTO(stop, rc);
1606
1607         rc = mdd_trans_start(env, mdd, handle);
1608         if (rc)
1609                 GOTO(stop, rc);
1610
1611         if (likely(mdd_cobj != NULL))
1612                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1613
1614         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1615                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1616                                         mdd_object_capa(env, mdd_pobj));
1617                 if (rc)
1618                         GOTO(cleanup, rc);
1619         }
1620
1621         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1622             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1623                 GOTO(cleanup, rc = 0);
1624
1625         if (likely(mdd_cobj != NULL)) {
1626                 rc = mdo_ref_del(env, mdd_cobj, handle);
1627                 if (rc != 0) {
1628                         __mdd_index_insert_only(env, mdd_pobj,
1629                                                 mdo2fid(mdd_cobj),
1630                                                 mdd_object_type(mdd_cobj),
1631                                                 name, handle,
1632                                                 mdd_object_capa(env, mdd_pobj));
1633                         GOTO(cleanup, rc);
1634                 }
1635
1636                 if (is_dir)
1637                         /* unlink dot */
1638                         mdo_ref_del(env, mdd_cobj, handle);
1639
1640                 /* fetch updated nlink */
1641                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1642                 if (rc)
1643                         GOTO(cleanup, rc);
1644         }
1645
1646         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1647         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1648
1649         la->la_valid = LA_CTIME | LA_MTIME;
1650         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1651         if (rc)
1652                 GOTO(cleanup, rc);
1653
1654         /* Enough for only unlink the entry */
1655         if (unlikely(mdd_cobj == NULL))
1656                 GOTO(stop, rc);
1657
1658         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1659                 /* update ctime of an unlinked file only if it is still
1660                  * opened or a link still exists */
1661                 la->la_valid = LA_CTIME;
1662                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1663                 if (rc)
1664                         GOTO(cleanup, rc);
1665         }
1666
1667         /* XXX: this transfer to ma will be removed with LOD/OSP */
1668         ma->ma_attr = *cattr;
1669         ma->ma_valid |= MA_INODE;
1670         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1671
1672         /* fetch updated nlink */
1673         if (rc == 0)
1674                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1675
1676         /* if object is removed then we can't get its attrs, use last get */
1677         if (cattr->la_nlink == 0) {
1678                 ma->ma_attr = *cattr;
1679                 ma->ma_valid |= MA_INODE;
1680         }
1681         EXIT;
1682 cleanup:
1683         if (likely(mdd_cobj != NULL))
1684                 mdd_write_unlock(env, mdd_cobj);
1685
1686         if (rc == 0) {
1687                 int cl_flags = 0;
1688
1689                 if (cattr->la_nlink == 0) {
1690                         cl_flags |= CLF_UNLINK_LAST;
1691                         /* search for an existing archive */
1692                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1693                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1694                 }
1695
1696                 rc = mdd_changelog_ns_store(env, mdd,
1697                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1698                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1699                         handle);
1700         }
1701
1702 stop:
1703         mdd_trans_stop(env, mdd, rc, handle);
1704
1705         return rc;
1706 }
1707
1708 /*
1709  * The permission has been checked when obj created, no need check again.
1710  */
1711 static int mdd_cd_sanity_check(const struct lu_env *env,
1712                                struct mdd_object *obj)
1713 {
1714         ENTRY;
1715
1716         /* EEXIST check */
1717         if (!obj || mdd_is_dead_obj(obj))
1718                 RETURN(-ENOENT);
1719
1720         RETURN(0);
1721 }
1722
1723 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1724                            struct md_object *cobj, const struct md_op_spec *spec,
1725                            struct md_attr *ma)
1726 {
1727         struct mdd_device *mdd = mdo2mdd(cobj);
1728         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1729         struct mdd_object *son = md2mdd_obj(cobj);
1730         struct thandle    *handle;
1731         const struct lu_buf *buf;
1732         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1733         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1734         int                rc;
1735         ENTRY;
1736
1737         rc = mdd_cd_sanity_check(env, son);
1738         if (rc)
1739                 RETURN(rc);
1740
1741         if (!md_should_create(spec->sp_cr_flags))
1742                 RETURN(0);
1743
1744         /*
1745          * there are following use cases for this function:
1746          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1747          *    striping can be specified or not
1748          * 2) CMD?
1749          */
1750         rc = mdd_la_get(env, son, attr, BYPASS_CAPA);
1751         if (rc)
1752                 RETURN(rc);
1753
1754         /* calling ->ah_make_hint() is used to transfer information from parent */
1755         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1756
1757         handle = mdd_trans_create(env, mdd);
1758         if (IS_ERR(handle))
1759                 GOTO(out_free, rc = PTR_ERR(handle));
1760
1761         /*
1762          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1763          * Should this be fixed?
1764          */
1765         CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
1766                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1767                spec->sp_cr_flags, spec->no_create);
1768
1769         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1770                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1771                                         spec->u.sp_ea.eadatalen);
1772         } else {
1773                 buf = &LU_BUF_NULL;
1774         }
1775
1776         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1777                                   XATTR_NAME_LOV, 0, handle);
1778         if (rc)
1779                 GOTO(stop, rc);
1780
1781         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1782         if (rc)
1783                 GOTO(stop, rc);
1784
1785         rc = mdd_trans_start(env, mdd, handle);
1786         if (rc)
1787                 GOTO(stop, rc);
1788
1789         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1790                           0, handle, mdd_object_capa(env, son));
1791
1792         if (rc)
1793                 GOTO(stop, rc);
1794
1795         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1796
1797 stop:
1798         mdd_trans_stop(env, mdd, rc, handle);
1799 out_free:
1800         RETURN(rc);
1801 }
1802
1803 static int mdd_declare_object_initialize(const struct lu_env *env,
1804                                          struct mdd_object *parent,
1805                                          struct mdd_object *child,
1806                                          struct lu_attr *attr,
1807                                          struct thandle *handle)
1808 {
1809         int rc;
1810         ENTRY;
1811
1812         /*
1813          * inode mode has been set in creation time, and it's based on umask,
1814          * la_mode and acl, don't set here again! (which will go wrong
1815          * because below function doesn't consider umask).
1816          * I'd suggest set all object attributes in creation time, see above.
1817          */
1818         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1819         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1820         rc = mdo_declare_attr_set(env, child, attr, handle);
1821         attr->la_valid |= LA_MODE | LA_TYPE;
1822         if (rc != 0 || !S_ISDIR(attr->la_mode))
1823                 RETURN(rc);
1824
1825         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1826                                       dot, handle);
1827         if (rc != 0)
1828                 RETURN(rc);
1829
1830         rc = mdo_declare_ref_add(env, child, handle);
1831         if (rc != 0)
1832                 RETURN(rc);
1833
1834         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1835                                       dotdot, handle);
1836
1837         RETURN(rc);
1838 }
1839
1840 static int mdd_object_initialize(const struct lu_env *env,
1841                                  const struct lu_fid *pfid,
1842                                  struct mdd_object *child,
1843                                  struct lu_attr *attr, struct thandle *handle,
1844                                  const struct md_op_spec *spec)
1845 {
1846         int rc = 0;
1847         ENTRY;
1848
1849         if (S_ISDIR(attr->la_mode)) {
1850                 /* Add "." and ".." for newly created dir */
1851                 mdo_ref_add(env, child, handle);
1852                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1853                                              S_IFDIR, dot, handle, BYPASS_CAPA);
1854                 if (rc == 0)
1855                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1856                                                      dotdot, handle,
1857                                                      BYPASS_CAPA);
1858                 if (rc != 0)
1859                         mdo_ref_del(env, child, handle);
1860         }
1861
1862         RETURN(rc);
1863 }
1864
1865 /**
1866  * This function checks whether it can create a file/dir under the
1867  * directory(@pobj). The directory(@pobj) is not being locked by
1868  * mdd lock.
1869  *
1870  * \param[in] env       execution environment
1871  * \param[in] pobj      the directory to create files
1872  * \param[in] pattr     the attributes of the directory
1873  * \param[in] lname     the name of the created file/dir
1874  * \param[in] cattr     the attributes of the file/dir
1875  * \param[in] spec      create specification
1876  *
1877  * \retval              = 0 it is allowed to create file/dir under
1878  *                      the directory
1879  * \retval              negative error not allowed to create file/dir
1880  *                      under the directory
1881  */
1882 static int mdd_create_sanity_check(const struct lu_env *env,
1883                                    struct md_object *pobj,
1884                                    const struct lu_attr *pattr,
1885                                    const struct lu_name *lname,
1886                                    struct lu_attr *cattr,
1887                                    struct md_op_spec *spec)
1888 {
1889         struct mdd_thread_info *info = mdd_env_info(env);
1890         struct lu_fid     *fid       = &info->mti_fid;
1891         struct mdd_object *obj       = md2mdd_obj(pobj);
1892         struct mdd_device *m         = mdo2mdd(pobj);
1893         bool            check_perm = true;
1894         int rc;
1895         ENTRY;
1896
1897         /* EEXIST check */
1898         if (mdd_is_dead_obj(obj))
1899                 RETURN(-ENOENT);
1900
1901         /*
1902          * In some cases this lookup is not needed - we know before if name
1903          * exists or not because MDT performs lookup for it.
1904          * name length check is done in lookup.
1905          */
1906         if (spec->sp_cr_lookup) {
1907                 /*
1908                  * Check if the name already exist, though it will be checked in
1909                  * _index_insert also, for avoiding rolling back if exists
1910                  * _index_insert.
1911                  */
1912                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1913                                   MAY_WRITE | MAY_EXEC);
1914                 if (rc != -ENOENT)
1915                         RETURN(rc ? : -EEXIST);
1916
1917                 /* Permission is already being checked in mdd_lookup */
1918                 check_perm = false;
1919         }
1920
1921         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1922         if (rc != 0)
1923                 RETURN(rc);
1924
1925         /* sgid check */
1926         if (pattr->la_mode & S_ISGID) {
1927                 cattr->la_gid = pattr->la_gid;
1928                 if (S_ISDIR(cattr->la_mode)) {
1929                         cattr->la_mode |= S_ISGID;
1930                         cattr->la_valid |= LA_MODE;
1931                 }
1932         }
1933
1934         rc = mdd_name_check(m, lname);
1935         if (rc < 0)
1936                 RETURN(rc);
1937
1938         switch (cattr->la_mode & S_IFMT) {
1939         case S_IFLNK: {
1940                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1941
1942                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1943                         RETURN(-ENAMETOOLONG);
1944                 else
1945                         RETURN(0);
1946         }
1947         case S_IFDIR:
1948         case S_IFREG:
1949         case S_IFCHR:
1950         case S_IFBLK:
1951         case S_IFIFO:
1952         case S_IFSOCK:
1953                 rc = 0;
1954                 break;
1955         default:
1956                 rc = -EINVAL;
1957                 break;
1958         }
1959         RETURN(rc);
1960 }
1961
1962 static int mdd_declare_object_create(const struct lu_env *env,
1963                                      struct mdd_device *mdd,
1964                                      struct mdd_object *p, struct mdd_object *c,
1965                                      struct lu_attr *attr,
1966                                      struct thandle *handle,
1967                                      const struct md_op_spec *spec,
1968                                      struct lu_buf *def_acl_buf,
1969                                      struct lu_buf *acl_buf,
1970                                      struct dt_allocation_hint *hint)
1971 {
1972         int rc;
1973
1974         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
1975                                                 hint);
1976         if (rc)
1977                 GOTO(out, rc);
1978
1979 #ifdef CONFIG_FS_POSIX_ACL
1980         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
1981                 /* if dir, then can inherit default ACl */
1982                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
1983                                            XATTR_NAME_ACL_DEFAULT,
1984                                            0, handle);
1985                 if (rc)
1986                         GOTO(out, rc);
1987         }
1988
1989         if (acl_buf->lb_len > 0) {
1990                 rc = mdo_declare_attr_set(env, c, attr, handle);
1991                 if (rc)
1992                         GOTO(out, rc);
1993
1994                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1995                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1996                 if (rc)
1997                         GOTO(out, rc);
1998         }
1999 #endif
2000         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2001         if (rc)
2002                 GOTO(out, rc);
2003
2004         /* replay case, create LOV EA from client data */
2005         if (spec->no_create ||
2006             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2007                 const struct lu_buf *buf;
2008
2009                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2010                                         spec->u.sp_ea.eadatalen);
2011                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2012                                            handle);
2013                 if (rc)
2014                         GOTO(out, rc);
2015         }
2016
2017         if (S_ISLNK(attr->la_mode)) {
2018                 const char *target_name = spec->u.sp_symname;
2019                 int sym_len = strlen(target_name);
2020                 const struct lu_buf *buf;
2021
2022                 buf = mdd_buf_get_const(env, target_name, sym_len);
2023                 rc = dt_declare_record_write(env, mdd_object_child(c),
2024                                              buf, 0, handle);
2025                 if (rc)
2026                         GOTO(out, rc);
2027         }
2028 out:
2029         return rc;
2030 }
2031
2032 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2033                               struct mdd_object *p, struct mdd_object *c,
2034                               const struct lu_name *name,
2035                               struct lu_attr *attr,
2036                               struct thandle *handle,
2037                               const struct md_op_spec *spec,
2038                               struct linkea_data *ldata,
2039                               struct lu_buf *def_acl_buf,
2040                               struct lu_buf *acl_buf,
2041                               struct dt_allocation_hint *hint)
2042 {
2043         int rc;
2044
2045         rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
2046                                        def_acl_buf, acl_buf, hint);
2047         if (rc)
2048                 GOTO(out, rc);
2049
2050         if (S_ISDIR(attr->la_mode)) {
2051                 rc = mdo_declare_ref_add(env, p, handle);
2052                 if (rc)
2053                         GOTO(out, rc);
2054         }
2055
2056         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2057                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2058                 if (rc)
2059                         GOTO(out, rc);
2060         } else {
2061                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2062
2063                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2064                                               name->ln_name, handle);
2065                 if (rc != 0)
2066                         return rc;
2067
2068                 rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
2069                 if (rc)
2070                         return rc;
2071
2072                 *la = *attr;
2073                 la->la_valid = LA_CTIME | LA_MTIME;
2074                 rc = mdo_declare_attr_set(env, p, la, handle);
2075                 if (rc)
2076                         return rc;
2077
2078                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2079                 if (rc)
2080                         return rc;
2081         }
2082
2083         /* XXX: For remote create, it should indicate the remote RPC
2084          * will be sent after local transaction is finished, which
2085          * is not very nice, but it will be removed once we fully support
2086          * async update */
2087         if (mdd_object_remote(p) && handle->th_update != NULL)
2088                 handle->th_update->tu_sent_after_local_trans = 1;
2089 out:
2090         return rc;
2091 }
2092
2093 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2094                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2095                         struct lu_buf *acl_buf)
2096 {
2097         int     rc;
2098         ENTRY;
2099
2100         if (S_ISLNK(la->la_mode)) {
2101                 acl_buf->lb_len = 0;
2102                 def_acl_buf->lb_len = 0;
2103                 RETURN(0);
2104         }
2105
2106         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2107         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2108                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
2109         mdd_read_unlock(env, pobj);
2110         if (rc > 0) {
2111                 /* If there are default ACL, fix mode/ACL by default ACL */
2112                 def_acl_buf->lb_len = rc;
2113                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2114                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2115                 acl_buf->lb_len = rc;
2116                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2117                 if (rc < 0)
2118                         RETURN(rc);
2119         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2120                 /* If there are no default ACL, fix mode by mask */
2121                 struct lu_ucred *uc = lu_ucred(env);
2122
2123                 /* The create triggered by MDT internal events, such as
2124                  * LFSCK reset, will not contain valid "uc". */
2125                 if (unlikely(uc != NULL))
2126                         la->la_mode &= ~uc->uc_umask;
2127                 rc = 0;
2128                 acl_buf->lb_len = 0;
2129                 def_acl_buf->lb_len = 0;
2130         }
2131
2132         RETURN(rc);
2133 }
2134
2135 /**
2136  * Create a metadata object and initialize it, set acl, xattr.
2137  **/
2138 static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
2139                              struct mdd_object *son, struct lu_attr *attr,
2140                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2141                              struct lu_buf *def_acl_buf,
2142                              struct dt_allocation_hint *hint,
2143                              struct thandle *handle)
2144 {
2145         int                     rc;
2146
2147         mdd_write_lock(env, son, MOR_TGT_CHILD);
2148         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
2149                                         hint);
2150         if (rc)
2151                 GOTO(unlock, rc);
2152
2153         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2154          * created in declare phase, they also needs to be added to master
2155          * object as sub-directory entry. So it has to initialize the master
2156          * object, then set dir striped EA.(in mdo_xattr_set) */
2157         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2158                                    spec);
2159         if (rc != 0)
2160                 GOTO(err_destroy, rc);
2161
2162         /*
2163          * in case of replay we just set LOVEA provided by the client
2164          * XXX: I think it would be interesting to try "old" way where
2165          *      MDT calls this xattr_set(LOV) in a different transaction.
2166          *      probably this way we code can be made better.
2167          */
2168
2169         /* During creation, there are only a few cases we need do xattr_set to
2170          * create stripes.
2171          * 1. regular file: see comments above.
2172          * 2. dir: inherit default striping or pool settings from parent.
2173          * 3. create striped directory with provided stripeEA.
2174          * 4. create striped directory because inherit default layout from the
2175          * parent.
2176          */
2177         if (spec->no_create ||
2178             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2179             S_ISDIR(attr->la_mode)) {
2180                 const struct lu_buf *buf;
2181
2182                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2183                                         spec->u.sp_ea.eadatalen);
2184                 rc = mdo_xattr_set(env, son, buf,
2185                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2186                                                             XATTR_NAME_LOV, 0,
2187                                    handle, BYPASS_CAPA);
2188                 if (rc != 0)
2189                         GOTO(err_destroy, rc);
2190         }
2191
2192 #ifdef CONFIG_FS_POSIX_ACL
2193         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2194             S_ISDIR(attr->la_mode)) {
2195                 /* set default acl */
2196                 rc = mdo_xattr_set(env, son, def_acl_buf,
2197                                    XATTR_NAME_ACL_DEFAULT, 0,
2198                                    handle, BYPASS_CAPA);
2199                 if (rc)
2200                         GOTO(err_destroy, rc);
2201         }
2202         /* set its own acl */
2203         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2204                 rc = mdo_xattr_set(env, son, acl_buf,
2205                                    XATTR_NAME_ACL_ACCESS,
2206                                    0, handle, BYPASS_CAPA);
2207                 if (rc)
2208                         GOTO(err_destroy, rc);
2209         }
2210 #endif
2211
2212         if (S_ISLNK(attr->la_mode)) {
2213                 struct lu_ucred  *uc = lu_ucred_assert(env);
2214                 struct dt_object *dt = mdd_object_child(son);
2215                 const char *target_name = spec->u.sp_symname;
2216                 int sym_len = strlen(target_name);
2217                 const struct lu_buf *buf;
2218                 loff_t pos = 0;
2219
2220                 buf = mdd_buf_get_const(env, target_name, sym_len);
2221                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2222                                                 mdd_object_capa(env, son),
2223                                                 uc->uc_cap &
2224                                                 CFS_CAP_SYS_RESOURCE_MASK);
2225
2226                 if (rc == sym_len)
2227                         rc = 0;
2228                 else
2229                         GOTO(err_initlized, rc = -EFAULT);
2230         }
2231
2232 err_initlized:
2233         if (unlikely(rc != 0)) {
2234                 int rc2;
2235                 if (S_ISDIR(attr->la_mode)) {
2236                         /* Drop the reference, no need to delete "."/"..",
2237                          * because the object to be destroied directly. */
2238                         rc2 = mdo_ref_del(env, son, handle);
2239                         if (rc2 != 0)
2240                                 GOTO(unlock, rc);
2241                 }
2242                 rc2 = mdo_ref_del(env, son, handle);
2243                 if (rc2 != 0)
2244                         GOTO(unlock, rc);
2245 err_destroy:
2246                 mdo_destroy(env, son, handle);
2247         }
2248 unlock:
2249         mdd_write_unlock(env, son);
2250         RETURN(rc);
2251 }
2252
2253 /*
2254  * Create object and insert it into namespace.
2255  */
2256 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2257                       const struct lu_name *lname, struct md_object *child,
2258                       struct md_op_spec *spec, struct md_attr* ma)
2259 {
2260         struct mdd_thread_info  *info = mdd_env_info(env);
2261         struct lu_attr          *la = &info->mti_la_for_fix;
2262         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2263         struct mdd_object       *son = md2mdd_obj(child);
2264         struct mdd_device       *mdd = mdo2mdd(pobj);
2265         struct lu_attr          *attr = &ma->ma_attr;
2266         struct thandle          *handle;
2267         struct lu_attr          *pattr = &info->mti_pattr;
2268         struct lu_buf           acl_buf;
2269         struct lu_buf           def_acl_buf;
2270         struct linkea_data      *ldata = &info->mti_link_data;
2271         const char              *name = lname->ln_name;
2272         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2273         int                      rc;
2274         int                      rc2;
2275         ENTRY;
2276
2277         /*
2278          * Two operations have to be performed:
2279          *
2280          *  - an allocation of a new object (->do_create()), and
2281          *
2282          *  - an insertion into a parent index (->dio_insert()).
2283          *
2284          * Due to locking, operation order is not important, when both are
2285          * successful, *but* error handling cases are quite different:
2286          *
2287          *  - if insertion is done first, and following object creation fails,
2288          *  insertion has to be rolled back, but this operation might fail
2289          *  also leaving us with dangling index entry.
2290          *
2291          *  - if creation is done first, is has to be undone if insertion
2292          *  fails, leaving us with leaked space, which is neither good, nor
2293          *  fatal.
2294          *
2295          * It seems that creation-first is simplest solution, but it is
2296          * sub-optimal in the frequent
2297          *
2298          *         $ mkdir foo
2299          *         $ mkdir foo
2300          *
2301          * case, because second mkdir is bound to create object, only to
2302          * destroy it immediately.
2303          *
2304          * To avoid this follow local file systems that do double lookup:
2305          *
2306          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2307          *
2308          *     1. create            (mdd_object_create_internal())
2309          *
2310          *     2. insert            (__mdd_index_insert(), lookup again)
2311          */
2312
2313         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
2314         if (rc != 0)
2315                 RETURN(rc);
2316
2317         /* Sanity checks before big job. */
2318         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2319         if (rc)
2320                 RETURN(rc);
2321
2322         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2323                 GOTO(out_free, rc = -EINPROGRESS);
2324
2325         handle = mdd_trans_create(env, mdd);
2326         if (IS_ERR(handle))
2327                 GOTO(out_free, rc = PTR_ERR(handle));
2328
2329         acl_buf.lb_buf = info->mti_xattr_buf;
2330         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2331         def_acl_buf.lb_buf = info->mti_key;
2332         def_acl_buf.lb_len = sizeof(info->mti_key);
2333         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2334         if (rc < 0)
2335                 GOTO(out_stop, rc);
2336
2337         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2338
2339         memset(ldata, 0, sizeof(*ldata));
2340         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2341                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2342
2343                 tfid.f_oid--;
2344                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2345                                         &tfid, lname, 1, 0, ldata);
2346         } else {
2347                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2348                                         mdd_object_fid(mdd_pobj),
2349                                         lname, 1, 0, ldata);
2350         }
2351
2352         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2353                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2354                                 hint);
2355         if (rc)
2356                 GOTO(out_stop, rc);
2357
2358         rc = mdd_trans_start(env, mdd, handle);
2359         if (rc)
2360                 GOTO(out_stop, rc);
2361
2362         rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
2363                                &def_acl_buf, hint, handle);
2364         if (rc != 0)
2365                 GOTO(out_stop, rc);
2366
2367         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2368                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2369                 rc = __mdd_orphan_add(env, son, handle);
2370                 GOTO(out_volatile, rc);
2371         } else {
2372                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2373                                         attr->la_mode, name, handle,
2374                                         mdd_object_capa(env, mdd_pobj));
2375                 if (rc != 0)
2376                         GOTO(err_created, rc);
2377
2378                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2379                               ldata, 1);
2380
2381                 /* update parent directory mtime/ctime */
2382                 *la = *attr;
2383                 la->la_valid = LA_CTIME | LA_MTIME;
2384                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2385                 if (rc)
2386                         GOTO(err_insert, rc);
2387         }
2388
2389         EXIT;
2390 err_insert:
2391         if (rc != 0) {
2392                 int rc2;
2393
2394                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2395                         rc2 = __mdd_orphan_del(env, son, handle);
2396                 else
2397                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2398                                                  S_ISDIR(attr->la_mode),
2399                                                  handle, BYPASS_CAPA);
2400                 if (rc2 != 0)
2401                         goto out_stop;
2402
2403 err_created:
2404                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2405                 if (S_ISDIR(attr->la_mode)) {
2406                         /* Drop the reference, no need to delete "."/"..",
2407                          * because the object is to be destroyed directly. */
2408                         rc2 = mdo_ref_del(env, son, handle);
2409                         if (rc2 != 0) {
2410                                 mdd_write_unlock(env, son);
2411                                 goto out_stop;
2412                         }
2413                 }
2414 out_volatile:
2415                 /* For volatile files drop one link immediately, since there is
2416                  * no filename in the namespace, and save any error returned. */
2417                 rc2 = mdo_ref_del(env, son, handle);
2418                 if (rc2 != 0) {
2419                         mdd_write_unlock(env, son);
2420                         if (unlikely(rc == 0))
2421                                 rc = rc2;
2422                         goto out_stop;
2423                 }
2424
2425                 /* Don't destroy the volatile object on success */
2426                 if (likely(rc != 0))
2427                         mdo_destroy(env, son, handle);
2428                 mdd_write_unlock(env, son);
2429         }
2430
2431         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2432             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2433                 rc = mdd_changelog_ns_store(env, mdd,
2434                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2435                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2436                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2437                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2438                                 NULL, handle);
2439 out_stop:
2440         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2441         if (rc == 0)
2442                 rc = rc2;
2443 out_free:
2444         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2445                 /* if we vmalloced a large buffer drop it */
2446                 lu_buf_free(ldata->ld_buf);
2447
2448         /* The child object shouldn't be cached anymore */
2449         if (rc)
2450                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2451                         &child->mo_lu.lo_header->loh_flags);
2452         return rc;
2453 }
2454
2455 /*
2456  * Get locks on parents in proper order
2457  * RETURN: < 0 - error, rename_order if successful
2458  */
2459 enum rename_order {
2460         MDD_RN_SAME,
2461         MDD_RN_SRCTGT,
2462         MDD_RN_TGTSRC
2463 };
2464
2465 static int mdd_rename_order(const struct lu_env *env,
2466                             struct mdd_device *mdd,
2467                             struct mdd_object *src_pobj,
2468                             const struct lu_attr *pattr,
2469                             struct mdd_object *tgt_pobj)
2470 {
2471         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2472         int rc;
2473         ENTRY;
2474
2475         if (src_pobj == tgt_pobj)
2476                 RETURN(MDD_RN_SAME);
2477
2478         /* compared the parent child relationship of src_p&tgt_p */
2479         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2480                 rc = MDD_RN_SRCTGT;
2481         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2482                 rc = MDD_RN_TGTSRC;
2483         } else {
2484                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2485                                    NULL);
2486                 if (rc == -EREMOTE)
2487                         rc = 0;
2488
2489                 if (rc == 1)
2490                         rc = MDD_RN_TGTSRC;
2491                 else if (rc == 0)
2492                         rc = MDD_RN_SRCTGT;
2493         }
2494
2495         RETURN(rc);
2496 }
2497
2498 /* has not mdd_write{read}_lock on any obj yet. */
2499 static int mdd_rename_sanity_check(const struct lu_env *env,
2500                                    struct mdd_object *src_pobj,
2501                                    const struct lu_attr *pattr,
2502                                    struct mdd_object *tgt_pobj,
2503                                    const struct lu_attr *tpattr,
2504                                    struct mdd_object *sobj,
2505                                    const struct lu_attr *cattr,
2506                                    struct mdd_object *tobj,
2507                                    const struct lu_attr *tattr)
2508 {
2509         int rc = 0;
2510         ENTRY;
2511
2512         /* XXX: when get here, sobj must NOT be NULL,
2513          * the other case has been processed in cld_rename
2514          * before mdd_rename and enable MDS_PERM_BYPASS. */
2515         LASSERT(sobj);
2516
2517         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2518         if (rc)
2519                 RETURN(rc);
2520
2521         /* XXX: when get here, "tobj == NULL" means tobj must
2522          * NOT exist (neither on remote MDS, such case has been
2523          * processed in cld_rename before mdd_rename and enable
2524          * MDS_PERM_BYPASS).
2525          * So check may_create, but not check may_unlink. */
2526         if (tobj == NULL)
2527                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2528                                     (src_pobj != tgt_pobj));
2529         else
2530                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2531                                     (src_pobj != tgt_pobj), 1);
2532
2533         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2534                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2535
2536         RETURN(rc);
2537 }
2538
2539 static int mdd_declare_rename(const struct lu_env *env,
2540                               struct mdd_device *mdd,
2541                               struct mdd_object *mdd_spobj,
2542                               struct mdd_object *mdd_tpobj,
2543                               struct mdd_object *mdd_sobj,
2544                               struct mdd_object *mdd_tobj,
2545                               const struct lu_name *tname,
2546                               const struct lu_name *sname,
2547                               struct md_attr *ma,
2548                               struct linkea_data *ldata,
2549                               struct thandle *handle)
2550 {
2551         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2552         int rc;
2553
2554         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2555         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2556
2557         LASSERT(mdd_spobj);
2558         LASSERT(mdd_tpobj);
2559         LASSERT(mdd_sobj);
2560
2561         /* name from source dir */
2562         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2563         if (rc)
2564                 return rc;
2565
2566         /* .. from source child */
2567         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2568                 /* source child can be directory,
2569                  * counted by source dir's nlink */
2570                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2571                 if (rc)
2572                         return rc;
2573                 if (mdd_spobj != mdd_tpobj) {
2574                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2575                                                       handle);
2576                         if (rc != 0)
2577                                 return rc;
2578
2579                         rc = mdo_declare_index_insert(env, mdd_sobj,
2580                                                       mdo2fid(mdd_tpobj),
2581                                                       S_IFDIR, dotdot, handle);
2582                         if (rc != 0)
2583                                 return rc;
2584                 }
2585
2586                 /* new target child can be directory,
2587                  * counted by target dir's nlink */
2588                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2589                 if (rc != 0)
2590                         return rc;
2591         }
2592
2593         la->la_valid = LA_CTIME | LA_MTIME;
2594         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2595         if (rc != 0)
2596                 return rc;
2597
2598         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2599         if (rc != 0)
2600                 return rc;
2601
2602         la->la_valid = LA_CTIME;
2603         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2604         if (rc)
2605                 return rc;
2606
2607         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
2608                 S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
2609         if (rc)
2610                 return rc;
2611
2612         /* new name */
2613         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2614                                       mdd_object_type(mdd_sobj),
2615                                       tname->ln_name, handle);
2616         if (rc != 0)
2617                 return rc;
2618
2619         /* name from target dir (old name), we declare it unconditionally
2620          * as mdd_rename() calls delete unconditionally as well. so just
2621          * to balance declarations vs calls to change ... */
2622         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2623         if (rc)
2624                 return rc;
2625
2626         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2627                 /* delete target child in target parent directory */
2628                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2629                 if (rc)
2630                         return rc;
2631
2632                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2633                         /* target child can be directory,
2634                          * delete "." reference in target child directory */
2635                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2636                         if (rc)
2637                                 return rc;
2638
2639                         /* delete ".." reference in target parent directory */
2640                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2641                         if (rc)
2642                                 return rc;
2643                 }
2644
2645                 la->la_valid = LA_CTIME;
2646                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2647                 if (rc)
2648                         return rc;
2649
2650                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2651                 if (rc)
2652                         return rc;
2653         }
2654
2655         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2656         if (rc)
2657                 return rc;
2658
2659         return rc;
2660 }
2661
2662 /* src object can be remote that is why we use only fid and type of object */
2663 static int mdd_rename(const struct lu_env *env,
2664                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2665                       const struct lu_fid *lf, const struct lu_name *lsname,
2666                       struct md_object *tobj, const struct lu_name *ltname,
2667                       struct md_attr *ma)
2668 {
2669         const char *sname = lsname->ln_name;
2670         const char *tname = ltname->ln_name;
2671         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2672         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2673         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2674         struct mdd_device *mdd = mdo2mdd(src_pobj);
2675         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2676         struct mdd_object *mdd_tobj = NULL;
2677         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2678         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2679         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2680         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2681         struct thandle *handle;
2682         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2683         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2684         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2685         bool is_dir;
2686         bool tobj_ref = 0;
2687         bool tobj_locked = 0;
2688         unsigned cl_flags = 0;
2689         int rc, rc2;
2690         ENTRY;
2691
2692         if (tobj)
2693                 mdd_tobj = md2mdd_obj(tobj);
2694
2695         mdd_sobj = mdd_object_find(env, mdd, lf);
2696
2697         rc = mdd_la_get(env, mdd_sobj, cattr, BYPASS_CAPA);
2698         if (rc)
2699                 GOTO(out_pending, rc);
2700
2701         rc = mdd_la_get(env, mdd_spobj, pattr, BYPASS_CAPA);
2702         if (rc)
2703                 GOTO(out_pending, rc);
2704
2705         if (mdd_tobj) {
2706                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2707                 if (rc)
2708                         GOTO(out_pending, rc);
2709         }
2710
2711         rc = mdd_la_get(env, mdd_tpobj, tpattr, BYPASS_CAPA);
2712         if (rc)
2713                 GOTO(out_pending, rc);
2714
2715         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2716                                      mdd_sobj, cattr, mdd_tobj, tattr);
2717         if (rc)
2718                 GOTO(out_pending, rc);
2719
2720         rc = mdd_name_check(mdd, ltname);
2721         if (rc < 0)
2722                 GOTO(out_pending, rc);
2723
2724         handle = mdd_trans_create(env, mdd);
2725         if (IS_ERR(handle))
2726                 GOTO(out_pending, rc = PTR_ERR(handle));
2727
2728         memset(ldata, 0, sizeof(*ldata));
2729         mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdd_object_fid(mdd_tpobj),
2730                            ltname, 1, 0, ldata);
2731         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2732                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2733         if (rc)
2734                 GOTO(stop, rc);
2735
2736         rc = mdd_trans_start(env, mdd, handle);
2737         if (rc)
2738                 GOTO(stop, rc);
2739
2740         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2741         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2742         if (rc < 0)
2743                 GOTO(cleanup_unlocked, rc);
2744
2745         is_dir = S_ISDIR(cattr->la_mode);
2746
2747         /* Remove source name from source directory */
2748         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2749                                 mdd_object_capa(env, mdd_spobj));
2750         if (rc)
2751                 GOTO(cleanup, rc);
2752
2753         /* "mv dir1 dir2" needs "dir1/.." link update */
2754         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2755                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2756                                         mdd_object_capa(env, mdd_sobj));
2757                 if (rc != 0)
2758                         GOTO(fixup_spobj2, rc);
2759
2760                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2761                                              dotdot, handle,
2762                                              mdd_object_capa(env, mdd_sobj));
2763                 if (rc != 0)
2764                         GOTO(fixup_spobj, rc);
2765         }
2766
2767         /* Remove target name from target directory
2768          * Here tobj can be remote one, so we do index_delete unconditionally
2769          * and -ENOENT is allowed.
2770          */
2771         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2772                                 mdd_object_capa(env, mdd_tpobj));
2773         if (rc != 0) {
2774                 if (mdd_tobj) {
2775                         /* tname might been renamed to something else */
2776                         GOTO(fixup_spobj, rc);
2777                 }
2778                 if (rc != -ENOENT)
2779                         GOTO(fixup_spobj, rc);
2780         }
2781
2782         /* Insert new fid with target name into target dir */
2783         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2784                                 tname, handle, mdd_object_capa(env, mdd_tpobj));
2785         if (rc != 0)
2786                 GOTO(fixup_tpobj, rc);
2787
2788         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2789         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2790
2791         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2792         if (mdd_sobj) {
2793                 la->la_valid = LA_CTIME;
2794                 rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2795                 if (rc)
2796                         GOTO(fixup_tpobj, rc);
2797         }
2798
2799         /* Remove old target object
2800          * For tobj is remote case cmm layer has processed
2801          * and set tobj to NULL then. So when tobj is NOT NULL,
2802          * it must be local one.
2803          */
2804         if (tobj && mdd_object_exists(mdd_tobj)) {
2805                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2806                 tobj_locked = 1;
2807                 if (mdd_is_dead_obj(mdd_tobj)) {
2808                         /* shld not be dead, something is wrong */
2809                         CERROR("tobj is dead, something is wrong\n");
2810                         rc = -EINVAL;
2811                         goto cleanup;
2812                 }
2813                 mdo_ref_del(env, mdd_tobj, handle);
2814
2815                 /* Remove dot reference. */
2816                 if (S_ISDIR(tattr->la_mode))
2817                         mdo_ref_del(env, mdd_tobj, handle);
2818                 tobj_ref = 1;
2819
2820                 /* fetch updated nlink */
2821                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2822                 if (rc != 0) {
2823                         CERROR("%s: Failed to get nlink for tobj "
2824                                 DFID": rc = %d\n",
2825                                 mdd2obd_dev(mdd)->obd_name,
2826                                 PFID(tpobj_fid), rc);
2827                         GOTO(fixup_tpobj, rc);
2828                 }
2829
2830                 la->la_valid = LA_CTIME;
2831                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2832                 if (rc != 0) {
2833                         CERROR("%s: Failed to set ctime for tobj "
2834                                 DFID": rc = %d\n",
2835                                 mdd2obd_dev(mdd)->obd_name,
2836                                 PFID(tpobj_fid), rc);
2837                         GOTO(fixup_tpobj, rc);
2838                 }
2839
2840                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2841                 ma->ma_attr = *tattr;
2842                 ma->ma_valid |= MA_INODE;
2843                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2844                                        handle);
2845                 if (rc != 0) {
2846                         CERROR("%s: Failed to unlink tobj "
2847                                 DFID": rc = %d\n",
2848                                 mdd2obd_dev(mdd)->obd_name,
2849                                 PFID(tpobj_fid), rc);
2850                         GOTO(fixup_tpobj, rc);
2851                 }
2852
2853                 /* fetch updated nlink */
2854                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2855                 if (rc != 0) {
2856                         CERROR("%s: Failed to get nlink for tobj "
2857                                 DFID": rc = %d\n",
2858                                 mdd2obd_dev(mdd)->obd_name,
2859                                 PFID(tpobj_fid), rc);
2860                         GOTO(fixup_tpobj, rc);
2861                 }
2862                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2863                 ma->ma_attr = *tattr;
2864                 ma->ma_valid |= MA_INODE;
2865
2866                 if (tattr->la_nlink == 0) {
2867                         cl_flags |= CLF_RENAME_LAST;
2868                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2869                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2870                 }
2871         }
2872
2873         la->la_valid = LA_CTIME | LA_MTIME;
2874         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2875         if (rc)
2876                 GOTO(fixup_tpobj, rc);
2877
2878         if (mdd_spobj != mdd_tpobj) {
2879                 la->la_valid = LA_CTIME | LA_MTIME;
2880                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2881         }
2882
2883         if (rc == 0 && mdd_sobj) {
2884                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2885                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2886                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2887                                       0, 0);
2888                 if (rc == -ENOENT)
2889                         /* Old files might not have EA entry */
2890                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2891                                       lsname, handle, NULL, 0);
2892                 mdd_write_unlock(env, mdd_sobj);
2893                 /* We don't fail the transaction if the link ea can't be
2894                    updated -- fid2path will use alternate lookup method. */
2895                 rc = 0;
2896         }
2897
2898         EXIT;
2899
2900 fixup_tpobj:
2901         if (rc) {
2902                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2903                                          BYPASS_CAPA);
2904                 if (rc2)
2905                         CWARN("tp obj fix error %d\n",rc2);
2906
2907                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2908                     !mdd_is_dead_obj(mdd_tobj)) {
2909                         if (tobj_ref) {
2910                                 mdo_ref_add(env, mdd_tobj, handle);
2911                                 if (is_dir)
2912                                         mdo_ref_add(env, mdd_tobj, handle);
2913                         }
2914
2915                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2916                                                   mdo2fid(mdd_tobj),
2917                                                   mdd_object_type(mdd_tobj),
2918                                                   tname, handle, BYPASS_CAPA);
2919                         if (rc2 != 0)
2920                                 CWARN("tp obj fix error: rc = %d\n", rc2);
2921                 }
2922         }
2923
2924 fixup_spobj:
2925         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2926                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2927                                               BYPASS_CAPA);
2928
2929                 if (rc2)
2930                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2931                                mdd2obd_dev(mdd)->obd_name, rc2);
2932
2933
2934                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
2935                                               dotdot, handle, BYPASS_CAPA);
2936                 if (rc2 != 0)
2937                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2938                               mdd2obd_dev(mdd)->obd_name, rc2);
2939         }
2940
2941 fixup_spobj2:
2942         if (rc != 0) {
2943                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
2944                                          mdd_object_type(mdd_sobj), sname,
2945                                          handle, BYPASS_CAPA);
2946                 if (rc2 != 0)
2947                         CWARN("sp obj fix error: rc = %d\n", rc2);
2948         }
2949
2950 cleanup:
2951         if (tobj_locked)
2952                 mdd_write_unlock(env, mdd_tobj);
2953
2954 cleanup_unlocked:
2955         if (rc == 0)
2956                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
2957                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
2958                                             ltname, lsname, handle);
2959
2960 stop:
2961         mdd_trans_stop(env, mdd, rc, handle);
2962
2963 out_pending:
2964         mdd_object_put(env, mdd_sobj);
2965         return rc;
2966 }
2967
2968 /**
2969  * During migration once the parent FID has been changed,
2970  * we need update the parent FID in linkea.
2971  **/
2972 static int mdd_linkea_update_child_internal(const struct lu_env *env,
2973                                             struct mdd_object *parent,
2974                                             struct mdd_object *child,
2975                                             const char *name, int namelen,
2976                                             struct thandle *handle,
2977                                             bool declare)
2978 {
2979         struct mdd_thread_info  *info = mdd_env_info(env);
2980         struct linkea_data      ldata = { NULL };
2981         struct lu_buf           *buf = &info->mti_link_buf;
2982         int                     count;
2983         int                     rc = 0;
2984
2985         ENTRY;
2986
2987         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2988         if (buf->lb_buf == NULL)
2989                 RETURN(-ENOMEM);
2990
2991         ldata.ld_buf = buf;
2992         rc = mdd_links_read(env, child, &ldata);
2993         if (rc != 0) {
2994                 if (rc == -ENOENT || rc == -ENODATA)
2995                         rc = 0;
2996                 RETURN(rc);
2997         }
2998
2999         LASSERT(ldata.ld_leh != NULL);
3000         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3001         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3002                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3003                 struct lu_name lname;
3004                 struct lu_fid  fid;
3005
3006                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3007                                     &lname, &fid);
3008
3009                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3010                     lu_fid_eq(&fid, mdd_object_fid(parent))) {
3011                         ldata.ld_lee = (struct link_ea_entry *)
3012                                        ((char *)ldata.ld_lee +
3013                                         ldata.ld_reclen);
3014                         continue;
3015                 }
3016
3017                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3018                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3019                        lname.ln_namelen, lname.ln_name,
3020                        PFID(mdd_object_fid(parent)));
3021                 /* update to the new parent fid */
3022                 linkea_entry_pack(ldata.ld_lee, &lname,
3023                                   mdd_object_fid(parent));
3024                 if (declare)
3025                         rc = mdd_declare_links_add(env, child, handle, &ldata,
3026                                                    MLAO_IGNORE);
3027                 else
3028                         rc = mdd_links_write(env, child, &ldata, handle);
3029                 break;
3030         }
3031         RETURN(rc);
3032 }
3033
3034 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3035                                            struct mdd_object *parent,
3036                                            struct mdd_object *child,
3037                                            const char *name, int namelen,
3038                                            struct thandle *handle)
3039 {
3040         return mdd_linkea_update_child_internal(env, parent, child, name,
3041                                                 namelen, handle, true);
3042 }
3043
3044 static int mdd_linkea_update_child(const struct lu_env *env,
3045                                    struct mdd_object *parent,
3046                                    struct mdd_object *child,
3047                                    const char *name, int namelen,
3048                                    struct thandle *handle)
3049 {
3050         return mdd_linkea_update_child_internal(env, parent, child, name,
3051                                                 namelen, handle, false);
3052 }
3053
3054 static int mdd_update_linkea_internal(const struct lu_env *env,
3055                                       struct mdd_object *mdd_pobj,
3056                                       struct mdd_object *mdd_sobj,
3057                                       struct mdd_object *mdd_tobj,
3058                                       const struct lu_name *child_name,
3059                                       struct thandle *handle,
3060                                       int declare)
3061 {
3062         struct mdd_thread_info  *info = mdd_env_info(env);
3063         struct linkea_data      *ldata = &info->mti_link_data;
3064         int                     count;
3065         int                     rc = 0;
3066         ENTRY;
3067
3068         rc = mdd_links_read(env, mdd_sobj, ldata);
3069         if (rc != 0) {
3070                 if (rc == -ENOENT || rc == -ENODATA)
3071                         rc = 0;
3072                 RETURN(rc);
3073         }
3074
3075         /* If it is mulitple links file, we need update the name entry for
3076          * all parent */
3077         LASSERT(ldata->ld_leh != NULL);
3078         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3079         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3080                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3081                 struct mdd_object       *pobj;
3082                 struct lu_name          lname;
3083                 struct lu_fid           fid;
3084
3085                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3086                                     &lname, &fid);
3087                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3088                                                          ldata->ld_reclen);
3089                 pobj = mdd_object_find(env, mdd, &fid);
3090                 if (IS_ERR(pobj)) {
3091                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3092                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3093                               PTR_ERR(pobj));
3094                         continue;
3095                 }
3096
3097                 if (!mdd_object_exists(pobj)) {
3098                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3099                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3100                         GOTO(next_put, rc);
3101                 }
3102
3103                 if (pobj == mdd_pobj &&
3104                     lname.ln_namelen == child_name->ln_namelen &&
3105                     strncmp(lname.ln_name, child_name->ln_name,
3106                             lname.ln_namelen) == 0) {
3107                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3108                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3109                               PFID(&fid));
3110                         GOTO(next_put, rc);
3111                 }
3112
3113                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3114                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3115                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3116
3117                 if (declare) {
3118                         /* Remove source name from source directory */
3119                         /* Insert new fid with target name into target dir */
3120                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3121                                                       handle);
3122                         if (rc != 0)
3123                                 GOTO(next_put, rc);
3124
3125                         rc = mdo_declare_index_insert(env, pobj,
3126                                         mdd_object_fid(mdd_tobj),
3127                                         mdd_object_type(mdd_tobj),
3128                                         lname.ln_name, handle);
3129                         if (rc != 0)
3130                                 GOTO(next_put, rc);
3131
3132                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3133                         if (rc)
3134                                 GOTO(next_put, rc);
3135
3136                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3137                         if (rc)
3138                                 GOTO(next_put, rc);
3139                 } else {
3140                         rc = __mdd_index_delete(env, pobj, lname.ln_name,
3141                                                 0, handle,
3142                                                 mdd_object_capa(env, pobj));
3143                         if (rc)
3144                                 GOTO(next_put, rc);
3145
3146                         rc = __mdd_index_insert(env, pobj,
3147                                         mdd_object_fid(mdd_tobj),
3148                                         mdd_object_type(mdd_tobj),
3149                                         lname.ln_name, handle,
3150                                         mdd_object_capa(env, pobj));
3151                         if (rc != 0)
3152                                 GOTO(next_put, rc);
3153
3154                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3155                         rc = mdo_ref_add(env, mdd_tobj, handle);
3156                         mdd_write_unlock(env, mdd_tobj);
3157                         if (rc)
3158                                 GOTO(next_put, rc);
3159
3160                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3161                         mdo_ref_del(env, mdd_sobj, handle);
3162                         mdd_write_unlock(env, mdd_sobj);
3163                 }
3164 next_put:
3165                 mdd_object_put(env, pobj);
3166                 if (rc != 0)
3167                         break;
3168         }
3169
3170         RETURN(rc);
3171 }
3172
3173 static int mdd_migrate_xattrs(const struct lu_env *env,
3174                               struct mdd_object *mdd_sobj,
3175                               struct mdd_object *mdd_tobj)
3176 {
3177         struct mdd_thread_info  *info = mdd_env_info(env);
3178         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3179         char                    *xname;
3180         struct thandle          *handle;
3181         struct lu_buf           xbuf;
3182         int                     xlen;
3183         int                     rem;
3184         int                     xsize;
3185         int                     list_xsize;
3186         struct lu_buf           list_xbuf;
3187         int                     rc;
3188
3189         /* retrieve xattr list from the old object */
3190         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL,
3191                                     mdd_object_capa(env, mdd_sobj));
3192         if (list_xsize == -ENODATA)
3193                 return 0;
3194
3195         if (list_xsize < 0)
3196                 return list_xsize;
3197
3198         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3199         if (info->mti_big_buf.lb_buf == NULL)
3200                 return -ENOMEM;
3201
3202         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3203         list_xbuf.lb_len = list_xsize;
3204         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf,
3205                             mdd_object_capa(env, mdd_sobj));
3206         if (rc < 0)
3207                 return rc;
3208         rc = 0;
3209         rem = list_xsize;
3210         xname = list_xbuf.lb_buf;
3211         while (rem > 0) {
3212                 xlen = strnlen(xname, rem - 1) + 1;
3213                 if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
3214                     strcmp(XATTR_NAME_LMA, xname) == 0 ||
3215                     strcmp(XATTR_NAME_LMV, xname) == 0)
3216                         goto next;
3217
3218                 /* For directory, if there are default layout, migrate here */
3219                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3220                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3221                         goto next;
3222
3223                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL,
3224                                       xname,
3225                                       mdd_object_capa(env, mdd_sobj));
3226                 if (xsize == -ENODATA)
3227                         goto next;
3228                 if (xsize < 0)
3229                         GOTO(out, rc);
3230
3231                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3232                 if (info->mti_link_buf.lb_buf == NULL)
3233                         GOTO(out, rc = -ENOMEM);
3234
3235                 xbuf.lb_len = xsize;
3236                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3237                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname,
3238                                    mdd_object_capa(env, mdd_sobj));
3239                 if (rc == -ENODATA)
3240                         goto next;
3241                 if (rc < 0)
3242                         GOTO(out, rc);
3243
3244                 handle = mdd_trans_create(env, mdd);
3245                 if (IS_ERR(handle))
3246                         GOTO(out, rc = PTR_ERR(handle));
3247
3248                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3249                                            handle);
3250                 if (rc != 0)
3251                         GOTO(stop_trans, rc);
3252                 /* Note: this transaction is part of migration, and it is not
3253                  * the last step of migration, so we set th_local = 1 to avoid
3254                  * update last rcvd for this transaction */
3255                 handle->th_local = 1;
3256                 rc = mdd_trans_start(env, mdd, handle);
3257                 if (rc != 0)
3258                         GOTO(stop_trans, rc);
3259
3260                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle,
3261                                    mdd_object_capa(env, mdd_sobj));
3262                 if (rc == -EEXIST)
3263                         GOTO(stop_trans, rc = 0);
3264
3265                 if (rc != 0)
3266                         GOTO(stop_trans, rc);
3267 stop_trans:
3268                 mdd_trans_stop(env, mdd, rc, handle);
3269                 if (rc != 0)
3270                         GOTO(out, rc);
3271 next:
3272                 rem -= xlen;
3273                 memmove(xname, xname + xlen, rem);
3274         }
3275 out:
3276         return rc;
3277 }
3278
3279 static int mdd_declare_migrate_create(const struct lu_env *env,
3280                                       struct mdd_object *mdd_pobj,
3281                                       struct mdd_object *mdd_sobj,
3282                                       struct mdd_object *mdd_tobj,
3283                                       struct md_op_spec *spec,
3284                                       struct lu_attr *la,
3285                                       union lmv_mds_md *mgr_ea,
3286                                       struct linkea_data *ldata,
3287                                       struct thandle *handle)
3288 {
3289         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3290         const struct lu_buf     *buf;
3291         int                     rc;
3292         int                     mgr_easize;
3293
3294         rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
3295                                                 handle, spec, NULL);
3296         if (rc != 0)
3297                 return rc;
3298
3299         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3300                                            handle);
3301         if (rc != 0)
3302                 return rc;
3303
3304         if (S_ISLNK(la->la_mode)) {
3305                 const char *target_name = spec->u.sp_symname;
3306                 int sym_len = strlen(target_name);
3307                 const struct lu_buf *buf;
3308
3309                 buf = mdd_buf_get_const(env, target_name, sym_len);
3310                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3311                                              buf, 0, handle);
3312                 if (rc != 0)
3313                         return rc;
3314         } else if (S_ISDIR(la->la_mode)) {
3315                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
3316                                            MLAO_IGNORE);
3317                 if (rc != 0)
3318                         return rc;
3319         }
3320
3321         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3322                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3323                                         spec->u.sp_ea.eadatalen);
3324                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3325                                            0, handle);
3326                 if (rc)
3327                         return rc;
3328         }
3329
3330         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3331         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3332         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3333                                    0, handle);
3334         if (rc)
3335                 return rc;
3336
3337         la_flag->la_valid = LA_FLAGS;
3338         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3339         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3340
3341         return rc;
3342 }
3343
3344 static int mdd_migrate_create(const struct lu_env *env,
3345                               struct mdd_object *mdd_pobj,
3346                               struct mdd_object *mdd_sobj,
3347                               struct mdd_object *mdd_tobj,
3348                               const struct lu_name *lname,
3349                               struct lu_attr *la)
3350 {
3351         struct mdd_thread_info  *info = mdd_env_info(env);
3352         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3353         struct md_op_spec       *spec = &info->mti_spec;
3354         struct lu_buf           lmm_buf = { NULL };
3355         struct lu_buf           link_buf = { NULL };
3356         const struct lu_buf     *buf;
3357         struct thandle          *handle;
3358         struct lmv_mds_md_v1    *mgr_ea;
3359         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3360         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3361         int                     mgr_easize;
3362         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3363         int                     rc;
3364         ENTRY;
3365
3366         /* prepare spec for create */
3367         memset(spec, 0, sizeof(*spec));
3368         spec->sp_cr_lookup = 0;
3369         spec->sp_feat = &dt_directory_features;
3370         if (S_ISLNK(la->la_mode)) {
3371                 buf = lu_buf_check_and_alloc(
3372                                 &mdd_env_info(env)->mti_big_buf,
3373                                 la->la_size + 1);
3374                 link_buf = *buf;
3375                 link_buf.lb_len = la->la_size + 1;
3376                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3377                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3378                 if (rc <= 0) {
3379                         rc = rc != 0 ? rc : -EFAULT;
3380                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3381                                mdd2obd_dev(mdd)->obd_name,
3382                                PFID(mdd_object_fid(mdd_sobj)), rc);
3383                         RETURN(rc);
3384                 }
3385                 spec->u.sp_symname = link_buf.lb_buf;
3386         } else if S_ISREG(la->la_mode) {
3387                 /* retrieve lov of the old object */
3388                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3389                 if (rc != 0 && rc != -ENODATA)
3390                         RETURN(rc);
3391                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3392                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3393                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3394                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3395                 }
3396         } else if (S_ISDIR(la->la_mode)) {
3397                 rc = mdd_links_read(env, mdd_sobj, ldata);
3398                 if (rc < 0 && rc != -ENODATA)
3399                         RETURN(rc);
3400         }
3401
3402         mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
3403         memset(mgr_ea, 0, sizeof(*mgr_ea));
3404         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3405         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3406         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3407         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3408         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3409         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3410
3411         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3412
3413         handle = mdd_trans_create(env, mdd);
3414         if (IS_ERR(handle))
3415                 GOTO(out_free, rc = PTR_ERR(handle));
3416
3417         /* Note: this transaction is part of migration, and it is not
3418          * the last step of migration, so we set th_local = 1 to avoid
3419          * update last rcvd for this transaction */
3420         handle->th_local = 1;
3421         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
3422                                         spec, la,
3423                                         (union lmv_mds_md *)info->mti_xattr_buf,
3424                                         ldata, handle);
3425         if (rc != 0)
3426                 GOTO(stop_trans, rc);
3427
3428         rc = mdd_trans_start(env, mdd, handle);
3429         if (rc != 0)
3430                 GOTO(stop_trans, rc);
3431
3432         /* create the target object */
3433         rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3434                                hint, handle);
3435         if (rc != 0)
3436                 GOTO(stop_trans, rc);
3437
3438         if (S_ISDIR(la->la_mode)) {
3439                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3440                 if (rc != 0)
3441                         GOTO(stop_trans, rc);
3442         }
3443
3444         /* Set MIGRATE EA on the source inode, so once the migration needs
3445          * to be re-done during failover, the re-do process can locate the
3446          * target object which is already being created. */
3447         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3448         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3449         rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0,
3450                            handle, mdd_object_capa(env, mdd_sobj));
3451         if (rc != 0)
3452                 GOTO(stop_trans, rc);
3453
3454         /* Set immutable flag, so any modification is disabled until
3455          * the migration is done. Once the migration is interrupted,
3456          * if the resume process find the migrating object has both
3457          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3458          * flag and approve the migration */
3459         la_flag->la_valid = LA_FLAGS;
3460         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3461         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
3462                           mdd_object_capa(env, mdd_sobj));
3463 stop_trans:
3464         if (handle != NULL)
3465                 mdd_trans_stop(env, mdd, rc, handle);
3466 out_free:
3467         if (lmm_buf.lb_buf != NULL)
3468                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3469         RETURN(rc);
3470 }
3471
3472 static int mdd_migrate_entries(const struct lu_env *env,
3473                                struct mdd_object *mdd_sobj,
3474                                struct mdd_object *mdd_tobj)
3475 {
3476         struct dt_object        *next = mdd_object_child(mdd_sobj);
3477         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3478         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3479         struct thandle          *handle;
3480         struct dt_it            *it;
3481         const struct dt_it_ops  *iops;
3482         int                      rc;
3483         int                      result;
3484         struct lu_dirent        *ent;
3485         ENTRY;
3486
3487         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3488         if (ent == NULL)
3489                 RETURN(-ENOMEM);
3490
3491         if (!dt_try_as_dir(env, next))
3492                 GOTO(out_ent, rc = -ENOTDIR);
3493         /*
3494          * iterate directories
3495          */
3496         iops = &next->do_index_ops->dio_it;
3497         it = iops->init(env, next, LUDA_FID | LUDA_TYPE,
3498                         mdd_object_capa(env, mdd_sobj));
3499         if (IS_ERR(it))
3500                 GOTO(out_ent, rc = PTR_ERR(it));
3501
3502         rc = iops->load(env, it, 0);
3503         if (rc == 0)
3504                 rc = iops->next(env, it);
3505         else if (rc > 0)
3506                 rc = 0;
3507         /*
3508          * At this point and across for-loop:
3509          *
3510          *  rc == 0 -> ok, proceed.
3511          *  rc >  0 -> end of directory.
3512          *  rc <  0 -> error.
3513          */
3514         do {
3515                 struct mdd_object       *child;
3516                 char                    *name = mdd_env_info(env)->mti_key;
3517                 int                     len;
3518                 int                     recsize;
3519                 int                     is_dir;
3520                 bool                    target_exist = false;
3521
3522                 len = iops->key_size(env, it);
3523                 if (len == 0)
3524                         goto next;
3525
3526                 result = iops->rec(env, it, (struct dt_rec *)ent,
3527                                    LUDA_FID | LUDA_TYPE);
3528                 if (result == -ESTALE)
3529                         goto next;
3530                 if (result != 0) {
3531                         rc = result;
3532                         goto out;
3533                 }
3534
3535                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3536                 recsize = le16_to_cpu(ent->lde_reclen);
3537
3538                 /* Insert new fid with target name into target dir */
3539                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3540                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3541                      ent->lde_name[1] == '.'))
3542                         goto next;
3543
3544                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3545                 if (IS_ERR(child))
3546                         GOTO(out, rc = PTR_ERR(child));
3547
3548                 is_dir = S_ISDIR(mdd_object_type(child));
3549
3550                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3551
3552                 /* Check whether the name has been inserted to the target */
3553                 if (dt_try_as_dir(env, dt_tobj)) {
3554                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3555
3556                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3557                                        (struct dt_key *)name,
3558                                        mdd_object_capa(env, mdd_tobj));
3559                         if (unlikely(rc == 0))
3560                                 target_exist = true;
3561                 }
3562
3563                 handle = mdd_trans_create(env, mdd);
3564                 if (IS_ERR(handle))
3565                         GOTO(out, rc = PTR_ERR(handle));
3566
3567                 /* Note: this transaction is part of migration, and it is not
3568                  * the last step of migration, so we set th_local = 1 to avoid
3569                  * updating last rcvd for this transaction */
3570                 handle->th_local = 1;
3571                 if (likely(!target_exist)) {
3572                         rc = mdo_declare_index_insert(env, mdd_tobj,
3573                                                       &ent->lde_fid,
3574                                                       mdd_object_type(child),
3575                                                       name, handle);
3576                         if (rc != 0)
3577                                 GOTO(out_put, rc);
3578
3579                         if (is_dir) {
3580                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3581                                 if (rc != 0)
3582                                         GOTO(out_put, rc);
3583                         }
3584                 }
3585
3586                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3587                 if (rc != 0)
3588                         GOTO(out_put, rc);
3589
3590                 if (is_dir) {
3591                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3592                         if (rc != 0)
3593                                 GOTO(out_put, rc);
3594
3595                         /* Update .. for child */
3596                         rc = mdo_declare_index_delete(env, child, dotdot,
3597                                                       handle);
3598                         if (rc != 0)
3599                                 GOTO(out_put, rc);
3600
3601                         rc = mdo_declare_index_insert(env, child,
3602                                                       mdd_object_fid(mdd_tobj),
3603                                                       S_IFDIR, dotdot, handle);
3604                         if (rc != 0)
3605                                 GOTO(out_put, rc);
3606                 }
3607
3608                 rc = mdd_linkea_declare_update_child(env, mdd_tobj,
3609                                                      child, name,
3610                                                      strlen(name),
3611                                                      handle);
3612                 if (rc != 0)
3613                         GOTO(out_put, rc);
3614
3615                 rc = mdd_trans_start(env, mdd, handle);
3616                 if (rc != 0) {
3617                         CERROR("%s: transaction start failed: rc = %d\n",
3618                                mdd2obd_dev(mdd)->obd_name, rc);
3619                         GOTO(out_put, rc);
3620                 }
3621
3622                 if (likely(!target_exist)) {
3623                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3624                                                 mdd_object_type(child),
3625                                                 name, handle,
3626                                                 mdd_object_capa(env, mdd_tobj));
3627                         if (rc != 0)
3628                                 GOTO(out_put, rc);
3629
3630                         if (is_dir) {
3631                                 rc = mdo_ref_add(env, mdd_tobj, handle);
3632                                 if (rc != 0)
3633                                         GOTO(out_put, rc);
3634
3635                         }
3636                 }
3637
3638                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle,
3639                                         mdd_object_capa(env, mdd_sobj));
3640                 if (rc != 0)
3641                         GOTO(out_put, rc);
3642
3643                 if (is_dir) {
3644                         rc = __mdd_index_delete_only(env, child, dotdot, handle,
3645                                                    mdd_object_capa(env, child));
3646                         if (rc != 0)
3647                                 GOTO(out_put, rc);
3648
3649                         rc = __mdd_index_insert_only(env, child,
3650                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3651                                          dotdot, handle,
3652                                          mdd_object_capa(env, child));
3653                         if (rc != 0)
3654                                 GOTO(out_put, rc);
3655                 }
3656
3657                 rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
3658                                              strlen(name), handle);
3659
3660 out_put:
3661                 mdd_object_put(env, child);
3662                 mdd_trans_stop(env, mdd, rc, handle);
3663                 if (rc != 0)
3664                         GOTO(out, rc);
3665 next:
3666                 result = iops->next(env, it);
3667                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3668                         GOTO(out, rc = -EINTR);
3669
3670                 if (result == -ESTALE)
3671                         goto next;
3672         } while (result == 0);
3673 out:
3674         iops->put(env, it);
3675         iops->fini(env, it);
3676 out_ent:
3677         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3678         RETURN(rc);
3679 }
3680
3681 static int mdd_declare_update_linkea(const struct lu_env *env,
3682                                      struct mdd_object *mdd_pobj,
3683                                      struct mdd_object *mdd_sobj,
3684                                      struct mdd_object *mdd_tobj,
3685                                      struct thandle *handle,
3686                                      const struct lu_name *child_name)
3687 {
3688         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3689                                           child_name, handle, 1);
3690 }
3691
3692 static int mdd_update_linkea(const struct lu_env *env,
3693                              struct mdd_object *mdd_pobj,
3694                              struct mdd_object *mdd_sobj,
3695                              struct mdd_object *mdd_tobj,
3696                              struct thandle *handle,
3697                              const struct lu_name *child_name)
3698 {
3699         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3700                                           child_name, handle, 0);
3701 }
3702
3703 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3704                                            struct mdd_object *mdd_pobj,
3705                                            struct mdd_object *mdd_sobj,
3706                                            struct mdd_object *mdd_tobj,
3707                                            const struct lu_name *lname,
3708                                            struct lu_attr *la,
3709                                            struct lu_attr *parent_la,
3710                                            struct thandle *handle)
3711 {
3712         struct lu_attr  *la_flag = MDD_ENV_VAR(env, tattr);
3713         int             rc;
3714
3715         /* Revert IMMUTABLE flag */
3716         la_flag->la_valid = LA_FLAGS;
3717         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3718         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3719         if (rc != 0)
3720                 return rc;
3721
3722         /* delete entry from source dir */
3723         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3724         if (rc != 0)
3725                 return rc;
3726
3727         rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3728                                        mdd_tobj, handle, lname);
3729         if (rc != 0)
3730                 return rc;
3731
3732         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3733                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3734                                            handle);
3735                 if (rc != 0)
3736                         return rc;
3737         }
3738
3739         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3740                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3741                 if (rc != 0)
3742                         return rc;
3743         }
3744
3745         /* new name */
3746         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3747                                       mdd_object_type(mdd_tobj),
3748                                       lname->ln_name, handle);
3749         if (rc != 0)
3750                 return rc;
3751
3752         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3753                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3754                 if (rc != 0)
3755                         return rc;
3756         }
3757
3758         /* delete old object */
3759         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3760         if (rc != 0)
3761                 return rc;
3762
3763         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3764                 /* delete old object */
3765                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3766                 if (rc != 0)
3767                         return rc;
3768                 /* set nlink to 0 */
3769                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3770                 if (rc != 0)
3771                         return rc;
3772         }
3773
3774         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3775         if (rc)
3776                 return rc;
3777
3778         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3779
3780         return rc;
3781 }
3782
3783 static int mdd_migrate_update_name(const struct lu_env *env,
3784                                    struct mdd_object *mdd_pobj,
3785                                    struct mdd_object *mdd_sobj,
3786                                    struct mdd_object *mdd_tobj,
3787                                    const struct lu_name *lname,
3788                                    struct md_attr *ma)
3789 {
3790         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3791         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3792         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3793         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3794         struct thandle          *handle;
3795         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3796         const char              *name = lname->ln_name;
3797         int                     rc;
3798         ENTRY;
3799
3800         /* update time for parent */
3801         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3802         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3803         p_la->la_valid = LA_CTIME;
3804
3805         rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
3806         if (rc != 0)
3807                 RETURN(rc);
3808
3809         handle = mdd_trans_create(env, mdd);
3810         if (IS_ERR(handle))
3811                 RETURN(PTR_ERR(handle));
3812
3813         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3814                                              lname, so_attr, p_la, handle);
3815         if (rc != 0) {
3816                 /* If the migration can not be fit in one transaction, just
3817                  * leave it in the original MDT */
3818                 if (rc == -E2BIG)
3819                         GOTO(stop_trans, rc = 0);
3820                 else
3821                         GOTO(stop_trans, rc);
3822         }
3823
3824         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3825                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3826                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3827                PFID(mdd_object_fid(mdd_tobj)));
3828
3829         rc = mdd_trans_start(env, mdd, handle);
3830         if (rc != 0)
3831                 GOTO(stop_trans, rc);
3832
3833         /* Revert IMMUTABLE flag */
3834         la_flag->la_valid = LA_FLAGS;
3835         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
3836         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
3837                           mdd_object_capa(env, mdd_pobj));
3838         if (rc != 0)
3839                 GOTO(stop_trans, rc);
3840
3841         /* Remove source name from source directory */
3842         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
3843                                 mdd_object_capa(env, mdd_pobj));
3844         if (rc != 0)
3845                 GOTO(stop_trans, rc);
3846
3847         rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
3848                                handle, lname);
3849         if (rc != 0)
3850                 GOTO(stop_trans, rc);
3851
3852         if (S_ISREG(so_attr->la_mode)) {
3853                 if (so_attr->la_nlink == 1) {
3854                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3855                                            handle,
3856                                            mdd_object_capa(env, mdd_sobj));
3857                         if (rc != 0 && rc != -ENODATA)
3858                                 GOTO(stop_trans, rc);
3859                 }
3860         }
3861
3862         /* Insert new fid with target name into target dir */
3863         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
3864                                 mdd_object_type(mdd_tobj), name,
3865                                 handle, mdd_object_capa(env, mdd_pobj));
3866         if (rc != 0)
3867                 GOTO(stop_trans, rc);
3868
3869         rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
3870                            NULL, 1);
3871         if (rc != 0)
3872                 GOTO(stop_trans, rc);
3873
3874         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3875         mdo_ref_del(env, mdd_sobj, handle);
3876         if (is_dir)
3877                 mdo_ref_del(env, mdd_sobj, handle);
3878
3879         /* Get the attr again after ref_del */
3880         rc = mdd_la_get(env, mdd_sobj, so_attr,
3881                         mdd_object_capa(env, mdd_sobj));
3882         if (rc != 0)
3883                 GOTO(out_unlock, rc);
3884
3885         ma->ma_attr = *so_attr;
3886         ma->ma_valid |= MA_INODE;
3887         rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
3888         if (rc != 0)
3889                 GOTO(out_unlock, rc);
3890
3891         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
3892         if (rc != 0)
3893                 GOTO(out_unlock, rc);
3894
3895 out_unlock:
3896         mdd_write_unlock(env, mdd_sobj);
3897
3898 stop_trans:
3899         mdd_trans_stop(env, mdd, rc, handle);
3900
3901         RETURN(rc);
3902 }
3903
3904 /**
3905  * Check whether we should migrate the file/dir
3906  * return val
3907  *      < 0  permission check failed or other error.
3908  *      = 0  the file can be migrated.
3909  *      > 0  the file does not need to be migrated, mostly
3910  *           for multiple link file
3911  **/
3912 static int mdd_migrate_sanity_check(const struct lu_env *env,
3913                                     struct mdd_object *pobj,
3914                                     const struct lu_attr *pattr,
3915                                     struct mdd_object *sobj,
3916                                     struct lu_attr *sattr)
3917 {
3918         struct mdd_thread_info  *info = mdd_env_info(env);
3919         struct linkea_data      *ldata = &info->mti_link_data;
3920         int                     mgr_easize;
3921         struct lu_buf           *mgr_buf;
3922         int                     count;
3923         int                     rc;
3924
3925         ENTRY;
3926
3927         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3928         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
3929         if (mgr_buf->lb_buf == NULL)
3930                 RETURN(-ENOMEM);
3931
3932         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV,
3933                            mdd_object_capa(env, sobj));
3934         if (rc > 0) {
3935                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
3936
3937                 /* If the object has migrateEA, it means IMMUTE flag
3938                  * is being set by previous migration process, so it
3939                  * needs to override the IMMUTE flag, otherwise the
3940                  * following sanity check will fail */
3941                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
3942                                                 LMV_HASH_FLAG_MIGRATION) {
3943                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
3944
3945                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
3946                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
3947                                mdd2obd_dev(mdd)->obd_name,
3948                                PFID(mdd_object_fid(sobj)));
3949                 }
3950         }
3951
3952         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
3953                                      sobj, sattr, NULL, NULL);
3954         if (rc != 0)
3955                 RETURN(rc);
3956
3957         /* Then it will check if the file should be migrated. If the file
3958          * has mulitple links, we only need migrate the file if all of its
3959          * entries has been migrated to the remote MDT */
3960         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
3961                 RETURN(0);
3962
3963         rc = mdd_links_read(env, sobj, ldata);
3964         if (rc != 0) {
3965                 /* For multiple links files, if there are no linkEA data at all,
3966                  * means the file might be created before linkEA is enabled, and
3967                  * all all of its links should not be migrated yet, otherwise
3968                  * it should have some linkEA there */
3969                 if (rc == -ENOENT || rc == -ENODATA)
3970                         RETURN(1);
3971                 RETURN(rc);
3972         }
3973
3974         /* If it is mulitple links file, we need update the name entry for
3975          * all parent */
3976         LASSERT(ldata->ld_leh != NULL);
3977         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3978         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3979                 struct mdd_device       *mdd = mdo2mdd(&sobj->mod_obj);
3980                 struct mdd_object       *lpobj;
3981                 struct lu_name          lname;
3982                 struct lu_fid           fid;
3983
3984                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3985                                     &lname, &fid);
3986                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3987                                                          ldata->ld_reclen);
3988                 lpobj = mdd_object_find(env, mdd, &fid);
3989                 if (IS_ERR(lpobj)) {
3990                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3991                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3992                               PTR_ERR(lpobj));
3993                         continue;
3994                 }
3995
3996                 if (!mdd_object_exists(lpobj) || mdd_object_remote(lpobj)) {
3997                         CDEBUG(D_INFO, DFID"%.*s: is on remote MDT.\n",
3998                                PFID(&fid), lname.ln_namelen, lname.ln_name);
3999                         mdd_object_put(env, lpobj);
4000                         continue;
4001                 }
4002
4003                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4004                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4005                        lname.ln_name, PFID(&fid));
4006                 mdd_object_put(env, lpobj);
4007                 rc = 1;
4008                 break;
4009         }
4010
4011         RETURN(rc);
4012 }
4013
4014 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4015                        struct md_object *sobj, const struct lu_name *lname,
4016                        struct md_object *tobj, struct md_attr *ma)
4017 {
4018         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4019         struct mdd_device       *mdd = mdo2mdd(pobj);
4020         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4021         struct mdd_object       *mdd_tobj = NULL;
4022         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4023         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4024         int                     rc;
4025
4026         ENTRY;
4027         /* If the file will being migrated, it will check whether
4028          * the file is being opened by someone else right now */
4029         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4030         if (mdd_sobj->mod_count >= 1) {
4031                 CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
4032                        mdd2obd_dev(mdd)->obd_name,
4033                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4034                        mdd_sobj->mod_count, -EBUSY);
4035                 mdd_read_unlock(env, mdd_sobj);
4036                 GOTO(put, rc = -EBUSY);
4037         }
4038         mdd_read_unlock(env, mdd_sobj);
4039
4040         rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
4041         if (rc != 0)
4042                 GOTO(put, rc);
4043
4044         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
4045         if (rc != 0)
4046                 GOTO(put, rc);
4047
4048         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4049         if (rc != 0) {
4050                 if (rc > 0)
4051                         rc = 0;
4052                 GOTO(put, rc);
4053         }
4054
4055         /* Sigh, it is impossible to finish all of migration in a single
4056          * transaction, for example migrating big directory entries to the
4057          * new MDT, it needs insert all of name entries of children in the
4058          * new directory.
4059          *
4060          * So migration will be done in multiple steps and transactions.
4061          *
4062          * 1. create an orphan object on the remote MDT in one transaction.
4063          * 2. migrate extend attributes to the new target file/directory.
4064          * 3. For directory, migrate the entries to the new MDT and update
4065          * linkEA of each children. Because we can not migrate all entries
4066          * in a single transaction, so the migrating directory will become
4067          * a striped directory during migration, so once the process is
4068          * interrupted, the directory is still accessible. (During lookup,
4069          * client will locate the name by searching both original and target
4070          * object).
4071          * 4. Finally, update the name/FID to point to the new file/directory
4072          * in a separate transaction.
4073          */
4074
4075         /* step 1: Check whether the orphan object has been created, and create
4076          * orphan object on the remote MDT if needed */
4077         mdd_tobj = md2mdd_obj(tobj);
4078         if (!mdd_object_exists(mdd_tobj)) {
4079                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4080                                         lname, so_attr);
4081                 if (rc != 0)
4082                         GOTO(put, rc);
4083         }
4084
4085         /* step 2: migrate xattr */
4086         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4087         if (rc != 0)
4088                 GOTO(put, rc);
4089
4090         /* step 3: migrate name entries to the orphan object */
4091         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4092                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4093                 if (rc != 0)
4094                         GOTO(put, rc);
4095                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4096                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4097                         GOTO(put, rc = 0);
4098         } else {
4099                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4100         }
4101
4102         /* step 4: update name entry to the new object */
4103         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4104                                      ma);
4105         if (rc != 0)
4106                 GOTO(put, rc);
4107 put:
4108         RETURN(rc);
4109 }
4110
4111 const struct md_dir_operations mdd_dir_ops = {
4112         .mdo_is_subdir     = mdd_is_subdir,
4113         .mdo_lookup        = mdd_lookup,
4114         .mdo_create        = mdd_create,
4115         .mdo_rename        = mdd_rename,
4116         .mdo_link          = mdd_link,
4117         .mdo_unlink        = mdd_unlink,
4118         .mdo_create_data   = mdd_create_data,
4119         .mdo_migrate       = mdd_migrate,
4120 };