Whamcloud - gitweb
LU-10454 mdd: check return value of lu_ucred()
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_dir.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <linux/kthread.h>
42
43 #include <obd_class.h>
44 #include <obd_support.h>
45 #include <lustre_mds.h>
46 #include <lustre_fid.h>
47
48 #include "mdd_internal.h"
49
50 static const char dot[] = ".";
51 static const char dotdot[] = "..";
52
53 static struct lu_name lname_dotdot = {
54         .ln_name        = (char *) dotdot,
55         .ln_namelen     = sizeof(dotdot) - 1,
56 };
57
58 static inline int
59 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
60 {
61         if (!lu_name_is_valid(ln))
62                 return -EINVAL;
63         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
64                 return -ENAMETOOLONG;
65         else
66                 return 0;
67 }
68
69 /* Get FID from name and parent */
70 static int
71 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72              const struct lu_attr *pattr, const struct lu_name *lname,
73              struct lu_fid* fid, int mask)
74 {
75         const char *name                = lname->ln_name;
76         const struct dt_key *key        = (const struct dt_key *)name;
77         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
78         struct mdd_device *m            = mdo2mdd(pobj);
79         struct dt_object *dir           = mdd_object_child(mdd_obj);
80         int rc;
81         ENTRY;
82
83         if (unlikely(mdd_is_dead_obj(mdd_obj)))
84                 RETURN(-ESTALE);
85
86         if (!mdd_object_exists(mdd_obj))
87                 RETURN(-ESTALE);
88
89         if (mdd_object_remote(mdd_obj)) {
90                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
91                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
92         }
93
94         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
95                                             MOR_TGT_PARENT);
96         if (rc)
97                 RETURN(rc);
98
99         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
100                    dt_try_as_dir(env, dir)))
101                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
102         else
103                 rc = -ENOTDIR;
104
105         RETURN(rc);
106 }
107
108 int mdd_lookup(const struct lu_env *env,
109                struct md_object *pobj, const struct lu_name *lname,
110                struct lu_fid *fid, struct md_op_spec *spec)
111 {
112         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
113         int rc;
114         ENTRY;
115
116         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
117         if (rc != 0)
118                 RETURN(rc);
119
120         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
121                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
122         RETURN(rc);
123 }
124
125 /** Read the link EA into a temp buffer.
126  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
127  * A pointer to the buffer is stored in \a ldata::ld_buf.
128  *
129  * \retval 0 or error
130  */
131 static int __mdd_links_read(const struct lu_env *env,
132                             struct mdd_object *mdd_obj,
133                             struct linkea_data *ldata)
134 {
135         int rc;
136
137         if (!mdd_object_exists(mdd_obj))
138                 return -ENODATA;
139
140         /* First try a small buf */
141         LASSERT(env != NULL);
142         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
143                                                PAGE_SIZE);
144         if (ldata->ld_buf->lb_buf == NULL)
145                 return -ENOMEM;
146
147         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
148         if (rc == -ERANGE) {
149                 /* Buf was too small, figure out what we need. */
150                 lu_buf_free(ldata->ld_buf);
151                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
152                                    XATTR_NAME_LINK);
153                 if (rc < 0)
154                         return rc;
155                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
156                 if (ldata->ld_buf->lb_buf == NULL)
157                         return -ENOMEM;
158                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
159                                   XATTR_NAME_LINK);
160         }
161         if (rc < 0) {
162                 lu_buf_free(ldata->ld_buf);
163                 ldata->ld_buf = NULL;
164                 return rc;
165         }
166
167         return linkea_init(ldata);
168 }
169
170 static int mdd_links_read(const struct lu_env *env,
171                           struct mdd_object *mdd_obj,
172                           struct linkea_data *ldata)
173 {
174         int rc;
175
176         rc = __mdd_links_read(env, mdd_obj, ldata);
177         if (!rc)
178                 rc = linkea_init(ldata);
179
180         return rc;
181 }
182
183 static int mdd_links_read_with_rec(const struct lu_env *env,
184                                    struct mdd_object *mdd_obj,
185                                    struct linkea_data *ldata)
186 {
187         int rc;
188
189         rc = __mdd_links_read(env, mdd_obj, ldata);
190         if (!rc)
191                 rc = linkea_init_with_rec(ldata);
192
193         return rc;
194 }
195
196 /**
197  * Get parent FID of the directory
198  *
199  * Read parent FID from linkEA, if that fails, then do lookup
200  * dotdot to get the parent FID.
201  *
202  * \param[in] env       execution environment
203  * \param[in] obj       object from which to find the parent FID
204  * \param[in] attr      attribute of the object
205  * \param[out] fid      fid to get the parent FID
206  *
207  * \retval              0 if getting the parent FID succeeds.
208  * \retval              negative errno if getting the parent FID fails.
209  **/
210 static inline int mdd_parent_fid(const struct lu_env *env,
211                                  struct mdd_object *obj,
212                                  const struct lu_attr *attr,
213                                  struct lu_fid *fid)
214 {
215         struct mdd_thread_info  *info = mdd_env_info(env);
216         struct linkea_data      ldata = { NULL };
217         struct lu_buf           *buf = &info->mti_link_buf;
218         struct lu_name          lname;
219         int                     rc = 0;
220
221         ENTRY;
222
223         LASSERT(S_ISDIR(mdd_object_type(obj)));
224
225         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
226         if (buf->lb_buf == NULL)
227                 GOTO(lookup, rc = 0);
228
229         ldata.ld_buf = buf;
230         rc = mdd_links_read_with_rec(env, obj, &ldata);
231         if (rc != 0)
232                 GOTO(lookup, rc);
233
234         LASSERT(ldata.ld_leh != NULL);
235         /* Directory should only have 1 parent */
236         if (ldata.ld_leh->leh_reccount > 1)
237                 GOTO(lookup, rc);
238
239         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
240
241         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
242         if (likely(fid_is_sane(fid)))
243                 RETURN(0);
244 lookup:
245         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
246         RETURN(rc);
247 }
248
249 /*
250  * For root fid use special function, which does not compare version component
251  * of fid. Version component is different for root fids on all MDTs.
252  */
253 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
254 {
255         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
256                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
257 }
258
259 /*
260  * return 1: if lf is the fid of the ancestor of p1;
261  * return 0: if not;
262  *
263  * return -EREMOTE: if remote object is found, in this
264  * case fid of remote object is saved to @pf;
265  *
266  * otherwise: values < 0, errors.
267  */
268 static int mdd_is_parent(const struct lu_env *env,
269                         struct mdd_device *mdd,
270                         struct mdd_object *p1,
271                         const struct lu_attr *attr,
272                         const struct lu_fid *lf,
273                         struct lu_fid *pf)
274 {
275         struct mdd_object *parent = NULL;
276         struct lu_fid *pfid;
277         int rc;
278         ENTRY;
279
280         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
281         pfid = &mdd_env_info(env)->mti_fid;
282
283         /* Check for root first. */
284         if (mdd_is_root(mdd, mdo2fid(p1)))
285                 RETURN(0);
286
287         for(;;) {
288                 /* this is done recursively */
289                 rc = mdd_parent_fid(env, p1, attr, pfid);
290                 if (rc)
291                         GOTO(out, rc);
292                 if (mdd_is_root(mdd, pfid))
293                         GOTO(out, rc = 0);
294                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
295                         GOTO(out, rc = 0);
296                 if (lu_fid_eq(pfid, lf))
297                         GOTO(out, rc = 1);
298                 if (parent != NULL)
299                         mdd_object_put(env, parent);
300
301                 parent = mdd_object_find(env, mdd, pfid);
302                 if (IS_ERR(parent))
303                         GOTO(out, rc = PTR_ERR(parent));
304
305                 if (!mdd_object_exists(parent))
306                         GOTO(out, rc = -EINVAL);
307
308                 p1 = parent;
309         }
310         EXIT;
311 out:
312         if (parent && !IS_ERR(parent))
313                 mdd_object_put(env, parent);
314         return rc;
315 }
316
317 /*
318  * No permission check is needed.
319  *
320  * returns 1: if fid is ancestor of @mo;
321  * returns 0: if fid is not an ancestor of @mo;
322  *
323  * returns EREMOTE if remote object is found, fid of remote object is saved to
324  * @fid;
325  *
326  * returns < 0: if error
327  */
328 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
329                   const struct lu_fid *fid, struct lu_fid *sfid)
330 {
331         struct mdd_device *mdd = mdo2mdd(mo);
332         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
333         int rc;
334         ENTRY;
335
336         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
337                 RETURN(0);
338
339         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
340         if (rc != 0)
341                 RETURN(rc);
342
343         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
344         if (rc == 0) {
345                 /* found root */
346                 fid_zero(sfid);
347         } else if (rc == 1) {
348                 /* found @fid is parent */
349                 *sfid = *fid;
350                 rc = 0;
351         }
352         RETURN(rc);
353 }
354
355 /*
356  * Check that @dir contains no entries except (possibly) dot and dotdot.
357  *
358  * Returns:
359  *
360  *             0        empty
361  *      -ENOTDIR        not a directory object
362  *    -ENOTEMPTY        not empty
363  *           -ve        other error
364  *
365  */
366 static int mdd_dir_is_empty(const struct lu_env *env,
367                             struct mdd_object *dir)
368 {
369         struct dt_it     *it;
370         struct dt_object *obj;
371         const struct dt_it_ops *iops;
372         int result;
373         ENTRY;
374
375         obj = mdd_object_child(dir);
376         if (!dt_try_as_dir(env, obj))
377                 RETURN(-ENOTDIR);
378
379         iops = &obj->do_index_ops->dio_it;
380         it = iops->init(env, obj, LUDA_64BITHASH);
381         if (!IS_ERR(it)) {
382                 result = iops->get(env, it, (const struct dt_key *)"");
383                 if (result > 0) {
384                         int i;
385                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
386                                 result = iops->next(env, it);
387                         if (result == 0)
388                                 result = -ENOTEMPTY;
389                         else if (result == 1)
390                                 result = 0;
391                 } else if (result == 0)
392                         /*
393                          * Huh? Index contains no zero key?
394                          */
395                         result = -EIO;
396
397                 iops->put(env, it);
398                 iops->fini(env, it);
399         } else
400                 result = PTR_ERR(it);
401         RETURN(result);
402 }
403
404 /**
405  * Determine if the target object can be hard linked, and right now it only
406  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
407  * directory nlink count might exceed the maximum link count(see
408  * osd_object_ref_add), so it only check nlink for non-directories.
409  *
410  * \param[in] env       thread environment
411  * \param[in] obj       object being linked to
412  * \param[in] la        attributes of \a obj
413  *
414  * \retval              0 if \a obj can be hard linked
415  * \retval              negative error if \a obj is a directory or has too
416  *                      many links
417  */
418 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
419                           const struct lu_attr *la)
420 {
421         struct mdd_device *m = mdd_obj2mdd_dev(obj);
422         ENTRY;
423
424         LASSERT(la != NULL);
425
426         /* Subdir count limitation can be broken through
427          * (see osd_object_ref_add), so only check non-directory here. */
428         if (!S_ISDIR(la->la_mode) &&
429             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
430                 RETURN(-EMLINK);
431
432         RETURN(0);
433 }
434
435 /**
436  * Check whether it may create the cobj under the pobj.
437  *
438  * \param[in] env       execution environment
439  * \param[in] pobj      the parent directory
440  * \param[in] pattr     the attribute of the parent directory
441  * \param[in] cobj      the child to be created
442  * \param[in] check_perm        if check WRITE|EXEC permission for parent
443  *
444  * \retval              = 0 create the child under this dir is allowed
445  * \retval              negative errno create the child under this dir is
446  *                      not allowed
447  */
448 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
449                    const struct lu_attr *pattr, struct mdd_object *cobj,
450                    bool check_perm)
451 {
452         int rc = 0;
453         ENTRY;
454
455         if (cobj && mdd_object_exists(cobj))
456                 RETURN(-EEXIST);
457
458         if (mdd_is_dead_obj(pobj))
459                 RETURN(-ENOENT);
460
461         if (check_perm)
462                 rc = mdd_permission_internal_locked(env, pobj, pattr,
463                                                     MAY_WRITE | MAY_EXEC,
464                                                     MOR_TGT_PARENT);
465         RETURN(rc);
466 }
467
468 /*
469  * Check whether can unlink from the pobj in the case of "cobj == NULL".
470  */
471 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
472                    const struct lu_attr *pattr, const struct lu_attr *attr)
473 {
474         int rc;
475         ENTRY;
476
477         if (mdd_is_dead_obj(pobj))
478                 RETURN(-ENOENT);
479
480         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
481                 RETURN(-EPERM);
482
483         rc = mdd_permission_internal_locked(env, pobj, pattr,
484                                             MAY_WRITE | MAY_EXEC,
485                                             MOR_TGT_PARENT);
486         if (rc != 0)
487                 RETURN(rc);
488
489         if (pattr->la_flags & LUSTRE_APPEND_FL)
490                 RETURN(-EPERM);
491
492         RETURN(rc);
493 }
494
495 /*
496  * pobj == NULL is remote ops case, under such case, pobj's
497  * VTX feature has been checked already, no need check again.
498  */
499 static inline int mdd_is_sticky(const struct lu_env *env,
500                                 struct mdd_object *pobj,
501                                 const struct lu_attr *pattr,
502                                 struct mdd_object *cobj,
503                                 const struct lu_attr *cattr)
504 {
505         struct lu_ucred *uc = lu_ucred_assert(env);
506
507         if (pobj != NULL) {
508                 LASSERT(pattr != NULL);
509                 if (!(pattr->la_mode & S_ISVTX) ||
510                     (pattr->la_uid == uc->uc_fsuid))
511                         return 0;
512         }
513
514         LASSERT(cattr != NULL);
515         if (cattr->la_uid == uc->uc_fsuid)
516                 return 0;
517
518         return !md_capable(uc, CFS_CAP_FOWNER);
519 }
520
521 static int mdd_may_delete_entry(const struct lu_env *env,
522                                 struct mdd_object *pobj,
523                                 const struct lu_attr *pattr,
524                                 int check_perm)
525 {
526         ENTRY;
527
528         LASSERT(pobj != NULL);
529         if (!mdd_object_exists(pobj))
530                 RETURN(-ENOENT);
531
532         if (mdd_is_dead_obj(pobj))
533                 RETURN(-ENOENT);
534
535         if (check_perm) {
536                 int rc;
537                 rc = mdd_permission_internal_locked(env, pobj, pattr,
538                                             MAY_WRITE | MAY_EXEC,
539                                             MOR_TGT_PARENT);
540                 if (rc)
541                         RETURN(rc);
542         }
543
544         if (pattr->la_flags & LUSTRE_APPEND_FL)
545                 RETURN(-EPERM);
546
547         RETURN(0);
548 }
549
550 /*
551  * Check whether it may delete the cobj from the pobj.
552  * pobj maybe NULL
553  */
554 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
555                    const struct lu_attr *tpattr, struct mdd_object *tobj,
556                    const struct lu_attr *tattr, const struct lu_attr *cattr,
557                    int check_perm, int check_empty)
558 {
559         int rc = 0;
560         ENTRY;
561
562         if (tpobj) {
563                 LASSERT(tpattr != NULL);
564                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
565                 if (rc != 0)
566                         RETURN(rc);
567         }
568
569         if (tobj == NULL)
570                 RETURN(0);
571
572         if (!mdd_object_exists(tobj))
573                 RETURN(-ENOENT);
574
575         if (mdd_is_dead_obj(tobj))
576                 RETURN(-ESTALE);
577
578         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
579                 RETURN(-EPERM);
580
581         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
582                 RETURN(-EPERM);
583
584         /* additional check the rename case */
585         if (cattr) {
586                 if (S_ISDIR(cattr->la_mode)) {
587                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
588
589                         if (!S_ISDIR(tattr->la_mode))
590                                 RETURN(-ENOTDIR);
591
592                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
593                                 RETURN(-EBUSY);
594                 } else if (S_ISDIR(tattr->la_mode))
595                         RETURN(-EISDIR);
596         }
597
598         if (S_ISDIR(tattr->la_mode) && check_empty)
599                 rc = mdd_dir_is_empty(env, tobj);
600
601         RETURN(rc);
602 }
603
604 /**
605  * Check whether it can create the link file(linked to @src_obj) under
606  * the target directory(@tgt_obj), and src_obj has been locked by
607  * mdd_write_lock.
608  *
609  * \param[in] env       execution environment
610  * \param[in] tgt_obj   the target directory
611  * \param[in] tattr     attributes of target directory
612  * \param[in] lname     the link name
613  * \param[in] src_obj   source object for link
614  * \param[in] cattr     attributes for source object
615  *
616  * \retval              = 0 it is allowed to create the link file under tgt_obj
617  * \retval              negative error not allowed to create the link file
618  */
619 static int mdd_link_sanity_check(const struct lu_env *env,
620                                  struct mdd_object *tgt_obj,
621                                  const struct lu_attr *tattr,
622                                  const struct lu_name *lname,
623                                  struct mdd_object *src_obj,
624                                  const struct lu_attr *cattr)
625 {
626         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
627         int rc = 0;
628         ENTRY;
629
630         if (!mdd_object_exists(src_obj))
631                 RETURN(-ENOENT);
632
633         if (mdd_is_dead_obj(src_obj))
634                 RETURN(-ESTALE);
635
636         /* Local ops, no lookup before link, check filename length here. */
637         rc = mdd_name_check(m, lname);
638         if (rc < 0)
639                 RETURN(rc);
640
641         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
642                 RETURN(-EPERM);
643
644         if (S_ISDIR(mdd_object_type(src_obj)))
645                 RETURN(-EPERM);
646
647         LASSERT(src_obj != tgt_obj);
648         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
649         if (rc != 0)
650                 RETURN(rc);
651
652         rc = __mdd_may_link(env, src_obj, cattr);
653
654         RETURN(rc);
655 }
656
657 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
658                                    const char *name, struct thandle *handle)
659 {
660         struct dt_object *next = mdd_object_child(pobj);
661         int rc;
662         ENTRY;
663
664         if (dt_try_as_dir(env, next))
665                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
666         else
667                 rc = -ENOTDIR;
668
669         RETURN(rc);
670 }
671
672 static int __mdd_index_insert_only(const struct lu_env *env,
673                                    struct mdd_object *pobj,
674                                    const struct lu_fid *lf, __u32 type,
675                                    const char *name,
676                                    struct thandle *handle)
677 {
678         struct dt_object *next = mdd_object_child(pobj);
679         int               rc;
680         ENTRY;
681
682         if (dt_try_as_dir(env, next)) {
683                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
684                 struct lu_ucred         *uc  = lu_ucred_check(env);
685                 int                      ignore_quota;
686
687                 rec->rec_fid = lf;
688                 rec->rec_type = type;
689                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
690                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
691                                (const struct dt_key *)name, handle,
692                                ignore_quota);
693         } else {
694                 rc = -ENOTDIR;
695         }
696         RETURN(rc);
697 }
698
699 /* insert named index, add reference if isdir */
700 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
701                               const struct lu_fid *lf, __u32 type,
702                               const char *name, struct thandle *handle)
703 {
704         int rc;
705         ENTRY;
706
707         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
708         if (rc == 0 && S_ISDIR(type)) {
709                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
710                 mdo_ref_add(env, pobj, handle);
711                 mdd_write_unlock(env, pobj);
712         }
713
714         RETURN(rc);
715 }
716
717 /* delete named index, drop reference if isdir */
718 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
719                               const char *name, int is_dir,
720                               struct thandle *handle)
721 {
722         int               rc;
723         ENTRY;
724
725         rc = __mdd_index_delete_only(env, pobj, name, handle);
726         if (rc == 0 && is_dir) {
727                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
728                 mdo_ref_del(env, pobj, handle);
729                 mdd_write_unlock(env, pobj);
730         }
731
732         RETURN(rc);
733 }
734
735 static int mdd_llog_record_calc_size(const struct lu_env *env,
736                                      const struct lu_name *tname,
737                                      const struct lu_name *sname)
738 {
739         const struct lu_ucred   *uc = lu_ucred(env);
740         enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
741         enum changelog_rec_extra_flags crfe = CLFE_UIDGID;
742
743         if (sname != NULL)
744                 crf |= CLF_RENAME;
745
746         if (uc != NULL && uc->uc_jobid[0] != '\0')
747                 crf |= CLF_JOBID;
748
749         return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
750                              changelog_rec_offset(crf, crfe) +
751                              (tname != NULL ? tname->ln_namelen : 0) +
752                              (sname != NULL ? 1 + sname->ln_namelen : 0));
753 }
754
755 int mdd_declare_changelog_store(const struct lu_env *env,
756                                 struct mdd_device *mdd,
757                                 const struct lu_name *tname,
758                                 const struct lu_name *sname,
759                                 struct thandle *handle)
760 {
761         struct obd_device               *obd = mdd2obd_dev(mdd);
762         struct llog_ctxt                *ctxt;
763         struct llog_changelog_rec       *rec;
764         struct lu_buf                   *buf;
765         struct thandle                  *llog_th;
766         int                              reclen;
767         int                              rc;
768
769         /* Not recording */
770         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
771                 return 0;
772
773         reclen = mdd_llog_record_calc_size(env, tname, sname);
774         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
775         if (buf->lb_buf == NULL)
776                 return -ENOMEM;
777
778         rec = buf->lb_buf;
779         rec->cr_hdr.lrh_len = reclen;
780         rec->cr_hdr.lrh_type = CHANGELOG_REC;
781
782         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
783         if (ctxt == NULL)
784                 return -ENXIO;
785
786         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
787         if (IS_ERR(llog_th))
788                 GOTO(out_put, rc = PTR_ERR(llog_th));
789
790         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
791
792 out_put:
793         llog_ctxt_put(ctxt);
794
795         return rc;
796 }
797
798 struct mdd_changelog_gc {
799         struct mdd_device *mcgc_mdd;
800         bool mcgc_found;
801         __u32 mcgc_maxtime;
802         __u64 mcgc_maxindexes;
803         __u32 mcgc_id;
804 };
805
806 /* return first registered ChangeLog user idle since too long
807  * use ChangeLog's user plain LLOG mtime for this */
808 static int mdd_changelog_gc_cb(const struct lu_env *env,
809                                struct llog_handle *llh,
810                                struct llog_rec_hdr *hdr, void *data)
811 {
812         struct llog_changelog_user_rec  *rec;
813         struct mdd_changelog_gc *mcgc = (struct mdd_changelog_gc *)data;
814         struct mdd_device *mdd = mcgc->mcgc_mdd;
815         ENTRY;
816
817         if ((llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) == 0)
818                 RETURN(-ENXIO);
819
820         rec = container_of(hdr, struct llog_changelog_user_rec,
821                            cur_hdr);
822
823         /* find oldest idle user, based on last record update/cancel time (new
824          * behavior), or for old user records, last record index vs current
825          * ChangeLog index. Late users with old record format will be treated
826          * first as we assume they could be idle since longer
827          */
828         if (rec->cur_time != 0) {
829                 __u32 time_now = (__u32)get_seconds();
830                 __u32 time_out = rec->cur_time +
831                                  mdd->mdd_changelog_max_idle_time;
832                 __u32 idle_time = time_now - rec->cur_time;
833
834                 /* treat oldest idle user first, and if no old format user
835                  * has been already selected
836                  */
837                 if (time_after32(time_now, time_out) &&
838                     idle_time > mcgc->mcgc_maxtime &&
839                     mcgc->mcgc_maxindexes == 0) {
840                         mcgc->mcgc_maxtime = idle_time;
841                         mcgc->mcgc_id = rec->cur_id;
842                         mcgc->mcgc_found = true;
843                 }
844         } else {
845                 /* old user record with no idle time stamp, so use empirical
846                  * method based on its current index/position
847                  */
848                 __u64 idle_indexes;
849
850                 idle_indexes = mdd->mdd_cl.mc_index - rec->cur_endrec;
851
852                 /* treat user with the oldest/smallest current index first */
853                 if (idle_indexes >= mdd->mdd_changelog_max_idle_indexes &&
854                     idle_indexes > mcgc->mcgc_maxindexes) {
855                         mcgc->mcgc_maxindexes = idle_indexes;
856                         mcgc->mcgc_id = rec->cur_id;
857                         mcgc->mcgc_found = true;
858                 }
859
860         }
861         RETURN(0);
862 }
863
864 /* recover space from long-term inactive ChangeLog users */
865 static int mdd_chlg_garbage_collect(void *data)
866 {
867         struct mdd_device *mdd = (struct mdd_device *)data;
868         struct lu_env             *env = NULL;
869         int                        rc;
870         struct llog_ctxt *ctxt;
871         struct mdd_changelog_gc mcgc = {
872                 .mcgc_mdd = mdd,
873                 .mcgc_found = false,
874                 .mcgc_maxtime = 0,
875                 .mcgc_maxindexes = 0,
876         };
877         ENTRY;
878
879         CDEBUG(D_HA, "%s: ChangeLog garbage collect thread start\n",
880                mdd2obd_dev(mdd)->obd_name);
881
882         OBD_ALLOC_PTR(env);
883         if (env == NULL)
884                 GOTO(out, rc = -ENOMEM);
885
886         rc = lu_env_init(env, LCT_MD_THREAD);
887         if (rc)
888                 GOTO(out, rc);
889
890         for (;;) {
891                 ctxt = llog_get_context(mdd2obd_dev(mdd),
892                                         LLOG_CHANGELOG_USER_ORIG_CTXT);
893                 if (ctxt == NULL ||
894                     (ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) == 0)
895                         GOTO(out_env, rc = -ENXIO);
896
897                 rc = llog_cat_process(env, ctxt->loc_handle,
898                                       mdd_changelog_gc_cb, &mcgc, 0, 0);
899                 if (rc != 0 || mcgc.mcgc_found == false)
900                         break;
901                 llog_ctxt_put(ctxt);
902
903                 CWARN("%s: Force deregister of ChangeLog user cl%d idle more "
904                       "than %us\n", mdd2obd_dev(mdd)->obd_name, mcgc.mcgc_id,
905                       mcgc.mcgc_maxtime);
906
907                 mdd_changelog_user_purge(env, mdd, mcgc.mcgc_id);
908
909                 /* try again to search for another candidate */
910                 mcgc.mcgc_found = false;
911                 mcgc.mcgc_maxtime = 0;
912                 mcgc.mcgc_maxindexes = 0;
913         }
914
915 out_env:
916         if (ctxt != NULL)
917                 llog_ctxt_put(ctxt);
918
919         lu_env_fini(env);
920         GOTO(out, rc);
921 out:
922         if (env)
923                 OBD_FREE_PTR(env);
924         mdd->mdd_cl.mc_gc_task = NULL;
925         return rc;
926 }
927
928 /** Add a changelog entry \a rec to the changelog llog
929  * \param mdd
930  * \param rec
931  * \param handle - currently ignored since llogs start their own transaction;
932  *              this will hopefully be fixed in llog rewrite
933  * \retval 0 ok
934  */
935 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
936                         struct llog_changelog_rec *rec, struct thandle *th)
937 {
938         struct obd_device       *obd = mdd2obd_dev(mdd);
939         struct llog_ctxt        *ctxt;
940         struct thandle          *llog_th;
941         int                      rc;
942         bool                     run_gc_task;
943
944         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
945                                             changelog_rec_varsize(&rec->cr));
946
947         /* llog_lvfs_write_rec sets the llog tail len */
948         rec->cr_hdr.lrh_type = CHANGELOG_REC;
949         rec->cr.cr_time = cl_time();
950
951         spin_lock(&mdd->mdd_cl.mc_lock);
952         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
953          * but as long as the MDD transactions are ordered correctly for e.g.
954          * rename conflicts, I don't think this should matter. */
955         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
956         spin_unlock(&mdd->mdd_cl.mc_lock);
957
958         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
959         if (ctxt == NULL)
960                 return -ENXIO;
961
962         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
963         if (IS_ERR(llog_th))
964                 GOTO(out_put, rc = PTR_ERR(llog_th));
965
966         /* nested journal transaction */
967         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
968
969         /* time to recover some space ?? */
970         spin_lock(&mdd->mdd_cl.mc_lock);
971         if (unlikely(mdd->mdd_changelog_gc && (ktime_get_real_seconds() -
972             mdd->mdd_cl.mc_gc_time > mdd->mdd_changelog_min_gc_interval) &&
973             mdd->mdd_cl.mc_gc_task == NULL &&
974             llog_cat_free_space(ctxt->loc_handle) <=
975                                 mdd->mdd_changelog_min_free_cat_entries)) {
976                 CWARN("%s: low on changelog_catalog free entries, starting "
977                       "ChangeLog garbage collection thread\n", obd->obd_name);
978
979                 /* indicate further kthread run will occur outside right after
980                  * critical section
981                  */
982                 mdd->mdd_cl.mc_gc_task = (struct task_struct *)(-1);
983                 run_gc_task = true;
984         }
985         spin_unlock(&mdd->mdd_cl.mc_lock);
986         if (run_gc_task) {
987                 struct task_struct *gc_task;
988
989                 gc_task = kthread_run(mdd_chlg_garbage_collect, mdd,
990                                       "chlg_gc_thread");
991                 if (IS_ERR(gc_task)) {
992                         CERROR("%s: cannot start ChangeLog garbage collection "
993                                "thread: rc = %ld\n", obd->obd_name,
994                                PTR_ERR(gc_task));
995                         mdd->mdd_cl.mc_gc_task = NULL;
996                 } else {
997                         CDEBUG(D_HA, "%s: ChangeLog garbage collection thread "
998                                "has started with Pid %d\n", obd->obd_name,
999                                gc_task->pid);
1000                         mdd->mdd_cl.mc_gc_task = gc_task;
1001                         mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
1002                 }
1003         }
1004 out_put:
1005         llog_ctxt_put(ctxt);
1006         if (rc > 0)
1007                 rc = 0;
1008         return rc;
1009 }
1010
1011 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
1012                                          const struct lu_fid *sfid,
1013                                          const struct lu_fid *spfid,
1014                                          const struct lu_name *sname)
1015 {
1016         struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
1017         size_t extsize = sname->ln_namelen + 1;
1018
1019         LASSERT(sfid != NULL);
1020         LASSERT(spfid != NULL);
1021         LASSERT(sname != NULL);
1022
1023         rnm->cr_sfid = *sfid;
1024         rnm->cr_spfid = *spfid;
1025
1026         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
1027         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
1028         rec->cr_namelen += extsize;
1029 }
1030
1031 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
1032 {
1033         struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
1034
1035         if (jobid == NULL || jobid[0] == '\0')
1036                 return;
1037
1038         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
1039 }
1040
1041 void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
1042 {
1043         struct changelog_ext_extra_flags *ef = changelog_rec_extra_flags(rec);
1044
1045         ef->cr_extra_flags = eflags;
1046 }
1047
1048 void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
1049                                     __u64 uid, __u64 gid)
1050 {
1051         struct changelog_ext_uidgid *uidgid = changelog_rec_uidgid(rec);
1052
1053         uidgid->cr_uid = uid;
1054         uidgid->cr_gid = gid;
1055 }
1056
1057 /** Store a namespace change changelog record
1058  * If this fails, we must fail the whole transaction; we don't
1059  * want the change to commit without the log entry.
1060  * \param target - mdd_object of change
1061  * \param tpfid - target parent dir/object fid
1062  * \param sfid - source object fid
1063  * \param spfid - source parent fid
1064  * \param tname - target name string
1065  * \param sname - source name string
1066  * \param handle - transaction handle
1067  */
1068 int mdd_changelog_ns_store(const struct lu_env *env,
1069                            struct mdd_device *mdd,
1070                            enum changelog_rec_type type,
1071                            enum changelog_rec_flags crf,
1072                            struct mdd_object *target,
1073                            const struct lu_fid *tpfid,
1074                            const struct lu_fid *sfid,
1075                            const struct lu_fid *spfid,
1076                            const struct lu_name *tname,
1077                            const struct lu_name *sname,
1078                            struct thandle *handle)
1079 {
1080         const struct lu_ucred           *uc = lu_ucred(env);
1081         struct llog_changelog_rec       *rec;
1082         struct lu_buf                   *buf;
1083         int                              reclen;
1084         __u64                            xflags = CLFE_INVALID;
1085         int                              rc;
1086         ENTRY;
1087
1088         /* Not recording */
1089         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1090                 RETURN(0);
1091
1092         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1093                 RETURN(0);
1094
1095         LASSERT(tpfid != NULL);
1096         LASSERT(tname != NULL);
1097         LASSERT(handle != NULL);
1098
1099         reclen = mdd_llog_record_calc_size(env, tname, sname);
1100         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
1101         if (buf->lb_buf == NULL)
1102                 RETURN(-ENOMEM);
1103         rec = buf->lb_buf;
1104
1105         crf &= CLF_FLAGMASK;
1106         crf |= CLF_EXTRA_FLAGS;
1107
1108         if (uc) {
1109                 if (uc->uc_jobid[0] != '\0')
1110                         crf |= CLF_JOBID;
1111                 xflags |= CLFE_UIDGID;
1112         }
1113
1114         if (sname != NULL)
1115                 crf |= CLF_RENAME;
1116         else
1117                 crf |= CLF_VERSION;
1118
1119         rec->cr.cr_flags = crf;
1120
1121         if (crf & CLF_EXTRA_FLAGS) {
1122                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
1123                 if (xflags & CLFE_UIDGID)
1124                         mdd_changelog_rec_extra_uidgid(&rec->cr,
1125                                                        uc->uc_uid, uc->uc_gid);
1126         }
1127
1128         rec->cr.cr_type = (__u32)type;
1129         rec->cr.cr_pfid = *tpfid;
1130         rec->cr.cr_namelen = tname->ln_namelen;
1131         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
1132
1133         if (crf & CLF_RENAME)
1134                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
1135
1136         if (crf & CLF_JOBID)
1137                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
1138
1139         if (likely(target != NULL)) {
1140                 rec->cr.cr_tfid = *mdo2fid(target);
1141                 target->mod_cltime = ktime_get();
1142         } else {
1143                 fid_zero(&rec->cr.cr_tfid);
1144         }
1145
1146         rc = mdd_changelog_store(env, mdd, rec, handle);
1147         if (rc < 0) {
1148                 CERROR("%s: cannot store changelog record: type = %d, "
1149                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
1150                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
1151                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
1152                 return -EFAULT;
1153         }
1154
1155         return 0;
1156 }
1157
1158 static int __mdd_links_add(const struct lu_env *env,
1159                            struct mdd_object *mdd_obj,
1160                            struct linkea_data *ldata,
1161                            const struct lu_name *lname,
1162                            const struct lu_fid *pfid,
1163                            int first, int check)
1164 {
1165         int rc;
1166
1167         if (ldata->ld_leh == NULL) {
1168                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
1169                 if (rc) {
1170                         if (rc != -ENODATA)
1171                                 return rc;
1172                         rc = linkea_data_new(ldata,
1173                                              &mdd_env_info(env)->mti_link_buf);
1174                         if (rc)
1175                                 return rc;
1176                 }
1177         }
1178
1179         if (check) {
1180                 rc = linkea_links_find(ldata, lname, pfid);
1181                 if (rc && rc != -ENOENT)
1182                         return rc;
1183                 if (rc == 0)
1184                         return -EEXIST;
1185         }
1186
1187         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
1188                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1189
1190                 *tfid = *pfid;
1191                 tfid->f_ver = ~0;
1192                 linkea_add_buf(ldata, lname, tfid);
1193         }
1194
1195         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
1196                 linkea_add_buf(ldata, lname, pfid);
1197
1198         return linkea_add_buf(ldata, lname, pfid);
1199 }
1200
1201 static int __mdd_links_del(const struct lu_env *env,
1202                            struct mdd_object *mdd_obj,
1203                            struct linkea_data *ldata,
1204                            const struct lu_name *lname,
1205                            const struct lu_fid *pfid)
1206 {
1207         int rc;
1208
1209         if (ldata->ld_leh == NULL) {
1210                 rc = mdd_links_read(env, mdd_obj, ldata);
1211                 if (rc)
1212                         return rc;
1213         }
1214
1215         rc = linkea_links_find(ldata, lname, pfid);
1216         if (rc)
1217                 return rc;
1218
1219         linkea_del_buf(ldata, lname);
1220         return 0;
1221 }
1222
1223 static int mdd_linkea_prepare(const struct lu_env *env,
1224                               struct mdd_object *mdd_obj,
1225                               const struct lu_fid *oldpfid,
1226                               const struct lu_name *oldlname,
1227                               const struct lu_fid *newpfid,
1228                               const struct lu_name *newlname,
1229                               int first, int check,
1230                               struct linkea_data *ldata)
1231 {
1232         int rc = 0;
1233         ENTRY;
1234
1235         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1236                 RETURN(0);
1237
1238         LASSERT(oldpfid != NULL || newpfid != NULL);
1239
1240         if (mdd_obj->mod_flags & DEAD_OBJ)
1241                 /* Unnecessary to update linkEA for dead object.  */
1242                 RETURN(0);
1243
1244         if (oldpfid != NULL) {
1245                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
1246                 if (rc) {
1247                         if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
1248                                 RETURN(rc);
1249
1250                         /* No changes done. */
1251                         rc = 0;
1252                 }
1253         }
1254
1255         /* If renaming, add the new record */
1256         if (newpfid != NULL)
1257                 rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1258                                      first, check);
1259
1260         RETURN(rc);
1261 }
1262
1263 int mdd_links_rename(const struct lu_env *env,
1264                      struct mdd_object *mdd_obj,
1265                      const struct lu_fid *oldpfid,
1266                      const struct lu_name *oldlname,
1267                      const struct lu_fid *newpfid,
1268                      const struct lu_name *newlname,
1269                      struct thandle *handle,
1270                      struct linkea_data *ldata,
1271                      int first, int check)
1272 {
1273         int rc = 0;
1274         ENTRY;
1275
1276         if (ldata == NULL) {
1277                 ldata = &mdd_env_info(env)->mti_link_data;
1278                 memset(ldata, 0, sizeof(*ldata));
1279                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1280                                         newpfid, newlname, first, check, ldata);
1281                 if (rc)
1282                         GOTO(out, rc);
1283         }
1284
1285         if (!(mdd_obj->mod_flags & DEAD_OBJ))
1286                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1287
1288         GOTO(out, rc);
1289
1290 out:
1291         if (rc != 0) {
1292                 if (newlname == NULL)
1293                         CERROR("link_ea add failed %d "DFID"\n",
1294                                rc, PFID(mdd_object_fid(mdd_obj)));
1295                 else if (oldpfid == NULL)
1296                         CERROR("link_ea add '%.*s' failed %d "DFID"\n",
1297                                newlname->ln_namelen, newlname->ln_name, rc,
1298                                PFID(mdd_object_fid(mdd_obj)));
1299                 else if (newpfid == NULL)
1300                         CERROR("link_ea del '%.*s' failed %d "DFID"\n",
1301                                oldlname->ln_namelen, oldlname->ln_name, rc,
1302                                PFID(mdd_object_fid(mdd_obj)));
1303                 else
1304                         CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
1305                                "\n", oldlname->ln_namelen, oldlname->ln_name,
1306                                newlname->ln_namelen, newlname->ln_name, rc,
1307                                PFID(mdd_object_fid(mdd_obj)));
1308         }
1309
1310         if (is_vmalloc_addr(ldata->ld_buf))
1311                 /* if we vmalloced a large buffer drop it */
1312                 lu_buf_free(ldata->ld_buf);
1313
1314         return rc;
1315 }
1316
1317 static inline int mdd_links_add(const struct lu_env *env,
1318                                 struct mdd_object *mdd_obj,
1319                                 const struct lu_fid *pfid,
1320                                 const struct lu_name *lname,
1321                                 struct thandle *handle,
1322                                 struct linkea_data *ldata, int first)
1323 {
1324         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1325                                 pfid, lname, handle, ldata, first, 0);
1326 }
1327
1328 static inline int mdd_links_del(const struct lu_env *env,
1329                                 struct mdd_object *mdd_obj,
1330                                 const struct lu_fid *pfid,
1331                                 const struct lu_name *lname,
1332                                 struct thandle *handle)
1333 {
1334         return mdd_links_rename(env, mdd_obj, pfid, lname,
1335                                 NULL, NULL, handle, NULL, 0, 0);
1336 }
1337
1338 /** Read the link EA into a temp buffer.
1339  * Uses the name_buf since it is generally large.
1340  * \retval IS_ERR err
1341  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1342  */
1343 struct lu_buf *mdd_links_get(const struct lu_env *env,
1344                              struct mdd_object *mdd_obj)
1345 {
1346         struct linkea_data ldata = { NULL };
1347         int rc;
1348
1349         rc = mdd_links_read(env, mdd_obj, &ldata);
1350         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1351 }
1352
1353 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1354                     struct linkea_data *ldata, struct thandle *handle)
1355 {
1356         const struct lu_buf *buf;
1357         int                 rc;
1358
1359         if (ldata == NULL || ldata->ld_buf == NULL ||
1360             ldata->ld_leh == NULL)
1361                 return 0;
1362
1363         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1364                 return 0;
1365
1366 again:
1367         buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1368                                 ldata->ld_leh->leh_len);
1369         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1370         if (unlikely(rc == -ENOSPC)) {
1371                 rc = linkea_overflow_shrink(ldata);
1372                 if (likely(rc > 0))
1373                         goto again;
1374         }
1375
1376         return rc;
1377 }
1378
1379 static int mdd_declare_links_add(const struct lu_env *env,
1380                                  struct mdd_object *mdd_obj,
1381                                  struct thandle *handle,
1382                                  struct linkea_data *ldata)
1383 {
1384         int     rc;
1385         int     ea_len;
1386         void    *linkea;
1387
1388         if (ldata != NULL && ldata->ld_leh != NULL) {
1389                 ea_len = ldata->ld_leh->leh_len;
1390                 linkea = ldata->ld_buf->lb_buf;
1391         } else {
1392                 ea_len = MAX_LINKEA_SIZE;
1393                 linkea = NULL;
1394         }
1395
1396         rc = mdo_declare_xattr_set(env, mdd_obj,
1397                                    mdd_buf_get_const(env, linkea, ea_len),
1398                                    XATTR_NAME_LINK, 0, handle);
1399
1400         return rc;
1401 }
1402
1403 static inline int mdd_declare_links_del(const struct lu_env *env,
1404                                         struct mdd_object *c,
1405                                         struct thandle *handle)
1406 {
1407         int rc = 0;
1408
1409         /* For directory, the linkEA will be removed together
1410          * with the object. */
1411         if (!S_ISDIR(mdd_object_type(c)))
1412                 rc = mdd_declare_links_add(env, c, handle, NULL);
1413
1414         return rc;
1415 }
1416
1417 static int mdd_declare_link(const struct lu_env *env,
1418                             struct mdd_device *mdd,
1419                             struct mdd_object *p,
1420                             struct mdd_object *c,
1421                             const struct lu_name *name,
1422                             struct thandle *handle,
1423                             struct lu_attr *la,
1424                             struct linkea_data *data)
1425 {
1426         struct lu_fid tfid = *mdo2fid(c);
1427         int rc;
1428
1429         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1430                 tfid.f_oid = cfs_fail_val;
1431
1432         rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
1433                                       name->ln_name, handle);
1434         if (rc != 0)
1435                 return rc;
1436
1437         rc = mdo_declare_ref_add(env, c, handle);
1438         if (rc != 0)
1439                 return rc;
1440
1441         la->la_valid = LA_CTIME | LA_MTIME;
1442         rc = mdo_declare_attr_set(env, p, la, handle);
1443         if (rc != 0)
1444                 return rc;
1445
1446         la->la_valid = LA_CTIME;
1447         rc = mdo_declare_attr_set(env, c, la, handle);
1448         if (rc != 0)
1449                 return rc;
1450
1451         rc = mdd_declare_links_add(env, c, handle, data);
1452         if (rc != 0)
1453                 return rc;
1454
1455         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1456
1457         return rc;
1458 }
1459
1460 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1461                     struct md_object *src_obj, const struct lu_name *lname,
1462                     struct md_attr *ma)
1463 {
1464         const char *name = lname->ln_name;
1465         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1466         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1467         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1468         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1469         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1470         struct mdd_device *mdd = mdo2mdd(src_obj);
1471         struct thandle *handle;
1472         struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
1473         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1474         int rc;
1475         ENTRY;
1476
1477         rc = mdd_la_get(env, mdd_sobj, cattr);
1478         if (rc != 0)
1479                 RETURN(rc);
1480
1481         rc = mdd_la_get(env, mdd_tobj, tattr);
1482         if (rc != 0)
1483                 RETURN(rc);
1484
1485         /*
1486          * If we are using project inheritance, we only allow hard link
1487          * creation in our tree when the project IDs are the same;
1488          * otherwise the tree quota mechanism could be circumvented.
1489          */
1490         if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
1491             (tattr->la_projid != cattr->la_projid))
1492                 RETURN(-EXDEV);
1493
1494         handle = mdd_trans_create(env, mdd);
1495         if (IS_ERR(handle))
1496                 GOTO(out_pending, rc = PTR_ERR(handle));
1497
1498         memset(ldata, 0, sizeof(*ldata));
1499
1500         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1501         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1502
1503         /* Note: even this function will change ldata, but it comes from
1504          * thread_info, which is completely temporary and only seen in
1505          * this function, so we do not need reset ldata once it fails.*/
1506         rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
1507                                 lname, 0, 0, ldata);
1508         if (rc != 0)
1509                 GOTO(stop, rc);
1510
1511         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1512                               la, ldata);
1513         if (rc)
1514                 GOTO(stop, rc);
1515
1516         rc = mdd_trans_start(env, mdd, handle);
1517         if (rc)
1518                 GOTO(stop, rc);
1519
1520         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1521         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1522                                    cattr);
1523         if (rc)
1524                 GOTO(out_unlock, rc);
1525
1526         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1527                 rc = mdo_ref_add(env, mdd_sobj, handle);
1528                 if (rc != 0)
1529                         GOTO(out_unlock, rc);
1530         }
1531
1532         *tfid = *mdo2fid(mdd_sobj);
1533         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
1534                 tfid->f_oid = cfs_fail_val;
1535
1536         rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
1537                                      mdd_object_type(mdd_sobj), name, handle);
1538         if (rc != 0) {
1539                 mdo_ref_del(env, mdd_sobj, handle);
1540                 GOTO(out_unlock, rc);
1541         }
1542
1543         la->la_valid = LA_CTIME | LA_MTIME;
1544         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1545         if (rc)
1546                 GOTO(out_unlock, rc);
1547
1548         la->la_valid = LA_CTIME;
1549         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1550         if (rc == 0)
1551                 /* Note: The failure of links_add should not cause the
1552                  * link failure, so do not check return value. */
1553                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1554                               lname, handle, ldata, 0);
1555
1556         EXIT;
1557 out_unlock:
1558         mdd_write_unlock(env, mdd_sobj);
1559         if (rc == 0)
1560                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1561                                             mdo2fid(mdd_tobj), NULL, NULL,
1562                                             lname, NULL, handle);
1563 stop:
1564         rc = mdd_trans_stop(env, mdd, rc, handle);
1565         if (is_vmalloc_addr(ldata->ld_buf))
1566                 /* if we vmalloced a large buffer drop it */
1567                 lu_buf_free(ldata->ld_buf);
1568 out_pending:
1569         return rc;
1570 }
1571
1572 static int mdd_mark_orphan_object(const struct lu_env *env,
1573                                 struct mdd_object *obj, struct thandle *handle,
1574                                 bool declare)
1575 {
1576         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1577         int rc;
1578
1579         if (!S_ISDIR(mdd_object_type(obj)))
1580                 return 0;
1581
1582         attr->la_valid = LA_FLAGS;
1583         attr->la_flags = LUSTRE_ORPHAN_FL;
1584
1585         if (declare)
1586                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1587         else
1588                 rc = mdo_attr_set(env, obj, attr, handle);
1589
1590         return rc;
1591 }
1592
1593 static int mdd_declare_finish_unlink(const struct lu_env *env,
1594                                      struct mdd_object *obj,
1595                                      struct thandle *handle)
1596 {
1597         int     rc;
1598
1599         /* Sigh, we do not know if the unlink object will become orphan in
1600          * declare phase, but fortunately the flags here does not matter
1601          * in current declare implementation */
1602         rc = mdd_mark_orphan_object(env, obj, handle, true);
1603         if (rc != 0)
1604                 return rc;
1605
1606         rc = mdo_declare_destroy(env, obj, handle);
1607         if (rc != 0)
1608                 return rc;
1609
1610         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1611         if (rc != 0)
1612                 return rc;
1613
1614         return mdd_declare_links_del(env, obj, handle);
1615 }
1616
1617 /* caller should take a lock before calling */
1618 int mdd_finish_unlink(const struct lu_env *env,
1619                       struct mdd_object *obj, struct md_attr *ma,
1620                       const struct mdd_object *pobj,
1621                       const struct lu_name *lname,
1622                       struct thandle *th)
1623 {
1624         int rc = 0;
1625         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1626         ENTRY;
1627
1628         LASSERT(mdd_write_locked(env, obj) != 0);
1629
1630         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1631                 /* add new orphan and the object
1632                  * will be deleted during mdd_close() */
1633                 obj->mod_flags |= DEAD_OBJ;
1634                 if (obj->mod_count) {
1635                         rc = __mdd_orphan_add(env, obj, th);
1636                         if (rc == 0)
1637                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1638                                         "orphan list, open count = %d\n",
1639                                         PFID(mdd_object_fid(obj)),
1640                                         obj->mod_count);
1641                         else
1642                                 CERROR("Object "DFID" fail to be an orphan, "
1643                                        "open count = %d, maybe cause failed "
1644                                        "open replay\n",
1645                                         PFID(mdd_object_fid(obj)),
1646                                         obj->mod_count);
1647
1648                         /* mark object as an orphan here, not
1649                          * before __mdd_orphan_add() as racing
1650                          * mdd_la_get() may propagate ORPHAN_OBJ
1651                          * causing the asserition */
1652                         rc = mdd_mark_orphan_object(env, obj, th, false);
1653                 } else {
1654                         rc = mdo_destroy(env, obj, th);
1655                 }
1656         } else if (!is_dir) {
1657                 /* old files may not have link ea; ignore errors */
1658                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1659         }
1660
1661         RETURN(rc);
1662 }
1663
1664 /*
1665  * pobj maybe NULL
1666  * has mdd_write_lock on cobj already, but not on pobj yet
1667  */
1668 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1669                             const struct lu_attr *pattr,
1670                             struct mdd_object *cobj,
1671                             const struct lu_attr *cattr)
1672 {
1673         int rc;
1674         ENTRY;
1675
1676         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1677
1678         RETURN(rc);
1679 }
1680
1681 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1682                               struct mdd_object *p, struct mdd_object *c,
1683                               const struct lu_name *name, struct md_attr *ma,
1684                               struct thandle *handle, int no_name, int is_dir)
1685 {
1686         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1687         int              rc;
1688
1689         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1690                 if (likely(no_name == 0)) {
1691                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1692                                                       handle);
1693                         if (rc != 0)
1694                                 return rc;
1695                 }
1696
1697                 if (is_dir != 0) {
1698                         rc = mdo_declare_ref_del(env, p, handle);
1699                         if (rc != 0)
1700                                 return rc;
1701                 }
1702         }
1703
1704         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1705         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1706         la->la_valid = LA_CTIME | LA_MTIME;
1707         rc = mdo_declare_attr_set(env, p, la, handle);
1708         if (rc)
1709                 return rc;
1710
1711         if (c != NULL) {
1712                 rc = mdo_declare_ref_del(env, c, handle);
1713                 if (rc)
1714                         return rc;
1715
1716                 rc = mdo_declare_ref_del(env, c, handle);
1717                 if (rc)
1718                         return rc;
1719
1720                 la->la_valid = LA_CTIME;
1721                 rc = mdo_declare_attr_set(env, c, la, handle);
1722                 if (rc)
1723                         return rc;
1724
1725                 rc = mdd_declare_finish_unlink(env, c, handle);
1726                 if (rc)
1727                         return rc;
1728
1729                 /* FIXME: need changelog for remove entry */
1730                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1731         }
1732
1733         return rc;
1734 }
1735
1736 /*
1737  * test if a file has an HSM archive
1738  * if HSM attributes are not found in ma update them from
1739  * HSM xattr
1740  */
1741 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1742                                    struct mdd_object *obj,
1743                                    struct md_attr *ma)
1744 {
1745         ENTRY;
1746
1747         if (!(ma->ma_valid & MA_HSM)) {
1748                 /* no HSM MD provided, read xattr */
1749                 struct lu_buf   *hsm_buf;
1750                 const size_t     buflen = sizeof(struct hsm_attrs);
1751                 int              rc;
1752
1753                 hsm_buf = mdd_buf_get(env, NULL, 0);
1754                 lu_buf_alloc(hsm_buf, buflen);
1755                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1756                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1757                 lu_buf_free(hsm_buf);
1758                 if (rc < 0)
1759                         RETURN(false);
1760
1761                 ma->ma_valid |= MA_HSM;
1762         }
1763         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1764                 RETURN(true);
1765         RETURN(false);
1766 }
1767
1768 /**
1769  * Delete name entry and the object.
1770  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1771  * does not exist for this object, and it could only happen during resending
1772  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1773  * is also needed in this case(needed by changelog), so we have to add another
1774  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1775  * the ENOENT failure should be able to be fixed by redo mechanism.
1776  */
1777 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1778                       struct md_object *cobj, const struct lu_name *lname,
1779                       struct md_attr *ma, int no_name)
1780 {
1781         const char *name = lname->ln_name;
1782         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1783         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1784         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1785         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1786         struct mdd_object *mdd_cobj = NULL;
1787         struct mdd_device *mdd = mdo2mdd(pobj);
1788         struct thandle    *handle;
1789         int rc, is_dir = 0, cl_flags = 0;
1790         ENTRY;
1791
1792         /* cobj == NULL means only delete name entry */
1793         if (likely(cobj != NULL)) {
1794                 mdd_cobj = md2mdd_obj(cobj);
1795                 if (mdd_object_exists(mdd_cobj) == 0)
1796                         RETURN(-ENOENT);
1797         }
1798
1799         rc = mdd_la_get(env, mdd_pobj, pattr);
1800         if (rc)
1801                 RETURN(rc);
1802
1803         if (likely(mdd_cobj != NULL)) {
1804                 /* fetch cattr */
1805                 rc = mdd_la_get(env, mdd_cobj, cattr);
1806                 if (rc)
1807                         RETURN(rc);
1808
1809                 is_dir = S_ISDIR(cattr->la_mode);
1810                 /* search for an existing archive.
1811                  * we should check ahead as the object
1812                  * can be destroyed in this transaction */
1813                 if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1814                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1815         }
1816
1817         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1818         if (rc)
1819                 RETURN(rc);
1820
1821         handle = mdd_trans_create(env, mdd);
1822         if (IS_ERR(handle))
1823                 RETURN(PTR_ERR(handle));
1824
1825         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1826                                 lname, ma, handle, no_name, is_dir);
1827         if (rc)
1828                 GOTO(stop, rc);
1829
1830         rc = mdd_trans_start(env, mdd, handle);
1831         if (rc)
1832                 GOTO(stop, rc);
1833
1834         if (likely(mdd_cobj != NULL))
1835                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1836
1837         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1838                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1839                 if (rc)
1840                         GOTO(cleanup, rc);
1841         }
1842
1843         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1844             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1845                 GOTO(cleanup, rc = 0);
1846
1847         if (likely(mdd_cobj != NULL)) {
1848                 rc = mdo_ref_del(env, mdd_cobj, handle);
1849                 if (rc != 0) {
1850                         __mdd_index_insert_only(env, mdd_pobj,
1851                                                 mdo2fid(mdd_cobj),
1852                                                 mdd_object_type(mdd_cobj),
1853                                                 name, handle);
1854                         GOTO(cleanup, rc);
1855                 }
1856
1857                 if (is_dir)
1858                         /* unlink dot */
1859                         mdo_ref_del(env, mdd_cobj, handle);
1860
1861                 /* fetch updated nlink */
1862                 rc = mdd_la_get(env, mdd_cobj, cattr);
1863                 if (rc)
1864                         GOTO(cleanup, rc);
1865         }
1866
1867         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1868         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1869
1870         la->la_valid = LA_CTIME | LA_MTIME;
1871         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1872         if (rc)
1873                 GOTO(cleanup, rc);
1874
1875         /* Enough for only unlink the entry */
1876         if (unlikely(mdd_cobj == NULL))
1877                 GOTO(stop, rc);
1878
1879         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1880                 /* update ctime of an unlinked file only if it is still
1881                  * opened or a link still exists */
1882                 la->la_valid = LA_CTIME;
1883                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1884                 if (rc)
1885                         GOTO(cleanup, rc);
1886         }
1887
1888         /* XXX: this transfer to ma will be removed with LOD/OSP */
1889         ma->ma_attr = *cattr;
1890         ma->ma_valid |= MA_INODE;
1891         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1892         if (rc != 0)
1893                 GOTO(cleanup, rc);
1894
1895         /* fetch updated nlink */
1896         rc = mdd_la_get(env, mdd_cobj, cattr);
1897         /* if object is removed then we can't get its attrs,
1898          * use last get */
1899         if (rc == -ENOENT) {
1900                 cattr->la_nlink = 0;
1901                 rc = 0;
1902         }
1903
1904         if (cattr->la_nlink == 0) {
1905                 ma->ma_attr = *cattr;
1906                 ma->ma_valid |= MA_INODE;
1907         }
1908
1909         EXIT;
1910 cleanup:
1911         if (likely(mdd_cobj != NULL))
1912                 mdd_write_unlock(env, mdd_cobj);
1913
1914         if (rc == 0) {
1915                 if (cattr->la_nlink == 0)
1916                         cl_flags |= CLF_UNLINK_LAST;
1917                 else
1918                         cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
1919
1920                 rc = mdd_changelog_ns_store(env, mdd,
1921                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1922                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1923                         handle);
1924         }
1925
1926 stop:
1927         rc = mdd_trans_stop(env, mdd, rc, handle);
1928
1929         return rc;
1930 }
1931
1932 /*
1933  * The permission has been checked when obj created, no need check again.
1934  */
1935 static int mdd_cd_sanity_check(const struct lu_env *env,
1936                                struct mdd_object *obj)
1937 {
1938         ENTRY;
1939
1940         /* EEXIST check */
1941         if (!obj || mdd_is_dead_obj(obj))
1942                 RETURN(-ENOENT);
1943
1944         RETURN(0);
1945 }
1946
1947 static int mdd_create_data(const struct lu_env *env,
1948                            struct md_object *pobj,
1949                            struct md_object *cobj,
1950                            const struct md_op_spec *spec,
1951                            struct md_attr *ma)
1952 {
1953         struct mdd_device *mdd = mdo2mdd(cobj);
1954         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1955         struct mdd_object *son = md2mdd_obj(cobj);
1956         struct thandle    *handle;
1957         const struct lu_buf *buf;
1958         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1959         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1960         int                rc;
1961         ENTRY;
1962
1963         rc = mdd_cd_sanity_check(env, son);
1964         if (rc)
1965                 RETURN(rc);
1966
1967         if (!md_should_create(spec->sp_cr_flags))
1968                 RETURN(0);
1969
1970         /*
1971          * there are following use cases for this function:
1972          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1973          *    striping can be specified or not
1974          * 2) CMD?
1975          */
1976         rc = mdd_la_get(env, son, attr);
1977         if (rc)
1978                 RETURN(rc);
1979
1980         /* calling ->ah_make_hint() is used to transfer information from parent */
1981         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1982
1983         handle = mdd_trans_create(env, mdd);
1984         if (IS_ERR(handle))
1985                 GOTO(out_free, rc = PTR_ERR(handle));
1986
1987         /*
1988          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1989          * Should this be fixed?
1990          */
1991         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
1992                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1993                spec->sp_cr_flags, spec->no_create);
1994
1995         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1996                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1997                                         spec->u.sp_ea.eadatalen);
1998         } else {
1999                 buf = &LU_BUF_NULL;
2000         }
2001
2002         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
2003                                   XATTR_NAME_LOV, 0, handle);
2004         if (rc)
2005                 GOTO(stop, rc);
2006
2007         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
2008         if (rc)
2009                 GOTO(stop, rc);
2010
2011         rc = mdd_trans_start(env, mdd, handle);
2012         if (rc)
2013                 GOTO(stop, rc);
2014
2015         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
2016                           0, handle);
2017
2018         if (rc)
2019                 GOTO(stop, rc);
2020
2021         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
2022
2023 stop:
2024         rc = mdd_trans_stop(env, mdd, rc, handle);
2025
2026 out_free:
2027         RETURN(rc);
2028 }
2029
2030 static int mdd_declare_object_initialize(const struct lu_env *env,
2031                                          struct mdd_object *parent,
2032                                          struct mdd_object *child,
2033                                          const struct lu_attr *attr,
2034                                          struct thandle *handle)
2035 {
2036         int rc;
2037         ENTRY;
2038
2039         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
2040         if (!S_ISDIR(attr->la_mode))
2041                 RETURN(0);
2042
2043         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
2044                                       dot, handle);
2045         if (rc != 0)
2046                 RETURN(rc);
2047
2048         rc = mdo_declare_ref_add(env, child, handle);
2049         if (rc != 0)
2050                 RETURN(rc);
2051
2052         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
2053                                       dotdot, handle);
2054
2055         RETURN(rc);
2056 }
2057
2058 static int mdd_object_initialize(const struct lu_env *env,
2059                                  const struct lu_fid *pfid,
2060                                  struct mdd_object *child,
2061                                  struct lu_attr *attr, struct thandle *handle,
2062                                  const struct md_op_spec *spec)
2063 {
2064         int rc = 0;
2065         ENTRY;
2066
2067         if (S_ISDIR(attr->la_mode)) {
2068                 /* Add "." and ".." for newly created dir */
2069                 mdo_ref_add(env, child, handle);
2070                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
2071                                              S_IFDIR, dot, handle);
2072                 if (rc == 0)
2073                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
2074                                                      dotdot, handle);
2075                 if (rc != 0)
2076                         mdo_ref_del(env, child, handle);
2077         }
2078
2079         RETURN(rc);
2080 }
2081
2082 /**
2083  * This function checks whether it can create a file/dir under the
2084  * directory(@pobj). The directory(@pobj) is not being locked by
2085  * mdd lock.
2086  *
2087  * \param[in] env       execution environment
2088  * \param[in] pobj      the directory to create files
2089  * \param[in] pattr     the attributes of the directory
2090  * \param[in] lname     the name of the created file/dir
2091  * \param[in] cattr     the attributes of the file/dir
2092  * \param[in] spec      create specification
2093  *
2094  * \retval              = 0 it is allowed to create file/dir under
2095  *                      the directory
2096  * \retval              negative error not allowed to create file/dir
2097  *                      under the directory
2098  */
2099 static int mdd_create_sanity_check(const struct lu_env *env,
2100                                    struct md_object *pobj,
2101                                    const struct lu_attr *pattr,
2102                                    const struct lu_name *lname,
2103                                    struct lu_attr *cattr,
2104                                    struct md_op_spec *spec)
2105 {
2106         struct mdd_thread_info *info = mdd_env_info(env);
2107         struct lu_fid     *fid       = &info->mti_fid;
2108         struct mdd_object *obj       = md2mdd_obj(pobj);
2109         struct mdd_device *m         = mdo2mdd(pobj);
2110         bool            check_perm = true;
2111         int rc;
2112         ENTRY;
2113
2114         /* EEXIST check */
2115         if (mdd_is_dead_obj(obj))
2116                 RETURN(-ENOENT);
2117
2118         /*
2119          * In some cases this lookup is not needed - we know before if name
2120          * exists or not because MDT performs lookup for it.
2121          * name length check is done in lookup.
2122          */
2123         if (spec->sp_cr_lookup) {
2124                 /*
2125                  * Check if the name already exist, though it will be checked in
2126                  * _index_insert also, for avoiding rolling back if exists
2127                  * _index_insert.
2128                  */
2129                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
2130                                   MAY_WRITE | MAY_EXEC);
2131                 if (rc != -ENOENT)
2132                         RETURN(rc ? : -EEXIST);
2133
2134                 /* Permission is already being checked in mdd_lookup */
2135                 check_perm = false;
2136         }
2137
2138         if (S_ISDIR(cattr->la_mode) &&
2139             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
2140             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
2141                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
2142
2143                 if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
2144                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
2145                     le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
2146                         rc = -EINVAL;
2147                         CERROR("%s: invalid lmv_user_md: magic = %x, "
2148                                "stripe_offset = %d, stripe_count = %u: "
2149                                "rc = %d\n", mdd2obd_dev(m)->obd_name,
2150                                 le32_to_cpu(lum->lum_magic),
2151                                (int)le32_to_cpu(lum->lum_stripe_offset),
2152                                le32_to_cpu(lum->lum_stripe_count), rc);
2153                         return rc;
2154                 }
2155         }
2156
2157         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
2158         if (rc != 0)
2159                 RETURN(rc);
2160
2161         /* sgid check */
2162         if (pattr->la_mode & S_ISGID) {
2163                 cattr->la_gid = pattr->la_gid;
2164                 if (S_ISDIR(cattr->la_mode)) {
2165                         cattr->la_mode |= S_ISGID;
2166                         cattr->la_valid |= LA_MODE;
2167                 }
2168         }
2169
2170         /* Inherit project ID from parent directory */
2171         if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
2172                 cattr->la_projid = pattr->la_projid;
2173                 if (S_ISDIR(cattr->la_mode)) {
2174                         cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
2175                         cattr->la_valid |= LA_FLAGS;
2176                 }
2177                 cattr->la_valid |= LA_PROJID;
2178         }
2179
2180         rc = mdd_name_check(m, lname);
2181         if (rc < 0)
2182                 RETURN(rc);
2183
2184         switch (cattr->la_mode & S_IFMT) {
2185         case S_IFLNK: {
2186                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
2187
2188                 if (symlen > m->mdd_dt_conf.ddp_symlink_max)
2189                         RETURN(-ENAMETOOLONG);
2190                 else
2191                         RETURN(0);
2192         }
2193         case S_IFDIR:
2194         case S_IFREG:
2195         case S_IFCHR:
2196         case S_IFBLK:
2197         case S_IFIFO:
2198         case S_IFSOCK:
2199                 rc = 0;
2200                 break;
2201         default:
2202                 rc = -EINVAL;
2203                 break;
2204         }
2205         RETURN(rc);
2206 }
2207
2208 static int mdd_declare_create_object(const struct lu_env *env,
2209                                      struct mdd_device *mdd,
2210                                      struct mdd_object *p, struct mdd_object *c,
2211                                      struct lu_attr *attr,
2212                                      struct thandle *handle,
2213                                      const struct md_op_spec *spec,
2214                                      struct lu_buf *def_acl_buf,
2215                                      struct lu_buf *acl_buf,
2216                                      struct dt_allocation_hint *hint)
2217 {
2218         const struct lu_buf *buf;
2219         int rc;
2220
2221         rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
2222                                                 hint);
2223         if (rc)
2224                 GOTO(out, rc);
2225
2226 #ifdef CONFIG_FS_POSIX_ACL
2227         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2228                 /* if dir, then can inherit default ACl */
2229                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2230                                            XATTR_NAME_ACL_DEFAULT,
2231                                            0, handle);
2232                 if (rc)
2233                         GOTO(out, rc);
2234         }
2235
2236         if (acl_buf->lb_len > 0) {
2237                 rc = mdo_declare_attr_set(env, c, attr, handle);
2238                 if (rc)
2239                         GOTO(out, rc);
2240
2241                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2242                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2243                 if (rc)
2244                         GOTO(out, rc);
2245         }
2246 #endif
2247         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2248         if (rc)
2249                 GOTO(out, rc);
2250
2251         /* replay case, create LOV EA from client data */
2252         if (spec->no_create ||
2253             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2254                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2255                                         spec->u.sp_ea.eadatalen);
2256                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2257                                            handle);
2258                 if (rc)
2259                         GOTO(out, rc);
2260         }
2261
2262         if (S_ISLNK(attr->la_mode)) {
2263                 const char *target_name = spec->u.sp_symname;
2264                 int sym_len = strlen(target_name);
2265                 const struct lu_buf *buf;
2266
2267                 buf = mdd_buf_get_const(env, target_name, sym_len);
2268                 rc = dt_declare_record_write(env, mdd_object_child(c),
2269                                              buf, 0, handle);
2270                 if (rc)
2271                         GOTO(out, rc);
2272         }
2273
2274         if (spec->sp_cr_file_secctx_name != NULL) {
2275                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2276                                         spec->sp_cr_file_secctx_size);
2277                 rc = mdo_declare_xattr_set(env, c, buf,
2278                                            spec->sp_cr_file_secctx_name, 0,
2279                                            handle);
2280                 if (rc < 0)
2281                         GOTO(out, rc);
2282         }
2283 out:
2284         return rc;
2285 }
2286
2287 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2288                               struct mdd_object *p, struct mdd_object *c,
2289                               const struct lu_name *name,
2290                               struct lu_attr *attr,
2291                               struct thandle *handle,
2292                               const struct md_op_spec *spec,
2293                               struct linkea_data *ldata,
2294                               struct lu_buf *def_acl_buf,
2295                               struct lu_buf *acl_buf,
2296                               struct dt_allocation_hint *hint)
2297 {
2298         int rc;
2299
2300         rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
2301                                        def_acl_buf, acl_buf, hint);
2302         if (rc)
2303                 GOTO(out, rc);
2304
2305         if (S_ISDIR(attr->la_mode)) {
2306                 rc = mdo_declare_ref_add(env, p, handle);
2307                 if (rc)
2308                         GOTO(out, rc);
2309         }
2310
2311         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2312                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2313                 if (rc)
2314                         GOTO(out, rc);
2315         } else {
2316                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2317
2318                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2319                                               name->ln_name, handle);
2320                 if (rc != 0)
2321                         return rc;
2322
2323                 rc = mdd_declare_links_add(env, c, handle, ldata);
2324                 if (rc)
2325                         return rc;
2326
2327                 *la = *attr;
2328                 la->la_valid = LA_CTIME | LA_MTIME;
2329                 rc = mdo_declare_attr_set(env, p, la, handle);
2330                 if (rc)
2331                         return rc;
2332
2333                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2334                 if (rc)
2335                         return rc;
2336         }
2337 out:
2338         return rc;
2339 }
2340
2341 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2342                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2343                         struct lu_buf *acl_buf)
2344 {
2345         int     rc;
2346         ENTRY;
2347
2348         if (S_ISLNK(la->la_mode)) {
2349                 acl_buf->lb_len = 0;
2350                 def_acl_buf->lb_len = 0;
2351                 RETURN(0);
2352         }
2353
2354         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2355         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2356                            XATTR_NAME_ACL_DEFAULT);
2357         mdd_read_unlock(env, pobj);
2358         if (rc > 0) {
2359                 /* If there are default ACL, fix mode/ACL by default ACL */
2360                 def_acl_buf->lb_len = rc;
2361                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2362                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2363                 acl_buf->lb_len = rc;
2364                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2365                 if (rc < 0)
2366                         RETURN(rc);
2367         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2368                 /* If there are no default ACL, fix mode by mask */
2369                 struct lu_ucred *uc = lu_ucred(env);
2370
2371                 /* The create triggered by MDT internal events, such as
2372                  * LFSCK reset, will not contain valid "uc". */
2373                 if (unlikely(uc != NULL))
2374                         la->la_mode &= ~uc->uc_umask;
2375                 rc = 0;
2376                 acl_buf->lb_len = 0;
2377                 def_acl_buf->lb_len = 0;
2378         }
2379
2380         RETURN(rc);
2381 }
2382
2383 /**
2384  * Create a metadata object and initialize it, set acl, xattr.
2385  **/
2386 static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
2387                              struct mdd_object *son, struct lu_attr *attr,
2388                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2389                              struct lu_buf *def_acl_buf,
2390                              struct dt_allocation_hint *hint,
2391                              struct thandle *handle)
2392 {
2393         const struct lu_buf *buf;
2394         int rc;
2395
2396         mdd_write_lock(env, son, MOR_TGT_CHILD);
2397         rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
2398                                         hint);
2399         if (rc)
2400                 GOTO(unlock, rc);
2401
2402         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2403          * created in declare phase, they also needs to be added to master
2404          * object as sub-directory entry. So it has to initialize the master
2405          * object, then set dir striped EA.(in mdo_xattr_set) */
2406         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2407                                    spec);
2408         if (rc != 0)
2409                 GOTO(err_destroy, rc);
2410
2411         /*
2412          * in case of replay we just set LOVEA provided by the client
2413          * XXX: I think it would be interesting to try "old" way where
2414          *      MDT calls this xattr_set(LOV) in a different transaction.
2415          *      probably this way we code can be made better.
2416          */
2417
2418         /* During creation, there are only a few cases we need do xattr_set to
2419          * create stripes.
2420          * 1. regular file: see comments above.
2421          * 2. dir: inherit default striping or pool settings from parent.
2422          * 3. create striped directory with provided stripeEA.
2423          * 4. create striped directory because inherit default layout from the
2424          * parent.
2425          */
2426         if (spec->no_create ||
2427             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2428             S_ISDIR(attr->la_mode)) {
2429                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2430                                         spec->u.sp_ea.eadatalen);
2431                 rc = mdo_xattr_set(env, son, buf,
2432                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2433                                                             XATTR_NAME_LOV, 0,
2434                                    handle);
2435                 if (rc != 0)
2436                         GOTO(err_destroy, rc);
2437         }
2438
2439 #ifdef CONFIG_FS_POSIX_ACL
2440         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2441             S_ISDIR(attr->la_mode)) {
2442                 /* set default acl */
2443                 rc = mdo_xattr_set(env, son, def_acl_buf,
2444                                    XATTR_NAME_ACL_DEFAULT, 0,
2445                                    handle);
2446                 if (rc)
2447                         GOTO(err_destroy, rc);
2448         }
2449         /* set its own acl */
2450         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2451                 rc = mdo_xattr_set(env, son, acl_buf,
2452                                    XATTR_NAME_ACL_ACCESS,
2453                                    0, handle);
2454                 if (rc)
2455                         GOTO(err_destroy, rc);
2456         }
2457 #endif
2458
2459         if (S_ISLNK(attr->la_mode)) {
2460                 struct lu_ucred  *uc = lu_ucred_assert(env);
2461                 struct dt_object *dt = mdd_object_child(son);
2462                 const char *target_name = spec->u.sp_symname;
2463                 int sym_len = strlen(target_name);
2464                 loff_t pos = 0;
2465
2466                 buf = mdd_buf_get_const(env, target_name, sym_len);
2467                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2468                                                 uc->uc_cap &
2469                                                 CFS_CAP_SYS_RESOURCE_MASK);
2470
2471                 if (rc == sym_len)
2472                         rc = 0;
2473                 else
2474                         GOTO(err_initlized, rc = -EFAULT);
2475         }
2476
2477         if (spec->sp_cr_file_secctx_name != NULL) {
2478                 buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
2479                                         spec->sp_cr_file_secctx_size);
2480                 rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
2481                                    0, handle);
2482                 if (rc < 0)
2483                         GOTO(err_initlized, rc);
2484         }
2485
2486 err_initlized:
2487         if (unlikely(rc != 0)) {
2488                 int rc2;
2489                 if (S_ISDIR(attr->la_mode)) {
2490                         /* Drop the reference, no need to delete "."/"..",
2491                          * because the object to be destroied directly. */
2492                         rc2 = mdo_ref_del(env, son, handle);
2493                         if (rc2 != 0)
2494                                 GOTO(unlock, rc);
2495                 }
2496                 rc2 = mdo_ref_del(env, son, handle);
2497                 if (rc2 != 0)
2498                         GOTO(unlock, rc);
2499 err_destroy:
2500                 mdo_destroy(env, son, handle);
2501         }
2502 unlock:
2503         mdd_write_unlock(env, son);
2504         RETURN(rc);
2505 }
2506
2507 static int mdd_index_delete(const struct lu_env *env,
2508                             struct mdd_object *mdd_pobj,
2509                             struct lu_attr *cattr,
2510                             const struct lu_name *lname)
2511 {
2512         struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
2513         struct thandle *handle;
2514         int rc;
2515         ENTRY;
2516
2517         handle = mdd_trans_create(env, mdd);
2518         if (IS_ERR(handle))
2519                 RETURN(PTR_ERR(handle));
2520
2521         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
2522                                       handle);
2523         if (rc != 0)
2524                 GOTO(stop, rc);
2525
2526         if (S_ISDIR(cattr->la_mode)) {
2527                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
2528                 if (rc != 0)
2529                         GOTO(stop, rc);
2530         }
2531
2532         /* Since this will only be used in the error handler path,
2533          * Let's set the thandle to be local and not mess the transno */
2534         handle->th_local = 1;
2535         rc = mdd_trans_start(env, mdd, handle);
2536         if (rc)
2537                 GOTO(stop, rc);
2538
2539         rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
2540                                 S_ISDIR(cattr->la_mode), handle);
2541         if (rc)
2542                 GOTO(stop, rc);
2543 stop:
2544         rc = mdd_trans_stop(env, mdd, rc, handle);
2545
2546         RETURN(rc);
2547 }
2548
2549 /**
2550  * Create object and insert it into namespace.
2551  *
2552  * Two operations have to be performed:
2553  *
2554  *  - an allocation of a new object (->do_create()), and
2555  *  - an insertion into a parent index (->dio_insert()).
2556  *
2557  * Due to locking, operation order is not important, when both are
2558  * successful, *but* error handling cases are quite different:
2559  *
2560  *  - if insertion is done first, and following object creation fails,
2561  *  insertion has to be rolled back, but this operation might fail
2562  *  also leaving us with dangling index entry.
2563  *
2564  *  - if creation is done first, is has to be undone if insertion fails,
2565  *  leaving us with leaked space, which is not good but not fatal.
2566  *
2567  * It seems that creation-first is simplest solution, but it is sub-optimal
2568  * in the frequent
2569  *
2570  * $ mkdir foo
2571  * $ mkdir foo
2572  *
2573  * case, because second mkdir is bound to create object, only to
2574  * destroy it immediately.
2575  *
2576  * To avoid this follow local file systems that do double lookup:
2577  *
2578  * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2579  * 1. create            (mdd_create_object_internal())
2580  * 2. insert            (__mdd_index_insert(), lookup again)
2581  *
2582  * \param[in] pobj      parent object
2583  * \param[in] lname     name of child being created
2584  * \param[in,out] child child object being created
2585  * \param[in] spec      additional create parameters
2586  * \param[in] ma        attributes for new child object
2587  *
2588  * \retval              0 on success
2589  * \retval              negative errno on failure
2590  */
2591 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2592                       const struct lu_name *lname, struct md_object *child,
2593                       struct md_op_spec *spec, struct md_attr *ma)
2594 {
2595         struct mdd_thread_info  *info = mdd_env_info(env);
2596         struct lu_attr          *la = &info->mti_la_for_fix;
2597         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2598         struct mdd_object       *son = md2mdd_obj(child);
2599         struct mdd_device       *mdd = mdo2mdd(pobj);
2600         struct lu_attr          *attr = &ma->ma_attr;
2601         struct thandle          *handle;
2602         struct lu_attr          *pattr = &info->mti_pattr;
2603         struct lu_buf           acl_buf;
2604         struct lu_buf           def_acl_buf;
2605         struct linkea_data      *ldata = &info->mti_link_data;
2606         const char              *name = lname->ln_name;
2607         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2608         int                      rc;
2609         int                      rc2;
2610         ENTRY;
2611
2612         rc = mdd_la_get(env, mdd_pobj, pattr);
2613         if (rc != 0)
2614                 RETURN(rc);
2615
2616         /* Sanity checks before big job. */
2617         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2618         if (rc)
2619                 RETURN(rc);
2620
2621         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2622                 GOTO(out_free, rc = -EINPROGRESS);
2623
2624         handle = mdd_trans_create(env, mdd);
2625         if (IS_ERR(handle))
2626                 GOTO(out_free, rc = PTR_ERR(handle));
2627
2628         lu_buf_check_and_alloc(&info->mti_xattr_buf,
2629                                mdd->mdd_dt_conf.ddp_max_ea_size);
2630         acl_buf = info->mti_xattr_buf;
2631         def_acl_buf.lb_buf = info->mti_key;
2632         def_acl_buf.lb_len = sizeof(info->mti_key);
2633         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2634         if (rc < 0)
2635                 GOTO(out_stop, rc);
2636
2637         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2638
2639         memset(ldata, 0, sizeof(*ldata));
2640         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2641                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2642
2643                 tfid.f_oid--;
2644                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2645                                         &tfid, lname, 1, 0, ldata);
2646         } else {
2647                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2648                                         mdd_object_fid(mdd_pobj),
2649                                         lname, 1, 0, ldata);
2650         }
2651
2652         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2653                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2654                                 hint);
2655         if (rc)
2656                 GOTO(out_stop, rc);
2657
2658         rc = mdd_trans_start(env, mdd, handle);
2659         if (rc)
2660                 GOTO(out_stop, rc);
2661
2662         rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
2663                                &def_acl_buf, hint, handle);
2664         if (rc != 0)
2665                 GOTO(out_stop, rc);
2666
2667         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2668                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2669                 son->mod_flags |= VOLATILE_OBJ;
2670                 rc = __mdd_orphan_add(env, son, handle);
2671                 GOTO(out_volatile, rc);
2672         } else {
2673                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2674                                         attr->la_mode, name, handle);
2675                 if (rc != 0)
2676                         GOTO(err_created, rc);
2677
2678                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2679                               ldata, 1);
2680
2681                 /* update parent directory mtime/ctime */
2682                 *la = *attr;
2683                 la->la_valid = LA_CTIME | LA_MTIME;
2684                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2685                 if (rc)
2686                         GOTO(err_insert, rc);
2687         }
2688
2689         EXIT;
2690 err_insert:
2691         if (rc != 0) {
2692                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2693                         rc2 = __mdd_orphan_del(env, son, handle);
2694                 else
2695                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2696                                                  S_ISDIR(attr->la_mode),
2697                                                  handle);
2698                 if (rc2 != 0)
2699                         goto out_stop;
2700
2701 err_created:
2702                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2703                 if (S_ISDIR(attr->la_mode)) {
2704                         /* Drop the reference, no need to delete "."/"..",
2705                          * because the object is to be destroyed directly. */
2706                         rc2 = mdo_ref_del(env, son, handle);
2707                         if (rc2 != 0) {
2708                                 mdd_write_unlock(env, son);
2709                                 goto out_stop;
2710                         }
2711                 }
2712 out_volatile:
2713                 /* For volatile files drop one link immediately, since there is
2714                  * no filename in the namespace, and save any error returned. */
2715                 rc2 = mdo_ref_del(env, son, handle);
2716                 if (rc2 != 0) {
2717                         mdd_write_unlock(env, son);
2718                         if (unlikely(rc == 0))
2719                                 rc = rc2;
2720                         goto out_stop;
2721                 }
2722
2723                 /* Don't destroy the volatile object on success */
2724                 if (likely(rc != 0))
2725                         mdo_destroy(env, son, handle);
2726                 mdd_write_unlock(env, son);
2727         }
2728
2729         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2730             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2731                 rc = mdd_changelog_ns_store(env, mdd,
2732                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2733                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2734                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2735                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2736                                 NULL, handle);
2737 out_stop:
2738         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2739         if (rc == 0) {
2740                 /* If creation fails, it is most likely due to the remote update
2741                  * failure, because local transaction will mostly succeed at
2742                  * this stage. There is no easy way to rollback all of previous
2743                  * updates, so let's remove the object from namespace, and
2744                  * LFSCK should handle the orphan object. */
2745                 if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
2746                         mdd_index_delete(env, mdd_pobj, attr, lname);
2747                 rc = rc2;
2748         }
2749 out_free:
2750         if (is_vmalloc_addr(ldata->ld_buf))
2751                 /* if we vmalloced a large buffer drop it */
2752                 lu_buf_free(ldata->ld_buf);
2753
2754         /* The child object shouldn't be cached anymore */
2755         if (rc)
2756                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2757                         &child->mo_lu.lo_header->loh_flags);
2758         return rc;
2759 }
2760
2761 /*
2762  * Get locks on parents in proper order
2763  * RETURN: < 0 - error, rename_order if successful
2764  */
2765 enum rename_order {
2766         MDD_RN_SAME,
2767         MDD_RN_SRCTGT,
2768         MDD_RN_TGTSRC
2769 };
2770
2771 static int mdd_rename_order(const struct lu_env *env,
2772                             struct mdd_device *mdd,
2773                             struct mdd_object *src_pobj,
2774                             const struct lu_attr *pattr,
2775                             struct mdd_object *tgt_pobj)
2776 {
2777         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2778         int rc;
2779         ENTRY;
2780
2781         if (src_pobj == tgt_pobj)
2782                 RETURN(MDD_RN_SAME);
2783
2784         /* compared the parent child relationship of src_p&tgt_p */
2785         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2786                 rc = MDD_RN_SRCTGT;
2787         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2788                 rc = MDD_RN_TGTSRC;
2789         } else {
2790                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2791                                    NULL);
2792                 if (rc == -EREMOTE)
2793                         rc = 0;
2794
2795                 if (rc == 1)
2796                         rc = MDD_RN_TGTSRC;
2797                 else if (rc == 0)
2798                         rc = MDD_RN_SRCTGT;
2799         }
2800
2801         RETURN(rc);
2802 }
2803
2804 /* has not mdd_write{read}_lock on any obj yet. */
2805 static int mdd_rename_sanity_check(const struct lu_env *env,
2806                                    struct mdd_object *src_pobj,
2807                                    const struct lu_attr *pattr,
2808                                    struct mdd_object *tgt_pobj,
2809                                    const struct lu_attr *tpattr,
2810                                    struct mdd_object *sobj,
2811                                    const struct lu_attr *cattr,
2812                                    struct mdd_object *tobj,
2813                                    const struct lu_attr *tattr)
2814 {
2815         int rc = 0;
2816         ENTRY;
2817
2818         /* XXX: when get here, sobj must NOT be NULL,
2819          * the other case has been processed in cld_rename
2820          * before mdd_rename and enable MDS_PERM_BYPASS. */
2821         LASSERT(sobj);
2822
2823         /*
2824          * If we are using project inheritance, we only allow renames
2825          * into our tree when the project IDs are the same; otherwise
2826          * tree quota mechanism would be circumvented.
2827          */
2828         if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2829             tpattr->la_projid != cattr->la_projid) ||
2830             ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
2831             (pattr->la_projid != tpattr->la_projid)))
2832                 RETURN(-EXDEV);
2833
2834         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2835         if (rc)
2836                 RETURN(rc);
2837
2838         /* XXX: when get here, "tobj == NULL" means tobj must
2839          * NOT exist (neither on remote MDS, such case has been
2840          * processed in cld_rename before mdd_rename and enable
2841          * MDS_PERM_BYPASS).
2842          * So check may_create, but not check may_unlink. */
2843         if (tobj == NULL)
2844                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2845                                     (src_pobj != tgt_pobj));
2846         else
2847                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2848                                     (src_pobj != tgt_pobj), 1);
2849
2850         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2851                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2852
2853         RETURN(rc);
2854 }
2855
2856 static int mdd_declare_rename(const struct lu_env *env,
2857                               struct mdd_device *mdd,
2858                               struct mdd_object *mdd_spobj,
2859                               struct mdd_object *mdd_tpobj,
2860                               struct mdd_object *mdd_sobj,
2861                               struct mdd_object *mdd_tobj,
2862                               const struct lu_name *sname,
2863                               const struct lu_name *tname,
2864                               struct md_attr *ma,
2865                               struct linkea_data *ldata,
2866                               struct thandle *handle)
2867 {
2868         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2869         int rc;
2870
2871         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2872         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2873
2874         LASSERT(mdd_spobj);
2875         LASSERT(mdd_tpobj);
2876         LASSERT(mdd_sobj);
2877
2878         /* name from source dir */
2879         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2880         if (rc)
2881                 return rc;
2882
2883         /* .. from source child */
2884         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2885                 /* source child can be directory,
2886                  * counted by source dir's nlink */
2887                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2888                 if (rc)
2889                         return rc;
2890                 if (mdd_spobj != mdd_tpobj) {
2891                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2892                                                       handle);
2893                         if (rc != 0)
2894                                 return rc;
2895
2896                         rc = mdo_declare_index_insert(env, mdd_sobj,
2897                                                       mdo2fid(mdd_tpobj),
2898                                                       S_IFDIR, dotdot, handle);
2899                         if (rc != 0)
2900                                 return rc;
2901                 }
2902
2903                 /* new target child can be directory,
2904                  * counted by target dir's nlink */
2905                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2906                 if (rc != 0)
2907                         return rc;
2908         }
2909
2910         la->la_valid = LA_CTIME | LA_MTIME;
2911         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2912         if (rc != 0)
2913                 return rc;
2914
2915         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2916         if (rc != 0)
2917                 return rc;
2918
2919         la->la_valid = LA_CTIME;
2920         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2921         if (rc)
2922                 return rc;
2923
2924         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2925         if (rc)
2926                 return rc;
2927
2928         /* new name */
2929         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2930                                       mdd_object_type(mdd_sobj),
2931                                       tname->ln_name, handle);
2932         if (rc != 0)
2933                 return rc;
2934
2935         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2936                 /* delete target child in target parent directory */
2937                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2938                                               handle);
2939                 if (rc)
2940                         return rc;
2941
2942                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2943                 if (rc)
2944                         return rc;
2945
2946                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2947                         /* target child can be directory,
2948                          * delete "." reference in target child directory */
2949                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2950                         if (rc)
2951                                 return rc;
2952
2953                         /* delete ".." reference in target parent directory */
2954                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2955                         if (rc)
2956                                 return rc;
2957                 }
2958
2959                 la->la_valid = LA_CTIME;
2960                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2961                 if (rc)
2962                         return rc;
2963
2964                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2965                 if (rc)
2966                         return rc;
2967         }
2968
2969         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2970         if (rc)
2971                 return rc;
2972
2973         return rc;
2974 }
2975
2976 /* src object can be remote that is why we use only fid and type of object */
2977 static int mdd_rename(const struct lu_env *env,
2978                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2979                       const struct lu_fid *lf, const struct lu_name *lsname,
2980                       struct md_object *tobj, const struct lu_name *ltname,
2981                       struct md_attr *ma)
2982 {
2983         const char *sname = lsname->ln_name;
2984         const char *tname = ltname->ln_name;
2985         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2986         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2987         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2988         struct mdd_device *mdd = mdo2mdd(src_pobj);
2989         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2990         struct mdd_object *mdd_tobj = NULL;
2991         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2992         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2993         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2994         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2995         struct thandle *handle;
2996         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2997         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2998         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2999         bool is_dir;
3000         bool tobj_ref = 0;
3001         bool tobj_locked = 0;
3002         unsigned cl_flags = 0;
3003         int rc, rc2;
3004         ENTRY;
3005
3006         if (tobj)
3007                 mdd_tobj = md2mdd_obj(tobj);
3008
3009         mdd_sobj = mdd_object_find(env, mdd, lf);
3010         if (IS_ERR(mdd_sobj))
3011                 RETURN(PTR_ERR(mdd_sobj));
3012
3013         rc = mdd_la_get(env, mdd_sobj, cattr);
3014         if (rc)
3015                 GOTO(out_pending, rc);
3016
3017         rc = mdd_la_get(env, mdd_spobj, pattr);
3018         if (rc)
3019                 GOTO(out_pending, rc);
3020
3021         if (mdd_tobj) {
3022                 rc = mdd_la_get(env, mdd_tobj, tattr);
3023                 if (rc)
3024                         GOTO(out_pending, rc);
3025                 /* search for an existing archive.
3026                  * we should check ahead as the object
3027                  * can be destroyed in this transaction */
3028                 if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
3029                         cl_flags |= CLF_RENAME_LAST_EXISTS;
3030         }
3031
3032         rc = mdd_la_get(env, mdd_tpobj, tpattr);
3033         if (rc)
3034                 GOTO(out_pending, rc);
3035
3036         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
3037                                      mdd_sobj, cattr, mdd_tobj, tattr);
3038         if (rc)
3039                 GOTO(out_pending, rc);
3040
3041         rc = mdd_name_check(mdd, ltname);
3042         if (rc < 0)
3043                 GOTO(out_pending, rc);
3044
3045         /* FIXME: Should consider tobj and sobj too in rename_lock. */
3046         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
3047         if (rc < 0)
3048                 GOTO(out_pending, rc);
3049
3050         handle = mdd_trans_create(env, mdd);
3051         if (IS_ERR(handle))
3052                 GOTO(out_pending, rc = PTR_ERR(handle));
3053
3054         memset(ldata, 0, sizeof(*ldata));
3055         rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
3056                                 lsname, mdd_object_fid(mdd_tpobj), ltname,
3057                                 1, 0, ldata);
3058         if (rc)
3059                 GOTO(stop, rc);
3060
3061         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
3062                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
3063         if (rc)
3064                 GOTO(stop, rc);
3065
3066         rc = mdd_trans_start(env, mdd, handle);
3067         if (rc)
3068                 GOTO(stop, rc);
3069
3070         is_dir = S_ISDIR(cattr->la_mode);
3071
3072         /* Remove source name from source directory */
3073         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
3074         if (rc != 0)
3075                 GOTO(stop, rc);
3076
3077         /* "mv dir1 dir2" needs "dir1/.." link update */
3078         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
3079                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3080                 if (rc != 0)
3081                         GOTO(fixup_spobj2, rc);
3082
3083                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
3084                                              dotdot, handle);
3085                 if (rc != 0)
3086                         GOTO(fixup_spobj, rc);
3087         }
3088
3089         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
3090                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3091                 if (rc != 0)
3092                         /* tname might been renamed to something else */
3093                         GOTO(fixup_spobj, rc);
3094         }
3095
3096         /* Insert new fid with target name into target dir */
3097         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
3098                                 tname, handle);
3099         if (rc != 0)
3100                 GOTO(fixup_tpobj, rc);
3101
3102         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3103         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
3104
3105         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
3106         la->la_valid = LA_CTIME;
3107         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
3108         if (rc)
3109                 GOTO(fixup_tpobj, rc);
3110
3111         /* Update the linkEA for the source object */
3112         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
3113         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
3114                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
3115                               0, 0);
3116         if (rc == -ENOENT)
3117                 /* Old files might not have EA entry */
3118                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
3119                               lsname, handle, NULL, 0);
3120         mdd_write_unlock(env, mdd_sobj);
3121         /* We don't fail the transaction if the link ea can't be
3122            updated -- fid2path will use alternate lookup method. */
3123         rc = 0;
3124
3125         /* Remove old target object
3126          * For tobj is remote case cmm layer has processed
3127          * and set tobj to NULL then. So when tobj is NOT NULL,
3128          * it must be local one.
3129          */
3130         if (tobj && mdd_object_exists(mdd_tobj)) {
3131                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
3132                 tobj_locked = 1;
3133                 if (mdd_is_dead_obj(mdd_tobj)) {
3134                         /* shld not be dead, something is wrong */
3135                         CERROR("tobj is dead, something is wrong\n");
3136                         rc = -EINVAL;
3137                         goto cleanup;
3138                 }
3139                 mdo_ref_del(env, mdd_tobj, handle);
3140
3141                 /* Remove dot reference. */
3142                 if (S_ISDIR(tattr->la_mode))
3143                         mdo_ref_del(env, mdd_tobj, handle);
3144                 tobj_ref = 1;
3145
3146                 /* fetch updated nlink */
3147                 rc = mdd_la_get(env, mdd_tobj, tattr);
3148                 if (rc != 0) {
3149                         CERROR("%s: Failed to get nlink for tobj "
3150                                 DFID": rc = %d\n",
3151                                 mdd2obd_dev(mdd)->obd_name,
3152                                 PFID(tpobj_fid), rc);
3153                         GOTO(fixup_tpobj, rc);
3154                 }
3155
3156                 la->la_valid = LA_CTIME;
3157                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
3158                 if (rc != 0) {
3159                         CERROR("%s: Failed to set ctime for tobj "
3160                                 DFID": rc = %d\n",
3161                                 mdd2obd_dev(mdd)->obd_name,
3162                                 PFID(tpobj_fid), rc);
3163                         GOTO(fixup_tpobj, rc);
3164                 }
3165
3166                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3167                 ma->ma_attr = *tattr;
3168                 ma->ma_valid |= MA_INODE;
3169                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
3170                                        handle);
3171                 if (rc != 0) {
3172                         CERROR("%s: Failed to unlink tobj "
3173                                 DFID": rc = %d\n",
3174                                 mdd2obd_dev(mdd)->obd_name,
3175                                 PFID(tpobj_fid), rc);
3176                         GOTO(fixup_tpobj, rc);
3177                 }
3178
3179                 /* fetch updated nlink */
3180                 rc = mdd_la_get(env, mdd_tobj, tattr);
3181                 if (rc == -ENOENT) {
3182                         /* the object got removed, let's
3183                          * return the latest known attributes */
3184                         tattr->la_nlink = 0;
3185                         rc = 0;
3186                 } else if (rc != 0) {
3187                         CERROR("%s: Failed to get nlink for tobj "
3188                                 DFID": rc = %d\n",
3189                                 mdd2obd_dev(mdd)->obd_name,
3190                                 PFID(tpobj_fid), rc);
3191                         GOTO(fixup_tpobj, rc);
3192                 }
3193                 /* XXX: this transfer to ma will be removed with LOD/OSP */
3194                 ma->ma_attr = *tattr;
3195                 ma->ma_valid |= MA_INODE;
3196
3197                 if (tattr->la_nlink == 0)
3198                         cl_flags |= CLF_RENAME_LAST;
3199                 else
3200                         cl_flags &= ~CLF_RENAME_LAST_EXISTS;
3201         }
3202
3203         la->la_valid = LA_CTIME | LA_MTIME;
3204         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
3205         if (rc)
3206                 GOTO(fixup_tpobj, rc);
3207
3208         if (mdd_spobj != mdd_tpobj) {
3209                 la->la_valid = LA_CTIME | LA_MTIME;
3210                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
3211                 if (rc != 0)
3212                         GOTO(fixup_tpobj, rc);
3213         }
3214
3215         EXIT;
3216
3217 fixup_tpobj:
3218         if (rc) {
3219                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
3220                 if (rc2)
3221                         CWARN("tp obj fix error %d\n",rc2);
3222
3223                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
3224                     !mdd_is_dead_obj(mdd_tobj)) {
3225                         if (tobj_ref) {
3226                                 mdo_ref_add(env, mdd_tobj, handle);
3227                                 if (is_dir)
3228                                         mdo_ref_add(env, mdd_tobj, handle);
3229                         }
3230
3231                         rc2 = __mdd_index_insert(env, mdd_tpobj,
3232                                                   mdo2fid(mdd_tobj),
3233                                                   mdd_object_type(mdd_tobj),
3234                                                   tname, handle);
3235                         if (rc2 != 0)
3236                                 CWARN("tp obj fix error: rc = %d\n", rc2);
3237                 }
3238         }
3239
3240 fixup_spobj:
3241         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
3242                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
3243                 if (rc2)
3244                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
3245                                mdd2obd_dev(mdd)->obd_name, rc2);
3246
3247
3248                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
3249                                               dotdot, handle);
3250                 if (rc2 != 0)
3251                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
3252                               mdd2obd_dev(mdd)->obd_name, rc2);
3253         }
3254
3255 fixup_spobj2:
3256         if (rc != 0) {
3257                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
3258                                          mdd_object_type(mdd_sobj), sname,
3259                                          handle);
3260                 if (rc2 != 0)
3261                         CWARN("sp obj fix error: rc = %d\n", rc2);
3262         }
3263
3264 cleanup:
3265         if (tobj_locked)
3266                 mdd_write_unlock(env, mdd_tobj);
3267
3268         if (rc == 0)
3269                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
3270                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
3271                                             ltname, lsname, handle);
3272
3273 stop:
3274         rc = mdd_trans_stop(env, mdd, rc, handle);
3275
3276 out_pending:
3277         mdd_object_put(env, mdd_sobj);
3278         return rc;
3279 }
3280
3281 /**
3282  * During migration once the parent FID has been changed,
3283  * we need update the parent FID in linkea.
3284  **/
3285 static int mdd_linkea_update_child_internal(const struct lu_env *env,
3286                                             struct mdd_object *parent,
3287                                             struct mdd_object *newparent,
3288                                             struct mdd_object *child,
3289                                             const char *name, int namelen,
3290                                             struct thandle *handle,
3291                                             bool declare)
3292 {
3293         struct mdd_thread_info  *info = mdd_env_info(env);
3294         struct linkea_data      ldata = { NULL };
3295         struct lu_buf           *buf = &info->mti_link_buf;
3296         int                     count;
3297         int                     rc = 0;
3298
3299         ENTRY;
3300
3301         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3302         if (buf->lb_buf == NULL)
3303                 RETURN(-ENOMEM);
3304
3305         ldata.ld_buf = buf;
3306         rc = mdd_links_read(env, child, &ldata);
3307         if (rc != 0) {
3308                 if (rc == -ENOENT || rc == -ENODATA)
3309                         rc = 0;
3310                 RETURN(rc);
3311         }
3312
3313         LASSERT(ldata.ld_leh != NULL);
3314         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3315         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3316                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3317                 struct lu_name lname;
3318                 struct lu_fid  fid;
3319
3320                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3321                                     &lname, &fid);
3322
3323                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3324                     !lu_fid_eq(&fid, mdd_object_fid(parent))) {
3325                         ldata.ld_lee = (struct link_ea_entry *)
3326                                        ((char *)ldata.ld_lee +
3327                                         ldata.ld_reclen);
3328                         continue;
3329                 }
3330
3331                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3332                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3333                        lname.ln_namelen, lname.ln_name,
3334                        PFID(mdd_object_fid(newparent)));
3335                 /* update to the new parent fid */
3336                 linkea_entry_pack(ldata.ld_lee, &lname,
3337                                   mdd_object_fid(newparent));
3338                 if (declare)
3339                         rc = mdd_declare_links_add(env, child, handle, &ldata);
3340                 else
3341                         rc = mdd_links_write(env, child, &ldata, handle);
3342                 break;
3343         }
3344         RETURN(rc);
3345 }
3346
3347 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3348                                            struct mdd_object *parent,
3349                                            struct mdd_object *newparent,
3350                                            struct mdd_object *child,
3351                                            const char *name, int namelen,
3352                                            struct thandle *handle)
3353 {
3354         return mdd_linkea_update_child_internal(env, parent, newparent,
3355                                                 child, name,
3356                                                 namelen, handle, true);
3357 }
3358
3359 static int mdd_linkea_update_child(const struct lu_env *env,
3360                                    struct mdd_object *parent,
3361                                    struct mdd_object *newparent,
3362                                    struct mdd_object *child,
3363                                    const char *name, int namelen,
3364                                    struct thandle *handle)
3365 {
3366         return mdd_linkea_update_child_internal(env, parent, newparent,
3367                                                 child, name,
3368                                                 namelen, handle, false);
3369 }
3370
3371 static int mdd_update_linkea_internal(const struct lu_env *env,
3372                                       struct mdd_object *mdd_pobj,
3373                                       struct mdd_object *mdd_sobj,
3374                                       struct mdd_object *mdd_tobj,
3375                                       const struct lu_name *child_name,
3376                                       struct linkea_data *ldata,
3377                                       struct thandle *handle,
3378                                       int declare)
3379 {
3380         struct mdd_thread_info  *info = mdd_env_info(env);
3381         int                     count;
3382         int                     rc = 0;
3383         ENTRY;
3384
3385         LASSERT(ldata->ld_buf != NULL);
3386         LASSERT(ldata->ld_leh != NULL);
3387
3388         /* If it is mulitple links file, we need update the name entry for
3389          * all parent */
3390         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3391         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3392                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3393                 struct mdd_object       *pobj;
3394                 struct lu_name          lname;
3395                 struct lu_fid           fid;
3396
3397                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3398                                     &lname, &fid);
3399                 pobj = mdd_object_find(env, mdd, &fid);
3400                 if (IS_ERR(pobj)) {
3401                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3402                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3403                               PTR_ERR(pobj));
3404                         continue;
3405                 }
3406
3407                 if (!mdd_object_exists(pobj)) {
3408                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3409                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3410                         goto next_put;
3411                 }
3412
3413                 if (pobj == mdd_pobj &&
3414                     lname.ln_namelen == child_name->ln_namelen &&
3415                     strncmp(lname.ln_name, child_name->ln_name,
3416                             lname.ln_namelen) == 0) {
3417                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3418                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3419                               PFID(&fid));
3420                         goto next_put;
3421                 }
3422
3423                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3424                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3425                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3426
3427                 if (declare) {
3428                         /* Remove source name from source directory */
3429                         /* Insert new fid with target name into target dir */
3430                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3431                                                       handle);
3432                         if (rc != 0)
3433                                 GOTO(next_put, rc);
3434
3435                         rc = mdo_declare_index_insert(env, pobj,
3436                                         mdd_object_fid(mdd_tobj),
3437                                         mdd_object_type(mdd_tobj),
3438                                         lname.ln_name, handle);
3439                         if (rc != 0)
3440                                 GOTO(next_put, rc);
3441
3442                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3443                         if (rc)
3444                                 GOTO(next_put, rc);
3445
3446                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3447                         if (rc)
3448                                 GOTO(next_put, rc);
3449                 } else {
3450                         char *tmp_name = info->mti_key;
3451
3452                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3453                                 /* lnamelen is too big(> NAME_MAX + 16),
3454                                  * something wrong about this linkea, let's
3455                                  * skip it */
3456                                 CWARN("%s: the name %.*s is too long under "
3457                                       DFID"\n", mdd2obd_dev(mdd)->obd_name,
3458                                       lname.ln_namelen, lname.ln_name,
3459                                       PFID(&fid));
3460                                 goto next_put;
3461                         }
3462
3463                         /* Note: lname might be without \0 at the end, see
3464                          * linkea_entry_unpack(), let's add extra \0 by
3465                          * snprintf */
3466                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3467                                  lname.ln_namelen, lname.ln_name);
3468                         lname.ln_name = tmp_name;
3469
3470                         /* Let's check if this linkEA still valid, before
3471                          * it might be packed into the RPC buffer. */
3472                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3473                                         &info->mti_fid, NULL);
3474                         if (rc < 0 || !lu_fid_eq(&info->mti_fid,
3475                                                  mdd_object_fid(mdd_sobj)))
3476                                 GOTO(next_put, rc == -ENOENT ? 0 : rc);
3477
3478                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3479                         if (rc != 0)
3480                                 GOTO(next_put, rc);
3481
3482                         rc = __mdd_index_insert(env, pobj,
3483                                         mdd_object_fid(mdd_tobj),
3484                                         mdd_object_type(mdd_tobj),
3485                                         tmp_name, handle);
3486                         if (rc != 0)
3487                                 GOTO(next_put, rc);
3488
3489                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3490                         rc = mdo_ref_add(env, mdd_tobj, handle);
3491                         mdd_write_unlock(env, mdd_tobj);
3492                         if (rc)
3493                                 GOTO(next_put, rc);
3494
3495                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3496                         mdo_ref_del(env, mdd_sobj, handle);
3497                         mdd_write_unlock(env, mdd_sobj);
3498                 }
3499 next_put:
3500                 mdd_object_put(env, pobj);
3501                 if (rc != 0)
3502                         break;
3503
3504                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3505                                                          ldata->ld_reclen);
3506         }
3507
3508         RETURN(rc);
3509 }
3510
3511 static int mdd_migrate_xattrs(const struct lu_env *env,
3512                               struct mdd_object *mdd_sobj,
3513                               struct mdd_object *mdd_tobj)
3514 {
3515         struct mdd_thread_info  *info = mdd_env_info(env);
3516         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3517         char                    *xname;
3518         struct thandle          *handle;
3519         struct lu_buf           xbuf;
3520         int                     xlen;
3521         int                     rem;
3522         int                     xsize;
3523         int                     list_xsize;
3524         struct lu_buf           list_xbuf;
3525         int                     rc;
3526
3527         /* retrieve xattr list from the old object */
3528         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3529         if (list_xsize == -ENODATA)
3530                 return 0;
3531
3532         if (list_xsize < 0)
3533                 return list_xsize;
3534
3535         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3536         if (info->mti_big_buf.lb_buf == NULL)
3537                 return -ENOMEM;
3538
3539         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3540         list_xbuf.lb_len = list_xsize;
3541         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3542         if (rc < 0)
3543                 return rc;
3544         rc = 0;
3545         rem = list_xsize;
3546         xname = list_xbuf.lb_buf;
3547         while (rem > 0) {
3548                 xlen = strnlen(xname, rem - 1) + 1;
3549                 if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
3550                     strcmp(XATTR_NAME_LMV, xname) == 0)
3551                         goto next;
3552
3553                 /* For directory, if there are default layout, migrate here */
3554                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3555                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3556                         goto next;
3557
3558                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3559                 if (xsize == -ENODATA)
3560                         goto next;
3561                 if (xsize < 0)
3562                         GOTO(out, rc);
3563
3564                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3565                 if (info->mti_link_buf.lb_buf == NULL)
3566                         GOTO(out, rc = -ENOMEM);
3567
3568                 xbuf.lb_len = xsize;
3569                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3570                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3571                 if (rc == -ENODATA)
3572                         goto next;
3573                 if (rc < 0)
3574                         GOTO(out, rc);
3575
3576                 handle = mdd_trans_create(env, mdd);
3577                 if (IS_ERR(handle))
3578                         GOTO(out, rc = PTR_ERR(handle));
3579
3580                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3581                                            handle);
3582                 if (rc != 0)
3583                         GOTO(stop_trans, rc);
3584                 /* Note: this transaction is part of migration, and it is not
3585                  * the last step of migration, so we set th_local = 1 to avoid
3586                  * update last rcvd for this transaction */
3587                 handle->th_local = 1;
3588                 rc = mdd_trans_start(env, mdd, handle);
3589                 if (rc != 0)
3590                         GOTO(stop_trans, rc);
3591
3592 again:
3593                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3594                 if (rc == -EEXIST)
3595                         GOTO(stop_trans, rc = 0);
3596
3597                 if (unlikely(rc == -ENOSPC &&
3598                              strcmp(xname, XATTR_NAME_LINK) == 0)) {
3599                         rc = linkea_overflow_shrink(
3600                                         (struct linkea_data *)(xbuf.lb_buf));
3601                         if (likely(rc > 0)) {
3602                                 xbuf.lb_len = rc;
3603                                 goto again;
3604                         }
3605                 }
3606
3607                 if (rc != 0)
3608                         GOTO(stop_trans, rc);
3609 stop_trans:
3610                 rc = mdd_trans_stop(env, mdd, rc, handle);
3611                 if (rc != 0)
3612                         GOTO(out, rc);
3613 next:
3614                 rem -= xlen;
3615                 memmove(xname, xname + xlen, rem);
3616         }
3617 out:
3618         return rc;
3619 }
3620
3621 static int mdd_declare_migrate_create(const struct lu_env *env,
3622                                       struct mdd_object *mdd_pobj,
3623                                       struct mdd_object *mdd_sobj,
3624                                       struct mdd_object *mdd_tobj,
3625                                       struct md_op_spec *spec,
3626                                       struct lu_attr *la,
3627                                       union lmv_mds_md *mgr_ea,
3628                                       struct linkea_data *ldata,
3629                                       struct thandle *handle)
3630 {
3631         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3632         const struct lu_buf     *buf;
3633         int                     rc;
3634         int                     mgr_easize;
3635
3636         rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
3637                                                 handle, spec, NULL);
3638         if (rc != 0)
3639                 return rc;
3640
3641         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3642                                            handle);
3643         if (rc != 0)
3644                 return rc;
3645
3646         if (S_ISLNK(la->la_mode)) {
3647                 const char *target_name = spec->u.sp_symname;
3648                 int sym_len = strlen(target_name);
3649                 const struct lu_buf *buf;
3650
3651                 buf = mdd_buf_get_const(env, target_name, sym_len);
3652                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3653                                              buf, 0, handle);
3654                 if (rc != 0)
3655                         return rc;
3656         } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
3657                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
3658                 if (rc != 0)
3659                         return rc;
3660         }
3661
3662         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3663                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3664                                         spec->u.sp_ea.eadatalen);
3665                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3666                                            0, handle);
3667                 if (rc)
3668                         return rc;
3669         }
3670
3671         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3672         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3673         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3674                                    0, handle);
3675         if (rc)
3676                 return rc;
3677
3678         la_flag->la_valid = LA_FLAGS;
3679         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3680         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3681
3682         return rc;
3683 }
3684
3685 static int mdd_migrate_create(const struct lu_env *env,
3686                               struct mdd_object *mdd_pobj,
3687                               struct mdd_object *mdd_sobj,
3688                               struct mdd_object *mdd_tobj,
3689                               const struct lu_name *lname,
3690                               struct lu_attr *la)
3691 {
3692         struct mdd_thread_info  *info = mdd_env_info(env);
3693         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3694         struct md_op_spec       *spec = &info->mti_spec;
3695         struct lu_buf           lmm_buf = { NULL };
3696         struct lu_buf           link_buf = { NULL };
3697         struct lu_buf            mgr_buf;
3698         struct thandle          *handle;
3699         struct lmv_mds_md_v1    *mgr_ea;
3700         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3701         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3702         int                     mgr_easize;
3703         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3704         int                     rc;
3705         ENTRY;
3706
3707         /* prepare spec for create */
3708         memset(spec, 0, sizeof(*spec));
3709         spec->sp_cr_lookup = 0;
3710         spec->sp_feat = &dt_directory_features;
3711         if (S_ISLNK(la->la_mode)) {
3712                 const struct lu_buf *buf;
3713
3714                 buf = lu_buf_check_and_alloc(
3715                                 &mdd_env_info(env)->mti_big_buf,
3716                                 la->la_size + 1);
3717                 link_buf = *buf;
3718                 link_buf.lb_len = la->la_size + 1;
3719                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3720                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3721                 if (rc <= 0) {
3722                         rc = rc != 0 ? rc : -EFAULT;
3723                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3724                                mdd2obd_dev(mdd)->obd_name,
3725                                PFID(mdd_object_fid(mdd_sobj)), rc);
3726                         RETURN(rc);
3727                 }
3728                 spec->u.sp_symname = link_buf.lb_buf;
3729         } else if (S_ISREG(la->la_mode)) {
3730                 /* retrieve lov of the old object */
3731                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3732                 if (rc != 0 && rc != -ENODATA)
3733                         RETURN(rc);
3734                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3735                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3736                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3737                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3738                 }
3739         } else if (S_ISDIR(la->la_mode)) {
3740                 rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
3741                 if (rc == -ENODATA) {
3742                         /* ignore the non-linkEA error */
3743                         ldata = NULL;
3744                         rc = 0;
3745                 }
3746                 if (rc < 0)
3747                         RETURN(rc);
3748         }
3749
3750         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3751         lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
3752         mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
3753         mgr_buf.lb_len = mgr_easize;
3754         mgr_ea = mgr_buf.lb_buf;
3755         memset(mgr_ea, 0, sizeof(*mgr_ea));
3756         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3757         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3758         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3759         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3760         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3761         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3762
3763         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3764
3765         handle = mdd_trans_create(env, mdd);
3766         if (IS_ERR(handle))
3767                 GOTO(out_free, rc = PTR_ERR(handle));
3768
3769         /* Note: this transaction is part of migration, and it is not
3770          * the last step of migration, so we set th_local = 1 to avoid
3771          * update last rcvd for this transaction */
3772         handle->th_local = 1;
3773         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
3774                                         la, mgr_buf.lb_buf, ldata, handle);
3775         if (rc != 0)
3776                 GOTO(stop_trans, rc);
3777
3778         rc = mdd_trans_start(env, mdd, handle);
3779         if (rc != 0)
3780                 GOTO(stop_trans, rc);
3781
3782         /* don't set nlink from the original object */
3783         la->la_valid &= ~LA_NLINK;
3784
3785         /* create the target object */
3786         rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3787                                hint, handle);
3788         if (rc != 0)
3789                 GOTO(stop_trans, rc);
3790
3791         if (S_ISDIR(la->la_mode) && ldata != NULL) {
3792                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3793                 if (rc != 0)
3794                         GOTO(stop_trans, rc);
3795         }
3796
3797         /* Set MIGRATE EA on the source inode, so once the migration needs
3798          * to be re-done during failover, the re-do process can locate the
3799          * target object which is already being created. */
3800         rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
3801         if (rc != 0)
3802                 GOTO(stop_trans, rc);
3803
3804         /* Set immutable flag, so any modification is disabled until
3805          * the migration is done. Once the migration is interrupted,
3806          * if the resume process find the migrating object has both
3807          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3808          * flag and approve the migration */
3809         la_flag->la_valid = LA_FLAGS;
3810         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3811         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3812 stop_trans:
3813         if (handle != NULL)
3814                 rc = mdd_trans_stop(env, mdd, rc, handle);
3815 out_free:
3816         if (lmm_buf.lb_buf != NULL)
3817                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3818         RETURN(rc);
3819 }
3820
3821 static int mdd_migrate_entries(const struct lu_env *env,
3822                                struct mdd_object *mdd_sobj,
3823                                struct mdd_object *mdd_tobj)
3824 {
3825         struct dt_object        *next = mdd_object_child(mdd_sobj);
3826         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3827         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3828         struct thandle          *handle;
3829         struct dt_it            *it;
3830         const struct dt_it_ops  *iops;
3831         int                      result;
3832         struct lu_dirent        *ent;
3833         int                      rc;
3834         ENTRY;
3835
3836         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3837         if (ent == NULL)
3838                 RETURN(-ENOMEM);
3839
3840         if (!dt_try_as_dir(env, next))
3841                 GOTO(out_ent, rc = -ENOTDIR);
3842         /*
3843          * iterate directories
3844          */
3845         iops = &next->do_index_ops->dio_it;
3846         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3847         if (IS_ERR(it))
3848                 GOTO(out_ent, rc = PTR_ERR(it));
3849
3850         rc = iops->load(env, it, 0);
3851         if (rc == 0)
3852                 rc = iops->next(env, it);
3853         else if (rc > 0)
3854                 rc = 0;
3855         /*
3856          * At this point and across for-loop:
3857          *
3858          *  rc == 0 -> ok, proceed.
3859          *  rc >  0 -> end of directory.
3860          *  rc <  0 -> error.
3861          */
3862         do {
3863                 struct mdd_object       *child;
3864                 char                    *name = mdd_env_info(env)->mti_key;
3865                 int                     len;
3866                 int                     is_dir;
3867                 bool                    target_exist = false;
3868
3869                 len = iops->key_size(env, it);
3870                 if (len == 0)
3871                         goto next;
3872
3873                 result = iops->rec(env, it, (struct dt_rec *)ent,
3874                                    LUDA_FID | LUDA_TYPE);
3875                 if (result == -ESTALE)
3876                         goto next;
3877                 if (result != 0) {
3878                         rc = result;
3879                         goto out;
3880                 }
3881
3882                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3883
3884                 /* Insert new fid with target name into target dir */
3885                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3886                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3887                      ent->lde_name[1] == '.'))
3888                         goto next;
3889
3890                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3891                 if (IS_ERR(child))
3892                         GOTO(out, rc = PTR_ERR(child));
3893
3894                 /* child may not exist, but lu_object_attr will assert this,
3895                  * get type from loh_attr directly */
3896                 is_dir = S_ISDIR(child->mod_obj.mo_lu.lo_header->loh_attr);
3897
3898                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3899
3900                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3901
3902                 /* Check whether the name has been inserted to the target */
3903                 if (dt_try_as_dir(env, dt_tobj)) {
3904                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3905
3906                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3907                                        (struct dt_key *)name);
3908                         if (unlikely(rc == 0))
3909                                 target_exist = true;
3910                 }
3911
3912                 handle = mdd_trans_create(env, mdd);
3913                 if (IS_ERR(handle))
3914                         GOTO(out_put, rc = PTR_ERR(handle));
3915
3916                 /* Note: this transaction is part of migration, and it is not
3917                  * the last step of migration, so we set th_local = 1 to avoid
3918                  * updating last rcvd for this transaction */
3919                 handle->th_local = 1;
3920                 if (likely(!target_exist)) {
3921                         rc = mdo_declare_index_insert(env, mdd_tobj,
3922                                 &ent->lde_fid,
3923                                 child->mod_obj.mo_lu.lo_header->loh_attr,
3924                                 name, handle);
3925                         if (rc != 0)
3926                                 GOTO(out_put, rc);
3927
3928                         if (is_dir) {
3929                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3930                                 if (rc != 0)
3931                                         GOTO(out_put, rc);
3932                         }
3933                 }
3934
3935                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3936                 if (rc != 0)
3937                         GOTO(out_put, rc);
3938
3939                 if (is_dir) {
3940                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3941                         if (rc != 0)
3942                                 GOTO(out_put, rc);
3943
3944                         /* Update .. for child */
3945                         rc = mdo_declare_index_delete(env, child, dotdot,
3946                                                       handle);
3947                         if (rc != 0)
3948                                 GOTO(out_put, rc);
3949
3950                         rc = mdo_declare_index_insert(env, child,
3951                                                       mdd_object_fid(mdd_tobj),
3952                                                       S_IFDIR, dotdot, handle);
3953                         if (rc != 0)
3954                                 GOTO(out_put, rc);
3955                 }
3956
3957                 rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
3958                                                      child, name,
3959                                                      strlen(name),
3960                                                      handle);
3961                 if (rc != 0)
3962                         GOTO(out_put, rc);
3963
3964                 rc = mdd_trans_start(env, mdd, handle);
3965                 if (rc != 0) {
3966                         CERROR("%s: transaction start failed: rc = %d\n",
3967                                mdd2obd_dev(mdd)->obd_name, rc);
3968                         GOTO(out_put, rc);
3969                 }
3970
3971                 if (likely(!target_exist)) {
3972                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3973                                 child->mod_obj.mo_lu.lo_header->loh_attr, name,
3974                                 handle);
3975                         if (rc != 0)
3976                                 GOTO(out_put, rc);
3977                 }
3978
3979                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3980                 if (rc != 0)
3981                         GOTO(out_put, rc);
3982
3983                 if (is_dir) {
3984                         rc = __mdd_index_delete_only(env, child, dotdot,
3985                                                      handle);
3986                         if (rc != 0)
3987                                 GOTO(out_put, rc);
3988
3989                         rc = __mdd_index_insert_only(env, child,
3990                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3991                                          dotdot, handle);
3992                         if (rc != 0)
3993                                 GOTO(out_put, rc);
3994                 }
3995
3996                 rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
3997                                              child, name,
3998                                              strlen(name), handle);
3999
4000 out_put:
4001                 mdd_write_unlock(env, child);
4002                 mdd_object_put(env, child);
4003                 rc = mdd_trans_stop(env, mdd, rc, handle);
4004                 if (rc != 0)
4005                         GOTO(out, rc);
4006 next:
4007                 result = iops->next(env, it);
4008                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
4009                         GOTO(out, rc = -EINTR);
4010
4011                 if (result == -ESTALE)
4012                         goto next;
4013         } while (result == 0);
4014 out:
4015         iops->put(env, it);
4016         iops->fini(env, it);
4017 out_ent:
4018         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
4019         RETURN(rc);
4020 }
4021
4022 static int mdd_declare_update_linkea(const struct lu_env *env,
4023                                      struct mdd_object *mdd_pobj,
4024                                      struct mdd_object *mdd_sobj,
4025                                      struct mdd_object *mdd_tobj,
4026                                      const struct lu_name *child_name,
4027                                      struct linkea_data *ldata,
4028                                      struct thandle *handle)
4029 {
4030         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
4031                                           child_name, ldata, handle, 1);
4032 }
4033
4034 static int mdd_update_linkea(const struct lu_env *env,
4035                              struct mdd_object *mdd_pobj,
4036                              struct mdd_object *mdd_sobj,
4037                              struct mdd_object *mdd_tobj,
4038                              const struct lu_name *child_name,
4039                              struct linkea_data *ldata,
4040                              struct thandle *handle)
4041 {
4042         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
4043                                           child_name, ldata, handle, 0);
4044 }
4045
4046 static int mdd_declare_migrate_update_name(const struct lu_env *env,
4047                                            struct mdd_object *mdd_pobj,
4048                                            struct mdd_object *mdd_sobj,
4049                                            struct mdd_object *mdd_tobj,
4050                                            const struct lu_name *lname,
4051                                            struct lu_attr *la,
4052                                            struct lu_attr *parent_la,
4053                                            struct linkea_data *ldata,
4054                                            struct thandle *handle)
4055 {
4056         struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
4057         struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
4058         int rc;
4059
4060         /* Revert IMMUTABLE flag */
4061         la_flag->la_valid = LA_FLAGS;
4062         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
4063         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
4064         if (rc != 0)
4065                 return rc;
4066
4067         /* delete entry from source dir */
4068         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
4069         if (rc != 0)
4070                 return rc;
4071
4072         if (ldata->ld_buf != NULL) {
4073                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
4074                                                mdd_tobj, lname, ldata, handle);
4075                 if (rc != 0)
4076                         return rc;
4077         }
4078
4079         if (S_ISREG(mdd_object_type(mdd_sobj))) {
4080                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
4081                                            handle);
4082                 if (rc != 0)
4083                         return rc;
4084
4085                 handle->th_complex = 1;
4086                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
4087                                            XATTR_NAME_FID,
4088                                            LU_XATTR_REPLACE, handle);
4089                 if (rc < 0)
4090                         return rc;
4091         }
4092
4093         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4094                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
4095                 if (rc != 0)
4096                         return rc;
4097         }
4098
4099         /* new name */
4100         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
4101                                       mdd_object_type(mdd_tobj),
4102                                       lname->ln_name, handle);
4103         if (rc != 0)
4104                 return rc;
4105
4106         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
4107         if (rc != 0)
4108                 return rc;
4109
4110         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4111                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
4112                 if (rc != 0)
4113                         return rc;
4114         }
4115
4116         /* delete old object */
4117         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
4118         if (rc != 0)
4119                 return rc;
4120
4121         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
4122                 /* delete old object */
4123                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
4124                 if (rc != 0)
4125                         return rc;
4126                 /* set nlink to 0 */
4127                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
4128                 if (rc != 0)
4129                         return rc;
4130         }
4131
4132         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
4133         if (rc)
4134                 return rc;
4135
4136         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
4137         if (rc != 0)
4138                 return rc;
4139
4140         rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
4141
4142         return rc;
4143 }
4144
4145 static int mdd_migrate_update_name(const struct lu_env *env,
4146                                    struct mdd_object *mdd_pobj,
4147                                    struct mdd_object *mdd_sobj,
4148                                    struct mdd_object *mdd_tobj,
4149                                    const struct lu_name *lname,
4150                                    struct md_attr *ma)
4151 {
4152         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
4153         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4154         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
4155         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
4156         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
4157         struct thandle          *handle;
4158         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
4159         const char              *name = lname->ln_name;
4160         int                     rc;
4161         ENTRY;
4162
4163         /* update time for parent */
4164         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
4165         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
4166         p_la->la_valid = LA_CTIME;
4167
4168         rc = mdd_la_get(env, mdd_sobj, so_attr);
4169         if (rc != 0)
4170                 RETURN(rc);
4171
4172         ldata->ld_buf = NULL;
4173         rc = mdd_links_read(env, mdd_sobj, ldata);
4174         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
4175                 RETURN(rc);
4176
4177         handle = mdd_trans_create(env, mdd);
4178         if (IS_ERR(handle))
4179                 RETURN(PTR_ERR(handle));
4180
4181         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
4182                                              lname, so_attr, p_la, ldata,
4183                                              handle);
4184         if (rc != 0) {
4185                 /* If the migration can not be fit in one transaction, just
4186                  * leave it in the original MDT */
4187                 if (rc == -E2BIG)
4188                         GOTO(stop_trans, rc = 0);
4189                 else
4190                         GOTO(stop_trans, rc);
4191         }
4192
4193         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
4194                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
4195                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4196                PFID(mdd_object_fid(mdd_tobj)));
4197
4198         rc = mdd_trans_start(env, mdd, handle);
4199         if (rc != 0)
4200                 GOTO(stop_trans, rc);
4201
4202         /* Revert IMMUTABLE flag */
4203         la_flag->la_valid = LA_FLAGS;
4204         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
4205         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
4206         if (rc != 0)
4207                 GOTO(stop_trans, rc);
4208
4209         /* Remove source name from source directory */
4210         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
4211         if (rc != 0)
4212                 GOTO(stop_trans, rc);
4213
4214         if (ldata->ld_buf != NULL) {
4215                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
4216                                        lname, ldata, handle);
4217                 if (rc != 0)
4218                         GOTO(stop_trans, rc);
4219
4220                 /*  linkea update might decrease the source object
4221                  *  nlink, let's get the attr again after ref_del */
4222                 rc = mdd_la_get(env, mdd_sobj, so_attr);
4223                 if (rc != 0)
4224                         GOTO(stop_trans, rc);
4225         }
4226
4227         if (S_ISREG(so_attr->la_mode)) {
4228                 if (so_attr->la_nlink == 1) {
4229                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
4230                                            handle);
4231                         if (rc != 0 && rc != -ENODATA)
4232                                 GOTO(stop_trans, rc);
4233
4234                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
4235                                            XATTR_NAME_FID,
4236                                            LU_XATTR_REPLACE, handle);
4237                         if (rc < 0)
4238                                 GOTO(stop_trans, rc);
4239                 }
4240         }
4241
4242         /* Insert new fid with target name into target dir */
4243         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
4244                                 mdd_object_type(mdd_tobj), name, handle);
4245         if (rc != 0)
4246                 GOTO(stop_trans, rc);
4247
4248         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
4249
4250         mdd_sobj->mod_flags |= DEAD_OBJ;
4251         rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
4252         if (rc != 0)
4253                 GOTO(out_unlock, rc);
4254
4255         rc = __mdd_orphan_add(env, mdd_sobj, handle);
4256         if (rc != 0)
4257                 GOTO(out_unlock, rc);
4258
4259         mdo_ref_del(env, mdd_sobj, handle);
4260         if (is_dir)
4261                 mdo_ref_del(env, mdd_sobj, handle);
4262
4263         /* Get the attr again after ref_del */
4264         rc = mdd_la_get(env, mdd_sobj, so_attr);
4265         if (rc != 0)
4266                 GOTO(out_unlock, rc);
4267
4268         ma->ma_attr = *so_attr;
4269         ma->ma_valid |= MA_INODE;
4270
4271         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
4272         if (rc != 0)
4273                 GOTO(out_unlock, rc);
4274
4275         rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
4276                                mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
4277                                mdo2fid(mdd_pobj), lname, lname, handle);
4278         if (rc != 0) {
4279                 CWARN("%s: changelog for migrate %s "DFID
4280                       "under "DFID" failed: rc = %d\n",
4281                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
4282                       PFID(mdd_object_fid(mdd_sobj)),
4283                       PFID(mdd_object_fid(mdd_pobj)), rc);
4284                 /* Sigh, there are no easy way to migrate back the object, so
4285                  * let's reset the result to 0 for now XXX */
4286                 rc = 0;
4287         }
4288 out_unlock:
4289         mdd_write_unlock(env, mdd_sobj);
4290
4291 stop_trans:
4292         rc = mdd_trans_stop(env, mdd, rc, handle);
4293
4294         RETURN(rc);
4295 }
4296
4297 static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
4298                           const struct lu_fid *fid, __u32 *mdt_index)
4299 {
4300         struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
4301         struct seq_server_site *ss;
4302         int rc;
4303
4304         ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
4305
4306         range->lsr_flags = LU_SEQ_RANGE_MDT;
4307         rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
4308         if (rc != 0)
4309                 return rc;
4310
4311         *mdt_index = range->lsr_index;
4312
4313         return 0;
4314 }
4315 /**
4316  * Check whether we should migrate the file/dir
4317  * return val
4318  *      < 0  permission check failed or other error.
4319  *      = 0  the file can be migrated.
4320  *      > 0  the file does not need to be migrated, mostly
4321  *           for multiple link file
4322  **/
4323 static int mdd_migrate_sanity_check(const struct lu_env *env,
4324                                     struct mdd_object *pobj,
4325                                     const struct lu_attr *pattr,
4326                                     struct mdd_object *sobj,
4327                                     struct lu_attr *sattr)
4328 {
4329         struct mdd_thread_info  *info = mdd_env_info(env);
4330         struct linkea_data      *ldata = &info->mti_link_data;
4331         struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
4332         int                     mgr_easize;
4333         struct lu_buf           *mgr_buf;
4334         int                     count;
4335         int                     rc;
4336         __u64 mdt_index;
4337         ENTRY;
4338
4339         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4340         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4341         if (mgr_buf->lb_buf == NULL)
4342                 RETURN(-ENOMEM);
4343
4344         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4345         if (rc > 0) {
4346                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4347
4348                 /* If the object has migrateEA, it means IMMUTE flag
4349                  * is being set by previous migration process, so it
4350                  * needs to override the IMMUTE flag, otherwise the
4351                  * following sanity check will fail */
4352                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4353                                                 LMV_HASH_FLAG_MIGRATION) {
4354                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4355
4356                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4357                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4358                                mdd2obd_dev(mdd)->obd_name,
4359                                PFID(mdd_object_fid(sobj)));
4360                 }
4361         }
4362
4363         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4364                                      sobj, sattr, NULL, NULL);
4365         if (rc != 0)
4366                 RETURN(rc);
4367
4368         /* Then it will check if the file should be migrated. If the file
4369          * has mulitple links, we only need migrate the file if all of its
4370          * entries has been migrated to the remote MDT */
4371         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4372                 RETURN(0);
4373
4374         rc = mdd_links_read(env, sobj, ldata);
4375         if (rc != 0) {
4376                 /* For multiple links files, if there are no linkEA data at all,
4377                  * means the file might be created before linkEA is enabled, and
4378                  * all of its links should not be migrated yet, otherwise it
4379                  * should have some linkEA there */
4380                 if (rc == -ENOENT || rc == -ENODATA)
4381                         RETURN(1);
4382                 RETURN(rc);
4383         }
4384
4385         mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
4386         /* If there are still links locally, then the file will not be
4387          * migrated. */
4388         LASSERT(ldata->ld_leh != NULL);
4389
4390         /* If the linkEA is overflow, then means there are some unknown name
4391          * entries under unknown parents, that will prevent the migration. */
4392         if (unlikely(ldata->ld_leh->leh_overflow_time))
4393                 RETURN(1);
4394
4395         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4396         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4397                 struct lu_name          lname;
4398                 struct lu_fid           fid;
4399                 __u32                   parent_mdt_index;
4400
4401                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4402                                     &lname, &fid);
4403                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4404                                                          ldata->ld_reclen);
4405
4406                 rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
4407                 if (rc != 0)
4408                         RETURN(rc);
4409
4410                 /* Migrate the object only if none of its parents are on the
4411                  * current MDT. */
4412                 if (parent_mdt_index != mdt_index)
4413                         continue;
4414
4415                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4416                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4417                        lname.ln_name, PFID(&fid));
4418                 rc = 1;
4419                 break;
4420         }
4421
4422         RETURN(rc);
4423 }
4424
4425 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4426                        struct md_object *sobj, const struct lu_name *lname,
4427                        struct md_object *tobj, struct md_attr *ma)
4428 {
4429         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4430         struct mdd_device       *mdd = mdo2mdd(pobj);
4431         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4432         struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
4433         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4434         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4435         bool                    created = false;
4436         int                     rc;
4437
4438         ENTRY;
4439         /* If the file will being migrated, it will check whether
4440          * the file is being opened by someone else right now */
4441         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4442         if (mdd_sobj->mod_count > 0) {
4443                 CDEBUG(D_OTHER,
4444                        "%s: "DFID"%s is already opened count %d: rc = %d\n",
4445                        mdd2obd_dev(mdd)->obd_name,
4446                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4447                        mdd_sobj->mod_count, -EBUSY);
4448                 mdd_read_unlock(env, mdd_sobj);
4449                 GOTO(put, rc = -EBUSY);
4450         }
4451         mdd_read_unlock(env, mdd_sobj);
4452
4453         rc = mdd_la_get(env, mdd_sobj, so_attr);
4454         if (rc != 0)
4455                 GOTO(put, rc);
4456
4457         rc = mdd_la_get(env, mdd_pobj, pattr);
4458         if (rc != 0)
4459                 GOTO(put, rc);
4460
4461         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4462         if (rc != 0) {
4463                 if (rc > 0)
4464                         rc = 0;
4465                 GOTO(put, rc);
4466         }
4467
4468         /* Sigh, it is impossible to finish all of migration in a single
4469          * transaction, for example migrating big directory entries to the
4470          * new MDT, it needs insert all of name entries of children in the
4471          * new directory.
4472          *
4473          * So migration will be done in multiple steps and transactions.
4474          *
4475          * 1. create an orphan object on the remote MDT in one transaction.
4476          * 2. migrate extend attributes to the new target file/directory.
4477          * 3. For directory, migrate the entries to the new MDT and update
4478          * linkEA of each children. Because we can not migrate all entries
4479          * in a single transaction, so the migrating directory will become
4480          * a striped directory during migration, so once the process is
4481          * interrupted, the directory is still accessible. (During lookup,
4482          * client will locate the name by searching both original and target
4483          * object).
4484          * 4. Finally, update the name/FID to point to the new file/directory
4485          * in a separate transaction.
4486          */
4487
4488         /* step 1: Check whether the orphan object has been created, and create
4489          * orphan object on the remote MDT if needed */
4490         if (!mdd_object_exists(mdd_tobj)) {
4491                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4492                                         lname, so_attr);
4493                 if (rc != 0)
4494                         GOTO(put, rc);
4495                 created = true;
4496         }
4497
4498         LASSERT(mdd_object_exists(mdd_tobj));
4499         /* step 2: migrate xattr */
4500         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4501         if (rc != 0)
4502                 GOTO(put, rc);
4503
4504         /* step 3: migrate name entries to the orphan object */
4505         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4506                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4507                 if (rc != 0)
4508                         GOTO(put, rc);
4509                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4510                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4511                         GOTO(put, rc = 0);
4512         } else {
4513                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4514         }
4515
4516         LASSERT(mdd_object_exists(mdd_tobj));
4517         /* step 4: update name entry to the new object */
4518         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4519                                      ma);
4520         if (rc != 0)
4521                 GOTO(put, rc);
4522
4523         /* newly created target was not locked, don't cache its attributes */
4524         if (created)
4525                 mdd_invalidate(env, tobj);
4526 put:
4527         RETURN(rc);
4528 }
4529
4530 const struct md_dir_operations mdd_dir_ops = {
4531         .mdo_is_subdir     = mdd_is_subdir,
4532         .mdo_lookup        = mdd_lookup,
4533         .mdo_create        = mdd_create,
4534         .mdo_rename        = mdd_rename,
4535         .mdo_link          = mdd_link,
4536         .mdo_unlink        = mdd_unlink,
4537         .mdo_create_data   = mdd_create_data,
4538         .mdo_migrate       = mdd_migrate,
4539 };