Whamcloud - gitweb
LU-7074 mdd: validate the linkea before packing
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static inline int
61 mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
62 {
63         if (!lu_name_is_valid(ln))
64                 return -EINVAL;
65         else if (ln->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)
66                 return -ENAMETOOLONG;
67         else
68                 return 0;
69 }
70
71 /* Get FID from name and parent */
72 static int
73 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
74              const struct lu_attr *pattr, const struct lu_name *lname,
75              struct lu_fid* fid, int mask)
76 {
77         const char *name                = lname->ln_name;
78         const struct dt_key *key        = (const struct dt_key *)name;
79         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
80         struct mdd_device *m            = mdo2mdd(pobj);
81         struct dt_object *dir           = mdd_object_child(mdd_obj);
82         int rc;
83         ENTRY;
84
85         if (unlikely(mdd_is_dead_obj(mdd_obj)))
86                 RETURN(-ESTALE);
87
88         if (mdd_object_remote(mdd_obj)) {
89                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
90                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
91         } else if (!mdd_object_exists(mdd_obj)) {
92                 RETURN(-ESTALE);
93         }
94
95         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
96                                             MOR_TGT_PARENT);
97         if (rc)
98                 RETURN(rc);
99
100         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
101                    dt_try_as_dir(env, dir)))
102                 rc = dt_lookup(env, dir, (struct dt_rec *)fid, key);
103         else
104                 rc = -ENOTDIR;
105
106         RETURN(rc);
107 }
108
109 int mdd_lookup(const struct lu_env *env,
110                struct md_object *pobj, const struct lu_name *lname,
111                struct lu_fid *fid, struct md_op_spec *spec)
112 {
113         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
114         int rc;
115         ENTRY;
116
117         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr);
118         if (rc != 0)
119                 RETURN(rc);
120
121         rc = __mdd_lookup(env, pobj, pattr, lname, fid,
122                           (spec != NULL && spec->sp_permitted) ? 0 : MAY_EXEC);
123         RETURN(rc);
124 }
125
126 /**
127  * Get parent FID of the directory
128  *
129  * Read parent FID from linkEA, if that fails, then do lookup
130  * dotdot to get the parent FID.
131  *
132  * \param[in] env       execution environment
133  * \param[in] obj       object from which to find the parent FID
134  * \param[in] attr      attribute of the object
135  * \param[out] fid      fid to get the parent FID
136  *
137  * \retval              0 if getting the parent FID succeeds.
138  * \retval              negative errno if getting the parent FID fails.
139  **/
140 static inline int mdd_parent_fid(const struct lu_env *env,
141                                  struct mdd_object *obj,
142                                  const struct lu_attr *attr,
143                                  struct lu_fid *fid)
144 {
145         struct mdd_thread_info  *info = mdd_env_info(env);
146         struct linkea_data      ldata = { NULL };
147         struct lu_buf           *buf = &info->mti_link_buf;
148         struct lu_name          lname;
149         int                     rc = 0;
150
151         ENTRY;
152
153         LASSERT(S_ISDIR(mdd_object_type(obj)));
154
155         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
156         if (buf->lb_buf == NULL)
157                 GOTO(lookup, rc = 0);
158
159         ldata.ld_buf = buf;
160         rc = mdd_links_read(env, obj, &ldata);
161         if (rc != 0)
162                 GOTO(lookup, rc);
163
164         LASSERT(ldata.ld_leh != NULL);
165         /* Directory should only have 1 parent */
166         if (ldata.ld_leh->leh_reccount > 1)
167                 GOTO(lookup, rc);
168
169         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
170
171         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &lname, fid);
172         if (likely(fid_is_sane(fid)))
173                 RETURN(0);
174 lookup:
175         rc =  __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
176         RETURN(rc);
177 }
178
179 /*
180  * For root fid use special function, which does not compare version component
181  * of fid. Version component is different for root fids on all MDTs.
182  */
183 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
184 {
185         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
186                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
187 }
188
189 /*
190  * return 1: if lf is the fid of the ancestor of p1;
191  * return 0: if not;
192  *
193  * return -EREMOTE: if remote object is found, in this
194  * case fid of remote object is saved to @pf;
195  *
196  * otherwise: values < 0, errors.
197  */
198 static int mdd_is_parent(const struct lu_env *env,
199                         struct mdd_device *mdd,
200                         struct mdd_object *p1,
201                         const struct lu_attr *attr,
202                         const struct lu_fid *lf,
203                         struct lu_fid *pf)
204 {
205         struct mdd_object *parent = NULL;
206         struct lu_fid *pfid;
207         int rc;
208         ENTRY;
209
210         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
211         pfid = &mdd_env_info(env)->mti_fid;
212
213         /* Check for root first. */
214         if (mdd_is_root(mdd, mdo2fid(p1)))
215                 RETURN(0);
216
217         for(;;) {
218                 /* this is done recursively */
219                 rc = mdd_parent_fid(env, p1, attr, pfid);
220                 if (rc)
221                         GOTO(out, rc);
222                 if (mdd_is_root(mdd, pfid))
223                         GOTO(out, rc = 0);
224                 if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
225                         GOTO(out, rc = 0);
226                 if (lu_fid_eq(pfid, lf))
227                         GOTO(out, rc = 1);
228                 if (parent != NULL)
229                         mdd_object_put(env, parent);
230
231                 parent = mdd_object_find(env, mdd, pfid);
232                 if (IS_ERR(parent))
233                         GOTO(out, rc = PTR_ERR(parent));
234                 p1 = parent;
235         }
236         EXIT;
237 out:
238         if (parent && !IS_ERR(parent))
239                 mdd_object_put(env, parent);
240         return rc;
241 }
242
243 /*
244  * No permission check is needed.
245  *
246  * returns 1: if fid is ancestor of @mo;
247  * returns 0: if fid is not an ancestor of @mo;
248  *
249  * returns EREMOTE if remote object is found, fid of remote object is saved to
250  * @fid;
251  *
252  * returns < 0: if error
253  */
254 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
255                   const struct lu_fid *fid, struct lu_fid *sfid)
256 {
257         struct mdd_device *mdd = mdo2mdd(mo);
258         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
259         int rc;
260         ENTRY;
261
262         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
263                 RETURN(0);
264
265         rc = mdd_la_get(env, md2mdd_obj(mo), attr);
266         if (rc != 0)
267                 RETURN(rc);
268
269         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
270         if (rc == 0) {
271                 /* found root */
272                 fid_zero(sfid);
273         } else if (rc == 1) {
274                 /* found @fid is parent */
275                 *sfid = *fid;
276                 rc = 0;
277         }
278         RETURN(rc);
279 }
280
281 /*
282  * Check that @dir contains no entries except (possibly) dot and dotdot.
283  *
284  * Returns:
285  *
286  *             0        empty
287  *      -ENOTDIR        not a directory object
288  *    -ENOTEMPTY        not empty
289  *           -ve        other error
290  *
291  */
292 static int mdd_dir_is_empty(const struct lu_env *env,
293                             struct mdd_object *dir)
294 {
295         struct dt_it     *it;
296         struct dt_object *obj;
297         const struct dt_it_ops *iops;
298         int result;
299         ENTRY;
300
301         obj = mdd_object_child(dir);
302         if (!dt_try_as_dir(env, obj))
303                 RETURN(-ENOTDIR);
304
305         iops = &obj->do_index_ops->dio_it;
306         it = iops->init(env, obj, LUDA_64BITHASH);
307         if (!IS_ERR(it)) {
308                 result = iops->get(env, it, (const struct dt_key *)"");
309                 if (result > 0) {
310                         int i;
311                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
312                                 result = iops->next(env, it);
313                         if (result == 0)
314                                 result = -ENOTEMPTY;
315                         else if (result == 1)
316                                 result = 0;
317                 } else if (result == 0)
318                         /*
319                          * Huh? Index contains no zero key?
320                          */
321                         result = -EIO;
322
323                 iops->put(env, it);
324                 iops->fini(env, it);
325         } else
326                 result = PTR_ERR(it);
327         RETURN(result);
328 }
329
330 /**
331  * Determine if the target object can be hard linked, and right now it only
332  * checks if the link count reach the maximum limit. Note: for ldiskfs, the
333  * directory nlink count might exceed the maximum link count(see
334  * osd_object_ref_add), so it only check nlink for non-directories.
335  *
336  * \param[in] env       thread environment
337  * \param[in] obj       object being linked to
338  * \param[in] la        attributes of \a obj
339  *
340  * \retval              0 if \a obj can be hard linked
341  * \retval              negative error if \a obj is a directory or has too
342  *                      many links
343  */
344 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
345                           const struct lu_attr *la)
346 {
347         struct mdd_device *m = mdd_obj2mdd_dev(obj);
348         ENTRY;
349
350         LASSERT(la != NULL);
351
352         /* Subdir count limitation can be broken through
353          * (see osd_object_ref_add), so only check non-directory here. */
354         if (!S_ISDIR(la->la_mode) &&
355             la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
356                 RETURN(-EMLINK);
357
358         RETURN(0);
359 }
360
361 /**
362  * Check whether it may create the cobj under the pobj.
363  *
364  * \param[in] env       execution environment
365  * \param[in] pobj      the parent directory
366  * \param[in] pattr     the attribute of the parent directory
367  * \param[in] cobj      the child to be created
368  * \param[in] check_perm        if check WRITE|EXEC permission for parent
369  *
370  * \retval              = 0 create the child under this dir is allowed
371  * \retval              negative errno create the child under this dir is
372  *                      not allowed
373  */
374 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
375                    const struct lu_attr *pattr, struct mdd_object *cobj,
376                    bool check_perm)
377 {
378         struct mdd_thread_info *info = mdd_env_info(env);
379         struct lu_buf   *xbuf;
380         int rc = 0;
381         ENTRY;
382
383         if (cobj && mdd_object_exists(cobj))
384                 RETURN(-EEXIST);
385
386         if (mdd_is_dead_obj(pobj))
387                 RETURN(-ENOENT);
388
389         /* If the parent is a sub-stripe, check whether it is dead */
390         xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key));
391         rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV);
392         if (unlikely(rc > 0)) {
393                 struct lmv_mds_md_v1  *lmv1 = xbuf->lb_buf;
394
395                 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE &&
396                     le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD)
397                         RETURN(-ESTALE);
398         }
399         rc = 0;
400
401         if (check_perm)
402                 rc = mdd_permission_internal_locked(env, pobj, pattr,
403                                                     MAY_WRITE | MAY_EXEC,
404                                                     MOR_TGT_PARENT);
405         RETURN(rc);
406 }
407
408 /*
409  * Check whether can unlink from the pobj in the case of "cobj == NULL".
410  */
411 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
412                    const struct lu_attr *pattr, const struct lu_attr *attr)
413 {
414         int rc;
415         ENTRY;
416
417         if (mdd_is_dead_obj(pobj))
418                 RETURN(-ENOENT);
419
420         if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
421                 RETURN(-EPERM);
422
423         rc = mdd_permission_internal_locked(env, pobj, pattr,
424                                             MAY_WRITE | MAY_EXEC,
425                                             MOR_TGT_PARENT);
426         if (rc != 0)
427                 RETURN(rc);
428
429         if (pattr->la_flags & LUSTRE_APPEND_FL)
430                 RETURN(-EPERM);
431
432         RETURN(rc);
433 }
434
435 /*
436  * pobj == NULL is remote ops case, under such case, pobj's
437  * VTX feature has been checked already, no need check again.
438  */
439 static inline int mdd_is_sticky(const struct lu_env *env,
440                                 struct mdd_object *pobj,
441                                 const struct lu_attr *pattr,
442                                 struct mdd_object *cobj,
443                                 const struct lu_attr *cattr)
444 {
445         struct lu_ucred *uc = lu_ucred_assert(env);
446
447         if (pobj != NULL) {
448                 LASSERT(pattr != NULL);
449                 if (!(pattr->la_mode & S_ISVTX) ||
450                     (pattr->la_uid == uc->uc_fsuid))
451                         return 0;
452         }
453
454         LASSERT(cattr != NULL);
455         if (cattr->la_uid == uc->uc_fsuid)
456                 return 0;
457
458         return !md_capable(uc, CFS_CAP_FOWNER);
459 }
460
461 static int mdd_may_delete_entry(const struct lu_env *env,
462                                 struct mdd_object *pobj,
463                                 const struct lu_attr *pattr,
464                                 int check_perm)
465 {
466         ENTRY;
467
468         LASSERT(pobj != NULL);
469         if (!mdd_object_exists(pobj))
470                 RETURN(-ENOENT);
471
472         if (mdd_is_dead_obj(pobj))
473                 RETURN(-ENOENT);
474
475         if (check_perm) {
476                 int rc;
477                 rc = mdd_permission_internal_locked(env, pobj, pattr,
478                                             MAY_WRITE | MAY_EXEC,
479                                             MOR_TGT_PARENT);
480                 if (rc)
481                         RETURN(rc);
482         }
483
484         if (pattr->la_flags & LUSTRE_APPEND_FL)
485                 RETURN(-EPERM);
486
487         RETURN(0);
488 }
489
490 /*
491  * Check whether it may delete the cobj from the pobj.
492  * pobj maybe NULL
493  */
494 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
495                    const struct lu_attr *tpattr, struct mdd_object *tobj,
496                    const struct lu_attr *tattr, const struct lu_attr *cattr,
497                    int check_perm, int check_empty)
498 {
499         int rc = 0;
500         ENTRY;
501
502         if (tpobj) {
503                 LASSERT(tpattr != NULL);
504                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
505                 if (rc != 0)
506                         RETURN(rc);
507         }
508
509         if (tobj == NULL)
510                 RETURN(0);
511
512         if (!mdd_object_exists(tobj))
513                 RETURN(-ENOENT);
514
515         if (mdd_is_dead_obj(tobj))
516                 RETURN(-ESTALE);
517
518         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
519                 RETURN(-EPERM);
520
521         if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
522                 RETURN(-EPERM);
523
524         /* additional check the rename case */
525         if (cattr) {
526                 if (S_ISDIR(cattr->la_mode)) {
527                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
528
529                         if (!S_ISDIR(tattr->la_mode))
530                                 RETURN(-ENOTDIR);
531
532                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
533                                 RETURN(-EBUSY);
534                 } else if (S_ISDIR(tattr->la_mode))
535                         RETURN(-EISDIR);
536         }
537
538         if (S_ISDIR(tattr->la_mode) && check_empty)
539                 rc = mdd_dir_is_empty(env, tobj);
540
541         RETURN(rc);
542 }
543
544 /**
545  * Check whether it can create the link file(linked to @src_obj) under
546  * the target directory(@tgt_obj), and src_obj has been locked by
547  * mdd_write_lock.
548  *
549  * \param[in] env       execution environment
550  * \param[in] tgt_obj   the target directory
551  * \param[in] tattr     attributes of target directory
552  * \param[in] lname     the link name
553  * \param[in] src_obj   source object for link
554  * \param[in] cattr     attributes for source object
555  *
556  * \retval              = 0 it is allowed to create the link file under tgt_obj
557  * \retval              negative error not allowed to create the link file
558  */
559 static int mdd_link_sanity_check(const struct lu_env *env,
560                                  struct mdd_object *tgt_obj,
561                                  const struct lu_attr *tattr,
562                                  const struct lu_name *lname,
563                                  struct mdd_object *src_obj,
564                                  const struct lu_attr *cattr)
565 {
566         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
567         int rc = 0;
568         ENTRY;
569
570         if (!mdd_object_exists(src_obj))
571                 RETURN(-ENOENT);
572
573         if (mdd_is_dead_obj(src_obj))
574                 RETURN(-ESTALE);
575
576         /* Local ops, no lookup before link, check filename length here. */
577         rc = mdd_name_check(m, lname);
578         if (rc < 0)
579                 RETURN(rc);
580
581         if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
582                 RETURN(-EPERM);
583
584         if (S_ISDIR(mdd_object_type(src_obj)))
585                 RETURN(-EPERM);
586
587         LASSERT(src_obj != tgt_obj);
588         rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
589         if (rc != 0)
590                 RETURN(rc);
591
592         rc = __mdd_may_link(env, src_obj, cattr);
593
594         RETURN(rc);
595 }
596
597 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
598                                    const char *name, struct thandle *handle)
599 {
600         struct dt_object *next = mdd_object_child(pobj);
601         int rc;
602         ENTRY;
603
604         if (dt_try_as_dir(env, next))
605                 rc = dt_delete(env, next, (struct dt_key *)name, handle);
606         else
607                 rc = -ENOTDIR;
608
609         RETURN(rc);
610 }
611
612 static int __mdd_index_insert_only(const struct lu_env *env,
613                                    struct mdd_object *pobj,
614                                    const struct lu_fid *lf, __u32 type,
615                                    const char *name,
616                                    struct thandle *handle)
617 {
618         struct dt_object *next = mdd_object_child(pobj);
619         int               rc;
620         ENTRY;
621
622         if (dt_try_as_dir(env, next)) {
623                 struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
624                 struct lu_ucred         *uc  = lu_ucred_check(env);
625                 int                      ignore_quota;
626
627                 rec->rec_fid = lf;
628                 rec->rec_type = type;
629                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
630                 rc = dt_insert(env, next, (const struct dt_rec *)rec,
631                                (const struct dt_key *)name, handle,
632                                ignore_quota);
633         } else {
634                 rc = -ENOTDIR;
635         }
636         RETURN(rc);
637 }
638
639 /* insert named index, add reference if isdir */
640 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
641                               const struct lu_fid *lf, __u32 type,
642                               const char *name, struct thandle *handle)
643 {
644         int rc;
645         ENTRY;
646
647         rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
648         if (rc == 0 && S_ISDIR(type)) {
649                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
650                 mdo_ref_add(env, pobj, handle);
651                 mdd_write_unlock(env, pobj);
652         }
653
654         RETURN(rc);
655 }
656
657 /* delete named index, drop reference if isdir */
658 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
659                               const char *name, int is_dir,
660                               struct thandle *handle)
661 {
662         int               rc;
663         ENTRY;
664
665         rc = __mdd_index_delete_only(env, pobj, name, handle);
666         if (rc == 0 && is_dir) {
667                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
668                 mdo_ref_del(env, pobj, handle);
669                 mdd_write_unlock(env, pobj);
670         }
671
672         RETURN(rc);
673 }
674
675 static int mdd_llog_record_calc_size(const struct lu_env *env,
676                                      const struct lu_name *tname,
677                                      const struct lu_name *sname)
678 {
679         const struct lu_ucred   *uc = lu_ucred(env);
680         enum changelog_rec_flags crf = 0;
681         size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
682                                             sizeof(struct changelog_rec);
683
684         if (sname != NULL)
685                 crf |= CLF_RENAME;
686
687         if (uc != NULL && uc->uc_jobid[0] != '\0')
688                 crf |= CLF_JOBID;
689
690         return llog_data_len(hdr_size + changelog_rec_offset(crf) +
691                              (tname != NULL ? tname->ln_namelen : 0) +
692                              (sname != NULL ? 1 + sname->ln_namelen : 0));
693 }
694
695 int mdd_declare_changelog_store(const struct lu_env *env,
696                                 struct mdd_device *mdd,
697                                 const struct lu_name *tname,
698                                 const struct lu_name *sname,
699                                 struct thandle *handle)
700 {
701         struct obd_device               *obd = mdd2obd_dev(mdd);
702         struct llog_ctxt                *ctxt;
703         struct llog_changelog_rec       *rec;
704         struct lu_buf                   *buf;
705         struct thandle                  *llog_th;
706         int                              reclen;
707         int                              rc;
708
709         /* Not recording */
710         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
711                 return 0;
712
713         reclen = mdd_llog_record_calc_size(env, tname, sname);
714         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
715         if (buf->lb_buf == NULL)
716                 return -ENOMEM;
717
718         rec = buf->lb_buf;
719         rec->cr_hdr.lrh_len = reclen;
720         rec->cr_hdr.lrh_type = CHANGELOG_REC;
721
722         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
723         if (ctxt == NULL)
724                 return -ENXIO;
725
726         llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj);
727         if (IS_ERR(llog_th))
728                 GOTO(out_put, rc = PTR_ERR(llog_th));
729
730         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, llog_th);
731
732 out_put:
733         llog_ctxt_put(ctxt);
734
735         return rc;
736 }
737
738 /** Add a changelog entry \a rec to the changelog llog
739  * \param mdd
740  * \param rec
741  * \param handle - currently ignored since llogs start their own transaction;
742  *              this will hopefully be fixed in llog rewrite
743  * \retval 0 ok
744  */
745 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
746                         struct llog_changelog_rec *rec, struct thandle *th)
747 {
748         struct obd_device       *obd = mdd2obd_dev(mdd);
749         struct llog_ctxt        *ctxt;
750         struct thandle          *llog_th;
751         int                      rc;
752
753         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
754                                             changelog_rec_varsize(&rec->cr));
755
756         /* llog_lvfs_write_rec sets the llog tail len */
757         rec->cr_hdr.lrh_type = CHANGELOG_REC;
758         rec->cr.cr_time = cl_time();
759
760         spin_lock(&mdd->mdd_cl.mc_lock);
761         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
762          * but as long as the MDD transactions are ordered correctly for e.g.
763          * rename conflicts, I don't think this should matter. */
764         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
765         spin_unlock(&mdd->mdd_cl.mc_lock);
766
767         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
768         if (ctxt == NULL)
769                 return -ENXIO;
770
771         llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj);
772         if (IS_ERR(llog_th))
773                 GOTO(out_put, rc = PTR_ERR(llog_th));
774
775         /* nested journal transaction */
776         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
777
778 out_put:
779         llog_ctxt_put(ctxt);
780         if (rc > 0)
781                 rc = 0;
782         return rc;
783 }
784
785 static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
786                                          const struct lu_fid *sfid,
787                                          const struct lu_fid *spfid,
788                                          const struct lu_name *sname)
789 {
790         struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
791         size_t                           extsize = sname->ln_namelen + 1;
792
793         LASSERT(sfid != NULL);
794         LASSERT(spfid != NULL);
795         LASSERT(sname != NULL);
796
797         rnm->cr_sfid = *sfid;
798         rnm->cr_spfid = *spfid;
799
800         changelog_rec_name(rec)[rec->cr_namelen] = '\0';
801         strlcpy(changelog_rec_sname(rec), sname->ln_name, extsize);
802         rec->cr_namelen += extsize;
803 }
804
805 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
806 {
807         struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
808
809         if (jobid == NULL || jobid[0] == '\0')
810                 return;
811
812         strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
813 }
814
815 /** Store a namespace change changelog record
816  * If this fails, we must fail the whole transaction; we don't
817  * want the change to commit without the log entry.
818  * \param target - mdd_object of change
819  * \param tpfid - target parent dir/object fid
820  * \param sfid - source object fid
821  * \param spfid - source parent fid
822  * \param tname - target name string
823  * \param sname - source name string
824  * \param handle - transaction handle
825  */
826 int mdd_changelog_ns_store(const struct lu_env *env,
827                            struct mdd_device *mdd,
828                            enum changelog_rec_type type,
829                            enum changelog_rec_flags crf,
830                            struct mdd_object *target,
831                            const struct lu_fid *tpfid,
832                            const struct lu_fid *sfid,
833                            const struct lu_fid *spfid,
834                            const struct lu_name *tname,
835                            const struct lu_name *sname,
836                            struct thandle *handle)
837 {
838         const struct lu_ucred           *uc = lu_ucred(env);
839         struct llog_changelog_rec       *rec;
840         struct lu_buf                   *buf;
841         int                              reclen;
842         int                              rc;
843         ENTRY;
844
845         /* Not recording */
846         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
847                 RETURN(0);
848
849         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
850                 RETURN(0);
851
852         LASSERT(tpfid != NULL);
853         LASSERT(tname != NULL);
854         LASSERT(handle != NULL);
855
856         reclen = mdd_llog_record_calc_size(env, tname, sname);
857         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
858         if (buf->lb_buf == NULL)
859                 RETURN(-ENOMEM);
860         rec = buf->lb_buf;
861
862         crf &= CLF_FLAGMASK;
863
864         if (uc != NULL && uc->uc_jobid[0] != '\0')
865                 crf |= CLF_JOBID;
866
867         if (sname != NULL)
868                 crf |= CLF_RENAME;
869         else
870                 crf |= CLF_VERSION;
871
872         rec->cr.cr_flags = crf;
873         rec->cr.cr_type = (__u32)type;
874         rec->cr.cr_pfid = *tpfid;
875         rec->cr.cr_namelen = tname->ln_namelen;
876         memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
877
878         if (crf & CLF_RENAME)
879                 mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
880
881         if (crf & CLF_JOBID)
882                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
883
884         if (likely(target != NULL)) {
885                 rec->cr.cr_tfid = *mdo2fid(target);
886                 target->mod_cltime = cfs_time_current_64();
887         } else {
888                 fid_zero(&rec->cr.cr_tfid);
889         }
890
891         rc = mdd_changelog_store(env, mdd, rec, handle);
892         if (rc < 0) {
893                 CERROR("%s: cannot store changelog record: type = %d, "
894                        "name = '%s', t = "DFID", p = "DFID": rc = %d\n",
895                        mdd2obd_dev(mdd)->obd_name, type, tname->ln_name,
896                        PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), rc);
897                 return -EFAULT;
898         }
899
900         return 0;
901 }
902
903 static int __mdd_links_add(const struct lu_env *env,
904                            struct mdd_object *mdd_obj,
905                            struct linkea_data *ldata,
906                            const struct lu_name *lname,
907                            const struct lu_fid *pfid,
908                            int first, int check)
909 {
910         int rc;
911
912         if (ldata->ld_leh == NULL) {
913                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
914                 if (rc) {
915                         if (rc != -ENODATA)
916                                 return rc;
917                         rc = linkea_data_new(ldata,
918                                              &mdd_env_info(env)->mti_link_buf);
919                         if (rc)
920                                 return rc;
921                 }
922         }
923
924         if (check) {
925                 rc = linkea_links_find(ldata, lname, pfid);
926                 if (rc && rc != -ENOENT)
927                         return rc;
928                 if (rc == 0)
929                         return -EEXIST;
930         }
931
932         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
933                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
934
935                 *tfid = *pfid;
936                 tfid->f_ver = ~0;
937                 linkea_add_buf(ldata, lname, tfid);
938         }
939
940         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
941                 linkea_add_buf(ldata, lname, pfid);
942
943         return linkea_add_buf(ldata, lname, pfid);
944 }
945
946 static int __mdd_links_del(const struct lu_env *env,
947                            struct mdd_object *mdd_obj,
948                            struct linkea_data *ldata,
949                            const struct lu_name *lname,
950                            const struct lu_fid *pfid)
951 {
952         int rc;
953
954         if (ldata->ld_leh == NULL) {
955                 rc = mdd_links_read(env, mdd_obj, ldata);
956                 if (rc)
957                         return rc;
958         }
959
960         rc = linkea_links_find(ldata, lname, pfid);
961         if (rc)
962                 return rc;
963
964         linkea_del_buf(ldata, lname);
965         return 0;
966 }
967
968 static int mdd_linkea_prepare(const struct lu_env *env,
969                               struct mdd_object *mdd_obj,
970                               const struct lu_fid *oldpfid,
971                               const struct lu_name *oldlname,
972                               const struct lu_fid *newpfid,
973                               const struct lu_name *newlname,
974                               int first, int check,
975                               struct linkea_data *ldata)
976 {
977         int rc = 0;
978         int rc2 = 0;
979         ENTRY;
980
981         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
982                 return 0;
983
984         LASSERT(oldpfid != NULL || newpfid != NULL);
985
986         if (mdd_obj->mod_flags & DEAD_OBJ) {
987                 /* Prevent linkea to be updated which is NOT necessary. */
988                 ldata->ld_reclen = 0;
989                 /* No more links, don't bother */
990                 RETURN(0);
991         }
992
993         if (oldpfid != NULL) {
994                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
995                 if (rc) {
996                         if ((check == 1) ||
997                             (rc != -ENODATA && rc != -ENOENT))
998                                 RETURN(rc);
999                         /* No changes done. */
1000                         rc = 0;
1001                 }
1002         }
1003
1004         /* If renaming, add the new record */
1005         if (newpfid != NULL) {
1006                 /* even if the add fails, we still delete the out-of-date
1007                  * old link */
1008                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
1009                                       first, check);
1010         }
1011
1012         rc = rc != 0 ? rc : rc2;
1013
1014         RETURN(rc);
1015 }
1016
1017 int mdd_links_rename(const struct lu_env *env,
1018                      struct mdd_object *mdd_obj,
1019                      const struct lu_fid *oldpfid,
1020                      const struct lu_name *oldlname,
1021                      const struct lu_fid *newpfid,
1022                      const struct lu_name *newlname,
1023                      struct thandle *handle,
1024                      struct linkea_data *ldata,
1025                      int first, int check)
1026 {
1027         int rc = 0;
1028         ENTRY;
1029
1030         if (ldata == NULL) {
1031                 ldata = &mdd_env_info(env)->mti_link_data;
1032                 memset(ldata, 0, sizeof(*ldata));
1033                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1034                                         newpfid, newlname, first, check,
1035                                         ldata);
1036                 if (rc != 0)
1037                         GOTO(out, rc);
1038         }
1039
1040         if (ldata->ld_reclen != 0)
1041                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1042         EXIT;
1043 out:
1044         if (rc != 0) {
1045                 int error = 1;
1046                 if (rc == -EOVERFLOW || rc == -ENOSPC)
1047                         error = 0;
1048                 if (newlname == NULL)
1049                         CDEBUG(error ? D_ERROR : D_OTHER,
1050                                "link_ea add failed %d "DFID"\n",
1051                                rc, PFID(mdd_object_fid(mdd_obj)));
1052                 else if (oldpfid == NULL)
1053                         CDEBUG(error ? D_ERROR : D_OTHER,
1054                                "link_ea add '%.*s' failed %d "DFID"\n",
1055                                newlname->ln_namelen, newlname->ln_name,
1056                                rc, PFID(mdd_object_fid(mdd_obj)));
1057                 else if (newpfid == NULL)
1058                         CDEBUG(error ? D_ERROR : D_OTHER,
1059                                "link_ea del '%.*s' failed %d "DFID"\n",
1060                                oldlname->ln_namelen, oldlname->ln_name,
1061                                rc, PFID(mdd_object_fid(mdd_obj)));
1062                 else
1063                         CDEBUG(error ? D_ERROR : D_OTHER,
1064                                "link_ea rename '%.*s'->'%.*s' failed %d "
1065                                DFID"\n",
1066                                oldlname->ln_namelen, oldlname->ln_name,
1067                                newlname->ln_namelen, newlname->ln_name,
1068                                rc, PFID(mdd_object_fid(mdd_obj)));
1069         }
1070
1071         if (is_vmalloc_addr(ldata->ld_buf))
1072                 /* if we vmalloced a large buffer drop it */
1073                 lu_buf_free(ldata->ld_buf);
1074
1075         return rc;
1076 }
1077
1078 static inline int mdd_links_add(const struct lu_env *env,
1079                                 struct mdd_object *mdd_obj,
1080                                 const struct lu_fid *pfid,
1081                                 const struct lu_name *lname,
1082                                 struct thandle *handle,
1083                                 struct linkea_data *ldata, int first)
1084 {
1085         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1086                                 pfid, lname, handle, ldata, first, 0);
1087 }
1088
1089 static inline int mdd_links_del(const struct lu_env *env,
1090                                 struct mdd_object *mdd_obj,
1091                                 const struct lu_fid *pfid,
1092                                 const struct lu_name *lname,
1093                                 struct thandle *handle)
1094 {
1095         return mdd_links_rename(env, mdd_obj, pfid, lname,
1096                                 NULL, NULL, handle, NULL, 0, 0);
1097 }
1098
1099 /** Read the link EA into a temp buffer.
1100  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1101  * A pointer to the buffer is stored in \a ldata::ld_buf.
1102  *
1103  * \retval 0 or error
1104  */
1105 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1106                    struct linkea_data *ldata)
1107 {
1108         int rc;
1109
1110         if (!mdd_object_exists(mdd_obj))
1111                 return -ENODATA;
1112
1113         /* First try a small buf */
1114         LASSERT(env != NULL);
1115         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1116                                                PAGE_CACHE_SIZE);
1117         if (ldata->ld_buf->lb_buf == NULL)
1118                 return -ENOMEM;
1119
1120         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
1121         if (rc == -ERANGE) {
1122                 /* Buf was too small, figure out what we need. */
1123                 lu_buf_free(ldata->ld_buf);
1124                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1125                                    XATTR_NAME_LINK);
1126                 if (rc < 0)
1127                         return rc;
1128                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1129                 if (ldata->ld_buf->lb_buf == NULL)
1130                         return -ENOMEM;
1131                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1132                                   XATTR_NAME_LINK);
1133         }
1134         if (rc < 0) {
1135                 lu_buf_free(ldata->ld_buf);
1136                 ldata->ld_buf = NULL;
1137                 return rc;
1138         }
1139
1140         return linkea_init(ldata);
1141 }
1142
1143 /** Read the link EA into a temp buffer.
1144  * Uses the name_buf since it is generally large.
1145  * \retval IS_ERR err
1146  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1147  */
1148 struct lu_buf *mdd_links_get(const struct lu_env *env,
1149                              struct mdd_object *mdd_obj)
1150 {
1151         struct linkea_data ldata = { NULL };
1152         int rc;
1153
1154         rc = mdd_links_read(env, mdd_obj, &ldata);
1155         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1156 }
1157
1158 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1159                     struct linkea_data *ldata, struct thandle *handle)
1160 {
1161         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1162                                                      ldata->ld_leh->leh_len);
1163         int                 rc;
1164
1165         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
1166                 return 0;
1167
1168         rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
1169         if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
1170             mdd_object_remote(mdd_obj) == 0) {
1171                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1172                 struct thandle  *sub_th;
1173
1174                 /* XXX: If the linkEA is overflow, then we need to notify the
1175                  *      namespace LFSCK to skip "nlink" attribute verification
1176                  *      on this object to avoid the "nlink" to be shrinked by
1177                  *      wrong. It may be not good an interaction with LFSCK
1178                  *      like this. We will consider to replace it with other
1179                  *      mechanism in future. LU-5802. */
1180                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
1181                                LFSCK_TYPE_NAMESPACE);
1182
1183                 sub_th = thandle_get_sub_by_dt(env, handle,
1184                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1185                 lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1186                                 lr, sub_th);
1187         }
1188
1189         return rc;
1190 }
1191
1192 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1193                           struct thandle *handle, struct linkea_data *ldata,
1194                           enum mdd_links_add_overflow overflow)
1195 {
1196         int     rc;
1197         int     ea_len;
1198         void    *linkea;
1199
1200         if (ldata != NULL && ldata->ld_leh != NULL) {
1201                 ea_len = ldata->ld_leh->leh_len;
1202                 linkea = ldata->ld_buf->lb_buf;
1203         } else {
1204                 ea_len = DEFAULT_LINKEA_SIZE;
1205                 linkea = NULL;
1206         }
1207
1208         /* XXX: max size? */
1209         rc = mdo_declare_xattr_set(env, mdd_obj,
1210                                    mdd_buf_get_const(env, linkea, ea_len),
1211                                    XATTR_NAME_LINK, 0, handle);
1212         if (rc != 0)
1213                 return rc;
1214
1215         if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
1216                 struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
1217                 struct thandle  *sub_th;
1218
1219                 /* XXX: If the linkEA is overflow, then we need to notify the
1220                  *      namespace LFSCK to skip "nlink" attribute verification
1221                  *      on this object to avoid the "nlink" to be shrinked by
1222                  *      wrong. It may be not good an interaction with LFSCK
1223                  *      like this. We will consider to replace it with other
1224                  *      mechanism in future. LU-5802. */
1225                 lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
1226                                LFSCK_TYPE_NAMESPACE);
1227
1228                 sub_th = thandle_get_sub_by_dt(env, handle,
1229                                 mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
1230                 rc = lfsck_in_notify(env,
1231                                      mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
1232                                      lr, sub_th);
1233         }
1234
1235         return rc;
1236 }
1237
1238 static inline int mdd_declare_links_del(const struct lu_env *env,
1239                                         struct mdd_object *c,
1240                                         struct thandle *handle)
1241 {
1242         int rc = 0;
1243
1244         /* For directory, the linkEA will be removed together
1245          * with the object. */
1246         if (!S_ISDIR(mdd_object_type(c)))
1247                 rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
1248
1249         return rc;
1250 }
1251
1252 static int mdd_declare_link(const struct lu_env *env,
1253                             struct mdd_device *mdd,
1254                             struct mdd_object *p,
1255                             struct mdd_object *c,
1256                             const struct lu_name *name,
1257                             struct thandle *handle,
1258                             struct lu_attr *la,
1259                             struct linkea_data *data)
1260 {
1261         int rc;
1262
1263         rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
1264                                       name->ln_name, handle);
1265         if (rc != 0)
1266                 return rc;
1267
1268         rc = mdo_declare_ref_add(env, c, handle);
1269         if (rc != 0)
1270                 return rc;
1271
1272         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1273                 rc = mdo_declare_ref_add(env, c, handle);
1274                 if (rc != 0)
1275                         return rc;
1276         }
1277
1278         la->la_valid = LA_CTIME | LA_MTIME;
1279         rc = mdo_declare_attr_set(env, p, la, handle);
1280         if (rc != 0)
1281                 return rc;
1282
1283         la->la_valid = LA_CTIME;
1284         rc = mdo_declare_attr_set(env, c, la, handle);
1285         if (rc != 0)
1286                 return rc;
1287
1288         rc = mdd_declare_links_add(env, c, handle, data,
1289                         S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
1290         if (rc != 0)
1291                 return rc;
1292
1293         rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1294
1295         return rc;
1296 }
1297
1298 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1299                     struct md_object *src_obj, const struct lu_name *lname,
1300                     struct md_attr *ma)
1301 {
1302         const char *name = lname->ln_name;
1303         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1304         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1305         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1306         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1307         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1308         struct mdd_device *mdd = mdo2mdd(src_obj);
1309         struct thandle *handle;
1310         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1311         int rc;
1312         ENTRY;
1313
1314         rc = mdd_la_get(env, mdd_sobj, cattr);
1315         if (rc != 0)
1316                 RETURN(rc);
1317
1318         rc = mdd_la_get(env, mdd_tobj, tattr);
1319         if (rc != 0)
1320                 RETURN(rc);
1321
1322         handle = mdd_trans_create(env, mdd);
1323         if (IS_ERR(handle))
1324                 GOTO(out_pending, rc = PTR_ERR(handle));
1325
1326         memset(ldata, 0, sizeof(*ldata));
1327
1328         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1329         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1330
1331         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1332                               la, ldata);
1333         if (rc)
1334                 GOTO(stop, rc);
1335
1336         rc = mdd_trans_start(env, mdd, handle);
1337         if (rc)
1338                 GOTO(stop, rc);
1339
1340         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1341         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1342                                    cattr);
1343         if (rc)
1344                 GOTO(out_unlock, rc);
1345
1346         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
1347                 rc = mdo_ref_add(env, mdd_sobj, handle);
1348                 if (rc != 0)
1349                         GOTO(out_unlock, rc);
1350         }
1351
1352         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
1353                 rc = mdo_ref_add(env, mdd_sobj, handle);
1354                 if (rc != 0)
1355                         GOTO(out_unlock, rc);
1356         }
1357
1358         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
1359                 struct lu_fid tfid = *mdo2fid(mdd_sobj);
1360
1361                 tfid.f_oid++;
1362                 rc = __mdd_index_insert_only(env, mdd_tobj, &tfid,
1363                                              mdd_object_type(mdd_sobj),
1364                                              name, handle);
1365         } else {
1366                 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1367                                              mdd_object_type(mdd_sobj),
1368                                              name, handle);
1369         }
1370
1371         if (rc != 0) {
1372                 mdo_ref_del(env, mdd_sobj, handle);
1373                 GOTO(out_unlock, rc);
1374         }
1375
1376         la->la_valid = LA_CTIME | LA_MTIME;
1377         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1378         if (rc)
1379                 GOTO(out_unlock, rc);
1380
1381         la->la_valid = LA_CTIME;
1382         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1383         if (rc == 0) {
1384                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1385                                         mdo2fid(mdd_tobj), lname, 0, 0,
1386                                         ldata);
1387                 if (rc == 0)
1388                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1389                                       lname, handle, ldata, 0);
1390                 /* The failure of links_add should not cause the link
1391                  * failure, reset rc here */
1392                 rc = 0;
1393         }
1394         EXIT;
1395 out_unlock:
1396         mdd_write_unlock(env, mdd_sobj);
1397         if (rc == 0)
1398                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1399                                             mdo2fid(mdd_tobj), NULL, NULL,
1400                                             lname, NULL, handle);
1401 stop:
1402         mdd_trans_stop(env, mdd, rc, handle);
1403
1404         if (is_vmalloc_addr(ldata->ld_buf))
1405                 /* if we vmalloced a large buffer drop it */
1406                 lu_buf_free(ldata->ld_buf);
1407 out_pending:
1408         return rc;
1409 }
1410
1411 static int mdd_mark_dead_object(const struct lu_env *env,
1412                                 struct mdd_object *obj, struct thandle *handle,
1413                                 bool declare)
1414 {
1415         struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
1416         int rc;
1417
1418         if (!declare)
1419                 obj->mod_flags |= DEAD_OBJ;
1420
1421         if (!S_ISDIR(mdd_object_type(obj)))
1422                 return 0;
1423
1424         attr->la_valid = LA_FLAGS;
1425         attr->la_flags = LUSTRE_SLAVE_DEAD_FL;
1426
1427         if (declare)
1428                 rc = mdo_declare_attr_set(env, obj, attr, handle);
1429         else
1430                 rc = mdo_attr_set(env, obj, attr, handle);
1431
1432         return rc;
1433 }
1434
1435 static int mdd_declare_finish_unlink(const struct lu_env *env,
1436                                      struct mdd_object *obj,
1437                                      struct thandle *handle)
1438 {
1439         int     rc;
1440
1441         rc = mdd_mark_dead_object(env, obj, handle, true);
1442         if (rc != 0)
1443                 return rc;
1444
1445         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1446         if (rc != 0)
1447                 return rc;
1448
1449         rc = mdo_declare_destroy(env, obj, handle);
1450         if (rc != 0)
1451                 return rc;
1452
1453         return mdd_declare_links_del(env, obj, handle);
1454 }
1455
1456 /* caller should take a lock before calling */
1457 int mdd_finish_unlink(const struct lu_env *env,
1458                       struct mdd_object *obj, struct md_attr *ma,
1459                       const struct mdd_object *pobj,
1460                       const struct lu_name *lname,
1461                       struct thandle *th)
1462 {
1463         int rc = 0;
1464         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1465         ENTRY;
1466
1467         LASSERT(mdd_write_locked(env, obj) != 0);
1468
1469         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1470                 rc = mdd_mark_dead_object(env, obj, th, false);
1471                 if (rc != 0)
1472                         RETURN(rc);
1473
1474                 /* add new orphan and the object
1475                  * will be deleted during mdd_close() */
1476                 if (obj->mod_count) {
1477                         rc = __mdd_orphan_add(env, obj, th);
1478                         if (rc == 0)
1479                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1480                                         "orphan list, open count = %d\n",
1481                                         PFID(mdd_object_fid(obj)),
1482                                         obj->mod_count);
1483                         else
1484                                 CERROR("Object "DFID" fail to be an orphan, "
1485                                        "open count = %d, maybe cause failed "
1486                                        "open replay\n",
1487                                         PFID(mdd_object_fid(obj)),
1488                                         obj->mod_count);
1489                 } else {
1490                         rc = mdo_destroy(env, obj, th);
1491                 }
1492         } else if (!is_dir) {
1493                 /* old files may not have link ea; ignore errors */
1494                 mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
1495         }
1496
1497         RETURN(rc);
1498 }
1499
1500 /*
1501  * pobj maybe NULL
1502  * has mdd_write_lock on cobj already, but not on pobj yet
1503  */
1504 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1505                             const struct lu_attr *pattr,
1506                             struct mdd_object *cobj,
1507                             const struct lu_attr *cattr)
1508 {
1509         int rc;
1510         ENTRY;
1511
1512         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1513
1514         RETURN(rc);
1515 }
1516
1517 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1518                               struct mdd_object *p, struct mdd_object *c,
1519                               const struct lu_name *name, struct md_attr *ma,
1520                               struct thandle *handle, int no_name, int is_dir)
1521 {
1522         struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1523         int              rc;
1524
1525         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1526                 if (likely(no_name == 0)) {
1527                         rc = mdo_declare_index_delete(env, p, name->ln_name,
1528                                                       handle);
1529                         if (rc != 0)
1530                                 return rc;
1531                 }
1532
1533                 if (is_dir != 0) {
1534                         rc = mdo_declare_ref_del(env, p, handle);
1535                         if (rc != 0)
1536                                 return rc;
1537                 }
1538         }
1539
1540         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1541         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1542         la->la_valid = LA_CTIME | LA_MTIME;
1543         rc = mdo_declare_attr_set(env, p, la, handle);
1544         if (rc)
1545                 return rc;
1546
1547         if (c != NULL) {
1548                 rc = mdo_declare_ref_del(env, c, handle);
1549                 if (rc)
1550                         return rc;
1551
1552                 rc = mdo_declare_ref_del(env, c, handle);
1553                 if (rc)
1554                         return rc;
1555
1556                 la->la_valid = LA_CTIME;
1557                 rc = mdo_declare_attr_set(env, c, la, handle);
1558                 if (rc)
1559                         return rc;
1560
1561                 rc = mdd_declare_finish_unlink(env, c, handle);
1562                 if (rc)
1563                         return rc;
1564
1565                 /* FIXME: need changelog for remove entry */
1566                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
1567         }
1568
1569         return rc;
1570 }
1571
1572 /*
1573  * test if a file has an HSM archive
1574  * if HSM attributes are not found in ma update them from
1575  * HSM xattr
1576  */
1577 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1578                                    struct mdd_object *obj,
1579                                    struct md_attr *ma)
1580 {
1581         ENTRY;
1582
1583         if (!(ma->ma_valid & MA_HSM)) {
1584                 /* no HSM MD provided, read xattr */
1585                 struct lu_buf   *hsm_buf;
1586                 const size_t     buflen = sizeof(struct hsm_attrs);
1587                 int              rc;
1588
1589                 hsm_buf = mdd_buf_get(env, NULL, 0);
1590                 lu_buf_alloc(hsm_buf, buflen);
1591                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM);
1592                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1593                 lu_buf_free(hsm_buf);
1594                 if (rc < 0)
1595                         RETURN(false);
1596
1597                 ma->ma_valid = MA_HSM;
1598         }
1599         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1600                 RETURN(true);
1601         RETURN(false);
1602 }
1603
1604 /**
1605  * Delete name entry and the object.
1606  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1607  * does not exist for this object, and it could only happen during resending
1608  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1609  * is also needed in this case(needed by changelog), so we have to add another
1610  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1611  * the ENOENT failure should be able to be fixed by redo mechanism.
1612  */
1613 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1614                       struct md_object *cobj, const struct lu_name *lname,
1615                       struct md_attr *ma, int no_name)
1616 {
1617         const char *name = lname->ln_name;
1618         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1619         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1620         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1621         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1622         struct mdd_object *mdd_cobj = NULL;
1623         struct mdd_device *mdd = mdo2mdd(pobj);
1624         struct thandle    *handle;
1625         int rc, is_dir = 0;
1626         ENTRY;
1627
1628         /* cobj == NULL means only delete name entry */
1629         if (likely(cobj != NULL)) {
1630                 mdd_cobj = md2mdd_obj(cobj);
1631                 if (mdd_object_exists(mdd_cobj) == 0)
1632                         RETURN(-ENOENT);
1633         }
1634
1635         rc = mdd_la_get(env, mdd_pobj, pattr);
1636         if (rc)
1637                 RETURN(rc);
1638
1639         if (likely(mdd_cobj != NULL)) {
1640                 /* fetch cattr */
1641                 rc = mdd_la_get(env, mdd_cobj, cattr);
1642                 if (rc)
1643                         RETURN(rc);
1644
1645                 is_dir = S_ISDIR(cattr->la_mode);
1646         }
1647
1648         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1649         if (rc)
1650                 RETURN(rc);
1651
1652         handle = mdd_trans_create(env, mdd);
1653         if (IS_ERR(handle))
1654                 RETURN(PTR_ERR(handle));
1655
1656         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1657                                 lname, ma, handle, no_name, is_dir);
1658         if (rc)
1659                 GOTO(stop, rc);
1660
1661         rc = mdd_trans_start(env, mdd, handle);
1662         if (rc)
1663                 GOTO(stop, rc);
1664
1665         if (likely(mdd_cobj != NULL))
1666                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1667
1668         if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
1669                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1670                 if (rc)
1671                         GOTO(cleanup, rc);
1672         }
1673
1674         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF) ||
1675             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_NAMEENTRY))
1676                 GOTO(cleanup, rc = 0);
1677
1678         if (likely(mdd_cobj != NULL)) {
1679                 rc = mdo_ref_del(env, mdd_cobj, handle);
1680                 if (rc != 0) {
1681                         __mdd_index_insert_only(env, mdd_pobj,
1682                                                 mdo2fid(mdd_cobj),
1683                                                 mdd_object_type(mdd_cobj),
1684                                                 name, handle);
1685                         GOTO(cleanup, rc);
1686                 }
1687
1688                 if (is_dir)
1689                         /* unlink dot */
1690                         mdo_ref_del(env, mdd_cobj, handle);
1691
1692                 /* fetch updated nlink */
1693                 rc = mdd_la_get(env, mdd_cobj, cattr);
1694                 if (rc)
1695                         GOTO(cleanup, rc);
1696         }
1697
1698         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1699         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1700
1701         la->la_valid = LA_CTIME | LA_MTIME;
1702         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1703         if (rc)
1704                 GOTO(cleanup, rc);
1705
1706         /* Enough for only unlink the entry */
1707         if (unlikely(mdd_cobj == NULL))
1708                 GOTO(stop, rc);
1709
1710         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1711                 /* update ctime of an unlinked file only if it is still
1712                  * opened or a link still exists */
1713                 la->la_valid = LA_CTIME;
1714                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1715                 if (rc)
1716                         GOTO(cleanup, rc);
1717         }
1718
1719         /* XXX: this transfer to ma will be removed with LOD/OSP */
1720         ma->ma_attr = *cattr;
1721         ma->ma_valid |= MA_INODE;
1722         rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
1723
1724         /* fetch updated nlink */
1725         if (rc == 0)
1726                 rc = mdd_la_get(env, mdd_cobj, cattr);
1727
1728         /* if object is removed then we can't get its attrs, use last get */
1729         if (cattr->la_nlink == 0) {
1730                 ma->ma_attr = *cattr;
1731                 ma->ma_valid |= MA_INODE;
1732         }
1733         EXIT;
1734 cleanup:
1735         if (likely(mdd_cobj != NULL))
1736                 mdd_write_unlock(env, mdd_cobj);
1737
1738         if (rc == 0) {
1739                 int cl_flags = 0;
1740
1741                 if (cattr->la_nlink == 0) {
1742                         cl_flags |= CLF_UNLINK_LAST;
1743                         /* search for an existing archive */
1744                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1745                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1746                 }
1747
1748                 rc = mdd_changelog_ns_store(env, mdd,
1749                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1750                         mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
1751                         handle);
1752         }
1753
1754 stop:
1755         mdd_trans_stop(env, mdd, rc, handle);
1756
1757         return rc;
1758 }
1759
1760 /*
1761  * The permission has been checked when obj created, no need check again.
1762  */
1763 static int mdd_cd_sanity_check(const struct lu_env *env,
1764                                struct mdd_object *obj)
1765 {
1766         ENTRY;
1767
1768         /* EEXIST check */
1769         if (!obj || mdd_is_dead_obj(obj))
1770                 RETURN(-ENOENT);
1771
1772         RETURN(0);
1773 }
1774
1775 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1776                            struct md_object *cobj, const struct md_op_spec *spec,
1777                            struct md_attr *ma)
1778 {
1779         struct mdd_device *mdd = mdo2mdd(cobj);
1780         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1781         struct mdd_object *son = md2mdd_obj(cobj);
1782         struct thandle    *handle;
1783         const struct lu_buf *buf;
1784         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1785         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1786         int                rc;
1787         ENTRY;
1788
1789         rc = mdd_cd_sanity_check(env, son);
1790         if (rc)
1791                 RETURN(rc);
1792
1793         if (!md_should_create(spec->sp_cr_flags))
1794                 RETURN(0);
1795
1796         /*
1797          * there are following use cases for this function:
1798          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1799          *    striping can be specified or not
1800          * 2) CMD?
1801          */
1802         rc = mdd_la_get(env, son, attr);
1803         if (rc)
1804                 RETURN(rc);
1805
1806         /* calling ->ah_make_hint() is used to transfer information from parent */
1807         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
1808
1809         handle = mdd_trans_create(env, mdd);
1810         if (IS_ERR(handle))
1811                 GOTO(out_free, rc = PTR_ERR(handle));
1812
1813         /*
1814          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1815          * Should this be fixed?
1816          */
1817         CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
1818                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1819                spec->sp_cr_flags, spec->no_create);
1820
1821         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1822                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1823                                         spec->u.sp_ea.eadatalen);
1824         } else {
1825                 buf = &LU_BUF_NULL;
1826         }
1827
1828         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1829                                   XATTR_NAME_LOV, 0, handle);
1830         if (rc)
1831                 GOTO(stop, rc);
1832
1833         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1834         if (rc)
1835                 GOTO(stop, rc);
1836
1837         rc = mdd_trans_start(env, mdd, handle);
1838         if (rc)
1839                 GOTO(stop, rc);
1840
1841         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1842                           0, handle);
1843
1844         if (rc)
1845                 GOTO(stop, rc);
1846
1847         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1848
1849 stop:
1850         mdd_trans_stop(env, mdd, rc, handle);
1851 out_free:
1852         RETURN(rc);
1853 }
1854
1855 static int mdd_declare_object_initialize(const struct lu_env *env,
1856                                          struct mdd_object *parent,
1857                                          struct mdd_object *child,
1858                                          struct lu_attr *attr,
1859                                          struct thandle *handle)
1860 {
1861         int rc;
1862         ENTRY;
1863
1864         /*
1865          * inode mode has been set in creation time, and it's based on umask,
1866          * la_mode and acl, don't set here again! (which will go wrong
1867          * because below function doesn't consider umask).
1868          * I'd suggest set all object attributes in creation time, see above.
1869          */
1870         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1871         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1872         rc = mdo_declare_attr_set(env, child, attr, handle);
1873         attr->la_valid |= LA_MODE | LA_TYPE;
1874         if (rc != 0 || !S_ISDIR(attr->la_mode))
1875                 RETURN(rc);
1876
1877         rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
1878                                       dot, handle);
1879         if (rc != 0)
1880                 RETURN(rc);
1881
1882         rc = mdo_declare_ref_add(env, child, handle);
1883         if (rc != 0)
1884                 RETURN(rc);
1885
1886         rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
1887                                       dotdot, handle);
1888
1889         RETURN(rc);
1890 }
1891
1892 static int mdd_object_initialize(const struct lu_env *env,
1893                                  const struct lu_fid *pfid,
1894                                  struct mdd_object *child,
1895                                  struct lu_attr *attr, struct thandle *handle,
1896                                  const struct md_op_spec *spec)
1897 {
1898         int rc = 0;
1899         ENTRY;
1900
1901         if (S_ISDIR(attr->la_mode)) {
1902                 /* Add "." and ".." for newly created dir */
1903                 mdo_ref_add(env, child, handle);
1904                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1905                                              S_IFDIR, dot, handle);
1906                 if (rc == 0)
1907                         rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
1908                                                      dotdot, handle);
1909                 if (rc != 0)
1910                         mdo_ref_del(env, child, handle);
1911         }
1912
1913         RETURN(rc);
1914 }
1915
1916 /**
1917  * This function checks whether it can create a file/dir under the
1918  * directory(@pobj). The directory(@pobj) is not being locked by
1919  * mdd lock.
1920  *
1921  * \param[in] env       execution environment
1922  * \param[in] pobj      the directory to create files
1923  * \param[in] pattr     the attributes of the directory
1924  * \param[in] lname     the name of the created file/dir
1925  * \param[in] cattr     the attributes of the file/dir
1926  * \param[in] spec      create specification
1927  *
1928  * \retval              = 0 it is allowed to create file/dir under
1929  *                      the directory
1930  * \retval              negative error not allowed to create file/dir
1931  *                      under the directory
1932  */
1933 static int mdd_create_sanity_check(const struct lu_env *env,
1934                                    struct md_object *pobj,
1935                                    const struct lu_attr *pattr,
1936                                    const struct lu_name *lname,
1937                                    struct lu_attr *cattr,
1938                                    struct md_op_spec *spec)
1939 {
1940         struct mdd_thread_info *info = mdd_env_info(env);
1941         struct lu_fid     *fid       = &info->mti_fid;
1942         struct mdd_object *obj       = md2mdd_obj(pobj);
1943         struct mdd_device *m         = mdo2mdd(pobj);
1944         bool            check_perm = true;
1945         int rc;
1946         ENTRY;
1947
1948         /* EEXIST check */
1949         if (mdd_is_dead_obj(obj))
1950                 RETURN(-ENOENT);
1951
1952         /*
1953          * In some cases this lookup is not needed - we know before if name
1954          * exists or not because MDT performs lookup for it.
1955          * name length check is done in lookup.
1956          */
1957         if (spec->sp_cr_lookup) {
1958                 /*
1959                  * Check if the name already exist, though it will be checked in
1960                  * _index_insert also, for avoiding rolling back if exists
1961                  * _index_insert.
1962                  */
1963                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1964                                   MAY_WRITE | MAY_EXEC);
1965                 if (rc != -ENOENT)
1966                         RETURN(rc ? : -EEXIST);
1967
1968                 /* Permission is already being checked in mdd_lookup */
1969                 check_perm = false;
1970         }
1971
1972         rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
1973         if (rc != 0)
1974                 RETURN(rc);
1975
1976         /* sgid check */
1977         if (pattr->la_mode & S_ISGID) {
1978                 cattr->la_gid = pattr->la_gid;
1979                 if (S_ISDIR(cattr->la_mode)) {
1980                         cattr->la_mode |= S_ISGID;
1981                         cattr->la_valid |= LA_MODE;
1982                 }
1983         }
1984
1985         rc = mdd_name_check(m, lname);
1986         if (rc < 0)
1987                 RETURN(rc);
1988
1989         switch (cattr->la_mode & S_IFMT) {
1990         case S_IFLNK: {
1991                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1992
1993                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1994                         RETURN(-ENAMETOOLONG);
1995                 else
1996                         RETURN(0);
1997         }
1998         case S_IFDIR:
1999         case S_IFREG:
2000         case S_IFCHR:
2001         case S_IFBLK:
2002         case S_IFIFO:
2003         case S_IFSOCK:
2004                 rc = 0;
2005                 break;
2006         default:
2007                 rc = -EINVAL;
2008                 break;
2009         }
2010         RETURN(rc);
2011 }
2012
2013 static int mdd_declare_object_create(const struct lu_env *env,
2014                                      struct mdd_device *mdd,
2015                                      struct mdd_object *p, struct mdd_object *c,
2016                                      struct lu_attr *attr,
2017                                      struct thandle *handle,
2018                                      const struct md_op_spec *spec,
2019                                      struct lu_buf *def_acl_buf,
2020                                      struct lu_buf *acl_buf,
2021                                      struct dt_allocation_hint *hint)
2022 {
2023         int rc;
2024
2025         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
2026                                                 hint);
2027         if (rc)
2028                 GOTO(out, rc);
2029
2030 #ifdef CONFIG_FS_POSIX_ACL
2031         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
2032                 /* if dir, then can inherit default ACl */
2033                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
2034                                            XATTR_NAME_ACL_DEFAULT,
2035                                            0, handle);
2036                 if (rc)
2037                         GOTO(out, rc);
2038         }
2039
2040         if (acl_buf->lb_len > 0) {
2041                 rc = mdo_declare_attr_set(env, c, attr, handle);
2042                 if (rc)
2043                         GOTO(out, rc);
2044
2045                 rc = mdo_declare_xattr_set(env, c, acl_buf,
2046                                            XATTR_NAME_ACL_ACCESS, 0, handle);
2047                 if (rc)
2048                         GOTO(out, rc);
2049         }
2050 #endif
2051         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
2052         if (rc)
2053                 GOTO(out, rc);
2054
2055         /* replay case, create LOV EA from client data */
2056         if (spec->no_create ||
2057             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
2058                 const struct lu_buf *buf;
2059
2060                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2061                                         spec->u.sp_ea.eadatalen);
2062                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
2063                                            handle);
2064                 if (rc)
2065                         GOTO(out, rc);
2066         }
2067
2068         if (S_ISLNK(attr->la_mode)) {
2069                 const char *target_name = spec->u.sp_symname;
2070                 int sym_len = strlen(target_name);
2071                 const struct lu_buf *buf;
2072
2073                 buf = mdd_buf_get_const(env, target_name, sym_len);
2074                 rc = dt_declare_record_write(env, mdd_object_child(c),
2075                                              buf, 0, handle);
2076                 if (rc)
2077                         GOTO(out, rc);
2078         }
2079 out:
2080         return rc;
2081 }
2082
2083 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
2084                               struct mdd_object *p, struct mdd_object *c,
2085                               const struct lu_name *name,
2086                               struct lu_attr *attr,
2087                               struct thandle *handle,
2088                               const struct md_op_spec *spec,
2089                               struct linkea_data *ldata,
2090                               struct lu_buf *def_acl_buf,
2091                               struct lu_buf *acl_buf,
2092                               struct dt_allocation_hint *hint)
2093 {
2094         int rc;
2095
2096         rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
2097                                        def_acl_buf, acl_buf, hint);
2098         if (rc)
2099                 GOTO(out, rc);
2100
2101         if (S_ISDIR(attr->la_mode)) {
2102                 rc = mdo_declare_ref_add(env, p, handle);
2103                 if (rc)
2104                         GOTO(out, rc);
2105         }
2106
2107         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2108                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
2109                 if (rc)
2110                         GOTO(out, rc);
2111         } else {
2112                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
2113
2114                 rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
2115                                               name->ln_name, handle);
2116                 if (rc != 0)
2117                         return rc;
2118
2119                 rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
2120                 if (rc)
2121                         return rc;
2122
2123                 *la = *attr;
2124                 la->la_valid = LA_CTIME | LA_MTIME;
2125                 rc = mdo_declare_attr_set(env, p, la, handle);
2126                 if (rc)
2127                         return rc;
2128
2129                 rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
2130                 if (rc)
2131                         return rc;
2132         }
2133 out:
2134         return rc;
2135 }
2136
2137 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2138                         struct lu_attr *la, struct lu_buf *def_acl_buf,
2139                         struct lu_buf *acl_buf)
2140 {
2141         int     rc;
2142         ENTRY;
2143
2144         if (S_ISLNK(la->la_mode)) {
2145                 acl_buf->lb_len = 0;
2146                 def_acl_buf->lb_len = 0;
2147                 RETURN(0);
2148         }
2149
2150         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2151         rc = mdo_xattr_get(env, pobj, def_acl_buf,
2152                            XATTR_NAME_ACL_DEFAULT);
2153         mdd_read_unlock(env, pobj);
2154         if (rc > 0) {
2155                 /* If there are default ACL, fix mode/ACL by default ACL */
2156                 def_acl_buf->lb_len = rc;
2157                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
2158                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
2159                 acl_buf->lb_len = rc;
2160                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2161                 if (rc < 0)
2162                         RETURN(rc);
2163         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2164                 /* If there are no default ACL, fix mode by mask */
2165                 struct lu_ucred *uc = lu_ucred(env);
2166
2167                 /* The create triggered by MDT internal events, such as
2168                  * LFSCK reset, will not contain valid "uc". */
2169                 if (unlikely(uc != NULL))
2170                         la->la_mode &= ~uc->uc_umask;
2171                 rc = 0;
2172                 acl_buf->lb_len = 0;
2173                 def_acl_buf->lb_len = 0;
2174         }
2175
2176         RETURN(rc);
2177 }
2178
2179 /**
2180  * Create a metadata object and initialize it, set acl, xattr.
2181  **/
2182 static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
2183                              struct mdd_object *son, struct lu_attr *attr,
2184                              struct md_op_spec *spec, struct lu_buf *acl_buf,
2185                              struct lu_buf *def_acl_buf,
2186                              struct dt_allocation_hint *hint,
2187                              struct thandle *handle)
2188 {
2189         int                     rc;
2190
2191         mdd_write_lock(env, son, MOR_TGT_CHILD);
2192         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
2193                                         hint);
2194         if (rc)
2195                 GOTO(unlock, rc);
2196
2197         /* Note: In DNE phase I, for striped dir, though sub-stripes will be
2198          * created in declare phase, they also needs to be added to master
2199          * object as sub-directory entry. So it has to initialize the master
2200          * object, then set dir striped EA.(in mdo_xattr_set) */
2201         rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
2202                                    spec);
2203         if (rc != 0)
2204                 GOTO(err_destroy, rc);
2205
2206         /*
2207          * in case of replay we just set LOVEA provided by the client
2208          * XXX: I think it would be interesting to try "old" way where
2209          *      MDT calls this xattr_set(LOV) in a different transaction.
2210          *      probably this way we code can be made better.
2211          */
2212
2213         /* During creation, there are only a few cases we need do xattr_set to
2214          * create stripes.
2215          * 1. regular file: see comments above.
2216          * 2. dir: inherit default striping or pool settings from parent.
2217          * 3. create striped directory with provided stripeEA.
2218          * 4. create striped directory because inherit default layout from the
2219          * parent.
2220          */
2221         if (spec->no_create ||
2222             (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
2223             S_ISDIR(attr->la_mode)) {
2224                 const struct lu_buf *buf;
2225
2226                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2227                                         spec->u.sp_ea.eadatalen);
2228                 rc = mdo_xattr_set(env, son, buf,
2229                                    S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
2230                                                             XATTR_NAME_LOV, 0,
2231                                    handle);
2232                 if (rc != 0)
2233                         GOTO(err_destroy, rc);
2234         }
2235
2236 #ifdef CONFIG_FS_POSIX_ACL
2237         if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
2238             S_ISDIR(attr->la_mode)) {
2239                 /* set default acl */
2240                 rc = mdo_xattr_set(env, son, def_acl_buf,
2241                                    XATTR_NAME_ACL_DEFAULT, 0,
2242                                    handle);
2243                 if (rc)
2244                         GOTO(err_destroy, rc);
2245         }
2246         /* set its own acl */
2247         if (acl_buf != NULL && acl_buf->lb_len > 0) {
2248                 rc = mdo_xattr_set(env, son, acl_buf,
2249                                    XATTR_NAME_ACL_ACCESS,
2250                                    0, handle);
2251                 if (rc)
2252                         GOTO(err_destroy, rc);
2253         }
2254 #endif
2255
2256         if (S_ISLNK(attr->la_mode)) {
2257                 struct lu_ucred  *uc = lu_ucred_assert(env);
2258                 struct dt_object *dt = mdd_object_child(son);
2259                 const char *target_name = spec->u.sp_symname;
2260                 int sym_len = strlen(target_name);
2261                 const struct lu_buf *buf;
2262                 loff_t pos = 0;
2263
2264                 buf = mdd_buf_get_const(env, target_name, sym_len);
2265                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2266                                                 uc->uc_cap &
2267                                                 CFS_CAP_SYS_RESOURCE_MASK);
2268
2269                 if (rc == sym_len)
2270                         rc = 0;
2271                 else
2272                         GOTO(err_initlized, rc = -EFAULT);
2273         }
2274
2275 err_initlized:
2276         if (unlikely(rc != 0)) {
2277                 int rc2;
2278                 if (S_ISDIR(attr->la_mode)) {
2279                         /* Drop the reference, no need to delete "."/"..",
2280                          * because the object to be destroied directly. */
2281                         rc2 = mdo_ref_del(env, son, handle);
2282                         if (rc2 != 0)
2283                                 GOTO(unlock, rc);
2284                 }
2285                 rc2 = mdo_ref_del(env, son, handle);
2286                 if (rc2 != 0)
2287                         GOTO(unlock, rc);
2288 err_destroy:
2289                 mdo_destroy(env, son, handle);
2290         }
2291 unlock:
2292         mdd_write_unlock(env, son);
2293         RETURN(rc);
2294 }
2295
2296 /*
2297  * Create object and insert it into namespace.
2298  */
2299 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2300                       const struct lu_name *lname, struct md_object *child,
2301                       struct md_op_spec *spec, struct md_attr* ma)
2302 {
2303         struct mdd_thread_info  *info = mdd_env_info(env);
2304         struct lu_attr          *la = &info->mti_la_for_fix;
2305         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2306         struct mdd_object       *son = md2mdd_obj(child);
2307         struct mdd_device       *mdd = mdo2mdd(pobj);
2308         struct lu_attr          *attr = &ma->ma_attr;
2309         struct thandle          *handle;
2310         struct lu_attr          *pattr = &info->mti_pattr;
2311         struct lu_buf           acl_buf;
2312         struct lu_buf           def_acl_buf;
2313         struct linkea_data      *ldata = &info->mti_link_data;
2314         const char              *name = lname->ln_name;
2315         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2316         int                      rc;
2317         int                      rc2;
2318         ENTRY;
2319
2320         /*
2321          * Two operations have to be performed:
2322          *
2323          *  - an allocation of a new object (->do_create()), and
2324          *
2325          *  - an insertion into a parent index (->dio_insert()).
2326          *
2327          * Due to locking, operation order is not important, when both are
2328          * successful, *but* error handling cases are quite different:
2329          *
2330          *  - if insertion is done first, and following object creation fails,
2331          *  insertion has to be rolled back, but this operation might fail
2332          *  also leaving us with dangling index entry.
2333          *
2334          *  - if creation is done first, is has to be undone if insertion
2335          *  fails, leaving us with leaked space, which is neither good, nor
2336          *  fatal.
2337          *
2338          * It seems that creation-first is simplest solution, but it is
2339          * sub-optimal in the frequent
2340          *
2341          *         $ mkdir foo
2342          *         $ mkdir foo
2343          *
2344          * case, because second mkdir is bound to create object, only to
2345          * destroy it immediately.
2346          *
2347          * To avoid this follow local file systems that do double lookup:
2348          *
2349          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2350          *
2351          *     1. create            (mdd_object_create_internal())
2352          *
2353          *     2. insert            (__mdd_index_insert(), lookup again)
2354          */
2355
2356         rc = mdd_la_get(env, mdd_pobj, pattr);
2357         if (rc != 0)
2358                 RETURN(rc);
2359
2360         /* Sanity checks before big job. */
2361         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2362         if (rc)
2363                 RETURN(rc);
2364
2365         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2366                 GOTO(out_free, rc = -EINPROGRESS);
2367
2368         handle = mdd_trans_create(env, mdd);
2369         if (IS_ERR(handle))
2370                 GOTO(out_free, rc = PTR_ERR(handle));
2371
2372         acl_buf.lb_buf = info->mti_xattr_buf;
2373         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2374         def_acl_buf.lb_buf = info->mti_key;
2375         def_acl_buf.lb_len = sizeof(info->mti_key);
2376         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2377         if (rc < 0)
2378                 GOTO(out_stop, rc);
2379
2380         mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
2381
2382         memset(ldata, 0, sizeof(*ldata));
2383         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
2384                 struct lu_fid tfid = *mdd_object_fid(mdd_pobj);
2385
2386                 tfid.f_oid--;
2387                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2388                                         &tfid, lname, 1, 0, ldata);
2389         } else {
2390                 rc = mdd_linkea_prepare(env, son, NULL, NULL,
2391                                         mdd_object_fid(mdd_pobj),
2392                                         lname, 1, 0, ldata);
2393         }
2394
2395         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2396                                 handle, spec, ldata, &def_acl_buf, &acl_buf,
2397                                 hint);
2398         if (rc)
2399                 GOTO(out_stop, rc);
2400
2401         rc = mdd_trans_start(env, mdd, handle);
2402         if (rc)
2403                 GOTO(out_stop, rc);
2404
2405         rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
2406                                &def_acl_buf, hint, handle);
2407         if (rc != 0)
2408                 GOTO(out_stop, rc);
2409
2410         if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
2411                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2412                 rc = __mdd_orphan_add(env, son, handle);
2413                 GOTO(out_volatile, rc);
2414         } else {
2415                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2416                                         attr->la_mode, name, handle);
2417                 if (rc != 0)
2418                         GOTO(err_created, rc);
2419
2420                 mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
2421                               ldata, 1);
2422
2423                 /* update parent directory mtime/ctime */
2424                 *la = *attr;
2425                 la->la_valid = LA_CTIME | LA_MTIME;
2426                 rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2427                 if (rc)
2428                         GOTO(err_insert, rc);
2429         }
2430
2431         EXIT;
2432 err_insert:
2433         if (rc != 0) {
2434                 int rc2;
2435
2436                 if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2437                         rc2 = __mdd_orphan_del(env, son, handle);
2438                 else
2439                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2440                                                  S_ISDIR(attr->la_mode),
2441                                                  handle);
2442                 if (rc2 != 0)
2443                         goto out_stop;
2444
2445 err_created:
2446                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2447                 if (S_ISDIR(attr->la_mode)) {
2448                         /* Drop the reference, no need to delete "."/"..",
2449                          * because the object is to be destroyed directly. */
2450                         rc2 = mdo_ref_del(env, son, handle);
2451                         if (rc2 != 0) {
2452                                 mdd_write_unlock(env, son);
2453                                 goto out_stop;
2454                         }
2455                 }
2456 out_volatile:
2457                 /* For volatile files drop one link immediately, since there is
2458                  * no filename in the namespace, and save any error returned. */
2459                 rc2 = mdo_ref_del(env, son, handle);
2460                 if (rc2 != 0) {
2461                         mdd_write_unlock(env, son);
2462                         if (unlikely(rc == 0))
2463                                 rc = rc2;
2464                         goto out_stop;
2465                 }
2466
2467                 /* Don't destroy the volatile object on success */
2468                 if (likely(rc != 0))
2469                         mdo_destroy(env, son, handle);
2470                 mdd_write_unlock(env, son);
2471         }
2472
2473         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
2474             likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
2475                 rc = mdd_changelog_ns_store(env, mdd,
2476                                 S_ISDIR(attr->la_mode) ? CL_MKDIR :
2477                                 S_ISREG(attr->la_mode) ? CL_CREATE :
2478                                 S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2479                                 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
2480                                 NULL, handle);
2481 out_stop:
2482         rc2 = mdd_trans_stop(env, mdd, rc, handle);
2483         if (rc == 0)
2484                 rc = rc2;
2485 out_free:
2486         if (is_vmalloc_addr(ldata->ld_buf))
2487                 /* if we vmalloced a large buffer drop it */
2488                 lu_buf_free(ldata->ld_buf);
2489
2490         /* The child object shouldn't be cached anymore */
2491         if (rc)
2492                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2493                         &child->mo_lu.lo_header->loh_flags);
2494         return rc;
2495 }
2496
2497 /*
2498  * Get locks on parents in proper order
2499  * RETURN: < 0 - error, rename_order if successful
2500  */
2501 enum rename_order {
2502         MDD_RN_SAME,
2503         MDD_RN_SRCTGT,
2504         MDD_RN_TGTSRC
2505 };
2506
2507 static int mdd_rename_order(const struct lu_env *env,
2508                             struct mdd_device *mdd,
2509                             struct mdd_object *src_pobj,
2510                             const struct lu_attr *pattr,
2511                             struct mdd_object *tgt_pobj)
2512 {
2513         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2514         int rc;
2515         ENTRY;
2516
2517         if (src_pobj == tgt_pobj)
2518                 RETURN(MDD_RN_SAME);
2519
2520         /* compared the parent child relationship of src_p&tgt_p */
2521         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2522                 rc = MDD_RN_SRCTGT;
2523         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2524                 rc = MDD_RN_TGTSRC;
2525         } else {
2526                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2527                                    NULL);
2528                 if (rc == -EREMOTE)
2529                         rc = 0;
2530
2531                 if (rc == 1)
2532                         rc = MDD_RN_TGTSRC;
2533                 else if (rc == 0)
2534                         rc = MDD_RN_SRCTGT;
2535         }
2536
2537         RETURN(rc);
2538 }
2539
2540 /* has not mdd_write{read}_lock on any obj yet. */
2541 static int mdd_rename_sanity_check(const struct lu_env *env,
2542                                    struct mdd_object *src_pobj,
2543                                    const struct lu_attr *pattr,
2544                                    struct mdd_object *tgt_pobj,
2545                                    const struct lu_attr *tpattr,
2546                                    struct mdd_object *sobj,
2547                                    const struct lu_attr *cattr,
2548                                    struct mdd_object *tobj,
2549                                    const struct lu_attr *tattr)
2550 {
2551         int rc = 0;
2552         ENTRY;
2553
2554         /* XXX: when get here, sobj must NOT be NULL,
2555          * the other case has been processed in cld_rename
2556          * before mdd_rename and enable MDS_PERM_BYPASS. */
2557         LASSERT(sobj);
2558
2559         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2560         if (rc)
2561                 RETURN(rc);
2562
2563         /* XXX: when get here, "tobj == NULL" means tobj must
2564          * NOT exist (neither on remote MDS, such case has been
2565          * processed in cld_rename before mdd_rename and enable
2566          * MDS_PERM_BYPASS).
2567          * So check may_create, but not check may_unlink. */
2568         if (tobj == NULL)
2569                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2570                                     (src_pobj != tgt_pobj));
2571         else
2572                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2573                                     (src_pobj != tgt_pobj), 1);
2574
2575         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2576                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2577
2578         RETURN(rc);
2579 }
2580
2581 static int mdd_declare_rename(const struct lu_env *env,
2582                               struct mdd_device *mdd,
2583                               struct mdd_object *mdd_spobj,
2584                               struct mdd_object *mdd_tpobj,
2585                               struct mdd_object *mdd_sobj,
2586                               struct mdd_object *mdd_tobj,
2587                               const struct lu_name *tname,
2588                               const struct lu_name *sname,
2589                               struct md_attr *ma,
2590                               struct linkea_data *ldata,
2591                               struct thandle *handle)
2592 {
2593         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2594         int rc;
2595
2596         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2597         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2598
2599         LASSERT(mdd_spobj);
2600         LASSERT(mdd_tpobj);
2601         LASSERT(mdd_sobj);
2602
2603         /* name from source dir */
2604         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2605         if (rc)
2606                 return rc;
2607
2608         /* .. from source child */
2609         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2610                 /* source child can be directory,
2611                  * counted by source dir's nlink */
2612                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2613                 if (rc)
2614                         return rc;
2615                 if (mdd_spobj != mdd_tpobj) {
2616                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2617                                                       handle);
2618                         if (rc != 0)
2619                                 return rc;
2620
2621                         rc = mdo_declare_index_insert(env, mdd_sobj,
2622                                                       mdo2fid(mdd_tpobj),
2623                                                       S_IFDIR, dotdot, handle);
2624                         if (rc != 0)
2625                                 return rc;
2626                 }
2627
2628                 /* new target child can be directory,
2629                  * counted by target dir's nlink */
2630                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2631                 if (rc != 0)
2632                         return rc;
2633         }
2634
2635         la->la_valid = LA_CTIME | LA_MTIME;
2636         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2637         if (rc != 0)
2638                 return rc;
2639
2640         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2641         if (rc != 0)
2642                 return rc;
2643
2644         la->la_valid = LA_CTIME;
2645         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2646         if (rc)
2647                 return rc;
2648
2649         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
2650                 S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
2651         if (rc)
2652                 return rc;
2653
2654         /* new name */
2655         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2656                                       mdd_object_type(mdd_sobj),
2657                                       tname->ln_name, handle);
2658         if (rc != 0)
2659                 return rc;
2660
2661         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2662                 /* delete target child in target parent directory */
2663                 rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name,
2664                                               handle);
2665                 if (rc)
2666                         return rc;
2667
2668                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2669                 if (rc)
2670                         return rc;
2671
2672                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2673                         /* target child can be directory,
2674                          * delete "." reference in target child directory */
2675                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2676                         if (rc)
2677                                 return rc;
2678
2679                         /* delete ".." reference in target parent directory */
2680                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2681                         if (rc)
2682                                 return rc;
2683                 }
2684
2685                 la->la_valid = LA_CTIME;
2686                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2687                 if (rc)
2688                         return rc;
2689
2690                 rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
2691                 if (rc)
2692                         return rc;
2693         }
2694
2695         rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
2696         if (rc)
2697                 return rc;
2698
2699         return rc;
2700 }
2701
2702 /* src object can be remote that is why we use only fid and type of object */
2703 static int mdd_rename(const struct lu_env *env,
2704                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2705                       const struct lu_fid *lf, const struct lu_name *lsname,
2706                       struct md_object *tobj, const struct lu_name *ltname,
2707                       struct md_attr *ma)
2708 {
2709         const char *sname = lsname->ln_name;
2710         const char *tname = ltname->ln_name;
2711         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2712         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2713         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2714         struct mdd_device *mdd = mdo2mdd(src_pobj);
2715         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2716         struct mdd_object *mdd_tobj = NULL;
2717         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2718         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2719         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2720         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2721         struct thandle *handle;
2722         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2723         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2724         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2725         bool is_dir;
2726         bool tobj_ref = 0;
2727         bool tobj_locked = 0;
2728         unsigned cl_flags = 0;
2729         int rc, rc2;
2730         ENTRY;
2731
2732         if (tobj)
2733                 mdd_tobj = md2mdd_obj(tobj);
2734
2735         mdd_sobj = mdd_object_find(env, mdd, lf);
2736         if (IS_ERR(mdd_sobj))
2737                 RETURN(PTR_ERR(mdd_sobj));
2738
2739         rc = mdd_la_get(env, mdd_sobj, cattr);
2740         if (rc)
2741                 GOTO(out_pending, rc);
2742
2743         rc = mdd_la_get(env, mdd_spobj, pattr);
2744         if (rc)
2745                 GOTO(out_pending, rc);
2746
2747         if (mdd_tobj) {
2748                 rc = mdd_la_get(env, mdd_tobj, tattr);
2749                 if (rc)
2750                         GOTO(out_pending, rc);
2751         }
2752
2753         rc = mdd_la_get(env, mdd_tpobj, tpattr);
2754         if (rc)
2755                 GOTO(out_pending, rc);
2756
2757         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2758                                      mdd_sobj, cattr, mdd_tobj, tattr);
2759         if (rc)
2760                 GOTO(out_pending, rc);
2761
2762         rc = mdd_name_check(mdd, ltname);
2763         if (rc < 0)
2764                 GOTO(out_pending, rc);
2765
2766         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2767         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2768         if (rc < 0)
2769                 GOTO(out_pending, rc);
2770
2771         handle = mdd_trans_create(env, mdd);
2772         if (IS_ERR(handle))
2773                 GOTO(out_pending, rc = PTR_ERR(handle));
2774
2775         memset(ldata, 0, sizeof(*ldata));
2776         mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
2777                            mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
2778         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2779                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2780         if (rc)
2781                 GOTO(stop, rc);
2782
2783         rc = mdd_trans_start(env, mdd, handle);
2784         if (rc)
2785                 GOTO(stop, rc);
2786
2787         is_dir = S_ISDIR(cattr->la_mode);
2788
2789         /* Remove source name from source directory */
2790         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
2791         if (rc != 0)
2792                 GOTO(stop, rc);
2793
2794         /* "mv dir1 dir2" needs "dir1/.." link update */
2795         if (is_dir && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2796                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2797                 if (rc != 0)
2798                         GOTO(fixup_spobj2, rc);
2799
2800                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, S_IFDIR,
2801                                              dotdot, handle);
2802                 if (rc != 0)
2803                         GOTO(fixup_spobj, rc);
2804         }
2805
2806         if (mdd_tobj != NULL && mdd_object_exists(mdd_tobj)) {
2807                 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2808                 if (rc != 0)
2809                         /* tname might been renamed to something else */
2810                         GOTO(fixup_spobj, rc);
2811         }
2812
2813         /* Insert new fid with target name into target dir */
2814         rc = __mdd_index_insert(env, mdd_tpobj, lf, cattr->la_mode,
2815                                 tname, handle);
2816         if (rc != 0)
2817                 GOTO(fixup_tpobj, rc);
2818
2819         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2820         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2821
2822         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2823         la->la_valid = LA_CTIME;
2824         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2825         if (rc)
2826                 GOTO(fixup_tpobj, rc);
2827
2828         /* Update the linkEA for the source object */
2829         mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2830         rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2831                               mdo2fid(mdd_tpobj), ltname, handle, ldata,
2832                               0, 0);
2833         if (rc == -ENOENT)
2834                 /* Old files might not have EA entry */
2835                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2836                               lsname, handle, NULL, 0);
2837         mdd_write_unlock(env, mdd_sobj);
2838         /* We don't fail the transaction if the link ea can't be
2839            updated -- fid2path will use alternate lookup method. */
2840         rc = 0;
2841
2842         /* Remove old target object
2843          * For tobj is remote case cmm layer has processed
2844          * and set tobj to NULL then. So when tobj is NOT NULL,
2845          * it must be local one.
2846          */
2847         if (tobj && mdd_object_exists(mdd_tobj)) {
2848                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2849                 tobj_locked = 1;
2850                 if (mdd_is_dead_obj(mdd_tobj)) {
2851                         /* shld not be dead, something is wrong */
2852                         CERROR("tobj is dead, something is wrong\n");
2853                         rc = -EINVAL;
2854                         goto cleanup;
2855                 }
2856                 mdo_ref_del(env, mdd_tobj, handle);
2857
2858                 /* Remove dot reference. */
2859                 if (S_ISDIR(tattr->la_mode))
2860                         mdo_ref_del(env, mdd_tobj, handle);
2861                 tobj_ref = 1;
2862
2863                 /* fetch updated nlink */
2864                 rc = mdd_la_get(env, mdd_tobj, tattr);
2865                 if (rc != 0) {
2866                         CERROR("%s: Failed to get nlink for tobj "
2867                                 DFID": rc = %d\n",
2868                                 mdd2obd_dev(mdd)->obd_name,
2869                                 PFID(tpobj_fid), rc);
2870                         GOTO(fixup_tpobj, rc);
2871                 }
2872
2873                 la->la_valid = LA_CTIME;
2874                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2875                 if (rc != 0) {
2876                         CERROR("%s: Failed to set ctime for tobj "
2877                                 DFID": rc = %d\n",
2878                                 mdd2obd_dev(mdd)->obd_name,
2879                                 PFID(tpobj_fid), rc);
2880                         GOTO(fixup_tpobj, rc);
2881                 }
2882
2883                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2884                 ma->ma_attr = *tattr;
2885                 ma->ma_valid |= MA_INODE;
2886                 rc = mdd_finish_unlink(env, mdd_tobj, ma, mdd_tpobj, ltname,
2887                                        handle);
2888                 if (rc != 0) {
2889                         CERROR("%s: Failed to unlink tobj "
2890                                 DFID": rc = %d\n",
2891                                 mdd2obd_dev(mdd)->obd_name,
2892                                 PFID(tpobj_fid), rc);
2893                         GOTO(fixup_tpobj, rc);
2894                 }
2895
2896                 /* fetch updated nlink */
2897                 rc = mdd_la_get(env, mdd_tobj, tattr);
2898                 if (rc != 0) {
2899                         CERROR("%s: Failed to get nlink for tobj "
2900                                 DFID": rc = %d\n",
2901                                 mdd2obd_dev(mdd)->obd_name,
2902                                 PFID(tpobj_fid), rc);
2903                         GOTO(fixup_tpobj, rc);
2904                 }
2905                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2906                 ma->ma_attr = *tattr;
2907                 ma->ma_valid |= MA_INODE;
2908
2909                 if (tattr->la_nlink == 0) {
2910                         cl_flags |= CLF_RENAME_LAST;
2911                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2912                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2913                 }
2914         }
2915
2916         la->la_valid = LA_CTIME | LA_MTIME;
2917         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2918         if (rc)
2919                 GOTO(fixup_tpobj, rc);
2920
2921         if (mdd_spobj != mdd_tpobj) {
2922                 la->la_valid = LA_CTIME | LA_MTIME;
2923                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2924                 if (rc != 0)
2925                         GOTO(fixup_tpobj, rc);
2926         }
2927
2928         EXIT;
2929
2930 fixup_tpobj:
2931         if (rc) {
2932                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
2933                 if (rc2)
2934                         CWARN("tp obj fix error %d\n",rc2);
2935
2936                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2937                     !mdd_is_dead_obj(mdd_tobj)) {
2938                         if (tobj_ref) {
2939                                 mdo_ref_add(env, mdd_tobj, handle);
2940                                 if (is_dir)
2941                                         mdo_ref_add(env, mdd_tobj, handle);
2942                         }
2943
2944                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2945                                                   mdo2fid(mdd_tobj),
2946                                                   mdd_object_type(mdd_tobj),
2947                                                   tname, handle);
2948                         if (rc2 != 0)
2949                                 CWARN("tp obj fix error: rc = %d\n", rc2);
2950                 }
2951         }
2952
2953 fixup_spobj:
2954         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2955                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle);
2956                 if (rc2)
2957                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2958                                mdd2obd_dev(mdd)->obd_name, rc2);
2959
2960
2961                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, S_IFDIR,
2962                                               dotdot, handle);
2963                 if (rc2 != 0)
2964                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2965                               mdd2obd_dev(mdd)->obd_name, rc2);
2966         }
2967
2968 fixup_spobj2:
2969         if (rc != 0) {
2970                 rc2 = __mdd_index_insert(env, mdd_spobj, lf,
2971                                          mdd_object_type(mdd_sobj), sname,
2972                                          handle);
2973                 if (rc2 != 0)
2974                         CWARN("sp obj fix error: rc = %d\n", rc2);
2975         }
2976
2977 cleanup:
2978         if (tobj_locked)
2979                 mdd_write_unlock(env, mdd_tobj);
2980
2981         if (rc == 0)
2982                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, cl_flags,
2983                                             mdd_tobj, tpobj_fid, lf, spobj_fid,
2984                                             ltname, lsname, handle);
2985
2986 stop:
2987         mdd_trans_stop(env, mdd, rc, handle);
2988
2989 out_pending:
2990         mdd_object_put(env, mdd_sobj);
2991         return rc;
2992 }
2993
2994 /**
2995  * During migration once the parent FID has been changed,
2996  * we need update the parent FID in linkea.
2997  **/
2998 static int mdd_linkea_update_child_internal(const struct lu_env *env,
2999                                             struct mdd_object *parent,
3000                                             struct mdd_object *child,
3001                                             const char *name, int namelen,
3002                                             struct thandle *handle,
3003                                             bool declare)
3004 {
3005         struct mdd_thread_info  *info = mdd_env_info(env);
3006         struct linkea_data      ldata = { NULL };
3007         struct lu_buf           *buf = &info->mti_link_buf;
3008         int                     count;
3009         int                     rc = 0;
3010
3011         ENTRY;
3012
3013         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
3014         if (buf->lb_buf == NULL)
3015                 RETURN(-ENOMEM);
3016
3017         ldata.ld_buf = buf;
3018         rc = mdd_links_read(env, child, &ldata);
3019         if (rc != 0) {
3020                 if (rc == -ENOENT || rc == -ENODATA)
3021                         rc = 0;
3022                 RETURN(rc);
3023         }
3024
3025         LASSERT(ldata.ld_leh != NULL);
3026         ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
3027         for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
3028                 struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
3029                 struct lu_name lname;
3030                 struct lu_fid  fid;
3031
3032                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
3033                                     &lname, &fid);
3034
3035                 if (strncmp(lname.ln_name, name, namelen) != 0 ||
3036                     lu_fid_eq(&fid, mdd_object_fid(parent))) {
3037                         ldata.ld_lee = (struct link_ea_entry *)
3038                                        ((char *)ldata.ld_lee +
3039                                         ldata.ld_reclen);
3040                         continue;
3041                 }
3042
3043                 CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
3044                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
3045                        lname.ln_namelen, lname.ln_name,
3046                        PFID(mdd_object_fid(parent)));
3047                 /* update to the new parent fid */
3048                 linkea_entry_pack(ldata.ld_lee, &lname,
3049                                   mdd_object_fid(parent));
3050                 if (declare)
3051                         rc = mdd_declare_links_add(env, child, handle, &ldata,
3052                                                    MLAO_IGNORE);
3053                 else
3054                         rc = mdd_links_write(env, child, &ldata, handle);
3055                 break;
3056         }
3057         RETURN(rc);
3058 }
3059
3060 static int mdd_linkea_declare_update_child(const struct lu_env *env,
3061                                            struct mdd_object *parent,
3062                                            struct mdd_object *child,
3063                                            const char *name, int namelen,
3064                                            struct thandle *handle)
3065 {
3066         return mdd_linkea_update_child_internal(env, parent, child, name,
3067                                                 namelen, handle, true);
3068 }
3069
3070 static int mdd_linkea_update_child(const struct lu_env *env,
3071                                    struct mdd_object *parent,
3072                                    struct mdd_object *child,
3073                                    const char *name, int namelen,
3074                                    struct thandle *handle)
3075 {
3076         return mdd_linkea_update_child_internal(env, parent, child, name,
3077                                                 namelen, handle, false);
3078 }
3079
3080 static int mdd_update_linkea_internal(const struct lu_env *env,
3081                                       struct mdd_object *mdd_pobj,
3082                                       struct mdd_object *mdd_sobj,
3083                                       struct mdd_object *mdd_tobj,
3084                                       const struct lu_name *child_name,
3085                                       struct linkea_data *ldata,
3086                                       struct thandle *handle,
3087                                       int declare)
3088 {
3089         struct mdd_thread_info  *info = mdd_env_info(env);
3090         int                     count;
3091         int                     rc = 0;
3092         ENTRY;
3093
3094         LASSERT(ldata->ld_buf != NULL);
3095
3096 again:
3097         /* If it is mulitple links file, we need update the name entry for
3098          * all parent */
3099         LASSERT(ldata->ld_leh != NULL);
3100         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
3101         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
3102                 struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3103                 struct mdd_object       *pobj;
3104                 struct lu_name          lname;
3105                 struct lu_fid           fid;
3106
3107                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
3108                                     &lname, &fid);
3109                 pobj = mdd_object_find(env, mdd, &fid);
3110                 if (IS_ERR(pobj)) {
3111                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
3112                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
3113                               PTR_ERR(pobj));
3114                         linkea_del_buf(ldata, &lname);
3115                         goto again;
3116                 }
3117
3118                 if (!mdd_object_exists(pobj)) {
3119                         CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
3120                               mdd2obd_dev(mdd)->obd_name, PFID(&fid));
3121                         linkea_del_buf(ldata, &lname);
3122                         mdd_object_put(env, pobj);
3123                         goto again;
3124                 }
3125
3126                 if (pobj == mdd_pobj &&
3127                     lname.ln_namelen == child_name->ln_namelen &&
3128                     strncmp(lname.ln_name, child_name->ln_name,
3129                             lname.ln_namelen) == 0) {
3130                         CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
3131                               mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
3132                               PFID(&fid));
3133                         linkea_del_buf(ldata, &lname);
3134                         mdd_object_put(env, pobj);
3135                         goto again;
3136                 }
3137
3138                 CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
3139                        mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
3140                        PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
3141
3142                 if (declare) {
3143                         /* Remove source name from source directory */
3144                         /* Insert new fid with target name into target dir */
3145                         rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
3146                                                       handle);
3147                         if (rc != 0)
3148                                 GOTO(next_put, rc);
3149
3150                         rc = mdo_declare_index_insert(env, pobj,
3151                                         mdd_object_fid(mdd_tobj),
3152                                         mdd_object_type(mdd_tobj),
3153                                         lname.ln_name, handle);
3154                         if (rc != 0)
3155                                 GOTO(next_put, rc);
3156
3157                         rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3158                         if (rc)
3159                                 GOTO(next_put, rc);
3160
3161                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3162                         if (rc)
3163                                 GOTO(next_put, rc);
3164                 } else {
3165                         char *tmp_name = info->mti_key;
3166
3167                         if (lname.ln_namelen >= sizeof(info->mti_key)) {
3168                                 /* lnamelen is too big(> NAME_MAX + 16),
3169                                  * something wrong about this linkea, let's
3170                                  * skip it */
3171                                 linkea_del_buf(ldata, &lname);
3172                                 mdd_object_put(env, pobj);
3173                                 goto again;
3174                         }
3175
3176                         /* Note: lname might be without \0 at the end, see
3177                          * linkea_entry_unpack(), let's add extra \0 by
3178                          * snprintf */
3179                         snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
3180                                  lname.ln_namelen, lname.ln_name);
3181                         lname.ln_name = tmp_name;
3182
3183                         /* Let's check if this linkEA still valid, before
3184                          * it might be packed into the RPC buffer. */
3185                         rc = mdd_lookup(env, &pobj->mod_obj, &lname,
3186                                         &info->mti_fid, NULL);
3187                         if (rc < 0 ||
3188                             !lu_fid_eq(&info->mti_fid,
3189                                         mdd_object_fid(mdd_sobj))) {
3190                                 /* skip invalid linkea entry */
3191                                 linkea_del_buf(ldata, &lname);
3192                                 mdd_object_put(env, pobj);
3193                                 goto again;
3194                         }
3195
3196                         rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
3197                         if (rc != 0)
3198                                 GOTO(next_put, rc);
3199
3200                         rc = __mdd_index_insert(env, pobj,
3201                                         mdd_object_fid(mdd_tobj),
3202                                         mdd_object_type(mdd_tobj),
3203                                         tmp_name, handle);
3204                         if (rc != 0)
3205                                 GOTO(next_put, rc);
3206
3207                         mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
3208                         rc = mdo_ref_add(env, mdd_tobj, handle);
3209                         mdd_write_unlock(env, mdd_tobj);
3210                         if (rc)
3211                                 GOTO(next_put, rc);
3212
3213                         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3214                         mdo_ref_del(env, mdd_sobj, handle);
3215                         mdd_write_unlock(env, mdd_sobj);
3216                 }
3217 next_put:
3218                 mdd_object_put(env, pobj);
3219                 if (rc != 0)
3220                         break;
3221
3222                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
3223                                                          ldata->ld_reclen);
3224         }
3225
3226         RETURN(rc);
3227 }
3228
3229 static int mdd_migrate_xattrs(const struct lu_env *env,
3230                               struct mdd_object *mdd_sobj,
3231                               struct mdd_object *mdd_tobj)
3232 {
3233         struct mdd_thread_info  *info = mdd_env_info(env);
3234         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3235         char                    *xname;
3236         struct thandle          *handle;
3237         struct lu_buf           xbuf;
3238         int                     xlen;
3239         int                     rem;
3240         int                     xsize;
3241         int                     list_xsize;
3242         struct lu_buf           list_xbuf;
3243         int                     rc;
3244         int                     rc1;
3245
3246         /* retrieve xattr list from the old object */
3247         list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
3248         if (list_xsize == -ENODATA)
3249                 return 0;
3250
3251         if (list_xsize < 0)
3252                 return list_xsize;
3253
3254         lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
3255         if (info->mti_big_buf.lb_buf == NULL)
3256                 return -ENOMEM;
3257
3258         list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
3259         list_xbuf.lb_len = list_xsize;
3260         rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
3261         if (rc < 0)
3262                 return rc;
3263         rc = 0;
3264         rem = list_xsize;
3265         xname = list_xbuf.lb_buf;
3266         while (rem > 0) {
3267                 xlen = strnlen(xname, rem - 1) + 1;
3268                 if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
3269                     strcmp(XATTR_NAME_LMA, xname) == 0 ||
3270                     strcmp(XATTR_NAME_LMV, xname) == 0)
3271                         goto next;
3272
3273                 /* For directory, if there are default layout, migrate here */
3274                 if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
3275                     !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
3276                         goto next;
3277
3278                 xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
3279                 if (xsize == -ENODATA)
3280                         goto next;
3281                 if (xsize < 0)
3282                         GOTO(out, rc);
3283
3284                 lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
3285                 if (info->mti_link_buf.lb_buf == NULL)
3286                         GOTO(out, rc = -ENOMEM);
3287
3288                 xbuf.lb_len = xsize;
3289                 xbuf.lb_buf = info->mti_link_buf.lb_buf;
3290                 rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
3291                 if (rc == -ENODATA)
3292                         goto next;
3293                 if (rc < 0)
3294                         GOTO(out, rc);
3295
3296                 handle = mdd_trans_create(env, mdd);
3297                 if (IS_ERR(handle))
3298                         GOTO(out, rc = PTR_ERR(handle));
3299
3300                 rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
3301                                            handle);
3302                 if (rc != 0)
3303                         GOTO(stop_trans, rc);
3304                 /* Note: this transaction is part of migration, and it is not
3305                  * the last step of migration, so we set th_local = 1 to avoid
3306                  * update last rcvd for this transaction */
3307                 handle->th_local = 1;
3308                 rc = mdd_trans_start(env, mdd, handle);
3309                 if (rc != 0)
3310                         GOTO(stop_trans, rc);
3311
3312                 rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
3313                 if (rc == -EEXIST)
3314                         GOTO(stop_trans, rc = 0);
3315
3316                 if (rc != 0)
3317                         GOTO(stop_trans, rc);
3318 stop_trans:
3319                 rc1 = mdd_trans_stop(env, mdd, rc, handle);
3320                 if (rc == 0)
3321                         rc = rc1;
3322                 if (rc != 0)
3323                         GOTO(out, rc);
3324 next:
3325                 rem -= xlen;
3326                 memmove(xname, xname + xlen, rem);
3327         }
3328 out:
3329         return rc;
3330 }
3331
3332 static int mdd_declare_migrate_create(const struct lu_env *env,
3333                                       struct mdd_object *mdd_pobj,
3334                                       struct mdd_object *mdd_sobj,
3335                                       struct mdd_object *mdd_tobj,
3336                                       struct md_op_spec *spec,
3337                                       struct lu_attr *la,
3338                                       union lmv_mds_md *mgr_ea,
3339                                       struct linkea_data *ldata,
3340                                       struct thandle *handle)
3341 {
3342         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3343         const struct lu_buf     *buf;
3344         int                     rc;
3345         int                     mgr_easize;
3346
3347         rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
3348                                                 handle, spec, NULL);
3349         if (rc != 0)
3350                 return rc;
3351
3352         rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
3353                                            handle);
3354         if (rc != 0)
3355                 return rc;
3356
3357         if (S_ISLNK(la->la_mode)) {
3358                 const char *target_name = spec->u.sp_symname;
3359                 int sym_len = strlen(target_name);
3360                 const struct lu_buf *buf;
3361
3362                 buf = mdd_buf_get_const(env, target_name, sym_len);
3363                 rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
3364                                              buf, 0, handle);
3365                 if (rc != 0)
3366                         return rc;
3367         } else if (S_ISDIR(la->la_mode)) {
3368                 rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
3369                                            MLAO_IGNORE);
3370                 if (rc != 0)
3371                         return rc;
3372         }
3373
3374         if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
3375                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
3376                                         spec->u.sp_ea.eadatalen);
3377                 rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
3378                                            0, handle);
3379                 if (rc)
3380                         return rc;
3381         }
3382
3383         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3384         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3385         rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
3386                                    0, handle);
3387         if (rc)
3388                 return rc;
3389
3390         la_flag->la_valid = LA_FLAGS;
3391         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3392         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3393
3394         return rc;
3395 }
3396
3397 static int mdd_migrate_create(const struct lu_env *env,
3398                               struct mdd_object *mdd_pobj,
3399                               struct mdd_object *mdd_sobj,
3400                               struct mdd_object *mdd_tobj,
3401                               const struct lu_name *lname,
3402                               struct lu_attr *la)
3403 {
3404         struct mdd_thread_info  *info = mdd_env_info(env);
3405         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3406         struct md_op_spec       *spec = &info->mti_spec;
3407         struct lu_buf           lmm_buf = { NULL };
3408         struct lu_buf           link_buf = { NULL };
3409         const struct lu_buf     *buf;
3410         struct thandle          *handle;
3411         struct lmv_mds_md_v1    *mgr_ea;
3412         struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
3413         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
3414         int                     mgr_easize;
3415         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3416         int                     rc;
3417         ENTRY;
3418
3419         /* prepare spec for create */
3420         memset(spec, 0, sizeof(*spec));
3421         spec->sp_cr_lookup = 0;
3422         spec->sp_feat = &dt_directory_features;
3423         if (S_ISLNK(la->la_mode)) {
3424                 buf = lu_buf_check_and_alloc(
3425                                 &mdd_env_info(env)->mti_big_buf,
3426                                 la->la_size + 1);
3427                 link_buf = *buf;
3428                 link_buf.lb_len = la->la_size + 1;
3429                 memset(link_buf.lb_buf, 0, link_buf.lb_len);
3430                 rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
3431                 if (rc <= 0) {
3432                         rc = rc != 0 ? rc : -EFAULT;
3433                         CERROR("%s: "DFID" readlink failed: rc = %d\n",
3434                                mdd2obd_dev(mdd)->obd_name,
3435                                PFID(mdd_object_fid(mdd_sobj)), rc);
3436                         RETURN(rc);
3437                 }
3438                 spec->u.sp_symname = link_buf.lb_buf;
3439         } else if (S_ISREG(la->la_mode)) {
3440                 /* retrieve lov of the old object */
3441                 rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
3442                 if (rc != 0 && rc != -ENODATA)
3443                         RETURN(rc);
3444                 if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
3445                         spec->u.sp_ea.eadata = lmm_buf.lb_buf;
3446                         spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
3447                         spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
3448                 }
3449         } else if (S_ISDIR(la->la_mode)) {
3450                 rc = mdd_links_read(env, mdd_sobj, ldata);
3451                 if (rc < 0 && rc != -ENODATA)
3452                         RETURN(rc);
3453         }
3454
3455         mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
3456         memset(mgr_ea, 0, sizeof(*mgr_ea));
3457         mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
3458         mgr_ea->lmv_stripe_count = cpu_to_le32(2);
3459         mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
3460         mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
3461         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
3462         fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
3463
3464         mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
3465
3466         handle = mdd_trans_create(env, mdd);
3467         if (IS_ERR(handle))
3468                 GOTO(out_free, rc = PTR_ERR(handle));
3469
3470         /* Note: this transaction is part of migration, and it is not
3471          * the last step of migration, so we set th_local = 1 to avoid
3472          * update last rcvd for this transaction */
3473         handle->th_local = 1;
3474         rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
3475                                         spec, la,
3476                                         (union lmv_mds_md *)info->mti_xattr_buf,
3477                                         ldata, handle);
3478         if (rc != 0)
3479                 GOTO(stop_trans, rc);
3480
3481         rc = mdd_trans_start(env, mdd, handle);
3482         if (rc != 0)
3483                 GOTO(stop_trans, rc);
3484
3485         /* create the target object */
3486         rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
3487                                hint, handle);
3488         if (rc != 0)
3489                 GOTO(stop_trans, rc);
3490
3491         if (S_ISDIR(la->la_mode)) {
3492                 rc = mdd_links_write(env, mdd_tobj, ldata, handle);
3493                 if (rc != 0)
3494                         GOTO(stop_trans, rc);
3495         }
3496
3497         /* Set MIGRATE EA on the source inode, so once the migration needs
3498          * to be re-done during failover, the re-do process can locate the
3499          * target object which is already being created. */
3500         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
3501         buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
3502         rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
3503         if (rc != 0)
3504                 GOTO(stop_trans, rc);
3505
3506         /* Set immutable flag, so any modification is disabled until
3507          * the migration is done. Once the migration is interrupted,
3508          * if the resume process find the migrating object has both
3509          * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
3510          * flag and approve the migration */
3511         la_flag->la_valid = LA_FLAGS;
3512         la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
3513         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3514 stop_trans:
3515         if (handle != NULL) {
3516                 int rc1;
3517
3518                 rc1 = mdd_trans_stop(env, mdd, rc, handle);
3519                 if (rc == 0)
3520                         rc = rc1;
3521         }
3522 out_free:
3523         if (lmm_buf.lb_buf != NULL)
3524                 OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
3525         RETURN(rc);
3526 }
3527
3528 static int mdd_migrate_entries(const struct lu_env *env,
3529                                struct mdd_object *mdd_sobj,
3530                                struct mdd_object *mdd_tobj)
3531 {
3532         struct dt_object        *next = mdd_object_child(mdd_sobj);
3533         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3534         struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
3535         struct thandle          *handle;
3536         struct dt_it            *it;
3537         const struct dt_it_ops  *iops;
3538         int                      rc;
3539         int                      result;
3540         struct lu_dirent        *ent;
3541         ENTRY;
3542
3543         OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
3544         if (ent == NULL)
3545                 RETURN(-ENOMEM);
3546
3547         if (!dt_try_as_dir(env, next))
3548                 GOTO(out_ent, rc = -ENOTDIR);
3549         /*
3550          * iterate directories
3551          */
3552         iops = &next->do_index_ops->dio_it;
3553         it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
3554         if (IS_ERR(it))
3555                 GOTO(out_ent, rc = PTR_ERR(it));
3556
3557         rc = iops->load(env, it, 0);
3558         if (rc == 0)
3559                 rc = iops->next(env, it);
3560         else if (rc > 0)
3561                 rc = 0;
3562         /*
3563          * At this point and across for-loop:
3564          *
3565          *  rc == 0 -> ok, proceed.
3566          *  rc >  0 -> end of directory.
3567          *  rc <  0 -> error.
3568          */
3569         do {
3570                 struct mdd_object       *child;
3571                 char                    *name = mdd_env_info(env)->mti_key;
3572                 int                     len;
3573                 int                     recsize;
3574                 int                     is_dir;
3575                 bool                    target_exist = false;
3576                 int                     rc1;
3577
3578                 len = iops->key_size(env, it);
3579                 if (len == 0)
3580                         goto next;
3581
3582                 result = iops->rec(env, it, (struct dt_rec *)ent,
3583                                    LUDA_FID | LUDA_TYPE);
3584                 if (result == -ESTALE)
3585                         goto next;
3586                 if (result != 0) {
3587                         rc = result;
3588                         goto out;
3589                 }
3590
3591                 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
3592                 recsize = le16_to_cpu(ent->lde_reclen);
3593
3594                 /* Insert new fid with target name into target dir */
3595                 if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
3596                     (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
3597                      ent->lde_name[1] == '.'))
3598                         goto next;
3599
3600                 child = mdd_object_find(env, mdd, &ent->lde_fid);
3601                 if (IS_ERR(child))
3602                         GOTO(out, rc = PTR_ERR(child));
3603
3604                 mdd_write_lock(env, child, MOR_SRC_CHILD);
3605                 is_dir = S_ISDIR(mdd_object_type(child));
3606
3607                 snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
3608
3609                 /* Check whether the name has been inserted to the target */
3610                 if (dt_try_as_dir(env, dt_tobj)) {
3611                         struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
3612
3613                         rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
3614                                        (struct dt_key *)name);
3615                         if (unlikely(rc == 0))
3616                                 target_exist = true;
3617                 }
3618
3619                 handle = mdd_trans_create(env, mdd);
3620                 if (IS_ERR(handle))
3621                         GOTO(out, rc = PTR_ERR(handle));
3622
3623                 /* Note: this transaction is part of migration, and it is not
3624                  * the last step of migration, so we set th_local = 1 to avoid
3625                  * updating last rcvd for this transaction */
3626                 handle->th_local = 1;
3627                 if (likely(!target_exist)) {
3628                         rc = mdo_declare_index_insert(env, mdd_tobj,
3629                                                       &ent->lde_fid,
3630                                                       mdd_object_type(child),
3631                                                       name, handle);
3632                         if (rc != 0)
3633                                 GOTO(out_put, rc);
3634
3635                         if (is_dir) {
3636                                 rc = mdo_declare_ref_add(env, mdd_tobj, handle);
3637                                 if (rc != 0)
3638                                         GOTO(out_put, rc);
3639                         }
3640                 }
3641
3642                 rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
3643                 if (rc != 0)
3644                         GOTO(out_put, rc);
3645
3646                 if (is_dir) {
3647                         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3648                         if (rc != 0)
3649                                 GOTO(out_put, rc);
3650
3651                         /* Update .. for child */
3652                         rc = mdo_declare_index_delete(env, child, dotdot,
3653                                                       handle);
3654                         if (rc != 0)
3655                                 GOTO(out_put, rc);
3656
3657                         rc = mdo_declare_index_insert(env, child,
3658                                                       mdd_object_fid(mdd_tobj),
3659                                                       S_IFDIR, dotdot, handle);
3660                         if (rc != 0)
3661                                 GOTO(out_put, rc);
3662                 }
3663
3664                 rc = mdd_linkea_declare_update_child(env, mdd_tobj,
3665                                                      child, name,
3666                                                      strlen(name),
3667                                                      handle);
3668                 if (rc != 0)
3669                         GOTO(out_put, rc);
3670
3671                 rc = mdd_trans_start(env, mdd, handle);
3672                 if (rc != 0) {
3673                         CERROR("%s: transaction start failed: rc = %d\n",
3674                                mdd2obd_dev(mdd)->obd_name, rc);
3675                         GOTO(out_put, rc);
3676                 }
3677
3678                 if (likely(!target_exist)) {
3679                         rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
3680                                                 mdd_object_type(child),
3681                                                 name, handle);
3682                         if (rc != 0)
3683                                 GOTO(out_put, rc);
3684                 }
3685
3686                 rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
3687                 if (rc != 0)
3688                         GOTO(out_put, rc);
3689
3690                 if (is_dir) {
3691                         rc = __mdd_index_delete_only(env, child, dotdot,
3692                                                      handle);
3693                         if (rc != 0)
3694                                 GOTO(out_put, rc);
3695
3696                         rc = __mdd_index_insert_only(env, child,
3697                                          mdd_object_fid(mdd_tobj), S_IFDIR,
3698                                          dotdot, handle);
3699                         if (rc != 0)
3700                                 GOTO(out_put, rc);
3701                 }
3702
3703                 rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
3704                                              strlen(name), handle);
3705
3706 out_put:
3707                 mdd_write_unlock(env, child);
3708                 mdd_object_put(env, child);
3709                 rc1 = mdd_trans_stop(env, mdd, rc, handle);
3710                 if (rc == 0)
3711                         rc = rc1;
3712
3713                 if (rc != 0)
3714                         GOTO(out, rc);
3715 next:
3716                 result = iops->next(env, it);
3717                 if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
3718                         GOTO(out, rc = -EINTR);
3719
3720                 if (result == -ESTALE)
3721                         goto next;
3722         } while (result == 0);
3723 out:
3724         iops->put(env, it);
3725         iops->fini(env, it);
3726 out_ent:
3727         OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
3728         RETURN(rc);
3729 }
3730
3731 static int mdd_declare_update_linkea(const struct lu_env *env,
3732                                      struct mdd_object *mdd_pobj,
3733                                      struct mdd_object *mdd_sobj,
3734                                      struct mdd_object *mdd_tobj,
3735                                      const struct lu_name *child_name,
3736                                      struct linkea_data *ldata,
3737                                      struct thandle *handle)
3738 {
3739         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3740                                           child_name, ldata, handle, 1);
3741 }
3742
3743 static int mdd_update_linkea(const struct lu_env *env,
3744                              struct mdd_object *mdd_pobj,
3745                              struct mdd_object *mdd_sobj,
3746                              struct mdd_object *mdd_tobj,
3747                              const struct lu_name *child_name,
3748                              struct linkea_data *ldata,
3749                              struct thandle *handle)
3750 {
3751         return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
3752                                           child_name, ldata, handle, 0);
3753 }
3754
3755 static int mdd_declare_migrate_update_name(const struct lu_env *env,
3756                                            struct mdd_object *mdd_pobj,
3757                                            struct mdd_object *mdd_sobj,
3758                                            struct mdd_object *mdd_tobj,
3759                                            const struct lu_name *lname,
3760                                            struct lu_attr *la,
3761                                            struct lu_attr *parent_la,
3762                                            struct linkea_data *ldata,
3763                                            struct thandle *handle)
3764 {
3765         struct lu_attr  *la_flag = MDD_ENV_VAR(env, tattr);
3766         int             rc;
3767
3768         /* Revert IMMUTABLE flag */
3769         la_flag->la_valid = LA_FLAGS;
3770         la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
3771         rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
3772         if (rc != 0)
3773                 return rc;
3774
3775         /* delete entry from source dir */
3776         rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
3777         if (rc != 0)
3778                 return rc;
3779
3780         if (ldata->ld_buf != NULL) {
3781                 rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
3782                                                mdd_tobj, lname, ldata, handle);
3783                 if (rc != 0)
3784                         return rc;
3785         }
3786
3787         if (S_ISREG(mdd_object_type(mdd_sobj))) {
3788                 rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3789                                            handle);
3790                 if (rc != 0)
3791                         return rc;
3792
3793                 handle->th_complex = 1;
3794                 rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
3795                                            XATTR_NAME_FID,
3796                                            LU_XATTR_REPLACE, handle);
3797                 if (rc < 0)
3798                         return rc;
3799         }
3800
3801         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3802                 rc = mdo_declare_ref_del(env, mdd_pobj, handle);
3803                 if (rc != 0)
3804                         return rc;
3805         }
3806
3807         /* new name */
3808         rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
3809                                       mdd_object_type(mdd_tobj),
3810                                       lname->ln_name, handle);
3811         if (rc != 0)
3812                 return rc;
3813
3814         rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
3815         if (rc != 0)
3816                 return rc;
3817
3818         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3819                 rc = mdo_declare_ref_add(env, mdd_pobj, handle);
3820                 if (rc != 0)
3821                         return rc;
3822         }
3823
3824         /* delete old object */
3825         rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3826         if (rc != 0)
3827                 return rc;
3828
3829         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
3830                 /* delete old object */
3831                 rc = mdo_declare_ref_del(env, mdd_sobj, handle);
3832                 if (rc != 0)
3833                         return rc;
3834                 /* set nlink to 0 */
3835                 rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
3836                 if (rc != 0)
3837                         return rc;
3838         }
3839
3840         rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
3841         if (rc)
3842                 return rc;
3843
3844         rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
3845
3846         return rc;
3847 }
3848
3849 static int mdd_migrate_update_name(const struct lu_env *env,
3850                                    struct mdd_object *mdd_pobj,
3851                                    struct mdd_object *mdd_sobj,
3852                                    struct mdd_object *mdd_tobj,
3853                                    const struct lu_name *lname,
3854                                    struct md_attr *ma)
3855 {
3856         struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
3857         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
3858         struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
3859         struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
3860         struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
3861         struct thandle          *handle;
3862         int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
3863         const char              *name = lname->ln_name;
3864         int                     rc;
3865         ENTRY;
3866
3867         /* update time for parent */
3868         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
3869         p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
3870         p_la->la_valid = LA_CTIME;
3871
3872         rc = mdd_la_get(env, mdd_sobj, so_attr);
3873         if (rc != 0)
3874                 RETURN(rc);
3875
3876         ldata->ld_buf = NULL;
3877         rc = mdd_links_read(env, mdd_sobj, ldata);
3878         if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
3879                 RETURN(rc);
3880
3881         handle = mdd_trans_create(env, mdd);
3882         if (IS_ERR(handle))
3883                 RETURN(PTR_ERR(handle));
3884
3885         rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
3886                                              lname, so_attr, p_la, ldata,
3887                                              handle);
3888         if (rc != 0) {
3889                 /* If the migration can not be fit in one transaction, just
3890                  * leave it in the original MDT */
3891                 if (rc == -E2BIG)
3892                         GOTO(stop_trans, rc = 0);
3893                 else
3894                         GOTO(stop_trans, rc);
3895         }
3896
3897         CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
3898                mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
3899                PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
3900                PFID(mdd_object_fid(mdd_tobj)));
3901
3902         rc = mdd_trans_start(env, mdd, handle);
3903         if (rc != 0)
3904                 GOTO(stop_trans, rc);
3905
3906         /* Revert IMMUTABLE flag */
3907         la_flag->la_valid = LA_FLAGS;
3908         la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
3909         rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
3910         if (rc != 0)
3911                 GOTO(stop_trans, rc);
3912
3913         /* Remove source name from source directory */
3914         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
3915         if (rc != 0)
3916                 GOTO(stop_trans, rc);
3917
3918         if (ldata->ld_buf != NULL) {
3919                 rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
3920                                        lname, ldata, handle);
3921                 if (rc != 0)
3922                         GOTO(stop_trans, rc);
3923         }
3924
3925         if (S_ISREG(so_attr->la_mode)) {
3926                 if (so_attr->la_nlink == 1) {
3927                         rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
3928                                            handle);
3929                         if (rc != 0 && rc != -ENODATA)
3930                                 GOTO(stop_trans, rc);
3931
3932                         rc = mdo_xattr_set(env, mdd_tobj, NULL,
3933                                            XATTR_NAME_FID,
3934                                            LU_XATTR_REPLACE, handle);
3935                         if (rc < 0)
3936                                 GOTO(stop_trans, rc);
3937                 }
3938         }
3939
3940         /* Insert new fid with target name into target dir */
3941         rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
3942                                 mdd_object_type(mdd_tobj), name, handle);
3943         if (rc != 0)
3944                 GOTO(stop_trans, rc);
3945
3946         linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
3947         rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
3948                            ldata, 1);
3949         if (rc != 0)
3950                 GOTO(stop_trans, rc);
3951
3952         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
3953
3954         /* Increase mod_count to add the source object to the orphan list,
3955          * so if other clients still send RPC to the old object, then these
3956          * objects can help the request to find the new object, see
3957          * mdt_reint_open() */
3958         mdd_sobj->mod_count++;
3959         rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
3960         mdd_sobj->mod_count--;
3961         if (rc != 0)
3962                 GOTO(out_unlock, rc);
3963
3964         mdo_ref_del(env, mdd_sobj, handle);
3965         if (is_dir)
3966                 mdo_ref_del(env, mdd_sobj, handle);
3967
3968         /* Get the attr again after ref_del */
3969         rc = mdd_la_get(env, mdd_sobj, so_attr);
3970         if (rc != 0)
3971                 GOTO(out_unlock, rc);
3972
3973         ma->ma_attr = *so_attr;
3974         ma->ma_valid |= MA_INODE;
3975
3976         rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
3977         if (rc != 0)
3978                 GOTO(out_unlock, rc);
3979
3980 out_unlock:
3981         mdd_write_unlock(env, mdd_sobj);
3982
3983 stop_trans:
3984         mdd_trans_stop(env, mdd, rc, handle);
3985
3986         RETURN(rc);
3987 }
3988
3989 /**
3990  * Check whether we should migrate the file/dir
3991  * return val
3992  *      < 0  permission check failed or other error.
3993  *      = 0  the file can be migrated.
3994  *      > 0  the file does not need to be migrated, mostly
3995  *           for multiple link file
3996  **/
3997 static int mdd_migrate_sanity_check(const struct lu_env *env,
3998                                     struct mdd_object *pobj,
3999                                     const struct lu_attr *pattr,
4000                                     struct mdd_object *sobj,
4001                                     struct lu_attr *sattr)
4002 {
4003         struct mdd_thread_info  *info = mdd_env_info(env);
4004         struct linkea_data      *ldata = &info->mti_link_data;
4005         int                     mgr_easize;
4006         struct lu_buf           *mgr_buf;
4007         int                     count;
4008         int                     rc;
4009
4010         ENTRY;
4011
4012         mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
4013         mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
4014         if (mgr_buf->lb_buf == NULL)
4015                 RETURN(-ENOMEM);
4016
4017         rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
4018         if (rc > 0) {
4019                 union lmv_mds_md *lmm = mgr_buf->lb_buf;
4020
4021                 /* If the object has migrateEA, it means IMMUTE flag
4022                  * is being set by previous migration process, so it
4023                  * needs to override the IMMUTE flag, otherwise the
4024                  * following sanity check will fail */
4025                 if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
4026                                                 LMV_HASH_FLAG_MIGRATION) {
4027                         struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
4028
4029                         sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
4030                         CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
4031                                mdd2obd_dev(mdd)->obd_name,
4032                                PFID(mdd_object_fid(sobj)));
4033                 }
4034         }
4035
4036         rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
4037                                      sobj, sattr, NULL, NULL);
4038         if (rc != 0)
4039                 RETURN(rc);
4040
4041         /* Then it will check if the file should be migrated. If the file
4042          * has mulitple links, we only need migrate the file if all of its
4043          * entries has been migrated to the remote MDT */
4044         if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
4045                 RETURN(0);
4046
4047         rc = mdd_links_read(env, sobj, ldata);
4048         if (rc != 0) {
4049                 /* For multiple links files, if there are no linkEA data at all,
4050                  * means the file might be created before linkEA is enabled, and
4051                  * all of its links should not be migrated yet, otherwise it
4052                  * should have some linkEA there */
4053                 if (rc == -ENOENT || rc == -ENODATA)
4054                         RETURN(1);
4055                 RETURN(rc);
4056         }
4057
4058         /* If there are still links locally, then the file will not be
4059          * migrated. */
4060         LASSERT(ldata->ld_leh != NULL);
4061         ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
4062         for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
4063                 struct mdd_device       *mdd = mdo2mdd(&sobj->mod_obj);
4064                 struct mdd_object       *lpobj;
4065                 struct lu_name          lname;
4066                 struct lu_fid           fid;
4067
4068                 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
4069                                     &lname, &fid);
4070                 ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
4071                                                          ldata->ld_reclen);
4072                 lpobj = mdd_object_find(env, mdd, &fid);
4073                 if (IS_ERR(lpobj)) {
4074                         CWARN("%s: cannot find obj "DFID": rc = %ld\n",
4075                               mdd2obd_dev(mdd)->obd_name, PFID(&fid),
4076                               PTR_ERR(lpobj));
4077                         continue;
4078                 }
4079
4080                 if (!mdd_object_exists(lpobj) || mdd_object_remote(lpobj)) {
4081                         CDEBUG(D_INFO, DFID"%.*s: is on remote MDT.\n",
4082                                PFID(&fid), lname.ln_namelen, lname.ln_name);
4083                         mdd_object_put(env, lpobj);
4084                         continue;
4085                 }
4086
4087                 CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
4088                        PFID(mdd_object_fid(sobj)), lname.ln_namelen,
4089                        lname.ln_name, PFID(&fid));
4090                 mdd_object_put(env, lpobj);
4091                 rc = 1;
4092                 break;
4093         }
4094
4095         RETURN(rc);
4096 }
4097
4098 static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
4099                        struct md_object *sobj, const struct lu_name *lname,
4100                        struct md_object *tobj, struct md_attr *ma)
4101 {
4102         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
4103         struct mdd_device       *mdd = mdo2mdd(pobj);
4104         struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
4105         struct mdd_object       *mdd_tobj = NULL;
4106         struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
4107         struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
4108         int                     rc;
4109
4110         ENTRY;
4111         /* If the file will being migrated, it will check whether
4112          * the file is being opened by someone else right now */
4113         mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
4114         if (mdd_sobj->mod_count > 0) {
4115                 CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
4116                        mdd2obd_dev(mdd)->obd_name,
4117                        PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
4118                        mdd_sobj->mod_count, -EBUSY);
4119                 mdd_read_unlock(env, mdd_sobj);
4120                 GOTO(put, rc = -EBUSY);
4121         }
4122         mdd_read_unlock(env, mdd_sobj);
4123
4124         rc = mdd_la_get(env, mdd_sobj, so_attr);
4125         if (rc != 0)
4126                 GOTO(put, rc);
4127
4128         rc = mdd_la_get(env, mdd_pobj, pattr);
4129         if (rc != 0)
4130                 GOTO(put, rc);
4131
4132         rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
4133         if (rc != 0) {
4134                 if (rc > 0)
4135                         rc = 0;
4136                 GOTO(put, rc);
4137         }
4138
4139         /* Sigh, it is impossible to finish all of migration in a single
4140          * transaction, for example migrating big directory entries to the
4141          * new MDT, it needs insert all of name entries of children in the
4142          * new directory.
4143          *
4144          * So migration will be done in multiple steps and transactions.
4145          *
4146          * 1. create an orphan object on the remote MDT in one transaction.
4147          * 2. migrate extend attributes to the new target file/directory.
4148          * 3. For directory, migrate the entries to the new MDT and update
4149          * linkEA of each children. Because we can not migrate all entries
4150          * in a single transaction, so the migrating directory will become
4151          * a striped directory during migration, so once the process is
4152          * interrupted, the directory is still accessible. (During lookup,
4153          * client will locate the name by searching both original and target
4154          * object).
4155          * 4. Finally, update the name/FID to point to the new file/directory
4156          * in a separate transaction.
4157          */
4158
4159         /* step 1: Check whether the orphan object has been created, and create
4160          * orphan object on the remote MDT if needed */
4161         mdd_tobj = md2mdd_obj(tobj);
4162         if (!mdd_object_exists(mdd_tobj)) {
4163                 rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
4164                                         lname, so_attr);
4165                 if (rc != 0)
4166                         GOTO(put, rc);
4167         }
4168
4169         LASSERT(mdd_object_exists(mdd_tobj));
4170         /* step 2: migrate xattr */
4171         rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
4172         if (rc != 0)
4173                 GOTO(put, rc);
4174
4175         /* step 3: migrate name entries to the orphan object */
4176         if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
4177                 rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
4178                 if (rc != 0)
4179                         GOTO(put, rc);
4180                 if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
4181                                                   OBD_FAIL_MDS_REINT_NET_REP)))
4182                         GOTO(put, rc = 0);
4183         } else {
4184                 OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
4185         }
4186
4187         LASSERT(mdd_object_exists(mdd_tobj));
4188         /* step 4: update name entry to the new object */
4189         rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
4190                                      ma);
4191         if (rc != 0)
4192                 GOTO(put, rc);
4193 put:
4194         RETURN(rc);
4195 }
4196
4197 const struct md_dir_operations mdd_dir_ops = {
4198         .mdo_is_subdir     = mdd_is_subdir,
4199         .mdo_lookup        = mdd_lookup,
4200         .mdo_create        = mdd_create,
4201         .mdo_rename        = mdd_rename,
4202         .mdo_link          = mdd_link,
4203         .mdo_unlink        = mdd_unlink,
4204         .mdo_create_data   = mdd_create_data,
4205         .mdo_migrate       = mdd_migrate,
4206 };