Whamcloud - gitweb
LU-3097 build: fix 'no effect' errors
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static int __mdd_lookup(const struct lu_env *, struct md_object *,
61                         const struct lu_name *, struct lu_fid*, int);
62
63 static int
64 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
65                     const struct lu_name *lname, struct lu_fid* fid, int mask)
66 {
67         const char *name = lname->ln_name;
68         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
69         struct dynlock_handle *dlh;
70         int rc;
71
72         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
73         if (unlikely(dlh == NULL))
74                 return -ENOMEM;
75         rc = __mdd_lookup(env, pobj, lname, fid, mask);
76         mdd_pdo_read_unlock(env, mdd_obj, dlh);
77
78         return rc;
79 }
80
81 int mdd_lookup(const struct lu_env *env,
82                struct md_object *pobj, const struct lu_name *lname,
83                struct lu_fid* fid, struct md_op_spec *spec)
84 {
85         int rc;
86         ENTRY;
87         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
88         RETURN(rc);
89 }
90
91 int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
92                    struct lu_fid *fid)
93 {
94         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
95 }
96
97 /*
98  * For root fid use special function, which does not compare version component
99  * of fid. Version component is different for root fids on all MDTs.
100  */
101 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
102 {
103         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
104                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
105 }
106
107 /*
108  * return 1: if lf is the fid of the ancestor of p1;
109  * return 0: if not;
110  *
111  * return -EREMOTE: if remote object is found, in this
112  * case fid of remote object is saved to @pf;
113  *
114  * otherwise: values < 0, errors.
115  */
116 static int mdd_is_parent(const struct lu_env *env,
117                          struct mdd_device *mdd,
118                          struct mdd_object *p1,
119                          const struct lu_fid *lf,
120                          struct lu_fid *pf)
121 {
122         struct mdd_object *parent = NULL;
123         struct lu_fid *pfid;
124         int rc;
125         ENTRY;
126
127         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
128         pfid = &mdd_env_info(env)->mti_fid;
129
130         /* Check for root first. */
131         if (mdd_is_root(mdd, mdo2fid(p1)))
132                 RETURN(0);
133
134         for(;;) {
135                 /* this is done recursively, bypass capa for each obj */
136                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
137                 rc = mdd_parent_fid(env, p1, pfid);
138                 if (rc)
139                         GOTO(out, rc);
140                 if (mdd_is_root(mdd, pfid))
141                         GOTO(out, rc = 0);
142                 if (lu_fid_eq(pfid, lf))
143                         GOTO(out, rc = 1);
144                 if (parent)
145                         mdd_object_put(env, parent);
146
147                 parent = mdd_object_find(env, mdd, pfid);
148                 if (IS_ERR(parent)) {
149                         GOTO(out, rc = PTR_ERR(parent));
150                 } else if (mdd_object_remote(parent)) {
151                         /*FIXME: Because of the restriction of rename in Phase I.
152                          * If the parent is remote, we just assumed lf is not the
153                          * parent of P1 for now */
154                         GOTO(out, rc = 0);
155                 }
156                 p1 = parent;
157         }
158         EXIT;
159 out:
160         if (parent && !IS_ERR(parent))
161                 mdd_object_put(env, parent);
162         return rc;
163 }
164
165 /*
166  * No permission check is needed.
167  *
168  * returns 1: if fid is ancestor of @mo;
169  * returns 0: if fid is not a ancestor of @mo;
170  *
171  * returns EREMOTE if remote object is found, fid of remote object is saved to
172  * @fid;
173  *
174  * returns < 0: if error
175  */
176 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
177                   const struct lu_fid *fid, struct lu_fid *sfid)
178 {
179         struct mdd_device *mdd = mdo2mdd(mo);
180         int rc;
181         ENTRY;
182
183         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
184                 RETURN(0);
185
186         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
187         if (rc == 0) {
188                 /* found root */
189                 fid_zero(sfid);
190         } else if (rc == 1) {
191                 /* found @fid is parent */
192                 *sfid = *fid;
193                 rc = 0;
194         }
195         RETURN(rc);
196 }
197
198 /*
199  * Check that @dir contains no entries except (possibly) dot and dotdot.
200  *
201  * Returns:
202  *
203  *             0        empty
204  *      -ENOTDIR        not a directory object
205  *    -ENOTEMPTY        not empty
206  *           -ve        other error
207  *
208  */
209 static int mdd_dir_is_empty(const struct lu_env *env,
210                             struct mdd_object *dir)
211 {
212         struct dt_it     *it;
213         struct dt_object *obj;
214         const struct dt_it_ops *iops;
215         int result;
216         ENTRY;
217
218         obj = mdd_object_child(dir);
219         if (!dt_try_as_dir(env, obj))
220                 RETURN(-ENOTDIR);
221
222         iops = &obj->do_index_ops->dio_it;
223         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
224         if (!IS_ERR(it)) {
225                 result = iops->get(env, it, (const void *)"");
226                 if (result > 0) {
227                         int i;
228                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
229                                 result = iops->next(env, it);
230                         if (result == 0)
231                                 result = -ENOTEMPTY;
232                         else if (result == +1)
233                                 result = 0;
234                 } else if (result == 0)
235                         /*
236                          * Huh? Index contains no zero key?
237                          */
238                         result = -EIO;
239
240                 iops->put(env, it);
241                 iops->fini(env, it);
242         } else
243                 result = PTR_ERR(it);
244         RETURN(result);
245 }
246
247 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
248 {
249         struct mdd_device *m = mdd_obj2mdd_dev(obj);
250         struct lu_attr *la = &mdd_env_info(env)->mti_la;
251         int rc;
252         ENTRY;
253
254         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
255         if (rc)
256                 RETURN(rc);
257
258         /*
259          * Subdir count limitation can be broken through.
260          */
261         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
262             !S_ISDIR(la->la_mode))
263                 RETURN(-EMLINK);
264         else
265                 RETURN(0);
266 }
267
268 /*
269  * Check whether it may create the cobj under the pobj.
270  * cobj maybe NULL
271  */
272 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
273                    struct mdd_object *cobj, int check_perm, int check_nlink)
274 {
275         int rc = 0;
276         ENTRY;
277
278         if (cobj && mdd_object_exists(cobj))
279                 RETURN(-EEXIST);
280
281         if (mdd_is_dead_obj(pobj))
282                 RETURN(-ENOENT);
283
284         if (check_perm)
285                 rc = mdd_permission_internal_locked(env, pobj, NULL,
286                                                     MAY_WRITE | MAY_EXEC,
287                                                     MOR_TGT_PARENT);
288         if (!rc && check_nlink)
289                 rc = __mdd_may_link(env, pobj);
290
291         RETURN(rc);
292 }
293
294 /*
295  * Check whether can unlink from the pobj in the case of "cobj == NULL".
296  */
297 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
298                    const struct lu_attr *attr)
299 {
300         int rc;
301         ENTRY;
302
303         if (mdd_is_dead_obj(pobj))
304                 RETURN(-ENOENT);
305
306         if ((attr->la_valid & LA_FLAGS) &&
307             (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
308                 RETURN(-EPERM);
309
310         rc = mdd_permission_internal_locked(env, pobj, NULL,
311                                             MAY_WRITE | MAY_EXEC,
312                                             MOR_TGT_PARENT);
313         if (rc)
314                 RETURN(rc);
315
316         if (mdd_is_append(pobj))
317                 RETURN(-EPERM);
318
319         RETURN(rc);
320 }
321
322 /*
323  * pobj == NULL is remote ops case, under such case, pobj's
324  * VTX feature has been checked already, no need check again.
325  */
326 static inline int mdd_is_sticky(const struct lu_env *env,
327                                 struct mdd_object *pobj,
328                                 struct mdd_object *cobj)
329 {
330         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
331         struct lu_ucred *uc = lu_ucred_assert(env);
332         int rc;
333
334         if (pobj) {
335                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
336                 if (rc)
337                         return rc;
338
339                 if (!(tmp_la->la_mode & S_ISVTX) ||
340                     (tmp_la->la_uid == uc->uc_fsuid))
341                         return 0;
342         }
343
344         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
345         if (rc)
346                 return rc;
347
348         if (tmp_la->la_uid == uc->uc_fsuid)
349                 return 0;
350
351         return !md_capable(uc, CFS_CAP_FOWNER);
352 }
353
354 static int mdd_may_delete_entry(const struct lu_env *env,
355                                 struct mdd_object *pobj, int check_perm)
356 {
357         ENTRY;
358
359         LASSERT(pobj != NULL);
360         if (!mdd_object_exists(pobj))
361                 RETURN(-ENOENT);
362
363         if (mdd_is_dead_obj(pobj))
364                 RETURN(-ENOENT);
365
366         if (check_perm) {
367                 int rc;
368                 rc = mdd_permission_internal_locked(env, pobj, NULL,
369                                             MAY_WRITE | MAY_EXEC,
370                                             MOR_TGT_PARENT);
371                 if (rc)
372                         RETURN(rc);
373         }
374
375         if (mdd_is_append(pobj))
376                 RETURN(-EPERM);
377
378         RETURN(0);
379 }
380
381 /*
382  * Check whether it may delete the cobj from the pobj.
383  * pobj maybe NULL
384  */
385 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
386                    struct mdd_object *cobj, struct lu_attr *cattr,
387                    struct lu_attr *src_attr, int check_perm, int check_empty)
388 {
389         int rc = 0;
390         ENTRY;
391
392         if (pobj) {
393                 rc = mdd_may_delete_entry(env, pobj, check_perm);
394                 if (rc != 0)
395                         RETURN(rc);
396         }
397
398         if (cobj == NULL)
399                 RETURN(0);
400
401         if (!mdd_object_exists(cobj))
402                 RETURN(-ENOENT);
403
404         if (mdd_is_dead_obj(cobj))
405                 RETURN(-ESTALE);
406
407
408         if (mdd_is_sticky(env, pobj, cobj))
409                 RETURN(-EPERM);
410
411         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
412                 RETURN(-EPERM);
413
414         if ((cattr->la_valid & LA_FLAGS) &&
415             (cattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
416                 RETURN(-EPERM);
417
418         /* additional check the rename case */
419         if (src_attr) {
420                 if (S_ISDIR(src_attr->la_mode)) {
421                         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
422
423                         if (!S_ISDIR(cattr->la_mode))
424                                 RETURN(-ENOTDIR);
425
426                         if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
427                                 RETURN(-EBUSY);
428                 } else if (S_ISDIR(cattr->la_mode))
429                         RETURN(-EISDIR);
430         }
431
432         if (S_ISDIR(cattr->la_mode) && check_empty)
433                 rc = mdd_dir_is_empty(env, cobj);
434
435         RETURN(rc);
436 }
437
438 /*
439  * tgt maybe NULL
440  * has mdd_write_lock on src already, but not on tgt yet
441  */
442 int mdd_link_sanity_check(const struct lu_env *env,
443                           struct mdd_object *tgt_obj,
444                           const struct lu_name *lname,
445                           struct mdd_object *src_obj)
446 {
447         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
448         int rc = 0;
449         ENTRY;
450
451         if (!mdd_object_exists(src_obj))
452                 RETURN(-ENOENT);
453
454         if (mdd_is_dead_obj(src_obj))
455                 RETURN(-ESTALE);
456
457         /* Local ops, no lookup before link, check filename length here. */
458         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
459                 RETURN(-ENAMETOOLONG);
460
461         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
462                 RETURN(-EPERM);
463
464         if (S_ISDIR(mdd_object_type(src_obj)))
465                 RETURN(-EPERM);
466
467         LASSERT(src_obj != tgt_obj);
468         if (tgt_obj) {
469                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
470                 if (rc)
471                         RETURN(rc);
472         }
473
474         rc = __mdd_may_link(env, src_obj);
475
476         RETURN(rc);
477 }
478
479 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
480                                    const char *name, struct thandle *handle,
481                                    struct lustre_capa *capa)
482 {
483         struct dt_object *next = mdd_object_child(pobj);
484         int               rc;
485         ENTRY;
486
487         if (dt_try_as_dir(env, next)) {
488                 rc = next->do_index_ops->dio_delete(env, next,
489                                                     (struct dt_key *)name,
490                                                     handle, capa);
491         } else
492                 rc = -ENOTDIR;
493
494         RETURN(rc);
495 }
496
497 static int __mdd_index_insert_only(const struct lu_env *env,
498                                    struct mdd_object *pobj,
499                                    const struct lu_fid *lf, const char *name,
500                                    struct thandle *handle,
501                                    struct lustre_capa *capa)
502 {
503         struct dt_object *next = mdd_object_child(pobj);
504         int               rc;
505         ENTRY;
506
507         if (dt_try_as_dir(env, next)) {
508                 struct lu_ucred  *uc = lu_ucred_check(env);
509                 int ignore_quota;
510
511                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
512                 rc = next->do_index_ops->dio_insert(env, next,
513                                                     (struct dt_rec*)lf,
514                                                     (const struct dt_key *)name,
515                                                     handle, capa, ignore_quota);
516         } else {
517                 rc = -ENOTDIR;
518         }
519         RETURN(rc);
520 }
521
522 /* insert named index, add reference if isdir */
523 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
524                               const struct lu_fid *lf, const char *name, int is_dir,
525                               struct thandle *handle, struct lustre_capa *capa)
526 {
527         int               rc;
528         ENTRY;
529
530         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
531         if (rc == 0 && is_dir) {
532                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
533                 mdo_ref_add(env, pobj, handle);
534                 mdd_write_unlock(env, pobj);
535         }
536         RETURN(rc);
537 }
538
539 /* delete named index, drop reference if isdir */
540 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
541                               const char *name, int is_dir, struct thandle *handle,
542                               struct lustre_capa *capa)
543 {
544         int               rc;
545         ENTRY;
546
547         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
548         if (rc == 0 && is_dir) {
549                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
550                 mdo_ref_del(env, pobj, handle);
551                 mdd_write_unlock(env, pobj);
552         }
553
554         RETURN(rc);
555 }
556
557 int mdd_declare_changelog_store(const struct lu_env *env,
558                                 struct mdd_device *mdd,
559                                 const struct lu_name *fname,
560                                 struct thandle *handle)
561 {
562         struct obd_device               *obd = mdd2obd_dev(mdd);
563         struct llog_ctxt                *ctxt;
564         struct llog_changelog_rec       *rec;
565         struct lu_buf                   *buf;
566         int                              reclen;
567         int                              rc;
568
569         /* Not recording */
570         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
571                 return 0;
572
573         reclen = llog_data_len(sizeof(*rec) +
574                                (fname != NULL ? fname->ln_namelen : 0));
575         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
576         if (buf->lb_buf == NULL)
577                 return -ENOMEM;
578
579         rec = buf->lb_buf;
580         rec->cr_hdr.lrh_len = reclen;
581         rec->cr_hdr.lrh_type = CHANGELOG_REC;
582
583         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
584         if (ctxt == NULL)
585                 return -ENXIO;
586
587         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
588         llog_ctxt_put(ctxt);
589
590         return rc;
591 }
592
593 static int mdd_declare_changelog_ext_store(const struct lu_env *env,
594                                            struct mdd_device *mdd,
595                                            const struct lu_name *tname,
596                                            const struct lu_name *sname,
597                                            struct thandle *handle)
598 {
599         struct obd_device               *obd = mdd2obd_dev(mdd);
600         struct llog_ctxt                *ctxt;
601         struct llog_changelog_ext_rec   *rec;
602         struct lu_buf                   *buf;
603         int                              reclen;
604         int                              rc;
605
606         /* Not recording */
607         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
608                 return 0;
609
610         reclen = llog_data_len(sizeof(*rec) +
611                                (tname != NULL ? tname->ln_namelen : 0) +
612                                (sname != NULL ? 1 + sname->ln_namelen : 0));
613         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
614         if (buf->lb_buf == NULL)
615                 return -ENOMEM;
616
617         rec = buf->lb_buf;
618         rec->cr_hdr.lrh_len = reclen;
619         rec->cr_hdr.lrh_type = CHANGELOG_REC;
620
621         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
622         if (ctxt == NULL)
623                 return -ENXIO;
624
625         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
626         llog_ctxt_put(ctxt);
627
628         return rc;
629 }
630
631 /** Add a changelog entry \a rec to the changelog llog
632  * \param mdd
633  * \param rec
634  * \param handle - currently ignored since llogs start their own transaction;
635  *                 this will hopefully be fixed in llog rewrite
636  * \retval 0 ok
637  */
638 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
639                         struct llog_changelog_rec *rec, struct thandle *th)
640 {
641         struct obd_device       *obd = mdd2obd_dev(mdd);
642         struct llog_ctxt        *ctxt;
643         int                      rc;
644
645         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
646         rec->cr_hdr.lrh_type = CHANGELOG_REC;
647         rec->cr.cr_time = cl_time();
648
649         spin_lock(&mdd->mdd_cl.mc_lock);
650         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
651          * but as long as the MDD transactions are ordered correctly for e.g.
652          * rename conflicts, I don't think this should matter. */
653         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
654         spin_unlock(&mdd->mdd_cl.mc_lock);
655
656         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
657         if (ctxt == NULL)
658                 return -ENXIO;
659
660         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
661         llog_ctxt_put(ctxt);
662         if (rc > 0)
663                 rc = 0;
664         return rc;
665 }
666
667 /** Add a changelog_ext entry \a rec to the changelog llog
668  * \param mdd
669  * \param rec
670  * \param handle - currently ignored since llogs start their own transaction;
671  *              this will hopefully be fixed in llog rewrite
672  * \retval 0 ok
673  */
674 int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
675                             struct llog_changelog_ext_rec *rec,
676                             struct thandle *th)
677 {
678         struct obd_device       *obd = mdd2obd_dev(mdd);
679         struct llog_ctxt        *ctxt;
680         int                      rc;
681
682         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
683         /* llog_lvfs_write_rec sets the llog tail len */
684         rec->cr_hdr.lrh_type = CHANGELOG_REC;
685         rec->cr.cr_time = cl_time();
686
687         spin_lock(&mdd->mdd_cl.mc_lock);
688         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
689          * but as long as the MDD transactions are ordered correctly for e.g.
690          * rename conflicts, I don't think this should matter. */
691         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
692         spin_unlock(&mdd->mdd_cl.mc_lock);
693
694         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
695         if (ctxt == NULL)
696                 return -ENXIO;
697
698         /* nested journal transaction */
699         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
700         llog_ctxt_put(ctxt);
701         if (rc > 0)
702                 rc = 0;
703
704         return rc;
705 }
706
707 /** Store a namespace change changelog record
708  * If this fails, we must fail the whole transaction; we don't
709  * want the change to commit without the log entry.
710  * \param target - mdd_object of change
711  * \param parent - parent dir/object
712  * \param tname - target name string
713  * \param handle - transacion handle
714  */
715 int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
716                            enum changelog_rec_type type, unsigned flags,
717                            struct mdd_object *target, struct mdd_object *parent,
718                            const struct lu_name *tname, struct thandle *handle)
719 {
720         struct llog_changelog_rec *rec;
721         struct lu_buf *buf;
722         int reclen;
723         int rc;
724         ENTRY;
725
726         /* Not recording */
727         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
728                 RETURN(0);
729         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
730                 RETURN(0);
731
732         LASSERT(target != NULL);
733         LASSERT(parent != NULL);
734         LASSERT(tname != NULL);
735         LASSERT(handle != NULL);
736
737         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
738         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
739         if (buf->lb_buf == NULL)
740                 RETURN(-ENOMEM);
741         rec = buf->lb_buf;
742
743         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
744         rec->cr.cr_type = (__u32)type;
745         rec->cr.cr_tfid = *mdo2fid(target);
746         rec->cr.cr_pfid = *mdo2fid(parent);
747         rec->cr.cr_namelen = tname->ln_namelen;
748         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
749
750         target->mod_cltime = cfs_time_current_64();
751
752         rc = mdd_changelog_store(env, mdd, rec, handle);
753         if (rc < 0) {
754                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
755                         rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid),
756                         PFID(&rec->cr.cr_pfid));
757                 RETURN(-EFAULT);
758         }
759
760         RETURN(0);
761 }
762
763
764 /** Store a namespace change changelog record
765  * If this fails, we must fail the whole transaction; we don't
766  * want the change to commit without the log entry.
767  * \param target - mdd_object of change
768  * \param tpfid - target parent dir/object fid
769  * \param sfid - source object fid
770  * \param spfid - source parent fid
771  * \param tname - target name string
772  * \param sname - source name string
773  * \param handle - transacion handle
774  */
775 static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
776                                       struct mdd_device    *mdd,
777                                       enum changelog_rec_type type,
778                                       unsigned flags,
779                                       struct mdd_object    *target,
780                                       const struct lu_fid  *tpfid,
781                                       const struct lu_fid  *sfid,
782                                       const struct lu_fid  *spfid,
783                                       const struct lu_name *tname,
784                                       const struct lu_name *sname,
785                                       struct thandle *handle)
786 {
787         struct llog_changelog_ext_rec *rec;
788         struct lu_buf *buf;
789         int reclen;
790         int rc;
791         ENTRY;
792
793         /* Not recording */
794         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
795                 RETURN(0);
796         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
797                 RETURN(0);
798
799         LASSERT(sfid != NULL);
800         LASSERT(tpfid != NULL);
801         LASSERT(tname != NULL);
802         LASSERT(handle != NULL);
803
804         reclen = llog_data_len(sizeof(*rec) +
805                                sname != NULL ? 1 + sname->ln_namelen : 0);
806         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
807         if (buf->lb_buf == NULL)
808                 RETURN(-ENOMEM);
809         rec = buf->lb_buf;
810
811         rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags);
812         rec->cr.cr_type = (__u32)type;
813         rec->cr.cr_pfid = *tpfid;
814         rec->cr.cr_sfid = *sfid;
815         rec->cr.cr_spfid = *spfid;
816         rec->cr.cr_namelen = tname->ln_namelen;
817         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
818         if (sname) {
819                 rec->cr.cr_name[tname->ln_namelen] = '\0';
820                 memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
821                         sname->ln_namelen);
822                 rec->cr.cr_namelen += 1 + sname->ln_namelen;
823         }
824
825         if (likely(target != NULL)) {
826                 rec->cr.cr_tfid = *mdo2fid(target);
827                 target->mod_cltime = cfs_time_current_64();
828         } else {
829                 fid_zero(&rec->cr.cr_tfid);
830         }
831
832         rc = mdd_changelog_ext_store(env, mdd, rec, handle);
833         if (rc < 0) {
834                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
835                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
836                 return -EFAULT;
837         }
838
839         return 0;
840 }
841
842 static int __mdd_links_add(const struct lu_env *env,
843                            struct mdd_object *mdd_obj,
844                            struct linkea_data *ldata,
845                            const struct lu_name *lname,
846                            const struct lu_fid *pfid,
847                            int first, int check)
848 {
849         int rc;
850
851         if (ldata->ld_leh == NULL) {
852                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
853                 if (rc) {
854                         if (rc != -ENODATA)
855                                 return rc;
856                         rc = linkea_data_new(ldata,
857                                              &mdd_env_info(env)->mti_link_buf);
858                         if (rc)
859                                 return rc;
860                 }
861         }
862
863         if (check) {
864                 rc = linkea_links_find(ldata, lname, pfid);
865                 if (rc && rc != -ENOENT)
866                         return rc;
867                 if (rc == 0)
868                         return -EEXIST;
869         }
870
871         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
872                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
873
874                 *tfid = *pfid;
875                 tfid->f_ver = ~0;
876                 linkea_add_buf(ldata, lname, tfid);
877         }
878
879         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
880                 linkea_add_buf(ldata, lname, pfid);
881
882         return linkea_add_buf(ldata, lname, pfid);
883 }
884
885 static int __mdd_links_del(const struct lu_env *env,
886                            struct mdd_object *mdd_obj,
887                            struct linkea_data *ldata,
888                            const struct lu_name *lname,
889                            const struct lu_fid *pfid)
890 {
891         int rc;
892
893         if (ldata->ld_leh == NULL) {
894                 rc = mdd_links_read(env, mdd_obj, ldata);
895                 if (rc)
896                         return rc;
897         }
898
899         rc = linkea_links_find(ldata, lname, pfid);
900         if (rc)
901                 return rc;
902
903         linkea_del_buf(ldata, lname);
904         return 0;
905 }
906
907 static int mdd_linkea_prepare(const struct lu_env *env,
908                               struct mdd_object *mdd_obj,
909                               const struct lu_fid *oldpfid,
910                               const struct lu_name *oldlname,
911                               const struct lu_fid *newpfid,
912                               const struct lu_name *newlname,
913                               int first, int check,
914                               struct linkea_data *ldata)
915 {
916         int rc = 0;
917         int rc2 = 0;
918         ENTRY;
919
920         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
921                 return 0;
922
923         LASSERT(oldpfid != NULL || newpfid != NULL);
924
925         if (mdd_obj->mod_flags & DEAD_OBJ) {
926                 /* Prevent linkea to be updated which is NOT necessary. */
927                 ldata->ld_reclen = 0;
928                 /* No more links, don't bother */
929                 RETURN(0);
930         }
931
932         if (oldpfid != NULL) {
933                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
934                 if (rc) {
935                         if ((check == 1) ||
936                             (rc != -ENODATA && rc != -ENOENT))
937                                 RETURN(rc);
938                         /* No changes done. */
939                         rc = 0;
940                 }
941         }
942
943         /* If renaming, add the new record */
944         if (newpfid != NULL) {
945                 /* even if the add fails, we still delete the out-of-date
946                  * old link */
947                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
948                                       first, check);
949         }
950
951         rc = rc != 0 ? rc : rc2;
952
953         RETURN(rc);
954 }
955
956 int mdd_links_rename(const struct lu_env *env,
957                      struct mdd_object *mdd_obj,
958                      const struct lu_fid *oldpfid,
959                      const struct lu_name *oldlname,
960                      const struct lu_fid *newpfid,
961                      const struct lu_name *newlname,
962                      struct thandle *handle,
963                      struct linkea_data *ldata,
964                      int first, int check)
965 {
966         int rc2 = 0;
967         int rc = 0;
968         ENTRY;
969
970         if (ldata == NULL) {
971                 ldata = &mdd_env_info(env)->mti_link_data;
972                 memset(ldata, 0, sizeof(*ldata));
973                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
974                                         newpfid, newlname, first, check,
975                                         ldata);
976                 if (rc != 0)
977                         GOTO(out, rc);
978         }
979
980         if (ldata->ld_reclen != 0)
981                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
982         EXIT;
983 out:
984         if (rc == 0)
985                 rc = rc2;
986         if (rc) {
987                 int error = 1;
988                 if (rc == -EOVERFLOW || rc == -ENOSPC)
989                         error = 0;
990                 if (oldpfid == NULL)
991                         CDEBUG(error ? D_ERROR : D_OTHER,
992                                "link_ea add '%.*s' failed %d "DFID"\n",
993                                newlname->ln_namelen, newlname->ln_name,
994                                rc, PFID(mdd_object_fid(mdd_obj)));
995                 else if (newpfid == NULL)
996                         CDEBUG(error ? D_ERROR : D_OTHER,
997                                "link_ea del '%.*s' failed %d "DFID"\n",
998                                oldlname->ln_namelen, oldlname->ln_name,
999                                rc, PFID(mdd_object_fid(mdd_obj)));
1000                 else
1001                         CDEBUG(error ? D_ERROR : D_OTHER,
1002                                "link_ea rename '%.*s'->'%.*s' failed %d "
1003                                DFID"\n",
1004                                oldlname->ln_namelen, oldlname->ln_name,
1005                                newlname->ln_namelen, newlname->ln_name,
1006                                rc, PFID(mdd_object_fid(mdd_obj)));
1007         }
1008
1009         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1010                 /* if we vmalloced a large buffer drop it */
1011                 lu_buf_free(ldata->ld_buf);
1012
1013         return rc;
1014 }
1015
1016 static inline int mdd_links_add(const struct lu_env *env,
1017                                 struct mdd_object *mdd_obj,
1018                                 const struct lu_fid *pfid,
1019                                 const struct lu_name *lname,
1020                                 struct thandle *handle,
1021                                 struct linkea_data *data, int first)
1022 {
1023         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1024                                 pfid, lname, handle, data, first, 0);
1025 }
1026
1027 static inline int mdd_links_del(const struct lu_env *env,
1028                                 struct mdd_object *mdd_obj,
1029                                 const struct lu_fid *pfid,
1030                                 const struct lu_name *lname,
1031                                 struct thandle *handle)
1032 {
1033         return mdd_links_rename(env, mdd_obj, pfid, lname,
1034                                 NULL, NULL, handle, NULL, 0, 0);
1035 }
1036
1037 /** Read the link EA into a temp buffer.
1038  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1039  * A pointer to the buffer is stored in \a ldata::ld_buf.
1040  *
1041  * \retval 0 or error
1042  */
1043 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1044                    struct linkea_data *ldata)
1045 {
1046         int rc;
1047
1048         /* First try a small buf */
1049         LASSERT(env != NULL);
1050         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1051                                                PAGE_CACHE_SIZE);
1052         if (ldata->ld_buf->lb_buf == NULL)
1053                 return -ENOMEM;
1054
1055         if (!mdd_object_exists(mdd_obj))
1056                 return -ENODATA;
1057
1058         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
1059                           BYPASS_CAPA);
1060         if (rc == -ERANGE) {
1061                 /* Buf was too small, figure out what we need. */
1062                 lu_buf_free(ldata->ld_buf);
1063                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1064                                    XATTR_NAME_LINK, BYPASS_CAPA);
1065                 if (rc < 0)
1066                         return rc;
1067                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1068                 if (ldata->ld_buf->lb_buf == NULL)
1069                         return -ENOMEM;
1070                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1071                                   XATTR_NAME_LINK, BYPASS_CAPA);
1072         }
1073         if (rc < 0)
1074                 return rc;
1075
1076         return linkea_init(ldata);
1077 }
1078
1079 /** Read the link EA into a temp buffer.
1080  * Uses the name_buf since it is generally large.
1081  * \retval IS_ERR err
1082  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1083  */
1084 struct lu_buf *mdd_links_get(const struct lu_env *env,
1085                              struct mdd_object *mdd_obj)
1086 {
1087         struct linkea_data ldata = { 0 };
1088         int rc;
1089
1090         rc = mdd_links_read(env, mdd_obj, &ldata);
1091         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1092 }
1093
1094 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1095                     struct linkea_data *ldata, struct thandle *handle)
1096 {
1097         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1098                                                      ldata->ld_leh->leh_len);
1099         return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
1100                              mdd_object_capa(env, mdd_obj));
1101 }
1102
1103 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1104                           struct thandle *handle, struct linkea_data *ldata)
1105 {
1106         int     rc;
1107         int     ea_len;
1108         void    *linkea;
1109
1110         if (ldata != NULL && ldata->ld_lee != NULL) {
1111                 ea_len = ldata->ld_leh->leh_len;
1112                 linkea = ldata->ld_buf->lb_buf;
1113         } else {
1114                 ea_len = DEFAULT_LINKEA_SIZE;
1115                 linkea = NULL;
1116         }
1117
1118         /* XXX: max size? */
1119         rc = mdo_declare_xattr_set(env, mdd_obj,
1120                                    mdd_buf_get_const(env, linkea, ea_len),
1121                                    XATTR_NAME_LINK, 0, handle);
1122         return rc;
1123 }
1124
1125 static inline int mdd_declare_links_del(const struct lu_env *env,
1126                                         struct mdd_object *c,
1127                                         struct thandle *handle)
1128 {
1129         int rc = 0;
1130
1131         /* For directory, the linkEA will be removed together
1132          * with the object. */
1133         if (!S_ISDIR(mdd_object_type(c)))
1134                 rc = mdd_declare_links_add(env, c, handle, NULL);
1135
1136         return rc;
1137 }
1138
1139 static int mdd_declare_link(const struct lu_env *env,
1140                             struct mdd_device *mdd,
1141                             struct mdd_object *p,
1142                             struct mdd_object *c,
1143                             const struct lu_name *name,
1144                             struct thandle *handle,
1145                             struct lu_attr *la,
1146                             struct linkea_data *data)
1147 {
1148         int rc;
1149
1150         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
1151         if (rc)
1152                 return rc;
1153
1154         rc = mdo_declare_ref_add(env, c, handle);
1155         if (rc)
1156                 return rc;
1157
1158         la->la_valid = LA_CTIME | LA_MTIME;
1159         rc = mdo_declare_attr_set(env, p, la, handle);
1160         if (rc != 0)
1161                 return rc;
1162
1163         la->la_valid = LA_CTIME;
1164         rc = mdo_declare_attr_set(env, c, la, handle);
1165         if (rc)
1166                 return rc;
1167
1168         rc = mdd_declare_links_add(env, c, handle, data);
1169         if (rc)
1170                 return rc;
1171
1172         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1173
1174         return rc;
1175 }
1176
1177 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1178                     struct md_object *src_obj, const struct lu_name *lname,
1179                     struct md_attr *ma)
1180 {
1181         const char *name = lname->ln_name;
1182         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1183         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1184         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1185         struct mdd_device *mdd = mdo2mdd(src_obj);
1186         struct dynlock_handle *dlh;
1187         struct thandle *handle;
1188         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1189         int rc;
1190         ENTRY;
1191
1192         handle = mdd_trans_create(env, mdd);
1193         if (IS_ERR(handle))
1194                 GOTO(out_pending, rc = PTR_ERR(handle));
1195
1196         memset(ldata, 0, sizeof(*ldata));
1197
1198         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1199         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1200
1201         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1202                               la, ldata);
1203         if (rc)
1204                 GOTO(stop, rc);
1205
1206         rc = mdd_trans_start(env, mdd, handle);
1207         if (rc)
1208                 GOTO(stop, rc);
1209
1210         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
1211         if (dlh == NULL)
1212                 GOTO(out_trans, rc = -ENOMEM);
1213         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1214
1215         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
1216         if (rc)
1217                 GOTO(out_unlock, rc);
1218
1219         rc = mdo_ref_add(env, mdd_sobj, handle);
1220         if (rc)
1221                 GOTO(out_unlock, rc);
1222
1223
1224         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1225                                      name, handle,
1226                                      mdd_object_capa(env, mdd_tobj));
1227         if (rc != 0) {
1228                 mdo_ref_del(env, mdd_sobj, handle);
1229                 GOTO(out_unlock, rc);
1230         }
1231
1232         la->la_valid = LA_CTIME | LA_MTIME;
1233         rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1234         if (rc)
1235                 GOTO(out_unlock, rc);
1236
1237         la->la_valid = LA_CTIME;
1238         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
1239         if (rc == 0) {
1240                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1241                                         mdo2fid(mdd_tobj), lname, 0, 0,
1242                                         ldata);
1243                 if (rc == 0)
1244                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1245                                       lname, handle, ldata, 0);
1246                 /* The failure of links_add should not cause the link
1247                  * failure, reset rc here */
1248                 rc = 0;
1249         }
1250         EXIT;
1251 out_unlock:
1252         mdd_write_unlock(env, mdd_sobj);
1253         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
1254 out_trans:
1255         if (rc == 0)
1256                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1257                                             mdd_tobj, lname, handle);
1258 stop:
1259         mdd_trans_stop(env, mdd, rc, handle);
1260
1261         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1262                 /* if we vmalloced a large buffer drop it */
1263                 lu_buf_free(ldata->ld_buf);
1264 out_pending:
1265         return rc;
1266 }
1267
1268 int mdd_declare_finish_unlink(const struct lu_env *env,
1269                               struct mdd_object *obj,
1270                               struct md_attr *ma,
1271                               struct thandle *handle)
1272 {
1273         int     rc;
1274
1275         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1276         if (rc)
1277                 return rc;
1278
1279         return mdo_declare_destroy(env, obj, handle);
1280 }
1281
1282 /* caller should take a lock before calling */
1283 int mdd_finish_unlink(const struct lu_env *env,
1284                       struct mdd_object *obj, struct md_attr *ma,
1285                       struct thandle *th)
1286 {
1287         int rc = 0;
1288         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1289         ENTRY;
1290
1291         LASSERT(mdd_write_locked(env, obj) != 0);
1292
1293         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1294                 obj->mod_flags |= DEAD_OBJ;
1295
1296                 /* add new orphan and the object
1297                  * will be deleted during mdd_close() */
1298                 if (obj->mod_count) {
1299                         rc = __mdd_orphan_add(env, obj, th);
1300                         if (rc == 0)
1301                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1302                                         "orphan list, open count = %d\n",
1303                                         PFID(mdd_object_fid(obj)),
1304                                         obj->mod_count);
1305                         else
1306                                 CERROR("Object "DFID" fail to be an orphan, "
1307                                        "open count = %d, maybe cause failed "
1308                                        "open replay\n",
1309                                         PFID(mdd_object_fid(obj)),
1310                                         obj->mod_count);
1311                 } else {
1312                         rc = mdo_destroy(env, obj, th);
1313                 }
1314         }
1315
1316         RETURN(rc);
1317 }
1318
1319 /*
1320  * pobj maybe NULL
1321  * has mdd_write_lock on cobj already, but not on pobj yet
1322  */
1323 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1324                             struct mdd_object *cobj, struct lu_attr *cattr)
1325 {
1326         int rc;
1327         ENTRY;
1328
1329         rc = mdd_may_delete(env, pobj, cobj, cattr, NULL, 1, 1);
1330
1331         RETURN(rc);
1332 }
1333
1334 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1335                               struct mdd_object *p, struct mdd_object *c,
1336                               const struct lu_name *name, struct md_attr *ma,
1337                               struct thandle *handle, int no_name)
1338 {
1339         struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
1340         int rc;
1341
1342         if (likely(no_name == 0)) {
1343                 rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
1344                 if (rc)
1345                         return rc;
1346         }
1347
1348         rc = mdo_declare_ref_del(env, p, handle);
1349         if (rc)
1350                 return rc;
1351
1352         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1353         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1354         la->la_valid = LA_CTIME | LA_MTIME;
1355         rc = mdo_declare_attr_set(env, p, la, handle);
1356         if (rc)
1357                 return rc;
1358
1359         if (c != NULL) {
1360                 rc = mdo_declare_ref_del(env, c, handle);
1361                 if (rc)
1362                         return rc;
1363
1364                 rc = mdo_declare_ref_del(env, c, handle);
1365                 if (rc)
1366                         return rc;
1367
1368                 la->la_valid = LA_CTIME;
1369                 rc = mdo_declare_attr_set(env, c, la, handle);
1370                 if (rc)
1371                         return rc;
1372
1373                 rc = mdd_declare_finish_unlink(env, c, ma, handle);
1374                 if (rc)
1375                         return rc;
1376
1377                 rc = mdd_declare_links_del(env, c, handle);
1378                 if (rc != 0)
1379                         return rc;
1380
1381                 /* FIXME: need changelog for remove entry */
1382                 rc = mdd_declare_changelog_store(env, mdd, name, handle);
1383         }
1384
1385         return rc;
1386 }
1387
1388 /*
1389  * test if a file has an HSM archive
1390  * if HSM attributes are not found in ma update them from
1391  * HSM xattr
1392  */
1393 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1394                                    struct mdd_object *obj,
1395                                    struct md_attr *ma)
1396 {
1397         ENTRY;
1398
1399         if (!(ma->ma_valid & MA_HSM)) {
1400                 /* no HSM MD provided, read xattr */
1401                 struct lu_buf   *hsm_buf;
1402                 const size_t     buflen = sizeof(struct hsm_attrs);
1403                 int              rc;
1404
1405                 hsm_buf = mdd_buf_get(env, NULL, 0);
1406                 lu_buf_alloc(hsm_buf, buflen);
1407                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM,
1408                                    mdd_object_capa(env, obj));
1409                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1410                 lu_buf_free(hsm_buf);
1411                 if (rc < 0)
1412                         RETURN(false);
1413
1414                 ma->ma_valid = MA_HSM;
1415         }
1416         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1417                 RETURN(true);
1418         RETURN(false);
1419 }
1420
1421 /**
1422  * Delete name entry and the object.
1423  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1424  * does not exist for this object, and it could only happen during resending
1425  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1426  * is also needed in this case(needed by changelog), so we have to add another
1427  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1428  * the ENOENT failure should be able to be fixed by redo mechanism.
1429  */
1430 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1431                       struct md_object *cobj, const struct lu_name *lname,
1432                       struct md_attr *ma, int no_name)
1433 {
1434         const char *name = lname->ln_name;
1435         struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
1436         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1437         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1438         struct mdd_object *mdd_cobj = NULL;
1439         struct mdd_device *mdd = mdo2mdd(pobj);
1440         struct dynlock_handle *dlh;
1441         struct thandle    *handle;
1442         int rc, is_dir = 0;
1443         ENTRY;
1444
1445         /* cobj == NULL means only delete name entry */
1446         if (likely(cobj != NULL)) {
1447                 mdd_cobj = md2mdd_obj(cobj);
1448                 if (mdd_object_exists(mdd_cobj) == 0)
1449                         RETURN(-ENOENT);
1450                 /* currently it is assume, it could only delete
1451                  * name entry of remote directory */
1452                 is_dir = 1;
1453         }
1454
1455         handle = mdd_trans_create(env, mdd);
1456         if (IS_ERR(handle))
1457                 RETURN(PTR_ERR(handle));
1458
1459         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1460                                 lname, ma, handle, no_name);
1461         if (rc)
1462                 GOTO(stop, rc);
1463
1464         rc = mdd_trans_start(env, mdd, handle);
1465         if (rc)
1466                 GOTO(stop, rc);
1467
1468         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1469         if (dlh == NULL)
1470                 GOTO(stop, rc = -ENOMEM);
1471
1472         if (likely(mdd_cobj != NULL)) {
1473                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1474
1475                 /* fetch cattr */
1476                 rc = mdd_la_get(env, mdd_cobj, cattr,
1477                                 mdd_object_capa(env, mdd_cobj));
1478                 if (rc)
1479                         GOTO(cleanup, rc);
1480
1481                 is_dir = S_ISDIR(cattr->la_mode);
1482
1483         }
1484
1485         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
1486         if (rc)
1487                 GOTO(cleanup, rc);
1488
1489         if (likely(no_name == 0)) {
1490                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1491                                         mdd_object_capa(env, mdd_pobj));
1492                 if (rc)
1493                         GOTO(cleanup, rc);
1494         }
1495
1496         if (likely(mdd_cobj != NULL)) {
1497                 rc = mdo_ref_del(env, mdd_cobj, handle);
1498                 if (rc != 0) {
1499                         __mdd_index_insert_only(env, mdd_pobj,
1500                                                 mdo2fid(mdd_cobj),
1501                                                 name, handle,
1502                                                 mdd_object_capa(env, mdd_pobj));
1503                         GOTO(cleanup, rc);
1504                 }
1505
1506                 if (is_dir)
1507                         /* unlink dot */
1508                         mdo_ref_del(env, mdd_cobj, handle);
1509
1510                 /* fetch updated nlink */
1511                 rc = mdd_la_get(env, mdd_cobj, cattr,
1512                                 mdd_object_capa(env, mdd_cobj));
1513                 if (rc)
1514                         GOTO(cleanup, rc);
1515         }
1516
1517         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1518         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1519
1520         la->la_valid = LA_CTIME | LA_MTIME;
1521         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1522         if (rc)
1523                 GOTO(cleanup, rc);
1524
1525         /* Enough for only unlink the entry */
1526         if (unlikely(mdd_cobj == NULL)) {
1527                 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1528                 GOTO(stop, rc);
1529         }
1530
1531         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1532                 /* update ctime of an unlinked file only if it is still
1533                  * opened or a link still exists */
1534                 la->la_valid = LA_CTIME;
1535                 rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
1536                 if (rc)
1537                         GOTO(cleanup, rc);
1538         }
1539
1540         /* XXX: this transfer to ma will be removed with LOD/OSP */
1541         ma->ma_attr = *cattr;
1542         ma->ma_valid |= MA_INODE;
1543         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1544
1545         /* fetch updated nlink */
1546         if (rc == 0)
1547                 rc = mdd_la_get(env, mdd_cobj, cattr,
1548                                 mdd_object_capa(env, mdd_cobj));
1549
1550         if (!is_dir)
1551                 /* old files may not have link ea; ignore errors */
1552                 mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle);
1553
1554         /* if object is removed then we can't get its attrs, use last get */
1555         if (cattr->la_nlink == 0) {
1556                 ma->ma_attr = *cattr;
1557                 ma->ma_valid |= MA_INODE;
1558         }
1559         EXIT;
1560 cleanup:
1561         mdd_write_unlock(env, mdd_cobj);
1562         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1563         if (rc == 0) {
1564                 int cl_flags = 0;
1565
1566                 if (cattr->la_nlink == 0) {
1567                         cl_flags |= CLF_UNLINK_LAST;
1568                         /* search for an existing archive */
1569                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1570                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1571                 }
1572
1573                 rc = mdd_changelog_ns_store(env, mdd,
1574                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1575                         mdd_cobj, mdd_pobj, lname, handle);
1576         }
1577
1578 stop:
1579         mdd_trans_stop(env, mdd, rc, handle);
1580
1581         return rc;
1582 }
1583
1584 /*
1585  * The permission has been checked when obj created, no need check again.
1586  */
1587 static int mdd_cd_sanity_check(const struct lu_env *env,
1588                                struct mdd_object *obj)
1589 {
1590         ENTRY;
1591
1592         /* EEXIST check */
1593         if (!obj || mdd_is_dead_obj(obj))
1594                 RETURN(-ENOENT);
1595
1596         RETURN(0);
1597
1598 }
1599
1600 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1601                            struct md_object *cobj, const struct md_op_spec *spec,
1602                            struct md_attr *ma)
1603 {
1604         struct mdd_device *mdd = mdo2mdd(cobj);
1605         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1606         struct mdd_object *son = md2mdd_obj(cobj);
1607         struct thandle    *handle;
1608         const struct lu_buf *buf;
1609         struct lu_attr    *attr = &mdd_env_info(env)->mti_cattr;
1610         int                rc;
1611         ENTRY;
1612
1613         rc = mdd_cd_sanity_check(env, son);
1614         if (rc)
1615                 RETURN(rc);
1616
1617         if (!md_should_create(spec->sp_cr_flags))
1618                 RETURN(0);
1619
1620         /*
1621          * there are following use cases for this function:
1622          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1623          *    striping can be specified or not
1624          * 2) CMD?
1625          */
1626         rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son));
1627         if (rc)
1628                 RETURN(rc);
1629
1630         /* calling ->ah_make_hint() is used to transfer information from parent */
1631         mdd_object_make_hint(env, mdd_pobj, son, attr);
1632
1633         handle = mdd_trans_create(env, mdd);
1634         if (IS_ERR(handle))
1635                 GOTO(out_free, rc = PTR_ERR(handle));
1636
1637         /*
1638          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1639          * Should this be fixed?
1640          */
1641         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n",
1642                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1643                spec->sp_cr_flags, spec->no_create);
1644
1645         if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
1646                 /* replay case or lfs setstripe */
1647                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1648                                         spec->u.sp_ea.eadatalen);
1649         } else {
1650                 buf = &LU_BUF_NULL;
1651         }
1652
1653         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1654                                   XATTR_NAME_LOV, 0, handle);
1655         if (rc)
1656                 GOTO(stop, rc);
1657
1658         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1659         if (rc)
1660                 GOTO(stop, rc);
1661
1662         rc = mdd_trans_start(env, mdd, handle);
1663         if (rc)
1664                 GOTO(stop, rc);
1665
1666         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1667                           0, handle, mdd_object_capa(env, son));
1668
1669         if (rc)
1670                 GOTO(stop, rc);
1671
1672         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1673
1674 stop:
1675         mdd_trans_stop(env, mdd, rc, handle);
1676 out_free:
1677         RETURN(rc);
1678 }
1679
1680 /* Get fid from name and parent */
1681 static int
1682 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1683              const struct lu_name *lname, struct lu_fid* fid, int mask)
1684 {
1685         const char          *name = lname->ln_name;
1686         const struct dt_key *key = (const struct dt_key *)name;
1687         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1688         struct mdd_device   *m = mdo2mdd(pobj);
1689         struct dt_object    *dir = mdd_object_child(mdd_obj);
1690         int rc;
1691         ENTRY;
1692
1693         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1694                 RETURN(-ESTALE);
1695
1696         if (mdd_object_remote(mdd_obj)) {
1697                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
1698                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
1699         } else if (!mdd_object_exists(mdd_obj)) {
1700                 RETURN(-ESTALE);
1701         }
1702
1703         /* The common filename length check. */
1704         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1705                 RETURN(-ENAMETOOLONG);
1706
1707         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1708                                             MOR_TGT_PARENT);
1709         if (rc)
1710                 RETURN(rc);
1711
1712         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1713                    dt_try_as_dir(env, dir))) {
1714
1715                 rc = dir->do_index_ops->dio_lookup(env, dir,
1716                                                  (struct dt_rec *)fid, key,
1717                                                  mdd_object_capa(env, mdd_obj));
1718                 if (rc > 0)
1719                         rc = 0;
1720                 else if (rc == 0)
1721                         rc = -ENOENT;
1722         } else
1723                 rc = -ENOTDIR;
1724
1725         RETURN(rc);
1726 }
1727
1728 static int mdd_declare_object_initialize(const struct lu_env *env,
1729                                          struct mdd_object *parent,
1730                                          struct mdd_object *child,
1731                                          struct lu_attr *attr,
1732                                          struct thandle *handle,
1733                                          struct linkea_data *ldata)
1734 {
1735         int rc;
1736         ENTRY;
1737
1738         /*
1739          * inode mode has been set in creation time, and it's based on umask,
1740          * la_mode and acl, don't set here again! (which will go wrong
1741          * because below function doesn't consider umask).
1742          * I'd suggest set all object attributes in creation time, see above.
1743          */
1744         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1745         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1746         rc = mdo_declare_attr_set(env, child, attr, handle);
1747         attr->la_valid |= LA_MODE | LA_TYPE;
1748         if (rc == 0 && S_ISDIR(attr->la_mode)) {
1749                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1750                                               dot, handle);
1751                 if (rc == 0)
1752                         rc = mdo_declare_ref_add(env, child, handle);
1753
1754                 rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
1755                                               dotdot, handle);
1756         }
1757
1758         if (rc == 0)
1759                 mdd_declare_links_add(env, child, handle, ldata);
1760
1761         RETURN(rc);
1762 }
1763
1764 static int mdd_object_initialize(const struct lu_env *env,
1765                                  const struct lu_fid *pfid,
1766                                  const struct lu_name *lname,
1767                                  struct mdd_object *child,
1768                                  struct lu_attr *attr, struct thandle *handle,
1769                                  const struct md_op_spec *spec,
1770                                  struct linkea_data *ldata)
1771 {
1772         int rc;
1773         ENTRY;
1774
1775         /*
1776          * Update attributes for child.
1777          *
1778          * FIXME:
1779          *  (1) the valid bits should be converted between Lustre and Linux;
1780          *  (2) maybe, the child attributes should be set in OSD when creation.
1781          */
1782
1783         rc = mdd_attr_set_internal(env, child, attr, handle, 0);
1784         /* arguments are supposed to stay the same */
1785         if (S_ISDIR(attr->la_mode)) {
1786                 /* Add "." and ".." for newly created dir */
1787                 mdo_ref_add(env, child, handle);
1788                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1789                                              dot, handle, BYPASS_CAPA);
1790                 if (rc == 0)
1791                         rc = __mdd_index_insert_only(env, child, pfid,
1792                                                      dotdot, handle,
1793                                                      BYPASS_CAPA);
1794                 if (rc != 0)
1795                         mdo_ref_del(env, child, handle);
1796         }
1797
1798         if (rc == 0)
1799                 mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
1800
1801         RETURN(rc);
1802 }
1803
1804 /* has not lock on pobj yet */
1805 static int mdd_create_sanity_check(const struct lu_env *env,
1806                                    struct md_object *pobj,
1807                                    struct lu_attr *pattr,
1808                                    const struct lu_name *lname,
1809                                    struct lu_attr *cattr,
1810                                    struct md_op_spec *spec)
1811 {
1812         struct mdd_thread_info *info = mdd_env_info(env);
1813         struct lu_fid     *fid       = &info->mti_fid;
1814         struct mdd_object *obj       = md2mdd_obj(pobj);
1815         struct mdd_device *m         = mdo2mdd(pobj);
1816         int rc;
1817         ENTRY;
1818
1819         /* EEXIST check */
1820         if (mdd_is_dead_obj(obj))
1821                 RETURN(-ENOENT);
1822
1823         /*
1824          * In some cases this lookup is not needed - we know before if name
1825          * exists or not because MDT performs lookup for it.
1826          * name length check is done in lookup.
1827          */
1828         if (spec->sp_cr_lookup) {
1829                 /*
1830                  * Check if the name already exist, though it will be checked in
1831                  * _index_insert also, for avoiding rolling back if exists
1832                  * _index_insert.
1833                  */
1834                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1835                                          MAY_WRITE | MAY_EXEC);
1836                 if (rc != -ENOENT)
1837                         RETURN(rc ? : -EEXIST);
1838         } else {
1839                 /*
1840                  * Check WRITE permission for the parent.
1841                  * EXEC permission have been checked
1842                  * when lookup before create already.
1843                  */
1844                 rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE,
1845                                                     MOR_TGT_PARENT);
1846                 if (rc)
1847                         RETURN(rc);
1848         }
1849
1850         /* sgid check */
1851         if (pattr->la_mode & S_ISGID) {
1852                 cattr->la_gid = pattr->la_gid;
1853                 if (S_ISDIR(cattr->la_mode)) {
1854                         cattr->la_mode |= S_ISGID;
1855                         cattr->la_valid |= LA_MODE;
1856                 }
1857         }
1858
1859         switch (cattr->la_mode & S_IFMT) {
1860         case S_IFLNK: {
1861                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1862
1863                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1864                         RETURN(-ENAMETOOLONG);
1865                 else
1866                         RETURN(0);
1867         }
1868         case S_IFDIR:
1869         case S_IFREG:
1870         case S_IFCHR:
1871         case S_IFBLK:
1872         case S_IFIFO:
1873         case S_IFSOCK:
1874                 rc = 0;
1875                 break;
1876         default:
1877                 rc = -EINVAL;
1878                 break;
1879         }
1880         RETURN(rc);
1881 }
1882
1883 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
1884                               struct mdd_object *p, struct mdd_object *c,
1885                               const struct lu_name *name,
1886                               struct lu_attr *attr,
1887                               struct thandle *handle,
1888                               const struct md_op_spec *spec,
1889                               struct linkea_data *ldata,
1890                               struct lu_buf *def_acl_buf,
1891                               struct lu_buf *acl_buf)
1892 {
1893         int rc;
1894
1895         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
1896         if (rc)
1897                 GOTO(out, rc);
1898
1899 #ifdef CONFIG_FS_POSIX_ACL
1900         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
1901                 /* if dir, then can inherit default ACl */
1902                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
1903                                            XATTR_NAME_ACL_DEFAULT,
1904                                            0, handle);
1905                 if (rc)
1906                         GOTO(out, rc);
1907         }
1908
1909         if (acl_buf->lb_len > 0) {
1910                 rc = mdo_declare_attr_set(env, c, attr, handle);
1911                 if (rc)
1912                         GOTO(out, rc);
1913
1914                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1915                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1916                 if (rc)
1917                         GOTO(out, rc);
1918         }
1919 #endif
1920
1921         if (S_ISDIR(attr->la_mode)) {
1922                 rc = mdo_declare_ref_add(env, p, handle);
1923                 if (rc)
1924                         GOTO(out, rc);
1925         }
1926
1927         rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
1928         if (rc)
1929                 GOTO(out, rc);
1930
1931         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1932                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
1933         else
1934                 rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1935                                               name->ln_name, handle);
1936         if (rc)
1937                 GOTO(out, rc);
1938
1939         /* replay case, create LOV EA from client data */
1940         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1941                 const struct lu_buf *buf;
1942
1943                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1944                                         spec->u.sp_ea.eadatalen);
1945                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
1946                                            0, handle);
1947                 if (rc)
1948                         GOTO(out, rc);
1949         }
1950
1951         if (S_ISLNK(attr->la_mode)) {
1952                 rc = dt_declare_record_write(env, mdd_object_child(c),
1953                                              strlen(spec->u.sp_symname), 0,
1954                                              handle);
1955                 if (rc)
1956                         GOTO(out, rc);
1957         }
1958
1959         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
1960                 rc = mdo_declare_attr_set(env, p, attr, handle);
1961                 if (rc)
1962                         return rc;
1963         }
1964
1965         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1966         if (rc)
1967                 return rc;
1968
1969 out:
1970         return rc;
1971 }
1972
1973 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
1974                         struct lu_attr *la, struct lu_buf *def_acl_buf,
1975                         struct lu_buf *acl_buf)
1976 {
1977         int     rc;
1978         ENTRY;
1979
1980         if (S_ISLNK(la->la_mode)) {
1981                 acl_buf->lb_len = 0;
1982                 def_acl_buf->lb_len = 0;
1983                 RETURN(0);
1984         }
1985
1986         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
1987         rc = mdo_xattr_get(env, pobj, def_acl_buf,
1988                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
1989         mdd_read_unlock(env, pobj);
1990         if (rc > 0) {
1991                 /* If there are default ACL, fix mode/ACL by default ACL */
1992                 def_acl_buf->lb_len = rc;
1993                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
1994                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
1995                 acl_buf->lb_len = rc;
1996                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
1997                 if (rc < 0)
1998                         RETURN(rc);
1999         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2000                 /* If there are no default ACL, fix mode by mask */
2001                 struct lu_ucred *uc = lu_ucred(env);
2002
2003                 /* The create triggered by MDT internal events, such as
2004                  * LFSCK reset, will not contain valid "uc". */
2005                 if (unlikely(uc != NULL))
2006                         la->la_mode &= ~uc->uc_umask;
2007                 rc = 0;
2008                 acl_buf->lb_len = 0;
2009                 def_acl_buf->lb_len = 0;
2010         }
2011
2012         RETURN(rc);
2013 }
2014
2015 /*
2016  * Create object and insert it into namespace.
2017  */
2018 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2019                       const struct lu_name *lname, struct md_object *child,
2020                       struct md_op_spec *spec, struct md_attr* ma)
2021 {
2022         struct mdd_thread_info  *info = mdd_env_info(env);
2023         struct lu_attr          *la = &info->mti_la_for_fix;
2024         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2025         struct mdd_object       *son = md2mdd_obj(child);
2026         struct mdd_device       *mdd = mdo2mdd(pobj);
2027         struct lu_attr          *attr = &ma->ma_attr;
2028         struct thandle          *handle;
2029         struct lu_attr          *pattr = &info->mti_pattr;
2030         struct lu_buf           acl_buf;
2031         struct lu_buf           def_acl_buf;
2032         struct linkea_data      *ldata = &info->mti_link_data;
2033         struct dynlock_handle   *dlh;
2034         const char              *name = lname->ln_name;
2035         int                      rc, created = 0, initialized = 0, inserted = 0;
2036         ENTRY;
2037
2038         /*
2039          * Two operations have to be performed:
2040          *
2041          *  - an allocation of a new object (->do_create()), and
2042          *
2043          *  - an insertion into a parent index (->dio_insert()).
2044          *
2045          * Due to locking, operation order is not important, when both are
2046          * successful, *but* error handling cases are quite different:
2047          *
2048          *  - if insertion is done first, and following object creation fails,
2049          *  insertion has to be rolled back, but this operation might fail
2050          *  also leaving us with dangling index entry.
2051          *
2052          *  - if creation is done first, is has to be undone if insertion
2053          *  fails, leaving us with leaked space, which is neither good, nor
2054          *  fatal.
2055          *
2056          * It seems that creation-first is simplest solution, but it is
2057          * sub-optimal in the frequent
2058          *
2059          *         $ mkdir foo
2060          *         $ mkdir foo
2061          *
2062          * case, because second mkdir is bound to create object, only to
2063          * destroy it immediately.
2064          *
2065          * To avoid this follow local file systems that do double lookup:
2066          *
2067          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2068          *
2069          *     1. create            (mdd_object_create_internal())
2070          *
2071          *     2. insert            (__mdd_index_insert(), lookup again)
2072          */
2073
2074         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
2075         if (rc != 0)
2076                 RETURN(rc);
2077
2078         /* Sanity checks before big job. */
2079         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2080         if (rc)
2081                 RETURN(rc);
2082
2083         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2084                 GOTO(out_free, rc = -EINPROGRESS);
2085
2086         acl_buf.lb_buf = info->mti_xattr_buf;
2087         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2088         def_acl_buf.lb_buf = info->mti_key;
2089         def_acl_buf.lb_len = sizeof(info->mti_key);
2090         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2091         if (rc < 0)
2092                 GOTO(out_free, rc);
2093
2094         mdd_object_make_hint(env, mdd_pobj, son, attr);
2095
2096         handle = mdd_trans_create(env, mdd);
2097         if (IS_ERR(handle))
2098                 GOTO(out_free, rc = PTR_ERR(handle));
2099
2100         memset(ldata, 0, sizeof(*ldata));
2101         mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
2102                            lname, 1, 0, ldata);
2103
2104         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2105                                 handle, spec, ldata, &def_acl_buf, &acl_buf);
2106         if (rc)
2107                 GOTO(out_stop, rc);
2108
2109         rc = mdd_trans_start(env, mdd, handle);
2110         if (rc)
2111                 GOTO(out_stop, rc);
2112
2113         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
2114         if (dlh == NULL)
2115                 GOTO(out_trans, rc = -ENOMEM);
2116
2117         mdd_write_lock(env, son, MOR_TGT_CHILD);
2118         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
2119         if (rc) {
2120                 mdd_write_unlock(env, son);
2121                 GOTO(cleanup, rc);
2122         }
2123
2124         created = 1;
2125
2126 #ifdef CONFIG_FS_POSIX_ACL
2127         if (def_acl_buf.lb_len > 0 && S_ISDIR(attr->la_mode)) {
2128                 /* set default acl */
2129                 rc = mdo_xattr_set(env, son, &def_acl_buf,
2130                                    XATTR_NAME_ACL_DEFAULT, 0,
2131                                    handle, BYPASS_CAPA);
2132                 if (rc) {
2133                         mdd_write_unlock(env, son);
2134                         GOTO(cleanup, rc);
2135                 }
2136         }
2137         /* set its own acl */
2138         if (acl_buf.lb_len > 0) {
2139                 rc = mdo_xattr_set(env, son, &acl_buf,
2140                                    XATTR_NAME_ACL_ACCESS,
2141                                    0, handle, BYPASS_CAPA);
2142                 if (rc) {
2143                         mdd_write_unlock(env, son);
2144                         GOTO(cleanup, rc);
2145                 }
2146         }
2147 #endif
2148
2149         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
2150                                    son, attr, handle, spec, ldata);
2151
2152         /*
2153          * in case of replay we just set LOVEA provided by the client
2154          * XXX: I think it would be interesting to try "old" way where
2155          *      MDT calls this xattr_set(LOV) in a different transaction.
2156          *      probably this way we code can be made better.
2157          */
2158         if (rc == 0 && (spec->no_create ||
2159                         (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
2160                 const struct lu_buf *buf;
2161
2162                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2163                                 spec->u.sp_ea.eadatalen);
2164                 rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
2165                                 BYPASS_CAPA);
2166         }
2167
2168         if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2169                 rc = __mdd_orphan_add(env, son, handle);
2170
2171         mdd_write_unlock(env, son);
2172
2173         if (rc != 0)
2174                 /*
2175                  * Object has no links, so it will be destroyed when last
2176                  * reference is released. (XXX not now.)
2177                  */
2178                 GOTO(cleanup, rc);
2179
2180         initialized = 1;
2181
2182         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
2183                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2184                                         name, S_ISDIR(attr->la_mode), handle,
2185                                         mdd_object_capa(env, mdd_pobj));
2186
2187         if (rc != 0)
2188                 GOTO(cleanup, rc);
2189
2190         inserted = 1;
2191
2192         if (S_ISLNK(attr->la_mode)) {
2193                 struct lu_ucred  *uc = lu_ucred_assert(env);
2194                 struct dt_object *dt = mdd_object_child(son);
2195                 const char *target_name = spec->u.sp_symname;
2196                 int sym_len = strlen(target_name);
2197                 const struct lu_buf *buf;
2198                 loff_t pos = 0;
2199
2200                 buf = mdd_buf_get_const(env, target_name, sym_len);
2201                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2202                                                 mdd_object_capa(env, son),
2203                                                 uc->uc_cap &
2204                                                 CFS_CAP_SYS_RESOURCE_MASK);
2205
2206                 if (rc == sym_len)
2207                         rc = 0;
2208                 else
2209                         GOTO(cleanup, rc = -EFAULT);
2210         }
2211
2212         /* volatile file creation does not update parent directory times */
2213         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2214                 GOTO(cleanup, rc = 0);
2215
2216         /* update parent directory mtime/ctime */
2217         *la = *attr;
2218         la->la_valid = LA_CTIME | LA_MTIME;
2219         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
2220         if (rc)
2221                 GOTO(cleanup, rc);
2222
2223         EXIT;
2224 cleanup:
2225         if (rc != 0 && created != 0) {
2226                 int rc2;
2227
2228                 if (inserted != 0) {
2229                         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2230                                 rc2 = __mdd_orphan_del(env, son, handle);
2231                         else
2232                                 rc2 = __mdd_index_delete(env, mdd_pobj, name,
2233                                                          S_ISDIR(attr->la_mode),
2234                                                          handle, BYPASS_CAPA);
2235                         if (rc2 != 0)
2236                                 goto out_stop;
2237                 }
2238
2239                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2240                 if (initialized != 0 && S_ISDIR(attr->la_mode)) {
2241                         /* Drop the reference, no need to delete "."/"..",
2242                          * because the object to be destroied directly. */
2243                         rc2 = mdo_ref_del(env, son, handle);
2244                         if (rc2 != 0) {
2245                                 mdd_write_unlock(env, son);
2246                                 goto out_stop;
2247                         }
2248                 }
2249
2250                 rc2 = mdo_ref_del(env, son, handle);
2251                 if (rc2 != 0) {
2252                         mdd_write_unlock(env, son);
2253                         goto out_stop;
2254                 }
2255
2256                 mdo_destroy(env, son, handle);
2257                 mdd_write_unlock(env, son);
2258         }
2259
2260         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
2261 out_trans:
2262         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)))
2263                 rc = mdd_changelog_ns_store(env, mdd,
2264                         S_ISDIR(attr->la_mode) ? CL_MKDIR :
2265                         S_ISREG(attr->la_mode) ? CL_CREATE :
2266                         S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2267                         0, son, mdd_pobj, lname, handle);
2268 out_stop:
2269         mdd_trans_stop(env, mdd, rc, handle);
2270 out_free:
2271         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2272                 /* if we vmalloced a large buffer drop it */
2273                 lu_buf_free(ldata->ld_buf);
2274
2275         /* The child object shouldn't be cached anymore */
2276         if (rc)
2277                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2278                             &child->mo_lu.lo_header->loh_flags);
2279         return rc;
2280 }
2281
2282 /*
2283  * Get locks on parents in proper order
2284  * RETURN: < 0 - error, rename_order if successful
2285  */
2286 enum rename_order {
2287         MDD_RN_SAME,
2288         MDD_RN_SRCTGT,
2289         MDD_RN_TGTSRC
2290 };
2291
2292 static int mdd_rename_order(const struct lu_env *env,
2293                             struct mdd_device *mdd,
2294                             struct mdd_object *src_pobj,
2295                             struct mdd_object *tgt_pobj)
2296 {
2297         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2298         int rc;
2299         ENTRY;
2300
2301         if (src_pobj == tgt_pobj)
2302                 RETURN(MDD_RN_SAME);
2303
2304         /* compared the parent child relationship of src_p&tgt_p */
2305         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2306                 rc = MDD_RN_SRCTGT;
2307         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2308                 rc = MDD_RN_TGTSRC;
2309         } else {
2310                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2311                 if (rc == -EREMOTE)
2312                         rc = 0;
2313
2314                 if (rc == 1)
2315                         rc = MDD_RN_TGTSRC;
2316                 else if (rc == 0)
2317                         rc = MDD_RN_SRCTGT;
2318         }
2319
2320         RETURN(rc);
2321 }
2322
2323 /* has not mdd_write{read}_lock on any obj yet. */
2324 static int mdd_rename_sanity_check(const struct lu_env *env,
2325                                    struct mdd_object *src_pobj,
2326                                    struct mdd_object *tgt_pobj,
2327                                    struct mdd_object *sobj,
2328                                    struct mdd_object *tobj,
2329                                    struct lu_attr *so_attr,
2330                                    struct lu_attr *tg_attr)
2331 {
2332         int rc = 0;
2333         ENTRY;
2334
2335         /* XXX: when get here, sobj must NOT be NULL,
2336          * the other case has been processed in cld_rename
2337          * before mdd_rename and enable MDS_PERM_BYPASS. */
2338         LASSERT(sobj);
2339
2340         rc = mdd_may_delete(env, src_pobj, sobj, so_attr, NULL, 1, 0);
2341         if (rc)
2342                 RETURN(rc);
2343
2344         /* XXX: when get here, "tobj == NULL" means tobj must
2345          * NOT exist (neither on remote MDS, such case has been
2346          * processed in cld_rename before mdd_rename and enable
2347          * MDS_PERM_BYPASS).
2348          * So check may_create, but not check may_unlink. */
2349         if (!tobj)
2350                 rc = mdd_may_create(env, tgt_pobj, NULL,
2351                                     (src_pobj != tgt_pobj), 0);
2352         else
2353                 rc = mdd_may_delete(env, tgt_pobj, tobj, tg_attr, so_attr,
2354                                     (src_pobj != tgt_pobj), 1);
2355
2356         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
2357             S_ISDIR(so_attr->la_mode))
2358                 rc = __mdd_may_link(env, tgt_pobj);
2359
2360         RETURN(rc);
2361 }
2362
2363 static int mdd_declare_rename(const struct lu_env *env,
2364                               struct mdd_device *mdd,
2365                               struct mdd_object *mdd_spobj,
2366                               struct mdd_object *mdd_tpobj,
2367                               struct mdd_object *mdd_sobj,
2368                               struct mdd_object *mdd_tobj,
2369                               const struct lu_name *tname,
2370                               const struct lu_name *sname,
2371                               struct md_attr *ma,
2372                               struct linkea_data *ldata,
2373                               struct thandle *handle)
2374 {
2375         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2376         int rc;
2377
2378         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2379         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2380
2381         LASSERT(mdd_spobj);
2382         LASSERT(mdd_tpobj);
2383         LASSERT(mdd_sobj);
2384
2385         /* name from source dir */
2386         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2387         if (rc)
2388                 return rc;
2389
2390         /* .. from source child */
2391         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2392                 /* source child can be directory,
2393                  * counted by source dir's nlink */
2394                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2395                 if (rc)
2396                         return rc;
2397                 if (mdd_spobj != mdd_tpobj) {
2398                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2399                                                       handle);
2400                         if (rc)
2401                                 return rc;
2402
2403                         rc = mdo_declare_index_insert(env, mdd_sobj,
2404                                                       mdo2fid(mdd_tpobj),
2405                                                       dotdot, handle);
2406                         if (rc)
2407                                 return rc;
2408                 }
2409                 /* new target child can be directory,
2410                  * counted by target dir's nlink */
2411                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2412                 if (rc)
2413                         return rc;
2414
2415         }
2416
2417         la->la_valid = LA_CTIME | LA_MTIME;
2418         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2419         if (rc != 0)
2420                 return rc;
2421
2422         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2423         if (rc != 0)
2424                 return rc;
2425
2426         la->la_valid = LA_CTIME;
2427         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2428         if (rc)
2429                 return rc;
2430
2431         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2432         if (rc)
2433                 return rc;
2434
2435         /* new name */
2436         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2437                         tname->ln_name, handle);
2438         if (rc)
2439                 return rc;
2440
2441         /* name from target dir (old name), we declare it unconditionally
2442          * as mdd_rename() calls delete unconditionally as well. so just
2443          * to balance declarations vs calls to change ... */
2444         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2445         if (rc)
2446                 return rc;
2447
2448         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2449                 /* delete target child in target parent directory */
2450                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2451                 if (rc)
2452                         return rc;
2453
2454                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2455                         /* target child can be directory,
2456                          * delete "." reference in target child directory */
2457                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2458                         if (rc)
2459                                 return rc;
2460
2461                         /* delete ".." reference in target parent directory */
2462                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2463                         if (rc)
2464                                 return rc;
2465                 }
2466
2467                 la->la_valid = LA_CTIME;
2468                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2469                 if (rc)
2470                         return rc;
2471
2472                 rc = mdd_declare_links_del(env, mdd_tobj, handle);
2473                 if (rc)
2474                         return rc;
2475
2476                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2477                 if (rc)
2478                         return rc;
2479         }
2480
2481         rc = mdd_declare_changelog_ext_store(env, mdd, tname, sname, handle);
2482         if (rc)
2483                 return rc;
2484
2485         return rc;
2486 }
2487
2488 /* src object can be remote that is why we use only fid and type of object */
2489 static int mdd_rename(const struct lu_env *env,
2490                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2491                       const struct lu_fid *lf, const struct lu_name *lsname,
2492                       struct md_object *tobj, const struct lu_name *ltname,
2493                       struct md_attr *ma)
2494 {
2495         const char *sname = lsname->ln_name;
2496         const char *tname = ltname->ln_name;
2497         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2498         struct lu_attr    *so_attr = &mdd_env_info(env)->mti_cattr;
2499         struct lu_attr    *tg_attr = &mdd_env_info(env)->mti_pattr;
2500         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2501         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2502         struct mdd_device *mdd = mdo2mdd(src_pobj);
2503         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2504         struct mdd_object *mdd_tobj = NULL;
2505         struct dynlock_handle *sdlh = NULL, *tdlh = NULL;
2506         struct thandle *handle;
2507         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2508         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2509         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2510         bool is_dir;
2511         bool tobj_ref = 0;
2512         bool tobj_locked = 0;
2513         unsigned cl_flags = 0;
2514         int rc, rc2;
2515         ENTRY;
2516
2517         if (tobj)
2518                 mdd_tobj = md2mdd_obj(tobj);
2519
2520         mdd_sobj = mdd_object_find(env, mdd, lf);
2521
2522         rc = mdd_la_get(env, mdd_sobj, so_attr,
2523                         mdd_object_capa(env, mdd_sobj));
2524         if (rc)
2525                 GOTO(out_pending, rc);
2526
2527         if (mdd_tobj) {
2528                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2529                                 mdd_object_capa(env, mdd_tobj));
2530                 if (rc)
2531                         GOTO(out_pending, rc);
2532         }
2533
2534         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj,
2535                                      mdd_tobj, so_attr, tg_attr);
2536         if (rc)
2537                 GOTO(out_pending, rc);
2538
2539         handle = mdd_trans_create(env, mdd);
2540         if (IS_ERR(handle))
2541                 GOTO(out_pending, rc = PTR_ERR(handle));
2542
2543         memset(ldata, 0, sizeof(*ldata));
2544         mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdd_object_fid(mdd_tpobj),
2545                            ltname, 1, 0, ldata);
2546         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2547                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2548         if (rc)
2549                 GOTO(stop, rc);
2550
2551         rc = mdd_trans_start(env, mdd, handle);
2552         if (rc)
2553                 GOTO(stop, rc);
2554
2555         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2556         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2557         if (rc < 0)
2558                 GOTO(cleanup_unlocked, rc);
2559
2560         /* Get locks in determined order */
2561         if (rc == MDD_RN_SAME) {
2562                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2563                                           sname, MOR_SRC_PARENT);
2564                 /* check hashes to determine do we need one lock or two */
2565                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2566                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2567                                 MOR_TGT_PARENT);
2568                 else
2569                         tdlh = sdlh;
2570         } else if (rc == MDD_RN_SRCTGT) {
2571                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2572                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2573         } else {
2574                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2575                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2576         }
2577         if (sdlh == NULL || tdlh == NULL)
2578                 GOTO(cleanup, rc = -ENOMEM);
2579
2580         is_dir = S_ISDIR(so_attr->la_mode);
2581
2582         /* Remove source name from source directory */
2583         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2584                                 mdd_object_capa(env, mdd_spobj));
2585         if (rc)
2586                 GOTO(cleanup, rc);
2587
2588         /* "mv dir1 dir2" needs "dir1/.." link update */
2589         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2590                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2591                                         mdd_object_capa(env, mdd_sobj));
2592                 if (rc)
2593                         GOTO(fixup_spobj2, rc);
2594
2595                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2596                                       handle, mdd_object_capa(env, mdd_sobj));
2597                 if (rc)
2598                         GOTO(fixup_spobj, rc);
2599         }
2600
2601         /* Remove target name from target directory
2602          * Here tobj can be remote one, so we do index_delete unconditionally
2603          * and -ENOENT is allowed.
2604          */
2605         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2606                                 mdd_object_capa(env, mdd_tpobj));
2607         if (rc != 0) {
2608                 if (mdd_tobj) {
2609                         /* tname might been renamed to something else */
2610                         GOTO(fixup_spobj, rc);
2611                 }
2612                 if (rc != -ENOENT)
2613                         GOTO(fixup_spobj, rc);
2614         }
2615
2616         /* Insert new fid with target name into target dir */
2617         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2618                                 mdd_object_capa(env, mdd_tpobj));
2619         if (rc)
2620                 GOTO(fixup_tpobj, rc);
2621
2622         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2623         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2624
2625         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2626         if (mdd_sobj) {
2627                 la->la_valid = LA_CTIME;
2628                 rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
2629                 if (rc)
2630                         GOTO(fixup_tpobj, rc);
2631         }
2632
2633         /* Remove old target object
2634          * For tobj is remote case cmm layer has processed
2635          * and set tobj to NULL then. So when tobj is NOT NULL,
2636          * it must be local one.
2637          */
2638         if (tobj && mdd_object_exists(mdd_tobj)) {
2639                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2640                 tobj_locked = 1;
2641                 if (mdd_is_dead_obj(mdd_tobj)) {
2642                         /* shld not be dead, something is wrong */
2643                         CERROR("tobj is dead, something is wrong\n");
2644                         rc = -EINVAL;
2645                         goto cleanup;
2646                 }
2647                 mdo_ref_del(env, mdd_tobj, handle);
2648
2649                 /* Remove dot reference. */
2650                 if (S_ISDIR(tg_attr->la_mode))
2651                         mdo_ref_del(env, mdd_tobj, handle);
2652                 tobj_ref = 1;
2653
2654                 /* fetch updated nlink */
2655                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2656                                 mdd_object_capa(env, mdd_tobj));
2657                 if (rc != 0) {
2658                         CERROR("%s: Failed to get nlink for tobj "
2659                                 DFID": rc = %d\n",
2660                                 mdd2obd_dev(mdd)->obd_name,
2661                                 PFID(tpobj_fid), rc);
2662                         GOTO(fixup_tpobj, rc);
2663                 }
2664
2665                 la->la_valid = LA_CTIME;
2666                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2667                 if (rc != 0) {
2668                         CERROR("%s: Failed to set ctime for tobj "
2669                                 DFID": rc = %d\n",
2670                                 mdd2obd_dev(mdd)->obd_name,
2671                                 PFID(tpobj_fid), rc);
2672                         GOTO(fixup_tpobj, rc);
2673                 }
2674
2675                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2676                 ma->ma_attr = *tg_attr;
2677                 ma->ma_valid |= MA_INODE;
2678                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2679                 if (rc != 0) {
2680                         CERROR("%s: Failed to unlink tobj "
2681                                 DFID": rc = %d\n",
2682                                 mdd2obd_dev(mdd)->obd_name,
2683                                 PFID(tpobj_fid), rc);
2684                         GOTO(fixup_tpobj, rc);
2685                 }
2686
2687                 /* fetch updated nlink */
2688                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2689                                 mdd_object_capa(env, mdd_tobj));
2690                 if (rc != 0) {
2691                         CERROR("%s: Failed to get nlink for tobj "
2692                                 DFID": rc = %d\n",
2693                                 mdd2obd_dev(mdd)->obd_name,
2694                                 PFID(tpobj_fid), rc);
2695                         GOTO(fixup_tpobj, rc);
2696                 }
2697                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2698                 ma->ma_attr = *tg_attr;
2699                 ma->ma_valid |= MA_INODE;
2700
2701                 if (tg_attr->la_nlink == 0) {
2702                         cl_flags |= CLF_RENAME_LAST;
2703                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2704                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2705                 }
2706         }
2707
2708         la->la_valid = LA_CTIME | LA_MTIME;
2709         rc = mdd_attr_check_set_internal(env, mdd_spobj, la, handle, 0);
2710         if (rc)
2711                 GOTO(fixup_tpobj, rc);
2712
2713         if (mdd_spobj != mdd_tpobj) {
2714                 la->la_valid = LA_CTIME | LA_MTIME;
2715                 rc = mdd_attr_check_set_internal(env, mdd_tpobj, la,
2716                                                  handle, 0);
2717         }
2718
2719         if (rc == 0 && mdd_sobj) {
2720                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2721                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2722                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2723                                       0, 0);
2724                 if (rc == -ENOENT)
2725                         /* Old files might not have EA entry */
2726                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2727                                       lsname, handle, NULL, 0);
2728                 mdd_write_unlock(env, mdd_sobj);
2729                 /* We don't fail the transaction if the link ea can't be
2730                    updated -- fid2path will use alternate lookup method. */
2731                 rc = 0;
2732         }
2733
2734         EXIT;
2735
2736 fixup_tpobj:
2737         if (rc) {
2738                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2739                                          BYPASS_CAPA);
2740                 if (rc2)
2741                         CWARN("tp obj fix error %d\n",rc2);
2742
2743                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2744                     !mdd_is_dead_obj(mdd_tobj)) {
2745                         if (tobj_ref) {
2746                                 mdo_ref_add(env, mdd_tobj, handle);
2747                                 if (is_dir)
2748                                         mdo_ref_add(env, mdd_tobj, handle);
2749                         }
2750
2751                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2752                                          mdo2fid(mdd_tobj), tname,
2753                                          is_dir, handle,
2754                                          BYPASS_CAPA);
2755
2756                         if (rc2)
2757                                 CWARN("tp obj fix error %d\n",rc2);
2758                 }
2759         }
2760
2761 fixup_spobj:
2762         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2763                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2764                                               BYPASS_CAPA);
2765
2766                 if (rc2)
2767                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2768                                mdd2obd_dev(mdd)->obd_name, rc2);
2769
2770
2771                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2772                                               dotdot, handle, BYPASS_CAPA);
2773                 if (rc2)
2774                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2775                               mdd2obd_dev(mdd)->obd_name, rc2);
2776         }
2777
2778 fixup_spobj2:
2779         if (rc) {
2780                 rc2 = __mdd_index_insert(env, mdd_spobj,
2781                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2782                 if (rc2)
2783                         CWARN("sp obj fix error %d\n",rc2);
2784         }
2785 cleanup:
2786         if (tobj_locked)
2787                 mdd_write_unlock(env, mdd_tobj);
2788         if (likely(tdlh) && sdlh != tdlh)
2789                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2790         if (likely(sdlh))
2791                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2792 cleanup_unlocked:
2793         if (rc == 0)
2794                 rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
2795                                                 mdd_tobj, tpobj_fid, lf,
2796                                                 spobj_fid, ltname, lsname,
2797                                                 handle);
2798
2799 stop:
2800         mdd_trans_stop(env, mdd, rc, handle);
2801 out_pending:
2802         mdd_object_put(env, mdd_sobj);
2803         return rc;
2804 }
2805
2806 const struct md_dir_operations mdd_dir_ops = {
2807         .mdo_is_subdir     = mdd_is_subdir,
2808         .mdo_lookup        = mdd_lookup,
2809         .mdo_create        = mdd_create,
2810         .mdo_rename        = mdd_rename,
2811         .mdo_link          = mdd_link,
2812         .mdo_unlink        = mdd_unlink,
2813         .mdo_create_data   = mdd_create_data,
2814 };