Whamcloud - gitweb
LU-2789 lod: sidestep setstripe vs rename race
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static int __mdd_lookup(const struct lu_env *, struct md_object *,
61                         const struct lu_name *, struct lu_fid*, int);
62
63 static int
64 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
65                     const struct lu_name *lname, struct lu_fid* fid, int mask)
66 {
67         const char *name = lname->ln_name;
68         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
69         struct dynlock_handle *dlh;
70         int rc;
71
72         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
73         if (unlikely(dlh == NULL))
74                 return -ENOMEM;
75         rc = __mdd_lookup(env, pobj, lname, fid, mask);
76         mdd_pdo_read_unlock(env, mdd_obj, dlh);
77
78         return rc;
79 }
80
81 int mdd_lookup(const struct lu_env *env,
82                struct md_object *pobj, const struct lu_name *lname,
83                struct lu_fid* fid, struct md_op_spec *spec)
84 {
85         int rc;
86         ENTRY;
87         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
88         RETURN(rc);
89 }
90
91 int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
92                    struct lu_fid *fid)
93 {
94         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
95 }
96
97 /*
98  * For root fid use special function, which does not compare version component
99  * of fid. Version component is different for root fids on all MDTs.
100  */
101 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
102 {
103         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
104                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
105 }
106
107 /*
108  * return 1: if lf is the fid of the ancestor of p1;
109  * return 0: if not;
110  *
111  * return -EREMOTE: if remote object is found, in this
112  * case fid of remote object is saved to @pf;
113  *
114  * otherwise: values < 0, errors.
115  */
116 static int mdd_is_parent(const struct lu_env *env,
117                          struct mdd_device *mdd,
118                          struct mdd_object *p1,
119                          const struct lu_fid *lf,
120                          struct lu_fid *pf)
121 {
122         struct mdd_object *parent = NULL;
123         struct lu_fid *pfid;
124         int rc;
125         ENTRY;
126
127         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
128         pfid = &mdd_env_info(env)->mti_fid;
129
130         /* Check for root first. */
131         if (mdd_is_root(mdd, mdo2fid(p1)))
132                 RETURN(0);
133
134         for(;;) {
135                 /* this is done recursively, bypass capa for each obj */
136                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
137                 rc = mdd_parent_fid(env, p1, pfid);
138                 if (rc)
139                         GOTO(out, rc);
140                 if (mdd_is_root(mdd, pfid))
141                         GOTO(out, rc = 0);
142                 if (lu_fid_eq(pfid, lf))
143                         GOTO(out, rc = 1);
144                 if (parent)
145                         mdd_object_put(env, parent);
146
147                 parent = mdd_object_find(env, mdd, pfid);
148                 if (IS_ERR(parent)) {
149                         GOTO(out, rc = PTR_ERR(parent));
150                 } else if (mdd_object_remote(parent)) {
151                         /*FIXME: Because of the restriction of rename in Phase I.
152                          * If the parent is remote, we just assumed lf is not the
153                          * parent of P1 for now */
154                         GOTO(out, rc = 0);
155                 }
156                 p1 = parent;
157         }
158         EXIT;
159 out:
160         if (parent && !IS_ERR(parent))
161                 mdd_object_put(env, parent);
162         return rc;
163 }
164
165 /*
166  * No permission check is needed.
167  *
168  * returns 1: if fid is ancestor of @mo;
169  * returns 0: if fid is not a ancestor of @mo;
170  *
171  * returns EREMOTE if remote object is found, fid of remote object is saved to
172  * @fid;
173  *
174  * returns < 0: if error
175  */
176 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
177                   const struct lu_fid *fid, struct lu_fid *sfid)
178 {
179         struct mdd_device *mdd = mdo2mdd(mo);
180         int rc;
181         ENTRY;
182
183         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
184                 RETURN(0);
185
186         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
187         if (rc == 0) {
188                 /* found root */
189                 fid_zero(sfid);
190         } else if (rc == 1) {
191                 /* found @fid is parent */
192                 *sfid = *fid;
193                 rc = 0;
194         }
195         RETURN(rc);
196 }
197
198 /*
199  * Check that @dir contains no entries except (possibly) dot and dotdot.
200  *
201  * Returns:
202  *
203  *             0        empty
204  *      -ENOTDIR        not a directory object
205  *    -ENOTEMPTY        not empty
206  *           -ve        other error
207  *
208  */
209 static int mdd_dir_is_empty(const struct lu_env *env,
210                             struct mdd_object *dir)
211 {
212         struct dt_it     *it;
213         struct dt_object *obj;
214         const struct dt_it_ops *iops;
215         int result;
216         ENTRY;
217
218         obj = mdd_object_child(dir);
219         if (!dt_try_as_dir(env, obj))
220                 RETURN(-ENOTDIR);
221
222         iops = &obj->do_index_ops->dio_it;
223         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
224         if (!IS_ERR(it)) {
225                 result = iops->get(env, it, (const void *)"");
226                 if (result > 0) {
227                         int i;
228                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
229                                 result = iops->next(env, it);
230                         if (result == 0)
231                                 result = -ENOTEMPTY;
232                         else if (result == +1)
233                                 result = 0;
234                 } else if (result == 0)
235                         /*
236                          * Huh? Index contains no zero key?
237                          */
238                         result = -EIO;
239
240                 iops->put(env, it);
241                 iops->fini(env, it);
242         } else
243                 result = PTR_ERR(it);
244         RETURN(result);
245 }
246
247 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
248 {
249         struct mdd_device *m = mdd_obj2mdd_dev(obj);
250         struct lu_attr *la = &mdd_env_info(env)->mti_la;
251         int rc;
252         ENTRY;
253
254         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
255         if (rc)
256                 RETURN(rc);
257
258         /*
259          * Subdir count limitation can be broken through.
260          */
261         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
262             !S_ISDIR(la->la_mode))
263                 RETURN(-EMLINK);
264         else
265                 RETURN(0);
266 }
267
268 /*
269  * Check whether it may create the cobj under the pobj.
270  * cobj maybe NULL
271  */
272 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
273                    struct mdd_object *cobj, int check_perm, int check_nlink)
274 {
275         int rc = 0;
276         ENTRY;
277
278         if (cobj && mdd_object_exists(cobj))
279                 RETURN(-EEXIST);
280
281         if (mdd_is_dead_obj(pobj))
282                 RETURN(-ENOENT);
283
284         if (check_perm)
285                 rc = mdd_permission_internal_locked(env, pobj, NULL,
286                                                     MAY_WRITE | MAY_EXEC,
287                                                     MOR_TGT_PARENT);
288         if (!rc && check_nlink)
289                 rc = __mdd_may_link(env, pobj);
290
291         RETURN(rc);
292 }
293
294 /*
295  * Check whether can unlink from the pobj in the case of "cobj == NULL".
296  */
297 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
298                    const struct lu_attr *attr)
299 {
300         int rc;
301         ENTRY;
302
303         if (mdd_is_dead_obj(pobj))
304                 RETURN(-ENOENT);
305
306         if ((attr->la_valid & LA_FLAGS) &&
307             (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
308                 RETURN(-EPERM);
309
310         rc = mdd_permission_internal_locked(env, pobj, NULL,
311                                             MAY_WRITE | MAY_EXEC,
312                                             MOR_TGT_PARENT);
313         if (rc)
314                 RETURN(rc);
315
316         if (mdd_is_append(pobj))
317                 RETURN(-EPERM);
318
319         RETURN(rc);
320 }
321
322 /*
323  * pobj == NULL is remote ops case, under such case, pobj's
324  * VTX feature has been checked already, no need check again.
325  */
326 static inline int mdd_is_sticky(const struct lu_env *env,
327                                 struct mdd_object *pobj,
328                                 struct mdd_object *cobj)
329 {
330         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
331         struct lu_ucred *uc = lu_ucred_assert(env);
332         int rc;
333
334         if (pobj) {
335                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
336                 if (rc)
337                         return rc;
338
339                 if (!(tmp_la->la_mode & S_ISVTX) ||
340                     (tmp_la->la_uid == uc->uc_fsuid))
341                         return 0;
342         }
343
344         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
345         if (rc)
346                 return rc;
347
348         if (tmp_la->la_uid == uc->uc_fsuid)
349                 return 0;
350
351         return !md_capable(uc, CFS_CAP_FOWNER);
352 }
353
354 static int mdd_may_delete_entry(const struct lu_env *env,
355                                 struct mdd_object *pobj, int check_perm)
356 {
357         ENTRY;
358
359         LASSERT(pobj != NULL);
360         if (!mdd_object_exists(pobj))
361                 RETURN(-ENOENT);
362
363         if (mdd_is_dead_obj(pobj))
364                 RETURN(-ENOENT);
365
366         if (check_perm) {
367                 int rc;
368                 rc = mdd_permission_internal_locked(env, pobj, NULL,
369                                             MAY_WRITE | MAY_EXEC,
370                                             MOR_TGT_PARENT);
371                 if (rc)
372                         RETURN(rc);
373         }
374
375         if (mdd_is_append(pobj))
376                 RETURN(-EPERM);
377
378         RETURN(0);
379 }
380
381 /*
382  * Check whether it may delete the cobj from the pobj.
383  * pobj maybe NULL
384  */
385 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
386                    struct mdd_object *cobj, struct lu_attr *cattr,
387                    struct lu_attr *src_attr, int check_perm, int check_empty)
388 {
389         int rc = 0;
390         ENTRY;
391
392         if (pobj) {
393                 rc = mdd_may_delete_entry(env, pobj, check_perm);
394                 if (rc != 0)
395                         RETURN(rc);
396         }
397
398         if (cobj == NULL)
399                 RETURN(0);
400
401         if (!mdd_object_exists(cobj))
402                 RETURN(-ENOENT);
403
404         if (mdd_is_dead_obj(cobj))
405                 RETURN(-ESTALE);
406
407
408         if (mdd_is_sticky(env, pobj, cobj))
409                 RETURN(-EPERM);
410
411         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
412                 RETURN(-EPERM);
413
414         if ((cattr->la_valid & LA_FLAGS) &&
415             (cattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
416                 RETURN(-EPERM);
417
418         /* additional check the rename case */
419         if (src_attr) {
420                 if (S_ISDIR(src_attr->la_mode)) {
421                         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
422
423                         if (!S_ISDIR(cattr->la_mode))
424                                 RETURN(-ENOTDIR);
425
426                         if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
427                                 RETURN(-EBUSY);
428                 } else if (S_ISDIR(cattr->la_mode))
429                         RETURN(-EISDIR);
430         }
431
432         if (S_ISDIR(cattr->la_mode) && check_empty)
433                 rc = mdd_dir_is_empty(env, cobj);
434
435         RETURN(rc);
436 }
437
438 /*
439  * tgt maybe NULL
440  * has mdd_write_lock on src already, but not on tgt yet
441  */
442 int mdd_link_sanity_check(const struct lu_env *env,
443                           struct mdd_object *tgt_obj,
444                           const struct lu_name *lname,
445                           struct mdd_object *src_obj)
446 {
447         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
448         int rc = 0;
449         ENTRY;
450
451         if (!mdd_object_exists(src_obj))
452                 RETURN(-ENOENT);
453
454         if (mdd_is_dead_obj(src_obj))
455                 RETURN(-ESTALE);
456
457         /* Local ops, no lookup before link, check filename length here. */
458         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
459                 RETURN(-ENAMETOOLONG);
460
461         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
462                 RETURN(-EPERM);
463
464         if (S_ISDIR(mdd_object_type(src_obj)))
465                 RETURN(-EPERM);
466
467         LASSERT(src_obj != tgt_obj);
468         if (tgt_obj) {
469                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
470                 if (rc)
471                         RETURN(rc);
472         }
473
474         rc = __mdd_may_link(env, src_obj);
475
476         RETURN(rc);
477 }
478
479 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
480                                    const char *name, struct thandle *handle,
481                                    struct lustre_capa *capa)
482 {
483         struct dt_object *next = mdd_object_child(pobj);
484         int               rc;
485         ENTRY;
486
487         if (dt_try_as_dir(env, next)) {
488                 rc = next->do_index_ops->dio_delete(env, next,
489                                                     (struct dt_key *)name,
490                                                     handle, capa);
491         } else
492                 rc = -ENOTDIR;
493
494         RETURN(rc);
495 }
496
497 static int __mdd_index_insert_only(const struct lu_env *env,
498                                    struct mdd_object *pobj,
499                                    const struct lu_fid *lf, const char *name,
500                                    struct thandle *handle,
501                                    struct lustre_capa *capa)
502 {
503         struct dt_object *next = mdd_object_child(pobj);
504         int               rc;
505         ENTRY;
506
507         if (dt_try_as_dir(env, next)) {
508                 struct lu_ucred  *uc = lu_ucred_check(env);
509                 int ignore_quota;
510
511                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
512                 rc = next->do_index_ops->dio_insert(env, next,
513                                                     (struct dt_rec*)lf,
514                                                     (const struct dt_key *)name,
515                                                     handle, capa, ignore_quota);
516         } else {
517                 rc = -ENOTDIR;
518         }
519         RETURN(rc);
520 }
521
522 /* insert named index, add reference if isdir */
523 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
524                               const struct lu_fid *lf, const char *name, int is_dir,
525                               struct thandle *handle, struct lustre_capa *capa)
526 {
527         int               rc;
528         ENTRY;
529
530         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
531         if (rc == 0 && is_dir) {
532                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
533                 mdo_ref_add(env, pobj, handle);
534                 mdd_write_unlock(env, pobj);
535         }
536         RETURN(rc);
537 }
538
539 /* delete named index, drop reference if isdir */
540 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
541                               const char *name, int is_dir, struct thandle *handle,
542                               struct lustre_capa *capa)
543 {
544         int               rc;
545         ENTRY;
546
547         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
548         if (rc == 0 && is_dir) {
549                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
550                 mdo_ref_del(env, pobj, handle);
551                 mdd_write_unlock(env, pobj);
552         }
553
554         RETURN(rc);
555 }
556
557 int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
558                             int reclen, struct thandle *handle)
559 {
560         int rc;
561
562         /* XXX: this is a temporary solution to declare llog changes
563          *      will be fixed in 2.3 with new llog implementation */
564
565         LASSERT(mdd->mdd_capa);
566
567         /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
568          *      have to set the parameter 'size' as INT_MAX or 0 to inform
569          *      OSD that this record write is for a llog write or catalog
570          *      header update, and osd declare function will reserve less
571          *      credits for optimization purpose.
572          *
573          *      Reserve 6 blocks for a llog write, since the llog file is
574          *      usually small, reserve 2 blocks for catalog header update,
575          *      because we know for sure that catalog header is already
576          *      allocated.
577          *
578          *      This hack should be removed in 2.3.
579          */
580
581         /* record itself */
582         rc = dt_declare_record_write(env, mdd->mdd_capa,
583                                      DECLARE_LLOG_WRITE, 0, handle);
584         if (rc)
585                 return rc;
586
587         /* header will be updated as well */
588         rc = dt_declare_record_write(env, mdd->mdd_capa,
589                                      DECLARE_LLOG_WRITE, 0, handle);
590         if (rc)
591                 return rc;
592
593         /* also we should be able to create new plain log */
594         rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
595         if (rc)
596                 return rc;
597
598         /* new record referencing new plain llog */
599         rc = dt_declare_record_write(env, mdd->mdd_capa,
600                                      DECLARE_LLOG_WRITE, 0, handle);
601         if (rc)
602                 return rc;
603
604         /* catalog's header will be updated as well */
605         rc = dt_declare_record_write(env, mdd->mdd_capa,
606                                      DECLARE_LLOG_REWRITE, 0, handle);
607
608         return rc;
609 }
610
611 int mdd_declare_changelog_store(const struct lu_env *env,
612                                 struct mdd_device *mdd,
613                                 const struct lu_name *fname,
614                                 struct thandle *handle)
615 {
616         struct obd_device               *obd = mdd2obd_dev(mdd);
617         struct llog_ctxt                *ctxt;
618         struct llog_changelog_rec       *rec;
619         struct lu_buf                   *buf;
620         int                              reclen;
621         int                              rc;
622
623         /* Not recording */
624         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
625                 return 0;
626
627         reclen = llog_data_len(sizeof(*rec) +
628                                (fname != NULL ? fname->ln_namelen : 0));
629         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
630         if (buf->lb_buf == NULL)
631                 return -ENOMEM;
632
633         rec = buf->lb_buf;
634         rec->cr_hdr.lrh_len = reclen;
635         rec->cr_hdr.lrh_type = CHANGELOG_REC;
636
637         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
638         if (ctxt == NULL)
639                 return -ENXIO;
640
641         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
642         llog_ctxt_put(ctxt);
643
644         return rc;
645 }
646
647 static int mdd_declare_changelog_ext_store(const struct lu_env *env,
648                                            struct mdd_device *mdd,
649                                            const struct lu_name *tname,
650                                            const struct lu_name *sname,
651                                            struct thandle *handle)
652 {
653         struct obd_device               *obd = mdd2obd_dev(mdd);
654         struct llog_ctxt                *ctxt;
655         struct llog_changelog_ext_rec   *rec;
656         struct lu_buf                   *buf;
657         int                              reclen;
658         int                              rc;
659
660         /* Not recording */
661         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
662                 return 0;
663
664         reclen = llog_data_len(sizeof(*rec) +
665                                (tname != NULL ? tname->ln_namelen : 0) +
666                                (sname != NULL ? 1 + sname->ln_namelen : 0));
667         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
668         if (buf->lb_buf == NULL)
669                 return -ENOMEM;
670
671         rec = buf->lb_buf;
672         rec->cr_hdr.lrh_len = reclen;
673         rec->cr_hdr.lrh_type = CHANGELOG_REC;
674
675         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
676         if (ctxt == NULL)
677                 return -ENXIO;
678
679         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
680         llog_ctxt_put(ctxt);
681
682         return rc;
683 }
684
685 /** Add a changelog entry \a rec to the changelog llog
686  * \param mdd
687  * \param rec
688  * \param handle - currently ignored since llogs start their own transaction;
689  *                 this will hopefully be fixed in llog rewrite
690  * \retval 0 ok
691  */
692 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
693                         struct llog_changelog_rec *rec, struct thandle *th)
694 {
695         struct obd_device       *obd = mdd2obd_dev(mdd);
696         struct llog_ctxt        *ctxt;
697         int                      rc;
698
699         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
700         rec->cr_hdr.lrh_type = CHANGELOG_REC;
701         rec->cr.cr_time = cl_time();
702
703         spin_lock(&mdd->mdd_cl.mc_lock);
704         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
705          * but as long as the MDD transactions are ordered correctly for e.g.
706          * rename conflicts, I don't think this should matter. */
707         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
708         spin_unlock(&mdd->mdd_cl.mc_lock);
709
710         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
711         if (ctxt == NULL)
712                 return -ENXIO;
713
714         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
715         llog_ctxt_put(ctxt);
716         if (rc > 0)
717                 rc = 0;
718         return rc;
719 }
720
721 /** Add a changelog_ext entry \a rec to the changelog llog
722  * \param mdd
723  * \param rec
724  * \param handle - currently ignored since llogs start their own transaction;
725  *              this will hopefully be fixed in llog rewrite
726  * \retval 0 ok
727  */
728 int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
729                             struct llog_changelog_ext_rec *rec,
730                             struct thandle *th)
731 {
732         struct obd_device       *obd = mdd2obd_dev(mdd);
733         struct llog_ctxt        *ctxt;
734         int                      rc;
735
736         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
737         /* llog_lvfs_write_rec sets the llog tail len */
738         rec->cr_hdr.lrh_type = CHANGELOG_REC;
739         rec->cr.cr_time = cl_time();
740
741         spin_lock(&mdd->mdd_cl.mc_lock);
742         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
743          * but as long as the MDD transactions are ordered correctly for e.g.
744          * rename conflicts, I don't think this should matter. */
745         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
746         spin_unlock(&mdd->mdd_cl.mc_lock);
747
748         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
749         if (ctxt == NULL)
750                 return -ENXIO;
751
752         /* nested journal transaction */
753         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
754         llog_ctxt_put(ctxt);
755         if (rc > 0)
756                 rc = 0;
757
758         return rc;
759 }
760
761 /** Store a namespace change changelog record
762  * If this fails, we must fail the whole transaction; we don't
763  * want the change to commit without the log entry.
764  * \param target - mdd_object of change
765  * \param parent - parent dir/object
766  * \param tname - target name string
767  * \param handle - transacion handle
768  */
769 int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
770                            enum changelog_rec_type type, unsigned flags,
771                            struct mdd_object *target, struct mdd_object *parent,
772                            const struct lu_name *tname, struct thandle *handle)
773 {
774         struct llog_changelog_rec *rec;
775         struct lu_buf *buf;
776         int reclen;
777         int rc;
778         ENTRY;
779
780         /* Not recording */
781         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
782                 RETURN(0);
783         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
784                 RETURN(0);
785
786         LASSERT(target != NULL);
787         LASSERT(parent != NULL);
788         LASSERT(tname != NULL);
789         LASSERT(handle != NULL);
790
791         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
792         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
793         if (buf->lb_buf == NULL)
794                 RETURN(-ENOMEM);
795         rec = buf->lb_buf;
796
797         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
798         rec->cr.cr_type = (__u32)type;
799         rec->cr.cr_tfid = *mdo2fid(target);
800         rec->cr.cr_pfid = *mdo2fid(parent);
801         rec->cr.cr_namelen = tname->ln_namelen;
802         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
803
804         target->mod_cltime = cfs_time_current_64();
805
806         rc = mdd_changelog_store(env, mdd, rec, handle);
807         if (rc < 0) {
808                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
809                         rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid),
810                         PFID(&rec->cr.cr_pfid));
811                 RETURN(-EFAULT);
812         }
813
814         RETURN(0);
815 }
816
817
818 /** Store a namespace change changelog record
819  * If this fails, we must fail the whole transaction; we don't
820  * want the change to commit without the log entry.
821  * \param target - mdd_object of change
822  * \param tpfid - target parent dir/object fid
823  * \param sfid - source object fid
824  * \param spfid - source parent fid
825  * \param tname - target name string
826  * \param sname - source name string
827  * \param handle - transacion handle
828  */
829 static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
830                                       struct mdd_device    *mdd,
831                                       enum changelog_rec_type type,
832                                       unsigned flags,
833                                       struct mdd_object    *target,
834                                       const struct lu_fid  *tpfid,
835                                       const struct lu_fid  *sfid,
836                                       const struct lu_fid  *spfid,
837                                       const struct lu_name *tname,
838                                       const struct lu_name *sname,
839                                       struct thandle *handle)
840 {
841         struct llog_changelog_ext_rec *rec;
842         struct lu_buf *buf;
843         int reclen;
844         int rc;
845         ENTRY;
846
847         /* Not recording */
848         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
849                 RETURN(0);
850         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
851                 RETURN(0);
852
853         LASSERT(sfid != NULL);
854         LASSERT(tpfid != NULL);
855         LASSERT(tname != NULL);
856         LASSERT(handle != NULL);
857
858         reclen = llog_data_len(sizeof(*rec) +
859                                sname != NULL ? 1 + sname->ln_namelen : 0);
860         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
861         if (buf->lb_buf == NULL)
862                 RETURN(-ENOMEM);
863         rec = buf->lb_buf;
864
865         rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags);
866         rec->cr.cr_type = (__u32)type;
867         rec->cr.cr_pfid = *tpfid;
868         rec->cr.cr_sfid = *sfid;
869         rec->cr.cr_spfid = *spfid;
870         rec->cr.cr_namelen = tname->ln_namelen;
871         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
872         if (sname) {
873                 rec->cr.cr_name[tname->ln_namelen] = '\0';
874                 memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
875                         sname->ln_namelen);
876                 rec->cr.cr_namelen += 1 + sname->ln_namelen;
877         }
878
879         if (likely(target != NULL)) {
880                 rec->cr.cr_tfid = *mdo2fid(target);
881                 target->mod_cltime = cfs_time_current_64();
882         } else {
883                 fid_zero(&rec->cr.cr_tfid);
884         }
885
886         rc = mdd_changelog_ext_store(env, mdd, rec, handle);
887         if (rc < 0) {
888                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
889                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
890                 return -EFAULT;
891         }
892
893         return 0;
894 }
895
896 static int __mdd_links_add(const struct lu_env *env,
897                            struct mdd_object *mdd_obj,
898                            struct linkea_data *ldata,
899                            const struct lu_name *lname,
900                            const struct lu_fid *pfid,
901                            int first, int check)
902 {
903         int rc;
904
905         if (ldata->ld_leh == NULL) {
906                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
907                 if (rc) {
908                         if (rc != -ENODATA)
909                                 return rc;
910                         rc = linkea_data_new(ldata,
911                                              &mdd_env_info(env)->mti_link_buf);
912                         if (rc)
913                                 return rc;
914                 }
915         }
916
917         if (check) {
918                 rc = linkea_links_find(ldata, lname, pfid);
919                 if (rc && rc != -ENOENT)
920                         return rc;
921                 if (rc == 0)
922                         return -EEXIST;
923         }
924
925         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
926                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
927
928                 *tfid = *pfid;
929                 tfid->f_ver = ~0;
930                 linkea_add_buf(ldata, lname, tfid);
931         }
932
933         return linkea_add_buf(ldata, lname, pfid);
934 }
935
936 static int __mdd_links_del(const struct lu_env *env,
937                            struct mdd_object *mdd_obj,
938                            struct linkea_data *ldata,
939                            const struct lu_name *lname,
940                            const struct lu_fid *pfid)
941 {
942         int rc;
943
944         if (ldata->ld_leh == NULL) {
945                 rc = mdd_links_read(env, mdd_obj, ldata);
946                 if (rc)
947                         return rc;
948         }
949
950         rc = linkea_links_find(ldata, lname, pfid);
951         if (rc)
952                 return rc;
953
954         linkea_del_buf(ldata, lname);
955         return 0;
956 }
957
958 static int mdd_linkea_prepare(const struct lu_env *env,
959                               struct mdd_object *mdd_obj,
960                               const struct lu_fid *oldpfid,
961                               const struct lu_name *oldlname,
962                               const struct lu_fid *newpfid,
963                               const struct lu_name *newlname,
964                               int first, int check,
965                               struct linkea_data *ldata)
966 {
967         int rc = 0;
968         int rc2 = 0;
969         ENTRY;
970
971         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
972                 return 0;
973
974         LASSERT(oldpfid != NULL || newpfid != NULL);
975
976         if (mdd_obj->mod_flags & DEAD_OBJ)
977                 /* No more links, don't bother */
978                 RETURN(0);
979
980         if (oldpfid != NULL) {
981                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
982                 if (rc) {
983                         if ((check == 0) ||
984                             (rc != -ENODATA && rc != -ENOENT))
985                                 RETURN(rc);
986                         /* No changes done. */
987                         rc = 0;
988                 }
989         }
990
991         /* If renaming, add the new record */
992         if (newpfid != NULL) {
993                 /* even if the add fails, we still delete the out-of-date
994                  * old link */
995                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
996                                       first, check);
997                 if (rc2 == -EEXIST)
998                         rc2 = 0;
999         }
1000
1001         rc = rc != 0 ? rc : rc2;
1002
1003         RETURN(rc);
1004 }
1005
1006 int mdd_links_rename(const struct lu_env *env,
1007                      struct mdd_object *mdd_obj,
1008                      const struct lu_fid *oldpfid,
1009                      const struct lu_name *oldlname,
1010                      const struct lu_fid *newpfid,
1011                      const struct lu_name *newlname,
1012                      struct thandle *handle,
1013                      struct linkea_data *ldata,
1014                      int first, int check)
1015 {
1016         int rc2 = 0;
1017         int rc = 0;
1018         ENTRY;
1019
1020         if (ldata == NULL) {
1021                 ldata = &mdd_env_info(env)->mti_link_data;
1022                 memset(ldata, 0, sizeof(*ldata));
1023                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1024                                         newpfid, newlname, first, check,
1025                                         ldata);
1026                 if (rc != 0)
1027                         GOTO(out, rc);
1028         }
1029
1030         if (ldata->ld_lee != NULL)
1031                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1032         EXIT;
1033 out:
1034         if (rc == 0)
1035                 rc = rc2;
1036         if (rc) {
1037                 int error = 1;
1038                 if (rc == -EOVERFLOW || rc == -ENOENT)
1039                         error = 0;
1040                 if (oldpfid == NULL)
1041                         CDEBUG(error ? D_ERROR : D_OTHER,
1042                                "link_ea add '%.*s' failed %d "DFID"\n",
1043                                newlname->ln_namelen, newlname->ln_name,
1044                                rc, PFID(mdd_object_fid(mdd_obj)));
1045                 else if (newpfid == NULL)
1046                         CDEBUG(error ? D_ERROR : D_OTHER,
1047                                "link_ea del '%.*s' failed %d "DFID"\n",
1048                                oldlname->ln_namelen, oldlname->ln_name,
1049                                rc, PFID(mdd_object_fid(mdd_obj)));
1050                 else
1051                         CDEBUG(error ? D_ERROR : D_OTHER,
1052                                "link_ea rename '%.*s'->'%.*s' failed %d "
1053                                DFID"\n",
1054                                oldlname->ln_namelen, oldlname->ln_name,
1055                                newlname->ln_namelen, newlname->ln_name,
1056                                rc, PFID(mdd_object_fid(mdd_obj)));
1057         }
1058
1059         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1060                 /* if we vmalloced a large buffer drop it */
1061                 lu_buf_free(ldata->ld_buf);
1062
1063         return rc;
1064 }
1065
1066 static inline int mdd_links_add(const struct lu_env *env,
1067                                 struct mdd_object *mdd_obj,
1068                                 const struct lu_fid *pfid,
1069                                 const struct lu_name *lname,
1070                                 struct thandle *handle,
1071                                 struct linkea_data *data, int first)
1072 {
1073         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1074                                 pfid, lname, handle, data, first, 0);
1075 }
1076
1077 static inline int mdd_links_del(const struct lu_env *env,
1078                                 struct mdd_object *mdd_obj,
1079                                 const struct lu_fid *pfid,
1080                                 const struct lu_name *lname,
1081                                 struct thandle *handle)
1082 {
1083         return mdd_links_rename(env, mdd_obj, pfid, lname,
1084                                 NULL, NULL, handle, NULL, 0, 0);
1085 }
1086
1087 /** Read the link EA into a temp buffer.
1088  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1089  * A pointer to the buffer is stored in \a ldata::ld_buf.
1090  *
1091  * \retval 0 or error
1092  */
1093 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1094                    struct linkea_data *ldata)
1095 {
1096         int rc;
1097
1098         /* First try a small buf */
1099         LASSERT(env != NULL);
1100         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1101                                                CFS_PAGE_SIZE);
1102         if (ldata->ld_buf->lb_buf == NULL)
1103                 return -ENOMEM;
1104
1105         if (!mdd_object_exists(mdd_obj))
1106                 return -ENODATA;
1107
1108         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
1109                           BYPASS_CAPA);
1110         if (rc == -ERANGE) {
1111                 /* Buf was too small, figure out what we need. */
1112                 lu_buf_free(ldata->ld_buf);
1113                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1114                                    XATTR_NAME_LINK, BYPASS_CAPA);
1115                 if (rc < 0)
1116                         return rc;
1117                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1118                 if (ldata->ld_buf->lb_buf == NULL)
1119                         return -ENOMEM;
1120                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1121                                   XATTR_NAME_LINK, BYPASS_CAPA);
1122         }
1123         if (rc < 0)
1124                 return rc;
1125
1126         linkea_init(ldata);
1127         return 0;
1128 }
1129
1130 /** Read the link EA into a temp buffer.
1131  * Uses the name_buf since it is generally large.
1132  * \retval IS_ERR err
1133  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1134  */
1135 struct lu_buf *mdd_links_get(const struct lu_env *env,
1136                              struct mdd_object *mdd_obj)
1137 {
1138         struct linkea_data ldata = { 0 };
1139         int rc;
1140
1141         rc = mdd_links_read(env, mdd_obj, &ldata);
1142         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1143 }
1144
1145 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1146                     struct linkea_data *ldata, struct thandle *handle)
1147 {
1148         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1149                                                      ldata->ld_leh->leh_len);
1150         return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
1151                              mdd_object_capa(env, mdd_obj));
1152 }
1153
1154 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1155                           struct thandle *handle, struct linkea_data *ldata)
1156 {
1157         int     rc;
1158         int     ea_len;
1159         void    *linkea;
1160
1161         if (ldata != NULL && ldata->ld_lee != NULL) {
1162                 ea_len = ldata->ld_leh->leh_len;
1163                 linkea = ldata->ld_buf->lb_buf;
1164         } else {
1165                 ea_len = 4096;
1166                 linkea = NULL;
1167         }
1168
1169         /* XXX: max size? */
1170         rc = mdo_declare_xattr_set(env, mdd_obj,
1171                                    mdd_buf_get_const(env, linkea, ea_len),
1172                                    XATTR_NAME_LINK, 0, handle);
1173         return rc;
1174 }
1175
1176 static inline int mdd_declare_links_del(const struct lu_env *env,
1177                                         struct mdd_object *c,
1178                                         struct thandle *handle)
1179 {
1180         int rc = 0;
1181
1182         /* For directory, the linkEA will be removed together
1183          * with the object. */
1184         if (!S_ISDIR(mdd_object_type(c)))
1185                 rc = mdd_declare_links_add(env, c, handle, NULL);
1186
1187         return rc;
1188 }
1189
1190 static int mdd_declare_link(const struct lu_env *env,
1191                             struct mdd_device *mdd,
1192                             struct mdd_object *p,
1193                             struct mdd_object *c,
1194                             const struct lu_name *name,
1195                             struct thandle *handle,
1196                             struct lu_attr *la,
1197                             struct linkea_data *data)
1198 {
1199         int rc;
1200
1201         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
1202         if (rc)
1203                 return rc;
1204
1205         rc = mdo_declare_ref_add(env, c, handle);
1206         if (rc)
1207                 return rc;
1208
1209         la->la_valid = LA_CTIME | LA_MTIME;
1210         rc = mdo_declare_attr_set(env, p, la, handle);
1211         if (rc != 0)
1212                 return rc;
1213
1214         la->la_valid = LA_CTIME;
1215         rc = mdo_declare_attr_set(env, c, la, handle);
1216         if (rc)
1217                 return rc;
1218
1219         rc = mdd_declare_links_add(env, c, handle, data);
1220         if (rc)
1221                 return rc;
1222
1223         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1224
1225         return rc;
1226 }
1227
1228 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1229                     struct md_object *src_obj, const struct lu_name *lname,
1230                     struct md_attr *ma)
1231 {
1232         const char *name = lname->ln_name;
1233         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1234         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1235         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1236         struct mdd_device *mdd = mdo2mdd(src_obj);
1237         struct dynlock_handle *dlh;
1238         struct thandle *handle;
1239         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1240         int rc;
1241         ENTRY;
1242
1243         handle = mdd_trans_create(env, mdd);
1244         if (IS_ERR(handle))
1245                 GOTO(out_pending, rc = PTR_ERR(handle));
1246
1247         memset(ldata, 0, sizeof(*ldata));
1248
1249         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1250         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1251
1252         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1253                               la, ldata);
1254         if (rc)
1255                 GOTO(stop, rc);
1256
1257         rc = mdd_trans_start(env, mdd, handle);
1258         if (rc)
1259                 GOTO(stop, rc);
1260
1261         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
1262         if (dlh == NULL)
1263                 GOTO(out_trans, rc = -ENOMEM);
1264         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1265
1266         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
1267         if (rc)
1268                 GOTO(out_unlock, rc);
1269
1270         rc = mdo_ref_add(env, mdd_sobj, handle);
1271         if (rc)
1272                 GOTO(out_unlock, rc);
1273
1274
1275         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1276                                      name, handle,
1277                                      mdd_object_capa(env, mdd_tobj));
1278         if (rc != 0) {
1279                 mdo_ref_del(env, mdd_sobj, handle);
1280                 GOTO(out_unlock, rc);
1281         }
1282
1283         la->la_valid = LA_CTIME | LA_MTIME;
1284         rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1285         if (rc)
1286                 GOTO(out_unlock, rc);
1287
1288         la->la_valid = LA_CTIME;
1289         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
1290         if (rc == 0) {
1291                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1292                                         mdo2fid(mdd_tobj), lname, 0, 0,
1293                                         ldata);
1294                 if (rc == 0)
1295                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1296                                       lname, handle, ldata, 0);
1297                 /* The failure of links_add should not cause the link
1298                  * failure, reset rc here */
1299                 rc = 0;
1300         }
1301         EXIT;
1302 out_unlock:
1303         mdd_write_unlock(env, mdd_sobj);
1304         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
1305 out_trans:
1306         if (rc == 0)
1307                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1308                                             mdd_tobj, lname, handle);
1309 stop:
1310         mdd_trans_stop(env, mdd, rc, handle);
1311
1312         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1313                 /* if we vmalloced a large buffer drop it */
1314                 lu_buf_free(ldata->ld_buf);
1315 out_pending:
1316         return rc;
1317 }
1318
1319 int mdd_declare_finish_unlink(const struct lu_env *env,
1320                               struct mdd_object *obj,
1321                               struct md_attr *ma,
1322                               struct thandle *handle)
1323 {
1324         int     rc;
1325
1326         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1327         if (rc)
1328                 return rc;
1329
1330         return mdo_declare_destroy(env, obj, handle);
1331 }
1332
1333 /* caller should take a lock before calling */
1334 int mdd_finish_unlink(const struct lu_env *env,
1335                       struct mdd_object *obj, struct md_attr *ma,
1336                       struct thandle *th)
1337 {
1338         int rc = 0;
1339         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1340         ENTRY;
1341
1342         LASSERT(mdd_write_locked(env, obj) != 0);
1343
1344         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
1345                 obj->mod_flags |= DEAD_OBJ;
1346                 /* add new orphan and the object
1347                  * will be deleted during mdd_close() */
1348                 if (obj->mod_count) {
1349                         rc = __mdd_orphan_add(env, obj, th);
1350                         if (rc == 0)
1351                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1352                                         "orphan list, open count = %d\n",
1353                                         PFID(mdd_object_fid(obj)),
1354                                         obj->mod_count);
1355                         else
1356                                 CERROR("Object "DFID" fail to be an orphan, "
1357                                        "open count = %d, maybe cause failed "
1358                                        "open replay\n",
1359                                         PFID(mdd_object_fid(obj)),
1360                                         obj->mod_count);
1361                 } else {
1362                         rc = mdo_destroy(env, obj, th);
1363                 }
1364         }
1365
1366         RETURN(rc);
1367 }
1368
1369 /*
1370  * pobj maybe NULL
1371  * has mdd_write_lock on cobj already, but not on pobj yet
1372  */
1373 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1374                             struct mdd_object *cobj, struct lu_attr *cattr)
1375 {
1376         int rc;
1377         ENTRY;
1378
1379         rc = mdd_may_delete(env, pobj, cobj, cattr, NULL, 1, 1);
1380
1381         RETURN(rc);
1382 }
1383
1384 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1385                               struct mdd_object *p, struct mdd_object *c,
1386                               const struct lu_name *name, struct md_attr *ma,
1387                               struct thandle *handle, int no_name)
1388 {
1389         struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
1390         int rc;
1391
1392         if (likely(no_name == 0)) {
1393                 rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
1394                 if (rc)
1395                         return rc;
1396         }
1397
1398         rc = mdo_declare_ref_del(env, p, handle);
1399         if (rc)
1400                 return rc;
1401
1402         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1403         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1404         la->la_valid = LA_CTIME | LA_MTIME;
1405         rc = mdo_declare_attr_set(env, p, la, handle);
1406         if (rc)
1407                 return rc;
1408
1409         if (c != NULL) {
1410                 rc = mdo_declare_ref_del(env, c, handle);
1411                 if (rc)
1412                         return rc;
1413
1414                 rc = mdo_declare_ref_del(env, c, handle);
1415                 if (rc)
1416                         return rc;
1417
1418                 la->la_valid = LA_CTIME;
1419                 rc = mdo_declare_attr_set(env, c, la, handle);
1420                 if (rc)
1421                         return rc;
1422
1423                 rc = mdd_declare_finish_unlink(env, c, ma, handle);
1424                 if (rc)
1425                         return rc;
1426
1427                 rc = mdd_declare_links_del(env, c, handle);
1428                 if (rc != 0)
1429                         return rc;
1430
1431                 /* FIXME: need changelog for remove entry */
1432                 rc = mdd_declare_changelog_store(env, mdd, name, handle);
1433         }
1434
1435         return rc;
1436 }
1437
1438 /**
1439  * Delete name entry and the object.
1440  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1441  * does not exist for this object, and it could only happen during resending
1442  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1443  * is also needed in this case(needed by changelog), so we have to add another
1444  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1445  * the ENOENT failure should be able to be fixed by redo mechanism.
1446  */
1447 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1448                       struct md_object *cobj, const struct lu_name *lname,
1449                       struct md_attr *ma, int no_name)
1450 {
1451         const char *name = lname->ln_name;
1452         struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
1453         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1454         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1455         struct mdd_object *mdd_cobj = NULL;
1456         struct mdd_device *mdd = mdo2mdd(pobj);
1457         struct dynlock_handle *dlh;
1458         struct thandle    *handle;
1459         int rc, is_dir = 0;
1460         ENTRY;
1461
1462         /* cobj == NULL means only delete name entry */
1463         if (likely(cobj != NULL)) {
1464                 mdd_cobj = md2mdd_obj(cobj);
1465                 if (mdd_object_exists(mdd_cobj) == 0)
1466                         RETURN(-ENOENT);
1467                 /* currently it is assume, it could only delete
1468                  * name entry of remote directory */
1469                 is_dir = 1;
1470         }
1471
1472         handle = mdd_trans_create(env, mdd);
1473         if (IS_ERR(handle))
1474                 RETURN(PTR_ERR(handle));
1475
1476         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1477                                 lname, ma, handle, no_name);
1478         if (rc)
1479                 GOTO(stop, rc);
1480
1481         rc = mdd_trans_start(env, mdd, handle);
1482         if (rc)
1483                 GOTO(stop, rc);
1484
1485         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1486         if (dlh == NULL)
1487                 GOTO(stop, rc = -ENOMEM);
1488
1489         if (likely(mdd_cobj != NULL)) {
1490                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1491
1492                 /* fetch cattr */
1493                 rc = mdd_la_get(env, mdd_cobj, cattr,
1494                                 mdd_object_capa(env, mdd_cobj));
1495                 if (rc)
1496                         GOTO(cleanup, rc);
1497
1498                 is_dir = S_ISDIR(cattr->la_mode);
1499
1500         }
1501
1502         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
1503         if (rc)
1504                 GOTO(cleanup, rc);
1505
1506         if (likely(no_name == 0)) {
1507                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1508                                         mdd_object_capa(env, mdd_pobj));
1509                 if (rc)
1510                         GOTO(cleanup, rc);
1511         }
1512
1513         if (likely(mdd_cobj != NULL)) {
1514                 rc = mdo_ref_del(env, mdd_cobj, handle);
1515                 if (rc != 0) {
1516                         __mdd_index_insert_only(env, mdd_pobj,
1517                                                 mdo2fid(mdd_cobj),
1518                                                 name, handle,
1519                                                 mdd_object_capa(env, mdd_pobj));
1520                         GOTO(cleanup, rc);
1521                 }
1522
1523                 if (is_dir)
1524                         /* unlink dot */
1525                         mdo_ref_del(env, mdd_cobj, handle);
1526
1527                 /* fetch updated nlink */
1528                 rc = mdd_la_get(env, mdd_cobj, cattr,
1529                                 mdd_object_capa(env, mdd_cobj));
1530                 if (rc)
1531                         GOTO(cleanup, rc);
1532         }
1533
1534         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1535         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1536
1537         la->la_valid = LA_CTIME | LA_MTIME;
1538         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1539         if (rc)
1540                 GOTO(cleanup, rc);
1541
1542         /* Enough for only unlink the entry */
1543         if (unlikely(mdd_cobj == NULL)) {
1544                 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1545                 GOTO(stop, rc);
1546         }
1547
1548         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1549                 /* update ctime of an unlinked file only if it is still
1550                  * opened or a link still exists */
1551                 la->la_valid = LA_CTIME;
1552                 rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
1553                 if (rc)
1554                         GOTO(cleanup, rc);
1555         }
1556
1557         /* XXX: this transfer to ma will be removed with LOD/OSP */
1558         ma->ma_attr = *cattr;
1559         ma->ma_valid |= MA_INODE;
1560         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1561
1562         /* fetch updated nlink */
1563         if (rc == 0)
1564                 rc = mdd_la_get(env, mdd_cobj, cattr,
1565                                 mdd_object_capa(env, mdd_cobj));
1566
1567         if (!is_dir)
1568                 /* old files may not have link ea; ignore errors */
1569                 mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle);
1570
1571         /* if object is removed then we can't get its attrs, use last get */
1572         if (cattr->la_nlink == 0) {
1573                 ma->ma_attr = *cattr;
1574                 ma->ma_valid |= MA_INODE;
1575         }
1576         EXIT;
1577 cleanup:
1578         mdd_write_unlock(env, mdd_cobj);
1579         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1580         if (rc == 0) {
1581                 int cl_flags;
1582
1583                 cl_flags = (cattr->la_nlink == 0) ? CLF_UNLINK_LAST : 0;
1584                 if ((ma->ma_valid & MA_HSM) &&
1585                     (ma->ma_hsm.mh_flags & HS_EXISTS))
1586                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1587
1588                 rc = mdd_changelog_ns_store(env, mdd,
1589                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1590                         mdd_cobj, mdd_pobj, lname, handle);
1591         }
1592
1593 stop:
1594         mdd_trans_stop(env, mdd, rc, handle);
1595
1596         return rc;
1597 }
1598
1599 /*
1600  * The permission has been checked when obj created, no need check again.
1601  */
1602 static int mdd_cd_sanity_check(const struct lu_env *env,
1603                                struct mdd_object *obj)
1604 {
1605         ENTRY;
1606
1607         /* EEXIST check */
1608         if (!obj || mdd_is_dead_obj(obj))
1609                 RETURN(-ENOENT);
1610
1611         RETURN(0);
1612
1613 }
1614
1615 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1616                            struct md_object *cobj, const struct md_op_spec *spec,
1617                            struct md_attr *ma)
1618 {
1619         struct mdd_device *mdd = mdo2mdd(cobj);
1620         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1621         struct mdd_object *son = md2mdd_obj(cobj);
1622         struct thandle    *handle;
1623         const struct lu_buf *buf;
1624         struct lu_attr    *attr = &mdd_env_info(env)->mti_cattr;
1625         int                rc;
1626         ENTRY;
1627
1628         /* do not let users to create stripes via .lustre/
1629          * mdd_obf_setup() sets IMMUTE_OBJ on this directory */
1630         if (pobj && mdd_pobj->mod_flags & IMMUTE_OBJ)
1631                 RETURN(-ENOENT);
1632
1633         rc = mdd_cd_sanity_check(env, son);
1634         if (rc)
1635                 RETURN(rc);
1636
1637         if (!md_should_create(spec->sp_cr_flags))
1638                 RETURN(0);
1639
1640         /*
1641          * there are following use cases for this function:
1642          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1643          *    striping can be specified or not
1644          * 2) CMD?
1645          */
1646         rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son));
1647         if (rc)
1648                 RETURN(rc);
1649
1650         /* calling ->ah_make_hint() is used to transfer information from parent */
1651         mdd_object_make_hint(env, mdd_pobj, son, attr);
1652
1653         handle = mdd_trans_create(env, mdd);
1654         if (IS_ERR(handle))
1655                 GOTO(out_free, rc = PTR_ERR(handle));
1656
1657         /*
1658          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1659          * Should this be fixed?
1660          */
1661         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n",
1662                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1663                spec->sp_cr_flags, spec->no_create);
1664
1665         if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
1666                 /* replay case or lfs setstripe */
1667                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1668                                         spec->u.sp_ea.eadatalen);
1669         } else {
1670                 buf = &LU_BUF_NULL;
1671         }
1672
1673         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1674                                   XATTR_NAME_LOV, 0, handle);
1675         if (rc)
1676                 GOTO(stop, rc);
1677
1678         rc = mdd_trans_start(env, mdd, handle);
1679         if (rc)
1680                 GOTO(stop, rc);
1681
1682         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1683                           0, handle, mdd_object_capa(env, son));
1684 stop:
1685         mdd_trans_stop(env, mdd, rc, handle);
1686 out_free:
1687         RETURN(rc);
1688 }
1689
1690 /* Get fid from name and parent */
1691 static int
1692 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1693              const struct lu_name *lname, struct lu_fid* fid, int mask)
1694 {
1695         const char          *name = lname->ln_name;
1696         const struct dt_key *key = (const struct dt_key *)name;
1697         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1698         struct mdd_device   *m = mdo2mdd(pobj);
1699         struct dt_object    *dir = mdd_object_child(mdd_obj);
1700         int rc;
1701         ENTRY;
1702
1703         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1704                 RETURN(-ESTALE);
1705
1706         if (mdd_object_remote(mdd_obj)) {
1707                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
1708                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
1709         } else if (!mdd_object_exists(mdd_obj)) {
1710                 RETURN(-ESTALE);
1711         }
1712
1713         /* The common filename length check. */
1714         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1715                 RETURN(-ENAMETOOLONG);
1716
1717         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1718                                             MOR_TGT_PARENT);
1719         if (rc)
1720                 RETURN(rc);
1721
1722         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1723                    dt_try_as_dir(env, dir))) {
1724
1725                 rc = dir->do_index_ops->dio_lookup(env, dir,
1726                                                  (struct dt_rec *)fid, key,
1727                                                  mdd_object_capa(env, mdd_obj));
1728                 if (rc > 0)
1729                         rc = 0;
1730                 else if (rc == 0)
1731                         rc = -ENOENT;
1732         } else
1733                 rc = -ENOTDIR;
1734
1735         RETURN(rc);
1736 }
1737
1738 static int mdd_declare_object_initialize(const struct lu_env *env,
1739                                          struct mdd_object *parent,
1740                                          struct mdd_object *child,
1741                                          struct lu_attr *attr,
1742                                          struct thandle *handle,
1743                                          struct linkea_data *ldata)
1744 {
1745         int rc;
1746         ENTRY;
1747
1748         /*
1749          * inode mode has been set in creation time, and it's based on umask,
1750          * la_mode and acl, don't set here again! (which will go wrong
1751          * because below function doesn't consider umask).
1752          * I'd suggest set all object attributes in creation time, see above.
1753          */
1754         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1755         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1756         rc = mdo_declare_attr_set(env, child, attr, handle);
1757         attr->la_valid |= LA_MODE | LA_TYPE;
1758         if (rc == 0 && S_ISDIR(attr->la_mode)) {
1759                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1760                                               dot, handle);
1761                 if (rc == 0)
1762                         rc = mdo_declare_ref_add(env, child, handle);
1763
1764                 rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
1765                                               dotdot, handle);
1766         }
1767
1768         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1769                         fid_is_dot_lustre(mdo2fid(child)) ||
1770                         fid_is_root(mdo2fid(child))))
1771                 mdd_declare_links_add(env, child, handle, ldata);
1772
1773         RETURN(rc);
1774 }
1775
1776 static int mdd_object_initialize(const struct lu_env *env,
1777                                  const struct lu_fid *pfid,
1778                                  const struct lu_name *lname,
1779                                  struct mdd_object *child,
1780                                  struct lu_attr *attr, struct thandle *handle,
1781                                  const struct md_op_spec *spec,
1782                                  struct linkea_data *ldata)
1783 {
1784         int rc;
1785         ENTRY;
1786
1787         /*
1788          * Update attributes for child.
1789          *
1790          * FIXME:
1791          *  (1) the valid bits should be converted between Lustre and Linux;
1792          *  (2) maybe, the child attributes should be set in OSD when creation.
1793          */
1794
1795         rc = mdd_attr_set_internal(env, child, attr, handle, 0);
1796         /* arguments are supposed to stay the same */
1797         if (S_ISDIR(attr->la_mode)) {
1798                 /* Add "." and ".." for newly created dir */
1799                 mdo_ref_add(env, child, handle);
1800                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1801                                              dot, handle, BYPASS_CAPA);
1802                 if (rc == 0)
1803                         rc = __mdd_index_insert_only(env, child, pfid,
1804                                                      dotdot, handle,
1805                                                      BYPASS_CAPA);
1806                 if (rc != 0)
1807                         mdo_ref_del(env, child, handle);
1808         }
1809
1810         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1811                         fid_is_dot_lustre(mdo2fid(child)) ||
1812                         fid_is_root(mdo2fid(child))))
1813                 mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
1814
1815         RETURN(rc);
1816 }
1817
1818 /* has not lock on pobj yet */
1819 static int mdd_create_sanity_check(const struct lu_env *env,
1820                                    struct md_object *pobj,
1821                                    struct lu_attr *pattr,
1822                                    const struct lu_name *lname,
1823                                    struct lu_attr *cattr,
1824                                    struct md_op_spec *spec)
1825 {
1826         struct mdd_thread_info *info = mdd_env_info(env);
1827         struct lu_fid     *fid       = &info->mti_fid;
1828         struct mdd_object *obj       = md2mdd_obj(pobj);
1829         struct mdd_device *m         = mdo2mdd(pobj);
1830         int rc;
1831         ENTRY;
1832
1833         /* EEXIST check */
1834         if (mdd_is_dead_obj(obj))
1835                 RETURN(-ENOENT);
1836
1837         /*
1838          * In some cases this lookup is not needed - we know before if name
1839          * exists or not because MDT performs lookup for it.
1840          * name length check is done in lookup.
1841          */
1842         if (spec->sp_cr_lookup) {
1843                 /*
1844                  * Check if the name already exist, though it will be checked in
1845                  * _index_insert also, for avoiding rolling back if exists
1846                  * _index_insert.
1847                  */
1848                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1849                                          MAY_WRITE | MAY_EXEC);
1850                 if (rc != -ENOENT)
1851                         RETURN(rc ? : -EEXIST);
1852         } else {
1853                 /*
1854                  * Check WRITE permission for the parent.
1855                  * EXEC permission have been checked
1856                  * when lookup before create already.
1857                  */
1858                 rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE,
1859                                                     MOR_TGT_PARENT);
1860                 if (rc)
1861                         RETURN(rc);
1862         }
1863
1864         /* sgid check */
1865         if (pattr->la_mode & S_ISGID) {
1866                 cattr->la_gid = pattr->la_gid;
1867                 if (S_ISDIR(cattr->la_mode)) {
1868                         cattr->la_mode |= S_ISGID;
1869                         cattr->la_valid |= LA_MODE;
1870                 }
1871         }
1872
1873         switch (cattr->la_mode & S_IFMT) {
1874         case S_IFLNK: {
1875                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1876
1877                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1878                         RETURN(-ENAMETOOLONG);
1879                 else
1880                         RETURN(0);
1881         }
1882         case S_IFDIR:
1883         case S_IFREG:
1884         case S_IFCHR:
1885         case S_IFBLK:
1886         case S_IFIFO:
1887         case S_IFSOCK:
1888                 rc = 0;
1889                 break;
1890         default:
1891                 rc = -EINVAL;
1892                 break;
1893         }
1894         RETURN(rc);
1895 }
1896
1897 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
1898                               struct mdd_object *p, struct mdd_object *c,
1899                               const struct lu_name *name,
1900                               struct lu_attr *attr,
1901                               int got_def_acl,
1902                               struct thandle *handle,
1903                               const struct md_op_spec *spec,
1904                               struct linkea_data *ldata)
1905 {
1906         int rc;
1907
1908         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
1909         if (rc)
1910                 GOTO(out, rc);
1911
1912 #ifdef CONFIG_FS_POSIX_ACL
1913         if (got_def_acl > 0) {
1914                 struct lu_buf *acl_buf;
1915
1916                 acl_buf = mdd_buf_get(env, NULL, got_def_acl);
1917                 /* if dir, then can inherit default ACl */
1918                 if (S_ISDIR(attr->la_mode)) {
1919                         rc = mdo_declare_xattr_set(env, c, acl_buf,
1920                                                    XATTR_NAME_ACL_DEFAULT,
1921                                                    0, handle);
1922                         if (rc)
1923                                 GOTO(out, rc);
1924                 }
1925
1926                 rc = mdo_declare_attr_set(env, c, attr, handle);
1927                 if (rc)
1928                         GOTO(out, rc);
1929
1930                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1931                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1932                 if (rc)
1933                         GOTO(out, rc);
1934         }
1935 #endif
1936
1937         if (S_ISDIR(attr->la_mode)) {
1938                 rc = mdo_declare_ref_add(env, p, handle);
1939                 if (rc)
1940                         GOTO(out, rc);
1941         }
1942
1943         rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
1944         if (rc)
1945                 GOTO(out, rc);
1946
1947         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1948                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
1949         else
1950                 rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1951                                               name->ln_name, handle);
1952         if (rc)
1953                 GOTO(out, rc);
1954
1955         /* replay case, create LOV EA from client data */
1956         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1957                 const struct lu_buf *buf;
1958
1959                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1960                                         spec->u.sp_ea.eadatalen);
1961                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
1962                                            0, handle);
1963                 if (rc)
1964                         GOTO(out, rc);
1965         }
1966
1967         if (S_ISLNK(attr->la_mode)) {
1968                 rc = dt_declare_record_write(env, mdd_object_child(c),
1969                                              strlen(spec->u.sp_symname), 0,
1970                                              handle);
1971                 if (rc)
1972                         GOTO(out, rc);
1973         }
1974
1975         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
1976                 rc = mdo_declare_attr_set(env, p, attr, handle);
1977                 if (rc)
1978                         return rc;
1979         }
1980
1981         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1982         if (rc)
1983                 return rc;
1984
1985 out:
1986         return rc;
1987 }
1988
1989 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
1990                         struct lu_attr *la, struct lu_buf *acl_buf,
1991                         int *got_def_acl, int *reset_acl)
1992 {
1993         int     rc;
1994         ENTRY;
1995
1996         if (S_ISLNK(la->la_mode))
1997                 RETURN(0);
1998
1999         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
2000         rc = mdo_xattr_get(env, pobj, acl_buf,
2001                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
2002         mdd_read_unlock(env, pobj);
2003         if (rc > 0) {
2004                 /* If there are default ACL, fix mode by default ACL */
2005                 *got_def_acl = rc;
2006                 acl_buf->lb_len = rc;
2007                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2008                 if (rc < 0)
2009                         RETURN(rc);
2010                 *reset_acl = rc;
2011         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2012                 /* If there are no default ACL, fix mode by mask */
2013                 struct lu_ucred *uc = lu_ucred(env);
2014
2015                 /* The create triggered by MDT internal events, such as
2016                  * LFSCK reset, will not contain valid "uc". */
2017                 if (unlikely(uc != NULL))
2018                         la->la_mode &= ~uc->uc_umask;
2019                 rc = 0;
2020         }
2021
2022         RETURN(rc);
2023 }
2024
2025 /*
2026  * Create object and insert it into namespace.
2027  */
2028 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2029                       const struct lu_name *lname, struct md_object *child,
2030                       struct md_op_spec *spec, struct md_attr* ma)
2031 {
2032         struct mdd_thread_info  *info = mdd_env_info(env);
2033         struct lu_attr          *la = &info->mti_la_for_fix;
2034         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2035         struct mdd_object       *son = md2mdd_obj(child);
2036         struct mdd_device       *mdd = mdo2mdd(pobj);
2037         struct lu_attr          *attr = &ma->ma_attr;
2038         struct thandle          *handle;
2039         struct lu_attr          *pattr = &info->mti_pattr;
2040         struct lu_buf           acl_buf;
2041         struct linkea_data      *ldata = &info->mti_link_data;
2042         struct dynlock_handle   *dlh;
2043         const char              *name = lname->ln_name;
2044         int                      rc, created = 0, initialized = 0, inserted = 0;
2045         int                      got_def_acl = 0;
2046         int                      reset_acl = 0;
2047         ENTRY;
2048
2049         /*
2050          * Two operations have to be performed:
2051          *
2052          *  - an allocation of a new object (->do_create()), and
2053          *
2054          *  - an insertion into a parent index (->dio_insert()).
2055          *
2056          * Due to locking, operation order is not important, when both are
2057          * successful, *but* error handling cases are quite different:
2058          *
2059          *  - if insertion is done first, and following object creation fails,
2060          *  insertion has to be rolled back, but this operation might fail
2061          *  also leaving us with dangling index entry.
2062          *
2063          *  - if creation is done first, is has to be undone if insertion
2064          *  fails, leaving us with leaked space, which is neither good, nor
2065          *  fatal.
2066          *
2067          * It seems that creation-first is simplest solution, but it is
2068          * sub-optimal in the frequent
2069          *
2070          *         $ mkdir foo
2071          *         $ mkdir foo
2072          *
2073          * case, because second mkdir is bound to create object, only to
2074          * destroy it immediately.
2075          *
2076          * To avoid this follow local file systems that do double lookup:
2077          *
2078          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2079          *
2080          *     1. create            (mdd_object_create_internal())
2081          *
2082          *     2. insert            (__mdd_index_insert(), lookup again)
2083          */
2084
2085         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
2086         if (rc != 0)
2087                 RETURN(rc);
2088
2089         /* Sanity checks before big job. */
2090         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2091         if (rc)
2092                 RETURN(rc);
2093
2094         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2095                 GOTO(out_free, rc = -EINPROGRESS);
2096
2097         acl_buf.lb_buf = info->mti_xattr_buf;
2098         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2099         rc = mdd_acl_init(env, mdd_pobj, attr, &acl_buf, &got_def_acl,
2100                           &reset_acl);
2101         if (rc < 0)
2102                 GOTO(out_free, rc);
2103
2104         mdd_object_make_hint(env, mdd_pobj, son, attr);
2105
2106         handle = mdd_trans_create(env, mdd);
2107         if (IS_ERR(handle))
2108                 GOTO(out_free, rc = PTR_ERR(handle));
2109
2110         memset(ldata, 0, sizeof(*ldata));
2111         mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
2112                            lname, 1, 0, ldata);
2113         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2114                                 got_def_acl, handle, spec, ldata);
2115         if (rc)
2116                 GOTO(out_stop, rc);
2117
2118         rc = mdd_trans_start(env, mdd, handle);
2119         if (rc)
2120                 GOTO(out_stop, rc);
2121
2122         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
2123         if (dlh == NULL)
2124                 GOTO(out_trans, rc = -ENOMEM);
2125
2126         mdd_write_lock(env, son, MOR_TGT_CHILD);
2127         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
2128         if (rc) {
2129                 mdd_write_unlock(env, son);
2130                 GOTO(cleanup, rc);
2131         }
2132
2133         created = 1;
2134
2135 #ifdef CONFIG_FS_POSIX_ACL
2136         if (got_def_acl) {
2137                 /* set default acl */
2138                 if (S_ISDIR(attr->la_mode)) {
2139                         LASSERTF(acl_buf.lb_len  == got_def_acl,
2140                                  "invalid acl_buf: %p:%d got_def %d\n",
2141                                  acl_buf.lb_buf, (int)acl_buf.lb_len,
2142                                  got_def_acl);
2143                         rc = mdo_xattr_set(env, son, &acl_buf,
2144                                            XATTR_NAME_ACL_DEFAULT, 0,
2145                                            handle, BYPASS_CAPA);
2146                         if (rc) {
2147                                 mdd_write_unlock(env, son);
2148                                 GOTO(cleanup, rc);
2149                         }
2150                 }
2151
2152                 /* set its own acl */
2153                 if (reset_acl) {
2154                         LASSERTF(acl_buf.lb_buf != NULL && acl_buf.lb_len != 0,
2155                                  "invalid acl_buf %p:%d\n", acl_buf.lb_buf,
2156                                  (int)acl_buf.lb_len);
2157                         rc = mdo_xattr_set(env, son, &acl_buf,
2158                                            XATTR_NAME_ACL_ACCESS,
2159                                            0, handle, BYPASS_CAPA);
2160                         if (rc) {
2161                                 mdd_write_unlock(env, son);
2162                                 GOTO(cleanup, rc);
2163                         }
2164                 }
2165         }
2166 #endif
2167
2168         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
2169                                    son, attr, handle, spec, ldata);
2170
2171         /*
2172          * in case of replay we just set LOVEA provided by the client
2173          * XXX: I think it would be interesting to try "old" way where
2174          *      MDT calls this xattr_set(LOV) in a different transaction.
2175          *      probably this way we code can be made better.
2176          */
2177         if (rc == 0 && (spec->no_create ||
2178                         (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
2179                 const struct lu_buf *buf;
2180
2181                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2182                                 spec->u.sp_ea.eadatalen);
2183                 rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
2184                                 BYPASS_CAPA);
2185         }
2186
2187         if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2188                 rc = __mdd_orphan_add(env, son, handle);
2189
2190         mdd_write_unlock(env, son);
2191
2192         if (rc != 0)
2193                 /*
2194                  * Object has no links, so it will be destroyed when last
2195                  * reference is released. (XXX not now.)
2196                  */
2197                 GOTO(cleanup, rc);
2198
2199         initialized = 1;
2200
2201         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
2202                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2203                                         name, S_ISDIR(attr->la_mode), handle,
2204                                         mdd_object_capa(env, mdd_pobj));
2205
2206         if (rc != 0)
2207                 GOTO(cleanup, rc);
2208
2209         inserted = 1;
2210
2211         if (S_ISLNK(attr->la_mode)) {
2212                 struct lu_ucred  *uc = lu_ucred_assert(env);
2213                 struct dt_object *dt = mdd_object_child(son);
2214                 const char *target_name = spec->u.sp_symname;
2215                 int sym_len = strlen(target_name);
2216                 const struct lu_buf *buf;
2217                 loff_t pos = 0;
2218
2219                 buf = mdd_buf_get_const(env, target_name, sym_len);
2220                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2221                                                 mdd_object_capa(env, son),
2222                                                 uc->uc_cap &
2223                                                 CFS_CAP_SYS_RESOURCE_MASK);
2224
2225                 if (rc == sym_len)
2226                         rc = 0;
2227                 else
2228                         GOTO(cleanup, rc = -EFAULT);
2229         }
2230
2231         /* volatile file creation does not update parent directory times */
2232         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2233                 GOTO(cleanup, rc = 0);
2234
2235         /* update parent directory mtime/ctime */
2236         *la = *attr;
2237         la->la_valid = LA_CTIME | LA_MTIME;
2238         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
2239         if (rc)
2240                 GOTO(cleanup, rc);
2241
2242         EXIT;
2243 cleanup:
2244         if (rc != 0 && created != 0) {
2245                 int rc2;
2246
2247                 if (inserted != 0) {
2248                         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2249                                 rc2 = __mdd_orphan_del(env, son, handle);
2250                         else
2251                                 rc2 = __mdd_index_delete(env, mdd_pobj, name,
2252                                                          S_ISDIR(attr->la_mode),
2253                                                          handle, BYPASS_CAPA);
2254                         if (rc2 != 0)
2255                                 goto out_stop;
2256                 }
2257
2258                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2259                 if (initialized != 0 && S_ISDIR(attr->la_mode)) {
2260                         /* Drop the reference, no need to delete "."/"..",
2261                          * because the object to be destroied directly. */
2262                         rc2 = mdo_ref_del(env, son, handle);
2263                         if (rc2 != 0) {
2264                                 mdd_write_unlock(env, son);
2265                                 goto out_stop;
2266                         }
2267                 }
2268
2269                 rc2 = mdo_ref_del(env, son, handle);
2270                 if (rc2 != 0) {
2271                         mdd_write_unlock(env, son);
2272                         goto out_stop;
2273                 }
2274
2275                 mdo_destroy(env, son, handle);
2276                 mdd_write_unlock(env, son);
2277         }
2278
2279         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
2280 out_trans:
2281         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)))
2282                 rc = mdd_changelog_ns_store(env, mdd,
2283                         S_ISDIR(attr->la_mode) ? CL_MKDIR :
2284                         S_ISREG(attr->la_mode) ? CL_CREATE :
2285                         S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2286                         0, son, mdd_pobj, lname, handle);
2287 out_stop:
2288         mdd_trans_stop(env, mdd, rc, handle);
2289 out_free:
2290         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2291                 /* if we vmalloced a large buffer drop it */
2292                 lu_buf_free(ldata->ld_buf);
2293
2294         /* The child object shouldn't be cached anymore */
2295         if (rc)
2296                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2297                             &child->mo_lu.lo_header->loh_flags);
2298         return rc;
2299 }
2300
2301 /*
2302  * Get locks on parents in proper order
2303  * RETURN: < 0 - error, rename_order if successful
2304  */
2305 enum rename_order {
2306         MDD_RN_SAME,
2307         MDD_RN_SRCTGT,
2308         MDD_RN_TGTSRC
2309 };
2310
2311 static int mdd_rename_order(const struct lu_env *env,
2312                             struct mdd_device *mdd,
2313                             struct mdd_object *src_pobj,
2314                             struct mdd_object *tgt_pobj)
2315 {
2316         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2317         int rc;
2318         ENTRY;
2319
2320         if (src_pobj == tgt_pobj)
2321                 RETURN(MDD_RN_SAME);
2322
2323         /* compared the parent child relationship of src_p&tgt_p */
2324         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2325                 rc = MDD_RN_SRCTGT;
2326         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2327                 rc = MDD_RN_TGTSRC;
2328         } else {
2329                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2330                 if (rc == -EREMOTE)
2331                         rc = 0;
2332
2333                 if (rc == 1)
2334                         rc = MDD_RN_TGTSRC;
2335                 else if (rc == 0)
2336                         rc = MDD_RN_SRCTGT;
2337         }
2338
2339         RETURN(rc);
2340 }
2341
2342 /* has not mdd_write{read}_lock on any obj yet. */
2343 static int mdd_rename_sanity_check(const struct lu_env *env,
2344                                    struct mdd_object *src_pobj,
2345                                    struct mdd_object *tgt_pobj,
2346                                    struct mdd_object *sobj,
2347                                    struct mdd_object *tobj,
2348                                    struct lu_attr *so_attr,
2349                                    struct lu_attr *tg_attr)
2350 {
2351         int rc = 0;
2352         ENTRY;
2353
2354         /* XXX: when get here, sobj must NOT be NULL,
2355          * the other case has been processed in cld_rename
2356          * before mdd_rename and enable MDS_PERM_BYPASS. */
2357         LASSERT(sobj);
2358
2359         rc = mdd_may_delete(env, src_pobj, sobj, so_attr, NULL, 1, 0);
2360         if (rc)
2361                 RETURN(rc);
2362
2363         /* XXX: when get here, "tobj == NULL" means tobj must
2364          * NOT exist (neither on remote MDS, such case has been
2365          * processed in cld_rename before mdd_rename and enable
2366          * MDS_PERM_BYPASS).
2367          * So check may_create, but not check may_unlink. */
2368         if (!tobj)
2369                 rc = mdd_may_create(env, tgt_pobj, NULL,
2370                                     (src_pobj != tgt_pobj), 0);
2371         else
2372                 rc = mdd_may_delete(env, tgt_pobj, tobj, tg_attr, so_attr,
2373                                     (src_pobj != tgt_pobj), 1);
2374
2375         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
2376             S_ISDIR(so_attr->la_mode))
2377                 rc = __mdd_may_link(env, tgt_pobj);
2378
2379         RETURN(rc);
2380 }
2381
2382 static int mdd_declare_rename(const struct lu_env *env,
2383                               struct mdd_device *mdd,
2384                               struct mdd_object *mdd_spobj,
2385                               struct mdd_object *mdd_tpobj,
2386                               struct mdd_object *mdd_sobj,
2387                               struct mdd_object *mdd_tobj,
2388                               const struct lu_name *tname,
2389                               const struct lu_name *sname,
2390                               struct md_attr *ma,
2391                               struct linkea_data *ldata,
2392                               struct thandle *handle)
2393 {
2394         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2395         int rc;
2396
2397         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2398         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2399
2400         LASSERT(mdd_spobj);
2401         LASSERT(mdd_tpobj);
2402         LASSERT(mdd_sobj);
2403
2404         /* name from source dir */
2405         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2406         if (rc)
2407                 return rc;
2408
2409         /* .. from source child */
2410         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2411                 /* source child can be directory,
2412                  * counted by source dir's nlink */
2413                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2414                 if (rc)
2415                         return rc;
2416                 if (mdd_spobj != mdd_tpobj) {
2417                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2418                                                       handle);
2419                         if (rc)
2420                                 return rc;
2421
2422                         rc = mdo_declare_index_insert(env, mdd_sobj,
2423                                                       mdo2fid(mdd_tpobj),
2424                                                       dotdot, handle);
2425                         if (rc)
2426                                 return rc;
2427                 }
2428                 /* new target child can be directory,
2429                  * counted by target dir's nlink */
2430                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2431                 if (rc)
2432                         return rc;
2433
2434         }
2435
2436         la->la_valid = LA_CTIME | LA_MTIME;
2437         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2438         if (rc != 0)
2439                 return rc;
2440
2441         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2442         if (rc != 0)
2443                 return rc;
2444
2445         la->la_valid = LA_CTIME;
2446         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2447         if (rc)
2448                 return rc;
2449
2450         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2451         if (rc)
2452                 return rc;
2453
2454         /* new name */
2455         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2456                         tname->ln_name, handle);
2457         if (rc)
2458                 return rc;
2459
2460         /* name from target dir (old name), we declare it unconditionally
2461          * as mdd_rename() calls delete unconditionally as well. so just
2462          * to balance declarations vs calls to change ... */
2463         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2464         if (rc)
2465                 return rc;
2466
2467         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2468                 /* delete target child in target parent directory */
2469                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2470                 if (rc)
2471                         return rc;
2472
2473                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2474                         /* target child can be directory,
2475                          * delete "." reference in target child directory */
2476                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2477                         if (rc)
2478                                 return rc;
2479
2480                         /* delete ".." reference in target parent directory */
2481                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2482                         if (rc)
2483                                 return rc;
2484                 }
2485
2486                 la->la_valid = LA_CTIME;
2487                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2488                 if (rc)
2489                         return rc;
2490
2491                 mdd_declare_links_del(env, mdd_tobj, handle);
2492                 if (rc)
2493                         return rc;
2494
2495                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2496                 if (rc)
2497                         return rc;
2498         }
2499
2500         rc = mdd_declare_changelog_ext_store(env, mdd, tname, sname, handle);
2501         if (rc)
2502                 return rc;
2503
2504         return rc;
2505 }
2506
2507 /* src object can be remote that is why we use only fid and type of object */
2508 static int mdd_rename(const struct lu_env *env,
2509                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2510                       const struct lu_fid *lf, const struct lu_name *lsname,
2511                       struct md_object *tobj, const struct lu_name *ltname,
2512                       struct md_attr *ma)
2513 {
2514         const char *sname = lsname->ln_name;
2515         const char *tname = ltname->ln_name;
2516         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2517         struct lu_attr    *so_attr = &mdd_env_info(env)->mti_cattr;
2518         struct lu_attr    *tg_attr = &mdd_env_info(env)->mti_pattr;
2519         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2520         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2521         struct mdd_device *mdd = mdo2mdd(src_pobj);
2522         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2523         struct mdd_object *mdd_tobj = NULL;
2524         struct dynlock_handle *sdlh = NULL, *tdlh = NULL;
2525         struct thandle *handle;
2526         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2527         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2528         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2529         bool is_dir;
2530         bool tobj_ref = 0;
2531         bool tobj_locked = 0;
2532         unsigned cl_flags = 0;
2533         int rc, rc2;
2534         ENTRY;
2535
2536         if (tobj)
2537                 mdd_tobj = md2mdd_obj(tobj);
2538
2539         mdd_sobj = mdd_object_find(env, mdd, lf);
2540
2541         rc = mdd_la_get(env, mdd_sobj, so_attr,
2542                         mdd_object_capa(env, mdd_sobj));
2543         if (rc)
2544                 GOTO(out_pending, rc);
2545
2546         if (mdd_tobj) {
2547                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2548                                 mdd_object_capa(env, mdd_tobj));
2549                 if (rc)
2550                         GOTO(out_pending, rc);
2551         }
2552
2553         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj,
2554                                      mdd_tobj, so_attr, tg_attr);
2555         if (rc)
2556                 GOTO(out_pending, rc);
2557
2558         handle = mdd_trans_create(env, mdd);
2559         if (IS_ERR(handle))
2560                 GOTO(out_pending, rc = PTR_ERR(handle));
2561
2562         memset(ldata, 0, sizeof(*ldata));
2563         mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdd_object_fid(mdd_tpobj),
2564                            ltname, 1, 0, ldata);
2565         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2566                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2567         if (rc)
2568                 GOTO(stop, rc);
2569
2570         rc = mdd_trans_start(env, mdd, handle);
2571         if (rc)
2572                 GOTO(stop, rc);
2573
2574         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2575         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2576         if (rc < 0)
2577                 GOTO(cleanup_unlocked, rc);
2578
2579         /* Get locks in determined order */
2580         if (rc == MDD_RN_SAME) {
2581                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2582                                           sname, MOR_SRC_PARENT);
2583                 /* check hashes to determine do we need one lock or two */
2584                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2585                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2586                                 MOR_TGT_PARENT);
2587                 else
2588                         tdlh = sdlh;
2589         } else if (rc == MDD_RN_SRCTGT) {
2590                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2591                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2592         } else {
2593                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2594                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2595         }
2596         if (sdlh == NULL || tdlh == NULL)
2597                 GOTO(cleanup, rc = -ENOMEM);
2598
2599         is_dir = S_ISDIR(so_attr->la_mode);
2600
2601         /* Remove source name from source directory */
2602         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2603                                 mdd_object_capa(env, mdd_spobj));
2604         if (rc)
2605                 GOTO(cleanup, rc);
2606
2607         /* "mv dir1 dir2" needs "dir1/.." link update */
2608         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2609                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2610                                         mdd_object_capa(env, mdd_sobj));
2611                 if (rc)
2612                         GOTO(fixup_spobj2, rc);
2613
2614                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2615                                       handle, mdd_object_capa(env, mdd_sobj));
2616                 if (rc)
2617                         GOTO(fixup_spobj, rc);
2618         }
2619
2620         /* Remove target name from target directory
2621          * Here tobj can be remote one, so we do index_delete unconditionally
2622          * and -ENOENT is allowed.
2623          */
2624         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2625                                 mdd_object_capa(env, mdd_tpobj));
2626         if (rc != 0) {
2627                 if (mdd_tobj) {
2628                         /* tname might been renamed to something else */
2629                         GOTO(fixup_spobj, rc);
2630                 }
2631                 if (rc != -ENOENT)
2632                         GOTO(fixup_spobj, rc);
2633         }
2634
2635         /* Insert new fid with target name into target dir */
2636         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2637                                 mdd_object_capa(env, mdd_tpobj));
2638         if (rc)
2639                 GOTO(fixup_tpobj, rc);
2640
2641         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2642         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2643
2644         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2645         if (mdd_sobj) {
2646                 la->la_valid = LA_CTIME;
2647                 rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
2648                 if (rc)
2649                         GOTO(fixup_tpobj, rc);
2650         }
2651
2652         /* Remove old target object
2653          * For tobj is remote case cmm layer has processed
2654          * and set tobj to NULL then. So when tobj is NOT NULL,
2655          * it must be local one.
2656          */
2657         if (tobj && mdd_object_exists(mdd_tobj)) {
2658                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2659                 tobj_locked = 1;
2660                 if (mdd_is_dead_obj(mdd_tobj)) {
2661                         /* shld not be dead, something is wrong */
2662                         CERROR("tobj is dead, something is wrong\n");
2663                         rc = -EINVAL;
2664                         goto cleanup;
2665                 }
2666                 mdo_ref_del(env, mdd_tobj, handle);
2667
2668                 /* Remove dot reference. */
2669                 if (S_ISDIR(tg_attr->la_mode))
2670                         mdo_ref_del(env, mdd_tobj, handle);
2671                 tobj_ref = 1;
2672
2673                 /* fetch updated nlink */
2674                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2675                                 mdd_object_capa(env, mdd_tobj));
2676                 if (rc != 0) {
2677                         CERROR("%s: Failed to get nlink for tobj "
2678                                 DFID": rc = %d\n",
2679                                 mdd2obd_dev(mdd)->obd_name,
2680                                 PFID(tpobj_fid), rc);
2681                         GOTO(fixup_tpobj, rc);
2682                 }
2683
2684                 la->la_valid = LA_CTIME;
2685                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2686                 if (rc != 0) {
2687                         CERROR("%s: Failed to set ctime for tobj "
2688                                 DFID": rc = %d\n",
2689                                 mdd2obd_dev(mdd)->obd_name,
2690                                 PFID(tpobj_fid), rc);
2691                         GOTO(fixup_tpobj, rc);
2692                 }
2693
2694                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2695                 ma->ma_attr = *tg_attr;
2696                 ma->ma_valid |= MA_INODE;
2697                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2698                 if (rc != 0) {
2699                         CERROR("%s: Failed to unlink tobj "
2700                                 DFID": rc = %d\n",
2701                                 mdd2obd_dev(mdd)->obd_name,
2702                                 PFID(tpobj_fid), rc);
2703                         GOTO(fixup_tpobj, rc);
2704                 }
2705
2706                 /* fetch updated nlink */
2707                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2708                                 mdd_object_capa(env, mdd_tobj));
2709                 if (rc != 0) {
2710                         CERROR("%s: Failed to get nlink for tobj "
2711                                 DFID": rc = %d\n",
2712                                 mdd2obd_dev(mdd)->obd_name,
2713                                 PFID(tpobj_fid), rc);
2714                         GOTO(fixup_tpobj, rc);
2715                 }
2716                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2717                 ma->ma_attr = *tg_attr;
2718                 ma->ma_valid |= MA_INODE;
2719
2720                 if (so_attr->la_nlink == 0)
2721                         cl_flags |= CLF_RENAME_LAST;
2722         }
2723
2724         la->la_valid = LA_CTIME | LA_MTIME;
2725         rc = mdd_attr_check_set_internal(env, mdd_spobj, la, handle, 0);
2726         if (rc)
2727                 GOTO(fixup_tpobj, rc);
2728
2729         if (mdd_spobj != mdd_tpobj) {
2730                 la->la_valid = LA_CTIME | LA_MTIME;
2731                 rc = mdd_attr_check_set_internal(env, mdd_tpobj, la,
2732                                                  handle, 0);
2733         }
2734
2735         if (rc == 0 && mdd_sobj) {
2736                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2737                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2738                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2739                                       0, 0);
2740                 if (rc == -ENOENT)
2741                         /* Old files might not have EA entry */
2742                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2743                                       lsname, handle, NULL, 0);
2744                 mdd_write_unlock(env, mdd_sobj);
2745                 /* We don't fail the transaction if the link ea can't be
2746                    updated -- fid2path will use alternate lookup method. */
2747                 rc = 0;
2748         }
2749
2750         EXIT;
2751
2752 fixup_tpobj:
2753         if (rc) {
2754                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2755                                          BYPASS_CAPA);
2756                 if (rc2)
2757                         CWARN("tp obj fix error %d\n",rc2);
2758
2759                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2760                     !mdd_is_dead_obj(mdd_tobj)) {
2761                         if (tobj_ref) {
2762                                 mdo_ref_add(env, mdd_tobj, handle);
2763                                 if (is_dir)
2764                                         mdo_ref_add(env, mdd_tobj, handle);
2765                         }
2766
2767                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2768                                          mdo2fid(mdd_tobj), tname,
2769                                          is_dir, handle,
2770                                          BYPASS_CAPA);
2771
2772                         if (rc2)
2773                                 CWARN("tp obj fix error %d\n",rc2);
2774                 }
2775         }
2776
2777 fixup_spobj:
2778         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2779                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2780                                               BYPASS_CAPA);
2781
2782                 if (rc2)
2783                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2784                                mdd2obd_dev(mdd)->obd_name, rc2);
2785
2786
2787                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2788                                               dotdot, handle, BYPASS_CAPA);
2789                 if (rc2)
2790                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2791                               mdd2obd_dev(mdd)->obd_name, rc2);
2792         }
2793
2794 fixup_spobj2:
2795         if (rc) {
2796                 rc2 = __mdd_index_insert(env, mdd_spobj,
2797                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2798                 if (rc2)
2799                         CWARN("sp obj fix error %d\n",rc2);
2800         }
2801 cleanup:
2802         if (tobj_locked)
2803                 mdd_write_unlock(env, mdd_tobj);
2804         if (likely(tdlh) && sdlh != tdlh)
2805                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2806         if (likely(sdlh))
2807                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2808 cleanup_unlocked:
2809         if (rc == 0)
2810                 rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
2811                                                 mdd_tobj, tpobj_fid, lf,
2812                                                 spobj_fid, ltname, lsname,
2813                                                 handle);
2814
2815 stop:
2816         mdd_trans_stop(env, mdd, rc, handle);
2817 out_pending:
2818         mdd_object_put(env, mdd_sobj);
2819         return rc;
2820 }
2821
2822 const struct md_dir_operations mdd_dir_ops = {
2823         .mdo_is_subdir     = mdd_is_subdir,
2824         .mdo_lookup        = mdd_lookup,
2825         .mdo_create        = mdd_create,
2826         .mdo_rename        = mdd_rename,
2827         .mdo_link          = mdd_link,
2828         .mdo_unlink        = mdd_unlink,
2829         .mdo_create_data   = mdd_create_data,
2830 };