Whamcloud - gitweb
LU-11104 mdt: rename may cause deadlock
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <obd_class.h>
42 #include <obd_support.h>
43 #include <lustre_mds.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static struct lu_name lname_dotdot = {
52         .ln_name        = (char *) dotdot,
53         .ln_namelen     = sizeof(dotdot) - 1,
54 };
55
56 static inline int
57 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
58 {
59         if (!lu_name_is_valid(ln))
60                 return -EINVAL;
61         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
62                 return -ENAMETOOLONG;
63         else
64                 return 0;
65 }
66
67 /* Get FID from name and parent */
68 static int
69 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
70              const struct lu_attr *pattr, const struct lu_name *lname,
71              struct lu_fid* fid, int mask)
72 {
73         const char *name                = lname->ln_name;
74         const struct dt_key *key        = (const struct dt_key *)name;
75         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
76         struct mdd_device *m            = mdo2mdd(pobj);
77         struct dt_object *dir           = mdd_object_child(mdd_obj);
78         int rc;
79         ENTRY;
80
81         if (unlikely(mdd_is_dead_obj(mdd_obj)))
82                 RETURN(-ESTALE);
83
84         if (!mdd_object_exists(mdd_obj))
85                 RETURN(-ESTALE);
86
87         if (mdd_object_remote(mdd_obj)) {
88                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
89                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
90         }
91
92         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
93                                             MOR_TGT_PARENT);
94         if (rc)
95                 RETURN(rc);
96
97         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
98                    dt_try_as_dir(env, dir)))
99                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
100         else
101                 rc = -ENOTDIR;
102
103         RETURN(rc);
104 }
105
106 int mdd_lookup(const struct lu_env *env,
107                struct md_object *pobj, const struct lu_name *lname,
108                struct lu_fid *fid, struct md_op_spec *spec)
109 {
110         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
111         int rc;
112         ENTRY;
113
114         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
115         if (rc != 0)
116                 RETURN(rc);
117
118         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
119                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
120         RETURN(rc);
121 }
122
123 /** Read the link EA into a temp buffer.
124  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
125  * A pointer to the buffer is stored in \a ldata::ld_buf.
126  *
127  * \retval 0 or error
128  */
129 static int __mdd_links_read(const struct lu_env *env,
130                             struct mdd_object *mdd_obj,
131                             struct linkea_data *ldata)
132 {
133         int rc;
134
135         if (!mdd_object_exists(mdd_obj))
136                 return -ENODATA;
137
138         /* First try a small buf */
139         LASSERT(env != NULL);
140         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
141                                                PAGE_SIZE);
142         if (ldata->ld_buf->lb_buf == NULL)
143                 return -ENOMEM;
144
145         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
146         if (rc == -ERANGE) {
147                 /* Buf was too small, figure out what we need. */
148                 lu_buf_free(ldata->ld_buf);
149                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
150                                    XATTR_NAME_LINK);
151                 if (rc < 0)
152                         return rc;
153                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
154                 if (ldata->ld_buf->lb_buf == NULL)
155                         return -ENOMEM;
156                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
157                                   XATTR_NAME_LINK);
158         }
159         if (rc < 0) {
160                 lu_buf_free(ldata->ld_buf);
161                 ldata->ld_buf = NULL;
162                 return rc;
163         }
164
165         return linkea_init(ldata);
166 }
167
168 static int mdd_links_read(const struct lu_env *env,
169                           struct mdd_object *mdd_obj,
170                           struct linkea_data *ldata)
171 {
172         int rc;
173
174         rc = __mdd_links_read(env, mdd_obj, ldata);
175         if (!rc)
176                 rc = linkea_init(ldata);
177
178         return rc;
179 }
180
181 static int mdd_links_read_with_rec(const struct lu_env *env,
182                                    struct mdd_object *mdd_obj,
183                                    struct linkea_data *ldata)
184 {
185         int rc;
186
187         rc = __mdd_links_read(env, mdd_obj, ldata);
188         if (!rc)
189                 rc = linkea_init_with_rec(ldata);
190
191         return rc;
192 }
193
194 /**
195  * Get parent FID of the directory
196  *
197  * Read parent FID from linkEA, if that fails, then do lookup
198  * dotdot to get the parent FID.
199  *
200  * \param[in] env       execution environment
201  * \param[in] obj       object from which to find the parent FID
202  * \param[in] attr      attribute of the object
203  * \param[out] fid      fid to get the parent FID
204  *
205  * \retval              0 if getting the parent FID succeeds.
206  * \retval              negative errno if getting the parent FID fails.
207  **/
208 static inline int mdd_parent_fid(const struct lu_env *env,
209                                  struct mdd_object *obj,
210                                  const struct lu_attr *attr,
211                                  struct lu_fid *fid)
212 {
213         struct mdd_thread_info  *info = mdd_env_info(env);
214         struct linkea_data      ldata = { NULL };
215         struct lu_buf           *buf = &info->mti_link_buf;
216         struct lu_name          lname;
217         int                     rc = 0;
218
219         ENTRY;
220
221         LASSERT(S_ISDIR(mdd_object_type(obj)));
222
223         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
224         if (buf->lb_buf == NULL)
225                 GOTO(lookup, rc = 0);
226
227         ldata.ld_buf = buf;
228         rc = mdd_links_read_with_rec(env, obj, &ldata);
229         if (rc != 0)
230                 GOTO(lookup, rc);
231
232         LASSERT(ldata.ld_leh != NULL);
233         /* Directory should only have 1 parent */
234         if (ldata.ld_leh->leh_reccount > 1)
235                 GOTO(lookup, rc);
236
237         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
238
239         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
240         if (likely(fid_is_sane(fid)))
241                 RETURN(0);
242 lookup:
243         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
244         RETURN(rc);
245 }
246
247 /*
248  * For root fid use special function, which does not compare version component
249  * of fid. Version component is different for root fids on all MDTs.
250  */
251 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
252 {
253         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
254                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
255 }
256
257 /*
258  * return 1: if \a tfid is the fid of the ancestor of \a mo;
259  * return 0: if not;
260  * otherwise: values < 0, errors.
261  */
262 static int mdd_is_parent(const struct lu_env *env,
263                         struct mdd_device *mdd,
264                         struct mdd_object *mo,
265                         const struct lu_attr *attr,
266                         const struct lu_fid *tfid)
267 {
268         struct mdd_object *mp;
269         struct lu_fid *pfid;
270         int rc;
271
272         LASSERT(!lu_fid_eq(mdo2fid(mo), tfid));
273         pfid = &mdd_env_info(env)->mti_fid;
274
275         if (mdd_is_root(mdd, mdo2fid(mo)))
276                 return 0;
277
278         if (mdd_is_root(mdd, tfid))
279                 return 1;
280
281         rc = mdd_parent_fid(env, mo, attr, pfid);
282         if (rc)
283                 return rc;
284
285         while (1) {
286                 if (lu_fid_eq(pfid, tfid))
287                         return 1;
288
289                 if (mdd_is_root(mdd, pfid))
290                         return 0;
291
292                 mp = mdd_object_find(env, mdd, pfid);
293                 if (IS_ERR(mp))
294                         return PTR_ERR(mp);
295
296                 if (!mdd_object_exists(mp)) {
297                         mdd_object_put(env, mp);
298                         return -ENOENT;
299                 }
300
301                 rc = mdd_parent_fid(env, mp, attr, pfid);
302                 mdd_object_put(env, mp);
303                 if (rc)
304                         return rc;
305         }
306
307         return 0;
308 }
309
310 /*
311  * No permission check is needed.
312  *
313  * returns 1: if fid is ancestor of @mo;
314  * returns 0: if fid is not an ancestor of @mo;
315  * returns < 0: if error
316  */
317 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
318                   const struct lu_fid *fid)
319 {
320         struct mdd_device *mdd = mdo2mdd(mo);
321         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
322         int rc;
323         ENTRY;
324
325         if (!mdd_object_exists(md2mdd_obj(mo)))
326                 RETURN(-ENOENT);
327
328         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
329                 RETURN(-ENOTDIR);
330
331         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
332         if (rc != 0)
333                 RETURN(rc);
334
335         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
336         RETURN(rc);
337 }
338
339 /*
340  * Check that @dir contains no entries except (possibly) dot and dotdot.
341  *
342  * Returns:
343  *
344  *             0        empty
345  *      -ENOTDIR        not a directory object
346  *    -ENOTEMPTY        not empty
347  *           -ve        other error
348  *
349  */
350 static int mdd_dir_is_empty(const struct lu_env *env,
351                             struct mdd_object *dir)
352 {
353         struct dt_it     *it;
354         struct dt_object *obj;
355         const struct dt_it_ops *iops;
356         int result;
357         ENTRY;
358
359         obj = mdd_object_child(dir);
360         if (!dt_try_as_dir(env, obj))
361                 RETURN(-ENOTDIR);
362
363         iops = &obj->do_index_ops->dio_it;
364         it = iops->init(env, obj, LUDA_64BITHASH);
365         if (!IS_ERR(it)) {
366                 result = iops->get(env, it, (const struct dt_key *)"");
367                 if (result > 0) {
368                         int i;
369                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
370                                 result = iops->next(env, it);
371                         if (result == 0)
372                                 result = -ENOTEMPTY;
373                         else if (result == 1)
374                                 result = 0;
375                 } else if (result == 0)
376                         /*
377                          * Huh? Index contains no zero key?
378                          */
379                         result = -EIO;
380
381                 iops->put(env, it);
382                 iops->fini(env, it);
383         } else
384                 result = PTR_ERR(it);
385         RETURN(result);
386 }
387
388 /**
389  * Determine if the target object can be hard linked, and right now it only
390  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
391  * directory nlink count might exceed the maximum link count(see
392  * osd_object_ref_add), so it only check nlink for non-directories.
393  *
394  * \param[in] env       thread environment
395  * \param[in] obj       object being linked to
396  * \param[in] la        attributes of \a obj
397  *
398  * \retval              0 if \a obj can be hard linked
399  * \retval              negative error if \a obj is a directory or has too
400  *                      many links
401  */
402 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
403                           const struct lu_attr *la)
404 {
405         struct mdd_device *m = mdd_obj2mdd_dev(obj);
406         ENTRY;
407
408         LASSERT(la != NULL);
409
410         /* Subdir count limitation can be broken through
411          * (see osd_object_ref_add), so only check non-directory here. */
412         if (!S_ISDIR(la->la_mode) &&
413             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
414                 RETURN(-EMLINK);
415
416         RETURN(0);
417 }
418
419 /**
420  * Check whether it may create the cobj under the pobj.
421  *
422  * \param[in] env       execution environment
423  * \param[in] pobj      the parent directory
424  * \param[in] pattr     the attribute of the parent directory
425  * \param[in] cobj      the child to be created
426  * \param[in] check_perm        if check WRITE|EXEC permission for parent
427  *
428  * \retval              = 0 create the child under this dir is allowed
429  * \retval              negative errno create the child under this dir is
430  *                      not allowed
431  */
432 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
433                    const struct lu_attr *pattr, struct mdd_object *cobj,
434                    bool check_perm)
435 {
436         int rc = 0;
437         ENTRY;
438
439         if (cobj && mdd_object_exists(cobj))
440                 RETURN(-EEXIST);
441
442         if (mdd_is_dead_obj(pobj))
443                 RETURN(-ENOENT);
444
445         if (check_perm)
446                 rc = mdd_permission_internal_locked(env, pobj, pattr,
447                                                     MAY_WRITE | MAY_EXEC,
448                                                     MOR_TGT_PARENT);
449         RETURN(rc);
450 }
451
452 /*
453  * Check whether can unlink from the pobj in the case of "cobj == NULL".
454  */
455 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
456                    const struct lu_attr *pattr, const struct lu_attr *attr)
457 {
458         int rc;
459         ENTRY;
460
461         if (mdd_is_dead_obj(pobj))
462                 RETURN(-ENOENT);
463
464         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
465                 RETURN(-EPERM);
466
467         rc = mdd_permission_internal_locked(env, pobj, pattr,
468                                             MAY_WRITE | MAY_EXEC,
469                                             MOR_TGT_PARENT);
470         if (rc != 0)
471                 RETURN(rc);
472
473         if (pattr->la_flags & LUSTRE_APPEND_FL)
474                 RETURN(-EPERM);
475
476         RETURN(rc);
477 }
478
479 /*
480  * pobj == NULL is remote ops case, under such case, pobj's
481  * VTX feature has been checked already, no need check again.
482  */
483 static inline int mdd_is_sticky(const struct lu_env *env,
484                                 struct mdd_object *pobj,
485                                 const struct lu_attr *pattr,
486                                 struct mdd_object *cobj,
487                                 const struct lu_attr *cattr)
488 {
489         struct lu_ucred *uc = lu_ucred_assert(env);
490
491         if (pobj != NULL) {
492                 LASSERT(pattr != NULL);
493                 if (!(pattr->la_mode & S_ISVTX) ||
494                     (pattr->la_uid == uc->uc_fsuid))
495                         return 0;
496         }
497
498         LASSERT(cattr != NULL);
499         if (cattr->la_uid == uc->uc_fsuid)
500                 return 0;
501
502         return !md_capable(uc, CFS_CAP_FOWNER);
503 }
504
505 static int mdd_may_delete_entry(const struct lu_env *env,
506                                 struct mdd_object *pobj,
507                                 const struct lu_attr *pattr,
508                                 int check_perm)
509 {
510         ENTRY;
511
512         LASSERT(pobj != NULL);
513         if (!mdd_object_exists(pobj))
514                 RETURN(-ENOENT);
515
516         if (mdd_is_dead_obj(pobj))
517                 RETURN(-ENOENT);
518
519         if (check_perm) {
520                 int rc;
521                 rc = mdd_permission_internal_locked(env, pobj, pattr,
522                                             MAY_WRITE | MAY_EXEC,
523                                             MOR_TGT_PARENT);
524                 if (rc)
525                         RETURN(rc);
526         }
527
528         if (pattr->la_flags & LUSTRE_APPEND_FL)
529                 RETURN(-EPERM);
530
531         RETURN(0);
532 }
533
534 /*
535  * Check whether it may delete the cobj from the pobj.
536  * pobj maybe NULL
537  */
538 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
539                    const struct lu_attr *tpattr, struct mdd_object *tobj,
540                    const struct lu_attr *tattr, const struct lu_attr *cattr,
541                    int check_perm, int check_empty)
542 {
543         int rc = 0;
544         ENTRY;
545
546         if (tpobj) {
547                 LASSERT(tpattr != NULL);
548                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
549                 if (rc != 0)
550                         RETURN(rc);
551         }
552
553         if (tobj == NULL)
554                 RETURN(0);
555
556         if (!mdd_object_exists(tobj))
557                 RETURN(-ENOENT);
558
559         if (mdd_is_dead_obj(tobj))
560                 RETURN(-ESTALE);
561
562         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
563                 RETURN(-EPERM);
564
565         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
566                 RETURN(-EPERM);
567
568         /* additional check the rename case */
569         if (cattr) {
570                 if (S_ISDIR(cattr->la_mode)) {
571                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
572
573                         if (!S_ISDIR(tattr->la_mode))
574                                 RETURN(-ENOTDIR);
575
576                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
577                                 RETURN(-EBUSY);
578                 } else if (S_ISDIR(tattr->la_mode))
579                         RETURN(-EISDIR);
580         }
581
582         if (S_ISDIR(tattr->la_mode) && check_empty)
583                 rc = mdd_dir_is_empty(env, tobj);
584
585         RETURN(rc);
586 }
587
588 /**
589  * Check whether it can create the link file(linked to @src_obj) under
590  * the target directory(@tgt_obj), and src_obj has been locked by
591  * mdd_write_lock.
592  *
593  * \param[in] env       execution environment
594  * \param[in] tgt_obj   the target directory
595  * \param[in] tattr     attributes of target directory
596  * \param[in] lname     the link name
597  * \param[in] src_obj   source object for link
598  * \param[in] cattr     attributes for source object
599  *
600  * \retval              = 0 it is allowed to create the link file under tgt_obj
601  * \retval              negative error not allowed to create the link file
602  */
603 static int mdd_link_sanity_check(const struct lu_env *env,
604                                  struct mdd_object *tgt_obj,
605                                  const struct lu_attr *tattr,
606                                  const struct lu_name *lname,
607                                  struct mdd_object *src_obj,
608                                  const struct lu_attr *cattr)
609 {
610         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
611         int rc = 0;
612         ENTRY;
613
614         if (!mdd_object_exists(src_obj))
615                 RETURN(-ENOENT);
616
617         if (mdd_is_dead_obj(src_obj))
618                 RETURN(-ESTALE);
619
620         /* Local ops, no lookup before link, check filename length here. */
621         rc = mdd_name_check(m, lname);
622         if (rc < 0)
623                 RETURN(rc);
624
625         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
626                 RETURN(-EPERM);
627
628         if (S_ISDIR(mdd_object_type(src_obj)))
629                 RETURN(-EPERM);
630
631         LASSERT(src_obj != tgt_obj);
632         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
633         if (rc != 0)
634                 RETURN(rc);
635
636         rc = __mdd_may_link(env, src_obj, cattr);
637
638         RETURN(rc);
639 }
640
641 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
642                                    const char *name, struct thandle *handle)
643 {
644         struct dt_object *next = mdd_object_child(pobj);
645         int rc;
646         ENTRY;
647
648         if (dt_try_as_dir(env, next))
649                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
650         else
651                 rc = -ENOTDIR;
652
653         RETURN(rc);
654 }
655
656 static int __mdd_index_insert_only(const struct lu_env *env,
657                                    struct mdd_object *pobj,
658                                    const struct lu_fid *lf, __u32 type,
659                                    const char *name,
660                                    struct thandle *handle)
661 {
662         struct dt_object *next = mdd_object_child(pobj);
663         int               rc;
664         ENTRY;
665
666         if (dt_try_as_dir(env, next)) {
667                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
668                 struct lu_ucred         *uc  = lu_ucred_check(env);
669                 int                      ignore_quota;
670
671                 rec->rec_fid = lf;
672                 rec->rec_type = type;
673                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
674                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
675                                (const struct dt_key *)name, handle,
676                                ignore_quota);
677         } else {
678                 rc = -ENOTDIR;
679         }
680         RETURN(rc);
681 }
682
683 /* insert named index, add reference if isdir */
684 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
685                               const struct lu_fid *lf, __u32 type,
686                               const char *name, struct thandle *handle)
687 {
688         int rc;
689         ENTRY;
690
691         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
692         if (rc == 0 && S_ISDIR(type)) {
693                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
694                 mdo_ref_add(env, pobj, handle);
695                 mdd_write_unlock(env, pobj);
696         }
697
698         RETURN(rc);
699 }
700
701 /* delete named index, drop reference if isdir */
702 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
703                               const char *name, int is_dir,
704                               struct thandle *handle)
705 {
706         int               rc;
707         ENTRY;
708
709         rc = __mdd_index_delete_only(env, pobj, name, handle);
710         if (rc == 0 && is_dir) {
711                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
712                 mdo_ref_del(env, pobj, handle);
713                 mdd_write_unlock(env, pobj);
714         }
715
716         RETURN(rc);
717 }
718
719 static int mdd_llog_record_calc_size(const struct lu_env *env,
720                                      const struct lu_name *tname,
721                                      const struct lu_name *sname)
722 {
723         const struct lu_ucred   *uc = lu_ucred(env);
724         enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
725         enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
726
727         if (sname != NULL)
728                 crf |= CLF_RENAME;
729
730         if (uc != NULL && uc->uc_jobid[0] != '\0')
731                 crf |= CLF_JOBID;
732
733         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
734                              changelog_rec_offset(crf, crfe) +
735                              (tname != NULL ? tname->ln_namelen : 0) +
736                              (sname != NULL ? 1 + sname->ln_namelen : 0));
737 }
738
739 int mdd_declare_changelog_store(const struct lu_env *env,
740                                 struct mdd_device *mdd,
741                                 enum changelog_rec_type type,
742                                 const struct lu_name *tname,
743                                 const struct lu_name *sname,
744                                 struct thandle *handle)
745 {
746         struct obd_device               *obd = mdd2obd_dev(mdd);
747         struct llog_ctxt                *ctxt;
748         struct llog_changelog_rec       *rec;
749         struct lu_buf                   *buf;
750         struct thandle                  *llog_th;
751         int                              reclen;
752         int                              rc;
753
754         if (!mdd_changelog_enabled(env, mdd, type))
755                 return 0;
756
757         reclen = mdd_llog_record_calc_size(env, tname, sname);
758         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
759         if (buf->lb_buf == NULL)
760                 return -ENOMEM;
761
762         rec = buf->lb_buf;
763         rec->cr_hdr.lrh_len = reclen;
764         rec->cr_hdr.lrh_type = CHANGELOG_REC;
765
766         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
767         if (ctxt == NULL)
768                 return -ENXIO;
769
770         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
771         if (IS_ERR(llog_th))
772                 GOTO(out_put, rc = PTR_ERR(llog_th));
773
774         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
775
776 out_put:
777         llog_ctxt_put(ctxt);
778
779         return rc;
780 }
781
782 /** Add a changelog entry \a rec to the changelog llog
783  * \param mdd
784  * \param rec
785  * \param handle - currently ignored since llogs start their own transaction;
786  *              this will hopefully be fixed in llog rewrite
787  * \retval 0 ok
788  */
789 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
790                         struct llog_changelog_rec *rec, struct thandle *th)
791 {
792         struct obd_device       *obd = mdd2obd_dev(mdd);
793         struct llog_ctxt        *ctxt;
794         struct thandle          *llog_th;
795         int                      rc;
796
797         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
798                                             changelog_rec_varsize(&rec->cr));
799
800         /* llog_lvfs_write_rec sets the llog tail len */
801         rec->cr_hdr.lrh_type = CHANGELOG_REC;
802         rec->cr.cr_time = cl_time();
803
804         spin_lock(&mdd->mdd_cl.mc_lock);
805         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
806          * but as long as the MDD transactions are ordered correctly for e.g.
807          * rename conflicts, I don't think this should matter. */
808         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
809         spin_unlock(&mdd->mdd_cl.mc_lock);
810
811         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
812         if (ctxt == NULL)
813                 return -ENXIO;
814
815         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
816         if (IS_ERR(llog_th))
817                 GOTO(out_put, rc = PTR_ERR(llog_th));
818
819         /* nested journal transaction */
820         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
821
822         /* time to recover some space ?? */
823         if (likely(!mdd->mdd_changelog_gc ||
824                    mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
825                    mdd->mdd_changelog_min_gc_interval >=
826                         ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
827                 /* save a spin_lock trip */
828                 goto out_put;
829         spin_lock(&mdd->mdd_cl.mc_lock);
830         if (likely(mdd->mdd_changelog_gc &&
831                      mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
832                      ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
833                         mdd->mdd_changelog_min_gc_interval)) {
834                 if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
835                              mdd->mdd_changelog_min_free_cat_entries ||
836                              OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
837                         CWARN("%s:%s low on changelog_catalog free entries, "
838                               "starting ChangeLog garbage collection thread\n",
839                               obd->obd_name,
840                               OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
841                                 " simulate" : "");
842
843                         /* indicate further kthread run will occur outside
844                          * right after current journal transaction filling has
845                          * completed
846                          */
847                         mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
848                 }
849                 /* next check in mdd_changelog_min_gc_interval anyway
850                  */
851                 mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
852         }
853         spin_unlock(&mdd->mdd_cl.mc_lock);
854 out_put:
855         llog_ctxt_put(ctxt);
856         if (rc > 0)
857                 rc = 0;
858         return rc;
859 }
860
861 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
862                                          const struct lu_fid *sfid,
863                                          const struct lu_fid *spfid,
864                                          const struct lu_name *sname)
865 {
866         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
867         size_t extsize;
868
869         LASSERT(sfid != NULL);
870         LASSERT(spfid != NULL);
871         LASSERT(sname != NULL);
872
873         extsize = sname->ln_namelen + 1;
874
875         rnm->cr_sfid = *sfid;
876         rnm->cr_spfid = *spfid;
877
878         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
879         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
880         rec->cr_namelen += extsize;
881 }
882
883 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
884 {
885         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
886
887         if (jobid == NULL || jobid[0] == '\0')
888                 return;
889
890         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
891 }
892
893 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
894 {
895         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
896
897         ef->cr_extra_flags = eflags;
898 }
899
900 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
901                                     __u64 uid, __u64 gid)
902 {
903         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
904
905         uidgid->cr_uid = uid;
906         uidgid->cr_gid = gid;
907 }
908
909 void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
910                                  lnet_nid_t nid)
911 {
912         struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
913
914         clnid->cr_nid = nid;
915 }
916
917 void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, int flags)
918 {
919         struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
920
921         omd->cr_openflags = (__u32)flags;
922 }
923
924 void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
925                                    const char *xattr_name)
926 {
927         struct changelog_ext_xattr    *xattr = changelog_rec_xattr(rec);
928
929         strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
930 }
931
932 /** Store a namespace change changelog record
933  * If this fails, we must fail the whole transaction; we don't
934  * want the change to commit without the log entry.
935  * \param target - mdd_object of change
936  * \param tpfid - target parent dir/object fid
937  * \param sfid - source object fid
938  * \param spfid - source parent fid
939  * \param tname - target name string
940  * \param sname - source name string
941  * \param handle - transaction handle
942  */
943 int mdd_changelog_ns_store(const struct lu_env *env,
944                            struct mdd_device *mdd,
945                            enum changelog_rec_type type,
946                            enum changelog_rec_flags crf,
947                            struct mdd_object *target,
948                            const struct lu_fid *tpfid,
949                            const struct lu_fid *sfid,
950                            const struct lu_fid *spfid,
951                            const struct lu_name *tname,
952                            const struct lu_name *sname,
953                            struct thandle *handle)
954 {
955         const struct lu_ucred           *uc = lu_ucred(env);
956         struct llog_changelog_rec       *rec;
957         struct lu_buf                   *buf;
958         int                              reclen;
959         __u64                            xflags = CLFE_INVALID;
960         int                              rc;
961         ENTRY;
962
963         if (!mdd_changelog_enabled(env, mdd, type))
964                 RETURN(0);
965
966         LASSERT(tpfid != NULL);
967         LASSERT(tname != NULL);
968         LASSERT(handle != NULL);
969
970         reclen = mdd_llog_record_calc_size(env, tname, sname);
971         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
972         if (buf->lb_buf == NULL)
973                 RETURN(-ENOMEM);
974         rec = buf->lb_buf;
975
976         crf &= CLF_FLAGMASK;
977         crf |= CLF_EXTRA_FLAGS;
978
979         if (uc) {
980                 if (uc->uc_jobid[0] != '\0')
981                         crf |= CLF_JOBID;
982                 xflags |= CLFE_UIDGID;
983                 xflags |= CLFE_NID;
984         }
985
986         if (sname != NULL)
987                 crf |= CLF_RENAME;
988         else
989                 crf |= CLF_VERSION;
990
991         rec->cr.cr_flags = crf;
992
993         if (crf & CLF_EXTRA_FLAGS) {
994                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
995                 if (xflags & CLFE_UIDGID)
996                         mdd_changelog_rec_extra_uidgid(&rec->cr,
997                                                        uc->uc_uid, uc->uc_gid);
998                 if (xflags & CLFE_NID)
999                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
1000         }
1001
1002         rec->cr.cr_type = (__u32)type;
1003         rec->cr.cr_pfid = *tpfid;
1004         rec->cr.cr_namelen = tname->ln_namelen;
1005         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1006
1007         if (crf & CLF_RENAME)
1008                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1009
1010         if (crf & CLF_JOBID)
1011                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1012
1013         if (likely(target != NULL)) {
1014                 rec->cr.cr_tfid = *mdo2fid(target);
1015                 target->mod_cltime = ktime_get();
1016         } else {
1017                 fid_zero(&rec->cr.cr_tfid);
1018         }
1019
1020         rc = mdd_changelog_store(env, mdd, rec, handle);
1021         if (rc < 0) {
1022                 CERROR("%s: cannot store changelog record: type = %d, "
1023                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1024                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1025                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1026                 return -EFAULT;
1027         }
1028
1029         return 0;
1030 }
1031
1032 static int __mdd_links_add(const struct lu_env *env,
1033                            struct mdd_object *mdd_obj,
1034                            struct linkea_data *ldata,
1035                            const struct lu_name *lname,
1036                            const struct lu_fid *pfid,
1037                            int first, int check)
1038 {
1039         int rc;
1040
1041         if (ldata->ld_leh == NULL) {
1042                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1043                 if (rc) {
1044                         if (rc != -ENODATA)
1045                                 return rc;
1046                         rc = linkea_data_new(ldata,
1047                                              &mdd_env_info(env)->mti_link_buf);
1048                         if (rc)
1049                                 return rc;
1050                 }
1051         }
1052
1053         if (check) {
1054                 rc = linkea_links_find(ldata, lname, pfid);
1055                 if (rc && rc != -ENOENT)
1056                         return rc;
1057                 if (rc == 0)
1058                         return -EEXIST;
1059         }
1060
1061         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1062                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1063
1064                 *tfid = *pfid;
1065                 tfid->f_ver = ~0;
1066                 linkea_add_buf(ldata, lname, tfid);
1067         }
1068
1069         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1070                 linkea_add_buf(ldata, lname, pfid);
1071
1072         return linkea_add_buf(ldata, lname, pfid);
1073 }
1074
1075 static int __mdd_links_del(const struct lu_env *env,
1076                            struct mdd_object *mdd_obj,
1077                            struct linkea_data *ldata,
1078                            const struct lu_name *lname,
1079                            const struct lu_fid *pfid)
1080 {
1081         int rc;
1082
1083         if (ldata->ld_leh == NULL) {
1084                 rc = mdd_links_read(env, mdd_obj, ldata);
1085                 if (rc)
1086                         return rc;
1087         }
1088
1089         rc = linkea_links_find(ldata, lname, pfid);
1090         if (rc)
1091                 return rc;
1092
1093         linkea_del_buf(ldata, lname);
1094         return 0;
1095 }
1096
1097 static int mdd_linkea_prepare(const struct lu_env *env,
1098                               struct mdd_object *mdd_obj,
1099                               const struct lu_fid *oldpfid,
1100                               const struct lu_name *oldlname,
1101                               const struct lu_fid *newpfid,
1102                               const struct lu_name *newlname,
1103                               int first, int check,
1104                               struct linkea_data *ldata)
1105 {
1106         int rc = 0;
1107         ENTRY;
1108
1109         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1110                 RETURN(0);
1111
1112         LASSERT(oldpfid != NULL || newpfid != NULL);
1113
1114         if (mdd_obj->mod_flags & DEAD_OBJ)
1115                 /* Unnecessary to update linkEA for dead object.  */
1116                 RETURN(0);
1117
1118         if (oldpfid != NULL) {
1119                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1120                 if (rc) {
1121                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1122                                 RETURN(rc);
1123
1124                         /* No changes done. */
1125                         rc = 0;
1126                 }
1127         }
1128
1129         /* If renaming, add the new record */
1130         if (newpfid != NULL)
1131                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1132                                      first, check);
1133
1134         RETURN(rc);
1135 }
1136
1137 int mdd_links_rename(const struct lu_env *env,
1138                      struct mdd_object *mdd_obj,
1139                      const struct lu_fid *oldpfid,
1140                      const struct lu_name *oldlname,
1141                      const struct lu_fid *newpfid,
1142                      const struct lu_name *newlname,
1143                      struct thandle *handle,
1144                      struct linkea_data *ldata,
1145                      int first, int check)
1146 {
1147         int rc = 0;
1148         ENTRY;
1149
1150         if (ldata == NULL) {
1151                 ldata = &mdd_env_info(env)->mti_link_data;
1152                 memset(ldata, 0, sizeof(*ldata));
1153                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1154                                         newpfid, newlname, first, check, ldata);
1155                 if (rc)
1156                         GOTO(out, rc);
1157         }
1158
1159         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1160                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1161
1162         GOTO(out, rc);
1163
1164 out:
1165         if (rc != 0) {
1166                 if (newlname == NULL)
1167                         CERROR("link_ea add failed %d "DFID"\n",
1168                                rc, PFID(mdd_object_fid(mdd_obj)));
1169                 else if (oldpfid == NULL)
1170                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1171                                newlname->ln_namelen, newlname->ln_name, rc,
1172                                PFID(mdd_object_fid(mdd_obj)));
1173                 else if (newpfid == NULL)
1174                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1175                                oldlname->ln_namelen, oldlname->ln_name, rc,
1176                                PFID(mdd_object_fid(mdd_obj)));
1177                 else
1178                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1179                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1180                                newlname->ln_namelen, newlname->ln_name, rc,
1181                                PFID(mdd_object_fid(mdd_obj)));
1182         }
1183
1184         if (is_vmalloc_addr(ldata->ld_buf))
1185                 /* if we vmalloced a large buffer drop it */
1186                 lu_buf_free(ldata->ld_buf);
1187
1188         return rc;
1189 }
1190
1191 static inline int mdd_links_add(const struct lu_env *env,
1192                                 struct mdd_object *mdd_obj,
1193                                 const struct lu_fid *pfid,
1194                                 const struct lu_name *lname,
1195                                 struct thandle *handle,
1196                                 struct linkea_data *ldata, int first)
1197 {
1198         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1199                                 pfid, lname, handle, ldata, first, 0);
1200 }
1201
1202 static inline int mdd_links_del(const struct lu_env *env,
1203                                 struct mdd_object *mdd_obj,
1204                                 const struct lu_fid *pfid,
1205                                 const struct lu_name *lname,
1206                                 struct thandle *handle)
1207 {
1208         return mdd_links_rename(env, mdd_obj, pfid, lname,
1209                                 NULL, NULL, handle, NULL, 0, 0);
1210 }
1211
1212 /** Read the link EA into a temp buffer.
1213  * Uses the name_buf since it is generally large.
1214  * \retval IS_ERR err
1215  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1216  */
1217 struct lu_buf *mdd_links_get(const struct lu_env *env,
1218                              struct mdd_object *mdd_obj)
1219 {
1220         struct linkea_data ldata = { NULL };
1221         int rc;
1222
1223         rc = mdd_links_read(env, mdd_obj, &ldata);
1224         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1225 }
1226
1227 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1228                     struct linkea_data *ldata, struct thandle *handle)
1229 {
1230         const struct lu_buf *buf;
1231         int                 rc;
1232
1233         if (ldata == NULL || ldata->ld_buf == NULL ||
1234             ldata->ld_leh == NULL)
1235                 return 0;
1236
1237         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1238                 return 0;
1239
1240 again:
1241         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1242                                 ldata->ld_leh->leh_len);
1243         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1244         if (unlikely(rc == -ENOSPC)) {
1245                 rc = linkea_overflow_shrink(ldata);
1246                 if (likely(rc > 0))
1247                         goto again;
1248         }
1249
1250         return rc;
1251 }
1252
1253 static int mdd_declare_links_add(const struct lu_env *env,
1254                                  struct mdd_object *mdd_obj,
1255                                  struct thandle *handle,
1256                                  struct linkea_data *ldata)
1257 {
1258         int     rc;
1259         int     ea_len;
1260         void    *linkea;
1261
1262         if (ldata != NULL && ldata->ld_leh != NULL) {
1263                 ea_len = ldata->ld_leh->leh_len;
1264                 linkea = ldata->ld_buf->lb_buf;
1265         } else {
1266                 ea_len = MAX_LINKEA_SIZE;
1267                 linkea = NULL;
1268         }
1269
1270         rc = mdo_declare_xattr_set(env, mdd_obj,
1271                                    mdd_buf_get_const(env, linkea, ea_len),
1272                                    XATTR_NAME_LINK, 0, handle);
1273
1274         return rc;
1275 }
1276
1277 static inline int mdd_declare_links_del(const struct lu_env *env,
1278                                         struct mdd_object *c,
1279                                         struct thandle *handle)
1280 {
1281         int rc = 0;
1282
1283         /* For directory, the linkEA will be removed together
1284          * with the object. */
1285         if (!S_ISDIR(mdd_object_type(c)))
1286                 rc = mdd_declare_links_add(env, c, handle, NULL);
1287
1288         return rc;
1289 }
1290
1291 static int mdd_declare_link(const struct lu_env *env,
1292                             struct mdd_device *mdd,
1293                             struct mdd_object *p,
1294                             struct mdd_object *c,
1295                             const struct lu_name *name,
1296                             struct thandle *handle,
1297                             struct lu_attr *la,
1298                             struct linkea_data *data)
1299 {
1300         struct lu_fid tfid = *mdo2fid(c);
1301         int rc;
1302
1303         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1304                 tfid.f_oid = cfs_fail_val;
1305
1306         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1307                                       name->ln_name, handle);
1308         if (rc != 0)
1309                 return rc;
1310
1311         rc = mdo_declare_ref_add(env, c, handle);
1312         if (rc != 0)
1313                 return rc;
1314
1315         la->la_valid = LA_CTIME | LA_MTIME;
1316         rc = mdo_declare_attr_set(env, p, la, handle);
1317         if (rc != 0)
1318                 return rc;
1319
1320         la->la_valid = LA_CTIME;
1321         rc = mdo_declare_attr_set(env, c, la, handle);
1322         if (rc != 0)
1323                 return rc;
1324
1325         rc = mdd_declare_links_add(env, c, handle, data);
1326         if (rc != 0)
1327                 return rc;
1328
1329         rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
1330                                          handle);
1331
1332         return rc;
1333 }
1334
1335 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1336                     struct md_object *src_obj, const struct lu_name *lname,
1337                     struct md_attr *ma)
1338 {
1339         const char *name = lname->ln_name;
1340         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1341         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1342         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1343         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1344         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1345         struct mdd_device *mdd = mdo2mdd(src_obj);
1346         struct thandle *handle;
1347         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1348         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1349         int rc;
1350         ENTRY;
1351
1352         rc = mdd_la_get(env, mdd_sobj, cattr);
1353         if (rc != 0)
1354                 RETURN(rc);
1355
1356         rc = mdd_la_get(env, mdd_tobj, tattr);
1357         if (rc != 0)
1358                 RETURN(rc);
1359
1360         /*
1361          * If we are using project inheritance, we only allow hard link
1362          * creation in our tree when the project IDs are the same;
1363          * otherwise the tree quota mechanism could be circumvented.
1364          */
1365         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1366             (tattr->la_projid != cattr->la_projid))
1367                 RETURN(-EXDEV);
1368
1369         handle = mdd_trans_create(env, mdd);
1370         if (IS_ERR(handle))
1371                 GOTO(out_pending, rc = PTR_ERR(handle));
1372
1373         memset(ldata, 0, sizeof(*ldata));
1374
1375         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1376         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1377
1378         /* Note: even this function will change ldata, but it comes from
1379          * thread_info, which is completely temporary and only seen in
1380          * this function, so we do not need reset ldata once it fails.*/
1381         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1382                                 lname, 0, 0, ldata);
1383         if (rc != 0)
1384                 GOTO(stop, rc);
1385
1386         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1387                               la, ldata);
1388         if (rc)
1389                 GOTO(stop, rc);
1390
1391         rc = mdd_trans_start(env, mdd, handle);
1392         if (rc)
1393                 GOTO(stop, rc);
1394
1395         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1396         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1397                                    cattr);
1398         if (rc)
1399                 GOTO(out_unlock, rc);
1400
1401         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1402                 rc = mdo_ref_add(env, mdd_sobj, handle);
1403                 if (rc != 0)
1404                         GOTO(out_unlock, rc);
1405         }
1406
1407         *tfid = *mdo2fid(mdd_sobj);
1408         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1409                 tfid->f_oid = cfs_fail_val;
1410
1411         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1412                                      mdd_object_type(mdd_sobj), name, handle);
1413         if (rc != 0) {
1414                 mdo_ref_del(env, mdd_sobj, handle);
1415                 GOTO(out_unlock, rc);
1416         }
1417
1418         la->la_valid = LA_CTIME | LA_MTIME;
1419         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1420         if (rc)
1421                 GOTO(out_unlock, rc);
1422
1423         la->la_valid = LA_CTIME;
1424         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1425         if (rc == 0)
1426                 /* Note: The failure of links_add should not cause the
1427                  * link failure, so do not check return value. */
1428                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1429                               lname, handle, ldata, 0);
1430
1431         EXIT;
1432 out_unlock:
1433         mdd_write_unlock(env, mdd_sobj);
1434         if (rc == 0)
1435                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1436                                             mdo2fid(mdd_tobj), NULL, NULL,
1437                                             lname, NULL, handle);
1438 stop:
1439         rc = mdd_trans_stop(env, mdd, rc, handle);
1440         if (is_vmalloc_addr(ldata->ld_buf))
1441                 /* if we vmalloced a large buffer drop it */
1442                 lu_buf_free(ldata->ld_buf);
1443 out_pending:
1444         return rc;
1445 }
1446
1447 static int mdd_mark_orphan_object(const struct lu_env *env,
1448                                 struct mdd_object *obj, struct thandle *handle,
1449                                 bool declare)
1450 {
1451         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1452         int rc;
1453
1454         if (!S_ISDIR(mdd_object_type(obj)))
1455                 return 0;
1456
1457         attr->la_valid = LA_FLAGS;
1458         attr->la_flags = LUSTRE_ORPHAN_FL;
1459
1460         if (declare)
1461                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1462         else
1463                 rc = mdo_attr_set(env, obj, attr, handle);
1464
1465         return rc;
1466 }
1467
1468 static int mdd_declare_finish_unlink(const struct lu_env *env,
1469                                      struct mdd_object *obj,
1470                                      struct thandle *handle)
1471 {
1472         int rc;
1473
1474         /* Sigh, we do not know if the unlink object will become orphan in
1475          * declare phase, but fortunately the flags here does not matter
1476          * in current declare implementation */
1477         rc = mdd_mark_orphan_object(env, obj, handle, true);
1478         if (rc != 0)
1479                 return rc;
1480
1481         rc = mdo_declare_destroy(env, obj, handle);
1482         if (rc != 0)
1483                 return rc;
1484
1485         rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
1486         if (rc != 0)
1487                 return rc;
1488
1489         return mdd_declare_links_del(env, obj, handle);
1490 }
1491
1492 /* caller should take a lock before calling */
1493 int mdd_finish_unlink(const struct lu_env *env,
1494                       struct mdd_object *obj, struct md_attr *ma,
1495                       const struct mdd_object *pobj,
1496                       const struct lu_name *lname,
1497                       struct thandle *th)
1498 {
1499         int rc = 0;
1500         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1501         ENTRY;
1502
1503         LASSERT(mdd_write_locked(env, obj) != 0);
1504
1505         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1506                 /* add new orphan and the object
1507                  * will be deleted during mdd_close() */
1508                 obj->mod_flags |= DEAD_OBJ;
1509                 if (obj->mod_count) {
1510                         rc = mdd_orphan_insert(env, obj, th);
1511                         if (rc == 0)
1512                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1513                                         "orphan list, open count = %d\n",
1514                                         PFID(mdd_object_fid(obj)),
1515                                         obj->mod_count);
1516                         else
1517                                 CERROR("Object "DFID" fail to be an orphan, "
1518                                        "open count = %d, maybe cause failed "
1519                                        "open replay\n",
1520                                         PFID(mdd_object_fid(obj)),
1521                                         obj->mod_count);
1522
1523                         /* mark object as an orphan here, not
1524                          * before mdd_orphan_insert() as racing
1525                          * mdd_la_get() may propagate ORPHAN_OBJ
1526                          * causing the asserition */
1527                         rc = mdd_mark_orphan_object(env, obj, th, false);
1528                 } else {
1529                         rc = mdo_destroy(env, obj, th);
1530                 }
1531         } else if (!is_dir) {
1532                 /* old files may not have link ea; ignore errors */
1533                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1534         }
1535
1536         RETURN(rc);
1537 }
1538
1539 /*
1540  * pobj maybe NULL
1541  * has mdd_write_lock on cobj already, but not on pobj yet
1542  */
1543 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1544                             const struct lu_attr *pattr,
1545                             struct mdd_object *cobj,
1546                             const struct lu_attr *cattr)
1547 {
1548         int rc;
1549         ENTRY;
1550
1551         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1552
1553         RETURN(rc);
1554 }
1555
1556 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1557                               struct mdd_object *p, struct mdd_object *c,
1558                               const struct lu_name *name, struct md_attr *ma,
1559                               struct thandle *handle, int no_name, int is_dir)
1560 {
1561         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1562         int              rc;
1563
1564         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1565                 if (likely(no_name == 0)) {
1566                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1567                                                       handle);
1568                         if (rc != 0)
1569                                 return rc;
1570                 }
1571
1572                 if (is_dir != 0) {
1573                         rc = mdo_declare_ref_del(env, p, handle);
1574                         if (rc != 0)
1575                                 return rc;
1576                 }
1577         }
1578
1579         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1580         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1581         la->la_valid = LA_CTIME | LA_MTIME;
1582         rc = mdo_declare_attr_set(env, p, la, handle);
1583         if (rc)
1584                 return rc;
1585
1586         if (c != NULL) {
1587                 rc = mdo_declare_ref_del(env, c, handle);
1588                 if (rc)
1589                         return rc;
1590
1591                 rc = mdo_declare_ref_del(env, c, handle);
1592                 if (rc)
1593                         return rc;
1594
1595                 la->la_valid = LA_CTIME;
1596                 rc = mdo_declare_attr_set(env, c, la, handle);
1597                 if (rc)
1598                         return rc;
1599
1600                 rc = mdd_declare_finish_unlink(env, c, handle);
1601                 if (rc)
1602                         return rc;
1603
1604                 /* FIXME: need changelog for remove entry */
1605                 rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
1606                                                  NULL, handle);
1607         }
1608
1609         return rc;
1610 }
1611
1612 /*
1613  * test if a file has an HSM archive
1614  * if HSM attributes are not found in ma update them from
1615  * HSM xattr
1616  */
1617 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1618                                    struct mdd_object *obj,
1619                                    struct md_attr *ma)
1620 {
1621         ENTRY;
1622
1623         if (!(ma->ma_valid & MA_HSM)) {
1624                 /* no HSM MD provided, read xattr */
1625                 struct lu_buf   *hsm_buf;
1626                 const size_t     buflen = sizeof(struct hsm_attrs);
1627                 int              rc;
1628
1629                 hsm_buf = mdd_buf_get(env, NULL, 0);
1630                 lu_buf_alloc(hsm_buf, buflen);
1631                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1632                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1633                 lu_buf_free(hsm_buf);
1634                 if (rc < 0)
1635                         RETURN(false);
1636
1637                 ma->ma_valid |= MA_HSM;
1638         }
1639         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1640                 RETURN(true);
1641         RETURN(false);
1642 }
1643
1644 /**
1645  * Delete name entry and the object.
1646  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1647  * does not exist for this object, and it could only happen during resending
1648  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1649  * is also needed in this case(needed by changelog), so we have to add another
1650  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1651  * the ENOENT failure should be able to be fixed by redo mechanism.
1652  */
1653 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1654                       struct md_object *cobj, const struct lu_name *lname,
1655                       struct md_attr *ma, int no_name)
1656 {
1657         const char *name = lname->ln_name;
1658         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1659         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1660         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1661         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1662         struct mdd_object *mdd_cobj = NULL;
1663         struct mdd_device *mdd = mdo2mdd(pobj);
1664         struct thandle    *handle;
1665         int rc, is_dir = 0, cl_flags = 0;
1666         ENTRY;
1667
1668         /* let shutdown to start */
1669         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
1670
1671         /* cobj == NULL means only delete name entry */
1672         if (likely(cobj != NULL)) {
1673                 mdd_cobj = md2mdd_obj(cobj);
1674                 if (mdd_object_exists(mdd_cobj) == 0)
1675                         RETURN(-ENOENT);
1676         }
1677
1678         rc = mdd_la_get(env, mdd_pobj, pattr);
1679         if (rc)
1680                 RETURN(rc);
1681
1682         if (likely(mdd_cobj != NULL)) {
1683                 /* fetch cattr */
1684                 rc = mdd_la_get(env, mdd_cobj, cattr);
1685                 if (rc)
1686                         RETURN(rc);
1687
1688                 is_dir = S_ISDIR(cattr->la_mode);
1689                 /* search for an existing archive.
1690                  * we should check ahead as the object
1691                  * can be destroyed in this transaction */
1692                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1693                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1694         }
1695
1696         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1697         if (rc)
1698                 RETURN(rc);
1699
1700         handle = mdd_trans_create(env, mdd);
1701         if (IS_ERR(handle))
1702                 RETURN(PTR_ERR(handle));
1703
1704         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1705                                 lname, ma, handle, no_name, is_dir);
1706         if (rc)
1707                 GOTO(stop, rc);
1708
1709         rc = mdd_trans_start(env, mdd, handle);
1710         if (rc)
1711                 GOTO(stop, rc);
1712
1713         if (likely(mdd_cobj != NULL))
1714                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1715
1716         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1717                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1718                 if (rc)
1719                         GOTO(cleanup, rc);
1720         }
1721
1722         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1723             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1724                 GOTO(cleanup, rc = 0);
1725
1726         if (likely(mdd_cobj != NULL)) {
1727                 rc = mdo_ref_del(env, mdd_cobj, handle);
1728                 if (rc != 0) {
1729                         __mdd_index_insert_only(env, mdd_pobj,
1730                                                 mdo2fid(mdd_cobj),
1731                                                 mdd_object_type(mdd_cobj),
1732                                                 name, handle);
1733                         GOTO(cleanup, rc);
1734                 }
1735
1736                 if (is_dir)
1737                         /* unlink dot */
1738                         mdo_ref_del(env, mdd_cobj, handle);
1739
1740                 /* fetch updated nlink */
1741                 rc = mdd_la_get(env, mdd_cobj, cattr);
1742                 if (rc)
1743                         GOTO(cleanup, rc);
1744         }
1745
1746         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1747         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1748
1749         la->la_valid = LA_CTIME | LA_MTIME;
1750         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1751         if (rc)
1752                 GOTO(cleanup, rc);
1753
1754         /* Enough for only unlink the entry */
1755         if (unlikely(mdd_cobj == NULL))
1756                 GOTO(stop, rc);
1757
1758         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1759                 /* update ctime of an unlinked file only if it is still
1760                  * opened or a link still exists */
1761                 la->la_valid = LA_CTIME;
1762                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1763                 if (rc)
1764                         GOTO(cleanup, rc);
1765         }
1766
1767         /* XXX: this transfer to ma will be removed with LOD/OSP */
1768         ma->ma_attr = *cattr;
1769         ma->ma_valid |= MA_INODE;
1770         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1771         if (rc != 0)
1772                 GOTO(cleanup, rc);
1773
1774         /* fetch updated nlink */
1775         rc = mdd_la_get(env, mdd_cobj, cattr);
1776         /* if object is removed then we can't get its attrs,
1777          * use last get */
1778         if (rc == -ENOENT) {
1779                 cattr->la_nlink = 0;
1780                 rc = 0;
1781         }
1782
1783         if (cattr->la_nlink == 0) {
1784                 ma->ma_attr = *cattr;
1785                 ma->ma_valid |= MA_INODE;
1786         }
1787
1788         EXIT;
1789 cleanup:
1790         if (likely(mdd_cobj != NULL))
1791                 mdd_write_unlock(env, mdd_cobj);
1792
1793         if (rc == 0) {
1794                 if (cattr->la_nlink == 0)
1795                         cl_flags |= CLF_UNLINK_LAST;
1796                 else
1797                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1798
1799                 rc = mdd_changelog_ns_store(env, mdd,
1800                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1801                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1802                         handle);
1803         }
1804
1805 stop:
1806         rc = mdd_trans_stop(env, mdd, rc, handle);
1807
1808         return rc;
1809 }
1810
1811 /*
1812  * The permission has been checked when obj created, no need check again.
1813  */
1814 static int mdd_cd_sanity_check(const struct lu_env *env,
1815                                struct mdd_object *obj)
1816 {
1817         ENTRY;
1818
1819         /* EEXIST check */
1820         if (!obj || mdd_is_dead_obj(obj))
1821                 RETURN(-ENOENT);
1822
1823         RETURN(0);
1824 }
1825
1826 static int mdd_create_data(const struct lu_env *env,
1827                            struct md_object *pobj,
1828                            struct md_object *cobj,
1829                            const struct md_op_spec *spec,
1830                            struct md_attr *ma)
1831 {
1832         struct mdd_device *mdd = mdo2mdd(cobj);
1833         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1834         struct mdd_object *son = md2mdd_obj(cobj);
1835         struct thandle    *handle;
1836         const struct lu_buf *buf;
1837         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1838         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1839         int                rc;
1840         ENTRY;
1841
1842         rc = mdd_cd_sanity_check(env, son);
1843         if (rc)
1844                 RETURN(rc);
1845
1846         if (!md_should_create(spec->sp_cr_flags))
1847                 RETURN(0);
1848
1849         /*
1850          * there are following use cases for this function:
1851          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1852          *    striping can be specified or not
1853          * 2) CMD?
1854          */
1855         rc = mdd_la_get(env, son, attr);
1856         if (rc)
1857                 RETURN(rc);
1858
1859         /* calling ->ah_make_hint() is used to transfer information from parent */
1860         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1861
1862         handle = mdd_trans_create(env, mdd);
1863         if (IS_ERR(handle))
1864                 GOTO(out_free, rc = PTR_ERR(handle));
1865
1866         /*
1867          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1868          * Should this be fixed?
1869          */
1870         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1871                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1872                spec->sp_cr_flags, spec->no_create);
1873
1874         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1875                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1876                                         spec->u.sp_ea.eadatalen);
1877         } else {
1878                 buf = &LU_BUF_NULL;
1879         }
1880
1881         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1882                                   XATTR_NAME_LOV, 0, handle);
1883         if (rc)
1884                 GOTO(stop, rc);
1885
1886         rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
1887                                          handle);
1888         if (rc)
1889                 GOTO(stop, rc);
1890
1891         rc = mdd_trans_start(env, mdd, handle);
1892         if (rc)
1893                 GOTO(stop, rc);
1894
1895         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1896                           0, handle);
1897
1898         if (rc)
1899                 GOTO(stop, rc);
1900
1901         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1902
1903 stop:
1904         rc = mdd_trans_stop(env, mdd, rc, handle);
1905
1906 out_free:
1907         RETURN(rc);
1908 }
1909
1910 static int mdd_declare_object_initialize(const struct lu_env *env,
1911                                          struct mdd_object *parent,
1912                                          struct mdd_object *child,
1913                                          const struct lu_attr *attr,
1914                                          struct thandle *handle)
1915 {
1916         int rc;
1917         ENTRY;
1918
1919         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1920         if (!S_ISDIR(attr->la_mode))
1921                 RETURN(0);
1922
1923         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1924                                       dot, handle);
1925         if (rc != 0)
1926                 RETURN(rc);
1927
1928         rc = mdo_declare_ref_add(env, child, handle);
1929         if (rc != 0)
1930                 RETURN(rc);
1931
1932         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1933                                       dotdot, handle);
1934
1935         RETURN(rc);
1936 }
1937
1938 static int mdd_object_initialize(const struct lu_env *env,
1939                                  const struct lu_fid *pfid,
1940                                  struct mdd_object *child,
1941                                  struct lu_attr *attr, struct thandle *handle,
1942                                  const struct md_op_spec *spec)
1943 {
1944         int rc = 0;
1945         ENTRY;
1946
1947         if (S_ISDIR(attr->la_mode)) {
1948                 /* Add "." and ".." for newly created dir */
1949                 mdo_ref_add(env, child, handle);
1950                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1951                                              S_IFDIR, dot, handle);
1952                 if (rc == 0)
1953                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1954                                                      dotdot, handle);
1955                 if (rc != 0)
1956                         mdo_ref_del(env, child, handle);
1957         }
1958
1959         RETURN(rc);
1960 }
1961
1962 /**
1963  * This function checks whether it can create a file/dir under the
1964  * directory(@pobj). The directory(@pobj) is not being locked by
1965  * mdd lock.
1966  *
1967  * \param[in] env       execution environment
1968  * \param[in] pobj      the directory to create files
1969  * \param[in] pattr     the attributes of the directory
1970  * \param[in] lname     the name of the created file/dir
1971  * \param[in] cattr     the attributes of the file/dir
1972  * \param[in] spec      create specification
1973  *
1974  * \retval              = 0 it is allowed to create file/dir under
1975  *                      the directory
1976  * \retval              negative error not allowed to create file/dir
1977  *                      under the directory
1978  */
1979 static int mdd_create_sanity_check(const struct lu_env *env,
1980                                    struct md_object *pobj,
1981                                    const struct lu_attr *pattr,
1982                                    const struct lu_name *lname,
1983                                    struct lu_attr *cattr,
1984                                    struct md_op_spec *spec)
1985 {
1986         struct mdd_thread_info *info = mdd_env_info(env);
1987         struct lu_fid     *fid       = &info->mti_fid;
1988         struct mdd_object *obj       = md2mdd_obj(pobj);
1989         struct mdd_device *m         = mdo2mdd(pobj);
1990         bool            check_perm = true;
1991         int rc;
1992         ENTRY;
1993
1994         /* EEXIST check */
1995         if (mdd_is_dead_obj(obj))
1996                 RETURN(-ENOENT);
1997
1998         /*
1999          * In some cases this lookup is not needed - we know before if name
2000          * exists or not because MDT performs lookup for it.
2001          * name length check is done in lookup.
2002          */
2003         if (spec->sp_cr_lookup) {
2004                 /*
2005                  * Check if the name already exist, though it will be checked in
2006                  * _index_insert also, for avoiding rolling back if exists
2007                  * _index_insert.
2008                  */
2009                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2010                                   MAY_WRITE | MAY_EXEC);
2011                 if (rc != -ENOENT)
2012                         RETURN(rc ? : -EEXIST);
2013
2014                 /* Permission is already being checked in mdd_lookup */
2015                 check_perm = false;
2016         }
2017
2018         if (S_ISDIR(cattr->la_mode) &&
2019             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2020             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2021                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2022
2023                 if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
2024                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
2025                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2026                         rc = -EINVAL;
2027                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2028                                "stripe_offset = %d, stripe_count = %u: "
2029                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2030                                 le32_to_cpu(lum->lum_magic),
2031                                (int)le32_to_cpu(lum->lum_stripe_offset),
2032                                le32_to_cpu(lum->lum_stripe_count), rc);
2033                         return rc;
2034                 }
2035         }
2036
2037         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2038         if (rc != 0)
2039                 RETURN(rc);
2040
2041         /* sgid check */
2042         if (pattr->la_mode & S_ISGID) {
2043                 cattr->la_gid = pattr->la_gid;
2044                 if (S_ISDIR(cattr->la_mode)) {
2045                         cattr->la_mode |= S_ISGID;
2046                         cattr->la_valid |= LA_MODE;
2047                 }
2048         }
2049
2050         /* Inherit project ID from parent directory */
2051         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2052                 cattr->la_projid = pattr->la_projid;
2053                 if (S_ISDIR(cattr->la_mode)) {
2054                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2055                         cattr->la_valid |= LA_FLAGS;
2056                 }
2057                 cattr->la_valid |= LA_PROJID;
2058         }
2059
2060         rc = mdd_name_check(m, lname);
2061         if (rc < 0)
2062                 RETURN(rc);
2063
2064         switch (cattr->la_mode & S_IFMT) {
2065         case S_IFLNK: {
2066                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2067
2068                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2069                         RETURN(-ENAMETOOLONG);
2070                 else
2071                         RETURN(0);
2072         }
2073         case S_IFDIR:
2074         case S_IFREG:
2075         case S_IFCHR:
2076         case S_IFBLK:
2077         case S_IFIFO:
2078         case S_IFSOCK:
2079                 rc = 0;
2080                 break;
2081         default:
2082                 rc = -EINVAL;
2083                 break;
2084         }
2085         RETURN(rc);
2086 }
2087
2088 static int mdd_declare_create_object(const struct lu_env *env,
2089                                      struct mdd_device *mdd,
2090                                      struct mdd_object *p, struct mdd_object *c,
2091                                      struct lu_attr *attr,
2092                                      struct thandle *handle,
2093                                      const struct md_op_spec *spec,
2094                                      struct lu_buf *def_acl_buf,
2095                                      struct lu_buf *acl_buf,
2096                                      struct dt_allocation_hint *hint)
2097 {
2098         const struct lu_buf *buf;
2099         int rc;
2100
2101         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2102                                                 hint);
2103         if (rc)
2104                 GOTO(out, rc);
2105
2106 #ifdef CONFIG_FS_POSIX_ACL
2107         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2108                 /* if dir, then can inherit default ACl */
2109                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2110                                            XATTR_NAME_ACL_DEFAULT,
2111                                            0, handle);
2112                 if (rc)
2113                         GOTO(out, rc);
2114         }
2115
2116         if (acl_buf->lb_len > 0) {
2117                 rc = mdo_declare_attr_set(env, c, attr, handle);
2118                 if (rc)
2119                         GOTO(out, rc);
2120
2121                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2122                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2123                 if (rc)
2124                         GOTO(out, rc);
2125         }
2126 #endif
2127         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2128         if (rc)
2129                 GOTO(out, rc);
2130
2131         /* replay case, create LOV EA from client data */
2132         if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
2133             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2134                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2135                                         spec->u.sp_ea.eadatalen);
2136                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2137                                            handle);
2138                 if (rc)
2139                         GOTO(out, rc);
2140         }
2141
2142         if (S_ISLNK(attr->la_mode)) {
2143                 const char *target_name = spec->u.sp_symname;
2144                 int sym_len = strlen(target_name);
2145                 const struct lu_buf *buf;
2146
2147                 buf = mdd_buf_get_const(env, target_name, sym_len);
2148                 rc = dt_declare_record_write(env, mdd_object_child(c),
2149                                              buf, 0, handle);
2150                 if (rc)
2151                         GOTO(out, rc);
2152         }
2153
2154         if (spec->sp_cr_file_secctx_name != NULL) {
2155                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2156                                         spec->sp_cr_file_secctx_size);
2157                 rc = mdo_declare_xattr_set(env, c, buf,
2158                                            spec->sp_cr_file_secctx_name, 0,
2159                                            handle);
2160                 if (rc < 0)
2161                         GOTO(out, rc);
2162         }
2163 out:
2164         return rc;
2165 }
2166
2167 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2168                               struct mdd_object *p, struct mdd_object *c,
2169                               const struct lu_name *name,
2170                               struct lu_attr *attr,
2171                               struct thandle *handle,
2172                               const struct md_op_spec *spec,
2173                               struct linkea_data *ldata,
2174                               struct lu_buf *def_acl_buf,
2175                               struct lu_buf *acl_buf,
2176                               struct dt_allocation_hint *hint)
2177 {
2178         int rc;
2179
2180         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2181                                        def_acl_buf, acl_buf, hint);
2182         if (rc)
2183                 GOTO(out, rc);
2184
2185         if (S_ISDIR(attr->la_mode)) {
2186                 rc = mdo_declare_ref_add(env, p, handle);
2187                 if (rc)
2188                         GOTO(out, rc);
2189         }
2190
2191         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2192                 rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
2193                 if (rc)
2194                         GOTO(out, rc);
2195         } else {
2196                 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
2197                 enum changelog_rec_type type;
2198
2199                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2200                                               name->ln_name, handle);
2201                 if (rc != 0)
2202                         return rc;
2203
2204                 rc = mdd_declare_links_add(env, c, handle, ldata);
2205                 if (rc)
2206                         return rc;
2207
2208                 *la = *attr;
2209                 la->la_valid = LA_CTIME | LA_MTIME;
2210                 rc = mdo_declare_attr_set(env, p, la, handle);
2211                 if (rc)
2212                         return rc;
2213
2214                 type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
2215                        S_ISREG(attr->la_mode) ? CL_CREATE :
2216                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
2217
2218                 rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
2219                                                  handle);
2220                 if (rc)
2221                         return rc;
2222         }
2223 out:
2224         return rc;
2225 }
2226
2227 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2228                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2229                         struct lu_buf *acl_buf)
2230 {
2231         int     rc;
2232         ENTRY;
2233
2234         if (S_ISLNK(la->la_mode)) {
2235                 acl_buf->lb_len = 0;
2236                 def_acl_buf->lb_len = 0;
2237                 RETURN(0);
2238         }
2239
2240         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2241         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2242                            XATTR_NAME_ACL_DEFAULT);
2243         mdd_read_unlock(env, pobj);
2244         if (rc > 0) {
2245                 /* If there are default ACL, fix mode/ACL by default ACL */
2246                 def_acl_buf->lb_len = rc;
2247                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2248                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2249                 acl_buf->lb_len = rc;
2250                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2251                 if (rc < 0)
2252                         RETURN(rc);
2253         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2254                 /* If there are no default ACL, fix mode by mask */
2255                 struct lu_ucred *uc = lu_ucred(env);
2256
2257                 /* The create triggered by MDT internal events, such as
2258                  * LFSCK reset, will not contain valid "uc". */
2259                 if (unlikely(uc != NULL))
2260                         la->la_mode &= ~uc->uc_umask;
2261                 rc = 0;
2262                 acl_buf->lb_len = 0;
2263                 def_acl_buf->lb_len = 0;
2264         }
2265
2266         RETURN(rc);
2267 }
2268
2269 /**
2270  * Create a metadata object and initialize it, set acl, xattr.
2271  **/
2272 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2273                              struct mdd_object *son, struct lu_attr *attr,
2274                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2275                              struct lu_buf *def_acl_buf,
2276                              struct dt_allocation_hint *hint,
2277                              struct thandle *handle)
2278 {
2279         const struct lu_buf *buf;
2280         int rc;
2281
2282         mdd_write_lock(env, son, MOR_TGT_CHILD);
2283         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2284                                         hint);
2285         if (rc)
2286                 GOTO(unlock, rc);
2287
2288         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2289          * created in declare phase, they also needs to be added to master
2290          * object as sub-directory entry. So it has to initialize the master
2291          * object, then set dir striped EA.(in mdo_xattr_set) */
2292         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2293                                    spec);
2294         if (rc != 0)
2295                 GOTO(err_destroy, rc);
2296
2297         /*
2298          * in case of replay we just set LOVEA provided by the client
2299          * XXX: I think it would be interesting to try "old" way where
2300          *      MDT calls this xattr_set(LOV) in a different transaction.
2301          *      probably this way we code can be made better.
2302          */
2303
2304         /* During creation, there are only a few cases we need do xattr_set to
2305          * create stripes.
2306          * 1. regular file: see comments above.
2307          * 2. dir: inherit default striping or pool settings from parent.
2308          * 3. create striped directory with provided stripeEA.
2309          * 4. create striped directory because inherit default layout from the
2310          * parent.
2311          */
2312         if (spec->no_create ||
2313             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2314             S_ISDIR(attr->la_mode)) {
2315                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2316                                         spec->u.sp_ea.eadatalen);
2317                 rc = mdo_xattr_set(env, son, buf,
2318                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2319                                                             XATTR_NAME_LOV, 0,
2320                                    handle);
2321                 if (rc != 0)
2322                         GOTO(err_destroy, rc);
2323         }
2324
2325 #ifdef CONFIG_FS_POSIX_ACL
2326         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2327             S_ISDIR(attr->la_mode)) {
2328                 /* set default acl */
2329                 rc = mdo_xattr_set(env, son, def_acl_buf,
2330                                    XATTR_NAME_ACL_DEFAULT, 0,
2331                                    handle);
2332                 if (rc)
2333                         GOTO(err_destroy, rc);
2334         }
2335         /* set its own acl */
2336         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2337                 rc = mdo_xattr_set(env, son, acl_buf,
2338                                    XATTR_NAME_ACL_ACCESS,
2339                                    0, handle);
2340                 if (rc)
2341                         GOTO(err_destroy, rc);
2342         }
2343 #endif
2344
2345         if (S_ISLNK(attr->la_mode)) {
2346                 struct lu_ucred  *uc = lu_ucred_assert(env);
2347                 struct dt_object *dt = mdd_object_child(son);
2348                 const char *target_name = spec->u.sp_symname;
2349                 int sym_len = strlen(target_name);
2350                 loff_t pos = 0;
2351
2352                 buf = mdd_buf_get_const(env, target_name, sym_len);
2353                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2354                                                 uc->uc_cap &
2355                                                 CFS_CAP_SYS_RESOURCE_MASK);
2356
2357                 if (rc == sym_len)
2358                         rc = 0;
2359                 else
2360                         GOTO(err_initlized, rc = -EFAULT);
2361         }
2362
2363         if (spec->sp_cr_file_secctx_name != NULL) {
2364                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2365                                         spec->sp_cr_file_secctx_size);
2366                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2367                                    0, handle);
2368                 if (rc < 0)
2369                         GOTO(err_initlized, rc);
2370         }
2371
2372 err_initlized:
2373         if (unlikely(rc != 0)) {
2374                 int rc2;
2375                 if (S_ISDIR(attr->la_mode)) {
2376                         /* Drop the reference, no need to delete "."/"..",
2377                          * because the object to be destroied directly. */
2378                         rc2 = mdo_ref_del(env, son, handle);
2379                         if (rc2 != 0)
2380                                 GOTO(unlock, rc);
2381                 }
2382                 rc2 = mdo_ref_del(env, son, handle);
2383                 if (rc2 != 0)
2384                         GOTO(unlock, rc);
2385 err_destroy:
2386                 mdo_destroy(env, son, handle);
2387         }
2388 unlock:
2389         mdd_write_unlock(env, son);
2390         RETURN(rc);
2391 }
2392
2393 static int mdd_index_delete(const struct lu_env *env,
2394                             struct mdd_object *mdd_pobj,
2395                             struct lu_attr *cattr,
2396                             const struct lu_name *lname)
2397 {
2398         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2399         struct thandle *handle;
2400         int rc;
2401         ENTRY;
2402
2403         handle = mdd_trans_create(env, mdd);
2404         if (IS_ERR(handle))
2405                 RETURN(PTR_ERR(handle));
2406
2407         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2408                                       handle);
2409         if (rc != 0)
2410                 GOTO(stop, rc);
2411
2412         if (S_ISDIR(cattr->la_mode)) {
2413                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2414                 if (rc != 0)
2415                         GOTO(stop, rc);
2416         }
2417
2418         /* Since this will only be used in the error handler path,
2419          * Let's set the thandle to be local and not mess the transno */
2420         handle->th_local = 1;
2421         rc = mdd_trans_start(env, mdd, handle);
2422         if (rc)
2423                 GOTO(stop, rc);
2424
2425         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2426                                 S_ISDIR(cattr->la_mode), handle);
2427         if (rc)
2428                 GOTO(stop, rc);
2429 stop:
2430         rc = mdd_trans_stop(env, mdd, rc, handle);
2431
2432         RETURN(rc);
2433 }
2434
2435 /**
2436  * Create object and insert it into namespace.
2437  *
2438  * Two operations have to be performed:
2439  *
2440  *  - an allocation of a new object (->do_create()), and
2441  *  - an insertion into a parent index (->dio_insert()).
2442  *
2443  * Due to locking, operation order is not important, when both are
2444  * successful, *but* error handling cases are quite different:
2445  *
2446  *  - if insertion is done first, and following object creation fails,
2447  *  insertion has to be rolled back, but this operation might fail
2448  *  also leaving us with dangling index entry.
2449  *
2450  *  - if creation is done first, is has to be undone if insertion fails,
2451  *  leaving us with leaked space, which is not good but not fatal.
2452  *
2453  * It seems that creation-first is simplest solution, but it is sub-optimal
2454  * in the frequent
2455  *
2456  * $ mkdir foo
2457  * $ mkdir foo
2458  *
2459  * case, because second mkdir is bound to create object, only to
2460  * destroy it immediately.
2461  *
2462  * To avoid this follow local file systems that do double lookup:
2463  *
2464  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2465  * 1. create            (mdd_create_object_internal())
2466  * 2. insert            (__mdd_index_insert(), lookup again)
2467  *
2468  * \param[in] pobj      parent object
2469  * \param[in] lname     name of child being created
2470  * \param[in,out] child child object being created
2471  * \param[in] spec      additional create parameters
2472  * \param[in] ma        attributes for new child object
2473  *
2474  * \retval              0 on success
2475  * \retval              negative errno on failure
2476  */
2477 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2478                       const struct lu_name *lname, struct md_object *child,
2479                       struct md_op_spec *spec, struct md_attr *ma)
2480 {
2481         struct mdd_thread_info  *info = mdd_env_info(env);
2482         struct lu_attr          *la = &info->mti_la_for_fix;
2483         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2484         struct mdd_object       *son = md2mdd_obj(child);
2485         struct mdd_device       *mdd = mdo2mdd(pobj);
2486         struct lu_attr          *attr = &ma->ma_attr;
2487         struct thandle          *handle;
2488         struct lu_attr          *pattr = &info->mti_pattr;
2489         struct lu_buf           acl_buf;
2490         struct lu_buf           def_acl_buf;
2491         struct linkea_data      *ldata = &info->mti_link_data;
2492         const char              *name = lname->ln_name;
2493         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2494         int                      rc;
2495         int                      rc2;
2496         ENTRY;
2497
2498         rc = mdd_la_get(env, mdd_pobj, pattr);
2499         if (rc != 0)
2500                 RETURN(rc);
2501
2502         /* Sanity checks before big job. */
2503         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2504         if (rc)
2505                 RETURN(rc);
2506
2507         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2508                 GOTO(out_free, rc = -EINPROGRESS);
2509
2510         handle = mdd_trans_create(env, mdd);
2511         if (IS_ERR(handle))
2512                 GOTO(out_free, rc = PTR_ERR(handle));
2513
2514         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2515                                mdd->mdd_dt_conf.ddp_max_ea_size);
2516         acl_buf = info->mti_xattr_buf;
2517         def_acl_buf.lb_buf = info->mti_key;
2518         def_acl_buf.lb_len = sizeof(info->mti_key);
2519         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2520         if (rc < 0)
2521                 GOTO(out_stop, rc);
2522
2523         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2524
2525         memset(ldata, 0, sizeof(*ldata));
2526         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2527                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2528
2529                 tfid.f_oid--;
2530                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2531                                         &tfid, lname, 1, 0, ldata);
2532         } else {
2533                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2534                                         mdd_object_fid(mdd_pobj),
2535                                         lname, 1, 0, ldata);
2536         }
2537
2538         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2539                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2540                                 hint);
2541         if (rc)
2542                 GOTO(out_stop, rc);
2543
2544         rc = mdd_trans_start(env, mdd, handle);
2545         if (rc)
2546                 GOTO(out_stop, rc);
2547
2548         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2549                                &def_acl_buf, hint, handle);
2550         if (rc != 0)
2551                 GOTO(out_stop, rc);
2552
2553         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2554                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2555                 son->mod_flags |= VOLATILE_OBJ;
2556                 rc = mdd_orphan_insert(env, son, handle);
2557                 GOTO(out_volatile, rc);
2558         } else {
2559                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2560                                       attr->la_mode, name, handle);
2561                 if (rc != 0)
2562                         GOTO(err_created, rc);
2563
2564                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2565                               ldata, 1);
2566
2567                 /* update parent directory mtime/ctime */
2568                 *la = *attr;
2569                 la->la_valid = LA_CTIME | LA_MTIME;
2570                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2571                 if (rc)
2572                         GOTO(err_insert, rc);
2573         }
2574
2575         EXIT;
2576 err_insert:
2577         if (rc != 0) {
2578                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2579                         rc2 = mdd_orphan_delete(env, son, handle);
2580                 else
2581                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2582                                                  S_ISDIR(attr->la_mode),
2583                                                  handle);
2584                 if (rc2 != 0)
2585                         goto out_stop;
2586
2587 err_created:
2588                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2589                 if (S_ISDIR(attr->la_mode)) {
2590                         /* Drop the reference, no need to delete "."/"..",
2591                          * because the object is to be destroyed directly. */
2592                         rc2 = mdo_ref_del(env, son, handle);
2593                         if (rc2 != 0) {
2594                                 mdd_write_unlock(env, son);
2595                                 goto out_stop;
2596                         }
2597                 }
2598 out_volatile:
2599                 /* For volatile files drop one link immediately, since there is
2600                  * no filename in the namespace, and save any error returned. */
2601                 rc2 = mdo_ref_del(env, son, handle);
2602                 if (rc2 != 0) {
2603                         mdd_write_unlock(env, son);
2604                         if (unlikely(rc == 0))
2605                                 rc = rc2;
2606                         goto out_stop;
2607                 }
2608
2609                 /* Don't destroy the volatile object on success */
2610                 if (likely(rc != 0))
2611                         mdo_destroy(env, son, handle);
2612                 mdd_write_unlock(env, son);
2613         }
2614
2615         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2616             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2617                 rc = mdd_changelog_ns_store(env, mdd,
2618                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2619                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2620                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2621                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2622                                 NULL, handle);
2623 out_stop:
2624         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2625         if (rc == 0) {
2626                 /* If creation fails, it is most likely due to the remote update
2627                  * failure, because local transaction will mostly succeed at
2628                  * this stage. There is no easy way to rollback all of previous
2629                  * updates, so let's remove the object from namespace, and
2630                  * LFSCK should handle the orphan object. */
2631                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2632                         mdd_index_delete(env, mdd_pobj, attr, lname);
2633                 rc = rc2;
2634         }
2635 out_free:
2636         if (is_vmalloc_addr(ldata->ld_buf))
2637                 /* if we vmalloced a large buffer drop it */
2638                 lu_buf_free(ldata->ld_buf);
2639
2640         /* The child object shouldn't be cached anymore */
2641         if (rc)
2642                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2643                         &child->mo_lu.lo_header->loh_flags);
2644         return rc;
2645 }
2646
2647 /*
2648  * Get locks on parents in proper order
2649  * RETURN: < 0 - error, rename_order if successful
2650  */
2651 enum rename_order {
2652         MDD_RN_SAME,
2653         MDD_RN_SRCTGT,
2654         MDD_RN_TGTSRC
2655 };
2656
2657 static int mdd_rename_order(const struct lu_env *env,
2658                             struct mdd_device *mdd,
2659                             struct mdd_object *src_pobj,
2660                             const struct lu_attr *pattr,
2661                             struct mdd_object *tgt_pobj)
2662 {
2663         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2664         int rc;
2665         ENTRY;
2666
2667         if (src_pobj == tgt_pobj)
2668                 RETURN(MDD_RN_SAME);
2669
2670         /* compared the parent child relationship of src_p&tgt_p */
2671         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2672                 rc = MDD_RN_SRCTGT;
2673         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2674                 rc = MDD_RN_TGTSRC;
2675         } else {
2676                 rc = mdd_is_parent(env, mdd, src_pobj, pattr,
2677                                    mdo2fid(tgt_pobj));
2678                 if (rc == -EREMOTE)
2679                         rc = 0;
2680
2681                 if (rc == 1)
2682                         rc = MDD_RN_TGTSRC;
2683                 else if (rc == 0)
2684                         rc = MDD_RN_SRCTGT;
2685         }
2686
2687         RETURN(rc);
2688 }
2689
2690 /* has not mdd_write{read}_lock on any obj yet. */
2691 static int mdd_rename_sanity_check(const struct lu_env *env,
2692                                    struct mdd_object *src_pobj,
2693                                    const struct lu_attr *pattr,
2694                                    struct mdd_object *tgt_pobj,
2695                                    const struct lu_attr *tpattr,
2696                                    struct mdd_object *sobj,
2697                                    const struct lu_attr *cattr,
2698                                    struct mdd_object *tobj,
2699                                    const struct lu_attr *tattr)
2700 {
2701         int rc = 0;
2702         ENTRY;
2703
2704         /* XXX: when get here, sobj must NOT be NULL,
2705          * the other case has been processed in cld_rename
2706          * before mdd_rename and enable MDS_PERM_BYPASS. */
2707         LASSERT(sobj);
2708
2709         /*
2710          * If we are using project inheritance, we only allow renames
2711          * into our tree when the project IDs are the same; otherwise
2712          * tree quota mechanism would be circumvented.
2713          */
2714         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2715             tpattr->la_projid != cattr->la_projid) ||
2716             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2717             (pattr->la_projid != tpattr->la_projid)))
2718                 RETURN(-EXDEV);
2719
2720         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2721         if (rc)
2722                 RETURN(rc);
2723
2724         /* XXX: when get here, "tobj == NULL" means tobj must
2725          * NOT exist (neither on remote MDS, such case has been
2726          * processed in cld_rename before mdd_rename and enable
2727          * MDS_PERM_BYPASS).
2728          * So check may_create, but not check may_unlink. */
2729         if (tobj == NULL)
2730                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2731                                     (src_pobj != tgt_pobj));
2732         else
2733                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2734                                     (src_pobj != tgt_pobj), 1);
2735
2736         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2737                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2738
2739         RETURN(rc);
2740 }
2741
2742 static int mdd_declare_rename(const struct lu_env *env,
2743                               struct mdd_device *mdd,
2744                               struct mdd_object *mdd_spobj,
2745                               struct mdd_object *mdd_tpobj,
2746                               struct mdd_object *mdd_sobj,
2747                               struct mdd_object *mdd_tobj,
2748                               const struct lu_name *sname,
2749                               const struct lu_name *tname,
2750                               struct md_attr *ma,
2751                               struct linkea_data *ldata,
2752                               struct thandle *handle)
2753 {
2754         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2755         int rc;
2756
2757         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2758         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2759
2760         LASSERT(mdd_spobj);
2761         LASSERT(mdd_tpobj);
2762         LASSERT(mdd_sobj);
2763
2764         /* name from source dir */
2765         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2766         if (rc)
2767                 return rc;
2768
2769         /* .. from source child */
2770         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2771                 /* source child can be directory,
2772                  * counted by source dir's nlink */
2773                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2774                 if (rc)
2775                         return rc;
2776                 if (mdd_spobj != mdd_tpobj) {
2777                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2778                                                       handle);
2779                         if (rc != 0)
2780                                 return rc;
2781
2782                         rc = mdo_declare_index_insert(env, mdd_sobj,
2783                                                       mdo2fid(mdd_tpobj),
2784                                                       S_IFDIR, dotdot, handle);
2785                         if (rc != 0)
2786                                 return rc;
2787                 }
2788
2789                 /* new target child can be directory,
2790                  * counted by target dir's nlink */
2791                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2792                 if (rc != 0)
2793                         return rc;
2794         }
2795
2796         la->la_valid = LA_CTIME | LA_MTIME;
2797         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2798         if (rc != 0)
2799                 return rc;
2800
2801         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2802         if (rc != 0)
2803                 return rc;
2804
2805         la->la_valid = LA_CTIME;
2806         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2807         if (rc)
2808                 return rc;
2809
2810         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2811         if (rc)
2812                 return rc;
2813
2814         /* new name */
2815         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2816                                       mdd_object_type(mdd_sobj),
2817                                       tname->ln_name, handle);
2818         if (rc != 0)
2819                 return rc;
2820
2821         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2822                 /* delete target child in target parent directory */
2823                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2824                                               handle);
2825                 if (rc)
2826                         return rc;
2827
2828                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2829                 if (rc)
2830                         return rc;
2831
2832                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2833                         /* target child can be directory,
2834                          * delete "." reference in target child directory */
2835                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2836                         if (rc)
2837                                 return rc;
2838
2839                         /* delete ".." reference in target parent directory */
2840                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2841                         if (rc)
2842                                 return rc;
2843                 }
2844
2845                 la->la_valid = LA_CTIME;
2846                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2847                 if (rc)
2848                         return rc;
2849
2850                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2851                 if (rc)
2852                         return rc;
2853         }
2854
2855         rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
2856                                          handle);
2857         if (rc)
2858                 return rc;
2859
2860         return rc;
2861 }
2862
2863 /* src object can be remote that is why we use only fid and type of object */
2864 static int mdd_rename(const struct lu_env *env,
2865                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2866                       const struct lu_fid *lf, const struct lu_name *lsname,
2867                       struct md_object *tobj, const struct lu_name *ltname,
2868                       struct md_attr *ma)
2869 {
2870         const char *sname = lsname->ln_name;
2871         const char *tname = ltname->ln_name;
2872         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2873         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2874         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2875         struct mdd_device *mdd = mdo2mdd(src_pobj);
2876         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2877         struct mdd_object *mdd_tobj = NULL;
2878         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2879         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2880         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2881         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2882         struct thandle *handle;
2883         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2884         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2885         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2886         bool is_dir;
2887         bool tobj_ref = 0;
2888         bool tobj_locked = 0;
2889         unsigned cl_flags = 0;
2890         int rc, rc2;
2891         ENTRY;
2892
2893         /* let unlink to complete and commit */
2894         CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
2895
2896         if (tobj)
2897                 mdd_tobj = md2mdd_obj(tobj);
2898
2899         mdd_sobj = mdd_object_find(env, mdd, lf);
2900         if (IS_ERR(mdd_sobj))
2901                 RETURN(PTR_ERR(mdd_sobj));
2902
2903         rc = mdd_la_get(env, mdd_sobj, cattr);
2904         if (rc)
2905                 GOTO(out_pending, rc);
2906
2907         rc = mdd_la_get(env, mdd_spobj, pattr);
2908         if (rc)
2909                 GOTO(out_pending, rc);
2910
2911         if (mdd_tobj) {
2912                 rc = mdd_la_get(env, mdd_tobj, tattr);
2913                 if (rc)
2914                         GOTO(out_pending, rc);
2915                 /* search for an existing archive.
2916                  * we should check ahead as the object
2917                  * can be destroyed in this transaction */
2918                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2919                         cl_flags |= CLF_RENAME_LAST_EXISTS;
2920         }
2921
2922         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2923         if (rc)
2924                 GOTO(out_pending, rc);
2925
2926         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2927                                      mdd_sobj, cattr, mdd_tobj, tattr);
2928         if (rc)
2929                 GOTO(out_pending, rc);
2930
2931         rc = mdd_name_check(mdd, ltname);
2932         if (rc < 0)
2933                 GOTO(out_pending, rc);
2934
2935         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2936         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2937         if (rc < 0)
2938                 GOTO(out_pending, rc);
2939
2940         handle = mdd_trans_create(env, mdd);
2941         if (IS_ERR(handle))
2942                 GOTO(out_pending, rc = PTR_ERR(handle));
2943
2944         memset(ldata, 0, sizeof(*ldata));
2945         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
2946                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
2947                                 1, 0, ldata);
2948         if (rc)
2949                 GOTO(stop, rc);
2950
2951         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2952                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2953         if (rc)
2954                 GOTO(stop, rc);
2955
2956         rc = mdd_trans_start(env, mdd, handle);
2957         if (rc)
2958                 GOTO(stop, rc);
2959
2960         is_dir = S_ISDIR(cattr->la_mode);
2961
2962         /* Remove source name from source directory */
2963         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2964         if (rc != 0)
2965                 GOTO(stop, rc);
2966
2967         /* "mv dir1 dir2" needs "dir1/.." link update */
2968         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2969                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2970                 if (rc != 0)
2971                         GOTO(fixup_spobj2, rc);
2972
2973                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2974                                              dotdot, handle);
2975                 if (rc != 0)
2976                         GOTO(fixup_spobj, rc);
2977         }
2978
2979         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2980                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2981                 if (rc != 0)
2982                         /* tname might been renamed to something else */
2983                         GOTO(fixup_spobj, rc);
2984         }
2985
2986         /* Insert new fid with target name into target dir */
2987         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2988                                 tname, handle);
2989         if (rc != 0)
2990                 GOTO(fixup_tpobj, rc);
2991
2992         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2993         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2994
2995         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2996         la->la_valid = LA_CTIME;
2997         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2998         if (rc)
2999                 GOTO(fixup_tpobj, rc);
3000
3001         /* Update the linkEA for the source object */
3002         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3003         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
3004                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
3005                               0, 0);
3006         if (rc == -ENOENT)
3007                 /* Old files might not have EA entry */
3008                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
3009                               lsname, handle, NULL, 0);
3010         mdd_write_unlock(env, mdd_sobj);
3011         /* We don't fail the transaction if the link ea can't be
3012            updated -- fid2path will use alternate lookup method. */
3013         rc = 0;
3014
3015         /* Remove old target object
3016          * For tobj is remote case cmm layer has processed
3017          * and set tobj to NULL then. So when tobj is NOT NULL,
3018          * it must be local one.
3019          */
3020         if (tobj && mdd_object_exists(mdd_tobj)) {
3021                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
3022                 tobj_locked = 1;
3023                 if (mdd_is_dead_obj(mdd_tobj)) {
3024                         /* shld not be dead, something is wrong */
3025                         CERROR("tobj is dead, something is wrong\n");
3026                         rc = -EINVAL;
3027                         goto cleanup;
3028                 }
3029                 mdo_ref_del(env, mdd_tobj, handle);
3030
3031                 /* Remove dot reference. */
3032                 if (S_ISDIR(tattr->la_mode))
3033                         mdo_ref_del(env, mdd_tobj, handle);
3034                 tobj_ref = 1;
3035
3036                 /* fetch updated nlink */
3037                 rc = mdd_la_get(env, mdd_tobj, tattr);
3038                 if (rc != 0) {
3039                         CERROR("%s: Failed to get nlink for tobj "
3040                                 DFID": rc = %d\n",
3041                                 mdd2obd_dev(mdd)->obd_name,
3042                                 PFID(tpobj_fid), rc);
3043                         GOTO(fixup_tpobj, rc);
3044                 }
3045
3046                 la->la_valid = LA_CTIME;
3047                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3048                 if (rc != 0) {
3049                         CERROR("%s: Failed to set ctime for tobj "
3050                                 DFID": rc = %d\n",
3051                                 mdd2obd_dev(mdd)->obd_name,
3052                                 PFID(tpobj_fid), rc);
3053                         GOTO(fixup_tpobj, rc);
3054                 }
3055
3056                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3057                 ma->ma_attr = *tattr;
3058                 ma->ma_valid |= MA_INODE;
3059                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3060                                        handle);
3061                 if (rc != 0) {
3062                         CERROR("%s: Failed to unlink tobj "
3063                                 DFID": rc = %d\n",
3064                                 mdd2obd_dev(mdd)->obd_name,
3065                                 PFID(tpobj_fid), rc);
3066                         GOTO(fixup_tpobj, rc);
3067                 }
3068
3069                 /* fetch updated nlink */
3070                 rc = mdd_la_get(env, mdd_tobj, tattr);
3071                 if (rc == -ENOENT) {
3072                         /* the object got removed, let's
3073                          * return the latest known attributes */
3074                         tattr->la_nlink = 0;
3075                         rc = 0;
3076                 } else if (rc != 0) {
3077                         CERROR("%s: Failed to get nlink for tobj "
3078                                 DFID": rc = %d\n",
3079                                 mdd2obd_dev(mdd)->obd_name,
3080                                 PFID(tpobj_fid), rc);
3081                         GOTO(fixup_tpobj, rc);
3082                 }
3083                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3084                 ma->ma_attr = *tattr;
3085                 ma->ma_valid |= MA_INODE;
3086
3087                 if (tattr->la_nlink == 0)
3088                         cl_flags |= CLF_RENAME_LAST;
3089                 else
3090                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3091         }
3092
3093         la->la_valid = LA_CTIME | LA_MTIME;
3094         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3095         if (rc)
3096                 GOTO(fixup_tpobj, rc);
3097
3098         if (mdd_spobj != mdd_tpobj) {
3099                 la->la_valid = LA_CTIME | LA_MTIME;
3100                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3101                 if (rc != 0)
3102                         GOTO(fixup_tpobj, rc);
3103         }
3104
3105         EXIT;
3106
3107 fixup_tpobj:
3108         if (rc) {
3109                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3110                 if (rc2)
3111                         CWARN("tp obj fix error %d\n",rc2);
3112
3113                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3114                     !mdd_is_dead_obj(mdd_tobj)) {
3115                         if (tobj_ref) {
3116                                 mdo_ref_add(env, mdd_tobj, handle);
3117                                 if (is_dir)
3118                                         mdo_ref_add(env, mdd_tobj, handle);
3119                         }
3120
3121                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3122                                                   mdo2fid(mdd_tobj),
3123                                                   mdd_object_type(mdd_tobj),
3124                                                   tname, handle);
3125                         if (rc2 != 0)
3126                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3127                 }
3128         }
3129
3130 fixup_spobj:
3131         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3132                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3133                 if (rc2)
3134                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3135                                mdd2obd_dev(mdd)->obd_name, rc2);
3136
3137
3138                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3139                                               dotdot, handle);
3140                 if (rc2 != 0)
3141                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3142                               mdd2obd_dev(mdd)->obd_name, rc2);
3143         }
3144
3145 fixup_spobj2:
3146         if (rc != 0) {
3147                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3148                                          mdd_object_type(mdd_sobj), sname,
3149                                          handle);
3150                 if (rc2 != 0)
3151                         CWARN("sp obj fix error: rc = %d\n", rc2);
3152         }
3153
3154 cleanup:
3155         if (tobj_locked)
3156                 mdd_write_unlock(env, mdd_tobj);
3157
3158         if (rc == 0)
3159                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3160                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3161                                             ltname, lsname, handle);
3162
3163 stop:
3164         rc = mdd_trans_stop(env, mdd, rc, handle);
3165
3166 out_pending:
3167         mdd_object_put(env, mdd_sobj);
3168         return rc;
3169 }
3170
3171 /**
3172  * During migration once the parent FID has been changed,
3173  * we need update the parent FID in linkea.
3174  **/
3175 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3176                                             struct mdd_object *parent,
3177                                             struct mdd_object *newparent,
3178                                             struct mdd_object *child,
3179                                             const char *name, int namelen,
3180                                             struct thandle *handle,
3181                                             bool declare)
3182 {
3183         struct mdd_thread_info  *info = mdd_env_info(env);
3184         struct linkea_data      ldata = { NULL };
3185         struct lu_buf           *buf = &info->mti_link_buf;
3186         int                     count;
3187         int                     rc = 0;
3188
3189         ENTRY;
3190
3191         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3192         if (buf->lb_buf == NULL)
3193                 RETURN(-ENOMEM);
3194
3195         ldata.ld_buf = buf;
3196         rc = mdd_links_read(env, child, &ldata);
3197         if (rc != 0) {
3198                 if (rc == -ENOENT || rc == -ENODATA)
3199                         rc = 0;
3200                 RETURN(rc);
3201         }
3202
3203         LASSERT(ldata.ld_leh != NULL);
3204         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3205         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3206                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3207                 struct lu_name lname;
3208                 struct lu_fid  fid;
3209
3210                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3211                                     &lname, &fid);
3212
3213                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3214                     !lu_fid_eq(&fid, mdd_object_fid(parent))) {
3215                         ldata.ld_lee = (struct link_ea_entry *)
3216                                        ((char *)ldata.ld_lee +
3217                                         ldata.ld_reclen);
3218                         continue;
3219                 }
3220
3221                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3222                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3223                        lname.ln_namelen, lname.ln_name,
3224                        PFID(mdd_object_fid(newparent)));
3225                 /* update to the new parent fid */
3226                 linkea_entry_pack(ldata.ld_lee, &lname,
3227                                   mdd_object_fid(newparent));
3228                 if (declare)
3229                         rc = mdd_declare_links_add(env, child, handle, &ldata);
3230                 else
3231                         rc = mdd_links_write(env, child, &ldata, handle);
3232                 break;
3233         }
3234         RETURN(rc);
3235 }
3236
3237 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3238                                            struct mdd_object *parent,
3239                                            struct mdd_object *newparent,
3240                                            struct mdd_object *child,
3241                                            const char *name, int namelen,
3242                                            struct thandle *handle)
3243 {
3244         return mdd_linkea_update_child_internal(env, parent, newparent,
3245                                                 child, name,
3246                                                 namelen, handle, true);
3247 }
3248
3249 static int mdd_linkea_update_child(const struct lu_env *env,
3250                                    struct mdd_object *parent,
3251                                    struct mdd_object *newparent,
3252                                    struct mdd_object *child,
3253                                    const char *name, int namelen,
3254                                    struct thandle *handle)
3255 {
3256         return mdd_linkea_update_child_internal(env, parent, newparent,
3257                                                 child, name,
3258                                                 namelen, handle, false);
3259 }
3260
3261 static int mdd_update_linkea_internal(const struct lu_env *env,
3262                                       struct mdd_object *mdd_pobj,
3263                                       struct mdd_object *mdd_sobj,
3264                                       struct mdd_object *mdd_tobj,
3265                                       const struct lu_name *child_name,
3266                                       struct linkea_data *ldata,
3267                                       struct thandle *handle,
3268                                       int declare)
3269 {
3270         struct mdd_thread_info  *info = mdd_env_info(env);
3271         int                     count;
3272         int                     rc = 0;
3273         ENTRY;
3274
3275         LASSERT(ldata->ld_buf != NULL);
3276         LASSERT(ldata->ld_leh != NULL);
3277
3278         /* If it is mulitple links file, we need update the name entry for
3279          * all parent */
3280         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3281         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3282                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3283                 struct mdd_object       *pobj;
3284                 struct lu_name          lname;
3285                 struct lu_fid           fid;
3286
3287                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3288                                     &lname, &fid);
3289                 pobj = mdd_object_find(env, mdd, &fid);
3290                 if (IS_ERR(pobj)) {
3291                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3292                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3293                               PTR_ERR(pobj));
3294                         continue;
3295                 }
3296
3297                 if (!mdd_object_exists(pobj)) {
3298                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3299                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3300                         goto next_put;
3301                 }
3302
3303                 if (pobj == mdd_pobj &&
3304                     lname.ln_namelen == child_name->ln_namelen &&
3305                     strncmp(lname.ln_name, child_name->ln_name,
3306                             lname.ln_namelen) == 0) {
3307                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3308                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3309                               PFID(&fid));
3310                         goto next_put;
3311                 }
3312
3313                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3314                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3315                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3316
3317                 if (declare) {
3318                         /* Remove source name from source directory */
3319                         /* Insert new fid with target name into target dir */
3320                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3321                                                       handle);
3322                         if (rc != 0)
3323                                 GOTO(next_put, rc);
3324
3325                         rc = mdo_declare_index_insert(env, pobj,
3326                                         mdd_object_fid(mdd_tobj),
3327                                         mdd_object_type(mdd_tobj),
3328                                         lname.ln_name, handle);
3329                         if (rc != 0)
3330                                 GOTO(next_put, rc);
3331
3332                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3333                         if (rc)
3334                                 GOTO(next_put, rc);
3335
3336                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3337                         if (rc)
3338                                 GOTO(next_put, rc);
3339                 } else {
3340                         char *tmp_name = info->mti_key;
3341
3342                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3343                                 /* lnamelen is too big(> NAME_MAX + 16),
3344                                  * something wrong about this linkea, let's
3345                                  * skip it */
3346                                 CWARN("%s: the name %.*s is too long under "
3347                                       DFID"\n", mdd2obd_dev(mdd)->obd_name,
3348                                       lname.ln_namelen, lname.ln_name,
3349                                       PFID(&fid));
3350                                 goto next_put;
3351                         }
3352
3353                         /* Note: lname might be without \0 at the end, see
3354                          * linkea_entry_unpack(), let's add extra \0 by
3355                          * snprintf */
3356                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3357                                  lname.ln_namelen, lname.ln_name);
3358                         lname.ln_name = tmp_name;
3359
3360                         /* Let's check if this linkEA still valid, before
3361                          * it might be packed into the RPC buffer. */
3362                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3363                                         &info->mti_fid, NULL);
3364                         if (rc < 0 || !lu_fid_eq(&info->mti_fid,
3365                                                  mdd_object_fid(mdd_sobj)))
3366                                 GOTO(next_put, rc == -ENOENT ? 0 : rc);
3367
3368                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3369                         if (rc != 0)
3370                                 GOTO(next_put, rc);
3371
3372                         rc = __mdd_index_insert(env, pobj,
3373                                         mdd_object_fid(mdd_tobj),
3374                                         mdd_object_type(mdd_tobj),
3375                                         tmp_name, handle);
3376                         if (rc != 0)
3377                                 GOTO(next_put, rc);
3378
3379                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3380                         rc = mdo_ref_add(env, mdd_tobj, handle);
3381                         mdd_write_unlock(env, mdd_tobj);
3382                         if (rc)
3383                                 GOTO(next_put, rc);
3384
3385                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3386                         mdo_ref_del(env, mdd_sobj, handle);
3387                         mdd_write_unlock(env, mdd_sobj);
3388                 }
3389 next_put:
3390                 mdd_object_put(env, pobj);
3391                 if (rc != 0)
3392                         break;
3393
3394                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3395                                                          ldata->ld_reclen);
3396         }
3397
3398         RETURN(rc);
3399 }
3400
3401 static int mdd_migrate_xattrs(const struct lu_env *env,
3402                               struct mdd_object *mdd_sobj,
3403                               struct mdd_object *mdd_tobj)
3404 {
3405         struct mdd_thread_info  *info = mdd_env_info(env);
3406         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3407         char                    *xname;
3408         struct thandle          *handle;
3409         struct lu_buf           xbuf;
3410         int                     xlen;
3411         int                     rem;
3412         int                     xsize;
3413         int                     list_xsize;
3414         struct lu_buf           list_xbuf;
3415         int                     rc;
3416
3417         /* retrieve xattr list from the old object */
3418         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3419         if (list_xsize == -ENODATA)
3420                 return 0;
3421
3422         if (list_xsize < 0)
3423                 return list_xsize;
3424
3425         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3426         if (info->mti_big_buf.lb_buf == NULL)
3427                 return -ENOMEM;
3428
3429         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3430         list_xbuf.lb_len = list_xsize;
3431         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3432         if (rc < 0)
3433                 return rc;
3434         rc = 0;
3435         rem = list_xsize;
3436         xname = list_xbuf.lb_buf;
3437         while (rem > 0) {
3438                 xlen = strnlen(xname, rem - 1) + 1;
3439                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3440                     strcmp(XATTR_NAME_LMV, xname) == 0)
3441                         goto next;
3442
3443                 /* For directory, if there are default layout, migrate here */
3444                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3445                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3446                         goto next;
3447
3448                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3449                 if (xsize == -ENODATA)
3450                         goto next;
3451                 if (xsize < 0)
3452                         GOTO(out, rc);
3453
3454                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3455                 if (info->mti_link_buf.lb_buf == NULL)
3456                         GOTO(out, rc = -ENOMEM);
3457
3458                 xbuf.lb_len = xsize;
3459                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3460                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3461                 if (rc == -ENODATA)
3462                         goto next;
3463                 if (rc < 0)
3464                         GOTO(out, rc);
3465
3466                 handle = mdd_trans_create(env, mdd);
3467                 if (IS_ERR(handle))
3468                         GOTO(out, rc = PTR_ERR(handle));
3469
3470                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3471                                            handle);
3472                 if (rc != 0)
3473                         GOTO(stop_trans, rc);
3474                 /* Note: this transaction is part of migration, and it is not
3475                  * the last step of migration, so we set th_local = 1 to avoid
3476                  * update last rcvd for this transaction */
3477                 handle->th_local = 1;
3478                 rc = mdd_trans_start(env, mdd, handle);
3479                 if (rc != 0)
3480                         GOTO(stop_trans, rc);
3481
3482 again:
3483                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3484                 if (rc == -EEXIST)
3485                         GOTO(stop_trans, rc = 0);
3486
3487                 if (unlikely(rc == -ENOSPC &&
3488                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3489                         rc = linkea_overflow_shrink(
3490                                         (struct linkea_data *)(xbuf.lb_buf));
3491                         if (likely(rc > 0)) {
3492                                 xbuf.lb_len = rc;
3493                                 goto again;
3494                         }
3495                 }
3496
3497                 if (rc != 0)
3498                         GOTO(stop_trans, rc);
3499 stop_trans:
3500                 rc = mdd_trans_stop(env, mdd, rc, handle);
3501                 if (rc != 0)
3502                         GOTO(out, rc);
3503 next:
3504                 rem -= xlen;
3505                 memmove(xname, xname + xlen, rem);
3506         }
3507 out:
3508         return rc;
3509 }
3510
3511 static int mdd_declare_migrate_create(const struct lu_env *env,
3512                                       struct mdd_object *mdd_pobj,
3513                                       struct mdd_object *mdd_sobj,
3514                                       struct mdd_object *mdd_tobj,
3515                                       struct md_op_spec *spec,
3516                                       struct lu_attr *la,
3517                                       union lmv_mds_md *mgr_ea,
3518                                       struct linkea_data *ldata,
3519                                       struct thandle *handle)
3520 {
3521         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3522         const struct lu_buf     *buf;
3523         int                     rc;
3524         int                     mgr_easize;
3525
3526         rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
3527                                                 handle, spec, NULL);
3528         if (rc != 0)
3529                 return rc;
3530
3531         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3532                                            handle);
3533         if (rc != 0)
3534                 return rc;
3535
3536         if (S_ISLNK(la->la_mode)) {
3537                 const char *target_name = spec->u.sp_symname;
3538                 int sym_len = strlen(target_name);
3539                 const struct lu_buf *buf;
3540
3541                 buf = mdd_buf_get_const(env, target_name, sym_len);
3542                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3543                                              buf, 0, handle);
3544                 if (rc != 0)
3545                         return rc;
3546         } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
3547                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
3548                 if (rc != 0)
3549                         return rc;
3550         }
3551
3552         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3553                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3554                                         spec->u.sp_ea.eadatalen);
3555                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3556                                            0, handle);
3557                 if (rc)
3558                         return rc;
3559         }
3560
3561         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3562         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3563         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3564                                    0, handle);
3565         if (rc)
3566                 return rc;
3567
3568         la_flag->la_valid = LA_FLAGS;
3569         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3570         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3571
3572         return rc;
3573 }
3574
3575 static int mdd_migrate_create(const struct lu_env *env,
3576                               struct mdd_object *mdd_pobj,
3577                               struct mdd_object *mdd_sobj,
3578                               struct mdd_object *mdd_tobj,
3579                               const struct lu_name *lname,
3580                               struct lu_attr *la)
3581 {
3582         struct mdd_thread_info  *info = mdd_env_info(env);
3583         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3584         struct md_op_spec       *spec = &info->mti_spec;
3585         struct lu_buf           lmm_buf = { NULL };
3586         struct lu_buf           link_buf = { NULL };
3587         struct lu_buf            mgr_buf;
3588         struct thandle          *handle;
3589         struct lmv_mds_md_v1    *mgr_ea;
3590         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3591         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3592         int                     mgr_easize;
3593         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3594         int                     rc;
3595         ENTRY;
3596
3597         /* prepare spec for create */
3598         memset(spec, 0, sizeof(*spec));
3599         spec->sp_cr_lookup = 0;
3600         spec->sp_feat = &dt_directory_features;
3601         if (S_ISLNK(la->la_mode)) {
3602                 const struct lu_buf *buf;
3603
3604                 buf = lu_buf_check_and_alloc(
3605                                 &mdd_env_info(env)->mti_big_buf,
3606                                 la->la_size + 1);
3607                 link_buf = *buf;
3608                 link_buf.lb_len = la->la_size + 1;
3609                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3610                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3611                 if (rc <= 0) {
3612                         rc = rc != 0 ? rc : -EFAULT;
3613                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3614                                mdd2obd_dev(mdd)->obd_name,
3615                                PFID(mdd_object_fid(mdd_sobj)), rc);
3616                         RETURN(rc);
3617                 }
3618                 spec->u.sp_symname = link_buf.lb_buf;
3619         } else if (S_ISREG(la->la_mode)) {
3620                 /* retrieve lov of the old object */
3621                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3622                 if (rc != 0 && rc != -ENODATA)
3623                         RETURN(rc);
3624                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3625                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3626                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3627                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3628                 }
3629         } else if (S_ISDIR(la->la_mode)) {
3630                 rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
3631                 if (rc == -ENODATA) {
3632                         /* ignore the non-linkEA error */
3633                         ldata = NULL;
3634                         rc = 0;
3635                 }
3636                 if (rc < 0)
3637                         RETURN(rc);
3638         }
3639
3640         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3641         lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
3642         mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
3643         mgr_buf.lb_len = mgr_easize;
3644         mgr_ea = mgr_buf.lb_buf;
3645         memset(mgr_ea, 0, sizeof(*mgr_ea));
3646         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3647         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3648         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3649         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3650         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3651         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3652
3653         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3654
3655         handle = mdd_trans_create(env, mdd);
3656         if (IS_ERR(handle))
3657                 GOTO(out_free, rc = PTR_ERR(handle));
3658
3659         /* Note: this transaction is part of migration, and it is not
3660          * the last step of migration, so we set th_local = 1 to avoid
3661          * update last rcvd for this transaction */
3662         handle->th_local = 1;
3663         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
3664                                         la, mgr_buf.lb_buf, ldata, handle);
3665         if (rc != 0)
3666                 GOTO(stop_trans, rc);
3667
3668         rc = mdd_trans_start(env, mdd, handle);
3669         if (rc != 0)
3670                 GOTO(stop_trans, rc);
3671
3672         /* don't set nlink from the original object */
3673         la->la_valid &= ~LA_NLINK;
3674
3675         /* create the target object */
3676         rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3677                                hint, handle);
3678         if (rc != 0)
3679                 GOTO(stop_trans, rc);
3680
3681         if (S_ISDIR(la->la_mode) && ldata != NULL) {
3682                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3683                 if (rc != 0)
3684                         GOTO(stop_trans, rc);
3685         }
3686
3687         /* Set MIGRATE EA on the source inode, so once the migration needs
3688          * to be re-done during failover, the re-do process can locate the
3689          * target object which is already being created. */
3690         rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
3691         if (rc != 0)
3692                 GOTO(stop_trans, rc);
3693
3694         /* Set immutable flag, so any modification is disabled until
3695          * the migration is done. Once the migration is interrupted,
3696          * if the resume process find the migrating object has both
3697          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3698          * flag and approve the migration */
3699         la_flag->la_valid = LA_FLAGS;
3700         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3701         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3702 stop_trans:
3703         if (handle != NULL)
3704                 rc = mdd_trans_stop(env, mdd, rc, handle);
3705 out_free:
3706         if (lmm_buf.lb_buf != NULL)
3707                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3708         RETURN(rc);
3709 }
3710
3711 static int mdd_migrate_entries(const struct lu_env *env,
3712                                struct mdd_object *mdd_sobj,
3713                                struct mdd_object *mdd_tobj)
3714 {
3715         struct dt_object        *next = mdd_object_child(mdd_sobj);
3716         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3717         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3718         struct thandle          *handle;
3719         struct dt_it            *it;
3720         const struct dt_it_ops  *iops;
3721         int                      result;
3722         struct lu_dirent        *ent;
3723         int                      rc;
3724         ENTRY;
3725
3726         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3727         if (ent == NULL)
3728                 RETURN(-ENOMEM);
3729
3730         if (!dt_try_as_dir(env, next))
3731                 GOTO(out_ent, rc = -ENOTDIR);
3732         /*
3733          * iterate directories
3734          */
3735         iops = &next->do_index_ops->dio_it;
3736         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3737         if (IS_ERR(it))
3738                 GOTO(out_ent, rc = PTR_ERR(it));
3739
3740         rc = iops->load(env, it, 0);
3741         if (rc == 0)
3742                 rc = iops->next(env, it);
3743         else if (rc > 0)
3744                 rc = 0;
3745         /*
3746          * At this point and across for-loop:
3747          *
3748          *  rc == 0 -> ok, proceed.
3749          *  rc >  0 -> end of directory.
3750          *  rc <  0 -> error.
3751          */
3752         do {
3753                 struct mdd_object       *child;
3754                 char                    *name = mdd_env_info(env)->mti_key;
3755                 int                     len;
3756                 int                     is_dir;
3757                 bool                    target_exist = false;
3758
3759                 len = iops->key_size(env, it);
3760                 if (len == 0)
3761                         goto next;
3762
3763                 result = iops->rec(env, it, (struct dt_rec *)ent,
3764                                    LUDA_FID | LUDA_TYPE);
3765                 if (result == -ESTALE)
3766                         goto next;
3767                 if (result != 0) {
3768                         rc = result;
3769                         goto out;
3770                 }
3771
3772                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3773
3774                 /* Insert new fid with target name into target dir */
3775                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3776                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3777                      ent->lde_name[1] == '.'))
3778                         goto next;
3779
3780                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3781                 if (IS_ERR(child))
3782                         GOTO(out, rc = PTR_ERR(child));
3783
3784                 /* child may not exist, but lu_object_attr will assert this,
3785                  * get type from loh_attr directly */
3786                 is_dir = S_ISDIR(child->mod_obj.mo_lu.lo_header->loh_attr);
3787
3788                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3789
3790                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3791
3792                 /* Check whether the name has been inserted to the target */
3793                 if (dt_try_as_dir(env, dt_tobj)) {
3794                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3795
3796                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3797                                        (struct dt_key *)name);
3798                         if (unlikely(rc == 0))
3799                                 target_exist = true;
3800                 }
3801
3802                 handle = mdd_trans_create(env, mdd);
3803                 if (IS_ERR(handle))
3804                         GOTO(out_put, rc = PTR_ERR(handle));
3805
3806                 /* Note: this transaction is part of migration, and it is not
3807                  * the last step of migration, so we set th_local = 1 to avoid
3808                  * updating last rcvd for this transaction */
3809                 handle->th_local = 1;
3810                 if (likely(!target_exist)) {
3811                         rc = mdo_declare_index_insert(env, mdd_tobj,
3812                                 &ent->lde_fid,
3813                                 child->mod_obj.mo_lu.lo_header->loh_attr,
3814                                 name, handle);
3815                         if (rc != 0)
3816                                 GOTO(out_put, rc);
3817
3818                         if (is_dir) {
3819                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3820                                 if (rc != 0)
3821                                         GOTO(out_put, rc);
3822                         }
3823                 }
3824
3825                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3826                 if (rc != 0)
3827                         GOTO(out_put, rc);
3828
3829                 if (is_dir) {
3830                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3831                         if (rc != 0)
3832                                 GOTO(out_put, rc);
3833
3834                         /* Update .. for child */
3835                         rc = mdo_declare_index_delete(env, child, dotdot,
3836                                                       handle);
3837                         if (rc != 0)
3838                                 GOTO(out_put, rc);
3839
3840                         rc = mdo_declare_index_insert(env, child,
3841                                                       mdd_object_fid(mdd_tobj),
3842                                                       S_IFDIR, dotdot, handle);
3843                         if (rc != 0)
3844                                 GOTO(out_put, rc);
3845                 }
3846
3847                 rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
3848                                                      child, name,
3849                                                      strlen(name),
3850                                                      handle);
3851                 if (rc != 0)
3852                         GOTO(out_put, rc);
3853
3854                 rc = mdd_trans_start(env, mdd, handle);
3855                 if (rc != 0) {
3856                         CERROR("%s: transaction start failed: rc = %d\n",
3857                                mdd2obd_dev(mdd)->obd_name, rc);
3858                         GOTO(out_put, rc);
3859                 }
3860
3861                 if (likely(!target_exist)) {
3862                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3863                                 child->mod_obj.mo_lu.lo_header->loh_attr, name,
3864                                 handle);
3865                         if (rc != 0)
3866                                 GOTO(out_put, rc);
3867                 }
3868
3869                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3870                 if (rc != 0)
3871                         GOTO(out_put, rc);
3872
3873                 if (is_dir) {
3874                         rc = __mdd_index_delete_only(env, child, dotdot,
3875                                                      handle);
3876                         if (rc != 0)
3877                                 GOTO(out_put, rc);
3878
3879                         rc = __mdd_index_insert_only(env, child,
3880                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3881                                          dotdot, handle);
3882                         if (rc != 0)
3883                                 GOTO(out_put, rc);
3884                 }
3885
3886                 rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
3887                                              child, name,
3888                                              strlen(name), handle);
3889
3890 out_put:
3891                 mdd_write_unlock(env, child);
3892                 mdd_object_put(env, child);
3893                 rc = mdd_trans_stop(env, mdd, rc, handle);
3894                 if (rc != 0)
3895                         GOTO(out, rc);
3896 next:
3897                 result = iops->next(env, it);
3898                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3899                         GOTO(out, rc = -EINTR);
3900
3901                 if (result == -ESTALE)
3902                         goto next;
3903         } while (result == 0);
3904 out:
3905         iops->put(env, it);
3906         iops->fini(env, it);
3907 out_ent:
3908         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3909         RETURN(rc);
3910 }
3911
3912 static int mdd_declare_update_linkea(const struct lu_env *env,
3913                                      struct mdd_object *mdd_pobj,
3914                                      struct mdd_object *mdd_sobj,
3915                                      struct mdd_object *mdd_tobj,
3916                                      const struct lu_name *child_name,
3917                                      struct linkea_data *ldata,
3918                                      struct thandle *handle)
3919 {
3920         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3921                                           child_name, ldata, handle, 1);
3922 }
3923
3924 static int mdd_update_linkea(const struct lu_env *env,
3925                              struct mdd_object *mdd_pobj,
3926                              struct mdd_object *mdd_sobj,
3927                              struct mdd_object *mdd_tobj,
3928                              const struct lu_name *child_name,
3929                              struct linkea_data *ldata,
3930                              struct thandle *handle)
3931 {
3932         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3933                                           child_name, ldata, handle, 0);
3934 }
3935
3936 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3937                                            struct mdd_object *mdd_pobj,
3938                                            struct mdd_object *mdd_sobj,
3939                                            struct mdd_object *mdd_tobj,
3940                                            const struct lu_name *lname,
3941                                            struct lu_attr *la,
3942                                            struct lu_attr *parent_la,
3943                                            struct linkea_data *ldata,
3944                                            struct thandle *handle)
3945 {
3946         struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3947         struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
3948         int rc;
3949
3950         /* Revert IMMUTABLE flag */
3951         la_flag->la_valid = LA_FLAGS;
3952         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3953         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3954         if (rc != 0)
3955                 return rc;
3956
3957         /* delete entry from source dir */
3958         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3959         if (rc != 0)
3960                 return rc;
3961
3962         if (ldata->ld_buf != NULL) {
3963                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3964                                                mdd_tobj, lname, ldata, handle);
3965                 if (rc != 0)
3966                         return rc;
3967         }
3968
3969         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3970                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3971                                            handle);
3972                 if (rc != 0)
3973                         return rc;
3974
3975                 handle->th_complex = 1;
3976                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
3977                                            XATTR_NAME_FID,
3978                                            LU_XATTR_REPLACE, handle);
3979                 if (rc < 0)
3980                         return rc;
3981         }
3982
3983         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3984                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3985                 if (rc != 0)
3986                         return rc;
3987         }
3988
3989         /* new name */
3990         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3991                                       mdd_object_type(mdd_tobj),
3992                                       lname->ln_name, handle);
3993         if (rc != 0)
3994                 return rc;
3995
3996         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
3997         if (rc != 0)
3998                 return rc;
3999
4000         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4001                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
4002                 if (rc != 0)
4003                         return rc;
4004         }
4005
4006         /* delete old object */
4007         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
4008         if (rc != 0)
4009                 return rc;
4010
4011         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4012                 /* delete old object */
4013                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
4014                 if (rc != 0)
4015                         return rc;
4016                 /* set nlink to 0 */
4017                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
4018                 if (rc != 0)
4019                         return rc;
4020         }
4021
4022         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
4023         if (rc)
4024                 return rc;
4025
4026         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
4027         if (rc != 0)
4028                 return rc;
4029
4030         rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
4031                                          handle);
4032
4033         return rc;
4034 }
4035
4036 static int mdd_migrate_update_name(const struct lu_env *env,
4037                                    struct mdd_object *mdd_pobj,
4038                                    struct mdd_object *mdd_sobj,
4039                                    struct mdd_object *mdd_tobj,
4040                                    const struct lu_name *lname,
4041                                    struct md_attr *ma)
4042 {
4043         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
4044         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4045         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
4046         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
4047         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
4048         struct thandle          *handle;
4049         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
4050         const char              *name = lname->ln_name;
4051         int                     rc;
4052         ENTRY;
4053
4054         /* update time for parent */
4055         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
4056         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
4057         p_la->la_valid = LA_CTIME;
4058
4059         rc = mdd_la_get(env, mdd_sobj, so_attr);
4060         if (rc != 0)
4061                 RETURN(rc);
4062
4063         ldata->ld_buf = NULL;
4064         rc = mdd_links_read(env, mdd_sobj, ldata);
4065         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
4066                 RETURN(rc);
4067
4068         handle = mdd_trans_create(env, mdd);
4069         if (IS_ERR(handle))
4070                 RETURN(PTR_ERR(handle));
4071
4072         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
4073                                              lname, so_attr, p_la, ldata,
4074                                              handle);
4075         if (rc != 0) {
4076                 /* If the migration can not be fit in one transaction, just
4077                  * leave it in the original MDT */
4078                 if (rc == -E2BIG)
4079                         GOTO(stop_trans, rc = 0);
4080                 else
4081                         GOTO(stop_trans, rc);
4082         }
4083
4084         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
4085                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
4086                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4087                PFID(mdd_object_fid(mdd_tobj)));
4088
4089         rc = mdd_trans_start(env, mdd, handle);
4090         if (rc != 0)
4091                 GOTO(stop_trans, rc);
4092
4093         /* Revert IMMUTABLE flag */
4094         la_flag->la_valid = LA_FLAGS;
4095         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
4096         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
4097         if (rc != 0)
4098                 GOTO(stop_trans, rc);
4099
4100         /* Remove source name from source directory */
4101         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
4102         if (rc != 0)
4103                 GOTO(stop_trans, rc);
4104
4105         if (ldata->ld_buf != NULL) {
4106                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
4107                                        lname, ldata, handle);
4108                 if (rc != 0)
4109                         GOTO(stop_trans, rc);
4110
4111                 /*  linkea update might decrease the source object
4112                  *  nlink, let's get the attr again after ref_del */
4113                 rc = mdd_la_get(env, mdd_sobj, so_attr);
4114                 if (rc != 0)
4115                         GOTO(stop_trans, rc);
4116         }
4117
4118         if (S_ISREG(so_attr->la_mode)) {
4119                 if (so_attr->la_nlink == 1) {
4120                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
4121                                            handle);
4122                         if (rc != 0 && rc != -ENODATA)
4123                                 GOTO(stop_trans, rc);
4124
4125                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
4126                                            XATTR_NAME_FID,
4127                                            LU_XATTR_REPLACE, handle);
4128                         if (rc < 0)
4129                                 GOTO(stop_trans, rc);
4130                 }
4131         }
4132
4133         /* Insert new fid with target name into target dir */
4134         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
4135                                 mdd_object_type(mdd_tobj), name, handle);
4136         if (rc != 0)
4137                 GOTO(stop_trans, rc);
4138
4139         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
4140
4141         mdd_sobj->mod_flags |= DEAD_OBJ;
4142         rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
4143         if (rc != 0)
4144                 GOTO(out_unlock, rc);
4145
4146         rc = mdd_orphan_insert(env, mdd_sobj, handle);
4147         if (rc != 0)
4148                 GOTO(out_unlock, rc);
4149
4150         mdo_ref_del(env, mdd_sobj, handle);
4151         if (is_dir)
4152                 mdo_ref_del(env, mdd_sobj, handle);
4153
4154         /* Get the attr again after ref_del */
4155         rc = mdd_la_get(env, mdd_sobj, so_attr);
4156         if (rc != 0)
4157                 GOTO(out_unlock, rc);
4158
4159         ma->ma_attr = *so_attr;
4160         ma->ma_valid |= MA_INODE;
4161
4162         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
4163         if (rc != 0)
4164                 GOTO(out_unlock, rc);
4165
4166         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
4167                                mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
4168                                mdo2fid(mdd_pobj), lname, lname, handle);
4169         if (rc != 0) {
4170                 CWARN("%s: changelog for migrate %s "DFID
4171                       "under "DFID" failed: rc = %d\n",
4172                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
4173                       PFID(mdd_object_fid(mdd_sobj)),
4174                       PFID(mdd_object_fid(mdd_pobj)), rc);
4175                 /* Sigh, there are no easy way to migrate back the object, so
4176                  * let's reset the result to 0 for now XXX */
4177                 rc = 0;
4178         }
4179 out_unlock:
4180         mdd_write_unlock(env, mdd_sobj);
4181
4182 stop_trans:
4183         rc = mdd_trans_stop(env, mdd, rc, handle);
4184
4185         RETURN(rc);
4186 }
4187
4188 static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
4189                           const struct lu_fid *fid, __u32 *mdt_index)
4190 {
4191         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
4192         struct seq_server_site *ss;
4193         int rc;
4194
4195         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4196
4197         range->lsr_flags = LU_SEQ_RANGE_MDT;
4198         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4199         if (rc != 0)
4200                 return rc;
4201
4202         *mdt_index = range->lsr_index;
4203
4204         return 0;
4205 }
4206 /**
4207  * Check whether we should migrate the file/dir
4208  * return val
4209  *      < 0  permission check failed or other error.
4210  *      = 0  the file can be migrated.
4211  *      > 0  the file does not need to be migrated, mostly
4212  *           for multiple link file
4213  **/
4214 static int mdd_migrate_sanity_check(const struct lu_env *env,
4215                                     struct mdd_object *pobj,
4216                                     const struct lu_attr *pattr,
4217                                     struct mdd_object *sobj,
4218                                     struct lu_attr *sattr)
4219 {
4220         struct mdd_thread_info  *info = mdd_env_info(env);
4221         struct linkea_data      *ldata = &info->mti_link_data;
4222         struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
4223         int                     mgr_easize;
4224         struct lu_buf           *mgr_buf;
4225         int                     count;
4226         int                     rc;
4227         __u64 mdt_index;
4228         ENTRY;
4229
4230         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4231         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4232         if (mgr_buf->lb_buf == NULL)
4233                 RETURN(-ENOMEM);
4234
4235         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4236         if (rc > 0) {
4237                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4238
4239                 /* If the object has migrateEA, it means IMMUTE flag
4240                  * is being set by previous migration process, so it
4241                  * needs to override the IMMUTE flag, otherwise the
4242                  * following sanity check will fail */
4243                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4244                                                 LMV_HASH_FLAG_MIGRATION) {
4245                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4246
4247                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4248                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4249                                mdd2obd_dev(mdd)->obd_name,
4250                                PFID(mdd_object_fid(sobj)));
4251                 }
4252         }
4253
4254         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4255                                      sobj, sattr, NULL, NULL);
4256         if (rc != 0)
4257                 RETURN(rc);
4258
4259         /* Then it will check if the file should be migrated. If the file
4260          * has mulitple links, we only need migrate the file if all of its
4261          * entries has been migrated to the remote MDT */
4262         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4263                 RETURN(0);
4264
4265         rc = mdd_links_read(env, sobj, ldata);
4266         if (rc != 0) {
4267                 /* For multiple links files, if there are no linkEA data at all,
4268                  * means the file might be created before linkEA is enabled, and
4269                  * all of its links should not be migrated yet, otherwise it
4270                  * should have some linkEA there */
4271                 if (rc == -ENOENT || rc == -ENODATA)
4272                         RETURN(1);
4273                 RETURN(rc);
4274         }
4275
4276         mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
4277         /* If there are still links locally, then the file will not be
4278          * migrated. */
4279         LASSERT(ldata->ld_leh != NULL);
4280
4281         /* If the linkEA is overflow, then means there are some unknown name
4282          * entries under unknown parents, that will prevent the migration. */
4283         if (unlikely(ldata->ld_leh->leh_overflow_time))
4284                 RETURN(1);
4285
4286         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4287         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4288                 struct lu_name          lname;
4289                 struct lu_fid           fid;
4290                 __u32                   parent_mdt_index;
4291
4292                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4293                                     &lname, &fid);
4294                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4295                                                          ldata->ld_reclen);
4296
4297                 rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
4298                 if (rc != 0)
4299                         RETURN(rc);
4300
4301                 /* Migrate the object only if none of its parents are on the
4302                  * current MDT. */
4303                 if (parent_mdt_index != mdt_index)
4304                         continue;
4305
4306                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4307                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4308                        lname.ln_name, PFID(&fid));
4309                 rc = 1;
4310                 break;
4311         }
4312
4313         RETURN(rc);
4314 }
4315
4316 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4317                        struct md_object *sobj, const struct lu_name *lname,
4318                        struct md_object *tobj, struct md_attr *ma)
4319 {
4320         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4321         struct mdd_device       *mdd = mdo2mdd(pobj);
4322         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4323         struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
4324         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4325         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4326         bool                    created = false;
4327         int                     rc;
4328
4329         ENTRY;
4330         /* If the file will being migrated, it will check whether
4331          * the file is being opened by someone else right now */
4332         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4333         if (mdd_sobj->mod_count > 0) {
4334                 CDEBUG(D_OTHER,
4335                        "%s: "DFID"%s is already opened count %d: rc = %d\n",
4336                        mdd2obd_dev(mdd)->obd_name,
4337                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4338                        mdd_sobj->mod_count, -EBUSY);
4339                 mdd_read_unlock(env, mdd_sobj);
4340                 GOTO(put, rc = -EBUSY);
4341         }
4342         mdd_read_unlock(env, mdd_sobj);
4343
4344         rc = mdd_la_get(env, mdd_sobj, so_attr);
4345         if (rc != 0)
4346                 GOTO(put, rc);
4347
4348         rc = mdd_la_get(env, mdd_pobj, pattr);
4349         if (rc != 0)
4350                 GOTO(put, rc);
4351
4352         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4353         if (rc != 0) {
4354                 if (rc > 0)
4355                         rc = 0;
4356                 GOTO(put, rc);
4357         }
4358
4359         /* Sigh, it is impossible to finish all of migration in a single
4360          * transaction, for example migrating big directory entries to the
4361          * new MDT, it needs insert all of name entries of children in the
4362          * new directory.
4363          *
4364          * So migration will be done in multiple steps and transactions.
4365          *
4366          * 1. create an orphan object on the remote MDT in one transaction.
4367          * 2. migrate extend attributes to the new target file/directory.
4368          * 3. For directory, migrate the entries to the new MDT and update
4369          * linkEA of each children. Because we can not migrate all entries
4370          * in a single transaction, so the migrating directory will become
4371          * a striped directory during migration, so once the process is
4372          * interrupted, the directory is still accessible. (During lookup,
4373          * client will locate the name by searching both original and target
4374          * object).
4375          * 4. Finally, update the name/FID to point to the new file/directory
4376          * in a separate transaction.
4377          */
4378
4379         /* step 1: Check whether the orphan object has been created, and create
4380          * orphan object on the remote MDT if needed */
4381         if (!mdd_object_exists(mdd_tobj)) {
4382                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4383                                         lname, so_attr);
4384                 if (rc != 0)
4385                         GOTO(put, rc);
4386                 created = true;
4387         }
4388
4389         LASSERT(mdd_object_exists(mdd_tobj));
4390         /* step 2: migrate xattr */
4391         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4392         if (rc != 0)
4393                 GOTO(put, rc);
4394
4395         /* step 3: migrate name entries to the orphan object */
4396         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4397                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4398                 if (rc != 0)
4399                         GOTO(put, rc);
4400                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4401                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4402                         GOTO(put, rc = 0);
4403         } else {
4404                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4405         }
4406
4407         LASSERT(mdd_object_exists(mdd_tobj));
4408         /* step 4: update name entry to the new object */
4409         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4410                                      ma);
4411         if (rc != 0)
4412                 GOTO(put, rc);
4413
4414         /* newly created target was not locked, don't cache its attributes */
4415         if (created)
4416                 mdd_invalidate(env, tobj);
4417 put:
4418         RETURN(rc);
4419 }
4420
4421 const struct md_dir_operations mdd_dir_ops = {
4422         .mdo_is_subdir     = mdd_is_subdir,
4423         .mdo_lookup        = mdd_lookup,
4424         .mdo_create        = mdd_create,
4425         .mdo_rename        = mdd_rename,
4426         .mdo_link          = mdd_link,
4427         .mdo_unlink        = mdd_unlink,
4428         .mdo_create_data   = mdd_create_data,
4429         .mdo_migrate       = mdd_migrate,
4430 };